Skip to content

Commit

Permalink
sigma0-dev#18 comments
Browse files Browse the repository at this point in the history
  • Loading branch information
RedaOps committed Jan 27, 2024
1 parent 67d65a6 commit 3e71725
Showing 1 changed file with 67 additions and 67 deletions.
134 changes: 67 additions & 67 deletions src/committee/orchestrator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,7 @@ use bitcoin::{
key::{TapTweak, UntweakedPublicKey},
secp256k1, taproot, TapSighashType, Witness,
};
use frost_secp256k1_tr::Ciphersuite;
use frost_secp256k1_tr::Group;
use frost_secp256k1_tr::{Ciphersuite, Group, Identifier};
use futures::future::join_all;
use itertools::Itertools;
use jsonrpsee::{server::Server, RpcModule};
Expand All @@ -40,14 +39,12 @@ use super::node::{Round2Request, Round2Response};
// Orchestration logic
//

type RpcOrchestratorContext = Arc<(Orchestrator, Arc<RwLock<MemberStatusState>>)>;

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CommitteeConfig {
pub threshold: usize,
// TODO: We could use a Vec instead of a HashMap for the members, since it would be more efficient.
// We do not currently need hashmap functionality, but we might later, so left unchanged.
pub members: HashMap<frost_secp256k1_tr::Identifier, Member>,
pub members: HashMap<Identifier, Member>,
}

