diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 08b7ae17..02c410da 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -35,7 +35,7 @@ jobs: node-version: 20 - name: Cache node modules - uses: actions/cache@v2 + uses: actions/cache@v4 with: path: "**/node_modules" key: ${{ runner.os }}-modules-${{ hashFiles('**/yarn.lock') }} diff --git a/.github/workflows/integration.yml b/.github/workflows/integration.yml index 3ee30eb9..47db9752 100644 --- a/.github/workflows/integration.yml +++ b/.github/workflows/integration.yml @@ -49,7 +49,7 @@ jobs: override: true - name: Cache node modules - uses: actions/cache@v2 + uses: actions/cache@v4 with: path: "**/node_modules" key: ${{ runner.os }}-modules-${{ hashFiles('**/yarn.lock') }} @@ -104,7 +104,7 @@ jobs: override: true - name: Cache node modules - uses: actions/cache@v2 + uses: actions/cache@v4 with: path: "**/node_modules" key: ${{ runner.os }}-modules-${{ hashFiles('**/yarn.lock') }} diff --git a/.github/workflows/rust-ci.yml b/.github/workflows/rust-ci.yml index 7a3aef89..3e125cfc 100644 --- a/.github/workflows/rust-ci.yml +++ b/.github/workflows/rust-ci.yml @@ -36,7 +36,7 @@ jobs: node-version: 20 - name: Cache node modules - uses: actions/cache@v2 + uses: actions/cache@v4 with: path: "**/node_modules" key: ${{ runner.os }}-modules-${{ hashFiles('**/yarn.lock') }} diff --git a/deploy/agg.yaml b/deploy/agg.yaml index 8321ce76..5877021a 100644 --- a/deploy/agg.yaml +++ b/deploy/agg.yaml @@ -5,7 +5,6 @@ peers: - "/dns4/cn1/udp/9091/quic-v1" - "/dns4/cn2/udp/9092/quic-v1" - "/dns4/cn3/udp/9093/quic-v1" - - "/dns4/aggregator/udp/9094/quic-v1" chains: - name: "sepolia" rpc_url: "${RPC_URL}" diff --git a/deploy/cn1.yaml b/deploy/cn1.yaml index aefdc431..4ea7198f 100644 --- a/deploy/cn1.yaml +++ b/deploy/cn1.yaml @@ -2,10 +2,9 @@ address: "${ADDRESS}" quic_port: ${QUIC_PORT} enable_mdns: false peers: - - "/dns4/cn1/udp/9091/quic-v1" - - "/dns4/cn1/udp/9092/quic-v1" - - "/dns4/cn1/udp/9093/quic-v1" - - "/dns4/cn1/udp/9094/quic-v1" + - "/dns4/cn2/udp/9092/quic-v1" + - "/dns4/cn3/udp/9093/quic-v1" + - "/dns4/aggregator/udp/9094/quic-v1" chains: - name: "sepolia" rpc_url: "${RPC_URL}" diff --git a/deploy/cn2.yaml b/deploy/cn2.yaml index 8321ce76..d6962901 100644 --- a/deploy/cn2.yaml +++ b/deploy/cn2.yaml @@ -3,7 +3,6 @@ quic_port: ${QUIC_PORT} enable_mdns: false peers: - "/dns4/cn1/udp/9091/quic-v1" - - "/dns4/cn2/udp/9092/quic-v1" - "/dns4/cn3/udp/9093/quic-v1" - "/dns4/aggregator/udp/9094/quic-v1" chains: diff --git a/deploy/cn3.yaml b/deploy/cn3.yaml index 8321ce76..1175bc31 100644 --- a/deploy/cn3.yaml +++ b/deploy/cn3.yaml @@ -4,7 +4,6 @@ enable_mdns: false peers: - "/dns4/cn1/udp/9091/quic-v1" - "/dns4/cn2/udp/9092/quic-v1" - - "/dns4/cn3/udp/9093/quic-v1" - "/dns4/aggregator/udp/9094/quic-v1" chains: - name: "sepolia" diff --git a/packages/ciphernode/enclave_core/src/aggregator_start.rs b/packages/ciphernode/enclave_core/src/aggregator_start.rs index e5403b54..83347be9 100644 --- a/packages/ciphernode/enclave_core/src/aggregator_start.rs +++ b/packages/ciphernode/enclave_core/src/aggregator_start.rs @@ -1,3 +1,4 @@ +use crate::helpers::datastore::setup_datastore; use actix::{Actor, Addr}; use aggregator::ext::{PlaintextAggregatorExtension, PublicKeyAggregatorExtension}; use anyhow::Result; @@ -22,8 +23,6 @@ use std::sync::{Arc, Mutex}; use test_helpers::{PlaintextWriter, PublicKeyWriter}; use tokio::task::JoinHandle; -use crate::helpers::datastore::setup_datastore; - pub async fn execute( config: AppConfig, pubkey_write_path: Option<&str>, @@ -83,7 +82,7 @@ pub async fn execute( .build() .await?; - let (_, join_handle, peer_id) = NetworkManager::setup_with_peer( + let (_, handle, peer_id) = NetworkManager::setup_with_peer( bus.clone(), config.peers(), &cipher, @@ -103,5 +102,5 @@ pub async fn execute( SimpleLogger::::attach("AGG", bus.clone()); - Ok((bus, join_handle, peer_id)) + Ok((bus, handle, peer_id)) } diff --git a/packages/ciphernode/enclave_core/src/start.rs b/packages/ciphernode/enclave_core/src/start.rs index 3f10e5dd..5d64a218 100644 --- a/packages/ciphernode/enclave_core/src/start.rs +++ b/packages/ciphernode/enclave_core/src/start.rs @@ -1,3 +1,4 @@ +use crate::helpers::datastore::setup_datastore; use actix::{Actor, Addr}; use alloy::primitives::Address; use anyhow::Result; @@ -23,8 +24,6 @@ use std::sync::{Arc, Mutex}; use tokio::task::JoinHandle; use tracing::instrument; -use crate::helpers::datastore::setup_datastore; - #[instrument(name = "app", skip_all)] pub async fn execute( config: AppConfig, @@ -80,7 +79,7 @@ pub async fn execute( .build() .await?; - let (_, join_handle, peer_id) = NetworkManager::setup_with_peer( + let (_, handle, peer_id) = NetworkManager::setup_with_peer( bus.clone(), config.peers(), &cipher, @@ -93,5 +92,5 @@ pub async fn execute( let nm = format!("CIPHER({})", &address.to_string()[0..5]); SimpleLogger::::attach(&nm, bus.clone()); - Ok((bus, join_handle, peer_id)) + Ok((bus, handle, peer_id)) } diff --git a/packages/ciphernode/events/src/enclave_event/mod.rs b/packages/ciphernode/events/src/enclave_event/mod.rs index c8018031..c90a00e9 100644 --- a/packages/ciphernode/events/src/enclave_event/mod.rs +++ b/packages/ciphernode/events/src/enclave_event/mod.rs @@ -139,14 +139,18 @@ impl EnclaveEvent { _ => false, } } + + pub fn event_type(&self) -> String { + let s = format!("{:?}", self); + extract_enclave_event_name(&s).to_string() + } } impl Event for EnclaveEvent { type Id = EventId; fn event_type(&self) -> String { - let s = format!("{:?}", self); - extract_enclave_event_name(&s).to_string() + self.event_type() } fn event_id(&self) -> Self::Id { @@ -261,10 +265,3 @@ fn extract_enclave_event_name(s: &str) -> &str { } s } - -impl EnclaveEvent { - pub fn event_type(&self) -> String { - let s = format!("{:?}", self); - extract_enclave_event_name(&s).to_string() - } -} diff --git a/packages/ciphernode/net/Cargo.toml b/packages/ciphernode/net/Cargo.toml index d2794b3b..bb03b48a 100644 --- a/packages/ciphernode/net/Cargo.toml +++ b/packages/ciphernode/net/Cargo.toml @@ -8,6 +8,8 @@ repository = "https://github.com/gnosisguild/enclave/packages/ciphernode" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] +anyhow = { workspace = true } +actix = { workspace = true } async-std = { workspace = true, features = ["attributes"] } async-trait = { workspace = true } futures = { workspace = true } @@ -29,6 +31,4 @@ tokio = { workspace = true, features = ["full"] } tracing = { workspace = true } tracing-subscriber = { workspace = true, features = ["env-filter"] } events = { workspace = true } -anyhow = { workspace = true } -actix = { workspace = true } zeroize = { workspace = true } diff --git a/packages/ciphernode/net/src/bin/p2p_test.rs b/packages/ciphernode/net/src/bin/p2p_test.rs index b34b6ce9..f5533618 100644 --- a/packages/ciphernode/net/src/bin/p2p_test.rs +++ b/packages/ciphernode/net/src/bin/p2p_test.rs @@ -1,9 +1,14 @@ +use actix::prelude::*; use anyhow::Result; +use events::{EventBus, EventBusConfig, GetHistory}; +use libp2p::gossipsub; use net::correlation_id::CorrelationId; use net::events::{NetworkPeerCommand, NetworkPeerEvent}; +use net::Dialer; use net::NetworkPeer; use std::time::Duration; use std::{collections::HashSet, env, process}; +use tokio::sync::mpsc; use tokio::time::{sleep, timeout}; use tracing_subscriber::{prelude::*, EnvFilter}; @@ -14,7 +19,7 @@ use tracing_subscriber::{prelude::*, EnvFilter}; // We have a docker test harness that runs the nodes and blocks things like mdns ports to ensure // that basic discovery is working -#[tokio::main] +#[actix::main] async fn main() -> Result<()> { tracing_subscriber::registry() .with(EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("info"))) @@ -40,26 +45,36 @@ async fn main() -> Result<()> { let peers: Vec = dial_to.iter().cloned().collect(); let id = libp2p::identity::Keypair::generate_ed25519(); - let mut peer = NetworkPeer::new(&id, peers, udp_port, "test-topic", enable_mdns)?; - - // Extract input and outputs - let tx = peer.tx(); - let mut rx = peer.rx(); - - let router_task = tokio::spawn({ - let name = name.clone(); - async move { - println!("{} starting router task", name); - if let Err(e) = peer.start().await { - println!("{} router task failed: {}", name, e); - } - println!("{} router task finished", name); + let (tx, rx) = mpsc::channel(100); + + let net_bus = EventBus::::new(EventBusConfig { + capture_history: true, + deduplicate: false, + }) + .start(); + + let mut peer = NetworkPeer::new(&id, enable_mdns, net_bus.clone(), rx)?; + let topic_id = gossipsub::IdentTopic::new(topic); + peer.subscribe(&topic_id)?; + peer.listen_on(udp_port.unwrap_or(0))?; + + let name_clone = name.clone(); + let swarm_handle = actix::spawn(async move { + println!("{} starting swarm", name_clone); + if let Err(e) = peer.start().await { + println!("{} swarm failed: {}", name_clone, e); } + println!("{} swarm finished", name_clone); }); // Give network time to initialize sleep(Duration::from_secs(3)).await; + // Set up dialer for peers + for peer in peers { + Dialer::dial_peer(peer, net_bus.clone(), tx.clone()); + } + // Send our message first println!("{} sending message", name); tx.send(NetworkPeerCommand::GossipPublish { @@ -78,26 +93,28 @@ async fn main() -> Result<()> { .into_iter() .filter(|n| *n != name) .collect(); + println!("{} waiting for messages from: {:?}", name, expected); // Then wait to receive from others with a timeout let mut received = HashSet::new(); - - // Wrap the message receiving loop in a timeout let receive_result = timeout(Duration::from_secs(10), async { while received != expected { - match rx.recv().await? { - NetworkPeerEvent::GossipData(msg) => match String::from_utf8(msg) { - Ok(msg) => { - if !received.contains(&msg) { - println!("{} received '{}'", name, msg); - received.insert(msg); - } + let history = net_bus.send(GetHistory::::new()).await?; + for event in history.clone() { + match event { + NetworkPeerEvent::GossipData(msg) => { + println!( + "{} received '{}'", + name, + String::from_utf8(msg.clone()).unwrap() + ); + received.insert(String::from_utf8(msg).unwrap()); } - Err(e) => println!("{} received invalid UTF8: {}", name, e), - }, - _ => (), + _ => (), + } } + tokio::time::sleep(Duration::from_secs(1)).await; } Ok::<(), anyhow::Error>(()) }) @@ -121,8 +138,8 @@ async fn main() -> Result<()> { } // Make sure router task is still running - if router_task.is_finished() { - println!("{} warning: router task finished early", name); + if swarm_handle.is_finished() { + println!("{} warning: swarm task finished early", name); } // Give some time for final message propagation diff --git a/packages/ciphernode/net/src/dialer.rs b/packages/ciphernode/net/src/dialer.rs index f92a7740..2f532337 100644 --- a/packages/ciphernode/net/src/dialer.rs +++ b/packages/ciphernode/net/src/dialer.rs @@ -1,210 +1,249 @@ -use anyhow::Context; -use anyhow::Result; -use futures::future::join_all; +use crate::events::{NetworkPeerCommand, NetworkPeerEvent}; +use actix::prelude::*; +use anyhow::{Context as AnyhowContext, Result}; +use events::{EventBus, Subscribe}; use libp2p::{ multiaddr::Protocol, swarm::{dial_opts::DialOpts, ConnectionId, DialError}, Multiaddr, }; -use std::net::ToSocketAddrs; -use tokio::select; -use tokio::sync::{broadcast, mpsc}; -use tokio::time::{sleep, Duration}; -use tracing::error; -use tracing::info; - -use crate::{ - events::{NetworkPeerCommand, NetworkPeerEvent}, - retry::{retry_with_backoff, to_retry, RetryError, BACKOFF_DELAY, BACKOFF_MAX_RETRIES}, -}; +use std::{net::ToSocketAddrs, sync::Arc, time::Duration}; +use tokio::sync::mpsc; +use tracing::{info, warn}; -/// Dial a single Multiaddr with retries and return an error should those retries not work -async fn dial_multiaddr( - cmd_tx: &mpsc::Sender, - event_tx: &broadcast::Sender, - multiaddr_str: &str, -) -> Result<()> { - let multiaddr = &multiaddr_str.parse()?; - info!("Now dialing in to {}", multiaddr); - retry_with_backoff( - || attempt_connection(cmd_tx, event_tx, multiaddr), - BACKOFF_MAX_RETRIES, - BACKOFF_DELAY, - ) - .await?; - Ok(()) -} +const BACKOFF_DELAY: u64 = 500; +const BACKOFF_MAX_RETRIES: u32 = 10; +const CONNECTION_TIMEOUT: u64 = 60; -fn trace_error(r: Result<()>) { - if let Err(err) = r { - error!("{}", err); - } -} +#[derive(Message)] +#[rtype(result = "Result<()>")] +pub struct DialPeer(pub String); -/// Initiates connections to multiple network peers -/// -/// # Arguments -/// * `cmd_tx` - Sender for network peer commands -/// * `event_tx` - Broadcast sender for peer events -/// * `peers` - List of peer addresses to connect to -pub async fn dial_peers( - cmd_tx: &mpsc::Sender, - event_tx: &broadcast::Sender, - peers: &Vec, -) -> Result<()> { - let futures: Vec<_> = peers - .iter() - .map(|addr| dial_multiaddr(cmd_tx, event_tx, addr)) - .collect(); - let results = join_all(futures).await; - results.into_iter().for_each(trace_error); - Ok(()) +#[derive(Clone)] +struct PendingConnection { + id: ConnectionId, + addr: String, + attempt: u32, + delay_ms: u64, } -/// Attempt a connection with retrys to a multiaddr return an error if the connection could not be resolved after the retries. -async fn attempt_connection( - cmd_tx: &mpsc::Sender, - event_tx: &broadcast::Sender, - multiaddr: &Multiaddr, -) -> Result<(), RetryError> { - let mut event_rx = event_tx.subscribe(); - let multi = get_resolved_multiaddr(multiaddr).map_err(to_retry)?; - let opts: DialOpts = multi.clone().into(); - let dial_connection = opts.connection_id(); - info!("Dialing: '{}' with connection '{}'", multi, dial_connection); - cmd_tx - .send(NetworkPeerCommand::Dial(opts)) - .await - .map_err(to_retry)?; - wait_for_connection(&mut event_rx, dial_connection).await +#[derive(Clone)] +pub struct Dialer { + net_bus: Addr>, + tx: mpsc::Sender, + pending_connection: Option, + target_addr: String, } -/// Wait for results of a retry based on a given correlation id and return the correct variant of -/// RetryError depending on the result from the downstream event -async fn wait_for_connection( - event_rx: &mut broadcast::Receiver, - dial_connection: ConnectionId, -) -> Result<(), RetryError> { - loop { - // Create a timeout future that can be reset - select! { - result = event_rx.recv() => { - match result.map_err(to_retry)? { - NetworkPeerEvent::ConnectionEstablished { connection_id } => { - if connection_id == dial_connection { - info!("Connection Established"); - return Ok(()); - } +impl Dialer { + pub fn new( + net_bus: Addr>, + tx: mpsc::Sender, + target_addr: String, + ) -> Addr { + let addr = Self { + net_bus: net_bus.clone(), + tx, + pending_connection: None, + target_addr, + } + .start(); + + // Listen on all events + net_bus.do_send(Subscribe { + event_type: String::from("*"), + listener: addr.clone().recipient(), + }); + + addr + } + + pub fn dial_peer( + addr: String, + net_bus: Addr>, + tx: mpsc::Sender, + ) -> Addr { + Self::new(net_bus, tx, addr) + } + + async fn attempt_dial( + &mut self, + addr: String, + attempt: u32, + delay_ms: u64, + ) -> Option { + info!("Attempt {}/{} for {}", attempt, BACKOFF_MAX_RETRIES, addr); + if attempt > 1 { + tokio::time::sleep(Duration::from_millis(delay_ms)).await; + } + + match addr.parse::() { + Ok(multi) => { + let resolved_multiaddr = match self.get_resolved_multiaddr(&multi) { + Ok(addr) => addr, + Err(e) => { + warn!("Error resolving multiaddr {}: {}", addr, e); + return None; } - NetworkPeerEvent::DialError { error } => { - info!("DialError!"); - return match error.as_ref() { - // If we are dialing ourself then we should just fail - DialError::NoAddresses { .. } => { - info!("DialError received. Returning RetryError::Failure"); - Err(RetryError::Failure(error.clone().into())) - } - // Try again otherwise - _ => Err(RetryError::Retry(error.clone().into())), - }; + }; + let opts: DialOpts = resolved_multiaddr.into(); + let connection_id = opts.connection_id(); + + match self.tx.send(NetworkPeerCommand::Dial(opts)).await { + Ok(_) => { + info!("Dialing {} with connection {}", addr, connection_id); + self.pending_connection = Some(PendingConnection { + id: connection_id, + addr, + attempt, + delay_ms, + }); + Some(connection_id) } - NetworkPeerEvent::OutgoingConnectionError { - connection_id, - error, - } => { - info!("OutgoingConnectionError!"); - if connection_id == dial_connection { - info!( - "Connection {} failed because of error {}. Retrying...", - connection_id, error - ); - return match error.as_ref() { - // If we are dialing ourself then we should just fail - DialError::NoAddresses { .. } => { - Err(RetryError::Failure(error.clone().into())) - } - // Try again otherwise - _ => Err(RetryError::Retry(error.clone().into())), - }; - } + Err(e) => { + warn!("Failed to initiate dial: {}", e); + None } - _ => (), } } - _ = sleep(Duration::from_secs(60)) => { - info!("Connection attempt timed out after 60 seconds of no events"); - return Err(RetryError::Retry(std::io::Error::new( - std::io::ErrorKind::TimedOut, - "Connection attempt timed out", - ).into())); + Err(e) => { + warn!("Invalid multiaddr {}: {}", addr, e); + None } } } -} -/// Convert a Multiaddr to use a specific ip address with the ip4 or ip6 protocol -fn dns_to_ip_addr(original: &Multiaddr, ip_str: &str) -> Result { - let ip = ip_str.parse()?; - let mut new_addr = Multiaddr::empty(); - let mut skip_next = false; + // ----------------------------- + // DNS resolution logic + // ----------------------------- - for proto in original.iter() { - if skip_next { - skip_next = false; - continue; + fn get_resolved_multiaddr(&self, value: &Multiaddr) -> Result { + if let Some(domain) = self.extract_dns_host(value) { + let ip = self.resolve_ipv4(&domain)?; + self.dns_to_ip_addr(value, &ip) + } else { + Ok(value.clone()) } + } + + fn extract_dns_host(&self, addr: &Multiaddr) -> Option { + for proto in addr.iter() { + match proto { + Protocol::Dns4(hostname) | Protocol::Dns6(hostname) => { + return Some(hostname.to_string()) + } + _ => continue, + } + } + None + } + + fn dns_to_ip_addr(&self, original: &Multiaddr, ip_str: &str) -> Result { + let ip = ip_str.parse()?; + let mut new_addr = Multiaddr::empty(); + let mut skip_next = false; - match proto { - Protocol::Dns4(_) | Protocol::Dns6(_) => { - new_addr.push(Protocol::Ip4(ip)); + for proto in original.iter() { + if skip_next { skip_next = false; + continue; + } + + match proto { + Protocol::Dns4(_) | Protocol::Dns6(_) => { + new_addr.push(Protocol::Ip4(ip)); + skip_next = false; + } + _ => new_addr.push(proto), } - _ => new_addr.push(proto), } + + Ok(new_addr) } - Ok(new_addr) -} + fn resolve_ipv4(&self, domain: &str) -> Result { + let addr = format!("{}:0", domain) + .to_socket_addrs()? + .find(|addr| addr.ip().is_ipv4()) + .context("no IPv4 addresses found")?; + Ok(addr.ip().to_string()) + } -/// Detect the DNS host from a multiaddr -fn extract_dns_host(addr: &Multiaddr) -> Option { - // Iterate through the protocols in the multiaddr - for proto in addr.iter() { - match proto { - // Match on DNS4 or DNS6 protocols - Protocol::Dns4(hostname) | Protocol::Dns6(hostname) => { - return Some(hostname.to_string()) + fn handle_connection_error( + &mut self, + conn: PendingConnection, + error: Arc, + ctx: &mut Context, + ) { + warn!("Connection error for {}: {}", conn.addr, error); + if !matches!(error.as_ref(), DialError::NoAddresses { .. }) { + if conn.attempt < BACKOFF_MAX_RETRIES { + let mut dialer = self.clone(); + ctx.spawn( + async move { + dialer + .attempt_dial(conn.addr, conn.attempt + 1, conn.delay_ms * 2) + .await; + } + .into_actor(self), + ); + } else { + warn!("Permanent failure for {}: {}", conn.addr, error); } - _ => continue, + } else { + warn!("Permanent failure for {}: {}", conn.addr, error); } } - None } -/// If the Multiaddr uses a DNS domain look it up and return a multiaddr that uses a resolved IP -/// address -fn get_resolved_multiaddr(value: &Multiaddr) -> Result { - if let Some(domain) = extract_dns_host(value) { - let ip = resolve_ipv4(&domain)?; - let multi = dns_to_ip_addr(value, &ip)?; - return Ok(multi); - } else { - Ok(value.clone()) +impl Actor for Dialer { + type Context = Context; + + fn started(&mut self, ctx: &mut Self::Context) { + let mut dialer = self.clone(); + let addr = self.target_addr.clone(); + ctx.spawn( + async move { + dialer.attempt_dial(addr, 1, BACKOFF_DELAY).await; + } + .into_actor(self), + ); } } -fn resolve_ipv4(domain: &str) -> Result { - let addr = format!("{}:0", domain) - .to_socket_addrs()? - .find(|addr| addr.ip().is_ipv4()) - .context("no IPv4 addresses found")?; - Ok(addr.ip().to_string()) -} +impl Handler for Dialer { + type Result = (); -fn resolve_ipv6(domain: &str) -> Result { - let addr = format!("{}:0", domain) - .to_socket_addrs()? - .find(|addr| addr.ip().is_ipv6()) - .context("no IPv6 addresses found")?; - Ok(addr.ip().to_string()) + fn handle(&mut self, msg: NetworkPeerEvent, ctx: &mut Context) { + match msg { + NetworkPeerEvent::ConnectionEstablished { connection_id } => { + if let Some(conn) = self.pending_connection.take() { + if conn.id == connection_id { + info!("Connection Established for {}", conn.addr); + } + } + } + NetworkPeerEvent::DialError { + connection_id, + error, + } => { + if let Some(conn) = self.pending_connection.take() { + if conn.id == connection_id { + self.handle_connection_error(conn, error, ctx); + } + } + } + NetworkPeerEvent::OutgoingConnectionError { + connection_id, + error, + } => { + if let Some(conn) = self.pending_connection.take() { + if conn.id == connection_id { + self.handle_connection_error(conn, error, ctx); + } + } + } + _ => {} + } + } } diff --git a/packages/ciphernode/net/src/events.rs b/packages/ciphernode/net/src/events.rs index 196ea48f..0d28ee71 100644 --- a/packages/ciphernode/net/src/events.rs +++ b/packages/ciphernode/net/src/events.rs @@ -6,9 +6,13 @@ use libp2p::{ swarm::{dial_opts::DialOpts, ConnectionId, DialError}, }; +use events::{Event, EventId, Subscribe}; + use crate::correlation_id::CorrelationId; /// NetworkPeer Commands are sent to the network peer over a mspc channel +#[derive(Message)] +#[rtype(result = "()")] pub enum NetworkPeerCommand { GossipPublish { topic: String, @@ -20,7 +24,7 @@ pub enum NetworkPeerCommand { /// NetworkPeerEvents are broadcast over a broadcast channel to whom ever wishes to listen #[derive(Message, Clone, Debug)] -#[rtype(result = "anyhow::Result<()>")] +#[rtype(result = "()")] pub enum NetworkPeerEvent { /// Bytes have been broadcast over the network GossipData(Vec), @@ -35,7 +39,10 @@ pub enum NetworkPeerEvent { message_id: MessageId, }, /// There was an error Dialing a peer - DialError { error: Arc }, + DialError { + connection_id: ConnectionId, + error: Arc, + }, /// A connection was established to a peer ConnectionEstablished { connection_id: ConnectionId }, /// There was an error creating a connection @@ -44,3 +51,33 @@ pub enum NetworkPeerEvent { error: Arc, }, } + +impl NetworkPeerEvent { + pub fn event_type(&self) -> String { + let s = format!("{:?}", self); + extract_event_name(&s).to_string() + } +} + +impl Event for NetworkPeerEvent { + type Id = String; + + fn event_type(&self) -> String { + let s = format!("{:?}", self); + extract_event_name(&s).to_string() + } + + fn event_id(&self) -> Self::Id { + "network_peer_event".to_string() + } +} + +fn extract_event_name(s: &str) -> &str { + let bytes = s.as_bytes(); + for (i, &item) in bytes.iter().enumerate() { + if item == b' ' || item == b'(' { + return &s[..i]; + } + } + s +} diff --git a/packages/ciphernode/net/src/lib.rs b/packages/ciphernode/net/src/lib.rs index b695df32..59f55149 100644 --- a/packages/ciphernode/net/src/lib.rs +++ b/packages/ciphernode/net/src/lib.rs @@ -7,8 +7,8 @@ pub mod events; mod network_manager; mod network_peer; mod repo; -mod retry; +pub use dialer::*; pub use network_manager::*; pub use network_peer::*; pub use repo::*; diff --git a/packages/ciphernode/net/src/network_manager.rs b/packages/ciphernode/net/src/network_manager.rs index 576058cd..00948a47 100644 --- a/packages/ciphernode/net/src/network_manager.rs +++ b/packages/ciphernode/net/src/network_manager.rs @@ -1,26 +1,27 @@ use crate::correlation_id::CorrelationId; -use crate::events::NetworkPeerCommand; -use crate::events::NetworkPeerEvent; -use crate::NetworkPeer; +use crate::dialer::Dialer; +use crate::events::{NetworkPeerCommand, NetworkPeerEvent}; +use crate::network_peer::NetworkPeer; + /// Actor for connecting to an libp2p client via it's mpsc channel interface /// This Actor should be responsible for use actix::prelude::*; use anyhow::{bail, Result}; use crypto::Cipher; use data::Repository; +use events::EventBusConfig; use events::{EnclaveEvent, EventBus, EventId, Subscribe}; -use libp2p::identity::ed25519; +use libp2p::{gossipsub, identity::ed25519}; use std::collections::HashSet; use std::sync::Arc; -use tokio::select; -use tokio::sync::broadcast; use tokio::sync::mpsc; -use tracing::{error, info, instrument, trace}; +use tracing::{error, info, instrument, trace, warn}; /// NetworkManager Actor converts between EventBus events and Libp2p events forwarding them to a /// NetworkPeer for propagation over the p2p network pub struct NetworkManager { bus: Addr>, + net_bus: Addr>, tx: mpsc::Sender, sent_events: HashSet, topic: String, @@ -30,20 +31,17 @@ impl Actor for NetworkManager { type Context = Context; } -/// Libp2pEvent is used to send data to the NetworkPeer from the NetworkManager -#[derive(Message, Clone, Debug, PartialEq, Eq)] -#[rtype(result = "anyhow::Result<()>")] -struct LibP2pEvent(pub Vec); - impl NetworkManager { /// Create a new NetworkManager actor pub fn new( bus: Addr>, + net_bus: Addr>, tx: mpsc::Sender, topic: &str, ) -> Self { Self { bus, + net_bus, tx, sent_events: HashSet::new(), topic: topic.to_string(), @@ -52,11 +50,11 @@ impl NetworkManager { pub fn setup( bus: Addr>, + net_bus: Addr>, tx: mpsc::Sender, - mut rx: broadcast::Receiver, topic: &str, ) -> Addr { - let addr = NetworkManager::new(bus.clone(), tx, topic).start(); + let addr = NetworkManager::new(bus.clone(), net_bus.clone(), tx, topic).start(); // Listen on all events bus.do_send(Subscribe { @@ -64,22 +62,9 @@ impl NetworkManager { listener: addr.clone().recipient(), }); - tokio::spawn({ - let addr = addr.clone(); - async move { - loop { - select! { - Ok(event) = rx.recv() => { - match event { - NetworkPeerEvent::GossipData(data) => { - addr.do_send(LibP2pEvent(data)) - }, - _ => () - } - } - } - } - } + net_bus.do_send(Subscribe { + event_type: String::from("*"), + listener: addr.clone().recipient(), }); addr @@ -95,6 +80,11 @@ impl NetworkManager { enable_mdns: bool, repository: Repository>, ) -> Result<(Addr, tokio::task::JoinHandle>, String)> { + let net_bus = EventBus::::new(EventBusConfig { + capture_history: true, + deduplicate: false, + }) + .start(); let topic = "tmp-enclave-gossip-topic"; // Get existing keypair or generate a new one let mut bytes = match repository.read().await? { @@ -108,29 +98,46 @@ impl NetworkManager { // Create peer from keypair let keypair: libp2p::identity::Keypair = ed25519::Keypair::try_from_bytes(&mut bytes)?.try_into()?; - let mut peer = NetworkPeer::new(&keypair, peers, Some(quic_port), topic, enable_mdns)?; - // Setup and start network manager - let rx = peer.rx(); - let p2p_addr = NetworkManager::setup(bus, peer.tx(), rx, topic); - let handle = tokio::spawn(async move { Ok(peer.start().await?) }); + // Create Channel for Dialer + let (tx, rx) = mpsc::channel(100); + let mut swarm_manager = match NetworkPeer::new(&keypair, enable_mdns, net_bus.clone(), rx) { + Ok(swarm_manager) => swarm_manager, + Err(e) => { + warn!("Failed to create NetworkPeer: {:?}", e); + return Err(e); + } + }; + let topic = gossipsub::IdentTopic::new(topic); + swarm_manager.subscribe(&topic)?; + swarm_manager.listen_on(quic_port)?; + + let handle = tokio::spawn(async move { Ok(swarm_manager.start().await?) }); + for peer in peers { + Dialer::dial_peer(peer, net_bus.clone(), tx.clone()); + } + + let p2p_addr = NetworkManager::setup(bus, net_bus, tx, &topic.to_string()); Ok((p2p_addr, handle, keypair.public().to_peer_id().to_string())) } } -impl Handler for NetworkManager { - type Result = anyhow::Result<()>; - fn handle(&mut self, msg: LibP2pEvent, _: &mut Self::Context) -> Self::Result { - let LibP2pEvent(bytes) = msg; - match EnclaveEvent::from_bytes(&bytes) { - Ok(event) => { - self.bus.do_send(event.clone()); - self.sent_events.insert(event.into()); - } - Err(err) => error!(error=?err, "Could not create EnclaveEvent from Libp2p Bytes!"), +impl Handler for NetworkManager { + type Result = (); + fn handle(&mut self, msg: NetworkPeerEvent, _: &mut Self::Context) -> Self::Result { + match msg { + NetworkPeerEvent::GossipData(data) => match EnclaveEvent::from_bytes(&data) { + Ok(event) => { + self.bus.do_send(event.clone()); + self.sent_events.insert(event.into()); + } + Err(err) => { + error!(error=?err, "Could not create EnclaveEvent from GossipData Bytes!") + } + }, + _ => (), } - Ok(()) } } diff --git a/packages/ciphernode/net/src/network_peer.rs b/packages/ciphernode/net/src/network_peer.rs index 412ae2b1..e7f3ce40 100644 --- a/packages/ciphernode/net/src/network_peer.rs +++ b/packages/ciphernode/net/src/network_peer.rs @@ -1,4 +1,6 @@ +use actix::prelude::*; use anyhow::Result; +use events::EventBus; use libp2p::{ connection_limits::{self, ConnectionLimits}, futures::StreamExt, @@ -13,144 +15,110 @@ use libp2p::{ use std::hash::{Hash, Hasher}; use std::sync::Arc; use std::{hash::DefaultHasher, io::Error, time::Duration}; -use tokio::{select, sync::broadcast, sync::mpsc}; +use tokio::{select, sync::mpsc}; use tracing::{debug, info, trace, warn}; -use crate::dialer::dial_peers; use crate::events::NetworkPeerCommand; use crate::events::NetworkPeerEvent; #[derive(NetworkBehaviour)] pub struct NodeBehaviour { - gossipsub: gossipsub::Behaviour, - kademlia: KademliaBehaviour, - connection_limits: connection_limits::Behaviour, - mdns: Toggle, - identify: IdentifyBehaviour, + pub gossipsub: gossipsub::Behaviour, + pub kademlia: KademliaBehaviour, + pub connection_limits: connection_limits::Behaviour, + pub mdns: Toggle, + pub identify: IdentifyBehaviour, } -/// Manage the peer to peer connection. This struct wraps a libp2p Swarm and enables communication -/// with it using channels. pub struct NetworkPeer { - /// The Libp2p Swarm instance swarm: Swarm, - /// A list of peers to automatically dial - peers: Vec, - /// The UDP port that the peer listens to over QUIC - udp_port: Option, - /// The gossipsub topic that the peer should listen on - topic: gossipsub::IdentTopic, - /// Broadcast channel to report NetworkPeerEvents to listeners - event_tx: broadcast::Sender, - /// Transmission channel to send NetworkPeerCommands to the NetworkPeer - cmd_tx: mpsc::Sender, - /// Local receiver to process NetworkPeerCommands from + net_bus: Addr>, cmd_rx: mpsc::Receiver, } impl NetworkPeer { pub fn new( id: &Keypair, - peers: Vec, - udp_port: Option, - topic: &str, enable_mdns: bool, + net_bus: Addr>, + cmd_rx: mpsc::Receiver, ) -> Result { - let (event_tx, _) = broadcast::channel(100); // TODO : tune this param - let (cmd_tx, cmd_rx) = mpsc::channel(100); // TODO : tune this param - let swarm = libp2p::SwarmBuilder::with_existing_identity(id.clone()) .with_tokio() .with_quic() .with_behaviour(|key| create_mdns_kad_behaviour(enable_mdns, key))? .build(); - // TODO: Use topics to manage network traffic instead of just using a single topic - let topic = gossipsub::IdentTopic::new(topic); - Ok(Self { swarm, - peers, - udp_port, - topic, - event_tx, - cmd_tx, + net_bus, cmd_rx, }) } - pub fn rx(&mut self) -> broadcast::Receiver { - self.event_tx.subscribe() + pub fn listen_on(&mut self, port: u16) -> Result<()> { + self.swarm + .listen_on(format!("/ip4/0.0.0.0/udp/{}/quic-v1", port).parse()?)?; + Ok(()) } - pub fn tx(&self) -> mpsc::Sender { - self.cmd_tx.clone() + pub fn subscribe(&mut self, topic: &gossipsub::IdentTopic) -> Result<()> { + self.swarm.behaviour_mut().gossipsub.subscribe(topic)?; + Ok(()) } - pub async fn start(&mut self) -> Result<()> { - let event_tx = self.event_tx.clone(); - let cmd_tx = self.cmd_tx.clone(); - let cmd_rx = &mut self.cmd_rx; - - // Subscribe to topic - self.swarm - .behaviour_mut() - .gossipsub - .subscribe(&self.topic)?; - - // Listen on the quic port - let addr = match self.udp_port { - Some(port) => format!("/ip4/0.0.0.0/udp/{}/quic-v1", port), - None => "/ip4/0.0.0.0/udp/0/quic-v1".to_string(), - }; - - info!("Requesting node.listen_on('{}')", addr); - self.swarm.listen_on(addr.parse()?)?; - - info!("Peers to dial: {:?}", self.peers); - tokio::spawn({ - let event_tx = event_tx.clone(); - let peers = self.peers.clone(); - async move { - dial_peers(&cmd_tx, &event_tx, &peers).await?; - - return anyhow::Ok(()); + fn process_command(&mut self, command: NetworkPeerCommand) { + match command { + NetworkPeerCommand::GossipPublish { + data, + topic, + correlation_id, + } => { + let gossipsub_behaviour = &mut self.swarm.behaviour_mut().gossipsub; + match gossipsub_behaviour.publish(gossipsub::IdentTopic::new(topic), data) { + Ok(message_id) => { + self.net_bus.do_send(NetworkPeerEvent::GossipPublished { + correlation_id, + message_id, + }); + } + Err(e) => { + warn!(error=?e, "Could not publish to swarm. Retrying..."); + self.net_bus.do_send(NetworkPeerEvent::GossipPublishError { + correlation_id, + error: Arc::new(e), + }); + } + } + } + NetworkPeerCommand::Dial(multi) => { + let connection_id = multi.connection_id(); + match self.swarm.dial(multi) { + Ok(v) => { + info!("Dial returned {:?}", v); + } + Err(error) => { + info!("Dialing error! {}", error); + self.net_bus.do_send(NetworkPeerEvent::DialError { + connection_id, + error: error.into(), + }); + } + } } - }); + } + } + pub async fn start(&mut self) -> Result<()> { loop { select! { - // Process commands - Some(command) = cmd_rx.recv() => { - match command { - NetworkPeerCommand::GossipPublish { data, topic, correlation_id } => { - let gossipsub_behaviour = &mut self.swarm.behaviour_mut().gossipsub; - match gossipsub_behaviour - .publish(gossipsub::IdentTopic::new(topic), data) { - Ok(message_id) => { - event_tx.send(NetworkPeerEvent::GossipPublished { correlation_id, message_id })?; - }, - Err(e) => { - warn!(error=?e, "Could not publish to swarm. Retrying..."); - event_tx.send(NetworkPeerEvent::GossipPublishError { correlation_id, error: Arc::new(e) })?; - } - } - }, - NetworkPeerCommand::Dial(multi) => { - info!("DIAL: {:?}", multi); - match self.swarm.dial(multi) { - Ok(v) => info!("Dial returned {:?}", v), - Err(error) => { - info!("Dialing error! {}", error); - event_tx.send(NetworkPeerEvent::DialError { error: error.into() })?; - } - } - } + cmd = self.cmd_rx.recv() => { + if let Some(cmd) = cmd { + self.process_command(cmd); } } - // Process events - event = self.swarm.select_next_some() => { - process_swarm_event(&mut self.swarm, &event_tx, event).await? + event = self.swarm.select_next_some() => { + process_swarm_event(&mut self.swarm, &self.net_bus, event).await? } } } @@ -210,7 +178,7 @@ fn create_mdns_kad_behaviour( /// Process all swarm events async fn process_swarm_event( swarm: &mut Swarm, - event_tx: &broadcast::Sender, + net_bus: &Addr>, event: SwarmEvent, ) -> Result<()> { match event { @@ -230,7 +198,7 @@ async fn process_swarm_event( info!("Added address to kademlia {}", remote_addr); swarm.behaviour_mut().gossipsub.add_explicit_peer(&peer_id); info!("Added peer to gossipsub {}", remote_addr); - event_tx.send(NetworkPeerEvent::ConnectionEstablished { connection_id })?; + net_bus.do_send(NetworkPeerEvent::ConnectionEstablished { connection_id }); } SwarmEvent::OutgoingConnectionError { @@ -239,10 +207,10 @@ async fn process_swarm_event( connection_id, } => { info!("Failed to dial {peer_id:?}: {error}"); - event_tx.send(NetworkPeerEvent::OutgoingConnectionError { + net_bus.do_send(NetworkPeerEvent::OutgoingConnectionError { connection_id, error: Arc::new(error), - })?; + }); } SwarmEvent::IncomingConnectionError { error, .. } => { @@ -276,7 +244,7 @@ async fn process_swarm_event( message, })) => { trace!("Got message with id: {id} from peer: {peer_id}",); - event_tx.send(NetworkPeerEvent::GossipData(message.data))?; + net_bus.do_send(NetworkPeerEvent::GossipData(message.data)); } SwarmEvent::NewListenAddr { address, .. } => { warn!("Local node is listening on {address}"); diff --git a/packages/ciphernode/net/src/retry.rs b/packages/ciphernode/net/src/retry.rs deleted file mode 100644 index a1fdd95e..00000000 --- a/packages/ciphernode/net/src/retry.rs +++ /dev/null @@ -1,70 +0,0 @@ -use anyhow::Result; -use std::{future::Future, time::Duration}; -use tokio::time::sleep; -use tracing::{error, warn}; - -pub enum RetryError { - Failure(anyhow::Error), - Retry(anyhow::Error), -} - -pub fn to_retry(e: impl Into) -> RetryError { - RetryError::Retry(e.into()) -} - -pub const BACKOFF_DELAY: u64 = 500; -pub const BACKOFF_MAX_RETRIES: u32 = 10; - -/// Retries an async operation with exponential backoff -/// -/// # Arguments -/// * `operation` - Async function to retry -/// * `max_attempts` - Maximum number of retry attempts -/// * `initial_delay_ms` - Initial delay between retries in milliseconds -/// -/// # Returns -/// * `Result<()>` - Ok if the operation succeeded, Err if all retries failed -pub async fn retry_with_backoff( - operation: F, - max_attempts: u32, - initial_delay_ms: u64, -) -> Result<()> -where - F: Fn() -> Fut, - Fut: Future>, -{ - let mut current_attempt = 1; - let mut delay_ms = initial_delay_ms; - - loop { - match operation().await { - Ok(_) => return Ok(()), - Err(re) => { - match re { - RetryError::Retry(e) => { - if current_attempt >= max_attempts { - return Err(anyhow::anyhow!( - "Operation failed after {} attempts. Last error: {}", - max_attempts, - e - )); - } - - warn!( - "Attempt {}/{} failed, retrying in {}ms: {}", - current_attempt, max_attempts, delay_ms, e - ); - - sleep(Duration::from_millis(delay_ms)).await; - current_attempt += 1; - delay_ms *= 2; // Exponential backoff - } - RetryError::Failure(e) => { - error!("FAILURE!: returning to caller."); - return Err(e); - } - } - } - } - } -} diff --git a/packages/ciphernode/tests/tests/test_aggregation_and_decryption.rs b/packages/ciphernode/tests/tests/test_aggregation_and_decryption.rs index 241a0e18..c826221c 100644 --- a/packages/ciphernode/tests/tests/test_aggregation_and_decryption.rs +++ b/packages/ciphernode/tests/tests/test_aggregation_and_decryption.rs @@ -474,15 +474,18 @@ async fn test_stopped_keyshares_retain_state() -> Result<()> { async fn test_p2p_actor_forwards_events_to_network() -> Result<()> { // Setup elements in test let (cmd_tx, mut cmd_rx) = mpsc::channel(100); // Transmit byte events to the network - let (event_tx, _) = broadcast::channel(100); // Receive byte events from the network let bus = EventBus::::new(EventBusConfig { capture_history: true, deduplicate: true, }) .start(); - let event_rx = event_tx.subscribe(); + let net_bus = EventBus::::new(EventBusConfig { + capture_history: true, + deduplicate: false, + }) + .start(); // Pas cmd and event channels to NetworkManager - NetworkManager::setup(bus.clone(), cmd_tx.clone(), event_rx, "my-topic"); + NetworkManager::setup(bus.clone(), net_bus.clone(), cmd_tx.clone(), "my-topic"); // Capture messages from output on msgs vec let msgs: Arc>>> = Arc::new(Mutex::new(Vec::new())); @@ -499,7 +502,7 @@ async fn test_p2p_actor_forwards_events_to_network() -> Result<()> { _ => None, } { msgs_loop.lock().await.push(msg.clone()); - event_tx.send(NetworkPeerEvent::GossipData(msg))?; + net_bus.do_send(NetworkPeerEvent::GossipData(msg)); } // if this manages to broadcast an event to the // event bus we will expect to see an extra event on @@ -556,13 +559,17 @@ async fn test_p2p_actor_forwards_events_to_bus() -> Result<()> { // Setup elements in test let (cmd_tx, _) = mpsc::channel(100); // Transmit byte events to the network - let (event_tx, event_rx) = broadcast::channel(100); // Receive byte events from the network let bus = EventBus::::new(EventBusConfig { capture_history: true, deduplicate: true, }) .start(); - NetworkManager::setup(bus.clone(), cmd_tx.clone(), event_rx, "mytopic"); + let net_bus = EventBus::::new(EventBusConfig { + capture_history: true, + deduplicate: false, + }) + .start(); + NetworkManager::setup(bus.clone(), net_bus.clone(), cmd_tx.clone(), "mytopic"); // Capture messages from output on msgs vec let event = EnclaveEvent::from(E3Requested { @@ -574,7 +581,7 @@ async fn test_p2p_actor_forwards_events_to_bus() -> Result<()> { }); // lets send an event from the network - let _ = event_tx.send(NetworkPeerEvent::GossipData(event.to_bytes()?)); + net_bus.do_send(NetworkPeerEvent::GossipData(event.to_bytes()?)); sleep(Duration::from_millis(1)).await; // need to push to next tick