diff --git a/src/maker/api.rs b/src/maker/api.rs index 523bdf4f..3c185261 100644 --- a/src/maker/api.rs +++ b/src/maker/api.rs @@ -93,12 +93,7 @@ pub struct ConnectionState { pub struct ThreadPool { pub threads: Mutex>>, -} - -impl Default for ThreadPool { - fn default() -> Self { - Self::new() - } + pub port: u16, } impl Drop for ThreadPool { @@ -110,9 +105,10 @@ impl Drop for ThreadPool { } impl ThreadPool { - pub fn new() -> Self { + pub fn new(port: u16) -> Self { Self { threads: Mutex::new(Vec::new()), + port, } } @@ -126,9 +122,30 @@ impl ThreadPool { .threads .lock() .map_err(|_| MakerError::General("Failed to lock threads"))?; + + log::info!("Joining {} threads", threads.len()); + + let mut joined_count = 0; while let Some(thread) = threads.pop() { - thread.join().unwrap(); + let thread_name = thread.thread().name().unwrap().to_string(); + + match thread.join() { + Ok(_) => { + log::info!("[{}] Thread {} joined", self.port, thread_name); + joined_count += 1; + } + Err(e) => { + log::error!( + "[{}] Error {:?} while joining thread {}", + self.port, + e, + thread_name + ); + } + } } + + log::info!("Successfully joined {} threads", joined_count,); Ok(()) } } @@ -251,6 +268,8 @@ impl Maker { config.connection_type = connection_type; } + let thread_pool_port = config.port; + config.write_to_file(&data_dir.join("config.toml"))?; log::info!("Initializing wallet sync"); @@ -265,7 +284,7 @@ impl Maker { connection_state: Mutex::new(HashMap::new()), highest_fidelity_proof: RwLock::new(None), is_setup_complete: AtomicBool::new(false), - thread_pool: Arc::new(ThreadPool::new()), + thread_pool: Arc::new(ThreadPool::new(thread_pool_port)), }) } @@ -510,9 +529,14 @@ pub fn check_for_broadcasted_contracts(maker: Arc) -> Result<(), MakerErr "[{}] Spawning recovery thread after seeing contracts in mempool", maker.config.port ); - let handle = std::thread::spawn(move || { - recover_from_swap(maker_clone, outgoings, incomings).unwrap(); - }); + let handle = std::thread::Builder::new() + .name("Swap recovery thread".to_string()) + .spawn(move || { + if let Err(e) = recover_from_swap(maker_clone, outgoings, incomings) + { + log::error!("Failed to recover from swap due to: {:?}", e); + } + })?; maker.thread_pool.add_thread(handle); // Clear the state value here *connection_state = ConnectionState::default(); @@ -592,9 +616,13 @@ pub fn check_for_idle_states(maker: Arc) -> Result<(), MakerError> { "[{}] Spawning recovery thread after Taker dropped", maker.config.port ); - let handle = std::thread::spawn(move || { - recover_from_swap(maker_clone, outgoings, incomings).unwrap() - }); + let handle = std::thread::Builder::new() + .name("Swap Recovery Thread".to_string()) + .spawn(move || { + if let Err(e) = recover_from_swap(maker_clone, outgoings, incomings) { + log::error!("Failed to recover from swap due to: {:?}", e); + } + })?; maker.thread_pool.add_thread(handle); // Clear the state values here *state = ConnectionState::default(); diff --git a/src/maker/handlers.rs b/src/maker/handlers.rs index 647138d3..dc6b32ff 100644 --- a/src/maker/handlers.rs +++ b/src/maker/handlers.rs @@ -678,9 +678,13 @@ fn unexpected_recovery(maker: Arc) -> Result<(), MakerError> { } // Spawn a separate thread to wait for contract maturity and broadcasting timelocked. let maker_clone = maker.clone(); - let handle = std::thread::spawn(move || { - recover_from_swap(maker_clone, outgoings, incomings).unwrap() - }); + let handle = std::thread::Builder::new() + .name("Swap Recovery Thread".to_string()) + .spawn(move || { + if let Err(e) = recover_from_swap(maker_clone, outgoings, incomings) { + log::error!("Failed to recover from swap due to: {:?}", e); + } + })?; maker.thread_pool.add_thread(handle); } Ok(())