Skip to content

Commit

Permalink
Fix the closure of the stream that was reset by the peer (Tencent#363)
Browse files Browse the repository at this point in the history
  • Loading branch information
iyangsj authored Aug 15, 2024
1 parent ab7ba28 commit 14de90e
Show file tree
Hide file tree
Showing 2 changed files with 64 additions and 3 deletions.
56 changes: 54 additions & 2 deletions src/connection/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4364,6 +4364,7 @@ pub(crate) mod tests {
use super::*;
use crate::multipath_scheduler::MultipathAlgorithm;
use crate::packet;
use crate::ranges::RangeSet;
use crate::tls::tests::ServerConfigSelector;
use crate::tls::TlsConfig;
use crate::tls::TlsConfigSelector;
Expand Down Expand Up @@ -7157,11 +7158,62 @@ pub(crate) mod tests {
Err(Error::StreamReset(2))
);

// Client send RESET_STREAM
// Client send ACK/RESET_STREAM
let packets = TestPair::conn_packets_out(&mut test_pair.client)?;

// Server recv ACK/RESET_STREAM
TestPair::conn_packets_in(&mut test_pair.server, packets)?;
assert_eq!(test_pair.server.streams.is_closed(sid), true);
assert_eq!(test_pair.server.stream_readable(sid), false);
assert_eq!(
test_pair.server.stream_read(sid, &mut buf),
Err(Error::StreamStateError)
);

Ok(())
}

#[test]
fn stream_shutdown_abnormal() -> Result<()> {
let mut test_pair = TestPair::new_with_test_config()?;
assert_eq!(test_pair.handshake(), Ok(()));
let mut buf = vec![0; 16];

// Client send data on a stream
let (sid, data) = (0, TestPair::new_test_data(10));
test_pair.client.stream_write(sid, data.clone(), false)?;
let packets = TestPair::conn_packets_out(&mut test_pair.client)?;

// Server recv RESET_STREAM
// Server shutdown the stream (Read/Write)
TestPair::conn_packets_in(&mut test_pair.server, packets)?;
test_pair.server.stream_shutdown(sid, Shutdown::Read, 1)?;
test_pair.server.stream_shutdown(sid, Shutdown::Write, 2)?;
let packets = TestPair::conn_packets_out(&mut test_pair.server)?;

// Client recv STOP_SENDING/RESET_STREAM
TestPair::conn_packets_in(&mut test_pair.client, packets)?;

// Client send ACK
let mut ack_ranges = RangeSet::new(1);
ack_ranges.insert(0..2);
let frame = frame::Frame::Ack {
ack_delay: 0,
ack_ranges,
ecn_counts: None,
};
test_pair.build_packet_and_send(PacketType::OneRTT, &[frame], false)?;
assert_eq!(test_pair.server.streams.is_closed(sid), false);

// Client send RESET_STREAM
let frame = frame::Frame::ResetStream {
stream_id: 0,
error_code: 1,
final_size: 10,
};
test_pair.build_packet_and_send(PacketType::OneRTT, &[frame], false)?;

// Server stream 0 should be closed now
assert_eq!(test_pair.server.streams.is_closed(sid), true);
assert_eq!(test_pair.server.stream_readable(sid), false);
assert_eq!(
test_pair.server.stream_read(sid, &mut buf),
Expand Down
11 changes: 10 additions & 1 deletion src/connection/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1343,10 +1343,19 @@ impl StreamMap {
return Err(Error::FlowControlError);
}

if !was_readable && stream.is_readable() {
let is_readable = stream.is_readable();
let is_complete = stream.is_complete();
let local = stream.local;

if !was_readable && is_readable {
self.mark_readable(stream_id, true);
}

// Mark closed if the stream is complete and not readable.
if is_complete && !is_readable {
self.mark_closed(stream_id, local);
}

self.flow_control.increase_recv_off(max_rx_off_delta);
self.flow_control.increase_read_off(max_fc_off_delta);
if self.flow_control.should_send_max_data() {
Expand Down

0 comments on commit 14de90e

Please sign in to comment.