From ff6ca6eaee0a21cf2011c4099204084753105031 Mon Sep 17 00:00:00 2001 From: chenhang Date: Thu, 8 Sep 2022 00:04:59 +0800 Subject: [PATCH 1/5] add prometheusRawMetricsProvider support --- .../metrics/DataSketchesOpStatsLogger.java | 21 +- .../prometheus/metrics/LongAdderCounter.java | 21 +- .../metrics/PrometheusMetricsProvider.java | 108 ++++---- .../metrics/PrometheusStatsLogger.java | 36 ++- .../metrics/PrometheusTextFormat.java | 213 +++++++++++++++ .../metrics/PrometheusTextFormatUtil.java | 245 +++++++++++------- .../prometheus/metrics/ScopeContext.java | 56 ++++ .../stats/prometheus/metrics/SimpleGauge.java | 11 +- .../prometheus/metrics/ThreadRegistry.java | 90 +++++++ .../ThreadScopedDataSketchesStatsLogger.java | 114 ++++++++ .../metrics/ThreadScopedLongAdderCounter.java | 104 ++++++++ .../broker/stats/PrometheusMetricsTest.java | 31 +++ 12 files changed, 885 insertions(+), 165 deletions(-) create mode 100644 pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/PrometheusTextFormat.java create mode 100644 pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/ScopeContext.java create mode 100644 pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/ThreadRegistry.java create mode 100644 pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/ThreadScopedDataSketchesStatsLogger.java create mode 100644 pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/ThreadScopedLongAdderCounter.java diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/DataSketchesOpStatsLogger.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/DataSketchesOpStatsLogger.java index 70a2c5bc79429..378ff0413d622 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/DataSketchesOpStatsLogger.java +++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/DataSketchesOpStatsLogger.java @@ -54,9 +54,15 @@ public class DataSketchesOpStatsLogger implements OpStatsLogger { private final LongAdder successSumAdder = new LongAdder(); private final LongAdder failSumAdder = new LongAdder(); - public DataSketchesOpStatsLogger() { + private Map labels; + + // used for lazy registration for thread scoped metrics + private boolean threadInitialized; + + public DataSketchesOpStatsLogger(Map labels) { this.current = new ThreadLocalAccessor(); this.replacement = new ThreadLocalAccessor(); + this.labels = labels; } @Override @@ -172,6 +178,19 @@ public double getQuantileValue(boolean success, double quantile) { return s != null ? s.getQuantile(quantile) : Double.NaN; } + public Map getLabels() { + return labels; + } + + public boolean isThreadInitialized() { + return threadInitialized; + } + + public void initializeThread(Map labels) { + this.labels = labels; + this.threadInitialized = true; + } + private static class LocalData { private final DoublesSketch successSketch = new DoublesSketchBuilder().build(); private final DoublesSketch failSketch = new DoublesSketchBuilder().build(); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/LongAdderCounter.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/LongAdderCounter.java index 39be12b35509b..4c6d92fbb84c4 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/LongAdderCounter.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/LongAdderCounter.java @@ -18,6 +18,7 @@ */ package org.apache.pulsar.broker.stats.prometheus.metrics; +import java.util.Map; import java.util.concurrent.atomic.LongAdder; import org.apache.bookkeeper.stats.Counter; @@ -30,8 +31,13 @@ public class LongAdderCounter implements Counter { private final LongAdder counter = new LongAdder(); - 
public LongAdderCounter() { + private Map labels; + // used for lazy registration for thread scoped metric + private boolean threadInitialized; + + public LongAdderCounter(Map labels) { + this.labels = labels; } @Override @@ -58,4 +64,17 @@ public void add(long delta) { public Long get() { return counter.sum(); } + + public Map getLabels() { + return labels; + } + + public boolean isThreadInitialized() { + return threadInitialized; + } + + public void initializeThread(Map labels) { + this.labels = labels; + this.threadInitialized = true; + } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/PrometheusMetricsProvider.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/PrometheusMetricsProvider.java index 0e59286861a40..811bb0d501d66 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/PrometheusMetricsProvider.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/PrometheusMetricsProvider.java @@ -18,27 +18,30 @@ */ package org.apache.pulsar.broker.stats.prometheus.metrics; -import static org.apache.pulsar.common.util.Runnables.catchingAndLoggingThrowables; import com.google.common.annotations.VisibleForTesting; import io.netty.util.concurrent.DefaultThreadFactory; import io.prometheus.client.Collector; +import io.prometheus.client.CollectorRegistry; import java.io.IOException; import java.io.Writer; +import java.util.Collections; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; -import org.apache.bookkeeper.stats.CachingStatsProvider; import org.apache.bookkeeper.stats.StatsLogger; import org.apache.bookkeeper.stats.StatsProvider; import 
org.apache.commons.configuration.Configuration; import org.apache.commons.lang.StringUtils; +import org.apache.pulsar.broker.stats.prometheus.PrometheusRawMetricsProvider; +import org.apache.pulsar.common.util.SimpleTextOutputStream; /** - * A Prometheus based {@link StatsProvider} implementation. + * A Prometheus based {@link PrometheusRawMetricsProvider} implementation. */ -public class PrometheusMetricsProvider implements StatsProvider { +public class PrometheusMetricsProvider implements StatsProvider, PrometheusRawMetricsProvider { private ScheduledExecutorService executor; public static final String PROMETHEUS_STATS_LATENCY_ROLLOVER_SECONDS = "prometheusStatsLatencyRolloverSeconds"; @@ -46,81 +49,78 @@ public class PrometheusMetricsProvider implements StatsProvider { public static final String CLUSTER_NAME = "cluster"; public static final String DEFAULT_CLUSTER_NAME = "pulsar"; - private String cluster; - private final CachingStatsProvider cachingStatsProvider; - + private final CollectorRegistry registry; + private Map labels; /** * These acts a registry of the metrics defined in this provider. 
*/ - public final ConcurrentMap counters = new ConcurrentSkipListMap<>(); - public final ConcurrentMap> gauges = new ConcurrentSkipListMap<>(); - public final ConcurrentMap opStats = new ConcurrentSkipListMap<>(); + public final ConcurrentMap counters = new ConcurrentHashMap<>(); + public final ConcurrentMap> gauges = new ConcurrentHashMap<>(); + public final ConcurrentMap opStats = new ConcurrentHashMap<>(); + final ConcurrentMap threadScopedOpStats = + new ConcurrentHashMap<>(); + final ConcurrentMap threadScopedCounters = + new ConcurrentHashMap<>(); + public PrometheusMetricsProvider() { - this.cachingStatsProvider = new CachingStatsProvider(new StatsProvider() { - @Override - public void start(Configuration conf) { - // nop - } - - @Override - public void stop() { - // nop - } - - @Override - public StatsLogger getStatsLogger(String scope) { - return new PrometheusStatsLogger(PrometheusMetricsProvider.this, scope); - } - - @Override - public String getStatsName(String... statsComponents) { - String completeName; - if (statsComponents.length == 0) { - return ""; - } else if (statsComponents[0].isEmpty()) { - completeName = StringUtils.join(statsComponents, '_', 1, statsComponents.length); - } else { - completeName = StringUtils.join(statsComponents, '_'); - } - return Collector.sanitizeMetricName(completeName); - } - }); + this(CollectorRegistry.defaultRegistry); + } + + public PrometheusMetricsProvider(CollectorRegistry registry) { + this.registry = registry; } - @Override public void start(Configuration conf) { - executor = Executors.newSingleThreadScheduledExecutor(new DefaultThreadFactory("metrics")); + executor = Executors.newSingleThreadScheduledExecutor(new DefaultThreadFactory("metrics")); int latencyRolloverSeconds = conf.getInt(PROMETHEUS_STATS_LATENCY_ROLLOVER_SECONDS, DEFAULT_PROMETHEUS_STATS_LATENCY_ROLLOVER_SECONDS); - cluster = conf.getString(CLUSTER_NAME, DEFAULT_CLUSTER_NAME); + labels = Collections.singletonMap(CLUSTER_NAME, 
conf.getString(CLUSTER_NAME, DEFAULT_CLUSTER_NAME)); + + executor.scheduleAtFixedRate(() -> { + rotateLatencyCollection(); + }, 1, latencyRolloverSeconds, TimeUnit.SECONDS); - executor.scheduleAtFixedRate(catchingAndLoggingThrowables(this::rotateLatencyCollection), - 1, latencyRolloverSeconds, TimeUnit.SECONDS); } - @Override public void stop() { - executor.shutdownNow(); + executor.shutdown(); } - @Override public StatsLogger getStatsLogger(String scope) { - return this.cachingStatsProvider.getStatsLogger(scope); + return new PrometheusStatsLogger(PrometheusMetricsProvider.this, scope, labels); } @Override public void writeAllMetrics(Writer writer) throws IOException { - gauges.forEach((name, gauge) -> PrometheusTextFormatUtil.writeGauge(writer, name, cluster, gauge)); - counters.forEach((name, counter) -> PrometheusTextFormatUtil.writeCounter(writer, name, cluster, counter)); - opStats.forEach((name, opStatLogger) -> PrometheusTextFormatUtil.writeOpStat(writer, name, cluster, - opStatLogger)); + PrometheusTextFormat prometheusTextFormat = new PrometheusTextFormat(); + PrometheusTextFormat.writeMetricsCollectedByPrometheusClient(writer, registry); + + gauges.forEach((sc, gauge) -> prometheusTextFormat.writeGauge(writer, sc.getScope(), gauge)); + counters.forEach((sc, counter) -> prometheusTextFormat.writeCounter(writer, sc.getScope(), counter)); + opStats.forEach((sc, opStatLogger) -> + prometheusTextFormat.writeOpStat(writer, sc.getScope(), opStatLogger)); } @Override + public void generate(SimpleTextOutputStream writer) { + gauges.forEach((sc, gauge) -> PrometheusTextFormatUtil.writeGauge(writer, sc.getScope(), gauge)); + counters.forEach((sc, counter) -> PrometheusTextFormatUtil.writeCounter(writer, sc.getScope(), counter)); + opStats.forEach((sc, opStatLogger) -> + PrometheusTextFormatUtil.writeOpStat(writer, sc.getScope(), opStatLogger)); + } + public String getStatsName(String... 
statsComponents) { - return cachingStatsProvider.getStatsName(statsComponents); + String completeName; + if (statsComponents.length == 0) { + return ""; + } else if (statsComponents[0].isEmpty()) { + completeName = StringUtils.join(statsComponents, '_', 1, statsComponents.length); + } else { + completeName = StringUtils.join(statsComponents, '_'); + } + return Collector.sanitizeMetricName(completeName); } @VisibleForTesting diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/PrometheusStatsLogger.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/PrometheusStatsLogger.java index 6ca3996424ced..e6ddc139d8b29 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/PrometheusStatsLogger.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/PrometheusStatsLogger.java @@ -20,6 +20,8 @@ import com.google.common.base.Joiner; import io.prometheus.client.Collector; +import java.util.Map; +import java.util.TreeMap; import org.apache.bookkeeper.stats.Counter; import org.apache.bookkeeper.stats.Gauge; import org.apache.bookkeeper.stats.OpStatsLogger; @@ -32,35 +34,37 @@ public class PrometheusStatsLogger implements StatsLogger { private final PrometheusMetricsProvider provider; private final String scope; + private final Map labels; - public PrometheusStatsLogger(PrometheusMetricsProvider provider, String scope) { + PrometheusStatsLogger(PrometheusMetricsProvider provider, String scope, Map labels) { this.provider = provider; this.scope = scope; + this.labels = labels; } @Override public OpStatsLogger getOpStatsLogger(String name) { - return provider.opStats.computeIfAbsent(completeName(name), x -> new DataSketchesOpStatsLogger()); + return provider.opStats.computeIfAbsent(scopeContext(name), x -> new DataSketchesOpStatsLogger(labels)); } - @Override public OpStatsLogger getThreadScopedOpStatsLogger(String name) { - throw new 
RuntimeException("not implemented"); + return provider.threadScopedOpStats.computeIfAbsent(scopeContext(name), + x -> new ThreadScopedDataSketchesStatsLogger(provider, x, labels)); } @Override public Counter getCounter(String name) { - return provider.counters.computeIfAbsent(completeName(name), x -> new LongAdderCounter()); + return provider.counters.computeIfAbsent(scopeContext(name), x -> new LongAdderCounter(labels)); } - @Override public Counter getThreadScopedCounter(String name) { - throw new RuntimeException("not implemented"); + return provider.threadScopedCounters.computeIfAbsent(scopeContext(name), + x -> new ThreadScopedLongAdderCounter(provider, x, labels)); } @Override public void registerGauge(String name, Gauge gauge) { - provider.gauges.computeIfAbsent(completeName(name), x -> new SimpleGauge(gauge)); + provider.gauges.computeIfAbsent(scopeContext(name), x -> new SimpleGauge(gauge, labels)); } @Override @@ -75,11 +79,21 @@ public void removeScope(String name, StatsLogger statsLogger) { @Override public StatsLogger scope(String name) { - return new PrometheusStatsLogger(provider, completeName(name)); + return new PrometheusStatsLogger(provider, completeName(name), labels); + } + + @Override + public StatsLogger scopeLabel(String labelName, String labelValue) { + Map newLabels = new TreeMap<>(labels); + newLabels.put(labelName, labelValue); + return new PrometheusStatsLogger(provider, scope, newLabels); + } + + private ScopeContext scopeContext(String name) { + return new ScopeContext(completeName(name), labels); } private String completeName(String name) { - String completeName = scope.isEmpty() ? name : Joiner.on('_').join(scope, name); - return Collector.sanitizeMetricName(completeName); + return Collector.sanitizeMetricName(scope.isEmpty() ? 
name : Joiner.on('_').join(scope, name)); } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/PrometheusTextFormat.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/PrometheusTextFormat.java new file mode 100644 index 0000000000000..1d53069a71bbe --- /dev/null +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/PrometheusTextFormat.java @@ -0,0 +1,213 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.stats.prometheus.metrics; + +import io.prometheus.client.Collector; +import io.prometheus.client.Collector.MetricFamilySamples; +import io.prometheus.client.Collector.MetricFamilySamples.Sample; +import io.prometheus.client.CollectorRegistry; +import java.io.IOException; +import java.io.Writer; +import java.util.Enumeration; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +/** + * Logic to write metrics in Prometheus text format. 
+ */ +public class PrometheusTextFormat { + + Set metricNameSet = new HashSet<>(); + + void writeGauge(Writer w, String name, SimpleGauge gauge) { + // Example: + // # TYPE bookie_storage_entries_count gauge + // bookie_storage_entries_count 519 + try { + writeType(w, name, "gauge"); + w.append(name); + writeLabels(w, gauge.getLabels()); + w.append(' ').append(gauge.getSample().toString()).append('\n'); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + void writeCounter(Writer w, String name, LongAdderCounter counter) { + // Example: + // # TYPE jvm_threads_started_total counter + // jvm_threads_started_total 59 + try { + writeType(w, name, "counter"); + w.append(name); + writeLabels(w, counter.getLabels()); + w.append(' ').append(counter.get().toString()).append('\n'); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + void writeOpStat(Writer w, String name, DataSketchesOpStatsLogger opStat) { + // Example: + // # TYPE bookie_journal_JOURNAL_ADD_ENTRY summary + // bookie_journal_JOURNAL_ADD_ENTRY{success="false",quantile="0.5",} NaN + // bookie_journal_JOURNAL_ADD_ENTRY{success="false",quantile="0.75",} NaN + // bookie_journal_JOURNAL_ADD_ENTRY{success="false",quantile="0.95",} NaN + // bookie_journal_JOURNAL_ADD_ENTRY{success="false",quantile="0.99",} NaN + // bookie_journal_JOURNAL_ADD_ENTRY{success="false",quantile="0.999",} NaN + // bookie_journal_JOURNAL_ADD_ENTRY{success="false",quantile="0.9999",} NaN + // bookie_journal_JOURNAL_ADD_ENTRY{success="false",quantile="1.0",} NaN + // bookie_journal_JOURNAL_ADD_ENTRY_count{success="false",} 0.0 + // bookie_journal_JOURNAL_ADD_ENTRY_sum{success="false",} 0.0 + // bookie_journal_JOURNAL_ADD_ENTRY{success="true",quantile="0.5",} 1.706 + // bookie_journal_JOURNAL_ADD_ENTRY{success="true",quantile="0.75",} 1.89 + // bookie_journal_JOURNAL_ADD_ENTRY{success="true",quantile="0.95",} 2.121 + // bookie_journal_JOURNAL_ADD_ENTRY{success="true",quantile="0.99",} 10.708 + // 
bookie_journal_JOURNAL_ADD_ENTRY{success="true",quantile="0.999",} 10.902 + // bookie_journal_JOURNAL_ADD_ENTRY{success="true",quantile="0.9999",} 10.902 + // bookie_journal_JOURNAL_ADD_ENTRY{success="true",quantile="1.0",} 10.902 + // bookie_journal_JOURNAL_ADD_ENTRY_count{success="true",} 658.0 + // bookie_journal_JOURNAL_ADD_ENTRY_sum{success="true",} 1265.0800000000002 + try { + writeType(w, name, "summary"); + writeQuantile(w, opStat, name, false, 0.5); + writeQuantile(w, opStat, name, false, 0.75); + writeQuantile(w, opStat, name, false, 0.95); + writeQuantile(w, opStat, name, false, 0.99); + writeQuantile(w, opStat, name, false, 0.999); + writeQuantile(w, opStat, name, false, 0.9999); + writeQuantile(w, opStat, name, false, 1.0); + writeCount(w, opStat, name, false); + writeSum(w, opStat, name, false); + + writeQuantile(w, opStat, name, true, 0.5); + writeQuantile(w, opStat, name, true, 0.75); + writeQuantile(w, opStat, name, true, 0.95); + writeQuantile(w, opStat, name, true, 0.99); + writeQuantile(w, opStat, name, true, 0.999); + writeQuantile(w, opStat, name, true, 0.9999); + writeQuantile(w, opStat, name, true, 1.0); + writeCount(w, opStat, name, true); + writeSum(w, opStat, name, true); + + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + private void writeLabels(Writer w, Map labels) throws IOException { + if (labels.isEmpty()) { + return; + } + + w.append('{'); + writeLabelsNoBraces(w, labels); + w.append('}'); + } + + private void writeLabelsNoBraces(Writer w, Map labels) throws IOException { + if (labels.isEmpty()) { + return; + } + + boolean isFirst = true; + for (Map.Entry e : labels.entrySet()) { + if (!isFirst) { + w.append(','); + } + isFirst = false; + w.append(e.getKey()) + .append("=\"") + .append(e.getValue()) + .append('"'); + } + } + + private void writeQuantile(Writer w, DataSketchesOpStatsLogger opStat, String name, Boolean success, + double quantile) throws IOException { + w.append(name) + 
.append("{success=\"").append(success.toString()) + .append("\",quantile=\"").append(Double.toString(quantile)) + .append("\""); + if (!opStat.getLabels().isEmpty()) { + w.append(", "); + writeLabelsNoBraces(w, opStat.getLabels()); + } + w.append("} ") + .append(Double.toString(opStat.getQuantileValue(success, quantile))).append('\n'); + } + + private void writeCount(Writer w, DataSketchesOpStatsLogger opStat, String name, Boolean success) + throws IOException { + w.append(name).append("_count{success=\"").append(success.toString()).append("\""); + if (!opStat.getLabels().isEmpty()) { + w.append(", "); + writeLabelsNoBraces(w, opStat.getLabels()); + } + w.append("} ") + .append(Long.toString(opStat.getCount(success))).append('\n'); + } + + private void writeSum(Writer w, DataSketchesOpStatsLogger opStat, String name, Boolean success) + throws IOException { + w.append(name).append("_sum{success=\"").append(success.toString()).append("\""); + if (!opStat.getLabels().isEmpty()) { + w.append(", "); + writeLabelsNoBraces(w, opStat.getLabels()); + } + w.append("} ") + .append(Double.toString(opStat.getSum(success))).append('\n'); + } + + static void writeMetricsCollectedByPrometheusClient(Writer w, CollectorRegistry registry) throws IOException { + Enumeration metricFamilySamples = registry.metricFamilySamples(); + while (metricFamilySamples.hasMoreElements()) { + MetricFamilySamples metricFamily = metricFamilySamples.nextElement(); + + for (int i = 0; i < metricFamily.samples.size(); i++) { + Sample sample = metricFamily.samples.get(i); + w.write(sample.name); + w.write('{'); + for (int j = 0; j < sample.labelNames.size(); j++) { + if (j != 0) { + w.write(", "); + } + w.write(sample.labelNames.get(j)); + w.write("=\""); + w.write(sample.labelValues.get(j)); + w.write('"'); + } + + w.write("} "); + w.write(Collector.doubleToGoString(sample.value)); + w.write('\n'); + } + } + } + + void writeType(Writer w, String name, String type) throws IOException { + if 
(metricNameSet.contains(name)) { + return; + } + metricNameSet.add(name); + w.append("# TYPE ").append(name).append(" ").append(type).append("\n"); + } + +} diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/PrometheusTextFormatUtil.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/PrometheusTextFormatUtil.java index d00093c5ad09e..e4194b75e773a 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/PrometheusTextFormatUtil.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/PrometheusTextFormatUtil.java @@ -18,121 +18,174 @@ */ package org.apache.pulsar.broker.stats.prometheus.metrics; -import java.io.IOException; -import java.io.Writer; -import org.apache.bookkeeper.stats.Counter; +import io.prometheus.client.Collector; +import io.prometheus.client.Collector.MetricFamilySamples; +import io.prometheus.client.Collector.MetricFamilySamples.Sample; +import io.prometheus.client.CollectorRegistry; +import java.util.Enumeration; +import java.util.Map; +import org.apache.pulsar.common.util.SimpleTextOutputStream; /** * Logic to write metrics in Prometheus text format. 
*/ public class PrometheusTextFormatUtil { - static void writeGauge(Writer w, String name, String cluster, SimpleGauge gauge) { + public static void writeGauge(SimpleTextOutputStream w, String name, SimpleGauge gauge) { // Example: - // # TYPE bookie_client_bookkeeper_ml_scheduler_completed_tasks_0 gauge - // pulsar_bookie_client_bookkeeper_ml_scheduler_completed_tasks_0{cluster="pulsar"} 1044057 - try { - w.append("# TYPE ").append(name).append(" gauge\n"); - w.append(name).append("{cluster=\"").append(cluster).append("\"}") - .append(' ').append(gauge.getSample().toString()).append('\n'); - } catch (IOException e) { - throw new RuntimeException(e); - } + // # TYPE bookie_storage_entries_count gauge + // bookie_storage_entries_count 519 + w.write("# TYPE ").write(name).write(" gauge\n"); + w.write(name); + writeLabels(w, gauge.getLabels()); + w.write(' ').write(gauge.getSample().toString()).write('\n'); + } - static void writeCounter(Writer w, String name, String cluster, Counter counter) { + public static void writeCounter(SimpleTextOutputStream w, String name, LongAdderCounter counter) { // Example: // # TYPE jvm_threads_started_total counter - // jvm_threads_started_total{cluster="test"} 59 - try { - w.append("# TYPE ").append(name).append(" counter\n"); - w.append(name).append("{cluster=\"").append(cluster).append("\"}") - .append(' ').append(counter.get().toString()).append('\n'); - } catch (IOException e) { - throw new RuntimeException(e); - } + // jvm_threads_started_total 59 + w.write("# TYPE ").write(name).write(" counter\n"); + w.write(name); + writeLabels(w, counter.getLabels()); + w.write(' ').write(counter.get().toString()).write('\n'); } - static void writeOpStat(Writer w, String name, String cluster, DataSketchesOpStatsLogger opStat) { + public static void writeOpStat(SimpleTextOutputStream w, String name, DataSketchesOpStatsLogger opStat) { // Example: - // # TYPE pulsar_bookie_client_bookkeeper_ml_scheduler_task_queued summary - // 
pulsar_bookie_client_bookkeeper_ml_scheduler_task_queued{cluster="pulsar", success="false", - // quantile="0.5"} NaN - // pulsar_bookie_client_bookkeeper_ml_scheduler_task_queued{cluster="pulsar", success="false", - // quantile="0.75"} NaN - // pulsar_bookie_client_bookkeeper_ml_scheduler_task_queued{cluster="pulsar", success="false", - // quantile="0.95"} NaN - // pulsar_bookie_client_bookkeeper_ml_scheduler_task_queued{cluster="pulsar", success="false", - // quantile="0.99"} NaN - // pulsar_bookie_client_bookkeeper_ml_scheduler_task_queued{cluster="pulsar", success="false", - // quantile="0.999"} NaN - // pulsar_bookie_client_bookkeeper_ml_scheduler_task_queued{cluster="pulsar", success="false", - // quantile="0.9999"} NaN - // pulsar_bookie_client_bookkeeper_ml_scheduler_task_queued{cluster="pulsar", success="false", - // quantile="1.0"} -Infinity - // pulsar_bookie_client_bookkeeper_ml_scheduler_task_queued_count{cluster="pulsar", success="false"} 0 - // pulsar_bookie_client_bookkeeper_ml_scheduler_task_queued_sum{cluster="pulsar", success="false"} 0.0 - // pulsar_bookie_client_bookkeeper_ml_scheduler_task_queued{cluster="pulsar", success="true", - // quantile="0.5"} 0.031 - // pulsar_bookie_client_bookkeeper_ml_scheduler_task_queued{cluster="pulsar", success="true", - // quantile="0.75"} 0.043 - // pulsar_bookie_client_bookkeeper_ml_scheduler_task_queued{cluster="pulsar", success="true", - // quantile="0.95"} 0.061 - // pulsar_bookie_client_bookkeeper_ml_scheduler_task_queued{cluster="pulsar", success="true", - // quantile="0.99"} 0.064 - // pulsar_bookie_client_bookkeeper_ml_scheduler_task_queued{cluster="pulsar", success="true", - // quantile="0.999"} 0.073 - // pulsar_bookie_client_bookkeeper_ml_scheduler_task_queued{cluster="pulsar", success="true", - // quantile="0.9999"} 0.073 - // pulsar_bookie_client_bookkeeper_ml_scheduler_task_queued{cluster="pulsar", success="true", - // quantile="1.0"} 0.552 - // 
pulsar_bookie_client_bookkeeper_ml_scheduler_task_queued_count{cluster="pulsar", success="true"} 40911432 - // pulsar_bookie_client_bookkeeper_ml_scheduler_task_queued_sum{cluster="pulsar", success="true"} 527.0 - try { - w.append("# TYPE ").append(name).append(" summary\n"); - writeQuantile(w, opStat, name, cluster, false, 0.5); - writeQuantile(w, opStat, name, cluster, false, 0.75); - writeQuantile(w, opStat, name, cluster, false, 0.95); - writeQuantile(w, opStat, name, cluster, false, 0.99); - writeQuantile(w, opStat, name, cluster, false, 0.999); - writeQuantile(w, opStat, name, cluster, false, 0.9999); - writeQuantile(w, opStat, name, cluster, false, 1.0); - writeCount(w, opStat, name, cluster, false); - writeSum(w, opStat, name, cluster, false); - - writeQuantile(w, opStat, name, cluster, true, 0.5); - writeQuantile(w, opStat, name, cluster, true, 0.75); - writeQuantile(w, opStat, name, cluster, true, 0.95); - writeQuantile(w, opStat, name, cluster, true, 0.99); - writeQuantile(w, opStat, name, cluster, true, 0.999); - writeQuantile(w, opStat, name, cluster, true, 0.9999); - writeQuantile(w, opStat, name, cluster, true, 1.0); - writeCount(w, opStat, name, cluster, true); - writeSum(w, opStat, name, cluster, true); - - } catch (IOException e) { - throw new RuntimeException(e); + // # TYPE bookie_journal_JOURNAL_ADD_ENTRY summary + // bookie_journal_JOURNAL_ADD_ENTRY{success="false",quantile="0.5",} NaN + // bookie_journal_JOURNAL_ADD_ENTRY{success="false",quantile="0.75",} NaN + // bookie_journal_JOURNAL_ADD_ENTRY{success="false",quantile="0.95",} NaN + // bookie_journal_JOURNAL_ADD_ENTRY{success="false",quantile="0.99",} NaN + // bookie_journal_JOURNAL_ADD_ENTRY{success="false",quantile="0.999",} NaN + // bookie_journal_JOURNAL_ADD_ENTRY{success="false",quantile="0.9999",} NaN + // bookie_journal_JOURNAL_ADD_ENTRY{success="false",quantile="1.0",} NaN + // bookie_journal_JOURNAL_ADD_ENTRY_count{success="false",} 0.0 + // 
bookie_journal_JOURNAL_ADD_ENTRY_sum{success="false",} 0.0 + // bookie_journal_JOURNAL_ADD_ENTRY{success="true",quantile="0.5",} 1.706 + // bookie_journal_JOURNAL_ADD_ENTRY{success="true",quantile="0.75",} 1.89 + // bookie_journal_JOURNAL_ADD_ENTRY{success="true",quantile="0.95",} 2.121 + // bookie_journal_JOURNAL_ADD_ENTRY{success="true",quantile="0.99",} 10.708 + // bookie_journal_JOURNAL_ADD_ENTRY{success="true",quantile="0.999",} 10.902 + // bookie_journal_JOURNAL_ADD_ENTRY{success="true",quantile="0.9999",} 10.902 + // bookie_journal_JOURNAL_ADD_ENTRY{success="true",quantile="1.0",} 10.902 + // bookie_journal_JOURNAL_ADD_ENTRY_count{success="true",} 658.0 + // bookie_journal_JOURNAL_ADD_ENTRY_sum{success="true",} 1265.0800000000002 + w.write("# TYPE ").write(name).write(" summary\n"); + writeQuantile(w, opStat, name, false, 0.5); + writeQuantile(w, opStat, name, false, 0.75); + writeQuantile(w, opStat, name, false, 0.95); + writeQuantile(w, opStat, name, false, 0.99); + writeQuantile(w, opStat, name, false, 0.999); + writeQuantile(w, opStat, name, false, 0.9999); + writeQuantile(w, opStat, name, false, 1.0); + writeCount(w, opStat, name, false); + writeSum(w, opStat, name, false); + + writeQuantile(w, opStat, name, true, 0.5); + writeQuantile(w, opStat, name, true, 0.75); + writeQuantile(w, opStat, name, true, 0.95); + writeQuantile(w, opStat, name, true, 0.99); + writeQuantile(w, opStat, name, true, 0.999); + writeQuantile(w, opStat, name, true, 0.9999); + writeQuantile(w, opStat, name, true, 1.0); + writeCount(w, opStat, name, true); + writeSum(w, opStat, name, true); + } + + private static void writeQuantile(SimpleTextOutputStream w, DataSketchesOpStatsLogger opStat, String name, + Boolean success, double quantile) { + w.write(name) + .write("{success=\"").write(success.toString()) + .write("\",quantile=\"").write(Double.toString(quantile)); + if (!opStat.getLabels().isEmpty()) { + w.write("\", "); + writeLabelsNoBraces(w, opStat.getLabels()); + } else { + 
w.write("\""); + } + w.write("} ") + .write(Double.toString(opStat.getQuantileValue(success, quantile))).write('\n'); + } + + private static void writeCount(SimpleTextOutputStream w, DataSketchesOpStatsLogger opStat, String name, + Boolean success) { + w.write(name).write("_count{success=\"").write(success.toString()); + if (!opStat.getLabels().isEmpty()) { + w.write("\", "); + writeLabelsNoBraces(w, opStat.getLabels()); + } else { + w.write("\""); + } + w.write("} ") + .write(Long.toString(opStat.getCount(success))).write('\n'); + } + + private static void writeSum(SimpleTextOutputStream w, DataSketchesOpStatsLogger opStat, String name, + Boolean success) { + w.write(name).write("_sum{success=\"").write(success.toString()); + if (!opStat.getLabels().isEmpty()) { + w.write("\", "); + writeLabelsNoBraces(w, opStat.getLabels()); + } else { + w.write("\""); } + w.write("} ") + .write(Double.toString(opStat.getSum(success))).write('\n'); } - private static void writeQuantile(Writer w, DataSketchesOpStatsLogger opStat, String name, String cluster, - Boolean success, double quantile) throws IOException { - w.append(name).append("{cluster=\"").append(cluster).append("\", success=\"") - .append(success.toString()).append("\", quantile=\"") - .append(Double.toString(quantile)).append("\"} ") - .append(Double.toString(opStat.getQuantileValue(success, quantile))).append('\n'); + public static void writeMetricsCollectedByPrometheusClient(SimpleTextOutputStream w, CollectorRegistry registry) { + Enumeration metricFamilySamples = registry.metricFamilySamples(); + while (metricFamilySamples.hasMoreElements()) { + MetricFamilySamples metricFamily = metricFamilySamples.nextElement(); + + for (int i = 0; i < metricFamily.samples.size(); i++) { + Sample sample = metricFamily.samples.get(i); + w.write(sample.name); + w.write('{'); + for (int j = 0; j < sample.labelNames.size(); j++) { + if (j != 0) { + w.write(", "); + } + w.write(sample.labelNames.get(j)); + w.write("=\""); + 
w.write(sample.labelValues.get(j)); + w.write('"'); + } + + w.write("} "); + w.write(Collector.doubleToGoString(sample.value)); + w.write('\n'); + } + } } - private static void writeCount(Writer w, DataSketchesOpStatsLogger opStat, String name, String cluster, - Boolean success) throws IOException { - w.append(name).append("_count{cluster=\"").append(cluster).append("\", success=\"") - .append(success.toString()).append("\"} ") - .append(Long.toString(opStat.getCount(success))).append('\n'); + private static void writeLabels(SimpleTextOutputStream w, Map labels) { + if (labels.isEmpty()) { + return; + } + + w.write('{'); + writeLabelsNoBraces(w, labels); + w.write('}'); } - private static void writeSum(Writer w, DataSketchesOpStatsLogger opStat, String name, String cluster, - Boolean success) throws IOException { - w.append(name).append("_sum{cluster=\"").append(cluster).append("\", success=\"") - .append(success.toString()).append("\"} ") - .append(Double.toString(opStat.getSum(success))).append('\n'); + private static void writeLabelsNoBraces(SimpleTextOutputStream w, Map labels) { + if (labels.isEmpty()) { + return; + } + + boolean isFirst = true; + for (Map.Entry e : labels.entrySet()) { + if (!isFirst) { + w.write(','); + } + isFirst = false; + w.write(e.getKey()) + .write("=\"") + .write(e.getValue()) + .write('"'); + } } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/ScopeContext.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/ScopeContext.java new file mode 100644 index 0000000000000..25648fb7b92db --- /dev/null +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/ScopeContext.java @@ -0,0 +1,56 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.stats.prometheus.metrics; + +import java.util.Map; +import java.util.Objects; + +/** + * Holder for a scope and a set of associated labels. + */ +public class ScopeContext { + private final String scope; + private final Map labels; + + public ScopeContext(String scope, Map labels) { + this.scope = scope; + this.labels = labels; + } + + public String getScope() { + return scope; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + ScopeContext that = (ScopeContext) o; + return Objects.equals(scope, that.scope) && Objects.equals(labels, that.labels); + } + + @Override + public int hashCode() { + return Objects.hash(scope, labels); + } +} diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/SimpleGauge.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/SimpleGauge.java index 612168a2d9143..90b65e47bf699 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/SimpleGauge.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/SimpleGauge.java @@ -18,6 +18,7 @@ */ package org.apache.pulsar.broker.stats.prometheus.metrics; +import java.util.Map; import org.apache.bookkeeper.stats.Gauge; /** @@ -25,13 +26,19 
@@ */ public class SimpleGauge { + private final Map labels; private final Gauge gauge; - public SimpleGauge(final Gauge gauge) { + public SimpleGauge(final Gauge gauge, Map labels) { this.gauge = gauge; + this.labels = labels; } - public Number getSample() { + Number getSample() { return gauge.getSample(); } + + public Map getLabels() { + return labels; + } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/ThreadRegistry.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/ThreadRegistry.java new file mode 100644 index 0000000000000..6fb0dfa7862ef --- /dev/null +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/ThreadRegistry.java @@ -0,0 +1,90 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.stats.prometheus.metrics; + +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +/** + * For mapping thread ids to thread pools and threads within those pools + * or just for lone named threads. Thread scoped metrics add labels to + * metrics by retrieving the ThreadPoolThread object from this registry. 
+ * For flexibility, this registry is not based on TLS. + */ +public class ThreadRegistry { + private static ConcurrentMap threadPoolMap = new ConcurrentHashMap<>(); + + /** + * Threads can register themselves as their first act before carrying out any work. + * @param threadPool + * @param threadPoolThread + */ + public static void register(String threadPool, int threadPoolThread) { + register(threadPool, threadPoolThread, Thread.currentThread().getId()); + } + + /** + * Thread factories can register a thread by its id. + * @param threadPool + * @param threadPoolThread + * @param threadId + */ + public static void register(String threadPool, int threadPoolThread, long threadId) { + ThreadPoolThread tpt = new ThreadPoolThread(threadPool, threadPoolThread, threadId); + threadPoolMap.put(threadId, tpt); + } + + /** + * Clears all stored thread state. + */ + public static void clear() { + threadPoolMap.clear(); + } + + /** + * Retrieves the registered ThreadPoolThread (if registered) for the calling thread. + * @return + */ + public static ThreadPoolThread get() { + return threadPoolMap.get(Thread.currentThread().getId()); + } + + /** + * Stores the thread pool and ordinal. 
+ */ + public static final class ThreadPoolThread { + final String threadPool; + final int ordinal; + final long threadId; + + public ThreadPoolThread(String threadPool, int ordinal, long threadId) { + this.threadPool = threadPool; + this.ordinal = ordinal; + this.threadId = threadId; + } + + public String getThreadPool() { + return threadPool; + } + + public int getOrdinal() { + return ordinal; + } + } +} diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/ThreadScopedDataSketchesStatsLogger.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/ThreadScopedDataSketchesStatsLogger.java new file mode 100644 index 0000000000000..fec1ae20aef6f --- /dev/null +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/ThreadScopedDataSketchesStatsLogger.java @@ -0,0 +1,114 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pulsar.broker.stats.prometheus.metrics; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import org.apache.bookkeeper.stats.OpStatsData; +import org.apache.bookkeeper.stats.OpStatsLogger; + +/** + * OpStatsLogger implementation that lazily registers OpStatsLoggers per thread + * with added labels for the threadpool/thread name and thread no. + */ +public class ThreadScopedDataSketchesStatsLogger implements OpStatsLogger { + + private ThreadLocal statsLoggers; + private DataSketchesOpStatsLogger defaultStatsLogger; + private Map originalLabels; + private ScopeContext scopeContext; + private PrometheusMetricsProvider provider; + + public ThreadScopedDataSketchesStatsLogger(PrometheusMetricsProvider provider, + ScopeContext scopeContext, + Map labels) { + this.provider = provider; + this.scopeContext = scopeContext; + this.originalLabels = labels; + this.defaultStatsLogger = new DataSketchesOpStatsLogger(labels); + + Map defaultLabels = new HashMap<>(labels); + defaultLabels.put("threadPool", "?"); + defaultLabels.put("thread", "?"); + this.defaultStatsLogger.initializeThread(defaultLabels); + + this.statsLoggers = ThreadLocal.withInitial(() -> { + return new DataSketchesOpStatsLogger(labels); + }); + } + + @Override + public void registerFailedEvent(long eventLatency, TimeUnit unit) { + getStatsLogger().registerFailedEvent(eventLatency, unit); + } + + @Override + public void registerSuccessfulEvent(long eventLatency, TimeUnit unit) { + getStatsLogger().registerSuccessfulEvent(eventLatency, unit); + } + + @Override + public void registerSuccessfulValue(long value) { + getStatsLogger().registerSuccessfulValue(value); + } + + @Override + public void registerFailedValue(long value) { + getStatsLogger().registerFailedValue(value); + } + + @Override + public OpStatsData toOpStatsData() { + // Not relevant as we don't use JMX here + throw new UnsupportedOperationException(); + } + + @Override + public void
clear() { + // Not relevant as we don't use JMX here + throw new UnsupportedOperationException(); + } + + private DataSketchesOpStatsLogger getStatsLogger() { + DataSketchesOpStatsLogger statsLogger = statsLoggers.get(); + + // Lazy registration + // Update the stats logger with the thread labels then add to the provider + // If for some reason this thread did not get registered, + // then we fallback to a standard OpsStatsLogger (defaultStatsLogger) + if (!statsLogger.isThreadInitialized()) { + ThreadRegistry.ThreadPoolThread tpt = ThreadRegistry.get(); + if (tpt == null) { + statsLoggers.set(defaultStatsLogger); + provider.opStats.put(new ScopeContext(scopeContext.getScope(), originalLabels), defaultStatsLogger); + return defaultStatsLogger; + } else { + Map threadScopedlabels = new HashMap<>(originalLabels); + threadScopedlabels.put("threadPool", tpt.getThreadPool()); + threadScopedlabels.put("thread", String.valueOf(tpt.getOrdinal())); + + statsLogger.initializeThread(threadScopedlabels); + provider.opStats.put(new ScopeContext(scopeContext.getScope(), threadScopedlabels), statsLogger); + } + } + + return statsLogger; + } +} \ No newline at end of file diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/ThreadScopedLongAdderCounter.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/ThreadScopedLongAdderCounter.java new file mode 100644 index 0000000000000..f87ec6bee2025 --- /dev/null +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/ThreadScopedLongAdderCounter.java @@ -0,0 +1,104 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.stats.prometheus.metrics; + +import java.util.HashMap; +import java.util.Map; +import org.apache.bookkeeper.stats.Counter; + +/** + * {@link Counter} implementation that lazily registers LongAdderCounters per thread + * with added labels for the threadpool/thread name and thread no. + */ +public class ThreadScopedLongAdderCounter implements Counter { + private ThreadLocal counters; + private LongAdderCounter defaultCounter; + private Map originalLabels; + private ScopeContext scopeContext; + private PrometheusMetricsProvider provider; + + public ThreadScopedLongAdderCounter(PrometheusMetricsProvider provider, + ScopeContext scopeContext, + Map labels) { + this.provider = provider; + this.scopeContext = scopeContext; + this.originalLabels = new HashMap<>(labels); + this.defaultCounter = new LongAdderCounter(labels); + Map defaultLabels = new HashMap<>(labels); + defaultLabels.put("threadPool", "?"); + defaultLabels.put("thread", "?"); + this.defaultCounter.initializeThread(defaultLabels); + + this.counters = ThreadLocal.withInitial(() -> { + return new LongAdderCounter(labels); + }); + } + + @Override + public void clear() { + getCounter().clear(); + } + + @Override + public void inc() { + getCounter().inc(); + } + + @Override + public void dec() { + getCounter().dec(); + } + + @Override + public void add(long delta) { + getCounter().add(delta);
+ } + + @Override + public Long get() { + return getCounter().get(); + } + + private LongAdderCounter getCounter() { + LongAdderCounter counter = counters.get(); + + // Lazy registration + // Update the counter with the thread labels then add to the provider + // If for some reason this thread did not get registered, + // then we fallback to a standard counter (defaultCounter) + if (!counter.isThreadInitialized()) { + ThreadRegistry.ThreadPoolThread tpt = ThreadRegistry.get(); + + if (tpt == null) { + counters.set(defaultCounter); + provider.counters.put(new ScopeContext(scopeContext.getScope(), originalLabels), defaultCounter); + return defaultCounter; + } else { + Map threadScopedlabels = new HashMap<>(originalLabels); + threadScopedlabels.put("threadPool", tpt.getThreadPool()); + threadScopedlabels.put("thread", String.valueOf(tpt.getOrdinal())); + + counter.initializeThread(threadScopedlabels); + provider.counters.put(new ScopeContext(scopeContext.getScope(), threadScopedlabels), counter); + } + } + + return counter; + } +} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java index cfab570301873..cff69d0ec8de0 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java @@ -28,8 +28,11 @@ import com.google.common.collect.ArrayListMultimap; import com.google.common.collect.Multimap; import io.jsonwebtoken.SignatureAlgorithm; +import java.io.BufferedReader; import java.io.ByteArrayOutputStream; import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; import java.lang.reflect.Field; import java.math.RoundingMode; import java.nio.charset.StandardCharsets; @@ -53,7 +56,12 @@ import javax.crypto.SecretKey; import javax.naming.AuthenticationException; import lombok.Cleanup; 
+import org.apache.commons.configuration.PropertiesConfiguration; import org.apache.commons.io.IOUtils; +import org.apache.http.HttpResponse; +import org.apache.http.client.HttpClient; +import org.apache.http.client.methods.HttpGet; +import org.apache.http.impl.client.HttpClientBuilder; import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.authentication.AuthenticationDataSource; import org.apache.pulsar.broker.authentication.AuthenticationProviderToken; @@ -66,6 +74,7 @@ import org.apache.pulsar.broker.service.persistent.PersistentSubscription; import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsGenerator; +import org.apache.pulsar.broker.stats.prometheus.metrics.PrometheusMetricsProvider; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.MessageRoutingMode; import org.apache.pulsar.client.api.Producer; @@ -1632,6 +1641,28 @@ public static Multimap parseMetrics(String metrics) { return parsed; } + @Test + public void testRawMetricsProvider() throws IOException { + PrometheusMetricsProvider rawMetricsProvider = new PrometheusMetricsProvider(); + rawMetricsProvider.start(new PropertiesConfiguration()); + rawMetricsProvider.getStatsLogger("test").getOpStatsLogger("test_metrics") + .registerSuccessfulEvent(100, TimeUnit.NANOSECONDS); + + getPulsar().addPrometheusRawMetricsProvider(rawMetricsProvider); + HttpClient httpClient = HttpClientBuilder.create().build(); + final String metricsEndPoint = getPulsar().getWebServiceAddress() + "/metrics"; + HttpResponse response = httpClient.execute(new HttpGet(metricsEndPoint)); + InputStream inputStream = response.getEntity().getContent(); + InputStreamReader isReader = new InputStreamReader(inputStream); + BufferedReader reader = new BufferedReader(isReader); + StringBuffer sb = new StringBuffer(); + String str; + while((str = reader.readLine()) != null){ + sb.append(str); + } + 
Assert.assertTrue(sb.toString().contains("test_metrics")); + } + public static class Metric { public Map tags = new TreeMap<>(); public double value; From a88329030557f89f2db0bd404313891f33d59319 Mon Sep 17 00:00:00 2001 From: chenhang Date: Thu, 8 Sep 2022 09:11:34 +0800 Subject: [PATCH 2/5] format code --- .../org/apache/pulsar/broker/stats/PrometheusMetricsTest.java | 1 + 1 file changed, 1 insertion(+) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java index cff69d0ec8de0..93df4284aa039 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java @@ -1661,6 +1661,7 @@ public void testRawMetricsProvider() throws IOException { sb.append(str); } Assert.assertTrue(sb.toString().contains("test_metrics")); + rawMetricsProvider.stop(); } public static class Metric { From 2c102769cc16358c9293f47ba5c95d4557515153 Mon Sep 17 00:00:00 2001 From: chenhang Date: Thu, 22 Sep 2022 22:36:11 +0800 Subject: [PATCH 3/5] address comments --- .../PrometheusMetricsGenerator.java | 11 +- .../metrics/CachingStatsProvider.java | 68 +++++++ .../metrics/PrometheusMetricsProvider.java | 72 ++++--- .../metrics/PrometheusTextFormat.java | 188 ++++++++--------- .../metrics/PrometheusTextFormatUtil.java | 191 ------------------ .../broker/stats/PrometheusMetricsTest.java | 44 ++-- 6 files changed, 217 insertions(+), 357 deletions(-) create mode 100644 pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/CachingStatsProvider.java delete mode 100644 pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/PrometheusTextFormatUtil.java diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java index b217e02373363..f26d19abc55d1 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java @@ -31,8 +31,6 @@ import io.prometheus.client.hotspot.DefaultExports; import java.io.IOException; import java.io.OutputStream; -import java.io.StringWriter; -import java.io.Writer; import java.nio.ByteBuffer; import java.util.Collection; import java.util.Collections; @@ -51,6 +49,7 @@ import org.apache.pulsar.broker.stats.metrics.ManagedCursorMetrics; import org.apache.pulsar.broker.stats.metrics.ManagedLedgerCacheMetrics; import org.apache.pulsar.broker.stats.metrics.ManagedLedgerMetrics; +import org.apache.pulsar.broker.stats.prometheus.metrics.PrometheusMetricsProvider; import org.apache.pulsar.common.stats.Metrics; import org.apache.pulsar.common.util.DirectMemoryUtils; import org.apache.pulsar.common.util.SimpleTextOutputStream; @@ -315,12 +314,6 @@ private static void generateManagedLedgerBookieClientMetrics(PulsarService pulsa return; } - try { - Writer writer = new StringWriter(); - statsProvider.writeAllMetrics(writer); - stream.write(writer.toString()); - } catch (IOException e) { - // nop - } + ((PrometheusMetricsProvider) statsProvider).generate(stream); } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/CachingStatsProvider.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/CachingStatsProvider.java new file mode 100644 index 0000000000000..d320da8228c4a --- /dev/null +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/CachingStatsProvider.java @@ -0,0 +1,68 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.stats.prometheus.metrics; + +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import org.apache.bookkeeper.stats.StatsLogger; +import org.apache.bookkeeper.stats.StatsProvider; +import org.apache.commons.configuration.Configuration; + +/** + * A {@code CachingStatsProvider} adds the caching functionality to an existing {@code StatsProvider}. + * + *

The stats provider will cache the stats objects created by the other {@code StatsProvider} to allow + * the reusability of stats objects and avoid creating a lot of stats objects. + */ +public class CachingStatsProvider implements StatsProvider { + + protected final StatsProvider underlying; + protected final ConcurrentMap statsLoggers; + + public CachingStatsProvider(StatsProvider provider) { + this.underlying = provider; + this.statsLoggers = new ConcurrentHashMap(); + } + + @Override + public void start(Configuration conf) { + this.underlying.start(conf); + } + + @Override + public void stop() { + this.underlying.stop(); + } + + @Override + public StatsLogger getStatsLogger(String scope) { + StatsLogger statsLogger = statsLoggers.get(scope); + if (null == statsLogger) { + StatsLogger newStatsLogger = underlying.getStatsLogger(scope); + StatsLogger oldStatsLogger = statsLoggers.putIfAbsent(scope, newStatsLogger); + statsLogger = (null == oldStatsLogger) ? newStatsLogger : oldStatsLogger; + } + return statsLogger; + } + + @Override + public String getStatsName(String... 
statsComponents) { + return underlying.getStatsName(statsComponents); + } +} diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/PrometheusMetricsProvider.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/PrometheusMetricsProvider.java index 811bb0d501d66..a68359354d710 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/PrometheusMetricsProvider.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/PrometheusMetricsProvider.java @@ -18,10 +18,10 @@ */ package org.apache.pulsar.broker.stats.prometheus.metrics; +import static org.apache.pulsar.common.util.Runnables.catchingAndLoggingThrowables; import com.google.common.annotations.VisibleForTesting; import io.netty.util.concurrent.DefaultThreadFactory; import io.prometheus.client.Collector; -import io.prometheus.client.CollectorRegistry; import java.io.IOException; import java.io.Writer; import java.util.Collections; @@ -49,8 +49,9 @@ public class PrometheusMetricsProvider implements StatsProvider, PrometheusRawMe public static final String CLUSTER_NAME = "cluster"; public static final String DEFAULT_CLUSTER_NAME = "pulsar"; - private final CollectorRegistry registry; private Map labels; + private final CachingStatsProvider cachingStatsProvider; + /** * These acts a registry of the metrics defined in this provider. 
*/ @@ -62,25 +63,46 @@ public class PrometheusMetricsProvider implements StatsProvider, PrometheusRawMe final ConcurrentMap threadScopedCounters = new ConcurrentHashMap<>(); - public PrometheusMetricsProvider() { - this(CollectorRegistry.defaultRegistry); - } - - public PrometheusMetricsProvider(CollectorRegistry registry) { - this.registry = registry; + this.cachingStatsProvider = new CachingStatsProvider(new StatsProvider() { + @Override + public void start(Configuration conf) { + // nop + } + + @Override + public void stop() { + // nop + } + + @Override + public StatsLogger getStatsLogger(String scope) { + return new PrometheusStatsLogger(PrometheusMetricsProvider.this, scope, labels); + } + + @Override + public String getStatsName(String... statsComponents) { + String completeName; + if (statsComponents.length == 0) { + return ""; + } else if (statsComponents[0].isEmpty()) { + completeName = StringUtils.join(statsComponents, '_', 1, statsComponents.length); + } else { + completeName = StringUtils.join(statsComponents, '_'); + } + return Collector.sanitizeMetricName(completeName); + } + }); } public void start(Configuration conf) { - executor = Executors.newSingleThreadScheduledExecutor(new DefaultThreadFactory("metrics")); int latencyRolloverSeconds = conf.getInt(PROMETHEUS_STATS_LATENCY_ROLLOVER_SECONDS, DEFAULT_PROMETHEUS_STATS_LATENCY_ROLLOVER_SECONDS); labels = Collections.singletonMap(CLUSTER_NAME, conf.getString(CLUSTER_NAME, DEFAULT_CLUSTER_NAME)); - executor.scheduleAtFixedRate(() -> { - rotateLatencyCollection(); - }, 1, latencyRolloverSeconds, TimeUnit.SECONDS); + executor.scheduleAtFixedRate(catchingAndLoggingThrowables(this::rotateLatencyCollection), + 1, latencyRolloverSeconds, TimeUnit.SECONDS); } @@ -89,38 +111,24 @@ public void stop() { } public StatsLogger getStatsLogger(String scope) { - return new PrometheusStatsLogger(PrometheusMetricsProvider.this, scope, labels); + return cachingStatsProvider.getStatsLogger(scope); } @Override public void 
writeAllMetrics(Writer writer) throws IOException { - PrometheusTextFormat prometheusTextFormat = new PrometheusTextFormat(); - PrometheusTextFormat.writeMetricsCollectedByPrometheusClient(writer, registry); - - gauges.forEach((sc, gauge) -> prometheusTextFormat.writeGauge(writer, sc.getScope(), gauge)); - counters.forEach((sc, counter) -> prometheusTextFormat.writeCounter(writer, sc.getScope(), counter)); - opStats.forEach((sc, opStatLogger) -> - prometheusTextFormat.writeOpStat(writer, sc.getScope(), opStatLogger)); + throw new UnsupportedOperationException("writeAllMetrics is not support yet"); } @Override public void generate(SimpleTextOutputStream writer) { - gauges.forEach((sc, gauge) -> PrometheusTextFormatUtil.writeGauge(writer, sc.getScope(), gauge)); - counters.forEach((sc, counter) -> PrometheusTextFormatUtil.writeCounter(writer, sc.getScope(), counter)); + gauges.forEach((sc, gauge) -> PrometheusTextFormat.writeGauge(writer, sc.getScope(), gauge)); + counters.forEach((sc, counter) -> PrometheusTextFormat.writeCounter(writer, sc.getScope(), counter)); opStats.forEach((sc, opStatLogger) -> - PrometheusTextFormatUtil.writeOpStat(writer, sc.getScope(), opStatLogger)); + PrometheusTextFormat.writeOpStat(writer, sc.getScope(), opStatLogger)); } public String getStatsName(String... 
statsComponents) { - String completeName; - if (statsComponents.length == 0) { - return ""; - } else if (statsComponents[0].isEmpty()) { - completeName = StringUtils.join(statsComponents, '_', 1, statsComponents.length); - } else { - completeName = StringUtils.join(statsComponents, '_'); - } - return Collector.sanitizeMetricName(completeName); + return cachingStatsProvider.getStatsName(statsComponents); } @VisibleForTesting diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/PrometheusTextFormat.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/PrometheusTextFormat.java index 1d53069a71bbe..a949398846ac9 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/PrometheusTextFormat.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/PrometheusTextFormat.java @@ -22,49 +22,36 @@ import io.prometheus.client.Collector.MetricFamilySamples; import io.prometheus.client.Collector.MetricFamilySamples.Sample; import io.prometheus.client.CollectorRegistry; -import java.io.IOException; -import java.io.Writer; import java.util.Enumeration; -import java.util.HashSet; import java.util.Map; -import java.util.Set; +import org.apache.pulsar.common.util.SimpleTextOutputStream; /** * Logic to write metrics in Prometheus text format. 
*/ public class PrometheusTextFormat { - - Set metricNameSet = new HashSet<>(); - - void writeGauge(Writer w, String name, SimpleGauge gauge) { + public static void writeGauge(SimpleTextOutputStream w, String name, SimpleGauge gauge) { // Example: // # TYPE bookie_storage_entries_count gauge // bookie_storage_entries_count 519 - try { - writeType(w, name, "gauge"); - w.append(name); - writeLabels(w, gauge.getLabels()); - w.append(' ').append(gauge.getSample().toString()).append('\n'); - } catch (IOException e) { - throw new RuntimeException(e); - } + w.write("# TYPE ").write(name).write(" gauge\n"); + w.write(name); + writeLabels(w, gauge.getLabels()); + w.write(' ').write(gauge.getSample().toString()).write('\n'); + } - void writeCounter(Writer w, String name, LongAdderCounter counter) { + public static void writeCounter(SimpleTextOutputStream w, String name, LongAdderCounter counter) { // Example: // # TYPE jvm_threads_started_total counter // jvm_threads_started_total 59 - try { - writeType(w, name, "counter"); - w.append(name); - writeLabels(w, counter.getLabels()); - w.append(' ').append(counter.get().toString()).append('\n'); - } catch (IOException e) { - throw new RuntimeException(e); - } + w.write("# TYPE ").write(name).write(" counter\n"); + w.write(name); + writeLabels(w, counter.getLabels()); + w.write(' ').write(counter.get().toString()).write('\n'); } - void writeOpStat(Writer w, String name, DataSketchesOpStatsLogger opStat) { + public static void writeOpStat(SimpleTextOutputStream w, String name, DataSketchesOpStatsLogger opStat) { // Example: // # TYPE bookie_journal_JOURNAL_ADD_ENTRY summary // bookie_journal_JOURNAL_ADD_ENTRY{success="false",quantile="0.5",} NaN @@ -85,98 +72,70 @@ void writeOpStat(Writer w, String name, DataSketchesOpStatsLogger opStat) { // bookie_journal_JOURNAL_ADD_ENTRY{success="true",quantile="1.0",} 10.902 // bookie_journal_JOURNAL_ADD_ENTRY_count{success="true",} 658.0 // 
bookie_journal_JOURNAL_ADD_ENTRY_sum{success="true",} 1265.0800000000002 - try { - writeType(w, name, "summary"); - writeQuantile(w, opStat, name, false, 0.5); - writeQuantile(w, opStat, name, false, 0.75); - writeQuantile(w, opStat, name, false, 0.95); - writeQuantile(w, opStat, name, false, 0.99); - writeQuantile(w, opStat, name, false, 0.999); - writeQuantile(w, opStat, name, false, 0.9999); - writeQuantile(w, opStat, name, false, 1.0); - writeCount(w, opStat, name, false); - writeSum(w, opStat, name, false); - - writeQuantile(w, opStat, name, true, 0.5); - writeQuantile(w, opStat, name, true, 0.75); - writeQuantile(w, opStat, name, true, 0.95); - writeQuantile(w, opStat, name, true, 0.99); - writeQuantile(w, opStat, name, true, 0.999); - writeQuantile(w, opStat, name, true, 0.9999); - writeQuantile(w, opStat, name, true, 1.0); - writeCount(w, opStat, name, true); - writeSum(w, opStat, name, true); - - } catch (IOException e) { - throw new RuntimeException(e); - } - } - - private void writeLabels(Writer w, Map labels) throws IOException { - if (labels.isEmpty()) { - return; - } - - w.append('{'); - writeLabelsNoBraces(w, labels); - w.append('}'); + w.write("# TYPE ").write(name).write(" summary\n"); + writeQuantile(w, opStat, name, false, 0.5); + writeQuantile(w, opStat, name, false, 0.75); + writeQuantile(w, opStat, name, false, 0.95); + writeQuantile(w, opStat, name, false, 0.99); + writeQuantile(w, opStat, name, false, 0.999); + writeQuantile(w, opStat, name, false, 0.9999); + writeQuantile(w, opStat, name, false, 1.0); + writeCount(w, opStat, name, false); + writeSum(w, opStat, name, false); + + writeQuantile(w, opStat, name, true, 0.5); + writeQuantile(w, opStat, name, true, 0.75); + writeQuantile(w, opStat, name, true, 0.95); + writeQuantile(w, opStat, name, true, 0.99); + writeQuantile(w, opStat, name, true, 0.999); + writeQuantile(w, opStat, name, true, 0.9999); + writeQuantile(w, opStat, name, true, 1.0); + writeCount(w, opStat, name, true); + 
writeSum(w, opStat, name, true); } - private void writeLabelsNoBraces(Writer w, Map labels) throws IOException { - if (labels.isEmpty()) { - return; - } - - boolean isFirst = true; - for (Map.Entry e : labels.entrySet()) { - if (!isFirst) { - w.append(','); - } - isFirst = false; - w.append(e.getKey()) - .append("=\"") - .append(e.getValue()) - .append('"'); - } - } - - private void writeQuantile(Writer w, DataSketchesOpStatsLogger opStat, String name, Boolean success, - double quantile) throws IOException { - w.append(name) - .append("{success=\"").append(success.toString()) - .append("\",quantile=\"").append(Double.toString(quantile)) - .append("\""); + private static void writeQuantile(SimpleTextOutputStream w, DataSketchesOpStatsLogger opStat, String name, + Boolean success, double quantile) { + w.write(name) + .write("{success=\"").write(success.toString()) + .write("\",quantile=\"").write(Double.toString(quantile)); if (!opStat.getLabels().isEmpty()) { - w.append(", "); + w.write("\", "); writeLabelsNoBraces(w, opStat.getLabels()); + } else { + w.write("\""); } - w.append("} ") - .append(Double.toString(opStat.getQuantileValue(success, quantile))).append('\n'); + w.write("} ") + .write(Double.toString(opStat.getQuantileValue(success, quantile))).write('\n'); } - private void writeCount(Writer w, DataSketchesOpStatsLogger opStat, String name, Boolean success) - throws IOException { - w.append(name).append("_count{success=\"").append(success.toString()).append("\""); + private static void writeCount(SimpleTextOutputStream w, DataSketchesOpStatsLogger opStat, String name, + Boolean success) { + w.write(name).write("_count{success=\"").write(success.toString()); if (!opStat.getLabels().isEmpty()) { - w.append(", "); + w.write("\", "); writeLabelsNoBraces(w, opStat.getLabels()); + } else { + w.write("\""); } - w.append("} ") - .append(Long.toString(opStat.getCount(success))).append('\n'); + w.write("} ") + 
.write(Long.toString(opStat.getCount(success))).write('\n'); } - private void writeSum(Writer w, DataSketchesOpStatsLogger opStat, String name, Boolean success) - throws IOException { - w.append(name).append("_sum{success=\"").append(success.toString()).append("\""); + private static void writeSum(SimpleTextOutputStream w, DataSketchesOpStatsLogger opStat, String name, + Boolean success) { + w.write(name).write("_sum{success=\"").write(success.toString()); if (!opStat.getLabels().isEmpty()) { - w.append(", "); + w.write("\", "); writeLabelsNoBraces(w, opStat.getLabels()); + } else { + w.write("\""); } - w.append("} ") - .append(Double.toString(opStat.getSum(success))).append('\n'); + w.write("} ") + .write(Double.toString(opStat.getSum(success))).write('\n'); } - static void writeMetricsCollectedByPrometheusClient(Writer w, CollectorRegistry registry) throws IOException { + public static void writeMetricsCollectedByPrometheusClient(SimpleTextOutputStream w, CollectorRegistry registry) { Enumeration metricFamilySamples = registry.metricFamilySamples(); while (metricFamilySamples.hasMoreElements()) { MetricFamilySamples metricFamily = metricFamilySamples.nextElement(); @@ -202,12 +161,31 @@ static void writeMetricsCollectedByPrometheusClient(Writer w, CollectorRegistry } } - void writeType(Writer w, String name, String type) throws IOException { - if (metricNameSet.contains(name)) { + private static void writeLabels(SimpleTextOutputStream w, Map labels) { + if (labels.isEmpty()) { return; } - metricNameSet.add(name); - w.append("# TYPE ").append(name).append(" ").append(type).append("\n"); + + w.write('{'); + writeLabelsNoBraces(w, labels); + w.write('}'); } + private static void writeLabelsNoBraces(SimpleTextOutputStream w, Map labels) { + if (labels.isEmpty()) { + return; + } + + boolean isFirst = true; + for (Map.Entry e : labels.entrySet()) { + if (!isFirst) { + w.write(','); + } + isFirst = false; + w.write(e.getKey()) + .write("=\"") + .write(e.getValue()) + 
.write('"'); + } + } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/PrometheusTextFormatUtil.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/PrometheusTextFormatUtil.java deleted file mode 100644 index e4194b75e773a..0000000000000 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/PrometheusTextFormatUtil.java +++ /dev/null @@ -1,191 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pulsar.broker.stats.prometheus.metrics; - -import io.prometheus.client.Collector; -import io.prometheus.client.Collector.MetricFamilySamples; -import io.prometheus.client.Collector.MetricFamilySamples.Sample; -import io.prometheus.client.CollectorRegistry; -import java.util.Enumeration; -import java.util.Map; -import org.apache.pulsar.common.util.SimpleTextOutputStream; - -/** - * Logic to write metrics in Prometheus text format. 
- */ -public class PrometheusTextFormatUtil { - public static void writeGauge(SimpleTextOutputStream w, String name, SimpleGauge gauge) { - // Example: - // # TYPE bookie_storage_entries_count gauge - // bookie_storage_entries_count 519 - w.write("# TYPE ").write(name).write(" gauge\n"); - w.write(name); - writeLabels(w, gauge.getLabels()); - w.write(' ').write(gauge.getSample().toString()).write('\n'); - - } - - public static void writeCounter(SimpleTextOutputStream w, String name, LongAdderCounter counter) { - // Example: - // # TYPE jvm_threads_started_total counter - // jvm_threads_started_total 59 - w.write("# TYPE ").write(name).write(" counter\n"); - w.write(name); - writeLabels(w, counter.getLabels()); - w.write(' ').write(counter.get().toString()).write('\n'); - } - - public static void writeOpStat(SimpleTextOutputStream w, String name, DataSketchesOpStatsLogger opStat) { - // Example: - // # TYPE bookie_journal_JOURNAL_ADD_ENTRY summary - // bookie_journal_JOURNAL_ADD_ENTRY{success="false",quantile="0.5",} NaN - // bookie_journal_JOURNAL_ADD_ENTRY{success="false",quantile="0.75",} NaN - // bookie_journal_JOURNAL_ADD_ENTRY{success="false",quantile="0.95",} NaN - // bookie_journal_JOURNAL_ADD_ENTRY{success="false",quantile="0.99",} NaN - // bookie_journal_JOURNAL_ADD_ENTRY{success="false",quantile="0.999",} NaN - // bookie_journal_JOURNAL_ADD_ENTRY{success="false",quantile="0.9999",} NaN - // bookie_journal_JOURNAL_ADD_ENTRY{success="false",quantile="1.0",} NaN - // bookie_journal_JOURNAL_ADD_ENTRY_count{success="false",} 0.0 - // bookie_journal_JOURNAL_ADD_ENTRY_sum{success="false",} 0.0 - // bookie_journal_JOURNAL_ADD_ENTRY{success="true",quantile="0.5",} 1.706 - // bookie_journal_JOURNAL_ADD_ENTRY{success="true",quantile="0.75",} 1.89 - // bookie_journal_JOURNAL_ADD_ENTRY{success="true",quantile="0.95",} 2.121 - // bookie_journal_JOURNAL_ADD_ENTRY{success="true",quantile="0.99",} 10.708 - // 
bookie_journal_JOURNAL_ADD_ENTRY{success="true",quantile="0.999",} 10.902 - // bookie_journal_JOURNAL_ADD_ENTRY{success="true",quantile="0.9999",} 10.902 - // bookie_journal_JOURNAL_ADD_ENTRY{success="true",quantile="1.0",} 10.902 - // bookie_journal_JOURNAL_ADD_ENTRY_count{success="true",} 658.0 - // bookie_journal_JOURNAL_ADD_ENTRY_sum{success="true",} 1265.0800000000002 - w.write("# TYPE ").write(name).write(" summary\n"); - writeQuantile(w, opStat, name, false, 0.5); - writeQuantile(w, opStat, name, false, 0.75); - writeQuantile(w, opStat, name, false, 0.95); - writeQuantile(w, opStat, name, false, 0.99); - writeQuantile(w, opStat, name, false, 0.999); - writeQuantile(w, opStat, name, false, 0.9999); - writeQuantile(w, opStat, name, false, 1.0); - writeCount(w, opStat, name, false); - writeSum(w, opStat, name, false); - - writeQuantile(w, opStat, name, true, 0.5); - writeQuantile(w, opStat, name, true, 0.75); - writeQuantile(w, opStat, name, true, 0.95); - writeQuantile(w, opStat, name, true, 0.99); - writeQuantile(w, opStat, name, true, 0.999); - writeQuantile(w, opStat, name, true, 0.9999); - writeQuantile(w, opStat, name, true, 1.0); - writeCount(w, opStat, name, true); - writeSum(w, opStat, name, true); - } - - private static void writeQuantile(SimpleTextOutputStream w, DataSketchesOpStatsLogger opStat, String name, - Boolean success, double quantile) { - w.write(name) - .write("{success=\"").write(success.toString()) - .write("\",quantile=\"").write(Double.toString(quantile)); - if (!opStat.getLabels().isEmpty()) { - w.write("\", "); - writeLabelsNoBraces(w, opStat.getLabels()); - } else { - w.write("\""); - } - w.write("} ") - .write(Double.toString(opStat.getQuantileValue(success, quantile))).write('\n'); - } - - private static void writeCount(SimpleTextOutputStream w, DataSketchesOpStatsLogger opStat, String name, - Boolean success) { - w.write(name).write("_count{success=\"").write(success.toString()); - if (!opStat.getLabels().isEmpty()) { - 
w.write("\", "); - writeLabelsNoBraces(w, opStat.getLabels()); - } else { - w.write("\""); - } - w.write("} ") - .write(Long.toString(opStat.getCount(success))).write('\n'); - } - - private static void writeSum(SimpleTextOutputStream w, DataSketchesOpStatsLogger opStat, String name, - Boolean success) { - w.write(name).write("_sum{success=\"").write(success.toString()); - if (!opStat.getLabels().isEmpty()) { - w.write("\", "); - writeLabelsNoBraces(w, opStat.getLabels()); - } else { - w.write("\""); - } - w.write("} ") - .write(Double.toString(opStat.getSum(success))).write('\n'); - } - - public static void writeMetricsCollectedByPrometheusClient(SimpleTextOutputStream w, CollectorRegistry registry) { - Enumeration metricFamilySamples = registry.metricFamilySamples(); - while (metricFamilySamples.hasMoreElements()) { - MetricFamilySamples metricFamily = metricFamilySamples.nextElement(); - - for (int i = 0; i < metricFamily.samples.size(); i++) { - Sample sample = metricFamily.samples.get(i); - w.write(sample.name); - w.write('{'); - for (int j = 0; j < sample.labelNames.size(); j++) { - if (j != 0) { - w.write(", "); - } - w.write(sample.labelNames.get(j)); - w.write("=\""); - w.write(sample.labelValues.get(j)); - w.write('"'); - } - - w.write("} "); - w.write(Collector.doubleToGoString(sample.value)); - w.write('\n'); - } - } - } - - private static void writeLabels(SimpleTextOutputStream w, Map labels) { - if (labels.isEmpty()) { - return; - } - - w.write('{'); - writeLabelsNoBraces(w, labels); - w.write('}'); - } - - private static void writeLabelsNoBraces(SimpleTextOutputStream w, Map labels) { - if (labels.isEmpty()) { - return; - } - - boolean isFirst = true; - for (Map.Entry e : labels.entrySet()) { - if (!isFirst) { - w.write(','); - } - isFirst = false; - w.write(e.getKey()) - .write("=\"") - .write(e.getValue()) - .write('"'); - } - } -} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java index 93df4284aa039..a1b0a4b974ac3 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java @@ -28,11 +28,8 @@ import com.google.common.collect.ArrayListMultimap; import com.google.common.collect.Multimap; import io.jsonwebtoken.SignatureAlgorithm; -import java.io.BufferedReader; import java.io.ByteArrayOutputStream; import java.io.IOException; -import java.io.InputStream; -import java.io.InputStreamReader; import java.lang.reflect.Field; import java.math.RoundingMode; import java.nio.charset.StandardCharsets; @@ -62,6 +59,7 @@ import org.apache.http.client.HttpClient; import org.apache.http.client.methods.HttpGet; import org.apache.http.impl.client.HttpClientBuilder; +import org.apache.http.util.EntityUtils; import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.authentication.AuthenticationDataSource; import org.apache.pulsar.broker.authentication.AuthenticationProviderToken; @@ -1644,24 +1642,30 @@ public static Multimap parseMetrics(String metrics) { @Test public void testRawMetricsProvider() throws IOException { PrometheusMetricsProvider rawMetricsProvider = new PrometheusMetricsProvider(); - rawMetricsProvider.start(new PropertiesConfiguration()); - rawMetricsProvider.getStatsLogger("test").getOpStatsLogger("test_metrics") - .registerSuccessfulEvent(100, TimeUnit.NANOSECONDS); - - getPulsar().addPrometheusRawMetricsProvider(rawMetricsProvider); - HttpClient httpClient = HttpClientBuilder.create().build(); - final String metricsEndPoint = getPulsar().getWebServiceAddress() + "/metrics"; - HttpResponse response = httpClient.execute(new HttpGet(metricsEndPoint)); - InputStream inputStream = response.getEntity().getContent(); - InputStreamReader isReader = new InputStreamReader(inputStream); - BufferedReader reader 
= new BufferedReader(isReader); - StringBuffer sb = new StringBuffer(); - String str; - while((str = reader.readLine()) != null){ - sb.append(str); + try { + rawMetricsProvider.start(new PropertiesConfiguration()); + rawMetricsProvider.getStatsLogger("test_raw") + .scopeLabel("topic", "persistent://public/default/test-v1") + .getOpStatsLogger("writeLatency") + .registerSuccessfulEvent(100, TimeUnit.NANOSECONDS); + + getPulsar().addPrometheusRawMetricsProvider(rawMetricsProvider); + HttpClient httpClient = HttpClientBuilder.create().build(); + final String metricsEndPoint = getPulsar().getWebServiceAddress() + "/metrics"; + HttpResponse response = httpClient.execute(new HttpGet(metricsEndPoint)); + Multimap metrics = parseMetrics(EntityUtils.toString(response.getEntity())); + if (((List) metrics.get("test_raw_writeLatency_count")) + .get(0).tags.get("success").equals("false")) { + assertEquals(((List) metrics.get("test_raw_writeLatency_count")).get(1).value, 1); + } else { + assertEquals(((List) metrics.get("test_raw_writeLatency_count")).get(0).value, 1); + } + assertEquals(((List) metrics.get("test_raw_writeLatency_count")).get(0).tags.get("topic"), + "persistent://public/default/test-v1"); + } finally { + rawMetricsProvider.stop(); } - Assert.assertTrue(sb.toString().contains("test_metrics")); - rawMetricsProvider.stop(); + } public static class Metric { From 6cf8f627e472d200a7dfe2c40031324317045ef3 Mon Sep 17 00:00:00 2001 From: chenhang Date: Mon, 26 Sep 2022 23:05:57 +0800 Subject: [PATCH 4/5] add CachingStatsLogger and test --- .../metrics/CachingStatsLogger.java | 138 ++++++++++++++++ .../metrics/CachingStatsProvider.java | 12 +- .../metrics/PrometheusMetricsProvider.java | 7 +- .../metrics/PrometheusStatsLogger.java | 3 + .../metrics/PrometheusTextFormat.java | 72 ++++----- .../broker/stats/CachingStatsLoggerTest.java | 148 ++++++++++++++++++ .../broker/stats/PrometheusMetricsTest.java | 39 +++-- 7 files changed, 357 insertions(+), 62 deletions(-) create 
mode 100644 pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/CachingStatsLogger.java create mode 100644 pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/CachingStatsLoggerTest.java diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/CachingStatsLogger.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/CachingStatsLogger.java new file mode 100644 index 0000000000000..7fd9e4dc50a8a --- /dev/null +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/CachingStatsLogger.java @@ -0,0 +1,138 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pulsar.broker.stats.prometheus.metrics; + +import com.google.common.base.Joiner; +import io.prometheus.client.Collector; +import java.util.Map; +import java.util.TreeMap; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import org.apache.bookkeeper.stats.Counter; +import org.apache.bookkeeper.stats.Gauge; +import org.apache.bookkeeper.stats.OpStatsLogger; +import org.apache.bookkeeper.stats.StatsLogger; + +/** + * A {@code StatsLogger} that caches the stats objects created by other {@code StatsLogger}. + */ +public class CachingStatsLogger implements StatsLogger { + + protected final StatsLogger underlying; + protected final ConcurrentMap counters; + protected final ConcurrentMap opStatsLoggers; + protected final ConcurrentMap scopeStatsLoggers; + protected final ConcurrentMap scopeLabelStatsLoggers; + + private final String scope; + private final Map labels; + + public CachingStatsLogger(String scope, StatsLogger statsLogger, Map labels) { + this.scope = scope; + this.labels = labels; + this.underlying = statsLogger; + this.counters = new ConcurrentHashMap<>(); + this.opStatsLoggers = new ConcurrentHashMap<>(); + this.scopeStatsLoggers = new ConcurrentHashMap<>(); + this.scopeLabelStatsLoggers = new ConcurrentHashMap<>(); + } + + @Override + public int hashCode() { + return underlying.hashCode(); + } + + @Override + public boolean equals(Object obj) { + if (!(obj instanceof CachingStatsLogger)) { + return false; + } + CachingStatsLogger another = (CachingStatsLogger) obj; + return underlying.equals(another.underlying); + } + + @Override + public String toString() { + return underlying.toString(); + } + + @Override + public OpStatsLogger getOpStatsLogger(String name) { + return opStatsLoggers.computeIfAbsent(scopeContext(name), x -> underlying.getOpStatsLogger(name)); + } + + @Override + public Counter getCounter(String name) { + return counters.computeIfAbsent(scopeContext(name), x -> 
underlying.getCounter(name)); + } + + @Override + public void registerGauge(String name, Gauge gauge) { + underlying.registerGauge(name, gauge); + } + + @Override + public void unregisterGauge(String name, Gauge gauge) { + underlying.unregisterGauge(name, gauge); + } + + @Override + public StatsLogger scope(String name) { + return scopeStatsLoggers.computeIfAbsent(scopeContext(name), + x -> new CachingStatsLogger(scope, underlying.scope(name), labels)); + } + + @Override + public StatsLogger scopeLabel(String labelName, String labelValue) { + Map newLabels = new TreeMap<>(labels); + newLabels.put(labelName, labelValue); + return scopeLabelStatsLoggers.computeIfAbsent(new ScopeContext(completeName(""), newLabels), + x -> new CachingStatsLogger(scope, underlying.scopeLabel(labelName, labelValue), newLabels)); + } + + + @Override + public void removeScope(String name, StatsLogger statsLogger) { + scopeStatsLoggers.remove(scopeContext(name), statsLogger); + } + + /** + Thread-scoped stats not currently supported. + */ + @Override + public OpStatsLogger getThreadScopedOpStatsLogger(String name) { + return getOpStatsLogger(name); + } + + /** + Thread-scoped stats not currently supported. + */ + @Override + public Counter getThreadScopedCounter(String name) { + return getCounter(name); + } + + private ScopeContext scopeContext(String name) { + return new ScopeContext(completeName(name), labels); + } + + private String completeName(String name) { + return Collector.sanitizeMetricName(scope.isEmpty() ? 
name : Joiner.on('_').join(scope, name)); + } +} diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/CachingStatsProvider.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/CachingStatsProvider.java index d320da8228c4a..df795fa7d34e7 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/CachingStatsProvider.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/CachingStatsProvider.java @@ -18,6 +18,7 @@ */ package org.apache.pulsar.broker.stats.prometheus.metrics; +import java.util.Collections; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import org.apache.bookkeeper.stats.StatsLogger; @@ -37,7 +38,7 @@ public class CachingStatsProvider implements StatsProvider { public CachingStatsProvider(StatsProvider provider) { this.underlying = provider; - this.statsLoggers = new ConcurrentHashMap(); + this.statsLoggers = new ConcurrentHashMap<>(); } @Override @@ -52,13 +53,8 @@ public void stop() { @Override public StatsLogger getStatsLogger(String scope) { - StatsLogger statsLogger = statsLoggers.get(scope); - if (null == statsLogger) { - StatsLogger newStatsLogger = underlying.getStatsLogger(scope); - StatsLogger oldStatsLogger = statsLoggers.putIfAbsent(scope, newStatsLogger); - statsLogger = (null == oldStatsLogger) ? 
newStatsLogger : oldStatsLogger; - } - return statsLogger; + return statsLoggers.computeIfAbsent(scope, + x -> new CachingStatsLogger(scope, underlying.getStatsLogger(scope), Collections.emptyMap())); } @Override diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/PrometheusMetricsProvider.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/PrometheusMetricsProvider.java index a68359354d710..38b7b3771231b 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/PrometheusMetricsProvider.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/PrometheusMetricsProvider.java @@ -121,10 +121,11 @@ public void writeAllMetrics(Writer writer) throws IOException { @Override public void generate(SimpleTextOutputStream writer) { - gauges.forEach((sc, gauge) -> PrometheusTextFormat.writeGauge(writer, sc.getScope(), gauge)); - counters.forEach((sc, counter) -> PrometheusTextFormat.writeCounter(writer, sc.getScope(), counter)); + PrometheusTextFormat prometheusTextFormat = new PrometheusTextFormat(); + gauges.forEach((sc, gauge) -> prometheusTextFormat.writeGauge(writer, sc.getScope(), gauge)); + counters.forEach((sc, counter) -> prometheusTextFormat.writeCounter(writer, sc.getScope(), counter)); opStats.forEach((sc, opStatLogger) -> - PrometheusTextFormat.writeOpStat(writer, sc.getScope(), opStatLogger)); + prometheusTextFormat.writeOpStat(writer, sc.getScope(), opStatLogger)); } public String getStatsName(String... 
statsComponents) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/PrometheusStatsLogger.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/PrometheusStatsLogger.java index e6ddc139d8b29..14ccffcbbdc81 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/PrometheusStatsLogger.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/PrometheusStatsLogger.java @@ -22,6 +22,7 @@ import io.prometheus.client.Collector; import java.util.Map; import java.util.TreeMap; +import lombok.Getter; import org.apache.bookkeeper.stats.Counter; import org.apache.bookkeeper.stats.Gauge; import org.apache.bookkeeper.stats.OpStatsLogger; @@ -33,7 +34,9 @@ public class PrometheusStatsLogger implements StatsLogger { private final PrometheusMetricsProvider provider; + @Getter private final String scope; + @Getter private final Map labels; PrometheusStatsLogger(PrometheusMetricsProvider provider, String scope, Map labels) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/PrometheusTextFormat.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/PrometheusTextFormat.java index a949398846ac9..57f61b5102b09 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/PrometheusTextFormat.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/PrometheusTextFormat.java @@ -18,40 +18,40 @@ */ package org.apache.pulsar.broker.stats.prometheus.metrics; -import io.prometheus.client.Collector; -import io.prometheus.client.Collector.MetricFamilySamples; -import io.prometheus.client.Collector.MetricFamilySamples.Sample; -import io.prometheus.client.CollectorRegistry; -import java.util.Enumeration; +import java.util.HashSet; import java.util.Map; +import java.util.Set; import 
org.apache.pulsar.common.util.SimpleTextOutputStream; /** * Logic to write metrics in Prometheus text format. */ public class PrometheusTextFormat { - public static void writeGauge(SimpleTextOutputStream w, String name, SimpleGauge gauge) { + + Set metricNameSet = new HashSet<>(); + + public void writeGauge(SimpleTextOutputStream w, String name, SimpleGauge gauge) { // Example: // # TYPE bookie_storage_entries_count gauge // bookie_storage_entries_count 519 - w.write("# TYPE ").write(name).write(" gauge\n"); + writeType(w, name, "gauge"); w.write(name); writeLabels(w, gauge.getLabels()); w.write(' ').write(gauge.getSample().toString()).write('\n'); } - public static void writeCounter(SimpleTextOutputStream w, String name, LongAdderCounter counter) { + public void writeCounter(SimpleTextOutputStream w, String name, LongAdderCounter counter) { // Example: // # TYPE jvm_threads_started_total counter // jvm_threads_started_total 59 - w.write("# TYPE ").write(name).write(" counter\n"); + writeType(w, name, "counter"); w.write(name); writeLabels(w, counter.getLabels()); w.write(' ').write(counter.get().toString()).write('\n'); } - public static void writeOpStat(SimpleTextOutputStream w, String name, DataSketchesOpStatsLogger opStat) { + public void writeOpStat(SimpleTextOutputStream w, String name, DataSketchesOpStatsLogger opStat) { // Example: // # TYPE bookie_journal_JOURNAL_ADD_ENTRY summary // bookie_journal_JOURNAL_ADD_ENTRY{success="false",quantile="0.5",} NaN @@ -72,7 +72,7 @@ public static void writeOpStat(SimpleTextOutputStream w, String name, DataSketch // bookie_journal_JOURNAL_ADD_ENTRY{success="true",quantile="1.0",} 10.902 // bookie_journal_JOURNAL_ADD_ENTRY_count{success="true",} 658.0 // bookie_journal_JOURNAL_ADD_ENTRY_sum{success="true",} 1265.0800000000002 - w.write("# TYPE ").write(name).write(" summary\n"); + writeType(w, name, "summary"); writeQuantile(w, opStat, name, false, 0.5); writeQuantile(w, opStat, name, false, 0.75); writeQuantile(w, 
opStat, name, false, 0.95); @@ -94,7 +94,7 @@ public static void writeOpStat(SimpleTextOutputStream w, String name, DataSketch writeSum(w, opStat, name, true); } - private static void writeQuantile(SimpleTextOutputStream w, DataSketchesOpStatsLogger opStat, String name, + private void writeQuantile(SimpleTextOutputStream w, DataSketchesOpStatsLogger opStat, String name, Boolean success, double quantile) { w.write(name) .write("{success=\"").write(success.toString()) @@ -109,7 +109,7 @@ private static void writeQuantile(SimpleTextOutputStream w, DataSketchesOpStatsL .write(Double.toString(opStat.getQuantileValue(success, quantile))).write('\n'); } - private static void writeCount(SimpleTextOutputStream w, DataSketchesOpStatsLogger opStat, String name, + private void writeCount(SimpleTextOutputStream w, DataSketchesOpStatsLogger opStat, String name, Boolean success) { w.write(name).write("_count{success=\"").write(success.toString()); if (!opStat.getLabels().isEmpty()) { @@ -122,7 +122,7 @@ private static void writeCount(SimpleTextOutputStream w, DataSketchesOpStatsLogg .write(Long.toString(opStat.getCount(success))).write('\n'); } - private static void writeSum(SimpleTextOutputStream w, DataSketchesOpStatsLogger opStat, String name, + private void writeSum(SimpleTextOutputStream w, DataSketchesOpStatsLogger opStat, String name, Boolean success) { w.write(name).write("_sum{success=\"").write(success.toString()); if (!opStat.getLabels().isEmpty()) { @@ -135,33 +135,7 @@ private static void writeSum(SimpleTextOutputStream w, DataSketchesOpStatsLogger .write(Double.toString(opStat.getSum(success))).write('\n'); } - public static void writeMetricsCollectedByPrometheusClient(SimpleTextOutputStream w, CollectorRegistry registry) { - Enumeration metricFamilySamples = registry.metricFamilySamples(); - while (metricFamilySamples.hasMoreElements()) { - MetricFamilySamples metricFamily = metricFamilySamples.nextElement(); - - for (int i = 0; i < metricFamily.samples.size(); 
i++) { - Sample sample = metricFamily.samples.get(i); - w.write(sample.name); - w.write('{'); - for (int j = 0; j < sample.labelNames.size(); j++) { - if (j != 0) { - w.write(", "); - } - w.write(sample.labelNames.get(j)); - w.write("=\""); - w.write(sample.labelValues.get(j)); - w.write('"'); - } - - w.write("} "); - w.write(Collector.doubleToGoString(sample.value)); - w.write('\n'); - } - } - } - - private static void writeLabels(SimpleTextOutputStream w, Map labels) { + private void writeLabels(SimpleTextOutputStream w, Map labels) { if (labels.isEmpty()) { return; } @@ -171,7 +145,7 @@ private static void writeLabels(SimpleTextOutputStream w, Map la w.write('}'); } - private static void writeLabelsNoBraces(SimpleTextOutputStream w, Map labels) { + private void writeLabelsNoBraces(SimpleTextOutputStream w, Map labels) { if (labels.isEmpty()) { return; } @@ -188,4 +162,18 @@ private static void writeLabelsNoBraces(SimpleTextOutputStream w, Map metrics = parseMetrics(stream.getBuffer().toString(StandardCharsets.UTF_8)); + + assertEquals(((List) metrics.get("test_raw_counter4")).get(0).tags.get("topic"), + "persistent://public/default/test-v2"); + Metric metric = ((List) metrics.get("test_raw_counter3")).get(0); + assertEquals(metric.tags.get("topic"), "persistent://public/default/test-v1"); + assertEquals(metric.tags.get("aa"), "aa-value"); + + List metricList = ((List) metrics.get("test_raw_test_v1_write_v5_count")); + if (metricList.get(0).tags.get("success").equals("false")) { + assertEquals(metricList.get(0).value, 0.0); + assertEquals(metricList.get(1).value, 1.0); + } else { + assertEquals(metricList.get(0).value, 1.0); + assertEquals(metricList.get(1).value, 0.0); + } + + metricList = ((List) metrics.get("test_raw_test_write_v5_count")); + if (metricList.get(0).tags.get("success").equals("false")) { + assertEquals(metricList.get(0).value, 0.0); + assertEquals(metricList.get(1).value, 2.0); + } else { + assertEquals(metricList.get(0).value, 2.0); + 
assertEquals(metricList.get(1).value, 0.0); + } + } finally { + metricsProvider.stop(); + } + } +} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java index a1b0a4b974ac3..eae2904c9c943 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java @@ -44,12 +44,15 @@ import java.util.Optional; import java.util.Properties; import java.util.Random; +import java.util.Set; import java.util.TreeMap; import java.util.UUID; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import java.util.regex.Matcher; import java.util.regex.Pattern; +import java.util.stream.Collectors; +import java.util.stream.Stream; import javax.crypto.SecretKey; import javax.naming.AuthenticationException; import lombok.Cleanup; @@ -89,12 +92,14 @@ @Test(groups = "flaky") public class PrometheusMetricsTest extends BrokerTestBase { + private static final int MANAGED_LEDGER_SCHEDULER_THREADS_NUM = 4; @BeforeMethod(alwaysRun = true) @Override protected void setup() throws Exception { conf.setTopicLevelPoliciesEnabled(false); conf.setSystemTopicEnabled(false); + conf.setManagedLedgerNumSchedulerThreads(MANAGED_LEDGER_SCHEDULER_THREADS_NUM); super.baseSetup(); AuthenticationProviderToken.resetMetrics(); } @@ -1010,26 +1015,43 @@ public void testManagedLedgerBookieClientStats() throws Exception { List cm = (List) metrics.get(keyNameBySubstrings(metrics, "pulsar_managedLedger_client", "bookkeeper_ml_scheduler_completed_tasks")); - assertEquals(cm.size(), 1); + assertEquals(cm.size(), MANAGED_LEDGER_SCHEDULER_THREADS_NUM); assertEquals(cm.get(0).tags.get("cluster"), "test"); + // check thread labels + Set threads = Stream.of("0", "1", "2", "3").collect(Collectors.toSet()); + for (Metric metric : cm) { + 
threads.remove(metric.tags.get("thread")); + } + assertTrue(threads.isEmpty()); cm = (List) metrics.get( keyNameBySubstrings(metrics, "pulsar_managedLedger_client", "bookkeeper_ml_scheduler_queue")); - assertEquals(cm.size(), 1); + assertEquals(cm.size(), MANAGED_LEDGER_SCHEDULER_THREADS_NUM); assertEquals(cm.get(0).tags.get("cluster"), "test"); + threads = Stream.of("0", "1", "2", "3").collect(Collectors.toSet()); + for (Metric metric : cm) { + threads.remove(metric.tags.get("thread")); + } + assertTrue(threads.isEmpty()); cm = (List) metrics.get( keyNameBySubstrings(metrics, "pulsar_managedLedger_client", "bookkeeper_ml_scheduler_total_tasks")); - assertEquals(cm.size(), 1); + assertEquals(cm.size(), MANAGED_LEDGER_SCHEDULER_THREADS_NUM); assertEquals(cm.get(0).tags.get("cluster"), "test"); + threads = Stream.of("0", "1", "2", "3").collect(Collectors.toSet()); + for (Metric metric : cm) { + threads.remove(metric.tags.get("thread")); + } + assertTrue(threads.isEmpty()); cm = (List) metrics.get( keyNameBySubstrings(metrics, "pulsar_managedLedger_client", "bookkeeper_ml_scheduler_threads")); assertEquals(cm.size(), 1); assertEquals(cm.get(0).tags.get("cluster"), "test"); + assertEquals((int) (cm.get(0).value), MANAGED_LEDGER_SCHEDULER_THREADS_NUM); cm = (List) metrics.get( keyNameBySubstrings(metrics, @@ -1654,14 +1676,13 @@ public void testRawMetricsProvider() throws IOException { final String metricsEndPoint = getPulsar().getWebServiceAddress() + "/metrics"; HttpResponse response = httpClient.execute(new HttpGet(metricsEndPoint)); Multimap metrics = parseMetrics(EntityUtils.toString(response.getEntity())); - if (((List) metrics.get("test_raw_writeLatency_count")) - .get(0).tags.get("success").equals("false")) { - assertEquals(((List) metrics.get("test_raw_writeLatency_count")).get(1).value, 1); + List subMetrics = (List) metrics.get("test_raw_writeLatency_count"); + if (subMetrics.get(0).tags.get("success").equals("false")) { + 
assertEquals(subMetrics.get(1).value, 1); } else { - assertEquals(((List) metrics.get("test_raw_writeLatency_count")).get(0).value, 1); + assertEquals(subMetrics.get(0).value, 1); } - assertEquals(((List) metrics.get("test_raw_writeLatency_count")).get(0).tags.get("topic"), - "persistent://public/default/test-v1"); + assertEquals(subMetrics.get(0).tags.get("topic"), "persistent://public/default/test-v1"); } finally { rawMetricsProvider.stop(); } From 8704521d343bfd8874cd7cf680bb29cc7cc3cec1 Mon Sep 17 00:00:00 2001 From: chenhang Date: Tue, 27 Sep 2022 11:49:23 +0800 Subject: [PATCH 5/5] ensure all the metrics with the same metric name are grouped together --- .../metrics/PrometheusMetricsProvider.java | 17 +++++++++++++---- .../metrics/PrometheusStatsLogger.java | 7 +++++-- .../metrics/ThreadScopedLongAdderCounter.java | 7 +++++-- 3 files changed, 23 insertions(+), 8 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/PrometheusMetricsProvider.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/PrometheusMetricsProvider.java index 38b7b3771231b..7184ff5e64687 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/PrometheusMetricsProvider.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/PrometheusMetricsProvider.java @@ -55,9 +55,16 @@ public class PrometheusMetricsProvider implements StatsProvider, PrometheusRawMe /** * These acts a registry of the metrics defined in this provider. */ - public final ConcurrentMap counters = new ConcurrentHashMap<>(); - public final ConcurrentMap> gauges = new ConcurrentHashMap<>(); + // The outside map is used to group the same metrics name, + // but with different labels metrics to ensure all the metrics with same metric name are grouped.
+ // `https://github.com/prometheus/docs/blob/master/content/docs/instrumenting/ + // exposition_formats.md#grouping-and-sorting` + public final ConcurrentMap> counters = new ConcurrentHashMap<>(); + public final ConcurrentMap>> gauges = new ConcurrentHashMap<>(); public final ConcurrentMap opStats = new ConcurrentHashMap<>(); + final ConcurrentMap threadScopedOpStats = new ConcurrentHashMap<>(); final ConcurrentMap threadScopedCounters = @@ -122,8 +129,10 @@ public void writeAllMetrics(Writer writer) throws IOException { @Override public void generate(SimpleTextOutputStream writer) { PrometheusTextFormat prometheusTextFormat = new PrometheusTextFormat(); - gauges.forEach((sc, gauge) -> prometheusTextFormat.writeGauge(writer, sc.getScope(), gauge)); - counters.forEach((sc, counter) -> prometheusTextFormat.writeCounter(writer, sc.getScope(), counter)); + gauges.forEach((name, metric) -> + metric.forEach((sc, gauge) -> prometheusTextFormat.writeGauge(writer, sc.getScope(), gauge))); + counters.forEach((name, metric) -> + metric.forEach((sc, counter) -> prometheusTextFormat.writeCounter(writer, sc.getScope(), counter))); opStats.forEach((sc, opStatLogger) -> prometheusTextFormat.writeOpStat(writer, sc.getScope(), opStatLogger)); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/PrometheusStatsLogger.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/PrometheusStatsLogger.java index 14ccffcbbdc81..8347bbce04fe0 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/PrometheusStatsLogger.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/PrometheusStatsLogger.java @@ -22,6 +22,7 @@ import io.prometheus.client.Collector; import java.util.Map; import java.util.TreeMap; +import java.util.concurrent.ConcurrentHashMap; import lombok.Getter; import org.apache.bookkeeper.stats.Counter; import org.apache.bookkeeper.stats.Gauge; 
@@ -57,7 +58,8 @@ public OpStatsLogger getThreadScopedOpStatsLogger(String name) { @Override public Counter getCounter(String name) { - return provider.counters.computeIfAbsent(scopeContext(name), x -> new LongAdderCounter(labels)); + return provider.counters.computeIfAbsent(name, x -> new ConcurrentHashMap<>()) + .computeIfAbsent(scopeContext(name), x -> new LongAdderCounter(labels)); } public Counter getThreadScopedCounter(String name) { @@ -67,7 +69,8 @@ public Counter getThreadScopedCounter(String name) { @Override public void registerGauge(String name, Gauge gauge) { - provider.gauges.computeIfAbsent(scopeContext(name), x -> new SimpleGauge(gauge, labels)); + provider.gauges.computeIfAbsent(name, x -> new ConcurrentHashMap<>()) + .computeIfAbsent(scopeContext(name), x -> new SimpleGauge(gauge, labels)); } @Override diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/ThreadScopedLongAdderCounter.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/ThreadScopedLongAdderCounter.java index f87ec6bee2025..8148d680d9651 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/ThreadScopedLongAdderCounter.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/ThreadScopedLongAdderCounter.java @@ -20,6 +20,7 @@ import java.util.HashMap; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import org.apache.bookkeeper.stats.Counter; /** @@ -87,7 +88,8 @@ private LongAdderCounter getCounter() { if (tpt == null) { counters.set(defaultCounter); - provider.counters.put(new ScopeContext(scopeContext.getScope(), originalLabels), defaultCounter); + provider.counters.computeIfAbsent(scopeContext.getScope(), x -> new ConcurrentHashMap<>()) + .put(new ScopeContext(scopeContext.getScope(), originalLabels), defaultCounter); return defaultCounter; } else { Map threadScopedlabels = new HashMap<>(originalLabels); @@ -95,7 +97,8 @@ 
private LongAdderCounter getCounter() { threadScopedlabels.put("thread", String.valueOf(tpt.getOrdinal())); counter.initializeThread(threadScopedlabels); - provider.counters.put(new ScopeContext(scopeContext.getScope(), threadScopedlabels), counter); + provider.counters.computeIfAbsent(scopeContext.getScope(), x -> new ConcurrentHashMap<>()) + .put(new ScopeContext(scopeContext.getScope(), threadScopedlabels), counter); } }