Skip to content

Commit

Permalink
Node/CCQ: Solana min context slot fix (#3751)
Browse files Browse the repository at this point in the history
* Node/CCQ: Solana min context slot fix

* Node/CCQ: retry not updating timeout
  • Loading branch information
bruce-riley authored Jan 29, 2024
1 parent 5fa8379 commit 3f074f3
Show file tree
Hide file tree
Showing 4 changed files with 55 additions and 37 deletions.
2 changes: 1 addition & 1 deletion node/pkg/query/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -392,7 +392,7 @@ func handleQueryRequestsImpl(
zap.Stringer("lastUpdateTime", pcq.lastUpdateTime),
zap.String("chainID", pq.queries[requestIdx].req.Request.ChainId.String()),
)
pcq.ccqForwardToWatcher(qLogger, pq.receiveTime)
pcq.ccqForwardToWatcher(qLogger, now)
}
}
}
Expand Down
2 changes: 2 additions & 0 deletions node/pkg/query/request.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,8 @@ const SolanaAccountQueryRequestType ChainSpecificQueryType = 4
type SolanaAccountQueryRequest struct {
// Commitment identifies the commitment level to be used in the query. Currently it may only be "finalized".
// Before we can support "confirmed", we need a way to read the account data and the block information atomically.
// We would also need to deal with the fact that queries are only handled in the finalized watcher and it does not
// have access to the latest confirmed slot needed for MinContextSlot retries.
Commitment string

// The minimum slot that the request can be evaluated at. Zero means unused.
Expand Down
58 changes: 37 additions & 21 deletions node/pkg/watchers/solana/ccq.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"encoding/hex"
"encoding/json"
"errors"
"fmt"
"strconv"
"time"

Expand Down Expand Up @@ -227,41 +226,54 @@ func (w *SolanaWatcher) ccqCheckForMinSlotContext(
return false
}

isMinContext, currentSlot, err := ccqIsMinContextSlotError(err)
if err != nil {
w.ccqLogger.Error("failed to parse for min context slot error", zap.Error(err))
isMinContext, currentSlotFromError := ccqIsMinContextSlotError(err)
if !isMinContext {
return false
}

if !isMinContext {
return false
var currentSlot uint64
if currentSlotFromError != 0 {
currentSlot = currentSlotFromError
} else {
currentSlot = w.GetLatestFinalizedBlockNumber()
}

// Estimate how far in the future the requested slot is, using our estimated slot time.
futureSlotEstimate := time.Duration(req.MinContextSlot-currentSlot) * CCQ_ESTIMATED_SLOT_TIME

// If the requested slot is more than ten seconds in the future, use the regular retry mechanism.
if futureSlotEstimate > query.RetryInterval {
// If the requested slot is definitively more than the retry interval in the future (the estimate exceeds twice the interval), use the regular retry mechanism.
if futureSlotEstimate > query.RetryInterval*2 {
w.ccqLogger.Info("minimum context slot is too far in the future, requesting slow retry",
zap.String("requestId", requestId),
zap.Uint64("currentSlot", currentSlot),
zap.Uint64("currentSlotFromError", currentSlotFromError),
zap.Uint64("minContextSlot", req.MinContextSlot),
zap.Stringer("futureSlotEstimate", futureSlotEstimate),
)
return false
}

// Kick off the retry after a short delay.
go w.ccqSleepAndRetryAccountQuery(ctx, queryRequest, req, requestId, currentSlot, giveUpTime, log)
go w.ccqSleepAndRetryAccountQuery(ctx, queryRequest, req, requestId, currentSlot, currentSlotFromError, giveUpTime, log)
return true
}

// ccqSleepAndRetryAccountQuery does a short sleep and then initiates a retry.
func (w *SolanaWatcher) ccqSleepAndRetryAccountQuery(ctx context.Context, queryRequest *query.PerChainQueryInternal, req *query.SolanaAccountQueryRequest, requestId string, currentSlot uint64, giveUpTime time.Time, log bool) {
func (w *SolanaWatcher) ccqSleepAndRetryAccountQuery(
ctx context.Context,
queryRequest *query.PerChainQueryInternal,
req *query.SolanaAccountQueryRequest,
requestId string,
currentSlot uint64,
currentSlotFromError uint64,
giveUpTime time.Time,
log bool,
) {
if log {
w.ccqLogger.Info("minimum context slot has not been reached, will retry shortly",
zap.String("requestId", requestId),
zap.Uint64("currentSlot", currentSlot),
zap.Uint64("currentSlotFromError", currentSlotFromError),
zap.Uint64("minContextSlot", req.MinContextSlot),
zap.Stringer("retryInterval", CCQ_FAST_RETRY_INTERVAL),
)
Expand All @@ -277,42 +289,46 @@ func (w *SolanaWatcher) ccqSleepAndRetryAccountQuery(ctx context.Context, queryR
}

// ccqIsMinContextSlotError parses an error to see if it is "Minimum context slot has not been reached". If it is, it returns the current slot number reported in the error, or zero if the error does not include one.
func ccqIsMinContextSlotError(err error) (bool, uint64, error) {
func ccqIsMinContextSlotError(err error) (bool, uint64) {
/*
A MinContextSlot error looks like this (and contains the context slot):
"(*jsonrpc.RPCError)(0xc00b3881b0)({\n Code: (int) -32016,\n Message: (string) (len=41) \"Minimum context slot has not been reached\",\n Data: (map[string]interface {}) (len=1) {\n (string) (len=11) \"contextSlot\": (json.Number) (len=4) \"3630\"\n }\n})\n"
A MinContextSlot error looks like this (and contains the context slot):
"(*jsonrpc.RPCError)(0xc00b3881b0)({\n Code: (int) -32016,\n Message: (string) (len=41) \"Minimum context slot has not been reached\",\n Data: (map[string]interface {}) (len=1) {\n (string) (len=11) \"contextSlot\": (json.Number) (len=4) \"3630\"\n }\n})\n"
Except some endpoints return something like this instead:
"(*jsonrpc.RPCError)(0xc03c0bcd20)({\n Code: (int) -32016,\n Message: (string) (len=41) \"Minimum context slot has not been reached\",\n Data: (interface {}) <nil>\n})\n"
*/
var rpcErr *jsonrpc.RPCError
if !errors.As(err, &rpcErr) {
return false, 0, nil // Some other kind of error. That's okay.
return false, 0 // Some other kind of error.
}

if rpcErr.Code != -32016 { // Minimum context slot has not been reached
return false, 0, nil // Some other kind of RPC error. That's okay.
return false, 0 // Some other kind of RPC error.
}

// From here on down, any error is bad because the MinContextSlot error is not in the expected format.
// We know it is a MinContextSlot error. If it contains the current slot number, extract and return that.
// Since some Solana endpoints do not return that, we can't treat it as an error if it is missing.
m, ok := rpcErr.Data.(map[string]interface{})
if !ok {
return false, 0, fmt.Errorf("failed to extract data from min context slot error")
return true, 0
}

contextSlot, ok := m["contextSlot"]
if !ok {
return false, 0, fmt.Errorf(`min context slot error does not contain "contextSlot"`)
return true, 0
}

currentSlotAsJson, ok := contextSlot.(json.Number)
if !ok {
return false, 0, fmt.Errorf(`min context slot error "contextSlot" is not json.Number`)
return true, 0
}

currentSlot, typeErr := strconv.ParseUint(currentSlotAsJson.String(), 10, 64)
if typeErr != nil {
return false, 0, fmt.Errorf(`min context slot error "contextSlot" is not uint64: %w`, err)
return true, 0
}

return true, currentSlot, nil
return true, currentSlot
}

type M map[string]interface{}
Expand Down
30 changes: 15 additions & 15 deletions node/pkg/watchers/solana/ccq_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package solana
import (
"encoding/json"
"fmt"
"strings"
"testing"

"github.com/certusone/wormhole/node/pkg/query"
Expand All @@ -26,16 +25,14 @@ func TestCcqIsMinContextSlotErrorSuccess(t *testing.T) {
},
}

isMinContext, currentSlot, err := ccqIsMinContextSlotError(error(myErr))
require.NoError(t, err)
isMinContext, currentSlot := ccqIsMinContextSlotError(error(myErr))
require.True(t, isMinContext)
assert.Equal(t, uint64(13526), currentSlot)
}

func TestCcqIsMinContextSlotErrorSomeOtherError(t *testing.T) {
myErr := fmt.Errorf("Some other error")
isMinContext, _, err := ccqIsMinContextSlotError(error(myErr))
require.NoError(t, err)
isMinContext, _ := ccqIsMinContextSlotError(error(myErr))
require.False(t, isMinContext)
}

Expand All @@ -48,8 +45,7 @@ func TestCcqIsMinContextSlotErrorSomeOtherRPCError(t *testing.T) {
},
}

isMinContext, _, err := ccqIsMinContextSlotError(error(myErr))
require.NoError(t, err)
isMinContext, _ := ccqIsMinContextSlotError(error(myErr))
require.False(t, isMinContext)
}

Expand All @@ -59,8 +55,9 @@ func TestCcqIsMinContextSlotErrorNoData(t *testing.T) {
Message: "Minimum context slot has not been reached",
}

_, _, err := ccqIsMinContextSlotError(error(myErr))
assert.EqualError(t, err, `failed to extract data from min context slot error`)
isMinContext, currentSlot := ccqIsMinContextSlotError(error(myErr))
require.True(t, isMinContext)
assert.Equal(t, uint64(0), currentSlot)
}

func TestCcqIsMinContextSlotErrorContextSlotMissing(t *testing.T) {
Expand All @@ -72,8 +69,9 @@ func TestCcqIsMinContextSlotErrorContextSlotMissing(t *testing.T) {
},
}

_, _, err := ccqIsMinContextSlotError(error(myErr))
assert.EqualError(t, err, `min context slot error does not contain "contextSlot"`)
isMinContext, currentSlot := ccqIsMinContextSlotError(error(myErr))
require.True(t, isMinContext)
assert.Equal(t, uint64(0), currentSlot)
}

func TestCcqIsMinContextSlotErrorContextSlotIsNotAJsonNumber(t *testing.T) {
Expand All @@ -85,8 +83,9 @@ func TestCcqIsMinContextSlotErrorContextSlotIsNotAJsonNumber(t *testing.T) {
},
}

_, _, err := ccqIsMinContextSlotError(error(myErr))
assert.EqualError(t, err, `min context slot error "contextSlot" is not json.Number`)
isMinContext, currentSlot := ccqIsMinContextSlotError(error(myErr))
require.True(t, isMinContext)
assert.Equal(t, uint64(0), currentSlot)
}

func TestCcqIsMinContextSlotErrorContextSlotIsNotUint64(t *testing.T) {
Expand All @@ -98,6 +97,7 @@ func TestCcqIsMinContextSlotErrorContextSlotIsNotUint64(t *testing.T) {
},
}

_, _, err := ccqIsMinContextSlotError(error(myErr))
assert.True(t, strings.Contains(err.Error(), `min context slot error "contextSlot" is not uint64`))
isMinContext, currentSlot := ccqIsMinContextSlotError(error(myErr))
require.True(t, isMinContext)
assert.Equal(t, uint64(0), currentSlot)
}

0 comments on commit 3f074f3

Please sign in to comment.