Skip to content

Commit

Permalink
Web client http context lifecycle cleanup
Browse files Browse the repository at this point in the history
Motivation:

The web client implementation `HttpContext` has been designed when Vert.x was still using handler of async results. Since the update to using the future model mainstream the implementation although it has been updated to use futures has not been fully updated and the internal still use artificial promises to connect the flow between the various execution phases of the context.

This rewrites partly the implementation of `HttpContext` to remove un-necessary logic (e.g. the creation of intermediary promises) and simplify the implementation to make it easier to reason about the flow of execution phases.

In addition the body codec SPI has been slightly simplified (transformed an un-necessary handler async result into a synchronous method.
  • Loading branch information
vietj authored Jan 21, 2025
2 parents 29177db + 079f6b8 commit e0268e7
Show file tree
Hide file tree
Showing 10 changed files with 207 additions and 136 deletions.
6 changes: 6 additions & 0 deletions vertx-web-client/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,12 @@
<artifactId>vertx-auth-htdigest</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.hamcrest</groupId>
<artifactId>hamcrest-core</artifactId>
<version>2.2</version>
<scope>test</scope>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,12 @@
import io.vertx.core.http.HttpClientResponse;
import io.vertx.core.http.HttpHeaders;
import io.vertx.core.http.RequestOptions;
import io.vertx.core.internal.http.HttpClientInternal;
import io.vertx.core.internal.ContextInternal;
import io.vertx.core.internal.PromiseInternal;
import io.vertx.core.internal.http.HttpClientInternal;
import io.vertx.core.json.Json;
import io.vertx.core.json.JsonObject;
import io.vertx.core.streams.Pipe;
import io.vertx.core.streams.ReadStream;
import io.vertx.ext.web.client.HttpRequest;
import io.vertx.ext.web.client.HttpResponse;
import io.vertx.ext.web.client.WebClientOptions;
Expand Down Expand Up @@ -59,7 +58,6 @@ public class HttpContext<T> {
private RequestOptions requestOptions;
private HttpClientRequest clientRequest;
private HttpClientResponse clientResponse;
private Promise<HttpClientRequest> requestPromise;
private HttpResponse<T> response;
private Throwable failure;
private int redirects;
Expand Down Expand Up @@ -120,7 +118,7 @@ public RequestOptions requestOptions() {
return requestOptions;
}

public void setRequestOptions(RequestOptions requestOptions) {
public void requestOptions(RequestOptions requestOptions) {
this.requestOptions = requestOptions;
}

Expand Down Expand Up @@ -178,7 +176,7 @@ public Throwable failure() {
/**
* @return all traced redirects
*/
public List<String> getRedirectedLocations() {
public List<String> redirectedLocations() {
return redirectedLocations;
}

Expand Down Expand Up @@ -402,6 +400,12 @@ private void handleFailure() {
clientRequest = null;
req.reset();
}
if (body != null) {
if (body instanceof Pipe) {
((Pipe<?>)body).close();
}
body = null;
}
promise.tryFail(failure);
}

Expand All @@ -426,145 +430,113 @@ private void handlePrepareRequest() {
contentType = prev;
}
}
if (body instanceof Pipe) {
//
} else if (body instanceof MultipartForm) {
MultipartFormUpload multipartForm;
try {
boolean multipart = "multipart/form-data".equals(contentType);
HttpPostRequestEncoder.EncoderMode encoderMode = this.request.multipartMixed() ? HttpPostRequestEncoder.EncoderMode.RFC1738 : HttpPostRequestEncoder.EncoderMode.HTML5;
multipartForm = new MultipartFormUpload(context, (MultipartForm) this.body, multipart, encoderMode);
this.body = multipartForm.pipe();
} catch (Exception e) {
fail(e);
return;
}
for (Map.Entry<String, String> header : multipartForm.headers()) {
requestOptions.putHeader(header.getKey(), header.getValue());
}
} else if (body == null && "application/json".equals(contentType)) {
body = Buffer.buffer("null");
} else if (body instanceof JsonObject) {
body = ((JsonObject) body).toBuffer();
} else if (body != null && !(body instanceof Buffer)) {
body = Json.encodeToBuffer(body);
}

if (body instanceof Buffer) {
Buffer buffer = (Buffer) body;
requestOptions.putHeader(HttpHeaders.CONTENT_LENGTH, "" + buffer.length());
}

createRequest(requestOptions);
}

private void handleCreateRequest() {
requestPromise = context.promise();
if (body != null || "application/json".equals(contentType)) {
if (body instanceof MultipartForm) {
MultipartFormUpload multipartForm;
try {
boolean multipart = "multipart/form-data".equals(contentType);
HttpPostRequestEncoder.EncoderMode encoderMode = this.request.multipartMixed() ? HttpPostRequestEncoder.EncoderMode.RFC1738 : HttpPostRequestEncoder.EncoderMode.HTML5;
multipartForm = new MultipartFormUpload(context, (MultipartForm) this.body, multipart, encoderMode);
this.body = multipartForm;
} catch (Exception e) {
fail(e);
return;
}
for (Map.Entry<String, String> header : multipartForm.headers()) {
requestOptions.putHeader(header.getKey(), header.getValue());
}
}
if (body instanceof ReadStream<?>) {
ReadStream<Buffer> stream = (ReadStream<Buffer>) body;
Pipe<Buffer> pipe = stream.pipe(); // Shouldn't this be called in an earlier phase ?
requestPromise.future().onComplete(ar -> {
if (ar.succeeded()) {
HttpClientRequest req = ar.result();
if (this.request.headers == null || !this.request.headers.contains(HttpHeaders.CONTENT_LENGTH)) {
req.setChunked(true);
}
pipe.endOnFailure(false);
pipe.to(req).onComplete(ar2 -> {
clientRequest = null;
if (ar2.failed()) {
req.reset(0L, ar2.cause());
}
});
if (body instanceof MultipartFormUpload) {
((MultipartFormUpload) body).pump();
}
} else {
// Test this
clientRequest = null;
pipe.close();
}
});
} else {
Buffer buffer;
if (body instanceof Buffer) {
buffer = (Buffer) body;
} else if (body instanceof JsonObject) {
buffer = ((JsonObject) body).toBuffer();
} else {
buffer = Json.encodeToBuffer(body);
}
requestOptions.putHeader(HttpHeaders.CONTENT_LENGTH, "" + buffer.length());
requestPromise.future().onSuccess(request -> {
clientRequest = null;
request.end(buffer);
});
}
} else {
requestPromise.future().onSuccess(request -> {
clientRequest = null;
request.end();
});
}
client.request(requestOptions)
.onComplete(ar1 -> {
if (ar1.succeeded()) {
sendRequest(ar1.result());
} else {
fail(ar1.cause());
requestPromise.fail(ar1.cause());
}
});
}

private void handleReceiveResponse() {
BodyStream<T> stream;
try {
stream = request.bodyCodec().stream();
} catch (Exception e) {
fail(e);
return;
}
HttpClientResponse resp = clientResponse;
Context context = Vertx.currentContext();
Promise<HttpResponse<T>> promise = Promise.promise();
promise.future().onComplete(r -> {
// We are running on a context (the HTTP client mandates it)
context.runOnContext(v -> {
if (r.succeeded()) {
dispatchResponse(r.result());
resp
.pipeTo(stream)
.compose(v -> stream.result())
.map(result -> new HttpResponseImpl<>(
resp.version(),
resp.statusCode(),
resp.statusMessage(),
resp.headers(),
resp.trailers(),
resp.cookies(),
result,
redirectedLocations
)).onComplete(ar -> {
if (ar.succeeded()) {
dispatchResponse(ar.result());
} else {
fail(r.cause());
fail(ar.cause());
}
});
});
resp.exceptionHandler(err -> {
if (!promise.future().isComplete()) {
promise.fail(err);
}
});
Pipe<Buffer> pipe = resp.pipe();
request.bodyCodec().create(ar1 -> {
if (ar1.succeeded()) {
BodyStream<T> stream = ar1.result();
pipe.to(stream).onComplete(ar2 -> {
if (ar2.succeeded()) {
stream.result().onComplete(ar3 -> {
if (ar3.succeeded()) {
promise.complete(new HttpResponseImpl<>(
resp.version(),
resp.statusCode(),
resp.statusMessage(),
resp.headers(),
resp.trailers(),
resp.cookies(),
stream.result().result(),
redirectedLocations
));
} else {
promise.fail(ar3.cause());
}
});
} else {
promise.fail(ar2.cause());
}
});
} else {
pipe.close();
fail(ar1.cause());
}
});
}

private void handleSendRequest() {
clientRequest.response().onComplete(ar -> {
clientRequest = null;
if (ar.succeeded()) {
receiveResponse(ar.result().pause());
} else {
fail(ar.cause());
}
});
requestPromise.complete(clientRequest);
doSendRequest(clientRequest);
}

private void doSendRequest(HttpClientRequest request) {
Object bodyToSend = body;
if (bodyToSend != null) {
body = null;
if (bodyToSend instanceof Pipe) {
Pipe<Buffer> pipe = (Pipe<Buffer>) bodyToSend;
if (this.request.headers == null || !this.request.headers.contains(HttpHeaders.CONTENT_LENGTH)) {
request.setChunked(true);
}
pipe.endOnFailure(false);
pipe.to(request).onComplete(ar2 -> {
if (ar2.failed()) {
request.reset(0L, ar2.cause());
}
});
} else {
Buffer buffer = (Buffer) bodyToSend;
request.send(buffer);
}
} else {
request.send();
}
}

public <T> T get(String key) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -406,7 +406,7 @@ public boolean multipartMixed() {

@Override
public Future<HttpResponse<T>> sendStream(ReadStream<Buffer> body) {
return send(null, body);
return send(null, body.pipe());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,17 @@
import io.netty.buffer.UnpooledByteBufAllocator;
import io.netty.handler.codec.http.*;
import io.netty.handler.codec.http.multipart.*;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.MultiMap;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.internal.ContextInternal;
import io.vertx.core.internal.buffer.BufferInternal;
import io.vertx.core.internal.concurrent.InboundMessageQueue;
import io.vertx.core.internal.http.HttpHeadersInternal;
import io.vertx.core.streams.Pipe;
import io.vertx.core.streams.ReadStream;
import io.vertx.core.streams.WriteStream;
import io.vertx.ext.web.multipart.FormDataPart;
import io.vertx.ext.web.multipart.MultipartForm;

Expand Down Expand Up @@ -230,4 +233,36 @@ public synchronized MultipartFormUpload endHandler(Handler<Void> handler) {
endHandler = handler;
return this;
}

@Override
public Pipe<Buffer> pipe() {
Pipe<Buffer> pipe = ReadStream.super.pipe();
return new Pipe<>() {
@Override
public Pipe<Buffer> endOnFailure(boolean end) {
pipe.endOnFailure(end);
return this;
}
@Override
public Pipe<Buffer> endOnSuccess(boolean end) {
pipe.endOnSuccess(end);
return this;
}
@Override
public Pipe<Buffer> endOnComplete(boolean end) {
pipe.endOnComplete(end);
return this;
}
@Override
public Future<Void> to(WriteStream<Buffer> dst) {
Future<Void> f = pipe.to(dst);
pump();
return f;
}
@Override
public void close() {
pipe.close();
}
};
}
}
Loading

0 comments on commit e0268e7

Please sign in to comment.