Skip to content
This repository has been archived by the owner on Apr 19, 2024. It is now read-only.

perf: Fanout for async updates in global #198

Merged
merged 5 commits into from
Oct 30, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,9 @@ type BehaviorConfig struct {
GlobalBatchLimit int
// ForceGlobal forces global mode on all rate limit checks.
ForceGlobal bool

// Number of concurrent requests that will be made to peers. Defaults to 100
GlobalPeerRequestsConcurrency int
}

// Config for a gubernator instance
Expand Down Expand Up @@ -126,6 +129,8 @@ func (c *Config) SetDefaults() error {
setter.SetDefault(&c.Behaviors.GlobalBatchLimit, maxBatchSize)
setter.SetDefault(&c.Behaviors.GlobalSyncWait, time.Millisecond*100)

setter.SetDefault(&c.Behaviors.GlobalPeerRequestsConcurrency, 100)

setter.SetDefault(&c.LocalPicker, NewReplicatedConsistentHash(nil, defaultReplicas))
setter.SetDefault(&c.RegionPicker, NewRegionPicker(nil))

Expand Down
48 changes: 29 additions & 19 deletions global.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,18 +151,23 @@ func (gm *globalManager) sendHits(hits map[string]*RateLimitReq) {
}
}

fan := syncutil.NewFanOut(gm.conf.GlobalPeerRequestsConcurrency)
// Send the rate limit requests to their respective owning peers.
for _, p := range peerRequests {
ctx, cancel := context.WithTimeout(context.Background(), gm.conf.GlobalTimeout)
_, err := p.client.GetPeerRateLimits(ctx, &p.req)
cancel()

if err != nil {
gm.log.WithError(err).
Errorf("error sending global hits to '%s'", p.client.Info().GRPCAddress)
continue
}
fan.Run(func(in interface{}) error {
p := in.(*pair)
ctx, cancel := context.WithTimeout(context.Background(), gm.conf.GlobalTimeout)
_, err := p.client.GetPeerRateLimits(ctx, &p.req)
cancel()

if err != nil {
gm.log.WithError(err).
Errorf("error sending global hits to '%s'", p.client.Info().GRPCAddress)
}
return nil
}, p)
}
fan.Wait()
}

// runBroadcasts collects status changes for global rate limits and broadcasts the changes to each peer in the cluster.
Expand Down Expand Up @@ -232,24 +237,29 @@ func (gm *globalManager) broadcastPeers(ctx context.Context, updates map[string]
})
}

fan := syncutil.NewFanOut(gm.conf.GlobalPeerRequestsConcurrency)
for _, peer := range gm.instance.GetPeerList() {
// Exclude ourselves from the update
if peer.Info().IsOwner {
continue
}

ctx, cancel := context.WithTimeout(ctx, gm.conf.GlobalTimeout)
_, err := peer.UpdatePeerGlobals(ctx, &req)
cancel()

if err != nil {
// Skip peers that are not in a ready state
if !IsNotReady(err) {
gm.log.WithError(err).Errorf("while broadcasting global updates to '%s'", peer.Info().GRPCAddress)
fan.Run(func(in interface{}) error {
peer := in.(*PeerClient)
ctx, cancel := context.WithTimeout(ctx, gm.conf.GlobalTimeout)
_, err := peer.UpdatePeerGlobals(ctx, &req)
cancel()

if err != nil {
// Skip peers that are not in a ready state
if !IsNotReady(err) {
gm.log.WithError(err).Errorf("while broadcasting global updates to '%s'", peer.Info().GRPCAddress)
}
}
continue
}
return nil
}, peer)
}
fan.Wait()
}

func (gm *globalManager) Close() {
Expand Down
Loading