diff --git a/extensions/vertx/runtime/src/main/java/io/quarkus/vertx/runtime/VertxEventBusConsumerRecorder.java b/extensions/vertx/runtime/src/main/java/io/quarkus/vertx/runtime/VertxEventBusConsumerRecorder.java index 469458e5117f2..94e31f1c43103 100644 --- a/extensions/vertx/runtime/src/main/java/io/quarkus/vertx/runtime/VertxEventBusConsumerRecorder.java +++ b/extensions/vertx/runtime/src/main/java/io/quarkus/vertx/runtime/VertxEventBusConsumerRecorder.java @@ -1,6 +1,5 @@ package io.quarkus.vertx.runtime; -import static io.quarkus.vertx.core.runtime.context.VertxContextSafetyToggle.setContextSafe; import static io.quarkus.vertx.core.runtime.context.VertxContextSafetyToggle.setCurrentContextSafe; import static io.smallrye.common.expression.Expression.Flag.LENIENT_SYNTAX; import static io.smallrye.common.expression.Expression.Flag.NO_TRIM; @@ -32,9 +31,7 @@ import io.quarkus.virtual.threads.VirtualThreadsRecorder; import io.smallrye.common.expression.Expression; import io.smallrye.common.expression.ResolveContext; -import io.smallrye.common.vertx.VertxContext; import io.vertx.core.AsyncResult; -import io.vertx.core.Context; import io.vertx.core.Future; import io.vertx.core.Handler; import io.vertx.core.Vertx; @@ -123,35 +120,30 @@ public void handle(Void x) { consumer.handler(new Handler>() { @Override public void handle(Message m) { + // Will run on the context used for the consumer registration. + // It's a duplicated context, but we need to mark it as safe. + // The safety comes from the fact that it's instantiated by Vert.x for every + // message. + setCurrentContextSafe(true); if (blocking) { - // We need to create a duplicated context from the "context" - Context dup = VertxContext.getOrCreateDuplicatedContext(context); - setContextSafe(dup, true); - if (runOnVirtualThread) { - // Switch to a Vert.x context to capture it and use it during the invocation. - dup.runOnContext(new Handler() { + VirtualThreadsRecorder.getCurrent().execute(new Runnable() { @Override - public void handle(Void event) { - VirtualThreadsRecorder.getCurrent().execute(new Runnable() { - @Override - public void run() { - try { - invoker.invoke(m); - } catch (Exception e) { - if (m.replyAddress() == null) { - // No reply handler - throw wrapIfNecessary(e); - } else { - m.fail(ConsumeEvent.FAILURE_CODE, e.toString()); - } - } + public void run() { + try { + invoker.invoke(m); + } catch (Exception e) { + if (m.replyAddress() == null) { + // No reply handler + throw wrapIfNecessary(e); + } else { + m.fail(ConsumeEvent.FAILURE_CODE, e.toString()); } - }); + } } }); } else { - Future future = dup.executeBlocking(new Callable() { + Future future = Vertx.currentContext().executeBlocking(new Callable() { @Override public Void call() { try { @@ -170,11 +162,6 @@ public Void call() { future.onFailure(context::reportException); } } else { - // Will run on the context used for the consumer registration. - // It's a duplicated context, but we need to mark it as safe. - // The safety comes from the fact that it's instantiated by Vert.x for every - // message. - setCurrentContextSafe(true); try { invoker.invoke(m); } catch (Exception e) { diff --git a/integration-tests/opentelemetry-vertx/pom.xml b/integration-tests/opentelemetry-vertx/pom.xml index f6c24ef4e5361..62da225a990a1 100644 --- a/integration-tests/opentelemetry-vertx/pom.xml +++ b/integration-tests/opentelemetry-vertx/pom.xml @@ -22,6 +22,10 @@ io.quarkus quarkus-vertx-http + + io.quarkus + quarkus-rest-jackson + io.quarkus quarkus-micrometer @@ -72,6 +76,19 @@ + + io.quarkus + quarkus-rest-jackson-deployment + ${project.version} + pom + test + + + * + * + + + io.quarkus quarkus-vertx-http-deployment diff --git a/integration-tests/opentelemetry-vertx/src/main/java/io/quarkus/it/opentelemetry/vertx/EventBusConsumer.java b/integration-tests/opentelemetry-vertx/src/main/java/io/quarkus/it/opentelemetry/vertx/EventBusConsumer.java new file mode 100644 index 0000000000000..67989d33bd541 --- /dev/null +++ b/integration-tests/opentelemetry-vertx/src/main/java/io/quarkus/it/opentelemetry/vertx/EventBusConsumer.java @@ -0,0 +1,46 @@ +package io.quarkus.it.opentelemetry.vertx; + +import jakarta.enterprise.context.ApplicationScoped; + +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.instrumentation.annotations.WithSpan; +import io.quarkus.logging.Log; +import io.quarkus.vertx.ConsumeEvent; +import io.smallrye.common.annotation.Blocking; +import io.smallrye.common.annotation.RunOnVirtualThread; +import io.vertx.core.MultiMap; + +@ApplicationScoped +public class EventBusConsumer { + + @ConsumeEvent("pets") + // non-blocking + public String sayHi(Pet pet) { + Log.infov("Received a pet: {0} {1}", pet, Span.current()); + process(); + return "Hello " + pet.getName() + " (" + pet.getKind() + ")"; + } + + @ConsumeEvent("persons") + @Blocking + public String name(String name) { + Log.infov("Received a pet: {0} {1}", name, Span.current()); + process(); + return "Hello " + name; + } + + @ConsumeEvent("person-headers") + @RunOnVirtualThread + public String personWithHeader(MultiMap headers, Person person) { + Log.infov("Received a person: {0} {1}", person, Span.current()); + process(); + String s = "Hello " + person.getFirstName() + " " + person.getLastName() + ", " + headers; + return s; + } + + @WithSpan + public void process() { + + } + +} diff --git a/integration-tests/opentelemetry-vertx/src/main/java/io/quarkus/it/opentelemetry/vertx/EventBusSender.java b/integration-tests/opentelemetry-vertx/src/main/java/io/quarkus/it/opentelemetry/vertx/EventBusSender.java new file mode 100644 index 0000000000000..c05d6ce28c842 --- /dev/null +++ b/integration-tests/opentelemetry-vertx/src/main/java/io/quarkus/it/opentelemetry/vertx/EventBusSender.java @@ -0,0 +1,45 @@ +package io.quarkus.it.opentelemetry.vertx; + +import jakarta.inject.Inject; +import jakarta.ws.rs.POST; +import jakarta.ws.rs.Path; +import jakarta.ws.rs.Produces; + +import io.smallrye.mutiny.Uni; +import io.vertx.core.eventbus.DeliveryOptions; +import io.vertx.core.json.JsonObject; +import io.vertx.mutiny.core.eventbus.EventBus; +import io.vertx.mutiny.core.eventbus.Message; + +@Path("/event-bus") +public class EventBusSender { + + @Inject + EventBus bus; + + @POST + @Path("/person") + public Uni helloToPerson(JsonObject json) { + return bus. request("persons", json.getString("name")) + .onItem().transform(Message::body); + } + + @POST + @Path("/person2") + @Produces("text/plain") + public Uni helloToPersonWithHeaders(JsonObject json) { + return bus. request( + "person-headers", + new Person(json.getString("firstName"), json.getString("lastName")), + new DeliveryOptions().addHeader("header", "headerValue")) + .onItem().transform(Message::body); + } + + @POST + @Path("/pet") + public Uni helloToPet(JsonObject json) { + return bus. request("pets", new Pet(json.getString("name"), json.getString("kind"))) + .onItem().transform(Message::body); + } + +} diff --git a/integration-tests/opentelemetry-vertx/src/main/java/io/quarkus/it/opentelemetry/vertx/Person.java b/integration-tests/opentelemetry-vertx/src/main/java/io/quarkus/it/opentelemetry/vertx/Person.java new file mode 100644 index 0000000000000..672006fa57236 --- /dev/null +++ b/integration-tests/opentelemetry-vertx/src/main/java/io/quarkus/it/opentelemetry/vertx/Person.java @@ -0,0 +1,37 @@ +package io.quarkus.it.opentelemetry.vertx; + +import io.quarkus.runtime.annotations.RegisterForReflection; + +@RegisterForReflection +public class Person { + + private String firstName; + private String lastName; + + public Person(String firstName, String lastName) { + this.firstName = firstName; + this.lastName = lastName; + } + + public Person() { + // Used by reflection. + } + + public String getFirstName() { + return firstName; + } + + public Person setFirstName(String firstName) { + this.firstName = firstName; + return this; + } + + public String getLastName() { + return lastName; + } + + public Person setLastName(String lastName) { + this.lastName = lastName; + return this; + } +} diff --git a/integration-tests/opentelemetry-vertx/src/main/java/io/quarkus/it/opentelemetry/vertx/Pet.java b/integration-tests/opentelemetry-vertx/src/main/java/io/quarkus/it/opentelemetry/vertx/Pet.java new file mode 100644 index 0000000000000..a4dd2948810f2 --- /dev/null +++ b/integration-tests/opentelemetry-vertx/src/main/java/io/quarkus/it/opentelemetry/vertx/Pet.java @@ -0,0 +1,25 @@ +package io.quarkus.it.opentelemetry.vertx; + +/** + * Simple pojo. + * The test using this pojo will use the generic codec facility. + */ +public class Pet { + + private final String name; + + private final String kind; + + public Pet(String name, String kind) { + this.name = name; + this.kind = kind; + } + + public String getName() { + return name; + } + + public String getKind() { + return kind; + } +} diff --git a/integration-tests/opentelemetry-vertx/src/test/java/io/quarkus/it/opentelemetry/vertx/EventBusIT.java b/integration-tests/opentelemetry-vertx/src/test/java/io/quarkus/it/opentelemetry/vertx/EventBusIT.java new file mode 100644 index 0000000000000..a391fbdaa1c27 --- /dev/null +++ b/integration-tests/opentelemetry-vertx/src/test/java/io/quarkus/it/opentelemetry/vertx/EventBusIT.java @@ -0,0 +1,7 @@ +package io.quarkus.it.opentelemetry.vertx; + +import io.quarkus.test.junit.QuarkusIntegrationTest; + +@QuarkusIntegrationTest +public class EventBusIT extends EventBusTest { +} diff --git a/integration-tests/opentelemetry-vertx/src/test/java/io/quarkus/it/opentelemetry/vertx/EventBusTest.java b/integration-tests/opentelemetry-vertx/src/test/java/io/quarkus/it/opentelemetry/vertx/EventBusTest.java new file mode 100644 index 0000000000000..1c83c4e3e112d --- /dev/null +++ b/integration-tests/opentelemetry-vertx/src/test/java/io/quarkus/it/opentelemetry/vertx/EventBusTest.java @@ -0,0 +1,117 @@ +package io.quarkus.it.opentelemetry.vertx; + +import static io.restassured.RestAssured.given; +import static java.net.HttpURLConnection.HTTP_OK; +import static org.awaitility.Awaitility.await; +import static org.hamcrest.CoreMatchers.equalTo; +import static org.junit.jupiter.api.Assertions.assertNotEquals; + +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import org.hamcrest.CoreMatchers; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import io.opentelemetry.api.trace.SpanId; +import io.opentelemetry.api.trace.SpanKind; +import io.quarkus.test.junit.QuarkusTest; +import io.restassured.http.ContentType; +import io.vertx.core.json.JsonObject; + +@QuarkusTest +public class EventBusTest extends SpanExporterBaseTest { + + @BeforeEach + void reset() { + given().get("/reset").then().statusCode(HTTP_OK); + await().atMost(5, TimeUnit.SECONDS).until(() -> getSpans().isEmpty()); + } + + @Test + public void testEventBusWithString() { + String body = new JsonObject().put("name", "Bob Morane").toString(); + given().contentType(ContentType.JSON).body(body) + .post("/event-bus/person") + .then().statusCode(200).body(equalTo("Hello Bob Morane")); + + await().atMost(5, TimeUnit.SECONDS).until(() -> getSpans().size() >= 3); + List> spans = getSpans(); + + Map serverCall = getSpanByKindAndParentId(spans, SpanKind.SERVER, SpanId.getInvalid()); + String spanId = getSpanId(serverCall); + assertNotEquals(SpanId.getInvalid(), spanId); + + Map producerSpan = getSpanByKindAndParentId(spans, SpanKind.PRODUCER, spanId); + String producerSpanId = getSpanId(producerSpan); + assertNotEquals(SpanId.getInvalid(), producerSpanId); + + Map consumerSpan = getSpanByKindAndParentId(spans, SpanKind.CONSUMER, producerSpanId); + String consumerSpanId = getSpanId(consumerSpan); + assertNotEquals(SpanId.getInvalid(), consumerSpanId); + + Map methodCallSpan = getSpanByKindAndParentId(spans, SpanKind.INTERNAL, consumerSpanId); + String methodCallSpanId = getSpanId(methodCallSpan); + assertNotEquals(SpanId.getInvalid(), methodCallSpanId); + } + + @Test + public void testEventBusWithObjectAndHeader() { + String body = new JsonObject() + .put("firstName", "Bob") + .put("lastName", "Morane") + .toString(); + given().contentType(ContentType.JSON).body(body) + .post("/event-bus/person2") + .then().statusCode(200) + // For some reason Multimap.toString() has \n at the end. + .body(CoreMatchers.startsWith("Hello Bob Morane, header=headerValue\n")); + + await().atMost(5, TimeUnit.SECONDS).until(() -> getSpans().size() >= 3); + List> spans = getSpans(); + + Map serverCall = getSpanByKindAndParentId(spans, SpanKind.SERVER, SpanId.getInvalid()); + String spanId = getSpanId(serverCall); + assertNotEquals(SpanId.getInvalid(), spanId); + + Map producerSpan = getSpanByKindAndParentId(spans, SpanKind.PRODUCER, spanId); + String producerSpanId = getSpanId(producerSpan); + assertNotEquals(SpanId.getInvalid(), producerSpanId); + + Map consumerSpan = getSpanByKindAndParentId(spans, SpanKind.CONSUMER, producerSpanId); + String consumerSpanId = getSpanId(consumerSpan); + assertNotEquals(SpanId.getInvalid(), consumerSpanId); + + Map methodCallSpan = getSpanByKindAndParentId(spans, SpanKind.INTERNAL, consumerSpanId); + String methodCallSpanId = getSpanId(methodCallSpan); + assertNotEquals(SpanId.getInvalid(), methodCallSpanId); + } + + @Test + public void testEventBusWithPet() { + String body = new JsonObject().put("name", "Neo").put("kind", "rabbit").toString(); + given().contentType(ContentType.JSON).body(body) + .post("/event-bus/pet") + .then().statusCode(200).body(equalTo("Hello Neo (rabbit)")); + + await().atMost(5, TimeUnit.SECONDS).until(() -> getSpans().size() >= 3); + List> spans = getSpans(); + + Map serverCall = getSpanByKindAndParentId(spans, SpanKind.SERVER, SpanId.getInvalid()); + String spanId = getSpanId(serverCall); + assertNotEquals(SpanId.getInvalid(), spanId); + + Map producerSpan = getSpanByKindAndParentId(spans, SpanKind.PRODUCER, spanId); + String producerSpanId = getSpanId(producerSpan); + assertNotEquals(SpanId.getInvalid(), producerSpanId); + + Map consumerSpan = getSpanByKindAndParentId(spans, SpanKind.CONSUMER, producerSpanId); + String consumerSpanId = getSpanId(consumerSpan); + assertNotEquals(SpanId.getInvalid(), consumerSpanId); + + Map methodCallSpan = getSpanByKindAndParentId(spans, SpanKind.INTERNAL, consumerSpanId); + String methodCallSpanId = getSpanId(methodCallSpan); + assertNotEquals(SpanId.getInvalid(), methodCallSpanId); + } +} diff --git a/integration-tests/opentelemetry-vertx/src/test/java/io/quarkus/it/opentelemetry/vertx/HelloRouterTest.java b/integration-tests/opentelemetry-vertx/src/test/java/io/quarkus/it/opentelemetry/vertx/HelloRouterTest.java index e54ffda114384..621f0e5437e11 100644 --- a/integration-tests/opentelemetry-vertx/src/test/java/io/quarkus/it/opentelemetry/vertx/HelloRouterTest.java +++ b/integration-tests/opentelemetry-vertx/src/test/java/io/quarkus/it/opentelemetry/vertx/HelloRouterTest.java @@ -11,10 +11,8 @@ import static io.opentelemetry.semconv.SemanticAttributes.MESSAGING_DESTINATION_NAME; import static io.opentelemetry.semconv.SemanticAttributes.MESSAGING_OPERATION; import static io.opentelemetry.semconv.SemanticAttributes.MESSAGING_SYSTEM; -import static io.restassured.RestAssured.get; import static io.restassured.RestAssured.given; import static java.net.HttpURLConnection.HTTP_OK; -import static java.util.stream.Collectors.toList; import static java.util.stream.Collectors.toSet; import static org.awaitility.Awaitility.await; import static org.hamcrest.Matchers.equalTo; @@ -24,22 +22,19 @@ import java.util.Locale; import java.util.Map; import java.util.concurrent.TimeUnit; -import java.util.stream.Collectors; -import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -import io.opentelemetry.api.trace.SpanKind; import io.opentelemetry.instrumentation.api.incubator.semconv.messaging.MessageOperation; import io.quarkus.test.junit.QuarkusTest; -import io.restassured.common.mapper.TypeRef; import io.restassured.http.ContentType; import io.vertx.core.http.HttpMethod; @QuarkusTest -class HelloRouterTest { +class HelloRouterTest extends SpanExporterBaseTest { - @AfterEach + @BeforeEach void reset() { given().get("/reset").then().statusCode(HTTP_OK); await().atMost(5, TimeUnit.SECONDS).until(() -> getSpans().size() == 0); @@ -149,49 +144,4 @@ void bus() { assertEquals(consumer.get("parentSpanId"), producer.get("spanId")); } - private String printSpans(List> spans) { - if (spans.isEmpty()) { - return "empty"; - } - return spans.stream() - .map(stringObjectMap -> stringObjectMap.get("spanId") + " - " + - stringObjectMap.get("kind") + " - " + - stringObjectMap.get("http.route") + "\n") - .collect(Collectors.joining()); - } - - private Boolean spanSize(int expected) { - List> spans = getSpans(); - int size = spans.size(); - if (size == expected) { - return true; - } else { - System.out.println("Reset but span remain: " + printSpans(spans)); - return false; - } - } - - private static List> getSpans() { - return get("/export").body().as(new TypeRef<>() { - }); - } - - private static List getMessages() { - return given().get("/bus/messages").body().as(new TypeRef<>() { - }); - } - - private static List> getSpansByKindAndParentId(List> spans, SpanKind kind, - Object parentSpanId) { - return spans.stream() - .filter(map -> map.get("kind").equals(kind.toString())) - .filter(map -> map.get("parentSpanId").equals(parentSpanId)).collect(toList()); - } - - private static Map getSpanByKindAndParentId(List> spans, SpanKind kind, - Object parentSpanId) { - List> span = getSpansByKindAndParentId(spans, kind, parentSpanId); - assertEquals(1, span.size()); - return span.get(0); - } } diff --git a/integration-tests/opentelemetry-vertx/src/test/java/io/quarkus/it/opentelemetry/vertx/SpanExporterBaseTest.java b/integration-tests/opentelemetry-vertx/src/test/java/io/quarkus/it/opentelemetry/vertx/SpanExporterBaseTest.java new file mode 100644 index 0000000000000..e216a9ef39419 --- /dev/null +++ b/integration-tests/opentelemetry-vertx/src/test/java/io/quarkus/it/opentelemetry/vertx/SpanExporterBaseTest.java @@ -0,0 +1,66 @@ +package io.quarkus.it.opentelemetry.vertx; + +import static io.restassured.RestAssured.get; +import static io.restassured.RestAssured.given; +import static java.util.stream.Collectors.toList; +import static org.junit.jupiter.api.Assertions.assertEquals; + +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +import io.opentelemetry.api.trace.SpanKind; +import io.restassured.common.mapper.TypeRef; + +public abstract class SpanExporterBaseTest { + + String printSpans(List> spans) { + if (spans.isEmpty()) { + return "empty"; + } + return spans.stream() + .map(stringObjectMap -> stringObjectMap.get("spanId") + " -> " + stringObjectMap.get("parentSpanId") + " - " + + stringObjectMap.get("kind") + " - " + + stringObjectMap.get("http.route") + "\n") + .collect(Collectors.joining()); + } + + Boolean spanSize(int expected) { + List> spans = getSpans(); + int size = spans.size(); + if (size == expected) { + return true; + } else { + System.out.println("Reset but span remain: " + printSpans(spans)); + return false; + } + } + + static List> getSpans() { + return get("/export").body().as(new TypeRef<>() { + }); + } + + static List getMessages() { + return given().get("/bus/messages").body().as(new TypeRef<>() { + }); + } + + static List> getSpansByKindAndParentId(List> spans, SpanKind kind, + Object parentSpanId) { + return spans.stream() + .filter(map -> map.get("kind").equals(kind.toString())) + .filter(map -> map.get("parentSpanId").equals(parentSpanId)).collect(toList()); + } + + static Map getSpanByKindAndParentId(List> spans, SpanKind kind, + Object parentSpanId) { + List> span = getSpansByKindAndParentId(spans, kind, parentSpanId); + assertEquals(1, span.size()); + return span.get(0); + } + + static String getSpanId(Map span) { + return (String) span.get("spanId"); + } +}