From 8950ba90edaa1eb104508e26a58a8307d8b2c6eb Mon Sep 17 00:00:00 2001 From: Jonathan Fung Date: Thu, 25 Jul 2024 15:29:51 -0400 Subject: [PATCH] sync client --- protocol/app/app.go | 1 + protocol/mocks/MemClob.go | 10 ++- protocol/streaming/grpc/client/client.go | 78 +++++++++---------- .../streaming/grpc/grpc_streaming_manager.go | 75 ++++++++++++++---- .../streaming/grpc/noop_streaming_manager.go | 4 + protocol/streaming/grpc/types/manager.go | 4 + protocol/x/clob/abci.go | 9 ++- .../x/clob/keeper/grpc_stream_orderbook.go | 67 +++++++++++++++- protocol/x/clob/memclob/memclob.go | 4 +- .../x/clob/memclob/memclob_open_orders.go | 3 +- protocol/x/clob/types/memclob.go | 7 +- protocol/x/clob/types/orderbook.go | 10 +++ 12 files changed, 203 insertions(+), 69 deletions(-) diff --git a/protocol/app/app.go b/protocol/app/app.go index 9deaf54ce1..b95a53a1ca 100644 --- a/protocol/app/app.go +++ b/protocol/app/app.go @@ -1419,6 +1419,7 @@ func New( if app.GrpcStreamingManager.Enabled() { app.GrpcStreamingTestClient = streamingclient.NewGrpcClient(appFlags, app.Logger()) + app.GrpcStreamingManager.SubscribeTestClient(app.GrpcStreamingTestClient) } // Report out app version and git commit. This will be run when validators restart. diff --git a/protocol/mocks/MemClob.go b/protocol/mocks/MemClob.go index 66a5ee204c..49b7cabb90 100644 --- a/protocol/mocks/MemClob.go +++ b/protocol/mocks/MemClob.go @@ -338,18 +338,20 @@ func (_m *MemClob) GetOrderRemainingAmount(ctx types.Context, order clobtypes.Or } // GetOrderbook provides a mock function with given fields: ctx, clobPairId -func (_m *MemClob) GetOrderbook(ctx types.Context, clobPairId clobtypes.ClobPairId) clobtypes.Orderbook { +func (_m *MemClob) GetOrderbook(ctx types.Context, clobPairId clobtypes.ClobPairId) clobtypes.OrderbookInterface { ret := _m.Called(ctx, clobPairId) if len(ret) == 0 { panic("no return value specified for GetOrderbook") } - var r0 clobtypes.Orderbook - if rf, ok := ret.Get(0).(func(types.Context, clobtypes.ClobPairId) clobtypes.Orderbook); ok { + var r0 clobtypes.OrderbookInterface + if rf, ok := ret.Get(0).(func(types.Context, clobtypes.ClobPairId) clobtypes.OrderbookInterface); ok { r0 = rf(ctx, clobPairId) } else { - r0 = ret.Get(0).(clobtypes.Orderbook) + if ret.Get(0) != nil { + r0 = ret.Get(0).(clobtypes.OrderbookInterface) + } } return r0 diff --git a/protocol/streaming/grpc/client/client.go b/protocol/streaming/grpc/client/client.go index 227faef8f5..113be59442 100644 --- a/protocol/streaming/grpc/client/client.go +++ b/protocol/streaming/grpc/client/client.go @@ -1,13 +1,11 @@ package client import ( - "context" "sync" "cosmossdk.io/log" appflags "github.com/dydxprotocol/v4-chain/protocol/app/flags" - daemontypes "github.com/dydxprotocol/v4-chain/protocol/daemons/types" v1 "github.com/dydxprotocol/v4-chain/protocol/indexer/protocol/v1" v1types "github.com/dydxprotocol/v4-chain/protocol/indexer/protocol/v1/types" clobtypes "github.com/dydxprotocol/v4-chain/protocol/x/clob/types" @@ -40,44 +38,44 @@ func NewGrpcClient(appflags appflags.Flags, logger log.Logger) *GrpcClient { } // Subscribe to grpc orderbook updates. - go func() { - grpcClient := daemontypes.GrpcClientImpl{} - - // Make a connection to the Cosmos gRPC query services. - queryConn, err := grpcClient.NewTcpConnection(context.Background(), appflags.GrpcAddress) - if err != nil { - logger.Error("Failed to establish gRPC connection to Cosmos gRPC query services", "error", err) - return - } - defer func() { - if err := grpcClient.CloseConnection(queryConn); err != nil { - logger.Error("Failed to close gRPC connection", "error", err) - } - }() - - clobQueryClient := clobtypes.NewQueryClient(queryConn) - updateClient, err := clobQueryClient.StreamOrderbookUpdates( - context.Background(), - &clobtypes.StreamOrderbookUpdatesRequest{ - ClobPairId: []uint32{0, 1}, - }, - ) - if err != nil { - logger.Error("Failed to stream orderbook updates", "error", err) - return - } - - for { - update, err := updateClient.Recv() - if err != nil { - logger.Error("Failed to receive orderbook update", "error", err) - return - } - - logger.Info("Received orderbook update", "update", update) - client.Update(update) - } - }() + // go func() { + // grpcClient := daemontypes.GrpcClientImpl{} + + // // Make a connection to the Cosmos gRPC query services. + // queryConn, err := grpcClient.NewTcpConnection(context.Background(), appflags.GrpcAddress) + // if err != nil { + // logger.Error("Failed to establish gRPC connection to Cosmos gRPC query services", "error", err) + // return + // } + // defer func() { + // if err := grpcClient.CloseConnection(queryConn); err != nil { + // logger.Error("Failed to close gRPC connection", "error", err) + // } + // }() + + // clobQueryClient := clobtypes.NewQueryClient(queryConn) + // updateClient, err := clobQueryClient.StreamOrderbookUpdates( + // context.Background(), + // &clobtypes.StreamOrderbookUpdatesRequest{ + // ClobPairId: []uint32{0, 1}, + // }, + // ) + // if err != nil { + // logger.Error("Failed to stream orderbook updates", "error", err) + // return + // } + + // for { + // update, err := updateClient.Recv() + // if err != nil { + // logger.Error("Failed to receive orderbook update", "error", err) + // return + // } + + // logger.Info("Received orderbook update", "update", update) + // client.Update(update) + // } + // }() return client } diff --git a/protocol/streaming/grpc/grpc_streaming_manager.go b/protocol/streaming/grpc/grpc_streaming_manager.go index 54037813f9..6ee69500ad 100644 --- a/protocol/streaming/grpc/grpc_streaming_manager.go +++ b/protocol/streaming/grpc/grpc_streaming_manager.go @@ -10,6 +10,7 @@ import ( "github.com/cosmos/gogoproto/proto" ocutypes "github.com/dydxprotocol/v4-chain/protocol/indexer/off_chain_updates/types" "github.com/dydxprotocol/v4-chain/protocol/lib/metrics" + "github.com/dydxprotocol/v4-chain/protocol/streaming/grpc/client" "github.com/dydxprotocol/v4-chain/protocol/streaming/grpc/types" clobtypes "github.com/dydxprotocol/v4-chain/protocol/x/clob/types" ) @@ -52,6 +53,9 @@ type OrderbookSubscription struct { // Channel to buffer writes before the stream updatesChannel chan []clobtypes.StreamUpdate + + // Testing + client *client.GrpcClient } func NewGrpcStreamingManager( @@ -185,7 +189,20 @@ func (sm *GrpcStreamingManagerImpl) Subscribe( return err } -// removeSubscription removes a subscription from the grpc streaming manager. +func (sm *GrpcStreamingManagerImpl) SubscribeTestClient(client *client.GrpcClient) { + subscription := &OrderbookSubscription{ + clobPairIds: []uint32{0, 1}, + client: client, + } + + sm.Lock() + defer sm.Unlock() + + sm.orderbookSubscriptions[sm.nextSubscriptionId] = subscription + sm.nextSubscriptionId++ +} + +// removeSubscription removes a subscription from the streaming manager. // The streaming manager's lock should already be acquired before calling this. func (sm *GrpcStreamingManagerImpl) removeSubscription( subscriptionIdToRemove uint32, @@ -253,17 +270,11 @@ func (sm *GrpcStreamingManagerImpl) SendSnapshot( metrics.GrpcAddToSubscriptionChannelCount, 1, ) - select { - case subscription.updatesChannel <- streamUpdates: - default: - sm.logger.Error( - fmt.Sprintf( - "GRPC Streaming subscription id %+v channel full capacity. Dropping subscription connection.", - subscriptionId, - ), - ) - removeSubscription = true - } + subscription.client.Update( + &clobtypes.StreamOrderbookUpdatesResponse{ + Updates: streamUpdates, + }, + ) } // Clean up subscriptions that have been closed. @@ -317,7 +328,25 @@ func (sm *GrpcStreamingManagerImpl) SendOrderbookUpdates( } } - sm.AddUpdatesToCache(updatesByClobPairId, uint32(len(updates))) + sm.Lock() + defer sm.Unlock() + + for _, subscription := range sm.orderbookSubscriptions { + updatesToSend := make([]clobtypes.StreamUpdate, 0) + for _, clobPairId := range subscription.clobPairIds { + if updates, ok := updatesByClobPairId[clobPairId]; ok { + updatesToSend = append(updatesToSend, updates...) + } + } + + if len(updatesToSend) > 0 { + subscription.client.Update( + &clobtypes.StreamOrderbookUpdatesResponse{ + Updates: updatesToSend, + }, + ) + } + } } // SendOrderbookFillUpdates groups fills by their clob pair ids and @@ -354,7 +383,25 @@ func (sm *GrpcStreamingManagerImpl) SendOrderbookFillUpdates( updatesByClobPairId[clobPairId] = append(updatesByClobPairId[clobPairId], streamUpdate) } - sm.AddUpdatesToCache(updatesByClobPairId, uint32(len(orderbookFills))) + sm.Lock() + defer sm.Unlock() + + for _, subscription := range sm.orderbookSubscriptions { + updatesToSend := make([]clobtypes.StreamUpdate, 0) + for _, clobPairId := range subscription.clobPairIds { + if updates, ok := updatesByClobPairId[clobPairId]; ok { + updatesToSend = append(updatesToSend, updates...) + } + } + + if len(updatesToSend) > 0 { + subscription.client.Update( + &clobtypes.StreamOrderbookUpdatesResponse{ + Updates: updatesToSend, + }, + ) + } + } } func (sm *GrpcStreamingManagerImpl) AddUpdatesToCache( diff --git a/protocol/streaming/grpc/noop_streaming_manager.go b/protocol/streaming/grpc/noop_streaming_manager.go index f5c61f0713..3ccfedc18b 100644 --- a/protocol/streaming/grpc/noop_streaming_manager.go +++ b/protocol/streaming/grpc/noop_streaming_manager.go @@ -2,6 +2,7 @@ package grpc import ( sdk "github.com/cosmos/cosmos-sdk/types" + "github.com/dydxprotocol/v4-chain/protocol/streaming/grpc/client" "github.com/dydxprotocol/v4-chain/protocol/streaming/grpc/types" clobtypes "github.com/dydxprotocol/v4-chain/protocol/x/clob/types" ) @@ -42,6 +43,9 @@ func (sm *NoopGrpcStreamingManager) SendOrderbookUpdates( ) { } +func (sm *NoopGrpcStreamingManager) SubscribeTestClient(client *client.GrpcClient) { +} + func (sm *NoopGrpcStreamingManager) SendOrderbookFillUpdates( ctx sdk.Context, orderbookFills []clobtypes.StreamOrderbookFill, diff --git a/protocol/streaming/grpc/types/manager.go b/protocol/streaming/grpc/types/manager.go index 74b145985c..08b6cc5b81 100644 --- a/protocol/streaming/grpc/types/manager.go +++ b/protocol/streaming/grpc/types/manager.go @@ -2,6 +2,7 @@ package types import ( sdk "github.com/cosmos/cosmos-sdk/types" + "github.com/dydxprotocol/v4-chain/protocol/streaming/grpc/client" clobtypes "github.com/dydxprotocol/v4-chain/protocol/x/clob/types" ) @@ -31,6 +32,9 @@ type GrpcStreamingManager interface { blockHeight uint32, execMode sdk.ExecMode, ) + SubscribeTestClient( + client *client.GrpcClient, + ) SendOrderbookFillUpdates( ctx sdk.Context, orderbookFills []clobtypes.StreamOrderbookFill, diff --git a/protocol/x/clob/abci.go b/protocol/x/clob/abci.go index 8efbfc71e1..46e0ff77fd 100644 --- a/protocol/x/clob/abci.go +++ b/protocol/x/clob/abci.go @@ -143,6 +143,12 @@ func PrepareCheckState( log.BlockHeight, ctx.BlockHeight()+1, ) + // Prune any rate limiting information that is no longer relevant. + keeper.PruneRateLimits(ctx) + + // Initialize new streams with orderbook snapshots, if any. + keeper.InitializeNewGrpcStreams(ctx) + // Get the events generated from processing the matches in the latest block. processProposerMatchesEvents := keeper.GetProcessProposerMatchesEvents(ctx) if ctx.BlockHeight() != int64(processProposerMatchesEvents.BlockHeight) { @@ -254,9 +260,6 @@ func PrepareCheckState( types.GetInternalOperationsQueueTextString(newLocalValidatorOperationsQueue), ) - // Initialize new GRPC streams with orderbook snapshots, if any. - keeper.InitializeNewGrpcStreams(ctx) - // Set per-orderbook gauges. keeper.MemClob.SetMemclobGauges(ctx) } diff --git a/protocol/x/clob/keeper/grpc_stream_orderbook.go b/protocol/x/clob/keeper/grpc_stream_orderbook.go index d63775387e..22b087feb5 100644 --- a/protocol/x/clob/keeper/grpc_stream_orderbook.go +++ b/protocol/x/clob/keeper/grpc_stream_orderbook.go @@ -41,9 +41,10 @@ func (k Keeper) CompareMemclobOrderbookWithLocalOrderbook( logger.Info("Comparing grpc orderbook with actual memclob orderbook!") orderbook := k.MemClob.GetOrderbook(ctx, id) + orderbookBids := orderbook.GetBids() // Compare bids. - bids := lib.GetSortedKeys[lib.Sortable[types.Subticks]](orderbook.Bids) + bids := lib.GetSortedKeys[lib.Sortable[types.Subticks]](orderbookBids) logger.Info("Comparing bids", "bids", bids) if len(bids) != len(localOrderbook.Bids) { @@ -55,7 +56,7 @@ func (k Keeper) CompareMemclobOrderbookWithLocalOrderbook( } for _, bid := range bids { - level := orderbook.Bids[bid] + level := orderbookBids[bid] expectedAggregatedQuantity := uint64(0) expectedOrders := make([]types.Order, 0) @@ -91,10 +92,40 @@ func (k Keeper) CompareMemclobOrderbookWithLocalOrderbook( "actual_remaining_amounts", actualRemainingAmounts, ) } + + if len(expectedOrders) != len(actualOrders) { + logger.Error( + "Different number of orders at bid level", + "price", bid, + "expected", expectedOrders, + "actual", actualOrders, + ) + } else { + for i, expected := range expectedOrders { + if expected.OrderId.ClientId != actualOrders[i].OrderId.ClientId { + logger.Error( + "Different order at bid level", + "price", bid, + "expected", expected, + "actual", actualOrders[i], + ) + } + if expectedRemainingAmounts[i] != actualRemainingAmounts[i] { + logger.Error( + "Different remaining amount at bid level", + "price", bid, + "expected", expectedRemainingAmounts[i], + "actual", actualRemainingAmounts[i], + ) + } + } + } } + orderbookAsks := orderbook.GetAsks() + // Compare asks. - asks := lib.GetSortedKeys[lib.Sortable[types.Subticks]](orderbook.Asks) + asks := lib.GetSortedKeys[lib.Sortable[types.Subticks]](orderbookAsks) logger.Info("Comparing asks", "asks", asks) if len(asks) != len(localOrderbook.Asks) { @@ -106,7 +137,7 @@ func (k Keeper) CompareMemclobOrderbookWithLocalOrderbook( } for _, ask := range asks { - level := orderbook.Asks[ask] + level := orderbookAsks[ask] expectedAggregatedQuantity := uint64(0) expectedOrders := make([]types.Order, 0) @@ -142,6 +173,34 @@ func (k Keeper) CompareMemclobOrderbookWithLocalOrderbook( "actual_remaining_amounts", actualRemainingAmounts, ) } + + if len(expectedOrders) != len(actualOrders) { + logger.Error( + "Different number of orders at ask level", + "price", ask, + "expected", expectedOrders, + "actual", actualOrders, + ) + } else { + for i, expected := range expectedOrders { + if expected.OrderId.ClientId != actualOrders[i].OrderId.ClientId { + logger.Error( + "Different order at ask level", + "price", ask, + "expected", expected, + "actual", actualOrders[i], + ) + } + if expectedRemainingAmounts[i] != actualRemainingAmounts[i] { + logger.Error( + "Different remaining amount at ask level", + "price", ask, + "expected", expectedRemainingAmounts[i], + "actual", actualRemainingAmounts[i], + ) + } + } + } } // Compare Fills in State with fills on the locally constructed orderbook from diff --git a/protocol/x/clob/memclob/memclob.go b/protocol/x/clob/memclob/memclob.go index 3b8cef8e7c..c232250733 100644 --- a/protocol/x/clob/memclob/memclob.go +++ b/protocol/x/clob/memclob/memclob.go @@ -2442,6 +2442,6 @@ func (m *MemClobPriceTimePriority) resizeReduceOnlyMatchIfNecessary( return satypes.BaseQuantums(maxMatchSize.Uint64()) } -func (m *MemClobPriceTimePriority) GetOrderbook(ctx sdk.Context, clobPairId types.ClobPairId) types.Orderbook { - return *m.openOrders.mustGetOrderbook(ctx, clobPairId) +func (m *MemClobPriceTimePriority) GetOrderbook(ctx sdk.Context, clobPairId types.ClobPairId) types.OrderbookInterface { + return m.openOrders.orderbooksMap[clobPairId] } diff --git a/protocol/x/clob/memclob/memclob_open_orders.go b/protocol/x/clob/memclob/memclob_open_orders.go index a1ef5ffbae..e9c13daa11 100644 --- a/protocol/x/clob/memclob/memclob_open_orders.go +++ b/protocol/x/clob/memclob/memclob_open_orders.go @@ -1,10 +1,11 @@ package memclob import ( - errorsmod "cosmossdk.io/errors" "fmt" "math" + errorsmod "cosmossdk.io/errors" + sdk "github.com/cosmos/cosmos-sdk/types" "github.com/dydxprotocol/v4-chain/protocol/x/clob/types" satypes "github.com/dydxprotocol/v4-chain/protocol/x/subaccounts/types" diff --git a/protocol/x/clob/types/memclob.go b/protocol/x/clob/types/memclob.go index f7e23c6c39..94d774439d 100644 --- a/protocol/x/clob/types/memclob.go +++ b/protocol/x/clob/types/memclob.go @@ -162,5 +162,10 @@ type MemClob interface { GetOrderbook( ctx sdk.Context, clobPairId ClobPairId, - ) Orderbook + ) OrderbookInterface +} + +type OrderbookInterface interface { + GetAsks() map[Subticks]*Level + GetBids() map[Subticks]*Level } diff --git a/protocol/x/clob/types/orderbook.go b/protocol/x/clob/types/orderbook.go index 9fa532d3fc..891e3c48a2 100644 --- a/protocol/x/clob/types/orderbook.go +++ b/protocol/x/clob/types/orderbook.go @@ -85,6 +85,16 @@ type Orderbook struct { TotalOpenOrders uint } +// GetAsks gets the asks +func (ob *Orderbook) GetAsks() map[Subticks]*Level { + return ob.Asks +} + +// GetBids gets the asks +func (ob *Orderbook) GetBids() map[Subticks]*Level { + return ob.Bids +} + // GetSide returns the Bid-side levels if `isBuy == true` otherwise, returns the Ask-side levels. func (ob *Orderbook) GetSide(isBuy bool) map[Subticks]*Level { if isBuy {