diff --git a/Cargo.lock b/Cargo.lock
index 7c795a9e2..82baf8b9e 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2696,6 +2696,7 @@ name = "kaspa-p2p-flows"
 version = "0.13.1"
 dependencies = [
  "async-trait",
+ "chrono",
  "futures",
  "indexmap 2.1.0",
  "itertools 0.11.0",
diff --git a/Cargo.toml b/Cargo.toml
index 88a0e9fba..f26f501da 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -251,6 +251,7 @@ tower-http = { version = "0.4.4", features = [
 ] }
 tower = "0.4.7"
 hyper = "0.14.27"
+chrono = "0.4.31"
 
 # workflow dependencies that are not a part of core libraries
 # workflow-perf-monitor = { path = "../../../workflow-perf-monitor-rs" }
diff --git a/consensus/src/pipeline/pruning_processor/processor.rs b/consensus/src/pipeline/pruning_processor/processor.rs
index fe3d2a15d..e9bb7fe32 100644
--- a/consensus/src/pipeline/pruning_processor/processor.rs
+++ b/consensus/src/pipeline/pruning_processor/processor.rs
@@ -147,7 +147,10 @@ impl PruningProcessor {
         // This indicates the node crashed during a former pruning point move and we need to recover
         if pruning_utxoset_position != pruning_point {
             info!("Recovering pruning utxo-set from {} to the pruning point {}", pruning_utxoset_position, pruning_point);
-            self.advance_pruning_utxoset(pruning_utxoset_position, pruning_point);
+            if !self.advance_pruning_utxoset(pruning_utxoset_position, pruning_point) {
+                info!("Interrupted while advancing the pruning point UTXO set: Process is exiting");
+                return;
+            }
         }
     }
@@ -189,7 +192,11 @@ impl PruningProcessor {
             info!("Periodic pruning point movement: advancing from {} to {}", current_pruning_info.pruning_point, new_pruning_point);
 
             // Advance the pruning point utxoset to the state of the new pruning point using chain-block UTXO diffs
-            self.advance_pruning_utxoset(current_pruning_info.pruning_point, new_pruning_point);
+            if !self.advance_pruning_utxoset(current_pruning_info.pruning_point, new_pruning_point) {
+                info!("Interrupted while advancing the pruning point UTXO set: Process is exiting");
+                return;
+            }
+            info!("Updated the pruning point UTXO set");
 
             // Finally, prune data in the new pruning point past
             self.prune(new_pruning_point);
@@ -199,9 +206,12 @@
         }
     }
 
