Skip to content

Commit

Permalink
Update tquic_client (Tencent#163)
Browse files Browse the repository at this point in the history
- update `local_addresses` option to allow the os to choose available ports
- use `local_addresses` option to specify the addresses to bind in both singlepath and multipath mode.
- add active-cid-limit option to allow more paths
- allow all workers, instead of just the first worker, to work in multipath mode.
- add README.md for tquic_tools crate
  • Loading branch information
iyangsj authored Jan 30, 2024
1 parent aed5d6a commit 501fad3
Show file tree
Hide file tree
Showing 3 changed files with 62 additions and 42 deletions.
84 changes: 55 additions & 29 deletions tools/src/bin/tquic_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use std::fs::create_dir_all;
use std::fs::File;
use std::io::BufWriter;
use std::io::Write;
use std::net::IpAddr;
use std::net::Ipv4Addr;
use std::net::Ipv6Addr;
use std::net::SocketAddr;
Expand Down Expand Up @@ -150,7 +151,6 @@ pub struct ClientOpt {

/// Enable early data.
#[clap(short, long)]
// TODO: support early data.
pub enable_early_data: bool,

/// Disable stateless reset.
Expand All @@ -177,9 +177,13 @@ pub struct ClientOpt {
#[clap(long, default_value = "MINRTT")]
pub multipath_algor: MultipathAlgorithm,

/// Extra local addresses for client.
#[clap(long, value_delimiter = ' ', value_name = "ADDR")]
pub local_addresses: Vec<SocketAddr>,
/// Optional local IP addresses for client. e.g 192.168.1.10,192.168.2.20
#[clap(long, value_delimiter = ',', value_name = "ADDR")]
pub local_addresses: Vec<IpAddr>,

/// Set active_connection_id_limit transport parameter. Values lower than 2 will be ignored.
#[clap(long, default_value = "2", value_name = "NUM")]
pub active_cid_limit: u64,

/// Set max_udp_payload_size transport parameter.
#[clap(long, default_value = "65527", value_name = "NUM")]
Expand Down Expand Up @@ -262,12 +266,12 @@ impl Client {
pub fn start(&mut self) {
self.start_time = Instant::now();
let mut threads = vec![];
for i in 0..self.option.threads {
for _ in 0..self.option.threads {
let client_opt = self.option.clone();
let client_ctx = self.context.clone();
let terminated = self.terminated.clone();
let thread = thread::spawn(move || {
let mut worker = Worker::new(i, client_opt, client_ctx, terminated).unwrap();
let mut worker = Worker::new(client_opt, client_ctx, terminated).unwrap();
worker.start().unwrap();
});
threads.push(thread);
Expand Down Expand Up @@ -417,7 +421,6 @@ struct Worker {
impl Worker {
/// Create a new single thread client.
pub fn new(
index: u32,
option: ClientOpt,
client_ctx: Arc<Mutex<ClientContext>>,
terminated: Arc<AtomicBool>,
Expand All @@ -440,6 +443,7 @@ impl Worker {
config.set_min_congestion_window(option.min_congestion_window);
config.enable_multipath(option.enable_multipath);
config.set_multipath_algorithm(option.multipath_algor);
config.set_active_connection_id_limit(option.active_cid_limit);
let tls_config = TlsConfig::new_client_config(
ApplicationProto::convert_to_vec(&option.alpn),
option.enable_early_data,
Expand All @@ -450,17 +454,36 @@ impl Worker {
let registry = poll.registry();
let worker_ctx = Rc::new(RefCell::new(WorkerContext::with_option(&option)));
let senders = Rc::new(RefCell::new(FxHashMap::default()));
let handlers = WorkerHandler::new(&option, worker_ctx.clone(), senders.clone());

// Use unspecified local addr or the given local addr
let remote = option.connect_to.unwrap();
let mut sock = QuicSocket::new_client_socket(remote.is_ipv4(), registry)?;
if index == 0 && !option.local_addresses.is_empty() {
for local in &option.local_addresses {
let _ = sock.add(local, registry);
let local = if !option.local_addresses.is_empty() {
SocketAddr::new(option.local_addresses[0], 0)
} else {
match remote.is_ipv4() {
true => SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 0),
false => SocketAddr::new(IpAddr::V6(Ipv6Addr::UNSPECIFIED), 0),
}
};
let mut sock = QuicSocket::new(&local, registry)?;

let mut assigned_addrs = Vec::new();
assigned_addrs.push(sock.local_addr());
if let Some(addrs) = option.local_addresses.get(1..) {
for local in addrs {
let addr = sock.add(&SocketAddr::new(*local, 0), registry)?;
assigned_addrs.push(addr);
}
}
let sock = Rc::new(sock);

let handlers = WorkerHandler::new(
&option,
&assigned_addrs,
worker_ctx.clone(),
senders.clone(),
);

Ok(Worker {
option,
endpoint: Endpoint::new(Box::new(config), false, Box::new(handlers), sock.clone()),
Expand Down Expand Up @@ -1161,13 +1184,14 @@ struct WorkerHandler {
/// Remote server.
remote: SocketAddr,

/// Extra local addresses.
/// Local address list
local_addresses: Vec<SocketAddr>,
}

impl WorkerHandler {
fn new(
option: &ClientOpt,
local_addresses: &[SocketAddr],
worker_ctx: Rc<RefCell<WorkerContext>>,
senders: Rc<RefCell<FxHashMap<u64, RequestSender>>>,
) -> Self {
Expand All @@ -1176,7 +1200,7 @@ impl WorkerHandler {
worker_ctx,
senders,
remote: option.connect_to.unwrap(),
local_addresses: option.local_addresses.clone(),
local_addresses: local_addresses.to_owned(),
}
}

Expand Down Expand Up @@ -1276,21 +1300,23 @@ impl TransportHandler for WorkerHandler {
}

// Try to add additional paths
for local in &self.local_addresses {
match conn.add_path(*local, self.remote) {
Ok(_) => debug!(
"{} add new path {}-{}",
conn.trace_id(),
*local,
self.remote
),
Err(e) => debug!(
"{} fail to add path {}-{}: {}",
conn.trace_id(),
*local,
self.remote,
e
),
if let Some(addrs) = self.local_addresses.get(1..) {
for local in addrs {
match conn.add_path(*local, self.remote) {
Ok(_) => debug!(
"{} add new path {}-{}",
conn.trace_id(),
*local,
self.remote
),
Err(e) => debug!(
"{} fail to add path {}-{}: {}",
conn.trace_id(),
*local,
self.remote,
e
),
}
}
}

Expand Down
5 changes: 5 additions & 0 deletions tools/src/bin/tquic_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,10 @@ pub struct ServerOpt {
#[clap(long, default_value = "MINRTT")]
pub multipath_algor: MultipathAlgorithm,

/// Set active_connection_id_limit transport parameter. Values lower than 2 will be ignored.
#[clap(long, default_value = "2", value_name = "NUM")]
pub active_cid_limit: u64,

/// Set max_udp_payload_size transport parameter.
#[clap(long, default_value = "65527", value_name = "NUM")]
pub recv_udp_payload_size: u16,
Expand Down Expand Up @@ -201,6 +205,7 @@ impl Server {
config.set_min_congestion_window(option.min_congestion_window);
config.enable_multipath(option.enable_multipath);
config.set_multipath_algorithm(option.multipath_algor);
config.set_active_connection_id_limit(option.active_cid_limit);

if let Some(address_token_key) = &option.address_token_key {
let address_token_key = convert_address_token_key(address_token_key);
Expand Down
15 changes: 2 additions & 13 deletions tools/src/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,6 @@
// limitations under the License.

use std::io::ErrorKind;
use std::net::IpAddr;
use std::net::Ipv4Addr;
use std::net::Ipv6Addr;
use std::net::SocketAddr;

use clap::builder::PossibleValue;
Expand Down Expand Up @@ -123,29 +120,21 @@ impl QuicSocket {
})
}

pub fn new_client_socket(is_ipv4: bool, registry: &Registry) -> Result<Self> {
let local = match is_ipv4 {
true => IpAddr::V4(Ipv4Addr::UNSPECIFIED),
false => IpAddr::V6(Ipv6Addr::UNSPECIFIED),
};
QuicSocket::new(&SocketAddr::new(local, 0), registry)
}

/// Return the local address of the initial socket.
pub fn local_addr(&self) -> SocketAddr {
self.local_addr
}

/// Add additional socket binding with given local address.
pub fn add(&mut self, local: &SocketAddr, registry: &Registry) -> Result<()> {
pub fn add(&mut self, local: &SocketAddr, registry: &Registry) -> Result<SocketAddr> {
let socket = UdpSocket::bind(*local)?;
let local_addr = socket.local_addr()?;
let sid = self.socks.insert(socket);
self.addrs.insert(local_addr, sid);

let socket = self.socks.get_mut(sid).unwrap();
registry.register(socket, Token(sid), Interest::READABLE)?;
Ok(())
Ok(local_addr)
}

/// Delete socket binding with given local address.
Expand Down

0 comments on commit 501fad3

Please sign in to comment.