Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

VertxHttpClientHTTPConduit based client hangs when run asynchronously on Vert.x event loop with a body exceeding single chunk #1691

Merged
merged 2 commits into from
Jan 23, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 0 additions & 4 deletions docs/modules/ROOT/examples/calculator-client/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,6 @@
<groupId>io.quarkus</groupId>
<artifactId>quarkus-rest</artifactId>
</dependency>
<dependency><!-- Reproduce https://github.com/quarkiverse/quarkus-cxf/issues/860 -->
<groupId>io.quarkus</groupId>
<artifactId>quarkus-reactive-pg-client</artifactId>
</dependency>

<dependency>
<groupId>io.rest-assured</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,8 @@ private Object produceCxfClient(CXFClientInfo cxfClientInfo) {
throw new RuntimeException("Could not load " + RUNTIME_INITIALIZED_PROXY_MARKER_INTERFACE_NAME, e);
}
final QuarkusClientFactoryBean quarkusClientFactoryBean = new QuarkusClientFactoryBean(seiClass);
final QuarkusJaxWsProxyFactoryBean factory = new QuarkusJaxWsProxyFactoryBean(quarkusClientFactoryBean, interfaces);
final QuarkusJaxWsProxyFactoryBean factory = new QuarkusJaxWsProxyFactoryBean(quarkusClientFactoryBean, vertx,
interfaces);
final Map<String, Object> props = new LinkedHashMap<>();
factory.setProperties(props);
props.put(CXFClientInfo.class.getName(), cxfClientInfo);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,39 @@
package io.quarkiverse.cxf;

import java.io.IOException;
import java.lang.reflect.Method;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import jakarta.xml.ws.AsyncHandler;
import jakarta.xml.ws.Binding;
import jakarta.xml.ws.BindingProvider;
import jakarta.xml.ws.EndpointReference;
import jakarta.xml.ws.Response;

import org.apache.cxf.endpoint.Client;
import org.apache.cxf.frontend.ClientFactoryBean;
import org.apache.cxf.frontend.ClientProxy;
import org.apache.cxf.jaxws.JaxWsClientProxy;
import org.apache.cxf.jaxws.JaxWsProxyFactoryBean;
import org.apache.cxf.service.model.BindingOperationInfo;

import io.quarkus.runtime.BlockingOperationControl;
import io.vertx.core.Vertx;

