From 3dc66b368ed8465729406d9efbbcae929757a9ef Mon Sep 17 00:00:00 2001 From: David Wong Date: Sat, 20 Jan 2024 21:31:32 -0800 Subject: [PATCH] [WIP] implement concurrent orchestrator --- src/committee/orchestrator.rs | 78 ++++++++++++++++++++++++----------- 1 file changed, 54 insertions(+), 24 deletions(-) diff --git a/src/committee/orchestrator.rs b/src/committee/orchestrator.rs index 9ee65a7..4884309 100644 --- a/src/committee/orchestrator.rs +++ b/src/committee/orchestrator.rs @@ -20,6 +20,8 @@ use jsonrpsee_types::{ErrorObjectOwned, Params}; use log::{debug, error, info}; use secp256k1::XOnlyPublicKey; use serde::{Deserialize, Serialize}; +use tokio::sync::Mutex; +use tokio_stream::StreamExt as _; use crate::{ bob_request::{BobRequest, BobResponse}, @@ -75,44 +77,72 @@ impl Orchestrator { // Round 1 // - let mut commitments_map = BTreeMap::new(); - - // pick a threshold of members at random - // TODO: AT RANDOM! - let threshold_of_members = self - .committee_cfg - .members - .iter() - .take(self.committee_cfg.threshold) - .collect_vec(); + // query all members until a threshold of them responds + let results = Arc::new(Mutex::new(vec![])); + let mut all_members = self.committee_cfg.members.iter().collect_vec(); + use rand::seq::SliceRandom; + all_members.shuffle(&mut rand::thread_rng()); + let mut stream = tokio_stream::iter(&all_members); + while let Some((member_id, member)) = stream.next().await { + // stop now if we already reached enough responses + { + if results.lock().await.len() >= self.committee_cfg.threshold {} + } - // TODO: do this concurrently with async - // TODO: take a random sample instead of the first `threshold` members - // TODO: what if we get a timeout or can't meet that threshold? loop? send to more members? - for (member_id, member) in &threshold_of_members { // send json RPC request let rpc_ctx = RpcCtx::new(Some("2.0"), None, Some(member.address.clone()), None, None); - let resp = json_rpc_request( + let resp = match json_rpc_request( &rpc_ctx, "round_1_signing", &[serde_json::value::to_raw_value(&bob_request).unwrap()], ) .await - .context("rpc request to committee didn't work"); - debug!("{:?}", resp); - let resp = resp?; - - let response: bitcoincore_rpc::jsonrpc::Response = serde_json::from_str(&resp)?; - let resp: Round1Response = response.result()?; + { + Ok(x) => x, + Err(err) => { + info!("member {member_id:?} errored in round 1: {err}"); + continue; + } + }; + + // parse the response json rpc type + let response: bitcoincore_rpc::jsonrpc::Response = match serde_json::from_str(&resp) { + Ok(x) => x, + Err(err) => { + info!("member {member_id:?} errored in round 1 parsing response: {err}"); + continue; + } + }; + + // parse the actual response + let round1_response: Round1Response = match response.result() { + Ok(x) => x, + Err(err) => { + info!( + "member {member_id:?} errored in round 1 parsing round 1 response: {err}" + ); + continue; + } + }; // store the commitment - commitments_map.insert(**member_id, resp.commitments); + results + .lock() + .await + .push((member_id, member, round1_response.commitments)); } // // Produce transaction and digest // + let message = get_digest_to_hash(&bob_request.prev_outs, &bob_request.tx, &smart_contract)?; + let commitments_map: BTreeMap<_, _> = results + .lock() + .await + .iter() + .map(|(member_id, _, commitments)| (***member_id, commitments.clone())) + .collect(); // // Round 2 @@ -130,7 +160,7 @@ impl Orchestrator { // TODO: do this concurrently with async // TODO: take a random sample instead of the first `threshold` members // TODO: what if we get a timeout or can't meet that threshold? loop? send to more members? - for (member_id, member) in &threshold_of_members { + for (member_id, member, _) in results.lock().await.iter() { // send json RPC request let rpc_ctx = RpcCtx::new(Some("2.0"), None, Some(member.address.clone()), None, None); let resp = json_rpc_request( @@ -147,7 +177,7 @@ impl Orchestrator { let round2_response: Round2Response = response.result()?; // store the commitment - signature_shares.insert(**member_id, round2_response.signature_share); + signature_shares.insert(***member_id, round2_response.signature_share); } //