Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add pruning point list validations #243

Merged
merged 14 commits into from
Sep 1, 2023
18 changes: 17 additions & 1 deletion components/consensusmanager/src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use kaspa_consensus_core::{
blockstatus::BlockStatus,
errors::consensus::ConsensusResult,
header::Header,
pruning::{PruningPointProof, PruningPointTrustedData},
pruning::{PruningPointProof, PruningPointTrustedData, PruningPointsList},
trusted::{ExternalGhostdagData, TrustedBlock},
tx::{Transaction, TransactionOutpoint, UtxoEntry},
BlockHashSet, ChainPath, Hash,
Expand Down Expand Up @@ -350,6 +350,22 @@ impl ConsensusSessionOwned {
) -> ConsensusResult<u64> {
self.clone().spawn_blocking(move |c| c.estimate_network_hashes_per_second(start_hash, window_size)).await
}

pub async fn async_validate_pruning_points(&self) -> ConsensusResult<()> {
self.clone().spawn_blocking(move |c| c.validate_pruning_points()).await
}

pub async fn async_are_pruning_points_violating_finality(&self, pp_list: PruningPointsList) -> bool {
self.clone().spawn_blocking(move |c| c.are_pruning_points_violating_finality(pp_list)).await
}

pub async fn async_creation_timestamp(&self) -> u64 {
self.clone().spawn_blocking(move |c| c.creation_timestamp()).await
}

pub async fn async_finality_point(&self) -> Hash {
self.clone().spawn_blocking(move |c| c.finality_point()).await
}
}

pub type ConsensusProxy = ConsensusSessionOwned;
16 changes: 16 additions & 0 deletions consensus/core/src/api/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,22 @@ pub trait ConsensusApi: Send + Sync {
fn estimate_network_hashes_per_second(&self, start_hash: Option<Hash>, window_size: usize) -> ConsensusResult<u64> {
unimplemented!()
}

fn validate_pruning_points(&self) -> ConsensusResult<()> {
unimplemented!()
}

fn are_pruning_points_violating_finality(&self, pp_list: PruningPointsList) -> bool {
unimplemented!()
}

fn creation_timestamp(&self) -> u64 {
unimplemented!()
}

fn finality_point(&self) -> Hash {
unimplemented!()
}
}

pub type DynConsensus = Arc<dyn ConsensusApi>;
4 changes: 4 additions & 0 deletions consensus/core/src/config/params.rs
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,10 @@ impl Params {
pub fn default_rpc_port(&self) -> u16 {
self.net.default_rpc_port()
}

pub fn finality_duration(&self) -> u64 {
self.target_time_per_block * self.finality_depth
}
}

impl From<NetworkType> for Params {
Expand Down
2 changes: 2 additions & 0 deletions consensus/src/consensus/factory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,7 @@ impl ConsensusFactory for Factory {
session_lock.clone(),
self.notification_root.clone(),
self.counters.clone(),
entry.creation_timestamp,
));

// We write the new active entry only once the instance was created successfully.
Expand Down Expand Up @@ -235,6 +236,7 @@ impl ConsensusFactory for Factory {
session_lock.clone(),
self.notification_root.clone(),
self.counters.clone(),
entry.creation_timestamp,
));

(ConsensusInstance::new(session_lock, consensus.clone()), Arc::new(Ctl::new(self.management_store.clone(), db, consensus)))
Expand Down
33 changes: 33 additions & 0 deletions consensus/src/consensus/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ use crossbeam_channel::{
};
use itertools::Itertools;
use kaspa_consensusmanager::{SessionLock, SessionReadGuard};

use kaspa_database::prelude::StoreResultExtensions;
use kaspa_hashes::Hash;
use kaspa_muhash::MuHash;
Expand Down Expand Up @@ -111,6 +112,9 @@ pub struct Consensus {

// Config
config: Arc<Config>,

// Other
creation_timestamp: u64,
}

