Skip to content

Commit

Permalink
refactor: Rename BodySync to BlockSync for consistency (#10293)
Browse files Browse the repository at this point in the history
And a bit of documentation for header sync.
And a bit of documentation of block sync.
And a bit of refactoring of block sync.
And a tiny refactoring related to state sync.
And a lot of TODOs.
  • Loading branch information
nikurt authored Dec 5, 2023
1 parent fbc2a21 commit 345e8e0
Show file tree
Hide file tree
Showing 11 changed files with 292 additions and 186 deletions.
76 changes: 45 additions & 31 deletions chain/chain/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -394,6 +394,7 @@ pub fn check_header_known(
chain: &Chain,
header: &BlockHeader,
) -> Result<Result<(), BlockKnownError>, Error> {
// TODO: Change the return type to Result<BlockKnownStatusEnum, Error>.
let header_head = chain.store().header_head()?;
if header.hash() == &header_head.last_block_hash
|| header.hash() == &header_head.prev_block_hash
Expand All @@ -411,6 +412,7 @@ fn check_known_store(
chain: &Chain,
block_hash: &CryptoHash,
) -> Result<Result<(), BlockKnownError>, Error> {
// TODO: Change the return type to Result<BlockKnownStatusEnum, Error>.
if chain.store().block_exists(block_hash)? {
Ok(Err(BlockKnownError::KnownInStore))
} else {
Expand All @@ -427,6 +429,7 @@ pub fn check_known(
chain: &Chain,
block_hash: &CryptoHash,
) -> Result<Result<(), BlockKnownError>, Error> {
// TODO: Change the return type to Result<BlockKnownStatusEnum, Error>.
let head = chain.store().head()?;
// Quick in-memory check for fast-reject any block handled recently.
if block_hash == &head.last_block_hash || block_hash == &head.prev_block_hash {
Expand Down Expand Up @@ -1890,59 +1893,70 @@ impl Chain {
mut headers: Vec<BlockHeader>,
challenges: &mut Vec<ChallengeBody>,
) -> Result<(), Error> {
// Sort headers by heights if they are out of order.
// Sort headers by heights.
headers.sort_by_key(|left| left.height());

if let Some(header) = headers.first() {
debug!(target: "chain", "Sync block headers: {} headers from {} at {}", headers.len(), header.hash(), header.height());
if let (Some(first_header), Some(last_header)) = (headers.first(), headers.last()) {
info!(
target: "chain",
num_headers = headers.len(),
first_hash = ?first_header.hash(),
first_height = first_header.height(),
last_hash = ?last_header.hash(),
last_height = ?last_header.height(),
"Sync block headers");
} else {
// No headers.
return Ok(());
};

// Performance optimization to skip looking up every header in the store.
let all_known = if let Some(last_header) = headers.last() {
// If the last header is known, then the other headers are known too.
self.store.get_block_header(last_header.hash()).is_ok()
} else {
false
// Empty set of headers, therefore all received headers are known.
true
};

if !all_known {
// Validate header and then add to the chain.
for header in headers.iter() {
match check_header_known(self, header)? {
Ok(_) => {}
Err(_) => continue,
}
if all_known {
return Ok(());
}

self.validate_header(header, &Provenance::SYNC, challenges)?;
let mut chain_update = self.chain_update();
chain_update.chain_store_update.save_block_header(header.clone())?;
// Validate header and then add to the chain.
for header in headers.iter() {
match check_header_known(self, header)? {
Ok(_) => {}
Err(_) => continue,
}

// Add validator proposals for given header.
let last_finalized_height =
chain_update.chain_store_update.get_block_height(header.last_final_block())?;
let epoch_manager_update = chain_update
.epoch_manager
.add_validator_proposals(BlockHeaderInfo::new(header, last_finalized_height))?;
chain_update.chain_store_update.merge(epoch_manager_update);
chain_update.commit()?;
self.validate_header(header, &Provenance::SYNC, challenges)?;
let mut chain_update = self.chain_update();
chain_update.chain_store_update.save_block_header(header.clone())?;

#[cfg(feature = "new_epoch_sync")]
{
// At this point BlockInfo for this header should be in DB and in `epoch_manager`s cache because of `add_validator_proposals` call.
let mut chain_update = self.chain_update();
chain_update.save_epoch_sync_info_if_finalised(header)?;
chain_update.commit()?;
}
// Add validator proposals for given header.
let last_finalized_height =
chain_update.chain_store_update.get_block_height(header.last_final_block())?;
let epoch_manager_update = chain_update
.epoch_manager
.add_validator_proposals(BlockHeaderInfo::new(header, last_finalized_height))?;
chain_update.chain_store_update.merge(epoch_manager_update);
chain_update.commit()?;

#[cfg(feature = "new_epoch_sync")]
{
// At this point BlockInfo for this header should be in DB and in `epoch_manager`s cache because of `add_validator_proposals` call.
let mut chain_update = self.chain_update();
chain_update.save_epoch_sync_info_if_finalised(header)?;
chain_update.commit()?;
}
}

let mut chain_update = self.chain_update();

if let Some(header) = headers.last() {
// Update header_head if it's the new tip
chain_update.update_header_head_if_not_challenged(header)?;
}

chain_update.commit()
}

Expand Down
20 changes: 12 additions & 8 deletions chain/client-primitives/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -288,8 +288,12 @@ pub enum SyncStatus {
StateSync(StateSyncStatus),
/// Sync state across all shards is done.
StateSyncDone,
/// Catch up on blocks.
BodySync { start_height: BlockHeight, current_height: BlockHeight, highest_height: BlockHeight },
/// Download and process blocks until the head reaches the head of the network.
BlockSync {
start_height: BlockHeight,
current_height: BlockHeight,
highest_height: BlockHeight,
},
}

impl SyncStatus {
Expand All @@ -311,18 +315,18 @@ impl SyncStatus {
// Represent NoSync as 0 because it is the state of a normal well-behaving node.
SyncStatus::NoSync => 0,
SyncStatus::AwaitingPeers => 1,
SyncStatus::EpochSync { epoch_ord: _ } => 2,
SyncStatus::HeaderSync { start_height: _, current_height: _, highest_height: _ } => 3,
SyncStatus::EpochSync { .. } => 2,
SyncStatus::HeaderSync { .. } => 3,
SyncStatus::StateSync(_) => 4,
SyncStatus::StateSyncDone => 5,
SyncStatus::BodySync { start_height: _, current_height: _, highest_height: _ } => 6,
SyncStatus::BlockSync { .. } => 6,
}
}

pub fn start_height(&self) -> Option<BlockHeight> {
match self {
SyncStatus::HeaderSync { start_height, .. } => Some(*start_height),
SyncStatus::BodySync { start_height, .. } => Some(*start_height),
SyncStatus::BlockSync { start_height, .. } => Some(*start_height),
_ => None,
}
}
Expand Down Expand Up @@ -353,8 +357,8 @@ impl From<SyncStatus> for SyncStatusView {
.collect(),
),
SyncStatus::StateSyncDone => SyncStatusView::StateSyncDone,
SyncStatus::BodySync { start_height, current_height, highest_height } => {
SyncStatusView::BodySync { start_height, current_height, highest_height }
SyncStatus::BlockSync { start_height, current_height, highest_height } => {
SyncStatusView::BlockSync { start_height, current_height, highest_height }
}
}
}
Expand Down
59 changes: 40 additions & 19 deletions chain/client/src/client_actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -428,21 +428,11 @@ impl Handler<WithSpanContext<BlockResponse>> for ClientActor {
.store()
.get_all_block_hashes_by_height(block.header().height());
if was_requested || blocks_at_height.is_err() || blocks_at_height.as_ref().unwrap().is_empty() {
if let SyncStatus::StateSync(StateSyncStatus{ sync_hash, .. }) = &mut this.client.sync_status {
if let Ok(header) = this.client.chain.get_block_header(sync_hash) {
if block.hash() == header.prev_hash() {
if let Err(e) = this.client.chain.save_block(block.into()) {
error!(target: "client", "Failed to save a block during state sync: {}", e);
}
} else if block.hash() == sync_hash {
// This is the immediate block after a state sync
// We can afford to delay requesting missing chunks for this one block
if let Err(e) = this.client.chain.save_orphan(block.into(), false) {
error!(target: "client", "Received an invalid block during state sync: {}", e);
}
}
return;
}
// This is a very sneaky piece of logic.
if this.maybe_receive_state_sync_blocks(&block) {
// A node is syncing its state. Don't consider receiving
// blocks other than the few special ones that State Sync expects.
return;
}
this.client.receive_block(
block,
Expand Down Expand Up @@ -1377,17 +1367,17 @@ impl ClientActor {
debug_span!(target: "client", "receive_headers", num_headers = headers.len(), ?peer_id)
.entered();
if headers.is_empty() {
info!(target: "client", "Received an empty set of block headers");
return true;
}
info!(target: "client", "Received block headers from height {} to {}", headers.first().unwrap().height(), headers.last().unwrap().height());
match self.client.sync_block_headers(headers) {
Ok(_) => true,
Err(err) => {
if err.is_bad_data() {
error!(target: "client", "Error processing sync blocks: {}", err);
error!(target: "client", ?err, "Error processing sync blocks");
false
} else {
debug!(target: "client", "Block headers refused by chain: {}", err);
debug!(target: "client", ?err, "Block headers refused by chain");
true
}
}
Expand Down Expand Up @@ -1762,7 +1752,7 @@ impl ClientActor {

self.client
.process_block_processing_artifact(block_processing_artifacts);
self.client.sync_status.update(SyncStatus::BodySync {
self.client.sync_status.update(SyncStatus::BlockSync {
start_height: 0,
current_height: 0,
highest_height: 0,
Expand All @@ -1784,6 +1774,37 @@ impl ClientActor {
&self.config_updater,
)
}

/// Checks if the node is syncing its State and applies special logic in that case.
/// A node usually ignores blocks that are too far ahead, but in case of a node syncing its state it is looking for 2 specific blocks:
/// * The first block of the new epoch
/// * The last block of the prev epoch
/// Returns whether the node is syncing its state.
fn maybe_receive_state_sync_blocks(&mut self, block: &Block) -> bool {
let SyncStatus::StateSync(StateSyncStatus { sync_hash, .. }) = self.client.sync_status
else {
return false;
};
if let Ok(header) = self.client.chain.get_block_header(&sync_hash) {
let block: MaybeValidated<Block> = (*block).clone().into();
let block_hash = *block.hash();
// Notice that two blocks are saved differently:
// * save_block() for one block.
// * save_orphan() for another block.
if &block_hash == header.prev_hash() {
// The last block of the previous epoch.
if let Err(err) = self.client.chain.save_block(block) {
error!(target: "client", ?err, ?block_hash, "Failed to save a block during state sync");
}
} else if block_hash == sync_hash {
// The first block of the new epoch.
if let Err(err) = self.client.chain.save_orphan(block, false) {
error!(target: "client", ?err, ?block_hash, "Received an invalid block during state sync");
}
}
}
true
}
}

impl Drop for ClientActor {
Expand Down
2 changes: 1 addition & 1 deletion chain/client/src/info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -619,7 +619,7 @@ pub fn display_sync_status(
current_height
)
}
SyncStatus::BodySync { start_height, current_height, highest_height } => {
SyncStatus::BlockSync { start_height, current_height, highest_height } => {
let percent = if highest_height <= start_height {
0.0
} else {
Expand Down
Loading

0 comments on commit 345e8e0

Please sign in to comment.