diff --git a/extensions/vertx/deployment/src/main/java/io/quarkus/vertx/deployment/EventBusConsumer.java b/extensions/vertx/deployment/src/main/java/io/quarkus/vertx/deployment/EventBusConsumer.java deleted file mode 100644 index b21eed01ccd400..00000000000000 --- a/extensions/vertx/deployment/src/main/java/io/quarkus/vertx/deployment/EventBusConsumer.java +++ /dev/null @@ -1,252 +0,0 @@ -package io.quarkus.vertx.deployment; - -import static io.quarkus.vertx.deployment.VertxConstants.COMPLETION_STAGE; -import static io.quarkus.vertx.deployment.VertxConstants.MESSAGE; -import static io.quarkus.vertx.deployment.VertxConstants.MESSAGE_HEADERS; -import static io.quarkus.vertx.deployment.VertxConstants.MUTINY_MESSAGE; -import static io.quarkus.vertx.deployment.VertxConstants.MUTINY_MESSAGE_HEADERS; -import static io.quarkus.vertx.deployment.VertxConstants.UNI; -import static org.objectweb.asm.Opcodes.ACC_FINAL; -import static org.objectweb.asm.Opcodes.ACC_PRIVATE; - -import java.util.Optional; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.CompletionStage; -import java.util.function.BiConsumer; - -import org.jboss.jandex.AnnotationInstance; -import org.jboss.jandex.AnnotationValue; -import org.jboss.jandex.DotName; -import org.jboss.jandex.MethodInfo; -import org.jboss.jandex.Type; - -import io.quarkus.arc.Arc; -import io.quarkus.arc.ArcContainer; -import io.quarkus.arc.InjectableBean; -import io.quarkus.arc.InstanceHandle; -import io.quarkus.arc.processor.BeanInfo; -import io.quarkus.arc.processor.BuiltinScope; -import io.quarkus.arc.processor.DotNames; -import io.quarkus.gizmo.ClassCreator; -import io.quarkus.gizmo.ClassOutput; -import io.quarkus.gizmo.FieldCreator; -import io.quarkus.gizmo.FieldDescriptor; -import io.quarkus.gizmo.MethodCreator; -import io.quarkus.gizmo.MethodDescriptor; -import io.quarkus.gizmo.ResultHandle; -import io.quarkus.runtime.util.HashUtil; -import io.quarkus.vertx.runtime.EventConsumerInvoker; -import io.smallrye.common.annotation.Blocking; -import io.smallrye.common.annotation.RunOnVirtualThread; -import io.smallrye.mutiny.Uni; -import io.vertx.core.MultiMap; -import io.vertx.core.eventbus.Message; - -class EventBusConsumer { - - private static final String INVOKER_SUFFIX = "_VertxInvoker"; - - private static final MethodDescriptor INVOKER_CONSTRUCTOR = MethodDescriptor - .ofConstructor(EventConsumerInvoker.class); - private static final MethodDescriptor ARC_CONTAINER = MethodDescriptor - .ofMethod(Arc.class, "container", ArcContainer.class); - private static final MethodDescriptor INSTANCE_HANDLE_GET = MethodDescriptor.ofMethod(InstanceHandle.class, "get", - Object.class); - private static final MethodDescriptor ARC_CONTAINER_BEAN = MethodDescriptor.ofMethod(ArcContainer.class, "bean", - InjectableBean.class, String.class); - private static final MethodDescriptor ARC_CONTAINER_INSTANCE_FOR_BEAN = MethodDescriptor - .ofMethod(ArcContainer.class, - "instance", InstanceHandle.class, - InjectableBean.class); - private static final MethodDescriptor MUTINY_MESSAGE_NEW_INSTANCE = MethodDescriptor.ofMethod( - io.vertx.mutiny.core.eventbus.Message.class, - "newInstance", io.vertx.mutiny.core.eventbus.Message.class, Message.class); - private static final MethodDescriptor MUTINY_MESSAGE_HEADERS_NEW_INSTANCE = MethodDescriptor.ofMethod( - io.vertx.mutiny.core.MultiMap.class, - "newInstance", io.vertx.mutiny.core.MultiMap.class, io.vertx.core.MultiMap.class); - private static final MethodDescriptor MESSAGE_HEADERS = MethodDescriptor.ofMethod(Message.class, "headers", - io.vertx.core.MultiMap.class); - private static final MethodDescriptor MESSAGE_BODY = MethodDescriptor.ofMethod(Message.class, "body", Object.class); - private static final MethodDescriptor INSTANCE_HANDLE_DESTROY = MethodDescriptor - .ofMethod(InstanceHandle.class, "destroy", - void.class); - protected static final MethodDescriptor WHEN_COMPLETE = MethodDescriptor.ofMethod(CompletionStage.class, - "whenComplete", CompletionStage.class, BiConsumer.class); - protected static final MethodDescriptor SUBSCRIBE_AS_COMPLETION_STAGE = MethodDescriptor - .ofMethod(Uni.class, "subscribeAsCompletionStage", CompletableFuture.class); - protected static final MethodDescriptor THROWABLE_GET_MESSAGE = MethodDescriptor - .ofMethod(Throwable.class, "getMessage", String.class); - protected static final MethodDescriptor THROWABLE_TO_STRING = MethodDescriptor - .ofMethod(Throwable.class, "toString", String.class); - protected static final DotName BLOCKING = DotName.createSimple(Blocking.class.getName()); - protected static final DotName RUN_ON_VIRTUAL_THREAD = DotName.createSimple(RunOnVirtualThread.class.getName()); - - static String generateInvoker(BeanInfo bean, MethodInfo method, - AnnotationInstance consumeEvent, - ClassOutput classOutput) { - - String baseName; - if (bean.getImplClazz().enclosingClass() != null) { - baseName = DotNames.simpleName(bean.getImplClazz().enclosingClass()) + "_" - + DotNames.simpleName(bean.getImplClazz()); - } else { - baseName = DotNames.simpleName(bean.getImplClazz().name()); - } - String targetPackage = DotNames.internalPackageNameWithTrailingSlash(bean.getImplClazz().name()); - - StringBuilder sigBuilder = new StringBuilder(); - sigBuilder.append(method.name()).append("_").append(method.returnType().name().toString()); - for (Type i : method.parameterTypes()) { - sigBuilder.append(i.name().toString()); - } - String generatedName = targetPackage + baseName + INVOKER_SUFFIX + "_" + method.name() + "_" - + HashUtil.sha1(sigBuilder.toString()); - - boolean blocking, runOnVirtualThread; - AnnotationValue blockingValue = consumeEvent.value("blocking"); - blocking = method.hasAnnotation(BLOCKING) || (blockingValue != null && blockingValue.asBoolean()); - runOnVirtualThread = method.hasAnnotation(RUN_ON_VIRTUAL_THREAD); - - ClassCreator invokerCreator = ClassCreator.builder().classOutput(classOutput).className(generatedName) - .superClass(EventConsumerInvoker.class).build(); - - // Initialized state - FieldCreator beanField = invokerCreator.getFieldCreator("bean", InjectableBean.class) - .setModifiers(ACC_PRIVATE | ACC_FINAL); - FieldCreator containerField = invokerCreator.getFieldCreator("container", ArcContainer.class) - .setModifiers(ACC_PRIVATE | ACC_FINAL); - - if (blocking || runOnVirtualThread) { - MethodCreator isBlocking = invokerCreator.getMethodCreator("isBlocking", boolean.class); - isBlocking.returnValue(isBlocking.load(true)); - } - - if (runOnVirtualThread) { - MethodCreator isRunOnVirtualThread = invokerCreator.getMethodCreator("isRunningOnVirtualThread", boolean.class); - isRunOnVirtualThread.returnValue(isRunOnVirtualThread.load(true)); - } - - AnnotationValue orderedValue = consumeEvent.value("ordered"); - boolean ordered = orderedValue != null && orderedValue.asBoolean(); - if (ordered) { - MethodCreator isOrdered = invokerCreator.getMethodCreator("isOrdered", boolean.class); - isOrdered.returnValue(isOrdered.load(true)); - } - - implementConstructor(bean, invokerCreator, beanField, containerField); - implementInvoke(bean, method, invokerCreator, beanField.getFieldDescriptor(), containerField.getFieldDescriptor()); - - invokerCreator.close(); - return generatedName.replace('/', '.'); - } - - static void implementConstructor(BeanInfo bean, ClassCreator invokerCreator, FieldCreator beanField, - FieldCreator containerField) { - MethodCreator constructor = invokerCreator.getMethodCreator("", void.class); - // Invoke super() - constructor.invokeSpecialMethod(INVOKER_CONSTRUCTOR, constructor.getThis()); - - ResultHandle containerHandle = constructor - .invokeStaticMethod(ARC_CONTAINER); - ResultHandle beanHandle = constructor.invokeInterfaceMethod( - ARC_CONTAINER_BEAN, - containerHandle, constructor.load(bean.getIdentifier())); - constructor.writeInstanceField(beanField.getFieldDescriptor(), constructor.getThis(), beanHandle); - constructor.writeInstanceField(containerField.getFieldDescriptor(), constructor.getThis(), containerHandle); - constructor.returnValue(null); - } - - private static void implementInvoke(BeanInfo bean, MethodInfo method, ClassCreator invokerCreator, - FieldDescriptor beanField, - FieldDescriptor containerField) { - - // The method descriptor is: CompletionStage invokeBean(Message message) - MethodCreator invoke = invokerCreator.getMethodCreator("invokeBean", Object.class, Message.class) - .addException(Exception.class); - - ResultHandle containerHandle = invoke.readInstanceField(containerField, invoke.getThis()); - ResultHandle beanHandle = invoke.readInstanceField(beanField, invoke.getThis()); - ResultHandle instanceHandle = invoke.invokeInterfaceMethod(ARC_CONTAINER_INSTANCE_FOR_BEAN, containerHandle, - beanHandle); - ResultHandle beanInstanceHandle = invoke - .invokeInterfaceMethod(INSTANCE_HANDLE_GET, instanceHandle); - ResultHandle messageHandle = invoke.getMethodParam(0); - ResultHandle result; - - // Determine if the method is consuming a Message, body, or headers + body. - // https://github.com/quarkusio/quarkus/issues/21621 - Optional headerType = Optional.empty(); - Type paramType; - if (method.parametersCount() == 2) { - Type firstParamType = method.parameterType(0); - if (VertxConstants.isMessageHeaders(firstParamType.name())) { - headerType = Optional.of(firstParamType); - } - paramType = method.parameterType(1); - } else { - paramType = method.parameterType(0); - } - if (paramType.name().equals(MESSAGE)) { - // io.vertx.core.eventbus.Message - invoke.invokeVirtualMethod( - MethodDescriptor - .ofMethod(bean.getImplClazz().name().toString(), method.name(), void.class, Message.class), - beanInstanceHandle, messageHandle); - result = invoke.loadNull(); - } else if (paramType.name().equals(MUTINY_MESSAGE)) { - // io.vertx.mutiny.core.eventbus.Message - ResultHandle mutinyMessageHandle = invoke.invokeStaticMethod(MUTINY_MESSAGE_NEW_INSTANCE, messageHandle); - invoke.invokeVirtualMethod( - MethodDescriptor.ofMethod(bean.getImplClazz().name().toString(), method.name(), void.class, - io.vertx.mutiny.core.eventbus.Message.class), - beanInstanceHandle, mutinyMessageHandle); - result = invoke.loadNull(); - } else { - // Parameter is payload - ResultHandle bodyHandle = invoke.invokeInterfaceMethod(MESSAGE_BODY, messageHandle); - ResultHandle returnHandle; - if (headerType.isPresent()) { - ResultHandle headerHandle = invoke.invokeInterfaceMethod(MESSAGE_HEADERS, messageHandle); - // If the method expects Mutiny MultiMap, wrap the Vertx MultiMap. - Type header = headerType.get(); - if (header.name().equals(MUTINY_MESSAGE_HEADERS)) { - headerHandle = invoke.invokeStaticMethod(MUTINY_MESSAGE_HEADERS_NEW_INSTANCE, headerHandle); - } - returnHandle = invoke.invokeVirtualMethod( - MethodDescriptor.ofMethod(bean.getImplClazz().name().toString(), method.name(), - method.returnType().name().toString(), headerType.get().name().toString(), - paramType.name().toString()), - beanInstanceHandle, headerHandle, bodyHandle); - } else { - returnHandle = invoke.invokeVirtualMethod( - MethodDescriptor.ofMethod(bean.getImplClazz().name().toString(), method.name(), - method.returnType().name().toString(), paramType.name().toString()), - beanInstanceHandle, bodyHandle); - } - if (returnHandle != null) { - if (method.returnType().name().equals(COMPLETION_STAGE)) { - result = returnHandle; - } else if (method.returnType().name().equals(UNI)) { - result = invoke.invokeInterfaceMethod(SUBSCRIBE_AS_COMPLETION_STAGE, - returnHandle); - } else { - // Message.reply(returnValue) - result = returnHandle; - } - } else { - result = invoke.loadNull(); - } - } - - // handle.destroy() - destroy dependent instance afterwards - if (BuiltinScope.DEPENDENT.is(bean.getScope())) { - invoke.invokeInterfaceMethod(INSTANCE_HANDLE_DESTROY, instanceHandle); - } - - invoke.returnValue(result); - } - - private EventBusConsumer() { - // Avoid direct instantiation. - } -} diff --git a/extensions/vertx/deployment/src/main/java/io/quarkus/vertx/deployment/EventConsumerBusinessMethodItem.java b/extensions/vertx/deployment/src/main/java/io/quarkus/vertx/deployment/EventConsumerBusinessMethodItem.java index 5b52c3d697ceaa..c5e04b66a245c3 100644 --- a/extensions/vertx/deployment/src/main/java/io/quarkus/vertx/deployment/EventConsumerBusinessMethodItem.java +++ b/extensions/vertx/deployment/src/main/java/io/quarkus/vertx/deployment/EventConsumerBusinessMethodItem.java @@ -1,33 +1,76 @@ package io.quarkus.vertx.deployment; import org.jboss.jandex.AnnotationInstance; -import org.jboss.jandex.MethodInfo; import io.quarkus.arc.processor.BeanInfo; +import io.quarkus.arc.processor.InvokerInfo; import io.quarkus.builder.item.MultiBuildItem; public final class EventConsumerBusinessMethodItem extends MultiBuildItem { private final BeanInfo bean; private final AnnotationInstance consumeEvent; - private final MethodInfo method; + private final boolean blockingAnnotation; + private final boolean runOnVirtualThreadAnnotation; + private final boolean splitHeadersBodyParams; + private final InvokerInfo invoker; - public EventConsumerBusinessMethodItem(BeanInfo bean, MethodInfo method, AnnotationInstance consumeEvent) { + public EventConsumerBusinessMethodItem(BeanInfo bean, AnnotationInstance consumeEvent, boolean blockingAnnotation, + boolean runOnVirtualThreadAnnotation, boolean splitHeadersBodyParams, InvokerInfo invoker) { this.bean = bean; - this.method = method; this.consumeEvent = consumeEvent; + this.blockingAnnotation = blockingAnnotation; + this.runOnVirtualThreadAnnotation = runOnVirtualThreadAnnotation; + this.splitHeadersBodyParams = splitHeadersBodyParams; + this.invoker = invoker; } + /** + * Returns the bean that declares this event consumer method. + */ public BeanInfo getBean() { return bean; } - public MethodInfo getMethod() { - return method; - } - + /** + * Returns the {@link io.quarkus.vertx.ConsumeEvent} annotation declared + * on this event consumer method. + */ public AnnotationInstance getConsumeEvent() { return consumeEvent; } + /** + * Returns whether this event consumer method declares + * the {@link io.smallrye.common.annotation.Blocking} annotation. + */ + public boolean isBlockingAnnotation() { + return blockingAnnotation; + } + + /** + * Returns whether this event consumer method declares + * the {@link io.smallrye.common.annotation.RunOnVirtualThread} annotation. + */ + public boolean isRunOnVirtualThreadAnnotation() { + return runOnVirtualThreadAnnotation; + } + + /** + * Returns whether this event consumer method declares 2 parameters, + * where the first is the event headers and the second is the event body. + * In this case, the {@link io.quarkus.vertx.runtime.EventConsumerInvoker} + * has to split the headers and body parameters explicitly. + */ + public boolean isSplitHeadersBodyParams() { + return splitHeadersBodyParams; + } + + /** + * Returns the {@linkplain InvokerInfo invoker} for this event consumer method. + */ + public InvokerInfo getInvoker() { + return invoker; + } + } diff --git a/extensions/vertx/deployment/src/main/java/io/quarkus/vertx/deployment/VertxProcessor.java b/extensions/vertx/deployment/src/main/java/io/quarkus/vertx/deployment/VertxProcessor.java index 9bd62eba808f27..28592e9d9839fe 100644 --- a/extensions/vertx/deployment/src/main/java/io/quarkus/vertx/deployment/VertxProcessor.java +++ b/extensions/vertx/deployment/src/main/java/io/quarkus/vertx/deployment/VertxProcessor.java @@ -1,6 +1,10 @@ package io.quarkus.vertx.deployment; import static io.quarkus.vertx.deployment.VertxConstants.CONSUME_EVENT; +import static io.quarkus.vertx.deployment.VertxConstants.MESSAGE; +import static io.quarkus.vertx.deployment.VertxConstants.MUTINY_MESSAGE; +import static io.quarkus.vertx.deployment.VertxConstants.MUTINY_MESSAGE_HEADERS; +import static io.quarkus.vertx.deployment.VertxConstants.UNI; import static io.quarkus.vertx.deployment.VertxConstants.isMessage; import static io.quarkus.vertx.deployment.VertxConstants.isMessageHeaders; @@ -22,12 +26,15 @@ import io.quarkus.arc.deployment.BeanRegistrationPhaseBuildItem; import io.quarkus.arc.deployment.BeanRegistrationPhaseBuildItem.BeanConfiguratorBuildItem; import io.quarkus.arc.deployment.CurrentContextFactoryBuildItem; +import io.quarkus.arc.deployment.InvokerFactoryBuildItem; import io.quarkus.arc.deployment.UnremovableBeanBuildItem; import io.quarkus.arc.deployment.UnremovableBeanBuildItem.BeanClassAnnotationExclusion; import io.quarkus.arc.processor.AnnotationStore; import io.quarkus.arc.processor.BeanInfo; import io.quarkus.arc.processor.BuildExtension; import io.quarkus.arc.processor.BuiltinScope; +import io.quarkus.arc.processor.InvokerBuilder; +import io.quarkus.arc.processor.InvokerInfo; import io.quarkus.bootstrap.classloading.QuarkusClassLoader; import io.quarkus.deployment.Capabilities; import io.quarkus.deployment.Capability; @@ -48,12 +55,16 @@ import io.quarkus.deployment.builditem.nativeimage.NativeImageConfigBuildItem; import io.quarkus.deployment.builditem.nativeimage.ReflectiveClassBuildItem; import io.quarkus.deployment.builditem.nativeimage.ServiceProviderBuildItem; +import io.quarkus.deployment.recording.RecorderContext; import io.quarkus.gizmo.ClassOutput; import io.quarkus.vertx.ConsumeEvent; import io.quarkus.vertx.core.deployment.CoreVertxBuildItem; +import io.quarkus.vertx.runtime.EventConsumerInfo; import io.quarkus.vertx.runtime.VertxEventBusConsumerRecorder; import io.quarkus.vertx.runtime.VertxProducer; +import io.smallrye.common.annotation.Blocking; import io.smallrye.common.annotation.RunOnVirtualThread; +import io.smallrye.mutiny.Uni; class VertxProcessor { @@ -75,18 +86,19 @@ VertxBuildItem build(CoreVertxBuildItem vertx, VertxEventBusConsumerRecorder rec List messageConsumerBusinessMethods, BuildProducer generatedClass, AnnotationProxyBuildItem annotationProxy, LaunchModeBuildItem launchMode, ShutdownContextBuildItem shutdown, - BuildProducer serviceStart, BuildProducer reflectiveClass, - List codecs, LocalCodecSelectorTypesBuildItem localCodecSelectorTypes) { - Map messageConsumerConfigurations = new HashMap<>(); + BuildProducer serviceStart, + List codecs, LocalCodecSelectorTypesBuildItem localCodecSelectorTypes, + RecorderContext recorderContext) { + List messageConsumerConfigurations = new ArrayList<>(); ClassOutput classOutput = new GeneratedClassGizmoAdaptor(generatedClass, true); for (EventConsumerBusinessMethodItem businessMethod : messageConsumerBusinessMethods) { - String invokerClass = EventBusConsumer.generateInvoker(businessMethod.getBean(), businessMethod.getMethod(), - businessMethod.getConsumeEvent(), classOutput); - messageConsumerConfigurations.put(invokerClass, - annotationProxy.builder(businessMethod.getConsumeEvent(), ConsumeEvent.class) - .withDefaultValue("value", businessMethod.getBean().getBeanClass().toString()) - .build(classOutput)); - reflectiveClass.produce(ReflectiveClassBuildItem.builder(invokerClass).build()); + ConsumeEvent annotation = annotationProxy.builder(businessMethod.getConsumeEvent(), ConsumeEvent.class) + .withDefaultValue("value", businessMethod.getBean().getBeanClass().toString()) + .build(classOutput); + + messageConsumerConfigurations.add(new EventConsumerInfo(annotation, businessMethod.isBlockingAnnotation(), + businessMethod.isRunOnVirtualThreadAnnotation(), businessMethod.isSplitHeadersBodyParams(), + recorderContext.newInstance(businessMethod.getInvoker().getClassName()))); } ClassLoader tccl = Thread.currentThread().getContextClassLoader(); @@ -124,6 +136,7 @@ public UnremovableBeanBuildItem unremovableBeans() { @BuildStep void collectEventConsumers( BeanRegistrationPhaseBuildItem beanRegistrationPhase, + InvokerFactoryBuildItem invokerFactory, BuildProducer messageConsumerBusinessMethods, BuildProducer errors) { // We need to collect all business methods annotated with @ConsumeEvent first @@ -164,8 +177,33 @@ void collectEventConsumers( "An event consumer business method that cannot use @RunOnVirtualThread and set the ordered attribute to true [method: %s, bean:%s]", method, bean)); } - messageConsumerBusinessMethods - .produce(new EventConsumerBusinessMethodItem(bean, method, consumeEvent)); + + InvokerBuilder builder = invokerFactory.createInvoker(bean, method) + .withInstanceLookup(); + + if (method.parametersCount() == 1 && method.parameterType(0).name().equals(MESSAGE)) { + // io.vertx.core.eventbus.Message + // no transformation required + } else if (method.parametersCount() == 1 && method.parameterType(0).name().equals(MUTINY_MESSAGE)) { + // io.vertx.mutiny.core.eventbus.Message + builder.withArgumentTransformer(0, io.vertx.mutiny.core.eventbus.Message.class, "newInstance"); + } else if (method.parametersCount() == 1) { + // parameter is payload + builder.withArgumentTransformer(0, io.vertx.core.eventbus.Message.class, "body"); + } else if (method.parametersCount() == 2 && method.parameterType(0).name().equals(MUTINY_MESSAGE_HEADERS)) { + // if the method expects Mutiny MultiMap, wrap the Vert.x MultiMap + builder.withArgumentTransformer(0, io.vertx.mutiny.core.MultiMap.class, "newInstance"); + } + + if (method.returnType().name().equals(UNI)) { + builder.withReturnValueTransformer(Uni.class, "subscribeAsCompletionStage"); + } + + InvokerInfo invoker = builder.build(); + + messageConsumerBusinessMethods.produce(new EventConsumerBusinessMethodItem(bean, consumeEvent, + method.hasAnnotation(Blocking.class), method.hasAnnotation(RunOnVirtualThread.class), + params.size() == 2, invoker)); LOGGER.debugf("Found event consumer business method %s declared on %s", method, bean); } } diff --git a/extensions/vertx/runtime/src/main/java/io/quarkus/vertx/runtime/EventConsumerInfo.java b/extensions/vertx/runtime/src/main/java/io/quarkus/vertx/runtime/EventConsumerInfo.java new file mode 100644 index 00000000000000..eaa27be6257bae --- /dev/null +++ b/extensions/vertx/runtime/src/main/java/io/quarkus/vertx/runtime/EventConsumerInfo.java @@ -0,0 +1,49 @@ +package io.quarkus.vertx.runtime; + +import jakarta.enterprise.invoke.Invoker; + +import io.quarkus.runtime.RuntimeValue; +import io.quarkus.runtime.annotations.RecordableConstructor; +import io.quarkus.vertx.ConsumeEvent; + +public class EventConsumerInfo { + /** + * The {@link ConsumeEvent} annotation declared on the event consumer method. + */ + public final ConsumeEvent annotation; + + /** + * Whether the {@link io.smallrye.common.annotation.Blocking} annotation + * was declared on the event consumer method. + */ + public final boolean blockingAnnotation; + + /** + * Whether the {@link io.smallrye.common.annotation.RunOnVirtualThread} annotation + * was declared on the event consumer method. + */ + public final boolean runOnVirtualThreadAnnotation; + + /** + * Whether the event consumer method declares 2 parameters, where the first + * is the event headers and the second is the event body. In this case, + * the {@link io.quarkus.vertx.runtime.EventConsumerInvoker} has to split + * the headers and body parameters explicitly. + */ + public final boolean splitHeadersBodyParams; + + /** + * The {@linkplain Invoker invoker} for the event consumer method. + */ + public final RuntimeValue> invoker; + + @RecordableConstructor + public EventConsumerInfo(ConsumeEvent annotation, boolean blockingAnnotation, boolean runOnVirtualThreadAnnotation, + boolean splitHeadersBodyParams, RuntimeValue> invoker) { + this.annotation = annotation; + this.blockingAnnotation = blockingAnnotation; + this.runOnVirtualThreadAnnotation = runOnVirtualThreadAnnotation; + this.splitHeadersBodyParams = splitHeadersBodyParams; + this.invoker = invoker; + } +} diff --git a/extensions/vertx/runtime/src/main/java/io/quarkus/vertx/runtime/EventConsumerInvoker.java b/extensions/vertx/runtime/src/main/java/io/quarkus/vertx/runtime/EventConsumerInvoker.java index 24b8f18ef01665..0be28b64f5cc76 100644 --- a/extensions/vertx/runtime/src/main/java/io/quarkus/vertx/runtime/EventConsumerInvoker.java +++ b/extensions/vertx/runtime/src/main/java/io/quarkus/vertx/runtime/EventConsumerInvoker.java @@ -3,6 +3,8 @@ import java.util.concurrent.CompletionStage; import java.util.function.BiConsumer; +import jakarta.enterprise.invoke.Invoker; + import io.quarkus.arc.Arc; import io.quarkus.arc.InjectableContext.ContextState; import io.quarkus.arc.ManagedContext; @@ -12,18 +14,22 @@ /** * Invokes a business method annotated with {@link ConsumeEvent}. */ -public abstract class EventConsumerInvoker { - - public boolean isBlocking() { - return false; - } - - public boolean isRunningOnVirtualThread() { - return false; - } - - public boolean isOrdered() { - return false; +public class EventConsumerInvoker { + /** + * The {@linkplain Invoker invoker} for the event consumer method. + */ + private final Invoker invoker; + + /** + * Whether this event consumer method declares 2 parameters, where the first + * is the event headers and the second is the event body. In this case, + * we have to split the headers and body parameters explicitly. + */ + private final boolean splitHeadersBodyParams; + + public EventConsumerInvoker(Invoker invoker, boolean splitHeadersBodyParams) { + this.invoker = invoker; + this.splitHeadersBodyParams = splitHeadersBodyParams; } public void invoke(Message message) throws Exception { @@ -66,7 +72,13 @@ public void invoke(Message message) throws Exception { } } - protected abstract Object invokeBean(Message message) throws Exception; + private Object invokeBean(Message message) throws Exception { + if (splitHeadersBodyParams) { + return invoker.invoke(null, new Object[] { message.headers(), message.body() }); + } else { + return invoker.invoke(null, new Object[] { message }); + } + } private static class RequestActiveConsumer implements BiConsumer { diff --git a/extensions/vertx/runtime/src/main/java/io/quarkus/vertx/runtime/VertxEventBusConsumerRecorder.java b/extensions/vertx/runtime/src/main/java/io/quarkus/vertx/runtime/VertxEventBusConsumerRecorder.java index 0eaff963ff578e..145e34e844bd8c 100644 --- a/extensions/vertx/runtime/src/main/java/io/quarkus/vertx/runtime/VertxEventBusConsumerRecorder.java +++ b/extensions/vertx/runtime/src/main/java/io/quarkus/vertx/runtime/VertxEventBusConsumerRecorder.java @@ -9,7 +9,6 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; -import java.util.Map.Entry; import java.util.NoSuchElementException; import java.util.Optional; import java.util.concurrent.Callable; @@ -54,7 +53,8 @@ public class VertxEventBusConsumerRecorder { static volatile Vertx vertx; static volatile List> messageConsumers; - public void configureVertx(Supplier vertx, Map messageConsumerConfigurations, + public void configureVertx(Supplier vertx, + List messageConsumerConfigurations, LaunchMode launchMode, ShutdownContext shutdown, Map, Class> codecByClass, List> selectorTypes) { VertxEventBusConsumerRecorder.vertx = vertx.get(); @@ -93,15 +93,19 @@ void destroy() { vertx = null; } - void registerMessageConsumers(Map messageConsumerConfigurations) { + void registerMessageConsumers(List messageConsumerConfigurations) { if (!messageConsumerConfigurations.isEmpty()) { EventBus eventBus = vertx.eventBus(); VertxInternal vi = (VertxInternal) VertxEventBusConsumerRecorder.vertx; CountDownLatch latch = new CountDownLatch(messageConsumerConfigurations.size()); final List registrationFailures = new ArrayList<>(); - for (Entry entry : messageConsumerConfigurations.entrySet()) { - EventConsumerInvoker invoker = createInvoker(entry.getKey()); - String address = lookUpPropertyValue(entry.getValue().value()); + for (EventConsumerInfo info : messageConsumerConfigurations) { + EventConsumerInvoker invoker = new EventConsumerInvoker(info.invoker.getValue(), info.splitHeadersBodyParams); + String address = lookUpPropertyValue(info.annotation.value()); + boolean local = info.annotation.local(); + boolean blocking = info.annotation.blocking() || info.blockingAnnotation || info.runOnVirtualThreadAnnotation; + boolean runOnVirtualThread = info.runOnVirtualThreadAnnotation; + boolean ordered = info.annotation.ordered(); // Create a context attached to each consumer // If we don't all consumers will use the same event loop and so published messages (dispatched to all // consumers) delivery is serialized. @@ -110,7 +114,7 @@ void registerMessageConsumers(Map messageConsumerConfigura @Override public void handle(Void x) { MessageConsumer consumer; - if (entry.getValue().local()) { + if (local) { consumer = eventBus.localConsumer(address); } else { consumer = eventBus.consumer(address); @@ -119,12 +123,12 @@ public void handle(Void x) { consumer.handler(new Handler>() { @Override public void handle(Message m) { - if (invoker.isBlocking()) { + if (blocking) { // We need to create a duplicated context from the "context" Context dup = VertxContext.getOrCreateDuplicatedContext(context); setContextSafe(dup, true); - if (invoker.isRunningOnVirtualThread()) { + if (runOnVirtualThread) { // Switch to a Vert.x context to capture it and use it during the invocation. dup.runOnContext(new Handler() { @Override @@ -162,7 +166,7 @@ public Void call() { } return null; } - }, invoker.isOrdered()); + }, ordered); future.onFailure(context::reportException); } } else { @@ -240,22 +244,6 @@ void unregisterMessageConsumers() { messageConsumers.clear(); } - @SuppressWarnings("unchecked") - private EventConsumerInvoker createInvoker(String invokerClassName) { - try { - ClassLoader cl = Thread.currentThread().getContextClassLoader(); - if (cl == null) { - cl = VertxProducer.class.getClassLoader(); - } - Class invokerClazz = (Class) cl - .loadClass(invokerClassName); - return invokerClazz.getDeclaredConstructor().newInstance(); - } catch (InstantiationException | IllegalAccessException | ClassNotFoundException | NoSuchMethodException - | InvocationTargetException e) { - throw new IllegalStateException("Unable to create invoker: " + invokerClassName, e); - } - } - @SuppressWarnings("unchecked") private void registerCodecs(Map, Class> codecByClass, List> selectorTypes) { EventBus eventBus = vertx.eventBus();