diff --git a/code/Cargo.lock b/code/Cargo.lock index 986ddff8d..d989fa797 100644 --- a/code/Cargo.lock +++ b/code/Cargo.lock @@ -2754,6 +2754,7 @@ name = "malachite-app" version = "0.1.0" dependencies = [ "async-trait", + "derive-where", "eyre", "libp2p-identity", "malachite-codec", @@ -2938,6 +2939,7 @@ dependencies = [ "async-trait", "bytes", "color-eyre", + "derive-where", "eyre", "libp2p-identity", "malachite-app-channel", @@ -2945,6 +2947,8 @@ dependencies = [ "malachite-test-cli", "rand", "serde_json", + "sha3", + "tokio", "tracing", ] diff --git a/code/crates/app-channel/src/connector.rs b/code/crates/app-channel/src/connector.rs index f71daa919..d8fd02c53 100644 --- a/code/crates/app-channel/src/connector.rs +++ b/code/crates/app-channel/src/connector.rs @@ -137,7 +137,9 @@ where .send(AppMsg::ReceivedProposalPart { from, part, reply }) .await?; - reply_to.send(rx.await?)?; + if let Some(value) = rx.await? { + reply_to.send(value)?; + } } HostMsg::GetValidatorSet { height, reply_to } => { diff --git a/code/crates/app-channel/src/msgs.rs b/code/crates/app-channel/src/msgs.rs index 098f84462..dc7967a4a 100644 --- a/code/crates/app-channel/src/msgs.rs +++ b/code/crates/app-channel/src/msgs.rs @@ -92,8 +92,8 @@ pub enum AppMsg { from: PeerId, /// Received proposal part, together with its stream metadata part: StreamMessage, - /// Channel for returning the complete value if proposal is now complete - reply: Reply>, + /// Channel for returning the complete value if the proposal is now complete + reply: Reply>>, }, /// Requests the validator set for a specific height diff --git a/code/crates/app-channel/src/spawn.rs b/code/crates/app-channel/src/spawn.rs index 1c978611f..37137d0dd 100644 --- a/code/crates/app-channel/src/spawn.rs +++ b/code/crates/app-channel/src/spawn.rs @@ -22,7 +22,7 @@ pub async fn spawn_host_actor( where Ctx: Context, { - let (tx, rx) = mpsc::channel(1); + let (tx, rx) = mpsc::channel(128); let actor_ref = Connector::spawn(tx, metrics).await?; Ok((actor_ref, rx)) } diff --git a/code/crates/app/Cargo.toml b/code/crates/app/Cargo.toml index 7222c4855..da039eed3 100644 --- a/code/crates/app/Cargo.toml +++ b/code/crates/app/Cargo.toml @@ -19,9 +19,10 @@ malachite-peer.workspace = true malachite-sync.workspace = true async-trait = { workspace = true } +derive-where = { workspace = true } eyre = { workspace = true } libp2p-identity = { workspace = true } -rand = { workspace = true } +rand = { workspace = true } serde = { workspace = true } tracing = { workspace = true } diff --git a/code/crates/app/src/lib.rs b/code/crates/app/src/lib.rs index dfeaf507e..fff9669a6 100644 --- a/code/crates/app/src/lib.rs +++ b/code/crates/app/src/lib.rs @@ -11,6 +11,7 @@ mod node; pub use node::Node; +pub mod part_store; pub mod types; mod spawn; @@ -21,6 +22,7 @@ pub mod streaming { } pub mod host { + // TODO: Move this under `types` pub use malachite_engine::host::LocallyProposedValue; } diff --git a/code/crates/starknet/host/src/part_store.rs b/code/crates/app/src/part_store.rs similarity index 90% rename from code/crates/starknet/host/src/part_store.rs rename to code/crates/app/src/part_store.rs index 56409de6f..a78174b67 100644 --- a/code/crates/starknet/host/src/part_store.rs +++ b/code/crates/app/src/part_store.rs @@ -7,10 +7,9 @@ use malachite_core_types::{Context, Round, ValueId}; // This is a temporary store implementation for proposal parts // -// TODO: -// - [ ] add Address to key -// note: not sure if this is required as consensus should verify that only the parts signed by the proposer for -// the height and round should be forwarded here (see the TODOs in consensus) +// TODO: Add Address to key +// NOTE: Not sure if this is required as consensus should verify that only the parts signed by the proposer for +// the height and round should be forwarded here (see the TODOs in consensus) type Key = (Height, Round); diff --git a/code/crates/app/src/types.rs b/code/crates/app/src/types.rs index 5fa6bc4f6..27017d5df 100644 --- a/code/crates/app/src/types.rs +++ b/code/crates/app/src/types.rs @@ -19,7 +19,7 @@ pub mod metrics { pub use libp2p_identity::Keypair; pub mod streaming { - pub use malachite_engine::util::streaming::StreamMessage; + pub use malachite_engine::util::streaming::{Sequence, StreamId, StreamMessage}; } pub mod sync { diff --git a/code/crates/engine/src/host.rs b/code/crates/engine/src/host.rs index c8e5569fc..a2fbd5a35 100644 --- a/code/crates/engine/src/host.rs +++ b/code/crates/engine/src/host.rs @@ -46,6 +46,7 @@ impl LocallyProposedValue { pub type HostRef = ActorRef>; /// Messages that need to be handled by the host actor. +#[derive_where(Debug)] pub enum HostMsg { /// Consensus is ready ConsensusReady(ConsensusRef), diff --git a/code/crates/engine/src/wal.rs b/code/crates/engine/src/wal.rs index 1221b1115..f01c14999 100644 --- a/code/crates/engine/src/wal.rs +++ b/code/crates/engine/src/wal.rs @@ -229,6 +229,8 @@ where _: WalRef, state: &mut Self::State, ) -> Result<(), ActorProcessingErr> { + info!("Shutting down WAL"); + let _ = state.wal_sender.send(self::thread::WalMsg::Shutdown).await; Ok(()) diff --git a/code/crates/signing-ed25519/src/lib.rs b/code/crates/signing-ed25519/src/lib.rs index 5add93d46..a8e73b06c 100644 --- a/code/crates/signing-ed25519/src/lib.rs +++ b/code/crates/signing-ed25519/src/lib.rs @@ -52,6 +52,8 @@ impl SigningScheme for Ed25519 { } #[derive(Copy, Clone, Debug, PartialEq, Eq)] +#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] +#[cfg_attr(feature = "serde", serde(transparent))] pub struct Signature(ed25519_consensus::Signature); impl Signature { @@ -127,6 +129,11 @@ impl PrivateKey { PublicKey::new(self.0.verification_key()) } + #[cfg_attr(coverage_nightly, coverage(off))] + pub fn sign(&self, msg: &[u8]) -> Signature { + Signature(self.0.sign(msg)) + } + #[cfg_attr(coverage_nightly, coverage(off))] pub fn inner(&self) -> &ed25519_consensus::SigningKey { &self.0 @@ -170,6 +177,12 @@ impl PublicKey { self.0.as_bytes() } + pub fn verify(&self, msg: &[u8], signature: &Signature) -> Result<(), signature::Error> { + self.0 + .verify(signature.inner(), msg) + .map_err(|_| signature::Error::new()) + } + pub fn inner(&self) -> &ed25519_consensus::VerificationKey { &self.0 } @@ -177,8 +190,6 @@ impl PublicKey { impl Verifier for PublicKey { fn verify(&self, msg: &[u8], signature: &Signature) -> Result<(), signature::Error> { - self.0 - .verify(signature.inner(), msg) - .map_err(|_| signature::Error::new()) + PublicKey::verify(self, msg, signature) } } diff --git a/code/crates/starknet/host/src/lib.rs b/code/crates/starknet/host/src/lib.rs index 10db6863b..ec399d2df 100644 --- a/code/crates/starknet/host/src/lib.rs +++ b/code/crates/starknet/host/src/lib.rs @@ -8,10 +8,11 @@ pub mod codec; pub mod host; pub mod mempool; pub mod node; -pub mod part_store; pub mod spawn; pub mod streaming; +pub use malachite_app::part_store; + pub mod proto { pub use malachite_proto::*; pub use malachite_starknet_p2p_proto::*; diff --git a/code/crates/starknet/p2p-types/src/proposal_part.rs b/code/crates/starknet/p2p-types/src/proposal_part.rs index 973a2799f..3ee62f937 100644 --- a/code/crates/starknet/p2p-types/src/proposal_part.rs +++ b/code/crates/starknet/p2p-types/src/proposal_part.rs @@ -43,6 +43,7 @@ impl ProposalPart { Self::Fin(_) => PartType::Fin, } } + pub fn to_sign_bytes(&self) -> Bytes { proto::Protobuf::to_bytes(self).unwrap() // FIXME: unwrap } diff --git a/code/crates/test/proto/consensus.proto b/code/crates/test/proto/consensus.proto index b8bcd91ae..8ff28c523 100644 --- a/code/crates/test/proto/consensus.proto +++ b/code/crates/test/proto/consensus.proto @@ -48,16 +48,25 @@ message Signature { } message ProposalPart { + oneof part { + ProposalInit init = 1; + ProposalData data = 2; + ProposalFin fin = 3; + } +} + +message ProposalInit { uint64 height = 1; uint32 round = 2; - uint64 sequence = 3; - Address validator_address = 4; - Content content = 5; - bool fin = 6; + Address proposer = 4; +} + +message ProposalData { + uint64 factor = 1; } -message Content { - Value value = 1; +message ProposalFin { + Signature signature = 1; } message Extension { diff --git a/code/crates/test/src/codec/proto/mod.rs b/code/crates/test/src/codec/proto/mod.rs index 28a66fa23..4a31cc839 100644 --- a/code/crates/test/src/codec/proto/mod.rs +++ b/code/crates/test/src/codec/proto/mod.rs @@ -125,7 +125,7 @@ impl Codec> for ProtobufCodec { sequence: msg.sequence, content: match &msg.content { StreamContent::Data(data) => { - Some(proto::stream_message::Content::Data(data.to_bytes())) + Some(proto::stream_message::Content::Data(data.to_bytes()?)) } StreamContent::Fin(end) => Some(proto::stream_message::Content::Fin(*end)), }, @@ -451,13 +451,13 @@ fn decode_vote(msg: proto::SignedMessage) -> Option> { Some(SignedVote::new(vote, signature)) } -fn encode_signature(signature: &Signature) -> proto::Signature { +pub(crate) fn encode_signature(signature: &Signature) -> proto::Signature { proto::Signature { bytes: Bytes::copy_from_slice(signature.to_bytes().as_ref()), } } -fn decode_signature(signature: proto::Signature) -> Result { +pub(crate) fn decode_signature(signature: proto::Signature) -> Result { let bytes = <[u8; 64]>::try_from(signature.bytes.as_ref()) .map_err(|_| ProtoError::Other("Invalid signature length".to_string()))?; Ok(Signature::from_bytes(bytes)) diff --git a/code/crates/test/src/context.rs b/code/crates/test/src/context.rs index 2d1319776..0a74b4aad 100644 --- a/code/crates/test/src/context.rs +++ b/code/crates/test/src/context.rs @@ -13,13 +13,13 @@ use crate::vote::*; #[derive(Clone, Debug)] pub struct TestContext { - ed25519_provider: Arc, + pub signing_provider: Arc, } impl TestContext { pub fn new(private_key: PrivateKey) -> Self { Self { - ed25519_provider: Arc::new(Ed25519Provider::new(private_key)), + signing_provider: Arc::new(Ed25519Provider::new(private_key)), } } } @@ -37,7 +37,7 @@ impl Context for TestContext { type SigningProvider = Ed25519Provider; fn signing_provider(&self) -> &Self::SigningProvider { - &self.ed25519_provider + &self.signing_provider } fn select_proposer<'a>( diff --git a/code/crates/test/src/proposal_part.rs b/code/crates/test/src/proposal_part.rs index e60959390..d72b56e30 100644 --- a/code/crates/test/src/proposal_part.rs +++ b/code/crates/test/src/proposal_part.rs @@ -1,105 +1,107 @@ use bytes::Bytes; +use malachite_signing_ed25519::Signature; use serde::{Deserialize, Serialize}; use malachite_core_types::Round; -use malachite_proto::{Error as ProtoError, Protobuf}; +use malachite_proto::{self as proto, Error as ProtoError, Protobuf}; -use crate::{Address, Height, TestContext, Value}; +use crate::codec::proto::{decode_signature, encode_signature}; +use crate::{Address, Height, TestContext}; #[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] -pub struct Content { - pub value: Value, +pub struct ProposalData { + pub factor: u64, } -impl Content { - pub fn new(value: Value) -> Self { - Self { value } +impl ProposalData { + pub fn new(factor: u64) -> Self { + Self { factor } } pub fn size_bytes(&self) -> usize { - self.value.size_bytes() + std::mem::size_of::() } } -impl Protobuf for Content { - type Proto = crate::proto::Content; +#[derive(Serialize, Deserialize)] +#[serde(remote = "Round")] +enum RoundDef { + Nil, + Some(u32), +} - #[cfg_attr(coverage_nightly, coverage(off))] - fn from_proto(proto: Self::Proto) -> Result { - Ok(Self { - value: Value::from_proto( - proto - .value - .ok_or_else(|| ProtoError::missing_field::("value"))?, - )?, - }) +#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] +pub enum ProposalPart { + Init(ProposalInit), + Data(ProposalData), + Fin(ProposalFin), +} + +impl ProposalPart { + pub fn get_type(&self) -> &'static str { + match self { + Self::Init(_) => "init", + Self::Data(_) => "data", + Self::Fin(_) => "fin", + } } - #[cfg_attr(coverage_nightly, coverage(off))] - fn to_proto(&self) -> Result { - Ok(Self::Proto { - value: Some(self.value.to_proto()?), - }) + pub fn as_init(&self) -> Option<&ProposalInit> { + match self { + Self::Init(init) => Some(init), + _ => None, + } } -} -#[derive(Serialize, Deserialize)] -#[serde(remote = "Round")] -pub enum RoundDef { - /// No round, ie. `-1` - Nil, + pub fn as_data(&self) -> Option<&ProposalData> { + match self { + Self::Data(data) => Some(data), + _ => None, + } + } - /// Some round `r` where `r >= 0` - Some(u32), + pub fn to_sign_bytes(&self) -> Bytes { + proto::Protobuf::to_bytes(self).unwrap() // FIXME: unwrap + } } /// A part of a value for a height, round. Identified in this scope by the sequence. #[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] -pub struct ProposalPart { +pub struct ProposalInit { pub height: Height, #[serde(with = "RoundDef")] pub round: Round, - pub sequence: u64, - pub content: Content, pub proposer: Address, - pub fin: bool, } -impl ProposalPart { - pub fn new( - height: Height, - round: Round, - sequence: u64, - proposer: Address, - content: Content, - fin: bool, - ) -> Self { +impl ProposalInit { + pub fn new(height: Height, round: Round, proposer: Address) -> Self { Self { height, round, - sequence, - content, proposer, - fin, } } +} - pub fn to_bytes(&self) -> Bytes { - Protobuf::to_bytes(self).unwrap() - } +#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] +pub struct ProposalFin { + pub signature: Signature, +} - pub fn size_bytes(&self) -> usize { - self.content.size_bytes() +impl ProposalFin { + pub fn new(signature: Signature) -> Self { + Self { signature } } } impl malachite_core_types::ProposalPart for ProposalPart { fn is_first(&self) -> bool { - self.sequence == 0 + matches!(self, Self::Init(_)) } fn is_last(&self) -> bool { - self.fin + matches!(self, Self::Fin(_)) } } @@ -108,33 +110,54 @@ impl Protobuf for ProposalPart { #[cfg_attr(coverage_nightly, coverage(off))] fn from_proto(proto: Self::Proto) -> Result { - Ok(Self { - height: Height::from_proto(proto.height)?, - round: Round::new(proto.round), - sequence: proto.sequence, - content: Content::from_proto( - proto - .content - .ok_or_else(|| ProtoError::missing_field::("content"))?, - )?, - proposer: Address::from_proto( - proto - .validator_address - .ok_or_else(|| ProtoError::missing_field::("validator_address"))?, - )?, - fin: proto.fin, - }) + use crate::proto::proposal_part::Part; + + let part = proto + .part + .ok_or_else(|| ProtoError::missing_field::("part"))?; + + match part { + Part::Init(init) => Ok(Self::Init(ProposalInit { + height: Height::new(init.height), + round: Round::new(init.round), + proposer: init + .proposer + .ok_or_else(|| ProtoError::missing_field::("proposer")) + .and_then(Address::from_proto)?, + })), + Part::Data(data) => Ok(Self::Data(ProposalData::new(data.factor))), + Part::Fin(fin) => Ok(Self::Fin(ProposalFin { + signature: fin + .signature + .ok_or_else(|| ProtoError::missing_field::("signature")) + .and_then(decode_signature)?, + })), + } } #[cfg_attr(coverage_nightly, coverage(off))] fn to_proto(&self) -> Result { - Ok(crate::proto::ProposalPart { - height: self.height.to_proto()?, - round: self.round.as_u32().expect("round should not be nil"), - sequence: self.sequence, - content: Some(self.content.to_proto()?), - validator_address: Some(self.proposer.to_proto()?), - fin: self.fin, - }) + use crate::proto; + use crate::proto::proposal_part::Part; + + match self { + Self::Init(init) => Ok(Self::Proto { + part: Some(Part::Init(proto::ProposalInit { + height: init.height.as_u64(), + round: init.round.as_u32().unwrap(), + proposer: Some(init.proposer.to_proto()?), + })), + }), + Self::Data(data) => Ok(Self::Proto { + part: Some(Part::Data(proto::ProposalData { + factor: data.factor, + })), + }), + Self::Fin(fin) => Ok(Self::Proto { + part: Some(Part::Fin(proto::ProposalFin { + signature: Some(encode_signature(&fin.signature)), + })), + }), + } } } diff --git a/code/crates/test/src/signing.rs b/code/crates/test/src/signing.rs index f75657c64..ea34f6911 100644 --- a/code/crates/test/src/signing.rs +++ b/code/crates/test/src/signing.rs @@ -31,13 +31,24 @@ impl Ed25519Provider { pub fn new(private_key: PrivateKey) -> Self { Self { private_key } } + + pub fn private_key(&self) -> &PrivateKey { + &self.private_key + } + + pub fn sign(&self, data: &[u8]) -> Signature { + self.private_key.sign(data) + } + + pub fn verify(&self, data: &[u8], signature: &Signature, public_key: &PublicKey) -> bool { + public_key.verify(data, signature).is_ok() + } } impl SigningProvider for Ed25519Provider { #[cfg_attr(coverage_nightly, coverage(off))] fn sign_vote(&self, vote: Vote) -> SignedVote { - use signature::Signer; - let signature = self.private_key.sign(&vote.to_bytes()); + let signature = self.sign(&vote.to_bytes()); SignedVote::new(vote, signature) } @@ -48,13 +59,11 @@ impl SigningProvider for Ed25519Provider { signature: &Signature, public_key: &PublicKey, ) -> bool { - use signature::Verifier; public_key.verify(&vote.to_bytes(), signature).is_ok() } #[cfg_attr(coverage_nightly, coverage(off))] fn sign_proposal(&self, proposal: Proposal) -> SignedProposal { - use signature::Signer; let signature = self.private_key.sign(&proposal.to_bytes()); SignedProposal::new(proposal, signature) } @@ -66,14 +75,12 @@ impl SigningProvider for Ed25519Provider { signature: &Signature, public_key: &PublicKey, ) -> bool { - use signature::Verifier; public_key.verify(&proposal.to_bytes(), signature).is_ok() } #[cfg_attr(coverage_nightly, coverage(off))] fn sign_proposal_part(&self, proposal_part: ProposalPart) -> SignedProposalPart { - use signature::Signer; - let signature = self.private_key.sign(&proposal_part.to_bytes()); + let signature = self.private_key.sign(&proposal_part.to_sign_bytes()); SignedProposalPart::new(proposal_part, signature) } @@ -84,9 +91,8 @@ impl SigningProvider for Ed25519Provider { signature: &Signature, public_key: &PublicKey, ) -> bool { - use signature::Verifier; public_key - .verify(&proposal_part.to_bytes(), signature) + .verify(&proposal_part.to_sign_bytes(), signature) .is_ok() } diff --git a/code/examples/channel/Cargo.toml b/code/examples/channel/Cargo.toml index 7817f5cb4..37e1f2561 100644 --- a/code/examples/channel/Cargo.toml +++ b/code/examples/channel/Cargo.toml @@ -11,11 +11,14 @@ rust-version.workspace = true async-trait.workspace = true bytes.workspace = true color-eyre.workspace = true +derive-where.workspace = true eyre.workspace = true libp2p-identity.workspace = true rand.workspace = true serde_json.workspace = true +sha3.workspace = true tracing.workspace = true +tokio.workspace = true malachite-app-channel.workspace = true malachite-test.workspace = true diff --git a/code/examples/channel/src/app.rs b/code/examples/channel/src/app.rs index 69482e560..b0d3f793e 100644 --- a/code/examples/channel/src/app.rs +++ b/code/examples/channel/src/app.rs @@ -1,7 +1,9 @@ +use std::time::Duration; + use eyre::eyre; use tracing::{error, info}; -use malachite_app_channel::app::host::LocallyProposedValue; +use malachite_app_channel::app::streaming::StreamContent; use malachite_app_channel::app::types::core::{Round, Validity}; use malachite_app_channel::app::types::ProposedValue; use malachite_app_channel::{AppMsg, Channels, ConsensusMsg, NetworkMsg}; @@ -19,6 +21,8 @@ pub async fn run( AppMsg::ConsensusReady { reply } => { info!("Consensus is ready"); + tokio::time::sleep(Duration::from_secs(1)).await; + if reply .send(ConsensusMsg::StartHeight( state.current_height, @@ -48,29 +52,39 @@ pub async fn run( timeout: _, reply, } => { - info!(%height, %round, "Get value"); + // NOTE: We can ignore the timeout as we are building the value right away. + // If we were let's say reaping as many txes from a mempool and executing them, + // then we would need to respect the timeout and stop at a certain point. - let proposal = state.propose_value(&height); + info!(%height, %round, "Consensus is requesting a value to propose"); - let value = LocallyProposedValue::new( - proposal.height, - proposal.round, - proposal.value, - proposal.extension, - ); + // Check if we have a previously built value for that height and round + if let Some(proposal) = state.get_previously_built_value(height, round) { + info!(value = %proposal.value.id(), "Re-using previously built value"); + + if reply.send(proposal).is_err() { + error!("Failed to send GetValue reply"); + } + + return Ok(()); + } + + // Otherwise, propose a new value + let proposal = state.propose_value(height, round); // Send it to consensus - if reply.send(value.clone()).is_err() { + if reply.send(proposal.clone()).is_err() { error!("Failed to send GetValue reply"); } - let stream_message = state.create_stream_message(value); - - // Broadcast it to others. Old messages need not be broadcast. - channels - .network - .send(NetworkMsg::PublishProposalPart(stream_message)) - .await?; + // Decompose the proposal into proposal parts and stream them over the network + for stream_message in state.stream_proposal(proposal) { + info!(%height, %round, "Streaming proposal part: {stream_message:?}"); + channels + .network + .send(NetworkMsg::PublishProposalPart(stream_message)) + .await?; + } } AppMsg::GetHistoryMinHeight { reply } => { @@ -79,15 +93,18 @@ pub async fn run( } } - AppMsg::ReceivedProposalPart { - from: _, - part, - reply, - } => { - if let Some(proposed_value) = state.add_proposal(part) { - if reply.send(proposed_value).is_err() { - error!("Failed to send ReceivedProposalPart reply"); - } + AppMsg::ReceivedProposalPart { from, part, reply } => { + let part_type = match &part.content { + StreamContent::Data(part) => part.get_type(), + StreamContent::Fin(_) => "end of stream", + }; + + info!(%from, %part.sequence, part.type = %part_type, "Received proposal part"); + + let proposed_value = state.received_proposal_part(from, part); + + if reply.send(proposed_value).is_err() { + error!("Failed to send ReceivedProposalPart reply"); } } @@ -98,6 +115,12 @@ pub async fn run( } AppMsg::Decided { certificate, reply } => { + info!( + height = %certificate.height, round = %certificate.round, + value = %certificate.value_id, + "Consensus has decided on value" + ); + state.commit(certificate); if reply @@ -126,6 +149,8 @@ pub async fn run( value_bytes, reply, } => { + info!(%height, %round, "Processing synced value"); + let value = decode_value(value_bytes); if reply @@ -145,7 +170,7 @@ pub async fn run( } AppMsg::RestreamProposal { .. } => { - unimplemented!("RestreamValue"); + error!("RestreamProposal not implemented"); } } } diff --git a/code/examples/channel/src/main.rs b/code/examples/channel/src/main.rs index 2c27dfdd5..59ee91c2d 100644 --- a/code/examples/channel/src/main.rs +++ b/code/examples/channel/src/main.rs @@ -1,15 +1,19 @@ //! Example application using channels mod app; -mod node; -mod state; use eyre::eyre; +use tracing::{error, info, trace}; + use malachite_test_cli::args::{Args, Commands}; use malachite_test_cli::config::load_config; use malachite_test_cli::{logging, runtime}; + +mod node; +mod state; +mod streaming; + use node::App; -use tracing::{error, info, trace}; /// Main entry point for the application /// diff --git a/code/examples/channel/src/node.rs b/code/examples/channel/src/node.rs index 9f0da976d..d917b6efc 100644 --- a/code/examples/channel/src/node.rs +++ b/code/examples/channel/src/node.rs @@ -107,7 +107,7 @@ impl Node for App { let codec = ProtobufCodec; let mut channels = malachite_app_channel::run( - ctx, + ctx.clone(), codec, self.clone(), self.config.clone(), @@ -117,7 +117,7 @@ impl Node for App { ) .await?; - let mut state = State::new(address, start_height.unwrap_or_default()); + let mut state = State::new(ctx, address, start_height.unwrap_or_default()); crate::app::run(genesis, &mut state, &mut channels).await } diff --git a/code/examples/channel/src/state.rs b/code/examples/channel/src/state.rs index 3c481e7f8..68afb6c1c 100644 --- a/code/examples/channel/src/state.rs +++ b/code/examples/channel/src/state.rs @@ -1,10 +1,13 @@ //! Internal state of the application. This is a simplified abstract to keep it simple. //! A regular application would have mempool implemented, a proper database and input methods like RPC. -use std::collections::HashMap; +use std::collections::{BTreeMap, HashMap}; use bytes::Bytes; -use tracing::error; +use rand::rngs::StdRng; +use rand::{Rng, SeedableRng}; +use sha3::Digest; +use tracing::debug; use malachite_app_channel::app::consensus::ProposedValue; use malachite_app_channel::app::host::LocallyProposedValue; @@ -12,40 +15,49 @@ use malachite_app_channel::app::streaming::{StreamContent, StreamMessage}; use malachite_app_channel::app::types::codec::Codec; use malachite_app_channel::app::types::core::{CommitCertificate, Round, Validity}; use malachite_app_channel::app::types::sync::DecidedValue; +use malachite_app_channel::app::types::PeerId; use malachite_test::codec::proto::ProtobufCodec; -use malachite_test::{Address, Content, Height, ProposalPart, TestContext, Value}; +use malachite_test::{ + Address, Height, ProposalData, ProposalFin, ProposalInit, ProposalPart, TestContext, Value, +}; -/// Decodes a Value from its byte representation using ProtobufCodec -pub fn decode_value(bytes: Bytes) -> Value { - ProtobufCodec.decode(bytes).unwrap() -} - -/// Encodes a Value into its byte representation using ProtobufCodec -pub fn encode_value(value: &Value) -> Bytes { - ProtobufCodec.encode(value).unwrap() -} +use crate::streaming::{PartStreamsMap, ProposalParts}; /// Represents the internal state of the application node /// Contains information about current height, round, proposals and blocks pub struct State { + ctx: TestContext, + pub current_height: Height, pub current_round: Round, pub current_proposer: Option
, - earliest_height: Height, address: Address, stream_id: u64, - undecided_proposals: HashMap>, + undecided_proposals: HashMap<(Height, Round), ProposedValue>, decided_proposals: HashMap>, - blocks: HashMap>, - current_proposal: Option>, + decided_values: BTreeMap>, + streams_map: PartStreamsMap, + rng: StdRng, +} + +// Make up a seed for the rng based on our address in +// order for each node to likely propose different values at +// each round. +fn seed_from_address(address: &Address) -> u64 { + address.into_inner().chunks(8).fold(0u64, |acc, chunk| { + let term = chunk.iter().fold(0u64, |acc, &x| { + acc.wrapping_shl(8).wrapping_add(u64::from(x)) + }); + acc.wrapping_add(term) + }) } impl State { /// Creates a new State instance with the given validator address and starting height - pub fn new(address: Address, height: Height) -> Self { + pub fn new(ctx: TestContext, address: Address, height: Height) -> Self { Self { - earliest_height: height, + ctx, current_height: height, current_round: Round::new(0), current_proposer: None, @@ -53,64 +65,66 @@ impl State { stream_id: 0, undecided_proposals: HashMap::new(), decided_proposals: HashMap::new(), - blocks: HashMap::new(), - current_proposal: None, + decided_values: BTreeMap::new(), + streams_map: PartStreamsMap::new(), + rng: StdRng::seed_from_u64(seed_from_address(&address)), } } /// Returns the earliest height available in the state pub fn get_earliest_height(&self) -> Height { - self.earliest_height + self.decided_values + .keys() + .next() + .copied() + .unwrap_or_default() } /// Processes and adds a new proposal to the state if it's valid /// Returns Some(ProposedValue) if the proposal was accepted, None otherwise - pub fn add_proposal( + pub fn received_proposal_part( &mut self, - stream_message: StreamMessage, + from: PeerId, + part: StreamMessage, ) -> Option> { - let StreamContent::Data(proposal_part) = stream_message.content else { - error!("Invalid proposal: {:?}", stream_message.content); - return None; - }; + let sequence = part.sequence; + + // Check if we have a full proposal + let parts = self.streams_map.insert(from, part)?; + + // Check if the proposal is outdated + if parts.height < self.current_height { + debug!( + height = %self.current_height, + round = %self.current_round, + part.height = %parts.height, + part.round = %parts.round, + part.sequence = %sequence, + "Received outdated proposal part, ignoring" + ); - if proposal_part.height > self.current_height - || proposal_part.height == self.current_height - && proposal_part.round >= self.current_round - { - assert!(proposal_part.fin); // we only implemented 1 part === 1 proposal - - let value = proposal_part.content.value; + return None; + } - let proposal = ProposedValue { - height: proposal_part.height, - round: proposal_part.round, - valid_round: Round::Nil, - proposer: proposal_part.proposer, - value, - validity: Validity::Valid, - extension: None, - }; + // Re-assemble the proposal from its parts + let value = assemble_value_from_parts(parts); - self.undecided_proposals - .insert(proposal_part.height, proposal.clone()); + self.undecided_proposals + .insert((value.height, value.round), value.clone()); - Some(proposal) - } else { - None - } + Some(value) } /// Retrieves a decided block at the given height pub fn get_decided_value(&self, height: &Height) -> Option<&DecidedValue> { - self.blocks.get(height) + self.decided_values.get(height) } /// Commits a value with the given certificate, updating internal state /// and moving to the next height pub fn commit(&mut self, certificate: CommitCertificate) { // Sort out proposals - for (height, value) in self.undecided_proposals.clone() { + for ((height, round), value) in self.undecided_proposals.clone() { if height > self.current_height { continue; } @@ -119,13 +133,13 @@ impl State { self.decided_proposals.insert(height, value); } - self.undecided_proposals.remove(&height); + self.undecided_proposals.remove(&(height, round)); } let value = self.decided_proposals.get(&certificate.height).unwrap(); let value_bytes = encode_value(&value.value); - self.blocks.insert( + self.decided_values.insert( self.current_height, DecidedValue::new(value_bytes, certificate), ); @@ -138,65 +152,199 @@ impl State { /// Retrieves a previously built proposal value for the given height pub fn get_previously_built_value( &self, - height: &Height, - ) -> Option<&ProposedValue> { - self.undecided_proposals.get(height) + height: Height, + round: Round, + ) -> Option> { + let proposal = self.undecided_proposals.get(&(height, round))?; + + Some(LocallyProposedValue::new( + proposal.height, + proposal.round, + proposal.value, + proposal.extension.clone(), + )) } /// Creates a new proposal value for the given height /// Returns either a previously built proposal or creates a new one - pub fn propose_value(&mut self, height: &Height) -> ProposedValue { - if let Some(proposal) = self.get_previously_built_value(height) { - proposal.clone() - } else { - assert_eq!(height.as_u64(), self.current_height.as_u64()); + fn create_proposal(&mut self, height: Height, round: Round) -> ProposedValue { + assert_eq!(height, self.current_height); + assert_eq!(round, self.current_round); + + // We create a new value. + let value = self.make_value(); + + let proposal = ProposedValue { + height, + round, + valid_round: Round::Nil, + proposer: self.address, // We are the proposer + value, + validity: Validity::Valid, // Our proposals are de facto valid + extension: None, // Vote extension can be added here + }; - // We create a new value. - let value = Value::new(42); // TODO: get value + // Insert the new proposal into the undecided proposals. + self.undecided_proposals + .insert((height, round), proposal.clone()); - let proposal = ProposedValue { - height: *height, - round: self.current_round, - valid_round: Round::Nil, - proposer: self.address, - value, - validity: Validity::Valid, - extension: None, - }; + proposal + } - // Insert the new proposal into the undecided proposals. - self.undecided_proposals.insert(*height, proposal.clone()); + /// Make up a new value to propose + /// A real application would have a more complex logic here, + /// typically reaping transactions from a mempool and executing them against its state, + /// before computing the merkle root of the new app state. + fn make_value(&mut self) -> Value { + let value = self.rng.gen_range(100..=100000); + Value::new(value) + } - proposal - } + /// Creates a new proposal value for the given height + /// Returns either a previously built proposal or creates a new one + pub fn propose_value( + &mut self, + height: Height, + round: Round, + ) -> LocallyProposedValue { + assert_eq!(height, self.current_height); + assert_eq!(round, self.current_round); + + let proposal = self.create_proposal(height, round); + + LocallyProposedValue::new( + proposal.height, + proposal.round, + proposal.value, + proposal.extension, + ) } /// Creates a stream message containing a proposal part. /// Updates internal sequence number and current proposal. - pub fn create_stream_message( + pub fn stream_proposal( &mut self, value: LocallyProposedValue, - ) -> StreamMessage { - // Only a single proposal part - let sequence = 0; + ) -> impl Iterator> { + let parts = self.value_to_parts(value); - let content = Content::new(value.value); + let stream_id = self.stream_id; + self.stream_id += 1; - let proposal_part = ProposalPart::new( - self.current_height, - self.current_round, + let mut msgs = Vec::with_capacity(parts.len() + 1); + let mut sequence = 0; + + for part in parts { + let msg = StreamMessage::new(stream_id, sequence, StreamContent::Data(part)); + sequence += 1; + msgs.push(msg); + } + + msgs.push(StreamMessage::new( + stream_id, sequence, - self.address, - content, - true, // full proposal is emitted as a single proposal part - ); + StreamContent::Fin(true), + )); - let stream_content = StreamContent::Data(proposal_part); - let msg = StreamMessage::new(self.stream_id, sequence, stream_content); + msgs.into_iter() + } - self.stream_id += 1; - self.current_proposal = Some(msg.clone()); + fn value_to_parts(&self, value: LocallyProposedValue) -> Vec { + let mut hasher = sha3::Keccak256::new(); + let mut parts = Vec::new(); - msg + // Init + // Include metadata about the proposal + { + parts.push(ProposalPart::Init(ProposalInit::new( + value.height, + value.round, + self.address, + ))); + + hasher.update(value.height.as_u64().to_be_bytes().as_slice()); + hasher.update(value.round.as_i64().to_be_bytes().as_slice()); + + if let Some(ext) = &value.extension { + hasher.update(ext.data.as_ref()); + } + } + + // Data + // Include each prime factor of the value as a separate proposal part + { + for factor in factor_value(value.value) { + parts.push(ProposalPart::Data(ProposalData::new(factor))); + + hasher.update(factor.to_be_bytes().as_slice()); + } + } + + // Fin + // Sign the hash of the proposal parts + { + let hash = hasher.finalize().to_vec(); + let signature = self.ctx.signing_provider.sign(&hash); + parts.push(ProposalPart::Fin(ProposalFin::new(signature))); + } + + parts } } + +/// Re-assemble a [`ProposedValue`] from its [`ProposalParts`]. +/// +/// This is done by multiplying all the factors in the parts. +fn assemble_value_from_parts(parts: ProposalParts) -> ProposedValue { + let value = parts + .parts + .iter() + .filter_map(|part| part.as_data()) + .fold(1, |acc, data| acc * data.factor); + + ProposedValue { + height: parts.height, + round: parts.round, + valid_round: Round::Nil, + proposer: parts.proposer, + value: Value::new(value), + validity: Validity::Valid, // TODO: Check signature in Fin part + extension: None, + } +} + +/// Decodes a Value from its byte representation using ProtobufCodec +pub fn decode_value(bytes: Bytes) -> Value { + ProtobufCodec.decode(bytes).unwrap() +} + +/// Encodes a Value into its byte representation using ProtobufCodec +pub fn encode_value(value: &Value) -> Bytes { + ProtobufCodec.encode(value).unwrap() +} + +/// Returns the list of prime factors of the given value +/// +/// In a real application, this would typically split transactions +/// into chunks ino order to reduce bandwidth requirements due +/// to duplication of gossip messages. +fn factor_value(value: Value) -> Vec { + let mut factors = Vec::new(); + let mut n = value.as_u64(); + + let mut i = 2; + while i * i <= n { + if n % i == 0 { + factors.push(i); + n /= i; + } else { + i += 1; + } + } + + if n > 1 { + factors.push(n); + } + + factors +} diff --git a/code/examples/channel/src/streaming.rs b/code/examples/channel/src/streaming.rs new file mode 100644 index 000000000..b3cb65a31 --- /dev/null +++ b/code/examples/channel/src/streaming.rs @@ -0,0 +1,136 @@ +use std::cmp::Ordering; +use std::collections::{BTreeMap, BinaryHeap, HashSet}; + +use malachite_app_channel::app::consensus::PeerId; +use malachite_app_channel::app::streaming::{Sequence, StreamId, StreamMessage}; +use malachite_app_channel::app::types::core::Round; +use malachite_test::{Address, Height, ProposalInit, ProposalPart}; + +struct MinSeq(StreamMessage); + +impl PartialEq for MinSeq { + fn eq(&self, other: &Self) -> bool { + self.0.sequence == other.0.sequence + } +} + +impl Eq for MinSeq {} + +impl Ord for MinSeq { + fn cmp(&self, other: &Self) -> Ordering { + other.0.sequence.cmp(&self.0.sequence) + } +} + +impl PartialOrd for MinSeq { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} + +struct MinHeap(BinaryHeap>); + +impl Default for MinHeap { + fn default() -> Self { + Self(BinaryHeap::new()) + } +} + +impl MinHeap { + fn push(&mut self, msg: StreamMessage) { + self.0.push(MinSeq(msg)); + } + + fn len(&self) -> usize { + self.0.len() + } + + fn drain(&mut self) -> Vec { + self.0 + .drain() + .filter_map(|msg| msg.0.content.into_data()) + .collect() + } +} + +#[derive(Default)] +struct StreamState { + buffer: MinHeap, + init_info: Option, + seen_sequences: HashSet, + total_messages: usize, + fin_received: bool, +} + +impl StreamState { + fn is_done(&self) -> bool { + self.init_info.is_some() && self.fin_received && self.buffer.len() == self.total_messages + } + + fn insert(&mut self, msg: StreamMessage) -> Option { + if msg.is_first() { + self.init_info = msg.content.as_data().and_then(|p| p.as_init()).cloned(); + } + + if msg.is_fin() { + self.fin_received = true; + self.total_messages = msg.sequence as usize + 1; + } + + self.buffer.push(msg); + + if self.is_done() { + let init_info = self.init_info.take()?; + + Some(ProposalParts { + height: init_info.height, + round: init_info.round, + proposer: init_info.proposer, + parts: self.buffer.drain(), + }) + } else { + None + } + } +} + +#[derive(Clone, Debug, PartialEq, Eq)] +pub struct ProposalParts { + pub height: Height, + pub round: Round, + pub proposer: Address, + pub parts: Vec, +} + +#[derive(Default)] +pub struct PartStreamsMap { + streams: BTreeMap<(PeerId, StreamId), StreamState>, +} + +impl PartStreamsMap { + pub fn new() -> Self { + Self::default() + } + + pub fn insert( + &mut self, + peer_id: PeerId, + msg: StreamMessage, + ) -> Option { + let stream_id = msg.stream_id; + let state = self.streams.entry((peer_id, stream_id)).or_default(); + + if !state.seen_sequences.insert(msg.sequence) { + // We have already seen a message with this sequence number. + return None; + } + + let result = state.insert(msg); + + if state.is_done() { + self.streams.remove(&(peer_id, stream_id)); + } + + result + } +}