Skip to content

Commit

Permalink
Simplify streaming.FilterSubaccountStreamUpdates and forward entire S…
Browse files Browse the repository at this point in the history
…treamUpdate if any OffchainUpdateV1 message is in scope
  • Loading branch information
UnbornAztecKing committed Jan 15, 2025
1 parent e632ed4 commit 3c71f85
Show file tree
Hide file tree
Showing 6 changed files with 231 additions and 239 deletions.
24 changes: 15 additions & 9 deletions indexer/packages/v4-protos/src/codegen/dydxprotocol/clob/query.ts
Original file line number Diff line number Diff line change
Expand Up @@ -278,9 +278,12 @@ export interface StreamOrderbookUpdatesRequest {
/** Market ids for price updates. */

marketIds: number[];
/** Filter order updates in addition to position updates */
/**
* Filter order updates by subaccount IDs.
* If true, the orderbook updates only include orders from provided subaccount IDs.
*/

filterOrders: boolean;
filterOrdersBySubaccountId: boolean;
}
/**
* StreamOrderbookUpdatesRequest is a request message for the
Expand All @@ -296,9 +299,12 @@ export interface StreamOrderbookUpdatesRequestSDKType {
/** Market ids for price updates. */

market_ids: number[];
/** Filter order updates in addition to position updates */
/**
* Filter order updates by subaccount IDs.
* If true, the orderbook updates only include orders from provided subaccount IDs.
*/

