From 879d7b696dbb99b8089769e4e2d42394379e5522 Mon Sep 17 00:00:00 2001 From: William Smith Date: Mon, 11 Dec 2023 21:13:39 -0700 Subject: [PATCH] [core] Detect and diagnose split brain in checkpoint proposals (#14821) --- Cargo.lock | 2 + crates/sui-core/Cargo.toml | 2 + crates/sui-core/src/authority.rs | 83 ++++- crates/sui-core/src/authority_client.rs | 21 +- crates/sui-core/src/authority_server.rs | 15 +- crates/sui-core/src/checkpoints/metrics.rs | 7 + crates/sui-core/src/checkpoints/mod.rs | 333 ++++++++++++++++-- crates/sui-core/src/stake_aggregator.rs | 34 ++ crates/sui-core/src/test_authority_clients.rs | 28 +- .../sui-e2e-tests/tests/checkpoint_tests.rs | 39 ++ crates/sui-network/build.rs | 9 + crates/sui-types/src/messages_checkpoint.rs | 36 ++ 12 files changed, 571 insertions(+), 38 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 186c340652545..718c8a3995411 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -11471,6 +11471,7 @@ dependencies = [ "clap", "criterion", "dashmap", + "diffy", "either", "enum_dispatch", "expect-test", @@ -11538,6 +11539,7 @@ dependencies = [ "tap", "telemetry-subscribers", "tempfile", + "test-cluster", "test-fuzz", "thiserror", "tokio", diff --git a/crates/sui-core/Cargo.toml b/crates/sui-core/Cargo.toml index fbde77d6122b6..ceaa3b9ff8901 100644 --- a/crates/sui-core/Cargo.toml +++ b/crates/sui-core/Cargo.toml @@ -14,6 +14,7 @@ bcs.workspace = true bytes.workspace = true chrono.workspace = true dashmap.workspace = true +diffy = { version = "0.3", default-features = false } either.workspace = true enum_dispatch.workspace = true eyre.workspace = true @@ -95,6 +96,7 @@ pretty_assertions.workspace = true serde-reflection.workspace = true serde_yaml.workspace = true +test-cluster.workspace = true move-symbol-pool.workspace = true sui-test-transaction-builder.workspace = true diff --git a/crates/sui-core/src/authority.rs b/crates/sui-core/src/authority.rs index ad7e864571c24..e32bc5043b2a0 100644 --- 
a/crates/sui-core/src/authority.rs +++ b/crates/sui-core/src/authority.rs @@ -69,7 +69,7 @@ use sui_json_rpc_types::{ SuiObjectDataFilter, SuiTransactionBlockData, SuiTransactionBlockEffects, SuiTransactionBlockEvents, TransactionFilter, }; -use sui_macros::{fail_point, fail_point_async}; +use sui_macros::{fail_point, fail_point_async, fail_point_if}; use sui_protocol_config::{ProtocolConfig, SupportedProtocolVersions}; use sui_storage::indexes::{CoinInfo, ObjectIndexChanges}; use sui_storage::key_value_store::{TransactionKeyValueStore, TransactionKeyValueStoreTrait}; @@ -95,10 +95,10 @@ use sui_types::inner_temporary_store::{ use sui_types::message_envelope::Message; use sui_types::messages_checkpoint::{ CertifiedCheckpointSummary, CheckpointCommitment, CheckpointContents, CheckpointContentsDigest, - CheckpointDigest, CheckpointSequenceNumber, CheckpointSummary, CheckpointTimestamp, - VerifiedCheckpoint, + CheckpointDigest, CheckpointRequest, CheckpointRequestV2, CheckpointResponse, + CheckpointResponseV2, CheckpointSequenceNumber, CheckpointSummary, CheckpointSummaryResponse, + CheckpointTimestamp, VerifiedCheckpoint, }; -use sui_types::messages_checkpoint::{CheckpointRequest, CheckpointResponse}; use sui_types::messages_consensus::AuthorityCapabilities; use sui_types::messages_grpc::{ HandleTransactionResponse, LayoutGenerationOption, ObjectInfoRequest, ObjectInfoRequestKind, @@ -1364,7 +1364,9 @@ impl AuthorityState { let protocol_config = epoch_store.protocol_config(); let transaction_data = &certificate.data().intent_message().value; let (kind, signer, gas) = transaction_data.execution_parts(); - let (inner_temp_store, effects, execution_error_opt) = + + #[allow(unused_mut)] + let (inner_temp_store, mut effects, execution_error_opt) = epoch_store.executor().execute_transaction_to_effects( &self.database, protocol_config, @@ -1387,6 +1389,11 @@ impl AuthorityState { tx_digest, ); + fail_point_if!("cp_execution_nondeterminism", || { + #[cfg(msim)] + 
self.create_fail_state(certificate, epoch_store, &mut effects); + }); + Ok((inner_temp_store, effects, execution_error_opt.err())) } @@ -1728,6 +1735,37 @@ impl AuthorityState { .await } + #[cfg(msim)] + fn create_fail_state( + &self, + certificate: &VerifiedExecutableTransaction, + epoch_store: &Arc, + effects: &mut TransactionEffects, + ) { + use std::cell::RefCell; + thread_local! { + static FAIL_STATE: RefCell<(u64, HashSet)> = RefCell::new((0, HashSet::new())); + } + if !certificate.data().intent_message().value.is_system_tx() { + let committee = epoch_store.committee(); + let cur_stake = (**committee).weight(&self.name); + if cur_stake > 0 { + FAIL_STATE.with_borrow_mut(|fail_state| { + //let (&mut failing_stake, &mut failing_validators) = fail_state; + if fail_state.0 < committee.validity_threshold() { + fail_state.0 += cur_stake; + fail_state.1.insert(self.name); + } + + if fail_state.1.contains(&self.name) { + info!("cp_exec failing tx"); + effects.gas_cost_summary_mut_for_testing().computation_cost += 1; + } + }); + } + } + } + fn process_object_index( &self, effects: &TransactionEffects, @@ -2098,6 +2136,41 @@ impl AuthorityState { }) } + #[instrument(level = "trace", skip_all)] + pub fn handle_checkpoint_request_v2( + &self, + request: &CheckpointRequestV2, + ) -> SuiResult { + let summary = if request.certified { + let summary = match request.sequence_number { + Some(seq) => self + .checkpoint_store + .get_checkpoint_by_sequence_number(seq)?, + None => self.checkpoint_store.get_latest_certified_checkpoint(), + } + .map(|v| v.into_inner()); + summary.map(CheckpointSummaryResponse::Certified) + } else { + let summary = match request.sequence_number { + Some(seq) => self.checkpoint_store.get_locally_computed_checkpoint(seq)?, + None => self + .checkpoint_store + .get_latest_locally_computed_checkpoint(), + }; + summary.map(CheckpointSummaryResponse::Pending) + }; + let contents = match &summary { + Some(s) => self + .checkpoint_store + 
.get_checkpoint_contents(&s.content_digest())?, None => None, }; Ok(CheckpointResponseV2 { checkpoint: summary, contents, }) } + fn check_protocol_version( supported_protocol_versions: SupportedProtocolVersions, current_version: ProtocolVersion, diff --git a/crates/sui-core/src/authority_client.rs b/crates/sui-core/src/authority_client.rs index 281ae3b7c5fd1..a32cc436ab9c9 100644 --- a/crates/sui-core/src/authority_client.rs +++ b/crates/sui-core/src/authority_client.rs @@ -10,7 +10,9 @@ use std::time::Duration; use sui_network::{api::ValidatorClient, tonic}; use sui_types::base_types::AuthorityName; use sui_types::committee::CommitteeWithNetworkMetadata; -use sui_types::messages_checkpoint::{CheckpointRequest, CheckpointResponse}; +use sui_types::messages_checkpoint::{ + CheckpointRequest, CheckpointRequestV2, CheckpointResponse, CheckpointResponseV2, +}; use sui_types::multiaddr::Multiaddr; use sui_types::sui_system_state::SuiSystemState; use sui_types::{error::SuiError, transaction::*}; @@ -59,6 +61,11 @@ pub trait AuthorityAPI { request: CheckpointRequest, ) -> Result; + async fn handle_checkpoint_v2( + &self, + request: CheckpointRequestV2, + ) -> Result; + // This API is exclusively used by the benchmark code. // Hence it's OK to return a fixed system state type. async fn handle_system_state_object( @@ -189,6 +196,18 @@ impl AuthorityAPI for NetworkAuthorityClient { .map_err(Into::into) } + /// Handle checkpoint requests, returning a certified or pending checkpoint per the request's `certified` flag. 
+ async fn handle_checkpoint_v2( + &self, + request: CheckpointRequestV2, + ) -> Result { + self.client() + .checkpoint_v2(request) + .await + .map(tonic::Response::into_inner) + .map_err(Into::into) + } + async fn handle_system_state_object( &self, request: SystemStateRequest, diff --git a/crates/sui-core/src/authority_server.rs b/crates/sui-core/src/authority_server.rs index 5ba8196421242..f41be9b776043 100644 --- a/crates/sui-core/src/authority_server.rs +++ b/crates/sui-core/src/authority_server.rs @@ -28,7 +28,9 @@ use sui_types::{effects::TransactionEffectsAPI, message_envelope::Message}; use sui_types::{error::*, transaction::*}; use sui_types::{ fp_ensure, - messages_checkpoint::{CheckpointRequest, CheckpointResponse}, + messages_checkpoint::{ + CheckpointRequest, CheckpointRequestV2, CheckpointResponse, CheckpointResponseV2, + }, }; use tap::TapFallible; use tokio::task::JoinHandle; @@ -606,6 +608,17 @@ impl Validator for ValidatorService { return Ok(tonic::Response::new(response)); } + async fn checkpoint_v2( + &self, + request: tonic::Request, + ) -> Result, tonic::Status> { + let request = request.into_inner(); + + let response = self.state.handle_checkpoint_request_v2(&request)?; + + return Ok(tonic::Response::new(response)); + } + async fn get_system_state_object( &self, _request: tonic::Request, diff --git a/crates/sui-core/src/checkpoints/metrics.rs b/crates/sui-core/src/checkpoints/metrics.rs index c52065030ed48..62266d7c69e7f 100644 --- a/crates/sui-core/src/checkpoints/metrics.rs +++ b/crates/sui-core/src/checkpoints/metrics.rs @@ -21,6 +21,7 @@ pub struct CheckpointMetrics { pub highest_accumulated_epoch: IntGauge, pub checkpoint_creation_latency_ms: Histogram, pub remote_checkpoint_forks: IntCounter, + pub split_brain_checkpoint_forks: IntCounter, pub last_created_checkpoint_age_ms: Histogram, pub last_certified_checkpoint_age_ms: Histogram, } @@ -105,6 +106,12 @@ impl CheckpointMetrics { registry ) .unwrap(), + split_brain_checkpoint_forks: 
register_int_counter_with_registry!( + "split_brain_checkpoint_forks", + "Number of checkpoints that have resulted in a split brain", + registry + ) + .unwrap(), }; Arc::new(this) } diff --git a/crates/sui-core/src/checkpoints/mod.rs b/crates/sui-core/src/checkpoints/mod.rs index 4680ddace756b..f26e3acacd1b5 100644 --- a/crates/sui-core/src/checkpoints/mod.rs +++ b/crates/sui-core/src/checkpoints/mod.rs @@ -7,40 +7,53 @@ mod checkpoint_output; mod metrics; use crate::authority::{AuthorityState, EffectsNotifyRead}; +use crate::authority_client::{make_network_authority_clients_with_network_config, AuthorityAPI}; use crate::checkpoints::causal_order::CausalOrder; use crate::checkpoints::checkpoint_output::{CertifiedCheckpointOutput, CheckpointOutput}; pub use crate::checkpoints::checkpoint_output::{ LogCheckpointOutput, SendCheckpointToStateSync, SubmitCheckpointToConsensus, }; pub use crate::checkpoints::metrics::CheckpointMetrics; -use crate::stake_aggregator::{InsertResult, StakeAggregator}; +use crate::stake_aggregator::{InsertResult, MultiStakeAggregator}; use crate::state_accumulator::StateAccumulator; +use diffy::create_patch; use futures::future::{select, Either}; use futures::FutureExt; +use itertools::Itertools; use mysten_metrics::{monitored_scope, spawn_monitored_task, MonitoredFutureExt}; use parking_lot::Mutex; use serde::{Deserialize, Serialize}; +use sui_macros::fail_point; +use sui_network::default_mysten_network_config; use crate::authority::authority_per_epoch_store::AuthorityPerEpochStore; use crate::consensus_handler::SequencedConsensusTransactionKey; -use std::collections::HashSet; +use chrono::Utc; +use rand::rngs::OsRng; +use rand::seq::SliceRandom; +use std::collections::BTreeMap; +use std::collections::{HashMap, HashSet}; +use std::fs::File; +use std::io::Write; use std::path::Path; use std::sync::Arc; use std::time::Duration; use sui_protocol_config::ProtocolVersion; -use sui_types::base_types::{EpochId, TransactionDigest}; -use 
sui_types::crypto::{AuthoritySignInfo, AuthorityStrongQuorumSignInfo}; +use sui_types::base_types::{AuthorityName, EpochId, TransactionDigest}; +use sui_types::committee::StakeUnit; +use sui_types::crypto::AuthorityStrongQuorumSignInfo; use sui_types::digests::{CheckpointContentsDigest, CheckpointDigest}; use sui_types::effects::{TransactionEffects, TransactionEffectsAPI}; use sui_types::error::{SuiError, SuiResult}; use sui_types::gas::GasCostSummary; use sui_types::message_envelope::Message; -use sui_types::messages_checkpoint::SignedCheckpointSummary; use sui_types::messages_checkpoint::{ - CertifiedCheckpointSummary, CheckpointContents, CheckpointSequenceNumber, - CheckpointSignatureMessage, CheckpointSummary, CheckpointTimestamp, EndOfEpochData, - FullCheckpointContents, TrustedCheckpoint, VerifiedCheckpoint, VerifiedCheckpointContents, + CertifiedCheckpointSummary, CheckpointContents, CheckpointResponseV2, CheckpointSequenceNumber, + CheckpointSignatureMessage, CheckpointSummary, CheckpointSummaryResponse, CheckpointTimestamp, + EndOfEpochData, FullCheckpointContents, TrustedCheckpoint, VerifiedCheckpoint, + VerifiedCheckpointContents, }; +use sui_types::messages_checkpoint::{CheckpointRequestV2, SignedCheckpointSummary}; use sui_types::messages_consensus::ConsensusTransactionKey; use sui_types::signature::GenericSignature; use sui_types::sui_system_state::{SuiSystemState, SuiSystemStateTrait}; @@ -196,6 +209,13 @@ impl CheckpointStore { .map(|maybe_checkpoint| maybe_checkpoint.map(|c| c.into())) } + pub fn get_locally_computed_checkpoint( + &self, + sequence_number: CheckpointSequenceNumber, + ) -> Result, TypedStoreError> { + self.locally_computed_checkpoints.get(&sequence_number) + } + pub fn get_sequence_number_by_contents_digest( &self, digest: &CheckpointContentsDigest, @@ -218,6 +238,14 @@ impl CheckpointStore { .map(|(_, v)| v.into()) } + pub fn get_latest_locally_computed_checkpoint(&self) -> Option { + self.locally_computed_checkpoints + 
.unbounded_iter() + .skip_to_last() + .next() + .map(|(_, v)| v) + } + pub fn multi_get_checkpoint_by_sequence_number( &self, sequence_numbers: &[CheckpointSequenceNumber], @@ -655,6 +683,7 @@ pub struct CheckpointAggregator { exit: watch::Receiver<()>, current: Option, output: Box, + state: Arc, metrics: Arc, } @@ -663,7 +692,10 @@ pub struct CheckpointSignatureAggregator { next_index: u64, summary: CheckpointSummary, digest: CheckpointDigest, - signatures: StakeAggregator, + /// Aggregates voting stake for each signed checkpoint proposal by authority + signatures_by_digest: MultiStakeAggregator, + tables: Arc, + state: Arc, metrics: Arc, } @@ -1206,6 +1238,7 @@ impl CheckpointAggregator { notify: Arc, exit: watch::Receiver<()>, output: Box, + state: Arc, metrics: Arc, ) -> Self { let current = None; @@ -1216,6 +1249,7 @@ impl CheckpointAggregator { exit, current, output, + state, metrics, } } @@ -1284,7 +1318,11 @@ impl CheckpointAggregator { next_index: 0, digest: summary.digest(), summary, - signatures: StakeAggregator::new(self.epoch_store.committee().clone()), + signatures_by_digest: MultiStakeAggregator::new( + self.epoch_store.committee().clone(), + ), + tables: self.tables.clone(), + state: self.state.clone(), metrics: self.metrics.clone(), }); self.current.as_mut().unwrap() @@ -1363,23 +1401,9 @@ impl CheckpointSignatureAggregator { let their_digest = *data.summary.digest(); let (_, signature) = data.summary.into_data_and_sig(); let author = signature.authority; - // It is not guaranteed that signature.authority == narwhal_cert.author, but we do verify - // the signature so we know that the author signed the message at some point. 
- if their_digest != self.digest { - self.metrics.remote_checkpoint_forks.inc(); - warn!( - checkpoint_seq = self.summary.sequence_number, - "Validator {:?} has mismatching checkpoint digest {}, we have digest {}", - author.concise(), - their_digest, - self.digest - ); - return Err(()); - } - let envelope = SignedCheckpointSummary::new_from_data_and_sig(self.summary.clone(), signature); - match self.signatures.insert(envelope) { + match self.signatures_by_digest.insert(their_digest, envelope) { InsertResult::Failed { error } => { warn!( checkpoint_seq = self.summary.sequence_number, @@ -1387,17 +1411,266 @@ impl CheckpointSignatureAggregator { author.concise(), error ); + self.check_for_split_brain(); Err(()) } - InsertResult::QuorumReached(cert) => Ok(cert), + InsertResult::QuorumReached(cert) => { + // It is not guaranteed that signature.authority == narwhal_cert.author, but we do verify + // the signature so we know that the author signed the message at some point. + if their_digest != self.digest { + self.metrics.remote_checkpoint_forks.inc(); + warn!( + checkpoint_seq = self.summary.sequence_number, + "Validator {:?} has mismatching checkpoint digest {}, we have digest {}", + author.concise(), + their_digest, + self.digest + ); + return Err(()); + } + Ok(cert) + } InsertResult::NotEnoughVotes { bad_votes: _, bad_authorities: _, - } => Err(()), + } => { + self.check_for_split_brain(); + Err(()) + } + } + } + + /// Check if there is a split brain condition in checkpoint signature aggregation, defined + /// as any state wherein it is no longer possible to achieve quorum on a checkpoint proposal, + /// irrespective of the outcome of any outstanding votes. 
+ fn check_for_split_brain(&self) { + debug!( + checkpoint_seq = self.summary.sequence_number, + "Checking for split brain condition" + ); + if self.signatures_by_digest.quorum_unreachable() { + // TODO: at this point we should immediately halt processing + // of new transaction certificates to avoid building on top of + // forked output + // self.halt_all_execution(); + + let digests_by_stake_messages = self + .signatures_by_digest + .get_all_unique_values() + .into_iter() + .sorted_by_key(|(_, (_, stake))| -(*stake as i64)) + .map(|(digest, (_authorities, total_stake))| { + format!("{:?} (total stake: {})", digest, total_stake) + }) + .collect::>(); + error!( + checkpoint_seq = self.summary.sequence_number, + "Split brain detected in checkpoint signature aggregation! Remaining stake: {:?}, Digests by stake: {:?}", + self.signatures_by_digest.uncommitted_stake(), + digests_by_stake_messages, + ); + self.metrics.split_brain_checkpoint_forks.inc(); + + let all_unique_values = self.signatures_by_digest.get_all_unique_values(); + let local_summary = self.summary.clone(); + let state = self.state.clone(); + let tables = self.tables.clone(); + + tokio::spawn(async move { + diagnose_split_brain(all_unique_values, local_summary, state, tables).await; + }); } } } +/// Create data dump containing relevant data for diagnosing cause of the +/// split brain by querying one disagreeing validator for full checkpoint contents. +/// To minimize peer chatter, we only query one validator at random from each +/// disagreeing faction, as all honest validators that participated in this round may +/// inevitably run the same process. +async fn diagnose_split_brain( + all_unique_values: BTreeMap, StakeUnit)>, + local_summary: CheckpointSummary, + state: Arc, + tables: Arc, +) { + debug!( + checkpoint_seq = local_summary.sequence_number, + "Running split brain diagnostics..." 
+ ); + let time = Utc::now(); + // collect one random disagreeing validator per differing digest + let digest_to_validator = all_unique_values + .iter() + .filter_map(|(digest, (validators, _))| { + if *digest != local_summary.digest() { + let random_validator = validators.choose(&mut OsRng).unwrap(); + Some((*digest, *random_validator)) + } else { + None + } + }) + .collect::>(); + if digest_to_validator.is_empty() { + panic!( + "Given split brain condition, there should be at \ + least one validator that disagrees with local signature" + ); + } + + let sui_system_state = state + .database + .get_sui_system_state_object() + .expect("Failed to get system state object"); + let committee = sui_system_state.get_current_epoch_committee(); + let network_config = default_mysten_network_config(); + let network_clients = + make_network_authority_clients_with_network_config(&committee, &network_config) + .expect("Failed to make authority clients from committee {committee}"); + + // Query all disagreeing validators + let response_futures = digest_to_validator + .values() + .cloned() + .map(|validator| { + let client = network_clients + .get(&validator) + .expect("Failed to get network client"); + let request = CheckpointRequestV2 { + sequence_number: Some(local_summary.sequence_number), + request_content: true, + certified: false, + }; + client.handle_checkpoint_v2(request) + }) + .collect::>(); + + let digest_name_pair = digest_to_validator.iter(); + let response_data = futures::future::join_all(response_futures) + .await + .into_iter() + .zip(digest_name_pair) + .filter_map(|(response, (digest, name))| match response { + Ok(response) => match response { + CheckpointResponseV2 { + checkpoint: Some(CheckpointSummaryResponse::Pending(summary)), + contents: Some(contents), + } => Some((*name, *digest, summary, contents)), + CheckpointResponseV2 { + checkpoint: Some(CheckpointSummaryResponse::Certified(_)), + contents: _, + } => { + panic!("Expected pending checkpoint, but got 
certified checkpoint"); + } + CheckpointResponseV2 { + checkpoint: None, + contents: _, + } => { + error!( + "Summary for checkpoint {:?} not found on validator {:?}", + local_summary.sequence_number, name + ); + None + } + CheckpointResponseV2 { + checkpoint: _, + contents: None, + } => { + error!( + "Contents for checkpoint {:?} not found on validator {:?}", + local_summary.sequence_number, name + ); + None + } + }, + Err(e) => { + error!( + "Failed to get checkpoint contents from validator for fork diagnostics: {:?}", + e + ); + None + } + }) + .collect::>(); + + let local_checkpoint_contents = tables + .get_checkpoint_contents(&local_summary.content_digest) + .unwrap_or_else(|_| { + panic!( + "Could not find checkpoint contents for digest {:?}", + local_summary.digest() + ) + }) + .unwrap_or_else(|| { + panic!( + "Could not find local full checkpoint contents for checkpoint {:?}, digest {:?}", + local_summary.sequence_number, + local_summary.digest() + ) + }); + let local_contents_text = format!("{local_checkpoint_contents:?}"); + + let local_summary_text = format!("{local_summary:?}"); + let local_validator = state.name.concise(); + let diff_patches = response_data + .iter() + .map(|(name, other_digest, other_summary, contents)| { + let other_contents_text = format!("{contents:?}"); + let other_summary_text = format!("{other_summary:?}"); + let (local_transactions, local_effects): (Vec<_>, Vec<_>) = local_checkpoint_contents + .enumerate_transactions(&local_summary) + .map(|(_, exec_digest)| (exec_digest.transaction, exec_digest.effects)) + .unzip(); + let (other_transactions, other_effects): (Vec<_>, Vec<_>) = contents + .enumerate_transactions(other_summary) + .map(|(_, exec_digest)| (exec_digest.transaction, exec_digest.effects)) + .unzip(); + let summary_patch = create_patch(&local_summary_text, &other_summary_text); + let contents_patch = create_patch(&local_contents_text, &other_contents_text); + let local_transcations_text = 
format!("{local_transactions:#?}"); + let other_transactions_text = format!("{other_transactions:#?}"); + let transactions_patch = + create_patch(&local_transcations_text, &other_transactions_text); + let local_effects_text = format!("{local_effects:#?}"); + let other_effects_text = format!("{other_effects:#?}"); + let effects_patch = create_patch(&local_effects_text, &other_effects_text); + let seq_number = local_summary.sequence_number; + let local_digest = local_summary.digest(); + let other_validator = name.concise(); + format!( + "Checkpoint: {seq_number:?}\n\ + Local validator (original): {local_validator:?}, digest: {local_digest:?}\n\ + Other validator (modified): {other_validator:?}, digest: {other_digest:?}\n\n\ + Summary Diff: \n{summary_patch}\n\n\ + Contents Diff: \n{contents_patch}\n\n\ + Transactions Diff: \n{transactions_patch}\n\n\ + Effects Diff: \n{effects_patch}", + ) + }) + .collect::>() + .join("\n\n\n"); + + let header = format!( + "Checkpoint Fork Dump - Authority {local_validator:?}: \n\ + Datetime: {time}", + ); + let fork_logs_text = format!("{header}\n\n{diff_patches}\n\n"); + let path = tempfile::tempdir() + .expect("Failed to create tempdir") + .into_path() + .join(Path::new("checkpoint_fork_dump.txt")); + let mut file = File::create(path).unwrap(); + write!(file, "{}", fork_logs_text).unwrap(); + debug!("{}", fork_logs_text); + + fail_point!("split_brain_reached"); + + // There is no option to never restart the node, so choosing longer than should + // be needed for any testcase + // #[cfg(msim)] + // sui_simulator::task::kill_current_node(Some(Duration::from_secs(100))); +} + pub trait CheckpointServiceNotify { fn notify_checkpoint_signature( &self, @@ -1439,7 +1712,7 @@ impl CheckpointService { let (exit_snd, exit_rcv) = watch::channel(()); let builder = CheckpointBuilder::new( - state, + state.clone(), checkpoint_store.clone(), epoch_store.clone(), notify_builder.clone(), @@ -1461,6 +1734,7 @@ impl CheckpointService { 
notify_aggregator.clone(), exit_rcv, certified_checkpoint_output, + state.clone(), metrics.clone(), ); @@ -1572,14 +1846,13 @@ impl PendingCheckpoint { mod tests { use super::*; use crate::authority::test_authority_builder::TestAuthorityBuilder; - use crate::state_accumulator::StateAccumulator; use async_trait::async_trait; use shared_crypto::intent::{Intent, IntentScope}; use std::collections::{BTreeMap, HashMap}; use std::ops::Deref; use sui_macros::sim_test; use sui_types::base_types::{ObjectID, SequenceNumber, TransactionEffectsDigest}; - use sui_types::crypto::Signature; + use sui_types::crypto::{AuthoritySignInfo, Signature}; use sui_types::effects::TransactionEffects; use sui_types::messages_checkpoint::SignedCheckpointSummary; use sui_types::move_package::MovePackage; diff --git a/crates/sui-core/src/stake_aggregator.rs b/crates/sui-core/src/stake_aggregator.rs index 151f5a28d465c..8d475e5b5531c 100644 --- a/crates/sui-core/src/stake_aggregator.rs +++ b/crates/sui-core/src/stake_aggregator.rs @@ -93,6 +93,10 @@ impl StakeAggregator { self.data.contains_key(authority) } + pub fn keys(&self) -> impl Iterator { + self.data.keys() + } + pub fn committee(&self) -> &Committee { &self.committee } @@ -277,6 +281,36 @@ where } } +impl MultiStakeAggregator +where + K: Hash + Eq, +{ + #[allow(dead_code)] + pub fn authorities_for_key(&self, k: &K) -> Option> { + self.stake_maps.get(k).map(|(_, agg)| agg.keys()) + } + + /// The sum of all remaining stake, i.e. 
all stake not yet + /// committed by vote to a specific value + pub fn uncommitted_stake(&self) -> StakeUnit { + self.committee.total_votes() - self.total_votes() + } + + /// Total stake of the largest faction + pub fn plurality_stake(&self) -> StakeUnit { + self.stake_maps + .values() + .map(|(_, agg)| agg.total_votes()) + .max() + .unwrap_or_default() + } + + /// If true, there isn't enough uncommitted stake to reach quorum for any value + pub fn quorum_unreachable(&self) -> bool { + self.uncommitted_stake() + self.plurality_stake() < self.committee.threshold::() + } +} + /// Like MultiStakeAggregator, but for counting votes for a generic value instead of an envelope, in /// scenarios where byzantine validators may submit multiple votes for different values. pub struct GenericMultiStakeAggregator { diff --git a/crates/sui-core/src/test_authority_clients.rs b/crates/sui-core/src/test_authority_clients.rs index 39e42c8a89e72..82bdff374bc34 100644 --- a/crates/sui-core/src/test_authority_clients.rs +++ b/crates/sui-core/src/test_authority_clients.rs @@ -12,7 +12,6 @@ use crate::{authority::AuthorityState, authority_client::AuthorityAPI}; use async_trait::async_trait; use mysten_metrics::spawn_monitored_task; use sui_config::genesis::Genesis; -use sui_types::effects::{TransactionEffectsAPI, TransactionEvents}; use sui_types::error::SuiResult; use sui_types::messages_grpc::{ HandleCertificateResponse, HandleCertificateResponseV2, HandleTransactionResponse, @@ -26,6 +25,10 @@ use sui_types::{ messages_checkpoint::{CheckpointRequest, CheckpointResponse}, transaction::{CertifiedTransaction, Transaction, VerifiedTransaction}, }; +use sui_types::{ + effects::{TransactionEffectsAPI, TransactionEvents}, + messages_checkpoint::{CheckpointRequestV2, CheckpointResponseV2}, +}; #[derive(Clone, Copy, Default)] pub struct LocalAuthorityClientFaultConfig { @@ -117,6 +120,15 @@ impl AuthorityAPI for LocalAuthorityClient { state.handle_checkpoint_request(&request) } + async fn 
handle_checkpoint_v2( + &self, + request: CheckpointRequestV2, + ) -> Result { + let state = self.state.clone(); + + state.handle_checkpoint_request_v2(&request) + } + async fn handle_system_state_object( &self, _request: SystemStateRequest, @@ -282,6 +294,13 @@ impl AuthorityAPI for MockAuthorityApi { unimplemented!(); } + async fn handle_checkpoint_v2( + &self, + _request: CheckpointRequestV2, + ) -> Result { + unimplemented!(); + } + async fn handle_system_state_object( &self, _request: SystemStateRequest, @@ -345,6 +364,13 @@ impl AuthorityAPI for HandleTransactionTestAuthorityClient { unimplemented!() } + async fn handle_checkpoint_v2( + &self, + _request: CheckpointRequestV2, + ) -> Result { + unimplemented!() + } + async fn handle_system_state_object( &self, _request: SystemStateRequest, diff --git a/crates/sui-e2e-tests/tests/checkpoint_tests.rs b/crates/sui-e2e-tests/tests/checkpoint_tests.rs index 79ce529c69feb..ff0e53197305d 100644 --- a/crates/sui-e2e-tests/tests/checkpoint_tests.rs +++ b/crates/sui-e2e-tests/tests/checkpoint_tests.rs @@ -1,7 +1,13 @@ // Copyright (c) Mysten Labs, Inc. 
// SPDX-License-Identifier: Apache-2.0 +use std::sync::atomic::AtomicUsize; +use std::sync::atomic::Ordering; +use std::sync::Arc; +use std::sync::Mutex; use std::time::Duration; +use sui_macros::register_fail_point; +use sui_macros::register_fail_point_if; use sui_macros::sim_test; use sui_test_transaction_builder::make_transfer_sui_transaction; use test_cluster::TestClusterBuilder; @@ -35,3 +41,36 @@ async fn basic_checkpoints_integration_test() { panic!("Did not include transaction in checkpoint in 60 seconds"); } + +#[sim_test] +async fn checkpoint_split_brain_test() { + let committee_size = 9; + // count number of nodes that have reached split brain condition + let count_split_brain_nodes: Arc> = Default::default(); + let count_clone = count_split_brain_nodes.clone(); + register_fail_point("split_brain_reached", move || { + let counter = count_clone.lock().unwrap(); + counter.fetch_add(1, Ordering::Relaxed); + }); + + register_fail_point_if("cp_execution_nondeterminism", || true); + + let test_cluster = TestClusterBuilder::new() + .with_num_validators(committee_size) + .build() + .await; + + let tx = make_transfer_sui_transaction(&test_cluster.wallet, None, None).await; + test_cluster + .wallet + .execute_transaction_may_fail(tx) + .await + .ok(); + + // provide enough time for validators to detect split brain + tokio::time::sleep(Duration::from_secs(20)).await; + + // all honest validators should eventually detect a split brain + let final_count = count_split_brain_nodes.lock().unwrap(); + assert!(final_count.load(Ordering::Relaxed) >= 1); +} diff --git a/crates/sui-network/build.rs b/crates/sui-network/build.rs index 4d6598a2639dc..5060a3eac08b2 100644 --- a/crates/sui-network/build.rs +++ b/crates/sui-network/build.rs @@ -85,6 +85,15 @@ fn main() -> Result<()> { .codec_path(codec_path) .build(), ) + .method( + Method::builder() + .name("checkpoint_v2") + .route_name("CheckpointV2") + .input_type("sui_types::messages_checkpoint::CheckpointRequestV2") + 
.output_type("sui_types::messages_checkpoint::CheckpointResponseV2") + .codec_path(codec_path) + .build(), + ) .method( Method::builder() .name("get_system_state_object") diff --git a/crates/sui-types/src/messages_checkpoint.rs b/crates/sui-types/src/messages_checkpoint.rs index 72da7c44e6618..39dc8aa95d4b6 100644 --- a/crates/sui-types/src/messages_checkpoint.rs +++ b/crates/sui-types/src/messages_checkpoint.rs @@ -55,6 +55,35 @@ pub struct CheckpointRequest { pub request_content: bool, } +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct CheckpointRequestV2 { + /// if a sequence number is specified, return the checkpoint with that sequence number; + /// otherwise if None returns the latest checkpoint stored (authenticated or pending, + /// depending on the value of `certified` flag) + pub sequence_number: Option, + // A flag, if true also return the contents of the + // checkpoint besides the meta-data. + pub request_content: bool, + // If true, returns certified checkpoint, otherwise returns pending checkpoint + pub certified: bool, +} + +#[allow(clippy::large_enum_variant)] +#[derive(Clone, Debug, Serialize, Deserialize)] +pub enum CheckpointSummaryResponse { + Certified(CertifiedCheckpointSummary), + Pending(CheckpointSummary), +} + +impl CheckpointSummaryResponse { + pub fn content_digest(&self) -> CheckpointContentsDigest { + match self { + Self::Certified(s) => s.content_digest, + Self::Pending(s) => s.content_digest, + } + } +} + #[allow(clippy::large_enum_variant)] #[derive(Clone, Debug, Serialize, Deserialize)] pub struct CheckpointResponse { @@ -62,6 +91,13 @@ pub struct CheckpointResponse { pub contents: Option, } +#[allow(clippy::large_enum_variant)] +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct CheckpointResponseV2 { + pub checkpoint: Option, + pub contents: Option, +} + // The constituent parts of checkpoints, signed and certified /// The Sha256 digest of an EllipticCurveMultisetHash committing to the live object set.