From aa450b65725f45c510585168e3c5fdb2a60e6524 Mon Sep 17 00:00:00 2001 From: claddy <0xcladdy@gmail.com> Date: Mon, 16 Dec 2024 23:11:12 +0530 Subject: [PATCH 1/3] Name recovery threads --- src/maker/api.rs | 26 +++++++++++++++++++------- src/maker/handlers.rs | 6 +++--- 2 files changed, 22 insertions(+), 10 deletions(-) diff --git a/src/maker/api.rs b/src/maker/api.rs index 523bdf4f..203600e7 100644 --- a/src/maker/api.rs +++ b/src/maker/api.rs @@ -510,9 +510,15 @@ pub fn check_for_broadcasted_contracts(maker: Arc) -> Result<(), MakerErr "[{}] Spawning recovery thread after seeing contracts in mempool", maker.config.port ); - let handle = std::thread::spawn(move || { - recover_from_swap(maker_clone, outgoings, incomings).unwrap(); - }); + let handle = std::thread::Builder::new() + .name("recovery: saw contracts in mempool".to_string()) + .spawn(move || { + if let Err(e) = + recover_from_swap(maker_clone.clone(), outgoings, incomings) + { + log::error!("Recovery thread failed with error: {:?}", e); + } + })?; maker.thread_pool.add_thread(handle); // Clear the state value here *connection_state = ConnectionState::default(); @@ -592,9 +598,15 @@ pub fn check_for_idle_states(maker: Arc) -> Result<(), MakerError> { "[{}] Spawning recovery thread after Taker dropped", maker.config.port ); - let handle = std::thread::spawn(move || { - recover_from_swap(maker_clone, outgoings, incomings).unwrap() - }); + let handle = std::thread::Builder::new() + .name("recovery: taker dropped".to_string()) + .spawn(move || { + if let Err(e) = + recover_from_swap(maker_clone.clone(), outgoings, incomings) + { + log::error!("Recovery thread failed with error: {:?}", e); + } + })?; maker.thread_pool.add_thread(handle); // Clear the state values here *state = ConnectionState::default(); @@ -775,4 +787,4 @@ pub fn recover_from_swap( }; std::thread::sleep(block_lookup_interval); } -} +} \ No newline at end of file diff --git a/src/maker/handlers.rs b/src/maker/handlers.rs index 647138d3..2ffa88da 100644 --- a/src/maker/handlers.rs +++ b/src/maker/handlers.rs @@ -678,9 +678,9 @@ fn unexpected_recovery(maker: Arc) -> Result<(), MakerError> { } // Spawn a separate thread to wait for contract maturity and broadcasting timelocked. let maker_clone = maker.clone(); - let handle = std::thread::spawn(move || { - recover_from_swap(maker_clone, outgoings, incomings).unwrap() - }); + let handle = std::thread::Builder::new() + .name("recovery: wait for contract maturity & broadcasting timelocked".to_string()) + .spawn(move || recover_from_swap(maker_clone, outgoings, incomings).unwrap())?; maker.thread_pool.add_thread(handle); } Ok(()) From e1c052627ec3ffc2d83a5d81f159b21608910ed9 Mon Sep 17 00:00:00 2001 From: claddy <0xcladdy@gmail.com> Date: Mon, 16 Dec 2024 23:21:25 +0530 Subject: [PATCH 2/3] Add logs when 'join_all_threads' is used --- src/maker/api.rs | 25 +++++++++++++++++++++++-- 1 file changed, 23 insertions(+), 2 deletions(-) diff --git a/src/maker/api.rs b/src/maker/api.rs index 203600e7..e3abe637 100644 --- a/src/maker/api.rs +++ b/src/maker/api.rs @@ -126,9 +126,30 @@ impl ThreadPool { .threads .lock() .map_err(|_| MakerError::General("Failed to lock threads"))?; + + let thread_count = threads.len(); + log::info!("Joining {} threads", thread_count); + + let mut joined_count = 0; while let Some(thread) = threads.pop() { - thread.join().unwrap(); + let thread_name = thread.thread().name().unwrap().to_string(); + + match thread.join() { + Ok(_) => { + log::info!("Thread {} terminated successfully", thread_name); + joined_count += 1; + } + Err(e) => { + log::error!("Thread {} terminated due to error {:?}", thread_name, e); + } + } } + + log::info!( + "Successfully joined {} out of {} threads", + joined_count, + thread_count + ); Ok(()) } } @@ -787,4 +808,4 @@ pub fn recover_from_swap( }; std::thread::sleep(block_lookup_interval); } -} \ No newline at end of file +} From 09718c624363a32941fe4d139e7c6c1b3b541507 Mon Sep 17 00:00:00 2001 From: claddy <0xcladdy@gmail.com> Date: Tue, 17 Dec 2024 22:45:52 +0530 Subject: [PATCH 3/3] Address review; minor nits --- src/maker/api.rs | 47 +++++++++++++++++++------------------------ src/maker/handlers.rs | 8 ++++++-- 2 files changed, 27 insertions(+), 28 deletions(-) diff --git a/src/maker/api.rs b/src/maker/api.rs index e3abe637..3c185261 100644 --- a/src/maker/api.rs +++ b/src/maker/api.rs @@ -93,12 +93,7 @@ pub struct ConnectionState { pub struct ThreadPool { pub threads: Mutex>>, -} - -impl Default for ThreadPool { - fn default() -> Self { - Self::new() - } + pub port: u16, } impl Drop for ThreadPool { @@ -110,9 +105,10 @@ impl Drop for ThreadPool { } impl ThreadPool { - pub fn new() -> Self { + pub fn new(port: u16) -> Self { Self { threads: Mutex::new(Vec::new()), + port, } } @@ -127,8 +123,7 @@ impl ThreadPool { .lock() .map_err(|_| MakerError::General("Failed to lock threads"))?; - let thread_count = threads.len(); - log::info!("Joining {} threads", thread_count); + log::info!("Joining {} threads", threads.len()); let mut joined_count = 0; while let Some(thread) = threads.pop() { @@ -136,20 +131,21 @@ impl ThreadPool { match thread.join() { Ok(_) => { - log::info!("Thread {} terminated successfully", thread_name); + log::info!("[{}] Thread {} joined", self.port, thread_name); joined_count += 1; } Err(e) => { - log::error!("Thread {} terminated due to error {:?}", thread_name, e); + log::error!( + "[{}] Error {:?} while joining thread {}", + self.port, + e, + thread_name + ); } } } - log::info!( - "Successfully joined {} out of {} threads", - joined_count, - thread_count - ); + log::info!("Successfully joined {} threads", joined_count,); Ok(()) } } @@ -272,6 +268,8 @@ impl Maker { config.connection_type = connection_type; } + let thread_pool_port = config.port; + config.write_to_file(&data_dir.join("config.toml"))?; log::info!("Initializing wallet sync"); @@ -286,7 +284,7 @@ impl Maker { connection_state: Mutex::new(HashMap::new()), highest_fidelity_proof: RwLock::new(None), is_setup_complete: AtomicBool::new(false), - thread_pool: Arc::new(ThreadPool::new()), + thread_pool: Arc::new(ThreadPool::new(thread_pool_port)), }) } @@ -532,12 +530,11 @@ pub fn check_for_broadcasted_contracts(maker: Arc) -> Result<(), MakerErr maker.config.port ); let handle = std::thread::Builder::new() - .name("recovery: saw contracts in mempool".to_string()) + .name("Swap recovery thread".to_string()) .spawn(move || { - if let Err(e) = - recover_from_swap(maker_clone.clone(), outgoings, incomings) + if let Err(e) = recover_from_swap(maker_clone, outgoings, incomings) { - log::error!("Recovery thread failed with error: {:?}", e); + log::error!("Failed to recover from swap due to: {:?}", e); } })?; maker.thread_pool.add_thread(handle); @@ -620,12 +617,10 @@ pub fn check_for_idle_states(maker: Arc) -> Result<(), MakerError> { maker.config.port ); let handle = std::thread::Builder::new() - .name("recovery: taker dropped".to_string()) + .name("Swap Recovery Thread".to_string()) .spawn(move || { - if let Err(e) = - recover_from_swap(maker_clone.clone(), outgoings, incomings) - { - log::error!("Recovery thread failed with error: {:?}", e); + if let Err(e) = recover_from_swap(maker_clone, outgoings, incomings) { + log::error!("Failed to recover from swap due to: {:?}", e); } })?; maker.thread_pool.add_thread(handle); diff --git a/src/maker/handlers.rs b/src/maker/handlers.rs index 2ffa88da..dc6b32ff 100644 --- a/src/maker/handlers.rs +++ b/src/maker/handlers.rs @@ -679,8 +679,12 @@ fn unexpected_recovery(maker: Arc) -> Result<(), MakerError> { // Spawn a separate thread to wait for contract maturity and broadcasting timelocked. let maker_clone = maker.clone(); let handle = std::thread::Builder::new() - .name("recovery: wait for contract maturity & broadcasting timelocked".to_string()) - .spawn(move || recover_from_swap(maker_clone, outgoings, incomings).unwrap())?; + .name("Swap Recovery Thread".to_string()) + .spawn(move || { + if let Err(e) = recover_from_swap(maker_clone, outgoings, incomings) { + log::error!("Failed to recover from swap due to: {:?}", e); + } + })?; maker.thread_pool.add_thread(handle); } Ok(())