Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HTTP client stream should handle a write to a stream which has been reset without having being allocated. #5413

Merged
merged 1 commit into from
Dec 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -317,7 +317,11 @@ private void endRequest(Stream s) {
* @param stream to reset
* @return whether the stream should be considered as closed
*/
private boolean reset(Stream stream) {
private Boolean reset(Stream stream) {
if (stream.reset) {
return null;
}
stream.reset = true;
if (!responses.contains(stream)) {
requests.remove(stream);
return true;
Expand All @@ -326,16 +330,20 @@ private boolean reset(Stream stream) {
return false;
}

private void writeHead(Stream stream, HttpRequestHead request, boolean chunked, ByteBuf buf, boolean end, boolean connect, PromiseInternal<Void> handler) {
private void writeHead(Stream stream, HttpRequestHead request, boolean chunked, ByteBuf buf, boolean end, boolean connect, PromiseInternal<Void> listener) {
writeToChannel(new MessageWrite() {
@Override
public void write() {
if (stream.reset) {
listener.fail("Stream reset");
return;
}
stream.request = request;
beginRequest(stream, request, chunked, buf, end, connect, handler);
beginRequest(stream, request, chunked, buf, end, connect, listener);
}
@Override
public void cancel(Throwable cause) {
handler.fail(cause);
listener.fail(cause);
}
});
}
Expand All @@ -344,6 +352,10 @@ private void writeBuffer(Stream stream, ByteBuf buff, boolean end, PromiseIntern
writeToChannel(new MessageWrite() {
@Override
public void write() {
if (stream.reset) {
listener.fail("Stream reset");
return;
}
writeBuffer(stream, buff, end, (FutureListener<Void>)listener);
}

Expand All @@ -368,7 +380,7 @@ private abstract static class Stream {
private boolean responseEnded;
private long bytesRead;
private long bytesWritten;

private boolean reset;

Stream(ContextInternal context, Promise<HttpClientStream> promise, int id) {
this.context = context;
Expand Down Expand Up @@ -404,7 +416,6 @@ private static class StreamImpl extends Stream implements HttpClientStream {

private final Http1xClientConnection conn;
private final InboundMessageQueue<Object> queue;
private boolean reset;
private boolean closed;
private Handler<HttpResponseHead> headHandler;
private Handler<Buffer> chunkHandler;
Expand All @@ -431,18 +442,16 @@ protected void handlePause() {
}
@Override
protected void handleMessage(Object item) {
if (!reset) {
if (item instanceof MultiMap) {
Handler<MultiMap> handler = endHandler;
if (handler != null) {
context.dispatch((MultiMap) item, handler);
}
} else {
Buffer buffer = (Buffer) item;
Handler<Buffer> handler = chunkHandler;
if (handler != null) {
context.dispatch(buffer, handler);
}
if (item instanceof MultiMap) {
Handler<MultiMap> handler = endHandler;
if (handler != null) {
context.dispatch((MultiMap) item, handler);
}
} else {
Buffer buffer = (Buffer) item;
Handler<Buffer> handler = chunkHandler;
if (handler != null) {
context.dispatch(buffer, handler);
}
}
}
Expand Down Expand Up @@ -584,26 +593,24 @@ public Future<Void> reset(Throwable cause) {
Promise<Void> promise = context.promise();
EventLoop eventLoop = conn.context.nettyEventLoop();
if (eventLoop.inEventLoop()) {
_reset(cause, promise);
reset(cause, promise);
} else {
eventLoop.execute(() -> _reset(cause, promise));
eventLoop.execute(() -> reset(cause, promise));
}
return promise.future();
}

private void _reset(Throwable cause, Promise<Void> promise) {
if (reset) {
private void reset(Throwable cause, Promise<Void> promise) {
Boolean removed = conn.reset(this);
if (removed == null) {
promise.fail("Stream already reset");
return;
}
reset = true;
boolean removed = conn.reset(this);
if (removed) {
context.execute(cause, this::handleClosed);
} else {
context.execute(cause, this::handleException);
}
promise.complete();
if (removed) {
context.execute(cause, this::handleClosed);
} else {
context.execute(cause, this::handleException);
}
promise.complete(); }
}

@Override
Expand Down Expand Up @@ -856,7 +863,9 @@ private void handleResponseChunk(Stream stream, ByteBuf chunk) {
Buffer buff = BufferInternal.safeBuffer(chunk);
int len = buff.length();
stream.bytesRead += len;
stream.handleChunk(buff);
if (!stream.reset) {
stream.handleChunk(buff);
}
}

private void handleResponseEnd(Stream stream, LastHttpContent trailer) {
Expand Down Expand Up @@ -908,7 +917,9 @@ private void handleResponseEnd(Stream stream, LastHttpContent trailer) {
checkLifecycle();
}
lastResponseReceivedTimestamp = System.currentTimeMillis();
stream.handleEnd(trailer);
if (!stream.reset) {
stream.handleEnd(trailer);
}
if (stream.requestEnded) {
stream.handleClosed(null);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ abstract class VertxHttp2Stream<C extends Http2ConnectionBase> {
private long bytesWritten;
protected boolean isConnect;
private Throwable failure;
private long reset = -1L;

VertxHttp2Stream(C conn, ContextInternal context) {
this.conn = conn;
Expand Down Expand Up @@ -126,6 +127,7 @@ void onException(Throwable cause) {
}

void onReset(long code) {
reset = code;
context.emit(code, this::handleReset);
}

Expand Down Expand Up @@ -246,6 +248,12 @@ public void cancel(Throwable cause) {
}

void doWriteHeaders(Http2Headers headers, boolean end, boolean checkFlush, Promise<Void> promise) {
if (reset != -1L) {
if (promise != null) {
promise.fail("Stream reset");
}
return;
}
if (end) {
endWritten();
}
Expand Down Expand Up @@ -273,6 +281,10 @@ public void cancel(Throwable cause) {
}

void doWriteData(ByteBuf buf, boolean end, Promise<Void> promise) {
if (reset != -1L) {
promise.fail("Stream reset");
return;
}
ByteBuf chunk;
if (buf == null && end) {
chunk = Unpooled.EMPTY_BUFFER;
Expand All @@ -289,6 +301,9 @@ void doWriteData(ByteBuf buf, boolean end, Promise<Void> promise) {
}

final Future<Void> writeReset(long code) {
if (code < 0L) {
throw new IllegalArgumentException("Invalid reset code value");
}
Promise<Void> promise = context.promise();
EventLoop eventLoop = conn.context().nettyEventLoop();
if (eventLoop.inEventLoop()) {
Expand All @@ -300,6 +315,11 @@ final Future<Void> writeReset(long code) {
}

protected void doWriteReset(long code, Promise<Void> promise) {
if (reset != -1L) {
promise.fail("Stream already reset");
return;
}
reset = code;
int streamId;
synchronized (this) {
streamId = stream != null ? stream.id() : -1;
Expand Down
14 changes: 14 additions & 0 deletions vertx-core/src/test/java/io/vertx/tests/http/HttpTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -5863,6 +5863,20 @@ public void testResetClientRequestResponseInProgress() throws Exception {
await();
}

@Test
public void testResetPartialClientRequest() throws Exception {
server.requestHandler(req -> {
});
startServer(testAddress);
client.request(requestOptions).onComplete(onSuccess(req -> {
assertTrue(req.reset().succeeded());
req.end("body").onComplete(onFailure(err -> {
testComplete();
}));
}));
await();
}

@Test
public void testSimpleCookie() throws Exception {
testCookies("foo=bar", req -> {
Expand Down
Loading