fix: daily reprocess job interferes with NPM follower (#731)
The daily reprocessing job enqueues all 60,000 packages at once. This
completely overloads the ECS cluster, with the following consequences:

* On-demand work (like the NPM follower) is drowned out by the 60k
  work items, and our latency on processing package updates spikes to
  30 minutes or sometimes even more than an hour, triggering alarms.
* Since there is no backpressure, we keep hammering the ECS cluster
  trying to start tasks, retrying until it succeeds. Some of these
  attempts still fail in the end and the messages end up in the DLQ,
  triggering alarms and requiring human intervention.

This PR mellows the reprocessing driver out a bit, by not having it fill
the worker queue as fast as it can: instead, we will feed the system
work at about the pace we know it can process it, with a little extra
margin for on-demand work. We do that by sleeping in between the redrive
batches.

Based on estimates I've done, it takes on average about 4 minutes to
process a single package: we use 1000 ECS workers to process 60,000
items in 4 hours, which is 60 rounds of 1000 parallel tasks at about
4 minutes per round.

This PR introduces a delay of 5 minutes between batches of 1000 jobs:
4 minutes / (1 - 0.2), which keeps a 20% margin of worker capacity free
for on-demand work. This will make us process the 60k items in 5 hours
instead of 4, but at least we won't be triggering as many alarms as we
used to.
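
The arithmetic above can be reproduced in a few lines. This is a
standalone sketch, not the code in this PR: it uses plain numbers instead
of the CDK `Duration` class, and the variable names are illustrative. The
total run time is approximated as "number of batches times delay", which
matches the 5 hour figure quoted above.

```typescript
// Figures taken from the description above.
const totalItems = 60_000;   // packages reprocessed per day
const workers = 1_000;       // ECS workers available
const batchSize = 1_000;     // jobs enqueued per batch
const observedRunHours = 4;  // how long a full, unpaced run takes today
const marginFrac = 0.2;      // capacity fraction kept free for on-demand work

// 60,000 items on 1000 workers means 60 "rounds" of parallel tasks.
const rounds = totalItems / workers;

// 4 hours spread over 60 rounds gives about 240 seconds (4 minutes) per task.
const avgTaskSeconds = (observedRunHours * 3600) / rounds;

// Stretch the per-batch time so that only (1 - marginFrac) of capacity is used.
const delaySeconds = Math.floor(
  (avgTaskSeconds / (1 - marginFrac)) * (batchSize / workers),
);

// Approximate duration of a full paced run.
const batches = Math.ceil(totalItems / batchSize);
const totalHours = (batches * delaySeconds) / 3600;

console.log(`delay between batches: ${delaySeconds / 60} minutes`);
console.log(`full reprocessing run: about ${totalHours} hours`);
```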

Backpressure and fair queueing would have been a better solution,
but that requires a more thorough rearchitecting of the system. In the
meantime, this solution will alleviate the worst pressure.

The parameters of the wait calculation are directly available in the
code, and will be easy to change once this is deployed and we can
see how it behaves.
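
To get a feel for how those parameters interact before touching the
deployed values, the calculation can be tried out as a plain function.
This is only a sketch: `PacingParams` and `batchDelaySeconds` are
hypothetical names that do not exist in the PR, and the range check is an
added assumption. The formula itself mirrors the one in the diff.

```typescript
// Hypothetical parameter bundle; field names mirror the constants in the PR.
interface PacingParams {
  avgTaskSeconds: number; // average time one ECS task takes
  batchSize: number;      // jobs enqueued per batch
  workers: number;        // workers available
  marginFrac: number;     // capacity fraction in [0, 1) kept for on-demand work
}

// Seconds to wait between two batches, rounded down to whole seconds.
function batchDelaySeconds(p: PacingParams): number {
  if (p.marginFrac < 0 || p.marginFrac >= 1) {
    throw new RangeError('marginFrac must be in [0, 1)');
  }
  return Math.floor(
    (p.avgTaskSeconds / (1 - p.marginFrac)) * (p.batchSize / p.workers),
  );
}

// Values from this PR.
const base: PacingParams = {
  avgTaskSeconds: 240,
  batchSize: 1000,
  workers: 1000,
  marginFrac: 0.2,
};

// Show how the delay grows as more capacity is reserved.
for (const marginFrac of [0, 0.2, 0.25, 0.5]) {
  const delay = batchDelaySeconds({ ...base, marginFrac });
  console.log(`margin ${marginFrac}: ${delay}s between batches`);
}
```

A larger margin leaves more room for the NPM follower at the cost of a
longer reprocessing run; doubling the number of workers halves the delay.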


----

*By submitting this pull request, I confirm that my contribution is made under the terms of the Apache-2.0 license*
rix0rrr authored Jan 27, 2022
1 parent 387cdaa commit 7e0633c
Showing 3 changed files with 50 additions and 16 deletions.
20 changes: 10 additions & 10 deletions src/__tests__/__snapshots__/construct-hub.test.ts.snap


9 changes: 6 additions & 3 deletions src/__tests__/devapp/__snapshots__/snapshot.test.ts.snap


37 changes: 34 additions & 3 deletions src/backend/ingestion/index.ts
@@ -8,7 +8,7 @@ import { RetentionDays } from '@aws-cdk/aws-logs';
import { BlockPublicAccess, IBucket } from '@aws-cdk/aws-s3';
import { BucketDeployment, Source } from '@aws-cdk/aws-s3-deployment';
import { IQueue, Queue, QueueEncryption } from '@aws-cdk/aws-sqs';
-import { StateMachine, JsonPath, Choice, Succeed, Condition, Map, TaskInput, IntegrationPattern } from '@aws-cdk/aws-stepfunctions';
+import { StateMachine, JsonPath, Choice, Succeed, Condition, Map, TaskInput, IntegrationPattern, Wait, WaitTime } from '@aws-cdk/aws-stepfunctions';
import { CallAwsService, LambdaInvoke, StepFunctionsStartExecution } from '@aws-cdk/aws-stepfunctions-tasks';
import { Construct, Duration, Stack, ArnFormat } from '@aws-cdk/core';
import { Repository } from '../../codeartifact/repository';
@@ -386,7 +386,13 @@ class ReprocessIngestionWorkflow extends Construct {
new Choice(this, 'Is there more?')
.when(
Condition.isPresent('$.response.NextContinuationToken'),
-new StepFunctionsStartExecution(this, 'Continue as new', {
+
+new Wait(this, 'Give room for on-demand work', {
+// Sleep a little before enqueuing the next batch, so that we leave room in the worker
+// pool for handling on-demand work. If we don't do this, 60k items will be queued at
+// once and live updates from NPM will struggle to get in in a reasonable time.
+time: WaitTime.duration(waitTimeBetweenReprocessBatches()),
+}).next(new StepFunctionsStartExecution(this, 'Continue as new', {
associateWithParent: true,
stateMachine: StateMachine.fromStateMachineArn(this, 'ThisStateMachine', Stack.of(this).formatArn({
arnFormat: ArnFormat.COLON_RESOURCE_NAME,
@@ -397,7 +403,7 @@ class ReprocessIngestionWorkflow extends Construct {
input: TaskInput.fromObject({ ContinuationToken: JsonPath.stringAt('$.response.NextContinuationToken') }),
integrationPattern: IntegrationPattern.REQUEST_RESPONSE,
resultPath: JsonPath.DISCARD,
-}).addRetry({ errors: ['StepFunctions.ExecutionLimitExceeded'] }),
+}).addRetry({ errors: ['StepFunctions.ExecutionLimitExceeded'] })),
).afterwards({ includeOtherwise: true })
.next(process),
);
@@ -422,3 +428,28 @@ function stateMachineNameFrom(nodePath: string): string {
// Poor man's replace all...
return nodePath.split(/[^a-z0-9+!@.()=_'-]+/i).join('.');
}

+/**
+ * The time we wait between enqueueing different batches of the reprocessing machine
+ */
+function waitTimeBetweenReprocessBatches() {
+  // Average time per ECS task. We don't have statistics on this, but it
+  // can be roughly derived from:
+
+  // Every day we process about 60k items with 1000 workers in 4 hours.
+  // 4 hours / (60_000 / 1000) ~= 4 minutes
+  const avgTimePerTask = Duration.minutes(4);
+
+  // How many objects are returned by 'listObjectsV2', per call
+  const batchSize = 1000;
+
+  // How many workers we have at our disposal
+  const workers = 1000;
+
+  // What fraction of capacity [0..1) we want to keep available for on-demand
+  // work, while reprocessing.
+  const marginFrac = 0.2;
+
+  const seconds = (avgTimePerTask.toSeconds() / (1 - marginFrac)) * (batchSize / workers);
+  return Duration.seconds(Math.floor(seconds));
+}
