From 7e0633c1b04f09d763d87782a9498847dabedf80 Mon Sep 17 00:00:00 2001 From: Rico Huijbers Date: Thu, 27 Jan 2022 18:26:52 +0100 Subject: [PATCH] fix: daily reprocess job interferes with NPM follower (#731) The daily reprocessing job enqueues 60,000 packages all at once every day to be reprocessed. This completely overloads the ECS cluster, with the following consequences: * On-demand work (like the NPM follower) suffers from being drowned out by 60k work items, and our latency on processing package updates spikes to 30 minutes or sometimes even more than an hour, triggering alarms. * Since there is no backpressure, we keep on hammering the ECS cluster trying to start tasks, retrying until it succeeds. Eventually this sometimes still fails and messages end up in the DLQ, triggering alarms and requiring human intervention. This PR mellows the reprocessing driver out a bit, by not having it fill the worker queue as fast as it can: instead, we will feed the system work at about the pace we know it can process it, with a little extra margin for on-demand work. We do that by sleeping in between the redrive batches. Based on estimates I've done, it takes on average about 4 minutes to process a single package. We use 1000 ECS workers to process 60,000 items in 4 hours. This PR raises the delay between batches of 1000 jobs to 5 minutes (20% margin). This will make us process the 60k items in 5 hours instead of 4, but at least we won't be triggering as many alarms as we used to. Backpressure and fair queueing would have been a better solution, but that requires a more thorough rearchitecting of the system. In the meantime, this solution will alleviate the worst pressure. The parameters of the wait calculation are directly available in the code, and will be easy to change once this is deployed and we can see how it behaves. 
---- *By submitting this pull request, I confirm that my contribution is made under the terms of the Apache-2.0 license* --- .../__snapshots__/construct-hub.test.ts.snap | 20 +++++----- .../__snapshots__/snapshot.test.ts.snap | 9 +++-- src/backend/ingestion/index.ts | 37 +++++++++++++++++-- 3 files changed, 50 insertions(+), 16 deletions(-) diff --git a/src/__tests__/__snapshots__/construct-hub.test.ts.snap b/src/__tests__/__snapshots__/construct-hub.test.ts.snap index c2531067d..5b1e2fcd2 100644 --- a/src/__tests__/__snapshots__/construct-hub.test.ts.snap +++ b/src/__tests__/__snapshots__/construct-hub.test.ts.snap @@ -2885,7 +2885,7 @@ Direct link to the function: /lambda/home#/functions/", Object { "Ref": "ConstructHubPackageDataDC5EF35E", }, - "\\",\\"Prefix\\":\\"data/\\"}},\\"Is there more?\\":{\\"Type\\":\\"Choice\\",\\"Choices\\":[{\\"Variable\\":\\"$.response.NextContinuationToken\\",\\"IsPresent\\":true,\\"Next\\":\\"Continue as new\\"}],\\"Default\\":\\"Process Result\\"},\\"S3.ListObjectsV2(NextPage)\\":{\\"Next\\":\\"Is there more?\\",\\"Retry\\":[{\\"ErrorEquals\\":[\\"S3.SdkClientException\\"]}],\\"Type\\":\\"Task\\",\\"ResultPath\\":\\"$.response\\",\\"Resource\\":\\"arn:", + "\\",\\"Prefix\\":\\"data/\\"}},\\"Is there more?\\":{\\"Type\\":\\"Choice\\",\\"Choices\\":[{\\"Variable\\":\\"$.response.NextContinuationToken\\",\\"IsPresent\\":true,\\"Next\\":\\"Give room for on-demand work\\"}],\\"Default\\":\\"Process Result\\"},\\"S3.ListObjectsV2(NextPage)\\":{\\"Next\\":\\"Is there more?\\",\\"Retry\\":[{\\"ErrorEquals\\":[\\"S3.SdkClientException\\"]}],\\"Type\\":\\"Task\\",\\"ResultPath\\":\\"$.response\\",\\"Resource\\":\\"arn:", Object { "Ref": "AWS::Partition", }, @@ -2920,7 +2920,7 @@ Direct link to the function: /lambda/home#/functions/", Object { "Ref": "AWS::AccountId", }, - ":stateMachine:Test.ConstructHub.Ingestion.ReprocessWorkflow\\"}}},\\"TimeoutSeconds\\":3600}", + 
":stateMachine:Test.ConstructHub.Ingestion.ReprocessWorkflow\\"}},\\"Give room for on-demand work\\":{\\"Type\\":\\"Wait\\",\\"Seconds\\":300,\\"Next\\":\\"Continue as new\\"}},\\"TimeoutSeconds\\":3600}", ], ], }, @@ -14390,7 +14390,7 @@ Direct link to the function: /lambda/home#/functions/", Object { "Ref": "ConstructHubPackageDataDC5EF35E", }, - "\\",\\"Prefix\\":\\"data/\\"}},\\"Is there more?\\":{\\"Type\\":\\"Choice\\",\\"Choices\\":[{\\"Variable\\":\\"$.response.NextContinuationToken\\",\\"IsPresent\\":true,\\"Next\\":\\"Continue as new\\"}],\\"Default\\":\\"Process Result\\"},\\"S3.ListObjectsV2(NextPage)\\":{\\"Next\\":\\"Is there more?\\",\\"Retry\\":[{\\"ErrorEquals\\":[\\"S3.SdkClientException\\"]}],\\"Type\\":\\"Task\\",\\"ResultPath\\":\\"$.response\\",\\"Resource\\":\\"arn:", + "\\",\\"Prefix\\":\\"data/\\"}},\\"Is there more?\\":{\\"Type\\":\\"Choice\\",\\"Choices\\":[{\\"Variable\\":\\"$.response.NextContinuationToken\\",\\"IsPresent\\":true,\\"Next\\":\\"Give room for on-demand work\\"}],\\"Default\\":\\"Process Result\\"},\\"S3.ListObjectsV2(NextPage)\\":{\\"Next\\":\\"Is there more?\\",\\"Retry\\":[{\\"ErrorEquals\\":[\\"S3.SdkClientException\\"]}],\\"Type\\":\\"Task\\",\\"ResultPath\\":\\"$.response\\",\\"Resource\\":\\"arn:", Object { "Ref": "AWS::Partition", }, @@ -14425,7 +14425,7 @@ Direct link to the function: /lambda/home#/functions/", Object { "Ref": "AWS::AccountId", }, - ":stateMachine:Test.ConstructHub.Ingestion.ReprocessWorkflow\\"}}},\\"TimeoutSeconds\\":3600}", + ":stateMachine:Test.ConstructHub.Ingestion.ReprocessWorkflow\\"}},\\"Give room for on-demand work\\":{\\"Type\\":\\"Wait\\",\\"Seconds\\":300,\\"Next\\":\\"Continue as new\\"}},\\"TimeoutSeconds\\":3600}", ], ], }, @@ -25729,7 +25729,7 @@ Direct link to the function: /lambda/home#/functions/", Object { "Ref": "ConstructHubPackageDataDC5EF35E", }, - "\\",\\"Prefix\\":\\"data/\\"}},\\"Is there 
more?\\":{\\"Type\\":\\"Choice\\",\\"Choices\\":[{\\"Variable\\":\\"$.response.NextContinuationToken\\",\\"IsPresent\\":true,\\"Next\\":\\"Continue as new\\"}],\\"Default\\":\\"Process Result\\"},\\"S3.ListObjectsV2(NextPage)\\":{\\"Next\\":\\"Is there more?\\",\\"Retry\\":[{\\"ErrorEquals\\":[\\"S3.SdkClientException\\"]}],\\"Type\\":\\"Task\\",\\"ResultPath\\":\\"$.response\\",\\"Resource\\":\\"arn:", + "\\",\\"Prefix\\":\\"data/\\"}},\\"Is there more?\\":{\\"Type\\":\\"Choice\\",\\"Choices\\":[{\\"Variable\\":\\"$.response.NextContinuationToken\\",\\"IsPresent\\":true,\\"Next\\":\\"Give room for on-demand work\\"}],\\"Default\\":\\"Process Result\\"},\\"S3.ListObjectsV2(NextPage)\\":{\\"Next\\":\\"Is there more?\\",\\"Retry\\":[{\\"ErrorEquals\\":[\\"S3.SdkClientException\\"]}],\\"Type\\":\\"Task\\",\\"ResultPath\\":\\"$.response\\",\\"Resource\\":\\"arn:", Object { "Ref": "AWS::Partition", }, @@ -25764,7 +25764,7 @@ Direct link to the function: /lambda/home#/functions/", Object { "Ref": "AWS::AccountId", }, - ":stateMachine:Test.ConstructHub.Ingestion.ReprocessWorkflow\\"}}},\\"TimeoutSeconds\\":3600}", + ":stateMachine:Test.ConstructHub.Ingestion.ReprocessWorkflow\\"}},\\"Give room for on-demand work\\":{\\"Type\\":\\"Wait\\",\\"Seconds\\":300,\\"Next\\":\\"Continue as new\\"}},\\"TimeoutSeconds\\":3600}", ], ], }, @@ -37098,7 +37098,7 @@ Direct link to the function: /lambda/home#/functions/", Object { "Ref": "ConstructHubPackageDataDC5EF35E", }, - "\\",\\"Prefix\\":\\"data/\\"}},\\"Is there more?\\":{\\"Type\\":\\"Choice\\",\\"Choices\\":[{\\"Variable\\":\\"$.response.NextContinuationToken\\",\\"IsPresent\\":true,\\"Next\\":\\"Continue as new\\"}],\\"Default\\":\\"Process Result\\"},\\"S3.ListObjectsV2(NextPage)\\":{\\"Next\\":\\"Is there more?\\",\\"Retry\\":[{\\"ErrorEquals\\":[\\"S3.SdkClientException\\"]}],\\"Type\\":\\"Task\\",\\"ResultPath\\":\\"$.response\\",\\"Resource\\":\\"arn:", + "\\",\\"Prefix\\":\\"data/\\"}},\\"Is there 
more?\\":{\\"Type\\":\\"Choice\\",\\"Choices\\":[{\\"Variable\\":\\"$.response.NextContinuationToken\\",\\"IsPresent\\":true,\\"Next\\":\\"Give room for on-demand work\\"}],\\"Default\\":\\"Process Result\\"},\\"S3.ListObjectsV2(NextPage)\\":{\\"Next\\":\\"Is there more?\\",\\"Retry\\":[{\\"ErrorEquals\\":[\\"S3.SdkClientException\\"]}],\\"Type\\":\\"Task\\",\\"ResultPath\\":\\"$.response\\",\\"Resource\\":\\"arn:", Object { "Ref": "AWS::Partition", }, @@ -37133,7 +37133,7 @@ Direct link to the function: /lambda/home#/functions/", Object { "Ref": "AWS::AccountId", }, - ":stateMachine:Test.ConstructHub.Ingestion.ReprocessWorkflow\\"}}},\\"TimeoutSeconds\\":3600}", + ":stateMachine:Test.ConstructHub.Ingestion.ReprocessWorkflow\\"}},\\"Give room for on-demand work\\":{\\"Type\\":\\"Wait\\",\\"Seconds\\":300,\\"Next\\":\\"Continue as new\\"}},\\"TimeoutSeconds\\":3600}", ], ], }, @@ -48693,7 +48693,7 @@ Direct link to the function: /lambda/home#/functions/", Object { "Ref": "ConstructHubPackageDataDC5EF35E", }, - "\\",\\"Prefix\\":\\"data/\\"}},\\"Is there more?\\":{\\"Type\\":\\"Choice\\",\\"Choices\\":[{\\"Variable\\":\\"$.response.NextContinuationToken\\",\\"IsPresent\\":true,\\"Next\\":\\"Continue as new\\"}],\\"Default\\":\\"Process Result\\"},\\"S3.ListObjectsV2(NextPage)\\":{\\"Next\\":\\"Is there more?\\",\\"Retry\\":[{\\"ErrorEquals\\":[\\"S3.SdkClientException\\"]}],\\"Type\\":\\"Task\\",\\"ResultPath\\":\\"$.response\\",\\"Resource\\":\\"arn:", + "\\",\\"Prefix\\":\\"data/\\"}},\\"Is there more?\\":{\\"Type\\":\\"Choice\\",\\"Choices\\":[{\\"Variable\\":\\"$.response.NextContinuationToken\\",\\"IsPresent\\":true,\\"Next\\":\\"Give room for on-demand work\\"}],\\"Default\\":\\"Process Result\\"},\\"S3.ListObjectsV2(NextPage)\\":{\\"Next\\":\\"Is there more?\\",\\"Retry\\":[{\\"ErrorEquals\\":[\\"S3.SdkClientException\\"]}],\\"Type\\":\\"Task\\",\\"ResultPath\\":\\"$.response\\",\\"Resource\\":\\"arn:", Object { "Ref": "AWS::Partition", }, @@ -48728,7 +48728,7 
@@ Direct link to the function: /lambda/home#/functions/", Object { "Ref": "AWS::AccountId", }, - ":stateMachine:Test.ConstructHub.Ingestion.ReprocessWorkflow\\"}}},\\"TimeoutSeconds\\":3600}", + ":stateMachine:Test.ConstructHub.Ingestion.ReprocessWorkflow\\"}},\\"Give room for on-demand work\\":{\\"Type\\":\\"Wait\\",\\"Seconds\\":300,\\"Next\\":\\"Continue as new\\"}},\\"TimeoutSeconds\\":3600}", ], ], }, diff --git a/src/__tests__/devapp/__snapshots__/snapshot.test.ts.snap b/src/__tests__/devapp/__snapshots__/snapshot.test.ts.snap index 9457f87b5..edb805bf8 100644 --- a/src/__tests__/devapp/__snapshots__/snapshot.test.ts.snap +++ b/src/__tests__/devapp/__snapshots__/snapshot.test.ts.snap @@ -4843,8 +4843,8 @@ Resources: - :states:::aws-sdk:s3:listObjectsV2","Parameters":{"Bucket":" - Ref: ConstructHubPackageDataDC5EF35E - '","Prefix":"data/"}},"Is there - more?":{"Type":"Choice","Choices":[{"Variable":"$.response.NextContinuationToken","IsPresent":true,"Next":"Continue - as new"}],"Default":"Process + more?":{"Type":"Choice","Choices":[{"Variable":"$.response.NextContinuationToken","IsPresent":true,"Next":"Give + room for on-demand work"}],"Default":"Process Result"},"S3.ListObjectsV2(NextPage)":{"Next":"Is there more?","Retry":[{"ErrorEquals":["S3.SdkClientException"]}],"Type":"Task","ResultPath":"$.response","Resource":"arn:' - Ref: AWS::Partition @@ -4872,7 +4872,10 @@ Resources: - Ref: AWS::Region - ":" - Ref: AWS::AccountId - - :stateMachine:dev.ConstructHub.Ingestion.ReprocessWorkflow"}}},"TimeoutSeconds":3600} + - :stateMachine:dev.ConstructHub.Ingestion.ReprocessWorkflow"}},"Give + room for on-demand + work":{"Type":"Wait","Seconds":300,"Next":"Continue as + new"}},"TimeoutSeconds":3600} StateMachineName: dev.ConstructHub.Ingestion.ReprocessWorkflow DependsOn: - ConstructHubIngestionReprocessWorkflowStateMachineRoleDefaultPolicy3A21E747 diff --git a/src/backend/ingestion/index.ts b/src/backend/ingestion/index.ts index 7917309fc..9b0bcb823 100644 --- 
a/src/backend/ingestion/index.ts +++ b/src/backend/ingestion/index.ts @@ -8,7 +8,7 @@ import { RetentionDays } from '@aws-cdk/aws-logs'; import { BlockPublicAccess, IBucket } from '@aws-cdk/aws-s3'; import { BucketDeployment, Source } from '@aws-cdk/aws-s3-deployment'; import { IQueue, Queue, QueueEncryption } from '@aws-cdk/aws-sqs'; -import { StateMachine, JsonPath, Choice, Succeed, Condition, Map, TaskInput, IntegrationPattern } from '@aws-cdk/aws-stepfunctions'; +import { StateMachine, JsonPath, Choice, Succeed, Condition, Map, TaskInput, IntegrationPattern, Wait, WaitTime } from '@aws-cdk/aws-stepfunctions'; import { CallAwsService, LambdaInvoke, StepFunctionsStartExecution } from '@aws-cdk/aws-stepfunctions-tasks'; import { Construct, Duration, Stack, ArnFormat } from '@aws-cdk/core'; import { Repository } from '../../codeartifact/repository'; @@ -386,7 +386,13 @@ class ReprocessIngestionWorkflow extends Construct { new Choice(this, 'Is there more?') .when( Condition.isPresent('$.response.NextContinuationToken'), - new StepFunctionsStartExecution(this, 'Continue as new', { + + new Wait(this, 'Give room for on-demand work', { + // Sleep a little before enqueuing the next batch, so that we leave room in the worker + // pool for handling on-demand work. If we don't do this, 60k items will be queued at + // once and live updates from NPM will struggle to get in in a reasonable time. 
+ time: WaitTime.duration(waitTimeBetweenReprocessBatches()), + }).next(new StepFunctionsStartExecution(this, 'Continue as new', { associateWithParent: true, stateMachine: StateMachine.fromStateMachineArn(this, 'ThisStateMachine', Stack.of(this).formatArn({ arnFormat: ArnFormat.COLON_RESOURCE_NAME, @@ -397,7 +403,7 @@ class ReprocessIngestionWorkflow extends Construct { input: TaskInput.fromObject({ ContinuationToken: JsonPath.stringAt('$.response.NextContinuationToken') }), integrationPattern: IntegrationPattern.REQUEST_RESPONSE, resultPath: JsonPath.DISCARD, - }).addRetry({ errors: ['StepFunctions.ExecutionLimitExceeded'] }), + }).addRetry({ errors: ['StepFunctions.ExecutionLimitExceeded'] })), ).afterwards({ includeOtherwise: true }) .next(process), ); @@ -422,3 +428,28 @@ function stateMachineNameFrom(nodePath: string): string { // Poor man's replace all... return nodePath.split(/[^a-z0-9+!@.()=_'-]+/i).join('.'); } + +/** + * The time we wait between enqueueing different batches of the reprocessing machine + */ +function waitTimeBetweenReprocessBatches() { + // Average time per ECS task. We don't have statistics on this, but + // it can be roughly derived from: + + // Every day we process about 60k items with 1000 workers in 4 hours. + // 4 hours / (60_000 / 1000) ~= 4 minutes + const avgTimePerTask = Duration.minutes(4); + + // How many objects are returned by 'listObjectsV2', per call + const batchSize = 1000; + + // How many workers we have at our disposal + const workers = 1000; + + // What fraction of capacity [0..1) we want to keep available for on-demand + // work, while reprocessing. + const marginFrac = 0.2; + + const seconds = (avgTimePerTask.toSeconds() / (1 - marginFrac)) * (batchSize / workers); + return Duration.seconds(Math.floor(seconds)); +} \ No newline at end of file