Skip to content

Commit

Permalink
chore(code): Example app cleanup (#690)
Browse files Browse the repository at this point in the history
* chore(code): Doc comments and module renaming

* Renaming

* Cleanup

* Remove BlockMetadata
  • Loading branch information
romac authored Dec 17, 2024
1 parent add0d2a commit 9151e6e
Show file tree
Hide file tree
Showing 16 changed files with 251 additions and 257 deletions.
120 changes: 0 additions & 120 deletions code/crates/app-channel/src/channel.rs

This file was deleted.

12 changes: 8 additions & 4 deletions code/crates/app-channel/src/connector.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
//! Implementation of a host actor acting as a bridge between consensus and the application.
//! Implementation of a host actor for bridiging consensus and the application via a set of channels.
use ractor::{async_trait, Actor, ActorProcessingErr, ActorRef, SpawnErr};
use tokio::sync::mpsc;
Expand All @@ -8,8 +8,12 @@ use malachite_engine::host::HostMsg;

use crate::app::types::core::Context;
use crate::app::types::metrics::Metrics;
use crate::channel::AppMsg;
use crate::msgs::AppMsg;

/// Actor for bridging consensus and the application via a set of channels.
///
/// This actor is responsible for forwarding messages from the
/// consensus actor to the application over a channel, and vice-versa.
pub struct Connector<Ctx>
where
Ctx: Context,
Expand Down Expand Up @@ -102,7 +106,7 @@ where
value_id,
} => {
self.sender
.send(AppMsg::RestreamValue {
.send(AppMsg::RestreamProposal {
height,
round,
valid_round,
Expand Down Expand Up @@ -182,7 +186,7 @@ where
.send(AppMsg::ProcessSyncedValue {
height,
round,
validator_address,
proposer: validator_address,
value_bytes,
reply,
})
Expand Down
10 changes: 6 additions & 4 deletions code/crates/app-channel/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
//! Channel-based interface for Malachite applications.
// TODO: Enforce proper documentation
// #![warn(
// missing_docs,
Expand All @@ -10,11 +12,11 @@

pub use malachite_app as app;

pub mod connector;
pub mod spawn;
mod connector;
mod spawn;

mod channel;
pub use channel::{AppMsg, Channels, ConsensusMsg, NetworkMsg, Reply};
mod msgs;
pub use msgs::{AppMsg, Channels, ConsensusMsg, NetworkMsg, Reply};

mod run;
pub use run::run;
181 changes: 181 additions & 0 deletions code/crates/app-channel/src/msgs.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,181 @@
use std::time::Duration;

use bytes::Bytes;
use derive_where::derive_where;
use tokio::sync::mpsc;
use tokio::sync::oneshot;

use malachite_engine::consensus::Msg as ConsensusActorMsg;
use malachite_engine::network::Msg as NetworkActorMsg;

use crate::app::types::core::{CommitCertificate, Context, Round, ValueId};
use crate::app::types::streaming::StreamMessage;
use crate::app::types::sync::DecidedValue;
use crate::app::types::{LocallyProposedValue, PeerId, ProposedValue};

pub type Reply<T> = oneshot::Sender<T>;

/// Channels created for application consumption
pub struct Channels<Ctx: Context> {
/// Channel for receiving messages from consensus
pub consensus: mpsc::Receiver<AppMsg<Ctx>>,
/// Channel for sending messages to the networking layer
pub network: mpsc::Sender<NetworkMsg<Ctx>>,
}

/// Messages sent from consensus to the application.
#[derive_where(Debug)]
pub enum AppMsg<Ctx: Context> {
/// Notifies the application that consensus is ready.
///
/// The application MAY reply with a message to instruct
/// consensus to start at a given height.
ConsensusReady {
/// Channel for sending a [`ConsensusMsg::StartHeight`] message back to consensus
reply: Reply<ConsensusMsg<Ctx>>,
},

/// Notifies the application that a new consensus round has begun.
StartedRound {
/// Current consensus height
height: Ctx::Height,
/// Round that was just started
round: Round,
/// Proposer for that round
proposer: Ctx::Address,
},

/// Requests the application to build a value for consensus to run on.
///
/// The application MUST reply to this message with the requested value
/// within the specified timeout duration.
GetValue {
/// Height which consensus is at
height: Ctx::Height,
/// Round which consensus is at
round: Round,
/// Maximum time allowed for the application to respond
timeout: Duration,
/// Channel for sending back the value just built to consensus
reply: Reply<LocallyProposedValue<Ctx>>,
},

/// Requests the application to re-stream a proposal that it has already seen.
///
/// The application MUST re-publish again all the proposal parts pertaining
/// to that value by sending [`NetworkMsg::PublishProposalPart`] messages through
/// the [`Channels::network`] channel.
RestreamProposal {
/// Height of the proposal
height: Ctx::Height,
/// Round of the proposal
round: Round,
/// Rround at which the proposal was locked on
valid_round: Round,
/// Address of the original proposer
address: Ctx::Address,
/// Unique identifier of the proposed value
value_id: ValueId<Ctx>,
},

/// Requests the earliest height available in the history maintained by the application.
///
/// The application MUST respond with its earliest available height.
GetHistoryMinHeight { reply: Reply<Ctx::Height> },

/// Notifies the application that consensus has received a proposal part over the network.
///
/// If this part completes the full proposal, the application MUST respond
/// with the complete proposed value. Otherwise, it MUST respond with `None`.
ReceivedProposalPart {
/// Peer whom the proposal part was received from
from: PeerId,
/// Received proposal part, together with its stream metadata
part: StreamMessage<Ctx::ProposalPart>,
/// Channel for returning the complete value if proposal is now complete
reply: Reply<ProposedValue<Ctx>>,
},

/// Requests the validator set for a specific height
GetValidatorSet {
/// Height of the validator set to retrieve
height: Ctx::Height,
/// Channel for sending back the validator set
reply: Reply<Ctx::ValidatorSet>,
},

/// Notifies the application that consensus has decided on a value.
///
/// This message includes a commit certificate containing the ID of
/// the value that was decided on, the height and round at which it was decided,
/// and the aggregated signatures of the validators that committed to it.
///
/// In response to this message, the application MAY send a [`ConsensusMsg::StartHeight`]
/// message back to consensus, instructing it to start the next height.
Decided {
/// The certificate for the decided value
certificate: CommitCertificate<Ctx>,
/// Channel for instructing consensus to start the next height, if desired
reply: Reply<ConsensusMsg<Ctx>>,
},

/// Requests a previously decided value from the application's storage.
///
/// The application MUST respond with that value if available, or `None` otherwise.
GetDecidedValue {
/// Height of the decided value to retrieve
height: Ctx::Height,
/// Channel for sending back the decided value
reply: Reply<Option<DecidedValue<Ctx>>>,
},

/// Notifies the application that a value has been synced from the network.
/// This may happen when the node is catching up with the network.
///
/// If a value can be decoded from the bytes provided, then the application MUST reply
/// to this message with the decoded value.
ProcessSyncedValue {
/// Height of the synced value
height: Ctx::Height,
/// Round of the synced value
round: Round,
/// Address of the original proposer
proposer: Ctx::Address,
/// Raw encoded value data
value_bytes: Bytes,
/// Channel for sending back the proposed value, if successfully decoded
reply: Reply<ProposedValue<Ctx>>,
},
}

/// Messages sent from the application to consensus.
#[derive_where(Debug)]
pub enum ConsensusMsg<Ctx: Context> {
/// Instructs consensus to start a new height with the given validator set.
StartHeight(Ctx::Height, Ctx::ValidatorSet),
}

impl<Ctx: Context> From<ConsensusMsg<Ctx>> for ConsensusActorMsg<Ctx> {
fn from(msg: ConsensusMsg<Ctx>) -> ConsensusActorMsg<Ctx> {
match msg {
ConsensusMsg::StartHeight(height, validator_set) => {
ConsensusActorMsg::StartHeight(height, validator_set)
}
}
}
}

/// Messages sent from the application to the networking layer.
#[derive_where(Debug)]
pub enum NetworkMsg<Ctx: Context> {
/// Publish a proposal part to the network, within a stream.
PublishProposalPart(StreamMessage<Ctx::ProposalPart>),
}

impl<Ctx: Context> From<NetworkMsg<Ctx>> for NetworkActorMsg<Ctx> {
fn from(msg: NetworkMsg<Ctx>) -> NetworkActorMsg<Ctx> {
match msg {
NetworkMsg::PublishProposalPart(part) => NetworkActorMsg::PublishProposalPart(part),
}
}
}
2 changes: 1 addition & 1 deletion code/crates/consensus/src/handle/proposal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ where
height: signed_proposal.height(),
round: signed_proposal.round(),
valid_round: signed_proposal.pol_round(),
validator_address: signed_proposal.validator_address().clone(),
proposer: signed_proposal.validator_address().clone(),
value: signed_proposal.value().clone(),
validity: Validity::Valid,
extension: Default::default(),
Expand Down
2 changes: 1 addition & 1 deletion code/crates/consensus/src/handle/propose.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ where
height,
round,
valid_round,
validator_address: state.address().clone(),
proposer: state.address().clone(),
value: value.clone(),
validity: Validity::Valid,
extension,
Expand Down
2 changes: 1 addition & 1 deletion code/crates/consensus/src/handle/proposed_value.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ where
proposed_value.round,
proposed_value.value.clone(),
proposed_value.valid_round,
proposed_value.validator_address.clone(),
proposed_value.proposer.clone(),
);

// TODO: Keep unsigned proposals in keeper.
Expand Down
2 changes: 1 addition & 1 deletion code/crates/consensus/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ pub struct ProposedValue<Ctx: Context> {
pub height: Ctx::Height,
pub round: Round,
pub valid_round: Round,
pub validator_address: Ctx::Address,
pub proposer: Ctx::Address,
pub value: Ctx::Value,
pub validity: Validity,
pub extension: Option<SignedExtension<Ctx>>,
Expand Down
Loading

0 comments on commit 9151e6e

Please sign in to comment.