diff --git a/protocol/app/app.go b/protocol/app/app.go index 527db9b43f..8e5b0496b0 100644 --- a/protocol/app/app.go +++ b/protocol/app/app.go @@ -3,6 +3,7 @@ package app import ( "context" "encoding/json" + "fmt" "io" "math/big" "net/http" @@ -425,6 +426,9 @@ func New( if app.Server != nil { app.Server.Stop() } + if app.GrpcStreamingManager != nil { + app.GrpcStreamingManager.Stop() + } return nil }, ) @@ -1510,6 +1514,7 @@ func (app *App) EndBlocker(ctx sdk.Context) (sdk.EndBlock, error) { } block := app.IndexerEventManager.ProduceBlock(ctx) app.IndexerEventManager.SendOnchainData(block) + app.GrpcStreamingManager.EmitMetrics() return response, err } @@ -1527,6 +1532,7 @@ func (app *App) PrepareCheckStater(ctx sdk.Context) { if err := app.ModuleManager.PrepareCheckState(ctx); err != nil { panic(err) } + app.GrpcStreamingManager.EmitMetrics() } // InitChainer application update at chain initialization. @@ -1765,8 +1771,9 @@ func getGrpcStreamingManagerFromOptions( logger log.Logger, ) (manager streamingtypes.GrpcStreamingManager) { if appFlags.GrpcStreamingEnabled { - logger.Info("GRPC streaming is enabled") - return streaming.NewGrpcStreamingManager() + grpcStreamingBufferSize := uint32(appFlags.GrpcStreamingBufferSize) + logger.Info(fmt.Sprintf("GRPC streaming is enabled with buffer size %d", grpcStreamingBufferSize)) + return streaming.NewGrpcStreamingManager(logger, grpcStreamingBufferSize) } return streaming.NewNoopGrpcStreamingManager() } diff --git a/protocol/app/flags/flags.go b/protocol/app/flags/flags.go index a900e81b40..68d7a96762 100644 --- a/protocol/app/flags/flags.go +++ b/protocol/app/flags/flags.go @@ -21,7 +21,8 @@ type Flags struct { GrpcEnable bool // Grpc Streaming - GrpcStreamingEnabled bool + GrpcStreamingEnabled bool + GrpcStreamingBufferSize uint16 } // List of CLI flags. @@ -36,7 +37,8 @@ const ( GrpcEnable = "grpc.enable" // Grpc Streaming - GrpcStreamingEnabled = "grpc-streaming-enabled" + GrpcStreamingEnabled = "grpc-streaming-enabled" + GrpcStreamingBufferSize = "grpc-streaming-buffer-size" ) // Default values. @@ -46,7 +48,8 @@ const ( DefaultNonValidatingFullNode = false DefaultDdErrorTrackingFormat = false - DefaultGrpcStreamingEnabled = false + DefaultGrpcStreamingEnabled = false + DefaultGrpcStreamingBufferSize = 1000 ) // AddFlagsToCmd adds flags to app initialization. @@ -80,6 +83,11 @@ func AddFlagsToCmd(cmd *cobra.Command) { DefaultGrpcStreamingEnabled, "Whether to enable grpc streaming for full nodes", ) + cmd.Flags().Uint16( + GrpcStreamingBufferSize, + DefaultGrpcStreamingBufferSize, + "Protocol-side buffer channel size to store grpc stream updates before dropping messages", + ) } // Validate checks that the flags are valid. @@ -114,7 +122,8 @@ func GetFlagValuesFromOptions( GrpcAddress: config.DefaultGRPCAddress, GrpcEnable: true, - GrpcStreamingEnabled: DefaultGrpcStreamingEnabled, + GrpcStreamingEnabled: DefaultGrpcStreamingEnabled, + GrpcStreamingBufferSize: DefaultGrpcStreamingBufferSize, } // Populate the flags if they exist. @@ -160,5 +169,11 @@ func GetFlagValuesFromOptions( } } + if option := appOpts.Get(GrpcStreamingBufferSize); option != nil { + if v, err := cast.ToUint16E(option); err == nil { + result.GrpcStreamingBufferSize = v + } + } + return result } diff --git a/protocol/app/flags/flags_test.go b/protocol/app/flags/flags_test.go index dd6f12db85..b052d8a56b 100644 --- a/protocol/app/flags/flags_test.go +++ b/protocol/app/flags/flags_test.go @@ -32,6 +32,9 @@ func TestAddFlagsToCommand(t *testing.T) { fmt.Sprintf("Has %s flag", flags.GrpcStreamingEnabled): { flagName: flags.GrpcStreamingEnabled, }, + fmt.Sprintf("Has %s flag", flags.GrpcStreamingBufferSize): { + flagName: flags.GrpcStreamingBufferSize, + }, } for name, tc := range tests { @@ -63,9 +66,10 @@ func TestValidate(t *testing.T) { }, "success - gRPC streaming enabled for validating nodes": { flags: flags.Flags{ - NonValidatingFullNode: false, - GrpcEnable: true, - GrpcStreamingEnabled: true, + NonValidatingFullNode: false, + GrpcEnable: true, + GrpcStreamingEnabled: true, + GrpcStreamingBufferSize: 15, }, }, "failure - gRPC disabled": { @@ -107,6 +111,7 @@ func TestGetFlagValuesFromOptions(t *testing.T) { expectedGrpcAddress string expectedGrpcEnable bool expectedGrpcStreamingEnable bool + expectedGrpcStreamingBufferSize uint16 }{ "Sets to default if unset": { expectedNonValidatingFullNodeFlag: false, @@ -115,6 +120,7 @@ func TestGetFlagValuesFromOptions(t *testing.T) { expectedGrpcAddress: "localhost:9090", expectedGrpcEnable: true, expectedGrpcStreamingEnable: false, + expectedGrpcStreamingBufferSize: 1000, }, "Sets values from options": { optsMap: map[string]any{ @@ -124,6 +130,7 @@ func TestGetFlagValuesFromOptions(t *testing.T) { flags.GrpcEnable: false, flags.GrpcAddress: "localhost:9091", flags.GrpcStreamingEnabled: "true", + flags.GrpcStreamingBufferSize: "15", }, expectedNonValidatingFullNodeFlag: true, expectedDdAgentHost: "agentHostTest", @@ -131,6 +138,7 @@ func TestGetFlagValuesFromOptions(t *testing.T) { expectedGrpcEnable: false, expectedGrpcAddress: "localhost:9091", expectedGrpcStreamingEnable: true, + expectedGrpcStreamingBufferSize: 15, }, } @@ -168,6 +176,21 @@ func TestGetFlagValuesFromOptions(t *testing.T) { tc.expectedGrpcAddress, flags.GrpcAddress, ) + require.Equal( + t, + tc.expectedGrpcAddress, + flags.GrpcAddress, + ) + require.Equal( + t, + tc.expectedGrpcStreamingEnable, + flags.GrpcStreamingEnabled, + ) + require.Equal( + t, + tc.expectedGrpcStreamingBufferSize, + flags.GrpcStreamingBufferSize, + ) }) } } diff --git a/protocol/lib/metrics/metric_keys.go b/protocol/lib/metrics/metric_keys.go index 09607d76e0..51e03e52f9 100644 --- a/protocol/lib/metrics/metric_keys.go +++ b/protocol/lib/metrics/metric_keys.go @@ -60,6 +60,8 @@ const ( FullNodeGrpc = "full_node_grpc" GrpcSendOrderbookUpdatesLatency = "grpc_send_orderbook_updates_latency" GrpcSendOrderbookFillsLatency = "grpc_send_orderbook_fills_latency" + GrpcStreamingBufferSize = "grpc_streaming_buffer_size" + GrpcStreamingNumConnections = "grpc_streaming_num_connections" EndBlocker = "end_blocker" EndBlockerLag = "end_blocker_lag" ) diff --git a/protocol/streaming/grpc/grpc_streaming_manager.go b/protocol/streaming/grpc/grpc_streaming_manager.go index a8ce6ad1f2..fcee3d33f1 100644 --- a/protocol/streaming/grpc/grpc_streaming_manager.go +++ b/protocol/streaming/grpc/grpc_streaming_manager.go @@ -1,9 +1,11 @@ package grpc import ( + "fmt" "sync" "time" + "cosmossdk.io/log" sdk "github.com/cosmos/cosmos-sdk/types" "github.com/cosmos/gogoproto/proto" ocutypes "github.com/dydxprotocol/v4-chain/protocol/indexer/off_chain_updates/types" @@ -17,11 +19,26 @@ var _ types.GrpcStreamingManager = (*GrpcStreamingManagerImpl)(nil) // GrpcStreamingManagerImpl is an implementation for managing gRPC streaming subscriptions. type GrpcStreamingManagerImpl struct { + logger log.Logger sync.Mutex // orderbookSubscriptions maps subscription IDs to their respective orderbook subscriptions. orderbookSubscriptions map[uint32]*OrderbookSubscription nextSubscriptionId uint32 + + // Readonly buffer to enqueue orderbook updates before pushing them through grpc streams. + // Decouples the execution of abci logic with full node streaming. + updateBuffer chan bufferInternalResponse + updateBufferWindowSize uint32 +} + +// bufferInternalResponse is enqueued into the readonly buffer. +// It contains an update respnose and the clob pair id to send this information to. +type bufferInternalResponse struct { + response clobtypes.StreamOrderbookUpdatesResponse + + // Information relevant to which Orderbook Subscription to send out to + clobPairId uint32 } // OrderbookSubscription represents a active subscription to the orderbook updates stream. @@ -36,16 +53,78 @@ type OrderbookSubscription struct { srv clobtypes.Query_StreamOrderbookUpdatesServer } -func NewGrpcStreamingManager() *GrpcStreamingManagerImpl { - return &GrpcStreamingManagerImpl{ +func NewGrpcStreamingManager( + logger log.Logger, + bufferWindow uint32, +) *GrpcStreamingManagerImpl { + grpcStreamingManager := &GrpcStreamingManagerImpl{ + logger: logger.With("module", "grpc-streaming"), orderbookSubscriptions: make(map[uint32]*OrderbookSubscription), + nextSubscriptionId: 0, + + updateBuffer: make(chan bufferInternalResponse, bufferWindow), + updateBufferWindowSize: bufferWindow, } + + // Worker goroutine to consistently read from channel and send out updates + go func() { + for internalResponse := range grpcStreamingManager.updateBuffer { + grpcStreamingManager.sendUpdateResponse(internalResponse) + } + }() + + return grpcStreamingManager } func (sm *GrpcStreamingManagerImpl) Enabled() bool { return true } +func (sm *GrpcStreamingManagerImpl) Stop() { + close(sm.updateBuffer) +} + +func (sm *GrpcStreamingManagerImpl) EmitMetrics() { + metrics.SetGauge(metrics.GrpcStreamingBufferSize, float32(len(sm.updateBuffer))) + metrics.SetGauge(metrics.GrpcStreamingNumConnections, float32(len(sm.orderbookSubscriptions))) +} + +func (sm *GrpcStreamingManagerImpl) removeSubscription(id uint32) { + sm.logger.Info( + fmt.Sprintf( + "Removing orderbook subscriptions for subscription id %+v", + id, + ), + ) + delete(sm.orderbookSubscriptions, id) +} + +func (sm *GrpcStreamingManagerImpl) sendUpdateResponse( + internalResponse bufferInternalResponse, +) { + // Send update to subscribers. + subscriptionIdsToRemove := make([]uint32, 0) + + for id, subscription := range sm.orderbookSubscriptions { + for _, clobPairId := range subscription.clobPairIds { + if clobPairId == internalResponse.clobPairId { + if err := subscription.srv.Send( + &internalResponse.response, + ); err != nil { + sm.logger.Error("Error sending out update", "err", err) + subscriptionIdsToRemove = append(subscriptionIdsToRemove, id) + } + } + } + } + // Clean up subscriptions that have been closed. + // If a Send update has failed for any clob pair id, the whole subscription will be removed. + for _, id := range subscriptionIdsToRemove { + sm.removeSubscription(id) + } + sm.EmitMetrics() +} + // Subscribe subscribes to the orderbook updates stream. func (sm *GrpcStreamingManagerImpl) Subscribe( req clobtypes.StreamOrderbookUpdatesRequest, @@ -70,89 +149,13 @@ func (sm *GrpcStreamingManagerImpl) Subscribe( sm.orderbookSubscriptions[sm.nextSubscriptionId] = subscription sm.nextSubscriptionId++ - + sm.logger.Info(fmt.Sprintf("New GRPC Stream Connection established, %+v", clobPairIds)) return nil } -// SendOrderbookUpdates groups updates by their clob pair ids and -// sends messages to the subscribers. -func (sm *GrpcStreamingManagerImpl) SendOrderbookUpdates( - offchainUpdates *clobtypes.OffchainUpdates, - snapshot bool, - blockHeight uint32, - execMode sdk.ExecMode, -) { - defer metrics.ModuleMeasureSince( - metrics.FullNodeGrpc, - metrics.GrpcSendOrderbookUpdatesLatency, - time.Now(), - ) - - // Group updates by clob pair id. - updates := make(map[uint32]*clobtypes.OffchainUpdates) - for _, message := range offchainUpdates.Messages { - clobPairId := message.OrderId.ClobPairId - if _, ok := updates[clobPairId]; !ok { - updates[clobPairId] = clobtypes.NewOffchainUpdates() - } - updates[clobPairId].Messages = append(updates[clobPairId].Messages, message) - } - - // Unmarshal messages to v1 updates. - v1updates := make(map[uint32][]ocutypes.OffChainUpdateV1) - for clobPairId, update := range updates { - v1update, err := GetOffchainUpdatesV1(update) - if err != nil { - panic(err) - } - v1updates[clobPairId] = v1update - } - - sm.Lock() - defer sm.Unlock() - - // Send updates to subscribers. - idsToRemove := make([]uint32, 0) - for id, subscription := range sm.orderbookSubscriptions { - updatesToSend := make([]ocutypes.OffChainUpdateV1, 0) - for _, clobPairId := range subscription.clobPairIds { - if updates, ok := v1updates[clobPairId]; ok { - updatesToSend = append(updatesToSend, updates...) - } - } - - if len(updatesToSend) > 0 { - streamUpdates := clobtypes.StreamUpdate{ - UpdateMessage: &clobtypes.StreamUpdate_OrderbookUpdate{ - OrderbookUpdate: &clobtypes.StreamOrderbookUpdate{ - Updates: updatesToSend, - Snapshot: snapshot, - }, - }, - } - if err := subscription.srv.Send( - &clobtypes.StreamOrderbookUpdatesResponse{ - Updates: []clobtypes.StreamUpdate{streamUpdates}, - BlockHeight: blockHeight, - ExecMode: uint32(execMode), - }, - ); err != nil { - idsToRemove = append(idsToRemove, id) - } - } - } - - // Clean up subscriptions that have been closed. - // If a Send update has failed for any clob pair id, the whole subscription will be removed. - for _, id := range idsToRemove { - delete(sm.orderbookSubscriptions, id) - } -} - // SendOrderbookFillUpdates groups fills by their clob pair ids and -// sends messages to the subscribers. +// enqueues messages to be sent to the subscribers. func (sm *GrpcStreamingManagerImpl) SendOrderbookFillUpdates( - ctx sdk.Context, orderbookFills []clobtypes.StreamOrderbookFill, blockHeight uint32, execMode sdk.ExecMode, @@ -162,6 +165,8 @@ func (sm *GrpcStreamingManagerImpl) SendOrderbookFillUpdates( metrics.GrpcSendOrderbookFillsLatency, time.Now(), ) + sm.Lock() + defer sm.Unlock() // Group fills by clob pair id. updatesByClobPairId := make(map[uint32][]clobtypes.StreamUpdate) @@ -181,37 +186,89 @@ func (sm *GrpcStreamingManagerImpl) SendOrderbookFillUpdates( updatesByClobPairId[clobPairId] = append(updatesByClobPairId[clobPairId], streamUpdate) } + // Send response updates into the stream buffer + for clobPairId, streamUpdates := range updatesByClobPairId { + streamResponse := clobtypes.StreamOrderbookUpdatesResponse{ + Updates: streamUpdates, + BlockHeight: blockHeight, + ExecMode: uint32(execMode), + } + + sm.mustEnqueueOrderbookUpdate(bufferInternalResponse{ + response: streamResponse, + clobPairId: clobPairId, + }) + } +} + +// SendOrderbookUpdates groups updates by their clob pair ids and +// enqueues messages to be sent to the subscribers. +func (sm *GrpcStreamingManagerImpl) SendOrderbookUpdates( + offchainUpdates *clobtypes.OffchainUpdates, + snapshot bool, + blockHeight uint32, + execMode sdk.ExecMode, +) { + defer metrics.ModuleMeasureSince( + metrics.FullNodeGrpc, + metrics.GrpcSendOrderbookUpdatesLatency, + time.Now(), + ) sm.Lock() defer sm.Unlock() - // Send updates to subscribers. - idsToRemove := make([]uint32, 0) - for id, subscription := range sm.orderbookSubscriptions { - streamUpdatesForSubscription := make([]clobtypes.StreamUpdate, 0) - for _, clobPairId := range subscription.clobPairIds { - if update, ok := updatesByClobPairId[clobPairId]; ok { - streamUpdatesForSubscription = append(streamUpdatesForSubscription, update...) - } + // Group updates by clob pair id. + updatesByClobPairId := make(map[uint32]*clobtypes.OffchainUpdates) + for _, message := range offchainUpdates.Messages { + clobPairId := message.OrderId.ClobPairId + if _, ok := updatesByClobPairId[clobPairId]; !ok { + updatesByClobPairId[clobPairId] = clobtypes.NewOffchainUpdates() } + updatesByClobPairId[clobPairId].Messages = append(updatesByClobPairId[clobPairId].Messages, message) + } - if len(streamUpdatesForSubscription) > 0 { - if err := subscription.srv.Send( - &clobtypes.StreamOrderbookUpdatesResponse{ - Updates: streamUpdatesForSubscription, - BlockHeight: blockHeight, - ExecMode: uint32(execMode), + // Unmarshal messages to v1 updates and enqueue in buffer to be sent. + for clobPairId, update := range updatesByClobPairId { + v1updates, err := GetOffchainUpdatesV1(update) + if err != nil { + panic(err) + } + streamUpdate := clobtypes.StreamUpdate{ + UpdateMessage: &clobtypes.StreamUpdate_OrderbookUpdate{ + OrderbookUpdate: &clobtypes.StreamOrderbookUpdate{ + Updates: v1updates, + Snapshot: snapshot, }, - ); err != nil { - idsToRemove = append(idsToRemove, id) - } + }, } + sm.mustEnqueueOrderbookUpdate(bufferInternalResponse{ + response: clobtypes.StreamOrderbookUpdatesResponse{ + Updates: []clobtypes.StreamUpdate{streamUpdate}, + BlockHeight: blockHeight, + ExecMode: uint32(execMode), + }, + clobPairId: clobPairId, + }) } +} - // Clean up subscriptions that have been closed. - // If a Send update has failed for any clob pair id, the whole subscription will be removed. - for _, id := range idsToRemove { - delete(sm.orderbookSubscriptions, id) +// mustEnqueueOrderbookUpdate tries to enqueue an orderbook update to the buffer via non-blocking send. +// If the buffer is full, *all* streaming subscriptions will be shut down. +func (sm *GrpcStreamingManagerImpl) mustEnqueueOrderbookUpdate(internalResponse bufferInternalResponse) { + select { + case sm.updateBuffer <- internalResponse: + default: + sm.logger.Error("GRPC Streaming buffer full capacity. Dropping messages and all subscriptions. " + + "Disconnect all clients and increase buffer size via the `grpc-streaming-buffer-size flag.") + for k := range sm.orderbookSubscriptions { + sm.removeSubscription(k) + } + // Clear out the buffer + for len(sm.updateBuffer) > 0 { + <-sm.updateBuffer + } } + sm.EmitMetrics() } // GetUninitializedClobPairIds returns the clob pair ids that have not been initialized. diff --git a/protocol/streaming/grpc/noop_streaming_manager.go b/protocol/streaming/grpc/noop_streaming_manager.go index 424871b4c3..3a79eab23e 100644 --- a/protocol/streaming/grpc/noop_streaming_manager.go +++ b/protocol/streaming/grpc/noop_streaming_manager.go @@ -36,7 +36,6 @@ func (sm *NoopGrpcStreamingManager) SendOrderbookUpdates( } func (sm *NoopGrpcStreamingManager) SendOrderbookFillUpdates( - ctx sdk.Context, orderbookFills []clobtypes.StreamOrderbookFill, blockHeight uint32, execMode sdk.ExecMode, @@ -46,3 +45,9 @@ func (sm *NoopGrpcStreamingManager) SendOrderbookFillUpdates( func (sm *NoopGrpcStreamingManager) GetUninitializedClobPairIds() []uint32 { return []uint32{} } + +func (sm *NoopGrpcStreamingManager) Stop() { +} + +func (sm *NoopGrpcStreamingManager) EmitMetrics() { +} diff --git a/protocol/streaming/grpc/types/manager.go b/protocol/streaming/grpc/types/manager.go index 9b5af0c093..35a1ec7307 100644 --- a/protocol/streaming/grpc/types/manager.go +++ b/protocol/streaming/grpc/types/manager.go @@ -22,8 +22,9 @@ type GrpcStreamingManager interface { blockHeight uint32, execMode sdk.ExecMode, ) + Stop() + EmitMetrics() SendOrderbookFillUpdates( - ctx sdk.Context, orderbookFills []clobtypes.StreamOrderbookFill, blockHeight uint32, execMode sdk.ExecMode, diff --git a/protocol/x/clob/keeper/keeper.go b/protocol/x/clob/keeper/keeper.go index b15b5d20d9..cecf3e39f4 100644 --- a/protocol/x/clob/keeper/keeper.go +++ b/protocol/x/clob/keeper/keeper.go @@ -266,7 +266,6 @@ func (k Keeper) SendOrderbookFillUpdates( return } k.GetGrpcStreamingManager().SendOrderbookFillUpdates( - ctx, orderbookFills, lib.MustConvertIntegerToUint32(ctx.BlockHeight()), ctx.ExecMode(),