diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java index b217e02373363..f26d19abc55d1 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java @@ -31,8 +31,6 @@ import io.prometheus.client.hotspot.DefaultExports; import java.io.IOException; import java.io.OutputStream; -import java.io.StringWriter; -import java.io.Writer; import java.nio.ByteBuffer; import java.util.Collection; import java.util.Collections; @@ -51,6 +49,7 @@ import org.apache.pulsar.broker.stats.metrics.ManagedCursorMetrics; import org.apache.pulsar.broker.stats.metrics.ManagedLedgerCacheMetrics; import org.apache.pulsar.broker.stats.metrics.ManagedLedgerMetrics; +import org.apache.pulsar.broker.stats.prometheus.metrics.PrometheusMetricsProvider; import org.apache.pulsar.common.stats.Metrics; import org.apache.pulsar.common.util.DirectMemoryUtils; import org.apache.pulsar.common.util.SimpleTextOutputStream; @@ -315,12 +314,6 @@ private static void generateManagedLedgerBookieClientMetrics(PulsarService pulsa return; } - try { - Writer writer = new StringWriter(); - statsProvider.writeAllMetrics(writer); - stream.write(writer.toString()); - } catch (IOException e) { - // nop - } + ((PrometheusMetricsProvider) statsProvider).generate(stream); } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/CachingStatsLogger.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/CachingStatsLogger.java new file mode 100644 index 0000000000000..7fd9e4dc50a8a --- /dev/null +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/CachingStatsLogger.java @@ -0,0 +1,138 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.stats.prometheus.metrics; + +import com.google.common.base.Joiner; +import io.prometheus.client.Collector; +import java.util.Map; +import java.util.TreeMap; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import org.apache.bookkeeper.stats.Counter; +import org.apache.bookkeeper.stats.Gauge; +import org.apache.bookkeeper.stats.OpStatsLogger; +import org.apache.bookkeeper.stats.StatsLogger; + +/** + * A {@code StatsLogger} that caches the stats objects created by other {@code StatsLogger}. + */ +public class CachingStatsLogger implements StatsLogger { + + protected final StatsLogger underlying; + protected final ConcurrentMap counters; + protected final ConcurrentMap opStatsLoggers; + protected final ConcurrentMap scopeStatsLoggers; + protected final ConcurrentMap scopeLabelStatsLoggers; + + private final String scope; + private final Map labels; + + public CachingStatsLogger(String scope, StatsLogger statsLogger, Map labels) { + this.scope = scope; + this.labels = labels; + this.underlying = statsLogger; + this.counters = new ConcurrentHashMap<>(); + this.opStatsLoggers = new ConcurrentHashMap<>(); + this.scopeStatsLoggers = new ConcurrentHashMap<>(); + this.scopeLabelStatsLoggers = new ConcurrentHashMap<>(); + } + + @Override + public int hashCode() { + return underlying.hashCode(); + } + + @Override + public boolean equals(Object obj) { + if (!(obj instanceof CachingStatsLogger)) { + return false; + } + CachingStatsLogger another = (CachingStatsLogger) obj; + return underlying.equals(another.underlying); + } + + @Override + public String toString() { + return underlying.toString(); + } + + @Override + public OpStatsLogger getOpStatsLogger(String name) { + return opStatsLoggers.computeIfAbsent(scopeContext(name), x -> underlying.getOpStatsLogger(name)); + } + + @Override + public Counter getCounter(String name) { + return counters.computeIfAbsent(scopeContext(name), x -> underlying.getCounter(name)); + } + + @Override + public void registerGauge(String name, Gauge gauge) { + underlying.registerGauge(name, gauge); + } + + @Override + public void unregisterGauge(String name, Gauge gauge) { + underlying.unregisterGauge(name, gauge); + } + + @Override + public StatsLogger scope(String name) { + return scopeStatsLoggers.computeIfAbsent(scopeContext(name), + x -> new CachingStatsLogger(scope, underlying.scope(name), labels)); + } + + @Override + public StatsLogger scopeLabel(String labelName, String labelValue) { + Map newLabels = new TreeMap<>(labels); + newLabels.put(labelName, labelValue); + return scopeLabelStatsLoggers.computeIfAbsent(new ScopeContext(completeName(""), newLabels), + x -> new CachingStatsLogger(scope, underlying.scopeLabel(labelName, labelValue), newLabels)); + } + + + @Override + public void removeScope(String name, StatsLogger statsLogger) { + scopeStatsLoggers.remove(scopeContext(name), statsLogger); + } + + /** + Thread-scoped stats not currently supported. + */ + @Override + public OpStatsLogger getThreadScopedOpStatsLogger(String name) { + return getOpStatsLogger(name); + } + + /** + Thread-scoped stats not currently supported. + */ + @Override + public Counter getThreadScopedCounter(String name) { + return getCounter(name); + } + + private ScopeContext scopeContext(String name) { + return new ScopeContext(completeName(name), labels); + } + + private String completeName(String name) { + return Collector.sanitizeMetricName(scope.isEmpty() ? name : Joiner.on('_').join(scope, name)); + } +} diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/CachingStatsProvider.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/CachingStatsProvider.java new file mode 100644 index 0000000000000..df795fa7d34e7 --- /dev/null +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/CachingStatsProvider.java @@ -0,0 +1,64 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.stats.prometheus.metrics; + +import java.util.Collections; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import org.apache.bookkeeper.stats.StatsLogger; +import org.apache.bookkeeper.stats.StatsProvider; +import org.apache.commons.configuration.Configuration; + +/** + * A {@code CachingStatsProvider} adds the caching functionality to an existing {@code StatsProvider}. + * + *

The stats provider will cache the stats objects created by the other {@code StatsProvider} to allow + * the reusability of stats objects and avoid creating a lot of stats objects. + */ +public class CachingStatsProvider implements StatsProvider { + + protected final StatsProvider underlying; + protected final ConcurrentMap statsLoggers; + + public CachingStatsProvider(StatsProvider provider) { + this.underlying = provider; + this.statsLoggers = new ConcurrentHashMap<>(); + } + + @Override + public void start(Configuration conf) { + this.underlying.start(conf); + } + + @Override + public void stop() { + this.underlying.stop(); + } + + @Override + public StatsLogger getStatsLogger(String scope) { + return statsLoggers.computeIfAbsent(scope, + x -> new CachingStatsLogger(scope, underlying.getStatsLogger(scope), Collections.emptyMap())); + } + + @Override + public String getStatsName(String... statsComponents) { + return underlying.getStatsName(statsComponents); + } +} diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/DataSketchesOpStatsLogger.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/DataSketchesOpStatsLogger.java index 70a2c5bc79429..378ff0413d622 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/DataSketchesOpStatsLogger.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/DataSketchesOpStatsLogger.java @@ -54,9 +54,15 @@ public class DataSketchesOpStatsLogger implements OpStatsLogger { private final LongAdder successSumAdder = new LongAdder(); private final LongAdder failSumAdder = new LongAdder(); - public DataSketchesOpStatsLogger() { + private Map labels; + + // used for lazy registration for thread scoped metrics + private boolean threadInitialized; + + public DataSketchesOpStatsLogger(Map labels) { this.current = new ThreadLocalAccessor(); this.replacement = new ThreadLocalAccessor(); + this.labels = labels; } @Override @@ -172,6 +178,19 @@ public double getQuantileValue(boolean success, double quantile) { return s != null ? s.getQuantile(quantile) : Double.NaN; } + public Map getLabels() { + return labels; + } + + public boolean isThreadInitialized() { + return threadInitialized; + } + + public void initializeThread(Map labels) { + this.labels = labels; + this.threadInitialized = true; + } + private static class LocalData { private final DoublesSketch successSketch = new DoublesSketchBuilder().build(); private final DoublesSketch failSketch = new DoublesSketchBuilder().build(); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/LongAdderCounter.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/LongAdderCounter.java index 39be12b35509b..4c6d92fbb84c4 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/LongAdderCounter.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/LongAdderCounter.java @@ -18,6 +18,7 @@ */ package org.apache.pulsar.broker.stats.prometheus.metrics; +import java.util.Map; import java.util.concurrent.atomic.LongAdder; import org.apache.bookkeeper.stats.Counter; @@ -30,8 +31,13 @@ public class LongAdderCounter implements Counter { private final LongAdder counter = new LongAdder(); - public LongAdderCounter() { + private Map labels; + // used for lazy registration for thread scoped metric + private boolean threadInitialized; + + public LongAdderCounter(Map labels) { + this.labels = labels; } @Override @@ -58,4 +64,17 @@ public void add(long delta) { public Long get() { return counter.sum(); } + + public Map getLabels() { + return labels; + } + + public boolean isThreadInitialized() { + return threadInitialized; + } + + public void initializeThread(Map labels) { + this.labels = labels; + this.threadInitialized = true; + } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/PrometheusMetricsProvider.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/PrometheusMetricsProvider.java index 0e59286861a40..7184ff5e64687 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/PrometheusMetricsProvider.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/PrometheusMetricsProvider.java @@ -24,21 +24,24 @@ import io.prometheus.client.Collector; import java.io.IOException; import java.io.Writer; +import java.util.Collections; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; -import org.apache.bookkeeper.stats.CachingStatsProvider; import org.apache.bookkeeper.stats.StatsLogger; import org.apache.bookkeeper.stats.StatsProvider; import org.apache.commons.configuration.Configuration; import org.apache.commons.lang.StringUtils; +import org.apache.pulsar.broker.stats.prometheus.PrometheusRawMetricsProvider; +import org.apache.pulsar.common.util.SimpleTextOutputStream; /** - * A Prometheus based {@link StatsProvider} implementation. + * A Prometheus based {@link PrometheusRawMetricsProvider} implementation. */ -public class PrometheusMetricsProvider implements StatsProvider { +public class PrometheusMetricsProvider implements StatsProvider, PrometheusRawMetricsProvider { private ScheduledExecutorService executor; public static final String PROMETHEUS_STATS_LATENCY_ROLLOVER_SECONDS = "prometheusStatsLatencyRolloverSeconds"; @@ -46,15 +49,26 @@ public class PrometheusMetricsProvider implements StatsProvider { public static final String CLUSTER_NAME = "cluster"; public static final String DEFAULT_CLUSTER_NAME = "pulsar"; - private String cluster; + private Map labels; private final CachingStatsProvider cachingStatsProvider; /** * These acts a registry of the metrics defined in this provider. */ - public final ConcurrentMap counters = new ConcurrentSkipListMap<>(); - public final ConcurrentMap> gauges = new ConcurrentSkipListMap<>(); - public final ConcurrentMap opStats = new ConcurrentSkipListMap<>(); + // The outside map is used to group the same metrics name, + // but with different labels metrics to ensure all the metrics with same metric name are grouped. + // `https://github.com/prometheus/docs/blob/master/content/docs/instrumenting/ + // exposition_formats.md#grouping-and-sorting` + public final ConcurrentMap> counters = new ConcurrentHashMap<>(); + public final ConcurrentMap>> gauges = new ConcurrentHashMap<>(); + public final ConcurrentMap opStats = new ConcurrentHashMap<>(); + + final ConcurrentMap threadScopedOpStats = + new ConcurrentHashMap<>(); + final ConcurrentMap threadScopedCounters = + new ConcurrentHashMap<>(); public PrometheusMetricsProvider() { this.cachingStatsProvider = new CachingStatsProvider(new StatsProvider() { @@ -70,7 +84,7 @@ public void stop() { @Override public StatsLogger getStatsLogger(String scope) { - return new PrometheusStatsLogger(PrometheusMetricsProvider.this, scope); + return new PrometheusStatsLogger(PrometheusMetricsProvider.this, scope, labels); } @Override @@ -88,37 +102,41 @@ public String getStatsName(String... statsComponents) { }); } - @Override public void start(Configuration conf) { executor = Executors.newSingleThreadScheduledExecutor(new DefaultThreadFactory("metrics")); - int latencyRolloverSeconds = conf.getInt(PROMETHEUS_STATS_LATENCY_ROLLOVER_SECONDS, DEFAULT_PROMETHEUS_STATS_LATENCY_ROLLOVER_SECONDS); - cluster = conf.getString(CLUSTER_NAME, DEFAULT_CLUSTER_NAME); + labels = Collections.singletonMap(CLUSTER_NAME, conf.getString(CLUSTER_NAME, DEFAULT_CLUSTER_NAME)); executor.scheduleAtFixedRate(catchingAndLoggingThrowables(this::rotateLatencyCollection), - 1, latencyRolloverSeconds, TimeUnit.SECONDS); + 1, latencyRolloverSeconds, TimeUnit.SECONDS); + } - @Override public void stop() { - executor.shutdownNow(); + executor.shutdown(); } - @Override public StatsLogger getStatsLogger(String scope) { - return this.cachingStatsProvider.getStatsLogger(scope); + return cachingStatsProvider.getStatsLogger(scope); } @Override public void writeAllMetrics(Writer writer) throws IOException { - gauges.forEach((name, gauge) -> PrometheusTextFormatUtil.writeGauge(writer, name, cluster, gauge)); - counters.forEach((name, counter) -> PrometheusTextFormatUtil.writeCounter(writer, name, cluster, counter)); - opStats.forEach((name, opStatLogger) -> PrometheusTextFormatUtil.writeOpStat(writer, name, cluster, - opStatLogger)); + throw new UnsupportedOperationException("writeAllMetrics is not support yet"); } @Override + public void generate(SimpleTextOutputStream writer) { + PrometheusTextFormat prometheusTextFormat = new PrometheusTextFormat(); + gauges.forEach((name, metric) -> + metric.forEach((sc, gauge) -> prometheusTextFormat.writeGauge(writer, sc.getScope(), gauge))); + counters.forEach((name, metric) -> + metric.forEach((sc, counter) -> prometheusTextFormat.writeCounter(writer, sc.getScope(), counter))); + opStats.forEach((sc, opStatLogger) -> + prometheusTextFormat.writeOpStat(writer, sc.getScope(), opStatLogger)); + } + public String getStatsName(String... statsComponents) { return cachingStatsProvider.getStatsName(statsComponents); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/PrometheusStatsLogger.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/PrometheusStatsLogger.java index 6ca3996424ced..8347bbce04fe0 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/PrometheusStatsLogger.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/PrometheusStatsLogger.java @@ -20,6 +20,10 @@ import com.google.common.base.Joiner; import io.prometheus.client.Collector; +import java.util.Map; +import java.util.TreeMap; +import java.util.concurrent.ConcurrentHashMap; +import lombok.Getter; import org.apache.bookkeeper.stats.Counter; import org.apache.bookkeeper.stats.Gauge; import org.apache.bookkeeper.stats.OpStatsLogger; @@ -31,36 +35,42 @@ public class PrometheusStatsLogger implements StatsLogger { private final PrometheusMetricsProvider provider; + @Getter private final String scope; + @Getter + private final Map labels; - public PrometheusStatsLogger(PrometheusMetricsProvider provider, String scope) { + PrometheusStatsLogger(PrometheusMetricsProvider provider, String scope, Map labels) { this.provider = provider; this.scope = scope; + this.labels = labels; } @Override public OpStatsLogger getOpStatsLogger(String name) { - return provider.opStats.computeIfAbsent(completeName(name), x -> new DataSketchesOpStatsLogger()); + return provider.opStats.computeIfAbsent(scopeContext(name), x -> new DataSketchesOpStatsLogger(labels)); } - @Override public OpStatsLogger getThreadScopedOpStatsLogger(String name) { - throw new RuntimeException("not implemented"); + return provider.threadScopedOpStats.computeIfAbsent(scopeContext(name), + x -> new ThreadScopedDataSketchesStatsLogger(provider, x, labels)); } @Override public Counter getCounter(String name) { - return provider.counters.computeIfAbsent(completeName(name), x -> new LongAdderCounter()); + return provider.counters.computeIfAbsent(name, x -> new ConcurrentHashMap<>()) + .computeIfAbsent(scopeContext(name), x -> new LongAdderCounter(labels)); } - @Override public Counter getThreadScopedCounter(String name) { - throw new RuntimeException("not implemented"); + return provider.threadScopedCounters.computeIfAbsent(scopeContext(name), + x -> new ThreadScopedLongAdderCounter(provider, x, labels)); } @Override public void registerGauge(String name, Gauge gauge) { - provider.gauges.computeIfAbsent(completeName(name), x -> new SimpleGauge(gauge)); + provider.gauges.computeIfAbsent(name, x -> new ConcurrentHashMap<>()) + .computeIfAbsent(scopeContext(name), x -> new SimpleGauge(gauge, labels)); } @Override @@ -75,11 +85,21 @@ public void removeScope(String name, StatsLogger statsLogger) { @Override public StatsLogger scope(String name) { - return new PrometheusStatsLogger(provider, completeName(name)); + return new PrometheusStatsLogger(provider, completeName(name), labels); + } + + @Override + public StatsLogger scopeLabel(String labelName, String labelValue) { + Map newLabels = new TreeMap<>(labels); + newLabels.put(labelName, labelValue); + return new PrometheusStatsLogger(provider, scope, newLabels); + } + + private ScopeContext scopeContext(String name) { + return new ScopeContext(completeName(name), labels); } private String completeName(String name) { - String completeName = scope.isEmpty() ? name : Joiner.on('_').join(scope, name); - return Collector.sanitizeMetricName(completeName); + return Collector.sanitizeMetricName(scope.isEmpty() ? name : Joiner.on('_').join(scope, name)); } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/PrometheusTextFormat.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/PrometheusTextFormat.java new file mode 100644 index 0000000000000..57f61b5102b09 --- /dev/null +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/PrometheusTextFormat.java @@ -0,0 +1,179 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.stats.prometheus.metrics; + +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import org.apache.pulsar.common.util.SimpleTextOutputStream; + +/** + * Logic to write metrics in Prometheus text format. + */ +public class PrometheusTextFormat { + + Set metricNameSet = new HashSet<>(); + + public void writeGauge(SimpleTextOutputStream w, String name, SimpleGauge gauge) { + // Example: + // # TYPE bookie_storage_entries_count gauge + // bookie_storage_entries_count 519 + writeType(w, name, "gauge"); + w.write(name); + writeLabels(w, gauge.getLabels()); + w.write(' ').write(gauge.getSample().toString()).write('\n'); + + } + + public void writeCounter(SimpleTextOutputStream w, String name, LongAdderCounter counter) { + // Example: + // # TYPE jvm_threads_started_total counter + // jvm_threads_started_total 59 + writeType(w, name, "counter"); + w.write(name); + writeLabels(w, counter.getLabels()); + w.write(' ').write(counter.get().toString()).write('\n'); + } + + public void writeOpStat(SimpleTextOutputStream w, String name, DataSketchesOpStatsLogger opStat) { + // Example: + // # TYPE bookie_journal_JOURNAL_ADD_ENTRY summary + // bookie_journal_JOURNAL_ADD_ENTRY{success="false",quantile="0.5",} NaN + // bookie_journal_JOURNAL_ADD_ENTRY{success="false",quantile="0.75",} NaN + // bookie_journal_JOURNAL_ADD_ENTRY{success="false",quantile="0.95",} NaN + // bookie_journal_JOURNAL_ADD_ENTRY{success="false",quantile="0.99",} NaN + // bookie_journal_JOURNAL_ADD_ENTRY{success="false",quantile="0.999",} NaN + // bookie_journal_JOURNAL_ADD_ENTRY{success="false",quantile="0.9999",} NaN + // bookie_journal_JOURNAL_ADD_ENTRY{success="false",quantile="1.0",} NaN + // bookie_journal_JOURNAL_ADD_ENTRY_count{success="false",} 0.0 + // bookie_journal_JOURNAL_ADD_ENTRY_sum{success="false",} 0.0 + // bookie_journal_JOURNAL_ADD_ENTRY{success="true",quantile="0.5",} 1.706 + // bookie_journal_JOURNAL_ADD_ENTRY{success="true",quantile="0.75",} 1.89 + // bookie_journal_JOURNAL_ADD_ENTRY{success="true",quantile="0.95",} 2.121 + // bookie_journal_JOURNAL_ADD_ENTRY{success="true",quantile="0.99",} 10.708 + // bookie_journal_JOURNAL_ADD_ENTRY{success="true",quantile="0.999",} 10.902 + // bookie_journal_JOURNAL_ADD_ENTRY{success="true",quantile="0.9999",} 10.902 + // bookie_journal_JOURNAL_ADD_ENTRY{success="true",quantile="1.0",} 10.902 + // bookie_journal_JOURNAL_ADD_ENTRY_count{success="true",} 658.0 + // bookie_journal_JOURNAL_ADD_ENTRY_sum{success="true",} 1265.0800000000002 + writeType(w, name, "summary"); + writeQuantile(w, opStat, name, false, 0.5); + writeQuantile(w, opStat, name, false, 0.75); + writeQuantile(w, opStat, name, false, 0.95); + writeQuantile(w, opStat, name, false, 0.99); + writeQuantile(w, opStat, name, false, 0.999); + writeQuantile(w, opStat, name, false, 0.9999); + writeQuantile(w, opStat, name, false, 1.0); + writeCount(w, opStat, name, false); + writeSum(w, opStat, name, false); + + writeQuantile(w, opStat, name, true, 0.5); + writeQuantile(w, opStat, name, true, 0.75); + writeQuantile(w, opStat, name, true, 0.95); + writeQuantile(w, opStat, name, true, 0.99); + writeQuantile(w, opStat, name, true, 0.999); + writeQuantile(w, opStat, name, true, 0.9999); + writeQuantile(w, opStat, name, true, 1.0); + writeCount(w, opStat, name, true); + writeSum(w, opStat, name, true); + } + + private void writeQuantile(SimpleTextOutputStream w, DataSketchesOpStatsLogger opStat, String name, + Boolean success, double quantile) { + w.write(name) + .write("{success=\"").write(success.toString()) + .write("\",quantile=\"").write(Double.toString(quantile)); + if (!opStat.getLabels().isEmpty()) { + w.write("\", "); + writeLabelsNoBraces(w, opStat.getLabels()); + } else { + w.write("\""); + } + w.write("} ") + .write(Double.toString(opStat.getQuantileValue(success, quantile))).write('\n'); + } + + private void writeCount(SimpleTextOutputStream w, DataSketchesOpStatsLogger opStat, String name, + Boolean success) { + w.write(name).write("_count{success=\"").write(success.toString()); + if (!opStat.getLabels().isEmpty()) { + w.write("\", "); + writeLabelsNoBraces(w, opStat.getLabels()); + } else { + w.write("\""); + } + w.write("} ") + .write(Long.toString(opStat.getCount(success))).write('\n'); + } + + private void writeSum(SimpleTextOutputStream w, DataSketchesOpStatsLogger opStat, String name, + Boolean success) { + w.write(name).write("_sum{success=\"").write(success.toString()); + if (!opStat.getLabels().isEmpty()) { + w.write("\", "); + writeLabelsNoBraces(w, opStat.getLabels()); + } else { + w.write("\""); + } + w.write("} ") + .write(Double.toString(opStat.getSum(success))).write('\n'); + } + + private void writeLabels(SimpleTextOutputStream w, Map labels) { + if (labels.isEmpty()) { + return; + } + + w.write('{'); + writeLabelsNoBraces(w, labels); + w.write('}'); + } + + private void writeLabelsNoBraces(SimpleTextOutputStream w, Map labels) { + if (labels.isEmpty()) { + return; + } + + boolean isFirst = true; + for (Map.Entry e : labels.entrySet()) { + if (!isFirst) { + w.write(','); + } + isFirst = false; + w.write(e.getKey()) + .write("=\"") + .write(e.getValue()) + .write('"'); + } + } + + /** + * In order to avoid print multiple `# TYPE` header for the same metric name. + * @param w + * @param name + * @param type + */ + void writeType(SimpleTextOutputStream w, String name, String type) { + if (metricNameSet.contains(name)) { + return; + } + metricNameSet.add(name); + w.write("# TYPE ").write(name).write(" ").write(type).write("\n"); + } +} diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/PrometheusTextFormatUtil.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/PrometheusTextFormatUtil.java deleted file mode 100644 index d00093c5ad09e..0000000000000 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/PrometheusTextFormatUtil.java +++ /dev/null @@ -1,138 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pulsar.broker.stats.prometheus.metrics; - -import java.io.IOException; -import java.io.Writer; -import org.apache.bookkeeper.stats.Counter; - -/** - * Logic to write metrics in Prometheus text format. - */ -public class PrometheusTextFormatUtil { - static void writeGauge(Writer w, String name, String cluster, SimpleGauge gauge) { - // Example: - // # TYPE bookie_client_bookkeeper_ml_scheduler_completed_tasks_0 gauge - // pulsar_bookie_client_bookkeeper_ml_scheduler_completed_tasks_0{cluster="pulsar"} 1044057 - try { - w.append("# TYPE ").append(name).append(" gauge\n"); - w.append(name).append("{cluster=\"").append(cluster).append("\"}") - .append(' ').append(gauge.getSample().toString()).append('\n'); - } catch (IOException e) { - throw new RuntimeException(e); - } - } - - static void writeCounter(Writer w, String name, String cluster, Counter counter) { - // Example: - // # TYPE jvm_threads_started_total counter - // jvm_threads_started_total{cluster="test"} 59 - try { - w.append("# TYPE ").append(name).append(" counter\n"); - w.append(name).append("{cluster=\"").append(cluster).append("\"}") - .append(' ').append(counter.get().toString()).append('\n'); - } catch (IOException e) { - throw new RuntimeException(e); - } - } - - static void writeOpStat(Writer w, String name, String cluster, DataSketchesOpStatsLogger opStat) { - // Example: - // # TYPE pulsar_bookie_client_bookkeeper_ml_scheduler_task_queued summary - // pulsar_bookie_client_bookkeeper_ml_scheduler_task_queued{cluster="pulsar", success="false", - // quantile="0.5"} NaN - // pulsar_bookie_client_bookkeeper_ml_scheduler_task_queued{cluster="pulsar", success="false", - // quantile="0.75"} NaN - // pulsar_bookie_client_bookkeeper_ml_scheduler_task_queued{cluster="pulsar", success="false", - // quantile="0.95"} NaN - // pulsar_bookie_client_bookkeeper_ml_scheduler_task_queued{cluster="pulsar", success="false", - // quantile="0.99"} NaN - // pulsar_bookie_client_bookkeeper_ml_scheduler_task_queued{cluster="pulsar", success="false", - // quantile="0.999"} NaN - // pulsar_bookie_client_bookkeeper_ml_scheduler_task_queued{cluster="pulsar", success="false", - // quantile="0.9999"} NaN - // pulsar_bookie_client_bookkeeper_ml_scheduler_task_queued{cluster="pulsar", success="false", - // quantile="1.0"} -Infinity - // pulsar_bookie_client_bookkeeper_ml_scheduler_task_queued_count{cluster="pulsar", success="false"} 0 - // pulsar_bookie_client_bookkeeper_ml_scheduler_task_queued_sum{cluster="pulsar", success="false"} 0.0 - // pulsar_bookie_client_bookkeeper_ml_scheduler_task_queued{cluster="pulsar", success="true", - // quantile="0.5"} 0.031 - // pulsar_bookie_client_bookkeeper_ml_scheduler_task_queued{cluster="pulsar", success="true", - // quantile="0.75"} 0.043 - // pulsar_bookie_client_bookkeeper_ml_scheduler_task_queued{cluster="pulsar", success="true", - // quantile="0.95"} 0.061 - // pulsar_bookie_client_bookkeeper_ml_scheduler_task_queued{cluster="pulsar", success="true", - // quantile="0.99"} 0.064 - // pulsar_bookie_client_bookkeeper_ml_scheduler_task_queued{cluster="pulsar", success="true", - // quantile="0.999"} 0.073 - // pulsar_bookie_client_bookkeeper_ml_scheduler_task_queued{cluster="pulsar", success="true", - // quantile="0.9999"} 0.073 - // pulsar_bookie_client_bookkeeper_ml_scheduler_task_queued{cluster="pulsar", success="true", - // quantile="1.0"} 0.552 - // pulsar_bookie_client_bookkeeper_ml_scheduler_task_queued_count{cluster="pulsar", success="true"} 40911432 - // pulsar_bookie_client_bookkeeper_ml_scheduler_task_queued_sum{cluster="pulsar", success="true"} 527.0 - try { - w.append("# TYPE ").append(name).append(" summary\n"); - writeQuantile(w, opStat, name, cluster, false, 0.5); - writeQuantile(w, opStat, name, cluster, false, 0.75); - writeQuantile(w, opStat, name, cluster, false, 0.95); - writeQuantile(w, opStat, name, cluster, false, 0.99); - writeQuantile(w, opStat, name, cluster, false, 0.999); - writeQuantile(w, opStat, name, cluster, false, 0.9999); - writeQuantile(w, opStat, name, cluster, false, 1.0); - writeCount(w, opStat, name, cluster, false); - writeSum(w, opStat, name, cluster, false); - - writeQuantile(w, opStat, name, cluster, true, 0.5); - writeQuantile(w, opStat, name, cluster, true, 0.75); - writeQuantile(w, opStat, name, cluster, true, 0.95); - writeQuantile(w, opStat, name, cluster, true, 0.99); - writeQuantile(w, opStat, name, cluster, true, 0.999); - writeQuantile(w, opStat, name, cluster, true, 0.9999); - writeQuantile(w, opStat, name, cluster, true, 1.0); - writeCount(w, opStat, name, cluster, true); - writeSum(w, opStat, name, cluster, true); - - } catch (IOException e) { - throw new RuntimeException(e); - } - } - - private static void writeQuantile(Writer w, DataSketchesOpStatsLogger opStat, String name, String cluster, - Boolean success, double quantile) throws IOException { - w.append(name).append("{cluster=\"").append(cluster).append("\", success=\"") - .append(success.toString()).append("\", quantile=\"") - .append(Double.toString(quantile)).append("\"} ") - .append(Double.toString(opStat.getQuantileValue(success, quantile))).append('\n'); - } - - private static void writeCount(Writer w, DataSketchesOpStatsLogger opStat, String name, String cluster, - Boolean success) throws IOException { - w.append(name).append("_count{cluster=\"").append(cluster).append("\", success=\"") - .append(success.toString()).append("\"} ") - .append(Long.toString(opStat.getCount(success))).append('\n'); - } - - private static void writeSum(Writer w, DataSketchesOpStatsLogger opStat, String name, String cluster, - Boolean success) throws IOException { - w.append(name).append("_sum{cluster=\"").append(cluster).append("\", success=\"") - .append(success.toString()).append("\"} ") - .append(Double.toString(opStat.getSum(success))).append('\n'); - } -} diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/ScopeContext.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/ScopeContext.java new file mode 100644 index 0000000000000..25648fb7b92db --- /dev/null +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/ScopeContext.java @@ -0,0 +1,56 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.stats.prometheus.metrics; + +import java.util.Map; +import java.util.Objects; + +/** + * Holder for a scope and a set of associated labels. + */ +public class ScopeContext { + private final String scope; + private final Map labels; + + public ScopeContext(String scope, Map labels) { + this.scope = scope; + this.labels = labels; + } + + public String getScope() { + return scope; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + ScopeContext that = (ScopeContext) o; + return Objects.equals(scope, that.scope) && Objects.equals(labels, that.labels); + } + + @Override + public int hashCode() { + return Objects.hash(scope, labels); + } +} diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/SimpleGauge.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/SimpleGauge.java index 612168a2d9143..90b65e47bf699 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/SimpleGauge.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/SimpleGauge.java @@ -18,6 +18,7 @@ */ package org.apache.pulsar.broker.stats.prometheus.metrics; +import java.util.Map; import org.apache.bookkeeper.stats.Gauge; /** @@ -25,13 +26,19 @@ */ public class SimpleGauge { + private final Map labels; private final Gauge gauge; - public SimpleGauge(final Gauge gauge) { + public SimpleGauge(final Gauge gauge, Map labels) { this.gauge = gauge; + this.labels = labels; } - public Number getSample() { + Number getSample() { return gauge.getSample(); } + + public Map getLabels() { + return labels; + } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/ThreadRegistry.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/ThreadRegistry.java new file mode 100644 index 0000000000000..6fb0dfa7862ef --- /dev/null +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/ThreadRegistry.java @@ -0,0 +1,90 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.stats.prometheus.metrics; + +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +/** + * For mapping thread ids to thread pools and threads within those pools + * or just for lone named threads. Thread scoped metrics add labels to + * metrics by retrieving the ThreadPoolThread object from this registry. + * For flexibility, this registry is not based on TLS. + */ +public class ThreadRegistry { + private static ConcurrentMap threadPoolMap = new ConcurrentHashMap<>(); + + /** + * Threads can register themselves as their first act before carrying out any work. + * @param threadPool + * @param threadPoolThread + */ + public static void register(String threadPool, int threadPoolThread) { + register(threadPool, threadPoolThread, Thread.currentThread().getId()); + } + + /** + * Thread factories can register a thread by its id. + * @param threadPool + * @param threadPoolThread + * @param threadId + */ + public static void register(String threadPool, int threadPoolThread, long threadId) { + ThreadPoolThread tpt = new ThreadPoolThread(threadPool, threadPoolThread, threadId); + threadPoolMap.put(threadId, tpt); + } + + /** + * Clears all stored thread state. + */ + public static void clear() { + threadPoolMap.clear(); + } + + /** + * Retrieves the registered ThreadPoolThread (if registered) for the calling thread. + * @return + */ + public static ThreadPoolThread get() { + return threadPoolMap.get(Thread.currentThread().getId()); + } + + /** + * Stores the thread pool and ordinal. + */ + public static final class ThreadPoolThread { + final String threadPool; + final int ordinal; + final long threadId; + + public ThreadPoolThread(String threadPool, int ordinal, long threadId) { + this.threadPool = threadPool; + this.ordinal = ordinal; + this.threadId = threadId; + } + + public String getThreadPool() { + return threadPool; + } + + public int getOrdinal() { + return ordinal; + } + } +} diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/ThreadScopedDataSketchesStatsLogger.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/ThreadScopedDataSketchesStatsLogger.java new file mode 100644 index 0000000000000..fec1ae20aef6f --- /dev/null +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/ThreadScopedDataSketchesStatsLogger.java @@ -0,0 +1,114 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.stats.prometheus.metrics; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import org.apache.bookkeeper.stats.OpStatsData; +import org.apache.bookkeeper.stats.OpStatsLogger; + +/** + * OpStatsLogger implementation that lazily registers OpStatsLoggers per thread + * with added labels for the threadpool/thresd name and thread no. + */ +public class ThreadScopedDataSketchesStatsLogger implements OpStatsLogger { + + private ThreadLocal statsLoggers; + private DataSketchesOpStatsLogger defaultStatsLogger; + private Map originalLabels; + private ScopeContext scopeContext; + private PrometheusMetricsProvider provider; + + public ThreadScopedDataSketchesStatsLogger(PrometheusMetricsProvider provider, + ScopeContext scopeContext, + Map labels) { + this.provider = provider; + this.scopeContext = scopeContext; + this.originalLabels = labels; + this.defaultStatsLogger = new DataSketchesOpStatsLogger(labels); + + Map defaultLabels = new HashMap<>(labels); + defaultLabels.put("threadPool", "?"); + defaultLabels.put("thread", "?"); + this.defaultStatsLogger.initializeThread(defaultLabels); + + this.statsLoggers = ThreadLocal.withInitial(() -> { + return new DataSketchesOpStatsLogger(labels); + }); + } + + @Override + public void registerFailedEvent(long eventLatency, TimeUnit unit) { + getStatsLogger().registerFailedEvent(eventLatency, unit); + } + + @Override + public void registerSuccessfulEvent(long eventLatency, TimeUnit unit) { + getStatsLogger().registerSuccessfulEvent(eventLatency, unit); + } + + @Override + public void registerSuccessfulValue(long value) { + getStatsLogger().registerSuccessfulValue(value); + } + + @Override + public void registerFailedValue(long value) { + getStatsLogger().registerFailedValue(value); + } + + @Override + public OpStatsData toOpStatsData() { + // Not relevant as we don't use JMX here + throw new UnsupportedOperationException(); + } + + @Override + public void clear() { + // Not relevant as we don't use JMX here + throw new UnsupportedOperationException(); + } + + private DataSketchesOpStatsLogger getStatsLogger() { + DataSketchesOpStatsLogger statsLogger = statsLoggers.get(); + + // Lazy registration + // Update the stats logger with the thread labels then add to the provider + // If for some reason this thread did not get registered, + // then we fallback to a standard OpsStatsLogger (defaultStatsLogger) + if (!statsLogger.isThreadInitialized()) { + ThreadRegistry.ThreadPoolThread tpt = ThreadRegistry.get(); + if (tpt == null) { + statsLoggers.set(defaultStatsLogger); + provider.opStats.put(new ScopeContext(scopeContext.getScope(), originalLabels), defaultStatsLogger); + return defaultStatsLogger; + } else { + Map threadScopedlabels = new HashMap<>(originalLabels); + threadScopedlabels.put("threadPool", tpt.getThreadPool()); + threadScopedlabels.put("thread", String.valueOf(tpt.getOrdinal())); + + statsLogger.initializeThread(threadScopedlabels); + provider.opStats.put(new ScopeContext(scopeContext.getScope(), threadScopedlabels), statsLogger); + } + } + + return statsLogger; + } +} \ No newline at end of file diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/ThreadScopedLongAdderCounter.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/ThreadScopedLongAdderCounter.java new file mode 100644 index 0000000000000..8148d680d9651 --- /dev/null +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/ThreadScopedLongAdderCounter.java @@ -0,0 +1,107 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.stats.prometheus.metrics; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import org.apache.bookkeeper.stats.Counter; + +/** + * {@link Counter} implementation that lazily registers LongAdderCounters per thread + * * with added labels for the threadpool/thread name and thread no. + */ +public class ThreadScopedLongAdderCounter implements Counter { + private ThreadLocal counters; + private LongAdderCounter defaultCounter; + private Map originalLabels; + private ScopeContext scopeContext; + private PrometheusMetricsProvider provider; + + public ThreadScopedLongAdderCounter(PrometheusMetricsProvider provider, + ScopeContext scopeContext, + Map labels) { + this.provider = provider; + this.scopeContext = scopeContext; + this.originalLabels = new HashMap<>(labels); + this.defaultCounter = new LongAdderCounter(labels); + Map defaultLabels = new HashMap<>(labels); + defaultLabels.put("threadPool", "?"); + defaultLabels.put("thread", "?"); + this.defaultCounter.initializeThread(defaultLabels); + + this.counters = ThreadLocal.withInitial(() -> { + return new LongAdderCounter(labels); + }); + } + + @Override + public void clear() { + getCounter().clear(); + } + + @Override + public void inc() { + getCounter().inc(); + } + + @Override + public void dec() { + getCounter().dec(); + } + + @Override + public void add(long delta) { + getCounter().add(delta); + } + + @Override + public Long get() { + return getCounter().get(); + } + + private LongAdderCounter getCounter() { + LongAdderCounter counter = counters.get(); + + // Lazy registration + // Update the counter with the thread labels then add to the provider + // If for some reason this thread did not get registered, + // then we fallback to a standard counter (defaultCounter) + if (!counter.isThreadInitialized()) { + ThreadRegistry.ThreadPoolThread tpt = ThreadRegistry.get(); + + if (tpt == null) { + counters.set(defaultCounter); + provider.counters.computeIfAbsent(scopeContext.getScope(), x -> new ConcurrentHashMap<>()) + .put(new ScopeContext(scopeContext.getScope(), originalLabels), defaultCounter); + return defaultCounter; + } else { + Map threadScopedlabels = new HashMap<>(originalLabels); + threadScopedlabels.put("threadPool", tpt.getThreadPool()); + threadScopedlabels.put("thread", String.valueOf(tpt.getOrdinal())); + + counter.initializeThread(threadScopedlabels); + provider.counters.computeIfAbsent(scopeContext.getScope(), x -> new ConcurrentHashMap<>()) + .put(new ScopeContext(scopeContext.getScope(), threadScopedlabels), counter); + } + } + + return counter; + } +} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/CachingStatsLoggerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/CachingStatsLoggerTest.java new file mode 100644 index 0000000000000..654e2acfe4523 --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/CachingStatsLoggerTest.java @@ -0,0 +1,148 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.stats; + +import com.google.common.collect.Multimap; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufAllocator; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.List; +import java.util.concurrent.TimeUnit; +import org.apache.bookkeeper.stats.Counter; +import org.apache.bookkeeper.stats.OpStatsLogger; +import org.apache.bookkeeper.stats.StatsLogger; +import org.apache.commons.configuration.PropertiesConfiguration; +import org.apache.pulsar.broker.stats.prometheus.metrics.PrometheusMetricsProvider; +import org.apache.pulsar.common.util.SimpleTextOutputStream; +import org.testng.annotations.Test; +import static org.apache.pulsar.broker.stats.PrometheusMetricsTest.parseMetrics; +import static org.apache.pulsar.broker.stats.PrometheusMetricsTest.Metric; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotEquals; + +public class CachingStatsLoggerTest { + + @Test + public void testSameScopeLabelShareSameInstance() throws IOException { + PrometheusMetricsProvider metricsProvider = new PrometheusMetricsProvider(); + try { + metricsProvider.start(new PropertiesConfiguration()); + StatsLogger statsLogger = metricsProvider.getStatsLogger("test_raw"); + StatsLogger topicStatsLogger = statsLogger.scopeLabel("topic", "persistent://public/default/test-v1"); + StatsLogger t1 = statsLogger.scopeLabel("topic", "persistent://public/default/test-v2"); + StatsLogger t2 = topicStatsLogger.scopeLabel("aa", "aa-value"); + StatsLogger t3 = statsLogger.scopeLabel("topic", "persistent://public/default/test-v2"); + StatsLogger t4 = statsLogger.scopeLabel("topic", "persistent://public/default/test-v2"); + StatsLogger t5 = topicStatsLogger.scopeLabel("aa", "aa-value"); + + assertEquals(t3, t1); + assertEquals(t4, t1); + assertEquals(t5, t2); + assertNotEquals(t5, t4); + + StatsLogger t6 = statsLogger.scope("test"); + StatsLogger t7 = statsLogger.scope("test"); + StatsLogger t8 = statsLogger.scope("test_v1"); + + assertEquals(t7, t6); + assertNotEquals(t8, t6); + + // check OpStatsLogger with one label + OpStatsLogger op1 = t1.getOpStatsLogger("op"); + OpStatsLogger op2 = t3.getOpStatsLogger("op"); + OpStatsLogger op3 = t4.getOpStatsLogger("op"); + OpStatsLogger op4 = t4.getOpStatsLogger("op4"); + assertEquals(op2, op1); + assertEquals(op3, op1); + assertNotEquals(op4, op1); + assertNotEquals(t1, op1); + + // check counter with one label + Counter c1 = t1.getCounter("counter"); + Counter c2 = t3.getCounter("counter"); + Counter c3 = t4.getCounter("counter"); + Counter c4 = t4.getCounter("counter4"); + assertEquals(c2, c1); + assertEquals(c3, c1); + assertNotEquals(c4, c1); + assertNotEquals(t1, c1); + + // check OpStatsLogger with one label + OpStatsLogger mop1 = t2.getOpStatsLogger("op"); + OpStatsLogger mop2 = t5.getOpStatsLogger("op"); + OpStatsLogger mop3 = t5.getOpStatsLogger("op3"); + assertEquals(mop2, mop1); + assertNotEquals(mop3, mop1); + assertNotEquals(op1, mop1); + assertNotEquals(t1, mop1); + + // check counter with one label + Counter mc1 = t2.getCounter("counter"); + Counter mc2 = t5.getCounter("counter"); + Counter mc3 = t5.getCounter("counter3"); + assertEquals(mc2, mc1); + assertNotEquals(mc3, mc1); + assertNotEquals(c1, mc1); + assertNotEquals(t1, mc1); + + topicStatsLogger.getOpStatsLogger("writeLatency").registerSuccessfulEvent(100, TimeUnit.NANOSECONDS); + topicStatsLogger.getOpStatsLogger("write_v1").registerSuccessfulEvent(200, TimeUnit.NANOSECONDS); + t1.getOpStatsLogger("write_v2").registerSuccessfulEvent(300, TimeUnit.NANOSECONDS); + t1.getOpStatsLogger("write_v3").registerSuccessfulEvent(400, TimeUnit.NANOSECONDS); + t2.getOpStatsLogger("write_4").registerSuccessfulEvent(500, TimeUnit.NANOSECONDS); + t3.getOpStatsLogger("write_v5").registerSuccessfulEvent(600, TimeUnit.NANOSECONDS); + t5.getOpStatsLogger("write_v5").registerSuccessfulEvent(600, TimeUnit.NANOSECONDS); + t6.getOpStatsLogger("write_v5").registerSuccessfulEvent(600, TimeUnit.NANOSECONDS); + t7.getOpStatsLogger("write_v5").registerSuccessfulEvent(600, TimeUnit.NANOSECONDS); + t8.getOpStatsLogger("write_v5").registerSuccessfulEvent(600, TimeUnit.NANOSECONDS); + + ByteBuf buf = ByteBufAllocator.DEFAULT.heapBuffer(); + SimpleTextOutputStream stream = new SimpleTextOutputStream(buf); + metricsProvider.generate(stream); + Multimap metrics = parseMetrics(stream.getBuffer().toString(StandardCharsets.UTF_8)); + + assertEquals(((List) metrics.get("test_raw_counter4")).get(0).tags.get("topic"), + "persistent://public/default/test-v2"); + Metric metric = ((List) metrics.get("test_raw_counter3")).get(0); + assertEquals(metric.tags.get("topic"), "persistent://public/default/test-v1"); + assertEquals(metric.tags.get("aa"), "aa-value"); + + List metricList = ((List) metrics.get("test_raw_test_v1_write_v5_count")); + if (metricList.get(0).tags.get("success").equals("false")) { + assertEquals(metricList.get(0).value, 0.0); + assertEquals(metricList.get(1).value, 1.0); + } else { + assertEquals(metricList.get(0).value, 1.0); + assertEquals(metricList.get(1).value, 0.0); + } + + metricList = ((List) metrics.get("test_raw_test_write_v5_count")); + if (metricList.get(0).tags.get("success").equals("false")) { + assertEquals(metricList.get(0).value, 0.0); + assertEquals(metricList.get(1).value, 2.0); + } else { + assertEquals(metricList.get(0).value, 2.0); + assertEquals(metricList.get(1).value, 0.0); + } + } finally { + metricsProvider.stop(); + } + } +} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java index cfab570301873..eae2904c9c943 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java @@ -44,16 +44,25 @@ import java.util.Optional; import java.util.Properties; import java.util.Random; +import java.util.Set; import java.util.TreeMap; import java.util.UUID; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import java.util.regex.Matcher; import java.util.regex.Pattern; +import java.util.stream.Collectors; +import java.util.stream.Stream; import javax.crypto.SecretKey; import javax.naming.AuthenticationException; import lombok.Cleanup; +import org.apache.commons.configuration.PropertiesConfiguration; import org.apache.commons.io.IOUtils; +import org.apache.http.HttpResponse; +import org.apache.http.client.HttpClient; +import org.apache.http.client.methods.HttpGet; +import org.apache.http.impl.client.HttpClientBuilder; +import org.apache.http.util.EntityUtils; import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.authentication.AuthenticationDataSource; import org.apache.pulsar.broker.authentication.AuthenticationProviderToken; @@ -66,6 +75,7 @@ import org.apache.pulsar.broker.service.persistent.PersistentSubscription; import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsGenerator; +import org.apache.pulsar.broker.stats.prometheus.metrics.PrometheusMetricsProvider; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.MessageRoutingMode; import org.apache.pulsar.client.api.Producer; @@ -82,12 +92,14 @@ @Test(groups = "flaky") public class PrometheusMetricsTest extends BrokerTestBase { + private static final int MANAGED_LEDGER_SCHEDULER_THREADS_NUM = 4; @BeforeMethod(alwaysRun = true) @Override protected void setup() throws Exception { conf.setTopicLevelPoliciesEnabled(false); conf.setSystemTopicEnabled(false); + conf.setManagedLedgerNumSchedulerThreads(MANAGED_LEDGER_SCHEDULER_THREADS_NUM); super.baseSetup(); AuthenticationProviderToken.resetMetrics(); } @@ -1003,26 +1015,43 @@ public void testManagedLedgerBookieClientStats() throws Exception { List cm = (List) metrics.get(keyNameBySubstrings(metrics, "pulsar_managedLedger_client", "bookkeeper_ml_scheduler_completed_tasks")); - assertEquals(cm.size(), 1); + assertEquals(cm.size(), MANAGED_LEDGER_SCHEDULER_THREADS_NUM); assertEquals(cm.get(0).tags.get("cluster"), "test"); + // check thread labels + Set threads = Stream.of("0", "1", "2", "3").collect(Collectors.toSet()); + for (Metric metric : cm) { + threads.remove(metric.tags.get("thread")); + } + assertTrue(threads.isEmpty()); cm = (List) metrics.get( keyNameBySubstrings(metrics, "pulsar_managedLedger_client", "bookkeeper_ml_scheduler_queue")); - assertEquals(cm.size(), 1); + assertEquals(cm.size(), MANAGED_LEDGER_SCHEDULER_THREADS_NUM); assertEquals(cm.get(0).tags.get("cluster"), "test"); + threads = Stream.of("0", "1", "2", "3").collect(Collectors.toSet()); + for (Metric metric : cm) { + threads.remove(metric.tags.get("thread")); + } + assertTrue(threads.isEmpty()); cm = (List) metrics.get( keyNameBySubstrings(metrics, "pulsar_managedLedger_client", "bookkeeper_ml_scheduler_total_tasks")); - assertEquals(cm.size(), 1); + assertEquals(cm.size(), MANAGED_LEDGER_SCHEDULER_THREADS_NUM); assertEquals(cm.get(0).tags.get("cluster"), "test"); + threads = Stream.of("0", "1", "2", "3").collect(Collectors.toSet()); + for (Metric metric : cm) { + threads.remove(metric.tags.get("thread")); + } + assertTrue(threads.isEmpty()); cm = (List) metrics.get( keyNameBySubstrings(metrics, "pulsar_managedLedger_client", "bookkeeper_ml_scheduler_threads")); assertEquals(cm.size(), 1); assertEquals(cm.get(0).tags.get("cluster"), "test"); + assertEquals((int) (cm.get(0).value), MANAGED_LEDGER_SCHEDULER_THREADS_NUM); cm = (List) metrics.get( keyNameBySubstrings(metrics, @@ -1632,6 +1661,34 @@ public static Multimap parseMetrics(String metrics) { return parsed; } + @Test + public void testRawMetricsProvider() throws IOException { + PrometheusMetricsProvider rawMetricsProvider = new PrometheusMetricsProvider(); + try { + rawMetricsProvider.start(new PropertiesConfiguration()); + rawMetricsProvider.getStatsLogger("test_raw") + .scopeLabel("topic", "persistent://public/default/test-v1") + .getOpStatsLogger("writeLatency") + .registerSuccessfulEvent(100, TimeUnit.NANOSECONDS); + + getPulsar().addPrometheusRawMetricsProvider(rawMetricsProvider); + HttpClient httpClient = HttpClientBuilder.create().build(); + final String metricsEndPoint = getPulsar().getWebServiceAddress() + "/metrics"; + HttpResponse response = httpClient.execute(new HttpGet(metricsEndPoint)); + Multimap metrics = parseMetrics(EntityUtils.toString(response.getEntity())); + List subMetrics = (List) metrics.get("test_raw_writeLatency_count"); + if (subMetrics.get(0).tags.get("success").equals("false")) { + assertEquals(subMetrics.get(1).value, 1); + } else { + assertEquals(subMetrics.get(0).value, 1); + } + assertEquals(subMetrics.get(0).tags.get("topic"), "persistent://public/default/test-v1"); + } finally { + rawMetricsProvider.stop(); + } + + } + public static class Metric { public Map tags = new TreeMap<>(); public double value;