From 7fef6997e8d30a3c4720133288cc74d4c20d0436 Mon Sep 17 00:00:00 2001 From: Julien Viet Date: Mon, 2 Dec 2024 10:25:01 +0100 Subject: [PATCH] HTTP client stream should handle a write to a stream which has been reset without having being allocated. Motivation: Vert.x HTTP client stream does not allocate a stream when the stream has been reset by the application before its allocation. When such stream is being written, the stream behaves normally and fails since the internal state is not correct. Changes: Record the reset state of a stream and guard against writes in such case. --- .../http/impl/Http1xClientConnection.java | 30 ++++++++++++++++--- .../core/http/impl/Http2ClientConnection.java | 6 ++++ .../core/http/impl/VertxHttp2Stream.java | 21 +++++++++++++ .../java/io/vertx/core/http/HttpTest.java | 14 +++++++++ 4 files changed, 67 insertions(+), 4 deletions(-) diff --git a/src/main/java/io/vertx/core/http/impl/Http1xClientConnection.java b/src/main/java/io/vertx/core/http/impl/Http1xClientConnection.java index 3f7a3141cad..e42e6c081db 100644 --- a/src/main/java/io/vertx/core/http/impl/Http1xClientConnection.java +++ b/src/main/java/io/vertx/core/http/impl/Http1xClientConnection.java @@ -395,7 +395,7 @@ private static class StreamImpl extends Stream implements HttpClientStream { private final Http1xClientConnection conn; private final InboundBuffer queue; - private boolean reset; + private Throwable reset; private boolean closed; private Handler headHandler; private Handler chunkHandler; @@ -414,7 +414,7 @@ private static class StreamImpl extends Stream implements HttpClientStream { this.conn = conn; this.queue = new InboundBuffer<>(context, 5) .handler(item -> { - if (!reset) { + if (reset == null) { if (item instanceof MultiMap) { Handler handler = endHandler; if (handler != null) { @@ -526,7 +526,22 @@ public void writeHead(HttpRequestHead request, boolean chunked, ByteBuf buf, boo writeHead(request, chunked, buf, end, connect, handler == null ? null : context.promise(handler)); } + private boolean checkReset(Handler> handler) { + Throwable reset; + synchronized (this) { + reset = this.reset; + if (reset == null) { + return false; + } + } + handler.handle(context.failedFuture(reset)); + return true; + } + private void writeHead(HttpRequestHead request, boolean chunked, ByteBuf buf, boolean end, boolean connect, Handler> handler) { + if (checkReset(handler)) { + return; + } EventLoop eventLoop = conn.context.nettyEventLoop(); synchronized (this) { if (shouldQueue(eventLoop)) { @@ -537,12 +552,19 @@ private void writeHead(HttpRequestHead request, boolean chunked, ByteBuf buf, bo return; } } + if (reset != null) { + handler.handle(context.failedFuture(reset)); + return; + } ((Stream)this).request = request; conn.beginRequest(this, request, chunked, buf, end, connect, handler); } @Override public void writeBuffer(ByteBuf buff, boolean end, Handler> handler) { + if (checkReset(handler)) { + return; + } if (buff != null || end) { FutureListener listener = handler == null ? null : context.promise(handler); writeBuffer(buff, end, listener); @@ -636,10 +658,10 @@ public void doFetch(long amount) { @Override public void reset(Throwable cause) { synchronized (conn) { - if (reset) { + if (reset != null) { return; } - reset = true; + reset = Objects.requireNonNull(cause); } EventLoop eventLoop = conn.context.nettyEventLoop(); if (eventLoop.inEventLoop()) { diff --git a/src/main/java/io/vertx/core/http/impl/Http2ClientConnection.java b/src/main/java/io/vertx/core/http/impl/Http2ClientConnection.java index 9c94eba3129..e9e06712c56 100644 --- a/src/main/java/io/vertx/core/http/impl/Http2ClientConnection.java +++ b/src/main/java/io/vertx/core/http/impl/Http2ClientConnection.java @@ -546,6 +546,9 @@ void handleException(Throwable exception) { @Override public void writeHead(HttpRequestHead request, boolean chunked, ByteBuf buf, boolean end, StreamPriority priority, boolean connect, Handler> handler) { + if (checkReset(handler)) { + return; + } priority(priority); ContextInternal ctx = conn.getContext(); EventLoop eventLoop = ctx.nettyEventLoop(); @@ -629,6 +632,9 @@ private void createStream(HttpRequestHead head, Http2Headers headers) throws Htt @Override public void writeBuffer(ByteBuf buf, boolean end, Handler> listener) { + if (checkReset(listener)) { + return; + } if (buf != null) { int size = buf.readableBytes(); synchronized (this) { diff --git a/src/main/java/io/vertx/core/http/impl/VertxHttp2Stream.java b/src/main/java/io/vertx/core/http/impl/VertxHttp2Stream.java index 05e9e573b92..2d7611e58ea 100644 --- a/src/main/java/io/vertx/core/http/impl/VertxHttp2Stream.java +++ b/src/main/java/io/vertx/core/http/impl/VertxHttp2Stream.java @@ -24,6 +24,7 @@ import io.vertx.core.buffer.Buffer; import io.vertx.core.http.HttpFrame; import io.vertx.core.http.StreamPriority; +import io.vertx.core.http.StreamResetException; import io.vertx.core.http.impl.headers.Http2HeadersAdaptor; import io.vertx.core.impl.ContextInternal; import io.vertx.core.impl.VertxInternal; @@ -49,6 +50,7 @@ abstract class VertxHttp2Stream { private long bytesWritten; private int writeInProgress = 0; protected boolean isConnect; + private long reset = -1L; VertxHttp2Stream(C conn, ContextInternal context) { this.conn = conn; @@ -178,7 +180,22 @@ private void doWriteFrame(int type, int flags, ByteBuf payload) { conn.handler.writeFrame(stream, (byte) type, (short) flags, payload); } + protected final boolean checkReset(Handler> handler) { + long reset; + synchronized (this) { + reset = this.reset; + if (reset == -1L) { + return false; + } + } + handler.handle(context.failedFuture(new StreamResetException(reset))); + return true; + } + final void writeHeaders(Http2Headers headers, boolean end, boolean checkFlush, Handler> handler) { + if (checkReset(handler)) { + return; + } EventLoop eventLoop = conn.getContext().nettyEventLoop(); synchronized (this) { if (shouldQueue(eventLoop)) { @@ -205,6 +222,9 @@ private void writePriorityFrame(StreamPriority priority) { } final void writeData(ByteBuf chunk, boolean end, Handler> handler) { + if (checkReset(handler)) { + return; + } ContextInternal ctx = conn.getContext(); EventLoop eventLoop = ctx.nettyEventLoop(); synchronized (this) { @@ -260,6 +280,7 @@ protected void doWriteReset(long code) { int streamId; synchronized (this) { streamId = stream != null ? stream.id() : -1; + reset = code; } if (streamId != -1) { conn.handler.writeReset(streamId, code); diff --git a/src/test/java/io/vertx/core/http/HttpTest.java b/src/test/java/io/vertx/core/http/HttpTest.java index b2dcf9ec636..3c8a9f45b7c 100644 --- a/src/test/java/io/vertx/core/http/HttpTest.java +++ b/src/test/java/io/vertx/core/http/HttpTest.java @@ -5854,6 +5854,20 @@ public void testResetClientRequestResponseInProgress() throws Exception { await(); } + @Test + public void testResetPartialClientRequest() throws Exception { + server.requestHandler(req -> { + }); + startServer(testAddress); + client.request(requestOptions).onComplete(onSuccess(req -> { + assertTrue(req.reset()); + req.end("body").onComplete(onFailure(err -> { + testComplete(); + })); + })); + await(); + } + @Test public void testSimpleCookie() throws Exception { testCookies("foo=bar", req -> {