From 7fcbbf319464653ed71687a4a02762b9ab623b0f Mon Sep 17 00:00:00 2001 From: Anca Zamfir Date: Tue, 18 Jun 2024 09:57:25 +0200 Subject: [PATCH] chore(code): Compute hash from block parts, send Invalid if it doesn't match the metadata (#242) * Add validity check when building the value from parts, send validity to consensus. * Return all parts in ascending sequence order * Trim last tx batch block part when max block size is reached. * Cargo fmt * Clear the mempool on update for now. --- code/crates/actors/src/mempool.rs | 7 +- code/crates/test-app/src/part_store.rs | 3 +- code/crates/test-app/src/value_builder.rs | 95 ++++++++++++++++------- 3 files changed, 75 insertions(+), 30 deletions(-) diff --git a/code/crates/actors/src/mempool.rs b/code/crates/actors/src/mempool.rs index 0c6edba1d..7d4972d5e 100644 --- a/code/crates/actors/src/mempool.rs +++ b/code/crates/actors/src/mempool.rs @@ -217,8 +217,11 @@ impl Actor for Mempool { reply.send(txes)?; } - Msg::Update { tx_hashes } => { - tx_hashes.iter().for_each(|hash| state.remove_tx(hash)); + Msg::Update { .. } => { + //tx_hashes.iter().for_each(|hash| state.remove_tx(hash)); + // TODO - reset the mempool for now + state.transactions = BTreeMap::new(); + info!("Mempool after Update has size {}", state.transactions.len()); } } diff --git a/code/crates/test-app/src/part_store.rs b/code/crates/test-app/src/part_store.rs index 579b0960e..c4fc03c5d 100644 --- a/code/crates/test-app/src/part_store.rs +++ b/code/crates/test-app/src/part_store.rs @@ -33,6 +33,7 @@ impl PartStore { self.map.get(&(height, round, sequence)) } + // Get all parts for a given height and round, sorted in sequence ascending order. pub fn all_parts(&self, height: Ctx::Height, round: Round) -> Vec<&Ctx::BlockPart> { use malachite_common::BlockPart; @@ -43,7 +44,7 @@ impl PartStore { .map(|(_, b)| b) .collect(); - block_parts.sort_by_key(|b| std::cmp::Reverse(b.sequence())); + block_parts.sort_by_key(|b| b.sequence()); block_parts } diff --git a/code/crates/test-app/src/value_builder.rs b/code/crates/test-app/src/value_builder.rs index f2e7a2b10..8e9fdb654 100644 --- a/code/crates/test-app/src/value_builder.rs +++ b/code/crates/test-app/src/value_builder.rs @@ -4,14 +4,14 @@ use std::time::{Duration, Instant}; use async_trait::async_trait; use bytesize::ByteSize; -use tracing::{error, info, trace}; +use tracing::{debug, error, info, trace}; use malachite_actors::consensus::Metrics; use malachite_actors::consensus::{ConsensusRef, Msg as ConsensusMsg}; use malachite_actors::host::{LocallyProposedValue, ReceivedProposedValue}; use malachite_actors::mempool::{MempoolRef, Msg as MempoolMsg}; use malachite_actors::value_builder::ValueBuilder; -use malachite_common::{Context, Round, SignedVote, TransactionBatch}; +use malachite_common::{Context, Round, SignedVote, Transaction, TransactionBatch}; use malachite_driver::Validity; use malachite_test::{Address, BlockMetadata, BlockPart, Content, Height, TestContext, Value}; @@ -79,6 +79,7 @@ impl ValueBuilder for TestValueBuilder { let mut tx_batch = vec![]; let mut sequence = 1; let mut block_size = 0; + let mut max_block_size_reached = false; loop { trace!( @@ -88,7 +89,7 @@ impl ValueBuilder for TestValueBuilder { sequence ); - let txes = self + let mut txes = self .tx_streamer .call( |reply| MempoolMsg::TxStream { @@ -106,6 +107,22 @@ impl ValueBuilder for TestValueBuilder { return None; } + let mut tx_count = 0; + + 'inner: for tx in &txes { + if block_size + tx.size_bytes() > self.params.max_block_size.as_u64() as usize { + max_block_size_reached = true; + break 'inner; + } + + block_size += tx.size_bytes(); + tx_batch.push(tx.clone()); + tx_count += 1; + } + + // Trim the tx batch so it does not overflow the block. + txes = txes.into_iter().take(tx_count).collect(); + // Create, store and gossip the batch in a BlockPart let block_part = BlockPart::new( height, @@ -121,20 +138,8 @@ impl ValueBuilder for TestValueBuilder { .cast(ConsensusMsg::BuilderBlockPart(block_part)) .unwrap(); - let mut tx_count = 0; - - 'inner: for tx in txes { - if block_size + tx.size_bytes() > self.params.max_block_size.as_u64() as usize { - break 'inner; - } - - block_size += tx.size_bytes(); - tx_batch.push(tx); - tx_count += 1; - } - // Simulate execution of reaped txes - let exec_time = self.params.exec_time_per_tx * tx_count; + let exec_time = self.params.exec_time_per_tx * tx_count as u32; trace!("Simulating tx execution for {tx_count} tx-es, sleeping for {exec_time:?}"); tokio::time::sleep(exec_time).await; @@ -149,9 +154,15 @@ impl ValueBuilder for TestValueBuilder { sequence += 1; - if Instant::now() > deadline - || block_size >= self.params.max_block_size.as_u64() as usize - { + if Instant::now() > deadline || max_block_size_reached { + if max_block_size_reached { + debug!( + "Value Builder stopped streaming Tx-es due to max block size being reached" + ); + } else { + debug!("Value Builder stopped streaming Tx-es due to deadline being reached"); + } + // Create, store and gossip the BlockMetadata in a BlockPart let value = Value::new_from_transactions(&tx_batch); @@ -176,11 +187,12 @@ impl ValueBuilder for TestValueBuilder { .unwrap(); info!( - "Value Builder created a block with {} tx-es of size {} in {:?} with hash {:?} ", + "Value Builder created a block with {} tx-es of size {} in {:?} with hash {:?}, disseminated in {} block parts ", tx_batch.len(), ByteSize::b(block_size as u64), Instant::now() - start, - value.id() + value.id(), + sequence, ); return result; @@ -220,19 +232,38 @@ impl ValueBuilder for TestValueBuilder { let highest_sequence = all_parts.len() as u64; if let Some(last_part) = self.part_store.get(height, round, highest_sequence) { - // If the "last" part includes a metadata then this is truly the last part. - // So in this case all block parts have been received, including the metadata that includes - // the block hash/ value. This can be returned as the block is complete. - // TODO - the logic here is weak, we assume earlier parts don't include metadata - // Should change once we implement `oneof`/ proper enum in protobuf but good enough for now test code + // Check if the part with the highest sequence number had metadata content. + // TODO - do more validations, e.g. there is no higher tx block part. match last_part.metadata() { + // All block parts should have been received, including the metadata that has + // the block hash/ value. Some(meta) => { + // Compute the block size. let block_size: usize = all_parts.iter().map(|p| p.size_bytes()).sum(); + + // Compute the number of transactions in the block. let tx_count: usize = all_parts .iter() .map(|p| p.content.tx_count().unwrap_or(0)) .sum(); + let received_value = meta.value(); + + // Compute the expected block hash/ value from the block parts. + let all_txes: Vec = all_parts + .iter() + .flat_map(|p| match p.content.as_ref() { + Content::TxBatch(tx_batch) => { + info!("get txes from received part {}", p.sequence); + tx_batch.transactions().to_vec() + } + Content::Metadata(_) => { + vec![] + } + }) + .collect(); + let expected_value = Value::new_from_transactions(&all_txes); + info!( height = %last_part.height, round = %last_part.round, @@ -242,12 +273,22 @@ impl ValueBuilder for TestValueBuilder { "Value Builder received last block part", ); + let valid = if received_value != expected_value { + error!( + "Invalid block received with value {:?}, expected {:?}", + received_value, expected_value + ); + Validity::Invalid + } else { + Validity::Valid + }; + Some(ReceivedProposedValue { validator_address: last_part.validator_address, height: last_part.height, round: last_part.round, value: meta.value(), - valid: Validity::Valid, + valid, }) } None => None,