From 619fffdab62d0eebe84a6140f46930547b79e4b3 Mon Sep 17 00:00:00 2001 From: Kirill Date: Mon, 6 May 2024 15:42:08 +0400 Subject: [PATCH] Refactor p2p handlers: move commong iterator logic to a helper (#1856) --- p2p/starknet/handlers.go | 312 +++++++++++++++++---------------------- 1 file changed, 138 insertions(+), 174 deletions(-) diff --git a/p2p/starknet/handlers.go b/p2p/starknet/handlers.go index 62bb6e4f24..302831fb02 100644 --- a/p2p/starknet/handlers.go +++ b/p2p/starknet/handlers.go @@ -10,6 +10,7 @@ import ( "github.com/NethermindEth/juno/adapters/core2p2p" "github.com/NethermindEth/juno/adapters/p2p2core" "github.com/NethermindEth/juno/blockchain" + "github.com/NethermindEth/juno/core" "github.com/NethermindEth/juno/p2p/starknet/spec" "github.com/NethermindEth/juno/utils" "github.com/NethermindEth/juno/utils/iter" @@ -57,11 +58,14 @@ func streamHandler[ReqT proto.Message](ctx context.Context, stream network.Strea buffer := getBuffer() defer bufferPool.Put(buffer) + // todo add limit reader + // todo add read timeout if _, err := buffer.ReadFrom(stream); err != nil { log.Debugw("Error reading from stream", "peer", stream.ID(), "protocol", stream.Protocol(), "err", err) return } + // todo double check that wrong proto request is not possible var zero ReqT req := zero.ProtoReflect().New().Interface() if err := proto.Unmarshal(buffer.Bytes(), req); err != nil { @@ -69,19 +73,22 @@ func streamHandler[ReqT proto.Message](ctx context.Context, stream network.Strea return } - response, err := reqHandler(req.(ReqT)) + responseIterator, err := reqHandler(req.(ReqT)) if err != nil { - log.Debugw("Error handling request peer %v protocol %v err %v\n", stream.ID(), stream.Protocol(), err) + // todo report error to client? + log.Debugw("Error handling request", "peer", stream.ID(), "protocol", stream.Protocol(), "err", err) return } - response(func(msg proto.Message) bool { + // todo add write timeout + responseIterator(func(msg proto.Message) bool { if ctx.Err() != nil { return false } if _, err := protodelim.MarshalTo(stream, msg); err != nil { // todo: figure out if we need buffered io here log.Debugw("Error writing response", "peer", stream.ID(), "protocol", stream.Protocol(), "err", err) + return false } return true @@ -112,75 +119,64 @@ func (h *Handler) CurrentBlockHeaderHandler(stream network.Stream) { streamHandler[*spec.CurrentBlockHeaderRequest](h.ctx, stream, h.onCurrentBlockHeaderRequest, h.log) } -type yieldFunc = func(proto.Message) bool - -func (h *Handler) onCurrentBlockHeaderRequest(req *spec.CurrentBlockHeaderRequest) (iter.Seq[proto.Message], error) { +func (h *Handler) onCurrentBlockHeaderRequest(*spec.CurrentBlockHeaderRequest) (iter.Seq[proto.Message], error) { curHeight, err := h.bcReader.Height() if err != nil { return nil, err } - it, err := newIteratorByNumber(h.bcReader, curHeight, 1, 1, true) - if err != nil { - return nil, err - } - return h.blockHeaders(it, blockHeadersRequestFin()), nil + return h.onBlockHeadersRequest(&spec.BlockHeadersRequest{ + Iteration: &spec.Iteration{ + Start: &spec.Iteration_BlockNumber{ + BlockNumber: curHeight, + }, + Direction: spec.Iteration_Forward, + Limit: 1, + Step: 1, + }, + }) } func (h *Handler) onBlockHeadersRequest(req *spec.BlockHeadersRequest) (iter.Seq[proto.Message], error) { - it, err := h.newIterator(req.Iteration) - if err != nil { - return nil, err + finMsg := &spec.BlockHeadersResponse{ + Part: []*spec.BlockHeadersResponsePart{ + { + HeaderMessage: &spec.BlockHeadersResponsePart_Fin{}, + }, + }, } - return h.blockHeaders(it, blockHeadersRequestFin()), nil -} -func (h *Handler) blockHeaders(it *iterator, fin Stream[proto.Message]) iter.Seq[proto.Message] { - return func(yield func(proto.Message) bool) { - for it.Valid() { - header, err := it.Header() - if err != nil { - h.log.Debugw("Failed to fetch header", "blockNumber", it.BlockNumber(), "err", err) - break - } + return h.processIterationRequest(req.Iteration, finMsg, func(it blockDataAccessor) (proto.Message, error) { + header, err := it.Header() + if err != nil { + return nil, err + } - h.log.Debugw("Created Header Iterator", "blockNumber", header.Number) + h.log.Debugw("Created Header Iterator", "blockNumber", header.Number) - commitments, err := h.bcReader.BlockCommitmentsByNumber(header.Number) - if err != nil { - h.log.Debugw("Failed to fetch block commitments", "blockNumber", it.BlockNumber(), "err", err) - break - } + commitments, err := h.bcReader.BlockCommitmentsByNumber(header.Number) + if err != nil { + return nil, err + } - msg := &spec.BlockHeadersResponse{ - Part: []*spec.BlockHeadersResponsePart{ - { - HeaderMessage: &spec.BlockHeadersResponsePart_Header{ - Header: core2p2p.AdaptHeader(header, commitments), - }, + return &spec.BlockHeadersResponse{ + Part: []*spec.BlockHeadersResponsePart{ + { + HeaderMessage: &spec.BlockHeadersResponsePart_Header{ + Header: core2p2p.AdaptHeader(header, commitments), }, - { - HeaderMessage: &spec.BlockHeadersResponsePart_Signatures{ - Signatures: &spec.Signatures{ - Block: core2p2p.AdaptBlockID(header), - Signatures: utils.Map(header.Signatures, core2p2p.AdaptSignature), - }, + }, + { + HeaderMessage: &spec.BlockHeadersResponsePart_Signatures{ + Signatures: &spec.Signatures{ + Block: core2p2p.AdaptBlockID(header), + Signatures: utils.Map(header.Signatures, core2p2p.AdaptSignature), }, }, }, - } - - if !yield(msg) { - return - } - - it.Next() - } - - if finMsg, ok := fin(); ok { - yield(finMsg) - } - } + }, + }, nil + }) } func (h *Handler) onBlockBodiesRequest(req *spec.BlockBodiesRequest) (iter.Seq[proto.Message], error) { @@ -189,10 +185,6 @@ func (h *Handler) onBlockBodiesRequest(req *spec.BlockBodiesRequest) (iter.Seq[p return nil, err } - fin := newFin(&spec.BlockBodiesResponse{ - BodyMessage: &spec.BlockBodiesResponse_Fin{}, - }) - return func(yield func(proto.Message) bool) { outerLoop: for it.Valid() { @@ -223,140 +215,136 @@ func (h *Handler) onBlockBodiesRequest(req *spec.BlockBodiesRequest) (iter.Seq[p it.Next() } - if finMs, ok := fin(); ok { - yield(finMs) + finMsg := &spec.BlockBodiesResponse{ + BodyMessage: &spec.BlockBodiesResponse_Fin{}, } + yield(finMsg) }, nil } func (h *Handler) onEventsRequest(req *spec.EventsRequest) (iter.Seq[proto.Message], error) { - it, err := h.newIterator(req.Iteration) - if err != nil { - return nil, err - } - - fin := newFin(&spec.EventsResponse{ + finMsg := &spec.EventsResponse{ Responses: &spec.EventsResponse_Fin{}, - }) - return func(yield yieldFunc) { - for it.Valid() { - block, err := it.Block() - if err != nil { - h.log.Debugw("Failed to fetch block for Events", "blockNumber", it.BlockNumber(), "err", err) - break - } + } + return h.processIterationRequest(req.Iteration, finMsg, func(it blockDataAccessor) (proto.Message, error) { + block, err := it.Block() + if err != nil { + return nil, err + } - events := make([]*spec.Event, 0, len(block.Receipts)) - for _, receipt := range block.Receipts { - for _, event := range receipt.Events { - events = append(events, core2p2p.AdaptEvent(event, receipt.TransactionHash)) - } + events := make([]*spec.Event, 0, len(block.Receipts)) + for _, receipt := range block.Receipts { + for _, event := range receipt.Events { + events = append(events, core2p2p.AdaptEvent(event, receipt.TransactionHash)) } + } - msg := &spec.EventsResponse{ - Id: core2p2p.AdaptBlockID(block.Header), - Responses: &spec.EventsResponse_Events{ - Events: &spec.Events{ - Items: events, - }, + return &spec.EventsResponse{ + Id: core2p2p.AdaptBlockID(block.Header), + Responses: &spec.EventsResponse_Events{ + Events: &spec.Events{ + Items: events, }, - } - - if !yield(msg) { - return - } + }, + }, nil + }) +} - it.Next() +func (h *Handler) onReceiptsRequest(req *spec.ReceiptsRequest) (iter.Seq[proto.Message], error) { + finMsg := &spec.ReceiptsResponse{Responses: &spec.ReceiptsResponse_Fin{}} + return h.processIterationRequest(req.Iteration, finMsg, func(it blockDataAccessor) (proto.Message, error) { + block, err := it.Block() + if err != nil { + return nil, err } - if finMsg, ok := fin(); ok { - yield(finMsg) + receipts := make([]*spec.Receipt, len(block.Receipts)) + for i := 0; i < len(block.Receipts); i++ { + receipts[i] = core2p2p.AdaptReceipt(block.Receipts[i], block.Transactions[i]) } - }, nil + + return &spec.ReceiptsResponse{ + Id: core2p2p.AdaptBlockID(block.Header), + Responses: &spec.ReceiptsResponse_Receipts{ + Receipts: &spec.Receipts{Items: receipts}, + }, + }, nil + }) } -func (h *Handler) onReceiptsRequest(req *spec.ReceiptsRequest) (iter.Seq[proto.Message], error) { - it, err := h.newIterator(req.Iteration) - if err != nil { - return nil, err +func (h *Handler) onTransactionsRequest(req *spec.TransactionsRequest) (iter.Seq[proto.Message], error) { + finMsg := &spec.TransactionsResponse{ + Responses: &spec.TransactionsResponse_Fin{}, } - - fin := newFin(&spec.ReceiptsResponse{Responses: &spec.ReceiptsResponse_Fin{}}) - - return func(yield yieldFunc) { - for it.Valid() { - block, err := it.Block() - if err != nil { - h.log.Debugw("Failed to fetch block for Receipts", "blockNumber", it.BlockNumber(), "err", err) - break - } - - receipts := make([]*spec.Receipt, len(block.Receipts)) - for i := 0; i < len(block.Receipts); i++ { - receipts[i] = core2p2p.AdaptReceipt(block.Receipts[i], block.Transactions[i]) - } - - rs := &spec.Receipts{Items: receipts} - msg := &spec.ReceiptsResponse{ - Id: core2p2p.AdaptBlockID(block.Header), - Responses: &spec.ReceiptsResponse_Receipts{Receipts: rs}, - } - - if !yield(msg) { - return - } - - it.Next() + return h.processIterationRequest(req.Iteration, finMsg, func(it blockDataAccessor) (proto.Message, error) { + block, err := it.Block() + if err != nil { + return nil, err } - if finMsg, ok := fin(); ok { - yield(finMsg) - } - }, nil + return &spec.TransactionsResponse{ + Id: core2p2p.AdaptBlockID(block.Header), + Responses: &spec.TransactionsResponse_Transactions{ + Transactions: &spec.Transactions{ + Items: utils.Map(block.Transactions, core2p2p.AdaptTransaction), + }, + }, + }, nil + }) } -func (h *Handler) onTransactionsRequest(req *spec.TransactionsRequest) (iter.Seq[proto.Message], error) { - it, err := h.newIterator(req.Iteration) +// blockDataAccessor provides access to either entire block or header +// for current iteration +type blockDataAccessor interface { + Block() (*core.Block, error) + Header() (*core.Header, error) +} + +// iterationProcessor is an alias for a function that will generate corresponding data +// given block data for current iteration through blockDataAccessor +type iterationProcessor = func(it blockDataAccessor) (proto.Message, error) + +// processIterationRequest is helper function that simplifies data processing for provided spec.Iteration object +// caller usually passes iteration object from received request, finMsg as final message to a peer +// and iterationProcessor function that will generate response for each iteration +func (h *Handler) processIterationRequest(iteration *spec.Iteration, finMsg proto.Message, + getMsg iterationProcessor, +) (iter.Seq[proto.Message], error) { + it, err := h.newIterator(iteration) if err != nil { return nil, err } - fin := newFin(&spec.TransactionsResponse{ - Responses: &spec.TransactionsResponse_Fin{}, - }) - + type yieldFunc = func(proto.Message) bool return func(yield yieldFunc) { + // while iterator is valid for it.Valid() { - block, err := it.Block() + // pass it to handler function (some might be interested in header, others in entire block) + msg, err := getMsg(it) if err != nil { - h.log.Debugw("Failed to fetch block for Transactions", "blockNumber", it.BlockNumber(), "err", err) + h.log.Errorw("Failed to generate data", "blockNumber", it.BlockNumber(), "err", err) break } - msg := &spec.TransactionsResponse{ - Id: core2p2p.AdaptBlockID(block.Header), - Responses: &spec.TransactionsResponse_Transactions{ - Transactions: &spec.Transactions{ - Items: utils.Map(block.Transactions, core2p2p.AdaptTransaction), - }, - }, - } + // push generated msg to caller if !yield(msg) { + // if caller is not interested in remaining data (example: connection to a peer is closed) exit + // note that in this case we won't send finMsg return } it.Next() } - if finMsg, ok := fin(); ok { - yield(finMsg) - } + // either we iterated over whole sequence or reached break statement in loop above + // note that return value of yield is not checked because this is the last message anyway + yield(finMsg) }, nil } func (h *Handler) newIterator(it *spec.Iteration) (*iterator, error) { forward := it.Direction == spec.Iteration_Forward - + // todo restrict limit max value ? switch v := it.Start.(type) { case *spec.Iteration_BlockNumber: return newIteratorByNumber(h.bcReader, v.BlockNumber, it.Limit, it.Step, forward) @@ -366,27 +354,3 @@ func (h *Handler) newIterator(it *spec.Iteration) (*iterator, error) { return nil, fmt.Errorf("unsupported iteration start type %T", v) } } - -func newFin(finMsg proto.Message) Stream[proto.Message] { - var finSent bool - - return func() (proto.Message, bool) { - if finSent { - return nil, false - } - finSent = true - - return finMsg, true - } -} - -// todo change this logic later on -func blockHeadersRequestFin() Stream[proto.Message] { - return newFin(&spec.BlockHeadersResponse{ - Part: []*spec.BlockHeadersResponsePart{ - { - HeaderMessage: &spec.BlockHeadersResponsePart_Fin{}, - }, - }, - }) -}