diff --git a/chain/chain/src/garbage_collection.rs b/chain/chain/src/garbage_collection.rs
index 4059996a814..ea574264f55 100644
--- a/chain/chain/src/garbage_collection.rs
+++ b/chain/chain/src/garbage_collection.rs
@@ -507,7 +507,7 @@ impl<'a> ChainStoreUpdate<'a> {
         Ok(())
     }
 
-    // TODO(reshardingV3) Revisit this function, probably it is not needed anymore.
+    // TODO(resharding) Revisit this function, probably it is not needed anymore.
     fn get_shard_uids_to_gc(
         &mut self,
         epoch_manager: &dyn EpochManagerAdapter,
diff --git a/chain/chain/src/resharding/manager.rs b/chain/chain/src/resharding/manager.rs
index 82b757f211f..5d3bd24e3fd 100644
--- a/chain/chain/src/resharding/manager.rs
+++ b/chain/chain/src/resharding/manager.rs
@@ -147,7 +147,7 @@ impl ReshardingManager {
     ) -> io::Result<()> {
         let mut store_update = self.store.trie_store().store_update();
         let parent_shard_uid = split_shard_event.parent_shard;
-        // TODO(reshardingV3) No need to set the mapping for children shards that we won't track just after resharding?
+        // TODO(resharding) No need to set the mapping for children shards that we won't track just after resharding?
         for child_shard_uid in split_shard_event.children_shards() {
             store_update.set_shard_uid_mapping(child_shard_uid, parent_shard_uid);
         }
diff --git a/core/store/src/adapter/trie_store.rs b/core/store/src/adapter/trie_store.rs
index bedec4578ea..1671acdeef1 100644
--- a/core/store/src/adapter/trie_store.rs
+++ b/core/store/src/adapter/trie_store.rs
@@ -173,6 +173,20 @@ impl<'a> TrieStoreUpdateAdapter<'a> {
     }
 }
 
+/// Get the `ShardUId` mapping for `child_shard_uid`. If the mapping does not exist, map the shard to itself.
+/// Used by Resharding V3 for State mapping.
+///
+/// It is kept out of `TrieStoreAdapter`, so that `TrieStoreUpdateAdapter` can use it without
+/// cloning `store` each time, see https://github.com/near/nearcore/pull/12232#discussion_r1804810508.
+pub fn get_shard_uid_mapping(store: &Store, child_shard_uid: ShardUId) -> ShardUId {
+    store
+        .get_ser::<ShardUId>(DBCol::StateShardUIdMapping, &child_shard_uid.to_bytes())
+        .unwrap_or_else(|_| {
+            panic!("get_shard_uid_mapping() failed for child_shard_uid = {}", child_shard_uid)
+        })
+        .unwrap_or(child_shard_uid)
+}
+
 /// Constructs db key to be used to access the State column.
 /// First, it consults the `StateShardUIdMapping` column to map the `shard_uid` prefix
 /// to its ancestor in the resharding tree (according to Resharding V3)
@@ -186,10 +200,7 @@ fn get_key_from_shard_uid_and_hash(
     shard_uid: ShardUId,
     hash: &CryptoHash,
 ) -> [u8; 40] {
-    let mapped_shard_uid = store
-        .get_ser::<ShardUId>(DBCol::StateShardUIdMapping, &shard_uid.to_bytes())
-        .expect("get_key_from_shard_uid_and_hash() failed")
-        .unwrap_or(shard_uid);
+    let mapped_shard_uid = get_shard_uid_mapping(store, shard_uid);
     let mut key = [0; 40];
     key[0..8].copy_from_slice(&mapped_shard_uid.to_bytes());
     key[8..].copy_from_slice(hash.as_ref());
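For context, the Resharding V3 State mapping that the two hunks above rely on can be summarized with a small self-contained sketch (simplified stand-in types, not the nearcore API): a child shard resolves to its mapped ancestor, falling back to itself, before the 8-byte shard prefix is combined with the 32-byte node hash.

    use std::collections::HashMap;

    // Simplified stand-in for nearcore's ShardUId; the real type also serializes to 8 bytes.
    #[derive(Clone, Copy, PartialEq, Eq, Hash)]
    struct ShardUId { version: u32, shard_id: u32 }

    impl ShardUId {
        fn to_bytes(self) -> [u8; 8] {
            let mut buf = [0u8; 8];
            buf[..4].copy_from_slice(&self.version.to_le_bytes());
            buf[4..].copy_from_slice(&self.shard_id.to_le_bytes());
            buf
        }
    }

    // The StateShardUIdMapping column modeled as a plain map: child -> ancestor.
    type ShardUIdMapping = HashMap<ShardUId, ShardUId>;

    // A shard with no mapping entry maps to itself, like `unwrap_or(child_shard_uid)` above.
    fn map_shard_uid(mapping: &ShardUIdMapping, shard_uid: ShardUId) -> ShardUId {
        mapping.get(&shard_uid).copied().unwrap_or(shard_uid)
    }

    // 40-byte State key: 8-byte (possibly remapped) ShardUId prefix followed by the node hash.
    fn state_key(mapping: &ShardUIdMapping, shard_uid: ShardUId, node_hash: [u8; 32]) -> [u8; 40] {
        let mut key = [0u8; 40];
        key[..8].copy_from_slice(&map_shard_uid(mapping, shard_uid).to_bytes());
        key[8..].copy_from_slice(&node_hash);
        key
    }

Both children of a split resolve to the same ancestor prefix, which is why reads (`get_key_from_shard_uid_and_hash`) and the cold-store copy below must apply the same resolution.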
diff --git a/core/store/src/archive/cold_storage.rs b/core/store/src/archive/cold_storage.rs
index 72c9ef32031..20f6a2035af 100644
--- a/core/store/src/archive/cold_storage.rs
+++ b/core/store/src/archive/cold_storage.rs
@@ -1,3 +1,4 @@
+use crate::adapter::trie_store::get_shard_uid_mapping;
 use crate::columns::DBKeyType;
 use crate::db::{ColdDB, COLD_HEAD_KEY, HEAD_KEY};
 use crate::{metrics, DBCol, DBTransaction, Database, Store, TrieChanges};
@@ -57,8 +58,7 @@ struct BatchTransaction {
 }
 
 /// Updates provided cold database from provided hot store with information about block at `height`.
-/// Returns if the block was copied (false only if height is not present in `hot_store`).
-/// Block as `height` has to be final.
+/// Block at `height` has to be final and present in `hot_store`.
 ///
 /// First, we read from hot store information necessary
 /// to determine all the keys that need to be updated in cold db.
@@ -80,22 +80,29 @@ pub fn update_cold_db(
     hot_store: &Store,
     shard_layout: &ShardLayout,
     height: &BlockHeight,
+    is_last_block_in_epoch: bool,
     num_threads: usize,
-) -> io::Result<bool> {
+) -> io::Result<()> {
     let _span = tracing::debug_span!(target: "cold_store", "update cold db", height = height);
     let _timer = metrics::COLD_COPY_DURATION.start_timer();
 
-    if hot_store.get_for_cold(DBCol::BlockHeight, &height.to_le_bytes())?.is_none() {
-        return Ok(false);
-    }
-
     let height_key = height.to_le_bytes();
     let block_hash_vec = hot_store.get_or_err_for_cold(DBCol::BlockHeight, &height_key)?;
     let block_hash_key = block_hash_vec.as_slice();
 
     let key_type_to_keys =
         get_keys_from_store(&hot_store, shard_layout, &height_key, block_hash_key)?;
-    let cold_columns = DBCol::iter().filter(|col| col.is_cold()).collect::<Vec<DBCol>>();
+    let columns_to_update = DBCol::iter()
+        .filter(|col| {
+            if !col.is_cold() {
+                return false;
+            }
+            if col == &DBCol::StateShardUIdMapping && !is_last_block_in_epoch {
+                return false;
+            }
+            true
+        })
+        .collect::<Vec<DBCol>>();
 
     // Create new thread pool with `num_threads`.
     rayon::ThreadPoolBuilder::new()
@@ -103,8 +110,8 @@ pub fn update_cold_db(
         .build()
         .map_err(|_| io::Error::new(io::ErrorKind::Other, "Failed to create rayon pool"))?
         .install(|| {
-            cold_columns
-                .into_par_iter() // Process every cold column as a separate task in thread pool in parallel.
+            columns_to_update
+                .into_par_iter() // Process each column to update as a separate task in the thread pool, in parallel.
                 // Copy column to cold db.
                 .map(|col: DBCol| -> io::Result<()> {
                     if col == DBCol::State {
@@ -127,8 +134,7 @@ pub fn update_cold_db(
                 },
             )
         })?;
-
-    Ok(true)
+    Ok(())
 }
 
 // Correctly set the key and value on DBTransaction, taking reference counting
@@ -189,9 +195,10 @@ fn copy_state_from_store(
         let Some(trie_changes) = trie_changes else { continue };
 
         total_keys += trie_changes.insertions().len();
+        let mapped_shard_uid_key = get_shard_uid_mapping(hot_store, shard_uid).to_bytes();
         for op in trie_changes.insertions() {
-            // TODO(reshardingV3) Handle shard_uid not mapped there
-            let key = join_two_keys(&shard_uid_key, op.hash().as_bytes());
+            // TODO(resharding) Test it properly. Currently this path is not triggered in testloop.
+            let key = join_two_keys(&mapped_shard_uid_key, op.hash().as_bytes());
             let value = op.payload().to_vec();
 
             total_size += value.len();
@@ -343,7 +350,7 @@ pub fn copy_all_data_to_cold(
             tracing::debug!(target: "cold_store", "stopping copy_all_data_to_cold");
             return Ok(CopyAllDataToColdStatus::Interrupted);
         }
-        // TODO(reshardingV3) Should do mapping here?
+        // TODO(resharding) Should do mapping here?
         let (key, value) = result?;
         transaction.set_and_write_if_full(col, key.to_vec(), value.to_vec())?;
    }
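The new column filter in `update_cold_db` encodes one rule worth spelling out: every cold column is copied for every block, except `StateShardUIdMapping`, which only changes at resharding and therefore only needs to be copied once per epoch. A hedged sketch of that rule as a standalone helper (hypothetical name, assuming `near_store::DBCol` with the same `is_cold()` semantics as used above):

    use near_store::DBCol; // assumption: DBCol and DBCol::is_cold() exposed as in the diff

    /// Hypothetical helper mirroring the closure passed to `filter` above.
    fn should_copy(col: DBCol, is_last_block_in_epoch: bool) -> bool {
        // Only cold columns are ever copied to the cold database.
        if !col.is_cold() {
            return false;
        }
        // The shard UId mapping changes at most once per epoch (at resharding),
        // so copying it on the last block of the epoch is sufficient.
        if col == DBCol::StateShardUIdMapping && !is_last_block_in_epoch {
            return false;
        }
        true
    }

The inline closure in the diff is equivalent; the point is only that the `is_last_block_in_epoch` flag is what keeps the mapping column from being re-copied on every block.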
diff --git a/core/store/src/columns.rs b/core/store/src/columns.rs
index 9f6f20957a0..a6a514ac735 100644
--- a/core/store/src/columns.rs
+++ b/core/store/src/columns.rs
@@ -462,7 +462,6 @@ impl DBCol {
             | DBCol::StateHeaders
             | DBCol::TransactionResultForBlock
             | DBCol::Transactions
-            // TODO(reshardingV3) How the mapping will work with split storage?
            | DBCol::StateShardUIdMapping => true,
            // TODO
diff --git a/core/store/src/trie/mem/parallel_loader.rs b/core/store/src/trie/mem/parallel_loader.rs
index ee0a4ddee41..92969ef07d4 100644
--- a/core/store/src/trie/mem/parallel_loader.rs
+++ b/core/store/src/trie/mem/parallel_loader.rs
@@ -191,7 +191,7 @@ impl ParallelMemTrieLoader {
         arena: &mut impl ArenaMut,
     ) -> Result<MemTrieNodeId, StorageError> {
         // Figure out which range corresponds to the prefix of this subtree.
-        // TODO(reshardingV3) This seems fragile, potentially does not work with mapping.
+        // TODO(resharding) This seems fragile, potentially does not work with mapping.
         let (start, end) = subtree_to_load.to_iter_range(self.shard_uid);
 
         // Load all the keys in this range from the FlatState column.
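To make the concern behind the `TODO(resharding)` above more concrete: `to_iter_range` turns a subtree's key prefix into a contiguous FlatState key range, and FlatState keys are laid out as an 8-byte `ShardUId` prefix followed by the trie key. A rough, simplified illustration of that prefix-to-range computation (hypothetical helper, not the nearcore implementation):

    /// For a subtree whose trie keys all start with `trie_key_prefix`, the FlatState keys
    /// covered by it form the range [shard_prefix ++ prefix, shard_prefix ++ bump(prefix)).
    /// Illustration only: assumes a non-empty prefix that is not all 0xff bytes.
    fn subtree_iter_range(shard_uid_bytes: [u8; 8], trie_key_prefix: &[u8]) -> (Vec<u8>, Vec<u8>) {
        let mut start = shard_uid_bytes.to_vec();
        start.extend_from_slice(trie_key_prefix);

        // Exclusive upper bound: strip trailing 0xff bytes, then increment the last remaining byte.
        let mut bumped = trie_key_prefix.to_vec();
        while bumped.last() == Some(&0xff) {
            bumped.pop();
        }
        *bumped.last_mut().expect("illustration assumes a non-trivial prefix") += 1;

        let mut end = shard_uid_bytes.to_vec();
        end.extend_from_slice(&bumped);
        (start, end)
    }

The TODO presumably worries about which `ShardUId` should supply that 8-byte prefix once the Resharding V3 mapping is in play: the State column goes through `get_shard_uid_mapping`, while this range is built directly from `self.shard_uid`.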
diff --git a/integration-tests/src/test_loop/tests/max_receipt_size.rs b/integration-tests/src/test_loop/tests/max_receipt_size.rs
index bf43d348924..9d386a608bd 100644
--- a/integration-tests/src/test_loop/tests/max_receipt_size.rs
+++ b/integration-tests/src/test_loop/tests/max_receipt_size.rs
@@ -22,6 +22,7 @@ fn slow_test_max_receipt_size() {
     let account0: AccountId = "account0".parse().unwrap();
     let account0_signer = &create_user_test_signer(&account0).into();
+    let rpc_id = "account4".parse().unwrap();
 
     // We can't test receipt limit by submitting large transactions because we hit the transaction size limit
     // before hitting the receipt size limit.
@@ -33,7 +34,7 @@ fn slow_test_max_receipt_size() {
         get_shared_block_hash(&env.datas, &env.test_loop),
     );
     let large_tx_exec_res =
-        execute_tx(&mut env.test_loop, large_tx, &env.datas, Duration::seconds(5));
+        execute_tx(&mut env.test_loop, &rpc_id, large_tx, &env.datas, Duration::seconds(5));
     assert_matches!(large_tx_exec_res, Err(InvalidTxError::TransactionSizeExceeded { .. }));
 
     // Let's test it by running a contract that generates a large receipt.
@@ -44,7 +45,7 @@ fn slow_test_max_receipt_size() {
         &account0_signer,
         get_shared_block_hash(&env.datas, &env.test_loop),
     );
-    run_tx(&mut env.test_loop, deploy_contract_tx, &env.datas, Duration::seconds(5));
+    run_tx(&mut env.test_loop, &rpc_id, deploy_contract_tx, &env.datas, Duration::seconds(5));
 
     // Calling generate_large_receipt({"account_id": "account0", "method_name": "noop", "total_args_size": 3000000})
     // will generate a receipt that has ~3_000_000 bytes. It'll be a single receipt with multiple FunctionCall actions.
@@ -60,7 +61,7 @@ fn slow_test_max_receipt_size() {
         300 * TGAS,
         get_shared_block_hash(&env.datas, &env.test_loop),
     );
-    run_tx(&mut env.test_loop, large_receipt_tx, &env.datas, Duration::seconds(5));
+    run_tx(&mut env.test_loop, &rpc_id, large_receipt_tx, &env.datas, Duration::seconds(5));
 
     // Generating a receipt that is 5 MB should fail, it's above the receipt size limit.
     let too_large_receipt_tx = SignedTransaction::call(
@@ -74,9 +75,14 @@ fn slow_test_max_receipt_size() {
         300 * TGAS,
         get_shared_block_hash(&env.datas, &env.test_loop),
     );
-    let too_large_receipt_tx_exec_res =
-        execute_tx(&mut env.test_loop, too_large_receipt_tx, &env.datas, Duration::seconds(5))
-            .unwrap();
+    let too_large_receipt_tx_exec_res = execute_tx(
+        &mut env.test_loop,
+        &rpc_id,
+        too_large_receipt_tx,
+        &env.datas,
+        Duration::seconds(5),
+    )
+    .unwrap();
 
     match too_large_receipt_tx_exec_res.status {
         FinalExecutionStatus::Failure(TxExecutionError::ActionError(action_error)) => {
@@ -111,7 +117,7 @@ fn slow_test_max_receipt_size() {
         300 * TGAS,
         get_shared_block_hash(&env.datas, &env.test_loop),
     );
-    let sum_4_res = run_tx(&mut env.test_loop, sum_4_tx, &env.datas, Duration::seconds(5));
+    let sum_4_res = run_tx(&mut env.test_loop, &rpc_id, sum_4_tx, &env.datas, Duration::seconds(5));
     assert_eq!(sum_4_res, 10u64.to_le_bytes().to_vec());
 
     env.shutdown_and_drain_remaining_events(Duration::seconds(20));
diff --git a/integration-tests/src/test_loop/tests/resharding_v3.rs b/integration-tests/src/test_loop/tests/resharding_v3.rs
index 631af6bdc98..e8d26852880 100644
--- a/integration-tests/src/test_loop/tests/resharding_v3.rs
+++ b/integration-tests/src/test_loop/tests/resharding_v3.rs
@@ -3,17 +3,20 @@ use near_async::test_loop::data::{TestLoopData, TestLoopDataHandle};
 use near_async::time::Duration;
 use near_chain_configs::test_genesis::{TestGenesisBuilder, ValidatorsSpec};
 use near_chain_configs::DEFAULT_GC_NUM_EPOCHS_TO_KEEP;
+use near_client::Query;
 use near_o11y::testonly::init_test_logger;
 use near_primitives::epoch_manager::EpochConfigStore;
 use near_primitives::shard_layout::ShardLayout;
-use near_primitives::types::{AccountId, BlockHeightDelta, Gas, ShardId, ShardIndex};
+use near_primitives::types::{
+    AccountId, BlockHeightDelta, BlockId, BlockReference, Gas, ShardId, ShardIndex,
+};
 use near_primitives::version::{ProtocolFeature, PROTOCOL_VERSION};
 use rand::seq::SliceRandom;
 use rand::Rng;
 use rand::SeedableRng;
 use rand_chacha::ChaCha20Rng;
 use std::cell::Cell;
-use std::collections::{BTreeMap, HashMap};
+use std::collections::{BTreeMap, HashMap, HashSet};
 use std::sync::Arc;
 
 use crate::test_loop::builder::TestLoopBuilder;
@@ -26,8 +29,8 @@ use crate::test_loop::utils::sharding::{
     next_block_has_new_shard_layout, print_and_assert_shard_accounts,
 };
 use crate::test_loop::utils::transactions::{
-    get_anchor_hash, get_next_nonce, get_shared_block_hash, get_smallest_height_head, run_tx,
-    store_and_submit_tx, submit_tx,
+    check_txs, create_account, delete_account, deploy_contract, get_anchor_hash, get_next_nonce,
+    get_node_data, get_smallest_height_head, store_and_submit_tx, submit_tx,
 };
 use crate::test_loop::utils::trie_sanity::{
     check_state_shard_uid_mapping_after_resharding, TrieSanityCheck,
@@ -39,7 +42,7 @@ use near_crypto::Signer;
 use near_parameters::{vm, RuntimeConfig, RuntimeConfigStore};
 use near_primitives::test_utils::create_user_test_signer;
 use near_primitives::transaction::SignedTransaction;
-use near_primitives::views::FinalExecutionStatus;
+use near_primitives::views::{FinalExecutionStatus, QueryRequest};
 
 #[derive(derive_builder::Builder)]
 #[builder(pattern = "owned", build_fn(skip))]
@@ -47,7 +50,7 @@ struct TestReshardingParameters {
     chunk_ranges_to_drop: HashMap<ShardUId, std::ops::Range<i64>>,
     num_accounts: u64,
-    num_clients: u64,
+    num_validators: u64,
     #[builder(setter(skip))]
     accounts: Vec<AccountId>,
     #[builder(setter(skip))]
@@ -55,6 +58,8 @@ struct TestReshardingParameters {
     base_shard_layout_version: u64,
     #[builder(setter(skip))]
     block_and_chunk_producers: Vec<AccountId>,
+    rpc_clients: Vec<AccountId>,
+    archival_clients: HashSet<AccountId>,
     initial_balance: u128,
     epoch_length: BlockHeightDelta,
     shuffle_shard_assignment_for_chunk_producers: bool,
@@ -85,8 +90,9 @@ struct TestReshardingParameters {
 
 impl TestReshardingParametersBuilder {
     fn build(self) -> TestReshardingParameters {
+        // TODO(resharding) Test chunk validators, and maybe more RPC / archival nodes.
         let num_accounts = self.num_accounts.unwrap_or(8);
-        let num_clients = self.num_clients.unwrap_or(3);
+        let num_validators = self.num_validators.unwrap_or(3);
         // When there's a resharding task delay and single-shard tracking, the delay might be pushed out
         // even further because the resharding task might have to wait for the state snapshot to be made
         // before it can proceed, which might mean that flat storage won't be ready for the child shard for a whole epoch.
@@ -96,18 +102,18 @@ impl TestReshardingParametersBuilder {
             .unwrap_or_else(|| self.delay_flat_state_resharding.map_or(6, |delay| delay + 7));
 
         // #12195 prevents number of BPs bigger than `epoch_length`.
-        assert!(num_clients > 0 && num_clients <= epoch_length);
+        assert!(num_validators > 0 && num_validators <= epoch_length);
 
         let accounts = Self::compute_initial_accounts(num_accounts);
 
-        // This piece of code creates `num_clients` from `accounts`. First client is at index 0 and
-        // other clients are spaced in the accounts' space as evenly as possible.
-        let clients_per_account = num_clients as f64 / accounts.len() as f64;
-        let mut client_parts = 1.0 - clients_per_account;
-        let clients: Vec<_> = accounts
+        // This piece of code selects `num_validators` validators from `accounts`. The first validator
+        // is at index 0 and the other validators are spaced across the accounts as evenly as possible.
+        let validators_per_account = num_validators as f64 / num_accounts as f64;
+        let mut client_parts = 1.0 - validators_per_account;
+        let block_and_chunk_producers: Vec<_> = accounts
             .iter()
             .filter(|_| {
-                client_parts += clients_per_account;
+                client_parts += validators_per_account;
                 if client_parts >= 1.0 {
                     client_parts -= 1.0;
                     true
@@ -118,16 +124,29 @@ impl TestReshardingParametersBuilder {
             .cloned()
             .collect();
 
-        let block_and_chunk_producers = clients.clone();
+        let non_validator_accounts: Vec<_> = accounts
+            .iter()
+            .filter(|account| !block_and_chunk_producers.contains(account))
+            .collect();
+        assert!(non_validator_accounts.len() >= 2);
+        let archival_clients = vec![non_validator_accounts[0].clone()];
+        let rpc_clients = vec![non_validator_accounts[1].clone()];
+        let clients =
+            vec![block_and_chunk_producers.clone(), archival_clients.clone(), rpc_clients.clone()]
+                .into_iter()
+                .flatten()
+                .collect();
 
         TestReshardingParameters {
             chunk_ranges_to_drop: self.chunk_ranges_to_drop.unwrap_or_default(),
             num_accounts,
-            num_clients,
+            num_validators,
             accounts,
             clients,
             base_shard_layout_version: self.base_shard_layout_version.unwrap_or(2),
             block_and_chunk_producers,
+            archival_clients: HashSet::from_iter(archival_clients.into_iter()),
+            rpc_clients,
            initial_balance: self.initial_balance.unwrap_or(1_000_000 * ONE_NEAR),
            epoch_length,
            shuffle_shard_assignment_for_chunk_producers: self
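The even-spacing selection above is compact but a little opaque; the same idea in isolation (choose `k` of `n` items, always including index 0, spreading the rest as evenly as possible) looks like this. This is a hedged, generic sketch rather than the test code itself:

    /// Pick `k` of the items, starting at index 0 and spacing the picks evenly,
    /// using the same fractional accumulator as the builder code above.
    fn select_evenly<T: Clone>(items: &[T], k: usize) -> Vec<T> {
        assert!(k > 0 && k <= items.len());
        let step = k as f64 / items.len() as f64;
        let mut acc = 1.0 - step;
        items
            .iter()
            .filter(|_| {
                acc += step;
                if acc >= 1.0 {
                    acc -= 1.0;
                    true
                } else {
                    false
                }
            })
            .cloned()
            .collect()
    }

With the defaults above (8 accounts, `k = 3`) this picks indices 0, 3 and 6, which is how the block-and-chunk producers end up spread over `accounts`, leaving the rest available as archival and RPC nodes.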
@@ -208,7 +227,7 @@ fn execute_money_transfers(account_ids: Vec<AccountId>) -> LoopActionFn {
     const NUM_TRANSFERS_PER_BLOCK: usize = 20;
 
     let latest_height = Cell::new(0);
-    // TODO: to be fixed when all shard tracking gets disabled.
+    // TODO(resharding) Make it work with the RPC from TestReshardingParameters.
     let rpc_id: AccountId = "account0".parse().unwrap();
     let seed = rand::thread_rng().gen::<u64>();
     println!("Random seed: {}", seed);
@@ -277,8 +296,8 @@ fn call_burn_gas_contract(
     let nonce = Cell::new(102);
     let txs = Cell::new(vec![]);
     let latest_height = Cell::new(0);
-    // TODO: to be fixed when all shard tracking gets disabled.
-    let rpc_id: AccountId = "account0".parse().unwrap();
+    // TODO(resharding) Make it work with the RPC from TestReshardingParameters.
+    let rpc_id = "account0".parse().unwrap();
 
     Box::new(
         move |node_datas: &[TestData],
@@ -366,7 +385,7 @@ fn call_promise_yield(
     let resharding_height: Cell<Option<u64>> = Cell::new(None);
     let txs = Cell::new(vec![]);
     let latest_height = Cell::new(0);
-    // TODO: to be fixed when all shard tracking gets disabled.
+    // TODO(resharding) Make it work with the RPC from TestReshardingParameters.
     let rpc_id: AccountId = "account0".parse().unwrap();
     let promise_txs_sent = Cell::new(false);
     let nonce = Cell::new(102);
@@ -499,6 +518,37 @@ fn get_base_shard_layout(version: u64) -> ShardLayout {
     }
 }
 
+// After resharding and the GC period, assert that the deleted `account_id`
+// is still accessible through the archival node's view client,
+// and is not accessible through a regular (RPC) node.
+fn check_deleted_account_availability(
+    env: &mut TestLoopEnv,
+    archival_id: &AccountId,
+    rpc_id: &AccountId,
+    account_id: AccountId,
+    height: u64,
+) {
+    let archival_node_data = get_node_data(&env.datas, &archival_id);
+    let rpc_node_data = get_node_data(&env.datas, &rpc_id);
+    let archival_view_client_handle = archival_node_data.view_client_sender.actor_handle();
+    let rpc_view_client_handle = rpc_node_data.view_client_sender.actor_handle();
+
+    let block_reference = BlockReference::BlockId(BlockId::Height(height));
+    let request = QueryRequest::ViewAccount { account_id };
+    let msg = Query::new(block_reference, request);
+
+    let archival_node_result = {
+        let view_client = env.test_loop.data.get_mut(&archival_view_client_handle);
+        near_async::messaging::Handler::handle(view_client, msg.clone())
+    };
+    let rpc_node_result = {
+        let view_client = env.test_loop.data.get_mut(&rpc_view_client_handle);
+        near_async::messaging::Handler::handle(view_client, msg)
+    };
+    assert!(archival_node_result.is_ok());
+    assert!(!rpc_node_result.is_ok());
+}
+
 /// Base setup to check sanity of Resharding V3.
 /// TODO(#11881): add the following scenarios:
 /// - Nodes must not track all shards. State sync must succeed.
@@ -530,6 +580,10 @@ fn test_resharding_v3_base(params: TestReshardingParameters) {
         base_epoch_config_store.get_config(base_protocol_version).as_ref().clone();
     base_epoch_config.shuffle_shard_assignment_for_chunk_producers =
         params.shuffle_shard_assignment_for_chunk_producers;
+    // TODO(resharding) Test chunk validators too (would need to change the lines below).
+    base_epoch_config.num_block_producer_seats = params.num_validators;
+    base_epoch_config.num_chunk_producer_seats = params.num_validators;
+    base_epoch_config.num_chunk_validator_seats = params.num_validators;
     if !params.chunk_ranges_to_drop.is_empty() {
         base_epoch_config.block_producer_kickout_threshold = 0;
         base_epoch_config.chunk_producer_kickout_threshold = 0;
@@ -543,7 +597,7 @@ fn test_resharding_v3_base(params: TestReshardingParameters) {
     let parent_shard_uid = base_shard_layout.account_id_to_shard_uid(&new_boundary_account);
     let mut epoch_config = base_epoch_config.clone();
     epoch_config.shard_layout =
-        ShardLayout::derive_shard_layout(&base_shard_layout, new_boundary_account);
+        ShardLayout::derive_shard_layout(&base_shard_layout, new_boundary_account.clone());
     tracing::info!(target: "test", ?base_shard_layout, new_shard_layout=?epoch_config.shard_layout, "shard layout");
 
     let expected_num_shards = epoch_config.shard_layout.num_shards();
@@ -593,10 +647,14 @@ fn test_resharding_v3_base(params: TestReshardingParameters) {
         builder = builder.runtime_config_store(runtime_config_store);
     }
 
-    let TestLoopEnv { mut test_loop, datas: node_datas, tempdir } = builder
+    let archival_id = params.archival_clients.iter().next().unwrap().clone();
+    let rpc_id = params.rpc_clients[0].clone();
+
+    let mut env = builder
         .genesis(genesis)
         .epoch_config_store(epoch_config_store)
         .clients(params.clients)
+        .archival_clients(params.archival_clients)
         .load_mem_tries_for_tracked_shards(params.load_mem_tries_for_tracked_shards)
         .drop_protocol_upgrade_chunks(
             base_protocol_version + 1,
@@ -604,26 +662,48 @@ fn test_resharding_v3_base(params: TestReshardingParameters) {
         )
         .build();
 
+    let mut test_setup_transactions = vec![];
     for contract_id in &params.deploy_test_contract {
-        let signer = &create_user_test_signer(&contract_id).into();
-        let deploy_contract_tx = SignedTransaction::deploy_contract(
-            101,
-            &contract_id,
+        let deploy_contract_tx = deploy_contract(
+            &mut env.test_loop,
+            &env.datas,
+            &rpc_id,
+            contract_id,
             near_test_contracts::rs_contract().into(),
-            &signer,
-            get_shared_block_hash(&node_datas, &test_loop),
+            1,
         );
-        run_tx(&mut test_loop, deploy_contract_tx, &node_datas, Duration::seconds(5));
+        test_setup_transactions.push(deploy_contract_tx);
     }
+
+    // Create an account that is:
+    // 1) Subaccount of a future resharding boundary account.
+    // 2) Temporary, because we will remove it after resharding.
+    // The goal is to test removing some state and to see whether it is kept on the archival node.
+    // The secondary goal is to catch potential bugs due to the above two conditions making it a special case.
+    let temporary_account =
+        format!("{}.{}", new_boundary_account, new_boundary_account).parse().unwrap();
+    let create_account_tx = create_account(
+        &mut env,
+        &rpc_id,
+        &new_boundary_account,
+        &temporary_account,
+        10 * ONE_NEAR,
+        2,
+    );
+    test_setup_transactions.push(create_account_tx);
+
+    // Wait for the test setup transactions to settle and ensure they all succeeded.
+    env.test_loop.run_for(Duration::seconds(2));
+    check_txs(&env.test_loop, &env.datas, &rpc_id, &test_setup_transactions);
+
     let client_handles =
-        node_datas.iter().map(|data| data.client_sender.actor_handle()).collect_vec();
+        env.datas.iter().map(|data| data.client_sender.actor_handle()).collect_vec();
 
     #[cfg(feature = "test_features")]
     {
         if params.delay_flat_state_resharding > 0 {
             client_handles.iter().for_each(|handle| {
-                let client = &mut test_loop.data.get_mut(handle).client;
+                let client = &mut env.test_loop.data.get_mut(handle).client;
                 client.chain.resharding_manager.flat_storage_resharder.adv_task_delay_by_blocks =
                     params.delay_flat_state_resharding;
            });
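A note on why the temporary account is created as "<boundary>.<boundary>": NEAR assigns an account to a shard by comparing the account id against the shard layout's boundary accounts, so a sub-account named after the future boundary account sorts just after the boundary and lands in the new child shard. A toy illustration of that assignment rule (hypothetical boundary values, plain string comparison only):

    /// Toy version of NEAR's account-to-shard assignment: shard i holds all accounts a
    /// with boundaries[i-1] <= a < boundaries[i] (string comparison on account ids).
    fn account_to_shard_index(account_id: &str, boundaries: &[&str]) -> usize {
        boundaries.iter().copied().take_while(|&b| account_id >= b).count()
    }

    fn main() {
        // Hypothetical boundary accounts; the real test derives its boundary elsewhere.
        let boundaries = ["account3", "account6"];
        // The future boundary account and its temporary sub-account land in the same child shard.
        assert_eq!(account_to_shard_index("account6", &boundaries), 2);
        assert_eq!(account_to_shard_index("account6.account6", &boundaries), 2);
        // An account just below the boundary stays in the shard to the left of it.
        assert_eq!(account_to_shard_index("account5", &boundaries), 1);
    }

Because the account is also deleted after resharding, the test exercises both the child-shard state built from the mapped parent and its later garbage collection, which is what `check_deleted_account_availability` verifies against the archival and RPC nodes.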
@@ -631,7 +711,7 @@ fn test_resharding_v3_base(params: TestReshardingParameters) {
     }
 
     let clients =
-        client_handles.iter().map(|handle| &test_loop.data.get(handle).client).collect_vec();
+        client_handles.iter().map(|handle| &env.test_loop.data.get(handle).client).collect_vec();
     let mut trie_sanity_check =
         TrieSanityCheck::new(&clients, params.load_mem_tries_for_tracked_shards);
 
@@ -640,7 +720,7 @@ fn test_resharding_v3_base(params: TestReshardingParameters) {
         params
             .loop_actions
             .iter()
-            .for_each(|action| action(&node_datas, test_loop_data, client_handles[0].clone()));
+            .for_each(|action| action(&env.datas, test_loop_data, client_handles[0].clone()));
 
         let clients =
             client_handles.iter().map(|handle| &test_loop_data.get(handle).client).collect_vec();
@@ -679,19 +759,30 @@ fn test_resharding_v3_base(params: TestReshardingParameters) {
         return true;
     };
 
-    test_loop.run_until(
+    env.test_loop.run_until(
         success_condition,
         // Give enough time to produce ~7 epochs.
         Duration::seconds((7 * params.epoch_length) as i64),
    );
-    let client = &test_loop.data.get(&client_handles[0]).client;
+    let client = &env.test_loop.data.get(&client_handles[0]).client;
     trie_sanity_check.check_epochs(client);
 
-    // Wait for garbage collection to kick in, so that it is tested as well.
-    test_loop
+    let height_after_resharding = latest_block_height.get();
+
+    // Delete `temporary_account`.
+    delete_account(&mut env, &rpc_id, &temporary_account, &rpc_id);
+    // Wait for garbage collection to kick in.
+    env.test_loop
         .run_for(Duration::seconds((DEFAULT_GC_NUM_EPOCHS_TO_KEEP * params.epoch_length) as i64));
+    // Check that the deleted account is still accessible at the archival node, but not at a regular node.
+    check_deleted_account_availability(
+        &mut env,
+        &archival_id,
+        &rpc_id,
+        temporary_account,
+        height_after_resharding,
+    );
 
-    TestLoopEnv { test_loop, datas: node_datas, tempdir }
-        .shutdown_and_drain_remaining_events(Duration::seconds(20));
+    env.shutdown_and_drain_remaining_events(Duration::seconds(20));
 }
 
 #[test]
@@ -746,7 +837,7 @@ fn test_resharding_v3_drop_chunks_all() {
 fn test_resharding_v3_resharding_block_in_fork() {
     test_resharding_v3_base(
         TestReshardingParametersBuilder::default()
-            .num_clients(1)
+            .num_validators(1)
             .add_loop_action(fork_before_resharding_block(false))
             .build(),
     );
 }
@@ -761,7 +852,7 @@ fn test_resharding_v3_double_sign_resharding_block() {
     test_resharding_v3_base(
         TestReshardingParametersBuilder::default()
-            .num_clients(1)
+            .num_validators(1)
             .add_loop_action(fork_before_resharding_block(true))
             .build(),
     );
diff --git a/integration-tests/src/test_loop/utils/transactions.rs b/integration-tests/src/test_loop/utils/transactions.rs
index 7bcd6d32e80..0f1049ea34d 100644
--- a/integration-tests/src/test_loop/utils/transactions.rs
+++ b/integration-tests/src/test_loop/utils/transactions.rs
@@ -168,7 +168,8 @@ pub fn do_create_account(
     amount: u128,
 ) {
     tracing::info!(target: "test", "Creating account.");
-    let tx = create_account(env, rpc_id, originator, new_account_id, amount);
+    let nonce = get_next_nonce(&env.test_loop.data, &env.datas, originator);
+    let tx = create_account(env, rpc_id, originator, new_account_id, amount, nonce);
     env.test_loop.run_for(Duration::seconds(5));
     check_txs(&env.test_loop, &env.datas, rpc_id, &[tx]);
 }
@@ -228,12 +229,11 @@ pub fn create_account(
     originator: &AccountId,
     new_account_id: &AccountId,
     amount: u128,
+    nonce: u64,
 ) -> CryptoHash {
     let block_hash = get_shared_block_hash(&env.datas, &env.test_loop);
-
-    let nonce = get_next_nonce(&env.test_loop.data, &env.datas, originator);
-    let signer = create_user_test_signer(&originator).into();
-    let new_signer: Signer = create_user_test_signer(&new_account_id).into();
+    let signer = create_user_test_signer(&originator);
+    let new_signer: Signer = create_user_test_signer(&new_account_id);
 
     let tx = SignedTransaction::create_account(
         nonce,
@@ -408,18 +408,19 @@ pub fn get_node_data<'a>(node_datas: &'a [TestData], account_id: &AccountId) ->
             return node_data;
         }
     }
-    panic!("RPC client not found");
+    panic!("client not found");
 }
 
 /// Run a transaction until completion and assert that the result is "success".
 /// Returns the transaction result.
 pub fn run_tx(
     test_loop: &mut TestLoopV2,
+    rpc_id: &AccountId,
     tx: SignedTransaction,
     node_datas: &[TestData],
     maximum_duration: Duration,
 ) -> Vec<u8> {
-    let tx_res = execute_tx(test_loop, tx, node_datas, maximum_duration).unwrap();
+    let tx_res = execute_tx(test_loop, rpc_id, tx, node_datas, maximum_duration).unwrap();
     assert_matches!(tx_res.status, FinalExecutionStatus::SuccessValue(_));
     match tx_res.status {
         FinalExecutionStatus::SuccessValue(res) => res,
@@ -462,14 +463,12 @@ pub fn run_txs_parallel(
 /// For valid transactions returns the execution result (which could have an execution error inside, check it!).
 pub fn execute_tx(
     test_loop: &mut TestLoopV2,
+    rpc_id: &AccountId,
     tx: SignedTransaction,
     node_datas: &[TestData],
     maximum_duration: Duration,
 ) -> Result<FinalExecutionOutcomeView, InvalidTxError> {
-    // Last node is usually the rpc node
-    let rpc_node_id = node_datas.len().checked_sub(1).unwrap();
-
-    let client_sender = &node_datas[rpc_node_id].client_sender;
+    let client_sender = &get_node_data(node_datas, rpc_id).client_sender;
     let future_spawner = test_loop.future_spawner();
 
     let mut tx_runner = TransactionRunner::new(tx, true);
@@ -477,7 +476,7 @@ pub fn execute_tx(
     let mut res = None;
     test_loop.run_until(
         |tl_data| {
-            let client = &tl_data.get(&node_datas[rpc_node_id].client_sender.actor_handle()).client;
+            let client = &tl_data.get(&client_sender.actor_handle()).client;
             match tx_runner.poll(client_sender, client, &future_spawner) {
                 Poll::Pending => false,
                 Poll::Ready(tx_res) => {
diff --git a/integration-tests/src/tests/client/cold_storage.rs b/integration-tests/src/tests/client/cold_storage.rs
index 64e05437bf2..0512eb602b4 100644
--- a/integration-tests/src/tests/client/cold_storage.rs
+++ b/integration-tests/src/tests/client/cold_storage.rs
@@ -147,9 +147,12 @@ fn test_storage_after_commit_of_cold_update() {
         let client = &env.clients[0];
         let client_store = client.runtime_adapter.store();
-        let epoch_id = client.epoch_manager.get_epoch_id_from_prev_block(&last_hash).unwrap();
+        let epoch_id = client.epoch_manager.get_epoch_id(block.hash()).unwrap();
         let shard_layout = client.epoch_manager.get_shard_layout(&epoch_id).unwrap();
-        update_cold_db(cold_db, &client_store, &shard_layout, &height, 4).unwrap();
+        let is_last_block_in_epoch =
+            client.epoch_manager.is_next_block_epoch_start(block.hash()).unwrap();
+        update_cold_db(cold_db, &client_store, &shard_layout, &height, is_last_block_in_epoch, 4)
+            .unwrap();
 
         last_hash = *block.hash();
     }
@@ -281,14 +284,22 @@ fn test_cold_db_copy_with_height_skips() {
         };
 
         let client = &env.clients[0];
-        let epoch_id = client.epoch_manager.get_epoch_id_from_prev_block(&last_hash).unwrap();
+        let hot_store = client.runtime_adapter.store();
+        let block_hash =
+            hot_store.get_ser::<CryptoHash>(DBCol::BlockHeight, &height.to_le_bytes()).unwrap();
+        let Some(block) = block else {
+            assert!(block_hash.is_none());
+            continue;
+        };
+        let block_hash = block_hash.unwrap();
+        assert!(&block_hash == block.hash());
+        let epoch_id = client.epoch_manager.get_epoch_id(&block_hash).unwrap();
         let shard_layout = client.epoch_manager.get_shard_layout(&epoch_id).unwrap();
-        update_cold_db(&cold_db, &client.runtime_adapter.store(), &shard_layout, &height, 1)
+        let is_last_block_in_epoch =
+            client.epoch_manager.is_next_block_epoch_start(&block_hash).unwrap();
+        update_cold_db(&cold_db, hot_store, &shard_layout, &height, is_last_block_in_epoch, 1)
             .unwrap();
-
-        if block.is_some() {
-            last_hash = *block.unwrap().hash();
-        }
+        last_hash = block_hash;
     }
 
     // We still need to filter out one chunk
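The pattern repeated in these test fixes (and in the production changes below) is that everything `update_cold_db` needs can now be derived from the hash of the block being copied, rather than from the previous block: `get_epoch_id` gives the epoch, the epoch gives the shard layout, and `is_next_block_epoch_start` tells whether this block is the last one of its epoch. Roughly, using the `EpochManagerAdapter` methods that appear throughout this diff (crate paths assumed to be the usual nearcore ones; sketch only):

    use near_epoch_manager::EpochManagerAdapter;
    use near_primitives::errors::EpochError;
    use near_primitives::hash::CryptoHash;
    use near_primitives::shard_layout::ShardLayout;

    /// Sketch of the lookup chain: block hash -> epoch id -> shard layout + epoch-boundary flag.
    fn layout_and_epoch_flag(
        epoch_manager: &dyn EpochManagerAdapter,
        block_hash: &CryptoHash,
    ) -> Result<(ShardLayout, bool), EpochError> {
        let epoch_id = epoch_manager.get_epoch_id(block_hash)?;
        let shard_layout = epoch_manager.get_shard_layout(&epoch_id)?;
        // True exactly when `block_hash` is the last block of its epoch.
        let is_last_block_in_epoch = epoch_manager.is_next_block_epoch_start(block_hash)?;
        Ok((shard_layout, is_last_block_in_epoch))
    }

That flag is what each call site now threads into `update_cold_db` so that `StateShardUIdMapping` gets copied exactly once per epoch.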
diff --git a/integration-tests/src/tests/client/process_blocks.rs b/integration-tests/src/tests/client/process_blocks.rs
index bb4f2d29379..ec486b6366c 100644
--- a/integration-tests/src/tests/client/process_blocks.rs
+++ b/integration-tests/src/tests/client/process_blocks.rs
@@ -1464,11 +1464,21 @@ fn test_archival_gc_common(
         let header = block.header();
         let epoch_id = header.epoch_id();
         let shard_layout = env.clients[0].epoch_manager.get_shard_layout(epoch_id).unwrap();
+        let is_last_block_in_epoch =
+            env.clients[0].epoch_manager.is_next_block_epoch_start(header.hash()).unwrap();
         blocks.push(block);
 
         if i <= max_cold_head_height {
-            update_cold_db(storage.cold_db().unwrap(), hot_store, &shard_layout, &i, 1).unwrap();
+            update_cold_db(
+                storage.cold_db().unwrap(),
+                hot_store,
+                &shard_layout,
+                &i,
+                is_last_block_in_epoch,
+                1,
+            )
+            .unwrap();
             update_cold_head(storage.cold_db().unwrap(), &hot_store, &i).unwrap();
         }
     }
diff --git a/nearcore/src/cold_storage.rs b/nearcore/src/cold_storage.rs
index b465bb60864..a868ce8184b 100644
--- a/nearcore/src/cold_storage.rs
+++ b/nearcore/src/cold_storage.rs
@@ -113,20 +113,8 @@ fn cold_store_copy(
         return Ok(ColdStoreCopyResult::NoBlockCopied);
     }
 
-    // Here it should be sufficient to just read from hot storage.
-    // Because BlockHeight is never garbage collectable and is not even copied to cold.
-    let cold_head_hash =
-        hot_store.get_ser::<CryptoHash>(DBCol::BlockHeight, &cold_head_height.to_le_bytes())?;
-    let cold_head_hash =
-        cold_head_hash.ok_or(ColdStoreError::ColdHeadHashReadError { cold_head_height })?;
-
-    // The previous block is the cold head so we can use it to get epoch id.
-    let epoch_id = epoch_manager.get_epoch_id_from_prev_block(&cold_head_hash)?;
-    let shard_layout = epoch_manager.get_shard_layout(&epoch_id)?;
-
     let mut next_height = cold_head_height + 1;
-    while !update_cold_db(cold_db, hot_store, &shard_layout, &next_height, num_threads)? {
-        next_height += 1;
+    let next_height_block_hash = loop {
         if next_height > hot_final_head_height {
             return Err(ColdStoreError::SkippedBlocksBetweenColdHeadAndNextHeightError {
                 cold_head_height,
@@ -134,8 +122,28 @@ fn cold_store_copy(
                 hot_final_head_height,
             });
         }
-    }
-
+        // Here it should be sufficient to just read from hot storage.
+        // Because BlockHeight is never garbage collectable and is not even copied to cold.
+        let next_height_block_hash =
+            hot_store.get_ser::<CryptoHash>(DBCol::BlockHeight, &next_height.to_le_bytes())?;
+        if let Some(next_height_block_hash) = next_height_block_hash {
+            break next_height_block_hash;
+        }
+        next_height = next_height + 1;
+    };
+    // The next block hash exists in hot store so we can use it to get epoch id.
+    let epoch_id = epoch_manager.get_epoch_id(&next_height_block_hash)?;
+    let shard_layout = epoch_manager.get_shard_layout(&epoch_id)?;
+    let is_last_block_in_epoch =
+        epoch_manager.is_next_block_epoch_start(&next_height_block_hash)?;
+    update_cold_db(
+        cold_db,
+        hot_store,
+        &shard_layout,
+        &next_height,
+        is_last_block_in_epoch,
+        num_threads,
+    )?;
     update_cold_head(cold_db, hot_store, &next_height)?;
 
     let result = if next_height >= hot_final_head_height {
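Since `update_cold_db` no longer reports "height not found" via its return value, `cold_store_copy` now has to find the next height that actually has a block before calling it. The scanning part of the new loop, reduced to a hedged stand-alone sketch (hypothetical helper and error names; the real code also updates the cold head, metrics, and the copy result afterwards):

    #[derive(Debug)]
    enum CopyError {
        SkippedBlocksBetweenColdHeadAndHotFinalHead,
    }

    /// Scan forward from the cold head until a height with a block is found.
    /// `lookup` stands in for the DBCol::BlockHeight read; heights may be skipped on NEAR.
    fn find_next_block_to_copy(
        cold_head_height: u64,
        hot_final_head_height: u64,
        lookup: impl Fn(u64) -> Option<[u8; 32]>,
    ) -> Result<(u64, [u8; 32]), CopyError> {
        let mut next_height = cold_head_height + 1;
        loop {
            if next_height > hot_final_head_height {
                return Err(CopyError::SkippedBlocksBetweenColdHeadAndHotFinalHead);
            }
            if let Some(block_hash) = lookup(next_height) {
                // This block's hash then yields the epoch id, shard layout and epoch-boundary flag.
                return Ok((next_height, block_hash));
            }
            next_height += 1;
        }
    }

The error case corresponds to `ColdStoreError::SkippedBlocksBetweenColdHeadAndNextHeightError` in the real code.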
diff --git a/tools/cold-store/src/cli.rs b/tools/cold-store/src/cli.rs
index 651ab4069ab..7fc2c99ea50 100644
--- a/tools/cold-store/src/cli.rs
+++ b/tools/cold-store/src/cli.rs
@@ -215,24 +215,26 @@ fn copy_next_block(store: &NodeStorage, config: &NearConfig, epoch_manager: &Epo
     // Here it should be sufficient to just read from hot storage.
     // Because BlockHeight is never garbage collectable and is not even copied to cold.
-    let cold_head_hash = get_ser_from_store::<CryptoHash>(
+    let next_height_block_hash = get_ser_from_store::<CryptoHash>(
         &store.get_hot_store(),
         DBCol::BlockHeight,
-        &cold_head_height.to_le_bytes(),
+        &next_height.to_le_bytes(),
     )
-    .unwrap_or_else(|| panic!("No block hash in hot storage for height {}", cold_head_height));
+    .unwrap_or_else(|| panic!("No block hash in hot storage for height {}", next_height));
 
     // For copying block we need to have shard_layout.
     // For that we need epoch_id.
-    // For that we might use prev_block_hash, and because next_hight = cold_head_height + 1,
-    // we use cold_head_hash.
+    // For that we might use the hash of the block.
+    let epoch_id = &epoch_manager.get_epoch_id(&next_height_block_hash).unwrap();
+    let shard_layout = &epoch_manager.get_shard_layout(epoch_id).unwrap();
+    let is_last_block_in_epoch =
+        epoch_manager.is_next_block_epoch_start(&next_height_block_hash).unwrap();
     update_cold_db(
         &*store.cold_db().unwrap(),
         &store.get_hot_store(),
-        &epoch_manager
-            .get_shard_layout(&epoch_manager.get_epoch_id_from_prev_block(&cold_head_hash).unwrap())
-            .unwrap(),
+        shard_layout,
         &next_height,
+        is_last_block_in_epoch,
         1,
     )
     .unwrap_or_else(|_| panic!("Failed to copy block at height {} to cold db", next_height));