diff --git a/consensus/core/src/blockstatus.rs b/consensus/core/src/blockstatus.rs
index 67f957d92..d7c387dd5 100644
--- a/consensus/core/src/blockstatus.rs
+++ b/consensus/core/src/blockstatus.rs
@@ -45,4 +45,8 @@ impl BlockStatus {
     pub fn is_valid(self) -> bool {
         self != BlockStatus::StatusInvalid
     }
+
+    pub fn is_invalid(self) -> bool {
+        self == BlockStatus::StatusInvalid
+    }
 }
diff --git a/consensus/core/src/config/bps.rs b/consensus/core/src/config/bps.rs
index 975e8df0f..57602fd20 100644
--- a/consensus/core/src/config/bps.rs
+++ b/consensus/core/src/config/bps.rs
@@ -58,6 +58,15 @@ impl Bps {
         let val = (Self::ghostdag_k() / 2) as u8;
         if val < 10 {
             10
+        } else if val > 16 {
+            // We currently limit the number of parents to 16 in order to preserve processing performance
+            // and to avoid the number of parent references per network round growing quadratically with
+            // BPS. As BPS grows beyond 10 this will mean that blocks reference fewer parents than the
+            // average number of DAG tips, which means relying on randomness between network peers to ensure
+            // that all tips are eventually merged. We conjecture that with high probability every block will
+            // be merged after a logarithmic number of rounds. For mainnet this requires an increase to the
+            // value of GHOSTDAG K accompanied by a short security analysis, or moving to the parameterless DAGKNIGHT.
+            16
         } else {
             val
         }
@@ -87,7 +96,7 @@ impl Bps {
     }
 
     pub const fn pruning_proof_m() -> u64 {
-        // No need to scale this constant with BPS since the important block levels (higher) remain logarithmically long
+        // No need to scale this constant with BPS since the important block levels (higher) remain logarithmically short
         PRUNING_PROOF_M
     }
diff --git a/consensus/core/src/config/constants.rs b/consensus/core/src/config/constants.rs
index de94e1d87..931c4d87d 100644
--- a/consensus/core/src/config/constants.rs
+++ b/consensus/core/src/config/constants.rs
@@ -59,7 +59,7 @@ pub mod consensus {
     /// Minimal size of the difficulty window. Affects the DA algorithm only at the starting period of a new net
     pub const MIN_DIFFICULTY_WINDOW_LEN: usize = 10;
 
-    /// **Legacy** difficulty adjustment window size corresponding to ~46 minutes with 1 BPS
+    /// **Legacy** difficulty adjustment window size corresponding to ~44 minutes with 1 BPS
     pub const LEGACY_DIFFICULTY_WINDOW_SIZE: usize = 2641;
 
     /// **New** difficulty window duration expressed in time units (seconds).
diff --git a/consensus/core/src/config/params.rs b/consensus/core/src/config/params.rs
index 178654552..6912739d3 100644
--- a/consensus/core/src/config/params.rs
+++ b/consensus/core/src/config/params.rs
@@ -216,10 +216,22 @@ impl Params {
 
     /// Returns whether the sink timestamp is recent enough and the node is considered synced or nearly synced.
     pub fn is_nearly_synced(&self, sink_timestamp: u64, sink_daa_score: u64) -> bool {
-        // We consider the node close to being synced if the sink (virtual selected parent) block
-        // timestamp is within DAA window duration far in the past. Blocks mined over such DAG state would
-        // enter the DAA window of fully-synced nodes and thus contribute to overall network difficulty
-        unix_now() < sink_timestamp + self.expected_daa_window_duration_in_milliseconds(sink_daa_score)
+        if self.net.is_mainnet() {
+            // We consider the node close to being synced if the sink (virtual selected parent) block
+            // timestamp is within DAA window duration far in the past. Blocks mined over such DAG state would
+            // enter the DAA window of fully-synced nodes and thus contribute to overall network difficulty
+            unix_now() < sink_timestamp + self.expected_daa_window_duration_in_milliseconds(sink_daa_score)
+        } else {
+            // For testnets we consider the node to be synced if the sink timestamp is within a time range which
+            // is overwhelmingly unlikely to pass without mined blocks even if net hashrate decreased dramatically.
+            //
+            // This period is smaller than the above mainnet calculation in order to ensure that an IBDing miner
+            // with significant testnet hashrate does not overwhelm the network with deep side-DAGs.
+            //
+            // We use DAA duration as baseline and scale it down with BPS
+            let max_expected_duration_without_blocks_in_milliseconds = self.target_time_per_block * NEW_DIFFICULTY_WINDOW_DURATION; // = DAA duration in milliseconds / bps
+            unix_now() < sink_timestamp + max_expected_duration_without_blocks_in_milliseconds
+        }
     }
 
     pub fn network_name(&self) -> String {
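// [Editor's note: a standalone sketch, not part of the diff.] To give a feel for the testnet bound in
// is_nearly_synced above, with assumed example values: target_time_per_block = 100 ms (i.e. 10 BPS), and
// NEW_DIFFICULTY_WINDOW_DURATION assumed to equal the legacy window size of 2641 (in seconds, per the
// constants.rs doc comment above). The tolerated sink-timestamp gap is then 100 * 2641 = 264_100 ms,
// roughly 4.4 minutes, which is the full ~44-minute DAA duration divided by BPS, as the inline comment states.
fn testnet_sink_gap_tolerance_ms() -> u64 {
    let target_time_per_block_ms: u64 = 100; // assumed: 10 BPS
    let new_difficulty_window_duration_secs: u64 = 2641; // assumed value of NEW_DIFFICULTY_WINDOW_DURATION
    target_time_per_block_ms * new_difficulty_window_duration_secs // = 264_100 ms (~4.4 minutes)
}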
diff --git a/consensus/src/pipeline/virtual_processor/processor.rs b/consensus/src/pipeline/virtual_processor/processor.rs
index 0fc752c4c..ae9361ee6 100644
--- a/consensus/src/pipeline/virtual_processor/processor.rs
+++ b/consensus/src/pipeline/virtual_processor/processor.rs
@@ -80,7 +80,7 @@ use crossbeam_channel::{Receiver as CrossbeamReceiver, Sender as CrossbeamSender
 use itertools::Itertools;
 use kaspa_utils::binary_heap::BinaryHeapExtensions;
 use parking_lot::{RwLock, RwLockUpgradableReadGuard};
-use rand::seq::SliceRandom;
+use rand::{seq::SliceRandom, Rng};
 use rayon::{
     prelude::{IntoParallelRefIterator, IntoParallelRefMutIterator, ParallelIterator},
     ThreadPool,
@@ -522,7 +522,9 @@ impl VirtualStateProcessor {
         drop(selected_chain_write);
     }
 
-    /// Returns the max number of tips to consider as virtual parents in a single virtual resolve operation
+    /// Returns the max number of tips to consider as virtual parents in a single virtual resolve operation.
+    ///
+    /// Guaranteed to be `>= self.max_block_parents`
     fn max_virtual_parent_candidates(&self) -> usize {
         // Limit to max_block_parents x 3 candidates. This way we avoid going over thousands of tips when the network isn't healthy.
         // There's no specific reason for a factor of 3, and it's not a consensus rule, just an estimation for reducing the amount
@@ -574,11 +576,7 @@ impl VirtualStateProcessor {
             let filtering_blue_work = self.ghostdag_primary_store.get_blue_work(filtering_root).unwrap_or_default();
             return (
                 candidate,
-                heap.into_sorted_iter()
-                    .take(self.max_virtual_parent_candidates())
-                    .take_while(|s| s.blue_work >= filtering_blue_work)
-                    .map(|s| s.hash)
-                    .collect(),
+                heap.into_sorted_iter().take_while(|s| s.blue_work >= filtering_blue_work).map(|s| s.hash).collect(),
             );
         } else {
             debug!("Block candidate {} has invalid UTXO state and is ignored from Virtual chain.", candidate)
@@ -620,10 +618,26 @@ impl VirtualStateProcessor {
         // enough so we avoid making further optimizations
         let _prune_guard = self.pruning_lock.blocking_read();
         let max_block_parents = self.max_block_parents as usize;
+        let max_candidates = self.max_virtual_parent_candidates();
 
         // Prioritize half the blocks with highest blue work and pick the rest randomly to ensure diversity between nodes
-        if candidates.len() > max_block_parents / 2 {
-            // `make_contiguous` should be a no op since the deque was just built
+        if candidates.len() > max_candidates {
+            // make_contiguous should be a no op since the deque was just built
+            let slice = candidates.make_contiguous();
+
+            // Keep slice[..max_block_parents / 2] as is, choose max_candidates - max_block_parents / 2 at random
+            // from the remainder of the slice while swapping them to slice[max_block_parents / 2..max_candidates].
+            //
+            // Inspired by rand::partial_shuffle (which lacks the guarantee on chosen elements location).
+            for i in max_block_parents / 2..max_candidates {
+                let j = rand::thread_rng().gen_range(i..slice.len()); // i < max_candidates < slice.len()
+                slice.swap(i, j);
+            }
+
+            // Truncate the unchosen elements
+            candidates.truncate(max_candidates);
+        } else if candidates.len() > max_block_parents / 2 {
+            // Fallback to a simpler algo in this case
             candidates.make_contiguous()[max_block_parents / 2..].shuffle(&mut rand::thread_rng());
         }
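// [Editor's note: a standalone sketch, not part of the diff.] The candidate-selection logic above,
// extracted with illustrative names. The first `keep` slots (highest blue work) stay in place; slots
// keep..max are filled with uniform random picks from the not-yet-chosen tail, which is precisely the
// guarantee on chosen-element location that rand's partial_shuffle lacks; the rest is truncated away.
use rand::Rng;

fn select_candidates<T>(items: &mut Vec<T>, keep: usize, max: usize) {
    assert!(keep <= max && max <= items.len());
    let mut rng = rand::thread_rng();
    for i in keep..max {
        // j is uniform over items[i..], so every remaining element is equally likely to be chosen next
        let j = rng.gen_range(i..items.len());
        items.swap(i, j);
    }
    items.truncate(max);
}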
diff --git a/protocol/flows/src/flow_context.rs b/protocol/flows/src/flow_context.rs
index ad57a8fbc..1cc344d6f 100644
--- a/protocol/flows/src/flow_context.rs
+++ b/protocol/flows/src/flow_context.rs
@@ -203,7 +203,7 @@ impl FlowContext {
     ) -> Self {
         let hub = Hub::new();
 
-        let orphan_resolution_range = BASELINE_ORPHAN_RESOLUTION_RANGE + (config.bps() as f64).log2().min(3.0) as u32;
+        let orphan_resolution_range = BASELINE_ORPHAN_RESOLUTION_RANGE + (config.bps() as f64).log2().ceil() as u32;
 
         // The maximum amount of orphans allowed in the orphans pool. This number is an
         // approximation of how many orphans there can possibly be on average.
@@ -360,6 +360,10 @@ impl FlowContext {
         unorphaned_blocks
     }
 
+    pub async fn revalidate_orphans(&self, consensus: &ConsensusProxy) {
+        self.orphans_pool.write().await.revalidate_orphans(consensus).await
+    }
+
     /// Adds the rpc-submitted block to the DAG and propagates it to peers.
     pub async fn submit_rpc_block(&self, consensus: &ConsensusProxy, block: Block) -> Result<(), ProtocolError> {
         if block.transactions.is_empty() {
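// [Editor's note: a standalone sketch, not part of the diff.] The effect of replacing `.min(3.0)`
// with `.ceil()` in the orphan resolution range above; the baseline value is an assumption here.
fn orphan_resolution_range(baseline: u32, bps: u64) -> u32 {
    baseline + (bps as f64).log2().ceil() as u32
}
// With an assumed baseline of 5: bps = 1 gives 5, bps = 4 gives 7, and bps = 10 gives 9, whereas the
// old `.min(3.0)` cap would have stopped at 8 for any bps above 8.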
diff --git a/protocol/flows/src/flowcontext/orphans.rs b/protocol/flows/src/flowcontext/orphans.rs
index e2f60a059..ad19e1ec4 100644
--- a/protocol/flows/src/flowcontext/orphans.rs
+++ b/protocol/flows/src/flowcontext/orphans.rs
@@ -136,6 +136,27 @@ impl OrphanBlocksPool {
             }
         })
     }
+
+    /// Iterate all orphans and remove blocks which are no longer orphans.
+    /// This is important for the overall health of the pool and for ensuring that
+    /// orphan blocks are not evicted due to the pool size limit while already processed
+    /// blocks remain in it. Should be called following IBD.
+    pub async fn revalidate_orphans(&mut self, consensus: &ConsensusProxy) {
+        let mut i = 0;
+        while i < self.orphans.len() {
+            if let Some((&h, _)) = self.orphans.get_index(i) {
+                if consensus.async_get_block_status(h).await.is_some_and(|s| s.is_invalid() || s.has_block_body()) {
+                    // If we swap-removed, do not advance i, so that we revisit the element now moved
+                    // into index i on the next iteration. The loop still progresses because len is shorter.
+                    self.orphans.swap_remove_index(i);
+                } else {
+                    i += 1;
+                }
+            } else {
+                i += 1;
+            }
+        }
+    }
 }
 
 #[cfg(test)]
@@ -183,6 +204,8 @@ mod tests {
         let b = Block::from_precomputed_hash(9.into(), vec![]);
         let c = Block::from_precomputed_hash(10.into(), roots.clone());
         let d = Block::from_precomputed_hash(11.into(), vec![10.into()]);
+        let e = Block::from_precomputed_hash(12.into(), vec![10.into()]);
+        let f = Block::from_precomputed_hash(13.into(), vec![12.into()]);
 
         pool.add_orphan(c.clone());
         pool.add_orphan(d.clone());
@@ -192,11 +215,24 @@ mod tests {
         consensus.validate_and_insert_block(a.clone()).virtual_state_task.await.unwrap();
         consensus.validate_and_insert_block(b.clone()).virtual_state_task.await.unwrap();
 
+        // Test unorphaning
         let (blocks, _, virtual_state_tasks) = pool.unorphan_blocks(&consensus, 8.into()).await;
         try_join_all(virtual_state_tasks).await.unwrap();
         assert_eq!(blocks.into_iter().map(|b| b.hash()).collect::<HashSet<_>>(), HashSet::from([10.into(), 11.into()]));
         assert!(pool.orphans.is_empty());
-        drop((a, b, c, d));
+
+        // Test revalidation
+        pool.add_orphan(d.clone());
+        pool.add_orphan(e.clone());
+        pool.add_orphan(f.clone());
+        assert_eq!(pool.orphans.len(), 3);
+        pool.revalidate_orphans(&consensus).await;
+        assert_eq!(pool.orphans.len(), 2);
+        consensus.validate_and_insert_block(e.clone()).virtual_state_task.await.unwrap();
+        consensus.validate_and_insert_block(f.clone()).virtual_state_task.await.unwrap();
+        pool.revalidate_orphans(&consensus).await;
+        assert!(pool.orphans.is_empty());
+
+        drop((a, b, c, d, e, f));
     }
 }
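// [Editor's note: a standalone sketch, not part of the diff.] The index-iteration pattern used by
// revalidate_orphans above, shown on a plain Vec. After a swap-remove the index is NOT advanced,
// because the former last element now sits in slot i and must itself be examined; termination is
// still guaranteed since each iteration either advances i or shrinks the length.
fn retain_in_place<T>(v: &mut Vec<T>, mut keep: impl FnMut(&T) -> bool) {
    let mut i = 0;
    while i < v.len() {
        if keep(&v[i]) {
            i += 1;
        } else {
            v.swap_remove(i); // O(1): moves the last element into slot i
        }
    }
}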
diff --git a/protocol/flows/src/v5/blockrelay/flow.rs b/protocol/flows/src/v5/blockrelay/flow.rs
index 874676d99..e4a8542cc 100644
--- a/protocol/flows/src/v5/blockrelay/flow.rs
+++ b/protocol/flows/src/v5/blockrelay/flow.rs
@@ -251,7 +251,10 @@ impl HandleRelayInvsFlow {
             .await?;
         let msg = dequeue_with_timeout!(self.msg_route, Payload::BlockLocator)?;
         let locator_hashes: Vec<Hash> = msg.try_into()?;
-        for h in locator_hashes {
+        // Locator hashes are sent from later to earlier, so it makes sense to query consensus in reverse. Technically
+        // with current syncer-side implementations (in both go-kaspa and this codebase) we could query only the last one,
+        // but we prefer not relying on such details for correctness
+        for h in locator_hashes.into_iter().rev() {
             if consensus.async_get_block_status(h).await.is_some_and(|s| s.has_block_body()) {
                 return Ok(true);
             }
diff --git a/protocol/flows/src/v5/ibd/flow.rs b/protocol/flows/src/v5/ibd/flow.rs
index ba0ad53f4..27d98ffc5 100644
--- a/protocol/flows/src/v5/ibd/flow.rs
+++ b/protocol/flows/src/v5/ibd/flow.rs
@@ -129,7 +129,12 @@ impl IbdFlow {
 
         // Relay block might be in the antipast of syncer sink, thus
         // check its past for missing bodies as well.
-        self.sync_missing_block_bodies(&session, relay_block.hash()).await
+        self.sync_missing_block_bodies(&session, relay_block.hash()).await?;
+
+        // Following IBD we revalidate orphans since many of them might have been processed during the IBD
+        self.ctx.revalidate_orphans(&session).await;
+
+        Ok(())
     }
 
     async fn determine_ibd_type(
diff --git a/simpa/src/main.rs b/simpa/src/main.rs
index a7573e499..d0f3d01be 100644
--- a/simpa/src/main.rs
+++ b/simpa/src/main.rs
@@ -325,7 +325,7 @@ async fn validate(src_consensus: &Consensus, dst_consensus: &Consensus, params:
     for (i, mut chunk) in iter.enumerate() {
         let current_joins = submit_chunk(src_consensus, dst_consensus, &mut chunk);
         let statuses = try_join_all(prev_joins).await.unwrap();
-        info!("Validated chunk {}", i);
+        trace!("Validated chunk {}", i);
         assert!(statuses.iter().all(|s| s.is_utxo_valid_or_pending()));
         prev_joins = current_joins;
     }
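// [Editor's note: a standalone sketch, not part of the diff.] The simpa validation loop above is a
// two-stage pipeline: chunk i + 1 is submitted before the join handles of chunk i are awaited, so
// submission and validation overlap. A thread-based analogue of the same pattern (simpa itself uses
// async tasks joined via try_join_all):
use std::thread::{self, JoinHandle};

fn process_pipelined(chunks: Vec<Vec<u64>>, validate: fn(u64) -> bool) {
    let mut prev_joins: Vec<JoinHandle<bool>> = Vec::new();
    for chunk in chunks {
        // Submit the current chunk before collecting the previous chunk's results
        let current_joins: Vec<_> = chunk.into_iter().map(move |item| thread::spawn(move || validate(item))).collect();
        assert!(prev_joins.into_iter().all(|h| h.join().unwrap()));
        prev_joins = current_joins;
    }
    // Drain the final chunk
    assert!(prev_joins.into_iter().all(|h| h.join().unwrap()));
}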