feat(resharding): resharding archival node (#12578)
The mapping from child to parent shard is stored in
`DBCol::StateShardUIdMapping`, which is marked as a cold column.
By default, it would be copied from hot to cold storage at every block.
In this PR, we add an `is_last_block_in_epoch` parameter to
`update_cold_db()` so that the mapping is copied only once per epoch.
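
A minimal sketch of the per-epoch gating, condensed from the `cold_storage.rs` hunk below. Wrapping the filter in a free function is only for illustration; the sketch assumes `DBCol`, its `is_cold()` helper, and the `strum`-derived `DBCol::iter()` are reachable as they are in the diff:

```rust
use near_store::DBCol;
use strum::IntoEnumIterator; // DBCol derives EnumIter in core/store

/// Cold columns to copy for one block. `StateShardUIdMapping` only changes at a
/// resharding, i.e. at an epoch boundary, so it is skipped for all other blocks.
fn columns_to_update(is_last_block_in_epoch: bool) -> Vec<DBCol> {
    DBCol::iter()
        .filter(|col| col.is_cold())
        .filter(|col| col != &DBCol::StateShardUIdMapping || is_last_block_in_epoch)
        .collect()
}
```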

That required a slight refactor of `cold_store_copy()`.
If skipped heights at an epoch boundary are possible, `cold_store_copy()`
had a bug where the previous epoch's shard layout could be used to copy
data for a block that already belongs to the new shard layout.
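
For intuition, a toy, self-contained sketch of that hazard (illustrative types only, not nearcore's API): the shard layout used for the copy has to be resolved from the epoch of the block actually being copied, so a skipped height at the boundary cannot leave the copy running with the previous epoch's layout.

```rust
/// Illustrative stand-ins for `EpochId` / `ShardLayout`; not the nearcore types.
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
struct EpochId(u64);
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
struct LayoutVersion(u32);

struct Block {
    epoch_id: EpochId,
}

/// Pretend a resharding bumped the layout version starting at epoch 10.
fn shard_layout_for_epoch(epoch_id: EpochId) -> LayoutVersion {
    if epoch_id.0 >= 10 {
        LayoutVersion(2)
    } else {
        LayoutVersion(1)
    }
}

/// Resolve the layout from the block's own epoch, never from the epoch of the
/// previously copied block.
fn layout_for_cold_copy(block: &Block) -> LayoutVersion {
    shard_layout_for_epoch(block.epoch_id)
}

fn main() {
    // The last block of epoch 9 was skipped, so the next block to copy already
    // belongs to epoch 10 and must be copied with layout version 2.
    let block = Block { epoch_id: EpochId(10) };
    assert_eq!(layout_for_cold_copy(&block), LayoutVersion(2));
}
```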

Added an archival node and an RPC node to the reshardingv3 testloop.
In the testloop, a temporary account is created before resharding and
removed after resharding. After the GC period, the account is available
at the archival node but not at the RPC node.
However, this does not actually exercise the `cold_store_copy` path,
because neither `cold_store_copy` nor garbage collection is called for
the archival node in the testloop. Fixing the testloop and adding that
test coverage is left as a follow-up.
staffik authored Dec 16, 2024
1 parent 6bc2df9 commit d3b8e0b
Showing 13 changed files with 259 additions and 115 deletions.
2 changes: 1 addition & 1 deletion chain/chain/src/garbage_collection.rs
@@ -507,7 +507,7 @@ impl<'a> ChainStoreUpdate<'a> {
Ok(())
}

// TODO(reshardingV3) Revisit this function, probably it is not needed anymore.
// TODO(resharding) Revisit this function, probably it is not needed anymore.
fn get_shard_uids_to_gc(
&mut self,
epoch_manager: &dyn EpochManagerAdapter,
2 changes: 1 addition & 1 deletion chain/chain/src/resharding/manager.rs
@@ -147,7 +147,7 @@ impl ReshardingManager {
) -> io::Result<()> {
let mut store_update = self.store.trie_store().store_update();
let parent_shard_uid = split_shard_event.parent_shard;
// TODO(reshardingV3) No need to set the mapping for children shards that we won't track just after resharding?
// TODO(resharding) No need to set the mapping for children shards that we won't track just after resharding?
for child_shard_uid in split_shard_event.children_shards() {
store_update.set_shard_uid_mapping(child_shard_uid, parent_shard_uid);
}
19 changes: 15 additions & 4 deletions core/store/src/adapter/trie_store.rs
@@ -173,6 +173,20 @@ impl<'a> TrieStoreUpdateAdapter<'a> {
}
}

/// Get the `ShardUId` mapping for child_shard_uid. If the mapping does not exist, map the shard to itself.
/// Used by Resharding V3 for State mapping.
///
/// It is kept out of `TrieStoreAdapter`, so that `TrieStoreUpdateAdapter` can use it without
/// cloning `store` each time, see https://github.com/near/nearcore/pull/12232#discussion_r1804810508.
pub fn get_shard_uid_mapping(store: &Store, child_shard_uid: ShardUId) -> ShardUId {
store
.get_ser::<ShardUId>(DBCol::StateShardUIdMapping, &child_shard_uid.to_bytes())
.unwrap_or_else(|_| {
panic!("get_shard_uid_mapping() failed for child_shard_uid = {}", child_shard_uid)
})
.unwrap_or(child_shard_uid)
}

/// Constructs db key to be used to access the State column.
/// First, it consults the `StateShardUIdMapping` column to map the `shard_uid` prefix
/// to its ancestor in the resharding tree (according to Resharding V3)
@@ -186,10 +200,7 @@ fn get_key_from_shard_uid_and_hash(
shard_uid: ShardUId,
hash: &CryptoHash,
) -> [u8; 40] {
let mapped_shard_uid = store
.get_ser::<ShardUId>(DBCol::StateShardUIdMapping, &shard_uid.to_bytes())
.expect("get_key_from_shard_uid_and_hash() failed")
.unwrap_or(shard_uid);
let mapped_shard_uid = get_shard_uid_mapping(store, shard_uid);
let mut key = [0; 40];
key[0..8].copy_from_slice(&mapped_shard_uid.to_bytes());
key[8..].copy_from_slice(hash.as_ref());
37 changes: 22 additions & 15 deletions core/store/src/archive/cold_storage.rs
@@ -1,3 +1,4 @@
use crate::adapter::trie_store::get_shard_uid_mapping;
use crate::columns::DBKeyType;
use crate::db::{ColdDB, COLD_HEAD_KEY, HEAD_KEY};
use crate::{metrics, DBCol, DBTransaction, Database, Store, TrieChanges};
@@ -57,8 +58,7 @@ struct BatchTransaction {
}

/// Updates provided cold database from provided hot store with information about block at `height`.
/// Returns if the block was copied (false only if height is not present in `hot_store`).
/// Block as `height` has to be final.
/// Block at `height` has to be final and present in `hot_store`.
///
/// First, we read from hot store information necessary
/// to determine all the keys that need to be updated in cold db.
@@ -80,31 +80,38 @@ pub fn update_cold_db(
hot_store: &Store,
shard_layout: &ShardLayout,
height: &BlockHeight,
is_last_block_in_epoch: bool,
num_threads: usize,
) -> io::Result<bool> {
) -> io::Result<()> {
let _span = tracing::debug_span!(target: "cold_store", "update cold db", height = height);
let _timer = metrics::COLD_COPY_DURATION.start_timer();

if hot_store.get_for_cold(DBCol::BlockHeight, &height.to_le_bytes())?.is_none() {
return Ok(false);
}

let height_key = height.to_le_bytes();
let block_hash_vec = hot_store.get_or_err_for_cold(DBCol::BlockHeight, &height_key)?;
let block_hash_key = block_hash_vec.as_slice();

let key_type_to_keys =
get_keys_from_store(&hot_store, shard_layout, &height_key, block_hash_key)?;
let cold_columns = DBCol::iter().filter(|col| col.is_cold()).collect::<Vec<DBCol>>();
let columns_to_update = DBCol::iter()
.filter(|col| {
if !col.is_cold() {
return false;
}
if col == &DBCol::StateShardUIdMapping && !is_last_block_in_epoch {
return false;
}
true
})
.collect::<Vec<DBCol>>();

// Create new thread pool with `num_threads`.
rayon::ThreadPoolBuilder::new()
.num_threads(num_threads)
.build()
.map_err(|_| io::Error::new(io::ErrorKind::Other, "Failed to create rayon pool"))?
.install(|| {
cold_columns
.into_par_iter() // Process every cold column as a separate task in thread pool in parallel.
columns_to_update
.into_par_iter() // Process cold columns to update as a separate task in thread pool in parallel.
// Copy column to cold db.
.map(|col: DBCol| -> io::Result<()> {
if col == DBCol::State {
@@ -127,8 +134,7 @@ pub fn update_cold_db(
},
)
})?;

Ok(true)
Ok(())
}

// Correctly set the key and value on DBTransaction, taking reference counting
@@ -189,9 +195,10 @@ fn copy_state_from_store(

let Some(trie_changes) = trie_changes else { continue };
total_keys += trie_changes.insertions().len();
let mapped_shard_uid_key = get_shard_uid_mapping(hot_store, shard_uid).to_bytes();
for op in trie_changes.insertions() {
// TODO(reshardingV3) Handle shard_uid not mapped there
let key = join_two_keys(&shard_uid_key, op.hash().as_bytes());
// TODO(resharding) Test it properly. Currently this path is not triggered in testloop.
let key = join_two_keys(&mapped_shard_uid_key, op.hash().as_bytes());
let value = op.payload().to_vec();

total_size += value.len();
@@ -343,7 +350,7 @@ pub fn copy_all_data_to_cold(
tracing::debug!(target: "cold_store", "stopping copy_all_data_to_cold");
return Ok(CopyAllDataToColdStatus::Interrupted);
}
// TODO(reshardingV3) Should do mapping here?
// TODO(resharding) Should do mapping here?
let (key, value) = result?;
transaction.set_and_write_if_full(col, key.to_vec(), value.to_vec())?;
}
1 change: 0 additions & 1 deletion core/store/src/columns.rs
@@ -462,7 +462,6 @@ impl DBCol {
| DBCol::StateHeaders
| DBCol::TransactionResultForBlock
| DBCol::Transactions
// TODO(reshardingV3) How the mapping will work with split storage?
| DBCol::StateShardUIdMapping => true,

// TODO
2 changes: 1 addition & 1 deletion core/store/src/trie/mem/parallel_loader.rs
@@ -191,7 +191,7 @@ impl ParallelMemTrieLoader {
arena: &mut impl ArenaMut,
) -> Result<MemTrieNodeId, StorageError> {
// Figure out which range corresponds to the prefix of this subtree.
// TODO(reshardingV3) This seems fragile, potentially does not work with mapping.
// TODO(resharding) This seems fragile, potentially does not work with mapping.
let (start, end) = subtree_to_load.to_iter_range(self.shard_uid);

// Load all the keys in this range from the FlatState column.
20 changes: 13 additions & 7 deletions integration-tests/src/test_loop/tests/max_receipt_size.rs
@@ -22,6 +22,7 @@ fn slow_test_max_receipt_size() {

let account0: AccountId = "account0".parse().unwrap();
let account0_signer = &create_user_test_signer(&account0).into();
let rpc_id = "account4".parse().unwrap();

// We can't test receipt limit by submitting large transactions because we hit the transaction size limit
// before hitting the receipt size limit.
@@ -33,7 +34,7 @@
get_shared_block_hash(&env.datas, &env.test_loop),
);
let large_tx_exec_res =
execute_tx(&mut env.test_loop, large_tx, &env.datas, Duration::seconds(5));
execute_tx(&mut env.test_loop, &rpc_id, large_tx, &env.datas, Duration::seconds(5));
assert_matches!(large_tx_exec_res, Err(InvalidTxError::TransactionSizeExceeded { .. }));

// Let's test it by running a contract that generates a large receipt.
@@ -44,7 +45,7 @@
&account0_signer,
get_shared_block_hash(&env.datas, &env.test_loop),
);
run_tx(&mut env.test_loop, deploy_contract_tx, &env.datas, Duration::seconds(5));
run_tx(&mut env.test_loop, &rpc_id, deploy_contract_tx, &env.datas, Duration::seconds(5));

// Calling generate_large_receipt({"account_id": "account0", "method_name": "noop", "total_args_size": 3000000})
// will generate a receipt that has ~3_000_000 bytes. It'll be a single receipt with multiple FunctionCall actions.
@@ -60,7 +61,7 @@
300 * TGAS,
get_shared_block_hash(&env.datas, &env.test_loop),
);
run_tx(&mut env.test_loop, large_receipt_tx, &env.datas, Duration::seconds(5));
run_tx(&mut env.test_loop, &rpc_id, large_receipt_tx, &env.datas, Duration::seconds(5));

// Generating a receipt that is 5 MB should fail, it's above the receipt size limit.
let too_large_receipt_tx = SignedTransaction::call(
@@ -74,9 +75,14 @@
300 * TGAS,
get_shared_block_hash(&env.datas, &env.test_loop),
);
let too_large_receipt_tx_exec_res =
execute_tx(&mut env.test_loop, too_large_receipt_tx, &env.datas, Duration::seconds(5))
.unwrap();
let too_large_receipt_tx_exec_res = execute_tx(
&mut env.test_loop,
&rpc_id,
too_large_receipt_tx,
&env.datas,
Duration::seconds(5),
)
.unwrap();

match too_large_receipt_tx_exec_res.status {
FinalExecutionStatus::Failure(TxExecutionError::ActionError(action_error)) => {
@@ -111,7 +117,7 @@ fn slow_test_max_receipt_size() {
300 * TGAS,
get_shared_block_hash(&env.datas, &env.test_loop),
);
let sum_4_res = run_tx(&mut env.test_loop, sum_4_tx, &env.datas, Duration::seconds(5));
let sum_4_res = run_tx(&mut env.test_loop, &rpc_id, sum_4_tx, &env.datas, Duration::seconds(5));
assert_eq!(sum_4_res, 10u64.to_le_bytes().to_vec());

env.shutdown_and_drain_remaining_events(Duration::seconds(20));