Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(operator): Operator panics when reconnect fails after max retries #1692

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 18 additions & 4 deletions core/chainio/avs_subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,19 +109,26 @@ func (s *AvsSubscriber) SubscribeToNewTasksV2(newTaskCreatedChan chan *servicema

// Handle errors and resubscribe
go func() {
defer func() {
err := recover() //stops panics
if err != nil {
s.logger.Error("SubscribeToNewTasksV2 error channel recovered from panic", "err", err)
}
}()

for {
select {
case err := <-sub.Err():
s.logger.Warn("Error in new task subscription", "err", err)
sub.Unsubscribe()
sub, err = SubscribeToNewTasksV2Retryable(&bind.WatchOpts{}, s.AvsContractBindings.ServiceManager, internalChannel, nil, retry.NetworkRetryParams())
sub, err = SubscribeToNewTasksV2Retryable(&bind.WatchOpts{}, s.AvsContractBindings.ServiceManager, internalChannel, nil, retry.SubscribeToNewTasksParams())
avilagaston9 marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
errorChannel <- err
}
case err := <-subFallback.Err():
s.logger.Warn("Error in fallback new task subscription", "err", err)
subFallback.Unsubscribe()
subFallback, err = SubscribeToNewTasksV2Retryable(&bind.WatchOpts{}, s.AvsContractBindings.ServiceManagerFallback, internalChannel, nil, retry.NetworkRetryParams())
subFallback, err = SubscribeToNewTasksV2Retryable(&bind.WatchOpts{}, s.AvsContractBindings.ServiceManagerFallback, internalChannel, nil, retry.SubscribeToNewTasksParams())
if err != nil {
errorChannel <- err
}
Expand Down Expand Up @@ -180,19 +187,26 @@ func (s *AvsSubscriber) SubscribeToNewTasksV3(newTaskCreatedChan chan *servicema

// Handle errors and resubscribe
go func() {
defer func() {
err := recover() //stops panics
if err != nil {
s.logger.Error("SubscribeToNewTasksV3 error channel recovered from panic", "err", err)
}
}()

for {
select {
case err := <-sub.Err():
s.logger.Warn("Error in new task subscription", "err", err)
sub.Unsubscribe()
sub, err = SubscribeToNewTasksV3Retryable(&bind.WatchOpts{}, s.AvsContractBindings.ServiceManager, internalChannel, nil, retry.NetworkRetryParams())
sub, err = SubscribeToNewTasksV3Retryable(&bind.WatchOpts{}, s.AvsContractBindings.ServiceManager, internalChannel, nil, retry.SubscribeToNewTasksParams())
if err != nil {
errorChannel <- err
}
case err := <-subFallback.Err():
s.logger.Warn("Error in fallback new task subscription", "err", err)
subFallback.Unsubscribe()
subFallback, err = SubscribeToNewTasksV3Retryable(&bind.WatchOpts{}, s.AvsContractBindings.ServiceManagerFallback, internalChannel, nil, retry.NetworkRetryParams())
subFallback, err = SubscribeToNewTasksV3Retryable(&bind.WatchOpts{}, s.AvsContractBindings.ServiceManagerFallback, internalChannel, nil, retry.SubscribeToNewTasksParams())
if err != nil {
errorChannel <- err
}
Expand Down
4 changes: 2 additions & 2 deletions core/chainio/retryable.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ func (s *AvsSubscriber) SubscribeNewHeadRetryable(ctx context.Context, c chan<-
SubscribeToNewTasksV2Retryable
Subscribe to NewBatchV2 logs from the AVS contract.
- All errors are considered Transient Errors
- Retry times (3 retries): 1 sec, 2 sec, 4 sec.
- Retry times (Infinite Retries): 1 sec, 2 sec, 4 sec, 8 sec, ..., 60 sec, ...
*/
func SubscribeToNewTasksV2Retryable(
opts *bind.WatchOpts,
Expand All @@ -215,7 +215,7 @@ func SubscribeToNewTasksV2Retryable(
SubscribeToNewTasksV3Retryable
Subscribe to NewBatchV3 logs from the AVS contract.
- All errors are considered Transient Errors
- Retry times (3 retries): 1 sec, 2 sec, 4 sec.
- Retry times (Infinite Retries): 1 sec, 2 sec, 4 sec, 8 sec, ..., 60 sec, ...
avilagaston9 marked this conversation as resolved.
Show resolved Hide resolved
*/
func SubscribeToNewTasksV3Retryable(
opts *bind.WatchOpts,
Expand Down
24 changes: 19 additions & 5 deletions core/retry.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,18 +32,21 @@ const (
NetworkMultiplier float64 = 2 // Multiplier factor computed exponential retry interval is scaled by.
NetworkNumRetries uint64 = 3 // Total number of retries attempted.

// Retry Params for Sending Tx to Chain
// Retry Parameters for Sending a Tx to Chain
ChainInitialInterval = 12 * time.Second // Initial delay for retry interval for contract calls. Corresponds to 1 ethereum block.
ChainMaxInterval = 2 * time.Minute // Maximum interval for an individual retry.

// Retry Params for WaitForTransactionReceipt in the Fee Bump
// Retry Parameters for WaitForTransactionReceipt in the Fee Bump
WaitForTxMaxInterval = 2 * time.Second // Maximum interval for an individual retry.
WaitForTxNumRetries = 0 // Total number of retries attempted. If 0, retries indefinitely until maxElapsedTime is reached.

// Retry Parameters for RespondToTaskV2 in the Fee Bump
RespondToTaskV2MaxInterval = time.Millisecond * 500 // Maximum interval for an individual retry.
RespondToTaskV2MaxElapsedTime = 0 // Maximum time all retries may take. `0` corresponds to no limit on the time of the retries.
RespondToTaskV2NumRetries uint64 = 0 // Total number of retries attempted. If 0, retries indefinitely until maxElapsedTime is reached.
RespondToTaskV2MaxInterval = time.Millisecond * 500 // Maximum interval for an individual retry.
RespondToTaskV2MaxElapsedTime = 0 // Maximum time all retries may take. `0` corresponds to no limit on the time of the retries.
RespondToTaskV2NumRetries = 0 // Total number of retries attempted. If 0, retries indefinitely until maxElapsedTime is reached.

// Retry Parameters for SubscribeToNewTasks
SubscribeToNewTasksNumRetries = 0 // Total number of retries attempted. If 0, retries indefinitely until maxElapsedTime (60 sec) is reached.
)

type RetryParams struct {
Expand Down Expand Up @@ -102,6 +105,17 @@ func WaitForTxRetryParams(maxElapsedTime time.Duration) *RetryParams {
}
}

// SubscribeToNewTasksParams returns the retry configuration passed to the
// new-task subscription helpers. The backoff settings (initial interval,
// maximum interval, maximum elapsed time, randomization factor and multiplier)
// are the shared Network* values; only the retry count differs, taken from
// SubscribeToNewTasksNumRetries.
func SubscribeToNewTasksParams() *RetryParams {
	params := new(RetryParams)

	// Backoff shape: identical to the generic network retry settings.
	params.InitialInterval = NetworkInitialInterval
	params.MaxInterval = NetworkMaxInterval
	params.MaxElapsedTime = NetworkMaxElapsedTime
	params.RandomizationFactor = NetworkRandomizationFactor
	params.Multiplier = NetworkMultiplier

	// Retry budget specific to task subscriptions.
	params.NumRetries = SubscribeToNewTasksNumRetries

	return params
}

/*
Retry and RetryWithData are custom retry functions used in Aligned's aggregator and operator to facilitate consistent retry logic across the system.
They are interfaces around Cenk Alti's (https://github.com/cenkalti) backoff library (https://github.com/cenkalti/backoff). We would like to thank him for his great work.
Expand Down
Loading