Skip to content

Commit

Permalink
Add pruning point list validations (kaspanet#243)
Browse files Browse the repository at this point in the history
* Add pruning point list validations

* Don't do DownloadHeadersProof on a relatively synced node

* fmt

* clippy

* address review comments

* Handle another case of finality conflict

* Handle two TODOs

* Address review comments

* Use real consensus in IBD with headers proof when needed

* acquire current consensus session only when needed

* typo
  • Loading branch information
someone235 authored Sep 1, 2023
1 parent 2ec8e21 commit 6c88c0d
Show file tree
Hide file tree
Showing 13 changed files with 266 additions and 31 deletions.
18 changes: 17 additions & 1 deletion components/consensusmanager/src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use kaspa_consensus_core::{
blockstatus::BlockStatus,
errors::consensus::ConsensusResult,
header::Header,
pruning::{PruningPointProof, PruningPointTrustedData},
pruning::{PruningPointProof, PruningPointTrustedData, PruningPointsList},
trusted::{ExternalGhostdagData, TrustedBlock},
tx::{Transaction, TransactionOutpoint, UtxoEntry},
BlockHashSet, ChainPath, Hash,
Expand Down Expand Up @@ -350,6 +350,22 @@ impl ConsensusSessionOwned {
) -> ConsensusResult<u64> {
self.clone().spawn_blocking(move |c| c.estimate_network_hashes_per_second(start_hash, window_size)).await
}

pub async fn async_validate_pruning_points(&self) -> ConsensusResult<()> {
self.clone().spawn_blocking(move |c| c.validate_pruning_points()).await
}

pub async fn async_are_pruning_points_violating_finality(&self, pp_list: PruningPointsList) -> bool {
self.clone().spawn_blocking(move |c| c.are_pruning_points_violating_finality(pp_list)).await
}

pub async fn async_creation_timestamp(&self) -> u64 {
self.clone().spawn_blocking(move |c| c.creation_timestamp()).await
}

pub async fn async_finality_point(&self) -> Hash {
self.clone().spawn_blocking(move |c| c.finality_point()).await
}
}

pub type ConsensusProxy = ConsensusSessionOwned;
16 changes: 16 additions & 0 deletions consensus/core/src/api/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,22 @@ pub trait ConsensusApi: Send + Sync {
fn estimate_network_hashes_per_second(&self, start_hash: Option<Hash>, window_size: usize) -> ConsensusResult<u64> {
unimplemented!()
}

fn validate_pruning_points(&self) -> ConsensusResult<()> {
unimplemented!()
}

fn are_pruning_points_violating_finality(&self, pp_list: PruningPointsList) -> bool {
unimplemented!()
}

fn creation_timestamp(&self) -> u64 {
unimplemented!()
}

fn finality_point(&self) -> Hash {
unimplemented!()
}
}

pub type DynConsensus = Arc<dyn ConsensusApi>;
4 changes: 4 additions & 0 deletions consensus/core/src/config/params.rs
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,10 @@ impl Params {
pub fn default_rpc_port(&self) -> u16 {
self.net.default_rpc_port()
}

pub fn finality_duration(&self) -> u64 {
self.target_time_per_block * self.finality_depth
}
}

impl From<NetworkType> for Params {
Expand Down
2 changes: 2 additions & 0 deletions consensus/src/consensus/factory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,7 @@ impl ConsensusFactory for Factory {
session_lock.clone(),
self.notification_root.clone(),
self.counters.clone(),
entry.creation_timestamp,
));

// We write the new active entry only once the instance was created successfully.
Expand Down Expand Up @@ -235,6 +236,7 @@ impl ConsensusFactory for Factory {
session_lock.clone(),
self.notification_root.clone(),
self.counters.clone(),
entry.creation_timestamp,
));

(ConsensusInstance::new(session_lock, consensus.clone()), Arc::new(Ctl::new(self.management_store.clone(), db, consensus)))
Expand Down
33 changes: 33 additions & 0 deletions consensus/src/consensus/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ use crossbeam_channel::{
};
use itertools::Itertools;
use kaspa_consensusmanager::{SessionLock, SessionReadGuard};

use kaspa_database::prelude::StoreResultExtensions;
use kaspa_hashes::Hash;
use kaspa_muhash::MuHash;
Expand Down Expand Up @@ -111,6 +112,9 @@ pub struct Consensus {

// Config
config: Arc<Config>,

// Other
creation_timestamp: u64,
}

