Skip to content

Commit

Permalink
♻️ [Test] Improved Message Broker container readiness check
Browse files Browse the repository at this point in the history
Signed-off-by: Alberto Codutti <[email protected]>
  • Loading branch information
Coduz committed Jan 21, 2025
1 parent 1ed1fe6 commit 4fc4af5
Showing 1 changed file with 91 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,11 @@
import org.eclipse.kapua.qa.common.DBHelper;
import org.eclipse.kapua.qa.common.StepData;
import org.eclipse.kapua.qa.integration.steps.utils.TestReadinessConnection;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttSecurityException;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.junit.Assert;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -82,16 +86,28 @@ public class DockerSteps {
private static final long WAIT_FOR_ES = 5000;
private static final long WAIT_FOR_EVENTS_BROKER = 10000;
private static final long WAIT_FOR_REST_API = 30000;
private static final long WAIT_FOR_BROKER = 30000;
private static final int HTTP_COMMUNICATION_TIMEOUT = 3000;

private static final int JOB_ENGINE_PORT_INTERNAL = 8080;
private static final int JOB_ENGINE_PORT_EXTERNAL = 8080;
// private static final String JOB_ENGINE_ADDRESS_INTERNAL = "http://job-engine:" + JOB_ENGINE_PORT_INTERNAL; Not used?
private static final String JOB_ENGINE_ADDRESS_EXTERNAL = "http://localhost:" + JOB_ENGINE_PORT_EXTERNAL;
private static final long JOB_ENGINE_READY_CHECK_INTERVAL = 1000;
private static final long JOB_ENGINE_READY_MAX_WAIT = 60000;

private static final int MESSAGE_BROKER_PORT_MQTT_CONTAINER = 1883;
private static final int MESSAGE_BROKER_PORT_MQTT_HOST = 1883;
private static final int MESSAGE_BROKER_PORT_INTERNAL_CONTAINER = 1893;
private static final int MESSAGE_BROKER_PORT_INTERNAL_HOST = 1893;
private static final int MESSAGE_BROKER_PORT_MQTTS_CONTAINER = 8883;
private static final int MESSAGE_BROKER_PORT_MQTTS_HOST = 8883;
private static final int MESSAGE_BROKER_PORT_WS_CONTAINER = 8161;
private static final int MESSAGE_BROKER_PORT_WS_HOST = 8161;
private static final int MESSAGE_BROKER_PORT_DEBUG_CONTAINER = 5005;
private static final int MESSAGE_BROKER_PORT_DEBUG_HOST = 5005;
private static final String MESSAGE_BROKER_ADDRESS_EXTERNAL = "tcp://localhost:" + MESSAGE_BROKER_PORT_MQTT_HOST;
private static final long MESSAGE_BROKER_READY_CHECK_INTERVAL = 1000;
private static final long MESSAGE_BROKER_READY_MAX_WAIT = 60000;

private static final int LIFECYCLE_HEALTH_CHECK_PORT = 8090;
private static final int TElEMETRY_HEALTH_CHECK_PORT = 8091;
private static final int AUTH_SERVICE_HEALTH_CHECK_PORT = 8092;
Expand Down Expand Up @@ -629,7 +645,7 @@ private void waitEsContainer(String name) throws Exception{
}
}

@And("Start EventBroker container with name {string}")
@And("Start Event Broker container with name {string}")
public void startEventBrokerContainer(String name) throws DockerException, InterruptedException {
logger.info("Starting EventBroker container...");
ContainerConfig ebConfig = getEventBrokerContainerConfig();
Expand All @@ -655,7 +671,7 @@ private void waitEventBrokerContainer(String name) throws Exception{
}
}

