Skip to content

Commit

Permalink
improve
Browse files Browse the repository at this point in the history
  • Loading branch information
vgeddes committed Dec 5, 2024
1 parent b250f88 commit b5976f2
Show file tree
Hide file tree
Showing 3 changed files with 81 additions and 51 deletions.
13 changes: 10 additions & 3 deletions relayer/relays/beefy/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,9 @@ import (
)

type Config struct {
Source SourceConfig `mapstructure:"source"`
Sink SinkConfig `mapstructure:"sink"`
Source SourceConfig `mapstructure:"source"`
Sink SinkConfig `mapstructure:"sink"`
OnDemandSync OnDemandSyncConfig `mapstructure:"on-demand-sync"`
}

type SourceConfig struct {
Expand All @@ -27,6 +28,12 @@ type ContractsConfig struct {
Gateway string `mapstructure:"Gateway"`
}

type OnDemandSyncConfig struct {
MaxTokens uint64 `mapstructure:"max-tokens"`
RefillAmount uint64 `mapstructure:"refill-amount"`
RefillPeriod uint64 `mapstructure:"refill-period"`
}

func (c Config) Validate() error {
err := c.Source.Polkadot.Validate()
if err != nil {
Expand All @@ -43,7 +50,7 @@ func (c Config) Validate() error {
return fmt.Errorf("sink contracts setting [BeefyClient] is not set")
}
if c.Sink.Contracts.Gateway == "" {
return fmt.Errorf("sink contracts setting [BeefyClient] is not set")
return fmt.Errorf("sink contracts setting [Gateway] is not set")
}
return nil
}
62 changes: 14 additions & 48 deletions relayer/relays/beefy/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package beefy
import (
"context"
"fmt"
"sync/atomic"
"time"

"golang.org/x/sync/errgroup"
Expand Down Expand Up @@ -119,10 +118,11 @@ func (relay *Relay) OnDemandSync(ctx context.Context) error {
return fmt.Errorf("create gateway client: %w", err)
}

var tokens atomic.Uint64
tokens.Store(1)

go refiller(ctx, &tokens, 1, 1, time.Minute*60)
tb := NewTokenBucket(ctx,
relay.config.OnDemandSync.MaxTokens,
relay.config.OnDemandSync.RefillAmount,
time.Duration(relay.config.OnDemandSync.RefillPeriod)*time.Second,
)

for {
log.Info("Starting check")
Expand All @@ -137,7 +137,7 @@ func (relay *Relay) OnDemandSync(ctx context.Context) error {
"ethNonce": ethNonce,
}).Info("Nonces checked")

if paraNonce > ethNonce && tryConsume(&tokens) {
if paraNonce > ethNonce && tb.TryConsume(1) {
log.Info("Performing sync")

beefyBlockHash, err := relay.relaychainConn.API().RPC.Beefy.GetFinalizedHead()
Expand All @@ -152,56 +152,22 @@ func (relay *Relay) OnDemandSync(ctx context.Context) error {

relay.doSync(ctx, uint64(header.Number))

if tokens.Load() > 0 {
tokens.Add(^uint64(0))
}

log.Info("Sync completed")

// Sleep for 5 minute to allow message relayer to sync nonces
select {
case <-ctx.Done():
return nil
case <-time.After(time.Second * 300):
}
// Sleep for 10 minute to allow message relayer to sync nonces
sleep(ctx, time.Minute*10)
} else {
// Sleep for 1 minute
select {
case <-ctx.Done():
return nil
case <-time.After(time.Second * 60):
}
sleep(ctx, time.Minute*1)
}

}
}

func refiller(ctx context.Context, tokens *atomic.Uint64, refillAmount uint64, maxTokens uint64, refillPeriod time.Duration) {
ticker := time.NewTicker(refillPeriod)
defer ticker.Stop()

for {
select {
case <-ctx.Done():
return
case <-ticker.C:
currentTokens := tokens.Load()
newTokens := currentTokens + refillAmount
tokens.Store(newTokens)
}
}
}

func tryConsume(tokens *atomic.Uint64) bool {
for {
currentTokens := tokens.Load()
if currentTokens < 1 {
return false
}

if tokens.CompareAndSwap(currentTokens, currentTokens-1) {
return true
}
func sleep(ctx context.Context, d time.Duration) {
select {
case <-ctx.Done():
return
case <-time.After(d):
}
}

Expand Down
57 changes: 57 additions & 0 deletions relayer/relays/beefy/token-bucket.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package beefy

import (
"context"
"sync/atomic"
"time"
)

type TokenBucket struct {
tokens atomic.Uint64
maxTokens uint64
refillAmount uint64
refillPeriod time.Duration
}

func NewTokenBucket(ctx context.Context, maxTokens, refillAmount uint64, refillPeriod time.Duration) *TokenBucket {
tb := &TokenBucket{
maxTokens: maxTokens,
refillAmount: refillAmount,
refillPeriod: refillPeriod,
}
tb.tokens.Store(maxTokens)
go tb.refiller(ctx)
return tb
}

func (tb *TokenBucket) refiller(ctx context.Context) {
ticker := time.NewTicker(tb.refillPeriod)
defer ticker.Stop()

for {
select {
case <-ctx.Done():
return
case <-ticker.C:
currentTokens := tb.tokens.Load()
newTokens := currentTokens + tb.refillAmount
if newTokens > tb.maxTokens {
newTokens = tb.maxTokens
}
tb.tokens.Store(newTokens)
}
}
}

func (tb *TokenBucket) TryConsume(tokens uint64) bool {
for {
currentTokens := tb.tokens.Load()
if currentTokens < tokens {
return false
}

if tb.tokens.CompareAndSwap(currentTokens, currentTokens-tokens) {
return true
}
}
}

0 comments on commit b5976f2

Please sign in to comment.