-    fn advance_pruning_utxoset(&self, utxoset_position: Hash, new_pruning_point: Hash) {
+    fn advance_pruning_utxoset(&self, utxoset_position: Hash, new_pruning_point: Hash) -> bool {
         let mut pruning_utxoset_write = self.pruning_utxoset_stores.write();
         for chain_block in self.reachability_service.forward_chain_iterator(utxoset_position, new_pruning_point, true).skip(1) {
+            if self.is_consensus_exiting.load(Ordering::Relaxed) {
+                return false;
+            }
             let utxo_diff = self.utxo_diffs_store.get(chain_block).expect("chain blocks have utxo state");
             let mut batch = WriteBatch::default();
             pruning_utxoset_write.utxo_set.write_diff_batch(&mut batch, utxo_diff.as_ref()).unwrap();
@@ -211,8 +221,10 @@ impl PruningProcessor {
         drop(pruning_utxoset_write);
 
         if self.config.enable_sanity_checks {
+            info!("Performing a sanity check that the new UTXO set has the expected UTXO commitment");
             self.assert_utxo_commitment(new_pruning_point);
         }
+        true
     }
 
     fn assert_utxo_commitment(&self, pruning_point: Hash) {
diff --git a/protocol/flows/Cargo.toml b/protocol/flows/Cargo.toml
index 2c651ee6c..e6ea97782 100644
--- a/protocol/flows/Cargo.toml
+++ b/protocol/flows/Cargo.toml
@@ -30,6 +30,7 @@ log.workspace = true
 parking_lot.workspace = true
 rand.workspace = true
 thiserror.workspace = true
-tokio = { workspace = true, features = [ "rt-multi-thread", "macros", "signal" ] }
+tokio = { workspace = true, features = ["rt-multi-thread", "macros", "signal"] }
 tokio-stream = { workspace = true, features = ["net"] }
 uuid = { workspace = true, features = ["v4", "fast-rng"] }
+chrono.workspace = true
diff --git a/protocol/flows/src/v5/ibd/flow.rs b/protocol/flows/src/v5/ibd/flow.rs
index fd58ca761..8dc7fedf7 100644
--- a/protocol/flows/src/v5/ibd/flow.rs
+++ b/protocol/flows/src/v5/ibd/flow.rs
@@ -63,6 +63,11 @@ pub enum IbdType {
     DownloadHeadersProof,
 }
 
+struct QueueChunkOutput {
+    jobs: Vec<BlockValidationFuture>,
+    daa_score: u64,
+    timestamp: u64,
+}
 // TODO: define a peer banning strategy
 
 impl IbdFlow {
@@ -372,12 +377,18 @@
         let mut chunk_stream = HeadersChunkStream::new(&self.router, &mut self.incoming_route);
 
         if let Some(chunk) = chunk_stream.next().await? {
-            let mut prev_daa_score = chunk.last().expect("chunk is never empty").daa_score;
+            let (mut prev_daa_score, mut prev_timestamp) = {
+                let last_header = chunk.last().expect("chunk is never empty");
+                (last_header.daa_score, last_header.timestamp)
+            };
             let mut prev_jobs: Vec<BlockValidationFuture> =
                 chunk.into_iter().map(|h| consensus.validate_and_insert_block(Block::from_header_arc(h)).virtual_state_task).collect();
 
             while let Some(chunk) = chunk_stream.next().await? {
-                let current_daa_score = chunk.last().expect("chunk is never empty").daa_score;
+                let (current_daa_score, current_timestamp) = {
+                    let last_header = chunk.last().expect("chunk is never empty");
+                    (last_header.daa_score, last_header.timestamp)
+                };
                 let current_jobs = chunk
                     .into_iter()
                     .map(|h| consensus.validate_and_insert_block(Block::from_header_arc(h)).virtual_state_task)
@@ -386,8 +397,9 @@
                 // Join the previous chunk so that we always concurrently process a chunk and receive another
                 try_join_all(prev_jobs).await?;
                 // Log the progress
-                progress_reporter.report(prev_chunk_len, prev_daa_score);
+                progress_reporter.report(prev_chunk_len, prev_daa_score, prev_timestamp);
                 prev_daa_score = current_daa_score;
+                prev_timestamp = current_timestamp;
                 prev_jobs = current_jobs;
@@ -513,17 +525,19 @@ staging selected tip ({}) is too small or negative. Aborting IBD...",
         let mut progress_reporter = ProgressReporter::new(low_header.daa_score, high_header.daa_score, "blocks");
 
         let mut iter = hashes.chunks(IBD_BATCH_SIZE);
-        let (mut prev_jobs, mut prev_daa_score) =
+        let QueueChunkOutput { jobs: mut prev_jobs, daa_score: mut prev_daa_score, timestamp: mut prev_timestamp } =
             self.queue_block_processing_chunk(consensus, iter.next().expect("hashes was non empty")).await?;
         for chunk in iter {
-            let (current_jobs, current_daa_score) = self.queue_block_processing_chunk(consensus, chunk).await?;
+            let QueueChunkOutput { jobs: current_jobs, daa_score: current_daa_score, timestamp: current_timestamp } =
+                self.queue_block_processing_chunk(consensus, chunk).await?;
             let prev_chunk_len = prev_jobs.len();
             // Join the previous chunk so that we always concurrently process a chunk and receive another
             try_join_all(prev_jobs).await?;
             // Log the progress
-            progress_reporter.report(prev_chunk_len, prev_daa_score);
+            progress_reporter.report(prev_chunk_len, prev_daa_score, prev_timestamp);
             prev_daa_score = current_daa_score;
+            prev_timestamp = current_timestamp;
             prev_jobs = current_jobs;
         }
@@ -538,9 +552,10 @@ staging selected tip ({}) is too small or negative. Aborting IBD...",
Aborting IBD...", &mut self, consensus: &ConsensusProxy, chunk: &[Hash], - ) -> Result<(Vec, u64), ProtocolError> { + ) -> Result { let mut jobs = Vec::with_capacity(chunk.len()); let mut current_daa_score = 0; + let mut current_timestamp = 0; self.router .enqueue(make_message!( Payload::RequestIbdBlocks, @@ -557,9 +572,10 @@ staging selected tip ({}) is too small or negative. Aborting IBD...", return Err(ProtocolError::OtherOwned(format!("sent header of {} where expected block with body", block.hash()))); } current_daa_score = block.header.daa_score; + current_timestamp = block.header.timestamp; jobs.push(consensus.validate_and_insert_block(block).virtual_state_task); } - Ok((jobs, current_daa_score)) + Ok(QueueChunkOutput { jobs, daa_score: current_daa_score, timestamp: current_timestamp }) } } diff --git a/protocol/flows/src/v5/ibd/progress.rs b/protocol/flows/src/v5/ibd/progress.rs index ae2c2ed3b..458138a25 100644 --- a/protocol/flows/src/v5/ibd/progress.rs +++ b/protocol/flows/src/v5/ibd/progress.rs @@ -1,5 +1,6 @@ use std::time::{Duration, Instant}; +use chrono::{Local, LocalResult, TimeZone}; use kaspa_core::info; /// Minimum number of items to report @@ -34,7 +35,7 @@ impl ProgressReporter { } } - pub fn report(&mut self, processed_delta: usize, current_daa_score: u64) { + pub fn report(&mut self, processed_delta: usize, current_daa_score: u64, current_timestamp: u64) { self.current_batch += processed_delta; let now = Instant::now(); if now - self.last_log_time < REPORT_TIME_GRANULARITY && self.current_batch < REPORT_BATCH_GRANULARITY && self.processed > 0 { @@ -48,7 +49,11 @@ impl ProgressReporter { let relative_daa_score = if current_daa_score > self.low_daa_score { current_daa_score - self.low_daa_score } else { 0 }; let percent = ((relative_daa_score as f64 / (self.high_daa_score - self.low_daa_score) as f64) * 100.0) as i32; if percent > self.last_reported_percent { - info!("IBD: Processed {} {} ({}%)", self.processed, self.object_name, percent); + let date = match Local.timestamp_opt(current_timestamp as i64 / 1000, 1000 * (current_timestamp as u32 % 1000)) { + LocalResult::None | LocalResult::Ambiguous(_, _) => "cannot parse date".into(), + LocalResult::Single(date) => date.format("%Y-%m-%d %H:%M:%S.%3f:%z").to_string(), + }; + info!("IBD: Processed {} {} ({}%) last block timestamp: {}", self.processed, self.object_name, percent, date); self.last_reported_percent = percent; } self.last_log_time = now;