Skip to content

Commit

Permalink
fix dns
Browse files Browse the repository at this point in the history
  • Loading branch information
hatoo committed Mar 2, 2024
1 parent 0a588dc commit d1ce4b9
Showing 1 changed file with 29 additions and 26 deletions.
55 changes: 29 additions & 26 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use hyper::{
};
use hyper_util::rt::{TokioExecutor, TokioIo};
use rand::prelude::*;
use std::{pin::Pin, sync::Arc};
use std::{pin::Pin, sync::Arc, time::Instant};
use thiserror::Error;
use tokio::net::TcpStream;
use url::{ParseError, Url};
Expand Down Expand Up @@ -296,54 +296,60 @@ impl Client {
Ok(())
}

async fn client(
async fn client<R: Rng>(
&self,
addr: (std::net::IpAddr, u16),
url: &Url,
) -> Result<Stream, ClientError> {
rng: &mut R,
) -> Result<(Instant, Stream), ClientError> {
// TODO: Allow the connect timeout to be configured
let timeout_duration = tokio::time::Duration::from_secs(5);

if url.scheme() == "https" {
let addr = self.dns.lookup(url, rng).await?;
let dns_lookup = Instant::now();
// If we do not put a timeout here then the connections attempts will
// linger long past the configured timeout
let stream = tokio::time::timeout(timeout_duration, self.tls_client(addr, url)).await;
return match stream {
Ok(Ok(stream)) => Ok(stream),
Ok(Ok(stream)) => Ok((dns_lookup, stream)),
Ok(Err(err)) => Err(err),
Err(_) => Err(ClientError::Timeout),
};
}
#[cfg(unix)]
if let Some(socket_path) = &self.unix_socket {
let dns_lookup = Instant::now();
let stream = tokio::time::timeout(
timeout_duration,
tokio::net::UnixStream::connect(socket_path),
)
.await;
return match stream {
Ok(Ok(stream)) => Ok(Stream::Unix(stream)),
Ok(Ok(stream)) => Ok((dns_lookup, Stream::Unix(stream))),
Ok(Err(err)) => Err(ClientError::IoError(err)),
Err(_) => Err(ClientError::Timeout),
};
}
#[cfg(feature = "vsock")]
if let Some(addr) = self.vsock_addr {
let dns_lookup = Instant::now();
let stream =
tokio::time::timeout(timeout_duration, tokio_vsock::VsockStream::connect(addr))
.await;
return match stream {
Ok(Ok(stream)) => Ok(Stream::Vsock(stream)),
Ok(Ok(stream)) => Ok((dns_lookup, Stream::Vsock(stream))),
Ok(Err(err)) => Err(ClientError::IoError(err)),
Err(_) => Err(ClientError::Timeout),
};
}
let addr = self.dns.lookup(url, rng).await?;
let dns_lookup = Instant::now();
let stream =
tokio::time::timeout(timeout_duration, tokio::net::TcpStream::connect(addr)).await;
match stream {
Ok(Ok(stream)) => {
stream.set_nodelay(true)?;
Ok(Stream::Tcp(stream))
Ok((dns_lookup, Stream::Tcp(stream)))
}
Ok(Err(err)) => Err(ClientError::IoError(err)),
Err(_) => Err(ClientError::Timeout),
Expand Down Expand Up @@ -411,13 +417,13 @@ impl Client {
Ok(Stream::Tls(stream))
}

async fn client_http1(
async fn client_http1<R: Rng>(
&self,
addr: (std::net::IpAddr, u16),
url: &Url,
) -> Result<SendRequestHttp1, ClientError> {
let stream = self.client(addr, url).await?;
stream.handshake_http1().await
rng: &mut R,
) -> Result<(Instant, SendRequestHttp1), ClientError> {
let (dns_lookup, stream) = self.client(url, rng).await?;
Ok((dns_lookup, stream.handshake_http1().await?))
}

fn request(&self, url: &Url) -> Result<http::Request<Full<&'static [u8]>>, ClientError> {
Expand Down Expand Up @@ -453,9 +459,8 @@ impl Client {
let mut send_request = if let Some(send_request) = client_state.send_request.take() {
send_request
} else {
let addr = self.dns.lookup(&url, &mut client_state.rng).await?;
let dns_lookup = std::time::Instant::now();
let send_request = self.client_http1(addr, &url).await?;
let (dns_lookup, send_request) =
self.client_http1(&url, &mut client_state.rng).await?;
let dialup = std::time::Instant::now();

connection_time = Some(ConnectionTime { dns_lookup, dialup });
Expand All @@ -468,9 +473,9 @@ impl Client {
// This gets hit when the connection for HTTP/1.1 faults
// This re-connects
start = std::time::Instant::now();
let addr = self.dns.lookup(&url, &mut client_state.rng).await?;
let dns_lookup = std::time::Instant::now();
send_request = self.client_http1(addr, &url).await?;
let (dns_lookup, send_request_) =
self.client_http1(&url, &mut client_state.rng).await?;
send_request = send_request_;
let dialup = std::time::Instant::now();
connection_time = Some(ConnectionTime { dns_lookup, dialup });
}
Expand Down Expand Up @@ -549,9 +554,7 @@ impl Client {
url: &Url,
rng: &mut R,
) -> Result<(ConnectionTime, SendRequestHttp2), ClientError> {
let addr = self.dns.lookup(url, rng).await?;
let dns_lookup = std::time::Instant::now();
let stream = self.client(addr, url).await?;
let (dns_lookup, stream) = self.client(url, rng).await?;
let send_request = stream.handshake_http2().await?;
let dialup = std::time::Instant::now();
Ok((ConnectionTime { dns_lookup, dialup }, send_request))
Expand Down Expand Up @@ -641,16 +644,16 @@ impl Client {
// reuse connection
(send_request, None)
} else {
let addr = self.dns.lookup(&url, rng).await?;
(self.client_http1(addr, &url).await?, Some(send_request))
let (_dns_lookup, stream) = self.client_http1(&url, rng).await?;
(stream, Some(send_request))
};

while futures::future::poll_fn(|ctx| send_request.poll_ready(ctx))
.await
.is_err()
{
let addr = self.dns.lookup(&url, rng).await?;
send_request = self.client_http1(addr, &url).await?;
let (_dns_lookup, stream) = self.client_http1(&url, rng).await?;
send_request = stream;
}

let mut request = self.request(&url)?;
Expand Down

0 comments on commit d1ce4b9

Please sign in to comment.