Merge remote-tracking branch 'msutton-origin/revalidate-orphans' into tn11-compiled
coderofstuff committed Dec 15, 2023
2 parents 42dc50b + 847ee7c commit 2840cf6
Showing 3 changed files with 58 additions and 9 deletions.
2 changes: 1 addition & 1 deletion protocol/flows/src/flow_context.rs
@@ -363,7 +363,7 @@ impl FlowContext {
unorphaned_blocks
}

pub async fn revalidate_orphans(&self, consensus: &ConsensusProxy) {
pub async fn revalidate_orphans(&self, consensus: &ConsensusProxy) -> (Vec<Hash>, Vec<BlockValidationFuture>) {
self.orphans_pool.write().await.revalidate_orphans(consensus).await
}
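The call site added in protocol/flows/src/v5/ibd/flow.rs (further down in this commit) shows how the new return value is consumed: the caller queues the orphan roots and then awaits their virtual-processing futures. A minimal sketch mirroring that usage, assuming a `FlowContext` named `ctx` and a consensus `session` in scope:

// Sketch only: await the virtual-state futures of the queued orphan roots,
// logging failures rather than disconnecting, since the originating peer is unknown.
let (queued_hashes, tasks) = ctx.revalidate_orphans(&session).await;
for (hash, result) in queued_hashes.into_iter().zip(futures::future::join_all(tasks).await) {
    if let Err(e) = result {
        warn!("orphan block {} failed validation: {}", hash, e);
    }
}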

46 changes: 40 additions & 6 deletions protocol/flows/src/flowcontext/orphans.rs
@@ -141,7 +141,8 @@ impl OrphanBlocksPool {
/// This is important for the overall health of the pool and for ensuring that
/// orphan blocks are not evicted due to the pool size limit while already-processed
/// blocks remain in it. Should be called following IBD.
pub async fn revalidate_orphans(&mut self, consensus: &ConsensusProxy) {
pub async fn revalidate_orphans(&mut self, consensus: &ConsensusProxy) -> (Vec<Hash>, Vec<BlockValidationFuture>) {
// First, cleanup blocks already processed by consensus
let mut i = 0;
while i < self.orphans.len() {
if let Some((&h, _)) = self.orphans.get_index(i) {
@@ -156,6 +157,36 @@
i += 1;
}
}

// Next, search for root blocks which are processable
let mut roots = Vec::new();
for block in self.orphans.values() {
let mut processable = true;
for p in block.block.header.direct_parents().iter().copied() {
if self.orphans.contains_key(&p) || consensus.async_get_block_status(p).await.is_none_or(|s| s.is_header_only()) {
processable = false;
break;
}
}
if processable {
roots.push(block.block.clone());
}
}

// Now process the roots and unorphan their descendants
let mut virtual_processing_tasks = Vec::with_capacity(roots.len());
let mut queued_hashes = Vec::with_capacity(roots.len());
for root in roots {
let root_hash = root.hash();
let BlockValidationFutures { block_task: _, virtual_state_task: root_task } = consensus.validate_and_insert_block(root);
virtual_processing_tasks.push(root_task);
queued_hashes.push(root_hash);
let (unorphan_blocks, _, unorphan_tasks) = self.unorphan_blocks(consensus, root_hash).await;
virtual_processing_tasks.extend(unorphan_tasks);
queued_hashes.extend(unorphan_blocks.into_iter().map(|b| b.hash()));
}

(queued_hashes, virtual_processing_tasks)
}
}
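Read as a pure predicate, the root check above says a block is processable only when none of its direct parents is still in the orphan pool and every parent is known to consensus with more than header-only status. A minimal, self-contained restatement with stand-in types (hypothetical names, not the pool's actual API):

use std::collections::{HashMap, HashSet};

// Stand-ins for kaspa_hashes::Hash and the consensus block status, for this sketch only.
type Hash = u64;

#[derive(Clone, Copy, PartialEq, Eq)]
enum BlockStatus {
    HeaderOnly,
    Processed,
}

/// Hypothetical restatement of the root condition in `revalidate_orphans`:
/// every direct parent must be absent from the orphan pool and known to
/// consensus beyond header-only status.
fn is_processable_root(
    orphans: &HashSet<Hash>,
    statuses: &HashMap<Hash, BlockStatus>,
    direct_parents: &[Hash],
) -> bool {
    direct_parents.iter().all(|p| {
        !orphans.contains(p) && statuses.get(p).is_some_and(|s| *s != BlockStatus::HeaderOnly)
    })
}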

@@ -206,6 +237,8 @@ mod tests {
let d = Block::from_precomputed_hash(11.into(), vec![10.into()]);
let e = Block::from_precomputed_hash(12.into(), vec![10.into()]);
let f = Block::from_precomputed_hash(13.into(), vec![12.into()]);
let g = Block::from_precomputed_hash(14.into(), vec![13.into()]);
let h = Block::from_precomputed_hash(15.into(), vec![14.into()]);

pool.add_orphan(c.clone());
pool.add_orphan(d.clone());
@@ -225,14 +258,15 @@
pool.add_orphan(d.clone());
pool.add_orphan(e.clone());
pool.add_orphan(f.clone());
assert_eq!(pool.orphans.len(), 3);
pool.add_orphan(h.clone());
assert_eq!(pool.orphans.len(), 4);
pool.revalidate_orphans(&consensus).await;
assert_eq!(pool.orphans.len(), 2);
consensus.validate_and_insert_block(e.clone()).virtual_state_task.await.unwrap();
consensus.validate_and_insert_block(f.clone()).virtual_state_task.await.unwrap();
assert_eq!(pool.orphans.len(), 1);
assert!(pool.orphans.contains_key(&h.hash())); // h's parent, g, was never inserted into the pool
pool.add_orphan(g.clone());
pool.revalidate_orphans(&consensus).await;
assert!(pool.orphans.is_empty());

drop((a, b, c, d, e, f));
drop((a, b, c, d, e, f, g, h));
}
}
19 changes: 17 additions & 2 deletions protocol/flows/src/v5/ibd/flow.rs
@@ -5,7 +5,7 @@ use crate::{
Flow,
},
};
use futures::future::try_join_all;
use futures::future::{join_all, try_join_all};
use kaspa_consensus_core::{
api::BlockValidationFuture,
block::Block,
@@ -132,7 +132,22 @@ impl IbdFlow {
self.sync_missing_block_bodies(&session, relay_block.hash()).await?;

// Following IBD we revalidate orphans since many of them might have been processed during the IBD
self.ctx.revalidate_orphans(&session).await;
// or are now processable
let (queued_hashes, virtual_processing_tasks) = self.ctx.revalidate_orphans(&session).await;
let mut unorphaned_hashes = Vec::with_capacity(queued_hashes.len());
let results = join_all(virtual_processing_tasks).await;
for (hash, result) in queued_hashes.into_iter().zip(results) {
match result {
Ok(_) => unorphaned_hashes.push(hash),
// We do not return the error and disconnect here since we don't know
// that this peer was the origin of the orphan block
Err(e) => warn!("Validation failed for orphan block {}: {}", hash, e),
}
}
match unorphaned_hashes.len() {
0 => {}
n => info!("IBD post processing: unorphaned {} blocks ...{}", n, unorphaned_hashes.last().unwrap()),
}

Ok(())
}
