diff --git a/indexer/packages/v4-protos/src/codegen/dydxprotocol/clob/query.rpc.Query.ts b/indexer/packages/v4-protos/src/codegen/dydxprotocol/clob/query.rpc.Query.ts index 03e7723538..5f49481ef3 100644 --- a/indexer/packages/v4-protos/src/codegen/dydxprotocol/clob/query.rpc.Query.ts +++ b/indexer/packages/v4-protos/src/codegen/dydxprotocol/clob/query.rpc.Query.ts @@ -25,7 +25,10 @@ export interface Query { /** Queries the stateful order for a given order id. */ statefulOrder(request: QueryStatefulOrderRequest): Promise; - /** Streams orderbook updates. */ + /** + * Streams orderbook updates. Updates contain orderbook data + * such as order placements, updates, and fills. + */ streamOrderbookUpdates(request: StreamOrderbookUpdatesRequest): Promise; } diff --git a/indexer/packages/v4-protos/src/codegen/dydxprotocol/clob/query.ts b/indexer/packages/v4-protos/src/codegen/dydxprotocol/clob/query.ts index 938280147a..50fe19d57f 100644 --- a/indexer/packages/v4-protos/src/codegen/dydxprotocol/clob/query.ts +++ b/indexer/packages/v4-protos/src/codegen/dydxprotocol/clob/query.ts @@ -1,11 +1,12 @@ import { PageRequest, PageRequestSDKType, PageResponse, PageResponseSDKType } from "../../cosmos/base/query/v1beta1/pagination"; import { ValidatorMevMatches, ValidatorMevMatchesSDKType, MevNodeToNodeMetrics, MevNodeToNodeMetricsSDKType } from "./mev"; -import { OrderId, OrderIdSDKType, LongTermOrderPlacement, LongTermOrderPlacementSDKType } from "./order"; +import { OrderId, OrderIdSDKType, LongTermOrderPlacement, LongTermOrderPlacementSDKType, Order, OrderSDKType } from "./order"; import { ClobPair, ClobPairSDKType } from "./clob_pair"; import { EquityTierLimitConfiguration, EquityTierLimitConfigurationSDKType } from "./equity_tier_limit_config"; import { BlockRateLimitConfiguration, BlockRateLimitConfigurationSDKType } from "./block_rate_limit_config"; import { LiquidationsConfig, LiquidationsConfigSDKType } from "./liquidations_config"; import { OffChainUpdateV1, OffChainUpdateV1SDKType } from "../indexer/off_chain_updates/off_chain_updates"; +import { ClobMatch, ClobMatchSDKType } from "./matches"; import * as _m0 from "protobufjs/minimal"; import { DeepPartial, Long } from "../../helpers"; /** QueryGetClobPairRequest is request type for the ClobPair method. */ @@ -267,15 +268,7 @@ export interface StreamOrderbookUpdatesRequestSDKType { export interface StreamOrderbookUpdatesResponse { /** Orderbook updates for the clob pair. */ - updates: OffChainUpdateV1[]; - /** - * Snapshot indicates if the response is from a snapshot of the orderbook. - * This is true for the initial response and false for all subsequent updates. - * Note that if the snapshot is true, then all previous entries should be - * discarded and the orderbook should be resynced. - */ - - snapshot: boolean; + updates: StreamUpdate[]; /** * ---Additional fields used to debug issues--- * Block height of the updates. @@ -293,6 +286,65 @@ export interface StreamOrderbookUpdatesResponse { export interface StreamOrderbookUpdatesResponseSDKType { /** Orderbook updates for the clob pair. */ + updates: StreamUpdateSDKType[]; + /** + * ---Additional fields used to debug issues--- + * Block height of the updates. + */ + + block_height: number; + /** Exec mode of the updates. */ + + exec_mode: number; +} +/** + * StreamUpdate is an update that will be pushed through the + * GRPC stream. + */ + +export interface StreamUpdate { + orderbookUpdate?: StreamOrderbookUpdate; + orderFill?: StreamOrderbookFill; +} +/** + * StreamUpdate is an update that will be pushed through the + * GRPC stream. + */ + +export interface StreamUpdateSDKType { + orderbook_update?: StreamOrderbookUpdateSDKType; + order_fill?: StreamOrderbookFillSDKType; +} +/** + * StreamOrderbookUpdate provides information on an orderbook update. Used in + * the full node GRPC stream. + */ + +export interface StreamOrderbookUpdate { + /** + * Orderbook updates for the clob pair. Can contain order place, removals, + * or updates. + */ + updates: OffChainUpdateV1[]; + /** + * Snapshot indicates if the response is from a snapshot of the orderbook. + * This is true for the initial response and false for all subsequent updates. + * Note that if the snapshot is true, then all previous entries should be + * discarded and the orderbook should be resynced. + */ + + snapshot: boolean; +} +/** + * StreamOrderbookUpdate provides information on an orderbook update. Used in + * the full node GRPC stream. + */ + +export interface StreamOrderbookUpdateSDKType { + /** + * Orderbook updates for the clob pair. Can contain order place, removals, + * or updates. + */ updates: OffChainUpdateV1SDKType[]; /** * Snapshot indicates if the response is from a snapshot of the orderbook. @@ -302,15 +354,48 @@ export interface StreamOrderbookUpdatesResponseSDKType { */ snapshot: boolean; +} +/** + * StreamOrderbookFill provides information on an orderbook fill. Used in + * the full node GRPC stream. + */ + +export interface StreamOrderbookFill { /** - * ---Additional fields used to debug issues--- - * Block height of the updates. + * Clob match. Provides information on which orders were matched + * and the type of order. Fill amounts here are relative. + */ + clobMatch?: ClobMatch; + /** + * All orders involved in the specified clob match. Used to look up + * price of a match through a given maker order id. */ - block_height: number; - /** Exec mode of the updates. */ + orders: Order[]; + /** Resulting fill amounts for each order in the orders array. */ - exec_mode: number; + fillAmounts: Long[]; +} +/** + * StreamOrderbookFill provides information on an orderbook fill. Used in + * the full node GRPC stream. + */ + +export interface StreamOrderbookFillSDKType { + /** + * Clob match. Provides information on which orders were matched + * and the type of order. Fill amounts here are relative. + */ + clob_match?: ClobMatchSDKType; + /** + * All orders involved in the specified clob match. Used to look up + * price of a match through a given maker order id. + */ + + orders: OrderSDKType[]; + /** Resulting fill amounts for each order in the orders array. */ + + fill_amounts: Long[]; } function createBaseQueryGetClobPairRequest(): QueryGetClobPairRequest { @@ -1075,7 +1160,6 @@ export const StreamOrderbookUpdatesRequest = { function createBaseStreamOrderbookUpdatesResponse(): StreamOrderbookUpdatesResponse { return { updates: [], - snapshot: false, blockHeight: 0, execMode: 0 }; @@ -1084,19 +1168,15 @@ function createBaseStreamOrderbookUpdatesResponse(): StreamOrderbookUpdatesRespo export const StreamOrderbookUpdatesResponse = { encode(message: StreamOrderbookUpdatesResponse, writer: _m0.Writer = _m0.Writer.create()): _m0.Writer { for (const v of message.updates) { - OffChainUpdateV1.encode(v!, writer.uint32(10).fork()).ldelim(); - } - - if (message.snapshot === true) { - writer.uint32(16).bool(message.snapshot); + StreamUpdate.encode(v!, writer.uint32(10).fork()).ldelim(); } if (message.blockHeight !== 0) { - writer.uint32(24).uint32(message.blockHeight); + writer.uint32(16).uint32(message.blockHeight); } if (message.execMode !== 0) { - writer.uint32(32).uint32(message.execMode); + writer.uint32(24).uint32(message.execMode); } return writer; @@ -1112,18 +1192,14 @@ export const StreamOrderbookUpdatesResponse = { switch (tag >>> 3) { case 1: - message.updates.push(OffChainUpdateV1.decode(reader, reader.uint32())); + message.updates.push(StreamUpdate.decode(reader, reader.uint32())); break; case 2: - message.snapshot = reader.bool(); - break; - - case 3: message.blockHeight = reader.uint32(); break; - case 4: + case 3: message.execMode = reader.uint32(); break; @@ -1138,11 +1214,197 @@ export const StreamOrderbookUpdatesResponse = { fromPartial(object: DeepPartial): StreamOrderbookUpdatesResponse { const message = createBaseStreamOrderbookUpdatesResponse(); - message.updates = object.updates?.map(e => OffChainUpdateV1.fromPartial(e)) || []; - message.snapshot = object.snapshot ?? false; + message.updates = object.updates?.map(e => StreamUpdate.fromPartial(e)) || []; message.blockHeight = object.blockHeight ?? 0; message.execMode = object.execMode ?? 0; return message; } +}; + +function createBaseStreamUpdate(): StreamUpdate { + return { + orderbookUpdate: undefined, + orderFill: undefined + }; +} + +export const StreamUpdate = { + encode(message: StreamUpdate, writer: _m0.Writer = _m0.Writer.create()): _m0.Writer { + if (message.orderbookUpdate !== undefined) { + StreamOrderbookUpdate.encode(message.orderbookUpdate, writer.uint32(10).fork()).ldelim(); + } + + if (message.orderFill !== undefined) { + StreamOrderbookFill.encode(message.orderFill, writer.uint32(18).fork()).ldelim(); + } + + return writer; + }, + + decode(input: _m0.Reader | Uint8Array, length?: number): StreamUpdate { + const reader = input instanceof _m0.Reader ? input : new _m0.Reader(input); + let end = length === undefined ? reader.len : reader.pos + length; + const message = createBaseStreamUpdate(); + + while (reader.pos < end) { + const tag = reader.uint32(); + + switch (tag >>> 3) { + case 1: + message.orderbookUpdate = StreamOrderbookUpdate.decode(reader, reader.uint32()); + break; + + case 2: + message.orderFill = StreamOrderbookFill.decode(reader, reader.uint32()); + break; + + default: + reader.skipType(tag & 7); + break; + } + } + + return message; + }, + + fromPartial(object: DeepPartial): StreamUpdate { + const message = createBaseStreamUpdate(); + message.orderbookUpdate = object.orderbookUpdate !== undefined && object.orderbookUpdate !== null ? StreamOrderbookUpdate.fromPartial(object.orderbookUpdate) : undefined; + message.orderFill = object.orderFill !== undefined && object.orderFill !== null ? StreamOrderbookFill.fromPartial(object.orderFill) : undefined; + return message; + } + +}; + +function createBaseStreamOrderbookUpdate(): StreamOrderbookUpdate { + return { + updates: [], + snapshot: false + }; +} + +export const StreamOrderbookUpdate = { + encode(message: StreamOrderbookUpdate, writer: _m0.Writer = _m0.Writer.create()): _m0.Writer { + for (const v of message.updates) { + OffChainUpdateV1.encode(v!, writer.uint32(10).fork()).ldelim(); + } + + if (message.snapshot === true) { + writer.uint32(16).bool(message.snapshot); + } + + return writer; + }, + + decode(input: _m0.Reader | Uint8Array, length?: number): StreamOrderbookUpdate { + const reader = input instanceof _m0.Reader ? input : new _m0.Reader(input); + let end = length === undefined ? reader.len : reader.pos + length; + const message = createBaseStreamOrderbookUpdate(); + + while (reader.pos < end) { + const tag = reader.uint32(); + + switch (tag >>> 3) { + case 1: + message.updates.push(OffChainUpdateV1.decode(reader, reader.uint32())); + break; + + case 2: + message.snapshot = reader.bool(); + break; + + default: + reader.skipType(tag & 7); + break; + } + } + + return message; + }, + + fromPartial(object: DeepPartial): StreamOrderbookUpdate { + const message = createBaseStreamOrderbookUpdate(); + message.updates = object.updates?.map(e => OffChainUpdateV1.fromPartial(e)) || []; + message.snapshot = object.snapshot ?? false; + return message; + } + +}; + +function createBaseStreamOrderbookFill(): StreamOrderbookFill { + return { + clobMatch: undefined, + orders: [], + fillAmounts: [] + }; +} + +export const StreamOrderbookFill = { + encode(message: StreamOrderbookFill, writer: _m0.Writer = _m0.Writer.create()): _m0.Writer { + if (message.clobMatch !== undefined) { + ClobMatch.encode(message.clobMatch, writer.uint32(10).fork()).ldelim(); + } + + for (const v of message.orders) { + Order.encode(v!, writer.uint32(18).fork()).ldelim(); + } + + writer.uint32(26).fork(); + + for (const v of message.fillAmounts) { + writer.uint64(v); + } + + writer.ldelim(); + return writer; + }, + + decode(input: _m0.Reader | Uint8Array, length?: number): StreamOrderbookFill { + const reader = input instanceof _m0.Reader ? input : new _m0.Reader(input); + let end = length === undefined ? reader.len : reader.pos + length; + const message = createBaseStreamOrderbookFill(); + + while (reader.pos < end) { + const tag = reader.uint32(); + + switch (tag >>> 3) { + case 1: + message.clobMatch = ClobMatch.decode(reader, reader.uint32()); + break; + + case 2: + message.orders.push(Order.decode(reader, reader.uint32())); + break; + + case 3: + if ((tag & 7) === 2) { + const end2 = reader.uint32() + reader.pos; + + while (reader.pos < end2) { + message.fillAmounts.push((reader.uint64() as Long)); + } + } else { + message.fillAmounts.push((reader.uint64() as Long)); + } + + break; + + default: + reader.skipType(tag & 7); + break; + } + } + + return message; + }, + + fromPartial(object: DeepPartial): StreamOrderbookFill { + const message = createBaseStreamOrderbookFill(); + message.clobMatch = object.clobMatch !== undefined && object.clobMatch !== null ? ClobMatch.fromPartial(object.clobMatch) : undefined; + message.orders = object.orders?.map(e => Order.fromPartial(e)) || []; + message.fillAmounts = object.fillAmounts?.map(e => Long.fromValue(e)) || []; + return message; + } + }; \ No newline at end of file diff --git a/proto/dydxprotocol/clob/query.proto b/proto/dydxprotocol/clob/query.proto index 5151a838c6..cfed065ba0 100644 --- a/proto/dydxprotocol/clob/query.proto +++ b/proto/dydxprotocol/clob/query.proto @@ -7,9 +7,10 @@ import "cosmos/base/query/v1beta1/pagination.proto"; import "dydxprotocol/clob/block_rate_limit_config.proto"; import "dydxprotocol/clob/clob_pair.proto"; import "dydxprotocol/clob/equity_tier_limit_config.proto"; +import "dydxprotocol/clob/order.proto"; +import "dydxprotocol/clob/matches.proto"; import "dydxprotocol/clob/liquidations_config.proto"; import "dydxprotocol/clob/mev.proto"; -import "dydxprotocol/clob/order.proto"; import "dydxprotocol/indexer/off_chain_updates/off_chain_updates.proto"; option go_package = "github.com/dydxprotocol/v4-chain/protocol/x/clob/types"; @@ -59,7 +60,8 @@ service Query { // GRPC Streams - // Streams orderbook updates. + // Streams orderbook updates. Updates contain orderbook data + // such as order placements, updates, and fills. rpc StreamOrderbookUpdates(StreamOrderbookUpdatesRequest) returns (stream StreamOrderbookUpdatesResponse); } @@ -169,6 +171,32 @@ message StreamOrderbookUpdatesRequest { // StreamOrderbookUpdates method. message StreamOrderbookUpdatesResponse { // Orderbook updates for the clob pair. + repeated StreamUpdate updates = 1 [ (gogoproto.nullable) = false ]; + + // ---Additional fields used to debug issues--- + // Block height of the updates. + uint32 block_height = 2; + + // Exec mode of the updates. + uint32 exec_mode = 3; +} + +// StreamUpdate is an update that will be pushed through the +// GRPC stream. +message StreamUpdate { + // Contains one of an StreamOrderbookUpdate, + // StreamOrderbookFill. + oneof update_message { + StreamOrderbookUpdate orderbook_update = 1; + StreamOrderbookFill order_fill = 2; + } +} + +// StreamOrderbookUpdate provides information on an orderbook update. Used in +// the full node GRPC stream. +message StreamOrderbookUpdate { + // Orderbook updates for the clob pair. Can contain order place, removals, + // or updates. repeated dydxprotocol.indexer.off_chain_updates.OffChainUpdateV1 updates = 1 [ (gogoproto.nullable) = false ]; @@ -177,11 +205,19 @@ message StreamOrderbookUpdatesResponse { // Note that if the snapshot is true, then all previous entries should be // discarded and the orderbook should be resynced. bool snapshot = 2; +} - // ---Additional fields used to debug issues--- - // Block height of the updates. - uint32 block_height = 3; +// StreamOrderbookFill provides information on an orderbook fill. Used in +// the full node GRPC stream. +message StreamOrderbookFill { + // Clob match. Provides information on which orders were matched + // and the type of order. Fill amounts here are relative. + ClobMatch clob_match = 1; - // Exec mode of the updates. - uint32 exec_mode = 4; + // All orders involved in the specified clob match. Used to look up + // price of a match through a given maker order id. + repeated Order orders = 2 [ (gogoproto.nullable) = false ]; + + // Resulting fill amounts for each order in the orders array. + repeated uint64 fill_amounts = 3 [ (gogoproto.nullable) = false ]; } diff --git a/protocol/app/app.go b/protocol/app/app.go index 679f4e3e73..8bea8372c2 100644 --- a/protocol/app/app.go +++ b/protocol/app/app.go @@ -1930,7 +1930,7 @@ func getGrpcStreamingManagerFromOptions( ) (manager streamingtypes.GrpcStreamingManager) { if appFlags.GrpcStreamingEnabled { logger.Info("GRPC streaming is enabled") - return streaming.NewGrpcStreamingManager() + return streaming.NewGrpcStreamingManager(logger) } return streaming.NewNoopGrpcStreamingManager() } diff --git a/protocol/lib/collections.go b/protocol/lib/collections.go index 5f57e7ff42..b2a5f11b70 100644 --- a/protocol/lib/collections.go +++ b/protocol/lib/collections.go @@ -106,3 +106,15 @@ func MergeAllMapsMustHaveDistinctKeys[K comparable, V any](maps ...map[K]V) map[ } return combinedMap } + +// MergeMaps merges all the maps into a single map. +// Does not require maps to have distinct keys. +func MergeMaps[K comparable, V any](maps ...map[K]V) map[K]V { + combinedMap := make(map[K]V) + for _, m := range maps { + for k, v := range m { + combinedMap[k] = v + } + } + return combinedMap +} diff --git a/protocol/lib/metrics/metric_keys.go b/protocol/lib/metrics/metric_keys.go index 57f5c2f259..a9d2e36b4d 100644 --- a/protocol/lib/metrics/metric_keys.go +++ b/protocol/lib/metrics/metric_keys.go @@ -65,8 +65,13 @@ const ( GateWithdrawalsIfNegativeTncSubaccountSeenLatency = "gate_withdrawals_if_negative_tnc_subaccount_seen_latency" // Full node grpc - FullNodeGrpc = "full_node_grpc" - GrpcSendOrderbookUpdatesLatency = "grpc_send_orderbook_updates_latency" - EndBlocker = "end_blocker" - EndBlockerLag = "end_blocker_lag" + FullNodeGrpc = "full_node_grpc" + GrpcSendOrderbookUpdatesLatency = "grpc_send_orderbook_updates_latency" + GrpcSendOrderbookFillsLatency = "grpc_send_orderbook_fills_latency" + GrpcEmitProtocolUpdateCount = "grpc_emit_protocol_update_count" + GrpcSendResponseToSubscriberCount = "grpc_send_response_to_subscriber_count" + GrpcStreamSubscriberCount = "grpc_stream_subscriber_count" + + EndBlocker = "end_blocker" + EndBlockerLag = "end_blocker_lag" ) diff --git a/protocol/mocks/MemClob.go b/protocol/mocks/MemClob.go index 7f97e73382..aac6f81ee9 100644 --- a/protocol/mocks/MemClob.go +++ b/protocol/mocks/MemClob.go @@ -103,6 +103,24 @@ func (_m *MemClob) DeleverageSubaccount(ctx types.Context, subaccountId subaccou return r0, r1 } +// GenerateStreamOrderbookFill provides a mock function with given fields: ctx, clobMatch, takerOrder, makerOrders +func (_m *MemClob) GenerateStreamOrderbookFill(ctx types.Context, clobMatch clobtypes.ClobMatch, takerOrder clobtypes.MatchableOrder, makerOrders []clobtypes.Order) clobtypes.StreamOrderbookFill { + ret := _m.Called(ctx, clobMatch, takerOrder, makerOrders) + + if len(ret) == 0 { + panic("no return value specified for GenerateStreamOrderbookFill") + } + + var r0 clobtypes.StreamOrderbookFill + if rf, ok := ret.Get(0).(func(types.Context, clobtypes.ClobMatch, clobtypes.MatchableOrder, []clobtypes.Order) clobtypes.StreamOrderbookFill); ok { + r0 = rf(ctx, clobMatch, takerOrder, makerOrders) + } else { + r0 = ret.Get(0).(clobtypes.StreamOrderbookFill) + } + + return r0 +} + // GetCancelOrder provides a mock function with given fields: ctx, orderId func (_m *MemClob) GetCancelOrder(ctx types.Context, orderId clobtypes.OrderId) (uint32, bool) { ret := _m.Called(ctx, orderId) diff --git a/protocol/mocks/MemClobKeeper.go b/protocol/mocks/MemClobKeeper.go index 93128130d3..12eb88d42e 100644 --- a/protocol/mocks/MemClobKeeper.go +++ b/protocol/mocks/MemClobKeeper.go @@ -434,6 +434,11 @@ func (_m *MemClobKeeper) ReplayPlaceOrder(ctx types.Context, msg *clobtypes.MsgP return r0, r1, r2, r3 } +// SendOrderbookFillUpdates provides a mock function with given fields: ctx, orderbookFills +func (_m *MemClobKeeper) SendOrderbookFillUpdates(ctx types.Context, orderbookFills []clobtypes.StreamOrderbookFill) { + _m.Called(ctx, orderbookFills) +} + // SendOrderbookUpdates provides a mock function with given fields: ctx, offchainUpdates, snapshot func (_m *MemClobKeeper) SendOrderbookUpdates(ctx types.Context, offchainUpdates *clobtypes.OffchainUpdates, snapshot bool) { _m.Called(ctx, offchainUpdates, snapshot) diff --git a/protocol/mocks/PricesKeeper.go b/protocol/mocks/PricesKeeper.go index cd3f853e50..38711b9a2c 100644 --- a/protocol/mocks/PricesKeeper.go +++ b/protocol/mocks/PricesKeeper.go @@ -348,24 +348,6 @@ func (_m *PricesKeeper) InitializeCurrencyPairIdCache(ctx types.Context) { _m.Called(ctx) } -// IsCurrencyPairIdCacheInitialized provides a mock function with given fields: -func (_m *PricesKeeper) IsCurrencyPairIdCacheInitialized() bool { - ret := _m.Called() - - if len(ret) == 0 { - panic("no return value specified for IsCurrencyPairIdCacheInitialized") - } - - var r0 bool - if rf, ok := ret.Get(0).(func() bool); ok { - r0 = rf() - } else { - r0 = ret.Get(0).(bool) - } - - return r0 -} - // Logger provides a mock function with given fields: ctx func (_m *PricesKeeper) Logger(ctx types.Context) log.Logger { ret := _m.Called(ctx) diff --git a/protocol/scripts/genesis/sample_pregenesis.json b/protocol/scripts/genesis/sample_pregenesis.json index 946794f7ff..1b5f17551d 100644 --- a/protocol/scripts/genesis/sample_pregenesis.json +++ b/protocol/scripts/genesis/sample_pregenesis.json @@ -1836,7 +1836,7 @@ ] } }, - "app_version": "5.0.0-dev0-92-g65309d17", + "app_version": "5.0.0-dev0-155-g6e95ca92", "chain_id": "dydx-sample-1", "consensus": { "params": { diff --git a/protocol/streaming/grpc/grpc_streaming_manager.go b/protocol/streaming/grpc/grpc_streaming_manager.go index 60a00e7bdc..d60d5fb2f8 100644 --- a/protocol/streaming/grpc/grpc_streaming_manager.go +++ b/protocol/streaming/grpc/grpc_streaming_manager.go @@ -4,6 +4,7 @@ import ( "sync" "time" + "cosmossdk.io/log" sdk "github.com/cosmos/cosmos-sdk/types" "github.com/cosmos/gogoproto/proto" ocutypes "github.com/dydxprotocol/v4-chain/protocol/indexer/off_chain_updates/types" @@ -19,6 +20,8 @@ var _ types.GrpcStreamingManager = (*GrpcStreamingManagerImpl)(nil) type GrpcStreamingManagerImpl struct { sync.Mutex + logger log.Logger + // orderbookSubscriptions maps subscription IDs to their respective orderbook subscriptions. orderbookSubscriptions map[uint32]*OrderbookSubscription nextSubscriptionId uint32 @@ -36,8 +39,12 @@ type OrderbookSubscription struct { srv clobtypes.Query_StreamOrderbookUpdatesServer } -func NewGrpcStreamingManager() *GrpcStreamingManagerImpl { +func NewGrpcStreamingManager( + logger log.Logger, +) *GrpcStreamingManagerImpl { + logger = logger.With(log.ModuleKey, "grpc-streaming") return &GrpcStreamingManagerImpl{ + logger: logger, orderbookSubscriptions: make(map[uint32]*OrderbookSubscription), } } @@ -46,6 +53,13 @@ func (sm *GrpcStreamingManagerImpl) Enabled() bool { return true } +func (sm *GrpcStreamingManagerImpl) EmitMetrics() { + metrics.SetGauge( + metrics.GrpcStreamSubscriberCount, + float32(len(sm.orderbookSubscriptions)), + ) +} + // Subscribe subscribes to the orderbook updates stream. func (sm *GrpcStreamingManagerImpl) Subscribe( req clobtypes.StreamOrderbookUpdatesRequest, @@ -70,7 +84,7 @@ func (sm *GrpcStreamingManagerImpl) Subscribe( sm.orderbookSubscriptions[sm.nextSubscriptionId] = subscription sm.nextSubscriptionId++ - + sm.EmitMetrics() return nil } @@ -98,34 +112,103 @@ func (sm *GrpcStreamingManagerImpl) SendOrderbookUpdates( updates[clobPairId].Messages = append(updates[clobPairId].Messages, message) } - // Unmarshal messages to v1 updates. - v1updates := make(map[uint32][]ocutypes.OffChainUpdateV1) + // Unmarshal each per-clob pair message to v1 updates. + updatesByClobPairId := make(map[uint32][]clobtypes.StreamUpdate) for clobPairId, update := range updates { - v1update, err := GetOffchainUpdatesV1(update) + v1updates, err := GetOffchainUpdatesV1(update) if err != nil { panic(err) } - v1updates[clobPairId] = v1update + updatesByClobPairId[clobPairId] = []clobtypes.StreamUpdate{ + { + UpdateMessage: &clobtypes.StreamUpdate_OrderbookUpdate{ + OrderbookUpdate: &clobtypes.StreamOrderbookUpdate{ + Updates: v1updates, + Snapshot: snapshot, + }, + }, + }, + } } + sm.sendStreamUpdate( + updatesByClobPairId, + blockHeight, + execMode, + ) +} + +// SendOrderbookFillUpdates groups fills by their clob pair ids and +// sends messages to the subscribers. +func (sm *GrpcStreamingManagerImpl) SendOrderbookFillUpdates( + ctx sdk.Context, + orderbookFills []clobtypes.StreamOrderbookFill, + blockHeight uint32, + execMode sdk.ExecMode, +) { + defer metrics.ModuleMeasureSince( + metrics.FullNodeGrpc, + metrics.GrpcSendOrderbookFillsLatency, + time.Now(), + ) + + // Group fills by clob pair id. + updatesByClobPairId := make(map[uint32][]clobtypes.StreamUpdate) + for _, orderbookFill := range orderbookFills { + // Fetch the clob pair id from the first order in `OrderBookMatchFill`. + // We can assume there must be an order, and that all orders share the same + // clob pair id. + clobPairId := orderbookFill.Orders[0].OrderId.ClobPairId + if _, ok := updatesByClobPairId[clobPairId]; !ok { + updatesByClobPairId[clobPairId] = []clobtypes.StreamUpdate{} + } + streamUpdate := clobtypes.StreamUpdate{ + UpdateMessage: &clobtypes.StreamUpdate_OrderFill{ + OrderFill: &orderbookFill, + }, + } + updatesByClobPairId[clobPairId] = append(updatesByClobPairId[clobPairId], streamUpdate) + } + + sm.sendStreamUpdate( + updatesByClobPairId, + blockHeight, + execMode, + ) +} + +// sendStreamUpdate takes in a map of clob pair id to stream updates and emits them to subscribers. +func (sm *GrpcStreamingManagerImpl) sendStreamUpdate( + updatesByClobPairId map[uint32][]clobtypes.StreamUpdate, + blockHeight uint32, + execMode sdk.ExecMode, +) { + metrics.IncrCounter( + metrics.GrpcEmitProtocolUpdateCount, + 1, + ) + sm.Lock() defer sm.Unlock() // Send updates to subscribers. idsToRemove := make([]uint32, 0) for id, subscription := range sm.orderbookSubscriptions { - updatesToSend := make([]ocutypes.OffChainUpdateV1, 0) + streamUpdatesForSubscription := make([]clobtypes.StreamUpdate, 0) for _, clobPairId := range subscription.clobPairIds { - if updates, ok := v1updates[clobPairId]; ok { - updatesToSend = append(updatesToSend, updates...) + if update, ok := updatesByClobPairId[clobPairId]; ok { + streamUpdatesForSubscription = append(streamUpdatesForSubscription, update...) } } - if len(updatesToSend) > 0 { + if len(streamUpdatesForSubscription) > 0 { + metrics.IncrCounter( + metrics.GrpcSendResponseToSubscriberCount, + 1, + ) if err := subscription.srv.Send( &clobtypes.StreamOrderbookUpdatesResponse{ - Updates: updatesToSend, - Snapshot: snapshot, + Updates: streamUpdatesForSubscription, BlockHeight: blockHeight, ExecMode: uint32(execMode), }, @@ -140,6 +223,7 @@ func (sm *GrpcStreamingManagerImpl) SendOrderbookUpdates( for _, id := range idsToRemove { delete(sm.orderbookSubscriptions, id) } + sm.EmitMetrics() } // GetUninitializedClobPairIds returns the clob pair ids that have not been initialized. diff --git a/protocol/streaming/grpc/noop_streaming_manager.go b/protocol/streaming/grpc/noop_streaming_manager.go index 3de85deaec..424871b4c3 100644 --- a/protocol/streaming/grpc/noop_streaming_manager.go +++ b/protocol/streaming/grpc/noop_streaming_manager.go @@ -35,6 +35,14 @@ func (sm *NoopGrpcStreamingManager) SendOrderbookUpdates( ) { } +func (sm *NoopGrpcStreamingManager) SendOrderbookFillUpdates( + ctx sdk.Context, + orderbookFills []clobtypes.StreamOrderbookFill, + blockHeight uint32, + execMode sdk.ExecMode, +) { +} + func (sm *NoopGrpcStreamingManager) GetUninitializedClobPairIds() []uint32 { return []uint32{} } diff --git a/protocol/streaming/grpc/types/manager.go b/protocol/streaming/grpc/types/manager.go index 5d0d9f1ab4..9b5af0c093 100644 --- a/protocol/streaming/grpc/types/manager.go +++ b/protocol/streaming/grpc/types/manager.go @@ -22,4 +22,10 @@ type GrpcStreamingManager interface { blockHeight uint32, execMode sdk.ExecMode, ) + SendOrderbookFillUpdates( + ctx sdk.Context, + orderbookFills []clobtypes.StreamOrderbookFill, + blockHeight uint32, + execMode sdk.ExecMode, + ) } diff --git a/protocol/testutil/memclob/keeper.go b/protocol/testutil/memclob/keeper.go index ea2770a18c..4ded29afdb 100644 --- a/protocol/testutil/memclob/keeper.go +++ b/protocol/testutil/memclob/keeper.go @@ -518,3 +518,9 @@ func (f *FakeMemClobKeeper) SendOrderbookUpdates( snapshot bool, ) { } + +func (f *FakeMemClobKeeper) SendOrderbookFillUpdates( + ctx sdk.Context, + orderbookFills []types.StreamOrderbookFill, +) { +} diff --git a/protocol/x/clob/abci.go b/protocol/x/clob/abci.go index 7172a7c471..8efbfc71e1 100644 --- a/protocol/x/clob/abci.go +++ b/protocol/x/clob/abci.go @@ -175,45 +175,6 @@ func PrepareCheckState( offchainUpdates, ) - // For orders that are filled in the last block, send an orderbook update to the grpc streams. - if keeper.GetGrpcStreamingManager().Enabled() { - allUpdates := types.NewOffchainUpdates() - orderIdsToSend := make(map[types.OrderId]bool) - - // Send an update for reverted local operations. - for _, operation := range localValidatorOperationsQueue { - if match := operation.GetMatch(); match != nil { - // For normal order matches, we send an update for the taker and maker orders. - if matchedOrders := match.GetMatchOrders(); matchedOrders != nil { - orderIdsToSend[matchedOrders.TakerOrderId] = true - for _, fill := range matchedOrders.Fills { - orderIdsToSend[fill.MakerOrderId] = true - } - } - // For liquidation matches, we send an update for the maker orders. - if matchedLiquidation := match.GetMatchPerpetualLiquidation(); matchedLiquidation != nil { - for _, fill := range matchedLiquidation.Fills { - orderIdsToSend[fill.MakerOrderId] = true - } - } - } - } - - // Send an update for orders that were proposed. - for _, orderId := range processProposerMatchesEvents.OrderIdsFilledInLastBlock { - orderIdsToSend[orderId] = true - } - - // Send update. - for orderId := range orderIdsToSend { - if _, exists := keeper.MemClob.GetOrder(ctx, orderId); exists { - orderbookUpdate := keeper.MemClob.GetOrderbookUpdatesForOrderUpdate(ctx, orderId) - allUpdates.Append(orderbookUpdate) - } - } - keeper.SendOrderbookUpdates(ctx, allUpdates, false) - } - // 3. Place all stateful order placements included in the last block on the memclob. // Note telemetry is measured outside of the function call because `PlaceStatefulOrdersFromLastBlock` // is called within `PlaceConditionalOrdersTriggeredInLastBlock`. diff --git a/protocol/x/clob/keeper/keeper.go b/protocol/x/clob/keeper/keeper.go index 7ff636916f..3ce3963e47 100644 --- a/protocol/x/clob/keeper/keeper.go +++ b/protocol/x/clob/keeper/keeper.go @@ -292,3 +292,19 @@ func (k Keeper) SendOrderbookUpdates( ctx.ExecMode(), ) } + +// SendOrderbookFillUpdates sends the orderbook fills to the gRPC streaming manager. +func (k Keeper) SendOrderbookFillUpdates( + ctx sdk.Context, + orderbookFills []types.StreamOrderbookFill, +) { + if len(orderbookFills) == 0 { + return + } + k.GetGrpcStreamingManager().SendOrderbookFillUpdates( + ctx, + orderbookFills, + lib.MustConvertIntegerToUint32(ctx.BlockHeight()), + ctx.ExecMode(), + ) +} diff --git a/protocol/x/clob/keeper/order_state.go b/protocol/x/clob/keeper/order_state.go index a4bd805689..a3266c48d7 100644 --- a/protocol/x/clob/keeper/order_state.go +++ b/protocol/x/clob/keeper/order_state.go @@ -276,6 +276,19 @@ func (k Keeper) RemoveOrderFillAmount(ctx sdk.Context, orderId types.OrderId) { []byte(types.OrderAmountFilledKeyPrefix), ) memStore.Delete(orderId.ToStateKey()) + + // If grpc stream is on, zero out the fill amount. + if k.GetGrpcStreamingManager().Enabled() { + allUpdates := types.NewOffchainUpdates() + if message, success := off_chain_updates.CreateOrderUpdateMessage( + ctx, + orderId, + 0, // Total filled quantums is zero because it's been pruned from state. + ); success { + allUpdates.AddUpdateMessage(orderId, message) + } + k.SendOrderbookUpdates(ctx, allUpdates, false) + } } // PruneStateFillAmountsForShortTermOrders prunes Short-Term order fill amounts from state that are pruneable @@ -286,27 +299,5 @@ func (k Keeper) PruneStateFillAmountsForShortTermOrders( blockHeight := lib.MustConvertIntegerToUint32(ctx.BlockHeight()) // Prune all fill amounts from state which have a pruneable block height of the current `blockHeight`. - prunedOrderIds := k.PruneOrdersForBlockHeight(ctx, blockHeight) - - // Send an orderbook update for each pruned order for grpc streams. - // This is needed because short term orders are pruned in PrepareCheckState using - // keeper.MemClob.openOrders.blockExpirationsForOrders, which can fall out of sync with state fill amount - // pruning when there's replacement. - // Long-term fix would be to add logic to keep them in sync. - // TODO(CT-722): add logic to keep state fill amount pruning and order pruning in sync. - if k.GetGrpcStreamingManager().Enabled() { - allUpdates := types.NewOffchainUpdates() - for _, orderId := range prunedOrderIds { - if _, exists := k.MemClob.GetOrder(ctx, orderId); exists { - if message, success := off_chain_updates.CreateOrderUpdateMessage( - ctx, - orderId, - 0, // Total filled quantums is zero because it's been pruned from state. - ); success { - allUpdates.AddUpdateMessage(orderId, message) - } - } - } - k.SendOrderbookUpdates(ctx, allUpdates, false) - } + k.PruneOrdersForBlockHeight(ctx, blockHeight) } diff --git a/protocol/x/clob/keeper/process_operations.go b/protocol/x/clob/keeper/process_operations.go index 65ef612fb6..fabf98b218 100644 --- a/protocol/x/clob/keeper/process_operations.go +++ b/protocol/x/clob/keeper/process_operations.go @@ -19,6 +19,25 @@ import ( satypes "github.com/dydxprotocol/v4-chain/protocol/x/subaccounts/types" ) +// fetchOrdersInvolvedInOpQueue fetches all OrderIds involved in an operations +// queue's matches + short term order placements and returns them as a set. +func fetchOrdersInvolvedInOpQueue( + operations []types.InternalOperation, +) (orderIdSet map[types.OrderId]struct{}) { + orderIdSet = make(map[types.OrderId]struct{}) + for _, operation := range operations { + if shortTermOrderPlacement := operation.GetShortTermOrderPlacement(); shortTermOrderPlacement != nil { + orderId := shortTermOrderPlacement.GetOrder().OrderId + orderIdSet[orderId] = struct{}{} + } + if clobMatch := operation.GetMatch(); clobMatch != nil { + orderIdSetForClobMatch := clobMatch.GetAllOrderIds() + orderIdSet = lib.MergeMaps(orderIdSet, orderIdSetForClobMatch) + } + } + return orderIdSet +} + // ProcessProposerOperations updates on-chain state given an []OperationRaw operations queue // representing matches that occurred in the previous block. It performs validation on an operations // queue. If all validation passes, the operations queue is written to state. @@ -38,6 +57,26 @@ func (k Keeper) ProcessProposerOperations( return errorsmod.Wrapf(types.ErrInvalidMsgProposedOperations, "Error: %+v", err) } + // If grpc streams are on, send absolute fill amounts from local + proposed opqueue to the grpc stream. + // This must be sent out to account for checkState being discarded and deliverState being used. + if streamingManager := k.GetGrpcStreamingManager(); streamingManager.Enabled() { + localValidatorOperationsQueue, _ := k.MemClob.GetOperationsToReplay(ctx) + orderIdsFromProposed := fetchOrdersInvolvedInOpQueue( + operations, + ) + orderIdsFromLocal := fetchOrdersInvolvedInOpQueue( + localValidatorOperationsQueue, + ) + orderIdSetToUpdate := lib.MergeMaps(orderIdsFromLocal, orderIdsFromProposed) + + allUpdates := types.NewOffchainUpdates() + for orderId := range orderIdSetToUpdate { + orderbookUpdate := k.MemClob.GetOrderbookUpdatesForOrderUpdate(ctx, orderId) + allUpdates.Append(orderbookUpdate) + } + k.SendOrderbookUpdates(ctx, allUpdates, false) + } + log.DebugLog(ctx, "Processing operations queue", log.OperationsQueue, types.GetInternalOperationsQueueTextString(operations)) @@ -219,26 +258,15 @@ func (k Keeper) PersistMatchToState( func (k Keeper) statUnverifiedOrderRemoval( ctx sdk.Context, orderRemoval types.OrderRemoval, - orderToRemove types.Order, ) { proposerConsAddress := sdk.ConsAddress(ctx.BlockHeader().ProposerAddress) telemetry.IncrCounterWithLabels( []string{types.ModuleName, metrics.ProcessOperations, metrics.UnverifiedStatefulOrderRemoval, metrics.Count}, 1, - append( - orderRemoval.OrderId.GetOrderIdLabels(), + []metrics.Label{ metrics.GetLabelForStringValue(metrics.RemovalReason, orderRemoval.GetRemovalReason().String()), metrics.GetLabelForStringValue(metrics.Proposer, proposerConsAddress.String()), - ), - ) - telemetry.IncrCounterWithLabels( - []string{types.ModuleName, metrics.ProcessOperations, metrics.UnverifiedStatefulOrderRemoval, metrics.BaseQuantums}, - float32(orderToRemove.Quantums), - append( - orderRemoval.OrderId.GetOrderIdLabels(), - metrics.GetLabelForStringValue(metrics.RemovalReason, orderRemoval.GetRemovalReason().String()), - metrics.GetLabelForStringValue(metrics.Proposer, proposerConsAddress.String()), - ), + }, ) } @@ -262,7 +290,7 @@ func (k Keeper) PersistOrderRemovalToState( // Statefully validate that the removal reason is valid. switch removalReason := orderRemoval.RemovalReason; removalReason { case types.OrderRemoval_REMOVAL_REASON_UNDERCOLLATERALIZED: - k.statUnverifiedOrderRemoval(ctx, orderRemoval, orderToRemove) + k.statUnverifiedOrderRemoval(ctx, orderRemoval) // TODO (CLOB-877) - These validations are commented out because margin requirements can be non-linear. // For the collateralization check, use the remaining amount of the order that is resting on the book. // remainingAmount, hasRemainingAmount := k.MemClob.GetOrderRemainingAmount(ctx, orderToRemove) @@ -299,8 +327,8 @@ func (k Keeper) PersistOrderRemovalToState( // ) // } case types.OrderRemoval_REMOVAL_REASON_POST_ONLY_WOULD_CROSS_MAKER_ORDER: - // TODO (CLOB-877) - k.statUnverifiedOrderRemoval(ctx, orderRemoval, orderToRemove) + // TODO(CLOB-877) + k.statUnverifiedOrderRemoval(ctx, orderRemoval) // The order should be post-only if orderToRemove.TimeInForce != types.Order_TIME_IN_FORCE_POST_ONLY { @@ -310,11 +338,11 @@ func (k Keeper) PersistOrderRemovalToState( ) } case types.OrderRemoval_REMOVAL_REASON_INVALID_SELF_TRADE: - // TODO (CLOB-877) - k.statUnverifiedOrderRemoval(ctx, orderRemoval, orderToRemove) + // TODO(CLOB-877) + k.statUnverifiedOrderRemoval(ctx, orderRemoval) case types.OrderRemoval_REMOVAL_REASON_CONDITIONAL_FOK_COULD_NOT_BE_FULLY_FILLED: - // TODO (CLOB-877) - k.statUnverifiedOrderRemoval(ctx, orderRemoval, orderToRemove) + // TODO(CLOB-877) + k.statUnverifiedOrderRemoval(ctx, orderRemoval) // The order should be FOK if orderToRemove.TimeInForce != types.Order_TIME_IN_FORCE_FILL_OR_KILL { @@ -333,8 +361,8 @@ func (k Keeper) PersistOrderRemovalToState( ) } case types.OrderRemoval_REMOVAL_REASON_CONDITIONAL_IOC_WOULD_REST_ON_BOOK: - // TODO (CLOB-877) - k.statUnverifiedOrderRemoval(ctx, orderRemoval, orderToRemove) + // TODO(CLOB-877) + k.statUnverifiedOrderRemoval(ctx, orderRemoval) // The order should be IOC. if orderToRemove.TimeInForce != types.Order_TIME_IN_FORCE_IOC { @@ -390,8 +418,8 @@ func (k Keeper) PersistOrderRemovalToState( // ) // } case types.OrderRemoval_REMOVAL_REASON_VIOLATES_ISOLATED_SUBACCOUNT_CONSTRAINTS: - // TODO (CLOB-877) - k.statUnverifiedOrderRemoval(ctx, orderRemoval, orderToRemove) + // TODO(CLOB-877) + k.statUnverifiedOrderRemoval(ctx, orderRemoval) default: return errorsmod.Wrapf( types.ErrInvalidOrderRemovalReason, @@ -462,6 +490,7 @@ func (k Keeper) PersistMatchOrdersToState( } } + makerOrders := make([]types.Order, 0) makerFills := matchOrders.GetFills() for _, makerFill := range makerFills { // Fetch the maker order from either short term orders or state. @@ -475,6 +504,7 @@ func (k Keeper) PersistMatchOrdersToState( MakerOrder: &makerOrder, FillAmount: satypes.BaseQuantums(makerFill.GetFillAmount()), } + makerOrders = append(makerOrders, makerOrder) _, _, _, _, err = k.ProcessSingleMatch(ctx, &matchWithOrders) if err != nil { @@ -518,6 +548,26 @@ func (k Keeper) PersistMatchOrdersToState( ) } + // if GRPC streaming is on, emit a generated clob match to stream. + if streamingManager := k.GetGrpcStreamingManager(); streamingManager.Enabled() { + streamOrderbookFill := k.MemClob.GenerateStreamOrderbookFill( + ctx, + types.ClobMatch{ + Match: &types.ClobMatch_MatchOrders{ + MatchOrders: matchOrders, + }, + }, + &takerOrder, + makerOrders, + ) + k.SendOrderbookFillUpdates( + ctx, + []types.StreamOrderbookFill{ + streamOrderbookFill, + }, + ) + } + return nil } @@ -547,12 +597,14 @@ func (k Keeper) PersistMatchLiquidationToState( return err } + makerOrders := make([]types.Order, 0) for _, fill := range matchLiquidation.GetFills() { // Fetch the maker order from either short term orders or state. makerOrder, err := k.FetchOrderFromOrderId(ctx, fill.MakerOrderId, ordersMap) if err != nil { return err } + makerOrders = append(makerOrders, makerOrder) matchWithOrders := types.MatchWithOrders{ MakerOrder: &makerOrder, @@ -604,6 +656,26 @@ func (k Keeper) PersistMatchLiquidationToState( matchLiquidation.Liquidated, matchLiquidation.PerpetualId, ) + + // if GRPC streaming is on, emit a generated clob match to stream. + if streamingManager := k.GetGrpcStreamingManager(); streamingManager.Enabled() { + streamOrderbookFill := k.MemClob.GenerateStreamOrderbookFill( + ctx, + types.ClobMatch{ + Match: &types.ClobMatch_MatchPerpetualLiquidation{ + MatchPerpetualLiquidation: matchLiquidation, + }, + }, + takerOrder, + makerOrders, + ) + k.SendOrderbookFillUpdates( + ctx, + []types.StreamOrderbookFill{ + streamOrderbookFill, + }, + ) + } return nil } diff --git a/protocol/x/clob/memclob/memclob.go b/protocol/x/clob/memclob/memclob.go index aa9731ed88..82f21d7857 100644 --- a/protocol/x/clob/memclob/memclob.go +++ b/protocol/x/clob/memclob/memclob.go @@ -386,7 +386,20 @@ func (m *MemClobPriceTimePriority) mustUpdateMemclobStateWithMatches( } // Add the new matches to the operations queue. - m.operationsToPropose.MustAddMatchToOperationsQueue(takerOrder, makerFillWithOrders) + internalOperation := m.operationsToPropose.MustAddMatchToOperationsQueue(takerOrder, makerFillWithOrders) + // If orderbook updates are on, send an orderbook update with the fill to grpc streams. + if m.generateOrderbookUpdates { + // Collect all maker orders. + makerOrders := lib.MapSlice( + makerFillWithOrders, + func(mfwo types.MakerFillWithOrder) types.Order { + return mfwo.Order + }, + ) + clobMatch := internalOperation.GetMatch() + orderbookMatchFill := m.GenerateStreamOrderbookFill(ctx, *clobMatch, takerOrder, makerOrders) + m.clobKeeper.SendOrderbookFillUpdates(ctx, []types.StreamOrderbookFill{orderbookMatchFill}) + } // Build a slice of all subaccounts which had matches this matching loop, and sort them for determinism. allSubaccounts := lib.GetSortedKeys[satypes.SortedSubaccountIds](subaccountTotalMatchedQuantums) @@ -876,6 +889,20 @@ func (m *MemClobPriceTimePriority) matchOrder( ) offchainUpdates.Append(matchOffchainUpdates) writeCache() + } else { + // If state was not written to, re-send grpc stream updates for all orders + // involved in the match to "reset" fill amounts. + allUpdates := types.NewOffchainUpdates() + if !order.IsLiquidation() { + normalOrder := order.MustGetOrder() + updates := m.GetOrderbookUpdatesForOrderUpdate(ctx, normalOrder.OrderId) + allUpdates.Append(updates) + } + for _, fill := range newMakerFills { + updates := m.GetOrderbookUpdatesForOrderUpdate(ctx, fill.MakerOrderId) + allUpdates.Append(updates) + } + m.clobKeeper.SendOrderbookUpdates(ctx, allUpdates, false) } return takerOrderStatus, offchainUpdates, makerOrdersToRemove, matchingErr @@ -1901,7 +1928,6 @@ func (m *MemClobPriceTimePriority) SetMemclobGauges( metrics.GetLabelForIntValue(metrics.ClobPairId, int(clobPairId)), }, ) - // Set gauge for best bid on each orderbook. telemetry.SetGaugeWithLabels( []string{ @@ -2001,12 +2027,6 @@ func (m *MemClobPriceTimePriority) mustUpdateOrderbookStateWithMatchedMakerOrder panic("Total filled size of maker order greater than the order size") } - // Send an orderbook update for the order's new total filled amount. - if m.generateOrderbookUpdates { - orderbookUpdate := m.GetOrderbookUpdatesForOrderUpdate(ctx, makerOrder.OrderId) - m.clobKeeper.SendOrderbookUpdates(ctx, orderbookUpdate, false) - } - // If the order is fully filled, remove it from the orderbook. // Note we shouldn't remove Short-Term order hashes from `ShortTermOrderTxBytes` here since // the order was matched. diff --git a/protocol/x/clob/memclob/memclob_grpc_streaming.go b/protocol/x/clob/memclob/memclob_grpc_streaming.go index 4bce8ec772..3b2dc19a2e 100644 --- a/protocol/x/clob/memclob/memclob_grpc_streaming.go +++ b/protocol/x/clob/memclob/memclob_grpc_streaming.go @@ -9,6 +9,35 @@ import ( "github.com/dydxprotocol/v4-chain/protocol/x/clob/types" ) +// GenerateStreamOrderbookFill wraps a clob match into the `StreamOrderbookFill` +// data structure which provides prices and fill amounts alongside clob match. +func (m *MemClobPriceTimePriority) GenerateStreamOrderbookFill( + ctx sdk.Context, + clobMatch types.ClobMatch, + takerOrder types.MatchableOrder, + makerOrders []types.Order, +) types.StreamOrderbookFill { + fillAmounts := []uint64{} + + for _, makerOrder := range makerOrders { + fillAmount := m.GetOrderFilledAmount(ctx, makerOrder.OrderId) + fillAmounts = append(fillAmounts, uint64(fillAmount)) + } + // If taker order is not a liquidation order, has to be a regular + // taker order. Add the taker order to the orders array. + if !takerOrder.IsLiquidation() { + order := takerOrder.MustGetOrder() + makerOrders = append(makerOrders, order) + fillAmount := m.GetOrderFilledAmount(ctx, order.OrderId) + fillAmounts = append(fillAmounts, uint64(fillAmount)) + } + return types.StreamOrderbookFill{ + ClobMatch: &clobMatch, + Orders: makerOrders, + FillAmounts: fillAmounts, + } +} + // GetOffchainUpdatesForOrderbookSnapshot returns the offchain updates for the orderbook snapshot. // This is used by the gRPC streaming server to send the orderbook snapshot to the client. func (m *MemClobPriceTimePriority) GetOffchainUpdatesForOrderbookSnapshot( diff --git a/protocol/x/clob/memclob/memclob_purge_invalid_memclob_state_test.go b/protocol/x/clob/memclob/memclob_purge_invalid_memclob_state_test.go index a95ac04961..f60bd10b92 100644 --- a/protocol/x/clob/memclob/memclob_purge_invalid_memclob_state_test.go +++ b/protocol/x/clob/memclob/memclob_purge_invalid_memclob_state_test.go @@ -253,7 +253,7 @@ func TestPurgeInvalidMemclobState(t *testing.T) { mockMemClobKeeper := &mocks.MemClobKeeper{} memclob.SetClobKeeper(mockMemClobKeeper) mockMemClobKeeper.On("Logger", mock.Anything).Return(log.NewNopLogger()).Maybe() - mockMemClobKeeper.On("SendOrderbookUpdates", mock.Anything, mock.Anything).Return().Maybe() + mockMemClobKeeper.On("SendOrderbookUpdates", mock.Anything, mock.Anything, mock.Anything).Return().Maybe() for _, operation := range tc.placedOperations { switch operation.Operation.(type) { @@ -265,7 +265,7 @@ func TestPurgeInvalidMemclobState(t *testing.T) { false, satypes.BaseQuantums(0), uint32(0), - ).Times(4) + ).Times(5) mockMemClobKeeper.On("AddOrderToOrderbookSubaccountUpdatesCheck", mock.Anything, mock.Anything, mock.Anything). Return(true, make(map[satypes.SubaccountId]satypes.UpdateResult)).Once() diff --git a/protocol/x/clob/memclob/memclob_remove_order_test.go b/protocol/x/clob/memclob/memclob_remove_order_test.go index e823247832..44084b614e 100644 --- a/protocol/x/clob/memclob/memclob_remove_order_test.go +++ b/protocol/x/clob/memclob/memclob_remove_order_test.go @@ -330,7 +330,7 @@ func TestRemoveOrderIfFilled(t *testing.T) { memClobKeeper.On("AddOrderToOrderbookSubaccountUpdatesCheck", mock.Anything, mock.Anything, mock.Anything). Return(true, make(map[satypes.SubaccountId]satypes.UpdateResult)) memClobKeeper.On("ValidateSubaccountEquityTierLimitForNewOrder", mock.Anything, mock.Anything).Return(nil) - memClobKeeper.On("SendOrderbookUpdates", mock.Anything, mock.Anything).Return().Maybe() + memClobKeeper.On("SendOrderbookUpdates", mock.Anything, mock.Anything, mock.Anything).Return().Maybe() // Set initial fill amount to `0` for all orders. initialCall := memClobKeeper.On("GetOrderFillAmount", mock.Anything, mock.Anything). diff --git a/protocol/x/clob/types/mem_clob_keeper.go b/protocol/x/clob/types/mem_clob_keeper.go index 1cbb3891ed..a4d135d37c 100644 --- a/protocol/x/clob/types/mem_clob_keeper.go +++ b/protocol/x/clob/types/mem_clob_keeper.go @@ -120,4 +120,8 @@ type MemClobKeeper interface { offchainUpdates *OffchainUpdates, snapshot bool, ) + SendOrderbookFillUpdates( + ctx sdk.Context, + orderbookFills []StreamOrderbookFill, + ) } diff --git a/protocol/x/clob/types/memclob.go b/protocol/x/clob/types/memclob.go index 8a1ec71488..14cd3e38dc 100644 --- a/protocol/x/clob/types/memclob.go +++ b/protocol/x/clob/types/memclob.go @@ -153,4 +153,10 @@ type MemClob interface { ctx sdk.Context, orderId OrderId, ) (offchainUpdates *OffchainUpdates) + GenerateStreamOrderbookFill( + ctx sdk.Context, + clobMatch ClobMatch, + takerOrder MatchableOrder, + makerOrders []Order, + ) StreamOrderbookFill } diff --git a/protocol/x/clob/types/message_clob_match.go b/protocol/x/clob/types/message_clob_match.go index 989cb307a6..47593e05ee 100644 --- a/protocol/x/clob/types/message_clob_match.go +++ b/protocol/x/clob/types/message_clob_match.go @@ -22,3 +22,21 @@ func NewClobMatchFromMatchPerpetualLiquidation( }, } } + +// GetAllOrderIds returns a set of orderIds involved in a ClobMatch. +// It assumes the ClobMatch is valid (no duplicate orderIds in fills) +func (clobMatch *ClobMatch) GetAllOrderIds() (orderIds map[OrderId]struct{}) { + orderIds = make(map[OrderId]struct{}) + if matchOrders := clobMatch.GetMatchOrders(); matchOrders != nil { + orderIds[matchOrders.GetTakerOrderId()] = struct{}{} + for _, makerFill := range matchOrders.GetFills() { + orderIds[makerFill.GetMakerOrderId()] = struct{}{} + } + } + if matchOrders := clobMatch.GetMatchPerpetualLiquidation(); matchOrders != nil { + for _, makerFill := range matchOrders.GetFills() { + orderIds[makerFill.GetMakerOrderId()] = struct{}{} + } + } + return orderIds +} diff --git a/protocol/x/clob/types/operations_to_propose.go b/protocol/x/clob/types/operations_to_propose.go index b1ed771bec..e135987178 100644 --- a/protocol/x/clob/types/operations_to_propose.go +++ b/protocol/x/clob/types/operations_to_propose.go @@ -183,7 +183,7 @@ func (o *OperationsToPropose) MustAddStatefulOrderPlacementToOperationsQueue( func (o *OperationsToPropose) MustAddMatchToOperationsQueue( takerMatchableOrder MatchableOrder, makerFillsWithOrders []MakerFillWithOrder, -) { +) InternalOperation { makerFills := lib.MapSlice( makerFillsWithOrders, func(mfwo MakerFillWithOrder) MakerFill { @@ -234,6 +234,7 @@ func (o *OperationsToPropose) MustAddMatchToOperationsQueue( o.OperationsQueue, matchOperation, ) + return matchOperation } // AddZeroFillDeleveragingToOperationsQueue adds a zero-fill deleveraging match operation to the diff --git a/protocol/x/clob/types/query.pb.go b/protocol/x/clob/types/query.pb.go index d8ecfb7bf6..9dce006bd3 100644 --- a/protocol/x/clob/types/query.pb.go +++ b/protocol/x/clob/types/query.pb.go @@ -816,17 +816,12 @@ func (m *StreamOrderbookUpdatesRequest) GetClobPairId() []uint32 { // StreamOrderbookUpdates method. type StreamOrderbookUpdatesResponse struct { // Orderbook updates for the clob pair. - Updates []types.OffChainUpdateV1 `protobuf:"bytes,1,rep,name=updates,proto3" json:"updates"` - // Snapshot indicates if the response is from a snapshot of the orderbook. - // This is true for the initial response and false for all subsequent updates. - // Note that if the snapshot is true, then all previous entries should be - // discarded and the orderbook should be resynced. - Snapshot bool `protobuf:"varint,2,opt,name=snapshot,proto3" json:"snapshot,omitempty"` + Updates []StreamUpdate `protobuf:"bytes,1,rep,name=updates,proto3" json:"updates"` // ---Additional fields used to debug issues--- // Block height of the updates. - BlockHeight uint32 `protobuf:"varint,3,opt,name=block_height,json=blockHeight,proto3" json:"block_height,omitempty"` + BlockHeight uint32 `protobuf:"varint,2,opt,name=block_height,json=blockHeight,proto3" json:"block_height,omitempty"` // Exec mode of the updates. - ExecMode uint32 `protobuf:"varint,4,opt,name=exec_mode,json=execMode,proto3" json:"exec_mode,omitempty"` + ExecMode uint32 `protobuf:"varint,3,opt,name=exec_mode,json=execMode,proto3" json:"exec_mode,omitempty"` } func (m *StreamOrderbookUpdatesResponse) Reset() { *m = StreamOrderbookUpdatesResponse{} } @@ -862,20 +857,13 @@ func (m *StreamOrderbookUpdatesResponse) XXX_DiscardUnknown() { var xxx_messageInfo_StreamOrderbookUpdatesResponse proto.InternalMessageInfo -func (m *StreamOrderbookUpdatesResponse) GetUpdates() []types.OffChainUpdateV1 { +func (m *StreamOrderbookUpdatesResponse) GetUpdates() []StreamUpdate { if m != nil { return m.Updates } return nil } -func (m *StreamOrderbookUpdatesResponse) GetSnapshot() bool { - if m != nil { - return m.Snapshot - } - return false -} - func (m *StreamOrderbookUpdatesResponse) GetBlockHeight() uint32 { if m != nil { return m.BlockHeight @@ -890,6 +878,224 @@ func (m *StreamOrderbookUpdatesResponse) GetExecMode() uint32 { return 0 } +// StreamUpdate is an update that will be pushed through the +// GRPC stream. +type StreamUpdate struct { + // Contains one of an StreamOrderbookUpdate, + // StreamOrderbookFill. + // + // Types that are valid to be assigned to UpdateMessage: + // + // *StreamUpdate_OrderbookUpdate + // *StreamUpdate_OrderFill + UpdateMessage isStreamUpdate_UpdateMessage `protobuf_oneof:"update_message"` +} + +func (m *StreamUpdate) Reset() { *m = StreamUpdate{} } +func (m *StreamUpdate) String() string { return proto.CompactTextString(m) } +func (*StreamUpdate) ProtoMessage() {} +func (*StreamUpdate) Descriptor() ([]byte, []int) { + return fileDescriptor_3365c195b25c5bc0, []int{16} +} +func (m *StreamUpdate) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *StreamUpdate) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_StreamUpdate.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalToSizedBuffer(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *StreamUpdate) XXX_Merge(src proto.Message) { + xxx_messageInfo_StreamUpdate.Merge(m, src) +} +func (m *StreamUpdate) XXX_Size() int { + return m.Size() +} +func (m *StreamUpdate) XXX_DiscardUnknown() { + xxx_messageInfo_StreamUpdate.DiscardUnknown(m) +} + +var xxx_messageInfo_StreamUpdate proto.InternalMessageInfo + +type isStreamUpdate_UpdateMessage interface { + isStreamUpdate_UpdateMessage() + MarshalTo([]byte) (int, error) + Size() int +} + +type StreamUpdate_OrderbookUpdate struct { + OrderbookUpdate *StreamOrderbookUpdate `protobuf:"bytes,1,opt,name=orderbook_update,json=orderbookUpdate,proto3,oneof" json:"orderbook_update,omitempty"` +} +type StreamUpdate_OrderFill struct { + OrderFill *StreamOrderbookFill `protobuf:"bytes,2,opt,name=order_fill,json=orderFill,proto3,oneof" json:"order_fill,omitempty"` +} + +func (*StreamUpdate_OrderbookUpdate) isStreamUpdate_UpdateMessage() {} +func (*StreamUpdate_OrderFill) isStreamUpdate_UpdateMessage() {} + +func (m *StreamUpdate) GetUpdateMessage() isStreamUpdate_UpdateMessage { + if m != nil { + return m.UpdateMessage + } + return nil +} + +func (m *StreamUpdate) GetOrderbookUpdate() *StreamOrderbookUpdate { + if x, ok := m.GetUpdateMessage().(*StreamUpdate_OrderbookUpdate); ok { + return x.OrderbookUpdate + } + return nil +} + +func (m *StreamUpdate) GetOrderFill() *StreamOrderbookFill { + if x, ok := m.GetUpdateMessage().(*StreamUpdate_OrderFill); ok { + return x.OrderFill + } + return nil +} + +// XXX_OneofWrappers is for the internal use of the proto package. +func (*StreamUpdate) XXX_OneofWrappers() []interface{} { + return []interface{}{ + (*StreamUpdate_OrderbookUpdate)(nil), + (*StreamUpdate_OrderFill)(nil), + } +} + +// StreamOrderbookUpdate provides information on an orderbook update. Used in +// the full node GRPC stream. +type StreamOrderbookUpdate struct { + // Orderbook updates for the clob pair. Can contain order place, removals, + // or updates. + Updates []types.OffChainUpdateV1 `protobuf:"bytes,1,rep,name=updates,proto3" json:"updates"` + // Snapshot indicates if the response is from a snapshot of the orderbook. + // This is true for the initial response and false for all subsequent updates. + // Note that if the snapshot is true, then all previous entries should be + // discarded and the orderbook should be resynced. + Snapshot bool `protobuf:"varint,2,opt,name=snapshot,proto3" json:"snapshot,omitempty"` +} + +func (m *StreamOrderbookUpdate) Reset() { *m = StreamOrderbookUpdate{} } +func (m *StreamOrderbookUpdate) String() string { return proto.CompactTextString(m) } +func (*StreamOrderbookUpdate) ProtoMessage() {} +func (*StreamOrderbookUpdate) Descriptor() ([]byte, []int) { + return fileDescriptor_3365c195b25c5bc0, []int{17} +} +func (m *StreamOrderbookUpdate) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *StreamOrderbookUpdate) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_StreamOrderbookUpdate.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalToSizedBuffer(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *StreamOrderbookUpdate) XXX_Merge(src proto.Message) { + xxx_messageInfo_StreamOrderbookUpdate.Merge(m, src) +} +func (m *StreamOrderbookUpdate) XXX_Size() int { + return m.Size() +} +func (m *StreamOrderbookUpdate) XXX_DiscardUnknown() { + xxx_messageInfo_StreamOrderbookUpdate.DiscardUnknown(m) +} + +var xxx_messageInfo_StreamOrderbookUpdate proto.InternalMessageInfo + +func (m *StreamOrderbookUpdate) GetUpdates() []types.OffChainUpdateV1 { + if m != nil { + return m.Updates + } + return nil +} + +func (m *StreamOrderbookUpdate) GetSnapshot() bool { + if m != nil { + return m.Snapshot + } + return false +} + +// StreamOrderbookFill provides information on an orderbook fill. Used in +// the full node GRPC stream. +type StreamOrderbookFill struct { + // Clob match. Provides information on which orders were matched + // and the type of order. Fill amounts here are relative. + ClobMatch *ClobMatch `protobuf:"bytes,1,opt,name=clob_match,json=clobMatch,proto3" json:"clob_match,omitempty"` + // All orders involved in the specified clob match. Used to look up + // price of a match through a given maker order id. + Orders []Order `protobuf:"bytes,2,rep,name=orders,proto3" json:"orders"` + // Resulting fill amounts for each order in the orders array. + FillAmounts []uint64 `protobuf:"varint,3,rep,packed,name=fill_amounts,json=fillAmounts,proto3" json:"fill_amounts,omitempty"` +} + +func (m *StreamOrderbookFill) Reset() { *m = StreamOrderbookFill{} } +func (m *StreamOrderbookFill) String() string { return proto.CompactTextString(m) } +func (*StreamOrderbookFill) ProtoMessage() {} +func (*StreamOrderbookFill) Descriptor() ([]byte, []int) { + return fileDescriptor_3365c195b25c5bc0, []int{18} +} +func (m *StreamOrderbookFill) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *StreamOrderbookFill) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_StreamOrderbookFill.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalToSizedBuffer(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *StreamOrderbookFill) XXX_Merge(src proto.Message) { + xxx_messageInfo_StreamOrderbookFill.Merge(m, src) +} +func (m *StreamOrderbookFill) XXX_Size() int { + return m.Size() +} +func (m *StreamOrderbookFill) XXX_DiscardUnknown() { + xxx_messageInfo_StreamOrderbookFill.DiscardUnknown(m) +} + +var xxx_messageInfo_StreamOrderbookFill proto.InternalMessageInfo + +func (m *StreamOrderbookFill) GetClobMatch() *ClobMatch { + if m != nil { + return m.ClobMatch + } + return nil +} + +func (m *StreamOrderbookFill) GetOrders() []Order { + if m != nil { + return m.Orders + } + return nil +} + +func (m *StreamOrderbookFill) GetFillAmounts() []uint64 { + if m != nil { + return m.FillAmounts + } + return nil +} + func init() { proto.RegisterType((*QueryGetClobPairRequest)(nil), "dydxprotocol.clob.QueryGetClobPairRequest") proto.RegisterType((*QueryClobPairResponse)(nil), "dydxprotocol.clob.QueryClobPairResponse") @@ -908,91 +1114,103 @@ func init() { proto.RegisterType((*QueryLiquidationsConfigurationResponse)(nil), "dydxprotocol.clob.QueryLiquidationsConfigurationResponse") proto.RegisterType((*StreamOrderbookUpdatesRequest)(nil), "dydxprotocol.clob.StreamOrderbookUpdatesRequest") proto.RegisterType((*StreamOrderbookUpdatesResponse)(nil), "dydxprotocol.clob.StreamOrderbookUpdatesResponse") + proto.RegisterType((*StreamUpdate)(nil), "dydxprotocol.clob.StreamUpdate") + proto.RegisterType((*StreamOrderbookUpdate)(nil), "dydxprotocol.clob.StreamOrderbookUpdate") + proto.RegisterType((*StreamOrderbookFill)(nil), "dydxprotocol.clob.StreamOrderbookFill") } func init() { proto.RegisterFile("dydxprotocol/clob/query.proto", fileDescriptor_3365c195b25c5bc0) } var fileDescriptor_3365c195b25c5bc0 = []byte{ - // 1251 bytes of a gzipped FileDescriptorProto + // 1407 bytes of a gzipped FileDescriptorProto 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x9c, 0x57, 0xcd, 0x6f, 0xdc, 0x44, - 0x14, 0x8f, 0x93, 0xd0, 0x6e, 0x5f, 0xda, 0x02, 0xd3, 0xaf, 0xad, 0x93, 0x6e, 0x52, 0x43, 0x3e, - 0x4b, 0xed, 0x24, 0xad, 0x50, 0x49, 0x50, 0xa5, 0x24, 0x82, 0x50, 0x29, 0xa1, 0x8b, 0x1b, 0x42, - 0x05, 0x95, 0x2c, 0xaf, 0x3d, 0xeb, 0xb5, 0x62, 0x7b, 0x36, 0xf6, 0x78, 0x95, 0x08, 0x21, 0x24, - 0x0e, 0x5c, 0xe0, 0x80, 0xc4, 0x81, 0x03, 0x12, 0x17, 0xfe, 0x06, 0x8e, 0x08, 0xb8, 0xf5, 0x58, - 0x09, 0x09, 0x71, 0x40, 0x08, 0x25, 0x9c, 0xf9, 0x1b, 0x90, 0x67, 0xc6, 0x9b, 0xdd, 0xd8, 0xde, - 0x4d, 0x72, 0xd9, 0x5d, 0xbf, 0xf7, 0x7b, 0x6f, 0x7e, 0xef, 0x63, 0xde, 0xf3, 0xc2, 0x2d, 0x7b, - 0xdf, 0xde, 0x6b, 0x86, 0x84, 0x12, 0x8b, 0x78, 0x9a, 0xe5, 0x91, 0x9a, 0xb6, 0x1b, 0xe3, 0x70, - 0x5f, 0x65, 0x32, 0xf4, 0x6a, 0xa7, 0x5a, 0x4d, 0xd4, 0xf2, 0x55, 0x87, 0x38, 0x84, 0x89, 0xb4, - 0xe4, 0x17, 0x07, 0xca, 0x63, 0x0e, 0x21, 0x8e, 0x87, 0x35, 0xb3, 0xe9, 0x6a, 0x66, 0x10, 0x10, - 0x6a, 0x52, 0x97, 0x04, 0x91, 0xd0, 0xce, 0x59, 0x24, 0xf2, 0x49, 0xa4, 0xd5, 0xcc, 0x08, 0x73, - 0xff, 0x5a, 0x6b, 0xa1, 0x86, 0xa9, 0xb9, 0xa0, 0x35, 0x4d, 0xc7, 0x0d, 0x18, 0x58, 0x60, 0xb5, - 0x2c, 0xa3, 0x9a, 0x47, 0xac, 0x1d, 0x23, 0x34, 0x29, 0x36, 0x3c, 0xd7, 0x77, 0xa9, 0x61, 0x91, - 0xa0, 0xee, 0x3a, 0xc2, 0xe0, 0x76, 0xd6, 0x20, 0xf9, 0x30, 0x9a, 0xa6, 0x1b, 0x0a, 0xc8, 0x7c, - 0x16, 0x82, 0x77, 0x63, 0x97, 0xee, 0x1b, 0xd4, 0xc5, 0x61, 0x9e, 0xd3, 0x3b, 0x59, 0x0b, 0xcf, - 0xdd, 0x8d, 0x5d, 0x9b, 0xc7, 0xd5, 0x0d, 0x1e, 0xcd, 0x82, 0x7d, 0xdc, 0x12, 0xca, 0x9c, 0x0c, - 0x93, 0xd0, 0xc6, 0x29, 0xb5, 0x87, 0x5d, 0x6a, 0x37, 0xb0, 0xf1, 0x1e, 0x0e, 0x35, 0x52, 0xaf, - 0x1b, 0x56, 0xc3, 0x74, 0x03, 0x23, 0x6e, 0xda, 0x26, 0xc5, 0x51, 0x56, 0xc2, 0xed, 0x95, 0x59, - 0xb8, 0xf1, 0x41, 0x92, 0xd0, 0x75, 0x4c, 0xd7, 0x3c, 0x52, 0xab, 0x9a, 0x6e, 0xa8, 0xe3, 0xdd, - 0x18, 0x47, 0x14, 0x5d, 0x86, 0x41, 0xd7, 0x2e, 0x4b, 0x13, 0xd2, 0xcc, 0x25, 0x7d, 0xd0, 0xb5, - 0x95, 0x8f, 0xe0, 0x1a, 0x83, 0x1e, 0xe1, 0xa2, 0x26, 0x09, 0x22, 0x8c, 0x1e, 0xc2, 0x85, 0x76, - 0xc6, 0x18, 0x7e, 0x64, 0x71, 0x54, 0xcd, 0x54, 0x5e, 0x4d, 0xed, 0x56, 0x87, 0x9f, 0xff, 0x3d, - 0x3e, 0xa0, 0x97, 0x2c, 0xf1, 0xac, 0x98, 0x82, 0xc3, 0x8a, 0xe7, 0x1d, 0xe7, 0xf0, 0x2e, 0xc0, - 0x51, 0x85, 0x85, 0xef, 0x29, 0x95, 0xb7, 0x83, 0x9a, 0xb4, 0x83, 0xca, 0xdb, 0x4d, 0xb4, 0x83, - 0x5a, 0x35, 0x1d, 0x2c, 0x6c, 0xf5, 0x0e, 0x4b, 0xe5, 0x47, 0x09, 0xca, 0x5d, 0xe4, 0x57, 0x3c, - 0xaf, 0x88, 0xff, 0xd0, 0x29, 0xf9, 0xa3, 0xf5, 0x2e, 0x92, 0x83, 0x8c, 0xe4, 0x74, 0x5f, 0x92, - 0xfc, 0xf0, 0x2e, 0x96, 0x7f, 0x49, 0x30, 0xbe, 0x89, 0x5b, 0xef, 0x13, 0x1b, 0x6f, 0x91, 0xe4, - 0x73, 0xcd, 0xf4, 0xac, 0xd8, 0x63, 0xca, 0x34, 0x23, 0xcf, 0xe0, 0x3a, 0xef, 0xe7, 0x66, 0x48, - 0x9a, 0x24, 0xc2, 0xa1, 0xe1, 0x9b, 0xd4, 0x6a, 0xe0, 0xa8, 0x9d, 0x9d, 0x2c, 0xf3, 0x6d, 0xd3, - 0x4b, 0x3a, 0x8f, 0x84, 0x9b, 0xb8, 0xb5, 0xc9, 0xd1, 0xfa, 0x55, 0xe6, 0xa5, 0x2a, 0x9c, 0x08, - 0x29, 0xfa, 0x04, 0xae, 0xb5, 0x52, 0xb0, 0xe1, 0xe3, 0x96, 0xe1, 0x63, 0x1a, 0xba, 0x56, 0xd4, - 0x8e, 0x2a, 0xeb, 0xbc, 0x8b, 0xf0, 0x26, 0x87, 0xeb, 0x57, 0x5a, 0x9d, 0x47, 0x72, 0xa1, 0xf2, - 0x9f, 0x04, 0x13, 0xc5, 0xe1, 0x89, 0x62, 0x38, 0x70, 0x3e, 0xc4, 0x51, 0xec, 0xd1, 0x48, 0x94, - 0x62, 0xbd, 0xdf, 0x99, 0x39, 0x5e, 0x12, 0xc0, 0x4a, 0x60, 0x6f, 0x13, 0x2f, 0xf6, 0x71, 0x15, - 0x87, 0x49, 0xe9, 0x44, 0xd9, 0x52, 0xef, 0xb2, 0x09, 0x57, 0x72, 0x50, 0x68, 0x02, 0x2e, 0xb6, - 0x9b, 0xc1, 0x68, 0xf7, 0x3f, 0xa4, 0xc5, 0x7e, 0x64, 0xa3, 0x57, 0x60, 0xc8, 0xc7, 0x2d, 0x96, - 0x91, 0x41, 0x3d, 0xf9, 0x89, 0xae, 0xc3, 0xb9, 0x16, 0x73, 0x52, 0x1e, 0x9a, 0x90, 0x66, 0x86, - 0x75, 0xf1, 0xa4, 0xcc, 0xc1, 0x0c, 0x6b, 0xba, 0x77, 0xd8, 0xb0, 0xd8, 0x72, 0x71, 0xb8, 0x91, - 0x8c, 0x8a, 0x35, 0x76, 0xf9, 0xe3, 0xb0, 0xb3, 0xae, 0xca, 0xf7, 0x12, 0xcc, 0x9e, 0x00, 0x2c, - 0xb2, 0x14, 0x40, 0xb9, 0x68, 0x02, 0x89, 0x3e, 0xd0, 0x72, 0xd2, 0xd6, 0xcb, 0xb5, 0x48, 0xcf, - 0x35, 0x9c, 0x87, 0x51, 0x66, 0x61, 0x9a, 0x91, 0x5b, 0x4d, 0x9a, 0x46, 0x37, 0x29, 0x2e, 0x0e, - 0xe4, 0x3b, 0x49, 0x44, 0xdd, 0x13, 0x2b, 0xe2, 0xd8, 0x81, 0x1b, 0x05, 0xd3, 0x59, 0x84, 0xa1, - 0xe6, 0x84, 0xd1, 0xc3, 0xb1, 0x88, 0x82, 0x37, 0xf7, 0x31, 0x88, 0xf2, 0x14, 0x6e, 0x32, 0x62, - 0x4f, 0xa8, 0x49, 0x71, 0x3d, 0xf6, 0x1e, 0x27, 0x73, 0x34, 0xbd, 0x57, 0xcb, 0x50, 0x62, 0x73, - 0x35, 0xad, 0xf9, 0xc8, 0xa2, 0x9c, 0x73, 0x34, 0x33, 0x79, 0x64, 0xa7, 0xbd, 0x44, 0xf8, 0xa3, - 0xf2, 0x93, 0x04, 0x72, 0x9e, 0x6b, 0x11, 0xe5, 0x53, 0x78, 0x99, 0xfb, 0x6e, 0x7a, 0xa6, 0x85, - 0x7d, 0x1c, 0x50, 0x71, 0xc4, 0x6c, 0xce, 0x11, 0x1b, 0x24, 0x70, 0xb6, 0x70, 0xe8, 0x33, 0x17, - 0xd5, 0xd4, 0x40, 0x9c, 0x78, 0x99, 0x74, 0x49, 0xd1, 0x38, 0x8c, 0xd4, 0x5d, 0xcf, 0x33, 0x4c, - 0x9f, 0xc4, 0x01, 0x65, 0x3d, 0x39, 0xac, 0x43, 0x22, 0x5a, 0x61, 0x12, 0x34, 0x06, 0x17, 0x68, - 0xe8, 0x3a, 0x0e, 0x0e, 0xb1, 0xcd, 0xba, 0xb3, 0xa4, 0x1f, 0x09, 0x94, 0x69, 0x98, 0x64, 0xb4, - 0x37, 0x3a, 0x76, 0x53, 0x6e, 0x51, 0xbf, 0x94, 0x60, 0xaa, 0x1f, 0x52, 0x04, 0xfb, 0x0c, 0xae, - 0xe4, 0xac, 0x3a, 0x11, 0xf0, 0x64, 0x5e, 0xc0, 0x19, 0x97, 0x22, 0x58, 0xe4, 0x65, 0x34, 0xca, - 0x0a, 0xdc, 0x7a, 0x42, 0x43, 0x6c, 0xf2, 0xf4, 0xd4, 0x08, 0xd9, 0xf9, 0x90, 0xef, 0xb3, 0xb4, - 0x8e, 0xd9, 0xfb, 0x3b, 0xd4, 0x7d, 0x7f, 0x95, 0x3f, 0x24, 0xa8, 0x14, 0xf9, 0x68, 0x17, 0xec, - 0xbc, 0x58, 0x93, 0x62, 0x08, 0x3d, 0xe8, 0xe6, 0x2d, 0xf6, 0xac, 0x9a, 0xdd, 0xaa, 0x8f, 0xeb, - 0xf5, 0xb5, 0x44, 0xc0, 0x3d, 0x6e, 0x2f, 0xa4, 0x9d, 0x22, 0xf4, 0x48, 0x86, 0x52, 0x14, 0x98, - 0xcd, 0xa8, 0x41, 0x78, 0xb5, 0x4a, 0x7a, 0xfb, 0x19, 0xdd, 0x86, 0x8b, 0xfc, 0x32, 0x34, 0xb0, - 0xeb, 0x34, 0x28, 0x2b, 0xd7, 0x25, 0x7d, 0x84, 0xc9, 0xde, 0x63, 0x22, 0x34, 0x0a, 0x17, 0xf0, - 0x1e, 0xb6, 0x0c, 0x9f, 0xd8, 0xb8, 0x3c, 0xcc, 0xf4, 0xa5, 0x44, 0xb0, 0x49, 0x6c, 0xbc, 0xf8, - 0x03, 0xc0, 0x4b, 0xac, 0x48, 0xe8, 0x2b, 0x09, 0x4a, 0xe9, 0xba, 0x42, 0x73, 0x39, 0x39, 0x2f, - 0xd8, 0xf9, 0xf2, 0x4c, 0x11, 0xf6, 0xf8, 0xd2, 0x57, 0x66, 0xbf, 0xf8, 0xfd, 0xdf, 0x6f, 0x07, - 0x5f, 0x43, 0xb7, 0xb5, 0x1e, 0xef, 0x4f, 0xda, 0xa7, 0xae, 0xfd, 0x19, 0xfa, 0x5a, 0x82, 0x91, - 0x8e, 0xbd, 0x5b, 0x4c, 0x28, 0xfb, 0x02, 0x20, 0xdf, 0xe9, 0x47, 0xa8, 0x63, 0x91, 0x2b, 0xaf, - 0x33, 0x4e, 0x15, 0x34, 0xd6, 0x8b, 0x13, 0xfa, 0x45, 0x82, 0x72, 0xd1, 0x02, 0x41, 0x8b, 0xa7, - 0xda, 0x36, 0x9c, 0xe3, 0xbd, 0x33, 0x6c, 0x28, 0x65, 0x89, 0x71, 0xbd, 0xbf, 0x24, 0xcd, 0x29, - 0x9a, 0x96, 0xfb, 0x02, 0x68, 0x04, 0xc4, 0xc6, 0x06, 0x25, 0xfc, 0xdb, 0xea, 0x20, 0xf9, 0x9b, - 0x04, 0x63, 0xbd, 0x66, 0x39, 0x5a, 0x2e, 0xca, 0xda, 0x09, 0x36, 0x91, 0xfc, 0xf6, 0xd9, 0x8c, - 0x45, 0x5c, 0x53, 0x2c, 0xae, 0x09, 0x54, 0xd1, 0x7a, 0xbe, 0x34, 0xa3, 0x9f, 0x25, 0x18, 0xed, - 0x31, 0xc8, 0xd1, 0x52, 0x11, 0x8b, 0xfe, 0x2b, 0x48, 0x5e, 0x3e, 0x93, 0xad, 0x08, 0x60, 0x92, - 0x05, 0x30, 0x8e, 0x6e, 0xf5, 0xfc, 0x27, 0x81, 0x7e, 0x95, 0xe0, 0x66, 0xe1, 0x30, 0x44, 0x0f, - 0x8a, 0x18, 0xf4, 0x9b, 0xb4, 0xf2, 0x5b, 0x67, 0xb0, 0x14, 0xcc, 0x55, 0xc6, 0x7c, 0x06, 0x4d, - 0x69, 0x27, 0xfa, 0xf7, 0x81, 0x02, 0xb8, 0xd4, 0xb5, 0xaf, 0xd0, 0x1b, 0x45, 0x67, 0xe7, 0x6d, - 0x4c, 0xf9, 0xee, 0x09, 0xd1, 0x82, 0xdd, 0x00, 0xfa, 0x1c, 0xae, 0xe7, 0xcf, 0x5d, 0x34, 0x9f, - 0xe3, 0xaa, 0xe7, 0x98, 0x97, 0x17, 0x4e, 0x61, 0xc1, 0x09, 0xcc, 0x4b, 0xab, 0xd5, 0xe7, 0x07, - 0x15, 0xe9, 0xc5, 0x41, 0x45, 0xfa, 0xe7, 0xa0, 0x22, 0x7d, 0x73, 0x58, 0x19, 0x78, 0x71, 0x58, - 0x19, 0xf8, 0xf3, 0xb0, 0x32, 0xf0, 0xf1, 0x9b, 0x8e, 0x4b, 0x1b, 0x71, 0x4d, 0xb5, 0x88, 0xdf, - 0x9d, 0xbc, 0xd6, 0xfd, 0xbb, 0x6c, 0xc0, 0x6b, 0x6d, 0xc9, 0x1e, 0x4f, 0x28, 0xdd, 0x6f, 0xe2, - 0xa8, 0x76, 0x8e, 0x89, 0xef, 0xfd, 0x1f, 0x00, 0x00, 0xff, 0xff, 0xcd, 0x79, 0x36, 0x31, 0x08, - 0x0f, 0x00, 0x00, + 0x14, 0xdf, 0x49, 0x42, 0xbb, 0x79, 0x49, 0xda, 0x32, 0x69, 0xda, 0xad, 0x93, 0x6e, 0x52, 0x43, + 0x93, 0x4d, 0x4a, 0xd7, 0x4d, 0x5a, 0x55, 0xa5, 0x41, 0x45, 0x49, 0x44, 0x3f, 0xa4, 0x86, 0x06, + 0xf7, 0x83, 0x0a, 0x2a, 0x59, 0x5e, 0x7b, 0xd6, 0xb1, 0x6a, 0x7b, 0x36, 0xf6, 0xec, 0x2a, 0x11, + 0x42, 0x48, 0x1c, 0xb8, 0x00, 0x12, 0x12, 0x07, 0x0e, 0x48, 0x48, 0x88, 0x3f, 0x01, 0x71, 0x44, + 0x94, 0x5b, 0x8f, 0x95, 0xb8, 0x70, 0x40, 0x08, 0xb5, 0x9c, 0xf9, 0x1b, 0x90, 0x67, 0xc6, 0x9b, + 0xdd, 0xd8, 0xde, 0x4d, 0x72, 0xd9, 0xb5, 0xdf, 0xbc, 0x8f, 0xdf, 0x7b, 0xf3, 0x9b, 0x79, 0xcf, + 0x70, 0xd6, 0xde, 0xb1, 0xb7, 0x1b, 0x21, 0x65, 0xd4, 0xa2, 0x9e, 0x66, 0x79, 0xb4, 0xa6, 0x6d, + 0x35, 0x49, 0xb8, 0x53, 0xe5, 0x32, 0xfc, 0x7a, 0xe7, 0x72, 0x35, 0x5e, 0x56, 0x4e, 0x3a, 0xd4, + 0xa1, 0x5c, 0xa4, 0xc5, 0x4f, 0x42, 0x51, 0x99, 0x72, 0x28, 0x75, 0x3c, 0xa2, 0x99, 0x0d, 0x57, + 0x33, 0x83, 0x80, 0x32, 0x93, 0xb9, 0x34, 0x88, 0xe4, 0xea, 0x82, 0x45, 0x23, 0x9f, 0x46, 0x5a, + 0xcd, 0x8c, 0x88, 0xf0, 0xaf, 0xb5, 0x16, 0x6b, 0x84, 0x99, 0x8b, 0x5a, 0xc3, 0x74, 0xdc, 0x80, + 0x2b, 0x4b, 0x5d, 0x2d, 0x8d, 0xa8, 0xe6, 0x51, 0xeb, 0xa9, 0x11, 0x9a, 0x8c, 0x18, 0x9e, 0xeb, + 0xbb, 0xcc, 0xb0, 0x68, 0x50, 0x77, 0x1d, 0x69, 0x70, 0x2e, 0x6d, 0x10, 0xff, 0x18, 0x0d, 0xd3, + 0x0d, 0xa5, 0xca, 0xa5, 0xb4, 0x0a, 0xd9, 0x6a, 0xba, 0x6c, 0xc7, 0x60, 0x2e, 0x09, 0xb3, 0x9c, + 0x66, 0xd4, 0x85, 0x86, 0x36, 0x49, 0x1c, 0x4e, 0xa7, 0x97, 0x7d, 0x93, 0x59, 0x9b, 0x24, 0xc9, + 0xf8, 0x42, 0x5a, 0xc1, 0x73, 0xb7, 0x9a, 0xae, 0x2d, 0xea, 0xd2, 0x1d, 0x6c, 0x32, 0xc3, 0x1b, + 0x69, 0xc9, 0xc5, 0x1b, 0x5d, 0x8b, 0x6e, 0x60, 0x93, 0x6d, 0x12, 0x6a, 0xb4, 0x5e, 0x37, 0xac, + 0x4d, 0xd3, 0x0d, 0x8c, 0x66, 0xc3, 0x36, 0x19, 0x89, 0xd2, 0x12, 0x61, 0xaf, 0xce, 0xc3, 0xe9, + 0x0f, 0xe2, 0x8a, 0xdf, 0x22, 0x6c, 0xcd, 0xa3, 0xb5, 0x0d, 0xd3, 0x0d, 0x75, 0xb2, 0xd5, 0x24, + 0x11, 0xc3, 0xc7, 0x60, 0xc0, 0xb5, 0x4b, 0x68, 0x06, 0x55, 0xc6, 0xf4, 0x01, 0xd7, 0x56, 0x3f, + 0x84, 0x09, 0xae, 0xba, 0xab, 0x17, 0x35, 0x68, 0x10, 0x11, 0x7c, 0x03, 0x86, 0xdb, 0x25, 0xe5, + 0xfa, 0x23, 0x4b, 0x93, 0xd5, 0x14, 0x35, 0xaa, 0x89, 0xdd, 0xea, 0xd0, 0xf3, 0xbf, 0xa7, 0x0b, + 0x7a, 0xd1, 0x92, 0xef, 0xaa, 0x29, 0x31, 0xac, 0x78, 0xde, 0x5e, 0x0c, 0x37, 0x01, 0x76, 0x29, + 0x20, 0x7d, 0xcf, 0x56, 0x05, 0x5f, 0xaa, 0x31, 0x5f, 0xaa, 0x82, 0x8f, 0x92, 0x2f, 0xd5, 0x0d, + 0xd3, 0x21, 0xd2, 0x56, 0xef, 0xb0, 0x54, 0x7f, 0x42, 0x50, 0xea, 0x02, 0xbf, 0xe2, 0x79, 0x79, + 0xf8, 0x07, 0x0f, 0x88, 0x1f, 0xdf, 0xea, 0x02, 0x39, 0xc0, 0x41, 0xce, 0xf5, 0x05, 0x29, 0x82, + 0x77, 0xa1, 0xfc, 0x0b, 0xc1, 0xf4, 0x3a, 0x69, 0xbd, 0x4f, 0x6d, 0xf2, 0x80, 0xc6, 0xbf, 0x6b, + 0xa6, 0x67, 0x35, 0x3d, 0xbe, 0x98, 0x54, 0xe4, 0x09, 0x9c, 0x12, 0x84, 0x6f, 0x84, 0xb4, 0x41, + 0x23, 0x12, 0x1a, 0x92, 0x5a, 0xed, 0xea, 0xa4, 0x91, 0x3f, 0x32, 0xbd, 0x98, 0x5a, 0x34, 0x5c, + 0x27, 0xad, 0x75, 0xa1, 0xad, 0x9f, 0xe4, 0x5e, 0x36, 0xa4, 0x13, 0x29, 0xc5, 0x1f, 0xc3, 0x44, + 0x2b, 0x51, 0x36, 0x7c, 0xd2, 0x32, 0x7c, 0xc2, 0x42, 0xd7, 0x8a, 0xda, 0x59, 0xa5, 0x9d, 0x77, + 0x01, 0x5e, 0x17, 0xea, 0xfa, 0x78, 0xab, 0x33, 0xa4, 0x10, 0xaa, 0xff, 0x21, 0x98, 0xc9, 0x4f, + 0x4f, 0x6e, 0x86, 0x03, 0x47, 0x43, 0x12, 0x35, 0x3d, 0x16, 0xc9, 0xad, 0xb8, 0xd5, 0x2f, 0x66, + 0x86, 0x97, 0x58, 0x61, 0x25, 0xb0, 0x1f, 0x51, 0xaf, 0xe9, 0x93, 0x0d, 0x12, 0xc6, 0x5b, 0x27, + 0xb7, 0x2d, 0xf1, 0xae, 0x98, 0x30, 0x9e, 0xa1, 0x85, 0x67, 0x60, 0xb4, 0x4d, 0x06, 0xa3, 0xcd, + 0x7f, 0x48, 0x36, 0xfb, 0x8e, 0x8d, 0x4f, 0xc0, 0xa0, 0x4f, 0x5a, 0xbc, 0x22, 0x03, 0x7a, 0xfc, + 0x88, 0x4f, 0xc1, 0x91, 0x16, 0x77, 0x52, 0x1a, 0x9c, 0x41, 0x95, 0x21, 0x5d, 0xbe, 0xa9, 0x0b, + 0x50, 0xe1, 0xa4, 0x7b, 0x8f, 0xdf, 0x26, 0x0f, 0x5c, 0x12, 0xde, 0x8d, 0xef, 0x92, 0x35, 0x7e, + 0xba, 0x9b, 0x61, 0xe7, 0xbe, 0xaa, 0xdf, 0x23, 0x98, 0xdf, 0x87, 0xb2, 0xac, 0x52, 0x00, 0xa5, + 0xbc, 0x2b, 0x4a, 0xf2, 0x40, 0xcb, 0x28, 0x5b, 0x2f, 0xd7, 0xb2, 0x3c, 0x13, 0x24, 0x4b, 0x47, + 0x9d, 0x87, 0x39, 0x0e, 0x6e, 0x35, 0x26, 0x8d, 0x6e, 0x32, 0x92, 0x9f, 0xc8, 0x77, 0x48, 0x66, + 0xdd, 0x53, 0x57, 0xe6, 0xf1, 0x14, 0x4e, 0xe7, 0x5c, 0xdf, 0x32, 0x8d, 0x6a, 0x46, 0x1a, 0x3d, + 0x1c, 0xcb, 0x2c, 0x04, 0xb9, 0xf7, 0xa8, 0xa8, 0x8f, 0xe1, 0x0c, 0x07, 0x76, 0x9f, 0x99, 0x8c, + 0xd4, 0x9b, 0xde, 0xbd, 0xf8, 0xca, 0x4e, 0xce, 0xd5, 0x32, 0x14, 0xf9, 0x15, 0x9e, 0xec, 0xf9, + 0xc8, 0x92, 0x92, 0x11, 0x9a, 0x9b, 0xdc, 0xb1, 0x13, 0x2e, 0x51, 0xf1, 0xaa, 0xfe, 0x82, 0x40, + 0xc9, 0x72, 0x2d, 0xb3, 0x7c, 0x0c, 0xc7, 0x85, 0xef, 0x86, 0x67, 0x5a, 0xc4, 0x27, 0x01, 0x93, + 0x21, 0xe6, 0x33, 0x42, 0xdc, 0xa5, 0x81, 0xf3, 0x80, 0x84, 0x3e, 0x77, 0xb1, 0x91, 0x18, 0xc8, + 0x88, 0xc7, 0x68, 0x97, 0x14, 0x4f, 0xc3, 0x48, 0xdd, 0xf5, 0x3c, 0xc3, 0xf4, 0x69, 0x33, 0x60, + 0x9c, 0x93, 0x43, 0x3a, 0xc4, 0xa2, 0x15, 0x2e, 0xc1, 0x53, 0x30, 0xcc, 0x42, 0xd7, 0x71, 0x48, + 0x48, 0x6c, 0xce, 0xce, 0xa2, 0xbe, 0x2b, 0x50, 0xe7, 0xe0, 0x3c, 0x87, 0x7d, 0xb7, 0xa3, 0xf9, + 0x64, 0x6e, 0xea, 0x17, 0x08, 0x66, 0xfb, 0x69, 0xca, 0x64, 0x9f, 0xc0, 0x78, 0x46, 0x2f, 0x93, + 0x09, 0x9f, 0xcf, 0x4a, 0x38, 0xe5, 0x52, 0x26, 0x8b, 0xbd, 0xd4, 0x8a, 0xba, 0x02, 0x67, 0xef, + 0xb3, 0x90, 0x98, 0xa2, 0x3c, 0x35, 0x4a, 0x9f, 0x3e, 0x14, 0xfd, 0x2c, 0xd9, 0xc7, 0xf4, 0xf9, + 0x1d, 0xec, 0x3e, 0xbf, 0xea, 0x8f, 0x08, 0xca, 0x79, 0x3e, 0x64, 0x0e, 0xef, 0xc2, 0x51, 0xd9, + 0x26, 0xe5, 0x25, 0x34, 0x9d, 0x81, 0x5b, 0xf8, 0x10, 0xa6, 0x09, 0x21, 0xa4, 0x15, 0x3e, 0x07, + 0xa3, 0x82, 0xd7, 0x9b, 0xc4, 0x75, 0x36, 0xc5, 0xc6, 0x8c, 0xe9, 0x23, 0x5c, 0x76, 0x9b, 0x8b, + 0xf0, 0x24, 0x0c, 0x93, 0x6d, 0x62, 0x19, 0x3e, 0xb5, 0xc5, 0xbd, 0x31, 0xa6, 0x17, 0x63, 0xc1, + 0x3a, 0xb5, 0x89, 0xfa, 0x0c, 0xc1, 0x68, 0xa7, 0x7f, 0xfc, 0x10, 0x4e, 0xd0, 0x04, 0xad, 0x6c, + 0xe1, 0xb2, 0xa4, 0x95, 0x5c, 0x68, 0x7b, 0xd2, 0xbb, 0x5d, 0xd0, 0x8f, 0xd3, 0x6e, 0x51, 0xdc, + 0xba, 0x04, 0x33, 0x63, 0xca, 0xc8, 0x4b, 0x7e, 0xb6, 0xbf, 0xc3, 0x9b, 0xae, 0xe7, 0xdd, 0x2e, + 0xe8, 0xc3, 0xdc, 0x36, 0x7e, 0x59, 0x3d, 0x01, 0xc7, 0x04, 0x2a, 0xc3, 0x27, 0x51, 0x64, 0x3a, + 0x44, 0xfd, 0x1a, 0xc1, 0x44, 0x26, 0x0e, 0xfc, 0x78, 0x6f, 0x75, 0xaf, 0x75, 0x47, 0x94, 0x53, + 0x4c, 0x35, 0x3d, 0xb3, 0xdc, 0xab, 0xd7, 0xd7, 0x62, 0x81, 0x70, 0xf4, 0x68, 0x71, 0x6f, 0xd9, + 0x15, 0x28, 0x46, 0x81, 0xd9, 0x88, 0x36, 0xa9, 0x28, 0x79, 0x51, 0x6f, 0xbf, 0xab, 0x3f, 0x23, + 0x18, 0xcf, 0x48, 0x03, 0x2f, 0x03, 0x27, 0x87, 0x68, 0xa3, 0xb2, 0xa6, 0x53, 0x39, 0xed, 0x9f, + 0xb7, 0x49, 0x9d, 0x4f, 0x0b, 0xfc, 0x11, 0x5f, 0x85, 0x23, 0xbc, 0x06, 0x71, 0x83, 0x8c, 0x33, + 0x29, 0xe5, 0xdd, 0x19, 0x12, 0xa9, 0xd4, 0xc6, 0x73, 0x30, 0xda, 0x71, 0x6e, 0xa3, 0xd2, 0xe0, + 0xcc, 0x60, 0x65, 0x48, 0xea, 0x8c, 0xec, 0x1e, 0xdf, 0x68, 0xe9, 0x07, 0x80, 0xd7, 0xf8, 0xc1, + 0xc3, 0x5f, 0x22, 0x28, 0x26, 0x23, 0x08, 0x5e, 0xc8, 0x88, 0x93, 0x33, 0xc7, 0x29, 0x95, 0x3c, + 0xdd, 0xbd, 0x83, 0x9c, 0x3a, 0xff, 0xf9, 0x1f, 0xff, 0x7e, 0x3b, 0xf0, 0x06, 0x3e, 0xa7, 0xf5, + 0x18, 0x9a, 0xb5, 0x4f, 0x5c, 0xfb, 0x53, 0xfc, 0x15, 0x82, 0x91, 0x8e, 0x59, 0x2a, 0x1f, 0x50, + 0x7a, 0xa8, 0x53, 0x2e, 0xf4, 0x03, 0xd4, 0x31, 0x9c, 0xa9, 0x6f, 0x72, 0x4c, 0x65, 0x3c, 0xd5, + 0x0b, 0x13, 0xfe, 0x0d, 0x41, 0x29, 0x6f, 0x28, 0xc0, 0x4b, 0x07, 0x9a, 0x20, 0x04, 0xc6, 0xcb, + 0x87, 0x98, 0x3a, 0xd4, 0xeb, 0x1c, 0xeb, 0x95, 0xeb, 0x68, 0x41, 0xd5, 0xb4, 0xcc, 0xa9, 0xdd, + 0x08, 0xa8, 0x4d, 0x0c, 0x46, 0xc5, 0xbf, 0xd5, 0x01, 0xf2, 0x77, 0x04, 0x53, 0xbd, 0xfa, 0x33, + 0x5e, 0xce, 0xab, 0xda, 0x3e, 0xa6, 0x0b, 0xe5, 0x9d, 0xc3, 0x19, 0xcb, 0xbc, 0x66, 0x79, 0x5e, + 0x33, 0xb8, 0xac, 0xf5, 0xfc, 0x52, 0xc2, 0xbf, 0x22, 0x98, 0xec, 0xd1, 0x9c, 0xf1, 0xf5, 0x3c, + 0x14, 0xfd, 0xc7, 0x0a, 0x65, 0xf9, 0x50, 0xb6, 0x32, 0x81, 0xf3, 0x3c, 0x81, 0x69, 0x7c, 0xb6, + 0xe7, 0xe7, 0x23, 0x7e, 0x86, 0xe0, 0x4c, 0x6e, 0x83, 0xc3, 0xd7, 0xf2, 0x10, 0xf4, 0xeb, 0x9e, + 0xca, 0xdb, 0x87, 0xb0, 0x94, 0xc8, 0xab, 0x1c, 0x79, 0x05, 0xcf, 0x6a, 0xfb, 0xfa, 0x64, 0xc4, + 0x01, 0x8c, 0x75, 0xcd, 0x20, 0xf8, 0xad, 0xbc, 0xd8, 0x59, 0x53, 0x90, 0x72, 0x71, 0x9f, 0xda, + 0x12, 0x5d, 0x01, 0x7f, 0x06, 0xa7, 0xb2, 0x7b, 0x29, 0xbe, 0xb4, 0xdf, 0xbe, 0x94, 0xb4, 0x6e, + 0x65, 0xf1, 0x00, 0x16, 0x02, 0xc0, 0x25, 0xb4, 0xba, 0xf1, 0xfc, 0x65, 0x19, 0xbd, 0x78, 0x59, + 0x46, 0xff, 0xbc, 0x2c, 0xa3, 0x6f, 0x5e, 0x95, 0x0b, 0x2f, 0x5e, 0x95, 0x0b, 0x7f, 0xbe, 0x2a, + 0x17, 0x3e, 0xba, 0xea, 0xb8, 0x6c, 0xb3, 0x59, 0xab, 0x5a, 0xd4, 0xef, 0x2e, 0x5e, 0xeb, 0xca, + 0x45, 0xde, 0x56, 0xb4, 0xb6, 0x64, 0x5b, 0x14, 0x94, 0xed, 0x34, 0x48, 0x54, 0x3b, 0xc2, 0xc5, + 0x97, 0xff, 0x0f, 0x00, 0x00, 0xff, 0xff, 0x63, 0x36, 0x67, 0x96, 0xfd, 0x10, 0x00, 0x00, } // Reference imports to suppress errors if they are not otherwise used. @@ -1021,7 +1239,8 @@ type QueryClient interface { LiquidationsConfiguration(ctx context.Context, in *QueryLiquidationsConfigurationRequest, opts ...grpc.CallOption) (*QueryLiquidationsConfigurationResponse, error) // Queries the stateful order for a given order id. StatefulOrder(ctx context.Context, in *QueryStatefulOrderRequest, opts ...grpc.CallOption) (*QueryStatefulOrderResponse, error) - // Streams orderbook updates. + // Streams orderbook updates. Updates contain orderbook data + // such as order placements, updates, and fills. StreamOrderbookUpdates(ctx context.Context, in *StreamOrderbookUpdatesRequest, opts ...grpc.CallOption) (Query_StreamOrderbookUpdatesClient, error) } @@ -1144,7 +1363,8 @@ type QueryServer interface { LiquidationsConfiguration(context.Context, *QueryLiquidationsConfigurationRequest) (*QueryLiquidationsConfigurationResponse, error) // Queries the stateful order for a given order id. StatefulOrder(context.Context, *QueryStatefulOrderRequest) (*QueryStatefulOrderResponse, error) - // Streams orderbook updates. + // Streams orderbook updates. Updates contain orderbook data + // such as order placements, updates, and fills. StreamOrderbookUpdates(*StreamOrderbookUpdatesRequest, Query_StreamOrderbookUpdatesServer) error } @@ -1952,21 +2172,11 @@ func (m *StreamOrderbookUpdatesResponse) MarshalToSizedBuffer(dAtA []byte) (int, if m.ExecMode != 0 { i = encodeVarintQuery(dAtA, i, uint64(m.ExecMode)) i-- - dAtA[i] = 0x20 + dAtA[i] = 0x18 } if m.BlockHeight != 0 { i = encodeVarintQuery(dAtA, i, uint64(m.BlockHeight)) i-- - dAtA[i] = 0x18 - } - if m.Snapshot { - i-- - if m.Snapshot { - dAtA[i] = 1 - } else { - dAtA[i] = 0 - } - i-- dAtA[i] = 0x10 } if len(m.Updates) > 0 { @@ -1986,82 +2196,270 @@ func (m *StreamOrderbookUpdatesResponse) MarshalToSizedBuffer(dAtA []byte) (int, return len(dAtA) - i, nil } -func encodeVarintQuery(dAtA []byte, offset int, v uint64) int { - offset -= sovQuery(v) - base := offset - for v >= 1<<7 { - dAtA[offset] = uint8(v&0x7f | 0x80) - v >>= 7 - offset++ +func (m *StreamUpdate) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBuffer(dAtA[:size]) + if err != nil { + return nil, err } - dAtA[offset] = uint8(v) - return base + return dAtA[:n], nil } -func (m *QueryGetClobPairRequest) Size() (n int) { - if m == nil { - return 0 - } + +func (m *StreamUpdate) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *StreamUpdate) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + _ = i var l int _ = l - if m.Id != 0 { - n += 1 + sovQuery(uint64(m.Id)) + if m.UpdateMessage != nil { + { + size := m.UpdateMessage.Size() + i -= size + if _, err := m.UpdateMessage.MarshalTo(dAtA[i:]); err != nil { + return 0, err + } + } } - return n + return len(dAtA) - i, nil } -func (m *QueryClobPairResponse) Size() (n int) { - if m == nil { - return 0 - } - var l int - _ = l - l = m.ClobPair.Size() - n += 1 + l + sovQuery(uint64(l)) - return n +func (m *StreamUpdate_OrderbookUpdate) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) } -func (m *QueryAllClobPairRequest) Size() (n int) { - if m == nil { - return 0 - } - var l int - _ = l - if m.Pagination != nil { - l = m.Pagination.Size() - n += 1 + l + sovQuery(uint64(l)) +func (m *StreamUpdate_OrderbookUpdate) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + if m.OrderbookUpdate != nil { + { + size, err := m.OrderbookUpdate.MarshalToSizedBuffer(dAtA[:i]) + if err != nil { + return 0, err + } + i -= size + i = encodeVarintQuery(dAtA, i, uint64(size)) + } + i-- + dAtA[i] = 0xa } - return n + return len(dAtA) - i, nil +} +func (m *StreamUpdate_OrderFill) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) } -func (m *QueryClobPairAllResponse) Size() (n int) { - if m == nil { - return 0 - } - var l int - _ = l - if len(m.ClobPair) > 0 { - for _, e := range m.ClobPair { - l = e.Size() - n += 1 + l + sovQuery(uint64(l)) +func (m *StreamUpdate_OrderFill) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + if m.OrderFill != nil { + { + size, err := m.OrderFill.MarshalToSizedBuffer(dAtA[:i]) + if err != nil { + return 0, err + } + i -= size + i = encodeVarintQuery(dAtA, i, uint64(size)) } + i-- + dAtA[i] = 0x12 } - if m.Pagination != nil { - l = m.Pagination.Size() - n += 1 + l + sovQuery(uint64(l)) + return len(dAtA) - i, nil +} +func (m *StreamOrderbookUpdate) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBuffer(dAtA[:size]) + if err != nil { + return nil, err } - return n + return dAtA[:n], nil } -func (m *MevNodeToNodeCalculationRequest) Size() (n int) { - if m == nil { - return 0 - } +func (m *StreamOrderbookUpdate) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *StreamOrderbookUpdate) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + _ = i var l int _ = l - if m.BlockProposerMatches != nil { - l = m.BlockProposerMatches.Size() - n += 1 + l + sovQuery(uint64(l)) - } + if m.Snapshot { + i-- + if m.Snapshot { + dAtA[i] = 1 + } else { + dAtA[i] = 0 + } + i-- + dAtA[i] = 0x10 + } + if len(m.Updates) > 0 { + for iNdEx := len(m.Updates) - 1; iNdEx >= 0; iNdEx-- { + { + size, err := m.Updates[iNdEx].MarshalToSizedBuffer(dAtA[:i]) + if err != nil { + return 0, err + } + i -= size + i = encodeVarintQuery(dAtA, i, uint64(size)) + } + i-- + dAtA[i] = 0xa + } + } + return len(dAtA) - i, nil +} + +func (m *StreamOrderbookFill) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBuffer(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *StreamOrderbookFill) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *StreamOrderbookFill) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + _ = i + var l int + _ = l + if len(m.FillAmounts) > 0 { + dAtA16 := make([]byte, len(m.FillAmounts)*10) + var j15 int + for _, num := range m.FillAmounts { + for num >= 1<<7 { + dAtA16[j15] = uint8(uint64(num)&0x7f | 0x80) + num >>= 7 + j15++ + } + dAtA16[j15] = uint8(num) + j15++ + } + i -= j15 + copy(dAtA[i:], dAtA16[:j15]) + i = encodeVarintQuery(dAtA, i, uint64(j15)) + i-- + dAtA[i] = 0x1a + } + if len(m.Orders) > 0 { + for iNdEx := len(m.Orders) - 1; iNdEx >= 0; iNdEx-- { + { + size, err := m.Orders[iNdEx].MarshalToSizedBuffer(dAtA[:i]) + if err != nil { + return 0, err + } + i -= size + i = encodeVarintQuery(dAtA, i, uint64(size)) + } + i-- + dAtA[i] = 0x12 + } + } + if m.ClobMatch != nil { + { + size, err := m.ClobMatch.MarshalToSizedBuffer(dAtA[:i]) + if err != nil { + return 0, err + } + i -= size + i = encodeVarintQuery(dAtA, i, uint64(size)) + } + i-- + dAtA[i] = 0xa + } + return len(dAtA) - i, nil +} + +func encodeVarintQuery(dAtA []byte, offset int, v uint64) int { + offset -= sovQuery(v) + base := offset + for v >= 1<<7 { + dAtA[offset] = uint8(v&0x7f | 0x80) + v >>= 7 + offset++ + } + dAtA[offset] = uint8(v) + return base +} +func (m *QueryGetClobPairRequest) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + if m.Id != 0 { + n += 1 + sovQuery(uint64(m.Id)) + } + return n +} + +func (m *QueryClobPairResponse) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + l = m.ClobPair.Size() + n += 1 + l + sovQuery(uint64(l)) + return n +} + +func (m *QueryAllClobPairRequest) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + if m.Pagination != nil { + l = m.Pagination.Size() + n += 1 + l + sovQuery(uint64(l)) + } + return n +} + +func (m *QueryClobPairAllResponse) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + if len(m.ClobPair) > 0 { + for _, e := range m.ClobPair { + l = e.Size() + n += 1 + l + sovQuery(uint64(l)) + } + } + if m.Pagination != nil { + l = m.Pagination.Size() + n += 1 + l + sovQuery(uint64(l)) + } + return n +} + +func (m *MevNodeToNodeCalculationRequest) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + if m.BlockProposerMatches != nil { + l = m.BlockProposerMatches.Size() + n += 1 + l + sovQuery(uint64(l)) + } if m.ValidatorMevMetrics != nil { l = m.ValidatorMevMetrics.Size() n += 1 + l + sovQuery(uint64(l)) @@ -2218,9 +2616,6 @@ func (m *StreamOrderbookUpdatesResponse) Size() (n int) { n += 1 + l + sovQuery(uint64(l)) } } - if m.Snapshot { - n += 2 - } if m.BlockHeight != 0 { n += 1 + sovQuery(uint64(m.BlockHeight)) } @@ -2230,6 +2625,86 @@ func (m *StreamOrderbookUpdatesResponse) Size() (n int) { return n } +func (m *StreamUpdate) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + if m.UpdateMessage != nil { + n += m.UpdateMessage.Size() + } + return n +} + +func (m *StreamUpdate_OrderbookUpdate) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + if m.OrderbookUpdate != nil { + l = m.OrderbookUpdate.Size() + n += 1 + l + sovQuery(uint64(l)) + } + return n +} +func (m *StreamUpdate_OrderFill) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + if m.OrderFill != nil { + l = m.OrderFill.Size() + n += 1 + l + sovQuery(uint64(l)) + } + return n +} +func (m *StreamOrderbookUpdate) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + if len(m.Updates) > 0 { + for _, e := range m.Updates { + l = e.Size() + n += 1 + l + sovQuery(uint64(l)) + } + } + if m.Snapshot { + n += 2 + } + return n +} + +func (m *StreamOrderbookFill) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + if m.ClobMatch != nil { + l = m.ClobMatch.Size() + n += 1 + l + sovQuery(uint64(l)) + } + if len(m.Orders) > 0 { + for _, e := range m.Orders { + l = e.Size() + n += 1 + l + sovQuery(uint64(l)) + } + } + if len(m.FillAmounts) > 0 { + l = 0 + for _, e := range m.FillAmounts { + l += sovQuery(uint64(e)) + } + n += 1 + sovQuery(uint64(l)) + l + } + return n +} + func sovQuery(x uint64) (n int) { return (math_bits.Len64(x|1) + 6) / 7 } @@ -3687,32 +4162,12 @@ func (m *StreamOrderbookUpdatesResponse) Unmarshal(dAtA []byte) error { if postIndex > l { return io.ErrUnexpectedEOF } - m.Updates = append(m.Updates, types.OffChainUpdateV1{}) + m.Updates = append(m.Updates, StreamUpdate{}) if err := m.Updates[len(m.Updates)-1].Unmarshal(dAtA[iNdEx:postIndex]); err != nil { return err } iNdEx = postIndex case 2: - if wireType != 0 { - return fmt.Errorf("proto: wrong wireType = %d for field Snapshot", wireType) - } - var v int - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return ErrIntOverflowQuery - } - if iNdEx >= l { - return io.ErrUnexpectedEOF - } - b := dAtA[iNdEx] - iNdEx++ - v |= int(b&0x7F) << shift - if b < 0x80 { - break - } - } - m.Snapshot = bool(v != 0) - case 3: if wireType != 0 { return fmt.Errorf("proto: wrong wireType = %d for field BlockHeight", wireType) } @@ -3731,7 +4186,7 @@ func (m *StreamOrderbookUpdatesResponse) Unmarshal(dAtA []byte) error { break } } - case 4: + case 3: if wireType != 0 { return fmt.Errorf("proto: wrong wireType = %d for field ExecMode", wireType) } @@ -3771,6 +4226,426 @@ func (m *StreamOrderbookUpdatesResponse) Unmarshal(dAtA []byte) error { } return nil } +func (m *StreamUpdate) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowQuery + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: StreamUpdate: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: StreamUpdate: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field OrderbookUpdate", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowQuery + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthQuery + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthQuery + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + v := &StreamOrderbookUpdate{} + if err := v.Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + m.UpdateMessage = &StreamUpdate_OrderbookUpdate{v} + iNdEx = postIndex + case 2: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field OrderFill", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowQuery + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthQuery + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthQuery + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + v := &StreamOrderbookFill{} + if err := v.Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + m.UpdateMessage = &StreamUpdate_OrderFill{v} + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipQuery(dAtA[iNdEx:]) + if err != nil { + return err + } + if (skippy < 0) || (iNdEx+skippy) < 0 { + return ErrInvalidLengthQuery + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func (m *StreamOrderbookUpdate) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowQuery + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: StreamOrderbookUpdate: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: StreamOrderbookUpdate: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Updates", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowQuery + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthQuery + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthQuery + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Updates = append(m.Updates, types.OffChainUpdateV1{}) + if err := m.Updates[len(m.Updates)-1].Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + case 2: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field Snapshot", wireType) + } + var v int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowQuery + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + v |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + m.Snapshot = bool(v != 0) + default: + iNdEx = preIndex + skippy, err := skipQuery(dAtA[iNdEx:]) + if err != nil { + return err + } + if (skippy < 0) || (iNdEx+skippy) < 0 { + return ErrInvalidLengthQuery + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func (m *StreamOrderbookFill) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowQuery + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: StreamOrderbookFill: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: StreamOrderbookFill: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field ClobMatch", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowQuery + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthQuery + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthQuery + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + if m.ClobMatch == nil { + m.ClobMatch = &ClobMatch{} + } + if err := m.ClobMatch.Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + case 2: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Orders", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowQuery + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthQuery + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthQuery + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Orders = append(m.Orders, Order{}) + if err := m.Orders[len(m.Orders)-1].Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + case 3: + if wireType == 0 { + var v uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowQuery + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + v |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + m.FillAmounts = append(m.FillAmounts, v) + } else if wireType == 2 { + var packedLen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowQuery + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + packedLen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if packedLen < 0 { + return ErrInvalidLengthQuery + } + postIndex := iNdEx + packedLen + if postIndex < 0 { + return ErrInvalidLengthQuery + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + var elementCount int + var count int + for _, integer := range dAtA[iNdEx:postIndex] { + if integer < 128 { + count++ + } + } + elementCount = count + if elementCount != 0 && len(m.FillAmounts) == 0 { + m.FillAmounts = make([]uint64, 0, elementCount) + } + for iNdEx < postIndex { + var v uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowQuery + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + v |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + m.FillAmounts = append(m.FillAmounts, v) + } + } else { + return fmt.Errorf("proto: wrong wireType = %d for field FillAmounts", wireType) + } + default: + iNdEx = preIndex + skippy, err := skipQuery(dAtA[iNdEx:]) + if err != nil { + return err + } + if (skippy < 0) || (iNdEx+skippy) < 0 { + return ErrInvalidLengthQuery + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} func skipQuery(dAtA []byte) (n int, err error) { l := len(dAtA) iNdEx := 0