A0-4155: Unit saving pipeline #432

Merged 2 commits on Apr 12, 2024

Changes from all commits

2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion consensus/Cargo.toml
@@ -1,6 +1,6 @@
[package]
name = "aleph-bft"
version = "0.36.3"
version = "0.36.4"
edition = "2021"
authors = ["Cardinal Cryptography"]
categories = ["algorithms", "data-structures", "cryptography", "database"]
62 changes: 34 additions & 28 deletions consensus/src/backup/saver.rs
@@ -1,6 +1,10 @@
use std::pin::Pin;

use crate::{units::UncheckedSignedUnit, Data, Hasher, Receiver, Sender, Signature, Terminator};
use crate::{
dag::DagUnit,
units::{UncheckedSignedUnit, WrappedUnit},
Data, Hasher, MultiKeychain, Receiver, Sender, Terminator,
};
use codec::Encode;
use futures::{AsyncWrite, AsyncWriteExt, FutureExt, StreamExt};
use log::{debug, error};
@@ -10,30 +14,28 @@ const LOG_TARGET: &str = "AlephBFT-backup-saver";
/// Component responsible for saving units into backup.
/// It waits for items to appear on its receivers, and writes them to backup.
/// It announces a successful write through an appropriate response sender.
pub struct BackupSaver<H: Hasher, D: Data, S: Signature, W: AsyncWrite> {
units_from_runway: Receiver<UncheckedSignedUnit<H, D, S>>,
responses_for_runway: Sender<UncheckedSignedUnit<H, D, S>>,
pub struct BackupSaver<H: Hasher, D: Data, MK: MultiKeychain, W: AsyncWrite> {
units_from_runway: Receiver<DagUnit<H, D, MK>>,
responses_for_runway: Sender<DagUnit<H, D, MK>>,
backup: Pin<Box<W>>,
}

impl<H: Hasher, D: Data, S: Signature, W: AsyncWrite> BackupSaver<H, D, S, W> {
impl<H: Hasher, D: Data, MK: MultiKeychain, W: AsyncWrite> BackupSaver<H, D, MK, W> {
pub fn new(
units_from_runway: Receiver<UncheckedSignedUnit<H, D, S>>,
responses_for_runway: Sender<UncheckedSignedUnit<H, D, S>>,
units_from_runway: Receiver<DagUnit<H, D, MK>>,
responses_for_runway: Sender<DagUnit<H, D, MK>>,
backup: W,
) -> BackupSaver<H, D, S, W> {
) -> BackupSaver<H, D, MK, W> {
BackupSaver {
units_from_runway,
responses_for_runway,
backup: Box::pin(backup),
}
}

pub async fn save_item(
&mut self,
item: &UncheckedSignedUnit<H, D, S>,
) -> Result<(), std::io::Error> {
self.backup.write_all(&item.encode()).await?;
pub async fn save_unit(&mut self, unit: &DagUnit<H, D, MK>) -> Result<(), std::io::Error> {
let unit: UncheckedSignedUnit<_, _, _> = unit.clone().unpack().into();
self.backup.write_all(&unit.encode()).await?;
self.backup.flush().await
}

@@ -49,7 +51,7 @@ impl<H: Hasher, D: Data, S: Signature, W: AsyncWrite> BackupSaver<H, D, S, W> {
break;
},
};
if let Err(e) = self.save_item(&item).await {
if let Err(e) = self.save_unit(&item).await {
error!(target: LOG_TARGET, "couldn't save item to backup: {:?}", e);
break;
}
@@ -80,16 +82,17 @@ mod tests {
StreamExt,
};

use aleph_bft_mock::{Data, Hasher64, Keychain, Saver, Signature};
use aleph_bft_mock::{Data, Hasher64, Keychain, Saver};

use crate::{
backup::BackupSaver,
units::{creator_set, preunit_to_unchecked_signed_unit, UncheckedSignedUnit},
NodeCount, NodeIndex, Terminator,
dag::ReconstructedUnit,
units::{creator_set, preunit_to_signed_unit, TestingSignedUnit},
NodeCount, Terminator,
};

type TestBackupSaver = BackupSaver<Hasher64, Data, Signature, Saver>;
type TestUnit = UncheckedSignedUnit<Hasher64, Data, Signature>;
type TestUnit = ReconstructedUnit<TestingSignedUnit>;
type TestBackupSaver = BackupSaver<Hasher64, Data, Keychain, Saver>;
struct PrepareSaverResponse<F: futures::Future> {
task: F,
units_for_saver: mpsc::UnboundedSender<TestUnit>,
@@ -122,6 +125,7 @@ mod tests {

#[tokio::test]
async fn test_proper_relative_responses_ordering() {
let node_count = NodeCount(5);
let PrepareSaverResponse {
task,
units_for_saver,
@@ -133,17 +137,19 @@
task.await;
});

let creators = creator_set(NodeCount(5));
let keychains: Vec<_> = (0..5)
.map(|id| Keychain::new(NodeCount(5), NodeIndex(id)))
let creators = creator_set(node_count);
let keychains: Vec<_> = node_count
.into_iterator()
.map(|id| Keychain::new(node_count, id))
.collect();
let units: Vec<TestUnit> = (0..5)
.map(|k| {
preunit_to_unchecked_signed_unit(
creators[k].create_unit(0).unwrap(),
let units: Vec<TestUnit> = node_count
.into_iterator()
.map(|id| {
ReconstructedUnit::initial(preunit_to_signed_unit(
creators[id.0].create_unit(0).unwrap(),
0,
&keychains[k],
)
&keychains[id.0],
))
})
.collect();

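The saver.rs change above keeps the same write-then-acknowledge loop but feeds it reconstructed DagUnits, unpacking them into UncheckedSignedUnits only at encoding time. Below is a minimal, self-contained sketch of that loop pattern under assumed names (a hypothetical MiniSaver over plain Vec<u8> items, futures channels, and a Cursor<Vec<u8>> backup instead of the real DagUnit and Saver types); the point it illustrates is that an item is acknowledged back to its sender only after write_all and flush have succeeded.

use std::pin::Pin;

use futures::{
    channel::mpsc,
    executor::block_on,
    io::{AsyncWrite, AsyncWriteExt, Cursor},
    StreamExt,
};

// Hypothetical stand-in for BackupSaver: items are raw bytes instead of units.
struct MiniSaver<W: AsyncWrite> {
    items: mpsc::UnboundedReceiver<Vec<u8>>,
    acks: mpsc::UnboundedSender<Vec<u8>>,
    backup: Pin<Box<W>>,
}

impl<W: AsyncWrite> MiniSaver<W> {
    // Append the item to the backup and flush, mirroring save_unit above.
    async fn save(&mut self, item: &[u8]) -> Result<(), std::io::Error> {
        self.backup.write_all(item).await?;
        self.backup.flush().await
    }

    // Drain the channel; acknowledge each item only after a successful flush.
    async fn run(&mut self) {
        while let Some(item) = self.items.next().await {
            if self.save(&item).await.is_err() {
                break;
            }
            if self.acks.unbounded_send(item).is_err() {
                break;
            }
        }
    }
}

fn main() {
    let (item_tx, item_rx) = mpsc::unbounded();
    let (ack_tx, mut ack_rx) = mpsc::unbounded();
    let mut saver = MiniSaver {
        items: item_rx,
        acks: ack_tx,
        backup: Box::pin(Cursor::new(Vec::<u8>::new())),
    };
    item_tx.unbounded_send(b"unit-1".to_vec()).unwrap();
    drop(item_tx); // close the channel so run() terminates
    block_on(async {
        saver.run().await;
        // The acknowledgement arrives only after the item was written and flushed.
        assert_eq!(ack_rx.next().await.unwrap(), b"unit-1".to_vec());
    });
}

Because the acknowledgement is sent only after the flush, the component on the other end of the response channel can treat it as confirmation that the unit is durable.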
24 changes: 10 additions & 14 deletions consensus/src/dag/mod.rs
@@ -20,10 +20,12 @@ use validation::{Error as ValidationError, Validator};

const LOG_TARGET: &str = "AlephBFT-dag";

pub type DagUnit<H, D, MK> = ReconstructedUnit<SignedUnit<H, D, MK>>;

/// The result of sending some information to the Dag.
pub struct DagResult<H: Hasher, D: Data, MK: MultiKeychain> {
/// Units added to the dag.
pub units: Vec<ReconstructedUnit<SignedUnit<H, D, MK>>>,
pub units: Vec<DagUnit<H, D, MK>>,
/// Requests for more information.
pub requests: Vec<Request<H>>,
/// Alerts raised due to encountered forks.
@@ -114,25 +116,16 @@ impl<H: Hasher, D: Data, MK: MultiKeychain> Dag<H, D, MK> {
}
}

fn handle_result(&mut self, result: &DagResult<H, D, MK>) {
// just clean the validator cache of units that we are returning
for unit in &result.units {
self.validator.finished_processing(&unit.hash());
}
}

/// Add a unit to the Dag.
pub fn add_unit<U: WrappedUnit<H, Wrapped = SignedUnit<H, D, MK>>>(
&mut self,
unit: UncheckedSignedUnit<H, D, MK::Signature>,
store: &UnitStore<U>,
) -> DagResult<H, D, MK> {
let result = match self.validator.validate(unit, store) {
match self.validator.validate(unit, store) {
Ok(unit) => self.reconstruction.add_unit(unit).into(),
Err(e) => Self::handle_validation_error(e),
};
self.handle_result(&result);
result
}
}

/// Add parents of a unit to the Dag.
@@ -180,7 +173,6 @@ impl<H: Hasher, D: Data, MK: MultiKeychain> Dag<H, D, MK> {
.add_parents(unit_hash, parent_hashes)
.into(),
);
self.handle_result(&result);
result
}

@@ -208,10 +200,14 @@ impl<H: Hasher, D: Data, MK: MultiKeychain> Dag<H, D, MK> {
}
}
}
self.handle_result(&result);
result
}

/// Notify the dag that a unit has finished processing and can be cleared from the cache.
pub fn finished_processing(&mut self, hash: &H::Hash) {
self.validator.finished_processing(hash);
}

pub fn status(&self) -> DagStatus {
self.validator.status()
}
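The dag/mod.rs change above removes the handle_result helper that evicted units from the validator cache as soon as they were returned, and instead exposes finished_processing so the caller decides when a unit may be dropped from the cache. A rough, self-contained sketch of that lifecycle, assuming a hypothetical ProcessingCache with u64 hashes and String units in place of the real Validator and DagUnit types:

use std::collections::HashMap;

// Hypothetical stand-in for the validator's cache of units in flight.
struct ProcessingCache {
    in_progress: HashMap<u64, String>,
}

impl ProcessingCache {
    fn new() -> Self {
        ProcessingCache { in_progress: HashMap::new() }
    }

    // Analogue of Dag::add_unit: validate and remember the unit until the
    // caller confirms it has been fully processed.
    fn add_unit(&mut self, hash: u64, unit: String) -> String {
        self.in_progress.insert(hash, unit.clone());
        unit
    }

    // Analogue of the new Dag::finished_processing: only now is the cache
    // entry cleared.
    fn finished_processing(&mut self, hash: &u64) {
        if self.in_progress.remove(hash).is_none() {
            eprintln!("unit {hash} was not being processed");
        }
    }
}

fn main() {
    let mut cache = ProcessingCache::new();
    let to_saver = cache.add_unit(1, "unit-1".to_string());
    assert_eq!(to_saver, "unit-1");
    // The unit is handed to the backup saver here; the cache entry stays.
    assert_eq!(cache.in_progress.len(), 1);
    // Only once the backup write is acknowledged:
    cache.finished_processing(&1);
    assert!(cache.in_progress.is_empty());
}

Keeping the entry alive until finished_processing matters here because, after this PR, a unit is no longer inserted into the store until its backup write completes.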
41 changes: 20 additions & 21 deletions consensus/src/runway/mod.rs
@@ -1,7 +1,7 @@
use crate::{
alerts::{Alert, ForkingNotification, NetworkMessage},
creation,
dag::{Dag, DagResult, DagStatus, ReconstructedUnit, Request as ReconstructionRequest},
dag::{Dag, DagResult, DagStatus, DagUnit, Request as ReconstructionRequest},
extension::{ExtenderUnit, Service as Extender},
handle_task_termination,
member::UnitMessage,
@@ -109,7 +109,7 @@
{
missing_coords: HashSet<UnitCoord>,
missing_parents: HashSet<H::Hash>,
store: UnitStore<ReconstructedUnit<SignedUnit<H, D, MK>>>,
store: UnitStore<DagUnit<H, D, MK>>,
keychain: MK,
dag: Dag<H, D, MK>,
alerts_for_alerter: Sender<Alert<H, D, MK::Signature>>,
Expand All @@ -118,12 +118,12 @@ where
unit_messages_for_network: Sender<RunwayNotificationOut<H, D, MK::Signature>>,
responses_for_collection: Sender<CollectionResponse<H, D, MK>>,
resolved_requests: Sender<Request<H>>,
parents_for_creator: Sender<ReconstructedUnit<SignedUnit<H, D, MK>>>,
parents_for_creator: Sender<DagUnit<H, D, MK>>,
ordered_batch_rx: Receiver<Vec<H::Hash>>,
finalization_handler: FH,
backup_units_for_saver: Sender<UncheckedSignedUnit<H, D, MK::Signature>>,
backup_units_for_saver: Sender<DagUnit<H, D, MK>>,
units_for_extender: Sender<ExtenderUnit<H>>,
backup_units_from_saver: Receiver<UncheckedSignedUnit<H, D, MK::Signature>>,
backup_units_from_saver: Receiver<DagUnit<H, D, MK>>,
new_units_from_creation: Receiver<SignedUnit<H, D, MK>>,
exiting: bool,
}
@@ -210,15 +210,15 @@ impl<'a, H: Hasher> Display for RunwayStatus<'a, H> {

struct RunwayConfig<H: Hasher, D: Data, FH: FinalizationHandler<D>, MK: MultiKeychain> {
finalization_handler: FH,
backup_units_for_saver: Sender<UncheckedSignedUnit<H, D, MK::Signature>>,
backup_units_for_saver: Sender<DagUnit<H, D, MK>>,
units_for_extender: Sender<ExtenderUnit<H>>,
backup_units_from_saver: Receiver<UncheckedSignedUnit<H, D, MK::Signature>>,
backup_units_from_saver: Receiver<DagUnit<H, D, MK>>,
alerts_for_alerter: Sender<Alert<H, D, MK::Signature>>,
notifications_from_alerter: Receiver<ForkingNotification<H, D, MK::Signature>>,
unit_messages_from_network: Receiver<RunwayNotificationIn<H, D, MK::Signature>>,
unit_messages_for_network: Sender<RunwayNotificationOut<H, D, MK::Signature>>,
responses_for_collection: Sender<CollectionResponse<H, D, MK>>,
parents_for_creator: Sender<ReconstructedUnit<SignedUnit<H, D, MK>>>,
parents_for_creator: Sender<DagUnit<H, D, MK>>,
ordered_batch_rx: Receiver<Vec<H::Hash>>,
resolved_requests: Sender<Request<H>>,
new_units_from_creation: Receiver<SignedUnit<H, D, MK>>,
@@ -455,10 +455,18 @@
}
}

fn on_unit_reconstructed(&mut self, unit: ReconstructedUnit<SignedUnit<H, D, MK>>) {
fn on_unit_reconstructed(&mut self, unit: DagUnit<H, D, MK>) {
let unit_hash = unit.hash();
trace!(target: "AlephBFT-runway", "Unit {:?} {} reconstructed.", unit_hash, unit.coord());
if self.backup_units_for_saver.unbounded_send(unit).is_err() {
error!(target: "AlephBFT-runway", "{:?} A unit couldn't be sent to backup: {:?}.", self.index(), unit_hash);
}
}

fn on_unit_backup_saved(&mut self, unit: DagUnit<H, D, MK>) {
let unit_hash = unit.hash();
self.store.insert(unit.clone());
self.dag.finished_processing(&unit_hash);
self.resolve_missing_parents(&unit_hash);
self.resolve_missing_coord(&unit.coord());
if self
@@ -477,21 +485,12 @@
warn!(target: "AlephBFT-runway", "Creator channel should be open.");
self.exiting = true;
}
if self
.backup_units_for_saver
.unbounded_send(unit.unpack().into())
.is_err()
{
error!(target: "AlephBFT-runway", "{:?} A unit couldn't be sent to backup: {:?}.", self.index(), unit_hash);
}
}

fn on_unit_backup_saved(&mut self, unit: UncheckedSignedUnit<H, D, MK::Signature>) {
self.send_message_for_network(RunwayNotificationOut::NewAnyUnit(unit.clone()));
let unit = unit.unpack();
self.send_message_for_network(RunwayNotificationOut::NewAnyUnit(unit.clone().into()));

if unit.as_signable().creator() == self.index() {
trace!(target: "AlephBFT-runway", "{:?} Sending a unit {:?}.", self.index(), unit.as_signable().hash());
self.send_message_for_network(RunwayNotificationOut::NewSelfUnit(unit));
self.send_message_for_network(RunwayNotificationOut::NewSelfUnit(unit.into()));
}
}

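The runway/mod.rs changes above reorder the pipeline. Previously on_unit_reconstructed inserted the unit into the store, resolved pending requests, notified downstream components and only then forwarded the unit to the backup saver, while on_unit_backup_saved merely rebroadcast it; now on_unit_reconstructed only hands the unit to the saver, and the store insert, the Dag::finished_processing call, request resolution and the network broadcast all happen in on_unit_backup_saved, after the unit is durable. A compact sketch of the new ordering, assuming a hypothetical MiniRunway with String units and plain collections in place of the real channels (request resolution and creator notification are elided):

use std::collections::HashSet;

#[derive(Default)]
struct MiniRunway {
    store: HashSet<String>,    // stand-in for UnitStore
    to_backup: Vec<String>,    // stand-in for backup_units_for_saver
    to_network: Vec<String>,   // stand-in for unit_messages_for_network
    dag_done: HashSet<String>, // records Dag::finished_processing calls
}

impl MiniRunway {
    // Step 1: a unit comes out of reconstruction; only hand it to the saver.
    fn on_unit_reconstructed(&mut self, unit: String) {
        self.to_backup.push(unit);
    }

    // Step 2: the saver acknowledged the write; only now does the unit become
    // visible to the store, the dag, and the network.
    fn on_unit_backup_saved(&mut self, unit: String) {
        self.store.insert(unit.clone());
        self.dag_done.insert(unit.clone());
        self.to_network.push(unit);
    }
}

fn main() {
    let mut runway = MiniRunway::default();
    runway.on_unit_reconstructed("unit-1".to_string());
    // Nothing is stored or broadcast until the backup ack arrives.
    assert!(runway.store.is_empty() && runway.to_network.is_empty());
    let saved = runway.to_backup.pop().unwrap();
    runway.on_unit_backup_saved(saved);
    assert!(runway.store.contains("unit-1"));
    assert!(runway.to_network.contains(&"unit-1".to_string()));
}

The intended effect, as the PR title suggests, is that a unit only becomes visible to the rest of the system once it has been saved.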
2 changes: 1 addition & 1 deletion consensus/src/testing/crash_recovery.rs
@@ -129,7 +129,7 @@ fn verify_backup(buf: &mut &[u8]) -> HashSet<UnitCoord> {
let mut already_saved = HashSet::new();

while !buf.is_empty() {
let unit = UncheckedSignedUnit::<Hasher64, Data, Signature>::decode(buf).unwrap();
let unit = <UncheckedSignedUnit<Hasher64, Data, Signature>>::decode(buf).unwrap();
let full_unit = unit.as_signable();
let coord = full_unit.coord();
let parent_ids = &full_unit.as_pre_unit().control_hash().parents_mask;
5 changes: 3 additions & 2 deletions consensus/src/units/mod.rs
@@ -16,8 +16,9 @@ pub(crate) use store::*;
#[cfg(test)]
pub use testing::{
create_preunits, creator_set, full_unit_to_unchecked_signed_unit, preunit_to_full_unit,
preunit_to_unchecked_signed_unit, random_full_parent_units_up_to, random_unit_with_parents,
FullUnit as TestingFullUnit, SignedUnit as TestingSignedUnit, WrappedSignedUnit,
preunit_to_signed_unit, preunit_to_unchecked_signed_unit, random_full_parent_units_up_to,
random_unit_with_parents, FullUnit as TestingFullUnit, SignedUnit as TestingSignedUnit,
WrappedSignedUnit,
};
pub use validator::{ValidationError, Validator};

16 changes: 14 additions & 2 deletions consensus/src/units/testing.rs
@@ -70,19 +70,31 @@ impl Creator {
}
}

pub fn full_unit_to_signed_unit(full_unit: FullUnit, keychain: &Keychain) -> SignedUnit {
Signed::sign(full_unit, keychain)
}

pub fn preunit_to_signed_unit(
pu: PreUnit,
session_id: SessionId,
keychain: &Keychain,
) -> SignedUnit {
full_unit_to_signed_unit(preunit_to_full_unit(pu, session_id), keychain)
}

pub fn full_unit_to_unchecked_signed_unit(
full_unit: FullUnit,
keychain: &Keychain,
) -> UncheckedSignedUnit {
Signed::sign(full_unit, keychain).into()
full_unit_to_signed_unit(full_unit, keychain).into()
}

pub fn preunit_to_unchecked_signed_unit(
pu: PreUnit,
session_id: SessionId,
keychain: &Keychain,
) -> UncheckedSignedUnit {
full_unit_to_unchecked_signed_unit(preunit_to_full_unit(pu, session_id), keychain)
preunit_to_signed_unit(pu, session_id, keychain).into()
}

fn initial_preunit(n_members: NodeCount, node_id: NodeIndex) -> PreUnit {
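The testing.rs additions above layer the helpers so each conversion step is reusable: full_unit_to_signed_unit signs a full unit, preunit_to_signed_unit composes preunit_to_full_unit with that, and the existing unchecked variants now simply call .into() on the signed result. A toy sketch of the same layering, assuming hypothetical Pre, Full, Signed and Unchecked stand-ins with no real signing:

// Hypothetical stand-ins for PreUnit, FullUnit, SignedUnit, UncheckedSignedUnit.
#[derive(Clone)]
struct Pre(u32);
struct Full(u32);
struct Signed(u32);
struct Unchecked(u32);

impl From<Signed> for Unchecked {
    fn from(s: Signed) -> Self {
        Unchecked(s.0)
    }
}

fn preunit_to_full(pu: Pre, session_id: u32) -> Full {
    Full(pu.0 + session_id)
}

fn full_to_signed(fu: Full) -> Signed {
    Signed(fu.0) // signing elided
}

// Each layer reuses the previous helper, as in the diff above.
fn preunit_to_signed(pu: Pre, session_id: u32) -> Signed {
    full_to_signed(preunit_to_full(pu, session_id))
}

fn preunit_to_unchecked(pu: Pre, session_id: u32) -> Unchecked {
    preunit_to_signed(pu, session_id).into()
}

fn main() {
    let pu = Pre(7);
    let signed = preunit_to_signed(pu.clone(), 0);
    let unchecked = preunit_to_unchecked(pu, 0);
    assert_eq!(signed.0, unchecked.0);
}

Tests that need a SignedUnit, such as the BackupSaver test above, can now stop at the signed layer instead of round-tripping through UncheckedSignedUnit.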