diff --git a/src/congestion_control/pacing.rs b/src/congestion_control/pacing.rs index e1344fc9..2b3d29a6 100644 --- a/src/congestion_control/pacing.rs +++ b/src/congestion_control/pacing.rs @@ -82,7 +82,7 @@ impl Pacer { /// Build a pacer controller. pub fn build_pacer_controller(conf: &RecoveryConfig) -> Self { Pacer::new( - true, + conf.enable_pacing, conf.initial_rtt, conf.initial_congestion_window .saturating_mul(conf.max_datagram_size as u64), diff --git a/src/connection/connection.rs b/src/connection/connection.rs index 44df7879..1b86614a 100644 --- a/src/connection/connection.rs +++ b/src/connection/connection.rs @@ -1824,15 +1824,8 @@ impl Connection { // count toward congestion control limits. (RFC 9002 Section 3) // - Probe packets are allowed to temporarily exceed the congestion // window. (RFC 9002 Section 4.7) - if !st.is_probe { - if !r.can_send() { - return Err(Error::Done); - } - - // Check the pacer - if self.recovery_conf.enable_pacing && !r.can_pacing() { - return Err(Error::Done); - } + if !st.is_probe && !r.can_send() { + return Err(Error::Done); } // Write PMTU probe frames diff --git a/src/connection/recovery.rs b/src/connection/recovery.rs index 8ad0fc4d..ea4f364b 100644 --- a/src/connection/recovery.rs +++ b/src/connection/recovery.rs @@ -212,6 +212,9 @@ impl Recovery { self.set_loss_detection_timer(space_id, spaces, handshake_status, now); } + + // Update pacing tokens number. + self.pacer.on_sent(self.max_datagram_size as u64); } /// Handle packet acknowledgment event. @@ -825,36 +828,29 @@ impl Recovery { self.max_datagram_size = max_datagram_size; } - /// Check whether the congestion window is still sufficient for sending packets. - pub(crate) fn can_send(&self) -> bool { + /// Check whether this path can still send packets. + pub(crate) fn can_send(&mut self) -> bool { self.bytes_in_flight < self.congestion.congestion_window() as usize + && (!self.pacer.enabled() || self.can_pacing()) } - pub fn can_pacing(&mut self) -> bool { + fn can_pacing(&mut self) -> bool { let now = time::Instant::now(); let cwnd = self.congestion.congestion_window(); let srtt = self.rtt.smoothed_rtt() as Duration; if let Some(pr) = self.congestion.pacing_rate() { - if let Some(pacer_timer) = self.pacer.schedule( + self.pacer_timer = self.pacer.schedule( self.cache_pkt_size as u64, pr, srtt, cwnd, self.max_datagram_size as u64, now, - ) { - trace!("{} pacer will be ready at {:?}", self.trace_id, pacer_timer); - self.pacer_timer = Some(pacer_timer); - return false; - } else { - self.pacer.on_sent(self.max_datagram_size as u64); - self.pacer_timer = None; - return true; - } + ); } - trace!("{} pacing is disabled", self.trace_id); - true + trace!("{} pacing timer is {:?}", self.trace_id, self.pacer_timer); + self.pacer_timer.is_none() } /// Update statistics for the packet sent event @@ -905,7 +901,7 @@ impl Recovery { /// Update statistics for the congestion window limited event pub(crate) fn stat_cwnd_limited(&mut self) { - let is_cwnd_limited = !self.can_send(); + let is_cwnd_limited = self.bytes_in_flight >= self.congestion.congestion_window() as usize; let now = Instant::now(); if let Some(last_cwnd_limited_time) = self.last_cwnd_limited_time { // Update duration timely, in case it stays in cwnd limited all the time. diff --git a/src/multipath_scheduler/scheduler_minrtt.rs b/src/multipath_scheduler/scheduler_minrtt.rs index 8e297c76..31fea2cf 100644 --- a/src/multipath_scheduler/scheduler_minrtt.rs +++ b/src/multipath_scheduler/scheduler_minrtt.rs @@ -45,7 +45,7 @@ impl MultipathScheduler for MinRttScheduler { ) -> Result { let mut best = None; - for (pid, path) in paths.iter() { + for (pid, path) in paths.iter_mut() { // Skip the path that is not ready for sending non-probing packets. if !path.active() || !path.recovery.can_send() { continue; diff --git a/src/multipath_scheduler/scheduler_redundant.rs b/src/multipath_scheduler/scheduler_redundant.rs index d089cc47..91641f78 100644 --- a/src/multipath_scheduler/scheduler_redundant.rs +++ b/src/multipath_scheduler/scheduler_redundant.rs @@ -49,7 +49,7 @@ impl MultipathScheduler for RedundantScheduler { spaces: &mut PacketNumSpaceMap, streams: &mut StreamMap, ) -> Result { - for (pid, path) in paths.iter() { + for (pid, path) in paths.iter_mut() { // Skip the path that is not ready for sending non-probing packets. if !path.active() || !path.recovery.can_send() { continue; diff --git a/src/multipath_scheduler/scheduler_rr.rs b/src/multipath_scheduler/scheduler_rr.rs index 210719d0..92c0a883 100644 --- a/src/multipath_scheduler/scheduler_rr.rs +++ b/src/multipath_scheduler/scheduler_rr.rs @@ -39,7 +39,7 @@ impl RoundRobinScheduler { impl RoundRobinScheduler { /// Iterate and find the last used path - fn find_last(&self, iter: &mut slab::Iter, last: usize) -> bool { + fn find_last(&self, iter: &mut slab::IterMut, last: usize) -> bool { for (pid, _) in iter.by_ref() { if pid != last { continue; @@ -50,7 +50,7 @@ impl RoundRobinScheduler { } /// Try to select an available path - fn select(&mut self, iter: &mut slab::Iter) -> Option { + fn select(&mut self, iter: &mut slab::IterMut) -> Option { for (pid, path) in iter.by_ref() { // Skip the path that is not ready for sending non-probing packets. if !path.active() || !path.recovery.can_send() { @@ -72,7 +72,7 @@ impl MultipathScheduler for RoundRobinScheduler { spaces: &mut PacketNumSpaceMap, streams: &mut StreamMap, ) -> Result { - let mut iter = paths.iter(); + let mut iter = paths.iter_mut(); let mut exist_last = false; // Iterate and find the last used path @@ -81,7 +81,7 @@ impl MultipathScheduler for RoundRobinScheduler { exist_last = true; } else { // The last path has been abandoned - iter = paths.iter(); + iter = paths.iter_mut(); } } @@ -93,7 +93,7 @@ impl MultipathScheduler for RoundRobinScheduler { return Err(Error::Done); } - let mut iter = paths.iter(); + let mut iter = paths.iter_mut(); if let Some(pid) = self.select(&mut iter) { return Ok(pid); }