Skip to content

Commit

Permalink
chore(code): Inform the host when a block is decided and simulate hos…
Browse files Browse the repository at this point in the history
…t actions (#231)

* Inform the host when a block is decided.

Clean the block part store.

Update metrics here.

* Don't prune anymore from get value and block parts

* Remove mempool txes that have been included inthe decided block

* Remove comment

---------

Co-authored-by: Romain Ruetschi <[email protected]>
  • Loading branch information
ancazamfir and romac authored Jun 12, 2024
1 parent 738f899 commit fa9ef0a
Show file tree
Hide file tree
Showing 5 changed files with 121 additions and 25 deletions.
6 changes: 6 additions & 0 deletions code/crates/actors/src/consensus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -673,6 +673,12 @@ where
value.id()
);

self.host.cast(HostMsg::DecidedOnValue {
height,
round,
value,
})?;

self.metrics.block_end();
self.metrics.finalized_blocks.inc();
self.metrics
Expand Down
19 changes: 19 additions & 0 deletions code/crates/actors/src/host.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,13 @@ pub enum Msg<Ctx: Context> {
height: Ctx::Height,
reply_to: RpcReplyPort<Ctx::ValidatorSet>,
},

// Decided value
DecidedOnValue {
height: Ctx::Height,
round: Round,
value: Ctx::Value,
},
}

