Skip to content

Commit

Permalink
Ignore randomness RPCs from byzantine peers (MystenLabs#18690)
Browse files Browse the repository at this point in the history
  • Loading branch information
aschran authored Jul 19, 2024
1 parent d0e250a commit 77bae85
Show file tree
Hide file tree
Showing 5 changed files with 208 additions and 9 deletions.
13 changes: 13 additions & 0 deletions crates/sui-config/src/p2p.rs
Original file line number Diff line number Diff line change
Expand Up @@ -380,6 +380,12 @@ pub struct RandomnessConfig {
/// If unspecified, this will default to 20.
#[serde(skip_serializing_if = "Option::is_none")]
pub send_partial_signatures_inflight_limit: Option<usize>,

/// Maximum proportion of total peer weight to ignore in case of byzantine behavior.
///
/// If unspecified, this will default to 0.2.
#[serde(skip_serializing_if = "Option::is_none")]
pub max_ignored_peer_weight_factor: Option<f64>,
}

impl RandomnessConfig {
Expand Down Expand Up @@ -417,4 +423,11 @@ impl RandomnessConfig {
self.send_partial_signatures_inflight_limit
.unwrap_or(SEND_PARTIAL_SIGNATURES_INFLIGHT_LIMIT)
}

pub fn max_ignored_peer_weight_factor(&self) -> f64 {
const MAX_IGNORED_PEER_WEIGHT_FACTOR: f64 = 0.2;

self.max_ignored_peer_weight_factor
.unwrap_or(MAX_IGNORED_PEER_WEIGHT_FACTOR)
}
}
7 changes: 7 additions & 0 deletions crates/sui-network/src/randomness/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ impl Builder {
let config = config.unwrap_or_default();
let metrics = metrics.unwrap_or_else(Metrics::disabled);
let (sender, mailbox) = mpsc::channel(config.mailbox_capacity());
let mailbox_sender = sender.downgrade();
let handle = Handle {
sender: sender.clone(),
};
Expand All @@ -81,6 +82,7 @@ impl Builder {
config,
handle,
mailbox,
mailbox_sender,
allowed_peers,
metrics,
randomness_tx,
Expand All @@ -96,6 +98,7 @@ pub struct UnstartedRandomness {
pub(super) config: RandomnessConfig,
pub(super) handle: Handle,
pub(super) mailbox: mpsc::Receiver<RandomnessMessage>,
pub(super) mailbox_sender: mpsc::WeakSender<RandomnessMessage>,
pub(super) allowed_peers: AllowedPeersUpdatable,
pub(super) metrics: Metrics,
pub(super) randomness_tx: mpsc::Sender<(EpochId, RandomnessRound, Vec<u8>)>,
Expand All @@ -108,6 +111,7 @@ impl UnstartedRandomness {
config,
handle,
mailbox,
mailbox_sender,
allowed_peers,
metrics,
randomness_tx,
Expand All @@ -117,14 +121,17 @@ impl UnstartedRandomness {
name,
config,
mailbox,
mailbox_sender,
network,
allowed_peers,
allowed_peers_set: HashSet::new(),
metrics,
randomness_tx,

epoch: 0,
authority_info: Arc::new(HashMap::new()),
peer_share_ids: None,
blocked_share_id_count: 0,
dkg_output: None,
aggregation_threshold: 0,
highest_requested_round: BTreeMap::new(),
Expand Down
13 changes: 13 additions & 0 deletions crates/sui-network/src/randomness/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ impl Metrics {
if let Some(inner) = &self.0 {
inner.current_epoch.set(epoch as i64);
inner.highest_round_generated.set(-1);
inner.num_ignored_byzantine_peers.set(0);
}
}

Expand Down Expand Up @@ -61,6 +62,12 @@ impl Metrics {
.as_ref()
.map(|inner| &inner.round_observation_latency)
}

pub fn inc_num_ignored_byzantine_peers(&self) {
if let Some(inner) = &self.0 {
inner.num_ignored_byzantine_peers.inc();
}
}
}

struct Inner {
Expand All @@ -69,6 +76,7 @@ struct Inner {
num_rounds_pending: IntGauge,
round_generation_latency: Histogram,
round_observation_latency: Histogram,
num_ignored_byzantine_peers: IntGauge,
}

const LATENCY_SEC_BUCKETS: &[f64] = &[
Expand Down Expand Up @@ -107,6 +115,11 @@ impl Inner {
LATENCY_SEC_BUCKETS.to_vec(),
registry
).unwrap(),
num_ignored_byzantine_peers: register_int_gauge_with_registry!(
"randomness_num_ignored_byzantine_peers",
"The number of byzantine peers that have been ignored by the randomness newtork loop in the current epoch",
registry
).unwrap(),
}
.pipe(Arc::new)
}
Expand Down
70 changes: 61 additions & 9 deletions crates/sui-network/src/randomness/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use mysten_metrics::spawn_monitored_task;
use mysten_network::anemo_ext::NetworkExt;
use serde::{Deserialize, Serialize};
use std::{
collections::{btree_map::BTreeMap, HashMap},
collections::{btree_map::BTreeMap, HashMap, HashSet},
ops::Bound,
sync::Arc,
time::{self, Duration},
Expand Down Expand Up @@ -191,6 +191,7 @@ enum RandomnessMessage {
Vec<Vec<u8>>,
Option<RandomnessSignature>,
),
MaybeIgnoreByzantinePeer(EpochId, PeerId),
AdminGetPartialSignatures(RandomnessRound, oneshot::Sender<Vec<u8>>),
AdminInjectPartialSignatures(
AuthorityName,
Expand All @@ -209,14 +210,17 @@ struct RandomnessEventLoop {
name: AuthorityName,
config: RandomnessConfig,
mailbox: mpsc::Receiver<RandomnessMessage>,
mailbox_sender: mpsc::WeakSender<RandomnessMessage>,
network: anemo::Network,
allowed_peers: AllowedPeersUpdatable,
allowed_peers_set: HashSet<PeerId>,
metrics: Metrics,
randomness_tx: mpsc::Sender<(EpochId, RandomnessRound, Vec<u8>)>,

epoch: EpochId,
authority_info: Arc<HashMap<AuthorityName, (PeerId, PartyId)>>,
peer_share_ids: Option<HashMap<PeerId, Vec<ShareIndex>>>,
blocked_share_id_count: usize,
dkg_output: Option<dkg::Output<bls12381::G2Element, bls12381::G2Element>>,
aggregation_threshold: u16,
highest_requested_round: BTreeMap<EpochId, RandomnessRound>,
Expand Down Expand Up @@ -285,6 +289,9 @@ impl RandomnessEventLoop {
self.receive_partial_signatures(peer_id, epoch, round, partial_sigs)
}
}
RandomnessMessage::MaybeIgnoreByzantinePeer(epoch, peer_id) => {
self.maybe_ignore_byzantine_peer(epoch, peer_id)
}
RandomnessMessage::AdminGetPartialSignatures(round, tx) => {
self.admin_get_partial_signatures(round, tx)
}
Expand Down Expand Up @@ -330,12 +337,12 @@ impl RandomnessEventLoop {
Ok(acc)
},
)?);
self.allowed_peers.update(Arc::new(
authority_info
.values()
.map(|(peer_id, _)| *peer_id)
.collect(),
));
self.allowed_peers_set = authority_info
.values()
.map(|(peer_id, _)| *peer_id)
.collect();
self.allowed_peers
.update(Arc::new(self.allowed_peers_set.clone()));
self.epoch = new_epoch;
self.authority_info = Arc::new(authority_info);
self.dkg_output = Some(dkg_output);
Expand Down Expand Up @@ -641,7 +648,13 @@ impl RandomnessEventLoop {
warn!(
"received invalid partial signatures from possibly-Byzantine peer {peer_id}"
);
// TODO: Ignore future messages from peers sending bad signatures.
if let Some(sender) = self.mailbox_sender.upgrade() {
sender.try_send(RandomnessMessage::MaybeIgnoreByzantinePeer(
epoch,
peer_id,
))
.expect("RandomnessEventLoop mailbox should not overflow or be closed");
}
return false;
}
true
Expand Down Expand Up @@ -713,11 +726,15 @@ impl RandomnessEventLoop {
return;
}

// TODO: ignore future messages from peers sending bad signatures.
if let Err(e) =
ThresholdBls12381MinSig::verify(vss_pk.c0(), &round.signature_message(), &sig)
{
info!("received invalid full signature from peer {peer_id}: {e:?}");
if let Some(sender) = self.mailbox_sender.upgrade() {
sender
.try_send(RandomnessMessage::MaybeIgnoreByzantinePeer(epoch, peer_id))
.expect("RandomnessEventLoop mailbox should not overflow or be closed");
}
return;
}

Expand Down Expand Up @@ -756,6 +773,41 @@ impl RandomnessEventLoop {
.expect("RandomnessRoundReceiver mailbox should not overflow or be closed");
}

fn maybe_ignore_byzantine_peer(&mut self, epoch: EpochId, peer_id: PeerId) {
if epoch != self.epoch {
return; // make sure we're still on the same epoch
}
let Some(dkg_output) = &self.dkg_output else {
return; // can't ignore a peer if we haven't finished DKG
};
if !self.allowed_peers_set.contains(&peer_id) {
return; // peer is already disallowed
}
let Some(peer_share_ids) = &self.peer_share_ids else {
return; // can't ignore a peer if we haven't finished DKG
};
let Some(peer_shares) = peer_share_ids.get(&peer_id) else {
warn!("can't ignore unknown byzantine peer {peer_id:?}");
return;
};
let max_ignored_shares = (self.config.max_ignored_peer_weight_factor()
* (dkg_output.nodes.total_weight() as f64)) as usize;
if self.blocked_share_id_count + peer_shares.len() > max_ignored_shares {
warn!("ignoring byzantine peer {peer_id:?} with {} shares would exceed max ignored peer weight {max_ignored_shares}", peer_shares.len());
return;
}

warn!(
"ignoring byzantine peer {peer_id:?} with {} shares",
peer_shares.len()
);
self.blocked_share_id_count += peer_shares.len();
self.allowed_peers_set.remove(&peer_id);
self.allowed_peers
.update(Arc::new(self.allowed_peers_set.clone()));
self.metrics.inc_num_ignored_byzantine_peers();
}

fn maybe_start_pending_tasks(&mut self) {
let dkg_output = if let Some(dkg_output) = &self.dkg_output {
dkg_output
Expand Down
114 changes: 114 additions & 0 deletions crates/sui-network/src/randomness/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -366,6 +366,120 @@ async fn test_restart_recovery() {
}
}

#[tokio::test]
async fn test_byzantine_peer_handling() {
telemetry_subscribers::init_for_testing();
let committee_fixture = CommitteeFixture::generate(rand::rngs::OsRng, 0, 4);
let committee = committee_fixture.committee();

let mut randomness_rxs = Vec::new();
let mut networks: Vec<anemo::Network> = Vec::new();
let mut nodes = Vec::new();
let mut handles = Vec::new();
let mut authority_info = HashMap::new();

for (authority, stake) in committee.members() {
let config = RandomnessConfig {
max_ignored_peer_weight_factor: Some(0.3),
..Default::default()
};

let (tx, rx) = mpsc::channel(3);
randomness_rxs.push(rx);
let (unstarted, router) = Builder::new(*authority, tx).config(config).build();

let network = utils::build_network(|r| r.merge(router));
for n in networks.iter() {
network.connect(n.local_addr()).await.unwrap();
}
networks.push(network.clone());

let node = node_from_committee(committee, authority, *stake);
authority_info.insert(*authority, (network.peer_id(), node.id));
nodes.push(node);

let (r, handle) = unstarted.build(network);
handles.push((authority, handle));

let span = tracing::span!(
tracing::Level::INFO,
"RandomnessEventLoop",
authority = ?authority.concise(),
);
tokio::spawn(r.start().instrument(span));
}
info!(?authority_info, "authorities constructed");

let nodes = nodes::Nodes::new(nodes).unwrap();

// Send partial sigs from authorities 0 and 1, but give them different DKG output so they think
// each other are byzantine.
for (i, (authority, handle)) in handles.iter().enumerate() {
let mock_dkg_output = mocked_dkg::generate_mocked_output::<PkG, EncG>(
nodes.clone(),
committee.validity_threshold().try_into().unwrap(),
if i < 2 { 100 + i as u128 } else { 0 },
committee
.authority_index(authority)
.unwrap()
.try_into()
.unwrap(),
);
handle.send_partial_signatures(0, RandomnessRound(0));
handle.update_epoch(
0,
authority_info.clone(),
mock_dkg_output,
committee.validity_threshold().try_into().unwrap(),
None,
);
}
for rx in &mut randomness_rxs[2..] {
// Validators (2, 3) can communicate normally.
let (epoch, round, bytes) = rx.recv().await.unwrap();
assert_eq!(0, epoch);
assert_eq!(0, round.0);
assert_ne!(0, bytes.len());
}
for rx in &mut randomness_rxs[..2] {
// Validators (0, 1) are byzantine.
assert!(rx.try_recv().is_err());
}

// New epoch, they should forgive each other.
for (authority, handle) in handles.iter().take(2) {
let mock_dkg_output = mocked_dkg::generate_mocked_output::<PkG, EncG>(
nodes.clone(),
committee.validity_threshold().try_into().unwrap(),
0,
committee
.authority_index(authority)
.unwrap()
.try_into()
.unwrap(),
);
handle.send_partial_signatures(1, RandomnessRound(0));
handle.update_epoch(
1,
authority_info.clone(),
mock_dkg_output,
committee.validity_threshold().try_into().unwrap(),
None,
);
}
for rx in &mut randomness_rxs[..2] {
// Validators (0, 1) can communicate normally in new epoch.
let (epoch, round, bytes) = rx.recv().await.unwrap();
assert_eq!(1, epoch);
assert_eq!(0, round.0);
assert_ne!(0, bytes.len());
}
for rx in &mut randomness_rxs[2..] {
// Validators (2, 3) are still on old epoch.
assert!(rx.try_recv().is_err());
}
}

fn node_from_committee(
committee: &Committee,
authority: &AuthorityPublicKeyBytes,
Expand Down

0 comments on commit 77bae85

Please sign in to comment.