@And("Start JobEngine container with name {string}")
@And("Start Job Engine container with name {string}")
public void startJobEngineContainer(String name) throws DockerException, InterruptedException {
logger.info("Starting Job Engine container {}...", name);
ContainerConfig mbConfig = getJobEngineContainerConfig();
Expand All @@ -676,7 +692,6 @@ public void startJobEngineContainer(String name) throws DockerException, Interru
* @since 2.1.0
*/
private void waitJobEngineContainer(String name) throws Exception{

long now = System.currentTimeMillis();
while (now + JOB_ENGINE_READY_MAX_WAIT > System.currentTimeMillis()) {
if (isJobEngineContainerReady(name)) {
Expand Down Expand Up @@ -736,10 +751,22 @@ private void waitRestApiContainer(String name) throws Exception{
}
}

@And("Start MessageBroker container with name {string}")
@And("Start Message Broker container with name {string}")
public void startMessageBrokerContainer(String name) throws DockerException, InterruptedException {
logger.info("Starting Message Broker container {}...", name);
ContainerConfig mbConfig = getBrokerContainerConfig("message-broker", 1883, 1883, 1893, 1893, 8883, 8883, 8161, 8161, 5005, 5005, "kapua/" + BROKER_IMAGE + ":" + KAPUA_VERSION);
ContainerConfig mbConfig = getBrokerContainerConfig(
name,
MESSAGE_BROKER_PORT_MQTT_CONTAINER,
MESSAGE_BROKER_PORT_MQTT_HOST,
MESSAGE_BROKER_PORT_INTERNAL_CONTAINER,
MESSAGE_BROKER_PORT_INTERNAL_HOST,
MESSAGE_BROKER_PORT_MQTTS_CONTAINER,
MESSAGE_BROKER_PORT_MQTTS_HOST,
MESSAGE_BROKER_PORT_WS_CONTAINER,
MESSAGE_BROKER_PORT_WS_HOST,
MESSAGE_BROKER_PORT_DEBUG_CONTAINER,
MESSAGE_BROKER_PORT_DEBUG_HOST,
"kapua/" + BROKER_IMAGE + ":" + KAPUA_VERSION);
ContainerCreation mbContainerCreation = DockerUtil.getDockerClient().createContainer(mbConfig, name);
String containerId = mbContainerCreation.id();

Expand All @@ -757,9 +784,54 @@ public void startMessageBrokerContainer(String name) throws DockerException, Int
* @since 2.1.0
*/
private void waitMessageBrokerContainer(String name) throws Exception{
synchronized (this) {
this.wait(WAIT_FOR_BROKER);
long now = System.currentTimeMillis();
while (now + MESSAGE_BROKER_READY_MAX_WAIT > System.currentTimeMillis()) {
if (isMessageBrokerContainerReady(name)) {
logger.info("Message Broker ready in: ~{}ms", System.currentTimeMillis() - now);
return;
}

logger.info("Message Broker not ready yet... Retrying in {}ms", MESSAGE_BROKER_READY_CHECK_INTERVAL);
TimeUnit.MILLISECONDS.sleep(MESSAGE_BROKER_READY_CHECK_INTERVAL);
}

Assert.fail("Message Broker not ready within: " + MESSAGE_BROKER_READY_MAX_WAIT + "ms");
}

/**
* Checks if the Message Broker Docker container is ready
*
* @param name The Message Broker Docker container name
* @return {@code true} if is ready, {@code false} otherwise
* @since 2.1.0
*/
private boolean isMessageBrokerContainerReady(String name) {
try (MqttClient testReadinessClient = new MqttClient(MESSAGE_BROKER_ADDRESS_EXTERNAL, "test-readiness", new MemoryPersistence())){

// These username and password do not match any entry.
// We need just to receive the "Not authorized to connect" from the broker on connection attempt
MqttConnectOptions clientOpts = new MqttConnectOptions();
clientOpts.setUserName("test-readiness-user"); // This user do
clientOpts.setPassword("test-readiness-password".toCharArray());
clientOpts.setConnectionTimeout(1);

try {
testReadinessClient.connect(clientOpts);
}
catch (MqttSecurityException mse) {
// When the Message Broker is ready will accept connection attempts.
// Since we are not providing valid username and password we are interested on
// receiving a MqttSecurityException with the following message.
if ("Not authorized to connect".equals(mse.getMessage())) {
return true;
}
}
}
catch (Exception e) {
// Ignoring...
}

return false;
}

@And("Start Auth service container with name {string}")
Expand Down Expand Up @@ -951,9 +1023,7 @@ private void printContainerLog(String name) {
/**
* Creation of docker container configuration for broker.
*
* @param brokerAddr
* @param brokerIp
* @param clusterName
* @param mqttPort mqtt port on docker
* @param mqttHostPort mqtt port on docker host
* @param mqttsPort mqtts port on docker
Expand All @@ -967,11 +1037,16 @@ private void printContainerLog(String name) {
* @return Container configuration for specific boroker instance
*/
private ContainerConfig getBrokerContainerConfig(String brokerIp,
int mqttPort, int mqttHostPort,
int mqttInternalPort, int mqttInternalHostPort,
int mqttsPort, int mqttsHostPort,
int webPort, int webHostPort,
int debugPort, int debugHostPort,
int mqttPort,
int mqttHostPort,
int mqttInternalPort,
int mqttInternalHostPort,
int mqttsPort,
int mqttsHostPort,
int webPort,
int webHostPort,
int debugPort,
int debugHostPort,
String dockerImage) {

final Map<String, List<PortBinding>> portBindings = new HashMap<>();
Expand Down

0 comments on commit 4fc4af5

Please sign in to comment.