Skip to content

Commit

Permalink
Various syncing improvements (#67)
Browse files Browse the repository at this point in the history
* Tiny refactoring

* Avoid downloading the headers repeatly

* Refactor Restarting variants

* Include missing_blocks_count in the log

* Remove connect latency factor

This is not much useful and adds more complexity.

* Increase import queue size between 300_000 and 600_000

* Various small refactorings

* Move peer latency constants to one place

* Refactor LatencyTracker

* Introduce PeerStoreHandle

* Refactor network_peers RPC

* Fix test

* Introduce PeerStore trait

* Increase the persistent peer lantency threshold

* Refactor Discouraged peer

* Remove unreliable peers

* Fixes

* Include concrete latence value in PingLatencyTooHigh

* Increase the peer latency threshold and decrease the size of block request

* Use BTreeMap for deterministic output

* Refactor ImportQueueStatus

* Reset peer state to Available if updated

* Fix evaludate_queue_status()
  • Loading branch information
liuchengxu authored Oct 24, 2024
1 parent 72e5e71 commit aa2dfc4
Show file tree
Hide file tree
Showing 11 changed files with 825 additions and 497 deletions.
84 changes: 60 additions & 24 deletions crates/subcoin-network/src/block_downloader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,13 @@ pub use self::blocks_first::BlocksFirstDownloader;
pub use self::headers_first::HeadersFirstDownloader;

use crate::orphan_blocks_pool::OrphanBlocksPool;
use crate::peer_store::PeerStore;
use crate::PeerId;
use bitcoin::{Block as BitcoinBlock, BlockHash};
use sc_consensus::BlockImportError;
use sc_consensus_nakamoto::ImportManyBlocksResult;
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use std::time::{Duration, Instant};

/// Interval for logging when the block import queue is too busy.
Expand Down Expand Up @@ -47,6 +50,20 @@ impl QueuedBlocks {
}
}

#[derive(Debug, Clone, Copy)]
enum ImportQueueStatus {
/// Queue is ready to accept more blocks for import.
Ready,
/// The queue is overloaded and cannot accept more blocks at the moment.
Overloaded,
}

impl ImportQueueStatus {
fn is_overloaded(&self) -> bool {
matches!(self, Self::Overloaded)
}
}

