diff --git a/node/pkg/query/query.go b/node/pkg/query/query.go index 9c1b1bc647..636912433e 100644 --- a/node/pkg/query/query.go +++ b/node/pkg/query/query.go @@ -392,7 +392,7 @@ func handleQueryRequestsImpl( zap.Stringer("lastUpdateTime", pcq.lastUpdateTime), zap.String("chainID", pq.queries[requestIdx].req.Request.ChainId.String()), ) - pcq.ccqForwardToWatcher(qLogger, pq.receiveTime) + pcq.ccqForwardToWatcher(qLogger, now) } } } diff --git a/node/pkg/query/request.go b/node/pkg/query/request.go index a05d67631f..7c4c2c4aa7 100644 --- a/node/pkg/query/request.go +++ b/node/pkg/query/request.go @@ -125,6 +125,8 @@ const SolanaAccountQueryRequestType ChainSpecificQueryType = 4 type SolanaAccountQueryRequest struct { // Commitment identifies the commitment level to be used in the queried. Currently it may only "finalized". // Before we can support "confirmed", we need a way to read the account data and the block information atomically. + // We would also need to deal with the fact that queries are only handled in the finalized watcher and it does not + // have access to the latest confirmed slot needed for MinContextSlot retries. Commitment string // The minimum slot that the request can be evaluated at. Zero means unused. diff --git a/node/pkg/watchers/solana/ccq.go b/node/pkg/watchers/solana/ccq.go index 6387d314c0..2332cf3711 100644 --- a/node/pkg/watchers/solana/ccq.go +++ b/node/pkg/watchers/solana/ccq.go @@ -5,7 +5,6 @@ import ( "encoding/hex" "encoding/json" "errors" - "fmt" "strconv" "time" @@ -227,24 +226,27 @@ func (w *SolanaWatcher) ccqCheckForMinSlotContext( return false } - isMinContext, currentSlot, err := ccqIsMinContextSlotError(err) - if err != nil { - w.ccqLogger.Error("failed to parse for min context slot error", zap.Error(err)) + isMinContext, currentSlotFromError := ccqIsMinContextSlotError(err) + if !isMinContext { return false } - if !isMinContext { - return false + var currentSlot uint64 + if currentSlotFromError != 0 { + currentSlot = currentSlotFromError + } else { + currentSlot = w.GetLatestFinalizedBlockNumber() } // Estimate how far in the future the requested slot is, using our estimated slot time. futureSlotEstimate := time.Duration(req.MinContextSlot-currentSlot) * CCQ_ESTIMATED_SLOT_TIME - // If the requested slot is more than ten seconds in the future, use the regular retry mechanism. - if futureSlotEstimate > query.RetryInterval { + // If the requested slot is definitively more than the retry interval, use the regular retry mechanism. + if futureSlotEstimate > query.RetryInterval*2 { w.ccqLogger.Info("minimum context slot is too far in the future, requesting slow retry", zap.String("requestId", requestId), zap.Uint64("currentSlot", currentSlot), + zap.Uint64("currentSlotFromError", currentSlotFromError), zap.Uint64("minContextSlot", req.MinContextSlot), zap.Stringer("futureSlotEstimate", futureSlotEstimate), ) @@ -252,16 +254,26 @@ func (w *SolanaWatcher) ccqCheckForMinSlotContext( } // Kick off the retry after a short delay. - go w.ccqSleepAndRetryAccountQuery(ctx, queryRequest, req, requestId, currentSlot, giveUpTime, log) + go w.ccqSleepAndRetryAccountQuery(ctx, queryRequest, req, requestId, currentSlot, currentSlotFromError, giveUpTime, log) return true } // ccqSleepAndRetryAccountQuery does a short sleep and then initiates a retry. -func (w *SolanaWatcher) ccqSleepAndRetryAccountQuery(ctx context.Context, queryRequest *query.PerChainQueryInternal, req *query.SolanaAccountQueryRequest, requestId string, currentSlot uint64, giveUpTime time.Time, log bool) { +func (w *SolanaWatcher) ccqSleepAndRetryAccountQuery( + ctx context.Context, + queryRequest *query.PerChainQueryInternal, + req *query.SolanaAccountQueryRequest, + requestId string, + currentSlot uint64, + currentSlotFromError uint64, + giveUpTime time.Time, + log bool, +) { if log { w.ccqLogger.Info("minimum context slot has not been reached, will retry shortly", zap.String("requestId", requestId), zap.Uint64("currentSlot", currentSlot), + zap.Uint64("currentSlotFromError", currentSlotFromError), zap.Uint64("minContextSlot", req.MinContextSlot), zap.Stringer("retryInterval", CCQ_FAST_RETRY_INTERVAL), ) @@ -277,42 +289,46 @@ func (w *SolanaWatcher) ccqSleepAndRetryAccountQuery(ctx context.Context, queryR } // ccqIsMinContextSlotError parses an error to see if it is "Minimum context slot has not been reached". If it is, it returns the slot number -func ccqIsMinContextSlotError(err error) (bool, uint64, error) { +func ccqIsMinContextSlotError(err error) (bool, uint64) { /* - A MinContextSlot error looks like this (and contains the context slot): - "(*jsonrpc.RPCError)(0xc00b3881b0)({\n Code: (int) -32016,\n Message: (string) (len=41) \"Minimum context slot has not been reached\",\n Data: (map[string]interface {}) (len=1) {\n (string) (len=11) \"contextSlot\": (json.Number) (len=4) \"3630\"\n }\n})\n" + A MinContextSlot error looks like this (and contains the context slot): + "(*jsonrpc.RPCError)(0xc00b3881b0)({\n Code: (int) -32016,\n Message: (string) (len=41) \"Minimum context slot has not been reached\",\n Data: (map[string]interface {}) (len=1) {\n (string) (len=11) \"contextSlot\": (json.Number) (len=4) \"3630\"\n }\n})\n" + + Except some endpoints return something like this instead: + "(*jsonrpc.RPCError)(0xc03c0bcd20)({\n Code: (int) -32016,\n Message: (string) (len=41) \"Minimum context slot has not been reached\",\n Data: (interface {}) \n})\n" */ var rpcErr *jsonrpc.RPCError if !errors.As(err, &rpcErr) { - return false, 0, nil // Some other kind of error. That's okay. + return false, 0 // Some other kind of error. } if rpcErr.Code != -32016 { // Minimum context slot has not been reached - return false, 0, nil // Some other kind of RPC error. That's okay. + return false, 0 // Some other kind of RPC error. } - // From here on down, any error is bad because the MinContextSlot error is not in the expected format. + // We know it is a MinContextSlot error. If it contains the current slot number, extract and return that. + // Since some Solana endpoints do not return that, we can't treat it as an error if it is missing. m, ok := rpcErr.Data.(map[string]interface{}) if !ok { - return false, 0, fmt.Errorf("failed to extract data from min context slot error") + return true, 0 } contextSlot, ok := m["contextSlot"] if !ok { - return false, 0, fmt.Errorf(`min context slot error does not contain "contextSlot"`) + return true, 0 } currentSlotAsJson, ok := contextSlot.(json.Number) if !ok { - return false, 0, fmt.Errorf(`min context slot error "contextSlot" is not json.Number`) + return true, 0 } currentSlot, typeErr := strconv.ParseUint(currentSlotAsJson.String(), 10, 64) if typeErr != nil { - return false, 0, fmt.Errorf(`min context slot error "contextSlot" is not uint64: %w`, err) + return true, 0 } - return true, currentSlot, nil + return true, currentSlot } type M map[string]interface{} diff --git a/node/pkg/watchers/solana/ccq_test.go b/node/pkg/watchers/solana/ccq_test.go index 9f61854148..9c0931b3f6 100644 --- a/node/pkg/watchers/solana/ccq_test.go +++ b/node/pkg/watchers/solana/ccq_test.go @@ -3,7 +3,6 @@ package solana import ( "encoding/json" "fmt" - "strings" "testing" "github.com/certusone/wormhole/node/pkg/query" @@ -26,16 +25,14 @@ func TestCcqIsMinContextSlotErrorSuccess(t *testing.T) { }, } - isMinContext, currentSlot, err := ccqIsMinContextSlotError(error(myErr)) - require.NoError(t, err) + isMinContext, currentSlot := ccqIsMinContextSlotError(error(myErr)) require.True(t, isMinContext) assert.Equal(t, uint64(13526), currentSlot) } func TestCcqIsMinContextSlotErrorSomeOtherError(t *testing.T) { myErr := fmt.Errorf("Some other error") - isMinContext, _, err := ccqIsMinContextSlotError(error(myErr)) - require.NoError(t, err) + isMinContext, _ := ccqIsMinContextSlotError(error(myErr)) require.False(t, isMinContext) } @@ -48,8 +45,7 @@ func TestCcqIsMinContextSlotErrorSomeOtherRPCError(t *testing.T) { }, } - isMinContext, _, err := ccqIsMinContextSlotError(error(myErr)) - require.NoError(t, err) + isMinContext, _ := ccqIsMinContextSlotError(error(myErr)) require.False(t, isMinContext) } @@ -59,8 +55,9 @@ func TestCcqIsMinContextSlotErrorNoData(t *testing.T) { Message: "Minimum context slot has not been reached", } - _, _, err := ccqIsMinContextSlotError(error(myErr)) - assert.EqualError(t, err, `failed to extract data from min context slot error`) + isMinContext, currentSlot := ccqIsMinContextSlotError(error(myErr)) + require.True(t, isMinContext) + assert.Equal(t, uint64(0), currentSlot) } func TestCcqIsMinContextSlotErrorContextSlotMissing(t *testing.T) { @@ -72,8 +69,9 @@ func TestCcqIsMinContextSlotErrorContextSlotMissing(t *testing.T) { }, } - _, _, err := ccqIsMinContextSlotError(error(myErr)) - assert.EqualError(t, err, `min context slot error does not contain "contextSlot"`) + isMinContext, currentSlot := ccqIsMinContextSlotError(error(myErr)) + require.True(t, isMinContext) + assert.Equal(t, uint64(0), currentSlot) } func TestCcqIsMinContextSlotErrorContextSlotIsNotAJsonNumber(t *testing.T) { @@ -85,8 +83,9 @@ func TestCcqIsMinContextSlotErrorContextSlotIsNotAJsonNumber(t *testing.T) { }, } - _, _, err := ccqIsMinContextSlotError(error(myErr)) - assert.EqualError(t, err, `min context slot error "contextSlot" is not json.Number`) + isMinContext, currentSlot := ccqIsMinContextSlotError(error(myErr)) + require.True(t, isMinContext) + assert.Equal(t, uint64(0), currentSlot) } func TestCcqIsMinContextSlotErrorContextSlotIsNotUint64(t *testing.T) { @@ -98,6 +97,7 @@ func TestCcqIsMinContextSlotErrorContextSlotIsNotUint64(t *testing.T) { }, } - _, _, err := ccqIsMinContextSlotError(error(myErr)) - assert.True(t, strings.Contains(err.Error(), `min context slot error "contextSlot" is not uint64`)) + isMinContext, currentSlot := ccqIsMinContextSlotError(error(myErr)) + require.True(t, isMinContext) + assert.Equal(t, uint64(0), currentSlot) }