impl Deref for Consensus {
Expand All @@ -128,6 +132,7 @@ impl Consensus {
pruning_lock: SessionLock,
notification_root: Arc<ConsensusNotificationRoot>,
counters: Arc<ProcessingCounters>,
creation_timestamp: u64,
) -> Self {
let params = &config.params;
let perf_params = &config.perf;
Expand Down Expand Up @@ -262,6 +267,7 @@ impl Consensus {
notification_root,
counters,
config,
creation_timestamp,
}
}

Expand Down Expand Up @@ -515,6 +521,20 @@ impl ConsensusApi for Consensus {
self.virtual_processor.import_pruning_point_utxo_set(new_pruning_point, imported_utxo_multiset)
}

fn validate_pruning_points(&self) -> ConsensusResult<()> {
let hst = self.storage.headers_selected_tip_store.read().get().unwrap().hash;
let pp_info = self.pruning_point_store.read().get().unwrap();
if !self.services.pruning_point_manager.is_valid_pruning_point(pp_info.pruning_point, hst) {
return Err(ConsensusError::General("invalid pruning point candidate"));
}

if !self.services.pruning_point_manager.are_pruning_points_in_valid_chain(pp_info, hst) {
return Err(ConsensusError::General("past pruning points do not form a valid chain"));
}

Ok(())
}

fn header_exists(&self, hash: Hash) -> bool {
match self.statuses_store.read().get(hash).unwrap_option() {
Some(status) => status.has_block_header(),
Expand Down Expand Up @@ -727,4 +747,17 @@ impl ConsensusApi for Consensus {
}
}
}

fn are_pruning_points_violating_finality(&self, pp_list: PruningPointsList) -> bool {
self.virtual_processor.are_pruning_points_violating_finality(pp_list)
}

fn creation_timestamp(&self) -> u64 {
self.creation_timestamp
}

fn finality_point(&self) -> Hash {
self.virtual_processor
.virtual_finality_point(&self.virtual_stores.read().state.get().unwrap().ghostdag_data, self.pruning_point())
}
}
4 changes: 3 additions & 1 deletion consensus/src/consensus/services.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ pub type DbSyncManager = SyncManager<
DbStatusesStore,
>;

pub type DbPruningPointManager = PruningPointManager<DbGhostdagStore, DbReachabilityStore, DbHeadersStore, DbPastPruningPointsStore>;
pub type DbPruningPointManager =
PruningPointManager<DbGhostdagStore, DbReachabilityStore, DbHeadersStore, DbPastPruningPointsStore, DbHeadersSelectedTipStore>;
pub type DbBlockDepthManager = BlockDepthManager<DbDepthStore, DbReachabilityStore, DbGhostdagStore>;
pub type DbParentsManager = ParentsManager<DbHeadersStore, DbReachabilityStore, MTRelationsService<DbRelationsStore>>;

Expand Down Expand Up @@ -153,6 +154,7 @@ impl ConsensusServices {
storage.ghostdag_primary_store.clone(),
storage.headers_store.clone(),
storage.past_pruning_points_store.clone(),
storage.headers_selected_tip_store.clone(),
);

