diff --git a/Cargo.lock b/Cargo.lock index cf34633e..35791034 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1978,6 +1978,7 @@ dependencies = [ "miden-processor", "miden-stdlib", "miden-tx", + "rand", "rand_chacha", "serde", "thiserror", diff --git a/crates/block-producer/Cargo.toml b/crates/block-producer/Cargo.toml index 1d1476fd..5355d30f 100644 --- a/crates/block-producer/Cargo.toml +++ b/crates/block-producer/Cargo.toml @@ -41,6 +41,7 @@ miden-lib = { workspace = true, features = ["testing"] } miden-node-test-macro = { path = "../test-macro" } miden-objects = { workspace = true, features = ["testing"] } miden-tx = { workspace = true, features = ["testing"] } +rand = { version = "0.8.5" } rand_chacha = { version = "0.3", default-features = false } tokio = { workspace = true, features = ["test-util"] } winterfell = { version = "0.9" } diff --git a/crates/block-producer/src/batch_builder/batch.rs b/crates/block-producer/src/batch_builder/batch.rs index 6490a95a..e8a13fb3 100644 --- a/crates/block-producer/src/batch_builder/batch.rs +++ b/crates/block-producer/src/batch_builder/batch.rs @@ -22,7 +22,8 @@ pub type BatchId = Blake3Digest<32>; // TRANSACTION BATCH // ================================================================================================ -/// A batch of transactions that share a common proof. +/// A batch of transactions that share a common proof. For any given account, at most 1 transaction +/// in the batch must be addressing that account (issue: #186). /// /// Note: Until recursive proofs are available in the Miden VM, we don't include the common proof. #[derive(Debug, Clone, PartialEq, Eq)] diff --git a/crates/block-producer/src/batch_builder/mod.rs b/crates/block-producer/src/batch_builder/mod.rs index db4d7807..65f69300 100644 --- a/crates/block-producer/src/batch_builder/mod.rs +++ b/crates/block-producer/src/batch_builder/mod.rs @@ -1,15 +1,24 @@ -use std::{cmp::min, collections::BTreeSet, num::NonZeroUsize, sync::Arc, time::Duration}; +use std::{ + cmp::min, + collections::{BTreeMap, BTreeSet}, + num::NonZeroUsize, + sync::Arc, + time::Duration, +}; use miden_objects::{ + accounts::AccountId, notes::NoteId, transaction::{OutputNote, TransactionId}, + Digest, }; -use tokio::{sync::Mutex, time}; +use tokio::{sync::Mutex, task::JoinSet, time}; use tonic::async_trait; use tracing::{debug, info, instrument, Span}; use crate::{ block_builder::BlockBuilder, + domain::transaction::AuthenticatedTransaction, mempool::{BatchJobId, Mempool}, ProvenTransaction, SharedRwVec, COMPONENT, }; @@ -219,41 +228,74 @@ pub struct BatchProducer { pub workers: NonZeroUsize, pub mempool: Arc>, pub tx_per_batch: usize, + pub simulated_proof_time: Duration, } -type BatchResult = Result; +type BatchResult = Result<(BatchJobId, TransactionBatch), (BatchJobId, BuildBatchError)>; /// Wrapper around tokio's JoinSet that remains pending if the set is empty, /// instead of returning None. -struct WorkerPool(tokio::task::JoinSet); +struct WorkerPool { + in_progress: JoinSet, + simulated_proof_time: Duration, +} impl WorkerPool { + fn new(simulated_proof_time: Duration) -> Self { + Self { + simulated_proof_time, + in_progress: JoinSet::new(), + } + } + async fn join_next(&mut self) -> Result { - if self.0.is_empty() { + if self.in_progress.is_empty() { std::future::pending().await } else { // Cannot be None as its not empty. 
- self.0.join_next().await.unwrap() + self.in_progress.join_next().await.unwrap() } } fn len(&self) -> usize { - self.0.len() + self.in_progress.len() } - fn spawn(&mut self, id: BatchJobId, transactions: Vec) { - self.0.spawn(async move { - todo!("Do actual work like aggregating transaction data"); + fn spawn(&mut self, id: BatchJobId, transactions: Vec) { + self.in_progress.spawn({ + let simulated_proof_time = self.simulated_proof_time; + async move { + tracing::debug!("Begin proving batch."); + + let transactions = + transactions.into_iter().map(AuthenticatedTransaction::into_raw).collect(); + + let batch = TransactionBatch::new(transactions, Default::default()) + .map_err(|err| (id, err))?; + + tokio::time::sleep(simulated_proof_time).await; + tracing::debug!("Batch proof completed."); + + Ok((id, batch)) + } }); } } impl BatchProducer { + /// Starts the [BatchProducer], infinitely producing batches at the configured interval. + /// + /// Batch production consists of + /// + /// 1. Selecting transactions from the [Mempool] to form the next batch + /// 2. Proving the batch (currently simulated with a delay) + /// 3. Reporting the result back to the mempool pub async fn run(self) { let mut interval = tokio::time::interval(self.batch_interval); interval.set_missed_tick_behavior(time::MissedTickBehavior::Delay); - let mut inflight = WorkerPool(tokio::task::JoinSet::new()); + let mut inflight = WorkerPool::new(self.simulated_proof_time); loop { tokio::select! { @@ -285,8 +327,8 @@ impl BatchProducer { tracing::warn!(%batch_id, %err, "Batch job failed."); mempool.batch_failed(batch_id); }, - Ok(Ok(batch_id)) => { - mempool.batch_proved(batch_id); + Ok(Ok((batch_id, batch))) => { + mempool.batch_proved(batch_id, batch); } } } diff --git a/crates/block-producer/src/block_builder/mod.rs b/crates/block-producer/src/block_builder/mod.rs index 111c7c3d..9d9deda0 100644 --- a/crates/block-producer/src/block_builder/mod.rs +++ b/crates/block-producer/src/block_builder/mod.rs @@ -1,4 +1,7 @@ -use std::{collections::BTreeSet, sync::Arc}; +use std::{ + collections::{BTreeMap, BTreeSet}, + sync::Arc, +}; use async_trait::async_trait; use miden_node_utils::formatting::{format_array, format_blake3_digest}; @@ -146,12 +149,13 @@ where } } -struct BlockProducer { +struct BlockProducer { pub mempool: Arc>, pub block_interval: tokio::time::Duration, + pub block_builder: BB, } -impl BlockProducer { +impl BlockProducer { pub async fn run(self) { let mut interval = tokio::time::interval(self.block_interval); interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay); @@ -160,8 +164,9 @@ impl BlockProducer { interval.tick().await; let (block_number, batches) = self.mempool.lock().await.select_block(); + let batches = batches.into_values().collect::>(); - let result = self.build_and_commit_block(batches).await; + let result = self.block_builder.build_block(&batches).await; let mut mempool = self.mempool.lock().await; match result { @@ -170,8 +175,4 @@ impl BlockProducer { } } } - - async fn build_and_commit_block(&self, batches: BTreeSet) -> Result<(), ()> { - todo!("Aggregate, prove and commit block"); - } } diff --git a/crates/block-producer/src/domain/mod.rs b/crates/block-producer/src/domain/mod.rs new file mode 100644 index 00000000..37f08066 --- /dev/null +++ b/crates/block-producer/src/domain/mod.rs @@ -0,0 +1 @@ +pub mod transaction; diff --git a/crates/block-producer/src/domain/transaction.rs b/crates/block-producer/src/domain/transaction.rs new
file mode 100644 index 00000000..756af859 --- /dev/null +++ b/crates/block-producer/src/domain/transaction.rs @@ -0,0 +1,151 @@ +use std::collections::BTreeSet; + +use miden_objects::{ + accounts::AccountId, + notes::{NoteId, Nullifier}, + transaction::{ProvenTransaction, TransactionId, TxAccountUpdate}, + Digest, +}; + +use crate::{errors::VerifyTxError, mempool::BlockNumber, store::TransactionInputs}; + +/// A transaction whose proof has been verified, and which has been authenticated against the store. +/// +/// Authentication ensures that all nullifiers are unspent, and additionally authenticates some +/// previously unauthenticated input notes. +/// +/// Note that this is of course only valid for the chain height of the authentication. +#[derive(Clone, PartialEq)] +pub struct AuthenticatedTransaction { + inner: ProvenTransaction, + /// The account state provided by the store [inputs](TransactionInputs). + /// + /// This does not necessarily have to match the transaction's initial state + /// as this may still be modified by inflight transactions. + store_account_state: Option, + /// Unauthenticated notes that have now been authenticated by the store + /// [inputs](TransactionInputs). + /// + /// In other words, notes which were unauthenticated at the time the transaction was proven, + /// but which have since been committed to, and authenticated by the store. + notes_authenticated_by_store: BTreeSet, + /// Chain height that the authentication took place at. + authentication_height: BlockNumber, +} + +impl AuthenticatedTransaction { + /// Verifies the transaction against the inputs, enforcing that all nullifiers are unspent. + /// + /// __No__ proof verification is performed. The caller takes responsibility for ensuring + /// that the proof is valid. + /// + /// # Errors + /// + /// Returns an error if any of the transaction's nullifiers are marked as spent by the inputs. + pub fn new( + tx: ProvenTransaction, + inputs: TransactionInputs, + ) -> Result { + let nullifiers_already_spent = tx + .get_nullifiers() + .filter(|nullifier| inputs.nullifiers.get(nullifier).cloned().flatten().is_some()) + .collect::>(); + if !nullifiers_already_spent.is_empty() { + return Err(VerifyTxError::InputNotesAlreadyConsumed(nullifiers_already_spent)); + } + + // Invert the missing notes; i.e. we now know the rest were actually found. + let authenticated_notes = tx + .get_unauthenticated_notes() + .map(|header| header.id()) + .filter(|note_id| !inputs.missing_unauthenticated_notes.contains(note_id)) + .collect(); + + Ok(AuthenticatedTransaction { + inner: tx, + notes_authenticated_by_store: authenticated_notes, + authentication_height: BlockNumber::new(inputs.current_block_height), + store_account_state: inputs.account_hash, + }) + } + + pub fn id(&self) -> TransactionId { + self.inner.id() + } + + pub fn account_id(&self) -> AccountId { + self.inner.account_id() + } + + pub fn account_update(&self) -> &TxAccountUpdate { + self.inner.account_update() + } + + pub fn store_account_state(&self) -> Option { + self.store_account_state + } + + pub fn authentication_height(&self) -> BlockNumber { + self.authentication_height + } + + pub fn nullifiers(&self) -> impl Iterator + '_ { + self.inner.get_nullifiers() + } + + pub fn output_notes(&self) -> impl Iterator + '_ { + self.inner.output_notes().iter().map(|note| note.id()) + } + + /// Notes which were unauthenticated in the transaction __and__ which were + /// not authenticated by the store inputs.
+ pub fn unauthenticated_notes(&self) -> impl Iterator + '_ { + self.inner + .get_unauthenticated_notes() + .cloned() + .map(|header| header.id()) + .filter(|note_id| !self.notes_authenticated_by_store.contains(note_id)) + } + + pub fn into_raw(self) -> ProvenTransaction { + self.inner + } +} + +#[cfg(test)] +impl AuthenticatedTransaction { + //! Builder methods intended for easier test setup. + + /// Short-hand for `Self::new` where the input's are setup to match the transaction's initial + /// account state. + pub fn from_inner(inner: ProvenTransaction) -> Self { + let store_account_state = match inner.account_update().init_state_hash() { + zero if zero == Digest::default() => None, + non_zero => Some(non_zero), + }; + Self { + inner, + store_account_state, + notes_authenticated_by_store: Default::default(), + authentication_height: Default::default(), + } + } + + /// Overrides the authentication height with the given value. + pub fn with_authentication_height(mut self, height: u32) -> Self { + self.authentication_height = BlockNumber::new(height); + self + } + + /// Overrides the store state with the given value. + pub fn with_store_state(mut self, state: Digest) -> Self { + self.store_account_state = Some(state); + self + } + + /// Unsets the store state. + pub fn with_empty_store_state(mut self) -> Self { + self.store_account_state = None; + self + } +} diff --git a/crates/block-producer/src/errors.rs b/crates/block-producer/src/errors.rs index a963236d..4791971a 100644 --- a/crates/block-producer/src/errors.rs +++ b/crates/block-producer/src/errors.rs @@ -1,5 +1,3 @@ -use std::collections::BTreeSet; - use miden_node_proto::errors::ConversionError; use miden_node_utils::formatting::format_opt; use miden_objects::{ @@ -11,7 +9,6 @@ use miden_objects::{ MAX_BATCHES_PER_BLOCK, MAX_INPUT_NOTES_PER_BATCH, MAX_OUTPUT_NOTES_PER_BATCH, }; use miden_processor::ExecutionError; -use miden_tx::TransactionVerifierError; use thiserror::Error; use crate::mempool::BlockNumber; @@ -32,6 +29,9 @@ pub enum VerifyTxError { )] UnauthenticatedNotesNotFound(Vec), + #[error("Output note IDs already used: {0:?}")] + OutputNotesAlreadyExist(Vec), + /// The account's initial hash did not match the current account's hash #[error("Incorrect account's initial hash ({tx_initial_account_hash}, current: {})", format_opt(.current_account_hash.as_ref()))] IncorrectAccountInitialHash { @@ -61,31 +61,36 @@ pub enum VerifyTxError { pub enum AddTransactionError { #[error("Transaction verification failed: {0}")] VerificationFailed(#[from] VerifyTxError), -} -#[derive(thiserror::Error, Debug, PartialEq)] -pub enum AddTransactionErrorRework { - #[error("Transaction's initial account state {expected} did not match the current account state {current}.")] - InvalidAccountState { current: Digest, expected: Digest }, - #[error("Transaction input data is stale. Required data fresher than {stale_limit} but inputs are from {input_block}.")] + #[error("Transaction input data is stale. 
Required data from {stale_limit} or newer, but inputs are from {input_block}.")] StaleInputs { input_block: BlockNumber, stale_limit: BlockNumber, }, - #[error("Authenticated note nullifier {0} not found.")] - AuthenticatedNoteNotFound(Nullifier), - #[error("Unauthenticated note {0} not found.")] - UnauthenticatedNoteNotFound(NoteId), - #[error("Note nullifiers already consumed: {0:?}")] - NotesAlreadyConsumed(BTreeSet), - #[error(transparent)] - TxInputsError(#[from] TxInputsError), - #[error(transparent)] - ProofVerificationFailed(#[from] TransactionVerifierError), - #[error("Failed to deserialize transaction: {0}.")] + + #[error("Deserialization failed: {0}")] DeserializationError(String), } +impl From for tonic::Status { + fn from(value: AddTransactionError) -> Self { + use AddTransactionError::*; + match value { + VerificationFailed(VerifyTxError::InputNotesAlreadyConsumed(_)) + | VerificationFailed(VerifyTxError::UnauthenticatedNotesNotFound(_)) + | VerificationFailed(VerifyTxError::OutputNotesAlreadyExist(_)) + | VerificationFailed(VerifyTxError::IncorrectAccountInitialHash { .. }) + | VerificationFailed(VerifyTxError::InvalidTransactionProof(_)) + | DeserializationError(_) => Self::invalid_argument(value.to_string()), + + // Internal errors which should not be communicated to the user. + VerificationFailed(VerifyTxError::TransactionInputError(_)) + | VerificationFailed(VerifyTxError::StoreConnectionFailed(_)) + | StaleInputs { .. } => Self::internal("Internal error"), + } + } +} + // Batch building errors // ================================================================================================= diff --git a/crates/block-producer/src/lib.rs b/crates/block-producer/src/lib.rs index d40ce14c..421718ce 100644 --- a/crates/block-producer/src/lib.rs +++ b/crates/block-producer/src/lib.rs @@ -12,6 +12,7 @@ pub mod test_utils; mod batch_builder; mod block_builder; +mod domain; mod errors; mod mempool; mod state_view; diff --git a/crates/block-producer/src/mempool/batch_graph.rs b/crates/block-producer/src/mempool/batch_graph.rs index fe963b02..a6984927 100644 --- a/crates/block-producer/src/mempool/batch_graph.rs +++ b/crates/block-producer/src/mempool/batch_graph.rs @@ -4,6 +4,10 @@ use miden_objects::transaction::TransactionId; use miden_tx::utils::collections::KvMap; use super::BatchJobId; +use crate::batch_builder::batch::TransactionBatch; + +// BATCH GRAPH +// ================================================================================================ #[derive(Default, Clone)] pub struct BatchGraph { @@ -44,7 +48,7 @@ impl BatchGraph { // Insert the new node into the graph. let batch = Node { - status: Status::InFlight, + status: Status::Queued, transactions, parents, children: Default::default(), @@ -53,12 +57,12 @@ impl BatchGraph { // New node might be a root. // - // This could be optimised by inlining this inside the parent loop. This would prevent the + // This could be optimized by inlining this inside the parent loop. This would prevent the // double iteration over parents, at the cost of some code duplication. self.try_make_root(id); } - /// Removes the batches and all their descendents from the graph. + /// Removes the batches and all their descendants from the graph. /// /// Returns all removed batches and their transactions. pub fn purge_subgraphs( @@ -76,8 +80,8 @@ impl BatchGraph { continue; }; - // All the child batches are also removed so no need to chec - // for new roots. No new roots are possible as a result of this subgraph removal. 
+ // All the child batches are also removed so no need to check for new roots. No new + // roots are possible as a result of this subgraph removal. self.roots.remove(&node_id); for transaction in &node.transactions { @@ -86,8 +90,8 @@ impl BatchGraph { // Inform parent that this child no longer exists. // - // The same is not required for children of this batch as we will - // be removing those as well. + // The same is not required for children of this batch as we will be removing those as + // well. for parent in &node.parents { // Parent could already be removed as part of this subgraph removal. if let Some(parent) = self.nodes.get_mut(parent) { @@ -102,7 +106,7 @@ impl BatchGraph { removed } - /// Removes a set of batches from the graph without removing any descendents. + /// Removes a set of batches from the graph without removing any descendants. /// /// This is intended to cull completed batches from stale blocs. pub fn remove_committed(&mut self, batches: BTreeSet) -> Vec { @@ -110,7 +114,7 @@ impl BatchGraph { for batch in batches { let node = self.nodes.remove(&batch).expect("Node must be in graph"); - assert_eq!(node.status, Status::InBlock); + assert_eq!(node.status, Status::Processed); // Remove batch from graph. No need to update parents as they should be removed in this // call as well. @@ -129,32 +133,37 @@ impl BatchGraph { } /// Mark a batch as proven if it exists. - pub fn mark_proven(&mut self, id: BatchJobId) { - // Its possible for inflight batches to have been removed as part - // of another batches failure. + pub fn mark_proven(&mut self, id: BatchJobId, batch: TransactionBatch) { + // It's possible for inflight batches to have been removed as part of another batch's + // failure. if let Some(node) = self.nodes.get_mut(&id) { - node.status = Status::Proven; + assert!(node.status == Status::Queued); + node.status = Status::Proven(batch); self.try_make_root(id); } } /// Returns at most `count` __indepedent__ batches which are ready for inclusion in a block. - pub fn select_block(&mut self, count: usize) -> BTreeSet { - let mut batches = BTreeSet::new(); + pub fn select_block(&mut self, count: usize) -> BTreeMap { + let mut batches = BTreeMap::new(); // Track children so we can evaluate them for root afterwards. let mut children = BTreeSet::new(); - for batch in &self.roots { - let mut node = self.nodes.get_mut(batch).expect("Root node must be in graph"); + for batch_id in &self.roots { + let mut node = self.nodes.get_mut(batch_id).expect("Root node must be in graph"); // Filter out batches which have dependencies in our selection so far. - if batches.union(&node.parents).next().is_some() { + if node.parents.iter().any(|parent| batches.contains_key(parent)) { continue; } - batches.insert(*batch); - node.status = Status::Proven; + let Status::Proven(batch) = node.status.clone() else { + unreachable!("Root batch must be in proven state."); + }; + + batches.insert(*batch_id, batch); + node.status = Status::Processed; if batches.len() == count { break; @@ -177,7 +186,7 @@ impl BatchGraph { for parent in node.parents.clone() { let parent = self.nodes.get(&parent).expect("Parent must be in pool"); - if parent.status != Status::InBlock { + if parent.status != Status::Processed { return; } } @@ -193,9 +202,13 @@ struct Node { children: BTreeSet, } -#[derive(Debug, Clone, Copy, PartialEq, Eq)] +#[derive(Debug, Clone, PartialEq, Eq)] enum Status { - InFlight, - Proven, - InBlock, + /// The batch is busy being proven. + Queued, + /// The batch is proven.
It may be placed in a block + /// __IFF__ all of its parents are already in a block. + Proven(TransactionBatch), + /// Batch is part of a block. + Processed, } diff --git a/crates/block-producer/src/mempool/inflight_state.rs b/crates/block-producer/src/mempool/inflight_state.rs deleted file mode 100644 index d2df8d2b..00000000 --- a/crates/block-producer/src/mempool/inflight_state.rs +++ /dev/null @@ -1,314 +0,0 @@ -use std::{ - collections::{BTreeMap, BTreeSet, VecDeque}, - sync::Arc, -}; - -use miden_objects::{ - accounts::AccountId, - notes::Nullifier, - transaction::{ProvenTransaction, TransactionId}, - Digest, -}; - -use crate::{errors::AddTransactionErrorRework, store::TransactionInputs}; - -/// Tracks the inflight state of the mempool. This includes recently committed blocks. -/// -/// Allows appending and reverting transactions as well as marking them -/// as part of a committed block. Committed state can also be pruned once the -/// state is considered past the stale threshold. -#[derive(Default)] -pub struct InflightState { - /// Non-empty inflight account states. - /// - /// Accounts which are [AccountStatus::Empty] are immedietely pruned. - accounts: BTreeMap, - - /// Nullifiers emitted by inflight transactions and recently committed blocks. - nullifiers: BTreeSet, -} - -/// Describes the impact that a set of transactions had on the state. -/// -/// TODO: this is a terrible name. -pub struct StateDiff { - /// The number of transactions that affected each account. - account_transactions: BTreeMap, - - /// The nullifiers that were emitted by the transactions. - nullifiers: BTreeSet, - // TODO: input/output notes -} - -impl StateDiff { - pub fn new(txs: &[Arc]) -> Self { - let mut account_transactions = BTreeMap::::new(); - let mut nullifiers = BTreeSet::new(); - - for tx in txs { - *account_transactions.entry(tx.account_id()).or_default() += 1; - nullifiers.extend(tx.get_nullifiers()); - } - - Self { account_transactions, nullifiers } - } -} - -impl InflightState { - /// Appends the transaction to the inflight state. - /// - /// This operation is atomic i.e. a rejected transaction has no impact of the state. - pub fn add_transaction( - &mut self, - tx: &ProvenTransaction, - inputs: &TransactionInputs, - ) -> Result, AddTransactionErrorRework> { - // Separate verification and state mutation so that a rejected transaction - // does not impact the state (atomicity). - self.verify_transaction(tx, inputs)?; - - let parents = self.insert_transaction(tx); - - Ok(parents) - } - - fn verify_transaction( - &mut self, - tx: &ProvenTransaction, - inputs: &TransactionInputs, - ) -> Result<(), AddTransactionErrorRework> { - // Ensure current account state is correct. - let current = self - .accounts - .get(&tx.account_id()) - .and_then(|account_state| account_state.latest_state()) - .copied() - .or(inputs.account_hash) - .unwrap_or_default(); - let expected = tx.account_update().init_state_hash(); - - if expected != current { - return Err(AddTransactionErrorRework::InvalidAccountState { current, expected }); - } - - // Ensure nullifiers aren't already present. - // TODO: Verifying the inputs nullifiers should be done externally already. - // TODO: The above should cause a change in type for inputs indicating this. 
- let input_nullifiers = tx.get_nullifiers().collect::>(); - let double_spend = - self.nullifiers.union(&input_nullifiers).copied().collect::>(); - if !double_spend.is_empty() { - return Err(AddTransactionErrorRework::NotesAlreadyConsumed(double_spend)); - } - - // TODO: additional input and output note checks, depends on the transaction type changes. - - Ok(()) - } - - /// Aggregate the transaction into the state, returning its parent transactions. - fn insert_transaction(&mut self, tx: &ProvenTransaction) -> BTreeSet { - let account_parent = self - .accounts - .entry(tx.account_id()) - .or_default() - .insert(tx.account_update().final_state_hash(), tx.id()); - - self.nullifiers.extend(tx.get_nullifiers()); - - // TODO: input and output notes - - account_parent.into_iter().collect() - } - - /// Reverts the given state diff. - /// - /// # Panics - /// - /// Panics if any part of the diff isn't present in the state. Callers should take - /// care to only revert transaction sets who's ancestors are all either committed or reverted. - pub fn revert_transactions(&mut self, diff: StateDiff) { - for (account, count) in diff.account_transactions { - let status = self.accounts.get_mut(&account).expect("Account must exist").revert(count); - - // Prune empty accounts. - if status.is_empty() { - self.accounts.remove(&account); - } - } - - for nullifier in diff.nullifiers { - assert!(self.nullifiers.remove(&nullifier), "Nullifier must exist"); - } - - // TODO: input and output notes - } - - /// Marks the given state diff as committed. - /// - /// These transactions are no longer considered inflight. Callers should take care to only - /// commit transactions who's ancestors are all committed. - /// - /// # Panics - /// - /// Panics if the accounts don't have enough inflight transactions to commit. - pub fn commit_transactions(&mut self, diff: &StateDiff) { - for (account, count) in &diff.account_transactions { - self.accounts.get_mut(account).expect("Account must exist").commit(*count); - } - } - - /// Drops the given state diff from memory. - /// - /// # Panics - /// - /// Panics if the accounts don't have enough inflight transactions to commit. - pub fn prune_committed_state(&mut self, diff: StateDiff) { - for (account, count) in diff.account_transactions { - let status = self - .accounts - .get_mut(&account) - .expect("Account must exist") - .remove_commited(count); - - // Prune empty accounts. - if status.is_empty() { - self.accounts.remove(&account); - } - } - - for nullifier in diff.nullifiers { - self.nullifiers.remove(&nullifier); - } - } -} - -/// Tracks the state of a single account. -#[derive(Default)] -struct AccountState { - /// State progression of this account over all uncommitted inflight transactions. - /// - /// Ordering is chronological from front (oldest) to back (newest). - inflight: VecDeque<(Digest, TransactionId)>, - - /// The latest committed state. - /// - /// Only valid if the committed count is greater than zero. - committed_state: Digest, - - /// The number of committed transactions. - /// - /// If this is zero then the committed state is meaningless. - committed_count: usize, -} - -impl AccountState { - /// Inserts a new transaction and its state, returning the previous inflight transaction, if - /// any. - pub fn insert(&mut self, state: Digest, tx: TransactionId) -> Option { - let previous = self.inflight.back().map(|(_, tx)| tx).copied(); - - self.inflight.push_back((state, tx)); - - previous - } - - /// The latest state of this account. 
- pub fn latest_state(&self) -> Option<&Digest> { - self.inflight - .back() - .map(|(state, _)| state) - // A count of zero indicates no committed state. - .or((self.committed_count > 1).then_some(&self.committed_state)) - } - - /// Reverts the most recent `n` inflight transactions. - /// - /// # Returns - /// - /// Returns the emptyness state of the account. - /// - /// # Panics - /// - /// Panics if there are less than `n` inflight transactions. - pub fn revert(&mut self, n: usize) -> AccountStatus { - let length = self.inflight.len(); - assert!( - length >= n, "Attempted to revert {n} transactions which is more than the {length} which are inflight.", - ); - - self.inflight.drain(length - n..); - - self.emptyness() - } - - /// Commits the first `n` inflight transactions. - /// - /// # Panics - /// - /// Panics if there are less than `n` inflight transactions. - pub fn commit(&mut self, n: usize) { - if n == 0 { - return; - } - - let length = self.inflight.len(); - assert!( - length >= n, "Attempted to revert {n} transactions which is more than the {length} which are inflight.", - ); - - self.committed_state = self - .inflight - .drain(..n) - .last() - .map(|(state, _)| state) - .expect("Must be Some as n > 0"); - self.committed_count += n; - } - - /// Removes `n` committed transactions. - /// - /// # Returns - /// - /// Returns the emptyness state of the account. - /// - /// # Panics - /// - /// Panics if there are less than `n` committed transactions. - pub fn remove_commited(&mut self, n: usize) -> AccountStatus { - assert!( - n <= self.committed_count, - "Attempted to remove {n} committed transactions, but only {} are present.", - self.committed_count - ); - self.committed_count -= n; - - self.emptyness() - } - - fn emptyness(&self) -> AccountStatus { - if self.inflight.is_empty() && self.committed_count == 0 { - AccountStatus::Empty - } else { - AccountStatus::NonEmpty - } - } -} - -/// Describes the emptyness of an [AccountState]. -/// -/// Is marked as #[must_use] so that callers handle prune empty accounts. -#[must_use] -#[derive(Clone, Copy, PartialEq, Eq)] -enum AccountStatus { - /// [AccountState] contains no state and should be pruned. - Empty, - /// [AccountState] contains state and should be kept. - NonEmpty, -} - -impl AccountStatus { - fn is_empty(&self) -> bool { - *self == Self::Empty - } -} diff --git a/crates/block-producer/src/mempool/inflight_state/account_state.rs b/crates/block-producer/src/mempool/inflight_state/account_state.rs new file mode 100644 index 00000000..d2888d03 --- /dev/null +++ b/crates/block-producer/src/mempool/inflight_state/account_state.rs @@ -0,0 +1,336 @@ +use std::{ + collections::{BTreeMap, BTreeSet, VecDeque}, + sync::Arc, +}; + +use miden_objects::{ + accounts::AccountId, + notes::{NoteId, Nullifier}, + transaction::{OutputNote, OutputNotes, ProvenTransaction, TransactionId}, + Digest, +}; + +use crate::{ + errors::{AddTransactionError, VerifyTxError}, + store::TransactionInputs, +}; + +// IN-FLIGHT ACCOUNT STATE +// ================================================================================================ + +/// Tracks the inflight state of an account. +#[derive(Clone, Default, Debug, PartialEq)] +pub struct InflightAccountState { + /// This account's state transitions due to inflight transactions. + /// + /// Ordering is chronological from front (oldest) to back (newest). + states: VecDeque<(Digest, TransactionId)>, + + /// The number of inflight states that have been committed. 
+ /// + /// This acts as a pivot index for `self.states`, splitting it into two segments. The first + /// contains committed states and the second those that are uncommitted. + committed: usize, +} + +impl InflightAccountState { + /// Appends the new state, returning the previous state's transaction ID __IFF__ it is + /// uncommitted. + pub fn insert(&mut self, state: Digest, tx: TransactionId) -> Option { + let mut parent = self.states.back().map(|(_, tx)| tx).copied(); + + // Only return uncommitted parent ID. + // + // Since this is the latest state's ID, we need at least one uncommitted state. + if self.uncommitted_count() == 0 { + parent.take(); + } + + self.states.push_back((state, tx)); + + parent + } + + /// The latest state of this account. + pub fn current_state(&self) -> Option<&Digest> { + self.states.back().map(|(state, _)| state) + } + + /// Reverts the most recent `n` uncommitted inflight transactions. + /// + /// # Returns + /// + /// Returns the emptiness state of the account. + /// + /// # Panics + /// + /// Panics if there are less than `n` uncommitted inflight transactions. + pub fn revert(&mut self, n: usize) -> AccountStatus { + let uncommitted = self.uncommitted_count(); + assert!( + uncommitted >= n, "Attempted to revert {n} transactions which is more than the {uncommitted} which are uncommitted.", + ); + + self.states.drain(self.states.len() - n..); + + self.emptiness() + } + + /// Commits the next `n` uncommitted inflight transactions. + /// + /// # Panics + /// + /// Panics if there are less than `n` uncommitted inflight transactions. + pub fn commit(&mut self, n: usize) { + let uncommitted = self.uncommitted_count(); + assert!( + uncommitted >= n, "Attempted to commit {n} transactions which is more than the {uncommitted} which are uncommitted." + ); + + self.committed += n; + } + + /// Removes `n` committed transactions. + /// + /// # Returns + /// + /// Returns the emptiness state of the account. + /// + /// # Panics + /// + /// Panics if there are less than `n` committed transactions. + pub fn prune_committed(&mut self, n: usize) -> AccountStatus { + assert!( + self.committed >= n, + "Attempted to prune {n} transactions, which is more than the {} which are committed", + self.committed + ); + + self.committed -= n; + self.states.drain(..n); + + self.emptiness() + } + + /// This is essentially `is_empty` with the additional benefit that [AccountStatus] is marked + /// as `#[must_use]`, forcing callers to handle empty accounts (which should be pruned). + fn emptiness(&self) -> AccountStatus { + if self.states.is_empty() { + AccountStatus::Empty + } else { + AccountStatus::NonEmpty + } + } + + /// The number of uncommitted inflight transactions. + fn uncommitted_count(&self) -> usize { + self.states.len() - self.committed + } +} + +/// Describes the emptiness of an [InflightAccountState]. +/// +/// Is marked as #[must_use] so that callers handle pruning of empty accounts. +#[must_use] +#[derive(Clone, Copy, PartialEq, Eq)] +pub enum AccountStatus { + /// [InflightAccountState] contains no state and should be pruned. + Empty, + /// [InflightAccountState] contains state and should be kept.
+ NonEmpty, +} + +impl AccountStatus { + pub fn is_empty(&self) -> bool { + *self == Self::Empty + } +} + +// TESTS +// ================================================================================================ + +#[cfg(test)] +mod tests { + use super::*; + use crate::test_utils::Random; + + #[test] + fn current_state_is_the_most_recently_inserted() { + let mut rng = Random::with_random_seed(); + let mut uut = InflightAccountState::default(); + uut.insert(rng.draw_digest(), rng.draw_tx_id()); + uut.insert(rng.draw_digest(), rng.draw_tx_id()); + uut.insert(rng.draw_digest(), rng.draw_tx_id()); + + let expected = rng.draw_digest(); + uut.insert(expected, rng.draw_tx_id()); + + assert_eq!(uut.current_state(), Some(&expected)); + } + + #[test] + fn parent_is_the_most_recently_inserted() { + let mut rng = Random::with_random_seed(); + let mut uut = InflightAccountState::default(); + uut.insert(rng.draw_digest(), rng.draw_tx_id()); + uut.insert(rng.draw_digest(), rng.draw_tx_id()); + uut.insert(rng.draw_digest(), rng.draw_tx_id()); + + let expected = rng.draw_tx_id(); + uut.insert(rng.draw_digest(), expected); + + let parent = uut.insert(rng.draw_digest(), rng.draw_tx_id()); + + assert_eq!(parent, Some(expected)); + } + + #[test] + fn empty_account_has_no_parent() { + let mut rng = Random::with_random_seed(); + let mut uut = InflightAccountState::default(); + let parent = uut.insert(rng.draw_digest(), rng.draw_tx_id()); + + assert!(parent.is_none()); + } + + #[test] + fn fully_committed_account_has_no_parent() { + let mut rng = Random::with_random_seed(); + let mut uut = InflightAccountState::default(); + uut.insert(rng.draw_digest(), rng.draw_tx_id()); + uut.commit(1); + let parent = uut.insert(rng.draw_digest(), rng.draw_tx_id()); + + assert!(parent.is_none()); + } + + #[test] + fn uncommitted_account_has_a_parent() { + let mut rng = Random::with_random_seed(); + let expected_parent = rng.draw_tx_id(); + + let mut uut = InflightAccountState::default(); + uut.insert(rng.draw_digest(), expected_parent); + + let parent = uut.insert(rng.draw_digest(), rng.draw_tx_id()); + + assert_eq!(parent, Some(expected_parent)); + } + + #[test] + fn partially_committed_account_has_a_parent() { + let mut rng = Random::with_random_seed(); + let expected_parent = rng.draw_tx_id(); + + let mut uut = InflightAccountState::default(); + uut.insert(rng.draw_digest(), rng.draw_tx_id()); + uut.insert(rng.draw_digest(), expected_parent); + uut.commit(1); + + let parent = uut.insert(rng.draw_digest(), rng.draw_tx_id()); + + assert_eq!(parent, Some(expected_parent)); + } + + #[test] + fn reverted_txs_are_nonextant() { + let mut rng = Random::with_random_seed(); + const N: usize = 5; + const REVERT: usize = 2; + + let states = (0..N).map(|_| (rng.draw_digest(), rng.draw_tx_id())).collect::>(); + + let mut uut = InflightAccountState::default(); + for (state, tx) in &states { + uut.insert(*state, *tx); + } + uut.revert(REVERT); + + let mut expected = InflightAccountState::default(); + for (state, tx) in states.iter().rev().skip(REVERT).rev() { + expected.insert(*state, *tx); + } + + assert_eq!(uut, expected); + } + + #[test] + fn pruned_txs_are_nonextant() { + let mut rng = Random::with_random_seed(); + const N: usize = 5; + const PRUNE: usize = 2; + + let states = (0..N).map(|_| (rng.draw_digest(), rng.draw_tx_id())).collect::>(); + + let mut uut = InflightAccountState::default(); + for (state, tx) in &states { + uut.insert(*state, *tx); + } + uut.commit(PRUNE); + uut.prune_committed(PRUNE); + + let mut expected = 
InflightAccountState::default(); + for (state, tx) in states.iter().skip(PRUNE) { + expected.insert(*state, *tx); + } + + assert_eq!(uut, expected); + } + + #[test] + fn is_empty_after_full_commit_and_prune() { + let mut rng = Random::with_random_seed(); + const N: usize = 5; + let mut uut = InflightAccountState::default(); + for _ in 0..N { + uut.insert(rng.draw_digest(), rng.draw_tx_id()); + } + + uut.commit(N); + uut.prune_committed(N); + + assert_eq!(uut, Default::default()); + } + + #[test] + fn is_empty_after_full_revert() { + let mut rng = Random::with_random_seed(); + const N: usize = 5; + let mut uut = InflightAccountState::default(); + for _ in 0..N { + uut.insert(rng.draw_digest(), rng.draw_tx_id()); + } + + uut.revert(N); + + assert_eq!(uut, Default::default()); + } + + #[test] + #[should_panic] + fn revert_panics_on_out_of_bounds() { + let mut rng = Random::with_random_seed(); + const N: usize = 5; + let mut uut = InflightAccountState::default(); + for _ in 0..N { + uut.insert(rng.draw_digest(), rng.draw_tx_id()); + } + + uut.commit(1); + uut.revert(N); + } + + #[test] + #[should_panic] + fn commit_panics_on_out_of_bounds() { + let mut rng = Random::with_random_seed(); + const N: usize = 5; + let mut uut = InflightAccountState::default(); + for _ in 0..N { + uut.insert(rng.draw_digest(), rng.draw_tx_id()); + } + + uut.commit(N + 1); + } +} diff --git a/crates/block-producer/src/mempool/inflight_state/mod.rs b/crates/block-producer/src/mempool/inflight_state/mod.rs new file mode 100644 index 00000000..673a924f --- /dev/null +++ b/crates/block-producer/src/mempool/inflight_state/mod.rs @@ -0,0 +1,653 @@ +use std::{ + collections::{BTreeMap, BTreeSet, VecDeque}, + sync::Arc, +}; + +use miden_objects::{ + accounts::AccountId, + notes::{NoteId, Nullifier}, + transaction::{OutputNote, OutputNotes, ProvenTransaction, TransactionId}, + Digest, +}; + +use crate::{ + domain::transaction::AuthenticatedTransaction, + errors::{AddTransactionError, VerifyTxError}, + store::TransactionInputs, +}; + +mod account_state; + +use account_state::InflightAccountState; + +use super::BlockNumber; + +// IN-FLIGHT STATE +// ================================================================================================ + +/// Tracks the inflight state of the mempool. This includes recently committed blocks. +/// +/// Allows appending and reverting transactions as well as marking them as part of a committed +/// block. Committed state can also be pruned once the state is considered past the stale +/// threshold. +#[derive(Clone, Debug, PartialEq)] +pub struct InflightState { + /// Account states from inflight transactions. + /// + /// Accounts which are [AccountStatus::Empty] are immediately pruned. + accounts: BTreeMap, + + /// Nullifiers produced by the input notes of inflight transactions. + nullifiers: BTreeSet, + + /// Notes created by inflight transactions. + /// + /// Some of these may already be consumed - check the nullifiers. + output_notes: BTreeMap, + + /// Deltas representing the impact of each recently committed block on the inflight state. + /// + /// These are used to prune committed state after `num_retained_blocks` have passed. + committed_state: VecDeque, + + /// Number of recently committed blocks we retain in addition to the inflight state.
+ /// + /// This provides an overlap between committed and inflight state, giving a grace + /// period for incoming transactions to be verified against both without requiring it + /// to be an atomic action. + num_retained_blocks: usize, + + /// The latest committed block height. + chain_tip: BlockNumber, +} + +/// The aggregated impact of a set of sequential transactions on the [InflightState]. +#[derive(Clone, Default, Debug, PartialEq)] +struct StateDelta { + /// The number of transactions that affected each account. + account_transactions: BTreeMap, + + /// The nullifiers consumed by the transactions. + nullifiers: BTreeSet, + + /// The notes produced by the transactions. + output_notes: BTreeSet, +} + +impl StateDelta { + fn new(txs: &[AuthenticatedTransaction]) -> Self { + let mut account_transactions = BTreeMap::::new(); + let mut nullifiers = BTreeSet::new(); + let mut output_notes = BTreeSet::new(); + + for tx in txs { + *account_transactions.entry(tx.account_id()).or_default() += 1; + nullifiers.extend(tx.nullifiers()); + output_notes.extend(tx.output_notes()); + } + + Self { + account_transactions, + nullifiers, + output_notes, + } + } +} + +impl InflightState { + /// Creates an [InflightState] which will retain committed state for the given + /// amount of blocks before pruning them. + pub fn new(chain_tip: BlockNumber, num_retained_blocks: usize) -> Self { + Self { + num_retained_blocks, + chain_tip, + accounts: Default::default(), + nullifiers: Default::default(), + output_notes: Default::default(), + committed_state: Default::default(), + } + } + + /// Appends the transaction to the inflight state. + /// + /// This operation is atomic i.e. a rejected transaction has no impact of the state. + pub fn add_transaction( + &mut self, + tx: &AuthenticatedTransaction, + ) -> Result, AddTransactionError> { + // Separate verification and state mutation so that a rejected transaction + // does not impact the state (atomicity). + self.verify_transaction(tx)?; + + let parents = self.insert_transaction(tx); + + Ok(parents) + } + + fn oldest_committed_state(&self) -> BlockNumber { + let committed_len: u32 = self + .committed_state + .len() + .try_into() + .expect("We should not be storing many blocks"); + self.chain_tip + .checked_sub(BlockNumber::new(committed_len)) + .expect("Chain height cannot be less than number of committed blocks") + } + + fn verify_transaction(&self, tx: &AuthenticatedTransaction) -> Result<(), AddTransactionError> { + // The mempool retains recently committed blocks, in addition to the state that is currently + // inflight. This overlap with the committed state allows us to verify incoming + // transactions against the current state (committed + inflight). Transactions are + // first authenticated against the committed state prior to being submitted to the + // mempool. The overlap provides a grace period between transaction authentication + // against committed state and verification against inflight state. + // + // Here we just ensure that this authentication point is still within this overlap zone. + // This should only fail if the grace period is too restrictive for the current + // combination of block rate, transaction throughput and database IO. + let stale_limit = self.oldest_committed_state(); + if tx.authentication_height() < stale_limit { + return Err(AddTransactionError::StaleInputs { + input_block: tx.authentication_height(), + stale_limit, + }); + } + + // Ensure current account state is correct. 
+ let current = self + .accounts + .get(&tx.account_id()) + .and_then(|account_state| account_state.current_state()) + .copied() + .or(tx.store_account_state()); + let expected = tx.account_update().init_state_hash(); + + // SAFETY: a new account's state is set to zero, i.e. the default. + if expected != current.unwrap_or_default() { + return Err(VerifyTxError::IncorrectAccountInitialHash { + tx_initial_account_hash: expected, + current_account_hash: current, + } + .into()); + } + + // Ensure nullifiers aren't already present. + // + // We don't need to check the store state here because that was + // already performed as part of authenticating the transaction. + let double_spend = tx + .nullifiers() + .filter(|nullifier| self.nullifiers.contains(nullifier)) + .collect::>(); + if !double_spend.is_empty() { + return Err(VerifyTxError::InputNotesAlreadyConsumed(double_spend).into()); + } + + // Ensure output notes aren't already present. + let duplicates = tx + .output_notes() + .filter(|note| self.output_notes.contains_key(note)) + .collect::>(); + if !duplicates.is_empty() { + return Err(VerifyTxError::OutputNotesAlreadyExist(duplicates).into()); + } + + // Ensure that all unauthenticated notes have an inflight output note to consume. + // + // We don't need to worry about double spending them since we already checked for + // that using the nullifiers. + // + // Note that the authenticated transaction already filters out notes that were + // previously unauthenticated, but were authenticated by the store. + let missing = tx + .unauthenticated_notes() + .filter(|note_id| !self.output_notes.contains_key(note_id)) + .collect::>(); + if !missing.is_empty() { + return Err(VerifyTxError::UnauthenticatedNotesNotFound(missing).into()); + } + + Ok(()) + } + + /// Aggregate the transaction into the state, returning its parent transactions. + fn insert_transaction(&mut self, tx: &AuthenticatedTransaction) -> BTreeSet { + let account_parent = self + .accounts + .entry(tx.account_id()) + .or_default() + .insert(tx.account_update().final_state_hash(), tx.id()); + + self.nullifiers.extend(tx.nullifiers()); + self.output_notes + .extend(tx.output_notes().map(|note_id| (note_id, OutputNoteState::new(tx.id())))); + + // Authenticated input notes (provably) consume notes that are already committed + // on chain. They therefore cannot form part of the inflight dependency chain. + // + // Additionally, we only care about parents which have not been committed yet. + let note_parents = tx + .unauthenticated_notes() + .filter_map(|note_id| self.output_notes.get(&note_id)) + .filter_map(|note| note.transaction()) + .copied(); + + account_parent.into_iter().chain(note_parents).collect() + } + + /// Reverts the given state diff. + /// + /// # Panics + /// + /// Panics if any part of the diff isn't present in the state. Callers should take + /// care to only revert transaction sets whose ancestors are all either committed or reverted. + pub fn revert_transactions(&mut self, txs: &[AuthenticatedTransaction]) { + let delta = StateDelta::new(txs); + for (account, count) in delta.account_transactions { + let status = self.accounts.get_mut(&account).expect("Account must exist").revert(count); + + // Prune empty accounts.
+ if status.is_empty() { + self.accounts.remove(&account); + } + } + + for nullifier in delta.nullifiers { + assert!(self.nullifiers.remove(&nullifier), "Nullifier must exist"); + } + + for note in delta.output_notes { + assert!(self.output_notes.remove(&note).is_some(), "Output note must exist"); + } + } + + /// Marks the given state diff as committed. + /// + /// These transactions are no longer considered inflight. Callers should take care to only + /// commit transactions whose ancestors are all committed. + /// + /// Note that this state is still retained for the configured number of blocks. The oldest + /// retained block is also pruned from the state. + /// + /// # Panics + /// + /// Panics if the accounts don't have enough inflight transactions to commit or if + /// the output notes don't exist. + pub fn commit_block(&mut self, txs: &[AuthenticatedTransaction]) { + let delta = StateDelta::new(txs); + for (account, count) in &delta.account_transactions { + self.accounts.get_mut(account).expect("Account must exist").commit(*count); + } + + for note in &delta.output_notes { + self.output_notes.get_mut(note).expect("Output note must exist").commit(); + } + + self.committed_state.push_back(delta); + + if self.committed_state.len() > self.num_retained_blocks { + let delta = self.committed_state.pop_front().expect("Must be some due to length check"); + self.prune_committed_state(delta); + } + + self.chain_tip.increment(); + } + + /// Removes the delta from inflight state. + /// + /// # Panics + /// + /// Panics if the accounts don't have enough inflight transactions to commit. + fn prune_committed_state(&mut self, diff: StateDelta) { + for (account, count) in diff.account_transactions { + let status = self + .accounts + .get_mut(&account) + .expect("Account must exist") + .prune_committed(count); + + // Prune empty accounts. + if status.is_empty() { + self.accounts.remove(&account); + } + } + + for nullifier in diff.nullifiers { + self.nullifiers.remove(&nullifier); + } + + for output_note in diff.output_notes { + self.output_notes.remove(&output_note); + } + } +} + +/// Describes the state of an inflight output note. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +enum OutputNoteState { + /// Output note is part of a committed block, and its source transaction should no longer be + /// considered for dependency tracking. + Committed, + /// Output note is still inflight and should be considered for dependency tracking. + Inflight(TransactionId), +} + +impl OutputNoteState { + /// Creates a new inflight output note state. + fn new(tx: TransactionId) -> Self { + Self::Inflight(tx) + } + + /// Commits the output note, removing the source transaction. + fn commit(&mut self) { + *self = Self::Committed; + } + + /// Returns the source transaction ID if the output note is not yet committed.
+ fn transaction(&self) -> Option<&TransactionId> { + if let Self::Inflight(tx) = self { + Some(tx) + } else { + None + } + } +} + +// TESTS +// ================================================================================================ + +#[cfg(test)] +mod tests { + use miden_air::Felt; + use miden_objects::{accounts::AccountType, testing::account_id::AccountIdBuilder}; + + use super::*; + use crate::test_utils::{ + mock_account_id, mock_proven_tx, + note::{mock_note, mock_output_note}, + MockPrivateAccount, MockProvenTxBuilder, + }; + + #[test] + fn rejects_duplicate_nullifiers() { + let account = mock_account_id(1); + let states = (1u8..=4).map(|x| Digest::from([x, 0, 0, 0])).collect::>(); + + let note_seed = 123; + // We need to make the note available first, in order for it to be consumed at all. + let tx0 = MockProvenTxBuilder::with_account(account, states[0], states[1]) + .output_notes(vec![mock_output_note(note_seed)]) + .build(); + let tx1 = MockProvenTxBuilder::with_account(account, states[1], states[2]) + .unauthenticated_notes(vec![mock_note(note_seed)]) + .build(); + let tx2 = MockProvenTxBuilder::with_account(account, states[2], states[3]) + .unauthenticated_notes(vec![mock_note(note_seed)]) + .build(); + + let mut uut = InflightState::new(BlockNumber::default(), 1); + uut.add_transaction(&AuthenticatedTransaction::from_inner(tx0)).unwrap(); + uut.add_transaction(&AuthenticatedTransaction::from_inner(tx1)).unwrap(); + + let err = uut.add_transaction(&AuthenticatedTransaction::from_inner(tx2)).unwrap_err(); + + assert_eq!( + err, + VerifyTxError::InputNotesAlreadyConsumed(vec![mock_note(note_seed).nullifier()]).into() + ); + } + + #[test] + fn rejects_duplicate_output_notes() { + let account = mock_account_id(1); + let states = (1u8..=3).map(|x| Digest::from([x, 0, 0, 0])).collect::>(); + + let note = mock_output_note(123); + let tx0 = MockProvenTxBuilder::with_account(account, states[0], states[1]) + .output_notes(vec![note.clone()]) + .build(); + let tx1 = MockProvenTxBuilder::with_account(account, states[1], states[2]) + .output_notes(vec![note.clone()]) + .build(); + + let mut uut = InflightState::new(BlockNumber::default(), 1); + uut.add_transaction(&AuthenticatedTransaction::from_inner(tx0)).unwrap(); + + let err = uut.add_transaction(&AuthenticatedTransaction::from_inner(tx1)).unwrap_err(); + + assert_eq!(err, VerifyTxError::OutputNotesAlreadyExist(vec![note.id()]).into()); + } + + #[test] + fn rejects_account_state_mismatch() { + let account = mock_account_id(1); + let states = (1u8..=3).map(|x| Digest::from([x, 0, 0, 0])).collect::>(); + + let tx = MockProvenTxBuilder::with_account(account, states[0], states[1]).build(); + + let mut uut = InflightState::new(BlockNumber::default(), 1); + let err = uut + .add_transaction(&AuthenticatedTransaction::from_inner(tx).with_store_state(states[2])) + .unwrap_err(); + + assert_eq!( + err, + VerifyTxError::IncorrectAccountInitialHash { + tx_initial_account_hash: states[0], + current_account_hash: states[2].into() + } + .into() + ); + } + + #[test] + fn account_state_transitions() { + let account = mock_account_id(1); + let states = (1u8..=3).map(|x| Digest::from([x, 0, 0, 0])).collect::>(); + + let tx0 = MockProvenTxBuilder::with_account(account, states[0], states[1]).build(); + let tx1 = MockProvenTxBuilder::with_account(account, states[1], states[2]).build(); + + let mut uut = InflightState::new(BlockNumber::default(), 1); + uut.add_transaction(&AuthenticatedTransaction::from_inner(tx0)).unwrap(); + 
uut.add_transaction(&AuthenticatedTransaction::from_inner(tx1).with_empty_store_state()) + .unwrap(); + } + + #[test] + fn new_account_state_defaults_to_zero() { + let account = mock_account_id(1); + + let tx = MockProvenTxBuilder::with_account( + account, + [0u8, 0, 0, 0].into(), + [1u8, 0, 0, 0].into(), + ) + .build(); + + let mut uut = InflightState::new(BlockNumber::default(), 1); + uut.add_transaction(&AuthenticatedTransaction::from_inner(tx).with_empty_store_state()) + .unwrap(); + } + + #[test] + fn inflight_account_state_overrides_input_state() { + let account = mock_account_id(1); + let states = (1u8..=3).map(|x| Digest::from([x, 0, 0, 0])).collect::>(); + + let tx0 = MockProvenTxBuilder::with_account(account, states[0], states[1]).build(); + let tx1 = MockProvenTxBuilder::with_account(account, states[1], states[2]).build(); + + let mut uut = InflightState::new(BlockNumber::default(), 1); + uut.add_transaction(&AuthenticatedTransaction::from_inner(tx0)).unwrap(); + + // Feed in an old state via input. This should be ignored, and the previous tx's final + // state should be used. + uut.add_transaction(&AuthenticatedTransaction::from_inner(tx1).with_store_state(states[0])) + .unwrap(); + } + + #[test] + fn dependency_tracking() { + let account = mock_account_id(1); + let states = (1u8..=3).map(|x| Digest::from([x, 0, 0, 0])).collect::>(); + let note_seed = 123; + + // Parent via account state. + let tx0 = MockProvenTxBuilder::with_account(account, states[0], states[1]).build(); + // Parent via output note. + let tx1 = MockProvenTxBuilder::with_account(mock_account_id(2), states[0], states[1]) + .output_notes(vec![mock_output_note(note_seed)]) + .build(); + + let tx = MockProvenTxBuilder::with_account(account, states[1], states[2]) + .unauthenticated_notes(vec![mock_note(note_seed)]) + .build(); + + let mut uut = InflightState::new(BlockNumber::default(), 1); + uut.add_transaction(&AuthenticatedTransaction::from_inner(tx0.clone())).unwrap(); + uut.add_transaction(&AuthenticatedTransaction::from_inner(tx1.clone())).unwrap(); + + let parents = uut + .add_transaction(&AuthenticatedTransaction::from_inner(tx).with_empty_store_state()) + .unwrap(); + let expected = BTreeSet::from([tx0.id(), tx1.id()]); + + assert_eq!(parents, expected); + } + + #[test] + fn committed_parents_are_not_tracked() { + let account = mock_account_id(1); + let states = (1u8..=3).map(|x| Digest::from([x, 0, 0, 0])).collect::>(); + let note_seed = 123; + + // Parent via account state. + let tx0 = MockProvenTxBuilder::with_account(account, states[0], states[1]).build(); + let tx0 = AuthenticatedTransaction::from_inner(tx0); + // Parent via output note. + let tx1 = MockProvenTxBuilder::with_account(mock_account_id(2), states[0], states[1]) + .output_notes(vec![mock_output_note(note_seed)]) + .build(); + let tx1 = AuthenticatedTransaction::from_inner(tx1); + + let tx = MockProvenTxBuilder::with_account(account, states[1], states[2]) + .unauthenticated_notes(vec![mock_note(note_seed)]) + .build(); + + let mut uut = InflightState::new(BlockNumber::default(), 1); + uut.add_transaction(&tx0.clone()).unwrap(); + uut.add_transaction(&tx1.clone()).unwrap(); + + // Commit the parents, which should remove them from dependency tracking. 
+ uut.commit_block(&[tx0, tx1]); + + let parents = uut + .add_transaction(&AuthenticatedTransaction::from_inner(tx).with_empty_store_state()) + .unwrap(); + + assert!(parents.is_empty()); + }
+ + #[test] + fn tx_insertions_and_reversions_cancel_out() { + // Reverting txs should be equivalent to them never being inserted. + // + // We test this by reverting some txs and equating it to the remaining set. + // This is a form of property test. + let states = (1u8..=5).map(|x| Digest::from([x, 0, 0, 0])).collect::<Vec<_>>(); + let txs = vec![ + MockProvenTxBuilder::with_account(mock_account_id(1), states[0], states[1]), + MockProvenTxBuilder::with_account(mock_account_id(1), states[1], states[2]) + .output_notes(vec![mock_output_note(111), mock_output_note(222)]), + MockProvenTxBuilder::with_account(mock_account_id(2), states[0], states[1]) + .unauthenticated_notes(vec![mock_note(222)]), + MockProvenTxBuilder::with_account(mock_account_id(1), states[2], states[3]), + MockProvenTxBuilder::with_account(mock_account_id(2), states[1], states[2]) + .unauthenticated_notes(vec![mock_note(111)]) + .output_notes(vec![mock_output_note(45)]), + ]; + + let txs = txs + .into_iter() + .map(MockProvenTxBuilder::build) + .map(AuthenticatedTransaction::from_inner) + .collect::<Vec<_>>(); + + for i in 0..states.len() { + // Insert all txs and then revert the last `i` of them. + // This should match only inserting the first `N-i` of them. + let mut reverted = InflightState::new(BlockNumber::default(), 1); + for (idx, tx) in txs.iter().enumerate() { + reverted.add_transaction(tx).unwrap_or_else(|err| { + panic!("Inserting tx #{idx} in iteration {i} should succeed: {err}") + }); + } + reverted.revert_transactions(&txs[txs.len() - i..]); + + let mut inserted = InflightState::new(BlockNumber::default(), 1); + for (idx, tx) in txs.iter().rev().skip(i).rev().enumerate() { + inserted.add_transaction(tx).unwrap_or_else(|err| { + panic!("Inserting tx #{idx} in iteration {i} should succeed: {err}") + }); + } + + assert_eq!(reverted, inserted, "Iteration {i}"); + } + }
+ + #[test] + fn pruning_committed_state() { + //! This is a form of property test, where we assert that pruning the first `i` of `N` + //! transactions is equivalent to only inserting the last `N-i` transactions. + let states = (1u8..=5).map(|x| Digest::from([x, 0, 0, 0])).collect::<Vec<_>>(); + + // Skipping initial txs means that output notes required for subsequent unauthenticated + // input notes won't always be present. To work around this, we instead only use + // authenticated input notes. + let txs = vec![ + MockProvenTxBuilder::with_account(mock_account_id(1), states[0], states[1]), + MockProvenTxBuilder::with_account(mock_account_id(1), states[1], states[2]) + .output_notes(vec![mock_output_note(111), mock_output_note(222)]), + MockProvenTxBuilder::with_account(mock_account_id(2), states[0], states[1]) + .nullifiers(vec![mock_note(222).nullifier()]), + MockProvenTxBuilder::with_account(mock_account_id(1), states[2], states[3]), + MockProvenTxBuilder::with_account(mock_account_id(2), states[1], states[2]) + .nullifiers(vec![mock_note(111).nullifier()]) + .output_notes(vec![mock_output_note(45)]), + ]; + + let txs = txs + .into_iter() + .map(MockProvenTxBuilder::build) + .map(AuthenticatedTransaction::from_inner) + .collect::<Vec<_>>(); + + for i in 0..states.len() { + // Insert all txs and then commit and prune the first `i` of them. + // + // This should match only inserting the final `N-i` transactions.
+ // Note: we force all committed state to immedietely be pruned by setting + // it to zero. + let mut committed = InflightState::new(BlockNumber::default(), 0); + for (idx, tx) in txs.iter().enumerate() { + committed.add_transaction(tx).unwrap_or_else(|err| { + panic!("Inserting tx #{idx} in iteration {i} should succeed: {err}") + }); + } + committed.commit_block(&txs[..i]); + + let mut inserted = InflightState::new(BlockNumber::new(1), 0); + for (idx, tx) in txs.iter().skip(i).enumerate() { + // We need to adjust the height since we are effectively at block "1" now. + let tx = tx.clone().with_authentication_height(1); + inserted.add_transaction(&tx).unwrap_or_else(|err| { + panic!("Inserting tx #{idx} in iteration {i} should succeed: {err}") + }); + } + + assert_eq!(committed, inserted, "Iteration {i}"); + } + } +} diff --git a/crates/block-producer/src/mempool/mod.rs b/crates/block-producer/src/mempool/mod.rs index 67081009..6df7f347 100644 --- a/crates/block-producer/src/mempool/mod.rs +++ b/crates/block-producer/src/mempool/mod.rs @@ -6,7 +6,7 @@ use std::{ }; use batch_graph::BatchGraph; -use inflight_state::{InflightState, StateDiff}; +use inflight_state::InflightState; use miden_objects::{ accounts::AccountId, notes::{NoteId, Nullifier}, @@ -17,7 +17,9 @@ use miden_tx::{utils::collections::KvMap, TransactionVerifierError}; use transaction_graph::TransactionGraph; use crate::{ - errors::AddTransactionErrorRework, + batch_builder::batch::TransactionBatch, + domain::transaction::AuthenticatedTransaction, + errors::{AddTransactionError, VerifyTxError}, store::{TransactionInputs, TxInputsError}, }; @@ -40,7 +42,7 @@ impl BatchJobId { } } -#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)] +#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)] pub struct BlockNumber(u32); impl Display for BlockNumber { @@ -50,6 +52,10 @@ impl Display for BlockNumber { } impl BlockNumber { + pub fn new(x: u32) -> Self { + Self(x) + } + pub fn next(&self) -> Self { let mut ret = *self; ret.increment(); @@ -61,7 +67,7 @@ impl BlockNumber { self.checked_sub(Self(1)) } - pub fn increment(mut self) { + pub fn increment(&mut self) { self.0 += 1; } @@ -70,22 +76,17 @@ impl BlockNumber { } } +// MEMPOOL +// ================================================================================================ + pub struct Mempool { /// The latest inflight state of each account. /// /// Accounts without inflight transactions are not stored. state: InflightState, - /// Note's consumed by inflight transactions. - nullifiers: BTreeSet, - - /// Notes produced by inflight transactions. - /// - /// It is possible for these to already be consumed - check nullifiers. - notes: BTreeMap, - /// Inflight transactions. - transactions: TransactionGraph, + transactions: TransactionGraph, /// Inflight batches. batches: BatchGraph, @@ -93,17 +94,11 @@ pub struct Mempool { /// The next batches ID. next_batch_id: BatchJobId, - /// Blocks which have been committed but are not yet considered stale. - committed_diffs: VecDeque, - /// The current block height of the chain. chain_tip: BlockNumber, block_in_progress: Option>, - /// Number of blocks before transaction input is considered stale. - staleness: BlockNumber, - batch_transaction_limit: usize, block_batch_limit: usize, } @@ -120,23 +115,12 @@ impl Mempool { /// Returns an error if the transaction's initial conditions don't match the current state. 
pub fn add_transaction( &mut self, - transaction: ProvenTransaction, - inputs: TransactionInputs, - ) -> Result { - // Ensure inputs aren't stale. - if let Some(stale_block) = self.stale_block() { - if inputs.current_block_height <= stale_block.0 { - return Err(AddTransactionErrorRework::StaleInputs { - input_block: BlockNumber(inputs.current_block_height), - stale_limit: stale_block, - }); - } - } - + transaction: AuthenticatedTransaction, + ) -> Result { // Add transaction to inflight state. - let parents = self.state.add_transaction(&transaction, &inputs)?; + let parents = self.state.add_transaction(&transaction)?; - self.transactions.insert(transaction, parents); + self.transactions.insert(transaction.id(), transaction, parents); Ok(self.chain_tip.0) } @@ -146,34 +130,35 @@ impl Mempool { /// Transactions are returned in a valid execution ordering. /// /// Returns `None` if no transactions are available. - pub fn select_batch(&mut self) -> Option<(BatchJobId, Vec)> { + pub fn select_batch(&mut self) -> Option<(BatchJobId, Vec)> { let mut parents = BTreeSet::new(); let mut batch = Vec::with_capacity(self.batch_transaction_limit); + let mut tx_ids = Vec::with_capacity(self.batch_transaction_limit); for _ in 0..self.batch_transaction_limit { // Select transactions according to some strategy here. For now its just arbitrary. - let Some((tx, tx_parents)) = self.transactions.pop_for_batching() else { + let Some((tx, tx_parents)) = self.transactions.pop_for_processing() else { break; }; batch.push(tx); parents.extend(tx_parents); } - // Update the depedency graph to reflect parents at the batch level by removing - // all edges within this batch. + // Update the dependency graph to reflect parents at the batch level by removing all edges + // within this batch. for tx in &batch { - parents.remove(tx); + parents.remove(&tx.id()); } let batch_id = self.next_batch_id; self.next_batch_id.increment(); - self.batches.insert(batch_id, batch.clone(), parents); + self.batches.insert(batch_id, tx_ids, parents); Some((batch_id, batch)) } - /// Drops the failed batch and all of its descendents. + /// Drops the failed batch and all of its descendants. /// /// Transactions are placed back in the queue. pub fn batch_failed(&mut self, batch: BatchJobId) { @@ -194,8 +179,8 @@ impl Mempool { } /// Marks a batch as proven if it exists. - pub fn batch_proved(&mut self, batch_id: BatchJobId) { - self.batches.mark_proven(batch_id); + pub fn batch_proved(&mut self, batch_id: BatchJobId, batch: TransactionBatch) { + self.batches.mark_proven(batch_id, batch); } /// Select batches for the next block. @@ -205,16 +190,16 @@ impl Mempool { /// # Panics /// /// Panics if there is already a block in flight. - pub fn select_block(&mut self) -> (BlockNumber, BTreeSet) { + pub fn select_block(&mut self) -> (BlockNumber, BTreeMap) { assert!(self.block_in_progress.is_none(), "Cannot have two blocks inflight."); let batches = self.batches.select_block(self.block_batch_limit); - self.block_in_progress = Some(batches.clone()); + self.block_in_progress = Some(batches.keys().cloned().collect()); (self.chain_tip.next(), batches) } - /// Notify the pool that the block was succesfully completed. + /// Notify the pool that the block was successfully completed. /// /// # Panics /// @@ -225,19 +210,10 @@ impl Mempool { // Remove committed batches and transactions from graphs. 
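+ // Resolve the committed batches back to their transaction IDs so that both graphs and the inflight state can be updated.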
let batches = self.block_in_progress.take().expect("No block in progress to commit"); let transactions = self.batches.remove_committed(batches); - let transactions = self.transactions.remove_committed(&transactions); + let transactions = self.transactions.prune_processed(&transactions); // Inform inflight state about committed data. - let diff = StateDiff::new(&transactions); - self.state.commit_transactions(&diff); - - self.committed_diffs.push_back(diff); - if self.committed_diffs.len() > self.staleness.0 as usize { - // SAFETY: just checked that length is non-zero. - let stale_block = self.committed_diffs.pop_front().unwrap(); - - self.state.prune_committed_state(stale_block); - } + self.state.commit_block(&transactions); self.chain_tip.increment(); } @@ -261,15 +237,6 @@ impl Mempool { let transactions = self.transactions.purge_subgraphs(transactions); // Rollback state. - let impact = StateDiff::new(&transactions); - self.state.revert_transactions(impact); - // TODO: revert nullifiers and notes. - } - - /// The highest block height we consider stale. - /// - /// Returns [None] if the blockchain is so short that all blocks are considered fresh. - fn stale_block(&self) -> Option { - self.chain_tip.checked_sub(self.staleness) + self.state.revert_transactions(&transactions); } } diff --git a/crates/block-producer/src/mempool/transaction_graph.rs b/crates/block-producer/src/mempool/transaction_graph.rs index 2384924b..1a2d9cd9 100644 --- a/crates/block-producer/src/mempool/transaction_graph.rs +++ b/crates/block-producer/src/mempool/transaction_graph.rs @@ -5,70 +5,120 @@ use std::{ use miden_objects::transaction::{ProvenTransaction, TransactionId}; -use super::BatchJobId; +// TRANSACTION GRAPH +// ================================================================================================ -#[derive(Default, Clone, Debug)] -pub struct TransactionGraph { +/// Tracks the dependency graph and status of transactions. +#[derive(Clone, Debug, PartialEq)] +pub struct TransactionGraph { /// All transactions currently inflight. - nodes: BTreeMap, + nodes: BTreeMap>, - /// Transactions ready for inclusion in a batch. + /// Transactions ready for being processed. /// - /// aka transactions whose parent transactions are already included in batches. + /// aka transactions whose parents are already processed. roots: BTreeSet, } -impl TransactionGraph { - pub fn insert(&mut self, transaction: ProvenTransaction, parents: BTreeSet) { - let id = transaction.id(); - // Inform parent's of their new child. +impl Default for TransactionGraph { + fn default() -> Self { + Self { + nodes: Default::default(), + roots: Default::default(), + } + } +} + +impl TransactionGraph { + /// Inserts a new transaction node, with edges to the given parent nodes. + /// + /// # Panics + /// + /// Panics if: + /// - any of the given parents are not part of the graph, + /// - the transaction is already present + pub fn insert(&mut self, id: TransactionId, data: T, parents: BTreeSet) { + // Inform parents of their new child. for parent in &parents { self.nodes.get_mut(parent).expect("Parent must be in pool").children.insert(id); } - let transaction = Node::new(transaction, parents); - if self.nodes.insert(id, transaction).is_some() { + let node = Node::new(data, parents); + if self.nodes.insert(id, node).is_some() { panic!("Transaction already exists in pool"); } - // This could be optimised by inlining this inside the parent loop. This would prevent the + // This could be optimized by inlining this inside the parent loop. 
This would prevent the // double iteration over parents, at the cost of some code duplication. self.try_make_root(id); } - pub fn pop_for_batching(&mut self) -> Option<(TransactionId, BTreeSet)> { - let tx_id = self.roots.pop_first()?; - let node = self.nodes.get_mut(&tx_id).expect("Root transaction must be in graph"); - node.status = Status::Processed; + /// Returns the next transaction ready for processing, and its parent edges. + /// + /// Internally this transaction is now marked as processed and is no longer considered inqueue. + pub fn pop_for_processing(&mut self) -> Option<(T, BTreeSet)> { + let tx_id = self.roots.first()?; + + Some(self.process(*tx_id)) + } + + /// Marks the transaction as processed and returns its data and parents. + /// + /// Separated out from the actual strategy of choosing so that we have more + /// fine grained control available for tests. + /// + /// # Panics + /// + /// Panics if the transaction: + /// - does not exist + /// - is already processed + /// - is not ready for processing + fn process(&mut self, id: TransactionId) -> (T, BTreeSet) { + assert!(self.roots.remove(&id), "Process target must form part of roots"); + let node = self.nodes.get_mut(&id).expect("Root transaction must be in graph"); + node.mark_as_processed(); // Work around multiple mutable borrows of self. let parents = node.parents.clone(); let children = node.children.clone(); + let tx = node.data.clone(); for child in children { self.try_make_root(child); } - Some((tx_id, parents)) + (tx, parents) } - /// Marks the given transactions as being back inqueue. + /// Marks the given transactions as being back in queue. + /// + /// # Panics + /// + /// Panics if any of the transactions are + /// - not part of the graph + /// - are already in queue aka not processed pub fn requeue_transactions(&mut self, transactions: BTreeSet) { for tx in &transactions { - self.nodes.get_mut(tx).expect("Node must exist").status = Status::InQueue; + self.nodes.get_mut(tx).expect("Node must exist").mark_as_inqueue(); } // All requeued transactions are potential roots, and current roots may have been // invalidated. - let mut potential_roots = transactions; - potential_roots.extend(&self.roots); - self.roots.clear(); + let mut potential_roots = std::mem::take(&mut self.roots); + potential_roots.extend(transactions); for tx in potential_roots { self.try_make_root(tx); } } - pub fn remove_committed(&mut self, tx_ids: &[TransactionId]) -> Vec> { + /// Prunes processed transactions from the graph. + /// + /// # Panics + /// + /// Panics if any of the given transactions are: + /// - not part of the graph + /// - are in queue aka not processed + pub fn prune_processed(&mut self, tx_ids: &[TransactionId]) -> Vec { let mut transactions = Vec::with_capacity(tx_ids.len()); for transaction in tx_ids { let node = self.nodes.remove(transaction).expect("Node must be in graph"); @@ -90,13 +140,10 @@ impl TransactionGraph { transactions } - /// Removes the transactions and all their descendents from the graph. + /// Removes the transactions and all their descendants from the graph. /// /// Returns all transactions removed. - pub fn purge_subgraphs( - &mut self, - transactions: Vec, - ) -> Vec> { + pub fn purge_subgraphs(&mut self, transactions: Vec) -> Vec { let mut removed = Vec::new(); let mut to_process = transactions; @@ -108,8 +155,9 @@ impl TransactionGraph { continue; }; - // All the child batches are also removed so no need to check - // for new roots. No new roots are possible as a result of this subgraph removal. 
+ // All the children will also be removed so no need to check for new roots. + // + // No new roots are possible as a result of this subgraph removal. self.roots.remove(&node_id); // Inform parent that this child no longer exists. @@ -130,13 +178,20 @@ impl TransactionGraph { removed } + /// Adds the given transaction to the set of roots _IFF_ all of its parents are marked as + /// processed. + /// + /// # Panics + /// + /// Panics if the transaction or any of its parents do not exist. This would constitute an + /// internal bookkeeping failure. fn try_make_root(&mut self, tx_id: TransactionId) { let tx = self.nodes.get_mut(&tx_id).expect("Transaction must be in graph"); for parent in tx.parents.clone() { let parent = self.nodes.get(&parent).expect("Parent must be in pool"); - if parent.status != Status::Processed { + if !parent.is_processed() { return; } } @@ -144,27 +199,398 @@ impl TransactionGraph { } } -#[derive(Clone, Debug)] -struct Node { +#[derive(Clone, Debug, PartialEq)] +struct Node { status: Status, - data: Arc, + data: T, parents: BTreeSet, children: BTreeSet, } -impl Node { - fn new(tx: ProvenTransaction, parents: BTreeSet) -> Self { +impl Node { + /// Creates a new inflight [Node] with no children. + fn new(data: T, parents: BTreeSet) -> Self { Self { - status: Status::InQueue, - data: Arc::new(tx), + status: Status::Queued, + data, parents, children: Default::default(), } } + + /// Marks the node as [Status::Processed]. + /// + /// # Panics + /// + /// Panics if the node is already processed. + fn mark_as_processed(&mut self) { + assert!(!self.is_processed()); + self.status = Status::Processed + } + + /// Marks the node as [Status::Inqueue]. + /// + /// # Panics + /// + /// Panics if the node is already inqueue. + fn mark_as_inqueue(&mut self) { + assert!(!self.is_inqueue()); + self.status = Status::Queued + } + + fn is_processed(&self) -> bool { + self.status == Status::Processed + } + + fn is_inqueue(&self) -> bool { + self.status == Status::Queued + } } #[derive(Debug, Clone, Copy, PartialEq, Eq)] enum Status { - InQueue, + Queued, Processed, } + +// TESTS +// ================================================================================================ + +#[cfg(test)] +mod tests { + use super::*; + use crate::test_utils::Random; + + /// Simplified graph type which uses the transaction ID as the data value. + /// + /// Production usage will have `T: ProvenTransaction` however this is cumbersome + /// to generate. Since this graph doesn't actually care about the data type, we + /// simplify test data generation by just duplicating the ID. + type TestGraph = TransactionGraph; + + /// Test helpers and aliases. + impl TestGraph { + /// Alias to insert a transaction with no parents. + fn insert_with_no_parent(&mut self, id: TransactionId) { + self.insert_with_parents(id, Default::default()); + } + + /// Alias for inserting a transaction with parents. + fn insert_with_parents(&mut self, id: TransactionId, parents: BTreeSet) { + self.insert(id, id, parents); + } + + /// Alias for inserting a transaction with a single parent. + fn insert_with_parent(&mut self, id: TransactionId, parent: TransactionId) { + self.insert_with_parents(id, [parent].into()); + } + + /// Calls `pop_for_processing` until it returns `None`. + /// + /// This should result in a fully processed graph, barring bugs. + /// + /// Panics if the graph is not fully processed. 
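+ /// The IDs are returned in the order they were processed.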
+ fn process_all(&mut self) -> Vec<TransactionId> { + let mut processed = Vec::new(); + while let Some((id, _)) = self.pop_for_processing() { + processed.push(id); + } + + assert!(self.nodes.values().all(Node::is_processed)); + + processed + } + }
+ + #[test] + fn pruned_nodes_are_nonextant() { + //! Checks that processed and then pruned nodes behave as if they + //! never existed in the graph. We test this by comparing it to + //! a reference graph created without these ancestor nodes. + let mut rng = Random::with_random_seed(); + + let ancestor_a = rng.draw_tx_id(); + let ancestor_b = rng.draw_tx_id(); + + let child_a = rng.draw_tx_id(); + let child_b = rng.draw_tx_id(); + let child_both = rng.draw_tx_id(); + + let mut uut = TestGraph::default(); + uut.insert_with_no_parent(ancestor_a); + uut.insert_with_no_parent(ancestor_b); + uut.insert_with_parent(child_a, ancestor_a); + uut.insert_with_parent(child_b, ancestor_b); + uut.insert_with_parents(child_both, [ancestor_a, ancestor_b].into()); + + uut.process(ancestor_a); + uut.process(ancestor_b); + uut.prune_processed(&[ancestor_a, ancestor_b]); + + let mut reference = TestGraph::default(); + reference.insert_with_no_parent(child_a); + reference.insert_with_no_parent(child_b); + reference.insert_with_no_parent(child_both); + + assert_eq!(uut, reference); + }
+ + #[test] + fn inserted_node_is_considered_for_root() { + //! Ensure that a fresh node whose parent is + //! already processed will be considered for processing. + let mut rng = Random::with_random_seed(); + let parent_a = rng.draw_tx_id(); + let parent_b = rng.draw_tx_id(); + let child_a = rng.draw_tx_id(); + let child_b = rng.draw_tx_id(); + + let mut uut = TestGraph::default(); + uut.insert_with_no_parent(parent_a); + uut.insert_with_no_parent(parent_b); + uut.process(parent_a); + + uut.insert_with_parent(child_a, parent_a); + uut.insert_with_parent(child_b, parent_b); + + assert!(uut.roots.contains(&child_a)); + assert!(!uut.roots.contains(&child_b)); + }
+ + #[test] + fn fifo_order_is_maintained() { + //! This test creates a simple queue graph, expecting that the processed items should + //! be emitted in the same order. + let mut rng = Random::with_random_seed(); + let input = (0..10).map(|_| rng.draw_tx_id()).collect::<Vec<_>>(); + + let mut uut = TestGraph::default(); + uut.insert_with_no_parent(input[0]); + for pairs in input.windows(2) { + let (parent, id) = (pairs[0], pairs[1]); + uut.insert_with_parent(id, parent); + } + + let result = uut.process_all(); + assert_eq!(result, input); + }
+ + #[test] + fn requeuing_resets_graph_state() { + //! Requeuing transactions should cause the internal state to reset + //! to the same state as before these transactions were emitted + //! for processing.
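+ //! We verify this by comparing against a clone of the graph which only processed the transactions that were not requeued.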
+ + let mut rng = Random::with_random_seed(); + + let ancestor_a = rng.draw_tx_id(); + let ancestor_b = rng.draw_tx_id(); + let parent_a = rng.draw_tx_id(); + let parent_b = rng.draw_tx_id(); + let child_a = rng.draw_tx_id(); + let child_b = rng.draw_tx_id(); + let child_c = rng.draw_tx_id(); + + let mut uut = TestGraph::default(); + uut.insert_with_no_parent(ancestor_a); + uut.insert_with_no_parent(ancestor_b); + uut.insert_with_parent(parent_a, ancestor_a); + uut.insert_with_parent(parent_b, ancestor_b); + uut.insert_with_parents(child_a, [ancestor_a, parent_a].into()); + uut.insert_with_parents(child_b, [parent_a, parent_b].into()); + uut.insert_with_parent(child_c, parent_b); + + let mut reference = uut.clone(); + + uut.process(ancestor_a); + uut.process(ancestor_b); + uut.process(parent_a); + uut.process(parent_b); + uut.process(child_c); + + // Requeue all except ancestor a. This is a somewhat arbitrary choice. + // The reference graph should therefore only have ancestor a processed. + uut.requeue_transactions([ancestor_b, parent_a, parent_b, child_c].into()); + reference.process(ancestor_a); + + assert_eq!(uut, reference); + } + + #[test] + fn nodes_are_processed_exactly_once() { + let mut rng = Random::with_random_seed(); + + let ancestor_a = rng.draw_tx_id(); + let ancestor_b = rng.draw_tx_id(); + let parent_a = rng.draw_tx_id(); + let parent_b = rng.draw_tx_id(); + let child_a = rng.draw_tx_id(); + let child_b = rng.draw_tx_id(); + let child_c = rng.draw_tx_id(); + + let mut uut = TestGraph::default(); + uut.insert_with_no_parent(ancestor_a); + uut.insert_with_no_parent(ancestor_b); + uut.insert_with_parent(parent_a, ancestor_a); + uut.insert_with_parent(parent_b, ancestor_b); + uut.insert_with_parents(child_a, [ancestor_a, parent_a].into()); + uut.insert_with_parents(child_b, [parent_a, parent_b].into()); + uut.insert_with_parent(child_c, parent_b); + + let mut result = uut.process_all(); + result.sort(); + + let mut expected = + vec![ancestor_a, ancestor_b, parent_a, parent_b, child_a, child_b, child_c]; + expected.sort(); + + assert_eq!(result, expected); + } + + #[test] + fn processed_data_and_parent_tracking() { + let mut rng = Random::with_random_seed(); + + let ancestor_a = rng.draw_tx_id(); + let ancestor_b = rng.draw_tx_id(); + let parent_a = rng.draw_tx_id(); + let parent_b = rng.draw_tx_id(); + let child_a = rng.draw_tx_id(); + let child_b = rng.draw_tx_id(); + let child_c = rng.draw_tx_id(); + + let mut uut = TestGraph::default(); + uut.insert_with_no_parent(ancestor_a); + uut.insert_with_no_parent(ancestor_b); + uut.insert_with_parent(parent_a, ancestor_a); + uut.insert_with_parent(parent_b, ancestor_b); + uut.insert_with_parents(child_a, [ancestor_a, parent_a].into()); + uut.insert_with_parents(child_b, [parent_a, parent_b].into()); + uut.insert_with_parent(child_c, parent_b); + + let result = uut.process(ancestor_a); + assert_eq!(result, (ancestor_a, Default::default())); + + let result = uut.process(ancestor_b); + assert_eq!(result, (ancestor_b, Default::default())); + + let result = uut.process(parent_a); + assert_eq!(result, (parent_a, [ancestor_a].into())); + + let result = uut.process(parent_b); + assert_eq!(result, (parent_b, [ancestor_b].into())); + + let result = uut.process(child_a); + assert_eq!(result, (child_a, [ancestor_a, parent_a].into())); + + let result = uut.process(child_b); + assert_eq!(result, (child_b, [parent_a, parent_b].into())); + + let result = uut.process(child_c); + assert_eq!(result, (child_c, [parent_b].into())); + } + + #[test] + fn 
purging_subgraph_handles_internal_nodes() { + //! Purging a subgraph should correctly handle nodes already deleted within that subgraph. + //! + //! This is a concern for errors as we are deleting parts of the subgraph while we are + //! iterating through the nodes to purge. This means its likely a node will already + //! have been deleted before processing it as an input. + //! + //! We can somewhat force this to occur by re-ordering the inputs relative to the actual + //! dependency order. + + let mut rng = Random::with_random_seed(); + + let ancestor_a = rng.draw_tx_id(); + let ancestor_b = rng.draw_tx_id(); + let parent_a = rng.draw_tx_id(); + let parent_b = rng.draw_tx_id(); + let child_a = rng.draw_tx_id(); + let child_b = rng.draw_tx_id(); + let child_c = rng.draw_tx_id(); + + let mut uut = TestGraph::default(); + uut.insert_with_no_parent(ancestor_a); + uut.insert_with_no_parent(ancestor_b); + uut.insert_with_parent(parent_a, ancestor_a); + uut.insert_with_parent(parent_b, ancestor_b); + uut.insert_with_parents(child_a, [ancestor_a, parent_a].into()); + uut.insert_with_parents(child_b, [parent_a, parent_b].into()); + uut.insert_with_parent(child_c, parent_b); + + uut.purge_subgraphs(vec![child_b, parent_a]); + + let mut reference = TestGraph::default(); + reference.insert_with_no_parent(ancestor_a); + reference.insert_with_no_parent(ancestor_b); + reference.insert_with_parent(parent_b, ancestor_b); + reference.insert_with_parent(child_c, parent_b); + + assert_eq!(uut, reference); + } + + #[test] + fn purging_removes_all_descendents() { + let mut rng = Random::with_random_seed(); + + let ancestor_a = rng.draw_tx_id(); + let ancestor_b = rng.draw_tx_id(); + let parent_a = rng.draw_tx_id(); + let parent_b = rng.draw_tx_id(); + let child_a = rng.draw_tx_id(); + let child_b = rng.draw_tx_id(); + let child_c = rng.draw_tx_id(); + + let mut uut = TestGraph::default(); + uut.insert_with_no_parent(ancestor_a); + uut.insert_with_no_parent(ancestor_b); + uut.insert_with_parent(parent_a, ancestor_a); + uut.insert_with_parent(parent_b, ancestor_b); + uut.insert_with_parents(child_a, [ancestor_a, parent_a].into()); + uut.insert_with_parents(child_b, [parent_a, parent_b].into()); + uut.insert_with_parent(child_c, parent_b); + + uut.purge_subgraphs(vec![parent_a]); + + let mut reference = TestGraph::default(); + reference.insert_with_no_parent(ancestor_a); + reference.insert_with_no_parent(ancestor_b); + reference.insert_with_parent(parent_b, ancestor_b); + reference.insert_with_parent(child_c, parent_b); + + assert_eq!(uut, reference); + } + + #[test] + #[should_panic] + fn duplicate_insert() { + let mut rng = Random::with_random_seed(); + let mut uut = TestGraph::default(); + + let id = rng.draw_tx_id(); + uut.insert_with_no_parent(id); + uut.insert_with_no_parent(id); + } + + #[test] + #[should_panic] + fn missing_parents_in_insert() { + let mut rng = Random::with_random_seed(); + let mut uut = TestGraph::default(); + + uut.insert_with_parents(rng.draw_tx_id(), [rng.draw_tx_id()].into()); + } + + #[test] + #[should_panic] + fn requeueing_an_already_queued_tx() { + let mut rng = Random::with_random_seed(); + let mut uut = TestGraph::default(); + + let id = rng.draw_tx_id(); + uut.insert_with_no_parent(id); + uut.requeue_transactions([id].into()); + } +} diff --git a/crates/block-producer/src/server/mod.rs b/crates/block-producer/src/server/mod.rs index 56114be7..a5a8c320 100644 --- a/crates/block-producer/src/server/mod.rs +++ b/crates/block-producer/src/server/mod.rs @@ -4,9 +4,12 @@ use std::{ 
sync::Arc, }; -use miden_node_proto::generated::{ - block_producer::api_server, requests::SubmitProvenTransactionRequest, - responses::SubmitProvenTransactionResponse, store::api_client as store_client, +use miden_node_proto::{ + domain::nullifiers, + generated::{ + block_producer::api_server, requests::SubmitProvenTransactionRequest, + responses::SubmitProvenTransactionResponse, store::api_client as store_client, + }, }; use miden_node_utils::{ errors::ApiError, @@ -25,7 +28,8 @@ use crate::{ batch_builder::{DefaultBatchBuilder, DefaultBatchBuilderOptions}, block_builder::DefaultBlockBuilder, config::BlockProducerConfig, - errors::AddTransactionErrorRework, + domain::transaction::AuthenticatedTransaction, + errors::{AddTransactionError, VerifyTxError}, mempool::Mempool, state_view::DefaultStateView, store::{DefaultStore, Store}, @@ -145,19 +149,8 @@ impl api_server::Api for Server { self.submit_proven_transaction(request.into_inner()) .await .map(tonic::Response::new) - .map_err(|err| match err { - AddTransactionErrorRework::InvalidAccountState { .. } - | AddTransactionErrorRework::AuthenticatedNoteNotFound(_) - | AddTransactionErrorRework::UnauthenticatedNoteNotFound(_) - | AddTransactionErrorRework::NotesAlreadyConsumed(_) - | AddTransactionErrorRework::DeserializationError(_) - | AddTransactionErrorRework::ProofVerificationFailed(_) => { - Status::invalid_argument(err.to_string()) - }, - // Internal errors. - AddTransactionErrorRework::StaleInputs { .. } - | AddTransactionErrorRework::TxInputsError(_) => Status::internal("Internal error"), - }) + // This Status::from mapping takes care of hiding internal errors. + .map_err(Into::into) } } @@ -171,11 +164,11 @@ impl Server { async fn submit_proven_transaction( &self, request: SubmitProvenTransactionRequest, - ) -> Result { + ) -> Result { debug!(target: COMPONENT, ?request); let tx = ProvenTransaction::read_from_bytes(&request.transaction) - .map_err(|err| AddTransactionErrorRework::DeserializationError(err.to_string()))?; + .map_err(|err| AddTransactionError::DeserializationError(err.to_string()))?; let tx_id = tx.id(); @@ -192,40 +185,17 @@ impl Server { ); debug!(target: COMPONENT, proof = ?tx.proof()); - let mut inputs = self.store.get_tx_inputs(&tx).await?; - - let mut authenticated_notes = BTreeSet::new(); - let mut unauthenticated_notes = BTreeMap::new(); - - for note in tx.input_notes() { - match note.header() { - Some(header) => { - unauthenticated_notes.insert(header.id(), note.nullifier()); - }, - None => { - authenticated_notes.insert(note.nullifier()); - }, - } - } - - // Authenticated note nullifiers must be present in the store and must be unconsumed. - for nullifier in &authenticated_notes { - let nullifier_state = inputs - .nullifiers - .remove(nullifier) - .ok_or(AddTransactionErrorRework::AuthenticatedNoteNotFound(*nullifier))?; - - if nullifier_state.is_some() { - return Err(AddTransactionErrorRework::NotesAlreadyConsumed([*nullifier].into())); - } - } + let inputs = self.store.get_tx_inputs(&tx).await.map_err(VerifyTxError::from)?; + + // SAFETY: we assume that the rpc component has verified the transaction proof already. 
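+ // Construction pairs the transaction with its store inputs; any failure here propagates as an AddTransactionError via `?`.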
+ let tx = AuthenticatedTransaction::new(tx, inputs)?; self.mempool .lock() .await - .add_transaction(tx, inputs) + .add_transaction(tx) .map(|block_height| SubmitProvenTransactionResponse { block_height }) } }
diff --git a/crates/block-producer/src/test_utils/mod.rs b/crates/block-producer/src/test_utils/mod.rs index 07a7a4b5..3a8d7de4 100644 --- a/crates/block-producer/src/test_utils/mod.rs +++ b/crates/block-producer/src/test_utils/mod.rs @@ -1,7 +1,11 @@ use std::sync::Arc; -use miden_objects::{accounts::AccountId, Digest}; -use tokio::sync::RwLock; +use miden_objects::{ + accounts::AccountId, + crypto::rand::{FeltRng, RpoRandomCoin}, + transaction::TransactionId, + Digest, +}; mod proven_tx; @@ -9,6 +13,7 @@ pub use proven_tx::{mock_proven_tx, MockProvenTxBuilder}; mod store; +use rand::Rng; pub use store::{MockStoreFailure, MockStoreSuccess, MockStoreSuccessBuilder}; mod account; @@ -20,3 +25,31 @@ pub mod block; pub mod batch; pub mod note; + +/// Generates random values for tests. +/// +/// It prints its seed on construction which allows us to reproduce +/// test failures. +pub struct Random(RpoRandomCoin); + +impl Random { + /// Creates a [Random] with a random seed. This seed is logged + /// so that it is known for test failures. + pub fn with_random_seed() -> Self { + let seed: [u32; 4] = rand::random(); + + println!("Random::with_random_seed: {seed:?}"); + + let seed = Digest::from(seed).into(); + + Self(RpoRandomCoin::new(seed)) + } + + pub fn draw_tx_id(&mut self) -> TransactionId { + self.0.draw_word().into() + } + + pub fn draw_digest(&mut self) -> Digest { + self.0.draw_word().into() + } +}
diff --git a/crates/block-producer/src/test_utils/store.rs b/crates/block-producer/src/test_utils/store.rs index bf1e8932..f1f65bd2 100644 --- a/crates/block-producer/src/test_utils/store.rs +++ b/crates/block-producer/src/test_utils/store.rs @@ -12,6 +12,7 @@ use miden_objects::{ notes::{NoteId, NoteInclusionProof, Nullifier}, BlockHeader, ACCOUNT_TREE_DEPTH, EMPTY_WORD, ZERO, }; +use tokio::sync::RwLock; use super::*; use crate::{