Skip to content

Commit

Permalink
Dial all configured known relay and direct node addresses on schedule (
Browse files Browse the repository at this point in the history
…#622)

* Poll all known peer addresses

* Update PeerAddress method name

* Update CHANGELOG

* WIP: poll known peers

* Check if a direct node was identified (and add comments)

* Don't dial direct node address on startup, rely on scheduler

* More comments

* Remove unused import

* fmt

* Doc strings for EventLoop struct

* Clippy
  • Loading branch information
sandreae authored Jun 18, 2024
1 parent 27923b5 commit 2ed3c4e
Show file tree
Hide file tree
Showing 2 changed files with 143 additions and 31 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Added

- Introduce `PeerAddress` struct to help resolve `String` to internal address types [#621](https://github.com/p2panda/aquadoggo/pull/621)
- Re-dial all configured known peers on schedule [#622](https://github.com/p2panda/aquadoggo/pull/622)

### Changed

Expand Down
173 changes: 142 additions & 31 deletions aquadoggo/src/network/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,17 @@ use libp2p::swarm::SwarmEvent;
use libp2p::{identify, mdns, relay, rendezvous, Multiaddr, PeerId, Swarm};
use log::{debug, info, trace, warn};
use tokio::task;
use tokio_stream::wrappers::BroadcastStream;
use tokio::time::interval;
use tokio_stream::wrappers::{BroadcastStream, IntervalStream};
use tokio_stream::StreamExt;

use crate::bus::{ServiceMessage, ServiceSender};
use crate::context::Context;
use crate::manager::{ServiceReadySender, Shutdown};
use crate::network::behaviour::{Event, P2pandaBehaviour};
use crate::network::config::NODE_NAMESPACE;
use crate::network::config::{PeerAddress, NODE_NAMESPACE};
use crate::network::{identity, peers, swarm, utils, ShutdownHandler};
use crate::NetworkConfiguration;

const RELAY_CONNECT_TIMEOUT: Duration = Duration::from_secs(20);

Expand Down Expand Up @@ -165,34 +167,12 @@ pub async fn network_service(
);
}
}

// Dial all nodes we want to directly connect to.
for direct_node_address in network_config.direct_node_addresses.iter_mut() {
info!("Connecting to node @ {}", direct_node_address);

let direct_node_address = match direct_node_address.quic_multiaddr() {
Ok(address) => address,
Err(e) => {
debug!("Failed to resolve direct node multiaddr: {}", e.to_string());
continue;
}
};

let opts = DialOpts::unknown_peer_id()
.address(direct_node_address.clone())
.build();

match swarm.dial(opts) {
Ok(_) => (),
Err(err) => debug!("Error dialing node: {:?}", err),
};
}

info!("Network service ready!");

// Spawn main event loop handling all p2panda and libp2p network events.
spawn_event_loop(
swarm,
network_config.to_owned(),
local_peer_id,
connected_relays,
shutdown,
Expand Down Expand Up @@ -317,30 +297,64 @@ pub async fn connect_to_relay(
Ok((relay_peer_id, relay_address.clone()))
}

/// Period of the event loop's redial scheduler: every time it elapses, all configured relay and
/// direct node addresses are dialed again.
const REDIAL_INTERVAL: Duration = Duration::from_secs(20);

/// Main loop polling the async swarm event stream, the incoming service message stream and the
/// redial scheduler.
struct EventLoop {
/// libp2p swarm.
swarm: Swarm<P2pandaBehaviour>,

/// p2panda network configuration. Holds the configured relay and direct node addresses which
/// are re-dialed on schedule.
network_config: NetworkConfiguration,

/// Our own local PeerId.
local_peer_id: PeerId,

/// Addresses of relays which we connected to during node startup. For each of these relays
/// this means we are:
/// - registered on its rendezvous service and actively discovering other peers
/// - listening on a circuit relay address for relayed connections
relay_addresses: HashMap<PeerId, Multiaddr>,

/// Addresses of configured relay or direct peers with their corresponding PeerId.
/// Seeded with the relays we connected to during startup. Further entries are added once an
/// identify message arrives from a peer whose listen addresses contain one of our configured
/// addresses.
known_peers: HashMap<Multiaddr, PeerId>,

/// Scheduler which triggers known peer redial attempts every `REDIAL_INTERVAL`.
redial_scheduler: IntervalStream,

/// Service message channel sender.
tx: ServiceSender,

/// Service message channel receiver.
rx: BroadcastStream<ServiceMessage>,
relay_addresses: HashMap<PeerId, Multiaddr>,

/// Shutdown handler.
shutdown_handler: ShutdownHandler,

/// Did we learn our own port yet?
learned_port: bool,
}

impl EventLoop {
pub fn new(
swarm: Swarm<P2pandaBehaviour>,
network_config: NetworkConfiguration,
local_peer_id: PeerId,
tx: ServiceSender,
relay_addresses: HashMap<PeerId, Multiaddr>,
known_peers: HashMap<Multiaddr, PeerId>,
tx: ServiceSender,
shutdown_handler: ShutdownHandler,
) -> Self {
Self {
swarm,
network_config,
redial_scheduler: IntervalStream::new(interval(REDIAL_INTERVAL)),
local_peer_id,
rx: BroadcastStream::new(tx.subscribe()),
tx,
known_peers,
relay_addresses,
shutdown_handler,
learned_port: false,
Expand Down Expand Up @@ -403,13 +417,69 @@ impl EventLoop {
return
},
},
// The redial_scheduler emits an event every `REDIAL_INTERVAL` seconds.
Some(_) = self.redial_scheduler.next() => {
self.attempt_dial_known_addresses().await;
},
_ = shutdown_request_received.next() => {
self.shutdown().await;
}
}
}
}

/// Attempt to dial all hardcoded relay and direct node addresses. Only establishes a new connection
/// if we are currently not connected to the target peer.
async fn attempt_dial_known_addresses(&mut self) {
fn try_dial_peer(
swarm: &mut Swarm<P2pandaBehaviour>,
known_peers: &mut HashMap<Multiaddr, PeerId>,
address: &mut PeerAddress,
) {
// Get the peers quic multiaddress, this can error if the address was provided in the form
// of a domain name and we are not able to resolve it to a valid multiaddress (for example,
// if we are offline).
let address = match address.quic_multiaddr() {
Ok(address) => address,
Err(e) => {
debug!("Failed to resolve relay multiaddr: {}", e.to_string());
return;
}
};

// Construct dial opts depending on if we know the peer id of the peer we are dialing.
// We know the peer id if we have connected once to the peer in the current session.
let opts = match known_peers.get(&address) {
Some(peer_id) => DialOpts::peer_id(*peer_id)
.addresses(vec![address.to_owned()])
.condition(PeerCondition::NotDialing)
.condition(PeerCondition::Disconnected)
.build(),
None => DialOpts::unknown_peer_id()
.address(address.to_owned())
.build(),
};

// Dial the known peer. When dialing a peer by it's peer id this method will attempt a
// new connections if we are already connected to the peer or we are already dialing
// them.
match swarm.dial(opts) {
Ok(_) => (),
Err(err) => debug!("Error dialing node: {:?}", err),
};
}

// Attempt to dial all relay addresses.
for relay_address in self.network_config.relay_addresses.iter_mut() {
try_dial_peer(&mut self.swarm, &mut self.known_peers, relay_address);
}

// Attempt to dial all direct peer addresses.
for direct_node_address in self.network_config.direct_node_addresses.iter_mut() {
try_dial_peer(&mut self.swarm, &mut self.known_peers, direct_node_address);
}
}

/// Send a message on the communication bus to inform other services.
fn send_service_message(&mut self, message: ServiceMessage) {
if self.tx.send(message).is_err() {
Expand Down Expand Up @@ -489,10 +559,43 @@ impl EventLoop {
async fn handle_identify_events(&mut self, event: &identify::Event) {
match event {
identify::Event::Received {
info: identify::Info { observed_addr, .. },
..
info:
identify::Info {
observed_addr,
listen_addrs,
..
},
peer_id,
} => {
debug!("Observed external address reported: {observed_addr}");
// Configuring known static relay and peer addresses is done by providing an ip
// address or domain name and port. We don't yet know the peer id of the relay or
// direct peer. Here we observe all identify events and check the addresses the
// identified peer provides. If one matches our known addresses then we can add
// their peer id to our address book. This is then used when dialing the peer
// to avoid multiple connections being established to the same peer.

// Check if the identified peer is one of our configured relay addresses.
for address in self.network_config.relay_addresses.iter_mut() {
if let Ok(addr) = address.quic_multiaddr() {
if listen_addrs.contains(&addr) {
debug!("Relay identified: {peer_id} {addr}");
self.known_peers.insert(addr, *peer_id);
}
}
}

// Check if the identified peer is one of our direct node addresses.
for address in self.network_config.direct_node_addresses.iter_mut() {
if let Ok(addr) = address.quic_multiaddr() {
if listen_addrs.contains(&addr) {
debug!("Direct node identified: {peer_id} {addr}");
self.known_peers.insert(addr, *peer_id);
}
}
}

// If we don't know of the observed address a peer told us then add it to our
// external addresses.
if !self
.swarm
.external_addresses()
Expand Down Expand Up @@ -530,6 +633,7 @@ impl EventLoop {

pub async fn spawn_event_loop(
swarm: Swarm<P2pandaBehaviour>,
network_config: NetworkConfiguration,
local_peer_id: PeerId,
relay_addresses: HashMap<PeerId, Multiaddr>,
shutdown: Shutdown,
Expand All @@ -538,12 +642,19 @@ pub async fn spawn_event_loop(
) -> Result<()> {
let mut shutdown_handler = ShutdownHandler::new();

let mut known_peers = HashMap::new();
for (peer_id, addr) in relay_addresses.iter() {
known_peers.insert(addr.to_owned(), *peer_id);
}

// Spawn a task to run swarm in event loop
let event_loop = EventLoop::new(
swarm,
network_config,
local_peer_id,
tx,
relay_addresses,
known_peers,
tx,
shutdown_handler.clone(),
);
let handle = task::spawn(event_loop.run());
Expand Down

0 comments on commit 2ed3c4e

Please sign in to comment.