Skip to content

Commit

Permalink
Fix token rate update (#3076)
Browse files Browse the repository at this point in the history
  • Loading branch information
tanveergill authored Dec 26, 2023
1 parent f9c5869 commit 4f64b1d
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 16 deletions.
22 changes: 13 additions & 9 deletions pkg/dmap-funcs/concurrency-limiter/global-token-counter.go
Original file line number Diff line number Diff line change
Expand Up @@ -361,10 +361,9 @@ func (gtc *GlobalTokenCounter) takeN(key string, stateBytes, argBytes []byte) ([
}
}

now := time.Now()
takeNResp := tokencounterv1.TakeNResponse{
Ok: true,
CheckBackAt: timestamppb.New(now),
CheckBackAt: timestamppb.New(time.Time{}),
}

// Audit and find requestID in the queued requests
Expand All @@ -373,6 +372,7 @@ func (gtc *GlobalTokenCounter) takeN(key string, stateBytes, argBytes []byte) ([
var requestQueued *tokencounterv1.Request
var reqIndex int
var tokensAhead float64
now := time.Now()

for i, r := range state.RequestsQueued {
if isExpired(r, now) {
Expand Down Expand Up @@ -409,15 +409,18 @@ func (gtc *GlobalTokenCounter) takeN(key string, stateBytes, argBytes []byte) ([
} else {
waitTime = MinimumWaitTime
}

if waitTime < MinimumWaitTime {
waitTime = MinimumWaitTime
}
if requestQueued != nil {
if requestQueued.NumRetries >= 5 {
// override wait time if the request has been retried more than 5 times
waitTime = requestQueued.WaitFor.AsDuration()
}
requestQueued.NumRetries += 1
if requestQueued.NumRetries%5 == 0 {
// Increase wait time exponentially after every 5 re-tries
newWaitTime := requestQueued.WaitFor.AsDuration() * 2
if newWaitTime > waitTime {
waitTime = newWaitTime
}
waitTime = requestQueued.WaitFor.AsDuration() * 2
}
}

Expand Down Expand Up @@ -549,8 +552,9 @@ func (gtc *GlobalTokenCounter) returnTokens(key string, stateBytes, argBytes []b
tokenWindow.Count += 1

if tokenWindow.Count >= TokenRateWindowSize {
if state.TokenRate == 0 {
state.TokenRate = tokenWindow.Sum / tokenWindow.End.AsTime().Sub(tokenWindow.Start.AsTime()).Seconds()
timeElapsed := tokenWindow.End.AsTime().Sub(tokenWindow.Start.AsTime()).Seconds()
if timeElapsed != 0 {
state.TokenRate = tokenWindow.Sum / timeElapsed
}
tokenWindow.Start = tokenWindow.End
tokenWindow.End = nil
Expand Down
22 changes: 15 additions & 7 deletions sdks/aperture-js/example/concurrency_scheduler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ async function sendRequestForTier(
tier: string,
priority: number,
) {
console.log(`[${tier} Tier] Sending request with priority ${priority}...`);
const flow = await apertureClient.startFlow(
"concurrency-scheduling-feature",
{
Expand All @@ -43,25 +44,32 @@ async function sendRequestForTier(

if (flow.shouldRun()) {
console.log(`[${tier} Tier] Request accepted with priority ${priority}.`);
// sleep for 5 seconds to simulate a long-running request
await new Promise((resolve) => setTimeout(resolve, 5000));
} else {
console.log(`[${tier} Tier] Request rejected. Priority was ${priority}.`);
}

const flowEndResponse = await flow.end();
await flow.end();
}

// Launch each batch in parallel
async function scheduleRequests(apertureClient: ApertureClient) {
const requestsPerBatch = 10;
const batchInterval = 1000; // ms

setInterval(() => {
while (true) {
console.log("Sending new batch of requests...");
Object.entries(userTiers).forEach(([tier, priority]) => {
for (let i = 0; i < requestsPerBatch; i++) {
sendRequestForTier(apertureClient, tier, priority);
}
// Send requests for each tier
const promises = Object.entries(userTiers).flatMap(([tier, priority]) => {
return Array(requestsPerBatch)
.fill(null)
.map(() => sendRequestForTier(apertureClient, tier, priority));
});
}, batchInterval);

await Promise.all(promises);
await new Promise((resolve) => setTimeout(resolve, batchInterval));
}
}

async function main() {
Expand Down

0 comments on commit 4f64b1d

Please sign in to comment.