diff --git a/src/committee/orchestrator.rs b/src/committee/orchestrator.rs index c6dca68..ed012ac 100644 --- a/src/committee/orchestrator.rs +++ b/src/committee/orchestrator.rs @@ -12,8 +12,7 @@ use bitcoin::{ key::{TapTweak, UntweakedPublicKey}, secp256k1, taproot, TapSighashType, Witness, }; -use frost_secp256k1_tr::Ciphersuite; -use frost_secp256k1_tr::Group; +use frost_secp256k1_tr::{Ciphersuite, Group, Identifier}; use futures::future::join_all; use itertools::Itertools; use jsonrpsee::{server::Server, RpcModule}; @@ -40,14 +39,12 @@ use super::node::{Round2Request, Round2Response}; // Orchestration logic // -type RpcOrchestratorContext = Arc<(Orchestrator, Arc>)>; - #[derive(Debug, Clone, Serialize, Deserialize)] pub struct CommitteeConfig { pub threshold: usize, // TODO: We could use a Vec instead of a HashMap for the members, since it would be more efficient. // We do not currently need hashmap functionality, but we might later, so left unchanged. - pub members: HashMap, + pub members: HashMap, } #[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq)] @@ -61,8 +58,8 @@ pub enum MemberStatus { #[derive(Debug, Clone, Serialize, Deserialize)] pub struct StatusResponse { - pub online_members: Vec, - pub offline_members: Vec, + pub online_members: Vec, + pub offline_members: Vec, } // This will be the second part in the RpcModule context wrapped in a RwLock. @@ -70,8 +67,8 @@ pub struct StatusResponse { // to wait for a read lock every time an rpc handler needs to access the config. // This way, handlers will only wait for read locks when they need the status of the members. pub struct MemberStatusState { - pub key_to_addr: HashMap, - pub status: HashMap, + pub key_to_addr: HashMap, + pub status: HashMap, } impl MemberStatusState { @@ -110,11 +107,11 @@ impl MemberStatusState { } } - pub fn get_member_status(&self, key: &frost_secp256k1_tr::Identifier) -> MemberStatus { + pub fn get_member_status(&self, key: &Identifier) -> MemberStatus { *self.status.get(key).unwrap() } - pub fn mark_as_disconnected(&mut self, key: &frost_secp256k1_tr::Identifier) { + pub fn mark_as_disconnected(&mut self, key: &Identifier) { let m_status = self.status.get_mut(key).unwrap(); *m_status = MemberStatus::Disconnected(( Self::get_current_time_secs() + Self::get_next_fibonacci_backoff_delay(0), @@ -122,17 +119,12 @@ impl MemberStatusState { )) } - pub fn mark_as_offline(&mut self, key: &frost_secp256k1_tr::Identifier) { + pub fn mark_as_offline(&mut self, key: &Identifier) { let m_status = self.status.get_mut(key).unwrap(); *m_status = MemberStatus::Offline; } - pub fn get_status( - &self, - ) -> ( - Vec, - Vec, - ) { + pub fn get_status(&self) -> (Vec, Vec) { let mut online = Vec::new(); let mut offline = Vec::new(); @@ -204,22 +196,20 @@ impl MemberStatusState { // Get array of members which are not *permanently* offline let members_to_check = { + let current_time = Self::get_current_time_secs(); let r_lock = state.read().unwrap(); r_lock - .key_to_addr + .status .iter() - .map(|(key, addr)| { - let status = *r_lock.status.get(key).unwrap(); - (*key, addr.clone(), status) + .filter(|(_, s)| { + matches!(s, MemberStatus::Online) + | matches!(s, MemberStatus::Disconnected((r, _)) if *r <= current_time) }) - .filter(|(_, _, status)| match *status { - MemberStatus::Online => true, - MemberStatus::Disconnected((retry_time, _)) => { - retry_time <= Self::get_current_time_secs() - } - _ => false, + .map(|(key, status)| { + let addr = r_lock.key_to_addr.get(key).unwrap(); + (*key, addr.clone(), *status) }) - .collect::>() + .collect_vec() }; // check alive for each one of them @@ -246,7 +236,8 @@ impl MemberStatusState { let member_status = state_w.status.get_mut(key).unwrap(); let last_retries = match old_status { MemberStatus::Disconnected((_, last_retry_number)) => *last_retry_number, - _ => 0, + MemberStatus::Online => 0, + MemberStatus::Offline => continue, }; if last_retries > KEEPALIVE_MAX_RETRIES { @@ -276,25 +267,24 @@ pub struct Member { pub struct Orchestrator { pub pubkey_package: frost_secp256k1_tr::keys::PublicKeyPackage, pub committee_cfg: CommitteeConfig, + pub member_status: Arc>, } impl Orchestrator { pub fn new( pubkey_package: frost_secp256k1_tr::keys::PublicKeyPackage, committee_cfg: CommitteeConfig, + member_status: Arc>, ) -> Self { Self { pubkey_package, committee_cfg, + member_status, } } /// Handles bob request from A to Z. - pub async fn handle_request( - &self, - bob_request: &BobRequest, - member_status: Arc>, - ) -> Result { + pub async fn handle_request(&self, bob_request: &BobRequest) -> Result { // Validate transaction before forwarding it, and get smart contract let smart_contract = bob_request.validate_request().await?; @@ -308,7 +298,7 @@ impl Orchestrator { let mut commitments_map = BTreeMap::new(); let mut available_members = { - let ms_r = member_status.read().unwrap(); + let ms_r = self.member_status.read().unwrap(); self.committee_cfg .members .iter() @@ -345,17 +335,25 @@ impl Orchestrator { Ok(x) => x, Err(rpc_error) => { warn!("Round 1 error with {}, marking as disconnected and retrying round 1: {rpc_error}", member.address); - let mut ms_w = member_status.write().unwrap(); + let mut ms_w = self.member_status.write().unwrap(); ms_w.mark_as_disconnected(member_id); continue 'retry; } }; - let response: bitcoincore_rpc::jsonrpc::Response = serde_json::from_str(&resp)?; - let resp: Round1Response = response.result()?; + if let Ok(response) = + serde_json::from_str::(&resp) + { + let resp: Round1Response = response.result()?; - // store the commitment - commitments_map.insert(*member_id, resp.commitments); + // store the commitment + commitments_map.insert(*member_id, resp.commitments); + } else { + warn!("Round 1 error with {}, marking as offline and retrying from round 1: deserialize error", member.address); + let mut ms_w = self.member_status.write().unwrap(); + ms_w.mark_as_disconnected(member_id); + continue 'retry; + } } // @@ -400,17 +398,25 @@ impl Orchestrator { Ok(x) => x, Err(rpc_error) => { warn!("Round 2 error with {}, marking as offline and retrying from round 1: {rpc_error}", member.address); - let mut ms_w = member_status.write().unwrap(); + let mut ms_w = self.member_status.write().unwrap(); ms_w.mark_as_offline(member_id); continue 'retry; } }; - let response: bitcoincore_rpc::jsonrpc::Response = serde_json::from_str(&resp)?; - let round2_response: Round2Response = response.result()?; + if let Ok(response) = + serde_json::from_str::(&resp) + { + let round2_response: Round2Response = response.result()?; - // store the commitment - signature_shares.insert(*member_id, round2_response.signature_share); + // store the commitment + signature_shares.insert(*member_id, round2_response.signature_share); + } else { + warn!("Round 2 error with {}, marking as offline and retrying from round 1: deserialize error", member.address); + let mut ms_w = self.member_status.write().unwrap(); + ms_w.mark_as_offline(member_id); + continue 'retry; + } } // @@ -527,34 +533,30 @@ impl Orchestrator { /// Bob's request to unlock funds from a smart contract. async fn unlock_funds( params: Params<'static>, - context: RpcOrchestratorContext, + context: Arc, ) -> RpcResult { // get bob request let bob_request: [BobRequest; 1] = params.parse()?; let bob_request = &bob_request[0]; info!("received request: {:?}", bob_request); - let bob_response = context - .0 - .handle_request(bob_request, context.1.clone()) - .await - .map_err(|e| { - ErrorObjectOwned::owned( - jsonrpsee_types::error::UNKNOWN_ERROR_CODE, - "error while unlocking funds", - Some(format!("the request didn't validate: {e}")), - ) - })?; + let bob_response = context.handle_request(bob_request).await.map_err(|e| { + ErrorObjectOwned::owned( + jsonrpsee_types::error::UNKNOWN_ERROR_CODE, + "error while unlocking funds", + Some(format!("the request didn't validate: {e}")), + ) + })?; RpcResult::Ok(bob_response) } async fn get_nodes_status( _params: Params<'static>, - context: RpcOrchestratorContext, + context: Arc, ) -> RpcResult { let (online, offline) = { - let mss_r = context.1.read().unwrap(); + let mss_r = context.member_status.read().unwrap(); mss_r.get_status() }; @@ -576,13 +578,11 @@ pub async fn run_server( let mss_thread_copy = member_status_state.clone(); tokio::spawn(async move { MemberStatusState::keepalive_thread(mss_thread_copy).await }); - let ctx = ( - Orchestrator { - pubkey_package, - committee_cfg, - }, - member_status_state.clone(), - ); + let ctx = Orchestrator { + pubkey_package, + committee_cfg, + member_status: member_status_state, + }; let server = Server::builder() .build(address.parse::()?)