diff --git a/src/cpp/src/continuous_batching_impl.cpp b/src/cpp/src/continuous_batching_impl.cpp
index 4877860442..4c035fbd7b 100644
--- a/src/cpp/src/continuous_batching_impl.cpp
+++ b/src/cpp/src/continuous_batching_impl.cpp
@@ -156,13 +156,6 @@ void ContinuousBatchingPipeline::ContinuousBatchingImpl::step() {
         m_pipeline_metrics.max_cache_usage = std::max(m_pipeline_metrics.max_cache_usage, scheduler_output.m_cache_usage);
         _register_step_cache_usage(scheduler_output.m_cache_usage);
         m_pipeline_metrics.avg_cache_usage = _get_current_running_average_cache_usage();
-
-        m_batch_size = 0; // total number of running sequences
-        for (size_t i = 0; i < scheduler_output.m_scheduled_sequence_groups_ids.size(); ++i) {
-            size_t seq_group_id = scheduler_output.m_scheduled_sequence_groups_ids[i];
-            SequenceGroup::CPtr sequence_group = m_requests[seq_group_id];
-            m_batch_size += sequence_group->num_running_seqs();
-        }
     }

     // if no tokens were scheduled, we are out of memory => free all requests and return
@@ -210,6 +203,7 @@ void ContinuousBatchingPipeline::ContinuousBatchingImpl::step() {
         static ManualTimer timer("sample");
         timer.start();
         sampler_output = m_sampler->sample(m_requests, logits, m_is_validation_mode_enabled);
+        m_batch_size = sampler_output.num_generated_tokens;
         timer.end();
     }
diff --git a/src/cpp/src/continuous_batching_impl.hpp b/src/cpp/src/continuous_batching_impl.hpp
index 8980038f73..7e2480e5b0 100644
--- a/src/cpp/src/continuous_batching_impl.hpp
+++ b/src/cpp/src/continuous_batching_impl.hpp
@@ -31,7 +31,7 @@ class ContinuousBatchingPipeline::ContinuousBatchingImpl : public ContinuousBatc
     // for perf metrics
     float m_load_time_ms = 0.0f;
-    size_t m_batch_size = 0; // stored number of scheduled sequences on last step
+    size_t m_batch_size = 0; // stored number of processed tokens on last step

     // flag to enable validation mode for sampler
     bool m_is_validation_mode_enabled = false;
diff --git a/src/cpp/src/prompt_lookup/continuous_batching_for_prompt_lookup.cpp b/src/cpp/src/prompt_lookup/continuous_batching_for_prompt_lookup.cpp
index d01c863549..aa4ea8a53a 100644
--- a/src/cpp/src/prompt_lookup/continuous_batching_for_prompt_lookup.cpp
+++ b/src/cpp/src/prompt_lookup/continuous_batching_for_prompt_lookup.cpp
@@ -91,4 +91,8 @@ std::vector ContinuousBatchingPipeline::ContinuousBatchingFo
     return m_awaiting_requests;
 }

+size_t ContinuousBatchingPipeline::ContinuousBatchingForPromptLookupImpl::get_processed_tokens_per_iteration() {
+    return m_batch_size;
+}
+
 }
\ No newline at end of file
diff --git a/src/cpp/src/prompt_lookup/continuous_batching_for_prompt_lookup.hpp b/src/cpp/src/prompt_lookup/continuous_batching_for_prompt_lookup.hpp
index fc4942701e..98b2d71586 100644
--- a/src/cpp/src/prompt_lookup/continuous_batching_for_prompt_lookup.hpp
+++ b/src/cpp/src/prompt_lookup/continuous_batching_for_prompt_lookup.hpp
@@ -37,6 +37,8 @@ class ContinuousBatchingPipeline::ContinuousBatchingForPromptLookupImpl : public
     bool is_requests_empty();
     std::vector<SequenceGroup::Ptr> get_awaiting_requests();

+    size_t get_processed_tokens_per_iteration();
+
     using ContinuousBatchingPipeline::ContinuousBatchingImpl::drop_requests;

 protected:
     TokenIds generate_candidates(const TokenIds& input_ids, size_t num_pred_tokens, size_t max_ngram_size);
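Note on the change above: m_batch_size used to count running sequences per scheduler step; it now takes the token count reported by the sampler, because with prompt lookup and speculative decoding a single sequence can have several candidate tokens validated in one step(). A minimal standalone sketch of the difference (illustrative names only, not the pipeline code):

    // counting tokens instead of sequences per step
    #include <cstddef>
    #include <iostream>
    #include <vector>

    struct FakeSamplerOutput {                // stand-in for the SamplerOutput field added below
        std::size_t num_generated_tokens = 0;
    };

    int main() {
        // tokens processed for each running sequence during one step()
        std::vector<std::size_t> tokens_per_sequence = {1, 3, 2};

        FakeSamplerOutput out;
        for (std::size_t n : tokens_per_sequence)
            out.num_generated_tokens += n;    // what the sampler now accumulates

        std::cout << "sequences: " << tokens_per_sequence.size()         // old metric: 3
                  << ", tokens this step: " << out.num_generated_tokens  // new metric: 6
                  << std::endl;
    }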
diff --git a/src/cpp/src/prompt_lookup/prompt_lookup_impl.cpp b/src/cpp/src/prompt_lookup/prompt_lookup_impl.cpp
index 41c3e6370f..9eb54f700c 100644
--- a/src/cpp/src/prompt_lookup/prompt_lookup_impl.cpp
+++ b/src/cpp/src/prompt_lookup/prompt_lookup_impl.cpp
@@ -29,6 +29,11 @@ bool ContinuousBatchingPipeline::PromptLookupImpl::has_non_finished_requests() {
 }

 void ContinuousBatchingPipeline::PromptLookupImpl::step() {
+    auto& raw_perf_counters = m_perf_metrics.raw_metrics;
+
+    ManualTimer step_timer("prompt_lookup_decoding: step()");
+    step_timer.start();
+
     ManualTimer candidates_timer("prompt_lookup_decoding: generate_candidates()");
     candidates_timer.start();
     m_pipeline->generate_candidates();
@@ -36,7 +41,7 @@ void ContinuousBatchingPipeline::PromptLookupImpl::step() {
     m_sd_metrics.draft_duration += candidates_timer.get_duration();
     auto generated_len_before = m_pipeline->get_generated_request_len();

-    ManualTimer main_timer("prompt_lookup_decoding: step()");
+    ManualTimer main_timer("prompt_lookup_decoding: pipeline: step()");
     main_timer.start();
     m_pipeline->step();
     main_timer.end();
@@ -63,6 +68,18 @@ void ContinuousBatchingPipeline::PromptLookupImpl::step() {
         m_sd_metrics.update_draft_accepted_tokens(request_id, num_matches);
     }

+    // update perf metrics
+    const auto num_generated_tokens = m_pipeline->get_processed_tokens_per_iteration();
+    if (num_generated_tokens > 0) {
+        raw_perf_counters.m_batch_sizes.emplace_back(num_generated_tokens);
+
+        auto infer_duration = step_timer.get_duration_microsec();
+
+        raw_perf_counters.m_token_infer_durations.emplace_back(infer_duration);
+        raw_perf_counters.m_inference_durations[0] += MicroSeconds(infer_duration);
+        raw_perf_counters.m_new_token_times.emplace_back(main_timer.get_end_time());
+    }
+
     if (generated_len_after.empty() && 0) {
         m_sd_metrics.print(true);
         m_sd_metrics.clean_up();
@@ -73,6 +90,9 @@ std::vector ContinuousBatchingPipeline::PromptLookupImpl::generate(const std::vector& input_ids,
                                                                    const std::vector<GenerationConfig>& sampling_params,
                                                                    const StreamerVariant& streamer) {
+    m_perf_metrics = PerfMetrics();
+    m_perf_metrics.raw_metrics.m_inference_durations = {{ MicroSeconds(0.0f) }};
+
     OPENVINO_ASSERT(!has_non_finished_requests(),
                     "Generate cannot be called while ContinuousBatchingPipeline is already in running state. Use ContinuousBatchingPipeline::add_request");
     OPENVINO_ASSERT(input_ids.size() == sampling_params.size());
@@ -173,6 +193,13 @@ ContinuousBatchingPipeline::PromptLookupImpl::generate(const std::vector<
         }
         result.m_status = main_generations[request_id]->get_status();
+
+        // The same perf metrics for each sequence, only tokenization/detokenization will differ.
+        m_perf_metrics.raw_metrics.generate_durations.clear();
+        m_perf_metrics.raw_metrics.generate_durations.emplace_back(generate_timer.get_duration_microsec());
+        m_perf_metrics.num_input_tokens = request->get_prompt_len();
+        m_perf_metrics.evaluate_statistics(generate_timer.get_start_time());
+        result.perf_metrics = m_perf_metrics;

         results.push_back(std::move(result));
     }
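The three raw counters filled per step above (m_batch_sizes, m_token_infer_durations, m_new_token_times) are what evaluate_statistics(), called at the end of generate(), aggregates into the reported metrics. A rough standalone sketch of the kind of reduction they enable (illustrative only, not the ov::genai::PerfMetrics implementation):

    #include <chrono>
    #include <cstddef>
    #include <iostream>
    #include <numeric>
    #include <vector>

    int main() {
        using MicroSeconds = std::chrono::duration<float, std::micro>;

        // one entry per step() that produced tokens
        std::vector<std::size_t> batch_sizes = {6, 5, 7};                      // tokens per step
        std::vector<MicroSeconds> step_durations = {MicroSeconds(12000.f),
                                                    MicroSeconds(11000.f),
                                                    MicroSeconds(13000.f)};

        std::size_t total_tokens = std::accumulate(batch_sizes.begin(), batch_sizes.end(), std::size_t{0});
        float total_us = 0.f;
        for (const auto& d : step_durations)
            total_us += d.count();

        std::cout << "throughput: " << total_tokens / (total_us / 1e6f) << " tokens/s\n";
        std::cout << "mean step latency: " << (total_us / step_durations.size()) / 1000.f << " ms\n";
    }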
diff --git a/src/cpp/src/prompt_lookup/prompt_lookup_impl.hpp b/src/cpp/src/prompt_lookup/prompt_lookup_impl.hpp
index 1499bcc76e..0535931d81 100644
--- a/src/cpp/src/prompt_lookup/prompt_lookup_impl.hpp
+++ b/src/cpp/src/prompt_lookup/prompt_lookup_impl.hpp
@@ -15,6 +15,7 @@ class ContinuousBatchingPipeline::PromptLookupImpl : public ContinuousBatchingPi
 protected:
     std::shared_ptr<ContinuousBatchingForPromptLookupImpl> m_pipeline;
     SpeculativeDecodingMetrics m_sd_metrics;
+    PerfMetrics m_perf_metrics;

     void drop_requests();
diff --git a/src/cpp/src/sampler.cpp b/src/cpp/src/sampler.cpp
index 827309724e..4c399ab641 100644
--- a/src/cpp/src/sampler.cpp
+++ b/src/cpp/src/sampler.cpp
@@ -250,6 +250,7 @@ void Sampler::GroupBeamSearcher::select_next_tokens(const ov::Tensor& logits,
     for (Group& group : m_groups) {
         if (!group.done) {
             for (Beam& beam : group.ongoing) {
+                sampler_output.num_generated_tokens++;
                 uint64_t parent_seq_id = beam.m_sequence->get_id();

                 // here we need to map index of sequence in beam search group(s) and sequence group
@@ -793,6 +794,7 @@ SamplerOutput Sampler::sample(const std::vector & sequence_g
     bool is_validation_passed = true;
     // make `num_tokens_to_process` iteration to validate a candidate generated by `draft_model` + 1 iteration to generate one more token by `main_model`
     for (size_t i = 0; i <= num_tokens_to_process; ++i) {
+        sampler_output.num_generated_tokens++;
         // calculate token offset from the end of logit
         size_t token_offset = num_tokens_to_process - i;
         // max counter of needed to be sampled tokens
diff --git a/src/cpp/src/sampler.hpp b/src/cpp/src/sampler.hpp
index ca8937cb60..73c656a41d 100644
--- a/src/cpp/src/sampler.hpp
+++ b/src/cpp/src/sampler.hpp
@@ -38,6 +38,8 @@ struct SamplerOutput {
     // IDs of sequences that need to be forked (note, the same sequence can be forked multiple times)
     // it will later be used by scheduler to fork block_tables for child sequences
     std::unordered_map<uint64_t, std::list<uint64_t>> m_forked_sequences;
+    // store number of generated_tokens
+    size_t num_generated_tokens = 0;
 };

 class Sampler {
diff --git a/src/cpp/src/speculative_decoding/continuous_batching_for_speculative_decoding_impl.cpp b/src/cpp/src/speculative_decoding/continuous_batching_for_speculative_decoding_impl.cpp
index dccc633d4d..bec2b75e0d 100644
--- a/src/cpp/src/speculative_decoding/continuous_batching_for_speculative_decoding_impl.cpp
+++ b/src/cpp/src/speculative_decoding/continuous_batching_for_speculative_decoding_impl.cpp
@@ -292,6 +292,10 @@ std::vector ContinuousBatchingPipeline::ContinuousBatchingFo
     return m_awaiting_requests;
 }

+size_t ContinuousBatchingPipeline::ContinuousBatchingForSpeculativeDecodingImpl::get_processed_tokens_per_iteration() {
+    return m_batch_size;
+}
+
 void ContinuousBatchingPipeline::ContinuousBatchingForSpeculativeDecodingImpl::pull_awaiting_requests(bool is_pause_request) {
     std::lock_guard<std::mutex> lock{m_awaiting_requests_mutex};
diff --git a/src/cpp/src/speculative_decoding/continuous_batching_for_speculative_decoding_impl.hpp b/src/cpp/src/speculative_decoding/continuous_batching_for_speculative_decoding_impl.hpp
index 3777d9b87b..e4e4be63d8 100644
--- a/src/cpp/src/speculative_decoding/continuous_batching_for_speculative_decoding_impl.hpp
+++ b/src/cpp/src/speculative_decoding/continuous_batching_for_speculative_decoding_impl.hpp
@@ -32,6 +32,8 @@ class ContinuousBatchingPipeline::ContinuousBatchingForSpeculativeDecodingImpl :
     bool is_requests_empty();
     std::vector<SequenceGroup::Ptr> get_awaiting_requests();

+    size_t get_processed_tokens_per_iteration();
+
     UpdateRequestResult init_request_by_candidate(uint64_t request_id, const GeneratedSequences& candidates);

 protected:
diff --git a/src/cpp/src/speculative_decoding/speculative_decoding_impl.cpp b/src/cpp/src/speculative_decoding/speculative_decoding_impl.cpp
index 5483523698..7a6066fc5c 100644
--- a/src/cpp/src/speculative_decoding/speculative_decoding_impl.cpp
+++ b/src/cpp/src/speculative_decoding/speculative_decoding_impl.cpp
@@ -131,6 +131,12 @@ void print_generated_request(const ov::genai::GeneratedRequests& requests) {
 void ContinuousBatchingPipeline::SpeculativeDecodingImpl::step() {
     // this blocks adding new requests during step as it may break coherence between main and draft models
     std::lock_guard<std::mutex> lock{m_draft_generations_mutex};
+
+    auto& raw_perf_counters = m_perf_metrics.raw_metrics;
+
+    ManualTimer step_timer("speculative_decoding: step()");
+    step_timer.start();
+
     m_draft_pipeline->pull_awaiting_requests(true);
     m_main_pipeline->pull_awaiting_requests();
@@ -182,6 +188,18 @@ void ContinuousBatchingPipeline::SpeculativeDecodingImpl::step() {
         m_sd_metrics.update_draft_accepted_tokens(request_id, (updated_seq_info.inserted_tokens_cnt - updated_seq_info.removed_tokens_cnt));
     }

+    // update perf metrics
+    const auto num_generated_tokens = m_main_pipeline->get_processed_tokens_per_iteration();
+    if (num_generated_tokens > 0) {
+        auto infer_duration = step_timer.get_duration_microsec();
+
+        raw_perf_counters.m_token_infer_durations.emplace_back(infer_duration);
+        raw_perf_counters.m_inference_durations[0] += MicroSeconds(infer_duration);
+        raw_perf_counters.m_new_token_times.emplace_back(main_timer.get_end_time());
+
+        raw_perf_counters.m_batch_sizes.emplace_back(num_generated_tokens);
+    }
+
     if (main_generated_requests.empty() && 0) {
         m_sd_metrics.print(true);
         m_sd_metrics.clean_up();
@@ -192,6 +210,9 @@ std::vector ContinuousBatchingPipeline::SpeculativeDecodingImpl::generate(const std::vector& input_ids,
                                                                           const std::vector<GenerationConfig>& sampling_params,
                                                                           const StreamerVariant& streamer) {
+    m_perf_metrics = PerfMetrics();
+    m_perf_metrics.raw_metrics.m_inference_durations = {{ MicroSeconds(0.0f) }};
+
     OPENVINO_ASSERT(!has_non_finished_requests(),
                     "Generate cannot be called while ContinuousBatchingPipeline is already in running state. Use ContinuousBatchingPipeline::add_request");
     OPENVINO_ASSERT(input_ids.size() == sampling_params.size());
@@ -273,6 +294,8 @@ ContinuousBatchingPipeline::SpeculativeDecodingImpl::generate(const std::vector<
     std::vector<EncodedGenerationResult> results;
     results.reserve(all_requests.size());

+    generate_timer.end();
+
     for (size_t request_id = 0; request_id < all_requests.size(); ++request_id) {
         const auto& request = all_requests[request_id];
         auto sampling_params = request->get_sampling_parameters();
@@ -297,6 +320,14 @@ ContinuousBatchingPipeline::SpeculativeDecodingImpl::generate(const std::vector<
         }
         result.m_status = main_generations[request_id]->get_status();
+
+        // The same perf metrics for each sequence, only tokenization/detokenization will differ.
+        m_perf_metrics.raw_metrics.generate_durations.clear();
+        m_perf_metrics.raw_metrics.generate_durations.emplace_back(generate_timer.get_duration_microsec());
+        m_perf_metrics.num_input_tokens = request->get_prompt_len();
+        m_perf_metrics.evaluate_statistics(generate_timer.get_start_time());
+
+        result.perf_metrics = m_perf_metrics;

         results.push_back(std::move(result));
     }
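With generate() now stamping perf_metrics onto every result, callers can read the usual aggregate numbers after a speculative or prompt-lookup run. A usage sketch, assuming the public LLMPipeline / PerfMetrics API; the model directory and prompt are placeholders:

    #include "openvino/genai/llm_pipeline.hpp"
    #include <iostream>

    int main() {
        ov::genai::LLMPipeline pipe("model_dir", "CPU");     // "model_dir" is a placeholder path

        ov::genai::GenerationConfig config;
        config.max_new_tokens = 64;
        auto result = pipe.generate("Why is the sky blue?", config);

        auto& pm = result.perf_metrics;                      // populated by the pipeline impls above
        std::cout << "TTFT:       " << pm.get_ttft().mean       << " ms\n"
                  << "TPOT:       " << pm.get_tpot().mean       << " ms/token\n"
                  << "Throughput: " << pm.get_throughput().mean << " tokens/s\n";
    }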
diff --git a/src/cpp/src/speculative_decoding/speculative_decoding_impl.hpp b/src/cpp/src/speculative_decoding/speculative_decoding_impl.hpp
index 7475d9d766..4023519287 100644
--- a/src/cpp/src/speculative_decoding/speculative_decoding_impl.hpp
+++ b/src/cpp/src/speculative_decoding/speculative_decoding_impl.hpp
@@ -37,7 +37,10 @@ struct ModelDesc {
 class ContinuousBatchingPipeline::SpeculativeDecodingImpl : public ContinuousBatchingPipeline::IContinuousBatchingPipeline {
 protected:
     std::shared_ptr<ContinuousBatchingForSpeculativeDecodingImpl> m_main_pipeline, m_draft_pipeline;
+    // Metrics
     SpeculativeDecodingMetrics m_sd_metrics;
+    PerfMetrics m_perf_metrics;
+
     // Mutex protecting access to m_draft_generations, so add_request and step methods can be called from different threads
     std::mutex m_draft_generations_mutex;
     std::map<uint64_t, GenerationHandle> m_draft_generations;
diff --git a/src/cpp/src/timer.hpp b/src/cpp/src/timer.hpp
index f389e10d5d..588fb4967d 100644
--- a/src/cpp/src/timer.hpp
+++ b/src/cpp/src/timer.hpp
@@ -9,7 +9,7 @@ class ManualTimer {
     double m_total;
-    decltype(std::chrono::steady_clock::now()) m_start;
+    std::chrono::steady_clock::time_point m_start, m_end;
     std::string m_title;

 public:
     ManualTimer(const std::string& title) :
@@ -22,15 +22,27 @@ class ManualTimer {
     }

     void end() {
-        auto m_end = std::chrono::steady_clock::now();
-        m_total += std::chrono::duration<double, std::milli>(m_end - m_start).count();
+        m_end = std::chrono::steady_clock::now();
+        m_total += std::chrono::duration_cast<std::chrono::microseconds>(m_end - m_start).count();
+    }
+
+    std::chrono::steady_clock::time_point get_start_time() {
+        return m_start;
+    }
+
+    std::chrono::steady_clock::time_point get_end_time() {
+        return m_end;
     }

     float get_duration() const {
-        return m_total / 1000.;
+        return m_total / 1e6;
+    }
+
+    float get_duration_microsec() const {
+        return m_total;
     }

     ~ManualTimer() {
-        // std::cout << m_title << ": " << m_total / 1000. << " secs" << std::endl;
+        // std::cout << m_title << ": " << m_total / 1e6 << " secs" << std::endl;
     }
 };
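The get_end_time() accessor added above lets each pipeline push a timestamp into m_new_token_times once per step; differences between consecutive timestamps are the basis for per-token latency statistics. A standalone sketch of that reduction (illustrative names, not the ov::genai internals):

    #include <chrono>
    #include <iostream>
    #include <vector>

    int main() {
        using Clock = std::chrono::steady_clock;
        using MicroSeconds = std::chrono::duration<float, std::micro>;

        // one timestamp per step() that produced tokens, plus the generation start time
        Clock::time_point generation_start = Clock::now();
        std::vector<Clock::time_point> new_token_times = {
            generation_start + std::chrono::milliseconds(120),   // prefill + first token
            generation_start + std::chrono::milliseconds(155),
            generation_start + std::chrono::milliseconds(190),
        };

        // latency of each step; dividing by the matching batch size gives ms per token
        Clock::time_point prev = generation_start;
        for (const auto& t : new_token_times) {
            MicroSeconds step_latency = t - prev;
            std::cout << step_latency.count() / 1000.f << " ms\n";
            prev = t;
        }
    }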