impl Deref for Consensus {
Expand All @@ -128,6 +132,7 @@ impl Consensus {
pruning_lock: SessionLock,
notification_root: Arc<ConsensusNotificationRoot>,
counters: Arc<ProcessingCounters>,
creation_timestamp: u64,
) -> Self {
let params = &config.params;
let perf_params = &config.perf;
Expand Down Expand Up @@ -262,6 +267,7 @@ impl Consensus {
notification_root,
counters,
config,
creation_timestamp,
}
}

Expand Down Expand Up @@ -515,6 +521,20 @@ impl ConsensusApi for Consensus {
self.virtual_processor.import_pruning_point_utxo_set(new_pruning_point, imported_utxo_multiset)
}

fn validate_pruning_points(&self) -> ConsensusResult<()> {
let hst = self.storage.headers_selected_tip_store.read().get().unwrap().hash;
let pp_info = self.pruning_point_store.read().get().unwrap();
if !self.services.pruning_point_manager.is_valid_pruning_point(pp_info.pruning_point, hst) {
return Err(ConsensusError::General("invalid pruning point candidate"));
}

if !self.services.pruning_point_manager.are_pruning_points_in_valid_chain(pp_info, hst) {
return Err(ConsensusError::General("past pruning points do not form a valid chain"));
}

Ok(())
}

fn header_exists(&self, hash: Hash) -> bool {
match self.statuses_store.read().get(hash).unwrap_option() {
Some(status) => status.has_block_header(),
Expand Down Expand Up @@ -727,4 +747,17 @@ impl ConsensusApi for Consensus {
}
}
}

fn are_pruning_points_violating_finality(&self, pp_list: PruningPointsList) -> bool {
self.virtual_processor.are_pruning_points_violating_finality(pp_list)
}

fn creation_timestamp(&self) -> u64 {
self.creation_timestamp
}

fn finality_point(&self) -> Hash {
self.virtual_processor
.virtual_finality_point(&self.virtual_stores.read().state.get().unwrap().ghostdag_data, self.pruning_point())
}
}
4 changes: 3 additions & 1 deletion consensus/src/consensus/services.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ pub type DbSyncManager = SyncManager<
DbStatusesStore,
>;

pub type DbPruningPointManager = PruningPointManager<DbGhostdagStore, DbReachabilityStore, DbHeadersStore, DbPastPruningPointsStore>;
pub type DbPruningPointManager =
PruningPointManager<DbGhostdagStore, DbReachabilityStore, DbHeadersStore, DbPastPruningPointsStore, DbHeadersSelectedTipStore>;
pub type DbBlockDepthManager = BlockDepthManager<DbDepthStore, DbReachabilityStore, DbGhostdagStore>;
pub type DbParentsManager = ParentsManager<DbHeadersStore, DbReachabilityStore, MTRelationsService<DbRelationsStore>>;

Expand Down Expand Up @@ -153,6 +154,7 @@ impl ConsensusServices {
storage.ghostdag_primary_store.clone(),
storage.headers_store.clone(),
storage.past_pruning_points_store.clone(),
storage.headers_selected_tip_store.clone(),
);

