Merge remote-tracking branch 'ds-origin/optimize-window-cache-building-for-ibd' into gd-tn11-uni-2-coder
coderofstuff committed Oct 22, 2024
2 parents 980268f + aecdef8 commit 797cf96
Showing 7 changed files with 153 additions and 74 deletions.
2 changes: 1 addition & 1 deletion consensus/core/src/config/constants.rs
@@ -121,7 +121,7 @@ pub mod perf {

const BASELINE_HEADER_DATA_CACHE_SIZE: usize = 10_000;
const BASELINE_BLOCK_DATA_CACHE_SIZE: usize = 200;
const BASELINE_BLOCK_WINDOW_CACHE_SIZE: usize = 2000;
const BASELINE_BLOCK_WINDOW_CACHE_SIZE: usize = 2_000;
const BASELINE_UTXOSET_CACHE_SIZE: usize = 10_000;

#[derive(Clone, Debug)]
4 changes: 2 additions & 2 deletions consensus/core/src/errors/block.rs
@@ -64,8 +64,8 @@ pub enum RuleError {
#[error("expected header blue work {0} but got {1}")]
UnexpectedHeaderBlueWork(BlueWorkType, BlueWorkType),

#[error("block difficulty of {0} is not the expected value of {1}")]
UnexpectedDifficulty(u32, u32),
#[error("block difficulty of {0} has value of {1} and is not the expected value of {2}")]
UnexpectedDifficulty(Hash, u32, u32),

#[error("block timestamp of {0} is not after expected {1}")]
TimeTooOld(u64, u64),
1 change: 1 addition & 0 deletions consensus/src/model/stores/ghostdag.rs
@@ -48,6 +48,7 @@ impl MemSizeEstimator for GhostdagData {
impl MemSizeEstimator for CompactGhostdagData {}

impl From<&GhostdagData> for CompactGhostdagData {
#[inline(always)]
fn from(value: &GhostdagData) -> Self {
Self { blue_score: value.blue_score, blue_work: value.blue_work, selected_parent: value.selected_parent }
}
@@ -8,6 +8,7 @@ use kaspa_consensus_core::block::Block;
use kaspa_database::prelude::StoreResultExtensions;
use kaspa_hashes::Hash;
use kaspa_utils::option::OptionExtensions;
use once_cell::unsync::Lazy;
use std::sync::Arc;

impl BlockBodyProcessor {
@@ -18,13 +19,21 @@ impl BlockBodyProcessor {
}

fn check_block_transactions_in_context(self: &Arc<Self>, block: &Block) -> BlockProcessResult<()> {
let (pmt, _) = self.window_manager.calc_past_median_time(&self.ghostdag_store.get_data(block.hash()).unwrap())?;
// TODO: this is somewhat expensive during ibd, as it incurs cache misses.
let pmt_res =
Lazy::new(|| match self.window_manager.calc_past_median_time(&self.ghostdag_store.get_data(block.hash()).unwrap()) {
Ok((pmt, _)) => Ok(pmt),
Err(e) => Err(e),
});

for tx in block.transactions.iter() {
if let Err(e) = self.transaction_validator.utxo_free_tx_validation(tx, block.header.daa_score, pmt) {
return Err(RuleError::TxInContextFailed(tx.id(), e));
}
// quick check to avoid the expensive Lazy eval during ibd (in most cases).
if tx.lock_time != 0 {
if let Err(e) = self.transaction_validator.utxo_free_tx_validation(tx, block.header.daa_score, (*pmt_res).clone()?) {
return Err(RuleError::TxInContextFailed(tx.id(), e));
};
};
}

Ok(())
}

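For illustration, a minimal standalone sketch (separate from this diff) of the deferred-evaluation pattern used in check_block_transactions_in_context above: the past median time is wrapped in once_cell's Lazy, so the expensive window computation only runs when a transaction with a non-zero lock time is actually encountered. All names below are illustrative stand-ins, not the real consensus APIs.

use once_cell::unsync::Lazy;

// Stand-in for window_manager.calc_past_median_time(...), which walks a block window.
fn expensive_past_median_time() -> Result<u64, String> {
    println!("expensive computation ran");
    Ok(123_456)
}

fn validate_lock_times(lock_times: &[u64]) -> Result<(), String> {
    // The closure does not run until the Lazy is first dereferenced.
    let pmt = Lazy::new(|| expensive_past_median_time());
    for &lock_time in lock_times {
        // Most transactions have lock_time == 0, so the expensive call is usually skipped.
        if lock_time != 0 {
            let _pmt = (*pmt).clone()?;
            // ... check lock_time against _pmt here ...
        }
    }
    Ok(())
}

fn main() {
    validate_lock_times(&[0, 0, 0]).unwrap(); // the expensive computation never runs
    validate_lock_times(&[0, 500_000]).unwrap(); // it runs exactly once
}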
@@ -35,7 +35,7 @@ impl HeaderProcessor {
ctx.mergeset_non_daa = Some(daa_window.mergeset_non_daa);

if header.bits != expected_bits {
return Err(RuleError::UnexpectedDifficulty(header.bits, expected_bits));
return Err(RuleError::UnexpectedDifficulty(header.hash, header.bits, expected_bits));
}

ctx.block_window_for_difficulty = Some(daa_window.window);
42 changes: 38 additions & 4 deletions consensus/src/pipeline/virtual_processor/processor.rs
@@ -16,9 +16,10 @@ use crate::{
stores::{
acceptance_data::{AcceptanceDataStoreReader, DbAcceptanceDataStore},
block_transactions::{BlockTransactionsStoreReader, DbBlockTransactionsStore},
block_window_cache::BlockWindowCacheStore,
daa::DbDaaStore,
depth::{DbDepthStore, DepthStoreReader},
ghostdag::{DbGhostdagStore, GhostdagData, GhostdagStoreReader},
ghostdag::{CompactGhostdagData, DbGhostdagStore, GhostdagData, GhostdagStoreReader},
headers::{DbHeadersStore, HeaderStoreReader},
past_pruning_points::DbPastPruningPointsStore,
pruning::{DbPruningStore, PruningStoreReader},
@@ -152,6 +153,10 @@ pub struct VirtualStateProcessor {
pub(super) parents_manager: DbParentsManager,
pub(super) depth_manager: DbBlockDepthManager,

// block window caches
pub(super) block_window_cache_for_difficulty: Arc<BlockWindowCacheStore>,
pub(super) block_window_cache_for_past_median_time: Arc<BlockWindowCacheStore>,

// Pruning lock
pruning_lock: SessionLock,

@@ -209,6 +214,9 @@ impl VirtualStateProcessor {
pruning_utxoset_stores: storage.pruning_utxoset_stores.clone(),
lkg_virtual_state: storage.lkg_virtual_state.clone(),

block_window_cache_for_difficulty: storage.block_window_cache_for_difficulty.clone(),
block_window_cache_for_past_median_time: storage.block_window_cache_for_past_median_time.clone(),

ghostdag_manager: services.ghostdag_manager.clone(),
reachability_service: services.reachability_service.clone(),
relations_service: services.relations_service.clone(),
@@ -306,11 +314,19 @@ impl VirtualStateProcessor {
.expect("all possible rule errors are unexpected here");

// Update the pruning processor about the virtual state change
let sink_ghostdag_data = self.ghostdag_store.get_compact_data(new_sink).unwrap();
let compact_sink_ghostdag_data = if prev_sink != new_sink {
// We need the full ghostdag data here, since we may need to update the window caches
let sink_ghostdag_data = self.ghostdag_store.get_data(new_sink).unwrap();
// update window caches - for ibd performance. see method comment for more details.
self.maybe_commit_windows(new_sink, &sink_ghostdag_data);
CompactGhostdagData::from(sink_ghostdag_data.as_ref())
} else {
self.ghostdag_store.get_compact_data(new_sink).unwrap()
};
// Empty the channel before sending the new message. If pruning processor is busy, this step makes sure
// the internal channel does not grow with no need (since we only care about the most recent message)
let _consume = self.pruning_receiver.try_iter().count();
self.pruning_sender.send(PruningProcessingMessage::Process { sink_ghostdag_data }).unwrap();
self.pruning_sender.send(PruningProcessingMessage::Process { sink_ghostdag_data: compact_sink_ghostdag_data }).unwrap();

// Emit notifications
let accumulated_diff = Arc::new(accumulated_diff);
@@ -322,7 +338,7 @@ impl VirtualStateProcessor {
.notify(Notification::UtxosChanged(UtxosChangedNotification::new(accumulated_diff, virtual_parents)))
.expect("expecting an open unbounded channel");
self.notification_root
.notify(Notification::SinkBlueScoreChanged(SinkBlueScoreChangedNotification::new(sink_ghostdag_data.blue_score)))
.notify(Notification::SinkBlueScoreChanged(SinkBlueScoreChangedNotification::new(compact_sink_ghostdag_data.blue_score)))
.expect("expecting an open unbounded channel");
self.notification_root
.notify(Notification::VirtualDaaScoreChanged(VirtualDaaScoreChangedNotification::new(new_virtual_state.daa_score)))
@@ -543,6 +559,24 @@ impl VirtualStateProcessor {
drop(selected_chain_write);
}

fn maybe_commit_windows(&self, new_sink: Hash, sink_ghostdag_data: &GhostdagData) {
// this is only important for ibd performance, as we incur expensive cache misses otherwise.
// this occurs because we cannot rely on header processing to pre-cache in this scenario.

// TODO: We could optimize this by only committing the windows if the virtual processor had explicit knowledge of being in IBD.
// The above may be possible with access to the `is_ibd_running` AtomicBool, or the `is_nearly_synced()` method.

if !self.block_window_cache_for_difficulty.contains_key(&new_sink) {
self.block_window_cache_for_difficulty
.insert(new_sink, self.window_manager.block_daa_window(sink_ghostdag_data).unwrap().window);
};

if !self.block_window_cache_for_past_median_time.contains_key(&new_sink) {
self.block_window_cache_for_past_median_time
.insert(new_sink, self.window_manager.calc_past_median_time(sink_ghostdag_data).unwrap().1);
};
}
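As a standalone illustration of the check-then-insert pattern maybe_commit_windows uses: per the comments above, header processing cannot be relied on to pre-cache the sink's windows during IBD, so the virtual processor warms the cache itself, but only when the entry is missing. A plain HashMap and a hypothetical compute_window_for stand in for the real BlockWindowCacheStore and WindowManager calls.

use std::collections::HashMap;

type Hash = u64;
type Window = Vec<u64>;

// Stand-in for window_manager.block_daa_window(...) / calc_past_median_time(...).
fn compute_window_for(sink: Hash) -> Window {
    vec![sink, sink + 1, sink + 2]
}

fn maybe_commit_window(cache: &mut HashMap<Hash, Window>, new_sink: Hash) {
    // Only pay for the window computation when it has not been cached already.
    if !cache.contains_key(&new_sink) {
        let window = compute_window_for(new_sink);
        cache.insert(new_sink, window);
    }
}

fn main() {
    let mut cache: HashMap<Hash, Window> = HashMap::new();
    maybe_commit_window(&mut cache, 42); // computes and caches the window
    maybe_commit_window(&mut cache, 42); // no-op: the window is already cached
    assert_eq!(cache.len(), 1);
}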

/// Returns the max number of tips to consider as virtual parents in a single virtual resolve operation.
///
/// Guaranteed to be `>= self.max_block_parents`
157 changes: 96 additions & 61 deletions consensus/src/processes/window.rs
@@ -15,7 +15,7 @@ use kaspa_consensus_core::{
};
use kaspa_hashes::Hash;
use kaspa_math::Uint256;
use kaspa_utils::refs::Refs;
use kaspa_utils::{arc::ArcExtensions, refs::Refs};
use once_cell::unsync::Lazy;
use std::{cmp::Reverse, iter::once, ops::Deref, sync::Arc};

@@ -332,52 +332,30 @@ impl<T: GhostdagStoreReader, U: BlockWindowCacheReader, V: HeaderStoreReader, W:
WindowType::FullDifficultyWindow | WindowType::VaryingWindow(_) => None,
};

if let Some(cache) = cache {
if let Some(selected_parent_binary_heap) = cache.get(&ghostdag_data.selected_parent) {
// Only use the cached window if it originates from here
if let WindowOrigin::Sampled = selected_parent_binary_heap.origin() {
let selected_parent_blue_work = self.ghostdag_store.get_blue_work(ghostdag_data.selected_parent).unwrap();

let mut heap =
Lazy::new(|| BoundedSizeBlockHeap::from_binary_heap(window_size, (*selected_parent_binary_heap).clone()));
for block in self.sampled_mergeset_iterator(sample_rate, ghostdag_data, selected_parent_blue_work) {
match block {
SampledBlock::Sampled(block) => {
heap.try_push(block.hash, block.blue_work);
}
SampledBlock::NonDaa(hash) => {
mergeset_non_daa_inserter(hash);
}
}
}

return if let Ok(heap) = Lazy::into_value(heap) {
Ok(Arc::new(heap.binary_heap))
} else {
Ok(selected_parent_binary_heap.clone())
};
}
}
let selected_parent_blue_work = self.ghostdag_store.get_blue_work(ghostdag_data.selected_parent).unwrap();

// Try to initialize the window from the cache directly
if let Some(res) = self.try_init_from_cache(
window_size,
sample_rate,
cache,
ghostdag_data,
selected_parent_blue_work,
&mut mergeset_non_daa_inserter,
) {
return Ok(res);
}

// else we populate the window with the passed ghostdag_data.
let mut window_heap = BoundedSizeBlockHeap::new(WindowOrigin::Sampled, window_size);
let parent_ghostdag = self.ghostdag_store.get_data(ghostdag_data.selected_parent).unwrap();

for block in self.sampled_mergeset_iterator(sample_rate, ghostdag_data, parent_ghostdag.blue_work) {
match block {
SampledBlock::Sampled(block) => {
window_heap.try_push(block.hash, block.blue_work);
}
SampledBlock::NonDaa(hash) => {
mergeset_non_daa_inserter(hash);
}
}
}
self.push_mergeset(&mut window_heap, sample_rate, ghostdag_data, selected_parent_blue_work, mergeset_non_daa_inserter);
let mut current_ghostdag = self.ghostdag_store.get_data(ghostdag_data.selected_parent).unwrap();

let mut current_ghostdag = parent_ghostdag;
// Note: no need to check for cache here, as we already tried to initialize from the passed ghostdag's selected parent cache in `self.try_init_from_cache`

// Walk down the chain until we cross the window boundaries
// Walk down the chain until we cross the window boundaries.
loop {
// check if we may exit early.
if current_ghostdag.selected_parent.is_origin() {
// Reaching origin means there's no more data, so we expect the window to already be full, otherwise we err.
// This error can happen only during an IBD from pruning proof when processing the first headers in the pruning point's
@@ -387,50 +365,97 @@ impl<T: GhostdagStoreReader, U: BlockWindowCacheReader, V: HeaderStoreReader, W:
} else {
return Err(RuleError::InsufficientDaaWindowSize(window_heap.binary_heap.len()));
}
}

if current_ghostdag.selected_parent == self.genesis_hash {
} else if current_ghostdag.selected_parent == self.genesis_hash {
break;
}

let parent_ghostdag = self.ghostdag_store.get_data(current_ghostdag.selected_parent).unwrap();
let selected_parent_blue_work_too_low =
self.try_push_mergeset(&mut window_heap, sample_rate, &current_ghostdag, parent_ghostdag.blue_work);
// No need to further iterate since past of selected parent has even lower blue work
if selected_parent_blue_work_too_low {

// No need to further iterate since past of selected parent has only lower blue work
if !window_heap.can_push(current_ghostdag.selected_parent, parent_ghostdag.blue_work) {
break;
}

// push the current mergeset into the window
self.push_mergeset(&mut window_heap, sample_rate, &current_ghostdag, parent_ghostdag.blue_work, move |_| {});

// see if we can inherit and merge with the selected parent cache
if self.try_merge_with_selected_parent_cache(&mut window_heap, cache, &current_ghostdag.selected_parent) {
// if successful, we may break out of the loop, with the window already filled.
break;
};

// update the current ghostdag to the parent ghostdag, and continue the loop.
current_ghostdag = parent_ghostdag;
}

Ok(Arc::new(window_heap.binary_heap))
}
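A simplified, self-contained sketch of the control flow in the window-building loop above (illustrative names only; the size bound and sampling are omitted): the mergeset of each chain block is pushed into the window while walking down the selected-parent chain, and as soon as an ancestor's window is found in the cache it is merged in and the walk stops early.

use std::collections::HashMap;

type Hash = u32;

// Simplified stand-ins for the ghostdag store, the per-block mergesets and the window cache.
struct Chain {
    selected_parent: HashMap<Hash, Hash>,
    mergeset: HashMap<Hash, Vec<Hash>>,
}

fn build_window(chain: &Chain, cache: &HashMap<Hash, Vec<Hash>>, sink: Hash, genesis: Hash) -> Vec<Hash> {
    let mut window = Vec::new();
    let mut current = sink;
    loop {
        // Push the current block's mergeset into the window.
        window.extend(chain.mergeset[&current].iter().copied());
        let parent = chain.selected_parent[&current];
        // Inherit the ancestor's cached window and stop walking early.
        if let Some(cached) = cache.get(&parent) {
            window.extend(cached.iter().copied());
            break;
        }
        if parent == genesis {
            break;
        }
        current = parent;
    }
    window
}

fn main() {
    let chain = Chain {
        selected_parent: HashMap::from([(3, 2), (2, 1), (1, 0)]),
        mergeset: HashMap::from([(3, vec![3]), (2, vec![2]), (1, vec![1])]),
    };
    // Block 2's window is cached, so the walk stops right after visiting block 3.
    let cache = HashMap::from([(2u32, vec![2u32, 1, 0])]);
    assert_eq!(build_window(&chain, &cache, 3, 0), vec![3, 2, 1, 0]);
}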

fn try_push_mergeset(
fn push_mergeset(
&self,
heap: &mut BoundedSizeBlockHeap,
sample_rate: u64,
ghostdag_data: &GhostdagData,
selected_parent_blue_work: BlueWorkType,
) -> bool {
// If the window is full and the selected parent is less than the minimum then we break
// because this means that there cannot be any more blocks in the past with higher blue work
if !heap.can_push(ghostdag_data.selected_parent, selected_parent_blue_work) {
return true;
}

mut mergeset_non_daa_inserter: impl FnMut(Hash),
) {
for block in self.sampled_mergeset_iterator(sample_rate, ghostdag_data, selected_parent_blue_work) {
match block {
SampledBlock::Sampled(block) => {
if !heap.try_push(block.hash, block.blue_work) {
break;
}
heap.try_push(block.hash, block.blue_work);
}
SampledBlock::NonDaa(_) => {}
SampledBlock::NonDaa(hash) => mergeset_non_daa_inserter(hash),
}
}
false
}

fn try_init_from_cache(
&self,
window_size: usize,
sample_rate: u64,
cache: Option<&Arc<U>>,
ghostdag_data: &GhostdagData,
selected_parent_blue_work: BlueWorkType,
mut mergeset_non_daa_inserter: impl FnMut(Hash),
) -> Option<Arc<BlockWindowHeap>> {
cache.and_then(|cache| {
cache.get(&ghostdag_data.selected_parent).map(|selected_parent_window| {
let mut heap = Lazy::new(|| BoundedSizeBlockHeap::from_binary_heap(window_size, (*selected_parent_window).clone()));
// Note: calling self.push_mergeset here would void the lazy evaluation optimization of the heap, so we don't do that.
for block in self.sampled_mergeset_iterator(sample_rate, ghostdag_data, selected_parent_blue_work) {
match block {
SampledBlock::Sampled(block) => {
heap.try_push(block.hash, block.blue_work);
}
SampledBlock::NonDaa(hash) => {
mergeset_non_daa_inserter(hash);
}
}
}

if let Ok(heap) = Lazy::into_value(heap) {
Arc::new(heap.binary_heap)
} else {
selected_parent_window.clone()
}
})
})
}

fn try_merge_with_selected_parent_cache(
&self,
heap: &mut BoundedSizeBlockHeap,
cache: Option<&Arc<U>>,
selected_parent: &Hash,
) -> bool {
cache
.and_then(|cache| {
cache.get(selected_parent).map(|selected_parent_window| {
heap.merge_ancestor_heap(&mut selected_parent_window.unwrap_or_clone());
})
})
.is_some()
}

fn sampled_mergeset_iterator<'a>(
@@ -686,4 +711,14 @@ impl BoundedSizeBlockHeap {
self.binary_heap.push(r_sortable_block);
true
}

// Merges the ancestor heap into the current heap while enforcing the size bound.
fn merge_ancestor_heap(&mut self, ancestor_heap: &mut BlockWindowHeap) {
self.binary_heap.blocks.append(&mut ancestor_heap.blocks);
// Below we saturate for cases where the ancestor may be close to the origin or genesis.
// Note: this is a no-op when the overflow amount is 0, i.e. when, thanks to the saturating sub, the combined size of the two heaps is less than or equal to the size bound.
for _ in 0..self.binary_heap.len().saturating_sub(self.size_bound) {
self.binary_heap.blocks.pop();
}
}
}
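To illustrate the size-bound enforcement in merge_ancestor_heap, a small runnable toy (simplified stand-ins, not the real BlockWindowHeap): the ancestor heap is appended, then the lowest blue-work entries are popped until the bound holds again, with saturating_sub making the loop a no-op when the combined contents already fit.

use std::cmp::Reverse;
use std::collections::BinaryHeap;

// Toy bounded heap over blue-work values; Reverse makes pop() remove the smallest value first.
struct BoundedHeap {
    size_bound: usize,
    blocks: BinaryHeap<Reverse<u64>>,
}

impl BoundedHeap {
    fn merge_ancestor(&mut self, ancestor: &mut BinaryHeap<Reverse<u64>>) {
        self.blocks.append(ancestor);
        // saturating_sub makes this loop a no-op when the merged size fits the bound,
        // e.g. when the ancestor window is small because it is close to genesis.
        for _ in 0..self.blocks.len().saturating_sub(self.size_bound) {
            self.blocks.pop(); // drop the lowest blue-work entries
        }
    }
}

fn main() {
    let mut heap = BoundedHeap { size_bound: 3, blocks: BinaryHeap::from([Reverse(10), Reverse(7)]) };
    let mut ancestor = BinaryHeap::from([Reverse(9), Reverse(2)]);
    heap.merge_ancestor(&mut ancestor);
    // Only the three highest blue-work entries (7, 9, 10) remain; 2 was popped.
    assert_eq!(heap.blocks.len(), 3);
}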