public class QuarkusJaxWsProxyFactoryBean extends JaxWsProxyFactoryBean {

private final Class<?>[] additionalImplementingClasses;
private final Vertx vertx;

public QuarkusJaxWsProxyFactoryBean(ClientFactoryBean fact, Class<?>... additionalImplementingClasses) {
public QuarkusJaxWsProxyFactoryBean(ClientFactoryBean fact, Vertx vertx, Class<?>... additionalImplementingClasses) {
super(fact);
this.vertx = vertx;
this.additionalImplementingClasses = additionalImplementingClasses;
}

Expand All @@ -21,4 +46,196 @@ protected Class<?>[] getImplementingClasses() {
return result;
}

@Override
protected ClientProxy clientClientProxy(Client c) {
return new QuarkusJaxWsClientProxy(vertx, (JaxWsClientProxy) super.clientClientProxy(c));
}

public static class QuarkusJaxWsClientProxy extends ClientProxy implements BindingProvider {

private final JaxWsClientProxy delegate;
private final Vertx vertx;

public QuarkusJaxWsClientProxy(Vertx vertx, JaxWsClientProxy delegate) {
super(delegate.getClient());
this.vertx = vertx;
this.delegate = delegate;
}

@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
Fixed Show fixed Hide fixed
final boolean isAsync = isAsync(method);
if (isAsync && !BlockingOperationControl.isBlockingAllowed()) {
/* We are returning a Future and we are on Vert.x event loop thread */

final CompletableFuture<Response<Object>> result = new CompletableFuture<>();

/*
* We complete the result Future using AsyncHandler because that one gets a completed Response
* whose get() method does not block - see org.apache.cxf.jaxws.JaxwsClientCallback for how
* the AsyncHandler is called
*/
final Object[] newArgs;
final int len = args.length;
if (len > 0 && args[len - 1] instanceof AsyncHandler) {
final AsyncHandler<Object> jaxWsHandler = (AsyncHandler<Object>) args[len - 1];
newArgs = new Object[len];
System.arraycopy(args, 0, newArgs, 0, len);
newArgs[len - 1] = new AsyncHandler<Object>() {
@Override
public void handleResponse(Response<Object> res) {
try {
jaxWsHandler.handleResponse(res);
} finally {
result.complete(res);
}
}
};
} else {
newArgs = new Object[len + 1];
System.arraycopy(args, 0, newArgs, 0, len);
newArgs[len] = new AsyncHandler<Object>() {
@Override
public void handleResponse(Response<Object> res) {
result.complete(res);
}
};
}

/*
* Because even the async mode of VertxHttpClientConduit may block,
* we better dispatch the invocation to a worker thread
*/
vertx.executeBlocking(new Callable<>() {
@Override
public Void call() throws Exception {
try {
delegate.invoke(proxy, method, newArgs);
return null;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw e;
} catch (Exception e) {
throw e;
} catch (Throwable e) {
throw new Exception(e);
}
}
}).onFailure(result::completeExceptionally);
return new QuarkusJaxWsResponse<Object>(result);
}
return delegate.invoke(proxy, method, args);

}

boolean isAsync(Method m) {
return m.getName().endsWith("Async")
&& (Future.class.equals(m.getReturnType())
|| Response.class.equals(m.getReturnType()));
}

@Override
public void close() throws IOException {
Fixed Show fixed Hide fixed
delegate.close();
}

@Override
public int hashCode() {
Fixed Show fixed Hide fixed
return delegate.hashCode();
}

@Override
public Object invokeSync(Method method, BindingOperationInfo oi, Object[] params) throws Exception {
Fixed Show fixed Hide fixed
return delegate.invokeSync(method, oi, params);
}

@Override
public Map<String, Object> getRequestContext() {
Fixed Show fixed Hide fixed
return delegate.getRequestContext();
}

@Override
public Map<String, Object> getResponseContext() {
Fixed Show fixed Hide fixed
return delegate.getResponseContext();
}

@Override
public Client getClient() {
Fixed Show fixed Hide fixed
return delegate.getClient();
}

@Override
public boolean equals(Object obj) {
Fixed Show fixed Hide fixed
return delegate.equals(obj);
}

@Override
public String toString() {
Fixed Show fixed Hide fixed
return delegate.toString();
}

@Override
public Binding getBinding() {
return delegate.getBinding();
}

@Override
public EndpointReference getEndpointReference() {
return delegate.getEndpointReference();
}

@Override
public <T extends EndpointReference> T getEndpointReference(Class<T> clazz) {
return delegate.getEndpointReference(clazz);
}

static class QuarkusJaxWsResponse<T> implements Response<T> {

final CompletableFuture<Response<T>> delegate;

public QuarkusJaxWsResponse(CompletableFuture<Response<T>> delegate) {
this.delegate = delegate;
}

@Override
public boolean cancel(boolean mayInterruptIfRunning) {
return delegate.cancel(mayInterruptIfRunning);
}

@Override
public boolean isCancelled() {
return delegate.isCancelled();
}

@Override
public boolean isDone() {
return delegate.isDone();
}

@Override
public T get() throws InterruptedException, ExecutionException {
return delegate.get().get();
}

@Override
public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
return delegate.get(timeout, unit).get();
}

@Override
public Map<String, Object> getContext() {
try {
return delegate.get().getContext();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
} catch (ExecutionException e) {
throw new RuntimeException(e);
}
}

}

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,9 @@

import jakarta.enterprise.event.Observes;
import jakarta.enterprise.inject.Instance;
import jakarta.ws.rs.GET;
import jakarta.ws.rs.POST;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.QueryParam;
import jakarta.ws.rs.core.MediaType;

import org.eclipse.microprofile.config.inject.ConfigProperty;
Expand Down Expand Up @@ -52,9 +51,9 @@ void init(@Observes StartupEvent start) {
}

@Path("/helloWithWsdlWithEagerInit")
@GET
@POST
@Produces(MediaType.TEXT_PLAIN)
public Uni<String> helloWithWsdlWithEagerInit(@QueryParam("person") String person) {
public Uni<String> helloWithWsdlWithEagerInit(String person) {
while (helloWithWsdlWithEagerInit == null) {
/* Spin until the client is ready */
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
package io.quarkiverse.cxf.it.vertx.async;

import jakarta.ws.rs.GET;
import jakarta.ws.rs.POST;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.QueryParam;
import jakarta.ws.rs.core.MediaType;

import io.quarkiverse.cxf.annotation.CXFClient;
Expand All @@ -17,9 +16,9 @@ public class RestAsyncWithoutWsdl {
HelloService helloWithoutWsdl;

@Path("/helloWithoutWsdl")
@GET
@POST
@Produces(MediaType.TEXT_PLAIN)
public Uni<String> helloWithoutWsdl(@QueryParam("person") String person) {
public Uni<String> helloWithoutWsdl(String person) {
/* Without WSDL and without @Blocking should work */
return Uni.createFrom()
.future(helloWithoutWsdl.helloAsync(person))
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
package io.quarkiverse.cxf.it.vertx.async;

import jakarta.ws.rs.GET;
import jakarta.ws.rs.POST;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.QueryParam;
import jakarta.ws.rs.core.MediaType;

import io.quarkiverse.cxf.annotation.CXFClient;
Expand All @@ -18,10 +17,10 @@ public class RestAsyncWithoutWsdlWithBlocking {
HelloService helloWithoutWsdlWithBlocking;

@Path("/helloWithoutWsdlWithBlocking")
@GET
@POST
@Produces(MediaType.TEXT_PLAIN)
@Blocking
public Uni<String> helloWithoutWsdlWithBlocking(@QueryParam("person") String person) {
public Uni<String> helloWithoutWsdlWithBlocking(String person) {
/* Without WSDL and with @Blocking should work */
return Uni.createFrom()
.future(helloWithoutWsdlWithBlocking.helloAsync(person))
Expand Down
Loading
Loading