Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add integration tests for Consumer, Producer and Streams #67

Merged
merged 2 commits into from
Dec 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@
import java.util.Map;
import java.util.Optional;

import static io.strimzi.kafka.metrics.MetricsUtils.getMetrics;
import static io.strimzi.kafka.metrics.MetricsUtils.newKafkaMetric;
import static io.strimzi.kafka.metrics.TestUtils.getMetrics;
import static io.strimzi.kafka.metrics.TestUtils.newKafkaMetric;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@
import java.util.ArrayList;
import java.util.List;

import static io.strimzi.kafka.metrics.MetricsUtils.assertCounterSnapshot;
import static io.strimzi.kafka.metrics.MetricsUtils.assertGaugeSnapshot;
import static io.strimzi.kafka.metrics.MetricsUtils.assertInfoSnapshot;
import static io.strimzi.kafka.metrics.MetricsUtils.assertSummarySnapshot;
import static io.strimzi.kafka.metrics.TestUtils.assertCounterSnapshot;
import static io.strimzi.kafka.metrics.TestUtils.assertGaugeSnapshot;
import static io.strimzi.kafka.metrics.TestUtils.assertInfoSnapshot;
import static io.strimzi.kafka.metrics.TestUtils.assertSummarySnapshot;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertSame;
import static org.junit.jupiter.api.Assertions.assertTrue;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,29 +11,48 @@
import io.prometheus.metrics.model.snapshots.MetricSnapshot;
import io.prometheus.metrics.model.snapshots.Quantiles;
import io.prometheus.metrics.model.snapshots.SummarySnapshot;
import io.strimzi.kafka.metrics.http.Listener;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.metrics.Gauge;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.utils.Time;
import org.junit.jupiter.api.function.ThrowingConsumer;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.Network;
import org.testcontainers.containers.wait.strategy.Wait;
import org.testcontainers.utility.MountableFile;

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertTimeoutPreemptively;

