Skip to content

Commit

Permalink
Merge pull request #3 from coderofstuff/long-process-exit
Browse files Browse the repository at this point in the history
Exit early on long pruning-related process when process is exiting
  • Loading branch information
michaelsutton committed Dec 21, 2023
2 parents 330a18a + 9b33de6 commit 4e05b43
Show file tree
Hide file tree
Showing 5 changed files with 61 additions and 7 deletions.
3 changes: 3 additions & 0 deletions consensus/core/src/errors/pruning.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,9 @@ pub enum PruningImportError {

#[error("pruning import data lead to validation rule error")]
PruningImportRuleError(#[from] RuleError),

#[error("process exit was initiated while validating pruning point proof")]
PruningValidationInterrupted,
}

pub type PruningImportResult<T> = std::result::Result<T, PruningImportError>;
30 changes: 26 additions & 4 deletions consensus/src/consensus/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,13 +74,16 @@ use kaspa_hashes::Hash;
use kaspa_muhash::MuHash;
use kaspa_txscript::caches::TxScriptCacheCounters;

use std::thread::{self, JoinHandle};
use std::{
future::Future,
iter::once,
ops::Deref,
sync::{atomic::Ordering, Arc},
};
use std::{
sync::atomic::AtomicBool,
thread::{self, JoinHandle},
};
use tokio::sync::oneshot;

use self::{services::ConsensusServices, storage::ConsensusStorage};
Expand Down Expand Up @@ -122,6 +125,9 @@ pub struct Consensus {

// Other
creation_timestamp: u64,

// Signals
is_consensus_exiting: Arc<AtomicBool>,
}

impl Deref for Consensus {
Expand All @@ -144,6 +150,7 @@ impl Consensus {
) -> Self {
let params = &config.params;
let perf_params = &config.perf;
let is_consensus_exiting: Arc<AtomicBool> = Arc::new(AtomicBool::new(false));

//
// Storage layer
Expand All @@ -155,7 +162,13 @@ impl Consensus {
// Services and managers
//

let services = ConsensusServices::new(db.clone(), storage.clone(), config.clone(), tx_script_cache_counters);
let services = ConsensusServices::new(
db.clone(),
storage.clone(),
config.clone(),
tx_script_cache_counters,
is_consensus_exiting.clone(),
);

//
// Processor channels
Expand Down Expand Up @@ -249,8 +262,15 @@ impl Consensus {
counters.clone(),
));

let pruning_processor =
Arc::new(PruningProcessor::new(pruning_receiver, db.clone(), &storage, &services, pruning_lock.clone(), config.clone()));
let pruning_processor = Arc::new(PruningProcessor::new(
pruning_receiver,
db.clone(),
&storage,
&services,
pruning_lock.clone(),
config.clone(),
is_consensus_exiting.clone(),
));

// Ensure the relations stores are initialized
header_processor.init();
Expand Down Expand Up @@ -278,6 +298,7 @@ impl Consensus {
counters,
config,
creation_timestamp,
is_consensus_exiting,
}
}

Expand Down Expand Up @@ -333,6 +354,7 @@ impl Consensus {
}

pub fn signal_exit(&self) {
self.is_consensus_exiting.store(true, Ordering::Relaxed);
self.block_sender.send(BlockProcessingMessage::Exit).unwrap();
}

Expand Down
4 changes: 3 additions & 1 deletion consensus/src/consensus/services.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use crate::{

use itertools::Itertools;
use kaspa_txscript::caches::TxScriptCacheCounters;
use std::sync::Arc;
use std::sync::{atomic::AtomicBool, Arc};

pub type DbGhostdagManager =
GhostdagManager<DbGhostdagStore, MTRelationsService<DbRelationsStore>, MTReachabilityService<DbReachabilityStore>, DbHeadersStore>;
Expand Down Expand Up @@ -71,6 +71,7 @@ impl ConsensusServices {
storage: Arc<ConsensusStorage>,
config: Arc<Config>,
tx_script_cache_counters: Arc<TxScriptCacheCounters>,
is_consensus_exiting: Arc<AtomicBool>,
) -> Arc<Self> {
let params = &config.params;

Expand Down Expand Up @@ -185,6 +186,7 @@ impl ConsensusServices {
params.pruning_proof_m,
params.anticone_finalization_depth(),
params.ghostdag_k,
is_consensus_exiting,
));

let sync_manager = SyncManager::new(
Expand Down
16 changes: 15 additions & 1 deletion consensus/src/pipeline/pruning_processor/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,10 @@ use rocksdb::WriteBatch;
use std::{
collections::VecDeque,
ops::Deref,
sync::Arc,
sync::{
atomic::{AtomicBool, Ordering},
Arc,
},
time::{Duration, Instant},
};

Expand Down Expand Up @@ -76,6 +79,9 @@ pub struct PruningProcessor {

// Config
config: Arc<Config>,

// Signals
is_consensus_exiting: Arc<AtomicBool>,
}

impl Deref for PruningProcessor {
Expand All @@ -94,6 +100,7 @@ impl PruningProcessor {
services: &Arc<ConsensusServices>,
pruning_lock: SessionLock,
config: Arc<Config>,
is_consensus_exiting: Arc<AtomicBool>,
) -> Self {
Self {
receiver,
Expand All @@ -105,6 +112,7 @@ impl PruningProcessor {
pruning_proof_manager: services.pruning_proof_manager.clone(),
pruning_lock,
config,
is_consensus_exiting,
}
}

Expand Down Expand Up @@ -336,6 +344,12 @@ impl PruningProcessor {
// If we have the lock for more than a few milliseconds, release and recapture to allow consensus progress during pruning
if lock_acquire_time.elapsed() > Duration::from_millis(5) {
drop(reachability_read);
// An exit signal was received. Exit from this long running process.
if self.is_consensus_exiting.load(Ordering::Relaxed) {
drop(prune_guard);
info!("Header and Block pruning interrupted: Process is exiting");
return;
}
prune_guard.blocking_yield();
lock_acquire_time = Instant::now();
reachability_read = self.reachability_store.upgradable_read();
Expand Down
15 changes: 14 additions & 1 deletion consensus/src/processes/pruning_proof/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,10 @@ use std::{
collections::{hash_map::Entry, BinaryHeap},
collections::{hash_map::Entry::Vacant, VecDeque},
ops::{Deref, DerefMut},
sync::Arc,
sync::{
atomic::{AtomicBool, Ordering},
Arc,
},
};

use itertools::Itertools;
Expand Down Expand Up @@ -115,6 +118,8 @@ pub struct PruningProofManager {
pruning_proof_m: u64,
anticone_finalization_depth: u64,
ghostdag_k: KType,

is_consensus_exiting: Arc<AtomicBool>,
}

impl PruningProofManager {
Expand All @@ -132,6 +137,7 @@ impl PruningProofManager {
pruning_proof_m: u64,
anticone_finalization_depth: u64,
ghostdag_k: KType,
is_consensus_exiting: Arc<AtomicBool>,
) -> Self {
Self {
db,
Expand Down Expand Up @@ -162,6 +168,8 @@ impl PruningProofManager {
pruning_proof_m,
anticone_finalization_depth,
ghostdag_k,

is_consensus_exiting,
}
}

Expand Down Expand Up @@ -432,6 +440,11 @@ impl PruningProofManager {

let mut selected_tip_by_level = vec![None; self.max_block_level as usize + 1];
for level in (0..=self.max_block_level).rev() {
// Before processing this level, check if the process is exiting so we can end early
if self.is_consensus_exiting.load(Ordering::Relaxed) {
return Err(PruningImportError::PruningValidationInterrupted);
}

info!("Validating level {level} from the pruning point proof ({} headers)", proof[level as usize].len());
let level_idx = level as usize;
let mut selected_tip = None;
Expand Down

0 comments on commit 4e05b43

Please sign in to comment.