-
Notifications
You must be signed in to change notification settings - Fork 123
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(fullnode) Add filterOrders option to streaming subscription #2676
feat(fullnode) Add filterOrders option to streaming subscription #2676
Conversation
Warning Rate limit exceeded@UnbornAztecKing has exceeded the limit for the number of commits or files that can be reviewed per hour. Please wait 12 minutes and 33 seconds before requesting another review. ⌛ How to resolve this issue?After the wait time has elapsed, a review can be triggered using the We recommend that you space out your commits to avoid hitting the rate limit. 🚦 How do rate limits work?CodeRabbit enforces hourly rate limits for each developer per organization. Our paid plans have higher rate limits than the trial, open-source and free plans. In all cases, we re-allow further reviews after a brief timeout. Please see our FAQ for further information. 📒 Files selected for processing (1)
WalkthroughThis pull request introduces a new boolean property Changes
Possibly related PRs
Suggested labels
Suggested reviewers
Poem
Thank you for using CodeRabbit. We offer it for free to the OSS community and would appreciate your support in helping us grow. If you find it useful, would you consider giving us a shout-out on your favorite social media? 🪧 TipsChatThere are 3 ways to chat with CodeRabbit:
Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments. CodeRabbit Commands (Invoked using PR comments)
Other keywords and placeholders
CodeRabbit Configuration File (
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 3
🧹 Nitpick comments (7)
protocol/streaming/util/util.go (2)
25-26
: Improve function documentation.The current comment is focused on error cases. Consider adding a description of the function's purpose and return values.
-// Error expected if OffChainUpdateV1.UpdateMessage message type is extended to more order events +// GetOffChainUpdateV1SubaccountIdNumber extracts the SubaccountId.Number from an OffChainUpdateV1 message. +// Returns the subaccount number and an error if the update message type is not a supported order event +// (OrderPlace, OrderRemove, OrderUpdate, OrderReplace).
28-42
: Consider using constants for error messages.The error message is hardcoded. Consider defining it as a constant to maintain consistency and make updates easier.
+const ( + ErrUnsupportedUpdateMessage = "UpdateMessage type not in {OrderPlace, OrderRemove, OrderUpdate, OrderReplace}: %+v" +) switch updateMessage := update.UpdateMessage.(type) { // ... case statements ... default: - return 0, fmt.Errorf( - "UpdateMessage type not in {OrderPlace, OrderRemove, OrderUpdate, OrderReplace}: %+v", - updateMessage, - ) + return 0, fmt.Errorf(ErrUnsupportedUpdateMessage, updateMessage)protocol/streaming/util/util_test.go (2)
113-113
: Remove debug print statement.The
fmt.Println
statement appears to be left from debugging.- fmt.Println("expected", id)
21-47
: Refactor test data setup.Consider moving the test data setup into helper functions to improve readability and reusability.
func createTestOrder(subaccountIdNumber uint32) (pv1types.IndexerOrderId, pv1types.IndexerOrder) { orderId := pv1types.IndexerOrderId{ SubaccountId: pv1types.IndexerSubaccountId{ Owner: "foo", Number: subaccountIdNumber, }, ClientId: 0, OrderFlags: 0, ClobPairId: 0, } order := pv1types.IndexerOrder{ OrderId: orderId, // ... other fields ... } return orderId, order }protocol/streaming/full_node_streaming_manager.go (1)
223-277
: Ensure comprehensive error handling and future-proofing for additional update typesIn
FilterSubaccountStreamUpdates
, whenGetOffChainUpdateV1SubaccountIdNumber
returns an error, the error is logged but not addressed further. Consider whether logging alone is sufficient or if additional error handling is necessary to prevent potential data loss of critical updates.Moreover, currently, only
StreamUpdate_OrderbookUpdate
messages are explicitly filtered. If new update message types containing subaccount IDs are introduced in the future, they might bypass filtering. Consider extending the filtering logic to handle other relevant message types to ensure consistent behavior.protocol/streaming/types/interface.go (1)
19-19
: Add documentation for the newfilterOrders
parameterIncluding comments to describe the purpose and usage of the
filterOrders
parameter in theSubscribe
method will improve code readability and assist developers in understanding the subscription behavior.Apply this diff to add documentation:
Subscribe( clobPairIds []uint32, subaccountIds []*satypes.SubaccountId, marketIds []uint32, + // filterOrders determines whether to filter order updates based on subaccountIds filterOrders bool, srv OutgoingMessageSender, ) ( err error, )
proto/dydxprotocol/clob/query.proto (1)
189-191
: Add detailed documentation for the filter_orders field.The new field would benefit from more comprehensive documentation explaining:
- Its purpose and behavior
- The default value implications
- The relationship with subaccount filtering
Add documentation comments above the field:
// Market ids for price updates. repeated uint32 market_ids = 3; + // Indicates whether to filter order updates based on the specified subaccount_ids. + // When true, only order updates related to the specified subaccount_ids will be streamed. + // When false or unspecified, all order updates will be streamed regardless of subaccount_ids. + // This filter works in conjunction with position update filtering specified by subaccount_ids. bool filter_orders = 4;
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
⛔ Files ignored due to path filters (1)
protocol/x/clob/types/query.pb.go
is excluded by!**/*.pb.go
📒 Files selected for processing (10)
indexer/packages/v4-protos/src/codegen/dydxprotocol/clob/query.ts
(6 hunks)proto/dydxprotocol/clob/query.proto
(1 hunks)protocol/streaming/full_node_streaming_manager.go
(6 hunks)protocol/streaming/full_node_streaming_manager_test.go
(1 hunks)protocol/streaming/noop_streaming_manager.go
(1 hunks)protocol/streaming/types/interface.go
(1 hunks)protocol/streaming/util/util.go
(1 hunks)protocol/streaming/util/util_test.go
(1 hunks)protocol/streaming/ws/websocket_server.go
(3 hunks)protocol/x/clob/keeper/grpc_stream_orderbook.go
(1 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (9)
- GitHub Check: test-sim-after-import
- GitHub Check: test-sim-import-export
- GitHub Check: test-sim-multi-seed-short
- GitHub Check: test / run_command
- GitHub Check: unit-end-to-end-and-integration
- GitHub Check: test-race
- GitHub Check: test-coverage-upload
- GitHub Check: container-tests
- GitHub Check: Summary
🔇 Additional comments (14)
protocol/streaming/full_node_streaming_manager.go (3)
97-114
: Encapsulation ofOrderbookSubscription
initialization improves code reusabilityThe new function
NewOrderbookSubscription
cleanly encapsulates the initialization logic forOrderbookSubscription
, enhancing code reuse and maintainability.
116-131
: RefactoredNewOrderbookSubscription
method promotes code simplicityBy delegating to the general
NewOrderbookSubscription
function, the method withinFullNodeStreamingManagerImpl
simplifies subscription creation and reduces code duplication.
1182-1187
: Correctly caching updates to synchronize local operations queueThe addition of caching
syncLocalUpdates
ensures that local operations are properly synchronized duringStreamBatchUpdatesAfterFinalizeBlock
, enhancing the reliability and consistency of the streaming service.protocol/x/clob/keeper/grpc_stream_orderbook.go (1)
15-15
: Inclusion offilterOrders
parameter enables order filtering in subscriptionsAdding
req.GetFilterOrders()
to theSubscribe
call correctly integrates thefilterOrders
option, allowing clients to optionally filter order updates based on their specified criteria.protocol/streaming/noop_streaming_manager.go (1)
27-27
: LGTM!The no-op implementation correctly implements the updated interface by adding the new
filterOrders
parameter.protocol/streaming/ws/websocket_server.go (3)
100-108
: LGTM! Error handling is consistent with other parameters.The implementation follows the established pattern for parameter parsing and error handling.
183-194
: LGTM! Helper function follows best practices.The
parseFilterOrders
function:
- Correctly handles empty parameter case
- Uses appropriate error message format
- Returns a default value of
false
when parameter is not provided
123-123
: LGTM! Parameter correctly passed to Subscribe method.The
filterOrders
parameter is properly passed to the streaming manager's Subscribe method.protocol/streaming/full_node_streaming_manager_test.go (3)
257-261
: LGTM! Well-structured test case definition.The
TestCase
struct provides a clear and maintainable way to define test scenarios.
318-454
: LGTM! Comprehensive test coverage.The test suite thoroughly covers:
- Base scenarios for filtering
- Edge cases (empty updates, no IDs)
- Different update types (orderbook fills, taker orders, subaccount updates, price updates)
- Multiple subaccount combinations
456-477
: LGTM! Robust test execution.The test implementation:
- Properly handles channel cleanup with defer
- Uses appropriate timeouts for no-message scenarios
- Includes clear assertions
indexer/packages/v4-protos/src/codegen/dydxprotocol/clob/query.ts (3)
281-283
: LGTM! Clear field documentation.The new field is well-documented with a clear description of its purpose.
1307-1308
: LGTM! Proper protobuf encoding implementation.The encoding logic correctly:
- Initializes the field with a default value of
false
- Only encodes the field when it's
true
Also applies to: 1333-1336
1380-1382
: LGTM! Proper protobuf decoding implementation.The decoding logic correctly handles the new boolean field.
tests := map[string]struct { | ||
update ocutypes.OffChainUpdateV1 | ||
id uint32 | ||
err error | ||
}{ | ||
"OrderPlace": { | ||
update: ocutypes.OffChainUpdateV1{ | ||
UpdateMessage: &ocutypes.OffChainUpdateV1_OrderPlace{ | ||
OrderPlace: &ocutypes.OrderPlaceV1{ | ||
Order: &order, | ||
PlacementStatus: ocutypes.OrderPlaceV1_ORDER_PLACEMENT_STATUS_BEST_EFFORT_OPENED, | ||
TimeStamp: _ToPtr(orderPlaceTime), | ||
}, | ||
}, | ||
}, | ||
id: subaccountIdNumber, | ||
err: nil, | ||
}, | ||
"OrderRemove": { | ||
update: ocutypes.OffChainUpdateV1{ | ||
UpdateMessage: &ocutypes.OffChainUpdateV1_OrderRemove{ | ||
OrderRemove: &ocutypes.OrderRemoveV1{ | ||
RemovedOrderId: &orderId, | ||
Reason: stypes.OrderRemovalReason_ORDER_REMOVAL_REASON_USER_CANCELED, | ||
RemovalStatus: ocutypes.OrderRemoveV1_ORDER_REMOVAL_STATUS_CANCELED, | ||
TimeStamp: _ToPtr(orderPlaceTime.Add(1 * time.Second)), | ||
}, | ||
}, | ||
}, | ||
id: subaccountIdNumber, | ||
err: nil, | ||
}, | ||
"OrderUpdate": { | ||
update: ocutypes.OffChainUpdateV1{ | ||
UpdateMessage: &ocutypes.OffChainUpdateV1_OrderUpdate{ | ||
OrderUpdate: &ocutypes.OrderUpdateV1{ | ||
OrderId: &orderId, | ||
TotalFilledQuantums: fillQuantums, | ||
}, | ||
}, | ||
}, | ||
id: subaccountIdNumber, | ||
err: nil, | ||
}, | ||
"OrderReplace": { | ||
update: ocutypes.OffChainUpdateV1{ | ||
UpdateMessage: &ocutypes.OffChainUpdateV1_OrderReplace{ | ||
OrderReplace: &ocutypes.OrderReplaceV1{ | ||
OldOrderId: &orderId, | ||
Order: &newOrder, | ||
PlacementStatus: ocutypes.OrderPlaceV1_ORDER_PLACEMENT_STATUS_OPENED, | ||
TimeStamp: _ToPtr(orderPlaceTime.Add(3 * time.Second)), | ||
}, | ||
}, | ||
}, | ||
id: subaccountIdNumber, | ||
err: nil, | ||
}, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Add negative test cases.
The test suite only covers happy paths. Consider adding test cases for:
- Nil UpdateMessage
- Unsupported message types
- Nil inner objects (Order, OrderId, etc.)
Example additional test cases:
"Nil UpdateMessage": {
update: ocutypes.OffChainUpdateV1{
UpdateMessage: nil,
},
id: 0,
err: fmt.Errorf(ErrUnsupportedUpdateMessage, nil),
},
"Unsupported Message Type": {
update: ocutypes.OffChainUpdateV1{
UpdateMessage: &ocutypes.OffChainUpdateV1_TradeUpdate{},
},
id: 0,
err: fmt.Errorf(ErrUnsupportedUpdateMessage, &ocutypes.OffChainUpdateV1_TradeUpdate{}),
},
// If filterOrders, listen to filtered channel and start filter goroutine | ||
// Error if filterOrders but no subaccounts are subscribed | ||
filteredUpdateChannel := subscription.updatesChannel | ||
if filterOrders { | ||
if len(subaccountIds) == 0 { | ||
sm.logger.Error( | ||
fmt.Sprintf( | ||
"filterOrders requires subaccountIds for subscription id: %+v", | ||
subscription.subscriptionId, | ||
), | ||
) | ||
} else { | ||
filteredUpdateChannel = make(chan []clobtypes.StreamUpdate, sm.maxSubscriptionChannelSize) | ||
defer close(filteredUpdateChannel) | ||
go subscription.FilterSubaccountStreamUpdates(filteredUpdateChannel, sm.logger) | ||
} | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Prevent execution when filterOrders
is true without subaccountIds
When filterOrders
is true but no subaccountIds
are provided, the code logs an error but continues execution. This can lead to subscribers receiving unfiltered order updates, contrary to their request. It's advisable to return an error or terminate the subscription to prevent unintended behavior.
Apply this diff to return an error when filterOrders
is requested without subaccountIds
:
if filterOrders {
if len(subaccountIds) == 0 {
sm.logger.Error(
fmt.Sprintf(
"filterOrders requires subaccountIds for subscription id: %+v",
subscription.subscriptionId,
),
)
+ return types.ErrInvalidStreamingRequest
} else {
filteredUpdateChannel = make(chan []clobtypes.StreamUpdate, sm.maxSubscriptionChannelSize)
defer close(filteredUpdateChannel)
go subscription.FilterSubaccountStreamUpdates(filteredUpdateChannel, sm.logger)
}
}
Committable suggestion skipped: line range outside the PR's diff.
proto/dydxprotocol/clob/query.proto
Outdated
|
||
// Filter order updates in addition to position updates | ||
bool filter_orders = 4; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
💡 Codebase verification
Implementation of filter_orders
parameter appears incomplete
The filter_orders
parameter has been added to the protocol definition and is properly passed through the interface chain, but there's no evidence of actual filtering logic implementation in the codebase. This could lead to the parameter being ineffective.
protocol/streaming/full_node_streaming_manager.go
: The streaming manager accepts the parameter but doesn't appear to use it for filtering order updates- Consider implementing the filtering logic in
AddOrderUpdatesToCache
or related methods where updates are processed
🔗 Analysis chain
Verify integration with existing streaming functionality.
The field addition looks correct, but let's verify its integration with the existing streaming system.
Run the following to check for consistent handling across the codebase:
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Description: Verify the integration of filter_orders field with streaming functionality
# Check for corresponding handler implementations
rg -g '*.{go,proto}' -A 5 'StreamOrderbookUpdates.*Request'
# Check for any existing filtering logic that might need updates
rg -g '*.go' -A 10 'func.*Subscribe'
# Look for tests covering the new functionality
rg -g '*_test.{go,proto}' -A 5 'filter.*orders'
Length of output: 14812
Script:
#!/bin/bash
# Check for filtering logic implementation in streaming manager
rg -g '*.go' -A 15 'filterOrders.*bool' protocol/streaming/
# Look for any tests verifying the filtering functionality
rg -g '*_test.go' -B 5 -A 10 'filterOrders.*true' protocol/streaming/
Length of output: 2484
Script:
#!/bin/bash
# Look for order update streaming implementation
rg -g '*.go' -B 5 -A 10 'SendStreamUpdates|SendOrderbookUpdates' protocol/streaming/
# Check for any filtering logic in the update sending code
rg -g '*.go' -B 5 -A 10 'func.*Update.*Order' protocol/streaming/
# Look for any order-related streaming logic
ast-grep --pattern 'if $filterOrders {
$$$
}'
Length of output: 7169
Script:
#!/bin/bash
# Look for message sending implementation
rg -g '*.go' -B 5 -A 15 'func.*Send.*Message' protocol/streaming/
# Check subscriber management and cache handling
rg -g '*.go' -B 5 -A 15 'AddOrderUpdatesToCache|HandleNewSubscriber' protocol/streaming/
# Look for any references to filter_orders in the implementation
rg -g '*.go' -l 'filter.*orders' protocol/streaming/
Length of output: 6862
proto/dydxprotocol/clob/query.proto
Outdated
@@ -186,6 +186,9 @@ message StreamOrderbookUpdatesRequest { | |||
|
|||
// Market ids for price updates. | |||
repeated uint32 market_ids = 3; | |||
|
|||
// Filter order updates in addition to position updates | |||
bool filter_orders = 4; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Prefer making it more explicit:
bool filter_orders = 4; | |
bool filter_orders_by_subaccount_id = 4; |
"filterOrders requires subaccountIds for subscription id: %+v", | ||
subscription.subscriptionId, | ||
), | ||
) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why not just return error to user and fail the Subscribe
request? Then can also remove else
below: https://go.dev/wiki/CodeReviewComments#indent-error-flow
for updates := range sub.updatesChannel { | ||
filteredUpdates := []clobtypes.StreamUpdate{} | ||
for _, update := range updates { | ||
switch updateMessage := update.UpdateMessage.(type) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For modularity, move switch statements into a new function:
func FilterUpdates(input []clobtypes.StreamUpdate) (filteredUpdates []clobtypes.StreamUpdate)
Also makes it easier to test
for _, orderBookUpdate := range updateMessage.OrderbookUpdate.Updates { | ||
orderBookUpdateSubaccountIdNumber, err := streaming_util.GetOffChainUpdateV1SubaccountIdNumber(orderBookUpdate) | ||
if err == nil { | ||
if slices.Contains(subaccountIdNumbers, orderBookUpdateSubaccountIdNumber) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Any reason to use slice over a map (as a set)? Is it because the former is more efficient at smaller size?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, it's based on the assumption that the subaccount cardinality is low.
If I had a test that imposed significant load, I could profile both the map and slice.
} else { | ||
filteredUpdateChannel = make(chan []clobtypes.StreamUpdate, sm.maxSubscriptionChannelSize) | ||
defer close(filteredUpdateChannel) | ||
go subscription.FilterSubaccountStreamUpdates(filteredUpdateChannel, sm.logger) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Per offline discussion, let's start with the simpler implementation of doing filtering below in the loop on subscription.updatesChannel
, with the assumption that it's acceptable overhead to add an extra boolean comparison on each iteration of the channel.
Co-authored-by: Teddy Ding <[email protected]>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
♻️ Duplicate comments (1)
proto/dydxprotocol/clob/query.proto (1)
189-192
: 🛠️ Refactor suggestionEnhance field name clarity and fix comment formatting.
- The field name should be more explicit about filtering by subaccount ID.
- The comment formatting needs to be adjusted to match clang-format style.
Apply this diff to improve clarity and fix formatting:
- // Filter order updates by subaccount IDs. - // If true, the orderbook updates only include orders from provided subaccount IDs. - bool filter_orders = 4; + // If true, filters order updates to only include orders from the subaccount IDs + // specified in the subaccount_ids field. + bool filter_orders_by_subaccount_id = 4;
🧹 Nitpick comments (1)
proto/dydxprotocol/clob/query.proto (1)
189-192
: Enhance documentation for filtering behavior.The documentation should be expanded to clarify:
- The relationship between
subaccount_ids
andfilter_orders_by_subaccount_id
- The default behavior when the field is not set
- Usage examples
Apply this diff to improve the documentation:
- // Filter order updates by subaccount IDs. - // If true, the orderbook updates only include orders from provided subaccount IDs. - bool filter_orders = 4; + // Controls filtering of order updates based on subaccount_ids field. + // If true: Only order updates from the specified subaccount_ids are included + // If false (default): All order updates are included regardless of subaccount_ids + // Example: Set to true with subaccount_ids=[A,B] to receive updates only for + // orders belonging to subaccounts A and B. + bool filter_orders_by_subaccount_id = 4;
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
proto/dydxprotocol/clob/query.proto
(1 hunks)
🧰 Additional context used
🪛 GitHub Actions: Protobuf
proto/dydxprotocol/clob/query.proto
[error] 187-189: Code formatting does not match clang-format style. Line breaks and spacing in comments need to be adjusted.
⏰ Context from checks skipped due to timeout of 90000ms (22)
- GitHub Check: call-build-ecs-service-vulcan / (vulcan) Check docker image build
- GitHub Check: call-build-ecs-service-socks / (socks) Check docker image build
- GitHub Check: call-build-ecs-service-roundtable / (roundtable) Check docker image build
- GitHub Check: unit-end-to-end-and-integration
- GitHub Check: call-build-ecs-service-ender / (ender) Check docker image build
- GitHub Check: test-race
- GitHub Check: call-build-ecs-service-comlink / (comlink) Check docker image build
- GitHub Check: test / run_command
- GitHub Check: check-build-auxo
- GitHub Check: test-coverage-upload
- GitHub Check: golangci-lint
- GitHub Check: check-build-bazooka
- GitHub Check: liveness-test
- GitHub Check: lint
- GitHub Check: container-tests
- GitHub Check: benchmark
- GitHub Check: build
- GitHub Check: check-sample-pregenesis-up-to-date
- GitHub Check: Analyze (python)
- GitHub Check: Analyze (javascript-typescript)
- GitHub Check: Analyze (go)
- GitHub Check: Summary
🔇 Additional comments (1)
proto/dydxprotocol/clob/query.proto (1)
189-192
: Verify filtering logic implementation.The field addition looks correct, but let's verify its integration with the existing streaming system.
Run the following to check for consistent handling across the codebase:
✅ Verification successful
Filtering logic is properly implemented
The field addition is well integrated with the existing streaming system. The implementation in
protocol/streaming/full_node_streaming_manager.go
handles the filtering of orderbook updates based on subaccount IDs, ensuring consistent behavior with the newfilter_orders
field.🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash # Description: Verify the integration of filter_orders field with streaming functionality # Check for corresponding handler implementations rg -g '*.go' -A 10 'StreamOrderbookUpdates.*Request.*filter.*orders' # Check for filtering logic in the streaming manager rg -g '*.go' -A 15 'func.*Subscribe.*bool.*filter' protocol/streaming/ # Look for tests verifying the filtering functionality rg -g '*_test.go' -A 10 'filter.*orders.*true' protocol/streaming/Length of output: 212
Script:
#!/bin/bash # First, find all orderbook and streaming related files fd -e go -e proto | rg -i "orderbook|streaming" # Then look for any filter-related code in these contexts rg -g '*.go' -A 5 'type.*Orderbook.*|func.*Orderbook' # Search for any streaming subscriber implementations rg -g '*.go' -A 5 'Subscribe.*Orderbook|Stream.*Orderbook' # Look for any filter-related logic in the codebase rg -g '*.go' -A 5 'filter.*[Oo]rder.*|[Oo]rder.*filter'Length of output: 75412
…treamUpdate if any OffchainUpdateV1 message is in scope
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 2
🧹 Nitpick comments (3)
protocol/streaming/full_node_streaming_manager_test.go (1)
318-464
: Consider adding test cases for edge cases.The test cases are comprehensive but could benefit from additional edge cases:
- Test with malformed subaccount IDs
- Test with extremely large updates
- Test with concurrent subscriptions
Would you like me to provide example test cases for these scenarios?
protocol/streaming/full_node_streaming_manager.go (2)
223-239
: Improve error handling in filter function.The error handling could be enhanced to provide more context about which update caused the error.
Apply this diff to improve error logging:
- logger.Error(err.Error()) + logger.Error( + "Failed to get subaccount ID from update", + "error", err, + "update_type", fmt.Sprintf("%T", orderBookUpdate.UpdateMessage), + )
1167-1172
: Consider pre-allocating slices for better performance.When dealing with batch updates, pre-allocating slices can improve performance by reducing memory allocations.
Apply this diff to pre-allocate slices:
+ // Pre-allocate slices with estimated capacity + syncLocalUpdates := make([]clobtypes.StreamUpdate, 0, len(orderBookUpdatesToSyncLocalOpsQueue.Updates)) + syncLocalClobPairIds := make([]uint32, 0, len(orderBookUpdatesToSyncLocalOpsQueue.Updates)) + syncLocalUpdates, syncLocalClobPairIds := getStreamUpdatesFromOffchainUpdates( streaming_util.GetOffchainUpdatesV1(orderBookUpdatesToSyncLocalOpsQueue), lib.MustConvertIntegerToUint32(ctx.BlockHeight()), ctx.ExecMode(), )
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
⛔ Files ignored due to path filters (1)
protocol/x/clob/types/query.pb.go
is excluded by!**/*.pb.go
📒 Files selected for processing (5)
indexer/packages/v4-protos/src/codegen/dydxprotocol/clob/query.ts
(6 hunks)proto/dydxprotocol/clob/query.proto
(1 hunks)protocol/streaming/full_node_streaming_manager.go
(6 hunks)protocol/streaming/full_node_streaming_manager_test.go
(1 hunks)protocol/x/clob/keeper/grpc_stream_orderbook.go
(1 hunks)
🚧 Files skipped from review as they are similar to previous changes (2)
- proto/dydxprotocol/clob/query.proto
- protocol/x/clob/keeper/grpc_stream_orderbook.go
⏰ Context from checks skipped due to timeout of 90000ms (9)
- GitHub Check: test-sim-after-import
- GitHub Check: test-sim-import-export
- GitHub Check: test-sim-multi-seed-short
- GitHub Check: test / run_command
- GitHub Check: unit-end-to-end-and-integration
- GitHub Check: test-race
- GitHub Check: test-coverage-upload
- GitHub Check: container-tests
- GitHub Check: Summary
🔇 Additional comments (2)
protocol/streaming/full_node_streaming_manager.go (1)
284-287
: LGTM! Error handling improved as suggested.The implementation now correctly returns an error when filtering is requested without subaccount IDs, as suggested in the previous review.
indexer/packages/v4-protos/src/codegen/dydxprotocol/clob/query.ts (1)
281-286
: LGTM! Well-documented protobuf changes.The new
filterOrdersBySubaccountId
property is properly documented and follows protobuf naming conventions.Also applies to: 302-307
Quantums: uint64(10 ^ 6), | ||
Subticks: 1, | ||
GoodTilOneof: &pv1types.IndexerOrder_GoodTilBlock{ | ||
GoodTilBlock: 10 ^ 9, | ||
}, | ||
TimeInForce: 10 ^ 9, | ||
ReduceOnly: false, | ||
ClientMetadata: 0, | ||
ConditionType: pv1types.IndexerOrder_CONDITION_TYPE_UNSPECIFIED, | ||
ConditionalOrderTriggerSubticks: 0, | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fix power operator usage in order creation functions.
The ^
operator is used incorrectly for exponentiation in multiple places. In Go, use math.Pow()
for floating-point exponentiation or bit shift (<<
) for powers of 2.
Apply this diff to fix the power operator usage:
- GoodTilBlock: 10 ^ 9,
- TimeInForce: 10 ^ 9,
+ GoodTilBlock: 1_000_000_000, // 10^9
+ TimeInForce: 1_000_000_000, // 10^9
- Quantums: uint64(10 ^ 6),
+ Quantums: uint64(1_000_000), // 10^6
- Exponent: 1 ^ -6,
+ Exponent: -1_000_000, // -10^6
Also applies to: 244-254
) | ||
|
||
const ( | ||
maxSubscriptionChannelSize = 2 ^ 10 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fix the bit shift operation.
The expression 2 ^ 10
uses the XOR operator (^
) instead of bit shift (<<
) to calculate 2¹⁰.
Apply this diff to fix the constant definition:
-maxSubscriptionChannelSize = 2 ^ 10
+maxSubscriptionChannelSize = 1 << 10
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
maxSubscriptionChannelSize = 2 ^ 10 | |
maxSubscriptionChannelSize = 1 << 10 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
🧹 Nitpick comments (4)
protocol/streaming/full_node_streaming_manager_test.go (4)
19-23
: Document the rationale for channel size.Consider adding a comment explaining why
maxSubscriptionChannelSize
is set to 1024. This helps future maintainers understand if this is an arbitrary value or based on specific performance considerations.
121-132
: Add documentation for test utility functions.Consider adding documentation comments for utility functions like
NewStreamOrderbookFill
to explain their purpose and usage in tests. This helps other developers understand how to use these functions in their tests.
263-476
: Enhance test coverage with edge cases.While the test coverage is comprehensive, consider adding the following edge cases:
- Test with malformed or invalid subaccount IDs
- Test with extremely large updates that might cause channel buffer overflow
- Test concurrent access to the filter function
466-476
: Consider using test table driven setup and teardown.The test could benefit from a more structured approach using test table driven setup and teardown functions. This would help manage resources and ensure consistent test environment across all test cases.
Example structure:
func TestFilterStreamUpdates(t *testing.T) { type testSetup struct { logger *mocks.Logger // other setup fields } setup := func(t *testing.T) *testSetup { return &testSetup{ logger: &mocks.Logger{}, // other setup } } teardown := func(t *testing.T, s *testSetup) { // cleanup resources } for name, testCase := range tests { t.Run(name, func(t *testing.T) { s := setup(t) defer teardown(t, s) filteredUpdates := streaming.FilterSubaccountStreamUpdates( testCase.updates, testCase.subaccountIds, s.logger, ) // assertions }) } }
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
protocol/streaming/full_node_streaming_manager_test.go
(1 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (12)
- GitHub Check: test-sim-multi-seed-short
- GitHub Check: test-sim-nondeterminism
- GitHub Check: test-sim-after-import
- GitHub Check: test-sim-import-export
- GitHub Check: unit-end-to-end-and-integration
- GitHub Check: test / run_command
- GitHub Check: test-race
- GitHub Check: test-coverage-upload
- GitHub Check: liveness-test
- GitHub Check: container-tests
- GitHub Check: benchmark
- GitHub Check: Summary
🔇 Additional comments (1)
protocol/streaming/full_node_streaming_manager_test.go (1)
99-101
: Add error handling test cases for MockMessageSender.The
Send
method always returns nil. Consider adding test cases to verify behavior when the sender encounters network issues or other potential failures.
// Filter StreamUpdates for subaccountIdNumbers | ||
// If a StreamUpdate_OrderUpdate contains no updates for subscribed subaccounts, drop message | ||
// If a StreamUpdate_OrderUpdate contains updates for subscribed subaccounts, construct a new | ||
// StreamUpdate_OrderUpdate with updates only for subscribed subaccounts | ||
func (sub *OrderbookSubscription) FilterSubaccountStreamUpdates( | ||
output chan []clobtypes.StreamUpdate, | ||
func FilterSubaccountStreamUpdates( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: rename to FilterStreamUpdateBySubaccount
to disambiguate from "filtering SubaccountStreamUpdate"
|
||
if len(filteredUpdates) > 0 { | ||
return &filteredUpdates | ||
} else { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: no need for else
. Just do return nil
outside of if
statement above.
MarketId: 1, | ||
Price: pricestypes.MarketPrice{ | ||
Id: 1, | ||
Exponent: 1_000_000, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: This proto represents a price of Price * 10 ^ Exponent
so a more realistic value would be Exponent = 6
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
🧹 Nitpick comments (5)
protocol/streaming/full_node_streaming_manager_test.go (1)
223-238
: Consider using more realistic test values.The test data uses hardcoded values that could be made more realistic to better represent production scenarios.
Apply this diff to use more realistic values:
- Quantums: uint64(1_000_000), + Quantums: uint64(1_000_000_000), // 1 BTC = 10^9 satoshis - Subticks: 1, + Subticks: 25_000_000, // $25,000 price pointprotocol/streaming/full_node_streaming_manager.go (4)
223-239
: Consider optimizing subaccount lookup.The current implementation uses
slices.Contains
which has O(n) complexity. For better performance with large subaccount lists, consider using a map for O(1) lookup.func doFilterStreamUpdateBySubaccount( orderBookUpdate *clobtypes.StreamUpdate_OrderbookUpdate, - subaccountIdNumbers []uint32, + subaccountIdNumbers map[uint32]struct{}, logger log.Logger, ) bool { for _, orderBookUpdate := range orderBookUpdate.OrderbookUpdate.Updates { orderBookUpdateSubaccountIdNumber, err := streaming_util.GetOffChainUpdateV1SubaccountIdNumber(orderBookUpdate) if err == nil { - if slices.Contains(subaccountIdNumbers, orderBookUpdateSubaccountIdNumber) { + if _, ok := subaccountIdNumbers[orderBookUpdateSubaccountIdNumber]; ok { return true } } else { logger.Error(err.Error()) } } return false }
241-267
: Improve error handling in the filter function.The function silently continues on errors from
GetOffChainUpdateV1SubaccountIdNumber
. Consider adding structured error logging with context.if err == nil { if slices.Contains(subaccountIdNumbers, orderBookUpdateSubaccountIdNumber) { return true } } else { - logger.Error(err.Error()) + logger.Error("Failed to get subaccount ID from update", + "error", err, + "update_type", fmt.Sprintf("%T", orderBookUpdate.UpdateMessage), + ) }
283-286
: Enhance error message for invalid filter request.The current error message could be more descriptive to help users understand why their request was invalid.
if filterOrdersBySubAccountId && (len(subaccountIds) == 0) { - sm.logger.Error("filterOrdersBySubaccountId with no subaccountIds") + sm.logger.Error( + "Invalid streaming request: filterOrdersBySubaccountId requires at least one subaccountId", + "filter_enabled", filterOrdersBySubAccountId, + "subaccount_count", len(subaccountIds), + ) return types.ErrInvalidStreamingRequest }
1166-1171
: Improve variable naming for clarity.The current variable names could be more descriptive to better convey their purpose.
- syncLocalUpdates, syncLocalClobPairIds := getStreamUpdatesFromOffchainUpdates( + localOpsQueueUpdates, localOpsQueueClobPairIds := getStreamUpdatesFromOffchainUpdates( streaming_util.GetOffchainUpdatesV1(orderBookUpdatesToSyncLocalOpsQueue), lib.MustConvertIntegerToUint32(ctx.BlockHeight()), ctx.ExecMode(), ) - sm.cacheStreamUpdatesByClobPairWithLock(syncLocalUpdates, syncLocalClobPairIds) + sm.cacheStreamUpdatesByClobPairWithLock(localOpsQueueUpdates, localOpsQueueClobPairIds)
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (2)
protocol/streaming/full_node_streaming_manager.go
(6 hunks)protocol/streaming/full_node_streaming_manager_test.go
(1 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (11)
- GitHub Check: test-sim-multi-seed-short
- GitHub Check: test-sim-after-import
- GitHub Check: test-sim-import-export
- GitHub Check: test / run_command
- GitHub Check: test-coverage-upload
- GitHub Check: liveness-test
- GitHub Check: container-tests
- GitHub Check: test-race
- GitHub Check: benchmark
- GitHub Check: unit-end-to-end-and-integration
- GitHub Check: Summary
🔇 Additional comments (3)
protocol/streaming/full_node_streaming_manager_test.go (3)
1-95
: Well-structured test helpers!The helper functions for creating test data are comprehensive and cover all order operations (open, cancel, replace, update). The constants are appropriately defined.
97-119
: Clean mock implementation and subscription setup!The mock message sender and subscription creation helper are minimal yet sufficient for testing purposes.
257-476
: Excellent test coverage!The test suite is comprehensive and well-structured:
- Covers all filtering scenarios including edge cases
- Tests different update types (orders, fills, prices)
- Clear test case names and organization
- Proper validation of filtered results
Changelist
Full Node Streaming provides an initial state and streaming updates for positions, orders, prices and fills.
The subscription API admits an optional sub account ID filter, which is only applied to the initial positions and position changes.
The user would like to apply the sub account ID filter to the order messages, in addition to position messages.
The change will add boolean flags to the Websocket and GRPC streaming API's:
filterOrdersBySubaccountId
boolean field for WS request (if not provided, default to False)filter_orders_by_subaccount_id
boolean field forStreamOrderbookUpdatesRequest
protobuf (if not provided, default to False)For all endpoints, the previous behavior of not filtering orders for subaccounts is preserved by default.
If filtering orders is not specified, the code path remains the same for looping over stream updates.
If filtering orders is specified, each slice of updates received from the subscription
updatesChannel
will be filtered like:StreamUpdate_OrderbookUpdate
, forward itStreamUpdate_OrderbookUpdate
, forward it only if one of theOffChainUpdateV1
messages inside are for a target subaccountTest Plan
Unit test
Author/Reviewer Checklist
state-breaking
label.indexer-postgres-breaking
label.PrepareProposal
orProcessProposal
, manually add the labelproposal-breaking
.feature:[feature-name]
.backport/[branch-name]
.refactor
,chore
,bug
.Summary by CodeRabbit
Summary by CodeRabbit
New Features
Improvements
Technical Updates
filterOrders
parameter.