diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..49b5722
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,33 @@
+*.class
+
+# Mobile Tools for Java (J2ME)
+.mtj.tmp/
+
+# Package Files #
+*.jar
+*.war
+*.ear
+
+# virtual machine crash logs, see http://www.java.com/en/download/help/error_hotspot.xml
+hs_err_pid*
+
+# Maven stuff
+**/target/*
+
+# Eclipse stuff
+**/.project
+**/.settings/*
+**/.prefs
+**/.classpath
+/target/
+
+# IntelliJ IDEA specific
+.idea/
+*.iml
+
+# VS Code
+.factorypath
+.vscode
+
+.DS_Store
+
diff --git a/README.md b/README.md
index ec45640..87f52b8 100644
--- a/README.md
+++ b/README.md
@@ -6,6 +6,33 @@
This repository contains the _Prometheus Metrics Reporter for Apache Kafka server and client components_ as proposed in [Strimzi Proposal #64](https://github.com/strimzi/proposals/blob/main/064-prometheus-metrics-reporter.md).
The implementation is currently still in progress.
+## Build
+
+```shell
+mvn package assembly:single
+```
+
+## Run
+
+### Kafka Brokers
+
+Add the following to your broker configuration:
+```properties
+metric.reporters=io.strimzi.kafka.metrics.KafkaPrometheusMetricsReporter
+kafka.metrics.reporters=io.strimzi.kafka.metrics.YammerPrometheusMetricsReporter
+```
+
+### Kafka Clients
+
+Add the following to your client configuration:
+```properties
+metric.reporters=io.strimzi.kafka.metrics.KafkaPrometheusMetricsReporter
+```
+
+## Access Metrics
+
+Metrics are exposed on `http://localhost:8080/metrics`.
+
## Getting help
If you encounter any issues while using Strimzi, you can get help using:
diff --git a/pom.xml b/pom.xml
new file mode 100644
index 0000000..51eba8c
--- /dev/null
+++ b/pom.xml
@@ -0,0 +1,143 @@
+
+
+
+ 4.0.0
+
+ io.strimzi
+ metrics-reporter
+ 1.0-SNAPSHOT
+
+ metrics-reporter
+ Prometheus Metrics Reporter for Apache Kafka server and client components.
+ https://strimzi.io/
+
+
+ scm:git:git://github.com/strimzi/metrics-reporter.git
+ scm:git:ssh://github.com:strimzi/metrics-reporter.git
+ https://github.com/strimzi/metrics-reporter
+
+
+
+ GitHub
+ https://github.com/strimzi/metrics-reporter/issues
+
+
+
+
+ The Apache License, Version 2.0
+ https://www.apache.org/licenses/LICENSE-2.0.txt
+
+
+
+
+ UTF-8
+ 11
+ 11
+ 3.6.1
+ 0.16.0
+ 2.2.0
+ 2.0.6
+ 5.10.1
+
+
+
+
+ org.apache.kafka
+ kafka-clients
+ ${kafka.version}
+ provided
+
+
+ org.apache.kafka
+ kafka-server-common
+ ${kafka.version}
+ provided
+
+
+ org.apache.kafka
+ kafka_2.13
+ ${kafka.version}
+ provided
+
+
+ com.yammer.metrics
+ metrics-core
+ ${yammer.version}
+ provided
+
+
+ org.slf4j
+ slf4j-api
+ ${slf4j.version}
+ provided
+
+
+
+ io.prometheus
+ simpleclient
+ ${prometheus.version}
+
+
+ io.prometheus
+ simpleclient_hotspot
+ ${prometheus.version}
+
+
+ io.prometheus
+ simpleclient_httpserver
+ ${prometheus.version}
+
+
+
+ org.junit.jupiter
+ junit-jupiter-api
+ ${junit.version}
+ test
+
+
+
+
+
+
+
+
+ maven-clean-plugin
+ 3.1.0
+
+
+
+ maven-resources-plugin
+ 3.0.2
+
+
+ maven-compiler-plugin
+ 3.8.0
+
+
+ 11
+
+
+
+ org.apache.maven.plugins
+ maven-assembly-plugin
+ 3.1.1
+
+
+ jar-with-dependencies
+
+
+
+
+ make-assembly
+ package
+
+ single
+
+
+
+
+
+
+
+
diff --git a/src/main/java/io/strimzi/kafka/metrics/KafkaMetricsCollector.java b/src/main/java/io/strimzi/kafka/metrics/KafkaMetricsCollector.java
new file mode 100644
index 0000000..eabfd30
--- /dev/null
+++ b/src/main/java/io/strimzi/kafka/metrics/KafkaMetricsCollector.java
@@ -0,0 +1,107 @@
+/*
+ * Copyright Strimzi authors.
+ * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html).
+ */
+package io.strimzi.kafka.metrics;
+
+import io.prometheus.client.Collector;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.metrics.KafkaMetric;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+
+/**
+ * Prometheus Collector to store and export metrics retrieved by the reporters.
+ */
+public class KafkaMetricsCollector extends Collector {
+
+ private static final Logger LOG = LoggerFactory.getLogger(KafkaMetricsCollector.class.getName());
+
+ private final Map metrics;
+ private final PrometheusMetricsReporterConfig config;
+ private String prefix;
+
+ public KafkaMetricsCollector(PrometheusMetricsReporterConfig config) {
+ this.config = config;
+ this.metrics = new ConcurrentHashMap<>();
+ }
+
+ public void setPrefix(String prefix) {
+ this.prefix = prefix;
+ }
+
+ @Override
+ public List collect() {
+ List samples = new ArrayList<>();
+
+ for (Map.Entry entry : metrics.entrySet()) {
+ MetricName metricName = entry.getKey();
+ KafkaMetric kafkaMetric = entry.getValue();
+ LOG.trace("Collecting Kafka metric {}", metricName);
+
+ String name = metricName(metricName);
+ // TODO Filtering should take labels into account
+ if (!config.isAllowed(name)) {
+ LOG.info("Kafka metric {} is not allowed", name);
+ continue;
+ }
+ LOG.info("Kafka metric {} is allowed", name);
+ LOG.info("labels " + metricName.tags());
+ MetricFamilySamples sample = convert(name, metricName.description(), kafkaMetric, metricName.tags());
+ if (sample != null) {
+ samples.add(sample);
+ }
+ }
+ return samples;
+ }
+
+ public void addMetric(KafkaMetric metric) {
+ metrics.put(metric.metricName(), metric);
+ }
+
+ public void removeMetric(KafkaMetric metric) {
+ metrics.remove(metric.metricName());
+ }
+
+ String metricName(MetricName metricName) {
+ String prefix = this.prefix
+ .replace('.', '_')
+ .replace('-', '_')
+ .toLowerCase();
+ String group = metricName.group()
+ .replace('.', '_')
+ .replace('-', '_')
+ .toLowerCase();
+ String name = metricName.name()
+ .replace('.', '_')
+ .replace('-', '_')
+ .toLowerCase();
+ return prefix + '_' + group + '_' + name;
+ }
+
+ static MetricFamilySamples convert(String name, String help, KafkaMetric metric, Map labels) {
+ Object value = metric.metricValue();
+ if (!(value instanceof Number)) {
+ // Prometheus only accepts numeric metrics.
+ // Kafka gauges can have arbitrary types, so skip them for now
+ // TODO move non-numeric values to labels
+ return null;
+ }
+ Map sanitizedLabels = labels.entrySet().stream()
+ .collect(Collectors.toMap(
+ e -> Collector.sanitizeMetricName(e.getKey()),
+ Map.Entry::getValue,
+ (v1, v2) -> { throw new IllegalStateException("Unexpected duplicate key " + v1); },
+ LinkedHashMap::new));
+ return new MetricFamilySamplesBuilder(Type.GAUGE, help)
+ .addSample(name, ((Number) value).doubleValue(), sanitizedLabels)
+ .build();
+ }
+}
diff --git a/src/main/java/io/strimzi/kafka/metrics/KafkaPrometheusMetricsReporter.java b/src/main/java/io/strimzi/kafka/metrics/KafkaPrometheusMetricsReporter.java
new file mode 100644
index 0000000..5b4a9cc
--- /dev/null
+++ b/src/main/java/io/strimzi/kafka/metrics/KafkaPrometheusMetricsReporter.java
@@ -0,0 +1,92 @@
+/*
+ * Copyright Strimzi authors.
+ * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html).
+ */
+package io.strimzi.kafka.metrics;
+
+import io.prometheus.client.CollectorRegistry;
+import io.prometheus.client.exporter.HTTPServer;
+import io.prometheus.client.hotspot.DefaultExports;
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.metrics.KafkaMetric;
+import org.apache.kafka.common.metrics.MetricsContext;
+import org.apache.kafka.common.metrics.MetricsReporter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+/**
+ * MetricsReporter implementation that expose Kafka metrics in the Prometheus format.
+ *
+ * This can be used by Kafka brokers and clients.
+ */
+public class KafkaPrometheusMetricsReporter implements MetricsReporter {
+
+ private static final Logger LOG = LoggerFactory.getLogger(KafkaPrometheusMetricsReporter.class.getName());
+
+ private KafkaMetricsCollector kafkaMetricsCollector;
+ private Optional httpServer;
+
+ @Override
+ public void configure(Map map) {
+ PrometheusMetricsReporterConfig config = new PrometheusMetricsReporterConfig(map);
+ kafkaMetricsCollector = new KafkaMetricsCollector(config);
+ // Add JVM metrics
+ DefaultExports.initialize();
+ httpServer = config.startHttpServer();
+ }
+
+ @Override
+ public void init(List metrics) {
+ CollectorRegistry.defaultRegistry.register(kafkaMetricsCollector);
+ for (KafkaMetric metric : metrics) {
+ metricChange(metric);
+ }
+ }
+
+ @Override
+ public void metricChange(KafkaMetric metric) {
+ LOG.info("Kafka metricChange " + metric.metricName());
+ kafkaMetricsCollector.addMetric(metric);
+ }
+
+ @Override
+ public void metricRemoval(KafkaMetric metric) {
+ LOG.info("Kafka metricRemoval " + metric.metricName());
+ kafkaMetricsCollector.removeMetric(metric);
+ }
+
+ @Override
+ public void close() {
+ LOG.info("Closing the HTTP server");
+ }
+
+ @Override
+ public void reconfigure(Map configs) {
+ }
+
+ @Override
+ public void validateReconfiguration(Map configs) throws ConfigException {
+ }
+
+ @Override
+ public Set reconfigurableConfigs() {
+ return Collections.emptySet();
+ }
+
+ @Override
+ public void contextChange(MetricsContext metricsContext) {
+ LOG.info("Kafka contextChange with " + metricsContext.contextLabels());
+ String prefix = metricsContext.contextLabels().get(MetricsContext.NAMESPACE);
+ kafkaMetricsCollector.setPrefix(prefix);
+ }
+
+ public int getPort() {
+ return httpServer.get().getPort();
+ }
+}
diff --git a/src/main/java/io/strimzi/kafka/metrics/MetricFamilySamplesBuilder.java b/src/main/java/io/strimzi/kafka/metrics/MetricFamilySamplesBuilder.java
new file mode 100644
index 0000000..abf0754
--- /dev/null
+++ b/src/main/java/io/strimzi/kafka/metrics/MetricFamilySamplesBuilder.java
@@ -0,0 +1,55 @@
+/*
+ * Copyright Strimzi authors.
+ * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html).
+ */
+package io.strimzi.kafka.metrics;
+
+import com.yammer.metrics.stats.Snapshot;
+import io.prometheus.client.Collector;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Helper class to convert Kafka metrics into the Prometheus format.
+ */
+public class MetricFamilySamplesBuilder {
+
+ private final Collector.Type type;
+ private final String help;
+ private final List samples;
+
+ public MetricFamilySamplesBuilder(Collector.Type type, String help) {
+ this.type = type;
+ this.help = help;
+ this.samples = new ArrayList<>();
+ }
+
+ MetricFamilySamplesBuilder addSample(String name, double value, Map labels) {
+ samples.add(new Collector.MetricFamilySamples.Sample(
+ Collector.sanitizeMetricName(name),
+ new ArrayList<>(labels.keySet()),
+ new ArrayList<>(labels.values()),
+ value));
+ return this;
+ }
+
+ MetricFamilySamplesBuilder addQuantileSamples(String name, Snapshot snapshot, Map labels) {
+ for (String quantile : Arrays.asList("0.50", "0.75", "0.95", "0.98", "0.99", "0.999")) {
+ Map newLabels = new HashMap<>(labels);
+ newLabels.put("quantile", quantile);
+ addSample(name, snapshot.getValue(Double.parseDouble(quantile)), newLabels);
+ }
+ return this;
+ }
+
+ Collector.MetricFamilySamples build() {
+ if (samples.isEmpty()) {
+ throw new IllegalStateException("There are no samples");
+ }
+ return new Collector.MetricFamilySamples(samples.get(0).name, type, help, samples);
+ }
+}
diff --git a/src/main/java/io/strimzi/kafka/metrics/PrometheusMetricsReporterConfig.java b/src/main/java/io/strimzi/kafka/metrics/PrometheusMetricsReporterConfig.java
new file mode 100644
index 0000000..e590e15
--- /dev/null
+++ b/src/main/java/io/strimzi/kafka/metrics/PrometheusMetricsReporterConfig.java
@@ -0,0 +1,85 @@
+/*
+ * Copyright Strimzi authors.
+ * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html).
+ */
+package io.strimzi.kafka.metrics;
+
+import io.prometheus.client.exporter.HTTPServer;
+import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.common.config.ConfigDef;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.BindException;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.regex.Pattern;
+
+/**
+ * Configuration for the reporter implementations.
+ */
+public class PrometheusMetricsReporterConfig extends AbstractConfig {
+
+ private static final Logger LOG = LoggerFactory.getLogger(PrometheusMetricsReporterConfig.class.getName());
+
+ public static final String CONFIG_PREFIX = "prometheus.metrics.reporter.";
+
+ public static final String PORT_CONFIG = CONFIG_PREFIX + "port";
+ public static final int PORT_CONFIG_DEFAULT = 8080;
+ public static final String PORT_CONFIG_DOC = "The HTTP port to expose the metrics.";
+
+ public static final String ALLOWLIST_CONFIG = CONFIG_PREFIX + "allowlist";
+ public static final String ALLOWLIST_CONFIG_DEFAULT = ".*";
+ public static final String ALLOWLIST_CONFIG_DOC = "A comma separated list of regex Patterns to specify the metrics to collect.";
+
+ private static final ConfigDef CONFIG_DEF = new ConfigDef()
+ .define(PORT_CONFIG, ConfigDef.Type.INT, PORT_CONFIG_DEFAULT, ConfigDef.Importance.HIGH, PORT_CONFIG_DOC)
+ .define(ALLOWLIST_CONFIG, ConfigDef.Type.LIST, ALLOWLIST_CONFIG_DEFAULT, ConfigDef.Importance.HIGH, ALLOWLIST_CONFIG_DOC);
+
+ private final int port;
+ private final Pattern allowlist;
+
+ public PrometheusMetricsReporterConfig(Map, ?> props) {
+ super(CONFIG_DEF, props);
+ this.port = getInt(PORT_CONFIG);
+ this.allowlist = compileAllowlist(getList(ALLOWLIST_CONFIG));
+ }
+
+ public int port() {
+ return port;
+ }
+
+ public boolean isAllowed(String name) {
+ return allowlist.matcher(name).matches();
+ }
+
+ private Pattern compileAllowlist(List allowlist) {
+ String joined = String.join("|", allowlist);
+ return Pattern.compile(joined);
+ }
+
+ @Override
+ public String toString() {
+ return "PrometheusMetricsReporterConfig{" +
+ "allowlist=" + allowlist +
+ ", port=" + port +
+ '}';
+ }
+
+ public synchronized Optional startHttpServer() {
+ try {
+ HTTPServer httpServer = new HTTPServer(port, true);
+ LOG.info("HTTP server started on port " + port);
+ return Optional.of(httpServer);
+ } catch (BindException be) {
+ LOG.info("HTTP server already started");
+ return Optional.empty();
+ } catch (IOException ioe) {
+ LOG.error("Failed starting HTTP server", ioe);
+ throw new RuntimeException(ioe);
+ }
+ }
+
+}
diff --git a/src/main/java/io/strimzi/kafka/metrics/YammerMetricsCollector.java b/src/main/java/io/strimzi/kafka/metrics/YammerMetricsCollector.java
new file mode 100644
index 0000000..464f5cb
--- /dev/null
+++ b/src/main/java/io/strimzi/kafka/metrics/YammerMetricsCollector.java
@@ -0,0 +1,148 @@
+/*
+ * Copyright Strimzi authors.
+ * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html).
+ */
+package io.strimzi.kafka.metrics;
+
+import com.yammer.metrics.Metrics;
+import com.yammer.metrics.core.Counter;
+import com.yammer.metrics.core.Gauge;
+import com.yammer.metrics.core.Histogram;
+import com.yammer.metrics.core.Meter;
+import com.yammer.metrics.core.Metric;
+import com.yammer.metrics.core.MetricName;
+import com.yammer.metrics.core.MetricsRegistry;
+import com.yammer.metrics.core.Timer;
+import io.prometheus.client.Collector;
+import org.apache.kafka.server.metrics.KafkaYammerMetrics;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * MetricsReporter implementation that expose Kafka metrics in the Prometheus format.
+ *
+ * This can be used by Kafka brokers and clients.
+ */
+public class YammerMetricsCollector extends Collector {
+
+ private static final Logger LOG = LoggerFactory.getLogger(YammerMetricsCollector.class.getName());
+
+ private final List registries;
+ private final PrometheusMetricsReporterConfig config;
+
+ public YammerMetricsCollector(PrometheusMetricsReporterConfig config) {
+ this.config = config;
+ this.registries = Arrays.asList(KafkaYammerMetrics.defaultRegistry(), Metrics.defaultRegistry());
+ }
+
+ @Override
+ public List collect() {
+ List samples = new ArrayList<>();
+
+ for (MetricsRegistry registry : registries) {
+ for (Map.Entry entry : registry.allMetrics().entrySet()) {
+ MetricName metricName = entry.getKey();
+ Metric metric = entry.getValue();
+ LOG.trace("Collecting Yammer metric {}", metricName);
+
+ String name = metricName(metricName);
+ // TODO Filtering should take labels into account
+ if (!config.isAllowed(name)) {
+ LOG.info("Yammer metric {} is not allowed", name);
+ continue;
+ }
+ LOG.info("Yammer metric {} is allowed", name);
+ Map labels = labelsFromScope(metricName.getScope());
+ LOG.info("labels " + labels);
+
+ MetricFamilySamples sample = null;
+ if (metric instanceof Counter) {
+ sample = convert(name, (Counter) metric, labels);
+ } else if (metric instanceof Gauge) {
+ sample = convert(name, (Gauge>) metric, labels);
+ } else if (metric instanceof Histogram) {
+ sample = convert(name, (Histogram) metric, labels);
+ } else if (metric instanceof Meter) {
+ sample = convert(name, (Meter) metric, labels);
+ } else if (metric instanceof Timer) {
+ sample = convert(name, (Timer) metric, labels);
+ } else {
+ LOG.error("The metric " + metric.getClass().getName() + " has an unexpected type.");
+ }
+ if (sample != null) {
+ samples.add(sample);
+ }
+ }
+ }
+ return samples;
+ }
+
+ static String metricName(MetricName metricName) {
+ String metricNameStr = Collector.sanitizeMetricName(
+ "kafka_server_" +
+ metricName.getGroup() + '_' +
+ metricName.getType() + '_' +
+ metricName.getName()).toLowerCase();
+ LOG.info("metricName group {}, type {}, name {} converted into {}", metricName.getGroup(), metricName.getType(), metricName.getName(), metricNameStr);
+ return metricNameStr;
+ }
+
+ static Map labelsFromScope(String scope) {
+ if (scope != null) {
+ String[] parts = scope.split("\\.");
+ if (parts.length % 2 == 0) {
+ Map labels = new LinkedHashMap<>();
+ for (int i = 0; i < parts.length; i += 2) {
+ labels.put(Collector.sanitizeMetricName(parts[i]), parts[i + 1]);
+ }
+ return labels;
+ }
+ }
+ return Collections.emptyMap();
+ }
+
+ static MetricFamilySamples convert(String name, Counter counter, Map labels) {
+ return new MetricFamilySamplesBuilder(Type.GAUGE, "")
+ .addSample(name + "_count", counter.count(), labels)
+ .build();
+ }
+
+ static MetricFamilySamples convert(String name, Gauge> gauge, Map labels) {
+ Object value = gauge.value();
+ if (!(value instanceof Number)) {
+ // Prometheus only accepts numeric metrics.
+ // Some Kafka gauges have string values (for example kafka.server:type=KafkaServer,name=ClusterId), so skip them
+ return null;
+ }
+ return new MetricFamilySamplesBuilder(Type.GAUGE, "")
+ .addSample(name, ((Number) value).doubleValue(), labels)
+ .build();
+ }
+
+ static MetricFamilySamples convert(String name, Meter meter, Map labels) {
+ return new MetricFamilySamplesBuilder(Type.COUNTER, "")
+ .addSample(name + "_count", meter.count(), labels)
+ .build();
+ }
+
+ static MetricFamilySamples convert(String name, Histogram histogram, Map labels) {
+ return new MetricFamilySamplesBuilder(Type.SUMMARY, "")
+ .addSample(name + "_count", histogram.count(), labels)
+ .addQuantileSamples(name, histogram.getSnapshot(), labels)
+ .build();
+ }
+
+ static MetricFamilySamples convert(String name, Timer metric, Map labels) {
+ return new MetricFamilySamplesBuilder(Type.SUMMARY, "")
+ .addSample(name + "_count", metric.count(), labels)
+ .addQuantileSamples(name, metric.getSnapshot(), labels)
+ .build();
+ }
+}
diff --git a/src/main/java/io/strimzi/kafka/metrics/YammerPrometheusMetricsReporter.java b/src/main/java/io/strimzi/kafka/metrics/YammerPrometheusMetricsReporter.java
new file mode 100644
index 0000000..6141814
--- /dev/null
+++ b/src/main/java/io/strimzi/kafka/metrics/YammerPrometheusMetricsReporter.java
@@ -0,0 +1,28 @@
+/*
+ * Copyright Strimzi authors.
+ * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html).
+ */
+package io.strimzi.kafka.metrics;
+
+import io.prometheus.client.CollectorRegistry;
+import kafka.metrics.KafkaMetricsReporter;
+import kafka.utils.VerifiableProperties;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * KafkaMetricsReporter to export Kafka broker metrics in the Prometheus format.
+ */
+public class YammerPrometheusMetricsReporter implements KafkaMetricsReporter {
+
+ private static final Logger LOG = LoggerFactory.getLogger(YammerPrometheusMetricsReporter.class.getName());
+
+ @Override
+ public void init(VerifiableProperties props) {
+ LOG.info(">>> in init() yammer");
+ PrometheusMetricsReporterConfig config = new PrometheusMetricsReporterConfig(props.props());
+ LOG.info("yammer defaultRegistry" + CollectorRegistry.defaultRegistry);
+ CollectorRegistry.defaultRegistry.register(new YammerMetricsCollector(config));
+ }
+
+}
diff --git a/src/test/java/io/strimzi/kafka/metrics/KafkaMetricsCollectorTest.java b/src/test/java/io/strimzi/kafka/metrics/KafkaMetricsCollectorTest.java
new file mode 100644
index 0000000..24f21f5
--- /dev/null
+++ b/src/test/java/io/strimzi/kafka/metrics/KafkaMetricsCollectorTest.java
@@ -0,0 +1,103 @@
+/*
+ * Copyright Strimzi authors.
+ * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html).
+ */
+package io.strimzi.kafka.metrics;
+
+import io.prometheus.client.Collector;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.metrics.Gauge;
+import org.apache.kafka.common.metrics.KafkaMetric;
+import org.apache.kafka.common.metrics.Measurable;
+import org.apache.kafka.common.metrics.MetricConfig;
+import org.apache.kafka.common.utils.Time;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+
+public class KafkaMetricsCollectorTest {
+
+ private final MetricConfig metricConfig = new MetricConfig();
+ private final Time time = Time.SYSTEM;
+ private Map labels;
+
+ @BeforeEach
+ public void setup() {
+ labels = new HashMap<>();
+ labels.put("key", "value");
+ }
+
+ @Test
+ public void testCollect() {
+ Map props = new HashMap<>();
+ props.put(PrometheusMetricsReporterConfig.ALLOWLIST_CONFIG, "kafka_server_group_name.*");
+ PrometheusMetricsReporterConfig config = new PrometheusMetricsReporterConfig(props);
+ KafkaMetricsCollector collector = new KafkaMetricsCollector(config);
+ collector.setPrefix("kafka.server");
+
+ List metrics = collector.collect();
+ assertTrue(metrics.isEmpty());
+
+ // Adding a metric not matching the allowlist does nothing
+ collector.addMetric(buildMetric("name", "other", 2.0));
+ metrics = collector.collect();
+ assertTrue(metrics.isEmpty());
+
+ // Adding a non-numeric metric does nothing
+ collector.addMetric(buildNonNumericMetric("name2", "group"));
+ metrics = collector.collect();
+ assertTrue(metrics.isEmpty());
+
+ // Adding a metric that matches the allowlist
+ collector.addMetric(buildMetric("name", "group", 1.0));
+ metrics = collector.collect();
+ assertEquals(1, metrics.size());
+ assertEquals("kafka_server_group_name", metrics.get(0).name);
+ assertEquals(1, metrics.get(0).samples.size());
+ assertEquals(1.0, metrics.get(0).samples.get(0).value, 0.1);
+ assertEquals(new ArrayList<>(labels.keySet()), metrics.get(0).samples.get(0).labelNames);
+ assertEquals(new ArrayList<>(labels.values()), metrics.get(0).samples.get(0).labelValues);
+
+ // Adding the same metric updates its value
+ collector.addMetric(buildMetric("name", "group", 3.0));
+ metrics = collector.collect();
+ assertEquals(1, metrics.size());
+ assertEquals("kafka_server_group_name", metrics.get(0).name);
+ assertEquals(1, metrics.get(0).samples.size());
+ assertEquals(3.0, metrics.get(0).samples.get(0).value, 0.1);
+
+ // Removing the metric
+ collector.removeMetric(buildMetric("name", "group", 4.0));
+ metrics = collector.collect();
+ assertTrue(metrics.isEmpty());
+ }
+
+ private KafkaMetric buildMetric(String name, String group, double value) {
+ Measurable measurable = (config, now) -> value;
+ return new KafkaMetric(
+ new Object(),
+ new MetricName(name, group, "", labels),
+ measurable,
+ metricConfig,
+ time);
+ }
+
+ private KafkaMetric buildNonNumericMetric(String name, String group) {
+ Gauge measurable = (config, now) -> "hello";
+ return new KafkaMetric(
+ new Object(),
+ new MetricName(name, group, "", labels),
+ measurable,
+ metricConfig,
+ time);
+ }
+
+}
diff --git a/src/test/java/io/strimzi/kafka/metrics/KafkaPrometheusMetricsReporterTest.java b/src/test/java/io/strimzi/kafka/metrics/KafkaPrometheusMetricsReporterTest.java
new file mode 100644
index 0000000..0f1a266
--- /dev/null
+++ b/src/test/java/io/strimzi/kafka/metrics/KafkaPrometheusMetricsReporterTest.java
@@ -0,0 +1,143 @@
+/*
+ * Copyright Strimzi authors.
+ * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html).
+ */
+package io.strimzi.kafka.metrics;
+
+import io.prometheus.client.CollectorRegistry;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.metrics.Gauge;
+import org.apache.kafka.common.metrics.KafkaMetric;
+import org.apache.kafka.common.metrics.KafkaMetricsContext;
+import org.apache.kafka.common.metrics.Measurable;
+import org.apache.kafka.common.metrics.MetricConfig;
+import org.apache.kafka.common.utils.Time;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.BufferedReader;
+import java.io.InputStreamReader;
+import java.net.HttpURLConnection;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+
+public class KafkaPrometheusMetricsReporterTest {
+
+ private final MetricConfig metricConfig = new MetricConfig();
+ private final Time time = Time.SYSTEM;
+ private final Map labels = Collections.singletonMap("key", "value");
+
+ @BeforeEach
+ public void setup() {
+ CollectorRegistry.defaultRegistry.clear();
+ }
+
+ @Test
+ public void testLifeCycle() throws Exception {
+ KafkaPrometheusMetricsReporter reporter = new KafkaPrometheusMetricsReporter();
+ Map configs = new HashMap<>();
+ configs.put(PrometheusMetricsReporterConfig.PORT_CONFIG, "0");
+ reporter.configure(configs);
+ reporter.contextChange(new KafkaMetricsContext("kafka.server"));
+ int port = reporter.getPort();
+ // The first test that runs will
+ int initialMetrics = getMetrics(port).size();
+
+ KafkaMetric metric1 = buildMetric("name1", "group", 0);
+ reporter.init(Collections.singletonList(metric1));
+
+ List metrics = getMetrics(port);
+ assertEquals(initialMetrics + 1, metrics.size());
+
+ KafkaMetric metric2 = buildMetric("name2", "group", 0);
+ reporter.metricChange(metric2);
+ metrics = getMetrics(port);
+ assertEquals(initialMetrics + 2, metrics.size());
+
+ KafkaMetric metric3 = buildNonNumericMetric("name3", "group");
+ reporter.metricChange(metric3);
+ metrics = getMetrics(port);
+ assertEquals(initialMetrics + 2, metrics.size());
+
+ reporter.metricRemoval(metric1);
+ metrics = getMetrics(port);
+ assertEquals(initialMetrics + 1, metrics.size());
+
+ reporter.close();
+ }
+
+ @Test
+ public void testMultipleReporters() throws Exception {
+ Map configs = new HashMap<>();
+ configs.put(PrometheusMetricsReporterConfig.PORT_CONFIG, "0");
+
+ KafkaPrometheusMetricsReporter reporter1 = new KafkaPrometheusMetricsReporter();
+ reporter1.configure(configs);
+ reporter1.contextChange(new KafkaMetricsContext("kafka.server"));
+ int port = reporter1.getPort();
+ int initialMetrics = getMetrics(port).size();
+
+ KafkaPrometheusMetricsReporter reporter2 = new KafkaPrometheusMetricsReporter();
+ configs.put(PrometheusMetricsReporterConfig.PORT_CONFIG, String.valueOf(port));
+ reporter2.configure(configs);
+ reporter2.contextChange(new KafkaMetricsContext("kafka.server"));
+
+ KafkaMetric metric1 = buildMetric("name1", "group", 0);
+ reporter1.init(Collections.singletonList(metric1));
+
+ KafkaMetric metric2 = buildMetric("name2", "group", 0);
+ reporter2.init(Collections.singletonList(metric2));
+
+ int endMetrics = getMetrics(port).size();
+ assertTrue(initialMetrics < endMetrics);
+
+ reporter1.close();
+ reporter2.close();
+ }
+
+ private KafkaMetric buildMetric(String name, String group, double value) {
+ Measurable measurable = (config, now) -> value;
+ return new KafkaMetric(
+ new Object(),
+ new MetricName(name, group, "", labels),
+ measurable,
+ metricConfig,
+ time);
+ }
+
+ private KafkaMetric buildNonNumericMetric(String name, String group) {
+ Gauge measurable = (config, now) -> "hello";
+ return new KafkaMetric(
+ new Object(),
+ new MetricName(name, group, "", labels),
+ measurable,
+ metricConfig,
+ time);
+ }
+
+ private List getMetrics(int port) throws Exception {
+ List metrics = new ArrayList<>();
+ URL url = new URL("http://localhost:" + port + "/metrics");
+ HttpURLConnection con = (HttpURLConnection) url.openConnection();
+ con.setRequestMethod("GET");
+
+ try (BufferedReader in = new BufferedReader(new InputStreamReader(con.getInputStream()))) {
+ String inputLine;
+ while ((inputLine = in.readLine()) != null) {
+ if (!inputLine.startsWith("#")) {
+ metrics.add(inputLine);
+ }
+ }
+ }
+ return metrics;
+ }
+
+}
diff --git a/src/test/java/io/strimzi/kafka/metrics/PrometheusMetricsReporterConfigTest.java b/src/test/java/io/strimzi/kafka/metrics/PrometheusMetricsReporterConfigTest.java
new file mode 100644
index 0000000..0eba5e9
--- /dev/null
+++ b/src/test/java/io/strimzi/kafka/metrics/PrometheusMetricsReporterConfigTest.java
@@ -0,0 +1,49 @@
+/*
+ * Copyright Strimzi authors.
+ * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html).
+ */
+package io.strimzi.kafka.metrics;
+
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.*;
+
+
+public class PrometheusMetricsReporterConfigTest {
+
+ @Test
+ public void testDefaults() {
+ PrometheusMetricsReporterConfig config = new PrometheusMetricsReporterConfig(Collections.emptyMap());
+
+ assertEquals(PrometheusMetricsReporterConfig.PORT_CONFIG_DEFAULT, config.port());
+ assertTrue(config.isAllowed("random_name"));
+ }
+
+ @Test
+ public void testOverrides() {
+ Map props = new HashMap<>();
+ props.put(PrometheusMetricsReporterConfig.PORT_CONFIG, "1234");
+ props.put(PrometheusMetricsReporterConfig.ALLOWLIST_CONFIG, "kafka_server.*");
+ PrometheusMetricsReporterConfig config = new PrometheusMetricsReporterConfig(props);
+
+ assertEquals(1234, config.port());
+ assertFalse(config.isAllowed("random_name"));
+ assertTrue(config.isAllowed("kafka_server_metric"));
+ }
+
+ @Test
+ public void testAllowlist() {
+ Map props = new HashMap<>();
+ props.put(PrometheusMetricsReporterConfig.ALLOWLIST_CONFIG, "kafka_server.*,kafka_network.*");
+ PrometheusMetricsReporterConfig config = new PrometheusMetricsReporterConfig(props);
+
+ assertFalse(config.isAllowed("random_name"));
+ assertTrue(config.isAllowed("kafka_server_metric"));
+ assertTrue(config.isAllowed("kafka_network_metric"));
+ }
+}
diff --git a/src/test/java/io/strimzi/kafka/metrics/YammerMetricsCollectorTest.java b/src/test/java/io/strimzi/kafka/metrics/YammerMetricsCollectorTest.java
new file mode 100644
index 0000000..6653a76
--- /dev/null
+++ b/src/test/java/io/strimzi/kafka/metrics/YammerMetricsCollectorTest.java
@@ -0,0 +1,109 @@
+/*
+ * Copyright Strimzi authors.
+ * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html).
+ */
+package io.strimzi.kafka.metrics;
+
+import com.yammer.metrics.core.Counter;
+import com.yammer.metrics.core.Gauge;
+import com.yammer.metrics.core.MetricName;
+import io.prometheus.client.Collector;
+import org.apache.kafka.server.metrics.KafkaYammerMetrics;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class YammerMetricsCollectorTest {
+
+ private LinkedHashMap tags;
+
+ @BeforeEach
+ public void setup() {
+ tags = new LinkedHashMap<>();
+ tags.put("k1", "v1");
+ tags.put("k2", "v2");
+ }
+
+ @Test
+ public void testCollect() {
+ Map props = new HashMap<>();
+ props.put(PrometheusMetricsReporterConfig.ALLOWLIST_CONFIG, "kafka_server_group_name.*");
+ PrometheusMetricsReporterConfig config = new PrometheusMetricsReporterConfig(props);
+ YammerMetricsCollector collector = new YammerMetricsCollector(config);
+
+ List metrics = collector.collect();
+ assertTrue(metrics.isEmpty());
+
+ // Adding a metric not matching the allowlist does nothing
+ newCounter("other", "name", "type");
+ metrics = collector.collect();
+ assertTrue(metrics.isEmpty());
+
+ // Adding a non-numeric metric does nothing
+ newNonNumericGauge("group", "name2", "type");
+ metrics = collector.collect();
+ assertTrue(metrics.isEmpty());
+
+ // Adding a metric that matches the allowlist
+ Counter counter = newCounter("group", "name", "type");
+ metrics = collector.collect();
+ assertEquals(1, metrics.size());
+ assertEquals("kafka_server_group_name_type_count", metrics.get(0).name);
+ assertEquals(1, metrics.get(0).samples.size());
+ assertEquals(0.0, metrics.get(0).samples.get(0).value, 0.1);
+ assertEquals(new ArrayList<>(tags.keySet()), metrics.get(0).samples.get(0).labelNames);
+ assertEquals(new ArrayList<>(tags.values()), metrics.get(0).samples.get(0).labelValues);
+
+ // Updating the value of the metric
+ counter.inc(10);
+ metrics = collector.collect();
+ assertEquals(1, metrics.size());
+ assertEquals("kafka_server_group_name_type_count", metrics.get(0).name);
+ assertEquals(1, metrics.get(0).samples.size());
+ assertEquals(10.0, metrics.get(0).samples.get(0).value, 0.1);
+
+ // Removing the metric
+ removeMetric("group", "name", "type");
+ metrics = collector.collect();
+ assertTrue(metrics.isEmpty());
+ }
+
+ @Test
+ public void testLabelsFromScope() {
+ assertEquals(tags, YammerMetricsCollector.labelsFromScope("k1.v1.k2.v2"));
+ assertEquals(Collections.emptyMap(), YammerMetricsCollector.labelsFromScope(null));
+ assertEquals(Collections.emptyMap(), YammerMetricsCollector.labelsFromScope("k1"));
+ assertEquals(Collections.emptyMap(), YammerMetricsCollector.labelsFromScope("k1."));
+ assertEquals(Collections.emptyMap(), YammerMetricsCollector.labelsFromScope("k1.v1.k"));
+ }
+
+ public Counter newCounter(String group, String name, String type) {
+ MetricName metricName = KafkaYammerMetrics.getMetricName(group, name, type, tags);
+ return KafkaYammerMetrics.defaultRegistry().newCounter(metricName);
+ }
+
+ public void newNonNumericGauge(String group, String name, String type) {
+ MetricName metricName = KafkaYammerMetrics.getMetricName(group, name, type, tags);
+ KafkaYammerMetrics.defaultRegistry().newGauge(metricName, new Gauge() {
+ @Override
+ public String value() {
+ return "value";
+ }
+ });
+ }
+
+ public void removeMetric(String group, String name, String type) {
+ MetricName metricName = KafkaYammerMetrics.getMetricName(group, name, type, tags);
+ KafkaYammerMetrics.defaultRegistry().removeMetric(metricName);
+ }
+
+}
diff --git a/src/test/java/io/strimzi/kafka/metrics/YammerPrometheusMetricsReporterTest.java b/src/test/java/io/strimzi/kafka/metrics/YammerPrometheusMetricsReporterTest.java
new file mode 100644
index 0000000..886b51e
--- /dev/null
+++ b/src/test/java/io/strimzi/kafka/metrics/YammerPrometheusMetricsReporterTest.java
@@ -0,0 +1,29 @@
+/*
+ * Copyright Strimzi authors.
+ * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html).
+ */
+package io.strimzi.kafka.metrics;
+
+import io.prometheus.client.CollectorRegistry;
+import kafka.utils.VerifiableProperties;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.Properties;
+
+public class YammerPrometheusMetricsReporterTest {
+
+ @BeforeEach
+ public void setup() {
+ CollectorRegistry.defaultRegistry.clear();
+ }
+
+ @Test
+ public void testLifeCycle() {
+ YammerPrometheusMetricsReporter reporter = new YammerPrometheusMetricsReporter();
+ Properties configs = new Properties();
+ configs.put("broker.id", "0");
+ configs.put(PrometheusMetricsReporterConfig.PORT_CONFIG, "0");
+ reporter.init(new VerifiableProperties(configs));
+ }
+}