Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(fullnode) Add filterOrders option to streaming subscription #2676

Closed
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -278,9 +278,13 @@ export interface StreamOrderbookUpdatesRequest {
/** Market ids for price updates. */

marketIds: number[];
/** Filter order updates in addition to position updates */
/**
* Filter order updates by subaccount IDs.
* If true, the orderbook updates only include orders from provided subaccount
* IDs.
*/

filterOrders: boolean;
filterOrdersBySubaccountId: boolean;
}
/**
* StreamOrderbookUpdatesRequest is a request message for the
Expand All @@ -296,9 +300,13 @@ export interface StreamOrderbookUpdatesRequestSDKType {
/** Market ids for price updates. */

market_ids: number[];
/** Filter order updates in addition to position updates */
/**
* Filter order updates by subaccount IDs.
* If true, the orderbook updates only include orders from provided subaccount
* IDs.
*/

filter_orders: boolean;
filter_orders_by_subaccount_id: boolean;
}
/**
* StreamOrderbookUpdatesResponse is a response message for the
Expand Down Expand Up @@ -1305,7 +1313,7 @@ function createBaseStreamOrderbookUpdatesRequest(): StreamOrderbookUpdatesReques
clobPairId: [],
subaccountIds: [],
marketIds: [],
filterOrders: false
filterOrdersBySubaccountId: false
};
}

Expand All @@ -1331,8 +1339,8 @@ export const StreamOrderbookUpdatesRequest = {

writer.ldelim();

if (message.filterOrders === true) {
writer.uint32(32).bool(message.filterOrders);
if (message.filterOrdersBySubaccountId === true) {
writer.uint32(32).bool(message.filterOrdersBySubaccountId);
}

return writer;
Expand Down Expand Up @@ -1378,7 +1386,7 @@ export const StreamOrderbookUpdatesRequest = {
break;

case 4:
message.filterOrders = reader.bool();
message.filterOrdersBySubaccountId = reader.bool();
break;

default:
Expand All @@ -1395,7 +1403,7 @@ export const StreamOrderbookUpdatesRequest = {
message.clobPairId = object.clobPairId?.map(e => e) || [];
message.subaccountIds = object.subaccountIds?.map(e => SubaccountId.fromPartial(e)) || [];
message.marketIds = object.marketIds?.map(e => e) || [];
message.filterOrders = object.filterOrders ?? false;
message.filterOrdersBySubaccountId = object.filterOrdersBySubaccountId ?? false;
return message;
}

Expand Down
6 changes: 4 additions & 2 deletions proto/dydxprotocol/clob/query.proto
Original file line number Diff line number Diff line change
Expand Up @@ -187,8 +187,10 @@ message StreamOrderbookUpdatesRequest {
// Market ids for price updates.
repeated uint32 market_ids = 3;

// Filter order updates in addition to position updates
bool filter_orders = 4;
// Filter order updates by subaccount IDs.
// If true, the orderbook updates only include orders from provided subaccount
// IDs.
bool filter_orders_by_subaccount_id = 4;
}

// StreamOrderbookUpdatesResponse is a response message for the
Expand Down
113 changes: 49 additions & 64 deletions protocol/streaming/full_node_streaming_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,68 +220,59 @@ func (sm *FullNodeStreamingManagerImpl) getNextAvailableSubscriptionId() uint32
return id
}

func doFilterSubaccountStreamUpdate(
orderBookUpdate *clobtypes.StreamUpdate_OrderbookUpdate,
subaccountIdNumbers []uint32,
logger log.Logger,
) bool {
for _, orderBookUpdate := range orderBookUpdate.OrderbookUpdate.Updates {
orderBookUpdateSubaccountIdNumber, err := streaming_util.GetOffChainUpdateV1SubaccountIdNumber(orderBookUpdate)
if err == nil {
if slices.Contains(subaccountIdNumbers, orderBookUpdateSubaccountIdNumber) {
return true
}
} else {
logger.Error(err.Error())
}
}
return false
}

// Filter StreamUpdates for subaccountIdNumbers
// If a StreamUpdate_OrderUpdate contains no updates for subscribed subaccounts, drop message
// If a StreamUpdate_OrderUpdate contains updates for subscribed subaccounts, construct a new
// StreamUpdate_OrderUpdate with updates only for subscribed subaccounts
func (sub *OrderbookSubscription) FilterSubaccountStreamUpdates(
output chan []clobtypes.StreamUpdate,
func FilterSubaccountStreamUpdates(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: rename to FilterStreamUpdateBySubaccount to disambiguate from "filtering SubaccountStreamUpdate"

updates []clobtypes.StreamUpdate,
subaccountIdNumbers []uint32,
logger log.Logger,
) {
subaccountIdNumbers := make([]uint32, len(sub.subaccountIds))
for i, subaccountId := range sub.subaccountIds {
subaccountIdNumbers[i] = subaccountId.Number
}

) *[]clobtypes.StreamUpdate {
// If reflection becomes too expensive, split updatesChannel by message type
for updates := range sub.updatesChannel {
filteredUpdates := []clobtypes.StreamUpdate{}
for _, update := range updates {
switch updateMessage := update.UpdateMessage.(type) {
case *clobtypes.StreamUpdate_OrderbookUpdate:
orderBookUpdates := []ocutypes.OffChainUpdateV1{}
for _, orderBookUpdate := range updateMessage.OrderbookUpdate.Updates {
orderBookUpdateSubaccountIdNumber, err := streaming_util.GetOffChainUpdateV1SubaccountIdNumber(orderBookUpdate)
if err == nil {
if slices.Contains(subaccountIdNumbers, orderBookUpdateSubaccountIdNumber) {
orderBookUpdates = append(orderBookUpdates, orderBookUpdate)
}
} else {
logger.Error(err.Error())
}
}
// Drop the StreamUpdate_OrderbookUpdate if all updates inside were dropped
if len(orderBookUpdates) > 0 {
if len(orderBookUpdates) < len(updateMessage.OrderbookUpdate.Updates) {
update = clobtypes.StreamUpdate{
BlockHeight: update.BlockHeight,
ExecMode: update.ExecMode,
UpdateMessage: &clobtypes.StreamUpdate_OrderbookUpdate{
OrderbookUpdate: &clobtypes.StreamOrderbookUpdate{
Snapshot: updateMessage.OrderbookUpdate.Snapshot,
Updates: orderBookUpdates,
},
},
}
}
filteredUpdates = append(filteredUpdates, update)
}
default:
filteredUpdates := []clobtypes.StreamUpdate{}
for _, update := range updates {
switch updateMessage := update.UpdateMessage.(type) {
case *clobtypes.StreamUpdate_OrderbookUpdate:
if doFilterSubaccountStreamUpdate(updateMessage, subaccountIdNumbers, logger) {
filteredUpdates = append(filteredUpdates, update)
}
}
if len(filteredUpdates) > 0 {
output <- filteredUpdates
default:
filteredUpdates = append(filteredUpdates, update)
}
}

if len(filteredUpdates) > 0 {
return &filteredUpdates
} else {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: no need for else. Just do return nil outside of if statement above.

return nil
}
}

// Subscribe subscribes to the orderbook updates stream.
func (sm *FullNodeStreamingManagerImpl) Subscribe(
clobPairIds []uint32,
subaccountIds []*satypes.SubaccountId,
marketIds []uint32,
filterOrders bool,
filterOrdersBySubAccountId bool,
messageSender types.OutgoingMessageSender,
) (
err error,
Expand All @@ -290,11 +281,17 @@ func (sm *FullNodeStreamingManagerImpl) Subscribe(
if len(clobPairIds) == 0 && len(subaccountIds) == 0 && len(marketIds) == 0 {
return types.ErrInvalidStreamingRequest
}
if filterOrdersBySubAccountId && (len(subaccountIds) == 0) {
sm.logger.Error("filterOrdersBySubaccountId with no subaccountIds")
return types.ErrInvalidStreamingRequest
}

sm.Lock()
sIds := make([]satypes.SubaccountId, len(subaccountIds))
subaccountIdNumbers := make([]uint32, len(subaccountIds))
for i, subaccountId := range subaccountIds {
sIds[i] = *subaccountId
subaccountIdNumbers[i] = subaccountId.Number
}

subscription := sm.NewOrderbookSubscription(clobPairIds, sIds, marketIds, messageSender)
Expand Down Expand Up @@ -346,27 +343,15 @@ func (sm *FullNodeStreamingManagerImpl) Subscribe(
sm.EmitMetrics()
sm.Unlock()

// If filterOrders, listen to filtered channel and start filter goroutine
// Error if filterOrders but no subaccounts are subscribed
filteredUpdateChannel := subscription.updatesChannel
if filterOrders {
if len(subaccountIds) == 0 {
sm.logger.Error(
fmt.Sprintf(
"filterOrders requires subaccountIds for subscription id: %+v",
subscription.subscriptionId,
),
)
} else {
filteredUpdateChannel = make(chan []clobtypes.StreamUpdate, sm.maxSubscriptionChannelSize)
defer close(filteredUpdateChannel)
go subscription.FilterSubaccountStreamUpdates(filteredUpdateChannel, sm.logger)
}
}

// Use current goroutine to consistently poll subscription channel for updates
// to send through stream.
for updates := range filteredUpdateChannel {
for updates := range subscription.updatesChannel {
if filterOrdersBySubAccountId {
filteredUpdates := FilterSubaccountStreamUpdates(updates, subaccountIdNumbers, sm.logger)
if filteredUpdates != nil {
updates = *filteredUpdates
}
}
metrics.IncrCounterWithLabels(
metrics.GrpcSendResponseToSubscriberCount,
1,
Expand Down
Loading
Loading