Skip to content

Commit

Permalink
compilable code
Browse files Browse the repository at this point in the history
  • Loading branch information
mikejgray committed Aug 10, 2024
1 parent 4424d10 commit 92f5d84
Showing 1 changed file with 18 additions and 26 deletions.
44 changes: 18 additions & 26 deletions src/message_bus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,17 @@ pub struct MessageBus {
}

impl MessageBus {
pub async fn run(&self) {
pub fn new(config: Config) -> Self {
Self {
config: Arc::new(config),
connections: Arc::new(Mutex::new(Vec::new())),
event_emitter: Arc::new(Mutex::new(EventEmitter::new())),
}
}

pub async fn run(&self) -> Result<(), Box<dyn std::error::Error>> {
let addr = format!("{}:{}", self.config.host, self.config.port);
let listener = TcpListener::bind(&addr)
.await
.expect("Failed to bind to address");
let listener = TcpListener::bind(&addr).await?;
println!(
"MessageBus listening on {} (route: {})",
addr, self.config.route
Expand All @@ -31,37 +37,24 @@ impl MessageBus {
tokio::spawn(async move {
match accept_async(stream).await {
Ok(ws_stream) => {
bus_clone.handle_connection(ws_stream).await;
if let Err(e) = bus_clone.handle_connection(ws_stream).await {
eprintln!("Error handling connection: {}", e);
}
}
Err(e) => {
eprintln!("Error during WebSocket handshake: {}", e);
}
}
});
}
}

pub async fn run(&self) {
let addr = format!("{}:{}", self.config.host, self.config.port);
let listener = TcpListener::bind(&addr).await.unwrap();
println!(
"MessageBus listening on {} (route: {})",
addr, self.config.route
);

while let Ok((stream, _)) = listener.accept().await {
let ws_stream = accept_async(stream).await.unwrap();
let bus_clone = self.clone();
tokio::spawn(async move {
bus_clone.handle_connection(ws_stream).await;
});
}
Ok(())
}

async fn handle_connection(
&self,
ws_stream: tokio_tungstenite::WebSocketStream<tokio::net::TcpStream>,
) {
) -> Result<(), Box<dyn std::error::Error>> {
let (tx, mut rx) = mpsc::unbounded_channel();
{
let mut connections = self.connections.lock().unwrap();
Expand Down Expand Up @@ -102,14 +95,13 @@ impl MessageBus {
});

while let Some(message) = rx.recv().await {
if let Err(e) = write.send(message).await {
eprintln!("Failed to send message: {}", e);
break;
}
write.send(message).await?;
}

// Clean up connections after the write loop ends
let mut connections = self.connections.lock().unwrap();
connections.retain(|conn| !conn.is_closed());

Ok(())
}
}

0 comments on commit 92f5d84

Please sign in to comment.