Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement gracefull shutdown for DNS , Taker, Maker #361

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ libtor = { version = "47.13.0", optional = true, features = ["vendored-openssl"]
mitosis = { version = "0.1.1", optional = true }
log4rs = "1.3.0"
openssl-sys = { version = "0.9.68", optional = true }
ctrlc = { version = "3.4.0", features = ["termination"] }

#Empty default feature set, (helpful to generalise in github actions)
[features]
Expand Down
10 changes: 8 additions & 2 deletions src/bin/directoryd.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
use bitcoind::bitcoincore_rpc::Auth;
use clap::Parser;
use coinswap::{
market::directory::{start_directory_server, DirectoryServer, DirectoryServerError},
market::directory::{
shutdown_server, start_dns_server, DirectoryServer, DirectoryServerError,
},
utill::{parse_proxy_auth, setup_directory_logger, ConnectionType},
wallet::RPCConfig,
};
Expand Down Expand Up @@ -70,7 +72,11 @@ fn main() -> Result<(), DirectoryServerError> {
}
let directory = Arc::new(DirectoryServer::new(args.data_directory, Some(conn_type))?);

start_directory_server(directory, Some(rpc_config))?;
// TODO: Pass this Error via ServerError?
if let Err(e) = start_dns_server(directory.clone(), Some(rpc_config)) {
log::error!("Error: {:?}", e);
shutdown_server(directory)?;
}

Ok(())
}
10 changes: 8 additions & 2 deletions src/bin/makerd.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
use bitcoind::bitcoincore_rpc::Auth;
use clap::Parser;
use coinswap::{
maker::{start_maker_server, Maker, MakerBehavior, MakerError},
maker::{shutdown_server, start_maker_server, Maker, MakerBehavior, MakerError},
utill::{parse_proxy_auth, setup_maker_logger, ConnectionType},
wallet::RPCConfig,
};

// use coinswap::maker::
use std::{path::PathBuf, str::FromStr, sync::Arc};

#[cfg(feature = "tor")]
Expand Down Expand Up @@ -87,7 +89,11 @@ fn main() -> Result<(), MakerError> {
MakerBehavior::Normal,
)?);

start_maker_server(maker)?;
// TODO: Pass this Error via ServerError?
if let Err(e) = start_maker_server(maker.clone()) {
log::error!("Error: {:?}", e);
shutdown_server(maker)?;
}

Ok(())
}
78 changes: 14 additions & 64 deletions src/maker/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use crate::{
messages::{FidelityProof, ReqContractSigsForSender},
Hash160,
},
utill::{get_maker_dir, redeemscript_to_scriptpubkey, ConnectionType},
utill::{get_maker_dir, redeemscript_to_scriptpubkey, ConnectionType, ThreadPool},
wallet::{RPCConfig, SwapCoin, WalletSwapCoin},
};
use bitcoin::{
Expand Down Expand Up @@ -88,65 +88,6 @@ pub struct ConnectionState {
pub pending_funding_txes: Vec<Transaction>,
}

pub struct ThreadPool {
pub threads: Mutex<Vec<JoinHandle<()>>>,
pub port: u16,
}

impl Drop for ThreadPool {
fn drop(&mut self) {
if let Err(e) = self.join_all_threads() {
log::error!("Error joining threads in via drop: {:?}", e);
}
}
}

impl ThreadPool {
pub fn new(port: u16) -> Self {
Self {
threads: Mutex::new(Vec::new()),
port,
}
}

pub fn add_thread(&self, handle: JoinHandle<()>) {
let mut threads = self.threads.lock().unwrap();
threads.push(handle);
}
#[inline]
fn join_all_threads(&self) -> Result<(), MakerError> {
let mut threads = self
.threads
.lock()
.map_err(|_| MakerError::General("Failed to lock threads"))?;

log::info!("Joining {} threads", threads.len());

let mut joined_count = 0;
while let Some(thread) = threads.pop() {
let thread_name = thread.thread().name().unwrap().to_string();

match thread.join() {
Ok(_) => {
log::info!("[{}] Thread {} joined", self.port, thread_name);
joined_count += 1;
}
Err(e) => {
log::error!(
"[{}] Error {:?} while joining thread {}",
self.port,
e,
thread_name
);
}
}
}

log::info!("Successfully joined {} threads", joined_count,);
Ok(())
}
}

/// Represents the maker in the swap protocol.
pub struct Maker {
/// Defines special maker behavior, only applicable for testing
Expand All @@ -166,7 +107,7 @@ pub struct Maker {
/// Path for the data directory.
pub data_dir: PathBuf,
/// Thread pool for managing all spawned threads
pub thread_pool: Arc<ThreadPool>,
pub thread_pool: Arc<Mutex<ThreadPool>>,
}

#[allow(clippy::too_many_arguments)]
Expand Down Expand Up @@ -254,7 +195,16 @@ impl Maker {
highest_fidelity_proof: RwLock::new(None),
is_setup_complete: AtomicBool::new(false),
data_dir,
thread_pool: Arc::new(ThreadPool::new(port)),
thread_pool: {
#[cfg(feature = "integration-test")]
{
Arc::new(Mutex::new(ThreadPool::new(port)))
}
#[cfg(not(feature = "integration-test"))]
{
Arc::new(Mutex::new(ThreadPool::new()))
}
},
})
}

Expand Down Expand Up @@ -511,7 +461,7 @@ pub fn check_for_broadcasted_contracts(maker: Arc<Maker>) -> Result<(), MakerErr
log::error!("Failed to recover from swap due to: {:?}", e);
}
})?;
maker.thread_pool.add_thread(handle);
maker.thread_pool.lock()?.add_thread(handle);
// Clear the state value here
*connection_state = ConnectionState::default();
break;
Expand Down Expand Up @@ -597,7 +547,7 @@ pub fn check_for_idle_states(maker: Arc<Maker>) -> Result<(), MakerError> {
log::error!("Failed to recover from swap due to: {:?}", e);
}
})?;
maker.thread_pool.add_thread(handle);
maker.thread_pool.lock()?.add_thread(handle);
// Clear the state values here
*state = ConnectionState::default();
break;
Expand Down
2 changes: 1 addition & 1 deletion src/maker/handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -685,7 +685,7 @@ fn unexpected_recovery(maker: Arc<Maker>) -> Result<(), MakerError> {
log::error!("Failed to recover from swap due to: {:?}", e);
}
})?;
maker.thread_pool.add_thread(handle);
maker.thread_pool.lock()?.add_thread(handle);
}
Ok(())
}
2 changes: 1 addition & 1 deletion src/maker/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,4 @@ mod server;
pub use api::{Maker, MakerBehavior};
pub use error::MakerError;
pub use rpc::{RpcMsgReq, RpcMsgResp};
pub use server::start_maker_server;
pub use server::{shutdown_server, start_maker_server};
Loading