From fa9ef0a42ffcf71312528c1433c19563835573b2 Mon Sep 17 00:00:00 2001 From: Anca Zamfir Date: Wed, 12 Jun 2024 18:41:47 +0200 Subject: [PATCH] chore(code): Inform the host when a block is decided and simulate host actions (#231) * Inform the host when a block is decided. Clean the block part store. Update metrics here. * Don't prune anymore from get value and block parts * Remove mempool txes that have been included inthe decided block * Remove comment --------- Co-authored-by: Romain Ruetschi --- code/crates/actors/src/consensus.rs | 6 +++ code/crates/actors/src/host.rs | 19 +++++++ code/crates/actors/src/mempool.rs | 63 ++++++++++++++++++----- code/crates/actors/src/value_builder.rs | 2 + code/crates/test-app/src/value_builder.rs | 56 +++++++++++++++----- 5 files changed, 121 insertions(+), 25 deletions(-) diff --git a/code/crates/actors/src/consensus.rs b/code/crates/actors/src/consensus.rs index dbd29b146..7de2f5a69 100644 --- a/code/crates/actors/src/consensus.rs +++ b/code/crates/actors/src/consensus.rs @@ -673,6 +673,12 @@ where value.id() ); + self.host.cast(HostMsg::DecidedOnValue { + height, + round, + value, + })?; + self.metrics.block_end(); self.metrics.finalized_blocks.inc(); self.metrics diff --git a/code/crates/actors/src/host.rs b/code/crates/actors/src/host.rs index d4d358f9b..9fcc81218 100644 --- a/code/crates/actors/src/host.rs +++ b/code/crates/actors/src/host.rs @@ -58,6 +58,13 @@ pub enum Msg { height: Ctx::Height, reply_to: RpcReplyPort, }, + + // Decided value + DecidedOnValue { + height: Ctx::Height, + round: Round, + value: Ctx::Value, + }, } pub struct State { @@ -205,6 +212,18 @@ impl Actor for Host { reply_to.send(value)?; } + Msg::DecidedOnValue { + height, + round, + value, + } => { + info!("what"); + let _v = state + .value_builder + .decided_on_value(height, round, value) + .await; + } + Msg::GetValidatorSet { height: _, reply_to, diff --git a/code/crates/actors/src/mempool.rs b/code/crates/actors/src/mempool.rs index b18ce26a0..8fa3127fb 100644 --- a/code/crates/actors/src/mempool.rs +++ b/code/crates/actors/src/mempool.rs @@ -1,4 +1,5 @@ -use std::collections::VecDeque; +use std::collections::{BTreeMap, VecDeque}; +use std::hash::{DefaultHasher, Hash, Hasher}; use std::sync::Arc; use async_trait::async_trait; @@ -49,12 +50,41 @@ pub enum Msg { num_txes: usize, reply: RpcReplyPort>, }, + Update { + tx_hashes: Vec, + }, } #[allow(dead_code)] pub struct State { msg_queue: VecDeque, - transactions: Vec, + pub transactions: BTreeMap, +} + +impl State { + pub fn new() -> Self { + Self { + msg_queue: VecDeque::new(), + transactions: BTreeMap::new(), + } + } + + pub fn add_tx(&mut self, tx: &Transaction) { + let mut hash = DefaultHasher::new(); + tx.0.hash(&mut hash); + let key = hash.finish(); + self.transactions.entry(key).or_insert(tx.clone()); + } + + pub fn remove_tx(&mut self, hash: &u64) { + self.transactions.remove_entry(hash); + } +} + +impl Default for State { + fn default() -> Self { + Self::new() + } } impl Mempool { @@ -150,10 +180,7 @@ impl Actor for Mempool { self.gossip_mempool .cast(GossipMempoolMsg::Subscribe(forward))?; - Ok(State { - msg_queue: VecDeque::new(), - transactions: vec![], - }) + Ok(State::new()) } #[tracing::instrument(name = "mempool", skip(self, myself, msg, state))] @@ -170,7 +197,7 @@ impl Actor for Mempool { Msg::Input(tx) => { if state.transactions.len() < self.mempool_config.max_tx_count { - state.transactions.push(tx); + state.add_tx(&tx); } else { trace!("Mempool is full, dropping transaction"); } @@ -179,15 +206,20 @@ impl Actor for Mempool { Msg::TxStream { reply, num_txes, .. } => { - let txes = generate_txes( + let txes = generate_and_broadcast_txes( num_txes, self.test_config.tx_size.as_u64(), - self.mempool_config.gossip_batch_size, + &self.mempool_config, + state, &self.gossip_mempool, )?; reply.send(txes)?; } + + Msg::Update { tx_hashes } => { + tx_hashes.iter().for_each(|hash| state.remove_tx(hash)); + } } Ok(()) @@ -204,10 +236,11 @@ impl Actor for Mempool { } } -fn generate_txes( +fn generate_and_broadcast_txes( count: usize, size: u64, - batch_size: usize, + config: &MempoolConfig, + state: &mut State, gossip_mempool: &GossipMempoolRef, ) -> Result, ActorProcessingErr> { let mut transactions = vec![]; @@ -220,10 +253,14 @@ fn generate_txes( let tx_bytes: Vec = (0..size).map(|_| rng.sample(range)).collect(); let tx = Transaction::new(tx_bytes); - // TODO: Remove tx-es on decided block + // Add transaction to state + if state.transactions.len() < config.max_tx_count { + state.add_tx(&tx); + } tx_batch.push(tx.clone()); - if batch_size > 0 && tx_batch.len() >= batch_size { + // Gossip tx-es to peers in batches + if config.gossip_batch_size > 0 && tx_batch.len() >= config.gossip_batch_size { let mempool_batch = MempoolTransactionBatch::new(std::mem::take(&mut tx_batch)); gossip_mempool.cast(GossipMempoolMsg::Broadcast(Channel::Mempool, mempool_batch))?; } diff --git a/code/crates/actors/src/value_builder.rs b/code/crates/actors/src/value_builder.rs index 14bf9c6d5..0a5328039 100644 --- a/code/crates/actors/src/value_builder.rs +++ b/code/crates/actors/src/value_builder.rs @@ -28,4 +28,6 @@ pub trait ValueBuilder: Send + Sync + 'static { height: Ctx::Height, round: Round, ) -> Option>; + + async fn decided_on_value(&mut self, height: Ctx::Height, round: Round, value: Ctx::Value); } diff --git a/code/crates/test-app/src/value_builder.rs b/code/crates/test-app/src/value_builder.rs index 2f3e6e2f2..ac213a3cb 100644 --- a/code/crates/test-app/src/value_builder.rs +++ b/code/crates/test-app/src/value_builder.rs @@ -1,3 +1,4 @@ +use std::hash::{DefaultHasher, Hash, Hasher}; use std::marker::PhantomData; use std::time::{Duration, Instant}; @@ -79,9 +80,6 @@ impl ValueBuilder for TestValueBuilder { let mut sequence = 1; let mut block_size = 0; - // Prune the PartStore of all parts for heights lower than `height - 1` - self.part_store.prune(height.decrement().unwrap_or(height)); - loop { trace!( "Build local value for h:{}, r:{}, s:{}", @@ -205,9 +203,6 @@ impl ValueBuilder for TestValueBuilder { let round = block_part.round; let sequence = block_part.sequence; - // Prune all block parts for heights lower than `height - 1` - self.part_store.prune(height.decrement().unwrap_or(height)); - self.part_store.store(block_part.clone()); let all_parts = self.part_store.all_parts(height, round); @@ -245,12 +240,6 @@ impl ValueBuilder for TestValueBuilder { "Value Builder received last block part", ); - // FIXME: At this point we don't know if this block (and its txes) will be decided on. - // So these need to be moved after the block is decided. - self.metrics.block_tx_count.observe(tx_count as f64); - self.metrics.block_size_bytes.observe(block_size as f64); - self.metrics.finalized_txes.inc_by(tx_count as u64); - Some(ReceivedProposedValue { validator_address: last_part.validator_address, height: last_part.height, @@ -283,4 +272,47 @@ impl ValueBuilder for TestValueBuilder { valid: Validity::Valid, }) } + + #[tracing::instrument( + name = "value_builder.decided", + skip_all, + fields( + height = %height, + round = %round, + ) + )] + async fn decided_on_value(&mut self, height: Height, round: Round, value: Value) { + info!("Build and store block with hash {value:?}"); + + let all_parts = self.part_store.all_parts(height, round); + + // TODO - build the block from block parts and store it + + // Update metrics + let block_size: usize = all_parts.iter().map(|p| p.size_bytes()).sum(); + let tx_count: usize = all_parts + .iter() + .map(|p| p.content.tx_count().unwrap_or(0)) + .sum(); + + self.metrics.block_tx_count.observe(tx_count as f64); + self.metrics.block_size_bytes.observe(block_size as f64); + self.metrics.finalized_txes.inc_by(tx_count as u64); + + // Send Update to mempool to remove all the tx-es included in the block. + let mut tx_hashes = vec![]; + for part in all_parts { + if let Content::TxBatch(transaction_batch) = part.content.as_ref() { + tx_hashes.extend(transaction_batch.transactions().iter().map(|tx| { + let mut hash = DefaultHasher::new(); + tx.0.hash(&mut hash); + hash.finish() + })); + } + } + let _ = self.tx_streamer.cast(MempoolMsg::Update { tx_hashes }); + + // Prune the PartStore of all parts for heights lower than `height - 1` + self.part_store.prune(height.decrement().unwrap_or(height)); + } }