Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(block-producer): batch builder #515

Merged
merged 35 commits into from
Oct 22, 2024
Merged
Show file tree
Hide file tree
Changes from 29 commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
596ba65
Make block inclusion proofs a map instead of a vector.
Mirko-von-Leipzig Oct 7, 2024
8c54b48
Typesafe note and transaction wrappers
Mirko-von-Leipzig Oct 7, 2024
1ec3e02
BatchId from type alias to new wrapper
Mirko-von-Leipzig Oct 8, 2024
79084a8
Implement batch builder.
Mirko-von-Leipzig Oct 8, 2024
d49e8a2
fmt
Mirko-von-Leipzig Oct 8, 2024
e477d09
Actually use the output notes smt
Mirko-von-Leipzig Oct 8, 2024
d8eeb0f
Make builder more type safe
Mirko-von-Leipzig Oct 8, 2024
5f2e418
Not so dangerous
Mirko-von-Leipzig Oct 8, 2024
02ff7ac
Separate variants for batch limits.
Mirko-von-Leipzig Oct 9, 2024
8205951
Replace TransactionBatch with ProvenBatch
Mirko-von-Leipzig Oct 9, 2024
92bc31a
Reuse existing batch type instead of new one.
Mirko-von-Leipzig Oct 10, 2024
e0a6024
Reuse existing block builder
Mirko-von-Leipzig Oct 10, 2024
190c6de
Track inflight output notes
Mirko-von-Leipzig Oct 10, 2024
bb79cb1
Fixup some abstractions
Mirko-von-Leipzig Oct 10, 2024
6ac6733
fixup comment
Mirko-von-Leipzig Oct 10, 2024
2057ad5
Shuffle
Mirko-von-Leipzig Oct 10, 2024
4c48c91
Refactor transaction batch tests
Mirko-von-Leipzig Oct 11, 2024
59fac0e
Simplify via subtraction.
Mirko-von-Leipzig Oct 14, 2024
2e46d51
Fixup inflight state
Mirko-von-Leipzig Oct 14, 2024
be98ab9
Revert miden-air usage
Mirko-von-Leipzig Oct 14, 2024
857b40d
Use block builder trait
Mirko-von-Leipzig Oct 14, 2024
bbaa5eb
tests: inflight state
Mirko-von-Leipzig Oct 14, 2024
293fd48
Move account status into separate module and simplify
Mirko-von-Leipzig Oct 15, 2024
d8e3497
fmt
Mirko-von-Leipzig Oct 15, 2024
771c9c7
Tests for inflight account state
Mirko-von-Leipzig Oct 16, 2024
8e7d839
Switch to a seedable random generator
Mirko-von-Leipzig Oct 17, 2024
220949f
Test transaction graph
Mirko-von-Leipzig Oct 17, 2024
db9d3ed
Revert note inclusion proof changes
Mirko-von-Leipzig Oct 17, 2024
ed0a44c
chore: fix typos, format comments, add section headers
bobbinth Oct 20, 2024
c62b876
Merge tx with inputs in a new type
Mirko-von-Leipzig Oct 21, 2024
fdee9f5
Address review comments #1
Mirko-von-Leipzig Oct 22, 2024
1f8dfa5
Move committed state into inflight state tracking
Mirko-von-Leipzig Oct 22, 2024
a0c85d8
Move transaction into domain module
Mirko-von-Leipzig Oct 22, 2024
92aa16b
fmt
Mirko-von-Leipzig Oct 22, 2024
a884d23
Fix test
Mirko-von-Leipzig Oct 22, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/block-producer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ miden-lib = { workspace = true, features = ["testing"] }
miden-node-test-macro = { path = "../test-macro" }
miden-objects = { workspace = true, features = ["testing"] }
miden-tx = { workspace = true, features = ["testing"] }
rand = "0.8.5"
Mirko-von-Leipzig marked this conversation as resolved.
Show resolved Hide resolved
rand_chacha = { version = "0.3", default-features = false }
tokio = { workspace = true, features = ["test-util"] }
winterfell = { version = "0.9" }
3 changes: 2 additions & 1 deletion crates/block-producer/src/batch_builder/batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ pub type BatchId = Blake3Digest<32>;
// TRANSACTION BATCH
// ================================================================================================

