diff --git a/protocol/flows/src/flow_context.rs b/protocol/flows/src/flow_context.rs
index d58d401bc8..2828b1e986 100644
--- a/protocol/flows/src/flow_context.rs
+++ b/protocol/flows/src/flow_context.rs
@@ -127,7 +127,7 @@ pub struct FlowContextInner {
     transactions_spread: AsyncRwLock<TransactionsSpread>,
     shared_transaction_requests: Arc<Mutex<HashMap<TransactionId, RequestScope<TransactionId>>>>,
     is_ibd_running: Arc<AtomicBool>,
-    ibd_peer_key: Arc<RwLock<Option<PeerKey>>>,
+    ibd_peer_key: Arc<RwLock<Option<(PeerKey, u64)>>>,
     pub address_manager: Arc<Mutex<AddressManager>>,
     connection_manager: RwLock<Option<Arc<ConnectionManager>>>,
     mining_manager: MiningManagerProxy,
@@ -272,9 +272,9 @@ impl FlowContext {
         &self.mining_manager
     }
 
-    pub fn try_set_ibd_running(&self, peer_key: PeerKey) -> Option<IbdRunningGuard> {
+    pub fn try_set_ibd_running(&self, peer_key: PeerKey, daa_score: u64) -> Option<IbdRunningGuard> {
         if self.is_ibd_running.compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst).is_ok() {
-            self.ibd_peer_key.write().replace(peer_key);
+            self.ibd_peer_key.write().replace((peer_key, daa_score));
             Some(IbdRunningGuard { indicator: self.is_ibd_running.clone() })
         } else {
             None
@@ -285,9 +285,17 @@ impl FlowContext {
         self.is_ibd_running.load(Ordering::SeqCst)
     }
 
+    pub fn ibd_daa_score(&self) -> Option<u64> {
+        if self.is_ibd_running() {
+            self.ibd_peer_key.read().map(|x| x.1)
+        } else {
+            None
+        }
+    }
+
     pub fn ibd_peer_key(&self) -> Option<PeerKey> {
         if self.is_ibd_running() {
-            *self.ibd_peer_key.read()
+            self.ibd_peer_key.read().map(|x| x.0)
         } else {
             None
         }
diff --git a/protocol/flows/src/v5/blockrelay/flow.rs b/protocol/flows/src/v5/blockrelay/flow.rs
index 9d0698020f..604bc21bc1 100644
--- a/protocol/flows/src/v5/blockrelay/flow.rs
+++ b/protocol/flows/src/v5/blockrelay/flow.rs
@@ -4,7 +4,7 @@ use crate::{
 };
 use kaspa_consensus_core::{api::BlockValidationFutures, block::Block, blockstatus::BlockStatus, errors::block::RuleError};
 use kaspa_consensusmanager::ConsensusProxy;
-use kaspa_core::{debug, info};
+use kaspa_core::{debug, info, warn};
 use kaspa_hashes::Hash;
 use kaspa_p2p_lib::{
     common::ProtocolError,
@@ -17,13 +17,13 @@ use std::{collections::VecDeque, sync::Arc};
 
 pub struct RelayInvMessage {
     hash: Hash,
-    is_indirect: bool,
+    known_within_range: bool,
 }
 
 /// Encapsulates an incoming invs route which also receives data locally
 pub struct TwoWayIncomingRoute {
     incoming_route: SharedIncomingRoute,
-    indirect_invs: VecDeque<Hash>,
+    indirect_invs: VecDeque<RelayInvMessage>,
 }
 
 impl TwoWayIncomingRoute {
@@ -31,17 +31,21 @@ impl TwoWayIncomingRoute {
         Self { incoming_route, indirect_invs: VecDeque::new() }
     }
 
+    pub fn enqueue_unknown_indirect_invs<I: IntoIterator<Item = Hash>>(&mut self, iter: I) {
+        self.indirect_invs.extend(iter.into_iter().map(|h| RelayInvMessage { hash: h, known_within_range: false }))
+    }
+
     pub fn enqueue_indirect_invs<I: IntoIterator<Item = Hash>>(&mut self, iter: I) {
-        self.indirect_invs.extend(iter)
+        self.indirect_invs.extend(iter.into_iter().map(|h| RelayInvMessage { hash: h, known_within_range: true }))
     }
 
     pub async fn dequeue(&mut self) -> Result<RelayInvMessage, ProtocolError> {
         if let Some(inv) = self.indirect_invs.pop_front() {
-            Ok(RelayInvMessage { hash: inv, is_indirect: true })
+            Ok(inv)
         } else {
             let msg = dequeue!(self.incoming_route, Payload::InvRelayBlock)?;
             let inv = msg.try_into()?;
-            Ok(RelayInvMessage { hash: inv, is_indirect: false })
+            Ok(RelayInvMessage { hash: inv, known_within_range: false })
         }
     }
 }
@@ -129,7 +133,7 @@ impl HandleRelayInvsFlow {
 
             // We do not apply the skip heuristic below if inv was queued indirectly (as an orphan root), since
             // that means the process started by a proper and relevant relay block
-            if !inv.is_indirect && !broadcast {
+            if !inv.known_within_range && !broadcast {
                 debug!(
                     "Relay block {} has lower blue work than virtual's merge depth root ({} <= {}), hence we are skipping it",
                     inv.hash, block.header.blue_work, blue_work_threshold
@@ -137,14 +141,25 @@ impl HandleRelayInvsFlow {
                 continue;
             }
 
-            let BlockValidationFutures { block_task, virtual_state_task } = session.validate_and_insert_block(block.clone());
+            let BlockValidationFutures { block_task, mut virtual_state_task } = session.validate_and_insert_block(block.clone());
 
             match block_task.await {
                 Ok(_) => {}
                 Err(RuleError::MissingParents(missing_parents)) => {
                     debug!("Block {} is orphan and has missing parents: {:?}", block.hash(), missing_parents);
-                    self.process_orphan(&session, block, inv.is_indirect).await?;
-                    continue;
+                    if self.process_orphan(&session, block.clone(), inv.known_within_range).await? {
+                        continue;
+                    } else {
+                        // Retry
+                        let BlockValidationFutures { block_task: block_task_inner, virtual_state_task: virtual_state_task_inner } =
+                            session.validate_and_insert_block(block.clone());
+                        virtual_state_task = virtual_state_task_inner;
+                        match block_task_inner.await {
+                            Ok(_) => warn!("Retried orphan block {} successfully", block.hash()), // TODO
+                            Err(RuleError::MissingParents(_)) => continue,
+                            Err(rule_error) => return Err(rule_error.into()),
+                        }
+                    }
                 }
                 Err(rule_error) => return Err(rule_error.into()),
             }
@@ -169,17 +184,20 @@ impl HandleRelayInvsFlow {
         }
     }
 
-    async fn enqueue_orphan_roots(&mut self, consensus: &ConsensusProxy, orphan: Hash) {
+    async fn enqueue_orphan_roots(&mut self, consensus: &ConsensusProxy, orphan: Hash) -> bool {
         if let Some(roots) = self.ctx.get_orphan_roots(consensus, orphan).await {
             if roots.is_empty() {
-                return;
+                return false;
             }
             if self.ctx.is_log_throttled() {
                 debug!("Block {} has {} missing ancestors. Adding them to the invs queue...", orphan, roots.len());
             } else {
                 info!("Block {} has {} missing ancestors. Adding them to the invs queue...", orphan, roots.len());
             }
-            self.invs_route.enqueue_indirect_invs(roots)
+            self.invs_route.enqueue_indirect_invs(roots);
+            true
+        } else {
+            false
         }
     }
 
@@ -208,19 +226,38 @@ impl HandleRelayInvsFlow {
         }
     }
 
-    async fn process_orphan(&mut self, consensus: &ConsensusProxy, block: Block, is_indirect_inv: bool) -> Result<(), ProtocolError> {
+    async fn process_orphan(
+        &mut self,
+        consensus: &ConsensusProxy,
+        block: Block,
+        known_within_range: bool,
+    ) -> Result<bool, ProtocolError> {
         // Return if the block has been orphaned from elsewhere already
         if self.ctx.is_known_orphan(block.hash()).await {
-            return Ok(());
+            return Ok(false);
+        }
+
+        if let Some(ibd_daa_score) = self.ctx.ibd_daa_score() {
+            if block.header.daa_score + 100 > ibd_daa_score && block.header.daa_score < ibd_daa_score + 500 {
+                let hash = block.hash();
+                self.ctx.add_orphan(block).await;
+                if let Some(roots) = self.ctx.get_orphan_roots(consensus, hash).await {
+                    if !roots.is_empty() {
+                        self.invs_route.enqueue_unknown_indirect_invs(roots);
+                        return Ok(true);
+                    }
+                }
+                return Ok(false);
+            }
         }
 
         // Add the block to the orphan pool if it's within orphan resolution range.
         // If the block is indirect it means one of its descendants was already is resolution range, so
         // we can avoid the query.
-        if is_indirect_inv || self.check_orphan_resolution_range(consensus, block.hash(), self.msg_route.id()).await? {
+        if known_within_range || self.check_orphan_resolution_range(consensus, block.hash(), self.msg_route.id()).await? {
             let hash = block.hash();
             self.ctx.add_orphan(block).await;
-            self.enqueue_orphan_roots(consensus, hash).await;
+            return Ok(self.enqueue_orphan_roots(consensus, hash).await);
         } else {
             // Send the block to IBD flow via the dedicated job channel. If the channel has a pending job, we prefer
             // the block with higher blue work, since it is usually more recent
@@ -229,7 +266,7 @@
                 Err(TrySendError::Closed(_)) => return Err(ProtocolError::ConnectionClosed), // This indicates that IBD flow has exited
             }
         }
-        Ok(())
+        Ok(false)
     }
 
     /// Finds out whether the given block hash should be retrieved via the unorphaning
diff --git a/protocol/flows/src/v5/ibd/flow.rs b/protocol/flows/src/v5/ibd/flow.rs
index 929d050602..9c736fc907 100644
--- a/protocol/flows/src/v5/ibd/flow.rs
+++ b/protocol/flows/src/v5/ibd/flow.rs
@@ -71,7 +71,7 @@ impl IbdFlow {
 
     async fn start_impl(&mut self) -> Result<(), ProtocolError> {
         while let Ok(relay_block) = self.relay_receiver.recv().await {
-            if let Some(_guard) = self.ctx.try_set_ibd_running(self.router.key()) {
+            if let Some(_guard) = self.ctx.try_set_ibd_running(self.router.key(), relay_block.header.daa_score) {
                 info!("IBD started with peer {}", self.router);
 
                 match self.ibd(relay_block).await {
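For reference, a minimal sketch of the orphan-window heuristic introduced above, assuming a free-standing helper: the name within_ibd_orphan_window is hypothetical and not part of the patch, while the constants 100 and 500 and the comparison come straight from process_orphan in the diff.

/// Sketch only: while IBD is running with start score `ibd_daa_score`, a relayed
/// block is orphaned locally (instead of being handed to the IBD flow) when its
/// DAA score falls inside a small window around that score, per the diff above.
fn within_ibd_orphan_window(block_daa_score: u64, ibd_daa_score: u64) -> bool {
    // Mirrors: block.header.daa_score + 100 > ibd_daa_score && block.header.daa_score < ibd_daa_score + 500
    block_daa_score + 100 > ibd_daa_score && block_daa_score < ibd_daa_score + 500
}

// For example, a block only 50 DAA scores behind the IBD start point is still
// orphaned locally: within_ibd_orphan_window(1_000, 1_050) == true.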