Skip to content

Commit

Permalink
Refactor the code structure and re-implement uring management
Browse files Browse the repository at this point in the history
  • Loading branch information
ClawSeven committed Dec 13, 2023
1 parent 5025bee commit f466504
Show file tree
Hide file tree
Showing 61 changed files with 175 additions and 192 deletions.
50 changes: 48 additions & 2 deletions src/libos/crates/io-uring-callback/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,8 @@ extern crate sgx_libc as libc;
#[cfg(feature = "sgx")]
extern crate sgx_tstd as std;

use std::io;
use core::sync::atomic::AtomicUsize;
use std::{io, collections::HashMap};
use std::sync::Arc;
cfg_if::cfg_if! {
if #[cfg(feature = "sgx")] {
Expand All @@ -121,10 +122,12 @@ cfg_if::cfg_if! {
}
}

use atomic::Ordering;
use io_uring::opcode;
use io_uring::squeue::Entry as SqEntry;
use io_uring::types;
use slab::Slab;
use spin::RwLock;
use std::os::unix::prelude::RawFd;

use crate::io_handle::IoToken;
Expand All @@ -146,6 +149,7 @@ pub struct IoUring {
ring: io_uring::IoUring,
token_table: Mutex<Slab<Arc<IoToken>>>,
sq_lock: Mutex<()>, // For submission queue synchronization
fd_map: RwLock<HashMap<usize, AtomicUsize>>, // (key: fd, value: op num)
}

