Skip to content

Commit

Permalink
chore(code): Compute hash from block parts, send Invalid if it doesn'…
Browse files Browse the repository at this point in the history
…t match the metadata (#242)

* Add validity check when building the value from parts, send validity to
consensus.

* Return all parts in ascending sequence order

* Trim last tx batch block part when max block size is reached.

* Cargo fmt

* Clear the mempool on update for now.
  • Loading branch information
ancazamfir authored Jun 18, 2024
1 parent e7f5db7 commit 7fcbbf3
Show file tree
Hide file tree
Showing 3 changed files with 75 additions and 30 deletions.
7 changes: 5 additions & 2 deletions code/crates/actors/src/mempool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -217,8 +217,11 @@ impl Actor for Mempool {
reply.send(txes)?;
}

Msg::Update { tx_hashes } => {
tx_hashes.iter().for_each(|hash| state.remove_tx(hash));
Msg::Update { .. } => {
//tx_hashes.iter().for_each(|hash| state.remove_tx(hash));
// TODO - reset the mempool for now
state.transactions = BTreeMap::new();
info!("Mempool after Update has size {}", state.transactions.len());
}
}

Expand Down
3 changes: 2 additions & 1 deletion code/crates/test-app/src/part_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ impl<Ctx: Context> PartStore<Ctx> {
self.map.get(&(height, round, sequence))
}

// Get all parts for a given height and round, sorted in sequence ascending order.
pub fn all_parts(&self, height: Ctx::Height, round: Round) -> Vec<&Ctx::BlockPart> {
use malachite_common::BlockPart;

Expand All @@ -43,7 +44,7 @@ impl<Ctx: Context> PartStore<Ctx> {
.map(|(_, b)| b)
.collect();

block_parts.sort_by_key(|b| std::cmp::Reverse(b.sequence()));
block_parts.sort_by_key(|b| b.sequence());
block_parts
}

Expand Down
95 changes: 68 additions & 27 deletions code/crates/test-app/src/value_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,14 @@ use std::time::{Duration, Instant};

use async_trait::async_trait;
use bytesize::ByteSize;
use tracing::{error, info, trace};
use tracing::{debug, error, info, trace};

use malachite_actors::consensus::Metrics;
use malachite_actors::consensus::{ConsensusRef, Msg as ConsensusMsg};
use malachite_actors::host::{LocallyProposedValue, ReceivedProposedValue};
use malachite_actors::mempool::{MempoolRef, Msg as MempoolMsg};
use malachite_actors::value_builder::ValueBuilder;
use malachite_common::{Context, Round, SignedVote, TransactionBatch};
use malachite_common::{Context, Round, SignedVote, Transaction, TransactionBatch};
use malachite_driver::Validity;
use malachite_test::{Address, BlockMetadata, BlockPart, Content, Height, TestContext, Value};

Expand Down Expand Up @@ -79,6 +79,7 @@ impl ValueBuilder<TestContext> for TestValueBuilder<TestContext> {
let mut tx_batch = vec![];
let mut sequence = 1;
let mut block_size = 0;
let mut max_block_size_reached = false;

loop {
trace!(
Expand All @@ -88,7 +89,7 @@ impl ValueBuilder<TestContext> for TestValueBuilder<TestContext> {
sequence
);

let txes = self
let mut txes = self
.tx_streamer
.call(
|reply| MempoolMsg::TxStream {
Expand All @@ -106,6 +107,22 @@ impl ValueBuilder<TestContext> for TestValueBuilder<TestContext> {
return None;
}

let mut tx_count = 0;

'inner: for tx in &txes {
if block_size + tx.size_bytes() > self.params.max_block_size.as_u64() as usize {
max_block_size_reached = true;
break 'inner;
}

block_size += tx.size_bytes();
tx_batch.push(tx.clone());
tx_count += 1;
}

// Trim the tx batch so it does not overflow the block.
txes = txes.into_iter().take(tx_count).collect();

// Create, store and gossip the batch in a BlockPart
let block_part = BlockPart::new(
height,
Expand All @@ -121,20 +138,8 @@ impl ValueBuilder<TestContext> for TestValueBuilder<TestContext> {
.cast(ConsensusMsg::BuilderBlockPart(block_part))
.unwrap();

let mut tx_count = 0;

'inner: for tx in txes {
if block_size + tx.size_bytes() > self.params.max_block_size.as_u64() as usize {
break 'inner;
}

block_size += tx.size_bytes();
tx_batch.push(tx);
tx_count += 1;
}

// Simulate execution of reaped txes
let exec_time = self.params.exec_time_per_tx * tx_count;
let exec_time = self.params.exec_time_per_tx * tx_count as u32;
trace!("Simulating tx execution for {tx_count} tx-es, sleeping for {exec_time:?}");
tokio::time::sleep(exec_time).await;

Expand All @@ -149,9 +154,15 @@ impl ValueBuilder<TestContext> for TestValueBuilder<TestContext> {

sequence += 1;

if Instant::now() > deadline
|| block_size >= self.params.max_block_size.as_u64() as usize
{
if Instant::now() > deadline || max_block_size_reached {
if max_block_size_reached {
debug!(
"Value Builder stopped streaming Tx-es due to max block size being reached"
);
} else {
debug!("Value Builder stopped streaming Tx-es due to deadline being reached");
}

// Create, store and gossip the BlockMetadata in a BlockPart
let value = Value::new_from_transactions(&tx_batch);

Expand All @@ -176,11 +187,12 @@ impl ValueBuilder<TestContext> for TestValueBuilder<TestContext> {
.unwrap();

info!(
"Value Builder created a block with {} tx-es of size {} in {:?} with hash {:?} ",
"Value Builder created a block with {} tx-es of size {} in {:?} with hash {:?}, disseminated in {} block parts ",
tx_batch.len(),
ByteSize::b(block_size as u64),
Instant::now() - start,
value.id()
value.id(),
sequence,
);

return result;
Expand Down Expand Up @@ -220,19 +232,38 @@ impl ValueBuilder<TestContext> for TestValueBuilder<TestContext> {
let highest_sequence = all_parts.len() as u64;

if let Some(last_part) = self.part_store.get(height, round, highest_sequence) {
// If the "last" part includes a metadata then this is truly the last part.
// So in this case all block parts have been received, including the metadata that includes
// the block hash/ value. This can be returned as the block is complete.
// TODO - the logic here is weak, we assume earlier parts don't include metadata
// Should change once we implement `oneof`/ proper enum in protobuf but good enough for now test code
// Check if the part with the highest sequence number had metadata content.
// TODO - do more validations, e.g. there is no higher tx block part.
match last_part.metadata() {
// All block parts should have been received, including the metadata that has
// the block hash/ value.
Some(meta) => {
// Compute the block size.
let block_size: usize = all_parts.iter().map(|p| p.size_bytes()).sum();

// Compute the number of transactions in the block.
let tx_count: usize = all_parts
.iter()
.map(|p| p.content.tx_count().unwrap_or(0))
.sum();

let received_value = meta.value();

// Compute the expected block hash/ value from the block parts.
let all_txes: Vec<Transaction> = all_parts
.iter()
.flat_map(|p| match p.content.as_ref() {
Content::TxBatch(tx_batch) => {
info!("get txes from received part {}", p.sequence);
tx_batch.transactions().to_vec()
}
Content::Metadata(_) => {
vec![]
}
})
.collect();
let expected_value = Value::new_from_transactions(&all_txes);

info!(
height = %last_part.height,
round = %last_part.round,
Expand All @@ -242,12 +273,22 @@ impl ValueBuilder<TestContext> for TestValueBuilder<TestContext> {
"Value Builder received last block part",
);

let valid = if received_value != expected_value {
error!(
"Invalid block received with value {:?}, expected {:?}",
received_value, expected_value
);
Validity::Invalid
} else {
Validity::Valid
};

Some(ReceivedProposedValue {
validator_address: last_part.validator_address,
height: last_part.height,
round: last_part.round,
value: meta.value(),
valid: Validity::Valid,
valid,
})
}
None => None,
Expand Down

0 comments on commit 7fcbbf3

Please sign in to comment.