From e0f45afe6a10f0a3aada6d37a5c23a3dbd632455 Mon Sep 17 00:00:00 2001 From: Maksim Greshnyakov Date: Thu, 7 Nov 2024 13:57:53 +0100 Subject: [PATCH] fix(validator): session signatures storage #274 --- Cargo.lock | 1 - Cargo.toml | 3 +- collator/Cargo.toml | 4 +- .../src/validator/impls/std_impl/session.rs | 85 ++++++++----------- 4 files changed, 40 insertions(+), 53 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 86230e043..a77f163b2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3350,7 +3350,6 @@ dependencies = [ "parking_lot", "rand", "rayon", - "scc", "scopeguard", "serde", "sha2", diff --git a/Cargo.toml b/Cargo.toml index 3d2268c53..35ce1f885 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -76,7 +76,6 @@ rlimit = "0.10.1" rustc_version = "0.4" rustls = "0.23.16" rustls-webpki = "0.102" -scc = "2.1" scopeguard = "1.2" serde = "1.0" serde_json = "1.0.114" @@ -235,4 +234,4 @@ opt-level = 3 [profile.dev.package.hashbrown] opt-level = 3 [profile.dev.package."*"] -opt-level = 1 +opt-level = 1 \ No newline at end of file diff --git a/collator/Cargo.toml b/collator/Cargo.toml index e076d051e..fc8f98471 100644 --- a/collator/Cargo.toml +++ b/collator/Cargo.toml @@ -27,7 +27,6 @@ metrics = { workspace = true } parking_lot = { workspace = true } rand = { workspace = true } rayon = { workspace = true } -scc = { workspace = true } scopeguard = { workspace = true } serde = { workspace = true } sha2 = { workspace = true } @@ -65,3 +64,6 @@ block-creator-stats = [] [lints] workspace = true + +[profile.test] +incremental = true diff --git a/collator/src/validator/impls/std_impl/session.rs b/collator/src/validator/impls/std_impl/session.rs index be8562449..c92b3d0d8 100644 --- a/collator/src/validator/impls/std_impl/session.rs +++ b/collator/src/validator/impls/std_impl/session.rs @@ -12,14 +12,13 @@ use everscale_crypto::ed25519::KeyPair; use everscale_types::models::*; use futures_util::stream::FuturesUnordered; use futures_util::{Future, StreamExt}; -use scc::TreeIndex; use tokio::sync::{Notify, Semaphore}; use tokio_util::sync::CancellationToken; use tracing::Instrument; use tycho_network::{OverlayId, PeerId, PrivateOverlay, Request}; use tycho_util::futures::JoinTask; use tycho_util::metrics::HistogramGuard; -use tycho_util::FastHashMap; +use tycho_util::{FastDashMap, FastHashMap}; use super::ValidatorStdImplConfig; use crate::tracing_targets; @@ -87,8 +86,8 @@ impl ValidatorSession { shard_ident: info.shard_ident, weight_threshold, validators: Arc::new(validators), - block_signatures: TreeIndex::new(), - cached_signatures: TreeIndex::new(), + block_signatures: FastDashMap::default(), + cached_signatures: FastDashMap::default(), cancelled: AtomicBool::new(false), cancelled_signal: Notify::new(), }); @@ -153,17 +152,17 @@ impl ValidatorSession { .fetch_max(block_seqno, Ordering::Release); let state = self.inner.state.as_ref(); - state.cached_signatures.remove_range(..=block_seqno); - - let guard = scc::ebr::Guard::new(); - for (_, validation) in state.block_signatures.range(..=block_seqno, &guard) { - validation.cancelled.cancel(); - } - drop(guard); + state.cached_signatures.retain(|&key, _| key > block_seqno); // NOTE: Remove only blocks that are old enough. let until_seqno = block_seqno.saturating_sub(self.inner.config.old_blocks_to_keep); - state.block_signatures.remove_range(..=until_seqno); + + state.block_signatures.retain(|&key, validation| { + if key <= block_seqno { + validation.cancelled.cancel(); + } + key > until_seqno + }); } #[tracing::instrument( @@ -184,41 +183,36 @@ impl ValidatorSession { let state = &self.inner.state; - // Remove cached slot + let entry = state.block_signatures.entry(block_id.seqno); + if let tycho_util::DashMapEntry::Occupied(_) = entry { + anyhow::bail!( + "block validation is already in progress. \ + session_id={}, block_id={:?}", + self.inner.session_id, + block_id + ); + } + let cached = state .cached_signatures - .peek(&block_id.seqno, &scc::ebr::Guard::new()) - .map(Arc::clone); + .remove(&block_id.seqno) + .map(|(_, value)| value); // Prepare block signatures - let block_signatures = match &cached { - Some(cached) => self.reuse_signatures(block_id, cached.clone()).await, - None => self.prepare_new_signatures(block_id), - } - .build(block_id, state.weight_threshold); + let block_signatures = { + match &cached { + Some(cached) => self.reuse_signatures(block_id, cached.clone()).await, + None => self.prepare_new_signatures(block_id), + } + .build(block_id, state.weight_threshold) + }; - // Allow only one validation at a time - if state - .block_signatures - .insert(block_id.seqno, block_signatures.clone()) - .is_err() - { - // TODO: Panic here? - anyhow::bail!( - "block validation is already in progress. \ - session_id={}, block_id={block_id}", - self.inner.session_id - ); - } + entry.or_insert(block_signatures.clone()); - // NOTE: To eliminate the gap inside exchange routine, we can remove cached signatures - // only after we have inserted the block. - // // At this point the following is true: // - All new signatures will be stored (and validated) in the block; // - There might be some new signatures that were stored in the cache, but we // have not yet processed them. We will use them later. - state.cached_signatures.remove(&block_id.seqno); // Start collecting signatures from other validators let mut result = FastHashMap::default(); @@ -604,8 +598,8 @@ struct SessionState { shard_ident: ShardIdent, weight_threshold: u64, validators: Arc>, - block_signatures: TreeIndex>, - cached_signatures: TreeIndex>, + block_signatures: FastDashMap>, + cached_signatures: FastDashMap>, cancelled: AtomicBool, cancelled_signal: Notify, } @@ -758,14 +752,9 @@ impl ExchangeSignatures for SessionState { if self.cancelled.load(Ordering::Acquire) { return Err(ValidationError::Cancelled); } - - let guard = scc::ebr::Guard::new(); - // Full signature exchange if we know the block. // Otherwise, cache the signature for the block to use it later. - // - // NOTE: scc's `peek` does not lock the tree - let result = if let Some(signatures) = self.block_signatures.peek(&block_seqno, &guard) { + let result = if let Some(signatures) = self.block_signatures.get(&block_seqno) { metrics::counter!(METRIC_BLOCK_EXCHANGES_IN_TOTAL).increment(1); let Some(slot) = signatures.other_signatures.get(peer_id) else { @@ -774,13 +763,13 @@ impl ExchangeSignatures for SessionState { // If more signatures are still needed, validate and store new to the block if !signatures.validated.load(Ordering::Acquire) { - self.add_signature(signatures, slot, peer_id, &signature)?; + self.add_signature(&signatures, slot, peer_id, &signature)?; } proto::Exchange::Complete(signatures.own_signature.clone()) } else { // Find the slot for the specified block seqno. - let Some(slot) = self.cached_signatures.peek(&block_seqno, &guard) else { + let Some(slot) = self.cached_signatures.get(&block_seqno) else { metrics::counter!(METRIC_MISS_EXCHANGES_IN_TOTAL).increment(1); return Err(ValidationError::NoSlot); }; @@ -796,8 +785,6 @@ impl ExchangeSignatures for SessionState { proto::Exchange::Cached }; - drop(guard); - let action = match &result { proto::Exchange::Complete(_) => "complete", proto::Exchange::Cached => "cached",