diff --git a/indexer/packages/v4-protos/src/codegen/dydxprotocol/clob/query.ts b/indexer/packages/v4-protos/src/codegen/dydxprotocol/clob/query.ts index ddf8e9bd2d..aa31787820 100644 --- a/indexer/packages/v4-protos/src/codegen/dydxprotocol/clob/query.ts +++ b/indexer/packages/v4-protos/src/codegen/dydxprotocol/clob/query.ts @@ -233,6 +233,15 @@ export interface StreamOrderbookUpdatesResponse { */ snapshot: boolean; + /** + * ---Additional fields used to debug issues--- + * Block height of the updates. + */ + + blockHeight: number; + /** Exec mode of the updates. */ + + execMode: number; } /** * StreamOrderbookUpdatesResponse is a response message for the @@ -250,6 +259,15 @@ export interface StreamOrderbookUpdatesResponseSDKType { */ snapshot: boolean; + /** + * ---Additional fields used to debug issues--- + * Block height of the updates. + */ + + block_height: number; + /** Exec mode of the updates. */ + + exec_mode: number; } function createBaseQueryGetClobPairRequest(): QueryGetClobPairRequest { @@ -904,7 +922,9 @@ export const StreamOrderbookUpdatesRequest = { function createBaseStreamOrderbookUpdatesResponse(): StreamOrderbookUpdatesResponse { return { updates: [], - snapshot: false + snapshot: false, + blockHeight: 0, + execMode: 0 }; } @@ -918,6 +938,14 @@ export const StreamOrderbookUpdatesResponse = { writer.uint32(16).bool(message.snapshot); } + if (message.blockHeight !== 0) { + writer.uint32(24).uint32(message.blockHeight); + } + + if (message.execMode !== 0) { + writer.uint32(32).uint32(message.execMode); + } + return writer; }, @@ -938,6 +966,14 @@ export const StreamOrderbookUpdatesResponse = { message.snapshot = reader.bool(); break; + case 3: + message.blockHeight = reader.uint32(); + break; + + case 4: + message.execMode = reader.uint32(); + break; + default: reader.skipType(tag & 7); break; @@ -951,6 +987,8 @@ export const StreamOrderbookUpdatesResponse = { const message = createBaseStreamOrderbookUpdatesResponse(); message.updates = object.updates?.map(e => OffChainUpdateV1.fromPartial(e)) || []; message.snapshot = object.snapshot ?? false; + message.blockHeight = object.blockHeight ?? 0; + message.execMode = object.execMode ?? 0; return message; } diff --git a/proto/dydxprotocol/clob/query.proto b/proto/dydxprotocol/clob/query.proto index b4a61e2062..3745756894 100644 --- a/proto/dydxprotocol/clob/query.proto +++ b/proto/dydxprotocol/clob/query.proto @@ -153,4 +153,11 @@ message StreamOrderbookUpdatesResponse { // Note that if the snapshot is true, then all previous entries should be // discarded and the orderbook should be resynced. bool snapshot = 2; + + // ---Additional fields used to debug issues--- + // Block height of the updates. + uint32 block_height = 3; + + // Exec mode of the updates. + uint32 exec_mode = 4; } diff --git a/protocol/app/app.go b/protocol/app/app.go index 5e191efd0a..c21e4029ff 100644 --- a/protocol/app/app.go +++ b/protocol/app/app.go @@ -885,9 +885,8 @@ func New( clobFlags := clobflags.GetClobFlagValuesFromOptions(appOpts) logger.Info("Parsed CLOB flags", "Flags", clobFlags) - memClob := clobmodulememclob.NewMemClobPriceTimePriority( - app.IndexerEventManager.Enabled() || app.GrpcStreamingManager.Enabled(), - ) + memClob := clobmodulememclob.NewMemClobPriceTimePriority(app.IndexerEventManager.Enabled()) + memClob.SetGenerateOrderbookUpdates(app.GrpcStreamingManager.Enabled()) app.ClobKeeper = clobmodulekeeper.NewKeeper( appCodec, diff --git a/protocol/lib/context.go b/protocol/lib/context.go index 4380ea6ce7..9bca90064d 100644 --- a/protocol/lib/context.go +++ b/protocol/lib/context.go @@ -2,9 +2,17 @@ package lib import ( "fmt" + "github.com/cometbft/cometbft/crypto/tmhash" ) +// Custom exec modes +const ( + ExecModeBeginBlock = uint32(100) + ExecModeEndBlock = uint32(101) + ExecModePrepareCheckState = uint32(102) +) + type TxHash string func GetTxHash(tx []byte) TxHash { diff --git a/protocol/mocks/ClobKeeper.go b/protocol/mocks/ClobKeeper.go index ba2a9e6f27..df09418670 100644 --- a/protocol/mocks/ClobKeeper.go +++ b/protocol/mocks/ClobKeeper.go @@ -812,6 +812,11 @@ func (_m *ClobKeeper) RemoveOrderFillAmount(ctx types.Context, orderId clobtypes _m.Called(ctx, orderId) } +// SendOrderbookUpdates provides a mock function with given fields: ctx, offchainUpdates, snapshot +func (_m *ClobKeeper) SendOrderbookUpdates(ctx types.Context, offchainUpdates *clobtypes.OffchainUpdates, snapshot bool) { + _m.Called(ctx, offchainUpdates, snapshot) +} + // SetLongTermOrderPlacement provides a mock function with given fields: ctx, order, blockHeight func (_m *ClobKeeper) SetLongTermOrderPlacement(ctx types.Context, order clobtypes.Order, blockHeight uint32) { _m.Called(ctx, order, blockHeight) diff --git a/protocol/mocks/MemClob.go b/protocol/mocks/MemClob.go index ffd1830df7..8e3549383f 100644 --- a/protocol/mocks/MemClob.go +++ b/protocol/mocks/MemClob.go @@ -254,6 +254,66 @@ func (_m *MemClob) GetOrderRemainingAmount(ctx types.Context, order clobtypes.Or return r0, r1 } +// GetOrderbookUpdatesForOrderPlacement provides a mock function with given fields: ctx, order +func (_m *MemClob) GetOrderbookUpdatesForOrderPlacement(ctx types.Context, order clobtypes.Order) *clobtypes.OffchainUpdates { + ret := _m.Called(ctx, order) + + if len(ret) == 0 { + panic("no return value specified for GetOrderbookUpdatesForOrderPlacement") + } + + var r0 *clobtypes.OffchainUpdates + if rf, ok := ret.Get(0).(func(types.Context, clobtypes.Order) *clobtypes.OffchainUpdates); ok { + r0 = rf(ctx, order) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*clobtypes.OffchainUpdates) + } + } + + return r0 +} + +// GetOrderbookUpdatesForOrderRemoval provides a mock function with given fields: ctx, orderId +func (_m *MemClob) GetOrderbookUpdatesForOrderRemoval(ctx types.Context, orderId clobtypes.OrderId) *clobtypes.OffchainUpdates { + ret := _m.Called(ctx, orderId) + + if len(ret) == 0 { + panic("no return value specified for GetOrderbookUpdatesForOrderRemoval") + } + + var r0 *clobtypes.OffchainUpdates + if rf, ok := ret.Get(0).(func(types.Context, clobtypes.OrderId) *clobtypes.OffchainUpdates); ok { + r0 = rf(ctx, orderId) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*clobtypes.OffchainUpdates) + } + } + + return r0 +} + +// GetOrderbookUpdatesForOrderUpdate provides a mock function with given fields: ctx, orderId +func (_m *MemClob) GetOrderbookUpdatesForOrderUpdate(ctx types.Context, orderId clobtypes.OrderId) *clobtypes.OffchainUpdates { + ret := _m.Called(ctx, orderId) + + if len(ret) == 0 { + panic("no return value specified for GetOrderbookUpdatesForOrderUpdate") + } + + var r0 *clobtypes.OffchainUpdates + if rf, ok := ret.Get(0).(func(types.Context, clobtypes.OrderId) *clobtypes.OffchainUpdates); ok { + r0 = rf(ctx, orderId) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*clobtypes.OffchainUpdates) + } + } + + return r0 +} + // GetPricePremium provides a mock function with given fields: ctx, clobPair, params func (_m *MemClob) GetPricePremium(ctx types.Context, clobPair clobtypes.ClobPair, params perpetualstypes.GetPricePremiumParams) (int32, error) { ret := _m.Called(ctx, clobPair, params) diff --git a/protocol/mocks/MemClobKeeper.go b/protocol/mocks/MemClobKeeper.go index 32c78c5663..a7cfd906bd 100644 --- a/protocol/mocks/MemClobKeeper.go +++ b/protocol/mocks/MemClobKeeper.go @@ -355,6 +355,11 @@ func (_m *MemClobKeeper) ReplayPlaceOrder(ctx types.Context, msg *clobtypes.MsgP return r0, r1, r2, r3 } +// SendOrderbookUpdates provides a mock function with given fields: ctx, offchainUpdates, snapshot +func (_m *MemClobKeeper) SendOrderbookUpdates(ctx types.Context, offchainUpdates *clobtypes.OffchainUpdates, snapshot bool) { + _m.Called(ctx, offchainUpdates, snapshot) +} + // SetLongTermOrderPlacement provides a mock function with given fields: ctx, order, blockHeight func (_m *MemClobKeeper) SetLongTermOrderPlacement(ctx types.Context, order clobtypes.Order, blockHeight uint32) { _m.Called(ctx, order, blockHeight) diff --git a/protocol/streaming/grpc/grpc_streaming_manager.go b/protocol/streaming/grpc/grpc_streaming_manager.go index 95ae0a984e..501d7f96f5 100644 --- a/protocol/streaming/grpc/grpc_streaming_manager.go +++ b/protocol/streaming/grpc/grpc_streaming_manager.go @@ -76,6 +76,8 @@ func (sm *GrpcStreamingManagerImpl) Subscribe( func (sm *GrpcStreamingManagerImpl) SendOrderbookUpdates( offchainUpdates *clobtypes.OffchainUpdates, snapshot bool, + blockHeight uint32, + execMode uint32, ) { // Group updates by clob pair id. updates := make(map[uint32]*clobtypes.OffchainUpdates) @@ -103,17 +105,23 @@ func (sm *GrpcStreamingManagerImpl) SendOrderbookUpdates( // Send updates to subscribers. idsToRemove := make([]uint32, 0) for id, subscription := range sm.orderbookSubscriptions { + updatesToSend := make([]ocutypes.OffChainUpdateV1, 0) for _, clobPairId := range subscription.clobPairIds { if updates, ok := v1updates[clobPairId]; ok { - if err := subscription.srv.Send( - &clobtypes.StreamOrderbookUpdatesResponse{ - Updates: updates, - Snapshot: snapshot, - }, - ); err != nil { - idsToRemove = append(idsToRemove, id) - break - } + updatesToSend = append(updatesToSend, updates...) + } + } + + if len(updatesToSend) > 0 { + if err := subscription.srv.Send( + &clobtypes.StreamOrderbookUpdatesResponse{ + Updates: updatesToSend, + Snapshot: snapshot, + BlockHeight: blockHeight, + ExecMode: execMode, + }, + ); err != nil { + idsToRemove = append(idsToRemove, id) } } } diff --git a/protocol/streaming/grpc/noop_streaming_manager.go b/protocol/streaming/grpc/noop_streaming_manager.go index f8cc229772..0d74704ace 100644 --- a/protocol/streaming/grpc/noop_streaming_manager.go +++ b/protocol/streaming/grpc/noop_streaming_manager.go @@ -29,6 +29,8 @@ func (sm *NoopGrpcStreamingManager) Subscribe( func (sm *NoopGrpcStreamingManager) SendOrderbookUpdates( updates *clobtypes.OffchainUpdates, snapshot bool, + blockHeight uint32, + execMode uint32, ) { } diff --git a/protocol/streaming/grpc/types/manager.go b/protocol/streaming/grpc/types/manager.go index 0027358e79..a2b5b35501 100644 --- a/protocol/streaming/grpc/types/manager.go +++ b/protocol/streaming/grpc/types/manager.go @@ -18,5 +18,7 @@ type GrpcStreamingManager interface { SendOrderbookUpdates( offchainUpdates *clobtypes.OffchainUpdates, snapshot bool, + blockHeight uint32, + execMode uint32, ) } diff --git a/protocol/testutil/memclob/keeper.go b/protocol/testutil/memclob/keeper.go index 3cc9df6377..849bdc3673 100644 --- a/protocol/testutil/memclob/keeper.go +++ b/protocol/testutil/memclob/keeper.go @@ -503,3 +503,10 @@ func (f *FakeMemClobKeeper) ValidateSubaccountEquityTierLimitForNewOrder(ctx sdk func (f *FakeMemClobKeeper) Logger(ctx sdk.Context) log.Logger { return ctx.Logger() } + +func (f *FakeMemClobKeeper) SendOrderbookUpdates( + ctx sdk.Context, + offchainUpdates *types.OffchainUpdates, + snapshot bool, +) { +} diff --git a/protocol/x/clob/abci.go b/protocol/x/clob/abci.go index 5c2a5a7ce3..f378c1feb4 100644 --- a/protocol/x/clob/abci.go +++ b/protocol/x/clob/abci.go @@ -20,6 +20,7 @@ func BeginBlocker( ctx sdk.Context, keeper types.ClobKeeper, ) { + ctx = ctx.WithValue("ExecMode", lib.ExecModeBeginBlock) // Initialize the set of process proposer match events for the next block effectively // removing any events that occurred in the last block. keeper.MustSetProcessProposerMatchesEvents( @@ -35,6 +36,8 @@ func EndBlocker( ctx sdk.Context, keeper keeper.Keeper, ) { + ctx = ctx.WithValue("ExecMode", lib.ExecModeEndBlock) + processProposerMatchesEvents := keeper.GetProcessProposerMatchesEvents(ctx) // Prune any fill amounts from state which are now past their `pruneableBlockHeight`. @@ -117,6 +120,8 @@ func PrepareCheckState( ctx sdk.Context, keeper *keeper.Keeper, ) { + ctx = ctx.WithValue("ExecMode", lib.ExecModePrepareCheckState) + // Get the events generated from processing the matches in the latest block. processProposerMatchesEvents := keeper.GetProcessProposerMatchesEvents(ctx) if ctx.BlockHeight() != int64(processProposerMatchesEvents.BlockHeight) { @@ -152,6 +157,45 @@ func PrepareCheckState( offchainUpdates, ) + // For orders that are filled in the last block, send an orderbook update to the grpc streams. + if keeper.GetGrpcStreamingManager().Enabled() { + allUpdates := types.NewOffchainUpdates() + orderIdsToSend := make(map[types.OrderId]bool) + + // Send an update for reverted local operations. + for _, operation := range localValidatorOperationsQueue { + if match := operation.GetMatch(); match != nil { + // For normal order matches, we send an update for the taker and maker orders. + if matchedOrders := match.GetMatchOrders(); matchedOrders != nil { + orderIdsToSend[matchedOrders.TakerOrderId] = true + for _, fill := range matchedOrders.Fills { + orderIdsToSend[fill.MakerOrderId] = true + } + } + // For liquidation matches, we send an update for the maker orders. + if matchedLiquidation := match.GetMatchPerpetualLiquidation(); matchedLiquidation != nil { + for _, fill := range matchedLiquidation.Fills { + orderIdsToSend[fill.MakerOrderId] = true + } + } + } + } + + // Send an update for orders that were proposed. + for _, orderId := range processProposerMatchesEvents.OrderIdsFilledInLastBlock { + orderIdsToSend[orderId] = true + } + + // Send update. + for orderId := range orderIdsToSend { + if _, exists := keeper.MemClob.GetOrder(ctx, orderId); exists { + orderbookUpdate := keeper.MemClob.GetOrderbookUpdatesForOrderUpdate(ctx, orderId) + allUpdates.Append(orderbookUpdate) + } + } + keeper.SendOrderbookUpdates(ctx, allUpdates, false) + } + // 3. Place all stateful order placements included in the last block on the memclob. // Note telemetry is measured outside of the function call because `PlaceStatefulOrdersFromLastBlock` // is called within `PlaceConditionalOrdersTriggeredInLastBlock`. diff --git a/protocol/x/clob/abci_test.go b/protocol/x/clob/abci_test.go index e05ccd7076..d7a9fc636d 100644 --- a/protocol/x/clob/abci_test.go +++ b/protocol/x/clob/abci_test.go @@ -143,7 +143,7 @@ func TestEndBlocker_Failure(t *testing.T) { for _, orderId := range tc.expiredStatefulOrderIds { mockIndexerEventManager.On("AddTxnEvent", - ctx, + mock.Anything, indexerevents.SubtypeStatefulOrder, indexerevents.StatefulOrderEventVersion, indexer_manager.GetBytes( @@ -764,7 +764,7 @@ func TestEndBlocker_Success(t *testing.T) { // Assert that the indexer events for Expired Stateful Orders were emitted. for _, orderId := range tc.expectedProcessProposerMatchesEvents.ExpiredStatefulOrderIds { mockIndexerEventManager.On("AddTxnEvent", - ctx, + mock.Anything, indexerevents.SubtypeStatefulOrder, indexerevents.StatefulOrderEventVersion, indexer_manager.GetBytes( @@ -779,7 +779,7 @@ func TestEndBlocker_Success(t *testing.T) { // Assert that the indexer events for triggered conditional orders were emitted. for _, orderId := range tc.expectedTriggeredConditionalOrderIds { mockIndexerEventManager.On("AddTxnEvent", - ctx, + mock.Anything, indexerevents.SubtypeStatefulOrder, indexerevents.StatefulOrderEventVersion, indexer_manager.GetBytes( diff --git a/protocol/x/clob/keeper/keeper.go b/protocol/x/clob/keeper/keeper.go index bb02c92867..1c09052f6f 100644 --- a/protocol/x/clob/keeper/keeper.go +++ b/protocol/x/clob/keeper/keeper.go @@ -235,5 +235,24 @@ func (k Keeper) InitializeNewGrpcStreams(ctx sdk.Context) { allUpdates.Append(update) } - streamingManager.SendOrderbookUpdates(allUpdates, true) + k.SendOrderbookUpdates(ctx, allUpdates, true) +} + +// SendOrderbookUpdates sends the offchain updates to the gRPC streaming manager. +func (k Keeper) SendOrderbookUpdates( + ctx sdk.Context, + offchainUpdates *types.OffchainUpdates, + snapshot bool, +) { + if len(offchainUpdates.Messages) == 0 { + return + } + + execMode, _ := ctx.Value("ExecMode").(uint32) + k.GetGrpcStreamingManager().SendOrderbookUpdates( + offchainUpdates, + snapshot, + lib.MustConvertIntegerToUint32(ctx.BlockHeight()), + execMode, + ) } diff --git a/protocol/x/clob/keeper/order_state.go b/protocol/x/clob/keeper/order_state.go index 1f28be8f67..e75a2692bb 100644 --- a/protocol/x/clob/keeper/order_state.go +++ b/protocol/x/clob/keeper/order_state.go @@ -5,6 +5,7 @@ import ( "github.com/cosmos/cosmos-sdk/store/prefix" sdk "github.com/cosmos/cosmos-sdk/types" + "github.com/dydxprotocol/v4-chain/protocol/indexer/off_chain_updates" "github.com/dydxprotocol/v4-chain/protocol/lib" "github.com/dydxprotocol/v4-chain/protocol/x/clob/types" @@ -259,5 +260,27 @@ func (k Keeper) PruneStateFillAmountsForShortTermOrders( blockHeight := lib.MustConvertIntegerToUint32(ctx.BlockHeight()) // Prune all fill amounts from state which have a pruneable block height of the current `blockHeight`. - k.PruneOrdersForBlockHeight(ctx, blockHeight) + prunedOrderIds := k.PruneOrdersForBlockHeight(ctx, blockHeight) + + // Send an orderbook update for each pruned order for grpc streams. + // This is needed because short term orders are pruned in PrepareCheckState using + // keeper.MemClob.openOrders.blockExpirationsForOrders, which can fall out of sync with state fill amount + // pruning when there's replacement. + // Long-term fix would be to add logic to keep them in sync. + // TODO(CT-722): add logic to keep state fill amount pruning and order pruning in sync. + if k.GetGrpcStreamingManager().Enabled() { + allUpdates := types.NewOffchainUpdates() + for _, orderId := range prunedOrderIds { + if _, exists := k.MemClob.GetOrder(ctx, orderId); exists { + if message, success := off_chain_updates.CreateOrderUpdateMessage( + k.Logger(ctx), + orderId, + 0, // Total filled quantums is zero because it's been pruned from state. + ); success { + allUpdates.AddUpdateMessage(orderId, message) + } + } + } + k.SendOrderbookUpdates(ctx, allUpdates, false) + } } diff --git a/protocol/x/clob/keeper/orders.go b/protocol/x/clob/keeper/orders.go index 0ae4ab9cd4..5046c51ff2 100644 --- a/protocol/x/clob/keeper/orders.go +++ b/protocol/x/clob/keeper/orders.go @@ -1270,8 +1270,6 @@ func (k Keeper) SendOffchainMessages( } k.GetIndexerEventManager().SendOffchainData(update) } - - k.GetGrpcStreamingManager().SendOrderbookUpdates(offchainUpdates, false) } // getPessimisticCollateralCheckPrice returns the price in subticks we should use for collateralization checks. diff --git a/protocol/x/clob/memclob/memclob.go b/protocol/x/clob/memclob/memclob.go index f02fba7f74..3ae395f23b 100644 --- a/protocol/x/clob/memclob/memclob.go +++ b/protocol/x/clob/memclob/memclob.go @@ -45,6 +45,9 @@ type MemClobPriceTimePriority struct { // ---- Fields for determining if off-chain update messages should be generated ---- generateOffchainUpdates bool + + // ---- Fields for determining if orderbook updates should be generated ---- + generateOrderbookUpdates bool } type OrderWithRemovalReason struct { @@ -56,10 +59,11 @@ func NewMemClobPriceTimePriority( generateOffchainUpdates bool, ) *MemClobPriceTimePriority { return &MemClobPriceTimePriority{ - openOrders: newMemclobOpenOrders(), - cancels: newMemclobCancels(), - operationsToPropose: *types.NewOperationsToPropose(), - generateOffchainUpdates: generateOffchainUpdates, + openOrders: newMemclobOpenOrders(), + cancels: newMemclobCancels(), + operationsToPropose: *types.NewOperationsToPropose(), + generateOffchainUpdates: generateOffchainUpdates, + generateOrderbookUpdates: false, } } @@ -71,6 +75,11 @@ func (m *MemClobPriceTimePriority) SetClobKeeper(clobKeeper types.MemClobKeeper) m.clobKeeper = clobKeeper } +// SetGenerateOffchainUpdates sets the `generateOffchainUpdates` field of the MemClob. +func (m *MemClobPriceTimePriority) SetGenerateOrderbookUpdates(generateOrderbookUpdates bool) { + m.generateOrderbookUpdates = generateOrderbookUpdates +} + // CancelOrder removes a Short-Term order by `OrderId` (if it exists) from all order-related data structures // in the memclob. This method manages only Short-Term cancellations. For stateful cancellations, see // `msg_server_cancel_orders.go`. @@ -1482,6 +1491,12 @@ func (m *MemClobPriceTimePriority) mustAddOrderToOrderbook( } m.openOrders.mustAddOrderToOrderbook(ctx, newOrder, forceToFrontOfLevel) + + if m.generateOrderbookUpdates { + // Send an orderbook update to grpc streams. + orderbookUpdate := m.GetOrderbookUpdatesForOrderPlacement(ctx, newOrder) + m.clobKeeper.SendOrderbookUpdates(ctx, orderbookUpdate, false) + } } // mustPerformTakerOrderMatching performs matching using the provided taker order while the order @@ -1917,6 +1932,12 @@ func (m *MemClobPriceTimePriority) mustRemoveOrder( !m.operationsToPropose.IsOrderPlacementInOperationsQueue(order) { m.operationsToPropose.RemoveShortTermOrderTxBytes(order) } + + if m.generateOrderbookUpdates { + // Send an orderbook update to grpc streams. + orderbookUpdate := m.GetOrderbookUpdatesForOrderRemoval(ctx, order.OrderId) + m.clobKeeper.SendOrderbookUpdates(ctx, orderbookUpdate, false) + } } // mustUpdateOrderbookStateWithMatchedMakerOrder updates the orderbook with a matched maker order. @@ -1934,6 +1955,12 @@ func (m *MemClobPriceTimePriority) mustUpdateOrderbookStateWithMatchedMakerOrder panic("Total filled size of maker order greater than the order size") } + // Send an orderbook update for the order's new total filled amount. + if m.generateOrderbookUpdates { + orderbookUpdate := m.GetOrderbookUpdatesForOrderUpdate(ctx, makerOrder.OrderId) + m.clobKeeper.SendOrderbookUpdates(ctx, orderbookUpdate, false) + } + // If the order is fully filled, remove it from the orderbook. // Note we shouldn't remove Short-Term order hashes from `ShortTermOrderTxBytes` here since // the order was matched. diff --git a/protocol/x/clob/memclob/memclob_grpc_streaming.go b/protocol/x/clob/memclob/memclob_grpc_streaming.go index 6cc8844bc5..1ab7c03ad3 100644 --- a/protocol/x/clob/memclob/memclob_grpc_streaming.go +++ b/protocol/x/clob/memclob/memclob_grpc_streaming.go @@ -3,6 +3,8 @@ package memclob import ( sdk "github.com/cosmos/cosmos-sdk/types" "github.com/dydxprotocol/v4-chain/protocol/indexer/off_chain_updates" + ocutypes "github.com/dydxprotocol/v4-chain/protocol/indexer/off_chain_updates/types" + indexersharedtypes "github.com/dydxprotocol/v4-chain/protocol/indexer/shared/types" "github.com/dydxprotocol/v4-chain/protocol/lib" "github.com/dydxprotocol/v4-chain/protocol/x/clob/types" ) @@ -27,7 +29,7 @@ func (m *MemClobPriceTimePriority) GetOffchainUpdatesForOrderbookSnapshot( level.LevelOrders.Front.Each( func(order types.ClobOrder) { offchainUpdates.Append( - m.GetOffchainUpdatesForOrder(ctx, order.Order), + m.GetOrderbookUpdatesForOrderPlacement(ctx, order.Order), ) }, ) @@ -44,7 +46,7 @@ func (m *MemClobPriceTimePriority) GetOffchainUpdatesForOrderbookSnapshot( level.LevelOrders.Front.Each( func(order types.ClobOrder) { offchainUpdates.Append( - m.GetOffchainUpdatesForOrder(ctx, order.Order), + m.GetOrderbookUpdatesForOrderPlacement(ctx, order.Order), ) }, ) @@ -54,10 +56,10 @@ func (m *MemClobPriceTimePriority) GetOffchainUpdatesForOrderbookSnapshot( return offchainUpdates } -// GetOffchainUpdatesForOrder returns a place order offchain message and -// a update order offchain message used to construct an order for -// the orderbook snapshot grpc stream. -func (m *MemClobPriceTimePriority) GetOffchainUpdatesForOrder( +// GetOrderbookUpdatesForOrderPlacement returns a place order offchain message and +// a update order offchain message used to add an order for +// the orderbook grpc stream. +func (m *MemClobPriceTimePriority) GetOrderbookUpdatesForOrderPlacement( ctx sdk.Context, order types.Order, ) (offchainUpdates *types.OffchainUpdates) { @@ -86,3 +88,43 @@ func (m *MemClobPriceTimePriority) GetOffchainUpdatesForOrder( return offchainUpdates } + +// GetOrderbookUpdatesForOrderRemoval returns a remove order offchain message +// used to remove an order for the orderbook grpc stream. +func (m *MemClobPriceTimePriority) GetOrderbookUpdatesForOrderRemoval( + ctx sdk.Context, + orderId types.OrderId, +) (offchainUpdates *types.OffchainUpdates) { + offchainUpdates = types.NewOffchainUpdates() + if message, success := off_chain_updates.CreateOrderRemoveMessageWithReason( + m.clobKeeper.Logger(ctx), + orderId, + indexersharedtypes.OrderRemovalReason_ORDER_REMOVAL_REASON_UNSPECIFIED, + ocutypes.OrderRemoveV1_ORDER_REMOVAL_STATUS_BEST_EFFORT_CANCELED, + ); success { + offchainUpdates.AddRemoveMessage(orderId, message) + } + return offchainUpdates +} + +// GetOrderbookUpdatesForOrderUpdate returns an update order offchain message +// used to update an order for the orderbook grpc stream. +func (m *MemClobPriceTimePriority) GetOrderbookUpdatesForOrderUpdate( + ctx sdk.Context, + orderId types.OrderId, +) (offchainUpdates *types.OffchainUpdates) { + offchainUpdates = types.NewOffchainUpdates() + + // Get the current fill amount of the order. + fillAmount := m.GetOrderFilledAmount(ctx, orderId) + + // Generate an update message updating the total filled amount of order. + if message, success := off_chain_updates.CreateOrderUpdateMessage( + m.clobKeeper.Logger(ctx), + orderId, + fillAmount, + ); success { + offchainUpdates.AddUpdateMessage(orderId, message) + } + return offchainUpdates +} diff --git a/protocol/x/clob/memclob/memclob_grpc_streaming_test.go b/protocol/x/clob/memclob/memclob_grpc_streaming_test.go index 586c120d4d..07471e3b22 100644 --- a/protocol/x/clob/memclob/memclob_grpc_streaming_test.go +++ b/protocol/x/clob/memclob/memclob_grpc_streaming_test.go @@ -25,6 +25,7 @@ func TestGetOffchainUpdatesForOrderbookSnapshot_Buy(t *testing.T) { "Logger", mock.Anything, ).Return(ctx.Logger()) + clobKeeper.On("SendOrderbookUpdates", mock.Anything, mock.Anything).Return() memclob := NewMemClobPriceTimePriority(false) memclob.SetClobKeeper(clobKeeper) @@ -48,9 +49,9 @@ func TestGetOffchainUpdatesForOrderbookSnapshot_Buy(t *testing.T) { expected := types.NewOffchainUpdates() // Buy orders are in descending order. - expected.Append(memclob.GetOffchainUpdatesForOrder(ctx, orders[2])) - expected.Append(memclob.GetOffchainUpdatesForOrder(ctx, orders[0])) - expected.Append(memclob.GetOffchainUpdatesForOrder(ctx, orders[1])) + expected.Append(memclob.GetOrderbookUpdatesForOrderPlacement(ctx, orders[2])) + expected.Append(memclob.GetOrderbookUpdatesForOrderPlacement(ctx, orders[0])) + expected.Append(memclob.GetOrderbookUpdatesForOrderPlacement(ctx, orders[1])) require.Equal(t, expected, offchainUpdates) } @@ -68,6 +69,7 @@ func TestGetOffchainUpdatesForOrderbookSnapshot_Sell(t *testing.T) { "Logger", mock.Anything, ).Return(ctx.Logger()) + clobKeeper.On("SendOrderbookUpdates", mock.Anything, mock.Anything).Return() memclob := NewMemClobPriceTimePriority(false) memclob.SetClobKeeper(clobKeeper) @@ -91,9 +93,9 @@ func TestGetOffchainUpdatesForOrderbookSnapshot_Sell(t *testing.T) { expected := types.NewOffchainUpdates() // Sell orders are in ascending order. - expected.Append(memclob.GetOffchainUpdatesForOrder(ctx, orders[1])) - expected.Append(memclob.GetOffchainUpdatesForOrder(ctx, orders[2])) - expected.Append(memclob.GetOffchainUpdatesForOrder(ctx, orders[0])) + expected.Append(memclob.GetOrderbookUpdatesForOrderPlacement(ctx, orders[1])) + expected.Append(memclob.GetOrderbookUpdatesForOrderPlacement(ctx, orders[2])) + expected.Append(memclob.GetOrderbookUpdatesForOrderPlacement(ctx, orders[0])) require.Equal(t, expected, offchainUpdates) } diff --git a/protocol/x/clob/memclob/memclob_purge_invalid_memclob_state_test.go b/protocol/x/clob/memclob/memclob_purge_invalid_memclob_state_test.go index a206b9f159..8d80998aa5 100644 --- a/protocol/x/clob/memclob/memclob_purge_invalid_memclob_state_test.go +++ b/protocol/x/clob/memclob/memclob_purge_invalid_memclob_state_test.go @@ -249,10 +249,11 @@ func TestPurgeInvalidMemclobState(t *testing.T) { // Setup memclob state. ctx, _, _ := sdktest.NewSdkContextWithMultistore() ctx = ctx.WithIsCheckTx(true) - mockMemClobKeeper := &mocks.MemClobKeeper{} memclob := NewMemClobPriceTimePriority(true) + mockMemClobKeeper := &mocks.MemClobKeeper{} memclob.SetClobKeeper(mockMemClobKeeper) mockMemClobKeeper.On("Logger", mock.Anything).Return(log.NewNopLogger()).Maybe() + mockMemClobKeeper.On("SendOrderbookUpdates", mock.Anything, mock.Anything).Return().Maybe() for _, operation := range tc.placedOperations { switch operation.Operation.(type) { @@ -339,6 +340,10 @@ func TestPurgeInvalidMemclobState_PanicsWhenCalledWithDuplicateCanceledStatefulO ctx, _, _ := sdktest.NewSdkContextWithMultistore() ctx = ctx.WithIsCheckTx(true) memclob := NewMemClobPriceTimePriority(true) + mockMemClobKeeper := &mocks.MemClobKeeper{} + memclob.SetClobKeeper(mockMemClobKeeper) + mockMemClobKeeper.On("SendOrderbookUpdates", mock.Anything, mock.Anything).Return().Maybe() + canceledStatefulOrderIds := []types.OrderId{ constants.LongTermOrder_Alice_Num0_Id0_Clob0_Buy5_Price10_GTBT15.OrderId, constants.LongTermOrder_Alice_Num0_Id0_Clob0_Buy5_Price10_GTBT15.OrderId, @@ -368,6 +373,10 @@ func TestPurgeInvalidMemclobState_PanicsWhenNonStatefulOrderIsCanceled(t *testin ctx, _, _ := sdktest.NewSdkContextWithMultistore() ctx = ctx.WithIsCheckTx(true) memclob := NewMemClobPriceTimePriority(true) + mockMemClobKeeper := &mocks.MemClobKeeper{} + memclob.SetClobKeeper(mockMemClobKeeper) + mockMemClobKeeper.On("SendOrderbookUpdates", mock.Anything, mock.Anything).Return().Maybe() + shortTermOrderId := constants.Order_Alice_Num0_Id0_Clob2_Buy5_Price10_GTB15.OrderId require.PanicsWithValue( @@ -395,6 +404,10 @@ func TestPurgeInvalidMemclobState_PanicsWhenCalledWithDuplicateExpiredStatefulOr ctx = ctx.WithIsCheckTx(true) memclob := NewMemClobPriceTimePriority(true) + mockMemClobKeeper := &mocks.MemClobKeeper{} + memclob.SetClobKeeper(mockMemClobKeeper) + mockMemClobKeeper.On("SendOrderbookUpdates", mock.Anything, mock.Anything).Return().Maybe() + expiredStatefulOrderIds := []types.OrderId{ constants.LongTermOrder_Alice_Num0_Id0_Clob0_Buy5_Price10_GTBT15.OrderId, constants.LongTermOrder_Alice_Num0_Id0_Clob0_Buy5_Price10_GTBT15.OrderId, @@ -425,6 +438,10 @@ func TestPurgeInvalidMemclobState_PanicsWhenCalledWithShortTermExpiredStatefulOr ctx = ctx.WithIsCheckTx(true) memclob := NewMemClobPriceTimePriority(true) + mockMemClobKeeper := &mocks.MemClobKeeper{} + memclob.SetClobKeeper(mockMemClobKeeper) + mockMemClobKeeper.On("SendOrderbookUpdates", mock.Anything, mock.Anything).Return().Maybe() + shortTermOrderId := constants.Order_Alice_Num0_Id0_Clob2_Buy5_Price10_GTB15.OrderId require.PanicsWithValue( diff --git a/protocol/x/clob/memclob/memclob_remove_order_test.go b/protocol/x/clob/memclob/memclob_remove_order_test.go index 7d3aa4e0b9..15c1d88803 100644 --- a/protocol/x/clob/memclob/memclob_remove_order_test.go +++ b/protocol/x/clob/memclob/memclob_remove_order_test.go @@ -330,6 +330,7 @@ func TestRemoveOrderIfFilled(t *testing.T) { memClobKeeper.On("AddOrderToOrderbookCollatCheck", mock.Anything, mock.Anything, mock.Anything). Return(true, make(map[satypes.SubaccountId]satypes.UpdateResult)) memClobKeeper.On("ValidateSubaccountEquityTierLimitForNewOrder", mock.Anything, mock.Anything).Return(nil) + memClobKeeper.On("SendOrderbookUpdates", mock.Anything, mock.Anything).Return().Maybe() // Set initial fill amount to `0` for all orders. initialCall := memClobKeeper.On("GetOrderFillAmount", mock.Anything, mock.Anything). diff --git a/protocol/x/clob/types/clob_keeper.go b/protocol/x/clob/types/clob_keeper.go index c31b741155..5d1ae91b6e 100644 --- a/protocol/x/clob/types/clob_keeper.go +++ b/protocol/x/clob/types/clob_keeper.go @@ -127,5 +127,11 @@ type ClobKeeper interface { clobPair ClobPair, ) error UpdateLiquidationsConfig(ctx sdk.Context, config LiquidationsConfig) error + // Gprc streaming InitializeNewGrpcStreams(ctx sdk.Context) + SendOrderbookUpdates( + ctx sdk.Context, + offchainUpdates *OffchainUpdates, + snapshot bool, + ) } diff --git a/protocol/x/clob/types/mem_clob_keeper.go b/protocol/x/clob/types/mem_clob_keeper.go index 381c5870d1..d959698d61 100644 --- a/protocol/x/clob/types/mem_clob_keeper.go +++ b/protocol/x/clob/types/mem_clob_keeper.go @@ -111,4 +111,9 @@ type MemClobKeeper interface { Logger( ctx sdk.Context, ) log.Logger + SendOrderbookUpdates( + ctx sdk.Context, + offchainUpdates *OffchainUpdates, + snapshot bool, + ) } diff --git a/protocol/x/clob/types/memclob.go b/protocol/x/clob/types/memclob.go index 2a0539b884..347c2f3254 100644 --- a/protocol/x/clob/types/memclob.go +++ b/protocol/x/clob/types/memclob.go @@ -141,4 +141,16 @@ type MemClob interface { ctx sdk.Context, clobPairId ClobPairId, ) (offchainUpdates *OffchainUpdates) + GetOrderbookUpdatesForOrderPlacement( + ctx sdk.Context, + order Order, + ) (offchainUpdates *OffchainUpdates) + GetOrderbookUpdatesForOrderRemoval( + ctx sdk.Context, + orderId OrderId, + ) (offchainUpdates *OffchainUpdates) + GetOrderbookUpdatesForOrderUpdate( + ctx sdk.Context, + orderId OrderId, + ) (offchainUpdates *OffchainUpdates) } diff --git a/protocol/x/clob/types/query.pb.go b/protocol/x/clob/types/query.pb.go index 7f48a50310..ee500746b5 100644 --- a/protocol/x/clob/types/query.pb.go +++ b/protocol/x/clob/types/query.pb.go @@ -711,6 +711,11 @@ type StreamOrderbookUpdatesResponse struct { // Note that if the snapshot is true, then all previous entries should be // discarded and the orderbook should be resynced. Snapshot bool `protobuf:"varint,2,opt,name=snapshot,proto3" json:"snapshot,omitempty"` + // ---Additional fields used to debug issues--- + // Block height of the updates. + BlockHeight uint32 `protobuf:"varint,3,opt,name=block_height,json=blockHeight,proto3" json:"block_height,omitempty"` + // Exec mode of the updates. + ExecMode uint32 `protobuf:"varint,4,opt,name=exec_mode,json=execMode,proto3" json:"exec_mode,omitempty"` } func (m *StreamOrderbookUpdatesResponse) Reset() { *m = StreamOrderbookUpdatesResponse{} } @@ -760,6 +765,20 @@ func (m *StreamOrderbookUpdatesResponse) GetSnapshot() bool { return false } +func (m *StreamOrderbookUpdatesResponse) GetBlockHeight() uint32 { + if m != nil { + return m.BlockHeight + } + return 0 +} + +func (m *StreamOrderbookUpdatesResponse) GetExecMode() uint32 { + if m != nil { + return m.ExecMode + } + return 0 +} + func init() { proto.RegisterType((*QueryGetClobPairRequest)(nil), "dydxprotocol.clob.QueryGetClobPairRequest") proto.RegisterType((*QueryClobPairResponse)(nil), "dydxprotocol.clob.QueryClobPairResponse") @@ -781,75 +800,77 @@ func init() { func init() { proto.RegisterFile("dydxprotocol/clob/query.proto", fileDescriptor_3365c195b25c5bc0) } var fileDescriptor_3365c195b25c5bc0 = []byte{ - // 1073 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x9c, 0x56, 0xcf, 0x6f, 0xdc, 0x44, - 0x14, 0xce, 0x24, 0xa5, 0x4d, 0xa7, 0x80, 0x60, 0xd2, 0xa4, 0x8b, 0x93, 0x6c, 0xb6, 0x86, 0x24, - 0x9b, 0x54, 0xd8, 0x49, 0x5a, 0xa1, 0x92, 0xa2, 0x4a, 0x49, 0x04, 0x11, 0x52, 0x43, 0x17, 0x53, - 0x02, 0x82, 0x4a, 0xd6, 0xac, 0x3d, 0xbb, 0x19, 0xc5, 0xf6, 0x38, 0xf6, 0xd8, 0x4a, 0x84, 0x10, - 0x12, 0x07, 0x2e, 0x70, 0x40, 0x42, 0x82, 0x03, 0x47, 0xee, 0xfc, 0x07, 0x08, 0xb8, 0xf5, 0x58, - 0x89, 0x0b, 0x07, 0x84, 0x50, 0xc2, 0x99, 0xbf, 0x01, 0x79, 0x3c, 0xbb, 0x5d, 0xc7, 0x3f, 0x36, - 0xc9, 0x65, 0xd7, 0x9e, 0xf9, 0xde, 0xf3, 0xf7, 0xbd, 0xf7, 0xfc, 0x8d, 0xe1, 0xac, 0x7d, 0x64, - 0x1f, 0xfa, 0x01, 0xe3, 0xcc, 0x62, 0x8e, 0x6e, 0x39, 0xac, 0xad, 0x1f, 0x44, 0x24, 0x38, 0xd2, - 0xc4, 0x1a, 0x7a, 0x79, 0x70, 0x5b, 0x4b, 0xb6, 0x95, 0xeb, 0x5d, 0xd6, 0x65, 0x62, 0x49, 0x4f, - 0xae, 0x52, 0xa0, 0x32, 0xd3, 0x65, 0xac, 0xeb, 0x10, 0x1d, 0xfb, 0x54, 0xc7, 0x9e, 0xc7, 0x38, - 0xe6, 0x94, 0x79, 0xa1, 0xdc, 0x5d, 0xb6, 0x58, 0xe8, 0xb2, 0x50, 0x6f, 0xe3, 0x90, 0xa4, 0xf9, - 0xf5, 0x78, 0xb5, 0x4d, 0x38, 0x5e, 0xd5, 0x7d, 0xdc, 0xa5, 0x9e, 0x00, 0x4b, 0xac, 0x9e, 0x67, - 0xd4, 0x76, 0x98, 0xb5, 0x6f, 0x06, 0x98, 0x13, 0xd3, 0xa1, 0x2e, 0xe5, 0xa6, 0xc5, 0xbc, 0x0e, - 0xed, 0xca, 0x80, 0x9b, 0xf9, 0x80, 0xe4, 0xc7, 0xf4, 0x31, 0x0d, 0x24, 0x64, 0x25, 0x0f, 0x21, - 0x07, 0x11, 0xe5, 0x47, 0x26, 0xa7, 0x24, 0x28, 0x4a, 0x7a, 0x2b, 0x1f, 0xe1, 0xd0, 0x83, 0x88, - 0xda, 0xa9, 0xae, 0x2c, 0x78, 0x3a, 0x0f, 0x76, 0x49, 0x2c, 0x37, 0xef, 0x67, 0x36, 0xa9, 0x67, - 0x93, 0x43, 0x12, 0xe8, 0xac, 0xd3, 0x31, 0xad, 0x3d, 0x4c, 0x3d, 0x33, 0xf2, 0x6d, 0xcc, 0x49, - 0x98, 0x5f, 0x49, 0xe3, 0xd5, 0x25, 0x78, 0xe3, 0xfd, 0xa4, 0x62, 0xdb, 0x84, 0x6f, 0x39, 0xac, - 0xdd, 0xc2, 0x34, 0x30, 0xc8, 0x41, 0x44, 0x42, 0x8e, 0x5e, 0x84, 0xa3, 0xd4, 0xae, 0x81, 0x06, - 0x68, 0xbe, 0x60, 0x8c, 0x52, 0x5b, 0xfd, 0x08, 0x4e, 0x0a, 0xe8, 0x33, 0x5c, 0xe8, 0x33, 0x2f, - 0x24, 0xe8, 0x3e, 0xbc, 0xda, 0x2f, 0x89, 0xc0, 0x5f, 0x5b, 0x9b, 0xd6, 0x72, 0xad, 0xd5, 0x7a, - 0x71, 0x9b, 0x97, 0x9e, 0xfc, 0x3d, 0x37, 0x62, 0x8c, 0x5b, 0xf2, 0x5e, 0xc5, 0x92, 0xc3, 0x86, - 0xe3, 0x9c, 0xe6, 0xf0, 0x0e, 0x84, 0xcf, 0x5a, 0x28, 0x73, 0x2f, 0x68, 0x69, 0xbf, 0xb5, 0xa4, - 0xdf, 0x5a, 0x3a, 0x4f, 0xb2, 0xdf, 0x5a, 0x0b, 0x77, 0x89, 0x8c, 0x35, 0x06, 0x22, 0xd5, 0x9f, - 0x00, 0xac, 0x65, 0xc8, 0x6f, 0x38, 0x4e, 0x19, 0xff, 0xb1, 0x73, 0xf2, 0x47, 0xdb, 0x19, 0x92, - 0xa3, 0x82, 0xe4, 0xe2, 0x50, 0x92, 0xe9, 0xc3, 0x33, 0x2c, 0xff, 0x02, 0x70, 0x6e, 0x87, 0xc4, - 0xef, 0x31, 0x9b, 0x3c, 0x62, 0xc9, 0xef, 0x16, 0x76, 0xac, 0xc8, 0x11, 0x9b, 0xbd, 0x8a, 0x3c, - 0x86, 0x53, 0xe9, 0xc0, 0xfa, 0x01, 0xf3, 0x59, 0x48, 0x02, 0xd3, 0xc5, 0xdc, 0xda, 0x23, 0x61, - 0xbf, 0x3a, 0x79, 0xe6, 0xbb, 0xd8, 0x49, 0x46, 0x8b, 0x05, 0x3b, 0x24, 0xde, 0x49, 0xd1, 0xc6, - 0x75, 0x91, 0xa5, 0x25, 0x93, 0xc8, 0x55, 0xf4, 0x29, 0x9c, 0x8c, 0x7b, 0x60, 0xd3, 0x25, 0xb1, - 0xe9, 0x12, 0x1e, 0x50, 0x2b, 0xec, 0xab, 0xca, 0x27, 0xcf, 0x10, 0xde, 0x49, 0xe1, 0xc6, 0x44, - 0x3c, 0xf8, 0xc8, 0x74, 0x51, 0xfd, 0x0f, 0xc0, 0x46, 0xb9, 0x3c, 0xd9, 0x8c, 0x2e, 0xbc, 0x12, - 0x90, 0x30, 0x72, 0x78, 0x28, 0x5b, 0xb1, 0x3d, 0xec, 0x99, 0x05, 0x59, 0x12, 0xc0, 0x86, 0x67, - 0xef, 0x32, 0x27, 0x72, 0x49, 0x8b, 0x04, 0x49, 0xeb, 0x64, 0xdb, 0x7a, 0xd9, 0x15, 0x0c, 0x27, - 0x0a, 0x50, 0xa8, 0x01, 0x9f, 0xef, 0x0f, 0x83, 0xd9, 0x9f, 0x7f, 0xd8, 0x6b, 0xf6, 0xbb, 0x36, - 0x7a, 0x09, 0x8e, 0xb9, 0x24, 0x16, 0x15, 0x19, 0x35, 0x92, 0x4b, 0x34, 0x05, 0x2f, 0xc7, 0x22, - 0x49, 0x6d, 0xac, 0x01, 0x9a, 0x97, 0x0c, 0x79, 0xa7, 0x2e, 0xc3, 0xa6, 0x18, 0xba, 0xb7, 0x85, - 0x1b, 0x3c, 0xa2, 0x24, 0x78, 0x90, 0x78, 0xc1, 0x96, 0x78, 0xbb, 0xa3, 0x60, 0xb0, 0xaf, 0xea, - 0x8f, 0x00, 0x2e, 0x9d, 0x01, 0x2c, 0xab, 0xe4, 0xc1, 0x5a, 0x99, 0xc5, 0xc8, 0x39, 0xd0, 0x0b, - 0xca, 0x56, 0x95, 0x5a, 0x96, 0x67, 0x92, 0x14, 0x61, 0xd4, 0x25, 0xb8, 0x28, 0xc8, 0x6d, 0x26, - 0x43, 0x63, 0x60, 0x4e, 0xca, 0x85, 0xfc, 0x00, 0xa4, 0xea, 0x4a, 0xac, 0xd4, 0xb1, 0x0f, 0x6f, - 0x94, 0xd8, 0xaf, 0x94, 0xa1, 0x15, 0xc8, 0xa8, 0x48, 0x2c, 0x55, 0xa4, 0xc3, 0x7d, 0x0a, 0xa2, - 0x2e, 0xc2, 0x79, 0x41, 0xec, 0xc1, 0x80, 0xd5, 0x16, 0x4a, 0xf8, 0x0a, 0xc0, 0x85, 0x61, 0x48, - 0x29, 0xe0, 0x31, 0x9c, 0x28, 0x70, 0x6e, 0x49, 0x7e, 0xbe, 0x80, 0x7c, 0x3e, 0xa5, 0xe4, 0x8c, - 0x9c, 0xdc, 0x8e, 0xba, 0x01, 0x67, 0x3f, 0xe0, 0x01, 0xc1, 0xee, 0xc3, 0xc0, 0x26, 0x41, 0x9b, - 0xb1, 0xfd, 0x0f, 0x53, 0xf7, 0xee, 0xb9, 0x41, 0x7e, 0x5a, 0xc7, 0xb2, 0xd3, 0xaa, 0x7e, 0x0f, - 0x60, 0xbd, 0x2c, 0x87, 0xd4, 0xf0, 0x31, 0xbc, 0x22, 0x0f, 0x05, 0xf9, 0xca, 0xdd, 0xcd, 0xf2, - 0x96, 0xa7, 0x8a, 0x96, 0x3f, 0x43, 0x1e, 0x76, 0x3a, 0x5b, 0xc9, 0x42, 0x9a, 0x71, 0x77, 0xb5, - 0xf7, 0x8e, 0xc9, 0x7d, 0xa4, 0xc0, 0xf1, 0xd0, 0xc3, 0x7e, 0xb8, 0xc7, 0xb8, 0x78, 0x5f, 0xc6, - 0x8d, 0xfe, 0xfd, 0xda, 0xcf, 0x57, 0xe1, 0x73, 0xa2, 0xc8, 0xe8, 0x6b, 0x00, 0xc7, 0x7b, 0xe6, - 0x8a, 0x96, 0x0b, 0x6a, 0x56, 0x72, 0x42, 0x29, 0xcd, 0x32, 0xec, 0xe9, 0x23, 0x4a, 0x5d, 0xfa, - 0xf2, 0x8f, 0x7f, 0xbf, 0x1b, 0x7d, 0x15, 0xdd, 0xd4, 0x2b, 0x8e, 0x73, 0xfd, 0x33, 0x6a, 0x7f, - 0x8e, 0xbe, 0x01, 0xf0, 0xda, 0xc0, 0x29, 0x51, 0x4e, 0x28, 0x7f, 0x5c, 0x29, 0xb7, 0x86, 0x11, - 0x1a, 0x38, 0x76, 0xd4, 0xd7, 0x04, 0xa7, 0x3a, 0x9a, 0xa9, 0xe2, 0x84, 0x7e, 0x05, 0xb0, 0x56, - 0x66, 0x77, 0x68, 0xed, 0x5c, 0xde, 0x98, 0x72, 0xbc, 0x7d, 0x01, 0x3f, 0x55, 0xd7, 0x05, 0xd7, - 0x3b, 0xeb, 0x60, 0x59, 0xd5, 0xf5, 0xc2, 0xef, 0x11, 0xd3, 0x63, 0x36, 0x31, 0x39, 0x4b, 0xff, - 0xad, 0x01, 0x92, 0xbf, 0x03, 0x38, 0x53, 0xe5, 0x3c, 0xe8, 0x5e, 0x59, 0xd5, 0xce, 0xe0, 0x9b, - 0xca, 0x5b, 0x17, 0x0b, 0x96, 0xba, 0x16, 0x84, 0xae, 0x06, 0xaa, 0xeb, 0x95, 0xdf, 0x70, 0xe8, - 0x17, 0x00, 0xa7, 0x2b, 0x6c, 0x07, 0xad, 0x97, 0xb1, 0x18, 0x6e, 0x98, 0xca, 0xbd, 0x0b, 0xc5, - 0x4a, 0x01, 0xf3, 0x42, 0xc0, 0x1c, 0x9a, 0xad, 0xfc, 0xb0, 0x45, 0xbf, 0x01, 0xf8, 0x4a, 0xa9, - 0x99, 0xa1, 0xbb, 0x65, 0x0c, 0x86, 0x39, 0xa5, 0xf2, 0xe6, 0x05, 0x22, 0x25, 0x73, 0x4d, 0x30, - 0x6f, 0xa2, 0x05, 0xfd, 0x4c, 0x1f, 0xc3, 0xe8, 0x0b, 0x38, 0x55, 0xec, 0x63, 0x68, 0xa5, 0x80, - 0x44, 0xa5, 0x6d, 0x2a, 0xab, 0xe7, 0x88, 0x48, 0xe9, 0xae, 0x80, 0xcd, 0xd6, 0x93, 0xe3, 0x3a, - 0x78, 0x7a, 0x5c, 0x07, 0xff, 0x1c, 0xd7, 0xc1, 0xb7, 0x27, 0xf5, 0x91, 0xa7, 0x27, 0xf5, 0x91, - 0x3f, 0x4f, 0xea, 0x23, 0x9f, 0xbc, 0xd1, 0xa5, 0x7c, 0x2f, 0x6a, 0x6b, 0x16, 0x73, 0xb3, 0x62, - 0xe2, 0x3b, 0xaf, 0x0b, 0xc3, 0xd4, 0xfb, 0x2b, 0x87, 0xa9, 0x40, 0x7e, 0xe4, 0x93, 0xb0, 0x7d, - 0x59, 0x2c, 0xdf, 0xfe, 0x3f, 0x00, 0x00, 0xff, 0xff, 0x3b, 0xb1, 0xe8, 0xed, 0x27, 0x0d, 0x00, - 0x00, + // 1112 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x9c, 0x56, 0xcd, 0x4f, 0x24, 0x45, + 0x14, 0xa7, 0x00, 0x77, 0xe1, 0xb1, 0x6b, 0xb4, 0x58, 0xd8, 0x71, 0x80, 0x01, 0x5a, 0xf9, 0xdc, + 0xd8, 0x0d, 0xec, 0xc6, 0xac, 0xac, 0xd9, 0x04, 0x88, 0xa2, 0xc9, 0xe2, 0x62, 0xbb, 0xa2, 0xd1, + 0x4d, 0x3a, 0x35, 0xdd, 0xc5, 0x4c, 0x85, 0xee, 0xae, 0xa1, 0xbb, 0xa6, 0x03, 0x31, 0xc6, 0xc4, + 0x83, 0x17, 0x3d, 0x98, 0x78, 0xf0, 0xe0, 0xd1, 0xbb, 0xff, 0x81, 0x51, 0x6f, 0x7b, 0xdc, 0xc4, + 0xc4, 0x78, 0x30, 0xc6, 0x80, 0x67, 0xff, 0x86, 0x4d, 0x57, 0xd7, 0xcc, 0xce, 0xd0, 0x1f, 0x03, + 0x5c, 0x66, 0xba, 0x5f, 0xfd, 0xde, 0xeb, 0xdf, 0xfb, 0xa8, 0x5f, 0x15, 0x4c, 0x39, 0xc7, 0xce, + 0x51, 0x23, 0xe0, 0x82, 0xdb, 0xdc, 0x35, 0x6c, 0x97, 0x57, 0x8d, 0xc3, 0x26, 0x0d, 0x8e, 0x75, + 0x69, 0xc3, 0x2f, 0x77, 0x2e, 0xeb, 0xf1, 0x72, 0xf9, 0x46, 0x8d, 0xd7, 0xb8, 0x34, 0x19, 0xf1, + 0x53, 0x02, 0x2c, 0x4f, 0xd6, 0x38, 0xaf, 0xb9, 0xd4, 0x20, 0x0d, 0x66, 0x10, 0xdf, 0xe7, 0x82, + 0x08, 0xc6, 0xfd, 0x50, 0xad, 0x2e, 0xdb, 0x3c, 0xf4, 0x78, 0x68, 0x54, 0x49, 0x48, 0x93, 0xf8, + 0x46, 0xb4, 0x5a, 0xa5, 0x82, 0xac, 0x1a, 0x0d, 0x52, 0x63, 0xbe, 0x04, 0x2b, 0xac, 0x91, 0x66, + 0x54, 0x75, 0xb9, 0x7d, 0x60, 0x05, 0x44, 0x50, 0xcb, 0x65, 0x1e, 0x13, 0x96, 0xcd, 0xfd, 0x7d, + 0x56, 0x53, 0x0e, 0xb3, 0x69, 0x87, 0xf8, 0xc7, 0x6a, 0x10, 0x16, 0x28, 0xc8, 0x4a, 0x1a, 0x42, + 0x0f, 0x9b, 0x4c, 0x1c, 0x5b, 0x82, 0xd1, 0x20, 0x2b, 0xe8, 0xad, 0xb4, 0x87, 0xcb, 0x0e, 0x9b, + 0xcc, 0x49, 0xf2, 0xea, 0x06, 0x4f, 0xa4, 0xc1, 0x1e, 0x8d, 0xd4, 0xe2, 0xfd, 0xae, 0x45, 0xe6, + 0x3b, 0xf4, 0x88, 0x06, 0x06, 0xdf, 0xdf, 0xb7, 0xec, 0x3a, 0x61, 0xbe, 0xd5, 0x6c, 0x38, 0x44, + 0xd0, 0x30, 0x6d, 0x49, 0xfc, 0xb5, 0x25, 0xb8, 0xf9, 0x41, 0x5c, 0xb1, 0x6d, 0x2a, 0xb6, 0x5c, + 0x5e, 0xdd, 0x25, 0x2c, 0x30, 0xe9, 0x61, 0x93, 0x86, 0x02, 0xbf, 0x08, 0xfd, 0xcc, 0x29, 0xa1, + 0x19, 0xb4, 0x78, 0xdd, 0xec, 0x67, 0x8e, 0xf6, 0x31, 0x8c, 0x49, 0xe8, 0x73, 0x5c, 0xd8, 0xe0, + 0x7e, 0x48, 0xf1, 0x7d, 0x18, 0x6e, 0x97, 0x44, 0xe2, 0x47, 0xd6, 0x26, 0xf4, 0x54, 0x6b, 0xf5, + 0x96, 0xdf, 0xe6, 0xe0, 0x93, 0x7f, 0xa6, 0xfb, 0xcc, 0x21, 0x5b, 0xbd, 0x6b, 0x44, 0x71, 0xd8, + 0x70, 0xdd, 0xb3, 0x1c, 0xde, 0x01, 0x78, 0xde, 0x42, 0x15, 0x7b, 0x5e, 0x4f, 0xfa, 0xad, 0xc7, + 0xfd, 0xd6, 0x93, 0x79, 0x52, 0xfd, 0xd6, 0x77, 0x49, 0x8d, 0x2a, 0x5f, 0xb3, 0xc3, 0x53, 0xfb, + 0x09, 0x41, 0xa9, 0x8b, 0xfc, 0x86, 0xeb, 0xe6, 0xf1, 0x1f, 0xb8, 0x20, 0x7f, 0xbc, 0xdd, 0x45, + 0xb2, 0x5f, 0x92, 0x5c, 0xe8, 0x49, 0x32, 0xf9, 0x78, 0x17, 0xcb, 0xbf, 0x11, 0x4c, 0xef, 0xd0, + 0xe8, 0x7d, 0xee, 0xd0, 0x47, 0x3c, 0xfe, 0xdd, 0x22, 0xae, 0xdd, 0x74, 0xe5, 0x62, 0xab, 0x22, + 0x8f, 0x61, 0x3c, 0x19, 0xd8, 0x46, 0xc0, 0x1b, 0x3c, 0xa4, 0x81, 0xe5, 0x11, 0x61, 0xd7, 0x69, + 0xd8, 0xae, 0x4e, 0x9a, 0xf9, 0x1e, 0x71, 0xe3, 0xd1, 0xe2, 0xc1, 0x0e, 0x8d, 0x76, 0x12, 0xb4, + 0x79, 0x43, 0x46, 0xd9, 0x55, 0x41, 0x94, 0x15, 0x7f, 0x06, 0x63, 0x51, 0x0b, 0x6c, 0x79, 0x34, + 0xb2, 0x3c, 0x2a, 0x02, 0x66, 0x87, 0xed, 0xac, 0xd2, 0xc1, 0xbb, 0x08, 0xef, 0x24, 0x70, 0x73, + 0x34, 0xea, 0xfc, 0x64, 0x62, 0xd4, 0xfe, 0x47, 0x30, 0x93, 0x9f, 0x9e, 0x6a, 0x46, 0x0d, 0xae, + 0x06, 0x34, 0x6c, 0xba, 0x22, 0x54, 0xad, 0xd8, 0xee, 0xf5, 0xcd, 0x8c, 0x28, 0x31, 0x60, 0xc3, + 0x77, 0xf6, 0xb8, 0xdb, 0xf4, 0xe8, 0x2e, 0x0d, 0xe2, 0xd6, 0xa9, 0xb6, 0xb5, 0xa2, 0x97, 0x09, + 0x8c, 0x66, 0xa0, 0xf0, 0x0c, 0x5c, 0x6b, 0x0f, 0x83, 0xd5, 0x9e, 0x7f, 0x68, 0x35, 0xfb, 0x3d, + 0x07, 0xbf, 0x04, 0x03, 0x1e, 0x8d, 0x64, 0x45, 0xfa, 0xcd, 0xf8, 0x11, 0x8f, 0xc3, 0x95, 0x48, + 0x06, 0x29, 0x0d, 0xcc, 0xa0, 0xc5, 0x41, 0x53, 0xbd, 0x69, 0xcb, 0xb0, 0x28, 0x87, 0xee, 0x6d, + 0xa9, 0x06, 0x8f, 0x18, 0x0d, 0x1e, 0xc4, 0x5a, 0xb0, 0x25, 0x77, 0x77, 0x33, 0xe8, 0xec, 0xab, + 0xf6, 0x23, 0x82, 0xa5, 0x73, 0x80, 0x55, 0x95, 0x7c, 0x28, 0xe5, 0x49, 0x8c, 0x9a, 0x03, 0x23, + 0xa3, 0x6c, 0x45, 0xa1, 0x55, 0x79, 0xc6, 0x68, 0x16, 0x46, 0x5b, 0x82, 0x05, 0x49, 0x6e, 0x33, + 0x1e, 0x1a, 0x93, 0x08, 0x9a, 0x9f, 0xc8, 0x0f, 0x48, 0x65, 0x5d, 0x88, 0x55, 0x79, 0x1c, 0xc0, + 0xcd, 0x1c, 0xf9, 0x55, 0x69, 0xe8, 0x19, 0x69, 0x14, 0x04, 0x56, 0x59, 0x24, 0xc3, 0x7d, 0x06, + 0xa2, 0x2d, 0xc0, 0x9c, 0x24, 0xf6, 0xa0, 0x43, 0x6a, 0x33, 0x53, 0xf8, 0x1a, 0xc1, 0x7c, 0x2f, + 0xa4, 0x4a, 0xe0, 0x31, 0x8c, 0x66, 0x28, 0xb7, 0x22, 0x3f, 0x97, 0x41, 0x3e, 0x1d, 0x52, 0x71, + 0xc6, 0x6e, 0x6a, 0x45, 0xdb, 0x80, 0xa9, 0x0f, 0x45, 0x40, 0x89, 0xf7, 0x30, 0x70, 0x68, 0x50, + 0xe5, 0xfc, 0xe0, 0xa3, 0x44, 0xbd, 0x5b, 0x6a, 0x90, 0x9e, 0xd6, 0x81, 0xee, 0x69, 0xd5, 0xfe, + 0x44, 0x50, 0xc9, 0x8b, 0xa1, 0x72, 0xf8, 0x04, 0xae, 0xaa, 0x43, 0x41, 0x6d, 0xb9, 0xbb, 0xdd, + 0xbc, 0xd5, 0xa9, 0xa2, 0xa7, 0xcf, 0x90, 0x87, 0xfb, 0xfb, 0x5b, 0xb1, 0x21, 0x89, 0xb8, 0xb7, + 0xda, 0xda, 0x63, 0x6a, 0x1d, 0x97, 0x61, 0x28, 0xf4, 0x49, 0x23, 0xac, 0x73, 0x21, 0xf7, 0xcb, + 0x90, 0xd9, 0x7e, 0xc7, 0xb3, 0x70, 0x2d, 0x69, 0x7d, 0x9d, 0xb2, 0x5a, 0x5d, 0xc8, 0xad, 0x73, + 0xdd, 0x1c, 0x91, 0xb6, 0x77, 0xa5, 0x09, 0x4f, 0xc0, 0x30, 0x3d, 0xa2, 0xb6, 0xe5, 0x71, 0x87, + 0x96, 0x06, 0xe5, 0xfa, 0x50, 0x6c, 0xd8, 0xe1, 0x0e, 0x5d, 0xfb, 0x79, 0x18, 0x5e, 0x90, 0x4d, + 0xc2, 0xdf, 0x20, 0x18, 0x6a, 0x89, 0x33, 0x5e, 0xce, 0xa8, 0x79, 0xce, 0x09, 0x57, 0x5e, 0xcc, + 0xc3, 0x9e, 0x3d, 0xe2, 0xb4, 0xa5, 0xaf, 0xfe, 0xf8, 0xef, 0xfb, 0xfe, 0x57, 0xf1, 0xac, 0x51, + 0x70, 0x1d, 0x30, 0x3e, 0x67, 0xce, 0x17, 0xf8, 0x5b, 0x04, 0x23, 0x1d, 0xa7, 0x4c, 0x3e, 0xa1, + 0xf4, 0x71, 0x57, 0xbe, 0xd5, 0x8b, 0x50, 0xc7, 0xb1, 0xa5, 0xbd, 0x26, 0x39, 0x55, 0xf0, 0x64, + 0x11, 0x27, 0xfc, 0x2b, 0x82, 0x52, 0x9e, 0x5c, 0xe2, 0xb5, 0x0b, 0x69, 0x6b, 0xc2, 0xf1, 0xf6, + 0x25, 0xf4, 0x58, 0x5b, 0x97, 0x5c, 0xef, 0xac, 0xa3, 0x65, 0xcd, 0x30, 0x32, 0xef, 0x33, 0x96, + 0xcf, 0x1d, 0x6a, 0x09, 0x9e, 0xfc, 0xdb, 0x1d, 0x24, 0x7f, 0x47, 0x30, 0x59, 0xa4, 0x5c, 0xf8, + 0x5e, 0x5e, 0xd5, 0xce, 0xa1, 0xbb, 0xe5, 0xb7, 0x2e, 0xe7, 0xac, 0xf2, 0x9a, 0x97, 0x79, 0xcd, + 0xe0, 0x8a, 0x51, 0x78, 0x07, 0xc4, 0xbf, 0x20, 0x98, 0x28, 0x90, 0x2d, 0xbc, 0x9e, 0xc7, 0xa2, + 0xb7, 0xe0, 0x96, 0xef, 0x5d, 0xca, 0x57, 0x25, 0x30, 0x27, 0x13, 0x98, 0xc6, 0x53, 0x85, 0x17, + 0x63, 0xfc, 0x1b, 0x82, 0x57, 0x72, 0xc5, 0x10, 0xdf, 0xcd, 0x63, 0xd0, 0x4b, 0x69, 0xcb, 0x6f, + 0x5e, 0xc2, 0x53, 0x31, 0xd7, 0x25, 0xf3, 0x45, 0x3c, 0x6f, 0x9c, 0xeb, 0x32, 0x8d, 0xbf, 0x84, + 0xf1, 0x6c, 0x1d, 0xc4, 0x2b, 0x19, 0x24, 0x0a, 0x65, 0xb7, 0xbc, 0x7a, 0x01, 0x8f, 0x84, 0xee, + 0x0a, 0xda, 0xdc, 0x7d, 0x72, 0x52, 0x41, 0x4f, 0x4f, 0x2a, 0xe8, 0xdf, 0x93, 0x0a, 0xfa, 0xee, + 0xb4, 0xd2, 0xf7, 0xf4, 0xb4, 0xd2, 0xf7, 0xd7, 0x69, 0xa5, 0xef, 0xd3, 0x37, 0x6a, 0x4c, 0xd4, + 0x9b, 0x55, 0xdd, 0xe6, 0x5e, 0x77, 0x32, 0xd1, 0x9d, 0xd7, 0xa5, 0xe0, 0x1a, 0x6d, 0xcb, 0x51, + 0x92, 0xa0, 0x38, 0x6e, 0xd0, 0xb0, 0x7a, 0x45, 0x9a, 0x6f, 0x3f, 0x0b, 0x00, 0x00, 0xff, 0xff, + 0xf1, 0xed, 0x1e, 0x02, 0x67, 0x0d, 0x00, 0x00, } // Reference imports to suppress errors if they are not otherwise used. @@ -1687,6 +1708,16 @@ func (m *StreamOrderbookUpdatesResponse) MarshalToSizedBuffer(dAtA []byte) (int, _ = i var l int _ = l + if m.ExecMode != 0 { + i = encodeVarintQuery(dAtA, i, uint64(m.ExecMode)) + i-- + dAtA[i] = 0x20 + } + if m.BlockHeight != 0 { + i = encodeVarintQuery(dAtA, i, uint64(m.BlockHeight)) + i-- + dAtA[i] = 0x18 + } if m.Snapshot { i-- if m.Snapshot { @@ -1921,6 +1952,12 @@ func (m *StreamOrderbookUpdatesResponse) Size() (n int) { if m.Snapshot { n += 2 } + if m.BlockHeight != 0 { + n += 1 + sovQuery(uint64(m.BlockHeight)) + } + if m.ExecMode != 0 { + n += 1 + sovQuery(uint64(m.ExecMode)) + } return n } @@ -3201,6 +3238,44 @@ func (m *StreamOrderbookUpdatesResponse) Unmarshal(dAtA []byte) error { } } m.Snapshot = bool(v != 0) + case 3: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field BlockHeight", wireType) + } + m.BlockHeight = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowQuery + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.BlockHeight |= uint32(b&0x7F) << shift + if b < 0x80 { + break + } + } + case 4: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field ExecMode", wireType) + } + m.ExecMode = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowQuery + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.ExecMode |= uint32(b&0x7F) << shift + if b < 0x80 { + break + } + } default: iNdEx = preIndex skippy, err := skipQuery(dAtA[iNdEx:])