Skip to content

Commit

Permalink
feat: trying out processor and exporter
Browse files Browse the repository at this point in the history
  • Loading branch information
Syerikjan Khusayan committed Dec 2, 2024
1 parent c0342e7 commit 281f635
Show file tree
Hide file tree
Showing 5 changed files with 154 additions and 72 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@

package com.grafana.extensions;

import com.grafana.extensions.exporter.MovingAverageThresholdSampler;
import com.grafana.extensions.filter.MetricsCustomizer;
import com.grafana.extensions.instrumentations.TestedInstrumentationsCustomizer;
import com.grafana.extensions.resources.ResourceCustomizer;
import com.grafana.extensions.sampling.MovingAverageThresholdSampler;
import io.opentelemetry.sdk.autoconfigure.spi.AutoConfigurationCustomizer;
import io.opentelemetry.sdk.autoconfigure.spi.AutoConfigurationCustomizerProvider;
import java.util.HashMap;
Expand All @@ -22,7 +22,8 @@ public void customize(AutoConfigurationCustomizer autoConfiguration) {
.addPropertiesSupplier(GrafanaAutoConfigCustomizerProvider::getDefaultProperties)
.addPropertiesCustomizer(TestedInstrumentationsCustomizer::customizeProperties)
.addMeterProviderCustomizer(MetricsCustomizer::configure)
.addSamplerCustomizer(MovingAverageThresholdSampler::configure)
.addSpanExporterCustomizer(MovingAverageThresholdSampler::configure)
// .addSpanProcessorCustomizer(MovingAverageProcessor::configure)
.addResourceCustomizer(ResourceCustomizer::truncate);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
/*
* Copyright Grafana Labs
* SPDX-License-Identifier: Apache-2.0
*/

package com.grafana.extensions.exporter;

import com.grafana.extensions.util.MovingAverage;
import io.opentelemetry.sdk.autoconfigure.spi.ConfigProperties;
import io.opentelemetry.sdk.common.CompletableResultCode;
import io.opentelemetry.sdk.trace.data.SpanData;
import io.opentelemetry.sdk.trace.export.SpanExporter;
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.logging.Level;
import java.util.logging.Logger;

public class MovingAverageThresholdSampler implements SpanExporter {
private final Map<String, MovingAverage> movingAvgs = new ConcurrentHashMap<>();
private final double thresholdVal;
private final int windowSize;

public static final Logger logger =
Logger.getLogger(MovingAverageThresholdSampler.class.getName());

protected MovingAverageThresholdSampler(double thresholdVal, int windowSize) {
this.thresholdVal = 0.5;
this.windowSize = windowSize;
}

public static SpanExporter configure(SpanExporter se, ConfigProperties properties) {
double threshold = properties.getDouble("threshold", 1.5);
int windowSize = properties.getInt("window", 5);
return new MovingAverageThresholdSampler(threshold, windowSize);
}

@Override
public CompletableResultCode export(Collection<SpanData> spans) {
for (SpanData span : spans) {
String spanName = span.getName();
logger.log(
Level.INFO,
"exp:spanName {0} - windowSize {1}: {2}",
new Object[] {span.getName(), windowSize, span.getAttributes()});
long duration = (span.getEndEpochNanos() - span.getStartEpochNanos()) / 1_000_000;
MovingAverage currMovingAvg =
movingAvgs.computeIfAbsent(spanName, ma -> new MovingAverage(windowSize));
currMovingAvg.addAndCalcAverage(duration);
if (currMovingAvg.getCount() >= windowSize) {
double avg = currMovingAvg.addAndCalcAverage(duration);
logger.log(
Level.INFO,
"exp: avg {0} * threshold {1} = {2}, duration {3}",
new Object[] {avg, thresholdVal, avg * thresholdVal, duration});
if (duration < avg * thresholdVal) {
logger.log(Level.INFO, "removing: " + span.getName());
spans.remove(span);
}
}
logger.log(Level.INFO, "exporting span:{0}", new Object[] {spans.size()});
}
return CompletableResultCode.ofSuccess();
}

@Override
public CompletableResultCode flush() {
return CompletableResultCode.ofSuccess();
}

@Override
public CompletableResultCode shutdown() {
return CompletableResultCode.ofSuccess();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
/*
* Copyright Grafana Labs
* SPDX-License-Identifier: Apache-2.0
*/

package com.grafana.extensions.processor;

import com.grafana.extensions.util.MovingAverage;
import io.opentelemetry.context.Context;
import io.opentelemetry.sdk.autoconfigure.spi.ConfigProperties;
import io.opentelemetry.sdk.trace.ReadWriteSpan;
import io.opentelemetry.sdk.trace.ReadableSpan;
import io.opentelemetry.sdk.trace.SpanProcessor;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.logging.Level;
import java.util.logging.Logger;

public class MovingAverageProcessor implements SpanProcessor {
private final Map<String, MovingAverage> movingAvgs = new ConcurrentHashMap<>();
private final double thresholdVal;
private final int windowSize;

public static final Logger logger = Logger.getLogger(MovingAverageProcessor.class.getName());

protected MovingAverageProcessor(double thresholdVal, int windowSize) {
this.thresholdVal = thresholdVal;
this.windowSize = windowSize;

// exporter = OtlpProtocolPropertiesSupplier;
}

public static SpanProcessor configure(SpanProcessor sp, ConfigProperties properties) {
double threshold = properties.getDouble("threshold", 1.5);
int windowSize = properties.getInt("window", 5);
return new MovingAverageProcessor(threshold, windowSize);
}

@Override
public void onStart(Context parentContext, ReadWriteSpan span) {}

@Override
public boolean isStartRequired() {
return false;
}

@Override
public void onEnd(ReadableSpan span) {
String spanName = span.getName();
logger.log(
Level.INFO,
"spanName {0} - windowSize {1}: {2}",
new Object[] {span.getName(), windowSize, span.getAttributes()});
long duration = (span.getLatencyNanos()) / 1_000_000;
MovingAverage currMovingAvg =
movingAvgs.computeIfAbsent(spanName, ma -> new MovingAverage(windowSize));
currMovingAvg.addAndCalcAverage(duration);
if (currMovingAvg.getCount() >= windowSize) {
double avg = currMovingAvg.addAndCalcAverage(duration);
logger.log(
Level.INFO,
"avg {0} * threshold {1} = {2}, duration {3}",
new Object[] {avg, thresholdVal, avg * thresholdVal, duration});
if (duration < avg * thresholdVal) {
return;
}
}
logger.log(Level.INFO, "sending forward:{0}", new Object[] {span.getName()});
}

@Override
public boolean isEndRequired() {
return true;
}
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
* SPDX-License-Identifier: Apache-2.0
*/

package com.grafana.extensions.sampling;
package com.grafana.extensions.util;

import java.util.LinkedList;
import java.util.Queue;
Expand Down

0 comments on commit 281f635

Please sign in to comment.