From c750a4fdb6885a6ebb6f129dde2b1a1f853784cd Mon Sep 17 00:00:00 2001 From: Michael Sutton Date: Mon, 11 Dec 2023 18:06:09 +0200 Subject: [PATCH] Reachability database optimization -- removing quadratic disk writes (#347) * reachability database refactor: working prototype * refactor set access structs to remove duplicate logic * use StoreError in iterators and avoid unwrap calls in low-level code * unrelated: opt access delete all and add tests * renames and comments * use set access for DAG tips store * implement delete_range properly * minor simpa change * minor * review comments: remove get_height logic from append_child + additional test assert * some struct doc comments --- consensus/src/consensus/factory.rs | 2 +- consensus/src/consensus/mod.rs | 6 +- consensus/src/model/stores/reachability.rs | 430 ++++++++++++++---- consensus/src/model/stores/tips.rs | 65 ++- consensus/src/model/stores/utxo_set.rs | 3 +- .../pipeline/pruning_processor/processor.rs | 3 + .../pipeline/virtual_processor/processor.rs | 1 + .../src/processes/reachability/inquirer.rs | 4 +- .../src/processes/reachability/interval.rs | 2 +- .../src/processes/reachability/tests/mod.rs | 7 +- consensus/src/processes/reachability/tree.rs | 3 +- database/src/access.rs | 50 +- database/src/cache.rs | 6 +- database/src/item.rs | 83 +++- database/src/key.rs | 8 +- database/src/lib.rs | 4 +- database/src/registry.rs | 4 + database/src/set_access.rs | 152 +++++-- database/src/writer.rs | 35 ++ indexes/utxoindex/src/stores/indexed_utxos.rs | 3 +- simpa/src/main.rs | 12 +- 21 files changed, 662 insertions(+), 221 deletions(-) diff --git a/consensus/src/consensus/factory.rs b/consensus/src/consensus/factory.rs index 74017e185..7fb9f6214 100644 --- a/consensus/src/consensus/factory.rs +++ b/consensus/src/consensus/factory.rs @@ -54,7 +54,7 @@ pub struct MultiConsensusMetadata { version: u32, } -const LATEST_DB_VERSION: u32 = 1; +const LATEST_DB_VERSION: u32 = 2; impl Default for MultiConsensusMetadata { fn default() -> Self { Self { diff --git a/consensus/src/consensus/mod.rs b/consensus/src/consensus/mod.rs index cd689266b..d96dfcaad 100644 --- a/consensus/src/consensus/mod.rs +++ b/consensus/src/consensus/mod.rs @@ -312,8 +312,8 @@ impl Consensus { (async { brx.await.unwrap() }, async { vrx.await.unwrap() }) } - pub fn body_tips(&self) -> Arc { - self.body_tips_store.read().get().unwrap() + pub fn body_tips(&self) -> BlockHashSet { + self.body_tips_store.read().get().unwrap().read().clone() } pub fn block_status(&self, hash: Hash) -> BlockStatus { @@ -578,7 +578,7 @@ impl ConsensusApi for Consensus { } fn get_tips(&self) -> Vec { - self.body_tips().iter().copied().collect_vec() + self.body_tips_store.read().get().unwrap().read().iter().copied().collect_vec() } fn get_pruning_point_utxos( diff --git a/consensus/src/model/stores/reachability.rs b/consensus/src/model/stores/reachability.rs index 853611526..f423a439f 100644 --- a/consensus/src/model/stores/reachability.rs +++ b/consensus/src/model/stores/reachability.rs @@ -4,7 +4,7 @@ use kaspa_consensus_core::{ BlockHashMap, BlockHashSet, BlockHasher, BlockLevel, HashMapCustomHasher, }; use kaspa_database::{ - prelude::{BatchDbWriter, CachedDbAccess, CachedDbItem, DbKey, DirectDbWriter, StoreError, DB}, + prelude::{BatchDbWriter, Cache, CachedDbAccess, CachedDbItem, DbKey, DbSetAccess, DbWriter, DirectDbWriter, StoreError, DB}, registry::{DatabaseStorePrefixes, SEPARATOR}, }; use kaspa_hashes::Hash; @@ -13,20 +13,22 @@ use itertools::Itertools; use parking_lot::{RwLockUpgradableReadGuard, RwLockWriteGuard}; use rocksdb::WriteBatch; use serde::{Deserialize, Serialize}; -use std::{collections::hash_map::Entry::Vacant, iter::once, sync::Arc}; +use std::{ + collections::hash_map::Entry::{Occupied, Vacant}, + iter::once, + sync::Arc, +}; #[derive(Clone, Serialize, Deserialize)] -pub struct ReachabilityData { - pub children: BlockHashes, +pub(crate) struct ReachabilityData { pub parent: Hash, pub interval: Interval, pub height: u64, - pub future_covering_set: BlockHashes, } impl ReachabilityData { pub fn new(parent: Hash, interval: Interval, height: u64) -> Self { - Self { children: Arc::new(vec![]), parent, interval, height, future_covering_set: Arc::new(vec![]) } + Self { parent, interval, height } } } @@ -49,22 +51,125 @@ pub trait ReachabilityStore: ReachabilityStoreReader { fn init(&mut self, origin: Hash, capacity: Interval) -> Result<(), StoreError>; fn insert(&mut self, hash: Hash, parent: Hash, interval: Interval, height: u64) -> Result<(), StoreError>; fn set_interval(&mut self, hash: Hash, interval: Interval) -> Result<(), StoreError>; - fn append_child(&mut self, hash: Hash, child: Hash) -> Result; + fn append_child(&mut self, hash: Hash, child: Hash) -> Result<(), StoreError>; fn insert_future_covering_item(&mut self, hash: Hash, fci: Hash, insertion_index: usize) -> Result<(), StoreError>; fn set_parent(&mut self, hash: Hash, new_parent: Hash) -> Result<(), StoreError>; - fn replace_child(&mut self, hash: Hash, replaced_index: usize, replace_with: &[Hash]) -> Result<(), StoreError>; - fn replace_future_covering_item(&mut self, hash: Hash, replaced_index: usize, replace_with: &[Hash]) -> Result<(), StoreError>; + fn replace_child( + &mut self, + hash: Hash, + replaced_hash: Hash, + replaced_index: usize, + replace_with: &[Hash], + ) -> Result<(), StoreError>; + fn replace_future_covering_item( + &mut self, + hash: Hash, + replaced_hash: Hash, + replaced_index: usize, + replace_with: &[Hash], + ) -> Result<(), StoreError>; fn delete(&mut self, hash: Hash) -> Result<(), StoreError>; fn get_height(&self, hash: Hash) -> Result; fn set_reindex_root(&mut self, root: Hash) -> Result<(), StoreError>; fn get_reindex_root(&self) -> Result; } +/// DB cached ordered `Set` access (manages a set per entry with cache and ordering). +/// Used both for the tree children set and for the future covering set (per block) +#[derive(Clone)] +struct DbReachabilitySet { + access: DbSetAccess, + cache: Cache, +} + +impl DbReachabilitySet { + fn new(set_access: DbSetAccess, set_cache: Cache) -> Self { + Self { access: set_access, cache: set_cache } + } + + fn append(&mut self, writer: impl DbWriter, hash: Hash, element: Hash) -> Result<(), StoreError> { + if let Some(mut entry) = self.cache.get(&hash) { + Arc::make_mut(&mut entry).push(element); + self.cache.insert(hash, entry); + } + self.access.write(writer, hash, element)?; + Ok(()) + } + + fn insert(&mut self, writer: impl DbWriter, hash: Hash, element: Hash, insertion_index: usize) -> Result<(), StoreError> { + if let Some(mut entry) = self.cache.get(&hash) { + Arc::make_mut(&mut entry).insert(insertion_index, element); + self.cache.insert(hash, entry); + } + self.access.write(writer, hash, element)?; + Ok(()) + } + + fn replace( + &mut self, + mut writer: impl DbWriter, + hash: Hash, + replaced_hash: Hash, + replaced_index: usize, + replace_with: &[Hash], + ) -> Result<(), StoreError> { + if let Some(mut entry) = self.cache.get(&hash) { + { + let removed_elements = + Arc::make_mut(&mut entry).splice(replaced_index..replaced_index + 1, replace_with.iter().copied()); + debug_assert_eq!(replaced_hash, removed_elements.exactly_one().unwrap()); + } + self.cache.insert(hash, entry); + } + self.access.delete(&mut writer, hash, replaced_hash)?; + for added_element in replace_with.iter().copied() { + self.access.write(&mut writer, hash, added_element)?; + } + Ok(()) + } + + fn commit_staging_entry(&mut self, mut writer: impl DbWriter, hash: Hash, entry: StagingSetEntry) -> Result<(), StoreError> { + self.cache.insert(hash, entry.set); + for removed_element in entry.deletions { + self.access.delete(&mut writer, hash, removed_element)?; + } + for added_element in entry.additions { + self.access.write(&mut writer, hash, added_element)?; + } + Ok(()) + } + + fn delete(&mut self, writer: impl DbWriter, hash: Hash) -> Result<(), StoreError> { + self.cache.remove(&hash); + self.access.delete_bucket(writer, hash) + } + + fn read(&self, hash: Hash, f: F) -> Result + where + F: FnMut(&Hash) -> K, + K: Ord, + { + if let Some(entry) = self.cache.get(&hash) { + return Ok(entry); + } + + let mut set: Vec = self.access.bucket_iterator(hash).collect::>()?; + // Apply the ordering rule before caching + set.sort_by_cached_key(f); + let set = BlockHashes::new(set); + self.cache.insert(hash, set.clone()); + + Ok(set) + } +} + /// A DB + cache implementation of `ReachabilityStore` trait, with concurrent readers support. #[derive(Clone)] pub struct DbReachabilityStore { db: Arc, - access: CachedDbAccess, BlockHasher>, + access: CachedDbAccess, // Main access + children_access: DbReachabilitySet, // Tree children + fcs_access: DbReachabilitySet, // Future Covering Set reindex_root: CachedDbItem, prefix_end: u8, } @@ -81,10 +186,15 @@ impl DbReachabilityStore { fn with_prefix_end(db: Arc, cache_size: u64, prefix_end: u8) -> Self { let store_prefix = DatabaseStorePrefixes::Reachability.into_iter().chain(once(prefix_end)).collect_vec(); + let children_prefix = DatabaseStorePrefixes::ReachabilityTreeChildren.into_iter().chain(once(prefix_end)).collect_vec(); + let fcs_prefix = DatabaseStorePrefixes::ReachabilityFutureCoveringSet.into_iter().chain(once(prefix_end)).collect_vec(); let reindex_root_prefix = DatabaseStorePrefixes::ReachabilityReindexRoot.into_iter().chain(once(prefix_end)).collect_vec(); + let access = CachedDbAccess::new(db.clone(), cache_size, store_prefix); Self { - db: Arc::clone(&db), - access: CachedDbAccess::new(Arc::clone(&db), cache_size, store_prefix), + db: db.clone(), + access, + children_access: DbReachabilitySet::new(DbSetAccess::new(db.clone(), children_prefix), Cache::new(cache_size)), + fcs_access: DbReachabilitySet::new(DbSetAccess::new(db.clone(), fcs_prefix), Cache::new(cache_size)), reindex_root: CachedDbItem::new(db, reindex_root_prefix), prefix_end, } @@ -97,9 +207,9 @@ impl DbReachabilityStore { impl ReachabilityStore for DbReachabilityStore { fn init(&mut self, origin: Hash, capacity: Interval) -> Result<(), StoreError> { - debug_assert!(!self.access.has(origin)?); + assert!(!self.access.has(origin)?); - let data = Arc::new(ReachabilityData::new(blockhash::NONE, capacity, 0)); + let data = ReachabilityData::new(blockhash::NONE, capacity, 0); let mut batch = WriteBatch::default(); self.access.write(BatchDbWriter::new(&mut batch), origin, data)?; self.reindex_root.write(BatchDbWriter::new(&mut batch), &origin)?; @@ -112,56 +222,51 @@ impl ReachabilityStore for DbReachabilityStore { if self.access.has(hash)? { return Err(StoreError::HashAlreadyExists(hash)); } - let data = Arc::new(ReachabilityData::new(parent, interval, height)); + let data = ReachabilityData::new(parent, interval, height); self.access.write(DirectDbWriter::new(&self.db), hash, data)?; Ok(()) } fn set_interval(&mut self, hash: Hash, interval: Interval) -> Result<(), StoreError> { let mut data = self.access.read(hash)?; - Arc::make_mut(&mut data).interval = interval; + data.interval = interval; self.access.write(DirectDbWriter::new(&self.db), hash, data)?; Ok(()) } - fn append_child(&mut self, hash: Hash, child: Hash) -> Result { - let mut data = self.access.read(hash)?; - let height = data.height; - let mut_data = Arc::make_mut(&mut data); - Arc::make_mut(&mut mut_data.children).push(child); - self.access.write(DirectDbWriter::new(&self.db), hash, data)?; - Ok(height) + fn append_child(&mut self, hash: Hash, child: Hash) -> Result<(), StoreError> { + self.children_access.append(DirectDbWriter::new(&self.db), hash, child) } fn insert_future_covering_item(&mut self, hash: Hash, fci: Hash, insertion_index: usize) -> Result<(), StoreError> { - let mut data = self.access.read(hash)?; - let mut_data = Arc::make_mut(&mut data); - Arc::make_mut(&mut mut_data.future_covering_set).insert(insertion_index, fci); - self.access.write(DirectDbWriter::new(&self.db), hash, data)?; - Ok(()) + self.fcs_access.insert(DirectDbWriter::new(&self.db), hash, fci, insertion_index) } fn set_parent(&mut self, hash: Hash, new_parent: Hash) -> Result<(), StoreError> { let mut data = self.access.read(hash)?; - Arc::make_mut(&mut data).parent = new_parent; + data.parent = new_parent; self.access.write(DirectDbWriter::new(&self.db), hash, data)?; Ok(()) } - fn replace_child(&mut self, hash: Hash, replaced_index: usize, replace_with: &[Hash]) -> Result<(), StoreError> { - let mut data = self.access.read(hash)?; - let mut_data = Arc::make_mut(&mut data); - Arc::make_mut(&mut mut_data.children).splice(replaced_index..replaced_index + 1, replace_with.iter().copied()); - self.access.write(DirectDbWriter::new(&self.db), hash, data)?; - Ok(()) + fn replace_child( + &mut self, + hash: Hash, + replaced_hash: Hash, + replaced_index: usize, + replace_with: &[Hash], + ) -> Result<(), StoreError> { + self.children_access.replace(DirectDbWriter::new(&self.db), hash, replaced_hash, replaced_index, replace_with) } - fn replace_future_covering_item(&mut self, hash: Hash, replaced_index: usize, replace_with: &[Hash]) -> Result<(), StoreError> { - let mut data = self.access.read(hash)?; - let mut_data = Arc::make_mut(&mut data); - Arc::make_mut(&mut mut_data.future_covering_set).splice(replaced_index..replaced_index + 1, replace_with.iter().copied()); - self.access.write(DirectDbWriter::new(&self.db), hash, data)?; - Ok(()) + fn replace_future_covering_item( + &mut self, + hash: Hash, + replaced_hash: Hash, + replaced_index: usize, + replace_with: &[Hash], + ) -> Result<(), StoreError> { + self.fcs_access.replace(DirectDbWriter::new(&self.db), hash, replaced_hash, replaced_index, replace_with) } fn delete(&mut self, hash: Hash) -> Result<(), StoreError> { @@ -195,11 +300,13 @@ impl ReachabilityStoreReader for DbReachabilityStore { } fn get_children(&self, hash: Hash) -> Result { - Ok(Arc::clone(&self.access.read(hash)?.children)) + // Cached reachability sets are assumed to be ordered by interval in order to allow binary search over them + self.children_access.read(hash, |&h| self.access.read(h).unwrap().interval) } fn get_future_covering_set(&self, hash: Hash) -> Result { - Ok(Arc::clone(&self.access.read(hash)?.future_covering_set)) + // Cached reachability sets are assumed to be ordered by interval in order to allow binary search over them + self.fcs_access.read(hash, |&h| self.access.read(h).unwrap().interval) } fn count(&self) -> Result { @@ -207,28 +314,101 @@ impl ReachabilityStoreReader for DbReachabilityStore { } } +/// Represents a staging set entry which was modified. The set can be either the tree children set or +/// the future covering set of a block. This struct saves the full cached updated set, as well as tracks the exact +/// changes that were made to it (additions/deletions). When committing the entry to the underlying DB store +/// these changes are used in order to efficiently update the DB only about the actual changes (thus avoiding quadratic disk writes). +/// Note that the cached set is still fully copied when reading/committing (in order to preserve order semantics). This too can be +/// optimized but for now these mem-copies don't seem to be a bottleneck so we favor the simplicity +struct StagingSetEntry { + set: BlockHashes, // The full cached (ordered) set + additions: BlockHashSet, // additions diff + deletions: BlockHashSet, // deletions diff +} + +impl StagingSetEntry { + fn new(cached_set: BlockHashes) -> Self { + Self { set: cached_set, additions: Default::default(), deletions: Default::default() } + } + + fn append(&mut self, element: Hash) { + Arc::make_mut(&mut self.set).push(element); + self.mark_addition(element); + } + + fn insert(&mut self, element: Hash, insertion_index: usize) { + Arc::make_mut(&mut self.set).insert(insertion_index, element); + self.mark_addition(element); + } + + fn replace(&mut self, replaced_hash: Hash, replaced_index: usize, replace_with: &[Hash]) { + Arc::make_mut(&mut self.set).splice(replaced_index..replaced_index + 1, replace_with.iter().copied()); + self.mark_deletion(replaced_hash); + for added_element in replace_with.iter().copied() { + self.mark_addition(added_element); + } + } + + fn mark_addition(&mut self, addition: Hash) { + if !self.deletions.remove(&addition) { + self.additions.insert(addition); + } + } + + fn mark_deletion(&mut self, deletion: Hash) { + if !self.additions.remove(&deletion) { + self.deletions.insert(deletion); + } + } +} + pub struct StagingReachabilityStore<'a> { store_read: RwLockUpgradableReadGuard<'a, DbReachabilityStore>, staging_writes: BlockHashMap, + staging_children: BlockHashMap, + staging_fcs: BlockHashMap, staging_deletions: BlockHashSet, staging_reindex_root: Option, } impl<'a> StagingReachabilityStore<'a> { pub fn new(store_read: RwLockUpgradableReadGuard<'a, DbReachabilityStore>) -> Self { - Self { store_read, staging_writes: BlockHashMap::new(), staging_deletions: Default::default(), staging_reindex_root: None } + Self { + store_read, + staging_writes: Default::default(), + staging_children: Default::default(), + staging_fcs: Default::default(), + staging_deletions: Default::default(), + staging_reindex_root: None, + } } pub fn commit(self, batch: &mut WriteBatch) -> Result, StoreError> { let mut store_write = RwLockUpgradableReadGuard::upgrade(self.store_read); + let mut writer = BatchDbWriter::new(batch); + for (k, v) in self.staging_writes { - let data = Arc::new(v); - store_write.access.write(BatchDbWriter::new(batch), k, data)? + store_write.access.write(&mut writer, k, v)? + } + + for (k, v) in self.staging_children { + store_write.children_access.commit_staging_entry(&mut writer, k, v)?; + } + + for (k, v) in self.staging_fcs { + store_write.fcs_access.commit_staging_entry(&mut writer, k, v)?; } + // Deletions always come after mutations - store_write.access.delete_many(BatchDbWriter::new(batch), &mut self.staging_deletions.iter().copied())?; + store_write.access.delete_many(&mut writer, &mut self.staging_deletions.iter().copied())?; + + for fully_deleted in self.staging_deletions { + store_write.children_access.delete(&mut writer, fully_deleted)?; + store_write.fcs_access.delete(&mut writer, fully_deleted)?; + } + if let Some(root) = self.staging_reindex_root { - store_write.reindex_root.write(BatchDbWriter::new(batch), &root)?; + store_write.reindex_root.write(&mut writer, &root)?; } Ok(store_write) } @@ -270,37 +450,40 @@ impl ReachabilityStore for StagingReachabilityStore<'_> { return Ok(()); } - let mut data = (*self.store_read.access.read(hash)?).clone(); + let mut data = self.store_read.access.read(hash)?; data.interval = interval; self.staging_writes.insert(hash, data); Ok(()) } - fn append_child(&mut self, hash: Hash, child: Hash) -> Result { - if let Some(data) = self.staging_writes.get_mut(&hash) { - Arc::make_mut(&mut data.children).push(child); - return Ok(data.height); + fn append_child(&mut self, hash: Hash, child: Hash) -> Result<(), StoreError> { + match self.staging_children.entry(hash) { + Occupied(mut e) => { + e.get_mut().append(child); + } + Vacant(e) => { + let mut set = StagingSetEntry::new(self.store_read.get_children(hash)?); + set.append(child); + e.insert(set); + } } - let mut data = (*self.store_read.access.read(hash)?).clone(); - let height = data.height; - Arc::make_mut(&mut data.children).push(child); - self.staging_writes.insert(hash, data); - - Ok(height) + Ok(()) } fn insert_future_covering_item(&mut self, hash: Hash, fci: Hash, insertion_index: usize) -> Result<(), StoreError> { - if let Some(data) = self.staging_writes.get_mut(&hash) { - Arc::make_mut(&mut data.future_covering_set).insert(insertion_index, fci); - return Ok(()); + match self.staging_fcs.entry(hash) { + Occupied(mut e) => { + e.get_mut().insert(fci, insertion_index); + } + Vacant(e) => { + let mut set = StagingSetEntry::new(self.store_read.get_future_covering_set(hash)?); + set.insert(fci, insertion_index); + e.insert(set); + } } - let mut data = (*self.store_read.access.read(hash)?).clone(); - Arc::make_mut(&mut data.future_covering_set).insert(insertion_index, fci); - self.staging_writes.insert(hash, data); - Ok(()) } @@ -310,36 +493,52 @@ impl ReachabilityStore for StagingReachabilityStore<'_> { return Ok(()); } - let mut data = (*self.store_read.access.read(hash)?).clone(); + let mut data = self.store_read.access.read(hash)?; data.parent = new_parent; self.staging_writes.insert(hash, data); Ok(()) } - fn replace_child(&mut self, hash: Hash, replaced_index: usize, replace_with: &[Hash]) -> Result<(), StoreError> { - if let Some(data) = self.staging_writes.get_mut(&hash) { - Arc::make_mut(&mut data.children).splice(replaced_index..replaced_index + 1, replace_with.iter().copied()); - return Ok(()); + fn replace_child( + &mut self, + hash: Hash, + replaced_hash: Hash, + replaced_index: usize, + replace_with: &[Hash], + ) -> Result<(), StoreError> { + match self.staging_children.entry(hash) { + Occupied(mut e) => { + e.get_mut().replace(replaced_hash, replaced_index, replace_with); + } + Vacant(e) => { + let mut set = StagingSetEntry::new(self.store_read.get_children(hash)?); + set.replace(replaced_hash, replaced_index, replace_with); + e.insert(set); + } } - let mut data = (*self.store_read.access.read(hash)?).clone(); - Arc::make_mut(&mut data.children).splice(replaced_index..replaced_index + 1, replace_with.iter().copied()); - self.staging_writes.insert(hash, data); - Ok(()) } - fn replace_future_covering_item(&mut self, hash: Hash, replaced_index: usize, replace_with: &[Hash]) -> Result<(), StoreError> { - if let Some(data) = self.staging_writes.get_mut(&hash) { - Arc::make_mut(&mut data.future_covering_set).splice(replaced_index..replaced_index + 1, replace_with.iter().copied()); - return Ok(()); + fn replace_future_covering_item( + &mut self, + hash: Hash, + replaced_hash: Hash, + replaced_index: usize, + replace_with: &[Hash], + ) -> Result<(), StoreError> { + match self.staging_fcs.entry(hash) { + Occupied(mut e) => { + e.get_mut().replace(replaced_hash, replaced_index, replace_with); + } + Vacant(e) => { + let mut set = StagingSetEntry::new(self.store_read.get_future_covering_set(hash)?); + set.replace(replaced_hash, replaced_index, replace_with); + e.insert(set); + } } - let mut data = (*self.store_read.access.read(hash)?).clone(); - Arc::make_mut(&mut data.future_covering_set).splice(replaced_index..replaced_index + 1, replace_with.iter().copied()); - self.staging_writes.insert(hash, data); - Ok(()) } @@ -400,8 +599,9 @@ impl ReachabilityStoreReader for StagingReachabilityStore<'_> { fn get_children(&self, hash: Hash) -> Result { self.check_not_in_deletions(hash)?; - if let Some(data) = self.staging_writes.get(&hash) { - Ok(BlockHashes::clone(&data.children)) + + if let Some(e) = self.staging_children.get(&hash) { + Ok(BlockHashes::clone(&e.set)) } else { self.store_read.get_children(hash) } @@ -409,8 +609,9 @@ impl ReachabilityStoreReader for StagingReachabilityStore<'_> { fn get_future_covering_set(&self, hash: Hash) -> Result { self.check_not_in_deletions(hash)?; - if let Some(data) = self.staging_writes.get(&hash) { - Ok(BlockHashes::clone(&data.future_covering_set)) + + if let Some(e) = self.staging_fcs.get(&hash) { + Ok(BlockHashes::clone(&e.set)) } else { self.store_read.get_future_covering_set(hash) } @@ -431,8 +632,25 @@ impl ReachabilityStoreReader for StagingReachabilityStore<'_> { } } +/// Used only by the (test-intended) memory store. Groups all reachability data including +/// tree children and the future covering set unlike the DB store where they are decomposed +#[derive(Clone, Serialize, Deserialize)] +struct MemoryReachabilityData { + pub children: BlockHashes, + pub parent: Hash, + pub interval: Interval, + pub height: u64, + pub future_covering_set: BlockHashes, +} + +impl MemoryReachabilityData { + pub fn new(parent: Hash, interval: Interval, height: u64) -> Self { + Self { children: Arc::new(vec![]), parent, interval, height, future_covering_set: Arc::new(vec![]) } + } +} + pub struct MemoryReachabilityStore { - map: BlockHashMap, + map: BlockHashMap, reindex_root: Option, } @@ -447,14 +665,14 @@ impl MemoryReachabilityStore { Self { map: BlockHashMap::new(), reindex_root: None } } - fn get_data_mut(&mut self, hash: Hash) -> Result<&mut ReachabilityData, StoreError> { + fn get_data_mut(&mut self, hash: Hash) -> Result<&mut MemoryReachabilityData, StoreError> { match self.map.get_mut(&hash) { Some(data) => Ok(data), None => Err(StoreError::KeyNotFound(DbKey::new(DatabaseStorePrefixes::Reachability.as_ref(), hash))), } } - fn get_data(&self, hash: Hash) -> Result<&ReachabilityData, StoreError> { + fn get_data(&self, hash: Hash) -> Result<&MemoryReachabilityData, StoreError> { match self.map.get(&hash) { Some(data) => Ok(data), None => Err(StoreError::KeyNotFound(DbKey::new(DatabaseStorePrefixes::Reachability.as_ref(), hash))), @@ -471,7 +689,7 @@ impl ReachabilityStore for MemoryReachabilityStore { fn insert(&mut self, hash: Hash, parent: Hash, interval: Interval, height: u64) -> Result<(), StoreError> { if let Vacant(e) = self.map.entry(hash) { - e.insert(ReachabilityData::new(parent, interval, height)); + e.insert(MemoryReachabilityData::new(parent, interval, height)); Ok(()) } else { Err(StoreError::HashAlreadyExists(hash)) @@ -484,10 +702,10 @@ impl ReachabilityStore for MemoryReachabilityStore { Ok(()) } - fn append_child(&mut self, hash: Hash, child: Hash) -> Result { + fn append_child(&mut self, hash: Hash, child: Hash) -> Result<(), StoreError> { let data = self.get_data_mut(hash)?; Arc::make_mut(&mut data.children).push(child); - Ok(data.height) + Ok(()) } fn insert_future_covering_item(&mut self, hash: Hash, fci: Hash, insertion_index: usize) -> Result<(), StoreError> { @@ -502,15 +720,30 @@ impl ReachabilityStore for MemoryReachabilityStore { Ok(()) } - fn replace_child(&mut self, hash: Hash, replaced_index: usize, replace_with: &[Hash]) -> Result<(), StoreError> { + fn replace_child( + &mut self, + hash: Hash, + replaced_hash: Hash, + replaced_index: usize, + replace_with: &[Hash], + ) -> Result<(), StoreError> { let data = self.get_data_mut(hash)?; - Arc::make_mut(&mut data.children).splice(replaced_index..replaced_index + 1, replace_with.iter().copied()); + let removed_hash = Arc::make_mut(&mut data.children).splice(replaced_index..replaced_index + 1, replace_with.iter().copied()); + debug_assert_eq!(replaced_hash, removed_hash.exactly_one().unwrap()); Ok(()) } - fn replace_future_covering_item(&mut self, hash: Hash, replaced_index: usize, replace_with: &[Hash]) -> Result<(), StoreError> { + fn replace_future_covering_item( + &mut self, + hash: Hash, + replaced_hash: Hash, + replaced_index: usize, + replace_with: &[Hash], + ) -> Result<(), StoreError> { let data = self.get_data_mut(hash)?; - Arc::make_mut(&mut data.future_covering_set).splice(replaced_index..replaced_index + 1, replace_with.iter().copied()); + let removed_hash = + Arc::make_mut(&mut data.future_covering_set).splice(replaced_index..replaced_index + 1, replace_with.iter().copied()); + debug_assert_eq!(replaced_hash, removed_hash.exactly_one().unwrap()); Ok(()) } @@ -572,7 +805,8 @@ mod tests { let (hash, parent) = (7.into(), 15.into()); let interval = Interval::maximal(); store.insert(hash, parent, interval, 5).unwrap(); - let height = store.append_child(hash, 31.into()).unwrap(); + store.append_child(hash, 31.into()).unwrap(); + let height = store.get_height(hash).unwrap(); assert_eq!(height, 5); let children = store.get_children(hash).unwrap(); println!("{children:?}"); diff --git a/consensus/src/model/stores/tips.rs b/consensus/src/model/stores/tips.rs index 2bc428d28..06dd13b0c 100644 --- a/consensus/src/model/stores/tips.rs +++ b/consensus/src/model/stores/tips.rs @@ -1,23 +1,31 @@ use std::sync::Arc; use kaspa_consensus_core::BlockHashSet; +use kaspa_consensus_core::BlockHasher; +use kaspa_database::prelude::CachedDbSetItem; use kaspa_database::prelude::DbWriter; +use kaspa_database::prelude::ReadLock; use kaspa_database::prelude::StoreResult; use kaspa_database::prelude::StoreResultExtensions; use kaspa_database::prelude::DB; -use kaspa_database::prelude::{BatchDbWriter, CachedDbItem, DirectDbWriter}; +use kaspa_database::prelude::{BatchDbWriter, DirectDbWriter}; use kaspa_database::registry::DatabaseStorePrefixes; use kaspa_hashes::Hash; use rocksdb::WriteBatch; /// Reader API for `TipsStore`. pub trait TipsStoreReader { - fn get(&self) -> StoreResult>; + fn get(&self) -> StoreResult>; } pub trait TipsStore: TipsStoreReader { - fn add_tip(&mut self, new_tip: Hash, new_tip_parents: &[Hash]) -> StoreResult>; - fn add_tip_batch(&mut self, batch: &mut WriteBatch, new_tip: Hash, new_tip_parents: &[Hash]) -> StoreResult> { + fn add_tip(&mut self, new_tip: Hash, new_tip_parents: &[Hash]) -> StoreResult>; + fn add_tip_batch( + &mut self, + batch: &mut WriteBatch, + new_tip: Hash, + new_tip_parents: &[Hash], + ) -> StoreResult> { self.add_tip_with_writer(BatchDbWriter::new(batch), new_tip, new_tip_parents) } fn add_tip_with_writer( @@ -25,7 +33,7 @@ pub trait TipsStore: TipsStoreReader { writer: impl DbWriter, new_tip: Hash, new_tip_parents: &[Hash], - ) -> StoreResult>; + ) -> StoreResult>; fn prune_tips_batch(&mut self, batch: &mut WriteBatch, pruned_tips: &[Hash]) -> StoreResult<()> { self.prune_tips_with_writer(BatchDbWriter::new(batch), pruned_tips) } @@ -36,12 +44,12 @@ pub trait TipsStore: TipsStoreReader { #[derive(Clone)] pub struct DbTipsStore { db: Arc, - access: CachedDbItem>, + access: CachedDbSetItem, } impl DbTipsStore { pub fn new(db: Arc) -> Self { - Self { db: Arc::clone(&db), access: CachedDbItem::new(db, DatabaseStorePrefixes::Tips.into()) } + Self { db: Arc::clone(&db), access: CachedDbSetItem::new(db, DatabaseStorePrefixes::Tips.into()) } } pub fn clone_with_new_cache(&self) -> Self { @@ -53,29 +61,20 @@ impl DbTipsStore { } pub fn init_batch(&mut self, batch: &mut WriteBatch, initial_tips: &[Hash]) -> StoreResult<()> { - self.access.write(BatchDbWriter::new(batch), &Arc::new(BlockHashSet::from_iter(initial_tips.iter().copied()))) - } -} - -/// Updates the internal data if possible -fn update_tips(mut current_tips: Arc, new_tip_parents: &[Hash], new_tip: Hash) -> Arc { - let tips = Arc::make_mut(&mut current_tips); - for parent in new_tip_parents { - tips.remove(parent); + self.access.update(BatchDbWriter::new(batch), initial_tips, &[])?; + Ok(()) } - tips.insert(new_tip); - current_tips } impl TipsStoreReader for DbTipsStore { - fn get(&self) -> StoreResult> { + fn get(&self) -> StoreResult> { self.access.read() } } impl TipsStore for DbTipsStore { - fn add_tip(&mut self, new_tip: Hash, new_tip_parents: &[Hash]) -> StoreResult> { - self.access.update(DirectDbWriter::new(&self.db), |tips| update_tips(tips, new_tip_parents, new_tip)) + fn add_tip(&mut self, new_tip: Hash, new_tip_parents: &[Hash]) -> StoreResult> { + self.access.update(DirectDbWriter::new(&self.db), &[new_tip], new_tip_parents) } fn add_tip_with_writer( @@ -83,21 +82,16 @@ impl TipsStore for DbTipsStore { writer: impl DbWriter, new_tip: Hash, new_tip_parents: &[Hash], - ) -> StoreResult> { - self.access.update(writer, |tips| update_tips(tips, new_tip_parents, new_tip)) + ) -> StoreResult> { + // New tip parents are no longer tips and hence removed + self.access.update(writer, &[new_tip], new_tip_parents) } fn prune_tips_with_writer(&mut self, writer: impl DbWriter, pruned_tips: &[Hash]) -> StoreResult<()> { if pruned_tips.is_empty() { return Ok(()); } - self.access.update(writer, |mut tips| { - let mut_tips = Arc::make_mut(&mut tips); - for pruned_tip in pruned_tips { - mut_tips.remove(pruned_tip); - } - tips - })?; + self.access.update(writer, &[], pruned_tips)?; Ok(()) } } @@ -105,11 +99,16 @@ impl TipsStore for DbTipsStore { #[cfg(test)] mod tests { use super::*; + use kaspa_database::{create_temp_db, prelude::ConnBuilder}; #[test] fn test_update_tips() { - let mut tips = Arc::new(BlockHashSet::from_iter([1.into(), 3.into(), 5.into()])); - tips = update_tips(tips, &[3.into(), 5.into()], 7.into()); - assert_eq!(Arc::try_unwrap(tips).unwrap(), BlockHashSet::from_iter([1.into(), 7.into()])); + let (_lifetime, db) = create_temp_db!(ConnBuilder::default().with_files_limit(10)); + let mut store = DbTipsStore::new(db.clone()); + store.add_tip(1.into(), &[]).unwrap(); + store.add_tip(3.into(), &[]).unwrap(); + store.add_tip(5.into(), &[]).unwrap(); + let tips = store.add_tip(7.into(), &[3.into(), 5.into()]).unwrap(); + assert_eq!(tips.read().clone(), BlockHashSet::from_iter([1.into(), 7.into()])); } } diff --git a/consensus/src/model/stores/utxo_set.rs b/consensus/src/model/stores/utxo_set.rs index 3390257b0..48c0e49df 100644 --- a/consensus/src/model/stores/utxo_set.rs +++ b/consensus/src/model/stores/utxo_set.rs @@ -127,8 +127,7 @@ impl DbUtxoSetStore { /// Clear the store completely in DB and cache pub fn clear(&mut self) -> Result<(), StoreError> { - let writer = DirectDbWriter::new(&self.db); - self.access.delete_all(writer) + self.access.delete_all(DirectDbWriter::new(&self.db)) } /// Write directly from an iterator and do not cache any data. NOTE: this action also clears the cache diff --git a/consensus/src/pipeline/pruning_processor/processor.rs b/consensus/src/pipeline/pruning_processor/processor.rs index 4b76ab1c0..5770a573b 100644 --- a/consensus/src/pipeline/pruning_processor/processor.rs +++ b/consensus/src/pipeline/pruning_processor/processor.rs @@ -208,6 +208,7 @@ impl PruningProcessor { } fn assert_utxo_commitment(&self, pruning_point: Hash) { + info!("Verifying the new pruning point UTXO commitment (sanity test)"); let commitment = self.headers_store.get_header(pruning_point).unwrap().utxo_commitment; let mut multiset = MuHash::new(); let pruning_utxoset_read = self.pruning_utxoset_stores.read(); @@ -215,6 +216,7 @@ impl PruningProcessor { multiset.add_utxo(&outpoint, &entry); } assert_eq!(multiset.finalize(), commitment, "Updated pruning point utxo set does not match the header utxo commitment"); + info!("Pruning point UTXO commitment was verified correctly (sanity test)"); } fn prune(&self, new_pruning_point: Hash) { @@ -291,6 +293,7 @@ impl PruningProcessor { let pruned_tips = tips_write .get() .unwrap() + .read() .iter() .copied() .filter(|&h| !reachability_read.is_dag_ancestor_of_result(new_pruning_point, h).unwrap()) diff --git a/consensus/src/pipeline/virtual_processor/processor.rs b/consensus/src/pipeline/virtual_processor/processor.rs index 8148aee7a..d74120bb8 100644 --- a/consensus/src/pipeline/virtual_processor/processor.rs +++ b/consensus/src/pipeline/virtual_processor/processor.rs @@ -265,6 +265,7 @@ impl VirtualStateProcessor { .read() .get() .unwrap() + .read() .iter() .copied() .filter(|&h| self.reachability_service.is_dag_ancestor_of(finality_point, h)) diff --git a/consensus/src/processes/reachability/inquirer.rs b/consensus/src/processes/reachability/inquirer.rs index 1bc25c908..3e1b3695c 100644 --- a/consensus/src/processes/reachability/inquirer.rs +++ b/consensus/src/processes/reachability/inquirer.rs @@ -83,7 +83,7 @@ pub fn delete_block(store: &mut (impl ReachabilityStore + ?Sized), block: Hash, } }; - store.replace_child(parent, block_index, &children)?; + store.replace_child(parent, block, block_index, &children)?; for child in children.iter().copied() { store.set_parent(child, parent)?; @@ -94,7 +94,7 @@ pub fn delete_block(store: &mut (impl ReachabilityStore + ?Sized), block: Hash, SearchOutput::NotFound(_) => return Err(ReachabilityError::DataInconsistency), SearchOutput::Found(hash, i) => { debug_assert_eq!(hash, block); - store.replace_future_covering_item(merged_block, i, &children)?; + store.replace_future_covering_item(merged_block, block, i, &children)?; } } } diff --git a/consensus/src/processes/reachability/interval.rs b/consensus/src/processes/reachability/interval.rs index eb7b21127..9f8d7fbd0 100644 --- a/consensus/src/processes/reachability/interval.rs +++ b/consensus/src/processes/reachability/interval.rs @@ -1,7 +1,7 @@ use serde::{Deserialize, Serialize}; use std::fmt::{Display, Formatter}; -#[derive(Debug, PartialEq, Eq, Clone, Copy, Serialize, Deserialize)] +#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone, Copy, Serialize, Deserialize)] pub struct Interval { pub(super) start: u64, pub(super) end: u64, diff --git a/consensus/src/processes/reachability/tests/mod.rs b/consensus/src/processes/reachability/tests/mod.rs index f9cf2ec0c..67131f7f9 100644 --- a/consensus/src/processes/reachability/tests/mod.rs +++ b/consensus/src/processes/reachability/tests/mod.rs @@ -41,7 +41,12 @@ impl<'a, T: ReachabilityStore + ?Sized> StoreBuilder<'a, T> { } pub fn add_block(&mut self, hash: Hash, parent: Hash) -> &mut Self { - let parent_height = if !parent.is_none() { self.store.append_child(parent, hash).unwrap() } else { 0 }; + let parent_height = if !parent.is_none() { + self.store.append_child(parent, hash).unwrap(); + self.store.get_height(parent).unwrap() + } else { + 0 + }; self.store.insert(hash, parent, Interval::empty(), parent_height + 1).unwrap(); self } diff --git a/consensus/src/processes/reachability/tree.rs b/consensus/src/processes/reachability/tree.rs index a913cab7f..faf35a696 100644 --- a/consensus/src/processes/reachability/tree.rs +++ b/consensus/src/processes/reachability/tree.rs @@ -18,7 +18,8 @@ pub fn add_tree_block( // Get the remaining interval capacity let remaining = store.interval_remaining_after(parent)?; // Append the new child to `parent.children` - let parent_height = store.append_child(parent, new_block)?; + store.append_child(parent, new_block)?; + let parent_height = store.get_height(parent)?; if remaining.is_empty() { // Init with the empty interval. // Note: internal logic relies on interval being this specific interval diff --git a/database/src/access.rs b/database/src/access.rs index d03781c91..989d977aa 100644 --- a/database/src/access.rs +++ b/database/src/access.rs @@ -1,8 +1,7 @@ use crate::{db::DB, errors::StoreError}; use super::prelude::{Cache, DbKey, DbWriter}; -use itertools::Itertools; -use rocksdb::{Direction, IteratorMode, ReadOptions}; +use rocksdb::{Direction, IterateBounds, IteratorMode, ReadOptions}; use serde::{de::DeserializeOwned, Serialize}; use std::{collections::hash_map::RandomState, error::Error, hash::BuildHasher, sync::Arc}; @@ -153,26 +152,15 @@ where Ok(()) } + /// Deletes all entries in the store using the underlying rocksdb `delete_range` operation pub fn delete_all(&self, mut writer: impl DbWriter) -> Result<(), StoreError> where TKey: Clone + AsRef<[u8]>, { self.cache.remove_all(); - //TODO: Consider using column families to make it faster let db_key = DbKey::prefix_only(&self.prefix); - let mut read_opts = ReadOptions::default(); - read_opts.set_iterate_range(rocksdb::PrefixRange(db_key.as_ref())); - let keys = self - .db - .iterator_opt(IteratorMode::From(db_key.as_ref(), Direction::Forward), read_opts) - .map(|iter_result| match iter_result { - Ok((key, _)) => Ok::<_, rocksdb::Error>(key), - Err(e) => Err(e), - }) - .collect_vec(); - for key in keys { - writer.delete(key.unwrap())?; - } + let (from, to) = rocksdb::PrefixRange(db_key.as_ref()).into_bounds(); + writer.delete_range(from.unwrap(), to.unwrap())?; Ok(()) } @@ -225,3 +213,33 @@ where &self.prefix } } + +#[cfg(test)] +mod tests { + use super::*; + use crate::{ + create_temp_db, + prelude::{BatchDbWriter, ConnBuilder, DirectDbWriter}, + }; + use kaspa_hashes::Hash; + use rocksdb::WriteBatch; + + #[test] + fn test_delete_all() { + let (_lifetime, db) = create_temp_db!(ConnBuilder::default().with_files_limit(10)); + let access = CachedDbAccess::::new(db.clone(), 2, vec![1, 2]); + + access.write_many(DirectDbWriter::new(&db), &mut (0..16).map(|i| (i.into(), 2))).unwrap(); + assert_eq!(16, access.iterator().count()); + access.delete_all(DirectDbWriter::new(&db)).unwrap(); + assert_eq!(0, access.iterator().count()); + + access.write_many(DirectDbWriter::new(&db), &mut (0..16).map(|i| (i.into(), 2))).unwrap(); + assert_eq!(16, access.iterator().count()); + let mut batch = WriteBatch::default(); + access.delete_all(BatchDbWriter::new(&mut batch)).unwrap(); + assert_eq!(16, access.iterator().count()); + db.write(batch).unwrap(); + assert_eq!(0, access.iterator().count()); + } +} diff --git a/database/src/cache.rs b/database/src/cache.rs index ca25ed0f2..b66aedb67 100644 --- a/database/src/cache.rs +++ b/database/src/cache.rs @@ -47,12 +47,12 @@ impl Option { if self.size == 0 { - return; + return None; } let mut write_guard = self.map.write(); - write_guard.swap_remove(key); + write_guard.swap_remove(key) } pub fn remove_many(&self, key_iter: &mut impl Iterator) { diff --git a/database/src/item.rs b/database/src/item.rs index 6d60001fb..bb14b40b7 100644 --- a/database/src/item.rs +++ b/database/src/item.rs @@ -1,9 +1,17 @@ -use crate::{db::DB, errors::StoreError}; +use crate::{ + db::DB, + errors::StoreError, + prelude::{DbSetAccess, ReadLock}, +}; use super::prelude::{DbKey, DbWriter}; use parking_lot::RwLock; use serde::{de::DeserializeOwned, Serialize}; -use std::sync::Arc; +use std::{ + collections::{hash_map::RandomState, HashSet}, + hash::BuildHasher, + sync::Arc, +}; /// A cached DB item with concurrency support #[derive(Clone)] @@ -73,3 +81,74 @@ where { Ok(item) } } + +#[derive(Clone, Copy, Hash, PartialEq, Eq)] +struct EmptyKey; + +impl AsRef<[u8]> for EmptyKey { + fn as_ref(&self) -> &[u8] { + &[] + } +} + +type LockedSet = Arc>>; + +#[derive(Clone)] +pub struct CachedDbSetItem { + access: DbSetAccess, + cached_set: Arc>>>, +} + +impl CachedDbSetItem +where + T: Clone + std::hash::Hash + Eq + Send + Sync + DeserializeOwned + Serialize, + S: BuildHasher + Default, +{ + pub fn new(db: Arc, key: Vec) -> Self { + Self { access: DbSetAccess::new(db, key), cached_set: Arc::new(RwLock::new(None)) } + } + + fn read_locked_set(&self) -> Result, StoreError> + where + T: Clone + DeserializeOwned, + { + if let Some(item) = self.cached_set.read().clone() { + return Ok(item); + } + let set = self.access.bucket_iterator(EmptyKey).collect::, _>>()?; + let set = Arc::new(RwLock::new(set)); + self.cached_set.write().replace(set.clone()); + Ok(set) + } + + pub fn read(&self) -> Result>, StoreError> + where + T: Clone + DeserializeOwned, + { + Ok(ReadLock::new(self.read_locked_set()?)) + } + + pub fn update( + &mut self, + mut writer: impl DbWriter, + added_items: &[T], + removed_items: &[T], + ) -> Result>, StoreError> + where + T: Clone + Serialize, + { + let set = self.read_locked_set()?; + { + let mut set_write = set.write(); + for item in removed_items.iter() { + self.access.delete(&mut writer, EmptyKey, item.clone())?; + set_write.remove(item); + } + for item in added_items.iter().cloned() { + self.access.write(&mut writer, EmptyKey, item.clone())?; + set_write.insert(item); + } + } + Ok(ReadLock::new(set)) + } +} diff --git a/database/src/key.rs b/database/src/key.rs index 3a8d1aa56..e8aeff091 100644 --- a/database/src/key.rs +++ b/database/src/key.rs @@ -71,7 +71,13 @@ impl Display for DbKey { pos += 1; if self.prefix_len > 1 { match prefix { - Ghostdag | GhostdagCompact | RelationsParents | RelationsChildren | Reachability => { + Ghostdag + | GhostdagCompact + | RelationsParents + | RelationsChildren + | Reachability + | ReachabilityTreeChildren + | ReachabilityFutureCoveringSet => { if self.path[1] != SEPARATOR { // Expected to be a block level so we display as a number Display::fmt(&self.path[1], f)?; diff --git a/database/src/lib.rs b/database/src/lib.rs index 387a7b5d3..945c4e0f7 100644 --- a/database/src/lib.rs +++ b/database/src/lib.rs @@ -15,9 +15,9 @@ pub mod prelude { pub use super::access::CachedDbAccess; pub use super::cache::Cache; - pub use super::item::CachedDbItem; + pub use super::item::{CachedDbItem, CachedDbSetItem}; pub use super::key::DbKey; - pub use super::set_access::{CachedDbSetAccess, ReadLock}; + pub use super::set_access::{CachedDbSetAccess, DbSetAccess, ReadLock}; pub use super::writer::{BatchDbWriter, DbWriter, DirectDbWriter, DirectWriter, MemoryWriter}; pub use db::{delete_db, ConnBuilder, DB}; pub use errors::{StoreError, StoreResult, StoreResultEmptyTuple, StoreResultExtensions}; diff --git a/database/src/registry.rs b/database/src/registry.rs index 3ab68f7a2..9e1b129d6 100644 --- a/database/src/registry.rs +++ b/database/src/registry.rs @@ -37,6 +37,10 @@ pub enum DatabaseStorePrefixes { VirtualUtxoset = 27, VirtualState = 28, + // ---- Decomposed reachability stores ---- + ReachabilityTreeChildren = 30, + ReachabilityFutureCoveringSet = 31, + // ---- Metadata ---- MultiConsensusMetadata = 124, ConsensusEntries = 125, diff --git a/database/src/set_access.rs b/database/src/set_access.rs index 5a91572bf..5efe41b54 100644 --- a/database/src/set_access.rs +++ b/database/src/set_access.rs @@ -2,33 +2,16 @@ use crate::{db::DB, errors::StoreError}; use super::prelude::{Cache, DbKey, DbWriter}; use parking_lot::{RwLock, RwLockReadGuard}; -use rocksdb::{IteratorMode, ReadOptions}; +use rocksdb::{IterateBounds, IteratorMode, ReadOptions}; use serde::{de::DeserializeOwned, Serialize}; use std::{ collections::{hash_map::RandomState, HashSet}, - error::Error, fmt::Debug, hash::BuildHasher, + marker::PhantomData, sync::Arc, }; -/// A concurrent DB store for **set** access with typed caching. -#[derive(Clone)] -pub struct CachedDbSetAccess -where - TKey: Clone + std::hash::Hash + Eq + Send + Sync, - TData: Clone + Send + Sync, - W: Send + Sync, -{ - db: Arc, - - // Cache - cache: Cache>>, S>, - - // DB bucket/path - prefix: Vec, -} - /// A read-only lock. Essentially a wrapper to [`parking_lot::RwLock`] which allows only reading. #[derive(Default, Debug)] pub struct ReadLock(Arc>); @@ -49,6 +32,21 @@ impl From for ReadLock { } } +/// A concurrent DB store for **set** access with typed caching. +#[derive(Clone)] +pub struct CachedDbSetAccess +where + TKey: Clone + std::hash::Hash + Eq + Send + Sync, + TData: Clone + Send + Sync, + W: Send + Sync, +{ + // The inner uncached DB access + inner: DbSetAccess, + + // Cache + cache: Cache>>, S>, +} + impl CachedDbSetAccess where TKey: Clone + std::hash::Hash + Eq + Send + Sync + AsRef<[u8]>, @@ -57,7 +55,7 @@ where W: BuildHasher + Default + Send + Sync, { pub fn new(db: Arc, cache_size: u64, prefix: Vec) -> Self { - Self { db, cache: Cache::new(cache_size), prefix } + Self { inner: DbSetAccess::new(db, prefix), cache: Cache::new(cache_size) } } pub fn read_from_cache(&self, key: TKey) -> Option>> { @@ -69,7 +67,7 @@ where if let Some(data) = self.cache.get(&key) { Ok(data) } else { - let data: HashSet = self.bucket_iterator(key.clone()).map(|x| x.unwrap()).collect(); + let data: HashSet = self.inner.bucket_iterator(key.clone()).collect::>()?; let data = Arc::new(RwLock::new(data)); self.cache.insert(key, data.clone()); Ok(data) @@ -85,11 +83,54 @@ where if let Some(locked_entry) = self.cache.get(&key) { locked_entry.write().insert(data.clone()); } - self.write_to_db(writer, key, &data) + self.inner.write(writer, key, data) } - fn write_to_db(&self, mut writer: impl DbWriter, key: TKey, data: &TData) -> Result<(), StoreError> { - writer.put(self.get_db_key(&key, data)?, [])?; + pub fn delete_bucket(&self, writer: impl DbWriter, key: TKey) -> Result<(), StoreError> { + self.cache.remove(&key); + self.inner.delete_bucket(writer, key) + } + + pub fn delete(&self, writer: impl DbWriter, key: TKey, data: TData) -> Result<(), StoreError> { + // We remove the item from cache only if the full set entry already exists in the cache + if let Some(locked_entry) = self.cache.get(&key) { + locked_entry.write().remove(&data); + } + self.inner.delete(writer, key, data)?; + Ok(()) + } + + pub fn prefix(&self) -> &[u8] { + self.inner.prefix() + } +} + +/// A concurrent DB store for typed **set** access *without* caching. +#[derive(Clone)] +pub struct DbSetAccess +where + TKey: Clone + std::hash::Hash + Eq + Send + Sync, + TData: Clone + Send + Sync, +{ + db: Arc, + + // DB bucket/path + prefix: Vec, + + _phantom: PhantomData<(TKey, TData)>, +} + +impl DbSetAccess +where + TKey: Clone + std::hash::Hash + Eq + Send + Sync + AsRef<[u8]>, + TData: Clone + std::hash::Hash + Eq + Send + Sync + DeserializeOwned + Serialize, +{ + pub fn new(db: Arc, prefix: Vec) -> Self { + Self { db, prefix, _phantom: Default::default() } + } + + pub fn write(&self, mut writer: impl DbWriter, key: TKey, data: TData) -> Result<(), StoreError> { + writer.put(self.get_db_key(&key, &data)?, [])?; Ok(()) } @@ -99,20 +140,13 @@ where } pub fn delete_bucket(&self, mut writer: impl DbWriter, key: TKey) -> Result<(), StoreError> { - let locked_entry = self.read_locked_entry(key.clone())?; - // TODO: check if DB supports delete by prefix - for data in locked_entry.read().iter() { - writer.delete(self.get_db_key(&key, data)?)?; - } - self.cache.remove(&key); + let db_key = DbKey::new_with_bucket(&self.prefix, &key, []); + let (from, to) = rocksdb::PrefixRange(db_key.as_ref()).into_bounds(); + writer.delete_range(from.unwrap(), to.unwrap())?; Ok(()) } pub fn delete(&self, mut writer: impl DbWriter, key: TKey, data: TData) -> Result<(), StoreError> { - // We remove the item from cache only if the full set entry already exists in the cache - if let Some(locked_entry) = self.cache.get(&key) { - locked_entry.write().remove(&data); - } writer.delete(self.get_db_key(&key, &data)?)?; Ok(()) } @@ -122,17 +156,12 @@ where key: TKey, limit: usize, // amount to take. skip_first: bool, // skips the first value, (useful in conjunction with the seek-key, as to not re-retrieve). - ) -> impl Iterator, Box>> + '_ + ) -> impl Iterator, StoreError>> + '_ where TKey: Clone + AsRef<[u8]>, TData: DeserializeOwned, { - let db_key = { - let mut db_key = DbKey::prefix_only(&self.prefix); - db_key.add_bucket(&key); - db_key - }; - + let db_key = DbKey::new_with_bucket(&self.prefix, &key, []); let mut read_opts = ReadOptions::default(); read_opts.set_iterate_range(rocksdb::PrefixRange(db_key.as_ref())); @@ -152,14 +181,47 @@ where &self.prefix } - fn bucket_iterator(&self, key: TKey) -> impl Iterator>> + '_ + pub fn bucket_iterator(&self, key: TKey) -> impl Iterator> + '_ where TKey: Clone + AsRef<[u8]>, TData: DeserializeOwned, { - self.seek_iterator(key, usize::MAX, false).map(|res| { - let data = res.unwrap(); - Ok(bincode::deserialize(&data)?) + self.seek_iterator(key, usize::MAX, false).map(|res| match res { + Ok(data) => Ok(bincode::deserialize(&data)?), + Err(err) => Err(err), }) } } + +#[cfg(test)] +mod tests { + use super::*; + use crate::{ + create_temp_db, + prelude::{BatchDbWriter, ConnBuilder, DirectDbWriter}, + }; + use kaspa_hashes::Hash; + use rocksdb::WriteBatch; + + #[test] + fn test_delete_bucket() { + let (_lifetime, db) = create_temp_db!(ConnBuilder::default().with_files_limit(10)); + let access = DbSetAccess::::new(db.clone(), vec![1, 2]); + + for i in 0..16 { + for j in 0..2 { + access.write(DirectDbWriter::new(&db), i.into(), i + j).unwrap(); + } + } + for i in 0..16 { + assert_eq!(2, access.bucket_iterator(i.into()).count()); + } + access.delete_bucket(DirectDbWriter::new(&db), 3.into()).unwrap(); + assert_eq!(0, access.bucket_iterator(3.into()).count()); + + let mut batch = WriteBatch::default(); + access.delete_bucket(BatchDbWriter::new(&mut batch), 6.into()).unwrap(); + db.write(batch).unwrap(); + assert_eq!(0, access.bucket_iterator(6.into()).count()); + } +} diff --git a/database/src/writer.rs b/database/src/writer.rs index 3d64f0458..1599e6d77 100644 --- a/database/src/writer.rs +++ b/database/src/writer.rs @@ -10,6 +10,9 @@ pub trait DbWriter { K: AsRef<[u8]>, V: AsRef<[u8]>; fn delete>(&mut self, key: K) -> Result<(), rocksdb::Error>; + fn delete_range(&mut self, from: K, to: K) -> Result<(), rocksdb::Error> + where + K: AsRef<[u8]>; } /// A trait which is intentionally not implemented for the batch writer. @@ -42,6 +45,15 @@ impl DbWriter for DirectDbWriter<'_> { fn delete>(&mut self, key: K) -> Result<(), rocksdb::Error> { self.db.delete(key) } + + fn delete_range(&mut self, from: K, to: K) -> Result<(), rocksdb::Error> + where + K: AsRef<[u8]>, + { + let mut batch = WriteBatch::default(); + batch.delete_range(from, to); + self.db.write(batch) + } } impl DirectWriter for DirectDbWriter<'_> {} @@ -70,6 +82,14 @@ impl DbWriter for BatchDbWriter<'_> { self.batch.delete(key); Ok(()) } + + fn delete_range(&mut self, from: K, to: K) -> Result<(), rocksdb::Error> + where + K: AsRef<[u8]>, + { + self.batch.delete_range(from, to); + Ok(()) + } } impl DbWriter for &mut T { @@ -86,6 +106,14 @@ impl DbWriter for &mut T { fn delete>(&mut self, key: K) -> Result<(), rocksdb::Error> { (*self).delete(key) } + + #[inline] + fn delete_range(&mut self, from: K, to: K) -> Result<(), rocksdb::Error> + where + K: AsRef<[u8]>, + { + (*self).delete_range(from, to) + } } impl DirectWriter for &mut T {} @@ -106,6 +134,13 @@ impl DbWriter for MemoryWriter { fn delete>(&mut self, _key: K) -> Result<(), rocksdb::Error> { Ok(()) } + + fn delete_range(&mut self, _from: K, _to: K) -> Result<(), rocksdb::Error> + where + K: AsRef<[u8]>, + { + Ok(()) + } } impl DirectWriter for MemoryWriter {} diff --git a/indexes/utxoindex/src/stores/indexed_utxos.rs b/indexes/utxoindex/src/stores/indexed_utxos.rs index 47d1e1540..3de467dd4 100644 --- a/indexes/utxoindex/src/stores/indexed_utxos.rs +++ b/indexes/utxoindex/src/stores/indexed_utxos.rs @@ -254,7 +254,6 @@ impl UtxoSetByScriptPublicKeyStore for DbUtxoSetByScriptPublicKeyStore { /// Removes all entries in the cache and db, besides prefixes themselves. fn delete_all(&mut self) -> StoreResult<()> { - let mut writer = DirectDbWriter::new(&self.db); - self.access.delete_all(&mut writer) + self.access.delete_all(DirectDbWriter::new(&self.db)) } } diff --git a/simpa/src/main.rs b/simpa/src/main.rs index 37302fbb8..a7573e499 100644 --- a/simpa/src/main.rs +++ b/simpa/src/main.rs @@ -12,7 +12,7 @@ use kaspa_consensus::{ headers::HeaderStoreReader, relations::RelationsStoreReader, }, - params::{Params, Testnet11Bps, DEVNET_PARAMS, TESTNET11_PARAMS}, + params::{Params, Testnet11Bps, DEVNET_PARAMS, NETWORK_DELAY_BOUND, TESTNET11_PARAMS}, }; use kaspa_consensus_core::{ api::ConsensusApi, block::Block, blockstatus::BlockStatus, config::bps::calculate_ghostdag_k, errors::block::BlockProcessResult, @@ -263,7 +263,8 @@ fn apply_args_to_consensus_params(args: &Args, params: &mut Params) { params.past_median_time_window_size(0), ); } else { - let k = u64::max(calculate_ghostdag_k(2.0 * args.delay * args.bps, 0.05), params.ghostdag_k as u64); + let max_delay = args.delay.max(NETWORK_DELAY_BOUND as f64); + let k = u64::max(calculate_ghostdag_k(2.0 * max_delay * args.bps, 0.05), params.ghostdag_k as u64); let k = u64::min(k, KType::MAX as u64) as KType; // Clamp to KType::MAX params.ghostdag_k = k; params.mergeset_size_limit = k as u64 * 10; @@ -285,12 +286,7 @@ fn apply_args_to_consensus_params(args: &Args, params: &mut Params) { params.difficulty_sample_rate = (2.0 * args.bps) as u64; } - info!( - "The delay times bps product is larger than 2 (2Dλ={}), setting GHOSTDAG K={}, DAA window size={})", - 2.0 * args.delay * args.bps, - k, - params.difficulty_window_size(0) - ); + info!("2Dλ={}, GHOSTDAG K={}, DAA window size={}", 2.0 * args.delay * args.bps, k, params.difficulty_window_size(0)); } if args.test_pruning { params.pruning_proof_m = 16;