From 618ef69ecdb6a78bc53b0d59e2c606073ff76e25 Mon Sep 17 00:00:00 2001 From: Marcelo Diop-Gonzalez Date: Thu, 23 Jan 2025 23:09:57 -0500 Subject: [PATCH] fix(state-sync): fix issue when untracking and then tracking a shard (#12773) `should_catch_up_shard()` was recently modified to not catch up a shard that we tracked in the previous epoch, because we can just continue applying chunks for it. But `get_should_apply_chunk()` was not aware of this, and didn't properly handle this. In the simplest case, there was no bug, but there would have been a bug if the list of shards we don't currently track but will track in the next epoch contains some shards we used to track and some we didn't. Because in that case, when we apply the first block of the epoch, we mark it as not caught up, but then `get_should_apply_chunk()` treats all these shard IDs equally when deciding what chunks to apply, and just doesn't apply any non-tracked shards. So fix it by consolidating some logic in `EpochManager::cared_about_shard_prev_epoch_from_prev_block()` (similar to the other cares_about_shard* fns there), and then also checking this value in `get_should_apply_chunk()` to get logic consistent with `should_catch_up_shard()` --- chain/chain/src/chain.rs | 72 +++++-------- chain/chain/src/test_utils/kv_runtime.rs | 26 ++++- chain/epoch-manager/src/adapter.rs | 27 ++++- chain/epoch-manager/src/shard_tracker.rs | 55 ++++++++++ .../src/test_loop/tests/state_sync.rs | 101 ++++++++++++++++-- 5 files changed, 222 insertions(+), 59 deletions(-) diff --git a/chain/chain/src/chain.rs b/chain/chain/src/chain.rs index c1b521ef7f7..b1a3a772b6c 100644 --- a/chain/chain/src/chain.rs +++ b/chain/chain/src/chain.rs @@ -803,14 +803,12 @@ impl Chain { epoch_id: &EpochId, block_hash: &CryptoHash, prev_hash: &CryptoHash, - prev_prev_hash: &CryptoHash, ) -> Result, Error> { let shards_to_state_sync = Chain::get_shards_to_state_sync( self.epoch_manager.as_ref(), &self.shard_tracker, me, prev_hash, - prev_prev_hash, )?; if shards_to_state_sync.is_empty() { Ok(None) @@ -2411,8 +2409,7 @@ impl Chain { // For the first block of the epoch we check if we need to start download states for // shards that we will care about in the next epoch. If there is no state to be downloaded, // we consider that we are caught up, otherwise not - let state_sync_info = - self.get_state_sync_info(me, epoch_id, block_hash, prev_hash, prev_prev_hash)?; + let state_sync_info = self.get_state_sync_info(me, epoch_id, block_hash, prev_hash)?; debug!( target: "chain", %block_hash, shards_to_sync=?state_sync_info.as_ref().map(|s| s.shards()), "Checked for shards to sync for epoch T+1 upon processing first block of epoch T" @@ -2445,20 +2442,11 @@ impl Chain { shard_tracker: &ShardTracker, me: &Option, parent_hash: &CryptoHash, - prev_prev_hash: &CryptoHash, ) -> Result, Error> { let epoch_id = epoch_manager.get_epoch_id_from_prev_block(parent_hash)?; let mut shards_to_sync = Vec::new(); for shard_id in epoch_manager.shard_ids(&epoch_id)? { - if Self::should_catch_up_shard( - epoch_manager, - shard_tracker, - me, - &epoch_id, - parent_hash, - prev_prev_hash, - shard_id, - )? { + if Self::should_catch_up_shard(shard_tracker, me, parent_hash, shard_id)? { shards_to_sync.push(shard_id) } } @@ -2474,43 +2462,26 @@ impl Chain { /// where we'll go from tracking it to not tracking it and back to tracking it in consecutive epochs, /// then we can just continue to apply chunks as if we were tracking it in epoch T, and there's no need to state sync. 
fn should_catch_up_shard( - _epoch_manager: &dyn EpochManagerAdapter, shard_tracker: &ShardTracker, me: &Option, - epoch_id: &EpochId, - epoch_last_block: &CryptoHash, - _epoch_last_block_prev: &CryptoHash, + prev_hash: &CryptoHash, shard_id: ShardId, ) -> Result { // Won't care about it next epoch, no need to state sync it. - if !shard_tracker.will_care_about_shard(me.as_ref(), epoch_last_block, shard_id, true) { + if !shard_tracker.will_care_about_shard(me.as_ref(), prev_hash, shard_id, true) { return Ok(false); } // Currently tracking the shard, so no need to state sync it. - if shard_tracker.care_about_shard(me.as_ref(), epoch_last_block, shard_id, true) { + if shard_tracker.care_about_shard(me.as_ref(), prev_hash, shard_id, true) { return Ok(false); } // Now we need to state sync it unless we were tracking the parent in the previous epoch, // in which case we don't need to because we already have the state, and can just continue applying chunks - if epoch_id == &EpochId::default() { - return Ok(true); - } - // let (_layout, parent_shard_id, _index) = - // epoch_manager.get_prev_shard_id_from_prev_hash(epoch_last_block, shard_id)?; - // // Note that here passing `epoch_last_block_prev` to care_about_shard() will have us check whether we were tracking it in - // // the previous epoch, because it is the "parent_hash" of the last block of the previous epoch. - // // TODO: consider refactoring these ShardTracker functions to accept an epoch_id - // // to make this less tricky. - // let tracked_before = shard_tracker.care_about_shard( - // me.as_ref(), - // epoch_last_block_prev, - // parent_shard_id, - // true, - // ); - // TODO(resharding) Uncomment or remove above, and accordingly `_epoch_manager` and `_epoch_last_block_prev`. - Ok(true) + let tracked_before = + shard_tracker.cared_about_shard_in_prev_epoch(me.as_ref(), prev_hash, shard_id, true); + Ok(!tracked_before) } /// Check if any block with missing chunk is ready to be processed and start processing these blocks @@ -3759,10 +3730,17 @@ impl Chain { self.shard_tracker.care_about_shard(me.as_ref(), prev_hash, shard_id, true); let cares_about_shard_next_epoch = self.shard_tracker.will_care_about_shard(me.as_ref(), prev_hash, shard_id, true); + let cared_about_shard_prev_epoch = self.shard_tracker.cared_about_shard_in_prev_epoch( + me.as_ref(), + prev_hash, + shard_id, + true, + ); let should_apply_chunk = get_should_apply_chunk( mode, cares_about_shard_this_epoch, cares_about_shard_next_epoch, + cared_about_shard_prev_epoch, ); let shard_uid = self.epoch_manager.shard_id_to_uid(shard_id, epoch_id)?; Ok(ShardContext { shard_uid, should_apply_chunk }) @@ -4127,26 +4105,28 @@ fn shard_id_out_of_bounds(shard_id: ShardId) -> Error { /// ApplyChunksMode::NotCaughtUp once with ApplyChunksMode::CatchingUp. Note /// that it does not guard whether the children shards are ready or not, see the /// comments before `need_to_reshard` -// TODO(state-sync): After the changes in https://github.com/near/nearcore/pull/12617, -// this needs to be changed to be aware of what shards can be applied now. Otherwise we have -// a bug in the rare case where we have something like this sequence of tracked shards in consecutive epochs: -// (s0) -> (s1) -> (s0, s2) -// In this case we don't state sync s0 since we already have the state, but we apply chunks with mode `NotCaughtUp` -// in the middle epoch there because we're downloading state for s2. 
fn get_should_apply_chunk( mode: ApplyChunksMode, cares_about_shard_this_epoch: bool, cares_about_shard_next_epoch: bool, + cared_about_shard_prev_epoch: bool, ) -> bool { match mode { - // next epoch's shard states are not ready, only update this epoch's shards - ApplyChunksMode::NotCaughtUp => cares_about_shard_this_epoch, + // next epoch's shard states are not ready, only update this epoch's shards plus shards we will care about in the future + // and already have state for + ApplyChunksMode::NotCaughtUp => { + cares_about_shard_this_epoch + || (cares_about_shard_next_epoch && cared_about_shard_prev_epoch) + } // update both this epoch and next epoch ApplyChunksMode::IsCaughtUp => cares_about_shard_this_epoch || cares_about_shard_next_epoch, // catching up next epoch's shard states, do not update this epoch's shard state // since it has already been updated through ApplyChunksMode::NotCaughtUp ApplyChunksMode::CatchingUp => { - !cares_about_shard_this_epoch && cares_about_shard_next_epoch + let syncing_shard = !cares_about_shard_this_epoch + && cares_about_shard_next_epoch + && !cared_about_shard_prev_epoch; + syncing_shard } } } diff --git a/chain/chain/src/test_utils/kv_runtime.rs b/chain/chain/src/test_utils/kv_runtime.rs index 866668e8fca..7881f9878df 100644 --- a/chain/chain/src/test_utils/kv_runtime.rs +++ b/chain/chain/src/test_utils/kv_runtime.rs @@ -621,7 +621,7 @@ impl EpochManagerAdapter for MockEpochManager { &self, prev_hash: &CryptoHash, shard_id: ShardId, - ) -> Result<(ShardLayout, ShardId, ShardIndex), Error> { + ) -> Result<(ShardLayout, ShardId, ShardIndex), EpochError> { let shard_layout = self.get_shard_layout_from_prev_block(prev_hash)?; // This is not correct if there was a resharding event in between // the previous and current block. @@ -987,6 +987,30 @@ impl EpochManagerAdapter for MockEpochManager { Ok(false) } + fn cared_about_shard_prev_epoch_from_prev_block( + &self, + parent_hash: &CryptoHash, + account_id: &AccountId, + shard_id: ShardId, + ) -> Result { + // This `unwrap` here tests that in all code paths we check that the epoch exists before + // we check if we care about a shard. Please do not remove the unwrap, fix the logic of + // the calling function. + let epoch_valset = self.get_epoch_and_valset(*parent_hash).unwrap(); + let shard_layout = self.get_shard_layout_from_prev_block(parent_hash)?; + let shard_index = shard_layout.get_shard_index(shard_id)?; + let chunk_producers = self.get_chunk_producers( + (epoch_valset.1.wrapping_sub(1)) % self.validators_by_valset.len(), + shard_index, + ); + for validator in chunk_producers { + if validator.account_id() == account_id { + return Ok(true); + } + } + Ok(false) + } + fn will_shard_layout_change(&self, parent_hash: &CryptoHash) -> Result { // Copied from EpochManager (KeyValueRuntime is deprecated anyway). let epoch_id = self.get_epoch_id_from_prev_block(parent_hash)?; diff --git a/chain/epoch-manager/src/adapter.rs b/chain/epoch-manager/src/adapter.rs index 9ccb3ccf4eb..e73fa09d735 100644 --- a/chain/epoch-manager/src/adapter.rs +++ b/chain/epoch-manager/src/adapter.rs @@ -150,7 +150,7 @@ pub trait EpochManagerAdapter: Send + Sync { &self, prev_hash: &CryptoHash, shard_id: ShardId, - ) -> Result<(ShardLayout, ShardId, ShardIndex), Error>; + ) -> Result<(ShardLayout, ShardId, ShardIndex), EpochError>; /// Get shard layout given hash of previous block. 
fn get_shard_layout_from_prev_block( @@ -389,6 +389,13 @@ pub trait EpochManagerAdapter: Send + Sync { shard_id: ShardId, ) -> Result; + fn cared_about_shard_prev_epoch_from_prev_block( + &self, + parent_hash: &CryptoHash, + account_id: &AccountId, + shard_id: ShardId, + ) -> Result; + fn will_shard_layout_change(&self, parent_hash: &CryptoHash) -> Result; /// Tries to estimate in which epoch the given height would reside. @@ -639,7 +646,7 @@ impl EpochManagerAdapter for EpochManagerHandle { &self, prev_hash: &CryptoHash, shard_id: ShardId, - ) -> Result<(ShardLayout, ShardId, ShardIndex), Error> { + ) -> Result<(ShardLayout, ShardId, ShardIndex), EpochError> { let shard_layout = self.get_shard_layout_from_prev_block(prev_hash)?; let prev_shard_layout = self.get_shard_layout(&self.get_epoch_id(prev_hash)?)?; let is_resharding_boundary = @@ -955,6 +962,22 @@ impl EpochManagerAdapter for EpochManagerHandle { ) } + // `shard_id` always refers to a shard in the current epoch that the next block from `parent_hash` belongs + // If shard layout changed after the prev epoch, returns true if the account cared about the parent shard + fn cared_about_shard_prev_epoch_from_prev_block( + &self, + parent_hash: &CryptoHash, + account_id: &AccountId, + shard_id: ShardId, + ) -> Result { + let (_layout, parent_shard_id, _index) = + self.get_prev_shard_id_from_prev_hash(parent_hash, shard_id)?; + let prev_epoch_id = self.get_prev_epoch_id_from_prev_block(parent_hash)?; + + let epoch_manager = self.read(); + epoch_manager.cares_about_shard_in_epoch(&prev_epoch_id, account_id, parent_shard_id) + } + fn will_shard_layout_change(&self, parent_hash: &CryptoHash) -> Result { let epoch_manager = self.read(); epoch_manager.will_shard_layout_change(parent_hash) diff --git a/chain/epoch-manager/src/shard_tracker.rs b/chain/epoch-manager/src/shard_tracker.rs index c923922896c..6e08af9bfc3 100644 --- a/chain/epoch-manager/src/shard_tracker.rs +++ b/chain/epoch-manager/src/shard_tracker.rs @@ -121,6 +121,61 @@ impl ShardTracker { self.tracks_shard_at_epoch(shard_id, &epoch_id) } + fn tracks_shard_prev_epoch_from_prev_block( + &self, + shard_id: ShardId, + prev_hash: &CryptoHash, + ) -> Result { + let epoch_id = self.epoch_manager.get_prev_epoch_id_from_prev_block(prev_hash)?; + self.tracks_shard_at_epoch(shard_id, &epoch_id) + } + + /// Whether the client cares about some shard in the previous epoch. + /// * If `account_id` is None, `is_me` is not checked and the + /// result indicates whether the client is tracking the shard + /// * If `account_id` is not None, it is supposed to be a validator + /// account and `is_me` indicates whether we check what shards + /// the client tracks. + // TODO: consolidate all these care_about_shard() functions. This could all be one + // function with an enum arg that tells what epoch we want to check, and one that allows + // passing an epoch ID or a prev hash, or current hash, or whatever. + pub fn cared_about_shard_in_prev_epoch( + &self, + account_id: Option<&AccountId>, + parent_hash: &CryptoHash, + shard_id: ShardId, + is_me: bool, + ) -> bool { + // TODO: fix these unwrap_or here and handle error correctly. 
The current behavior masks potential errors and bugs + // https://github.com/near/nearcore/issues/4936 + if let Some(account_id) = account_id { + let account_cares_about_shard = self + .epoch_manager + .cared_about_shard_prev_epoch_from_prev_block(parent_hash, account_id, shard_id) + .unwrap_or(false); + if account_cares_about_shard { + // An account has to track this shard because of its validation duties. + return true; + } + if !is_me { + // We don't know how another node is configured. + // It may track all shards, it may track no additional shards. + return false; + } else { + // We have access to the node config. Use the config to find a definite answer. + } + } + match self.tracked_config { + TrackedConfig::AllShards => { + // Avoid looking up EpochId as a performance optimization. + true + } + _ => { + self.tracks_shard_prev_epoch_from_prev_block(shard_id, parent_hash).unwrap_or(false) + } + } + } + /// Whether the client cares about some shard right now. /// * If `account_id` is None, `is_me` is not checked and the /// result indicates whether the client is tracking the shard diff --git a/integration-tests/src/test_loop/tests/state_sync.rs b/integration-tests/src/test_loop/tests/state_sync.rs index 681863cfe1f..d9f9f01fa10 100644 --- a/integration-tests/src/test_loop/tests/state_sync.rs +++ b/integration-tests/src/test_loop/tests/state_sync.rs @@ -19,7 +19,7 @@ use near_primitives::version::{ProtocolFeature, PROTOCOL_VERSION}; use crate::test_loop::builder::TestLoopBuilder; use crate::test_loop::env::{TestData, TestLoopEnv}; -use crate::test_loop::utils::transactions::get_anchor_hash; +use crate::test_loop::utils::transactions::{get_anchor_hash, get_smallest_height_head}; use crate::test_loop::utils::ONE_NEAR; use itertools::Itertools; @@ -81,6 +81,7 @@ fn setup_initial_blockchain( generate_shard_accounts: bool, chunks_produced: HashMap>, skip_block_sync_height_delta: Option, + extra_node_shard_schedule: &Option>>, ) -> TestState { let mut builder = TestLoopBuilder::new(); @@ -95,7 +96,22 @@ fn setup_initial_blockchain( } }) .collect::>(); - let clients = validators.iter().map(|v| v.account_id.clone()).collect::>(); + let mut clients = validators.iter().map(|v| v.account_id.clone()).collect::>(); + + if let Some(schedule) = extra_node_shard_schedule.as_ref() { + let idx = clients.len(); + let schedule = schedule.clone(); + clients.push("extra-node".parse().unwrap()); + + builder = builder.config_modifier(move |config, client_index| { + if client_index != idx { + return; + } + + config.tracked_shards = vec![]; + config.tracked_shard_schedule = schedule.clone(); + }); + } let boundary_accounts = get_boundary_accounts(num_shards); let accounts = @@ -288,22 +304,32 @@ fn produce_chunks( loop { env.test_loop.run_until( |data| { - let client = &data.get(&handle).client; - let new_tip = client.chain.head().unwrap(); + let clients = env + .datas + .iter() + .map(|test_data| &data.get(&test_data.client_sender.actor_handle()).client) + .collect_vec(); + let new_tip = get_smallest_height_head(&clients); new_tip.height != tip.height }, timeout, ); - let handle = env.datas[0].client_sender.actor_handle(); - let client = &env.test_loop.data.get(&handle).client; - let new_tip = client.chain.head().unwrap(); - let header = client.chain.get_block_header(&tip.last_block_hash).unwrap(); + let clients = env + .datas + .iter() + .map(|test_data| { + &env.test_loop.data.get(&test_data.client_sender.actor_handle()).client + }) + .collect_vec(); + let new_tip = get_smallest_height_head(&clients); + + let 
header = clients[0].chain.get_block_header(&tip.last_block_hash).unwrap(); tracing::debug!("chunk mask for #{} {:?}", header.height(), header.chunk_mask()); if new_tip.epoch_id != tip.epoch_id { epoch_id_switches += 1; - if epoch_id_switches > 2 { + if epoch_id_switches > 3 { break; } if let Some(accounts) = accounts.as_mut() { @@ -358,6 +384,7 @@ struct StateSyncTest { // a value of 0 will have us generate a skip on the first block that will probably be the sync_hash, // and a value of 1 will have us skip the one after that. skip_block_sync_height_delta: Option, + extra_node_shard_schedule: Option>>, } static TEST_CASES: &[StateSyncTest] = &[ @@ -370,6 +397,7 @@ static TEST_CASES: &[StateSyncTest] = &[ generate_shard_accounts: true, chunks_produced: &[], skip_block_sync_height_delta: None, + extra_node_shard_schedule: None, }, StateSyncTest { num_validators: 5, @@ -379,6 +407,7 @@ static TEST_CASES: &[StateSyncTest] = &[ generate_shard_accounts: true, chunks_produced: &[], skip_block_sync_height_delta: None, + extra_node_shard_schedule: None, }, // In this test we have 2 validators and 4 shards, and we don't generate any extra accounts. // That makes 3 accounts ncluding the "near" account. This means at least one shard will have no @@ -391,6 +420,7 @@ static TEST_CASES: &[StateSyncTest] = &[ generate_shard_accounts: false, chunks_produced: &[], skip_block_sync_height_delta: None, + extra_node_shard_schedule: None, }, // Now we miss some chunks at the beginning of the epoch StateSyncTest { @@ -406,6 +436,7 @@ static TEST_CASES: &[StateSyncTest] = &[ (ShardId::new(3), &[true]), ], skip_block_sync_height_delta: None, + extra_node_shard_schedule: None, }, StateSyncTest { num_validators: 5, @@ -415,6 +446,7 @@ static TEST_CASES: &[StateSyncTest] = &[ generate_shard_accounts: true, chunks_produced: &[(ShardId::new(0), &[true, false]), (ShardId::new(1), &[true, false])], skip_block_sync_height_delta: None, + extra_node_shard_schedule: None, }, StateSyncTest { num_validators: 5, @@ -427,6 +459,7 @@ static TEST_CASES: &[StateSyncTest] = &[ (ShardId::new(2), &[true, false, true]), ], skip_block_sync_height_delta: None, + extra_node_shard_schedule: None, }, ]; @@ -449,11 +482,51 @@ fn slow_test_state_sync_current_epoch() { .map(|(shard_id, produced)| (*shard_id, produced.to_vec())) .collect(), t.skip_block_sync_height_delta, + &t.extra_node_shard_schedule, ); run_test(state); } } +// This adds an extra node with an explicit tracked shards schedule to test more corner cases. +// Specifically, checking what happens when we stop tracking a shard and then track it again, +// while also needing to state sync another shard. 
+#[test] +fn test_state_sync_untrack_then_track() { + init_test_logger(); + + let params = StateSyncTest { + num_validators: 5, + num_block_producer_seats: 4, + num_chunk_producer_seats: 4, + num_shards: 5, + generate_shard_accounts: true, + chunks_produced: &[], + skip_block_sync_height_delta: None, + extra_node_shard_schedule: Some(vec![ + vec![ShardId::new(0), ShardId::new(1)], + vec![ShardId::new(0), ShardId::new(1)], + vec![ShardId::new(1), ShardId::new(2)], + vec![ShardId::new(0), ShardId::new(3)], + ]), + }; + let state = setup_initial_blockchain( + params.num_validators, + params.num_block_producer_seats, + params.num_chunk_producer_seats, + params.num_shards, + params.generate_shard_accounts, + params + .chunks_produced + .iter() + .map(|(shard_id, produced)| (*shard_id, produced.to_vec())) + .collect(), + params.skip_block_sync_height_delta, + ¶ms.extra_node_shard_schedule, + ); + run_test(state); +} + // Here we drop the block that's supposed to be the sync hash after the first full epoch, // which causes it to be produced but then skipped on the final chain. If the state sync code // is unaware of the possibility of forks, this will cause the producer of that block to @@ -474,6 +547,7 @@ fn test_state_sync_from_fork() { generate_shard_accounts: true, chunks_produced: &[], skip_block_sync_height_delta: Some(0), + extra_node_shard_schedule: None, }; let state = setup_initial_blockchain( params.num_validators, @@ -487,6 +561,7 @@ fn test_state_sync_from_fork() { .map(|(shard_id, produced)| (*shard_id, produced.to_vec())) .collect(), params.skip_block_sync_height_delta, + ¶ms.extra_node_shard_schedule, ); run_test(state); } @@ -510,6 +585,7 @@ fn test_state_sync_to_fork() { generate_shard_accounts: true, chunks_produced: &[], skip_block_sync_height_delta: Some(0), + extra_node_shard_schedule: None, }; let state = setup_initial_blockchain( params.num_validators, @@ -523,6 +599,7 @@ fn test_state_sync_to_fork() { .map(|(shard_id, produced)| (*shard_id, produced.to_vec())) .collect(), params.skip_block_sync_height_delta, + ¶ms.extra_node_shard_schedule, ); run_test(state); } @@ -543,6 +620,7 @@ fn test_state_sync_fork_after_sync() { generate_shard_accounts: true, chunks_produced: &[], skip_block_sync_height_delta: Some(1), + extra_node_shard_schedule: None, }; let state = setup_initial_blockchain( params.num_validators, @@ -556,6 +634,7 @@ fn test_state_sync_fork_after_sync() { .map(|(shard_id, produced)| (*shard_id, produced.to_vec())) .collect(), params.skip_block_sync_height_delta, + ¶ms.extra_node_shard_schedule, ); run_test(state); } @@ -573,6 +652,7 @@ fn test_state_sync_fork_before_sync() { generate_shard_accounts: true, chunks_produced: &[], skip_block_sync_height_delta: Some(-1), + extra_node_shard_schedule: None, }; let state = setup_initial_blockchain( params.num_validators, @@ -586,6 +666,7 @@ fn test_state_sync_fork_before_sync() { .map(|(shard_id, produced)| (*shard_id, produced.to_vec())) .collect(), params.skip_block_sync_height_delta, + ¶ms.extra_node_shard_schedule, ); run_test(state); } @@ -640,7 +721,7 @@ fn slow_test_state_request() { init_test_logger(); let TestState { mut env, .. } = - setup_initial_blockchain(4, 4, 4, 4, false, HashMap::default(), None); + setup_initial_blockchain(4, 4, 4, 4, false, HashMap::default(), None, &None); spam_state_sync_header_reqs(&mut env); env.shutdown_and_drain_remaining_events(Duration::seconds(3));