diff --git a/CHANGELOG.md b/CHANGELOG.md index e9bdb78e..01df046c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,6 +11,17 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## [v1.1.0] - 2024-08-20 + +### Added +- Buffer undecryptable Handshake and OneRtt packets during the handshake phase +- Update some comments about stream + +### Fixed +- Fix the closure of the stream that was reset by the peer +- Fix the suboptimal performance in multipath transmission caused by pacing + + ## [v1.0.0] - 2024-08-01 ### Added diff --git a/Cargo.toml b/Cargo.toml index 1cae16c9..701c1f2e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "tquic" -version = "1.0.0" +version = "1.1.0" edition = "2021" rust-version = "1.70.0" license = "Apache-2.0" diff --git a/include/tquic.h b/include/tquic.h index d7f322de..d77392b9 100644 --- a/include/tquic.h +++ b/include/tquic.h @@ -724,6 +724,12 @@ void quic_config_set_send_batch_size(struct quic_config_t *config, uint16_t v); */ void quic_config_set_zerortt_buffer_size(struct quic_config_t *config, uint16_t v); +/** + * Set the maximum number of undecryptable packets that can be stored by one connection. + * The default value is `10`. A value of 0 will be treated as default value. + */ +void quic_config_set_max_undecryptable_packets(struct quic_config_t *config, uint16_t v); + /** * Create a new TlsConfig. * The caller is responsible for the memory of the TlsConfig and should properly diff --git a/src/congestion_control/pacing.rs b/src/congestion_control/pacing.rs index e1344fc9..2b3d29a6 100644 --- a/src/congestion_control/pacing.rs +++ b/src/congestion_control/pacing.rs @@ -82,7 +82,7 @@ impl Pacer { /// Build a pacer controller. pub fn build_pacer_controller(conf: &RecoveryConfig) -> Self { Pacer::new( - true, + conf.enable_pacing, conf.initial_rtt, conf.initial_congestion_window .saturating_mul(conf.max_datagram_size as u64), diff --git a/src/connection/connection.rs b/src/connection/connection.rs index d463d936..3afc04c9 100644 --- a/src/connection/connection.rs +++ b/src/connection/connection.rs @@ -111,7 +111,7 @@ pub struct Connection { crypto_streams: Rc>, /// Raw packets that were received before decryption keys are available. - undecryptable_pkts: VecDeque<(Vec, PacketInfo)>, + undecryptable_packets: UndecryptablePackets, /// Peer transport parameters. peer_transport_params: TransportParams, @@ -256,7 +256,7 @@ impl Connection { streams, tls_session, crypto_streams: Rc::new(RefCell::new(CryptoStreams::new())), - undecryptable_pkts: VecDeque::new(), + undecryptable_packets: UndecryptablePackets::new(conf.max_undecryptable_packets), peer_transport_params: TransportParams::default(), local_transport_params: conf.local_transport_params.clone(), recovery_conf: conf.recovery.clone(), @@ -434,8 +434,10 @@ impl Connection { left -= read; } - // Try to process undecryptable 0-RTT packets - self.try_process_undecryptable_packets()?; + // Try to process undecryptable packets + if !self.is_established() { + self.try_process_undecryptable_packets(); + } Ok(len - left) } @@ -520,21 +522,9 @@ impl Connection { let key = match &key.open { Some(open) => open, None => { - // Buffer undecryptable packets when the key is not yet available. - if hdr.pkt_type == PacketType::ZeroRTT - && !self.is_established() - && self.undecryptable_pkts.len() < crate::MAX_UNDECRYPTABLE_PACKETS - { - let pkt = buf[..read + length].to_vec(); - self.undecryptable_pkts.push_back((pkt, *info)); - } else { - trace!( - "{} key not yet available, drop packet {:?}", - self.trace_id, - hdr - ); - } - return Err(Error::Done); + let pkt = buf[..read + length].to_vec(); + self.try_buffer_undecryptable_packets(&hdr, pkt, info); + return Ok(read + length); } }; packet::decrypt_header(buf, pkt_num_offset, &mut hdr, key).map_err(|_| Error::Done)?; @@ -1131,8 +1121,8 @@ impl Connection { if self.tls_session.is_completed() { self.flags.insert(HandshakeCompleted); self.events.add(Event::ConnectionEstablished); - self.undecryptable_pkts.clear(); self.timers.stop(Timer::Handshake); + self.try_process_undecryptable_packets(); if self.is_server { // The TLS handshake is considered confirmed at the server when @@ -1253,20 +1243,68 @@ impl Connection { } } - /// A server could receive packets protected with 0-RTT keys prior to - /// receiving a TLS ClientHello. The server MAY retain these packets for - /// later decryption in anticipation of receiving a ClientHello. - /// See RFC 9001 Section 5.7 - fn try_process_undecryptable_packets(&mut self) -> Result<()> { - if self.tls_session.get_keys(Level::ZeroRTT).open.is_some() { - while let Some((mut pkt, info)) = self.undecryptable_pkts.pop_front() { + /// Try to buffer undecryptable packets when the keys are not yet available. + fn try_buffer_undecryptable_packets( + &mut self, + hdr: &PacketHeader, + pkt: Vec, + info: &PacketInfo, + ) { + if self.is_established() + || (self.is_server + && hdr.pkt_type != PacketType::ZeroRTT + && hdr.pkt_type != PacketType::OneRTT) + || (!self.is_server + && hdr.pkt_type != PacketType::Handshake + && hdr.pkt_type != PacketType::OneRTT) + { + trace!("{} drop packet {:?}", self.trace_id, hdr); + return; + } + + if self.undecryptable_packets.push(&hdr.pkt_type, pkt, info) { + trace!("{} buffer undecryptable packets: {:?}", self.trace_id, hdr); + } else { + trace!( + "{} key not yet available, drop packet {:?}", + self.trace_id, + hdr + ); + } + } + + /// Try to process undecryptable packets. + fn try_process_undecryptable_packets(&mut self) { + if self.undecryptable_packets.all_empty() { + return; + } + + let pkt_types = if self.is_server { + vec![PacketType::ZeroRTT, PacketType::OneRTT] + } else { + vec![PacketType::Handshake, PacketType::OneRTT] + }; + + for pkt_type in pkt_types { + if self.undecryptable_packets.is_empty(&pkt_type) { + continue; + } + + let level = pkt_type.to_level().unwrap(); + let key = self.tls_session.get_keys(level); + if key.open.is_none() { + continue; + } + + while let Some((mut pkt, info)) = self.undecryptable_packets.pop(&pkt_type) { if let Err(e) = self.recv(&mut pkt, &info) { - self.undecryptable_pkts.clear(); - return Err(e); + error!( + "{} try process undecryptable packet error {:?} type {:?}", + self.trace_id, e, pkt_type + ); } } } - Ok(()) } /// Process acknowledged frames in each packet number space @@ -1786,15 +1824,8 @@ impl Connection { // count toward congestion control limits. (RFC 9002 Section 3) // - Probe packets are allowed to temporarily exceed the congestion // window. (RFC 9002 Section 4.7) - if !st.is_probe { - if !r.can_send() { - return Err(Error::Done); - } - - // Check the pacer - if self.recovery_conf.enable_pacing && !r.can_pacing() { - return Err(Error::Done); - } + if !st.is_probe && !r.can_send() { + return Err(Error::Done); } // Write PMTU probe frames @@ -3635,7 +3666,7 @@ impl Connection { } } - /// Return an iterator over streams that have data to read. + /// Return an iterator over streams that have data to read or an error to collect. pub fn stream_readable_iter(&self) -> StreamIter { self.streams.readable_iter() } @@ -3763,7 +3794,7 @@ impl Connection { self.streams.stream_writable(stream_id, len) } - /// Return true if the stream has data that can be read. + /// Return true if the stream has data to be read or an error to be collected. pub fn stream_readable(&self, stream_id: u64) -> bool { self.streams.stream_readable(stream_id) } @@ -4109,6 +4140,79 @@ impl CryptoStreams { } } +/// Collection of packets which were received before decryption keys are available. +struct UndecryptablePackets { + zerortt_pkts: VecDeque<(Vec, PacketInfo)>, + handshake_pkts: VecDeque<(Vec, PacketInfo)>, + onertt_pkts: VecDeque<(Vec, PacketInfo)>, + capacity: usize, +} + +impl UndecryptablePackets { + fn new(capacity: usize) -> Self { + Self { + zerortt_pkts: VecDeque::with_capacity(capacity), + handshake_pkts: VecDeque::with_capacity(capacity), + onertt_pkts: VecDeque::with_capacity(capacity), + capacity, + } + } + + fn push(&mut self, pkt_type: &PacketType, pkt: Vec, info: &PacketInfo) -> bool { + match pkt_type { + PacketType::ZeroRTT => { + if self.zerortt_pkts.len() > self.capacity { + false + } else { + self.zerortt_pkts.push_back((pkt, *info)); + true + } + } + PacketType::Handshake => { + if self.handshake_pkts.len() > self.capacity { + false + } else { + self.handshake_pkts.push_back((pkt, *info)); + true + } + } + PacketType::OneRTT => { + if self.onertt_pkts.len() > self.capacity { + false + } else { + self.onertt_pkts.push_back((pkt, *info)); + true + } + } + _ => false, + } + } + + fn pop(&mut self, pkt_type: &PacketType) -> Option<(Vec, PacketInfo)> { + match pkt_type { + PacketType::ZeroRTT => self.zerortt_pkts.pop_front(), + PacketType::Handshake => self.handshake_pkts.pop_front(), + PacketType::OneRTT => self.onertt_pkts.pop_front(), + _ => None, + } + } + + fn is_empty(&self, pkt_type: &PacketType) -> bool { + match pkt_type { + PacketType::ZeroRTT => self.zerortt_pkts.is_empty(), + PacketType::Handshake => self.handshake_pkts.is_empty(), + PacketType::OneRTT => self.onertt_pkts.is_empty(), + _ => true, + } + } + + fn all_empty(&self) -> bool { + self.zerortt_pkts.is_empty() + && self.handshake_pkts.is_empty() + && self.onertt_pkts.is_empty() + } +} + /// Various flags of QUIC connection #[bitflags] #[repr(u32)] @@ -4260,6 +4364,7 @@ pub(crate) mod tests { use super::*; use crate::multipath_scheduler::MultipathAlgorithm; use crate::packet; + use crate::ranges::RangeSet; use crate::tls::tests::ServerConfigSelector; use crate::tls::TlsConfig; use crate::tls::TlsConfigSelector; @@ -5027,7 +5132,7 @@ pub(crate) mod tests { } #[test] - fn handshake_with_0rtt_reordered() -> Result<()> { + fn handshake_with_0rtt_reordered_server_side() -> Result<()> { let mut client_config = TestPair::new_test_config(false)?; let mut server_config = TestPair::new_test_config(true)?; @@ -5044,26 +5149,88 @@ pub(crate) mod tests { assert!(test_pair.client.is_in_early_data()); assert!(!packets.is_empty()); - // Client send ZeorRTT packet + // Client send ZeroRTT packet let content = "client zero rtt data before initial"; + let mut frames = vec![]; let frame = TestPair::new_test_stream_frame(content.as_bytes()); + frames.push(frame); let packet = - TestPair::conn_build_packet(&mut test_pair.client, PacketType::ZeroRTT, &[frame])?; + TestPair::conn_build_packet(&mut test_pair.client, PacketType::ZeroRTT, &frames)?; let info = packets.first().unwrap().1; // Server recv ZeroRTT packet before Initial packet TestPair::conn_packets_in(&mut test_pair.server, vec![(packet, info)])?; - assert_eq!(test_pair.client.is_in_early_data(), true); - assert_eq!(test_pair.server.streams.has_readable_streams(), false); - assert_eq!(test_pair.server.undecryptable_pkts.is_empty(), false); + assert!(test_pair.client.is_in_early_data()); + assert!(!test_pair.server.streams.has_readable_streams()); + assert!(!test_pair + .server + .undecryptable_packets + .zerortt_pkts + .is_empty()); // Server recv the reordered Initial packet TestPair::conn_packets_in(&mut test_pair.server, packets)?; assert_eq!(test_pair.client.is_in_early_data(), true); - assert_eq!(test_pair.server.streams.has_readable_streams(), true); - assert_eq!(test_pair.server.undecryptable_pkts.is_empty(), true); + assert!(test_pair + .server + .undecryptable_packets + .zerortt_pkts + .is_empty()); + assert!(test_pair.server.streams.has_readable_streams()); + let stream = test_pair.server.streams.get_mut(0).unwrap(); + let mut buf = vec![0; 128]; + assert_eq!(stream.recv.read(&mut buf)?, (content.len(), false)); + assert_eq!(content.as_bytes(), &buf[..content.len()]); + + Ok(()) + } + + #[test] + fn handshake_with_1rtt_reordered_server_side() -> Result<()> { + let mut test_pair = TestPair::new_with_test_config()?; + + // Client send and server recv Initial. + let packets = TestPair::conn_packets_out(&mut test_pair.client)?; + TestPair::conn_packets_in(&mut test_pair.server, packets)?; + + // Server send and client recv Initial and Handshake. + let packets = TestPair::conn_packets_out(&mut test_pair.server)?; + TestPair::conn_packets_in(&mut test_pair.client, packets)?; + assert!(test_pair.client.is_established()); + // Client send OneRTT packet. + let content = "client one rtt data before handshake"; + let mut frames = vec![]; + let frame = TestPair::new_test_stream_frame(content.as_bytes()); + frames.push(frame); + let packet = + TestPair::conn_build_packet(&mut test_pair.client, PacketType::OneRTT, &frames)?; + + // Client send Handshake packets. + let packets = TestPair::conn_packets_out(&mut test_pair.client)?; + let info = packets.first().unwrap().1; + + // Server recv OneRTT packet before Handshake packets. + TestPair::conn_packets_in(&mut test_pair.server, vec![(packet, info)])?; + assert!(!test_pair.server.is_confirmed()); + assert!(!test_pair + .server + .undecryptable_packets + .onertt_pkts + .is_empty()); + + // Server recv the reordered Handshake packets. + TestPair::conn_packets_in(&mut test_pair.server, packets)?; + assert!(test_pair.server.is_confirmed()); + assert!(test_pair + .server + .tls_session + .get_keys(Level::OneRTT) + .open + .is_some()); + assert!(test_pair.server.streams.has_readable_streams()); let stream = test_pair.server.streams.get_mut(0).unwrap(); + assert!(stream.is_readable()); let mut buf = vec![0; 128]; assert_eq!(stream.recv.read(&mut buf)?, (content.len(), false)); assert_eq!(content.as_bytes(), &buf[..content.len()]); @@ -5072,7 +5239,7 @@ pub(crate) mod tests { } #[test] - fn handshake_with_initial_reordered() -> Result<()> { + fn handshake_with_handshake_reordered_client_side() -> Result<()> { let mut test_pair = TestPair::new_with_test_config()?; // Client send Initial @@ -5080,28 +5247,28 @@ pub(crate) mod tests { TestPair::conn_packets_in(&mut test_pair.server, packets)?; // Server send Initial and Handshake - let mut packets = TestPair::conn_packets_out(&mut test_pair.server)?; + let packets = TestPair::conn_packets_out(&mut test_pair.server)?; assert_eq!(test_pair.client.is_established(), false); assert_eq!(test_pair.client.flags.contains(HandshakeConfirmed), false); assert_eq!(test_pair.server.is_established(), false); assert_eq!(test_pair.server.flags.contains(HandshakeConfirmed), false); - // Client recv disordered Initial/Handshake packets. The Handshake - // packets arriving before the Initial packet are dropped by the client. - packets.reverse(); - TestPair::conn_packets_in(&mut test_pair.client, packets)?; + // Client recv Handshake before Initial. + TestPair::conn_packets_in(&mut test_pair.client, vec![packets[1].clone()])?; assert_eq!(test_pair.client.is_established(), false); + let undecryptable_handshake_packets = + &test_pair.client.undecryptable_packets.handshake_pkts; + assert_eq!(undecryptable_handshake_packets.is_empty(), false); + TestPair::conn_packets_in(&mut test_pair.client, vec![packets[0].clone()])?; + assert_eq!(test_pair.client.is_established(), true); + let undecryptable_handshake_packets = + &test_pair.client.undecryptable_packets.handshake_pkts; + assert_eq!(undecryptable_handshake_packets.is_empty(), true); // Client send Initial/Handshake(ack) let packets = TestPair::conn_packets_out(&mut test_pair.client)?; TestPair::conn_packets_in(&mut test_pair.server, packets)?; - // Fake timing out server's timer - let timeout = test_pair.server.timeout(); - assert!(timeout.is_some()); - let now = time::Instant::now() + timeout.unwrap(); - test_pair.server.on_timeout(now); - // Continue handshake test_pair.handshake()?; assert_eq!(test_pair.client.is_established(), true); @@ -6991,11 +7158,62 @@ pub(crate) mod tests { Err(Error::StreamReset(2)) ); - // Client send RESET_STREAM + // Client send ACK/RESET_STREAM let packets = TestPair::conn_packets_out(&mut test_pair.client)?; - // Server recv RESET_STREAM + // Server recv ACK/RESET_STREAM TestPair::conn_packets_in(&mut test_pair.server, packets)?; + assert_eq!(test_pair.server.streams.is_closed(sid), true); + assert_eq!(test_pair.server.stream_readable(sid), false); + assert_eq!( + test_pair.server.stream_read(sid, &mut buf), + Err(Error::StreamStateError) + ); + + Ok(()) + } + + #[test] + fn stream_shutdown_abnormal() -> Result<()> { + let mut test_pair = TestPair::new_with_test_config()?; + assert_eq!(test_pair.handshake(), Ok(())); + let mut buf = vec![0; 16]; + + // Client send data on a stream + let (sid, data) = (0, TestPair::new_test_data(10)); + test_pair.client.stream_write(sid, data.clone(), false)?; + let packets = TestPair::conn_packets_out(&mut test_pair.client)?; + + // Server shutdown the stream (Read/Write) + TestPair::conn_packets_in(&mut test_pair.server, packets)?; + test_pair.server.stream_shutdown(sid, Shutdown::Read, 1)?; + test_pair.server.stream_shutdown(sid, Shutdown::Write, 2)?; + let packets = TestPair::conn_packets_out(&mut test_pair.server)?; + + // Client recv STOP_SENDING/RESET_STREAM + TestPair::conn_packets_in(&mut test_pair.client, packets)?; + + // Client send ACK + let mut ack_ranges = RangeSet::new(1); + ack_ranges.insert(0..2); + let frame = frame::Frame::Ack { + ack_delay: 0, + ack_ranges, + ecn_counts: None, + }; + test_pair.build_packet_and_send(PacketType::OneRTT, &[frame], false)?; + assert_eq!(test_pair.server.streams.is_closed(sid), false); + + // Client send RESET_STREAM + let frame = frame::Frame::ResetStream { + stream_id: 0, + error_code: 1, + final_size: 10, + }; + test_pair.build_packet_and_send(PacketType::OneRTT, &[frame], false)?; + + // Server stream 0 should be closed now + assert_eq!(test_pair.server.streams.is_closed(sid), true); assert_eq!(test_pair.server.stream_readable(sid), false); assert_eq!( test_pair.server.stream_read(sid, &mut buf), diff --git a/src/connection/recovery.rs b/src/connection/recovery.rs index 8ad0fc4d..b1e68a0f 100644 --- a/src/connection/recovery.rs +++ b/src/connection/recovery.rs @@ -212,6 +212,9 @@ impl Recovery { self.set_loss_detection_timer(space_id, spaces, handshake_status, now); } + + // Update pacing tokens number. + self.pacer.on_sent(self.max_datagram_size as u64); } /// Handle packet acknowledgment event. @@ -825,36 +828,34 @@ impl Recovery { self.max_datagram_size = max_datagram_size; } - /// Check whether the congestion window is still sufficient for sending packets. - pub(crate) fn can_send(&self) -> bool { + /// Check whether this path can still send packets. + pub(crate) fn can_send(&mut self) -> bool { self.bytes_in_flight < self.congestion.congestion_window() as usize + && (!self.pacer.enabled() || self.can_pacing()) } - pub fn can_pacing(&mut self) -> bool { + fn can_pacing(&mut self) -> bool { let now = time::Instant::now(); let cwnd = self.congestion.congestion_window(); let srtt = self.rtt.smoothed_rtt() as Duration; if let Some(pr) = self.congestion.pacing_rate() { - if let Some(pacer_timer) = self.pacer.schedule( + self.pacer_timer = self.pacer.schedule( self.cache_pkt_size as u64, pr, srtt, cwnd, self.max_datagram_size as u64, now, - ) { - trace!("{} pacer will be ready at {:?}", self.trace_id, pacer_timer); - self.pacer_timer = Some(pacer_timer); - return false; - } else { - self.pacer.on_sent(self.max_datagram_size as u64); - self.pacer_timer = None; - return true; - } + ); + } + + if self.pacer_timer.is_none() { + true + } else { + trace!("{} pacing timer is {:?}", self.trace_id, self.pacer_timer); + false } - trace!("{} pacing is disabled", self.trace_id); - true } /// Update statistics for the packet sent event @@ -905,7 +906,7 @@ impl Recovery { /// Update statistics for the congestion window limited event pub(crate) fn stat_cwnd_limited(&mut self) { - let is_cwnd_limited = !self.can_send(); + let is_cwnd_limited = self.bytes_in_flight >= self.congestion.congestion_window() as usize; let now = Instant::now(); if let Some(last_cwnd_limited_time) = self.last_cwnd_limited_time { // Update duration timely, in case it stays in cwnd limited all the time. diff --git a/src/connection/stream.rs b/src/connection/stream.rs index 1957f8bb..e267bf7f 100644 --- a/src/connection/stream.rs +++ b/src/connection/stream.rs @@ -235,9 +235,7 @@ impl StreamMap { /// Read contiguous data from the stream's receive buffer into the given buffer. /// /// Return the number of bytes read and the `fin` flag if read successfully. - /// /// Return `StreamStateError` if the stream closed or never opened. - /// /// Return `Done` if the stream is not readable. pub fn stream_read(&mut self, stream_id: u64, out: &mut [u8]) -> Result<(usize, bool)> { // Local initiated unidirectional streams are send-only, so we can't read from them. @@ -1343,10 +1341,19 @@ impl StreamMap { return Err(Error::FlowControlError); } - if !was_readable && stream.is_readable() { + let is_readable = stream.is_readable(); + let is_complete = stream.is_complete(); + let local = stream.local; + + if !was_readable && is_readable { self.mark_readable(stream_id, true); } + // Mark closed if the stream is complete and not readable. + if is_complete && !is_readable { + self.mark_closed(stream_id, local); + } + self.flow_control.increase_recv_off(max_rx_off_delta); self.flow_control.increase_read_off(max_fc_off_delta); if self.flow_control.should_send_max_data() { @@ -1847,7 +1854,7 @@ impl Stream { self.recv.trace_id = trace_id.to_string(); } - /// Return true if the stream has data to be read. + /// Return true if the stream has data to be read or an error to be collected. pub fn is_readable(&self) -> bool { self.recv.ready() } @@ -2212,7 +2219,8 @@ impl RecvBuf { Ok((len, self.is_fin())) } - /// Return true if the stream has buffered data waiting to be read by application. + /// Return true if the stream has buffered data to be read or an error to + /// be collected. fn ready(&self) -> bool { match self.data.first_key_value() { Some((_, buf)) => buf.off() == self.read_off, @@ -2221,6 +2229,10 @@ impl RecvBuf { } /// Receive RESET_STREAM frame from peer, reset the stream at the given offset. + /// + /// If the recv side is not shutdown by the application, an empty buffer with + /// FIN will be written to the recv buffer to notify the application that it + /// has been reset by its peer. pub fn reset(&mut self, error_code: u64, final_size: u64) -> Result { // Once a final size for a stream is known, it cannot change. If a RESET_STREAM // frame is received indicating a change in the final size for the stream, @@ -2262,8 +2274,9 @@ impl RecvBuf { Ok(max_rx_off_delta as usize) } - /// Shutdown the stream's receive-side, all subsequent data received on the stream - /// will be discarded. + /// Shutdown the stream's receive-side. + /// + /// After this operation, any subsequent data received on the stream will be discarded. fn shutdown(&mut self) -> Result { if self.shutdown { return Err(Error::Done); diff --git a/src/ffi.rs b/src/ffi.rs index c30efb32..718d4769 100644 --- a/src/ffi.rs +++ b/src/ffi.rs @@ -519,6 +519,13 @@ pub extern "C" fn quic_config_set_zerortt_buffer_size(config: &mut Config, v: u1 config.set_zerortt_buffer_size(v as usize); } +/// Set the maximum number of undecryptable packets that can be stored by one connection. +/// The default value is `10`. A value of 0 will be treated as default value. +#[no_mangle] +pub extern "C" fn quic_config_set_max_undecryptable_packets(config: &mut Config, v: u16) { + config.set_max_undecryptable_packets(v as usize); +} + /// Create a new TlsConfig. /// The caller is responsible for the memory of the TlsConfig and should properly /// destroy it by calling `quic_tls_config_free`. diff --git a/src/lib.rs b/src/lib.rs index 9efa9a86..e56ec988 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -117,9 +117,6 @@ const MAX_ACK_RANGES: usize = 68; /// Default outgoing udp datagram payloads size. const DEFAULT_SEND_UDP_PAYLOAD_SIZE: usize = 1200; -/// The maximum number of undecryptable packets that can be buffered. -const MAX_UNDECRYPTABLE_PACKETS: usize = 10; - /// An endpoint MUST limit the amount of data it sends to the unvalidated /// address to three times the amount of data received from that address. const ANTI_AMPLIFICATION_FACTOR: usize = 3; @@ -346,6 +343,9 @@ pub struct Config { /// Buffer size for early incoming zero rtt packets, in packets. zerortt_buffer_size: usize, + /// The maximum number of undecryptable packets that can be stored by one connection, in packets. + max_undecryptable_packets: usize, + /// Configurations about loss recovery, congestion control, and pmtu discovery. recovery: RecoveryConfig, @@ -399,6 +399,7 @@ impl Config { anti_amplification_factor: ANTI_AMPLIFICATION_FACTOR, send_batch_size: 64, zerortt_buffer_size: 1000, + max_undecryptable_packets: 10, recovery: RecoveryConfig::default(), multipath: MultipathConfig::default(), tls_config_selector: None, @@ -702,6 +703,16 @@ impl Config { } } + /// Set the maximum number of undecryptable packets that can be stored by one connection. + /// The default value is `10`. A value of 0 will be treated as default value. + pub fn set_max_undecryptable_packets(&mut self, v: usize) { + if v > 0 { + self.max_undecryptable_packets = v; + } else { + self.max_undecryptable_packets = 10; + } + } + /// Set TLS config. pub fn set_tls_config(&mut self, tls_config: tls::TlsConfig) { self.set_tls_config_selector(Arc::new(tls::DefaultTlsConfigSelector { diff --git a/src/multipath_scheduler/scheduler_minrtt.rs b/src/multipath_scheduler/scheduler_minrtt.rs index 8e297c76..31fea2cf 100644 --- a/src/multipath_scheduler/scheduler_minrtt.rs +++ b/src/multipath_scheduler/scheduler_minrtt.rs @@ -45,7 +45,7 @@ impl MultipathScheduler for MinRttScheduler { ) -> Result { let mut best = None; - for (pid, path) in paths.iter() { + for (pid, path) in paths.iter_mut() { // Skip the path that is not ready for sending non-probing packets. if !path.active() || !path.recovery.can_send() { continue; diff --git a/src/multipath_scheduler/scheduler_redundant.rs b/src/multipath_scheduler/scheduler_redundant.rs index d089cc47..91641f78 100644 --- a/src/multipath_scheduler/scheduler_redundant.rs +++ b/src/multipath_scheduler/scheduler_redundant.rs @@ -49,7 +49,7 @@ impl MultipathScheduler for RedundantScheduler { spaces: &mut PacketNumSpaceMap, streams: &mut StreamMap, ) -> Result { - for (pid, path) in paths.iter() { + for (pid, path) in paths.iter_mut() { // Skip the path that is not ready for sending non-probing packets. if !path.active() || !path.recovery.can_send() { continue; diff --git a/src/multipath_scheduler/scheduler_rr.rs b/src/multipath_scheduler/scheduler_rr.rs index 210719d0..92c0a883 100644 --- a/src/multipath_scheduler/scheduler_rr.rs +++ b/src/multipath_scheduler/scheduler_rr.rs @@ -39,7 +39,7 @@ impl RoundRobinScheduler { impl RoundRobinScheduler { /// Iterate and find the last used path - fn find_last(&self, iter: &mut slab::Iter, last: usize) -> bool { + fn find_last(&self, iter: &mut slab::IterMut, last: usize) -> bool { for (pid, _) in iter.by_ref() { if pid != last { continue; @@ -50,7 +50,7 @@ impl RoundRobinScheduler { } /// Try to select an available path - fn select(&mut self, iter: &mut slab::Iter) -> Option { + fn select(&mut self, iter: &mut slab::IterMut) -> Option { for (pid, path) in iter.by_ref() { // Skip the path that is not ready for sending non-probing packets. if !path.active() || !path.recovery.can_send() { @@ -72,7 +72,7 @@ impl MultipathScheduler for RoundRobinScheduler { spaces: &mut PacketNumSpaceMap, streams: &mut StreamMap, ) -> Result { - let mut iter = paths.iter(); + let mut iter = paths.iter_mut(); let mut exist_last = false; // Iterate and find the last used path @@ -81,7 +81,7 @@ impl MultipathScheduler for RoundRobinScheduler { exist_last = true; } else { // The last path has been abandoned - iter = paths.iter(); + iter = paths.iter_mut(); } } @@ -93,7 +93,7 @@ impl MultipathScheduler for RoundRobinScheduler { return Err(Error::Done); } - let mut iter = paths.iter(); + let mut iter = paths.iter_mut(); if let Some(pid) = self.select(&mut iter) { return Ok(pid); } diff --git a/tools/Cargo.toml b/tools/Cargo.toml index 2b5202d6..8fc9703a 100644 --- a/tools/Cargo.toml +++ b/tools/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "tquic_tools" -version = "1.0.0" +version = "1.1.0" edition = "2021" rust-version = "1.70.0" license = "Apache-2.0" @@ -22,7 +22,7 @@ slab = "0.4" rand = "0.8.5" statrs = "0.16" signal-hook = "0.3.17" -tquic = { path = "..", version = "1.0.0"} +tquic = { path = "..", version = "1.1.0"} [target."cfg(unix)".dependencies] jemallocator = { version = "0.5", package = "tikv-jemallocator" }