Skip to content

Commit

Permalink
Censorship resistance test
Browse files Browse the repository at this point in the history
  • Loading branch information
timorleph committed Jan 29, 2025
1 parent 496ff02 commit e36a7f9
Show file tree
Hide file tree
Showing 11 changed files with 254 additions and 56 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

129 changes: 129 additions & 0 deletions consensus/src/testing/behind.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
use std::{
collections::{HashSet, VecDeque},
time::{Duration, Instant},
};

use crate::{
testing::{init_log, spawn_honest_member, HonestMember, NetworkData},
NodeCount, NodeIndex, SpawnHandle,
};
use aleph_bft_mock::{DataProvider, NetworkHook, Router, Spawner};
use futures::StreamExt;
use log::info;

struct Latency {
who: NodeIndex,
buffer: VecDeque<(Instant, (NetworkData, NodeIndex, NodeIndex))>,
}

const LATENCY: Duration = Duration::from_millis(300);

impl Latency {
pub fn new(who: NodeIndex) -> Self {
Latency {
who,
buffer: VecDeque::new(),
}
}

fn add_message(
&mut self,
data: NetworkData,
sender: NodeIndex,
recipient: NodeIndex,
) -> Vec<(NetworkData, NodeIndex, NodeIndex)> {
match sender == self.who || recipient == self.who {
true => {
self.buffer
.push_back((Instant::now(), (data, sender, recipient)));
Vec::new()
}
false => vec![(data, sender, recipient)],
}
}

fn messages_to_send(&mut self) -> Vec<(NetworkData, NodeIndex, NodeIndex)> {
let mut result = Vec::new();
while !self.buffer.is_empty() {
let (when, msg) = self
.buffer
.pop_front()
.expect("just checked it is not empty");
if Instant::now().duration_since(when) < LATENCY {
self.buffer.push_front((when, msg));
break;
}
result.push(msg);
}
result
}
}

impl NetworkHook<NetworkData> for Latency {
fn process_message(
&mut self,
data: NetworkData,
sender: NodeIndex,
recipient: NodeIndex,
) -> Vec<(NetworkData, NodeIndex, NodeIndex)> {
let mut result = self.add_message(data, sender, recipient);
result.append(&mut self.messages_to_send());
result
}
}

#[tokio::test(flavor = "multi_thread")]
async fn delayed_finalized() {
let n_members = NodeCount(7);
let australian = NodeIndex(0);
init_log();
let spawner = Spawner::new();
let mut batch_rxs = Vec::new();
let mut exits = Vec::new();
let mut handles = Vec::new();
let (mut net_hub, networks) = Router::new(n_members);

net_hub.add_hook(Latency::new(australian));

spawner.spawn("network-hub", net_hub);

for (network, _) in networks {
let ix = network.index();
let HonestMember {
finalization_rx,
exit_tx,
handle,
..
} = spawn_honest_member(
spawner,
ix,
n_members,
vec![],
DataProvider::new_range(ix.0 * 50, (ix.0 + 1) * 50),
network,
);
batch_rxs.push(finalization_rx);
exits.push(exit_tx);
handles.push(handle);
}
let to_finalize: HashSet<u32> = (0..((n_members.0) * 50))
.map(|number| number as u32)
.collect();

for mut rx in batch_rxs.drain(..) {
let mut to_finalize_local = to_finalize.clone();
while !to_finalize_local.is_empty() {
let number = rx.next().await.unwrap();
info!("finalizing {}", number);
assert!(to_finalize_local.remove(&number));
}
info!("finished one node");
}

for exit in exits {
let _ = exit.send(());
}
for handle in handles {
let _ = handle.await;
}
}
23 changes: 15 additions & 8 deletions consensus/src/testing/byzantine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@ use crate::{
Hasher, Network as NetworkT, NetworkData as NetworkDataT, NodeCount, NodeIndex, NodeMap,
Recipient, Round, SessionId, Signed, SpawnHandle, TaskHandle,
};
use aleph_bft_mock::{Data, Hash64, Hasher64, Keychain, NetworkHook, Router, Spawner};
use aleph_bft_mock::{
Data, DataProvider, Hash64, Hasher64, Keychain, NetworkHook, Router, Spawner,
};
use futures::{channel::oneshot, StreamExt};
use log::{debug, error, trace};
use parking_lot::Mutex;
Expand Down Expand Up @@ -230,7 +232,12 @@ impl AlertHook {
}

