From 0c767681e8faaa49cd1d5e02ef4044d2e186f048 Mon Sep 17 00:00:00 2001
From: Michael Sutton
Date: Wed, 3 Jan 2024 19:50:39 +0200
Subject: [PATCH] Orphans improvements part 2 (#373)

* track avg mergesets
* unorphan ancestors if they exist w/o roots
* refactor processing batch object + test fix
* use warn
* improve log
* comment
* orphan eviction policy
* oops: use the correct len
* review comments
---
 components/consensusmanager/src/batch.rs    |  30 +++++
 components/consensusmanager/src/lib.rs      |   2 +
 components/consensusmanager/src/session.rs  |  15 +++
 .../pipeline/header_processor/processor.rs  |   2 +
 consensus/src/pipeline/mod.rs               |   4 +
 consensus/src/pipeline/monitor.rs           |   5 +-
 protocol/flows/src/flow_context.rs          |  22 ++--
 protocol/flows/src/flowcontext/orphans.rs   | 110 +++++++++++++-----
 protocol/flows/src/v5/blockrelay/flow.rs    |  62 ++++++----
 9 files changed, 196 insertions(+), 56 deletions(-)
 create mode 100644 components/consensusmanager/src/batch.rs

diff --git a/components/consensusmanager/src/batch.rs b/components/consensusmanager/src/batch.rs
new file mode 100644
index 000000000..d9ff99f3a
--- /dev/null
+++ b/components/consensusmanager/src/batch.rs
@@ -0,0 +1,30 @@
+use kaspa_consensus_core::{api::BlockValidationFuture, block::Block};
+use std::fmt::Debug;
+
+pub struct BlockProcessingBatch {
+    pub blocks: Vec<Block>,
+    pub block_tasks: Option<Vec<BlockValidationFuture>>,
+    pub virtual_state_tasks: Option<Vec<BlockValidationFuture>>,
+}
+
+impl BlockProcessingBatch {
+    pub fn new(blocks: Vec<Block>, block_tasks: Vec<BlockValidationFuture>, virtual_state_tasks: Vec<BlockValidationFuture>) -> Self {
+        Self { blocks, block_tasks: Some(block_tasks), virtual_state_tasks: Some(virtual_state_tasks) }
+    }
+
+    pub fn zip(self) -> impl Iterator<Item = (Block, BlockValidationFuture)> {
+        self.blocks.into_iter().zip(self.virtual_state_tasks.unwrap())
+    }
+}
+
+impl Default for BlockProcessingBatch {
+    fn default() -> Self {
+        Self { blocks: Default::default(), block_tasks: Some(Default::default()), virtual_state_tasks: Some(Default::default()) }
+    }
+}
+
+impl Debug for BlockProcessingBatch {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("BlockProcessingBatch").field("blocks", &self.blocks).finish()
+    }
+}
diff --git a/components/consensusmanager/src/lib.rs b/components/consensusmanager/src/lib.rs
index 14edc623c..54bdda40b 100644
--- a/components/consensusmanager/src/lib.rs
+++ b/components/consensusmanager/src/lib.rs
@@ -4,8 +4,10 @@ use kaspa_core::{core::Core, debug, service::Service};
 use parking_lot::RwLock;
 use std::{collections::VecDeque, ops::Deref, sync::Arc, thread::JoinHandle};
 
+mod batch;
 mod session;
 
+pub use batch::BlockProcessingBatch;
 pub use session::{
     spawn_blocking, ConsensusInstance, ConsensusProxy, ConsensusSessionBlocking, SessionLock, SessionReadGuard, SessionWriteGuard,
 };
diff --git a/components/consensusmanager/src/session.rs b/components/consensusmanager/src/session.rs
index 1c68c4694..2631f66fe 100644
--- a/components/consensusmanager/src/session.rs
+++ b/components/consensusmanager/src/session.rs
@@ -21,6 +21,8 @@ use std::{ops::Deref, sync::Arc};
 
 pub use tokio::task::spawn_blocking;
 
+use crate::BlockProcessingBatch;
+
 #[derive(Clone)]
 pub struct SessionOwnedReadGuard(Arc);
 
@@ -158,6 +160,19 @@ impl ConsensusSessionOwned {
         self.consensus.validate_and_insert_block(block)
     }
 
+    pub fn validate_and_insert_block_batch(&self, mut batch: Vec<Block>) -> BlockProcessingBatch {
+        // Sort by blue work in order to ensure topological order
+        batch.sort_by(|a, b| a.header.blue_work.partial_cmp(&b.header.blue_work).unwrap());
+        let (block_tasks, virtual_state_tasks) = batch
+            .iter()
+            .map(|b| {
+                let BlockValidationFutures { block_task, virtual_state_task } = self.consensus.validate_and_insert_block(b.clone());
+                (block_task, virtual_state_task)
+            })
+            .unzip();
+        BlockProcessingBatch::new(batch, block_tasks, virtual_state_tasks)
+    }
+
     pub fn validate_and_insert_trusted_block(&self, tb: TrustedBlock) -> BlockValidationFutures {
         self.consensus.validate_and_insert_trusted_block(tb)
     }
diff --git a/consensus/src/pipeline/header_processor/processor.rs b/consensus/src/pipeline/header_processor/processor.rs
index 88005ce3d..25c9944e4 100644
--- a/consensus/src/pipeline/header_processor/processor.rs
+++ b/consensus/src/pipeline/header_processor/processor.rs
@@ -356,6 +356,8 @@ impl HeaderProcessor {
                     .unwrap_or_else(|| Arc::new(self.ghostdag_managers[level].ghostdag(&ctx.known_parents[level])))
             })
             .collect_vec();
+
+        self.counters.mergeset_counts.fetch_add(ghostdag_data[0].mergeset_size() as u64, Ordering::Relaxed);
         ctx.ghostdag_data = Some(ghostdag_data);
     }
 
diff --git a/consensus/src/pipeline/mod.rs b/consensus/src/pipeline/mod.rs
index 2c17551ff..8a526d94b 100644
--- a/consensus/src/pipeline/mod.rs
+++ b/consensus/src/pipeline/mod.rs
@@ -12,6 +12,7 @@ pub struct ProcessingCounters {
     pub blocks_submitted: AtomicU64,
     pub header_counts: AtomicU64,
     pub dep_counts: AtomicU64,
+    pub mergeset_counts: AtomicU64,
     pub body_counts: AtomicU64,
     pub txs_counts: AtomicU64,
     pub chain_block_counts: AtomicU64,
@@ -24,6 +25,7 @@ impl ProcessingCounters {
             blocks_submitted: self.blocks_submitted.load(Ordering::Relaxed),
             header_counts: self.header_counts.load(Ordering::Relaxed),
             dep_counts: self.dep_counts.load(Ordering::Relaxed),
+            mergeset_counts: self.mergeset_counts.load(Ordering::Relaxed),
             body_counts: self.body_counts.load(Ordering::Relaxed),
             txs_counts: self.txs_counts.load(Ordering::Relaxed),
             chain_block_counts: self.chain_block_counts.load(Ordering::Relaxed),
@@ -37,6 +39,7 @@ pub struct ProcessingCountersSnapshot {
     pub blocks_submitted: u64,
     pub header_counts: u64,
     pub dep_counts: u64,
+    pub mergeset_counts: u64,
     pub body_counts: u64,
     pub txs_counts: u64,
     pub chain_block_counts: u64,
@@ -51,6 +54,7 @@ impl core::ops::Sub for &ProcessingCountersSnapshot {
             blocks_submitted: self.blocks_submitted.checked_sub(rhs.blocks_submitted).unwrap_or_default(),
             header_counts: self.header_counts.checked_sub(rhs.header_counts).unwrap_or_default(),
             dep_counts: self.dep_counts.checked_sub(rhs.dep_counts).unwrap_or_default(),
+            mergeset_counts: self.mergeset_counts.checked_sub(rhs.mergeset_counts).unwrap_or_default(),
             body_counts: self.body_counts.checked_sub(rhs.body_counts).unwrap_or_default(),
             txs_counts: self.txs_counts.checked_sub(rhs.txs_counts).unwrap_or_default(),
             chain_block_counts: self.chain_block_counts.checked_sub(rhs.chain_block_counts).unwrap_or_default(),
diff --git a/consensus/src/pipeline/monitor.rs b/consensus/src/pipeline/monitor.rs
index 75b841dad..8c73b34a7 100644
--- a/consensus/src/pipeline/monitor.rs
+++ b/consensus/src/pipeline/monitor.rs
@@ -50,13 +50,14 @@ impl ConsensusMonitor {
         let now = Instant::now();
 
         info!(
-            "Processed {} blocks and {} headers in the last {:.2}s ({} transactions; {} parent references; {} UTXO-validated blocks; {:.2} avg txs per block; {} avg block mass)",
+            "Processed {} blocks and {} headers in the last {:.2}s ({} transactions; {} UTXO-validated blocks; {:.2} parents; {:.2} mergesets; {:.2} TPB; {:.1} mass)",
             delta.body_counts,
             delta.header_counts,
             (now - last_log_time).as_secs_f64(),
             delta.txs_counts,
-            delta.dep_counts,
             delta.chain_block_counts,
+            if delta.header_counts != 0 { delta.dep_counts as f64 / delta.header_counts as f64 } else { 0f64 },
+            if delta.header_counts != 0 { delta.mergeset_counts as f64 / delta.header_counts as f64 } else { 0f64 },
             if delta.body_counts != 0 { delta.txs_counts as f64 / delta.body_counts as f64 } else{ 0f64 },
             if delta.body_counts != 0 { delta.mass_counts / delta.body_counts } else{ 0 },
         );
diff --git a/protocol/flows/src/flow_context.rs b/protocol/flows/src/flow_context.rs
index c6806f128..dea1941a1 100644
--- a/protocol/flows/src/flow_context.rs
+++ b/protocol/flows/src/flow_context.rs
@@ -17,7 +17,7 @@ use kaspa_consensus_notify::{
     notification::{Notification, PruningPointUtxoSetOverrideNotification},
     root::ConsensusNotificationRoot,
 };
-use kaspa_consensusmanager::{ConsensusInstance, ConsensusManager, ConsensusProxy};
+use kaspa_consensusmanager::{BlockProcessingBatch, ConsensusInstance, ConsensusManager, ConsensusProxy};
 use kaspa_core::{
     debug, info,
     kaspad_env::{name, version},
@@ -102,7 +102,7 @@ impl BlockEventLogger {
 
     /// Start the logger listener. Must be called from an async tokio context
     fn start(&self) {
-        let chunk_limit = self.bps * 4; // We prefer that the 1 sec timeout forces the log, but nonetheless still want a reasonable bound on each chunk
+        let chunk_limit = self.bps * 10; // We prefer that the 1 sec timeout forces the log, but nonetheless still want a reasonable bound on each chunk
         let receiver = self.receiver.lock().take().expect("expected to be called once");
         tokio::spawn(async move {
             let chunk_stream = UnboundedReceiverStream::new(receiver).chunks_timeout(chunk_limit, Duration::from_secs(1));
@@ -110,7 +110,7 @@ impl BlockEventLogger {
             while let Some(chunk) = chunk_stream.next().await {
                 #[derive(Default)]
                 struct LogSummary {
-                    // Representative
+                    // Representatives
                     relay_rep: Option<Hash>,
                     submit_rep: Option<Hash>,
                     orphan_rep: Option<Hash>,
@@ -511,7 +511,7 @@ impl FlowContext {
         // Broadcast as soon as the block has been validated and inserted into the DAG
         self.hub.broadcast(make_message!(Payload::InvRelayBlock, InvRelayBlockMessage { hash: Some(hash.into()) })).await;
 
-        self.on_new_block(consensus, block, virtual_state_task).await;
+        self.on_new_block(consensus, Default::default(), block, virtual_state_task).await;
         self.log_block_event(BlockLogEvent::Submit(hash));
 
         Ok(())
@@ -539,7 +539,13 @@ impl FlowContext {
     /// and possibly rebroadcast manually added transactions when not in IBD.
     ///
     /// _GO-KASPAD: OnNewBlock + broadcastTransactionsAfterBlockAdded_
-    pub async fn on_new_block(&self, consensus: &ConsensusProxy, block: Block, virtual_state_task: BlockValidationFuture) {
+    pub async fn on_new_block(
+        &self,
+        consensus: &ConsensusProxy,
+        ancestor_batch: BlockProcessingBatch,
+        block: Block,
+        virtual_state_task: BlockValidationFuture,
+    ) {
         let hash = block.hash();
         let mut blocks = self.unorphan_blocks(consensus, hash).await;
@@ -554,7 +560,7 @@ impl FlowContext {
         blocks.sort_by(|a, b| a.0.header.blue_work.partial_cmp(&b.0.header.blue_work).unwrap());
         // Use a ProcessQueue so we get rid of duplicates
         let mut transactions_to_broadcast = ProcessQueue::new();
-        for (block, virtual_state_task) in once((block, virtual_state_task)).chain(blocks.into_iter()) {
+        for (block, virtual_state_task) in ancestor_batch.zip().chain(once((block, virtual_state_task))).chain(blocks.into_iter()) {
             // We only care about waiting for virtual to process the block at this point, before proceeding with post-processing
             // actions such as updating the mempool. We know this will not err since `block_task` already completed w/o error
             let _ = virtual_state_task.await;
@@ -568,8 +574,8 @@ impl FlowContext {
             }
         }
 
-        // Don't relay transactions when in IBD
-        if self.is_ibd_running() {
+        // Transaction relay is disabled if the node is out of sync and thus not mining
+        if !consensus.async_is_nearly_synced().await {
             return;
         }
 
diff --git a/protocol/flows/src/flowcontext/orphans.rs b/protocol/flows/src/flowcontext/orphans.rs
index 6531706ed..efbf18299 100644
--- a/protocol/flows/src/flowcontext/orphans.rs
+++ b/protocol/flows/src/flowcontext/orphans.rs
@@ -3,7 +3,7 @@ use kaspa_consensus_core::{
     api::{BlockValidationFuture, BlockValidationFutures},
     block::Block,
 };
-use kaspa_consensusmanager::ConsensusProxy;
+use kaspa_consensusmanager::{BlockProcessingBatch, ConsensusProxy};
 use kaspa_core::debug;
 use kaspa_hashes::Hash;
 use kaspa_utils::option::OptionExtensions;
@@ -20,12 +20,21 @@ use super::process_queue::ProcessQueue;
 pub enum OrphanOutput {
     /// Block is orphan with the provided missing roots
     Roots(Vec<Hash>),
-    /// Block has no missing roots (but it might have known orphan ancestors)
-    NoRoots,
+    /// Block has no missing roots (but it might have known orphan ancestors which are returned
+    /// along with their corresponding consensus processing tasks)
+    NoRoots(BlockProcessingBatch),
     /// The block does not exist in the orphan pool
     Unknown,
 }
 
+#[derive(Debug)]
+enum FindRootsOutput {
+    /// Block is orphan with the provided missing roots and a possible set of known orphan ancestors
+    Roots(Vec<Hash>, HashSet<Hash>),
+    /// Block has no missing roots (but it might have known orphan ancestors)
+    NoRoots(HashSet<Hash>),
+}
+
 struct OrphanBlock {
     /// The actual block
     block: Block,
@@ -49,11 +58,17 @@ pub struct OrphanBlocksPool {
     orphans: IndexMap<Hash, OrphanBlock>,
     /// Max number of orphans to keep in the pool
     max_orphans: usize,
+    /// The log base 2 of `max_orphans`
+    max_orphans_log: usize,
 }
 
 impl OrphanBlocksPool {
     pub fn new(max_orphans: usize) -> Self {
-        Self { orphans: IndexMap::with_capacity(max_orphans), max_orphans }
+        Self {
+            orphans: IndexMap::with_capacity(max_orphans),
+            max_orphans,
+            max_orphans_log: (max_orphans as f64).log2().ceil() as usize,
+        }
     }
 
     /// Adds the provided block to the orphan pool. Returns None if the block is already
@@ -63,11 +78,45 @@ impl OrphanBlocksPool {
         if self.orphans.contains_key(&orphan_hash) {
             return None;
         }
+
+        let (roots, orphan_ancestors) =
+            match self.get_orphan_roots(consensus, orphan_block.header.direct_parents().iter().copied().collect()).await {
+                FindRootsOutput::Roots(roots, orphan_ancestors) => (roots, orphan_ancestors),
+                FindRootsOutput::NoRoots(orphan_ancestors) => {
+                    let blocks: Vec<_> =
+                        orphan_ancestors.into_iter().map(|h| self.orphans.remove(&h).expect("orphan ancestor").block).collect();
+                    return Some(OrphanOutput::NoRoots(consensus.validate_and_insert_block_batch(blocks)));
+                }
+            };
+
         if self.orphans.len() == self.max_orphans {
-            debug!("Orphan blocks pool size exceeded. Evicting a random orphan block.");
-            // Evict a random orphan in order to keep pool size under the limit
-            if let Some((evicted, _)) = self.orphans.swap_remove_index(rand::thread_rng().gen_range(0..self.max_orphans)) {
-                debug!("Evicted {} from the orphan blocks pool", evicted);
+            let mut eviction_succeeded = false;
+            debug!("Orphan blocks pool size exceeded. Trying to evict a random orphan block.");
+            // Retry up to a logarithmic number of times
+            for i in 0..self.max_orphans_log {
+                // Evict a random orphan in order to keep pool size under the limit
+                let rand_index = rand::thread_rng().gen_range(0..self.orphans.len());
+                if !orphan_ancestors.is_empty() {
+                    // IndexMap has no API for getting a removable Entry by index
+                    if let Some(rand_hash) = self.orphans.get_index(rand_index).map(|(&h, _)| h) {
+                        if orphan_ancestors.contains(&rand_hash) {
+                            continue; // Do not evict an ancestor of this new orphan
+                        }
+                    }
+                }
+                if let Some((evicted, _)) = self.orphans.swap_remove_index(rand_index) {
+                    debug!("Evicted {} from the orphan blocks pool for new block {} (after {} retries)", evicted, orphan_hash, i);
+                    eviction_succeeded = true;
+                    break;
+                }
+            }
+            if !eviction_succeeded {
+                // All retries have found an existing ancestor, so we reject the new block
+                debug!(
+                    "Tried to evict a random orphan for new orphan {}, but all {} retries found an existing ancestor. Rejecting.",
+                    orphan_hash, self.max_orphans_log
+                );
+                return None;
             }
         }
         for parent in orphan_block.header.direct_parents() {
@@ -77,8 +126,8 @@ impl OrphanBlocksPool {
             }
         }
         // Insert
         self.orphans.insert(orphan_block.hash(), OrphanBlock::new(orphan_block, self.iterate_child_orphans(orphan_hash).collect()));
-        // Get roots
-        Some(self.get_orphan_roots(consensus, orphan_hash).await)
+        // Return roots
+        Some(OrphanOutput::Roots(roots))
     }
 
     /// Returns whether this block is in the orphan pool.
@@ -90,19 +139,26 @@ impl OrphanBlocksPool {
     /// not in the orphan pool AND do not exist consensus-wise or are header-only. Given an orphan relayed by
     /// a peer, these blocks should be the next-in-line to be requested from that peer.
     pub async fn get_orphan_roots_if_known(&self, consensus: &ConsensusProxy, orphan: Hash) -> OrphanOutput {
-        if !self.orphans.contains_key(&orphan) {
-            return OrphanOutput::Unknown;
+        if let Some(orphan_block) = self.orphans.get(&orphan) {
+            match self.get_orphan_roots(consensus, orphan_block.block.header.direct_parents().iter().copied().collect()).await {
+                FindRootsOutput::Roots(roots, _) => OrphanOutput::Roots(roots),
+                FindRootsOutput::NoRoots(_) => OrphanOutput::NoRoots(Default::default()),
+            }
+        } else {
+            OrphanOutput::Unknown
         }
-        self.get_orphan_roots(consensus, orphan).await
     }
 
-    /// Internal get roots method. Assumes 'orphan' is within the pool
-    async fn get_orphan_roots(&self, consensus: &ConsensusProxy, orphan: Hash) -> OrphanOutput {
+    /// Internal get roots method. The arg `queue` is the set of blocks to perform BFS from and
+    /// search through the orphan pool and consensus until finding any unknown roots or finding
+    /// out that no ancestor is missing.
+    async fn get_orphan_roots(&self, consensus: &ConsensusProxy, mut queue: VecDeque<Hash>) -> FindRootsOutput {
         let mut roots = Vec::new();
-        let mut queue = VecDeque::from([orphan]);
-        let mut visited = HashSet::from([orphan]); // We avoid the custom block hasher here. See comment on `orphans` above.
+        let mut visited: HashSet<_> = queue.iter().copied().collect();
+        let mut orphan_ancestors = HashSet::new();
         while let Some(current) = queue.pop_front() {
             if let Some(block) = self.orphans.get(&current) {
+                orphan_ancestors.insert(current);
                 for parent in block.block.header.direct_parents().iter().copied() {
                     if visited.insert(parent) {
                         queue.push_back(parent);
@@ -118,9 +174,9 @@ impl OrphanBlocksPool {
             }
         }
 
         if roots.is_empty() {
-            OrphanOutput::NoRoots
+            FindRootsOutput::NoRoots(orphan_ancestors)
         } else {
-            OrphanOutput::Roots(roots)
+            FindRootsOutput::Roots(roots, orphan_ancestors)
         }
     }
 
@@ -269,10 +325,12 @@ mod tests {
         let b = Block::from_precomputed_hash(9.into(), vec![]);
         let c = Block::from_precomputed_hash(10.into(), roots.clone());
         let d = Block::from_precomputed_hash(11.into(), vec![10.into()]);
+        let e = Block::from_precomputed_hash(12.into(), vec![10.into()]);
         let f = Block::from_precomputed_hash(13.into(), vec![12.into()]);
         let g = Block::from_precomputed_hash(14.into(), vec![13.into()]);
         let h = Block::from_precomputed_hash(15.into(), vec![14.into()]);
+        let k = Block::from_precomputed_hash(16.into(), vec![15.into()]);
 
         pool.add_orphan(&consensus, c.clone()).await.unwrap();
         pool.add_orphan(&consensus, d.clone()).await.unwrap();
@@ -289,18 +347,18 @@ mod tests {
         assert!(pool.orphans.is_empty());
 
         // Test revalidation
-        pool.add_orphan(&consensus, d.clone()).await.unwrap();
-        pool.add_orphan(&consensus, e.clone()).await.unwrap();
         pool.add_orphan(&consensus, f.clone()).await.unwrap();
-        pool.add_orphan(&consensus, h.clone()).await.unwrap();
-        assert_eq!(pool.orphans.len(), 4);
+        pool.add_orphan(&consensus, g.clone()).await.unwrap();
+        pool.add_orphan(&consensus, k.clone()).await.unwrap();
+        assert_eq!(pool.orphans.len(), 3);
+        consensus.validate_and_insert_block(e.clone()).virtual_state_task.await.unwrap();
         pool.revalidate_orphans(&consensus).await;
         assert_eq!(pool.orphans.len(), 1);
-        assert!(pool.orphans.contains_key(&h.hash())); // h's parent, g, was never inserted to the pool
-        pool.add_orphan(&consensus, g.clone()).await.unwrap();
+        assert!(pool.orphans.contains_key(&k.hash())); // k's parent, h, was never inserted to the pool
+        consensus.validate_and_insert_block(h.clone()).virtual_state_task.await.unwrap();
         pool.revalidate_orphans(&consensus).await;
         assert!(pool.orphans.is_empty());
-        drop((a, b, c, d, e, f, g, h));
+        drop((a, b, c, d, e, f, g, h, k));
     }
 }
diff --git a/protocol/flows/src/v5/blockrelay/flow.rs b/protocol/flows/src/v5/blockrelay/flow.rs
index 3bfda200b..b0e5a3c54 100644
--- a/protocol/flows/src/v5/blockrelay/flow.rs
+++ b/protocol/flows/src/v5/blockrelay/flow.rs
@@ -4,8 +4,8 @@ use crate::{
     flowcontext::orphans::OrphanOutput,
 };
 use kaspa_consensus_core::{api::BlockValidationFutures, block::Block, blockstatus::BlockStatus, errors::block::RuleError};
-use kaspa_consensusmanager::ConsensusProxy;
-use kaspa_core::{debug, info};
+use kaspa_consensusmanager::{BlockProcessingBatch, ConsensusProxy};
+use kaspa_core::{debug, info, warn};
 use kaspa_hashes::Hash;
 use kaspa_p2p_lib::{
     common::ProtocolError,
@@ -107,8 +107,8 @@ impl HandleRelayInvsFlow {
             }
 
             match self.ctx.get_orphan_roots_if_known(&session, inv.hash).await {
-                OrphanOutput::Unknown => {}        // Keep processing this inv
-                OrphanOutput::NoRoots => continue, // Existing orphan w/o missing roots
+                OrphanOutput::Unknown => {}           // Keep processing this inv
+                OrphanOutput::NoRoots(_) => continue, // Existing orphan w/o missing roots
                 OrphanOutput::Roots(roots) => {
                     // Known orphan with roots to enqueue
self.enqueue_orphan_roots(inv.hash, roots, inv.known_within_range); @@ -152,31 +152,51 @@ impl HandleRelayInvsFlow { let BlockValidationFutures { block_task, mut virtual_state_task } = session.validate_and_insert_block(block.clone()); - match block_task.await { - Ok(_) => {} + let ancestor_batch = match block_task.await { + Ok(_) => Default::default(), Err(RuleError::MissingParents(missing_parents)) => { debug!("Block {} is orphan and has missing parents: {:?}", block.hash(), missing_parents); - if self.process_orphan(&session, block.clone(), inv.known_within_range).await? { - continue; - } else { - // Block is possibly not an orphan, retrying + if let Some(mut ancestor_batch) = self.process_orphan(&session, block.clone(), inv.known_within_range).await? { + // Block is not an orphan, retrying let BlockValidationFutures { block_task: block_task_inner, virtual_state_task: virtual_state_task_inner } = session.validate_and_insert_block(block.clone()); virtual_state_task = virtual_state_task_inner; + for block_task in ancestor_batch.block_tasks.take().unwrap() { + match block_task.await { + Ok(_) => {} + // We disconnect on invalidness even though this is not a direct relay from this peer, because + // current relay is a descendant of this block (i.e. this peer claims all its ancestors are valid) + Err(rule_error) => return Err(rule_error.into()), + } + } + match block_task_inner.await { - Ok(_) => info!("Retried orphan block {} successfully", block.hash()), - Err(RuleError::MissingParents(_)) => continue, + Ok(_) => match ancestor_batch.blocks.len() { + 0 => info!("Retried orphan block {} successfully", block.hash()), + // Use warn in order to track this rare case more easily + n => warn!("Unorphaned {} ancestors and retried orphan block {} successfully", n, block.hash()), + }, Err(rule_error) => return Err(rule_error.into()), } + ancestor_batch + } else { + continue; } } Err(rule_error) => return Err(rule_error.into()), - } + }; // As a policy, we only relay blocks who stand a chance to enter past(virtual). // The only mining rule which permanently excludes a block is the merge depth bound // (as opposed to "max parents" and "mergeset size limit" rules) if broadcast { + let msgs = ancestor_batch + .blocks + .iter() + .map(|b| make_message!(Payload::InvRelayBlock, InvRelayBlockMessage { hash: Some(b.hash().into()) })) + .collect(); + self.ctx.hub().broadcast_many(msgs).await; + self.ctx .hub() .broadcast(make_message!(Payload::InvRelayBlock, InvRelayBlockMessage { hash: Some(inv.hash.into()) })) @@ -187,7 +207,7 @@ impl HandleRelayInvsFlow { // can continue processing the following relay blocks let ctx = self.ctx.clone(); tokio::spawn(async move { - ctx.on_new_block(&session, block, virtual_state_task).await; + ctx.on_new_block(&session, ancestor_batch, block, virtual_state_task).await; ctx.log_block_event(BlockLogEvent::Relay(inv.hash)); }); } @@ -223,17 +243,17 @@ impl HandleRelayInvsFlow { } } - /// Process the orphan block. Returns `false` if the block has no missing roots, indicating - /// a retry is recommended + /// Process the orphan block. Returns `Some(BlockProcessingBatch)` if the block has no missing roots, where + /// the batch includes ancestor blocks and their consensus processing batch. This indicates a retry is recommended. 
     async fn process_orphan(
         &mut self,
         consensus: &ConsensusProxy,
         block: Block,
         mut known_within_range: bool,
-    ) -> Result<bool, ProtocolError> {
+    ) -> Result<Option<BlockProcessingBatch>, ProtocolError> {
         // Return if the block has been orphaned from elsewhere already
         if self.ctx.is_known_orphan(block.hash()).await {
-            return Ok(true);
+            return Ok(None);
         }
 
         /* We orphan a block if one of the following holds:
@@ -258,7 +278,9 @@ impl HandleRelayInvsFlow {
             // Note that no roots means it is still possible there is a known orphan ancestor in the orphan pool. However
             // we should still retry consensus in this case because the ancestor might have been queued to consensus
             // already and consensus handles dependencies with improved (pipeline) concurrency and overlapping
-            Some(OrphanOutput::NoRoots) => return Ok(false),
+            Some(OrphanOutput::NoRoots(ancestor_batch)) => {
+                return Ok(Some(ancestor_batch));
+            }
             Some(OrphanOutput::Roots(roots)) => self.enqueue_orphan_roots(hash, roots, known_within_range),
             None | Some(OrphanOutput::Unknown) => {}
         }
@@ -270,7 +292,7 @@ impl HandleRelayInvsFlow {
                 Err(TrySendError::Closed(_)) => return Err(ProtocolError::ConnectionClosed), // This indicates that IBD flow has exited
             }
         }
-        Ok(true)
+        Ok(None)
     }
 
     /// Applies an heuristic to check whether we should store the orphan block in the orphan pool for IBD considerations.
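
A minimal sketch of the sort-then-zip pattern that the patch introduces with `validate_and_insert_block_batch` and `BlockProcessingBatch::zip`: unorphaned ancestors are ordered by blue work (a topological order) before being submitted, and each block stays paired with its pending virtual-state task so `on_new_block` can await them in sequence. The `Header`, `Block`, and `ProcessingBatch` types below are simplified stand-ins for illustration only, not the kaspa crate definitions; the real batch carries `BlockValidationFuture` values.

// Simplified stand-ins; the real type lives in components/consensusmanager/src/batch.rs.
#[derive(Debug, Clone)]
struct Header {
    blue_work: u64,
}

#[derive(Debug, Clone)]
struct Block {
    hash: u64,
    header: Header,
}

struct ProcessingBatch {
    blocks: Vec<Block>,
    // In the patch these are consensus validation futures; strings keep the sketch runnable.
    virtual_state_tasks: Option<Vec<String>>,
}

impl ProcessingBatch {
    fn new(blocks: Vec<Block>, virtual_state_tasks: Vec<String>) -> Self {
        Self { blocks, virtual_state_tasks: Some(virtual_state_tasks) }
    }

    // Pair each block with its pending task, consuming the batch (mirrors BlockProcessingBatch::zip).
    fn zip(self) -> impl Iterator<Item = (Block, String)> {
        self.blocks.into_iter().zip(self.virtual_state_tasks.unwrap())
    }
}

fn main() {
    let mut batch = vec![
        Block { hash: 2, header: Header { blue_work: 20 } },
        Block { hash: 1, header: Header { blue_work: 10 } },
    ];
    // Sort by blue work so ancestors are submitted to consensus in topological order.
    batch.sort_by_key(|b| b.header.blue_work);
    // Submitting each block would normally yield its validation futures; fake one task per block.
    let tasks: Vec<String> = batch.iter().map(|b| format!("virtual-state-task-for-{}", b.hash)).collect();
    let batch = ProcessingBatch::new(batch, tasks);
    for (block, task) in batch.zip() {
        // on_new_block awaits the virtual-state task before mempool post-processing; here we just print.
        println!("await {task} then post-process block {}", block.hash);
    }
}

Wrapping the task vectors in Option mirrors how the patch lets the relay flow take() and await the block tasks separately while the blocks themselves remain available for rebroadcasting.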