diff --git a/custom/src/main/java/com/grafana/extensions/exporter/MovingAverageThresholdSampler.java b/custom/src/main/java/com/grafana/extensions/exporter/MovingAverageThresholdSampler.java deleted file mode 100644 index 60c1562a..00000000 --- a/custom/src/main/java/com/grafana/extensions/exporter/MovingAverageThresholdSampler.java +++ /dev/null @@ -1,81 +0,0 @@ -/* - * Copyright Grafana Labs - * SPDX-License-Identifier: Apache-2.0 - */ - -package com.grafana.extensions.exporter; - -import com.grafana.extensions.util.MovingAverage; -import io.opentelemetry.sdk.autoconfigure.spi.ConfigProperties; -import io.opentelemetry.sdk.common.CompletableResultCode; -import io.opentelemetry.sdk.trace.data.SpanData; -import io.opentelemetry.sdk.trace.export.SpanExporter; -import java.util.Collection; -import java.util.Iterator; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.logging.Level; -import java.util.logging.Logger; - -public class MovingAverageThresholdSampler implements SpanExporter { - private final Map movingAvgs = new ConcurrentHashMap<>(); - private final double thresholdVal; - private final int windowSize; - - public static final Logger logger = - Logger.getLogger(MovingAverageThresholdSampler.class.getName()); - private final SpanExporter delegate; - - protected MovingAverageThresholdSampler( - double thresholdVal, int windowSize, SpanExporter delegate) { - this.delegate = delegate; - this.thresholdVal = 0.5; - this.windowSize = windowSize; - } - - public static SpanExporter configure(SpanExporter delegate, ConfigProperties properties) { - double threshold = properties.getDouble("threshold", 1.5); - int windowSize = properties.getInt("window", 5); - return new MovingAverageThresholdSampler(threshold, windowSize, delegate); - } - - @Override - public CompletableResultCode export(Collection spans) { - Iterator iter = spans.iterator(); - while (iter.hasNext()) { - SpanData span = iter.next(); - String spanName = span.getName(); - logger.log( - Level.INFO, - "exp:spanName {0} - windowSize {1}: {2}", - new Object[] {span.getName(), windowSize, span.getAttributes()}); - long duration = (span.getEndEpochNanos() - span.getStartEpochNanos()) / 1_000_000; - MovingAverage currMovingAvg = - movingAvgs.computeIfAbsent(spanName, ma -> new MovingAverage(windowSize)); - currMovingAvg.add(duration); - if (currMovingAvg.getCount() >= windowSize) { - double avg = currMovingAvg.calcAverage(); - logger.log( - Level.INFO, - "exp: avg {0} * threshold {1} = {2}, duration {3}", - new Object[] {avg, thresholdVal, avg * thresholdVal, duration}); - if (duration < avg * thresholdVal) { - logger.log(Level.INFO, "discarding: " + span.getName()); - spans.remove(span); - } - } - logger.log(Level.INFO, "exporting span:{0}", new Object[] {spans.size()}); - } - return delegate.export(spans); - } - - @Override - public CompletableResultCode flush() { - return delegate.flush(); - } - - @Override - public CompletableResultCode shutdown() { - return delegate.shutdown(); - } -} diff --git a/custom/src/main/java/com/grafana/extensions/sampler/DynamicSampler.java b/custom/src/main/java/com/grafana/extensions/sampler/DynamicSampler.java index 43debcb0..75118082 100644 --- a/custom/src/main/java/com/grafana/extensions/sampler/DynamicSampler.java +++ b/custom/src/main/java/com/grafana/extensions/sampler/DynamicSampler.java @@ -20,14 +20,14 @@ public class DynamicSampler { private final Set sampledTraces = new ConcurrentSkipListSet<>(); public static final Logger logger = Logger.getLogger(DynamicSampler.class.getName()); private final int windowSize; - private final double thresholdVal; + private final double threshold; private final Map movingAvgs = new ConcurrentHashMap<>(); private static DynamicSampler INSTANCE; private DynamicSampler(ConfigProperties properties) { // read properties and configure dynamic sampling - this.thresholdVal = properties.getDouble("threshold", 1.5); - this.windowSize = properties.getInt("window", 5); + this.threshold = properties.getDouble("threshold", 1.3); // 30% + this.windowSize = properties.getInt("window", 3); } public static void configure(ConfigProperties properties) { @@ -42,6 +42,11 @@ public void setSampled(String traceId) { sampledTraces.add(traceId); } + // for testing + public void setMovingAvg(String spanName, MovingAverage ma) { + this.movingAvgs.put(spanName, ma); + } + boolean isSampled(String traceId) { return sampledTraces.contains(traceId); } @@ -74,7 +79,7 @@ boolean shouldSample(ReadableSpan span) { Level.INFO, "spanName {0} - windowSize {1}: {2}", new Object[] {span.getName(), windowSize, span.getAttributes()}); - long duration = (span.getLatencyNanos()) / 1_000_000; + long duration = span.getLatencyNanos(); MovingAverage currMovingAvg = movingAvgs.computeIfAbsent(spanName, ma -> new MovingAverage(windowSize)); currMovingAvg.add(duration); @@ -83,12 +88,16 @@ boolean shouldSample(ReadableSpan span) { logger.log( Level.INFO, "avg {0} * threshold {1} = {2}, duration {3}", - new Object[] {avg, thresholdVal, avg * thresholdVal, duration}); + new Object[] {avg, threshold, avg * threshold, duration}); // discard - if (duration < avg * thresholdVal) { + if (duration < avg * threshold) { return false; } } + logger.log( + Level.INFO, + "sending span part of Trace: {0} - {1}", + new Object[] {span.toSpanData().getTraceId(), duration}); return true; } } diff --git a/custom/src/main/java/com/grafana/extensions/util/MovingAverage.java b/custom/src/main/java/com/grafana/extensions/util/MovingAverage.java index a7cc638d..3c682292 100644 --- a/custom/src/main/java/com/grafana/extensions/util/MovingAverage.java +++ b/custom/src/main/java/com/grafana/extensions/util/MovingAverage.java @@ -7,6 +7,7 @@ import java.util.LinkedList; import java.util.Queue; +import java.util.concurrent.ThreadLocalRandom; public class MovingAverage { private final Queue window = new LinkedList<>(); @@ -17,6 +18,14 @@ public MovingAverage(int size) { this.size = size; } + public static MovingAverage getPrepopulatedMovingAvgForTest(int size, int lowerBound) { + MovingAverage ma = new MovingAverage(size); + for(int i=0;i { + String traceId = Span.current().getSpanContext().getTraceId(); String serverTiming = ServerTimingHeaderCustomizer.toHeaderValue(Context.current()); - serverTimingHeaderReader.consume( new StringHttpCommonAttributesGetter(serverTiming), "request", "response"); - assertThat(DynamicSampler.getInstance().getSampledTraces()).isEmpty(); + System.out.println(serverTiming); + assertThat(DynamicSampler.getInstance().getSampledTraces()).doesNotContain(traceId); }); } diff --git a/custom/src/test/java/com/grafana/extensions/servertiming/ServerTimingHeaderTest.java b/custom/src/test/java/com/grafana/extensions/servertiming/ServerTimingHeaderTest.java index 4c7d95ec..c0c9fbcd 100644 --- a/custom/src/test/java/com/grafana/extensions/servertiming/ServerTimingHeaderTest.java +++ b/custom/src/test/java/com/grafana/extensions/servertiming/ServerTimingHeaderTest.java @@ -10,15 +10,19 @@ import static org.assertj.core.api.Assertions.assertThat; import com.grafana.extensions.sampler.DynamicSampler; +import com.grafana.extensions.util.MovingAverage; import io.opentelemetry.api.trace.Span; import io.opentelemetry.context.Context; import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension; import io.opentelemetry.instrumentation.testing.junit.LibraryInstrumentationExtension; import io.opentelemetry.sdk.autoconfigure.spi.ConfigProperties; +import io.opentelemetry.sdk.autoconfigure.spi.internal.DefaultConfigProperties; import io.opentelemetry.sdk.resources.Resource; +import java.util.Collections; import java.util.HashMap; import java.util.Map; import java.util.function.Consumer; +import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; @@ -31,6 +35,11 @@ class ServerTimingHeaderTest { record TestCase(Resource want, ConfigProperties config) {} + @BeforeAll + static void initialize() { + DynamicSampler.configure(DefaultConfigProperties.createFromMap(Collections.emptyMap())); + } + @BeforeEach void setUp() { ServerTimingHeaderCustomizer.enabled = true; @@ -48,6 +57,8 @@ void shouldNotSetAnyHeadersWithoutValidCurrentSpan() { @Test void shouldSetHeaders() { + MovingAverage testMovingAvg = MovingAverage.getPrepopulatedMovingAvgForTest(3, 11_900_000); + DynamicSampler.getInstance().setMovingAvg("server", testMovingAvg); assertSetHeader("00", span -> {}); assertSetHeader( "01", span -> DynamicSampler.getInstance().setSampled(span.getSpanContext().getTraceId())); @@ -75,8 +86,9 @@ private void assertSetHeader(String traceFlags, Consumer spanConsumer) { + "-" + traceFlags + "\""; + assertThat(headers) - .containsEntry(SERVER_TIMING, serverTimingHeaderValue) + .containsEntry(SERVER_TIMING, serverTimingHeaderValue) .containsEntry(EXPOSE_HEADERS, SERVER_TIMING); } }