From ad7b440e2426266f7a2b819c45914a25207b446c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bruno=20Fran=C3=A7a?= Date: Fri, 25 Oct 2024 02:41:20 +0100 Subject: [PATCH] Second pass on the bft actor. Did most of replica. --- node/actors/bft/src/leader/tests.rs | 12 +- node/actors/bft/src/replica/commit.rs | 28 +-- node/actors/bft/src/replica/misc.rs | 19 +- node/actors/bft/src/replica/mod.rs | 118 +++++++----- node/actors/bft/src/replica/new_view.rs | 138 ++++++++++++--- node/actors/bft/src/replica/proposal.rs | 10 +- node/actors/bft/src/replica/tests.rs | 4 +- node/actors/bft/src/replica/timeout.rs | 197 +++++++++++++++++++++ node/actors/bft/src/testonly/ut_harness.rs | 8 +- spec/informal-spec/replica.rs | 14 +- spec/informal-spec/types.rs | 2 +- 11 files changed, 449 insertions(+), 101 deletions(-) create mode 100644 node/actors/bft/src/replica/timeout.rs diff --git a/node/actors/bft/src/leader/tests.rs b/node/actors/bft/src/leader/tests.rs index e6453384..9b6a9614 100644 --- a/node/actors/bft/src/leader/tests.rs +++ b/node/actors/bft/src/leader/tests.rs @@ -137,7 +137,7 @@ async fn replica_prepare_old_view() { s.spawn_bg(runner.run(ctx)); let replica_prepare = util.new_replica_prepare(); - util.leader.view = util.replica.view.next(); + util.leader.view = util.replica.view_number.next(); util.leader.phase = Phase::Prepare; let res = util .process_replica_prepare(ctx, util.sign(replica_prepare)) @@ -164,7 +164,7 @@ async fn replica_prepare_during_commit() { s.spawn_bg(runner.run(ctx)); let replica_prepare = util.new_replica_prepare(); - util.leader.view = util.replica.view; + util.leader.view = util.replica.view_number; util.leader.phase = Phase::Commit; let res = util .process_replica_prepare(ctx, util.sign(replica_prepare)) @@ -175,7 +175,7 @@ async fn replica_prepare_during_commit() { current_view, current_phase: Phase::Commit, }) => { - assert_eq!(current_view, util.replica.view); + assert_eq!(current_view, util.replica.view_number); } ); Ok(()) @@ -607,13 +607,13 @@ async fn replica_commit_old() { s.spawn_bg(runner.run(ctx)); let mut replica_commit = util.new_replica_commit(ctx).await; - replica_commit.view.number = ViewNumber(util.replica.view.0 - 1); + replica_commit.view.number = ViewNumber(util.replica.view_number.0 - 1); let replica_commit = util.sign(replica_commit); let res = util.process_replica_commit(ctx, replica_commit).await; assert_matches!( res, Err(replica_commit::Error::Old { current_view, current_phase }) => { - assert_eq!(current_view, util.replica.view); + assert_eq!(current_view, util.replica.view_number); assert_eq!(current_phase, util.replica.phase); } ); @@ -632,7 +632,7 @@ async fn replica_commit_not_leader_in_view() { s.spawn_bg(runner.run(ctx)); util.produce_block(ctx).await; - let current_view_leader = util.view_leader(util.replica.view); + let current_view_leader = util.view_leader(util.replica.view_number); assert_ne!(current_view_leader, util.owner_key().public()); let replica_commit = util.new_current_replica_commit(); let res = util diff --git a/node/actors/bft/src/replica/commit.rs b/node/actors/bft/src/replica/commit.rs index 913fcd48..78ca9282 100644 --- a/node/actors/bft/src/replica/commit.rs +++ b/node/actors/bft/src/replica/commit.rs @@ -1,4 +1,3 @@ -//! Handler of a ReplicaCommit message. use super::StateMachine; use crate::metrics; use std::collections::HashSet; @@ -21,10 +20,10 @@ pub(crate) enum Error { current_view: validator::ViewNumber, }, /// Duplicate signer. 
- #[error("duplicate signer (current view: {current_view:?}, signer: {signer:?})")] + #[error("duplicate signer (message view: {message_view:?}, signer: {signer:?})")] DuplicateSigner { - /// Current view. - current_view: validator::ViewNumber, + /// View number of the message. + message_view: validator::ViewNumber, /// Signer of the message. signer: Box, }, @@ -72,17 +71,17 @@ impl StateMachine { } // If the message is from a past view, ignore it. - if message.view.number < self.view { + if message.view.number < self.view_number { return Err(Error::Old { - current_view: self.view, + current_view: self.view_number, }); } - // If we already have a message from the same validator for the same view, ignore it. + // If we already have a message from the same validator for the same or past view, ignore it. if let Some(&view) = self.commit_views_cache.get(author) { - if view == message.view.number { + if view >= message.view.number { return Err(Error::DuplicateSigner { - current_view: self.view, + message_view: message.view.number, signer: author.clone().into(), }); } @@ -115,19 +114,19 @@ impl StateMachine { // Calculate the CommitQC signers weight. let weight = self.config.genesis().validators.weight(&commit_qc.signers); - // Update commit message current view number for author + // Update view number of last commit message for author self.commit_views_cache .insert(author.clone(), message.view.number); // Clean up commit_qcs for the case that no replica is at the view - // of a given CommitQC + // of a given CommitQC. // This prevents commit_qcs map from growing indefinitely in case some - // malicious replica starts spamming messages for future views + // malicious replica starts spamming messages for future views. let active_views: HashSet<_> = self.commit_views_cache.values().collect(); self.commit_qcs_cache .retain(|view_number, _| active_views.contains(view_number)); - // Now we check if we have enough weight to continue. + // Now we check if we have enough weight to continue. If not, we wait for more messages. if weight < self.config.genesis().validators.quorum_threshold() { return Ok(()); }; @@ -142,6 +141,7 @@ impl StateMachine { .remove(message) .unwrap(); + // We update our state with the new commit QC. self.process_commit_qc(ctx, &commit_qc) .await .wrap("process_commit_qc()")?; @@ -154,7 +154,7 @@ impl StateMachine { self.phase_start = now; // Start a new view. - self.start_new_view(ctx, message.view.number.next()); + self.start_new_view(ctx, message.view.number.next()).await?; Ok(()) } diff --git a/node/actors/bft/src/replica/misc.rs b/node/actors/bft/src/replica/misc.rs index 02dcf2ce..fc08baaf 100644 --- a/node/actors/bft/src/replica/misc.rs +++ b/node/actors/bft/src/replica/misc.rs @@ -5,6 +5,23 @@ use zksync_consensus_roles::validator; use zksync_consensus_storage as storage; impl StateMachine { + /// Makes a justification (for a ReplicaNewView or a LeaderProposal) based on the current state. + pub(crate) fn get_justification(&self) -> validator::ProposalJustification { + // We need some QC in order to be able to create a justification. + // In fact, it should be impossible to get here without a QC. Because + // we only get here after starting a new view, which requires a QC. + assert!(self.high_commit_qc.is_some() || self.high_timeout_qc.is_some()); + + // We use the highest QC as the justification. If both have the same view, we use the CommitQC. 
+ if self.high_commit_qc.as_ref().map(|x| x.view()) + >= self.high_timeout_qc.as_ref().map(|x| &x.view) + { + validator::ProposalJustification::Commit(self.high_commit_qc.clone().unwrap()) + } else { + validator::ProposalJustification::Timeout(self.high_timeout_qc.clone().unwrap()) + } + } + /// Processes a (already verified) CommitQC. It bumps the local high_commit_qc and if /// we have the proposal corresponding to this qc, we save the corresponding block to DB. pub(crate) async fn process_commit_qc( @@ -72,7 +89,7 @@ impl StateMachine { })); } let backup = storage::ReplicaState { - view: self.view, + view: self.view_number, phase: self.phase, high_vote: self.high_vote.clone(), high_commit_qc: self.high_commit_qc.clone(), diff --git a/node/actors/bft/src/replica/mod.rs b/node/actors/bft/src/replica/mod.rs index 664a85f3..df6e37da 100644 --- a/node/actors/bft/src/replica/mod.rs +++ b/node/actors/bft/src/replica/mod.rs @@ -11,22 +11,21 @@ use zksync_concurrency::{ time, }; use zksync_consensus_network::io::ConsensusReq; -use zksync_consensus_roles::{validator, validator::ConsensusMsg}; +use zksync_consensus_roles::validator::{self, ConsensusMsg}; mod commit; -mod leader_commit; mod misc; mod new_view; mod proposal; -mod replica_prepare; #[cfg(test)] mod tests; +mod timeout; /// The StateMachine struct contains the state of the replica. It is responsible /// for validating and voting on blocks. When participating in consensus we are always a replica. #[derive(Debug)] pub(crate) struct StateMachine { - /// Consensus configuration and output channel. + /// Consensus configuration. pub(crate) config: Arc, /// Pipe through which replica sends network messages. pub(super) outbound_pipe: OutputSender, @@ -34,7 +33,7 @@ pub(crate) struct StateMachine { inbound_pipe: sync::prunable_mpsc::Receiver, /// The current view number. - pub(crate) view: validator::ViewNumber, + pub(crate) view_number: validator::ViewNumber, /// The current phase. pub(crate) phase: validator::Phase, /// The highest block proposal that the replica has committed to. @@ -93,11 +92,11 @@ impl StateMachine { StateMachine::inbound_selection_function, ); - let mut this = Self { + let this = Self { config, outbound_pipe, inbound_pipe: recv, - view: backup.view, + view_number: backup.view, phase: backup.phase, high_vote: backup.high_vote, high_commit_qc: backup.high_commit_qc, @@ -107,13 +106,10 @@ impl StateMachine { commit_qcs_cache: BTreeMap::new(), timeout_views_cache: BTreeMap::new(), timeout_qcs_cache: BTreeMap::new(), - timeout_deadline: time::Deadline::Infinite, + timeout_deadline: time::Deadline::Finite(ctx.now() + Self::TIMEOUT_DURATION), phase_start: ctx.now(), }; - // We need to start the replica before processing inputs. - this.start_new_view(ctx).await.wrap("start_new_view()")?; - Ok((this, send)) } @@ -121,6 +117,15 @@ impl StateMachine { /// This is the main entry point for the state machine, /// potentially triggering state modifications and message sending to the executor. pub(crate) async fn run(mut self, ctx: &ctx::Ctx) -> ctx::Result<()> { + // If this is the first view, we immediately timeout. This will force the replicas + // to synchronize right at the beginning and will provide a justification for the + // next view. This is necessary because the first view is not justified by any + // previous view. + if self.view_number == validator::ViewNumber(0) { + self.start_timeout(ctx).await?; + } + + // Main loop. loop { let recv = self .inbound_pipe @@ -134,32 +139,32 @@ impl StateMachine { // Check for timeout. 
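+            // A receive error means that we hit the timeout deadline before getting any message
+            // (or that the context was canceled), so we broadcast a timeout vote for the current
+            // view and keep waiting for messages.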
let Some(req) = recv.ok() else { - self.start_new_view(ctx).await?; + self.start_timeout(ctx).await?; continue; }; let now = ctx.now(); let label = match &req.msg.msg { - ConsensusMsg::ReplicaPrepare(_) => { + ConsensusMsg::LeaderProposal(_) => { let res = match self - .process_replica_prepare(ctx, req.msg.cast().unwrap()) + .on_proposal(ctx, req.msg.cast().unwrap()) .await - .wrap("process_replica_prepare()") + .wrap("on_proposal()") { Ok(()) => Ok(()), Err(err) => { match err { - super::replica_prepare::Error::Internal(e) => { - tracing::error!( - "process_replica_prepare: internal error: {e:#}" - ); + // If the error is internal, we stop here. + proposal::Error::Internal(e) => { + tracing::error!("on_proposal: internal error: {e:#}"); return Err(e); } - super::replica_prepare::Error::Old { .. } => { - tracing::debug!("process_replica_prepare: {err:#}"); + // If the error is due to an old message, we log it at a lower level. + proposal::Error::Old { .. } => { + tracing::debug!("on_proposal: {err:#}"); } _ => { - tracing::warn!("process_replica_prepare: {err:#}"); + tracing::warn!("on_proposal: {err:#}"); } } Err(()) @@ -167,57 +172,86 @@ impl StateMachine { }; metrics::ConsensusMsgLabel::ReplicaPrepare.with_result(&res) } - ConsensusMsg::LeaderPrepare(_) => { + ConsensusMsg::ReplicaCommit(_) => { let res = match self - .on_proposal(ctx, req.msg.cast().unwrap()) + .on_commit(ctx, req.msg.cast().unwrap()) + .await + .wrap("on_commit()") + { + Ok(()) => Ok(()), + Err(err) => { + match err { + // If the error is internal, we stop here. + commit::Error::Internal(e) => { + tracing::error!("on_commit: internal error: {e:#}"); + return Err(e); + } + // If the error is due to an old message, we log it at a lower level. + commit::Error::Old { .. } => { + tracing::debug!("on_commit: {err:#}"); + } + _ => { + tracing::warn!("on_commit: {err:#}"); + } + } + Err(()) + } + }; + metrics::ConsensusMsgLabel::ReplicaPrepare.with_result(&res) + } + ConsensusMsg::ReplicaTimeout(_) => { + let res = match self + .on_timeout(ctx, req.msg.cast().unwrap()) .await - .wrap("process_leader_prepare()") + .wrap("on_timeout()") { Ok(()) => Ok(()), Err(err) => { match err { - super::proposal::Error::Internal(e) => { - tracing::error!( - "process_leader_prepare: internal error: {e:#}" - ); + // If the error is internal, we stop here. + timeout::Error::Internal(e) => { + tracing::error!("on_timeout: internal error: {e:#}"); return Err(e); } - super::proposal::Error::Old { .. } => { - tracing::info!("process_leader_prepare: {err:#}"); + // If the error is due to an old message, we log it at a lower level. + timeout::Error::Old { .. } => { + tracing::debug!("on_timeout: {err:#}"); } _ => { - tracing::warn!("process_leader_prepare: {err:#}"); + tracing::warn!("on_timeout: {err:#}"); } } Err(()) } }; - metrics::ConsensusMsgLabel::LeaderPrepare.with_result(&res) + metrics::ConsensusMsgLabel::ReplicaPrepare.with_result(&res) } - ConsensusMsg::LeaderCommit(_) => { + ConsensusMsg::ReplicaNewView(_) => { let res = match self - .process_leader_commit(ctx, req.msg.cast().unwrap()) + .on_new_view(ctx, req.msg.cast().unwrap()) .await - .wrap("process_leader_commit()") + .wrap("on_new_view()") { Ok(()) => Ok(()), Err(err) => { match err { - super::leader_commit::Error::Internal(e) => { - tracing::error!("process_leader_commit: internal error: {e:#}"); + // If the error is internal, we stop here. 
+ new_view::Error::Internal(e) => { + tracing::error!("on_new_view: internal error: {e:#}"); return Err(e); } - super::leader_commit::Error::Old { .. } => { - tracing::info!("process_leader_commit: {err:#}"); + // If the error is due to an old message, we log it at a lower level. + new_view::Error::Old { .. } => { + tracing::debug!("on_new_view: {err:#}"); } _ => { - tracing::warn!("process_leader_commit: {err:#}"); + tracing::warn!("on_new_view: {err:#}"); } } Err(()) } }; - metrics::ConsensusMsgLabel::LeaderCommit.with_result(&res) + metrics::ConsensusMsgLabel::ReplicaPrepare.with_result(&res) } _ => unreachable!(), }; diff --git a/node/actors/bft/src/replica/new_view.rs b/node/actors/bft/src/replica/new_view.rs index c0b1a102..8578e8e1 100644 --- a/node/actors/bft/src/replica/new_view.rs +++ b/node/actors/bft/src/replica/new_view.rs @@ -1,24 +1,123 @@ +use std::cmp::max; + use super::StateMachine; use crate::metrics; -use zksync_concurrency::{ctx, error::Wrap as _}; +use zksync_concurrency::{ctx, error::Wrap, time}; use zksync_consensus_network::io::ConsensusInputMessage; -use zksync_consensus_roles::validator::{self, ViewNumber}; +use zksync_consensus_roles::validator; + +/// Errors that can occur when processing a ReplicaNewView message. +#[derive(Debug, thiserror::Error)] +pub(crate) enum Error { + /// Message signer isn't part of the validator set. + #[error("message signer isn't part of the validator set (signer: {signer:?})")] + NonValidatorSigner { + /// Signer of the message. + signer: Box, + }, + /// Past view or phase. + #[error("past view (current view: {current_view:?})")] + Old { + /// Current view. + current_view: validator::ViewNumber, + }, + /// Invalid message signature. + #[error("invalid signature: {0:#}")] + InvalidSignature(#[source] anyhow::Error), + /// Invalid message. + #[error("invalid message: {0:#}")] + InvalidMessage(#[source] validator::ReplicaNewViewVerifyError), + /// Internal error. Unlike other error types, this one isn't supposed to be easily recoverable. + #[error(transparent)] + Internal(#[from] ctx::Error), +} + +impl Wrap for Error { + fn with_wrap C>( + self, + f: F, + ) -> Self { + match self { + Error::Internal(err) => Error::Internal(err.with_wrap(f)), + err => err, + } + } +} impl StateMachine { + /// Processes a ReplicaNewView message. + pub(crate) async fn on_new_view( + &mut self, + ctx: &ctx::Ctx, + signed_message: validator::Signed, + ) -> Result<(), Error> { + // ----------- Checking origin of the message -------------- + + // Unwrap message. + let message = &signed_message.msg; + let author = &signed_message.key; + + // Check that the message signer is in the validator committee. + if !self.config.genesis().validators.contains(author) { + return Err(Error::NonValidatorSigner { + signer: author.clone().into(), + }); + } + + // If the message is from a past view, ignore it. + if message.view().number < self.view_number { + return Err(Error::Old { + current_view: self.view_number, + }); + } + + // ----------- Checking the signed part of the message -------------- + + // Check the signature on the message. + signed_message.verify().map_err(Error::InvalidSignature)?; + + message + .verify(self.config.genesis()) + .map_err(Error::InvalidMessage)?; + + // ----------- All checks finished. Now we process the message. -------------- + + // Update the state machine. 
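+        // Both justification variants may carry QCs that are newer than ours, so we absorb
+        // them into our high QCs before (possibly) advancing to the message's view.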
+ match &message.justification { + validator::ProposalJustification::Commit(qc) => self + .process_commit_qc(ctx, qc) + .await + .wrap("process_commit_qc()")?, + validator::ProposalJustification::Timeout(qc) => { + if let Some(high_qc) = qc.high_qc() { + self.process_commit_qc(ctx, high_qc) + .await + .wrap("process_commit_qc()")?; + } + self.high_timeout_qc = max(Some(qc.clone()), self.high_timeout_qc.clone()); + } + }; + + // If the message is for a future view, we need to start a new view. + if message.view().number > self.view_number { + self.start_new_view(ctx, message.view().number).await?; + } + + Ok(()) + } + /// This blocking method is used whenever we start a new view. pub(crate) async fn start_new_view( &mut self, ctx: &ctx::Ctx, - view: ViewNumber, + view: validator::ViewNumber, ) -> ctx::Result<()> { // Update the state machine. - self.view = self.view.next(); - tracing::info!("Starting view {}", self.view); - metrics::METRICS.replica_view_number.set(self.view.0); - + self.view_number = view; self.phase = validator::Phase::Prepare; + + // Clear the block proposal cache. if let Some(qc) = self.high_commit_qc.as_ref() { - // Clear the block cache. self.block_proposal_cache .retain(|k, _| k > &qc.header().number); } @@ -26,27 +125,26 @@ impl StateMachine { // Backup our state. self.backup_state(ctx).await.wrap("backup_state()")?; - // Send the replica message. + // Broadcast our new view message. let output_message = ConsensusInputMessage { message: self .config .secret_key - .sign_msg(validator::ConsensusMsg::ReplicaPrepare( - validator::ReplicaPrepare { - view: validator::View { - genesis: self.config.genesis().hash(), - number: self.view, - }, - high_vote: self.high_vote.clone(), - high_qc: self.high_commit_qc.clone(), + .sign_msg(validator::ConsensusMsg::ReplicaNewView( + validator::ReplicaNewView { + justification: self.get_justification(), }, )), - recipient: Target::Broadcast, }; self.outbound_pipe.send(output_message.into()); - // Reset the timer. - self.reset_timer(ctx); + // Log the event. + tracing::info!("Starting view {}", self.view_number); + metrics::METRICS.replica_view_number.set(self.view_number.0); + + // Reset the timeout. + self.timeout_deadline = time::Deadline::Finite(ctx.now() + Self::TIMEOUT_DURATION); + Ok(()) } } diff --git a/node/actors/bft/src/replica/proposal.rs b/node/actors/bft/src/replica/proposal.rs index adfa2ec5..08f79ce7 100644 --- a/node/actors/bft/src/replica/proposal.rs +++ b/node/actors/bft/src/replica/proposal.rs @@ -87,9 +87,11 @@ impl StateMachine { // Check that the message is for the current view or a future view. We only allow proposals for // the current view if we have not voted or timed out yet. - if view < self.view || (view == self.view && self.phase != validator::Phase::Prepare) { + if view < self.view_number + || (view == self.view_number && self.phase != validator::Phase::Prepare) + { return Err(Error::Old { - current_view: self.view, + current_view: self.view_number, current_phase: self.phase, }); } @@ -196,7 +198,7 @@ impl StateMachine { }; // Update the state machine. - self.view = message.view().number; + self.view_number = message.view().number; self.phase = validator::Phase::Commit; self.high_vote = Some(commit_vote.clone()); match &message.justification { @@ -217,7 +219,7 @@ impl StateMachine { // Backup our state. self.backup_state(ctx).await.wrap("backup_state()")?; - // Broadcast our message. + // Broadcast our commit message. 
let output_message = ConsensusInputMessage { message: self .config diff --git a/node/actors/bft/src/replica/tests.rs b/node/actors/bft/src/replica/tests.rs index 36e7d084..d5d0ae5f 100644 --- a/node/actors/bft/src/replica/tests.rs +++ b/node/actors/bft/src/replica/tests.rs @@ -149,14 +149,14 @@ async fn leader_prepare_old_view() { s.spawn_bg(runner.run(ctx)); let mut leader_prepare = util.new_leader_prepare(ctx).await; - leader_prepare.justification.view.number.0 = util.replica.view.0 - 1; + leader_prepare.justification.view.number.0 = util.replica.view_number.0 - 1; let res = util .process_leader_prepare(ctx, util.sign(leader_prepare)) .await; assert_matches!( res, Err(proposal::Error::Old { current_view, current_phase }) => { - assert_eq!(current_view, util.replica.view); + assert_eq!(current_view, util.replica.view_number); assert_eq!(current_phase, util.replica.phase); } ); diff --git a/node/actors/bft/src/replica/timeout.rs b/node/actors/bft/src/replica/timeout.rs new file mode 100644 index 00000000..a1036780 --- /dev/null +++ b/node/actors/bft/src/replica/timeout.rs @@ -0,0 +1,197 @@ +use super::StateMachine; +use crate::metrics; +use std::{cmp::max, collections::HashSet}; +use zksync_concurrency::{ctx, error::Wrap, metrics::LatencyHistogramExt as _, time}; +use zksync_consensus_network::io::ConsensusInputMessage; +use zksync_consensus_roles::validator; + +/// Errors that can occur when processing a ReplicaTimeout message. +#[derive(Debug, thiserror::Error)] +pub(crate) enum Error { + /// Message signer isn't part of the validator set. + #[error("message signer isn't part of the validator set (signer: {signer:?})")] + NonValidatorSigner { + /// Signer of the message. + signer: Box, + }, + /// Past view or phase. + #[error("past view (current view: {current_view:?})")] + Old { + /// Current view. + current_view: validator::ViewNumber, + }, + /// Duplicate signer. + #[error("duplicate signer (message view: {message_view:?}, signer: {signer:?})")] + DuplicateSigner { + /// View number of the message. + message_view: validator::ViewNumber, + /// Signer of the message. + signer: Box, + }, + /// Invalid message signature. + #[error("invalid signature: {0:#}")] + InvalidSignature(#[source] anyhow::Error), + /// Invalid message. + #[error("invalid message: {0:#}")] + InvalidMessage(#[source] validator::ReplicaTimeoutVerifyError), + /// Internal error. Unlike other error types, this one isn't supposed to be easily recoverable. + #[error(transparent)] + Internal(#[from] ctx::Error), +} + +impl Wrap for Error { + fn with_wrap C>( + self, + f: F, + ) -> Self { + match self { + Error::Internal(err) => Error::Internal(err.with_wrap(f)), + err => err, + } + } +} + +impl StateMachine { + /// Processes a ReplicaTimeout message. + pub(crate) async fn on_timeout( + &mut self, + ctx: &ctx::Ctx, + signed_message: validator::Signed, + ) -> Result<(), Error> { + // ----------- Checking origin of the message -------------- + + // Unwrap message. + let message = &signed_message.msg; + let author = &signed_message.key; + + // Check that the message signer is in the validator committee. + if !self.config.genesis().validators.contains(author) { + return Err(Error::NonValidatorSigner { + signer: author.clone().into(), + }); + } + + // If the message is from a past view, ignore it. + if message.view.number < self.view_number { + return Err(Error::Old { + current_view: self.view_number, + }); + } + + // If we already have a message from the same validator for the same or past view, ignore it. 
+        if let Some(&view) = self.timeout_views_cache.get(author) {
+            if view >= message.view.number {
+                return Err(Error::DuplicateSigner {
+                    message_view: message.view.number,
+                    signer: author.clone().into(),
+                });
+            }
+        }
+
+        // ----------- Checking the signed part of the message --------------
+
+        // Check the signature on the message.
+        signed_message.verify().map_err(Error::InvalidSignature)?;
+
+        message
+            .verify(self.config.genesis())
+            .map_err(Error::InvalidMessage)?;
+
+        // ----------- All checks finished. Now we process the message. --------------
+
+        // We add the message to the incrementally-constructed QC.
+        let timeout_qc = self
+            .timeout_qcs_cache
+            .entry(message.view.number)
+            .or_insert_with(|| validator::TimeoutQC::new(message.view));
+
+        // Should always succeed as all checks have already been performed.
+        timeout_qc
+            .add(&signed_message, self.config.genesis())
+            .expect("could not add message to TimeoutQC");
+
+        // Calculate the TimeoutQC signers weight.
+        let weight = timeout_qc.weight(&self.config.genesis().validators);
+
+        // Update view number of last timeout message for author.
+        self.timeout_views_cache
+            .insert(author.clone(), message.view.number);
+
+        // Clean up timeout_qcs for the case that no replica is at the view
+        // of a given TimeoutQC.
+        // This prevents timeout_qcs map from growing indefinitely in case some
+        // malicious replica starts spamming messages for future views.
+        let active_views: HashSet<_> = self.timeout_views_cache.values().collect();
+        self.timeout_qcs_cache
+            .retain(|view_number, _| active_views.contains(view_number));
+
+        // Now we check if we have enough weight to continue. If not, we wait for more messages.
+        if weight < self.config.genesis().validators.quorum_threshold() {
+            return Ok(());
+        };
+
+        // ----------- We have a QC. Now we process it. --------------
+
+        // Consume the created timeout QC for this view.
+        let timeout_qc = self.timeout_qcs_cache.remove(&message.view.number).unwrap();
+
+        // We update our state with the new timeout QC.
+        if let Some(commit_qc) = timeout_qc.high_qc() {
+            self.process_commit_qc(ctx, commit_qc)
+                .await
+                .wrap("process_commit_qc()")?;
+        }
+        self.high_timeout_qc = max(Some(timeout_qc.clone()), self.high_timeout_qc.clone());
+
+        // Metrics.
+        let now = ctx.now();
+        metrics::METRICS
+            .leader_commit_phase_latency
+            .observe_latency(now - self.phase_start);
+        self.phase_start = now;
+
+        // Start a new view.
+        self.start_new_view(ctx, message.view.number.next()).await?;
+
+        Ok(())
+    }
+
+    /// This blocking method is used whenever we time out in a view.
+    pub(crate) async fn start_timeout(&mut self, ctx: &ctx::Ctx) -> ctx::Result<()> {
+        // Update the state machine.
+        self.phase = validator::Phase::Timeout;
+
+        // Backup our state.
+        self.backup_state(ctx).await.wrap("backup_state()")?;
+
+        // Broadcast our timeout message.
+        let output_message = ConsensusInputMessage {
+            message: self
+                .config
+                .secret_key
+                .sign_msg(validator::ConsensusMsg::ReplicaTimeout(
+                    validator::ReplicaTimeout {
+                        view: validator::View {
+                            genesis: self.config.genesis().hash(),
+                            number: self.view_number,
+                        },
+                        high_vote: self.high_vote.clone(),
+                        high_qc: self.high_commit_qc.clone(),
+                    },
+                )),
+        };
+
+        self.outbound_pipe.send(output_message.into());
+
+        // Log the event.
+        tracing::info!("Timed out at view {}", self.view_number);
+        metrics::METRICS.replica_view_number.set(self.view_number.0);
+
+        // Reset the timeout. This allows us to send more timeout messages until the consensus progresses.
+ // However, this isn't strictly necessary since the network retries messages until they are delivered. + // This is just an extra safety measure. + self.timeout_deadline = time::Deadline::Finite(ctx.now() + Self::TIMEOUT_DURATION); + + Ok(()) + } +} diff --git a/node/actors/bft/src/testonly/ut_harness.rs b/node/actors/bft/src/testonly/ut_harness.rs index 2d3bb834..b7454a7d 100644 --- a/node/actors/bft/src/testonly/ut_harness.rs +++ b/node/actors/bft/src/testonly/ut_harness.rs @@ -94,7 +94,7 @@ impl UTHarness { let want = validator::ReplicaPrepare { view: validator::View { genesis: self.genesis().hash(), - number: self.replica.view.next(), + number: self.replica.view_number.next(), }, high_qc: self.replica.high_commit_qc.clone(), high_vote: self.replica.high_vote.clone(), @@ -121,17 +121,17 @@ impl UTHarness { } pub(crate) fn set_owner_as_view_leader(&mut self) { - let mut view = self.replica.view; + let mut view = self.replica.view_number; while self.view_leader(view) != self.owner_key().public() { view = view.next(); } - self.replica.view = view; + self.replica.view_number = view; } pub(crate) fn replica_view(&self) -> validator::View { validator::View { genesis: self.genesis().hash(), - number: self.replica.view, + number: self.replica.view_number, } } diff --git a/spec/informal-spec/replica.rs b/spec/informal-spec/replica.rs index 5f30ba8d..e2b122b5 100644 --- a/spec/informal-spec/replica.rs +++ b/spec/informal-spec/replica.rs @@ -65,15 +65,15 @@ impl ReplicaState { self.high_vote, self.high_commit_qc); - // Update our state so that we can no longer vote commit in this view. - self.phase = Phase::Timeout; + // Update our state so that we can no longer vote commit in this view. + self.phase = Phase::Timeout; - // Send the vote to all replicas (including ourselves). - self.send(vote); + // Send the vote to all replicas (including ourselves). + self.send(vote); } - // Try to get a message from the message queue and process it. We don't - // detail the message queue structure since it's boilerplate. + // Try to get a message from the message queue and process it. We don't + // detail the message queue structure since it's boilerplate. if let Some(message) = message_queue.pop() { match message { Proposal(msg) => { @@ -223,7 +223,7 @@ impl ReplicaState { // If the message isn't current, just ignore it. assert!(new_view.view() >= self.view) - // Check that the new view is valid. + // Check that the new view message is valid. assert!(new_view.verify()); // Update our state. diff --git a/spec/informal-spec/types.rs b/spec/informal-spec/types.rs index a2a517c6..65989df8 100644 --- a/spec/informal-spec/types.rs +++ b/spec/informal-spec/types.rs @@ -183,7 +183,7 @@ impl SignedTimeoutVote { } fn verify(self) -> bool { - // If we wish, there are three invariants that are easy to check but don't need to be stricly enforced for correctness: + // If we wish, there are three invariants that are easy to check but don't need to be strictly enforced for correctness: // 1. self.view() >= self.high_vote.view() // 2. self.high_vote.view() >= self.high_commit_qc_view // 3. self.view() > self.high_commit_qc_view