/// A batch of transactions that share a common proof.
/// A batch of transactions that share a common proof. For any given account, at most 1 transaction
/// in the batch must be addressing that account (issue: #186).
Mirko-von-Leipzig marked this conversation as resolved.
Show resolved Hide resolved
///
/// Note: Until recursive proofs are available in the Miden VM, we don't include the common proof.
#[derive(Debug, Clone, PartialEq, Eq)]
Expand Down
56 changes: 43 additions & 13 deletions crates/block-producer/src/batch_builder/mod.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,18 @@
use std::{cmp::min, collections::BTreeSet, num::NonZeroUsize, sync::Arc, time::Duration};
use std::{
cmp::min,
collections::{BTreeMap, BTreeSet},
num::NonZeroUsize,
sync::Arc,
time::Duration,
};

use miden_objects::{
accounts::AccountId,
notes::NoteId,
transaction::{OutputNote, TransactionId},
Digest,
};
use tokio::{sync::Mutex, time};
use tokio::{sync::Mutex, task::JoinSet, time};
use tonic::async_trait;
use tracing::{debug, info, instrument, Span};

Expand Down Expand Up @@ -219,31 +227,53 @@ pub struct BatchProducer {
pub workers: NonZeroUsize,
pub mempool: Arc<Mutex<Mempool>>,
pub tx_per_batch: usize,
pub simulated_proof_time: Duration,
}

type BatchResult = Result<BatchJobId, (BatchJobId, BuildBatchError)>;
type BatchResult = Result<(BatchJobId, TransactionBatch), (BatchJobId, BuildBatchError)>;

/// Wrapper around tokio's JoinSet that remains pending if the set is empty,
/// instead of returning None.
struct WorkerPool(tokio::task::JoinSet<BatchResult>);
struct WorkerPool {
in_progress: JoinSet<BatchResult>,
simulated_proof_time: Duration,
}

