From d6b8f461ccda736ac105e9b4982d93ee05493e46 Mon Sep 17 00:00:00 2001 From: Vincent Geddes <117534+vgeddes@users.noreply.github.com> Date: Wed, 4 Dec 2024 23:29:33 +0200 Subject: [PATCH] rate limiting using token bucket --- relayer/relays/beefy/main.go | 54 ++++++++++++++++++++++++++++++++++-- 1 file changed, 52 insertions(+), 2 deletions(-) diff --git a/relayer/relays/beefy/main.go b/relayer/relays/beefy/main.go index 2a34b3b6e7..e10f739af0 100644 --- a/relayer/relays/beefy/main.go +++ b/relayer/relays/beefy/main.go @@ -3,6 +3,7 @@ package beefy import ( "context" "fmt" + "sync/atomic" "time" "golang.org/x/sync/errgroup" @@ -198,13 +199,27 @@ func (relay *Relay) RateLimitedSync(ctx context.Context) error { return fmt.Errorf("create gateway client: %w", err) } + var tokens atomic.Uint64 + tokens.Store(1) + + go refiller(ctx, &tokens, 1, 1, time.Minute*30) + for { + log.Info("Starting check") + paraNonce, ethNonce, err := relay.queryNonces(ctx, parachainConn, gatewayContract) if err != nil { - return fmt.Errorf("require sync: %w", err) + return fmt.Errorf("query nonces: %w", err) } - if paraNonce > ethNonce { + log.WithFields(log.Fields{ + "paraNonce": paraNonce, + "ethNonce": ethNonce, + }).Info("Nonces checked") + + if paraNonce > ethNonce && tryConsume(&tokens) { + log.Info("Performing sync") + beefyBlockHash, err := relay.relaychainConn.API().RPC.Beefy.GetFinalizedHead() if err != nil { return fmt.Errorf("fetch latest beefy block: %w", err) @@ -216,6 +231,12 @@ func (relay *Relay) RateLimitedSync(ctx context.Context) error { } relay.doSync(ctx, uint64(header.Number)) + + if tokens.Load() > 0 { + tokens.Add(^uint64(0)) + } + + log.Info("Sync completed") } // Sleep for 5 minute @@ -227,6 +248,35 @@ func (relay *Relay) RateLimitedSync(ctx context.Context) error { } } +func refiller(ctx context.Context, tokens *atomic.Uint64, refillAmount uint64, maxTokens uint64, refillPeriod time.Duration) { + ticker := time.NewTicker(refillPeriod) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + currentTokens := tokens.Load() + newTokens := currentTokens + refillAmount + tokens.Store(newTokens) + } + } +} + +func tryConsume(tokens *atomic.Uint64) bool { + for { + currentTokens := tokens.Load() + if currentTokens < 1 { + return false + } + + if tokens.CompareAndSwap(currentTokens, currentTokens-1) { + return true + } + } +} + func (relay *Relay) queryNonces(ctx context.Context, parachainConn *parachain.Connection, gatewayContract *contracts.Gateway) (uint64, uint64, error) { data, err := types.HexDecodeString("0xc173fac324158e77fb5840738a1a541f633cbec8884c6a601c567d2b376a0539") if err != nil {