Skip to content

Commit

Permalink
sync handler
Browse files Browse the repository at this point in the history
  • Loading branch information
Longarithm committed Jan 30, 2025
1 parent 94abfca commit 37769e5
Show file tree
Hide file tree
Showing 10 changed files with 512 additions and 390 deletions.
65 changes: 65 additions & 0 deletions chain/chain/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4686,4 +4686,69 @@ impl Chain {
})
.collect()
}

/// Picks the hash of the block that state sync should target. State is
/// synced as it was before the contents of that block are applied.
///
/// The chosen block is always the first block of a new epoch, see
/// <https://github.com/nearprotocol/nearcore/issues/2021#issuecomment-583039862>.
///
/// Returns `Ok(None)` when `get_sync_hash` has no sync hash for the current
/// header head. Errors from `header_head` and `get_sync_hash` are propagated.
///
/// # Panics
///
/// Panics if the selected sync hash equals the genesis block hash.
pub fn find_sync_hash(&self) -> Result<Option<CryptoHash>, Error> {
    let tip = self.header_head()?;
    let Some(candidate) = self.get_sync_hash(&tip.last_block_hash)? else {
        // No sync hash is known for this header head.
        return Ok(None);
    };

    let genesis = self.genesis().hash();
    // Field names are spelled out so the emitted event keeps the same keys
    // regardless of what the locals are called.
    tracing::debug!(
        target: "sync",
        header_head = ?tip,
        sync_hash = ?candidate,
        genesis_hash = ?genesis,
        "find_sync_hash");
    assert_ne!(&candidate, genesis);
    Ok(Some(candidate))
}

