diff --git a/src/test/java/io/strimzi/kafka/metrics/KafkaPrometheusMetricsReporterTest.java b/src/test/java/io/strimzi/kafka/metrics/KafkaPrometheusMetricsReporterTest.java index 6eb747d..724fe7b 100644 --- a/src/test/java/io/strimzi/kafka/metrics/KafkaPrometheusMetricsReporterTest.java +++ b/src/test/java/io/strimzi/kafka/metrics/KafkaPrometheusMetricsReporterTest.java @@ -17,8 +17,8 @@ import java.util.Map; import java.util.Optional; -import static io.strimzi.kafka.metrics.MetricsUtils.getMetrics; -import static io.strimzi.kafka.metrics.MetricsUtils.newKafkaMetric; +import static io.strimzi.kafka.metrics.TestUtils.getMetrics; +import static io.strimzi.kafka.metrics.TestUtils.newKafkaMetric; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertTrue; diff --git a/src/test/java/io/strimzi/kafka/metrics/PrometheusCollectorTest.java b/src/test/java/io/strimzi/kafka/metrics/PrometheusCollectorTest.java index 2b225bc..a049949 100644 --- a/src/test/java/io/strimzi/kafka/metrics/PrometheusCollectorTest.java +++ b/src/test/java/io/strimzi/kafka/metrics/PrometheusCollectorTest.java @@ -19,10 +19,10 @@ import java.util.ArrayList; import java.util.List; -import static io.strimzi.kafka.metrics.MetricsUtils.assertCounterSnapshot; -import static io.strimzi.kafka.metrics.MetricsUtils.assertGaugeSnapshot; -import static io.strimzi.kafka.metrics.MetricsUtils.assertInfoSnapshot; -import static io.strimzi.kafka.metrics.MetricsUtils.assertSummarySnapshot; +import static io.strimzi.kafka.metrics.TestUtils.assertCounterSnapshot; +import static io.strimzi.kafka.metrics.TestUtils.assertGaugeSnapshot; +import static io.strimzi.kafka.metrics.TestUtils.assertInfoSnapshot; +import static io.strimzi.kafka.metrics.TestUtils.assertSummarySnapshot; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertSame; import static org.junit.jupiter.api.Assertions.assertTrue; diff --git a/src/test/java/io/strimzi/kafka/metrics/MetricsUtils.java b/src/test/java/io/strimzi/kafka/metrics/TestUtils.java similarity index 71% rename from src/test/java/io/strimzi/kafka/metrics/MetricsUtils.java rename to src/test/java/io/strimzi/kafka/metrics/TestUtils.java index 5cb2949..e5459ef 100644 --- a/src/test/java/io/strimzi/kafka/metrics/MetricsUtils.java +++ b/src/test/java/io/strimzi/kafka/metrics/TestUtils.java @@ -11,16 +11,23 @@ import io.prometheus.metrics.model.snapshots.MetricSnapshot; import io.prometheus.metrics.model.snapshots.Quantiles; import io.prometheus.metrics.model.snapshots.SummarySnapshot; +import io.strimzi.kafka.metrics.http.Listener; import org.apache.kafka.common.MetricName; import org.apache.kafka.common.metrics.Gauge; import org.apache.kafka.common.metrics.KafkaMetric; import org.apache.kafka.common.metrics.MetricConfig; import org.apache.kafka.common.utils.Time; +import org.junit.jupiter.api.function.ThrowingConsumer; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.Network; +import org.testcontainers.containers.wait.strategy.Wait; +import org.testcontainers.utility.MountableFile; import java.io.BufferedReader; import java.io.InputStreamReader; import java.net.HttpURLConnection; import java.net.URL; +import java.time.Duration; import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -28,12 +35,21 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertInstanceOf; +import static org.junit.jupiter.api.Assertions.assertTimeoutPreemptively; /** * Utility class to create and retrieve metrics */ @SuppressWarnings("ClassFanOutComplexity") -public class MetricsUtils { +public class TestUtils { + + private static final String VERSION = "1.0.0-SNAPSHOT"; + private static final String CLIENTS_IMAGE = "quay.io/strimzi-test-clients/test-clients:latest-kafka-3.9.0"; + private static final Duration TIMEOUT = Duration.ofSeconds(5L); + + public static final String REPORTER_JARS = "target/metrics-reporter-" + VERSION + "/metrics-reporter-" + VERSION + "/libs/"; + public static final String MOUNT_PATH = "/opt/strimzi/metrics-reporter/"; + public static final int PORT = Listener.parseListener(PrometheusMetricsReporterConfig.LISTENER_CONFIG_DEFAULT).port; /** * Query the HTTP endpoint and returns the output @@ -46,7 +62,7 @@ public static List getMetrics(int port) throws Exception { } /** - * Query the HTTP endpoint and returns the output + * Query the HTTP endpoint and returns the output the metrics starting with the prefix * @param host The host to query * @param port The port to query * @return The lines from the output @@ -164,4 +180,43 @@ public static void assertSummarySnapshot(MetricSnapshot snapshot, int expecte assertEquals(expectedLabels, datapoint.getLabels()); } + /** + * Filter metrics that start with a specified prefix + * @param allMetrics all the metric names + * @param prefix the prefix + * @return the list of metric names that start with the prefix + */ + public static List filterMetrics(List allMetrics, String prefix) { + List metrics = new ArrayList<>(); + for (String metric : allMetrics) { + if (metric.startsWith(prefix)) { + metrics.add(metric); + } + } + return metrics; + } + + public static void verify(GenericContainer container, String prefix, ThrowingConsumer> condition) { + assertTimeoutPreemptively(TIMEOUT, () -> { + while (true) { + try { + List filteredMetrics = filterMetrics(getMetrics(container.getHost(), container.getMappedPort(PORT)), prefix); + condition.accept(filteredMetrics); + return; + } catch (Throwable t) { + assertInstanceOf(AssertionError.class, t); + } + } + }); + } + + public static GenericContainer clientContainer(Map env) { + return new GenericContainer<>(CLIENTS_IMAGE) + .withNetwork(Network.SHARED) + .withExposedPorts(PORT) + .withCopyFileToContainer(MountableFile.forHostPath(REPORTER_JARS), MOUNT_PATH) + .withEnv(env) + .waitingFor(Wait.forHttp("/metrics").forStatusCode(200)); + } + } diff --git a/src/test/java/io/strimzi/kafka/metrics/YammerPrometheusMetricsReporterTest.java b/src/test/java/io/strimzi/kafka/metrics/YammerPrometheusMetricsReporterTest.java index 94d762a..afa0d46 100644 --- a/src/test/java/io/strimzi/kafka/metrics/YammerPrometheusMetricsReporterTest.java +++ b/src/test/java/io/strimzi/kafka/metrics/YammerPrometheusMetricsReporterTest.java @@ -18,7 +18,7 @@ import java.util.Map; import java.util.Properties; -import static io.strimzi.kafka.metrics.MetricsUtils.getMetrics; +import static io.strimzi.kafka.metrics.TestUtils.getMetrics; import static org.junit.jupiter.api.Assertions.assertEquals; public class YammerPrometheusMetricsReporterTest { diff --git a/src/test/java/io/strimzi/kafka/metrics/integration/TestBrokerMetricsIT.java b/src/test/java/io/strimzi/kafka/metrics/integration/TestBrokerMetricsIT.java index 2e3bcaa..365acd3 100644 --- a/src/test/java/io/strimzi/kafka/metrics/integration/TestBrokerMetricsIT.java +++ b/src/test/java/io/strimzi/kafka/metrics/integration/TestBrokerMetricsIT.java @@ -5,17 +5,14 @@ package io.strimzi.kafka.metrics.integration; import io.strimzi.kafka.metrics.KafkaPrometheusMetricsReporter; -import io.strimzi.kafka.metrics.MetricsUtils; -import io.strimzi.kafka.metrics.PrometheusMetricsReporterConfig; +import io.strimzi.kafka.metrics.TestUtils; import io.strimzi.kafka.metrics.YammerPrometheusMetricsReporter; -import io.strimzi.kafka.metrics.http.Listener; import io.strimzi.test.container.StrimziKafkaContainer; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.testcontainers.utility.MountableFile; -import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; @@ -27,11 +24,6 @@ public class TestBrokerMetricsIT { - private static final String VERSION = "1.0.0-SNAPSHOT"; - private static final String REPORTER_JARS = "target/metrics-reporter-" + VERSION + "/metrics-reporter-" + VERSION + "/libs/"; - private static final String MOUNT_PATH = "/opt/strimzi/metrics-reporter/"; - private static final int PORT = Listener.parseListener(PrometheusMetricsReporterConfig.LISTENER_CONFIG_DEFAULT).port; - private Map configs; private StrimziKafkaContainer broker; @@ -44,10 +36,10 @@ public void setUp() { broker = new StrimziKafkaContainer() .withNodeId(0) .withKraft() - .withCopyFileToContainer(MountableFile.forHostPath(REPORTER_JARS), MOUNT_PATH) - .withExposedPorts(9092, PORT) + .withCopyFileToContainer(MountableFile.forHostPath(TestUtils.REPORTER_JARS), TestUtils.MOUNT_PATH) + .withExposedPorts(9092, TestUtils.PORT) .withKafkaConfigurationMap(configs) - .withEnv(Collections.singletonMap("CLASSPATH", MOUNT_PATH + "*")); + .withEnv(Collections.singletonMap("CLASSPATH", TestUtils.MOUNT_PATH + "*")); } @AfterEach @@ -56,50 +48,44 @@ public void tearDown() { } @Test - public void testMetricsReporter() throws Exception { + public void testBrokerMetrics() throws Exception { broker.start(); - List metrics = MetricsUtils.getMetrics(broker.getHost(), broker.getMappedPort(PORT)); + List prefixes = Arrays.asList( - "jvm_", - "kafka_controller_", - "kafka_coordinator_", - "kafka_log_", - "kafka_network_", - "kafka_server_"); + "jvm_", + "process_", + "kafka_controller_", + "kafka_coordinator_", + "kafka_log_", + "kafka_network_", + "kafka_server_"); + List metrics = TestUtils.getMetrics(broker.getHost(), broker.getMappedPort(TestUtils.PORT)); for (String prefix : prefixes) { - assertFalse(filterMetrics(metrics, prefix).isEmpty()); + assertFalse(TestUtils.filterMetrics(metrics, prefix).isEmpty()); } } @Test - public void testMetricsReporterWithAllowlist() throws Exception { + public void testBrokerMetricsWithAllowlist() throws Exception { configs.put("prometheus.metrics.reporter.allowlist", "kafka_controller.*,kafka_server.*"); broker.withKafkaConfigurationMap(configs); broker.start(); - List metrics = MetricsUtils.getMetrics(broker.getHost(), broker.getMappedPort(PORT)); + + List metrics = TestUtils.getMetrics(broker.getHost(), broker.getMappedPort(TestUtils.PORT)); List allowedPrefixes = Arrays.asList( - "jvm_", - "kafka_controller_", - "kafka_server_"); + "jvm_", + "process_", + "kafka_controller_", + "kafka_server_"); for (String prefix : allowedPrefixes) { - assertFalse(filterMetrics(metrics, prefix).isEmpty()); + assertFalse(TestUtils.filterMetrics(metrics, prefix).isEmpty()); } List disallowPrefixes = Arrays.asList( - "kafka_coordinator_", - "kafka_log_", - "kafka_network_"); + "kafka_coordinator_", + "kafka_log_", + "kafka_network_"); for (String prefix : disallowPrefixes) { - assertTrue(filterMetrics(metrics, prefix).isEmpty()); - } - } - - private List filterMetrics(List allMetrics, String prefix) { - List metrics = new ArrayList<>(); - for (String metric : allMetrics) { - if (metric.startsWith(prefix)) { - metrics.add(metric); - } + assertTrue(TestUtils.filterMetrics(metrics, prefix).isEmpty()); } - return metrics; } } diff --git a/src/test/java/io/strimzi/kafka/metrics/integration/TestConsumerMetricsIT.java b/src/test/java/io/strimzi/kafka/metrics/integration/TestConsumerMetricsIT.java new file mode 100644 index 0000000..8470a6e --- /dev/null +++ b/src/test/java/io/strimzi/kafka/metrics/integration/TestConsumerMetricsIT.java @@ -0,0 +1,96 @@ +/* + * Copyright Strimzi authors. + * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html). + */ +package io.strimzi.kafka.metrics.integration; + +import io.strimzi.kafka.metrics.KafkaPrometheusMetricsReporter; +import io.strimzi.kafka.metrics.TestUtils; +import io.strimzi.test.container.StrimziKafkaContainer; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.testcontainers.containers.GenericContainer; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class TestConsumerMetricsIT { + + private StrimziKafkaContainer broker; + private Map env; + + @BeforeEach + public void setUp() { + broker = new StrimziKafkaContainer() + .withKraft() + .withNetworkAliases("kafka"); + broker.start(); + + env = new HashMap<>(); + env.put("CLIENT_TYPE", "KafkaConsumer"); + env.put("BOOTSTRAP_SERVERS", "kafka:9093"); + env.put("TOPIC", "my-topic"); + env.put("GROUP_ID", "my-group"); + env.put("ADDITIONAL_CONFIG", "metric.reporters=" + KafkaPrometheusMetricsReporter.class.getName()); + env.put("CLASSPATH", TestUtils.MOUNT_PATH + "*"); + env.put("MESSAGE_COUNT", "1000"); + } + + @AfterEach + public void tearDown() { + broker.stop(); + } + + @Test + public void testConsumerMetrics() { + try (GenericContainer consumer = TestUtils.clientContainer(env)) { + consumer.start(); + + List prefixes = Arrays.asList( + "jvm_", + "process_", + "kafka_consumer_app_info_", + "kafka_consumer_kafka_metrics_", + "kafka_consumer_consumer_metrics_", + "kafka_consumer_consumer_node_metrics_", + "kafka_consumer_consumer_coordinator_metrics_", + "kafka_consumer_consumer_fetch_manager_metrics_"); + for (String prefix : prefixes) { + TestUtils.verify(consumer, prefix, metrics -> assertFalse(metrics.isEmpty())); + } + } + } + + @Test + public void testConsumerMetricsWithAllowlist() { + env.put("ADDITIONAL_CONFIG", + "metric.reporters=" + KafkaPrometheusMetricsReporter.class.getName() + "\n" + + "prometheus.metrics.reporter.allowlist=kafka_consumer_kafka_metrics_.*,kafka_consumer_consumer_coordinator_metrics_.*"); + try (GenericContainer consumer = TestUtils.clientContainer(env)) { + consumer.start(); + + List allowedPrefixes = Arrays.asList( + "jvm_", + "process_", + "kafka_consumer_kafka_metrics_", + "kafka_consumer_consumer_coordinator_metrics_"); + for (String prefix : allowedPrefixes) { + TestUtils.verify(consumer, prefix, metrics -> assertFalse(metrics.isEmpty())); + } + List disallowedPrefixes = Arrays.asList( + "kafka_consumer_app_info_", + "kafka_consumer_consumer_metrics_", + "kafka_consumer_consumer_node_metrics_", + "kafka_consumer_consumer_fetch_manager_metrics_"); + for (String prefix : disallowedPrefixes) { + TestUtils.verify(consumer, prefix, metrics -> assertTrue(metrics.isEmpty())); + } + } + } +} diff --git a/src/test/java/io/strimzi/kafka/metrics/integration/TestProducerMetricsIT.java b/src/test/java/io/strimzi/kafka/metrics/integration/TestProducerMetricsIT.java new file mode 100644 index 0000000..5137891 --- /dev/null +++ b/src/test/java/io/strimzi/kafka/metrics/integration/TestProducerMetricsIT.java @@ -0,0 +1,94 @@ +/* + * Copyright Strimzi authors. + * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html). + */ +package io.strimzi.kafka.metrics.integration; + +import io.strimzi.kafka.metrics.KafkaPrometheusMetricsReporter; +import io.strimzi.kafka.metrics.TestUtils; +import io.strimzi.test.container.StrimziKafkaContainer; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.testcontainers.containers.GenericContainer; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class TestProducerMetricsIT { + + private StrimziKafkaContainer broker; + private Map env; + + @BeforeEach + public void setUp() { + broker = new StrimziKafkaContainer() + .withKraft() + .withNetworkAliases("kafka"); + broker.start(); + + env = new HashMap<>(); + env.put("CLIENT_TYPE", "KafkaProducer"); + env.put("BOOTSTRAP_SERVERS", "kafka:9093"); + env.put("TOPIC", "my-topic"); + env.put("ADDITIONAL_CONFIG", "metric.reporters=" + KafkaPrometheusMetricsReporter.class.getName()); + env.put("CLASSPATH", TestUtils.MOUNT_PATH + "*"); + env.put("MESSAGE_COUNT", "1000"); + env.put("DELAY_MS", "100"); + } + + @AfterEach + public void tearDown() { + broker.stop(); + } + + @Test + public void testProducerMetrics() { + try (GenericContainer producer = TestUtils.clientContainer(env)) { + producer.start(); + + List prefixes = Arrays.asList( + "jvm_", + "process_", + "kafka_producer_app_info_", + "kafka_producer_kafka_metrics_", + "kafka_producer_producer_metrics_", + "kafka_producer_producer_node_metrics_", + "kafka_producer_producer_topic_metrics_"); + for (String prefix : prefixes) { + TestUtils.verify(producer, prefix, metrics -> assertFalse(metrics.isEmpty())); + } + } + } + + @Test + public void testProducerMetricsWithAllowlist() { + env.put("ADDITIONAL_CONFIG", + "metric.reporters=" + KafkaPrometheusMetricsReporter.class.getName() + "\n" + + "prometheus.metrics.reporter.allowlist=kafka_producer_kafka_metrics_.*,kafka_producer_producer_topic_metrics_.*"); + try (GenericContainer producer = TestUtils.clientContainer(env)) { + producer.start(); + + List allowedPrefixes = Arrays.asList( + "jvm_", + "process_", + "kafka_producer_kafka_metrics_", + "kafka_producer_producer_topic_metrics_"); + for (String prefix : allowedPrefixes) { + TestUtils.verify(producer, prefix, metrics -> assertFalse(metrics.isEmpty())); + } + List disallowedPrefixes = Arrays.asList( + "kafka_producer_app_info_", + "kafka_producer_producer_metrics_", + "kafka_producer_producer_node_metrics_"); + for (String prefix : disallowedPrefixes) { + TestUtils.verify(producer, prefix, metrics -> assertTrue(metrics.isEmpty())); + } + } + } +} diff --git a/src/test/java/io/strimzi/kafka/metrics/integration/TestStreamsMetricsIT.java b/src/test/java/io/strimzi/kafka/metrics/integration/TestStreamsMetricsIT.java new file mode 100644 index 0000000..0a5a5a0 --- /dev/null +++ b/src/test/java/io/strimzi/kafka/metrics/integration/TestStreamsMetricsIT.java @@ -0,0 +1,152 @@ +/* + * Copyright Strimzi authors. + * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html). + */ +package io.strimzi.kafka.metrics.integration; + +import io.strimzi.kafka.metrics.KafkaPrometheusMetricsReporter; +import io.strimzi.kafka.metrics.TestUtils; +import io.strimzi.test.container.StrimziKafkaContainer; +import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.clients.admin.AdminClientConfig; +import org.apache.kafka.clients.admin.NewTopic; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.serialization.StringSerializer; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.testcontainers.containers.GenericContainer; + +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class TestStreamsMetricsIT { + + private StrimziKafkaContainer broker; + private Map env; + + @BeforeEach + public void setUp() throws Exception { + broker = new StrimziKafkaContainer() + .withKraft() + .withNetworkAliases("kafka"); + broker.start(); + + try (Admin admin = Admin.create(Collections.singletonMap(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, broker.getBootstrapServers()))) { + admin.createTopics(Arrays.asList( + new NewTopic("source-topic", 1, (short) -1), + new NewTopic("target-topic", 1, (short) -1)) + ).all().get(); + } + + Map producerConfigs = new HashMap<>(); + producerConfigs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, broker.getBootstrapServers()); + producerConfigs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); + producerConfigs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); + try (KafkaProducer producer = new KafkaProducer<>(producerConfigs)) { + for (int i = 0; i < 10; i++) { + producer.send(new ProducerRecord<>("source-topic", "record" + i)); + } + producer.flush(); + } + + env = new HashMap<>(); + env.put("CLIENT_TYPE", "KafkaStreams"); + env.put("BOOTSTRAP_SERVERS", "kafka:9093"); + env.put("APPLICATION_ID", "my-app-id"); + env.put("SOURCE_TOPIC", "source-topic"); + env.put("TARGET_TOPIC", "target-topic"); + env.put("ADDITIONAL_CONFIG", "metric.reporters=" + KafkaPrometheusMetricsReporter.class.getName()); + env.put("CLASSPATH", TestUtils.MOUNT_PATH + "*"); + } + + @AfterEach + public void tearDown() { + broker.stop(); + } + + @Test + public void testStreamsMetrics() { + try (GenericContainer streams = TestUtils.clientContainer(env)) { + streams.start(); + + List prefixes = Arrays.asList( + "jvm_", + "process_", + "kafka_admin_client_app_info_", + "kafka_admin_client_kafka_metrics_", + "kafka_admin_client_admin_client_metrics_", + "kafka_admin_client_admin_client_node_metrics_", + "kafka_consumer_app_info_", + "kafka_consumer_kafka_metrics_", + "kafka_consumer_consumer_metrics_", + "kafka_consumer_consumer_node_metrics_", + "kafka_consumer_consumer_coordinator_metrics_", + "kafka_consumer_consumer_fetch_manager_metrics_", + "kafka_producer_app_info_", + "kafka_producer_kafka_metrics_", + "kafka_producer_producer_metrics_", + "kafka_producer_producer_node_metrics_", + "kafka_producer_producer_topic_metrics_", + "kafka_streams_stream_metrics_", + "kafka_streams_stream_processor_node_metrics_", + "kafka_streams_stream_state_updater_metrics_", + "kafka_streams_stream_task_metrics_", + "kafka_streams_stream_thread_metrics_", + "kafka_streams_stream_topic_metrics_"); + for (String prefix : prefixes) { + TestUtils.verify(streams, prefix, metrics -> assertFalse(metrics.isEmpty())); + } + } + } + + @Test + public void testStreamsMetricsWithAllowlist() { + env.put("ADDITIONAL_CONFIG", + "metric.reporters=" + KafkaPrometheusMetricsReporter.class.getName() + "\n" + + "prometheus.metrics.reporter.allowlist=kafka_consumer_.*,kafka_streams_stream_metrics_.*"); + try (GenericContainer streams = TestUtils.clientContainer(env)) { + streams.start(); + + List allowedPrefixes = Arrays.asList( + "jvm_", + "process_", + "kafka_consumer_app_info_", + "kafka_consumer_kafka_metrics_", + "kafka_consumer_consumer_metrics_", + "kafka_consumer_consumer_node_metrics_", + "kafka_consumer_consumer_coordinator_metrics_", + "kafka_consumer_consumer_fetch_manager_metrics_", + "kafka_streams_stream_metrics_"); + for (String prefix : allowedPrefixes) { + TestUtils.verify(streams, prefix, metrics -> assertFalse(metrics.isEmpty())); + } + List disallowedPrefixes = Arrays.asList( + "kafka_admin_client_app_info_", + "kafka_admin_client_kafka_metrics_", + "kafka_admin_client_admin_client_metrics_", + "kafka_admin_client_admin_client_node_metrics_", + "kafka_producer_app_info_", + "kafka_producer_kafka_metrics_", + "kafka_producer_producer_metrics_", + "kafka_producer_producer_node_metrics_", + "kafka_producer_producer_topic_metrics_", + "kafka_streams_stream_processor_node_metrics_", + "kafka_streams_stream_state_updater_metrics_", + "kafka_streams_stream_task_metrics_", + "kafka_streams_stream_thread_metrics_", + "kafka_streams_stream_topic_metrics_"); + for (String prefix : disallowedPrefixes) { + TestUtils.verify(streams, prefix, metrics -> assertTrue(metrics.isEmpty())); + } + } + } +} diff --git a/src/test/java/io/strimzi/kafka/metrics/kafka/KafkaCollectorTest.java b/src/test/java/io/strimzi/kafka/metrics/kafka/KafkaCollectorTest.java index 889d30b..822dc6f 100644 --- a/src/test/java/io/strimzi/kafka/metrics/kafka/KafkaCollectorTest.java +++ b/src/test/java/io/strimzi/kafka/metrics/kafka/KafkaCollectorTest.java @@ -18,9 +18,9 @@ import java.util.Map; import java.util.concurrent.atomic.AtomicInteger; -import static io.strimzi.kafka.metrics.MetricsUtils.assertGaugeSnapshot; -import static io.strimzi.kafka.metrics.MetricsUtils.assertInfoSnapshot; -import static io.strimzi.kafka.metrics.MetricsUtils.newKafkaMetric; +import static io.strimzi.kafka.metrics.TestUtils.assertGaugeSnapshot; +import static io.strimzi.kafka.metrics.TestUtils.assertInfoSnapshot; +import static io.strimzi.kafka.metrics.TestUtils.newKafkaMetric; import static org.junit.jupiter.api.Assertions.assertEquals; public class KafkaCollectorTest { diff --git a/src/test/java/io/strimzi/kafka/metrics/kafka/KafkaMetricWrapperTest.java b/src/test/java/io/strimzi/kafka/metrics/kafka/KafkaMetricWrapperTest.java index 785af4c..3e36882 100644 --- a/src/test/java/io/strimzi/kafka/metrics/kafka/KafkaMetricWrapperTest.java +++ b/src/test/java/io/strimzi/kafka/metrics/kafka/KafkaMetricWrapperTest.java @@ -15,7 +15,7 @@ import java.util.Map; import java.util.concurrent.atomic.AtomicInteger; -import static io.strimzi.kafka.metrics.MetricsUtils.newKafkaMetric; +import static io.strimzi.kafka.metrics.TestUtils.newKafkaMetric; import static org.junit.jupiter.api.Assertions.assertEquals; public class KafkaMetricWrapperTest { diff --git a/src/test/java/io/strimzi/kafka/metrics/yammer/YammerCollectorTest.java b/src/test/java/io/strimzi/kafka/metrics/yammer/YammerCollectorTest.java index 7e05745..fd4112b 100644 --- a/src/test/java/io/strimzi/kafka/metrics/yammer/YammerCollectorTest.java +++ b/src/test/java/io/strimzi/kafka/metrics/yammer/YammerCollectorTest.java @@ -16,9 +16,9 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Supplier; -import static io.strimzi.kafka.metrics.MetricsUtils.assertGaugeSnapshot; -import static io.strimzi.kafka.metrics.MetricsUtils.assertInfoSnapshot; -import static io.strimzi.kafka.metrics.MetricsUtils.newYammerMetric; +import static io.strimzi.kafka.metrics.TestUtils.assertGaugeSnapshot; +import static io.strimzi.kafka.metrics.TestUtils.assertInfoSnapshot; +import static io.strimzi.kafka.metrics.TestUtils.newYammerMetric; import static org.junit.jupiter.api.Assertions.assertEquals; public class YammerCollectorTest { diff --git a/src/test/java/io/strimzi/kafka/metrics/yammer/YammerMetricWrapperTest.java b/src/test/java/io/strimzi/kafka/metrics/yammer/YammerMetricWrapperTest.java index edad975..4acc400 100644 --- a/src/test/java/io/strimzi/kafka/metrics/yammer/YammerMetricWrapperTest.java +++ b/src/test/java/io/strimzi/kafka/metrics/yammer/YammerMetricWrapperTest.java @@ -8,7 +8,7 @@ import com.yammer.metrics.core.MetricName; import io.prometheus.metrics.model.snapshots.Labels; import io.prometheus.metrics.model.snapshots.PrometheusNaming; -import io.strimzi.kafka.metrics.MetricsUtils; +import io.strimzi.kafka.metrics.TestUtils; import org.junit.jupiter.api.Test; import java.util.concurrent.atomic.AtomicInteger; @@ -42,7 +42,7 @@ public void testYammerMetricName() { public void testYammerMetric() { AtomicInteger value = new AtomicInteger(0); MetricName name = new MetricName("group", "type", "name"); - Gauge metric = MetricsUtils.newYammerMetric(value::get); + Gauge metric = TestUtils.newYammerMetric(value::get); YammerMetricWrapper wrapper = new YammerMetricWrapper(YammerMetricWrapper.prometheusName(name), "", metric, "name"); assertEquals(value.get(), ((Gauge) wrapper.metric()).value()); value.incrementAndGet();