From 4f64b1df86f2a93d63b4620559002138abe1b708 Mon Sep 17 00:00:00 2001 From: Tanveer Gill Date: Mon, 25 Dec 2023 21:54:27 -0800 Subject: [PATCH] Fix token rate update (#3076) --- .../global-token-counter.go | 22 +++++++++++-------- .../example/concurrency_scheduler.ts | 22 +++++++++++++------ 2 files changed, 28 insertions(+), 16 deletions(-) diff --git a/pkg/dmap-funcs/concurrency-limiter/global-token-counter.go b/pkg/dmap-funcs/concurrency-limiter/global-token-counter.go index 2c409b23d5..325691c39d 100644 --- a/pkg/dmap-funcs/concurrency-limiter/global-token-counter.go +++ b/pkg/dmap-funcs/concurrency-limiter/global-token-counter.go @@ -361,10 +361,9 @@ func (gtc *GlobalTokenCounter) takeN(key string, stateBytes, argBytes []byte) ([ } } - now := time.Now() takeNResp := tokencounterv1.TakeNResponse{ Ok: true, - CheckBackAt: timestamppb.New(now), + CheckBackAt: timestamppb.New(time.Time{}), } // Audit and find requestID in the queued requests @@ -373,6 +372,7 @@ func (gtc *GlobalTokenCounter) takeN(key string, stateBytes, argBytes []byte) ([ var requestQueued *tokencounterv1.Request var reqIndex int var tokensAhead float64 + now := time.Now() for i, r := range state.RequestsQueued { if isExpired(r, now) { @@ -409,15 +409,18 @@ func (gtc *GlobalTokenCounter) takeN(key string, stateBytes, argBytes []byte) ([ } else { waitTime = MinimumWaitTime } - + if waitTime < MinimumWaitTime { + waitTime = MinimumWaitTime + } if requestQueued != nil { + if requestQueued.NumRetries >= 5 { + // override wait time if the request has been retried more than 5 times + waitTime = requestQueued.WaitFor.AsDuration() + } requestQueued.NumRetries += 1 if requestQueued.NumRetries%5 == 0 { // Increase wait time exponentially after every 5 re-tries - newWaitTime := requestQueued.WaitFor.AsDuration() * 2 - if newWaitTime > waitTime { - waitTime = newWaitTime - } + waitTime = requestQueued.WaitFor.AsDuration() * 2 } } @@ -549,8 +552,9 @@ func (gtc *GlobalTokenCounter) returnTokens(key string, stateBytes, argBytes []b tokenWindow.Count += 1 if tokenWindow.Count >= TokenRateWindowSize { - if state.TokenRate == 0 { - state.TokenRate = tokenWindow.Sum / tokenWindow.End.AsTime().Sub(tokenWindow.Start.AsTime()).Seconds() + timeElapsed := tokenWindow.End.AsTime().Sub(tokenWindow.Start.AsTime()).Seconds() + if timeElapsed != 0 { + state.TokenRate = tokenWindow.Sum / timeElapsed } tokenWindow.Start = tokenWindow.End tokenWindow.End = nil diff --git a/sdks/aperture-js/example/concurrency_scheduler.ts b/sdks/aperture-js/example/concurrency_scheduler.ts index 8e60d745dc..c52e612dd5 100644 --- a/sdks/aperture-js/example/concurrency_scheduler.ts +++ b/sdks/aperture-js/example/concurrency_scheduler.ts @@ -27,6 +27,7 @@ async function sendRequestForTier( tier: string, priority: number, ) { + console.log(`[${tier} Tier] Sending request with priority ${priority}...`); const flow = await apertureClient.startFlow( "concurrency-scheduling-feature", { @@ -43,25 +44,32 @@ async function sendRequestForTier( if (flow.shouldRun()) { console.log(`[${tier} Tier] Request accepted with priority ${priority}.`); + // sleep for 5 seconds to simulate a long-running request + await new Promise((resolve) => setTimeout(resolve, 5000)); } else { console.log(`[${tier} Tier] Request rejected. Priority was ${priority}.`); } - const flowEndResponse = await flow.end(); + await flow.end(); } +// Launch each batch in parallel async function scheduleRequests(apertureClient: ApertureClient) { const requestsPerBatch = 10; const batchInterval = 1000; // ms - setInterval(() => { + while (true) { console.log("Sending new batch of requests..."); - Object.entries(userTiers).forEach(([tier, priority]) => { - for (let i = 0; i < requestsPerBatch; i++) { - sendRequestForTier(apertureClient, tier, priority); - } + // Send requests for each tier + const promises = Object.entries(userTiers).flatMap(([tier, priority]) => { + return Array(requestsPerBatch) + .fill(null) + .map(() => sendRequestForTier(apertureClient, tier, priority)); }); - }, batchInterval); + + await Promise.all(promises); + await new Promise((resolve) => setTimeout(resolve, batchInterval)); + } } async function main() {