diff --git a/Cargo.toml b/Cargo.toml index b5a661b..092ab44 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -14,6 +14,7 @@ bitcoincore-rpc = "0.18" clap = { version = "4.4.10", features = ["derive", "env"] } env_logger = "0.10.1" frost-secp256k1-tr = { git = "https://github.com/mimoo/frost", branch = "mimoo/fix5" } +futures = "0.3.30" hex = "0.4.3" home = "0.5.9" itertools = "0.12.0" diff --git a/README.md b/README.md index d5ee0e1..26b357e 100644 --- a/README.md +++ b/README.md @@ -16,7 +16,15 @@ Jump straight to [usage](#usage) if you want to see some examples, but make sure ### Circom/snarkjs -We build on top of the well-known [circom](https://github.com/iden3/circom)/[snarkjs](https://github.com/iden3/snarkjs) stack. +We build on top of the well-known [circom](https://github.com/iden3/circom)/[snarkjs](https://github.com/iden3/snarkjs) stack. + +To install `circom`, please follow [their guide](https://docs.circom.io/getting-started/installation/). + +To install `snarkjs`, just run: + +``` +npm install -g snarkjs@latest +``` ### Bitcoin wallet @@ -44,7 +52,7 @@ There are two types of zkapps: [stateless](#stateless-zkapps) and [stateful](#st ### Stateless zkapps -A stateless zkapp is single-use, and the bitcoin it locks can be redeemed by anyone who can provide a proof of correct execution. An example of a stateless zkapp is in [`examples/circuit/stateless.circom`](examples/circuit/stateless.circom) (which releases funds to anyone who can find the preimage of a hash function). +A stateless zkapp is single-use, and the bitcoin it locks can be redeemed by anyone who can provide a proof of correct execution. An example of a stateless zkapp is in [`examples/circuit/stateless.circom`](examples/circuit/stateless.circom) (which releases funds to anyone who can find the preimage of a hash function). A stateless zkapp must always contains one public input that authenticates the transaction that spends it: ```circom @@ -66,7 +74,7 @@ $ zkbtc deploy-zkapp --circom-circuit-path examples/circuit/stateless.circom --s This will lock 1,000 satoshis in the zkapp and return the transaction ID of the transaction that deployed the zkapp. A stateless zkapp can be referenced by that transaction ID. -Bob can then unlock the funds from the stateless zkapp with the following command: +Bob can then unlock the funds from the stateless zkapp with the following command: ```shell $ zkbtc use-zkapp --txid "e793bdd8dfdd9912d971790a5f385ad3f1215dce97e25dbefe5449faba632836" --circom-circuit-path examples/circuit/stateless.circom --proof-inputs '{"preimage":["1"]}' --recipient-address "tb1q6nkpv2j9lxrm6h3w4skrny3thswgdcca8cx9k6" @@ -94,7 +102,7 @@ component main{public [prev_state, truncated_txid, amount_out, amount_in]} = Mai You can deploy a stateful zkapp with the following command: ```shell -$ zkbtc deploy-zkapp --circom-circuit-path examples/circuit/stateful.circom --initial-state "1" --satoshi-amount 1000 +$ zkbtc deploy-zkapp --circom-circuit-path examples/circuit/stateful.circom --initial-state "1" --satoshi-amount 1000 ``` You can use a stateful zkapps with the following command: @@ -105,8 +113,8 @@ $ zkbtc use-zkapp --circom-circuit-path examples/circuit/stateful.circom --proof specifying the following inputs: -* `amount_out`: amount being withdrawn -* `amount_in`: amount being deposited +- `amount_out`: amount being withdrawn +- `amount_in`: amount being deposited Other inputs will be automatically filled in (for example, it will use the zkapp's state as `prev_state` input). diff --git a/src/bin/zkbtc.rs b/src/bin/zkbtc.rs index edd050f..a675207 100644 --- a/src/bin/zkbtc.rs +++ b/src/bin/zkbtc.rs @@ -404,6 +404,8 @@ async fn main() -> Result<()> { let filename = format!("key-{id}.json"); let path = output_dir.join(filename); + std::fs::create_dir_all(path.clone().parent().unwrap()) + .expect("Couldn't create directory"); let file = std::fs::File::create(&path) .expect("couldn't create file given output dir"); serde_json::to_writer_pretty(file, key_package).unwrap(); diff --git a/src/committee/node.rs b/src/committee/node.rs index be12c7e..3360815 100644 --- a/src/committee/node.rs +++ b/src/committee/node.rs @@ -211,8 +211,8 @@ async fn round_2_signing( RpcResult::Ok(round2_response) } -async fn is_alive(_params: Params<'static>, _context: Arc) -> String { - "hello".to_string() +async fn is_alive(params: Params<'static>, _context: Arc) -> RpcResult { + Ok(params.parse::<[u64; 1]>()?[0].clone()) } // @@ -243,7 +243,7 @@ pub async fn run_server( module.register_async_method("round_1_signing", round_1_signing)?; module.register_async_method("round_2_signing", round_2_signing)?; - module.register_async_method("is_alive", is_alive)?; + module.register_async_method("ping", is_alive)?; let addr = server.local_addr()?; let handle = server.start(module); diff --git a/src/committee/orchestrator.rs b/src/committee/orchestrator.rs index 9ee65a7..41181ec 100644 --- a/src/committee/orchestrator.rs +++ b/src/committee/orchestrator.rs @@ -2,7 +2,8 @@ use std::{ collections::{BTreeMap, HashMap}, net::SocketAddr, str::FromStr, - sync::Arc, + sync::{Arc, RwLock}, + time::{Duration, SystemTime}, }; use anyhow::{Context, Result}; @@ -13,18 +14,21 @@ use bitcoin::{ }; use frost_secp256k1_tr::Ciphersuite; use frost_secp256k1_tr::Group; +use futures::future::join_all; use itertools::Itertools; use jsonrpsee::{server::Server, RpcModule}; use jsonrpsee_core::RpcResult; use jsonrpsee_types::{ErrorObjectOwned, Params}; -use log::{debug, error, info}; +use log::{debug, error, info, warn}; +use rand::seq::SliceRandom; use secp256k1::XOnlyPublicKey; use serde::{Deserialize, Serialize}; +use tokio::time::sleep; use crate::{ bob_request::{BobRequest, BobResponse}, committee::node::Round1Response, - constants::ZKBITCOIN_PUBKEY, + constants::{KEEPALIVE_MAX_RETRIES, KEEPALIVE_WAIT_SECONDS, ZKBITCOIN_PUBKEY}, frost, json_rpc_stuff::{json_rpc_request, RpcCtx}, mpc_sign_tx::get_digest_to_hash, @@ -36,12 +40,235 @@ use super::node::{Round2Request, Round2Response}; // Orchestration logic // +type RpcOrchestratorContext = Arc<(Orchestrator, Arc>)>; + #[derive(Debug, Clone, Serialize, Deserialize)] pub struct CommitteeConfig { pub threshold: usize, + // TODO: We could use a Vec instead of a HashMap for the members, since it would be more efficient. + // We do not currently need hashmap functionality, but we might later, so left unchanged. pub members: HashMap, } +#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq)] +pub enum MemberStatus { + Online, + /// Disconnected members contain a tuple with the next connect retry time and the last retry number + Disconnected((u64, u8)), + /// Will no longer retry + Offline, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct StatusResponse { + pub online_members: Vec, + pub offline_members: Vec, +} + +// This will be the second part in the RpcModule context wrapped in a RwLock. +// I think this is better than including it in the actual CommitteeConfig since handlers will need +// to wait for a read lock every time an rpc handler needs to access the config. +// This way, handlers will only wait for read locks when they need the status of the members. +pub struct MemberStatusState { + pub key_to_addr: HashMap, + pub status: HashMap, +} + +impl MemberStatusState { + pub async fn new(config: &CommitteeConfig) -> Self { + let mut key_to_addr = HashMap::new(); + let mut status = HashMap::new(); + let mut futures = Vec::with_capacity(config.members.len()); + + for (key, member) in config.members.iter() { + let _ = key_to_addr.insert(key.clone(), member.address.clone()); + futures.push(( + key.clone(), + Self::check_alive(member.address.clone()), + member.address.clone(), + )); + } + + for (key, future, address) in futures.into_iter() { + let new_status = if future.await == true { + info!("{address} is online"); + MemberStatus::Online + } else { + let delay = Self::get_next_fibonacci_backoff_delay(0); + warn!( + "{address} is offline. Re-trying in {delay} seconds... (1/{KEEPALIVE_MAX_RETRIES})" + ); + MemberStatus::Disconnected((Self::get_current_time_secs() + delay, 1)) + }; + + let _ = status.insert(key, new_status); + } + + Self { + key_to_addr, + status, + } + } + + pub fn get_member_status(&self, key: &frost_secp256k1_tr::Identifier) -> MemberStatus { + *self.status.get(key).unwrap() + } + + pub fn mark_as_disconnected(&mut self, key: &frost_secp256k1_tr::Identifier) { + let m_status = self.status.get_mut(key).unwrap(); + *m_status = MemberStatus::Disconnected(( + Self::get_current_time_secs() + Self::get_next_fibonacci_backoff_delay(0), + 1, + )) + } + + pub fn mark_as_offline(&mut self, key: &frost_secp256k1_tr::Identifier) { + let m_status = self.status.get_mut(key).unwrap(); + *m_status = MemberStatus::Offline; + } + + pub fn get_status( + &self, + ) -> ( + Vec, + Vec, + ) { + let mut online = Vec::new(); + let mut offline = Vec::new(); + + for (member, status) in self.status.iter() { + match *status { + MemberStatus::Online => online.push(member.clone()), + MemberStatus::Offline | MemberStatus::Disconnected(_) => { + offline.push(member.clone()) + } + }; + } + + (online, offline) + } + + fn get_current_time_secs() -> u64 { + SystemTime::now() + .duration_since(SystemTime::UNIX_EPOCH) + .unwrap() + .as_secs() + } + + fn fib(n: u64) -> u64 { + if n <= 0 { + return 0; + } else if n == 1 { + return 1; + } else { + return Self::fib(n - 1) + Self::fib(n - 2); + } + } + + fn get_next_fibonacci_backoff_delay(retry: u8) -> u64 { + // Offset fib sequence by 5 to get better backoff times + Self::fib(retry as u64 + 5) + } + + async fn check_alive(address: String) -> bool { + let data = Self::get_current_time_secs(); + let rpc_ctx = RpcCtx::new(Some("2.0"), None, Some(address.clone()), None, None); + match json_rpc_request( + &rpc_ctx, + "ping", + &[serde_json::value::to_raw_value(&data).unwrap()], + ) + .await + { + Err(_) => false, + Ok(resp_str) => { + // Sanity check + if let Ok(resp) = + serde_json::from_str::(&resp_str) + { + if let Some(resp_data) = resp.result { + resp_data.get().parse::().unwrap_or_default() == data + } else { + false + } + } else { + false + } + } + } + } + + pub async fn keepalive_thread(state: Arc>) { + debug!("Keepalive thread started"); + loop { + // Sleep + sleep(Duration::from_secs(KEEPALIVE_WAIT_SECONDS)).await; + // Get array of members which are not *permanently* offline + + let members_to_check = { + let r_lock = state.read().unwrap(); + r_lock + .key_to_addr + .iter() + .map(|(key, addr)| { + let status = r_lock.status.get(key).unwrap().clone(); + (key.clone(), addr.clone(), status) + }) + .filter(|(_, _, status)| match *status { + MemberStatus::Online => true, + MemberStatus::Disconnected((retry_time, _)) => { + retry_time <= Self::get_current_time_secs() + } + _ => false, + }) + .collect::>() + }; + + // check alive for each one of them + let mut futures = Vec::with_capacity(members_to_check.len()); + for member_data in members_to_check.iter() { + futures.push(Self::check_alive(member_data.1.clone())); + } + + // resolve futures and update state + let keepalive_resp = join_all(futures).await; + for (idx, resp) in keepalive_resp.iter().enumerate() { + let (key, address, old_status) = &members_to_check[idx]; + if *resp { + if *old_status != MemberStatus::Online { + let mut state_w = state.write().unwrap(); + let member_status = state_w.status.get_mut(key).unwrap(); + *member_status = MemberStatus::Online; + info!("{address} is back online"); + } else { + debug!("{address} is online"); + } + } else { + let mut state_w = state.write().unwrap(); + let member_status = state_w.status.get_mut(key).unwrap(); + let last_retries = match old_status { + MemberStatus::Disconnected((_, last_retry_number)) => *last_retry_number, + _ => 0, + }; + + if last_retries > KEEPALIVE_MAX_RETRIES { + error!("{address} is offline. Will not retry connection"); + *member_status = MemberStatus::Offline; + } else { + let new_retries = last_retries + 1; + let delay = Self::get_next_fibonacci_backoff_delay(new_retries); + warn!("{address} is offline. Re-trying in {delay} seconds... ({new_retries}/{KEEPALIVE_MAX_RETRIES})"); + *member_status = MemberStatus::Disconnected(( + Self::get_current_time_secs() + delay, + new_retries, + )); + } + } + } + } + } +} + #[derive(Debug, Clone, Serialize, Deserialize)] pub struct Member { /// e.g. "127.0.0.1:8887" @@ -65,191 +292,233 @@ impl Orchestrator { } /// Handles bob request from A to Z. - pub async fn handle_request(&self, bob_request: &BobRequest) -> Result { + pub async fn handle_request( + &self, + bob_request: &BobRequest, + member_status: Arc>, + ) -> Result { // Validate transaction before forwarding it, and get smart contract let smart_contract = bob_request.validate_request().await?; // TODO: we might want to check that the zkapp/UTXO is unspent here, but this requires us to have access to a bitcoin node, so for now we don't do it :o) - // - // Round 1 - // - - let mut commitments_map = BTreeMap::new(); - - // pick a threshold of members at random - // TODO: AT RANDOM! - let threshold_of_members = self - .committee_cfg - .members - .iter() - .take(self.committee_cfg.threshold) - .collect_vec(); - - // TODO: do this concurrently with async - // TODO: take a random sample instead of the first `threshold` members - // TODO: what if we get a timeout or can't meet that threshold? loop? send to more members? - for (member_id, member) in &threshold_of_members { - // send json RPC request - let rpc_ctx = RpcCtx::new(Some("2.0"), None, Some(member.address.clone()), None, None); - let resp = json_rpc_request( - &rpc_ctx, - "round_1_signing", - &[serde_json::value::to_raw_value(&bob_request).unwrap()], - ) - .await - .context("rpc request to committee didn't work"); - debug!("{:?}", resp); - let resp = resp?; - - let response: bitcoincore_rpc::jsonrpc::Response = serde_json::from_str(&resp)?; - let resp: Round1Response = response.result()?; - - // store the commitment - commitments_map.insert(**member_id, resp.commitments); - } - - // - // Produce transaction and digest - // - let message = get_digest_to_hash(&bob_request.prev_outs, &bob_request.tx, &smart_contract)?; - - // - // Round 2 - // - - let mut signature_shares = BTreeMap::new(); - - let round2_request = Round2Request { - txid: bob_request.txid()?, - proof_hash: bob_request.proof.hash(), - commitments_map: commitments_map.clone(), - message, - }; - - // TODO: do this concurrently with async - // TODO: take a random sample instead of the first `threshold` members - // TODO: what if we get a timeout or can't meet that threshold? loop? send to more members? - for (member_id, member) in &threshold_of_members { - // send json RPC request - let rpc_ctx = RpcCtx::new(Some("2.0"), None, Some(member.address.clone()), None, None); - let resp = json_rpc_request( - &rpc_ctx, - "round_2_signing", - &[serde_json::value::to_raw_value(&round2_request)?], - ) - .await; - debug!("resp to 2nd request: {:?}", resp); - - let resp = resp.context("second rpc request to committee didn't work")?; - - let response: bitcoincore_rpc::jsonrpc::Response = serde_json::from_str(&resp)?; - let round2_response: Round2Response = response.result()?; + 'retry: loop { + // + // Round 1 + // + + let mut commitments_map = BTreeMap::new(); + + let mut available_members = { + let ms_r = member_status.read().unwrap(); + self.committee_cfg + .members + .iter() + .filter(|(key, _)| ms_r.get_member_status(key) == MemberStatus::Online) + .collect_vec() + }; + if available_members.len() < self.committee_cfg.threshold { + return Err(anyhow::Error::msg("not enough available signers")); + } - // store the commitment - signature_shares.insert(**member_id, round2_response.signature_share); - } + available_members.shuffle(&mut rand::thread_rng()); + available_members.truncate(self.committee_cfg.threshold); + + let futures = available_members + .iter() + .map(|(_, member)| async { + let rpc_ctx = + RpcCtx::new(Some("2.0"), None, Some(member.address.clone()), None, None); + json_rpc_request( + &rpc_ctx, + "round_1_signing", + &[serde_json::value::to_raw_value(&bob_request).unwrap()], + ) + .await + }) + .collect_vec(); + + let round_1_responses = join_all(futures).await; + + for (idx, resp) in round_1_responses.into_iter().enumerate() { + let (member_id, member) = available_members[idx]; + debug!("resp to 1st request from {:?}: {:?}", member_id, resp); + let resp = match resp { + Ok(x) => x, + Err(rpc_error) => { + warn!("Round 1 error with {}, marking as disconnected and retrying round 1: {rpc_error}", member.address); + let mut ms_w = member_status.write().unwrap(); + ms_w.mark_as_disconnected(member_id); + continue 'retry; + } + }; + + let response: bitcoincore_rpc::jsonrpc::Response = serde_json::from_str(&resp)?; + let resp: Round1Response = response.result()?; + + // store the commitment + commitments_map.insert(*member_id, resp.commitments); + } - // - // Aggregate signatures - // - - debug!("- aggregate signature shares"); - let signing_package = frost_secp256k1_tr::SigningPackage::new(commitments_map, &message); - let group_signature = { - let res = frost_secp256k1_tr::aggregate( - &signing_package, - &signature_shares, - &self.pubkey_package, - ); - if let Some(err) = res.err() { - error!("error: {}", err); + // + // Produce transaction and digest + // + let message = + get_digest_to_hash(&bob_request.prev_outs, &bob_request.tx, &smart_contract)?; + + // + // Round 2 + // + + let mut signature_shares = BTreeMap::new(); + + let round2_request = Round2Request { + txid: bob_request.txid()?, + proof_hash: bob_request.proof.hash(), + commitments_map: commitments_map.clone(), + message, + }; + + let futures = available_members + .iter() + .map(|(_, member)| async { + let rpc_ctx = + RpcCtx::new(Some("2.0"), None, Some(member.address.clone()), None, None); + json_rpc_request( + &rpc_ctx, + "round_2_signing", + &[serde_json::value::to_raw_value(&round2_request)?], + ) + .await + }) + .collect_vec(); + + let round_2_responses = join_all(futures).await; + + for (idx, resp) in round_2_responses.into_iter().enumerate() { + let (member_id, member) = available_members[idx]; + debug!("resp to 2nd request from {:?}: {:?}", member_id, resp); + let resp = match resp { + Ok(x) => x, + Err(rpc_error) => { + warn!("Round 2 error with {}, marking as offline and retrying from round 1: {rpc_error}", member.address); + let mut ms_w = member_status.write().unwrap(); + ms_w.mark_as_offline(member_id); + continue 'retry; + } + }; + + let response: bitcoincore_rpc::jsonrpc::Response = serde_json::from_str(&resp)?; + let round2_response: Round2Response = response.result()?; + + // store the commitment + signature_shares.insert(*member_id, round2_response.signature_share); } - res.context("failed to aggregate signatures")? - }; - #[cfg(debug_assertions)] - { - // verify using FROST - let group_pubkey = self.pubkey_package.verifying_key(); - assert!(group_pubkey.verify(&message, &group_signature).is_ok()); - debug!("- the signature verified locally with FROST lib"); - - // assert that the pubkey is the same - let deserialized_pubkey = - bitcoin::PublicKey::from_slice(&group_pubkey.serialize()).unwrap(); - let zkbitcoin_pubkey: bitcoin::PublicKey = - bitcoin::PublicKey::from_str(ZKBITCOIN_PUBKEY).unwrap(); - assert_eq!(deserialized_pubkey, zkbitcoin_pubkey); - - // let's compare pubkeys + // + // Aggregate signatures + // + + debug!("- aggregate signature shares"); + let signing_package = + frost_secp256k1_tr::SigningPackage::new(commitments_map, &message); + let group_signature = { + let res = frost_secp256k1_tr::aggregate( + &signing_package, + &signature_shares, + &self.pubkey_package, + ); + if let Some(err) = res.err() { + error!("error: {}", err); + } + res.context("failed to aggregate signatures")? + }; + + #[cfg(debug_assertions)] { - // from hardcoded - let secp = secp256k1::Secp256k1::default(); + // verify using FROST + let group_pubkey = self.pubkey_package.verifying_key(); + assert!(group_pubkey.verify(&message, &group_signature).is_ok()); + debug!("- the signature verified locally with FROST lib"); + + // assert that the pubkey is the same + let deserialized_pubkey = + bitcoin::PublicKey::from_slice(&group_pubkey.serialize()).unwrap(); + let zkbitcoin_pubkey: bitcoin::PublicKey = + bitcoin::PublicKey::from_str(ZKBITCOIN_PUBKEY).unwrap(); + assert_eq!(deserialized_pubkey, zkbitcoin_pubkey); + + // let's compare pubkeys + { + // from hardcoded + let secp = secp256k1::Secp256k1::default(); + let zkbitcoin_pubkey: bitcoin::PublicKey = + bitcoin::PublicKey::from_str(ZKBITCOIN_PUBKEY).unwrap(); + let internal_key = UntweakedPublicKey::from(zkbitcoin_pubkey); + let (tweaked, _) = internal_key.tap_tweak(&secp, None); + let tweaked = tweaked.to_string(); + debug!("tweaked: {}", tweaked); + + // from FROST + let xone = XOnlyPublicKey::from_slice(&group_pubkey.serialize()[1..]).unwrap(); + let (tweaked2, _) = xone.tap_tweak(&secp, None); + let tweaked2 = tweaked2.to_string(); + debug!("tweaked2: {}", tweaked2); + assert_eq!(tweaked, tweaked2); + + // twaked + let tweaked3 = frost_secp256k1_tr::Secp256K1Sha256::tweaked_public_key( + group_pubkey.element(), + ); + let s = ::Group::serialize( + &tweaked3, + ); + let tweaked3 = s.to_lower_hex_string(); + debug!("tweaked3: {}", tweaked3); + //assert_eq!(tweaked2, tweaked3); + } + + // verify using bitcoin lib + let sig = + secp256k1::schnorr::Signature::from_slice(&group_signature.serialize()[1..]) + .unwrap(); let zkbitcoin_pubkey: bitcoin::PublicKey = bitcoin::PublicKey::from_str(ZKBITCOIN_PUBKEY).unwrap(); let internal_key = UntweakedPublicKey::from(zkbitcoin_pubkey); + let secp = secp256k1::Secp256k1::default(); let (tweaked, _) = internal_key.tap_tweak(&secp, None); - let tweaked = tweaked.to_string(); - debug!("tweaked: {}", tweaked); - - // from FROST - let xone = XOnlyPublicKey::from_slice(&group_pubkey.serialize()[1..]).unwrap(); - let (tweaked2, _) = xone.tap_tweak(&secp, None); - let tweaked2 = tweaked2.to_string(); - debug!("tweaked2: {}", tweaked2); - assert_eq!(tweaked, tweaked2); - - // twaked - let tweaked3 = - frost_secp256k1_tr::Secp256K1Sha256::tweaked_public_key(group_pubkey.element()); - let s = ::Group::serialize( - &tweaked3, - ); - let tweaked3 = s.to_lower_hex_string(); - debug!("tweaked3: {}", tweaked3); - //assert_eq!(tweaked2, tweaked3); + let msg = secp256k1::Message::from_digest(message); + assert!(secp.verify_schnorr(&sig, &msg, &tweaked.into()).is_ok()); + debug!("- the signature verified locally with bitcoin lib"); } - // verify using bitcoin lib - let sig = secp256k1::schnorr::Signature::from_slice(&group_signature.serialize()[1..]) - .unwrap(); - let zkbitcoin_pubkey: bitcoin::PublicKey = - bitcoin::PublicKey::from_str(ZKBITCOIN_PUBKEY).unwrap(); - let internal_key = UntweakedPublicKey::from(zkbitcoin_pubkey); - let secp = secp256k1::Secp256k1::default(); - let (tweaked, _) = internal_key.tap_tweak(&secp, None); - let msg = secp256k1::Message::from_digest(message); - assert!(secp.verify_schnorr(&sig, &msg, &tweaked.into()).is_ok()); - debug!("- the signature verified locally with bitcoin lib"); + // + // Include signature in the witness of the transaction + // + + debug!("- include signature in witness of transaction"); + let serialized = group_signature.serialize(); + debug!("- serialized: {:?}", serialized); + let sig = secp256k1::schnorr::Signature::from_slice(&serialized[1..]) + .context("couldn't convert signature type")?; + + let hash_ty = TapSighashType::All; + let final_signature = taproot::Signature { sig, hash_ty }; + let mut witness = Witness::new(); + witness.push(final_signature.to_vec()); + + let mut transaction = bob_request.tx.clone(); + transaction + .input + .get_mut(bob_request.zkapp_input) + .context("couldn't find zkapp input in transaction")? + .witness = witness; + + // return the signed transaction + return Ok(BobResponse { + unlocked_tx: transaction, + }); } - - // - // Include signature in the witness of the transaction - // - - debug!("- include signature in witness of transaction"); - let serialized = group_signature.serialize(); - debug!("- serialized: {:?}", serialized); - let sig = secp256k1::schnorr::Signature::from_slice(&serialized[1..]) - .context("couldn't convert signature type")?; - - let hash_ty = TapSighashType::All; - let final_signature = taproot::Signature { sig, hash_ty }; - let mut witness = Witness::new(); - witness.push(final_signature.to_vec()); - - let mut transaction = bob_request.tx.clone(); - transaction - .input - .get_mut(bob_request.zkapp_input) - .context("couldn't find zkapp input in transaction")? - .witness = witness; - - // return the signed transaction - Ok(BobResponse { - unlocked_tx: transaction, - }) } } @@ -260,24 +529,43 @@ impl Orchestrator { /// Bob's request to unlock funds from a smart contract. async fn unlock_funds( params: Params<'static>, - context: Arc, + context: RpcOrchestratorContext, ) -> RpcResult { // get bob request let bob_request: [BobRequest; 1] = params.parse()?; let bob_request = &bob_request[0]; info!("received request: {:?}", bob_request); - let bob_response = context.handle_request(bob_request).await.map_err(|e| { - ErrorObjectOwned::owned( - jsonrpsee_types::error::UNKNOWN_ERROR_CODE, - "error while unlocking funds", - Some(format!("the request didn't validate: {e}")), - ) - })?; + let bob_response = context + .0 + .handle_request(bob_request, context.1.clone()) + .await + .map_err(|e| { + ErrorObjectOwned::owned( + jsonrpsee_types::error::UNKNOWN_ERROR_CODE, + "error while unlocking funds", + Some(format!("the request didn't validate: {e}")), + ) + })?; RpcResult::Ok(bob_response) } +async fn get_nodes_status( + _params: Params<'static>, + context: RpcOrchestratorContext, +) -> RpcResult { + let (online, offline) = { + let mss_r = context.1.read().unwrap(); + mss_r.get_status() + }; + + RpcResult::Ok(StatusResponse { + online_members: online, + offline_members: offline, + }) +} + pub async fn run_server( address: Option<&str>, pubkey_package: frost::PublicKeyPackage, @@ -286,16 +574,24 @@ pub async fn run_server( let address = address.unwrap_or("127.0.0.1:6666"); info!("- starting orchestrator at address http://{address}"); - let ctx = Orchestrator { - pubkey_package, - committee_cfg, - }; + let member_status_state = Arc::new(RwLock::new(MemberStatusState::new(&committee_cfg).await)); + let mss_thread_copy = member_status_state.clone(); + tokio::spawn(async move { MemberStatusState::keepalive_thread(mss_thread_copy).await }); + + let ctx = ( + Orchestrator { + pubkey_package, + committee_cfg, + }, + member_status_state.clone(), + ); let server = Server::builder() .build(address.parse::()?) .await?; let mut module = RpcModule::new(ctx); module.register_async_method("unlock_funds", unlock_funds)?; + module.register_async_method("status", get_nodes_status)?; let addr = server.local_addr()?; let handle = server.start(module); diff --git a/src/constants.rs b/src/constants.rs index f6a028d..d66468f 100644 --- a/src/constants.rs +++ b/src/constants.rs @@ -37,3 +37,9 @@ pub const STATELESS_ZKAPP_PUBLIC_INPUT_LEN: usize = 1 /* truncated txid */; /// The expected number of public inputs for a stateful zkapp. pub const STATEFUL_ZKAPP_PUBLIC_INPUT_LEN: usize = 1 * 2 /* new state + prev state */ + 1 /* truncated txid */ + 1 /* amount_out */ + 1 /* amount_in */; + +/// The number of seconds to sleep between orchestrator-node keepalive requests +pub const KEEPALIVE_WAIT_SECONDS: u64 = 5; + +/// The total number of fibonacci backoff retries before considering an MPC node offline +pub const KEEPALIVE_MAX_RETRIES: u8 = 10; diff --git a/src/json_rpc_stuff.rs b/src/json_rpc_stuff.rs index 26454b8..5d8d796 100644 --- a/src/json_rpc_stuff.rs +++ b/src/json_rpc_stuff.rs @@ -5,7 +5,7 @@ use anyhow::{Context, Result}; use base64::{engine::general_purpose, Engine}; use bitcoin::{Amount, Transaction, Txid}; -use log::{debug, info, log_enabled, Level}; +use log::{debug, log_enabled, Level}; use reqwest::{ header::{HeaderMap, HeaderValue, AUTHORIZATION, CONTENT_TYPE}, Client, @@ -46,18 +46,18 @@ impl RpcCtx { timeout: timeout.unwrap_or(Duration::from_secs(JSON_RPC_TIMEOUT)), }; - info!("- using RPC node at address {}", ctx.address()); + debug!("- using RPC node at address {}", ctx.address()); if ctx.auth().is_some() { - info!("- using given RPC credentials"); + debug!("- using given RPC credentials"); } else { - info!("- using no RPC credentials"); + debug!("- using no RPC credentials"); } if let Some(wallet) = ctx.wallet() { - info!("- using wallet {wallet}"); + debug!("- using wallet {wallet}"); } else { - info!("- using default wallet"); + debug!("- using default wallet"); } ctx