Reachability database optimization -- removing quadratic disk writes (kaspanet#347)

* reachability database refactor: working prototype

* refactor set access structs to remove duplicate logic

* use StoreError in iterators and avoid unwrap calls in low-level code

* unrelated: opt access delete all and add tests

* renames and comments

* use set access for DAG tips store

* implement delete_range properly

* minor simpa change

* minor

* review comments: remove get_height logic from append_child + additional test assert

* some struct doc comments
michaelsutton committed Dec 11, 2023
1 parent c525961 commit c750a4f
Showing 21 changed files with 662 additions and 221 deletions.
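
Note on the headline change: before this commit, a reachability entry's children were stored as one serialized `Vec<Hash>` value, so appending the n-th child rewrote all n existing entries, and n appends to one parent cost O(n^2) bytes of disk writes. The new set-access stores keep one DB row per element, so an append writes a single constant-size entry. A minimal standalone sketch of the two write patterns (a mock key-value map stands in for RocksDB; this is not the repository's actual store code):

    use std::collections::BTreeMap;

    // Mock KV store standing in for RocksDB: byte keys, byte values.
    type Kv = BTreeMap<Vec<u8>, Vec<u8>>;

    // Old pattern: the whole children list lives under one key, so every
    // append re-serializes and rewrites all previous entries; n appends to
    // one parent write O(n^2) bytes in total.
    fn append_child_monolithic(kv: &mut Kv, parent: u64, children: &mut Vec<u64>, child: u64) {
        children.push(child);
        let bytes: Vec<u8> = children.iter().flat_map(|c| c.to_le_bytes()).collect();
        kv.insert(parent.to_le_bytes().to_vec(), bytes);
    }

    // New pattern: one row per (parent, child) under the parent's key prefix,
    // so each append writes a single small entry; n appends write O(n) bytes.
    fn append_child_row(kv: &mut Kv, parent: u64, child: u64) {
        let mut key = parent.to_le_bytes().to_vec();
        key.extend_from_slice(&child.to_le_bytes());
        kv.insert(key, Vec::new()); // membership is encoded in the key itself
    }

    fn main() {
        let mut kv = Kv::new();
        let mut children = Vec::new();
        for c in 0..4 {
            append_child_monolithic(&mut kv, 1, &mut children, c); // rewrites the whole list each time
            append_child_row(&mut kv, 2, c); // writes one small row each time
        }
        assert_eq!(kv.len(), 1 + 4); // one monolithic row vs. four per-child rows
    }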
2 changes: 1 addition & 1 deletion consensus/src/consensus/factory.rs
@@ -54,7 +54,7 @@ pub struct MultiConsensusMetadata {
     version: u32,
 }
 
-const LATEST_DB_VERSION: u32 = 1;
+const LATEST_DB_VERSION: u32 = 2;
 impl Default for MultiConsensusMetadata {
     fn default() -> Self {
         Self {
6 changes: 3 additions & 3 deletions consensus/src/consensus/mod.rs
@@ -312,8 +312,8 @@ impl Consensus {
         (async { brx.await.unwrap() }, async { vrx.await.unwrap() })
     }
 
-    pub fn body_tips(&self) -> Arc<BlockHashSet> {
-        self.body_tips_store.read().get().unwrap()
+    pub fn body_tips(&self) -> BlockHashSet {
+        self.body_tips_store.read().get().unwrap().read().clone()
     }
 
     pub fn block_status(&self, hash: Hash) -> BlockStatus {
@@ -578,7 +578,7 @@ impl ConsensusApi for Consensus {
     }
 
     fn get_tips(&self) -> Vec<Hash> {
-        self.body_tips().iter().copied().collect_vec()
+        self.body_tips_store.read().get().unwrap().read().iter().copied().collect_vec()
     }
 
     fn get_pruning_point_utxos(
430 changes: 332 additions & 98 deletions consensus/src/model/stores/reachability.rs

Large diffs are not rendered by default.
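
Since this diff is not rendered, here is a plausible sketch of the store-surface change, reconstructed from the call sites further down in this commit (inquirer.rs, tree.rs, tests/mod.rs). These signatures are inferred, not copied from the file; `Hash` and `StoreError` are the kaspa types:

    // Inferred shape (not the rendered code) of the affected ReachabilityStore
    // methods after the refactor:
    trait ReachabilityStoreSketch {
        // append_child now only inserts the new (parent, child) row; it no longer
        // reads the parent's data, so it stops returning the parent height.
        fn append_child(&mut self, parent: Hash, child: Hash) -> Result<(), StoreError>;
        fn get_height(&self, hash: Hash) -> Result<u64, StoreError>;
        // replace_child additionally receives the replaced block's hash: with one
        // DB row per child, an index alone no longer identifies the row to delete.
        fn replace_child(&mut self, parent: Hash, replaced: Hash, replaced_index: usize, children: &[Hash]) -> Result<(), StoreError>;
    }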

65 changes: 32 additions & 33 deletions consensus/src/model/stores/tips.rs
@@ -1,31 +1,39 @@
 use std::sync::Arc;
 
 use kaspa_consensus_core::BlockHashSet;
+use kaspa_consensus_core::BlockHasher;
+use kaspa_database::prelude::CachedDbSetItem;
 use kaspa_database::prelude::DbWriter;
+use kaspa_database::prelude::ReadLock;
 use kaspa_database::prelude::StoreResult;
 use kaspa_database::prelude::StoreResultExtensions;
 use kaspa_database::prelude::DB;
-use kaspa_database::prelude::{BatchDbWriter, CachedDbItem, DirectDbWriter};
+use kaspa_database::prelude::{BatchDbWriter, DirectDbWriter};
 use kaspa_database::registry::DatabaseStorePrefixes;
 use kaspa_hashes::Hash;
 use rocksdb::WriteBatch;
 
 /// Reader API for `TipsStore`.
 pub trait TipsStoreReader {
-    fn get(&self) -> StoreResult<Arc<BlockHashSet>>;
+    fn get(&self) -> StoreResult<ReadLock<BlockHashSet>>;
 }
 
 pub trait TipsStore: TipsStoreReader {
-    fn add_tip(&mut self, new_tip: Hash, new_tip_parents: &[Hash]) -> StoreResult<Arc<BlockHashSet>>;
-    fn add_tip_batch(&mut self, batch: &mut WriteBatch, new_tip: Hash, new_tip_parents: &[Hash]) -> StoreResult<Arc<BlockHashSet>> {
+    fn add_tip(&mut self, new_tip: Hash, new_tip_parents: &[Hash]) -> StoreResult<ReadLock<BlockHashSet>>;
+    fn add_tip_batch(
+        &mut self,
+        batch: &mut WriteBatch,
+        new_tip: Hash,
+        new_tip_parents: &[Hash],
+    ) -> StoreResult<ReadLock<BlockHashSet>> {
         self.add_tip_with_writer(BatchDbWriter::new(batch), new_tip, new_tip_parents)
     }
     fn add_tip_with_writer(
         &mut self,
         writer: impl DbWriter,
         new_tip: Hash,
         new_tip_parents: &[Hash],
-    ) -> StoreResult<Arc<BlockHashSet>>;
+    ) -> StoreResult<ReadLock<BlockHashSet>>;
     fn prune_tips_batch(&mut self, batch: &mut WriteBatch, pruned_tips: &[Hash]) -> StoreResult<()> {
         self.prune_tips_with_writer(BatchDbWriter::new(batch), pruned_tips)
     }
@@ -36,12 +44,12 @@ pub trait TipsStore: TipsStoreReader {
 #[derive(Clone)]
 pub struct DbTipsStore {
     db: Arc<DB>,
-    access: CachedDbItem<Arc<BlockHashSet>>,
+    access: CachedDbSetItem<Hash, BlockHasher>,
 }
 
 impl DbTipsStore {
     pub fn new(db: Arc<DB>) -> Self {
-        Self { db: Arc::clone(&db), access: CachedDbItem::new(db, DatabaseStorePrefixes::Tips.into()) }
+        Self { db: Arc::clone(&db), access: CachedDbSetItem::new(db, DatabaseStorePrefixes::Tips.into()) }
     }
 
     pub fn clone_with_new_cache(&self) -> Self {
@@ -53,63 +61,54 @@ impl DbTipsStore {
     }
 
     pub fn init_batch(&mut self, batch: &mut WriteBatch, initial_tips: &[Hash]) -> StoreResult<()> {
-        self.access.write(BatchDbWriter::new(batch), &Arc::new(BlockHashSet::from_iter(initial_tips.iter().copied())))
-    }
-}
-
-/// Updates the internal data if possible
-fn update_tips(mut current_tips: Arc<BlockHashSet>, new_tip_parents: &[Hash], new_tip: Hash) -> Arc<BlockHashSet> {
-    let tips = Arc::make_mut(&mut current_tips);
-    for parent in new_tip_parents {
-        tips.remove(parent);
+        self.access.update(BatchDbWriter::new(batch), initial_tips, &[])?;
+        Ok(())
     }
-    tips.insert(new_tip);
-    current_tips
 }
 
 impl TipsStoreReader for DbTipsStore {
-    fn get(&self) -> StoreResult<Arc<BlockHashSet>> {
+    fn get(&self) -> StoreResult<ReadLock<BlockHashSet>> {
         self.access.read()
     }
 }
 
 impl TipsStore for DbTipsStore {
-    fn add_tip(&mut self, new_tip: Hash, new_tip_parents: &[Hash]) -> StoreResult<Arc<BlockHashSet>> {
-        self.access.update(DirectDbWriter::new(&self.db), |tips| update_tips(tips, new_tip_parents, new_tip))
+    fn add_tip(&mut self, new_tip: Hash, new_tip_parents: &[Hash]) -> StoreResult<ReadLock<BlockHashSet>> {
+        self.access.update(DirectDbWriter::new(&self.db), &[new_tip], new_tip_parents)
    }
 
     fn add_tip_with_writer(
         &mut self,
         writer: impl DbWriter,
         new_tip: Hash,
         new_tip_parents: &[Hash],
-    ) -> StoreResult<Arc<BlockHashSet>> {
-        self.access.update(writer, |tips| update_tips(tips, new_tip_parents, new_tip))
+    ) -> StoreResult<ReadLock<BlockHashSet>> {
+        // New tip parents are no longer tips and hence removed
+        self.access.update(writer, &[new_tip], new_tip_parents)
     }
 
     fn prune_tips_with_writer(&mut self, writer: impl DbWriter, pruned_tips: &[Hash]) -> StoreResult<()> {
         if pruned_tips.is_empty() {
             return Ok(());
         }
-        self.access.update(writer, |mut tips| {
-            let mut_tips = Arc::make_mut(&mut tips);
-            for pruned_tip in pruned_tips {
-                mut_tips.remove(pruned_tip);
-            }
-            tips
-        })?;
+        self.access.update(writer, &[], pruned_tips)?;
         Ok(())
     }
 }
 
 #[cfg(test)]
 mod tests {
     use super::*;
+    use kaspa_database::{create_temp_db, prelude::ConnBuilder};
 
     #[test]
     fn test_update_tips() {
-        let mut tips = Arc::new(BlockHashSet::from_iter([1.into(), 3.into(), 5.into()]));
-        tips = update_tips(tips, &[3.into(), 5.into()], 7.into());
-        assert_eq!(Arc::try_unwrap(tips).unwrap(), BlockHashSet::from_iter([1.into(), 7.into()]));
+        let (_lifetime, db) = create_temp_db!(ConnBuilder::default().with_files_limit(10));
+        let mut store = DbTipsStore::new(db.clone());
+        store.add_tip(1.into(), &[]).unwrap();
+        store.add_tip(3.into(), &[]).unwrap();
+        store.add_tip(5.into(), &[]).unwrap();
+        let tips = store.add_tip(7.into(), &[3.into(), 5.into()]).unwrap();
+        assert_eq!(tips.read().clone(), BlockHashSet::from_iter([1.into(), 7.into()]));
     }
 }
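
For reference, a short usage sketch of the batched path of this API, continuing the test setup above (fragment; `store` and `db` are assumed to be constructed as in the test):

    // Stage a tip update atomically: 7 becomes a tip, its parents 3 and 5 stop
    // being tips. With set semantics this stages one row insertion and two row
    // deletions instead of rewriting the entire tip set as a single value.
    let mut batch = WriteBatch::default();
    let tips = store.add_tip_batch(&mut batch, 7.into(), &[3.into(), 5.into()]).unwrap();
    db.write(batch).unwrap(); // nothing reaches the DB until the batch is committed
    assert!(tips.read().contains(&7.into()));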
3 changes: 1 addition & 2 deletions consensus/src/model/stores/utxo_set.rs
@@ -127,8 +127,7 @@ impl DbUtxoSetStore {
 
     /// Clear the store completely in DB and cache
     pub fn clear(&mut self) -> Result<(), StoreError> {
-        let writer = DirectDbWriter::new(&self.db);
-        self.access.delete_all(writer)
+        self.access.delete_all(DirectDbWriter::new(&self.db))
     }
 
     /// Write directly from an iterator and do not cache any data. NOTE: this action also clears the cache
3 changes: 3 additions & 0 deletions consensus/src/pipeline/pruning_processor/processor.rs
@@ -208,13 +208,15 @@ impl PruningProcessor {
     }
 
     fn assert_utxo_commitment(&self, pruning_point: Hash) {
+        info!("Verifying the new pruning point UTXO commitment (sanity test)");
         let commitment = self.headers_store.get_header(pruning_point).unwrap().utxo_commitment;
         let mut multiset = MuHash::new();
         let pruning_utxoset_read = self.pruning_utxoset_stores.read();
         for (outpoint, entry) in pruning_utxoset_read.utxo_set.iterator().map(|r| r.unwrap()) {
             multiset.add_utxo(&outpoint, &entry);
         }
         assert_eq!(multiset.finalize(), commitment, "Updated pruning point utxo set does not match the header utxo commitment");
+        info!("Pruning point UTXO commitment was verified correctly (sanity test)");
     }
 
     fn prune(&self, new_pruning_point: Hash) {
@@ -291,6 +293,7 @@ impl PruningProcessor {
             let pruned_tips = tips_write
                 .get()
                 .unwrap()
+                .read()
                 .iter()
                 .copied()
                 .filter(|&h| !reachability_read.is_dag_ancestor_of_result(new_pruning_point, h).unwrap())
1 change: 1 addition & 0 deletions consensus/src/pipeline/virtual_processor/processor.rs
@@ -265,6 +265,7 @@ impl VirtualStateProcessor {
             .read()
             .get()
             .unwrap()
+            .read()
             .iter()
             .copied()
             .filter(|&h| self.reachability_service.is_dag_ancestor_of(finality_point, h))
4 changes: 2 additions & 2 deletions consensus/src/processes/reachability/inquirer.rs
@@ -83,7 +83,7 @@ pub fn delete_block(store: &mut (impl ReachabilityStore + ?Sized), block: Hash,
         }
     };
 
-    store.replace_child(parent, block_index, &children)?;
+    store.replace_child(parent, block, block_index, &children)?;
 
     for child in children.iter().copied() {
         store.set_parent(child, parent)?;
@@ -94,7 +94,7 @@
             SearchOutput::NotFound(_) => return Err(ReachabilityError::DataInconsistency),
             SearchOutput::Found(hash, i) => {
                 debug_assert_eq!(hash, block);
-                store.replace_future_covering_item(merged_block, i, &children)?;
+                store.replace_future_covering_item(merged_block, block, i, &children)?;
             }
         }
    }
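
Why `replace_child` and `replace_future_covering_item` now take `block` as an extra argument: with one DB row per set element, the index (`block_index`/`i`) still locates the element in the in-memory ordered view, but deleting from disk requires the element's key. A rough, self-contained sketch of replace semantics under this layout (a mock key-value map; the real store code is in the unrendered reachability.rs diff, and `key_of` is a hypothetical key builder):

    use std::collections::BTreeMap;

    type Kv = BTreeMap<Vec<u8>, Vec<u8>>;

    // Hypothetical (prefix || element) key builder.
    fn key_of(bucket: &[u8], element: &[u8; 32]) -> Vec<u8> {
        let mut key = bucket.to_vec();
        key.extend_from_slice(element);
        key
    }

    // Assumed set-semantics replace: drop the replaced element's row, then
    // write one row per replacement child.
    fn replace_element(kv: &mut Kv, bucket: &[u8], replaced: &[u8; 32], children: &[[u8; 32]]) {
        kv.remove(&key_of(bucket, replaced)); // an index alone couldn't find this row; the hash can
        for child in children {
            kv.insert(key_of(bucket, child), Vec::new());
        }
    }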
2 changes: 1 addition & 1 deletion consensus/src/processes/reachability/interval.rs
@@ -1,7 +1,7 @@
 use serde::{Deserialize, Serialize};
 use std::fmt::{Display, Formatter};
 
-#[derive(Debug, PartialEq, Eq, Clone, Copy, Serialize, Deserialize)]
+#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone, Copy, Serialize, Deserialize)]
 pub struct Interval {
     pub(super) start: u64,
     pub(super) end: u64,
7 changes: 6 additions & 1 deletion consensus/src/processes/reachability/tests/mod.rs
@@ -41,7 +41,12 @@ impl<'a, T: ReachabilityStore + ?Sized> StoreBuilder<'a, T> {
     }
 
     pub fn add_block(&mut self, hash: Hash, parent: Hash) -> &mut Self {
-        let parent_height = if !parent.is_none() { self.store.append_child(parent, hash).unwrap() } else { 0 };
+        let parent_height = if !parent.is_none() {
+            self.store.append_child(parent, hash).unwrap();
+            self.store.get_height(parent).unwrap()
+        } else {
+            0
+        };
         self.store.insert(hash, parent, Interval::empty(), parent_height + 1).unwrap();
         self
     }
3 changes: 2 additions & 1 deletion consensus/src/processes/reachability/tree.rs
@@ -18,7 +18,8 @@ pub fn add_tree_block(
     // Get the remaining interval capacity
     let remaining = store.interval_remaining_after(parent)?;
     // Append the new child to `parent.children`
-    let parent_height = store.append_child(parent, new_block)?;
+    store.append_child(parent, new_block)?;
+    let parent_height = store.get_height(parent)?;
     if remaining.is_empty() {
         // Init with the empty interval.
         // Note: internal logic relies on interval being this specific interval
50 changes: 34 additions & 16 deletions database/src/access.rs
@@ -1,8 +1,7 @@
 use crate::{db::DB, errors::StoreError};
 
 use super::prelude::{Cache, DbKey, DbWriter};
-use itertools::Itertools;
-use rocksdb::{Direction, IteratorMode, ReadOptions};
+use rocksdb::{Direction, IterateBounds, IteratorMode, ReadOptions};
 use serde::{de::DeserializeOwned, Serialize};
 use std::{collections::hash_map::RandomState, error::Error, hash::BuildHasher, sync::Arc};
 
@@ -153,26 +152,15 @@ where
         Ok(())
     }
 
     /// Deletes all entries in the store using the underlying rocksdb `delete_range` operation
     pub fn delete_all(&self, mut writer: impl DbWriter) -> Result<(), StoreError>
     where
         TKey: Clone + AsRef<[u8]>,
     {
         self.cache.remove_all();
         //TODO: Consider using column families to make it faster
         let db_key = DbKey::prefix_only(&self.prefix);
-        let mut read_opts = ReadOptions::default();
-        read_opts.set_iterate_range(rocksdb::PrefixRange(db_key.as_ref()));
-        let keys = self
-            .db
-            .iterator_opt(IteratorMode::From(db_key.as_ref(), Direction::Forward), read_opts)
-            .map(|iter_result| match iter_result {
-                Ok((key, _)) => Ok::<_, rocksdb::Error>(key),
-                Err(e) => Err(e),
-            })
-            .collect_vec();
-        for key in keys {
-            writer.delete(key.unwrap())?;
-        }
+        let (from, to) = rocksdb::PrefixRange(db_key.as_ref()).into_bounds();
+        writer.delete_range(from.unwrap(), to.unwrap())?;
         Ok(())
     }
 
@@ -225,3 +213,33 @@
         &self.prefix
     }
 }
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::{
+        create_temp_db,
+        prelude::{BatchDbWriter, ConnBuilder, DirectDbWriter},
+    };
+    use kaspa_hashes::Hash;
+    use rocksdb::WriteBatch;
+
+    #[test]
+    fn test_delete_all() {
+        let (_lifetime, db) = create_temp_db!(ConnBuilder::default().with_files_limit(10));
+        let access = CachedDbAccess::<Hash, u64>::new(db.clone(), 2, vec![1, 2]);
+
+        access.write_many(DirectDbWriter::new(&db), &mut (0..16).map(|i| (i.into(), 2))).unwrap();
+        assert_eq!(16, access.iterator().count());
+        access.delete_all(DirectDbWriter::new(&db)).unwrap();
+        assert_eq!(0, access.iterator().count());
+
+        access.write_many(DirectDbWriter::new(&db), &mut (0..16).map(|i| (i.into(), 2))).unwrap();
+        assert_eq!(16, access.iterator().count());
+        let mut batch = WriteBatch::default();
+        access.delete_all(BatchDbWriter::new(&mut batch)).unwrap();
+        assert_eq!(16, access.iterator().count());
+        db.write(batch).unwrap();
+        assert_eq!(0, access.iterator().count());
+    }
+}
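
The new `delete_all` leans on RocksDB range deletion: `PrefixRange(prefix).into_bounds()` converts the prefix into `(from, to)` bounds, where `to` is conceptually the prefix's successor, so a single `delete_range` tombstone covers every prefixed key instead of issuing one delete per key. A small self-contained illustration of the bound computation (a conceptual sketch; rocksdb's internal handling may differ in detail):

    // Conceptual upper bound for a prefix range: the shortest key strictly
    // greater than every key sharing the prefix, obtained by incrementing the
    // last non-0xff byte and truncating. delete_range(prefix, bound) then
    // removes exactly the prefixed keys.
    fn prefix_upper_bound(prefix: &[u8]) -> Option<Vec<u8>> {
        let mut bound = prefix.to_vec();
        while let Some(last) = bound.pop() {
            if last < 0xff {
                bound.push(last + 1); // increment, dropping any trailing 0xff bytes
                return Some(bound);
            }
        }
        None // an all-0xff prefix has no finite upper bound
    }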
6 changes: 3 additions & 3 deletions database/src/cache.rs
@@ -47,12 +47,12 @@ impl<TKey: Clone + std::hash::Hash + Eq + Send + Sync, TData: Clone + Send + Syn
         }
     }
 
-    pub fn remove(&self, key: &TKey) {
+    pub fn remove(&self, key: &TKey) -> Option<TData> {
         if self.size == 0 {
-            return;
+            return None;
         }
         let mut write_guard = self.map.write();
-        write_guard.swap_remove(key);
+        write_guard.swap_remove(key)
     }
 
     pub fn remove_many(&self, key_iter: &mut impl Iterator<Item = TKey>) {
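
A note on the `remove` signature change: returning the evicted value lets callers observe what was dropped, presumably so the new set-access code can reuse a previously cached entry instead of re-reading it. Usage sketch (fragment, assuming the `Cache` type above):

    // Returns Some(data) if the key was cached; None if absent or if the
    // cache is zero-sized.
    if let Some(previous) = cache.remove(&key) {
        // e.g. fold `previous` into a staged update before rewriting
    }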