[ Speculative decoding ][ Prompt lookup ] Enable Perf Metrics for assisting pipelines (#1599)

Ticket:
* [CVS-160440](https://jira.devtools.intel.com/browse/CVS-160440)
iefode authored Jan 21, 2025
1 parent 8472e48 commit 2a63618
Showing 13 changed files with 98 additions and 14 deletions.
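
For context, this commit routes per-step token counts and infer durations from the assisting (speculative decoding / prompt lookup) pipelines into `PerfMetrics`, which is then attached to each generation result. Below is a minimal consumer-side sketch; the pipeline constructor arguments, the string-prompt `generate()` overload, the `perf_metrics` field on `GenerationResult`, and the `get_ttft()`/`get_tpot()`/`get_throughput()` accessors are assumed from the public OpenVINO GenAI API rather than taken from this diff.

```cpp
// Illustrative sketch (assumed public API): reading the perf metrics that the
// assisting pipelines now populate.
#include "openvino/genai/continuous_batching_pipeline.hpp"

#include <iostream>
#include <string>
#include <vector>

int main() {
    // Placeholder model path, scheduler config and device.
    // To exercise the assisting pipelines, a draft_model / prompt_lookup
    // property would be passed in the pipeline properties (omitted here).
    ov::genai::SchedulerConfig scheduler_config;
    ov::genai::ContinuousBatchingPipeline pipe("path/to/model_dir", scheduler_config, "CPU");

    ov::genai::GenerationConfig config;
    config.max_new_tokens = 64;

    auto results = pipe.generate({"What is OpenVINO?"}, {config});

    // After this commit, perf_metrics is filled for these pipelines as well.
    const auto& metrics = results[0].perf_metrics;
    std::cout << "TTFT (ms): " << metrics.get_ttft().mean << '\n';
    std::cout << "TPOT (ms): " << metrics.get_tpot().mean << '\n';
    std::cout << "Throughput (tok/s): " << metrics.get_throughput().mean << '\n';
    return 0;
}
```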
8 changes: 1 addition & 7 deletions src/cpp/src/continuous_batching_impl.cpp
@@ -156,13 +156,6 @@ void ContinuousBatchingPipeline::ContinuousBatchingImpl::step() {
m_pipeline_metrics.max_cache_usage = std::max(m_pipeline_metrics.max_cache_usage, scheduler_output.m_cache_usage);
_register_step_cache_usage(scheduler_output.m_cache_usage);
m_pipeline_metrics.avg_cache_usage = _get_current_running_average_cache_usage();

m_batch_size = 0; // total number of running sequences
for (size_t i = 0; i < scheduler_output.m_scheduled_sequence_groups_ids.size(); ++i) {
size_t seq_group_id = scheduler_output.m_scheduled_sequence_groups_ids[i];
SequenceGroup::CPtr sequence_group = m_requests[seq_group_id];
m_batch_size += sequence_group->num_running_seqs();
}
}

// if no tokens were scheduled, we are out of memory => free all requests and return
@@ -210,6 +203,7 @@ void ContinuousBatchingPipeline::ContinuousBatchingImpl::step() {
static ManualTimer timer("sample");
timer.start();
sampler_output = m_sampler->sample(m_requests, logits, m_is_validation_mode_enabled);
m_batch_size = sampler_output.num_generated_tokens;
timer.end();
}

2 changes: 1 addition & 1 deletion src/cpp/src/continuous_batching_impl.hpp
@@ -31,7 +31,7 @@ class ContinuousBatchingPipeline::ContinuousBatchingImpl : public ContinuousBatc

// for perf metrics
float m_load_time_ms = 0.0f;
size_t m_batch_size = 0; // stored number of scheduled sequences on last step
size_t m_batch_size = 0; // stored number of processed tokens on last step

// flag to enable validation mode for sampler
bool m_is_validation_mode_enabled = false;
@@ -91,4 +91,8 @@ std::vector<SequenceGroup::Ptr> ContinuousBatchingPipeline::ContinuousBatchingFo
return m_awaiting_requests;
}

size_t ContinuousBatchingPipeline::ContinuousBatchingForPromptLookupImpl::get_processed_tokens_per_iteration() {
return m_batch_size;
}

}
@@ -37,6 +37,8 @@ class ContinuousBatchingPipeline::ContinuousBatchingForPromptLookupImpl : public
bool is_requests_empty();
std::vector<SequenceGroup::Ptr> get_awaiting_requests();

size_t get_processed_tokens_per_iteration();

using ContinuousBatchingPipeline::ContinuousBatchingImpl::drop_requests;
protected:
TokenIds generate_candidates(const TokenIds& input_ids, size_t num_pred_tokens, size_t max_ngram_size);
29 changes: 28 additions & 1 deletion src/cpp/src/prompt_lookup/prompt_lookup_impl.cpp
@@ -29,14 +29,19 @@ bool ContinuousBatchingPipeline::PromptLookupImpl::has_non_finished_requests() {
}

void ContinuousBatchingPipeline::PromptLookupImpl::step() {
auto& raw_perf_counters = m_perf_metrics.raw_metrics;

ManualTimer step_timer("prompt_lookup_decoding: step()");
step_timer.start();

ManualTimer candidates_timer("prompt_lookup_decoding: generate_candidates()");
candidates_timer.start();
m_pipeline->generate_candidates();
candidates_timer.end();
m_sd_metrics.draft_duration += candidates_timer.get_duration();
auto generated_len_before = m_pipeline->get_generated_request_len();

ManualTimer main_timer("prompt_lookup_decoding: step()");
ManualTimer main_timer("prompt_lookup_decoding: pipeline: step()");
main_timer.start();
m_pipeline->step();
main_timer.end();
@@ -63,6 +68,18 @@ void ContinuousBatchingPipeline::PromptLookupImpl::step() {
m_sd_metrics.update_draft_accepted_tokens(request_id, num_matches);
}

// update perf metrics
const auto num_generated_tokens = m_pipeline->get_processed_tokens_per_iteration();
if (num_generated_tokens > 0) {
raw_perf_counters.m_batch_sizes.emplace_back(num_generated_tokens);

auto infer_duration = step_timer.get_duration_microsec();

raw_perf_counters.m_token_infer_durations.emplace_back(infer_duration);
raw_perf_counters.m_inference_durations[0] += MicroSeconds(infer_duration);
raw_perf_counters.m_new_token_times.emplace_back(main_timer.get_end_time());
}

if (generated_len_after.empty() && 0) {
m_sd_metrics.print(true);
m_sd_metrics.clean_up();
@@ -73,6 +90,9 @@ std::vector<EncodedGenerationResult>
ContinuousBatchingPipeline::PromptLookupImpl::generate(const std::vector<ov::Tensor>& input_ids,
const std::vector<GenerationConfig>& sampling_params,
const StreamerVariant& streamer) {
m_perf_metrics = PerfMetrics();
m_perf_metrics.raw_metrics.m_inference_durations = {{ MicroSeconds(0.0f) }};

OPENVINO_ASSERT(!has_non_finished_requests(), "Generate cannot be called while ContinuousBatchingPipeline is already in running state. Use ContinuousBatchingPipeline::add_request");
OPENVINO_ASSERT(input_ids.size() == sampling_params.size());

@@ -173,6 +193,13 @@ ContinuousBatchingPipeline::PromptLookupImpl::generate(const std::vector<ov::Ten
}

result.m_status = generations[request_id]->get_status();

// The same perf metrics for each sequence, only tokenization/detokenization will differ.
m_perf_metrics.raw_metrics.generate_durations.clear();
m_perf_metrics.raw_metrics.generate_durations.emplace_back(generate_timer.get_duration_microsec());
m_perf_metrics.num_input_tokens = request->get_prompt_len();
m_perf_metrics.evaluate_statistics(generate_timer.get_start_time());

results.push_back(std::move(result));
}

1 change: 1 addition & 0 deletions src/cpp/src/prompt_lookup/prompt_lookup_impl.hpp
@@ -15,6 +15,7 @@ class ContinuousBatchingPipeline::PromptLookupImpl : public ContinuousBatchingPi
protected:
std::shared_ptr<ContinuousBatchingForPromptLookupImpl> m_pipeline;
SpeculativeDecodingMetrics m_sd_metrics;
PerfMetrics m_perf_metrics;

void drop_requests();

2 changes: 2 additions & 0 deletions src/cpp/src/sampler.cpp
@@ -250,6 +250,7 @@ void Sampler::GroupBeamSearcher::select_next_tokens(const ov::Tensor& logits,
for (Group& group : m_groups) {
if (!group.done) {
for (Beam& beam : group.ongoing) {
sampler_output.num_generated_tokens++;
uint64_t parent_seq_id = beam.m_sequence->get_id();

// here we need to map index of sequence in beam search group(s) and sequence group
@@ -793,6 +794,7 @@ SamplerOutput Sampler::sample(const std::vector<SequenceGroup::Ptr> & sequence_g
bool is_validation_passed = true;
// make `num_tokens_to_process` iteration to validate a candidate generated by `draft_model` + 1 iteration to generate one more token by `main_model`
for (size_t i = 0; i <= num_tokens_to_process; ++i) {
sampler_output.num_generated_tokens++;
// calculate token offset from the end of logit
size_t token_offset = num_tokens_to_process - i;
// max counter of needed to be sampled tokens
2 changes: 2 additions & 0 deletions src/cpp/src/sampler.hpp
@@ -38,6 +38,8 @@ struct SamplerOutput {
// IDs of sequences that need to be forked (note, the same sequence can be forked multiple times)
// it will later be used by scheduler to fork block_tables for child sequences
std::unordered_map<uint64_t, std::list<uint64_t>> m_forked_sequences;
// store number of generated_tokens
size_t num_generated_tokens = 0;
};

class Sampler {
@@ -292,6 +292,10 @@ std::vector<SequenceGroup::Ptr> ContinuousBatchingPipeline::ContinuousBatchingFo
return m_awaiting_requests;
}

size_t ContinuousBatchingPipeline::ContinuousBatchingForSpeculativeDecodingImpl::get_processed_tokens_per_iteration() {
return m_batch_size;
}

void
ContinuousBatchingPipeline::ContinuousBatchingForSpeculativeDecodingImpl::pull_awaiting_requests(bool is_pause_request) {
std::lock_guard<std::mutex> lock{m_awaiting_requests_mutex};
@@ -32,6 +32,8 @@ class ContinuousBatchingPipeline::ContinuousBatchingForSpeculativeDecodingImpl :
bool is_requests_empty();
std::vector<SequenceGroup::Ptr> get_awaiting_requests();

size_t get_processed_tokens_per_iteration();

UpdateRequestResult init_request_by_candidate(uint64_t request_id, const GeneratedSequences& candidates);

protected:
31 changes: 31 additions & 0 deletions src/cpp/src/speculative_decoding/speculative_decoding_impl.cpp
@@ -131,6 +131,12 @@ void print_generated_request(const ov::genai::GeneratedRequests& requests) {
void ContinuousBatchingPipeline::SpeculativeDecodingImpl::step() {
// this blocks adding new requests during step as it may break coherence between main and draft models
std::lock_guard<std::mutex> lock{m_draft_generations_mutex};

auto& raw_perf_counters = m_perf_metrics.raw_metrics;

ManualTimer step_timer("speculative_decoding: step()");
step_timer.start();

m_draft_pipeline->pull_awaiting_requests(true);
m_main_pipeline->pull_awaiting_requests();

@@ -182,6 +188,18 @@ void ContinuousBatchingPipeline::SpeculativeDecodingImpl::step() {
m_sd_metrics.update_draft_accepted_tokens(request_id, (updated_seq_info.inserted_tokens_cnt - updated_seq_info.removed_tokens_cnt));
}

// update perf metrics
const auto num_generated_tokens = m_main_pipeline->get_processed_tokens_per_iteration();
if (num_generated_tokens > 0) {
auto infer_duration = step_timer.get_duration_microsec();

raw_perf_counters.m_token_infer_durations.emplace_back(infer_duration);
raw_perf_counters.m_inference_durations[0] += MicroSeconds(infer_duration);
raw_perf_counters.m_new_token_times.emplace_back(main_timer.get_end_time());

raw_perf_counters.m_batch_sizes.emplace_back(num_generated_tokens);
}

if (main_generated_requests.empty() && 0) {
m_sd_metrics.print(true);
m_sd_metrics.clean_up();
@@ -192,6 +210,9 @@ std::vector<EncodedGenerationResult>
ContinuousBatchingPipeline::SpeculativeDecodingImpl::generate(const std::vector<ov::Tensor>& input_ids,
const std::vector<GenerationConfig>& sampling_params,
const StreamerVariant& streamer) {
m_perf_metrics = PerfMetrics();
m_perf_metrics.raw_metrics.m_inference_durations = {{ MicroSeconds(0.0f) }};

OPENVINO_ASSERT(!has_non_finished_requests(), "Generate cannot be called while ContinuousBatchingPipeline is already in running state. Use ContinuousBatchingPipeline::add_request");
OPENVINO_ASSERT(input_ids.size() == sampling_params.size());

@@ -273,6 +294,8 @@ ContinuousBatchingPipeline::SpeculativeDecodingImpl::generate(const std::vector<
std::vector<EncodedGenerationResult> results;
results.reserve(all_requests.size());

generate_timer.end();

for (size_t request_id = 0; request_id < all_requests.size(); ++request_id) {
const auto& request = all_requests[request_id];
auto sampling_params = request->get_sampling_parameters();
@@ -297,6 +320,14 @@ ContinuousBatchingPipeline::SpeculativeDecodingImpl::generate(const std::vector<
}

result.m_status = main_generations[request_id]->get_status();

// The same perf metrics for each sequence, only tokenization/detokenization will differ.
m_perf_metrics.raw_metrics.generate_durations.clear();
m_perf_metrics.raw_metrics.generate_durations.emplace_back(generate_timer.get_duration_microsec());
m_perf_metrics.num_input_tokens = request->get_prompt_len();
m_perf_metrics.evaluate_statistics(generate_timer.get_start_time());

result.perf_metrics = m_perf_metrics;
results.push_back(std::move(result));
}

@@ -37,7 +37,10 @@ struct ModelDesc {
class ContinuousBatchingPipeline::SpeculativeDecodingImpl : public ContinuousBatchingPipeline::IContinuousBatchingPipeline {
protected:
std::shared_ptr<ContinuousBatchingForSpeculativeDecodingImpl> m_main_pipeline, m_draft_pipeline;
// Metrics
SpeculativeDecodingMetrics m_sd_metrics;
PerfMetrics m_perf_metrics;

// Mutex protecting access to m_draft_generations, so add_request and step methods can be called from different threads
std::mutex m_draft_generations_mutex;
std::map<uint64_t, GenerationHandle> m_draft_generations;
22 changes: 17 additions & 5 deletions src/cpp/src/timer.hpp
@@ -9,7 +9,7 @@

class ManualTimer {
double m_total;
decltype(std::chrono::steady_clock::now()) m_start;
std::chrono::steady_clock::time_point m_start, m_end;
std::string m_title;
public:
ManualTimer(const std::string& title) :
@@ -22,15 +22,27 @@ class ManualTimer {
}

void end() {
auto m_end = std::chrono::steady_clock::now();
m_total += std::chrono::duration<double, std::milli>(m_end - m_start).count();
m_end = std::chrono::steady_clock::now();
m_total += std::chrono::duration_cast<std::chrono::microseconds>(m_end - m_start).count();
}

std::chrono::steady_clock::time_point get_start_time() {
return m_start;
}

std::chrono::steady_clock::time_point get_end_time() {
return m_end;
}

float get_duration() const {
return m_total / 1000.;
return m_total / 1e6;
}

float get_duration_microsec() const {
return m_total;
}

~ManualTimer() {
// std::cout << m_title << ": " << m_total / 1000. << " secs" << std::endl;
// std::cout << m_title << ": " << m_total / 1e6. << " secs" << std::endl;
}
};
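
For reference, a small usage sketch of the refactored `ManualTimer`: after this change the accumulator holds microseconds, `get_duration()` converts to seconds, and the end timestamp is retained so the `step()` implementations above can record `m_new_token_times`. The `main()` wrapper and the sleep stand-in are illustrative, not part of the commit.

```cpp
// Illustrative only: how the refactored ManualTimer is meant to be used.
#include "timer.hpp"

#include <chrono>
#include <iostream>
#include <thread>

int main() {
    ManualTimer timer("demo");

    timer.start();
    std::this_thread::sleep_for(std::chrono::milliseconds(5));  // stand-in for real work
    timer.end();  // accumulates microseconds and stores the end timestamp

    std::cout << "seconds:      " << timer.get_duration() << '\n';           // m_total / 1e6
    std::cout << "microseconds: " << timer.get_duration_microsec() << '\n';  // raw accumulator

    // get_end_time() is what PromptLookupImpl::step() pushes into
    // raw_perf_counters.m_new_token_times for each generated-token batch.
    auto end_tp = timer.get_end_time();
    (void)end_tp;
    return 0;
}
```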
