From 37af1a8b1a06d58038d5405b1246f2a475f48180 Mon Sep 17 00:00:00 2001 From: dwasse Date: Fri, 21 Jun 2024 10:04:52 -0500 Subject: [PATCH] Submitter: fix transaction bumping (#2756) * Feat: use vanilla wg instead of errgroup * Cleanup: make wg local * [goreleaser] --- ethergo/submitter/chain_queue.go | 197 ++++++++++++++++--------------- 1 file changed, 101 insertions(+), 96 deletions(-) diff --git a/ethergo/submitter/chain_queue.go b/ethergo/submitter/chain_queue.go index dc05d9defb..3fb012d795 100644 --- a/ethergo/submitter/chain_queue.go +++ b/ethergo/submitter/chain_queue.go @@ -21,7 +21,6 @@ import ( "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/metric" "go.opentelemetry.io/otel/trace" - "golang.org/x/sync/errgroup" ) // chainQueue is a single use queue for a single chain. @@ -29,8 +28,6 @@ type chainQueue struct { *txSubmitterImpl // client is the client for this chain client client.EVM - // g is the errgroup for this chain - g *errgroup.Group // client is the nonce used for this chain nonce uint64 // txsHaveConfirmed is true if any of the txes have confirmed @@ -71,11 +68,10 @@ func (t *txSubmitterImpl) chainPendingQueue(parentCtx context.Context, chainID * span.AddEvent("could not register nonce", trace.WithAttributes(attribute.String("error", registerErr.Error()))) } - g, gCtx := errgroup.WithContext(ctx) + wg := &sync.WaitGroup{} cq := chainQueue{ txSubmitterImpl: t, - g: g, chainID: core.CopyBigInt(chainID), nonce: currentNonce, client: chainClient, @@ -105,15 +101,30 @@ func (t *txSubmitterImpl) chainPendingQueue(parentCtx context.Context, chainID * span.AddEvent("tx out of gas", trace.WithAttributes(txToAttributes(tx.Transaction, tx.UUID)...)) break } - cq.bumpTX(gCtx, tx) - } - cq.updateOldTxStatuses(gCtx) - err = cq.g.Wait() - if err != nil { - return fmt.Errorf("error in chainPendingQueue: %w", err) + // bump tx in new goroutine + wg.Add(1) + go func() { + defer wg.Done() + bumpErr := cq.bumpTX(ctx, tx) + if bumpErr != nil { + logger.Errorf("could not bump tx: %v", bumpErr) + } + }() } + // update old tx statuses in parallel + wg.Add(1) + go func() { + defer wg.Done() + updateErr := cq.updateOldTxStatuses(ctx) + if updateErr != nil { + logger.Errorf("could not update old tx statuses: %v", updateErr) + } + }() + + wg.Wait() + sort.Slice(cq.reprocessQueue, func(i, j int) bool { return cq.reprocessQueue[i].Nonce() < cq.reprocessQueue[j].Nonce() }) @@ -209,89 +220,87 @@ func (c *chainQueue) registerNumPendingTXes(ctx context.Context, num, chainID in } // nolint: cyclop -func (c *chainQueue) bumpTX(parentCtx context.Context, ogTx db.TX) { - c.g.Go(func() (err error) { - if !c.isBumpIntervalElapsed(ogTx) { - c.addToReprocessQueue(ogTx) - return nil - } - // copy the transaction, switching the type if we need to. - // this is required if the config changes to use legacy transactions on a tx that is already bumped. - tx, err := util.CopyTX(ogTx.Transaction, util.WithTxType(c.txTypeForChain(c.chainID))) - if err != nil { - return fmt.Errorf("could not copy tx: %w", err) - } +func (c *chainQueue) bumpTX(parentCtx context.Context, ogTx db.TX) (err error) { + if !c.isBumpIntervalElapsed(ogTx) { + c.addToReprocessQueue(ogTx) + return nil + } + // copy the transaction, switching the type if we need to. + // this is required if the config changes to use legacy transactions on a tx that is already bumped. + tx, err := util.CopyTX(ogTx.Transaction, util.WithTxType(c.txTypeForChain(c.chainID))) + if err != nil { + return fmt.Errorf("could not copy tx: %w", err) + } - ctx, span := c.metrics.Tracer().Start(parentCtx, "chainPendingQueue.bumpTX", trace.WithAttributes(attribute.Stringer(metrics.TxHash, tx.Hash()))) - defer func() { - metrics.EndSpanWithErr(span, err) - }() + ctx, span := c.metrics.Tracer().Start(parentCtx, "chainPendingQueue.bumpTX", trace.WithAttributes(attribute.Stringer(metrics.TxHash, tx.Hash()))) + defer func() { + metrics.EndSpanWithErr(span, err) + }() - newGasEstimate, err := c.getGasEstimate(ctx, c.client, c.chainIDInt(), tx) - if err != nil { - return fmt.Errorf("could not get gas estimate: %w", err) - } + newGasEstimate, err := c.getGasEstimate(ctx, c.client, c.chainIDInt(), tx) + if err != nil { + return fmt.Errorf("could not get gas estimate: %w", err) + } - transactor, err := c.signer.GetTransactor(ctx, c.chainID) - if err != nil { - return fmt.Errorf("could not get transactor: %w", err) - } + transactor, err := c.signer.GetTransactor(ctx, c.chainID) + if err != nil { + return fmt.Errorf("could not get transactor: %w", err) + } - transactor.NoSend = true - transactor.Nonce = new(big.Int).SetUint64(tx.Nonce()) - transactor.GasLimit = newGasEstimate + transactor.NoSend = true + transactor.Nonce = new(big.Int).SetUint64(tx.Nonce()) + transactor.GasLimit = newGasEstimate - err = c.setGasPrice(ctx, c.client, transactor, c.chainID, ogTx.Transaction) - if err != nil { - return fmt.Errorf("could not set gas price: %w", err) - } + err = c.setGasPrice(ctx, c.client, transactor, c.chainID, ogTx.Transaction) + if err != nil { + return fmt.Errorf("could not set gas price: %w", err) + } - switch tx.Type() { - case types.LegacyTxType: - tx = types.NewTx(&types.LegacyTx{ - Nonce: tx.Nonce(), - GasPrice: transactor.GasPrice, - Gas: transactor.GasLimit, - To: tx.To(), - Value: tx.Value(), - Data: tx.Data(), - }) - case types.DynamicFeeTxType: - tx = types.NewTx(&types.DynamicFeeTx{ - ChainID: tx.ChainId(), - Nonce: tx.Nonce(), - GasTipCap: core.CopyBigInt(transactor.GasTipCap), - GasFeeCap: core.CopyBigInt(transactor.GasFeeCap), - Gas: transactor.GasLimit, - To: tx.To(), - Value: tx.Value(), - Data: tx.Data(), - }) - default: - return fmt.Errorf("unknown tx type: %v", ogTx.Type()) - } + switch tx.Type() { + case types.LegacyTxType: + tx = types.NewTx(&types.LegacyTx{ + Nonce: tx.Nonce(), + GasPrice: transactor.GasPrice, + Gas: transactor.GasLimit, + To: tx.To(), + Value: tx.Value(), + Data: tx.Data(), + }) + case types.DynamicFeeTxType: + tx = types.NewTx(&types.DynamicFeeTx{ + ChainID: tx.ChainId(), + Nonce: tx.Nonce(), + GasTipCap: core.CopyBigInt(transactor.GasTipCap), + GasFeeCap: core.CopyBigInt(transactor.GasFeeCap), + Gas: transactor.GasLimit, + To: tx.To(), + Value: tx.Value(), + Data: tx.Data(), + }) + default: + return fmt.Errorf("unknown tx type: %v", ogTx.Type()) + } - tx, err = transactor.Signer(transactor.From, tx) - if err != nil { - return fmt.Errorf("could not sign tx: %w", err) - } + tx, err = transactor.Signer(transactor.From, tx) + if err != nil { + return fmt.Errorf("could not sign tx: %w", err) + } - span.AddEvent("add to reprocess queue") - span.SetAttributes(txToAttributes(tx, ogTx.UUID)...) + span.AddEvent("add to reprocess queue") + span.SetAttributes(txToAttributes(tx, ogTx.UUID)...) - c.addToReprocessQueue(db.TX{ - UUID: ogTx.UUID, - Transaction: tx, - Status: db.Stored, - }) + c.addToReprocessQueue(db.TX{ + UUID: ogTx.UUID, + Transaction: tx, + Status: db.Stored, + }) - registerErr := c.registerBumpTx(ctx, tx) - if registerErr != nil { - span.AddEvent("could not register bump tx", trace.WithAttributes(attribute.String("error", registerErr.Error()))) - } + registerErr := c.registerBumpTx(ctx, tx) + if registerErr != nil { + span.AddEvent("could not register bump tx", trace.WithAttributes(attribute.String("error", registerErr.Error()))) + } - return nil - }) + return nil } // addToReprocessQueue adds a tx to the reprocess queue. @@ -327,24 +336,20 @@ func (c *chainQueue) registerBumpTx(ctx context.Context, tx *types.Transaction) // updateOldTxStatuses updates the status of txes that are before the current nonce // this will only run if we have txes that have confirmed. -func (c *chainQueue) updateOldTxStatuses(parentCtx context.Context) { +func (c *chainQueue) updateOldTxStatuses(parentCtx context.Context) (err error) { // nothing to do if !c.txsHaveConfirmed { return } ctx, span := c.metrics.Tracer().Start(parentCtx, "chainPendingQueue.updateOldTxStatuses") + defer func() { + metrics.EndSpanWithErr(span, err) + }() - // start a new goroutine to mark the txes as replaced or confirmed in parallel - c.g.Go(func() (err error) { - defer func() { - metrics.EndSpanWithErr(span, err) - }() - - err = c.db.MarkAllBeforeNonceReplacedOrConfirmed(ctx, c.signer.Address(), c.chainID, c.nonce) - if err != nil { - return fmt.Errorf("could not mark txes: %w", err) - } - return nil - }) + err = c.db.MarkAllBeforeNonceReplacedOrConfirmed(ctx, c.signer.Address(), c.chainID, c.nonce) + if err != nil { + return fmt.Errorf("could not mark txes: %w", err) + } + return nil }