diff --git a/crates/core/app/src/app/mod.rs b/crates/core/app/src/app/mod.rs index ec1ccff920..31f41fcde0 100644 --- a/crates/core/app/src/app/mod.rs +++ b/crates/core/app/src/app/mod.rs @@ -51,6 +51,15 @@ pub mod state_key; /// The inter-block state being written to by the application. type InterBlockState = Arc>; +/// The maximum size of a CometBFT block payload (1MB) +const MAX_BLOCK_TXS_PAYLOAD_BYTES: usize = 1024 * 1024; + +/// The maximum size of a single individual transaction (30KB). +const MAX_TRANSACTION_SIZE_BYTES: usize = 30 * 1024; + +/// The maximum size of the evidence portion of a block (30KB). +const MAX_EVIDENCE_SIZE_BYTES: usize = 30 * 1024; + /// The Penumbra application, written as a bundle of [`Component`]s. /// /// The [`App`] is not a [`Component`], but @@ -181,47 +190,104 @@ impl App { num_candidate_txs ); - let mut proposal_size_bytes = 0u64; + // This is a node controlled parameter that is different from the homonymous + // mempool's `max_tx_bytes`. Comet will send us raw proposals that exceed this + // limit, presuming that a subset of those transactions will be shed. + // More context in https://github.com/cometbft/cometbft/blob/v0.37.5/spec/abci/abci%2B%2B_app_requirements.md let max_proposal_size_bytes = proposal.max_tx_bytes as u64; - // The CometBFT spec requires that application "MUST" check that the list - // of transactions in the proposal does not exceed `max_tx_bytes`. And shed - // excess transactions so as to be "as close as possible" to the target - // parameter. - // - // A couple things to note about this: - // - `max_tx_bytes` here is an operator controlled parameter - // - it is different than the homonymous mempool configuration - // parameter controlling the maximum size of a single tx. - // - the motivation for this check is that even though `PrepareProposal` - // is only called by the proposer process, CometBFT might not honor - // the target, presuming that some transactions might be yanked. 
- // For more details, see the specification: - // - Adapting existing applications to use ABCI+: - // https://github.com/cometbft/cometbft/blob/v0.37.5/spec/abci/abci%2B%2B_comet_expected_behavior.md#adapting-existing-applications-that-use-abci - // - Application requirements: - // https://github.com/cometbft/cometbft/blob/v0.37.5/spec/abci/abci%2B%2B_app_requirements + // Tracking the size of the proposal + let mut proposal_size_bytes = 0u64; + for tx in proposal.txs { - let tx_len_bytes = tx.len() as u64; - proposal_size_bytes = proposal_size_bytes.saturating_add(tx_len_bytes); - if proposal_size_bytes <= max_proposal_size_bytes { - included_txs.push(tx); - } else { + let transaction_size = tx.len() as u64; + + // We compute the total proposal size if we were to include this transaction. + let total_with_tx = proposal_size_bytes.saturating_add(transaction_size); + + // First, we filter proposals to fit within the block limit. + if total_with_tx >= max_proposal_size_bytes { break; } + + // Then, we make sure to only include successful transactions. + match self.deliver_tx_bytes(&tx).await { + Ok(_) => { + proposal_size_bytes = total_with_tx; + included_txs.push(tx) + } + Err(_) => continue, + } } + + // The evidence payload is validated by Comet, we can lean on three guarantees: + // 1. The total payload is bound by `MAX_EVIDENCE_SIZE_BYTES` + // 2. Expired evidence is filtered + // 3. Evidence is valid. 
tracing::debug!( "finished processing PrepareProposal, including {}/{} candidate transactions", included_txs.len(), num_candidate_txs ); + response::PrepareProposal { txs: included_txs } } + #[instrument(skip_all, ret, level = "debug")] pub async fn process_proposal( &mut self, proposal: request::ProcessProposal, ) -> response::ProcessProposal { - tracing::debug!(?proposal, "processing proposal"); + tracing::debug!( + height = proposal.height.value(), + proposer = ?proposal.proposer_address, + proposal_hash = ?proposal.hash, + "processing proposal" + ); + + // Proposal validation: + // 1. Total evidence payload committed is below [`MAX_EVIDENCE_SIZE_BYTES`] + // 2. Individual transactions are at most [`MAX_TRANSACTION_SIZE_BYTES`] + // 3. The total transaction payload is below [`MAX_BLOCK_TXS_PAYLOAD_BYTES`] + // 4. Each transaction applies successfully. + let mut evidence_buffer: Vec = Vec::with_capacity(MAX_EVIDENCE_SIZE_BYTES); + let mut bytes_tracker = 0usize; + + for evidence in proposal.misbehavior { + // This should be pretty cheap, we allow for `MAX_EVIDENCE_SIZE_BYTES` in total + // but a single evidence datum should be an order of magnitude smaller than that. + evidence_buffer.clear(); + let proto_evidence: tendermint_proto::v0_37::abci::Misbehavior = evidence.into(); + let evidence_size = match proto_evidence.encode(&mut evidence_buffer) { + Ok(_) => evidence_buffer.len(), + Err(_) => return response::ProcessProposal::Reject, + }; + bytes_tracker = bytes_tracker.saturating_add(evidence_size); + if bytes_tracker > MAX_EVIDENCE_SIZE_BYTES { + return response::ProcessProposal::Reject; + } + } + + // The evidence payload is valid, now we validate the block txs + // payload: they MUST be below the tx size limit, and apply cleanly on + // state fork. 
+ let mut total_txs_payload_size = 0usize; + for tx in proposal.txs { + let tx_size = tx.len(); + if tx_size > MAX_TRANSACTION_SIZE_BYTES { + return response::ProcessProposal::Reject; + } + + total_txs_payload_size = total_txs_payload_size.saturating_add(tx_size); + if total_txs_payload_size >= MAX_BLOCK_TXS_PAYLOAD_BYTES { + return response::ProcessProposal::Reject; + } + + match self.deliver_tx_bytes(&tx).await { + Ok(_) => continue, + Err(_) => return response::ProcessProposal::Reject, + } + } + response::ProcessProposal::Accept } @@ -563,10 +629,10 @@ impl App { jmt_root } - pub fn tendermint_validator_updates(&self) -> Vec { + pub fn cometbft_validator_updates(&self) -> Vec { self.state .cometbft_validator_updates() - // If the tendermint validator updates are not set, we return an empty + // If the cometbft validator updates are not set, we return an empty // update set, signaling no change to Tendermint. .unwrap_or_default() } diff --git a/crates/core/app/src/server/consensus.rs b/crates/core/app/src/server/consensus.rs index d687baf507..5f4ecdca7b 100644 --- a/crates/core/app/src/server/consensus.rs +++ b/crates/core/app/src/server/consensus.rs @@ -70,6 +70,18 @@ impl Consensus { .await .expect("init_chain must succeed"), ), + Request::PrepareProposal(proposal) => Response::PrepareProposal( + self.prepare_proposal(proposal) + .instrument(span) + .await + .expect("prepare proposal must succeed"), + ), + Request::ProcessProposal(proposal) => Response::ProcessProposal( + self.process_proposal(proposal) + .instrument(span) + .await + .expect("process proposal must succeed"), + ), Request::BeginBlock(begin_block) => Response::BeginBlock( self.begin_block(begin_block) .instrument(span) @@ -88,18 +100,6 @@ impl Consensus { .await .expect("commit must succeed"), ), - Request::PrepareProposal(proposal) => Response::PrepareProposal( - self.prepare_proposal(proposal) - .instrument(span) - .await - .expect("prepare proposal must succeed"), - ), - 
Request::ProcessProposal(proposal) => Response::ProcessProposal( - self.process_proposal(proposal) - .instrument(span) - .await - .expect("process proposal must succeed"), - ), })); } Ok(()) } @@ -123,7 +123,7 @@ impl Consensus { // to be provided inside the initial app genesis state (`GenesisAppState`). Returning those // validators in InitChain::Response tells Tendermint that they are the initial validator // set. See https://docs.tendermint.com/master/spec/abci/abci.html#initchain - let validators = self.app.tendermint_validator_updates(); + let validators = self.app.cometbft_validator_updates(); let app_hash = match &app_state { crate::genesis::AppState::Checkpoint(h) => { @@ -162,7 +162,11 @@ impl Consensus { proposal: request::PrepareProposal, ) -> Result { tracing::info!(height = ?proposal.height, proposer = ?proposal.proposer_address, "preparing proposal"); - Ok(self.app.prepare_proposal(proposal).await) + // We prepare a proposal against an isolated fork of the application state. + let mut tmp_app = App::new(self.storage.latest_snapshot()); + // Once we are done, we discard it so that the application state doesn't get corrupted + // if another round of consensus is required because the proposal fails to finalize. + Ok(tmp_app.prepare_proposal(proposal).await) } async fn process_proposal( @@ -170,7 +174,10 @@ impl Consensus { proposal: request::ProcessProposal, ) -> Result { tracing::info!(height = ?proposal.height, proposer = ?proposal.proposer_address, hash = %proposal.hash, "processing proposal"); - Ok(self.app.process_proposal(proposal).await) + // We process the proposal in an isolated state fork. Eventually, we should cache this work and + // re-use it when processing a `FinalizeBlock` message (starting in `0.38.x`). + let mut tmp_app = App::new(self.storage.latest_snapshot()); + Ok(tmp_app.process_proposal(proposal).await) } async fn begin_block( @@ -218,7 +225,7 @@ impl Consensus { // validators and voting power.
This must be the last step performed, // after all voting power calculations and validator state transitions have // been completed. - let validator_updates = self.app.tendermint_validator_updates(); + let validator_updates = self.app.cometbft_validator_updates(); tracing::debug!( ?validator_updates, diff --git a/crates/core/app/src/server/mempool.rs b/crates/core/app/src/server/mempool.rs index daccf449bb..64818f9f2a 100644 --- a/crates/core/app/src/server/mempool.rs +++ b/crates/core/app/src/server/mempool.rs @@ -1,33 +1,21 @@ use anyhow::Result; -use cnidarium::{Snapshot, Storage}; +use cnidarium::Storage; use tendermint::v0_37::abci::{ request::CheckTx as CheckTxReq, request::CheckTxKind, response::CheckTx as CheckTxRsp, MempoolRequest as Request, MempoolResponse as Response, }; -use tokio::sync::{mpsc, watch}; +use tokio::sync::mpsc; use tower_actor::Message; use tracing::Instrument; use crate::{app::App, metrics}; -/// When using ABCI, we can't control block proposal directly, so we could -/// potentially end up creating blocks with mutually incompatible transactions. -/// While we'd reject one of them during execution, it's nicer to try to filter -/// them out at the mempool stage. Currently, the way we do this is by having -/// the mempool worker maintain an ephemeral fork of the entire execution state, -/// and execute incoming transactions against the fork. This prevents -/// conflicting transactions in the local mempool, since we'll update the fork, -/// then reject the second transaction against the forked state. When we learn a -/// new state has been committed, we discard and recreate the ephemeral fork. -/// -/// After switching to ABCI++, we can eliminate this mechanism and just build -/// blocks we want. +/// A mempool service that applies transaction checks against an isolated application fork. 
pub struct Mempool { queue: mpsc::Receiver>, - app: App, - rx_snapshot: watch::Receiver, + storage: Storage, } impl Mempool { @@ -35,14 +23,7 @@ impl Mempool { storage: Storage, queue: mpsc::Receiver>, ) -> Self { - let app = App::new(storage.latest_snapshot()); - let snapshot_rx = storage.subscribe(); - - Self { - queue, - app, - rx_snapshot: snapshot_rx, - } + Self { queue, storage } } pub async fn check_tx(&mut self, req: Request) -> Result { @@ -56,7 +37,9 @@ impl Mempool { CheckTxKind::Recheck => "recheck", }; - match self.app.deliver_tx_bytes(tx_bytes.as_ref()).await { + let mut app = App::new(self.storage.latest_snapshot()); + + match app.deliver_tx_bytes(tx_bytes.as_ref()).await { Ok(events) => { let elapsed = start.elapsed(); tracing::info!(?elapsed, "tx accepted"); @@ -81,34 +64,20 @@ impl Mempool { } pub async fn run(mut self) -> Result<(), tower::BoxError> { - loop { - tokio::select! { - // Use a biased select to poll for height changes *before* polling for messages. - biased; - // Check whether the height has changed, which requires us to throw away our - // ephemeral mempool state, and create a new one based on the new state. - change = self.rx_snapshot.changed() => { - if let Ok(()) = change { - let snapshot = self.rx_snapshot.borrow().clone(); - tracing::debug!(height = ?snapshot.version(), "resetting ephemeral mempool state"); - self.app = App::new(snapshot); - } else { - // TODO: what triggers this, now that the channel is owned by the - // shared Storage instance, rather than the consensus worker? - tracing::info!("state notification channel closed, shutting down"); - // old: The consensus worker shut down, we should too. - return Ok(()); - } - } - message = self.queue.recv() => { - if let Some(Message {req, rsp_sender, span }) = message { - let _ = rsp_sender.send(self.check_tx(req).instrument(span).await); - } else { - // The queue is closed, so we're done. 
- return Ok(()); - } - } - } + tracing::info!("mempool service started"); + while let Some(Message { + req, + rsp_sender, + span, + // We could perform `CheckTx` asynchronously, and poll many + // entries from the queue: + // See https://docs.rs/tokio/latest/tokio/sync/mpsc/struct.Receiver.html#method.recv_many + }) = self.queue.recv().await + { + let result = self.check_tx(req).instrument(span).await; + let _ = rsp_sender.send(result); } + tracing::info!("mempool service stopped"); + Ok(()) } }