feat(resharding): improved error handling (#10152)
- get rid of some unwraps in the resharding implementation
- implemented an integration test for the error handling


I have yet to see what actually happens in a real node when such an error
occurs. This PR is a good first step: it makes sure that the error is
correctly propagated to catchup. Ideally I'd like to report any errors
in the logs and in the metrics so that the node owner can try to
recover.
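The core pattern is mechanical: replace panicking unwrap calls with Result
propagation via the ? operator, so a storage failure reaches catchup instead
of aborting the process. A minimal sketch of the before/after, using
hypothetical names rather than the actual nearcore API:

use std::io::{Error, ErrorKind};

// Hypothetical storage lookup, for illustration only.
fn lookup(_key: &[u8]) -> Result<Vec<u8>, Error> {
    Err(Error::new(ErrorKind::NotFound, "missing key"))
}

// Before: a storage failure panics and can take down the whole node.
fn read_panicking(key: &[u8]) -> Vec<u8> {
    lookup(key).unwrap()
}

// After: the failure is propagated to the caller, which can log it,
// record a metric, and decide how to recover.
fn read_propagating(key: &[u8]) -> Result<Vec<u8>, Error> {
    let value = lookup(key)?;
    Ok(value)
}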
wacban authored Nov 10, 2023
1 parent a32d8a2 commit c58d080
Showing 2 changed files with 167 additions and 43 deletions.
83 changes: 48 additions & 35 deletions chain/chain/src/resharding.rs
@@ -19,11 +19,12 @@ use near_primitives::state::FlatStateValue
use near_primitives::types::chunk_extra::ChunkExtra;
use near_primitives::types::{AccountId, ShardId, StateRoot};
use near_store::flat::{
store_helper, BlockInfo, FlatStorageManager, FlatStorageReadyStatus, FlatStorageStatus,
store_helper, BlockInfo, FlatStorageError, FlatStorageManager, FlatStorageReadyStatus,
FlatStorageStatus,
};
use near_store::split_state::get_delayed_receipts;
use near_store::trie::SnapshotError;
use near_store::{ShardTries, ShardUId, Store, Trie, TrieDBStorage, TrieStorage};
use near_store::{ShardTries, ShardUId, StorageError, Store, Trie, TrieDBStorage, TrieStorage};
use std::collections::{HashMap, HashSet};
use std::fmt::{Debug, Formatter};
use std::sync::Arc;
@@ -114,21 +115,22 @@ struct TrieUpdateBatch {
// The batch size is roughly batch_memory_limit.
fn get_trie_update_batch(
config: &StateSplitConfig,
iter: &mut impl Iterator<Item = (Vec<u8>, Option<Vec<u8>>)>,
) -> Option<TrieUpdateBatch> {
iter: &mut impl Iterator<Item = Result<(Vec<u8>, Option<Vec<u8>>), FlatStorageError>>,
) -> Result<Option<TrieUpdateBatch>, FlatStorageError> {
let mut size: u64 = 0;
let mut entries = Vec::new();
while let Some((key, value)) = iter.next() {
while let Some(item) = iter.next() {
let (key, value) = item?;
size += key.len() as u64 + value.as_ref().map_or(0, |v| v.len() as u64);
entries.push((key, value));
if size > config.batch_size.as_u64() {
break;
}
}
if entries.is_empty() {
None
Ok(None)
} else {
Some(TrieUpdateBatch { entries, size })
Ok(Some(TrieUpdateBatch { entries, size }))
}
}
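For context, a caller of the new fallible signature could look like the sketch
below; process_all_batches is a hypothetical illustration living alongside
get_trie_update_batch in resharding.rs, not code from this PR (the real call
site is the batching loop later in this file):

// Sketch: drain the iterator batch by batch, surfacing the first
// FlatStorageError instead of panicking mid-resharding.
fn process_all_batches(
    config: &StateSplitConfig,
    iter: &mut impl Iterator<Item = Result<(Vec<u8>, Option<Vec<u8>>), FlatStorageError>>,
) -> Result<(), FlatStorageError> {
    // `?` stops at the first storage error; `None` means the iterator is drained.
    while let Some(batch) = get_trie_update_batch(config, iter)? {
        // ... apply `batch` to the child shard tries here ...
        let _ = batch;
    }
    Ok(())
}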

@@ -179,6 +181,18 @@ fn set_flat_storage_state(
Ok(())
}

/// Helper function to read a value from flat storage.
/// It either returns the inlined value or reads the referenced value from storage.
fn read_flat_state_value(
trie_storage: &TrieDBStorage,
flat_state_value: FlatStateValue,
) -> Vec<u8> {
match flat_state_value {
FlatStateValue::Ref(val) => trie_storage.retrieve_raw_bytes(&val.hash).unwrap().to_vec(),
FlatStateValue::Inlined(val) => val,
}
}

impl Chain {
pub fn build_state_for_split_shards_preprocessing(
&self,
@@ -282,37 +296,33 @@ impl Chain {
let (snapshot_store, flat_storage_manager) = tries
.get_state_snapshot(&prev_prev_hash)
.map_err(|err| StorageInconsistentState(err.to_string()))?;
let flat_storage_chunk_view =
flat_storage_manager.chunk_view(shard_uid, prev_prev_hash).ok_or_else(|| {
StorageInconsistentState("Chunk view missing for snapshot flat storage".to_string())
})?;
let flat_storage_iter =
flat_storage_chunk_view.iter_flat_state_entries(None, None).map(|entry| {
let (key, value) = entry.unwrap();
(key, Some(value))
});

let flat_storage_chunk_view = flat_storage_manager.chunk_view(shard_uid, prev_prev_hash);
let flat_storage_chunk_view = flat_storage_chunk_view.ok_or_else(|| {
StorageInconsistentState("Chunk view missing for snapshot flat storage".to_string())
})?;
// Get the flat storage iter and wrap the value in Option::Some to
// match the delta iterator so that they can be chained.
let flat_storage_iter = flat_storage_chunk_view.iter_flat_state_entries(None, None);
let flat_storage_iter = flat_storage_iter.map_ok(|(key, value)| (key, Some(value)));

// Get the delta iter and wrap the items in Result to match the flat
// storage iter so that they can be chained.
let delta = store_helper::get_delta_changes(&snapshot_store, shard_uid, prev_hash)
.map_err(|err| StorageInconsistentState(err.to_string()))?
.ok_or_else(|| {
StorageInconsistentState("Delta missing for snapshot flat storage".to_string())
})?;
.map_err(|err| StorageInconsistentState(err.to_string()))?;
let delta = delta.ok_or_else(|| {
StorageInconsistentState("Delta missing for snapshot flat storage".to_string())
})?;
let delta_iter = delta.0.into_iter();
let delta_iter = delta_iter.map(|item| Ok(item));

// chain the flat storage and flat storage delta iterators
let iter = flat_storage_iter.chain(delta_iter);

// map the iterator to read the values
let trie_storage = TrieDBStorage::new(tries.get_store(), shard_uid);
let flat_state_value_to_trie_value_fn = |value: FlatStateValue| -> Vec<u8> {
match value {
FlatStateValue::Ref(ref_value) => {
trie_storage.retrieve_raw_bytes(&ref_value.hash).unwrap().to_vec()
}
FlatStateValue::Inlined(inline_value) => inline_value,
}
};
let mut iter = flat_storage_iter.chain(delta_iter).map(
move |(key, value)| -> (Vec<u8>, Option<Vec<u8>>) {
(key, value.map(flat_state_value_to_trie_value_fn))
},
);
let iter = iter.map_ok(move |(key, value)| {
(key, value.map(|value| read_flat_state_value(&trie_storage, value)))
});

// function to map account id to shard uid in range of child shards
let checked_account_id_to_shard_uid =
@@ -322,12 +332,15 @@ impl Chain {
let metrics_labels = [shard_uid_string.as_str()];

// Once we build the iterator, we break it into batches using the get_trie_update_batch function.
let mut iter = iter;
loop {
// Prepare the batch.
let batch = {
let histogram = RESHARDING_BATCH_PREPARE_TIME.with_label_values(&metrics_labels);
let _timer = histogram.start_timer();
let Some(batch) = get_trie_update_batch(&config, &mut iter) else { break };
let batch = get_trie_update_batch(&config, &mut iter);
let batch = batch.map_err(Into::<StorageError>::into)?;
let Some(batch) = batch else { break };
batch
};

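The iterator plumbing in the hunk above leans on map_ok from the itertools
crate, which transforms only the Ok items of an iterator of Results and lets
any Err pass through unchanged. A self-contained toy version of the
chain-then-map_ok pattern, with made-up data rather than the nearcore types:

use itertools::Itertools;

fn main() {
    // A fallible source, like the flat storage iterator...
    let fallible: Vec<Result<(&str, u64), String>> =
        vec![Ok(("alice", 1)), Err("read failed".to_string())];
    // ...and an infallible one, like the delta iterator, wrapped in Ok so
    // both streams share the same Item type and can be chained.
    let infallible = vec![("bob", 2)].into_iter().map(Ok);

    let chained = fallible
        .into_iter()
        .chain(infallible)
        // map_ok only touches Ok items; the Err passes through untouched.
        .map_ok(|(account, balance)| (account, balance * 10));

    for item in chained {
        println!("{item:?}");
    }
}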
127 changes: 119 additions & 8 deletions integration-tests/src/tests/client/resharding.rs
@@ -252,17 +252,25 @@ impl TestReshardingEnv {
self.txs_by_height.insert(height, txs);
}

fn step(
&mut self,
drop_chunk_condition: &DropChunkCondition,
protocol_version: ProtocolVersion,
) {
self.step_err(drop_chunk_condition, protocol_version).unwrap();
}

/// Produces and processes the next block and checks that all accounts in
/// initial_accounts are intact.
///
/// Allows for changing the protocol version in the middle of the test; the
/// testing_v2 argument indicates whether the test should expect the sharding
/// layout V2 to be used once the appropriate protocol version is reached.
fn step(
fn step_err(
&mut self,
drop_chunk_condition: &DropChunkCondition,
protocol_version: ProtocolVersion,
) {
) -> Result<(), near_client::Error> {
let env = &mut self.env;
let head = env.clients[0].chain.head().unwrap();
let next_height = head.height + 1;
@@ -310,10 +318,8 @@ impl TestReshardingEnv {
chunk_producer_to_shard_id.entry(validator_id).or_default().push(shard_id);
}

// Make sure that catchup is done before the end of each epoch, but when it is done is
// by chance. This simulates when catchup takes a long time to be done
let should_catchup =
self.rng.gen_bool(P_CATCHUP) || (next_height + 1) % self.epoch_length == 0;
let should_catchup = Self::should_catchup(&mut self.rng, self.epoch_length, next_height);

// process block, this also triggers chunk producers for the next block to produce chunks
for j in 0..self.num_clients {
let client = &mut env.clients[j];
@@ -333,15 +339,15 @@ impl TestReshardingEnv {
let block = MaybeValidated::from(block.clone());
client.start_process_block(block, Provenance::NONE, Arc::new(|_| {})).unwrap();
if should_catchup {
run_catchup(client, &[]).unwrap();
run_catchup(client, &[])?;
}
while wait_for_all_blocks_in_processing(&mut client.chain) {
let (_, errors) =
client.postprocess_ready_blocks(Arc::new(|_| {}), should_produce_chunk);
assert!(errors.is_empty(), "unexpected errors: {:?}", errors);
}
if should_catchup {
run_catchup(&mut env.clients[j], &[]).unwrap();
run_catchup(&mut env.clients[j], &[])?;
}
}

@@ -365,6 +371,25 @@ impl TestReshardingEnv {
for account_id in self.initial_accounts.iter() {
check_account(env, account_id, &block);
}

Ok(())
}

fn should_catchup(rng: &mut StdRng, epoch_length: u64, next_height: u64) -> bool {
// Always do catchup before the end of the epoch to ensure it happens at
// least once. Doing it in the very last block triggers some error conditions.
if (next_height + 1) % epoch_length == 0 {
return true;
}

// Don't do catchup in the very first block of the epoch. This is
// primarily for the error handling test where we want to corrupt the
// database before catchup happens.
if next_height % epoch_length == 1 {
return false;
}

rng.gen_bool(P_CATCHUP)
}

// Submit the tx to all clients for processing and checks:
@@ -1583,5 +1608,91 @@ fn test_latest_protocol_missing_chunks_high_missing_prob() {
test_latest_protocol_missing_chunks(0.9, 27);
}

fn test_shard_layout_upgrade_error_handling_impl(
resharding_type: ReshardingType,
rng_seed: u64,
state_snapshot_enabled: bool,
) {
init_test_logger();
tracing::info!(target: "test", "test_shard_layout_upgrade_error_handling_impl starting");

let genesis_protocol_version = get_genesis_protocol_version(&resharding_type);
let target_protocol_version = get_target_protocol_version(&resharding_type);

// setup
let epoch_length = 5;
let mut test_env = TestReshardingEnv::new(
epoch_length,
2,
2,
100,
None,
genesis_protocol_version,
rng_seed,
state_snapshot_enabled,
Some(resharding_type),
);
test_env.set_init_tx(vec![]);

let mut nonce = 100;
let genesis_hash = *test_env.env.clients[0].chain.genesis_block().hash();
let mut all_accounts: HashSet<_> = test_env.initial_accounts.clone().into_iter().collect();
let mut accounts_to_check: Vec<_> = vec![];
let initial_accounts = test_env.initial_accounts.clone();

// add transactions until after sharding upgrade finishes
for height in 2..3 * epoch_length {
let txs = generate_create_accounts_txs(
&mut test_env.rng,
genesis_hash,
&initial_accounts,
&mut accounts_to_check,
&mut all_accounts,
&mut nonce,
10,
true,
);

test_env.set_tx_at_height(height, txs);
}

let drop_chunk_condition = DropChunkCondition::new();
for _ in 1..4 * epoch_length {
let result = test_env.step_err(&drop_chunk_condition, target_protocol_version);
if let Err(err) = result {
tracing::info!(target: "test", ?err, "step failed, as expected");
return;
}

// corrupt the state snapshot if available to make resharding fail
corrupt_state_snapshot(&test_env);
}

assert!(false, "no error was recorded, something is wrong in error handling");
}

fn corrupt_state_snapshot(test_env: &TestReshardingEnv) {
let tries = test_env.env.clients[0].runtime_adapter.get_tries();
let Ok(snapshot_hash) = tries.get_state_snapshot_hash() else { return };
let (store, flat_storage_manager) = tries.get_state_snapshot(&snapshot_hash).unwrap();
let shard_uids = flat_storage_manager.get_shard_uids();
let mut store_update = store.store_update();
for shard_uid in shard_uids {
flat_storage_manager.remove_flat_storage_for_shard(shard_uid, &mut store_update).unwrap();
}
store_update.commit().unwrap();
}

#[test]
fn test_shard_layout_upgrade_error_handling_v1() {
test_shard_layout_upgrade_error_handling_impl(ReshardingType::V1, 42, false);
}

#[cfg(feature = "protocol_feature_simple_nightshade_v2")]
#[test]
fn test_shard_layout_upgrade_error_handling_v2() {
test_shard_layout_upgrade_error_handling_impl(ReshardingType::V2, 42, false);
}

// TODO(resharding) add a test with missing blocks
// TODO(resharding) add a test with deleting accounts and delayed receipts check
