Skip to content

Commit

Permalink
sync client
Browse files Browse the repository at this point in the history
  • Loading branch information
jonfung-dydx committed Jul 26, 2024
1 parent 83dec4e commit 8950ba9
Show file tree
Hide file tree
Showing 12 changed files with 203 additions and 69 deletions.
1 change: 1 addition & 0 deletions protocol/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -1419,6 +1419,7 @@ func New(

if app.GrpcStreamingManager.Enabled() {
app.GrpcStreamingTestClient = streamingclient.NewGrpcClient(appFlags, app.Logger())
app.GrpcStreamingManager.SubscribeTestClient(app.GrpcStreamingTestClient)
}

// Report out app version and git commit. This will be run when validators restart.
Expand Down
10 changes: 6 additions & 4 deletions protocol/mocks/MemClob.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

78 changes: 38 additions & 40 deletions protocol/streaming/grpc/client/client.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,11 @@
package client

import (
"context"
"sync"

"cosmossdk.io/log"

appflags "github.com/dydxprotocol/v4-chain/protocol/app/flags"
daemontypes "github.com/dydxprotocol/v4-chain/protocol/daemons/types"
v1 "github.com/dydxprotocol/v4-chain/protocol/indexer/protocol/v1"
v1types "github.com/dydxprotocol/v4-chain/protocol/indexer/protocol/v1/types"
clobtypes "github.com/dydxprotocol/v4-chain/protocol/x/clob/types"
Expand Down Expand Up @@ -40,44 +38,44 @@ func NewGrpcClient(appflags appflags.Flags, logger log.Logger) *GrpcClient {
}

// Subscribe to grpc orderbook updates.
go func() {
grpcClient := daemontypes.GrpcClientImpl{}

// Make a connection to the Cosmos gRPC query services.
queryConn, err := grpcClient.NewTcpConnection(context.Background(), appflags.GrpcAddress)
if err != nil {
logger.Error("Failed to establish gRPC connection to Cosmos gRPC query services", "error", err)
return
}
defer func() {
if err := grpcClient.CloseConnection(queryConn); err != nil {
logger.Error("Failed to close gRPC connection", "error", err)
}
}()

clobQueryClient := clobtypes.NewQueryClient(queryConn)
updateClient, err := clobQueryClient.StreamOrderbookUpdates(
context.Background(),
&clobtypes.StreamOrderbookUpdatesRequest{
ClobPairId: []uint32{0, 1},
},
)
if err != nil {
logger.Error("Failed to stream orderbook updates", "error", err)
return
}

for {
update, err := updateClient.Recv()
if err != nil {
logger.Error("Failed to receive orderbook update", "error", err)
return
}

logger.Info("Received orderbook update", "update", update)
client.Update(update)
}
}()
// go func() {
// grpcClient := daemontypes.GrpcClientImpl{}

// // Make a connection to the Cosmos gRPC query services.
// queryConn, err := grpcClient.NewTcpConnection(context.Background(), appflags.GrpcAddress)
// if err != nil {
// logger.Error("Failed to establish gRPC connection to Cosmos gRPC query services", "error", err)
// return
// }
// defer func() {
// if err := grpcClient.CloseConnection(queryConn); err != nil {
// logger.Error("Failed to close gRPC connection", "error", err)
// }
// }()

// clobQueryClient := clobtypes.NewQueryClient(queryConn)
// updateClient, err := clobQueryClient.StreamOrderbookUpdates(
// context.Background(),
// &clobtypes.StreamOrderbookUpdatesRequest{
// ClobPairId: []uint32{0, 1},
// },
// )
// if err != nil {
// logger.Error("Failed to stream orderbook updates", "error", err)
// return
// }

// for {
// update, err := updateClient.Recv()
// if err != nil {
// logger.Error("Failed to receive orderbook update", "error", err)
// return
// }

// logger.Info("Received orderbook update", "update", update)
// client.Update(update)
// }
// }()
return client
}

Expand Down
75 changes: 61 additions & 14 deletions protocol/streaming/grpc/grpc_streaming_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/cosmos/gogoproto/proto"
ocutypes "github.com/dydxprotocol/v4-chain/protocol/indexer/off_chain_updates/types"
"github.com/dydxprotocol/v4-chain/protocol/lib/metrics"
"github.com/dydxprotocol/v4-chain/protocol/streaming/grpc/client"
"github.com/dydxprotocol/v4-chain/protocol/streaming/grpc/types"
clobtypes "github.com/dydxprotocol/v4-chain/protocol/x/clob/types"
)
Expand Down Expand Up @@ -52,6 +53,9 @@ type OrderbookSubscription struct {

// Channel to buffer writes before the stream
updatesChannel chan []clobtypes.StreamUpdate

// Testing
client *client.GrpcClient
}

