Skip to content

Commit

Permalink
Merge pull request #352 from camunda/fix-351
Browse files Browse the repository at this point in the history
Fix-351
  • Loading branch information
jwulf authored Jan 23, 2025
2 parents 39cc399 + 59ebf5d commit 4498144
Show file tree
Hide file tree
Showing 5 changed files with 112 additions and 7 deletions.
1 change: 0 additions & 1 deletion src/__tests__/modeler/modeler.unit.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ test('Constructor does not throws without base url', () => {
})
expect(m).toBeTruthy()
} catch (e) {
console.log(e)
expect((e as Error).message.includes('Missing')).toBe(true)
}
expect(thrown).toBe(false)
Expand Down
48 changes: 48 additions & 0 deletions src/__tests__/testdata/job-complete-error-test.bpmn
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
<?xml version="1.0" encoding="UTF-8"?>
<bpmn:definitions xmlns:bpmn="http://www.omg.org/spec/BPMN/20100524/MODEL" xmlns:bpmndi="http://www.omg.org/spec/BPMN/20100524/DI" xmlns:dc="http://www.omg.org/spec/DD/20100524/DC" xmlns:zeebe="http://camunda.org/schema/zeebe/1.0" xmlns:di="http://www.omg.org/spec/DD/20100524/DI" xmlns:modeler="http://camunda.org/schema/modeler/1.0" id="Definitions_0wy2we3" targetNamespace="http://bpmn.io/schema/bpmn" exporter="Camunda Modeler" exporterVersion="5.30.0" modeler:executionPlatform="Camunda Cloud" modeler:executionPlatformVersion="8.6.0">
<bpmn:process id="job-complete-error-test" name="Job Complete Error Test" isExecutable="true">
<bpmn:startEvent id="StartEvent_1" name="Start Job Complete Error Test">
<bpmn:outgoing>Flow_1ag9e6h</bpmn:outgoing>
</bpmn:startEvent>
<bpmn:sequenceFlow id="Flow_1ag9e6h" sourceRef="StartEvent_1" targetRef="Activity_11vo52k" />
<bpmn:endEvent id="Event_14s7zgm" name="Completed Job Complete Error Test">
<bpmn:incoming>Flow_116nk1q</bpmn:incoming>
</bpmn:endEvent>
<bpmn:sequenceFlow id="Flow_116nk1q" sourceRef="Activity_11vo52k" targetRef="Event_14s7zgm" />
<bpmn:serviceTask id="Activity_11vo52k" name="Check Job Complete Error">
<bpmn:extensionElements>
<zeebe:taskDefinition type="job-complete-error" />
</bpmn:extensionElements>
<bpmn:incoming>Flow_1ag9e6h</bpmn:incoming>
<bpmn:outgoing>Flow_116nk1q</bpmn:outgoing>
</bpmn:serviceTask>
</bpmn:process>
<bpmndi:BPMNDiagram id="BPMNDiagram_1">
<bpmndi:BPMNPlane id="BPMNPlane_1" bpmnElement="job-complete-error-test">
<bpmndi:BPMNShape id="StartEvent_1_di" bpmnElement="StartEvent_1">
<dc:Bounds x="182" y="102" width="36" height="36" />
<bpmndi:BPMNLabel>
<dc:Bounds x="163" y="145" width="75" height="40" />
</bpmndi:BPMNLabel>
</bpmndi:BPMNShape>
<bpmndi:BPMNShape id="Event_14s7zgm_di" bpmnElement="Event_14s7zgm">
<dc:Bounds x="422" y="102" width="36" height="36" />
<bpmndi:BPMNLabel>
<dc:Bounds x="403" y="145" width="75" height="40" />
</bpmndi:BPMNLabel>
</bpmndi:BPMNShape>
<bpmndi:BPMNShape id="Activity_0yo5w5b_di" bpmnElement="Activity_11vo52k">
<dc:Bounds x="270" y="80" width="100" height="80" />
<bpmndi:BPMNLabel />
</bpmndi:BPMNShape>
<bpmndi:BPMNEdge id="Flow_1ag9e6h_di" bpmnElement="Flow_1ag9e6h">
<di:waypoint x="218" y="120" />
<di:waypoint x="270" y="120" />
</bpmndi:BPMNEdge>
<bpmndi:BPMNEdge id="Flow_116nk1q_di" bpmnElement="Flow_116nk1q">
<di:waypoint x="370" y="120" />
<di:waypoint x="422" y="120" />
</bpmndi:BPMNEdge>
</bpmndi:BPMNPlane>
</bpmndi:BPMNDiagram>
</bpmn:definitions>
10 changes: 6 additions & 4 deletions src/__tests__/zeebe/integration/Worker-Failure.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -118,14 +118,16 @@ test('Does not fail a process when the handler throws, by default', async () =>
taskType: 'console-log-worker-failure-2',
taskHandler: async (job) => {
if (alreadyFailed) {
await zbc.cancelProcessInstance(wf!.processInstanceKey) // throws if not found. Should NOT throw in this test
job.complete()
return w.close().then(() => resolve(null))
// The job was failed and reactivated, so we can cancel the process instance
await zbc.cancelProcessInstance(wf!.processInstanceKey) // throws if process instance is not found. Should NOT throw in this test
resolve(null)
w.close()
return job.forward()
}
alreadyFailed = true
throw new Error(
'Unhandled exception in task handler for testing purposes'
) // Will be caught in the library
) // Will be caught in the library, and should fail the job, allowing it to be reactivated by this worker
},
pollInterval: 10000,
})
Expand Down
58 changes: 57 additions & 1 deletion src/__tests__/zeebe/integration/Worker-integration.spec.ts
Original file line number Diff line number Diff line change
@@ -1,19 +1,23 @@
import { JOB_ACTION_ACKNOWLEDGEMENT } from 'zeebe/types'