/// Manages the of blocks downloaded from the Bitcoin network.
///
/// This struct keeps track of:
Expand All @@ -58,7 +75,7 @@ impl QueuedBlocks {
/// [`BlockDownloadManager`] is designed to be used in both Blocks-First and
/// Headers-First sync strategies, providing a common component for managing
/// the state of blocks during the sync process.
#[derive(Debug, Clone)]
#[derive(Clone)]
pub(crate) struct BlockDownloadManager {
/// A set of block hashes that have been requested from the network.
///
Expand All @@ -83,14 +100,16 @@ pub(crate) struct BlockDownloadManager {
/// when the results of processed blocks are notified. It helps track
/// the most recent activity related to block processing.
last_progress_time: Instant,
/// Whether there are too many blocks in the queue.
import_queue_is_overloaded: bool,
/// Import queue status.
queue_status: ImportQueueStatus,
/// Last time the log of too many blocks in the queue was printed.
last_overloaded_queue_log_time: Option<Instant>,
/// Peer store.
peer_store: Arc<dyn PeerStore>,
}

impl BlockDownloadManager {
fn new() -> Self {
fn new(peer_store: Arc<dyn PeerStore>) -> Self {
Self {
requested_blocks: HashSet::new(),
downloaded_blocks: Vec::new(),
Expand All @@ -99,8 +118,9 @@ impl BlockDownloadManager {
best_queued_number: 0u32,
orphan_blocks_pool: OrphanBlocksPool::new(),
last_progress_time: Instant::now(),
import_queue_is_overloaded: false,
queue_status: ImportQueueStatus::Ready,
last_overloaded_queue_log_time: None,
peer_store,
}
}

Expand All @@ -117,15 +137,21 @@ impl BlockDownloadManager {
/// increases, resulting in longer network response times for block retrieval.
///
/// The timeout values are configurated arbitrarily.
fn is_stalled(&self) -> bool {
fn is_stalled(&self, peer_id: PeerId) -> bool {
let stall_timeout = match self.best_queued_number {
0..300_000 => 60, // Standard timeout, 1 minute
300_000..600_000 => 120, // Extended timeout, 2 minutes
600_000..800_000 => 180, // Extended timeout, 3 minutes
_ => 300,
};

self.last_progress_time.elapsed().as_secs() > stall_timeout
let stalled = self.last_progress_time.elapsed().as_secs() > stall_timeout;

if stalled {
self.peer_store.record_failure(peer_id);
}

stalled
}

fn block_exists(&self, block_hash: BlockHash) -> bool {
Expand All @@ -148,36 +174,38 @@ impl BlockDownloadManager {
}

/// Checks if the import queue is overloaded and updates the internal state.
fn update_and_check_queue_status(&mut self, best_number: u32) -> bool {
fn evaluate_queue_status(&mut self, best_number: u32) -> ImportQueueStatus {
// Maximum number of pending blocks in the import queue.
let max_queued_blocks = match best_number {
0..=100_000 => 8192,
100_001..=200_000 => 4096,
200_001..=300_000 => 2048,
300_001..=600_000 => 1024,
_ => 512,
};

let queued_blocks = self.best_queued_number - best_number;

let import_queue_is_overloaded = queued_blocks > max_queued_blocks;
if queued_blocks > max_queued_blocks {
self.queue_status = ImportQueueStatus::Overloaded;

if import_queue_is_overloaded
&& self
if self
.last_overloaded_queue_log_time
.map(|last_time| last_time.elapsed() > BUSY_QUEUE_LOG_INTERVAL)
.unwrap_or(true)
{
tracing::debug!(
best_number,
best_queued_number = self.best_queued_number,
"⏸️ Pausing download: too many blocks ({queued_blocks}) in the queue",
);
self.last_overloaded_queue_log_time.replace(Instant::now());
{
tracing::debug!(
best_number,
best_queued_number = self.best_queued_number,
"⏸️ Pausing download: too many blocks ({queued_blocks}) in the queue",
);
self.last_overloaded_queue_log_time.replace(Instant::now());
}
} else {
self.queue_status = ImportQueueStatus::Ready;
}

self.import_queue_is_overloaded = import_queue_is_overloaded;

self.import_queue_is_overloaded
self.queue_status
}

fn reset(&mut self) {
Expand All @@ -188,7 +216,7 @@ impl BlockDownloadManager {
self.best_queued_number = 0u32;
self.orphan_blocks_pool.clear();
self.last_progress_time = Instant::now();
self.import_queue_is_overloaded = false;
self.queue_status = ImportQueueStatus::Ready;
}

/// Checks if there are blocks ready to be imported.
Expand Down Expand Up @@ -253,8 +281,14 @@ impl BlockDownloadManager {
.unzip()
}

/// Add the block the queue that is ready to be imported.
fn add_block(&mut self, block_number: u32, block_hash: BlockHash, block: BitcoinBlock) {
/// Add the block that is ready to be imported.
fn add_block(
&mut self,
block_number: u32,
block_hash: BlockHash,
block: BitcoinBlock,
from: PeerId,
) {
let mut insert_block = |block_number, block_hash, block| {
self.downloaded_blocks.push(block);
self.queued_blocks.insert(block_number, block_hash);
Expand All @@ -274,6 +308,8 @@ impl BlockDownloadManager {
insert_block(number, hash, child_block);
}
}

self.peer_store.record_block_download(from);
}

fn add_orphan_block(&mut self, block_hash: BlockHash, orphan_block: BitcoinBlock) {
Expand Down
Loading

0 comments on commit aa2dfc4

Please sign in to comment.