Skip to content

Commit

Permalink
manage websocket connection dns and dial ourselves
Browse files Browse the repository at this point in the history
  • Loading branch information
jmwample committed Jan 23, 2025
1 parent 7e8dc07 commit 30cb1f9
Show file tree
Hide file tree
Showing 10 changed files with 186 additions and 74 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions common/client-core/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@ pub enum ClientCoreError {
#[error("no gateway with id: {0}")]
NoGatewayWithId(String),

#[error("Invalid URL: {0}")]
InvalidURL(String),

#[error("no gateways on network")]
NoGatewaysOnNetwork,

Expand Down
2 changes: 1 addition & 1 deletion common/client-core/src/init/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ pub async fn gateways_for_init<R: Rng>(
async fn connect(endpoint: &str) -> Result<WsConn, ClientCoreError> {
match tokio::time::timeout(CONN_TIMEOUT, connect_async(endpoint)).await {
Err(_elapsed) => Err(ClientCoreError::GatewayConnectionTimeout),
Ok(Err(conn_failure)) => Err(conn_failure.into()),
Ok(Err(conn_failure)) => Err(conn_failure),
Ok(Ok((stream, _))) => Ok(stream),
}
}
Expand Down
52 changes: 38 additions & 14 deletions common/client-core/src/init/websockets.rs
Original file line number Diff line number Diff line change
@@ -1,26 +1,50 @@
use nym_http_api_client::dns::HickoryDnsResolver;
use crate::error::ClientCoreError;

use nym_http_api_client::HickoryDnsResolver;
use tokio::net::TcpStream;
use tokio_tungstenite::{MaybeTlsStream, WebSocketStream};
use tungstenite::{
error::{Error, UrlError},
handshake::client::Response,
};
use tungstenite::{error::UrlError, handshake::client::Response};
use url::{Host, Url};

use std::net::SocketAddr;

#[cfg(not(target_arch = "wasm32"))]
pub(crate) async fn connect_async(
endpoint: &str,
) -> Result<(WebSocketStream<MaybeTlsStream<TcpStream>>, Response), Error> {
use std::net::SocketAddr;

) -> Result<(WebSocketStream<MaybeTlsStream<TcpStream>>, Response), ClientCoreError> {
let resolver = HickoryDnsResolver::default();
let uri = Url::parse(endpoint).map_err(|_| ClientCoreError::InvalidURL(endpoint.to_owned()))?;
let port: u16 = uri.port_or_known_default().unwrap_or(443);

let sock_addrs: Vec<SocketAddr> = resolver
.resolve_str(endpoint)
.await
.map_err(|_| UrlError::NoPathOrQuery)? // failed to resolve
.collect();
let host = uri
.host()
.ok_or(ClientCoreError::InvalidURL(endpoint.to_owned()))?;

// Get address for tcp connection, if a domain is provided use our preferred resolver rather than
// the default std resolve
let sock_addrs: Vec<SocketAddr> = match host {
Host::Ipv4(addr) => vec![SocketAddr::new(addr.into(), port)],
Host::Ipv6(addr) => vec![SocketAddr::new(addr.into(), port)],
Host::Domain(domain) => {
// Do a DNS lookup for the domain using our custom DNS resolver
resolver
.resolve_str(domain)
.await
.map_err(|_| {
// failed to resolve
ClientCoreError::GatewayConnectionFailure {
source: UrlError::NoPathOrQuery.into(),
}
})?
.into_iter()
.map(|a| SocketAddr::new(a, port))
.collect()
}
};

let stream = TcpStream::connect(&sock_addrs[..]).await?;

tokio_tungstenite::client_async_tls(endpoint, stream).await
tokio_tungstenite::client_async_tls(endpoint, stream)
.await
.map_err(Into::into)
}
1 change: 1 addition & 0 deletions common/client-libs/gateway-client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ nym-credential-storage = { path = "../../credential-storage" }
nym-credentials-interface = { path = "../../credentials-interface" }
nym-crypto = { path = "../../crypto" }
nym-gateway-requests = { path = "../../gateway-requests" }
nym-http-api-client = { path = "../../http-api-client" }
nym-network-defaults = { path = "../../network-defaults" }
nym-sphinx = { path = "../../nymsphinx" }
nym-statistics-common = { path = "../../statistics" }
Expand Down
17 changes: 6 additions & 11 deletions common/client-libs/gateway-client/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,6 @@ use url::Url;
use std::os::fd::RawFd;
#[cfg(not(target_arch = "wasm32"))]
use tokio::time::sleep;
#[cfg(not(target_arch = "wasm32"))]
use tokio_tungstenite::connect_async;

#[cfg(not(unix))]
use std::os::raw::c_int as RawFd;
Expand All @@ -53,6 +51,11 @@ use zeroize::Zeroizing;

pub mod config;

#[cfg(not(target_arch = "wasm32"))]
pub(crate) mod websockets;
#[cfg(not(target_arch = "wasm32"))]
use websockets::connect_async;

pub struct GatewayConfig {
pub gateway_identity: identity::PublicKey,

Expand Down Expand Up @@ -201,15 +204,7 @@ impl<C, St> GatewayClient<C, St> {
"Attemting to establish connection to gateway at: {}",
self.gateway_address
);
let ws_stream = match connect_async(&self.gateway_address).await {
Ok((ws_stream, _)) => ws_stream,
Err(error) => {
return Err(GatewayClientError::NetworkConnectionFailed {
address: self.gateway_address.clone(),
source: error,
})
}
};
let (ws_stream, _) = connect_async(&self.gateway_address).await?;

self.connection = SocketState::Available(Box::new(ws_stream));

Expand Down
60 changes: 60 additions & 0 deletions common/client-libs/gateway-client/src/client/websockets.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
use crate::error::GatewayClientError;

use nym_http_api_client::HickoryDnsResolver;
use tokio::net::TcpStream;
use tokio_tungstenite::{MaybeTlsStream, WebSocketStream};
use tungstenite::{error::UrlError, handshake::client::Response};
use url::{Host, Url};

use std::net::SocketAddr;

#[cfg(not(target_arch = "wasm32"))]
pub(crate) async fn connect_async(
endpoint: &str,
) -> Result<(WebSocketStream<MaybeTlsStream<TcpStream>>, Response), GatewayClientError> {
let resolver = HickoryDnsResolver::default();
let uri =
Url::parse(endpoint).map_err(|_| GatewayClientError::InvalidURL(endpoint.to_owned()))?;
let port: u16 = uri.port_or_known_default().unwrap_or(443);

let host = uri
.host()
.ok_or(GatewayClientError::InvalidURL(endpoint.to_owned()))?;

// Get address for tcp connection, if a domain is provided use our preferred resolver rather than
// the default std resolve
let sock_addrs: Vec<SocketAddr> = match host {
Host::Ipv4(addr) => vec![SocketAddr::new(addr.into(), port)],
Host::Ipv6(addr) => vec![SocketAddr::new(addr.into(), port)],
Host::Domain(domain) => {
// Do a DNS lookup for the domain using our custom DNS resolver
resolver
.resolve_str(domain)
.await
.map_err(|_| {
// failed to resolve
GatewayClientError::NetworkConnectionFailed {
address: endpoint.to_owned(),
source: UrlError::NoPathOrQuery.into(),
}
})?
.into_iter()
.map(|a| SocketAddr::new(a, port))
.collect()
}
};

let stream = TcpStream::connect(&sock_addrs[..]).await.map_err(|error| {
GatewayClientError::NetworkConnectionFailed {
address: endpoint.to_owned(),
source: error.into(),
}
})?;

tokio_tungstenite::client_async_tls(endpoint, stream)
.await
.map_err(|error| GatewayClientError::NetworkConnectionFailed {
address: endpoint.to_owned(),
source: error,
})
}
6 changes: 4 additions & 2 deletions common/http-api-client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,18 +12,20 @@ license.workspace = true

[dependencies]
async-trait = { workspace = true }
once_cell = { workspace = true }
hickory-resolver = { workspace = true, features = ["dns-over-https-rustls", "webpki-roots"] }
reqwest = { workspace = true, features = ["json"] }
http.workspace = true
url = { workspace = true }
once_cell = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
thiserror = { workspace = true }
tracing = { workspace = true }

nym-bin-common = { path = "../bin-common" }

[target."cfg(not(target_arch = \"wasm32\"))".dependencies]
hickory-resolver = { workspace = true, features = ["dns-over-https-rustls", "webpki-roots"] }

# for request timeout until https://github.com/seanmonstar/reqwest/issues/1135 is fixed
[target."cfg(target_arch = \"wasm32\")".dependencies.wasmtimer]
workspace = true
Expand Down
94 changes: 55 additions & 39 deletions common/http-api-client/src/dns.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,25 +9,25 @@
//!
//! Requires the `dns-over-https-rustls`, `webpki-roots` feature for the
//! `hickory-resolver` crate
#![deny(missing_docs)]

use crate::ClientBuilder;

use std::fmt;
use std::net::SocketAddr;
use std::str::FromStr;
use std::sync::Arc;
use std::{net::SocketAddr, sync::Arc};

use hickory_resolver::lookup_ip::LookupIp;
use hickory_resolver::{
config::{LookupIpStrategy, NameServerConfigGroup, ResolverConfig, ResolverOpts},
error::ResolveError,
lookup_ip::LookupIpIntoIter,
TokioAsyncResolver,
};
use once_cell::sync::OnceCell;
pub use reqwest::dns::Resolve;
use reqwest::dns::{Addrs, Name, Resolving};
use reqwest::dns::{Addrs, Name, Resolve, Resolving};
use tracing::warn;

impl ClientBuilder {
/// Override the DNS resolver implementation used by the underlying http client.
pub fn dns_resolver<R: Resolve + 'static>(mut self, resolver: Arc<R>) -> Self {
self.reqwest_client_builder = self.reqwest_client_builder.dns_resolver(resolver);
self
Expand All @@ -38,8 +38,12 @@ struct SocketAddrs {
iter: LookupIpIntoIter,
}

#[derive(Debug)]
struct HickoryDnsSystemConfError(ResolveError);
#[derive(Debug, thiserror::Error)]
#[error("hickory-dns resolver error: {hickory_error}")]
pub struct HickoryDnsError {
#[from]
hickory_error: ResolveError,
}

/// Wrapper around an `AsyncResolver`, which implements the `Resolve` trait.
#[derive(Debug, Default, Clone)]
Expand All @@ -53,26 +57,19 @@ pub struct HickoryDnsResolver {

impl Resolve for HickoryDnsResolver {
fn resolve(&self, name: Name) -> Resolving {
self.resolve_str(name.as_str())
}
}

impl HickoryDnsResolver {
pub fn resolve_str(&self, name: &str) -> Resolving {
let resolver = self.state.clone();
let fallback = self.fallback.clone();
let domain = name.to_owned();
Box::pin(async move {
let resolver = resolver.get_or_try_init(new_resolver)?;

// try the primary DNS resolver that we set up (DoH or DoT or whatever)
let lookup = match resolver.lookup_ip(&domain).await {
let lookup = match resolver.lookup_ip(name.as_str()).await {
Ok(res) => res,
Err(e) => {
// on failure use the fall back system configured DNS resolver
warn!("primary DNS failed w/ error {e}: using system fallback");
let resolver = fallback.get_or_try_init(new_resolver_system)?;
resolver.lookup_ip(&domain).await?
resolver.lookup_ip(name.as_str()).await?
}
};

Expand All @@ -92,9 +89,29 @@ impl Iterator for SocketAddrs {
}
}

impl HickoryDnsResolver {
/// Attempt to resolve a domain name to a set of ['IpAddr']s
pub async fn resolve_str(&self, name: &str) -> Result<LookupIp, HickoryDnsError> {
let resolver = self.state.get_or_try_init(new_resolver)?;

// try the primary DNS resolver that we set up (DoH or DoT or whatever)
let lookup = match resolver.lookup_ip(name).await {
Ok(res) => res,
Err(e) => {
// on failure use the fall back system configured DNS resolver
warn!("primary DNS failed w/ error {e}: using system fallback");
let resolver = self.fallback.get_or_try_init(new_resolver_system)?;
resolver.lookup_ip(name).await?
}
};

Ok(lookup)
}
}

/// Create a new resolver with a custom DoT based configuration. The options are overridden to look
/// up for both IPv4 and IPv6 addresses to work with "happy eyeballs" algorithm.
fn new_resolver() -> Result<TokioAsyncResolver, HickoryDnsSystemConfError> {
fn new_resolver() -> Result<TokioAsyncResolver, HickoryDnsError> {
let mut name_servers = NameServerConfigGroup::google_tls();
name_servers.merge(NameServerConfigGroup::google_https());
// name_servers.merge(NameServerConfigGroup::google_h3());
Expand All @@ -106,35 +123,22 @@ fn new_resolver() -> Result<TokioAsyncResolver, HickoryDnsSystemConfError> {
let config = ResolverConfig::from_parts(None, Vec::new(), name_servers);

let mut opts = ResolverOpts::default();
opts.ip_strategy = LookupIpStrategy::Ipv4thenIpv6;
opts.ip_strategy = LookupIpStrategy::Ipv4AndIpv6;
// Would like to enable this when 0.25 stabilizes
// opts.server_ordering_strategy = ServerOrderingStrategy::RoundRobin;

Ok(TokioAsyncResolver::tokio(config, opts))
}

/// Create a new resolver with the default configuration, which reads from `/etc/resolve.conf`. The
/// options are overridden to look up for both IPv4 and IPv6 addresses to work with "happy eyeballs"
/// algorithm.
fn new_resolver_system() -> Result<TokioAsyncResolver, HickoryDnsSystemConfError> {
let (config, mut opts) =
hickory_resolver::system_conf::read_system_conf().map_err(HickoryDnsSystemConfError)?;
opts.ip_strategy = LookupIpStrategy::Ipv4thenIpv6;
/// Create a new resolver with the default configuration, which reads from the system DNS config
/// (i.e. `/etc/resolve.conf` in unix). The options are overridden to look up for both IPv4 and IPv6
/// addresses to work with "happy eyeballs" algorithm.
fn new_resolver_system() -> Result<TokioAsyncResolver, HickoryDnsError> {
let (config, mut opts) = hickory_resolver::system_conf::read_system_conf()?;
opts.ip_strategy = LookupIpStrategy::Ipv4AndIpv6;
Ok(TokioAsyncResolver::tokio(config, opts))
}

impl fmt::Display for HickoryDnsSystemConfError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.write_str("error reading DNS system conf for hickory-dns")
}
}

impl std::error::Error for HickoryDnsSystemConfError {
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
Some(&self.0)
}
}

#[cfg(test)]
mod test {
use super::*;
Expand All @@ -148,7 +152,7 @@ mod test {
.unwrap();

let resp = client
.get("http://ifconfig.me")
.get("http://ifconfig.me:80")
.send()
.await
.unwrap()
Expand All @@ -158,4 +162,16 @@ mod test {

assert!(!resp.is_empty());
}

#[tokio::test]
async fn dns_lookup() -> Result<(), HickoryDnsError> {
let resolver = HickoryDnsResolver::default();

let domain = "ifconfig.me";
let addrs = resolver.resolve_str(domain).await?;

assert!(addrs.into_iter().next().is_some());

Ok(())
}
}
Loading

0 comments on commit 30cb1f9

Please sign in to comment.