Resharding archival node
staffik committed Dec 9, 2024
1 parent 3270551 commit 8a9f670
Showing 5 changed files with 118 additions and 55 deletions.
14 changes: 10 additions & 4 deletions core/store/src/adapter/trie_store.rs
@@ -173,6 +173,15 @@ impl<'a> TrieStoreUpdateAdapter<'a> {
}
}

/// Get the `ShardUId` mapping for child_shard_uid.
/// Used by Resharding V3 for State mapping.
pub fn get_shard_uid_mapping(store: &Store, child_shard_uid: ShardUId) -> ShardUId {
store
.get_ser::<ShardUId>(DBCol::StateShardUIdMapping, &child_shard_uid.to_bytes())
.expect("get_shard_uid_mapping() failed")
.unwrap_or(child_shard_uid)
}

/// Constructs db key to be used to access the State column.
/// First, it consults the `StateShardUIdMapping` column to map the `shard_uid` prefix
/// to its ancestor in the resharding tree (according to Resharding V3)
@@ -186,10 +195,7 @@ fn get_key_from_shard_uid_and_hash(
shard_uid: ShardUId,
hash: &CryptoHash,
) -> [u8; 40] {
let mapped_shard_uid = store
.get_ser::<ShardUId>(DBCol::StateShardUIdMapping, &shard_uid.to_bytes())
.expect("get_key_from_shard_uid_and_hash() failed")
.unwrap_or(shard_uid);
let mapped_shard_uid = get_shard_uid_mapping(store, shard_uid);
let mut key = [0; 40];
key[0..8].copy_from_slice(&mapped_shard_uid.to_bytes());
key[8..].copy_from_slice(hash.as_ref());
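As context for the trie_store.rs hunks above: the State column key is the 8-byte (possibly remapped) `ShardUId` prefix followed by the 32-byte node hash, and `get_shard_uid_mapping` falls back to the child `ShardUId` itself when no mapping row exists. A minimal standalone sketch of that layout, not part of the commit, with the nearcore types replaced by plain byte arrays:

// Sketch only (not part of the diff): assembles a 40-byte State column key from an
// already-mapped 8-byte ShardUId prefix and a 32-byte node hash, mirroring
// `get_key_from_shard_uid_and_hash` above.
fn sketch_state_key(mapped_shard_uid: [u8; 8], hash: [u8; 32]) -> [u8; 40] {
    let mut key = [0u8; 40];
    key[0..8].copy_from_slice(&mapped_shard_uid);
    key[8..].copy_from_slice(&hash);
    key
}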
34 changes: 20 additions & 14 deletions core/store/src/archive/cold_storage.rs
@@ -1,3 +1,4 @@
use crate::adapter::trie_store::get_shard_uid_mapping;
use crate::columns::DBKeyType;
use crate::db::{ColdDB, COLD_HEAD_KEY, HEAD_KEY};
use crate::{metrics, DBCol, DBTransaction, Database, Store, TrieChanges};
@@ -57,8 +58,7 @@ struct BatchTransaction {
}

/// Updates provided cold database from provided hot store with information about block at `height`.
/// Returns if the block was copied (false only if height is not present in `hot_store`).
/// Block as `height` has to be final.
/// Block at `height` has to be final and present in `hot_store`.
///
/// First, we read from hot store information necessary
/// to determine all the keys that need to be updated in cold db.
@@ -80,31 +80,38 @@ pub fn update_cold_db(
hot_store: &Store,
shard_layout: &ShardLayout,
height: &BlockHeight,
is_last_block_in_epoch: bool,
num_threads: usize,
) -> io::Result<bool> {
) -> io::Result<()> {
let _span = tracing::debug_span!(target: "cold_store", "update cold db", height = height);
let _timer = metrics::COLD_COPY_DURATION.start_timer();

if hot_store.get_for_cold(DBCol::BlockHeight, &height.to_le_bytes())?.is_none() {
return Ok(false);
}

let height_key = height.to_le_bytes();
let block_hash_vec = hot_store.get_or_err_for_cold(DBCol::BlockHeight, &height_key)?;
let block_hash_key = block_hash_vec.as_slice();

let key_type_to_keys =
get_keys_from_store(&hot_store, shard_layout, &height_key, block_hash_key)?;
let cold_columns = DBCol::iter().filter(|col| col.is_cold()).collect::<Vec<DBCol>>();
let columns_to_update = DBCol::iter()
.filter(|col| {
if !col.is_cold() {
return false;
}
if col == &DBCol::StateShardUIdMapping && !is_last_block_in_epoch {
return false;
}
true
})
.collect::<Vec<DBCol>>();

// Create new thread pool with `num_threads`.
rayon::ThreadPoolBuilder::new()
.num_threads(num_threads)
.build()
.map_err(|_| io::Error::new(io::ErrorKind::Other, "Failed to create rayon pool"))?
.install(|| {
cold_columns
.into_par_iter() // Process every cold column as a separate task in thread pool in parallel.
columns_to_update
.into_par_iter() // Process cold columns to update as a separate task in thread pool in parallel.
// Copy column to cold db.
.map(|col: DBCol| -> io::Result<()> {
if col == DBCol::State {
@@ -127,8 +134,7 @@
},
)
})?;

Ok(true)
Ok(())
}
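A minimal sketch of the column-selection rule introduced in `update_cold_db`, not part of the commit, with `DBCol` reduced to plain booleans: every cold column is copied at every height, except `StateShardUIdMapping`, which is copied only for the last block of an epoch (presumably because the mapping only changes at resharding boundaries):

// Sketch only (not part of the diff): mirrors the `columns_to_update` filter above.
fn sketch_should_copy(
    is_cold: bool,
    is_state_shard_uid_mapping: bool,
    is_last_block_in_epoch: bool,
) -> bool {
    if !is_cold {
        return false;
    }
    if is_state_shard_uid_mapping && !is_last_block_in_epoch {
        return false;
    }
    true
}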

// Correctly set the key and value on DBTransaction, taking reference counting
@@ -189,9 +195,9 @@ fn copy_state_from_store(

let Some(trie_changes) = trie_changes else { continue };
total_keys += trie_changes.insertions().len();
let mapped_shard_uid_key = get_shard_uid_mapping(hot_store, shard_uid).to_bytes();
for op in trie_changes.insertions() {
// TODO(reshardingV3) Handle shard_uid not mapped there
let key = join_two_keys(&shard_uid_key, op.hash().as_bytes());
let key = join_two_keys(&mapped_shard_uid_key, op.hash().as_bytes());
let value = op.payload().to_vec();

total_size += value.len();
1 change: 0 additions & 1 deletion core/store/src/columns.rs
@@ -462,7 +462,6 @@ impl DBCol {
| DBCol::StateHeaders
| DBCol::TransactionResultForBlock
| DBCol::Transactions
// TODO(reshardingV3) How the mapping will work with split storage?
| DBCol::StateShardUIdMapping => true,

// TODO
88 changes: 67 additions & 21 deletions integration-tests/src/test_loop/tests/resharding_v3.rs
@@ -5,14 +5,16 @@ use near_async::time::Duration;
use near_chain::ChainStoreAccess;
use near_chain_configs::test_genesis::TestGenesisBuilder;
use near_chain_configs::DEFAULT_GC_NUM_EPOCHS_TO_KEEP;
use near_client::Client;
use near_client::{Client, Query, ViewClientActorInner};
use near_o11y::testonly::init_test_logger;
use near_primitives::block::Tip;
use near_primitives::epoch_manager::EpochConfigStore;
use near_primitives::hash::CryptoHash;
use near_primitives::shard_layout::{account_id_to_shard_uid, ShardLayout};
use near_primitives::state_record::StateRecord;
use near_primitives::types::{AccountId, BlockHeightDelta, EpochId, Gas, NumShards, ShardId};
use near_primitives::types::{
AccountId, BlockHeightDelta, BlockId, BlockReference, EpochId, Gas, NumShards, ShardId,
};
use near_primitives::version::{ProtocolFeature, PROTOCOL_VERSION};
use near_store::adapter::StoreAdapter;
use near_store::db::refcount::decode_value_with_rc;
@@ -21,9 +23,10 @@ use std::collections::{BTreeMap, HashMap, HashSet};
use std::sync::Arc;

use crate::test_loop::builder::TestLoopBuilder;
use crate::test_loop::env::{TestData, TestLoopEnv};
use crate::test_loop::env::TestData;
use crate::test_loop::utils::transactions::{
get_shared_block_hash, get_smallest_height_head, run_tx, submit_tx,
do_create_account, do_delete_account, get_shared_block_hash, get_smallest_height_head, run_tx,
submit_tx,
};
use crate::test_loop::utils::{ONE_NEAR, TGAS};
use assert_matches::assert_matches;
@@ -36,7 +39,7 @@ use near_primitives::state::FlatStateValue;
use near_primitives::test_utils::create_user_test_signer;
use near_primitives::transaction::SignedTransaction;
use near_primitives::trie_key::TrieKey;
use near_primitives::views::FinalExecutionStatus;
use near_primitives::views::{FinalExecutionStatus, QueryRequest};
use std::cell::Cell;
use std::u64;

Expand Down Expand Up @@ -130,6 +133,8 @@ struct TestReshardingParameters {
accounts: Vec<AccountId>,
clients: Vec<AccountId>,
block_and_chunk_producers: Vec<AccountId>,
rpc_ids: Vec<AccountId>,
archival_clients: HashSet<AccountId>,
initial_balance: u128,
epoch_length: BlockHeightDelta,
shuffle_shard_assignment_for_chunk_producers: bool,
@@ -153,13 +158,13 @@ struct TestReshardingParameters {

impl TestReshardingParameters {
fn new() -> Self {
Self::with_clients(3)
Self::with_clients(5)
}

fn with_clients(num_clients: u64) -> Self {
let num_accounts = 8;
fn with_clients(num_clients: usize) -> Self {
let num_accounts = 12;
let initial_balance = 1_000_000 * ONE_NEAR;
let epoch_length = 6;
let epoch_length = 8;
let track_all_shards = true;
let all_chunks_expected = true;

@@ -188,15 +193,19 @@ impl TestReshardingParameters {
.cloned()
.collect();

let block_and_chunk_producers = clients.clone();
let block_and_chunk_producers = clients.iter().take(num_clients - 2).cloned().collect_vec();
let rpc_ids = vec![clients[num_clients - 2].clone()];
let archival_clients = HashSet::from([clients[num_clients - 1].clone()]);
let load_mem_tries_for_tracked_shards = true;

Self {
accounts,
clients,
block_and_chunk_producers,
archival_clients,
rpc_ids,
initial_balance,
epoch_length,
epoch_length: epoch_length.try_into().unwrap(),
track_all_shards,
all_chunks_expected,
load_mem_tries_for_tracked_shards,
@@ -882,6 +891,19 @@ impl TrieSanityCheck {
}
}

fn assert_account_exist_in_view_client(
view_client: &mut ViewClientActorInner,
account_id: AccountId,
height: u64,
) {
let msg = Query::new(
BlockReference::BlockId(BlockId::Height(height)),
QueryRequest::ViewAccount { account_id },
);
let result = near_async::messaging::Handler::handle(view_client, msg);
assert!(result.is_ok());
}

/// Base setup to check sanity of Resharding V3.
/// TODO(#11881): add the following scenarios:
/// - Nodes must not track all shards. State sync must succeed.
Expand Down Expand Up @@ -933,7 +955,7 @@ fn test_resharding_v3_base(params: TestReshardingParameters) {
let parent_shard_uid = account_id_to_shard_uid(&new_boundary_account, &base_shard_layout);

epoch_config.shard_layout =
ShardLayout::derive_shard_layout(&base_shard_layout, new_boundary_account);
ShardLayout::derive_shard_layout(&base_shard_layout, new_boundary_account.clone());
tracing::info!(target: "test", ?base_shard_layout, new_shard_layout=?epoch_config.shard_layout, "shard layout");

let expected_num_shards = epoch_config.shard_layout.num_shards();
Expand Down Expand Up @@ -973,10 +995,11 @@ fn test_resharding_v3_base(params: TestReshardingParameters) {
builder = builder.runtime_config_store(runtime_config_store);
}

let TestLoopEnv { mut test_loop, datas: node_datas, tempdir } = builder
let mut env = builder
.genesis(genesis)
.epoch_config_store(epoch_config_store)
.clients(params.clients)
.archival_clients(params.archival_clients)
.load_mem_tries_for_tracked_shards(params.load_mem_tries_for_tracked_shards)
.drop_protocol_upgrade_chunks(
base_protocol_version + 1,
@@ -991,13 +1014,18 @@
&contract_id,
near_test_contracts::rs_contract().into(),
&signer,
get_shared_block_hash(&node_datas, &test_loop),
get_shared_block_hash(&env.datas, &env.test_loop),
);
run_tx(&mut test_loop, deploy_contract_tx, &node_datas, Duration::seconds(5));
run_tx(&mut env.test_loop, deploy_contract_tx, &env.datas, Duration::seconds(5));
}

let rpc_id = params.rpc_ids[0].clone();
let temporary_account =
format!("{}.{}", new_boundary_account, new_boundary_account).parse().unwrap();
do_create_account(&mut env, &rpc_id, &new_boundary_account, &temporary_account, 100 * ONE_NEAR);

let client_handles =
node_datas.iter().map(|data| data.client_sender.actor_handle()).collect_vec();
env.datas.iter().map(|data| data.client_sender.actor_handle()).collect_vec();

#[cfg(feature = "test_features")]
{
@@ -1020,7 +1048,7 @@
params
.loop_actions
.iter()
.for_each(|action| action(&node_datas, test_loop_data, client_handles[0].clone()));
.for_each(|action| action(&env.datas, test_loop_data, client_handles[0].clone()));

let clients =
client_handles.iter().map(|handle| &test_loop_data.get(handle).client).collect_vec();
@@ -1059,19 +1087,37 @@
return true;
};

test_loop.run_until(
env.test_loop.run_until(
success_condition,
// Give enough time to produce ~7 epochs.
Duration::seconds((7 * params.epoch_length) as i64),
);
let client = &test_loop.data.get(&client_handles[0]).client;
trie_sanity_check.check_epochs(client);
let height_soon_after_resharding = latest_block_height.get();
// Produce some blocks after resharding, then delete `temporary_account`.
env.test_loop.run_for(Duration::seconds(2));
do_delete_account(&mut env, &rpc_id, &temporary_account, &new_boundary_account);
// Wait for garbage collection to kick in, so that it is tested as well.
test_loop
env.test_loop
.run_for(Duration::seconds((DEFAULT_GC_NUM_EPOCHS_TO_KEEP * params.epoch_length) as i64));

TestLoopEnv { test_loop, datas: node_datas, tempdir }
.shutdown_and_drain_remaining_events(Duration::seconds(20));
// At the end of the test we know for sure resharding has been completed.
// Verify that state is equal across tries and flat storage for all children shards.
let clients =
client_handles.iter().map(|handle| &env.test_loop.data.get(handle).client).collect_vec();
assert_state_sanity_for_children_shard(parent_shard_uid, &clients[0]);

// After resharding and gc-period, assert the deleted `temporary_account` is still accessible through archival node view client.
let archival_view_client_handle = env.datas.last().unwrap().view_client_sender.actor_handle();
let archival_view_client = env.test_loop.data.get_mut(&archival_view_client_handle);
assert_account_exist_in_view_client(
archival_view_client,
temporary_account,
height_soon_after_resharding,
);

env.shutdown_and_drain_remaining_events(Duration::seconds(20));
}

#[test]
36 changes: 21 additions & 15 deletions nearcore/src/cold_storage.rs
@@ -113,29 +113,35 @@ fn cold_store_copy(
return Ok(ColdStoreCopyResult::NoBlockCopied);
}

// Here it should be sufficient to just read from hot storage.
// Because BlockHeight is never garbage collectable and is not even copied to cold.
let cold_head_hash =
hot_store.get_ser::<CryptoHash>(DBCol::BlockHeight, &cold_head_height.to_le_bytes())?;
let cold_head_hash =
cold_head_hash.ok_or(ColdStoreError::ColdHeadHashReadError { cold_head_height })?;

// The previous block is the cold head so we can use it to get epoch id.
let epoch_id = epoch_manager.get_epoch_id_from_prev_block(&cold_head_hash)?;
let shard_layout = epoch_manager.get_shard_layout(&epoch_id)?;

let mut next_height = cold_head_height + 1;
while !update_cold_db(cold_db, hot_store, &shard_layout, &next_height, num_threads)? {
next_height += 1;
let next_height_block_hash = loop {
if next_height > hot_final_head_height {
return Err(ColdStoreError::SkippedBlocksBetweenColdHeadAndNextHeightError {
cold_head_height,
next_height,
hot_final_head_height,
});
}
}

// Here it should be sufficient to just read from hot storage.
// Because BlockHeight is never garbage collectable and is not even copied to cold.
match hot_store.get_ser::<CryptoHash>(DBCol::BlockHeight, &next_height.to_le_bytes())? {
Some(next_height_block_hash) => break next_height_block_hash,
None => next_height = next_height + 1,
}
};
// The next block hash exists in hot store so we can use it to get epoch id.
let epoch_id = epoch_manager.get_epoch_id(&next_height_block_hash)?;
let shard_layout = epoch_manager.get_shard_layout(&epoch_id)?;
let is_last_block_in_epoch =
epoch_manager.is_next_block_epoch_start(&next_height_block_hash)?;
update_cold_db(
cold_db,
hot_store,
&shard_layout,
&next_height,
is_last_block_in_epoch,
num_threads,
)?;
update_cold_head(cold_db, hot_store, &next_height)?;

let result = if next_height >= hot_final_head_height {
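The reworked `cold_store_copy` loop above no longer relies on `update_cold_db` returning whether a block was copied; instead it scans forward from the cold head past skipped heights until it finds a present block hash, and errors out if it reaches the hot final head without one. A minimal sketch of that scan, not part of the commit, with the store lookup abstracted to a closure and the error reduced to a string:

// Sketch only (not part of the diff): find the first height at or after `next_height`
// that has a block hash in the BlockHeight column, mirroring the loop in `cold_store_copy`.
fn sketch_next_present_height(
    mut next_height: u64,
    hot_final_head_height: u64,
    lookup: impl Fn(u64) -> Option<[u8; 32]>,
) -> Result<(u64, [u8; 32]), String> {
    loop {
        // Past the hot final head there is nothing to copy; the real code returns
        // ColdStoreError::SkippedBlocksBetweenColdHeadAndNextHeightError here.
        if next_height > hot_final_head_height {
            return Err("no block found up to the hot final head".to_string());
        }
        // Heights with no block hash are skipped heights; advance past them.
        match lookup(next_height) {
            Some(hash) => return Ok((next_height, hash)),
            None => next_height += 1,
        }
    }
}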
