
Commit 1a59dd9

Authored Jan 16, 2025
fix(state-sync): set the sync hash only when it's final (#12720)
There's currently a bug that can happen if the sync hash we set doesn't end up on the canonical chain. Here we add a couple more test cases for that, and fix it by considering only finalized blocks when setting the state sync hash, and by changing the snapshot logic to check whether the current head is the prev block of what will be the sync hash block. This means the sync hash is found a little later than before, but on a chain with long epoch lengths this shouldn't be a problem.
1 parent 8067cb6 commit 1a59dd9
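
As a rough illustration of the rule this change enforces (this is not the nearcore API; the function, masks, and shard count below are invented for the example), the sync block of an epoch is the block right after the first block in which every shard has accumulated at least two new chunks, and with this fix its hash is only recorded once that block is final:

// Illustrative sketch only: given each block's chunk mask for one epoch, find the index of
// the would-be "sync block": the block right after the first block where every shard has
// accumulated at least 2 new chunks since the epoch started. In the actual fix, this block's
// hash is only written to DBCol::StateSyncHashes once the block is final.
fn find_sync_block_index(chunk_masks: &[Vec<bool>], num_shards: usize) -> Option<usize> {
    let mut new_chunks = vec![0u8; num_shards];
    for (i, mask) in chunk_masks.iter().enumerate() {
        for (count, has_chunk) in new_chunks.iter_mut().zip(mask) {
            if *has_chunk && *count < 2 {
                *count += 1;
            }
        }
        if new_chunks.iter().all(|c| *c >= 2) {
            // Block i is the prev block of the sync block, so the sync block is i + 1.
            return Some(i + 1);
        }
    }
    None
}

fn main() {
    // Shard 1 misses a chunk in the first block, so the threshold is reached only at block 2,
    // making block 3 the sync block in this toy epoch.
    let masks = vec![vec![true, false], vec![true, true], vec![true, true]];
    assert_eq!(find_sync_block_index(&masks, 2), Some(3));
}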

File tree

8 files changed (+361, -121 lines)

 

chain/chain/src/chain.rs (+15, -7)
@@ -3952,13 +3952,15 @@ impl Chain {
                 Ok(SnapshotAction::None)
             }
         } else {
-            let Some(sync_hash) = self.get_sync_hash(&head.last_block_hash)? else {
-                return Ok(SnapshotAction::None);
-            };
-            if sync_hash == head.last_block_hash {
-                // note that here we're returning prev_block_hash instead of last_block_hash because in this case
-                // we can't detect the right sync hash until it is actually applied as the head block
-                Ok(SnapshotAction::MakeSnapshot(head.prev_block_hash))
+            let is_sync_prev = crate::state_sync::is_sync_prev_hash(
+                &self.chain_store.store(),
+                &head.last_block_hash,
+                &head.prev_block_hash,
+            )?;
+            if is_sync_prev {
+                // Here the head block is the prev block of what the sync hash will be, and the previous
+                // block is the point in the chain we want to snapshot state for
+                Ok(SnapshotAction::MakeSnapshot(head.last_block_hash))
             } else {
                 Ok(SnapshotAction::None)
             }
@@ -4074,6 +4076,12 @@ fn shard_id_out_of_bounds(shard_id: ShardId) -> Error {
 /// ApplyChunksMode::NotCaughtUp once with ApplyChunksMode::CatchingUp. Note
 /// that it does not guard whether the children shards are ready or not, see the
 /// comments before `need_to_reshard`
+// TODO(state-sync): After the changes in https://github.com/near/nearcore/pull/12617,
+// this needs to be changed to be aware of what shards can be applied now. Otherwise we have
+// a bug in the rare case where we have something like this sequence of tracked shards in consecutive epochs:
+// (s0) -> (s1) -> (s0, s2)
+// In this case we don't state sync s0 since we already have the state, but we apply chunks with mode `NotCaughtUp`
+// in the middle epoch there because we're downloading state for s2.
 fn get_should_apply_chunk(
     mode: ApplyChunksMode,
     cares_about_shard_this_epoch: bool,
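
The snapshot check above delegates to crate::state_sync::is_sync_prev_hash(). Stripped of the store lookups, the condition it evaluates can be sketched roughly like this (a hypothetical helper over plain per-shard counters, not the real signature):

// Minimal sketch of the sync-prev condition: the head is the prev block of the future sync
// block iff the head's per-shard new-chunk counts have all reached 2 while the counts at its
// prev block have not.
fn is_sync_prev(head_new_chunks: &[u8], prev_new_chunks: &[u8]) -> bool {
    let head_done = head_new_chunks.iter().all(|n| *n >= 2);
    let prev_done = prev_new_chunks.iter().all(|n| *n >= 2);
    head_done && !prev_done
}

fn main() {
    // Shard 1 reaches its second new chunk only at the head block, so the head is the
    // sync-prev block and a snapshot of the state as of its prev block should be scheduled.
    assert!(is_sync_prev(&[2, 2], &[2, 1]));
    // One block later both blocks are "done", so the check no longer fires.
    assert!(!is_sync_prev(&[2, 2], &[2, 2]));
}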

chain/chain/src/state_sync.rs (+111, -37)
@@ -15,14 +15,6 @@ fn get_state_sync_new_chunks(
     Ok(store.get_ser(DBCol::StateSyncNewChunks, block_hash.as_ref())?)
 }
 
-fn iter_state_sync_new_chunks_keys<'a>(
-    store: &'a Store,
-) -> impl Iterator<Item = Result<CryptoHash, std::io::Error>> + 'a {
-    store
-        .iter(DBCol::StateSyncNewChunks)
-        .map(|item| item.and_then(|(k, _v)| CryptoHash::try_from_slice(&k)))
-}
-
 fn iter_state_sync_hashes_keys<'a>(
     store: &'a Store,
 ) -> impl Iterator<Item = Result<EpochId, std::io::Error>> + 'a {
@@ -31,17 +23,18 @@ fn iter_state_sync_hashes_keys<'a>(
         .map(|item| item.and_then(|(k, _v)| EpochId::try_from_slice(&k)))
 }
 
+/// Saves new chunk info and returns whether there are at least 2 chunks per shard in the epoch for header.prev_hash()
 fn save_epoch_new_chunks<T: ChainStoreAccess>(
     chain_store: &T,
     store_update: &mut StoreUpdate,
     header: &BlockHeader,
-) -> Result<(), Error> {
+) -> Result<bool, Error> {
     let Some(mut num_new_chunks) =
         get_state_sync_new_chunks(&chain_store.store(), header.prev_hash())?
     else {
         // This might happen in the case of epoch sync where we save individual headers without having all
         // headers that belong to the epoch.
-        return Ok(());
+        return Ok(false);
     };
 
     // This shouldn't happen because block headers in the same epoch should have chunks masks
@@ -53,17 +46,11 @@ fn save_epoch_new_chunks<T: ChainStoreAccess>(
             block_hash=%header.hash(), chunk_mask_len=%header.chunk_mask().len(), stored_len=%num_new_chunks.len(),
             "block header's chunk mask not of the same length as stored value in DBCol::StateSyncNewChunks",
         );
-        return Ok(());
+        return Ok(false);
     }
 
     let done = num_new_chunks.iter().all(|num_chunks| *num_chunks >= 2);
-    if done {
-        // TODO(current_epoch_state_sync): this will not be correct if this block doesn't end up finalized on the main chain.
-        // We should fix it by setting the sync hash when it's finalized, which requires making changes to how we take state snapshots.
-        store_update.set_ser(DBCol::StateSyncHashes, header.epoch_id().as_ref(), header.hash())?;
-        store_update.delete_all(DBCol::StateSyncNewChunks);
-        return Ok(());
-    }
+
     for (num_new_chunks, new_chunk) in num_new_chunks.iter_mut().zip(header.chunk_mask().iter()) {
         // Only need to reach 2, so don't bother adding more than that
         if *new_chunk && *num_new_chunks < 2 {
@@ -72,7 +59,7 @@
     }
 
     store_update.set_ser(DBCol::StateSyncNewChunks, header.hash().as_ref(), &num_new_chunks)?;
-    Ok(())
+    Ok(done)
 }
 
 fn on_new_epoch(store_update: &mut StoreUpdate, header: &BlockHeader) -> Result<(), Error> {
@@ -96,31 +83,89 @@ fn remove_old_epochs(
     Ok(())
 }
 
-fn remove_old_blocks<T: ChainStoreAccess>(
+/// Helper to turn DBNotFoundErr() into None. We might get DBNotFoundErr() in the case of epoch sync
+/// where we save individual headers without having all headers that belong to the epoch.
+fn maybe_get_block_header<T: ChainStoreAccess>(
+    chain_store: &T,
+    block_hash: &CryptoHash,
+) -> Result<Option<BlockHeader>, Error> {
+    match chain_store.get_block_header(block_hash) {
+        Ok(block_header) => Ok(Some(block_header)),
+        // This might happen in the case of epoch sync where we save individual headers without having all
+        // headers that belong to the epoch.
+        Err(Error::DBNotFoundErr(_)) => Ok(None),
+        Err(e) => Err(e),
+    }
+}
+
+fn has_enough_new_chunks(store: &Store, block_hash: &CryptoHash) -> Result<Option<bool>, Error> {
+    let Some(num_new_chunks) = get_state_sync_new_chunks(store, block_hash)? else {
+        // This might happen in the case of epoch sync where we save individual headers without having all
+        // headers that belong to the epoch.
+        return Ok(None);
+    };
+    Ok(Some(num_new_chunks.iter().all(|num_chunks| *num_chunks >= 2)))
+}
+
+/// Save num new chunks info and store the state sync hash if it has been found. We store it only
+/// once it becomes final.
+/// This should only be called if DBCol::StateSyncHashes does not yet have an entry for header.epoch_id().
+/// The logic should still be correct if it is, but it's unnecessary and will waste a lot of time if called
+/// on a header far away from the epoch start.
+fn on_new_header<T: ChainStoreAccess>(
     chain_store: &T,
     store_update: &mut StoreUpdate,
     header: &BlockHeader,
 ) -> Result<(), Error> {
-    if header.last_final_block() == &CryptoHash::default() {
+    let done = save_epoch_new_chunks(chain_store, store_update, header)?;
+    if !done {
         return Ok(());
     }
-    // We don't need to keep info for old blocks around. After a block is finalized, we don't need anything before it
-    let last_final_header = match chain_store.get_block_header(header.last_final_block()) {
-        Ok(h) => h,
-        // This might happen in the case of epoch sync where we save individual headers without having all
-        // headers that belong to the epoch.
-        Err(Error::DBNotFoundErr(_)) => return Ok(()),
-        Err(e) => return Err(e),
+
+    // Now check if the sync hash is known and finalized. The sync hash is the block after the first block with at least 2
+    // chunks per shard in the epoch. Note that we cannot just check if the current header.last_final_block() is the sync
+    // hash, because even though this function is called for each header, it is not guaranteed that we'll see every block
+    // by checking header.last_final_block(), because it is possible for the final block to jump by more than one upon a new
+    // head update. So here we iterate backwards until we find it, if it exists yet.
+
+    let epoch_id = header.epoch_id();
+    let last_final_hash = header.last_final_block();
+
+    let Some(mut sync) = maybe_get_block_header(chain_store, last_final_hash)? else {
+        return Ok(());
     };
-    for block_hash in iter_state_sync_new_chunks_keys(&chain_store.store()) {
-        let block_hash = block_hash?;
-        let old_header = chain_store.get_block_header(&block_hash)?;
-        if old_header.height() < last_final_header.height() {
-            store_update.delete(DBCol::StateSyncNewChunks, block_hash.as_ref());
+    loop {
+        let Some(sync_prev) = maybe_get_block_header(chain_store, sync.prev_hash())? else {
+            return Ok(());
+        };
+        if sync_prev.epoch_id() != epoch_id
+            || sync_prev.height() == chain_store.get_genesis_height()
+        {
+            return Ok(());
+        }
+        if has_enough_new_chunks(&chain_store.store(), sync_prev.hash())? != Some(true) {
+            return Ok(());
        }
-    }
 
-    Ok(())
+        let Some(sync_prev_prev) = maybe_get_block_header(chain_store, sync_prev.prev_hash())?
+        else {
+            return Ok(());
+        };
+        let Some(prev_prev_done) =
+            has_enough_new_chunks(&chain_store.store(), sync_prev_prev.hash())?
+        else {
+            return Ok(());
        };
+
+        if !prev_prev_done {
+            // `sync_prev_prev` doesn't have enough new chunks, and `sync_prev` does, meaning `sync` is the first final
+            // valid sync block
+            store_update.set_ser(DBCol::StateSyncHashes, epoch_id.as_ref(), sync.hash())?;
+            store_update.delete_all(DBCol::StateSyncNewChunks);
+            return Ok(());
        }
+        sync = sync_prev;
+    }
 }
 
 /// Updates information in the DB related to calculating the correct "sync_hash" for this header's epoch,
@@ -154,6 +199,35 @@ pub(crate) fn update_sync_hashes<T: ChainStoreAccess>(
         return remove_old_epochs(&chain_store.store(), store_update, header, &prev_header);
     }
 
-    save_epoch_new_chunks(chain_store, store_update, header)?;
-    remove_old_blocks(chain_store, store_update, header)
+    on_new_header(chain_store, store_update, header)
+}
+
+///. Returns whether `block_hash` is the block that will appear immediately before the "sync_hash" block. That is,
+/// whether it is going to be the prev_hash of the "sync_hash" block, when it is found.
+///
+/// `block_hash` is the prev_hash of the future "sync_hash" block iff it is the first block for which the
+/// number of new chunks in the epoch in each shard is at least 2
+///
+/// This function can only return true before we save the "sync_hash" block to the `StateSyncHashes` column,
+/// because it relies on data stored in the `StateSyncNewChunks` column, which is cleaned up after that.
+///
+/// This is used when making state snapshots, because in that case we don't need to wait for the "sync_hash"
+/// block to be finalized to take a snapshot of the state as of its prev prev block
+pub(crate) fn is_sync_prev_hash(
+    store: &Store,
+    block_hash: &CryptoHash,
+    prev_hash: &CryptoHash,
+) -> Result<bool, Error> {
+    let Some(new_chunks) = get_state_sync_new_chunks(store, block_hash)? else {
+        return Ok(false);
+    };
+    let done = new_chunks.iter().all(|num_chunks| *num_chunks >= 2);
+    if !done {
+        return Ok(false);
+    }
+    let Some(prev_new_chunks) = get_state_sync_new_chunks(store, prev_hash)? else {
+        return Ok(false);
+    };
+    let prev_done = prev_new_chunks.iter().all(|num_chunks| *num_chunks >= 2);
+    Ok(!prev_done)
 }
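
To make the backward walk in on_new_header() easier to follow, here is a toy sketch of the same search over a plain vector of per-block "done" flags instead of headers and the StateSyncNewChunks column (the names and data shapes are invented for the illustration):

// Illustrative only: done[i] says whether block i of the epoch already has at least 2 new
// chunks per shard. Starting from the last final block, walk back to find the first block
// whose prev is "done" but whose prev-prev is not; that block is the sync block, and since
// the walk starts at the last final block, it is only ever found once it is final.
fn find_final_sync_block(done: &[bool], last_final: usize) -> Option<usize> {
    let mut sync = last_final;
    loop {
        if sync < 2 {
            // Not enough history in this epoch to decide yet.
            return None;
        }
        let (prev, prev_prev) = (sync - 1, sync - 2);
        if !done[prev] {
            // The threshold hasn't been reached right before `sync`, so no sync block is final yet.
            return None;
        }
        if !done[prev_prev] {
            // `prev` is the first "done" block, so `sync` is the sync block.
            return Some(sync);
        }
        sync = prev;
    }
}

fn main() {
    // Blocks 0..=5 of an epoch; the per-shard threshold is first reached at block 2.
    let done = [false, false, true, true, true, true];
    // Once block 4 (or anything after it) is final, the sync block resolves to block 3.
    assert_eq!(find_final_sync_block(&done, 4), Some(3));
    // If only block 1 is final so far, the sync hash cannot be set yet.
    assert_eq!(find_final_sync_block(&done, 1), None);
}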

integration-tests/src/test_loop/tests/resharding_v3.rs (+7, -3)
@@ -36,15 +36,15 @@ use crate::test_loop::utils::{ONE_NEAR, TGAS};
 use near_parameters::{vm, RuntimeConfig, RuntimeConfigStore};
 
 /// Default and minimal epoch length used in resharding tests.
-const DEFAULT_EPOCH_LENGTH: u64 = 6;
+const DEFAULT_EPOCH_LENGTH: u64 = 7;
 
 /// Increased epoch length that has to be used in some tests due to the delay caused by catch up.
 ///
 /// With shorter epoch length, a chunk producer might not finish catch up on time,
 /// before it is supposed to accept transactions for the next epoch.
 /// That would result in chunk producer rejecting a transaction
 /// and later we would hit the `DBNotFoundErr("Transaction ...)` error in tests.
-const INCREASED_EPOCH_LENGTH: u64 = 8;
+const INCREASED_EPOCH_LENGTH: u64 = 10;
 
 /// Garbage collection window length.
 const GC_NUM_EPOCHS_TO_KEEP: u64 = 3;
@@ -704,6 +704,7 @@ fn test_resharding_v3_drop_chunks_all() {
     test_resharding_v3_base(
         TestReshardingParametersBuilder::default()
             .chunk_ranges_to_drop(chunk_ranges_to_drop)
+            .epoch_length(INCREASED_EPOCH_LENGTH)
             .build(),
     );
 }
@@ -773,7 +774,9 @@ fn test_resharding_v3_shard_shuffling_untrack_then_track() {
         schedule: shard_sequence_to_schedule(tracked_shard_sequence),
     };
     let params = TestReshardingParametersBuilder::default()
-        .shuffle_shard_assignment_for_chunk_producers(true)
+        // TODO(resharding): uncomment after the bug in the comment above get_should_apply_chunk()
+        // in chain.rs is fixed
+        //.shuffle_shard_assignment_for_chunk_producers(true)
         .num_clients(num_clients)
         .tracked_shard_schedule(Some(tracked_shard_schedule))
         // TODO(resharding): uncomment after fixing test_resharding_v3_state_cleanup()
@@ -1067,6 +1070,7 @@ fn test_resharding_v3_yield_timeout() {
             ReceiptKind::PromiseYield,
         ))
         .allow_negative_refcount(true)
+        .epoch_length(INCREASED_EPOCH_LENGTH)
         .build();
     test_resharding_v3_base(params);
 }
