From ca80b37171bfa1e8c0597619d8d51f6f304f1913 Mon Sep 17 00:00:00 2001 From: Erwan Or Date: Sun, 16 Jun 2024 22:52:49 -0400 Subject: [PATCH 1/9] mempool: check txs in isolation --- crates/core/app/src/app/mod.rs | 31 +++++++++++++++++++-------- crates/core/app/src/server/mempool.rs | 12 ++++++----- 2 files changed, 29 insertions(+), 14 deletions(-) diff --git a/crates/core/app/src/app/mod.rs b/crates/core/app/src/app/mod.rs index ec1ccff920..328139ace8 100644 --- a/crates/core/app/src/app/mod.rs +++ b/crates/core/app/src/app/mod.rs @@ -183,6 +183,27 @@ impl App { let mut proposal_size_bytes = 0u64; let max_proposal_size_bytes = proposal.max_tx_bytes as u64; + + // First, we filter the proposal to fit within the block limit. + for tx in proposal.txs { + let transaction_size = tx.len() as u64; + + // We compute the total proposal size if we were to include this transaction. + let total_with_tx = proposal_size_bytes.saturating_add(transaction_size); + + if total_with_tx >= max_proposal_size_bytes { + break; + } + + match self.deliver_tx_bytes(&tx).await { + Ok(_) => { + proposal_size_bytes = total_with_tx; + included_txs.push(tx) + } + Err(_) => continue, + } + } + // The CometBFT spec requires that application "MUST" check that the list // of transactions in the proposal does not exceed `max_tx_bytes`. And shed // excess transactions so as to be "as close as possible" to the target @@ -200,15 +221,7 @@ impl App { // https://github.com/cometbft/cometbft/blob/v0.37.5/spec/abci/abci%2B%2B_comet_expected_behavior.md#adapting-existing-applications-that-use-abci // - Application requirements: // https://github.com/cometbft/cometbft/blob/v0.37.5/spec/abci/abci%2B%2B_app_requirements - for tx in proposal.txs { - let tx_len_bytes = tx.len() as u64; - proposal_size_bytes = proposal_size_bytes.saturating_add(tx_len_bytes); - if proposal_size_bytes <= max_proposal_size_bytes { - included_txs.push(tx); - } else { - break; - } - } + tracing::debug!( "finished processing PrepareProposal, including {}/{} candidate transactions", included_txs.len(), diff --git a/crates/core/app/src/server/mempool.rs b/crates/core/app/src/server/mempool.rs index daccf449bb..7ad6a67b98 100644 --- a/crates/core/app/src/server/mempool.rs +++ b/crates/core/app/src/server/mempool.rs @@ -26,7 +26,7 @@ use crate::{app::App, metrics}; /// blocks we want. pub struct Mempool { queue: mpsc::Receiver>, - app: App, + snapshot: Snapshot, rx_snapshot: watch::Receiver, } @@ -35,12 +35,12 @@ impl Mempool { storage: Storage, queue: mpsc::Receiver>, ) -> Self { - let app = App::new(storage.latest_snapshot()); + let snapshot = storage.latest_snapshot(); let snapshot_rx = storage.subscribe(); Self { queue, - app, + snapshot, rx_snapshot: snapshot_rx, } } @@ -56,7 +56,9 @@ impl Mempool { CheckTxKind::Recheck => "recheck", }; - match self.app.deliver_tx_bytes(tx_bytes.as_ref()).await { + let mut app = App::new(self.snapshot.clone()); + + match app.deliver_tx_bytes(tx_bytes.as_ref()).await { Ok(events) => { let elapsed = start.elapsed(); tracing::info!(?elapsed, "tx accepted"); @@ -91,7 +93,7 @@ impl Mempool { if let Ok(()) = change { let snapshot = self.rx_snapshot.borrow().clone(); tracing::debug!(height = ?snapshot.version(), "resetting ephemeral mempool state"); - self.app = App::new(snapshot); + self.snapshot= snapshot; } else { // TODO: what triggers this, now that the channel is owned by the // shared Storage instance, rather than the consensus worker? From f740113171dc3bacf24aefc46c3742b0b96c3568 Mon Sep 17 00:00:00 2001 From: Erwan Or Date: Sun, 16 Jun 2024 22:52:49 -0400 Subject: [PATCH 2/9] app: only build valid blocks --- crates/core/app/src/app/mod.rs | 4 +-- crates/core/app/src/server/consensus.rs | 34 ++++++++++++++----------- 2 files changed, 21 insertions(+), 17 deletions(-) diff --git a/crates/core/app/src/app/mod.rs b/crates/core/app/src/app/mod.rs index 328139ace8..4acc9969db 100644 --- a/crates/core/app/src/app/mod.rs +++ b/crates/core/app/src/app/mod.rs @@ -576,10 +576,10 @@ impl App { jmt_root } - pub fn tendermint_validator_updates(&self) -> Vec { + pub fn cometbft_validator_updates(&self) -> Vec { self.state .cometbft_validator_updates() - // If the tendermint validator updates are not set, we return an empty + // If the cometbft validator updates are not set, we return an empty // update set, signaling no change to Tendermint. .unwrap_or_default() } diff --git a/crates/core/app/src/server/consensus.rs b/crates/core/app/src/server/consensus.rs index d687baf507..4c0a219e19 100644 --- a/crates/core/app/src/server/consensus.rs +++ b/crates/core/app/src/server/consensus.rs @@ -70,6 +70,18 @@ impl Consensus { .await .expect("init_chain must succeed"), ), + Request::PrepareProposal(proposal) => Response::PrepareProposal( + self.prepare_proposal(proposal) + .instrument(span) + .await + .expect("prepare proposal must succeed"), + ), + Request::ProcessProposal(proposal) => Response::ProcessProposal( + self.process_proposal(proposal) + .instrument(span) + .await + .expect("process proposal must succeed"), + ), Request::BeginBlock(begin_block) => Response::BeginBlock( self.begin_block(begin_block) .instrument(span) @@ -88,18 +100,6 @@ impl Consensus { .await .expect("commit must succeed"), ), - Request::PrepareProposal(proposal) => Response::PrepareProposal( - self.prepare_proposal(proposal) - .instrument(span) - .await - .expect("prepare proposal must succeed"), - ), - Request::ProcessProposal(proposal) => Response::ProcessProposal( - self.process_proposal(proposal) - .instrument(span) - .await - .expect("process proposal must succeed"), - ), })); } Ok(()) @@ -123,7 +123,7 @@ impl Consensus { // to be provided inside the initial app genesis state (`GenesisAppState`). Returning those // validators in InitChain::Response tells Tendermint that they are the initial validator // set. See https://docs.tendermint.com/master/spec/abci/abci.html#initchain - let validators = self.app.tendermint_validator_updates(); + let validators = self.app.cometbft_validator_updates(); let app_hash = match &app_state { crate::genesis::AppState::Checkpoint(h) => { @@ -162,7 +162,11 @@ impl Consensus { proposal: request::PrepareProposal, ) -> Result { tracing::info!(height = ?proposal.height, proposer = ?proposal.proposer_address, "preparing proposal"); - Ok(self.app.prepare_proposal(proposal).await) + // We prepare a proposal against an isolated fork of the application state. + let mut tmp_app = App::new(self.storage.latest_snapshot()); + // Once we are done, we discard it so that the application state doesn't get corrupted + // if another round of consensus is required because the proposal fails to finalize. + Ok(tmp_app.prepare_proposal(proposal).await) } async fn process_proposal( @@ -218,7 +222,7 @@ impl Consensus { // validators and voting power. This must be the last step performed, // after all voting power calculations and validator state transitions have // been completed. - let validator_updates = self.app.tendermint_validator_updates(); + let validator_updates = self.app.cometbft_validator_updates(); tracing::debug!( ?validator_updates, From da7a79f28fd01c2be8c805fd2229aad962b08c5c Mon Sep 17 00:00:00 2001 From: Erwan Or Date: Mon, 17 Jun 2024 10:31:22 -0400 Subject: [PATCH 3/9] penumbra: process proposal does block prevalidation --- crates/core/app/src/app/mod.rs | 51 +++++++++++++++---------- crates/core/app/src/server/consensus.rs | 5 ++- 2 files changed, 34 insertions(+), 22 deletions(-) diff --git a/crates/core/app/src/app/mod.rs b/crates/core/app/src/app/mod.rs index 4acc9969db..b7cf250604 100644 --- a/crates/core/app/src/app/mod.rs +++ b/crates/core/app/src/app/mod.rs @@ -181,20 +181,26 @@ impl App { num_candidate_txs ); - let mut proposal_size_bytes = 0u64; + // This is a node controlled parameter that is different from the homonymous + // mempool's `max_tx_bytes`. Comet will send us raw proposals that exceed that + // limit, presuming that a subset of those transactions will be shed. + // More context in https://github.com/cometbft/cometbft/blob/v0.37.5/spec/abci/abci%2B%2B_app_requirements let max_proposal_size_bytes = proposal.max_tx_bytes as u64; + // Tracking the size of the proposal + let mut proposal_size_bytes = 0u64; - // First, we filter the proposal to fit within the block limit. for tx in proposal.txs { let transaction_size = tx.len() as u64; // We compute the total proposal size if we were to include this transaction. let total_with_tx = proposal_size_bytes.saturating_add(transaction_size); + // First, we filter proposals to fit within the block limit. if total_with_tx >= max_proposal_size_bytes { break; } + // Then, we make sure to only include successful transactions. match self.deliver_tx_bytes(&tx).await { Ok(_) => { proposal_size_bytes = total_with_tx; @@ -204,24 +210,6 @@ impl App { } } - // The CometBFT spec requires that application "MUST" check that the list - // of transactions in the proposal does not exceed `max_tx_bytes`. And shed - // excess transactions so as to be "as close as possible" to the target - // parameter. - // - // A couple things to note about this: - // - `max_tx_bytes` here is an operator controlled parameter - // - it is different than the homonymous mempool configuration - // parameter controlling the maximum size of a single tx. - // - the motivation for this check is that even though `PrepareProposal` - // is only called by the proposer process, CometBFT might not honor - // the target, presuming that some transactions might be yanked. - // For more details, see the specification: - // - Adapting existing applications to use ABCI+: - // https://github.com/cometbft/cometbft/blob/v0.37.5/spec/abci/abci%2B%2B_comet_expected_behavior.md#adapting-existing-applications-that-use-abci - // - Application requirements: - // https://github.com/cometbft/cometbft/blob/v0.37.5/spec/abci/abci%2B%2B_app_requirements - tracing::debug!( "finished processing PrepareProposal, including {}/{} candidate transactions", included_txs.len(), @@ -230,11 +218,32 @@ impl App { response::PrepareProposal { txs: included_txs } } + #[instrument(skip_all, ret, level = "debug")] pub async fn process_proposal( &mut self, proposal: request::ProcessProposal, ) -> response::ProcessProposal { - tracing::debug!(?proposal, "processing proposal"); + tracing::debug!( + height = proposal.height.value(), + proposer = ?proposal.proposer_address, + proposal_hash = ?proposal.hash, + "processing proposal" + ); + let max_proposal_size = 1 << 20 - 1; + let mut total_proposal_size = 0u64; + for tx in proposal.txs { + let tx_size = tx.len() as u64; + total_proposal_size = total_proposal_size.saturating_add(tx_size); + if total_proposal_size >= max_proposal_size { + return response::ProcessProposal::Reject; + } + + match self.deliver_tx_bytes(&tx).await { + Ok(_) => continue, + Err(_) => return response::ProcessProposal::Reject, + } + } + response::ProcessProposal::Accept } diff --git a/crates/core/app/src/server/consensus.rs b/crates/core/app/src/server/consensus.rs index 4c0a219e19..5f4ecdca7b 100644 --- a/crates/core/app/src/server/consensus.rs +++ b/crates/core/app/src/server/consensus.rs @@ -174,7 +174,10 @@ impl Consensus { proposal: request::ProcessProposal, ) -> Result { tracing::info!(height = ?proposal.height, proposer = ?proposal.proposer_address, hash = %proposal.hash, "processing proposal"); - Ok(self.app.process_proposal(proposal).await) + // We process the propopsal in an isolated state fork. Eventually, we should cache this work and + // re-use it when processing a `FinalizeBlock` message (starting in `0.38.x`). + let mut tmp_app = App::new(self.storage.latest_snapshot()); + Ok(tmp_app.process_proposal(proposal).await) } async fn begin_block( From 73da8c4b8257047a556bea6a7ef7d907967b6eab Mon Sep 17 00:00:00 2001 From: Erwan Or Date: Mon, 17 Jun 2024 10:49:04 -0400 Subject: [PATCH 4/9] app(mempool): update comments and description --- crates/core/app/src/server/mempool.rs | 20 +++----------------- 1 file changed, 3 insertions(+), 17 deletions(-) diff --git a/crates/core/app/src/server/mempool.rs b/crates/core/app/src/server/mempool.rs index 7ad6a67b98..2bf48fd937 100644 --- a/crates/core/app/src/server/mempool.rs +++ b/crates/core/app/src/server/mempool.rs @@ -12,18 +12,7 @@ use tracing::Instrument; use crate::{app::App, metrics}; -/// When using ABCI, we can't control block proposal directly, so we could -/// potentially end up creating blocks with mutually incompatible transactions. -/// While we'd reject one of them during execution, it's nicer to try to filter -/// them out at the mempool stage. Currently, the way we do this is by having -/// the mempool worker maintain an ephemeral fork of the entire execution state, -/// and execute incoming transactions against the fork. This prevents -/// conflicting transactions in the local mempool, since we'll update the fork, -/// then reject the second transaction against the forked state. When we learn a -/// new state has been committed, we discard and recreate the ephemeral fork. -/// -/// After switching to ABCI++, we can eliminate this mechanism and just build -/// blocks we want. +/// A mempool service that applies transaction checks against an isolated application fork. pub struct Mempool { queue: mpsc::Receiver>, snapshot: Snapshot, @@ -92,13 +81,10 @@ impl Mempool { change = self.rx_snapshot.changed() => { if let Ok(()) = change { let snapshot = self.rx_snapshot.borrow().clone(); - tracing::debug!(height = ?snapshot.version(), "resetting ephemeral mempool state"); - self.snapshot= snapshot; + tracing::debug!(height = ?snapshot.version(), "mempool has rewired to use the latest snapshot"); + self.snapshot = snapshot; } else { - // TODO: what triggers this, now that the channel is owned by the - // shared Storage instance, rather than the consensus worker? tracing::info!("state notification channel closed, shutting down"); - // old: The consensus worker shut down, we should too. return Ok(()); } } From 6f1a37c27ff3b7f4f8c386674c0317c72327b4ab Mon Sep 17 00:00:00 2001 From: Erwan Or Date: Mon, 17 Jun 2024 12:52:53 -0400 Subject: [PATCH 5/9] app(mempool): extract block payload size into an App-level parameter --- crates/core/app/src/app/mod.rs | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/crates/core/app/src/app/mod.rs b/crates/core/app/src/app/mod.rs index b7cf250604..e1e2c56ec0 100644 --- a/crates/core/app/src/app/mod.rs +++ b/crates/core/app/src/app/mod.rs @@ -51,6 +51,9 @@ pub mod state_key; /// The inter-block state being written to by the application. type InterBlockState = Arc>; +/// The maximum size of a CometBFT block payload. +const MAX_BLOCK_PAYLOAD_SIZE_BYTES: u64 = (1 << 20) - 1; + /// The Penumbra application, written as a bundle of [`Component`]s. /// /// The [`App`] is not a [`Component`], but @@ -184,7 +187,7 @@ impl App { // This is a node controlled parameter that is different from the homonymous // mempool's `max_tx_bytes`. Comet will send us raw proposals that exceed that // limit, presuming that a subset of those transactions will be shed. - // More context in https://github.com/cometbft/cometbft/blob/v0.37.5/spec/abci/abci%2B%2B_app_requirements + // More context in https://github.com/cometbft/cometbft/blob/v0.37.5/spec/abci/abci%2B%2B_app_requirements.md let max_proposal_size_bytes = proposal.max_tx_bytes as u64; // Tracking the size of the proposal let mut proposal_size_bytes = 0u64; @@ -229,12 +232,11 @@ impl App { proposal_hash = ?proposal.hash, "processing proposal" ); - let max_proposal_size = 1 << 20 - 1; let mut total_proposal_size = 0u64; for tx in proposal.txs { let tx_size = tx.len() as u64; total_proposal_size = total_proposal_size.saturating_add(tx_size); - if total_proposal_size >= max_proposal_size { + if total_proposal_size >= MAX_BLOCK_PAYLOAD_SIZE_BYTES { return response::ProcessProposal::Reject; } From 723b61b4a8f97b7c16f0d0fc856688d78f8e0f0a Mon Sep 17 00:00:00 2001 From: Erwan Or Date: Mon, 17 Jun 2024 13:57:44 -0400 Subject: [PATCH 6/9] app: s/this/that Co-authored-by: dynst <148708712+dynst@users.noreply.github.com> Signed-off-by: Erwan Or --- crates/core/app/src/app/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/core/app/src/app/mod.rs b/crates/core/app/src/app/mod.rs index e1e2c56ec0..b4586b8224 100644 --- a/crates/core/app/src/app/mod.rs +++ b/crates/core/app/src/app/mod.rs @@ -185,7 +185,7 @@ impl App { ); // This is a node controlled parameter that is different from the homonymous - // mempool's `max_tx_bytes`. Comet will send us raw proposals that exceed that + // mempool's `max_tx_bytes`. Comet will send us raw proposals that exceed this // limit, presuming that a subset of those transactions will be shed. // More context in https://github.com/cometbft/cometbft/blob/v0.37.5/spec/abci/abci%2B%2B_app_requirements.md let max_proposal_size_bytes = proposal.max_tx_bytes as u64; From 48568433c195e69443ba9a4097fd942da5948aeb Mon Sep 17 00:00:00 2001 From: Erwan Or Date: Wed, 19 Jun 2024 16:11:00 -0400 Subject: [PATCH 7/9] penumbra: harden proposal validation --- crates/core/app/src/app/mod.rs | 51 ++++++++++++++++++++++++++++++---- 1 file changed, 45 insertions(+), 6 deletions(-) diff --git a/crates/core/app/src/app/mod.rs b/crates/core/app/src/app/mod.rs index b4586b8224..24b7b4217d 100644 --- a/crates/core/app/src/app/mod.rs +++ b/crates/core/app/src/app/mod.rs @@ -51,8 +51,14 @@ pub mod state_key; /// The inter-block state being written to by the application. type InterBlockState = Arc>; -/// The maximum size of a CometBFT block payload. -const MAX_BLOCK_PAYLOAD_SIZE_BYTES: u64 = (1 << 20) - 1; +/// The maximum size of a CometBFT block payload (1MB) +const MAX_BLOCK_TXS_PAYLOAD_BYTES: usize = 1024 * 1024; + +/// The maximum size of a single individual transaction (30KB). +const MAX_TRANSACTION_SIZE_BYTES: usize = 30 * 1024; + +/// The maximum size of the evidence portion of a block (30KB). +const MAX_EVIDENCE_SIZE_BYTES: usize = 30 * 1024; /// The Penumbra application, written as a bundle of [`Component`]s. /// @@ -213,11 +219,16 @@ impl App { } } + // The evidence payload is validated by Comet, we can lean on three guarantees: + // 1. The total payload is bound by `MAX_EVIDENCE_SIZE_BYTES` + // 2. Expired evidence is filtered + // 3. Evidence is valid. tracing::debug!( "finished processing PrepareProposal, including {}/{} candidate transactions", included_txs.len(), num_candidate_txs ); + response::PrepareProposal { txs: included_txs } } @@ -232,11 +243,39 @@ impl App { proposal_hash = ?proposal.hash, "processing proposal" ); - let mut total_proposal_size = 0u64; + + // Proposal validation: + // 1. Total evidence payload committed is below [`MAX_EVIDENCE_SIZE_BYTES`] + // 2. Individual transactions are at most [`MAX_TRANSACTION_SIZE_BYTES`] + // 3. The total transaction payload is below [`MAX_BLOCK_PAYLOAD_SIZE_BYTES`] + // 4. Each transaction applies successfully. + let mut evidence_buffer: Vec = Vec::with_capacity(MAX_EVIDENCE_SIZE_BYTES); + let mut bytes_tracker = 0usize; + + for evidence in proposal.misbehavior { + let proto_evidence: tendermint_proto::v0_37::abci::Misbehavior = evidence.into(); + let evidence_size = match proto_evidence.encode(&mut evidence_buffer) { + Ok(_) => evidence_buffer.len(), + Err(_) => return response::ProcessProposal::Reject, + }; + bytes_tracker = bytes_tracker.saturating_add(evidence_size); + if bytes_tracker > MAX_EVIDENCE_SIZE_BYTES { + return response::ProcessProposal::Reject; + } + } + + // The evidence payload is valid, now we validate the block txs + // payload: they MUST be below the tx size limit, and apply cleanly on + // state fork. + let mut total_txs_payload_size = 0usize; for tx in proposal.txs { - let tx_size = tx.len() as u64; - total_proposal_size = total_proposal_size.saturating_add(tx_size); - if total_proposal_size >= MAX_BLOCK_PAYLOAD_SIZE_BYTES { + let tx_size = tx.len(); + if tx_size > MAX_TRANSACTION_SIZE_BYTES { + return response::ProcessProposal::Reject; + } + + total_txs_payload_size = total_txs_payload_size.saturating_add(tx_size); + if total_txs_payload_size >= MAX_BLOCK_TXS_PAYLOAD_BYTES { return response::ProcessProposal::Reject; } From 4586b2549b8e780ae4ccb03513aa0dd7c88bcc16 Mon Sep 17 00:00:00 2001 From: Erwan Or Date: Wed, 19 Jun 2024 19:05:22 -0400 Subject: [PATCH 8/9] app: clear evidence buffer and add comment --- crates/core/app/src/app/mod.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/crates/core/app/src/app/mod.rs b/crates/core/app/src/app/mod.rs index 24b7b4217d..31f41fcde0 100644 --- a/crates/core/app/src/app/mod.rs +++ b/crates/core/app/src/app/mod.rs @@ -253,6 +253,9 @@ impl App { let mut bytes_tracker = 0usize; for evidence in proposal.misbehavior { + // This should be pretty cheap, we allow for `MAX_EVIDENCE_SIZE_BYTES` in total + // but a single evidence datum should be an order of magnitude smaller than that. + evidence_buffer.clear(); let proto_evidence: tendermint_proto::v0_37::abci::Misbehavior = evidence.into(); let evidence_size = match proto_evidence.encode(&mut evidence_buffer) { Ok(_) => evidence_buffer.len(), From a222cd8bfdb348ac0733120c12d37000a8890097 Mon Sep 17 00:00:00 2001 From: Erwan Or Date: Mon, 24 Jun 2024 16:55:31 -0400 Subject: [PATCH 9/9] mempool: remove polling for state updates --- crates/core/app/src/server/mempool.rs | 57 +++++++++------------------ 1 file changed, 19 insertions(+), 38 deletions(-) diff --git a/crates/core/app/src/server/mempool.rs b/crates/core/app/src/server/mempool.rs index 2bf48fd937..64818f9f2a 100644 --- a/crates/core/app/src/server/mempool.rs +++ b/crates/core/app/src/server/mempool.rs @@ -1,12 +1,12 @@ use anyhow::Result; -use cnidarium::{Snapshot, Storage}; +use cnidarium::Storage; use tendermint::v0_37::abci::{ request::CheckTx as CheckTxReq, request::CheckTxKind, response::CheckTx as CheckTxRsp, MempoolRequest as Request, MempoolResponse as Response, }; -use tokio::sync::{mpsc, watch}; +use tokio::sync::mpsc; use tower_actor::Message; use tracing::Instrument; @@ -15,8 +15,7 @@ use crate::{app::App, metrics}; /// A mempool service that applies transaction checks against an isolated application fork. pub struct Mempool { queue: mpsc::Receiver>, - snapshot: Snapshot, - rx_snapshot: watch::Receiver, + storage: Storage, } impl Mempool { @@ -24,14 +23,7 @@ impl Mempool { storage: Storage, queue: mpsc::Receiver>, ) -> Self { - let snapshot = storage.latest_snapshot(); - let snapshot_rx = storage.subscribe(); - - Self { - queue, - snapshot, - rx_snapshot: snapshot_rx, - } + Self { queue, storage } } pub async fn check_tx(&mut self, req: Request) -> Result { @@ -45,7 +37,7 @@ impl Mempool { CheckTxKind::Recheck => "recheck", }; - let mut app = App::new(self.snapshot.clone()); + let mut app = App::new(self.storage.latest_snapshot()); match app.deliver_tx_bytes(tx_bytes.as_ref()).await { Ok(events) => { @@ -72,31 +64,20 @@ impl Mempool { } pub async fn run(mut self) -> Result<(), tower::BoxError> { - loop { - tokio::select! { - // Use a biased select to poll for height changes *before* polling for messages. - biased; - // Check whether the height has changed, which requires us to throw away our - // ephemeral mempool state, and create a new one based on the new state. - change = self.rx_snapshot.changed() => { - if let Ok(()) = change { - let snapshot = self.rx_snapshot.borrow().clone(); - tracing::debug!(height = ?snapshot.version(), "mempool has rewired to use the latest snapshot"); - self.snapshot = snapshot; - } else { - tracing::info!("state notification channel closed, shutting down"); - return Ok(()); - } - } - message = self.queue.recv() => { - if let Some(Message {req, rsp_sender, span }) = message { - let _ = rsp_sender.send(self.check_tx(req).instrument(span).await); - } else { - // The queue is closed, so we're done. - return Ok(()); - } - } - } + tracing::info!("mempool service started"); + while let Some(Message { + req, + rsp_sender, + span, + // We could perform `CheckTx` asynchronously, and poll many + // entries from the queue: + // See https://docs.rs/tokio/latest/tokio/sync/mpsc/struct.Receiver.html#method.recv_many + }) = self.queue.recv().await + { + let result = self.check_tx(req).instrument(span).await; + let _ = rsp_sender.send(result); } + tracing::info!("mempool service stopped"); + Ok(()) } }