Skip to content

Commit

Permalink
Submitter: fix transaction bumping (#2756)
Browse files Browse the repository at this point in the history
* Feat: use vanilla wg instead of errgroup

* Cleanup: make wg local

* [goreleaser]
  • Loading branch information
dwasse authored Jun 21, 2024
1 parent b189451 commit 37af1a8
Showing 1 changed file with 101 additions and 96 deletions.
197 changes: 101 additions & 96 deletions ethergo/submitter/chain_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,13 @@ import (
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/trace"
"golang.org/x/sync/errgroup"
)

// chainQueue is a single use queue for a single chain.
type chainQueue struct {
*txSubmitterImpl
// client is the client for this chain
client client.EVM
// g is the errgroup for this chain
g *errgroup.Group
// client is the nonce used for this chain
nonce uint64
// txsHaveConfirmed is true if any of the txes have confirmed
Expand Down Expand Up @@ -71,11 +68,10 @@ func (t *txSubmitterImpl) chainPendingQueue(parentCtx context.Context, chainID *
span.AddEvent("could not register nonce", trace.WithAttributes(attribute.String("error", registerErr.Error())))
}

g, gCtx := errgroup.WithContext(ctx)
wg := &sync.WaitGroup{}

cq := chainQueue{
txSubmitterImpl: t,
g: g,
chainID: core.CopyBigInt(chainID),
nonce: currentNonce,
client: chainClient,
Expand Down Expand Up @@ -105,15 +101,30 @@ func (t *txSubmitterImpl) chainPendingQueue(parentCtx context.Context, chainID *
span.AddEvent("tx out of gas", trace.WithAttributes(txToAttributes(tx.Transaction, tx.UUID)...))
break
}
cq.bumpTX(gCtx, tx)
}
cq.updateOldTxStatuses(gCtx)

err = cq.g.Wait()
if err != nil {
return fmt.Errorf("error in chainPendingQueue: %w", err)
// bump tx in new goroutine
wg.Add(1)
go func() {
defer wg.Done()
bumpErr := cq.bumpTX(ctx, tx)
if bumpErr != nil {
logger.Errorf("could not bump tx: %v", bumpErr)
}
}()
}

// update old tx statuses in parallel
wg.Add(1)
go func() {
defer wg.Done()
updateErr := cq.updateOldTxStatuses(ctx)
if updateErr != nil {
logger.Errorf("could not update old tx statuses: %v", updateErr)
}
}()

wg.Wait()

sort.Slice(cq.reprocessQueue, func(i, j int) bool {
return cq.reprocessQueue[i].Nonce() < cq.reprocessQueue[j].Nonce()
})
Expand Down Expand Up @@ -209,89 +220,87 @@ func (c *chainQueue) registerNumPendingTXes(ctx context.Context, num, chainID in
}

// nolint: cyclop
func (c *chainQueue) bumpTX(parentCtx context.Context, ogTx db.TX) {
c.g.Go(func() (err error) {
if !c.isBumpIntervalElapsed(ogTx) {
c.addToReprocessQueue(ogTx)
return nil
}
// copy the transaction, switching the type if we need to.
// this is required if the config changes to use legacy transactions on a tx that is already bumped.
tx, err := util.CopyTX(ogTx.Transaction, util.WithTxType(c.txTypeForChain(c.chainID)))
if err != nil {
return fmt.Errorf("could not copy tx: %w", err)
}
func (c *chainQueue) bumpTX(parentCtx context.Context, ogTx db.TX) (err error) {
if !c.isBumpIntervalElapsed(ogTx) {
c.addToReprocessQueue(ogTx)
return nil
}
// copy the transaction, switching the type if we need to.
// this is required if the config changes to use legacy transactions on a tx that is already bumped.
tx, err := util.CopyTX(ogTx.Transaction, util.WithTxType(c.txTypeForChain(c.chainID)))
if err != nil {
return fmt.Errorf("could not copy tx: %w", err)
}

ctx, span := c.metrics.Tracer().Start(parentCtx, "chainPendingQueue.bumpTX", trace.WithAttributes(attribute.Stringer(metrics.TxHash, tx.Hash())))
defer func() {
metrics.EndSpanWithErr(span, err)
}()
ctx, span := c.metrics.Tracer().Start(parentCtx, "chainPendingQueue.bumpTX", trace.WithAttributes(attribute.Stringer(metrics.TxHash, tx.Hash())))
defer func() {
metrics.EndSpanWithErr(span, err)
}()

newGasEstimate, err := c.getGasEstimate(ctx, c.client, c.chainIDInt(), tx)
if err != nil {
return fmt.Errorf("could not get gas estimate: %w", err)
}
newGasEstimate, err := c.getGasEstimate(ctx, c.client, c.chainIDInt(), tx)
if err != nil {
return fmt.Errorf("could not get gas estimate: %w", err)
}

transactor, err := c.signer.GetTransactor(ctx, c.chainID)
if err != nil {
return fmt.Errorf("could not get transactor: %w", err)
}
transactor, err := c.signer.GetTransactor(ctx, c.chainID)
if err != nil {
return fmt.Errorf("could not get transactor: %w", err)
}

transactor.NoSend = true
transactor.Nonce = new(big.Int).SetUint64(tx.Nonce())
transactor.GasLimit = newGasEstimate
transactor.NoSend = true
transactor.Nonce = new(big.Int).SetUint64(tx.Nonce())
transactor.GasLimit = newGasEstimate

err = c.setGasPrice(ctx, c.client, transactor, c.chainID, ogTx.Transaction)
if err != nil {
return fmt.Errorf("could not set gas price: %w", err)
}
err = c.setGasPrice(ctx, c.client, transactor, c.chainID, ogTx.Transaction)
if err != nil {
return fmt.Errorf("could not set gas price: %w", err)
}

switch tx.Type() {
case types.LegacyTxType:
tx = types.NewTx(&types.LegacyTx{
Nonce: tx.Nonce(),
GasPrice: transactor.GasPrice,
Gas: transactor.GasLimit,
To: tx.To(),
Value: tx.Value(),
Data: tx.Data(),
})
case types.DynamicFeeTxType:
tx = types.NewTx(&types.DynamicFeeTx{
ChainID: tx.ChainId(),
Nonce: tx.Nonce(),
GasTipCap: core.CopyBigInt(transactor.GasTipCap),
GasFeeCap: core.CopyBigInt(transactor.GasFeeCap),
Gas: transactor.GasLimit,
To: tx.To(),
Value: tx.Value(),
Data: tx.Data(),
})
default:
return fmt.Errorf("unknown tx type: %v", ogTx.Type())
}
switch tx.Type() {
case types.LegacyTxType:
tx = types.NewTx(&types.LegacyTx{
Nonce: tx.Nonce(),
GasPrice: transactor.GasPrice,
Gas: transactor.GasLimit,
To: tx.To(),
Value: tx.Value(),
Data: tx.Data(),
})
case types.DynamicFeeTxType:
tx = types.NewTx(&types.DynamicFeeTx{
ChainID: tx.ChainId(),
Nonce: tx.Nonce(),
GasTipCap: core.CopyBigInt(transactor.GasTipCap),
GasFeeCap: core.CopyBigInt(transactor.GasFeeCap),
Gas: transactor.GasLimit,
To: tx.To(),
Value: tx.Value(),
Data: tx.Data(),
})
default:
return fmt.Errorf("unknown tx type: %v", ogTx.Type())
}

tx, err = transactor.Signer(transactor.From, tx)
if err != nil {
return fmt.Errorf("could not sign tx: %w", err)
}
tx, err = transactor.Signer(transactor.From, tx)
if err != nil {
return fmt.Errorf("could not sign tx: %w", err)
}

span.AddEvent("add to reprocess queue")
span.SetAttributes(txToAttributes(tx, ogTx.UUID)...)
span.AddEvent("add to reprocess queue")
span.SetAttributes(txToAttributes(tx, ogTx.UUID)...)

c.addToReprocessQueue(db.TX{
UUID: ogTx.UUID,
Transaction: tx,
Status: db.Stored,
})
c.addToReprocessQueue(db.TX{
UUID: ogTx.UUID,
Transaction: tx,
Status: db.Stored,
})

registerErr := c.registerBumpTx(ctx, tx)
if registerErr != nil {
span.AddEvent("could not register bump tx", trace.WithAttributes(attribute.String("error", registerErr.Error())))
}
registerErr := c.registerBumpTx(ctx, tx)
if registerErr != nil {
span.AddEvent("could not register bump tx", trace.WithAttributes(attribute.String("error", registerErr.Error())))
}

return nil
})
return nil
}

// addToReprocessQueue adds a tx to the reprocess queue.
Expand Down Expand Up @@ -327,24 +336,20 @@ func (c *chainQueue) registerBumpTx(ctx context.Context, tx *types.Transaction)

// updateOldTxStatuses updates the status of txes that are before the current nonce
// this will only run if we have txes that have confirmed.
func (c *chainQueue) updateOldTxStatuses(parentCtx context.Context) {
func (c *chainQueue) updateOldTxStatuses(parentCtx context.Context) (err error) {
// nothing to do
if !c.txsHaveConfirmed {
return
}

ctx, span := c.metrics.Tracer().Start(parentCtx, "chainPendingQueue.updateOldTxStatuses")
defer func() {
metrics.EndSpanWithErr(span, err)
}()

// start a new goroutine to mark the txes as replaced or confirmed in parallel
c.g.Go(func() (err error) {
defer func() {
metrics.EndSpanWithErr(span, err)
}()

err = c.db.MarkAllBeforeNonceReplacedOrConfirmed(ctx, c.signer.Address(), c.chainID, c.nonce)
if err != nil {
return fmt.Errorf("could not mark txes: %w", err)
}
return nil
})
err = c.db.MarkAllBeforeNonceReplacedOrConfirmed(ctx, c.signer.Address(), c.chainID, c.nonce)
if err != nil {
return fmt.Errorf("could not mark txes: %w", err)
}
return nil
}

0 comments on commit 37af1a8

Please sign in to comment.