func NewGrpcStreamingManager(
Expand Down Expand Up @@ -185,7 +189,20 @@ func (sm *GrpcStreamingManagerImpl) Subscribe(
return err
}

// removeSubscription removes a subscription from the grpc streaming manager.
func (sm *GrpcStreamingManagerImpl) SubscribeTestClient(client *client.GrpcClient) {
subscription := &OrderbookSubscription{
clobPairIds: []uint32{0, 1},
client: client,
}

sm.Lock()
defer sm.Unlock()

sm.orderbookSubscriptions[sm.nextSubscriptionId] = subscription
sm.nextSubscriptionId++
}

// removeSubscription removes a subscription from the streaming manager.
// The streaming manager's lock should already be acquired before calling this.
func (sm *GrpcStreamingManagerImpl) removeSubscription(
subscriptionIdToRemove uint32,
Expand Down Expand Up @@ -253,17 +270,11 @@ func (sm *GrpcStreamingManagerImpl) SendSnapshot(
metrics.GrpcAddToSubscriptionChannelCount,
1,
)
select {
case subscription.updatesChannel <- streamUpdates:
default:
sm.logger.Error(
fmt.Sprintf(
"GRPC Streaming subscription id %+v channel full capacity. Dropping subscription connection.",
subscriptionId,
),
)
removeSubscription = true
}
subscription.client.Update(
&clobtypes.StreamOrderbookUpdatesResponse{
Updates: streamUpdates,
},
)
}

// Clean up subscriptions that have been closed.
Expand Down Expand Up @@ -317,7 +328,25 @@ func (sm *GrpcStreamingManagerImpl) SendOrderbookUpdates(
}
}

sm.AddUpdatesToCache(updatesByClobPairId, uint32(len(updates)))
sm.Lock()
defer sm.Unlock()

for _, subscription := range sm.orderbookSubscriptions {
updatesToSend := make([]clobtypes.StreamUpdate, 0)
for _, clobPairId := range subscription.clobPairIds {
if updates, ok := updatesByClobPairId[clobPairId]; ok {
updatesToSend = append(updatesToSend, updates...)
}
}

if len(updatesToSend) > 0 {
subscription.client.Update(
&clobtypes.StreamOrderbookUpdatesResponse{
Updates: updatesToSend,
},
)
}
}
}

// SendOrderbookFillUpdates groups fills by their clob pair ids and
Expand Down Expand Up @@ -354,7 +383,25 @@ func (sm *GrpcStreamingManagerImpl) SendOrderbookFillUpdates(
updatesByClobPairId[clobPairId] = append(updatesByClobPairId[clobPairId], streamUpdate)
}

sm.AddUpdatesToCache(updatesByClobPairId, uint32(len(orderbookFills)))
sm.Lock()
defer sm.Unlock()

for _, subscription := range sm.orderbookSubscriptions {
updatesToSend := make([]clobtypes.StreamUpdate, 0)
for _, clobPairId := range subscription.clobPairIds {
if updates, ok := updatesByClobPairId[clobPairId]; ok {
updatesToSend = append(updatesToSend, updates...)
}
}

if len(updatesToSend) > 0 {
subscription.client.Update(
&clobtypes.StreamOrderbookUpdatesResponse{
Updates: updatesToSend,
},
)
}
}
}

func (sm *GrpcStreamingManagerImpl) AddUpdatesToCache(
Expand Down
4 changes: 4 additions & 0 deletions protocol/streaming/grpc/noop_streaming_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package grpc

import (
sdk "github.com/cosmos/cosmos-sdk/types"
"github.com/dydxprotocol/v4-chain/protocol/streaming/grpc/client"
"github.com/dydxprotocol/v4-chain/protocol/streaming/grpc/types"
clobtypes "github.com/dydxprotocol/v4-chain/protocol/x/clob/types"
)
Expand Down Expand Up @@ -42,6 +43,9 @@ func (sm *NoopGrpcStreamingManager) SendOrderbookUpdates(
) {
}

func (sm *NoopGrpcStreamingManager) SubscribeTestClient(client *client.GrpcClient) {
}

func (sm *NoopGrpcStreamingManager) SendOrderbookFillUpdates(
ctx sdk.Context,
orderbookFills []clobtypes.StreamOrderbookFill,
Expand Down
4 changes: 4 additions & 0 deletions protocol/streaming/grpc/types/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package types

import (
sdk "github.com/cosmos/cosmos-sdk/types"
"github.com/dydxprotocol/v4-chain/protocol/streaming/grpc/client"
clobtypes "github.com/dydxprotocol/v4-chain/protocol/x/clob/types"
)

Expand Down Expand Up @@ -31,6 +32,9 @@ type GrpcStreamingManager interface {
blockHeight uint32,
execMode sdk.ExecMode,
)
SubscribeTestClient(
client *client.GrpcClient,
)
SendOrderbookFillUpdates(
ctx sdk.Context,
orderbookFills []clobtypes.StreamOrderbookFill,
Expand Down
9 changes: 6 additions & 3 deletions protocol/x/clob/abci.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,12 @@ func PrepareCheckState(
log.BlockHeight, ctx.BlockHeight()+1,
)

// Prune any rate limiting information that is no longer relevant.
keeper.PruneRateLimits(ctx)

// Initialize new streams with orderbook snapshots, if any.
keeper.InitializeNewGrpcStreams(ctx)

// Get the events generated from processing the matches in the latest block.
processProposerMatchesEvents := keeper.GetProcessProposerMatchesEvents(ctx)
if ctx.BlockHeight() != int64(processProposerMatchesEvents.BlockHeight) {
Expand Down Expand Up @@ -254,9 +260,6 @@ func PrepareCheckState(
types.GetInternalOperationsQueueTextString(newLocalValidatorOperationsQueue),
)

// Initialize new GRPC streams with orderbook snapshots, if any.
keeper.InitializeNewGrpcStreams(ctx)

// Set per-orderbook gauges.
keeper.MemClob.SetMemclobGauges(ctx)
}
Loading

0 comments on commit 8950ba9

Please sign in to comment.