From 59ebf5d91626927619d20e0656b2ed2ec538c0e4 Mon Sep 17 00:00:00 2001 From: Josh Wulf Date: Thu, 23 Jan 2025 17:20:27 +1300 Subject: [PATCH] fix(zeebe): propagate 'NOT_FOUND' error on job.complete(). fixes #351 --- src/__tests__/modeler/modeler.unit.spec.ts | 1 - .../testdata/job-complete-error-test.bpmn | 48 +++++++++++++++ .../zeebe/integration/Worker-Failure.spec.ts | 10 ++-- .../integration/Worker-integration.spec.ts | 58 ++++++++++++++++++- src/zeebe/lib/ZBWorkerBase.ts | 2 +- 5 files changed, 112 insertions(+), 7 deletions(-) create mode 100644 src/__tests__/testdata/job-complete-error-test.bpmn diff --git a/src/__tests__/modeler/modeler.unit.spec.ts b/src/__tests__/modeler/modeler.unit.spec.ts index 580f8884..85797b79 100644 --- a/src/__tests__/modeler/modeler.unit.spec.ts +++ b/src/__tests__/modeler/modeler.unit.spec.ts @@ -17,7 +17,6 @@ test('Constructor does not throws without base url', () => { }) expect(m).toBeTruthy() } catch (e) { - console.log(e) expect((e as Error).message.includes('Missing')).toBe(true) } expect(thrown).toBe(false) diff --git a/src/__tests__/testdata/job-complete-error-test.bpmn b/src/__tests__/testdata/job-complete-error-test.bpmn new file mode 100644 index 00000000..5045ff2e --- /dev/null +++ b/src/__tests__/testdata/job-complete-error-test.bpmn @@ -0,0 +1,48 @@ + + + + + Flow_1ag9e6h + + + + Flow_116nk1q + + + + + + + Flow_1ag9e6h + Flow_116nk1q + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/src/__tests__/zeebe/integration/Worker-Failure.spec.ts b/src/__tests__/zeebe/integration/Worker-Failure.spec.ts index c8d17757..348982b8 100644 --- a/src/__tests__/zeebe/integration/Worker-Failure.spec.ts +++ b/src/__tests__/zeebe/integration/Worker-Failure.spec.ts @@ -118,14 +118,16 @@ test('Does not fail a process when the handler throws, by default', async () => taskType: 'console-log-worker-failure-2', taskHandler: async (job) => { if (alreadyFailed) { - await zbc.cancelProcessInstance(wf!.processInstanceKey) // throws if not found. Should NOT throw in this test - job.complete() - return w.close().then(() => resolve(null)) + // The job was failed and reactivated, so we can cancel the process instance + await zbc.cancelProcessInstance(wf!.processInstanceKey) // throws if process instance is not found. Should NOT throw in this test + resolve(null) + w.close() + return job.forward() } alreadyFailed = true throw new Error( 'Unhandled exception in task handler for testing purposes' - ) // Will be caught in the library + ) // Will be caught in the library, and should fail the job, allowing it to be reactivated by this worker }, pollInterval: 10000, }) diff --git a/src/__tests__/zeebe/integration/Worker-integration.spec.ts b/src/__tests__/zeebe/integration/Worker-integration.spec.ts index bc0b1626..80723e35 100644 --- a/src/__tests__/zeebe/integration/Worker-integration.spec.ts +++ b/src/__tests__/zeebe/integration/Worker-integration.spec.ts @@ -1,9 +1,11 @@ +import { JOB_ACTION_ACKNOWLEDGEMENT } from 'zeebe/types' + import { restoreZeebeLogging, suppressZeebeLogging } from '../../../lib' import { ZeebeGrpcClient } from '../../../zeebe' import { cancelProcesses } from '../../../zeebe/lib/cancelProcesses' import { CreateProcessInstanceResponse } from '../../../zeebe/lib/interfaces-grpc-1.0' -jest.setTimeout(30000) +jest.setTimeout(120000) suppressZeebeLogging() const zbc = new ZeebeGrpcClient() @@ -11,9 +13,11 @@ let wf: CreateProcessInstanceResponse | undefined let processDefinitionKey1: string let processDefinitionKey2: string let processDefinitionKey3: string +let processDefinitionKey4: string let bpmnProcessId1: string let bpmnProcessId2: string let bpmnProcessId3: string +let bpmnProcessId4: string beforeAll(async () => { const res1 = await zbc.deployResource({ @@ -40,6 +44,13 @@ beforeAll(async () => { bpmnProcessId: bpmnProcessId3, } = res3.deployments[0].process) await cancelProcesses(processDefinitionKey3) + const res4 = await zbc.deployResource({ + processFilename: './src/__tests__/testdata/job-complete-error-test.bpmn', + }) + ;({ + processDefinitionKey: processDefinitionKey4, + bpmnProcessId: bpmnProcessId4, + } = res4.deployments[0].process) }) afterEach(async () => { @@ -53,6 +64,7 @@ afterAll(async () => { await cancelProcesses(processDefinitionKey1) await cancelProcesses(processDefinitionKey2) await cancelProcesses(processDefinitionKey3) + await cancelProcesses(processDefinitionKey4) restoreZeebeLogging() }) @@ -98,6 +110,50 @@ test('Can service a task with complete.success', (done) => { }) }) +test('An already completed job will throw NOT_FOUND if another worker invocation tries to complete it', (done) => { + let alreadyActivated = false + let threw = false + const jobTimeout = 30000 // The job is made available for reactivation after this time + const jobDuration = 40000 // The job takes this long to complete + const secondWorkerDuration = jobDuration - jobTimeout + 5000 // The second worker will try to complete the job after this time + zbc + .createProcessInstance({ + bpmnProcessId: bpmnProcessId4, + variables: {}, + }) + .then((res) => { + wf = res + zbc.createWorker({ + taskType: 'job-complete-error', + taskHandler: async (job) => { + const delay = alreadyActivated ? secondWorkerDuration : jobDuration + const shouldThrow = alreadyActivated + alreadyActivated = true + expect(job.processInstanceKey).toBe(wf?.processInstanceKey) + let res: JOB_ACTION_ACKNOWLEDGEMENT = 'JOB_ACTION_ACKNOWLEDGEMENT' + try { + await new Promise((resolve) => + setTimeout(() => resolve(null), delay) + ) + res = await job.complete(job.variables) + if (shouldThrow) { + throw new Error('Should have thrown NOT_FOUND') + } + return res + } catch (e: unknown) { + expect((e as Error).message.includes('NOT_FOUND')).toBe(true) + threw = true + done(null) + } + expect(shouldThrow).toBe(threw) + return res + }, + loglevel: 'NONE', + timeout: jobTimeout, + }) + }) +}) + test('Can update process variables with complete.success()', async () => { wf = await zbc.createProcessInstance({ bpmnProcessId: bpmnProcessId3, diff --git a/src/zeebe/lib/ZBWorkerBase.ts b/src/zeebe/lib/ZBWorkerBase.ts index 23c5ff67..20598230 100644 --- a/src/zeebe/lib/ZBWorkerBase.ts +++ b/src/zeebe/lib/ZBWorkerBase.ts @@ -416,7 +416,7 @@ You should call only one job action method in the worker handler. This is a bug this.logger.logDebug( `Completing job ${jobKey} for ${this.taskType} threw ${e.message}` ) - return e + throw e }) .then(() => ZB.JOB_ACTION_ACKNOWLEDGEMENT) .finally(() => {