diff --git a/protocol/streaming/grpc/grpc_streaming_manager.go b/protocol/streaming/grpc/grpc_streaming_manager.go index d60d5fb2f8..b47ed6de6b 100644 --- a/protocol/streaming/grpc/grpc_streaming_manager.go +++ b/protocol/streaming/grpc/grpc_streaming_manager.go @@ -113,26 +113,41 @@ func (sm *GrpcStreamingManagerImpl) SendOrderbookUpdates( } // Unmarshal each per-clob pair message to v1 updates. - updatesByClobPairId := make(map[uint32][]clobtypes.StreamUpdate) + updatesByClobPairId := make(map[uint32][]ocutypes.OffChainUpdateV1) for clobPairId, update := range updates { v1updates, err := GetOffchainUpdatesV1(update) if err != nil { panic(err) } - updatesByClobPairId[clobPairId] = []clobtypes.StreamUpdate{ - { - UpdateMessage: &clobtypes.StreamUpdate_OrderbookUpdate{ - OrderbookUpdate: &clobtypes.StreamOrderbookUpdate{ - Updates: v1updates, - Snapshot: snapshot, + updatesByClobPairId[clobPairId] = v1updates + } + + updatesBySubscriptionId := make(map[uint32][]clobtypes.StreamUpdate) + for id, subscription := range sm.orderbookSubscriptions { + // Consolidate orderbook updates into a single `StreamUpdate`. + v1updates := make([]ocutypes.OffChainUpdateV1, 0) + for _, clobPairId := range subscription.clobPairIds { + if update, ok := updatesByClobPairId[clobPairId]; ok { + v1updates = append(v1updates, update...) + } + } + + if len(v1updates) > 0 { + updatesBySubscriptionId[id] = []clobtypes.StreamUpdate{ + { + UpdateMessage: &clobtypes.StreamUpdate_OrderbookUpdate{ + OrderbookUpdate: &clobtypes.StreamOrderbookUpdate{ + Updates: v1updates, + Snapshot: snapshot, + }, }, }, - }, + } } } sm.sendStreamUpdate( - updatesByClobPairId, + updatesBySubscriptionId, blockHeight, execMode, ) @@ -170,8 +185,20 @@ func (sm *GrpcStreamingManagerImpl) SendOrderbookFillUpdates( updatesByClobPairId[clobPairId] = append(updatesByClobPairId[clobPairId], streamUpdate) } + updatesBySubscriptionId := make(map[uint32][]clobtypes.StreamUpdate) + for id, subscription := range sm.orderbookSubscriptions { + streamUpdatesForSubscription := make([]clobtypes.StreamUpdate, 0) + for _, clobPairId := range subscription.clobPairIds { + if update, ok := updatesByClobPairId[clobPairId]; ok { + streamUpdatesForSubscription = append(streamUpdatesForSubscription, update...) + } + } + + updatesBySubscriptionId[id] = streamUpdatesForSubscription + } + sm.sendStreamUpdate( - updatesByClobPairId, + updatesBySubscriptionId, blockHeight, execMode, ) @@ -179,7 +206,7 @@ func (sm *GrpcStreamingManagerImpl) SendOrderbookFillUpdates( // sendStreamUpdate takes in a map of clob pair id to stream updates and emits them to subscribers. func (sm *GrpcStreamingManagerImpl) sendStreamUpdate( - updatesByClobPairId map[uint32][]clobtypes.StreamUpdate, + updatesBySubscriptionId map[uint32][]clobtypes.StreamUpdate, blockHeight uint32, execMode sdk.ExecMode, ) { @@ -194,26 +221,21 @@ func (sm *GrpcStreamingManagerImpl) sendStreamUpdate( // Send updates to subscribers. idsToRemove := make([]uint32, 0) for id, subscription := range sm.orderbookSubscriptions { - streamUpdatesForSubscription := make([]clobtypes.StreamUpdate, 0) - for _, clobPairId := range subscription.clobPairIds { - if update, ok := updatesByClobPairId[clobPairId]; ok { - streamUpdatesForSubscription = append(streamUpdatesForSubscription, update...) - } - } - - if len(streamUpdatesForSubscription) > 0 { - metrics.IncrCounter( - metrics.GrpcSendResponseToSubscriberCount, - 1, - ) - if err := subscription.srv.Send( - &clobtypes.StreamOrderbookUpdatesResponse{ - Updates: streamUpdatesForSubscription, - BlockHeight: blockHeight, - ExecMode: uint32(execMode), - }, - ); err != nil { - idsToRemove = append(idsToRemove, id) + if streamUpdatesForSubscription, ok := updatesBySubscriptionId[id]; ok { + if len(streamUpdatesForSubscription) > 0 { + metrics.IncrCounter( + metrics.GrpcSendResponseToSubscriberCount, + 1, + ) + if err := subscription.srv.Send( + &clobtypes.StreamOrderbookUpdatesResponse{ + Updates: streamUpdatesForSubscription, + BlockHeight: blockHeight, + ExecMode: uint32(execMode), + }, + ); err != nil { + idsToRemove = append(idsToRemove, id) + } } } }