diff --git a/relayer/relays/beefy/main.go b/relayer/relays/beefy/main.go index 2a34b3b6e7..e10f739af0 100644 --- a/relayer/relays/beefy/main.go +++ b/relayer/relays/beefy/main.go @@ -3,6 +3,7 @@ package beefy import ( "context" "fmt" + "sync/atomic" "time" "golang.org/x/sync/errgroup" @@ -198,13 +199,27 @@ func (relay *Relay) RateLimitedSync(ctx context.Context) error { return fmt.Errorf("create gateway client: %w", err) } + var tokens atomic.Uint64 + tokens.Store(1) + + go refiller(ctx, &tokens, 1, 1, time.Minute*30) + for { + log.Info("Starting check") + paraNonce, ethNonce, err := relay.queryNonces(ctx, parachainConn, gatewayContract) if err != nil { - return fmt.Errorf("require sync: %w", err) + return fmt.Errorf("query nonces: %w", err) } - if paraNonce > ethNonce { + log.WithFields(log.Fields{ + "paraNonce": paraNonce, + "ethNonce": ethNonce, + }).Info("Nonces checked") + + if paraNonce > ethNonce && tryConsume(&tokens) { + log.Info("Performing sync") + beefyBlockHash, err := relay.relaychainConn.API().RPC.Beefy.GetFinalizedHead() if err != nil { return fmt.Errorf("fetch latest beefy block: %w", err) @@ -216,6 +231,12 @@ func (relay *Relay) RateLimitedSync(ctx context.Context) error { } relay.doSync(ctx, uint64(header.Number)) + + if tokens.Load() > 0 { + tokens.Add(^uint64(0)) + } + + log.Info("Sync completed") } // Sleep for 5 minute @@ -227,6 +248,35 @@ func (relay *Relay) RateLimitedSync(ctx context.Context) error { } } +func refiller(ctx context.Context, tokens *atomic.Uint64, refillAmount uint64, maxTokens uint64, refillPeriod time.Duration) { + ticker := time.NewTicker(refillPeriod) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + currentTokens := tokens.Load() + newTokens := currentTokens + refillAmount + tokens.Store(newTokens) + } + } +} + +func tryConsume(tokens *atomic.Uint64) bool { + for { + currentTokens := tokens.Load() + if currentTokens < 1 { + return false + } + + if tokens.CompareAndSwap(currentTokens, currentTokens-1) { + return true + } + } +} + func (relay *Relay) queryNonces(ctx context.Context, parachainConn *parachain.Connection, gatewayContract *contracts.Gateway) (uint64, uint64, error) { data, err := types.HexDecodeString("0xc173fac324158e77fb5840738a1a541f633cbec8884c6a601c567d2b376a0539") if err != nil {