Skip to content

Commit

Permalink
Reset peer state to Available if updated
Browse files Browse the repository at this point in the history
  • Loading branch information
liuchengxu committed Oct 24, 2024
1 parent 8757114 commit 520cd43
Show file tree
Hide file tree
Showing 3 changed files with 59 additions and 29 deletions.
29 changes: 19 additions & 10 deletions crates/subcoin-network/src/block_downloader/blocks_first.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,11 @@ pub struct BlocksFirstDownloader<Block, Client> {
target_block_number: u32,
state: State,
download_manager: BlockDownloadManager,
downloaded_blocks_count: usize,
last_locator_start: u32,
pending_block_requests: Vec<Inventory>,
/// Number of blocks' data requested but not yet received.
requested_blocks_count: usize,
inflight_blocks_count: usize,
_phantom: PhantomData<Block>,
}

Expand All @@ -73,9 +74,10 @@ where
target_block_number: peer_best,
state: State::Idle,
download_manager: BlockDownloadManager::new(peer_store),
downloaded_blocks_count: 0,
last_locator_start: 0u32,
pending_block_requests: Vec::new(),
requested_blocks_count: 0,
inflight_blocks_count: 0,
_phantom: Default::default(),
};

Expand All @@ -98,13 +100,18 @@ where
}
}

