From 4fc4af5ddac3cf5bf9c01bb5f2856a477bff8b25 Mon Sep 17 00:00:00 2001 From: Alberto Codutti Date: Mon, 20 Jan 2025 18:21:07 +0100 Subject: [PATCH] :recycle: [Test] Improved Message Broker container readiness check Signed-off-by: Alberto Codutti --- .../qa/integration/steps/DockerSteps.java | 107 +++++++++++++++--- 1 file changed, 91 insertions(+), 16 deletions(-) diff --git a/qa/integration-steps/src/main/java/org/eclipse/kapua/qa/integration/steps/DockerSteps.java b/qa/integration-steps/src/main/java/org/eclipse/kapua/qa/integration/steps/DockerSteps.java index 5af71714943..b2c76678ff1 100644 --- a/qa/integration-steps/src/main/java/org/eclipse/kapua/qa/integration/steps/DockerSteps.java +++ b/qa/integration-steps/src/main/java/org/eclipse/kapua/qa/integration/steps/DockerSteps.java @@ -41,7 +41,11 @@ import org.eclipse.kapua.qa.common.DBHelper; import org.eclipse.kapua.qa.common.StepData; import org.eclipse.kapua.qa.integration.steps.utils.TestReadinessConnection; +import org.eclipse.paho.client.mqttv3.MqttClient; +import org.eclipse.paho.client.mqttv3.MqttConnectOptions; import org.eclipse.paho.client.mqttv3.MqttException; +import org.eclipse.paho.client.mqttv3.MqttSecurityException; +import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; import org.junit.Assert; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -82,16 +86,28 @@ public class DockerSteps { private static final long WAIT_FOR_ES = 5000; private static final long WAIT_FOR_EVENTS_BROKER = 10000; private static final long WAIT_FOR_REST_API = 30000; - private static final long WAIT_FOR_BROKER = 30000; private static final int HTTP_COMMUNICATION_TIMEOUT = 3000; private static final int JOB_ENGINE_PORT_INTERNAL = 8080; private static final int JOB_ENGINE_PORT_EXTERNAL = 8080; - // private static final String JOB_ENGINE_ADDRESS_INTERNAL = "http://job-engine:" + JOB_ENGINE_PORT_INTERNAL; Not used? private static final String JOB_ENGINE_ADDRESS_EXTERNAL = "http://localhost:" + JOB_ENGINE_PORT_EXTERNAL; private static final long JOB_ENGINE_READY_CHECK_INTERVAL = 1000; private static final long JOB_ENGINE_READY_MAX_WAIT = 60000; + private static final int MESSAGE_BROKER_PORT_MQTT_CONTAINER = 1883; + private static final int MESSAGE_BROKER_PORT_MQTT_HOST = 1883; + private static final int MESSAGE_BROKER_PORT_INTERNAL_CONTAINER = 1893; + private static final int MESSAGE_BROKER_PORT_INTERNAL_HOST = 1893; + private static final int MESSAGE_BROKER_PORT_MQTTS_CONTAINER = 8883; + private static final int MESSAGE_BROKER_PORT_MQTTS_HOST = 8883; + private static final int MESSAGE_BROKER_PORT_WS_CONTAINER = 8161; + private static final int MESSAGE_BROKER_PORT_WS_HOST = 8161; + private static final int MESSAGE_BROKER_PORT_DEBUG_CONTAINER = 5005; + private static final int MESSAGE_BROKER_PORT_DEBUG_HOST = 5005; + private static final String MESSAGE_BROKER_ADDRESS_EXTERNAL = "tcp://localhost:" + MESSAGE_BROKER_PORT_MQTT_HOST; + private static final long MESSAGE_BROKER_READY_CHECK_INTERVAL = 1000; + private static final long MESSAGE_BROKER_READY_MAX_WAIT = 60000; + private static final int LIFECYCLE_HEALTH_CHECK_PORT = 8090; private static final int TElEMETRY_HEALTH_CHECK_PORT = 8091; private static final int AUTH_SERVICE_HEALTH_CHECK_PORT = 8092; @@ -629,7 +645,7 @@ private void waitEsContainer(String name) throws Exception{ } } - @And("Start EventBroker container with name {string}") + @And("Start Event Broker container with name {string}") public void startEventBrokerContainer(String name) throws DockerException, InterruptedException { logger.info("Starting EventBroker container..."); ContainerConfig ebConfig = getEventBrokerContainerConfig(); @@ -655,7 +671,7 @@ private void waitEventBrokerContainer(String name) throws Exception{ } } - @And("Start JobEngine container with name {string}") + @And("Start Job Engine container with name {string}") public void startJobEngineContainer(String name) throws DockerException, InterruptedException { logger.info("Starting Job Engine container {}...", name); ContainerConfig mbConfig = getJobEngineContainerConfig(); @@ -676,7 +692,6 @@ public void startJobEngineContainer(String name) throws DockerException, Interru * @since 2.1.0 */ private void waitJobEngineContainer(String name) throws Exception{ - long now = System.currentTimeMillis(); while (now + JOB_ENGINE_READY_MAX_WAIT > System.currentTimeMillis()) { if (isJobEngineContainerReady(name)) { @@ -736,10 +751,22 @@ private void waitRestApiContainer(String name) throws Exception{ } } - @And("Start MessageBroker container with name {string}") + @And("Start Message Broker container with name {string}") public void startMessageBrokerContainer(String name) throws DockerException, InterruptedException { logger.info("Starting Message Broker container {}...", name); - ContainerConfig mbConfig = getBrokerContainerConfig("message-broker", 1883, 1883, 1893, 1893, 8883, 8883, 8161, 8161, 5005, 5005, "kapua/" + BROKER_IMAGE + ":" + KAPUA_VERSION); + ContainerConfig mbConfig = getBrokerContainerConfig( + name, + MESSAGE_BROKER_PORT_MQTT_CONTAINER, + MESSAGE_BROKER_PORT_MQTT_HOST, + MESSAGE_BROKER_PORT_INTERNAL_CONTAINER, + MESSAGE_BROKER_PORT_INTERNAL_HOST, + MESSAGE_BROKER_PORT_MQTTS_CONTAINER, + MESSAGE_BROKER_PORT_MQTTS_HOST, + MESSAGE_BROKER_PORT_WS_CONTAINER, + MESSAGE_BROKER_PORT_WS_HOST, + MESSAGE_BROKER_PORT_DEBUG_CONTAINER, + MESSAGE_BROKER_PORT_DEBUG_HOST, + "kapua/" + BROKER_IMAGE + ":" + KAPUA_VERSION); ContainerCreation mbContainerCreation = DockerUtil.getDockerClient().createContainer(mbConfig, name); String containerId = mbContainerCreation.id(); @@ -757,9 +784,54 @@ public void startMessageBrokerContainer(String name) throws DockerException, Int * @since 2.1.0 */ private void waitMessageBrokerContainer(String name) throws Exception{ - synchronized (this) { - this.wait(WAIT_FOR_BROKER); + long now = System.currentTimeMillis(); + while (now + MESSAGE_BROKER_READY_MAX_WAIT > System.currentTimeMillis()) { + if (isMessageBrokerContainerReady(name)) { + logger.info("Message Broker ready in: ~{}ms", System.currentTimeMillis() - now); + return; + } + + logger.info("Message Broker not ready yet... Retrying in {}ms", MESSAGE_BROKER_READY_CHECK_INTERVAL); + TimeUnit.MILLISECONDS.sleep(MESSAGE_BROKER_READY_CHECK_INTERVAL); } + + Assert.fail("Message Broker not ready within: " + MESSAGE_BROKER_READY_MAX_WAIT + "ms"); + } + + /** + * Checks if the Message Broker Docker container is ready + * + * @param name The Message Broker Docker container name + * @return {@code true} if is ready, {@code false} otherwise + * @since 2.1.0 + */ + private boolean isMessageBrokerContainerReady(String name) { + try (MqttClient testReadinessClient = new MqttClient(MESSAGE_BROKER_ADDRESS_EXTERNAL, "test-readiness", new MemoryPersistence())){ + + // These username and password do not match any entry. + // We need just to receive the "Not authorized to connect" from the broker on connection attempt + MqttConnectOptions clientOpts = new MqttConnectOptions(); + clientOpts.setUserName("test-readiness-user"); // This user do + clientOpts.setPassword("test-readiness-password".toCharArray()); + clientOpts.setConnectionTimeout(1); + + try { + testReadinessClient.connect(clientOpts); + } + catch (MqttSecurityException mse) { + // When the Message Broker is ready will accept connection attempts. + // Since we are not providing valid username and password we are interested on + // receiving a MqttSecurityException with the following message. + if ("Not authorized to connect".equals(mse.getMessage())) { + return true; + } + } + } + catch (Exception e) { + // Ignoring... + } + + return false; } @And("Start Auth service container with name {string}") @@ -951,9 +1023,7 @@ private void printContainerLog(String name) { /** * Creation of docker container configuration for broker. * - * @param brokerAddr * @param brokerIp - * @param clusterName * @param mqttPort mqtt port on docker * @param mqttHostPort mqtt port on docker host * @param mqttsPort mqtts port on docker @@ -967,11 +1037,16 @@ private void printContainerLog(String name) { * @return Container configuration for specific boroker instance */ private ContainerConfig getBrokerContainerConfig(String brokerIp, - int mqttPort, int mqttHostPort, - int mqttInternalPort, int mqttInternalHostPort, - int mqttsPort, int mqttsHostPort, - int webPort, int webHostPort, - int debugPort, int debugHostPort, + int mqttPort, + int mqttHostPort, + int mqttInternalPort, + int mqttInternalHostPort, + int mqttsPort, + int mqttsHostPort, + int webPort, + int webHostPort, + int debugPort, + int debugHostPort, String dockerImage) { final Map> portBindings = new HashMap<>();