#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq)]
Expand All @@ -61,17 +58,17 @@ pub enum MemberStatus {

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StatusResponse {
pub online_members: Vec<frost_secp256k1_tr::Identifier>,
pub offline_members: Vec<frost_secp256k1_tr::Identifier>,
pub online_members: Vec<Identifier>,
pub offline_members: Vec<Identifier>,
}

// This will be the second part in the RpcModule context wrapped in a RwLock.
// I think this is better than including it in the actual CommitteeConfig since handlers will need
// to wait for a read lock every time an rpc handler needs to access the config.
// This way, handlers will only wait for read locks when they need the status of the members.
pub struct MemberStatusState {
pub key_to_addr: HashMap<frost_secp256k1_tr::Identifier, String>,
pub status: HashMap<frost_secp256k1_tr::Identifier, MemberStatus>,
pub key_to_addr: HashMap<Identifier, String>,
pub status: HashMap<Identifier, MemberStatus>,
}

impl MemberStatusState {
Expand Down Expand Up @@ -110,29 +107,24 @@ impl MemberStatusState {
}
}

pub fn get_member_status(&self, key: &frost_secp256k1_tr::Identifier) -> MemberStatus {
pub fn get_member_status(&self, key: &Identifier) -> MemberStatus {
*self.status.get(key).unwrap()
}

pub fn mark_as_disconnected(&mut self, key: &frost_secp256k1_tr::Identifier) {
pub fn mark_as_disconnected(&mut self, key: &Identifier) {
let m_status = self.status.get_mut(key).unwrap();
*m_status = MemberStatus::Disconnected((
Self::get_current_time_secs() + Self::get_next_fibonacci_backoff_delay(0),
1,
))
}

pub fn mark_as_offline(&mut self, key: &frost_secp256k1_tr::Identifier) {
pub fn mark_as_offline(&mut self, key: &Identifier) {
let m_status = self.status.get_mut(key).unwrap();
*m_status = MemberStatus::Offline;
}

pub fn get_status(
&self,
) -> (
Vec<frost_secp256k1_tr::Identifier>,
Vec<frost_secp256k1_tr::Identifier>,
) {
pub fn get_status(&self) -> (Vec<Identifier>, Vec<Identifier>) {
let mut online = Vec::new();
let mut offline = Vec::new();

Expand Down Expand Up @@ -204,22 +196,20 @@ impl MemberStatusState {
// Get array of members which are not *permanently* offline

let members_to_check = {
let current_time = Self::get_current_time_secs();
let r_lock = state.read().unwrap();
r_lock
.key_to_addr
.status
.iter()
.map(|(key, addr)| {
let status = *r_lock.status.get(key).unwrap();
(*key, addr.clone(), status)
.filter(|(_, s)| {
matches!(s, MemberStatus::Online)
| matches!(s, MemberStatus::Disconnected((r, _)) if *r <= current_time)
})
.filter(|(_, _, status)| match *status {
MemberStatus::Online => true,
MemberStatus::Disconnected((retry_time, _)) => {
retry_time <= Self::get_current_time_secs()
}
_ => false,
.map(|(key, status)| {
let addr = r_lock.key_to_addr.get(key).unwrap();
(*key, addr.clone(), *status)
})
.collect::<Vec<(frost_secp256k1_tr::Identifier, String, MemberStatus)>>()
.collect_vec()
};

// check alive for each one of them
Expand All @@ -246,7 +236,8 @@ impl MemberStatusState {
let member_status = state_w.status.get_mut(key).unwrap();
let last_retries = match old_status {
MemberStatus::Disconnected((_, last_retry_number)) => *last_retry_number,
_ => 0,
MemberStatus::Online => 0,
MemberStatus::Offline => continue,
};

if last_retries > KEEPALIVE_MAX_RETRIES {
Expand Down Expand Up @@ -276,25 +267,24 @@ pub struct Member {
pub struct Orchestrator {
pub pubkey_package: frost_secp256k1_tr::keys::PublicKeyPackage,
pub committee_cfg: CommitteeConfig,
pub member_status: Arc<RwLock<MemberStatusState>>,
}

impl Orchestrator {
pub fn new(
pubkey_package: frost_secp256k1_tr::keys::PublicKeyPackage,
committee_cfg: CommitteeConfig,
member_status: Arc<RwLock<MemberStatusState>>,
) -> Self {
Self {
pubkey_package,
committee_cfg,
member_status,
}
}

/// Handles bob request from A to Z.
pub async fn handle_request(
&self,
bob_request: &BobRequest,
member_status: Arc<RwLock<MemberStatusState>>,
) -> Result<BobResponse> {
pub async fn handle_request(&self, bob_request: &BobRequest) -> Result<BobResponse> {
// Validate transaction before forwarding it, and get smart contract
let smart_contract = bob_request.validate_request().await?;

Expand All @@ -308,7 +298,7 @@ impl Orchestrator {
let mut commitments_map = BTreeMap::new();

let mut available_members = {
let ms_r = member_status.read().unwrap();
let ms_r = self.member_status.read().unwrap();
self.committee_cfg
.members
.iter()
Expand Down Expand Up @@ -345,17 +335,25 @@ impl Orchestrator {
Ok(x) => x,
Err(rpc_error) => {
warn!("Round 1 error with {}, marking as disconnected and retrying round 1: {rpc_error}", member.address);
let mut ms_w = member_status.write().unwrap();
let mut ms_w = self.member_status.write().unwrap();
ms_w.mark_as_disconnected(member_id);
continue 'retry;
}
};

let response: bitcoincore_rpc::jsonrpc::Response = serde_json::from_str(&resp)?;
let resp: Round1Response = response.result()?;
if let Ok(response) =
serde_json::from_str::<bitcoincore_rpc::jsonrpc::Response>(&resp)
{
let resp: Round1Response = response.result()?;

// store the commitment
commitments_map.insert(*member_id, resp.commitments);
// store the commitment
commitments_map.insert(*member_id, resp.commitments);
} else {
warn!("Round 1 error with {}, marking as offline and retrying from round 1: deserialize error", member.address);
let mut ms_w = self.member_status.write().unwrap();
ms_w.mark_as_disconnected(member_id);
continue 'retry;
}
}

//
Expand Down Expand Up @@ -400,17 +398,25 @@ impl Orchestrator {
Ok(x) => x,
Err(rpc_error) => {
warn!("Round 2 error with {}, marking as offline and retrying from round 1: {rpc_error}", member.address);
let mut ms_w = member_status.write().unwrap();
let mut ms_w = self.member_status.write().unwrap();
ms_w.mark_as_offline(member_id);
continue 'retry;
}
};

let response: bitcoincore_rpc::jsonrpc::Response = serde_json::from_str(&resp)?;
let round2_response: Round2Response = response.result()?;
if let Ok(response) =
serde_json::from_str::<bitcoincore_rpc::jsonrpc::Response>(&resp)
{
let round2_response: Round2Response = response.result()?;

// store the commitment
signature_shares.insert(*member_id, round2_response.signature_share);
// store the commitment
signature_shares.insert(*member_id, round2_response.signature_share);
} else {
warn!("Round 2 error with {}, marking as offline and retrying from round 1: deserialize error", member.address);
let mut ms_w = self.member_status.write().unwrap();
ms_w.mark_as_offline(member_id);
continue 'retry;
}
}

//
Expand Down Expand Up @@ -527,34 +533,30 @@ impl Orchestrator {
/// Bob's request to unlock funds from a smart contract.
async fn unlock_funds(
params: Params<'static>,
context: RpcOrchestratorContext,
context: Arc<Orchestrator>,
) -> RpcResult<BobResponse> {
// get bob request
let bob_request: [BobRequest; 1] = params.parse()?;
let bob_request = &bob_request[0];
info!("received request: {:?}", bob_request);

let bob_response = context
.0
.handle_request(bob_request, context.1.clone())
.await
.map_err(|e| {
ErrorObjectOwned::owned(
jsonrpsee_types::error::UNKNOWN_ERROR_CODE,
"error while unlocking funds",
Some(format!("the request didn't validate: {e}")),
)
})?;
let bob_response = context.handle_request(bob_request).await.map_err(|e| {
ErrorObjectOwned::owned(
jsonrpsee_types::error::UNKNOWN_ERROR_CODE,
"error while unlocking funds",
Some(format!("the request didn't validate: {e}")),
)
})?;

RpcResult::Ok(bob_response)
}

async fn get_nodes_status(
_params: Params<'static>,
context: RpcOrchestratorContext,
context: Arc<Orchestrator>,
) -> RpcResult<StatusResponse> {
let (online, offline) = {
let mss_r = context.1.read().unwrap();
let mss_r = context.member_status.read().unwrap();
mss_r.get_status()
};

Expand All @@ -576,13 +578,11 @@ pub async fn run_server(
let mss_thread_copy = member_status_state.clone();
tokio::spawn(async move { MemberStatusState::keepalive_thread(mss_thread_copy).await });

let ctx = (
Orchestrator {
pubkey_package,
committee_cfg,
},
member_status_state.clone(),
);
let ctx = Orchestrator {
pubkey_package,
committee_cfg,
member_status: member_status_state,
};

let server = Server::builder()
.build(address.parse::<SocketAddr>()?)
Expand Down

0 comments on commit 3e71725

Please sign in to comment.