Skip to content

Commit

Permalink
app: make the mempool stateless (#4627)
Browse files Browse the repository at this point in the history
## Describe your changes

This is a draft PR that removes the stateful mempool from the
application, replacing it with:
- a stateless mempool that gates inclusion on validity against the
latest version of the app state
- a prepare proposal implementation that filters for valid transactions
- a process proposal implementation that validates block quality and
lays the groundwork to do optimistic execution

## Checklist before requesting a review

- [x] If this code contains consensus-breaking changes, I have added the
"consensus-breaking" label. Otherwise, I declare my belief that there
are not consensus-breaking changes, for the following reason:

  > Consensus breaking

---------

Signed-off-by: Erwan Or <[email protected]>
Co-authored-by: dynst <[email protected]>
  • Loading branch information
erwanor and dynst committed Jun 24, 2024
1 parent 67511d8 commit d95258b
Show file tree
Hide file tree
Showing 3 changed files with 137 additions and 95 deletions.
118 changes: 92 additions & 26 deletions crates/core/app/src/app/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,15 @@ pub mod state_key;
/// The inter-block state being written to by the application.
type InterBlockState = Arc<StateDelta<Snapshot>>;

/// The maximum size of a CometBFT block's transaction payload (1MB).
///
/// Node-local bound enforced when validating a proposal's total tx payload.
const MAX_BLOCK_TXS_PAYLOAD_BYTES: usize = 1024 * 1024;

/// The maximum size of a single individual transaction (30KB).
const MAX_TRANSACTION_SIZE_BYTES: usize = 30 * 1024;

/// The maximum size of the evidence portion of a block (30KB).
const MAX_EVIDENCE_SIZE_BYTES: usize = 30 * 1024;

/// The Penumbra application, written as a bundle of [`Component`]s.
///
/// The [`App`] is not a [`Component`], but
Expand Down Expand Up @@ -181,47 +190,104 @@ impl App {
num_candidate_txs
);

let mut proposal_size_bytes = 0u64;
// This is a node controlled parameter that is different from the homonymous
// mempool's `max_tx_bytes`. Comet will send us raw proposals that exceed this
// limit, presuming that a subset of those transactions will be shed.
// More context in https://github.com/cometbft/cometbft/blob/v0.37.5/spec/abci/abci%2B%2B_app_requirements.md
let max_proposal_size_bytes = proposal.max_tx_bytes as u64;
// The CometBFT spec requires that application "MUST" check that the list
// of transactions in the proposal does not exceed `max_tx_bytes`. And shed
// excess transactions so as to be "as close as possible" to the target
// parameter.
//
// A couple things to note about this:
// - `max_tx_bytes` here is an operator controlled parameter
// - it is different than the homonymous mempool configuration
// parameter controlling the maximum size of a single tx.
// - the motivation for this check is that even though `PrepareProposal`
// is only called by the proposer process, CometBFT might not honor
// the target, presuming that some transactions might be yanked.
// For more details, see the specification:
// - Adapting existing applications to use ABCI+:
// https://github.com/cometbft/cometbft/blob/v0.37.5/spec/abci/abci%2B%2B_comet_expected_behavior.md#adapting-existing-applications-that-use-abci
// - Application requirements:
// https://github.com/cometbft/cometbft/blob/v0.37.5/spec/abci/abci%2B%2B_app_requirements
// Tracking the size of the proposal
let mut proposal_size_bytes = 0u64;

for tx in proposal.txs {
let tx_len_bytes = tx.len() as u64;
proposal_size_bytes = proposal_size_bytes.saturating_add(tx_len_bytes);
if proposal_size_bytes <= max_proposal_size_bytes {
included_txs.push(tx);
} else {
let transaction_size = tx.len() as u64;

// We compute the total proposal size if we were to include this transaction.
let total_with_tx = proposal_size_bytes.saturating_add(transaction_size);

// First, we filter proposals to fit within the block limit.
if total_with_tx >= max_proposal_size_bytes {
break;
}

// Then, we make sure to only include successful transactions.
match self.deliver_tx_bytes(&tx).await {
Ok(_) => {
proposal_size_bytes = total_with_tx;
included_txs.push(tx)
}
Err(_) => continue,
}
}

// The evidence payload is validated by Comet, we can lean on three guarantees:
// 1. The total payload is bound by `MAX_EVIDENCE_SIZE_BYTES`
// 2. Expired evidence is filtered
// 3. Evidence is valid.
tracing::debug!(
"finished processing PrepareProposal, including {}/{} candidate transactions",
included_txs.len(),
num_candidate_txs
);

response::PrepareProposal { txs: included_txs }
}

#[instrument(skip_all, ret, level = "debug")]
pub async fn process_proposal(
&mut self,
proposal: request::ProcessProposal,
) -> response::ProcessProposal {
tracing::debug!(?proposal, "processing proposal");
tracing::debug!(
height = proposal.height.value(),
proposer = ?proposal.proposer_address,
proposal_hash = ?proposal.hash,
"processing proposal"
);

// Proposal validation:
// 1. Total evidence payload committed is below [`MAX_EVIDENCE_SIZE_BYTES`]
// 2. Individual transactions are at most [`MAX_TRANSACTION_SIZE_BYTES`]
// 3. The total transaction payload is below [`MAX_BLOCK_PAYLOAD_SIZE_BYTES`]
// 4. Each transaction applies successfully.
let mut evidence_buffer: Vec<u8> = Vec::with_capacity(MAX_EVIDENCE_SIZE_BYTES);
let mut bytes_tracker = 0usize;

for evidence in proposal.misbehavior {
// This should be pretty cheap, we allow for `MAX_EVIDENCE_SIZE_BYTES` in total
// but a single evidence datum should be an order of magnitude smaller than that.
evidence_buffer.clear();
let proto_evidence: tendermint_proto::v0_37::abci::Misbehavior = evidence.into();
let evidence_size = match proto_evidence.encode(&mut evidence_buffer) {
Ok(_) => evidence_buffer.len(),
Err(_) => return response::ProcessProposal::Reject,
};
bytes_tracker = bytes_tracker.saturating_add(evidence_size);
if bytes_tracker > MAX_EVIDENCE_SIZE_BYTES {
return response::ProcessProposal::Reject;
}
}

// The evidence payload is valid, now we validate the block txs
// payload: they MUST be below the tx size limit, and apply cleanly on
// state fork.
let mut total_txs_payload_size = 0usize;
for tx in proposal.txs {
let tx_size = tx.len();
if tx_size > MAX_TRANSACTION_SIZE_BYTES {
return response::ProcessProposal::Reject;
}

total_txs_payload_size = total_txs_payload_size.saturating_add(tx_size);
if total_txs_payload_size >= MAX_BLOCK_TXS_PAYLOAD_BYTES {
return response::ProcessProposal::Reject;
}

match self.deliver_tx_bytes(&tx).await {
Ok(_) => continue,
Err(_) => return response::ProcessProposal::Reject,
}
}

response::ProcessProposal::Accept
}

Expand Down Expand Up @@ -563,10 +629,10 @@ impl App {
jmt_root
}

pub fn tendermint_validator_updates(&self) -> Vec<Update> {
pub fn cometbft_validator_updates(&self) -> Vec<Update> {
self.state
.cometbft_validator_updates()
// If the tendermint validator updates are not set, we return an empty
// If the cometbft validator updates are not set, we return an empty
// update set, signaling no change to Tendermint.
.unwrap_or_default()
}
Expand Down
39 changes: 23 additions & 16 deletions crates/core/app/src/server/consensus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,18 @@ impl Consensus {
.await
.expect("init_chain must succeed"),
),
Request::PrepareProposal(proposal) => Response::PrepareProposal(
self.prepare_proposal(proposal)
.instrument(span)
.await
.expect("prepare proposal must succeed"),
),
Request::ProcessProposal(proposal) => Response::ProcessProposal(
self.process_proposal(proposal)
.instrument(span)
.await
.expect("process proposal must succeed"),
),
Request::BeginBlock(begin_block) => Response::BeginBlock(
self.begin_block(begin_block)
.instrument(span)
Expand All @@ -88,18 +100,6 @@ impl Consensus {
.await
.expect("commit must succeed"),
),
Request::PrepareProposal(proposal) => Response::PrepareProposal(
self.prepare_proposal(proposal)
.instrument(span)
.await
.expect("prepare proposal must succeed"),
),
Request::ProcessProposal(proposal) => Response::ProcessProposal(
self.process_proposal(proposal)
.instrument(span)
.await
.expect("process proposal must succeed"),
),
}));
}
Ok(())
Expand All @@ -123,7 +123,7 @@ impl Consensus {
// to be provided inside the initial app genesis state (`GenesisAppState`). Returning those
// validators in InitChain::Response tells Tendermint that they are the initial validator
// set. See https://docs.tendermint.com/master/spec/abci/abci.html#initchain
let validators = self.app.tendermint_validator_updates();
let validators = self.app.cometbft_validator_updates();

let app_hash = match &app_state {
crate::genesis::AppState::Checkpoint(h) => {
Expand Down Expand Up @@ -162,15 +162,22 @@ impl Consensus {
proposal: request::PrepareProposal,
) -> Result<response::PrepareProposal> {
tracing::info!(height = ?proposal.height, proposer = ?proposal.proposer_address, "preparing proposal");
Ok(self.app.prepare_proposal(proposal).await)
// We prepare a proposal against an isolated fork of the application state.
let mut tmp_app = App::new(self.storage.latest_snapshot());
// Once we are done, we discard it so that the application state doesn't get corrupted
// if another round of consensus is required because the proposal fails to finalize.
Ok(tmp_app.prepare_proposal(proposal).await)
}

async fn process_proposal(
&mut self,
proposal: request::ProcessProposal,
) -> Result<response::ProcessProposal> {
tracing::info!(height = ?proposal.height, proposer = ?proposal.proposer_address, hash = %proposal.hash, "processing proposal");
Ok(self.app.process_proposal(proposal).await)
// We process the propopsal in an isolated state fork. Eventually, we should cache this work and
// re-use it when processing a `FinalizeBlock` message (starting in `0.38.x`).
let mut tmp_app = App::new(self.storage.latest_snapshot());
Ok(tmp_app.process_proposal(proposal).await)
}

async fn begin_block(
Expand Down Expand Up @@ -218,7 +225,7 @@ impl Consensus {
// validators and voting power. This must be the last step performed,
// after all voting power calculations and validator state transitions have
// been completed.
let validator_updates = self.app.tendermint_validator_updates();
let validator_updates = self.app.cometbft_validator_updates();

tracing::debug!(
?validator_updates,
Expand Down
75 changes: 22 additions & 53 deletions crates/core/app/src/server/mempool.rs
Original file line number Diff line number Diff line change
@@ -1,48 +1,29 @@
use anyhow::Result;

use cnidarium::{Snapshot, Storage};
use cnidarium::Storage;

use tendermint::v0_37::abci::{
request::CheckTx as CheckTxReq, request::CheckTxKind, response::CheckTx as CheckTxRsp,
MempoolRequest as Request, MempoolResponse as Response,
};
use tokio::sync::{mpsc, watch};
use tokio::sync::mpsc;
use tower_actor::Message;
use tracing::Instrument;

use crate::{app::App, metrics};

/// A mempool service that applies transaction checks against an isolated application fork.
///
/// Each `CheckTx` is evaluated against a fresh [`App`] built on the latest
/// committed snapshot, so no mempool state is carried between requests.
pub struct Mempool {
    // Incoming `CheckTx` requests from the ABCI server.
    queue: mpsc::Receiver<Message<Request, Response, tower::BoxError>>,
    // Handle to storage, used to obtain the latest snapshot per request.
    storage: Storage,
}

impl Mempool {
pub fn new(
storage: Storage,
queue: mpsc::Receiver<Message<Request, Response, tower::BoxError>>,
) -> Self {
let app = App::new(storage.latest_snapshot());
let snapshot_rx = storage.subscribe();

Self {
queue,
app,
rx_snapshot: snapshot_rx,
}
Self { queue, storage }
}

pub async fn check_tx(&mut self, req: Request) -> Result<Response, tower::BoxError> {
Expand All @@ -56,7 +37,9 @@ impl Mempool {
CheckTxKind::Recheck => "recheck",
};

match self.app.deliver_tx_bytes(tx_bytes.as_ref()).await {
let mut app = App::new(self.storage.latest_snapshot());

match app.deliver_tx_bytes(tx_bytes.as_ref()).await {
Ok(events) => {
let elapsed = start.elapsed();
tracing::info!(?elapsed, "tx accepted");
Expand All @@ -81,34 +64,20 @@ impl Mempool {
}

pub async fn run(mut self) -> Result<(), tower::BoxError> {
loop {
tokio::select! {
// Use a biased select to poll for height changes *before* polling for messages.
biased;
// Check whether the height has changed, which requires us to throw away our
// ephemeral mempool state, and create a new one based on the new state.
change = self.rx_snapshot.changed() => {
if let Ok(()) = change {
let snapshot = self.rx_snapshot.borrow().clone();
tracing::debug!(height = ?snapshot.version(), "resetting ephemeral mempool state");
self.app = App::new(snapshot);
} else {
// TODO: what triggers this, now that the channel is owned by the
// shared Storage instance, rather than the consensus worker?
tracing::info!("state notification channel closed, shutting down");
// old: The consensus worker shut down, we should too.
return Ok(());
}
}
message = self.queue.recv() => {
if let Some(Message {req, rsp_sender, span }) = message {
let _ = rsp_sender.send(self.check_tx(req).instrument(span).await);
} else {
// The queue is closed, so we're done.
return Ok(());
}
}
}
tracing::info!("mempool service started");
while let Some(Message {
req,
rsp_sender,
span,
// We could perform `CheckTx` asynchronously, and poll many
// entries from the queue:
// See https://docs.rs/tokio/latest/tokio/sync/mpsc/struct.Receiver.html#method.recv_many
}) = self.queue.recv().await
{
let result = self.check_tx(req).instrument(span).await;
let _ = rsp_sender.send(result);
}
tracing::info!("mempool service stopped");
Ok(())
}
}

0 comments on commit d95258b

Please sign in to comment.