From a8619a70d29813bfd3e0cbf03a678c1c21d46ea0 Mon Sep 17 00:00:00 2001 From: Jonathan Fung <121899091+jonfung-dydx@users.noreply.github.com> Date: Tue, 18 Jun 2024 02:29:34 -0400 Subject: [PATCH] GRPC streaming move lock above send orderbook updates (#1707) --- protocol/streaming/grpc/grpc_streaming_manager.go | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/protocol/streaming/grpc/grpc_streaming_manager.go b/protocol/streaming/grpc/grpc_streaming_manager.go index b47ed6de6b..420efbc759 100644 --- a/protocol/streaming/grpc/grpc_streaming_manager.go +++ b/protocol/streaming/grpc/grpc_streaming_manager.go @@ -122,6 +122,9 @@ func (sm *GrpcStreamingManagerImpl) SendOrderbookUpdates( updatesByClobPairId[clobPairId] = v1updates } + sm.Lock() + defer sm.Unlock() + updatesBySubscriptionId := make(map[uint32][]clobtypes.StreamUpdate) for id, subscription := range sm.orderbookSubscriptions { // Consolidate orderbook updates into a single `StreamUpdate`. @@ -185,6 +188,9 @@ func (sm *GrpcStreamingManagerImpl) SendOrderbookFillUpdates( updatesByClobPairId[clobPairId] = append(updatesByClobPairId[clobPairId], streamUpdate) } + sm.Lock() + defer sm.Unlock() + updatesBySubscriptionId := make(map[uint32][]clobtypes.StreamUpdate) for id, subscription := range sm.orderbookSubscriptions { streamUpdatesForSubscription := make([]clobtypes.StreamUpdate, 0) @@ -215,9 +221,6 @@ func (sm *GrpcStreamingManagerImpl) sendStreamUpdate( 1, ) - sm.Lock() - defer sm.Unlock() - // Send updates to subscribers. idsToRemove := make([]uint32, 0) for id, subscription := range sm.orderbookSubscriptions {