From c58d08045582610c039de290e9870ed0b9d181c2 Mon Sep 17 00:00:00 2001
From: wacban
Date: Fri, 10 Nov 2023 16:43:25 +0000
Subject: [PATCH] feat(resharding): improved error handling (#10152)

- get rid of some unwraps in the resharding implementation
- implemented an integration test for the error handling

I have yet to see what actually happens in a real node when such an error
occurs. This PR is just a good first step, making sure that the error is
correctly propagated to catchup. Ideally I'd like to report any errors in
the logs and in the metrics so that the node owner can try to recover.
---
 chain/chain/src/resharding.rs                  |  83 +++++++-----
 .../src/tests/client/resharding.rs             | 127 ++++++++++++++++--
 2 files changed, 167 insertions(+), 43 deletions(-)

diff --git a/chain/chain/src/resharding.rs b/chain/chain/src/resharding.rs
index 520ed70c8ee..820f68822b5 100644
--- a/chain/chain/src/resharding.rs
+++ b/chain/chain/src/resharding.rs
@@ -19,11 +19,12 @@ use near_primitives::state::FlatStateValue;
 use near_primitives::types::chunk_extra::ChunkExtra;
 use near_primitives::types::{AccountId, ShardId, StateRoot};
 use near_store::flat::{
-    store_helper, BlockInfo, FlatStorageManager, FlatStorageReadyStatus, FlatStorageStatus,
+    store_helper, BlockInfo, FlatStorageError, FlatStorageManager, FlatStorageReadyStatus,
+    FlatStorageStatus,
 };
 use near_store::split_state::get_delayed_receipts;
 use near_store::trie::SnapshotError;
-use near_store::{ShardTries, ShardUId, Store, Trie, TrieDBStorage, TrieStorage};
+use near_store::{ShardTries, ShardUId, StorageError, Store, Trie, TrieDBStorage, TrieStorage};
 use std::collections::{HashMap, HashSet};
 use std::fmt::{Debug, Formatter};
 use std::sync::Arc;
@@ -114,11 +115,12 @@ struct TrieUpdateBatch {
 // The batch size is roughly batch_memory_limit.
 fn get_trie_update_batch(
     config: &StateSplitConfig,
-    iter: &mut impl Iterator<Item = (Vec<u8>, Option<Vec<u8>>)>,
-) -> Option<TrieUpdateBatch> {
+    iter: &mut impl Iterator<Item = Result<(Vec<u8>, Option<Vec<u8>>), FlatStorageError>>,
+) -> Result<Option<TrieUpdateBatch>, FlatStorageError> {
     let mut size: u64 = 0;
     let mut entries = Vec::new();
-    while let Some((key, value)) = iter.next() {
+    while let Some(item) = iter.next() {
+        let (key, value) = item?;
         size += key.len() as u64 + value.as_ref().map_or(0, |v| v.len() as u64);
         entries.push((key, value));
         if size > config.batch_size.as_u64() {
@@ -126,9 +128,9 @@ fn get_trie_update_batch(
         }
     }
     if entries.is_empty() {
-        None
+        Ok(None)
     } else {
-        Some(TrieUpdateBatch { entries, size })
+        Ok(Some(TrieUpdateBatch { entries, size }))
     }
 }
@@ -179,6 +181,18 @@ fn set_flat_storage_state(
     Ok(())
 }
 
+/// Helper function to read the value from flat storage.
+/// It either returns the inlined value or reads the referenced value from storage.
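+/// Note that reading the referenced value can still fail: this helper unwraps
+/// the storage result and will panic if the value is missing from the store.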
+fn read_flat_state_value(
+    trie_storage: &TrieDBStorage,
+    flat_state_value: FlatStateValue,
+) -> Vec<u8> {
+    match flat_state_value {
+        FlatStateValue::Ref(val) => trie_storage.retrieve_raw_bytes(&val.hash).unwrap().to_vec(),
+        FlatStateValue::Inlined(val) => val,
+    }
+}
+
 impl Chain {
     pub fn build_state_for_split_shards_preprocessing(
         &self,
@@ -282,37 +296,33 @@ impl Chain {
         let (snapshot_store, flat_storage_manager) = tries
             .get_state_snapshot(&prev_prev_hash)
             .map_err(|err| StorageInconsistentState(err.to_string()))?;
-        let flat_storage_chunk_view =
-            flat_storage_manager.chunk_view(shard_uid, prev_prev_hash).ok_or_else(|| {
-                StorageInconsistentState("Chunk view missing for snapshot flat storage".to_string())
-            })?;
-        let flat_storage_iter =
-            flat_storage_chunk_view.iter_flat_state_entries(None, None).map(|entry| {
-                let (key, value) = entry.unwrap();
-                (key, Some(value))
-            });
-
+        let flat_storage_chunk_view = flat_storage_manager.chunk_view(shard_uid, prev_prev_hash);
+        let flat_storage_chunk_view = flat_storage_chunk_view.ok_or_else(|| {
+            StorageInconsistentState("Chunk view missing for snapshot flat storage".to_string())
+        })?;
+        // Get the flat storage iter and wrap the value in Option::Some to
+        // match the delta iterator so that they can be chained.
+        let flat_storage_iter = flat_storage_chunk_view.iter_flat_state_entries(None, None);
+        let flat_storage_iter = flat_storage_iter.map_ok(|(key, value)| (key, Some(value)));
+
+        // Get the delta iter and wrap the items in Result to match the flat
+        // storage iter so that they can be chained.
         let delta = store_helper::get_delta_changes(&snapshot_store, shard_uid, prev_hash)
-            .map_err(|err| StorageInconsistentState(err.to_string()))?
-            .ok_or_else(|| {
-                StorageInconsistentState("Delta missing for snapshot flat storage".to_string())
-            })?;
+            .map_err(|err| StorageInconsistentState(err.to_string()))?;
+        let delta = delta.ok_or_else(|| {
+            StorageInconsistentState("Delta missing for snapshot flat storage".to_string())
+        })?;
         let delta_iter = delta.0.into_iter();
+        let delta_iter = delta_iter.map(|item| Ok(item));
+
+        // Chain the flat storage and flat storage delta iterators.
+        let iter = flat_storage_iter.chain(delta_iter);
+
+        // Map the iterator to read the values.
         let trie_storage = TrieDBStorage::new(tries.get_store(), shard_uid);
-        let flat_state_value_to_trie_value_fn = |value: FlatStateValue| -> Vec<u8> {
-            match value {
-                FlatStateValue::Ref(ref_value) => {
-                    trie_storage.retrieve_raw_bytes(&ref_value.hash).unwrap().to_vec()
-                }
-                FlatStateValue::Inlined(inline_value) => inline_value,
-            }
-        };
-        let mut iter = flat_storage_iter.chain(delta_iter).map(
-            move |(key, value)| -> (Vec<u8>, Option<Vec<u8>>) {
-                (key, value.map(flat_state_value_to_trie_value_fn))
-            },
-        );
+        let iter = iter.map_ok(move |(key, value)| {
+            (key, value.map(|value| read_flat_state_value(&trie_storage, value)))
+        });
 
         // function to map account id to shard uid in range of child shards
         let checked_account_id_to_shard_uid =
@@ -322,12 +332,15 @@ impl Chain {
         let metrics_labels = [shard_uid_string.as_str()];
 
         // Once we build the iterator, we break it into batches using the get_trie_update_batch function.
+        let mut iter = iter;
         loop {
             // Prepare the batch.
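+            // The batch is a Result<Option<TrieUpdateBatch>>: an Err is
+            // converted into a StorageError and propagated to the caller,
+            // while Ok(None) means the iterator is exhausted and ends the loop.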
             let batch = {
                 let histogram = RESHARDING_BATCH_PREPARE_TIME.with_label_values(&metrics_labels);
                 let _timer = histogram.start_timer();
-                let Some(batch) = get_trie_update_batch(&config, &mut iter) else { break };
+                let batch = get_trie_update_batch(&config, &mut iter);
+                let batch = batch.map_err(Into::<StorageError>::into)?;
+                let Some(batch) = batch else { break };
                 batch
             };
 
diff --git a/integration-tests/src/tests/client/resharding.rs b/integration-tests/src/tests/client/resharding.rs
index c4eab48a117..9ff6e33b907 100644
--- a/integration-tests/src/tests/client/resharding.rs
+++ b/integration-tests/src/tests/client/resharding.rs
@@ -252,17 +252,25 @@ impl TestReshardingEnv {
         self.txs_by_height.insert(height, txs);
     }
 
+    fn step(
+        &mut self,
+        drop_chunk_condition: &DropChunkCondition,
+        protocol_version: ProtocolVersion,
+    ) {
+        self.step_err(drop_chunk_condition, protocol_version).unwrap();
+    }
+
     /// produces and processes the next block also checks that all accounts in
     /// initial_accounts are intact
     ///
     /// allows for changing the protocol version in the middle of the test the
     /// testing_v2 argument means whether the test should expect the sharding
     /// layout V2 to be used once the appropriate protocol version is reached
-    fn step(
+    fn step_err(
         &mut self,
         drop_chunk_condition: &DropChunkCondition,
         protocol_version: ProtocolVersion,
-    ) {
+    ) -> Result<(), near_client::Error> {
         let env = &mut self.env;
         let head = env.clients[0].chain.head().unwrap();
         let next_height = head.height + 1;
@@ -310,10 +318,8 @@ impl TestReshardingEnv {
             chunk_producer_to_shard_id.entry(validator_id).or_default().push(shard_id);
         }
 
-        // Make sure that catchup is done before the end of each epoch, but when it is done is
-        // by chance. This simulates when catchup takes a long time to be done
-        let should_catchup =
-            self.rng.gen_bool(P_CATCHUP) || (next_height + 1) % self.epoch_length == 0;
+        let should_catchup = Self::should_catchup(&mut self.rng, self.epoch_length, next_height);
+
         // process block, this also triggers chunk producers for the next block to produce chunks
         for j in 0..self.num_clients {
             let client = &mut env.clients[j];
@@ -333,7 +339,7 @@ impl TestReshardingEnv {
             let block = MaybeValidated::from(block.clone());
             client.start_process_block(block, Provenance::NONE, Arc::new(|_| {})).unwrap();
             if should_catchup {
-                run_catchup(client, &[]).unwrap();
+                run_catchup(client, &[])?;
             }
             while wait_for_all_blocks_in_processing(&mut client.chain) {
                 let (_, errors) =
                 assert!(errors.is_empty(), "unexpected errors: {:?}", errors);
             }
             if should_catchup {
-                run_catchup(&mut env.clients[j], &[]).unwrap();
+                run_catchup(&mut env.clients[j], &[])?;
             }
         }
 
@@ -365,6 +371,25 @@ impl TestReshardingEnv {
         for account_id in self.initial_accounts.iter() {
             check_account(env, account_id, &block);
         }
+
+        Ok(())
+    }
+
+    fn should_catchup(rng: &mut StdRng, epoch_length: u64, next_height: u64) -> bool {
+        // Always do catchup before the end of the epoch to ensure it happens at
+        // least once. Doing it in the very last block triggers some error conditions.
+        if (next_height + 1) % epoch_length == 0 {
+            return true;
+        }
+
+        // Don't do catchup in the very first block of the epoch. This is
+        // primarily for the error handling test where we want to corrupt the
+        // database before catchup happens.
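+        // In all other blocks, catchup happens with probability P_CATCHUP,
+        // which simulates catchup taking a long time to complete.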
+        if next_height % epoch_length == 1 {
+            return false;
+        }
+
+        rng.gen_bool(P_CATCHUP)
+    }
 
     // Submit the tx to all clients for processing and checks:
@@ -1583,5 +1608,91 @@ fn test_latest_protocol_missing_chunks_high_missing_prob() {
     test_latest_protocol_missing_chunks(0.9, 27);
 }
 
+fn test_shard_layout_upgrade_error_handling_impl(
+    resharding_type: ReshardingType,
+    rng_seed: u64,
+    state_snapshot_enabled: bool,
+) {
+    init_test_logger();
+    tracing::info!(target: "test", "test_shard_layout_upgrade_error_handling_impl starting");
+
+    let genesis_protocol_version = get_genesis_protocol_version(&resharding_type);
+    let target_protocol_version = get_target_protocol_version(&resharding_type);
+
+    // setup
+    let epoch_length = 5;
+    let mut test_env = TestReshardingEnv::new(
+        epoch_length,
+        2,
+        2,
+        100,
+        None,
+        genesis_protocol_version,
+        rng_seed,
+        state_snapshot_enabled,
+        Some(resharding_type),
+    );
+    test_env.set_init_tx(vec![]);
+
+    let mut nonce = 100;
+    let genesis_hash = *test_env.env.clients[0].chain.genesis_block().hash();
+    let mut all_accounts: HashSet<_> = test_env.initial_accounts.clone().into_iter().collect();
+    let mut accounts_to_check: Vec<_> = vec![];
+    let initial_accounts = test_env.initial_accounts.clone();
+
+    // add transactions until after sharding upgrade finishes
+    for height in 2..3 * epoch_length {
+        let txs = generate_create_accounts_txs(
+            &mut test_env.rng,
+            genesis_hash,
+            &initial_accounts,
+            &mut accounts_to_check,
+            &mut all_accounts,
+            &mut nonce,
+            10,
+            true,
+        );
+
+        test_env.set_tx_at_height(height, txs);
+    }
+
+    let drop_chunk_condition = DropChunkCondition::new();
+    for _ in 1..4 * epoch_length {
+        let result = test_env.step_err(&drop_chunk_condition, target_protocol_version);
+        if let Err(err) = result {
+            tracing::info!(target: "test", ?err, "step failed, as expected");
+            return;
+        }
+
+        // corrupt the state snapshot if available to make resharding fail
+        corrupt_state_snapshot(&test_env);
+    }
+
+    assert!(false, "no error was recorded, something is wrong in error handling");
+}
+
+fn corrupt_state_snapshot(test_env: &TestReshardingEnv) {
+    let tries = test_env.env.clients[0].runtime_adapter.get_tries();
+    let Ok(snapshot_hash) = tries.get_state_snapshot_hash() else { return };
+    let (store, flat_storage_manager) = tries.get_state_snapshot(&snapshot_hash).unwrap();
+    let shard_uids = flat_storage_manager.get_shard_uids();
+    let mut store_update = store.store_update();
+    for shard_uid in shard_uids {
+        flat_storage_manager.remove_flat_storage_for_shard(shard_uid, &mut store_update).unwrap();
+    }
+    store_update.commit().unwrap();
+}
+
+#[test]
+fn test_shard_layout_upgrade_error_handling_v1() {
+    test_shard_layout_upgrade_error_handling_impl(ReshardingType::V1, 42, false);
+}
+
+#[cfg(feature = "protocol_feature_simple_nightshade_v2")]
+#[test]
+fn test_shard_layout_upgrade_error_handling_v2() {
+    test_shard_layout_upgrade_error_handling_impl(ReshardingType::V2, 42, false);
+}
+
 // TODO(resharding) add a test with missing blocks
 // TODO(resharding) add a test with deleting accounts and delayed receipts check
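
For reviewers unfamiliar with the pattern, the sketch below distills the error-propagation approach used in `get_trie_update_batch` above into a minimal, self-contained program. It is an illustration only, not code from this patch: `Batch` and `next_batch` are hypothetical stand-ins for `TrieUpdateBatch` and `get_trie_update_batch`, and a plain `String` stands in for `FlatStorageError`. The key idea is the return type `Result<Option<Batch>, _>`: `Err` surfaces the first iterator failure via `?` instead of unwrapping, `Ok(None)` signals exhaustion, and `Ok(Some(batch))` carries data.

```rust
// A minimal sketch of fallible-iterator batching, assuming illustrative
// names (Batch, next_batch) and a String error in place of FlatStorageError.

#[derive(Debug)]
struct Batch {
    entries: Vec<(Vec<u8>, Option<Vec<u8>>)>,
    size: u64,
}

fn next_batch(
    iter: &mut impl Iterator<Item = Result<(Vec<u8>, Option<Vec<u8>>), String>>,
    batch_size_limit: u64,
) -> Result<Option<Batch>, String> {
    let mut size: u64 = 0;
    let mut entries = Vec::new();
    while let Some(item) = iter.next() {
        // `?` propagates the first iterator error to the caller instead of panicking.
        let (key, value) = item?;
        size += key.len() as u64 + value.as_ref().map_or(0, |v| v.len() as u64);
        entries.push((key, value));
        if size > batch_size_limit {
            break;
        }
    }
    if entries.is_empty() {
        Ok(None)
    } else {
        Ok(Some(Batch { entries, size }))
    }
}

fn main() -> Result<(), String> {
    let items: Vec<Result<(Vec<u8>, Option<Vec<u8>>), String>> = vec![
        Ok((b"key1".to_vec(), Some(b"value1".to_vec()))),
        Ok((b"key2".to_vec(), None)),
    ];
    let mut iter = items.into_iter();
    // Drain the iterator batch by batch; an Err from any item aborts the loop.
    while let Some(batch) = next_batch(&mut iter, 1024)? {
        println!("prepared a batch of {} entries / {} bytes", batch.entries.len(), batch.size);
    }
    Ok(())
}
```

This three-way shape is what lets the resharding loop distinguish "stop batching" from "abort and propagate the error to catchup", which is exactly the distinction the patch introduces.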