Skip to content

Commit

Permalink
fix: Core returns chunks of streaming response in a wrong order #420 (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
astsiapanay authored Aug 5, 2024
1 parent b42b9b1 commit 2cae280
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 5 deletions.
11 changes: 7 additions & 4 deletions src/main/java/com/epam/aidial/core/util/BufferingReadStream.java
Original file line number Diff line number Diff line change
Expand Up @@ -143,18 +143,21 @@ private synchronized void handleChunk(Buffer chunk) {
}
if (eventStreamParser != null) {
// build chain of chunk futures: the chunks should be sent in the same order as they arrive
Future<Boolean> future = eventStreamParser.parse(chunk)
.andThen(result -> handleStreamEvent(chunk, result.result() == Boolean.TRUE, pos));
if (streamHandlerFuture == null) {
streamHandlerFuture = future;
streamHandlerFuture = parseChunk(chunk, pos);
} else {
streamHandlerFuture = streamHandlerFuture.transform(result -> future);
streamHandlerFuture = streamHandlerFuture.transform(ignore -> parseChunk(chunk, pos));
}
} else {
notifyOnChunk(chunk);
}
}

private synchronized Future<Boolean> parseChunk(Buffer chunk, int pos) {
return eventStreamParser.parse(chunk)
.andThen(result -> handleStreamEvent(chunk, result.result() == Boolean.TRUE, pos));
}

private synchronized void handleStreamEvent(Buffer chunk, boolean isLastChunk, int pos) {
if (isLastChunk) {
if (lastChunkPos == -1) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,8 @@ public synchronized Future<Boolean> parse(Buffer chunk) {
if (futures == null) {
return Future.succeededFuture(lastChunk);
}
return Future.join(futures).transform(ignore -> Future.succeededFuture(lastChunk));
boolean localLastChunk = lastChunk;
return Future.join(futures).transform(ignore -> Future.succeededFuture(localLastChunk));
}

private void handleEventStage(Buffer chunk) {
Expand Down

0 comments on commit 2cae280

Please sign in to comment.