From 4a69ec0d35d700f7268822b7af0b3e8e883601e9 Mon Sep 17 00:00:00 2001 From: Sijie Yang Date: Mon, 27 Nov 2023 16:03:44 +0800 Subject: [PATCH] Simplify the usage of Endpoint - Supports forcibly closing a endpoint - When closing a connection, all its streams are forcefully closed first - Once a connection is marked as closed, all its remaining internal events will no longer be processed --- apps/src/bin/tquic_client.rs | 2 +- include/tquic.h | 8 +- src/connection/connection.rs | 5 + src/connection/stream.rs | 7 ++ src/endpoint.rs | 196 +++++++++++++++++++++++------------ src/ffi.rs | 10 +- src/timer_queue.rs | 5 + 7 files changed, 160 insertions(+), 73 deletions(-) diff --git a/apps/src/bin/tquic_client.rs b/apps/src/bin/tquic_client.rs index 11817f2e8..4d0478455 100644 --- a/apps/src/bin/tquic_client.rs +++ b/apps/src/bin/tquic_client.rs @@ -474,7 +474,7 @@ impl Worker { ); // Close endpoint. - self.endpoint.close(); + self.endpoint.close(false); // Close connections. let mut senders = self.senders.borrow_mut(); diff --git a/include/tquic.h b/include/tquic.h index db41649b0..8e87f171a 100644 --- a/include/tquic.h +++ b/include/tquic.h @@ -486,10 +486,12 @@ bool quic_endpoint_exist_connection(struct quic_endpoint_t *endpoint, struct quic_conn_t *quic_endpoint_get_connection(struct quic_endpoint_t *endpoint, uint64_t index); /** - * Cease creating new connections and wait all active connections to - * close. + * Gracefully or forcibly shutdown the endpoint. + * If `force` is false, cease creating new connections and wait for all + * active connections to close. Otherwise, forcibly close all the active + * connections. */ -void quic_endpoint_close(struct quic_endpoint_t *endpoint); +void quic_endpoint_close(struct quic_endpoint_t *endpoint, bool force); /** * Get index of the connection diff --git a/src/connection/connection.rs b/src/connection/connection.rs index d1cdd82b6..1397c9211 100644 --- a/src/connection/connection.rs +++ b/src/connection/connection.rs @@ -3339,6 +3339,11 @@ impl Connection { self.streams.writable_iter() } + /// Return an iterator over all the existing streams on the connection. + pub fn stream_iter(&self) -> StreamIter { + self.streams.iter() + } + /// Return true if the stream has enough flow control capacity to send data /// and application wants to send more data. pub(crate) fn stream_check_writable(&self, stream_id: u64) -> bool { diff --git a/src/connection/stream.rs b/src/connection/stream.rs index a78d85a79..1f0b50035 100644 --- a/src/connection/stream.rs +++ b/src/connection/stream.rs @@ -919,6 +919,13 @@ impl StreamMap { } } + /// Return an iterator over all the existing streams. + pub fn iter(&self) -> StreamIter { + StreamIter { + streams: self.streams.keys().copied().collect(), + } + } + /// Return an iterator over streams that have outstanding data to be read /// by the application. pub fn readable_iter(&self) -> StreamIter { diff --git a/src/endpoint.rs b/src/endpoint.rs index 50aeaa169..a98632270 100644 --- a/src/endpoint.rs +++ b/src/endpoint.rs @@ -463,91 +463,117 @@ impl Endpoint { // Process all tickable connections while let Some(idx) = self.conn_tickable_next() { + if self.process_connection(idx, &mut ready) { + continue; + } + + // Try to clean up the closed connection. let conn = match self.conns.get_mut(idx) { Some(v) => v, None => continue, }; - // Try to clean up the closed connection. - if conn.is_closed() { - self.handler.on_conn_closed(conn); + for stream_id in conn.stream_iter() { + self.handler.on_stream_closed(conn, stream_id); + conn.stream_destroy(stream_id); + } - conn.mark_tickable(false); - conn.mark_sendable(false); - self.timers.del(&idx); - self.routes.remove(conn); - self.conns.remove(idx); - continue; + self.handler.on_conn_closed(conn); + conn.mark_tickable(false); + conn.mark_sendable(false); + self.timers.del(&idx); + self.routes.remove(conn); + self.conns.remove(idx); + } + + // Process all sendable connections + self.send_packets_out()?; + + // Add connection with internal events to the tickable queue again. + for idx in ready { + if let Some(conn) = self.conns.get_mut(idx) { + conn.mark_tickable(true); } + } - // Try to process endpoint-facing events on the connection. - while let Some(event) = conn.poll() { - match event { - Event::ConnectionEstablished => self.handler.on_conn_established(conn), + Ok(()) + } - Event::NewToken(token) => self.handler.on_new_token(conn, token), + /// Process internal events of a tickable connection. + /// + /// Return `true` if the connection has been processed successfully. + /// Return `false` if the connection has been closed and need to be cleaned up. + fn process_connection(&mut self, idx: u64, ready: &mut Vec) -> bool { + let conn = match self.conns.get_mut(idx) { + Some(v) => v, + None => return true, + }; + if conn.is_closed() { + return false; + } - Event::ScidToAdvertise(num) => { - let key = &self.config.reset_token_key; - Self::conn_add_scids(conn, num, &mut self.cid_gen, key, &mut self.routes); - } + // Try to process endpoint-facing events on the connection. + while let Some(event) = conn.poll() { + match event { + Event::ConnectionEstablished => self.handler.on_conn_established(conn), - Event::ScidRetired(cid) => self.routes.remove_with_cid(&cid), + Event::NewToken(token) => self.handler.on_new_token(conn, token), - Event::DcidAdvertised(token) => self.routes.insert_with_token(token, idx), + Event::ScidToAdvertise(num) => { + let key = &self.config.reset_token_key; + Self::conn_add_scids(conn, num, &mut self.cid_gen, key, &mut self.routes); + } - Event::DcidRetired(token) => self.routes.remove_with_token(&token), + Event::ScidRetired(cid) => self.routes.remove_with_cid(&cid), - Event::StreamCreated(stream_id) => { - self.handler.on_stream_created(conn, stream_id) - } + Event::DcidAdvertised(token) => self.routes.insert_with_token(token, idx), - Event::StreamClosed(stream_id) => { - self.handler.on_stream_closed(conn, stream_id); - conn.stream_destroy(stream_id); - } + Event::DcidRetired(token) => self.routes.remove_with_token(&token), + + Event::StreamCreated(stream_id) => self.handler.on_stream_created(conn, stream_id), + + Event::StreamClosed(stream_id) => { + self.handler.on_stream_closed(conn, stream_id); + conn.stream_destroy(stream_id); } } - for stream_id in conn.stream_readable_iter() { - if conn.stream_check_readable(stream_id) { - self.handler.on_stream_readable(conn, stream_id); - } + if conn.is_closed() { + return false; } - - for stream_id in conn.stream_writable_iter() { - if conn.stream_check_writable(stream_id) { - self.handler.on_stream_writable(conn, stream_id); + } + for stream_id in conn.stream_readable_iter() { + if conn.stream_check_readable(stream_id) { + self.handler.on_stream_readable(conn, stream_id); + if conn.is_closed() { + return false; } } - - // Add the connection to the sendable queue - conn.mark_sendable(true); - - // Try to update the timer of the connection - if let Some(t) = conn.timeout() { - self.timers.add(idx, t, Instant::now()); - } else { - self.timers.del(&idx); - } - - if conn.is_ready() { - trace!("conn {} is still ready", conn.trace_id()); - ready.push(idx); + } + for stream_id in conn.stream_writable_iter() { + if conn.stream_check_writable(stream_id) { + self.handler.on_stream_writable(conn, stream_id); + if conn.is_closed() { + return false; + } } - conn.mark_tickable(false); } - // Process all sendable connections - self.send_packets_out()?; + // Add the connection to the sendable queue + conn.mark_sendable(true); - // Add connection with internal events to the tickable queue again. - for idx in ready { - if let Some(conn) = self.conns.get_mut(idx) { - conn.mark_tickable(true); - } + // Try to update the timer of the connection + if let Some(t) = conn.timeout() { + self.timers.add(idx, t, Instant::now()); + } else { + self.timers.del(&idx); } - Ok(()) + if conn.is_ready() { + trace!("conn {} is still ready", conn.trace_id()); + ready.push(idx); + } + conn.mark_tickable(false); + true } /// Add scids for the given connection @@ -700,10 +726,31 @@ impl Endpoint { Ok(()) } - /// Cease creating new connections and wait all active connections to - /// close. - pub fn close(&mut self) { + /// Gracefully or forcibly shutdown the endpoint. + /// If `force` is false, cease creating new connections and wait for all + /// active connections to close. Otherwise, forcibly close all the active + /// connections. + pub fn close(&mut self, force: bool) { self.closed = true; + if !force { + return; + } + + trace!( + "{} forcibly close {} connections", + &self.trace_id, + self.conns.len() + ); + for (_, conn) in self.conns.conns.iter_mut() { + for stream_id in conn.stream_iter() { + self.handler.on_stream_closed(conn, stream_id); + conn.stream_destroy(stream_id); + } + self.handler.on_conn_closed(conn); + } + self.timers.clear(); + self.routes.clear(); + self.conns.clear(); } /// Set the unique trace id for the endpoint @@ -752,6 +799,11 @@ impl ConnectionTable { self.conns.remove(&index); } + /// Clear the connection table + fn clear(&mut self) { + self.conns.clear(); + } + /// Return the number of connections fn len(&self) -> usize { self.conns.len() @@ -874,6 +926,13 @@ impl ConnectionRoutes { } } } + + /// Clear all the routes. + fn clear(&mut self) { + self.cid_table.clear(); + self.addr_table.clear(); + self.token_table.clear(); + } } const MAX_BUFFER_SIZE: usize = 2048; @@ -1546,7 +1605,7 @@ mod tests { fn on_conn_closed(&mut self, conn: &mut Connection) { trace!("{} connection closed", conn.trace_id()); - assert_eq!(conn.is_established(), true); + assert_eq!(conn.stream_iter().count(), 0); if self.conf.new_token_expected { assert!(self.token.is_some()); } @@ -1631,6 +1690,7 @@ mod tests { fn on_conn_closed(&mut self, conn: &mut Connection) { trace!("{} connection closed", conn.trace_id()); + assert_eq!(conn.stream_iter().count(), 0); } fn on_stream_created(&mut self, conn: &mut Connection, stream_id: u64) { @@ -1981,9 +2041,15 @@ mod tests { assert!(e.connect(cli_addr, srv_addr, host, None, None).is_ok()); assert_eq!(e.conns.len(), 1); - // connect on closed client endpoint - e.close(); + // gracefully close client endpoint + e.close(false); + assert_eq!(e.conns.len(), 1); assert!(e.connect(cli_addr, srv_addr, host, None, None).is_err()); + assert_eq!(e.conns.len(), 1); + + // forcibly close client endpoint + e.close(true); + assert_eq!(e.conns.len(), 0); // connect on server endpoint let mut e = Endpoint::new( diff --git a/src/ffi.rs b/src/ffi.rs index e66a168e5..3806ae1f5 100644 --- a/src/ffi.rs +++ b/src/ffi.rs @@ -451,11 +451,13 @@ pub extern "C" fn quic_endpoint_get_connection( } } -/// Cease creating new connections and wait all active connections to -/// close. +/// Gracefully or forcibly shutdown the endpoint. +/// If `force` is false, cease creating new connections and wait for all +/// active connections to close. Otherwise, forcibly close all the active +/// connections. #[no_mangle] -pub extern "C" fn quic_endpoint_close(endpoint: &mut Endpoint) { - endpoint.close() +pub extern "C" fn quic_endpoint_close(endpoint: &mut Endpoint, force: bool) { + endpoint.close(force) } /// Get index of the connection diff --git a/src/timer_queue.rs b/src/timer_queue.rs index fc5cddf95..a423c4f09 100644 --- a/src/timer_queue.rs +++ b/src/timer_queue.rs @@ -79,6 +79,11 @@ impl TimerQueue { } None } + + /// Clear all the timers + pub fn clear(&mut self) { + self.timers.clear(); + } } impl Default for TimerQueue {