From 8b12fdbcac570aa8e2b2eac7120c1a946a8c4d5c Mon Sep 17 00:00:00 2001 From: Cal Bera Date: Fri, 2 Feb 2024 14:51:14 -0500 Subject: [PATCH] fix(noncer): initialize nonce before main txr loop (#60) * initialize + timeout * use configs * comment * changes * noncer cleanup * fix --- core/transactor/config.go | 2 +- core/transactor/tracker/noncer.go | 57 ++++++++++++++++--------------- core/transactor/transactor.go | 27 ++++++--------- types/queue/sqs/sqs.go | 7 ++++ 4 files changed, 49 insertions(+), 44 deletions(-) diff --git a/core/transactor/config.go b/core/transactor/config.go index 290cb4f..f78b477 100644 --- a/core/transactor/config.go +++ b/core/transactor/config.go @@ -8,7 +8,7 @@ type Config struct { Multicall3Address string TxReceiptTimeout time.Duration // how long to wait for a tx to be mined PendingNonceTimeout time.Duration // how long to wait for the pending nonce - EmtpyQueueDelay time.Duration // how long to wait if the queue is empty. + EmtpyQueueDelay time.Duration // how long to wait if the queue is empty TxBatchSize int TxBatchTimeout time.Duration // how long to wait for a batch to be flushed CallTxTimeout time.Duration // how long to wait for a eth call result diff --git a/core/transactor/tracker/noncer.go b/core/transactor/tracker/noncer.go index 3464897..fcdf89a 100644 --- a/core/transactor/tracker/noncer.go +++ b/core/transactor/tracker/noncer.go @@ -19,39 +19,31 @@ type Noncer struct { inFlight *skiplist.SkipList // The list of nonces currently in flight. mu sync.Mutex // Mutex for thread-safe operations. + pendingNonceTimeout time.Duration latestConfirmedNonce uint64 } // NewNoncer creates a new Noncer instance. 
-func NewNoncer(sender common.Address) *Noncer { +func NewNoncer(sender common.Address, pendingNonceTimeout time.Duration) *Noncer { return &Noncer{ - sender: sender, - acquired: skiplist.New(skiplist.Uint64), - inFlight: skiplist.New(skiplist.Uint64), - mu: sync.Mutex{}, + sender: sender, + acquired: skiplist.New(skiplist.Uint64), + inFlight: skiplist.New(skiplist.Uint64), + mu: sync.Mutex{}, + pendingNonceTimeout: pendingNonceTimeout, } } func (n *Noncer) RefreshLoop(ctx context.Context) { - go func() { - timer := time.NewTimer(5 * time.Second) //nolint:gomnd // fix later. - for { - select { - case <-ctx.Done(): - return - case <-timer.C: - n.refreshConfirmedNonce(ctx) - } + timer := time.NewTimer(5 * time.Second) //nolint:gomnd // should be once per block. + for { + select { + case <-ctx.Done(): + return + case <-timer.C: + n.latestConfirmedNonce, _ = n.ethClient.NonceAt(ctx, n.sender, nil) } - }() -} - -func (n *Noncer) refreshConfirmedNonce(ctx context.Context) { - latestConfirmedNonce, err := n.ethClient.NonceAt(ctx, n.sender, nil) - if err != nil { - return } - n.latestConfirmedNonce = latestConfirmedNonce } // Start initiates the nonce synchronization. @@ -59,9 +51,18 @@ func (n *Noncer) SetClient(ethClient eth.Client) { n.ethClient = ethClient } -func (n *Noncer) InitializeExistingTxs(ctx context.Context) error { - _, err := n.ethClient.TxPoolContent(ctx) - return err +// MustInitializeExistingTxs ensures we can read into the mempool for checking nonces later on. +func (n *Noncer) MustInitializeExistingTxs(ctx context.Context) { + var err error + + // use pending nonce to initialize if some txs are already backed up in mempool + if n.latestConfirmedNonce, err = n.ethClient.PendingNonceAt(ctx, n.sender); err != nil { + panic(err) + } + + if _, err = n.ethClient.TxPoolContent(ctx); err != nil { + panic(err) + } } // Acquire gets the next available nonce. 
@@ -91,8 +92,10 @@ func (n *Noncer) Acquire(ctx context.Context) (uint64, error) { } } else { var err error - // TODO: doing a network call while holding the lock is a bit dangerous - nextNonce, err = n.ethClient.PendingNonceAt(ctx, n.sender) + // TODO: Network call holds the lock for at most the pending timeout, which is not ideal. + ctxWithTimeout, cancel := context.WithTimeout(ctx, n.pendingNonceTimeout) + nextNonce, err = n.ethClient.PendingNonceAt(ctxWithTimeout, n.sender) + cancel() if err != nil { return 0, err } diff --git a/core/transactor/transactor.go b/core/transactor/transactor.go index 52eec05..6e1d6e7 100644 --- a/core/transactor/transactor.go +++ b/core/transactor/transactor.go @@ -18,8 +18,8 @@ import ( "github.com/ethereum/go-ethereum/common" ) -// awsMaxBatchSize is the max batch size for AWS. -const awsMaxBatchSize = 10 +// TODO: find a more appropriate value. +const inflightChanSize = 1024 // TxrV2 is the main transactor object. type TxrV2 struct { @@ -39,7 +39,7 @@ type TxrV2 struct { func NewTransactor( cfg Config, queue queuetypes.Queue[*types.TxRequest], signer kmstypes.TxSigner, ) *TxrV2 { - noncer := tracker.NewNoncer(signer.Address()) + noncer := tracker.NewNoncer(signer.Address(), cfg.PendingNonceTimeout) factory := factory.New( noncer, signer, factory.NewMulticall3Batcher(common.HexToAddress(cfg.Multicall3Address)), @@ -59,7 +59,7 @@ func NewTransactor( } // Register the tracker as a subscriber to the tracker. - ch := make(chan *tracker.InFlightTx, 1024) //nolint:gomnd // its okay. + ch := make(chan *tracker.InFlightTx, inflightChanSize) go func() { // TODO: handle error _ = tracker.NewSubscription(txr, txr.logger).Start(context.Background(), ch) @@ -67,7 +67,7 @@ func NewTransactor( dispatcher.Subscribe(ch) // Register the sender as a subscriber to the tracker. - ch2 := make(chan *tracker.InFlightTx, 1024) //nolint:gomnd // its okay. 
+ ch2 := make(chan *tracker.InFlightTx, inflightChanSize) go func() { // TODO: handle error _ = tracker.NewSubscription(txr.sender, txr.logger).Start(context.Background(), ch2) @@ -83,12 +83,11 @@ func (t *TxrV2) RegistryKey() string { } // SubscribeTxResults sends the tx results (inflight) to the given channel. -func (t *TxrV2) SubscribeTxResults( - ctx context.Context, subscriber tracker.Subscriber, ch chan *tracker.InFlightTx, -) { +func (t *TxrV2) SubscribeTxResults(subscriber tracker.Subscriber) { + ch := make(chan *tracker.InFlightTx, inflightChanSize) go func() { // TODO: handle error - _ = tracker.NewSubscription(subscriber, t.logger).Start(ctx, ch) + _ = tracker.NewSubscription(subscriber, t.logger).Start(context.Background(), ch) }() t.dispatcher.Subscribe(ch) } @@ -132,9 +131,7 @@ func (t *TxrV2) Start(ctx context.Context) { } // mainLoop is the main transaction sending / batching loop. func (t *TxrV2) mainLoop(ctx context.Context) { - if err := t.noncer.InitializeExistingTxs(ctx); err != nil { - panic(err) - } + t.noncer.MustInitializeExistingTxs(ctx) for { select { @@ -171,11 +168,9 @@ func (t *TxrV2) retrieveBatch(_ context.Context) ([]string, []*types.TxRequest) var retMsgIDs []string startTime := time.Now() - // Retrieve the smaller of the aws max batch size or the delta between the max total batch size. + // Retrieve up to the remaining batch capacity (TxBatchSize - len(batch)).
for len(batch) < t.cfg.TxBatchSize && time.Since(startTime) < t.cfg.TxBatchTimeout { - msgIDs, txReq, err := t.requests.ReceiveMany( - int32(min(awsMaxBatchSize, t.cfg.TxBatchSize-len(batch))), - ) + msgIDs, txReq, err := t.requests.ReceiveMany(int32(t.cfg.TxBatchSize - len(batch))) if err != nil { t.logger.Error("failed to receive tx request", "err", err) continue diff --git a/types/queue/sqs/sqs.go b/types/queue/sqs/sqs.go index aac031a..04132e2 100644 --- a/types/queue/sqs/sqs.go +++ b/types/queue/sqs/sqs.go @@ -14,6 +14,9 @@ import ( "github.com/berachain/offchain-sdk/types/queue/types" ) +// awsMaxBatchSize is the max batch size for AWS. +const awsMaxBatchSize = 10 + // SQSClient is an interface that defines the necessary methods for interacting // with the SQS service. type Client interface { @@ -123,6 +126,10 @@ func (q *Queue[T]) Receive() (string, T, bool) { } func (q *Queue[T]) ReceiveMany(num int32) ([]string, []T, error) { + if num > awsMaxBatchSize { + num = awsMaxBatchSize + } + ts := make([]T, 0) msgIDs := make([]string, 0)