feat(state-sync): sync to the current epoch instead of the previous #12102
Changes from all commits
```diff
@@ -62,7 +62,8 @@ use near_primitives::merkle::{merklize, MerklePath, PartialMerkleTree};
 use near_primitives::network::PeerId;
 use near_primitives::receipt::Receipt;
 use near_primitives::sharding::{
-    EncodedShardChunk, PartialEncodedChunk, ShardChunk, ShardChunkHeader,
+    EncodedShardChunk, PartialEncodedChunk, ShardChunk, ShardChunkHeader, StateSyncInfo,
+    StateSyncInfoV1,
 };
 use near_primitives::transaction::SignedTransaction;
 use near_primitives::types::chunk_extra::ChunkExtra;
```
```diff
@@ -104,6 +105,17 @@ pub enum AdvProduceBlocksMode {
     OnlyValid,
 }
 
+/// The state associated with downloading state for a shard this node will track in the
+/// future but does not currently.
+pub struct CatchupState {
+    /// Manages downloading the state.
+    pub state_sync: StateSync,
+    /// Keeps track of state downloads, and gets passed to `state_sync`.
+    pub sync_status: StateSyncStatus,
+    /// Manages going back to apply chunks after state has been downloaded.
+    pub catchup: BlocksCatchUpState,
+}
+
 pub struct Client {
     /// Adversarial controls - should be enabled only to test disruptive
     /// behaviour on chain.
```
```diff
@@ -139,9 +151,12 @@ pub struct Client {
     /// Approvals for which we do not have the block yet
     pub pending_approvals:
         lru::LruCache<ApprovalInner, HashMap<AccountId, (Approval, ApprovalType)>>,
+    /// A mapping from an epoch that we know needs to be state synced for some shards
+    /// to a tracker that will find an appropriate sync_hash for state sync to that epoch
+    catchup_tracker: HashMap<EpochId, NewChunkTracker>,
     /// A mapping from a block for which a state sync is underway for the next epoch, and the object
     /// storing the current status of the state sync and blocks catch up
-    pub catchup_state_syncs: HashMap<CryptoHash, (StateSync, StateSyncStatus, BlocksCatchUpState)>,
+    pub catchup_state_syncs: HashMap<CryptoHash, CatchupState>,
     /// Keeps track of information needed to perform the initial Epoch Sync
     pub epoch_sync: EpochSync,
     /// Keeps track of syncing headers.
```

Review thread on `catchup_tracker`:

Review comment: nit: new_chunk_trackers or similar

Reply: You know, that is a good question... It seems this mapping from epoch ID to catchup status has been around since state sync was first implemented, and looking at the code it doesn't seem to me like it should ever have more than one entry. When we add one, it's because we see a block in a new epoch T, and we know we're going to be a chunk producer for T+1 in a new shard. The epoch ID for T+1 is the hash of the last block in epoch T-1, so if there's a fork at the beginning of the epoch, it will still have the same epoch ID. So that should mean that for any given epoch height, we only have one of these. And then if we ask whether it's possible to have epoch T and T+1 in there at the same time, we can look at the fact that we remove the epoch info for epoch T in […]. So idk how it is even possible to have two, but maybe I'm missing something and that logic was put there for a reason? But now that I think about it, what happens if we apply the first block of a new epoch and then save […]

Reply: ehh, it's possible that last part is not a bug actually, since there's care put into only removing the particular sync hash from the […]
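To make the fork argument above concrete: both candidate first blocks of epoch T+1 derive their epoch id from the same last block of epoch T-1, so they map to the same `catchup_tracker` key. A tiny self-contained sketch of that property follows; the types and the derivation helper are illustrative stand-ins, not nearcore's real `EpochId` logic.

```rust
use std::collections::HashMap;

// Illustrative stand-ins; the real types live in near_primitives.
type CryptoHash = [u8; 32];

#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)]
struct EpochId(CryptoHash);

// Per the comment above: the epoch id of epoch T+1 is the hash of the last
// block of epoch T-1. This helper only restates that claim; it is not the
// actual derivation code.
fn epoch_id_of_epoch_after_next(last_block_of_prev_prev_epoch: CryptoHash) -> EpochId {
    EpochId(last_block_of_prev_prev_epoch)
}

fn main() {
    let mut catchup_tracker: HashMap<EpochId, &str> = HashMap::new();

    // Two competing first blocks of epoch T+1 both descend from the same last
    // block of epoch T-1, so they agree on the epoch id...
    let last_block_of_t_minus_1: CryptoHash = [7u8; 32];
    let fork_a = epoch_id_of_epoch_after_next(last_block_of_t_minus_1);
    let fork_b = epoch_id_of_epoch_after_next(last_block_of_t_minus_1);
    assert_eq!(fork_a, fork_b);

    // ...and therefore occupy a single entry in the map, which is why a fork
    // at the start of the epoch should not produce two trackers.
    catchup_tracker.insert(fork_a, "tracker for epoch T+1");
    catchup_tracker.insert(fork_b, "tracker for epoch T+1");
    assert_eq!(catchup_tracker.len(), 1);
}
```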
```diff
@@ -224,6 +239,102 @@ pub struct ProduceChunkResult {
     pub transactions_storage_proof: Option<PartialState>,
 }
 
+/// This keeps track of the number of new chunks seen in each shard since the block that was passed to new()
+/// This whole thing could be replaced with a much simpler function that just computes the number of new chunks
+/// in each shard from scratch every time we call it, but in the unlikely and unfortunate case where a shard
+/// hasn't had any chunks for a very long time, it would end up being a nontrivial inefficiency to do that
+/// every time run_catchup() is called
+pub struct NewChunkTracker {
+    last_checked_hash: CryptoHash,
+    last_checked_height: BlockHeight,
+    num_new_chunks: HashMap<ShardId, usize>,
+    sync_hash: Option<CryptoHash>,
+}
+
+impl NewChunkTracker {
+    fn new(
+        first_block_hash: CryptoHash,
+        first_block_height: BlockHeight,
+        shard_ids: &[ShardId],
+    ) -> Self {
+        Self {
+            last_checked_hash: first_block_hash,
+            last_checked_height: first_block_height,
+            num_new_chunks: shard_ids.iter().map(|shard_id| (*shard_id, 0)).collect(),
+            sync_hash: None,
+        }
+    }
+
+    // TODO(current_epoch_sync_hash): refactor this and use the same logic in get_current_epoch_sync_hash(). Ideally
+    // that function should go away (at least as it is now) in favor of a more efficient approach that we can call on
+    // every new block application
+    fn record_new_chunks(
+        &mut self,
+        epoch_manager: &dyn EpochManagerAdapter,
+        header: &BlockHeader,
+    ) -> Result<bool, Error> {
+        let shard_layout = epoch_manager.get_shard_layout(header.epoch_id())?;
+
+        let mut done = true;
+        for (shard_id, num_new_chunks) in self.num_new_chunks.iter_mut() {
+            let shard_index = shard_layout.get_shard_index(*shard_id);
+            let Some(included) = header.chunk_mask().get(shard_index) else {
+                return Err(Error::Other(format!(
+                    "can't get shard {} in chunk mask for block {}",
+                    shard_id,
+                    header.hash()
+                )));
+            };
+            if *included {
+                *num_new_chunks += 1;
+            }
+            if *num_new_chunks < 2 {
+                done = false;
+            }
+        }
+        self.last_checked_hash = *header.hash();
+        self.last_checked_height = header.height();
+        Ok(done)
+    }
+
+    fn find_sync_hash(
+        &mut self,
+        chain: &Chain,
+        epoch_manager: &dyn EpochManagerAdapter,
+    ) -> Result<Option<CryptoHash>, Error> {
+        if let Some(sync_hash) = self.sync_hash {
+            return Ok(Some(sync_hash));
+        }
+
+        let final_head = chain.final_head()?;
+
+        while self.last_checked_height < final_head.height {
+            let next_hash = match chain.chain_store().get_next_block_hash(&self.last_checked_hash) {
+                Ok(h) => h,
+                Err(near_chain_primitives::Error::DBNotFoundErr(_)) => {
+                    return Err(Error::Other(format!(
+                        "final head is #{} {} but get_next_block_hash(#{} {}) is not found",
+                        final_head.height,
+                        final_head.last_block_hash,
+                        self.last_checked_height,
+                        &self.last_checked_hash
+                    )));
+                }
+                Err(e) => return Err(e.into()),
+            };
+            let next_header = chain.get_block_header(&next_hash)?;
+            let done = self.record_new_chunks(epoch_manager, &next_header)?;
+            if done {
+                // TODO(current_epoch_state_sync): check to make sure the epoch IDs are the same. If there are no new chunks in some shard in the epoch,
+                // this will be for an epoch ahead of this one
+                self.sync_hash = Some(next_hash);
+                break;
+            }
+        }
+        Ok(self.sync_hash)
+    }
+}
+
 impl Client {
     pub fn new(
         clock: Clock,
```
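As the doc comment on `NewChunkTracker` notes, the incremental tracker mainly avoids rescanning the chain on every `run_catchup()` call. For intuition, the simpler from-scratch computation it stands in for might look roughly like the sketch below: walk the headers after the epoch's first block (up to the final head) and stop at the first one where every shard has accumulated at least two new chunks. The types and the `headers_after_epoch_start` slice are simplified stand-ins, not nearcore's real `Chain`/`BlockHeader` API.

```rust
use std::collections::HashMap;

// Simplified stand-ins for illustration; the real code uses CryptoHash,
// ShardId and BlockHeader from near_primitives.
type ShardId = u64;
type BlockHash = [u8; 32];

struct HeaderView {
    hash: BlockHash,
    // chunk_mask[i] is true if the shard at index i produced a new chunk in this block.
    chunk_mask: Vec<bool>,
}

/// Recompute from scratch: scan the headers that come after the epoch's first
/// block and return the hash of the first block at which every shard has
/// accumulated at least two new chunks, if such a block exists yet.
fn find_sync_hash_from_scratch(
    headers_after_epoch_start: &[HeaderView],
    shard_ids: &[ShardId],
) -> Option<BlockHash> {
    let mut num_new_chunks: HashMap<ShardId, usize> =
        shard_ids.iter().map(|shard_id| (*shard_id, 0)).collect();

    for header in headers_after_epoch_start {
        for (shard_index, shard_id) in shard_ids.iter().enumerate() {
            if header.chunk_mask.get(shard_index).copied().unwrap_or(false) {
                *num_new_chunks.get_mut(shard_id).unwrap() += 1;
            }
        }
        if num_new_chunks.values().all(|count| *count >= 2) {
            return Some(header.hash);
        }
    }
    None
}
```

Doing this on every call costs a full walk over the blocks seen so far in the epoch, which is exactly the work `NewChunkTracker` amortizes by remembering `last_checked_hash` and the per-shard counts between calls.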
```diff
@@ -371,6 +482,7 @@ impl Client {
             pending_approvals: lru::LruCache::new(
                 NonZeroUsize::new(num_block_producer_seats).unwrap(),
             ),
+            catchup_tracker: HashMap::new(),
             catchup_state_syncs: HashMap::new(),
             epoch_sync,
             header_sync,
```
```diff
@@ -2458,6 +2570,57 @@ impl Client {
         Ok(false)
     }
 
+    /// Find the sync hash. Most of the time it will already be set in `state_sync_info`. If not, try to find it,
+    /// and set the corresponding field in `state_sync_info`.
+    fn get_catchup_sync_hash_v1(
+        &mut self,
+        state_sync_info: &mut StateSyncInfoV1,
+        epoch_first_block: &BlockHeader,
+    ) -> Result<Option<CryptoHash>, Error> {
+        if state_sync_info.sync_hash.is_some() {
+            return Ok(state_sync_info.sync_hash);
+        }
+
+        let new_chunk_tracker = match self.catchup_tracker.entry(*epoch_first_block.epoch_id()) {
+            std::collections::hash_map::Entry::Occupied(e) => e.into_mut(),
+            std::collections::hash_map::Entry::Vacant(e) => {
+                let shard_ids = self.epoch_manager.shard_ids(epoch_first_block.epoch_id())?;
+                e.insert(NewChunkTracker::new(
+                    *epoch_first_block.hash(),
+                    epoch_first_block.height(),
+                    &shard_ids,
+                ))
+            }
+        };
+
+        if let Some(sync_hash) =
+            new_chunk_tracker.find_sync_hash(&self.chain, self.epoch_manager.as_ref())?
+        {
+            state_sync_info.sync_hash = Some(sync_hash);
+            let mut update = self.chain.mut_chain_store().store_update();
+            // note that iterate_state_sync_infos() collects everything into a Vec, so we're not
+            // actually writing to the DB while actively iterating this column
+            update.add_state_sync_info(StateSyncInfo::V1(state_sync_info.clone()));
+            // TODO: would be nice to be able to propagate context up the call stack so we can just log
+            // once at the top with all the info. Otherwise this error will look very cryptic
+            update.commit()?;
+        }
+        Ok(state_sync_info.sync_hash)
+    }
+
+    /// Find the sync hash. If syncing to the old epoch's state, it's always set. If syncing to
+    /// the current epoch's state, it might not yet be known, in which case we try to find it.
+    fn get_catchup_sync_hash(
+        &mut self,
+        state_sync_info: &mut StateSyncInfo,
+        epoch_first_block: &BlockHeader,
+    ) -> Result<Option<CryptoHash>, Error> {
+        match state_sync_info {
+            StateSyncInfo::V0(info) => Ok(Some(info.sync_hash)),
+            StateSyncInfo::V1(info) => self.get_catchup_sync_hash_v1(info, epoch_first_block),
+        }
+    }
+
     /// Walks through all the ongoing state syncs for future epochs and processes them
     pub fn run_catchup(
         &mut self,
```
```diff
@@ -2469,17 +2632,27 @@ impl Client {
         let _span = debug_span!(target: "sync", "run_catchup").entered();
         let me = signer.as_ref().map(|x| x.validator_id().clone());
 
-        for (sync_hash, state_sync_info) in self.chain.chain_store().iterate_state_sync_infos()? {
-            assert_eq!(sync_hash, state_sync_info.epoch_tail_hash);
+        for (epoch_first_block, mut state_sync_info) in
+            self.chain.chain_store().iterate_state_sync_infos()?
+        {
+            assert_eq!(&epoch_first_block, state_sync_info.epoch_first_block());
+
             let state_sync_timeout = self.config.state_sync_timeout;
-            let block_header = self.chain.get_block(&sync_hash)?.header().clone();
+            let block_header = self.chain.get_block(&epoch_first_block)?.header().clone();
             let epoch_id = block_header.epoch_id();
 
-            let (state_sync, status, blocks_catch_up_state) =
-                self.catchup_state_syncs.entry(sync_hash).or_insert_with(|| {
-                    tracing::debug!(target: "client", ?sync_hash, "inserting new state sync");
-                    (
-                        StateSync::new(
+            let sync_hash = match self.get_catchup_sync_hash(&mut state_sync_info, &block_header)? {
+                Some(h) => h,
+                None => continue,
+            };
+
+            let CatchupState { state_sync, sync_status: status, catchup } = self
+                .catchup_state_syncs
+                .entry(sync_hash)
+                .or_insert_with(|| {
+                    tracing::debug!(target: "client", ?epoch_first_block, ?sync_hash, "inserting new state sync");
+                    CatchupState {
+                        state_sync: StateSync::new(
                             self.clock.clone(),
                             self.runtime_adapter.store().clone(),
                             self.epoch_manager.clone(),
```

Review thread on run_catchup:

Review comment: this method is too large - can you split it down into smaller methods please?

Reply: extracted a get_catchup_sync_hash() function in f96f762
```diff
@@ -2492,21 +2665,20 @@ impl Client {
                             self.state_sync_future_spawner.clone(),
                             true,
                         ),
-                        StateSyncStatus {
+                        sync_status: StateSyncStatus {
                             sync_hash,
                             sync_status: HashMap::new(),
                             download_tasks: Vec::new(),
                             computation_tasks: Vec::new(),
                         },
-                        BlocksCatchUpState::new(sync_hash, *epoch_id),
-                    )
+                        catchup: BlocksCatchUpState::new(sync_hash, *epoch_id),
+                    }
                 });
 
             // For colour decorators to work, they need to printed directly. Otherwise the decorators get escaped, garble output and don't add colours.
             debug!(target: "catchup", ?me, ?sync_hash, progress_per_shard = ?status.sync_status, "Catchup");
 
             let tracking_shards: Vec<ShardId> =
-                state_sync_info.shards.iter().map(|tuple| tuple.0).collect();
+                state_sync_info.shards().iter().map(|tuple| tuple.0).collect();
 
             // Initialize the new shard sync to contain the shards to split at
             // first. It will get updated with the shard sync download status
```
```diff
@@ -2518,19 +2690,20 @@ impl Client {
             self.chain.catchup_blocks_step(
                 &me,
                 &sync_hash,
-                blocks_catch_up_state,
+                catchup,
                 block_catch_up_task_scheduler,
             )?;
 
-            if blocks_catch_up_state.is_finished() {
+            if catchup.is_finished() {
                 let mut block_processing_artifacts = BlockProcessingArtifact::default();
 
                 self.chain.finish_catchup_blocks(
                     &me,
+                    &epoch_first_block,
                     &sync_hash,
                     &mut block_processing_artifacts,
                     apply_chunks_done_sender.clone(),
-                    &blocks_catch_up_state.done_blocks,
+                    &catchup.done_blocks,
                 )?;
 
                 self.process_block_processing_artifact(block_processing_artifacts, &signer);
```
```diff
@@ -2716,11 +2889,11 @@ impl Client {
 impl Client {
     pub fn get_catchup_status(&self) -> Result<Vec<CatchupStatusView>, near_chain::Error> {
         let mut ret = vec![];
-        for (sync_hash, (_, shard_sync_state, block_catchup_state)) in
+        for (sync_hash, CatchupState { sync_status, catchup, .. }) in
             self.catchup_state_syncs.iter()
         {
             let sync_block_height = self.chain.get_block_header(sync_hash)?.height();
-            let shard_sync_status: HashMap<_, _> = shard_sync_state
+            let shard_sync_status: HashMap<_, _> = sync_status
                 .sync_status
                 .iter()
                 .map(|(shard_id, state)| (*shard_id, state.to_string()))
```
```diff
@@ -2729,7 +2902,7 @@ impl Client {
                 sync_block_hash: *sync_hash,
                 sync_block_height,
                 shard_sync_status,
-                blocks_to_catchup: self.chain.get_block_catchup_status(block_catchup_state),
+                blocks_to_catchup: self.chain.get_block_catchup_status(catchup),
             });
         }
         Ok(ret)
```
Review thread:

Review comment: is it possible to split this field into multiple fields (or an enum) to differentiate these meanings? It feels like the field being false indicates both that we want to apply the chunks and that we don't, depending on other state such as sync_hash.

Reply: I think actually in both cases, if this field is false, we don't want to apply the chunks for shards we don't currently track, and this logic should be the same: nearcore/chain/chain/src/chain.rs, line 3980 in 82bd46b. I think we probably could split it, but it's a little bit tricky. Let me think about it actually... For now in this PR it is kept as is, to not have to touch too many things and possibly break something. The tricky part is that right now we add the first block of the epoch to the BlocksToCatchup column based on this field, which is then read to see if we'll need to catch up the next block after this one as well: nearcore/chain/chain/src/chain.rs, line 2288 in 82bd46b. I guess where that is called, maybe we can just call get_state_sync_info() again, and also check if catchup is already done, but it requires some care.
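As a purely illustrative sketch of the reviewer's "split into an enum" idea (the type, variant names, and field below are hypothetical, not taken from nearcore), the two meanings currently packed into one boolean could be made explicit along these lines:

```rust
// Stand-in for near_primitives::hash::CryptoHash, so the sketch compiles on its own.
type CryptoHash = [u8; 32];

/// Hypothetical enum replacing the boolean discussed above; names and shape
/// are illustrative only.
enum UntrackedShardChunkPolicy {
    /// Chunks for this shard can be applied as blocks are processed.
    Apply,
    /// State for this shard is still being downloaded for this epoch: skip its
    /// chunks for now and catch the blocks up once state sync finishes. When
    /// syncing to the current epoch, the sync hash may not be known yet.
    DeferUntilCaughtUp { sync_hash: Option<CryptoHash> },
}

fn should_apply_chunk(policy: &UntrackedShardChunkPolicy) -> bool {
    // Both "defer" cases (sync hash known or not yet found) behave the same
    // here, which is the point the reply above makes.
    matches!(policy, UntrackedShardChunkPolicy::Apply)
}
```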