diff --git a/aggregator/cmd/main.go b/aggregator/cmd/main.go index 37ab559e0..d82daa8f8 100644 --- a/aggregator/cmd/main.go +++ b/aggregator/cmd/main.go @@ -60,9 +60,9 @@ func aggregatorMain(ctx *cli.Context) error { // Listen for new task created in the ServiceManager contract in a separate goroutine, both V1 and V2 subscriptions: go func() { - listenErr := aggregator.SubscribeToNewTasks() - if listenErr != nil { - aggregatorConfig.BaseConfig.Logger.Fatal("Error subscribing for new tasks", "err", listenErr) + listenErrPair := aggregator.SubscribeToNewTasks() + if listenErrPair != nil { + aggregatorConfig.BaseConfig.Logger.Fatal("Error subscribing for new tasks", "err", listenErrPair) } }() diff --git a/aggregator/pkg/aggregator.go b/aggregator/pkg/aggregator.go index 00c7efd9c..a7ed7e8a2 100644 --- a/aggregator/pkg/aggregator.go +++ b/aggregator/pkg/aggregator.go @@ -46,7 +46,7 @@ type Aggregator struct { avsReader *chainio.AvsReader avsSubscriber *chainio.AvsSubscriber avsWriter *chainio.AvsWriter - taskSubscriber chan error + taskSubscriber chan chainio.ErrorPair blsAggregationService blsagg.BlsAggregationService // BLS Signature Service returns an Index diff --git a/aggregator/pkg/subscriber.go b/aggregator/pkg/subscriber.go index 4aef715da..7c7c092c2 100644 --- a/aggregator/pkg/subscriber.go +++ b/aggregator/pkg/subscriber.go @@ -1,18 +1,20 @@ package pkg -func (agg *Aggregator) SubscribeToNewTasks() error { - err := agg.subscribeToNewTasks() - if err != nil { - return err +import "github.com/yetanotherco/aligned_layer/core/chainio" + +func (agg *Aggregator) SubscribeToNewTasks() *chainio.ErrorPair { + errorPair := agg.subscribeToNewTasks() + if errorPair != nil { + return errorPair } for { select { case err := <-agg.taskSubscriber: agg.AggregatorConfig.BaseConfig.Logger.Info("Failed to subscribe to new tasks", "err", err) - err = agg.subscribeToNewTasks() - if err != nil { - return err + errorPair = agg.subscribeToNewTasks() + if errorPair != nil { + return 
errorPair } case newBatch := <-agg.NewBatchChan: agg.AggregatorConfig.BaseConfig.Logger.Info("Adding new task") @@ -21,14 +23,12 @@ func (agg *Aggregator) SubscribeToNewTasks() error { } } -func (agg *Aggregator) subscribeToNewTasks() error { - var err error - - agg.taskSubscriber, err = agg.avsSubscriber.SubscribeToNewTasksV3(agg.NewBatchChan) +func (agg *Aggregator) subscribeToNewTasks() *chainio.ErrorPair { + errorPair := agg.avsSubscriber.SubscribeToNewTasksV3(agg.NewBatchChan, agg.taskSubscriber) - if err != nil { - agg.AggregatorConfig.BaseConfig.Logger.Info("Failed to create task subscriber", "err", err) + if errorPair != nil { + agg.AggregatorConfig.BaseConfig.Logger.Info("Failed to create task subscriber", "err", errorPair) } - return err + return errorPair } diff --git a/core/chainio/avs_reader.go b/core/chainio/avs_reader.go index ae2ea0a9d..95dccd63e 100644 --- a/core/chainio/avs_reader.go +++ b/core/chainio/avs_reader.go @@ -72,7 +72,15 @@ func (r *AvsReader) IsOperatorRegistered(address ethcommon.Address) (bool, error } func (r *AvsReader) DisabledVerifiers() (*big.Int, error) { - return r.AvsContractBindings.ServiceManager.ContractAlignedLayerServiceManagerCaller.DisabledVerifiers(&bind.CallOpts{}) + num, err := r.AvsContractBindings.ServiceManager.ContractAlignedLayerServiceManagerCaller.DisabledVerifiers(&bind.CallOpts{}) + if err != nil { + // Retry with fallback client + num, err = r.AvsContractBindings.ServiceManagerFallback.ContractAlignedLayerServiceManagerCaller.DisabledVerifiers(&bind.CallOpts{}) + if err != nil { + r.logger.Error("Failed to fetch DisabledVerifiers", "err", err) + } + } + return num, err } // Returns all the "NewBatchV3" logs that have not been responded starting from the given block number diff --git a/core/chainio/avs_subscriber.go b/core/chainio/avs_subscriber.go index c40802058..b52efcff6 100644 --- a/core/chainio/avs_subscriber.go +++ b/core/chainio/avs_subscriber.go @@ -8,12 +8,14 @@ import ( 
"github.com/ethereum/go-ethereum/accounts/abi/bind" ethcommon "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/core/types" servicemanager "github.com/yetanotherco/aligned_layer/contracts/bindings/AlignedLayerServiceManager" retry "github.com/yetanotherco/aligned_layer/core" "github.com/yetanotherco/aligned_layer/core/config" + "fmt" sdklogging "github.com/Layr-Labs/eigensdk-go/logging" "github.com/ethereum/go-ethereum/crypto" ) @@ -43,6 +45,11 @@ type AvsSubscriber struct { logger sdklogging.Logger } +type ErrorPair struct { + ErrorMainRPC error + ErrorFallbackRPC error +} + func NewAvsSubscriberFromConfig(baseConfig *config.BaseConfig) (*AvsSubscriber, error) { avsContractBindings, err := NewAvsServiceBindings( baseConfig.AlignedLayerDeploymentConfig.AlignedLayerServiceManagerAddr, @@ -61,26 +68,27 @@ func NewAvsSubscriberFromConfig(baseConfig *config.BaseConfig) (*AvsSubscriber, }, nil } -func (s *AvsSubscriber) SubscribeToNewTasksV2(newTaskCreatedChan chan *servicemanager.ContractAlignedLayerServiceManagerNewBatchV2) (chan error, error) { +func (s *AvsSubscriber) SubscribeToNewTasksV2(newTaskCreatedChan chan *servicemanager.ContractAlignedLayerServiceManagerNewBatchV2, errorPairChannel chan ErrorPair) *ErrorPair { // Create a new channel to receive new tasks internalChannel := make(chan *servicemanager.ContractAlignedLayerServiceManagerNewBatchV2) // Subscribe to new tasks - sub, err := SubscribeToNewTasksV2Retryable(&bind.WatchOpts{}, s.AvsContractBindings.ServiceManager, internalChannel, nil, retry.NetworkRetryParams()) - if err != nil { - s.logger.Error("Primary failed to subscribe to new AlignedLayer V2 tasks after %d retries", retry.NetworkNumRetries, "err", err) - return nil, err + sub, errMain := SubscribeToNewTasksV2Retryable(&bind.WatchOpts{}, s.AvsContractBindings.ServiceManager, internalChannel, nil, retry.NetworkRetryParams()) + if errMain != nil { + 
s.logger.Error(fmt.Sprintf("Fallback failed to subscribe to new AlignedLayer V3 tasks after %d retries", MaxRetries), "errMain", fmt.Sprintf("%v", errMain)) } - subFallback, err := SubscribeToNewTasksV2Retryable(&bind.WatchOpts{}, s.AvsContractBindings.ServiceManagerFallback, internalChannel, nil, retry.NetworkRetryParams()) - if err != nil { - s.logger.Error("Fallback failed to subscribe to new AlignedLayer V2 tasks after %d retries", retry.NetworkNumRetries, "err", err) - return nil, err + subFallback, errFallback := SubscribeToNewTasksV2Retryable(&bind.WatchOpts{}, s.AvsContractBindings.ServiceManagerFallback, internalChannel, nil, retry.NetworkRetryParams()) + if errFallback != nil { + s.logger.Error(fmt.Sprintf("Fallback failed to subscribe to new AlignedLayer V3 tasks after %d retries", MaxRetries), "errFallback", fmt.Sprintf("%v", errFallback)) } - s.logger.Info("Subscribed to new AlignedLayer V2 tasks") - // create a new channel to foward errors - errorChannel := make(chan error) + if errMain != nil && errFallback != nil { + s.logger.Error("Failed to subscribe to new AlignedLayer V2 tasks with both RPCs", "errMain", errMain, "errFallback", errFallback) + return &ErrorPair{ErrorMainRPC: errMain, ErrorFallbackRPC: errFallback} + } + + s.logger.Info("Subscribed to new AlignedLayer V2 tasks") pollLatestBatchTicker := time.NewTicker(PollLatestBatchInterval) @@ -109,49 +117,57 @@ func (s *AvsSubscriber) SubscribeToNewTasksV2(newTaskCreatedChan chan *servicema // Handle errors and resubscribe go func() { - for { + var errMain, errFallback error + var auxSub, auxSubFallback event.Subscription + for errMain == nil || errFallback == nil { //while one is active select { case err := <-sub.Err(): - s.logger.Warn("Error in new task subscription", "err", err) - sub.Unsubscribe() - sub, err = SubscribeToNewTasksV2Retryable(&bind.WatchOpts{}, s.AvsContractBindings.ServiceManager, internalChannel, nil, retry.NetworkRetryParams()) - if err != nil { - errorChannel <- err + 
s.logger.Warn("Error in new task subscription of main connection", "err", err) + + auxSub, errMain = SubscribeToNewTasksV2Retryable(&bind.WatchOpts{}, s.AvsContractBindings.ServiceManager, internalChannel, nil, retry.NetworkRetryParams()) + if errMain == nil { + sub = auxSub // update the subscription only if it was successful + s.logger.Info("Resubscribed to fallback new task subscription") } case err := <-subFallback.Err(): - s.logger.Warn("Error in fallback new task subscription", "err", err) - subFallback.Unsubscribe() - subFallback, err = SubscribeToNewTasksV2Retryable(&bind.WatchOpts{}, s.AvsContractBindings.ServiceManagerFallback, internalChannel, nil, retry.NetworkRetryParams()) - if err != nil { - errorChannel <- err + s.logger.Warn("Error in new task subscription of fallback connection", "err", err) + + auxSubFallback, errFallback = SubscribeToNewTasksV2Retryable(&bind.WatchOpts{}, s.AvsContractBindings.ServiceManagerFallback, internalChannel, nil, retry.NetworkRetryParams()) + if errFallback == nil { + subFallback = auxSubFallback // update the subscription only if it was successful + s.logger.Info("Resubscribed to fallback new task subscription") } } } + errorPairChannel <- ErrorPair{ErrorMainRPC: errMain, ErrorFallbackRPC: errFallback} }() - return errorChannel, nil + return nil } -func (s *AvsSubscriber) SubscribeToNewTasksV3(newTaskCreatedChan chan *servicemanager.ContractAlignedLayerServiceManagerNewBatchV3) (chan error, error) { +func (s *AvsSubscriber) SubscribeToNewTasksV3(newTaskCreatedChan chan *servicemanager.ContractAlignedLayerServiceManagerNewBatchV3, errorPairChannel chan ErrorPair) *ErrorPair { // Create a new channel to receive new tasks internalChannel := make(chan *servicemanager.ContractAlignedLayerServiceManagerNewBatchV3) + s.logger.Info("Starting subscription to new AlignedLayer V3 tasks") // Subscribe to new tasks - sub, err := SubscribeToNewTasksV3Retryable(&bind.WatchOpts{}, s.AvsContractBindings.ServiceManager, internalChannel, 
nil, retry.NetworkRetryParams()) - if err != nil { - s.logger.Error("Primary failed to subscribe to new AlignedLayer V3 tasks after %d retries", MaxRetries, "err", err) - return nil, err + sub, errMain := SubscribeToNewTasksV3Retryable(&bind.WatchOpts{}, s.AvsContractBindings.ServiceManager, internalChannel, nil, retry.NetworkRetryParams()) + if errMain != nil { + s.logger.Error(fmt.Sprintf("Fallback failed to subscribe to new AlignedLayer V3 tasks after %d retries", MaxRetries), "err", fmt.Sprintf("%v", errMain)) + //return err } - subFallback, err := SubscribeToNewTasksV3Retryable(&bind.WatchOpts{}, s.AvsContractBindings.ServiceManagerFallback, internalChannel, nil, retry.NetworkRetryParams()) - if err != nil { - s.logger.Error("Fallback failed to subscribe to new AlignedLayer V3 tasks after %d retries", MaxRetries, "err", err) - return nil, err + subFallback, errFallback := SubscribeToNewTasksV3Retryable(&bind.WatchOpts{}, s.AvsContractBindings.ServiceManagerFallback, internalChannel, nil, retry.NetworkRetryParams()) + if errFallback != nil { + s.logger.Error(fmt.Sprintf("Fallback failed to subscribe to new AlignedLayer V3 tasks after %d retries", MaxRetries), "err", fmt.Sprintf("%v", errFallback)) } - s.logger.Info("Subscribed to new AlignedLayer V3 tasks") - // create a new channel to foward errors - errorChannel := make(chan error) + if errMain != nil && errFallback != nil { + s.logger.Error("Failed to subscribe to new AlignedLayer V3 tasks with both RPCs", "errMain", errMain, "errFallback", errFallback) + return &ErrorPair{ErrorMainRPC: errMain, ErrorFallbackRPC: errFallback} + } + + s.logger.Info("Subscribed to new AlignedLayer V3 tasks") pollLatestBatchTicker := time.NewTicker(PollLatestBatchInterval) @@ -180,27 +196,33 @@ func (s *AvsSubscriber) SubscribeToNewTasksV3(newTaskCreatedChan chan *servicema // Handle errors and resubscribe go func() { - for { + s.logger.Info("Starting error handling goroutine") + var errMain, errFallback error + var auxSub, 
auxSubFallback event.Subscription + for errMain == nil || errFallback == nil { //while one is active select { case err := <-sub.Err(): - s.logger.Warn("Error in new task subscription", "err", err) - sub.Unsubscribe() - sub, err = SubscribeToNewTasksV3Retryable(&bind.WatchOpts{}, s.AvsContractBindings.ServiceManager, internalChannel, nil, retry.NetworkRetryParams()) - if err != nil { - errorChannel <- err + s.logger.Warn("Error in new task subscription of main connection", "err", err) + + auxSub, errMain = SubscribeToNewTasksV3Retryable(&bind.WatchOpts{}, s.AvsContractBindings.ServiceManager, internalChannel, nil, retry.NetworkRetryParams()) + if errMain == nil { + sub = auxSub // update the subscription only if it was successful + s.logger.Info("Resubscribed to fallback new task subscription") } case err := <-subFallback.Err(): - s.logger.Warn("Error in fallback new task subscription", "err", err) - subFallback.Unsubscribe() - subFallback, err = SubscribeToNewTasksV3Retryable(&bind.WatchOpts{}, s.AvsContractBindings.ServiceManagerFallback, internalChannel, nil, retry.NetworkRetryParams()) - if err != nil { - errorChannel <- err + s.logger.Warn("Error in new task subscription of fallback connection", "err", err) + + auxSubFallback, errFallback = SubscribeToNewTasksV3Retryable(&bind.WatchOpts{}, s.AvsContractBindings.ServiceManagerFallback, internalChannel, nil, retry.NetworkRetryParams()) + if errFallback == nil { + subFallback = auxSubFallback // update the subscription only if it was successful + s.logger.Info("Resubscribed to fallback new task subscription") } } } + errorPairChannel <- ErrorPair{ErrorMainRPC: errMain, ErrorFallbackRPC: errFallback} }() - return errorChannel, nil + return nil } func (s *AvsSubscriber) processNewBatchV2(batch *servicemanager.ContractAlignedLayerServiceManagerNewBatchV2, batchesSet map[[32]byte]struct{}, newBatchMutex *sync.Mutex, newTaskCreatedChan chan<- *servicemanager.ContractAlignedLayerServiceManagerNewBatchV2) { diff --git 
a/core/chainio/retryable.go b/core/chainio/retryable.go index bf4724e2f..4c4d8e9f6 100644 --- a/core/chainio/retryable.go +++ b/core/chainio/retryable.go @@ -2,6 +2,7 @@ package chainio import ( "context" + "github.com/rs/zerolog/log" "math/big" "github.com/ethereum/go-ethereum" @@ -206,6 +207,7 @@ func SubscribeToNewTasksV2Retryable( config *retry.RetryParams, ) (event.Subscription, error) { subscribe_func := func() (event.Subscription, error) { + log.Info().Msg("Subscribing to NewBatchV2") return serviceManager.WatchNewBatchV2(opts, newTaskCreatedChan, batchMerkleRoot) } return retry.RetryWithData(subscribe_func, config) @@ -225,6 +227,7 @@ func SubscribeToNewTasksV3Retryable( config *retry.RetryParams, ) (event.Subscription, error) { subscribe_func := func() (event.Subscription, error) { + log.Info().Msg("Subscribing to NewBatchV3") return serviceManager.WatchNewBatchV3(opts, newTaskCreatedChan, batchMerkleRoot) } return retry.RetryWithData(subscribe_func, config) diff --git a/operator/pkg/operator.go b/operator/pkg/operator.go index af4c5d42a..607249d8f 100644 --- a/operator/pkg/operator.go +++ b/operator/pkg/operator.go @@ -140,12 +140,12 @@ func NewOperatorFromConfig(configuration config.OperatorConfig) (*Operator, erro return operator, nil } -func (o *Operator) SubscribeToNewTasksV2() (chan error, error) { - return o.avsSubscriber.SubscribeToNewTasksV2(o.NewTaskCreatedChanV2) +func (o *Operator) SubscribeToNewTasksV2(errorPairChan chan chainio.ErrorPair) *chainio.ErrorPair { + return o.avsSubscriber.SubscribeToNewTasksV2(o.NewTaskCreatedChanV2, errorPairChan) } -func (o *Operator) SubscribeToNewTasksV3() (chan error, error) { - return o.avsSubscriber.SubscribeToNewTasksV3(o.NewTaskCreatedChanV3) +func (o *Operator) SubscribeToNewTasksV3(errorPairChan chan chainio.ErrorPair) *chainio.ErrorPair { + return o.avsSubscriber.SubscribeToNewTasksV3(o.NewTaskCreatedChanV3, errorPairChan) } type OperatorLastProcessedBatch struct { @@ -205,13 +205,16 @@ func (o 
*Operator) UpdateLastProcessBatch(blockNumber uint32) error { } func (o *Operator) Start(ctx context.Context) error { - subV2, err := o.SubscribeToNewTasksV2() - if err != nil { + // create a new channel to forward errors + subV2ErrorChannel := make(chan chainio.ErrorPair) + errorPair := o.SubscribeToNewTasksV2(subV2ErrorChannel) + if errorPair != nil { log.Fatal("Could not subscribe to new tasks") } - subV3, err := o.SubscribeToNewTasksV3() - if err != nil { + subV3ErrorChannel := make(chan chainio.ErrorPair) + errorPair = o.SubscribeToNewTasksV3(subV3ErrorChannel) + if errorPair != nil { log.Fatal("Could not subscribe to new tasks") } @@ -231,16 +234,16 @@ func (o *Operator) Start(ctx context.Context) error { return nil case err := <-metricsErrChan: o.Logger.Errorf("Metrics server failed", "err", err) - case err := <-subV2: - o.Logger.Infof("Error in websocket subscription", "err", err) - subV2, err = o.SubscribeToNewTasksV2() - if err != nil { + case errorPair := <-subV2ErrorChannel: + o.Logger.Infof("Error in websocket subscription", "err", errorPair) + errorPairPtr := o.SubscribeToNewTasksV2(subV2ErrorChannel) + if errorPairPtr != nil { o.Logger.Fatal("Could not subscribe to new tasks V2") } - case err := <-subV3: - o.Logger.Infof("Error in websocket subscription", "err", err) - subV3, err = o.SubscribeToNewTasksV3() - if err != nil { + case errorPair := <-subV3ErrorChannel: + o.Logger.Infof("Error in websocket subscription", "err", errorPair) + errorPairPtr := o.SubscribeToNewTasksV3(subV3ErrorChannel) + if errorPairPtr != nil { o.Logger.Fatal("Could not subscribe to new tasks V3") } case newBatchLogV2 := <-o.NewTaskCreatedChanV2: @@ -248,7 +251,7 @@ func (o *Operator) Start(ctx context.Context) error { case newBatchLogV3 := <-o.NewTaskCreatedChanV3: go o.handleNewBatchLogV3(newBatchLogV3) case blockNumber := <-o.lastProcessedBatch.batchProcessedChan: - err = o.UpdateLastProcessBatch(blockNumber) + err := o.UpdateLastProcessBatch(blockNumber) if err != nil { 
o.Logger.Errorf("Error while updating last process batch", "err", err) }