Skip to content

Commit

Permalink
[libos] Remove uring udp socket redundant msg when returning error
Browse files Browse the repository at this point in the history
  • Loading branch information
ClawSeven committed Oct 30, 2024
1 parent 0886339 commit 55d65f3
Showing 1 changed file with 33 additions and 35 deletions.
68 changes: 33 additions & 35 deletions src/libos/src/net/socket/uring/datagram/sender.rs
Original file line number Diff line number Diff line change
Expand Up @@ -168,8 +168,7 @@ impl<A: Addr, R: Runtime> Sender<A, R> {
let mut msg = DataMsg::new(buf_len);
let total_copied = msg.copy_buf(bufs)?;
msg.copy_control(control)?;

let msghdr_ptr = new_send_req(&mut msg, addr);
msg.build(addr);

if !inner.msg_queue.push_msg(msg) {
// Msg queue can not push this msg, mark the socket as non-writable
Expand All @@ -179,12 +178,12 @@ impl<A: Addr, R: Runtime> Sender<A, R> {

// Since the send buffer is not empty, try to flush the buffer
if inner.io_handle.is_none() {
self.do_send(&mut inner, msghdr_ptr);
self.do_send(&mut inner);
}
Ok(total_copied)
}

fn do_send(self: &Arc<Self>, inner: &mut MutexGuard<Inner>, msghdr_ptr: *const libc::msghdr) {
fn do_send(self: &Arc<Self>, inner: &mut MutexGuard<Inner>) {
debug_assert!(!inner.msg_queue.is_empty());
debug_assert!(inner.io_handle.is_none());
let sender = self.clone();
Expand All @@ -195,6 +194,7 @@ impl<A: Addr, R: Runtime> Sender<A, R> {

// Release the handle to the async recv
inner.io_handle.take();
inner.msg_queue.pop_msg();

if retval < 0 {
// TODO: add PRI event if set SO_SELECT_ERR_QUEUE
Expand All @@ -207,12 +207,9 @@ impl<A: Addr, R: Runtime> Sender<A, R> {
}

// Need to handle normal case
inner.msg_queue.pop_msg();
sender.common.pollee().add_events(Events::OUT);
if !inner.msg_queue.is_empty() {
let msghdr_ptr = inner.msg_queue.first_msg_ptr();
debug_assert!(msghdr_ptr.is_some());
sender.do_send(&mut inner, msghdr_ptr.unwrap());
sender.do_send(&mut inner);
} else if inner.is_shutdown == ShutdownStatus::PreShutdown {
// The buffer is empty and the write side is shutdown by the user.
// We can safely shutdown host file here.
Expand All @@ -224,38 +221,12 @@ impl<A: Addr, R: Runtime> Sender<A, R> {
// Generate the async recv request
let io_uring = self.common.io_uring();
let host_fd = Fd(self.common.host_fd() as _);
let msghdr_ptr = inner.msg_queue.first_msg_ptr().unwrap();
let handle = unsafe { io_uring.sendmsg(host_fd, msghdr_ptr, 0, complete_fn) };
inner.io_handle.replace(handle);
}
}

fn new_send_req<A: Addr>(dmsg: &mut DataMsg, addr: &A) -> *const libc::msghdr {
let iovec = libc::iovec {
iov_base: dmsg.send_buf.as_ptr() as _,
iov_len: dmsg.send_buf.len(),
};

let (control, controllen) = match &dmsg.control {
Some(control) => (control.as_mut_ptr() as *mut c_void, control.len()),
None => (ptr::null_mut(), 0),
};

dmsg.req.iovec = iovec;

dmsg.req.msg.msg_iov = &raw mut dmsg.req.iovec as _;
dmsg.req.msg.msg_iovlen = 1;

let (c_addr_storage, c_addr_len) = addr.to_c_storage();

dmsg.req.addr = c_addr_storage;
dmsg.req.msg.msg_name = &raw mut dmsg.req.addr as _;
dmsg.req.msg.msg_namelen = c_addr_len as _;
dmsg.req.msg.msg_control = control;
dmsg.req.msg.msg_controllen = controllen;

&mut dmsg.req.msg
}

pub struct Inner {
io_handle: Option<IoHandle>,
error: Option<Errno>,
Expand Down Expand Up @@ -396,6 +367,33 @@ impl DataMsg {
fn len(&self) -> usize {
self.send_buf.len()
}

fn build<A: Addr>(&mut self, addr: &A) -> *const libc::msghdr {
let iovec = libc::iovec {
iov_base: self.send_buf.as_ptr() as _,
iov_len: self.send_buf.len(),
};

let (control, controllen) = match &self.control {
Some(control) => (control.as_mut_ptr() as *mut c_void, control.len()),
None => (ptr::null_mut(), 0),
};

self.req.iovec = iovec;

self.req.msg.msg_iov = &raw mut self.req.iovec as _;
self.req.msg.msg_iovlen = 1;

let (c_addr_storage, c_addr_len) = addr.to_c_storage();

self.req.addr = c_addr_storage;
self.req.msg.msg_name = &raw mut self.req.addr as _;
self.req.msg.msg_namelen = c_addr_len as _;
self.req.msg.msg_control = control;
self.req.msg.msg_controllen = controllen;

&mut self.req.msg
}
}

#[derive(Debug, PartialEq)]
Expand Down

0 comments on commit 55d65f3

Please sign in to comment.