Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] implement concurrent orchestrator #9

Closed
wants to merge 1 commit into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 54 additions & 24 deletions src/committee/orchestrator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ use jsonrpsee_types::{ErrorObjectOwned, Params};
use log::{debug, error, info};
use secp256k1::XOnlyPublicKey;
use serde::{Deserialize, Serialize};
use tokio::sync::Mutex;
use tokio_stream::StreamExt as _;

use crate::{
bob_request::{BobRequest, BobResponse},
Expand Down Expand Up @@ -75,44 +77,72 @@ impl Orchestrator {
// Round 1
//

let mut commitments_map = BTreeMap::new();

// pick a threshold of members at random
// TODO: AT RANDOM!
let threshold_of_members = self
.committee_cfg
.members
.iter()
.take(self.committee_cfg.threshold)
.collect_vec();
// query all members until a threshold of them responds
let results = Arc::new(Mutex::new(vec![]));
let mut all_members = self.committee_cfg.members.iter().collect_vec();
use rand::seq::SliceRandom;
all_members.shuffle(&mut rand::thread_rng());
let mut stream = tokio_stream::iter(&all_members);
while let Some((member_id, member)) = stream.next().await {
// stop now if we already reached enough responses
{
if results.lock().await.len() >= self.committee_cfg.threshold {}
}

// TODO: do this concurrently with async
// TODO: take a random sample instead of the first `threshold` members
// TODO: what if we get a timeout or can't meet that threshold? loop? send to more members?
for (member_id, member) in &threshold_of_members {
// send json RPC request
let rpc_ctx = RpcCtx::new(Some("2.0"), None, Some(member.address.clone()), None, None);
let resp = json_rpc_request(
let resp = match json_rpc_request(
&rpc_ctx,
"round_1_signing",
&[serde_json::value::to_raw_value(&bob_request).unwrap()],
)
.await
.context("rpc request to committee didn't work");
debug!("{:?}", resp);
let resp = resp?;

let response: bitcoincore_rpc::jsonrpc::Response = serde_json::from_str(&resp)?;
let resp: Round1Response = response.result()?;
{
Ok(x) => x,
Err(err) => {
info!("member {member_id:?} errored in round 1: {err}");
continue;
}
};

// parse the response json rpc type
let response: bitcoincore_rpc::jsonrpc::Response = match serde_json::from_str(&resp) {
Ok(x) => x,
Err(err) => {
info!("member {member_id:?} errored in round 1 parsing response: {err}");
continue;
}
};

// parse the actual response
let round1_response: Round1Response = match response.result() {
Ok(x) => x,
Err(err) => {
info!(
"member {member_id:?} errored in round 1 parsing round 1 response: {err}"
);
continue;
}
};

// store the commitment
commitments_map.insert(**member_id, resp.commitments);
results
.lock()
.await
.push((member_id, member, round1_response.commitments));
}

//
// Produce transaction and digest
//

let message = get_digest_to_hash(&bob_request.prev_outs, &bob_request.tx, &smart_contract)?;
let commitments_map: BTreeMap<_, _> = results
.lock()
.await
.iter()
.map(|(member_id, _, commitments)| (***member_id, commitments.clone()))
.collect();

//
// Round 2
Expand All @@ -130,7 +160,7 @@ impl Orchestrator {
// TODO: do this concurrently with async
// TODO: take a random sample instead of the first `threshold` members
// TODO: what if we get a timeout or can't meet that threshold? loop? send to more members?
for (member_id, member) in &threshold_of_members {
for (member_id, member, _) in results.lock().await.iter() {
// send json RPC request
let rpc_ctx = RpcCtx::new(Some("2.0"), None, Some(member.address.clone()), None, None);
let resp = json_rpc_request(
Expand All @@ -147,7 +177,7 @@ impl Orchestrator {
let round2_response: Round2Response = response.result()?;

// store the commitment
signature_shares.insert(**member_id, round2_response.signature_share);
signature_shares.insert(***member_id, round2_response.signature_share);
}

//
Expand Down
Loading