Skip to content

Commit

Permalink
Revert "Report ConsumeWorkerMetrics at slot transitions (#3212)" (#4667)
Browse files — browse the repository at this point in the history
  • Loading branch information
apfitzge authored Jan 30, 2025
1 parent 644091a commit c83075b
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 42 deletions.
56 changes: 15 additions & 41 deletions core/src/banking_stage/consume_worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use {
solana_poh::leader_bank_notifier::LeaderBankNotifier,
solana_runtime::bank::Bank,
solana_runtime_transaction::transaction_with_meta::TransactionWithMeta,
solana_sdk::clock::Slot,
solana_sdk::timing::AtomicInterval,
solana_svm::transaction_error_metrics::TransactionErrorMetrics,
std::{
sync::{
Expand Down Expand Up @@ -183,43 +183,32 @@ fn try_drain_iter<T>(work: T, receiver: &Receiver<T>) -> impl Iterator<Item = T>
/// done.
pub(crate) struct ConsumeWorkerMetrics {
id: String,
interval: AtomicInterval,
has_data: AtomicBool,
slot: AtomicU64,

count_metrics: ConsumeWorkerCountMetrics,
error_metrics: ConsumeWorkerTransactionErrorMetrics,
timing_metrics: ConsumeWorkerTimingMetrics,
}

impl ConsumeWorkerMetrics {
/// Report and reset metrics when the worker did some work and:
/// a) (when a leader) Previous slot is not the same as current.
/// b) (when not a leader) report the metrics accumulated so far.
pub fn maybe_report_and_reset(&self, slot: Option<Slot>) {
let prev_slot_id: u64 = self.slot.load(Ordering::Relaxed);
if let Some(slot) = slot {
if slot != prev_slot_id {
if !self.has_data.swap(false, Ordering::Relaxed) {
return;
}
self.count_metrics.report_and_reset(&self.id, slot);
self.timing_metrics.report_and_reset(&self.id, slot);
self.error_metrics.report_and_reset(&self.id, slot);
self.slot.swap(slot, Ordering::Relaxed);
}
} else if prev_slot_id != 0 {
self.count_metrics.report_and_reset(&self.id, prev_slot_id);
self.timing_metrics.report_and_reset(&self.id, prev_slot_id);
self.error_metrics.report_and_reset(&self.id, prev_slot_id);
self.slot.swap(0, Ordering::Relaxed);
/// Report and reset metrics iff the interval has elapsed and the worker did some work.
pub fn maybe_report_and_reset(&self) {
const REPORT_INTERVAL_MS: u64 = 1000;
if self.interval.should_update(REPORT_INTERVAL_MS)
&& self.has_data.swap(false, Ordering::Relaxed)
{
self.count_metrics.report_and_reset(&self.id);
self.timing_metrics.report_and_reset(&self.id);
self.error_metrics.report_and_reset(&self.id);
}
}

fn new(id: u32) -> Self {
Self {
id: id.to_string(),
interval: AtomicInterval::default(),
has_data: AtomicBool::new(false),
slot: AtomicU64::new(0),
count_metrics: ConsumeWorkerCountMetrics::default(),
error_metrics: ConsumeWorkerTransactionErrorMetrics::default(),
timing_metrics: ConsumeWorkerTimingMetrics::default(),
Expand Down Expand Up @@ -468,7 +457,7 @@ impl Default for ConsumeWorkerCountMetrics {
}

impl ConsumeWorkerCountMetrics {
fn report_and_reset(&self, id: &str, slot: u64) {
fn report_and_reset(&self, id: &str) {
datapoint_info!(
"banking_stage_worker_counts",
"id" => id,
Expand Down Expand Up @@ -516,11 +505,6 @@ impl ConsumeWorkerCountMetrics {
self.max_prioritization_fees.swap(0, Ordering::Relaxed),
i64
),
(
"slot",
slot,
i64
),
);
}
}
Expand All @@ -542,7 +526,7 @@ struct ConsumeWorkerTimingMetrics {
}

impl ConsumeWorkerTimingMetrics {
fn report_and_reset(&self, id: &str, slot: u64) {
fn report_and_reset(&self, id: &str) {
datapoint_info!(
"banking_stage_worker_timing",
"id" => id,
Expand Down Expand Up @@ -598,11 +582,6 @@ impl ConsumeWorkerTimingMetrics {
self.wait_for_bank_failure_us.swap(0, Ordering::Relaxed),
i64
),
(
"slot",
slot,
i64
),
);
}
}
Expand Down Expand Up @@ -636,7 +615,7 @@ struct ConsumeWorkerTransactionErrorMetrics {
}

impl ConsumeWorkerTransactionErrorMetrics {
fn report_and_reset(&self, id: &str, slot: u64) {
fn report_and_reset(&self, id: &str) {
datapoint_info!(
"banking_stage_worker_error_metrics",
"id" => id,
Expand Down Expand Up @@ -747,11 +726,6 @@ impl ConsumeWorkerTransactionErrorMetrics {
.swap(0, Ordering::Relaxed),
i64
),
(
"slot",
slot,
i64
),
);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ impl<C: LikeClusterInfo, R: ReceiveAndBuffer> SchedulerController<C, R> {
.maybe_report_and_reset_interval(should_report);
self.worker_metrics
.iter()
.for_each(|metrics| metrics.maybe_report_and_reset(new_leader_slot));
.for_each(|metrics| metrics.maybe_report_and_reset());
}

Ok(())
Expand Down

0 comments on commit c83075b

Please sign in to comment.