Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(code): Embed Resumable type inside each Effect for creating resumption value #683

Merged
merged 3 commits into from
Dec 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
174 changes: 153 additions & 21 deletions code/crates/consensus/src/effect.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,36 @@ use crate::input::RequestId;
use crate::types::SignedConsensusMsg;
use crate::ConsensusMsg;

/// Provides a way to construct the appropriate [`Resume`] value to
/// resume execution after handling an [`Effect`].
///
/// Eeach `Effect` embeds a value that implements [`Resumable`]
/// which is used to construct the appropriate [`Resume`] value.
///
/// ## Example
///
/// ```rust,ignore
/// fn effect_handler(effect: Effect<Ctx>) -> Result<Resume<Ctx>, Error> {
/// match effect {
/// Effect::ResetTimeouts(r) => {
/// reset_timeouts();
/// Ok(r.resume_with(()))
/// }
/// Effect::GetValidatorSet(height, r) => {)
/// let validator_set = get_validator_set(height);
/// Ok(r.resume_with(validator_set))
/// }
/// // ...
/// }
/// ```
pub trait Resumable<Ctx: Context> {
/// The value type that will be used to resume execution
type Value;

/// Creates the appropriate [`Resume`] value to resume execution with.
fn resume_with(self, value: Self::Value) -> Resume<Ctx>;
}

/// An effect which may be yielded by a consensus process.
///
/// Effects are handled by the caller using [`process!`][process]
Expand All @@ -21,71 +51,102 @@ where
{
/// Reset all timeouts
/// Resume with: [`Resume::Continue`]
ResetTimeouts,
ResetTimeouts(resume::Continue),

/// Cancel all timeouts
/// Resume with: [`Resume::Continue`]
CancelAllTimeouts,
CancelAllTimeouts(resume::Continue),

/// Cancel a given timeout
/// Resume with: [`Resume::Continue`]
CancelTimeout(Timeout),
CancelTimeout(Timeout, resume::Continue),

/// Schedule a timeout
/// Resume with: [`Resume::Continue`]
ScheduleTimeout(Timeout),
ScheduleTimeout(Timeout, resume::Continue),

/// Consensus is starting a new round with the given proposer
/// Resume with: [`Resume::Continue`]
StartRound(Ctx::Height, Round, Ctx::Address),
StartRound(Ctx::Height, Round, Ctx::Address, resume::Continue),

/// Broadcast a message
/// Resume with: [`Resume::Continue`]
Broadcast(SignedConsensusMsg<Ctx>),
Broadcast(SignedConsensusMsg<Ctx>, resume::Continue),

/// Get a value to propose at the given height and round, within the given timeout
/// Resume with: [`Resume::Continue`]
GetValue(Ctx::Height, Round, Timeout),
GetValue(Ctx::Height, Round, Timeout, resume::Continue),

/// Restream value at the given height, round and valid round
/// Restream the value identified by the given information.
/// Resume with: [`Resume::Continue`]
RestreamValue(Ctx::Height, Round, Round, Ctx::Address, ValueId<Ctx>),
RestreamValue(
/// Height of the value
Ctx::Height,
/// Round of the value
Round,
/// Valid round of the value
Round,
/// Address of the proposer for that value
Ctx::Address,
/// Value ID of the value to restream
ValueId<Ctx>,
/// For resumption
resume::Continue,
),

/// Get the validator set at the given height
/// Resume with: [`Resume::ValidatorSet`]
GetValidatorSet(Ctx::Height),
GetValidatorSet(Ctx::Height, resume::ValidatorSet),

/// Consensus has decided on a value
/// Resume with: [`Resume::Continue`]
Decide { certificate: CommitCertificate<Ctx> },
Decide(CommitCertificate<Ctx>, resume::Continue),

/// Consensus has been stuck in Prevote or Precommit step, ask for vote sets from peers
/// Resume with: [`Resume::Continue`]
GetVoteSet(Ctx::Height, Round),
GetVoteSet(Ctx::Height, Round, resume::Continue),

/// A peer has required our vote set, send the response
SendVoteSetResponse(RequestId, Ctx::Height, Round, VoteSet<Ctx>),
/// Resume with: [`Resume::Continue`]`
SendVoteSetResponse(
RequestId,
Ctx::Height,
Round,
VoteSet<Ctx>,
resume::Continue,
),

/// Persist a consensus message in the Write-Ahead Log for crash recovery
PersistMessage(SignedConsensusMsg<Ctx>),
/// Resume with: [`Resume::Continue`]`
PersistMessage(SignedConsensusMsg<Ctx>, resume::Continue),

/// Persist a timeout in the Write-Ahead Log for crash recovery
PersistTimeout(Timeout),
/// Resume with: [`Resume::Continue`]`
PersistTimeout(Timeout, resume::Continue),

/// Sign a vote with this node's private key
/// Resume with: [`Resume::SignedVote`]
SignVote(Ctx::Vote),
SignVote(Ctx::Vote, resume::SignedVote),

/// Sign a proposal with this node's private key
/// Resume with: [`Resume::SignedProposal`]
SignProposal(Ctx::Proposal),
SignProposal(Ctx::Proposal, resume::SignedProposal),

/// Verify a signature
/// Resume with: [`Resume::SignatureValidity`]
VerifySignature(SignedMessage<Ctx, ConsensusMsg<Ctx>>, PublicKey<Ctx>),
VerifySignature(
SignedMessage<Ctx, ConsensusMsg<Ctx>>,
PublicKey<Ctx>,
resume::SignatureValidity,
),

/// Verify a commit certificate
VerifyCertificate(CommitCertificate<Ctx>, Ctx::ValidatorSet, ThresholdParams),
VerifyCertificate(
CommitCertificate<Ctx>,
Ctx::ValidatorSet,
ThresholdParams,
resume::CertificateValidity,
),
}

