Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(block-producer): batch builder #515

Merged
merged 35 commits into from
Oct 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
596ba65
Make block inclusion proofs a map instead of a vector.
Mirko-von-Leipzig Oct 7, 2024
8c54b48
Typesafe note and transaction wrappers
Mirko-von-Leipzig Oct 7, 2024
1ec3e02
BatchId from type alias to new wrapper
Mirko-von-Leipzig Oct 8, 2024
79084a8
Implement batch builder.
Mirko-von-Leipzig Oct 8, 2024
d49e8a2
fmt
Mirko-von-Leipzig Oct 8, 2024
e477d09
Actually use the output notes smt
Mirko-von-Leipzig Oct 8, 2024
d8eeb0f
Make builder more type safe
Mirko-von-Leipzig Oct 8, 2024
5f2e418
Not so dangerous
Mirko-von-Leipzig Oct 8, 2024
02ff7ac
Separate variants for batch limits.
Mirko-von-Leipzig Oct 9, 2024
8205951
Replace TransactionBatch with ProvenBatch
Mirko-von-Leipzig Oct 9, 2024
92bc31a
Reuse existing batch type instead of new one.
Mirko-von-Leipzig Oct 10, 2024
e0a6024
Reuse existing block builder
Mirko-von-Leipzig Oct 10, 2024
190c6de
Track inflight output notes
Mirko-von-Leipzig Oct 10, 2024
bb79cb1
Fixup some abstractions
Mirko-von-Leipzig Oct 10, 2024
6ac6733
fixup comment
Mirko-von-Leipzig Oct 10, 2024
2057ad5
Shuffle
Mirko-von-Leipzig Oct 10, 2024
4c48c91
Refactor transaction batch tests
Mirko-von-Leipzig Oct 11, 2024
59fac0e
Simplify via subtraction.
Mirko-von-Leipzig Oct 14, 2024
2e46d51
Fixup inflight state
Mirko-von-Leipzig Oct 14, 2024
be98ab9
Revert miden-air usage
Mirko-von-Leipzig Oct 14, 2024
857b40d
Use block builder trait
Mirko-von-Leipzig Oct 14, 2024
bbaa5eb
tests: inflight state
Mirko-von-Leipzig Oct 14, 2024
293fd48
Move account status into separate module and simplify
Mirko-von-Leipzig Oct 15, 2024
d8e3497
fmt
Mirko-von-Leipzig Oct 15, 2024
771c9c7
Tests for inflight account state
Mirko-von-Leipzig Oct 16, 2024
8e7d839
Switch to a seedable random generator
Mirko-von-Leipzig Oct 17, 2024
220949f
Test transaction graph
Mirko-von-Leipzig Oct 17, 2024
db9d3ed
Revert note inclusion proof changes
Mirko-von-Leipzig Oct 17, 2024
ed0a44c
chore: fix typos, format comments, add section headers
bobbinth Oct 20, 2024
c62b876
Merge tx with inputs in a new type
Mirko-von-Leipzig Oct 21, 2024
fdee9f5
Address review comments #1
Mirko-von-Leipzig Oct 22, 2024
1f8dfa5
Move committed state into inflight state tracking
Mirko-von-Leipzig Oct 22, 2024
a0c85d8
Move transaction into domain module
Mirko-von-Leipzig Oct 22, 2024
92aa16b
fmt
Mirko-von-Leipzig Oct 22, 2024
a884d23
Fix test
Mirko-von-Leipzig Oct 22, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/block-producer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ miden-lib = { workspace = true, features = ["testing"] }
miden-node-test-macro = { path = "../test-macro" }
miden-objects = { workspace = true, features = ["testing"] }
miden-tx = { workspace = true, features = ["testing"] }
rand = { version = "0.8.5" }
rand_chacha = { version = "0.3", default-features = false }
tokio = { workspace = true, features = ["test-util"] }
winterfell = { version = "0.9" }
3 changes: 2 additions & 1 deletion crates/block-producer/src/batch_builder/batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ pub type BatchId = Blake3Digest<32>;
// TRANSACTION BATCH
// ================================================================================================

/// A batch of transactions that share a common proof.
/// A batch of transactions that share a common proof. For any given account, at most 1 transaction
/// in the batch must be addressing that account (issue: #186).
Mirko-von-Leipzig marked this conversation as resolved.
Show resolved Hide resolved
///
/// Note: Until recursive proofs are available in the Miden VM, we don't include the common proof.
#[derive(Debug, Clone, PartialEq, Eq)]
Expand Down
68 changes: 55 additions & 13 deletions crates/block-producer/src/batch_builder/mod.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,24 @@
use std::{cmp::min, collections::BTreeSet, num::NonZeroUsize, sync::Arc, time::Duration};
use std::{
cmp::min,
collections::{BTreeMap, BTreeSet},
num::NonZeroUsize,
sync::Arc,
time::Duration,
};

use miden_objects::{
accounts::AccountId,
notes::NoteId,
transaction::{OutputNote, TransactionId},
Digest,
};
use tokio::{sync::Mutex, time};
use tokio::{sync::Mutex, task::JoinSet, time};
use tonic::async_trait;
use tracing::{debug, info, instrument, Span};