filter_orders: boolean;
filter_orders_by_subaccount_id: boolean;
}
/**
* StreamOrderbookUpdatesResponse is a response message for the
Expand Down Expand Up @@ -1305,7 +1311,7 @@ function createBaseStreamOrderbookUpdatesRequest(): StreamOrderbookUpdatesReques
clobPairId: [],
subaccountIds: [],
marketIds: [],
filterOrders: false
filterOrdersBySubaccountId: false
};
}

Expand All @@ -1331,8 +1337,8 @@ export const StreamOrderbookUpdatesRequest = {

writer.ldelim();

if (message.filterOrders === true) {
writer.uint32(32).bool(message.filterOrders);
if (message.filterOrdersBySubaccountId === true) {
writer.uint32(32).bool(message.filterOrdersBySubaccountId);
}

return writer;
Expand Down Expand Up @@ -1378,7 +1384,7 @@ export const StreamOrderbookUpdatesRequest = {
break;

case 4:
message.filterOrders = reader.bool();
message.filterOrdersBySubaccountId = reader.bool();
break;

default:
Expand All @@ -1395,7 +1401,7 @@ export const StreamOrderbookUpdatesRequest = {
message.clobPairId = object.clobPairId?.map(e => e) || [];
message.subaccountIds = object.subaccountIds?.map(e => SubaccountId.fromPartial(e)) || [];
message.marketIds = object.marketIds?.map(e => e) || [];
message.filterOrders = object.filterOrders ?? false;
message.filterOrdersBySubaccountId = object.filterOrdersBySubaccountId ?? false;
return message;
}

Expand Down
2 changes: 1 addition & 1 deletion proto/dydxprotocol/clob/query.proto
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ message StreamOrderbookUpdatesRequest {

// Filter order updates by subaccount IDs.
// If true, the orderbook updates only include orders from provided subaccount IDs.
bool filter_orders = 4;
bool filter_orders_by_subaccount_id = 4;
}

// StreamOrderbookUpdatesResponse is a response message for the
Expand Down
113 changes: 49 additions & 64 deletions protocol/streaming/full_node_streaming_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,68 +220,59 @@ func (sm *FullNodeStreamingManagerImpl) getNextAvailableSubscriptionId() uint32
return id
}

func doFilterSubaccountStreamUpdate(
orderBookUpdate *clobtypes.StreamUpdate_OrderbookUpdate,
subaccountIdNumbers []uint32,
logger log.Logger,
) bool {
for _, orderBookUpdate := range orderBookUpdate.OrderbookUpdate.Updates {
orderBookUpdateSubaccountIdNumber, err := streaming_util.GetOffChainUpdateV1SubaccountIdNumber(orderBookUpdate)
if err == nil {
if slices.Contains(subaccountIdNumbers, orderBookUpdateSubaccountIdNumber) {
return true
}
} else {
logger.Error(err.Error())
}
}
return false
}

// Filter StreamUpdates for subaccountIdNumbers
// If a StreamUpdate_OrderUpdate contains no updates for subscribed subaccounts, drop message
// If a StreamUpdate_OrderUpdate contains updates for subscribed subaccounts, construct a new
// StreamUpdate_OrderUpdate with updates only for subscribed subaccounts
func (sub *OrderbookSubscription) FilterSubaccountStreamUpdates(
output chan []clobtypes.StreamUpdate,
func FilterSubaccountStreamUpdates(
updates []clobtypes.StreamUpdate,
subaccountIdNumbers []uint32,
logger log.Logger,
) {
subaccountIdNumbers := make([]uint32, len(sub.subaccountIds))
for i, subaccountId := range sub.subaccountIds {
subaccountIdNumbers[i] = subaccountId.Number
}

) *[]clobtypes.StreamUpdate {
// If reflection becomes too expensive, split updatesChannel by message type
for updates := range sub.updatesChannel {
filteredUpdates := []clobtypes.StreamUpdate{}
for _, update := range updates {
switch updateMessage := update.UpdateMessage.(type) {
case *clobtypes.StreamUpdate_OrderbookUpdate:
orderBookUpdates := []ocutypes.OffChainUpdateV1{}
for _, orderBookUpdate := range updateMessage.OrderbookUpdate.Updates {
orderBookUpdateSubaccountIdNumber, err := streaming_util.GetOffChainUpdateV1SubaccountIdNumber(orderBookUpdate)
if err == nil {
if slices.Contains(subaccountIdNumbers, orderBookUpdateSubaccountIdNumber) {
orderBookUpdates = append(orderBookUpdates, orderBookUpdate)
}
} else {
logger.Error(err.Error())
}
}
// Drop the StreamUpdate_OrderbookUpdate if all updates inside were dropped
if len(orderBookUpdates) > 0 {
if len(orderBookUpdates) < len(updateMessage.OrderbookUpdate.Updates) {
update = clobtypes.StreamUpdate{
BlockHeight: update.BlockHeight,
ExecMode: update.ExecMode,
UpdateMessage: &clobtypes.StreamUpdate_OrderbookUpdate{
OrderbookUpdate: &clobtypes.StreamOrderbookUpdate{
Snapshot: updateMessage.OrderbookUpdate.Snapshot,
Updates: orderBookUpdates,
},
},
}
}
filteredUpdates = append(filteredUpdates, update)
}
default:
filteredUpdates := []clobtypes.StreamUpdate{}
for _, update := range updates {
switch updateMessage := update.UpdateMessage.(type) {
case *clobtypes.StreamUpdate_OrderbookUpdate:
if doFilterSubaccountStreamUpdate(updateMessage, subaccountIdNumbers, logger) {
filteredUpdates = append(filteredUpdates, update)
}
}
if len(filteredUpdates) > 0 {
output <- filteredUpdates
default:
filteredUpdates = append(filteredUpdates, update)
}
}

if len(filteredUpdates) > 0 {
return &filteredUpdates
} else {
return nil
}
}

// Subscribe subscribes to the orderbook updates stream.
func (sm *FullNodeStreamingManagerImpl) Subscribe(
clobPairIds []uint32,
subaccountIds []*satypes.SubaccountId,
marketIds []uint32,
filterOrders bool,
filterOrdersBySubAccountId bool,
messageSender types.OutgoingMessageSender,
) (
err error,
Expand All @@ -290,11 +281,17 @@ func (sm *FullNodeStreamingManagerImpl) Subscribe(
if len(clobPairIds) == 0 && len(subaccountIds) == 0 && len(marketIds) == 0 {
return types.ErrInvalidStreamingRequest
}
if filterOrdersBySubAccountId && (len(subaccountIds) == 0) {
sm.logger.Error("filterOrdersBySubaccountId with no subaccountIds")
return types.ErrInvalidStreamingRequest
}

sm.Lock()
sIds := make([]satypes.SubaccountId, len(subaccountIds))
subaccountIdNumbers := make([]uint32, len(subaccountIds))
for i, subaccountId := range subaccountIds {
sIds[i] = *subaccountId
subaccountIdNumbers[i] = subaccountId.Number
}

subscription := sm.NewOrderbookSubscription(clobPairIds, sIds, marketIds, messageSender)
Expand Down Expand Up @@ -346,27 +343,15 @@ func (sm *FullNodeStreamingManagerImpl) Subscribe(
sm.EmitMetrics()
sm.Unlock()

// If filterOrders, listen to filtered channel and start filter goroutine
// Error if filterOrders but no subaccounts are subscribed
filteredUpdateChannel := subscription.updatesChannel
if filterOrders {
if len(subaccountIds) == 0 {
sm.logger.Error(
fmt.Sprintf(
"filterOrders requires subaccountIds for subscription id: %+v",
subscription.subscriptionId,
),
)
} else {
filteredUpdateChannel = make(chan []clobtypes.StreamUpdate, sm.maxSubscriptionChannelSize)
defer close(filteredUpdateChannel)
go subscription.FilterSubaccountStreamUpdates(filteredUpdateChannel, sm.logger)
}
}

// Use current goroutine to consistently poll subscription channel for updates
// to send through stream.
for updates := range filteredUpdateChannel {
for updates := range subscription.updatesChannel {
if filterOrdersBySubAccountId {
filteredUpdates := FilterSubaccountStreamUpdates(updates, subaccountIdNumbers, sm.logger)
if filteredUpdates != nil {
updates = *filteredUpdates
}
}
metrics.IncrCounterWithLabels(
metrics.GrpcSendResponseToSubscriberCount,
1,
Expand Down
Loading

0 comments on commit 3c71f85

Please sign in to comment.