diff --git a/core/chainio/avs_subscriber.go b/core/chainio/avs_subscriber.go index c40802058..47868d951 100644 --- a/core/chainio/avs_subscriber.go +++ b/core/chainio/avs_subscriber.go @@ -109,19 +109,26 @@ func (s *AvsSubscriber) SubscribeToNewTasksV2(newTaskCreatedChan chan *servicema // Handle errors and resubscribe go func() { + defer func() { + err := recover() //stops panics + if err != nil { + s.logger.Error("SubscribeToNewTasksV2 error channel recovered from panic", "err", err) + } + }() + for { select { case err := <-sub.Err(): s.logger.Warn("Error in new task subscription", "err", err) sub.Unsubscribe() - sub, err = SubscribeToNewTasksV2Retryable(&bind.WatchOpts{}, s.AvsContractBindings.ServiceManager, internalChannel, nil, retry.NetworkRetryParams()) + sub, err = SubscribeToNewTasksV2Retryable(&bind.WatchOpts{}, s.AvsContractBindings.ServiceManager, internalChannel, nil, retry.SubscribeToNewTasksParams()) if err != nil { errorChannel <- err } case err := <-subFallback.Err(): s.logger.Warn("Error in fallback new task subscription", "err", err) subFallback.Unsubscribe() - subFallback, err = SubscribeToNewTasksV2Retryable(&bind.WatchOpts{}, s.AvsContractBindings.ServiceManagerFallback, internalChannel, nil, retry.NetworkRetryParams()) + subFallback, err = SubscribeToNewTasksV2Retryable(&bind.WatchOpts{}, s.AvsContractBindings.ServiceManagerFallback, internalChannel, nil, retry.SubscribeToNewTasksParams()) if err != nil { errorChannel <- err } @@ -180,19 +187,26 @@ func (s *AvsSubscriber) SubscribeToNewTasksV3(newTaskCreatedChan chan *servicema // Handle errors and resubscribe go func() { + defer func() { + err := recover() //stops panics + if err != nil { + s.logger.Error("SubscribeToNewTasksV3 error channel recovered from panic", "err", err) + } + }() + for { select { case err := <-sub.Err(): s.logger.Warn("Error in new task subscription", "err", err) sub.Unsubscribe() - sub, err = SubscribeToNewTasksV3Retryable(&bind.WatchOpts{}, s.AvsContractBindings.ServiceManager, internalChannel, nil, retry.NetworkRetryParams()) + sub, err = SubscribeToNewTasksV3Retryable(&bind.WatchOpts{}, s.AvsContractBindings.ServiceManager, internalChannel, nil, retry.SubscribeToNewTasksParams()) if err != nil { errorChannel <- err } case err := <-subFallback.Err(): s.logger.Warn("Error in fallback new task subscription", "err", err) subFallback.Unsubscribe() - subFallback, err = SubscribeToNewTasksV3Retryable(&bind.WatchOpts{}, s.AvsContractBindings.ServiceManagerFallback, internalChannel, nil, retry.NetworkRetryParams()) + subFallback, err = SubscribeToNewTasksV3Retryable(&bind.WatchOpts{}, s.AvsContractBindings.ServiceManagerFallback, internalChannel, nil, retry.SubscribeToNewTasksParams()) if err != nil { errorChannel <- err } diff --git a/core/chainio/retryable.go b/core/chainio/retryable.go index bf4724e2f..8e3310f99 100644 --- a/core/chainio/retryable.go +++ b/core/chainio/retryable.go @@ -196,7 +196,7 @@ func (s *AvsSubscriber) SubscribeNewHeadRetryable(ctx context.Context, c chan<- SubscribeToNewTasksV2Retryable Subscribe to NewBatchV2 logs from the AVS contract. - All errors are considered Transient Errors -- Retry times (3 retries): 1 sec, 2 sec, 4 sec. +- Retry times (Infinite Retries): 1 sec, 2 sec, 4 sec, 8 sec, ..., 60 sec, ..., */ func SubscribeToNewTasksV2Retryable( opts *bind.WatchOpts, @@ -215,7 +215,7 @@ func SubscribeToNewTasksV2Retryable( SubscribeToNewTasksV3Retryable Subscribe to NewBatchV3 logs from the AVS contract. - All errors are considered Transient Errors -- Retry times (3 retries): 1 sec, 2 sec, 4 sec. +- Retry times (Infinite Retries): 1 sec, 2 sec, 4 sec, 8 sec, ..., 60 sec, ..., */ func SubscribeToNewTasksV3Retryable( opts *bind.WatchOpts, diff --git a/core/retry.go b/core/retry.go index fb39e8d8d..3d3892f17 100644 --- a/core/retry.go +++ b/core/retry.go @@ -32,18 +32,21 @@ const ( NetworkMultiplier float64 = 2 // Multiplier factor computed exponential retry interval is scaled by. NetworkNumRetries uint64 = 3 // Total number of retries attempted. - // Retry Params for Sending Tx to Chain + // Retry Parameters for Sending a Tx to Chain ChainInitialInterval = 12 * time.Second // Initial delay for retry interval for contract calls. Corresponds to 1 ethereum block. ChainMaxInterval = 2 * time.Minute // Maximum interval for an individual retry. - // Retry Params for WaitForTransactionReceipt in the Fee Bump + // Retry Parameters for WaitForTransactionReceipt in the Fee Bump WaitForTxMaxInterval = 2 * time.Second // Maximum interval for an individual retry. WaitForTxNumRetries = 0 // Total number of retries attempted. If 0, retries indefinitely until maxElapsedTime is reached. // Retry Parameters for RespondToTaskV2 in the Fee Bump - RespondToTaskV2MaxInterval = time.Millisecond * 500 // Maximum interval for an individual retry. - RespondToTaskV2MaxElapsedTime = 0 // Maximum time all retries may take. `0` corresponds to no limit on the time of the retries. - RespondToTaskV2NumRetries uint64 = 0 // Total number of retries attempted. If 0, retries indefinitely until maxElapsedTime is reached. + RespondToTaskV2MaxInterval = time.Millisecond * 500 // Maximum interval for an individual retry. + RespondToTaskV2MaxElapsedTime = 0 // Maximum time all retries may take. `0` corresponds to no limit on the time of the retries. + RespondToTaskV2NumRetries = 0 // Total number of retries attempted. If 0, retries indefinitely until maxElapsedTime is reached. + + // Retry Parameters for SubscribeToNewTasks + SubscribeToNewTasksNumRetries = 0 // Total number of retries attempted. If 0, retries indefinitely until maxElapsedTime (60 sec) is reached. ) type RetryParams struct { @@ -102,6 +105,17 @@ func WaitForTxRetryParams(maxElapsedTime time.Duration) *RetryParams { } } +func SubscribeToNewTasksParams() *RetryParams { + return &RetryParams{ + InitialInterval: NetworkInitialInterval, + MaxInterval: NetworkMaxInterval, + MaxElapsedTime: NetworkMaxElapsedTime, + RandomizationFactor: NetworkRandomizationFactor, + Multiplier: NetworkMultiplier, + NumRetries: SubscribeToNewTasksNumRetries, + } +} + /* Retry and RetryWithData are custom retry functions used in Aligned's aggregator and operator to facilitate consistent retry logic across the system. They are interfaces for around Cenk Alti (https://github.com/cenkalti) backoff library (https://github.com/cenkalti/backoff). We would like to thank him for his great work.