diff --git a/code/crates/app-channel/src/channel.rs b/code/crates/app-channel/src/channel.rs deleted file mode 100644 index 7c1cab637..000000000 --- a/code/crates/app-channel/src/channel.rs +++ /dev/null @@ -1,120 +0,0 @@ -use std::time::Duration; - -use bytes::Bytes; -use derive_where::derive_where; -use tokio::sync::mpsc; -use tokio::sync::oneshot; - -use malachite_engine::consensus::Msg as ConsensusActorMsg; -use malachite_engine::network::Msg as NetworkActorMsg; - -use crate::app::types::core::{CommitCertificate, Context, Round, ValueId}; -use crate::app::types::streaming::StreamMessage; -use crate::app::types::sync::DecidedValue; -use crate::app::types::{LocallyProposedValue, PeerId, ProposedValue}; - -pub type Reply = oneshot::Sender; - -/// Channels created for application consumption -pub struct Channels { - pub consensus: mpsc::Receiver>, - pub network: mpsc::Sender>, -} - -/// Messages sent from consensus to the application. -#[derive_where(Debug)] -pub enum AppMsg { - /// Consensus is ready - ConsensusReady { reply: Reply> }, - - /// Consensus has started a new round. - StartedRound { - height: Ctx::Height, - round: Round, - proposer: Ctx::Address, - }, - - /// Request to build a local block/value from Driver - GetValue { - height: Ctx::Height, - round: Round, - timeout: Duration, - reply: Reply>, - }, - - /// Request to restream an existing block/value from Driver - RestreamValue { - height: Ctx::Height, - round: Round, - valid_round: Round, - address: Ctx::Address, - value_id: ValueId, - }, - - /// Request the earliest block height in the block store - GetHistoryMinHeight { reply: Reply }, - - /// ProposalPart received <-- consensus <-- gossip - ReceivedProposalPart { - from: PeerId, - part: StreamMessage, - reply: Reply>, - }, - - /// Get the validator set at a given height - GetValidatorSet { - height: Ctx::Height, - reply: Reply, - }, - - // Consensus has decided on a value - Decided { - certificate: CommitCertificate, - reply: Reply>, - }, - - // Retrieve decided block from the block store - GetDecidedValue { - height: Ctx::Height, - reply: Reply>>, - }, - - // Synced block - ProcessSyncedValue { - height: Ctx::Height, - round: Round, - validator_address: Ctx::Address, - value_bytes: Bytes, - reply: Reply>, - }, -} - -/// Messages sent from the application to consensus. -#[derive_where(Debug)] -pub enum ConsensusMsg { - StartHeight(Ctx::Height, Ctx::ValidatorSet), -} - -impl From> for ConsensusActorMsg { - fn from(msg: ConsensusMsg) -> ConsensusActorMsg { - match msg { - ConsensusMsg::StartHeight(height, validator_set) => { - ConsensusActorMsg::StartHeight(height, validator_set) - } - } - } -} - -/// Messages sent from the application to consensus gossip. -#[derive_where(Debug)] -pub enum NetworkMsg { - PublishProposalPart(StreamMessage), -} - -impl From> for NetworkActorMsg { - fn from(msg: NetworkMsg) -> NetworkActorMsg { - match msg { - NetworkMsg::PublishProposalPart(part) => NetworkActorMsg::PublishProposalPart(part), - } - } -} diff --git a/code/crates/app-channel/src/connector.rs b/code/crates/app-channel/src/connector.rs index 62a6b9eb9..f71daa919 100644 --- a/code/crates/app-channel/src/connector.rs +++ b/code/crates/app-channel/src/connector.rs @@ -1,4 +1,4 @@ -//! Implementation of a host actor acting as a bridge between consensus and the application. +//! Implementation of a host actor for bridiging consensus and the application via a set of channels. use ractor::{async_trait, Actor, ActorProcessingErr, ActorRef, SpawnErr}; use tokio::sync::mpsc; @@ -8,8 +8,12 @@ use malachite_engine::host::HostMsg; use crate::app::types::core::Context; use crate::app::types::metrics::Metrics; -use crate::channel::AppMsg; +use crate::msgs::AppMsg; +/// Actor for bridging consensus and the application via a set of channels. +/// +/// This actor is responsible for forwarding messages from the +/// consensus actor to the application over a channel, and vice-versa. pub struct Connector where Ctx: Context, @@ -102,7 +106,7 @@ where value_id, } => { self.sender - .send(AppMsg::RestreamValue { + .send(AppMsg::RestreamProposal { height, round, valid_round, @@ -182,7 +186,7 @@ where .send(AppMsg::ProcessSyncedValue { height, round, - validator_address, + proposer: validator_address, value_bytes, reply, }) diff --git a/code/crates/app-channel/src/lib.rs b/code/crates/app-channel/src/lib.rs index 4ac58aaf1..9b12de2b7 100644 --- a/code/crates/app-channel/src/lib.rs +++ b/code/crates/app-channel/src/lib.rs @@ -1,3 +1,5 @@ +//! Channel-based interface for Malachite applications. + // TODO: Enforce proper documentation // #![warn( // missing_docs, @@ -10,11 +12,11 @@ pub use malachite_app as app; -pub mod connector; -pub mod spawn; +mod connector; +mod spawn; -mod channel; -pub use channel::{AppMsg, Channels, ConsensusMsg, NetworkMsg, Reply}; +mod msgs; +pub use msgs::{AppMsg, Channels, ConsensusMsg, NetworkMsg, Reply}; mod run; pub use run::run; diff --git a/code/crates/app-channel/src/msgs.rs b/code/crates/app-channel/src/msgs.rs new file mode 100644 index 000000000..098f84462 --- /dev/null +++ b/code/crates/app-channel/src/msgs.rs @@ -0,0 +1,181 @@ +use std::time::Duration; + +use bytes::Bytes; +use derive_where::derive_where; +use tokio::sync::mpsc; +use tokio::sync::oneshot; + +use malachite_engine::consensus::Msg as ConsensusActorMsg; +use malachite_engine::network::Msg as NetworkActorMsg; + +use crate::app::types::core::{CommitCertificate, Context, Round, ValueId}; +use crate::app::types::streaming::StreamMessage; +use crate::app::types::sync::DecidedValue; +use crate::app::types::{LocallyProposedValue, PeerId, ProposedValue}; + +pub type Reply = oneshot::Sender; + +/// Channels created for application consumption +pub struct Channels { + /// Channel for receiving messages from consensus + pub consensus: mpsc::Receiver>, + /// Channel for sending messages to the networking layer + pub network: mpsc::Sender>, +} + +/// Messages sent from consensus to the application. +#[derive_where(Debug)] +pub enum AppMsg { + /// Notifies the application that consensus is ready. + /// + /// The application MAY reply with a message to instruct + /// consensus to start at a given height. + ConsensusReady { + /// Channel for sending a [`ConsensusMsg::StartHeight`] message back to consensus + reply: Reply>, + }, + + /// Notifies the application that a new consensus round has begun. + StartedRound { + /// Current consensus height + height: Ctx::Height, + /// Round that was just started + round: Round, + /// Proposer for that round + proposer: Ctx::Address, + }, + + /// Requests the application to build a value for consensus to run on. + /// + /// The application MUST reply to this message with the requested value + /// within the specified timeout duration. + GetValue { + /// Height which consensus is at + height: Ctx::Height, + /// Round which consensus is at + round: Round, + /// Maximum time allowed for the application to respond + timeout: Duration, + /// Channel for sending back the value just built to consensus + reply: Reply>, + }, + + /// Requests the application to re-stream a proposal that it has already seen. + /// + /// The application MUST re-publish again all the proposal parts pertaining + /// to that value by sending [`NetworkMsg::PublishProposalPart`] messages through + /// the [`Channels::network`] channel. + RestreamProposal { + /// Height of the proposal + height: Ctx::Height, + /// Round of the proposal + round: Round, + /// Rround at which the proposal was locked on + valid_round: Round, + /// Address of the original proposer + address: Ctx::Address, + /// Unique identifier of the proposed value + value_id: ValueId, + }, + + /// Requests the earliest height available in the history maintained by the application. + /// + /// The application MUST respond with its earliest available height. + GetHistoryMinHeight { reply: Reply }, + + /// Notifies the application that consensus has received a proposal part over the network. + /// + /// If this part completes the full proposal, the application MUST respond + /// with the complete proposed value. Otherwise, it MUST respond with `None`. + ReceivedProposalPart { + /// Peer whom the proposal part was received from + from: PeerId, + /// Received proposal part, together with its stream metadata + part: StreamMessage, + /// Channel for returning the complete value if proposal is now complete + reply: Reply>, + }, + + /// Requests the validator set for a specific height + GetValidatorSet { + /// Height of the validator set to retrieve + height: Ctx::Height, + /// Channel for sending back the validator set + reply: Reply, + }, + + /// Notifies the application that consensus has decided on a value. + /// + /// This message includes a commit certificate containing the ID of + /// the value that was decided on, the height and round at which it was decided, + /// and the aggregated signatures of the validators that committed to it. + /// + /// In response to this message, the application MAY send a [`ConsensusMsg::StartHeight`] + /// message back to consensus, instructing it to start the next height. + Decided { + /// The certificate for the decided value + certificate: CommitCertificate, + /// Channel for instructing consensus to start the next height, if desired + reply: Reply>, + }, + + /// Requests a previously decided value from the application's storage. + /// + /// The application MUST respond with that value if available, or `None` otherwise. + GetDecidedValue { + /// Height of the decided value to retrieve + height: Ctx::Height, + /// Channel for sending back the decided value + reply: Reply>>, + }, + + /// Notifies the application that a value has been synced from the network. + /// This may happen when the node is catching up with the network. + /// + /// If a value can be decoded from the bytes provided, then the application MUST reply + /// to this message with the decoded value. + ProcessSyncedValue { + /// Height of the synced value + height: Ctx::Height, + /// Round of the synced value + round: Round, + /// Address of the original proposer + proposer: Ctx::Address, + /// Raw encoded value data + value_bytes: Bytes, + /// Channel for sending back the proposed value, if successfully decoded + reply: Reply>, + }, +} + +/// Messages sent from the application to consensus. +#[derive_where(Debug)] +pub enum ConsensusMsg { + /// Instructs consensus to start a new height with the given validator set. + StartHeight(Ctx::Height, Ctx::ValidatorSet), +} + +impl From> for ConsensusActorMsg { + fn from(msg: ConsensusMsg) -> ConsensusActorMsg { + match msg { + ConsensusMsg::StartHeight(height, validator_set) => { + ConsensusActorMsg::StartHeight(height, validator_set) + } + } + } +} + +/// Messages sent from the application to the networking layer. +#[derive_where(Debug)] +pub enum NetworkMsg { + /// Publish a proposal part to the network, within a stream. + PublishProposalPart(StreamMessage), +} + +impl From> for NetworkActorMsg { + fn from(msg: NetworkMsg) -> NetworkActorMsg { + match msg { + NetworkMsg::PublishProposalPart(part) => NetworkActorMsg::PublishProposalPart(part), + } + } +} diff --git a/code/crates/consensus/src/handle/proposal.rs b/code/crates/consensus/src/handle/proposal.rs index 287afff8f..6f456821c 100644 --- a/code/crates/consensus/src/handle/proposal.rs +++ b/code/crates/consensus/src/handle/proposal.rs @@ -86,7 +86,7 @@ where height: signed_proposal.height(), round: signed_proposal.round(), valid_round: signed_proposal.pol_round(), - validator_address: signed_proposal.validator_address().clone(), + proposer: signed_proposal.validator_address().clone(), value: signed_proposal.value().clone(), validity: Validity::Valid, extension: Default::default(), diff --git a/code/crates/consensus/src/handle/propose.rs b/code/crates/consensus/src/handle/propose.rs index bc1bb87a7..7becd52f1 100644 --- a/code/crates/consensus/src/handle/propose.rs +++ b/code/crates/consensus/src/handle/propose.rs @@ -44,7 +44,7 @@ where height, round, valid_round, - validator_address: state.address().clone(), + proposer: state.address().clone(), value: value.clone(), validity: Validity::Valid, extension, diff --git a/code/crates/consensus/src/handle/proposed_value.rs b/code/crates/consensus/src/handle/proposed_value.rs index 64197537f..2bdc372eb 100644 --- a/code/crates/consensus/src/handle/proposed_value.rs +++ b/code/crates/consensus/src/handle/proposed_value.rs @@ -51,7 +51,7 @@ where proposed_value.round, proposed_value.value.clone(), proposed_value.valid_round, - proposed_value.validator_address.clone(), + proposed_value.proposer.clone(), ); // TODO: Keep unsigned proposals in keeper. diff --git a/code/crates/consensus/src/types.rs b/code/crates/consensus/src/types.rs index 17e5b3afa..b0afcd9eb 100644 --- a/code/crates/consensus/src/types.rs +++ b/code/crates/consensus/src/types.rs @@ -55,7 +55,7 @@ pub struct ProposedValue { pub height: Ctx::Height, pub round: Round, pub valid_round: Round, - pub validator_address: Ctx::Address, + pub proposer: Ctx::Address, pub value: Ctx::Value, pub validity: Validity, pub extension: Option>, diff --git a/code/crates/consensus/tests/full_proposal.rs b/code/crates/consensus/tests/full_proposal.rs index 84330fad8..043254246 100644 --- a/code/crates/consensus/tests/full_proposal.rs +++ b/code/crates/consensus/tests/full_proposal.rs @@ -46,7 +46,7 @@ fn prop_msg( } fn value( - validator_address: Address, + proposer: Address, round: u32, value: u64, validity: Validity, @@ -55,19 +55,14 @@ fn value( height: Height::new(1), round: Round::new(round), valid_round: Round::Nil, - validator_address, + proposer, value: Value::new(value), validity, extension: Default::default(), } } -fn val_msg( - validator_address: Address, - round: u32, - value: u64, - validity: Validity, -) -> Input { +fn val_msg(proposer: Address, round: u32, value: u64, validity: Validity) -> Input { Input::ProposedValue( ProposedValue { height: Height::new(1), @@ -75,7 +70,7 @@ fn val_msg( valid_round: Round::Nil, value: Value::new(value), validity, - validator_address, + proposer, extension: Default::default(), }, ValueOrigin::Consensus, diff --git a/code/crates/starknet/host/src/actor.rs b/code/crates/starknet/host/src/actor.rs index f60f5ec88..8f4cdaa30 100644 --- a/code/crates/starknet/host/src/actor.rs +++ b/code/crates/starknet/host/src/actor.rs @@ -358,7 +358,7 @@ async fn find_previously_built_value( let proposed_value = values .into_iter() - .find(|v| v.validator_address == state.host.address); + .find(|v| v.proposer == state.host.address); Ok(proposed_value) } @@ -421,7 +421,7 @@ fn on_process_synced_value( value_bytes: Bytes, height: Height, round: Round, - validator_address: Address, + proposer: Address, reply_to: RpcReplyPort>, ) -> Result<(), ActorProcessingErr> { let maybe_block = Block::from_bytes(value_bytes.as_ref()); @@ -430,7 +430,7 @@ fn on_process_synced_value( height, round, valid_round: Round::Nil, - validator_address, + proposer, value: block.block_hash, validity: Validity::Valid, extension: None, diff --git a/code/crates/starknet/host/src/codec.rs b/code/crates/starknet/host/src/codec.rs index 0a7c448c7..ec6f1f6ff 100644 --- a/code/crates/starknet/host/src/codec.rs +++ b/code/crates/starknet/host/src/codec.rs @@ -110,7 +110,7 @@ pub fn decode_proposed_value( round: Round::from(proto.round), value: BlockHash::from_bytes(&proto.value)?, valid_round: Round::from(proto.valid_round), - validator_address: Address::from_proto(proposer)?, + proposer: Address::from_proto(proposer)?, validity: Validity::from_bool(proto.validity), extension: proto.extension.map(decode_extension).transpose()?, }) @@ -125,7 +125,7 @@ pub fn encode_proposed_value( round: msg.round.as_u32().expect("round should not be nil"), valid_round: msg.valid_round.as_u32(), value: msg.value.to_bytes()?, - proposer: Some(msg.validator_address.to_proto()?), + proposer: Some(msg.proposer.to_proto()?), validity: match msg.validity { Validity::Valid => true, Validity::Invalid => false, diff --git a/code/crates/starknet/host/src/host/state.rs b/code/crates/starknet/host/src/host/state.rs index 69e13944b..feaf73c44 100644 --- a/code/crates/starknet/host/src/host/state.rs +++ b/code/crates/starknet/host/src/host/state.rs @@ -59,12 +59,12 @@ impl HostState { height: Height, round: Round, ) -> Option> { - let (valid_round, value, validator_address, validity, extension) = self + let (valid_round, value, proposer, validity, extension) = self .build_proposal_content_from_parts(parts, height, round) .await?; Some(ProposedValue { - validator_address, + proposer, height, round, valid_round, diff --git a/code/crates/test/proto/consensus.proto b/code/crates/test/proto/consensus.proto index 83e620b92..b8bcd91ae 100644 --- a/code/crates/test/proto/consensus.proto +++ b/code/crates/test/proto/consensus.proto @@ -56,13 +56,8 @@ message ProposalPart { bool fin = 6; } -message BlockMetadata { - bytes proof = 1; - Value value = 2; -} - message Content { - BlockMetadata metadata = 91; + Value value = 1; } message Extension { diff --git a/code/crates/test/src/proposal_part.rs b/code/crates/test/src/proposal_part.rs index d65d1e360..e60959390 100644 --- a/code/crates/test/src/proposal_part.rs +++ b/code/crates/test/src/proposal_part.rs @@ -6,68 +6,18 @@ use malachite_proto::{Error as ProtoError, Protobuf}; use crate::{Address, Height, TestContext, Value}; -#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] -pub struct BlockMetadata { - proof: Bytes, - value: Value, -} - -impl BlockMetadata { - pub fn new(proof: Bytes, value: Value) -> Self { - Self { proof, value } - } - - pub fn value(&self) -> Value { - self.value - } - - pub fn to_bytes(&self) -> Bytes { - Protobuf::to_bytes(self).unwrap() - } - - pub fn size_bytes(&self) -> usize { - self.proof.len() + self.value.size_bytes() - } -} - -impl Protobuf for BlockMetadata { - type Proto = crate::proto::BlockMetadata; - - #[cfg_attr(coverage_nightly, coverage(off))] - fn from_proto(proto: Self::Proto) -> Result { - Ok(Self { - proof: proto.proof, - value: Value::from_proto( - proto - .value - .ok_or_else(|| ProtoError::missing_field::("height"))?, - )?, - }) - } - - #[cfg_attr(coverage_nightly, coverage(off))] - fn to_proto(&self) -> Result { - Ok(crate::proto::BlockMetadata { - proof: self.proof.clone(), - value: Option::from(self.value.to_proto().unwrap()), - }) - } -} - #[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] pub struct Content { - pub metadata: BlockMetadata, + pub value: Value, } impl Content { - pub fn size_bytes(&self) -> usize { - self.metadata.size_bytes() + pub fn new(value: Value) -> Self { + Self { value } } - pub fn new(block_metadata: &BlockMetadata) -> Self { - Self { - metadata: block_metadata.clone(), - } + pub fn size_bytes(&self) -> usize { + self.value.size_bytes() } } @@ -77,10 +27,10 @@ impl Protobuf for Content { #[cfg_attr(coverage_nightly, coverage(off))] fn from_proto(proto: Self::Proto) -> Result { Ok(Self { - metadata: BlockMetadata::from_proto( + value: Value::from_proto( proto - .metadata - .ok_or_else(|| ProtoError::missing_field::("metadata"))?, + .value + .ok_or_else(|| ProtoError::missing_field::("value"))?, )?, }) } @@ -88,7 +38,7 @@ impl Protobuf for Content { #[cfg_attr(coverage_nightly, coverage(off))] fn to_proto(&self) -> Result { Ok(Self::Proto { - metadata: Some(self.metadata.to_proto()?), + value: Some(self.value.to_proto()?), }) } } @@ -111,7 +61,7 @@ pub struct ProposalPart { pub round: Round, pub sequence: u64, pub content: Content, - pub validator_address: Address, + pub proposer: Address, pub fin: bool, } @@ -120,7 +70,7 @@ impl ProposalPart { height: Height, round: Round, sequence: u64, - validator_address: Address, + proposer: Address, content: Content, fin: bool, ) -> Self { @@ -129,7 +79,7 @@ impl ProposalPart { round, sequence, content, - validator_address, + proposer, fin, } } @@ -138,10 +88,6 @@ impl ProposalPart { Protobuf::to_bytes(self).unwrap() } - pub fn metadata(&self) -> &BlockMetadata { - &self.content.metadata - } - pub fn size_bytes(&self) -> usize { self.content.size_bytes() } @@ -171,7 +117,7 @@ impl Protobuf for ProposalPart { .content .ok_or_else(|| ProtoError::missing_field::("content"))?, )?, - validator_address: Address::from_proto( + proposer: Address::from_proto( proto .validator_address .ok_or_else(|| ProtoError::missing_field::("validator_address"))?, @@ -187,7 +133,7 @@ impl Protobuf for ProposalPart { round: self.round.as_u32().expect("round should not be nil"), sequence: self.sequence, content: Some(self.content.to_proto()?), - validator_address: Some(self.validator_address.to_proto()?), + validator_address: Some(self.proposer.to_proto()?), fin: self.fin, }) } diff --git a/code/examples/channel/src/app.rs b/code/examples/channel/src/app.rs index 3800a4965..69482e560 100644 --- a/code/examples/channel/src/app.rs +++ b/code/examples/channel/src/app.rs @@ -64,7 +64,7 @@ pub async fn run( error!("Failed to send GetValue reply"); } - let stream_message = state.create_broadcast_message(value); + let stream_message = state.create_stream_message(value); // Broadcast it to others. Old messages need not be broadcast. channels @@ -98,7 +98,7 @@ pub async fn run( } AppMsg::Decided { certificate, reply } => { - state.commit_block(certificate); + state.commit(certificate); if reply .send(ConsensusMsg::StartHeight( @@ -115,14 +115,14 @@ pub async fn run( let decided_value = state.get_decided_value(&height).cloned(); if reply.send(decided_value).is_err() { - error!("Failed to send GetDecidedBlock reply"); + error!("Failed to send GetDecidedValue reply"); } } AppMsg::ProcessSyncedValue { height, round, - validator_address, + proposer, value_bytes, reply, } => { @@ -133,18 +133,18 @@ pub async fn run( height, round, valid_round: Round::Nil, - validator_address, + proposer, value, validity: Validity::Valid, extension: None, }) .is_err() { - error!("Failed to send ProcessSyncedBlock reply"); + error!("Failed to send ProcessSyncedValue reply"); } } - AppMsg::RestreamValue { .. } => { + AppMsg::RestreamProposal { .. } => { unimplemented!("RestreamValue"); } } diff --git a/code/examples/channel/src/state.rs b/code/examples/channel/src/state.rs index 4b0fb9383..3c481e7f8 100644 --- a/code/examples/channel/src/state.rs +++ b/code/examples/channel/src/state.rs @@ -13,7 +13,7 @@ use malachite_app_channel::app::types::codec::Codec; use malachite_app_channel::app::types::core::{CommitCertificate, Round, Validity}; use malachite_app_channel::app::types::sync::DecidedValue; use malachite_test::codec::proto::ProtobufCodec; -use malachite_test::{Address, BlockMetadata, Content, Height, ProposalPart, TestContext, Value}; +use malachite_test::{Address, Content, Height, ProposalPart, TestContext, Value}; /// Decodes a Value from its byte representation using ProtobufCodec pub fn decode_value(bytes: Bytes) -> Value { @@ -34,7 +34,7 @@ pub struct State { earliest_height: Height, address: Address, - sequence: u64, + stream_id: u64, undecided_proposals: HashMap>, decided_proposals: HashMap>, blocks: HashMap>, @@ -50,7 +50,7 @@ impl State { current_round: Round::new(0), current_proposer: None, address, - sequence: 0, + stream_id: 0, undecided_proposals: HashMap::new(), decided_proposals: HashMap::new(), blocks: HashMap::new(), @@ -80,13 +80,13 @@ impl State { { assert!(proposal_part.fin); // we only implemented 1 part === 1 proposal - let value = proposal_part.content.metadata.value(); + let value = proposal_part.content.value; let proposal = ProposedValue { height: proposal_part.height, round: proposal_part.round, valid_round: Round::Nil, - validator_address: proposal_part.validator_address, + proposer: proposal_part.proposer, value, validity: Validity::Valid, extension: None, @@ -106,9 +106,9 @@ impl State { self.blocks.get(height) } - /// Commits a block with the given certificate, updating internal state + /// Commits a value with the given certificate, updating internal state /// and moving to the next height - pub fn commit_block(&mut self, certificate: CommitCertificate) { + pub fn commit(&mut self, certificate: CommitCertificate) { // Sort out proposals for (height, value) in self.undecided_proposals.clone() { if height > self.current_height { @@ -122,17 +122,12 @@ impl State { self.undecided_proposals.remove(&height); } - // Commit block transactions to "database" - // TODO: retrieve all transactions from block parts let value = self.decided_proposals.get(&certificate.height).unwrap(); let value_bytes = encode_value(&value.value); self.blocks.insert( self.current_height, - DecidedValue { - value_bytes, - certificate, - }, + DecidedValue::new(value_bytes, certificate), ); // Move to next height @@ -163,7 +158,7 @@ impl State { height: *height, round: self.current_round, valid_round: Round::Nil, - validator_address: self.address, + proposer: self.address, value, validity: Validity::Valid, extension: None, @@ -176,34 +171,30 @@ impl State { } } - /// Creates a broadcast message containing a proposal part - /// Updates internal sequence number and current proposal - pub fn create_broadcast_message( + /// Creates a stream message containing a proposal part. + /// Updates internal sequence number and current proposal. + pub fn create_stream_message( &mut self, value: LocallyProposedValue, ) -> StreamMessage { - // TODO: create proof properly. - let fake_proof = [ - self.current_height.as_u64().to_le_bytes().to_vec(), - self.current_round.as_u32().unwrap().to_le_bytes().to_vec(), - ] - .concat(); + // Only a single proposal part + let sequence = 0; - let content = Content::new(&BlockMetadata::new(fake_proof.into(), value.value)); + let content = Content::new(value.value); let proposal_part = ProposalPart::new( self.current_height, self.current_round, - self.sequence, + sequence, self.address, content, - true, // each proposal part is a full proposal + true, // full proposal is emitted as a single proposal part ); let stream_content = StreamContent::Data(proposal_part); - let msg = StreamMessage::new(self.sequence, self.sequence, stream_content); + let msg = StreamMessage::new(self.stream_id, sequence, stream_content); - self.sequence += 1; + self.stream_id += 1; self.current_proposal = Some(msg.clone()); msg