diff --git a/CHANGELOG.md b/CHANGELOG.md index f5924b0..0b4b92e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,6 +11,8 @@ This file lists the changes that have occurred since January 2024 in the project * Increase samples used for clock synchronization and idle latency measurement * Clock synchronization now uses the average of the lowest 1/3rd of samples +* Adjust for clock drift in tests +* Fix connecting to servers on non-standard port with peers ## 0.3 - 2024-09-16 diff --git a/README.md b/README.md index 69e74d0..8a7c110 100644 --- a/README.md +++ b/README.md @@ -265,6 +265,3 @@ docker build .. -t crusader -f server-static.Dockerfile * The up and down latency measurement rely on symmetric stable latency measurements to the server. These values may be wrong if those assumption don't hold on test startup. - -* The up and down latency measurement may slowly get out of sync due to -clock drift. Clocks are currently only synchronized on test startup. diff --git a/src/crusader-lib/src/common.rs b/src/crusader-lib/src/common.rs index 386a122..9145d93 100644 --- a/src/crusader-lib/src/common.rs +++ b/src/crusader-lib/src/common.rs @@ -420,6 +420,15 @@ async fn ping_measure_recv( Ok(storage) } +pub struct LatencyResult { + pub latency: Duration, + pub threshold: Duration, + pub server_pong: Duration, + pub server_offset: u64, + pub server_time: u64, + pub control_rx: FramedRead, +} + pub(crate) async fn measure_latency( id: u64, ping_index: &mut u64, @@ -428,15 +437,7 @@ pub(crate) async fn measure_latency( server: SocketAddr, local_udp: SocketAddr, setup_start: Instant, -) -> Result< - ( - Duration, - Duration, - u64, - FramedRead, - ), - anyhow::Error, -> { +) -> Result { send(&mut control_tx, &ClientMessage::GetMeasurements).await?; let latencies = tokio::spawn(async move { @@ -524,17 +525,36 @@ pub(crate) async fn measure_latency( let server_offset = (server_pong.as_micros() as u64).wrapping_sub(server_time); - (server_pong, latency, server_offset) + (server_pong, latency, server_offset, server_time) }) .collect(); + let server_pong = pings + .iter() + .map(|&(server_pong, _, _, _)| server_pong) + .sum::() + / (pings.len() as u32); + let server_offset = pings .iter() - .map(|&(_, _, offset)| offset as u128) + .map(|&(_, _, offset, _)| offset as u128) + .sum::() + / (pings.len() as u128); + + let server_time = pings + .iter() + .map(|&(_, _, _, time)| time as u128) .sum::() / (pings.len() as u128); - Ok((latency, threshold, server_offset as u64, control_rx)) + Ok(LatencyResult { + latency, + threshold, + server_pong, + server_offset: server_offset as u64, + server_time: server_time as u64, + control_rx, + }) } pub(crate) async fn ping_send( @@ -545,7 +565,7 @@ pub(crate) async fn ping_send( socket: Arc, interval: Duration, estimated_duration: Duration, -) -> Result, anyhow::Error> { +) -> Result<(Vec, u64), anyhow::Error> { let mut storage = Vec::with_capacity( ((estimated_duration.as_secs_f64() + 2.0) * (1000.0 / interval.as_millis() as f64) * 1.5) as usize, @@ -579,7 +599,7 @@ pub(crate) async fn ping_send( storage.push(current); } - Ok(storage) + Ok((storage, ping_index)) } pub(crate) async fn ping_recv( diff --git a/src/crusader-lib/src/latency.rs b/src/crusader-lib/src/latency.rs index 1cd054d..cc344d2 100644 --- a/src/crusader-lib/src/latency.rs +++ b/src/crusader-lib/src/latency.rs @@ -21,7 +21,7 @@ use tokio::{ }; use tokio_util::codec::{FramedRead, FramedWrite}; -use crate::common::{connect, hello, measure_latency, udp_handle}; +use crate::common::{connect, hello, measure_latency, udp_handle, LatencyResult}; use crate::discovery; use crate::protocol::{codec, receive, send, ClientMessage, Ping, ServerMessage}; @@ -133,7 +133,12 @@ async fn test_async( let mut ping_index = 0; - let (_, latency, mut server_time_offset, mut control_rx) = measure_latency( + let LatencyResult { + threshold: latency, + server_offset: mut server_time_offset, + mut control_rx, + .. + } = measure_latency( id, &mut ping_index, &mut control_tx, diff --git a/src/crusader-lib/src/peer.rs b/src/crusader-lib/src/peer.rs index 6b02936..c9c9b37 100644 --- a/src/crusader-lib/src/peer.rs +++ b/src/crusader-lib/src/peer.rs @@ -1,4 +1,4 @@ -use crate::common::connect; +use crate::common::{connect, LatencyResult}; #[cfg(feature = "client")] use crate::common::{Config, Msg}; #[cfg(feature = "client")] @@ -110,7 +110,7 @@ pub async fn connect_to_peer( IpAddr::V6(ip) => ip, } .octets(), - port: config.port, + port: server.port(), ping_interval: config.ping_interval.as_millis() as u64, estimated_duration: estimated_duration.as_millis(), }, @@ -174,7 +174,13 @@ pub async fn run_peer( let mut ping_index = 0; - let (latency, _, server_time_offset, mut control_rx) = measure_latency( + let LatencyResult { + latency, + server_pong: pre_server_pong, + server_time: pre_server_time, + mut control_rx, + .. + } = measure_latency( id, &mut ping_index, &mut control_tx, @@ -216,7 +222,7 @@ pub async fn run_peer( }; } - Ok((latencies, overload_)) + Ok((latencies, overload_, control_rx)) }); send( @@ -268,13 +274,47 @@ pub async fn run_peer( state_tx.send((TestState::EndPingRecv, Instant::now())).ok(); - let pings_sent = ping_send.await??; + let (pings_sent, mut ping_index) = ping_send.await??; + let mut pongs = ping_recv.await??; + send(&mut control_tx, &ClientMessage::StopMeasurements).await?; + + let (mut latencies, server_overload, control_rx) = measures.await??; + + let LatencyResult { + server_pong: post_server_pong, + server_time: post_server_time, + .. + } = measure_latency( + id, + &mut ping_index, + &mut control_tx, + control_rx, + server, + local_udp, + setup_start, + ) + .await?; + send(&mut control_tx, &ClientMessage::Done).await?; - let mut pongs = ping_recv.await??; + let server_time = post_server_time.wrapping_sub(pre_server_time); + let peer_time = post_server_pong.saturating_sub(pre_server_pong); + let peer_time_micros = peer_time.as_micros() as f64; + let ratio = peer_time_micros / server_time as f64; + let inv_ratio = server_time as f64 / peer_time_micros; + + let to_peer_time = |server_time: u64| -> u64 { + let time = server_time.wrapping_sub(pre_server_time); + let time = (time as f64 * ratio) as u64; + (pre_server_pong.as_micros() as u64).saturating_add(time) + }; - let (mut latencies, server_overload) = measures.await??; + let to_server_time = |peer_time: Duration| -> u64 { + let time = peer_time.saturating_sub(pre_server_pong).as_micros() as u64; + let time = (time as f64 * inv_ratio) as u64; + pre_server_time.wrapping_add(time) + }; latencies.sort_by_key(|d| d.index); pongs.sort_by_key(|d| d.0.index); @@ -288,10 +328,8 @@ pub async fn run_peer( .ok() .map(|ping| RawLatency { total: None, - up: Duration::from_micros( - latencies[ping].time.wrapping_add(server_time_offset), - ) - .saturating_sub(sent), + up: Duration::from_micros(to_peer_time(latencies[ping].time)) + .saturating_sub(sent), }); latency.as_mut().map(|latency| { @@ -304,7 +342,7 @@ pub async fn run_peer( }); PeerLatency { - sent: (sent.as_micros() as u64).wrapping_sub(server_time_offset), + sent: to_server_time(sent), latency, } }) diff --git a/src/crusader-lib/src/test.rs b/src/crusader-lib/src/test.rs index f4aface..e153205 100644 --- a/src/crusader-lib/src/test.rs +++ b/src/crusader-lib/src/test.rs @@ -1,6 +1,6 @@ use crate::common::{ connect, data, fresh_socket_addr, hello, measure_latency, ping_recv, ping_send, read_data, - wait_for_state, write_data, Config, Msg, TestState, + wait_for_state, write_data, Config, LatencyResult, Msg, TestState, }; use crate::file_format::{ RawConfig, RawHeader, RawPing, RawPoint, RawResult, RawStream, RawStreamGroup, TestData, @@ -153,7 +153,13 @@ pub(crate) async fn test_async( let mut ping_index = 0; - let (latency, _, server_time_offset, mut control_rx) = measure_latency( + let LatencyResult { + latency, + server_pong: pre_server_pong, + server_time: pre_server_time, + mut control_rx, + .. + } = measure_latency( id, &mut ping_index, &mut control_tx, @@ -311,7 +317,7 @@ pub(crate) async fn test_async( }; } - Ok((latencies, throughput, overload_)) + Ok((latencies, throughput, overload_, control_rx)) }); if let Some(peer) = peer.as_mut() { @@ -472,20 +478,51 @@ pub(crate) async fn test_async( state_tx.send((TestState::EndPingRecv, Instant::now()))?; let peer = if let Some(peer) = peer { - Some(peer.complete().await?) + Some( + peer.complete() + .await + .context("Failed to wait for peer completion")?, + ) } else { None }; let duration = start.elapsed(); - let pings_sent = ping_send.await??; + let (pings_sent, mut ping_index) = ping_send.await??; + let mut pongs = ping_recv.await??; + send(&mut control_tx, &ClientMessage::StopMeasurements).await?; + + let (mut latencies, throughput, server_overload, control_rx) = measures.await??; + + let LatencyResult { + server_pong: post_server_pong, + server_time: post_server_time, + .. + } = measure_latency( + id, + &mut ping_index, + &mut control_tx, + control_rx, + server, + local_udp, + setup_start, + ) + .await?; + send(&mut control_tx, &ClientMessage::Done).await?; - let mut pongs = ping_recv.await??; + let server_time = post_server_time.wrapping_sub(pre_server_time); + let client_time = post_server_pong.saturating_sub(pre_server_pong); + let client_time_micros = client_time.as_micros() as f64; + let ratio = client_time_micros / server_time as f64; - let (mut latencies, throughput, server_overload) = measures.await??; + let to_client_time = |server_time: u64| -> u64 { + let time = server_time.wrapping_sub(pre_server_time); + let time = (time as f64 * ratio) as u64; + (pre_server_pong.as_micros() as u64).saturating_add(time) + }; let server_overload = server_overload || peer.as_ref().map(|p| p.0).unwrap_or_default(); @@ -495,7 +532,7 @@ pub(crate) async fn test_async( .enumerate() .map(|(i, p)| RawPing { index: i as u64, - sent: Duration::from_micros(p.sent.wrapping_add(server_time_offset)), + sent: Duration::from_micros(to_client_time(p.sent)), latency: p.latency, }) .collect::>() @@ -516,10 +553,8 @@ pub(crate) async fn test_async( .ok() .map(|ping| RawLatency { total: None, - up: Duration::from_micros( - latencies[ping].time.wrapping_add(server_time_offset), - ) - .saturating_sub(sent), + up: Duration::from_micros(to_client_time(latencies[ping].time)) + .saturating_sub(sent), }); latency.as_mut().map(|latency| { @@ -573,7 +608,7 @@ pub(crate) async fn test_async( throughput .iter() .filter(|e| e.0.group == group && e.0.id == id) - .map(|e| (e.1.wrapping_add(server_time_offset), e.2)) + .map(|e| (to_client_time(e.1), e.2)) .collect() };