import { restoreZeebeLogging, suppressZeebeLogging } from '../../../lib'
import { ZeebeGrpcClient } from '../../../zeebe'
import { cancelProcesses } from '../../../zeebe/lib/cancelProcesses'
import { CreateProcessInstanceResponse } from '../../../zeebe/lib/interfaces-grpc-1.0'

jest.setTimeout(30000)
jest.setTimeout(120000)
suppressZeebeLogging()

const zbc = new ZeebeGrpcClient()
let wf: CreateProcessInstanceResponse | undefined
let processDefinitionKey1: string
let processDefinitionKey2: string
let processDefinitionKey3: string
let processDefinitionKey4: string
let bpmnProcessId1: string
let bpmnProcessId2: string
let bpmnProcessId3: string
let bpmnProcessId4: string

beforeAll(async () => {
const res1 = await zbc.deployResource({
Expand All @@ -40,6 +44,13 @@ beforeAll(async () => {
bpmnProcessId: bpmnProcessId3,
} = res3.deployments[0].process)
await cancelProcesses(processDefinitionKey3)
const res4 = await zbc.deployResource({
processFilename: './src/__tests__/testdata/job-complete-error-test.bpmn',
})
;({
processDefinitionKey: processDefinitionKey4,
bpmnProcessId: bpmnProcessId4,
} = res4.deployments[0].process)
})

afterEach(async () => {
Expand All @@ -53,6 +64,7 @@ afterAll(async () => {
await cancelProcesses(processDefinitionKey1)
await cancelProcesses(processDefinitionKey2)
await cancelProcesses(processDefinitionKey3)
await cancelProcesses(processDefinitionKey4)
restoreZeebeLogging()
})

Expand Down Expand Up @@ -98,6 +110,50 @@ test('Can service a task with complete.success', (done) => {
})
})

test('An already completed job will throw NOT_FOUND if another worker invocation tries to complete it', (done) => {
let alreadyActivated = false
let threw = false
const jobTimeout = 30000 // The job is made available for reactivation after this time
const jobDuration = 40000 // The job takes this long to complete
const secondWorkerDuration = jobDuration - jobTimeout + 5000 // The second worker will try to complete the job after this time
zbc
.createProcessInstance({
bpmnProcessId: bpmnProcessId4,
variables: {},
})
.then((res) => {
wf = res
zbc.createWorker({
taskType: 'job-complete-error',
taskHandler: async (job) => {
const delay = alreadyActivated ? secondWorkerDuration : jobDuration
const shouldThrow = alreadyActivated
alreadyActivated = true
expect(job.processInstanceKey).toBe(wf?.processInstanceKey)
let res: JOB_ACTION_ACKNOWLEDGEMENT = 'JOB_ACTION_ACKNOWLEDGEMENT'
try {
await new Promise((resolve) =>
setTimeout(() => resolve(null), delay)
)
res = await job.complete(job.variables)
if (shouldThrow) {
throw new Error('Should have thrown NOT_FOUND')
}
return res
} catch (e: unknown) {
expect((e as Error).message.includes('NOT_FOUND')).toBe(true)
threw = true
done(null)
}
expect(shouldThrow).toBe(threw)
return res
},
loglevel: 'NONE',
timeout: jobTimeout,
})
})
})

test('Can update process variables with complete.success()', async () => {
wf = await zbc.createProcessInstance({
bpmnProcessId: bpmnProcessId3,
Expand Down
2 changes: 1 addition & 1 deletion src/zeebe/lib/ZBWorkerBase.ts
Original file line number Diff line number Diff line change
Expand Up @@ -416,7 +416,7 @@ You should call only one job action method in the worker handler. This is a bug
this.logger.logDebug(
`Completing job ${jobKey} for ${this.taskType} threw ${e.message}`
)
return e
throw e
})
.then(() => ZB.JOB_ACTION_ACKNOWLEDGEMENT)
.finally(() => {
Expand Down

0 comments on commit 4498144

Please sign in to comment.