diff --git a/src/main/java/com/epam/aidial/core/util/BufferingReadStream.java b/src/main/java/com/epam/aidial/core/util/BufferingReadStream.java index 4e84a868c..ee7421215 100644 --- a/src/main/java/com/epam/aidial/core/util/BufferingReadStream.java +++ b/src/main/java/com/epam/aidial/core/util/BufferingReadStream.java @@ -143,18 +143,21 @@ private synchronized void handleChunk(Buffer chunk) { } if (eventStreamParser != null) { // build chain of chunk futures: the chunks should be sent in the same order as they arrive - Future future = eventStreamParser.parse(chunk) - .andThen(result -> handleStreamEvent(chunk, result.result() == Boolean.TRUE, pos)); if (streamHandlerFuture == null) { - streamHandlerFuture = future; + streamHandlerFuture = parseChunk(chunk, pos); } else { - streamHandlerFuture = streamHandlerFuture.transform(result -> future); + streamHandlerFuture = streamHandlerFuture.transform(ignore -> parseChunk(chunk, pos)); } } else { notifyOnChunk(chunk); } } + private synchronized Future parseChunk(Buffer chunk, int pos) { + return eventStreamParser.parse(chunk) + .andThen(result -> handleStreamEvent(chunk, result.result() == Boolean.TRUE, pos)); + } + private synchronized void handleStreamEvent(Buffer chunk, boolean isLastChunk, int pos) { if (isLastChunk) { if (lastChunkPos == -1) { diff --git a/src/main/java/com/epam/aidial/core/util/EventStreamParser.java b/src/main/java/com/epam/aidial/core/util/EventStreamParser.java index 384b709de..bc4a01cd4 100644 --- a/src/main/java/com/epam/aidial/core/util/EventStreamParser.java +++ b/src/main/java/com/epam/aidial/core/util/EventStreamParser.java @@ -85,7 +85,8 @@ public synchronized Future parse(Buffer chunk) { if (futures == null) { return Future.succeededFuture(lastChunk); } - return Future.join(futures).transform(ignore -> Future.succeededFuture(lastChunk)); + boolean localLastChunk = lastChunk; + return Future.join(futures).transform(ignore -> Future.succeededFuture(localLastChunk)); } private void handleEventStage(Buffer chunk) {