/**
* Utility class to create and retrieve metrics
*/
@SuppressWarnings("ClassFanOutComplexity")
public class MetricsUtils {
public class TestUtils {

private static final String VERSION = "1.0.0-SNAPSHOT";
private static final String KAFKA_VERSION = "3.9.0";
private static final String CLIENTS_IMAGE = "quay.io/strimzi-test-clients/test-clients:latest-kafka-" + KAFKA_VERSION;
private static final Duration TIMEOUT = Duration.ofSeconds(10L);

public static final String REPORTER_JARS = "target/metrics-reporter-" + VERSION + "/metrics-reporter-" + VERSION + "/libs/";
public static final String MOUNT_PATH = "/opt/strimzi/metrics-reporter/";
public static final int PORT = Listener.parseListener(PrometheusMetricsReporterConfig.LISTENER_CONFIG_DEFAULT).port;
public static final String KAFKA_NETWORK_ALIAS = "kafka";

/**
* Query the HTTP endpoint and returns the output
Expand Down Expand Up @@ -164,4 +183,44 @@ public static void assertSummarySnapshot(MetricSnapshot<?> snapshot, int expecte
assertEquals(expectedLabels, datapoint.getLabels());
}

/**
* Filter metrics that start with a specified prefix
* @param allMetrics all the metric names
* @param prefix the prefix
* @return the list of metric names that start with the prefix
*/
public static List<String> filterMetrics(List<String> allMetrics, String prefix) {
List<String> metrics = new ArrayList<>();
for (String metric : allMetrics) {
if (metric.startsWith(prefix)) {
metrics.add(metric);
}
}
return metrics;
}

public static void verify(GenericContainer<?> container, String prefix, ThrowingConsumer<List<String>> condition) {
assertTimeoutPreemptively(TIMEOUT, () -> {
while (true) {
try {
List<String> filteredMetrics = filterMetrics(getMetrics(container.getHost(), container.getMappedPort(PORT)), prefix);
condition.accept(filteredMetrics);
return;
} catch (Throwable t) {
assertInstanceOf(AssertionError.class, t);
TimeUnit.MILLISECONDS.sleep(100L);
}
}
});
}

public static GenericContainer<?> clientContainer(Map<String, String> env) {
return new GenericContainer<>(CLIENTS_IMAGE)
.withNetwork(Network.SHARED)
.withExposedPorts(PORT)
.withCopyFileToContainer(MountableFile.forHostPath(REPORTER_JARS), MOUNT_PATH)
.withEnv(env)
.waitingFor(Wait.forHttp("/metrics").forStatusCode(200));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
import java.util.Map;
import java.util.Properties;

import static io.strimzi.kafka.metrics.MetricsUtils.getMetrics;
import static io.strimzi.kafka.metrics.TestUtils.getMetrics;
import static org.junit.jupiter.api.Assertions.assertEquals;

public class YammerPrometheusMetricsReporterTest {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,14 @@
package io.strimzi.kafka.metrics.integration;

import io.strimzi.kafka.metrics.KafkaPrometheusMetricsReporter;
import io.strimzi.kafka.metrics.MetricsUtils;
import io.strimzi.kafka.metrics.PrometheusMetricsReporterConfig;
import io.strimzi.kafka.metrics.TestUtils;
import io.strimzi.kafka.metrics.YammerPrometheusMetricsReporter;
import io.strimzi.kafka.metrics.http.Listener;
import io.strimzi.test.container.StrimziKafkaContainer;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.testcontainers.utility.MountableFile;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
Expand All @@ -27,11 +23,6 @@

public class TestBrokerMetricsIT {

private static final String VERSION = "1.0.0-SNAPSHOT";
private static final String REPORTER_JARS = "target/metrics-reporter-" + VERSION + "/metrics-reporter-" + VERSION + "/libs/";
private static final String MOUNT_PATH = "/opt/strimzi/metrics-reporter/";
private static final int PORT = Listener.parseListener(PrometheusMetricsReporterConfig.LISTENER_CONFIG_DEFAULT).port;

private Map<String, String> configs;
private StrimziKafkaContainer broker;

Expand All @@ -44,10 +35,10 @@ public void setUp() {
broker = new StrimziKafkaContainer()
.withNodeId(0)
.withKraft()
.withCopyFileToContainer(MountableFile.forHostPath(REPORTER_JARS), MOUNT_PATH)
.withExposedPorts(9092, PORT)
.withCopyFileToContainer(MountableFile.forHostPath(TestUtils.REPORTER_JARS), TestUtils.MOUNT_PATH)
.withExposedPorts(9092, TestUtils.PORT)
.withKafkaConfigurationMap(configs)
.withEnv(Collections.singletonMap("CLASSPATH", MOUNT_PATH + "*"));
.withEnv(Collections.singletonMap("CLASSPATH", TestUtils.MOUNT_PATH + "*"));
}

@AfterEach
Expand All @@ -56,50 +47,44 @@ public void tearDown() {
}

@Test
public void testMetricsReporter() throws Exception {
public void testBrokerMetrics() throws Exception {
broker.start();
List<String> metrics = MetricsUtils.getMetrics(broker.getHost(), broker.getMappedPort(PORT));
List<String> prefixes = Arrays.asList(
"jvm_",
"kafka_controller_",
"kafka_coordinator_",
"kafka_log_",
"kafka_network_",
"kafka_server_");

List<String> prefixes = List.of(
"jvm_",
"process_",
"kafka_controller_",
"kafka_coordinator_",
"kafka_log_",
"kafka_network_",
"kafka_server_");
List<String> metrics = TestUtils.getMetrics(broker.getHost(), broker.getMappedPort(TestUtils.PORT));
for (String prefix : prefixes) {
assertFalse(filterMetrics(metrics, prefix).isEmpty());
assertFalse(TestUtils.filterMetrics(metrics, prefix).isEmpty());
}
}

@Test
public void testMetricsReporterWithAllowlist() throws Exception {
public void testBrokerMetricsWithAllowlist() throws Exception {
configs.put("prometheus.metrics.reporter.allowlist", "kafka_controller.*,kafka_server.*");
broker.withKafkaConfigurationMap(configs);
broker.start();
List<String> metrics = MetricsUtils.getMetrics(broker.getHost(), broker.getMappedPort(PORT));
List<String> allowedPrefixes = Arrays.asList(
"jvm_",
"kafka_controller_",
"kafka_server_");

List<String> metrics = TestUtils.getMetrics(broker.getHost(), broker.getMappedPort(TestUtils.PORT));
List<String> allowedPrefixes = List.of(
"jvm_",
"process_",
"kafka_controller_",
"kafka_server_");
for (String prefix : allowedPrefixes) {
assertFalse(filterMetrics(metrics, prefix).isEmpty());
assertFalse(TestUtils.filterMetrics(metrics, prefix).isEmpty());
}
List<String> disallowPrefixes = Arrays.asList(
"kafka_coordinator_",
"kafka_log_",
"kafka_network_");
List<String> disallowPrefixes = List.of(
"kafka_coordinator_",
"kafka_log_",
"kafka_network_");
for (String prefix : disallowPrefixes) {
assertTrue(filterMetrics(metrics, prefix).isEmpty());
}
}

private List<String> filterMetrics(List<String> allMetrics, String prefix) {
List<String> metrics = new ArrayList<>();
for (String metric : allMetrics) {
if (metric.startsWith(prefix)) {
metrics.add(metric);
}
assertTrue(TestUtils.filterMetrics(metrics, prefix).isEmpty());
}
return metrics;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
/*
* Copyright Strimzi authors.
* License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html).
*/
package io.strimzi.kafka.metrics.integration;

import io.strimzi.kafka.metrics.KafkaPrometheusMetricsReporter;
import io.strimzi.kafka.metrics.TestUtils;
import io.strimzi.test.container.StrimziKafkaContainer;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.testcontainers.containers.GenericContainer;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;

public class TestConsumerMetricsIT {

private StrimziKafkaContainer broker;
private Map<String, String> env;

@BeforeEach
public void setUp() {
broker = new StrimziKafkaContainer()
.withKraft()
.withNetworkAliases(TestUtils.KAFKA_NETWORK_ALIAS);
broker.start();

env = new HashMap<>();
env.put("CLIENT_TYPE", "KafkaConsumer");
env.put("BOOTSTRAP_SERVERS", TestUtils.KAFKA_NETWORK_ALIAS + ":9091");
env.put("TOPIC", "my-topic");
env.put("GROUP_ID", "my-group");
env.put("ADDITIONAL_CONFIG", "metric.reporters=" + KafkaPrometheusMetricsReporter.class.getName());
env.put("CLASSPATH", TestUtils.MOUNT_PATH + "*");
env.put("MESSAGE_COUNT", "1000");
}

@AfterEach
public void tearDown() {
broker.stop();
}

@Test
public void testConsumerMetrics() {
try (GenericContainer<?> consumer = TestUtils.clientContainer(env)) {
consumer.start();

List<String> prefixes = List.of(
"jvm_",
"process_",
"kafka_consumer_app_info_",
"kafka_consumer_kafka_metrics_",
"kafka_consumer_consumer_metrics_",
"kafka_consumer_consumer_node_metrics_",
"kafka_consumer_consumer_coordinator_metrics_",
"kafka_consumer_consumer_fetch_manager_metrics_");
for (String prefix : prefixes) {
TestUtils.verify(consumer, prefix, metrics -> assertFalse(metrics.isEmpty()));
}
}
}

@Test
public void testConsumerMetricsWithAllowlist() {
env.put("ADDITIONAL_CONFIG",
"metric.reporters=" + KafkaPrometheusMetricsReporter.class.getName() + "\n" +
"prometheus.metrics.reporter.allowlist=kafka_consumer_kafka_metrics_.*,kafka_consumer_consumer_coordinator_metrics_.*");
try (GenericContainer<?> consumer = TestUtils.clientContainer(env)) {
consumer.start();

List<String> allowedPrefixes = List.of(
"jvm_",
"process_",
"kafka_consumer_kafka_metrics_",
"kafka_consumer_consumer_coordinator_metrics_");
for (String prefix : allowedPrefixes) {
TestUtils.verify(consumer, prefix, metrics -> assertFalse(metrics.isEmpty()));
}
List<String> disallowedPrefixes = List.of(
"kafka_consumer_app_info_",
"kafka_consumer_consumer_metrics_",
"kafka_consumer_consumer_node_metrics_",
"kafka_consumer_consumer_fetch_manager_metrics_");
for (String prefix : disallowedPrefixes) {
TestUtils.verify(consumer, prefix, metrics -> assertTrue(metrics.isEmpty()));
}
}
}
}
Loading
Loading