/// Returns the list of extra block hashes for blocks that should be
/// downloaded before the state sync. The extra blocks are needed when there
/// are missing chunks in blocks leading to the sync hash block. We need to
/// ensure that for every shard we have at least one new chunk.
///
/// This is implemented by finding the minimum height included of the sync
/// hash block and finding all blocks till that height.
pub fn get_extra_sync_block_hashes(&self, block_hash: CryptoHash) -> Vec<CryptoHash> {
// Get the block. It's possible that the block is not yet available.
// It's ok because we will retry this method later.
let block = self.get_block(&block_hash);
let Ok(block) = block else {
return vec![];
};

let min_height_included =
block.chunks().iter_deprecated().map(|chunk| chunk.height_included()).min();
let Some(min_height_included) = min_height_included else {
tracing::warn!(target: "sync", ?block_hash, "get_extra_sync_block_hashes: Cannot find the min block height");
return vec![];
};

let mut extra_block_hashes = vec![];
let mut next_hash = *block.header().prev_hash();
loop {
let next_header = self.get_block_header(&next_hash);
let Ok(next_header) = next_header else {
tracing::error!(target: "sync", hash=?next_hash, "get_extra_sync_block_hashes: Cannot get block header");
break;
};

if next_header.height() + 1 < min_height_included {
break;
}

extra_block_hashes.push(next_hash);
next_hash = *next_header.prev_hash();
}

extra_block_hashes
}
}
51 changes: 23 additions & 28 deletions chain/client/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,14 @@ use crate::chunk_distribution_network::{ChunkDistributionClient, ChunkDistributi
use crate::chunk_inclusion_tracker::ChunkInclusionTracker;
use crate::debug::BlockProductionTracker;
use crate::debug::PRODUCTION_TIMES_CACHE_SIZE;
use crate::metrics;
use crate::stateless_validation::chunk_endorsement::ChunkEndorsementTracker;
use crate::stateless_validation::chunk_validator::ChunkValidator;
use crate::stateless_validation::partial_witness::partial_witness_actor::PartialWitnessSenderForClient;
use crate::sync::block::BlockSync;
use crate::sync::handler::SyncHandler;
use crate::sync::header::HeaderSync;
use crate::sync::state::{StateSync, StateSyncResult};
use crate::{metrics, SyncStatus};
use itertools::Itertools;
use near_async::futures::{AsyncComputationSpawner, FutureSpawner};
use near_async::messaging::IntoSender;
Expand Down Expand Up @@ -136,7 +137,6 @@ pub struct Client {

pub clock: Clock,
pub config: ClientConfig,
pub sync_status: SyncStatus,
pub chain: Chain,
pub doomslug: Doomslug,
pub epoch_manager: Arc<dyn EpochManagerAdapter>,
Expand All @@ -153,18 +153,14 @@ pub struct Client {
/// Approvals for which we do not have the block yet
pub pending_approvals:
lru::LruCache<ApprovalInner, HashMap<AccountId, (Approval, ApprovalType)>>,
/// Handles syncing chain to the actual state of the network.
pub sync_handler: SyncHandler,
/// A mapping from a block for which a state sync is underway for the next epoch, and the object
/// storing the current status of the state sync and blocks catch up
pub catchup_state_syncs: HashMap<CryptoHash, CatchupState>,
/// Keeps track of information needed to perform the initial Epoch Sync
pub epoch_sync: EpochSync,
/// Keeps track of syncing headers.
pub header_sync: HeaderSync,
/// Keeps track of syncing block.
pub block_sync: BlockSync,
/// Keeps track of syncing state.
pub state_sync: StateSync,
/// Spawns async tasks for catchup state sync.

Check warning on line 161 in chain/client/src/client.rs

View workflow job for this annotation

GitHub Actions / spellcheck

Unknown word (catchupstate)
state_sync_future_spawner: Arc<dyn FutureSpawner>,
/// Sender for catchup state sync requests.
chain_sender_for_state_sync: ChainSenderForStateSync,
/// List of currently accumulated challenges.
pub challenges: HashMap<CryptoHash, Challenge>,
Expand All @@ -185,8 +181,6 @@ pub struct Client {
tier1_accounts_cache: Option<(EpochId, Arc<AccountKeys>)>,
/// Resharding sender.
pub resharding_sender: ReshardingSender,
/// A map storing the last time a block was requested for state sync.
pub last_time_sync_block_requested: HashMap<CryptoHash, near_async::time::Utc>,
/// Helper module for stateless validation functionality like chunk witness production, validation
/// chunk endorsements tracking etc.
pub chunk_validator: ChunkValidator,
Expand Down Expand Up @@ -284,7 +278,6 @@ impl Client {
chain.init_flat_storage()?;
let sharded_tx_pool =
ShardedTransactionPool::new(rng_seed, config.transaction_pool_size_limit);
let sync_status = SyncStatus::AwaitingPeers;
let epoch_sync = EpochSync::new(
clock.clone(),
network_adapter.clone(),
Expand Down Expand Up @@ -361,8 +354,7 @@ impl Client {
#[cfg(feature = "sandbox")]
accrued_fastforward_delta: 0,
clock: clock.clone(),
config,
sync_status,
config: config.clone(),
chain,
doomslug,
epoch_manager,
Expand All @@ -375,11 +367,15 @@ impl Client {
pending_approvals: lru::LruCache::new(
NonZeroUsize::new(num_block_producer_seats).unwrap(),
),
sync_handler: SyncHandler::new(
clock.clone(),
config,
epoch_sync,
header_sync,
state_sync,
block_sync,
),
catchup_state_syncs: HashMap::new(),
epoch_sync,
header_sync,
block_sync,
state_sync,
state_sync_future_spawner,
chain_sender_for_state_sync,
challenges: Default::default(),
Expand All @@ -394,7 +390,6 @@ impl Client {
),
tier1_accounts_cache: None,
resharding_sender,
last_time_sync_block_requested: HashMap::new(),
chunk_validator,
chunk_inclusion_tracker: ChunkInclusionTracker::new(),
chunk_endorsement_tracker,
Expand All @@ -409,7 +404,7 @@ impl Client {
// this method was called. If yes, rebroadcasts the current head.
pub fn check_head_progress_stalled(&mut self, stall_timeout: Duration) -> Result<(), Error> {
if self.clock.now() > self.last_time_head_progress_made + stall_timeout
&& !self.sync_status.is_syncing()
&& !self.sync_handler.sync_status.is_syncing()
{
let block = self.chain.get_block(&self.chain.head()?.last_block_hash)?;
self.network_adapter.send(PeerManagerMessageRequest::NetworkRequests(
Expand Down Expand Up @@ -1270,10 +1265,10 @@ impl Client {
if let near_chain::Error::DBNotFoundErr(msg) = &err {
debug_assert!(!msg.starts_with("BLOCK HEIGHT"), "{:?}", err);
}
if self.sync_status.is_syncing() {
if self.sync_handler.sync_status.is_syncing() {
// While syncing, we may receive blocks that are older or from next epochs.
// This leads to Old Block or EpochOutOfBounds errors.
debug!(target: "client", ?err, sync_status = ?self.sync_status, "Error receiving a block. is syncing");
debug!(target: "client", ?err, sync_status = ?self.sync_handler.sync_status, "Error receiving a block. is syncing");
} else {
error!(target: "client", ?err, "Error on receiving a block. Not syncing");
}
Expand Down Expand Up @@ -1348,7 +1343,7 @@ impl Client {
was_requested: bool,
) -> Result<bool, near_chain::Error> {
let head = self.chain.head()?;
let is_syncing = self.sync_status.is_syncing();
let is_syncing = self.sync_handler.sync_status.is_syncing();
if block.header().height() >= head.height + BLOCK_HORIZON && is_syncing && !was_requested {
debug!(target: "client", head_height = head.height, "Dropping a block that is too far ahead.");
return Ok(false);
Expand Down Expand Up @@ -1394,7 +1389,7 @@ impl Client {
if (head.height < block.header().height()
|| &head.epoch_id == block.header().epoch_id())
&& !was_requested
&& !self.sync_status.is_syncing()
&& !self.sync_handler.sync_status.is_syncing()
{
self.rebroadcast_block(block.as_ref().into_inner());
}
Expand Down Expand Up @@ -1751,8 +1746,8 @@ impl Client {
?status,
?provenance,
skip_produce_chunk,
is_syncing = self.sync_status.is_syncing(),
sync_status = ?self.sync_status)
is_syncing = self.sync_handler.sync_status.is_syncing(),
sync_status = ?self.sync_handler.sync_status)
.entered();
let block = match self.chain.get_block(&block_hash) {
Ok(block) => block,
Expand Down Expand Up @@ -1828,7 +1823,7 @@ impl Client {
}

let can_produce_with_provenance = provenance != Provenance::SYNC;
let can_produce_with_sync_status = !self.sync_status.is_syncing();
let can_produce_with_sync_status = !self.sync_handler.sync_status.is_syncing();
if can_produce_with_provenance && can_produce_with_sync_status && !skip_produce_chunk {
self.produce_chunks(&block, &signer);
} else {
Expand Down
Loading

0 comments on commit 37769e5

Please sign in to comment.