let parents_manager = ParentsManager::new(
Expand Down
6 changes: 3 additions & 3 deletions consensus/src/consensus/test_consensus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ impl TestConsensus {
pub fn with_db(db: Arc<DB>, config: &Config, notification_sender: Sender<Notification>) -> Self {
let notification_root = Arc::new(ConsensusNotificationRoot::new(notification_sender));
let counters = Arc::new(ProcessingCounters::default());
let consensus = Arc::new(Consensus::new(db, Arc::new(config.clone()), Default::default(), notification_root, counters));
let consensus = Arc::new(Consensus::new(db, Arc::new(config.clone()), Default::default(), notification_root, counters, 0));
let block_builder = TestBlockBuilder::new(consensus.virtual_processor.clone());

Self { params: config.params.clone(), consensus, block_builder, db_lifetime: Default::default() }
Expand All @@ -61,7 +61,7 @@ impl TestConsensus {
let (db_lifetime, db) = create_temp_db!(ConnBuilder::default());
let notification_root = Arc::new(ConsensusNotificationRoot::new(notification_sender));
let counters = Arc::new(ProcessingCounters::default());
let consensus = Arc::new(Consensus::new(db, Arc::new(config.clone()), Default::default(), notification_root, counters));
let consensus = Arc::new(Consensus::new(db, Arc::new(config.clone()), Default::default(), notification_root, counters, 0));
let block_builder = TestBlockBuilder::new(consensus.virtual_processor.clone());

Self { consensus, block_builder, params: config.params.clone(), db_lifetime }
Expand All @@ -73,7 +73,7 @@ impl TestConsensus {
let (dummy_notification_sender, _) = async_channel::unbounded();
let notification_root = Arc::new(ConsensusNotificationRoot::new(dummy_notification_sender));
let counters = Arc::new(ProcessingCounters::default());
let consensus = Arc::new(Consensus::new(db, Arc::new(config.clone()), Default::default(), notification_root, counters));
let consensus = Arc::new(Consensus::new(db, Arc::new(config.clone()), Default::default(), notification_root, counters, 0));
let block_builder = TestBlockBuilder::new(consensus.virtual_processor.clone());

Self { consensus, block_builder, params: config.params.clone(), db_lifetime }
Expand Down
31 changes: 30 additions & 1 deletion consensus/src/pipeline/virtual_processor/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ use kaspa_consensus_core::{
config::genesis::GenesisBlock,
header::Header,
merkle::calc_hash_merkle_root,
pruning::PruningPointsList,
tx::{MutableTransaction, Transaction},
utxo::{
utxo_diff::UtxoDiff,
Expand Down Expand Up @@ -316,7 +317,7 @@ impl VirtualStateProcessor {
.expect("expecting an open unbounded channel");
}

pub(super) fn virtual_finality_point(&self, virtual_ghostdag_data: &GhostdagData, pruning_point: Hash) -> Hash {
pub(crate) fn virtual_finality_point(&self, virtual_ghostdag_data: &GhostdagData, pruning_point: Hash) -> Hash {
let finality_point = self.depth_manager.calc_finality_point(virtual_ghostdag_data, pruning_point);
if self.reachability_service.is_chain_ancestor_of(pruning_point, finality_point) {
finality_point
Expand Down Expand Up @@ -933,6 +934,34 @@ impl VirtualStateProcessor {

Ok(())
}

pub fn are_pruning_points_violating_finality(&self, pp_list: PruningPointsList) -> bool {
// Ideally we would want to check if the last known pruning point has the finality point
someone235 marked this conversation as resolved.
Show resolved Hide resolved
// in its chain, but in some cases it's impossible: let `lkp` be the last known pruning
// point from the list, and `fup` be the first unknown pruning point (the one following `lkp`).
// fup.blue_score - lkp.blue_score ≈ finality_depth (±k), so it's possible for `lkp` not to
// have the finality point in its past. So we have no choice but to check if `lkp`
// has `finality_point.finality_point` in its chain (in the worst case `fup` is one block
// above the current finality point, and in this case `lkp` will be a few blocks above the
// finality_point.finality_point), meaning this function can only detect finality violations
// in depth of 2*finality_depth, and can give false negatives for smaller finality violations.
let current_pp = self.pruning_point_store.read().pruning_point().unwrap();
let vf = self.virtual_finality_point(&self.virtual_stores.read().state.get().unwrap().ghostdag_data, current_pp);
let vff = self.depth_manager.calc_finality_point(&self.ghostdag_primary_store.get_data(vf).unwrap(), current_pp);

let last_known_pp = pp_list.iter().rev().find(|pp| match self.statuses_store.read().get(pp.hash).unwrap_option() {
Some(status) => status.is_valid(),
None => false,
});

if let Some(last_known_pp) = last_known_pp {
!self.reachability_service.is_chain_ancestor_of(vff, last_known_pp.hash)
} else {
// If no pruning point is known, there's definitely a finality violation
// (normally at least genesis should be known).
true
}
}
}

enum MergesetIncreaseResult {
Expand Down
94 changes: 91 additions & 3 deletions consensus/src/processes/pruning.rs
Original file line number Diff line number Diff line change
@@ -1,24 +1,28 @@
use std::sync::Arc;
use std::{collections::VecDeque, sync::Arc};

use super::reachability::ReachabilityResultExtensions;
use crate::model::{
services::reachability::{MTReachabilityService, ReachabilityService},
stores::{
ghostdag::{CompactGhostdagData, GhostdagStoreReader},
headers::HeaderStoreReader,
headers_selected_tip::HeadersSelectedTipStoreReader,
past_pruning_points::PastPruningPointsStoreReader,
pruning::PruningPointInfo,
reachability::ReachabilityStoreReader,
},
};
use kaspa_hashes::Hash;
use kaspa_utils::option::OptionExtensions;
use parking_lot::RwLock;

#[derive(Clone)]
pub struct PruningPointManager<
S: GhostdagStoreReader,
T: ReachabilityStoreReader,
U: HeaderStoreReader,
V: PastPruningPointsStoreReader,
W: HeadersSelectedTipStoreReader,
> {
pruning_depth: u64,
finality_depth: u64,
Expand All @@ -28,10 +32,16 @@ pub struct PruningPointManager<
ghostdag_store: Arc<S>,
headers_store: Arc<U>,
past_pruning_points_store: Arc<V>,
header_selected_tip_store: Arc<RwLock<W>>,
}

impl<S: GhostdagStoreReader, T: ReachabilityStoreReader, U: HeaderStoreReader, V: PastPruningPointsStoreReader>
PruningPointManager<S, T, U, V>
impl<
S: GhostdagStoreReader,
T: ReachabilityStoreReader,
U: HeaderStoreReader,
V: PastPruningPointsStoreReader,
W: HeadersSelectedTipStoreReader,
> PruningPointManager<S, T, U, V, W>
{
pub fn new(
pruning_depth: u64,
Expand All @@ -41,6 +51,7 @@ impl<S: GhostdagStoreReader, T: ReachabilityStoreReader, U: HeaderStoreReader, V
ghostdag_store: Arc<S>,
headers_store: Arc<U>,
past_pruning_points_store: Arc<V>,
header_selected_tip_store: Arc<RwLock<W>>,
) -> Self {
Self {
pruning_depth,
Expand All @@ -50,6 +61,7 @@ impl<S: GhostdagStoreReader, T: ReachabilityStoreReader, U: HeaderStoreReader, V
ghostdag_store,
headers_store,
past_pruning_points_store,
header_selected_tip_store,
}
}

Expand Down Expand Up @@ -169,6 +181,82 @@ impl<S: GhostdagStoreReader, T: ReachabilityStoreReader, U: HeaderStoreReader, V
let pp_bs = self.headers_store.get_blue_score(pruning_point).unwrap();
pov_blue_score >= pp_bs + self.pruning_depth
}

pub fn is_valid_pruning_point(&self, pp_candidate: Hash, hst: Hash) -> bool {
if pp_candidate == self.genesis_hash {
return true;
}
if !self.reachability_service.is_chain_ancestor_of(pp_candidate, hst) {
return false;
}

let hst_bs = self.ghostdag_store.get_blue_score(hst).unwrap();
self.is_pruning_point_in_pruning_depth(hst_bs, pp_candidate)
}

pub fn are_pruning_points_in_valid_chain(&self, pruning_info: PruningPointInfo, hst: Hash) -> bool {
// We want to validate that the past pruning points form a chain to genesis. Since
// each pruning point's header doesn't point to the previous pruning point, but to
// the pruning point from its POV, we can't just traverse from one pruning point to
// the next one by merely relying on the current pruning point header, but instead
// we rely on the fact that each pruning point is pointed by another known block or
// pruning point.
// So in the first stage we go over the selected chain and add to the queue of expected
// pruning points all the pruning points from the POV of some chain block. In the second
// stage we go over the past pruning points from recent to older, check that it's the head
// of the queue (by popping the queue), and add its header pruning point to the queue since
// we expect to see it later on the list.
// The first stage is important because the most recent pruning point is pointing to a few
// pruning points before, so the first few pruning points on the list won't be pointed by
// any other pruning point in the list, so we are compelled to check if it's refereced by
// the selected chain.
let mut expected_pps_queue = VecDeque::new();
for current in self.reachability_service.backward_chain_iterator(hst, pruning_info.pruning_point, false) {
let current_header = self.headers_store.get_header(current).unwrap();
if expected_pps_queue.back().is_none_or(|&&h| h != current_header.pruning_point) {
expected_pps_queue.push_back(current_header.pruning_point);
}
}

for idx in (0..=pruning_info.index).rev() {
let pp = self.past_pruning_points_store.get(idx).unwrap();
let pp_header = self.headers_store.get_header(pp).unwrap();
let Some(expected_pp) = expected_pps_queue.pop_front() else {
// If we have less than expected pruning points.
return false;
};

if expected_pp != pp {
return false;
}

if idx == 0 {
// The 0th pruning point should always be genesis, and no
// more pruning points should be expected below it.
if !expected_pps_queue.is_empty() || pp != self.genesis_hash {
return false;
}
break;
}

// Add the pruning point from the POV of the current one if it's
// not already added.
match expected_pps_queue.back() {
Some(last_added_pp) => {
if *last_added_pp != pp_header.pruning_point {
expected_pps_queue.push_back(pp_header.pruning_point);
}
}
None => {
// expected_pps_queue should always have one block in the queue
// until we reach genesis.
return false;
}
}
}

true
}
}

#[cfg(test)]
Expand Down
Loading