diff --git a/protocol/streaming/grpc/grpc_streaming_manager.go b/protocol/streaming/grpc/grpc_streaming_manager.go index b47ed6de6b..420efbc759 100644 --- a/protocol/streaming/grpc/grpc_streaming_manager.go +++ b/protocol/streaming/grpc/grpc_streaming_manager.go @@ -122,6 +122,9 @@ func (sm *GrpcStreamingManagerImpl) SendOrderbookUpdates( updatesByClobPairId[clobPairId] = v1updates } + sm.Lock() + defer sm.Unlock() + updatesBySubscriptionId := make(map[uint32][]clobtypes.StreamUpdate) for id, subscription := range sm.orderbookSubscriptions { // Consolidate orderbook updates into a single `StreamUpdate`. @@ -185,6 +188,9 @@ func (sm *GrpcStreamingManagerImpl) SendOrderbookFillUpdates( updatesByClobPairId[clobPairId] = append(updatesByClobPairId[clobPairId], streamUpdate) } + sm.Lock() + defer sm.Unlock() + updatesBySubscriptionId := make(map[uint32][]clobtypes.StreamUpdate) for id, subscription := range sm.orderbookSubscriptions { streamUpdatesForSubscription := make([]clobtypes.StreamUpdate, 0) @@ -215,9 +221,6 @@ func (sm *GrpcStreamingManagerImpl) sendStreamUpdate( 1, ) - sm.Lock() - defer sm.Unlock() - // Send updates to subscribers. idsToRemove := make([]uint32, 0) for id, subscription := range sm.orderbookSubscriptions {