diff --git a/consensus/core/src/blockstatus.rs b/consensus/core/src/blockstatus.rs index 573b60a09..68041345a 100644 --- a/consensus/core/src/blockstatus.rs +++ b/consensus/core/src/blockstatus.rs @@ -43,4 +43,8 @@ impl BlockStatus { pub fn is_valid(self) -> bool { self != BlockStatus::StatusInvalid } + + pub fn is_invalid(self) -> bool { + self == BlockStatus::StatusInvalid + } } diff --git a/consensus/core/src/config/bps.rs b/consensus/core/src/config/bps.rs index 975e8df0f..2f30e0536 100644 --- a/consensus/core/src/config/bps.rs +++ b/consensus/core/src/config/bps.rs @@ -58,6 +58,18 @@ impl Bps { let val = (Self::ghostdag_k() / 2) as u8; if val < 10 { 10 + // TODO (TEMP): uncomment when restarting TN11 or when implementing a TN11 HF + /* + } else if val > 16 { + // We currently limit the number of parents by 16 in order to preserve processing performance + // and to prevent number of parent references per network round from growing quadratically with + // BPS. As BPS might grow beyond 10 this will mean that blocks will reference less parents than + // the average number of DAG tips. Which means relying on randomness between network peers for ensuring + // that all tips are eventually merged. We conjecture that with high probability every block will + // be merged after a log number of rounds. For mainnet this requires an increase to the value of GHOSTDAG + // K accompanied by a short security analysis, or moving to the parameterless DAGKNIGHT. 
+ 16 + */ } else { val } @@ -67,6 +79,16 @@ impl Bps { Self::ghostdag_k() as u64 * 10 } + // TODO (TEMP): rename to mergeset_size_limit when restarting TN11 or when implementing a TN11 HF + pub const fn _mergeset_size_limit() -> u64 { + let val = Self::ghostdag_k() as u64 * 2; + if val < 180 { + 180 + } else { + val + } + } + pub const fn merge_depth_bound() -> u64 { BPS * MERGE_DEPTH_DURATION } @@ -87,7 +109,7 @@ impl Bps { } pub const fn pruning_proof_m() -> u64 { - // No need to scale this constant with BPS since the important block levels (higher) remain logarithmically long + // No need to scale this constant with BPS since the important block levels (higher) remain logarithmically short PRUNING_PROOF_M } diff --git a/consensus/core/src/config/constants.rs b/consensus/core/src/config/constants.rs index 5fe8f20bd..fb0c018f0 100644 --- a/consensus/core/src/config/constants.rs +++ b/consensus/core/src/config/constants.rs @@ -59,7 +59,7 @@ pub mod consensus { /// Minimal size of the difficulty window. Affects the DA algorithm only at the starting period of a new net pub const MIN_DIFFICULTY_WINDOW_LEN: usize = 10; - /// **Legacy** difficulty adjustment window size corresponding to ~46 minutes with 1 BPS + /// **Legacy** difficulty adjustment window size corresponding to ~44 minutes with 1 BPS pub const LEGACY_DIFFICULTY_WINDOW_SIZE: usize = 2641; /// **New** difficulty window duration expressed in time units (seconds). 
diff --git a/consensus/core/src/config/genesis.rs b/consensus/core/src/config/genesis.rs index 1f714b686..536d38996 100644 --- a/consensus/core/src/config/genesis.rs +++ b/consensus/core/src/config/genesis.rs @@ -143,10 +143,24 @@ pub const TESTNET_GENESIS: GenesisBlock = GenesisBlock { pub const TESTNET11_GENESIS: GenesisBlock = GenesisBlock { hash: Hash::from_bytes([ - 0x5a, 0x90, 0xf8, 0x71, 0x09, 0x32, 0x3d, 0x61, 0x41, 0xff, 0x51, 0x04, 0xa2, 0xd5, 0xf8, 0xd8, 0x85, 0x7a, 0x6f, 0x39, 0x2e, - 0xb4, 0x90, 0x5c, 0xe3, 0x55, 0x5e, 0xc9, 0x12, 0xcd, 0xfb, 0x9c, + 0x3c, 0x8d, 0x1b, 0xea, 0xd4, 0x65, 0xd4, 0xf7, 0x93, 0xf4, 0xb5, 0x20, 0x1a, 0x99, 0x22, 0x43, 0x75, 0xe2, 0x3b, 0x33, 0xd4, + 0x54, 0x9f, 0x36, 0x07, 0xc2, 0xa9, 0xf9, 0x51, 0xe4, 0xec, 0xc4, + ]), + hash_merkle_root: Hash::from_bytes([ + 0xf5, 0xd7, 0x3a, 0xef, 0xd9, 0x1d, 0x82, 0x1a, 0xde, 0x39, 0xc8, 0x9a, 0x73, 0x2d, 0xc2, 0x1b, 0xa0, 0xab, 0x82, 0x42, 0xe0, + 0x48, 0xdc, 0x2f, 0x26, 0x88, 0x1d, 0x6e, 0x7c, 0x00, 0x4e, 0xe0, ]), bits: 504155340, // see `gen_testnet11_genesis` + #[rustfmt::skip] + coinbase_payload: &[ + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, // Blue score + 0x00, 0xE1, 0xF5, 0x05, 0x00, 0x00, 0x00, 0x00, // Subsidy + 0x00, 0x00, // Script version + 0x01, // Varint + 0x00, // OP-FALSE + 0x6b, 0x61, 0x73, 0x70, 0x61, 0x2d, 0x74, 0x65, 0x73, 0x74, 0x6e, 0x65, 0x74, // kaspa-testnet + 11, 3 // TN11, Relaunch 3 + ], ..TESTNET_GENESIS }; @@ -217,7 +231,7 @@ mod tests { fn test_genesis_hashes() { [GENESIS, TESTNET_GENESIS, TESTNET11_GENESIS, SIMNET_GENESIS, DEVNET_GENESIS].into_iter().for_each(|genesis| { let block: Block = (&genesis).into(); - assert_eq!(calc_hash_merkle_root(block.transactions.iter()), block.header.hash_merkle_root); + assert_hashes_eq(calc_hash_merkle_root(block.transactions.iter()), block.header.hash_merkle_root); assert_hashes_eq(block.hash(), genesis.hash); }); } diff --git a/consensus/core/src/config/params.rs b/consensus/core/src/config/params.rs index 
178654552..18a3c8489 100644 --- a/consensus/core/src/config/params.rs +++ b/consensus/core/src/config/params.rs @@ -216,10 +216,22 @@ impl Params { /// Returns whether the sink timestamp is recent enough and the node is considered synced or nearly synced. pub fn is_nearly_synced(&self, sink_timestamp: u64, sink_daa_score: u64) -> bool { - // We consider the node close to being synced if the sink (virtual selected parent) block - // timestamp is within DAA window duration far in the past. Blocks mined over such DAG state would - // enter the DAA window of fully-synced nodes and thus contribute to overall network difficulty - unix_now() < sink_timestamp + self.expected_daa_window_duration_in_milliseconds(sink_daa_score) + if self.net.is_mainnet() { + // We consider the node close to being synced if the sink (virtual selected parent) block + // timestamp is within DAA window duration far in the past. Blocks mined over such DAG state would + // enter the DAA window of fully-synced nodes and thus contribute to overall network difficulty + unix_now() < sink_timestamp + self.expected_daa_window_duration_in_milliseconds(sink_daa_score) + } else { + // For testnets we consider the node to be synced if the sink timestamp is within a time range which + // is overwhelmingly unlikely to pass without mined blocks even if net hashrate decreased dramatically. + // + // This period is smaller than the above mainnet calculation in order to ensure that an IBDing miner + // with significant testnet hashrate does not overwhelm the network with deep side-DAGs. 
+ // + // We use DAA duration as baseline and scale it down with BPS + let max_expected_duration_without_blocks_in_milliseconds = self.target_time_per_block * NEW_DIFFICULTY_WINDOW_DURATION; // = DAA duration in milliseconds / bps + unix_now() < sink_timestamp + max_expected_duration_without_blocks_in_milliseconds + } } pub fn network_name(&self) -> String { diff --git a/consensus/src/pipeline/virtual_processor/processor.rs b/consensus/src/pipeline/virtual_processor/processor.rs index d74120bb8..b7cae8102 100644 --- a/consensus/src/pipeline/virtual_processor/processor.rs +++ b/consensus/src/pipeline/virtual_processor/processor.rs @@ -80,7 +80,7 @@ use crossbeam_channel::{Receiver as CrossbeamReceiver, Sender as CrossbeamSender use itertools::Itertools; use kaspa_utils::binary_heap::BinaryHeapExtensions; use parking_lot::{RwLock, RwLockUpgradableReadGuard}; -use rand::seq::SliceRandom; +use rand::{seq::SliceRandom, Rng}; use rayon::{ prelude::{IntoParallelRefIterator, IntoParallelRefMutIterator, ParallelIterator}, ThreadPool, @@ -176,8 +176,10 @@ impl VirtualStateProcessor { thread_pool, genesis: params.genesis.clone(), - max_block_parents: params.max_block_parents, - mergeset_size_limit: params.mergeset_size_limit, + // TODO (TEMP): remove TN11 bounds when restarting/HF TN11, see comments in bps.rs + // (changing these values here is a way to influence the mined templates w/o breaking consensus) + max_block_parents: params.max_block_parents.min(16), + mergeset_size_limit: params.mergeset_size_limit.min(248), pruning_depth: params.pruning_depth, db, @@ -522,7 +524,9 @@ impl VirtualStateProcessor { drop(selected_chain_write); } - /// Returns the max number of tips to consider as virtual parents in a single virtual resolve operation + /// Returns the max number of tips to consider as virtual parents in a single virtual resolve operation. 
+ /// + /// Guaranteed to be `>= self.max_block_parents` fn max_virtual_parent_candidates(&self) -> usize { // Limit to max_block_parents x 3 candidates. This way we avoid going over thousands of tips when the network isn't healthy. // There's no specific reason for a factor of 3, and its not a consensus rule, just an estimation for reducing the amount @@ -574,11 +578,7 @@ impl VirtualStateProcessor { let filtering_blue_work = self.ghostdag_primary_store.get_blue_work(filtering_root).unwrap_or_default(); return ( candidate, - heap.into_sorted_iter() - .take(self.max_virtual_parent_candidates()) - .take_while(|s| s.blue_work >= filtering_blue_work) - .map(|s| s.hash) - .collect(), + heap.into_sorted_iter().take_while(|s| s.blue_work >= filtering_blue_work).map(|s| s.hash).collect(), ); } else { debug!("Block candidate {} has invalid UTXO state and is ignored from Virtual chain.", candidate) @@ -614,16 +614,32 @@ impl VirtualStateProcessor { // TODO: tests // Mergeset increasing might traverse DAG areas which are below the finality point and which theoretically - // can borderline with pruned data, hence we acquire the prune lock to insure data consistency. Note that + // can borderline with pruned data, hence we acquire the prune lock to ensure data consistency. Note that // the final selected mergeset can never be pruned (this is the essence of the prunality proof), however // we might touch such data prior to validating the bounded merge rule. 
All in all, this function is short // enough so we avoid making further optimizations let _prune_guard = self.pruning_lock.blocking_read(); let max_block_parents = self.max_block_parents as usize; + let max_candidates = self.max_virtual_parent_candidates(); // Prioritize half the blocks with highest blue work and pick the rest randomly to ensure diversity between nodes - if candidates.len() > max_block_parents / 2 { - // `make_contiguous` should be a no op since the deque was just built + if candidates.len() > max_candidates { + // make_contiguous should be a no op since the deque was just built + let slice = candidates.make_contiguous(); + + // Keep slice[..max_block_parents / 2] as is, choose max_candidates - max_block_parents / 2 in random + // from the remainder of the slice while swapping them to slice[max_block_parents / 2..max_candidates]. + // + // Inspired by rand::partial_shuffle (which lacks the guarantee on chosen elements location). + for i in max_block_parents / 2..max_candidates { + let j = rand::thread_rng().gen_range(i..slice.len()); // i < max_candidates < slice.len() + slice.swap(i, j); + } + + // Truncate the unchosen elements + candidates.truncate(max_candidates); + } else if candidates.len() > max_block_parents / 2 { + // Fallback to a simpler algo in this case candidates.make_contiguous()[max_block_parents / 2..].shuffle(&mut rand::thread_rng()); } diff --git a/protocol/flows/src/flow_context.rs b/protocol/flows/src/flow_context.rs index ad57a8fbc..ce4f0a264 100644 --- a/protocol/flows/src/flow_context.rs +++ b/protocol/flows/src/flow_context.rs @@ -59,6 +59,9 @@ const PROTOCOL_VERSION: u32 = 6; /// See `check_orphan_resolution_range` const BASELINE_ORPHAN_RESOLUTION_RANGE: u32 = 5; +/// Orphans are kept as full blocks so we cannot hold too much of them in memory +const MAX_ORPHANS_UPPER_BOUND: usize = 1024; + /// The min time to wait before allowing another parallel request const REQUEST_SCOPE_WAIT_TIME: Duration = Duration::from_secs(1); @@ 
-203,11 +206,11 @@ impl FlowContext { ) -> Self { let hub = Hub::new(); - let orphan_resolution_range = BASELINE_ORPHAN_RESOLUTION_RANGE + (config.bps() as f64).log2().min(3.0) as u32; + let orphan_resolution_range = BASELINE_ORPHAN_RESOLUTION_RANGE + (config.bps() as f64).log2().ceil() as u32; - // The maximum amount of orphans allowed in the orphans pool. This number is an - // approximation of how many orphans there can possibly be on average. - let max_orphans = 2u64.pow(orphan_resolution_range) as usize * config.ghostdag_k as usize; + // The maximum amount of orphans allowed in the orphans pool. This number is an approximation + // of how many orphans there can possibly be on average bounded by an upper bound. + let max_orphans = (2u64.pow(orphan_resolution_range) as usize * config.ghostdag_k as usize).min(MAX_ORPHANS_UPPER_BOUND); Self { inner: Arc::new(FlowContextInner { node_id: Uuid::new_v4().into(), @@ -360,6 +363,10 @@ impl FlowContext { unorphaned_blocks } + pub async fn revalidate_orphans(&self, consensus: &ConsensusProxy) { + self.orphans_pool.write().await.revalidate_orphans(consensus).await + } + /// Adds the rpc-submitted block to the DAG and propagates it to peers. pub async fn submit_rpc_block(&self, consensus: &ConsensusProxy, block: Block) -> Result<(), ProtocolError> { if block.transactions.is_empty() { diff --git a/protocol/flows/src/flowcontext/orphans.rs b/protocol/flows/src/flowcontext/orphans.rs index e2f60a059..ae37f50f9 100644 --- a/protocol/flows/src/flowcontext/orphans.rs +++ b/protocol/flows/src/flowcontext/orphans.rs @@ -136,6 +136,27 @@ impl OrphanBlocksPool { } }) } + + /// Iterate all orphans and remove blocks which are no longer orphans. + /// This is important for the overall health of the pool and for ensuring that + /// orphan blocks don't evict due to pool size limit while already processed + /// blocks remain in it. Should be called following IBD. 
+ pub async fn revalidate_orphans(&mut self, consensus: &ConsensusProxy) { + let mut i = 0; + while i < self.orphans.len() { + if let Some((&h, _)) = self.orphans.get_index(i) { + if consensus.async_get_block_status(h).await.is_some_and(|s| s.is_invalid() || s.has_block_body()) { + // If we swap removed do not advance i so that we revisit the new element moved + // to i in the next iteration. Loop will progress because len is shorter now. + self.orphans.swap_remove_index(i); + } else { + i += 1; + } + } else { + i += 1; + } + } + } } #[cfg(test)] @@ -183,6 +204,8 @@ mod tests { let b = Block::from_precomputed_hash(9.into(), vec![]); let c = Block::from_precomputed_hash(10.into(), roots.clone()); let d = Block::from_precomputed_hash(11.into(), vec![10.into()]); + let e = Block::from_precomputed_hash(12.into(), vec![10.into()]); + let f = Block::from_precomputed_hash(13.into(), vec![12.into()]); pool.add_orphan(c.clone()); pool.add_orphan(d.clone()); @@ -192,11 +215,24 @@ consensus.validate_and_insert_block(a.clone()).virtual_state_task.await.unwrap(); consensus.validate_and_insert_block(b.clone()).virtual_state_task.await.unwrap(); + // Test unorphaning let (blocks, _, virtual_state_tasks) = pool.unorphan_blocks(&consensus, 8.into()).await; try_join_all(virtual_state_tasks).await.unwrap(); assert_eq!(blocks.into_iter().map(|b| b.hash()).collect::<HashSet<_>>(), HashSet::from([10.into(), 11.into()])); assert!(pool.orphans.is_empty()); - drop((a, b, c, d)); + // Test revalidation + pool.add_orphan(d.clone()); + pool.add_orphan(e.clone()); + pool.add_orphan(f.clone()); + assert_eq!(pool.orphans.len(), 3); + pool.revalidate_orphans(&consensus).await; + assert_eq!(pool.orphans.len(), 2); + consensus.validate_and_insert_block(e.clone()).virtual_state_task.await.unwrap(); + consensus.validate_and_insert_block(f.clone()).virtual_state_task.await.unwrap(); + pool.revalidate_orphans(&consensus).await; + assert!(pool.orphans.is_empty()); + + drop((a, b, c, d, e, f)); } } diff 
--git a/protocol/flows/src/v5/blockrelay/flow.rs b/protocol/flows/src/v5/blockrelay/flow.rs index 874676d99..9d0698020 100644 --- a/protocol/flows/src/v5/blockrelay/flow.rs +++ b/protocol/flows/src/v5/blockrelay/flow.rs @@ -251,7 +251,12 @@ impl HandleRelayInvsFlow { .await?; let msg = dequeue_with_timeout!(self.msg_route, Payload::BlockLocator)?; let locator_hashes: Vec<Hash> = msg.try_into()?; - for h in locator_hashes { + // Locator hashes are sent from later to earlier, so it makes sense to query consensus in reverse. Technically + // with current syncer-side implementations (in both go-kaspa and this codebase) we could query only the last one, + // but we prefer not relying on such details for correctness + // + // TODO: change syncer-side to only send the most early block since it's sufficient for our needs + for h in locator_hashes.into_iter().rev() { if consensus.async_get_block_status(h).await.is_some_and(|s| s.has_block_body()) { return Ok(true); } diff --git a/protocol/flows/src/v5/ibd/flow.rs b/protocol/flows/src/v5/ibd/flow.rs index ba0ad53f4..27d98ffc5 100644 --- a/protocol/flows/src/v5/ibd/flow.rs +++ b/protocol/flows/src/v5/ibd/flow.rs @@ -129,7 +129,12 @@ impl IbdFlow { // Relay block might be in the antipast of syncer sink, thus // check its past for missing bodies as well. 
- self.sync_missing_block_bodies(&session, relay_block.hash()).await + self.sync_missing_block_bodies(&session, relay_block.hash()).await?; + + // Following IBD we revalidate orphans since many of them might have been processed during the IBD + self.ctx.revalidate_orphans(&session).await; + + Ok(()) } async fn determine_ibd_type( diff --git a/simpa/src/main.rs b/simpa/src/main.rs index a7573e499..d0f3d01be 100644 --- a/simpa/src/main.rs +++ b/simpa/src/main.rs @@ -325,7 +325,7 @@ async fn validate(src_consensus: &Consensus, dst_consensus: &Consensus, params: for (i, mut chunk) in iter.enumerate() { let current_joins = submit_chunk(src_consensus, dst_consensus, &mut chunk); let statuses = try_join_all(prev_joins).await.unwrap(); - info!("Validated chunk {}", i); + trace!("Validated chunk {}", i); assert!(statuses.iter().all(|s| s.is_utxo_valid_or_pending())); prev_joins = current_joins; }