diff --git a/pom.xml b/pom.xml
index af177fc..5793b52 100644
--- a/pom.xml
+++ b/pom.xml
@@ -111,11 +111,13 @@
3.1.0
1.7.0
- 3.7.0
+ 3.8.1
1.3.2
2.2.0
2.0.13
5.10.2
+ 0.108.0
+ 1.20.1
@@ -195,6 +197,18 @@
${slf4j.version}
test
+
+ io.strimzi
+ strimzi-test-container
+ ${strimzi-test-container.version}
+ test
+
+
+ org.testcontainers
+ testcontainers
+ ${testcontainers.version}
+ test
+
@@ -225,6 +239,11 @@
org.apache.maven.plugins
maven-surefire-plugin
${maven.surefire.version}
+
+
+ **/*IT.java
+
+
org.apache.maven.plugins
@@ -290,6 +309,8 @@
org.slf4j:slf4j-simple
+ io.strimzi:strimzi-test-container
+ org.testcontainers:testcontainers
diff --git a/src/test/java/io/strimzi/kafka/metrics/MetricsUtils.java b/src/test/java/io/strimzi/kafka/metrics/MetricsUtils.java
index 66dab2b..5cb2949 100644
--- a/src/test/java/io/strimzi/kafka/metrics/MetricsUtils.java
+++ b/src/test/java/io/strimzi/kafka/metrics/MetricsUtils.java
@@ -42,8 +42,19 @@ public class MetricsUtils {
* @throws Exception If any error occurs
*/
public static List getMetrics(int port) throws Exception {
+ return getMetrics("localhost", port);
+ }
+
+ /**
+ * Query the HTTP endpoint and returns the output
+ * @param host The host to query
+ * @param port The port to query
+ * @return The lines from the output
+ * @throws Exception If any error occurs
+ */
+ public static List getMetrics(String host, int port) throws Exception {
List metrics = new ArrayList<>();
- URL url = new URL("http://localhost:" + port + "/metrics");
+ URL url = new URL("http://" + host + ":" + port + "/metrics");
HttpURLConnection con = (HttpURLConnection) url.openConnection();
con.setRequestMethod("GET");
diff --git a/src/test/java/io/strimzi/kafka/metrics/integration/TestBrokerMetricsIT.java b/src/test/java/io/strimzi/kafka/metrics/integration/TestBrokerMetricsIT.java
new file mode 100644
index 0000000..2e3bcaa
--- /dev/null
+++ b/src/test/java/io/strimzi/kafka/metrics/integration/TestBrokerMetricsIT.java
@@ -0,0 +1,105 @@
+/*
+ * Copyright Strimzi authors.
+ * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html).
+ */
+package io.strimzi.kafka.metrics.integration;
+
+import io.strimzi.kafka.metrics.KafkaPrometheusMetricsReporter;
+import io.strimzi.kafka.metrics.MetricsUtils;
+import io.strimzi.kafka.metrics.PrometheusMetricsReporterConfig;
+import io.strimzi.kafka.metrics.YammerPrometheusMetricsReporter;
+import io.strimzi.kafka.metrics.http.Listener;
+import io.strimzi.test.container.StrimziKafkaContainer;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.testcontainers.utility.MountableFile;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class TestBrokerMetricsIT {
+
+ private static final String VERSION = "1.0.0-SNAPSHOT";
+ private static final String REPORTER_JARS = "target/metrics-reporter-" + VERSION + "/metrics-reporter-" + VERSION + "/libs/";
+ private static final String MOUNT_PATH = "/opt/strimzi/metrics-reporter/";
+ private static final int PORT = Listener.parseListener(PrometheusMetricsReporterConfig.LISTENER_CONFIG_DEFAULT).port;
+
+ private Map configs;
+ private StrimziKafkaContainer broker;
+
+ @BeforeEach
+ public void setUp() {
+ configs = new HashMap<>();
+ configs.put("metric.reporters", KafkaPrometheusMetricsReporter.class.getName());
+ configs.put("kafka.metrics.reporters", YammerPrometheusMetricsReporter.class.getName());
+
+ broker = new StrimziKafkaContainer()
+ .withNodeId(0)
+ .withKraft()
+ .withCopyFileToContainer(MountableFile.forHostPath(REPORTER_JARS), MOUNT_PATH)
+ .withExposedPorts(9092, PORT)
+ .withKafkaConfigurationMap(configs)
+ .withEnv(Collections.singletonMap("CLASSPATH", MOUNT_PATH + "*"));
+ }
+
+ @AfterEach
+ public void tearDown() {
+ broker.stop();
+ }
+
+ @Test
+ public void testMetricsReporter() throws Exception {
+ broker.start();
+ List metrics = MetricsUtils.getMetrics(broker.getHost(), broker.getMappedPort(PORT));
+ List prefixes = Arrays.asList(
+ "jvm_",
+ "kafka_controller_",
+ "kafka_coordinator_",
+ "kafka_log_",
+ "kafka_network_",
+ "kafka_server_");
+ for (String prefix : prefixes) {
+ assertFalse(filterMetrics(metrics, prefix).isEmpty());
+ }
+ }
+
+ @Test
+ public void testMetricsReporterWithAllowlist() throws Exception {
+ configs.put("prometheus.metrics.reporter.allowlist", "kafka_controller.*,kafka_server.*");
+ broker.withKafkaConfigurationMap(configs);
+ broker.start();
+ List metrics = MetricsUtils.getMetrics(broker.getHost(), broker.getMappedPort(PORT));
+ List allowedPrefixes = Arrays.asList(
+ "jvm_",
+ "kafka_controller_",
+ "kafka_server_");
+ for (String prefix : allowedPrefixes) {
+ assertFalse(filterMetrics(metrics, prefix).isEmpty());
+ }
+ List disallowPrefixes = Arrays.asList(
+ "kafka_coordinator_",
+ "kafka_log_",
+ "kafka_network_");
+ for (String prefix : disallowPrefixes) {
+ assertTrue(filterMetrics(metrics, prefix).isEmpty());
+ }
+ }
+
+ private List filterMetrics(List allMetrics, String prefix) {
+ List metrics = new ArrayList<>();
+ for (String metric : allMetrics) {
+ if (metric.startsWith(prefix)) {
+ metrics.add(metric);
+ }
+ }
+ return metrics;
+ }
+}