let parents_manager = ParentsManager::new(
Expand Down
6 changes: 3 additions & 3 deletions consensus/src/consensus/test_consensus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ impl TestConsensus {
pub fn with_db(db: Arc<DB>, config: &Config, notification_sender: Sender<Notification>) -> Self {
let notification_root = Arc::new(ConsensusNotificationRoot::new(notification_sender));
let counters = Arc::new(ProcessingCounters::default());
let consensus = Arc::new(Consensus::new(db, Arc::new(config.clone()), Default::default(), notification_root, counters));
let consensus = Arc::new(Consensus::new(db, Arc::new(config.clone()), Default::default(), notification_root, counters, 0));
let block_builder = TestBlockBuilder::new(consensus.virtual_processor.clone());

Self { params: config.params.clone(), consensus, block_builder, db_lifetime: Default::default() }
Expand All @@ -61,7 +61,7 @@ impl TestConsensus {
let (db_lifetime, db) = create_temp_db!(ConnBuilder::default());
let notification_root = Arc::new(ConsensusNotificationRoot::new(notification_sender));
let counters = Arc::new(ProcessingCounters::default());
let consensus = Arc::new(Consensus::new(db, Arc::new(config.clone()), Default::default(), notification_root, counters));
let consensus = Arc::new(Consensus::new(db, Arc::new(config.clone()), Default::default(), notification_root, counters, 0));
let block_builder = TestBlockBuilder::new(consensus.virtual_processor.clone());

Self { consensus, block_builder, params: config.params.clone(), db_lifetime }
Expand All @@ -73,7 +73,7 @@ impl TestConsensus {
let (dummy_notification_sender, _) = async_channel::unbounded();
let notification_root = Arc::new(ConsensusNotificationRoot::new(dummy_notification_sender));
let counters = Arc::new(ProcessingCounters::default());
let consensus = Arc::new(Consensus::new(db, Arc::new(config.clone()), Default::default(), notification_root, counters));
let consensus = Arc::new(Consensus::new(db, Arc::new(config.clone()), Default::default(), notification_root, counters, 0));
let block_builder = TestBlockBuilder::new(consensus.virtual_processor.clone());

Self { consensus, block_builder, params: config.params.clone(), db_lifetime }
Expand Down
31 changes: 30 additions & 1 deletion consensus/src/pipeline/virtual_processor/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ use kaspa_consensus_core::{
config::genesis::GenesisBlock,
header::Header,
merkle::calc_hash_merkle_root,
pruning::PruningPointsList,
tx::{MutableTransaction, Transaction},
utxo::{
utxo_diff::UtxoDiff,
Expand Down Expand Up @@ -316,7 +317,7 @@ impl VirtualStateProcessor {
.expect("expecting an open unbounded channel");
}

pub(super) fn virtual_finality_point(&self, virtual_ghostdag_data: &GhostdagData, pruning_point: Hash) -> Hash {
pub(crate) fn virtual_finality_point(&self, virtual_ghostdag_data: &GhostdagData, pruning_point: Hash) -> Hash {
let finality_point = self.depth_manager.calc_finality_point(virtual_ghostdag_data, pruning_point);
if self.reachability_service.is_chain_ancestor_of(pruning_point, finality_point) {
finality_point
Expand Down Expand Up @@ -933,6 +934,34 @@ impl VirtualStateProcessor {

Ok(())
}

pub fn are_pruning_points_violating_finality(&self, pp_list: PruningPointsList) -> bool {
// Ideally we would want to check if the last known pruning point has the finality point
// in its chain, but in some cases it's impossible: let `lkp` be the last known pruning
// point from the list, and `fup` be the first unknown pruning point (the one following `lkp`).
// fup.blue_score - lkp.blue_score ≈ finality_depth (±k), so it's possible for `lkp` not to
// have the finality point in its past. So we have no choice but to check if `lkp`
// has `finality_point.finality_point` in its chain (in the worst case `fup` is one block
// above the current finality point, and in this case `lkp` will be a few blocks above the
// finality_point.finality_point), meaning this function can only detect finality violations
// in depth of 2*finality_depth, and can give false negatives for smaller finality violations.
let current_pp = self.pruning_point_store.read().pruning_point().unwrap();
let vf = self.virtual_finality_point(&self.virtual_stores.read().state.get().unwrap().ghostdag_data, current_pp);
let vff = self.depth_manager.calc_finality_point(&self.ghostdag_primary_store.get_data(vf).unwrap(), current_pp);

let last_known_pp = pp_list.iter().rev().find(|pp| match self.statuses_store.read().get(pp.hash).unwrap_option() {
Some(status) => status.is_valid(),
None => false,
});

if let Some(last_known_pp) = last_known_pp {
!self.reachability_service.is_chain_ancestor_of(vff, last_known_pp.hash)
} else {
// If no pruning point is known, there's definitely a finality violation
// (normally at least genesis should be known).
true
}
}
}

enum MergesetIncreaseResult {
Expand Down
94 changes: 91 additions & 3 deletions consensus/src/processes/pruning.rs
Original file line number Diff line number Diff line change
@@ -1,24 +1,28 @@
use std::sync::Arc;
use std::{collections::VecDeque, sync::Arc};

use super::reachability::ReachabilityResultExtensions;
use crate::model::{
services::reachability::{MTReachabilityService, ReachabilityService},
stores::{
ghostdag::{CompactGhostdagData, GhostdagStoreReader},
headers::HeaderStoreReader,
headers_selected_tip::HeadersSelectedTipStoreReader,
past_pruning_points::PastPruningPointsStoreReader,
pruning::PruningPointInfo,
reachability::ReachabilityStoreReader,
},
};
use kaspa_hashes::Hash;
use kaspa_utils::option::OptionExtensions;
use parking_lot::RwLock;

#[derive(Clone)]
pub struct PruningPointManager<
S: GhostdagStoreReader,
T: ReachabilityStoreReader,
U: HeaderStoreReader,
V: PastPruningPointsStoreReader,
W: HeadersSelectedTipStoreReader,
> {
pruning_depth: u64,
finality_depth: u64,
Expand All @@ -28,10 +32,16 @@ pub struct PruningPointManager<
ghostdag_store: Arc<S>,
headers_store: Arc<U>,
past_pruning_points_store: Arc<V>,
header_selected_tip_store: Arc<RwLock<W>>,
}

impl<S: GhostdagStoreReader, T: ReachabilityStoreReader, U: HeaderStoreReader, V: PastPruningPointsStoreReader>
PruningPointManager<S, T, U, V>
impl<
S: GhostdagStoreReader,
T: ReachabilityStoreReader,
U: HeaderStoreReader,
V: PastPruningPointsStoreReader,
W: HeadersSelectedTipStoreReader,
> PruningPointManager<S, T, U, V, W>
{
pub fn new(
pruning_depth: u64,
Expand All @@ -41,6 +51,7 @@ impl<S: GhostdagStoreReader, T: ReachabilityStoreReader, U: HeaderStoreReader, V
ghostdag_store: Arc<S>,
headers_store: Arc<U>,
past_pruning_points_store: Arc<V>,
header_selected_tip_store: Arc<RwLock<W>>,
) -> Self {
Self {
pruning_depth,
Expand All @@ -50,6 +61,7 @@ impl<S: GhostdagStoreReader, T: ReachabilityStoreReader, U: HeaderStoreReader, V
ghostdag_store,
headers_store,
past_pruning_points_store,
header_selected_tip_store,
}
}

Expand Down Expand Up @@ -169,6 +181,82 @@ impl<S: GhostdagStoreReader, T: ReachabilityStoreReader, U: HeaderStoreReader, V
let pp_bs = self.headers_store.get_blue_score(pruning_point).unwrap();
pov_blue_score >= pp_bs + self.pruning_depth
}

pub fn is_valid_pruning_point(&self, pp_candidate: Hash, hst: Hash) -> bool {
if pp_candidate == self.genesis_hash {
return true;
}
if !self.reachability_service.is_chain_ancestor_of(pp_candidate, hst) {
return false;
}

let hst_bs = self.ghostdag_store.get_blue_score(hst).unwrap();
self.is_pruning_point_in_pruning_depth(hst_bs, pp_candidate)
}

pub fn are_pruning_points_in_valid_chain(&self, pruning_info: PruningPointInfo, hst: Hash) -> bool {
// We want to validate that the past pruning points form a chain to genesis. Since
// each pruning point's header doesn't point to the previous pruning point, but to
// the pruning point from its POV, we can't just traverse from one pruning point to
// the next one by merely relying on the current pruning point header, but instead
// we rely on the fact that each pruning point is pointed by another known block or
// pruning point.
// So in the first stage we go over the selected chain and add to the queue of expected
// pruning points all the pruning points from the POV of some chain block. In the second
// stage we go over the past pruning points from recent to older, check that it's the head
// of the queue (by popping the queue), and add its header pruning point to the queue since
// we expect to see it later on the list.
// The first stage is important because the most recent pruning point is pointing to a few
// pruning points before, so the first few pruning points on the list won't be pointed by
// any other pruning point in the list, so we are compelled to check if it's refereced by
// the selected chain.
let mut expected_pps_queue = VecDeque::new();
for current in self.reachability_service.backward_chain_iterator(hst, pruning_info.pruning_point, false) {
let current_header = self.headers_store.get_header(current).unwrap();
if expected_pps_queue.back().is_none_or(|&&h| h != current_header.pruning_point) {
expected_pps_queue.push_back(current_header.pruning_point);
}
}

for idx in (0..=pruning_info.index).rev() {
let pp = self.past_pruning_points_store.get(idx).unwrap();
let pp_header = self.headers_store.get_header(pp).unwrap();
let Some(expected_pp) = expected_pps_queue.pop_front() else {
// If we have less than expected pruning points.
return false;
};

if expected_pp != pp {
return false;
}

if idx == 0 {
// The 0th pruning point should always be genesis, and no
// more pruning points should be expected below it.
if !expected_pps_queue.is_empty() || pp != self.genesis_hash {
return false;
}
break;
}

// Add the pruning point from the POV of the current one if it's
// not already added.
match expected_pps_queue.back() {
Some(last_added_pp) => {
if *last_added_pp != pp_header.pruning_point {
expected_pps_queue.push_back(pp_header.pruning_point);
}
}
None => {
// expected_pps_queue should always have one block in the queue
// until we reach genesis.
return false;
}
}
}

true
}
}

#[cfg(test)]
Expand Down
Loading

0 comments on commit 6c88c0d

Please sign in to comment.