Skip to content

Commit

Permalink
Merge pull request #316 from claddyy/rec_threads
Browse files Browse the repository at this point in the history
Add threadpool to join the recovery threads
  • Loading branch information
mojoX911 authored Dec 16, 2024
2 parents 6e5ce00 + 014af60 commit b80244a
Show file tree
Hide file tree
Showing 3 changed files with 98 additions and 58 deletions.
90 changes: 68 additions & 22 deletions src/maker/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,25 +6,6 @@
//! contract broadcasts and handle idle Taker connections. Additionally, it handles recovery by broadcasting
//! contract transactions and claiming funds after an unsuccessful swap event.
use std::{
collections::HashMap,
net::IpAddr,
path::PathBuf,
sync::{
atomic::{AtomicBool, Ordering::Relaxed},
Arc, Mutex, RwLock,
},
time::{Duration, Instant},
};

use bip39::Mnemonic;
use bitcoin::{
ecdsa::Signature,
secp256k1::{self, Secp256k1},
OutPoint, PublicKey, ScriptBuf, Transaction,
};
use bitcoind::bitcoincore_rpc::RpcApi;

use crate::{
protocol::{
contract::check_hashvalues_are_equal,
Expand All @@ -36,6 +17,24 @@ use crate::{
},
wallet::{RPCConfig, SwapCoin, WalletSwapCoin},
};
use bip39::Mnemonic;
use bitcoin::{
ecdsa::Signature,
secp256k1::{self, Secp256k1},
OutPoint, PublicKey, ScriptBuf, Transaction,
};
use bitcoind::bitcoincore_rpc::RpcApi;
use std::{
collections::HashMap,
net::IpAddr,
path::PathBuf,
sync::{
atomic::{AtomicBool, Ordering::Relaxed},
Arc, Mutex, RwLock,
},
thread::JoinHandle,
time::{Duration, Instant},
};

use crate::{
protocol::{
Expand Down Expand Up @@ -92,6 +91,48 @@ pub struct ConnectionState {
pub pending_funding_txes: Vec<Transaction>,
}

pub struct ThreadPool {
pub threads: Mutex<Vec<JoinHandle<()>>>,
}

impl Default for ThreadPool {
fn default() -> Self {
Self::new()
}
}

impl Drop for ThreadPool {
fn drop(&mut self) {
if let Err(e) = self.join_all_threads() {
log::error!("Error joining threads in via drop: {:?}", e);
}
}
}

impl ThreadPool {
pub fn new() -> Self {
Self {
threads: Mutex::new(Vec::new()),
}
}

pub fn add_thread(&self, handle: JoinHandle<()>) {
let mut threads = self.threads.lock().unwrap();
threads.push(handle);
}
#[inline]
fn join_all_threads(&self) -> Result<(), MakerError> {
let mut threads = self
.threads
.lock()
.map_err(|_| MakerError::General("Failed to lock threads"))?;
while let Some(thread) = threads.pop() {
thread.join().unwrap();
}
Ok(())
}
}

/// Represents the maker in the swap protocol.
pub struct Maker {
/// Defines special maker behavior, only applicable for testing
Expand All @@ -108,6 +149,8 @@ pub struct Maker {
pub highest_fidelity_proof: RwLock<Option<FidelityProof>>,
/// Is setup complete
pub is_setup_complete: AtomicBool,
/// Thread pool for managing all spawned threads
pub thread_pool: Arc<ThreadPool>,
}

#[allow(clippy::too_many_arguments)]
Expand Down Expand Up @@ -222,6 +265,7 @@ impl Maker {
connection_state: Mutex::new(HashMap::new()),
highest_fidelity_proof: RwLock::new(None),
is_setup_complete: AtomicBool::new(false),
thread_pool: Arc::new(ThreadPool::new()),
})
}

Expand Down Expand Up @@ -466,9 +510,10 @@ pub fn check_for_broadcasted_contracts(maker: Arc<Maker>) -> Result<(), MakerErr
"[{}] Spawning recovery thread after seeing contracts in mempool",
maker.config.port
);
std::thread::spawn(move || {
let handle = std::thread::spawn(move || {
recover_from_swap(maker_clone, outgoings, incomings).unwrap();
});
maker.thread_pool.add_thread(handle);
// Clear the state value here
*connection_state = ConnectionState::default();
break;
Expand Down Expand Up @@ -547,9 +592,10 @@ pub fn check_for_idle_states(maker: Arc<Maker>) -> Result<(), MakerError> {
"[{}] Spawning recovery thread after Taker dropped",
maker.config.port
);
std::thread::spawn(move || {
recover_from_swap(maker_clone, outgoings, incomings).unwrap();
let handle = std::thread::spawn(move || {
recover_from_swap(maker_clone, outgoings, incomings).unwrap()
});
maker.thread_pool.add_thread(handle);
// Clear the state values here
*state = ConnectionState::default();
break;
Expand Down
5 changes: 3 additions & 2 deletions src/maker/handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -678,9 +678,10 @@ fn unexpected_recovery(maker: Arc<Maker>) -> Result<(), MakerError> {
}
// Spawn a separate thread to wait for contract maturity and broadcasting timelocked.
let maker_clone = maker.clone();
std::thread::spawn(move || {
recover_from_swap(maker_clone, outgoings, incomings).unwrap();
let handle = std::thread::spawn(move || {
recover_from_swap(maker_clone, outgoings, incomings).unwrap()
});
maker.thread_pool.add_thread(handle);
}
Ok(())
}
61 changes: 27 additions & 34 deletions src/maker/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -433,23 +433,22 @@ pub fn start_maker_server(maker: Arc<Maker>) -> Result<(), MakerError> {
// Global server Mutex, to switch on/off p2p network.
let accepting_clients = Arc::new(AtomicBool::new(false));

// Spawn Server threads.
// All thread handles are stored in the thread_pool, which are all joined at server shutdown.
let mut thread_pool = Vec::new();

if !maker.shutdown.load(Relaxed) {
// 1. Bitcoin Core Connection checker thread.
// Ensures that Bitcoin Core connection is live.
// If not, it will block p2p connections until Core works again.
let maker_clone = maker.clone();
let acc_client_clone = accepting_clients.clone();
let conn_check_thread: thread::JoinHandle<Result<(), MakerError>> = thread::Builder::new()
let conn_check_thread = thread::Builder::new()
.name("Bitcoin Core Connection Checker Thread".to_string())
.spawn(move || {
log::info!("[{}] Spawning Bitcoin Core connection checker thread", port);
check_connection_with_core(maker_clone, acc_client_clone)
if let Err(e) = check_connection_with_core(maker_clone.clone(), acc_client_clone) {
log::error!("[{}] Bitcoin Core connection check failed: {:?}", port, e);
maker_clone.shutdown.store(true, Relaxed);
}
})?;
thread_pool.push(conn_check_thread);
maker.thread_pool.add_thread(conn_check_thread);

// 2. Idle Client connection checker thread.
// This threads check idelness of peer in live swaps.
Expand All @@ -462,9 +461,12 @@ pub fn start_maker_server(maker: Arc<Maker>) -> Result<(), MakerError> {
"[{}] Spawning Client connection status checker thread",
port
);
check_for_idle_states(maker_clone.clone())
if let Err(e) = check_for_idle_states(maker_clone.clone()) {
log::error!("Failed checking client's idle state {:?}", e);
maker_clone.shutdown.store(true, Relaxed);
}
})?;
thread_pool.push(idle_conn_check_thread);
maker.thread_pool.add_thread(idle_conn_check_thread);

// 3. Watchtower thread.
// This thread checks for broadcasted contract transactions, which usually means violation of the protocol.
Expand All @@ -475,9 +477,12 @@ pub fn start_maker_server(maker: Arc<Maker>) -> Result<(), MakerError> {
.name("Contract Watcher Thread".to_string())
.spawn(move || {
log::info!("[{}] Spawning contract-watcher thread", port);
check_for_broadcasted_contracts(maker_clone.clone())
if let Err(e) = check_for_broadcasted_contracts(maker_clone.clone()) {
maker_clone.shutdown.store(true, Relaxed);
log::error!("Failed checking broadcasted contracts {:?}", e);
}
})?;
thread_pool.push(contract_watcher_thread);
maker.thread_pool.add_thread(contract_watcher_thread);

// 4: The RPC server thread.
// User for responding back to `maker-cli` apps.
Expand All @@ -486,10 +491,16 @@ pub fn start_maker_server(maker: Arc<Maker>) -> Result<(), MakerError> {
.name("RPC Thread".to_string())
.spawn(move || {
log::info!("[{}] Spawning RPC server thread", port);
start_rpc_server(maker_clone)
match start_rpc_server(maker_clone.clone()) {
Ok(_) => (),
Err(e) => {
log::error!("Failed starting rpc server {:?}", e);
maker_clone.shutdown.store(true, Relaxed);
}
}
})?;

thread_pool.push(rpc_thread);
maker.thread_pool.add_thread(rpc_thread);

sleep(Duration::from_secs(heart_beat_interval)); // wait for 1 beat, to complete spawns of all the threads.
maker.is_setup_complete.store(true, Relaxed);
Expand All @@ -516,18 +527,15 @@ pub fn start_maker_server(maker: Arc<Maker>) -> Result<(), MakerError> {
match listener.accept() {
Ok((mut stream, client_addr)) => {
log::info!("[{}] Spawning Client Handler thread", maker.config.port);

let maker_for_handler = maker.clone();
let client_handler_thread = thread::Builder::new()
.name("Client Handler Thread".to_string())
.spawn(move || {
if let Err(e) = handle_client(maker, &mut stream, client_addr) {
if let Err(e) = handle_client(maker_for_handler, &mut stream, client_addr) {
log::error!("[{}] Error Handling client request {:?}", port, e);
Err(e)
} else {
Ok(())
}
})?;
thread_pool.push(client_handler_thread);
maker.thread_pool.add_thread(client_handler_thread);
}

Err(e) => {
Expand All @@ -549,21 +557,6 @@ pub fn start_maker_server(maker: Arc<Maker>) -> Result<(), MakerError> {

log::info!("[{}] Maker is shutting down.", port);

// Shuting down. Join all the threads.
for thread in thread_pool {
log::info!(
"[{}] Closing Thread: {}",
port,
thread.thread().name().expect("Thread name expected")
);
let join_result = thread.join();
if let Ok(r) = join_result {
log::info!("[{}] Thread closing result: {:?}", port, r)
} else if let Err(e) = join_result {
log::info!("[{}] error in internal thread: {:?}", port, e);
}
}

if maker.config.connection_type == ConnectionType::TOR && cfg!(feature = "tor") {
crate::tor::kill_tor_handles(tor_thread.expect("Tor thread expected"));
}
Expand Down

0 comments on commit b80244a

Please sign in to comment.