Orphans improvements part 2 (kaspanet#373)
* track avg mergesets

* unorphan ancestors if they exist w/o roots

* refactor processing batch object + test fix

* use warn

* improve log

* comment

* orphan eviction policy

* oops: use the correct len

* review comments
michaelsutton committed Jan 3, 2024
1 parent 1905cee · commit 0c76768
Showing 9 changed files with 196 additions and 56 deletions.
30 changes: 30 additions & 0 deletions components/consensusmanager/src/batch.rs
@@ -0,0 +1,30 @@
use kaspa_consensus_core::{api::BlockValidationFuture, block::Block};
use std::fmt::Debug;

pub struct BlockProcessingBatch {
    pub blocks: Vec<Block>,
    pub block_tasks: Option<Vec<BlockValidationFuture>>,
    pub virtual_state_tasks: Option<Vec<BlockValidationFuture>>,
}

impl BlockProcessingBatch {
    pub fn new(blocks: Vec<Block>, block_tasks: Vec<BlockValidationFuture>, virtual_state_tasks: Vec<BlockValidationFuture>) -> Self {
        Self { blocks, block_tasks: Some(block_tasks), virtual_state_tasks: Some(virtual_state_tasks) }
    }

    pub fn zip(self) -> impl Iterator<Item = (Block, BlockValidationFuture)> {
        self.blocks.into_iter().zip(self.virtual_state_tasks.unwrap())
    }
}

impl Default for BlockProcessingBatch {
    fn default() -> Self {
        Self { blocks: Default::default(), block_tasks: Some(Default::default()), virtual_state_tasks: Some(Default::default()) }
    }
}

impl Debug for BlockProcessingBatch {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("BlockProcessingBatch").field("blocks", &self.blocks).finish()
    }
}
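
A minimal, self-contained sketch of the pairing pattern this type provides. The DemoBlock, DemoTask and DemoBatch names below are placeholders invented for illustration, not the kaspa types; the point is that zip consumes the batch and pairs each block with its corresponding task by position.

// Placeholder types standing in for Block and BlockValidationFuture.
#[derive(Debug)]
struct DemoBlock(u64);
#[derive(Debug)]
struct DemoTask(u64);

struct DemoBatch {
    blocks: Vec<DemoBlock>,
    virtual_state_tasks: Option<Vec<DemoTask>>,
}

impl DemoBatch {
    // Consume the batch and pair each block with its task by position.
    fn zip(self) -> impl Iterator<Item = (DemoBlock, DemoTask)> {
        self.blocks.into_iter().zip(self.virtual_state_tasks.unwrap())
    }
}

fn main() {
    let batch = DemoBatch {
        blocks: vec![DemoBlock(1), DemoBlock(2)],
        virtual_state_tasks: Some(vec![DemoTask(1), DemoTask(2)]),
    };
    for (block, task) in batch.zip() {
        println!("{:?} is awaited via {:?}", block, task);
    }
}
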
2 changes: 2 additions & 0 deletions components/consensusmanager/src/lib.rs
@@ -4,8 +4,10 @@ use kaspa_core::{core::Core, debug, service::Service};
use parking_lot::RwLock;
use std::{collections::VecDeque, ops::Deref, sync::Arc, thread::JoinHandle};

mod batch;
mod session;

pub use batch::BlockProcessingBatch;
pub use session::{
spawn_blocking, ConsensusInstance, ConsensusProxy, ConsensusSessionBlocking, SessionLock, SessionReadGuard, SessionWriteGuard,
};
15 changes: 15 additions & 0 deletions components/consensusmanager/src/session.rs
@@ -21,6 +21,8 @@ use std::{ops::Deref, sync::Arc};

pub use tokio::task::spawn_blocking;

use crate::BlockProcessingBatch;

#[derive(Clone)]
pub struct SessionOwnedReadGuard(Arc<RfRwLockOwnedReadGuard>);

@@ -158,6 +160,19 @@ impl ConsensusSessionOwned {
self.consensus.validate_and_insert_block(block)
}

    pub fn validate_and_insert_block_batch(&self, mut batch: Vec<Block>) -> BlockProcessingBatch {
        // Sort by blue work in order to ensure topological order
        batch.sort_by(|a, b| a.header.blue_work.partial_cmp(&b.header.blue_work).unwrap());
        let (block_tasks, virtual_state_tasks) = batch
            .iter()
            .map(|b| {
                let BlockValidationFutures { block_task, virtual_state_task } = self.consensus.validate_and_insert_block(b.clone());
                (block_task, virtual_state_task)
            })
            .unzip();
        BlockProcessingBatch::new(batch, block_tasks, virtual_state_tasks)
    }

pub fn validate_and_insert_trusted_block(&self, tb: TrustedBlock) -> BlockValidationFutures {
self.consensus.validate_and_insert_trusted_block(tb)
}
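
A standalone sketch of the ordering argument behind the sort in validate_and_insert_block_batch above: blue work does not decrease from a parent to its child, so sorting a batch by blue work in ascending order never submits a block before its parents. DemoHeader and its u128 blue_work field are simplified placeholders, not the kaspa header type.

#[derive(Debug)]
struct DemoHeader {
    hash: u64,
    blue_work: u128, // stand-in for the node's wider blue-work integer
}

fn sort_for_submission(mut batch: Vec<DemoHeader>) -> Vec<DemoHeader> {
    // Mirrors the partial_cmp + unwrap sort above (u128 is totally ordered).
    batch.sort_by(|a, b| a.blue_work.partial_cmp(&b.blue_work).unwrap());
    batch
}

fn main() {
    let batch = vec![
        DemoHeader { hash: 30, blue_work: 300 }, // grandchild
        DemoHeader { hash: 10, blue_work: 100 }, // ancestor
        DemoHeader { hash: 20, blue_work: 200 }, // parent
    ];
    for h in sort_for_submission(batch) {
        println!("submit {} (blue work {})", h.hash, h.blue_work);
    }
}
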
2 changes: 2 additions & 0 deletions consensus/src/pipeline/header_processor/processor.rs
@@ -356,6 +356,8 @@ impl HeaderProcessor {
.unwrap_or_else(|| Arc::new(self.ghostdag_managers[level].ghostdag(&ctx.known_parents[level])))
})
.collect_vec();

self.counters.mergeset_counts.fetch_add(ghostdag_data[0].mergeset_size() as u64, Ordering::Relaxed);
ctx.ghostdag_data = Some(ghostdag_data);
}

4 changes: 4 additions & 0 deletions consensus/src/pipeline/mod.rs
@@ -12,6 +12,7 @@ pub struct ProcessingCounters {
pub blocks_submitted: AtomicU64,
pub header_counts: AtomicU64,
pub dep_counts: AtomicU64,
pub mergeset_counts: AtomicU64,
pub body_counts: AtomicU64,
pub txs_counts: AtomicU64,
pub chain_block_counts: AtomicU64,
@@ -24,6 +25,7 @@ impl ProcessingCounters {
blocks_submitted: self.blocks_submitted.load(Ordering::Relaxed),
header_counts: self.header_counts.load(Ordering::Relaxed),
dep_counts: self.dep_counts.load(Ordering::Relaxed),
mergeset_counts: self.mergeset_counts.load(Ordering::Relaxed),
body_counts: self.body_counts.load(Ordering::Relaxed),
txs_counts: self.txs_counts.load(Ordering::Relaxed),
chain_block_counts: self.chain_block_counts.load(Ordering::Relaxed),
@@ -37,6 +39,7 @@ pub struct ProcessingCountersSnapshot {
pub blocks_submitted: u64,
pub header_counts: u64,
pub dep_counts: u64,
pub mergeset_counts: u64,
pub body_counts: u64,
pub txs_counts: u64,
pub chain_block_counts: u64,
@@ -51,6 +54,7 @@ impl core::ops::Sub for &ProcessingCountersSnapshot {
blocks_submitted: self.blocks_submitted.checked_sub(rhs.blocks_submitted).unwrap_or_default(),
header_counts: self.header_counts.checked_sub(rhs.header_counts).unwrap_or_default(),
dep_counts: self.dep_counts.checked_sub(rhs.dep_counts).unwrap_or_default(),
mergeset_counts: self.mergeset_counts.checked_sub(rhs.mergeset_counts).unwrap_or_default(),
body_counts: self.body_counts.checked_sub(rhs.body_counts).unwrap_or_default(),
txs_counts: self.txs_counts.checked_sub(rhs.txs_counts).unwrap_or_default(),
chain_block_counts: self.chain_block_counts.checked_sub(rhs.chain_block_counts).unwrap_or_default(),
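
A standalone, standard-library-only sketch of the counter/snapshot pattern the new mergeset_counts field plugs into: processors bump atomic counters with Relaxed ordering, the monitor snapshots them periodically, and subtracting two snapshots yields the per-interval delta (saturating at zero), from which averages such as mergeset size per header can be derived. The struct below is illustrative and much smaller than the real ProcessingCounters.

use std::sync::atomic::{AtomicU64, Ordering};

#[derive(Default)]
struct Counters {
    mergeset_counts: AtomicU64,
    header_counts: AtomicU64,
}

#[derive(Clone, Copy)]
struct Snapshot {
    mergeset_counts: u64,
    header_counts: u64,
}

impl Counters {
    fn snapshot(&self) -> Snapshot {
        Snapshot {
            mergeset_counts: self.mergeset_counts.load(Ordering::Relaxed),
            header_counts: self.header_counts.load(Ordering::Relaxed),
        }
    }
}

impl core::ops::Sub for Snapshot {
    type Output = Snapshot;
    fn sub(self, rhs: Snapshot) -> Snapshot {
        Snapshot {
            mergeset_counts: self.mergeset_counts.checked_sub(rhs.mergeset_counts).unwrap_or_default(),
            header_counts: self.header_counts.checked_sub(rhs.header_counts).unwrap_or_default(),
        }
    }
}

fn main() {
    let counters = Counters::default();
    let before = counters.snapshot();
    counters.header_counts.fetch_add(10, Ordering::Relaxed); // 10 headers processed
    counters.mergeset_counts.fetch_add(37, Ordering::Relaxed); // sum of their mergeset sizes
    let delta = counters.snapshot() - before;
    println!("{:.2} mergesets per header", delta.mergeset_counts as f64 / delta.header_counts as f64);
}
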
5 changes: 3 additions & 2 deletions consensus/src/pipeline/monitor.rs
@@ -50,13 +50,14 @@ impl ConsensusMonitor {
let now = Instant::now();

info!(
"Processed {} blocks and {} headers in the last {:.2}s ({} transactions; {} parent references; {} UTXO-validated blocks; {:.2} avg txs per block; {} avg block mass)",
"Processed {} blocks and {} headers in the last {:.2}s ({} transactions; {} UTXO-validated blocks; {:.2} parents; {:.2} mergesets; {:.2} TPB; {:.1} mass)",
delta.body_counts,
delta.header_counts,
(now - last_log_time).as_secs_f64(),
delta.txs_counts,
delta.dep_counts,
delta.chain_block_counts,
if delta.header_counts != 0 { delta.dep_counts as f64 / delta.header_counts as f64 } else { 0f64 },
if delta.header_counts != 0 { delta.mergeset_counts as f64 / delta.header_counts as f64 } else { 0f64 },
if delta.body_counts != 0 { delta.txs_counts as f64 / delta.body_counts as f64 } else{ 0f64 },
if delta.body_counts != 0 { delta.mass_counts / delta.body_counts } else{ 0 },
);
22 changes: 14 additions & 8 deletions protocol/flows/src/flow_context.rs
@@ -17,7 +17,7 @@ use kaspa_consensus_notify::{
notification::{Notification, PruningPointUtxoSetOverrideNotification},
root::ConsensusNotificationRoot,
};
use kaspa_consensusmanager::{ConsensusInstance, ConsensusManager, ConsensusProxy};
use kaspa_consensusmanager::{BlockProcessingBatch, ConsensusInstance, ConsensusManager, ConsensusProxy};
use kaspa_core::{
debug, info,
kaspad_env::{name, version},
@@ -102,15 +102,15 @@ impl BlockEventLogger {

/// Start the logger listener. Must be called from an async tokio context
fn start(&self) {
let chunk_limit = self.bps * 4; // We prefer that the 1 sec timeout forces the log, but nonetheless still want a reasonable bound on each chunk
let chunk_limit = self.bps * 10; // We prefer that the 1 sec timeout forces the log, but nonetheless still want a reasonable bound on each chunk
let receiver = self.receiver.lock().take().expect("expected to be called once");
tokio::spawn(async move {
let chunk_stream = UnboundedReceiverStream::new(receiver).chunks_timeout(chunk_limit, Duration::from_secs(1));
tokio::pin!(chunk_stream);
while let Some(chunk) = chunk_stream.next().await {
#[derive(Default)]
struct LogSummary {
// Representative
// Representatives
relay_rep: Option<Hash>,
submit_rep: Option<Hash>,
orphan_rep: Option<Hash>,
@@ -511,7 +511,7 @@ impl FlowContext {
// Broadcast as soon as the block has been validated and inserted into the DAG
self.hub.broadcast(make_message!(Payload::InvRelayBlock, InvRelayBlockMessage { hash: Some(hash.into()) })).await;

self.on_new_block(consensus, block, virtual_state_task).await;
self.on_new_block(consensus, Default::default(), block, virtual_state_task).await;
self.log_block_event(BlockLogEvent::Submit(hash));

Ok(())
@@ -539,7 +539,13 @@ impl FlowContext {
/// and possibly rebroadcast manually added transactions when not in IBD.
///
/// _GO-KASPAD: OnNewBlock + broadcastTransactionsAfterBlockAdded_
pub async fn on_new_block(&self, consensus: &ConsensusProxy, block: Block, virtual_state_task: BlockValidationFuture) {
pub async fn on_new_block(
&self,
consensus: &ConsensusProxy,
ancestor_batch: BlockProcessingBatch,
block: Block,
virtual_state_task: BlockValidationFuture,
) {
let hash = block.hash();
let mut blocks = self.unorphan_blocks(consensus, hash).await;

@@ -554,7 +560,7 @@
blocks.sort_by(|a, b| a.0.header.blue_work.partial_cmp(&b.0.header.blue_work).unwrap());
// Use a ProcessQueue so we get rid of duplicates
let mut transactions_to_broadcast = ProcessQueue::new();
for (block, virtual_state_task) in once((block, virtual_state_task)).chain(blocks.into_iter()) {
for (block, virtual_state_task) in ancestor_batch.zip().chain(once((block, virtual_state_task))).chain(blocks.into_iter()) {
// We only care about waiting for virtual to process the block at this point, before proceeding with post-processing
// actions such as updating the mempool. We know this will not err since `block_task` already completed w/o error
let _ = virtual_state_task.await;
@@ -568,8 +574,8 @@
}
}

// Don't relay transactions when in IBD
if self.is_ibd_running() {
// Transaction relay is disabled if the node is out of sync and thus not mining
if !consensus.async_is_nearly_synced().await {
return;
}

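
A standalone sketch of the post-processing order established by the change above, with plain strings standing in for (Block, virtual-state task) pairs: the unorphaned ancestor batch comes first, then the newly validated block, then any blocks the new block itself unorphaned.

use std::iter::once;

fn main() {
    let ancestor_batch = vec!["ancestor-1", "ancestor-2"]; // BlockProcessingBatch::zip() in the real flow
    let block = "new-block";
    let unorphaned_descendants = vec!["child-1", "child-2"];

    for b in ancestor_batch.into_iter().chain(once(block)).chain(unorphaned_descendants.into_iter()) {
        // In the real flow each item's virtual_state_task is awaited here before
        // its transactions are considered for mempool revalidation/broadcast.
        println!("post-process {}", b);
    }
}
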
110 changes: 84 additions & 26 deletions protocol/flows/src/flowcontext/orphans.rs
@@ -3,7 +3,7 @@ use kaspa_consensus_core::{
api::{BlockValidationFuture, BlockValidationFutures},
block::Block,
};
use kaspa_consensusmanager::ConsensusProxy;
use kaspa_consensusmanager::{BlockProcessingBatch, ConsensusProxy};
use kaspa_core::debug;
use kaspa_hashes::Hash;
use kaspa_utils::option::OptionExtensions;
@@ -20,12 +20,21 @@ use super::process_queue::ProcessQueue;
pub enum OrphanOutput {
/// Block is orphan with the provided missing roots
Roots(Vec<Hash>),
/// Block has no missing roots (but it might have known orphan ancestors)
NoRoots,
/// Block has no missing roots (but it might have known orphan ancestors which are returned
/// along with their corresponding consensus processing tasks)
NoRoots(BlockProcessingBatch),
/// The block does not exist in the orphan pool
Unknown,
}

#[derive(Debug)]
enum FindRootsOutput {
/// Block is orphan with the provided missing roots and a possible set of known orphan ancestors
Roots(Vec<Hash>, HashSet<Hash>),
/// Block has no missing roots (but it might have known orphan ancestors)
NoRoots(HashSet<Hash>),
}

struct OrphanBlock {
/// The actual block
block: Block,
@@ -49,11 +58,17 @@ pub struct OrphanBlocksPool {
orphans: IndexMap<Hash, OrphanBlock>,
/// Max number of orphans to keep in the pool
max_orphans: usize,
/// The log base 2 of `max_orphans`
max_orphans_log: usize,
}

impl OrphanBlocksPool {
pub fn new(max_orphans: usize) -> Self {
Self { orphans: IndexMap::with_capacity(max_orphans), max_orphans }
Self {
orphans: IndexMap::with_capacity(max_orphans),
max_orphans,
max_orphans_log: (max_orphans as f64).log2().ceil() as usize,
}
}

/// Adds the provided block to the orphan pool. Returns None if the block is already
@@ -63,11 +78,45 @@ impl OrphanBlocksPool {
if self.orphans.contains_key(&orphan_hash) {
return None;
}

let (roots, orphan_ancestors) =
match self.get_orphan_roots(consensus, orphan_block.header.direct_parents().iter().copied().collect()).await {
FindRootsOutput::Roots(roots, orphan_ancestors) => (roots, orphan_ancestors),
FindRootsOutput::NoRoots(orphan_ancestors) => {
let blocks: Vec<_> =
orphan_ancestors.into_iter().map(|h| self.orphans.remove(&h).expect("orphan ancestor").block).collect();
return Some(OrphanOutput::NoRoots(consensus.validate_and_insert_block_batch(blocks)));
}
};

if self.orphans.len() == self.max_orphans {
debug!("Orphan blocks pool size exceeded. Evicting a random orphan block.");
// Evict a random orphan in order to keep pool size under the limit
if let Some((evicted, _)) = self.orphans.swap_remove_index(rand::thread_rng().gen_range(0..self.max_orphans)) {
debug!("Evicted {} from the orphan blocks pool", evicted);
let mut eviction_succeeded = false;
debug!("Orphan blocks pool size exceeded. Trying to evict a random orphan block.");
// Retry up to a logarithmic number of times
for i in 0..self.max_orphans_log {
// Evict a random orphan in order to keep pool size under the limit
let rand_index = rand::thread_rng().gen_range(0..self.orphans.len());
if !orphan_ancestors.is_empty() {
// IndexMap has no API for getting a removable Entry by index
if let Some(rand_hash) = self.orphans.get_index(rand_index).map(|(&h, _)| h) {
if orphan_ancestors.contains(&rand_hash) {
continue; // Do not evict an ancestor of this new orphan
}
}
}
if let Some((evicted, _)) = self.orphans.swap_remove_index(rand_index) {
debug!("Evicted {} from the orphan blocks pool for new block {} (after {} retries)", evicted, orphan_hash, i);
eviction_succeeded = true;
break;
}
}
if !eviction_succeeded {
// All retries have found an existing ancestor, so we reject the new block
debug!(
"Tried to evict a random orphan for new orphan {}, but all {} retries found an existing ancestor. Rejecting.",
orphan_hash, self.max_orphans_log
);
return None;
}
}
for parent in orphan_block.header.direct_parents() {
@@ -77,8 +126,8 @@
}
// Insert
self.orphans.insert(orphan_block.hash(), OrphanBlock::new(orphan_block, self.iterate_child_orphans(orphan_hash).collect()));
// Get roots
Some(self.get_orphan_roots(consensus, orphan_hash).await)
// Return roots
Some(OrphanOutput::Roots(roots))
}

/// Returns whether this block is in the orphan pool.
@@ -90,19 +139,26 @@
/// not in the orphan pool AND do not exist consensus-wise or are header-only. Given an orphan relayed by
/// a peer, these blocks should be the next-in-line to be requested from that peer.
pub async fn get_orphan_roots_if_known(&self, consensus: &ConsensusProxy, orphan: Hash) -> OrphanOutput {
if !self.orphans.contains_key(&orphan) {
return OrphanOutput::Unknown;
if let Some(orphan_block) = self.orphans.get(&orphan) {
match self.get_orphan_roots(consensus, orphan_block.block.header.direct_parents().iter().copied().collect()).await {
FindRootsOutput::Roots(roots, _) => OrphanOutput::Roots(roots),
FindRootsOutput::NoRoots(_) => OrphanOutput::NoRoots(Default::default()),
}
} else {
OrphanOutput::Unknown
}
self.get_orphan_roots(consensus, orphan).await
}

/// Internal get roots method. Assumes 'orphan' is within the pool
async fn get_orphan_roots(&self, consensus: &ConsensusProxy, orphan: Hash) -> OrphanOutput {
/// Internal get roots method. The arg `queue` is the set of blocks to perform BFS from and
/// search through the orphan pool and consensus until finding any unknown roots or finding
/// out that no ancestor is missing.
async fn get_orphan_roots(&self, consensus: &ConsensusProxy, mut queue: VecDeque<Hash>) -> FindRootsOutput {
let mut roots = Vec::new();
let mut queue = VecDeque::from([orphan]);
let mut visited = HashSet::from([orphan]); // We avoid the custom block hasher here. See comment on `orphans` above.
let mut visited: HashSet<_> = queue.iter().copied().collect();
let mut orphan_ancestors = HashSet::new();
while let Some(current) = queue.pop_front() {
if let Some(block) = self.orphans.get(&current) {
orphan_ancestors.insert(current);
for parent in block.block.header.direct_parents().iter().copied() {
if visited.insert(parent) {
queue.push_back(parent);
@@ -118,9 +174,9 @@
}

if roots.is_empty() {
OrphanOutput::NoRoots
FindRootsOutput::NoRoots(orphan_ancestors)
} else {
OrphanOutput::Roots(roots)
FindRootsOutput::Roots(roots, orphan_ancestors)
}
}

@@ -269,10 +325,12 @@ mod tests {
let b = Block::from_precomputed_hash(9.into(), vec![]);
let c = Block::from_precomputed_hash(10.into(), roots.clone());
let d = Block::from_precomputed_hash(11.into(), vec![10.into()]);

let e = Block::from_precomputed_hash(12.into(), vec![10.into()]);
let f = Block::from_precomputed_hash(13.into(), vec![12.into()]);
let g = Block::from_precomputed_hash(14.into(), vec![13.into()]);
let h = Block::from_precomputed_hash(15.into(), vec![14.into()]);
let k = Block::from_precomputed_hash(16.into(), vec![15.into()]);

pool.add_orphan(&consensus, c.clone()).await.unwrap();
pool.add_orphan(&consensus, d.clone()).await.unwrap();
@@ -289,18 +347,18 @@
assert!(pool.orphans.is_empty());

// Test revalidation
pool.add_orphan(&consensus, d.clone()).await.unwrap();
pool.add_orphan(&consensus, e.clone()).await.unwrap();
pool.add_orphan(&consensus, f.clone()).await.unwrap();
pool.add_orphan(&consensus, h.clone()).await.unwrap();
assert_eq!(pool.orphans.len(), 4);
pool.add_orphan(&consensus, g.clone()).await.unwrap();
pool.add_orphan(&consensus, k.clone()).await.unwrap();
assert_eq!(pool.orphans.len(), 3);
consensus.validate_and_insert_block(e.clone()).virtual_state_task.await.unwrap();
pool.revalidate_orphans(&consensus).await;
assert_eq!(pool.orphans.len(), 1);
assert!(pool.orphans.contains_key(&h.hash())); // h's parent, g, was never inserted to the pool
pool.add_orphan(&consensus, g.clone()).await.unwrap();
assert!(pool.orphans.contains_key(&k.hash())); // k's parent, h, was never inserted to the pool
consensus.validate_and_insert_block(h.clone()).virtual_state_task.await.unwrap();
pool.revalidate_orphans(&consensus).await;
assert!(pool.orphans.is_empty());

drop((a, b, c, d, e, f, g, h));
drop((a, b, c, d, e, f, g, h, k));
}
}
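
A standalone sketch of the eviction policy introduced in add_orphan above, assuming the rand crate as a dependency and plain integers in place of block hashes: when the pool is full, retry up to ceil(log2(max_orphans)) random evictions, never evicting a known ancestor of the incoming orphan, and give up (rejecting the new orphan) if every retry landed on such an ancestor.

use rand::Rng;
use std::collections::HashSet;

// Returns true if a victim was evicted, false if all retries hit protected
// ancestors (in which case the caller rejects the incoming orphan).
// Assumes a non-empty pool.
fn try_evict(pool: &mut Vec<u64>, protected_ancestors: &HashSet<u64>, max_orphans: usize) -> bool {
    let max_retries = (max_orphans as f64).log2().ceil() as usize;
    for _ in 0..max_retries {
        let idx = rand::thread_rng().gen_range(0..pool.len());
        if protected_ancestors.contains(&pool[idx]) {
            continue; // never evict an ancestor of the incoming orphan
        }
        let evicted = pool.swap_remove(idx);
        println!("evicted {} from the pool", evicted);
        return true;
    }
    false
}

fn main() {
    let mut pool: Vec<u64> = (0..8).collect();
    let protected: HashSet<u64> = [2, 5].into_iter().collect();
    println!("eviction succeeded: {}", try_evict(&mut pool, &protected, 8));
}
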