[CB] Split token streaming and generation to different threads for all CB based pipelines #1544
base: master
Conversation
src/cpp/src/generation_stream.hpp
Outdated
@@ -38,7 +38,7 @@ class GenerationStream {
     }

     bool can_read() {
-        return !m_output_queue.empty();
+        return !m_output_queue.empty() && !m_output_queue.full();
Could you please explain this? Logically, we can read from the queue even if it's full.
To read each element only once. Otherwise, we could take the same element several times (and print it several times).
I still don't understand. If the stream is full and we cannot read from it, it will always stay full.
Shouldn't the original issue be fixed somewhere else?
No, it will only be full while the generation stream has not been updated. Please check src/cpp/src/synchronized_queue.hpp for the details.
This is a good place to handle it.
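For reference, here is a minimal sketch of a bounded synchronized queue with the empty()/full() semantics discussed above. The class name, capacity policy, and method set are assumptions for illustration only; this is not the actual src/cpp/src/synchronized_queue.hpp.

```cpp
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <queue>

// Hypothetical bounded thread-safe queue used to illustrate empty()/full() checks.
template <typename T>
class BoundedSyncQueue {
    std::queue<T> m_queue;
    mutable std::mutex m_mutex;
    std::condition_variable m_cv;
    std::size_t m_capacity;

public:
    explicit BoundedSyncQueue(std::size_t capacity) : m_capacity(capacity) {}

    bool empty() const {
        std::lock_guard<std::mutex> lock(m_mutex);
        return m_queue.empty();
    }

    bool full() const {
        std::lock_guard<std::mutex> lock(m_mutex);
        return m_queue.size() >= m_capacity;
    }

    void push(T value) {
        {
            std::lock_guard<std::mutex> lock(m_mutex);
            m_queue.push(std::move(value));
        }
        m_cv.notify_one();
    }

    // Blocks until an element is available, then removes and returns it,
    // so each element is consumed exactly once.
    T pull() {
        std::unique_lock<std::mutex> lock(m_mutex);
        m_cv.wait(lock, [this] { return !m_queue.empty(); });
        T value = std::move(m_queue.front());
        m_queue.pop();
        return value;
    }
};
```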
         stream_tokens();
     });

     while (!generation->is_dropped() && has_active_request) {
step() will drop all non-running requests via _free_non_running_requests() (including dropped ones). Similarly, drop_requests() will remove all requests. So !generation->is_dropped() does not seem to be required here.
If it's needed to handle generation->drop() from the stream_tokens lambda, then it's undefined behavior, because a handle can be dropped right after this condition has passed.
It looks like the handle's status (dropped or not) is really a critical resource and should be used safely between the main and streamer threads. So we need to ensure that the current step() works correctly if the handle is dropped.
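To illustrate the check-then-act race described above, here is a self-contained toy sketch. Handle and the thread bodies are hypothetical stand-ins, not the pipeline classes: the main thread's check of is_dropped() and the following step() are two separate actions, so the streamer thread can call drop() in between.

```cpp
#include <atomic>
#include <thread>

// Toy stand-in for the generation handle shared between the two threads.
struct Handle {
    std::atomic<bool> dropped{false};
    bool is_dropped() const { return dropped.load(); }
    void drop() { dropped.store(true); }
};

int main() {
    Handle generation;

    // Streamer thread: may decide to stop generation at any moment.
    std::thread streamer([&] {
        generation.drop();
    });

    // Main thread: the check and the "step" are not one atomic action,
    // so drop() can land right after the check -- step() must tolerate
    // a handle that was dropped mid-iteration.
    while (!generation.is_dropped()) {
        /* step() would run here even if drop() happened just after the check */
    }

    streamer.join();
    return 0;
}
```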
FYI, in PR #1594 I've tried to remove the misuse of handle_dropped() (see the method num_finished_seqs()) to ensure step() is not affected by handle_drop() during the scheduling, model runner, and sampler phases. Now it's safe to drop() a request and step() will keep working.
@@ -299,8 +305,38 @@ ContinuousBatchingPipeline::ContinuousBatchingImpl::generate(const std::vector<o
     }
     auto all_requests = m_awaiting_requests; // we need to store all requests to get results from them once generation has finished

-    bool continue_generation = true;
-    while (has_non_finished_requests() && continue_generation) {
+    std::atomic<bool> has_active_request = has_non_finished_requests();
-    std::atomic<bool> has_active_request = has_non_finished_requests();
+    std::atomic<bool> has_active_requests = has_non_finished_requests();
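For context on the overall pattern under review, below is a self-contained, simplified sketch of splitting generation and token streaming across two threads with an atomic flag and a condition variable. The names and the plain int queue are hypothetical stand-ins for the pipeline's generation stream; this is not the PR code itself.

```cpp
#include <atomic>
#include <condition_variable>
#include <iostream>
#include <mutex>
#include <queue>
#include <thread>

int main() {
    std::queue<int> tokens;                    // stands in for the generation stream
    std::mutex mtx;
    std::condition_variable cv;
    std::atomic<bool> has_active_requests{true};

    // Streaming thread: waits for tokens or for generation to finish.
    std::thread t_stream([&] {
        std::unique_lock<std::mutex> lock(mtx);
        while (true) {
            cv.wait(lock, [&] { return !tokens.empty() || !has_active_requests; });
            while (!tokens.empty()) {          // drain everything that is ready
                std::cout << "token " << tokens.front() << '\n';
                tokens.pop();
            }
            if (!has_active_requests)
                break;                         // queue drained and generation finished
        }
    });

    // Generation (main) thread: each "step" produces a token and wakes the streamer.
    for (int step = 0; step < 5; ++step) {
        {
            std::lock_guard<std::mutex> lock(mtx);
            tokens.push(step);
        }
        cv.notify_one();
    }
    {
        std::lock_guard<std::mutex> lock(mtx); // publish "finished" under the lock
        has_active_requests = false;           // so the waiter cannot miss the wakeup
    }
    cv.notify_one();

    t_stream.join();
    return 0;
}
```

The token pushes and the final flag update happen under the mutex so the waiting thread cannot miss a wakeup; with a single waiter, notify_one() is enough.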
     };

     // to define streaming thread
     std::thread t_stream([&stream_tokens] {
If streamer_ptr is nullptr, we don't need this thread.
     for (const auto& gen_token : token.begin()->second.generated_ids) {
         if (streamer_ptr->put(gen_token)) {
             generation->drop();
             cv.notify_all();
Who is notified here? cv.wait() is used only in the current thread, so this notification does not seem to be required.
     while (!generation->is_dropped() && (has_active_request || streamer_ptr && generation->can_read())) {
         // waiting for any tokens or request finishing
         cv.wait(lock, [&generation, &has_active_request]{ return generation->can_read() || !has_active_request; });
         if (streamer_ptr && generation->can_read()) {
-        if (streamer_ptr && generation->can_read()) {
+        if (generation->can_read()) {
Let's avoid creating the thread at all if the streamer is nullptr.
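One possible shape for that, sketched with hypothetical names and types (std::optional<std::thread> is just one way to express an optionally created thread; the real PR may do this differently):

```cpp
#include <functional>
#include <optional>
#include <thread>

// Hypothetical helper: spawn the streaming thread only when a streamer exists.
void run_with_optional_streamer(bool has_streamer,
                                const std::function<void()>& stream_tokens,
                                const std::function<void()>& generation_loop) {
    std::optional<std::thread> t_stream;
    if (has_streamer) {                      // no streamer -> no streaming thread
        t_stream.emplace(stream_tokens);
    }

    generation_loop();                       // main thread keeps running step()

    if (t_stream && t_stream->joinable()) {
        t_stream->join();                    // only join if the thread was created
    }
}
```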
         throw;
     }
     has_active_request = has_non_finished_requests();
     cv.notify_all();
-    cv.notify_all();
+    cv.notify_one();
We have only one waiting thread, so notify_one() is sufficient.
Notes:
Merge after: https://github.com/openvinotoolkit/openvino.genai/pull/1594/files
Ticket:
Results (CPU):