
Commit

Second pass on the bft actor. Did most of replica.
brunoffranca committed Oct 25, 2024
1 parent 2f91a09 commit ad7b440
Showing 11 changed files with 449 additions and 101 deletions.
12 changes: 6 additions & 6 deletions node/actors/bft/src/leader/tests.rs
@@ -137,7 +137,7 @@ async fn replica_prepare_old_view() {
s.spawn_bg(runner.run(ctx));

let replica_prepare = util.new_replica_prepare();
util.leader.view = util.replica.view.next();
util.leader.view = util.replica.view_number.next();
util.leader.phase = Phase::Prepare;
let res = util
.process_replica_prepare(ctx, util.sign(replica_prepare))
@@ -164,7 +164,7 @@ async fn replica_prepare_during_commit() {
s.spawn_bg(runner.run(ctx));

let replica_prepare = util.new_replica_prepare();
util.leader.view = util.replica.view;
util.leader.view = util.replica.view_number;
util.leader.phase = Phase::Commit;
let res = util
.process_replica_prepare(ctx, util.sign(replica_prepare))
@@ -175,7 +175,7 @@ async fn replica_prepare_during_commit() {
current_view,
current_phase: Phase::Commit,
}) => {
assert_eq!(current_view, util.replica.view);
assert_eq!(current_view, util.replica.view_number);
}
);
Ok(())
@@ -607,13 +607,13 @@ async fn replica_commit_old() {
s.spawn_bg(runner.run(ctx));

let mut replica_commit = util.new_replica_commit(ctx).await;
replica_commit.view.number = ViewNumber(util.replica.view.0 - 1);
replica_commit.view.number = ViewNumber(util.replica.view_number.0 - 1);
let replica_commit = util.sign(replica_commit);
let res = util.process_replica_commit(ctx, replica_commit).await;
assert_matches!(
res,
Err(replica_commit::Error::Old { current_view, current_phase }) => {
assert_eq!(current_view, util.replica.view);
assert_eq!(current_view, util.replica.view_number);
assert_eq!(current_phase, util.replica.phase);
}
);
@@ -632,7 +632,7 @@ async fn replica_commit_not_leader_in_view() {
s.spawn_bg(runner.run(ctx));

util.produce_block(ctx).await;
let current_view_leader = util.view_leader(util.replica.view);
let current_view_leader = util.view_leader(util.replica.view_number);
assert_ne!(current_view_leader, util.owner_key().public());
let replica_commit = util.new_current_replica_commit();
let res = util
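The tests above exercise stale-view and wrong-leader rejections by adjusting view numbers directly through the test utilities. As a standalone illustration of the view arithmetic they rely on (`view_number.next()`, `ViewNumber(view_number.0 - 1)`), here is a minimal sketch of a view-number newtype; it is a simplified stand-in, not the actual `validator::ViewNumber` definition from `zksync_consensus_roles`:

// Simplified stand-in for `validator::ViewNumber`; the real type lives in
// `zksync_consensus_roles` and may differ in detail.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
pub struct ViewNumber(pub u64);

impl ViewNumber {
    /// The view that follows this one.
    pub fn next(self) -> Self {
        ViewNumber(self.0 + 1)
    }
}

fn main() {
    let current = ViewNumber(607);
    // `util.leader.view = util.replica.view_number.next()` puts the leader one view ahead.
    assert_eq!(current.next(), ViewNumber(608));
    // `ViewNumber(util.replica.view_number.0 - 1)` builds a stale view to trigger the `Old` error.
    assert_eq!(ViewNumber(current.0 - 1), ViewNumber(606));
}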
28 changes: 14 additions & 14 deletions node/actors/bft/src/replica/commit.rs
@@ -1,4 +1,3 @@
//! Handler of a ReplicaCommit message.
use super::StateMachine;
use crate::metrics;
use std::collections::HashSet;
@@ -21,10 +20,10 @@ pub(crate) enum Error {
current_view: validator::ViewNumber,
},
/// Duplicate signer.
#[error("duplicate signer (current view: {current_view:?}, signer: {signer:?})")]
#[error("duplicate signer (message view: {message_view:?}, signer: {signer:?})")]
DuplicateSigner {
/// Current view.
current_view: validator::ViewNumber,
/// View number of the message.
message_view: validator::ViewNumber,
/// Signer of the message.
signer: Box<validator::PublicKey>,
},
@@ -72,17 +71,17 @@ impl StateMachine {
}

// If the message is from a past view, ignore it.
if message.view.number < self.view {
if message.view.number < self.view_number {
return Err(Error::Old {
current_view: self.view,
current_view: self.view_number,
});
}

// If we already have a message from the same validator for the same view, ignore it.
// If we already have a message from the same validator for the same or past view, ignore it.
if let Some(&view) = self.commit_views_cache.get(author) {
if view == message.view.number {
if view >= message.view.number {
return Err(Error::DuplicateSigner {
current_view: self.view,
message_view: message.view.number,
signer: author.clone().into(),
});
}
@@ -115,19 +114,19 @@ impl StateMachine {
// Calculate the CommitQC signers weight.
let weight = self.config.genesis().validators.weight(&commit_qc.signers);

// Update commit message current view number for author
// Update view number of last commit message for author
self.commit_views_cache
.insert(author.clone(), message.view.number);

// Clean up commit_qcs for the case that no replica is at the view
// of a given CommitQC
// of a given CommitQC.
// This prevents commit_qcs map from growing indefinitely in case some
// malicious replica starts spamming messages for future views
// malicious replica starts spamming messages for future views.
let active_views: HashSet<_> = self.commit_views_cache.values().collect();
self.commit_qcs_cache
.retain(|view_number, _| active_views.contains(view_number));

// Now we check if we have enough weight to continue.
// Now we check if we have enough weight to continue. If not, we wait for more messages.
if weight < self.config.genesis().validators.quorum_threshold() {
return Ok(());
};
@@ -142,6 +141,7 @@ impl StateMachine {
.remove(message)
.unwrap();

// We update our state with the new commit QC.
self.process_commit_qc(ctx, &commit_qc)
.await
.wrap("process_commit_qc()")?;
@@ -154,7 +154,7 @@ impl StateMachine {
self.phase_start = now;

// Start a new view.
self.start_new_view(ctx, message.view.number.next());
self.start_new_view(ctx, message.view.number.next()).await?;

Ok(())
}
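To make the flow in `on_commit` easier to follow, here is a condensed, self-contained sketch of the deduplication, pruning, and quorum check introduced above. Types are deliberately simplified (u64 views, string validator keys, one unit of weight per vote); the real handler works on signed `ReplicaCommit` messages, aggregates partial `CommitQC`s, and compares accumulated weight against the genesis quorum threshold:

use std::collections::{BTreeMap, HashSet};

// Condensed sketch of the caching/pruning flow in `on_commit`, with simplified
// types: views are u64, validators are strings, every vote weighs 1, and the
// "QC" is just the set of signers.
struct CommitTracker {
    // Last commit view seen from each validator.
    commit_views_cache: BTreeMap<String, u64>,
    // Accumulated signers per view (stand-in for the partial CommitQCs).
    commit_qcs_cache: BTreeMap<u64, HashSet<String>>,
    quorum_threshold: usize,
}

impl CommitTracker {
    fn on_commit(&mut self, author: &str, view: u64) -> Result<bool, String> {
        // Ignore a message if we already saw one from this author for the same or a later view.
        if let Some(&last) = self.commit_views_cache.get(author) {
            if last >= view {
                return Err(format!("duplicate signer {author} (message view: {view})"));
            }
        }
        // Record the vote and remember the author's latest view.
        self.commit_qcs_cache.entry(view).or_default().insert(author.to_string());
        self.commit_views_cache.insert(author.to_string(), view);

        // Prune QCs for views that no replica is currently voting in, so the map
        // cannot grow without bound if someone spams far-future views.
        let active: HashSet<u64> = self.commit_views_cache.values().copied().collect();
        self.commit_qcs_cache.retain(|v, _| active.contains(v));

        // Report whether this view has reached quorum weight.
        Ok(self.commit_qcs_cache[&view].len() >= self.quorum_threshold)
    }
}

fn main() {
    let mut t = CommitTracker {
        commit_views_cache: BTreeMap::new(),
        commit_qcs_cache: BTreeMap::new(),
        quorum_threshold: 2,
    };
    assert_eq!(t.on_commit("alice", 5), Ok(false));
    assert!(t.on_commit("alice", 5).is_err()); // same-or-past view from same signer is rejected
    assert_eq!(t.on_commit("bob", 5), Ok(true)); // quorum reached at view 5
}

The pruning step mirrors the `retain` call in the diff: a view stays cached only while at least one validator's latest commit message is for that view.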
19 changes: 18 additions & 1 deletion node/actors/bft/src/replica/misc.rs
@@ -5,6 +5,23 @@ use zksync_consensus_roles::validator;
use zksync_consensus_storage as storage;

impl StateMachine {
/// Makes a justification (for a ReplicaNewView or a LeaderProposal) based on the current state.
pub(crate) fn get_justification(&self) -> validator::ProposalJustification {
// We need some QC in order to be able to create a justification.
// In fact, it should be impossible to get here without a QC. Because
// we only get here after starting a new view, which requires a QC.
assert!(self.high_commit_qc.is_some() || self.high_timeout_qc.is_some());

// We use the highest QC as the justification. If both have the same view, we use the CommitQC.
if self.high_commit_qc.as_ref().map(|x| x.view())
>= self.high_timeout_qc.as_ref().map(|x| &x.view)
{
validator::ProposalJustification::Commit(self.high_commit_qc.clone().unwrap())
} else {
validator::ProposalJustification::Timeout(self.high_timeout_qc.clone().unwrap())
}
}

/// Processes a (already verified) CommitQC. It bumps the local high_commit_qc and if
/// we have the proposal corresponding to this qc, we save the corresponding block to DB.
pub(crate) async fn process_commit_qc(
@@ -72,7 +89,7 @@ impl StateMachine {
}));
}
let backup = storage::ReplicaState {
view: self.view,
view: self.view_number,
phase: self.phase,
high_vote: self.high_vote.clone(),
high_commit_qc: self.high_commit_qc.clone(),
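The `>=` in `get_justification` compares two `Option`s, which works because Rust's derived ordering places `None` below any `Some` and compares `Some` values by their contents; on a tie the commit QC is preferred, as the comment in the diff states. A tiny standalone illustration, with plain `u64` view numbers standing in for the real view types:

// Why the `Option` comparison in `get_justification` picks the right QC:
// `None < Some(_)`, and `Some` values compare by their contents, so the
// commit QC wins whenever its view is at least the timeout QC's view.
fn main() {
    let high_commit_view: Option<u64> = Some(10);
    let high_timeout_view: Option<u64> = Some(10);
    // Tie: the commit QC is preferred.
    assert!(high_commit_view >= high_timeout_view);

    // No commit QC yet: `None < Some(_)`, so the timeout QC is chosen.
    let no_commit: Option<u64> = None;
    assert!(!(no_commit >= Some(3)));
}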
118 changes: 76 additions & 42 deletions node/actors/bft/src/replica/mod.rs
@@ -11,30 +11,29 @@ use zksync_concurrency::{
time,
};
use zksync_consensus_network::io::ConsensusReq;
use zksync_consensus_roles::{validator, validator::ConsensusMsg};
use zksync_consensus_roles::validator::{self, ConsensusMsg};

mod commit;
mod leader_commit;
mod misc;
mod new_view;
mod proposal;
mod replica_prepare;
#[cfg(test)]
mod tests;
mod timeout;

/// The StateMachine struct contains the state of the replica. It is responsible
/// for validating and voting on blocks. When participating in consensus we are always a replica.
#[derive(Debug)]
pub(crate) struct StateMachine {
/// Consensus configuration and output channel.
/// Consensus configuration.
pub(crate) config: Arc<Config>,
/// Pipe through which replica sends network messages.
pub(super) outbound_pipe: OutputSender,
/// Pipe through which replica receives network requests.
inbound_pipe: sync::prunable_mpsc::Receiver<ConsensusReq>,

/// The current view number.
pub(crate) view: validator::ViewNumber,
pub(crate) view_number: validator::ViewNumber,
/// The current phase.
pub(crate) phase: validator::Phase,
/// The highest block proposal that the replica has committed to.
@@ -93,11 +92,11 @@ impl StateMachine {
StateMachine::inbound_selection_function,
);

let mut this = Self {
let this = Self {
config,
outbound_pipe,
inbound_pipe: recv,
view: backup.view,
view_number: backup.view,
phase: backup.phase,
high_vote: backup.high_vote,
high_commit_qc: backup.high_commit_qc,
@@ -107,20 +106,26 @@ impl StateMachine {
commit_qcs_cache: BTreeMap::new(),
timeout_views_cache: BTreeMap::new(),
timeout_qcs_cache: BTreeMap::new(),
timeout_deadline: time::Deadline::Infinite,
timeout_deadline: time::Deadline::Finite(ctx.now() + Self::TIMEOUT_DURATION),
phase_start: ctx.now(),
};

// We need to start the replica before processing inputs.
this.start_new_view(ctx).await.wrap("start_new_view()")?;

Ok((this, send))
}

/// Runs a loop to process incoming messages (may be `None` if the channel times out while waiting for a message).
/// This is the main entry point for the state machine,
/// potentially triggering state modifications and message sending to the executor.
pub(crate) async fn run(mut self, ctx: &ctx::Ctx) -> ctx::Result<()> {
// If this is the first view, we immediately timeout. This will force the replicas
// to synchronize right at the beginning and will provide a justification for the
// next view. This is necessary because the first view is not justified by any
// previous view.
if self.view_number == validator::ViewNumber(0) {
self.start_timeout(ctx).await?;
}

// Main loop.
loop {
let recv = self
.inbound_pipe
@@ -134,90 +139,119 @@

// Check for timeout.
let Some(req) = recv.ok() else {
self.start_new_view(ctx).await?;
self.start_timeout(ctx).await?;
continue;
};

let now = ctx.now();
let label = match &req.msg.msg {
ConsensusMsg::ReplicaPrepare(_) => {
ConsensusMsg::LeaderProposal(_) => {
let res = match self
.process_replica_prepare(ctx, req.msg.cast().unwrap())
.on_proposal(ctx, req.msg.cast().unwrap())
.await
.wrap("process_replica_prepare()")
.wrap("on_proposal()")
{
Ok(()) => Ok(()),
Err(err) => {
match err {
super::replica_prepare::Error::Internal(e) => {
tracing::error!(
"process_replica_prepare: internal error: {e:#}"
);
// If the error is internal, we stop here.
proposal::Error::Internal(e) => {
tracing::error!("on_proposal: internal error: {e:#}");
return Err(e);
}
super::replica_prepare::Error::Old { .. } => {
tracing::debug!("process_replica_prepare: {err:#}");
// If the error is due to an old message, we log it at a lower level.
proposal::Error::Old { .. } => {
tracing::debug!("on_proposal: {err:#}");
}
_ => {
tracing::warn!("process_replica_prepare: {err:#}");
tracing::warn!("on_proposal: {err:#}");
}
}
Err(())
}
};
metrics::ConsensusMsgLabel::ReplicaPrepare.with_result(&res)
}
ConsensusMsg::LeaderPrepare(_) => {
ConsensusMsg::ReplicaCommit(_) => {
let res = match self
.on_proposal(ctx, req.msg.cast().unwrap())
.on_commit(ctx, req.msg.cast().unwrap())
.await
.wrap("on_commit()")
{
Ok(()) => Ok(()),
Err(err) => {
match err {
// If the error is internal, we stop here.
commit::Error::Internal(e) => {
tracing::error!("on_commit: internal error: {e:#}");
return Err(e);
}
// If the error is due to an old message, we log it at a lower level.
commit::Error::Old { .. } => {
tracing::debug!("on_commit: {err:#}");
}
_ => {
tracing::warn!("on_commit: {err:#}");
}
}
Err(())
}
};
metrics::ConsensusMsgLabel::ReplicaPrepare.with_result(&res)
}
ConsensusMsg::ReplicaTimeout(_) => {
let res = match self
.on_timeout(ctx, req.msg.cast().unwrap())
.await
.wrap("process_leader_prepare()")
.wrap("on_timeout()")
{
Ok(()) => Ok(()),
Err(err) => {
match err {
super::proposal::Error::Internal(e) => {
tracing::error!(
"process_leader_prepare: internal error: {e:#}"
);
// If the error is internal, we stop here.
timeout::Error::Internal(e) => {
tracing::error!("on_timeout: internal error: {e:#}");
return Err(e);
}
super::proposal::Error::Old { .. } => {
tracing::info!("process_leader_prepare: {err:#}");
// If the error is due to an old message, we log it at a lower level.
timeout::Error::Old { .. } => {
tracing::debug!("on_timeout: {err:#}");
}
_ => {
tracing::warn!("process_leader_prepare: {err:#}");
tracing::warn!("on_timeout: {err:#}");
}
}
Err(())
}
};
metrics::ConsensusMsgLabel::LeaderPrepare.with_result(&res)
metrics::ConsensusMsgLabel::ReplicaPrepare.with_result(&res)
}
ConsensusMsg::LeaderCommit(_) => {
ConsensusMsg::ReplicaNewView(_) => {
let res = match self
.process_leader_commit(ctx, req.msg.cast().unwrap())
.on_new_view(ctx, req.msg.cast().unwrap())
.await
.wrap("process_leader_commit()")
.wrap("on_new_view()")
{
Ok(()) => Ok(()),
Err(err) => {
match err {
super::leader_commit::Error::Internal(e) => {
tracing::error!("process_leader_commit: internal error: {e:#}");
// If the error is internal, we stop here.
new_view::Error::Internal(e) => {
tracing::error!("on_new_view: internal error: {e:#}");
return Err(e);
}
super::leader_commit::Error::Old { .. } => {
tracing::info!("process_leader_commit: {err:#}");
// If the error is due to an old message, we log it at a lower level.
new_view::Error::Old { .. } => {
tracing::debug!("on_new_view: {err:#}");
}
_ => {
tracing::warn!("process_leader_commit: {err:#}");
tracing::warn!("on_new_view: {err:#}");
}
}
Err(())
}
};
metrics::ConsensusMsgLabel::LeaderCommit.with_result(&res)
metrics::ConsensusMsgLabel::ReplicaPrepare.with_result(&res)
}
_ => unreachable!(),
};
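Each message arm in the dispatch loop above follows the same error-handling shape: internal errors abort the state machine, stale-message errors are logged at debug level, and anything else is a warning. Below is a condensed sketch of that pattern with a simplified error type; the real code uses per-handler error enums, wraps errors with context, and records the outcome in the consensus metrics:

// Condensed sketch of the error-classification pattern repeated for every
// message type in the dispatch loop. The outer `Err` means the state machine
// stops; the inner `Err(())` only marks the message as rejected.
#[derive(Debug)]
enum HandlerError {
    Internal(String),
    Old { current_view: u64 },
    Other(String),
}

fn classify(name: &str, res: Result<(), HandlerError>) -> Result<Result<(), ()>, String> {
    match res {
        Ok(()) => Ok(Ok(())),
        Err(err) => {
            match &err {
                // If the error is internal, we stop here.
                HandlerError::Internal(e) => {
                    eprintln!("ERROR {name}: internal error: {e}");
                    return Err(e.clone());
                }
                // If the error is due to an old message, we log it at a lower level.
                HandlerError::Old { .. } => {
                    eprintln!("DEBUG {name}: {err:?}");
                }
                _ => {
                    eprintln!("WARN {name}: {err:?}");
                }
            }
            Ok(Err(()))
        }
    }
}

fn main() {
    assert_eq!(classify("on_commit", Ok(())), Ok(Ok(())));
    assert_eq!(
        classify("on_commit", Err(HandlerError::Old { current_view: 7 })),
        Ok(Err(()))
    );
    assert_eq!(
        classify("on_new_view", Err(HandlerError::Other("bad signature".into()))),
        Ok(Err(()))
    );
    assert!(classify("on_timeout", Err(HandlerError::Internal("db".into()))).is_err());
}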
(The diffs for the remaining changed files are not shown here.)

