Add timestamps to IBD progress (kaspanet#376)
* Add timestamps to IBD progress

* Add timezone

* Address review comments

* Change error message
someone235 authored Jan 7, 2024
1 parent 2211832 commit 0f431e0
Showing 5 changed files with 35 additions and 11 deletions.
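In effect, each IBD progress log line now also carries the wall-clock timestamp of the last block in the processed chunk, rendered in the node's local timezone. With the format string added to progress.rs below, a reported line looks roughly like this (illustrative values):

IBD: Processed 80000 blocks (12%) last block timestamp: 2024-01-07 12:34:56.789:+0200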
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions Cargo.toml
@@ -251,6 +251,7 @@ tower-http = { version = "0.4.4", features = [
] }
tower = "0.4.7"
hyper = "0.14.27"
chrono = "0.4.31"
# workflow dependencies that are not a part of core libraries

# workflow-perf-monitor = { path = "../../../workflow-perf-monitor-rs" }
3 changes: 2 additions & 1 deletion protocol/flows/Cargo.toml
@@ -30,6 +30,7 @@ log.workspace = true
parking_lot.workspace = true
rand.workspace = true
thiserror.workspace = true
tokio = { workspace = true, features = [ "rt-multi-thread", "macros", "signal" ] }
tokio = { workspace = true, features = ["rt-multi-thread", "macros", "signal"] }
tokio-stream = { workspace = true, features = ["net"] }
uuid = { workspace = true, features = ["v4", "fast-rng"] }
chrono.workspace = true
32 changes: 24 additions & 8 deletions protocol/flows/src/v5/ibd/flow.rs
@@ -63,6 +63,11 @@ pub enum IbdType {
DownloadHeadersProof,
}

struct QueueChunkOutput {
jobs: Vec<BlockValidationFuture>,
daa_score: u64,
timestamp: u64,
}
// TODO: define a peer banning strategy

impl IbdFlow {
@@ -372,12 +377,18 @@ impl IbdFlow {
let mut chunk_stream = HeadersChunkStream::new(&self.router, &mut self.incoming_route);

if let Some(chunk) = chunk_stream.next().await? {
let mut prev_daa_score = chunk.last().expect("chunk is never empty").daa_score;
let (mut prev_daa_score, mut prev_timestamp) = {
let last_header = chunk.last().expect("chunk is never empty");
(last_header.daa_score, last_header.timestamp)
};
let mut prev_jobs: Vec<BlockValidationFuture> =
chunk.into_iter().map(|h| consensus.validate_and_insert_block(Block::from_header_arc(h)).virtual_state_task).collect();

while let Some(chunk) = chunk_stream.next().await? {
let current_daa_score = chunk.last().expect("chunk is never empty").daa_score;
let (current_daa_score, current_timestamp) = {
let last_header = chunk.last().expect("chunk is never empty");
(last_header.daa_score, last_header.timestamp)
};
let current_jobs = chunk
.into_iter()
.map(|h| consensus.validate_and_insert_block(Block::from_header_arc(h)).virtual_state_task)
@@ -386,8 +397,9 @@ impl IbdFlow {
// Join the previous chunk so that we always concurrently process a chunk and receive another
try_join_all(prev_jobs).await?;
// Log the progress
progress_reporter.report(prev_chunk_len, prev_daa_score);
progress_reporter.report(prev_chunk_len, prev_daa_score, prev_timestamp);
prev_daa_score = current_daa_score;
prev_timestamp = current_timestamp;
prev_jobs = current_jobs;
}

@@ -513,17 +525,19 @@ staging selected tip ({}) is too small or negative. Aborting IBD...",
let mut progress_reporter = ProgressReporter::new(low_header.daa_score, high_header.daa_score, "blocks");

let mut iter = hashes.chunks(IBD_BATCH_SIZE);
let (mut prev_jobs, mut prev_daa_score) =
let QueueChunkOutput { jobs: mut prev_jobs, daa_score: mut prev_daa_score, timestamp: mut prev_timestamp } =
self.queue_block_processing_chunk(consensus, iter.next().expect("hashes was non empty")).await?;

for chunk in iter {
let (current_jobs, current_daa_score) = self.queue_block_processing_chunk(consensus, chunk).await?;
let QueueChunkOutput { jobs: current_jobs, daa_score: current_daa_score, timestamp: current_timestamp } =
self.queue_block_processing_chunk(consensus, chunk).await?;
let prev_chunk_len = prev_jobs.len();
// Join the previous chunk so that we always concurrently process a chunk and receive another
try_join_all(prev_jobs).await?;
// Log the progress
progress_reporter.report(prev_chunk_len, prev_daa_score);
progress_reporter.report(prev_chunk_len, prev_daa_score, prev_timestamp);
prev_daa_score = current_daa_score;
prev_timestamp = current_timestamp;
prev_jobs = current_jobs;
}

@@ -538,9 +552,10 @@ staging selected tip ({}) is too small or negative. Aborting IBD...",
&mut self,
consensus: &ConsensusProxy,
chunk: &[Hash],
) -> Result<(Vec<BlockValidationFuture>, u64), ProtocolError> {
) -> Result<QueueChunkOutput, ProtocolError> {
let mut jobs = Vec::with_capacity(chunk.len());
let mut current_daa_score = 0;
let mut current_timestamp = 0;
self.router
.enqueue(make_message!(
Payload::RequestIbdBlocks,
@@ -557,9 +572,10 @@ staging selected tip ({}) is too small or negative. Aborting IBD...",
return Err(ProtocolError::OtherOwned(format!("sent header of {} where expected block with body", block.hash())));
}
current_daa_score = block.header.daa_score;
current_timestamp = block.header.timestamp;
jobs.push(consensus.validate_and_insert_block(block).virtual_state_task);
}

Ok((jobs, current_daa_score))
Ok(QueueChunkOutput { jobs, daa_score: current_daa_score, timestamp: current_timestamp })
}
}
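For readers outside the codebase, here is a minimal self-contained sketch (not the commit's code) of the chunk-pipelining pattern above: the previous chunk's validation futures are awaited while the next chunk is already queued, and the last block's DAA score and timestamp are carried forward for progress reporting. The names ChunkOutput, queue_chunk and Job are illustrative stand-ins, the jobs are dummy futures, and it assumes the futures and tokio crates are available.

use futures::future::{ready, try_join_all, Ready};

type Job = Ready<Result<(), String>>; // stand-in for a block-validation future

struct ChunkOutput {
    jobs: Vec<Job>,
    daa_score: u64,
    timestamp: u64, // milliseconds since the Unix epoch, from the chunk's last header
}

fn queue_chunk(chunk: &[(u64, u64)]) -> ChunkOutput {
    // One dummy job per block; remember the last block's DAA score and timestamp.
    let jobs = chunk.iter().map(|_| ready(Ok(()))).collect();
    let &(daa_score, timestamp) = chunk.last().expect("chunk is never empty");
    ChunkOutput { jobs, daa_score, timestamp }
}

#[tokio::main]
async fn main() -> Result<(), String> {
    // Each (daa_score, timestamp) pair stands in for a block header.
    let chunks: Vec<Vec<(u64, u64)>> = vec![
        vec![(100, 1_704_628_800_000), (101, 1_704_628_801_000)],
        vec![(102, 1_704_628_802_000)],
    ];
    let mut iter = chunks.iter();
    let mut prev = queue_chunk(iter.next().expect("chunks is non-empty"));
    for chunk in iter {
        let current = queue_chunk(chunk);
        let prev_len = prev.jobs.len();
        // Join the previous chunk so one chunk is always validated while the next is queued.
        try_join_all(prev.jobs).await?;
        println!("processed {} blocks up to DAA score {} (block timestamp {} ms)", prev_len, prev.daa_score, prev.timestamp);
        prev = current;
    }
    try_join_all(prev.jobs).await?;
    Ok(())
}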
9 changes: 7 additions & 2 deletions protocol/flows/src/v5/ibd/progress.rs
@@ -1,5 +1,6 @@
use std::time::{Duration, Instant};

use chrono::{Local, LocalResult, TimeZone};
use kaspa_core::info;

/// Minimum number of items to report
@@ -34,7 +35,7 @@ impl ProgressReporter {
}
}

pub fn report(&mut self, processed_delta: usize, current_daa_score: u64) {
pub fn report(&mut self, processed_delta: usize, current_daa_score: u64, current_timestamp: u64) {
self.current_batch += processed_delta;
let now = Instant::now();
if now - self.last_log_time < REPORT_TIME_GRANULARITY && self.current_batch < REPORT_BATCH_GRANULARITY && self.processed > 0 {
@@ -48,7 +49,11 @@ impl ProgressReporter {
let relative_daa_score = if current_daa_score > self.low_daa_score { current_daa_score - self.low_daa_score } else { 0 };
let percent = ((relative_daa_score as f64 / (self.high_daa_score - self.low_daa_score) as f64) * 100.0) as i32;
if percent > self.last_reported_percent {
info!("IBD: Processed {} {} ({}%)", self.processed, self.object_name, percent);
let date = match Local.timestamp_opt(current_timestamp as i64 / 1000, 1000 * (current_timestamp as u32 % 1000)) {
LocalResult::None | LocalResult::Ambiguous(_, _) => "cannot parse date".into(),
LocalResult::Single(date) => date.format("%Y-%m-%d %H:%M:%S.%3f:%z").to_string(),
};
info!("IBD: Processed {} {} ({}%) last block timestamp: {}", self.processed, self.object_name, percent, date);
self.last_reported_percent = percent;
}
self.last_log_time = now;
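As a standalone illustration of the conversion used in report() above: a sketch (not the commit's code) that renders a millisecond Unix timestamp in the local timezone with chrono 0.4, splitting the milliseconds into whole seconds plus nanoseconds and handling instants the local timezone cannot resolve uniquely. The helper name format_block_timestamp is hypothetical.

use chrono::{Local, LocalResult, TimeZone};

// Hypothetical helper: format a millisecond Unix timestamp in the local timezone.
fn format_block_timestamp(ts_ms: u64) -> String {
    let secs = (ts_ms / 1000) as i64;
    let nanos = ((ts_ms % 1000) * 1_000_000) as u32; // sub-second remainder as nanoseconds
    match Local.timestamp_opt(secs, nanos) {
        LocalResult::Single(date) => date.format("%Y-%m-%d %H:%M:%S%.3f %:z").to_string(),
        // Around DST gaps and overlaps the mapping can be missing or ambiguous.
        LocalResult::None | LocalResult::Ambiguous(_, _) => "cannot parse date".into(),
    }
}

fn main() {
    // 2024-01-07T12:00:00.250Z expressed in milliseconds (illustrative value).
    println!("{}", format_block_timestamp(1_704_628_800_250));
}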