pub(crate) fn sync_peer(&self) -> PeerId {
self.peer_id
pub(crate) fn replaceable_sync_peer(&self) -> Option<PeerId> {
if self.downloaded_blocks_count > 0 {
None
} else {
self.peer_id
}
}

pub(crate) fn update_sync_peer(&mut self, peer_id: PeerId, target_block_number: u32) {
pub(crate) fn replace_sync_peer(&mut self, peer_id: PeerId, target_block_number: u32) {
self.peer_id = peer_id;
self.target_block_number = target_block_number;
self.downloaded_blocks_count = 0;
}

pub(crate) fn download_manager(&mut self) -> &mut BlockDownloadManager {
Expand All @@ -130,7 +137,7 @@ where
}
}

if !self.pending_block_requests.is_empty() && self.requested_blocks_count == 0 {
if !self.pending_block_requests.is_empty() && self.inflight_blocks_count == 0 {
let block_data_request = std::mem::take(&mut self.pending_block_requests);
return self.truncate_and_prepare_block_data_request(block_data_request);
}
Expand All @@ -149,12 +156,13 @@ where

pub(crate) fn restart(&mut self, new_peer: PeerId, peer_best: u32) {
self.peer_id = new_peer;
self.downloaded_blocks_count = 0;
self.target_block_number = peer_best;
self.last_locator_start = 0u32;
self.download_manager.reset();
self.state = State::Restarting;
self.pending_block_requests.clear();
self.requested_blocks_count = 0;
self.inflight_blocks_count = 0;
}

// Handle `inv` message.
Expand Down Expand Up @@ -217,13 +225,13 @@ where
self.pending_block_requests = block_data_request.split_off(max_request_size);
}

self.requested_blocks_count = block_data_request.len();
self.inflight_blocks_count = block_data_request.len();

tracing::debug!(
from = ?self.peer_id,
pending_block_data_request = self.pending_block_requests.len(),
"📦 Downloading {} blocks",
self.requested_blocks_count,
self.inflight_blocks_count,
);

SyncAction::Request(SyncRequest::Data(block_data_request, self.peer_id))
Expand Down Expand Up @@ -275,7 +283,8 @@ where
self.download_manager
.add_block(block_number, block_hash, block, from);

self.requested_blocks_count -= 1;
self.inflight_blocks_count -= 1;
self.downloaded_blocks_count += 1;

match block_number.cmp(&last_get_blocks_target) {
CmpOrdering::Less => {
Expand Down
24 changes: 17 additions & 7 deletions crates/subcoin-network/src/block_downloader/headers_first.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ pub struct HeadersFirstDownloader<Block, Client> {
state: State,
download_manager: BlockDownloadManager,
downloaded_headers: DownloadedHeaders,
downloaded_blocks_count: usize,
last_locator_start: u32,
// TODO: Now it's solely used for the purpose of displaying the sync state.
// refactor it later.
Expand Down Expand Up @@ -155,6 +156,7 @@ where
completed_range: None,
},
download_manager: BlockDownloadManager::new(peer_store),
downloaded_blocks_count: 0,
last_locator_start: 0u32,
target_block_number,
_phantom: Default::default(),
Expand All @@ -177,19 +179,24 @@ where
}
}

pub(crate) fn sync_peer(&self) -> PeerId {
self.peer_id
}

pub(crate) fn download_manager(&mut self) -> &mut BlockDownloadManager {
&mut self.download_manager
pub(crate) fn replaceable_sync_peer(&self) -> Option<PeerId> {
if self.downloaded_blocks_count > 0 {
None
} else {
Some(self.peer_id)
}
}

pub(crate) fn update_sync_peer(&mut self, peer_id: PeerId, target_block_number: u32) {
pub(crate) fn replace_sync_peer(&mut self, peer_id: PeerId, target_block_number: u32) {
self.peer_id = peer_id;
self.downloaded_blocks_count = 0;
self.target_block_number = target_block_number;
}

pub(crate) fn download_manager(&mut self) -> &mut BlockDownloadManager {
&mut self.download_manager
}

pub(crate) fn on_tick(&mut self) -> SyncAction {
if matches!(self.state, State::RestartingHeaders) {
return self.headers_request_action();
Expand Down Expand Up @@ -250,6 +257,7 @@ where
pub(crate) fn restart(&mut self, new_peer: PeerId, peer_best: u32) {
self.peer_id = new_peer;
self.download_manager.reset();
self.downloaded_blocks_count = 0;
self.last_locator_start = 0u32;
self.target_block_number = peer_best;
if let Some((start, end)) = self.downloaded_headers.completed_range {
Expand Down Expand Up @@ -531,6 +539,8 @@ where
self.download_manager
.add_block(block_number, block_hash, block, from);

self.downloaded_blocks_count += 1;

let should_request_more_headers = match block_download {
BlockDownload::AllBlocks { start, end } => {
if end.hash == block_hash {
Expand Down
35 changes: 23 additions & 12 deletions crates/subcoin-network/src/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -345,12 +345,16 @@ where
}

pub(super) fn update_sync_peer_on_lower_latency(&mut self) {
let current_sync_peer_id = match &self.syncing {
Syncing::BlocksFirst(downloader) => downloader.sync_peer(),
Syncing::HeadersFirst(downloader) => downloader.sync_peer(),
let maybe_sync_peer_id = match &self.syncing {
Syncing::BlocksFirst(downloader) => downloader.replaceable_sync_peer(),
Syncing::HeadersFirst(downloader) => downloader.replaceable_sync_peer(),
Syncing::Idle => return,
};

let Some(current_sync_peer_id) = maybe_sync_peer_id else {
return;
};

let Some(current_latency) = self
.peers
.get(&current_sync_peer_id)
Expand Down Expand Up @@ -389,21 +393,28 @@ where
let peer_id = best_sync_peer.peer_id;
let target_block_number = best_sync_peer.best_number;

match &mut self.syncing {
let sync_peer_updated = match &mut self.syncing {
Syncing::BlocksFirst(downloader) => {
downloader.update_sync_peer(peer_id, target_block_number);
downloader.replace_sync_peer(peer_id, target_block_number);
true
}
Syncing::HeadersFirst(downloader) => {
downloader.update_sync_peer(peer_id, target_block_number);
downloader.replace_sync_peer(peer_id, target_block_number);
true
}
Syncing::Idle => unreachable!("Must not be Idle as checked; qed"),
}
};

tracing::debug!(
old_peer_id = ?current_sync_peer_id,
new_peer_id = ?peer_id,
"🔧 Sync peer ({current_latency} ms) updated to a new peer with lower latency ({best_latency} ms)",
);
if sync_peer_updated {
tracing::debug!(
old_peer_id = ?current_sync_peer_id,
new_peer_id = ?peer_id,
"🔧 Sync peer ({current_latency} ms) updated to a new peer with lower latency ({best_latency} ms)",
);
self.peers.entry(replaced_peer_id).and_modify(|peer| {
peer.state = PeerSyncState::Available;
});
}
}
}

Expand Down

0 comments on commit 520cd43

Please sign in to comment.