impl Drop for IoUring {
Expand All @@ -171,10 +175,12 @@ impl IoUring {
pub(crate) fn new(ring: io_uring::IoUring) -> Self {
let token_table = Mutex::new(Slab::new());
let sq_lock = Mutex::new(());
let fd_map = RwLock::new(HashMap::new());
Self {
ring,
token_table,
sq_lock,
fd_map,
}
}

Expand All @@ -199,6 +205,7 @@ impl IoUring {
let entry = opcode::Accept::new(fd, addr, addrlen)
.flags(flags as i32)
.build();
self.op_fetch_add(fd.0 as usize, 1);
self.push_entry(entry, callback)
}

Expand All @@ -215,6 +222,7 @@ impl IoUring {
callback: impl FnOnce(i32) + Send + 'static,
) -> IoHandle {
let entry = opcode::Connect::new(fd, addr, addrlen).build();
self.op_fetch_add(fd.0 as usize, 1);
self.push_entry(entry, callback)
}

Expand All @@ -230,6 +238,7 @@ impl IoUring {
callback: impl FnOnce(i32) + Send + 'static,
) -> IoHandle {
let entry = opcode::PollAdd::new(fd, flags).build();
self.op_fetch_add(fd.0 as usize, 1);
self.push_entry(entry, callback)
}

Expand All @@ -251,6 +260,7 @@ impl IoUring {
.offset(offset)
.rw_flags(flags)
.build();
self.op_fetch_add(fd.0 as usize, 1);
self.push_entry(entry, callback)
}

Expand All @@ -272,6 +282,7 @@ impl IoUring {
.offset(offset)
.rw_flags(flags)
.build();
self.op_fetch_add(fd.0 as usize, 1);
self.push_entry(entry, callback)
}

Expand All @@ -293,6 +304,7 @@ impl IoUring {
.offset(offset)
.rw_flags(flags)
.build();
self.op_fetch_add(fd.0 as usize, 1);
self.push_entry(entry, callback)
}

Expand All @@ -314,6 +326,7 @@ impl IoUring {
.offset(offset)
.rw_flags(flags)
.build();
self.op_fetch_add(fd.0 as usize, 1);
self.push_entry(entry, callback)
}

Expand All @@ -330,7 +343,7 @@ impl IoUring {
callback: impl FnOnce(i32) + Send + 'static,
) -> IoHandle {
let entry = opcode::RecvMsg::new(fd, msg).flags(flags).build();
// entry = entry.set_ioprio_poll_first();
self.op_fetch_add(fd.0 as usize, 1);
self.push_entry(entry, callback)
}

Expand All @@ -347,6 +360,7 @@ impl IoUring {
callback: impl FnOnce(i32) + Send + 'static,
) -> IoHandle {
let entry = opcode::SendMsg::new(fd, msg).flags(flags).build();
self.op_fetch_add(fd.0 as usize, 1);
self.push_entry(entry, callback)
}

Expand All @@ -368,6 +382,7 @@ impl IoUring {
} else {
opcode::Fsync::new(fd).build()
};
self.op_fetch_add(fd.0 as usize, 1);
self.push_entry(entry, callback)
}

Expand Down Expand Up @@ -505,6 +520,37 @@ impl IoUring {
io_handle
}

fn op_fetch_add(&self, fd: usize, val: usize) -> usize {
let fd_map = self.fd_map.upgradeable_read();
match fd_map.get(&fd) {
Some(ops_num) => {
ops_num.fetch_add(val, Ordering::Relaxed)
},
None => {
let mut fd_map = fd_map.upgrade();
fd_map.insert(fd, AtomicUsize::new(val));
0
},
}
}

pub fn dissattach_fd(&self, fd: usize) -> Option<AtomicUsize> {
let mut fd_map = self.fd_map.write();
fd_map.remove(&fd)
}

// Using the sum of the number of attached file descriptors (raw fd) as a measure of task load.
pub fn task_load(&self) -> usize {
let fd_map = self.fd_map.read();
fd_map.values().fold(0, |acc, val| acc + val.load(Ordering::Relaxed))
}

// The number of registered fd in this io_uring instance
pub fn registered_fds(&self) -> usize {
let fd_map = self.fd_map.read();
fd_map.len()
}

/// Cancel an ongoing I/O request.
///
/// # safety
Expand Down
8 changes: 4 additions & 4 deletions src/libos/src/entry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,10 +108,10 @@ pub extern "C" fn occlum_ecall_init(

HAS_INIT.store(true, Ordering::Release);

// Create four IO-Uring instances and spawn polling threads for each of them.
// Assign IO-Uring references during socket initialization.
let multiton = &crate::io_uring::MULTITON;
multiton.poll_completions();
// // Create four IO-Uring instances and spawn polling threads for each of them.
// // Assign IO-Uring references during socket initialization.
// let multiton = &crate::io_uring::MULTITON;
// multiton.poll_completions();

// Init boot up time stamp here.
time::up_time::init();
Expand Down
46 changes: 18 additions & 28 deletions src/libos/src/io_uring.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,42 +45,29 @@ impl UringState {
}

pub struct UringSet {
// urings: Mutex<HashMap<KeyableArc<IoUring>, UringState>>,
urings: Mutex<Vec<(Arc<IoUring>, UringState)>>,
count: AtomicUsize,
urings: Mutex<HashMap<KeyableArc<IoUring>, UringState>>,
}

impl UringSet {
pub fn new() -> Self {
// let urings = Mutex::new(HashMap::new());
let urings = Mutex::new(Vec::with_capacity(URING_NUM_LIMIT));
let urings = Mutex::new(HashMap::new());

let mut guard = urings.lock();
for _ in 0..URING_NUM_LIMIT {
let uring: Arc<IoUring> = Arc::new(
let uring: KeyableArc<IoUring> = Arc::new(
Builder::new()
.setup_sqpoll(500 /* ms */)
.build(256)
.unwrap(),
)
.into();
guard.push((uring, UringState::default()));

// let uring: KeyableArc<IoUring> = Arc::new(
// Builder::new()
// .setup_sqpoll(500 /* ms */)
// .build(256)
// .unwrap(),
// )
// .into();
// guard.insert(uring, UringState::default());
guard.insert(uring, UringState::default());
}

drop(guard);

Self {
urings,
count: AtomicUsize::new(0),
}
}

Expand All @@ -94,15 +81,17 @@ impl UringSet {

pub fn get_uring(&self) -> Arc<IoUring> {
let mut map = self.urings.lock();
let (uring, state) = map
.iter_mut()
.min_by_key(|(_, &mut state)| state.registered_num)
.unwrap();

let idx = self.count.fetch_add(1, Ordering::Relaxed) % URING_NUM_LIMIT;
let min_registered_num = state.registered_num;

let (uring, state) = map.get_mut(idx).unwrap();
let (uring, state) = map.iter_mut().filter(|(_, state)| {
state.registered_num == min_registered_num
}).min_by_key(|(uring, _)| uring.task_load()).unwrap();

// let (uring, state) = map
// .iter_mut()
// .min_by_key(|(_, &mut state)| state.registered_num)
// .unwrap();
// Update states
state.register_one_socket();
if !state.is_enable_poll {
Expand All @@ -112,11 +101,12 @@ impl UringSet {
uring.clone().into()
}

pub fn free_uring(&self, uring: Arc<IoUring>) {
// let uring: KeyableArc<IoUring> = uring.into();
// let mut map = self.urings.lock();
// let mut state = map.get_mut(&uring).unwrap();
pub fn disattach_uring(&self, fd: usize, uring: Arc<IoUring>) {
let uring: KeyableArc<IoUring> = uring.into();
let mut map = self.urings.lock();
let mut state = map.get_mut(&uring).unwrap();

// state.unregister_one_socket();
uring.dissattach_fd(fd);
state.unregister_one_socket();
}
}
1 change: 0 additions & 1 deletion src/libos/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,6 @@ mod signal;
mod syscall;
mod time;
mod untrusted;
mod uring_socket;
mod util;
mod vm;

Expand Down
4 changes: 2 additions & 2 deletions src/libos/src/net/io_multiplexing/poll_new/event_monitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@ use std::time::Duration;

use crate::events::{Observer, Waiter, WaiterQueueObserver};
use crate::fs::{AtomicIoEvents, IoEvents};
use crate::net::socket_file::UringSocketType;
use crate::net::uring_socket::util::poller::Poller;
use crate::net::uring_socket::UringSocketType;
use crate::prelude::*;
use crate::time::{timespec_t, TIMERSLACK};
use crate::uring_socket::util::poller::Poller;

/// Monitor events that happen on a set of interesting files.
///
Expand Down
4 changes: 2 additions & 2 deletions src/libos/src/net/io_multiplexing/poll_new/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@ use std::cell::Cell;
use std::sync::Weak;
use std::time::Duration;

use crate::net::uring_socket::util::poller::Poller;
use crate::prelude::*;
use crate::uring_socket::util::poller::Poller;
use crate::{fs::IoEvents, net::socket_file::UringSocketType};
use crate::{fs::IoEvents, net::uring_socket::UringSocketType};

use self::event_monitor::{EventMonitor, EventMonitorBuilder};

Expand Down
7 changes: 1 addition & 6 deletions src/libos/src/net/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,9 @@ pub use self::ocall_socket::{
};
pub use self::syscalls::*;

pub use self::addr::*;

mod addr;
mod io_multiplexing;
mod ocall_socket;
mod socket_file;
mod socket_file_impl;
mod sockopt;
mod syscalls;
pub mod uring_socket;

pub use self::syscalls::*;
Loading

0 comments on commit f466504

Please sign in to comment.