Skip to content

Commit

Permalink
Throw an error for concurrent waits (with nice docs link) (#1618)
Browse files Browse the repository at this point in the history
* WIP preventing concurrent waits, throw an error

* Added ConcurrentWaitError (not retryable)

* Move preventMultipleWaits out of the RuntimeAPI

* Added preventMultipleWaits to the devRuntimeManager

* Added throwable InternalError. Plus new TASK_DID_CONCURRENT_WAIT code

* Docs link for troubleshooting concurrent waits

* Docs for troubleshooting concurrent waits

* preventMultipleWaits function

* Added TASK_DID_CONCURRENT_WAIT code

* Deal with InternalErrors that skipRetrying

* Added preventMultipleWaits to prod
  • Loading branch information
matt-aitken authored Jan 16, 2025
1 parent 2ad664d commit dd321e3
Show file tree
Hide file tree
Showing 10 changed files with 221 additions and 79 deletions.
15 changes: 15 additions & 0 deletions docs/troubleshooting.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,21 @@ Your code is deployed separately from the rest of your app(s) so you need to mak

Prisma uses code generation to create the client from your schema file. This means you need to add a bit of config so we can generate this file before your tasks run: [Read the guide](/config/config-file#prisma).

### `Parallel waits are not supported`

In the current version, you can't perform more that one "wait" in parallel.

Waits include:
- `wait.for()`
- `wait.until()`
- `task.triggerAndWait()`
- `task.batchTriggerAndWait()`
- And any of our functions with `wait` in the name.

This restriction exists because we suspend the task server after a wait, and resume it when the wait is done. At the moment, if you do more than one wait, the run will never continue when deployed, so we throw this error instead.

The most common situation this happens is if you're using `Promise.all` around some of our wait functions. Instead of doing this use our built-in functions for [triggering tasks](/triggering#triggering-from-inside-another-task). We have functions that allow you to trigger different tasks in parallel.

### When triggering subtasks the parent task finishes too soon

Make sure that you always use `await` when you call `trigger`, `triggerAndWait`, `batchTrigger`, and `batchTriggerAndWait`. If you don't then it's likely the task(s) won't be triggered because the calling function process can be terminated before the networks calls are sent.
Expand Down
49 changes: 49 additions & 0 deletions packages/core/src/v3/errors.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,37 @@ import { links } from "./links.js";
import { ExceptionEventProperties } from "./schemas/openTelemetry.js";
import { assertExhaustive } from "../utils.js";

/**
* If you throw this, it will get converted into an INTERNAL_ERROR
*/
export class InternalError extends Error {
public readonly code: TaskRunErrorCodes;
public readonly skipRetrying: boolean;

constructor({
code,
message,
showStackTrace = true,
skipRetrying = false,
}: {
code: TaskRunErrorCodes;
message?: string;
showStackTrace?: boolean;
skipRetrying?: boolean;
}) {
super(`${code}: ${message ?? "No message"}`);
this.name = "InternalError";
this.code = code;
this.message = message ?? "InternalError";

if (!showStackTrace) {
this.stack = undefined;
}

this.skipRetrying = skipRetrying;
}
}

export class AbortTaskRunError extends Error {
constructor(message: string) {
super(message);
Expand All @@ -32,6 +63,15 @@ export class TaskPayloadParsedError extends Error {
}

export function parseError(error: unknown): TaskRunError {
if (error instanceof InternalError) {
return {
type: "INTERNAL_ERROR",
code: error.code,
message: error.message,
stackTrace: error.stack ?? "",
};
}

if (error instanceof Error) {
return {
type: "BUILT_IN_ERROR",
Expand Down Expand Up @@ -168,6 +208,7 @@ export function shouldRetryError(error: TaskRunError): boolean {
case "DISK_SPACE_EXCEEDED":
case "TASK_RUN_HEARTBEAT_TIMEOUT":
case "OUTDATED_SDK_VERSION":
case "TASK_DID_CONCURRENT_WAIT":
return false;

case "GRACEFUL_EXIT_TIMEOUT":
Expand Down Expand Up @@ -437,6 +478,14 @@ const prettyInternalErrors: Partial<
href: links.docs.upgrade.beta,
},
},
TASK_DID_CONCURRENT_WAIT: {
message:
"Parallel waits are not supported, e.g. using Promise.all() around our wait functions.",
link: {
name: "Read the docs for solutions",
href: links.docs.troubleshooting.concurrentWaits,
},
},
};

const getPrettyTaskRunError = (code: TaskRunInternalError["code"]): TaskRunInternalError => {
Expand Down
3 changes: 3 additions & 0 deletions packages/core/src/v3/links.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@ export const links = {
upgrade: {
beta: "https://trigger.dev/docs/upgrading-beta",
},
troubleshooting: {
concurrentWaits: "https://trigger.dev/docs/troubleshooting#parallel-waits-are-not-supported",
},
},
site: {
home: "https://trigger.dev",
Expand Down
73 changes: 40 additions & 33 deletions packages/core/src/v3/runtime/devRuntimeManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import {
} from "../schemas/index.js";
import { unboundedTimeout } from "../utils/timers.js";
import { RuntimeManager } from "./manager.js";
import { preventMultipleWaits } from "./preventMultipleWaits.js";

export class DevRuntimeManager implements RuntimeManager {
_taskWaits: Map<string, { resolve: (value: TaskRunExecutionResult) => void }> = new Map();
Expand All @@ -17,71 +18,77 @@ export class DevRuntimeManager implements RuntimeManager {

_pendingCompletionNotifications: Map<string, TaskRunExecutionResult> = new Map();

_preventMultipleWaits = preventMultipleWaits();

disable(): void {
// do nothing
}

async waitForDuration(ms: number): Promise<void> {
await unboundedTimeout(ms);
await this._preventMultipleWaits(() => unboundedTimeout(ms));
}

async waitUntil(date: Date): Promise<void> {
return this.waitForDuration(date.getTime() - Date.now());
}

async waitForTask(params: { id: string; ctx: TaskRunContext }): Promise<TaskRunExecutionResult> {
const pendingCompletion = this._pendingCompletionNotifications.get(params.id);
return this._preventMultipleWaits(async () => {
const pendingCompletion = this._pendingCompletionNotifications.get(params.id);

if (pendingCompletion) {
this._pendingCompletionNotifications.delete(params.id);
if (pendingCompletion) {
this._pendingCompletionNotifications.delete(params.id);

return pendingCompletion;
}
return pendingCompletion;
}

const promise = new Promise<TaskRunExecutionResult>((resolve) => {
this._taskWaits.set(params.id, { resolve });
});
const promise = new Promise<TaskRunExecutionResult>((resolve) => {
this._taskWaits.set(params.id, { resolve });
});

await this.#tryFlushMetadata();
await this.#tryFlushMetadata();

return await promise;
return await promise;
});
}

async waitForBatch(params: {
id: string;
runs: string[];
ctx: TaskRunContext;
}): Promise<BatchTaskRunExecutionResult> {
if (!params.runs.length) {
return Promise.resolve({ id: params.id, items: [] });
}
return this._preventMultipleWaits(async () => {
if (!params.runs.length) {
return Promise.resolve({ id: params.id, items: [] });
}

const promise = Promise.all(
params.runs.map((runId) => {
return new Promise<TaskRunExecutionResult>((resolve, reject) => {
const pendingCompletion = this._pendingCompletionNotifications.get(runId);
const promise = Promise.all(
params.runs.map((runId) => {
return new Promise<TaskRunExecutionResult>((resolve, reject) => {
const pendingCompletion = this._pendingCompletionNotifications.get(runId);

if (pendingCompletion) {
this._pendingCompletionNotifications.delete(runId);
if (pendingCompletion) {
this._pendingCompletionNotifications.delete(runId);

resolve(pendingCompletion);
resolve(pendingCompletion);

return;
}
return;
}

this._taskWaits.set(runId, { resolve });
});
})
);
this._taskWaits.set(runId, { resolve });
});
})
);

await this.#tryFlushMetadata();
await this.#tryFlushMetadata();

const results = await promise;
const results = await promise;

return {
id: params.id,
items: results,
};
return {
id: params.id,
items: results,
};
});
}

resumeTask(completion: TaskRunExecutionResult, runId: string): void {
Expand Down
4 changes: 4 additions & 0 deletions packages/core/src/v3/runtime/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@ import { usage } from "../usage-api.js";

const NOOP_RUNTIME_MANAGER = new NoopRuntimeManager();

/**
* All state must be inside the RuntimeManager, do NOT store it on this class.
* This is because of the "dual package hazard", this can be bundled multiple times.
*/
export class RuntimeAPI {
private static _instance?: RuntimeAPI;

Expand Down
29 changes: 29 additions & 0 deletions packages/core/src/v3/runtime/preventMultipleWaits.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
import { InternalError } from "../errors.js";
import { TaskRunErrorCodes } from "../schemas/common.js";

const concurrentWaitErrorMessage =
"Parallel waits are not supported, e.g. using Promise.all() around our wait functions.";

export function preventMultipleWaits() {
let isExecutingWait = false;

return async <T>(cb: () => Promise<T>): Promise<T> => {
if (isExecutingWait) {
console.error(concurrentWaitErrorMessage);
throw new InternalError({
code: TaskRunErrorCodes.TASK_DID_CONCURRENT_WAIT,
message: concurrentWaitErrorMessage,
skipRetrying: true,
showStackTrace: false,
});
}

isExecutingWait = true;

try {
return await cb();
} finally {
isExecutingWait = false;
}
};
}
Loading

0 comments on commit dd321e3

Please sign in to comment.