Skip to content

Commit

Permalink
ref: make dynamic sampler singleton
Browse files Browse the repository at this point in the history
  • Loading branch information
Syerikjan Khusayan committed Dec 3, 2024
1 parent f877bd9 commit d43dc5b
Show file tree
Hide file tree
Showing 10 changed files with 90 additions and 108 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@

package com.grafana.extensions;

import com.grafana.extensions.exporter.MovingAverageThresholdSampler;
import com.grafana.extensions.filter.MetricsCustomizer;
import com.grafana.extensions.instrumentations.TestedInstrumentationsCustomizer;
import com.grafana.extensions.resources.ResourceCustomizer;
Expand All @@ -28,7 +27,6 @@ public void customize(AutoConfigurationCustomizer autoConfiguration) {
.addTracerProviderCustomizer(SamplingSpanProcessor::configure)
.addSpanExporterCustomizer(SamplingExporter::configure)
.addSamplerCustomizer(DeferredSampler::configure)
.addSpanExporterCustomizer(MovingAverageThresholdSampler::configure)
.addResourceCustomizer(ResourceCustomizer::truncate);
}

Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,14 @@
public class DeferredSampler implements Sampler {

private static final Sampler FROM_PARENT = Sampler.parentBased(Sampler.alwaysOff());
private final ConfigProperties properties;

public DeferredSampler(ConfigProperties properties) {
this.properties = properties;
}

public static Sampler configure(Sampler configured, ConfigProperties configProperties) {
return new DeferredSampler();
return new DeferredSampler(configProperties);
}

@Override
Expand All @@ -41,7 +46,7 @@ public SamplingResult shouldSample(
.shouldSample(parentContext, traceId, name, spanKind, attributes, parentLinks)
.getDecision();
if (SamplingDecision.RECORD_AND_SAMPLE.equals(parentDecision)) {
DynamicSampler.setSampled(traceId);
DynamicSampler.getInstance(properties).setSampled(traceId);
}

// always return true - because a child span might be sampled even if the parent is not
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,29 +5,48 @@

package com.grafana.extensions.sampler;

import io.opentelemetry.api.common.AttributeKey;
import com.grafana.extensions.util.MovingAverage;
import io.opentelemetry.sdk.autoconfigure.spi.ConfigProperties;
import io.opentelemetry.sdk.trace.ReadableSpan;
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.logging.Level;
import java.util.logging.Logger;

public class DynamicSampler {
private static final Set<String> sampledTraces = new ConcurrentSkipListSet<>();
private final Set<String> sampledTraces = new ConcurrentSkipListSet<>();
public static final Logger logger = Logger.getLogger(DynamicSampler.class.getName());
private final int windowSize;
private final double thresholdVal;
private final Map<String, MovingAverage> movingAvgs = new ConcurrentHashMap<>();
private static DynamicSampler INSTANCE;

public DynamicSampler(ConfigProperties properties) {
private DynamicSampler(ConfigProperties properties) {
// read properties and configure dynamic sampling
this.thresholdVal = properties.getDouble("threshold", 1.5);
this.windowSize = properties.getInt("window", 5);
}

public static void setSampled(String traceId) {
public static DynamicSampler getInstance(ConfigProperties properties) {
if (INSTANCE == null) {
INSTANCE = new DynamicSampler(properties);
}

return INSTANCE;
}

public void setSampled(String traceId) {
sampledTraces.add(traceId);
}

static boolean isSampled(String traceId) {
boolean isSampled(String traceId) {
return sampledTraces.contains(traceId);
}

public static boolean evaluateSampled(ReadableSpan span) {
public boolean evaluateSampled(ReadableSpan span) {
String traceId = span.getSpanContext().getTraceId();
if (sampledTraces.contains(traceId)) {
return true;
Expand All @@ -40,18 +59,36 @@ public static boolean evaluateSampled(ReadableSpan span) {
}

// public visible for testing
public static void clear() {
public void clear() {
sampledTraces.clear();
}

// visible for testing
public static Set<String> getSampledTraces() {
public Set<String> getSampledTraces() {
return Collections.unmodifiableSet(sampledTraces);
}

static boolean shouldSample(ReadableSpan span) {
// add dynamic sampling logic here
// dummy implementation for testing for now
return Boolean.TRUE.equals(span.getAttributes().get(AttributeKey.booleanKey("sampled")));
boolean shouldSample(ReadableSpan span) {
String spanName = span.getName();
logger.log(
Level.INFO,
"spanName {0} - windowSize {1}: {2}",
new Object[] {span.getName(), windowSize, span.getAttributes()});
long duration = (span.getLatencyNanos()) / 1_000_000;
MovingAverage currMovingAvg =
movingAvgs.computeIfAbsent(spanName, ma -> new MovingAverage(windowSize));
currMovingAvg.add(duration);
if (currMovingAvg.getCount() >= windowSize) {
double avg = currMovingAvg.calcAverage();
logger.log(
Level.INFO,
"avg {0} * threshold {1} = {2}, duration {3}",
new Object[] {avg, thresholdVal, avg * thresholdVal, duration});
// discard
if (duration < avg * thresholdVal) {
return false;
}
}
return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,24 +15,27 @@
public class SamplingExporter implements SpanExporter {

private final SpanExporter delegate;
private final ConfigProperties properties;

public SamplingExporter(SpanExporter delegate) {
public SamplingExporter(SpanExporter delegate, ConfigProperties properties) {
this.delegate = delegate;
this.properties = properties;
}

public static SpanExporter configure(SpanExporter delegate, ConfigProperties properties) {
return new SamplingExporter(delegate);
return new SamplingExporter(delegate, properties);
}

@Override
public CompletableResultCode export(Collection<SpanData> collection) {
ArrayList<SpanData> export = new ArrayList<>();
for (SpanData data : collection) {
if (DynamicSampler.isSampled(data.getSpanContext().getTraceId())) {
if (DynamicSampler.getInstance(this.properties)
.isSampled(data.getSpanContext().getTraceId())) {
export.add(data);
}
}
DynamicSampler.clear();
DynamicSampler.getInstance(this.properties).clear();
return delegate.export(export);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,11 @@
import io.opentelemetry.sdk.trace.SpanProcessor;

public class SamplingSpanProcessor implements SpanProcessor {
private final ConfigProperties properties;

public SamplingSpanProcessor(ConfigProperties properties) {
this.properties = properties;
}

@Override
public void onStart(Context context, ReadWriteSpan readWriteSpan) {}
Expand All @@ -24,7 +29,7 @@ public boolean isStartRequired() {

@Override
public void onEnd(ReadableSpan readableSpan) {
DynamicSampler.evaluateSampled(readableSpan);
DynamicSampler.getInstance(properties).evaluateSampled(readableSpan);
}

@Override
Expand All @@ -34,7 +39,6 @@ public boolean isEndRequired() {

public static SdkTracerProviderBuilder configure(
SdkTracerProviderBuilder sdkTracerProviderBuilder, ConfigProperties configProperties) {
new DynamicSampler(configProperties);
return sdkTracerProviderBuilder.addSpanProcessor(new SamplingSpanProcessor());
return sdkTracerProviderBuilder.addSpanProcessor(new SamplingSpanProcessor(configProperties));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public <RESPONSE> void customize(
static String toHeaderValue(Context context) {
ReadWriteSpan span = (ReadWriteSpan) Span.fromContext(context);
SpanContext spanContext = span.getSpanContext();
boolean sampled = DynamicSampler.evaluateSampled(span);
boolean sampled = DynamicSampler.getInstance().evaluateSampled(span);
TraceParentHolder traceParentHolder = new TraceParentHolder();
W3CTraceContextPropagator.getInstance()
.inject(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ public <REQUEST, RESPONSE> void consume(
GETTER);
SpanContext spanContext = Span.fromContext(traceparent).getSpanContext();
if (spanContext.getTraceFlags().isSampled()) {
DynamicSampler.setSampled(spanContext.getTraceId());
DynamicSampler.getInstance().setSampled(spanContext.getTraceId());
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,13 @@
import io.opentelemetry.context.Context;
import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension;
import io.opentelemetry.instrumentation.testing.junit.LibraryInstrumentationExtension;
import io.opentelemetry.sdk.autoconfigure.spi.internal.DefaultConfigProperties;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

import java.util.Collections;

class ServerTimingHeaderReaderTest {

private ServerTimingHeaderReader serverTimingHeaderReader = new ServerTimingHeaderReader();
Expand All @@ -24,7 +27,7 @@ class ServerTimingHeaderReaderTest {

@BeforeEach
void setUp() {
DynamicSampler.clear();
DynamicSampler.getInstance(DefaultConfigProperties.createFromMap(Collections.emptyMap())).clear();
}

@Test
Expand All @@ -36,7 +39,7 @@ void notSampled() {

serverTimingHeaderReader.consume(
new StringHttpCommonAttributesGetter(serverTiming), "request", "response");
assertThat(DynamicSampler.getSampledTraces()).isEmpty();
assertThat(DynamicSampler.getInstance(DefaultConfigProperties.createFromMap(Collections.emptyMap())).getSampledTraces()).isEmpty();
});
}

Expand All @@ -46,14 +49,14 @@ void sampled() {
"server",
() -> {
String traceId = Span.current().getSpanContext().getTraceId();
DynamicSampler.setSampled(traceId);
DynamicSampler.getInstance(DefaultConfigProperties.createFromMap(Collections.emptyMap())).setSampled(traceId);
String serverTiming = ServerTimingHeaderCustomizer.toHeaderValue(Context.current());

// remove the traceId to see that it is added back by the reader
DynamicSampler.clear();
DynamicSampler.getInstance(DefaultConfigProperties.createFromMap(Collections.emptyMap())).clear();
serverTimingHeaderReader.consume(
new StringHttpCommonAttributesGetter(serverTiming), "request", "response");
assertThat(DynamicSampler.getSampledTraces()).contains(traceId);
assertThat(DynamicSampler.getInstance(DefaultConfigProperties.createFromMap(Collections.emptyMap())).getSampledTraces()).contains(traceId);
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,15 @@
import io.opentelemetry.context.Context;
import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension;
import io.opentelemetry.instrumentation.testing.junit.LibraryInstrumentationExtension;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;

import io.opentelemetry.sdk.autoconfigure.spi.ConfigProperties;
import io.opentelemetry.sdk.autoconfigure.spi.internal.DefaultConfigProperties;
import io.opentelemetry.sdk.resources.Resource;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
Expand All @@ -26,11 +32,12 @@ class ServerTimingHeaderTest {
@RegisterExtension InstrumentationExtension testing = LibraryInstrumentationExtension.create();

private final ServerTimingHeaderCustomizer serverTiming = new ServerTimingHeaderCustomizer();
record TestCase(Resource want, ConfigProperties config) {}

@BeforeEach
void setUp() {
ServerTimingHeaderCustomizer.enabled = true;
DynamicSampler.clear();
DynamicSampler.getInstance(DefaultConfigProperties.createFromMap(Collections.emptyMap())).clear();
}

@Test
Expand All @@ -45,7 +52,8 @@ void shouldNotSetAnyHeadersWithoutValidCurrentSpan() {
@Test
void shouldSetHeaders() {
assertSetHeader("00", span -> {});
assertSetHeader("01", span -> DynamicSampler.setSampled(span.getSpanContext().getTraceId()));
assertSetHeader(
"01", span -> DynamicSampler.getInstance(DefaultConfigProperties.createFromMap(Collections.emptyMap())).setSampled(span.getSpanContext().getTraceId()));
}

private void assertSetHeader(String traceFlags, Consumer<Span> spanConsumer) {
Expand Down

0 comments on commit d43dc5b

Please sign in to comment.