Commit

Merge remote-tracking branch 'msutton-origin/revalidate-orphans-2' into tn11-compiled

coderofstuff committed Dec 17, 2023
2 parents 2840cf6 + a5a1140 commit 0a9f96e
Showing 3 changed files with 68 additions and 23 deletions.
16 changes: 12 additions & 4 deletions protocol/flows/src/flow_context.rs
@@ -127,7 +127,7 @@ pub struct FlowContextInner
     transactions_spread: AsyncRwLock<TransactionsSpread>,
     shared_transaction_requests: Arc<Mutex<HashMap<TransactionId, RequestScopeMetadata>>>,
     is_ibd_running: Arc<AtomicBool>,
-    ibd_peer_key: Arc<RwLock<Option<PeerKey>>>,
+    ibd_peer_key: Arc<RwLock<Option<(PeerKey, u64)>>>,
     pub address_manager: Arc<Mutex<AddressManager>>,
     connection_manager: RwLock<Option<Arc<ConnectionManager>>>,
     mining_manager: MiningManagerProxy,
@@ -272,9 +272,9 @@ impl FlowContext
         &self.mining_manager
     }

-    pub fn try_set_ibd_running(&self, peer_key: PeerKey) -> Option<IbdRunningGuard> {
+    pub fn try_set_ibd_running(&self, peer_key: PeerKey, daa_score: u64) -> Option<IbdRunningGuard> {
         if self.is_ibd_running.compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst).is_ok() {
-            self.ibd_peer_key.write().replace(peer_key);
+            self.ibd_peer_key.write().replace((peer_key, daa_score));
             Some(IbdRunningGuard { indicator: self.is_ibd_running.clone() })
         } else {
             None
@@ -285,9 +285,17 @@ impl FlowContext
         self.is_ibd_running.load(Ordering::SeqCst)
     }

+    pub fn ibd_daa_score(&self) -> Option<u64> {
+        if self.is_ibd_running() {
+            self.ibd_peer_key.read().map(|x| x.1)
+        } else {
+            None
+        }
+    }
+
     pub fn ibd_peer_key(&self) -> Option<PeerKey> {
         if self.is_ibd_running() {
-            *self.ibd_peer_key.read()
+            self.ibd_peer_key.read().map(|x| x.0)
         } else {
             None
         }
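
Note on the change above: the IBD peer slot now carries the DAA score of the relay block that triggered IBD, and the new `ibd_daa_score()` accessor only exposes it while `is_ibd_running` is set. Below is a minimal self-contained sketch of that pattern, not the crate's actual code: `PeerKey` is reduced to a plain integer, and std's poisoning `RwLock` stands in for the non-poisoning lock the crate appears to use.

    use std::sync::{
        atomic::{AtomicBool, Ordering},
        RwLock,
    };

    type PeerKey = u64; // stand-in: the real PeerKey lives in kaspa_p2p_lib

    struct IbdState {
        is_ibd_running: AtomicBool,
        // (peer key, DAA score of the relay block that triggered IBD)
        ibd_peer: RwLock<Option<(PeerKey, u64)>>,
    }

    impl IbdState {
        // Mirrors FlowContext::ibd_daa_score: the stored score is only
        // meaningful while the atomic flag says IBD is in progress.
        fn ibd_daa_score(&self) -> Option<u64> {
            if self.is_ibd_running.load(Ordering::SeqCst) {
                self.ibd_peer.read().unwrap().map(|(_, daa)| daa)
            } else {
                None
            }
        }
    }

The flag is checked separately from the slot, so a reader can still observe a score from an IBD session that just ended; that looseness appears acceptable for the heuristic this value feeds in `process_orphan` below.
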
73 changes: 55 additions & 18 deletions protocol/flows/src/v5/blockrelay/flow.rs
@@ -4,7 +4,7 @@ use crate::{
 };
 use kaspa_consensus_core::{api::BlockValidationFutures, block::Block, blockstatus::BlockStatus, errors::block::RuleError};
 use kaspa_consensusmanager::ConsensusProxy;
-use kaspa_core::{debug, info};
+use kaspa_core::{debug, info, warn};
 use kaspa_hashes::Hash;
 use kaspa_p2p_lib::{
     common::ProtocolError,
@@ -17,31 +17,35 @@ use std::{collections::VecDeque, sync::Arc};

 pub struct RelayInvMessage {
     hash: Hash,
-    is_indirect: bool,
+    known_within_range: bool,
 }

 /// Encapsulates an incoming invs route which also receives data locally
 pub struct TwoWayIncomingRoute {
     incoming_route: SharedIncomingRoute,
-    indirect_invs: VecDeque<Hash>,
+    indirect_invs: VecDeque<RelayInvMessage>,
 }

 impl TwoWayIncomingRoute {
     pub fn new(incoming_route: SharedIncomingRoute) -> Self {
         Self { incoming_route, indirect_invs: VecDeque::new() }
     }

+    pub fn enqueue_unknown_indirect_invs<I: IntoIterator<Item = Hash>>(&mut self, iter: I) {
+        self.indirect_invs.extend(iter.into_iter().map(|h| RelayInvMessage { hash: h, known_within_range: false }))
+    }
+
     pub fn enqueue_indirect_invs<I: IntoIterator<Item = Hash>>(&mut self, iter: I) {
-        self.indirect_invs.extend(iter)
+        self.indirect_invs.extend(iter.into_iter().map(|h| RelayInvMessage { hash: h, known_within_range: true }))
     }

     pub async fn dequeue(&mut self) -> Result<RelayInvMessage, ProtocolError> {
         if let Some(inv) = self.indirect_invs.pop_front() {
-            Ok(RelayInvMessage { hash: inv, is_indirect: true })
+            Ok(inv)
         } else {
             let msg = dequeue!(self.incoming_route, Payload::InvRelayBlock)?;
             let inv = msg.try_into()?;
-            Ok(RelayInvMessage { hash: inv, is_indirect: false })
+            Ok(RelayInvMessage { hash: inv, known_within_range: false })
         }
     }
 }
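
In miniature, the queueing semantics above: indirect invs now carry their own `known_within_range` flag (set by whichever enqueue method produced them), and locally queued invs are always drained before the network route is polled. A toy, self-contained model with `Hash` reduced to `u64` and the p2p route to a second queue:

    use std::collections::VecDeque;

    struct RelayInvMessage {
        hash: u64, // stand-in for kaspa_hashes::Hash
        known_within_range: bool,
    }

    struct TwoWayQueue {
        indirect_invs: VecDeque<RelayInvMessage>,
        network_invs: VecDeque<u64>, // stand-in for the shared incoming route
    }

    impl TwoWayQueue {
        // Indirect invs (orphan roots discovered locally) are served first,
        // mirroring TwoWayIncomingRoute::dequeue in the diff above; invs
        // arriving from the network never carry the within-range guarantee.
        fn dequeue(&mut self) -> Option<RelayInvMessage> {
            self.indirect_invs.pop_front().or_else(|| {
                self.network_invs
                    .pop_front()
                    .map(|hash| RelayInvMessage { hash, known_within_range: false })
            })
        }
    }
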
@@ -129,22 +133,33 @@ impl HandleRelayInvsFlow

             // We do not apply the skip heuristic below if inv was queued indirectly (as an orphan root), since
             // that means the process was started by a proper and relevant relay block
-            if !inv.is_indirect && !broadcast {
+            if !inv.known_within_range && !broadcast {
                 debug!(
                     "Relay block {} has lower blue work than virtual's merge depth root ({} <= {}), hence we are skipping it",
                     inv.hash, block.header.blue_work, blue_work_threshold
                 );
                 continue;
             }

-            let BlockValidationFutures { block_task, virtual_state_task } = session.validate_and_insert_block(block.clone());
+            let BlockValidationFutures { block_task, mut virtual_state_task } = session.validate_and_insert_block(block.clone());

             match block_task.await {
                 Ok(_) => {}
                 Err(RuleError::MissingParents(missing_parents)) => {
                     debug!("Block {} is orphan and has missing parents: {:?}", block.hash(), missing_parents);
-                    self.process_orphan(&session, block, inv.is_indirect).await?;
-                    continue;
+                    if self.process_orphan(&session, block.clone(), inv.known_within_range).await? {
+                        continue;
+                    } else {
+                        // Retry
+                        let BlockValidationFutures { block_task: block_task_inner, virtual_state_task: virtual_state_task_inner } =
+                            session.validate_and_insert_block(block.clone());
+                        virtual_state_task = virtual_state_task_inner;
+                        match block_task_inner.await {
+                            Ok(_) => warn!("Retried orphan block {} successfully", block.hash()), // TODO
+                            Err(RuleError::MissingParents(_)) => continue,
+                            Err(rule_error) => return Err(rule_error.into()),
+                        }
+                    }
                 }
                 Err(rule_error) => return Err(rule_error.into()),
             }
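
The control flow above hinges on the new boolean contract of `process_orphan` (reworked in the hunks below): `Ok(true)` means the block was tracked as an orphan and its roots were queued, while `Ok(false)` means tracking didn't stick, typically because the missing parents may have arrived while the first validation was in flight, so validation is retried exactly once. A schematic, self-contained sketch of that shape; `validate` and `track_orphan` are hypothetical stand-ins:

    #[derive(Debug)]
    enum ValidationError {
        MissingParents,
    }

    // Fakes consensus validation: `parents_present` models whether the parents
    // have landed by the time this attempt runs.
    fn validate(_block: u64, parents_present: bool) -> Result<(), ValidationError> {
        if parents_present {
            Ok(())
        } else {
            Err(ValidationError::MissingParents)
        }
    }

    // Mirrors the boolean contract: false = "could not usefully track as orphan".
    fn track_orphan(_block: u64) -> bool {
        false
    }

    fn handle(block: u64) -> Result<(), ValidationError> {
        match validate(block, false) {
            Ok(()) => Ok(()),
            Err(ValidationError::MissingParents) => {
                if track_orphan(block) {
                    Ok(()) // unorphaning will continue from the invs queue
                } else {
                    validate(block, true) // retry exactly once
                }
            }
        }
    }

    fn main() {
        assert!(handle(42).is_ok());
    }
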
@@ -169,17 +184,20 @@ impl HandleRelayInvsFlow
         }
     }

-    async fn enqueue_orphan_roots(&mut self, consensus: &ConsensusProxy, orphan: Hash) {
+    async fn enqueue_orphan_roots(&mut self, consensus: &ConsensusProxy, orphan: Hash) -> bool {
         if let Some(roots) = self.ctx.get_orphan_roots(consensus, orphan).await {
             if roots.is_empty() {
-                return;
+                return false;
             }
             if self.ctx.is_log_throttled() {
                 debug!("Block {} has {} missing ancestors. Adding them to the invs queue...", orphan, roots.len());
             } else {
                 info!("Block {} has {} missing ancestors. Adding them to the invs queue...", orphan, roots.len());
             }
-            self.invs_route.enqueue_indirect_invs(roots)
+            self.invs_route.enqueue_indirect_invs(roots);
+            true
+        } else {
+            false
         }
     }

@@ -208,19 +226,38 @@ impl HandleRelayInvsFlow
         }
     }

-    async fn process_orphan(&mut self, consensus: &ConsensusProxy, block: Block, is_indirect_inv: bool) -> Result<(), ProtocolError> {
+    async fn process_orphan(
+        &mut self,
+        consensus: &ConsensusProxy,
+        block: Block,
+        known_within_range: bool,
+    ) -> Result<bool, ProtocolError> {
         // Return if the block has been orphaned from elsewhere already
         if self.ctx.is_known_orphan(block.hash()).await {
-            return Ok(());
+            return Ok(false);
         }

+        if let Some(ibd_daa_score) = self.ctx.ibd_daa_score() {
+            if block.header.daa_score + 100 > ibd_daa_score && block.header.daa_score < ibd_daa_score + 500 {
+                let hash = block.hash();
+                self.ctx.add_orphan(block).await;
+                if let Some(roots) = self.ctx.get_orphan_roots(consensus, hash).await {
+                    if !roots.is_empty() {
+                        self.invs_route.enqueue_unknown_indirect_invs(roots);
+                        return Ok(true);
+                    }
+                }
+                return Ok(false);
+            }
+        }
+
         // Add the block to the orphan pool if it's within orphan resolution range.
         // If the block is indirect it means one of its descendants was already in resolution range, so
         // we can avoid the query.
-        if is_indirect_inv || self.check_orphan_resolution_range(consensus, block.hash(), self.msg_route.id()).await? {
+        if known_within_range || self.check_orphan_resolution_range(consensus, block.hash(), self.msg_route.id()).await? {
             let hash = block.hash();
             self.ctx.add_orphan(block).await;
-            self.enqueue_orphan_roots(consensus, hash).await;
+            return Ok(self.enqueue_orphan_roots(consensus, hash).await);
         } else {
             // Send the block to IBD flow via the dedicated job channel. If the channel has a pending job, we prefer
             // the block with higher blue work, since it is usually more recent
@@ -229,7 +266,7 @@ impl HandleRelayInvsFlow
                 Err(TrySendError::Closed(_)) => return Err(ProtocolError::ConnectionClosed), // This indicates that IBD flow has exited
             }
         }
-        Ok(())
+        Ok(false)
     }

     /// Finds out whether the given block hash should be retrieved via the unorphaning
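
The new IBD branch in `process_orphan` above only pool-tracks an orphan whose DAA score lies in a window around `ibd_daa_score`: strictly above `ibd_daa_score - 100` and strictly below `ibd_daa_score + 500`, with the lower bound written as `block + 100 > ibd` so the unsigned comparison can never underflow. A quick, self-contained check of that arithmetic:

    // The window test from process_orphan, extracted as-is.
    fn within_ibd_window(block_daa_score: u64, ibd_daa_score: u64) -> bool {
        block_daa_score + 100 > ibd_daa_score && block_daa_score < ibd_daa_score + 500
    }

    fn main() {
        // IBD triggered by a relay block at DAA score 10_000: exactly the
        // scores 9_901..=10_499 fall inside the window.
        assert!(!within_ibd_window(9_900, 10_000));
        assert!(within_ibd_window(9_901, 10_000));
        assert!(within_ibd_window(10_499, 10_000));
        assert!(!within_ibd_window(10_500, 10_000));
        // Near genesis the lower bound degenerates gracefully (no underflow).
        assert!(within_ibd_window(0, 50));
    }
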
2 changes: 1 addition & 1 deletion protocol/flows/src/v5/ibd/flow.rs
@@ -71,7 +71,7 @@ impl IbdFlow

     async fn start_impl(&mut self) -> Result<(), ProtocolError> {
         while let Ok(relay_block) = self.relay_receiver.recv().await {
-            if let Some(_guard) = self.ctx.try_set_ibd_running(self.router.key()) {
+            if let Some(_guard) = self.ctx.try_set_ibd_running(self.router.key(), relay_block.header.daa_score) {
                 info!("IBD started with peer {}", self.router);

                 match self.ibd(relay_block).await {
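
The `_guard` bound above is the `IbdRunningGuard` returned by `try_set_ibd_running` (first file in this commit); judging by its `indicator` field, it presumably clears `is_ibd_running` on drop, so the flag resets however `self.ibd(relay_block)` exits. A generic sketch of that RAII shape under that assumption:

    use std::sync::{
        atomic::{AtomicBool, Ordering},
        Arc,
    };

    // Guard constructed only by the caller that wins the compare_exchange race;
    // drop clears the flag on every exit path (success, error, or panic unwind).
    struct RunningGuard {
        indicator: Arc<AtomicBool>,
    }

    impl Drop for RunningGuard {
        fn drop(&mut self) {
            self.indicator.store(false, Ordering::SeqCst);
        }
    }

    fn main() {
        let flag = Arc::new(AtomicBool::new(false));
        if flag.compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst).is_ok() {
            let _guard = RunningGuard { indicator: flag.clone() };
            // ... long-running IBD work would happen here ...
        } // _guard drops here: the flag is false again
        assert!(!flag.load(Ordering::SeqCst));
    }
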
