From 072fcfbe71792906e4c467cf599a50cdc88e6adb Mon Sep 17 00:00:00 2001 From: Arun Koshy <97870774+arun-koshy@users.noreply.github.com> Date: Sun, 8 Sep 2024 20:00:00 -0600 Subject: [PATCH] [consensus] Switch to certified vote scoring & incrementally score subdags (#18806) After analyzing the results of our Leader Scoring Strategy experimentation, we will be switching from a Vote scoring strategy to a Certified Vote scoring strategy. This has shown an improvement of roughly 90ms in P50 latency at 6K mixed TPS in private-testnet. This strategy also gives us the best distribution of scores, as scores are spread evenly across the major geographic regions, which improves our quorum latency. As part of this change we also moved to an incremental scoring process with a new struct, ScoringSubdag, which keeps track of votes for leaders and the relevant stake for the eventual reputation score calculation. We also removed the LeaderScoringStrategy components (the ScoringStrategy trait and its implementations) and simplified ReputationScoreCalculator accordingly, as we are now finalizing the scoring strategy we will be using. Full experiment results: https://www.notion.so/mystenlabs/Leader-Scoring-Strategy-f11bbbd1055e453f9f0f5490544941ed?pvs=4 ## Testing - [x] unit tests & simtests - [ ] run certified vote v2 & v3 with incremental scoring in private-testnet and finalize on one - [ ] run upgrade test from vote scoring to certified vote scoring --- Cargo.lock | 4 +- Cargo.toml | 35 +- consensus/core/src/commit.rs | 34 +- consensus/core/src/core.rs | 308 ++++++++- consensus/core/src/dag_state.rs | 178 ++++- consensus/core/src/leader_schedule.rs | 634 ++++++++++++++++-- consensus/core/src/leader_scoring.rs | 395 +++++++++-- consensus/core/src/leader_scoring_strategy.rs | 373 ----------- consensus/core/src/lib.rs | 1 - consensus/core/src/linearizer.rs | 76 ++- crates/sui-open-rpc/spec/openrpc.json | 1 + crates/sui-protocol-config/src/lib.rs | 21 + ...sui_protocol_config__test__version_58.snap | 1 + 13 files changed, 1527 insertions(+), 534 deletions(-) delete mode 100644 consensus/core/src/leader_scoring_strategy.rs diff --git a/Cargo.lock b/Cargo.lock index 85d22bac7e4f0..6194e29567b35 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -10274,9 +10274,9 @@ dependencies = [ [[package]] name = "quinn-proto" -version = "0.11.6" +version = "0.11.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ba92fb39ec7ad06ca2582c0ca834dfeadcaf06ddfc8e635c80aa7e1c05315fdd" +checksum = "fadfaed2cd7f389d0161bb73eeb07b7b78f8691047a6f3e73caaeae55310a4a6" dependencies = [ "bytes", "rand 0.8.5", diff --git a/Cargo.toml b/Cargo.toml index 463dc0a36e668..6550b6234c331 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -238,7 +238,10 @@ overflow-checks = true opt-level = 1 [workspace.lints.rust] -unexpected_cfgs = { level = "warn", check-cfg = ['cfg(msim)', 'cfg(fail_points)'] } +unexpected_cfgs = { level = "warn", check-cfg = [ + 'cfg(msim)', + 'cfg(fail_points)', +] } # Dependencies that should be kept in sync through the whole workspace [workspace.dependencies] @@ -347,7 +350,12 @@ http-body = "1" humantime = "2.1.0" hyper = "1" hyper-util = "0.1.6" -hyper-rustls = { version = "0.27", default-features = false, features = ["webpki-roots", "http2", "ring", "tls12"] } +hyper-rustls = { version = "0.27", default-features = false, features = [ + "webpki-roots", + "http2", + "ring", + "tls12", +] } im = "15" impl-trait-for-tuples = "0.2.0" indexmap = { version = "2.1.0", features = ["serde"] } @@ -408,7 +416,7 @@ proptest-derive = "0.3.0" prost = "0.13" prost-build =
"0.13" protobuf = { version = "2.28", features = ["with-bytes"] } -quinn-proto = "0.11.6" +quinn-proto = "0.11.7" quote = "1.0.23" rand = "0.8.5" rayon = "1.5.3" @@ -431,7 +439,11 @@ rusoto_kms = { version = "0.48.0", default-features = false, features = [ russh = "0.38.0" russh-keys = "0.38.0" rust-version = "1.56.1" -rustls = { version = "0.23", default-features = false, features = ["std", "tls12", "ring"] } +rustls = { version = "0.23", default-features = false, features = [ + "std", + "tls12", + "ring", +] } rustls-pemfile = "2" rustversion = "1.0.9" rustyline = "9.1.2" @@ -472,7 +484,10 @@ thiserror = "1.0.40" tiny-bip39 = "1.0.0" tokio = "1.36.0" tokio-retry = "0.3" -tokio-rustls = { version = "0.26", default-features = false, features = ["tls12", "ring"] } +tokio-rustls = { version = "0.26", default-features = false, features = [ + "tls12", + "ring", +] } tokio-stream = { version = "0.1.14", features = ["sync", "net"] } tokio-util = "0.7.10" toml = { version = "0.7.4", features = ["preserve_order"] } @@ -558,7 +573,9 @@ move-analyzer = { path = "external-crates/move/crates/move-analyzer" } fastcrypto = { git = "https://github.com/MystenLabs/fastcrypto", rev = "5f2c63266a065996d53f98156f0412782b468597" } fastcrypto-tbls = { git = "https://github.com/MystenLabs/fastcrypto", rev = "5f2c63266a065996d53f98156f0412782b468597" } fastcrypto-zkp = { git = "https://github.com/MystenLabs/fastcrypto", rev = "5f2c63266a065996d53f98156f0412782b468597", package = "fastcrypto-zkp" } -fastcrypto-vdf = { git = "https://github.com/MystenLabs/fastcrypto", rev = "5f2c63266a065996d53f98156f0412782b468597", features = ["experimental"] } +fastcrypto-vdf = { git = "https://github.com/MystenLabs/fastcrypto", rev = "5f2c63266a065996d53f98156f0412782b468597", features = [ + "experimental", +] } passkey-types = { version = "0.2.0" } passkey-client = { version = "0.2.0" } passkey-authenticator = { version = "0.2.0" } @@ -572,7 +589,11 @@ anemo-cli = { git = "https://github.com/mystenlabs/anemo.git", rev = "e609f7697e anemo-tower = { git = "https://github.com/mystenlabs/anemo.git", rev = "e609f7697ed6169bf0760882a0b6c032a57e4f3b" } # core-types with json format for REST api -sui-sdk2 = { package = "sui-sdk", git = "https://github.com/mystenlabs/sui-rust-sdk.git", rev = "bd233b6879b917fb95e17f21927c198e7a60c924", features = ["hash", "serde", "schemars"] } +sui-sdk2 = { package = "sui-sdk", git = "https://github.com/mystenlabs/sui-rust-sdk.git", rev = "bd233b6879b917fb95e17f21927c198e7a60c924", features = [ + "hash", + "serde", + "schemars", +] } ### Workspace Members ### anemo-benchmark = { path = "crates/anemo-benchmark" } diff --git a/consensus/core/src/commit.rs b/consensus/core/src/commit.rs index 406b482338f91..d40ee0a5a4ced 100644 --- a/consensus/core/src/commit.rs +++ b/consensus/core/src/commit.rs @@ -543,9 +543,22 @@ impl CommitRange { self.0.end.saturating_sub(1) } + pub(crate) fn extend_to(&mut self, other: CommitIndex) { + let new_end = other.saturating_add(1); + assert!(self.0.end <= new_end); + self.0 = self.0.start..new_end; + } + + pub(crate) fn size(&self) -> usize { + self.0 + .end + .checked_sub(self.0.start) + .expect("Range should never have end < start") as usize + } + /// Check whether the two ranges have the same size. pub(crate) fn is_equal_size(&self, other: &Self) -> bool { - self.0.end.wrapping_sub(self.0.start) == other.0.end.wrapping_sub(other.0.start) + self.size() == other.size() } /// Check if the provided range is sequentially after this range. 
@@ -669,15 +682,21 @@ mod tests { #[tokio::test] async fn test_commit_range() { telemetry_subscribers::init_for_testing(); - let range1 = CommitRange::new(1..=5); + let mut range1 = CommitRange::new(1..=5); let range2 = CommitRange::new(2..=6); let range3 = CommitRange::new(5..=10); let range4 = CommitRange::new(6..=10); let range5 = CommitRange::new(6..=9); + let range6 = CommitRange::new(1..=1); assert_eq!(range1.start(), 1); assert_eq!(range1.end(), 5); + // Test range size + assert_eq!(range1.size(), 5); + assert_eq!(range3.size(), 6); + assert_eq!(range6.size(), 1); + // Test next range check assert!(!range1.is_next_range(&range2)); assert!(!range1.is_next_range(&range3)); @@ -695,5 +714,16 @@ mod tests { assert!(range2 < range3); assert!(range3 < range4); assert!(range5 < range4); + + // Test extending range + range1.extend_to(10); + assert_eq!(range1.start(), 1); + assert_eq!(range1.end(), 10); + assert_eq!(range1.size(), 10); + + range1.extend_to(20); + assert_eq!(range1.start(), 1); + assert_eq!(range1.end(), 20); + assert_eq!(range1.size(), 20); } } diff --git a/consensus/core/src/core.rs b/consensus/core/src/core.rs index e8d5fa9276ad9..0dc63b35d30d0 100644 --- a/consensus/core/src/core.rs +++ b/consensus/core/src/core.rs @@ -550,7 +550,17 @@ impl Core { tracing::info!( "Leader schedule change triggered at commit index {last_commit_index}" ); - self.leader_schedule.update_leader_schedule(&self.dag_state); + if self + .context + .protocol_config + .consensus_distributed_vote_scoring_strategy() + { + self.leader_schedule + .update_leader_schedule_v2(&self.dag_state); + } else { + self.leader_schedule + .update_leader_schedule_v1(&self.dag_state); + } commits_until_update = self .leader_schedule .commits_until_leader_schedule_update(self.dag_state.clone()); @@ -604,9 +614,18 @@ impl Core { // TODO: refcount subdags let subdags = self.commit_observer.handle_commit(sequenced_leaders)?; - self.dag_state - .write() - .add_unscored_committed_subdags(subdags.clone()); + if self + .context + .protocol_config + .consensus_distributed_vote_scoring_strategy() + { + self.dag_state.write().add_scoring_subdags(subdags.clone()); + } else { + // TODO: Remove when DistributedVoteScoring is enabled. + self.dag_state + .write() + .add_unscored_committed_subdags(subdags.clone()); + } committed_subdags.extend(subdags); } @@ -911,13 +930,13 @@ mod test { use tokio::time::sleep; use super::*; - use crate::test_dag_builder::DagBuilder; use crate::{ block::{genesis_blocks, TestBlock}, block_verifier::NoopBlockVerifier, commit::{CommitAPI as _, CommitRange}, leader_scoring::ReputationScores, storage::{mem_store::MemStore, Store, WriteBatch}, + test_dag_builder::DagBuilder, transaction::TransactionClient, CommitConsumer, CommitIndex, }; @@ -1672,6 +1691,131 @@ mod test { last_round_blocks = this_round_blocks; } + for core_fixture in cores { + // Check commits have been persisted to store + let last_commit = core_fixture + .store + .read_last_commit() + .unwrap() + .expect("last commit should be set"); + // There are 28 leader rounds with rounds completed up to and including + // round 29. Round 30 blocks will only include their own blocks, so the + // 28th leader will not be committed. 
+ assert_eq!(last_commit.index(), 27); + let all_stored_commits = core_fixture + .store + .scan_commits((0..=CommitIndex::MAX).into()) + .unwrap(); + assert_eq!(all_stored_commits.len(), 27); + assert_eq!( + core_fixture + .core + .leader_schedule + .leader_swap_table + .read() + .bad_nodes + .len(), + 1 + ); + assert_eq!( + core_fixture + .core + .leader_schedule + .leader_swap_table + .read() + .good_nodes + .len(), + 1 + ); + let expected_reputation_scores = + ReputationScores::new((11..=20).into(), vec![29, 29, 29, 29]); + assert_eq!( + core_fixture + .core + .leader_schedule + .leader_swap_table + .read() + .reputation_scores, + expected_reputation_scores + ); + } + } + + // TODO: Remove this when DistributedVoteScoring is enabled. + #[tokio::test(flavor = "current_thread", start_paused = true)] + async fn test_leader_schedule_change_with_vote_scoring() { + telemetry_subscribers::init_for_testing(); + let default_params = Parameters::default(); + + let (mut context, _) = Context::new_for_test(4); + context + .protocol_config + .set_consensus_distributed_vote_scoring_strategy_for_testing(false); + + // create the cores and their signals for all the authorities + let mut cores = create_cores(context, vec![1, 1, 1, 1]); + + // Now iterate over a few rounds and ensure the corresponding signals are created while network advances + let mut last_round_blocks = Vec::new(); + for round in 1..=30 { + let mut this_round_blocks = Vec::new(); + + // Wait for min round delay to allow blocks to be proposed. + sleep(default_params.min_round_delay).await; + + for core_fixture in &mut cores { + // add the blocks from last round + // this will trigger a block creation for the round and a signal should be emitted + core_fixture + .core + .add_blocks(last_round_blocks.clone()) + .unwrap(); + + // A "new round" signal should be received given that all the blocks of previous round have been processed + let new_round = receive( + Duration::from_secs(1), + core_fixture.signal_receivers.new_round_receiver(), + ) + .await; + assert_eq!(new_round, round); + + // Check that a new block has been proposed. + let block = tokio::time::timeout( + Duration::from_secs(1), + core_fixture.block_receiver.recv(), + ) + .await + .unwrap() + .unwrap(); + assert_eq!(block.round(), round); + assert_eq!(block.author(), core_fixture.core.context.own_index); + + // append the new block to this round blocks + this_round_blocks.push(core_fixture.core.last_proposed_block().clone()); + + let block = core_fixture.core.last_proposed_block(); + + // ensure that produced block is referring to the blocks of last_round + assert_eq!( + block.ancestors().len(), + core_fixture.core.context.committee.size() + ); + for ancestor in block.ancestors() { + if block.round() > 1 { + // don't bother with round 1 block which just contains the genesis blocks. 
+ assert!( + last_round_blocks + .iter() + .any(|block| block.reference() == *ancestor), + "Reference from previous round should be added" + ); + } + } + } + + last_round_blocks = this_round_blocks; + } + for core_fixture in cores { // Check commits have been persisted to store let last_commit = core_fixture @@ -1866,6 +2010,158 @@ mod test { // create the cores and their signals for all the authorities let mut cores = create_cores(context, vec![1, 1, 1, 1, 1, 1]); + // Now iterate over a few rounds and ensure the corresponding signals are created while network advances + let mut last_round_blocks = Vec::new(); + for round in 1..=33 { + let mut this_round_blocks = Vec::new(); + + // Wait for min round delay to allow blocks to be proposed. + sleep(default_params.min_round_delay).await; + + for core_fixture in &mut cores { + // add the blocks from last round + // this will trigger a block creation for the round and a signal should be emitted + core_fixture + .core + .add_blocks(last_round_blocks.clone()) + .unwrap(); + + // A "new round" signal should be received given that all the blocks of previous round have been processed + let new_round = receive( + Duration::from_secs(1), + core_fixture.signal_receivers.new_round_receiver(), + ) + .await; + assert_eq!(new_round, round); + + // Check that a new block has been proposed. + let block = tokio::time::timeout( + Duration::from_secs(1), + core_fixture.block_receiver.recv(), + ) + .await + .unwrap() + .unwrap(); + assert_eq!(block.round(), round); + assert_eq!(block.author(), core_fixture.core.context.own_index); + + // append the new block to this round blocks + this_round_blocks.push(core_fixture.core.last_proposed_block().clone()); + + let block = core_fixture.core.last_proposed_block(); + + // ensure that produced block is referring to the blocks of last_round + assert_eq!( + block.ancestors().len(), + core_fixture.core.context.committee.size() + ); + for ancestor in block.ancestors() { + if block.round() > 1 { + // don't bother with round 1 block which just contains the genesis blocks. + assert!( + last_round_blocks + .iter() + .any(|block| block.reference() == *ancestor), + "Reference from previous round should be added" + ); + } + } + } + + last_round_blocks = this_round_blocks; + } + + for core_fixture in cores { + // Check commits have been persisted to store + let last_commit = core_fixture + .store + .read_last_commit() + .unwrap() + .expect("last commit should be set"); + // There are 31 leader rounds with rounds completed up to and including + // round 33. Round 33 blocks will only include their own blocks, so there + // should only be 30 commits. + // However on a leader schedule change boundary its is possible for a + // new leader to get selected for the same round if the leader elected + // gets swapped allowing for multiple leaders to be committed at a round. + // Meaning with multi leader per round explicitly set to 1 we will have 30, + // otherwise 31. 
+ // NOTE: We used 31 leader rounds to specifically trigger the scenario + // where the leader schedule boundary occurred AND we had a swap to a new + // leader for the same round + let expected_commit_count = match num_leaders_per_round { + Some(1) => 30, + _ => 31, + }; + assert_eq!(last_commit.index(), expected_commit_count); + let all_stored_commits = core_fixture + .store + .scan_commits((0..=CommitIndex::MAX).into()) + .unwrap(); + assert_eq!(all_stored_commits.len(), expected_commit_count as usize); + assert_eq!( + core_fixture + .core + .leader_schedule + .leader_swap_table + .read() + .bad_nodes + .len(), + 1 + ); + assert_eq!( + core_fixture + .core + .leader_schedule + .leader_swap_table + .read() + .good_nodes + .len(), + 1 + ); + let expected_reputation_scores = + ReputationScores::new((21..=30).into(), vec![43, 43, 43, 43, 43, 43]); + assert_eq!( + core_fixture + .core + .leader_schedule + .leader_swap_table + .read() + .reputation_scores, + expected_reputation_scores + ); + } + } + + // TODO: Remove two tests below this when DistributedVoteScoring is enabled. + #[tokio::test(flavor = "current_thread", start_paused = true)] + async fn test_commit_on_leader_schedule_change_boundary_without_multileader_with_vote_scoring() + { + parameterized_test_commit_on_leader_schedule_change_boundary_with_vote_scoring(Some(1)) + .await; + } + + #[tokio::test(flavor = "current_thread", start_paused = true)] + async fn test_commit_on_leader_schedule_change_boundary_with_multileader_with_vote_scoring() { + parameterized_test_commit_on_leader_schedule_change_boundary_with_vote_scoring(None).await; + } + + async fn parameterized_test_commit_on_leader_schedule_change_boundary_with_vote_scoring( + num_leaders_per_round: Option, + ) { + telemetry_subscribers::init_for_testing(); + let default_params = Parameters::default(); + + let (mut context, _) = Context::new_for_test(6); + context + .protocol_config + .set_consensus_distributed_vote_scoring_strategy_for_testing(false); + context + .protocol_config + .set_mysticeti_num_leaders_per_round_for_testing(num_leaders_per_round); + // create the cores and their signals for all the authorities + let mut cores = create_cores(context, vec![1, 1, 1, 1, 1, 1]); + // Now iterate over a few rounds and ensure the corresponding signals are created while network advances let mut last_round_blocks = Vec::new(); for round in 1..=63 { @@ -1940,7 +2236,7 @@ mod test { // However on a leader schedule change boundary its is possible for a // new leader to get selected for the same round if the leader elected // gets swapped allowing for multiple leaders to be committed at a round. - // Meaning with multi leader per round explicitly set to 1 we will have 60, + // Meaning with multi leader per round explicitly set to 1 we will have 30, // otherwise 61. 
// NOTE: We used 61 leader rounds to specifically trigger the scenario // where the leader schedule boundary occurred AND we had a swap to a new diff --git a/consensus/core/src/dag_state.rs b/consensus/core/src/dag_state.rs index aa7f3ca718f28..cb1112633287e 100644 --- a/consensus/core/src/dag_state.rs +++ b/consensus/core/src/dag_state.rs @@ -20,12 +20,13 @@ use crate::{ }, commit::{ load_committed_subdag_from_store, CommitAPI as _, CommitDigest, CommitIndex, CommitInfo, - CommitRef, CommitVote, CommittedSubDag, TrustedCommit, GENESIS_COMMIT_INDEX, + CommitRef, CommitVote, TrustedCommit, GENESIS_COMMIT_INDEX, }, context::Context, - leader_scoring::ReputationScores, + leader_scoring::{ReputationScores, ScoringSubdag}, stake_aggregator::{QuorumThreshold, StakeAggregator}, storage::{Store, WriteBatch}, + CommittedSubDag, }; /// DagState provides the API to write and read accepted blocks from the DAG. @@ -61,6 +62,11 @@ pub(crate) struct DagState { // Last committed rounds per authority. last_committed_rounds: Vec, + /// The committed subdags that have been scored but scores have not been used + /// for leader schedule yet. + scoring_subdag: ScoringSubdag, + + // TODO: Remove when DistributedVoteScoring is enabled. /// The list of committed subdags that have been sequenced by the universal /// committer but have yet to be used to calculate reputation scores for the /// next leader schedule. Until then we consider it as "unscored" subdags. @@ -101,8 +107,6 @@ impl DagState { .read_last_commit() .unwrap_or_else(|e| panic!("Failed to read from storage: {:?}", e)); - let mut unscored_committed_subdags = Vec::new(); - let commit_info = store .read_last_commit_info() .unwrap_or_else(|e| panic!("Failed to read from storage: {:?}", e)); @@ -115,6 +119,9 @@ impl DagState { (vec![0; num_authorities], GENESIS_COMMIT_INDEX + 1) }; + let mut unscored_committed_subdags = Vec::new(); + let mut scoring_subdag = ScoringSubdag::new(context.clone()); + if let Some(last_commit) = last_commit.as_ref() { store .scan_commits((commit_recovery_start_index..=last_commit.index()).into()) @@ -141,10 +148,17 @@ impl DagState { tracing::info!( "DagState was initialized with the following state: \ - {last_commit:?}; {last_committed_rounds:?}; {} unscored_committed_subdags;", + {last_commit:?}; {last_committed_rounds:?}; {} unscored committed subdags;", unscored_committed_subdags.len() ); + if context + .protocol_config + .consensus_distributed_vote_scoring_strategy() + { + scoring_subdag.add_subdags(std::mem::take(&mut unscored_committed_subdags)); + } + let mut state = Self { context, genesis, @@ -158,6 +172,7 @@ impl DagState { blocks_to_write: vec![], commits_to_write: vec![], commit_info_to_write: vec![], + scoring_subdag, unscored_committed_subdags, store, cached_rounds, @@ -637,6 +652,11 @@ impl DagState { // We empty the unscored committed subdags to calculate reputation scores. assert!(self.unscored_committed_subdags.is_empty()); + // We create an empty scoring subdag once reputation scores are calculated. + // Note: It is okay for this to not be gated by protocol config as the + // scoring_subdag should be empty in either case at this point. + assert!(self.scoring_subdag.is_empty()); + let commit_info = CommitInfo { committed_rounds: self.last_committed_rounds.clone(), reputation_scores, @@ -820,6 +840,7 @@ impl DagState { .unwrap_or_else(|e| panic!("Failed to read from storage: {:?}", e)) } + // TODO: Remove four methods below this when DistributedVoteScoring is enabled. 
pub(crate) fn unscored_committed_subdags_count(&self) -> u64 { self.unscored_committed_subdags.len() as u64 } @@ -840,6 +861,34 @@ impl DagState { std::mem::take(&mut self.unscored_committed_subdags) } + pub(crate) fn add_scoring_subdags(&mut self, scoring_subdags: Vec) { + self.scoring_subdag.add_subdags(scoring_subdags); + } + + pub(crate) fn clear_scoring_subdag(&mut self) { + self.scoring_subdag.clear(); + } + + pub(crate) fn scoring_subdags_count(&self) -> usize { + self.scoring_subdag.scored_subdags_count() + } + + pub(crate) fn is_scoring_subdag_empty(&self) -> bool { + self.scoring_subdag.is_empty() + } + + pub(crate) fn calculate_scoring_subdag_scores(&self) -> ReputationScores { + self.scoring_subdag.calculate_scores() + } + + pub(crate) fn scoring_subdag_commit_range(&self) -> CommitIndex { + self.scoring_subdag + .commit_range + .as_ref() + .expect("commit range should exist for scoring subdag") + .end() + } + pub(crate) fn genesis_blocks(&self) -> Vec { self.genesis.values().cloned().collect() } @@ -1362,6 +1411,123 @@ mod test { assert_eq!(result, expected); } + // TODO: Remove when DistributedVoteScoring is enabled. + #[tokio::test] + async fn test_flush_and_recovery_with_unscored_subdag() { + telemetry_subscribers::init_for_testing(); + let num_authorities: u32 = 4; + let (mut context, _) = Context::new_for_test(num_authorities as usize); + context + .protocol_config + .set_consensus_distributed_vote_scoring_strategy_for_testing(false); + let context = Arc::new(context); + let store = Arc::new(MemStore::new()); + let mut dag_state = DagState::new(context.clone(), store.clone()); + + // Create test blocks and commits for round 1 ~ 10 + let num_rounds: u32 = 10; + let mut dag_builder = DagBuilder::new(context.clone()); + dag_builder.layers(1..=num_rounds).build(); + let mut commits = vec![]; + let leaders = dag_builder + .leader_blocks(1..=num_rounds) + .into_iter() + .flatten() + .collect::>(); + + let mut last_committed_rounds = vec![0; 4]; + for (idx, leader) in leaders.into_iter().enumerate() { + let commit_index = idx as u32 + 1; + let (subdag, commit) = dag_builder.get_sub_dag_and_commit( + leader.clone(), + last_committed_rounds.clone(), + commit_index, + ); + for block in subdag.blocks.iter() { + last_committed_rounds[block.author().value()] = + max(block.round(), last_committed_rounds[block.author().value()]); + } + commits.push(commit); + } + + // Add the blocks from first 5 rounds and first 5 commits to the dag state + let temp_commits = commits.split_off(5); + dag_state.accept_blocks(dag_builder.blocks(1..=5)); + for commit in commits.clone() { + dag_state.add_commit(commit); + } + + // Flush the dag state + dag_state.flush(); + + // Add the rest of the blocks and commits to the dag state + dag_state.accept_blocks(dag_builder.blocks(6..=num_rounds)); + for commit in temp_commits.clone() { + dag_state.add_commit(commit); + } + + // All blocks should be found in DagState. + let all_blocks = dag_builder.blocks(6..=num_rounds); + let block_refs = all_blocks + .iter() + .map(|block| block.reference()) + .collect::>(); + let result = dag_state + .get_blocks(&block_refs) + .into_iter() + .map(|b| b.unwrap()) + .collect::>(); + assert_eq!(result, all_blocks); + + // Last commit index should be 10. + assert_eq!(dag_state.last_commit_index(), 10); + assert_eq!(dag_state.last_committed_rounds(), last_committed_rounds); + + // Destroy the dag state. 
+ drop(dag_state); + + // Recover the state from the store + let dag_state = DagState::new(context.clone(), store.clone()); + + // Blocks of first 5 rounds should be found in DagState. + let blocks = dag_builder.blocks(1..=5); + let block_refs = blocks + .iter() + .map(|block| block.reference()) + .collect::>(); + let result = dag_state + .get_blocks(&block_refs) + .into_iter() + .map(|b| b.unwrap()) + .collect::>(); + assert_eq!(result, blocks); + + // Blocks above round 5 should not be in DagState, because they are not flushed. + let missing_blocks = dag_builder.blocks(6..=num_rounds); + let block_refs = missing_blocks + .iter() + .map(|block| block.reference()) + .collect::>(); + let retrieved_blocks = dag_state + .get_blocks(&block_refs) + .into_iter() + .flatten() + .collect::>(); + assert!(retrieved_blocks.is_empty()); + + // Last commit index should be 5. + assert_eq!(dag_state.last_commit_index(), 5); + + // This is the last_commmit_rounds of the first 5 commits that were flushed + let expected_last_committed_rounds = vec![4, 5, 4, 4]; + assert_eq!( + dag_state.last_committed_rounds(), + expected_last_committed_rounds + ); + // Unscored subdags will be recoverd based on the flushed commits and no commit info + assert_eq!(dag_state.unscored_committed_subdags_count(), 5); + } + #[tokio::test] async fn test_flush_and_recovery() { telemetry_subscribers::init_for_testing(); @@ -1472,7 +1638,7 @@ mod test { expected_last_committed_rounds ); // Unscored subdags will be recoverd based on the flushed commits and no commit info - assert_eq!(dag_state.unscored_committed_subdags_count(), 5); + assert_eq!(dag_state.scoring_subdags_count(), 5); } #[tokio::test] diff --git a/consensus/core/src/leader_schedule.rs b/consensus/core/src/leader_schedule.rs index bbbb68fe03978..756cc9361d86b 100644 --- a/consensus/core/src/leader_schedule.rs +++ b/consensus/core/src/leader_schedule.rs @@ -16,10 +16,6 @@ use crate::{ context::Context, dag_state::DagState, leader_scoring::{ReputationScoreCalculator, ReputationScores}, - leader_scoring_strategy::{ - CertificateScoringStrategy, CertifiedVoteScoringStrategyV1, CertifiedVoteScoringStrategyV2, - ScoringStrategy, VoteScoringStrategy, - }, CommitIndex, Round, }; @@ -31,7 +27,6 @@ pub(crate) struct LeaderSchedule { pub leader_swap_table: Arc>, context: Arc, num_commits_per_schedule: u64, - scoring_strategy: Arc, } impl LeaderSchedule { @@ -48,7 +43,6 @@ impl LeaderSchedule { context, num_commits_per_schedule: Self::CONSENSUS_COMMITS_PER_SCHEDULE, leader_swap_table: Arc::new(RwLock::new(leader_swap_table)), - scoring_strategy: Self::choose_scoring_strategy(), } } @@ -72,58 +66,110 @@ impl LeaderSchedule { }, ); - tracing::info!( + if context + .protocol_config + .consensus_distributed_vote_scoring_strategy() + { + tracing::info!( + "LeaderSchedule recovered using {leader_swap_table:?}. There are {} committed subdags scored in DagState.", + dag_state.read().scoring_subdags_count(), + ); + } else { + // TODO: Remove when DistributedVoteScoring is enabled. + tracing::info!( "LeaderSchedule recovered using {leader_swap_table:?}. 
There are {} pending unscored subdags in DagState.", dag_state.read().unscored_committed_subdags_count(), ); + } // create the schedule Self::new(context, leader_swap_table) } - // TODO: remove this once scoring strategy is finalized - fn choose_scoring_strategy() -> Arc { - if let Ok(scoring_strategy) = std::env::var("CONSENSUS_SCORING_STRATEGY") { - tracing::info!( - "Using scoring strategy {scoring_strategy} for ReputationScoreCalculator" - ); - let scoring_strategy: Arc = match scoring_strategy.as_str() { - "vote" => Arc::new(VoteScoringStrategy {}), - "certified_vote_v1" => Arc::new(CertifiedVoteScoringStrategyV1 {}), - "certified_vote_v2" => Arc::new(CertifiedVoteScoringStrategyV2 {}), - "certificate" => Arc::new(CertificateScoringStrategy {}), - _ => Arc::new(VoteScoringStrategy {}), - }; - scoring_strategy - } else { - tracing::info!("Using scoring strategy vote for ReputationScoreCalculator"); - Arc::new(VoteScoringStrategy {}) - } - } - pub(crate) fn commits_until_leader_schedule_update( &self, dag_state: Arc>, ) -> usize { - let unscored_committed_subdags_count = dag_state.read().unscored_committed_subdags_count(); + let subdag_count = if self + .context + .protocol_config + .consensus_distributed_vote_scoring_strategy() + { + dag_state.read().scoring_subdags_count() as u64 + } else { + // TODO: Remove when DistributedVoteScoring is enabled. + dag_state.read().unscored_committed_subdags_count() + }; + assert!( - unscored_committed_subdags_count <= self.num_commits_per_schedule, - "Unscored committed subdags count exceeds the number of commits per schedule" + subdag_count <= self.num_commits_per_schedule, + "Committed subdags count exceeds the number of commits per schedule" ); self.num_commits_per_schedule - .checked_sub(unscored_committed_subdags_count) + .checked_sub(subdag_count) .unwrap() as usize } - /// Checks whether the dag state unscored sub dags list is empty. If yes then that means that + /// Checks whether the dag state sub dags list is empty. If yes then that means that /// either (1) the system has just started and there is no unscored sub dag available (2) the /// schedule has updated - new scores have been calculated. Both cases we consider as valid cases /// where the schedule has been updated. pub(crate) fn leader_schedule_updated(&self, dag_state: &RwLock) -> bool { - dag_state.read().unscored_committed_subdags_count() == 0 + if self + .context + .protocol_config + .consensus_distributed_vote_scoring_strategy() + { + dag_state.read().is_scoring_subdag_empty() + } else { + // TODO: Remove when DistributedVoteScoring is enabled. 
+ dag_state.read().unscored_committed_subdags_count() == 0 + } + } + + pub(crate) fn update_leader_schedule_v2(&self, dag_state: &RwLock) { + let _s = self + .context + .metrics + .node_metrics + .scope_processing_time + .with_label_values(&["LeaderSchedule::update_leader_schedule"]) + .start_timer(); + + let (reputation_scores, last_commit_index) = { + let dag_state = dag_state.read(); + let reputation_scores = dag_state.calculate_scoring_subdag_scores(); + + let last_commit_index = dag_state.scoring_subdag_commit_range(); + + (reputation_scores, last_commit_index) + }; + + { + let mut dag_state = dag_state.write(); + // Clear scoring subdag as we have updated the leader schedule + dag_state.clear_scoring_subdag(); + // Buffer score and last commit rounds in dag state to be persisted later + dag_state.add_commit_info(reputation_scores.clone()); + } + + self.update_leader_swap_table(LeaderSwapTable::new( + self.context.clone(), + last_commit_index, + reputation_scores.clone(), + )); + + reputation_scores.update_metrics(self.context.clone()); + + self.context + .metrics + .node_metrics + .num_of_bad_nodes + .set(self.leader_swap_table.read().bad_nodes.len() as i64); } - pub(crate) fn update_leader_schedule(&self, dag_state: &RwLock) { + // TODO: Remove when DistributedVoteScoring is enabled. + pub(crate) fn update_leader_schedule_v1(&self, dag_state: &RwLock) { let _s = self .context .metrics @@ -142,12 +188,8 @@ impl LeaderSchedule { .scope_processing_time .with_label_values(&["ReputationScoreCalculator::calculate"]) .start_timer(); - let reputation_scores = ReputationScoreCalculator::new( - self.context.clone(), - &unscored_subdags, - self.scoring_strategy.as_ref(), - ) - .calculate(); + let reputation_scores = + ReputationScoreCalculator::new(self.context.clone(), &unscored_subdags).calculate(); drop(score_calculation_timer); reputation_scores.update_metrics(self.context.clone()); @@ -624,7 +666,7 @@ mod tests { ); // Leader Scoring & Schedule Change is disabled, unscored subdags should not be accumulated. 
- assert_eq!(0, dag_state.read().unscored_committed_subdags_count()); + assert_eq!(0, dag_state.read().scoring_subdags_count()); let leader_schedule = LeaderSchedule::from_store(context.clone(), dag_state.clone()); @@ -704,10 +746,10 @@ mod tests { last_committed_rounds, dag_state.read().last_committed_rounds() ); - let actual_unscored_subdags = dag_state.read().unscored_committed_subdags(); - assert_eq!(1, dag_state.read().unscored_committed_subdags_count()); - let actual_subdag = actual_unscored_subdags[0].clone(); - assert_eq!(*subdags.last().unwrap(), actual_subdag); + assert_eq!(1, dag_state.read().scoring_subdags_count()); + let recovered_scores = dag_state.read().calculate_scoring_subdag_scores(); + let expected_scores = ReputationScores::new((11..=11).into(), vec![0, 0, 0, 0]); + assert_eq!(recovered_scores, expected_scores); let leader_schedule = LeaderSchedule::from_store(context.clone(), dag_state.clone()); @@ -747,7 +789,7 @@ mod tests { expected_last_committed_rounds, dag_state.read().last_committed_rounds() ); - assert_eq!(0, dag_state.read().unscored_committed_subdags_count()); + assert_eq!(0, dag_state.read().scoring_subdags_count()); let leader_schedule = LeaderSchedule::from_store(context.clone(), dag_state.clone()); @@ -771,7 +813,7 @@ mod tests { let mut dag_builder = DagBuilder::new(context.clone()); dag_builder.layers(1..=2).build(); - let mut expected_unscored_subdags = vec![]; + let mut expected_scored_subdags = vec![]; let mut expected_commits = vec![]; let leaders = dag_builder .leader_blocks(1..=2) @@ -794,7 +836,7 @@ mod tests { max(block.round(), last_committed_rounds[block.author().value()]); } expected_commits.push(commit); - expected_unscored_subdags.push(subdag); + expected_scored_subdags.push(subdag); } // The CommitInfo for the first 2 commits are written to store. 
10 commits @@ -817,15 +859,13 @@ mod tests { last_committed_rounds, dag_state.read().last_committed_rounds() ); - let actual_unscored_subdags = dag_state.read().unscored_committed_subdags(); assert_eq!( - expected_unscored_subdags.len() as u64, - dag_state.read().unscored_committed_subdags_count() + expected_scored_subdags.len(), + dag_state.read().scoring_subdags_count() ); - for (idx, expected_subdag) in expected_unscored_subdags.into_iter().enumerate() { - let actual_subdag = actual_unscored_subdags[idx].clone(); - assert_eq!(expected_subdag, actual_subdag); - } + let recovered_scores = dag_state.read().calculate_scoring_subdag_scores(); + let expected_scores = ReputationScores::new((1..=2).into(), vec![0, 0, 0, 0]); + assert_eq!(recovered_scores, expected_scores); let leader_schedule = LeaderSchedule::from_store(context.clone(), dag_state.clone()); @@ -852,9 +892,7 @@ mod tests { CommitRef::new(1, CommitDigest::MIN), vec![], )]; - dag_state - .write() - .add_unscored_committed_subdags(unscored_subdags); + dag_state.write().add_scoring_subdags(unscored_subdags); let commits_until_leader_schedule_update = leader_schedule.commits_until_leader_schedule_update(dag_state.clone()); @@ -953,7 +991,7 @@ mod tests { let mut dag_state_write = dag_state.write(); dag_state_write.set_last_commit(last_commit); - dag_state_write.add_unscored_committed_subdags(unscored_subdags); + dag_state_write.add_scoring_subdags(unscored_subdags); drop(dag_state_write); assert_eq!( @@ -961,7 +999,7 @@ mod tests { AuthorityIndex::new_for_test(0) ); - leader_schedule.update_leader_schedule(&dag_state); + leader_schedule.update_leader_schedule_v2(&dag_state); let leader_swap_table = leader_schedule.leader_swap_table.read(); assert_eq!(leader_swap_table.good_nodes.len(), 1); @@ -1155,4 +1193,480 @@ mod tests { // Update leader from old swap table to new invalid swap table leader_schedule.update_leader_swap_table(leader_swap_table.clone()); } + + // TODO: Remove all tests below this when DistributedVoteScoring is enabled. + #[tokio::test] + async fn test_leader_schedule_from_store_with_no_scores_with_vote_scoring() { + telemetry_subscribers::init_for_testing(); + let mut context = Context::new_for_test(4).0; + context + .protocol_config + .set_consensus_distributed_vote_scoring_strategy_for_testing(false); + context + .protocol_config + .set_mysticeti_leader_scoring_and_schedule_for_testing(false); + let context = Arc::new(context); + let store = Arc::new(MemStore::new()); + + let leader_timestamp = context.clock.timestamp_utc_ms(); + let blocks = vec![ + VerifiedBlock::new_for_test( + TestBlock::new(10, 2) + .set_timestamp_ms(leader_timestamp) + .build(), + ), + VerifiedBlock::new_for_test(TestBlock::new(9, 0).build()), + VerifiedBlock::new_for_test(TestBlock::new(9, 2).build()), + VerifiedBlock::new_for_test(TestBlock::new(9, 3).build()), + ]; + + let leader = blocks[0].clone(); + let leader_ref = leader.reference(); + let last_commit_index = 10; + let last_commit = TrustedCommit::new_for_test( + last_commit_index, + CommitDigest::MIN, + leader_timestamp, + leader_ref, + blocks + .iter() + .map(|block| block.reference()) + .collect::>(), + ); + + // The CommitInfo for the first 10 commits are written to store. 
This is the + // info that LeaderSchedule will be recovered from + let committed_rounds = vec![9, 9, 10, 9]; + let commit_ref = CommitRef::new(10, CommitDigest::MIN); + let commit_info = CommitInfo { + reputation_scores: ReputationScores::default(), + committed_rounds, + }; + + store + .write( + WriteBatch::default() + .commit_info(vec![(commit_ref, commit_info)]) + .blocks(blocks) + .commits(vec![last_commit]), + ) + .unwrap(); + + // CommitIndex '11' will be written to store. This should result in the cached + // last_committed_rounds & unscored subdags in DagState to be updated with the + // latest commit information on recovery. + let leader_timestamp = context.clock.timestamp_utc_ms(); + let blocks = vec![ + VerifiedBlock::new_for_test( + TestBlock::new(11, 3) + .set_timestamp_ms(leader_timestamp) + .build(), + ), + VerifiedBlock::new_for_test(TestBlock::new(10, 0).build()), + VerifiedBlock::new_for_test(TestBlock::new(10, 1).build()), + VerifiedBlock::new_for_test(TestBlock::new(10, 3).build()), + ]; + + let leader = blocks[0].clone(); + let leader_ref = leader.reference(); + let last_commit_index = 11; + let expected_last_committed_rounds = vec![10, 10, 10, 11]; + let last_commit = TrustedCommit::new_for_test( + last_commit_index, + CommitDigest::MIN, + leader_timestamp, + leader_ref, + blocks + .iter() + .map(|block| block.reference()) + .collect::>(), + ); + store + .write( + WriteBatch::default() + .blocks(blocks) + .commits(vec![last_commit]), + ) + .unwrap(); + + let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store))); + + // Check that DagState recovery from stored CommitInfo worked correctly + assert_eq!( + expected_last_committed_rounds, + dag_state.read().last_committed_rounds() + ); + + // Leader Scoring & Schedule Change is disabled, unscored subdags should not be accumulated. + assert_eq!(0, dag_state.read().unscored_committed_subdags_count()); + + let leader_schedule = LeaderSchedule::from_store(context.clone(), dag_state.clone()); + + // Check that LeaderSchedule recovery from stored CommitInfo worked correctly + let leader_swap_table = leader_schedule.leader_swap_table.read(); + assert_eq!(leader_swap_table.good_nodes.len(), 0); + assert_eq!(leader_swap_table.bad_nodes.len(), 0); + } + + #[tokio::test] + async fn test_leader_schedule_from_store_with_vote_scoring() { + telemetry_subscribers::init_for_testing(); + let mut context = Context::new_for_test(4).0; + context + .protocol_config + .set_consensus_distributed_vote_scoring_strategy_for_testing(false); + context + .protocol_config + .set_consensus_bad_nodes_stake_threshold_for_testing(33); + let context = Arc::new(context); + let store = Arc::new(MemStore::new()); + + // Populate fully connected test blocks for round 0 ~ 11, authorities 0 ~ 3. 
+ let mut dag_builder = DagBuilder::new(context.clone()); + dag_builder.layers(1..=11).build(); + let mut subdags = vec![]; + let mut expected_commits = vec![]; + let leaders = dag_builder + .leader_blocks(1..=11) + .into_iter() + .flatten() + .collect::>(); + let mut blocks_to_write = vec![]; + + let mut last_committed_rounds = vec![0; 4]; + for (idx, leader) in leaders.into_iter().enumerate() { + let commit_index = idx as u32 + 1; + let (sub_dag, commit) = dag_builder.get_sub_dag_and_commit( + leader.clone(), + last_committed_rounds.clone(), + commit_index, + ); + for block in sub_dag.blocks.iter() { + blocks_to_write.push(block.clone()); + last_committed_rounds[block.author().value()] = + max(block.round(), last_committed_rounds[block.author().value()]); + } + + expected_commits.push(commit); + subdags.push(sub_dag); + } + + // The CommitInfo for the first 10 commits are written to store. This is the + // info that LeaderSchedule will be recovered from + let commit_range = (1..=10).into(); + let reputation_scores = ReputationScores::new(commit_range, vec![4, 1, 1, 3]); + let committed_rounds = vec![9, 9, 10, 9]; + let commit_ref = expected_commits[9].reference(); + let commit_info = CommitInfo { + reputation_scores, + committed_rounds, + }; + + // CommitIndex '11' will be written to store. This should result in the cached + // last_committed_rounds & unscored subdags in DagState to be updated with the + // latest commit information on recovery. + store + .write( + WriteBatch::default() + .commit_info(vec![(commit_ref, commit_info)]) + .blocks(blocks_to_write) + .commits(expected_commits), + ) + .unwrap(); + + let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store))); + + // Check that DagState recovery from stored CommitInfo worked correctly + assert_eq!( + last_committed_rounds, + dag_state.read().last_committed_rounds() + ); + let actual_unscored_subdags = dag_state.read().unscored_committed_subdags(); + assert_eq!(1, dag_state.read().unscored_committed_subdags_count()); + let actual_subdag = actual_unscored_subdags[0].clone(); + assert_eq!(*subdags.last().unwrap(), actual_subdag); + + let leader_schedule = LeaderSchedule::from_store(context.clone(), dag_state.clone()); + + // Check that LeaderSchedule recovery from stored CommitInfo worked correctly + let leader_swap_table = leader_schedule.leader_swap_table.read(); + assert_eq!(leader_swap_table.good_nodes.len(), 1); + assert_eq!( + leader_swap_table.good_nodes[0].0, + AuthorityIndex::new_for_test(0) + ); + assert_eq!(leader_swap_table.bad_nodes.len(), 1); + assert!( + leader_swap_table + .bad_nodes + .contains_key(&AuthorityIndex::new_for_test(2)), + "{:?}", + leader_swap_table.bad_nodes + ); + } + + #[tokio::test] + async fn test_leader_schedule_from_store_no_commits_with_vote_scoring() { + telemetry_subscribers::init_for_testing(); + let mut context = Context::new_for_test(4).0; + context + .protocol_config + .set_consensus_distributed_vote_scoring_strategy_for_testing(false); + context + .protocol_config + .set_consensus_bad_nodes_stake_threshold_for_testing(33); + let context = Arc::new(context); + let store = Arc::new(MemStore::new()); + + let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store))); + + let expected_last_committed_rounds = vec![0, 0, 0, 0]; + + // Check that DagState recovery from stored CommitInfo worked correctly + assert_eq!( + expected_last_committed_rounds, + dag_state.read().last_committed_rounds() + ); + assert_eq!(0, 
dag_state.read().unscored_committed_subdags_count()); + + let leader_schedule = LeaderSchedule::from_store(context.clone(), dag_state.clone()); + + // Check that LeaderSchedule recovery from stored CommitInfo worked correctly + let leader_swap_table = leader_schedule.leader_swap_table.read(); + assert_eq!(leader_swap_table.good_nodes.len(), 0); + assert_eq!(leader_swap_table.bad_nodes.len(), 0); + } + + #[tokio::test] + async fn test_leader_schedule_from_store_no_commit_info_with_vote_scoring() { + telemetry_subscribers::init_for_testing(); + let mut context = Context::new_for_test(4).0; + context + .protocol_config + .set_consensus_distributed_vote_scoring_strategy_for_testing(false); + context + .protocol_config + .set_consensus_bad_nodes_stake_threshold_for_testing(33); + let context = Arc::new(context); + let store = Arc::new(MemStore::new()); + + // Populate fully connected test blocks for round 0 ~ 2, authorities 0 ~ 3. + let mut dag_builder = DagBuilder::new(context.clone()); + dag_builder.layers(1..=2).build(); + + let mut expected_unscored_subdags = vec![]; + let mut expected_commits = vec![]; + let leaders = dag_builder + .leader_blocks(1..=2) + .into_iter() + .flatten() + .collect::>(); + let mut blocks_to_write = vec![]; + + let mut last_committed_rounds = vec![0; 4]; + for (idx, leader) in leaders.into_iter().enumerate() { + let commit_index = idx as u32 + 1; + let (subdag, commit) = dag_builder.get_sub_dag_and_commit( + leader.clone(), + last_committed_rounds.clone(), + commit_index, + ); + for block in subdag.blocks.iter() { + blocks_to_write.push(block.clone()); + last_committed_rounds[block.author().value()] = + max(block.round(), last_committed_rounds[block.author().value()]); + } + expected_commits.push(commit); + expected_unscored_subdags.push(subdag); + } + + // The CommitInfo for the first 2 commits are written to store. 10 commits + // would have been required for a leader schedule update so at this point + // no commit info should have been persisted and no leader schedule should + // be recovered. However dag state should have properly recovered the + // unscored subdags & last committed rounds. 
+ store + .write( + WriteBatch::default() + .blocks(blocks_to_write) + .commits(expected_commits), + ) + .unwrap(); + + let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store))); + + // Check that DagState recovery from stored CommitInfo worked correctly + assert_eq!( + last_committed_rounds, + dag_state.read().last_committed_rounds() + ); + let actual_unscored_subdags = dag_state.read().unscored_committed_subdags(); + assert_eq!( + expected_unscored_subdags.len() as u64, + dag_state.read().unscored_committed_subdags_count() + ); + for (idx, expected_subdag) in expected_unscored_subdags.into_iter().enumerate() { + let actual_subdag = actual_unscored_subdags[idx].clone(); + assert_eq!(expected_subdag, actual_subdag); + } + + let leader_schedule = LeaderSchedule::from_store(context.clone(), dag_state.clone()); + + // Check that LeaderSchedule recovery from stored CommitInfo worked correctly + let leader_swap_table = leader_schedule.leader_swap_table.read(); + assert_eq!(leader_swap_table.good_nodes.len(), 0); + assert_eq!(leader_swap_table.bad_nodes.len(), 0); + } + + #[tokio::test] + async fn test_leader_schedule_commits_until_leader_schedule_update_with_vote_scoring() { + telemetry_subscribers::init_for_testing(); + let mut context = Context::new_for_test(4).0; + context + .protocol_config + .set_consensus_distributed_vote_scoring_strategy_for_testing(false); + let context = Arc::new(context); + let leader_schedule = LeaderSchedule::new(context.clone(), LeaderSwapTable::default()); + + let dag_state = Arc::new(RwLock::new(DagState::new( + context.clone(), + Arc::new(MemStore::new()), + ))); + let unscored_subdags = vec![CommittedSubDag::new( + BlockRef::new(1, AuthorityIndex::ZERO, BlockDigest::MIN), + vec![], + context.clock.timestamp_utc_ms(), + CommitRef::new(1, CommitDigest::MIN), + vec![], + )]; + dag_state + .write() + .add_unscored_committed_subdags(unscored_subdags); + + let commits_until_leader_schedule_update = + leader_schedule.commits_until_leader_schedule_update(dag_state.clone()); + assert_eq!(commits_until_leader_schedule_update, 299); + } + + // TODO: Remove when DistributedVoteScoring is enabled. + #[tokio::test] + async fn test_leader_schedule_update_leader_schedule_with_vote_scoring() { + telemetry_subscribers::init_for_testing(); + let mut context = Context::new_for_test(4).0; + context + .protocol_config + .set_consensus_distributed_vote_scoring_strategy_for_testing(false); + context + .protocol_config + .set_consensus_bad_nodes_stake_threshold_for_testing(33); + let context = Arc::new(context); + let leader_schedule = Arc::new(LeaderSchedule::new( + context.clone(), + LeaderSwapTable::default(), + )); + let dag_state = Arc::new(RwLock::new(DagState::new( + context.clone(), + Arc::new(MemStore::new()), + ))); + + // Populate fully connected test blocks for round 0 ~ 4, authorities 0 ~ 3. 
+ let max_round: u32 = 4; + let num_authorities: u32 = 4; + + let mut blocks = Vec::new(); + let (genesis_references, genesis): (Vec<_>, Vec<_>) = context + .committee + .authorities() + .map(|index| { + let author_idx = index.0.value() as u32; + let block = TestBlock::new(0, author_idx).build(); + VerifiedBlock::new_for_test(block) + }) + .map(|block| (block.reference(), block)) + .unzip(); + blocks.extend(genesis); + + let mut ancestors = genesis_references; + let mut leader = None; + for round in 1..=max_round { + let mut new_ancestors = vec![]; + for author in 0..num_authorities { + let base_ts = round as BlockTimestampMs * 1000; + let block = VerifiedBlock::new_for_test( + TestBlock::new(round, author) + .set_timestamp_ms(base_ts + (author + round) as u64) + .set_ancestors(ancestors.clone()) + .build(), + ); + new_ancestors.push(block.reference()); + + // Simulate referenced block which was part of another committed + // subdag. + if round == 3 && author == 0 { + tracing::info!("Skipping {block} in committed subdags blocks"); + continue; + } + + blocks.push(block.clone()); + + // only write one block for the final round, which is the leader + // of the committed subdag. + if round == max_round { + leader = Some(block.clone()); + break; + } + } + ancestors = new_ancestors; + } + + let leader_block = leader.unwrap(); + let leader_ref = leader_block.reference(); + let commit_index = 1; + + let last_commit = TrustedCommit::new_for_test( + commit_index, + CommitDigest::MIN, + context.clock.timestamp_utc_ms(), + leader_ref, + blocks + .iter() + .map(|block| block.reference()) + .collect::>(), + ); + + let unscored_subdags = vec![CommittedSubDag::new( + leader_ref, + blocks, + context.clock.timestamp_utc_ms(), + last_commit.reference(), + vec![], + )]; + + let mut dag_state_write = dag_state.write(); + dag_state_write.set_last_commit(last_commit); + dag_state_write.add_unscored_committed_subdags(unscored_subdags); + drop(dag_state_write); + + assert_eq!( + leader_schedule.elect_leader(4, 0), + AuthorityIndex::new_for_test(0) + ); + + leader_schedule.update_leader_schedule_v1(&dag_state); + + let leader_swap_table = leader_schedule.leader_swap_table.read(); + assert_eq!(leader_swap_table.good_nodes.len(), 1); + assert_eq!( + leader_swap_table.good_nodes[0].0, + AuthorityIndex::new_for_test(2) + ); + assert_eq!(leader_swap_table.bad_nodes.len(), 1); + assert!(leader_swap_table + .bad_nodes + .contains_key(&AuthorityIndex::new_for_test(0))); + assert_eq!( + leader_schedule.elect_leader(4, 0), + AuthorityIndex::new_for_test(2) + ); + } } diff --git a/consensus/core/src/leader_scoring.rs b/consensus/core/src/leader_scoring.rs index 380003debd37d..c7b151607cd01 100644 --- a/consensus/core/src/leader_scoring.rs +++ b/consensus/core/src/leader_scoring.rs @@ -2,7 +2,7 @@ // SPDX-License-Identifier: Apache-2.0 use std::{ - collections::{BTreeMap, HashMap}, + collections::{BTreeMap, HashSet}, fmt::Debug, ops::Bound::{Excluded, Included}, sync::Arc, @@ -12,15 +12,14 @@ use consensus_config::AuthorityIndex; use serde::{Deserialize, Serialize}; use crate::{ - block::{BlockAPI, BlockDigest, BlockRef, Slot, VerifiedBlock}, + block::{BlockAPI, BlockDigest, BlockRef, Slot}, commit::{CommitRange, CommittedSubDag}, context::Context, - leader_scoring_strategy::ScoringStrategy, stake_aggregator::{QuorumThreshold, StakeAggregator}, - Round, + Round, VerifiedBlock, }; -pub(crate) struct ReputationScoreCalculator<'a> { +pub(crate) struct ReputationScoreCalculator { // The range of commits that these scores are 
calculated from. pub(crate) commit_range: CommitRange, // The scores per authority. Vec index is the `AuthorityIndex`. @@ -31,19 +30,10 @@ pub(crate) struct ReputationScoreCalculator<'a> { // the subdags are then combined into one `UnscoredSubdag` so that we can // calculate the scores for the leaders in this subdag. unscored_subdag: UnscoredSubdag, - // There are multiple scoring strategies that can be used to calculate the scores - // and the `ReputationScoreCalculator` is responsible for applying the strategy. - // For now this is dynamic while we are experimenting but eventually we can - // replace this with the final strategy that works best. - scoring_strategy: &'a dyn ScoringStrategy, } -impl<'a> ReputationScoreCalculator<'a> { - pub(crate) fn new( - context: Arc, - unscored_subdags: &[CommittedSubDag], - scoring_strategy: &'a dyn ScoringStrategy, - ) -> Self { +impl ReputationScoreCalculator { + pub(crate) fn new(context: Arc, unscored_subdags: &[CommittedSubDag]) -> Self { let num_authorities = context.committee.size(); let scores_per_authority = vec![0_u64; num_authorities]; @@ -58,7 +48,6 @@ impl<'a> ReputationScoreCalculator<'a> { Self { unscored_subdag, commit_range, - scoring_strategy, scores_per_authority, } } @@ -68,10 +57,7 @@ impl<'a> ReputationScoreCalculator<'a> { for leader in leaders { let leader_slot = Slot::from(leader); tracing::trace!("Calculating score for leader {leader_slot}"); - self.add_scores( - self.scoring_strategy - .calculate_scores_for_leader(&self.unscored_subdag, leader_slot), - ); + self.add_scores(self.calculate_scores_for_leader(&self.unscored_subdag, leader_slot)); } ReputationScores::new(self.commit_range.clone(), self.scores_per_authority.clone()) @@ -84,6 +70,47 @@ impl<'a> ReputationScoreCalculator<'a> { self.scores_per_authority[authority_idx] += *score; } } + + // VoteScoringStrategy + // This scoring strategy will give one point to any votes for the leader. + fn calculate_scores_for_leader(&self, subdag: &UnscoredSubdag, leader_slot: Slot) -> Vec { + let num_authorities = subdag.context.committee.size(); + let mut scores_per_authority = vec![0_u64; num_authorities]; + + let leader_blocks = subdag.get_blocks_at_slot(leader_slot); + + if leader_blocks.is_empty() { + tracing::trace!("[{}] No block for leader slot {leader_slot} in this set of unscored committed subdags, skip scoring", subdag.context.own_index); + return scores_per_authority; + } + + // At this point we are guaranteed that there is only one leader per slot + // because we are operating on committed subdags. + assert!(leader_blocks.len() == 1); + + let leader_block = leader_blocks.first().unwrap(); + + let voting_round = leader_slot.round + 1; + let voting_blocks = subdag.get_blocks_at_round(voting_round); + for potential_vote in voting_blocks { + // TODO: use the decided leader as input instead of leader slot. If the leader was skipped, + // votes to skip should be included in the score as well. 
+ if subdag.is_vote(&potential_vote, leader_block) { + let authority = potential_vote.author(); + tracing::trace!( + "Found a vote {} for leader {leader_block} from authority {authority}", + potential_vote.reference() + ); + tracing::trace!( + "[{}] scores +1 reputation for {authority}!", + subdag.context.own_index + ); + scores_per_authority[authority] += 1; + } + } + + scores_per_authority + } } #[derive(Clone, Default, Debug, Serialize, Deserialize, PartialEq, Eq)] @@ -138,6 +165,182 @@ impl ReputationScores { } } +/// ScoringSubdag represents the scoring votes in a collection of subdags across +/// multiple commits. +/// These subdags are "scoring" for the purposes of leader schedule change. As +/// new subdags are added, the DAG is traversed and votes for leaders are recorded +/// and scored along with stake. On a leader schedule change, finalized reputation +/// scores will be calculated based on the votes & stake collected in this struct. +pub(crate) struct ScoringSubdag { + pub(crate) context: Arc<Context>, + pub(crate) commit_range: Option<CommitRange>, + // Only includes committed leaders for now. + // TODO: Include skipped leaders as well + pub(crate) leaders: HashSet<BlockRef>, + // A map of votes to the stake of strongly linked blocks that include that vote. + // Note: Including a stake aggregator so that we can quickly check if it exceeds + // the quorum threshold and only include those scores for certain scoring strategies. + pub(crate) votes: BTreeMap<BlockRef, StakeAggregator<QuorumThreshold>>, +} + +impl ScoringSubdag { + pub(crate) fn new(context: Arc<Context>) -> Self { + Self { + context, + commit_range: None, + leaders: HashSet::new(), + votes: BTreeMap::new(), + } + } + + pub(crate) fn add_subdags(&mut self, committed_subdags: Vec<CommittedSubDag>) { + let _s = self + .context + .metrics + .node_metrics + .scope_processing_time + .with_label_values(&["ScoringSubdag::add_unscored_committed_subdags"]) + .start_timer(); + for subdag in committed_subdags { + // If the commit range is not set, then set it to the range of the first + // committed subdag index. + if self.commit_range.is_none() { + self.commit_range = Some(CommitRange::new( + subdag.commit_ref.index..=subdag.commit_ref.index, + )); + } else { + let commit_range = self.commit_range.as_mut().unwrap(); + commit_range.extend_to(subdag.commit_ref.index); + } + + // Add the committed leader to the list of leaders we will be scoring. + tracing::trace!("Adding new committed leader {} for scoring", subdag.leader); + self.leaders.insert(subdag.leader); + + // Check each block in the subdag. Blocks are in order, so we traverse the + // oldest blocks first. + for block in subdag.blocks { + for ancestor in block.ancestors() { + // Weak links may point to blocks with lower round numbers + // than strong links. + if ancestor.round != block.round().saturating_sub(1) { + continue; + } + + // If a block's strongly linked ancestor is in leaders, then + // it's a vote for the leader. + if self.leaders.contains(ancestor) { + // There should never be duplicate references to blocks + // with strong links to the leader. + tracing::trace!( + "Found a vote {} for leader {ancestor} from authority {}", + block.reference(), + block.author() + ); + assert!(self + .votes + .insert(block.reference(), StakeAggregator::new()) + .is_none(), "Vote {block} already exists. Duplicate vote found for leader {ancestor}"); + } + + if let Some(stake) = self.votes.get_mut(ancestor) { + // Vote is strongly linked to a future block, so we + // consider this a distributed vote.
+                        tracing::trace!(
+                            "Found a distributed vote {ancestor} from authority {}",
+                            ancestor.author
+                        );
+                        stake.add(block.author(), &self.context.committee);
+                    }
+                }
+            }
+        }
+    }
+
+    // Iterate through the votes and calculate scores for each authority based on
+    // the scoring strategy that is used (distributed vote or certified vote).
+    pub(crate) fn calculate_scores(&self) -> ReputationScores {
+        let _s = self
+            .context
+            .metrics
+            .node_metrics
+            .scope_processing_time
+            .with_label_values(&["ScoringSubdag::calculate_scores"])
+            .start_timer();
+
+        let scores_per_authority = self.score_distributed_votes();
+
+        // TODO: Normalize scores
+        ReputationScores::new(
+            self.commit_range
+                .clone()
+                .expect("CommitRange should be set if calculate_scores is called."),
+            scores_per_authority,
+        )
+    }
+
+    /// This scoring strategy aims to give scores based on overall vote distribution.
+    /// Instead of only giving one point for each vote that is included in 2f+1
+    /// blocks, we give a score equal to the total stake of all blocks that
+    /// included the vote.
+    fn score_distributed_votes(&self) -> Vec<u64> {
+        let num_authorities = self.context.committee.size();
+        let mut scores_per_authority = vec![0_u64; num_authorities];
+
+        for (vote, stake_agg) in self.votes.iter() {
+            let authority = vote.author;
+            let stake = stake_agg.stake();
+            tracing::trace!(
+                "[{}] scores +{stake} reputation for {authority}!",
+                self.context.own_index,
+            );
+            scores_per_authority[authority.value()] += stake;
+        }
+        scores_per_authority
+    }
+
+    /// This scoring strategy gives points equal to the amount of stake in blocks
+    /// that include the authority's vote, but only if that total stake reaches the
+    /// 2f+1 quorum threshold. We consider such a vote a certified vote.
+    // TODO: This will be used for ancestor selection
+    #[allow(unused)]
+    fn score_certified_votes(&self) -> Vec<u64> {
+        let num_authorities = self.context.committee.size();
+        let mut scores_per_authority = vec![0_u64; num_authorities];
+
+        for (vote, stake_agg) in self.votes.iter() {
+            let authority = vote.author;
+            if stake_agg.reached_threshold(&self.context.committee) {
+                let stake = stake_agg.stake();
+                tracing::trace!(
+                    "[{}] scores +{stake} reputation for {authority}!",
+                    self.context.own_index,
+                );
+                scores_per_authority[authority.value()] += stake;
+            }
+        }
+        scores_per_authority
+    }
+
+    pub(crate) fn scored_subdags_count(&self) -> usize {
+        if let Some(commit_range) = &self.commit_range {
+            commit_range.size()
+        } else {
+            0
+        }
+    }
+
+    pub(crate) fn is_empty(&self) -> bool {
+        self.leaders.is_empty() && self.votes.is_empty() && self.commit_range.is_none()
+    }
+
+    pub(crate) fn clear(&mut self) {
+        self.leaders.clear();
+        self.votes.clear();
+        self.commit_range = None;
+    }
+}
+
 /// UnscoredSubdag represents a collection of subdags across multiple commits.
 /// These subdags are considered unscored for the purposes of leader schedule
 /// change.
On a leader schedule change, reputation scores will be calculated @@ -239,44 +442,6 @@ impl UnscoredSubdag { self.find_supported_leader_block(leader_slot, potential_vote) == Some(reference) } - pub(crate) fn is_certificate( - &self, - potential_certificate: &VerifiedBlock, - leader_block: &VerifiedBlock, - all_votes: &mut HashMap, - ) -> bool { - let mut votes_stake_aggregator = StakeAggregator::::new(); - for reference in potential_certificate.ancestors() { - let is_vote = if let Some(is_vote) = all_votes.get(reference) { - *is_vote - } else if let Some(potential_vote) = self.get_block(reference) { - let is_vote = self.is_vote(&potential_vote, leader_block); - all_votes.insert(*reference, is_vote); - is_vote - } else { - tracing::trace!( - "Potential vote not found in unscored committed subdags: {:?}", - reference - ); - false - }; - - if is_vote { - tracing::trace!("{reference} is a vote for {leader_block}"); - if votes_stake_aggregator.add(reference.author, &self.context.committee) { - tracing::trace!( - "{potential_certificate} is a certificate for leader {leader_block}" - ); - return true; - } - } else { - tracing::trace!("{reference} is not a vote for {leader_block}",); - } - } - tracing::trace!("{potential_certificate} is not a certificate for leader {leader_block}"); - false - } - pub(crate) fn get_blocks_at_slot(&self, slot: Slot) -> Vec { let mut blocks = vec![]; for (_block_ref, block) in self.blocks.range(( @@ -313,8 +478,7 @@ mod tests { use std::cmp::max; use super::*; - use crate::commit::{CommitDigest, CommitRef}; - use crate::{leader_scoring_strategy::VoteScoringStrategy, test_dag_builder::DagBuilder}; + use crate::{test_dag_builder::DagBuilder, CommitDigest, CommitRef}; #[tokio::test] async fn test_reputation_scores_authorities_by_score() { @@ -368,6 +532,99 @@ mod tests { ); } + #[tokio::test] + async fn test_scoring_subdag() { + telemetry_subscribers::init_for_testing(); + let context = Arc::new(Context::new_for_test(4).0); + + // Populate fully connected test blocks for round 0 ~ 3, authorities 0 ~ 3. + let mut dag_builder = DagBuilder::new(context.clone()); + dag_builder.layers(1..=3).build(); + // Build round 4 but with just the leader block + dag_builder + .layer(4) + .authorities(vec![ + AuthorityIndex::new_for_test(1), + AuthorityIndex::new_for_test(2), + AuthorityIndex::new_for_test(3), + ]) + .skip_block() + .build(); + + let leaders = dag_builder + .leader_blocks(1..=4) + .into_iter() + .flatten() + .collect::>(); + + let mut scoring_subdag = ScoringSubdag::new(context.clone()); + let mut last_committed_rounds = vec![0; 4]; + for (idx, leader) in leaders.into_iter().enumerate() { + let commit_index = idx as u32 + 1; + let (subdag, _commit) = dag_builder.get_sub_dag_and_commit( + leader, + last_committed_rounds.clone(), + commit_index, + ); + for block in subdag.blocks.iter() { + last_committed_rounds[block.author().value()] = + max(block.round(), last_committed_rounds[block.author().value()]); + } + scoring_subdag.add_subdags(vec![subdag]); + } + + let scores = scoring_subdag.calculate_scores(); + assert_eq!(scores.scores_per_authority, vec![5, 5, 5, 5]); + assert_eq!(scores.commit_range, (1..=4).into()); + } + + #[tokio::test] + async fn test_certified_vote_scoring_subdag() { + telemetry_subscribers::init_for_testing(); + let context = Arc::new(Context::new_for_test(4).0); + + // Populate fully connected test blocks for round 0 ~ 3, authorities 0 ~ 3. 
+ let mut dag_builder = DagBuilder::new(context.clone()); + dag_builder.layers(1..=3).build(); + // Build round 4 but with just the leader block + dag_builder + .layer(4) + .authorities(vec![ + AuthorityIndex::new_for_test(1), + AuthorityIndex::new_for_test(2), + AuthorityIndex::new_for_test(3), + ]) + .skip_block() + .build(); + + let leaders = dag_builder + .leader_blocks(1..=4) + .into_iter() + .flatten() + .collect::>(); + + let mut scoring_subdag = ScoringSubdag::new(context.clone()); + let mut last_committed_rounds = vec![0; 4]; + for (idx, leader) in leaders.into_iter().enumerate() { + let commit_index = idx as u32 + 1; + let (subdag, _commit) = dag_builder.get_sub_dag_and_commit( + leader, + last_committed_rounds.clone(), + commit_index, + ); + for block in subdag.blocks.iter() { + last_committed_rounds[block.author().value()] = + max(block.round(), last_committed_rounds[block.author().value()]); + } + scoring_subdag.add_subdags(vec![subdag]); + } + + let scores_per_authority = scoring_subdag.score_certified_votes(); + assert_eq!(scores_per_authority, vec![4, 4, 4, 4]); + assert_eq!(scoring_subdag.commit_range.unwrap(), (1..=4).into()); + } + + // TODO: Remove all tests below this when DistributedVoteScoring is enabled. #[tokio::test] async fn test_reputation_score_calculator() { telemetry_subscribers::init_for_testing(); @@ -408,9 +665,7 @@ mod tests { } unscored_subdags.push(subdag); } - let scoring_strategy = VoteScoringStrategy {}; - let mut calculator = - ReputationScoreCalculator::new(context.clone(), &unscored_subdags, &scoring_strategy); + let mut calculator = ReputationScoreCalculator::new(context.clone(), &unscored_subdags); let scores = calculator.calculate(); assert_eq!(scores.scores_per_authority, vec![3, 2, 2, 2]); assert_eq!(scores.commit_range, (1..=4).into()); @@ -423,9 +678,7 @@ mod tests { let context = Arc::new(Context::new_for_test(4).0); let unscored_subdags = vec![]; - let scoring_strategy = VoteScoringStrategy {}; - let mut calculator = - ReputationScoreCalculator::new(context.clone(), &unscored_subdags, &scoring_strategy); + let mut calculator = ReputationScoreCalculator::new(context.clone(), &unscored_subdags); calculator.calculate(); } @@ -443,9 +696,7 @@ mod tests { CommitRef::new(1, CommitDigest::MIN), vec![], )]; - let scoring_strategy = VoteScoringStrategy {}; - let mut calculator = - ReputationScoreCalculator::new(context.clone(), &unscored_subdags, &scoring_strategy); + let mut calculator = ReputationScoreCalculator::new(context.clone(), &unscored_subdags); calculator.calculate(); } @@ -498,9 +749,7 @@ mod tests { unscored_subdags.push(subdag); } - let scoring_strategy = VoteScoringStrategy {}; - let mut calculator = - ReputationScoreCalculator::new(context.clone(), &unscored_subdags, &scoring_strategy); + let mut calculator = ReputationScoreCalculator::new(context.clone(), &unscored_subdags); let scores = calculator.calculate(); assert_eq!(scores.scores_per_authority, vec![3, 2, 2, 2]); assert_eq!(scores.commit_range, (1..=4).into()); diff --git a/consensus/core/src/leader_scoring_strategy.rs b/consensus/core/src/leader_scoring_strategy.rs deleted file mode 100644 index 640e3667cd71c..0000000000000 --- a/consensus/core/src/leader_scoring_strategy.rs +++ /dev/null @@ -1,373 +0,0 @@ -// Copyright (c) Mysten Labs, Inc. 
-// SPDX-License-Identifier: Apache-2.0 - -use std::{collections::HashMap, ops::Range}; - -use crate::{ - block::{BlockAPI, BlockRef, Slot}, - commit::DEFAULT_WAVE_LENGTH, - leader_scoring::UnscoredSubdag, - stake_aggregator::{QuorumThreshold, StakeAggregator}, -}; - -#[allow(unused)] -pub(crate) trait ScoringStrategy: Send + Sync { - fn calculate_scores_for_leader(&self, subdag: &UnscoredSubdag, leader_slot: Slot) -> Vec; - - // Based on the scoring strategy there is a minimum number of rounds required - // for the scores to be calculated. This method allows that to be set by the - // scoring strategy. - fn leader_scoring_round_range(&self, min_round: u32, max_round: u32) -> Range; -} - -/// This scoring strategy is like `CertifiedVoteScoringStrategyV1` but instead of -/// only giving one point for each vote that is included in 2f+1 certificates. We -/// give a score equal to the amount of stake of all certificates that included -/// the vote. -pub(crate) struct CertifiedVoteScoringStrategyV2 {} - -impl ScoringStrategy for CertifiedVoteScoringStrategyV2 { - fn calculate_scores_for_leader(&self, subdag: &UnscoredSubdag, leader_slot: Slot) -> Vec { - let num_authorities = subdag.context.committee.size(); - let mut scores_per_authority = vec![0_u64; num_authorities]; - - let decision_round = leader_slot.round + DEFAULT_WAVE_LENGTH - 1; - - let leader_blocks = subdag.get_blocks_at_slot(leader_slot); - - if leader_blocks.is_empty() { - tracing::trace!("[{}] No block for leader slot {leader_slot} in this set of unscored committed subdags, skip scoring", subdag.context.own_index); - return scores_per_authority; - } - - // At this point we are guaranteed that there is only one leader per slot - // because we are operating on committed subdags. - assert!(leader_blocks.len() == 1); - - let leader_block = leader_blocks.first().unwrap(); - - let decision_blocks = subdag.get_blocks_at_round(decision_round); - - let mut all_votes: HashMap)> = - HashMap::new(); - for potential_cert in decision_blocks { - let authority = potential_cert.reference().author; - for reference in potential_cert.ancestors() { - if let Some((is_vote, stake_agg)) = all_votes.get_mut(reference) { - if *is_vote { - stake_agg.add(authority, &subdag.context.committee); - } - } else if let Some(potential_vote) = subdag.get_block(reference) { - let is_vote = subdag.is_vote(&potential_vote, leader_block); - let mut stake_agg = StakeAggregator::::new(); - stake_agg.add(authority, &subdag.context.committee); - all_votes.insert(*reference, (is_vote, stake_agg)); - } else { - tracing::trace!( - "Potential vote not found in unscored committed subdags: {:?}", - reference - ); - }; - } - } - - for (vote_ref, (is_vote, stake_agg)) in all_votes { - if is_vote { - let authority = vote_ref.author; - tracing::trace!( - "Found a certified vote {vote_ref} for leader {leader_block} from authority {authority}" - ); - tracing::trace!( - "[{}] scores +{} reputation for {authority}!", - subdag.context.own_index, - stake_agg.stake() - ); - scores_per_authority[authority] += stake_agg.stake(); - } - } - - scores_per_authority - } - - fn leader_scoring_round_range(&self, min_round: u32, max_round: u32) -> Range { - // To be able to calculate scores using certified votes we require +1 round - // for the votes on the leader and +1 round for the certificates of those votes. 
- assert!(min_round < max_round - 1); - min_round..max_round.saturating_sub(1) - } -} - -/// This scoring strategy gives one point for each authority vote that is included -/// in 2f+1 certificates. We are calling this a certified vote. -pub(crate) struct CertifiedVoteScoringStrategyV1 {} - -impl ScoringStrategy for CertifiedVoteScoringStrategyV1 { - fn calculate_scores_for_leader(&self, subdag: &UnscoredSubdag, leader_slot: Slot) -> Vec { - let num_authorities = subdag.context.committee.size(); - let mut scores_per_authority = vec![0_u64; num_authorities]; - - let decision_round = leader_slot.round + DEFAULT_WAVE_LENGTH - 1; - - let leader_blocks = subdag.get_blocks_at_slot(leader_slot); - - if leader_blocks.is_empty() { - tracing::trace!("[{}] No block for leader slot {leader_slot} in this set of unscored committed subdags, skip scoring", subdag.context.own_index); - return scores_per_authority; - } - - // At this point we are guaranteed that there is only one leader per slot - // because we are operating on committed subdags. - assert!(leader_blocks.len() == 1); - - let leader_block = leader_blocks.first().unwrap(); - - let decision_blocks = subdag.get_blocks_at_round(decision_round); - - let mut all_votes: HashMap)> = - HashMap::new(); - for potential_cert in decision_blocks { - let authority = potential_cert.reference().author; - for reference in potential_cert.ancestors() { - if let Some((is_vote, stake_agg)) = all_votes.get_mut(reference) { - if *is_vote { - stake_agg.add(authority, &subdag.context.committee); - } - } else if let Some(potential_vote) = subdag.get_block(reference) { - let is_vote = subdag.is_vote(&potential_vote, leader_block); - let mut stake_agg = StakeAggregator::::new(); - stake_agg.add(authority, &subdag.context.committee); - all_votes.insert(*reference, (is_vote, stake_agg)); - } else { - tracing::trace!( - "Potential vote not found in unscored committed subdags: {:?}", - reference - ); - }; - } - } - - for (vote_ref, (is_vote, stake_agg)) in all_votes { - if is_vote && stake_agg.reached_threshold(&subdag.context.committee) { - let authority = vote_ref.author; - tracing::trace!( - "Found a certified vote {vote_ref} for leader {leader_block} from authority {authority}" - ); - tracing::trace!( - "[{}] scores +1 reputation for {authority}!", - subdag.context.own_index - ); - scores_per_authority[authority] += 1; - } - } - - scores_per_authority - } - - fn leader_scoring_round_range(&self, min_round: u32, max_round: u32) -> Range { - // To be able to calculate scores using certified votes we require +1 round - // for the votes on the leader and +1 round for the certificates of those votes. - assert!(min_round < max_round - 1); - min_round..max_round.saturating_sub(1) - } -} - -// This scoring strategy will give one point to any votes for the leader. -pub(crate) struct VoteScoringStrategy {} - -impl ScoringStrategy for VoteScoringStrategy { - fn calculate_scores_for_leader(&self, subdag: &UnscoredSubdag, leader_slot: Slot) -> Vec { - let num_authorities = subdag.context.committee.size(); - let mut scores_per_authority = vec![0_u64; num_authorities]; - - let leader_blocks = subdag.get_blocks_at_slot(leader_slot); - - if leader_blocks.is_empty() { - tracing::trace!("[{}] No block for leader slot {leader_slot} in this set of unscored committed subdags, skip scoring", subdag.context.own_index); - return scores_per_authority; - } - - // At this point we are guaranteed that there is only one leader per slot - // because we are operating on committed subdags. 
- assert!(leader_blocks.len() == 1); - - let leader_block = leader_blocks.first().unwrap(); - - let voting_round = leader_slot.round + 1; - let voting_blocks = subdag.get_blocks_at_round(voting_round); - for potential_vote in voting_blocks { - // TODO: use the decided leader as input instead of leader slot. If the leader was skipped, - // votes to skip should be included in the score as well. - if subdag.is_vote(&potential_vote, leader_block) { - let authority = potential_vote.author(); - tracing::trace!( - "Found a vote {} for leader {leader_block} from authority {authority}", - potential_vote.reference() - ); - tracing::trace!( - "[{}] scores +1 reputation for {authority}!", - subdag.context.own_index - ); - scores_per_authority[authority] += 1; - } - } - - scores_per_authority - } - - fn leader_scoring_round_range(&self, min_round: u32, max_round: u32) -> Range { - // To be able to calculate scores using votes we require +1 round - // for the votes on the leader. - assert!(min_round < max_round); - min_round..max_round - } -} - -// This scoring strategy will give one point to any certificates for the leader. -pub(crate) struct CertificateScoringStrategy {} - -impl ScoringStrategy for CertificateScoringStrategy { - fn calculate_scores_for_leader(&self, subdag: &UnscoredSubdag, leader_slot: Slot) -> Vec { - let num_authorities = subdag.context.committee.size(); - let mut scores_per_authority = vec![0_u64; num_authorities]; - - let decision_round = leader_slot.round + DEFAULT_WAVE_LENGTH - 1; - - let leader_blocks = subdag.get_blocks_at_slot(leader_slot); - - if leader_blocks.is_empty() { - tracing::trace!("[{}] No block for leader slot {leader_slot} in this set of unscored committed subdags, skip scoring", subdag.context.own_index); - return scores_per_authority; - } - - // At this point we are guaranteed that there is only one leader per slot - // because we are operating on committed subdags. - assert!(leader_blocks.len() == 1); - - let leader_block = leader_blocks.first().unwrap(); - - let decision_blocks = subdag.get_blocks_at_round(decision_round); - let mut all_votes = HashMap::new(); - for potential_cert in decision_blocks { - let authority = potential_cert.reference().author; - if subdag.is_certificate(&potential_cert, leader_block, &mut all_votes) { - tracing::trace!( - "Found a certificate {} for leader {leader_block} from authority {authority}", - potential_cert.reference() - ); - tracing::trace!( - "[{}] scores +1 reputation for {authority}!", - subdag.context.own_index - ); - scores_per_authority[authority] += 1; - } - } - - scores_per_authority - } - - fn leader_scoring_round_range(&self, min_round: u32, max_round: u32) -> Range { - // To be able to calculate scores using certificates we require +1 round - // for the votes on the leader and +1 round for the certificates of those votes. 
- assert!(min_round < max_round - 1); - min_round..max_round.saturating_sub(1) - } -} - -#[cfg(test)] -mod tests { - use std::{cmp::max, sync::Arc}; - - use consensus_config::AuthorityIndex; - - use super::*; - use crate::{ - commit::CommittedSubDag, context::Context, leader_scoring::ReputationScoreCalculator, - test_dag_builder::DagBuilder, - }; - - #[tokio::test] - async fn test_certificate_scoring_strategy() { - let (context, unscored_subdags) = basic_setup(); - let scoring_strategy = CertificateScoringStrategy {}; - let mut calculator = - ReputationScoreCalculator::new(context.clone(), &unscored_subdags, &scoring_strategy); - let scores = calculator.calculate(); - assert_eq!(scores.scores_per_authority, vec![2, 1, 1, 1]); - assert_eq!(scores.commit_range, (1..=4).into()); - } - - #[tokio::test] - async fn test_vote_scoring_strategy() { - let (context, unscored_subdags) = basic_setup(); - let scoring_strategy = VoteScoringStrategy {}; - let mut calculator = - ReputationScoreCalculator::new(context.clone(), &unscored_subdags, &scoring_strategy); - let scores = calculator.calculate(); - assert_eq!(scores.scores_per_authority, vec![3, 2, 2, 2]); - assert_eq!(scores.commit_range, (1..=4).into()); - } - - #[tokio::test] - async fn test_certified_vote_scoring_strategy_v1() { - let (context, unscored_subdags) = basic_setup(); - let scoring_strategy = CertifiedVoteScoringStrategyV1 {}; - let mut calculator = - ReputationScoreCalculator::new(context.clone(), &unscored_subdags, &scoring_strategy); - let scores = calculator.calculate(); - assert_eq!(scores.scores_per_authority, vec![1, 1, 1, 1]); - assert_eq!(scores.commit_range, (1..=4).into()); - } - - #[tokio::test] - async fn test_certified_vote_scoring_strategy_v2() { - let (context, unscored_subdags) = basic_setup(); - let scoring_strategy = CertifiedVoteScoringStrategyV2 {}; - let mut calculator = - ReputationScoreCalculator::new(context.clone(), &unscored_subdags, &scoring_strategy); - let scores = calculator.calculate(); - assert_eq!(scores.scores_per_authority, vec![5, 5, 5, 5]); - assert_eq!(scores.commit_range, (1..=4).into()); - } - - fn basic_setup() -> (Arc, Vec) { - telemetry_subscribers::init_for_testing(); - let context = Arc::new(Context::new_for_test(4).0); - - // Populate fully connected test blocks for round 0 ~ 3, authorities 0 ~ 3. 
- let mut dag_builder = DagBuilder::new(context.clone()); - dag_builder.layers(1..=3).build(); - // Build round 4 but with just the leader block - dag_builder - .layer(4) - .authorities(vec![ - AuthorityIndex::new_for_test(1), - AuthorityIndex::new_for_test(2), - AuthorityIndex::new_for_test(3), - ]) - .skip_block() - .build(); - - let leaders = dag_builder - .leader_blocks(1..=4) - .into_iter() - .flatten() - .collect::>(); - - let mut unscored_subdags = vec![]; - let mut last_committed_rounds = vec![0; 4]; - for (idx, leader) in leaders.into_iter().enumerate() { - let commit_index = idx as u32 + 1; - let (subdag, _commit) = dag_builder.get_sub_dag_and_commit( - leader, - last_committed_rounds.clone(), - commit_index, - ); - for block in subdag.blocks.iter() { - last_committed_rounds[block.author().value()] = - max(block.round(), last_committed_rounds[block.author().value()]); - } - unscored_subdags.push(subdag); - } - (context, unscored_subdags) - } -} diff --git a/consensus/core/src/lib.rs b/consensus/core/src/lib.rs index 4c3cc17ca965c..597e4a6b13313 100644 --- a/consensus/core/src/lib.rs +++ b/consensus/core/src/lib.rs @@ -20,7 +20,6 @@ mod dag_state; mod error; mod leader_schedule; mod leader_scoring; -mod leader_scoring_strategy; mod leader_timeout; mod linearizer; mod metrics; diff --git a/consensus/core/src/linearizer.rs b/consensus/core/src/linearizer.rs index 1267a6e579116..f9fffd6e659f5 100644 --- a/consensus/core/src/linearizer.rs +++ b/consensus/core/src/linearizer.rs @@ -6,12 +6,11 @@ use std::{collections::HashSet, sync::Arc}; use consensus_config::AuthorityIndex; use parking_lot::RwLock; -use crate::commit::sort_sub_dag_blocks; -use crate::leader_schedule::LeaderSchedule; use crate::{ block::{BlockAPI, VerifiedBlock}, - commit::{Commit, CommittedSubDag, TrustedCommit}, + commit::{sort_sub_dag_blocks, Commit, CommittedSubDag, TrustedCommit}, dag_state::DagState, + leader_schedule::LeaderSchedule, }; /// Expand a committed sequence of leader into a sequence of sub-dags. @@ -262,11 +261,80 @@ mod tests { // Create some commits let commits = linearizer.handle_commit(leaders.clone()); + // Write them in DagState + dag_state.write().add_scoring_subdags(commits); + + // Now update the leader schedule + leader_schedule.update_leader_schedule_v2(&dag_state); + + assert!( + leader_schedule.leader_schedule_updated(&dag_state), + "Leader schedule should have been updated" + ); + + // Try to commit now the rest of the 10 leaders + let leaders = dag_builder + .leader_blocks(11..=20) + .into_iter() + .map(Option::unwrap) + .collect::>(); + + // Now on the commits only the first one should contain the updated scores, the other should be empty + let commits = linearizer.handle_commit(leaders.clone()); + assert_eq!(commits.len(), 10); + let scores = vec![ + (AuthorityIndex::new_for_test(1), 29), + (AuthorityIndex::new_for_test(0), 29), + (AuthorityIndex::new_for_test(3), 29), + (AuthorityIndex::new_for_test(2), 29), + ]; + assert_eq!(commits[0].reputation_scores_desc, scores); + + for commit in commits.into_iter().skip(1) { + assert_eq!(commit.reputation_scores_desc, vec![]); + } + } + + // TODO: Remove when DistributedVoteScoring is enabled. 
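+    // The test below exercises the previous scoring path, where unscored committed
+    // subdags are written to DagState and the schedule is updated via
+    // `update_leader_schedule_v1`.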
+ #[tokio::test] + async fn test_handle_commit_with_schedule_update_with_unscored_subdags() { + telemetry_subscribers::init_for_testing(); + let num_authorities = 4; + let context = Arc::new(Context::new_for_test(num_authorities).0); + let dag_state = Arc::new(RwLock::new(DagState::new( + context.clone(), + Arc::new(MemStore::new()), + ))); + const NUM_OF_COMMITS_PER_SCHEDULE: u64 = 10; + let leader_schedule = Arc::new( + LeaderSchedule::new(context.clone(), LeaderSwapTable::default()) + .with_num_commits_per_schedule(NUM_OF_COMMITS_PER_SCHEDULE), + ); + let mut linearizer = Linearizer::new(dag_state.clone(), leader_schedule.clone()); + + // Populate fully connected test blocks for round 0 ~ 20, authorities 0 ~ 3. + let num_rounds: u32 = 20; + let mut dag_builder = DagBuilder::new(context.clone()); + dag_builder + .layers(1..=num_rounds) + .build() + .persist_layers(dag_state.clone()); + + // Take the first 10 leaders + let leaders = dag_builder + .leader_blocks(1..=10) + .into_iter() + .map(Option::unwrap) + .collect::>(); + + // Create some commits + let commits = linearizer.handle_commit(leaders.clone()); + // Write them in DagState dag_state.write().add_unscored_committed_subdags(commits); // Now update the leader schedule - leader_schedule.update_leader_schedule(&dag_state); + leader_schedule.update_leader_schedule_v1(&dag_state); assert!( leader_schedule.leader_schedule_updated(&dag_state), diff --git a/crates/sui-open-rpc/spec/openrpc.json b/crates/sui-open-rpc/spec/openrpc.json index fecb9d3fa8214..2fa56e07c8afc 100644 --- a/crates/sui-open-rpc/spec/openrpc.json +++ b/crates/sui-open-rpc/spec/openrpc.json @@ -1304,6 +1304,7 @@ "ban_entry_init": false, "bridge": false, "commit_root_state_digest": false, + "consensus_distributed_vote_scoring_strategy": false, "consensus_order_end_of_epoch_last": true, "disable_invariant_violation_check_in_swap_loc": false, "disallow_adding_abilities_on_upgrade": false, diff --git a/crates/sui-protocol-config/src/lib.rs b/crates/sui-protocol-config/src/lib.rs index 3c9a2e08bbc9e..40dd7360bf72f 100644 --- a/crates/sui-protocol-config/src/lib.rs +++ b/crates/sui-protocol-config/src/lib.rs @@ -174,6 +174,7 @@ const MAX_PROTOCOL_VERSION: u64 = 58; // Version 57: Reduce minimum number of random beacon shares. // Version 58: Optimize boolean binops // Finalize bridge committee on mainnet. +// Switch to distributed vote scoring in consensus in devnet #[derive(Copy, Clone, Debug, Hash, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)] pub struct ProtocolVersion(u64); @@ -516,6 +517,10 @@ struct FeatureFlags { // Rethrow type layout errors during serialization instead of trying to convert them. #[serde(skip_serializing_if = "is_false")] rethrow_serialization_type_layout_errors: bool, + + // Use distributed vote leader scoring strategy in consensus. 
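+    // When this flag is disabled, the previous vote scoring strategy remains in use.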
+ #[serde(skip_serializing_if = "is_false")] + consensus_distributed_vote_scoring_strategy: bool, } fn is_false(b: &bool) -> bool { @@ -1562,6 +1567,11 @@ impl ProtocolConfig { pub fn rethrow_serialization_type_layout_errors(&self) -> bool { self.feature_flags.rethrow_serialization_type_layout_errors } + + pub fn consensus_distributed_vote_scoring_strategy(&self) -> bool { + self.feature_flags + .consensus_distributed_vote_scoring_strategy + } } #[cfg(not(msim))] @@ -2713,6 +2723,12 @@ impl ProtocolConfig { if chain == Chain::Mainnet { cfg.bridge_should_try_to_finalize_committee = Some(true); } + + if chain != Chain::Mainnet && chain != Chain::Testnet { + // Enable distributed vote scoring for devnet + cfg.feature_flags + .consensus_distributed_vote_scoring_strategy = true; + } } // Use this template when making changes: // @@ -2869,6 +2885,11 @@ impl ProtocolConfig { pub fn set_passkey_auth_for_testing(&mut self, val: bool) { self.feature_flags.passkey_auth = val } + + pub fn set_consensus_distributed_vote_scoring_strategy_for_testing(&mut self, val: bool) { + self.feature_flags + .consensus_distributed_vote_scoring_strategy = val; + } } type OverrideFn = dyn Fn(ProtocolVersion, ProtocolConfig) -> ProtocolConfig + Send; diff --git a/crates/sui-protocol-config/src/snapshots/sui_protocol_config__test__version_58.snap b/crates/sui-protocol-config/src/snapshots/sui_protocol_config__test__version_58.snap index 8ffc7e45a17e7..617af2983b009 100644 --- a/crates/sui-protocol-config/src/snapshots/sui_protocol_config__test__version_58.snap +++ b/crates/sui-protocol-config/src/snapshots/sui_protocol_config__test__version_58.snap @@ -65,6 +65,7 @@ feature_flags: passkey_auth: true authority_capabilities_v2: true rethrow_serialization_type_layout_errors: true + consensus_distributed_vote_scoring_strategy: true max_tx_size_bytes: 131072 max_input_objects: 2048 max_size_written_objects: 5000000