use crate::{
block_builder::BlockBuilder,
domain::transaction::AuthenticatedTransaction,
mempool::{BatchJobId, Mempool},
ProvenTransaction, SharedRwVec, COMPONENT,
};
Expand Down Expand Up @@ -219,41 +228,74 @@ pub struct BatchProducer {
pub workers: NonZeroUsize,
pub mempool: Arc<Mutex<Mempool>>,
pub tx_per_batch: usize,
pub simulated_proof_time: Duration,
}

type BatchResult = Result<BatchJobId, (BatchJobId, BuildBatchError)>;
type BatchResult = Result<(BatchJobId, TransactionBatch), (BatchJobId, BuildBatchError)>;

/// Wrapper around tokio's JoinSet that remains pending if the set is empty,
/// instead of returning None.
struct WorkerPool(tokio::task::JoinSet<BatchResult>);
struct WorkerPool {
in_progress: JoinSet<BatchResult>,
simulated_proof_time: Duration,
}

impl WorkerPool {
fn new(simulated_proof_time: Duration) -> Self {
Self {
simulated_proof_time,
in_progress: JoinSet::new(),
}
}

async fn join_next(&mut self) -> Result<BatchResult, tokio::task::JoinError> {
if self.0.is_empty() {
if self.in_progress.is_empty() {
std::future::pending().await
} else {
// Cannot be None as its not empty.
self.0.join_next().await.unwrap()
self.in_progress.join_next().await.unwrap()
}
}

fn len(&self) -> usize {
self.0.len()
self.in_progress.len()
}

fn spawn(&mut self, id: BatchJobId, transactions: Vec<TransactionId>) {
self.0.spawn(async move {
todo!("Do actual work like aggregating transaction data");
fn spawn(&mut self, id: BatchJobId, transactions: Vec<AuthenticatedTransaction>) {
self.in_progress.spawn({
let simulated_proof_time = self.simulated_proof_time;
async move {
tracing::debug!("Begin proving batch.");

let transactions =
transactions.into_iter().map(AuthenticatedTransaction::into_raw).collect();

let batch = TransactionBatch::new(transactions, Default::default())
.map_err(|err| (id, err))?;

tokio::time::sleep(simulated_proof_time).await;
tracing::debug!("Batch proof completed.");
Mirko-von-Leipzig marked this conversation as resolved.
Show resolved Hide resolved

Ok((id, batch))
}
});
}
}

impl BatchProducer {
/// Starts the [BlockProducer], infinitely producing blocks at the configured interval.
///
Mirko-von-Leipzig marked this conversation as resolved.
Show resolved Hide resolved
/// Block production is sequential and consists of
///
/// 1. Pulling the next set of batches from the [Mempool]
/// 2. Compiling these batches into the next block
/// 3. Proving the block (this is not yet implemented)
/// 4. Committing the block to the store
pub async fn run(self) {
let mut interval = tokio::time::interval(self.batch_interval);
Mirko-von-Leipzig marked this conversation as resolved.
Show resolved Hide resolved
Mirko-von-Leipzig marked this conversation as resolved.
Show resolved Hide resolved
interval.set_missed_tick_behavior(time::MissedTickBehavior::Delay);

let mut inflight = WorkerPool(tokio::task::JoinSet::new());
let mut inflight = WorkerPool::new(self.simulated_proof_time);

loop {
tokio::select! {
Expand Down Expand Up @@ -285,8 +327,8 @@ impl BatchProducer {
tracing::warn!(%batch_id, %err, "Batch job failed.");
mempool.batch_failed(batch_id);
},
Ok(Ok(batch_id)) => {
mempool.batch_proved(batch_id);
Ok(Ok((batch_id, batch))) => {
mempool.batch_proved(batch_id, batch);
}
}
}
Expand Down
17 changes: 9 additions & 8 deletions crates/block-producer/src/block_builder/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
use std::{collections::BTreeSet, sync::Arc};
use std::{
collections::{BTreeMap, BTreeSet},
sync::Arc,
};

use async_trait::async_trait;
use miden_node_utils::formatting::{format_array, format_blake3_digest};
Expand Down Expand Up @@ -146,12 +149,13 @@ where
}
}

struct BlockProducer {
struct BlockProducer<BB> {
pub mempool: Arc<Mutex<Mempool>>,
pub block_interval: tokio::time::Duration,
pub block_builder: BB,
}

impl BlockProducer {
impl<BB: BlockBuilder> BlockProducer<BB> {
pub async fn run(self) {
let mut interval = tokio::time::interval(self.block_interval);
interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
Expand All @@ -160,8 +164,9 @@ impl BlockProducer {
interval.tick().await;

let (block_number, batches) = self.mempool.lock().await.select_block();
let batches = batches.into_values().collect::<Vec<_>>();

let result = self.build_and_commit_block(batches).await;
let result = self.block_builder.build_block(&batches).await;
let mut mempool = self.mempool.lock().await;

match result {
Expand All @@ -170,8 +175,4 @@ impl BlockProducer {
}
}
}

async fn build_and_commit_block(&self, batches: BTreeSet<BatchJobId>) -> Result<(), ()> {
todo!("Aggregate, prove and commit block");
}
}
1 change: 1 addition & 0 deletions crates/block-producer/src/domain/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
pub mod transaction;
151 changes: 151 additions & 0 deletions crates/block-producer/src/domain/transaction.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
use std::collections::BTreeSet;

use miden_objects::{
accounts::AccountId,
notes::{NoteId, Nullifier},
transaction::{ProvenTransaction, TransactionId, TxAccountUpdate},
Digest,
};

use crate::{errors::VerifyTxError, mempool::BlockNumber, store::TransactionInputs};

/// A transaction who's proof has been verified, and which has been authenticated against the store.
///
/// Authentication ensures that all nullifiers are unspent, and additionally authenticates some
/// previously unauthenticated input notes.
///
/// Note that this is of course only valid for the chain height of the authentication.
#[derive(Clone, PartialEq)]
pub struct AuthenticatedTransaction {
inner: ProvenTransaction,
/// The account state provided by the store [inputs](TransactionInputs).
///
/// This does not necessarily have to match the transaction's initial state
/// as this may still be modified by inflight transactions.
store_account_state: Option<Digest>,
/// Unauthenticates notes that have now been authenticated by the store
/// [inputs](TransactionInputs).
///
/// In other words, notes which were unauthenticated at the time the transaction was proven,
/// but which have since been committed to, and authenticated by the store.
notes_authenticated_by_store: BTreeSet<NoteId>,
/// Chain height that the authentication took place at.
authentication_height: BlockNumber,
}

impl AuthenticatedTransaction {
/// Verifies the transaction against the inputs, enforcing that all nullifiers are unspent.
///
/// __No__ proof verification is peformed. The caller takes responsibility for ensuring
/// that the proof is valid.
///
/// # Errors
///
/// Returns an error if any of the transaction's nullifiers are marked as spent by the inputs.
pub fn new(
tx: ProvenTransaction,
inputs: TransactionInputs,
) -> Result<AuthenticatedTransaction, VerifyTxError> {
let nullifiers_already_spent = tx
.get_nullifiers()
.filter(|nullifier| inputs.nullifiers.get(nullifier).cloned().flatten().is_some())
.collect::<Vec<_>>();
if !nullifiers_already_spent.is_empty() {
return Err(VerifyTxError::InputNotesAlreadyConsumed(nullifiers_already_spent));
}

// Invert the missing notes; i.e. we now know the rest were actually found.
let authenticated_notes = tx
.get_unauthenticated_notes()
.map(|header| header.id())
.filter(|note_id| !inputs.missing_unauthenticated_notes.contains(note_id))
.collect();

Ok(AuthenticatedTransaction {
inner: tx,
notes_authenticated_by_store: authenticated_notes,
authentication_height: BlockNumber::new(inputs.current_block_height),
store_account_state: inputs.account_hash,
})
}

pub fn id(&self) -> TransactionId {
self.inner.id()
}

pub fn account_id(&self) -> AccountId {
self.inner.account_id()
}

pub fn account_update(&self) -> &TxAccountUpdate {
self.inner.account_update()
}

pub fn store_account_state(&self) -> Option<Digest> {
self.store_account_state
}

pub fn authentication_height(&self) -> BlockNumber {
self.authentication_height
}

pub fn nullifiers(&self) -> impl Iterator<Item = Nullifier> + '_ {
self.inner.get_nullifiers()
}

pub fn output_notes(&self) -> impl Iterator<Item = NoteId> + '_ {
self.inner.output_notes().iter().map(|note| note.id())
}

/// Notes which were unauthenticate in the transaction __and__ which were
/// not authenticated by the store inputs.
pub fn unauthenticated_notes(&self) -> impl Iterator<Item = NoteId> + '_ {
self.inner
.get_unauthenticated_notes()
.cloned()
.map(|header| header.id())
.filter(|note_id| !self.notes_authenticated_by_store.contains(note_id))
}

pub fn into_raw(self) -> ProvenTransaction {
self.inner
}
}

#[cfg(test)]
impl AuthenticatedTransaction {
//! Builder methods intended for easier test setup.

/// Short-hand for `Self::new` where the input's are setup to match the transaction's initial
/// account state.
pub fn from_inner(inner: ProvenTransaction) -> Self {
let store_account_state = match inner.account_update().init_state_hash() {
zero if zero == Digest::default() => None,
non_zero => Some(non_zero),
};
Self {
inner,
store_account_state,
notes_authenticated_by_store: Default::default(),
authentication_height: Default::default(),
}
}

/// Overrides the authentication height with the given value.
pub fn with_authentication_height(mut self, height: u32) -> Self {
self.authentication_height = BlockNumber::new(height);
self
}

/// Overrides the store state with the given value.
pub fn with_store_state(mut self, state: Digest) -> Self {
self.store_account_state = Some(state);
self
}

/// Unsets the store state.
pub fn with_empty_store_state(mut self) -> Self {
self.store_account_state = None;
self
}
}
Loading
Loading