Skip to content

Commit

Permalink
[KOGITO-9785] Handle event state error (#3270)
Browse files Browse the repository at this point in the history
* [KOGITO-9785] Handle event state error

* [KOGITO-9785] Walters comments

* Revert "[KOGITO-9785] Walters comments"

This reverts commit 3d83f1c.
  • Loading branch information
fjtirado authored Nov 2, 2023
1 parent 0338971 commit f9149a5
Show file tree
Hide file tree
Showing 4 changed files with 238 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,14 @@
import org.kie.kogito.serverless.workflow.parser.ServerlessWorkflowParser;

import io.serverlessworkflow.api.Workflow;
import io.serverlessworkflow.api.actions.Action;
import io.serverlessworkflow.api.events.OnEvents;
import io.serverlessworkflow.api.states.EventState;

import static org.kie.kogito.serverless.workflow.parser.handlers.NodeFactoryUtils.eventBasedSplitNode;
import static org.kie.kogito.serverless.workflow.parser.handlers.NodeFactoryUtils.joinExclusiveNode;
import static org.kie.kogito.serverless.workflow.parser.handlers.NodeFactoryUtils.startMessageNode;
import static org.kie.kogito.serverless.workflow.utils.TimeoutsConfigResolver.resolveEventTimeout;

public class EventHandler extends CompositeContextNodeHandler<EventState> {

Expand All @@ -51,18 +55,45 @@ public void handleStart() {

@Override
public MakeNodeResult makeNode(RuleFlowNodeContainerFactory<?, ?> factory) {
MakeNodeResult currentBranch = joinNodes(factory, state.getOnEvents(), this::processOnEvent);
// ignore timeout for start states
return isStartState ? currentBranch : makeTimeoutNode(factory, currentBranch);
return joinNodes(factory, state.getOnEvents(), this::processOnEvent);
}

private MakeNodeResult processOnEvent(RuleFlowNodeContainerFactory<?, ?> factory, OnEvents onEvent) {
MakeNodeResult result = joinNodes(factory,
onEvent.getEventRefs(), (fact, onEventRef) -> filterAndMergeNode(fact, onEvent.getEventDataFilter(), isStartState ? ServerlessWorkflowParser.DEFAULT_WORKFLOW_VAR : getVarName(),
(f, inputVar, outputVar) -> buildEventNode(f, onEventRef, inputVar, outputVar)));
CompositeContextNodeFactory<?> embeddedSubProcess = handleActions(makeCompositeNode(factory), onEvent.getActions());
connect(result.getOutgoingNode(), embeddedSubProcess);
return new MakeNodeResult(result.getIncomingNode(), embeddedSubProcess);
if (isStartState) {
MakeNodeResult result = joinNodes(factory,
onEvent.getEventRefs(), (fact, onEventRef) -> filterAndMergeNode(fact, onEvent.getEventDataFilter(), ServerlessWorkflowParser.DEFAULT_WORKFLOW_VAR,
(f, inputVar, outputVar) -> buildEventNode(f, onEventRef, inputVar, outputVar)));
CompositeContextNodeFactory<?> embeddedSubProcess = handleActions(makeCompositeNode(factory), onEvent.getActions());
connect(result.getOutgoingNode(), embeddedSubProcess);
return new MakeNodeResult(result.getIncomingNode(), embeddedSubProcess);
} else {
String varName = getVarName();
CompositeContextNodeFactory<?> embeddedSubProcess = makeCompositeNode(factory);
NodeFactory<?, ?> startNode = embeddedSubProcess.startNode(parserContext.newId()).name("EmbeddedStart");
JoinFactory<?> joinNode = null;
String eventTimeout = resolveEventTimeout(state, workflow);
if (eventTimeout != null) {
// creating a split-join branch for the timer
SplitFactory<?> splitNode = eventBasedSplitNode(embeddedSubProcess.splitNode(parserContext.newId()), Split.TYPE_XAND);
joinNode = joinExclusiveNode(embeddedSubProcess.joinNode(parserContext.newId()));
startNode = connect(startNode, splitNode);
createTimerNode(embeddedSubProcess, splitNode, joinNode, eventTimeout);
}
MakeNodeResult result = joinNodes(embeddedSubProcess,
onEvent.getEventRefs(), (fact, onEventRef) -> filterAndMergeNode(fact, onEvent.getEventDataFilter(), varName,
(f, inputVar, outputVar) -> buildEventNode(f, onEventRef, inputVar, outputVar)));
connect(startNode, result.getIncomingNode());
NodeFactory<?, ?> currentNode = result.getOutgoingNode();
for (Action action : onEvent.getActions()) {
currentNode = connect(currentNode, getActionNode(embeddedSubProcess, action, varName, true));
}
if (joinNode != null) {
currentNode = connect(currentNode, joinNode);
}
connect(currentNode, embeddedSubProcess.endNode(parserContext.newId()).name("EmbeddedEnd").terminate(true)).done();
handleErrors(parserContext.factory(), embeddedSubProcess);
return new MakeNodeResult(embeddedSubProcess);
}
}

private <T> MakeNodeResult joinNodes(RuleFlowNodeContainerFactory<?, ?> factory, List<T> events, BiFunction<RuleFlowNodeContainerFactory<?, ?>, T, MakeNodeResult> function) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
import org.jbpm.ruleflow.core.Metadata;
import org.jbpm.ruleflow.core.RuleFlowNodeContainerFactory;
import org.jbpm.ruleflow.core.RuleFlowProcessFactory;
import org.jbpm.ruleflow.core.factory.AbstractCompositeNodeFactory;
import org.jbpm.ruleflow.core.factory.ActionNodeFactory;
import org.jbpm.ruleflow.core.factory.BoundaryEventNodeFactory;
import org.jbpm.ruleflow.core.factory.CompositeContextNodeFactory;
Expand Down Expand Up @@ -477,25 +476,19 @@ protected final EventDefinition eventDefinition(String eventName) {
protected final MakeNodeResult makeTimeoutNode(RuleFlowNodeContainerFactory<?, ?> factory, MakeNodeResult notTimerBranch) {
String eventTimeout = resolveEventTimeout(state, workflow);
if (eventTimeout != null) {
if (notTimerBranch.getIncomingNode() == notTimerBranch.getOutgoingNode() && notTimerBranch.getIncomingNode() instanceof AbstractCompositeNodeFactory) {
// reusing composite
((AbstractCompositeNodeFactory<?, ?>) notTimerBranch.getIncomingNode()).timeout(eventTimeout);
return notTimerBranch;
} else {
// creating a split-join branch for the timer
SplitFactory<?> splitNode = eventBasedSplitNode(factory.splitNode(parserContext.newId()), Split.TYPE_XAND);
JoinFactory<?> joinNode = joinExclusiveNode(factory.joinNode(parserContext.newId()));
connect(connect(splitNode, notTimerBranch), joinNode);
createTimerNode(factory, splitNode, joinNode, eventTimeout);
return new MakeNodeResult(splitNode, joinNode);
}
// creating a split-join branch for the timer
SplitFactory<?> splitNode = eventBasedSplitNode(factory.splitNode(parserContext.newId()), Split.TYPE_XAND);
JoinFactory<?> joinNode = joinExclusiveNode(factory.joinNode(parserContext.newId()));
connect(connect(splitNode, notTimerBranch), joinNode);
createTimerNode(factory, splitNode, joinNode, eventTimeout);
return new MakeNodeResult(splitNode, joinNode);
} else {
// No timeouts, returning the existing branch.
return notTimerBranch;
}
}

private void createTimerNode(RuleFlowNodeContainerFactory<?, ?> factory, SplitFactory<?> splitNode, JoinFactory<?> joinNode, String eventTimeout) {
protected final void createTimerNode(RuleFlowNodeContainerFactory<?, ?> factory, SplitFactory<?> splitNode, JoinFactory<?> joinNode, String eventTimeout) {
TimerNodeFactory<?> eventTimeoutTimerNode = timerNode(factory.timerNode(parserContext.newId()), eventTimeout);
connect(splitNode, eventTimeoutTimerNode);
connect(eventTimeoutTimerNode, joinNode);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
{
"id": "eventTimedout",
"version": "1.0",
"expressionLang": "jsonpath",
"name": "Workflow event test",
"description": "An test of a non starting event with timeout error",
"start": "printWaitMessage",
"events": [
{
"name": "moveEvent",
"source": "",
"type": "move"
}
],
"errors": [
{
"name": "timeoutError",
"code": "TimedOut"
}
],
"functions": [
{
"name": "printMessage",
"type": "custom",
"operation": "sysout"
},
{
"name": "publishTimeoutExpired",
"type": "asyncapi",
"operation": "specs/callbackResults.yaml#sendTimeoutExpired"
}
]
,
"states": [
{
"name": "printWaitMessage",
"type": "operation",
"actions": [
{
"name": "printBeforeEvent",
"functionRef": {
"refName": "printMessage",
"arguments": {
"message": "$[*]"
}
}
}
],
"transition": "waitForEvent"
},
{
"name": "waitForEvent",
"type": "event",
"onEvents": [
{
"eventRefs": [
"moveEvent"
],
"actions": [
{
"name": "printAfterEvent",
"functionRef": {
"refName": "printMessage",
"arguments": {
"message": "$[*]"
}
}
}
]
}
],
"onErrors": [
{
"errorRef": "timeoutError",
"transition": "PublishTimeout"
}
],
"timeouts": {
"eventTimeout": "PT5S"
},
"end":true
},
{
"name": "PublishTimeout",
"type": "operation",
"actions": [
{
"name": "publishTimeoutExpired",
"functionRef": "publishTimeoutExpired"
}
],
"end": "true"
}
]
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.kie.kogito.quarkus.workflows;

import java.io.IOException;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.kie.kogito.event.Converter;
import org.kie.kogito.event.cloudevents.CloudEventExtensionConstants;
import org.kie.kogito.event.impl.ByteArrayCloudEventUnmarshallerFactory;
import org.kie.kogito.test.quarkus.QuarkusTestProperty;
import org.kie.kogito.test.quarkus.kafka.KafkaTypedTestClient;
import org.kie.kogito.testcontainers.quarkus.KafkaQuarkusTestResource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;

import io.cloudevents.CloudEvent;
import io.cloudevents.jackson.JsonFormat;
import io.quarkus.test.common.QuarkusTestResource;
import io.quarkus.test.junit.QuarkusIntegrationTest;

import static org.assertj.core.api.Assertions.assertThat;
import static org.kie.kogito.quarkus.workflows.AssuredTestUtils.startProcess;

@QuarkusIntegrationTest
@QuarkusTestResource(KafkaQuarkusTestResource.class)
public class EventTimedoutIT {

private final static Logger logger = LoggerFactory.getLogger(EventTimedoutIT.class);

@QuarkusTestProperty(name = KafkaQuarkusTestResource.KOGITO_KAFKA_PROPERTY)
String kafkaBootstrapServers;
private ObjectMapper objectMapper;
private KafkaTypedTestClient<byte[], ByteArraySerializer, ByteArrayDeserializer> kafkaClient;

@BeforeEach
void setup() {
kafkaClient = new KafkaTypedTestClient<>(kafkaBootstrapServers, ByteArraySerializer.class, ByteArrayDeserializer.class);
objectMapper = new ObjectMapper()
.registerModule(new JavaTimeModule())
.registerModule(JsonFormat.getCloudEventJacksonModule())
.disable(com.fasterxml.jackson.databind.SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
}

@AfterEach
void cleanUp() {
if (kafkaClient != null) {
kafkaClient.shutdown();
}
}

@Test
void testTimedout() throws InterruptedException {
String id = startProcess("eventTimedout");
Converter<byte[], CloudEvent> converter = new ByteArrayCloudEventUnmarshallerFactory(objectMapper).unmarshaller(Map.class).cloudEvent();
final CountDownLatch countDownLatch = new CountDownLatch(1);
kafkaClient.consume("timeout", v -> {
try {
CloudEvent event = converter.convert(v);
if (id.equals(event.getExtension(CloudEventExtensionConstants.PROCESS_INSTANCE_ID))) {
countDownLatch.countDown();
}
} catch (IOException e) {
logger.info("Unmarshall exception", e);
}
});
countDownLatch.await(10, TimeUnit.SECONDS);
assertThat(countDownLatch.getCount()).isZero();
}
}

0 comments on commit f9149a5

Please sign in to comment.