diff --git a/protocol/streaming/grpc/client/client.go b/protocol/streaming/grpc/client/client.go index 113be59442..8e860cc973 100644 --- a/protocol/streaming/grpc/client/client.go +++ b/protocol/streaming/grpc/client/client.go @@ -1,6 +1,7 @@ package client import ( + "fmt" "sync" "cosmossdk.io/log" @@ -87,6 +88,11 @@ func (c *GrpcClient) GetOrderbookSnapshot(pairId uint32) *LocalOrderbook { // Write method for stream orderbook updates. func (c *GrpcClient) Update(updates *clobtypes.StreamOrderbookUpdatesResponse) { for _, update := range updates.GetUpdates() { + c.Logger.Info( + fmt.Sprintf("Received stream update for callback %+v", update.ExecMode), + "blockHeight", + update.BlockHeight, + ) if orderUpdate := update.GetOrderbookUpdate(); orderUpdate != nil { c.ProcessOrderbookUpdate(orderUpdate) } @@ -106,18 +112,30 @@ func (c *GrpcClient) ProcessOrderbookUpdate(orderUpdate *clobtypes.StreamOrderbo for _, update := range orderUpdate.Updates { if orderPlace := update.GetOrderPlace(); orderPlace != nil { order := orderPlace.GetOrder() + c.Logger.Info( + "place order recieved", + "orderId", IndexerOrderIdToOrderId(order.OrderId).String(), + ) orderbook := c.GetOrderbook(order.OrderId.ClobPairId) orderbook.AddOrder(*order) } if orderRemove := update.GetOrderRemove(); orderRemove != nil { orderId := orderRemove.RemovedOrderId + c.Logger.Info( + "remove order recieved", + "orderId", IndexerOrderIdToOrderId(*orderId).String(), + ) orderbook := c.GetOrderbook(orderId.ClobPairId) orderbook.RemoveOrder(*orderId) } if orderUpdate := update.GetOrderUpdate(); orderUpdate != nil { orderId := orderUpdate.OrderId + c.Logger.Info( + "update order recieved", + "orderId", IndexerOrderIdToOrderId(*orderId).String(), + ) orderbook := c.GetOrderbook(orderId.ClobPairId) orderbook.SetOrderFillAmount(orderId, orderUpdate.TotalFilledQuantums) } @@ -145,6 +163,9 @@ func (c *GrpcClient) ProcessMatchPerpetualLiquidation( orderMap map[clobtypes.OrderId]clobtypes.Order, fillAmountMap map[clobtypes.OrderId]uint64, ) { + c.Logger.Info( + "liquidation order recieved", + ) localOrderbook := c.Orderbook[perpLiquidation.ClobPairId] for _, fill := range perpLiquidation.GetFills() { makerOrder := orderMap[fill.MakerOrderId] @@ -158,6 +179,9 @@ func (c *GrpcClient) ProcessMatchOrders( orderMap map[clobtypes.OrderId]clobtypes.Order, fillAmountMap map[clobtypes.OrderId]uint64, ) { + c.Logger.Info( + "match order recieved", + ) takerOrderId := matchOrders.TakerOrderId clobPairId := takerOrderId.GetClobPairId() localOrderbook := c.Orderbook[clobPairId] @@ -287,6 +311,11 @@ func (l *LocalOrderbook) SetOrderFillAmount( l.Lock() defer l.Unlock() + l.Logger.Info( + fmt.Sprintf("local fill set to %+v", fillAmount), + "orderId", IndexerOrderIdToOrderId(*orderId).String(), + ) + if fillAmount == 0 { delete(l.FillAmounts, *orderId) } else { diff --git a/protocol/x/clob/abci.go b/protocol/x/clob/abci.go index 8efbfc71e1..0b821f26fc 100644 --- a/protocol/x/clob/abci.go +++ b/protocol/x/clob/abci.go @@ -162,8 +162,16 @@ func PrepareCheckState( log.LocalValidatorOperationsQueue, types.GetInternalOperationsQueueTextString(localValidatorOperationsQueue), ) + log.InfoLog(ctx, + "removing all operations from local opqueue", + ) + keeper.MemClob.RemoveAndClearOperationsQueue(ctx, localValidatorOperationsQueue) + log.InfoLog(ctx, + "purging state from local opqueue", + ) + // 2. Purge invalid state from the memclob. offchainUpdates := types.NewOffchainUpdates() offchainUpdates = keeper.MemClob.PurgeInvalidMemclobState( @@ -175,6 +183,10 @@ func PrepareCheckState( offchainUpdates, ) + log.InfoLog(ctx, + "place stateful order placements", + ) + // 3. Place all stateful order placements included in the last block on the memclob. // Note telemetry is measured outside of the function call because `PlaceStatefulOrdersFromLastBlock` // is called within `PlaceConditionalOrdersTriggeredInLastBlock`. @@ -204,6 +216,10 @@ func PrepareCheckState( offchainUpdates, ) + log.InfoLog(ctx, + "replay local validator operations", + ) + // 5. Replay the local validator’s operations onto the book. replayUpdates := keeper.MemClob.ReplayOperations( ctx, diff --git a/protocol/x/clob/keeper/grpc_stream_orderbook.go b/protocol/x/clob/keeper/grpc_stream_orderbook.go index 22b087feb5..090593393b 100644 --- a/protocol/x/clob/keeper/grpc_stream_orderbook.go +++ b/protocol/x/clob/keeper/grpc_stream_orderbook.go @@ -220,9 +220,12 @@ func (k Keeper) CompareMemclobOrderbookWithLocalOrderbook( localOrderbookFillAmount := localOrderbook.FillAmounts[indexerOrderId] if orderFillAmount != localOrderbookFillAmount { + order := localOrderbook.OrderIdToOrder[v1.OrderIdToIndexerOrderId(orderId)] logger.Error( "Fill Amount Mismatch", "orderId", orderId.String(), + "time_in_force", order.TimeInForce, + "order_flags", order.OrderId.OrderFlags, "state_fill_amt", orderFillAmount, "local_fill_amt", localOrderbookFillAmount, ) diff --git a/protocol/x/clob/keeper/order_state.go b/protocol/x/clob/keeper/order_state.go index 24ff70a22f..1f36de2f28 100644 --- a/protocol/x/clob/keeper/order_state.go +++ b/protocol/x/clob/keeper/order_state.go @@ -3,6 +3,7 @@ package keeper import ( "bytes" "encoding/binary" + "fmt" "cosmossdk.io/store/prefix" sdk "github.com/cosmos/cosmos-sdk/types" @@ -93,6 +94,12 @@ func (k Keeper) SetOrderFillAmount( orderId.ToStateKey(), orderFillStateBytes, ) + + log.InfoLog( + ctx, + fmt.Sprintf("real fill set to %+v", fillAmount), + "orderId", orderId.String(), + ) } // GetOrderFillAmount returns the total `fillAmount` and `prunableBlockHeight` from the memStore. @@ -277,6 +284,12 @@ func (k Keeper) RemoveOrderFillAmount(ctx sdk.Context, orderId types.OrderId) { ) memStore.Delete(orderId.ToStateKey()) + log.InfoLog( + ctx, + "real fill set to 0 (remove)", + "orderId", orderId.String(), + ) + // If grpc stream is on, zero out the fill amount. if k.GetGrpcStreamingManager().Enabled() { allUpdates := types.NewOffchainUpdates()