From 3748cba389697ca2e769b5fcc7b59f4117fcf2cf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gonzalo=20Mu=C3=B1oz?= Date: Wed, 29 Mar 2023 16:26:00 +0200 Subject: [PATCH] [JBPM-10166] Allow tests with productized Kakfa version (#273) --- .../samples/KafkaProxyAsyncSampleTest.java | 23 ++++++++++++++----- .../springboot/samples/KafkaProxyBase.java | 5 +++- 2 files changed, 21 insertions(+), 7 deletions(-) diff --git a/jbpm-workitem-itests/src/test/java/org/jbpm/workitem/springboot/samples/KafkaProxyAsyncSampleTest.java b/jbpm-workitem-itests/src/test/java/org/jbpm/workitem/springboot/samples/KafkaProxyAsyncSampleTest.java index 50e2662c8..144cf6fa0 100644 --- a/jbpm-workitem-itests/src/test/java/org/jbpm/workitem/springboot/samples/KafkaProxyAsyncSampleTest.java +++ b/jbpm-workitem-itests/src/test/java/org/jbpm/workitem/springboot/samples/KafkaProxyAsyncSampleTest.java @@ -17,11 +17,13 @@ package org.jbpm.workitem.springboot.samples; import java.io.IOException; +import java.util.Collection; import java.util.Map; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import org.jbpm.services.api.RuntimeDataService; +import org.jbpm.services.api.model.ProcessInstanceDesc; import org.junit.After; import org.junit.Before; import org.junit.BeforeClass; @@ -123,13 +125,13 @@ public void testAsyncKafkaWIHNoConnection() throws Exception { CountDownLatch latch = new CountDownLatch(1); kafkaProxy.setConnectionCut(true); - //Kafka WIH will try during publish config max.block.ms -60 seconds by default- to get connected to Kafka - //TimeoutException: Topic PAM_Events not present in metadata after 60000 ms. - Long processInstanceId= processService.startProcess(deploymentId, KAFKA_WIH_PROCESS, VARIABLES_MAP); - - assertTrue(processInstanceId > 0); try { + //Kafka WIH will try during publish config max.block.ms -60 seconds by default- to get connected to Kafka + //TimeoutException: Topic PAM_Events not present in metadata after 60000 ms. + Long processInstanceId= processService.startProcess(deploymentId, KAFKA_WIH_PROCESS, VARIABLES_MAP); + assertTrue(processInstanceId > 0); + startOtherProcess(); // After more than 60 seconds, connection has not been reestablished and process keeps on active @@ -138,7 +140,7 @@ public void testAsyncKafkaWIHNoConnection() throws Exception { startOtherProcess(); } finally { - processService.abortProcessInstance(processInstanceId); + abortAllProcesses(); } } @@ -153,4 +155,13 @@ private void startOtherProcess() { Map outcome = processService.computeProcessOutcome(deploymentId, HELLO_PROCESS, singletonMap("name", "Grogu")); assertEquals("Hello Grogu", outcome.get("outcome")); } + + protected void abortAllProcesses() { + Collection activeInstances = runtimeDataService.getProcessInstances(singletonList(STATE_ACTIVE), null, null); + if (activeInstances != null) { + for (ProcessInstanceDesc instance : activeInstances) { + processService.abortProcessInstance(instance.getDeploymentId(), instance.getId()); + } + } + } } diff --git a/jbpm-workitem-itests/src/test/java/org/jbpm/workitem/springboot/samples/KafkaProxyBase.java b/jbpm-workitem-itests/src/test/java/org/jbpm/workitem/springboot/samples/KafkaProxyBase.java index 8ff35189d..9c3b3d25a 100644 --- a/jbpm-workitem-itests/src/test/java/org/jbpm/workitem/springboot/samples/KafkaProxyBase.java +++ b/jbpm-workitem-itests/src/test/java/org/jbpm/workitem/springboot/samples/KafkaProxyBase.java @@ -17,6 +17,7 @@ package org.jbpm.workitem.springboot.samples; import java.io.IOException; +import java.util.Arrays; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -53,6 +54,8 @@ public abstract class KafkaProxyBase extends KafkaBaseTest { protected static final int TOXY_PROXY_PORT = Integer.parseInt(System.getProperty("toxiproxy.port")); + private static final String VERSION = String.join(".", Arrays.copyOfRange(System.getProperty("kafka.container.version", "3.1.0").split("\\."), 0, 3)); + @ClassRule public static final SpringClassRule scr = new SpringClassRule(); @@ -64,7 +67,7 @@ public abstract class KafkaProxyBase extends KafkaBaseTest { @Rule public StrimziKafkaContainer kafka = new StrimziKafkaContainer() - .withKafkaVersion(System.getProperty("kafka.container.version")) + .withKafkaVersion(VERSION) .withNetwork(network); @Rule