Skip to content

Commit

Permalink
rate limiting using token bucket
Browse files Browse the repository at this point in the history
  • Loading branch information
vgeddes committed Dec 4, 2024
1 parent c743ec7 commit d6b8f46
Showing 1 changed file with 52 additions and 2 deletions.
54 changes: 52 additions & 2 deletions relayer/relays/beefy/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package beefy
import (
"context"
"fmt"
"sync/atomic"
"time"

"golang.org/x/sync/errgroup"
Expand Down Expand Up @@ -198,13 +199,27 @@ func (relay *Relay) RateLimitedSync(ctx context.Context) error {
return fmt.Errorf("create gateway client: %w", err)
}

var tokens atomic.Uint64
tokens.Store(1)

go refiller(ctx, &tokens, 1, 1, time.Minute*30)

for {
log.Info("Starting check")

paraNonce, ethNonce, err := relay.queryNonces(ctx, parachainConn, gatewayContract)
if err != nil {
return fmt.Errorf("require sync: %w", err)
return fmt.Errorf("query nonces: %w", err)
}

if paraNonce > ethNonce {
log.WithFields(log.Fields{
"paraNonce": paraNonce,
"ethNonce": ethNonce,
}).Info("Nonces checked")

if paraNonce > ethNonce && tryConsume(&tokens) {
log.Info("Performing sync")

beefyBlockHash, err := relay.relaychainConn.API().RPC.Beefy.GetFinalizedHead()
if err != nil {
return fmt.Errorf("fetch latest beefy block: %w", err)
Expand All @@ -216,6 +231,12 @@ func (relay *Relay) RateLimitedSync(ctx context.Context) error {
}

relay.doSync(ctx, uint64(header.Number))

if tokens.Load() > 0 {
tokens.Add(^uint64(0))
}

log.Info("Sync completed")
}

// Sleep for 5 minute
Expand All @@ -227,6 +248,35 @@ func (relay *Relay) RateLimitedSync(ctx context.Context) error {
}
}

func refiller(ctx context.Context, tokens *atomic.Uint64, refillAmount uint64, maxTokens uint64, refillPeriod time.Duration) {
ticker := time.NewTicker(refillPeriod)
defer ticker.Stop()

for {
select {
case <-ctx.Done():
return
case <-ticker.C:
currentTokens := tokens.Load()
newTokens := currentTokens + refillAmount
tokens.Store(newTokens)
}
}
}

func tryConsume(tokens *atomic.Uint64) bool {
for {
currentTokens := tokens.Load()
if currentTokens < 1 {
return false
}

if tokens.CompareAndSwap(currentTokens, currentTokens-1) {
return true
}
}
}

func (relay *Relay) queryNonces(ctx context.Context, parachainConn *parachain.Connection, gatewayContract *contracts.Gateway) (uint64, uint64, error) {
data, err := types.HexDecodeString("0xc173fac324158e77fb5840738a1a541f633cbec8884c6a601c567d2b376a0539")
if err != nil {
Expand Down

0 comments on commit d6b8f46

Please sign in to comment.