diff --git a/Cargo.lock b/Cargo.lock index 6448fcf6..1e4909d6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -28,7 +28,7 @@ dependencies = [ [[package]] name = "aleph-bft" -version = "0.36.2" +version = "0.36.3" dependencies = [ "aleph-bft-mock", "aleph-bft-rmc", diff --git a/consensus/Cargo.toml b/consensus/Cargo.toml index 31fb3614..01645bac 100644 --- a/consensus/Cargo.toml +++ b/consensus/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "aleph-bft" -version = "0.36.2" +version = "0.36.3" edition = "2021" authors = ["Cardinal Cryptography"] categories = ["algorithms", "data-structures", "cryptography", "database"] diff --git a/consensus/src/alerts/mod.rs b/consensus/src/alerts/mod.rs index 52b2f315..998076b7 100644 --- a/consensus/src/alerts/mod.rs +++ b/consensus/src/alerts/mod.rs @@ -73,9 +73,9 @@ impl Alert { } } - // Simplified forker check, should only be called for alerts that have already been checked to - // contain valid proofs. - fn forker(&self) -> NodeIndex { + /// Simplified forker check, should only be called for alerts that have already been checked to + /// contain valid proofs. 
+ pub fn forker(&self) -> NodeIndex { self.proof.0.as_signable().creator() } diff --git a/consensus/src/consensus.rs b/consensus/src/consensus.rs deleted file mode 100644 index 51f543c7..00000000 --- a/consensus/src/consensus.rs +++ /dev/null @@ -1,109 +0,0 @@ -use futures::{ - channel::{mpsc, oneshot}, - future::pending, - FutureExt, -}; -use log::{debug, error}; - -use crate::{ - config::Config, - creation, handle_task_termination, - reconstruction::{ReconstructedUnit, Request, Service as ReconstructionService}, - runway::ExplicitParents, - units::SignedUnit, - Data, DataProvider, Hasher, MultiKeychain, Receiver, Round, Sender, SpawnHandle, Terminator, -}; - -pub struct IO> { - pub units_from_runway: Receiver>, - pub parents_from_runway: Receiver>, - pub units_for_runway: Sender>>, - pub requests_for_runway: Sender>, - pub new_units_for_runway: Sender>, - pub data_provider: DP, - pub starting_round: oneshot::Receiver>, -} - -pub async fn run>( - conf: Config, - io: IO, - keychain: MK, - spawn_handle: impl SpawnHandle, - mut terminator: Terminator, -) { - debug!(target: "AlephBFT", "{:?} Starting all services...", conf.node_ix()); - let IO { - units_from_runway, - parents_from_runway, - units_for_runway, - requests_for_runway, - new_units_for_runway, - data_provider, - starting_round, - } = io; - - let index = conf.node_ix(); - - let (parents_for_creator, parents_from_dag) = mpsc::unbounded(); - - let creator_terminator = terminator.add_offspring_connection("creator"); - let io = creation::IO { - outgoing_units: new_units_for_runway, - incoming_parents: parents_from_dag, - data_provider, - }; - let creator_handle = spawn_handle - .spawn_essential( - "consensus/creation", - creation::run(conf, io, keychain, starting_round, creator_terminator), - ) - .shared(); - let creator_handle_for_panic = creator_handle.clone(); - let creator_panic_handle = async move { - if creator_handle_for_panic.await.is_err() { - return; - } - pending().await - }; - - let reconstruction = 
ReconstructionService::new( - units_from_runway, - parents_from_runway, - requests_for_runway, - units_for_runway, - parents_for_creator, - ); - - let reconstruction_terminator = terminator.add_offspring_connection("reconstruction"); - let mut reconstruction_handle = spawn_handle - .spawn_essential("consensus/reconstruction", async move { - reconstruction.run(reconstruction_terminator).await - }) - .fuse(); - debug!(target: "AlephBFT", "{:?} All services started.", index); - - futures::select! { - _ = terminator.get_exit().fuse() => {}, - _ = reconstruction_handle => { - debug!(target: "AlephBFT-consensus", "{:?} unit reconstruction task terminated early.", index); - }, - _ = creator_panic_handle.fuse() => { - error!(target: "AlephBFT-consensus", "{:?} creator task terminated early with its task being dropped.", index); - }, - } - debug!(target: "AlephBFT", "{:?} All services stopping.", index); - - // we stop no matter if received Ok or Err - terminator.terminate_sync().await; - - handle_task_termination( - reconstruction_handle, - "AlephBFT-consensus", - "Reconstruction", - index, - ) - .await; - handle_task_termination(creator_handle, "AlephBFT-consensus", "Creator", index).await; - - debug!(target: "AlephBFT", "{:?} All services stopped.", index); -} diff --git a/consensus/src/dag/mod.rs b/consensus/src/dag/mod.rs new file mode 100644 index 00000000..d97a89e5 --- /dev/null +++ b/consensus/src/dag/mod.rs @@ -0,0 +1,668 @@ +//! Converts units from the network into ones that are in the Dag, in the correct order. 
+use std::collections::HashMap; + +use crate::{ + alerts::{Alert, ForkingNotification}, + units::{ + SignedUnit, UncheckedSignedUnit, Unit, UnitStore, Validator as UnitValidator, WrappedUnit, + }, + Data, Hasher, MultiKeychain, +}; +use log::{debug, trace, warn}; + +mod reconstruction; +mod validation; + +pub use reconstruction::{ReconstructedUnit, Request}; +use reconstruction::{Reconstruction, ReconstructionResult}; +pub use validation::ValidatorStatus as DagStatus; +use validation::{Error as ValidationError, Validator}; + +const LOG_TARGET: &str = "AlephBFT-dag"; + +/// The result of sending some information to the Dag. +pub struct DagResult { + /// Units added to the dag. + pub units: Vec>>, + /// Requests for more information. + pub requests: Vec>, + /// Alerts raised due to encountered forks. + pub alerts: Vec>, +} + +impl DagResult { + fn empty() -> Self { + DagResult { + units: Vec::new(), + requests: Vec::new(), + alerts: Vec::new(), + } + } + + fn alert(alert: Alert) -> Self { + DagResult { + units: Vec::new(), + requests: Vec::new(), + alerts: vec![alert], + } + } + + fn add_alert(&mut self, alert: Alert) { + self.alerts.push(alert); + } + + fn accumulate(&mut self, other: DagResult) { + let DagResult { + mut units, + mut requests, + mut alerts, + } = other; + self.units.append(&mut units); + self.requests.append(&mut requests); + self.alerts.append(&mut alerts); + } +} + +impl From>> + for DagResult +{ + fn from(other: ReconstructionResult>) -> Self { + let ReconstructionResult { units, requests } = other; + DagResult { + units, + requests, + alerts: Vec::new(), + } + } +} + +/// The Dag ensuring that all units from the network get returned reconstructed in the correct order. +pub struct Dag { + validator: Validator, + reconstruction: Reconstruction>, +} + +impl Dag { + /// A new dag using the provided unit validator under the hood. 
+ pub fn new(unit_validator: UnitValidator) -> Self { + Dag { + validator: Validator::new(unit_validator), + reconstruction: Reconstruction::new(), + } + } + + fn handle_validation_error(error: ValidationError) -> DagResult { + use ValidationError::*; + match error { + Invalid(e) => { + warn!(target: LOG_TARGET, "Received unit failing validation: {}", e); + DagResult::empty() + } + Duplicate(unit) => { + trace!(target: LOG_TARGET, "Received unit with hash {:?} again.", unit.hash()); + DagResult::empty() + } + Uncommitted(unit) => { + debug!(target: LOG_TARGET, "Received unit with hash {:?} created by known forker {:?} for which we don't have a commitment, discarding.", unit.hash(), unit.creator()); + DagResult::empty() + } + NewForker(alert) => { + warn!(target: LOG_TARGET, "New forker detected."); + trace!(target: LOG_TARGET, "Created alert: {:?}.", alert); + DagResult::alert(*alert) + } + } + } + + fn handle_result(&mut self, result: &DagResult) { + // just clean the validator cache of units that we are returning + for unit in &result.units { + self.validator.finished_processing(&unit.hash()); + } + } + + /// Add a unit to the Dag. + pub fn add_unit>>( + &mut self, + unit: UncheckedSignedUnit, + store: &UnitStore, + ) -> DagResult { + let result = match self.validator.validate(unit, store) { + Ok(unit) => self.reconstruction.add_unit(unit).into(), + Err(e) => Self::handle_validation_error(e), + }; + self.handle_result(&result); + result + } + + /// Add parents of a unit to the Dag. 
+ pub fn add_parents>>( + &mut self, + unit_hash: H::Hash, + parents: Vec>, + store: &UnitStore, + ) -> DagResult { + use ValidationError::*; + let mut result = DagResult::empty(); + let mut parent_hashes = HashMap::new(); + for unit in parents { + let unit = match self.validator.validate(unit, store) { + Ok(unit) => { + result.accumulate(self.reconstruction.add_unit(unit.clone()).into()); + unit + } + Err(Invalid(e)) => { + warn!(target: LOG_TARGET, "Received parent failing validation: {}", e); + return result; + } + Err(Duplicate(unit)) => { + trace!(target: LOG_TARGET, "Received parent with hash {:?} again.", unit.hash()); + unit + } + Err(Uncommitted(unit)) => { + debug!(target: LOG_TARGET, "Received uncommitted parent {:?}, we should get the commitment soon.", unit.hash()); + unit + } + Err(NewForker(alert)) => { + warn!(target: LOG_TARGET, "New forker detected."); + trace!(target: LOG_TARGET, "Created alert: {:?}.", alert); + result.add_alert(*alert); + // technically this was a correct unit, so we could have passed it on, + // but this will happen at most once and we will receive the parent + // response again, so we just discard it now + return result; + } + }; + parent_hashes.insert(unit.coord(), unit.hash()); + } + result.accumulate( + self.reconstruction + .add_parents(unit_hash, parent_hashes) + .into(), + ); + self.handle_result(&result); + result + } + + /// Process a forking notification, potentially returning a lot of unit processing results. + pub fn process_forking_notification>>( + &mut self, + notification: ForkingNotification, + store: &UnitStore, + ) -> DagResult { + use ForkingNotification::*; + let mut result = DagResult::empty(); + match notification { + Forker((unit, other_unit)) => { + // Just treat them as normal incoming units, if they are a forking proof + // this will either trigger a new forker or we already knew about this one. 
+ result.accumulate(self.add_unit(unit, store)); + result.accumulate(self.add_unit(other_unit, store)); + } + Units(units) => { + for unit in units { + result.accumulate(match self.validator.validate_committed(unit, store) { + Ok(unit) => self.reconstruction.add_unit(unit).into(), + Err(e) => Self::handle_validation_error(e), + }) + } + } + } + self.handle_result(&result); + result + } + + pub fn status(&self) -> DagStatus { + self.validator.status() + } +} + +#[cfg(test)] +mod test { + use crate::{ + alerts::ForkingNotification, + dag::{Dag, DagResult, Request}, + units::{ + random_full_parent_units_up_to, random_unit_with_parents, Unit, UnitStore, + Validator as UnitValidator, WrappedSignedUnit, + }, + NodeCount, NodeIndex, Signed, + }; + use aleph_bft_mock::Keychain; + + #[test] + fn accepts_initial_units() { + let node_count = NodeCount(4); + let node_id = NodeIndex(0); + let session_id = 43; + let max_round = 2137; + let keychains: Vec<_> = node_count + .into_iterator() + .map(|node_id| Keychain::new(node_count, node_id)) + .collect(); + let store = UnitStore::::new(node_count); + let validator = UnitValidator::new(session_id, keychains[node_id.0], max_round); + let mut dag = Dag::new(validator); + for unit in random_full_parent_units_up_to(0, node_count, session_id) + .into_iter() + .flatten() + .map(|unit| { + let keychain = keychains + .get(unit.creator().0) + .expect("we have the keychains"); + Signed::sign(unit, keychain) + }) + { + let DagResult { + units, + requests, + alerts, + } = dag.add_unit(unit.into(), &store); + assert_eq!(units.len(), 1); + assert!(requests.is_empty()); + assert!(alerts.is_empty()); + } + } + + #[test] + fn accepts_units_in_order() { + let node_count = NodeCount(4); + let node_id = NodeIndex(0); + let session_id = 43; + let max_round = 2137; + let keychains: Vec<_> = node_count + .into_iterator() + .map(|node_id| Keychain::new(node_count, node_id)) + .collect(); + let store = UnitStore::::new(node_count); + let validator = 
UnitValidator::new(session_id, keychains[node_id.0], max_round); + let mut dag = Dag::new(validator); + for unit in random_full_parent_units_up_to(13, node_count, session_id) + .into_iter() + .flatten() + .map(|unit| { + let keychain = keychains + .get(unit.creator().0) + .expect("we have the keychains"); + Signed::sign(unit, keychain) + }) + { + let DagResult { + units, + requests, + alerts, + } = dag.add_unit(unit.into(), &store); + assert_eq!(units.len(), 1); + assert!(requests.is_empty()); + assert!(alerts.is_empty()); + } + } + + #[test] + fn accepts_units_in_reverse_order() { + let node_count = NodeCount(4); + let node_id = NodeIndex(0); + let session_id = 43; + let max_round = 2137; + let total_rounds = 13; + let keychains: Vec<_> = node_count + .into_iterator() + .map(|node_id| Keychain::new(node_count, node_id)) + .collect(); + let store = UnitStore::::new(node_count); + let validator = UnitValidator::new(session_id, keychains[node_id.0], max_round); + let mut dag = Dag::new(validator); + for unit in random_full_parent_units_up_to(total_rounds, node_count, session_id) + .into_iter() + .flatten() + .rev() + .map(|unit| { + let keychain = keychains + .get(unit.creator().0) + .expect("we have the keychains"); + Signed::sign(unit, keychain) + }) + { + let unit_round = unit.round(); + let unit_creator = unit.creator(); + let DagResult { + units, + requests, + alerts, + } = dag.add_unit(unit.into(), &store); + assert!(alerts.is_empty()); + match unit_round { + 0 => match unit_creator { + NodeIndex(0) => { + assert_eq!(units.len(), (total_rounds * 4 + 1).into()); + assert!(requests.is_empty()); + } + _ => { + assert_eq!(units.len(), 1); + assert!(requests.is_empty()); + } + }, + _ => { + assert_eq!(requests.len(), 4); + assert!(units.is_empty()); + } + } + } + } + + #[test] + fn alerts_on_fork() { + let node_count = NodeCount(4); + let node_id = NodeIndex(0); + let session_id = 43; + let max_round = 2137; + let keychains: Vec<_> = node_count + .into_iterator() + 
.map(|node_id| Keychain::new(node_count, node_id)) + .collect(); + let mut store = UnitStore::new(node_count); + let validator = UnitValidator::new(session_id, keychains[node_id.0], max_round); + let mut dag = Dag::new(validator); + let forker_id = NodeIndex(3); + let keychain = keychains.get(forker_id.0).expect("we have the keychain"); + let unit = random_full_parent_units_up_to(0, node_count, session_id) + .get(0) + .expect("we have initial units") + .get(forker_id.0) + .expect("We have the forker") + .clone(); + let unit = Signed::sign(unit, keychain); + let mut fork = random_full_parent_units_up_to(0, node_count, session_id) + .get(0) + .expect("we have initial units") + .get(forker_id.0) + .expect("We have the forker") + .clone(); + // we might have randomly created an identical "fork" + while fork.hash() == unit.hash() { + fork = random_full_parent_units_up_to(0, node_count, session_id) + .get(0) + .expect("we have initial units") + .get(forker_id.0) + .expect("We have the forker") + .clone(); + } + let fork = Signed::sign(fork, keychain); + let DagResult { + mut units, + requests, + alerts, + } = dag.add_unit(unit.into(), &store); + assert_eq!(units.len(), 1); + assert!(requests.is_empty()); + assert!(alerts.is_empty()); + store.insert(units.pop().expect("just checked")); + let DagResult { + units, + requests, + alerts, + } = dag.add_unit(fork.into(), &store); + assert!(units.is_empty()); + assert!(requests.is_empty()); + assert_eq!(alerts.len(), 1); + } + + #[test] + fn detects_fork_through_notification() { + let node_count = NodeCount(7); + let node_id = NodeIndex(0); + let forker_id = NodeIndex(3); + let session_id = 0; + let max_round = 2137; + let keychains: Vec<_> = node_count + .into_iterator() + .map(|node_id| Keychain::new(node_count, node_id)) + .collect(); + let store = UnitStore::::new(node_count); + let validator = UnitValidator::new(session_id, keychains[node_id.0], max_round); + let mut dag = Dag::new(validator); + let unit = 
random_full_parent_units_up_to(2, node_count, session_id) + .get(2) + .expect("we have the requested round") + .get(forker_id.0) + .expect("we have the unit for the forker") + .clone(); + let unit = Signed::sign(unit, &keychains[forker_id.0]); + let fork = random_full_parent_units_up_to(2, node_count, session_id) + .get(2) + .expect("we have the requested round") + .get(forker_id.0) + .expect("we have the unit for the forker") + .clone(); + let fork = Signed::sign(fork, &keychains[forker_id.0]); + let DagResult { + units, + requests, + alerts, + } = dag.process_forking_notification( + ForkingNotification::Forker((unit.clone().into(), fork.into())), + &store, + ); + // parents were not passed, so the correct unit does not yet get returned + assert!(units.is_empty()); + assert_eq!(requests.len(), node_count.0); + assert_eq!(alerts.len(), 1); + } + + #[test] + fn accepts_committed() { + let node_count = NodeCount(7); + let node_id = NodeIndex(0); + let forker_id = NodeIndex(3); + let session_id = 0; + let max_round = 2137; + let produced_round = 4; + let keychains: Vec<_> = node_count + .into_iterator() + .map(|node_id| Keychain::new(node_count, node_id)) + .collect(); + let store = UnitStore::::new(node_count); + let validator = UnitValidator::new(session_id, keychains[node_id.0], max_round); + let mut dag = Dag::new(validator); + let units = random_full_parent_units_up_to(produced_round, node_count, session_id); + let fork_parents = units + .get(2) + .expect("we have the requested round") + .iter() + .take(5) + .cloned() + .collect(); + let fork = random_unit_with_parents(forker_id, &fork_parents); + let fork = Signed::sign(fork, &keychains[forker_id.0]); + let unit = units + .get(3) + .expect("we have the requested round") + .get(forker_id.0) + .expect("we have the forker's unit") + .clone(); + let unit = Signed::sign(unit, &keychains[forker_id.0]); + let DagResult { + units: reconstructed_units, + requests, + alerts, + } = dag.process_forking_notification( + 
ForkingNotification::Forker((unit.clone().into(), fork.clone().into())), + &store, + ); + assert!(reconstructed_units.is_empty()); + assert_eq!(requests.len(), node_count.0); + assert_eq!(alerts.len(), 1); + // normally adding forker units should no longer work now, so trying to add all units only adds initial units of non-forkers + let mut units_added = 0; + for unit in units.iter().flatten().map(|unit| { + let keychain = keychains + .get(unit.creator().0) + .expect("we have the keychains"); + Signed::sign(unit.clone(), keychain) + }) { + let DagResult { + units, + requests: _, + alerts, + } = dag.add_unit(unit.into(), &store); + units_added += units.len(); + assert!(alerts.is_empty()); + } + assert_eq!(units_added, node_count.0 - 1); + let committed_units = units + .iter() + .take(3) + .map(|units| { + units + .get(forker_id.0) + .expect("we have the forker's unit") + .clone() + }) + .map(|unit| Signed::sign(unit, &keychains[forker_id.0])) + .chain(Some(fork)) + .map(|unit| unit.into()) + .collect(); + let DagResult { + units: reconstructed_units, + requests, + alerts, + } = dag.process_forking_notification(ForkingNotification::Units(committed_units), &store); + assert!(alerts.is_empty()); + // the non-fork unit was added first in the forking notif, so all units reconstruct successfully + assert!(requests.is_empty()); + assert_eq!(reconstructed_units.len(), node_count.0 * 4 + 1); + } + + #[test] + fn handles_explicit_parents() { + let node_count = NodeCount(7); + let node_id = NodeIndex(0); + let forker_id = NodeIndex(3); + let session_id = 0; + let max_round = 2137; + let produced_round = 4; + let keychains: Vec<_> = node_count + .into_iterator() + .map(|node_id| Keychain::new(node_count, node_id)) + .collect(); + let store = UnitStore::::new(node_count); + let validator = UnitValidator::new(session_id, keychains[node_id.0], max_round); + let mut dag = Dag::new(validator); + let units = random_full_parent_units_up_to(produced_round, node_count, session_id); + 
let fork_parents = units + .get(2) + .expect("we have the requested round") + .iter() + .take(5) + .cloned() + .collect(); + let fork = random_unit_with_parents(forker_id, &fork_parents); + let fork = Signed::sign(fork, &keychains[forker_id.0]); + let unit = units + .get(3) + .expect("we have the requested round") + .get(forker_id.0) + .expect("we have the forker's unit") + .clone(); + let unit = Signed::sign(unit, &keychains[forker_id.0]); + let DagResult { + units: reconstructed_units, + requests, + alerts, + } = dag.process_forking_notification( + // note the reverse order, to create parent requests later + ForkingNotification::Forker((fork.clone().into(), unit.clone().into())), + &store, + ); + assert!(reconstructed_units.is_empty()); + // the fork only has 5 parents + assert_eq!(requests.len(), 5); + assert_eq!(alerts.len(), 1); + let mut units_added = 0; + let mut all_requests = Vec::new(); + for unit in units.iter().flatten().map(|unit| { + let keychain = keychains + .get(unit.creator().0) + .expect("we have the keychains"); + Signed::sign(unit.clone(), keychain) + }) { + let DagResult { + units, + mut requests, + alerts, + } = dag.add_unit(unit.into(), &store); + units_added += units.len(); + all_requests.append(&mut requests); + assert!(alerts.is_empty()); + } + assert_eq!(units_added, node_count.0 - 1); + let mut parent_requests: Vec<_> = all_requests + .into_iter() + .filter_map(|request| match request { + Request::Coord(_) => None, + Request::ParentsOf(hash) => Some(hash), + }) + .collect(); + // all the round 4 non-forker units should be confused + assert_eq!(parent_requests.len(), node_count.0 - 1); + let committed_units = units + .iter() + .take(3) + .map(|units| { + units + .get(forker_id.0) + .expect("we have the forker's unit") + .clone() + }) + .map(|unit| Signed::sign(unit, &keychains[forker_id.0])) + .chain(Some(fork)) + .map(|unit| unit.into()) + .collect(); + let DagResult { + units: reconstructed_units, + requests, + alerts, + } = 
dag.process_forking_notification(ForkingNotification::Units(committed_units), &store); + assert!(alerts.is_empty()); + // we already got the requests earlier, in parent_requests + assert!(requests.is_empty()); + assert!(!reconstructed_units.is_empty()); + // gotta also commit to the correct unit, so that it can get imported + let committed_units = units + .iter() + .take(4) + .map(|units| { + units + .get(forker_id.0) + .expect("we have the forker's unit") + .clone() + }) + .map(|unit| Signed::sign(unit, &keychains[forker_id.0]).into()) + .collect(); + let DagResult { + units: reconstructed_units, + requests, + alerts, + } = dag.process_forking_notification(ForkingNotification::Units(committed_units), &store); + assert!(alerts.is_empty()); + assert!(requests.is_empty()); + assert_eq!(reconstructed_units.len(), 1); + let confused_unit = parent_requests.pop().expect("we chacked it's not empty"); + let parents = units + .get(3) + .expect("we have round 3 units") + .iter() + .map(|unit| Signed::sign(unit.clone(), &keychains[unit.creator().0])) + .map(|unit| unit.into()) + .collect(); + let DagResult { + units: reconstructed_units, + requests, + alerts, + } = dag.add_parents(confused_unit, parents, &store); + assert!(alerts.is_empty()); + assert!(requests.is_empty()); + assert_eq!(reconstructed_units.len(), 1); + assert_eq!(reconstructed_units[0].hash(), confused_unit); + } +} diff --git a/consensus/src/reconstruction/dag.rs b/consensus/src/dag/reconstruction/dag.rs similarity index 98% rename from consensus/src/reconstruction/dag.rs rename to consensus/src/dag/reconstruction/dag.rs index dcd17323..40871bf8 100644 --- a/consensus/src/reconstruction/dag.rs +++ b/consensus/src/dag/reconstruction/dag.rs @@ -1,5 +1,5 @@ use crate::{ - reconstruction::ReconstructedUnit, + dag::reconstruction::ReconstructedUnit, units::{HashFor, Unit}, }; use std::collections::{HashMap, HashSet, VecDeque}; @@ -123,7 +123,7 @@ impl Dag { #[cfg(test)] mod test { use crate::{ - 
reconstruction::{dag::Dag, ReconstructedUnit}, + dag::reconstruction::{dag::Dag, ReconstructedUnit}, units::{random_full_parent_units_up_to, TestingFullUnit, Unit}, Hasher, NodeCount, NodeIndex, NodeMap, }; diff --git a/consensus/src/dag/reconstruction/mod.rs b/consensus/src/dag/reconstruction/mod.rs new file mode 100644 index 00000000..9123bdab --- /dev/null +++ b/consensus/src/dag/reconstruction/mod.rs @@ -0,0 +1,359 @@ +use std::collections::HashMap; + +use crate::{ + extension::ExtenderUnit, + units::{ControlHash, HashFor, Unit, UnitCoord, WrappedUnit}, + Hasher, NodeMap, +}; + +mod dag; +mod parents; + +use dag::Dag; +use parents::Reconstruction as ParentReconstruction; + +/// A unit with its parents represented explicitly. +#[derive(Debug, PartialEq, Eq, Clone)] +pub struct ReconstructedUnit { + unit: U, + parents: NodeMap>, +} + +impl ReconstructedUnit { + /// Returns a reconstructed unit if the parents agree with the hash, errors out otherwise. + pub fn with_parents(unit: U, parents: NodeMap>) -> Result { + match unit.control_hash().combined_hash + == ControlHash::::combine_hashes(&parents) + { + true => Ok(ReconstructedUnit { unit, parents }), + false => Err(unit), + } + } + + /// Reconstructs empty parents for a round 0 unit. + /// Assumes obviously incorrect units with wrong control hashes have been rejected earlier. + /// Will panic if called for any other kind of unit. + pub fn initial(unit: U) -> Self { + let n_members = unit.control_hash().n_members(); + assert!(unit.round() == 0, "Only the zeroth unit can be initial."); + ReconstructedUnit { + unit, + parents: NodeMap::with_size(n_members), + } + } + + /// The reconstructed parents, guaranteed to be correct. + pub fn parents(&self) -> &NodeMap> { + &self.parents + } + + /// Create an extender unit from this one. 
+ pub fn extender_unit(&self) -> ExtenderUnit { + ExtenderUnit::new( + self.unit.creator(), + self.unit.round(), + self.hash(), + self.parents.clone(), + ) + } +} + +impl Unit for ReconstructedUnit { + type Hasher = U::Hasher; + + fn hash(&self) -> HashFor { + self.unit.hash() + } + + fn coord(&self) -> UnitCoord { + self.unit.coord() + } + + fn control_hash(&self) -> &ControlHash { + self.unit.control_hash() + } +} + +impl WrappedUnit for ReconstructedUnit { + type Wrapped = U; + + fn unpack(self) -> U { + self.unit + } +} + +/// What we need to request to reconstruct units. +#[derive(Debug, PartialEq, Eq)] +pub enum Request { + /// We need a unit at this coordinate. + Coord(UnitCoord), + /// We need the explicit list of parents for the unit identified by the hash. + /// This should only happen in the presence of forks, when optimistic reconstruction failed. + ParentsOf(H::Hash), +} + +/// The result of a reconstruction attempt. Might contain multiple reconstructed units, +/// as well as requests for some data that is needed for further reconstruction. +#[derive(Debug, PartialEq, Eq)] +pub struct ReconstructionResult { + /// All the units that got reconstructed. + pub units: Vec>, + /// Any requests that now should be made. 
+ pub requests: Vec>, +} + +impl ReconstructionResult { + fn new(units: Vec>, requests: Vec>) -> Self { + ReconstructionResult { units, requests } + } + + fn empty() -> Self { + ReconstructionResult::new(Vec::new(), Vec::new()) + } + + fn reconstructed(unit: ReconstructedUnit) -> Self { + ReconstructionResult { + units: vec![unit], + requests: Vec::new(), + } + } + + fn request(request: Request) -> Self { + ReconstructionResult { + units: Vec::new(), + requests: vec![request], + } + } + + fn add_unit(&mut self, unit: ReconstructedUnit) { + self.units.push(unit); + } + + fn add_request(&mut self, request: Request) { + self.requests.push(request); + } + + fn accumulate(&mut self, other: ReconstructionResult) { + let ReconstructionResult { + mut units, + mut requests, + } = other; + self.units.append(&mut units); + self.requests.append(&mut requests); + } +} + +/// The reconstruction of the structure of the Dag. +/// When passed units containing control hashes, and responses to requests it produces, +/// it eventually outputs versions with explicit parents in an order conforming to the Dag order. +pub struct Reconstruction { + parents: ParentReconstruction, + dag: Dag, +} + +impl Reconstruction { + /// Create a new reconstruction. + pub fn new() -> Self { + let parents = ParentReconstruction::new(); + let dag = Dag::new(); + Reconstruction { parents, dag } + } + + fn handle_parents_reconstruction_result( + &mut self, + reconstruction_result: ReconstructionResult, + ) -> ReconstructionResult { + let ReconstructionResult { units, requests } = reconstruction_result; + let units = units + .into_iter() + .flat_map(|unit| self.dag.add_unit(unit)) + .collect(); + ReconstructionResult::new(units, requests) + } + + /// Add a unit to the reconstruction. 
+ pub fn add_unit(&mut self, unit: U) -> ReconstructionResult { + let parent_reconstruction_result = self.parents.add_unit(unit); + self.handle_parents_reconstruction_result(parent_reconstruction_result) + } + + /// Add an explicit list of parents to the reconstruction. + pub fn add_parents( + &mut self, + unit: HashFor, + parents: HashMap>, + ) -> ReconstructionResult { + let parent_reconstruction_result = self.parents.add_parents(unit, parents); + self.handle_parents_reconstruction_result(parent_reconstruction_result) + } +} + +#[cfg(test)] +mod test { + use std::collections::HashMap; + + use crate::{ + dag::reconstruction::{ReconstructedUnit, Reconstruction, ReconstructionResult, Request}, + units::{random_full_parent_units_up_to, Unit, UnitCoord}, + NodeCount, NodeIndex, + }; + + #[test] + fn reconstructs_initial_units() { + let mut reconstruction = Reconstruction::new(); + for unit in &random_full_parent_units_up_to(0, NodeCount(4), 43)[0] { + let ReconstructionResult { + mut units, + requests, + } = reconstruction.add_unit(unit.clone()); + assert!(requests.is_empty()); + assert_eq!(units.len(), 1); + let reconstructed_unit = units.pop().expect("just checked its there"); + assert_eq!(reconstructed_unit, ReconstructedUnit::initial(unit.clone())); + assert_eq!(reconstructed_unit.parents().item_count(), 0); + } + } + + #[test] + fn reconstructs_units_coming_in_order() { + let mut reconstruction = Reconstruction::new(); + let dag = random_full_parent_units_up_to(7, NodeCount(4), 43); + for units in &dag { + for unit in units { + let round = unit.round(); + let ReconstructionResult { + mut units, + requests, + } = reconstruction.add_unit(unit.clone()); + assert!(requests.is_empty()); + assert_eq!(units.len(), 1); + let reconstructed_unit = units.pop().expect("just checked its there"); + match round { + 0 => { + assert_eq!(reconstructed_unit, ReconstructedUnit::initial(unit.clone())); + assert_eq!(reconstructed_unit.parents().item_count(), 0); + } + round => { + 
assert_eq!(reconstructed_unit.parents().item_count(), 4); + let parents = dag + .get((round - 1) as usize) + .expect("the parents are there"); + for (parent, reconstructed_parent) in + parents.iter().zip(reconstructed_unit.parents().values()) + { + assert_eq!(&parent.hash(), reconstructed_parent); + } + } + } + } + } + } + + #[test] + fn requests_all_parents() { + let mut reconstruction = Reconstruction::new(); + let dag = random_full_parent_units_up_to(1, NodeCount(4), 43); + let unit = dag + .get(1) + .expect("just created") + .last() + .expect("we have a unit"); + let ReconstructionResult { units, requests } = reconstruction.add_unit(unit.clone()); + assert!(units.is_empty()); + assert_eq!(requests.len(), 4); + } + + #[test] + fn requests_single_parent() { + let mut reconstruction = Reconstruction::new(); + let dag = random_full_parent_units_up_to(1, NodeCount(4), 43); + for unit in dag.get(0).expect("just created").iter().skip(1) { + reconstruction.add_unit(unit.clone()); + } + let unit = dag + .get(1) + .expect("just created") + .last() + .expect("we have a unit"); + let ReconstructionResult { units, requests } = reconstruction.add_unit(unit.clone()); + assert!(units.is_empty()); + assert_eq!(requests.len(), 1); + assert_eq!( + requests.last().expect("just checked"), + &Request::Coord(UnitCoord::new(0, NodeIndex(0))) + ); + } + + #[test] + fn reconstructs_units_coming_in_reverse_order() { + let mut reconstruction = Reconstruction::new(); + let mut dag = random_full_parent_units_up_to(7, NodeCount(4), 43); + dag.reverse(); + for units in dag.iter().take(7) { + for unit in units { + let ReconstructionResult { units, requests } = + reconstruction.add_unit(unit.clone()); + assert!(units.is_empty()); + assert_eq!(requests.len(), 4); + } + } + for unit in dag[7].iter().take(3) { + let ReconstructionResult { units, requests } = reconstruction.add_unit(unit.clone()); + assert!(requests.is_empty()); + assert_eq!(units.len(), 1); + } + let ReconstructionResult { units, 
requests } = reconstruction.add_unit(dag[7][3].clone()); + assert!(requests.is_empty()); + assert_eq!(units.len(), 4 * 8 - 3); + } + + #[test] + fn handles_bad_hash() { + let node_count = NodeCount(7); + let mut reconstruction = Reconstruction::new(); + let dag = random_full_parent_units_up_to(0, node_count, 43); + for unit in dag.get(0).expect("just created") { + reconstruction.add_unit(unit.clone()); + } + let other_dag = random_full_parent_units_up_to(1, node_count, 43); + let unit = other_dag + .get(1) + .expect("just created") + .last() + .expect("we have a unit"); + let unit_hash = unit.hash(); + let ReconstructionResult { units, requests } = reconstruction.add_unit(unit.clone()); + assert!(units.is_empty()); + assert_eq!(requests.len(), 1); + assert_eq!( + requests.last().expect("just checked"), + &Request::ParentsOf(unit_hash), + ); + let parent_hashes: HashMap<_, _> = other_dag + .get(0) + .expect("other dag has initial units") + .iter() + .map(|unit| (unit.coord(), unit.hash())) + .collect(); + let ReconstructionResult { units, requests } = + reconstruction.add_parents(unit_hash, parent_hashes.clone()); + assert!(requests.is_empty()); + assert!(units.is_empty()); + let mut all_reconstructed = Vec::new(); + for other_initial in &other_dag[0] { + let ReconstructionResult { + mut units, + requests, + } = reconstruction.add_unit(other_initial.clone()); + assert!(requests.is_empty()); + all_reconstructed.append(&mut units); + } + // some of the initial units may randomly be identical, + // so all we can say that the last reconstructed unit should be the one we want + assert!(!all_reconstructed.is_empty()); + assert_eq!( + all_reconstructed.pop().expect("just checked").hash(), + unit_hash + ) + } +} diff --git a/consensus/src/reconstruction/parents.rs b/consensus/src/dag/reconstruction/parents.rs similarity index 75% rename from consensus/src/reconstruction/parents.rs rename to consensus/src/dag/reconstruction/parents.rs index 1692806a..b1bed339 100644 --- 
a/consensus/src/reconstruction/parents.rs +++ b/consensus/src/dag/reconstruction/parents.rs @@ -1,7 +1,7 @@ use crate::{ - reconstruction::ReconstructedUnit, + dag::reconstruction::{ReconstructedUnit, ReconstructionResult, Request}, units::{ControlHash, HashFor, Unit, UnitCoord}, - Hasher, NodeIndex, NodeMap, + NodeIndex, NodeMap, }; use std::collections::{hash_map::Entry, HashMap}; @@ -96,76 +96,6 @@ impl ReconstructingUnit { } } -/// What we need to request to reconstruct units. -#[derive(Debug, PartialEq, Eq)] -pub enum Request { - /// We need a unit at this coordinate. - Coord(UnitCoord), - /// We need the explicit list of parents for the unit identified by the hash. - /// This should only happen in the presence of forks, when optimistic reconstruction failed. - ParentsOf(H::Hash), -} - -/// The result of a reconstruction attempt. Might contain multiple reconstructed units, -/// as well as requests for some data that is needed for further reconstruction. -#[derive(Debug, PartialEq, Eq)] -pub struct ReconstructionResult { - reconstructed_units: Vec>, - requests: Vec>, -} - -impl ReconstructionResult { - fn new() -> Self { - ReconstructionResult { - reconstructed_units: Vec::new(), - requests: Vec::new(), - } - } - - fn reconstructed(unit: ReconstructedUnit) -> Self { - ReconstructionResult { - reconstructed_units: vec![unit], - requests: Vec::new(), - } - } - - fn request(request: Request) -> Self { - ReconstructionResult { - reconstructed_units: Vec::new(), - requests: vec![request], - } - } - - fn add_unit(&mut self, unit: ReconstructedUnit) { - self.reconstructed_units.push(unit); - } - - fn add_request(&mut self, request: Request) { - self.requests.push(request); - } - - fn accumulate(&mut self, other: ReconstructionResult) { - let ReconstructionResult { - mut reconstructed_units, - mut requests, - } = other; - self.reconstructed_units.append(&mut reconstructed_units); - self.requests.append(&mut requests); - } -} - -impl From> - for (Vec>, Vec>) -{ - fn 
from(result: ReconstructionResult) -> Self { - let ReconstructionResult { - reconstructed_units, - requests, - } = result; - (reconstructed_units, requests) - } -} - /// Receives units with control hashes and reconstructs their parents. pub struct Reconstruction { reconstructing_units: HashMap, ReconstructingUnit>, @@ -195,7 +125,7 @@ impl Reconstruction { Reconstructed(unit) => ReconstructionResult::reconstructed(unit), InProgress(unit) => { self.reconstructing_units.insert(child_hash, unit); - ReconstructionResult::new() + ReconstructionResult::empty() } RequestParents(unit) => { let hash = unit.as_unit().hash(); @@ -205,13 +135,13 @@ impl Reconstruction { }, // We might have reconstructed the unit through explicit parents if someone sent them to us for no reason, // in which case we don't have it any more. - None => ReconstructionResult::new(), + None => ReconstructionResult::empty(), } } /// Add a unit and start reconstructing its parents. pub fn add_unit(&mut self, unit: U) -> ReconstructionResult { - let mut result = ReconstructionResult::new(); + let mut result = ReconstructionResult::empty(); let unit_hash = unit.hash(); if self.reconstructing_units.contains_key(&unit_hash) { // We already received this unit once, no need to do anything. 
@@ -275,10 +205,10 @@ impl Reconstruction { Ok(unit) => ReconstructionResult::reconstructed(unit), Err(unit) => { self.reconstructing_units.insert(unit_hash, unit); - ReconstructionResult::new() + ReconstructionResult::empty() } }, - None => ReconstructionResult::new(), + None => ReconstructionResult::empty(), } } } @@ -288,9 +218,8 @@ mod test { use std::collections::HashMap; use crate::{ - reconstruction::{ - parents::{Reconstruction, Request}, - ReconstructedUnit, + dag::reconstruction::{ + parents::Reconstruction, ReconstructedUnit, ReconstructionResult, Request, }, units::{random_full_parent_units_up_to, Unit, UnitCoord}, NodeCount, NodeIndex, @@ -300,10 +229,13 @@ mod test { fn reconstructs_initial_units() { let mut reconstruction = Reconstruction::new(); for unit in &random_full_parent_units_up_to(0, NodeCount(4), 43)[0] { - let (mut reconstructed_units, requests) = reconstruction.add_unit(unit.clone()).into(); + let ReconstructionResult { + mut units, + requests, + } = reconstruction.add_unit(unit.clone()); assert!(requests.is_empty()); - assert_eq!(reconstructed_units.len(), 1); - let reconstructed_unit = reconstructed_units.pop().expect("just checked its there"); + assert_eq!(units.len(), 1); + let reconstructed_unit = units.pop().expect("just checked its there"); assert_eq!(reconstructed_unit, ReconstructedUnit::initial(unit.clone())); assert_eq!(reconstructed_unit.parents().item_count(), 0); } @@ -316,11 +248,13 @@ mod test { for units in &dag { for unit in units { let round = unit.round(); - let (mut reconstructed_units, requests) = - reconstruction.add_unit(unit.clone()).into(); + let ReconstructionResult { + mut units, + requests, + } = reconstruction.add_unit(unit.clone()); assert!(requests.is_empty()); - assert_eq!(reconstructed_units.len(), 1); - let reconstructed_unit = reconstructed_units.pop().expect("just checked its there"); + assert_eq!(units.len(), 1); + let reconstructed_unit = units.pop().expect("just checked its there"); match round { 0 
=> { assert_eq!(reconstructed_unit, ReconstructedUnit::initial(unit.clone())); @@ -351,8 +285,8 @@ mod test { .expect("just created") .last() .expect("we have a unit"); - let (reconstructed_units, requests) = reconstruction.add_unit(unit.clone()).into(); - assert!(reconstructed_units.is_empty()); + let ReconstructionResult { units, requests } = reconstruction.add_unit(unit.clone()); + assert!(units.is_empty()); assert_eq!(requests.len(), 4); } @@ -368,8 +302,8 @@ mod test { .expect("just created") .last() .expect("we have a unit"); - let (reconstructed_units, requests) = reconstruction.add_unit(unit.clone()).into(); - assert!(reconstructed_units.is_empty()); + let ReconstructionResult { units, requests } = reconstruction.add_unit(unit.clone()); + assert!(units.is_empty()); assert_eq!(requests.len(), 1); assert_eq!( requests.last().expect("just checked"), @@ -383,20 +317,22 @@ mod test { let mut dag = random_full_parent_units_up_to(7, NodeCount(4), 43); dag.reverse(); for unit in dag.get(0).expect("we have the top units") { - let (reconstructed_units, requests) = reconstruction.add_unit(unit.clone()).into(); - assert!(reconstructed_units.is_empty()); + let ReconstructionResult { units, requests } = reconstruction.add_unit(unit.clone()); + assert!(units.is_empty()); assert_eq!(requests.len(), 4); } let mut total_reconstructed = 0; for mut units in dag.into_iter().skip(1) { let last_unit = units.pop().expect("we have the unit"); for unit in units { - let (reconstructed_units, _) = reconstruction.add_unit(unit.clone()).into(); - total_reconstructed += reconstructed_units.len(); + let ReconstructionResult { units, requests: _ } = + reconstruction.add_unit(unit.clone()); + total_reconstructed += units.len(); } - let (reconstructed_units, _) = reconstruction.add_unit(last_unit.clone()).into(); - total_reconstructed += reconstructed_units.len(); - assert!(reconstructed_units.len() >= 4); + let ReconstructionResult { units, requests: _ } = + 
reconstruction.add_unit(last_unit.clone()); + total_reconstructed += units.len(); + assert!(units.len() >= 4); } assert_eq!(total_reconstructed, 4 * 8); } @@ -415,8 +351,8 @@ mod test { .last() .expect("we have a unit"); let unit_hash = unit.hash(); - let (reconstructed_units, requests) = reconstruction.add_unit(unit.clone()).into(); - assert!(reconstructed_units.is_empty()); + let ReconstructionResult { units, requests } = reconstruction.add_unit(unit.clone()); + assert!(units.is_empty()); assert_eq!(requests.len(), 1); assert_eq!( requests.last().expect("just checked"), @@ -428,12 +364,13 @@ mod test { .iter() .map(|unit| (unit.coord(), unit.hash())) .collect(); - let (mut reconstructed_units, requests) = reconstruction - .add_parents(unit_hash, parent_hashes.clone()) - .into(); + let ReconstructionResult { + mut units, + requests, + } = reconstruction.add_parents(unit_hash, parent_hashes.clone()); assert!(requests.is_empty()); - assert_eq!(reconstructed_units.len(), 1); - let reconstructed_unit = reconstructed_units.pop().expect("just checked its there"); + assert_eq!(units.len(), 1); + let reconstructed_unit = units.pop().expect("just checked its there"); assert_eq!(reconstructed_unit.parents().item_count(), 4); for (coord, parent_hash) in parent_hashes { assert_eq!( diff --git a/consensus/src/validation.rs b/consensus/src/dag/validation.rs similarity index 83% rename from consensus/src/validation.rs rename to consensus/src/dag/validation.rs index bc0ba262..65f5c62e 100644 --- a/consensus/src/validation.rs +++ b/consensus/src/dag/validation.rs @@ -1,7 +1,7 @@ use std::fmt::{Debug, Display, Formatter, Result as FmtResult}; use crate::{ - alerts::{Alert, ForkingNotification}, + alerts::Alert, units::{ SignedUnit, UncheckedSignedUnit, Unit, UnitStore, UnitStoreStatus, ValidationError, Validator as UnitValidator, WrappedUnit, @@ -147,7 +147,8 @@ impl Validator { Ok(unit) } - fn validate_committed>>( + /// Validate a committed unit, it has to be from a forker. 
+ pub fn validate_committed>>( &mut self, unit: UncheckedSignedUnit, store: &UnitStore, @@ -161,26 +162,6 @@ impl Validator { Ok(unit) } - /// Process a forking notification, potentially returning a lot of unit processing results. - pub fn process_forking_notification>>( - &mut self, - notification: ForkingNotification, - store: &UnitStore, - ) -> Vec> { - use ForkingNotification::*; - match notification { - Forker((unit, other_unit)) => { - // Just treat them as normal incoming units, if they are a forking proof - // this will either trigger a new forker or we already knew about this one. - vec![self.validate(unit, store), self.validate(other_unit, store)] - } - Units(units) => units - .into_iter() - .map(|unit| self.validate_committed(unit, store)) - .collect(), - } - } - /// Signal that a unit finished processing and thus it's copy no longer has to be kept for fork detection. /// NOTE: This is only a memory optimization, if the units stay there forever everything still works. pub fn finished_processing(&mut self, unit: &H::Hash) { @@ -199,12 +180,11 @@ impl Validator { #[cfg(test)] mod test { use crate::{ - alerts::ForkingNotification, + dag::validation::{Error, Validator}, units::{ random_full_parent_units_up_to, Unit, UnitStore, Validator as UnitValidator, WrappedSignedUnit, }, - validation::{Error, Validator}, NodeCount, NodeIndex, Signed, }; use aleph_bft_mock::Keychain; @@ -400,40 +380,6 @@ mod test { } } - #[test] - fn detects_fork_through_notification() { - let node_count = NodeCount(7); - let session_id = 0; - let max_round = 2137; - let keychains: Vec<_> = node_count - .into_iterator() - .map(|node_id| Keychain::new(node_count, node_id)) - .collect(); - let store = UnitStore::::new(node_count); - let mut validator = Validator::new(UnitValidator::new(session_id, keychains[0], max_round)); - let unit = random_full_parent_units_up_to(2, node_count, session_id) - .get(2) - .expect("we have the requested round") - .get(0) - .expect("we have the unit for the 
zeroth creator") - .clone(); - let unit = Signed::sign(unit, &keychains[0]); - let fork = random_full_parent_units_up_to(2, node_count, session_id) - .get(2) - .expect("we have the requested round") - .get(0) - .expect("we have the unit for the zeroth creator") - .clone(); - let fork = Signed::sign(fork, &keychains[0]); - let results = validator.process_forking_notification( - ForkingNotification::Forker((unit.clone().into(), fork.into())), - &store, - ); - assert_eq!(results.len(), 2); - assert_eq!(results[0], Ok(unit.clone())); - assert!(matches!(results[1], Err(Error::NewForker(_)))); - } - #[test] fn accepts_committed() { let node_count = NodeCount(7); @@ -469,15 +415,9 @@ mod test { validator.validate(fork.clone().into(), &store), Err(Error::NewForker(_)) )); - let results = validator.process_forking_notification( - ForkingNotification::Units(units.clone().into_iter().map(|unit| unit.into()).collect()), - &store, + assert_eq!( + validator.validate_committed(fork.clone().into(), &store), + Ok(fork) ); - for (unit, result) in units.iter().zip(results.iter()).take(3) { - assert_eq!(result, &Err(Error::Duplicate(unit.clone()))); - } - for (unit, result) in units.iter().zip(results.iter()).skip(3) { - assert_eq!(result, &Ok(unit.clone())); - } } } diff --git a/consensus/src/lib.rs b/consensus/src/lib.rs index 97abfe62..a33a23bf 100644 --- a/consensus/src/lib.rs +++ b/consensus/src/lib.rs @@ -4,16 +4,14 @@ mod alerts; mod config; -mod consensus; mod creation; +mod dag; mod extension; mod member; mod network; -mod reconstruction; mod runway; mod terminator; mod units; -mod validation; mod backup; mod task_queue; diff --git a/consensus/src/reconstruction/mod.rs b/consensus/src/reconstruction/mod.rs deleted file mode 100644 index 51ac5b35..00000000 --- a/consensus/src/reconstruction/mod.rs +++ /dev/null @@ -1,181 +0,0 @@ -use crate::{ - extension::ExtenderUnit, - runway::ExplicitParents, - units::{ControlHash, HashFor, Unit, UnitCoord, WrappedUnit}, - NodeMap, 
Receiver, Sender, Terminator, -}; -use futures::{FutureExt, StreamExt}; -use log::{debug, trace, warn}; - -mod dag; -mod parents; - -use dag::Dag; -pub use parents::Request; -use parents::{Reconstruction, ReconstructionResult}; - -const LOG_TARGET: &str = "AlephBFT-reconstruction"; - -/// A unit with its parents represented explicitly. -#[derive(Debug, PartialEq, Eq, Clone)] -pub struct ReconstructedUnit { - unit: U, - parents: NodeMap>, -} - -impl ReconstructedUnit { - /// Returns a reconstructed unit if the parents agree with the hash, errors out otherwise. - pub fn with_parents(unit: U, parents: NodeMap>) -> Result { - match unit.control_hash().combined_hash - == ControlHash::::combine_hashes(&parents) - { - true => Ok(ReconstructedUnit { unit, parents }), - false => Err(unit), - } - } - - /// Reconstructs empty parents for a round 0 unit. - /// Assumes obviously incorrect units with wrong control hashes have been rejected earlier. - /// Will panic if called for any other kind of unit. - pub fn initial(unit: U) -> Self { - let n_members = unit.control_hash().n_members(); - assert!(unit.round() == 0, "Only the zeroth unit can be initial."); - ReconstructedUnit { - unit, - parents: NodeMap::with_size(n_members), - } - } - - /// The reconstructed parents, guaranteed to be correct. - pub fn parents(&self) -> &NodeMap> { - &self.parents - } - - /// Create an extender unit from this one. 
- pub fn extender_unit(&self) -> ExtenderUnit { - ExtenderUnit::new( - self.unit.creator(), - self.unit.round(), - self.hash(), - self.parents.clone(), - ) - } -} - -impl Unit for ReconstructedUnit { - type Hasher = U::Hasher; - - fn hash(&self) -> HashFor { - self.unit.hash() - } - - fn coord(&self) -> UnitCoord { - self.unit.coord() - } - - fn control_hash(&self) -> &ControlHash { - self.unit.control_hash() - } -} - -impl WrappedUnit for ReconstructedUnit { - type Wrapped = U; - - fn unpack(self) -> U { - self.unit - } -} - -/// The service responsible for reconstructing the structure of the Dag. -/// Receives units containing control hashes and eventually outputs versions -/// with explicit parents in an order conforming to the Dag order. -pub struct Service { - reconstruction: Reconstruction, - dag: Dag, - units_from_runway: Receiver, - parents_from_runway: Receiver>, - requests_for_runway: Sender>, - units_for_runway: Sender>, - units_for_creator: Sender>, -} - -impl Service { - /// Create a new reconstruction service with the provided IO channels. 
- pub fn new( - units_from_runway: Receiver, - parents_from_runway: Receiver>, - requests_for_runway: Sender>, - units_for_runway: Sender>, - units_for_creator: Sender>, - ) -> Self { - let reconstruction = Reconstruction::new(); - let dag = Dag::new(); - Service { - reconstruction, - dag, - units_from_runway, - parents_from_runway, - requests_for_runway, - units_for_runway, - units_for_creator, - } - } - - fn handle_reconstruction_result( - &mut self, - reconstruction_result: ReconstructionResult, - ) -> bool { - let (units, requests) = reconstruction_result.into(); - trace!(target: LOG_TARGET, "Reconstructed {} units, and have {} requests.", units.len(), requests.len()); - for request in requests { - if self.requests_for_runway.unbounded_send(request).is_err() { - warn!(target: LOG_TARGET, "Request channel should be open."); - return false; - } - } - for unit in units { - for unit in self.dag.add_unit(unit) { - if self.units_for_creator.unbounded_send(unit.clone()).is_err() { - warn!(target: LOG_TARGET, "Creator channel should be open."); - return false; - } - if self.units_for_runway.unbounded_send(unit).is_err() { - warn!(target: LOG_TARGET, "Notification channel should be open."); - return false; - } - } - } - true - } - - /// Run the reconstruction service until terminated. - pub async fn run(mut self, mut terminator: Terminator) { - loop { - let reconstruction_result = futures::select! 
{ - unit = self.units_from_runway.next() => match unit { - Some(unit) => self.reconstruction.add_unit(unit), - None => { - warn!(target: LOG_TARGET, "Units for reconstruction unexpectedly ended."); - return; - } - }, - parents = self.parents_from_runway.next() => match parents { - Some((unit, parents)) => self.reconstruction.add_parents(unit, parents), - None => { - warn!(target: LOG_TARGET, "Parents for reconstruction unexpectedly ended."); - return; - } - }, - _ = terminator.get_exit().fuse() => { - debug!(target: LOG_TARGET, "Received exit signal."); - break; - } - }; - if !self.handle_reconstruction_result(reconstruction_result) { - return; - } - } - debug!(target: LOG_TARGET, "Reconstruction decided to exit."); - terminator.terminate_sync().await; - } -} diff --git a/consensus/src/runway/mod.rs b/consensus/src/runway/mod.rs index bf817946..13e6a49c 100644 --- a/consensus/src/runway/mod.rs +++ b/consensus/src/runway/mod.rs @@ -1,31 +1,30 @@ use crate::{ alerts::{Alert, ForkingNotification, NetworkMessage}, - consensus, + creation, + dag::{Dag, DagResult, DagStatus, ReconstructedUnit, Request as ReconstructionRequest}, extension::{ExtenderUnit, Service as Extender}, handle_task_termination, member::UnitMessage, - reconstruction::{ReconstructedUnit, Request as ReconstructionRequest}, units::{ - SignedUnit, UncheckedSignedUnit, Unit, UnitCoord, UnitStore, UnitStoreStatus, - Validator as UnitValidator, WrappedUnit, + SignedUnit, UncheckedSignedUnit, Unit, UnitCoord, UnitStore, UnitStoreStatus, Validator, + WrappedUnit, }, - validation::{Error as ValidationError, Validator, ValidatorStatus}, Config, Data, DataProvider, FinalizationHandler, Hasher, Index, Keychain, MultiKeychain, NodeIndex, Receiver, Round, Sender, Signature, Signed, SpawnHandle, Terminator, UncheckedSigned, }; use aleph_bft_types::Recipient; -use futures::AsyncWrite; use futures::{ channel::{mpsc, oneshot}, - pin_mut, AsyncRead, Future, FutureExt, StreamExt, + future::pending, + pin_mut, AsyncRead, 
AsyncWrite, Future, FutureExt, StreamExt, }; use futures_timer::Delay; use itertools::Itertools; use log::{debug, error, info, trace, warn}; use std::{ cmp::max, - collections::{HashMap, HashSet}, + collections::HashSet, convert::TryFrom, fmt::{Display, Formatter, Result as FmtResult}, marker::PhantomData, @@ -39,8 +38,6 @@ use crate::backup::{BackupLoader, BackupSaver}; use collection::{Collection, IO as CollectionIO}; pub use collection::{NewestUnitResponse, Salt}; -pub type ExplicitParents = (::Hash, HashMap::Hash>); - /// Possible requests for information from other nodes. pub enum Request { Coord(UnitCoord), @@ -114,17 +111,14 @@ where missing_parents: HashSet, store: UnitStore>>, keychain: MK, - validator: Validator, + dag: Dag, alerts_for_alerter: Sender>, notifications_from_alerter: Receiver>, unit_messages_from_network: Receiver>, unit_messages_for_network: Sender>, responses_for_collection: Sender>, resolved_requests: Sender>, - units_for_reconstruction: Sender>, - parents_for_reconstruction: Sender>, - units_from_reconstruction: Receiver>>, - requests_from_reconstruction: Receiver>, + parents_for_creator: Sender>>, ordered_batch_rx: Receiver>, finalization_handler: FH, backup_units_for_saver: Sender>, @@ -137,16 +131,14 @@ where struct RunwayStatus<'a, H: Hasher> { missing_coords: &'a HashSet, missing_parents: &'a HashSet, - validator_status: ValidatorStatus, - dag_status: UnitStoreStatus, + dag_status: DagStatus, + store_status: UnitStoreStatus, } impl<'a, H: Hasher> RunwayStatus<'a, H> { fn short_report(&self) -> String { - let rounds_behind = max( - self.validator_status.top_round(), - self.dag_status.top_round(), - ) - self.dag_status.top_round(); + let rounds_behind = max(self.dag_status.top_round(), self.store_status.top_round()) + - self.store_status.top_round(); let missing_coords = self.missing_coords.len(); match (rounds_behind, missing_coords) { (0..=2, 0) => "healthy".to_string(), @@ -223,13 +215,10 @@ struct RunwayConfig, MK: MultiKey 
backup_units_from_saver: Receiver>, alerts_for_alerter: Sender>, notifications_from_alerter: Receiver>, - units_for_reconstruction: Sender>, - parents_for_reconstruction: Sender>, - units_from_reconstruction: Receiver>>, - requests_from_reconstruction: Receiver>, unit_messages_from_network: Receiver>, unit_messages_for_network: Sender>, responses_for_collection: Sender>, + parents_for_creator: Sender>>, ordered_batch_rx: Receiver>, resolved_requests: Sender>, new_units_from_creation: Receiver>, @@ -242,7 +231,7 @@ where FH: FinalizationHandler, MK: MultiKeychain, { - fn new(config: RunwayConfig, keychain: MK, validator: UnitValidator) -> Self { + fn new(config: RunwayConfig, keychain: MK, validator: Validator) -> Self { let n_members = keychain.node_count(); let RunwayConfig { finalization_handler, @@ -251,24 +240,21 @@ where backup_units_from_saver, alerts_for_alerter, notifications_from_alerter, - units_for_reconstruction, - parents_for_reconstruction, - units_from_reconstruction, - requests_from_reconstruction, unit_messages_from_network, unit_messages_for_network, responses_for_collection, + parents_for_creator, ordered_batch_rx, resolved_requests, new_units_from_creation, } = config; let store = UnitStore::new(n_members); - let validator = Validator::new(validator); + let dag = Dag::new(validator); Runway { store, keychain, - validator, + dag, missing_coords: HashSet::new(), missing_parents: HashSet::new(), resolved_requests, @@ -276,10 +262,7 @@ where notifications_from_alerter, unit_messages_from_network, unit_messages_for_network, - units_for_reconstruction, - parents_for_reconstruction, - units_from_reconstruction, - requests_from_reconstruction, + parents_for_creator, ordered_batch_rx, finalization_handler, backup_units_for_saver, @@ -295,13 +278,31 @@ where self.keychain.index() } - fn on_unit_received(&mut self, unit: UncheckedSignedUnit) { - match self.validator.validate(unit, &self.store) { - Ok(unit) => self.send_unit_for_reconstruction(unit), - 
Err(e) => self.handle_validation_error(e), + fn handle_dag_result(&mut self, result: DagResult) { + let DagResult { + units, + requests, + alerts, + } = result; + for unit in units { + self.on_unit_reconstructed(unit); + } + for request in requests { + self.on_reconstruction_request(request); + } + for alert in alerts { + if self.alerts_for_alerter.unbounded_send(alert).is_err() { + warn!(target: "AlephBFT-runway", "{:?} Channel to alerter should be open", self.index()); + self.exiting = true; + } } } + fn on_unit_received(&mut self, unit: UncheckedSignedUnit) { + let result = self.dag.add_unit(unit, &self.store); + self.handle_dag_result(result); + } + fn on_unit_message(&mut self, message: RunwayNotificationIn) { match message { RunwayNotificationIn::NewUnit(u) => { @@ -416,75 +417,24 @@ where } } - fn handle_validation_error(&mut self, e: ValidationError) { - use ValidationError::*; - match e { - Invalid(e) => { - warn!(target: "AlephBFT-runway", "Received unit failing validation: {}", e) - } - Duplicate(su) => { - trace!(target: "AlephBFT-runway", "Received unit with hash {:?} again.", su.hash()) - } - Uncommitted(su) => { - debug!(target: "AlephBFT-runway", "Received unit with hash {:?} created by known forker {:?} for which we don't have a commitment, discarding.", su.hash(), su.creator()) - } - NewForker(alert) => { - warn!(target: "AlephBFT-runway", "New forker detected."); - trace!(target: "AlephBFT-runway", "Created alert: {:?}.", alert); - if self.alerts_for_alerter.unbounded_send(*alert).is_err() { - warn!(target: "AlephBFT-runway", "{:?} Channel to alerter should be open", self.index()); - self.exiting = true; - } - } - } - } - fn on_parents_response( &mut self, u_hash: H::Hash, parents: Vec>, ) { - use ValidationError::*; if self.store.unit(&u_hash).is_some() { trace!(target: "AlephBFT-runway", "{:?} We got parents response but already imported the unit.", self.index()); return; } - let mut parent_hashes = HashMap::new(); - for unit in 
parents.into_iter() { - let su = match self.validator.validate(unit, &self.store) { - Ok(su) => { - self.send_unit_for_reconstruction(su.clone()); - su - } - Err(Duplicate(su)) => { - trace!(target: "AlephBFT-runway", "Already have parent {:?}.", su.hash()); - su - } - Err(Uncommitted(su)) => { - debug!(target: "AlephBFT-runway", "Received uncommitted parent {:?}, we should get the commitment soon.", su.hash()); - su - } - Err(NewForker(alert)) => { - warn!(target: "AlephBFT-runway", "New forker detected."); - trace!(target: "AlephBFT-runway", "Created alert: {:?}.", alert); - if self.alerts_for_alerter.unbounded_send(*alert).is_err() { - warn!(target: "AlephBFT-runway", "{:?} Channel to alerter should be open", self.index()); - self.exiting = true; - } - // technically this was a correct unit, so we could have passed it on, - // but this will happen at most once and we will receive the parent - // response again, so we just discard it now - return; - } - Err(Invalid(e)) => { - warn!(target: "AlephBFT-runway", "{:?} In received parent response received a unit that does not pass validation: {}", self.index(), e); - return; - } - }; - parent_hashes.insert(su.coord(), su.hash()); - } + let result = self.dag.add_parents(u_hash, parents, &self.store); + self.handle_dag_result(result); + } - self.send_parents_for_reconstruction((u_hash, parent_hashes)); + fn on_forking_notification(&mut self, notification: ForkingNotification) { + let result = self + .dag + .process_forking_notification(notification, &self.store); + self.handle_dag_result(result); } fn resolve_missing_parents(&mut self, u_hash: &H::Hash) { @@ -493,17 +443,6 @@ where } } - fn on_unit_created(&mut self, unit: SignedUnit) { - debug!(target: "AlephBFT-runway", "{:?} On create notification.", self.index()); - let signed_unit = self - .validator - .validate(unit.into_unchecked(), &self.store) - .expect("unit created by us is correct and not a fork"); - // we are guaranteed this will immediately succeed, 
because creation uses the first units we add to the dag, - // i.e. the canonical units in the store - self.send_unit_for_reconstruction(signed_unit); - } - fn on_reconstruction_request(&mut self, request: ReconstructionRequest) { use ReconstructionRequest::*; match request { @@ -519,7 +458,6 @@ where fn on_unit_reconstructed(&mut self, unit: ReconstructedUnit>) { let unit_hash = unit.hash(); trace!(target: "AlephBFT-runway", "Unit {:?} {} reconstructed.", unit_hash, unit.coord()); - self.validator.finished_processing(&unit_hash); self.store.insert(unit.clone()); self.resolve_missing_parents(&unit_hash); self.resolve_missing_coord(&unit.coord()); @@ -531,6 +469,14 @@ where error!(target: "AlephBFT-runway", "{:?} A unit couldn't be sent to extender: {:?}.", self.index(), unit_hash); self.exiting = true; } + if self + .parents_for_creator + .unbounded_send(unit.clone()) + .is_err() + { + warn!(target: "AlephBFT-runway", "Creator channel should be open."); + self.exiting = true; + } if self .backup_units_for_saver .unbounded_send(unit.unpack().into()) @@ -606,31 +552,12 @@ where } } - fn send_unit_for_reconstruction(&mut self, unit: SignedUnit) { - trace!(target: "AlephBFT-runway", "Sending unit {:?} {} to reconstruction.", unit.hash(), unit.coord()); - if self.units_for_reconstruction.unbounded_send(unit).is_err() { - warn!(target: "AlephBFT-runway", "{:?} Unit channel to reconstruction should be open", self.index()); - self.exiting = true; - } - } - - fn send_parents_for_reconstruction(&mut self, parents: ExplicitParents) { - if self - .parents_for_reconstruction - .unbounded_send(parents) - .is_err() - { - warn!(target: "AlephBFT-runway", "{:?} Parents channel to reconstruction should be open", self.index()); - self.exiting = true; - } - } - fn status(&self) -> RunwayStatus<'_, H> { RunwayStatus { missing_coords: &self.missing_coords, missing_parents: &self.missing_parents, - validator_status: self.validator.status(), - dag_status: self.store.status(), + dag_status: 
self.dag.status(), + store_status: self.store.status(), } } @@ -665,24 +592,8 @@ where debug!(target: "AlephBFT-runway", "{:?} Runway started.", index); loop { futures::select! { - unit = self.units_from_reconstruction.next() => match unit { - Some(unit) => self.on_unit_reconstructed(unit), - None => { - error!(target: "AlephBFT-runway", "{:?} Reconstructed unit stream closed.", index); - break; - } - }, - - request = self.requests_from_reconstruction.next() => match request { - Some(request) => self.on_reconstruction_request(request), - None => { - error!(target: "AlephBFT-runway", "{:?} Reconstruction request stream closed.", index); - break; - } - }, - signed_unit = self.new_units_from_creation.next() => match signed_unit { - Some(signed_unit) => self.on_unit_created(signed_unit), + Some(signed_unit) => self.on_unit_received(signed_unit.into()), None => { error!(target: "AlephBFT-runway", "{:?} Creation stream closed.", index); break; @@ -692,15 +603,7 @@ where notification = self.notifications_from_alerter.next() => match notification { Some(notification) => { trace!(target: "AlephBFT-runway", "Received alerter notification: {:?}.", notification); - for result in self.validator.process_forking_notification(notification, &self.store) { - match result { - Ok(unit) => { - trace!(target: "AlephBFT-runway", "Validated unit {:?} from alerter.", unit.hash()); - self.send_unit_for_reconstruction(unit) - }, - Err(e) => self.handle_validation_error(e), - } - } + self.on_forking_notification(notification); }, None => { error!(target: "AlephBFT-runway", "{:?} Alert notification stream closed.", index); @@ -764,7 +667,7 @@ pub(crate) struct NetworkIO { #[cfg(feature = "initial_unit_collection")] fn initial_unit_collection<'a, H: Hasher, D: Data, MK: MultiKeychain>( keychain: &'a MK, - validator: &'a UnitValidator, + validator: &'a Validator, unit_messages_for_network: &Sender>, unit_collection_sender: oneshot::Sender, responses_from_runway: Receiver>, @@ -865,38 +768,41 @@ 
pub(crate) async fn run( _phantom: _, } = runway_io; - let (units_for_reconstruction, units_from_runway) = mpsc::unbounded(); - let (parents_for_reconstruction, parents_from_runway) = mpsc::unbounded(); - let (units_for_runway, units_from_reconstruction) = mpsc::unbounded(); - let (requests_for_runway, requests_from_reconstruction) = mpsc::unbounded(); let (ordered_batch_tx, ordered_batch_rx) = mpsc::unbounded(); let (new_units_for_runway, new_units_from_creation) = mpsc::unbounded(); - let consensus_terminator = terminator.add_offspring_connection("AlephBFT-consensus"); - let consensus_config = config.clone(); - let consensus_spawner = spawn_handle.clone(); + let (parents_for_creator, parents_from_runway) = mpsc::unbounded(); + let creation_terminator = terminator.add_offspring_connection("creator"); + let creation_config = config.clone(); let (starting_round_sender, starting_round) = oneshot::channel(); - let consensus_keychain = keychain.clone(); - let consensus_handle = spawn_handle.spawn_essential("runway/consensus", async move { - consensus::run( - consensus_config, - consensus::IO { - units_from_runway, - parents_from_runway, - units_for_runway, - requests_for_runway, - new_units_for_runway, - data_provider, + let creation_keychain = keychain.clone(); + let creation_handle = spawn_handle + .spawn_essential("runway/creation", async move { + creation::run( + creation_config, + creation::IO { + outgoing_units: new_units_for_runway, + incoming_parents: parents_from_runway, + data_provider, + }, + creation_keychain, starting_round, - }, - consensus_keychain, - consensus_spawner, - consensus_terminator, - ) - .await - }); - let mut consensus_handle = consensus_handle.fuse(); + creation_terminator, + ) + .await + }) + .shared(); + let creator_handle_for_panic = creation_handle.clone(); + let creator_panic_handle = async move { + if creator_handle_for_panic.await.is_err() { + return; + } + pending().await + } + .fuse(); + pin_mut!(creator_panic_handle); + let 
creation_handle = creation_handle.fuse(); let (backup_units_for_saver, backup_units_from_runway) = mpsc::unbounded(); let (backup_units_for_runway, backup_units_from_saver) = mpsc::unbounded(); @@ -942,7 +848,7 @@ pub(crate) async fn run( .fuse(); let index = keychain.index(); - let validator = UnitValidator::new(config.session_id(), keychain.clone(), config.max_round()); + let validator = Validator::new(config.session_id(), keychain.clone(), config.max_round()); let (responses_for_collection, responses_from_runway) = mpsc::unbounded(); let (unit_collections_sender, unit_collection_result) = oneshot::channel(); let (loaded_data_tx, loaded_data_rx) = oneshot::channel(); @@ -1001,12 +907,9 @@ pub(crate) async fn run( backup_units_from_saver, alerts_for_alerter, notifications_from_alerter, - units_for_reconstruction, - parents_for_reconstruction, - units_from_reconstruction, - requests_from_reconstruction, unit_messages_from_network: network_io.unit_messages_from_network, unit_messages_for_network: network_io.unit_messages_for_network, + parents_for_creator, ordered_batch_rx, responses_for_collection, resolved_requests: network_io.resolved_requests, @@ -1035,8 +938,8 @@ pub(crate) async fn run( debug!(target: "AlephBFT-runway", "{:?} Alerter task terminated early.", index); break; }, - _ = consensus_handle => { - debug!(target: "AlephBFT-runway", "{:?} Consensus task terminated early.", index); + _ = creator_panic_handle => { + debug!(target: "AlephBFT-runway", "{:?} creator task terminated early with its task being dropped.", index); break; }, _ = backup_saver_handle => { @@ -1059,7 +962,7 @@ pub(crate) async fn run( terminator.terminate_sync().await; handle_task_termination(extender_handle, "AlephBFT-runway", "Extender", index).await; - handle_task_termination(consensus_handle, "AlephBFT-runway", "Consensus", index).await; + handle_task_termination(creation_handle, "AlephBFT-runway", "Creator", index).await; handle_task_termination(alerter_handle, "AlephBFT-runway", 
"Alerter", index).await; handle_task_termination(runway_handle, "AlephBFT-runway", "Runway", index).await; handle_task_termination(backup_saver_handle, "AlephBFT-runway", "BackupSaver", index).await; diff --git a/consensus/src/testing/consensus.rs b/consensus/src/testing/consensus.rs deleted file mode 100644 index 3c3ea75b..00000000 --- a/consensus/src/testing/consensus.rs +++ /dev/null @@ -1,110 +0,0 @@ -use crate::{ - consensus, - reconstruction::Request as GenericRequest, - testing::{complete_oneshot, gen_config, gen_delay_config, init_log}, - units::{preunit_to_full_unit, random_full_parent_units_up_to, ControlHash, PreUnit, Unit}, - Hasher, NodeCount, NodeIndex, NodeMap, Signed, SpawnHandle, Terminator, -}; -use aleph_bft_mock::{DataProvider, Hasher64, Keychain, Spawner}; -use futures::{ - channel::{mpsc::unbounded, oneshot}, - sink::SinkExt, - stream::StreamExt, -}; -use log::trace; - -type Request = GenericRequest; - -#[tokio::test] -async fn catches_wrong_control_hash() { - init_log(); - let n_nodes = NodeCount(4); - let spawner = Spawner::new(); - let node_ix = NodeIndex(0); - let (mut units_for_consensus, units_from_us) = unbounded(); - let (_parents_for_consensus, parents_from_us) = unbounded(); - let (units_for_us, _units_from_consensus) = unbounded(); - let (requests_for_us, mut requests_from_consensus) = unbounded(); - let (new_units_for_us, _new_units_from_consensus) = unbounded(); - - let conf = gen_config(node_ix, n_nodes, gen_delay_config()); - let (exit_tx, exit_rx) = oneshot::channel(); - let starting_round = complete_oneshot(Some(0)); - let keychains: Vec<_> = n_nodes - .into_iterator() - .map(|node_id| Keychain::new(n_nodes, node_id)) - .collect(); - let keychain = keychains[node_ix.0]; - let data_provider = DataProvider::new(); - - let consensus_handle = spawner.spawn_essential( - "consensus", - consensus::run( - conf, - consensus::IO { - units_from_runway: units_from_us, - parents_from_runway: parents_from_us, - units_for_runway: 
units_for_us, - requests_for_runway: requests_for_us, - new_units_for_runway: new_units_for_us, - data_provider, - starting_round, - }, - keychain, - spawner, - Terminator::create_root(exit_rx, "AlephBFT-consensus"), - ), - ); - let other_initial_units: Vec<_> = random_full_parent_units_up_to(0, n_nodes, 0) - .pop() - .expect("created initial units") - .into_iter() - .map(|unit| { - let keychain = keychains - .get(unit.creator().0) - .expect("we have the keychains"); - Signed::sign(unit, keychain) - }) - .collect(); - for unit in &other_initial_units { - units_for_consensus - .send(unit.clone()) - .await - .expect("channel works"); - } - let mut parent_hashes = NodeMap::with_size(n_nodes); - for unit in other_initial_units.into_iter() { - parent_hashes.insert(unit.creator(), unit.hash()); - } - let bad_pu = PreUnit::::new(1.into(), 1, ControlHash::new(&parent_hashes)); - let bad_control_hash: ::Hash = [0, 1, 0, 1, 0, 1, 0, 1]; - assert!( - bad_control_hash != bad_pu.control_hash().combined_hash, - "Bad control hash cannot be the correct one." 
- ); - let mut control_hash = bad_pu.control_hash().clone(); - control_hash.combined_hash = bad_control_hash; - let bad_pu = PreUnit::new(bad_pu.creator(), bad_pu.round(), control_hash); - let keychain = &keychains[bad_pu.creator().0]; - let bad_unit = Signed::sign(preunit_to_full_unit(bad_pu, 0), keychain); - let unit_hash = bad_unit.hash(); - units_for_consensus - .send(bad_unit) - .await - .expect("channel is open"); - loop { - let request = requests_from_consensus - .next() - .await - .expect("channel is open"); - trace!(target: "consensus-test", "request {:?}", request); - if let Request::ParentsOf(h) = request { - assert_eq!(h, unit_hash, "Expected notification for our bad unit."); - break; - } - } - - let _ = exit_tx.send(()); - - consensus_handle.await.expect("The node is honest."); -} diff --git a/consensus/src/testing/dag.rs b/consensus/src/testing/dag.rs index 07943c2c..ec5f92ce 100644 --- a/consensus/src/testing/dag.rs +++ b/consensus/src/testing/dag.rs @@ -1,14 +1,16 @@ use crate::{ - consensus, + alerts::ForkingNotification, + dag::{ + Dag as GenericDag, DagResult, ReconstructedUnit as GenericReconstructedUnit, + Request as GenericRequest, + }, extension::Service as Extender, - reconstruction::{ReconstructedUnit as GenericReconstructedUnit, Request as GenericRequest}, - runway::ExplicitParents as GenericExplicitParents, - testing::{complete_oneshot, gen_config, gen_delay_config}, - units::{ControlHash, FullUnit, PreUnit, SignedUnit as GenericSignedUnit, Unit, UnitCoord}, - NodeCount, NodeIndex, NodeMap, NodeSubset, Receiver, Round, Sender, Signed, SpawnHandle, - Terminator, + units::{ + ControlHash, FullUnit, PreUnit, SignedUnit as GenericSignedUnit, Unit, UnitStore, Validator, + }, + NodeCount, NodeIndex, NodeMap, NodeSubset, Round, Signed, SpawnHandle, Terminator, }; -use aleph_bft_mock::{Data, DataProvider, Hash64, Hasher64, Keychain, Spawner}; +use aleph_bft_mock::{Data, Hash64, Hasher64, Keychain, Spawner}; use futures::{ channel::{mpsc, 
oneshot}, stream::StreamExt, @@ -17,12 +19,16 @@ use futures::{ use futures_timer::Delay; use log::{debug, trace}; use rand::{distributions::Open01, prelude::*}; -use std::{cmp, collections::HashMap, time::Duration}; +use std::{ + cmp, + collections::{HashMap, HashSet}, + time::Duration, +}; type Request = GenericRequest; -type ExplicitParents = GenericExplicitParents; type SignedUnit = GenericSignedUnit; type ReconstructedUnit = GenericReconstructedUnit; +type Dag = GenericDag; #[derive(Clone)] struct UnitWithParents { @@ -46,75 +52,89 @@ impl UnitWithParents { parent_hashes, } } + fn hash(&self) -> Hash64 { self.unit.hash() } - fn parent_hashes_map(&self) -> HashMap { - let mut result = HashMap::new(); - let round = self.unit.round(); - for (creator, hash) in self.parent_hashes.iter() { - result.insert(UnitCoord::new(round - 1, creator), *hash); - } - result + fn parent_hashes(&self) -> Vec { + self.parent_hashes.values().cloned().collect() } } -struct ConsensusDagFeeder { - units_for_consensus: Sender, - parents_for_consensus: Sender, - requests_from_consensus: Receiver, - reconstructed_units_from_consensus: Receiver, - units_from_creator: Receiver, +struct DagFeeder { units: Vec, units_map: HashMap, + // we hold all forker units, to accept all forks + // this is not realistic, but simulates a kinda "worse than worst case scenario" + forker_units: HashMap>, + store: UnitStore, + dag: Dag, + result: Vec, } -type DagFeederParts = ( - ConsensusDagFeeder, - Receiver, - Receiver, - Sender, - Sender, - Sender, -); - -impl ConsensusDagFeeder { - fn new(units: Vec) -> DagFeederParts { +impl DagFeeder { + fn new( + node_id: NodeIndex, + units: Vec, + forker_units: HashMap>, + ) -> DagFeeder { let units_map = units.iter().map(|u| (u.hash(), u.clone())).collect(); - let (units_for_consensus, units_from_feeder) = mpsc::unbounded(); - let (parents_for_consensus, parents_from_feeder) = mpsc::unbounded(); - let (requests_for_feeder, requests_from_consensus) = 
mpsc::unbounded(); - let (reconstructed_units_for_feeder, reconstructed_units_from_consensus) = - mpsc::unbounded(); - let (units_for_feeder, units_from_creator) = mpsc::unbounded(); - let cdf = ConsensusDagFeeder { - units_for_consensus, - parents_for_consensus, - requests_from_consensus, - reconstructed_units_from_consensus, - units_from_creator, + let node_count = units + .get(0) + .expect("we have at least one unit") + .unit + .control_hash() + .n_members(); + // the index is unimportant, since we don't actually actively use signing here + let dag = Dag::new(Validator::new(0, Keychain::new(node_count, node_id), 2137)); + let store = UnitStore::new(node_count); + DagFeeder { units, units_map, - }; - ( - cdf, - units_from_feeder, - parents_from_feeder, - requests_for_feeder, - reconstructed_units_for_feeder, - units_for_feeder, - ) + forker_units, + store, + dag, + result: Vec::new(), + } } - fn on_request(&self, request: Request) { + fn on_request(&mut self, request: Request) { use GenericRequest::*; match request { ParentsOf(h) => { // We need to answer these requests as otherwise reconstruction cannot make progress - let parent_hashes = self.units_map.get(&h).unwrap().parent_hashes_map(); - let parents = (h, parent_hashes); - self.parents_for_consensus.unbounded_send(parents).unwrap(); + let parents = self + .units_map + .get(&h) + .expect("we have all the units") + .parent_hashes() + .iter() + .map(|hash| { + self.units_map + .get(hash) + .expect("we have all the units") + .unit + .clone() + .into() + }) + .collect(); + let DagResult { + units, + requests, + alerts, + } = self.dag.add_parents(h, parents, &self.store); + for unit in units { + self.on_reconstructed_unit(unit); + } + for alert in alerts { + self.on_alert(alert.forker()); + // have to repeat it, as it wasn't properly accepted because of the alert + self.on_request(ParentsOf(h)); + } + for request in requests { + self.on_request(request); + } } Coord(_) => { // We don't need to answer missing 
units requests. @@ -122,90 +142,82 @@ impl ConsensusDagFeeder { } } - fn on_reconstructed_unit(&self, unit: ReconstructedUnit) { + fn on_reconstructed_unit(&mut self, unit: ReconstructedUnit) { let h = unit.hash(); - let round = unit.round(); let parents = unit.parents(); - let expected_hashes = self + let expected_hashes: HashSet<_> = self .units_map .get(&h) .expect("we have the unit") - .parent_hashes_map(); + .parent_hashes() + .into_iter() + .collect(); assert_eq!(parents.item_count(), expected_hashes.len()); - for (creator, hash) in parents { - assert_eq!( - Some(hash), - expected_hashes.get(&UnitCoord::new(round - 1, creator)) - ); + for (_, hash) in parents { + assert!(expected_hashes.contains(hash)); } + self.result.push(unit.clone()); + self.store.insert(unit); } - async fn run(mut self) { - for unit in &self.units { - self.units_for_consensus - .unbounded_send(unit.unit.clone()) - .expect("channel should be open"); + fn on_alert(&mut self, forker: NodeIndex) { + let committed_units = self + .forker_units + .get(&forker) + .expect("we have units for forkers") + .iter() + .map(|unit| unit.unit.clone().into()) + .collect(); + let DagResult { + units, + requests, + alerts, + } = self + .dag + .process_forking_notification(ForkingNotification::Units(committed_units), &self.store); + assert!(alerts.is_empty()); + for unit in units { + self.on_reconstructed_unit(unit); } + for request in requests { + self.on_request(request); + } + } - loop { - futures::select! 
{ - request = self.requests_from_consensus.next() => match request { - Some(request) => self.on_request(request), - None => break, - }, - unit = self.reconstructed_units_from_consensus.next() => match unit { - Some(unit) => self.on_reconstructed_unit(unit), - None => break, - }, - _ = self.units_from_creator.next() => continue, - }; + fn feed(mut self) -> Vec { + let units = self.units.clone(); + for unit in units { + let DagResult { + units, + requests, + alerts, + } = self.dag.add_unit(unit.unit.into(), &self.store); + for unit in units { + self.on_reconstructed_unit(unit); + } + for alert in alerts { + self.on_alert(alert.forker()); + } + for request in requests { + self.on_request(request); + } } - debug!(target: "dag-test", "Consensus stream closed."); + self.result } } async fn run_consensus_on_dag( units: Vec, - n_members: NodeCount, + forker_units: HashMap>, deadline_ms: u64, ) -> Vec> { let node_id = NodeIndex(0); - let ( - feeder, - units_from_feeder, - parents_from_feeder, - requests_for_feeder, - reconstructed_units_for_feeder, - units_for_feeder, - ) = ConsensusDagFeeder::new(units); - let conf = gen_config(node_id, n_members, gen_delay_config()); - let keychain = Keychain::new(n_members, node_id); - let (_exit_tx, exit_rx) = oneshot::channel(); + let feeder = DagFeeder::new(node_id, units, forker_units); + let spawner = Spawner::new(); let (_extender_exit_tx, extender_exit_rx) = oneshot::channel(); - let (reconstructed_units_for_us, mut reconstructed_units_from_consensus) = mpsc::unbounded(); let (batch_tx, mut batch_rx) = mpsc::unbounded(); - let spawner = Spawner::new(); - let starting_round = complete_oneshot(Some(0)); let (units_for_extender, units_from_us) = mpsc::unbounded(); let extender = Extender::::new(node_id, units_from_us, batch_tx); - spawner.spawn( - "consensus", - consensus::run( - conf, - consensus::IO { - units_from_runway: units_from_feeder, - parents_from_runway: parents_from_feeder, - units_for_runway: reconstructed_units_for_us, 
- requests_for_runway: requests_for_feeder, - new_units_for_runway: units_for_feeder, - data_provider: DataProvider::new(), - starting_round, - }, - keychain, - spawner, - Terminator::create_root(exit_rx, "AlephBFT-consensus"), - ), - ); spawner.spawn( "extender", extender.run(Terminator::create_root( @@ -213,7 +225,11 @@ async fn run_consensus_on_dag( "AlephBFT-extender", )), ); - spawner.spawn("feeder", feeder.run()); + for unit in feeder.feed() { + units_for_extender + .unbounded_send(unit.extender_unit()) + .expect("channel should be open"); + } let mut batches = Vec::new(); let mut delay_fut = Delay::new(Duration::from_millis(deadline_ms)).fuse(); loop { @@ -221,11 +237,6 @@ async fn run_consensus_on_dag( batch = batch_rx.next() => { batches.push(batch.unwrap()); }, - unit = reconstructed_units_from_consensus.next() => { - let unit = unit.expect("consensus is operating"); - units_for_extender.unbounded_send(unit.extender_unit()).expect("extender is operating"); - reconstructed_units_for_feeder.unbounded_send(unit).expect("feeder is operating"); - } _ = &mut delay_fut => { break; } @@ -234,7 +245,14 @@ async fn run_consensus_on_dag( batches } -fn generate_random_dag(n_members: NodeCount, height: Round, seed: u64) -> Vec { +fn generate_random_dag( + n_members: NodeCount, + height: Round, + seed: u64, +) -> ( + Vec, + HashMap>, +) { // The below asserts are mainly because these numbers must fit in 8 bits for hashing but also: this is // meant to be run for small dags only -- it's not optimized for large dags. assert!(n_members < 100.into()); @@ -259,6 +277,7 @@ fn generate_random_dag(n_members: NodeCount, height: Round, seed: u64) -> Vec> = HashMap::new(); let mut dag: Vec>> = vec![vec![vec![]; n_members.into()]; height.into()]; // dag is a (height x n_members)-dimensional array consisting of empty vectors. 
@@ -324,6 +343,9 @@ fn generate_random_dag(n_members: NodeCount, height: Round, seed: u64) -> Vec Vec], batches2: &[Vec]) -> bool { @@ -354,9 +376,13 @@ async fn ordering_random_dag_consistency_under_permutations() { let mut rng = StdRng::seed_from_u64(seed); let n_members = NodeCount(rng.gen_range(1..11)); let height = rng.gen_range(3..11); - let mut units = generate_random_dag(n_members, height, seed); - let batch_on_sorted = - run_consensus_on_dag(units.clone(), n_members, 80 + (n_members.0 as u64) * 5).await; + let (mut units, forker_units) = generate_random_dag(n_members, height, seed); + let batch_on_sorted = run_consensus_on_dag( + units.clone(), + forker_units.clone(), + 80 + (n_members.0 as u64) * 5, + ) + .await; debug!(target: "dag-test", "seed {:?} n_members {:?} height {:?} batch_len {:?}", seed, @@ -366,12 +392,16 @@ async fn ordering_random_dag_consistency_under_permutations() { ); for i in 0..8 { units.shuffle(&mut rng); - let mut batch = - run_consensus_on_dag(units.clone(), n_members, 25 + (n_members.0 as u64) * 5).await; + let mut batch = run_consensus_on_dag( + units.clone(), + forker_units.clone(), + 25 + (n_members.0 as u64) * 5, + ) + .await; if batch != batch_on_sorted { if batch_lists_consistent(&batch, &batch_on_sorted) { // there might be some timing issue here, we run it with more time - batch = run_consensus_on_dag(units.clone(), n_members, 200).await; + batch = run_consensus_on_dag(units.clone(), forker_units.clone(), 200).await; } if batch != batch_on_sorted { debug!(target: "dag-test", diff --git a/consensus/src/testing/mod.rs b/consensus/src/testing/mod.rs index f881086a..ce132155 100644 --- a/consensus/src/testing/mod.rs +++ b/consensus/src/testing/mod.rs @@ -1,7 +1,6 @@ #![cfg(test)] mod alerts; mod byzantine; -mod consensus; mod crash; mod crash_recovery; mod creation; @@ -32,12 +31,6 @@ pub fn init_log() { .try_init(); } -pub fn complete_oneshot(t: T) -> oneshot::Receiver { - let (tx, rx) = oneshot::channel(); - 
tx.send(t).unwrap(); - rx -} - pub fn gen_delay_config() -> DelayConfig { DelayConfig { tick_interval: Duration::from_millis(5), diff --git a/docs/src/internals.md b/docs/src/internals.md index d7ceca79..77025fc1 100644 --- a/docs/src/internals.md +++ b/docs/src/internals.md @@ -4,26 +4,30 @@ To explain the inner workings of AlephBFT it is instructive to follow the path o 1. The unit is created by one of the node's `Creator` component -- implemented in `creation/`. Creator sends the produced unit to `runway/`, which then sends it to `member.rs`. 2. A recurring task of broadcasting this unit is put in the task queue. The unit will be broadcast to all other nodes a few times (with some delays in between). -3. The unit is received by another node -- happens in `member.rs` and immediately send to `runway/` for further processing in `validation.rs`. -4. Validation checks signatures and basic unit properties, plus catches forks. This means that only **legit units**, in the sense defined in [the section on alerts](how_alephbft_does_it.md#25-alerts----dealing-with-fork-spam), are sent further. Thus no fork is ever passed on unless coming from an alert. -5. The units are further moved to a component responsible for reconstructing the explicit parents for these units -- implemented in `reconstruction/parents.rs`. -6. Each unit whose parents are successfully decoded, is passed on to `reconstruction/dag.rs`, which ensures that units are passed on only when their parents already were. They are then put in a store in `runway/`. Each unit in the store is legit + has all its parents in the store. -7. Such units are passed to a component called the `Extender` -- see the files in `extension/`. The role of the extender is to efficiently run the `OrderData` algorithm, described in the [section on AlephBFT](how_alephbft_does_it.md). -8. 
Once a unit's data is placed in one of batches by the `Extender` then its path is over, although we keep it in the runway store to be able to send it to other nodes on request. +3. The unit is received by another node -- happens in `member.rs` and immediately sent to `runway/` for further processing in `dag/`. +4. Dag validates and reconstructs a unit's parents in several steps: + 1. Validation, implemented in `dag/validation.rs`, checks signatures and basic unit properties, plus catches forks. This means that only **legit units**, in the sense defined in [the section on alerts](how_alephbft_does_it.md#25-alerts----dealing-with-fork-spam), are sent further. Thus no fork is ever passed on unless coming from an alert. + 2. The units are further moved to a component responsible for reconstructing the explicit parents for these units -- implemented in `dag/reconstruction/parents.rs`. + 3. Each unit whose parents are successfully decoded, is passed on to `dag/reconstruction/dag.rs`, which ensures that units are passed on only when their parents already were. They are then returned back to `runway/`. +5. In `runway/` such units are put in a store. Each unit in the store is legit + has all its parents in the store. +6. Such units are passed to a component called the `Extender` -- see the files in `extension/`. The role of the extender is to efficiently run the `OrderData` algorithm, described in the [section on AlephBFT](how_alephbft_does_it.md). +7. Once a unit's data is placed in one of batches by the `Extender` then its path is over, although we keep it in the runway store to be able to send it to other nodes on request. ### 5.1 Creator The creator produces units according to the AlephBFT protocol rules. It will wait until the prespecified delay has passed and attempt to create a unit using a maximal number of parents. If it is not possible yet, it will wait till the first moment enough parents are available.
After creating the last unit, the creator stops producing new ones, although this is never expected to happen during correct execution. -### 5.2 Validation +### 5.2 Dag -The validation process consists of checking basic properties of units (correct number of parents, correct session etc.), the signatures, and whether the unit is a fork based on the units that the node either already has or at least started processing. As mentioned, the idea is that only legit units are passed to the reconstructing component. In case a fork by a node `i` is detected, all of `i`'s units are attached to the appropriate alert, so that other nodes can accept them as legit. +The dag receives units from the network and returns any that were successfully reconstructed with parents. It does that in several steps, starting with validation. + +#### 5.2.1 Validation -### 5.3 Reconstruction +The validation process consists of checking basic properties of units (correct number of parents, correct session etc.), the signatures, and whether the unit is a fork based on the units that the node either already has or at least started processing. As mentioned, the idea is that only legit units are passed to the reconstructing component. In case a fork by a node `i` is detected, all of `i`'s units are attached to the appropriate alert, so that other nodes can accept them as legit. The next step is to reconstruct the structure of the Dag from the somewhat compressed information in the units. -#### 5.3.1 Parents +#### 5.2.2 Parents The reconstruction service receives legit units, but the information about their parents is only present as a control hash, i.e. which nodes created the parents and what was the combined hash of all the parents' hashes. Parents reconstruction remembers the first unit for any creator-round combination it encounters and optimistically uses this information to check the combined hash. 
If there are no dishonest nodes, which is the usual situation, then this means that every unit might at most have some parents that cannot yet be checked, because the node has not yet received them. In such a case requests for these missing units are sent to `Member`. After the units are received, the control hash check succeeds and thus the parents are reconstructed successfully. @@ -34,10 +38,10 @@ If dishonest nodes participate in the protocol, then two additional things can g In any case the reconstruction triggers a request to `Member` to download the full list of the unit's parent hashes, so that the ambiguity is resolved. Once a response is received by `Member` then it is passed back to the reconstruction so that it can "decode" the parents and proceed. -#### 5.3.2 Dag +#### 5.2.3 Dag -The units parents might, for many reasons, not be reconstructed in an order agreeing with the Dag order, i.e. some of their ancestors might not yet be reconstructed. The Dag component ensures that units are only added to the store after their parents were already added, and thus any units emitted by this component are in an order agreeing with the Dag order. +The units' parents might, for many reasons, not be reconstructed in an order agreeing with the Dag order, i.e. some of their ancestors might not yet be reconstructed. The Dag component ensures that units are only added to the store after their parents were already added, and thus any units emitted by the Dag component are in an order agreeing with the Dag order. -### 5.4 Extender +### 5.3 Extender The `Extender`'s role is to receive Dag units (in an order agreeing with the Dag order) and extend the output stream. Towards this end it elects the `Head` for each `round`. Such an election works by going through candidate units from this round either eliminating them or eventually electing one.
Votes are computed and cached for each candidate until a decision on it is made, after which the election moves on to the next round (if elected as `Head`) or to the next unit (otherwise). After electing every `Head` the `Extender` deterministically orders all its unordered ancestors and the `Head` itself and returns the resulting batch.