diff --git a/src/cpp/include/openvino/genai/generation_handle.hpp b/src/cpp/include/openvino/genai/generation_handle.hpp index 8d00ae0e9b..ed650217a4 100644 --- a/src/cpp/include/openvino/genai/generation_handle.hpp +++ b/src/cpp/include/openvino/genai/generation_handle.hpp @@ -74,6 +74,8 @@ class OPENVINO_GENAI_EXPORTS GenerationHandleImpl { bool can_read(); + void drop(); + GenerationOutputs back(); // Reads result of a generation for single iteration GenerationOutputs read(); @@ -81,5 +83,5 @@ class OPENVINO_GENAI_EXPORTS GenerationHandleImpl { std::vector read_all(); }; -using GenerationHandle = std::unique_ptr; +using GenerationHandle = std::shared_ptr; } diff --git a/src/cpp/src/continuous_batching_pipeline.cpp b/src/cpp/src/continuous_batching_pipeline.cpp index a66a88cad4..887da9b7b9 100644 --- a/src/cpp/src/continuous_batching_pipeline.cpp +++ b/src/cpp/src/continuous_batching_pipeline.cpp @@ -65,6 +65,9 @@ class ContinuousBatchingPipeline::Impl { while (requests_iterator != m_requests.end()) { const auto& request = *requests_iterator; if(request->has_finished() || request->out_of_memory() || request->handle_dropped()) { + // Notify the last time even if there will be no results + // This causes read_all() to unblock in all situations + request->notify_handle(); for (const auto& sequence: request->get_sequences()) { m_scheduler->free_sequence(sequence->get_id()); } @@ -136,7 +139,7 @@ class ContinuousBatchingPipeline::Impl { std::lock_guard lock{m_awaiting_requests_mutex}; m_awaiting_requests.push_back(sequence_group); } - return std::make_unique(sequence_group->get_generation_stream(), sampling_params); + return std::make_shared(sequence_group->get_generation_stream(), sampling_params); } GenerationHandle add_request(uint64_t request_id, const std::string& prompt, ov::genai::GenerationConfig sampling_params) { diff --git a/src/cpp/src/generation_handle.cpp b/src/cpp/src/generation_handle.cpp index 26cc12604f..50e24d5b86 100644 --- a/src/cpp/src/generation_handle.cpp +++ b/src/cpp/src/generation_handle.cpp @@ -9,7 +9,7 @@ using namespace ov::genai; GenerationHandleImpl::~GenerationHandleImpl() { - m_generation_stream->drop(); + drop(); } GenerationStatus GenerationHandleImpl::get_status() { @@ -20,6 +20,10 @@ bool GenerationHandleImpl::can_read() { return m_generation_stream->can_read(); } +void GenerationHandleImpl::drop() { + m_generation_stream->drop(); +} + std::unordered_map GenerationHandleImpl::back() { return m_generation_stream->back(); } diff --git a/src/cpp/src/sequence_group.hpp b/src/cpp/src/sequence_group.hpp index d5b9506b2c..71f4e0632c 100644 --- a/src/cpp/src/sequence_group.hpp +++ b/src/cpp/src/sequence_group.hpp @@ -477,7 +477,7 @@ class SequenceGroup { } // For beam search streaming is not available, so we notify only upon finishing if(m_sampling_params.is_beam_search()) { - if (has_finished() || out_of_memory()) { + if (has_finished() || out_of_memory() || handle_dropped()) { push_outputs(); } } else if (m_sampling_params.is_greedy_decoding() || m_sampling_params.is_multinomial()) {