diff --git a/protocol/streaming/full_node_streaming_manager.go b/protocol/streaming/full_node_streaming_manager.go index 3b3cab68ad..34b7eaaf36 100644 --- a/protocol/streaming/full_node_streaming_manager.go +++ b/protocol/streaming/full_node_streaming_manager.go @@ -490,11 +490,19 @@ func (sm *FullNodeStreamingManagerImpl) TracksSubaccountId(subaccountId satypes. return exists } -func getStreamUpdatesFromOffchainUpdates( +// SendOrderbookUpdates groups updates by their clob pair ids and +// sends messages to the subscribers. +func (sm *FullNodeStreamingManagerImpl) SendOrderbookUpdates( offchainUpdates *clobtypes.OffchainUpdates, blockHeight uint32, execMode sdk.ExecMode, -) (streamUpdates []clobtypes.StreamUpdate, clobPairIds []uint32) { +) { + defer metrics.ModuleMeasureSince( + metrics.FullNodeGrpc, + metrics.GrpcSendOrderbookUpdatesLatency, + time.Now(), + ) + // Group updates by clob pair id. updates := make(map[uint32]*clobtypes.OffchainUpdates) for _, message := range offchainUpdates.Messages { @@ -506,8 +514,8 @@ func getStreamUpdatesFromOffchainUpdates( } // Unmarshal each per-clob pair message to v1 updates. - streamUpdates = make([]clobtypes.StreamUpdate, 0) - clobPairIds = make([]uint32, 0) + streamUpdates := make([]clobtypes.StreamUpdate, 0) + clobPairIds := make([]uint32, 0) for clobPairId, update := range updates { v1updates, err := streaming_util.GetOffchainUpdatesV1(update) if err != nil { @@ -527,39 +535,26 @@ func getStreamUpdatesFromOffchainUpdates( clobPairIds = append(clobPairIds, clobPairId) } - return streamUpdates, clobPairIds + sm.AddOrderUpdatesToCache(streamUpdates, clobPairIds) } -// SendOrderbookUpdates groups updates by their clob pair ids and +// SendOrderbookFillUpdates groups fills by their clob pair ids and // sends messages to the subscribers. -func (sm *FullNodeStreamingManagerImpl) SendOrderbookUpdates( - offchainUpdates *clobtypes.OffchainUpdates, +func (sm *FullNodeStreamingManagerImpl) SendOrderbookFillUpdates( + orderbookFills []clobtypes.StreamOrderbookFill, blockHeight uint32, execMode sdk.ExecMode, + perpetualIdToClobPairId map[uint32][]clobtypes.ClobPairId, ) { defer metrics.ModuleMeasureSince( metrics.FullNodeGrpc, - metrics.GrpcSendOrderbookUpdatesLatency, + metrics.GrpcSendOrderbookFillsLatency, time.Now(), ) - streamUpdates, clobPairIds := getStreamUpdatesFromOffchainUpdates(offchainUpdates, blockHeight, execMode) - - sm.AddOrderUpdatesToCache(streamUpdates, clobPairIds) -} - -func (sm *FullNodeStreamingManagerImpl) getStreamUpdatesForOrderbookFills( - orderbookFills []clobtypes.StreamOrderbookFill, - blockHeight uint32, - execMode sdk.ExecMode, - perpetualIdToClobPairId map[uint32][]clobtypes.ClobPairId, -) ( - streamUpdates []clobtypes.StreamUpdate, - clobPairIds []uint32, -) { // Group fills by clob pair id. - streamUpdates = make([]clobtypes.StreamUpdate, 0) - clobPairIds = make([]uint32, 0) + streamUpdates := make([]clobtypes.StreamUpdate, 0) + clobPairIds := make([]uint32, 0) for _, orderbookFill := range orderbookFills { // If this is a deleveraging fill, fetch the clob pair id from the deleveraged // perpetual id. @@ -582,29 +577,6 @@ func (sm *FullNodeStreamingManagerImpl) getStreamUpdatesForOrderbookFills( streamUpdates = append(streamUpdates, streamUpdate) clobPairIds = append(clobPairIds, clobPairId) } - return streamUpdates, clobPairIds -} - -// SendOrderbookFillUpdates groups fills by their clob pair ids and -// sends messages to the subscribers. -func (sm *FullNodeStreamingManagerImpl) SendOrderbookFillUpdates( - orderbookFills []clobtypes.StreamOrderbookFill, - blockHeight uint32, - execMode sdk.ExecMode, - perpetualIdToClobPairId map[uint32][]clobtypes.ClobPairId, -) { - defer metrics.ModuleMeasureSince( - metrics.FullNodeGrpc, - metrics.GrpcSendOrderbookFillsLatency, - time.Now(), - ) - - streamUpdates, clobPairIds := sm.getStreamUpdatesForOrderbookFills( - orderbookFills, - blockHeight, - execMode, - perpetualIdToClobPairId, - ) sm.AddOrderUpdatesToCache(streamUpdates, clobPairIds) } @@ -637,31 +609,6 @@ func (sm *FullNodeStreamingManagerImpl) SendTakerOrderStatus( ) } -func getStreamUpdatesForSubaccountUpdates( - subaccountUpdates []satypes.StreamSubaccountUpdate, - blockHeight uint32, - execMode sdk.ExecMode, -) ( - streamUpdates []clobtypes.StreamUpdate, - subaccountIds []*satypes.SubaccountId, -) { - // Group subaccount updates by subaccount id. - streamUpdates = make([]clobtypes.StreamUpdate, 0) - subaccountIds = make([]*satypes.SubaccountId, 0) - for _, subaccountUpdate := range subaccountUpdates { - streamUpdate := clobtypes.StreamUpdate{ - UpdateMessage: &clobtypes.StreamUpdate_SubaccountUpdate{ - SubaccountUpdate: &subaccountUpdate, - }, - BlockHeight: blockHeight, - ExecMode: uint32(execMode), - } - streamUpdates = append(streamUpdates, streamUpdate) - subaccountIds = append(subaccountIds, subaccountUpdate.SubaccountId) - } - return streamUpdates, subaccountIds -} - // SendFinalizedSubaccountUpdates groups subaccount updates by their subaccount ids and // sends messages to the subscribers. func (sm *FullNodeStreamingManagerImpl) SendFinalizedSubaccountUpdates( @@ -679,11 +626,20 @@ func (sm *FullNodeStreamingManagerImpl) SendFinalizedSubaccountUpdates( panic("SendFinalizedSubaccountUpdates should only be called in ExecModeFinalize") } - streamUpdates, subaccountIds := getStreamUpdatesForSubaccountUpdates( - subaccountUpdates, - blockHeight, - execMode, - ) + // Group subaccount updates by subaccount id. + streamUpdates := make([]clobtypes.StreamUpdate, 0) + subaccountIds := make([]*satypes.SubaccountId, 0) + for _, subaccountUpdate := range subaccountUpdates { + streamUpdate := clobtypes.StreamUpdate{ + UpdateMessage: &clobtypes.StreamUpdate_SubaccountUpdate{ + SubaccountUpdate: &subaccountUpdate, + }, + BlockHeight: blockHeight, + ExecMode: uint32(execMode), + } + streamUpdates = append(streamUpdates, streamUpdate) + subaccountIds = append(subaccountIds, subaccountUpdate.SubaccountId) + } sm.AddSubaccountUpdatesToCache(streamUpdates, subaccountIds) } @@ -840,47 +796,6 @@ func (sm *FullNodeStreamingManagerImpl) GetSubaccountSnapshotsForInitStreams( return ret } -// addBatchUpdatesToCacheWithLock adds batched updates to the cache. -// Used by `StreamBatchUpdatesAfterFinalizeBlock` to batch orderbook, fill -// and subaccount updates in a single stream. -// Note this method requires the lock and assumes that the lock has already been -// acquired by the caller. -func (sm *FullNodeStreamingManagerImpl) addBatchUpdatesToCacheWithLock( - orderbookStreamUpdates []clobtypes.StreamUpdate, - orderbookClobPairIds []uint32, - fillStreamUpdates []clobtypes.StreamUpdate, - fillClobPairIds []uint32, - subaccountStreamUpdates []clobtypes.StreamUpdate, - subaccountIds []*satypes.SubaccountId, -) { - // Add orderbook updates to cache. - sm.streamUpdateCache = append(sm.streamUpdateCache, orderbookStreamUpdates...) - for _, clobPairId := range orderbookClobPairIds { - sm.streamUpdateSubscriptionCache = append( - sm.streamUpdateSubscriptionCache, - sm.clobPairIdToSubscriptionIdMapping[clobPairId], - ) - } - - // Add fill updates to cache. - sm.streamUpdateCache = append(sm.streamUpdateCache, fillStreamUpdates...) - for _, clobPairId := range fillClobPairIds { - sm.streamUpdateSubscriptionCache = append( - sm.streamUpdateSubscriptionCache, - sm.clobPairIdToSubscriptionIdMapping[clobPairId], - ) - } - - // Add subaccount updates to cache. - sm.streamUpdateCache = append(sm.streamUpdateCache, subaccountStreamUpdates...) - for _, subaccountId := range subaccountIds { - sm.streamUpdateSubscriptionCache = append( - sm.streamUpdateSubscriptionCache, - sm.subaccountIdToSubscriptionIdMapping[*subaccountId], - ) - } -} - // Grpc Streaming logic after consensus agrees on a block. // - Stream all events staged during `FinalizeBlock`. // - Stream orderbook updates to sync fills in local ops queue. @@ -889,45 +804,33 @@ func (sm *FullNodeStreamingManagerImpl) StreamBatchUpdatesAfterFinalizeBlock( orderBookUpdatesToSyncLocalOpsQueue *clobtypes.OffchainUpdates, perpetualIdToClobPairId map[uint32][]clobtypes.ClobPairId, ) { + // Flush all pending updates, since we want the onchain updates to arrive in a batch. + sm.FlushStreamUpdates() + finalizedFills, finalizedSubaccountUpdates := sm.getStagedEventsFromFinalizeBlock(ctx) - orderbookStreamUpdates, orderbookClobPairIds := getStreamUpdatesFromOffchainUpdates( + // TODO(CT-1190): Stream below in a single batch. + // Send orderbook updates to sync optimistic orderbook onchain state after FinalizeBlock. + sm.SendOrderbookUpdates( orderBookUpdatesToSyncLocalOpsQueue, uint32(ctx.BlockHeight()), ctx.ExecMode(), ) - fillStreamUpdates, fillClobPairIds := sm.getStreamUpdatesForOrderbookFills( + // Send finalized fills from FinalizeBlock. + sm.SendOrderbookFillUpdates( finalizedFills, uint32(ctx.BlockHeight()), ctx.ExecMode(), perpetualIdToClobPairId, ) - subaccountStreamUpdates, subaccountIds := getStreamUpdatesForSubaccountUpdates( + // Send finalized subaccount updates from FinalizeBlock. + sm.SendFinalizedSubaccountUpdates( finalizedSubaccountUpdates, uint32(ctx.BlockHeight()), ctx.ExecMode(), ) - - sm.Lock() - defer sm.Unlock() - - // Flush all pending updates, since we want the onchain updates to arrive in a batch. - sm.FlushStreamUpdatesWithLock() - - sm.addBatchUpdatesToCacheWithLock( - orderbookStreamUpdates, - orderbookClobPairIds, - fillStreamUpdates, - fillClobPairIds, - subaccountStreamUpdates, - subaccountIds, - ) - - // Emit all stream updates in a single batch. - // Note we still have the lock, which is released right before function returns. - sm.FlushStreamUpdatesWithLock() } // getStagedEventsFromFinalizeBlock returns staged events from `FinalizeBlock`.