Skip to content

Commit

Permalink
[Consensus] handle blocks with clock drift (MystenLabs#16252)
Browse files Browse the repository at this point in the history
## Description 

- Add basic time drift handling.
- Add a basic unit test with `AuthorityService`.
- Update types visibility.

## Test Plan 

Unit test.

---
If your changes are not user-facing and do not break anything, you can
skip the following section. Otherwise, please briefly describe what has
changed under the Release Notes section.

### Type of Change (Check all that apply)

- [ ] protocol change
- [ ] user-visible impact
- [ ] breaking change for client SDKs
- [ ] breaking change for FNs (FN binary must upgrade)
- [ ] breaking change for validators or node operators (must upgrade
binaries)
- [ ] breaking change for on-chain data layout
- [ ] necessitate either a data wipe or data migration

### Release notes
  • Loading branch information
mwtian authored Feb 16, 2024
1 parent 4f52e05 commit 884b8f2
Show file tree
Hide file tree
Showing 19 changed files with 293 additions and 107 deletions.
9 changes: 9 additions & 0 deletions consensus/config/src/parameters.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@ pub struct Parameters {
#[serde(default = "Parameters::default_leader_timeout")]
pub leader_timeout: Duration,

/// Maximum forward time drift (how far in future) allowed for received blocks.
#[serde(default = "Parameters::default_max_forward_time_drift")]
pub max_forward_time_drift: Duration,

/// The database path. The path should be provided in order for the node to be able to boot
pub db_path: Option<PathBuf>,
}
Expand All @@ -27,6 +31,10 @@ impl Parameters {
Duration::from_millis(250)
}

/// Default for `max_forward_time_drift`: received blocks may carry a timestamp
/// at most 500 milliseconds in the future.
pub fn default_max_forward_time_drift() -> Duration {
    const DEFAULT_MAX_FORWARD_TIME_DRIFT_MS: u64 = 500;
    Duration::from_millis(DEFAULT_MAX_FORWARD_TIME_DRIFT_MS)
}

pub fn db_path_str_unsafe(&self) -> String {
self.db_path
.clone()
Expand All @@ -42,6 +50,7 @@ impl Default for Parameters {
/// Builds `Parameters` from the per-field default helpers, with no database path set.
fn default() -> Self {
    let leader_timeout = Parameters::default_leader_timeout();
    let max_forward_time_drift = Parameters::default_max_forward_time_drift();
    Self {
        leader_timeout,
        max_forward_time_drift,
        db_path: None,
    }
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,8 @@ expression: parameters
leader_timeout:
secs: 0
nanos: 250000000
max_forward_time_drift:
secs: 0
nanos: 500000000
db_path: ~

181 changes: 165 additions & 16 deletions consensus/core/src/authority_node.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,19 @@
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use std::{sync::Arc, time::Instant, vec};
use std::{sync::Arc, time::Duration, time::Instant, vec};

use async_trait::async_trait;
use bytes::Bytes;
use consensus_config::{AuthorityIndex, Committee, NetworkKeyPair, Parameters, ProtocolKeyPair};
use parking_lot::RwLock;
use prometheus::Registry;
use sui_protocol_config::ProtocolConfig;
use tokio::time::sleep;
use tracing::info;

use crate::{
block::{BlockAPI, BlockRef, SignedBlock, VerifiedBlock},
block::{timestamp_utc_ms, BlockAPI, BlockRef, SignedBlock, VerifiedBlock},
block_manager::BlockManager,
block_verifier::{BlockVerifier, SignedBlockVerifier},
broadcaster::Broadcaster,
Expand All @@ -30,12 +31,49 @@ use crate::{
CommitConsumer,
};

// This type is used by Sui as part of starting consensus via MysticetiManager.
pub type ConsensusAuthority = AuthorityNode<AnemoManager>;
// This type is used by Sui as part of starting consensus via MysticetiManager.
// It hides the details of the types.
pub struct ConsensusAuthority(AuthorityNode<AnemoManager>);

pub struct AuthorityNode<N>
/// Public facade over the wrapped `AuthorityNode<AnemoManager>`; every method
/// forwards to the inner node stored in field `0`.
impl ConsensusAuthority {
/// Starts the inner `AuthorityNode` with the given arguments (forwarded
/// unchanged and in the same order) and wraps the started node.
pub async fn start(
own_index: AuthorityIndex,
committee: Committee,
parameters: Parameters,
protocol_config: ProtocolConfig,
protocol_keypair: ProtocolKeyPair,
network_keypair: NetworkKeyPair,
transaction_verifier: Arc<dyn TransactionVerifier>,
commit_consumer: CommitConsumer,
registry: Registry,
) -> Self {
let authority_node = AuthorityNode::start(
own_index,
committee,
parameters,
protocol_config,
protocol_keypair,
network_keypair,
transaction_verifier,
commit_consumer,
registry,
)
.await;
Self(authority_node)
}

/// Consumes this authority and awaits the inner node's `stop()`.
pub async fn stop(self) {
self.0.stop().await;
}

/// Returns the `TransactionClient` handle provided by the inner node.
pub fn transaction_client(&self) -> Arc<TransactionClient> {
self.0.transaction_client()
}
}

pub(crate) struct AuthorityNode<N>
where
N: NetworkManager<AuthorityService>,
N: NetworkManager<AuthorityService<ChannelCoreThreadDispatcher>>,
{
context: Arc<Context>,
start_time: Instant,
Expand All @@ -48,9 +86,9 @@ where

impl<N> AuthorityNode<N>
where
N: NetworkManager<AuthorityService>,
N: NetworkManager<AuthorityService<ChannelCoreThreadDispatcher>>,
{
pub async fn start(
pub(crate) async fn start(
own_index: AuthorityIndex,
committee: Committee,
parameters: Parameters,
Expand Down Expand Up @@ -134,7 +172,7 @@ where
}
}

pub async fn stop(mut self) {
pub(crate) async fn stop(mut self) {
info!(
"Stopping authority. Total run time: {:?}",
self.start_time.elapsed()
Expand All @@ -152,20 +190,20 @@ where
.observe(self.start_time.elapsed().as_secs_f64());
}

pub fn transaction_client(&self) -> Arc<TransactionClient> {
pub(crate) fn transaction_client(&self) -> Arc<TransactionClient> {
self.transaction_client.clone()
}
}

/// Authority's network interface.
pub struct AuthorityService {
pub(crate) struct AuthorityService<C: CoreThreadDispatcher> {
context: Arc<Context>,
block_verifier: Arc<dyn BlockVerifier>,
core_dispatcher: ChannelCoreThreadDispatcher,
core_dispatcher: C,
}

#[async_trait]
impl NetworkService for AuthorityService {
impl<C: CoreThreadDispatcher> NetworkService for AuthorityService<C> {
async fn handle_send_block(
&self,
peer: AuthorityIndex,
Expand All @@ -174,6 +212,8 @@ impl NetworkService for AuthorityService {
// TODO: dedup block verifications, here and with fetched blocks.
let signed_block: SignedBlock =
bcs::from_bytes(&serialized_block).map_err(ConsensusError::MalformedBlock)?;

// Reject blocks not produced by the peer.
if peer != signed_block.author() {
self.context
.metrics
Expand All @@ -185,6 +225,8 @@ impl NetworkService for AuthorityService {
info!("Block with wrong authority from {}: {}", peer, e);
return Err(e);
}

// Reject blocks failing validations.
if let Err(e) = self.block_verifier.verify(&signed_block) {
self.context
.metrics
Expand All @@ -196,6 +238,31 @@ impl NetworkService for AuthorityService {
return Err(e);
}
let verified_block = VerifiedBlock::new_verified(signed_block, serialized_block);

// Reject block with timestamp too far in the future.
let forward_time_drift = Duration::from_millis(
verified_block
.timestamp_ms()
.saturating_sub(timestamp_utc_ms()),
);
if forward_time_drift > self.context.parameters.max_forward_time_drift {
return Err(ConsensusError::BlockTooFarInFuture {
block_timestamp: verified_block.timestamp_ms(),
forward_time_drift,
});
}

// Wait until the block's timestamp is current.
if forward_time_drift > Duration::ZERO {
self.context
.metrics
.node_metrics
.block_timestamp_drift_wait_ms
.with_label_values(&[&peer.to_string()])
.inc_by(forward_time_drift.as_millis() as u64);
sleep(forward_time_drift).await;
}

self.core_dispatcher
.add_blocks(vec![verified_block])
.await
Expand All @@ -214,18 +281,61 @@ impl NetworkService for AuthorityService {

#[cfg(test)]
mod tests {
use std::collections::BTreeSet;
use std::sync::Arc;

use async_trait::async_trait;
use consensus_config::{local_committee_and_keys, NetworkKeyPair, Parameters, ProtocolKeyPair};
use fastcrypto::traits::ToFromBytes;
use parking_lot::Mutex;
use prometheus::Registry;
use sui_protocol_config::ProtocolConfig;
use tempfile::TempDir;
use tokio::sync::mpsc::unbounded_channel;
use tokio::time::sleep;

use super::*;
use crate::authority_node::AuthorityService;
use crate::block::{timestamp_utc_ms, BlockRef, Round, TestBlock, VerifiedBlock};
use crate::block_verifier::NoopBlockVerifier;
use crate::context::Context;
use crate::core_thread::{CoreError, CoreThreadDispatcher};
use crate::network::NetworkService;
use crate::transaction::NoopTransactionVerifier;

/// Test double for `CoreThreadDispatcher` that records the blocks it is given
/// instead of forwarding them to a core thread.
struct FakeCoreThreadDispatcher {
// Blocks received via `add_blocks`, in arrival order.
blocks: Mutex<Vec<VerifiedBlock>>,
}

impl FakeCoreThreadDispatcher {
    /// Creates a dispatcher that has not recorded any blocks yet.
    fn new() -> Self {
        let blocks = Mutex::new(Vec::new());
        Self { blocks }
    }

    /// Returns a cloned snapshot of the blocks recorded so far.
    fn get_blocks(&self) -> Vec<VerifiedBlock> {
        let recorded = self.blocks.lock();
        recorded.clone()
    }
}

// Fake dispatcher implementation used by the tests in this module. Only
// `add_blocks` is functional; the other trait methods panic if called.
#[async_trait]
impl CoreThreadDispatcher for Arc<FakeCoreThreadDispatcher> {
/// Appends `blocks` to the recorded list and returns the reference of each
/// input block, in input order. Never returns an error.
async fn add_blocks(&self, blocks: Vec<VerifiedBlock>) -> Result<Vec<BlockRef>, CoreError> {
// Collect the references first, because `extend` below consumes `blocks`.
let block_refs = blocks.iter().map(|b| b.reference()).collect();
self.blocks.lock().extend(blocks);
Ok(block_refs)
}

/// Not exercised by the tests here; panics via `unimplemented!()`.
async fn force_new_block(&self, _round: Round) -> Result<(), CoreError> {
unimplemented!()
}

/// Not exercised by the tests here; panics via `unimplemented!()`.
async fn get_missing_blocks(&self) -> Result<Vec<BTreeSet<BlockRef>>, CoreError> {
unimplemented!()
}
}

#[tokio::test]
async fn start_and_stop() {
let (committee, keypairs) = local_committee_and_keys(0, vec![1]);
Expand Down Expand Up @@ -260,10 +370,49 @@ mod tests {
)
.await;

assert_eq!(authority.context.own_index, own_index);
assert_eq!(authority.context.committee.epoch(), 0);
assert_eq!(authority.context.committee.size(), 1);
assert_eq!(authority.0.context.own_index, own_index);
assert_eq!(authority.0.context.committee.epoch(), 0);
assert_eq!(authority.0.context.committee.size(), 1);

authority.stop().await;
}

// Checks that a block stamped `max_forward_time_drift` ahead of the local
// clock is held back and only handed to the core dispatcher after the wait.
// NOTE(review): relies on Tokio's paused test clock (`start_paused = true`),
// which presumably auto-advances while the runtime is idle so the sleeps cost
// no wall-clock time -- confirm against the Tokio docs.
#[tokio::test(flavor = "current_thread", start_paused = true)]
async fn test_authority_service() {
let (context, _keys) = Context::new_for_test(4);
let context = Arc::new(context);
let block_verifier = NoopBlockVerifier {};
// Keep a handle to the fake dispatcher so the recorded blocks can be inspected.
let core_dispatcher = Arc::new(FakeCoreThreadDispatcher::new());
let authority_service = Arc::new(AuthorityService {
context: context.clone(),
block_verifier: Arc::new(block_verifier),
core_dispatcher: core_dispatcher.clone(),
});

// Test delaying blocks with time drift.
let now = timestamp_utc_ms();
let max_drift = context.parameters.max_forward_time_drift;
// Timestamp the block `max_drift` after `now`.
let input_block = VerifiedBlock::new_for_test(
TestBlock::new(9, 0)
.set_timestamp_ms(now + max_drift.as_millis() as u64)
.build(),
);

let service = authority_service.clone();
let serialized = input_block.serialized().clone();
// Run the handler in a separate task so this test can observe the dispatcher
// while the handler is still pending.
tokio::spawn(async move {
service
.handle_send_block(context.committee.to_authority_index(0).unwrap(), serialized)
.await
.unwrap();
});

// Halfway through the drift window nothing has been delivered yet.
sleep(max_drift / 2).await;
assert!(core_dispatcher.get_blocks().is_empty());

// After a further `max_drift` (1.5x in total) the block must have been delivered.
sleep(max_drift).await;
let blocks = core_dispatcher.get_blocks();
assert_eq!(blocks.len(), 1);
assert_eq!(blocks[0], input_block);
}
}
2 changes: 1 addition & 1 deletion consensus/core/src/base_committer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use crate::{

#[cfg(test)]
#[path = "tests/base_committer_tests.rs"]
pub mod base_committer_tests;
mod base_committer_tests;

#[allow(unused)]
pub(crate) struct BaseCommitterOptions {
Expand Down
2 changes: 1 addition & 1 deletion consensus/core/src/block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -324,7 +324,7 @@ impl fmt::Debug for Slot {
/// will affect the values of `BlockDigest` and `BlockRef`.
#[allow(unused)]
#[derive(Deserialize, Serialize)]
pub struct SignedBlock {
pub(crate) struct SignedBlock {
inner: Block,
signature: Bytes,
}
Expand Down
Loading

0 comments on commit 884b8f2

Please sign in to comment.