pub struct State<Ctx: Context> {
Expand Down Expand Up @@ -205,6 +212,18 @@ impl<Ctx: Context> Actor for Host<Ctx> {
reply_to.send(value)?;
}

Msg::DecidedOnValue {
height,
round,
value,
} => {
info!("what");
let _v = state
.value_builder
.decided_on_value(height, round, value)
.await;
}

Msg::GetValidatorSet {
height: _,
reply_to,
Expand Down
63 changes: 50 additions & 13 deletions code/crates/actors/src/mempool.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use std::collections::VecDeque;
use std::collections::{BTreeMap, VecDeque};
use std::hash::{DefaultHasher, Hash, Hasher};
use std::sync::Arc;

use async_trait::async_trait;
Expand Down Expand Up @@ -49,12 +50,41 @@ pub enum Msg {
num_txes: usize,
reply: RpcReplyPort<Vec<Transaction>>,
},
Update {
tx_hashes: Vec<u64>,
},
}

#[allow(dead_code)]
pub struct State {
msg_queue: VecDeque<Msg>,
transactions: Vec<Transaction>,
pub transactions: BTreeMap<u64, Transaction>,
}

impl State {
pub fn new() -> Self {
Self {
msg_queue: VecDeque::new(),
transactions: BTreeMap::new(),
}
}

pub fn add_tx(&mut self, tx: &Transaction) {
let mut hash = DefaultHasher::new();
tx.0.hash(&mut hash);
let key = hash.finish();
self.transactions.entry(key).or_insert(tx.clone());
}

pub fn remove_tx(&mut self, hash: &u64) {
self.transactions.remove_entry(hash);
}
}

impl Default for State {
fn default() -> Self {
Self::new()
}
}

impl Mempool {
Expand Down Expand Up @@ -150,10 +180,7 @@ impl Actor for Mempool {
self.gossip_mempool
.cast(GossipMempoolMsg::Subscribe(forward))?;

Ok(State {
msg_queue: VecDeque::new(),
transactions: vec![],
})
Ok(State::new())
}

#[tracing::instrument(name = "mempool", skip(self, myself, msg, state))]
Expand All @@ -170,7 +197,7 @@ impl Actor for Mempool {

Msg::Input(tx) => {
if state.transactions.len() < self.mempool_config.max_tx_count {
state.transactions.push(tx);
state.add_tx(&tx);
} else {
trace!("Mempool is full, dropping transaction");
}
Expand All @@ -179,15 +206,20 @@ impl Actor for Mempool {
Msg::TxStream {
reply, num_txes, ..
} => {
let txes = generate_txes(
let txes = generate_and_broadcast_txes(
num_txes,
self.test_config.tx_size.as_u64(),
self.mempool_config.gossip_batch_size,
&self.mempool_config,
state,
&self.gossip_mempool,
)?;

reply.send(txes)?;
}

Msg::Update { tx_hashes } => {
tx_hashes.iter().for_each(|hash| state.remove_tx(hash));
}
}

Ok(())
Expand All @@ -204,10 +236,11 @@ impl Actor for Mempool {
}
}

fn generate_txes(
fn generate_and_broadcast_txes(
count: usize,
size: u64,
batch_size: usize,
config: &MempoolConfig,
state: &mut State,
gossip_mempool: &GossipMempoolRef,
) -> Result<Vec<Transaction>, ActorProcessingErr> {
let mut transactions = vec![];
Expand All @@ -220,10 +253,14 @@ fn generate_txes(
let tx_bytes: Vec<u8> = (0..size).map(|_| rng.sample(range)).collect();
let tx = Transaction::new(tx_bytes);

// TODO: Remove tx-es on decided block
// Add transaction to state
if state.transactions.len() < config.max_tx_count {
state.add_tx(&tx);
}
tx_batch.push(tx.clone());

if batch_size > 0 && tx_batch.len() >= batch_size {
// Gossip tx-es to peers in batches
if config.gossip_batch_size > 0 && tx_batch.len() >= config.gossip_batch_size {
let mempool_batch = MempoolTransactionBatch::new(std::mem::take(&mut tx_batch));
gossip_mempool.cast(GossipMempoolMsg::Broadcast(Channel::Mempool, mempool_batch))?;
}
Expand Down
2 changes: 2 additions & 0 deletions code/crates/actors/src/value_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,4 +28,6 @@ pub trait ValueBuilder<Ctx: Context>: Send + Sync + 'static {
height: Ctx::Height,
round: Round,
) -> Option<ReceivedProposedValue<Ctx>>;

async fn decided_on_value(&mut self, height: Ctx::Height, round: Round, value: Ctx::Value);
}
56 changes: 44 additions & 12 deletions code/crates/test-app/src/value_builder.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use std::hash::{DefaultHasher, Hash, Hasher};
use std::marker::PhantomData;
use std::time::{Duration, Instant};

Expand Down Expand Up @@ -79,9 +80,6 @@ impl ValueBuilder<TestContext> for TestValueBuilder<TestContext> {
let mut sequence = 1;
let mut block_size = 0;

// Prune the PartStore of all parts for heights lower than `height - 1`
self.part_store.prune(height.decrement().unwrap_or(height));

loop {
trace!(
"Build local value for h:{}, r:{}, s:{}",
Expand Down Expand Up @@ -205,9 +203,6 @@ impl ValueBuilder<TestContext> for TestValueBuilder<TestContext> {
let round = block_part.round;
let sequence = block_part.sequence;

// Prune all block parts for heights lower than `height - 1`
self.part_store.prune(height.decrement().unwrap_or(height));

self.part_store.store(block_part.clone());
let all_parts = self.part_store.all_parts(height, round);

Expand Down Expand Up @@ -245,12 +240,6 @@ impl ValueBuilder<TestContext> for TestValueBuilder<TestContext> {
"Value Builder received last block part",
);

// FIXME: At this point we don't know if this block (and its txes) will be decided on.
// So these need to be moved after the block is decided.
self.metrics.block_tx_count.observe(tx_count as f64);
self.metrics.block_size_bytes.observe(block_size as f64);
self.metrics.finalized_txes.inc_by(tx_count as u64);

Some(ReceivedProposedValue {
validator_address: last_part.validator_address,
height: last_part.height,
Expand Down Expand Up @@ -283,4 +272,47 @@ impl ValueBuilder<TestContext> for TestValueBuilder<TestContext> {
valid: Validity::Valid,
})
}

#[tracing::instrument(
name = "value_builder.decided",
skip_all,
fields(
height = %height,
round = %round,
)
)]
async fn decided_on_value(&mut self, height: Height, round: Round, value: Value) {
info!("Build and store block with hash {value:?}");

let all_parts = self.part_store.all_parts(height, round);

// TODO - build the block from block parts and store it

// Update metrics
let block_size: usize = all_parts.iter().map(|p| p.size_bytes()).sum();
let tx_count: usize = all_parts
.iter()
.map(|p| p.content.tx_count().unwrap_or(0))
.sum();

self.metrics.block_tx_count.observe(tx_count as f64);
self.metrics.block_size_bytes.observe(block_size as f64);
self.metrics.finalized_txes.inc_by(tx_count as u64);

// Send Update to mempool to remove all the tx-es included in the block.
let mut tx_hashes = vec![];
for part in all_parts {
if let Content::TxBatch(transaction_batch) = part.content.as_ref() {
tx_hashes.extend(transaction_batch.transactions().iter().map(|tx| {
let mut hash = DefaultHasher::new();
tx.0.hash(&mut hash);
hash.finish()
}));
}
}
let _ = self.tx_streamer.cast(MempoolMsg::Update { tx_hashes });

// Prune the PartStore of all parts for heights lower than `height - 1`
self.part_store.prune(height.decrement().unwrap_or(height));
}
}

0 comments on commit fa9ef0a

Please sign in to comment.