Skip to content

Commit

Permalink
Refactor communication
Browse files Browse the repository at this point in the history
  • Loading branch information
Kogepan229 committed Apr 30, 2024
1 parent bf91f36 commit cd7ef71
Show file tree
Hide file tree
Showing 3 changed files with 69 additions and 69 deletions.
2 changes: 1 addition & 1 deletion src/cpu.rs
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ impl Cpu {
}

if one_sec_count >= CPU_CLOCK {
socket::send_one_sec_message();
socket::send_one_sec_message().await;
one_sec_count -= CPU_CLOCK;
}
}
Expand Down
6 changes: 3 additions & 3 deletions src/cpu/addressing_mode/abs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,19 @@ use anyhow::{Context as _, Result};
impl Cpu {
async fn send_port_value8(&mut self, addr: u32, value: u8) {
if addr >= 0xffffd0 && addr <= 0xffffda {
send_addr_value_u8(addr, value);
send_addr_value_u8(addr, value).await;
}
}

async fn send_port_value16(&mut self, addr: u32, value: u16) {
if addr >= 0xffffd0 && addr <= 0xffffda {
send_addr_value_u16(addr, value);
send_addr_value_u16(addr, value).await;
}
}

async fn send_port_value32(&mut self, addr: u32, value: u32) {
if addr >= 0xffffd0 && addr <= 0xffffda {
send_addr_value_u32(addr, value);
send_addr_value_u32(addr, value).await;
}
}

Expand Down
130 changes: 65 additions & 65 deletions src/socket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,53 +6,73 @@ use tokio::{
tcp::{OwnedReadHalf, OwnedWriteHalf},
TcpListener,
},
sync::mpsc::{self, Receiver, Sender},
};

static READER: OnceLock<OwnedReadHalf> = OnceLock::new();
static WRITER: OnceLock<OwnedWriteHalf> = OnceLock::new();
static READ_BUF: OnceLock<Mutex<Vec<String>>> = OnceLock::new();
static MESSAGE_SENDER: OnceLock<Sender<String>> = OnceLock::new();

pub async fn listen(addr: String) -> Result<()> {
let listener = TcpListener::bind(addr).await.unwrap();
let (stream, _) = listener.accept().await.unwrap();
let (reader, writer) = stream.into_split();
READER.set(reader).unwrap();
WRITER.set(writer).unwrap();
let (socket_reader, socket_writer) = stream.into_split();

READ_BUF.set(Mutex::new(Vec::<String>::new())).unwrap();
receive();
start_receive_worker(socket_reader);

let (tx, rx) = mpsc::channel(32);
MESSAGE_SENDER.set(tx).unwrap();
start_send_workder(rx, socket_writer);
Ok(())
}

fn receive() {
match READER.get() {
Some(s) => {
tokio::spawn(async move {
loop {
let mut msg = vec![0; 1024];
s.readable().await.unwrap();
match s.try_read(&mut msg) {
Ok(n) => {
msg.truncate(n);
match READ_BUF.get() {
Some(b) => {
b.lock().unwrap().push(String::from_utf8(msg).unwrap());
}
None => return,
}
}
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
continue;
}
Err(e) => {
println!("{}", e.to_string());
return;
fn start_receive_worker(socket_reader: OwnedReadHalf) {
tokio::spawn(async move {
loop {
let mut msg = vec![0; 1024];
socket_reader.readable().await.unwrap();
match socket_reader.try_read(&mut msg) {
Ok(n) => {
msg.truncate(n);
match READ_BUF.get() {
Some(b) => {
b.lock().unwrap().push(String::from_utf8(msg).unwrap());
}
None => return,
}
}
});
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
continue;
}
Err(e) => {
println!("{}", e.to_string());
return;
}
}
}
None => return,
}
});
}

fn start_send_workder(mut rx: Receiver<String>, socket_writer: OwnedWriteHalf) {
tokio::spawn(async move {
while let Some(message) = rx.recv().await {
let _msg = message + "\n";
let str_bytes = _msg.as_bytes();
let mut written_bytes = 0;
loop {
socket_writer.writable().await.unwrap();
match socket_writer.try_write(str_bytes) {
Ok(n) => {
written_bytes += n;
}
Err(_) => {}
}
if written_bytes == str_bytes.len() {
break;
}
}
}
});
}

pub fn get_received_msgs() -> Option<Vec<String>> {
Expand All @@ -67,46 +87,26 @@ pub fn get_received_msgs() -> Option<Vec<String>> {
}
}

pub fn send_message(msg: String) {
match WRITER.get() {
Some(v) => {
tokio::spawn(async move {
let _msg = msg + "\n";
let str_bytes = _msg.as_bytes();
let mut written_bytes = 0;
loop {
v.writable().await.unwrap();
match v.try_write(str_bytes) {
Ok(n) => {
written_bytes += n;
}
Err(_) => {}
}
if written_bytes == str_bytes.len() {
break;
}
}
});
}
None => return,
}
}

pub fn send_one_sec_message() {
send_message("1sec".to_owned());
pub async fn send_one_sec_message() {
MESSAGE_SENDER
.get()
.unwrap()
.send("1sec".to_owned())
.await
.unwrap();
}

pub fn send_addr_value_u8(addr: u32, value: u8) {
pub async fn send_addr_value_u8(addr: u32, value: u8) {
let str = format!("u8:{}:{}", addr, value);
send_message(str);
MESSAGE_SENDER.get().unwrap().send(str).await.unwrap();
}

pub fn send_addr_value_u16(addr: u32, value: u16) {
pub async fn send_addr_value_u16(addr: u32, value: u16) {
let str = format!("u16:{}:{}", addr, value);
send_message(str);
MESSAGE_SENDER.get().unwrap().send(str).await.unwrap();
}

pub fn send_addr_value_u32(addr: u32, value: u32) {
pub async fn send_addr_value_u32(addr: u32, value: u32) {
let str = format!("u32:{}:{}", addr, value);
send_message(str);
MESSAGE_SENDER.get().unwrap().send(str).await.unwrap();
}

0 comments on commit cd7ef71

Please sign in to comment.