Skip to content

Commit

Permalink
[JBPM-10166] Allow tests with productized Kakfa version (#273)
Browse files Browse the repository at this point in the history
  • Loading branch information
gmunozfe authored Mar 29, 2023
1 parent 9f2aa77 commit 3748cba
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,13 @@
package org.jbpm.workitem.springboot.samples;

import java.io.IOException;
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import org.jbpm.services.api.RuntimeDataService;
import org.jbpm.services.api.model.ProcessInstanceDesc;
import org.junit.After;
import org.junit.Before;
import org.junit.BeforeClass;
Expand Down Expand Up @@ -123,13 +125,13 @@ public void testAsyncKafkaWIHNoConnection() throws Exception {
CountDownLatch latch = new CountDownLatch(1);

kafkaProxy.setConnectionCut(true);
//Kafka WIH will try during publish config max.block.ms -60 seconds by default- to get connected to Kafka
//TimeoutException: Topic PAM_Events not present in metadata after 60000 ms.
Long processInstanceId= processService.startProcess(deploymentId, KAFKA_WIH_PROCESS, VARIABLES_MAP);

assertTrue(processInstanceId > 0);

try {
//Kafka WIH will try during publish config max.block.ms -60 seconds by default- to get connected to Kafka
//TimeoutException: Topic PAM_Events not present in metadata after 60000 ms.
Long processInstanceId= processService.startProcess(deploymentId, KAFKA_WIH_PROCESS, VARIABLES_MAP);
assertTrue(processInstanceId > 0);

startOtherProcess();

// After more than 60 seconds, connection has not been reestablished and process keeps on active
Expand All @@ -138,7 +140,7 @@ public void testAsyncKafkaWIHNoConnection() throws Exception {

startOtherProcess();
} finally {
processService.abortProcessInstance(processInstanceId);
abortAllProcesses();
}
}

Expand All @@ -153,4 +155,13 @@ private void startOtherProcess() {
Map<String, Object> outcome = processService.computeProcessOutcome(deploymentId, HELLO_PROCESS, singletonMap("name", "Grogu"));
assertEquals("Hello Grogu", outcome.get("outcome"));
}

protected void abortAllProcesses() {
Collection<ProcessInstanceDesc> activeInstances = runtimeDataService.getProcessInstances(singletonList(STATE_ACTIVE), null, null);
if (activeInstances != null) {
for (ProcessInstanceDesc instance : activeInstances) {
processService.abortProcessInstance(instance.getDeploymentId(), instance.getId());
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package org.jbpm.workitem.springboot.samples;

import java.io.IOException;
import java.util.Arrays;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

Expand Down Expand Up @@ -53,6 +54,8 @@ public abstract class KafkaProxyBase extends KafkaBaseTest {

protected static final int TOXY_PROXY_PORT = Integer.parseInt(System.getProperty("toxiproxy.port"));

private static final String VERSION = String.join(".", Arrays.copyOfRange(System.getProperty("kafka.container.version", "3.1.0").split("\\."), 0, 3));

@ClassRule
public static final SpringClassRule scr = new SpringClassRule();

Expand All @@ -64,7 +67,7 @@ public abstract class KafkaProxyBase extends KafkaBaseTest {

@Rule
public StrimziKafkaContainer kafka = new StrimziKafkaContainer()
.withKafkaVersion(System.getProperty("kafka.container.version"))
.withKafkaVersion(VERSION)
.withNetwork(network);

@Rule
Expand Down

0 comments on commit 3748cba

Please sign in to comment.