From 014af60aa1e70ac030cce61ddc5f012d504d8233 Mon Sep 17 00:00:00 2001 From: claddy <0xcladdy@gmail.com> Date: Wed, 11 Dec 2024 11:14:21 +0530 Subject: [PATCH] Add threadpool to join recovery threads - implement drop trait - replace local threadpools from maker server - restructure error handling in server --- src/maker/api.rs | 90 ++++++++++++++++++++++++++++++++----------- src/maker/handlers.rs | 5 ++- src/maker/server.rs | 61 +++++++++++++---------------- 3 files changed, 98 insertions(+), 58 deletions(-) diff --git a/src/maker/api.rs b/src/maker/api.rs index d634f550..523bdf4f 100644 --- a/src/maker/api.rs +++ b/src/maker/api.rs @@ -6,25 +6,6 @@ //! contract broadcasts and handle idle Taker connections. Additionally, it handles recovery by broadcasting //! contract transactions and claiming funds after an unsuccessful swap event. -use std::{ - collections::HashMap, - net::IpAddr, - path::PathBuf, - sync::{ - atomic::{AtomicBool, Ordering::Relaxed}, - Arc, Mutex, RwLock, - }, - time::{Duration, Instant}, -}; - -use bip39::Mnemonic; -use bitcoin::{ - ecdsa::Signature, - secp256k1::{self, Secp256k1}, - OutPoint, PublicKey, ScriptBuf, Transaction, -}; -use bitcoind::bitcoincore_rpc::RpcApi; - use crate::{ protocol::{ contract::check_hashvalues_are_equal, @@ -36,6 +17,24 @@ use crate::{ }, wallet::{RPCConfig, SwapCoin, WalletSwapCoin}, }; +use bip39::Mnemonic; +use bitcoin::{ + ecdsa::Signature, + secp256k1::{self, Secp256k1}, + OutPoint, PublicKey, ScriptBuf, Transaction, +}; +use bitcoind::bitcoincore_rpc::RpcApi; +use std::{ + collections::HashMap, + net::IpAddr, + path::PathBuf, + sync::{ + atomic::{AtomicBool, Ordering::Relaxed}, + Arc, Mutex, RwLock, + }, + thread::JoinHandle, + time::{Duration, Instant}, +}; use crate::{ protocol::{ @@ -92,6 +91,48 @@ pub struct ConnectionState { pub pending_funding_txes: Vec, } +pub struct ThreadPool { + pub threads: Mutex>>, +} + +impl Default for ThreadPool { + fn default() -> Self { + Self::new() + } +} + +impl Drop for ThreadPool { + fn drop(&mut self) { + if let Err(e) = self.join_all_threads() { + log::error!("Error joining threads in via drop: {:?}", e); + } + } +} + +impl ThreadPool { + pub fn new() -> Self { + Self { + threads: Mutex::new(Vec::new()), + } + } + + pub fn add_thread(&self, handle: JoinHandle<()>) { + let mut threads = self.threads.lock().unwrap(); + threads.push(handle); + } + #[inline] + fn join_all_threads(&self) -> Result<(), MakerError> { + let mut threads = self + .threads + .lock() + .map_err(|_| MakerError::General("Failed to lock threads"))?; + while let Some(thread) = threads.pop() { + thread.join().unwrap(); + } + Ok(()) + } +} + /// Represents the maker in the swap protocol. pub struct Maker { /// Defines special maker behavior, only applicable for testing @@ -108,6 +149,8 @@ pub struct Maker { pub highest_fidelity_proof: RwLock>, /// Is setup complete pub is_setup_complete: AtomicBool, + /// Thread pool for managing all spawned threads + pub thread_pool: Arc, } #[allow(clippy::too_many_arguments)] @@ -222,6 +265,7 @@ impl Maker { connection_state: Mutex::new(HashMap::new()), highest_fidelity_proof: RwLock::new(None), is_setup_complete: AtomicBool::new(false), + thread_pool: Arc::new(ThreadPool::new()), }) } @@ -466,9 +510,10 @@ pub fn check_for_broadcasted_contracts(maker: Arc) -> Result<(), MakerErr "[{}] Spawning recovery thread after seeing contracts in mempool", maker.config.port ); - std::thread::spawn(move || { + let handle = std::thread::spawn(move || { recover_from_swap(maker_clone, outgoings, incomings).unwrap(); }); + maker.thread_pool.add_thread(handle); // Clear the state value here *connection_state = ConnectionState::default(); break; @@ -547,9 +592,10 @@ pub fn check_for_idle_states(maker: Arc) -> Result<(), MakerError> { "[{}] Spawning recovery thread after Taker dropped", maker.config.port ); - std::thread::spawn(move || { - recover_from_swap(maker_clone, outgoings, incomings).unwrap(); + let handle = std::thread::spawn(move || { + recover_from_swap(maker_clone, outgoings, incomings).unwrap() }); + maker.thread_pool.add_thread(handle); // Clear the state values here *state = ConnectionState::default(); break; diff --git a/src/maker/handlers.rs b/src/maker/handlers.rs index e5c5e061..647138d3 100644 --- a/src/maker/handlers.rs +++ b/src/maker/handlers.rs @@ -678,9 +678,10 @@ fn unexpected_recovery(maker: Arc) -> Result<(), MakerError> { } // Spawn a separate thread to wait for contract maturity and broadcasting timelocked. let maker_clone = maker.clone(); - std::thread::spawn(move || { - recover_from_swap(maker_clone, outgoings, incomings).unwrap(); + let handle = std::thread::spawn(move || { + recover_from_swap(maker_clone, outgoings, incomings).unwrap() }); + maker.thread_pool.add_thread(handle); } Ok(()) } diff --git a/src/maker/server.rs b/src/maker/server.rs index e7d1f1f0..b247b18e 100644 --- a/src/maker/server.rs +++ b/src/maker/server.rs @@ -433,23 +433,22 @@ pub fn start_maker_server(maker: Arc) -> Result<(), MakerError> { // Global server Mutex, to switch on/off p2p network. let accepting_clients = Arc::new(AtomicBool::new(false)); - // Spawn Server threads. - // All thread handles are stored in the thread_pool, which are all joined at server shutdown. - let mut thread_pool = Vec::new(); - if !maker.shutdown.load(Relaxed) { // 1. Bitcoin Core Connection checker thread. // Ensures that Bitcoin Core connection is live. // If not, it will block p2p connections until Core works again. let maker_clone = maker.clone(); let acc_client_clone = accepting_clients.clone(); - let conn_check_thread: thread::JoinHandle> = thread::Builder::new() + let conn_check_thread = thread::Builder::new() .name("Bitcoin Core Connection Checker Thread".to_string()) .spawn(move || { log::info!("[{}] Spawning Bitcoin Core connection checker thread", port); - check_connection_with_core(maker_clone, acc_client_clone) + if let Err(e) = check_connection_with_core(maker_clone.clone(), acc_client_clone) { + log::error!("[{}] Bitcoin Core connection check failed: {:?}", port, e); + maker_clone.shutdown.store(true, Relaxed); + } })?; - thread_pool.push(conn_check_thread); + maker.thread_pool.add_thread(conn_check_thread); // 2. Idle Client connection checker thread. // This threads check idelness of peer in live swaps. @@ -462,9 +461,12 @@ pub fn start_maker_server(maker: Arc) -> Result<(), MakerError> { "[{}] Spawning Client connection status checker thread", port ); - check_for_idle_states(maker_clone.clone()) + if let Err(e) = check_for_idle_states(maker_clone.clone()) { + log::error!("Failed checking client's idle state {:?}", e); + maker_clone.shutdown.store(true, Relaxed); + } })?; - thread_pool.push(idle_conn_check_thread); + maker.thread_pool.add_thread(idle_conn_check_thread); // 3. Watchtower thread. // This thread checks for broadcasted contract transactions, which usually means violation of the protocol. @@ -475,9 +477,12 @@ pub fn start_maker_server(maker: Arc) -> Result<(), MakerError> { .name("Contract Watcher Thread".to_string()) .spawn(move || { log::info!("[{}] Spawning contract-watcher thread", port); - check_for_broadcasted_contracts(maker_clone.clone()) + if let Err(e) = check_for_broadcasted_contracts(maker_clone.clone()) { + maker_clone.shutdown.store(true, Relaxed); + log::error!("Failed checking broadcasted contracts {:?}", e); + } })?; - thread_pool.push(contract_watcher_thread); + maker.thread_pool.add_thread(contract_watcher_thread); // 4: The RPC server thread. // User for responding back to `maker-cli` apps. @@ -486,10 +491,16 @@ pub fn start_maker_server(maker: Arc) -> Result<(), MakerError> { .name("RPC Thread".to_string()) .spawn(move || { log::info!("[{}] Spawning RPC server thread", port); - start_rpc_server(maker_clone) + match start_rpc_server(maker_clone.clone()) { + Ok(_) => (), + Err(e) => { + log::error!("Failed starting rpc server {:?}", e); + maker_clone.shutdown.store(true, Relaxed); + } + } })?; - thread_pool.push(rpc_thread); + maker.thread_pool.add_thread(rpc_thread); sleep(Duration::from_secs(heart_beat_interval)); // wait for 1 beat, to complete spawns of all the threads. maker.is_setup_complete.store(true, Relaxed); @@ -516,18 +527,15 @@ pub fn start_maker_server(maker: Arc) -> Result<(), MakerError> { match listener.accept() { Ok((mut stream, client_addr)) => { log::info!("[{}] Spawning Client Handler thread", maker.config.port); - + let maker_for_handler = maker.clone(); let client_handler_thread = thread::Builder::new() .name("Client Handler Thread".to_string()) .spawn(move || { - if let Err(e) = handle_client(maker, &mut stream, client_addr) { + if let Err(e) = handle_client(maker_for_handler, &mut stream, client_addr) { log::error!("[{}] Error Handling client request {:?}", port, e); - Err(e) - } else { - Ok(()) } })?; - thread_pool.push(client_handler_thread); + maker.thread_pool.add_thread(client_handler_thread); } Err(e) => { @@ -549,21 +557,6 @@ pub fn start_maker_server(maker: Arc) -> Result<(), MakerError> { log::info!("[{}] Maker is shutting down.", port); - // Shuting down. Join all the threads. - for thread in thread_pool { - log::info!( - "[{}] Closing Thread: {}", - port, - thread.thread().name().expect("Thread name expected") - ); - let join_result = thread.join(); - if let Ok(r) = join_result { - log::info!("[{}] Thread closing result: {:?}", port, r) - } else if let Err(e) = join_result { - log::info!("[{}] error in internal thread: {:?}", port, e); - } - } - if maker.config.connection_type == ConnectionType::TOR && cfg!(feature = "tor") { crate::tor::kill_tor_handles(tor_thread.expect("Tor thread expected")); }