/// A value with which the consensus process can be resumed after yielding an [`Effect`].
Expand All @@ -103,8 +164,9 @@ where
/// Resume execution
Continue,

/// Resume execution with an optional validator set at the given height
ValidatorSet(Ctx::Height, Option<Ctx::ValidatorSet>),
/// Resume execution with `Some(Ctx::ValidatorSet)` if a validator set
/// was successfully fetched, or `None` otherwise.
ValidatorSet(Option<Ctx::ValidatorSet>),

/// Resume execution with the validity of the signature
SignatureValidity(bool),
Expand All @@ -118,3 +180,73 @@ where
/// Resume execution with the result of the verification of the [`CommitCertificate`]
CertificateValidity(Result<(), CertificateError<Ctx>>),
}

pub mod resume {
use super::*;

#[derive(Debug, Default)]
pub struct Continue;

impl<Ctx: Context> Resumable<Ctx> for Continue {
type Value = ();

fn resume_with(self, _: ()) -> Resume<Ctx> {
Resume::Continue
}
}

#[derive(Debug, Default)]
pub struct ValidatorSet;

impl<Ctx: Context> Resumable<Ctx> for ValidatorSet {
type Value = Option<Ctx::ValidatorSet>;

fn resume_with(self, value: Self::Value) -> Resume<Ctx> {
Resume::ValidatorSet(value)
}
}

#[derive(Debug, Default)]
pub struct SignatureValidity;

impl<Ctx: Context> Resumable<Ctx> for SignatureValidity {
type Value = bool;

fn resume_with(self, value: Self::Value) -> Resume<Ctx> {
Resume::SignatureValidity(value)
}
}

#[derive(Debug, Default)]
pub struct SignedVote;

impl<Ctx: Context> Resumable<Ctx> for SignedVote {
type Value = SignedMessage<Ctx, Ctx::Vote>;

fn resume_with(self, value: Self::Value) -> Resume<Ctx> {
Resume::SignedVote(value)
}
}

#[derive(Debug, Default)]
pub struct SignedProposal;

impl<Ctx: Context> Resumable<Ctx> for SignedProposal {
type Value = SignedMessage<Ctx, Ctx::Proposal>;

fn resume_with(self, a: Self::Value) -> Resume<Ctx> {
Resume::SignedProposal(a)
}
}

#[derive(Debug, Default)]
pub struct CertificateValidity;

impl<Ctx: Context> Resumable<Ctx> for CertificateValidity {
type Value = Result<(), CertificateError<Ctx>>;

fn resume_with(self, value: Self::Value) -> Resume<Ctx> {
Resume::CertificateValidity(value)
}
}
}
2 changes: 2 additions & 0 deletions code/crates/consensus/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,14 @@ use crate::effect::Resume;
/// The types of error that can be emitted by the consensus process.
#[derive_where(Debug)]
#[derive(thiserror::Error)]
#[allow(private_interfaces)]
pub enum Error<Ctx>
where
Ctx: Context,
{
/// The consensus process was resumed with a value which
/// does not match the expected type of resume value.
#[allow(private_interfaces)]
#[error("Unexpected resume: {0:?}, expected one of: {1}")]
UnexpectedResume(Resume<Ctx>, &'static str),

Expand Down
5 changes: 4 additions & 1 deletion code/crates/consensus/src/gen.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
use genawaiter::sync as gen;
use genawaiter::GeneratorState;

use crate::{Effect, Error, Resume};
use crate::effect::{Effect, Resume};
use crate::error::Error;

pub use gen::Gen;

#[allow(private_interfaces)]
pub type Co<Ctx> = gen::Co<Effect<Ctx>, Resume<Ctx>>;

pub type CoResult<Ctx> = GeneratorState<Effect<Ctx>, Result<(), Error<Ctx>>>;
1 change: 1 addition & 0 deletions code/crates/consensus/src/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use timeout::on_timeout_elapsed;
use vote::on_vote;
use vote_set::{on_vote_set_request, on_vote_set_response};

#[allow(private_interfaces)]
pub async fn handle<Ctx>(
co: Co<Ctx>,
state: &mut State<Ctx>,
Expand Down
2 changes: 1 addition & 1 deletion code/crates/consensus/src/handle/decide.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ where
CommitCertificate::new(height, proposal_round, value.id(), commits)
});

perform!(co, Effect::Decide { certificate });
perform!(co, Effect::Decide(certificate, Default::default()));

// Reinitialize to remove any previous round or equivocating precommits.
// TODO: Revise when evidence module is added.
Expand Down
Loading
Loading