impl WorkerPool {
fn new(simulated_proof_time: Duration) -> Self {
Self {
simulated_proof_time,
in_progress: JoinSet::new(),
}
}

async fn join_next(&mut self) -> Result<BatchResult, tokio::task::JoinError> {
if self.0.is_empty() {
if self.in_progress.is_empty() {
std::future::pending().await
} else {
// Cannot be None as its not empty.
self.0.join_next().await.unwrap()
self.in_progress.join_next().await.unwrap()
}
}

fn len(&self) -> usize {
self.0.len()
self.in_progress.len()
}

fn spawn(&mut self, id: BatchJobId, transactions: Vec<TransactionId>) {
self.0.spawn(async move {
todo!("Do actual work like aggregating transaction data");
fn spawn(&mut self, id: BatchJobId, transactions: Vec<ProvenTransaction>) {
self.in_progress.spawn({
let simulated_proof_time = self.simulated_proof_time;
async move {
tracing::debug!("Begin proving batch.");

let batch = TransactionBatch::new(transactions, Default::default())
.map_err(|err| (id, err))?;

tokio::time::sleep(simulated_proof_time).await;
tracing::debug!("Batch proof completed.");
Mirko-von-Leipzig marked this conversation as resolved.
Show resolved Hide resolved

Ok((id, batch))
}
});
}
}
Expand All @@ -253,7 +283,7 @@ impl BatchProducer {
let mut interval = tokio::time::interval(self.batch_interval);
interval.set_missed_tick_behavior(time::MissedTickBehavior::Delay);

let mut inflight = WorkerPool(tokio::task::JoinSet::new());
let mut inflight = WorkerPool::new(self.simulated_proof_time);

loop {
tokio::select! {
Expand Down Expand Up @@ -285,8 +315,8 @@ impl BatchProducer {
tracing::warn!(%batch_id, %err, "Batch job failed.");
mempool.batch_failed(batch_id);
},
Ok(Ok(batch_id)) => {
mempool.batch_proved(batch_id);
Ok(Ok((batch_id, batch))) => {
mempool.batch_proved(batch_id, batch);
}
}
}
Expand Down
17 changes: 9 additions & 8 deletions crates/block-producer/src/block_builder/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
use std::{collections::BTreeSet, sync::Arc};
use std::{
collections::{BTreeMap, BTreeSet},
sync::Arc,
};

use async_trait::async_trait;
use miden_node_utils::formatting::{format_array, format_blake3_digest};
Expand Down Expand Up @@ -146,12 +149,13 @@ where
}
}

struct BlockProducer {
struct BlockProducer<BB> {
pub mempool: Arc<Mutex<Mempool>>,
pub block_interval: tokio::time::Duration,
pub block_builder: BB,
}

impl BlockProducer {
impl<BB: BlockBuilder> BlockProducer<BB> {
pub async fn run(self) {
let mut interval = tokio::time::interval(self.block_interval);
interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
Expand All @@ -160,8 +164,9 @@ impl BlockProducer {
interval.tick().await;

let (block_number, batches) = self.mempool.lock().await.select_block();
let batches = batches.into_values().collect::<Vec<_>>();

let result = self.build_and_commit_block(batches).await;
let result = self.block_builder.build_block(&batches).await;
let mut mempool = self.mempool.lock().await;

match result {
Expand All @@ -170,8 +175,4 @@ impl BlockProducer {
}
}
}

async fn build_and_commit_block(&self, batches: BTreeSet<BatchJobId>) -> Result<(), ()> {
todo!("Aggregate, prove and commit block");
}
}
43 changes: 24 additions & 19 deletions crates/block-producer/src/errors.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
use std::collections::BTreeSet;

use miden_node_proto::errors::ConversionError;
use miden_node_utils::formatting::format_opt;
use miden_objects::{
Expand All @@ -11,7 +9,6 @@ use miden_objects::{
MAX_BATCHES_PER_BLOCK, MAX_INPUT_NOTES_PER_BATCH, MAX_OUTPUT_NOTES_PER_BATCH,
};
use miden_processor::ExecutionError;
use miden_tx::TransactionVerifierError;
use thiserror::Error;

use crate::mempool::BlockNumber;
Expand All @@ -32,6 +29,9 @@ pub enum VerifyTxError {
)]
UnauthenticatedNotesNotFound(Vec<NoteId>),

#[error("Output note IDs already used: {0:?}")]
OutputNotesAlreadyExist(Vec<NoteId>),

/// The account's initial hash did not match the current account's hash
#[error("Incorrect account's initial hash ({tx_initial_account_hash}, current: {})", format_opt(.current_account_hash.as_ref()))]
IncorrectAccountInitialHash {
Expand Down Expand Up @@ -61,31 +61,36 @@ pub enum VerifyTxError {
pub enum AddTransactionError {
#[error("Transaction verification failed: {0}")]
VerificationFailed(#[from] VerifyTxError),
}

#[derive(thiserror::Error, Debug, PartialEq)]
pub enum AddTransactionErrorRework {
#[error("Transaction's initial account state {expected} did not match the current account state {current}.")]
InvalidAccountState { current: Digest, expected: Digest },
#[error("Transaction input data is stale. Required data fresher than {stale_limit} but inputs are from {input_block}.")]
StaleInputs {
input_block: BlockNumber,
stale_limit: BlockNumber,
},
#[error("Authenticated note nullifier {0} not found.")]
AuthenticatedNoteNotFound(Nullifier),
#[error("Unauthenticated note {0} not found.")]
UnauthenticatedNoteNotFound(NoteId),
#[error("Note nullifiers already consumed: {0:?}")]
NotesAlreadyConsumed(BTreeSet<Nullifier>),
#[error(transparent)]
TxInputsError(#[from] TxInputsError),
#[error(transparent)]
ProofVerificationFailed(#[from] TransactionVerifierError),
#[error("Failed to deserialize transaction: {0}.")]

#[error("Deserialization failed: {0}")]
DeserializationError(String),
}

impl From<AddTransactionError> for tonic::Status {
fn from(value: AddTransactionError) -> Self {
use AddTransactionError::*;
match value {
VerificationFailed(VerifyTxError::InputNotesAlreadyConsumed(_))
| VerificationFailed(VerifyTxError::UnauthenticatedNotesNotFound(_))
| VerificationFailed(VerifyTxError::OutputNotesAlreadyExist(_))
| VerificationFailed(VerifyTxError::IncorrectAccountInitialHash { .. })
| VerificationFailed(VerifyTxError::InvalidTransactionProof(_))
| DeserializationError(_) => Self::invalid_argument(value.to_string()),

// Internal errors which should not be communicated to the user.
VerificationFailed(VerifyTxError::TransactionInputError(_))
| VerificationFailed(VerifyTxError::StoreConnectionFailed(_))
| StaleInputs { .. } => Self::internal("Internal error"),
}
}
}

// Batch building errors
// =================================================================================================

Expand Down
48 changes: 28 additions & 20 deletions crates/block-producer/src/mempool/batch_graph.rs
Mirko-von-Leipzig marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@ use miden_objects::transaction::TransactionId;
use miden_tx::utils::collections::KvMap;

use super::BatchJobId;
use crate::batch_builder::batch::TransactionBatch;

// BATCH GRAPH
// ================================================================================================

#[derive(Default, Clone)]
pub struct BatchGraph {
Expand Down Expand Up @@ -53,12 +57,12 @@ impl BatchGraph {

// New node might be a root.
//
// This could be optimised by inlining this inside the parent loop. This would prevent the
// This could be optimized by inlining this inside the parent loop. This would prevent the
// double iteration over parents, at the cost of some code duplication.
self.try_make_root(id);
}

/// Removes the batches and all their descendents from the graph.
/// Removes the batches and all their descendants from the graph.
///
/// Returns all removed batches and their transactions.
pub fn purge_subgraphs(
Expand All @@ -76,8 +80,8 @@ impl BatchGraph {
continue;
};

// All the child batches are also removed so no need to chec
// for new roots. No new roots are possible as a result of this subgraph removal.
// All the child batches are also removed so no need to check for new roots. No new
// roots are possible as a result of this subgraph removal.
self.roots.remove(&node_id);

for transaction in &node.transactions {
Expand All @@ -86,8 +90,8 @@ impl BatchGraph {

// Inform parent that this child no longer exists.
//
// The same is not required for children of this batch as we will
// be removing those as well.
// The same is not required for children of this batch as we will be removing those as
// well.
for parent in &node.parents {
// Parent could already be removed as part of this subgraph removal.
if let Some(parent) = self.nodes.get_mut(parent) {
Expand All @@ -102,7 +106,7 @@ impl BatchGraph {
removed
}

/// Removes a set of batches from the graph without removing any descendents.
/// Removes a set of batches from the graph without removing any descendants.
///
/// This is intended to cull completed batches from stale blocs.
pub fn remove_committed(&mut self, batches: BTreeSet<BatchJobId>) -> Vec<TransactionId> {
Expand All @@ -129,32 +133,36 @@ impl BatchGraph {
}

/// Mark a batch as proven if it exists.
pub fn mark_proven(&mut self, id: BatchJobId) {
// Its possible for inflight batches to have been removed as part
// of another batches failure.
pub fn mark_proven(&mut self, id: BatchJobId, batch: TransactionBatch) {
// Its possible for inflight batches to have been removed as part of another batches
// failure.
if let Some(node) = self.nodes.get_mut(&id) {
node.status = Status::Proven;
node.status = Status::Proven(batch);
self.try_make_root(id);
}
}

/// Returns at most `count` __indepedent__ batches which are ready for inclusion in a block.
pub fn select_block(&mut self, count: usize) -> BTreeSet<BatchJobId> {
let mut batches = BTreeSet::new();
pub fn select_block(&mut self, count: usize) -> BTreeMap<BatchJobId, TransactionBatch> {
let mut batches = BTreeMap::new();

// Track children so we can evaluate them for root afterwards.
let mut children = BTreeSet::new();

for batch in &self.roots {
let mut node = self.nodes.get_mut(batch).expect("Root node must be in graph");
for batch_id in &self.roots {
let mut node = self.nodes.get_mut(batch_id).expect("Root node must be in graph");

// Filter out batches which have dependencies in our selection so far.
if batches.union(&node.parents).next().is_some() {
if node.parents.iter().any(|parent| batches.contains_key(parent)) {
continue;
}

batches.insert(*batch);
node.status = Status::Proven;
let Status::Proven(batch) = node.status.clone() else {
unreachable!("Root batch must be in proven state.");
};

batches.insert(*batch_id, batch);
node.status = Status::InBlock;

if batches.len() == count {
break;
Expand Down Expand Up @@ -193,9 +201,9 @@ struct Node {
children: BTreeSet<BatchJobId>,
}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[derive(Debug, Clone, PartialEq, Eq)]
enum Status {
InFlight,
Proven,
Proven(TransactionBatch),
InBlock,
}
Loading
Loading