From ff8ddd71106b0b37124a582138accf3ffadddbb5 Mon Sep 17 00:00:00 2001 From: Ori Newman Date: Tue, 15 Aug 2023 20:38:17 +0300 Subject: [PATCH 01/11] Add pruning point list validations --- components/consensusmanager/src/session.rs | 10 +- consensus/core/src/api/mod.rs | 8 ++ consensus/src/consensus/mod.rs | 17 ++++ consensus/src/consensus/services.rs | 4 +- .../pipeline/virtual_processor/processor.rs | 27 ++++++ consensus/src/processes/pruning.rs | 97 ++++++++++++++++++- protocol/flows/src/v5/ibd/flow.rs | 16 ++- 7 files changed, 169 insertions(+), 10 deletions(-) diff --git a/components/consensusmanager/src/session.rs b/components/consensusmanager/src/session.rs index 31f22c452..afa01408b 100644 --- a/components/consensusmanager/src/session.rs +++ b/components/consensusmanager/src/session.rs @@ -10,7 +10,7 @@ use kaspa_consensus_core::{ blockstatus::BlockStatus, errors::consensus::ConsensusResult, header::Header, - pruning::{PruningPointProof, PruningPointTrustedData}, + pruning::{PruningPointProof, PruningPointTrustedData, PruningPointsList}, trusted::{ExternalGhostdagData, TrustedBlock}, tx::{Transaction, TransactionOutpoint, UtxoEntry}, BlockHashSet, ChainPath, Hash, @@ -350,6 +350,14 @@ impl ConsensusSessionOwned { ) -> ConsensusResult { self.clone().spawn_blocking(move |c| c.estimate_network_hashes_per_second(start_hash, window_size)).await } + + pub async fn async_validate_pruning_points(&self) -> ConsensusResult<()> { + self.clone().spawn_blocking(move |c| c.validate_pruning_points()).await + } + + pub async fn async_are_pruning_points_violating_finality(&self, pp_list: PruningPointsList) -> bool { + self.clone().spawn_blocking(move |c| c.are_pruning_points_violating_finality(pp_list)).await + } } pub type ConsensusProxy = ConsensusSessionOwned; diff --git a/consensus/core/src/api/mod.rs b/consensus/core/src/api/mod.rs index 6bc21677b..49f99a3a9 100644 --- a/consensus/core/src/api/mod.rs +++ b/consensus/core/src/api/mod.rs @@ -256,6 +256,14 @@ pub trait ConsensusApi: Send + Sync { fn estimate_network_hashes_per_second(&self, start_hash: Option, window_size: usize) -> ConsensusResult { unimplemented!() } + + fn validate_pruning_points(&self) -> ConsensusResult<()> { + unimplemented!() + } + + fn are_pruning_points_violating_finality(&self, pp_list: PruningPointsList) -> bool { + unimplemented!() + } } pub type DynConsensus = Arc; diff --git a/consensus/src/consensus/mod.rs b/consensus/src/consensus/mod.rs index aea4a958a..35a955edb 100644 --- a/consensus/src/consensus/mod.rs +++ b/consensus/src/consensus/mod.rs @@ -508,6 +508,19 @@ impl ConsensusApi for Consensus { self.virtual_processor.import_pruning_point_utxo_set(new_pruning_point, imported_utxo_multiset) } + fn validate_pruning_points(&self) -> ConsensusResult<()> { + let pp_info = self.pruning_point_store.read().get().unwrap(); + if !self.services.pruning_point_manager.is_valid_pruning_point(pp_info.pruning_point) { + return Err(ConsensusError::General("invalid pruning point candidate")); + } + + if !self.services.pruning_point_manager.are_pruning_points_in_valid_chain(pp_info) { + return Err(ConsensusError::General("past pruning points do not form a valid chain")); + } + + Ok(()) + } + fn header_exists(&self, hash: Hash) -> bool { match self.statuses_store.read().get(hash).unwrap_option() { Some(status) => status.has_block_header(), @@ -712,4 +725,8 @@ impl ConsensusApi for Consensus { } } } + + fn are_pruning_points_violating_finality(&self, pp_list: PruningPointsList) -> bool { + 
self.virtual_processor.are_pruning_points_violating_finality(pp_list) + } } diff --git a/consensus/src/consensus/services.rs b/consensus/src/consensus/services.rs index 347a96593..f0b791ba2 100644 --- a/consensus/src/consensus/services.rs +++ b/consensus/src/consensus/services.rs @@ -37,7 +37,8 @@ pub type DbSyncManager = SyncManager< DbStatusesStore, >; -pub type DbPruningPointManager = PruningPointManager; +pub type DbPruningPointManager = + PruningPointManager; pub type DbBlockDepthManager = BlockDepthManager; pub type DbParentsManager = ParentsManager>; @@ -153,6 +154,7 @@ impl ConsensusServices { storage.ghostdag_primary_store.clone(), storage.headers_store.clone(), storage.past_pruning_points_store.clone(), + storage.headers_selected_tip_store.clone(), ); let parents_manager = ParentsManager::new( diff --git a/consensus/src/pipeline/virtual_processor/processor.rs b/consensus/src/pipeline/virtual_processor/processor.rs index ff22bcdc4..449e6c1d9 100644 --- a/consensus/src/pipeline/virtual_processor/processor.rs +++ b/consensus/src/pipeline/virtual_processor/processor.rs @@ -54,6 +54,7 @@ use kaspa_consensus_core::{ config::genesis::GenesisBlock, header::Header, merkle::calc_hash_merkle_root, + pruning::PruningPointsList, tx::{MutableTransaction, Transaction}, utxo::{ utxo_diff::UtxoDiff, @@ -932,6 +933,32 @@ impl VirtualStateProcessor { Ok(()) } + + pub fn are_pruning_points_violating_finality(&self, pp_list: PruningPointsList) -> bool { + // Ideally we would want to check if the last known pruning point has the finality point + // in its chain, but in some cases it's impossible: let `lkp` be the last known pruning + // point from the list, and `fup` be the first unknown pruning point (the one following `lkp`). + // fup.blue_score - lkp.blue_score ∈ [finality_depth, finality_depth + k], so it's possible for + // `lkp` not to have the finality point in its past (if `fup` is close to the sink). So we have + // no choice but to check if `lkp` has `finality_point.finality_point` in its chain, meaning + // this function can only detect finality violations in depth of 2*finality_depth. + let current_pp = self.pruning_point_store.read().pruning_point().unwrap(); + let vf = self.virtual_finality_point(&self.virtual_stores.read().state.get().unwrap().ghostdag_data, current_pp); + let vff = self.depth_manager.calc_finality_point(&self.ghostdag_primary_store.get_data(vf).unwrap(), current_pp); + + let last_knonwn_pp = pp_list.iter().rev().find(|pp| match self.statuses_store.read().get(pp.hash).unwrap_option() { + Some(status) => status.is_valid(), + None => false, + }); + + if let Some(last_knonwn_pp) = last_knonwn_pp { + !self.reachability_service.is_chain_ancestor_of(vff, last_knonwn_pp.hash) + } else { + // If no pruning point is known, there's definitely a finality violation + // (normally at least genesis should be known). 
+ true + } + } } enum MergesetIncreaseResult { diff --git a/consensus/src/processes/pruning.rs b/consensus/src/processes/pruning.rs index 7f34eaa42..5f4be43e6 100644 --- a/consensus/src/processes/pruning.rs +++ b/consensus/src/processes/pruning.rs @@ -1,4 +1,4 @@ -use std::sync::Arc; +use std::{collections::VecDeque, sync::Arc}; use super::reachability::ReachabilityResultExtensions; use crate::model::{ @@ -6,12 +6,14 @@ use crate::model::{ stores::{ ghostdag::{CompactGhostdagData, GhostdagStoreReader}, headers::HeaderStoreReader, + headers_selected_tip::HeadersSelectedTipStoreReader, past_pruning_points::PastPruningPointsStoreReader, pruning::PruningPointInfo, reachability::ReachabilityStoreReader, }, }; use kaspa_hashes::Hash; +use parking_lot::RwLock; #[derive(Clone)] pub struct PruningPointManager< @@ -19,6 +21,7 @@ pub struct PruningPointManager< T: ReachabilityStoreReader, U: HeaderStoreReader, V: PastPruningPointsStoreReader, + W: HeadersSelectedTipStoreReader, > { pruning_depth: u64, finality_depth: u64, @@ -28,10 +31,16 @@ pub struct PruningPointManager< ghostdag_store: Arc, headers_store: Arc, past_pruning_points_store: Arc, + header_selected_tip_store: Arc>, } -impl - PruningPointManager +impl< + S: GhostdagStoreReader, + T: ReachabilityStoreReader, + U: HeaderStoreReader, + V: PastPruningPointsStoreReader, + W: HeadersSelectedTipStoreReader, + > PruningPointManager { pub fn new( pruning_depth: u64, @@ -41,6 +50,7 @@ impl, headers_store: Arc, past_pruning_points_store: Arc, + header_selected_tip_store: Arc>, ) -> Self { Self { pruning_depth, @@ -50,6 +60,7 @@ impl= pp_bs + self.pruning_depth } + + pub fn is_valid_pruning_point(&self, pp_candidate: Hash) -> bool { + if pp_candidate == self.genesis_hash { + return true; + } + + let hst = self.header_selected_tip_store.read().get().unwrap().hash; + + if !self.reachability_service.is_chain_ancestor_of(pp_candidate, hst) { + return false; + } + + let hst_bs = self.ghostdag_store.get_blue_score(hst).unwrap(); + self.is_pruning_point_in_pruning_depth(hst_bs, pp_candidate) + } + + pub fn are_pruning_points_in_valid_chain(&self, pruning_info: PruningPointInfo) -> bool { + // We want to validate that the past pruning points form a chain to genesis. Since + // each pruning point's header doesn't point to the previous pruning point, but to + // the pruning point from its POV, we can't just traverse from one pruning point to + // the next one by merely relying on the current pruning point header, but instead + // we rely on the fact that each pruning point is pointed by another known block or + // pruning point. + // So in the first stage we go over the selected chain and add to the queue of expected + // pruning points all the pruning points from the POV of some chain block. In the second + // stage we go over the past pruning points from recent to older, check that it's the head + // of the queue (by popping the queue), and add its header pruning point to the queue since + // we expect to see it later on the list. + // The first stage is important because the most recent pruning point is pointing to a few + // pruning points before, so the first few pruning points on the list won't be pointed by + // any other pruning point in the list, so we are compelled to check if it's refereced by + // the selected chain. 
+ let mut expected_pps_queue = VecDeque::new(); + let hst = self.header_selected_tip_store.read().get().unwrap().hash; + for current in self.reachability_service.backward_chain_iterator(hst, pruning_info.pruning_point, false) { + let current_header = self.headers_store.get_header(current).unwrap(); + if expected_pps_queue.is_empty() || *expected_pps_queue.back().unwrap() != current_header.pruning_point { + expected_pps_queue.push_back(current_header.pruning_point); + } + } + + for idx in (0..=pruning_info.index).rev() { + let pp = self.past_pruning_points_store.get(idx).unwrap(); + let pp_header = self.headers_store.get_header(pp).unwrap(); + let Some(expected_pp) = expected_pps_queue.pop_front() else { + // If we have less than expected pruning points. + return false; + }; + + if expected_pp != pp { + return false; + } + + if idx == 0 { + // The 0th pruning point should always be genesis, and no + // more pruning points should be expected below it. + if !expected_pps_queue.is_empty() || pp != self.genesis_hash { + return false; + } + break; + } + + // Add the pruning point from the POV of the current one if it's + // not already added. + match expected_pps_queue.back() { + Some(last_added_pp) => { + if *last_added_pp != pp_header.pruning_point { + expected_pps_queue.push_back(pp_header.pruning_point); + } + } + None => { + // expected_pps_queue should always have one block in the queue + // until we reach genesis. + return false; + } + } + } + + true + } } #[cfg(test)] diff --git a/protocol/flows/src/v5/ibd/flow.rs b/protocol/flows/src/v5/ibd/flow.rs index cd8cbb0a3..035354b19 100644 --- a/protocol/flows/src/v5/ibd/flow.rs +++ b/protocol/flows/src/v5/ibd/flow.rs @@ -169,12 +169,13 @@ impl IbdFlow { ) -> Result<(), ProtocolError> { info!("Starting IBD with headers proof with peer {}", self.router); - let session = staging.session().await; + let staging_session = staging.session().await; - let pruning_point = self.sync_and_validate_pruning_proof(&session).await?; - self.sync_headers(&session, syncer_virtual_selected_parent, pruning_point, relay_block).await?; - self.validate_staging_timestamps(&self.ctx.consensus().session().await, &session).await?; - self.sync_pruning_point_utxoset(&session, pruning_point).await?; + let pruning_point = self.sync_and_validate_pruning_proof(&staging_session).await?; + self.sync_headers(&staging_session, syncer_virtual_selected_parent, pruning_point, relay_block).await?; + staging_session.async_validate_pruning_points().await?; + self.validate_staging_timestamps(&self.ctx.consensus().session().await, &staging_session).await?; + self.sync_pruning_point_utxoset(&staging_session, pruning_point).await?; Ok(()) } @@ -207,6 +208,11 @@ impl IbdFlow { return Err(ProtocolError::Other("the first pruning point in the list is expected to be genesis")); } + if consensus.async_are_pruning_points_violating_finality(pruning_points).await { + // TODO: Find a better way to deal with finality conflicts + return Err(ProtocolError::Other("pruning points are violating finality")); + } + // TODO: validate pruning points before importing let msg = dequeue_with_timeout!(self.incoming_route, Payload::TrustedData)?; From 88b9a1ff19f7c585067d868b56e2ac88ee12105b Mon Sep 17 00:00:00 2001 From: Ori Newman Date: Wed, 16 Aug 2023 19:50:19 +0300 Subject: [PATCH 02/11] Don't do DownloadHeadersProof on a relatively synced node --- components/consensusmanager/src/session.rs | 8 ++++++++ consensus/core/src/api/mod.rs | 8 ++++++++ consensus/core/src/config/params.rs | 4 ++++ 
consensus/src/consensus/factory.rs | 2 ++ consensus/src/consensus/mod.rs | 14 ++++++++++++++ consensus/src/consensus/test_consensus.rs | 6 +++--- .../src/pipeline/virtual_processor/processor.rs | 2 +- protocol/flows/src/v5/ibd/flow.rs | 15 +++++++++++++-- simpa/src/main.rs | 8 +++++--- simpa/src/simulator/network.rs | 11 +++++++++-- 10 files changed, 67 insertions(+), 11 deletions(-) diff --git a/components/consensusmanager/src/session.rs b/components/consensusmanager/src/session.rs index afa01408b..28b1d2184 100644 --- a/components/consensusmanager/src/session.rs +++ b/components/consensusmanager/src/session.rs @@ -358,6 +358,14 @@ impl ConsensusSessionOwned { pub async fn async_are_pruning_points_violating_finality(&self, pp_list: PruningPointsList) -> bool { self.clone().spawn_blocking(move |c| c.are_pruning_points_violating_finality(pp_list)).await } + + pub async fn async_creation_timestamp(&self) -> u64 { + self.clone().spawn_blocking(move |c| c.creation_timestamp()).await + } + + pub async fn async_finality_point(&self) -> Hash { + self.clone().spawn_blocking(move |c| c.finality_point()).await + } } pub type ConsensusProxy = ConsensusSessionOwned; diff --git a/consensus/core/src/api/mod.rs b/consensus/core/src/api/mod.rs index 49f99a3a9..b9662ed77 100644 --- a/consensus/core/src/api/mod.rs +++ b/consensus/core/src/api/mod.rs @@ -264,6 +264,14 @@ pub trait ConsensusApi: Send + Sync { fn are_pruning_points_violating_finality(&self, pp_list: PruningPointsList) -> bool { unimplemented!() } + + fn creation_timestamp(&self) -> u64 { + unimplemented!() + } + + fn finality_point(&self) -> Hash { + unimplemented!() + } } pub type DynConsensus = Arc; diff --git a/consensus/core/src/config/params.rs b/consensus/core/src/config/params.rs index b966745e8..f82918308 100644 --- a/consensus/core/src/config/params.rs +++ b/consensus/core/src/config/params.rs @@ -237,6 +237,10 @@ impl Params { pub fn default_rpc_port(&self) -> u16 { self.net.default_rpc_port() } + + pub fn finality_duration(&self) -> u64 { + self.target_time_per_block * self.finality_depth + } } impl From for Params { diff --git a/consensus/src/consensus/factory.rs b/consensus/src/consensus/factory.rs index 98da44488..2ff73b37a 100644 --- a/consensus/src/consensus/factory.rs +++ b/consensus/src/consensus/factory.rs @@ -202,6 +202,7 @@ impl ConsensusFactory for Factory { session_lock.clone(), self.notification_root.clone(), self.counters.clone(), + entry.creation_timestamp, )); // We write the new active entry only once the instance was created successfully. 
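The `finality_duration()` helper (target_time_per_block * finality_depth, in milliseconds) and the `creation_timestamp` passed into `Consensus::new` above feed the headers-proof guard this patch adds to the IBD flow (a later commit in the series relocates it into the IBD-type decision). A minimal standalone sketch of that arithmetic, assuming mainnet-like values of a 1,000 ms target block time and a finality depth of 86,400 blocks — the real values come from `Params`:

fn main() {
    let target_time_per_block: u64 = 1_000; // ms per block, assumed mainnet-like value
    let finality_depth: u64 = 86_400; // blocks, assumed mainnet-like value
    let finality_duration = target_time_per_block * finality_depth; // 86_400_000 ms = 24 hours

    // Hypothetical clock readings, all in milliseconds.
    let now: u64 = 1_700_000_000_000; // stand-in for unix_now()
    let consensus_creation = now - 2 * finality_duration; // consensus created two finality windows ago
    let finality_point_ts = now - finality_duration; // finality point is ~24 hours old

    // Mirrors the two conditions in the flow: a consensus that has matured past one finality
    // window and still has a reasonably fresh finality point refuses DownloadHeadersProof.
    let consensus_is_mature = now > consensus_creation + finality_duration;
    let finality_point_is_recent = now < finality_point_ts + finality_duration * 3 / 2; // within ~36 hours
    assert!(consensus_is_mature && finality_point_is_recent);
}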
@@ -227,6 +228,7 @@ impl ConsensusFactory for Factory { session_lock.clone(), self.notification_root.clone(), self.counters.clone(), + entry.creation_timestamp, )); (ConsensusInstance::new(session_lock, consensus.clone()), Arc::new(Ctl::new(self.management_store.clone(), db, consensus))) diff --git a/consensus/src/consensus/mod.rs b/consensus/src/consensus/mod.rs index 35a955edb..40661c63b 100644 --- a/consensus/src/consensus/mod.rs +++ b/consensus/src/consensus/mod.rs @@ -63,6 +63,7 @@ use crossbeam_channel::{ }; use itertools::Itertools; use kaspa_consensusmanager::{SessionLock, SessionReadGuard}; +use kaspa_core::time::unix_now; use kaspa_database::prelude::StoreResultExtensions; use kaspa_hashes::Hash; use kaspa_muhash::MuHash; @@ -108,6 +109,9 @@ pub struct Consensus { // Config config: Arc, + + // Other + creation_timestamp: u64, } impl Deref for Consensus { @@ -125,6 +129,7 @@ impl Consensus { pruning_lock: SessionLock, notification_root: Arc, counters: Arc, + creation_timestamp: u64, ) -> Self { let params = &config.params; let perf_params = &config.perf; @@ -259,6 +264,7 @@ impl Consensus { notification_root, counters, config, + creation_timestamp, } } @@ -729,4 +735,12 @@ impl ConsensusApi for Consensus { fn are_pruning_points_violating_finality(&self, pp_list: PruningPointsList) -> bool { self.virtual_processor.are_pruning_points_violating_finality(pp_list) } + + fn creation_timestamp(&self) -> u64 { + self.creation_timestamp + } + + fn finality_point(&self) -> Hash { + self.virtual_processor.virtual_finality_point(&self.virtual_stores.read().state.get().unwrap().ghostdag_data, self.pruning_point()) + } } diff --git a/consensus/src/consensus/test_consensus.rs b/consensus/src/consensus/test_consensus.rs index e75092b6a..620722022 100644 --- a/consensus/src/consensus/test_consensus.rs +++ b/consensus/src/consensus/test_consensus.rs @@ -50,7 +50,7 @@ impl TestConsensus { pub fn with_db(db: Arc, config: &Config, notification_sender: Sender) -> Self { let notification_root = Arc::new(ConsensusNotificationRoot::new(notification_sender)); let counters = Arc::new(ProcessingCounters::default()); - let consensus = Arc::new(Consensus::new(db, Arc::new(config.clone()), Default::default(), notification_root, counters)); + let consensus = Arc::new(Consensus::new(db, Arc::new(config.clone()), Default::default(), notification_root, counters, 0)); let block_builder = TestBlockBuilder::new(consensus.virtual_processor.clone()); Self { params: config.params.clone(), consensus, block_builder, db_lifetime: Default::default() } @@ -61,7 +61,7 @@ impl TestConsensus { let (db_lifetime, db) = create_temp_db!(ConnBuilder::default()); let notification_root = Arc::new(ConsensusNotificationRoot::new(notification_sender)); let counters = Arc::new(ProcessingCounters::default()); - let consensus = Arc::new(Consensus::new(db, Arc::new(config.clone()), Default::default(), notification_root, counters)); + let consensus = Arc::new(Consensus::new(db, Arc::new(config.clone()), Default::default(), notification_root, counters, 0)); let block_builder = TestBlockBuilder::new(consensus.virtual_processor.clone()); Self { consensus, block_builder, params: config.params.clone(), db_lifetime } @@ -73,7 +73,7 @@ impl TestConsensus { let (dummy_notification_sender, _) = async_channel::unbounded(); let notification_root = Arc::new(ConsensusNotificationRoot::new(dummy_notification_sender)); let counters = Arc::new(ProcessingCounters::default()); - let consensus = Arc::new(Consensus::new(db, Arc::new(config.clone()), 
Default::default(), notification_root, counters)); + let consensus = Arc::new(Consensus::new(db, Arc::new(config.clone()), Default::default(), notification_root, counters, 0)); let block_builder = TestBlockBuilder::new(consensus.virtual_processor.clone()); Self { consensus, block_builder, params: config.params.clone(), db_lifetime } diff --git a/consensus/src/pipeline/virtual_processor/processor.rs b/consensus/src/pipeline/virtual_processor/processor.rs index 449e6c1d9..118af27f6 100644 --- a/consensus/src/pipeline/virtual_processor/processor.rs +++ b/consensus/src/pipeline/virtual_processor/processor.rs @@ -317,7 +317,7 @@ impl VirtualStateProcessor { .expect("expecting an open unbounded channel"); } - pub(super) fn virtual_finality_point(&self, virtual_ghostdag_data: &GhostdagData, pruning_point: Hash) -> Hash { + pub(crate) fn virtual_finality_point(&self, virtual_ghostdag_data: &GhostdagData, pruning_point: Hash) -> Hash { let finality_point = self.depth_manager.calc_finality_point(virtual_ghostdag_data, pruning_point); if self.reachability_service.is_chain_ancestor_of(pruning_point, finality_point) { finality_point diff --git a/protocol/flows/src/v5/ibd/flow.rs b/protocol/flows/src/v5/ibd/flow.rs index 035354b19..e316a1e7f 100644 --- a/protocol/flows/src/v5/ibd/flow.rs +++ b/protocol/flows/src/v5/ibd/flow.rs @@ -9,12 +9,13 @@ use futures::future::try_join_all; use kaspa_consensus_core::{ api::BlockValidationFuture, block::Block, + errors::consensus::ConsensusError, header::Header, pruning::{PruningPointProof, PruningPointsList}, BlockHashSet, }; use kaspa_consensusmanager::{spawn_blocking, ConsensusProxy, StagingConsensus}; -use kaspa_core::{debug, info, warn}; +use kaspa_core::{debug, info, time::unix_now, warn}; use kaspa_hashes::Hash; use kaspa_muhash::MuHash; use kaspa_p2p_lib::{ @@ -107,6 +108,16 @@ impl IbdFlow { .await?; } IbdType::DownloadHeadersProof => { + if unix_now() - session.async_creation_timestamp().await > self.ctx.config.finality_duration() { + let f = session.async_finality_point().await; + let f_ts = session.async_get_header(f).await?.timestamp; + if unix_now() - f_ts < self.ctx.config.finality_duration() * 3 / 2 { + return Err(ProtocolError::Other( + "This consensus instance is already synced and cannot be replaced via pruning point proof", + )); + } + } + drop(session); // Avoid holding the previous consensus throughout the staging IBD let staging = self.ctx.consensus_manager.new_staging_consensus(); match self.ibd_with_headers_proof(&staging, negotiation_output.syncer_virtual_selected_parent, &relay_block).await { @@ -208,7 +219,7 @@ impl IbdFlow { return Err(ProtocolError::Other("the first pruning point in the list is expected to be genesis")); } - if consensus.async_are_pruning_points_violating_finality(pruning_points).await { + if consensus.async_are_pruning_points_violating_finality(pruning_points.clone()).await { // TODO: Find a better way to deal with finality conflicts return Err(ProtocolError::Other("pruning points are violating finality")); } diff --git a/simpa/src/main.rs b/simpa/src/main.rs index 428166388..123f21f72 100644 --- a/simpa/src/main.rs +++ b/simpa/src/main.rs @@ -19,7 +19,7 @@ use kaspa_consensus_core::{ BlockHashSet, BlockLevel, HashMapCustomHasher, }; use kaspa_consensus_notify::root::ConsensusNotificationRoot; -use kaspa_core::{info, task::service::AsyncService, task::tick::TickService, trace, warn}; +use kaspa_core::{info, task::service::AsyncService, task::tick::TickService, time::unix_now, trace, warn}; use 
kaspa_database::prelude::ConnBuilder; use kaspa_database::{create_temp_db, load_existing_db}; use kaspa_hashes::Hash; @@ -187,7 +187,8 @@ fn main() { }; let (dummy_notification_sender, _) = unbounded(); let notification_root = Arc::new(ConsensusNotificationRoot::new(dummy_notification_sender)); - let consensus = Arc::new(Consensus::new(db, config.clone(), Default::default(), notification_root, Default::default())); + let consensus = + Arc::new(Consensus::new(db, config.clone(), Default::default(), notification_root, Default::default(), unix_now())); (consensus, lifetime) } else { let until = if args.target_blocks.is_none() { config.genesis.timestamp + args.sim_time * 1000 } else { u64::MAX }; // milliseconds @@ -215,7 +216,8 @@ fn main() { let (_lifetime2, db2) = create_temp_db!(ConnBuilder::default().with_parallelism(num_cpus::get())); let (dummy_notification_sender, _) = unbounded(); let notification_root = Arc::new(ConsensusNotificationRoot::new(dummy_notification_sender)); - let consensus2 = Arc::new(Consensus::new(db2, config.clone(), Default::default(), notification_root, Default::default())); + let consensus2 = + Arc::new(Consensus::new(db2, config.clone(), Default::default(), notification_root, Default::default(), unix_now())); let handles2 = consensus2.run_processors(); rt.block_on(validate(&consensus, &consensus2, &config, args.delay, args.bps)); consensus2.shutdown(handles2); diff --git a/simpa/src/simulator/network.rs b/simpa/src/simulator/network.rs index f20b68d1d..82e75178e 100644 --- a/simpa/src/simulator/network.rs +++ b/simpa/src/simulator/network.rs @@ -1,5 +1,6 @@ use async_channel::unbounded; use kaspa_consensus_notify::root::ConsensusNotificationRoot; +use kaspa_core::time::unix_now; use std::sync::Arc; use std::thread::JoinHandle; @@ -75,8 +76,14 @@ impl KaspaNetworkSimulator { let (dummy_notification_sender, _) = unbounded(); let notification_root = Arc::new(ConsensusNotificationRoot::new(dummy_notification_sender)); - let consensus = - Arc::new(Consensus::new(db, self.config.clone(), Default::default(), notification_root, Default::default())); + let consensus = Arc::new(Consensus::new( + db, + self.config.clone(), + Default::default(), + notification_root, + Default::default(), + unix_now(), + )); let handles = consensus.run_processors(); let (sk, pk) = secp.generate_keypair(&mut rng); let miner_process = Box::new(Miner::new( From fd4043546210f01aab69ce70cd4c0f8f23f6a576 Mon Sep 17 00:00:00 2001 From: Ori Newman Date: Thu, 24 Aug 2023 08:48:36 +0000 Subject: [PATCH 03/11] fmt --- consensus/src/consensus/mod.rs | 3 ++- protocol/flows/src/v5/ibd/flow.rs | 2 +- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/consensus/src/consensus/mod.rs b/consensus/src/consensus/mod.rs index 40661c63b..46d6285d2 100644 --- a/consensus/src/consensus/mod.rs +++ b/consensus/src/consensus/mod.rs @@ -741,6 +741,7 @@ impl ConsensusApi for Consensus { } fn finality_point(&self) -> Hash { - self.virtual_processor.virtual_finality_point(&self.virtual_stores.read().state.get().unwrap().ghostdag_data, self.pruning_point()) + self.virtual_processor + .virtual_finality_point(&self.virtual_stores.read().state.get().unwrap().ghostdag_data, self.pruning_point()) } } diff --git a/protocol/flows/src/v5/ibd/flow.rs b/protocol/flows/src/v5/ibd/flow.rs index e316a1e7f..28f046818 100644 --- a/protocol/flows/src/v5/ibd/flow.rs +++ b/protocol/flows/src/v5/ibd/flow.rs @@ -117,7 +117,7 @@ impl IbdFlow { )); } } - + drop(session); // Avoid holding the previous consensus throughout the staging 
IBD let staging = self.ctx.consensus_manager.new_staging_consensus(); match self.ibd_with_headers_proof(&staging, negotiation_output.syncer_virtual_selected_parent, &relay_block).await { From 2bb308102ed08933c4e6f13b2d1d88be2ec1fd01 Mon Sep 17 00:00:00 2001 From: Ori Newman Date: Thu, 24 Aug 2023 08:49:42 +0000 Subject: [PATCH 04/11] clippy --- consensus/src/consensus/mod.rs | 2 +- protocol/flows/src/v5/ibd/flow.rs | 1 - 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/consensus/src/consensus/mod.rs b/consensus/src/consensus/mod.rs index 46d6285d2..c8004ed69 100644 --- a/consensus/src/consensus/mod.rs +++ b/consensus/src/consensus/mod.rs @@ -63,7 +63,7 @@ use crossbeam_channel::{ }; use itertools::Itertools; use kaspa_consensusmanager::{SessionLock, SessionReadGuard}; -use kaspa_core::time::unix_now; + use kaspa_database::prelude::StoreResultExtensions; use kaspa_hashes::Hash; use kaspa_muhash::MuHash; diff --git a/protocol/flows/src/v5/ibd/flow.rs b/protocol/flows/src/v5/ibd/flow.rs index 28f046818..f54052dfe 100644 --- a/protocol/flows/src/v5/ibd/flow.rs +++ b/protocol/flows/src/v5/ibd/flow.rs @@ -9,7 +9,6 @@ use futures::future::try_join_all; use kaspa_consensus_core::{ api::BlockValidationFuture, block::Block, - errors::consensus::ConsensusError, header::Header, pruning::{PruningPointProof, PruningPointsList}, BlockHashSet, From 937600fd4aea8eb108f063878737b6d5a1e8da6a Mon Sep 17 00:00:00 2001 From: Ori Newman Date: Fri, 25 Aug 2023 14:51:49 +0000 Subject: [PATCH 05/11] address review comments --- protocol/flows/src/v5/ibd/flow.rs | 27 ++++++++++++++------------- 1 file changed, 14 insertions(+), 13 deletions(-) diff --git a/protocol/flows/src/v5/ibd/flow.rs b/protocol/flows/src/v5/ibd/flow.rs index f54052dfe..a28f5ee77 100644 --- a/protocol/flows/src/v5/ibd/flow.rs +++ b/protocol/flows/src/v5/ibd/flow.rs @@ -107,16 +107,6 @@ impl IbdFlow { .await?; } IbdType::DownloadHeadersProof => { - if unix_now() - session.async_creation_timestamp().await > self.ctx.config.finality_duration() { - let f = session.async_finality_point().await; - let f_ts = session.async_get_header(f).await?.timestamp; - if unix_now() - f_ts < self.ctx.config.finality_duration() * 3 / 2 { - return Err(ProtocolError::Other( - "This consensus instance is already synced and cannot be replaced via pruning point proof", - )); - } - } - drop(session); // Avoid holding the previous consensus throughout the staging IBD let staging = self.ctx.consensus_manager.new_staging_consensus(); match self.ibd_with_headers_proof(&staging, negotiation_output.syncer_virtual_selected_parent, &relay_block).await { @@ -164,6 +154,17 @@ impl IbdFlow { if relay_header.blue_score >= hst_header.blue_score + self.ctx.config.pruning_depth && relay_header.blue_work > hst_header.blue_work { + if unix_now() > consensus.async_creation_timestamp().await + self.ctx.config.finality_duration() { + let fp = consensus.async_finality_point().await; + let fp_ts = consensus.async_get_header(fp).await?.timestamp; + if unix_now() < fp_ts + self.ctx.config.finality_duration() * 3 / 2 { + // We reject the headers proof if the node has a relatively up-to-date finality point and current + // consensus has matured for long enough (and not recently synced). 
This is mostly a spam-protector + // since subsequent checks identify these violations as well + return Ok(IbdType::None); + } + } + // The relayed block has sufficient blue score and blue work over the current header selected tip Ok(IbdType::DownloadHeadersProof) } else { @@ -223,14 +224,14 @@ impl IbdFlow { return Err(ProtocolError::Other("pruning points are violating finality")); } - // TODO: validate pruning points before importing - let msg = dequeue_with_timeout!(self.incoming_route, Payload::TrustedData)?; let pkg: TrustedDataPackage = msg.try_into()?; debug!("received trusted data with {} daa entries and {} ghostdag entries", pkg.daa_window.len(), pkg.ghostdag_window.len()); let mut entry_stream = TrustedEntryStream::new(&self.router, &mut self.incoming_route); - let Some(pruning_point_entry) = entry_stream.next().await? else { return Err(ProtocolError::Other("got `done` message before receiving the pruning point")); }; + let Some(pruning_point_entry) = entry_stream.next().await? else { + return Err(ProtocolError::Other("got `done` message before receiving the pruning point")); + }; if pruning_point_entry.block.hash() != proof_pruning_point { return Err(ProtocolError::Other("the proof pruning point is not equal to the expected trusted entry")); From d8aba0091359ef1da08f6a0af430b5be66957792 Mon Sep 17 00:00:00 2001 From: Ori Newman Date: Sun, 27 Aug 2023 08:34:02 +0000 Subject: [PATCH 06/11] Handle another case of finality conflict --- protocol/flows/src/v5/ibd/flow.rs | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/protocol/flows/src/v5/ibd/flow.rs b/protocol/flows/src/v5/ibd/flow.rs index a28f5ee77..9d42af6e5 100644 --- a/protocol/flows/src/v5/ibd/flow.rs +++ b/protocol/flows/src/v5/ibd/flow.rs @@ -146,8 +146,12 @@ impl IbdFlow { return Ok(IbdType::Sync(highest_known_syncer_chain_hash)); } - // TODO: in this case we know a syncer chain block, but it violates our current finality. In some cases - // this info should possibly be used to reject the IBD despite having more blue work etc. + // If the pruning point is not in the chain of `highest_known_syncer_chain_hash`, it + // means it's in its antichain (because if `highest_known_syncer_chain_hash` was in + // the pruning point's past the pruning point itself would be + // `highest_known_syncer_chain_hash`). So it means there's a finality conflict. + // TODO: Find a better way to handle finality conflicts. + return Ok(IbdType::None); } let hst_header = consensus.async_get_header(consensus.async_get_headers_selected_tip().await).await.unwrap(); @@ -161,6 +165,7 @@ impl IbdFlow { // We reject the headers proof if the node has a relatively up-to-date finality point and current // consensus has matured for long enough (and not recently synced). This is mostly a spam-protector // since subsequent checks identify these violations as well + // TODO: Find a better way to handle finality conflicts. 
return Ok(IbdType::None); } } From edffc3fa8391fddd92dde269ae895ddab158440f Mon Sep 17 00:00:00 2001 From: Ori Newman Date: Wed, 30 Aug 2023 14:36:14 +0000 Subject: [PATCH 07/11] Handle two TODOs --- protocol/flows/src/v5/ibd/flow.rs | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/protocol/flows/src/v5/ibd/flow.rs b/protocol/flows/src/v5/ibd/flow.rs index 9d42af6e5..350dad835 100644 --- a/protocol/flows/src/v5/ibd/flow.rs +++ b/protocol/flows/src/v5/ibd/flow.rs @@ -207,7 +207,13 @@ impl IbdFlow { let proof_pruning_point = proof[0].last().expect("was just ensured by validation").hash; - // TODO: verify the proof pruning point is different than current consensus pruning point + if proof_pruning_point == self.ctx.config.genesis.hash { + return Err(ProtocolError::Other("the proof pruning point is the genesis block")); + } + + if proof_pruning_point == consensus.async_pruning_point().await { + return Err(ProtocolError::Other("the proof pruning point is the same as the current pruning point")); + } self.router .enqueue(make_message!(Payload::RequestPruningPointAndItsAnticone, RequestPruningPointAndItsAnticoneMessage {})) @@ -306,9 +312,6 @@ impl IbdFlow { consensus.validate_and_insert_trusted_block(tb).await?; } info!("Done processing trusted blocks"); - - // TODO: make sure that the proof pruning point is not genesis - Ok(proof_pruning_point) } From aa6236ca132865c13f3ba856227edb43093b3b21 Mon Sep 17 00:00:00 2001 From: Ori Newman Date: Thu, 31 Aug 2023 14:17:33 +0000 Subject: [PATCH 08/11] Address review comments --- consensus/src/consensus/mod.rs | 5 +++-- consensus/src/pipeline/virtual_processor/processor.rs | 10 ++++++---- consensus/src/processes/pruning.rs | 11 ++++------- protocol/flows/src/v5/ibd/flow.rs | 6 +++--- 4 files changed, 16 insertions(+), 16 deletions(-) diff --git a/consensus/src/consensus/mod.rs b/consensus/src/consensus/mod.rs index 38e0d3268..e920b034d 100644 --- a/consensus/src/consensus/mod.rs +++ b/consensus/src/consensus/mod.rs @@ -519,12 +519,13 @@ impl ConsensusApi for Consensus { } fn validate_pruning_points(&self) -> ConsensusResult<()> { + let hst = self.storage.headers_selected_tip_store.read().get().unwrap().hash; let pp_info = self.pruning_point_store.read().get().unwrap(); - if !self.services.pruning_point_manager.is_valid_pruning_point(pp_info.pruning_point) { + if !self.services.pruning_point_manager.is_valid_pruning_point(pp_info.pruning_point, hst) { return Err(ConsensusError::General("invalid pruning point candidate")); } - if !self.services.pruning_point_manager.are_pruning_points_in_valid_chain(pp_info) { + if !self.services.pruning_point_manager.are_pruning_points_in_valid_chain(pp_info, hst) { return Err(ConsensusError::General("past pruning points do not form a valid chain")); } diff --git a/consensus/src/pipeline/virtual_processor/processor.rs b/consensus/src/pipeline/virtual_processor/processor.rs index 162f842c0..af0bd171c 100644 --- a/consensus/src/pipeline/virtual_processor/processor.rs +++ b/consensus/src/pipeline/virtual_processor/processor.rs @@ -938,10 +938,12 @@ impl VirtualStateProcessor { // Ideally we would want to check if the last known pruning point has the finality point // in its chain, but in some cases it's impossible: let `lkp` be the last known pruning // point from the list, and `fup` be the first unknown pruning point (the one following `lkp`). 
- // fup.blue_score - lkp.blue_score ∈ [finality_depth, finality_depth + k], so it's possible for - // `lkp` not to have the finality point in its past (if `fup` is close to the sink). So we have - // no choice but to check if `lkp` has `finality_point.finality_point` in its chain, meaning - // this function can only detect finality violations in depth of 2*finality_depth. + // fup.blue_score - lkp.blue_score ≈ finality_depth (±k), so it's possible for `lkp` not to + // have the finality point in its past. So we have no choice but to check if `lkp` + // has `finality_point.finality_point` in its chain (in the worst case `fup` is one block + // above the current finality point, and in this case `lkp` will be a few blocks above the + // finality_point.finality_point), meaning this function can only detect finality violations + // in depth of 2*finality_depth, and can give false negatives for smaller finality violations. let current_pp = self.pruning_point_store.read().pruning_point().unwrap(); let vf = self.virtual_finality_point(&self.virtual_stores.read().state.get().unwrap().ghostdag_data, current_pp); let vff = self.depth_manager.calc_finality_point(&self.ghostdag_primary_store.get_data(vf).unwrap(), current_pp); diff --git a/consensus/src/processes/pruning.rs b/consensus/src/processes/pruning.rs index 5f4be43e6..1d419c063 100644 --- a/consensus/src/processes/pruning.rs +++ b/consensus/src/processes/pruning.rs @@ -13,6 +13,7 @@ use crate::model::{ }, }; use kaspa_hashes::Hash; +use kaspa_utils::option::OptionExtensions; use parking_lot::RwLock; #[derive(Clone)] @@ -181,13 +182,10 @@ impl< pov_blue_score >= pp_bs + self.pruning_depth } - pub fn is_valid_pruning_point(&self, pp_candidate: Hash) -> bool { + pub fn is_valid_pruning_point(&self, pp_candidate: Hash, hst: Hash) -> bool { if pp_candidate == self.genesis_hash { return true; } - - let hst = self.header_selected_tip_store.read().get().unwrap().hash; - if !self.reachability_service.is_chain_ancestor_of(pp_candidate, hst) { return false; } @@ -196,7 +194,7 @@ impl< self.is_pruning_point_in_pruning_depth(hst_bs, pp_candidate) } - pub fn are_pruning_points_in_valid_chain(&self, pruning_info: PruningPointInfo) -> bool { + pub fn are_pruning_points_in_valid_chain(&self, pruning_info: PruningPointInfo, hst: Hash) -> bool { // We want to validate that the past pruning points form a chain to genesis. Since // each pruning point's header doesn't point to the previous pruning point, but to // the pruning point from its POV, we can't just traverse from one pruning point to @@ -213,10 +211,9 @@ impl< // any other pruning point in the list, so we are compelled to check if it's refereced by // the selected chain. 
let mut expected_pps_queue = VecDeque::new(); - let hst = self.header_selected_tip_store.read().get().unwrap().hash; for current in self.reachability_service.backward_chain_iterator(hst, pruning_info.pruning_point, false) { let current_header = self.headers_store.get_header(current).unwrap(); - if expected_pps_queue.is_empty() || *expected_pps_queue.back().unwrap() != current_header.pruning_point { + if expected_pps_queue.back().is_none_or(|&&h| h != current_header.pruning_point) { expected_pps_queue.push_back(current_header.pruning_point); } } diff --git a/protocol/flows/src/v5/ibd/flow.rs b/protocol/flows/src/v5/ibd/flow.rs index 350dad835..df416117a 100644 --- a/protocol/flows/src/v5/ibd/flow.rs +++ b/protocol/flows/src/v5/ibd/flow.rs @@ -150,7 +150,7 @@ impl IbdFlow { // means it's in its antichain (because if `highest_known_syncer_chain_hash` was in // the pruning point's past the pruning point itself would be // `highest_known_syncer_chain_hash`). So it means there's a finality conflict. - // TODO: Find a better way to handle finality conflicts. + // TODO: consider performing additional actions on finality conflicts in addition to disconnecting from the peer (e.g., banning, rpc notification) return Ok(IbdType::None); } @@ -165,7 +165,7 @@ impl IbdFlow { // We reject the headers proof if the node has a relatively up-to-date finality point and current // consensus has matured for long enough (and not recently synced). This is mostly a spam-protector // since subsequent checks identify these violations as well - // TODO: Find a better way to handle finality conflicts. + // TODO: consider performing additional actions on finality conflicts in addition to disconnecting from the peer (e.g., banning, rpc notification) return Ok(IbdType::None); } } @@ -231,7 +231,7 @@ impl IbdFlow { } if consensus.async_are_pruning_points_violating_finality(pruning_points.clone()).await { - // TODO: Find a better way to deal with finality conflicts + // TODO: consider performing additional actions on finality conflicts in addition to disconnecting from the peer (e.g., banning, rpc notification) return Err(ProtocolError::Other("pruning points are violating finality")); } From 4e40727560b874236f3aa651bdc3c6c9e7afd2e5 Mon Sep 17 00:00:00 2001 From: Ori Newman Date: Thu, 31 Aug 2023 16:24:03 +0000 Subject: [PATCH 09/11] Use real consensus in IBD with headers proof when needed --- consensus/src/processes/pruning_proof/mod.rs | 6 +++++ protocol/flows/src/v5/ibd/flow.rs | 23 ++++++++++++++------ 2 files changed, 22 insertions(+), 7 deletions(-) diff --git a/consensus/src/processes/pruning_proof/mod.rs b/consensus/src/processes/pruning_proof/mod.rs index 8cf14c272..593760450 100644 --- a/consensus/src/processes/pruning_proof/mod.rs +++ b/consensus/src/processes/pruning_proof/mod.rs @@ -537,6 +537,12 @@ impl PruningProofManager { } } + if current_pp == self.genesis_hash { + // If the proof has better tips and the currnet pruning point is still + // genesis, we consider the proof state to be better. 
+ return Ok(()); + } + for level in (0..=self.max_block_level).rev() { let level_idx = level as usize; match relations_read[level_idx].get_parents(current_pp).unwrap_option() { diff --git a/protocol/flows/src/v5/ibd/flow.rs b/protocol/flows/src/v5/ibd/flow.rs index df416117a..956e7d7b1 100644 --- a/protocol/flows/src/v5/ibd/flow.rs +++ b/protocol/flows/src/v5/ibd/flow.rs @@ -107,9 +107,11 @@ impl IbdFlow { .await?; } IbdType::DownloadHeadersProof => { - drop(session); // Avoid holding the previous consensus throughout the staging IBD let staging = self.ctx.consensus_manager.new_staging_consensus(); - match self.ibd_with_headers_proof(&staging, negotiation_output.syncer_virtual_selected_parent, &relay_block).await { + match self + .ibd_with_headers_proof(&staging, session, negotiation_output.syncer_virtual_selected_parent, &relay_block) + .await + { Ok(()) => { spawn_blocking(|| staging.commit()).await.unwrap(); self.ctx.on_pruning_point_utxoset_override(); @@ -180,6 +182,7 @@ impl IbdFlow { async fn ibd_with_headers_proof( &mut self, staging: &StagingConsensus, + consensus: ConsensusProxy, syncer_virtual_selected_parent: Hash, relay_block: &Block, ) -> Result<(), ProtocolError> { @@ -187,7 +190,7 @@ impl IbdFlow { let staging_session = staging.session().await; - let pruning_point = self.sync_and_validate_pruning_proof(&staging_session).await?; + let pruning_point = self.sync_and_validate_pruning_proof(&staging_session, consensus).await?; self.sync_headers(&staging_session, syncer_virtual_selected_parent, pruning_point, relay_block).await?; staging_session.async_validate_pruning_points().await?; self.validate_staging_timestamps(&self.ctx.consensus().session().await, &staging_session).await?; @@ -195,7 +198,11 @@ impl IbdFlow { Ok(()) } - async fn sync_and_validate_pruning_proof(&mut self, consensus: &ConsensusProxy) -> Result { + async fn sync_and_validate_pruning_proof( + &mut self, + staging: &ConsensusProxy, + consensus: ConsensusProxy, + ) -> Result { self.router.enqueue(make_message!(Payload::RequestPruningPointProof, RequestPruningPointProofMessage {})).await?; // Pruning proof generation and communication might take several minutes, so we allow a long 10 minute timeout @@ -235,6 +242,8 @@ impl IbdFlow { return Err(ProtocolError::Other("pruning points are violating finality")); } + drop(consensus); // Release the session of the old consensus for the IBD + let msg = dequeue_with_timeout!(self.incoming_route, Payload::TrustedData)?; let pkg: TrustedDataPackage = msg.try_into()?; debug!("received trusted data with {} daa entries and {} ghostdag entries", pkg.daa_window.len(), pkg.ghostdag_window.len()); @@ -256,7 +265,7 @@ impl IbdFlow { let mut trusted_set = pkg.build_trusted_subdag(entries)?; if self.ctx.config.enable_sanity_checks { - trusted_set = consensus + trusted_set = staging .clone() .spawn_blocking(move |c| { let ref_proof = proof.clone(); @@ -287,7 +296,7 @@ impl IbdFlow { }) .await; } else { - trusted_set = consensus + trusted_set = staging .clone() .spawn_blocking(move |c| { c.apply_pruning_proof(proof, &trusted_set); @@ -309,7 +318,7 @@ impl IbdFlow { last_index = i; } // TODO: queue and join in batches - consensus.validate_and_insert_trusted_block(tb).await?; + staging.validate_and_insert_trusted_block(tb).await?; } info!("Done processing trusted blocks"); Ok(proof_pruning_point) From fe8636c1a66ed612537038ff7dfc5912c929c00e Mon Sep 17 00:00:00 2001 From: Michael Sutton Date: Fri, 1 Sep 2023 14:23:03 +0300 Subject: [PATCH 10/11] acquire current consensus session only when 
needed --- protocol/flows/src/v5/ibd/flow.rs | 26 ++++++++++++-------------- 1 file changed, 12 insertions(+), 14 deletions(-) diff --git a/protocol/flows/src/v5/ibd/flow.rs b/protocol/flows/src/v5/ibd/flow.rs index 956e7d7b1..add81ae84 100644 --- a/protocol/flows/src/v5/ibd/flow.rs +++ b/protocol/flows/src/v5/ibd/flow.rs @@ -107,11 +107,9 @@ impl IbdFlow { .await?; } IbdType::DownloadHeadersProof => { + drop(session); // Avoid holding the previous consensus throughout the staging IBD let staging = self.ctx.consensus_manager.new_staging_consensus(); - match self - .ibd_with_headers_proof(&staging, session, negotiation_output.syncer_virtual_selected_parent, &relay_block) - .await - { + match self.ibd_with_headers_proof(&staging, negotiation_output.syncer_virtual_selected_parent, &relay_block).await { Ok(()) => { spawn_blocking(|| staging.commit()).await.unwrap(); self.ctx.on_pruning_point_utxoset_override(); @@ -182,7 +180,6 @@ impl IbdFlow { async fn ibd_with_headers_proof( &mut self, staging: &StagingConsensus, - consensus: ConsensusProxy, syncer_virtual_selected_parent: Hash, relay_block: &Block, ) -> Result<(), ProtocolError> { @@ -190,7 +187,7 @@ impl IbdFlow { let staging_session = staging.session().await; - let pruning_point = self.sync_and_validate_pruning_proof(&staging_session, consensus).await?; + let pruning_point = self.sync_and_validate_pruning_proof(&staging_session).await?; self.sync_headers(&staging_session, syncer_virtual_selected_parent, pruning_point, relay_block).await?; staging_session.async_validate_pruning_points().await?; self.validate_staging_timestamps(&self.ctx.consensus().session().await, &staging_session).await?; @@ -198,11 +195,7 @@ impl IbdFlow { Ok(()) } - async fn sync_and_validate_pruning_proof( - &mut self, - staging: &ConsensusProxy, - consensus: ConsensusProxy, - ) -> Result { + async fn sync_and_validate_pruning_proof(&mut self, staging: &ConsensusProxy) -> Result { self.router.enqueue(make_message!(Payload::RequestPruningPointProof, RequestPruningPointProofMessage {})).await?; // Pruning proof generation and communication might take several minutes, so we allow a long 10 minute timeout @@ -210,6 +203,10 @@ impl IbdFlow { let proof: PruningPointProof = msg.try_into()?; debug!("received proof with overall {} headers", proof.iter().map(|l| l.len()).sum::()); + // Get a new session for current consensus (non staging) + let consensus = self.ctx.consensus().session().await; + + // The proof is validated in the context of current consensus let proof = consensus.clone().spawn_blocking(move |c| c.validate_pruning_proof(&proof).map(|()| proof)).await?; let proof_pruning_point = proof[0].last().expect("was just ensured by validation").hash; @@ -222,6 +219,8 @@ impl IbdFlow { return Err(ProtocolError::Other("the proof pruning point is the same as the current pruning point")); } + drop(consensus); + self.router .enqueue(make_message!(Payload::RequestPruningPointAndItsAnticone, RequestPruningPointAndItsAnticoneMessage {})) .await?; @@ -237,13 +236,12 @@ impl IbdFlow { return Err(ProtocolError::Other("the first pruning point in the list is expected to be genesis")); } - if consensus.async_are_pruning_points_violating_finality(pruning_points.clone()).await { + // Check if past pruning points violate finality of current consensus + if self.ctx.consensus().session().await.async_are_pruning_points_violating_finality(pruning_points.clone()).await { // TODO: consider performing additional actions on finality conflicts in addition to disconnecting from the peer (e.g., 
banning, rpc notification) return Err(ProtocolError::Other("pruning points are violating finality")); } - drop(consensus); // Release the session of the old consensus for the IBD - let msg = dequeue_with_timeout!(self.incoming_route, Payload::TrustedData)?; let pkg: TrustedDataPackage = msg.try_into()?; debug!("received trusted data with {} daa entries and {} ghostdag entries", pkg.daa_window.len(), pkg.ghostdag_window.len()); From a8a1b84bf74b0a597269ae24347e5d83a60ff945 Mon Sep 17 00:00:00 2001 From: Michael Sutton Date: Fri, 1 Sep 2023 14:29:06 +0300 Subject: [PATCH 11/11] typo --- consensus/src/pipeline/virtual_processor/processor.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/consensus/src/pipeline/virtual_processor/processor.rs b/consensus/src/pipeline/virtual_processor/processor.rs index 4ed98e6bf..5a0c182de 100644 --- a/consensus/src/pipeline/virtual_processor/processor.rs +++ b/consensus/src/pipeline/virtual_processor/processor.rs @@ -949,13 +949,13 @@ impl VirtualStateProcessor { let vf = self.virtual_finality_point(&self.virtual_stores.read().state.get().unwrap().ghostdag_data, current_pp); let vff = self.depth_manager.calc_finality_point(&self.ghostdag_primary_store.get_data(vf).unwrap(), current_pp); - let last_knonwn_pp = pp_list.iter().rev().find(|pp| match self.statuses_store.read().get(pp.hash).unwrap_option() { + let last_known_pp = pp_list.iter().rev().find(|pp| match self.statuses_store.read().get(pp.hash).unwrap_option() { Some(status) => status.is_valid(), None => false, }); - if let Some(last_knonwn_pp) = last_knonwn_pp { - !self.reachability_service.is_chain_ancestor_of(vff, last_knonwn_pp.hash) + if let Some(last_known_pp) = last_known_pp { + !self.reachability_service.is_chain_ancestor_of(vff, last_known_pp.hash) } else { // If no pruning point is known, there's definitely a finality violation // (normally at least genesis should be known).
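For reference, the queue-based validation performed by `are_pruning_points_in_valid_chain` ([PATCH 01/11], reworked in [PATCH 08/11]) can be modeled in isolation. The sketch below uses u64 stand-ins for hashes and plain maps instead of the header and reachability stores, and the toy data (a lag of roughly two pruning points per header, three sampled chain blocks) is invented purely for illustration:

use std::collections::{HashMap, VecDeque};

fn pruning_points_form_valid_chain(
    genesis: u64,
    past_pruning_points: &[u64],              // index 0 is genesis, last entry is the current pruning point
    chain_declared_pps: &[u64],               // pruning points declared by selected-chain blocks, newest first
    header_pruning_point: &HashMap<u64, u64>, // each pruning point's own header pruning point
) -> bool {
    // Stage 1: seed the queue of expected pruning points from the selected chain
    // above the current pruning point.
    let mut expected: VecDeque<u64> = VecDeque::new();
    for &pp in chain_declared_pps {
        if expected.back() != Some(&pp) {
            expected.push_back(pp);
        }
    }
    // Stage 2: walk the past pruning points from the most recent down to genesis.
    for (idx, &pp) in past_pruning_points.iter().enumerate().rev() {
        let Some(expected_pp) = expected.pop_front() else {
            return false; // fewer pruning points than the chain implies
        };
        if expected_pp != pp {
            return false;
        }
        if idx == 0 {
            // The 0th pruning point must be genesis and nothing may remain queued below it.
            return expected.is_empty() && pp == genesis;
        }
        // The queue must never run dry before genesis: something has to reference
        // every older pruning point.
        let Some(&last) = expected.back() else {
            return false;
        };
        let declared = header_pruning_point[&pp];
        if last != declared {
            expected.push_back(declared);
        }
    }
    true
}

fn main() {
    let genesis = 0u64;
    let past = [0u64, 1, 2, 3, 4];
    let chain = [4u64, 3, 2]; // declared pruning points of chain blocks between the tip and pruning point 4
    let headers: HashMap<u64, u64> = HashMap::from([(4, 2), (3, 1), (2, 0), (1, 0)]);
    assert!(pruning_points_form_valid_chain(genesis, &past, &chain, &headers));

    // A list containing a pruning point that nothing references fails the check.
    let forged = [0u64, 1, 2, 9, 4];
    let mut forged_headers = headers.clone();
    forged_headers.insert(9, 1);
    assert!(!pruning_points_form_valid_chain(genesis, &forged, &chain, &forged_headers));
}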