Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add prometheus raw metrics provider support #4

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,6 @@
import io.prometheus.client.hotspot.DefaultExports;
import java.io.IOException;
import java.io.OutputStream;
import java.io.StringWriter;
import java.io.Writer;
import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.Collections;
Expand All @@ -51,6 +49,7 @@
import org.apache.pulsar.broker.stats.metrics.ManagedCursorMetrics;
import org.apache.pulsar.broker.stats.metrics.ManagedLedgerCacheMetrics;
import org.apache.pulsar.broker.stats.metrics.ManagedLedgerMetrics;
import org.apache.pulsar.broker.stats.prometheus.metrics.PrometheusMetricsProvider;
import org.apache.pulsar.common.stats.Metrics;
import org.apache.pulsar.common.util.DirectMemoryUtils;
import org.apache.pulsar.common.util.SimpleTextOutputStream;
Expand Down Expand Up @@ -315,12 +314,6 @@ private static void generateManagedLedgerBookieClientMetrics(PulsarService pulsa
return;
}

try {
Writer writer = new StringWriter();
statsProvider.writeAllMetrics(writer);
stream.write(writer.toString());
} catch (IOException e) {
// nop
}
((PrometheusMetricsProvider) statsProvider).generate(stream);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.stats.prometheus.metrics;

import com.google.common.base.Joiner;
import io.prometheus.client.Collector;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.bookkeeper.stats.Counter;
import org.apache.bookkeeper.stats.Gauge;
import org.apache.bookkeeper.stats.OpStatsLogger;
import org.apache.bookkeeper.stats.StatsLogger;

