Skip to content

Commit

Permalink
fix: fix deadlock in runkv_storage::raft_log_store::log (#166)
Browse files Browse the repository at this point in the history
  • Loading branch information
MrCroxx authored May 19, 2022
1 parent 7925e66 commit 1d3c69f
Show file tree
Hide file tree
Showing 18 changed files with 223 additions and 107 deletions.
4 changes: 3 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,6 @@
.DS_Store

perf.data*
flamegrapg.svg
flamegraph.svg

*.log
4 changes: 2 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@ SHELL := /bin/bash
.PHONY: proto

fmt:
cargo sort -w && cargo fmt --all && cargo clippy --all-targets
cargo sort -w && cargo fmt --all && cargo clippy --all-targets --all-features && cargo clippy --all-targets

fmt_check:
cargo sort -c -w && cargo fmt --all -- --check && cargo clippy --all-targets --locked -- -D warnings
cargo sort -c -w && cargo fmt --all -- --check && cargo clippy --all-targets --all-features --locked -- -D warnings && cargo clippy --all-targets --locked -- -D warnings

clean:
cargo clean
Expand Down
8 changes: 7 additions & 1 deletion bench/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ tokio = { version = "1", features = [
"sync",
"macros",
"time",
"tracing",
] }
toml = "0.4.2"
tonic = "0.6.2"
Expand All @@ -39,8 +40,13 @@ tikv-jemallocator = "0.4.3"

[features]
tracing = ["runkv-wheel/tracing"]
deadlock = ["runkv-tests/deadlock"]
deadlock = [
"runkv-tests/deadlock",
"runkv-storage/deadlock",
"runkv-wheel/deadlock",
]
console = ["tokio/tracing", "runkv-common/console"]
trace-notify-pool = ["runkv-common/trace-notify-pool"]

[[bin]]
name = "bench_kv"
Expand Down
1 change: 1 addition & 0 deletions common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ test-log = "0.2.10"

[features]
console = ["console-subscriber"]
trace-notify-pool = []

[[bench]]
name = "bench_sharded_hash_map"
Expand Down
41 changes: 16 additions & 25 deletions common/src/log.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use isahc::config::Configurable;
use tracing_subscriber::filter::{EnvFilter, Targets};
use tracing_subscriber::fmt::format::FmtSpan;
use tracing_subscriber::layer::{Layer, SubscriberExt};
use tracing_subscriber::util::SubscriberInitExt;

Expand All @@ -21,6 +22,18 @@ pub fn init_runkv_logger(service: &str, id: u64, log_path: &str) -> LogGuard {
let tokio_console_enabled = cfg!(feature = "console");
let jaeger_enabled = cfg!(feature = "tracing");

if tokio_console_enabled {
#[cfg(feature = "console")]
{
console_subscriber::init();
return LogGuard {
_file_appender_guard: None,
jaeger_enabled,
tokio_console_enabled,
};
}
}

let (file_appender, file_appender_guard) = tracing_appender::non_blocking(
tracing_appender::rolling::daily(log_path, format!("runkv-{}-{}.log", service, id)),
);
Expand All @@ -32,36 +45,14 @@ pub fn init_runkv_logger(service: &str, id: u64, log_path: &str) -> LogGuard {
};

let fmt_layer = {
// Configure RunKV's own crates to log at TRACE level, and ignore all third-party crates.
let filter = Targets::new()
// Enable trace for most modules.
.with_target("runkv_common", tracing::Level::TRACE)
.with_target("runkv_storage", tracing::Level::TRACE)
.with_target("runkv_rudder", tracing::Level::TRACE)
.with_target("runkv_wheel", tracing::Level::TRACE)
.with_target("runkv_exhauster", tracing::Level::TRACE)
.with_target("runkv_tests", tracing::Level::TRACE)
.with_target("openraft::raft", tracing::Level::TRACE)
.with_target("raft", tracing::Level::TRACE)
.with_target("events", tracing::Level::WARN);

tracing_subscriber::fmt::layer()
.with_span_events(FmtSpan::ACTIVE)
.with_target(true)
.with_level(true)
.with_writer(file_appender)
.with_ansi(false)
.with_filter(filter)
};

if tokio_console_enabled {
#[cfg(feature = "console")]
{
console_subscriber::init();
return LogGuard {
_file_appender_guard: None,
jaeger_enabled,
tokio_console_enabled,
};
}
}
if jaeger_enabled {
opentelemetry::global::set_text_map_propagator(opentelemetry_jaeger::Propagator::new());

Expand Down
68 changes: 60 additions & 8 deletions common/src/notify_pool.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use std::collections::hash_map::{DefaultHasher, Entry, HashMap};
use std::fmt::Display;
use std::fmt::{Debug, Display};
use std::hash::{Hash, Hasher};
use std::sync::Arc;

Expand All @@ -8,14 +8,16 @@ use tokio::sync::oneshot;

pub struct NotifyPoolCore<I, R>
where
I: Eq + Hash + Copy + Clone + Display,
I: Eq + Hash + Copy + Clone + Send + Sync + Debug + Display + 'static,
R: Send + Sync + 'static,
{
inner: Arc<Mutex<HashMap<I, oneshot::Sender<R>>>>,
}

impl<I, R> Clone for NotifyPoolCore<I, R>
where
I: Eq + Hash + Copy + Clone + Display,
I: Eq + Hash + Copy + Clone + Send + Sync + Debug + Display + 'static,
R: Send + Sync + 'static,
{
fn clone(&self) -> Self {
Self {
Expand All @@ -26,7 +28,8 @@ where

impl<I, R> Default for NotifyPoolCore<I, R>
where
I: Eq + Hash + Copy + Clone + Display,
I: Eq + Hash + Copy + Clone + Send + Sync + Debug + Display + 'static,
R: Send + Sync + 'static,
{
fn default() -> Self {
Self {
Expand All @@ -37,15 +40,17 @@ where

pub struct NotifyPool<I, R>
where
I: Eq + Hash + Copy + Clone + Display,
I: Eq + Hash + Copy + Clone + Send + Sync + Debug + Display + 'static,
R: Send + Sync + 'static,
{
shards: u16,
buckets: HashMap<u16, NotifyPoolCore<I, R>>,
}

impl<I, R> Clone for NotifyPool<I, R>
where
I: Eq + Hash + Copy + Clone + Display,
I: Eq + Hash + Copy + Clone + Send + Sync + Debug + Display + 'static,
R: Send + Sync + 'static,
{
fn clone(&self) -> Self {
Self {
Expand All @@ -57,14 +62,24 @@ where

impl<I, R> NotifyPool<I, R>
where
I: Eq + Hash + Copy + Clone + Display,
I: Eq + Hash + Copy + Clone + Send + Sync + Debug + Display + 'static,
R: Send + Sync + 'static,
{
#[allow(clippy::let_and_return)]
pub fn new(shards: u16) -> Self {
let mut buckets = HashMap::default();
for i in 0..shards {
buckets.insert(i, NotifyPoolCore::default());
}
Self { shards, buckets }
let pool = Self { shards, buckets };
#[cfg(feature = "trace-notify-pool")]
{
let pool_clone = pool.clone();
tokio::spawn(async move {
pool_clone.trace().await;
});
}
pool
}

pub fn register(&self, id: I) -> anyhow::Result<oneshot::Receiver<R>> {
Expand Down Expand Up @@ -105,6 +120,43 @@ where
}
}

/// Snapshot of the ids currently held in a `NotifyPool`, built by
/// `NotifyPool::trace` and printed through its `Debug` implementation.
///
/// Only compiled when the `trace-notify-pool` feature is enabled.
#[cfg(feature = "trace-notify-pool")]
#[derive(Default, Debug)]
struct TraceOutput<I>
where
I: Eq + Hash + Copy + Clone + Send + Sync + Debug + Display + 'static,
{
/// Number of ids counted across all buckets.
total: usize,
/// The ids collected from every bucket.
ids: Vec<I>,
}

#[cfg(feature = "trace-notify-pool")]
impl<I, R> NotifyPool<I, R>
where
    I: Eq + Hash + Copy + Clone + Send + Sync + Debug + Display + 'static,
    R: Send + Sync + 'static,
{
    /// Debugging aid: every 3 seconds, logs at TRACE level the ids that are
    /// currently stored in the pool's buckets.
    ///
    /// This future never completes; it loops until the task running it is
    /// dropped or aborted.
    async fn trace(&self) {
        loop {
            tokio::time::sleep(std::time::Duration::from_secs(3)).await;

            let mut ids = Vec::new();
            for bucket in self.buckets.values() {
                // The lock guard is a temporary that is released at the end of
                // this statement, so no bucket lock is ever held across the
                // `.await` above.
                ids.extend(bucket.inner.lock().keys().copied());
            }

            let output = TraceOutput {
                total: ids.len(),
                ids,
            };
            tracing::trace!("notify pool: {:?}", output);
        }
    }
}

#[cfg(test)]
mod tests {
use test_log::test;
Expand Down
7 changes: 5 additions & 2 deletions common/src/packer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,11 @@ where
}
}

pub fn append(&self, data: T, notifier: Option<oneshot::Sender<R>>) {
self.core.queue.lock().push(Item { data, notifier });
pub fn append(&self, data: T, notifier: Option<oneshot::Sender<R>>) -> bool {
let mut queue = self.core.queue.lock();
let is_leader = queue.is_empty();
queue.push(Item { data, notifier });
is_leader
}

pub fn package(&self) -> Vec<Item<T, R>> {
Expand Down
15 changes: 6 additions & 9 deletions etc/grafana-dashboards/runkv-overview.json
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@
"uid": "PEDE6B306CC9C0CD0"
},
"editorMode": "code",
"expr": "sum(irate(kv_service_latency_histogram_vec_count{ service=\"get,put,delete,snapshot,txn\"}[1m]))",
"expr": "sum(irate(kv_service_latency_histogram_vec_count{ service=~\"get|put|delete|snapshot|txn\"}[1m]))",
"hide": false,
"legendFormat": "total",
"range": true,
Expand All @@ -134,7 +134,7 @@
"uid": "PEDE6B306CC9C0CD0"
},
"editorMode": "code",
"expr": "sum by (service) (irate(kv_service_latency_histogram_vec_count[1m]))",
"expr": "sum by (service) (irate(kv_service_latency_histogram_vec_count{ service=~\"get|put|delete|snapshot|txn\" }[1m]))",
"hide": false,
"legendFormat": "{{service}}",
"range": true,
Expand Down Expand Up @@ -867,8 +867,7 @@
"mode": "absolute",
"steps": [
{
"color": "green",
"value": null
"color": "green"
},
{
"color": "red",
Expand Down Expand Up @@ -998,8 +997,7 @@
"mode": "absolute",
"steps": [
{
"color": "green",
"value": null
"color": "green"
},
{
"color": "red",
Expand Down Expand Up @@ -1161,8 +1159,7 @@
"mode": "absolute",
"steps": [
{
"color": "green",
"value": null
"color": "green"
},
{
"color": "red",
Expand Down Expand Up @@ -4210,6 +4207,6 @@
"timezone": "",
"title": "RunKV Overview",
"uid": "kJf6Tv_nk",
"version": 2,
"version": 1,
"weekStart": ""
}
6 changes: 5 additions & 1 deletion rudder/src/worker/compaction_detector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,7 @@ async fn check_levels_size(
l1_capacity: usize,
level_multiplier: usize,
) -> Result<Vec<bool>> {
let levels = version_manager.levels().await;
let levels = version_manager.levels();
let mut overstep = Vec::with_capacity(levels);
overstep.push(false);
let mut limit = l1_capacity;
Expand Down Expand Up @@ -502,6 +502,10 @@ async fn pick_ssts(
// Skip if not enough sstable involved for L0 compaction.
continue;
}

// FIXME: It seems `base_level_ssts` can be empty with
// `base_level_ssts.iter().cloned().collect_vec()`?

// Pick overlapping sstable in `level + 1` iff compaction strategy of `level + 1` is
// `NonOverlap`.
if ctx.level as usize + 1 < ctx.lsm_tree_config.levels_options.len()
Expand Down
3 changes: 3 additions & 0 deletions storage/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,9 @@ criterion = { version = "0.3", features = ["async", "async_tokio"] }
env_logger = "*"
test-log = "0.2.10"

[features]
deadlock = []

[[bench]]
name = "bench_block_iter"
harness = false
Expand Down
6 changes: 3 additions & 3 deletions storage/src/lsm_tree/manifest/version.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,8 @@ impl VersionManager {
}

#[tracing::instrument(level = "trace", ret)]
pub async fn levels(&self) -> usize {
self.core.read().await.levels.len()
pub fn levels(&self) -> usize {
self.level_options.len()
}

#[tracing::instrument(level = "trace", ret)]
Expand Down Expand Up @@ -383,7 +383,7 @@ impl VersionManager {
sst_ids: Vec<u64>,
) -> Result<Vec<Vec<u64>>> {
if sst_ids.is_empty() {
return Ok(vec![]);
return Ok(vec![vec![]; levels.len()]);
}
let mut first_user_key = Vec::default();
let mut last_user_key = Vec::default();
Expand Down
Loading

0 comments on commit 1d3c69f

Please sign in to comment.