fix(noncer): initialize nonce before main txr loop (#60)
* initialize + timeout

* use configs

* comment

* changes

* noncer cleanup

* fix
calbera authored Feb 2, 2024
1 parent ab5568c commit 8b12fdb
Showing 4 changed files with 49 additions and 44 deletions.
2 changes: 1 addition & 1 deletion core/transactor/config.go
@@ -8,7 +8,7 @@ type Config struct {
     Multicall3Address   string
     TxReceiptTimeout    time.Duration // how long to wait for a tx to be mined
     PendingNonceTimeout time.Duration // how long to wait for the pending nonce
-    EmtpyQueueDelay     time.Duration // how long to wait if the queue is empty.
+    EmtpyQueueDelay     time.Duration // how long to wait if the queue is empty
     TxBatchSize         int
     TxBatchTimeout      time.Duration // how long to wait for a batch to be flushed
     CallTxTimeout       time.Duration // how long to wait for a eth call result
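For reference, a minimal sketch of a Config literal that sets the now-used PendingNonceTimeout alongside the fields shown above. The import path is inferred from the repository layout and every value is illustrative; none of this is code from the commit:

    package main

    import (
        "time"

        "github.com/berachain/offchain-sdk/core/transactor" // import path inferred from the file layout
    )

    func main() {
        // Illustrative values only; PendingNonceTimeout is the knob threaded into the Noncer below.
        cfg := transactor.Config{
            Multicall3Address:   "0xcA11bde05977b3631167028862bE2a173976CA11", // example address
            TxReceiptTimeout:    30 * time.Second,
            PendingNonceTimeout: 5 * time.Second,
            EmtpyQueueDelay:     time.Second,
            TxBatchSize:         50,
            TxBatchTimeout:      2 * time.Second,
            CallTxTimeout:       10 * time.Second,
        }
        _ = cfg
    }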
57 changes: 30 additions & 27 deletions core/transactor/tracker/noncer.go
@@ -19,49 +19,50 @@ type Noncer struct {
     inFlight *skiplist.SkipList // The list of nonces currently in flight.
     mu       sync.Mutex         // Mutex for thread-safe operations.

+    pendingNonceTimeout  time.Duration
     latestConfirmedNonce uint64
 }

 // NewNoncer creates a new Noncer instance.
-func NewNoncer(sender common.Address) *Noncer {
+func NewNoncer(sender common.Address, pendingNonceTimeout time.Duration) *Noncer {
     return &Noncer{
-        sender:   sender,
-        acquired: skiplist.New(skiplist.Uint64),
-        inFlight: skiplist.New(skiplist.Uint64),
-        mu:       sync.Mutex{},
+        sender:              sender,
+        acquired:            skiplist.New(skiplist.Uint64),
+        inFlight:            skiplist.New(skiplist.Uint64),
+        mu:                  sync.Mutex{},
+        pendingNonceTimeout: pendingNonceTimeout,
     }
 }

 func (n *Noncer) RefreshLoop(ctx context.Context) {
     go func() {
-        timer := time.NewTimer(5 * time.Second) //nolint:gomnd // fix later.
-        for {
-            select {
-            case <-ctx.Done():
-                return
-            case <-timer.C:
-                n.refreshConfirmedNonce(ctx)
-            }
-        }
+        timer := time.NewTimer(5 * time.Second) //nolint:gomnd // should be once per block.
+        for {
+            select {
+            case <-ctx.Done():
+                return
+            case <-timer.C:
+                n.latestConfirmedNonce, _ = n.ethClient.NonceAt(ctx, n.sender, nil)
+            }
+        }
     }()
 }

-func (n *Noncer) refreshConfirmedNonce(ctx context.Context) {
-    latestConfirmedNonce, err := n.ethClient.NonceAt(ctx, n.sender, nil)
-    if err != nil {
-        return
-    }
-    n.latestConfirmedNonce = latestConfirmedNonce
-}
-
 // Start initiates the nonce synchronization.
 func (n *Noncer) SetClient(ethClient eth.Client) {
     n.ethClient = ethClient
 }

-func (n *Noncer) InitializeExistingTxs(ctx context.Context) error {
-    _, err := n.ethClient.TxPoolContent(ctx)
-    return err
+// MustInitializeExistingTxs ensures we can read into the mempool for checking nonces later on.
+func (n *Noncer) MustInitializeExistingTxs(ctx context.Context) {
+    var err error
+
+    // use pending nonce to initialize if some txs are already backed up in mempool
+    if n.latestConfirmedNonce, err = n.ethClient.PendingNonceAt(ctx, n.sender); err != nil {
+        panic(err)
+    }
+
+    if _, err = n.ethClient.TxPoolContent(ctx); err != nil {
+        panic(err)
+    }
 }

 // Acquire gets the next available nonce.
@@ -91,8 +92,10 @@ func (n *Noncer) Acquire(ctx context.Context) (uint64, error) {
         }
     } else {
         var err error
-        // TODO: doing a network call while holding the lock is a bit dangerous
-        nextNonce, err = n.ethClient.PendingNonceAt(ctx, n.sender)
+        // TODO: Network call holds the lock for at most the pending timeout, which is not ideal.
+        ctxWithTimeout, cancel := context.WithTimeout(ctx, n.pendingNonceTimeout)
+        nextNonce, err = n.ethClient.PendingNonceAt(ctxWithTimeout, n.sender)
+        cancel()
         if err != nil {
             return 0, err
         }
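The change above bounds how long Acquire can hold the noncer's mutex while waiting on the RPC. A self-contained sketch of that pattern, with a stub standing in for ethClient.PendingNonceAt (the stub, its latency, and the durations are assumptions for illustration):

    package main

    import (
        "context"
        "fmt"
        "sync"
        "time"
    )

    // fetchPendingNonce stands in for ethClient.PendingNonceAt; it is a stub for illustration only.
    func fetchPendingNonce(ctx context.Context) (uint64, error) {
        select {
        case <-time.After(200 * time.Millisecond): // simulated network latency
            return 42, nil
        case <-ctx.Done():
            return 0, ctx.Err()
        }
    }

    func main() {
        var mu sync.Mutex
        pendingNonceTimeout := 100 * time.Millisecond

        mu.Lock()
        // Bound how long the lock is held by the network call, mirroring the Acquire change above.
        ctxWithTimeout, cancel := context.WithTimeout(context.Background(), pendingNonceTimeout)
        nonce, err := fetchPendingNonce(ctxWithTimeout)
        cancel()
        mu.Unlock()

        // With these numbers the stub exceeds the timeout, so err is context.DeadlineExceeded.
        fmt.Println(nonce, err)
    }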
27 changes: 11 additions & 16 deletions core/transactor/transactor.go
@@ -18,8 +18,8 @@ import (
     "github.com/ethereum/go-ethereum/common"
 )

-// awsMaxBatchSize is the max batch size for AWS.
-const awsMaxBatchSize = 10
+// TODO: find a more appropriate value.
+const inflightChanSize = 1024

 // TxrV2 is the main transactor object.
 type TxrV2 struct {
@@ -39,7 +39,7 @@ type TxrV2 struct {
 func NewTransactor(
     cfg Config, queue queuetypes.Queue[*types.TxRequest], signer kmstypes.TxSigner,
 ) *TxrV2 {
-    noncer := tracker.NewNoncer(signer.Address())
+    noncer := tracker.NewNoncer(signer.Address(), cfg.PendingNonceTimeout)
     factory := factory.New(
         noncer, signer,
         factory.NewMulticall3Batcher(common.HexToAddress(cfg.Multicall3Address)),
@@ -59,15 +59,15 @@ func NewTransactor(
     }

     // Register the tracker as a subscriber to the tracker.
-    ch := make(chan *tracker.InFlightTx, 1024) //nolint:gomnd // its okay.
+    ch := make(chan *tracker.InFlightTx, inflightChanSize)
     go func() {
         // TODO: handle error
         _ = tracker.NewSubscription(txr, txr.logger).Start(context.Background(), ch)
     }()
     dispatcher.Subscribe(ch)

     // Register the sender as a subscriber to the tracker.
-    ch2 := make(chan *tracker.InFlightTx, 1024) //nolint:gomnd // its okay.
+    ch2 := make(chan *tracker.InFlightTx, inflightChanSize)
     go func() {
         // TODO: handle error
         _ = tracker.NewSubscription(txr.sender, txr.logger).Start(context.Background(), ch2)
@@ -83,12 +83,11 @@ func (t *TxrV2) RegistryKey() string {
 }

 // SubscribeTxResults sends the tx results (inflight) to the given channel.
-func (t *TxrV2) SubscribeTxResults(
-    ctx context.Context, subscriber tracker.Subscriber, ch chan *tracker.InFlightTx,
-) {
+func (t *TxrV2) SubscribeTxResults(subscriber tracker.Subscriber) {
+    ch := make(chan *tracker.InFlightTx, inflightChanSize)
     go func() {
         // TODO: handle error
-        _ = tracker.NewSubscription(subscriber, t.logger).Start(ctx, ch)
+        _ = tracker.NewSubscription(subscriber, t.logger).Start(context.Background(), ch)
     }()
     t.dispatcher.Subscribe(ch)
 }
@@ -132,9 +131,7 @@ func (t *TxrV2) Start(ctx context.Context) {

 // mainLoop is the main transaction sending / batching loop.
 func (t *TxrV2) mainLoop(ctx context.Context) {
-    if err := t.noncer.InitializeExistingTxs(ctx); err != nil {
-        panic(err)
-    }
+    t.noncer.MustInitializeExistingTxs(ctx)

     for {
         select {
@@ -171,11 +168,9 @@ func (t *TxrV2) retrieveBatch(_ context.Context) ([]string, []*types.TxRequest)
     var retMsgIDs []string
     startTime := time.Now()

-    // Retrieve the smaller of the aws max batch size or the delta between the max total batch size.
+    // Retrieve the delta between the max total batch size.
     for len(batch) < t.cfg.TxBatchSize && time.Since(startTime) < t.cfg.TxBatchTimeout {
-        msgIDs, txReq, err := t.requests.ReceiveMany(
-            int32(min(awsMaxBatchSize, t.cfg.TxBatchSize-len(batch))),
-        )
+        msgIDs, txReq, err := t.requests.ReceiveMany(int32(t.cfg.TxBatchSize - len(batch)))
     if err != nil {
         t.logger.Error("failed to receive tx request", "err", err)
         continue
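The 1024-element buffers are now named by inflightChanSize, and SubscribeTxResults creates the channel on behalf of the caller. A self-contained sketch of the underlying buffered-channel subscription pattern, with a placeholder event type standing in for *tracker.InFlightTx (illustrative only, not code from this commit):

    package main

    import (
        "fmt"
        "sync"
    )

    // inflightChanSize mirrors the constant introduced above; the value is taken from the diff.
    const inflightChanSize = 1024

    // event is a placeholder for *tracker.InFlightTx, used here purely for illustration.
    type event struct{ nonce uint64 }

    func main() {
        // Each subscriber gets its own buffered channel, so a briefly slow consumer
        // does not immediately block the dispatcher that publishes into it.
        ch := make(chan event, inflightChanSize)

        var wg sync.WaitGroup
        wg.Add(1)
        go func() {
            defer wg.Done()
            for ev := range ch {
                fmt.Println("received nonce", ev.nonce)
            }
        }()

        for i := uint64(0); i < 3; i++ {
            ch <- event{nonce: i}
        }
        close(ch)
        wg.Wait()
    }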
7 changes: 7 additions & 0 deletions types/queue/sqs/sqs.go
@@ -14,6 +14,9 @@ import (
     "github.com/berachain/offchain-sdk/types/queue/types"
 )

+// awsMaxBatchSize is the max batch size for AWS.
+const awsMaxBatchSize = 10
+
 // SQSClient is an interface that defines the necessary methods for interacting
 // with the SQS service.
 type Client interface {
@@ -123,6 +126,10 @@ func (q *Queue[T]) Receive() (string, T, bool) {
 }

 func (q *Queue[T]) ReceiveMany(num int32) ([]string, []T, error) {
+    if num > awsMaxBatchSize {
+        num = awsMaxBatchSize
+    }
+
     ts := make([]T, 0)
     msgIDs := make([]string, 0)

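With the AWS limit now enforced inside ReceiveMany, the caller in retrieveBatch can simply request whatever is left of its batch. A small sketch of the clamping behavior; the clampBatch helper and the loop are illustrative, not code from this commit:

    package main

    import "fmt"

    // awsMaxBatchSize mirrors the constant added to sqs.go above (SQS returns at most 10 messages per receive).
    const awsMaxBatchSize = 10

    // clampBatch shows the cap ReceiveMany now applies; the helper itself is illustrative.
    func clampBatch(num int32) int32 {
        if num > awsMaxBatchSize {
            num = awsMaxBatchSize
        }
        return num
    }

    func main() {
        // The caller can ask for the full remaining batch; each request is clamped
        // to the provider limit internally.
        txBatchSize, received := int32(25), int32(0)
        for received < txBatchSize {
            n := clampBatch(txBatchSize - received)
            fmt.Println("requesting", n, "messages")
            received += n // assume every requested message arrives, purely for illustration
        }
    }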
