Skip to content

Commit

Permalink
Refactor ImportQueueStatus
Browse files Browse the repository at this point in the history
  • Loading branch information
liuchengxu committed Oct 24, 2024
1 parent 0181718 commit 8757114
Show file tree
Hide file tree
Showing 3 changed files with 55 additions and 33 deletions.
50 changes: 32 additions & 18 deletions crates/subcoin-network/src/block_downloader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,20 @@ impl QueuedBlocks {
}
}

#[derive(Debug, Clone, Copy)]
enum ImportQueueStatus {
/// Queue is ready to accept more blocks for import.
Ready,
/// The queue is overloaded and cannot accept more blocks at the moment.
Overloaded,
}

impl ImportQueueStatus {
fn is_overloaded(&self) -> bool {
matches!(self, Self::Overloaded)
}
}

/// Manages the of blocks downloaded from the Bitcoin network.
///
/// This struct keeps track of:
Expand Down Expand Up @@ -86,10 +100,11 @@ pub(crate) struct BlockDownloadManager {
/// when the results of processed blocks are notified. It helps track
/// the most recent activity related to block processing.
last_progress_time: Instant,
/// Whether there are too many blocks in the queue.
import_queue_is_overloaded: bool,
/// Import queue status.
queue_status: ImportQueueStatus,
/// Last time the log of too many blocks in the queue was printed.
last_overloaded_queue_log_time: Option<Instant>,
/// Peer store.
peer_store: Arc<dyn PeerStore>,
}

Expand All @@ -103,7 +118,7 @@ impl BlockDownloadManager {
best_queued_number: 0u32,
orphan_blocks_pool: OrphanBlocksPool::new(),
last_progress_time: Instant::now(),
import_queue_is_overloaded: false,
queue_status: ImportQueueStatus::Ready,
last_overloaded_queue_log_time: None,
peer_store,
}
Expand Down Expand Up @@ -159,7 +174,7 @@ impl BlockDownloadManager {
}

/// Checks if the import queue is overloaded and updates the internal state.
fn update_and_check_queue_status(&mut self, best_number: u32) -> bool {
fn evaluate_queue_status(&mut self, best_number: u32) -> ImportQueueStatus {
// Maximum number of pending blocks in the import queue.
let max_queued_blocks = match best_number {
0..=100_000 => 8192,
Expand All @@ -171,25 +186,24 @@ impl BlockDownloadManager {

let queued_blocks = self.best_queued_number - best_number;

let import_queue_is_overloaded = queued_blocks > max_queued_blocks;
if queued_blocks > max_queued_blocks {
self.queue_status = ImportQueueStatus::Overloaded;

if import_queue_is_overloaded
&& self
if self
.last_overloaded_queue_log_time
.map(|last_time| last_time.elapsed() > BUSY_QUEUE_LOG_INTERVAL)
.unwrap_or(true)
{
tracing::debug!(
best_number,
best_queued_number = self.best_queued_number,
"⏸️ Pausing download: too many blocks ({queued_blocks}) in the queue",
);
self.last_overloaded_queue_log_time.replace(Instant::now());
{
tracing::debug!(
best_number,
best_queued_number = self.best_queued_number,
"⏸️ Pausing download: too many blocks ({queued_blocks}) in the queue",
);
self.last_overloaded_queue_log_time.replace(Instant::now());
}
}

self.import_queue_is_overloaded = import_queue_is_overloaded;

self.import_queue_is_overloaded
self.queue_status
}

fn reset(&mut self) {
Expand All @@ -200,7 +214,7 @@ impl BlockDownloadManager {
self.best_queued_number = 0u32;
self.orphan_blocks_pool.clear();
self.last_progress_time = Instant::now();
self.import_queue_is_overloaded = false;
self.queue_status = ImportQueueStatus::Ready;
}

/// Checks if there are blocks ready to be imported.
Expand Down
21 changes: 13 additions & 8 deletions crates/subcoin-network/src/block_downloader/blocks_first.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ where
}

pub(crate) fn sync_status(&self) -> SyncStatus {
if self.download_manager.import_queue_is_overloaded {
if self.download_manager.queue_status.is_overloaded() {
SyncStatus::Importing {
target: self.target_block_number,
peers: vec![self.peer_id],
Expand Down Expand Up @@ -116,11 +116,14 @@ where
return SyncAction::Request(self.prepare_blocks_request());
}

if self.download_manager.import_queue_is_overloaded {
if self
let best_number = self.client.best_number();

if self.download_manager.queue_status.is_overloaded() {
let still_overloaded = self
.download_manager
.update_and_check_queue_status(self.client.best_number())
{
.evaluate_queue_status(best_number)
.is_overloaded();
if still_overloaded {
return SyncAction::None;
} else {
return SyncAction::Request(self.prepare_blocks_request());
Expand All @@ -132,7 +135,7 @@ where
return self.truncate_and_prepare_block_data_request(block_data_request);
}

if self.client.best_number() == self.target_block_number {
if best_number == self.target_block_number {
self.state = State::Completed;
return SyncAction::SwitchToIdle;
}
Expand Down Expand Up @@ -305,7 +308,8 @@ where
// No more new blocks request as there are enough ongoing blocks in the queue.
if self
.download_manager
.update_and_check_queue_status(best_number)
.evaluate_queue_status(best_number)
.is_overloaded()
{
return SyncAction::None;
}
Expand Down Expand Up @@ -367,7 +371,8 @@ where

if self
.download_manager
.update_and_check_queue_status(self.client.best_number())
.evaluate_queue_status(self.client.best_number())
.is_overloaded()
{
SyncAction::None
} else {
Expand Down
17 changes: 10 additions & 7 deletions crates/subcoin-network/src/block_downloader/headers_first.rs
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ where
}

pub(crate) fn sync_status(&self) -> SyncStatus {
if self.download_manager.import_queue_is_overloaded {
if self.download_manager.queue_status.is_overloaded() {
SyncStatus::Importing {
target: self.target_block_number,
peers: vec![self.peer_id],
Expand Down Expand Up @@ -199,11 +199,12 @@ where
return self.start_block_download(start, end);
}

if self.download_manager.import_queue_is_overloaded {
let import_queue_still_busy = self
if self.download_manager.queue_status.is_overloaded() {
let still_overloaded = self
.download_manager
.update_and_check_queue_status(self.client.best_number());
if import_queue_still_busy {
.evaluate_queue_status(self.client.best_number())
.is_overloaded();
if still_overloaded {
return SyncAction::None;
} else {
// Resume blocks or headers request.
Expand Down Expand Up @@ -551,7 +552,8 @@ where

if self
.download_manager
.update_and_check_queue_status(best_number)
.evaluate_queue_status(best_number)
.is_overloaded()
{
*paused = true;
return SyncAction::None;
Expand Down Expand Up @@ -616,7 +618,8 @@ where

if self
.download_manager
.update_and_check_queue_status(best_number)
.evaluate_queue_status(best_number)
.is_overloaded()
{
return SyncAction::None;
}
Expand Down

0 comments on commit 8757114

Please sign in to comment.