Skip to content

Commit

Permalink
Added helper methods for manual spans in mutiny pipelines.
Browse files Browse the repository at this point in the history
In a traditional blocking and synchronous framework the opentelemetry context
is attached to ThreadLocal. In reactive programming, where multiple processings share the same event-loop thread, one has to use Vert.x duplicated contexts instead (see https://quarkus.io/guides/duplicated-context).
wrapWithSpan ensures that the pipeline is executed on a duplicated context (If the current context already is duplicated, it will stay the same. Therefore, nested calls to wrapWithSpan will all run on the same vert.x context).

inspired by Jan Peremsky (https://github.com/jan-peremsky/quarkus-reactive-otel/blob/c74043d388ec4df155f466f1d6938931c3389b70/src/main/java/com/fng/ewallet/pex/Tracer.java)
and edeandrea (https://github.com/quarkusio/quarkus-super-heroes/blob/main/event-statistics/src/main/java/io/quarkus/sample/superheroes/statistics/listener/SuperStats.java)

suggestions from review

I will squash with the previous commit once everything is approved

suggestion from reviewer

todo: squash when everything is done
-> adapt commit message. In this solution span and scope are local variables. no need for a stack.

unnecessary line removed

fix test to run on different contexts again

cleanup
  • Loading branch information
arn-ivu committed Jan 15, 2025
1 parent 552d558 commit 26eeeee
Show file tree
Hide file tree
Showing 4 changed files with 492 additions and 0 deletions.
54 changes: 54 additions & 0 deletions docs/src/main/asciidoc/opentelemetry-tracing.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -526,6 +526,60 @@ public void tracedWork() {
}
----

=== Mutiny
Methods returning reactive types can also be annotated with `@WithSpan` and `@AddingSpanAttributes` to create a new span or add attributes to the current span.

If you need to create spans manually within a mutiny pipeline, use `wrapWithSpan` method from `io.quarkus.opentelemetry.runtime.tracing.mutiny.MutinyTracingHelper`.

Example. Assuming you have the following pipeline:
[source,java]
----
Uni<String> uni = Uni.createFrom().item("hello")
//start trace here
.onItem().transform(item -> item + " world")
.onItem().transform(item -> item + "!")
//end trace here
.subscribe().with(
item -> System.out.println("Received: " + item),
failure -> System.out.println("Failed with " + failure)
);
----
wrap it like this:
[source,java]
----
import static io.quarkus.opentelemetry.runtime.tracing.mutiny.MutinyTracingHelper.wrapWithSpan;
...
@Inject
Tracer tracer;
...
Context context = Context.current();
Uni<String> uni = Uni.createFrom().item("hello")
.transformToUni(m -> wrapWithSpan(tracer, Optional.of(context), "my-span-name",
Uni.createFrom().item(m)
.onItem().transform(item -> item + " world")
.onItem().transform(item -> item + "!")
))
.subscribe().with(
item -> System.out.println("Received: " + item),
failure -> System.out.println("Failed with " + failure)
);
----
for multi-pipelines it works similarly:
[source,java]
----
Multi.createFrom().items("Alice", "Bob", "Charlie")
.transformToMultiAndConcatenate(m -> TracingHelper.withTrace("my-span-name",
Multi.createFrom().item(m)
.onItem().transform(name -> "Hello " + name)
))
.subscribe().with(
item -> System.out.println("Received: " + item),
failure -> System.out.println("Failed with " + failure)
);
----
Instead of `transformToMultiAndConcatenate` you can use `transformToMultiAndMerge` if you don't care about the order of the items.

=== Quarkus Messaging - Kafka

When using the Quarkus Messaging extension for Kafka,
Expand Down
5 changes: 5 additions & 0 deletions extensions/opentelemetry/deployment/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,11 @@
<artifactId>vertx-web-client</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.smallrye.reactive</groupId>
<artifactId>smallrye-mutiny-vertx-junit5</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-reactive-routes-deployment</artifactId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,312 @@
package io.quarkus.opentelemetry.deployment.traces;

import static io.quarkus.opentelemetry.runtime.tracing.mutiny.MutinyTracingHelper.wrapWithSpan;
import static org.assertj.core.api.Assertions.assertThat;

import java.util.List;
import java.util.Optional;
import java.util.stream.Stream;

import org.jboss.shrinkwrap.api.ShrinkWrap;
import org.jboss.shrinkwrap.api.asset.StringAsset;
import org.jboss.shrinkwrap.api.spec.JavaArchive;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;

import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.Tracer;
import io.opentelemetry.context.Scope;
import io.opentelemetry.sdk.trace.data.SpanData;
import io.quarkus.opentelemetry.deployment.common.exporter.TestSpanExporter;
import io.quarkus.opentelemetry.deployment.common.exporter.TestSpanExporterProvider;
import io.quarkus.opentelemetry.runtime.QuarkusContextStorage;
import io.quarkus.test.QuarkusUnitTest;
import io.smallrye.common.vertx.VertxContext;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.helpers.test.AssertSubscriber;
import io.smallrye.mutiny.helpers.test.UniAssertSubscriber;
import io.vertx.core.Context;
import io.vertx.core.Vertx;
import jakarta.inject.Inject;

class MutinyTracingHelperTest {

@RegisterExtension
static final QuarkusUnitTest TEST = new QuarkusUnitTest()
.setArchiveProducer(
() -> ShrinkWrap.create(JavaArchive.class)
.addClasses(TestSpanExporter.class, TestSpanExporterProvider.class)
.addAsResource(new StringAsset(TestSpanExporterProvider.class.getCanonicalName()),
"META-INF/services/io.opentelemetry.sdk.autoconfigure.spi.traces.ConfigurableSpanExporterProvider"));

@Inject
private TestSpanExporter spanExporter;

@Inject
private Tracer tracer;

@Inject
private Vertx vertx;

@AfterEach
void tearDown() {
spanExporter.reset();
}

@ParameterizedTest(name = "{index}: Simple uni pipeline {1}")
@MethodSource("generateContextRunners")
void testSimpleUniPipeline(final String contextType, final String contextName) {

final UniAssertSubscriber<String> subscriber = Uni.createFrom()
.item("Hello")
.emitOn(r -> runOnContext(r, vertx, contextType))
.onItem()
.transformToUni(m -> wrapWithSpan(tracer, "testSpan",
Uni.createFrom().item(m).onItem().transform(s -> {
final Span span = tracer.spanBuilder("subspan").startSpan();
try (final Scope scope = span.makeCurrent()) {
return s + " world";
} finally {
span.end();
}
})))
.subscribe()
.withSubscriber(new UniAssertSubscriber<>());

subscriber.awaitItem().assertItem("Hello world");

//ensure there are two spans with subspan as child of testSpan
final List<SpanData> spans = spanExporter.getFinishedSpanItems(2);
assertThat(spans.stream().map(SpanData::getName)).containsExactlyInAnyOrder("testSpan", "subspan");
assertChildSpan(spans, "testSpan", "subspan");
}

@ParameterizedTest(name = "{index}: Explicit parent {1}")
@MethodSource("generateContextRunners")
void testSpanWithExplicitParent(final String contextType, final String contextName) {

final String parentSpanName = "parentSpan";
final String pipelineSpanName = "pipelineSpan";
final String subspanName = "subspan";

final Span parentSpan = tracer.spanBuilder(parentSpanName).startSpan();
final io.opentelemetry.context.Context parentContext = io.opentelemetry.context.Context.current().with(parentSpan);

final UniAssertSubscriber<String> subscriber = Uni.createFrom()
.item("Hello")
.emitOn(r -> runOnContext(r, vertx, contextType))
.onItem()
.transformToUni(m -> wrapWithSpan(tracer, Optional.of(parentContext),
pipelineSpanName,
Uni.createFrom().item(m).onItem().transform(s -> {
final Span span = tracer.spanBuilder(subspanName).startSpan();
try (final Scope scope = span.makeCurrent()) {
return s + " world";
} finally {
span.end();
}
})))
.subscribe()
.withSubscriber(new UniAssertSubscriber<>());

subscriber.awaitItem().assertItem("Hello world");
parentSpan.end();

//ensure there are 3 spans with proper parent-child relationships
final List<SpanData> spans = spanExporter.getFinishedSpanItems(3);
assertThat(spans.stream().map(SpanData::getName)).containsExactlyInAnyOrder(parentSpanName, pipelineSpanName,
subspanName);
assertChildSpan(spans, parentSpanName, pipelineSpanName);
assertChildSpan(spans, pipelineSpanName, subspanName);
}

@ParameterizedTest(name = "{index}: Nested uni pipeline with implicit parent {1}")
@MethodSource("generateContextRunners")
void testNestedPipeline_implicitParent(final String contextType,
final String contextName) {

final String parentSpanName = "parentSpan";
final String childSpanName = "childSpan";

final UniAssertSubscriber<String> subscriber = Uni.createFrom()
.item("test")
.emitOn(r -> runOnContext(r, vertx, contextType))
.onItem()
.transformToUni(m -> wrapWithSpan(tracer, parentSpanName,
Uni.createFrom().item(m)
.onItem().transform(s -> s + " in outer span")
.onItem().transformToUni(m1 -> wrapWithSpan(tracer, childSpanName,
Uni.createFrom().item(m1)
.onItem().transform(s -> "now in inner span")))

))
.subscribe()
.withSubscriber(new UniAssertSubscriber<>());

subscriber.awaitItem();

//ensure there are 2 spans with doSomething and doSomethingAsync as children of testSpan
final List<SpanData> spans = spanExporter.getFinishedSpanItems(2);
assertThat(spans.stream().map(SpanData::getName)).containsExactlyInAnyOrder(parentSpanName, childSpanName);
assertChildSpan(spans, parentSpanName, childSpanName);
}

@ParameterizedTest(name = "{index}: Nested uni pipeline with explicit no parent {1}")
@MethodSource("generateContextRunners")
void testNestedPipeline_explicitNoParent(final String contextType, final String contextName) {

final String parentSpanName = "parentSpan";
final String childSpanName = "childSpan";

final UniAssertSubscriber<String> subscriber = Uni.createFrom()
.item("test")
.emitOn(r -> runOnContext(r, vertx, contextType))
.onItem()
.transformToUni(m -> wrapWithSpan(tracer, parentSpanName,
Uni.createFrom().item(m)
.onItem().transform(s -> s + " in outer span")
.onItem().transformToUni(m1 -> wrapWithSpan(tracer, Optional.empty(), childSpanName,
Uni.createFrom().item(m1)
.onItem().transform(s -> "now in inner span")))

))
.subscribe()
.withSubscriber(new UniAssertSubscriber<>());

subscriber.awaitItem();

//ensure there are 2 spans but without parent-child relationship
final List<SpanData> spans = spanExporter.getFinishedSpanItems(2);
assertThat(spans.stream().map(SpanData::getName)).containsExactlyInAnyOrder(parentSpanName, childSpanName);
assertThat(spans.stream()
.filter(span -> span.getName().equals(childSpanName))
.findAny()
.orElseThrow()
.getParentSpanId()).isEqualTo("0000000000000000");//signifies no parent
}

@ParameterizedTest(name = "{index}: Concatenating multi pipeline {1}")
@MethodSource("generateContextRunners")
void testSimpleMultiPipeline_Concatenate(final String contextType, final String contextName) {

final AssertSubscriber<String> subscriber = Multi.createFrom()
.items("test1", "test2", "test3")
.emitOn(r -> runOnContext(r, vertx, contextType))
.onItem()
.transformToUniAndConcatenate(m -> wrapWithSpan(tracer, Optional.empty(), "testSpan " + m,
//the traced pipeline
Uni.createFrom().item(m).onItem().transform(s -> {
final Span span = tracer.spanBuilder("subspan " + s).startSpan();
try (final Scope scope = span.makeCurrent()) {
return s + " transformed";
} finally {
span.end();
}
})))
.subscribe()
.withSubscriber(AssertSubscriber.create(3));

subscriber.awaitCompletion().assertItems("test1 transformed", "test2 transformed", "test3 transformed");

//ensure there are six spans with three pairs of subspan as child of testSpan
final List<SpanData> spans = spanExporter.getFinishedSpanItems(6);
for (int i = 1; i <= 3; i++) {
final int currentI = i;
assertThat(spans.stream().anyMatch(span -> span.getName().equals("testSpan test" + currentI))).isTrue();
assertThat(spans.stream().anyMatch(span -> span.getName().equals("subspan test" + currentI))).isTrue();
assertChildSpan(spans, "testSpan test" + currentI, "subspan test" + currentI);
}
}

@ParameterizedTest(name = "{index}: Merging multi pipeline {1}")
@MethodSource("generateContextRunners")
void testSimpleMultiPipeline_Merge(final String contextType, final String contextName) {

final AssertSubscriber<String> subscriber = Multi.createFrom()
.items("test1", "test2", "test3")
.emitOn(r -> runOnContext(r, vertx, contextType))
.onItem()
.transformToUniAndMerge(m -> wrapWithSpan(tracer, Optional.empty(), "testSpan " + m,
Uni.createFrom().item(m).onItem().transform(s -> {
final Span span = tracer.spanBuilder("subspan " + s).startSpan();
try (final Scope scope = span.makeCurrent()) {
return s + " transformed";
} finally {
span.end();
}
})))
.subscribe()
.withSubscriber(AssertSubscriber.create(3));

subscriber.awaitCompletion();

//ensure there are six spans with three pairs of subspan as child of testSpan
final List<SpanData> spans = spanExporter.getFinishedSpanItems(6);
for (int i = 1; i <= 3; i++) {
final int currentI = i;
assertThat(spans.stream().anyMatch(span -> span.getName().equals("testSpan test" + currentI))).isTrue();
assertThat(spans.stream().anyMatch(span -> span.getName().equals("subspan test" + currentI))).isTrue();
assertChildSpan(spans, "testSpan test" + currentI, "subspan test" + currentI);
}
}

private static void assertChildSpan(final List<SpanData> spans, final String parentSpanName,
final String childSpanName1) {
assertThat(spans.stream()
.filter(span -> span.getName().equals(childSpanName1))
.findAny()
.orElseThrow()
.getParentSpanId()).isEqualTo(
spans.stream().filter(span -> span.getName().equals(parentSpanName)).findAny().get().getSpanId());
}

private static Stream<Arguments> generateContextRunners() {
return Stream.of(
Arguments.of("WITHOUT_CONTEXT", "Without Context"),
Arguments.of("ROOT_CONTEXT", "On Root Context"),
Arguments.of("DUPLICATED_CONTEXT", "On Duplicated Context"));
}

private void runOnContext(final Runnable runnable, final Vertx vertx, final String contextType) {
switch (contextType) {
case "WITHOUT_CONTEXT":
runWithoutContext(runnable);
break;
case "ROOT_CONTEXT":
runOnRootContext(runnable, vertx);
break;
case "DUPLICATED_CONTEXT":
runOnDuplicatedContext(runnable, vertx);
break;
default:
throw new IllegalArgumentException("Unknown context type: " + contextType);
}
}

private static void runWithoutContext(final Runnable runnable) {
assertThat(QuarkusContextStorage.getVertxContext()).isNull();
runnable.run();
}

private static void runOnRootContext(final Runnable runnable, final Vertx vertx) {
final Context rootContext = VertxContext.getRootContext(vertx.getOrCreateContext());
assertThat(rootContext).isNotNull();
assertThat(VertxContext.isDuplicatedContext(rootContext)).isFalse();
assertThat(rootContext).isNotEqualTo(QuarkusContextStorage.getVertxContext());

rootContext.runOnContext(v -> runnable.run());
}

private static void runOnDuplicatedContext(final Runnable runnable, final Vertx vertx) {
final Context duplicatedContext = VertxContext.createNewDuplicatedContext(vertx.getOrCreateContext());
assertThat(duplicatedContext).isNotNull();
assertThat(VertxContext.isDuplicatedContext(duplicatedContext)).isTrue();

duplicatedContext.runOnContext(v -> runnable.run());
}

}
Loading

0 comments on commit 26eeeee

Please sign in to comment.