diff --git a/consensus/config/src/parameters.rs b/consensus/config/src/parameters.rs index f835b0a37c81c..76e972cb05ad3 100644 --- a/consensus/config/src/parameters.rs +++ b/consensus/config/src/parameters.rs @@ -18,6 +18,10 @@ pub struct Parameters { #[serde(default = "Parameters::default_leader_timeout")] pub leader_timeout: Duration, + /// Maximum forward time drift (how far in future) allowed for received blocks. + #[serde(default = "Parameters::default_max_forward_time_drift")] + pub max_forward_time_drift: Duration, + /// The database path. The path should be provided in order for the node to be able to boot pub db_path: Option, } @@ -27,6 +31,10 @@ impl Parameters { Duration::from_millis(250) } + pub fn default_max_forward_time_drift() -> Duration { + Duration::from_millis(500) + } + pub fn db_path_str_unsafe(&self) -> String { self.db_path .clone() @@ -42,6 +50,7 @@ impl Default for Parameters { fn default() -> Self { Self { leader_timeout: Parameters::default_leader_timeout(), + max_forward_time_drift: Parameters::default_max_forward_time_drift(), db_path: None, } } diff --git a/consensus/config/tests/snapshots/parameters_test__parameters.snap b/consensus/config/tests/snapshots/parameters_test__parameters.snap index e1a32bb537bfa..efd2e2f911b79 100644 --- a/consensus/config/tests/snapshots/parameters_test__parameters.snap +++ b/consensus/config/tests/snapshots/parameters_test__parameters.snap @@ -5,5 +5,8 @@ expression: parameters leader_timeout: secs: 0 nanos: 250000000 +max_forward_time_drift: + secs: 0 + nanos: 500000000 db_path: ~ diff --git a/consensus/core/src/authority_node.rs b/consensus/core/src/authority_node.rs index 5260dc0322adf..adf30c7e019be 100644 --- a/consensus/core/src/authority_node.rs +++ b/consensus/core/src/authority_node.rs @@ -1,7 +1,7 @@ // Copyright (c) Mysten Labs, Inc. // SPDX-License-Identifier: Apache-2.0 -use std::{sync::Arc, time::Instant, vec}; +use std::{sync::Arc, time::Duration, time::Instant, vec}; use async_trait::async_trait; use bytes::Bytes; @@ -9,10 +9,11 @@ use consensus_config::{AuthorityIndex, Committee, NetworkKeyPair, Parameters, Pr use parking_lot::RwLock; use prometheus::Registry; use sui_protocol_config::ProtocolConfig; +use tokio::time::sleep; use tracing::info; use crate::{ - block::{BlockAPI, BlockRef, SignedBlock, VerifiedBlock}, + block::{timestamp_utc_ms, BlockAPI, BlockRef, SignedBlock, VerifiedBlock}, block_manager::BlockManager, block_verifier::{BlockVerifier, SignedBlockVerifier}, broadcaster::Broadcaster, @@ -30,12 +31,49 @@ use crate::{ CommitConsumer, }; -// This type is used by Sui as part of starting consensus via MysticetiManager. -pub type ConsensusAuthority = AuthorityNode; +// This type is used by Sui as part of starting consensus via MysticetiManager. +// It hides the details of the types. +pub struct ConsensusAuthority(AuthorityNode); -pub struct AuthorityNode +impl ConsensusAuthority { + pub async fn start( + own_index: AuthorityIndex, + committee: Committee, + parameters: Parameters, + protocol_config: ProtocolConfig, + protocol_keypair: ProtocolKeyPair, + network_keypair: NetworkKeyPair, + transaction_verifier: Arc, + commit_consumer: CommitConsumer, + registry: Registry, + ) -> Self { + let authority_node = AuthorityNode::start( + own_index, + committee, + parameters, + protocol_config, + protocol_keypair, + network_keypair, + transaction_verifier, + commit_consumer, + registry, + ) + .await; + Self(authority_node) + } + + pub async fn stop(self) { + self.0.stop().await; + } + + pub fn transaction_client(&self) -> Arc { + self.0.transaction_client() + } +} + +pub(crate) struct AuthorityNode where - N: NetworkManager, + N: NetworkManager>, { context: Arc, start_time: Instant, @@ -48,9 +86,9 @@ where impl AuthorityNode where - N: NetworkManager, + N: NetworkManager>, { - pub async fn start( + pub(crate) async fn start( own_index: AuthorityIndex, committee: Committee, parameters: Parameters, @@ -134,7 +172,7 @@ where } } - pub async fn stop(mut self) { + pub(crate) async fn stop(mut self) { info!( "Stopping authority. Total run time: {:?}", self.start_time.elapsed() @@ -152,20 +190,20 @@ where .observe(self.start_time.elapsed().as_secs_f64()); } - pub fn transaction_client(&self) -> Arc { + pub(crate) fn transaction_client(&self) -> Arc { self.transaction_client.clone() } } /// Authority's network interface. -pub struct AuthorityService { +pub(crate) struct AuthorityService { context: Arc, block_verifier: Arc, - core_dispatcher: ChannelCoreThreadDispatcher, + core_dispatcher: C, } #[async_trait] -impl NetworkService for AuthorityService { +impl NetworkService for AuthorityService { async fn handle_send_block( &self, peer: AuthorityIndex, @@ -174,6 +212,8 @@ impl NetworkService for AuthorityService { // TODO: dedup block verifications, here and with fetched blocks. let signed_block: SignedBlock = bcs::from_bytes(&serialized_block).map_err(ConsensusError::MalformedBlock)?; + + // Reject blocks not produced by the peer. if peer != signed_block.author() { self.context .metrics @@ -185,6 +225,8 @@ impl NetworkService for AuthorityService { info!("Block with wrong authority from {}: {}", peer, e); return Err(e); } + + // Reject blocks failing validations. if let Err(e) = self.block_verifier.verify(&signed_block) { self.context .metrics @@ -196,6 +238,31 @@ impl NetworkService for AuthorityService { return Err(e); } let verified_block = VerifiedBlock::new_verified(signed_block, serialized_block); + + // Reject block with timestamp too far in the future. + let forward_time_drift = Duration::from_millis( + verified_block + .timestamp_ms() + .saturating_sub(timestamp_utc_ms()), + ); + if forward_time_drift > self.context.parameters.max_forward_time_drift { + return Err(ConsensusError::BlockTooFarInFuture { + block_timestamp: verified_block.timestamp_ms(), + forward_time_drift, + }); + } + + // Wait until the block's timestamp is current. + if forward_time_drift > Duration::ZERO { + self.context + .metrics + .node_metrics + .block_timestamp_drift_wait_ms + .with_label_values(&[&peer.to_string()]) + .inc_by(forward_time_drift.as_millis() as u64); + sleep(forward_time_drift).await; + } + self.core_dispatcher .add_blocks(vec![verified_block]) .await @@ -214,18 +281,61 @@ impl NetworkService for AuthorityService { #[cfg(test)] mod tests { + use std::collections::BTreeSet; use std::sync::Arc; + use async_trait::async_trait; use consensus_config::{local_committee_and_keys, NetworkKeyPair, Parameters, ProtocolKeyPair}; use fastcrypto::traits::ToFromBytes; + use parking_lot::Mutex; use prometheus::Registry; use sui_protocol_config::ProtocolConfig; use tempfile::TempDir; use tokio::sync::mpsc::unbounded_channel; + use tokio::time::sleep; use super::*; + use crate::authority_node::AuthorityService; + use crate::block::{timestamp_utc_ms, BlockRef, Round, TestBlock, VerifiedBlock}; + use crate::block_verifier::NoopBlockVerifier; + use crate::context::Context; + use crate::core_thread::{CoreError, CoreThreadDispatcher}; + use crate::network::NetworkService; use crate::transaction::NoopTransactionVerifier; + struct FakeCoreThreadDispatcher { + blocks: Mutex>, + } + + impl FakeCoreThreadDispatcher { + fn new() -> Self { + Self { + blocks: Mutex::new(vec![]), + } + } + + fn get_blocks(&self) -> Vec { + self.blocks.lock().clone() + } + } + + #[async_trait] + impl CoreThreadDispatcher for Arc { + async fn add_blocks(&self, blocks: Vec) -> Result, CoreError> { + let block_refs = blocks.iter().map(|b| b.reference()).collect(); + self.blocks.lock().extend(blocks); + Ok(block_refs) + } + + async fn force_new_block(&self, _round: Round) -> Result<(), CoreError> { + unimplemented!() + } + + async fn get_missing_blocks(&self) -> Result>, CoreError> { + unimplemented!() + } + } + #[tokio::test] async fn start_and_stop() { let (committee, keypairs) = local_committee_and_keys(0, vec![1]); @@ -260,10 +370,49 @@ mod tests { ) .await; - assert_eq!(authority.context.own_index, own_index); - assert_eq!(authority.context.committee.epoch(), 0); - assert_eq!(authority.context.committee.size(), 1); + assert_eq!(authority.0.context.own_index, own_index); + assert_eq!(authority.0.context.committee.epoch(), 0); + assert_eq!(authority.0.context.committee.size(), 1); authority.stop().await; } + + #[tokio::test(flavor = "current_thread", start_paused = true)] + async fn test_authority_service() { + let (context, _keys) = Context::new_for_test(4); + let context = Arc::new(context); + let block_verifier = NoopBlockVerifier {}; + let core_dispatcher = Arc::new(FakeCoreThreadDispatcher::new()); + let authority_service = Arc::new(AuthorityService { + context: context.clone(), + block_verifier: Arc::new(block_verifier), + core_dispatcher: core_dispatcher.clone(), + }); + + // Test delaying blocks with time drift. + let now = timestamp_utc_ms(); + let max_drift = context.parameters.max_forward_time_drift; + let input_block = VerifiedBlock::new_for_test( + TestBlock::new(9, 0) + .set_timestamp_ms(now + max_drift.as_millis() as u64) + .build(), + ); + + let service = authority_service.clone(); + let serialized = input_block.serialized().clone(); + tokio::spawn(async move { + service + .handle_send_block(context.committee.to_authority_index(0).unwrap(), serialized) + .await + .unwrap(); + }); + + sleep(max_drift / 2).await; + assert!(core_dispatcher.get_blocks().is_empty()); + + sleep(max_drift).await; + let blocks = core_dispatcher.get_blocks(); + assert_eq!(blocks.len(), 1); + assert_eq!(blocks[0], input_block); + } } diff --git a/consensus/core/src/base_committer.rs b/consensus/core/src/base_committer.rs index 987fe295b1405..66bc4a01dfbde 100644 --- a/consensus/core/src/base_committer.rs +++ b/consensus/core/src/base_committer.rs @@ -17,7 +17,7 @@ use crate::{ #[cfg(test)] #[path = "tests/base_committer_tests.rs"] -pub mod base_committer_tests; +mod base_committer_tests; #[allow(unused)] pub(crate) struct BaseCommitterOptions { diff --git a/consensus/core/src/block.rs b/consensus/core/src/block.rs index 60dac58bc3763..200699bce818e 100644 --- a/consensus/core/src/block.rs +++ b/consensus/core/src/block.rs @@ -324,7 +324,7 @@ impl fmt::Debug for Slot { /// will affect the values of `BlockDigest` and `BlockRef`. #[allow(unused)] #[derive(Deserialize, Serialize)] -pub struct SignedBlock { +pub(crate) struct SignedBlock { inner: Block, signature: Bytes, } diff --git a/consensus/core/src/commit.rs b/consensus/core/src/commit.rs index 95b125c03e424..ec4c1a6356aa4 100644 --- a/consensus/core/src/commit.rs +++ b/consensus/core/src/commit.rs @@ -1,10 +1,7 @@ // Copyright (c) Mysten Labs, Inc. // SPDX-License-Identifier: Apache-2.0 -use std::{ - fmt::{self, Display, Formatter}, - sync::Arc, -}; +use std::fmt::{self, Display, Formatter}; use consensus_config::AuthorityIndex; use serde::{Deserialize, Serialize}; @@ -80,30 +77,6 @@ impl CommittedSubDag { } } - // Used for recovery, which is why we need to get the data from block store. - pub fn new_from_commit_data(commit_data: Commit, block_store: Arc) -> Self { - let mut leader_block_idx = None; - let commit_blocks = block_store - .read_blocks(&commit_data.blocks) - .expect("We should have the block referenced in the commit data"); - let blocks = commit_blocks - .into_iter() - .enumerate() - .map(|(idx, commit_block_opt)| { - let commit_block = commit_block_opt - .expect("We should have the block referenced in the commit data"); - if commit_block.reference() == commit_data.leader { - leader_block_idx = Some(idx); - } - commit_block - }) - .collect::>(); - let leader_block_idx = leader_block_idx.expect("Leader block must be in the sub-dag"); - let leader_block_ref = blocks[leader_block_idx].reference(); - let timestamp_ms = blocks[leader_block_idx].timestamp_ms(); - CommittedSubDag::new(leader_block_ref, blocks, timestamp_ms, commit_data.index) - } - /// Sort the blocks of the sub-dag by round number then authority index. Any /// deterministic & stable algorithm works. pub fn sort(&mut self) { @@ -142,6 +115,33 @@ impl fmt::Debug for CommittedSubDag { } } +// Recovers the full CommittedSubDag from block store, based on Commit. +pub fn load_committed_subdag_from_store( + block_store: &dyn Store, + commit_data: Commit, +) -> CommittedSubDag { + let mut leader_block_idx = None; + let commit_blocks = block_store + .read_blocks(&commit_data.blocks) + .expect("We should have the block referenced in the commit data"); + let blocks = commit_blocks + .into_iter() + .enumerate() + .map(|(idx, commit_block_opt)| { + let commit_block = + commit_block_opt.expect("We should have the block referenced in the commit data"); + if commit_block.reference() == commit_data.leader { + leader_block_idx = Some(idx); + } + commit_block + }) + .collect::>(); + let leader_block_idx = leader_block_idx.expect("Leader block must be in the sub-dag"); + let leader_block_ref = blocks[leader_block_idx].reference(); + let timestamp_ms = blocks[leader_block_idx].timestamp_ms(); + CommittedSubDag::new(leader_block_ref, blocks, timestamp_ms, commit_data.index) +} + #[allow(unused)] pub struct CommitConsumer { // A channel to send the committed sub dags through @@ -181,7 +181,7 @@ pub(crate) enum LeaderStatus { #[allow(unused)] impl LeaderStatus { - pub fn round(&self) -> Round { + pub(crate) fn round(&self) -> Round { match self { Self::Commit(block) => block.round(), Self::Skip(leader) => leader.round, @@ -189,7 +189,7 @@ impl LeaderStatus { } } - pub fn authority(&self) -> AuthorityIndex { + pub(crate) fn authority(&self) -> AuthorityIndex { match self { Self::Commit(block) => block.author(), Self::Skip(leader) => leader.authority, @@ -197,7 +197,7 @@ impl LeaderStatus { } } - pub fn is_decided(&self) -> bool { + pub(crate) fn is_decided(&self) -> bool { match self { Self::Commit(_) => true, Self::Skip(_) => true, @@ -218,8 +218,9 @@ impl Display for LeaderStatus { #[cfg(test)] mod tests { - use super::*; + use std::sync::Arc; + use super::*; use crate::{block::TestBlock, context::Context, storage::mem_store::MemStore}; #[test] @@ -282,7 +283,7 @@ mod tests { last_committed_rounds: vec![], }; - let subdag = CommittedSubDag::new_from_commit_data(commit_data, store.clone()); + let subdag = load_committed_subdag_from_store(store.as_ref(), commit_data); assert_eq!(subdag.leader, leader_ref); assert_eq!(subdag.timestamp_ms, leader_block.timestamp_ms()); assert_eq!( diff --git a/consensus/core/src/commit_observer.rs b/consensus/core/src/commit_observer.rs index 588908d4e7440..abae8cbda976e 100644 --- a/consensus/core/src/commit_observer.rs +++ b/consensus/core/src/commit_observer.rs @@ -8,7 +8,7 @@ use tokio::sync::mpsc::UnboundedSender; use crate::{ block::{timestamp_utc_ms, BlockAPI, VerifiedBlock}, - commit::{CommitIndex, CommittedSubDag}, + commit::{load_committed_subdag_from_store, CommitIndex, CommittedSubDag}, context::Context, dag_state::DagState, linearizer::Linearizer, @@ -111,8 +111,7 @@ impl CommitObserver { // Resend all the committed subdags to the consensus output channel // for all the commits above the last processed index. assert!(commit.index > last_processed_index); - let committed_subdag = - CommittedSubDag::new_from_commit_data(commit, self.store.clone()); + let committed_subdag = load_committed_subdag_from_store(self.store.as_ref(), commit); // Failures in sender.send() are assumed to be permanent if let Err(err) = self.sender.send(committed_subdag) { diff --git a/consensus/core/src/context.rs b/consensus/core/src/context.rs index 5aa39bc6c39b6..643dd086b7bb9 100644 --- a/consensus/core/src/context.rs +++ b/consensus/core/src/context.rs @@ -18,7 +18,7 @@ use crate::metrics::Metrics; /// of this authority. #[allow(dead_code)] #[derive(Clone)] -pub struct Context { +pub(crate) struct Context { /// Index of this authority in the committee. pub own_index: AuthorityIndex, /// Committee of the current epoch. diff --git a/consensus/core/src/core_thread.rs b/consensus/core/src/core_thread.rs index 0f543d1cf8e07..b73627400d7ee 100644 --- a/consensus/core/src/core_thread.rs +++ b/consensus/core/src/core_thread.rs @@ -1,7 +1,7 @@ // Copyright (c) Mysten Labs, Inc. // SPDX-License-Identifier: Apache-2.0 -use std::{collections::HashSet, fmt::Debug, sync::Arc, thread}; +use std::{collections::BTreeSet, fmt::Debug, sync::Arc, thread}; use async_trait::async_trait; use mysten_metrics::{metered_channel, monitored_scope}; @@ -24,23 +24,24 @@ enum CoreThreadCommand { /// Called when a leader timeout occurs and a block should be produced ForceNewBlock(Round, oneshot::Sender<()>), /// Request missing blocks that need to be synced. - GetMissing(oneshot::Sender>>), + GetMissing(oneshot::Sender>>), } #[derive(Error, Debug)] -pub(crate) enum CoreError { +pub enum CoreError { #[error("Core thread shutdown: {0}")] Shutdown(RecvError), } -/// The interface to adhere the implementations of the core thread dispatcher. Also allows the easier mocking during unit tests. +/// The interface to dispatch commands to CoreThread and Core. +/// Also this allows the easier mocking during unit tests. #[async_trait] -pub(crate) trait CoreThreadDispatcher: Sync + Send + 'static { +pub trait CoreThreadDispatcher: Sync + Send + 'static { async fn add_blocks(&self, blocks: Vec) -> Result, CoreError>; async fn force_new_block(&self, round: Round) -> Result<(), CoreError>; - async fn get_missing_blocks(&self) -> Result>, CoreError>; + async fn get_missing_blocks(&self) -> Result>, CoreError>; } #[allow(unused)] @@ -97,7 +98,7 @@ pub(crate) struct ChannelCoreThreadDispatcher { } impl ChannelCoreThreadDispatcher { - pub fn start(core: Core, context: Arc) -> (Self, CoreThreadHandle) { + pub(crate) fn start(core: Core, context: Arc) -> (Self, CoreThreadHandle) { let (sender, receiver) = metered_channel::channel_with_total( CORE_THREAD_COMMANDS_CHANNEL_SIZE, &context.metrics.channel_metrics.core_thread, @@ -154,7 +155,7 @@ impl CoreThreadDispatcher for ChannelCoreThreadDispatcher { receiver.await.map_err(Shutdown) } - async fn get_missing_blocks(&self) -> Result>, CoreError> { + async fn get_missing_blocks(&self) -> Result>, CoreError> { let (sender, receiver) = oneshot::channel(); self.send(CoreThreadCommand::GetMissing(sender)).await; receiver.await.map_err(Shutdown) diff --git a/consensus/core/src/error.rs b/consensus/core/src/error.rs index 0f8d5aef9041b..ba369c6a1bca7 100644 --- a/consensus/core/src/error.rs +++ b/consensus/core/src/error.rs @@ -1,12 +1,14 @@ // Copyright (c) Mysten Labs, Inc. // SPDX-License-Identifier: Apache-2.0 +use std::time::Duration; + use consensus_config::{AuthorityIndex, Epoch, Stake}; use fastcrypto::error::FastCryptoError; use thiserror::Error; use typed_store::TypedStoreError; -use crate::block::Round; +use crate::block::{BlockTimestampMs, Round}; /// Errors that can occur when processing blocks, reading from storage, or encountering shutdown. #[allow(unused)] @@ -51,6 +53,12 @@ pub enum ConsensusError { #[error("Invalid transaction: {0}")] InvalidTransaction(String), + #[error("Block at {block_timestamp}ms is too far in the future: {forward_time_drift:?}")] + BlockTooFarInFuture { + block_timestamp: BlockTimestampMs, + forward_time_drift: Duration, + }, + #[error("RocksDB failure: {0}")] RocksDBFailure(#[from] TypedStoreError), diff --git a/consensus/core/src/leader_timeout.rs b/consensus/core/src/leader_timeout.rs index d6984411202e3..6f84911663369 100644 --- a/consensus/core/src/leader_timeout.rs +++ b/consensus/core/src/leader_timeout.rs @@ -96,7 +96,7 @@ impl LeaderTimeoutTask { #[cfg(test)] mod tests { - use std::collections::HashSet; + use std::collections::BTreeSet; use std::sync::Arc; use std::time::Duration; @@ -140,12 +140,12 @@ mod tests { Ok(()) } - async fn get_missing_blocks(&self) -> Result>, CoreError> { + async fn get_missing_blocks(&self) -> Result>, CoreError> { todo!() } } - #[tokio::test] + #[tokio::test(flavor = "current_thread", start_paused = true)] async fn basic_leader_timeout() { let (context, _signers) = Context::new_for_test(4); let dispatcher = MockCoreThreadDispatcher::default(); @@ -155,7 +155,7 @@ mod tests { ..Default::default() }; let context = Arc::new(context.with_parameters(parameters)); - let now = Instant::now(); + let start = Instant::now(); let (mut signals, signal_receivers) = CoreSignals::new(); @@ -173,7 +173,12 @@ mod tests { let (round, timestamp) = all_calls[0]; assert_eq!(round, 10); - assert!(leader_timeout < timestamp - now); + assert!( + leader_timeout <= timestamp - start, + "Leader timeout setting {:?} should be less than actual time difference {:?}", + leader_timeout, + timestamp - start + ); // now wait another 2 * leader_timeout, no other call should be received sleep(2 * leader_timeout).await; @@ -182,7 +187,7 @@ mod tests { assert_eq!(all_calls.len(), 0); } - #[tokio::test] + #[tokio::test(flavor = "current_thread", start_paused = true)] async fn multiple_leader_timeouts() { let (context, _signers) = Context::new_for_test(4); let dispatcher = MockCoreThreadDispatcher::default(); diff --git a/consensus/core/src/linearizer.rs b/consensus/core/src/linearizer.rs index e68610a4bac8d..73f61e386ebe1 100644 --- a/consensus/core/src/linearizer.rs +++ b/consensus/core/src/linearizer.rs @@ -14,14 +14,14 @@ use crate::{ /// Expand a committed sequence of leader into a sequence of sub-dags. #[allow(unused)] #[derive(Clone)] -pub struct Linearizer { +pub(crate) struct Linearizer { /// In memory block store representing the dag state dag_state: Arc>, } #[allow(unused)] impl Linearizer { - pub fn new(dag_state: Arc>) -> Self { + pub(crate) fn new(dag_state: Arc>) -> Self { Self { dag_state } } @@ -80,7 +80,10 @@ impl Linearizer { // This function should be called whenever a new commit is observed. This will // iterate over the sequence of committed leaders and produce a list of committed // sub-dags. - pub fn handle_commit(&mut self, committed_leaders: Vec) -> Vec { + pub(crate) fn handle_commit( + &mut self, + committed_leaders: Vec, + ) -> Vec { let mut committed_sub_dags = vec![]; let mut commits = vec![]; let mut committed_blocks = vec![]; diff --git a/consensus/core/src/metrics.rs b/consensus/core/src/metrics.rs index 91524cc3a8b0c..57658bd8291e1 100644 --- a/consensus/core/src/metrics.rs +++ b/consensus/core/src/metrics.rs @@ -1,13 +1,14 @@ // Copyright (c) Mysten Labs, Inc. // SPDX-License-Identifier: Apache-2.0 +use std::sync::Arc; + use prometheus::{ register_histogram_with_registry, register_int_counter_vec_with_registry, register_int_counter_with_registry, register_int_gauge_vec_with_registry, register_int_gauge_with_registry, Histogram, IntCounter, IntCounterVec, IntGauge, IntGaugeVec, Registry, }; -use std::sync::Arc; const LATENCY_SEC_BUCKETS: &[f64] = &[ 0.001, 0.005, 0.01, 0.05, 0.1, 0.15, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1.0, 1.2, 1.4, @@ -15,9 +16,9 @@ const LATENCY_SEC_BUCKETS: &[f64] = &[ 12.5, 15., 17.5, 20., 25., 30., 60., 90., 120., 180., 300., ]; -pub struct Metrics { - pub node_metrics: NodeMetrics, - pub channel_metrics: ChannelMetrics, +pub(crate) struct Metrics { + pub(crate) node_metrics: NodeMetrics, + pub(crate) channel_metrics: ChannelMetrics, } pub(crate) fn initialise_metrics(registry: Registry) -> Arc { @@ -35,7 +36,7 @@ pub(crate) fn test_metrics() -> Arc { initialise_metrics(Registry::new()) } -pub struct NodeMetrics { +pub(crate) struct NodeMetrics { pub uptime: Histogram, pub quorum_receive_latency: Histogram, pub core_lock_enqueued: IntCounter, @@ -45,7 +46,9 @@ pub struct NodeMetrics { pub suspended_blocks: IntCounterVec, pub unsuspended_blocks: IntCounterVec, pub invalid_blocks: IntCounterVec, + pub block_timestamp_drift_wait_ms: IntCounterVec, pub broadcaster_rtt_estimate_ms: IntGaugeVec, + // Commit Metrics #[allow(unused)] pub committed_leaders_total: IntCounterVec, @@ -113,6 +116,13 @@ impl NodeMetrics { registry, ) .unwrap(), + block_timestamp_drift_wait_ms: register_int_counter_vec_with_registry!( + "block_timestamp_drift_wait_ms", + "Total time in ms spent waiting, when a received block has timestamp in future.", + &["authority"], + registry, + ) + .unwrap(), broadcaster_rtt_estimate_ms: register_int_gauge_vec_with_registry!( "broadcaster_rtt_estimate_ms", "Estimated RTT latency per peer authority, for block sending in Broadcaster", @@ -151,7 +161,7 @@ impl NodeMetrics { } } -pub struct ChannelMetrics { +pub(crate) struct ChannelMetrics { /// occupancy of the channel from TransactionClient to TransactionConsumer pub tx_transactions_submit: IntGauge, /// total received on channel from TransactionClient to TransactionConsumer diff --git a/consensus/core/src/network/anemo_network.rs b/consensus/core/src/network/anemo_network.rs index c2e37e48a9051..f700b21b946d3 100644 --- a/consensus/core/src/network/anemo_network.rs +++ b/consensus/core/src/network/anemo_network.rs @@ -28,7 +28,7 @@ use crate::{ }; /// Implements RPC client for Consensus. -pub struct AnemoClient { +pub(crate) struct AnemoClient { context: Arc, network: Arc>, } @@ -230,14 +230,14 @@ impl ConsensusRpc for AnemoServiceProxy { /// 3. Create consensus components. /// 4. Create `AnemoService` for consensus RPC handler. /// 5. Install `AnemoService` to `AnemoManager` with `AnemoManager::install_service()`. -pub struct AnemoManager { +pub(crate) struct AnemoManager { context: Arc, client: Arc, network: Arc>, } impl AnemoManager { - pub fn new(context: Arc) -> Self { + pub(crate) fn new(context: Arc) -> Self { Self { context: context.clone(), client: Arc::new(AnemoClient::new(context)), @@ -415,7 +415,7 @@ mod test { } } - #[tokio::test] + #[tokio::test(flavor = "current_thread", start_paused = true)] async fn test_basics() { let (context, keys) = Context::new_for_test(4); diff --git a/consensus/core/src/network/mod.rs b/consensus/core/src/network/mod.rs index 17834830e4957..a900f137533cc 100644 --- a/consensus/core/src/network/mod.rs +++ b/consensus/core/src/network/mod.rs @@ -19,7 +19,7 @@ pub(crate) mod anemo_network; /// Network client for communicating with peers. #[async_trait] -pub trait NetworkClient: Send + Sync + 'static { +pub(crate) trait NetworkClient: Send + Sync + 'static { /// Sends a serialized SignedBlock to a peer. async fn send_block(&self, peer: AuthorityIndex, block: &Bytes) -> ConsensusResult<()>; @@ -35,7 +35,7 @@ pub trait NetworkClient: Send + Sync + 'static { /// NOTE: using `async_trait` macro because `NetworkService` methods are called in the trait impl /// of `anemo_gen::ConsensusRpc`, which itself is annotated with `async_trait`. #[async_trait] -pub trait NetworkService: Send + Sync + 'static { +pub(crate) trait NetworkService: Send + Sync + 'static { async fn handle_send_block(&self, peer: AuthorityIndex, block: Bytes) -> ConsensusResult<()>; async fn handle_fetch_blocks( &self, @@ -46,7 +46,7 @@ pub trait NetworkService: Send + Sync + 'static { /// An `AuthorityNode` holds a `NetworkManager` until shutdown. /// Dropping `NetworkManager` will shutdown the network service. -pub trait NetworkManager: Send + Sync +pub(crate) trait NetworkManager: Send + Sync where S: NetworkService, { @@ -62,8 +62,6 @@ where fn install_service(&self, network_keypair: NetworkKeyPair, service: Arc); /// Stops the network service. - // TODO: Investigate if we need to use async fn here - #[allow(async_fn_in_trait)] async fn stop(&self); } diff --git a/consensus/core/src/stake_aggregator.rs b/consensus/core/src/stake_aggregator.rs index 668709003991c..0202f6b3a8d82 100644 --- a/consensus/core/src/stake_aggregator.rs +++ b/consensus/core/src/stake_aggregator.rs @@ -5,7 +5,7 @@ use consensus_config::{AuthorityIndex, Committee, Stake}; use std::collections::HashSet; use std::marker::PhantomData; -pub trait CommitteeThreshold { +pub(crate) trait CommitteeThreshold { fn is_threshold(committee: &Committee, amount: Stake) -> bool; } @@ -27,7 +27,7 @@ impl CommitteeThreshold for ValidityThreshold { #[allow(unused)] -pub struct StakeAggregator { +pub(crate) struct StakeAggregator { votes: HashSet, stake: Stake, _phantom: PhantomData, @@ -36,7 +36,7 @@ pub struct StakeAggregator { #[allow(unused)] impl StakeAggregator { - pub fn new() -> Self { + pub(crate) fn new() -> Self { Self { votes: Default::default(), stake: 0, @@ -47,14 +47,14 @@ impl StakeAggregator { /// Adds a vote for the specified authority index to the aggregator. It is guaranteed to count /// the vote only once for an authority. The method returns true when the required threshold has /// been reached. - pub fn add(&mut self, vote: AuthorityIndex, committee: &Committee) -> bool { + pub(crate) fn add(&mut self, vote: AuthorityIndex, committee: &Committee) -> bool { if self.votes.insert(vote) { self.stake += committee.stake(vote); } T::is_threshold(committee, self.stake) } - pub fn clear(&mut self) { + pub(crate) fn clear(&mut self) { self.votes.clear(); self.stake = 0; } diff --git a/consensus/core/src/storage/mod.rs b/consensus/core/src/storage/mod.rs index 57d76a2c522da..58091d754c915 100644 --- a/consensus/core/src/storage/mod.rs +++ b/consensus/core/src/storage/mod.rs @@ -16,7 +16,7 @@ use crate::{ }; /// A common interface for consensus storage. -pub trait Store: Send + Sync { +pub(crate) trait Store: Send + Sync { /// Writes blocks and consensus commits to store. fn write(&self, blocks: Vec, commits: Vec) -> ConsensusResult<()>; diff --git a/consensus/core/src/threshold_clock.rs b/consensus/core/src/threshold_clock.rs index 82f49e29b98c7..84569875d169a 100644 --- a/consensus/core/src/threshold_clock.rs +++ b/consensus/core/src/threshold_clock.rs @@ -30,13 +30,13 @@ impl ThresholdClock { } } - pub fn last_quorum_ts(&self) -> Instant { + pub(crate) fn last_quorum_ts(&self) -> Instant { self.last_quorum_ts } /// Add the block references that have been successfully processed and advance the round accordingly. If the round /// has indeed advanced then the new round is returned, otherwise None is returned. - pub fn add_blocks(&mut self, mut blocks: Vec) -> Option { + pub(crate) fn add_blocks(&mut self, mut blocks: Vec) -> Option { let previous_round = self.round; for block_ref in blocks { self.add_block(block_ref); @@ -44,7 +44,7 @@ impl ThresholdClock { (self.round > previous_round).then_some(self.round) } - pub fn add_block(&mut self, block: BlockRef) { + pub(crate) fn add_block(&mut self, block: BlockRef) { match block.round.cmp(&self.round) { // Blocks with round less then what we currently build are irrelevant here Ordering::Less => {} @@ -73,7 +73,7 @@ impl ThresholdClock { } } - pub fn get_round(&self) -> Round { + pub(crate) fn get_round(&self) -> Round { self.round } } diff --git a/consensus/core/src/universal_committer.rs b/consensus/core/src/universal_committer.rs index b0341d0e77c5c..497799dac33a9 100644 --- a/consensus/core/src/universal_committer.rs +++ b/consensus/core/src/universal_committer.rs @@ -16,11 +16,11 @@ use crate::{ #[cfg(test)] #[path = "tests/universal_committer_tests.rs"] -pub mod universal_committer_tests; +mod universal_committer_tests; #[cfg(test)] #[path = "tests/pipelined_committer_tests.rs"] -pub mod pipelined_committer_tests; +mod pipelined_committer_tests; /// A universal committer uses a collection of committers to commit a sequence of leaders. /// It can be configured to use a combination of different commit strategies, including @@ -39,7 +39,7 @@ impl UniversalCommitter { /// Try to commit part of the dag. This function is idempotent and returns a list of /// ordered decided leaders. #[tracing::instrument(skip_all, fields(last_decided = %last_decided))] - pub fn try_commit(&self, last_decided: Slot) -> Vec { + pub(crate) fn try_commit(&self, last_decided: Slot) -> Vec { let highest_accepted_round = self.dag_state.read().highest_accepted_round(); // Try to decide as many leaders as possible, starting with the highest round. @@ -97,7 +97,7 @@ impl UniversalCommitter { /// Return list of leaders for the round. /// Can return empty vec if round does not have a designated leader. - pub fn get_leaders(&self, round: Round) -> Vec { + pub(crate) fn get_leaders(&self, round: Round) -> Vec { self.committers .iter() .filter_map(|committer| committer.elect_leader(round)) @@ -148,7 +148,7 @@ mod universal_committer_builder { } impl UniversalCommitterBuilder { - pub fn new(context: Arc, dag_state: Arc>) -> Self { + pub(crate) fn new(context: Arc, dag_state: Arc>) -> Self { let leader_schedule = LeaderSchedule::new(context.clone()); Self { context, @@ -160,22 +160,22 @@ mod universal_committer_builder { } } - pub fn with_wave_length(mut self, wave_length: Round) -> Self { + pub(crate) fn with_wave_length(mut self, wave_length: Round) -> Self { self.wave_length = wave_length; self } - pub fn with_number_of_leaders(mut self, number_of_leaders: usize) -> Self { + pub(crate) fn with_number_of_leaders(mut self, number_of_leaders: usize) -> Self { self.number_of_leaders = number_of_leaders; self } - pub fn with_pipeline(mut self, pipeline: bool) -> Self { + pub(crate) fn with_pipeline(mut self, pipeline: bool) -> Self { self.pipeline = pipeline; self } - pub fn build(self) -> UniversalCommitter { + pub(crate) fn build(self) -> UniversalCommitter { let mut committers = Vec::new(); let pipeline_stages = if self.pipeline { self.wave_length } else { 1 }; for round_offset in 0..pipeline_stages {