Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

hotfix(operator): manage subscription to events without panic #1729

Open
wants to merge 12 commits into
base: testnet
Choose a base branch
from
6 changes: 3 additions & 3 deletions aggregator/cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,9 @@ func aggregatorMain(ctx *cli.Context) error {

// Listen for new task created in the ServiceManager contract in a separate goroutine, both V1 and V2 subscriptions:
go func() {
listenErr := aggregator.SubscribeToNewTasks()
if listenErr != nil {
aggregatorConfig.BaseConfig.Logger.Fatal("Error subscribing for new tasks", "err", listenErr)
listenErrPair := aggregator.SubscribeToNewTasks()
if listenErrPair != nil {
aggregatorConfig.BaseConfig.Logger.Fatal("Error subscribing for new tasks", "err", listenErrPair)
}
}()

Expand Down
2 changes: 1 addition & 1 deletion aggregator/pkg/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ type Aggregator struct {
avsReader *chainio.AvsReader
avsSubscriber *chainio.AvsSubscriber
avsWriter *chainio.AvsWriter
taskSubscriber chan error
taskSubscriber chan chainio.ErrorPair
blsAggregationService blsagg.BlsAggregationService

// BLS Signature Service returns an Index
Expand Down
28 changes: 14 additions & 14 deletions aggregator/pkg/subscriber.go
Original file line number Diff line number Diff line change
@@ -1,18 +1,20 @@
package pkg

func (agg *Aggregator) SubscribeToNewTasks() error {
err := agg.subscribeToNewTasks()
if err != nil {
return err
import "github.com/yetanotherco/aligned_layer/core/chainio"

func (agg *Aggregator) SubscribeToNewTasks() *chainio.ErrorPair {
errorPairPtr := agg.subscribeToNewTasks()
uri-99 marked this conversation as resolved.
Show resolved Hide resolved
if errorPairPtr != nil {
return errorPairPtr
}

for {
select {
case err := <-agg.taskSubscriber:
agg.AggregatorConfig.BaseConfig.Logger.Info("Failed to subscribe to new tasks", "err", err)
err = agg.subscribeToNewTasks()
if err != nil {
return err
errorPairPtr = agg.subscribeToNewTasks()
if errorPairPtr != nil {
return errorPairPtr
}
case newBatch := <-agg.NewBatchChan:
agg.AggregatorConfig.BaseConfig.Logger.Info("Adding new task")
Expand All @@ -21,14 +23,12 @@ func (agg *Aggregator) SubscribeToNewTasks() error {
}
}

func (agg *Aggregator) subscribeToNewTasks() error {
var err error

agg.taskSubscriber, err = agg.avsSubscriber.SubscribeToNewTasksV3(agg.NewBatchChan)
func (agg *Aggregator) subscribeToNewTasks() *chainio.ErrorPair {
errorPairPtr := agg.avsSubscriber.SubscribeToNewTasksV3(agg.NewBatchChan, agg.taskSubscriber)

if err != nil {
agg.AggregatorConfig.BaseConfig.Logger.Info("Failed to create task subscriber", "err", err)
if errorPairPtr != nil {
agg.AggregatorConfig.BaseConfig.Logger.Info("Failed to create task subscriber", "err", errorPairPtr)
}

return err
return errorPairPtr
}
10 changes: 9 additions & 1 deletion core/chainio/avs_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,15 @@ func (r *AvsReader) IsOperatorRegistered(address ethcommon.Address) (bool, error
}

func (r *AvsReader) DisabledVerifiers() (*big.Int, error) {
return r.AvsContractBindings.ServiceManager.ContractAlignedLayerServiceManagerCaller.DisabledVerifiers(&bind.CallOpts{})
num, err := r.AvsContractBindings.ServiceManager.ContractAlignedLayerServiceManagerCaller.DisabledVerifiers(&bind.CallOpts{})
if err != nil {
// Retry with fallback client
num, err = r.AvsContractBindings.ServiceManagerFallback.ContractAlignedLayerServiceManagerCaller.DisabledVerifiers(&bind.CallOpts{})
if err != nil {
r.logger.Error("Failed to fetch DisabledVerifiers", "err", err)
}
}
return num, err
}

// Returns all the "NewBatchV3" logs that have not been responded starting from the given block number
Expand Down
118 changes: 74 additions & 44 deletions core/chainio/avs_subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,14 @@ import (

"github.com/ethereum/go-ethereum/accounts/abi/bind"
ethcommon "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/event"

"github.com/ethereum/go-ethereum/core/types"
servicemanager "github.com/yetanotherco/aligned_layer/contracts/bindings/AlignedLayerServiceManager"
retry "github.com/yetanotherco/aligned_layer/core"
"github.com/yetanotherco/aligned_layer/core/config"

"fmt"
sdklogging "github.com/Layr-Labs/eigensdk-go/logging"
"github.com/ethereum/go-ethereum/crypto"
)
Expand Down Expand Up @@ -43,6 +45,11 @@ type AvsSubscriber struct {
logger sdklogging.Logger
}

type ErrorPair struct {
ErrorMainRPC error
ErrorFallbackRPC error
}

func NewAvsSubscriberFromConfig(baseConfig *config.BaseConfig) (*AvsSubscriber, error) {
avsContractBindings, err := NewAvsServiceBindings(
baseConfig.AlignedLayerDeploymentConfig.AlignedLayerServiceManagerAddr,
Expand All @@ -61,26 +68,27 @@ func NewAvsSubscriberFromConfig(baseConfig *config.BaseConfig) (*AvsSubscriber,
}, nil
}

func (s *AvsSubscriber) SubscribeToNewTasksV2(newTaskCreatedChan chan *servicemanager.ContractAlignedLayerServiceManagerNewBatchV2) (chan error, error) {
func (s *AvsSubscriber) SubscribeToNewTasksV2(newTaskCreatedChan chan *servicemanager.ContractAlignedLayerServiceManagerNewBatchV2, errorPairChannel chan ErrorPair) *ErrorPair {
// Create a new channel to receive new tasks
internalChannel := make(chan *servicemanager.ContractAlignedLayerServiceManagerNewBatchV2)

// Subscribe to new tasks
sub, err := SubscribeToNewTasksV2Retryable(&bind.WatchOpts{}, s.AvsContractBindings.ServiceManager, internalChannel, nil, retry.NetworkRetryParams())
if err != nil {
s.logger.Error("Primary failed to subscribe to new AlignedLayer V2 tasks after %d retries", retry.NetworkNumRetries, "err", err)
return nil, err
sub, errMain := SubscribeToNewTasksV2Retryable(&bind.WatchOpts{}, s.AvsContractBindings.ServiceManager, internalChannel, nil, retry.NetworkRetryParams())
if errMain != nil {
s.logger.Error(fmt.Sprintf("Fallback failed to subscribe to new AlignedLayer V3 tasks after %d retries", MaxRetries), "errMain", fmt.Sprintf("%v", errMain))
}

subFallback, err := SubscribeToNewTasksV2Retryable(&bind.WatchOpts{}, s.AvsContractBindings.ServiceManagerFallback, internalChannel, nil, retry.NetworkRetryParams())
if err != nil {
s.logger.Error("Fallback failed to subscribe to new AlignedLayer V2 tasks after %d retries", retry.NetworkNumRetries, "err", err)
return nil, err
subFallback, errFallback := SubscribeToNewTasksV2Retryable(&bind.WatchOpts{}, s.AvsContractBindings.ServiceManagerFallback, internalChannel, nil, retry.NetworkRetryParams())
if errFallback != nil {
s.logger.Error(fmt.Sprintf("Fallback failed to subscribe to new AlignedLayer V3 tasks after %d retries", MaxRetries), "errFallback", fmt.Sprintf("%v", errFallback))
}
s.logger.Info("Subscribed to new AlignedLayer V2 tasks")

// create a new channel to foward errors
errorChannel := make(chan error)
if errMain != nil && errFallback != nil {
s.logger.Error("Failed to subscribe to new AlignedLayer V2 tasks with both RPCs", "errMain", errMain, "errFallback", errFallback)
return &ErrorPair{ErrorMainRPC: errMain, ErrorFallbackRPC: errFallback}
}

s.logger.Info("Subscribed to new AlignedLayer V2 tasks")

pollLatestBatchTicker := time.NewTicker(PollLatestBatchInterval)

Expand Down Expand Up @@ -109,49 +117,61 @@ func (s *AvsSubscriber) SubscribeToNewTasksV2(newTaskCreatedChan chan *servicema

// Handle errors and resubscribe
go func() {
for {
var errMain, errFallback error
var auxSub, auxSubFallback event.Subscription
for errMain == nil || errFallback == nil { //while one is active
select {
case err := <-sub.Err():
s.logger.Warn("Error in new task subscription", "err", err)
uri-99 marked this conversation as resolved.
Show resolved Hide resolved
sub.Unsubscribe()
sub, err = SubscribeToNewTasksV2Retryable(&bind.WatchOpts{}, s.AvsContractBindings.ServiceManager, internalChannel, nil, retry.NetworkRetryParams())
if err != nil {
errorChannel <- err
s.logger.Info("failed states:", "errMain", errMain, "errFallback", errFallback)
uri-99 marked this conversation as resolved.
Show resolved Hide resolved

auxSub, errMain = SubscribeToNewTasksV2Retryable(&bind.WatchOpts{}, s.AvsContractBindings.ServiceManager, internalChannel, nil, retry.NetworkRetryParams())
if errMain == nil {
//sub.Unsubscribe()
sub = auxSub // update the subscription only if it was successful
s.logger.Info("Resubscribed to fallback new task subscription")
}
case err := <-subFallback.Err():
s.logger.Info("failed states:", "errMain", errMain, "errFallback", errFallback)
uri-99 marked this conversation as resolved.
Show resolved Hide resolved
s.logger.Warn("Error in fallback new task subscription", "err", err)
subFallback.Unsubscribe()
subFallback, err = SubscribeToNewTasksV2Retryable(&bind.WatchOpts{}, s.AvsContractBindings.ServiceManagerFallback, internalChannel, nil, retry.NetworkRetryParams())
if err != nil {
errorChannel <- err

auxSubFallback, errFallback = SubscribeToNewTasksV2Retryable(&bind.WatchOpts{}, s.AvsContractBindings.ServiceManagerFallback, internalChannel, nil, retry.NetworkRetryParams())
if errFallback == nil {
//subFallback.Unsubscribe()
subFallback = auxSubFallback // update the subscription only if it was successful
s.logger.Info("Resubscribed to fallback new task subscription")
}
}
}
errorPairChannel <- ErrorPair{ErrorMainRPC: errMain, ErrorFallbackRPC: errFallback}
}()

return errorChannel, nil
return nil
}

func (s *AvsSubscriber) SubscribeToNewTasksV3(newTaskCreatedChan chan *servicemanager.ContractAlignedLayerServiceManagerNewBatchV3) (chan error, error) {
func (s *AvsSubscriber) SubscribeToNewTasksV3(newTaskCreatedChan chan *servicemanager.ContractAlignedLayerServiceManagerNewBatchV3, errorPairChannel chan ErrorPair) *ErrorPair {
// Create a new channel to receive new tasks
internalChannel := make(chan *servicemanager.ContractAlignedLayerServiceManagerNewBatchV3)

s.logger.Info("Starting subscription to new AlignedLayer V3 tasks")
// Subscribe to new tasks
sub, err := SubscribeToNewTasksV3Retryable(&bind.WatchOpts{}, s.AvsContractBindings.ServiceManager, internalChannel, nil, retry.NetworkRetryParams())
if err != nil {
s.logger.Error("Primary failed to subscribe to new AlignedLayer V3 tasks after %d retries", MaxRetries, "err", err)
return nil, err
sub, errMain := SubscribeToNewTasksV3Retryable(&bind.WatchOpts{}, s.AvsContractBindings.ServiceManager, internalChannel, nil, retry.NetworkRetryParams())
if errMain != nil {
s.logger.Error(fmt.Sprintf("Fallback failed to subscribe to new AlignedLayer V3 tasks after %d retries", MaxRetries), "err", fmt.Sprintf("%v", errMain))
//return err
}

subFallback, err := SubscribeToNewTasksV3Retryable(&bind.WatchOpts{}, s.AvsContractBindings.ServiceManagerFallback, internalChannel, nil, retry.NetworkRetryParams())
if err != nil {
s.logger.Error("Fallback failed to subscribe to new AlignedLayer V3 tasks after %d retries", MaxRetries, "err", err)
return nil, err
subFallback, errFallback := SubscribeToNewTasksV3Retryable(&bind.WatchOpts{}, s.AvsContractBindings.ServiceManagerFallback, internalChannel, nil, retry.NetworkRetryParams())
if errFallback != nil {
s.logger.Error(fmt.Sprintf("Fallback failed to subscribe to new AlignedLayer V3 tasks after %d retries", MaxRetries), "err", fmt.Sprintf("%v", errFallback))
}
s.logger.Info("Subscribed to new AlignedLayer V3 tasks")

// create a new channel to foward errors
errorChannel := make(chan error)
if errMain != nil && errFallback != nil {
s.logger.Error("Failed to subscribe to new AlignedLayer V3 tasks with both RPCs", "errMain", errMain, "errFallback", errFallback)
return &ErrorPair{ErrorMainRPC: errMain, ErrorFallbackRPC: errFallback}
}

s.logger.Info("Subscribed to new AlignedLayer V3 tasks")

pollLatestBatchTicker := time.NewTicker(PollLatestBatchInterval)

Expand Down Expand Up @@ -180,27 +200,37 @@ func (s *AvsSubscriber) SubscribeToNewTasksV3(newTaskCreatedChan chan *servicema

// Handle errors and resubscribe
go func() {
for {
s.logger.Info("Starting error handling goroutine")
var errMain, errFallback error
var auxSub, auxSubFallback event.Subscription
for errMain == nil || errFallback == nil { //while one is active
select {
case err := <-sub.Err():
s.logger.Warn("Error in new task subscription", "err", err)
sub.Unsubscribe()
sub, err = SubscribeToNewTasksV3Retryable(&bind.WatchOpts{}, s.AvsContractBindings.ServiceManager, internalChannel, nil, retry.NetworkRetryParams())
if err != nil {
errorChannel <- err
s.logger.Info("failed states:", "errMain", errMain, "errFallback", errFallback)

auxSub, errMain = SubscribeToNewTasksV3Retryable(&bind.WatchOpts{}, s.AvsContractBindings.ServiceManager, internalChannel, nil, retry.NetworkRetryParams())
if errMain == nil {
//sub.Unsubscribe()
sub = auxSub // update the subscription only if it was successful
s.logger.Info("Resubscribed to fallback new task subscription")
}
case err := <-subFallback.Err():
s.logger.Warn("Error in fallback new task subscription", "err", err)
subFallback.Unsubscribe()
subFallback, err = SubscribeToNewTasksV3Retryable(&bind.WatchOpts{}, s.AvsContractBindings.ServiceManagerFallback, internalChannel, nil, retry.NetworkRetryParams())
if err != nil {
errorChannel <- err
s.logger.Info("failed states:", "errMain", errMain, "errFallback", errFallback)

auxSubFallback, errFallback = SubscribeToNewTasksV3Retryable(&bind.WatchOpts{}, s.AvsContractBindings.ServiceManagerFallback, internalChannel, nil, retry.NetworkRetryParams())
if errFallback == nil {
//subFallback.Unsubscribe()
subFallback = auxSubFallback // update the subscription only if it was successful
s.logger.Info("Resubscribed to fallback new task subscription")
}
}
}
errorPairChannel <- ErrorPair{ErrorMainRPC: errMain, ErrorFallbackRPC: errFallback}
}()

return errorChannel, nil
return nil
}

func (s *AvsSubscriber) processNewBatchV2(batch *servicemanager.ContractAlignedLayerServiceManagerNewBatchV2, batchesSet map[[32]byte]struct{}, newBatchMutex *sync.Mutex, newTaskCreatedChan chan<- *servicemanager.ContractAlignedLayerServiceManagerNewBatchV2) {
Expand Down
3 changes: 3 additions & 0 deletions core/chainio/retryable.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package chainio

import (
"context"
"github.com/rs/zerolog/log"
"math/big"

"github.com/ethereum/go-ethereum"
Expand Down Expand Up @@ -206,6 +207,7 @@ func SubscribeToNewTasksV2Retryable(
config *retry.RetryParams,
) (event.Subscription, error) {
subscribe_func := func() (event.Subscription, error) {
log.Info().Msg("Subscribing to NewBatchV2")
return serviceManager.WatchNewBatchV2(opts, newTaskCreatedChan, batchMerkleRoot)
}
return retry.RetryWithData(subscribe_func, config)
Expand All @@ -225,6 +227,7 @@ func SubscribeToNewTasksV3Retryable(
config *retry.RetryParams,
) (event.Subscription, error) {
subscribe_func := func() (event.Subscription, error) {
log.Info().Msg("Subscribing to NewBatchV3")
return serviceManager.WatchNewBatchV3(opts, newTaskCreatedChan, batchMerkleRoot)
}
return retry.RetryWithData(subscribe_func, config)
Expand Down
37 changes: 20 additions & 17 deletions operator/pkg/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,12 +140,12 @@ func NewOperatorFromConfig(configuration config.OperatorConfig) (*Operator, erro
return operator, nil
}

func (o *Operator) SubscribeToNewTasksV2() (chan error, error) {
return o.avsSubscriber.SubscribeToNewTasksV2(o.NewTaskCreatedChanV2)
func (o *Operator) SubscribeToNewTasksV2(errorPairChan chan chainio.ErrorPair) *chainio.ErrorPair {
return o.avsSubscriber.SubscribeToNewTasksV2(o.NewTaskCreatedChanV2, errorPairChan)
}

func (o *Operator) SubscribeToNewTasksV3() (chan error, error) {
return o.avsSubscriber.SubscribeToNewTasksV3(o.NewTaskCreatedChanV3)
func (o *Operator) SubscribeToNewTasksV3(errorPairChan chan chainio.ErrorPair) *chainio.ErrorPair {
return o.avsSubscriber.SubscribeToNewTasksV3(o.NewTaskCreatedChanV3, errorPairChan)
}

type OperatorLastProcessedBatch struct {
Expand Down Expand Up @@ -205,13 +205,16 @@ func (o *Operator) UpdateLastProcessBatch(blockNumber uint32) error {
}

func (o *Operator) Start(ctx context.Context) error {
subV2, err := o.SubscribeToNewTasksV2()
if err != nil {
// create a new channel to foward errors
subV2ErrorChannel := make(chan chainio.ErrorPair)
errorPairPtr := o.SubscribeToNewTasksV2(subV2ErrorChannel)
if errorPairPtr != nil {
log.Fatal("Could not subscribe to new tasks")
}

subV3, err := o.SubscribeToNewTasksV3()
if err != nil {
subV3ErrorChannel := make(chan chainio.ErrorPair)
errorPairPtr = o.SubscribeToNewTasksV3(subV3ErrorChannel)
if errorPairPtr != nil {
log.Fatal("Could not subscribe to new tasks")
}

Expand All @@ -231,24 +234,24 @@ func (o *Operator) Start(ctx context.Context) error {
return nil
case err := <-metricsErrChan:
o.Logger.Errorf("Metrics server failed", "err", err)
case err := <-subV2:
o.Logger.Infof("Error in websocket subscription", "err", err)
subV2, err = o.SubscribeToNewTasksV2()
if err != nil {
case errorPair := <-subV2ErrorChannel:
o.Logger.Infof("Error in websocket subscription", "err", errorPair)
errorPairPtr = o.SubscribeToNewTasksV2(subV2ErrorChannel)
if errorPairPtr != nil {
o.Logger.Fatal("Could not subscribe to new tasks V2")
}
case err := <-subV3:
o.Logger.Infof("Error in websocket subscription", "err", err)
subV2, err = o.SubscribeToNewTasksV3()
if err != nil {
case errorPair := <-subV3ErrorChannel:
o.Logger.Infof("Error in websocket subscription", "err", errorPair)
errorPairPtr = o.SubscribeToNewTasksV3(subV3ErrorChannel)
if errorPairPtr != nil {
o.Logger.Fatal("Could not subscribe to new tasks V3")
}
case newBatchLogV2 := <-o.NewTaskCreatedChanV2:
go o.handleNewBatchLogV2(newBatchLogV2)
case newBatchLogV3 := <-o.NewTaskCreatedChanV3:
go o.handleNewBatchLogV3(newBatchLogV3)
case blockNumber := <-o.lastProcessedBatch.batchProcessedChan:
err = o.UpdateLastProcessBatch(blockNumber)
err := o.UpdateLastProcessBatch(blockNumber)
if err != nil {
o.Logger.Errorf("Error while updating last process batch", "err", err)
}
Expand Down
Loading