Merge branch 'master' into tx-speedbumps
michaelsutton authored Jan 7, 2024
2 parents b68e237 + 0f431e0 commit 318d555
Showing 6 changed files with 50 additions and 14 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Generated file; diff not rendered.

1 change: 1 addition & 0 deletions Cargo.toml
@@ -251,6 +251,7 @@ tower-http = { version = "0.4.4", features = [
 ] }
 tower = "0.4.7"
 hyper = "0.14.27"
+chrono = "0.4.31"
 # workflow dependencies that are not a part of core libraries
 
 # workflow-perf-monitor = { path = "../../../workflow-perf-monitor-rs" }
18 changes: 15 additions & 3 deletions consensus/src/pipeline/pruning_processor/processor.rs
@@ -147,7 +147,10 @@ impl PruningProcessor {
         // This indicates the node crashed during a former pruning point move and we need to recover
         if pruning_utxoset_position != pruning_point {
             info!("Recovering pruning utxo-set from {} to the pruning point {}", pruning_utxoset_position, pruning_point);
-            self.advance_pruning_utxoset(pruning_utxoset_position, pruning_point);
+            if !self.advance_pruning_utxoset(pruning_utxoset_position, pruning_point) {
+                info!("Interrupted while advancing the pruning point UTXO set: Process is exiting");
+                return;
+            }
         }
     }

@@ -189,7 +192,11 @@ impl PruningProcessor {
             info!("Periodic pruning point movement: advancing from {} to {}", current_pruning_info.pruning_point, new_pruning_point);
 
             // Advance the pruning point utxoset to the state of the new pruning point using chain-block UTXO diffs
-            self.advance_pruning_utxoset(current_pruning_info.pruning_point, new_pruning_point);
+            if !self.advance_pruning_utxoset(current_pruning_info.pruning_point, new_pruning_point) {
+                info!("Interrupted while advancing the pruning point UTXO set: Process is exiting");
+                return;
+            }
+            info!("Updated the pruning point UTXO set");
 
             // Finally, prune data in the new pruning point past
             self.prune(new_pruning_point);
@@ -199,9 +206,12 @@
         }
     }
 
-    fn advance_pruning_utxoset(&self, utxoset_position: Hash, new_pruning_point: Hash) {
+    fn advance_pruning_utxoset(&self, utxoset_position: Hash, new_pruning_point: Hash) -> bool {
         let mut pruning_utxoset_write = self.pruning_utxoset_stores.write();
         for chain_block in self.reachability_service.forward_chain_iterator(utxoset_position, new_pruning_point, true).skip(1) {
+            if self.is_consensus_exiting.load(Ordering::Relaxed) {
+                return false;
+            }
             let utxo_diff = self.utxo_diffs_store.get(chain_block).expect("chain blocks have utxo state");
             let mut batch = WriteBatch::default();
             pruning_utxoset_write.utxo_set.write_diff_batch(&mut batch, utxo_diff.as_ref()).unwrap();
@@ -211,8 +221,10 @@
         drop(pruning_utxoset_write);
 
         if self.config.enable_sanity_checks {
+            info!("Performing a sanity check that the new UTXO set has the expected UTXO commitment");
             self.assert_utxo_commitment(new_pruning_point);
         }
+        true
     }
 
     fn assert_utxo_commitment(&self, pruning_point: Hash) {
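
The change in this file applies a cooperative-cancellation pattern: the UTXO-set advancement loop polls a shared atomic flag (`is_consensus_exiting`) once per iteration and returns `false` early rather than blocking process shutdown. A minimal, self-contained sketch of the same pattern follows; the names `advance` and `is_exiting` are illustrative, not from the codebase:

    use std::sync::atomic::{AtomicBool, Ordering};
    use std::sync::Arc;
    use std::thread;
    use std::time::Duration;

    /// Illustrative stand-in for a long-running pipeline stage: apply one
    /// unit of work per iteration, but bail out if shutdown was requested.
    fn advance(is_exiting: &AtomicBool, steps: u32) -> bool {
        for _ in 0..steps {
            if is_exiting.load(Ordering::Relaxed) {
                return false; // interrupted; the caller logs and returns
            }
            thread::sleep(Duration::from_millis(1)); // one unit of work
        }
        true // ran to completion
    }

    fn main() {
        let is_exiting = Arc::new(AtomicBool::new(false));
        let flag = is_exiting.clone();
        let worker = thread::spawn(move || {
            if !advance(&flag, 10_000) {
                println!("Interrupted while advancing: exiting");
            }
        });
        thread::sleep(Duration::from_millis(20));
        is_exiting.store(true, Ordering::Relaxed); // request shutdown
        worker.join().unwrap();
    }

Since the flag guards no other shared state, `Ordering::Relaxed` suffices for both the store and the load: only the boolean itself is observed.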
3 changes: 2 additions & 1 deletion protocol/flows/Cargo.toml
@@ -30,6 +30,7 @@ log.workspace = true
 parking_lot.workspace = true
 rand.workspace = true
 thiserror.workspace = true
-tokio = { workspace = true, features = [ "rt-multi-thread", "macros", "signal" ] }
+tokio = { workspace = true, features = ["rt-multi-thread", "macros", "signal"] }
 tokio-stream = { workspace = true, features = ["net"] }
 uuid = { workspace = true, features = ["v4", "fast-rng"] }
+chrono.workspace = true
32 changes: 24 additions & 8 deletions protocol/flows/src/v5/ibd/flow.rs
@@ -63,6 +63,11 @@ pub enum IbdType {
     DownloadHeadersProof,
 }
 
+struct QueueChunkOutput {
+    jobs: Vec<BlockValidationFuture>,
+    daa_score: u64,
+    timestamp: u64,
+}
 // TODO: define a peer banning strategy
 
 impl IbdFlow {
@@ -372,12 +377,18 @@ impl IbdFlow {
         let mut chunk_stream = HeadersChunkStream::new(&self.router, &mut self.incoming_route);
 
         if let Some(chunk) = chunk_stream.next().await? {
-            let mut prev_daa_score = chunk.last().expect("chunk is never empty").daa_score;
+            let (mut prev_daa_score, mut prev_timestamp) = {
+                let last_header = chunk.last().expect("chunk is never empty");
+                (last_header.daa_score, last_header.timestamp)
+            };
             let mut prev_jobs: Vec<BlockValidationFuture> =
                 chunk.into_iter().map(|h| consensus.validate_and_insert_block(Block::from_header_arc(h)).virtual_state_task).collect();
 
             while let Some(chunk) = chunk_stream.next().await? {
-                let current_daa_score = chunk.last().expect("chunk is never empty").daa_score;
+                let (current_daa_score, current_timestamp) = {
+                    let last_header = chunk.last().expect("chunk is never empty");
+                    (last_header.daa_score, last_header.timestamp)
+                };
                 let current_jobs = chunk
                     .into_iter()
                     .map(|h| consensus.validate_and_insert_block(Block::from_header_arc(h)).virtual_state_task)
@@ -386,8 +397,9 @@
                 // Join the previous chunk so that we always concurrently process a chunk and receive another
                 try_join_all(prev_jobs).await?;
                 // Log the progress
-                progress_reporter.report(prev_chunk_len, prev_daa_score);
+                progress_reporter.report(prev_chunk_len, prev_daa_score, prev_timestamp);
                 prev_daa_score = current_daa_score;
+                prev_timestamp = current_timestamp;
                 prev_jobs = current_jobs;
             }
 
@@ -513,17 +525,19 @@ staging selected tip ({}) is too small or negative. Aborting IBD...",
         let mut progress_reporter = ProgressReporter::new(low_header.daa_score, high_header.daa_score, "blocks");
 
         let mut iter = hashes.chunks(IBD_BATCH_SIZE);
-        let (mut prev_jobs, mut prev_daa_score) =
+        let QueueChunkOutput { jobs: mut prev_jobs, daa_score: mut prev_daa_score, timestamp: mut prev_timestamp } =
             self.queue_block_processing_chunk(consensus, iter.next().expect("hashes was non empty")).await?;
 
         for chunk in iter {
-            let (current_jobs, current_daa_score) = self.queue_block_processing_chunk(consensus, chunk).await?;
+            let QueueChunkOutput { jobs: current_jobs, daa_score: current_daa_score, timestamp: current_timestamp } =
+                self.queue_block_processing_chunk(consensus, chunk).await?;
             let prev_chunk_len = prev_jobs.len();
             // Join the previous chunk so that we always concurrently process a chunk and receive another
             try_join_all(prev_jobs).await?;
             // Log the progress
-            progress_reporter.report(prev_chunk_len, prev_daa_score);
+            progress_reporter.report(prev_chunk_len, prev_daa_score, prev_timestamp);
             prev_daa_score = current_daa_score;
+            prev_timestamp = current_timestamp;
             prev_jobs = current_jobs;
         }
 
@@ -538,9 +552,10 @@
         &mut self,
         consensus: &ConsensusProxy,
         chunk: &[Hash],
-    ) -> Result<(Vec<BlockValidationFuture>, u64), ProtocolError> {
+    ) -> Result<QueueChunkOutput, ProtocolError> {
         let mut jobs = Vec::with_capacity(chunk.len());
         let mut current_daa_score = 0;
+        let mut current_timestamp = 0;
         self.router
             .enqueue(make_message!(
                 Payload::RequestIbdBlocks,
@@ -557,9 +572,10 @@
                 return Err(ProtocolError::OtherOwned(format!("sent header of {} where expected block with body", block.hash())));
             }
             current_daa_score = block.header.daa_score;
+            current_timestamp = block.header.timestamp;
             jobs.push(consensus.validate_and_insert_block(block).virtual_state_task);
         }
 
-        Ok((jobs, current_daa_score))
+        Ok(QueueChunkOutput { jobs, daa_score: current_daa_score, timestamp: current_timestamp })
     }
 }
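
Both loops in this file use the same pipelining idea: queue validation jobs for one chunk, receive the next chunk from the peer while those jobs run, and only then join the previous chunk. A minimal sketch of that pattern under tokio follows, assuming tokio 1.x (with the "full" feature) and futures 0.3; `validate` and `next_chunk` are illustrative stand-ins for the consensus and network calls, not real APIs:

    use futures::future::try_join_all;
    use tokio::time::{sleep, Duration};

    // Stand-in for one block-validation job; spawning starts it
    // immediately, so it runs while we receive the next chunk.
    async fn validate(block: u64) -> u64 {
        sleep(Duration::from_millis(5)).await; // simulate validation work
        block
    }

    // Stand-in for receiving a chunk of blocks from the peer.
    async fn next_chunk(incoming: &mut Vec<Vec<u64>>) -> Option<Vec<u64>> {
        sleep(Duration::from_millis(5)).await; // simulate network latency
        incoming.pop()
    }

    #[tokio::main]
    async fn main() -> Result<(), tokio::task::JoinError> {
        let mut incoming = vec![vec![7, 8, 9], vec![4, 5, 6], vec![1, 2, 3]];

        // Queue the first chunk's jobs without awaiting them yet.
        let first = next_chunk(&mut incoming).await.expect("non-empty");
        let mut prev_jobs: Vec<_> = first.into_iter().map(|b| tokio::spawn(validate(b))).collect();

        // Pipeline: the previous chunk validates in the background while we
        // receive and queue the next chunk, then we join the previous one.
        while let Some(chunk) = next_chunk(&mut incoming).await {
            let current_jobs: Vec<_> = chunk.into_iter().map(|b| tokio::spawn(validate(b))).collect();
            try_join_all(prev_jobs).await?;
            prev_jobs = current_jobs;
        }
        try_join_all(prev_jobs).await?; // join the final chunk
        println!("all chunks validated");
        Ok(())
    }

In the real flow the futures returned by `validate_and_insert_block` are already driven by the consensus pipeline, so no explicit spawn is needed there; the sketch spawns tasks to get the same overlap of validation and receipt.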
9 changes: 7 additions & 2 deletions protocol/flows/src/v5/ibd/progress.rs
@@ -1,5 +1,6 @@
 use std::time::{Duration, Instant};
 
+use chrono::{Local, LocalResult, TimeZone};
 use kaspa_core::info;
 
 /// Minimum number of items to report
@@ -34,7 +35,7 @@ impl ProgressReporter {
         }
     }
 
-    pub fn report(&mut self, processed_delta: usize, current_daa_score: u64) {
+    pub fn report(&mut self, processed_delta: usize, current_daa_score: u64, current_timestamp: u64) {
         self.current_batch += processed_delta;
         let now = Instant::now();
         if now - self.last_log_time < REPORT_TIME_GRANULARITY && self.current_batch < REPORT_BATCH_GRANULARITY && self.processed > 0 {
@@ -48,7 +49,11 @@
         let relative_daa_score = if current_daa_score > self.low_daa_score { current_daa_score - self.low_daa_score } else { 0 };
         let percent = ((relative_daa_score as f64 / (self.high_daa_score - self.low_daa_score) as f64) * 100.0) as i32;
         if percent > self.last_reported_percent {
-            info!("IBD: Processed {} {} ({}%)", self.processed, self.object_name, percent);
+            let date = match Local.timestamp_opt(current_timestamp as i64 / 1000, 1000 * (current_timestamp as u32 % 1000)) {
+                LocalResult::None | LocalResult::Ambiguous(_, _) => "cannot parse date".into(),
+                LocalResult::Single(date) => date.format("%Y-%m-%d %H:%M:%S.%3f:%z").to_string(),
+            };
+            info!("IBD: Processed {} {} ({}%) last block timestamp: {}", self.processed, self.object_name, percent, date);
             self.last_reported_percent = percent;
         }
         self.last_log_time = now;
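
The new log line renders the last block's timestamp (milliseconds since the Unix epoch) as a local-time string via chrono, falling back to a fixed message when the instant has no single unambiguous local representation. A minimal sketch of that conversion, assuming chrono 0.4; note that `TimeZone::timestamp_opt` takes whole seconds plus a nanosecond remainder:

    use chrono::{Local, LocalResult, TimeZone};

    /// Render a millisecond Unix timestamp in the local timezone.
    fn format_millis(timestamp_ms: u64) -> String {
        let secs = timestamp_ms as i64 / 1000;
        let nanos = (timestamp_ms % 1000) as u32 * 1_000_000; // ms remainder -> ns
        match Local.timestamp_opt(secs, nanos) {
            // Out-of-range or DST-ambiguous instants cannot be rendered reliably.
            LocalResult::None | LocalResult::Ambiguous(_, _) => "cannot parse date".into(),
            LocalResult::Single(date) => date.format("%Y-%m-%d %H:%M:%S%.3f %z").to_string(),
        }
    }

    fn main() {
        // 2024-01-07T00:00:00.500Z, printed in the local timezone.
        println!("{}", format_millis(1_704_585_600_500));
    }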
