Skip to content

Commit

Permalink
Introduce PeerAddress struct for improved address resolution patter…
Browse files Browse the repository at this point in the history
…ns (#621)

* Introduce PeerAddress struct with socket and multiaddr resolution methods

* Don't pop of p2p protocol from relay address as it isn't there

* fmt

* Update CHANGELOG

* Cache socket addresses

* Remove Multiaddr from PeerAddress

* Remove serde traits from PeerAddress

* Add doc string to PeerAddress

* Rename methods
  • Loading branch information
sandreae authored Jun 16, 2024
1 parent 6da24b0 commit 2ce83d0
Show file tree
Hide file tree
Showing 5 changed files with 106 additions and 43 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Re-materialize blobs which were only partially written to disc due to node crash [#618](https://github.com/p2panda/aquadoggo/pull/618)
- Include all logs for target schemas during replication [#620](https://github.com/p2panda/aquadoggo/pull/620)

### Added

- Introduce `PeerAddress` struct to help resolve `String` to internal address types [#621](https://github.com/p2panda/aquadoggo/pull/621)

## [0.7.3]

### Fixed
Expand Down
16 changes: 5 additions & 11 deletions aquadoggo/src/api/config_file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,11 @@ use std::str::FromStr;
use std::sync::OnceLock;

use anyhow::{anyhow, Result};
use libp2p::{Multiaddr, PeerId};
use libp2p::PeerId;
use p2panda_rs::schema::SchemaId;
use serde::{Deserialize, Deserializer, Serialize, Serializer};
use tempfile::TempDir;

use crate::network::utils::to_multiaddress;
use crate::{AllowList, Configuration, NetworkConfiguration};

const WILDCARD: &str = "*";
Expand Down Expand Up @@ -287,17 +286,12 @@ impl TryFrom<ConfigFile> for Configuration {
.to_path_buf(),
};

let relay_addresses = value.relay_addresses.into_iter().map(From::from).collect();
let direct_node_addresses = value
.direct_node_addresses
.iter()
.map(to_multiaddress)
.collect::<Result<Vec<Multiaddr>, _>>()?;

let relay_addresses = value
.relay_addresses
.iter()
.map(to_multiaddress)
.collect::<Result<Vec<Multiaddr>, _>>()?;
.into_iter()
.map(From::from)
.collect();

Ok(Configuration {
allow_schema_ids,
Expand Down
75 changes: 73 additions & 2 deletions aquadoggo/src/network/config.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
// SPDX-License-Identifier: AGPL-3.0-or-later

use std::net::{IpAddr, SocketAddr, ToSocketAddrs};

use anyhow::Error;
use libp2p::connection_limits::ConnectionLimits;
use libp2p::multiaddr::Protocol;
use libp2p::{Multiaddr, PeerId};

use crate::AllowList;
Expand All @@ -23,7 +27,7 @@ pub struct NetworkConfiguration {
/// with a static IP Address). If you need to connect to nodes with changing, dynamic IP
/// addresses or even with nodes behind a firewall or NAT, do not use this field but use at
/// least one relay.
pub direct_node_addresses: Vec<Multiaddr>,
pub direct_node_addresses: Vec<PeerAddress>,

/// List of peers which are allowed to connect to your node.
///
Expand Down Expand Up @@ -62,7 +66,7 @@ pub struct NetworkConfiguration {
/// WARNING: This will potentially expose your IP address on the network. Do only connect to
/// trusted relays or make sure your IP address is hidden via a VPN or proxy if you're
/// concerned about leaking your IP.
pub relay_addresses: Vec<Multiaddr>,
pub relay_addresses: Vec<PeerAddress>,

/// Enable if node should also function as a relay.
///
Expand Down Expand Up @@ -146,3 +150,70 @@ impl NetworkConfiguration {
.with_max_established_per_peer(Some(self.max_connections_per_peer))
}
}

/// Helper struct for handling ambiguous string addresses which may need resolving via
/// a DNS lookup. The [ToSocketAddrs](https://doc.rust-lang.org/std/net/trait.ToSocketAddrs.html)
/// implementation is used to attempt converting a `String`` to a `SocketAddrs` and then from
/// here to a `Multiaddr`.
///
/// When `to_socket` is first called it's successful result is cached internally and this value
/// is used directly from this point on. This is an optimization which avoids unnecessary DNS
/// lookups.
#[derive(Debug, Clone)]
pub struct PeerAddress {
addr_str: String,
socket_addr: Option<SocketAddr>,
}

impl PeerAddress {
pub fn new(addr_str: String) -> Self {
PeerAddress {
addr_str,
socket_addr: None,
}
}

pub fn socket(&mut self) -> Result<SocketAddr, Error> {
if let Some(socket_addr) = self.socket_addr {
return Ok(socket_addr);
}

let socket_addr = match self.addr_str.to_socket_addrs() {
Ok(mut addrs) => match addrs.next() {
Some(addrs) => addrs,
None => return Err(anyhow::format_err!("No socket addresses found")),
},
Err(e) => return Err(e.into()),
};

let _ = self.socket_addr.replace(socket_addr);
Ok(socket_addr)
}

pub fn quic_multiaddr(&mut self) -> Result<Multiaddr, Error> {
match self.socket() {
Ok(socket_address) => {
let mut multiaddr = match socket_address.ip() {
IpAddr::V4(ip) => Multiaddr::from(Protocol::Ip4(ip)),
IpAddr::V6(ip) => Multiaddr::from(Protocol::Ip6(ip)),
};
multiaddr.push(Protocol::Udp(socket_address.port()));
multiaddr.push(Protocol::QuicV1);
Ok(multiaddr)
}
Err(e) => Err(e),
}
}
}

impl From<String> for PeerAddress {
fn from(value: String) -> Self {
Self::new(value)
}
}

impl std::fmt::Display for PeerAddress {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.addr_str)
}
}
35 changes: 23 additions & 12 deletions aquadoggo/src/network/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use crate::network::behaviour::{Event, P2pandaBehaviour};
use crate::network::config::NODE_NAMESPACE;
use crate::network::{identity, peers, swarm, utils, ShutdownHandler};

const RELAY_CONNECT_TIMEOUT: Duration = Duration::from_secs(5);
const RELAY_CONNECT_TIMEOUT: Duration = Duration::from_secs(20);

/// Network service which handles all networking logic for a p2panda node.
///
Expand All @@ -40,7 +40,7 @@ pub async fn network_service(
tx: ServiceSender,
tx_ready: ServiceReadySender,
) -> Result<()> {
let network_config = &context.config.network;
let mut network_config = context.config.network.clone();
let key_pair = identity::to_libp2p_key_pair(&context.key_pair);
let local_peer_id = key_pair.public().to_peer_id();

Expand All @@ -49,7 +49,7 @@ pub async fn network_service(
// The swarm can be initiated with or without "relay" capabilities.
let mut swarm = if network_config.relay_mode {
info!("Networking service initializing with relay capabilities...");
let mut swarm = swarm::build_relay_swarm(network_config, key_pair).await?;
let mut swarm = swarm::build_relay_swarm(&network_config, key_pair).await?;

// Start listening on tcp address.
let listen_addr_tcp = Multiaddr::empty()
Expand All @@ -60,7 +60,7 @@ pub async fn network_service(
swarm
} else {
info!("Networking service initializing...");
swarm::build_client_swarm(network_config, key_pair).await?
swarm::build_client_swarm(&network_config, key_pair).await?
};

// Start listening on QUIC address. Pick a random one if the given is taken already.
Expand Down Expand Up @@ -93,10 +93,18 @@ pub async fn network_service(
// replication sessions, which could leave the node in a strange state.
swarm.behaviour_mut().peers.disable();

for mut relay_address in network_config.relay_addresses.clone() {
for relay_address in network_config.relay_addresses.iter_mut() {
info!("Connecting to relay node {}", relay_address);

// Attempt to connect to the relay node, we give this a 5 second timeout so as not to
let mut relay_address = match relay_address.quic_multiaddr() {
Ok(relay_address) => relay_address,
Err(e) => {
debug!("Failed to resolve relay multiaddr: {}", e.to_string());
continue;
}
};

// Attempt to connect to the relay node, we give this a 10 second timeout so as not to
// get stuck if one relay is unreachable.
if let Ok(result) = tokio::time::timeout(
RELAY_CONNECT_TIMEOUT,
Expand Down Expand Up @@ -159,9 +167,17 @@ pub async fn network_service(
}

// Dial all nodes we want to directly connect to.
for direct_node_address in &network_config.direct_node_addresses {
for direct_node_address in network_config.direct_node_addresses.iter_mut() {
info!("Connecting to node @ {}", direct_node_address);

let direct_node_address = match direct_node_address.quic_multiaddr() {
Ok(address) => address,
Err(e) => {
debug!("Failed to resolve direct node multiaddr: {}", e.to_string());
continue;
}
};

let opts = DialOpts::unknown_peer_id()
.address(direct_node_address.clone())
.build();
Expand Down Expand Up @@ -219,11 +235,6 @@ pub async fn connect_to_relay(

// Now that we have a reply from the relay node we can add their peer id to the
// relay address.

// Pop off the "p2p" protocol.
let _ = relay_address.pop();

// Add it back on again with the relay nodes peer id included.
relay_address.push(Protocol::P2p(peer_id));

// Update values on the config.
Expand Down
19 changes: 1 addition & 18 deletions aquadoggo/src/network/utils.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
// SPDX-License-Identifier: AGPL-3.0-or-later

use std::net::{IpAddr, SocketAddr, ToSocketAddrs};
use std::net::SocketAddr;

use anyhow::Result;
use libp2p::multiaddr::Protocol;
use libp2p::Multiaddr;
use regex::Regex;

Expand All @@ -24,18 +22,3 @@ pub fn to_quic_address(address: &Multiaddr) -> Option<SocketAddr> {
}
}
}

pub fn to_multiaddress(address: &String) -> Result<Multiaddr> {
let socket_address = address
.to_socket_addrs()?
.next()
.unwrap_or_else(|| panic!("Could not resolve socket address for: {}", address));

let mut multiaddr = match socket_address.ip() {
IpAddr::V4(ip) => Multiaddr::from(Protocol::Ip4(ip)),
IpAddr::V6(ip) => Multiaddr::from(Protocol::Ip6(ip)),
};
multiaddr.push(Protocol::Udp(socket_address.port()));
multiaddr.push(Protocol::QuicV1);
Ok(multiaddr)
}

0 comments on commit 2ce83d0

Please sign in to comment.