Skip to content

Commit

Permalink
chore(example): Make up random values to propose, and break them down…
Browse files Browse the repository at this point in the history
… into into parts to stream them (#692)

* chore(code): Break down proposed value into parts and stream them

Since the proposed value is just an integer, we decompose it into its
prime factors and use those as parts.

* Make up random values instead of using hard-coded one
  • Loading branch information
romac authored Dec 17, 2024
1 parent 9151e6e commit 545de8f
Show file tree
Hide file tree
Showing 24 changed files with 617 additions and 239 deletions.
4 changes: 4 additions & 0 deletions code/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 3 additions & 1 deletion code/crates/app-channel/src/connector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,9 @@ where
.send(AppMsg::ReceivedProposalPart { from, part, reply })
.await?;

reply_to.send(rx.await?)?;
if let Some(value) = rx.await? {
reply_to.send(value)?;
}
}

HostMsg::GetValidatorSet { height, reply_to } => {
Expand Down
4 changes: 2 additions & 2 deletions code/crates/app-channel/src/msgs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,8 @@ pub enum AppMsg<Ctx: Context> {
from: PeerId,
/// Received proposal part, together with its stream metadata
part: StreamMessage<Ctx::ProposalPart>,
/// Channel for returning the complete value if proposal is now complete
reply: Reply<ProposedValue<Ctx>>,
/// Channel for returning the complete value if the proposal is now complete
reply: Reply<Option<ProposedValue<Ctx>>>,
},

/// Requests the validator set for a specific height
Expand Down
2 changes: 1 addition & 1 deletion code/crates/app-channel/src/spawn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ pub async fn spawn_host_actor<Ctx>(
where
Ctx: Context,
{
let (tx, rx) = mpsc::channel(1);
let (tx, rx) = mpsc::channel(128);
let actor_ref = Connector::spawn(tx, metrics).await?;
Ok((actor_ref, rx))
}
Expand Down
3 changes: 2 additions & 1 deletion code/crates/app/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,10 @@ malachite-peer.workspace = true
malachite-sync.workspace = true

async-trait = { workspace = true }
derive-where = { workspace = true }
eyre = { workspace = true }
libp2p-identity = { workspace = true }
rand = { workspace = true }
rand = { workspace = true }
serde = { workspace = true }
tracing = { workspace = true }

Expand Down
2 changes: 2 additions & 0 deletions code/crates/app/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
mod node;
pub use node::Node;

pub mod part_store;
pub mod types;

mod spawn;
Expand All @@ -21,6 +22,7 @@ pub mod streaming {
}

pub mod host {
// TODO: Move this under `types`
pub use malachite_engine::host::LocallyProposedValue;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,9 @@ use malachite_core_types::{Context, Round, ValueId};

// This is a temporary store implementation for proposal parts
//
// TODO:
// - [ ] add Address to key
// note: not sure if this is required as consensus should verify that only the parts signed by the proposer for
// the height and round should be forwarded here (see the TODOs in consensus)
// TODO: Add Address to key
// NOTE: Not sure if this is required as consensus should verify that only the parts signed by the proposer for
// the height and round should be forwarded here (see the TODOs in consensus)

type Key<Height> = (Height, Round);

Expand Down
2 changes: 1 addition & 1 deletion code/crates/app/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ pub mod metrics {
pub use libp2p_identity::Keypair;

pub mod streaming {
pub use malachite_engine::util::streaming::StreamMessage;
pub use malachite_engine::util::streaming::{Sequence, StreamId, StreamMessage};
}

pub mod sync {
Expand Down
1 change: 1 addition & 0 deletions code/crates/engine/src/host.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ impl<Ctx: Context> LocallyProposedValue<Ctx> {
pub type HostRef<Ctx> = ActorRef<HostMsg<Ctx>>;

/// Messages that need to be handled by the host actor.
#[derive_where(Debug)]
pub enum HostMsg<Ctx: Context> {
/// Consensus is ready
ConsensusReady(ConsensusRef<Ctx>),
Expand Down
2 changes: 2 additions & 0 deletions code/crates/engine/src/wal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,8 @@ where
_: WalRef<Ctx>,
state: &mut Self::State,
) -> Result<(), ActorProcessingErr> {
info!("Shutting down WAL");

let _ = state.wal_sender.send(self::thread::WalMsg::Shutdown).await;

Ok(())
Expand Down
17 changes: 14 additions & 3 deletions code/crates/signing-ed25519/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ impl SigningScheme for Ed25519 {
}

#[derive(Copy, Clone, Debug, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
#[cfg_attr(feature = "serde", serde(transparent))]
pub struct Signature(ed25519_consensus::Signature);

impl Signature {
Expand Down Expand Up @@ -127,6 +129,11 @@ impl PrivateKey {
PublicKey::new(self.0.verification_key())
}

#[cfg_attr(coverage_nightly, coverage(off))]
pub fn sign(&self, msg: &[u8]) -> Signature {
Signature(self.0.sign(msg))
}

#[cfg_attr(coverage_nightly, coverage(off))]
pub fn inner(&self) -> &ed25519_consensus::SigningKey {
&self.0
Expand Down Expand Up @@ -170,15 +177,19 @@ impl PublicKey {
self.0.as_bytes()
}

pub fn verify(&self, msg: &[u8], signature: &Signature) -> Result<(), signature::Error> {
self.0
.verify(signature.inner(), msg)
.map_err(|_| signature::Error::new())
}

pub fn inner(&self) -> &ed25519_consensus::VerificationKey {
&self.0
}
}

impl Verifier<Signature> for PublicKey {
fn verify(&self, msg: &[u8], signature: &Signature) -> Result<(), signature::Error> {
self.0
.verify(signature.inner(), msg)
.map_err(|_| signature::Error::new())
PublicKey::verify(self, msg, signature)
}
}
3 changes: 2 additions & 1 deletion code/crates/starknet/host/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,11 @@ pub mod codec;
pub mod host;
pub mod mempool;
pub mod node;
pub mod part_store;
pub mod spawn;
pub mod streaming;

pub use malachite_app::part_store;

pub mod proto {
pub use malachite_proto::*;
pub use malachite_starknet_p2p_proto::*;
Expand Down
1 change: 1 addition & 0 deletions code/crates/starknet/p2p-types/src/proposal_part.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ impl ProposalPart {
Self::Fin(_) => PartType::Fin,
}
}

pub fn to_sign_bytes(&self) -> Bytes {
proto::Protobuf::to_bytes(self).unwrap() // FIXME: unwrap
}
Expand Down
21 changes: 15 additions & 6 deletions code/crates/test/proto/consensus.proto
Original file line number Diff line number Diff line change
Expand Up @@ -48,16 +48,25 @@ message Signature {
}

message ProposalPart {
oneof part {
ProposalInit init = 1;
ProposalData data = 2;
ProposalFin fin = 3;
}
}

message ProposalInit {
uint64 height = 1;
uint32 round = 2;
uint64 sequence = 3;
Address validator_address = 4;
Content content = 5;
bool fin = 6;
Address proposer = 4;
}

message ProposalData {
uint64 factor = 1;
}

message Content {
Value value = 1;
message ProposalFin {
Signature signature = 1;
}

message Extension {
Expand Down
6 changes: 3 additions & 3 deletions code/crates/test/src/codec/proto/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ impl Codec<StreamMessage<ProposalPart>> for ProtobufCodec {
sequence: msg.sequence,
content: match &msg.content {
StreamContent::Data(data) => {
Some(proto::stream_message::Content::Data(data.to_bytes()))
Some(proto::stream_message::Content::Data(data.to_bytes()?))
}
StreamContent::Fin(end) => Some(proto::stream_message::Content::Fin(*end)),
},
Expand Down Expand Up @@ -451,13 +451,13 @@ fn decode_vote(msg: proto::SignedMessage) -> Option<SignedVote<TestContext>> {
Some(SignedVote::new(vote, signature))
}

fn encode_signature(signature: &Signature) -> proto::Signature {
pub(crate) fn encode_signature(signature: &Signature) -> proto::Signature {
proto::Signature {
bytes: Bytes::copy_from_slice(signature.to_bytes().as_ref()),
}
}

fn decode_signature(signature: proto::Signature) -> Result<Signature, ProtoError> {
pub(crate) fn decode_signature(signature: proto::Signature) -> Result<Signature, ProtoError> {
let bytes = <[u8; 64]>::try_from(signature.bytes.as_ref())
.map_err(|_| ProtoError::Other("Invalid signature length".to_string()))?;
Ok(Signature::from_bytes(bytes))
Expand Down
6 changes: 3 additions & 3 deletions code/crates/test/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,13 @@ use crate::vote::*;

#[derive(Clone, Debug)]
pub struct TestContext {
ed25519_provider: Arc<Ed25519Provider>,
pub signing_provider: Arc<Ed25519Provider>,
}

impl TestContext {
pub fn new(private_key: PrivateKey) -> Self {
Self {
ed25519_provider: Arc::new(Ed25519Provider::new(private_key)),
signing_provider: Arc::new(Ed25519Provider::new(private_key)),
}
}
}
Expand All @@ -37,7 +37,7 @@ impl Context for TestContext {
type SigningProvider = Ed25519Provider;

fn signing_provider(&self) -> &Self::SigningProvider {
&self.ed25519_provider
&self.signing_provider
}

fn select_proposer<'a>(
Expand Down
Loading

0 comments on commit 545de8f

Please sign in to comment.