From 51d5b9d2ea52d1f1e9e8572d94edb551575c8d80 Mon Sep 17 00:00:00 2001 From: ffranr Date: Thu, 22 Aug 2024 17:43:19 +0100 Subject: [PATCH 1/5] tapfreighter: re-attempt proof transfer when backoff attempts exhausted This commit ensures that the proof transfer ChainPorter state is re-executed once proof transfer backoff attempts have been exhausted. In the absence of this commit, the next opportunity for re-attempting proof transfer would be when tapd restarts (pending parcels are processed on startup). --- tapfreighter/chain_porter.go | 26 ++++++++++++++++++++++++++ 1 file changed, 26 insertions(+) diff --git a/tapfreighter/chain_porter.go b/tapfreighter/chain_porter.go index 3fbb6790d..0fe3d7b0e 100644 --- a/tapfreighter/chain_porter.go +++ b/tapfreighter/chain_porter.go @@ -849,6 +849,10 @@ func (p *ChainPorter) transferReceiverProof(pkg *sendPackage) error { // those that do not. reportProofTransfers(notDeliveringOutputs, pendingDeliveryOutputs) + // incompleteDelivery is set to true if any proof delivery attempts fail + // and exceed the maximum backoff limit. + incompleteDelivery := false + deliver := func(ctx context.Context, out TransferOutput) error { key := out.ScriptKey.PubKey @@ -910,6 +914,13 @@ func (p *ChainPorter) transferReceiverProof(pkg *sendPackage) error { // later. var backoffExecErr *proof.BackoffExecError if errors.As(err, &backoffExecErr) { + log.Debugf("Exceeded backoff limit for proof delivery "+ + "(script_key=%x, proof_courier_addr=%s)", + key.SerializeCompressed(), out.ProofCourierAddr) + + // Set the incomplete delivery flag to true so that we + // can retry the proof transfer state later. + incompleteDelivery = true return nil } if err != nil { @@ -927,6 +938,10 @@ func (p *ChainPorter) transferReceiverProof(pkg *sendPackage) error { "confirmation: %w", err) } + log.Infof("Transfer output proof delivery complete "+ + "(anchor_txid=%v, output_position=%d)", + pkg.OutboundPkg.AnchorTx.TxHash(), out.Position) + return nil } @@ -969,6 +984,17 @@ func (p *ChainPorter) transferReceiverProof(pkg *sendPackage) error { return firstErr } + // If the delivery is incomplete, we'll return early so that we can + // retry proof transfer later. + if incompleteDelivery { + log.Debugf("Proof delivery incomplete, will retry executing "+ + "the proof transfer state (transfer_anchor_tx_hash=%v)", + pkg.OutboundPkg.AnchorTx.TxHash()) + + // Return here before setting the transfer to complete. + return nil + } + // At this point, the transfer is fully finalised and successful: // - The anchoring transaction has been confirmed on-chain. // - The proof(s) have been delivered to the receiver(s). From fa4a16596ab4c62fdc634f57eca76f282773c4d3 Mon Sep 17 00:00:00 2001 From: ffranr Date: Thu, 22 Aug 2024 17:45:03 +0100 Subject: [PATCH 2/5] proof+itest: improve log and error messages --- itest/test_harness.go | 2 ++ proof/courier.go | 10 +++++----- 2 files changed, 7 insertions(+), 5 deletions(-) diff --git a/itest/test_harness.go b/itest/test_harness.go index 45a20146a..971bcec44 100644 --- a/itest/test_harness.go +++ b/itest/test_harness.go @@ -318,6 +318,8 @@ func setupHarnesses(t *testing.T, ht *harnessTest, // If nothing is specified, we use the universe RPC proof courier by // default. default: + t.Logf("Address of universe server as proof courier: %v", + universeServer.service.rpcHost()) proofCourier = universeServer } diff --git a/proof/courier.go b/proof/courier.go index d117cd39f..75cd71e42 100644 --- a/proof/courier.go +++ b/proof/courier.go @@ -654,7 +654,7 @@ func (b *BackoffHandler) Exec(ctx context.Context, proofLocator Locator, if err != nil { return err } - log.Infof("Starting proof transfer backoff procedure for proof "+ + log.Infof("Starting proof transfer backoff procedure "+ "(transfer_type=%s, locator_hash=%x)", transferType, locatorHash[:]) @@ -710,7 +710,7 @@ func (b *BackoffHandler) Exec(ctx context.Context, proofLocator Locator, ) subscriberEvent(waitEvent) - log.Debugf("Proof delivery failed with error. Backing off. "+ + log.Debugf("Proof transfer failed with error. Backing off. "+ "(transfer_type=%s, locator_hash=%x, backoff=%s, "+ "attempt=%d): %v", transferType, locatorHash[:], backoff, i, errExec) @@ -742,7 +742,7 @@ func (b *BackoffHandler) wait(ctx context.Context, wait time.Duration) error { case <-time.After(wait): return nil case <-ctx.Done(): - return fmt.Errorf("context canceled") + return fmt.Errorf("back off handler context done") } } @@ -1418,8 +1418,8 @@ func (c *UniverseRpcCourier) ReceiveProof(ctx context.Context, err := c.ensureConnect(ctx) if err != nil { return fmt.Errorf("unable to connect to "+ - "courier service during delivery "+ - "attempt: %w", err) + "universe RPC courier service during "+ + "recieve attempt: %w", err) } // Retrieve proof from courier. From b296bdf0ee786215b8b5f03239b022610810508c Mon Sep 17 00:00:00 2001 From: ffranr Date: Tue, 19 Nov 2024 15:49:05 +0000 Subject: [PATCH 3/5] proof: make courier service connection attempt blocking The change ensures that the courier service connection attempt is blocking rather than synchronous. This prevents proof transfers from failing due to attempts to use connections before they are fully established, simplifying debugging. Both the connection and transfer steps are part of the backoff procedure, so failures in either step will trigger re-attempts. --- proof/courier.go | 39 ++++++++++++++++++++++++++++++++++----- sample-tapd.conf | 5 +++++ 2 files changed, 39 insertions(+), 5 deletions(-) diff --git a/proof/courier.go b/proof/courier.go index 75cd71e42..413b7ba1b 100644 --- a/proof/courier.go +++ b/proof/courier.go @@ -293,6 +293,7 @@ func serverDialOpts() ([]grpc.DialOption, error) { tlsConfig := tls.Config{InsecureSkipVerify: true} transportCredentials := credentials.NewTLS(&tlsConfig) opts = append(opts, grpc.WithTransportCredentials(transportCredentials)) + opts = append(opts, grpc.WithBlock()) return opts, nil } @@ -1154,10 +1155,17 @@ func (h *HashMailCourier) SetSubscribers( var _ Courier = (*HashMailCourier)(nil) // UniverseRpcCourierCfg is the config for the universe RPC proof courier. +// +// nolint:lll type UniverseRpcCourierCfg struct { // BackoffCfg configures the behaviour of the proof delivery // functionality. BackoffCfg *BackoffCfg + + // ServiceRequestTimeout defines the maximum duration we'll wait for + // a courier service to handle our outgoing request during a connection + // attempt, or when delivering or retrieving a proof. + ServiceRequestTimeout time.Duration `long:"servicerequestimeout" description:"The maximum duration we'll wait for a courier service to handle our outgoing request during a connection attempt, or when delivering or retrieving a proof."` } // UniverseRpcCourier is a universe RPC proof courier service handle. It @@ -1351,7 +1359,12 @@ func (c *UniverseRpcCourier) DeliverProof(ctx context.Context, deliverFunc := func() error { // Connect to the courier service if a connection hasn't // been established yet. - err := c.ensureConnect(ctx) + subCtx, subCtxCancel := context.WithTimeout( + ctx, c.cfg.ServiceRequestTimeout, + ) + defer subCtxCancel() + + err := c.ensureConnect(subCtx) if err != nil { return fmt.Errorf("unable to connect to "+ "courier service during delivery "+ @@ -1359,10 +1372,16 @@ func (c *UniverseRpcCourier) DeliverProof(ctx context.Context, } // Submit proof to courier. - _, err = c.client.InsertProof(ctx, &unirpc.AssetProof{ + subCtx, subCtxCancel = context.WithTimeout( + ctx, c.cfg.ServiceRequestTimeout, + ) + defer subCtxCancel() + + assetProof := unirpc.AssetProof{ Key: &universeKey, AssetLeaf: &assetLeaf, - }) + } + _, err = c.client.InsertProof(subCtx, &assetProof) if err != nil { return fmt.Errorf("error inserting proof "+ "into universe courier service: %w", @@ -1415,7 +1434,12 @@ func (c *UniverseRpcCourier) ReceiveProof(ctx context.Context, receiveFunc := func() error { // Connect to the courier service if a connection hasn't // been established yet. - err := c.ensureConnect(ctx) + subCtx, subCtxCancel := context.WithTimeout( + ctx, c.cfg.ServiceRequestTimeout, + ) + defer subCtxCancel() + + err := c.ensureConnect(subCtx) if err != nil { return fmt.Errorf("unable to connect to "+ "universe RPC courier service during "+ @@ -1423,7 +1447,12 @@ func (c *UniverseRpcCourier) ReceiveProof(ctx context.Context, } // Retrieve proof from courier. - resp, err := c.client.QueryProof(ctx, &universeKey) + subCtx, subCtxCancel = context.WithTimeout( + ctx, c.cfg.ServiceRequestTimeout, + ) + defer subCtxCancel() + + resp, err := c.client.QueryProof(subCtx, &universeKey) if err != nil { return fmt.Errorf("error retrieving proof "+ "from universe courier service: %w", diff --git a/sample-tapd.conf b/sample-tapd.conf index d15527e75..e3b6314c0 100644 --- a/sample-tapd.conf +++ b/sample-tapd.conf @@ -63,6 +63,11 @@ ; {s, m, h}. ; custodianproofretrievaldelay=5s +; The maximum duration we'll wait for a proof courier service to handle our +; outgoing request during a connection attempt, or when delivering or retrieving +; a proof. +; universerpccourier.servicerequestimeout=5s + ; Network to run on (mainnet, regtest, testnet, simnet, signet) ; network=testnet From 78f729197f70bb944a051382add46120e9bb2f49 Mon Sep 17 00:00:00 2001 From: ffranr Date: Tue, 19 Nov 2024 15:43:51 +0000 Subject: [PATCH 4/5] tapcfg: add default value for new proof courier service response timeout This commit adds a new default value for the proof courier service response timeout which was added in the previous commit. --- tapcfg/config.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/tapcfg/config.go b/tapcfg/config.go index d2dcc4062..9e5e79d21 100644 --- a/tapcfg/config.go +++ b/tapcfg/config.go @@ -100,6 +100,10 @@ const ( // use for waiting for a receiver to acknowledge a proof transfer. defaultProofTransferReceiverAckTimeout = time.Hour * 6 + // defaultProofCourierServiceResponseTimeout is the default timeout + // we'll use for waiting for a response from the proof courier service. + defaultProofCourierServiceResponseTimeout = time.Second * 5 + // defaultUniverseSyncInterval is the default interval that we'll use // to sync Universe state with the federation. defaultUniverseSyncInterval = time.Minute * 10 @@ -423,6 +427,7 @@ func DefaultConfig() Config { InitialBackoff: defaultProofTransferInitialBackoff, MaxBackoff: defaultProofTransferMaxBackoff, }, + ServiceRequestTimeout: defaultProofCourierServiceResponseTimeout, }, CustodianProofRetrievalDelay: defaultProofRetrievalDelay, Universe: &UniverseConfig{ From 771864d5a0dc35c67a909562ccd8c57e6564215f Mon Sep 17 00:00:00 2001 From: ffranr Date: Tue, 19 Nov 2024 15:56:05 +0000 Subject: [PATCH 5/5] itest: configure timeout for tapd harness universe courier service Set the request timeout for the tapd harness universe courier service to an appropriate value to ensure tests pass consistently. --- itest/tapd_harness.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/itest/tapd_harness.go b/itest/tapd_harness.go index bcdd0ad96..997e21357 100644 --- a/itest/tapd_harness.go +++ b/itest/tapd_harness.go @@ -257,7 +257,8 @@ func newTapdHarness(t *testing.T, ht *harnessTest, cfg tapdConfig, BackoffCfg: &hashmailBackoffCfg, } finalCfg.UniverseRpcCourier = &proof.UniverseRpcCourierCfg{ - BackoffCfg: &universeRpcBackoffCfg, + BackoffCfg: &universeRpcBackoffCfg, + ServiceRequestTimeout: 50 * time.Millisecond, } switch typedProofCourier := (opts.proofCourier).(type) {