Skip to content

Commit

Permalink
In-memory shard UId mapping
Browse files Browse the repository at this point in the history
  • Loading branch information
staffik committed Sep 18, 2024
1 parent 197fc1e commit 7d1841c
Show file tree
Hide file tree
Showing 30 changed files with 374 additions and 187 deletions.
10 changes: 8 additions & 2 deletions chain/chain/src/flat_storage_creator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use near_store::flat::{
FlatStorageCreationStatus, FlatStorageManager, FlatStorageReadyStatus, FlatStorageStatus,
NUM_PARTS_IN_ONE_STEP, STATE_PART_MEMORY_LIMIT,
};
use near_store::trie::StateReader;
use near_store::Store;
use near_store::{Trie, TrieDBStorage, TrieTraversalItem};
use std::collections::HashMap;
Expand Down Expand Up @@ -94,8 +95,9 @@ impl FlatStorageShardCreator {
part_id: PartId,
progress: Arc<AtomicU64>,
result_sender: Sender<u64>,
state_reader: StateReader,
) {
let trie_storage = TrieDBStorage::new(store.clone(), shard_uid);
let trie_storage = TrieDBStorage::new(store.clone(), shard_uid, state_reader);
let trie = Trie::new(Arc::new(trie_storage), state_root, None);
let path_begin = trie.find_state_part_boundary(part_id.idx, part_id.total).unwrap();
let path_end = trie.find_state_part_boundary(part_id.idx + 1, part_id.total).unwrap();
Expand Down Expand Up @@ -195,7 +197,8 @@ impl FlatStorageShardCreator {
let store = self.runtime.store().clone();
let epoch_id = self.epoch_manager.get_epoch_id(&block_hash)?;
let shard_uid = self.epoch_manager.shard_id_to_uid(shard_id, &epoch_id)?;
let trie_storage = TrieDBStorage::new(store, shard_uid);
let state_reader = self.runtime.get_tries().get_state_reader();
let trie_storage = TrieDBStorage::new(store, shard_uid, state_reader);
let state_root =
*chain_store.get_chunk_extra(&block_hash, &shard_uid)?.state_root();
let trie = Trie::new(Arc::new(trie_storage), state_root, None);
Expand Down Expand Up @@ -226,6 +229,7 @@ impl FlatStorageShardCreator {
fetching_state_status,
)) => {
let store = self.runtime.store().clone();
let state_reader = self.runtime.get_tries().get_state_reader();
let block_hash = fetching_state_status.block_hash;
let start_part_id = fetching_state_status.part_id;
let num_parts_in_step = fetching_state_status.num_parts_in_step;
Expand All @@ -251,6 +255,7 @@ impl FlatStorageShardCreator {
let inner_progress = progress.clone();
let inner_sender = self.fetched_parts_sender.clone();
let inner_threads_used = self.metrics.threads_used();
let state_reader = state_reader.clone();
thread_pool.spawn(move || {
inner_threads_used.inc();
Self::fetch_state_part(
Expand All @@ -260,6 +265,7 @@ impl FlatStorageShardCreator {
PartId::new(part_id, num_parts),
inner_progress,
inner_sender,
state_reader,
);
inner_threads_used.dec();
})
Expand Down
1 change: 0 additions & 1 deletion chain/chain/src/garbage_collection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -993,7 +993,6 @@ impl<'a> ChainStoreUpdate<'a> {
| DBCol::FlatStorageStatus
| DBCol::Misc
| DBCol::_ReceiptIdToShardId
| DBCol::ShardUIdMapping
=> unreachable!(),
}
self.merge(store_update);
Expand Down
5 changes: 5 additions & 0 deletions chain/chain/src/runtime/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ use near_primitives::views::{
use near_store::config::StateSnapshotType;
use near_store::flat::FlatStorageManager;
use near_store::metadata::DbKind;
use near_store::trie::StateReader;
use near_store::{
ApplyStatePartResult, DBCol, ShardTries, StateSnapshotConfig, Store, Trie, TrieConfig,
TrieUpdate, WrappedTrieChanges, COLD_HEAD_KEY,
Expand Down Expand Up @@ -101,10 +102,14 @@ impl NightshadeRuntime {
let trie_viewer = TrieViewer::new(trie_viewer_state_size_limit, max_gas_burnt_view);
let flat_storage_manager = FlatStorageManager::new(store.clone());
let shard_uids: Vec<_> = genesis_config.shard_layout.shard_uids().collect();
// TODO(reshardingV3) Recursively calculate resharding parents for `shard_uids`.
let shard_uids_ancestor_tree = shard_uids.iter().map(|id| (*id, None)).collect();
let state_reader = StateReader::new(&shard_uids, shard_uids_ancestor_tree);
let tries = ShardTries::new(
store.clone(),
trie_config,
&shard_uids,
state_reader,
flat_storage_manager,
state_snapshot_config,
);
Expand Down
19 changes: 5 additions & 14 deletions core/primitives/src/shard_layout.rs
Original file line number Diff line number Diff line change
Expand Up @@ -325,20 +325,6 @@ pub struct ShardUId {
pub shard_id: u32,
}

/// A wrapper for `ShardUId` used to denote when we expect the mapped version of the value.
///
/// ReshardingV3 uses a mapping strategy for the State column.
/// This means trie storage is accessed using a mapped shard UID, which can be the same shard UID,
/// the resharding parent shard UID, or any ancestor shard UID in the tree of resharding history.
#[derive(Clone, Copy)]
pub struct MappedShardUId(pub ShardUId);

impl From<ShardUId> for MappedShardUId {
fn from(shard_uid: ShardUId) -> Self {
MappedShardUId(shard_uid)
}
}

impl ShardUId {
pub fn single_shard() -> Self {
Self { version: 0, shard_id: 0 }
Expand Down Expand Up @@ -391,6 +377,11 @@ impl TryFrom<&[u8]> for ShardUId {
}
}

/// A shard UID bundled with a list of shard UIDs recorded as its ancestors.
///
/// NOTE(review): presumably `ancestors` holds the resharding ancestors of `uid`; the ordering
/// and whether `uid` itself may appear in the list are not visible here — confirm where it is built.
pub struct ShardUIdWithAncestors {
/// The shard UID this entry describes.
pub uid: ShardUId,
/// Ancestor shard UIDs of `uid` (assumed to come from resharding history — TODO confirm).
pub ancestors: Vec<ShardUId>,
}

/// Returns the byte representation for (block, shard_uid)
pub fn get_block_shard_uid(block_hash: &CryptoHash, shard_uid: &ShardUId) -> Vec<u8> {
let mut res = Vec::with_capacity(40);
Expand Down
5 changes: 3 additions & 2 deletions core/store/src/cold_storage.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::columns::DBKeyType;
use crate::db::{ColdDB, COLD_HEAD_KEY, HEAD_KEY};
use crate::{map_shard_uid, metrics, DBCol, DBTransaction, Database, Store, TrieChanges};
use crate::{metrics, DBCol, DBTransaction, Database, Store, TrieChanges};

use borsh::BorshDeserialize;
use near_primitives::block::{Block, BlockHeader, Tip};
Expand Down Expand Up @@ -185,7 +185,8 @@ fn copy_state_from_store(
hot_store.get_ser::<TrieChanges>(DBCol::TrieChanges, &key)?;

let Some(trie_changes) = trie_changes else { continue };
let shard_uid_db_prefix = map_shard_uid(&hot_store, shard_uid).0.to_bytes();
// TODO(reshardingV3) Likely should use mapped shard UId
let shard_uid_db_prefix = shard_uid.to_bytes();
for op in trie_changes.insertions() {
let key = join_two_keys(&shard_uid_db_prefix, op.hash().as_bytes());
let value = op.payload().to_vec();
Expand Down
10 changes: 1 addition & 9 deletions core/store/src/columns.rs
Original file line number Diff line number Diff line change
Expand Up @@ -293,12 +293,6 @@ pub enum DBCol {
/// Witnesses with the lowest index are garbage collected first.
/// u64 -> LatestWitnessesKey
LatestWitnessesByIndex,
/// Mapping of ShardUId to the underlying ShardUId database key prefix for the State column.
/// It could be parent shard after resharding, ancestor shard in case of many resharding,
/// or just map shard to itself if there was no resharding or we updated the mapping during state sync.
/// - *Rows*: `ShardUId`
/// - *Column type*: `ShardUId`
ShardUIdMapping,
}

/// Defines different logical parts of a db key.
Expand Down Expand Up @@ -500,8 +494,7 @@ impl DBCol {
| DBCol::FlatState
| DBCol::FlatStateChanges
| DBCol::FlatStateDeltaMetadata
| DBCol::FlatStorageStatus
| DBCol::ShardUIdMapping => false,
| DBCol::FlatStorageStatus => false,
}
}

Expand Down Expand Up @@ -573,7 +566,6 @@ impl DBCol {
DBCol::StateTransitionData => &[DBKeyType::BlockHash, DBKeyType::ShardId],
DBCol::LatestChunkStateWitnesses => &[DBKeyType::LatestWitnessesKey],
DBCol::LatestWitnessesByIndex => &[DBKeyType::LatestWitnessIndex],
DBCol::ShardUIdMapping => &[DBKeyType::ShardUId],
}
}
}
Expand Down
20 changes: 12 additions & 8 deletions core/store/src/flat/inlining_migration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use crate::metrics::flat_state_metrics::inlining_migration::{
FLAT_STATE_PAUSED_DURATION, INLINED_COUNT, INLINED_TOTAL_VALUES_SIZE, PROCESSED_COUNT,
PROCESSED_TOTAL_VALUES_SIZE, SKIPPED_COUNT,
};
use crate::trie::StateReader;
use crate::{DBCol, Store, TrieDBStorage, TrieStorage};

use super::store_helper::{
Expand Down Expand Up @@ -96,13 +97,14 @@ struct StateValueReader {
}

impl StateValueReader {
fn new(store: Store, num_threads: usize) -> Self {
fn new(store: Store, state_reader: StateReader, num_threads: usize) -> Self {
let (value_request_send, value_request_recv) = channel::unbounded();
let (value_response_send, value_response_recv) = channel::unbounded();
let mut join_handles = Vec::new();
for _ in 0..num_threads {
join_handles.push(Self::spawn_read_value_thread(
store.clone(),
state_reader.clone(),
value_request_recv.clone(),
value_response_send.clone(),
));
Expand Down Expand Up @@ -130,12 +132,14 @@ impl StateValueReader {

fn spawn_read_value_thread(
store: Store,
state_reader: StateReader,
recv: channel::Receiver<ReadValueRequest>,
send: channel::Sender<ReadValueResponse>,
) -> std::thread::JoinHandle<()> {
std::thread::spawn(move || {
while let Ok(req) = recv.recv() {
let trie_storage = TrieDBStorage::new(store.clone(), req.shard_uid);
let trie_storage =
TrieDBStorage::new(store.clone(), req.shard_uid, state_reader.clone());
let bytes = match trie_storage.retrieve_raw_bytes(&req.value_hash) {
Ok(bytes) => Some(bytes.to_vec()),
Err(err) => {
Expand Down Expand Up @@ -175,7 +179,9 @@ pub fn inline_flat_state_values(
) -> bool {
info!(target: "store", %read_state_threads, %batch_size, "Starting FlatState value inlining migration");
let migration_start = std::time::Instant::now();
let mut value_reader = StateValueReader::new(store.clone(), read_state_threads);
// TODO(reshardingV3) We might want to provide shard_uids together with resharding tree to the `inline_flat_state_values()`.
let state_reader = StateReader::new(&[], [].into());
let mut value_reader = StateValueReader::new(store.clone(), state_reader, read_state_threads);
let mut inlined_total_count = 0;
let mut interrupted = false;
for (batch_index, batch) in
Expand Down Expand Up @@ -311,7 +317,7 @@ mod tests {
use super::inline_flat_state_values;
use crate::flat::store_helper::encode_flat_state_db_key;
use crate::flat::{FlatStateValuesInliningMigrationHandle, FlatStorageManager};
use crate::{DBCol, NodeStorage, Store, TrieCachingStorage};
use crate::{get_key_from_shard_uid_db_prefix_and_hash, DBCol, NodeStorage, Store};
use borsh::BorshDeserialize;
use near_o11y::testonly::init_test_logger;
use near_primitives::hash::{hash, CryptoHash};
Expand Down Expand Up @@ -439,10 +445,8 @@ mod tests {
for (i, value) in values.iter().enumerate() {
// TODO(reshardingV3) Test with shard_uid mapping to different shard_uid
let shard_uid_db_prefix = shard_uid.into();
let trie_key = TrieCachingStorage::get_key_from_shard_uid_db_prefix_and_hash(
shard_uid_db_prefix,
&hash(&value),
);
let trie_key =
get_key_from_shard_uid_db_prefix_and_hash(shard_uid_db_prefix, &hash(&value));
store_update.increment_refcount(DBCol::State, &trie_key, &value);
let fs_key = encode_flat_state_db_key(shard_uid, &[i as u8]);
let fs_value = borsh::to_vec(&FlatStateValue::value_ref(&value)).unwrap();
Expand Down
6 changes: 4 additions & 2 deletions core/store/src/genesis/initialization.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ use tracing::{error, info, warn};

use crate::{
flat::FlatStorageManager, genesis::GenesisStateApplier, get_genesis_hash,
get_genesis_state_roots, set_genesis_hash, set_genesis_state_roots, ShardTries,
StateSnapshotConfig, Store, TrieConfig,
get_genesis_state_roots, set_genesis_hash, set_genesis_state_roots, trie::StateReader,
ShardTries, StateSnapshotConfig, Store, TrieConfig,
};

const STATE_DUMP_FILE: &str = "state_dump";
Expand Down Expand Up @@ -111,6 +111,7 @@ fn genesis_state_from_genesis(
let runtime_config = runtime_config_store.get_config(genesis.config.protocol_version);
let storage_usage_config = &runtime_config.fees.storage_usage_config;
let shard_uids: Vec<_> = shard_layout.shard_uids().collect();
let state_reader = StateReader::new_without_resharding_history(&shard_uids);
// note that here we are depending on the behavior that shard_layout.shard_uids() returns an iterator
// in order by shard id from 0 to num_shards()
let mut shard_account_ids: Vec<HashSet<AccountId>> =
Expand All @@ -132,6 +133,7 @@ fn genesis_state_from_genesis(
store.clone(),
TrieConfig::default(),
&shard_uids,
state_reader,
FlatStorageManager::new(store),
StateSnapshotConfig::default(),
);
Expand Down
8 changes: 4 additions & 4 deletions core/store/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,10 @@ use crate::db::{refcount, DBIterator, DBOp, DBSlice, DBTransaction, Database, St
pub use crate::trie::iterator::{TrieIterator, TrieTraversalItem};
pub use crate::trie::update::{TrieUpdate, TrieUpdateIterator, TrieUpdateValuePtr};
pub use crate::trie::{
estimator, map_shard_uid, resharding_v2, ApplyStatePartResult, KeyForStateChanges,
KeyLookupMode, NibbleSlice, PartialStorage, PrefetchApi, PrefetchError, RawTrieNode,
RawTrieNodeWithSize, ShardTries, StateSnapshot, StateSnapshotConfig, Trie, TrieAccess,
TrieCache, TrieCachingStorage, TrieChanges, TrieConfig, TrieDBStorage, TrieStorage,
estimator, get_key_from_shard_uid_db_prefix_and_hash, resharding_v2, ApplyStatePartResult,
KeyForStateChanges, KeyLookupMode, NibbleSlice, PartialStorage, PrefetchApi, PrefetchError,
RawTrieNode, RawTrieNodeWithSize, ShardTries, StateSnapshot, StateSnapshotConfig, Trie,
TrieAccess, TrieCache, TrieCachingStorage, TrieChanges, TrieConfig, TrieDBStorage, TrieStorage,
WrappedTrieChanges, STATE_SNAPSHOT_COLUMNS,
};
use borsh::{BorshDeserialize, BorshSerialize};
Expand Down
7 changes: 7 additions & 0 deletions core/store/src/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use crate::flat::{
store_helper, BlockInfo, FlatStorageManager, FlatStorageReadyStatus, FlatStorageStatus,
};
use crate::metadata::{DbKind, DbVersion, DB_VERSION};
use crate::trie::StateReader;
use crate::{
get, get_delayed_receipt_indices, get_promise_yield_indices, DBCol, NodeStorage, ShardTries,
StateSnapshotConfig, Store, Trie, TrieConfig,
Expand Down Expand Up @@ -63,6 +64,10 @@ pub fn create_test_node_storage_with_cold(
(storage, hot, cold)
}

/// Builds a `StateReader` for tests that knows about exactly one shard, `shard_uid`,
/// and is constructed without any resharding history.
pub fn create_test_state_reader(shard_uid: ShardUId) -> StateReader {
    let single_shard = [shard_uid];
    StateReader::new_without_resharding_history(&single_shard)
}

/// Creates an in-memory database.
pub fn create_test_store() -> Store {
create_test_node_storage(DB_VERSION, DbKind::RPC).get_hot_store()
Expand Down Expand Up @@ -123,6 +128,7 @@ impl TestTriesBuilder {
let shard_uids = (0..self.num_shards)
.map(|shard_id| ShardUId { shard_id: shard_id as u32, version: self.shard_version })
.collect::<Vec<_>>();
let state_reader = StateReader::new_without_resharding_history(&shard_uids);
let flat_storage_manager = FlatStorageManager::new(store.clone());
let tries = ShardTries::new(
store.clone(),
Expand All @@ -131,6 +137,7 @@ impl TestTriesBuilder {
..Default::default()
},
&shard_uids,
state_reader,
flat_storage_manager,
StateSnapshotConfig::default(),
);
Expand Down
8 changes: 6 additions & 2 deletions core/store/src/trie/from_flat.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use crate::flat::{store_helper, FlatStorageError, FlatStorageManager};
use crate::trie::StateReader;
use crate::{ShardTries, StateSnapshotConfig, Store, Trie, TrieConfig, TrieDBStorage, TrieStorage};
use near_primitives::{shard_layout::ShardUId, state::FlatStateValue};
use std::time::Instant;
Expand All @@ -11,7 +12,9 @@ use std::time::Instant;
// Please note that the trie is created for the block state with height equal to flat_head.
// Flat state can contain deltas after flat_head and can be different from the tip of the blockchain.
pub fn construct_trie_from_flat(store: Store, write_store: Store, shard_uid: ShardUId) {
let trie_storage = TrieDBStorage::new(store.clone(), shard_uid);
let shard_uids = &[shard_uid];
let state_reader = StateReader::new_without_resharding_history(shard_uids);
let trie_storage = TrieDBStorage::new(store.clone(), shard_uid, state_reader.clone());
let flat_state_to_trie_kv =
|entry: Result<(Vec<u8>, FlatStateValue), FlatStorageError>| -> (Vec<u8>, Vec<u8>) {
let (key, value) = entry.unwrap();
Expand All @@ -31,7 +34,8 @@ pub fn construct_trie_from_flat(store: Store, write_store: Store, shard_uid: Sha
let tries = ShardTries::new(
write_store.clone(),
TrieConfig::default(),
&[shard_uid],
shard_uids,
state_reader,
FlatStorageManager::new(write_store),
StateSnapshotConfig::default(),
);
Expand Down
15 changes: 8 additions & 7 deletions core/store/src/trie/mem/parallel_loader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,11 @@ use super::construction::TrieConstructor;
use super::node::{InputMemTrieNode, MemTrieNodeId};
use crate::flat::FlatStorageError;
use crate::trie::Children;
use crate::{map_shard_uid, DBCol, NibbleSlice, RawTrieNode, RawTrieNodeWithSize, Store};
use crate::{DBCol, NibbleSlice, RawTrieNode, RawTrieNodeWithSize, Store};
use borsh::BorshDeserialize;
use near_primitives::errors::{MissingTrieValueContext, StorageError};
use near_primitives::hash::CryptoHash;
use near_primitives::shard_layout::{MappedShardUId, ShardUId};
use near_primitives::shard_layout::ShardUId;
use near_primitives::state::FlatStateValue;
use near_primitives::types::StateRoot;
use rayon::iter::{IndexedParallelIterator, IntoParallelIterator, ParallelIterator};
Expand Down Expand Up @@ -45,7 +45,7 @@ pub fn load_memtrie_in_parallel(
/// or multiple shards.
pub struct ParallelMemTrieLoader {
store: Store,
shard_uid_db_prefix: MappedShardUId,
shard_uid: ShardUId,
root: StateRoot,
num_subtrees_desired: usize,
}
Expand All @@ -57,8 +57,7 @@ impl ParallelMemTrieLoader {
root: StateRoot,
num_subtrees_desired: usize,
) -> Self {
let shard_uid_db_prefix = map_shard_uid(&store, shard_uid);
Self { store, shard_uid_db_prefix, root, num_subtrees_desired }
Self { store, shard_uid, root, num_subtrees_desired }
}

/// Implements stage 1; recursively expanding the trie until all subtrees are small enough.
Expand Down Expand Up @@ -88,7 +87,8 @@ impl ParallelMemTrieLoader {
) -> Result<TrieLoadingPlanNode, StorageError> {
// Read the node from the State column.
let mut key = [0u8; 40];
key[0..8].copy_from_slice(&self.shard_uid_db_prefix.0.to_bytes());
// TODO(reshardingV3) Likely should use mapped shard UId here
key[0..8].copy_from_slice(&self.shard_uid.to_bytes());
key[8..40].copy_from_slice(&hash.0);
let node = RawTrieNodeWithSize::try_from_slice(
&self
Expand Down Expand Up @@ -214,7 +214,8 @@ impl ParallelMemTrieLoader {
arena: &mut impl Arena,
) -> Result<MemTrieNodeId, StorageError> {
// Figure out which range corresponds to the prefix of this subtree.
let (start, end) = subtree_to_load.to_iter_range(self.shard_uid_db_prefix.0);
// TODO(reshardingV3) likely should use mapped shard UId here
let (start, end) = subtree_to_load.to_iter_range(self.shard_uid);

// Load all the keys in this range from the FlatState column.
let mut recon = TrieConstructor::new(arena);
Expand Down
Loading

0 comments on commit 7d1841c

Please sign in to comment.