diff --git a/crates/astria-core/src/protocol/abci.rs b/crates/astria-core/src/protocol/abci.rs index 5caf357459..7cacc6b780 100644 --- a/crates/astria-core/src/protocol/abci.rs +++ b/crates/astria-core/src/protocol/abci.rs @@ -22,6 +22,9 @@ impl AbciErrorCode { pub const TRANSACTION_INSERTION_FAILED: Self = Self(unsafe { NonZeroU32::new_unchecked(11) }); pub const LOWER_NONCE_INVALIDATED: Self = Self(unsafe { NonZeroU32::new_unchecked(12) }); pub const BAD_REQUEST: Self = Self(unsafe { NonZeroU32::new_unchecked(13) }); + pub const ALREADY_PRESENT: Self = Self(unsafe { NonZeroU32::new_unchecked(14) }); + pub const NONCE_TAKEN: Self = Self(unsafe { NonZeroU32::new_unchecked(15) }); + pub const ACCOUNT_SIZE_LIMIT: Self = Self(unsafe { NonZeroU32::new_unchecked(16) }); } impl AbciErrorCode { @@ -52,6 +55,13 @@ impl AbciErrorCode { } Self::LOWER_NONCE_INVALIDATED => "lower nonce was invalidated in mempool".into(), Self::BAD_REQUEST => "the request payload was malformed".into(), + Self::ALREADY_PRESENT => "the transaction is already present in the mempool".into(), + Self::NONCE_TAKEN => "there is already a transaction with the same nonce for the \ + account in the mempool" + .into(), + Self::ACCOUNT_SIZE_LIMIT => { + "the account has reached the maximum number of parked transactions".into() + } Self(other) => { format!("invalid error code {other}: should be unreachable (this is a bug)") } diff --git a/crates/astria-sequencer/src/app/test_utils.rs b/crates/astria-sequencer/src/app/test_utils.rs index 38dc2816c9..ee97bb92c7 100644 --- a/crates/astria-sequencer/src/app/test_utils.rs +++ b/crates/astria-sequencer/src/app/test_utils.rs @@ -218,8 +218,8 @@ pub(crate) async fn initialize_app_with_storage( .await .expect("failed to create temp storage backing chain state"); let snapshot = storage.latest_snapshot(); - let mempool = Mempool::new(); let metrics = Box::leak(Box::new(Metrics::noop_metrics(&()).unwrap())); + let mempool = Mempool::new(metrics); let mut app = App::new(snapshot, mempool, metrics).await.unwrap(); let genesis_state = genesis_state.unwrap_or_else(self::genesis_state); diff --git a/crates/astria-sequencer/src/grpc/sequencer.rs b/crates/astria-sequencer/src/grpc/sequencer.rs index 1a506f4578..d75d00df13 100644 --- a/crates/astria-sequencer/src/grpc/sequencer.rs +++ b/crates/astria-sequencer/src/grpc/sequencer.rs @@ -229,6 +229,7 @@ mod tests { sequencerblock::v1alpha1::SequencerBlock, }; use cnidarium::StateDelta; + use telemetry::Metrics; use super::*; use crate::{ @@ -256,7 +257,8 @@ mod tests { async fn test_get_sequencer_block() { let block = make_test_sequencer_block(1); let storage = cnidarium::TempStorage::new().await.unwrap(); - let mempool = Mempool::new(); + let metrics = Box::leak(Box::new(Metrics::noop_metrics(&()).unwrap())); + let mempool = Mempool::new(metrics); let mut state_tx = StateDelta::new(storage.latest_snapshot()); state_tx.put_block_height(1).unwrap(); state_tx.put_sequencer_block(block).unwrap(); @@ -274,7 +276,8 @@ mod tests { #[tokio::test] async fn get_pending_nonce_in_mempool() { let storage = cnidarium::TempStorage::new().await.unwrap(); - let mempool = Mempool::new(); + let metrics = Box::leak(Box::new(Metrics::noop_metrics(&()).unwrap())); + let mempool = Mempool::new(metrics); let alice = get_alice_signing_key(); let alice_address = astria_address(&alice.address_bytes()); @@ -324,7 +327,8 @@ mod tests { use crate::accounts::StateWriteExt as _; let storage = cnidarium::TempStorage::new().await.unwrap(); - let mempool = Mempool::new(); + let metrics = Box::leak(Box::new(Metrics::noop_metrics(&()).unwrap())); + let mempool = Mempool::new(metrics); let mut state_tx = StateDelta::new(storage.latest_snapshot()); let alice = get_alice_signing_key(); let alice_address = astria_address(&alice.address_bytes()); diff --git a/crates/astria-sequencer/src/mempool/benchmarks.rs b/crates/astria-sequencer/src/mempool/benchmarks.rs index 5b646231ea..fb710a3148 100644 --- a/crates/astria-sequencer/src/mempool/benchmarks.rs +++ b/crates/astria-sequencer/src/mempool/benchmarks.rs @@ -17,6 +17,7 @@ use sha2::{ Digest as _, Sha256, }; +use telemetry::Metrics; use crate::{ app::test_utils::{ @@ -103,7 +104,8 @@ fn init_mempool() -> Mempool { .enable_all() .build() .unwrap(); - let mempool = Mempool::new(); + let metrics = Box::leak(Box::new(Metrics::noop_metrics(&()).unwrap())); + let mempool = Mempool::new(metrics); let account_mock_balance = mock_balances(0, 0); let tx_mock_cost = mock_tx_cost(0, 0, 0); runtime.block_on(async { diff --git a/crates/astria-sequencer/src/mempool/mod.rs b/crates/astria-sequencer/src/mempool/mod.rs index fde6479b25..6573bbfbd5 100644 --- a/crates/astria-sequencer/src/mempool/mod.rs +++ b/crates/astria-sequencer/src/mempool/mod.rs @@ -20,7 +20,6 @@ use astria_core::{ use astria_eyre::eyre::Result; pub(crate) use mempool_state::get_account_balances; use tokio::{ - join, sync::{ RwLock, RwLockWriteGuard, @@ -38,7 +37,10 @@ use transactions_container::{ TimemarkedTransaction, }; -use crate::accounts; +use crate::{ + accounts, + Metrics, +}; #[derive(Debug, Eq, PartialEq, Clone)] pub(crate) enum RemovalReason { @@ -104,6 +106,35 @@ impl RemovalCache { } } +struct ContainedTxLock<'a> { + mempool: &'a Mempool, + txs: RwLockWriteGuard<'a, HashSet<[u8; 32]>>, +} + +impl<'a> ContainedTxLock<'a> { + fn add(&mut self, id: [u8; 32]) { + if !self.txs.insert(id) { + self.mempool.metrics.increment_internal_logic_error(); + error!( + tx_hash = %telemetry::display::hex(&id), + "attempted to add transaction already tracked in mempool's tracked container, is logic \ + error" + ); + } + } + + fn remove(&mut self, id: [u8; 32]) { + if !self.txs.remove(&id) { + self.mempool.metrics.increment_internal_logic_error(); + error!( + tx_hash = %telemetry::display::hex(&id), + "attempted to remove transaction absent from mempool's tracked container, is logic \ + error" + ); + } + } +} + /// [`Mempool`] is an account-based structure for maintaining transactions for execution. /// /// The transactions are split between pending and parked, where pending transactions are ready for @@ -133,11 +164,13 @@ pub(crate) struct Mempool { pending: Arc>, parked: Arc>>, comet_bft_removal_cache: Arc>, + contained_txs: Arc>>, + metrics: &'static Metrics, } impl Mempool { #[must_use] - pub(crate) fn new() -> Self { + pub(crate) fn new(metrics: &'static Metrics) -> Self { Self { pending: Arc::new(RwLock::new(PendingTransactions::new(TX_TTL))), parked: Arc::new(RwLock::new(ParkedTransactions::new(TX_TTL))), @@ -145,6 +178,8 @@ impl Mempool { NonZeroUsize::try_from(REMOVAL_CACHE_SIZE) .expect("Removal cache cannot be zero sized"), ))), + contained_txs: Arc::new(RwLock::new(HashSet::new())), + metrics, } } @@ -152,12 +187,14 @@ impl Mempool { #[must_use] #[instrument(skip_all)] pub(crate) async fn len(&self) -> usize { - #[rustfmt::skip] - let (pending_len, parked_len) = join!( - async { self.pending.read().await.len() }, - async { self.parked.read().await.len() } - ); - pending_len.saturating_add(parked_len) + self.contained_txs.read().await.len() + } + + async fn lock_contained_txs(&self) -> ContainedTxLock<'_> { + ContainedTxLock { + mempool: self, + txs: self.contained_txs.write().await, + } } /// Inserts a transaction into the mempool and does not allow for transaction replacement. @@ -171,7 +208,7 @@ impl Mempool { transaction_cost: HashMap, ) -> Result<(), InsertionError> { let timemarked_tx = TimemarkedTransaction::new(tx, transaction_cost); - + let id = timemarked_tx.id(); let (mut pending, mut parked) = self.acquire_both_locks().await; // try insert into pending @@ -184,11 +221,18 @@ impl Mempool { // Release the lock asap. drop(pending); // try to add to parked queue - parked.add( + match parked.add( timemarked_tx, current_account_nonce, ¤t_account_balances, - ) + ) { + Ok(()) => { + // track in contained txs + self.lock_contained_txs().await.add(id); + Ok(()) + } + Err(err) => Err(err), + } } error @ Err( InsertionError::AlreadyPresent @@ -211,15 +255,25 @@ impl Mempool { ); // promote the transactions for ttx in to_promote { + let tx_id = ttx.id(); if let Err(error) = pending.add(ttx, current_account_nonce, ¤t_account_balances) { + // NOTE: this branch is not expected to be hit so grabbing the lock inside + // of the loop is more performant. + self.lock_contained_txs().await.remove(timemarked_tx.id()); error!( current_account_nonce, - "failed to promote transaction during insertion: {error:#}" + tx_hash = %telemetry::display::hex(&tx_id), + %error, + "failed to promote transaction during insertion" ); } } + + // track in contained txs + self.lock_contained_txs().await.add(timemarked_tx.id()); + Ok(()) } } @@ -266,10 +320,13 @@ impl Mempool { // Add all removed to removal cache for cometbft. let mut removal_cache = self.comet_bft_removal_cache.write().await; - // Add the original tx first, since it will also be listed in `removed_txs`. The second + + // Add the original tx first to preserve its reason for removal. The second // attempt to add it inside the loop below will be a no-op. removal_cache.add(tx_hash, reason); + let mut contained_lock = self.lock_contained_txs().await; for removed_tx in removed_txs { + contained_lock.remove(removed_tx); removal_cache.add(removed_tx, RemovalReason::LowerNonceInvalidated); } } @@ -281,6 +338,12 @@ impl Mempool { self.comet_bft_removal_cache.write().await.remove(tx_hash) } + /// Returns true if the transaction is tracked as inserted. + #[instrument(skip_all)] + pub(crate) async fn is_tracked(&self, tx_hash: [u8; 32]) -> bool { + self.contained_txs.read().await.contains(&tx_hash) + } + /// Updates stored transactions to reflect current blockchain state. Will remove transactions /// that have stale nonces or are expired. Will also shift transation between pending and /// parked to relfect changes in account balances. @@ -356,21 +419,38 @@ impl Mempool { parked.find_promotables(address, highest_pending_nonce, &remaining_balances); for tx in promtion_txs { + let tx_id = tx.id(); if let Err(error) = pending.add(tx, current_nonce, ¤t_balances) { + // NOTE: this shouldn't happen. Promotions should never fail. This also + // means grabbing the lock inside the loop is more + // performant. + self.lock_contained_txs().await.remove(tx_id); + self.metrics.increment_internal_logic_error(); error!( + address = %telemetry::display::base64(&address), current_nonce, - "failed to promote transaction during maintenance: {error:#}" + tx_hash = %telemetry::display::hex(&tx_id), + %error, + "failed to promote transaction during maintenance" ); } } } else { // add demoted transactions to parked for tx in demotion_txs { - if let Err(err) = parked.add(tx, current_nonce, ¤t_balances) { - // this shouldn't happen + let tx_id = tx.id(); + if let Err(error) = parked.add(tx, current_nonce, ¤t_balances) { + // NOTE: this shouldn't happen normally but could on the edge case of + // the parked queue being full for the account. This also means + // grabbing the lock inside the loop is more performant. + self.lock_contained_txs().await.remove(tx_id); + self.metrics.increment_internal_logic_error(); error!( - address = %telemetry::display::base64(&address), - "failed to demote transaction during maintenance: {err:#}" + address = %telemetry::display::base64(&address), + current_nonce, + tx_hash = %telemetry::display::hex(&tx_id), + %error, + "failed to demote transaction during maintenance" ); } } @@ -380,10 +460,12 @@ impl Mempool { drop(parked); drop(pending); - // add to removal cache for cometbft + // add to removal cache for cometbft and remove from the tracked set let mut removal_cache = self.comet_bft_removal_cache.write().await; + let mut contained_lock = self.lock_contained_txs().await; for (tx_hash, reason) in removed_txs { removal_cache.add(tx_hash, reason); + contained_lock.remove(tx_hash); } } @@ -410,6 +492,8 @@ impl Mempool { #[cfg(test)] mod tests { + use telemetry::Metrics; + use super::*; use crate::{ app::test_utils::{ @@ -429,7 +513,8 @@ mod tests { #[tokio::test] async fn insert() { - let mempool = Mempool::new(); + let metrics = Box::leak(Box::new(Metrics::noop_metrics(&()).unwrap())); + let mempool = Mempool::new(metrics); let account_balances = mock_balances(100, 100); let tx_cost = mock_tx_cost(10, 10, 0); @@ -442,6 +527,7 @@ mod tests { .is_ok(), "should be able to insert nonce 1 transaction into mempool" ); + assert_eq!(mempool.len().await, 1); // try to insert again assert_eq!( @@ -491,8 +577,8 @@ mod tests { // nonce at 1, and then cleans the pool to nonce 4. This tests some of the // odder edge cases that can be hit if a node goes offline or fails to see // some transactions that other nodes include into their proposed blocks. - - let mempool = Mempool::new(); + let metrics = Box::leak(Box::new(Metrics::noop_metrics(&()).unwrap())); + let mempool = Mempool::new(metrics); let account_balances = mock_balances(100, 100); let tx_cost = mock_tx_cost(10, 10, 0); @@ -593,7 +679,8 @@ mod tests { #[tokio::test] async fn run_maintenance_promotion() { - let mempool = Mempool::new(); + let metrics = Box::leak(Box::new(Metrics::noop_metrics(&()).unwrap())); + let mempool = Mempool::new(metrics); // create transaction setup to trigger promotions // @@ -665,7 +752,8 @@ mod tests { #[tokio::test] async fn run_maintenance_demotion() { - let mempool = Mempool::new(); + let metrics = Box::leak(Box::new(Metrics::noop_metrics(&()).unwrap())); + let mempool = Mempool::new(metrics); // create transaction setup to trigger demotions // @@ -759,7 +847,8 @@ mod tests { #[tokio::test] async fn remove_invalid() { - let mempool = Mempool::new(); + let metrics = Box::leak(Box::new(Metrics::noop_metrics(&()).unwrap())); + let mempool = Mempool::new(metrics); let account_balances = mock_balances(100, 100); let tx_cost = mock_tx_cost(10, 10, 10); @@ -861,7 +950,8 @@ mod tests { #[tokio::test] async fn should_get_pending_nonce() { - let mempool = Mempool::new(); + let metrics = Box::leak(Box::new(Metrics::noop_metrics(&()).unwrap())); + let mempool = Mempool::new(metrics); let account_balances = mock_balances(100, 100); let tx_cost = mock_tx_cost(10, 10, 0); @@ -1000,4 +1090,114 @@ mod tests { "first removal reason should be presenved" ); } + + #[tokio::test] + async fn tx_tracked_invalid_removal_removes_all() { + let metrics = Box::leak(Box::new(Metrics::noop_metrics(&()).unwrap())); + let mempool = Mempool::new(metrics); + let account_balances = mock_balances(100, 100); + let tx_cost = mock_tx_cost(10, 10, 0); + + let tx0 = MockTxBuilder::new().nonce(0).build(); + let tx1 = MockTxBuilder::new().nonce(1).build(); + + // check that the parked transaction is in the tracked set + mempool + .insert(tx1.clone(), 0, account_balances.clone(), tx_cost.clone()) + .await + .unwrap(); + assert!(mempool.is_tracked(tx1.id().get()).await); + + // check that the pending transaction is in the tracked set + mempool + .insert(tx0.clone(), 0, account_balances.clone(), tx_cost.clone()) + .await + .unwrap(); + assert!(mempool.is_tracked(tx0.id().get()).await); + + // remove the transactions from the mempool, should remove both + mempool + .remove_tx_invalid(tx0.clone(), RemovalReason::Expired) + .await; + + // check that the transactions are not in the tracked set + assert!(!mempool.is_tracked(tx0.id().get()).await); + assert!(!mempool.is_tracked(tx1.id().get()).await); + } + + #[tokio::test] + async fn tx_tracked_maintenance_removes_all() { + let metrics = Box::leak(Box::new(Metrics::noop_metrics(&()).unwrap())); + let mempool = Mempool::new(metrics); + let account_balances = mock_balances(100, 100); + let tx_cost = mock_tx_cost(10, 10, 0); + + let tx0 = MockTxBuilder::new().nonce(0).build(); + let tx1 = MockTxBuilder::new().nonce(1).build(); + + mempool + .insert(tx1.clone(), 0, account_balances.clone(), tx_cost.clone()) + .await + .unwrap(); + mempool + .insert(tx0.clone(), 0, account_balances.clone(), tx_cost.clone()) + .await + .unwrap(); + + // remove the transacitons from the mempool via maintenance + let mut mock_state = mock_state_getter().await; + mock_state_put_account_nonce( + &mut mock_state, + astria_address_from_hex_string(ALICE_ADDRESS).as_bytes(), + 2, + ); + mempool.run_maintenance(&mock_state, false).await; + + // check that the transactions are not in the tracked set + assert!(!mempool.is_tracked(tx0.id().get()).await); + assert!(!mempool.is_tracked(tx1.id().get()).await); + } + + #[tokio::test] + async fn tx_tracked_reinsertion_ok() { + let metrics = Box::leak(Box::new(Metrics::noop_metrics(&()).unwrap())); + let mempool = Mempool::new(metrics); + let account_balances = mock_balances(100, 100); + let tx_cost = mock_tx_cost(10, 10, 0); + + let tx0 = MockTxBuilder::new().nonce(0).build(); + let tx1 = MockTxBuilder::new().nonce(1).build(); + + mempool + .insert(tx1.clone(), 0, account_balances.clone(), tx_cost.clone()) + .await + .unwrap(); + + mempool + .insert(tx0.clone(), 0, account_balances.clone(), tx_cost.clone()) + .await + .unwrap(); + + // remove the transactions from the mempool, should remove both + mempool + .remove_tx_invalid(tx0.clone(), RemovalReason::Expired) + .await; + + assert!(!mempool.is_tracked(tx0.id().get()).await); + assert!(!mempool.is_tracked(tx1.id().get()).await); + + // re-insert the transactions into the mempool + mempool + .insert(tx0.clone(), 0, account_balances.clone(), tx_cost.clone()) + .await + .unwrap(); + mempool + .insert(tx1.clone(), 0, account_balances.clone(), tx_cost.clone()) + .await + .unwrap(); + + // check that the transactions are in the tracked set on re-insertion + assert!(mempool.is_tracked(tx0.id().get()).await); + assert!(mempool.is_tracked(tx1.id().get()).await); + } } diff --git a/crates/astria-sequencer/src/mempool/transactions_container.rs b/crates/astria-sequencer/src/mempool/transactions_container.rs index 68ceeb9631..10d120df15 100644 --- a/crates/astria-sequencer/src/mempool/transactions_container.rs +++ b/crates/astria-sequencer/src/mempool/transactions_container.rs @@ -109,6 +109,10 @@ impl TimemarkedTransaction { pub(super) fn cost(&self) -> &HashMap { &self.cost } + + pub(super) fn id(&self) -> [u8; 32] { + self.tx_hash + } } impl fmt::Display for TimemarkedTransaction { diff --git a/crates/astria-sequencer/src/metrics.rs b/crates/astria-sequencer/src/metrics.rs index 8dadf7fcdc..dbb3858ef7 100644 --- a/crates/astria-sequencer/src/metrics.rs +++ b/crates/astria-sequencer/src/metrics.rs @@ -24,10 +24,10 @@ pub struct Metrics { check_tx_removed_expired: Counter, check_tx_removed_failed_execution: Counter, check_tx_removed_failed_stateless: Counter, - check_tx_removed_stale_nonce: Counter, check_tx_duration_seconds_parse_tx: Histogram, check_tx_duration_seconds_check_stateless: Histogram, - check_tx_duration_seconds_check_nonce: Histogram, + check_tx_duration_seconds_fetch_nonce: Histogram, + check_tx_duration_seconds_check_tracked: Histogram, check_tx_duration_seconds_check_chain_id: Histogram, check_tx_duration_seconds_check_removed: Histogram, check_tx_duration_seconds_convert_address: Histogram, @@ -38,6 +38,7 @@ pub struct Metrics { transaction_in_mempool_size_bytes: Histogram, transactions_in_mempool_total: Gauge, mempool_recosted: Counter, + internal_logic_error: Counter, } impl Metrics { @@ -88,10 +89,6 @@ impl Metrics { self.check_tx_removed_failed_stateless.increment(1); } - pub(crate) fn increment_check_tx_removed_stale_nonce(&self) { - self.check_tx_removed_stale_nonce.increment(1); - } - pub(crate) fn record_check_tx_duration_seconds_parse_tx(&self, duration: Duration) { self.check_tx_duration_seconds_parse_tx.record(duration); } @@ -101,8 +98,13 @@ impl Metrics { .record(duration); } - pub(crate) fn record_check_tx_duration_seconds_check_nonce(&self, duration: Duration) { - self.check_tx_duration_seconds_check_nonce.record(duration); + pub(crate) fn record_check_tx_duration_seconds_fetch_nonce(&self, duration: Duration) { + self.check_tx_duration_seconds_fetch_nonce.record(duration); + } + + pub(crate) fn record_check_tx_duration_seconds_check_tracked(&self, duration: Duration) { + self.check_tx_duration_seconds_check_tracked + .record(duration); } pub(crate) fn record_check_tx_duration_seconds_check_chain_id(&self, duration: Duration) { @@ -153,6 +155,10 @@ impl Metrics { pub(crate) fn increment_mempool_recosted(&self) { self.mempool_recosted.increment(1); } + + pub(crate) fn increment_internal_logic_error(&self) { + self.internal_logic_error.increment(1); + } } impl telemetry::Metrics for Metrics { @@ -262,6 +268,21 @@ impl telemetry::Metrics for Metrics { )? .register()?; + let check_tx_duration_seconds_fetch_nonce = builder + .new_histogram_factory( + CHECK_TX_DURATION_SECONDS_FETCH_NONCE, + "The amount of time taken in seconds to fetch an account's nonce", + )? + .register()?; + + let check_tx_duration_seconds_check_tracked = builder + .new_histogram_factory( + CHECK_TX_DURATION_SECONDS_CHECK_TRACKED, + "The amount of time taken in seconds to check if the transaction is already in \ + the mempool", + )? + .register()?; + let check_tx_removed_failed_stateless = builder .new_counter_factory( CHECK_TX_REMOVED_FAILED_STATELESS, @@ -269,15 +290,6 @@ impl telemetry::Metrics for Metrics { failing the stateless check", )? .register()?; - - let check_tx_removed_stale_nonce = builder - .new_counter_factory( - CHECK_TX_REMOVED_STALE_NONCE, - "The number of transactions that have been removed from the mempool due to having \ - a stale nonce", - )? - .register()?; - let mut check_tx_duration_factory = builder.new_histogram_factory( CHECK_TX_DURATION_SECONDS, "The amount of time taken in seconds to successfully complete the various stages of \ @@ -288,8 +300,6 @@ impl telemetry::Metrics for Metrics { )?; let check_tx_duration_seconds_check_stateless = check_tx_duration_factory .register_with_labels(&[(CHECK_TX_STAGE, "stateless check".to_string())])?; - let check_tx_duration_seconds_check_nonce = check_tx_duration_factory - .register_with_labels(&[(CHECK_TX_STAGE, "nonce check".to_string())])?; let check_tx_duration_seconds_check_chain_id = check_tx_duration_factory .register_with_labels(&[(CHECK_TX_STAGE, "chain id check".to_string())])?; let check_tx_duration_seconds_check_removed = check_tx_duration_factory @@ -325,6 +335,14 @@ impl telemetry::Metrics for Metrics { )? .register()?; + let internal_logic_error = builder + .new_counter_factory( + INTERNAL_LOGIC_ERROR, + "The number of times a transaction has been rejected due to logic errors in the \ + mempool", + )? + .register()?; + Ok(Self { prepare_proposal_excluded_transactions_cometbft_space, prepare_proposal_excluded_transactions_sequencer_space, @@ -337,10 +355,10 @@ impl telemetry::Metrics for Metrics { check_tx_removed_expired, check_tx_removed_failed_execution, check_tx_removed_failed_stateless, - check_tx_removed_stale_nonce, check_tx_duration_seconds_parse_tx, check_tx_duration_seconds_check_stateless, - check_tx_duration_seconds_check_nonce, + check_tx_duration_seconds_fetch_nonce, + check_tx_duration_seconds_check_tracked, check_tx_duration_seconds_check_chain_id, check_tx_duration_seconds_check_removed, check_tx_duration_seconds_convert_address, @@ -351,6 +369,7 @@ impl telemetry::Metrics for Metrics { transaction_in_mempool_size_bytes, transactions_in_mempool_total, mempool_recosted, + internal_logic_error, }) } } @@ -367,16 +386,18 @@ metric_names!(const METRICS_NAMES: CHECK_TX_REMOVED_EXPIRED, CHECK_TX_REMOVED_FAILED_EXECUTION, CHECK_TX_REMOVED_FAILED_STATELESS, - CHECK_TX_REMOVED_STALE_NONCE, CHECK_TX_REMOVED_ACCOUNT_BALANCE, CHECK_TX_DURATION_SECONDS, CHECK_TX_DURATION_SECONDS_CONVERT_ADDRESS, CHECK_TX_DURATION_SECONDS_FETCH_BALANCES, + CHECK_TX_DURATION_SECONDS_FETCH_NONCE, CHECK_TX_DURATION_SECONDS_FETCH_TX_COST, + CHECK_TX_DURATION_SECONDS_CHECK_TRACKED, ACTIONS_PER_TRANSACTION_IN_MEMPOOL, TRANSACTION_IN_MEMPOOL_SIZE_BYTES, TRANSACTIONS_IN_MEMPOOL_TOTAL, - MEMPOOL_RECOSTED + MEMPOOL_RECOSTED, + INTERNAL_LOGIC_ERROR ); #[cfg(test)] @@ -388,8 +409,8 @@ mod tests { CHECK_TX_REMOVED_EXPIRED, CHECK_TX_REMOVED_FAILED_EXECUTION, CHECK_TX_REMOVED_FAILED_STATELESS, - CHECK_TX_REMOVED_STALE_NONCE, CHECK_TX_REMOVED_TOO_LARGE, + INTERNAL_LOGIC_ERROR, MEMPOOL_RECOSTED, PREPARE_PROPOSAL_EXCLUDED_TRANSACTIONS, PREPARE_PROPOSAL_EXCLUDED_TRANSACTIONS_COMETBFT_SPACE, @@ -444,7 +465,6 @@ mod tests { CHECK_TX_REMOVED_FAILED_STATELESS, "check_tx_removed_failed_stateless", ); - assert_const(CHECK_TX_REMOVED_STALE_NONCE, "check_tx_removed_stale_nonce"); assert_const( CHECK_TX_REMOVED_ACCOUNT_BALANCE, "check_tx_removed_account_balance", @@ -463,5 +483,6 @@ mod tests { "transactions_in_mempool_total", ); assert_const(MEMPOOL_RECOSTED, "mempool_recosted"); + assert_const(INTERNAL_LOGIC_ERROR, "internal_logic_error"); } } diff --git a/crates/astria-sequencer/src/sequencer.rs b/crates/astria-sequencer/src/sequencer.rs index 3bd1bfd64c..a6df51421b 100644 --- a/crates/astria-sequencer/src/sequencer.rs +++ b/crates/astria-sequencer/src/sequencer.rs @@ -84,7 +84,7 @@ impl Sequencer { .wrap_err("failed to load storage backing chain state")?; let snapshot = storage.latest_snapshot(); - let mempool = Mempool::new(); + let mempool = Mempool::new(metrics); let app = App::new(snapshot, mempool.clone(), metrics) .await .wrap_err("failed to initialize app")?; diff --git a/crates/astria-sequencer/src/service/consensus.rs b/crates/astria-sequencer/src/service/consensus.rs index 95597fbd7c..071fdac943 100644 --- a/crates/astria-sequencer/src/service/consensus.rs +++ b/crates/astria-sequencer/src/service/consensus.rs @@ -470,8 +470,8 @@ mod tests { let storage = cnidarium::TempStorage::new().await.unwrap(); let snapshot = storage.latest_snapshot(); - let mempool = Mempool::new(); let metrics = Box::leak(Box::new(Metrics::noop_metrics(&()).unwrap())); + let mempool = Mempool::new(metrics); let mut app = App::new(snapshot, mempool.clone(), metrics).await.unwrap(); app.init_chain(storage.clone(), genesis_state, vec![], "test".to_string()) .await diff --git a/crates/astria-sequencer/src/service/mempool.rs b/crates/astria-sequencer/src/service/mempool.rs deleted file mode 100644 index cb1fe981f6..0000000000 --- a/crates/astria-sequencer/src/service/mempool.rs +++ /dev/null @@ -1,376 +0,0 @@ -use std::{ - collections::HashMap, - pin::Pin, - sync::Arc, - task::{ - Context, - Poll, - }, - time::Instant, -}; - -use astria_core::{ - generated::protocol::transactions::v1alpha1 as raw, - primitive::v1::asset::IbcPrefixed, - protocol::{ - abci::AbciErrorCode, - transaction::v1alpha1::SignedTransaction, - }, -}; -use astria_eyre::eyre::WrapErr as _; -use cnidarium::Storage; -use futures::{ - Future, - FutureExt, -}; -use prost::Message as _; -use tendermint::{ - abci::Code, - v0_38::abci::{ - request, - response, - MempoolRequest, - MempoolResponse, - }, -}; -use tower::Service; -use tower_abci::BoxError; -use tracing::{ - instrument, - Instrument as _, -}; - -use crate::{ - accounts, - address, - app::ActionHandler as _, - mempool::{ - get_account_balances, - Mempool as AppMempool, - RemovalReason, - }, - metrics::Metrics, - transaction, -}; - -const MAX_TX_SIZE: usize = 256_000; // 256 KB - -/// Mempool handles [`request::CheckTx`] abci requests. -// -/// It performs a stateless check of the given transaction, -/// returning a [`tendermint::v0_38::abci::response::CheckTx`]. -#[derive(Clone)] -pub(crate) struct Mempool { - storage: Storage, - inner: AppMempool, - metrics: &'static Metrics, -} - -impl Mempool { - pub(crate) fn new(storage: Storage, mempool: AppMempool, metrics: &'static Metrics) -> Self { - Self { - storage, - inner: mempool, - metrics, - } - } -} - -impl Service for Mempool { - type Error = BoxError; - type Future = Pin> + Send + 'static>>; - type Response = MempoolResponse; - - fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll> { - Poll::Ready(Ok(())) - } - - fn call(&mut self, req: MempoolRequest) -> Self::Future { - use penumbra_tower_trace::v038::RequestExt as _; - let span = req.create_span(); - let storage = self.storage.clone(); - let mut mempool = self.inner.clone(); - let metrics = self.metrics; - async move { - let rsp = match req { - MempoolRequest::CheckTx(req) => MempoolResponse::CheckTx( - handle_check_tx(req, storage.latest_snapshot(), &mut mempool, metrics).await, - ), - }; - Ok(rsp) - } - .instrument(span) - .boxed() - } -} - -/// Handles a [`request::CheckTx`] request. -/// -/// Performs stateless checks (decoding and signature check), -/// as well as stateful checks (nonce and balance checks). -/// -/// If the tx passes all checks, status code 0 is returned. -#[instrument(skip_all)] -async fn handle_check_tx( - req: request::CheckTx, - state: S, - mempool: &mut AppMempool, - metrics: &'static Metrics, -) -> response::CheckTx { - use sha2::Digest as _; - - let start_parsing = Instant::now(); - - let request::CheckTx { - tx, .. - } = req; - - let tx_hash = sha2::Sha256::digest(&tx).into(); - let tx_len = tx.len(); - - if tx_len > MAX_TX_SIZE { - metrics.increment_check_tx_removed_too_large(); - return response::CheckTx { - code: Code::Err(AbciErrorCode::TRANSACTION_TOO_LARGE.value()), - log: format!( - "transaction size too large; allowed: {MAX_TX_SIZE} bytes, got {}", - tx.len() - ), - info: AbciErrorCode::TRANSACTION_TOO_LARGE.info(), - ..response::CheckTx::default() - }; - } - - let raw_signed_tx = match raw::SignedTransaction::decode(tx) { - Ok(tx) => tx, - Err(e) => { - return response::CheckTx { - code: Code::Err(AbciErrorCode::INVALID_PARAMETER.value()), - log: format!("{e:#}"), - info: "failed decoding bytes as a protobuf SignedTransaction".into(), - ..response::CheckTx::default() - }; - } - }; - let signed_tx = match SignedTransaction::try_from_raw(raw_signed_tx) { - Ok(tx) => tx, - Err(e) => { - return response::CheckTx { - code: Code::Err(AbciErrorCode::INVALID_PARAMETER.value()), - info: "the provided bytes was not a valid protobuf-encoded SignedTransaction, or \ - the signature was invalid" - .into(), - log: format!("{e:#}"), - ..response::CheckTx::default() - }; - } - }; - - let finished_parsing = Instant::now(); - metrics.record_check_tx_duration_seconds_parse_tx( - finished_parsing.saturating_duration_since(start_parsing), - ); - - if let Err(e) = signed_tx.check_stateless().await { - metrics.increment_check_tx_removed_failed_stateless(); - return response::CheckTx { - code: Code::Err(AbciErrorCode::INVALID_PARAMETER.value()), - info: "transaction failed stateless check".into(), - log: format!("{e:#}"), - ..response::CheckTx::default() - }; - }; - - let finished_check_stateless = Instant::now(); - metrics.record_check_tx_duration_seconds_check_stateless( - finished_check_stateless.saturating_duration_since(finished_parsing), - ); - - if let Err(e) = transaction::check_nonce_mempool(&signed_tx, &state).await { - metrics.increment_check_tx_removed_stale_nonce(); - return response::CheckTx { - code: Code::Err(AbciErrorCode::INVALID_NONCE.value()), - info: "failed verifying transaction nonce".into(), - log: format!("{e:#}"), - ..response::CheckTx::default() - }; - }; - - let finished_check_nonce = Instant::now(); - metrics.record_check_tx_duration_seconds_check_nonce( - finished_check_nonce.saturating_duration_since(finished_check_stateless), - ); - - if let Err(e) = transaction::check_chain_id_mempool(&signed_tx, &state).await { - return response::CheckTx { - code: Code::Err(AbciErrorCode::INVALID_CHAIN_ID.value()), - info: "failed verifying chain id".into(), - log: format!("{e:#}"), - ..response::CheckTx::default() - }; - } - - let finished_check_chain_id = Instant::now(); - metrics.record_check_tx_duration_seconds_check_chain_id( - finished_check_chain_id.saturating_duration_since(finished_check_nonce), - ); - - if let Some(removal_reason) = mempool.check_removed_comet_bft(tx_hash).await { - match removal_reason { - RemovalReason::Expired => { - metrics.increment_check_tx_removed_expired(); - return response::CheckTx { - code: Code::Err(AbciErrorCode::TRANSACTION_EXPIRED.value()), - info: "transaction expired in app's mempool".into(), - log: "Transaction expired in the app's mempool".into(), - ..response::CheckTx::default() - }; - } - RemovalReason::FailedPrepareProposal(err) => { - metrics.increment_check_tx_removed_failed_execution(); - return response::CheckTx { - code: Code::Err(AbciErrorCode::TRANSACTION_FAILED.value()), - info: "transaction failed execution in prepare_proposal()".into(), - log: format!("transaction failed execution because: {err}"), - ..response::CheckTx::default() - }; - } - RemovalReason::NonceStale => { - return response::CheckTx { - code: Code::Err(AbciErrorCode::INVALID_NONCE.value()), - info: "transaction removed from app mempool due to stale nonce".into(), - log: "Transaction from app mempool due to stale nonce".into(), - ..response::CheckTx::default() - }; - } - RemovalReason::LowerNonceInvalidated => { - return response::CheckTx { - code: Code::Err(AbciErrorCode::LOWER_NONCE_INVALIDATED.value()), - info: "transaction removed from app mempool due to lower nonce being \ - invalidated" - .into(), - log: "Transaction removed from app mempool due to lower nonce being \ - invalidated" - .into(), - ..response::CheckTx::default() - }; - } - } - }; - - let finished_check_removed = Instant::now(); - metrics.record_check_tx_duration_seconds_check_removed( - finished_check_removed.saturating_duration_since(finished_check_chain_id), - ); - - // tx is valid, push to mempool with current state - let address = match state - .try_base_prefixed(signed_tx.verification_key().address_bytes()) - .await - .context("failed to generate address for signed transaction") - { - Err(err) => { - return response::CheckTx { - code: Code::Err(AbciErrorCode::INTERNAL_ERROR.value()), - info: AbciErrorCode::INTERNAL_ERROR.info(), - log: format!("failed to generate address because: {err:#}"), - ..response::CheckTx::default() - }; - } - Ok(address) => address, - }; - - // fetch current account - let current_account_nonce = match state - .get_account_nonce(&address) - .await - .wrap_err("failed fetching nonce for account") - { - Err(err) => { - return response::CheckTx { - code: Code::Err(AbciErrorCode::INTERNAL_ERROR.value()), - info: AbciErrorCode::INTERNAL_ERROR.info(), - log: format!("failed to fetch account nonce because: {err:#}"), - ..response::CheckTx::default() - }; - } - Ok(nonce) => nonce, - }; - - let finished_convert_address = Instant::now(); - metrics.record_check_tx_duration_seconds_convert_address( - finished_convert_address.saturating_duration_since(finished_check_removed), - ); - - // grab cost of transaction - let transaction_cost = match transaction::get_total_transaction_cost(&signed_tx, &state) - .await - .context("failed fetching cost of the transaction") - { - Err(err) => { - return response::CheckTx { - code: Code::Err(AbciErrorCode::INTERNAL_ERROR.value()), - info: AbciErrorCode::INTERNAL_ERROR.info(), - log: format!("failed to fetch cost of the transaction because: {err:#}"), - ..response::CheckTx::default() - }; - } - Ok(transaction_cost) => transaction_cost, - }; - - let finished_fetch_tx_cost = Instant::now(); - metrics.record_check_tx_duration_seconds_fetch_tx_cost( - finished_fetch_tx_cost.saturating_duration_since(finished_convert_address), - ); - - // grab current account's balances - let current_account_balance: HashMap = - match get_account_balances(&state, &address) - .await - .with_context(|| "failed fetching balances for account `{address}`") - { - Err(err) => { - return response::CheckTx { - code: Code::Err(AbciErrorCode::INTERNAL_ERROR.value()), - info: AbciErrorCode::INTERNAL_ERROR.info(), - log: format!("failed to fetch account balances because: {err:#}"), - ..response::CheckTx::default() - }; - } - Ok(account_balance) => account_balance, - }; - - let finished_fetch_balances = Instant::now(); - metrics.record_check_tx_duration_seconds_fetch_balances( - finished_fetch_balances.saturating_duration_since(finished_fetch_tx_cost), - ); - - let actions_count = signed_tx.actions().len(); - - if let Err(err) = mempool - .insert( - Arc::new(signed_tx), - current_account_nonce, - current_account_balance, - transaction_cost, - ) - .await - { - return response::CheckTx { - code: Code::Err(AbciErrorCode::TRANSACTION_INSERTION_FAILED.value()), - info: "transaction insertion failed".into(), - log: format!("transaction insertion failed because: {err:#}"), - ..response::CheckTx::default() - }; - } - - let mempool_len = mempool.len().await; - - metrics - .record_check_tx_duration_seconds_insert_to_app_mempool(finished_fetch_balances.elapsed()); - metrics.record_actions_per_transaction_in_mempool(actions_count); - metrics.record_transaction_in_mempool_size_bytes(tx_len); - metrics.set_transactions_in_mempool_total(mempool_len); - - response::CheckTx::default() -} diff --git a/crates/astria-sequencer/src/service/mempool/mod.rs b/crates/astria-sequencer/src/service/mempool/mod.rs new file mode 100644 index 0000000000..425d199798 --- /dev/null +++ b/crates/astria-sequencer/src/service/mempool/mod.rs @@ -0,0 +1,487 @@ +use std::{ + collections::HashMap, + pin::Pin, + sync::Arc, + task::{ + Context, + Poll, + }, + time::Instant, +}; + +use astria_core::{ + generated::protocol::transactions::v1alpha1 as raw, + primitive::v1::asset::IbcPrefixed, + protocol::{ + abci::AbciErrorCode, + transaction::v1alpha1::SignedTransaction, + }, +}; +use astria_eyre::eyre::WrapErr as _; +use bytes::Bytes; +use cnidarium::{ + StateRead, + Storage, +}; +use futures::{ + Future, + FutureExt, +}; +use prost::Message as _; +use tendermint::{ + abci::Code, + v0_38::abci::{ + request, + response, + MempoolRequest, + MempoolResponse, + }, +}; +use tower::Service; +use tower_abci::BoxError; +use tracing::{ + instrument, + Instrument as _, +}; + +use crate::{ + accounts::StateReadExt as _, + address::StateReadExt as _, + app::ActionHandler as _, + mempool::{ + get_account_balances, + InsertionError, + Mempool as AppMempool, + RemovalReason, + }, + metrics::Metrics, + transaction, +}; + +#[cfg(test)] +mod tests; + +const MAX_TX_SIZE: usize = 256_000; // 256 KB + +pub(crate) trait IntoCheckTxResponse { + fn into_check_tx_response(self) -> response::CheckTx; +} + +impl IntoCheckTxResponse for RemovalReason { + fn into_check_tx_response(self) -> response::CheckTx { + match self { + RemovalReason::Expired => response::CheckTx { + code: Code::Err(AbciErrorCode::TRANSACTION_EXPIRED.value()), + info: AbciErrorCode::TRANSACTION_EXPIRED.to_string(), + log: "transaction expired in the app's mempool".into(), + ..response::CheckTx::default() + }, + RemovalReason::FailedPrepareProposal(err) => response::CheckTx { + code: Code::Err(AbciErrorCode::TRANSACTION_FAILED.value()), + info: AbciErrorCode::TRANSACTION_FAILED.to_string(), + log: format!("transaction failed execution because: {err}"), + ..response::CheckTx::default() + }, + RemovalReason::NonceStale => response::CheckTx { + code: Code::Err(AbciErrorCode::INVALID_NONCE.value()), + info: "transaction removed from app mempool due to stale nonce".into(), + log: "transaction from app mempool due to stale nonce".into(), + ..response::CheckTx::default() + }, + RemovalReason::LowerNonceInvalidated => response::CheckTx { + code: Code::Err(AbciErrorCode::LOWER_NONCE_INVALIDATED.value()), + info: AbciErrorCode::LOWER_NONCE_INVALIDATED.to_string(), + log: "transaction removed from app mempool due to lower nonce being invalidated" + .into(), + ..response::CheckTx::default() + }, + } + } +} + +impl IntoCheckTxResponse for InsertionError { + fn into_check_tx_response(self) -> response::CheckTx { + match self { + InsertionError::AlreadyPresent => response::CheckTx { + code: Code::Err(AbciErrorCode::ALREADY_PRESENT.value()), + info: AbciErrorCode::ALREADY_PRESENT.to_string(), + log: InsertionError::AlreadyPresent.to_string(), + ..response::CheckTx::default() + }, + InsertionError::NonceTooLow => response::CheckTx { + code: Code::Err(AbciErrorCode::INVALID_NONCE.value()), + info: AbciErrorCode::INVALID_NONCE.to_string(), + log: InsertionError::NonceTooLow.to_string(), + ..response::CheckTx::default() + }, + InsertionError::NonceTaken => response::CheckTx { + code: Code::Err(AbciErrorCode::NONCE_TAKEN.value()), + info: AbciErrorCode::NONCE_TAKEN.to_string(), + log: InsertionError::NonceTaken.to_string(), + ..response::CheckTx::default() + }, + InsertionError::AccountSizeLimit => response::CheckTx { + code: Code::Err(AbciErrorCode::ACCOUNT_SIZE_LIMIT.value()), + info: AbciErrorCode::ACCOUNT_SIZE_LIMIT.to_string(), + log: InsertionError::AccountSizeLimit.to_string(), + ..response::CheckTx::default() + }, + InsertionError::AccountBalanceTooLow | InsertionError::NonceGap => { + // NOTE: these are handled interally by the mempool and don't + // block transaction inclusion in the mempool. they shouldn't + // be bubbled up to the client. + response::CheckTx { + code: Code::Err(AbciErrorCode::INTERNAL_ERROR.value()), + info: AbciErrorCode::INTERNAL_ERROR.info(), + log: "transaction failed insertion because of an internal error".into(), + ..response::CheckTx::default() + } + } + } + } +} + +fn error_response(abci_error_code: AbciErrorCode, log: String) -> response::CheckTx { + response::CheckTx { + code: Code::Err(abci_error_code.value()), + info: abci_error_code.info(), + log, + ..response::CheckTx::default() + } +} + +/// Mempool handles [`request::CheckTx`] abci requests. +// +/// It performs a stateless check of the given transaction, +/// returning a [`tendermint::v0_38::abci::response::CheckTx`]. +#[derive(Clone)] +pub(crate) struct Mempool { + storage: Storage, + inner: AppMempool, + metrics: &'static Metrics, +} + +impl Mempool { + pub(crate) fn new(storage: Storage, mempool: AppMempool, metrics: &'static Metrics) -> Self { + Self { + storage, + inner: mempool, + metrics, + } + } +} + +impl Service for Mempool { + type Error = BoxError; + type Future = Pin> + Send + 'static>>; + type Response = MempoolResponse; + + fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll> { + Poll::Ready(Ok(())) + } + + fn call(&mut self, req: MempoolRequest) -> Self::Future { + use penumbra_tower_trace::v038::RequestExt as _; + let span = req.create_span(); + let storage = self.storage.clone(); + let mut mempool = self.inner.clone(); + let metrics = self.metrics; + async move { + let rsp = match req { + MempoolRequest::CheckTx(req) => MempoolResponse::CheckTx( + handle_check_tx(req, storage.latest_snapshot(), &mut mempool, metrics).await, + ), + }; + Ok(rsp) + } + .instrument(span) + .boxed() + } +} + +/// Handles a [`request::CheckTx`] request. +/// +/// This function will error if: +/// - the transaction has been removed from the app's mempool (will throw error once) +/// - the transaction fails stateless checks +/// - the transaction fails insertion into the mempool +/// +/// The function will return a [`response::CheckTx`] with a status code of 0 if the transaction: +/// - Is already in the appside mempool +/// - Passes stateless checks and insertion into the mempool is successful +#[instrument(skip_all)] +async fn handle_check_tx( + req: request::CheckTx, + state: S, + mempool: &mut AppMempool, + metrics: &'static Metrics, +) -> response::CheckTx { + use sha2::Digest as _; + + let request::CheckTx { + tx, .. + } = req; + + let tx_hash = sha2::Sha256::digest(&tx).into(); + + // check if the transaction has been removed from the appside mempool + if let Err(rsp) = check_removed_comet_bft(tx_hash, mempool, metrics).await { + return rsp; + } + + // check if the transaction is already in the mempool + if is_tracked(tx_hash, mempool, metrics).await { + return response::CheckTx::default(); + } + + // perform stateless checks + let signed_tx = match stateless_checks(tx, &state, metrics).await { + Ok(signed_tx) => signed_tx, + Err(rsp) => return rsp, + }; + + // attempt to insert the transaction into the mempool + if let Err(rsp) = insert_into_mempool(mempool, &state, signed_tx, metrics).await { + return rsp; + } + + // insertion successful + metrics.set_transactions_in_mempool_total(mempool.len().await); + + response::CheckTx::default() +} + +/// Checks if the transaction is already in the mempool. +async fn is_tracked(tx_hash: [u8; 32], mempool: &AppMempool, metrics: &Metrics) -> bool { + let start_tracked_check = Instant::now(); + + let result = mempool.is_tracked(tx_hash).await; + + metrics.record_check_tx_duration_seconds_check_tracked(start_tracked_check.elapsed()); + + result +} + +/// Checks if the transaction has been removed from the appside mempool. +/// +/// Returns an `Err(response::CheckTx)` with an error code and message if the transaction has been +/// removed from the appside mempool. +async fn check_removed_comet_bft( + tx_hash: [u8; 32], + mempool: &AppMempool, + metrics: &Metrics, +) -> Result<(), response::CheckTx> { + let start_removal_check = Instant::now(); + + // check if the transaction has been removed from the appside mempool and handle + // the removal reason + if let Some(removal_reason) = mempool.check_removed_comet_bft(tx_hash).await { + match removal_reason { + RemovalReason::Expired => { + metrics.increment_check_tx_removed_expired(); + return Err(removal_reason.into_check_tx_response()); + } + RemovalReason::FailedPrepareProposal(_) => { + metrics.increment_check_tx_removed_failed_execution(); + return Err(removal_reason.into_check_tx_response()); + } + _ => return Err(removal_reason.into_check_tx_response()), + } + }; + + metrics.record_check_tx_duration_seconds_check_removed(start_removal_check.elapsed()); + + Ok(()) +} + +/// Performs stateless checks on the transaction. +/// +/// Returns an `Err(response::CheckTx)` if the transaction fails any of the checks. +/// Otherwise, it returns the [`SignedTransaction`] to be inserted into the mempool. +async fn stateless_checks( + tx: Bytes, + state: &S, + metrics: &'static Metrics, +) -> Result { + let start_parsing = Instant::now(); + + let tx_len = tx.len(); + + if tx_len > MAX_TX_SIZE { + metrics.increment_check_tx_removed_too_large(); + return Err(error_response( + AbciErrorCode::TRANSACTION_TOO_LARGE, + format!( + "transaction size too large; allowed: {MAX_TX_SIZE} bytes, got {}", + tx.len() + ), + )); + } + + let raw_signed_tx = match raw::SignedTransaction::decode(tx) { + Ok(tx) => tx, + Err(e) => { + return Err(error_response( + AbciErrorCode::INVALID_PARAMETER, + format!("failed decoding bytes as a protobuf SignedTransaction: {e:#}"), + )); + } + }; + let signed_tx = match SignedTransaction::try_from_raw(raw_signed_tx) { + Ok(tx) => tx, + Err(e) => { + return Err(error_response( + AbciErrorCode::INVALID_PARAMETER, + format!( + "the provided bytes was not a valid protobuf-encoded SignedTransaction, or \ + the signature was invalid: {e:#}" + ), + )); + } + }; + + let finished_parsing = Instant::now(); + metrics.record_check_tx_duration_seconds_parse_tx( + finished_parsing.saturating_duration_since(start_parsing), + ); + + if let Err(e) = signed_tx.check_stateless().await { + metrics.increment_check_tx_removed_failed_stateless(); + return Err(error_response( + AbciErrorCode::INVALID_PARAMETER, + format!("transaction failed stateless check: {e:#}"), + )); + }; + + let finished_check_stateless = Instant::now(); + metrics.record_check_tx_duration_seconds_check_stateless( + finished_check_stateless.saturating_duration_since(finished_parsing), + ); + + if let Err(e) = transaction::check_chain_id_mempool(&signed_tx, &state).await { + return Err(error_response( + AbciErrorCode::INVALID_CHAIN_ID, + format!("failed verifying chain id: {e:#}"), + )); + } + + metrics.record_check_tx_duration_seconds_check_chain_id(finished_check_stateless.elapsed()); + + // NOTE: decide if worth moving to post-insertion, would have to recalculate cost + metrics.record_transaction_in_mempool_size_bytes(tx_len); + + Ok(signed_tx) +} + +/// Attempts to insert the transaction into the mempool. +/// +/// Returns a `Err(response::CheckTx)` with an error code and message if the transaction fails +/// insertion into the mempool. +async fn insert_into_mempool( + mempool: &AppMempool, + state: &S, + signed_tx: SignedTransaction, + metrics: &'static Metrics, +) -> Result<(), response::CheckTx> { + let start_convert_address = Instant::now(); + + // TODO: just use address bytes directly https://github.com/astriaorg/astria/issues/1620 + // generate address for the signed transaction + let address = match state + .try_base_prefixed(signed_tx.verification_key().address_bytes()) + .await + .context("failed to generate address for signed transaction") + { + Err(err) => { + return Err(error_response( + AbciErrorCode::INTERNAL_ERROR, + format!("failed to generate address because: {err:#}"), + )); + } + Ok(address) => address, + }; + + let finished_convert_address = Instant::now(); + metrics.record_check_tx_duration_seconds_convert_address( + finished_convert_address.saturating_duration_since(start_convert_address), + ); + + // fetch current account nonce + let current_account_nonce = match state + .get_account_nonce(&address) + .await + .wrap_err("failed fetching nonce for account") + { + Err(err) => { + return Err(error_response( + AbciErrorCode::INTERNAL_ERROR, + format!("failed to fetch account nonce because: {err:#}"), + )); + } + Ok(nonce) => nonce, + }; + + let finished_fetch_nonce = Instant::now(); + metrics.record_check_tx_duration_seconds_fetch_nonce( + finished_fetch_nonce.saturating_duration_since(finished_convert_address), + ); + + // grab cost of transaction + let transaction_cost = match transaction::get_total_transaction_cost(&signed_tx, &state) + .await + .context("failed fetching cost of the transaction") + { + Err(err) => { + return Err(error_response( + AbciErrorCode::INTERNAL_ERROR, + format!("failed to fetch cost of the transaction because: {err:#}"), + )); + } + Ok(transaction_cost) => transaction_cost, + }; + + let finished_fetch_tx_cost = Instant::now(); + metrics.record_check_tx_duration_seconds_fetch_tx_cost( + finished_fetch_tx_cost.saturating_duration_since(finished_fetch_nonce), + ); + + // grab current account's balances + let current_account_balance: HashMap = + match get_account_balances(&state, &address) + .await + .with_context(|| "failed fetching balances for account `{address}`") + { + Err(err) => { + return Err(error_response( + AbciErrorCode::INTERNAL_ERROR, + format!("failed to fetch account balances because: {err:#}"), + )); + } + Ok(account_balance) => account_balance, + }; + + let finished_fetch_balances = Instant::now(); + metrics.record_check_tx_duration_seconds_fetch_balances( + finished_fetch_balances.saturating_duration_since(finished_fetch_tx_cost), + ); + + let actions_count = signed_tx.actions().len(); + + if let Err(err) = mempool + .insert( + Arc::new(signed_tx), + current_account_nonce, + current_account_balance, + transaction_cost, + ) + .await + { + return Err(err.into_check_tx_response()); + } + + metrics + .record_check_tx_duration_seconds_insert_to_app_mempool(finished_fetch_balances.elapsed()); + metrics.record_actions_per_transaction_in_mempool(actions_count); + + Ok(()) +} diff --git a/crates/astria-sequencer/src/service/mempool/tests.rs b/crates/astria-sequencer/src/service/mempool/tests.rs new file mode 100644 index 0000000000..b6511e9c67 --- /dev/null +++ b/crates/astria-sequencer/src/service/mempool/tests.rs @@ -0,0 +1,168 @@ +use std::num::NonZeroU32; + +use prost::Message as _; +use telemetry::Metrics; +use tendermint::{ + abci::{ + request::CheckTxKind, + Code, + }, + v0_38::abci::request::CheckTx, +}; + +use crate::{ + app::{ + test_utils::MockTxBuilder, + App, + }, + mempool::{ + Mempool, + RemovalReason, + }, +}; + +#[tokio::test] +async fn future_nonces_are_accepted() { + // The mempool should allow future nonces. + let storage = cnidarium::TempStorage::new().await.unwrap(); + let snapshot = storage.latest_snapshot(); + + let metrics = Box::leak(Box::new(Metrics::noop_metrics(&()).unwrap())); + let mut mempool = Mempool::new(metrics); + let mut app = App::new(snapshot, mempool.clone(), metrics).await.unwrap(); + let genesis_state = crate::app::test_utils::genesis_state(); + + app.init_chain(storage.clone(), genesis_state, vec![], "test".to_string()) + .await + .unwrap(); + app.commit(storage.clone()).await; + + let the_future_nonce = 10; + let tx = MockTxBuilder::new().nonce(the_future_nonce).build(); + let req = CheckTx { + tx: tx.to_raw().encode_to_vec().into(), + kind: CheckTxKind::New, + }; + + let rsp = super::handle_check_tx(req, storage.latest_snapshot(), &mut mempool, metrics).await; + assert_eq!(rsp.code, Code::Ok, "{rsp:#?}"); + + // mempool should contain single transaction still + assert_eq!(mempool.len().await, 1); +} + +#[tokio::test] +async fn rechecks_pass() { + // The mempool should not fail rechecks of transactions. + let storage = cnidarium::TempStorage::new().await.unwrap(); + let snapshot = storage.latest_snapshot(); + + let metrics = Box::leak(Box::new(Metrics::noop_metrics(&()).unwrap())); + let mut mempool = Mempool::new(metrics); + let mut app = App::new(snapshot, mempool.clone(), metrics).await.unwrap(); + let genesis_state = crate::app::test_utils::genesis_state(); + + app.init_chain(storage.clone(), genesis_state, vec![], "test".to_string()) + .await + .unwrap(); + app.commit(storage.clone()).await; + + let tx = MockTxBuilder::new().nonce(0).build(); + let req = CheckTx { + tx: tx.to_raw().encode_to_vec().into(), + kind: CheckTxKind::New, + }; + let rsp = super::handle_check_tx(req, storage.latest_snapshot(), &mut mempool, metrics).await; + assert_eq!(rsp.code, Code::Ok, "{rsp:#?}"); + + // recheck also passes + let req = CheckTx { + tx: tx.to_raw().encode_to_vec().into(), + kind: CheckTxKind::Recheck, + }; + let rsp = super::handle_check_tx(req, storage.latest_snapshot(), &mut mempool, metrics).await; + assert_eq!(rsp.code, Code::Ok, "{rsp:#?}"); + + // mempool should contain single transaction still + assert_eq!(mempool.len().await, 1); +} + +#[tokio::test] +async fn can_reinsert_after_recheck_fail() { + // The mempool should be able to re-insert a transaction after a recheck fails due to the + // transaction being removed from the appside mempool. This is to allow users to re-insert + // if they wish to do so. + let storage = cnidarium::TempStorage::new().await.unwrap(); + let snapshot = storage.latest_snapshot(); + + let metrics = Box::leak(Box::new(Metrics::noop_metrics(&()).unwrap())); + let mut mempool = Mempool::new(metrics); + let mut app = App::new(snapshot, mempool.clone(), metrics).await.unwrap(); + let genesis_state = crate::app::test_utils::genesis_state(); + + app.init_chain(storage.clone(), genesis_state, vec![], "test".to_string()) + .await + .unwrap(); + app.commit(storage.clone()).await; + + let tx = MockTxBuilder::new().nonce(0).build(); + let req = CheckTx { + tx: tx.to_raw().encode_to_vec().into(), + kind: CheckTxKind::New, + }; + let rsp = super::handle_check_tx(req, storage.latest_snapshot(), &mut mempool, metrics).await; + assert_eq!(rsp.code, Code::Ok, "{rsp:#?}"); + + // remove the transaction from the mempool to make recheck fail + mempool + .remove_tx_invalid(tx.clone(), RemovalReason::Expired) + .await; + + // see recheck fails + let req = CheckTx { + tx: tx.to_raw().encode_to_vec().into(), + kind: CheckTxKind::Recheck, + }; + let rsp = super::handle_check_tx(req, storage.latest_snapshot(), &mut mempool, metrics).await; + assert_eq!(rsp.code, Code::Err(NonZeroU32::new(9).unwrap()), "{rsp:#?}"); + + // can re-insert the transaction after first recheck fail + let req = CheckTx { + tx: tx.to_raw().encode_to_vec().into(), + kind: CheckTxKind::New, + }; + let rsp = super::handle_check_tx(req, storage.latest_snapshot(), &mut mempool, metrics).await; + assert_eq!(rsp.code, Code::Ok, "{rsp:#?}"); +} + +#[tokio::test] +async fn recheck_adds_non_tracked_tx() { + // The mempool should be able to insert a transaction on recheck if it isn't in the mempool. + // This could happen in the case of a sequencer restart as the cometbft mempool persists but + // the appside one does not. + let storage = cnidarium::TempStorage::new().await.unwrap(); + let snapshot = storage.latest_snapshot(); + + let metrics = Box::leak(Box::new(Metrics::noop_metrics(&()).unwrap())); + let mut mempool = Mempool::new(metrics); + let mut app = App::new(snapshot, mempool.clone(), metrics).await.unwrap(); + let genesis_state = crate::app::test_utils::genesis_state(); + + app.init_chain(storage.clone(), genesis_state, vec![], "test".to_string()) + .await + .unwrap(); + app.commit(storage.clone()).await; + + let tx = MockTxBuilder::new().nonce(0).build(); + let req = CheckTx { + tx: tx.to_raw().encode_to_vec().into(), + kind: CheckTxKind::Recheck, + }; + + // recheck should pass and add transaction to mempool + let rsp = super::handle_check_tx(req, storage.latest_snapshot(), &mut mempool, metrics).await; + assert_eq!(rsp.code, Code::Ok, "{rsp:#?}"); + + // mempool should contain single transaction still + assert_eq!(mempool.len().await, 1); +} diff --git a/crates/astria-sequencer/src/transaction/checks.rs b/crates/astria-sequencer/src/transaction/checks.rs index 07858213ea..6ce301fa44 100644 --- a/crates/astria-sequencer/src/transaction/checks.rs +++ b/crates/astria-sequencer/src/transaction/checks.rs @@ -25,32 +25,11 @@ use tracing::instrument; use crate::{ accounts::StateReadExt as _, - address::StateReadExt as _, app::StateReadExt as _, bridge::StateReadExt as _, ibc::StateReadExt as _, }; -#[instrument(skip_all)] -pub(crate) async fn check_nonce_mempool( - tx: &SignedTransaction, - state: &S, -) -> Result<()> { - let signer_address = state - .try_base_prefixed(tx.verification_key().address_bytes()) - .await - .wrap_err( - "failed constructing the signer address from signed transaction verification and \ - prefix provided by app state", - )?; - let curr_nonce = state - .get_account_nonce(&signer_address) - .await - .wrap_err("failed to get account nonce")?; - ensure!(tx.nonce() >= curr_nonce, "nonce already used by account"); - Ok(()) -} - #[instrument(skip_all)] pub(crate) async fn check_chain_id_mempool( tx: &SignedTransaction, diff --git a/crates/astria-sequencer/src/transaction/mod.rs b/crates/astria-sequencer/src/transaction/mod.rs index 6b616f7b41..47225cacfb 100644 --- a/crates/astria-sequencer/src/transaction/mod.rs +++ b/crates/astria-sequencer/src/transaction/mod.rs @@ -20,7 +20,6 @@ use astria_eyre::{ pub(crate) use checks::{ check_balance_for_total_fees_and_transfers, check_chain_id_mempool, - check_nonce_mempool, get_total_transaction_cost, }; use cnidarium::StateWrite;