Skip to content

Commit 0df2de5

Browse files
Muhash parallel reduce -- optimize U3072 mul when LHS = one (kaspanet#581)
* semantic: add `from` ext methods * muhash from txs benchmark * optimization: in u3072 mul test if lhs is one * extract `parallelism_in_power_steps` * comment
1 parent c59a0d1 commit 0df2de5

File tree

9 files changed

+102
-28
lines changed

9 files changed

+102
-28
lines changed

consensus/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ kaspa-txscript-errors.workspace = true
5757
kaspa-addresses.workspace = true
5858

5959
[[bench]]
60-
name = "hash_benchmarks"
60+
name = "parallel_muhash"
6161
harness = false
6262

6363
[[bench]]

consensus/benches/check_scripts.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,9 @@ use kaspa_consensus_core::subnets::SubnetworkId;
99
use kaspa_consensus_core::tx::{MutableTransaction, Transaction, TransactionInput, TransactionOutpoint, UtxoEntry};
1010
use kaspa_txscript::caches::Cache;
1111
use kaspa_txscript::pay_to_address_script;
12+
use kaspa_utils::iter::parallelism_in_power_steps;
1213
use rand::{thread_rng, Rng};
1314
use secp256k1::Keypair;
14-
use std::thread::available_parallelism;
1515

1616
// You may need to add more detailed mocks depending on your actual code.
1717
fn mock_tx(inputs_count: usize, non_uniq_signatures: usize) -> (Transaction, Vec<UtxoEntry>) {
@@ -98,7 +98,7 @@ fn benchmark_check_scripts(c: &mut Criterion) {
9898
});
9999

100100
// Iterate powers of two up to available parallelism
101-
for i in (1..=(available_parallelism().unwrap().get() as f64).log2().ceil() as u32).map(|x| 2u32.pow(x) as usize) {
101+
for i in parallelism_in_power_steps() {
102102
if inputs_count >= i {
103103
group.bench_function(format!("rayon, custom thread pool, thread count {i}"), |b| {
104104
let tx = MutableTransaction::with_entries(tx.clone(), utxos.clone());

consensus/benches/hash_benchmarks.rs

Lines changed: 0 additions & 15 deletions
This file was deleted.

consensus/benches/parallel_muhash.rs

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
use criterion::{black_box, criterion_group, criterion_main, Criterion};
2+
use itertools::Itertools;
3+
use kaspa_consensus_core::{
4+
muhash::MuHashExtensions,
5+
subnets::SUBNETWORK_ID_NATIVE,
6+
tx::{ScriptPublicKey, SignableTransaction, Transaction, TransactionInput, TransactionOutpoint, TransactionOutput, UtxoEntry},
7+
};
8+
use kaspa_hashes::TransactionID;
9+
use kaspa_muhash::MuHash;
10+
use kaspa_utils::iter::parallelism_in_power_steps;
11+
use rayon::prelude::*;
12+
13+
fn generate_transaction(ins: usize, outs: usize, randomness: u64) -> SignableTransaction {
14+
let mut tx = Transaction::new(0, vec![], vec![], 0, SUBNETWORK_ID_NATIVE, 0, vec![]);
15+
let mut entries = vec![];
16+
for i in 0..ins {
17+
let mut hasher = TransactionID::new();
18+
hasher.write(i.to_le_bytes());
19+
hasher.write(randomness.to_le_bytes());
20+
let input = TransactionInput::new(TransactionOutpoint::new(hasher.finalize(), 0), vec![10; 66], 0, 1);
21+
let entry = UtxoEntry::new(22222222, ScriptPublicKey::from_vec(0, vec![99; 34]), 23456, false);
22+
tx.inputs.push(input);
23+
entries.push(entry);
24+
}
25+
for _ in 0..outs {
26+
let output = TransactionOutput::new(23456, ScriptPublicKey::from_vec(0, vec![101; 34]));
27+
tx.outputs.push(output);
28+
}
29+
tx.finalize();
30+
SignableTransaction::with_entries(tx, entries)
31+
}
32+
33+
pub fn parallel_muhash_benchmark(c: &mut Criterion) {
34+
let mut group = c.benchmark_group("muhash txs");
35+
let txs = (0..256).map(|i| generate_transaction(2, 2, i)).collect_vec();
36+
group.bench_function("seq", |b| {
37+
b.iter(|| {
38+
let mut mh = MuHash::new();
39+
for tx in txs.iter() {
40+
mh.add_transaction(&tx.as_verifiable(), 222);
41+
}
42+
black_box(mh)
43+
})
44+
});
45+
46+
for threads in parallelism_in_power_steps() {
47+
group.bench_function(format!("par {threads}"), |b| {
48+
let pool = rayon::ThreadPoolBuilder::new().num_threads(threads).build().unwrap();
49+
b.iter(|| {
50+
pool.install(|| {
51+
let mh =
52+
txs.par_iter().map(|tx| MuHash::from_transaction(&tx.as_verifiable(), 222)).reduce(MuHash::new, |mut a, b| {
53+
a.combine(&b);
54+
a
55+
});
56+
black_box(mh)
57+
})
58+
})
59+
});
60+
}
61+
62+
group.finish();
63+
}
64+
65+
criterion_group!(benches, parallel_muhash_benchmark);
66+
criterion_main!(benches);

consensus/core/src/muhash.rs

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@ use kaspa_muhash::MuHash;
88
pub trait MuHashExtensions {
99
fn add_transaction(&mut self, tx: &impl VerifiableTransaction, block_daa_score: u64);
1010
fn add_utxo(&mut self, outpoint: &TransactionOutpoint, entry: &UtxoEntry);
11+
fn from_transaction(tx: &impl VerifiableTransaction, block_daa_score: u64) -> Self;
12+
fn from_utxo(outpoint: &TransactionOutpoint, entry: &UtxoEntry) -> Self;
1113
}
1214

1315
impl MuHashExtensions for MuHash {
@@ -30,6 +32,18 @@ impl MuHashExtensions for MuHash {
3032
write_utxo(&mut writer, entry, outpoint);
3133
writer.finalize();
3234
}
35+
36+
/// Returns a fresh instance to which only `tx` has been added (see `add_transaction`).
fn from_transaction(tx: &impl VerifiableTransaction, block_daa_score: u64) -> Self {
    let mut multiset = Self::new();
    Self::add_transaction(&mut multiset, tx, block_daa_score);
    multiset
}
41+
42+
/// Returns a fresh instance to which only the given UTXO has been added (see `add_utxo`).
fn from_utxo(outpoint: &TransactionOutpoint, entry: &UtxoEntry) -> Self {
    let mut multiset = Self::new();
    Self::add_utxo(&mut multiset, outpoint, entry);
    multiset
}
3347
}
3448

3549
fn write_utxo(writer: &mut impl HasherBase, entry: &UtxoEntry, outpoint: &TransactionOutpoint) {

consensus/src/consensus/mod.rs

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -774,14 +774,8 @@ impl ConsensusApi for Consensus {
774774
pruning_utxoset_write.utxo_set.write_many(utxoset_chunk).unwrap();
775775

776776
// Parallelize processing
777-
let inner_multiset = utxoset_chunk
778-
.par_iter()
779-
.map(|(outpoint, entry)| {
780-
let mut inner_multiset = MuHash::new();
781-
inner_multiset.add_utxo(outpoint, entry);
782-
inner_multiset
783-
})
784-
.reduce(MuHash::new, |mut a, b| {
777+
let inner_multiset =
778+
utxoset_chunk.par_iter().map(|(outpoint, entry)| MuHash::from_utxo(outpoint, entry)).reduce(MuHash::new, |mut a, b| {
785779
a.combine(&b);
786780
a
787781
});

consensus/src/pipeline/virtual_processor/utxo_validation.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -248,8 +248,7 @@ impl VirtualStateProcessor {
248248
.enumerate()
249249
.skip(1) // Skip the coinbase tx.
250250
.filter_map(|(i, tx)| self.validate_transaction_in_utxo_context(tx, &utxo_view, pov_daa_score, flags).ok().map(|vtx| {
251-
let mut mh = MuHash::new();
252-
mh.add_transaction(&vtx, pov_daa_score);
251+
let mh = MuHash::from_transaction(&vtx, pov_daa_score);
253252
(smallvec![(vtx, i as u32)], mh)
254253
}
255254
))

crypto/muhash/src/u3072.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,16 @@ impl U3072 {
8888
}
8989

9090
fn mul(&mut self, other: &U3072) {
91+
/*
92+
Optimization: short-circuit when LHS is one
93+
- This case is especially frequent during parallel reduce operation where the identity (one) is used for each sub-computation (at the LHS)
94+
- If self ≠ one, the comparison should exit early; if they are equal, skipping the multiplication gains much more than the comparison costs
95+
- Benchmarks show that general performance remains the same while parallel reduction gains ~35%
96+
*/
97+
if *self == Self::one() {
98+
*self = *other;
99+
return;
100+
}
91101
let (mut carry_low, mut carry_high, mut carry_highest) = (0, 0, 0);
92102
let mut tmp = Self::one();
93103

utils/src/iter.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,3 +48,9 @@ where
4848
self.inner.clone().fmt(f)
4949
}
5050
}
51+
52+
/// Returns an iterator over the powers of two from `2` up to the available parallelism rounded
/// up to the next power of two: `2, 4, 8, ..., 2^(available_parallelism.log2().ceil())`,
/// i.e., for `std::thread::available_parallelism = 15` the function will return `2, 4, 8, 16`.
///
/// The value `1` is never yielded, so with an available parallelism of `1` the iterator is empty.
/// If the available parallelism cannot be determined, a parallelism of `1` is assumed (yielding
/// an empty iterator) instead of panicking.
pub fn parallelism_in_power_steps() -> impl Iterator<Item = usize> {
    let available = std::thread::available_parallelism().map_or(1, |n| n.get());
    // Exact integer `ceil(log2(available))`: the exponent of the next power of two.
    // Avoids relying on floating point `log2` being exact for powers of two.
    let max_exponent = available.next_power_of_two().trailing_zeros();
    (1..=max_exponent).map(|exponent| 1usize << exponent)
}

0 commit comments

Comments
 (0)