Skip to content

Commit

Permalink
Simplify the usage of Endpoint
Browse files Browse the repository at this point in the history
- Supports forcibly closing a endpoint
- When closing a connection, all its streams are forcefully closed first
- Once a connection is marked as closed, all its remaining internal
  events will no longer be processed
  • Loading branch information
iyangsj committed Nov 27, 2023
1 parent fd20849 commit 4a69ec0
Show file tree
Hide file tree
Showing 7 changed files with 160 additions and 73 deletions.
2 changes: 1 addition & 1 deletion apps/src/bin/tquic_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -474,7 +474,7 @@ impl Worker {
);

// Close endpoint.
self.endpoint.close();
self.endpoint.close(false);

// Close connections.
let mut senders = self.senders.borrow_mut();
Expand Down
8 changes: 5 additions & 3 deletions include/tquic.h
Original file line number Diff line number Diff line change
Expand Up @@ -486,10 +486,12 @@ bool quic_endpoint_exist_connection(struct quic_endpoint_t *endpoint,
struct quic_conn_t *quic_endpoint_get_connection(struct quic_endpoint_t *endpoint, uint64_t index);

/**
* Cease creating new connections and wait all active connections to
* close.
* Gracefully or forcibly shutdown the endpoint.
* If `force` is false, cease creating new connections and wait for all
* active connections to close. Otherwise, forcibly close all the active
* connections.
*/
void quic_endpoint_close(struct quic_endpoint_t *endpoint);
void quic_endpoint_close(struct quic_endpoint_t *endpoint, bool force);

/**
* Get index of the connection
Expand Down
5 changes: 5 additions & 0 deletions src/connection/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3339,6 +3339,11 @@ impl Connection {
self.streams.writable_iter()
}

/// Return an iterator over all the existing streams on the connection.
pub fn stream_iter(&self) -> StreamIter {
self.streams.iter()
}

/// Return true if the stream has enough flow control capacity to send data
/// and application wants to send more data.
pub(crate) fn stream_check_writable(&self, stream_id: u64) -> bool {
Expand Down
7 changes: 7 additions & 0 deletions src/connection/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -919,6 +919,13 @@ impl StreamMap {
}
}

/// Return an iterator over all the existing streams.
pub fn iter(&self) -> StreamIter {
StreamIter {
streams: self.streams.keys().copied().collect(),
}
}

/// Return an iterator over streams that have outstanding data to be read
/// by the application.
pub fn readable_iter(&self) -> StreamIter {
Expand Down
196 changes: 131 additions & 65 deletions src/endpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -463,91 +463,117 @@ impl Endpoint {

// Process all tickable connections
while let Some(idx) = self.conn_tickable_next() {
if self.process_connection(idx, &mut ready) {
continue;
}

// Try to clean up the closed connection.
let conn = match self.conns.get_mut(idx) {
Some(v) => v,
None => continue,
};

// Try to clean up the closed connection.
if conn.is_closed() {
self.handler.on_conn_closed(conn);
for stream_id in conn.stream_iter() {
self.handler.on_stream_closed(conn, stream_id);
conn.stream_destroy(stream_id);
}

conn.mark_tickable(false);
conn.mark_sendable(false);
self.timers.del(&idx);
self.routes.remove(conn);
self.conns.remove(idx);
continue;
self.handler.on_conn_closed(conn);
conn.mark_tickable(false);
conn.mark_sendable(false);
self.timers.del(&idx);
self.routes.remove(conn);
self.conns.remove(idx);
}

// Process all sendable connections
self.send_packets_out()?;

// Add connection with internal events to the tickable queue again.
for idx in ready {
if let Some(conn) = self.conns.get_mut(idx) {
conn.mark_tickable(true);
}
}

// Try to process endpoint-facing events on the connection.
while let Some(event) = conn.poll() {
match event {
Event::ConnectionEstablished => self.handler.on_conn_established(conn),
Ok(())
}

Event::NewToken(token) => self.handler.on_new_token(conn, token),
/// Process internal events of a tickable connection.
///
/// Return `true` if the connection has been processed successfully.
/// Return `false` if the connection has been closed and need to be cleaned up.
fn process_connection(&mut self, idx: u64, ready: &mut Vec<u64>) -> bool {
let conn = match self.conns.get_mut(idx) {
Some(v) => v,
None => return true,
};
if conn.is_closed() {
return false;
}

Event::ScidToAdvertise(num) => {
let key = &self.config.reset_token_key;
Self::conn_add_scids(conn, num, &mut self.cid_gen, key, &mut self.routes);
}
// Try to process endpoint-facing events on the connection.
while let Some(event) = conn.poll() {
match event {
Event::ConnectionEstablished => self.handler.on_conn_established(conn),

Event::ScidRetired(cid) => self.routes.remove_with_cid(&cid),
Event::NewToken(token) => self.handler.on_new_token(conn, token),

Event::DcidAdvertised(token) => self.routes.insert_with_token(token, idx),
Event::ScidToAdvertise(num) => {
let key = &self.config.reset_token_key;
Self::conn_add_scids(conn, num, &mut self.cid_gen, key, &mut self.routes);
}

Event::DcidRetired(token) => self.routes.remove_with_token(&token),
Event::ScidRetired(cid) => self.routes.remove_with_cid(&cid),

Event::StreamCreated(stream_id) => {
self.handler.on_stream_created(conn, stream_id)
}
Event::DcidAdvertised(token) => self.routes.insert_with_token(token, idx),

Event::StreamClosed(stream_id) => {
self.handler.on_stream_closed(conn, stream_id);
conn.stream_destroy(stream_id);
}
Event::DcidRetired(token) => self.routes.remove_with_token(&token),

Event::StreamCreated(stream_id) => self.handler.on_stream_created(conn, stream_id),

Event::StreamClosed(stream_id) => {
self.handler.on_stream_closed(conn, stream_id);
conn.stream_destroy(stream_id);
}
}
for stream_id in conn.stream_readable_iter() {
if conn.stream_check_readable(stream_id) {
self.handler.on_stream_readable(conn, stream_id);
}
if conn.is_closed() {
return false;
}

for stream_id in conn.stream_writable_iter() {
if conn.stream_check_writable(stream_id) {
self.handler.on_stream_writable(conn, stream_id);
}
for stream_id in conn.stream_readable_iter() {
if conn.stream_check_readable(stream_id) {
self.handler.on_stream_readable(conn, stream_id);
if conn.is_closed() {
return false;
}
}

// Add the connection to the sendable queue
conn.mark_sendable(true);

// Try to update the timer of the connection
if let Some(t) = conn.timeout() {
self.timers.add(idx, t, Instant::now());
} else {
self.timers.del(&idx);
}

if conn.is_ready() {
trace!("conn {} is still ready", conn.trace_id());
ready.push(idx);
}
for stream_id in conn.stream_writable_iter() {
if conn.stream_check_writable(stream_id) {
self.handler.on_stream_writable(conn, stream_id);
if conn.is_closed() {
return false;
}
}
conn.mark_tickable(false);
}

// Process all sendable connections
self.send_packets_out()?;
// Add the connection to the sendable queue
conn.mark_sendable(true);

// Add connection with internal events to the tickable queue again.
for idx in ready {
if let Some(conn) = self.conns.get_mut(idx) {
conn.mark_tickable(true);
}
// Try to update the timer of the connection
if let Some(t) = conn.timeout() {
self.timers.add(idx, t, Instant::now());
} else {
self.timers.del(&idx);
}

Ok(())
if conn.is_ready() {
trace!("conn {} is still ready", conn.trace_id());
ready.push(idx);
}
conn.mark_tickable(false);
true
}

/// Add scids for the given connection
Expand Down Expand Up @@ -700,10 +726,31 @@ impl Endpoint {
Ok(())
}

/// Cease creating new connections and wait all active connections to
/// close.
pub fn close(&mut self) {
/// Gracefully or forcibly shutdown the endpoint.
/// If `force` is false, cease creating new connections and wait for all
/// active connections to close. Otherwise, forcibly close all the active
/// connections.
pub fn close(&mut self, force: bool) {
self.closed = true;
if !force {
return;
}

trace!(
"{} forcibly close {} connections",
&self.trace_id,
self.conns.len()
);
for (_, conn) in self.conns.conns.iter_mut() {
for stream_id in conn.stream_iter() {
self.handler.on_stream_closed(conn, stream_id);
conn.stream_destroy(stream_id);
}
self.handler.on_conn_closed(conn);
}
self.timers.clear();
self.routes.clear();
self.conns.clear();
}

/// Set the unique trace id for the endpoint
Expand Down Expand Up @@ -752,6 +799,11 @@ impl ConnectionTable {
self.conns.remove(&index);
}

/// Clear the connection table
fn clear(&mut self) {
self.conns.clear();
}

/// Return the number of connections
fn len(&self) -> usize {
self.conns.len()
Expand Down Expand Up @@ -874,6 +926,13 @@ impl ConnectionRoutes {
}
}
}

/// Clear all the routes.
fn clear(&mut self) {
self.cid_table.clear();
self.addr_table.clear();
self.token_table.clear();
}
}

const MAX_BUFFER_SIZE: usize = 2048;
Expand Down Expand Up @@ -1546,7 +1605,7 @@ mod tests {

fn on_conn_closed(&mut self, conn: &mut Connection) {
trace!("{} connection closed", conn.trace_id());
assert_eq!(conn.is_established(), true);
assert_eq!(conn.stream_iter().count(), 0);
if self.conf.new_token_expected {
assert!(self.token.is_some());
}
Expand Down Expand Up @@ -1631,6 +1690,7 @@ mod tests {

fn on_conn_closed(&mut self, conn: &mut Connection) {
trace!("{} connection closed", conn.trace_id());
assert_eq!(conn.stream_iter().count(), 0);
}

fn on_stream_created(&mut self, conn: &mut Connection, stream_id: u64) {
Expand Down Expand Up @@ -1981,9 +2041,15 @@ mod tests {
assert!(e.connect(cli_addr, srv_addr, host, None, None).is_ok());
assert_eq!(e.conns.len(), 1);

// connect on closed client endpoint
e.close();
// gracefully close client endpoint
e.close(false);
assert_eq!(e.conns.len(), 1);
assert!(e.connect(cli_addr, srv_addr, host, None, None).is_err());
assert_eq!(e.conns.len(), 1);

// forcibly close client endpoint
e.close(true);
assert_eq!(e.conns.len(), 0);

// connect on server endpoint
let mut e = Endpoint::new(
Expand Down
10 changes: 6 additions & 4 deletions src/ffi.rs
Original file line number Diff line number Diff line change
Expand Up @@ -451,11 +451,13 @@ pub extern "C" fn quic_endpoint_get_connection(
}
}

/// Cease creating new connections and wait all active connections to
/// close.
/// Gracefully or forcibly shutdown the endpoint.
/// If `force` is false, cease creating new connections and wait for all
/// active connections to close. Otherwise, forcibly close all the active
/// connections.
#[no_mangle]
pub extern "C" fn quic_endpoint_close(endpoint: &mut Endpoint) {
endpoint.close()
pub extern "C" fn quic_endpoint_close(endpoint: &mut Endpoint, force: bool) {
endpoint.close(force)
}

/// Get index of the connection
Expand Down
5 changes: 5 additions & 0 deletions src/timer_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,11 @@ impl TimerQueue {
}
None
}

/// Clear all the timers
pub fn clear(&mut self) {
self.timers.clear();
}
}

impl Default for TimerQueue {
Expand Down

0 comments on commit 4a69ec0

Please sign in to comment.