Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implements parallel leaf hashing in process_sorted_pairs_to_leaves #2

Open
wants to merge 5 commits into
base: krushimir/subtree_mutations
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

- Generate a reverse mutations set when applying a mutations set; implemented serialization of `MutationsSet` (#355).
- Added parallel implementation of `Smt::compute_mutations` with better performance (#365).
- Implemented parallel leaf hashing in `SparseMerkleTree::process_sorted_pairs_to_leaves`.

## 0.13.0 (2024-11-24)

Expand Down
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ name = "store"
harness = false

[features]
concurrent = ["dep:rayon"]
concurrent = ["dep:rayon", "hashbrown?/rayon"]
default = ["std", "concurrent"]
executable = ["dep:clap", "dep:rand-utils", "std"]
smt_hashmaps = ["dep:hashbrown"]
Expand Down
32 changes: 26 additions & 6 deletions src/merkle/smt/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ pub(crate) trait SparseMerkleTree<const DEPTH: u8> {
/// The type for a value
type Value: Clone + PartialEq;
/// The type for a leaf
type Leaf: Clone;
type Leaf: Clone + Send + Sync;
/// The type for an opening (i.e. a "proof") of a leaf
type Opening;

Expand All @@ -81,6 +81,8 @@ pub(crate) trait SparseMerkleTree<const DEPTH: u8> {
fn with_entries_par(entries: Vec<(Self::Key, Self::Value)>) -> Result<Self, MerkleError>
where
Self: Sized,
Self::Key: Send + Sync,
Self::Value: Send + Sync,
{
let (inner_nodes, leaves) = Self::build_subtrees(entries);
let root = inner_nodes.get(&NodeIndex::root()).unwrap().hash();
Expand Down Expand Up @@ -535,6 +537,7 @@ pub(crate) trait SparseMerkleTree<const DEPTH: u8> {

/// Computes leaves from a set of key-value pairs and current leaf values.
/// Derived from `sorted_pairs_to_leaves`.
#[cfg(feature = "concurrent")]
fn sorted_pairs_to_mutated_subtree_leaves(
&self,
pairs: Vec<(Self::Key, Self::Value)>,
Expand Down Expand Up @@ -680,6 +683,7 @@ pub(crate) trait SparseMerkleTree<const DEPTH: u8> {
/// # Panics
/// With debug assertions on, this function panics if it detects that `pairs` is not correctly
/// sorted. Without debug assertions, the returned computations will be incorrect.
#[cfg(feature = "concurrent")]
fn sorted_pairs_to_leaves(
pairs: Vec<(Self::Key, Self::Value)>,
) -> PairComputations<u64, Self::Leaf> {
Expand Down Expand Up @@ -709,17 +713,18 @@ pub(crate) trait SparseMerkleTree<const DEPTH: u8> {
///
/// # Panics
/// This function will panic in debug mode if the input `pairs` are not sorted by column index.
#[cfg(feature = "concurrent")]
fn process_sorted_pairs_to_leaves<F>(
pairs: Vec<(Self::Key, Self::Value)>,
mut process_leaf: F,
) -> PairComputations<u64, Self::Leaf>
where
F: FnMut(Vec<(Self::Key, Self::Value)>) -> Self::Leaf,
{
use rayon::prelude::*;
debug_assert!(pairs.is_sorted_by_key(|(key, _)| Self::key_to_leaf_index(key).value()));

let mut accumulator: PairComputations<u64, Self::Leaf> = Default::default();
let mut accumulated_leaves: Vec<SubtreeLeaf> = Vec::with_capacity(pairs.len() / 2);

// As we iterate, we'll keep track of the kv-pairs we've seen so far that correspond to a
// single leaf. When we see a pair that's in a different leaf, we'll swap these pairs
Expand Down Expand Up @@ -748,14 +753,23 @@ pub(crate) trait SparseMerkleTree<const DEPTH: u8> {
// it's time to swap out our buffer.
let leaf_pairs = mem::take(&mut current_leaf_buffer);
let leaf = process_leaf(leaf_pairs);
let hash = Self::hash_leaf(&leaf);

accumulator.nodes.insert(col, leaf);
accumulated_leaves.push(SubtreeLeaf { col, hash });

debug_assert!(current_leaf_buffer.is_empty());
}

// Compute the leaves from the nodes concurrently
let mut accumulated_leaves: Vec<SubtreeLeaf> = accumulator
.nodes
.clone()
.into_par_iter()
.map(|(col, leaf)| SubtreeLeaf { col, hash: Self::hash_leaf(&leaf) })
.collect();

// Sort the leaves by column
accumulated_leaves.par_sort_by_key(|leaf| leaf.col);

// TODO: determine if there is any notable performance difference between computing
// subtree boundaries after the fact as an iterator adapter (like this), versus computing
// subtree boundaries as we go. Either way this function is only used at the beginning of a
Expand All @@ -770,8 +784,14 @@ pub(crate) trait SparseMerkleTree<const DEPTH: u8> {
#[cfg(feature = "concurrent")]
fn build_subtrees(
mut entries: Vec<(Self::Key, Self::Value)>,
) -> (InnerNodes, Leaves<Self::Leaf>) {
entries.sort_by_key(|item| {
) -> (InnerNodes, Leaves<Self::Leaf>)
where
Self::Key: Send + Sync,
Self::Value: Send + Sync,
{
use rayon::prelude::*;

entries.par_sort_by_key(|item| {
let index = Self::key_to_leaf_index(&item.0);
index.value()
});
Expand Down
5 changes: 5 additions & 0 deletions src/merkle/smt/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ fn smtleaf_to_subtree_leaf(leaf: &SmtLeaf) -> SubtreeLeaf {
}

#[test]
#[cfg(feature = "concurrent")]
fn test_sorted_pairs_to_leaves() {
let entries: Vec<(RpoDigest, Word)> = vec![
// Subtree 0.
Expand Down Expand Up @@ -143,6 +144,7 @@ fn generate_updates(entries: Vec<(RpoDigest, Word)>, updates: usize) -> Vec<(Rpo
}

#[test]
#[cfg(feature = "concurrent")]
fn test_single_subtree() {
// A single subtree's worth of leaves.
const PAIR_COUNT: u64 = COLS_PER_SUBTREE;
Expand Down Expand Up @@ -182,6 +184,7 @@ fn test_single_subtree() {
// subtree into computing another. In other words, test that `build_subtree()` is correctly
// composable.
#[test]
#[cfg(feature = "concurrent")]
fn test_two_subtrees() {
// Two subtrees' worth of leaves.
const PAIR_COUNT: u64 = COLS_PER_SUBTREE * 2;
Expand Down Expand Up @@ -239,6 +242,7 @@ fn test_two_subtrees() {
}

#[test]
#[cfg(feature = "concurrent")]
fn test_singlethreaded_subtrees() {
const PAIR_COUNT: u64 = COLS_PER_SUBTREE * 64;

Expand Down Expand Up @@ -450,6 +454,7 @@ fn test_with_entries_parallel() {
}

#[test]
#[cfg(feature = "concurrent")]
fn test_singlethreaded_subtree_mutations() {
const PAIR_COUNT: u64 = COLS_PER_SUBTREE * 64;

Expand Down
Loading