From 9bd271771ed54e78868ce55919d2d5bf4103e5a4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?John=20K=C3=A5re=20Alsaker?= Date: Mon, 23 Sep 2024 16:42:22 +0200 Subject: [PATCH] Make scheduling more robust against clock drift --- src/crusader-lib/src/test.rs | 42 +++++++++++++++++++----------------- 1 file changed, 22 insertions(+), 20 deletions(-) diff --git a/src/crusader-lib/src/test.rs b/src/crusader-lib/src/test.rs index 5ffa98b..f4aface 100644 --- a/src/crusader-lib/src/test.rs +++ b/src/crusader-lib/src/test.rs @@ -43,9 +43,7 @@ use tokio_util::codec::{Framed, FramedRead, FramedWrite, LengthDelimitedCodec}; const MEASURE_DELAY: Duration = Duration::from_millis(50); #[derive(Debug)] -struct ScheduledLoads { - time: Instant, -} +struct ScheduledLoads; struct State { downloads: Mutex>>, @@ -306,13 +304,8 @@ pub(crate) async fn test_async( .send(()) .map_err(|_| anyhow!("Failed to notify downloader"))?; } - ServerMessage::ScheduledLoads { groups: _, time } => { - let time = Duration::from_micros(time.wrapping_add(server_time_offset)); - scheduled_load_tx - .send(ScheduledLoads { - time: setup_start + time, - }) - .await? + ServerMessage::ScheduledLoads { groups: _, time: _ } => { + scheduled_load_tx.send(ScheduledLoads).await? } _ => bail!("Unexpected message {:?}", reply), }; @@ -351,7 +344,16 @@ pub(crate) async fn test_async( state_tx.send((TestState::Grace1, start))?; time::sleep(grace).await; - let load_delay = (Duration::from_millis(50) + latency).as_micros() as u64; + let load_delay_pure = Duration::from_millis(50); + let load_delay = (load_delay_pure + latency / 2).as_micros() as u64; + + let start_time = || -> Result { + Instant::now() + .checked_add(load_delay_pure) + .ok_or(anyhow!("Time overflow"))? + .checked_sub(latency / 2) + .ok_or(anyhow!("Time overflow")) + }; let mut test_data = Vec::new(); @@ -364,12 +366,12 @@ pub(crate) async fn test_async( }, ) .await?; - let load = scheduled_load_rx + scheduled_load_rx .recv() .await .ok_or(anyhow!("Failed to receive"))?; - let start = load.time; - state_tx.send((TestState::LoadFromServer, load.time))?; + let start = start_time()?; + state_tx.send((TestState::LoadFromServer, start))?; msg(&format!("Testing download...")); let _ = semaphore.acquire_many(loading_streams).await?; let end = Instant::now(); @@ -391,12 +393,12 @@ pub(crate) async fn test_async( }, ) .await?; - let load = scheduled_load_rx + scheduled_load_rx .recv() .await .ok_or(anyhow!("Failed to receive"))?; - let start = load.time; - state_tx.send((TestState::LoadFromClient, load.time))?; + let start = start_time()?; + state_tx.send((TestState::LoadFromClient, start))?; msg(&format!("Testing upload...")); for _ in 0..config.streams { @@ -429,12 +431,12 @@ pub(crate) async fn test_async( }, ) .await?; - let load = scheduled_load_rx + scheduled_load_rx .recv() .await .ok_or(anyhow!("Failed to receive"))?; - let start = load.time; - state_tx.send((TestState::LoadFromBoth, load.time))?; + let start = start_time()?; + state_tx.send((TestState::LoadFromBoth, start))?; msg(&format!("Testing both download and upload...")); for _ in 0..config.streams {