Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[reshardingV3] State ShardUIdMapping #12084

Draft
wants to merge 2 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions chain/chain/src/flat_storage_creator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use near_store::flat::{
FlatStorageCreationStatus, FlatStorageManager, FlatStorageReadyStatus, FlatStorageStatus,
NUM_PARTS_IN_ONE_STEP, STATE_PART_MEMORY_LIMIT,
};
use near_store::trie::StateReader;
use near_store::Store;
use near_store::{Trie, TrieDBStorage, TrieTraversalItem};
use std::collections::HashMap;
Expand Down Expand Up @@ -94,8 +95,9 @@ impl FlatStorageShardCreator {
part_id: PartId,
progress: Arc<AtomicU64>,
result_sender: Sender<u64>,
state_reader: StateReader,
) {
let trie_storage = TrieDBStorage::new(store.clone(), shard_uid);
let trie_storage = TrieDBStorage::new(store.clone(), shard_uid, state_reader);
let trie = Trie::new(Arc::new(trie_storage), state_root, None);
let path_begin = trie.find_state_part_boundary(part_id.idx, part_id.total).unwrap();
let path_end = trie.find_state_part_boundary(part_id.idx + 1, part_id.total).unwrap();
Expand Down Expand Up @@ -195,7 +197,8 @@ impl FlatStorageShardCreator {
let store = self.runtime.store().clone();
let epoch_id = self.epoch_manager.get_epoch_id(&block_hash)?;
let shard_uid = self.epoch_manager.shard_id_to_uid(shard_id, &epoch_id)?;
let trie_storage = TrieDBStorage::new(store, shard_uid);
let state_reader = self.runtime.get_tries().get_state_reader();
let trie_storage = TrieDBStorage::new(store, shard_uid, state_reader);
let state_root =
*chain_store.get_chunk_extra(&block_hash, &shard_uid)?.state_root();
let trie = Trie::new(Arc::new(trie_storage), state_root, None);
Expand Down Expand Up @@ -226,6 +229,7 @@ impl FlatStorageShardCreator {
fetching_state_status,
)) => {
let store = self.runtime.store().clone();
let state_reader = self.runtime.get_tries().get_state_reader();
let block_hash = fetching_state_status.block_hash;
let start_part_id = fetching_state_status.part_id;
let num_parts_in_step = fetching_state_status.num_parts_in_step;
Expand All @@ -251,6 +255,7 @@ impl FlatStorageShardCreator {
let inner_progress = progress.clone();
let inner_sender = self.fetched_parts_sender.clone();
let inner_threads_used = self.metrics.threads_used();
let state_reader = state_reader.clone();
thread_pool.spawn(move || {
inner_threads_used.inc();
Self::fetch_state_part(
Expand All @@ -260,6 +265,7 @@ impl FlatStorageShardCreator {
PartId::new(part_id, num_parts),
inner_progress,
inner_sender,
state_reader,
);
inner_threads_used.dec();
})
Expand Down
5 changes: 5 additions & 0 deletions chain/chain/src/runtime/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ use near_primitives::views::{
use near_store::config::StateSnapshotType;
use near_store::flat::FlatStorageManager;
use near_store::metadata::DbKind;
use near_store::trie::StateReader;
use near_store::{
ApplyStatePartResult, DBCol, ShardTries, StateSnapshotConfig, Store, Trie, TrieConfig,
TrieUpdate, WrappedTrieChanges, COLD_HEAD_KEY,
Expand Down Expand Up @@ -101,10 +102,14 @@ impl NightshadeRuntime {
let trie_viewer = TrieViewer::new(trie_viewer_state_size_limit, max_gas_burnt_view);
let flat_storage_manager = FlatStorageManager::new(store.clone());
let shard_uids: Vec<_> = genesis_config.shard_layout.shard_uids().collect();
// TODO(reshardingV3) Recursively calculate resharding parents for `shard_uids`.
Copy link
Contributor Author

@staffik staffik Sep 19, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That would require iterating through previous epochs, which is a bit cumbersome, see

// and (2) it is not easy to walk backwards from the last epoch; there's no
// "give me the previous epoch" query. So instead, we use block header's
// `next_epoch_id` to establish an epoch chain.
.

let shard_uids_ancestor_tree = shard_uids.iter().map(|id| (*id, None)).collect();
let state_reader = StateReader::new(&shard_uids, shard_uids_ancestor_tree);
let tries = ShardTries::new(
store.clone(),
trie_config,
&shard_uids,
state_reader,
flat_storage_manager,
state_snapshot_config,
);
Expand Down
4 changes: 3 additions & 1 deletion core/store/src/cold_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -185,8 +185,10 @@ fn copy_state_from_store(
hot_store.get_ser::<TrieChanges>(DBCol::TrieChanges, &key)?;

let Some(trie_changes) = trie_changes else { continue };
// TODO(reshardingV3) Likely should use mapped shard UId
let shard_uid_db_prefix = shard_uid.to_bytes();
for op in trie_changes.insertions() {
let key = join_two_keys(&shard_uid_key, op.hash().as_bytes());
let key = join_two_keys(&shard_uid_db_prefix, op.hash().as_bytes());
let value = op.payload().to_vec();

tracing::trace!(target: "cold_store", pretty_key=?near_fmt::StorageKey(&key), "copying state node to colddb");
Expand Down
18 changes: 13 additions & 5 deletions core/store/src/flat/inlining_migration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use crate::metrics::flat_state_metrics::inlining_migration::{
FLAT_STATE_PAUSED_DURATION, INLINED_COUNT, INLINED_TOTAL_VALUES_SIZE, PROCESSED_COUNT,
PROCESSED_TOTAL_VALUES_SIZE, SKIPPED_COUNT,
};
use crate::trie::StateReader;
use crate::{DBCol, Store, TrieDBStorage, TrieStorage};

use super::store_helper::{
Expand Down Expand Up @@ -96,13 +97,14 @@ struct StateValueReader {
}

impl StateValueReader {
fn new(store: Store, num_threads: usize) -> Self {
fn new(store: Store, state_reader: StateReader, num_threads: usize) -> Self {
let (value_request_send, value_request_recv) = channel::unbounded();
let (value_response_send, value_response_recv) = channel::unbounded();
let mut join_handles = Vec::new();
for _ in 0..num_threads {
join_handles.push(Self::spawn_read_value_thread(
store.clone(),
state_reader.clone(),
value_request_recv.clone(),
value_response_send.clone(),
));
Expand Down Expand Up @@ -130,12 +132,14 @@ impl StateValueReader {

fn spawn_read_value_thread(
store: Store,
state_reader: StateReader,
recv: channel::Receiver<ReadValueRequest>,
send: channel::Sender<ReadValueResponse>,
) -> std::thread::JoinHandle<()> {
std::thread::spawn(move || {
while let Ok(req) = recv.recv() {
let trie_storage = TrieDBStorage::new(store.clone(), req.shard_uid);
let trie_storage =
TrieDBStorage::new(store.clone(), req.shard_uid, state_reader.clone());
let bytes = match trie_storage.retrieve_raw_bytes(&req.value_hash) {
Ok(bytes) => Some(bytes.to_vec()),
Err(err) => {
Expand Down Expand Up @@ -175,7 +179,9 @@ pub fn inline_flat_state_values(
) -> bool {
info!(target: "store", %read_state_threads, %batch_size, "Starting FlatState value inlining migration");
let migration_start = std::time::Instant::now();
let mut value_reader = StateValueReader::new(store.clone(), read_state_threads);
// TODO(reshardingV3) We might want to provide shard_uids together with resharding tree to the `inline_flat_state_values()`.
let state_reader = StateReader::new(&[], [].into());
let mut value_reader = StateValueReader::new(store.clone(), state_reader, read_state_threads);
let mut inlined_total_count = 0;
let mut interrupted = false;
for (batch_index, batch) in
Expand Down Expand Up @@ -311,7 +317,7 @@ mod tests {
use super::inline_flat_state_values;
use crate::flat::store_helper::encode_flat_state_db_key;
use crate::flat::{FlatStateValuesInliningMigrationHandle, FlatStorageManager};
use crate::{DBCol, NodeStorage, Store, TrieCachingStorage};
use crate::{get_key_from_shard_uid_db_prefix_and_hash, DBCol, NodeStorage, Store};
use borsh::BorshDeserialize;
use near_o11y::testonly::init_test_logger;
use near_primitives::hash::{hash, CryptoHash};
Expand Down Expand Up @@ -437,8 +443,10 @@ mod tests {
fn populate_flat_store(store: &Store, shard_uid: ShardUId, values: &[Vec<u8>]) {
let mut store_update = store.store_update();
for (i, value) in values.iter().enumerate() {
// TODO(reshardingV3) Test with shard_uid mapping to different shard_uid
let shard_uid_db_prefix = shard_uid.into();
let trie_key =
TrieCachingStorage::get_key_from_shard_uid_and_hash(shard_uid, &hash(&value));
get_key_from_shard_uid_db_prefix_and_hash(shard_uid_db_prefix, &hash(&value));
store_update.increment_refcount(DBCol::State, &trie_key, &value);
let fs_key = encode_flat_state_db_key(shard_uid, &[i as u8]);
let fs_value = borsh::to_vec(&FlatStateValue::value_ref(&value)).unwrap();
Expand Down
9 changes: 5 additions & 4 deletions core/store/src/flat/store_helper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,12 @@ pub fn remove_delta(store_update: &mut StoreUpdate, shard_uid: ShardUId, block_h
store_update.delete(DBCol::FlatStateDeltaMetadata, &key);
}

/// Must not be called for DBCol::State column, since ReshardingV3 introduces mapping strategy for this column.
staffik marked this conversation as resolved.
Show resolved Hide resolved
fn remove_range_by_shard_uid(store_update: &mut StoreUpdate, shard_uid: ShardUId, col: DBCol) {
assert!(
col != DBCol::State,
"Programming error, called remove_range_by_shard_uid() on DBCol::State"
);
let key_from = shard_uid.to_bytes();
let key_to = ShardUId::next_shard_prefix(&key_from);
store_update.delete_range(col, &key_from, &key_to);
Expand All @@ -126,10 +131,6 @@ pub fn remove_all_flat_state_values(store_update: &mut StoreUpdate, shard_uid: S
remove_range_by_shard_uid(store_update, shard_uid, DBCol::FlatState);
}

pub fn remove_all_state_values(store_update: &mut StoreUpdate, shard_uid: ShardUId) {
staffik marked this conversation as resolved.
Show resolved Hide resolved
remove_range_by_shard_uid(store_update, shard_uid, DBCol::State);
}

pub fn encode_flat_state_db_key(shard_uid: ShardUId, key: &[u8]) -> Vec<u8> {
let mut buffer = vec![];
buffer.extend_from_slice(&shard_uid.to_bytes());
Expand Down
6 changes: 4 additions & 2 deletions core/store/src/genesis/initialization.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ use tracing::{error, info, warn};

use crate::{
flat::FlatStorageManager, genesis::GenesisStateApplier, get_genesis_hash,
get_genesis_state_roots, set_genesis_hash, set_genesis_state_roots, ShardTries,
StateSnapshotConfig, Store, TrieConfig,
get_genesis_state_roots, set_genesis_hash, set_genesis_state_roots, trie::StateReader,
ShardTries, StateSnapshotConfig, Store, TrieConfig,
};

const STATE_DUMP_FILE: &str = "state_dump";
Expand Down Expand Up @@ -111,6 +111,7 @@ fn genesis_state_from_genesis(
let runtime_config = runtime_config_store.get_config(genesis.config.protocol_version);
let storage_usage_config = &runtime_config.fees.storage_usage_config;
let shard_uids: Vec<_> = shard_layout.shard_uids().collect();
let state_reader = StateReader::new_without_resharding_history(&shard_uids);
// note that here we are depending on the behavior that shard_layout.shard_uids() returns an iterator
// in order by shard id from 0 to num_shards()
let mut shard_account_ids: Vec<HashSet<AccountId>> =
Expand All @@ -132,6 +133,7 @@ fn genesis_state_from_genesis(
store.clone(),
TrieConfig::default(),
&shard_uids,
state_reader,
FlatStorageManager::new(store),
StateSnapshotConfig::default(),
);
Expand Down
10 changes: 5 additions & 5 deletions core/store/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,11 @@ use crate::db::{refcount, DBIterator, DBOp, DBSlice, DBTransaction, Database, St
pub use crate::trie::iterator::{TrieIterator, TrieTraversalItem};
pub use crate::trie::update::{TrieUpdate, TrieUpdateIterator, TrieUpdateValuePtr};
pub use crate::trie::{
estimator, resharding_v2, ApplyStatePartResult, KeyForStateChanges, KeyLookupMode, NibbleSlice,
PartialStorage, PrefetchApi, PrefetchError, RawTrieNode, RawTrieNodeWithSize, ShardTries,
StateSnapshot, StateSnapshotConfig, Trie, TrieAccess, TrieCache, TrieCachingStorage,
TrieChanges, TrieConfig, TrieDBStorage, TrieStorage, WrappedTrieChanges,
STATE_SNAPSHOT_COLUMNS,
estimator, get_key_from_shard_uid_db_prefix_and_hash, resharding_v2, ApplyStatePartResult,
KeyForStateChanges, KeyLookupMode, NibbleSlice, PartialStorage, PrefetchApi, PrefetchError,
RawTrieNode, RawTrieNodeWithSize, ShardTries, StateSnapshot, StateSnapshotConfig, Trie,
TrieAccess, TrieCache, TrieCachingStorage, TrieChanges, TrieConfig, TrieDBStorage, TrieStorage,
WrappedTrieChanges, STATE_SNAPSHOT_COLUMNS,
};
use borsh::{BorshDeserialize, BorshSerialize};
pub use columns::DBCol;
Expand Down
7 changes: 7 additions & 0 deletions core/store/src/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use crate::flat::{
store_helper, BlockInfo, FlatStorageManager, FlatStorageReadyStatus, FlatStorageStatus,
};
use crate::metadata::{DbKind, DbVersion, DB_VERSION};
use crate::trie::StateReader;
use crate::{
get, get_delayed_receipt_indices, get_promise_yield_indices, DBCol, NodeStorage, ShardTries,
StateSnapshotConfig, Store, Trie, TrieConfig,
Expand Down Expand Up @@ -63,6 +64,10 @@ pub fn create_test_node_storage_with_cold(
(storage, hot, cold)
}

pub fn create_test_state_reader(shard_uid: ShardUId) -> StateReader {
StateReader::new_without_resharding_history(&[shard_uid])
}

/// Creates an in-memory database.
pub fn create_test_store() -> Store {
create_test_node_storage(DB_VERSION, DbKind::RPC).get_hot_store()
Expand Down Expand Up @@ -123,6 +128,7 @@ impl TestTriesBuilder {
let shard_uids = (0..self.num_shards)
.map(|shard_id| ShardUId { shard_id: shard_id as u32, version: self.shard_version })
.collect::<Vec<_>>();
let state_reader = StateReader::new_without_resharding_history(&shard_uids);
let flat_storage_manager = FlatStorageManager::new(store.clone());
let tries = ShardTries::new(
store.clone(),
Expand All @@ -131,6 +137,7 @@ impl TestTriesBuilder {
..Default::default()
},
&shard_uids,
state_reader,
flat_storage_manager,
StateSnapshotConfig::default(),
);
Expand Down
8 changes: 6 additions & 2 deletions core/store/src/trie/from_flat.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use crate::flat::{store_helper, FlatStorageError, FlatStorageManager};
use crate::trie::StateReader;
use crate::{ShardTries, StateSnapshotConfig, Store, Trie, TrieConfig, TrieDBStorage, TrieStorage};
use near_primitives::{shard_layout::ShardUId, state::FlatStateValue};
use std::time::Instant;
Expand All @@ -11,7 +12,9 @@ use std::time::Instant;
// Please note that the trie is created for the block state with height equal to flat_head
// flat state can comtain deltas after flat_head and can be different from tip of the blockchain.
pub fn construct_trie_from_flat(store: Store, write_store: Store, shard_uid: ShardUId) {
let trie_storage = TrieDBStorage::new(store.clone(), shard_uid);
let shard_uids = &[shard_uid];
let state_reader = StateReader::new_without_resharding_history(shard_uids);
let trie_storage = TrieDBStorage::new(store.clone(), shard_uid, state_reader.clone());
let flat_state_to_trie_kv =
|entry: Result<(Vec<u8>, FlatStateValue), FlatStorageError>| -> (Vec<u8>, Vec<u8>) {
let (key, value) = entry.unwrap();
Expand All @@ -31,7 +34,8 @@ pub fn construct_trie_from_flat(store: Store, write_store: Store, shard_uid: Sha
let tries = ShardTries::new(
write_store.clone(),
TrieConfig::default(),
&[shard_uid],
shard_uids,
state_reader,
FlatStorageManager::new(write_store),
StateSnapshotConfig::default(),
);
Expand Down
2 changes: 2 additions & 0 deletions core/store/src/trie/mem/parallel_loader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ impl ParallelMemTrieLoader {
) -> Result<TrieLoadingPlanNode, StorageError> {
// Read the node from the State column.
let mut key = [0u8; 40];
// TODO(reshardingV3) Likely should use mapped shard UId here
key[0..8].copy_from_slice(&self.shard_uid.to_bytes());
key[8..40].copy_from_slice(&hash.0);
let node = RawTrieNodeWithSize::try_from_slice(
Expand Down Expand Up @@ -214,6 +215,7 @@ impl ParallelMemTrieLoader {
arena: &mut impl ArenaMut,
) -> Result<MemTrieNodeId, StorageError> {
// Figure out which range corresponds to the prefix of this subtree.
// TODO(reshardingV3) likely should use mapped shard UId here
let (start, end) = subtree_to_load.to_iter_range(self.shard_uid);

// Load all the keys in this range from the FlatState column.
Expand Down
2 changes: 2 additions & 0 deletions core/store/src/trie/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use crate::trie::iterator::TrieIterator;
pub use crate::trie::nibble_slice::NibbleSlice;
pub use crate::trie::prefetching_trie_storage::{PrefetchApi, PrefetchError};
pub use crate::trie::shard_tries::{KeyForStateChanges, ShardTries, WrappedTrieChanges};
pub use crate::trie::state_reader::{get_key_from_shard_uid_db_prefix_and_hash, StateReader};
pub use crate::trie::state_snapshot::{
SnapshotError, StateSnapshot, StateSnapshotConfig, STATE_SNAPSHOT_COLUMNS,
};
Expand Down Expand Up @@ -54,6 +55,7 @@ pub mod receipts_column_helper;
pub mod resharding_v2;
mod shard_tries;
mod state_parts;
mod state_reader;
mod state_snapshot;
mod trie_recording;
mod trie_storage;
Expand Down
Loading
Loading