Missing retry turnaround (#2307)
* fix: similar turnaround issue, but for retries.

* fix: update error counts even for spurious ones.

* fix: reintroduce DB turnaround check, as a fallback.
shawn-zil authored Feb 7, 2025
1 parent 77cc0fd commit 0d64278
Showing 1 changed file with 28 additions and 24 deletions.
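For context: the first fix refreshes started_at_block_number (via the new update_started_at helper) before a Phase 1 retry, and the second moves the error counters ahead of the in-flight peer check so that spurious responses still get counted. A minimal sketch of that count-first pattern follows; PeerId and both struct fields are simplified stand-ins for zq2's real types, not the actual Sync struct:

// Minimal sketch of the "count first, then match" pattern from this commit.
// `PeerId` and the two fields are simplified stand-ins for zq2's real types.
type PeerId = u64;

struct Sync {
    empty_count: u64,
    in_flight: Option<(PeerId, u32)>, // (peer, request id)
}

impl Sync {
    fn handle_acknowledgement(&mut self, from: PeerId) {
        // Count every empty response -- even spurious ones from a peer we
        // are no longer waiting on. Before this commit, the counter was
        // only bumped inside the in-flight match below.
        self.empty_count = self.empty_count.saturating_add(1);

        if let Some((peer, _req_id)) = self.in_flight {
            if peer == from {
                // Only a response to the matching in-flight request
                // releases the slot (and, in zq2, triggers the retry).
                self.in_flight = None;
            }
        }
    }
}

fn main() {
    let mut sync = Sync { empty_count: 0, in_flight: Some((7, 1)) };
    sync.handle_acknowledgement(42); // spurious: still counted
    sync.handle_acknowledgement(7);  // matching: counted and cleared
    assert_eq!(sync.empty_count, 2);
    assert!(sync.in_flight.is_none());
}

The diff below applies this same reordering to both handle_acknowledgement (empty_count) and handle_request_failure (timeout_count), and to retry_count in retry_phase1.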
52 changes: 28 additions & 24 deletions zilliqa/src/sync.rs
@@ -146,21 +146,21 @@ impl Sync {
///
/// We get a plain ACK in certain cases - treated as an empty response.
pub fn handle_acknowledgement(&mut self, from: PeerId) -> Result<()> {
self.empty_count = self.empty_count.saturating_add(1);
if let Some((peer, _)) = self.in_flight.as_ref() {
// downgrade peer due to empty response
if peer.peer_id == from {
tracing::warn!(to = %peer.peer_id,
"sync::Acknowledgement : empty response"
);
self.empty_count = self.empty_count.saturating_add(1);

self.peers
.done_with_peer(self.in_flight.take(), DownGrade::Empty);
// Retry if failed in Phase 2 for whatever reason
match self.state {
SyncState::Phase1(_) if Self::DO_SPECULATIVE => {
self.request_missing_metadata(None)?
}
// Retry if failed in Phase 2 for whatever reason
SyncState::Phase2(_) => self.state = SyncState::Retry1,
_ => {}
}
@@ -177,22 +177,22 @@ impl Sync {
///
/// This gets called for any libp2p request failure - treated as a network failure
pub fn handle_request_failure(&mut self, failure: OutgoingMessageFailure) -> Result<()> {
self.timeout_count = self.timeout_count.saturating_add(1);
// check if the request is a sync message
if let Some((peer, req_id)) = self.in_flight.as_ref() {
// downgrade peer due to network failure
if peer.peer_id == failure.peer && *req_id == failure.request_id {
tracing::warn!(to = %peer.peer_id, err = %failure.error,
"sync::RequestFailure : network error"
);
self.timeout_count = self.timeout_count.saturating_add(1);

self.peers
.done_with_peer(self.in_flight.take(), DownGrade::Timeout);
// Retry if failed in Phase 2 for whatever reason
match self.state {
SyncState::Phase1(_) if Self::DO_SPECULATIVE => {
self.request_missing_metadata(None)?
}
// Retry if failed in Phase 2 for whatever reason
SyncState::Phase2(_) => self.state = SyncState::Retry1,
_ => {}
}
@@ -243,20 +243,10 @@ impl Sync {
if !self.db.contains_block(&parent_hash)? {
// No parent block, trigger sync
tracing::info!("sync::SyncProposal : syncing from {parent_hash}",);
let meta = self.recent_proposals.back().unwrap().header;

let highest_block = self
.db
.get_canonical_block_by_number(
self.db
.get_highest_canonical_block_number()?
.expect("no highest block"),
)?
.expect("missing highest block");
self.started_at_block_number = highest_block.number();

self.update_started_at()?;
// Ensure started_at_block_number is set before running this.
// https://github.com/Zilliqa/zq2/issues/2252#issuecomment-2636036676
let meta = self.recent_proposals.back().unwrap().header;
self.request_missing_metadata(Some(meta))?;
}
}
@@ -287,6 +277,8 @@ impl Sync {
}
// Retry to fix sync issues e.g. peers that are now offline
SyncState::Retry1 if self.in_pipeline == 0 => {
self.update_started_at()?;
// Ensure started is updated - https://github.com/Zilliqa/zq2/issues/2306
self.retry_phase1()?;
}
_ => {
@@ -297,6 +289,22 @@ impl Sync {
Ok(())
}

/// Update the startingBlock value.
///
/// Must be called before starting/re-starting Phase 1.
fn update_started_at(&mut self) -> Result<()> {
let highest_block = self
.db
.get_canonical_block_by_number(
self.db
.get_highest_canonical_block_number()?
.expect("no highest block"),
)?
.expect("missing highest block");
self.started_at_block_number = highest_block.number();
Ok(())
}

/// Convenience function to convert a block to a proposal (add full txs)
/// Should only be used for syncing history, not for consensus messages regarding new blocks.
fn block_to_proposal(&self, block: Block) -> Proposal {
@@ -323,6 +331,7 @@ impl Sync {
/// This will rebuild history from the previous marker, with another peer.
/// If this function is called many times, it will eventually restart from Phase 0.
fn retry_phase1(&mut self) -> Result<()> {
self.retry_count = self.retry_count.saturating_add(1);
if self.db.count_sync_segments()? == 0 {
tracing::error!("sync::RetryPhase1 : cannot retry phase 1 without chain segments!");
self.state = SyncState::Phase0;
@@ -333,7 +342,6 @@ impl Sync {
"sync::RetryPhase1 : retrying segment #{}",
self.db.count_sync_segments()?,
);
self.retry_count = self.retry_count.saturating_add(1);

// remove the last segment from the chain metadata
let (meta, _) = self.db.last_sync_segment()?.unwrap();
@@ -720,19 +728,15 @@ impl Sync {

// TODO: Implement dynamic sub-segments - https://github.com/Zilliqa/zq2/issues/2158

// Record the oldest block in the chain's parent
// Record the oldest block in the segment
self.state = SyncState::Phase1(segment.last().cloned().unwrap());

// If the check-point/starting-point is in this segment
let checkpointed = segment.iter().any(|b| b.hash == self.checkpoint_hash);
let range = std::ops::RangeInclusive::new(
segment.last().as_ref().unwrap().number,
segment.first().as_ref().unwrap().number,
);
let started = range.contains(&self.started_at_block_number);
let block_hash = segment.last().as_ref().unwrap().hash;

// If the segment hits our history, turnaround to Phase 2.
if started || checkpointed {
if checkpointed || self.db.contains_block(&block_hash)? {
self.state = SyncState::Phase2(Hash::ZERO);
} else if Self::DO_SPECULATIVE {
self.request_missing_metadata(None)?;
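The final hunk above is the third fix: instead of a range check against started_at_block_number, Phase 1 now turns around when the segment contains the checkpoint or when its oldest block is already in the DB. A hedged sketch of that decision in isolation -- Hash, BlockMeta, Db, and the error type here are simplified stand-ins for the real zq2 types:

// Simplified stand-ins for zq2's block hash, metadata, and DB handle.
#[derive(Clone, Copy, PartialEq, Eq)]
struct Hash([u8; 32]);

struct BlockMeta {
    hash: Hash,
}

struct Db; // pretend DB handle

impl Db {
    // Stand-in for the real contains_block(); assume it probes local history.
    fn contains_block(&self, _hash: &Hash) -> std::io::Result<bool> {
        Ok(false)
    }
}

/// Turn around from Phase 1 (walking metadata backwards) to Phase 2
/// (fetching blocks forwards) once the segment reaches the checkpoint,
/// or -- the reintroduced fallback -- a block already present in the DB.
fn should_turn_around(
    db: &Db,
    segment: &[BlockMeta],
    checkpoint_hash: &Hash,
) -> std::io::Result<bool> {
    let checkpointed = segment.iter().any(|b| b.hash == *checkpoint_hash);
    // Phase 1 walks the chain backwards, so last() is the oldest block.
    let oldest = segment.last().expect("segment is non-empty");
    Ok(checkpointed || db.contains_block(&oldest.hash)?)
}

fn main() -> std::io::Result<()> {
    let checkpoint = Hash([1; 32]);
    let segment = vec![
        BlockMeta { hash: Hash([9; 32]) },
        BlockMeta { hash: checkpoint },
    ];
    assert!(should_turn_around(&Db, &segment, &checkpoint)?);
    Ok(())
}

Because the segment is ordered newest-first, segment.last() is its oldest entry, which is why that hash is the one probed against local history in the diff.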
