diff --git a/.gitignore b/.gitignore index a689329..743aabc 100644 --- a/.gitignore +++ b/.gitignore @@ -9,4 +9,6 @@ .DS_Store perf.data* -flamegrapg.svg +flamegraph.svg + +*.log diff --git a/Makefile b/Makefile index 3b1d130..d111622 100644 --- a/Makefile +++ b/Makefile @@ -2,10 +2,10 @@ SHELL := /bin/bash .PHONY: proto fmt: - cargo sort -w && cargo fmt --all && cargo clippy --all-targets + cargo sort -w && cargo fmt --all && cargo clippy --all-targets --all-features && cargo clippy --all-targets fmt_check: - cargo sort -c -w && cargo fmt --all -- --check && cargo clippy --all-targets --locked -- -D warnings + cargo sort -c -w && cargo fmt --all -- --check && cargo clippy --all-targets --all-features --locked -- -D warnings && cargo clippy --all-targets --locked -- -D warnings clean: cargo clean diff --git a/bench/Cargo.toml b/bench/Cargo.toml index fd9dd03..4eb0394 100644 --- a/bench/Cargo.toml +++ b/bench/Cargo.toml @@ -29,6 +29,7 @@ tokio = { version = "1", features = [ "sync", "macros", "time", + "tracing", ] } toml = "0.4.2" tonic = "0.6.2" @@ -39,8 +40,13 @@ tikv-jemallocator = "0.4.3" [features] tracing = ["runkv-wheel/tracing"] -deadlock = ["runkv-tests/deadlock"] +deadlock = [ + "runkv-tests/deadlock", + "runkv-storage/deadlock", + "runkv-wheel/deadlock", +] console = ["tokio/tracing", "runkv-common/console"] +trace-notify-pool = ["runkv-common/trace-notify-pool"] [[bin]] name = "bench_kv" diff --git a/common/Cargo.toml b/common/Cargo.toml index 3872ee1..077e181 100644 --- a/common/Cargo.toml +++ b/common/Cargo.toml @@ -50,6 +50,7 @@ test-log = "0.2.10" [features] console = ["console-subscriber"] +trace-notify-pool = [] [[bench]] name = "bench_sharded_hash_map" diff --git a/common/src/log.rs b/common/src/log.rs index b5fa82a..d046133 100644 --- a/common/src/log.rs +++ b/common/src/log.rs @@ -1,5 +1,6 @@ use isahc::config::Configurable; use tracing_subscriber::filter::{EnvFilter, Targets}; +use tracing_subscriber::fmt::format::FmtSpan; use tracing_subscriber::layer::{Layer, SubscriberExt}; use tracing_subscriber::util::SubscriberInitExt; @@ -21,6 +22,18 @@ pub fn init_runkv_logger(service: &str, id: u64, log_path: &str) -> LogGuard { let tokio_console_enabled = cfg!(feature = "console"); let jaeger_enabled = cfg!(feature = "tracing"); + if tokio_console_enabled { + #[cfg(feature = "console")] + { + console_subscriber::init(); + return LogGuard { + _file_appender_guard: None, + jaeger_enabled, + tokio_console_enabled, + }; + } + } + let (file_appender, file_appender_guard) = tracing_appender::non_blocking( tracing_appender::rolling::daily(log_path, format!("runkv-{}-{}.log", service, id)), ); @@ -32,36 +45,14 @@ pub fn init_runkv_logger(service: &str, id: u64, log_path: &str) -> LogGuard { }; let fmt_layer = { - // Configure RunKV's own crates to log at TRACE level, and ignore all third-party crates. - let filter = Targets::new() - // Enable trace for most modules. 
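// Editor's note: the block being removed below was the per-crate `Targets`
// filter that kept third-party crates out of the file log. With it gone and
// `FmtSpan::ACTIVE` enabled further down, the fmt layer now also emits span
// enter/exit events, so log volume grows considerably. A hedged sketch of
// bounding that via an environment-driven filter instead (illustrative only;
// `EnvFilter` is already imported at the top of this file, but this patch does
// not apply it to the fmt layer):
//
//     let filter = EnvFilter::try_from_default_env()
//         .unwrap_or_else(|_| EnvFilter::new("runkv_common=trace,runkv_wheel=trace"));
//     let fmt_layer = tracing_subscriber::fmt::layer()
//         .with_span_events(FmtSpan::ACTIVE)
//         .with_writer(file_appender)
//         .with_ansi(false)
//         .with_filter(filter);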
- .with_target("runkv_common", tracing::Level::TRACE) - .with_target("runkv_storage", tracing::Level::TRACE) - .with_target("runkv_rudder", tracing::Level::TRACE) - .with_target("runkv_wheel", tracing::Level::TRACE) - .with_target("runkv_exhauster", tracing::Level::TRACE) - .with_target("runkv_tests", tracing::Level::TRACE) - .with_target("openraft::raft", tracing::Level::TRACE) - .with_target("raft", tracing::Level::TRACE) - .with_target("events", tracing::Level::WARN); - tracing_subscriber::fmt::layer() + .with_span_events(FmtSpan::ACTIVE) + .with_target(true) + .with_level(true) .with_writer(file_appender) .with_ansi(false) - .with_filter(filter) }; - if tokio_console_enabled { - #[cfg(feature = "console")] - { - console_subscriber::init(); - return LogGuard { - _file_appender_guard: None, - jaeger_enabled, - tokio_console_enabled, - }; - } - } if jaeger_enabled { opentelemetry::global::set_text_map_propagator(opentelemetry_jaeger::Propagator::new()); diff --git a/common/src/notify_pool.rs b/common/src/notify_pool.rs index 5a51bc2..49ccc50 100644 --- a/common/src/notify_pool.rs +++ b/common/src/notify_pool.rs @@ -1,5 +1,5 @@ use std::collections::hash_map::{DefaultHasher, Entry, HashMap}; -use std::fmt::Display; +use std::fmt::{Debug, Display}; use std::hash::{Hash, Hasher}; use std::sync::Arc; @@ -8,14 +8,16 @@ use tokio::sync::oneshot; pub struct NotifyPoolCore where - I: Eq + Hash + Copy + Clone + Display, + I: Eq + Hash + Copy + Clone + Send + Sync + Debug + Display + 'static, + R: Send + Sync + 'static, { inner: Arc>>>, } impl Clone for NotifyPoolCore where - I: Eq + Hash + Copy + Clone + Display, + I: Eq + Hash + Copy + Clone + Send + Sync + Debug + Display + 'static, + R: Send + Sync + 'static, { fn clone(&self) -> Self { Self { @@ -26,7 +28,8 @@ where impl Default for NotifyPoolCore where - I: Eq + Hash + Copy + Clone + Display, + I: Eq + Hash + Copy + Clone + Send + Sync + Debug + Display + 'static, + R: Send + Sync + 'static, { fn default() -> Self { Self { @@ -37,7 +40,8 @@ where pub struct NotifyPool where - I: Eq + Hash + Copy + Clone + Display, + I: Eq + Hash + Copy + Clone + Send + Sync + Debug + Display + 'static, + R: Send + Sync + 'static, { shards: u16, buckets: HashMap>, @@ -45,7 +49,8 @@ where impl Clone for NotifyPool where - I: Eq + Hash + Copy + Clone + Display, + I: Eq + Hash + Copy + Clone + Send + Sync + Debug + Display + 'static, + R: Send + Sync + 'static, { fn clone(&self) -> Self { Self { @@ -57,14 +62,24 @@ where impl NotifyPool where - I: Eq + Hash + Copy + Clone + Display, + I: Eq + Hash + Copy + Clone + Send + Sync + Debug + Display + 'static, + R: Send + Sync + 'static, { + #[allow(clippy::let_and_return)] pub fn new(shards: u16) -> Self { let mut buckets = HashMap::default(); for i in 0..shards { buckets.insert(i, NotifyPoolCore::default()); } - Self { shards, buckets } + let pool = Self { shards, buckets }; + #[cfg(feature = "trace-notify-pool")] + { + let pool_clone = pool.clone(); + tokio::spawn(async move { + pool_clone.trace().await; + }); + } + pool } pub fn register(&self, id: I) -> anyhow::Result> { @@ -105,6 +120,43 @@ where } } +#[cfg(feature = "trace-notify-pool")] +#[derive(Default, Debug)] +struct TraceOutput +where + I: Eq + Hash + Copy + Clone + Send + Sync + Debug + Display + 'static, +{ + total: usize, + ids: Vec, +} + +#[cfg(feature = "trace-notify-pool")] +impl NotifyPool +where + I: Eq + Hash + Copy + Clone + Send + Sync + Debug + Display + 'static, + R: Send + Sync + 'static, +{ + async fn trace(&self) { + loop { + 
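// Editor's note: every three seconds this task snapshots the ids of all
// receivers that are still registered (i.e. have not yet been notified) and
// logs them at TRACE level; a list that only ever grows is a strong hint that
// some request is never woken up, which is the hang this patch is chasing.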
tokio::time::sleep(std::time::Duration::from_secs(3)).await; + + let mut output = TraceOutput { + total: 0, + ids: vec![], + }; + + for (_, bucket) in self.buckets.iter() { + let guard = bucket.inner.lock(); + for (id, _) in guard.iter() { + output.total += 1; + output.ids.push(*id); + } + } + tracing::trace!("notofy pool: {:?}", output); + } + } +} + #[cfg(test)] mod tests { use test_log::test; diff --git a/common/src/packer.rs b/common/src/packer.rs index 1855e01..a356d12 100644 --- a/common/src/packer.rs +++ b/common/src/packer.rs @@ -69,8 +69,11 @@ where } } - pub fn append(&self, data: T, notifier: Option>) { - self.core.queue.lock().push(Item { data, notifier }); + pub fn append(&self, data: T, notifier: Option>) -> bool { + let mut queue = self.core.queue.lock(); + let is_leader = queue.is_empty(); + queue.push(Item { data, notifier }); + is_leader } pub fn package(&self) -> Vec> { diff --git a/etc/grafana-dashboards/runkv-overview.json b/etc/grafana-dashboards/runkv-overview.json index fd4d4ac..43ff3e1 100644 --- a/etc/grafana-dashboards/runkv-overview.json +++ b/etc/grafana-dashboards/runkv-overview.json @@ -122,7 +122,7 @@ "uid": "PEDE6B306CC9C0CD0" }, "editorMode": "code", - "expr": "sum(irate(kv_service_latency_histogram_vec_count{ service=\"get,put,delete,snapshot,txn\"}[1m]))", + "expr": "sum(irate(kv_service_latency_histogram_vec_count{ service=~\"get|put|delete|snapshot|txn\"}[1m]))", "hide": false, "legendFormat": "total", "range": true, @@ -134,7 +134,7 @@ "uid": "PEDE6B306CC9C0CD0" }, "editorMode": "code", - "expr": "sum by (service) (irate(kv_service_latency_histogram_vec_count[1m]))", + "expr": "sum by (service) (irate(kv_service_latency_histogram_vec_count{ service=~\"get|put|delete|snapshot|txn\" }[1m]))", "hide": false, "legendFormat": "{{service}}", "range": true, @@ -867,8 +867,7 @@ "mode": "absolute", "steps": [ { - "color": "green", - "value": null + "color": "green" }, { "color": "red", @@ -998,8 +997,7 @@ "mode": "absolute", "steps": [ { - "color": "green", - "value": null + "color": "green" }, { "color": "red", @@ -1161,8 +1159,7 @@ "mode": "absolute", "steps": [ { - "color": "green", - "value": null + "color": "green" }, { "color": "red", @@ -4210,6 +4207,6 @@ "timezone": "", "title": "RunKV Overview", "uid": "kJf6Tv_nk", - "version": 2, + "version": 1, "weekStart": "" } \ No newline at end of file diff --git a/rudder/src/worker/compaction_detector.rs b/rudder/src/worker/compaction_detector.rs index 2b58a5b..1b144e8 100644 --- a/rudder/src/worker/compaction_detector.rs +++ b/rudder/src/worker/compaction_detector.rs @@ -252,7 +252,7 @@ async fn check_levels_size( l1_capacity: usize, level_multiplier: usize, ) -> Result> { - let levels = version_manager.levels().await; + let levels = version_manager.levels(); let mut overstep = Vec::with_capacity(levels); overstep.push(false); let mut limit = l1_capacity; @@ -502,6 +502,10 @@ async fn pick_ssts( // Skip if not enough sstable involved for L0 compaction. continue; } + + // FIXME: `base_level_ssts` seems can be empty with + // `base_level_ssts.iter().cloned().collect_vec()`? + // Pick overlapping sstable in `level + 1` iff compaction strategy of `level + 1` is // `NonOverlap`. 
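// Editor's note: this step pairs with the version.rs change in this patch,
// where `pick_overlap_ssts` now returns one (possibly empty) Vec per requested
// level instead of a single empty Vec when `sst_ids` is empty, so callers that
// index the result by level stay in bounds; that empty-per-level case is most
// likely also what the FIXME above is pointing at.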
if ctx.level as usize + 1 < ctx.lsm_tree_config.levels_options.len() diff --git a/storage/Cargo.toml b/storage/Cargo.toml index 2e13ce4..c74c080 100644 --- a/storage/Cargo.toml +++ b/storage/Cargo.toml @@ -51,6 +51,9 @@ criterion = { version = "0.3", features = ["async", "async_tokio"] } env_logger = "*" test-log = "0.2.10" +[features] +deadlock = [] + [[bench]] name = "bench_block_iter" harness = false diff --git a/storage/src/lsm_tree/manifest/version.rs b/storage/src/lsm_tree/manifest/version.rs index 8fd8ad1..176a158 100644 --- a/storage/src/lsm_tree/manifest/version.rs +++ b/storage/src/lsm_tree/manifest/version.rs @@ -88,8 +88,8 @@ impl VersionManager { } #[tracing::instrument(level = "trace", ret)] - pub async fn levels(&self) -> usize { - self.core.read().await.levels.len() + pub fn levels(&self) -> usize { + self.level_options.len() } #[tracing::instrument(level = "trace", ret)] @@ -383,7 +383,7 @@ impl VersionManager { sst_ids: Vec, ) -> Result>> { if sst_ids.is_empty() { - return Ok(vec![]); + return Ok(vec![vec![]; levels.len()]); } let mut first_user_key = Vec::default(); let mut last_user_key = Vec::default(); diff --git a/storage/src/raft_log_store/log.rs b/storage/src/raft_log_store/log.rs index 739913a..f997be9 100644 --- a/storage/src/raft_log_store/log.rs +++ b/storage/src/raft_log_store/log.rs @@ -4,13 +4,12 @@ use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering}; use std::sync::Arc; use std::time::Instant; -use futures::channel::oneshot; use futures_async_stream::try_stream; use itertools::Itertools; -use parking_lot::RwLock; +use runkv_common::packer::{Item, Packer}; use tokio::fs::{create_dir_all, read_dir, File, OpenOptions}; use tokio::io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt}; -use tokio::sync::RwLock as AsyncRwLock; +use tokio::sync::{oneshot, RwLock as AsyncRwLock}; use tracing::{trace, trace_span, Instrument}; use super::entry::Entry; @@ -63,17 +62,6 @@ pub struct WriteHandle { pub len: usize, } -struct Writer { - entries: Vec, - tx: oneshot::Sender>, -} - -impl Writer { - fn new(entries: Vec, tx: oneshot::Sender>) -> Self { - Self { entries, tx } - } -} - #[derive(Clone)] pub struct LogOptions { pub node: u64, @@ -90,7 +78,7 @@ struct LogCore { first_log_file_id: AtomicU64, active_file_len: AtomicUsize, - queue: Arc>>, + writer_packer: Packer, Vec>, } #[derive(Clone)] @@ -168,7 +156,7 @@ impl Log { first_log_file_id: AtomicU64::new(first_log_file_id), active_file_len: AtomicUsize::new(0), - queue: Arc::new(RwLock::new(vec![])), + writer_packer: Packer::new(64), }), metrics: options.metrics, @@ -186,14 +174,8 @@ impl Log { let mut total_size = 0; let (tx, rx) = oneshot::channel(); - let writer = Writer::new(entries, tx); // Append entries to queue. - let is_leader = { - let mut queue = self.core.queue.write(); - let is_leader = queue.is_empty(); - queue.push(writer); - is_leader - }; + let is_leader = self.core.writer_packer.append(entries, Some(tx)); if is_leader { let mut file = self @@ -206,10 +188,7 @@ impl Log { let mut buf = Vec::with_capacity(DEFAULT_BUFFER_SIZE); // Take writer batch. 
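// Editor's note: `Packer::append` / `Packer::package` (see common/src/packer.rs
// above) replace the hand-rolled writer queue with a group-commit protocol: the
// first task to append to an empty queue becomes the batch leader, drains the
// whole queue with `package()`, performs a single file write, and then notifies
// every waiter through its oneshot sender. A minimal standalone sketch of the
// pattern, with simplified types rather than the exact RunKV API:

use parking_lot::Mutex;
use tokio::sync::oneshot;

struct Batch<T> {
    queue: Mutex<Vec<(T, oneshot::Sender<()>)>>,
}

impl<T> Batch<T> {
    /// Returns true iff the caller is the batch leader and must flush.
    fn append(&self, item: T, tx: oneshot::Sender<()>) -> bool {
        let mut queue = self.queue.lock();
        let is_leader = queue.is_empty();
        queue.push((item, tx));
        is_leader
    }

    /// Drains the pending batch; called by the leader only.
    fn package(&self) -> Vec<(T, oneshot::Sender<()>)> {
        std::mem::take(&mut *self.queue.lock())
    }
}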
- let writers = { - let mut queue = self.core.queue.write(); - std::mem::take(&mut (*queue)) - }; + let writers = self.core.writer_packer.package(); self.metrics .batch_writers_histogram .observe(writers.len() as f64); @@ -217,13 +196,17 @@ impl Log { let mut txs = Vec::with_capacity(writers.len()); let mut handles = Vec::with_capacity(writers.len()); - for writer in writers { - let mut entry_handles = Vec::with_capacity(writer.entries.len()); + for Item { + data: entries, + notifier, + } in writers + { + let mut entry_handles = Vec::with_capacity(entries.len()); let file_id = self.core.first_log_file_id.load(Ordering::Acquire) + self.core.frozen_files.read().await.len() as u64; - for entry in writer.entries { + for entry in entries { let entry_offset = offset + buf.len(); entry.encode(&mut buf); let entry_len = offset + buf.len() - entry_offset; @@ -279,7 +262,7 @@ impl Log { self.core.frozen_files.write().await.push(frozen_file); } - txs.push(writer.tx); + txs.push(notifier.unwrap()); handles.push(entry_handles); } @@ -330,23 +313,26 @@ impl Log { #[tracing::instrument(level = "trace")] pub async fn read(&self, log_file_id: u64, offset: u64, len: usize) -> Result> { - let mut frozen_files = self.core.frozen_files.write().await; - let first_log_file_id = self.core.first_log_file_id.load(Ordering::Acquire); - let log_file_index = (log_file_id - first_log_file_id) as usize; - let buf = if log_file_index < frozen_files.len() { - frozen_files[log_file_index] - .seek(std::io::SeekFrom::Start(offset)) - .await?; - let mut buf = vec![0; len]; - frozen_files[log_file_index].read_exact(&mut buf).await?; - buf - } else { - let mut active_file = self.core.active_file.write().await; - active_file.seek(std::io::SeekFrom::Start(offset)).await?; - let mut buf = vec![0; len]; - active_file.read_exact(&mut buf).await?; - buf - }; + // NOTE: Be careful with the lock order between `active_file` and `frozen_files`!!! 
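// Editor's note: the rewrite below never holds the `frozen_files` and
// `active_file` write locks at the same time: the `frozen_files` guard is
// scoped to its own block and the function returns early on a hit, so
// `active_file` is only acquired after that guard has been dropped. This
// sidesteps ordering conflicts with writers that may hold `active_file`
// while also touching `frozen_files`.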
+ + { + let mut frozen_files = self.core.frozen_files.write().await; + let first_log_file_id = self.core.first_log_file_id.load(Ordering::Acquire); + let log_file_index = (log_file_id - first_log_file_id) as usize; + if log_file_index < frozen_files.len() { + frozen_files[log_file_index] + .seek(std::io::SeekFrom::Start(offset)) + .await?; + let mut buf = vec![0; len]; + frozen_files[log_file_index].read_exact(&mut buf).await?; + return Ok(buf); + } + } + + let mut active_file = self.core.active_file.write().await; + active_file.seek(std::io::SeekFrom::Start(offset)).await?; + let mut buf = vec![0; len]; + active_file.read_exact(&mut buf).await?; Ok(buf) } diff --git a/storage/src/raft_log_store/store.rs b/storage/src/raft_log_store/store.rs index bb6db73..1654818 100644 --- a/storage/src/raft_log_store/store.rs +++ b/storage/src/raft_log_store/store.rs @@ -334,6 +334,8 @@ impl RaftLogStore { } pub async fn put(&self, group: u64, key: Vec, value: Vec) -> Result<()> { + #[cfg(feature = "deadlock")] + tracing::info!("{} logappend enter", group); self.core .log .append(vec![LogEntry::Kv(Kv::Put { @@ -342,7 +344,13 @@ impl RaftLogStore { value: value.clone(), })]) .await?; + #[cfg(feature = "deadlock")] + tracing::info!("{} logappend exit", group); + #[cfg(feature = "deadlock")] + tracing::info!("{} stateput enter", group); self.core.states.put(group, key, value).await?; + #[cfg(feature = "deadlock")] + tracing::info!("{} stateput exit", group); Ok(()) } diff --git a/wheel/Cargo.toml b/wheel/Cargo.toml index ac70e70..20b7ddf 100644 --- a/wheel/Cargo.toml +++ b/wheel/Cargo.toml @@ -57,3 +57,4 @@ test-log = "0.2.10" [features] tracing = [] +deadlock = [] diff --git a/wheel/src/components/fsm.rs b/wheel/src/components/fsm.rs index 3208d12..74aff5c 100644 --- a/wheel/src/components/fsm.rs +++ b/wheel/src/components/fsm.rs @@ -289,8 +289,12 @@ impl Fsm for ObjectLsmTreeFsm { // Update `available index`. let mut available_index = None; if let Some(last_entry) = entries.last() { + #[cfg(feature = "deadlock")] + tracing::info!("{} store enter", self.raft_node); self.store_index(AVAILABLE_INDEX_KEY, last_entry.index) .await?; + #[cfg(feature = "deadlock")] + tracing::info!("{} store exit", self.raft_node); available_index = Some(last_entry.index); } @@ -302,9 +306,20 @@ impl Fsm for ObjectLsmTreeFsm { // Get apply progress. let avaiable_index = match available_index { Some(index) => index, - None => self.load_index(AVAILABLE_INDEX_KEY).await?, + None => { + #[cfg(feature = "deadlock")] + tracing::info!("{} load enter", self.raft_node); + let index = self.load_index(AVAILABLE_INDEX_KEY).await?; + #[cfg(feature = "deadlock")] + tracing::info!("{} load exit", self.raft_node); + index + } }; + #[cfg(feature = "deadlock")] + tracing::info!("{} load enter", self.raft_node); let last_done_index = self.load_index(DONE_INDEX_KEY).await?; + #[cfg(feature = "deadlock")] + tracing::info!("{} load exit", self.raft_node); // Entries to apply: [ first apply index ..= last apply index]. let first_apply_index = last_done_index + 1; @@ -318,28 +333,52 @@ impl Fsm for ObjectLsmTreeFsm { // Load entries [ first apply index .. first carried index ] from raft log store then apply. 
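// Editor's note: `first_carried_index` is the index of the first entry carried
// into this apply call; anything between the persisted `done index` and that
// point appears to have been written to the raft log store earlier but not yet
// applied, so it is reloaded from the store here and replayed before the
// carried entries.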
let load_len = (first_carried_index - first_apply_index) as usize; if load_len > 0 { + #[cfg(feature = "deadlock")] + tracing::info!("{} entries enter", self.raft_node); let loaded_entries = self .raft_log_store .entries(first_apply_index, load_len) .await?; + #[cfg(feature = "deadlock")] + tracing::info!("{} entries exit", self.raft_node); check_log_gap(&entries, first_apply_index..first_carried_index)?; for entry in loaded_entries { + #[cfg(feature = "deadlock")] + tracing::info!("{} mutable enter", self.raft_node); self.apply_entry(entry).await?; + #[cfg(feature = "deadlock")] + tracing::info!("{} mutable exit", self.raft_node); } } // Apply carried entries. for entry in entries { let index = entry.index; + #[cfg(feature = "deadlock")] + tracing::info!("{} mutable enter", self.raft_node); self.apply_entry(entry).await?; + #[cfg(feature = "deadlock")] + tracing::info!("{} mutable exit", self.raft_node); + #[cfg(feature = "deadlock")] + tracing::info!("{} immutable enter", self.raft_node); self.apply_read_only_until(index).await?; + #[cfg(feature = "deadlock")] + tracing::info!("{} immutable exit", self.raft_node); } + #[cfg(feature = "deadlock")] + tracing::info!("{} immutable enter", self.raft_node); self.apply_read_only_until(last_apply_index).await?; + #[cfg(feature = "deadlock")] + tracing::info!("{} immutable exit", self.raft_node); // Update `done index`. let done_index = last_apply_index; if last_done_index != done_index { + #[cfg(feature = "deadlock")] + tracing::info!("{} store enter", self.raft_node); self.store_index(DONE_INDEX_KEY, done_index).await?; + #[cfg(feature = "deadlock")] + tracing::info!("{} store exit", self.raft_node); } Ok(()) diff --git a/wheel/src/components/lsm_tree.rs b/wheel/src/components/lsm_tree.rs index 5099c97..888fa69 100644 --- a/wheel/src/components/lsm_tree.rs +++ b/wheel/src/components/lsm_tree.rs @@ -113,7 +113,7 @@ impl ObjectStoreLsmTreeCore { let levels = { let levels = self .version_manager - .pick_overlap_ssts_by_key(0..self.version_manager.levels().await, key) + .pick_overlap_ssts_by_key(0..self.version_manager.levels(), key) .await .unwrap(); levels @@ -233,13 +233,11 @@ impl ObjectStoreLsmTreeCore { } fn drop_oldest_immutable_memtable(&self) -> MemtableWithCtx { - let imm = self - .memtables + self.memtables .write() .immutable_memtables .pop_back() - .unwrap(); - imm + .unwrap() } } diff --git a/wheel/src/service.rs b/wheel/src/service.rs index 4c0cfe6..8d1bc23 100644 --- a/wheel/src/service.rs +++ b/wheel/src/service.rs @@ -18,7 +18,7 @@ use runkv_proto::wheel::raft_service_server::RaftService; use runkv_proto::wheel::wheel_service_server::WheelService; use runkv_proto::wheel::*; use tonic::{Request, Response, Status}; -use tracing::{trace_span, Instrument}; +use tracing::{trace, trace_span, Instrument}; use crate::components::command::Command; use crate::components::raft_manager::RaftManager; @@ -174,6 +174,7 @@ impl Wheel { // Register request. 
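// Editor's note: the wheel allocates a monotonically increasing request id,
// registers it in the shared `txn_notify_pool` (the NotifyPool touched earlier
// in this patch), and awaits the returned oneshot receiver until the apply path
// notifies that id with the command result. A commented sketch of the
// handshake; `submit_to_raft` is a hypothetical placeholder for the proposal
// step, not a real RunKV function:
//
//     let rx = txn_notify_pool.register(request_id)?; // oneshot::Receiver for the result
//     submit_to_raft(request_id, command).await?;     // hypothetical proposal step
//     let result = rx.await?;                         // woken by the notify side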
let request_id = self.inner.request_id.fetch_add(1, Ordering::SeqCst) + 1; + trace!("id: {} request: {:?}", request_id, request); let rx = self .inner .txn_notify_pool diff --git a/wheel/src/worker/raft.rs b/wheel/src/worker/raft.rs index 72894a3..29bedae 100644 --- a/wheel/src/worker/raft.rs +++ b/wheel/src/worker/raft.rs @@ -552,7 +552,11 @@ where #[tracing::instrument(level = "trace")] async fn send_messages(&mut self, messages: Vec) -> Result<()> { + #[cfg(feature = "deadlock")] + tracing::info!("{} send enter", self.raft_node); if messages.is_empty() { + #[cfg(feature = "deadlock")] + tracing::info!("{} send exit empty", self.raft_node); return Ok(()); } @@ -622,6 +626,8 @@ where self.metrics .send_messages_throughput_gauge .add(bytes as f64); + #[cfg(feature = "deadlock")] + tracing::info!("{} send exit", self.raft_node); Ok(()) } @@ -640,6 +646,8 @@ where read_states: Vec, entries: Vec, ) -> Result<()> { + #[cfg(feature = "deadlock")] + tracing::info!("{} apply enter", self.raft_node); let is_leader = match &self.raft_soft_state { None => false, Some(ss) => ss.raft_state == raft::StateRole::Leader, @@ -655,22 +663,36 @@ where // TODO: Clear stale read-onlt proposals. // Only leader can get read-only proposals at the same term. let id = (&mut &ctx[..]).get_u64(); + #[cfg(feature = "deadlock")] + tracing::info!("{} pool enter", self.raft_node); self.read_only_cmd_pool.ready(id, index); + #[cfg(feature = "deadlock")] + tracing::info!("{} pool exit", self.raft_node); } + #[cfg(feature = "deadlock")] + tracing::info!("{} fsmapply enter", self.raft_node); self.fsm.apply(self.group, is_leader, entries).await?; + #[cfg(feature = "deadlock")] + tracing::info!("{} fsmapply exit", self.raft_node); let elapsed = start.elapsed(); self.metrics .apply_log_entries_latency_histogram .observe(elapsed.as_secs_f64()); + #[cfg(feature = "deadlock")] + tracing::info!("{} apply exit", self.raft_node); Ok(()) } #[tracing::instrument(level = "trace")] async fn append_log_entries(&mut self, entries: Vec) -> Result<()> { + #[cfg(feature = "deadlock")] + tracing::info!("{} append enter", self.raft_node); if entries.is_empty() { + #[cfg(feature = "deadlock")] + tracing::info!("{} append exit empty", self.raft_node); return Ok(()); } @@ -733,6 +755,8 @@ where self.metrics .append_log_entries_throughput_gauge .add(bytes as f64); + #[cfg(feature = "deadlock")] + tracing::info!("{} append exit", self.raft_node); Ok(()) }
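// Editor's note: the `#[cfg(feature = "deadlock")]` enter/exit log pairs that
// recur throughout this patch could be collapsed into a small helper. A
// hypothetical sketch, not part of the patch; `deadlock_trace` is an invented
// name:

#[cfg(feature = "deadlock")]
macro_rules! deadlock_trace {
    ($node:expr, $stage:expr, $phase:expr) => {
        tracing::info!("{} {} {}", $node, $stage, $phase)
    };
}

#[cfg(not(feature = "deadlock"))]
macro_rules! deadlock_trace {
    ($node:expr, $stage:expr, $phase:expr) => {};
}

// Usage at a call site would then read:
//
//     deadlock_trace!(self.raft_node, "append", "enter");
//     // ... append work ...
//     deadlock_trace!(self.raft_node, "append", "exit");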