app: drop stateful mempool #4627

Merged 9 commits on Jun 24, 2024
118 changes: 92 additions & 26 deletions crates/core/app/src/app/mod.rs
@@ -51,6 +51,15 @@ pub mod state_key;
/// The inter-block state being written to by the application.
type InterBlockState = Arc<StateDelta<Snapshot>>;

/// The maximum size of a CometBFT block payload (1MB)
const MAX_BLOCK_TXS_PAYLOAD_BYTES: usize = 1024 * 1024;

/// The maximum size of a single individual transaction (30KB).
const MAX_TRANSACTION_SIZE_BYTES: usize = 30 * 1024;
Comment on lines +54 to +58

Member:
Did we ever choose these in a principled way? Should they be parameters rather than hardcoded constants?

Member Author:

This was picked with a few things in mind:

  • large enough to allow JIT liquidity for multiple pairs
  • large enough to allow a ranged collection of positions
  • large enough to allow multiple IBC client updates
  • small enough to allow many of those actions to be performed in a single block even in the extreme case where each transaction is one of those things

Making them parameters makes sense; I didn't do it because it would require more work with penumbra-governance and hooking them into https://docs.rs/tendermint/latest/tendermint/consensus/params/struct.Params.html


/// The maximum size of the evidence portion of a block (30KB).
const MAX_EVIDENCE_SIZE_BYTES: usize = 30 * 1024;

/// The Penumbra application, written as a bundle of [`Component`]s.
///
/// The [`App`] is not a [`Component`], but
@@ -181,47 +190,104 @@ impl App {
num_candidate_txs
);

let max_proposal_size_bytes = proposal.max_tx_bytes as u64;
// The CometBFT spec requires that the application "MUST" check that the list
// of transactions in the proposal does not exceed `max_tx_bytes`, shedding
// excess transactions so as to be "as close as possible" to the target
// parameter.
//
// A couple things to note about this:
// - `max_tx_bytes` here is an operator controlled parameter
// - it is different from the homonymous mempool configuration
// parameter controlling the maximum size of a single tx.
// - the motivation for this check is that even though `PrepareProposal`
// is only called by the proposer process, CometBFT might not honor
// the target, presuming that some transactions might be yanked.
// For more details, see the specification:
// - Adapting existing applications to use ABCI+:
// https://github.com/cometbft/cometbft/blob/v0.37.5/spec/abci/abci%2B%2B_comet_expected_behavior.md#adapting-existing-applications-that-use-abci
// - Application requirements:
// https://github.com/cometbft/cometbft/blob/v0.37.5/spec/abci/abci%2B%2B_app_requirements
// Tracking the size of the proposal
let mut proposal_size_bytes = 0u64;

for tx in proposal.txs {
let transaction_size = tx.len() as u64;

// We compute the total proposal size if we were to include this transaction.
let total_with_tx = proposal_size_bytes.saturating_add(transaction_size);

// First, we filter proposals to fit within the block limit.
if total_with_tx >= max_proposal_size_bytes {
break;
}

// Then, we make sure to only include successful transactions.
match self.deliver_tx_bytes(&tx).await {
Ok(_) => {
proposal_size_bytes = total_with_tx;
included_txs.push(tx)
}
Err(_) => continue,
}
}

// The evidence payload is validated by Comet; we can lean on three guarantees:
// 1. The total payload is bound by `MAX_EVIDENCE_SIZE_BYTES`
// 2. Expired evidence is filtered
// 3. Evidence is valid.
tracing::debug!(
"finished processing PrepareProposal, including {}/{} candidate transactions",
included_txs.len(),
num_candidate_txs
);

response::PrepareProposal { txs: included_txs }
}
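The selection loop above can be exercised in isolation. The sketch below mirrors its logic (accumulate transactions until the next one would reach the size cap, skipping any that fail execution); `select_txs` and the `executes_ok` predicate are illustrative stand-ins for the app's `deliver_tx_bytes`, not the crate's API:

```rust
/// Greedily select transactions for a proposal, capped at `max_bytes`.
/// `executes_ok` stands in for executing the tx against a state fork.
fn select_txs(
    candidates: &[Vec<u8>],
    max_bytes: u64,
    executes_ok: impl Fn(&[u8]) -> bool,
) -> Vec<Vec<u8>> {
    let mut included = Vec::new();
    let mut proposal_size_bytes: u64 = 0;
    for tx in candidates {
        let total_with_tx = proposal_size_bytes.saturating_add(tx.len() as u64);
        // Stop once the proposal would reach the cap (mirrors the `>=` check above).
        if total_with_tx >= max_bytes {
            break;
        }
        // Only count the size of transactions that actually apply cleanly.
        if executes_ok(tx.as_slice()) {
            proposal_size_bytes = total_with_tx;
            included.push(tx.clone());
        }
    }
    included
}
```

Note the asymmetry: a failing transaction is skipped but the scan continues, while hitting the size cap ends the scan immediately.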

#[instrument(skip_all, ret, level = "debug")]
pub async fn process_proposal(
&mut self,
proposal: request::ProcessProposal,
) -> response::ProcessProposal {
tracing::debug!(
height = proposal.height.value(),
proposer = ?proposal.proposer_address,
proposal_hash = ?proposal.hash,
"processing proposal"
);

// Proposal validation:
// 1. Total evidence payload committed is below [`MAX_EVIDENCE_SIZE_BYTES`]
// 2. Individual transactions are at most [`MAX_TRANSACTION_SIZE_BYTES`]
// 3. The total transaction payload is below [`MAX_BLOCK_TXS_PAYLOAD_BYTES`]
// 4. Each transaction applies successfully.
let mut evidence_buffer: Vec<u8> = Vec::with_capacity(MAX_EVIDENCE_SIZE_BYTES);
let mut bytes_tracker = 0usize;

for evidence in proposal.misbehavior {
// This should be pretty cheap; we allow `MAX_EVIDENCE_SIZE_BYTES` in total,
// but a single evidence datum should be an order of magnitude smaller than that.
evidence_buffer.clear();
let proto_evidence: tendermint_proto::v0_37::abci::Misbehavior = evidence.into();
let evidence_size = match proto_evidence.encode(&mut evidence_buffer) {
Ok(_) => evidence_buffer.len(),
Err(_) => return response::ProcessProposal::Reject,
};
bytes_tracker = bytes_tracker.saturating_add(evidence_size);
if bytes_tracker > MAX_EVIDENCE_SIZE_BYTES {
return response::ProcessProposal::Reject;
}
}

// The evidence payload is valid; now we validate the block txs
// payload: each tx MUST be below the tx size limit and apply cleanly
// against the state fork.
let mut total_txs_payload_size = 0usize;
for tx in proposal.txs {
let tx_size = tx.len();
if tx_size > MAX_TRANSACTION_SIZE_BYTES {
return response::ProcessProposal::Reject;
}

total_txs_payload_size = total_txs_payload_size.saturating_add(tx_size);
if total_txs_payload_size >= MAX_BLOCK_TXS_PAYLOAD_BYTES {
return response::ProcessProposal::Reject;
}

match self.deliver_tx_bytes(&tx).await {
Ok(_) => continue,
Err(_) => return response::ProcessProposal::Reject,
}
}

response::ProcessProposal::Accept
}
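A minimal sketch of the transaction-payload checks (items 2 and 3 of the validation list above): reject any oversized transaction, and reject once the running payload total reaches the block limit. `validate_txs` is a hypothetical helper, not part of the crate:

```rust
const MAX_TRANSACTION_SIZE_BYTES: usize = 30 * 1024;
const MAX_BLOCK_TXS_PAYLOAD_BYTES: usize = 1024 * 1024;

/// Returns `true` if every tx is within the per-tx limit and the running
/// total stays under the block payload limit, mirroring the rejection
/// logic in `process_proposal`.
fn validate_txs(txs: &[Vec<u8>]) -> bool {
    let mut total = 0usize;
    for tx in txs {
        if tx.len() > MAX_TRANSACTION_SIZE_BYTES {
            return false; // a single tx is too large
        }
        total = total.saturating_add(tx.len());
        if total >= MAX_BLOCK_TXS_PAYLOAD_BYTES {
            return false; // block payload limit reached
        }
    }
    true
}
```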

@@ -563,10 +629,10 @@ impl App {
jmt_root
}

pub fn cometbft_validator_updates(&self) -> Vec<Update> {
self.state
.cometbft_validator_updates()
// If the cometbft validator updates are not set, we return an empty
// update set, signaling no change to Tendermint.
.unwrap_or_default()
}
39 changes: 23 additions & 16 deletions crates/core/app/src/server/consensus.rs
@@ -70,6 +70,18 @@ impl Consensus {
.await
.expect("init_chain must succeed"),
),
Request::PrepareProposal(proposal) => Response::PrepareProposal(
self.prepare_proposal(proposal)
.instrument(span)
.await
.expect("prepare proposal must succeed"),
),
Request::ProcessProposal(proposal) => Response::ProcessProposal(
self.process_proposal(proposal)
.instrument(span)
.await
.expect("process proposal must succeed"),
),
Request::BeginBlock(begin_block) => Response::BeginBlock(
self.begin_block(begin_block)
.instrument(span)
@@ -88,18 +100,6 @@ impl Consensus {
.await
.expect("commit must succeed"),
),
}));
}
Ok(())
@@ -123,7 +123,7 @@ impl Consensus {
// to be provided inside the initial app genesis state (`GenesisAppState`). Returning those
// validators in InitChain::Response tells Tendermint that they are the initial validator
// set. See https://docs.tendermint.com/master/spec/abci/abci.html#initchain
let validators = self.app.cometbft_validator_updates();

let app_hash = match &app_state {
crate::genesis::AppState::Checkpoint(h) => {
@@ -162,15 +162,22 @@ impl Consensus {
proposal: request::PrepareProposal,
) -> Result<response::PrepareProposal> {
tracing::info!(height = ?proposal.height, proposer = ?proposal.proposer_address, "preparing proposal");
// We prepare a proposal against an isolated fork of the application state.
let mut tmp_app = App::new(self.storage.latest_snapshot());
// Once we are done, we discard it so that the application state doesn't get corrupted
// if another round of consensus is required because the proposal fails to finalize.
Ok(tmp_app.prepare_proposal(proposal).await)
}

async fn process_proposal(
&mut self,
proposal: request::ProcessProposal,
) -> Result<response::ProcessProposal> {
tracing::info!(height = ?proposal.height, proposer = ?proposal.proposer_address, hash = %proposal.hash, "processing proposal");
// We process the proposal in an isolated state fork. Eventually, we should cache this work
// and re-use it when processing a `FinalizeBlock` message (starting in `0.38.x`).
let mut tmp_app = App::new(self.storage.latest_snapshot());
Ok(tmp_app.process_proposal(proposal).await)
}
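The fork-and-discard pattern used by both handlers can be illustrated with toy types (`Storage`, `Snapshot`, and `App` below are stand-ins, not the cnidarium or app types): each call builds its working state from the latest committed snapshot, so a proposal that never finalizes leaves committed state untouched.

```rust
#[derive(Clone)]
struct Snapshot {
    balance: u64,
}

struct Storage {
    committed: Snapshot,
}

struct App {
    state: Snapshot,
}

impl Storage {
    // Hands out a copy of the committed state, analogous to `latest_snapshot`.
    fn latest_snapshot(&self) -> Snapshot {
        self.committed.clone()
    }
}

impl App {
    fn new(state: Snapshot) -> Self {
        App { state }
    }
    // Stand-in for applying a transaction's state changes.
    fn apply(&mut self, delta: u64) {
        self.state.balance += delta;
    }
}

fn process(storage: &Storage) -> u64 {
    // Scratch app over a fork of committed state; dropped when the call
    // returns, so `storage.committed` is never mutated.
    let mut tmp_app = App::new(storage.latest_snapshot());
    tmp_app.apply(5);
    tmp_app.state.balance
}
```

No explicit cleanup is needed: dropping the scratch `App` discards the fork.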

async fn begin_block(
@@ -218,7 +225,7 @@ impl Consensus {
// validators and voting power. This must be the last step performed,
// after all voting power calculations and validator state transitions have
// been completed.
let validator_updates = self.app.cometbft_validator_updates();

tracing::debug!(
?validator_updates,
75 changes: 22 additions & 53 deletions crates/core/app/src/server/mempool.rs
@@ -1,48 +1,29 @@
use anyhow::Result;

use cnidarium::Storage;

use tendermint::v0_37::abci::{
request::CheckTx as CheckTxReq, request::CheckTxKind, response::CheckTx as CheckTxRsp,
MempoolRequest as Request, MempoolResponse as Response,
};
use tokio::sync::mpsc;
use tower_actor::Message;
use tracing::Instrument;

use crate::{app::App, metrics};

/// A mempool service that applies transaction checks against an isolated application fork.
pub struct Mempool {
queue: mpsc::Receiver<Message<Request, Response, tower::BoxError>>,
app: App,
storage: Storage,
}

impl Mempool {
pub fn new(
storage: Storage,
queue: mpsc::Receiver<Message<Request, Response, tower::BoxError>>,
) -> Self {
Self { queue, storage }
}

pub async fn check_tx(&mut self, req: Request) -> Result<Response, tower::BoxError> {
@@ -56,7 +37,9 @@ impl Mempool {
CheckTxKind::Recheck => "recheck",
};

let mut app = App::new(self.storage.latest_snapshot());

match app.deliver_tx_bytes(tx_bytes.as_ref()).await {
Ok(events) => {
let elapsed = start.elapsed();
tracing::info!(?elapsed, "tx accepted");
@@ -81,34 +64,20 @@ impl Mempool {
}

pub async fn run(mut self) -> Result<(), tower::BoxError> {
tracing::info!("mempool service started");
while let Some(Message {
req,
rsp_sender,
span,
// We could perform `CheckTx` asynchronously, and poll many
// entries from the queue:
// See https://docs.rs/tokio/latest/tokio/sync/mpsc/struct.Receiver.html#method.recv_many
}) = self.queue.recv().await
{
let result = self.check_tx(req).instrument(span).await;
let _ = rsp_sender.send(result);
}
tracing::info!("mempool service stopped");
Ok(())
}
}
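The simplified run loop boils down to draining a request queue and answering each message on its own response channel. Below is a synchronous analogue using `std::sync::mpsc` (the real service uses `tokio::sync::mpsc` and awaits `recv`; the `Message` type here is a toy, not `tower_actor::Message`):

```rust
use std::sync::mpsc;

// Each request carries its own one-shot-style reply channel.
struct Message {
    req: u64,
    rsp_sender: mpsc::Sender<u64>,
}

fn run(queue: mpsc::Receiver<Message>) {
    // Drain the queue with `while let`; the loop ends once every sender
    // has been dropped, which is the service's shutdown signal.
    while let Ok(Message { req, rsp_sender }) = queue.recv() {
        // Stand-in for `check_tx`: echo the request doubled.
        let _ = rsp_sender.send(req * 2);
    }
}
```

The same shape holds asynchronously: `tokio`'s `Receiver::recv().await` returns `None` when all senders are gone, terminating the `while let`.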