Skip to content

Commit

Permalink
Fix the suboptimal performance in multipath transmission caused by pa…
Browse files Browse the repository at this point in the history
…cing (#359)
  • Loading branch information
Thuwzq authored Aug 13, 2024
1 parent 04e656c commit 6b9a5b8
Show file tree
Hide file tree
Showing 6 changed files with 22 additions and 33 deletions.
2 changes: 1 addition & 1 deletion src/congestion_control/pacing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ impl Pacer {
/// Build a pacer controller.
pub fn build_pacer_controller(conf: &RecoveryConfig) -> Self {
Pacer::new(
true,
conf.enable_pacing,
conf.initial_rtt,
conf.initial_congestion_window
.saturating_mul(conf.max_datagram_size as u64),
Expand Down
11 changes: 2 additions & 9 deletions src/connection/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1824,15 +1824,8 @@ impl Connection {
// count toward congestion control limits. (RFC 9002 Section 3)
// - Probe packets are allowed to temporarily exceed the congestion
// window. (RFC 9002 Section 4.7)
if !st.is_probe {
if !r.can_send() {
return Err(Error::Done);
}

// Check the pacer
if self.recovery_conf.enable_pacing && !r.can_pacing() {
return Err(Error::Done);
}
if !st.is_probe && !r.can_send() {
return Err(Error::Done);
}

// Write PMTU probe frames
Expand Down
28 changes: 12 additions & 16 deletions src/connection/recovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,9 @@ impl Recovery {

self.set_loss_detection_timer(space_id, spaces, handshake_status, now);
}

// Update pacing tokens number.
self.pacer.on_sent(self.max_datagram_size as u64);
}

/// Handle packet acknowledgment event.
Expand Down Expand Up @@ -825,36 +828,29 @@ impl Recovery {
self.max_datagram_size = max_datagram_size;
}

/// Check whether the congestion window is still sufficient for sending packets.
pub(crate) fn can_send(&self) -> bool {
/// Check whether this path can still send packets.
pub(crate) fn can_send(&mut self) -> bool {
self.bytes_in_flight < self.congestion.congestion_window() as usize
&& (!self.pacer.enabled() || self.can_pacing())
}

pub fn can_pacing(&mut self) -> bool {
fn can_pacing(&mut self) -> bool {
let now = time::Instant::now();
let cwnd = self.congestion.congestion_window();
let srtt = self.rtt.smoothed_rtt() as Duration;

if let Some(pr) = self.congestion.pacing_rate() {
if let Some(pacer_timer) = self.pacer.schedule(
self.pacer_timer = self.pacer.schedule(
self.cache_pkt_size as u64,
pr,
srtt,
cwnd,
self.max_datagram_size as u64,
now,
) {
trace!("{} pacer will be ready at {:?}", self.trace_id, pacer_timer);
self.pacer_timer = Some(pacer_timer);
return false;
} else {
self.pacer.on_sent(self.max_datagram_size as u64);
self.pacer_timer = None;
return true;
}
);
}
trace!("{} pacing is disabled", self.trace_id);
true
trace!("{} pacing timer is {:?}", self.trace_id, self.pacer_timer);
self.pacer_timer.is_none()
}

/// Update statistics for the packet sent event
Expand Down Expand Up @@ -905,7 +901,7 @@ impl Recovery {

/// Update statistics for the congestion window limited event
pub(crate) fn stat_cwnd_limited(&mut self) {
let is_cwnd_limited = !self.can_send();
let is_cwnd_limited = self.bytes_in_flight >= self.congestion.congestion_window() as usize;
let now = Instant::now();
if let Some(last_cwnd_limited_time) = self.last_cwnd_limited_time {
// Update duration timely, in case it stays in cwnd limited all the time.
Expand Down
2 changes: 1 addition & 1 deletion src/multipath_scheduler/scheduler_minrtt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ impl MultipathScheduler for MinRttScheduler {
) -> Result<usize> {
let mut best = None;

for (pid, path) in paths.iter() {
for (pid, path) in paths.iter_mut() {
// Skip the path that is not ready for sending non-probing packets.
if !path.active() || !path.recovery.can_send() {
continue;
Expand Down
2 changes: 1 addition & 1 deletion src/multipath_scheduler/scheduler_redundant.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ impl MultipathScheduler for RedundantScheduler {
spaces: &mut PacketNumSpaceMap,
streams: &mut StreamMap,
) -> Result<usize> {
for (pid, path) in paths.iter() {
for (pid, path) in paths.iter_mut() {
// Skip the path that is not ready for sending non-probing packets.
if !path.active() || !path.recovery.can_send() {
continue;
Expand Down
10 changes: 5 additions & 5 deletions src/multipath_scheduler/scheduler_rr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ impl RoundRobinScheduler {

impl RoundRobinScheduler {
/// Iterate and find the last used path
fn find_last(&self, iter: &mut slab::Iter<Path>, last: usize) -> bool {
fn find_last(&self, iter: &mut slab::IterMut<Path>, last: usize) -> bool {
for (pid, _) in iter.by_ref() {
if pid != last {
continue;
Expand All @@ -50,7 +50,7 @@ impl RoundRobinScheduler {
}

/// Try to select an available path
fn select(&mut self, iter: &mut slab::Iter<Path>) -> Option<usize> {
fn select(&mut self, iter: &mut slab::IterMut<Path>) -> Option<usize> {
for (pid, path) in iter.by_ref() {
// Skip the path that is not ready for sending non-probing packets.
if !path.active() || !path.recovery.can_send() {
Expand All @@ -72,7 +72,7 @@ impl MultipathScheduler for RoundRobinScheduler {
spaces: &mut PacketNumSpaceMap,
streams: &mut StreamMap,
) -> Result<usize> {
let mut iter = paths.iter();
let mut iter = paths.iter_mut();
let mut exist_last = false;

// Iterate and find the last used path
Expand All @@ -81,7 +81,7 @@ impl MultipathScheduler for RoundRobinScheduler {
exist_last = true;
} else {
// The last path has been abandoned
iter = paths.iter();
iter = paths.iter_mut();
}
}

Expand All @@ -93,7 +93,7 @@ impl MultipathScheduler for RoundRobinScheduler {
return Err(Error::Done);
}

let mut iter = paths.iter();
let mut iter = paths.iter_mut();
if let Some(pid) = self.select(&mut iter) {
return Ok(pid);
}
Expand Down

0 comments on commit 6b9a5b8

Please sign in to comment.