/**
* A {@code StatsLogger} that caches the stats objects created by other {@code StatsLogger}.
*/
public class CachingStatsLogger implements StatsLogger {

protected final StatsLogger underlying;
protected final ConcurrentMap<ScopeContext, Counter> counters;
protected final ConcurrentMap<ScopeContext, OpStatsLogger> opStatsLoggers;
protected final ConcurrentMap<ScopeContext, StatsLogger> scopeStatsLoggers;
protected final ConcurrentMap<ScopeContext, StatsLogger> scopeLabelStatsLoggers;

private final String scope;
private final Map<String, String> labels;

public CachingStatsLogger(String scope, StatsLogger statsLogger, Map<String, String> labels) {
this.scope = scope;
this.labels = labels;
this.underlying = statsLogger;
this.counters = new ConcurrentHashMap<>();
this.opStatsLoggers = new ConcurrentHashMap<>();
this.scopeStatsLoggers = new ConcurrentHashMap<>();
this.scopeLabelStatsLoggers = new ConcurrentHashMap<>();
}

@Override
public int hashCode() {
return underlying.hashCode();
}

@Override
public boolean equals(Object obj) {
if (!(obj instanceof CachingStatsLogger)) {
return false;
}
CachingStatsLogger another = (CachingStatsLogger) obj;
return underlying.equals(another.underlying);
}

@Override
public String toString() {
return underlying.toString();
}

@Override
public OpStatsLogger getOpStatsLogger(String name) {
return opStatsLoggers.computeIfAbsent(scopeContext(name), x -> underlying.getOpStatsLogger(name));
}

@Override
public Counter getCounter(String name) {
return counters.computeIfAbsent(scopeContext(name), x -> underlying.getCounter(name));
}

@Override
public <T extends Number> void registerGauge(String name, Gauge<T> gauge) {
underlying.registerGauge(name, gauge);
}

@Override
public <T extends Number> void unregisterGauge(String name, Gauge<T> gauge) {
underlying.unregisterGauge(name, gauge);
}

@Override
public StatsLogger scope(String name) {
return scopeStatsLoggers.computeIfAbsent(scopeContext(name),
x -> new CachingStatsLogger(scope, underlying.scope(name), labels));
}

@Override
public StatsLogger scopeLabel(String labelName, String labelValue) {
Map<String, String> newLabels = new TreeMap<>(labels);
newLabels.put(labelName, labelValue);
return scopeLabelStatsLoggers.computeIfAbsent(new ScopeContext(completeName(""), newLabels),
x -> new CachingStatsLogger(scope, underlying.scopeLabel(labelName, labelValue), newLabels));
}


@Override
public void removeScope(String name, StatsLogger statsLogger) {
scopeStatsLoggers.remove(scopeContext(name), statsLogger);
}

/**
Thread-scoped stats not currently supported.
*/
@Override
public OpStatsLogger getThreadScopedOpStatsLogger(String name) {
return getOpStatsLogger(name);
}

/**
Thread-scoped stats not currently supported.
*/
@Override
public Counter getThreadScopedCounter(String name) {
return getCounter(name);
}

private ScopeContext scopeContext(String name) {
return new ScopeContext(completeName(name), labels);
}

private String completeName(String name) {
return Collector.sanitizeMetricName(scope.isEmpty() ? name : Joiner.on('_').join(scope, name));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.stats.prometheus.metrics;

import java.util.Collections;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.bookkeeper.stats.StatsLogger;
import org.apache.bookkeeper.stats.StatsProvider;
import org.apache.commons.configuration.Configuration;

/**
* A {@code CachingStatsProvider} adds the caching functionality to an existing {@code StatsProvider}.
*
* <p>The stats provider will cache the stats objects created by the other {@code StatsProvider} to allow
* the reusability of stats objects and avoid creating a lot of stats objects.
*/
public class CachingStatsProvider implements StatsProvider {

protected final StatsProvider underlying;
protected final ConcurrentMap<String, StatsLogger> statsLoggers;

public CachingStatsProvider(StatsProvider provider) {
this.underlying = provider;
this.statsLoggers = new ConcurrentHashMap<>();
}

@Override
public void start(Configuration conf) {
this.underlying.start(conf);
}

@Override
public void stop() {
this.underlying.stop();
}

@Override
public StatsLogger getStatsLogger(String scope) {
return statsLoggers.computeIfAbsent(scope,
x -> new CachingStatsLogger(scope, underlying.getStatsLogger(scope), Collections.emptyMap()));
}

@Override
public String getStatsName(String... statsComponents) {
return underlying.getStatsName(statsComponents);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,15 @@ public class DataSketchesOpStatsLogger implements OpStatsLogger {
private final LongAdder successSumAdder = new LongAdder();
private final LongAdder failSumAdder = new LongAdder();

public DataSketchesOpStatsLogger() {
private Map<String, String> labels;

// used for lazy registration for thread scoped metrics
private boolean threadInitialized;

public DataSketchesOpStatsLogger(Map<String, String> labels) {
this.current = new ThreadLocalAccessor();
this.replacement = new ThreadLocalAccessor();
this.labels = labels;
}

@Override
Expand Down Expand Up @@ -172,6 +178,19 @@ public double getQuantileValue(boolean success, double quantile) {
return s != null ? s.getQuantile(quantile) : Double.NaN;
}

public Map<String, String> getLabels() {
return labels;
}

public boolean isThreadInitialized() {
return threadInitialized;
}

public void initializeThread(Map<String, String> labels) {
this.labels = labels;
this.threadInitialized = true;
}

private static class LocalData {
private final DoublesSketch successSketch = new DoublesSketchBuilder().build();
private final DoublesSketch failSketch = new DoublesSketchBuilder().build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.pulsar.broker.stats.prometheus.metrics;

import java.util.Map;
import java.util.concurrent.atomic.LongAdder;
import org.apache.bookkeeper.stats.Counter;

Expand All @@ -30,8 +31,13 @@
public class LongAdderCounter implements Counter {
private final LongAdder counter = new LongAdder();

public LongAdderCounter() {
private Map<String, String> labels;

// used for lazy registration for thread scoped metric
private boolean threadInitialized;

public LongAdderCounter(Map<String, String> labels) {
this.labels = labels;
}

@Override
Expand All @@ -58,4 +64,17 @@ public void add(long delta) {
public Long get() {
return counter.sum();
}

public Map<String, String> getLabels() {
return labels;
}

public boolean isThreadInitialized() {
return threadInitialized;
}

public void initializeThread(Map<String, String> labels) {
this.labels = labels;
this.threadInitialized = true;
}
}
Loading