Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implemented future notes support in block-producer #390

Merged
merged 18 commits into from
Jul 1, 2024
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 47 additions & 27 deletions crates/block-producer/src/batch_builder/batch.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
use std::collections::BTreeSet;

use miden_objects::{
accounts::AccountId,
batches::BatchNoteTree,
block::BlockAccountUpdate,
crypto::hash::blake::{Blake3Digest, Blake3_256},
notes::Nullifier,
notes::{NoteId, Nullifier},
transaction::{OutputNote, TransactionId, TxAccountUpdate},
Digest, MAX_NOTES_PER_BATCH,
};
Expand All @@ -24,9 +26,10 @@ pub type BatchId = Blake3Digest<32>;
pub struct TransactionBatch {
id: BatchId,
updated_accounts: Vec<(TransactionId, TxAccountUpdate)>,
unauthenticated_input_notes: BTreeSet<NoteId>,
produced_nullifiers: Vec<Nullifier>,
created_notes_smt: BatchNoteTree,
created_notes: Vec<OutputNote>,
output_notes_smt: BatchNoteTree,
output_notes: Vec<OutputNote>,
}

impl TransactionBatch {
Expand All @@ -44,38 +47,50 @@ impl TransactionBatch {
pub fn new(txs: Vec<ProvenTransaction>) -> Result<Self, BuildBatchError> {
let id = Self::compute_id(&txs);

// TODO: we need to handle a possibility that a batch contains multiple transactions against
// the same account (e.g., transaction `x` takes account from state `A` to `B` and
// transaction `y` takes account from state `B` to `C`). These will need to be merged
// into a single "update" `A` to `C`.
let updated_accounts =
txs.iter().map(|tx| (tx.id(), tx.account_update().clone())).collect();
let mut updated_accounts = vec![];
let mut produced_nullifiers = vec![];
let mut unauthenticated_input_notes = BTreeSet::new();
for tx in &txs {
// TODO: we need to handle a possibility that a batch contains multiple transactions against
// the same account (e.g., transaction `x` takes account from state `A` to `B` and
// transaction `y` takes account from state `B` to `C`). These will need to be merged
// into a single "update" `A` to `C`.
updated_accounts.push((tx.id(), tx.account_update().clone()));

for note in tx.input_notes() {
produced_nullifiers.push(note.nullifier());
if let Some(note_id) = note.note_id() {
unauthenticated_input_notes.insert(note_id);
bobbinth marked this conversation as resolved.
Show resolved Hide resolved
}
}
}

let produced_nullifiers = txs
// Populate batch output notes, filtering out notes consumed in the same batch.
// Consumed notes are also removed from the unauthenticated input note set.
let output_notes: Vec<_> = txs
.iter()
.flat_map(|tx| tx.input_notes().iter())
.map(|note| note.nullifier())
.flat_map(|tx| tx.output_notes().iter())
.filter(|&note| !unauthenticated_input_notes.remove(&note.id()))
.cloned()
.collect();
bobbinth marked this conversation as resolved.
Show resolved Hide resolved

let created_notes: Vec<_> =
txs.iter().flat_map(|tx| tx.output_notes().iter()).cloned().collect();

if created_notes.len() > MAX_NOTES_PER_BATCH {
return Err(BuildBatchError::TooManyNotesCreated(created_notes.len(), txs));
if output_notes.len() > MAX_NOTES_PER_BATCH {
return Err(BuildBatchError::TooManyNotesCreated(output_notes.len(), txs));
}

// TODO: document under what circumstances SMT creating can fail
let created_notes_smt = BatchNoteTree::with_contiguous_leaves(
created_notes.iter().map(|note| (note.id(), note.metadata())),
let output_notes_smt = BatchNoteTree::with_contiguous_leaves(
output_notes.iter().map(|note| (note.id(), note.metadata())),
)
.map_err(|e| BuildBatchError::NotesSmtError(e, txs))?;

Ok(Self {
id,
updated_accounts,
unauthenticated_input_notes,
produced_nullifiers,
created_notes_smt,
created_notes,
output_notes_smt,
output_notes,
})
}

Expand Down Expand Up @@ -108,19 +123,24 @@ impl TransactionBatch {
})
}

/// Returns the set of unauthenticated input notes consumed by the transactions in this batch.
pub fn unauthenticated_input_notes(&self) -> &BTreeSet<NoteId> {
&self.unauthenticated_input_notes
}

/// Returns an iterator over produced nullifiers for all consumed notes.
pub fn produced_nullifiers(&self) -> impl Iterator<Item = Nullifier> + '_ {
self.produced_nullifiers.iter().cloned()
}

/// Returns the root hash of the created notes SMT.
pub fn created_notes_root(&self) -> Digest {
self.created_notes_smt.root()
/// Returns the root hash of the output notes SMT.
pub fn output_notes_root(&self) -> Digest {
self.output_notes_smt.root()
}

/// Returns created notes list.
pub fn created_notes(&self) -> &Vec<OutputNote> {
&self.created_notes
/// Returns output notes list.
pub fn output_notes(&self) -> &Vec<OutputNote> {
&self.output_notes
}

// HELPER FUNCTIONS
Expand Down
61 changes: 51 additions & 10 deletions crates/block-producer/src/batch_builder/mod.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
use std::{cmp::min, sync::Arc, time::Duration};
use std::{cmp::min, collections::BTreeSet, sync::Arc, time::Duration};

use async_trait::async_trait;
use tokio::{sync::RwLock, time};
use miden_objects::{
notes::NoteId,
transaction::{InputNoteCommitment, OutputNote},
};
use tokio::time;
use tracing::{debug, info, instrument, Span};

use crate::{block_builder::BlockBuilder, ProvenTransaction, SharedRwVec, COMPONENT};
Expand All @@ -13,7 +17,7 @@ pub mod batch;
pub use batch::TransactionBatch;
use miden_node_utils::formatting::{format_array, format_blake3_digest};

use crate::errors::BuildBatchError;
use crate::{errors::BuildBatchError, store::Store};

// BATCH BUILDER
// ================================================================================================
Expand Down Expand Up @@ -45,31 +49,35 @@ pub struct DefaultBatchBuilderOptions {
pub max_batches_per_block: usize,
}

pub struct DefaultBatchBuilder<BB> {
/// Batches ready to be included in a block
ready_batches: SharedRwVec<TransactionBatch>,
pub struct DefaultBatchBuilder<S, BB> {
store: Arc<S>,

block_builder: Arc<BB>,

options: DefaultBatchBuilderOptions,

/// Batches ready to be included in a block
ready_batches: SharedRwVec<TransactionBatch>,
}

// FIXME: remove the allow when the upstream clippy issue is fixed:
// https://github.com/rust-lang/rust-clippy/issues/12281
#[allow(clippy::blocks_in_conditions)]
impl<BB> DefaultBatchBuilder<BB>
impl<S, BB> DefaultBatchBuilder<S, BB>
where
S: Store,
BB: BlockBuilder,
{
// CONSTRUCTOR
// --------------------------------------------------------------------------------------------
/// Returns a new [BatchBuilder] instantiated with the provided [BlockBuilder] and the
/// specified options.
pub fn new(block_builder: Arc<BB>, options: DefaultBatchBuilderOptions) -> Self {
pub fn new(store: Arc<S>, block_builder: Arc<BB>, options: DefaultBatchBuilderOptions) -> Self {
Self {
ready_batches: Arc::new(RwLock::new(Vec::new())),
store,
block_builder,
options,
ready_batches: Default::default(),
}
}

Expand Down Expand Up @@ -112,14 +120,35 @@ where
},
}
}

async fn find_dangling_notes(&self, txs: &[ProvenTransaction]) -> Vec<NoteId> {
// TODO: We can optimize this by looking at the notes created in the previous batches
let mut note_created: BTreeSet<NoteId> = txs
.iter()
.flat_map(|tx| tx.output_notes().iter().map(OutputNote::id))
.chain(
self.ready_batches
.read()
.await
.iter()
.flat_map(|batch| batch.output_notes().iter().map(OutputNote::id)),
)
.collect();

txs.iter()
.flat_map(|tx| tx.input_notes().iter().filter_map(InputNoteCommitment::note_id))
.filter(|note_id| !note_created.remove(note_id))
.collect()
}
}

// FIXME: remove the allow when the upstream clippy issue is fixed:
// https://github.com/rust-lang/rust-clippy/issues/12281
#[allow(clippy::blocks_in_conditions)]
#[async_trait]
impl<BB> BatchBuilder for DefaultBatchBuilder<BB>
impl<S, BB> BatchBuilder for DefaultBatchBuilder<S, BB>
where
S: Store,
BB: BlockBuilder,
{
#[instrument(target = "miden-block-producer", skip_all, err, fields(batch_id))]
Expand All @@ -129,6 +158,18 @@ where
info!(target: COMPONENT, num_txs, "Building a transaction batch");
debug!(target: COMPONENT, txs = %format_array(txs.iter().map(|tx| tx.id().to_hex())));

let dangling_notes = self.find_dangling_notes(&txs).await;
if !dangling_notes.is_empty() {
let missing_notes = match self.store.get_missing_notes(&dangling_notes).await {
Ok(notes) => notes,
Err(err) => return Err(BuildBatchError::GetMissingNotesRequestError(err, txs)),
};

if !missing_notes.is_empty() {
return Err(BuildBatchError::UnauthenticatedNotesNotFound(missing_notes, txs));
}
bobbinth marked this conversation as resolved.
Show resolved Hide resolved
}

let batch = TransactionBatch::new(txs)?;

info!(target: COMPONENT, "Transaction batch built");
Expand Down
15 changes: 14 additions & 1 deletion crates/block-producer/src/batch_builder/tests/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,12 @@
use std::iter;

use tokio::sync::RwLock;

use super::*;
use crate::{errors::BuildBlockError, test_utils::MockProvenTxBuilder};
use crate::{
errors::BuildBlockError,
test_utils::{MockProvenTxBuilder, MockStoreSuccessBuilder},
};

// STRUCTS
// ================================================================================================
Expand Down Expand Up @@ -43,9 +50,11 @@ async fn test_block_size_doesnt_exceed_limit() {
let block_frequency = Duration::from_millis(20);
let max_batches_per_block = 2;

let store = Arc::new(MockStoreSuccessBuilder::from_accounts(iter::empty()).build());
let block_builder = Arc::new(BlockBuilderSuccess::default());

let batch_builder = Arc::new(DefaultBatchBuilder::new(
store,
block_builder.clone(),
DefaultBatchBuilderOptions { block_frequency, max_batches_per_block },
));
Expand Down Expand Up @@ -81,9 +90,11 @@ async fn test_build_block_called_when_no_batches() {
let block_frequency = Duration::from_millis(20);
let max_batches_per_block = 2;

let store = Arc::new(MockStoreSuccessBuilder::from_accounts(iter::empty()).build());
let block_builder = Arc::new(BlockBuilderSuccess::default());

let batch_builder = Arc::new(DefaultBatchBuilder::new(
store,
block_builder.clone(),
DefaultBatchBuilderOptions { block_frequency, max_batches_per_block },
));
Expand All @@ -106,9 +117,11 @@ async fn test_batches_added_back_to_queue_on_block_build_failure() {
let block_frequency = Duration::from_millis(20);
let max_batches_per_block = 2;

let store = Arc::new(MockStoreSuccessBuilder::from_accounts(iter::empty()).build());
let block_builder = Arc::new(BlockBuilderFailure);

let batch_builder = Arc::new(DefaultBatchBuilder::new(
store,
block_builder.clone(),
DefaultBatchBuilderOptions { block_frequency, max_batches_per_block },
));
Expand Down
27 changes: 20 additions & 7 deletions crates/block-producer/src/block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,11 @@ use miden_node_proto::{
};
use miden_objects::{
accounts::AccountId,
crypto::merkle::{MerklePath, MmrPeaks, SmtProof},
notes::Nullifier,
crypto::{
hash::rpo::RpoDigest,
merkle::{MerklePath, MmrPeaks, SmtProof},
},
notes::{NoteId, Nullifier},
BlockHeader, Digest,
};

Expand All @@ -31,6 +34,9 @@ pub struct BlockInputs {

/// The requested nullifiers and their authentication paths
pub nullifiers: BTreeMap<Nullifier, SmtProof>,

/// List of notes that were not found in the store
pub missing_notes: Vec<NoteId>,
bobbinth marked this conversation as resolved.
Show resolved Hide resolved
}

#[derive(Clone, Debug, Default)]
Expand All @@ -42,8 +48,8 @@ pub struct AccountWitness {
impl TryFrom<GetBlockInputsResponse> for BlockInputs {
type Error = BlockInputsError;

fn try_from(get_block_inputs: GetBlockInputsResponse) -> Result<Self, Self::Error> {
let block_header: BlockHeader = get_block_inputs
fn try_from(response: GetBlockInputsResponse) -> Result<Self, Self::Error> {
let block_header: BlockHeader = response
.block_header
.ok_or(GetBlockInputsResponse::missing_field(stringify!(block_header)))?
.try_into()?;
Expand All @@ -57,15 +63,15 @@ impl TryFrom<GetBlockInputsResponse> for BlockInputs {

MmrPeaks::new(
num_leaves,
get_block_inputs
response
.mmr_peaks
.into_iter()
.map(TryInto::try_into)
.collect::<Result<_, _>>()?,
)?
};

let accounts = get_block_inputs
let accounts = response
.account_states
.into_iter()
.map(|entry| {
Expand All @@ -78,7 +84,7 @@ impl TryFrom<GetBlockInputsResponse> for BlockInputs {
})
.collect::<Result<BTreeMap<_, _>, ConversionError>>()?;

let nullifiers = get_block_inputs
let nullifiers = response
.nullifiers
.into_iter()
.map(|entry| {
Expand All @@ -87,11 +93,18 @@ impl TryFrom<GetBlockInputsResponse> for BlockInputs {
})
.collect::<Result<BTreeMap<_, _>, ConversionError>>()?;

let missing_notes = response
.missing_notes
.into_iter()
.map(|digest| Ok(RpoDigest::try_from(digest)?.into()))
.collect::<Result<Vec<_>, ConversionError>>()?;

Ok(Self {
block_header,
chain_peaks,
accounts,
nullifiers,
missing_notes,
})
}
}
Loading