Skip to content

Commit

Permalink
fix(resharding): catchup children shards to one block before the fina…
Browse files Browse the repository at this point in the history
…l chain (#12557)

PR to fix a failure in the resharding test loop.

The wrong assumption was that flat storage for children shards should
catch up on deltas up to and including the last final block. This creates
a mismatch in the state between memtries and flat storage in the first
block after resharding is completed; to put it simply, flat storage was
one block ahead and would only come back in sync with memtries after the
next chunk application.

The fix is to apply deltas only up to the previous final block, as is
done in `Chain`.

This PR also adds a test (which currently fails) combining single-shard
tracking and a "longer" resharding.
  • Loading branch information
Trisfald authored Dec 6, 2024
1 parent ebb4717 commit 999bb8a
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 16 deletions.
47 changes: 34 additions & 13 deletions chain/chain/src/flat_storage_resharder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -640,6 +640,15 @@ impl FlatStorageResharder {
batch_id = ?num_batches_done)
.entered();
let chain_final_head = chain_store.final_head()?;

// If we reached the desired new flat head, we can terminate the delta application step.
if is_flat_head_on_par_with_chain(&flat_head_block_hash, &chain_final_head) {
return Ok((
num_batches_done,
Tip::from_header(&chain_store.get_block_header(&flat_head_block_hash)?),
));
}

let mut merged_changes = FlatStateChanges::default();
let store = self.runtime.store().flat_store();
let mut store_update = store.store_update();
Expand All @@ -651,8 +660,8 @@ impl FlatStorageResharder {
height <= chain_final_head.height,
"flat head: {flat_head_block_hash}"
);
// Stop if we reached chain final head.
if flat_head_block_hash == chain_final_head.last_block_hash {
// Stop if we reached the desired new flat head.
if is_flat_head_on_par_with_chain(&flat_head_block_hash, &chain_final_head) {
break;
}
flat_head_block_hash = chain_store.get_next_block_hash(&flat_head_block_hash)?;
Expand All @@ -677,11 +686,6 @@ impl FlatStorageResharder {
store_update.commit()?;
num_batches_done += 1;

// If we reached chain final head, we can terminate the delta application step.
if flat_head_block_hash == chain_final_head.last_block_hash {
return Ok((num_batches_done, chain_final_head));
}

// Sleep between batches in order to throttle resharding and leave some resource for the
// regular node operation.
std::thread::sleep(batch_delay);
Expand Down Expand Up @@ -988,6 +992,23 @@ fn copy_kv_to_left_child(
store_update.set(split_params.left_child_shard, key, value);
}

/// Tells whether the flat head identified by `flat_head_block_hash` has caught up with the chain
/// far enough for delta application to stop.
///
/// The flat head counts as caught up when it sits on either of these blocks:
/// - the block right before the final head (`chain_final_head.prev_block_hash`), which is the
///   new flat head candidate as stated in `Chain::get_new_flat_storage_head`;
/// - the final head itself (`chain_final_head.last_block_hash`), which might already be the case
///   if the resharding is extremely fast, as a result of delta application during parent split.
///
/// This function assumes the flat head is never beyond the final chain: only the two hashes
/// above are compared, no height check is performed here.
fn is_flat_head_on_par_with_chain(
    flat_head_block_hash: &CryptoHash,
    chain_final_head: &Tip,
) -> bool {
    let on_final_head = *flat_head_block_hash == chain_final_head.last_block_hash;
    let on_prev_of_final_head = *flat_head_block_hash == chain_final_head.prev_block_hash;
    on_prev_of_final_head || on_final_head
}

/// Struct to describe, perform and track progress of a flat storage resharding.
#[derive(Clone, Debug)]
pub enum FlatStorageReshardingEventStatus {
Expand Down Expand Up @@ -2226,16 +2247,16 @@ mod tests {

// Check shards flat storage status.
let flat_store = resharder.runtime.store().flat_store();
let last_final_block = chain.get_block_by_height(NUM_BLOCKS - 2).unwrap();
let prev_last_final_block = chain.get_block_by_height(NUM_BLOCKS - 3).unwrap();
assert_eq!(flat_store.get_flat_storage_status(parent_shard), Ok(FlatStorageStatus::Empty));
for child_shard in [left_child_shard, right_child_shard] {
assert_eq!(
flat_store.get_flat_storage_status(child_shard),
Ok(FlatStorageStatus::Ready(FlatStorageReadyStatus {
flat_head: BlockInfo {
hash: *last_final_block.hash(),
height: last_final_block.header().height(),
prev_hash: *last_final_block.header().prev_hash()
hash: *prev_last_final_block.hash(),
height: prev_last_final_block.header().height(),
prev_hash: *prev_last_final_block.header().prev_hash()
}
}))
);
Expand All @@ -2247,8 +2268,8 @@ mod tests {
}
// Children flat storages should contain the new accounts created through the deltas
// application.
// Flat store will contain only changes from final blocks.
for height in 1..NUM_BLOCKS - 1 {
// Flat store will only contain changes until the previous final block.
for height in 1..NUM_BLOCKS - 2 {
let new_account_left_child = account!(format!("oo{}", height));
assert_eq!(
flat_store.get(
Expand Down
17 changes: 14 additions & 3 deletions integration-tests/src/test_loop/tests/resharding_v3.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1190,9 +1190,20 @@ fn test_resharding_v3_load_mem_trie() {
}

/// Runs the base resharding v3 scenario (`test_resharding_v3_base`) with parameters built via
/// `delay_flat_state_resharding(2)`.
///
/// The test is ignored unless the `test_features` feature is enabled.
#[test]
// TODO(resharding): fix nearcore and un-ignore this test
// NOTE(review): the TODO above looks stale next to the conditional ignore below — confirm
// and drop it if the test is expected to pass with `test_features`.
#[cfg_attr(not(feature = "test_features"), ignore)]
fn test_resharding_v3_slower_post_processing_tasks() {
test_resharding_v3_base(TestReshardingParameters::new().delay_flat_state_resharding(2));
}

#[test]
// TODO(resharding): fix nearcore and change the ignore condition
// #[cfg_attr(not(feature = "test_features"), ignore)]
#[ignore]
fn test_resharding_v3_slower_post_processing_tasks() {
test_resharding_v3_base(TestReshardingParameters::new().delay_flat_state_resharding(3));
fn test_resharding_v3_shard_shuffling_slower_post_processing_tasks() {
let params = TestReshardingParameters::new()
.shuffle_shard_assignment()
.single_shard_tracking()
.chunk_miss_possible()
.delay_flat_state_resharding(2);
test_resharding_v3_base(params);
}

0 comments on commit 999bb8a

Please sign in to comment.