impl NetworkHook<NetworkData> for AlertHook {
fn update_state(&mut self, data: &mut NetworkData, sender: NodeIndex, recipient: NodeIndex) {
fn process_message(
&mut self,
data: NetworkData,
sender: NodeIndex,
recipient: NodeIndex,
) -> Vec<(NetworkData, NodeIndex, NodeIndex)> {
use crate::{alerts::AlertMessage::*, network::NetworkDataInner::*};
if let crate::NetworkData(Alert(ForkAlert(_))) = data {
*self
Expand All @@ -239,21 +246,21 @@ impl NetworkHook<NetworkData> for AlertHook {
.entry((sender, recipient))
.or_insert(0) += 1;
}
vec![(data, sender, recipient)]
}
}

async fn honest_members_agree_on_batches_byzantine(
n_members: NodeCount,
n_honest: NodeCount,
n_batches: usize,
network_reliability: f64,
) {
init_log();
let spawner = Spawner::new();
let mut batch_rxs = Vec::new();
let mut exits = Vec::new();
let mut handles = Vec::new();
let (mut net_hub, networks) = Router::new(n_members, network_reliability);
let (mut net_hub, networks) = Router::new(n_members);

let alert_hook = AlertHook::new();
net_hub.add_hook(alert_hook.clone());
Expand All @@ -270,7 +277,7 @@ async fn honest_members_agree_on_batches_byzantine(
exit_tx,
handle,
..
} = spawn_honest_member(spawner, ix, n_members, vec![], network);
} = spawn_honest_member(spawner, ix, n_members, vec![], DataProvider::new(), network);
batch_rxs.push(finalization_rx);
(exit_tx, handle)
};
Expand Down Expand Up @@ -317,17 +324,17 @@ async fn honest_members_agree_on_batches_byzantine(
#[tokio::test(flavor = "multi_thread")]
#[serial]
async fn small_byzantine_one_forker() {
honest_members_agree_on_batches_byzantine(4.into(), 3.into(), 5, 1.0).await;
honest_members_agree_on_batches_byzantine(4.into(), 3.into(), 5).await;
}

#[tokio::test(flavor = "multi_thread")]
#[serial]
async fn small_byzantine_two_forkers() {
honest_members_agree_on_batches_byzantine(7.into(), 5.into(), 5, 1.0).await;
honest_members_agree_on_batches_byzantine(7.into(), 5.into(), 5).await;
}

#[tokio::test(flavor = "multi_thread")]
#[serial]
async fn medium_byzantine_ten_forkers() {
honest_members_agree_on_batches_byzantine(31.into(), 21.into(), 5, 1.0).await;
honest_members_agree_on_batches_byzantine(31.into(), 21.into(), 5).await;
}
23 changes: 13 additions & 10 deletions consensus/src/testing/crash.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,22 +2,25 @@ use crate::{
testing::{init_log, spawn_honest_member, HonestMember},
NodeCount, SpawnHandle,
};
use aleph_bft_mock::{Router, Spawner};
use aleph_bft_mock::{DataProvider, Router, Spawner, UnreliableHook};
use futures::StreamExt;
use serial_test::serial;

async fn honest_members_agree_on_batches(
n_members: NodeCount,
n_alive: NodeCount,
n_batches: usize,
network_reliability: f64,
network_reliability: Option<f64>,
) {
init_log();
let spawner = Spawner::new();
let mut exits = Vec::new();
let mut handles = Vec::new();
let mut batch_rxs = Vec::new();
let (net_hub, networks) = Router::new(n_members, network_reliability);
let (mut net_hub, networks) = Router::new(n_members);
if let Some(reliability) = network_reliability {
net_hub.add_hook(UnreliableHook::new(reliability));
}
spawner.spawn("network-hub", net_hub);

for (network, _) in networks {
Expand All @@ -28,7 +31,7 @@ async fn honest_members_agree_on_batches(
exit_tx,
handle,
..
} = spawn_honest_member(spawner, ix, n_members, vec![], network);
} = spawn_honest_member(spawner, ix, n_members, vec![], DataProvider::new(), network);
batch_rxs.push(finalization_rx);
exits.push(exit_tx);
handles.push(handle);
Expand Down Expand Up @@ -59,35 +62,35 @@ async fn honest_members_agree_on_batches(
#[tokio::test(flavor = "multi_thread")]
#[serial]
async fn small_honest_all_alive() {
honest_members_agree_on_batches(4.into(), 4.into(), 5, 1.0).await;
honest_members_agree_on_batches(4.into(), 4.into(), 5, None).await;
}

#[tokio::test(flavor = "multi_thread")]
#[serial]
async fn small_honest_one_crash() {
honest_members_agree_on_batches(4.into(), 3.into(), 5, 1.0).await;
honest_members_agree_on_batches(4.into(), 3.into(), 5, None).await;
}

#[tokio::test(flavor = "multi_thread")]
#[serial]
async fn small_honest_one_crash_unreliable_network() {
honest_members_agree_on_batches(4.into(), 3.into(), 5, 0.9).await;
honest_members_agree_on_batches(4.into(), 3.into(), 5, Some(0.9)).await;
}

#[tokio::test(flavor = "multi_thread")]
#[serial]
async fn medium_honest_all_alive() {
honest_members_agree_on_batches(31.into(), 31.into(), 5, 1.0).await;
honest_members_agree_on_batches(31.into(), 31.into(), 5, None).await;
}

#[tokio::test(flavor = "multi_thread")]
#[serial]
async fn medium_honest_ten_crashes() {
honest_members_agree_on_batches(31.into(), 21.into(), 5, 1.0).await;
honest_members_agree_on_batches(31.into(), 21.into(), 5, None).await;
}

#[tokio::test(flavor = "multi_thread")]
#[serial]
async fn medium_honest_ten_crashes_unreliable_network() {
honest_members_agree_on_batches(31.into(), 21.into(), 5, 0.9).await;
honest_members_agree_on_batches(31.into(), 21.into(), 5, Some(0.9)).await;
}
24 changes: 19 additions & 5 deletions consensus/src/testing/crash_recovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use crate::{
units::{UncheckedSignedUnit, Unit, UnitCoord},
NodeCount, NodeIndex, SpawnHandle, TaskHandle,
};
use aleph_bft_mock::{Data, Hasher64, Router, Signature, Spawner};
use aleph_bft_mock::{Data, DataProvider, Hasher64, Router, Signature, Spawner};
use codec::Decode;
use futures::{
channel::{mpsc, oneshot},
Expand Down Expand Up @@ -67,7 +67,14 @@ fn connect_nodes(
saved_state,
exit_tx,
handle,
} = spawn_honest_member(*spawner, ix, n_members, vec![], network);
} = spawn_honest_member(
*spawner,
ix,
n_members,
vec![],
DataProvider::new(),
network,
);
(
ix,
NodeData {
Expand Down Expand Up @@ -109,7 +116,14 @@ async fn reconnect_nodes(
saved_state,
exit_tx,
handle,
} = spawn_honest_member(*spawner, *node_id, n_members, saved_units.clone(), network);
} = spawn_honest_member(
*spawner,
*node_id,
n_members,
saved_units.clone(),
DataProvider::new(),
network,
);
reconnected_nodes.push((
*node_id,
NodeData {
Expand Down Expand Up @@ -166,7 +180,7 @@ async fn crashed_nodes_recover(n_members: NodeCount, n_batches: usize) {

let n_kill = (n_members - n_members.consensus_threshold()) + 1.into();
let spawner = Spawner::new();
let (net_hub, networks) = Router::new(n_members, 1.0);
let (net_hub, networks) = Router::new(n_members);
spawner.spawn("network-hub", net_hub);

let mut node_data = connect_nodes(&spawner, n_members, networks);
Expand Down Expand Up @@ -239,7 +253,7 @@ async fn saves_units_properly() {
let n_batches = 2;
let n_members = NodeCount(4);
let spawner = Spawner::new();
let (net_hub, networks) = Router::new(n_members, 1.0);
let (net_hub, networks) = Router::new(n_members);
spawner.spawn("network-hub", net_hub);

let mut node_data = connect_nodes(&spawner, n_members, networks);
Expand Down
3 changes: 2 additions & 1 deletion consensus/src/testing/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
mod alerts;
mod behind;
mod byzantine;
mod crash;
mod crash_recovery;
Expand Down Expand Up @@ -67,9 +68,9 @@ pub fn spawn_honest_member(
node_index: NodeIndex,
n_members: NodeCount,
units: Vec<u8>,
data_provider: DataProvider,
network: impl 'static + NetworkT<NetworkData>,
) -> HonestMember {
let data_provider = DataProvider::new();
let (finalization_handler, finalization_rx) = FinalizationHandler::new();
let config = gen_config(node_index, n_members, gen_delay_config());
let (exit_tx, exit_rx) = oneshot::channel();
Expand Down
Loading

0 comments on commit e36a7f9

Please sign in to comment.