diff --git a/src/message_bus.rs b/src/message_bus.rs index fb0076c..96b6d25 100644 --- a/src/message_bus.rs +++ b/src/message_bus.rs @@ -16,11 +16,17 @@ pub struct MessageBus { } impl MessageBus { - pub async fn run(&self) { + pub fn new(config: Config) -> Self { + Self { + config: Arc::new(config), + connections: Arc::new(Mutex::new(Vec::new())), + event_emitter: Arc::new(Mutex::new(EventEmitter::new())), + } + } + + pub async fn run(&self) -> Result<(), Box> { let addr = format!("{}:{}", self.config.host, self.config.port); - let listener = TcpListener::bind(&addr) - .await - .expect("Failed to bind to address"); + let listener = TcpListener::bind(&addr).await?; println!( "MessageBus listening on {} (route: {})", addr, self.config.route @@ -31,7 +37,9 @@ impl MessageBus { tokio::spawn(async move { match accept_async(stream).await { Ok(ws_stream) => { - bus_clone.handle_connection(ws_stream).await; + if let Err(e) = bus_clone.handle_connection(ws_stream).await { + eprintln!("Error handling connection: {}", e); + } } Err(e) => { eprintln!("Error during WebSocket handshake: {}", e); @@ -39,29 +47,14 @@ impl MessageBus { } }); } - } - - pub async fn run(&self) { - let addr = format!("{}:{}", self.config.host, self.config.port); - let listener = TcpListener::bind(&addr).await.unwrap(); - println!( - "MessageBus listening on {} (route: {})", - addr, self.config.route - ); - while let Ok((stream, _)) = listener.accept().await { - let ws_stream = accept_async(stream).await.unwrap(); - let bus_clone = self.clone(); - tokio::spawn(async move { - bus_clone.handle_connection(ws_stream).await; - }); - } + Ok(()) } async fn handle_connection( &self, ws_stream: tokio_tungstenite::WebSocketStream, - ) { + ) -> Result<(), Box> { let (tx, mut rx) = mpsc::unbounded_channel(); { let mut connections = self.connections.lock().unwrap(); @@ -102,14 +95,13 @@ impl MessageBus { }); while let Some(message) = rx.recv().await { - if let Err(e) = write.send(message).await { - eprintln!("Failed to send message: {}", e); - break; - } + write.send(message).await?; } // Clean up connections after the write loop ends let mut connections = self.connections.lock().unwrap(); connections.retain(|conn| !conn.is_closed()); + + Ok(()) } }