Skip to content
This repository has been archived by the owner on Apr 19, 2024. It is now read-only.

Commit

Permalink
Refactor global behavior and functional tests for stability.
Browse files Browse the repository at this point in the history
- Simplify passing of request time across layers.
- Better handling of metrics in tests.
- Better detection of global broadcasts, global updates, and idle.
- Drop redundant metric `guberator_global_broadcast_counter`.
- Fix metric `gubernator_global_queue_length` for global broadcast.
- Add metric `gubernator_global_send_queue_length` for global send.
  • Loading branch information
Baliedge committed Mar 11, 2024
1 parent cb3816a commit e2b8853
Show file tree
Hide file tree
Showing 6 changed files with 380 additions and 563 deletions.
54 changes: 27 additions & 27 deletions algorithms.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ import (
// with 100 emails and the request will succeed. You can override this default behavior with `DRAIN_OVER_LIMIT`

// Implements token bucket algorithm for rate limiting. https://en.wikipedia.org/wiki/Token_bucket
func tokenBucket(ctx context.Context, s Store, c Cache, r *RateLimitReq, requestTime time.Time) (resp *RateLimitResp, err error) {
func tokenBucket(ctx context.Context, s Store, c Cache, r *RateLimitReq) (resp *RateLimitResp, err error) {
tokenBucketTimer := prometheus.NewTimer(metricFuncTimeDuration.WithLabelValues("tokenBucket"))
defer tokenBucketTimer.ObserveDuration()

Expand Down Expand Up @@ -100,7 +100,7 @@ func tokenBucket(ctx context.Context, s Store, c Cache, r *RateLimitReq, request
s.Remove(ctx, hashKey)
}

return tokenBucketNewItem(ctx, s, c, r, requestTime)
return tokenBucketNewItem(ctx, s, c, r)
}

// Update the limit if it changed.
Expand Down Expand Up @@ -133,12 +133,12 @@ func tokenBucket(ctx context.Context, s Store, c Cache, r *RateLimitReq, request
}

// If our new duration means we are currently expired.
now := EpochMillis(requestTime)
if expire <= now {
requestTime := *r.RequestTime
if expire <= requestTime {
// Renew item.
span.AddEvent("Limit has expired")
expire = now + r.Duration
t.CreatedAt = now
expire = requestTime + r.Duration
t.CreatedAt = requestTime
t.Remaining = t.Limit
}

Expand Down Expand Up @@ -196,19 +196,19 @@ func tokenBucket(ctx context.Context, s Store, c Cache, r *RateLimitReq, request
}

// Item is not found in cache or store, create new.
return tokenBucketNewItem(ctx, s, c, r, requestTime)
return tokenBucketNewItem(ctx, s, c, r)
}

// Called by tokenBucket() when adding a new item in the store.
func tokenBucketNewItem(ctx context.Context, s Store, c Cache, r *RateLimitReq, requestTime time.Time) (resp *RateLimitResp, err error) {
now := EpochMillis(requestTime)
expire := now + r.Duration
func tokenBucketNewItem(ctx context.Context, s Store, c Cache, r *RateLimitReq) (resp *RateLimitResp, err error) {
requestTime := *r.RequestTime
expire := requestTime + r.Duration

t := &TokenBucketItem{
Limit: r.Limit,
Duration: r.Duration,
Remaining: r.Limit - r.Hits,
CreatedAt: now,
CreatedAt: requestTime,
}

// Add a new rate limit to the cache.
Expand Down Expand Up @@ -252,15 +252,15 @@ func tokenBucketNewItem(ctx context.Context, s Store, c Cache, r *RateLimitReq,
}

// Implements leaky bucket algorithm for rate limiting https://en.wikipedia.org/wiki/Leaky_bucket
func leakyBucket(ctx context.Context, s Store, c Cache, r *RateLimitReq, requestTime time.Time) (resp *RateLimitResp, err error) {
func leakyBucket(ctx context.Context, s Store, c Cache, r *RateLimitReq) (resp *RateLimitResp, err error) {
leakyBucketTimer := prometheus.NewTimer(metricFuncTimeDuration.WithLabelValues("V1Instance.getRateLimit_leakyBucket"))
defer leakyBucketTimer.ObserveDuration()

if r.Burst == 0 {
r.Burst = r.Limit
}

now := EpochMillis(requestTime)
requestTime := *r.RequestTime

// Get rate limit from cache.
hashKey := r.HashKey()
Expand Down Expand Up @@ -309,7 +309,7 @@ func leakyBucket(ctx context.Context, s Store, c Cache, r *RateLimitReq, request
s.Remove(ctx, hashKey)
}

return leakyBucketNewItem(ctx, s, c, r, requestTime)
return leakyBucketNewItem(ctx, s, c, r)
}

if HasBehavior(r.Behavior, Behavior_RESET_REMAINING) {
Expand Down Expand Up @@ -349,16 +349,16 @@ func leakyBucket(ctx context.Context, s Store, c Cache, r *RateLimitReq, request
}

if r.Hits != 0 {
c.UpdateExpiration(r.HashKey(), now+duration)
c.UpdateExpiration(r.HashKey(), requestTime+duration)
}

// Calculate how much leaked out of the bucket since the last time we leaked a hit
elapsed := now - b.UpdatedAt
elapsed := requestTime - b.UpdatedAt
leak := float64(elapsed) / rate

if int64(leak) > 0 {
b.Remaining += leak
b.UpdatedAt = now
b.UpdatedAt = requestTime
}

if int64(b.Remaining) > b.Burst {
Expand All @@ -369,7 +369,7 @@ func leakyBucket(ctx context.Context, s Store, c Cache, r *RateLimitReq, request
Limit: b.Limit,
Remaining: int64(b.Remaining),
Status: Status_UNDER_LIMIT,
ResetTime: now + (b.Limit-int64(b.Remaining))*int64(rate),
ResetTime: requestTime + (b.Limit-int64(b.Remaining))*int64(rate),
}

// TODO: Feature missing: check for Duration change between item/request.
Expand All @@ -391,7 +391,7 @@ func leakyBucket(ctx context.Context, s Store, c Cache, r *RateLimitReq, request
if int64(b.Remaining) == r.Hits {
b.Remaining = 0
rl.Remaining = int64(b.Remaining)
rl.ResetTime = now + (rl.Limit-rl.Remaining)*int64(rate)
rl.ResetTime = requestTime + (rl.Limit-rl.Remaining)*int64(rate)
return rl, nil
}

Expand All @@ -417,16 +417,16 @@ func leakyBucket(ctx context.Context, s Store, c Cache, r *RateLimitReq, request

b.Remaining -= float64(r.Hits)
rl.Remaining = int64(b.Remaining)
rl.ResetTime = now + (rl.Limit-rl.Remaining)*int64(rate)
rl.ResetTime = requestTime + (rl.Limit-rl.Remaining)*int64(rate)
return rl, nil
}

return leakyBucketNewItem(ctx, s, c, r, requestTime)
return leakyBucketNewItem(ctx, s, c, r)
}

// Called by leakyBucket() when adding a new item in the store.
func leakyBucketNewItem(ctx context.Context, s Store, c Cache, r *RateLimitReq, requestTime time.Time) (resp *RateLimitResp, err error) {
now := EpochMillis(requestTime)
func leakyBucketNewItem(ctx context.Context, s Store, c Cache, r *RateLimitReq) (resp *RateLimitResp, err error) {
requestTime := *r.RequestTime
duration := r.Duration
rate := float64(duration) / float64(r.Limit)
if HasBehavior(r.Behavior, Behavior_DURATION_IS_GREGORIAN) {
Expand All @@ -445,28 +445,28 @@ func leakyBucketNewItem(ctx context.Context, s Store, c Cache, r *RateLimitReq,
Remaining: float64(r.Burst - r.Hits),
Limit: r.Limit,
Duration: duration,
UpdatedAt: now,
UpdatedAt: requestTime,
Burst: r.Burst,
}

rl := RateLimitResp{
Status: Status_UNDER_LIMIT,
Limit: b.Limit,
Remaining: r.Burst - r.Hits,
ResetTime: now + (b.Limit-(r.Burst-r.Hits))*int64(rate),
ResetTime: requestTime + (b.Limit-(r.Burst-r.Hits))*int64(rate),
}

// Client could be requesting that we start with the bucket OVER_LIMIT
if r.Hits > r.Burst {
metricOverLimitCounter.Add(1)
rl.Status = Status_OVER_LIMIT
rl.Remaining = 0
rl.ResetTime = now + (rl.Limit-rl.Remaining)*int64(rate)
rl.ResetTime = requestTime + (rl.Limit-rl.Remaining)*int64(rate)
b.Remaining = 0
}

item := &CacheItem{
ExpireAt: now + duration,
ExpireAt: requestTime + duration,
Algorithm: r.Algorithm,
Key: r.HashKey(),
Value: &b,
Expand Down
Loading

0 comments on commit e2b8853

Please sign in to comment.