diff --git a/core/store/src/adapter/trie_store.rs b/core/store/src/adapter/trie_store.rs index 331efdfe211..2a98abb3bcb 100644 --- a/core/store/src/adapter/trie_store.rs +++ b/core/store/src/adapter/trie_store.rs @@ -2,6 +2,7 @@ use std::io; use std::num::NonZero; use std::sync::Arc; +use borsh::BorshDeserialize; use near_primitives::errors::{MissingTrieValueContext, StorageError}; use near_primitives::hash::CryptoHash; use near_primitives::shard_layout::{get_block_shard_uid, ShardUId}; @@ -31,24 +32,12 @@ impl TrieStoreAdapter { TrieStoreUpdateAdapter { store_update: StoreUpdateHolder::Owned(self.store.store_update()) } } - /// Reads shard_uid mapping for given shard. - /// If the mapping does not exist, it means that `shard_uid` maps to itself. - pub(crate) fn read_shard_uid_mapping_from_db( - &self, - shard_uid: ShardUId, - ) -> Result { - let mapped_shard_uid = - self.store.get_ser::(DBCol::StateShardUIdMapping, &shard_uid.to_bytes()); - let mapped_shard_uid = mapped_shard_uid - .map_err(|err| StorageError::StorageInconsistentState(err.to_string()))?; - Ok(mapped_shard_uid.unwrap_or(shard_uid)) - } - /// Replaces shard_uid prefix with a mapped value according to mapping strategy in Resharding V3. /// For this, it does extra read from `DBCol::StateShardUIdMapping`. + /// + /// For more details, see `get_key_from_shard_uid_and_hash()` docs. pub fn get(&self, shard_uid: ShardUId, hash: &CryptoHash) -> Result, StorageError> { - let mapped_shard_uid = self.read_shard_uid_mapping_from_db(shard_uid)?; - let key = get_key_from_shard_uid_and_hash(mapped_shard_uid, hash); + let key = get_key_from_shard_uid_and_hash(&self.store, shard_uid, hash); let val = self .store .get(DBCol::State, key.as_ref()) @@ -57,6 +46,15 @@ impl TrieStoreAdapter { Ok(val.into()) } + pub fn get_ser( + &self, + shard_uid: ShardUId, + hash: &CryptoHash, + ) -> Result { + let bytes = self.get(shard_uid, hash)?; + T::try_from_slice(&bytes).map_err(|e| StorageError::StorageInconsistentState(e.to_string())) + } + pub fn get_state_snapshot_hash(&self) -> Result { let val = self .store @@ -100,18 +98,22 @@ impl<'a> TrieStoreUpdateAdapter<'a> { Self { store_update: StoreUpdateHolder::Reference(store_update) } } + fn get_key_from_shard_uid_and_hash(&self, shard_uid: ShardUId, hash: &CryptoHash) -> [u8; 40] { + get_key_from_shard_uid_and_hash(&self.store_update.store, shard_uid, hash) + } + pub fn decrement_refcount_by( &mut self, shard_uid: ShardUId, hash: &CryptoHash, decrement: NonZero, ) { - let key = get_key_from_shard_uid_and_hash(shard_uid, hash); + let key = self.get_key_from_shard_uid_and_hash(shard_uid, hash); self.store_update.decrement_refcount_by(DBCol::State, key.as_ref(), decrement); } pub fn decrement_refcount(&mut self, shard_uid: ShardUId, hash: &CryptoHash) { - let key = get_key_from_shard_uid_and_hash(shard_uid, hash); + let key = self.get_key_from_shard_uid_and_hash(shard_uid, hash); self.store_update.decrement_refcount(DBCol::State, key.as_ref()); } @@ -120,10 +122,10 @@ impl<'a> TrieStoreUpdateAdapter<'a> { shard_uid: ShardUId, hash: &CryptoHash, data: &[u8], - decrement: NonZero, + increment: NonZero, ) { - let key = get_key_from_shard_uid_and_hash(shard_uid, hash); - self.store_update.increment_refcount_by(DBCol::State, key.as_ref(), data, decrement); + let key = self.get_key_from_shard_uid_and_hash(shard_uid, hash); + self.store_update.increment_refcount_by(DBCol::State, key.as_ref(), data, increment); } pub fn set_state_snapshot_hash(&mut self, hash: Option) { @@ -156,14 +158,152 @@ impl<'a> TrieStoreUpdateAdapter<'a> { ) } + /// Set the mapping from `child_shard_uid` to `parent_shard_uid`. + /// Used by Resharding V3 for State mapping. + #[cfg(test)] + fn set_shard_uid_mapping(&mut self, child_shard_uid: ShardUId, parent_shard_uid: ShardUId) { + self.store_update.set( + DBCol::StateShardUIdMapping, + child_shard_uid.to_bytes().as_ref(), + &borsh::to_vec(&parent_shard_uid).expect("Borsh serialize cannot fail"), + ) + } + pub fn delete_all_state(&mut self) { self.store_update.delete_all(DBCol::State) } } -pub fn get_key_from_shard_uid_and_hash(shard_uid: ShardUId, hash: &CryptoHash) -> [u8; 40] { +/// Constructs db key to be used to access the State column. +/// First, it consults the `StateShardUIdMapping` column to map the `shard_uid` prefix +/// to its ancestor in the resharding tree (according to Resharding V3) +/// or map to itself if the mapping does not exist. +/// +/// Please note that the mapped shard uid is read from db each time which may seem slow. +/// In practice the `StateShardUIdMapping` is very small and should always be stored in the RocksDB cache. +/// The deserialization of ShardUId is also very cheap. +fn get_key_from_shard_uid_and_hash( + store: &Store, + shard_uid: ShardUId, + hash: &CryptoHash, +) -> [u8; 40] { + let mapped_shard_uid = store + .get_ser::(DBCol::StateShardUIdMapping, &shard_uid.to_bytes()) + .expect("get_key_from_shard_uid_and_hash() failed") + .unwrap_or(shard_uid); let mut key = [0; 40]; - key[0..8].copy_from_slice(&shard_uid.to_bytes()); + key[0..8].copy_from_slice(&mapped_shard_uid.to_bytes()); key[8..].copy_from_slice(hash.as_ref()); key } + +#[cfg(test)] +mod tests { + use assert_matches::assert_matches; + use near_primitives::errors::StorageError; + use near_primitives::hash::CryptoHash; + use near_primitives::shard_layout::ShardUId; + + use crate::adapter::trie_store::TrieStoreAdapter; + use crate::NodeStorage; + + const ONE: std::num::NonZeroU32 = match std::num::NonZeroU32::new(1) { + Some(num) => num, + None => panic!(), + }; + + #[test] + fn test_trie_store_adapter() { + let (_tmp_dir, opener) = NodeStorage::test_opener(); + let store = TrieStoreAdapter::new(opener.open().unwrap().get_hot_store()); + let shard_uids: Vec = + (0..3).map(|i| ShardUId { version: 0, shard_id: i }).collect(); + let dummy_hash = CryptoHash::default(); + + assert_matches!( + store.get(shard_uids[0], &dummy_hash), + Err(StorageError::MissingTrieValue(_, _)) + ); + { + let mut store_update = store.store_update(); + store_update.increment_refcount_by(shard_uids[0], &dummy_hash, &[0], ONE); + store_update.increment_refcount_by(shard_uids[1], &dummy_hash, &[1], ONE); + store_update.increment_refcount_by(shard_uids[2], &dummy_hash, &[2], ONE); + store_update.commit().unwrap(); + } + assert_eq!(*store.get(shard_uids[0], &dummy_hash).unwrap(), [0]); + { + let mut store_update = store.store_update(); + store_update.delete_all_state(); + store_update.commit().unwrap(); + } + assert_matches!( + store.get(shard_uids[0], &dummy_hash), + Err(StorageError::MissingTrieValue(_, _)) + ); + } + + #[test] + fn test_shard_uid_mapping() { + let (_tmp_dir, opener) = NodeStorage::test_opener(); + let store = TrieStoreAdapter::new(opener.open().unwrap().get_hot_store()); + let parent_shard = ShardUId { version: 0, shard_id: 0 }; + let child_shard = ShardUId { version: 0, shard_id: 1 }; + let dummy_hash = CryptoHash::default(); + // Write some data to `parent_shard`. + { + let mut store_update = store.store_update(); + store_update.increment_refcount_by(parent_shard, &dummy_hash, &[0], ONE); + store_update.commit().unwrap(); + } + // The data is not yet visible to child shard, because the mapping has not been set yet. + assert_matches!( + store.get(child_shard, &dummy_hash), + Err(StorageError::MissingTrieValue(_, _)) + ); + // Set the shard_uid mapping from `child_shard` to `parent_shard`. + { + let mut store_update = store.store_update(); + store_update.set_shard_uid_mapping(child_shard, parent_shard); + store_update.commit().unwrap(); + } + // The data is now visible to both `parent_shard` and `child_shard`. + assert_eq!(*store.get(child_shard, &dummy_hash).unwrap(), [0]); + assert_eq!(*store.get(parent_shard, &dummy_hash).unwrap(), [0]); + // Remove the data using `parent_shard` UId. + { + let mut store_update = store.store_update(); + store_update.decrement_refcount(parent_shard, &dummy_hash); + store_update.commit().unwrap(); + } + // The data is now not visible to any shard. + assert_matches!( + store.get(child_shard, &dummy_hash), + Err(StorageError::MissingTrieValue(_, _)) + ); + assert_matches!( + store.get(parent_shard, &dummy_hash), + Err(StorageError::MissingTrieValue(_, _)) + ); + // Restore the data now using the `child_shard` UId. + { + let mut store_update = store.store_update(); + store_update.increment_refcount_by(child_shard, &dummy_hash, &[0], ONE); + store_update.commit().unwrap(); + } + // The data is now visible to both shards again. + assert_eq!(*store.get(child_shard, &dummy_hash).unwrap(), [0]); + assert_eq!(*store.get(parent_shard, &dummy_hash).unwrap(), [0]); + // Remove the data using `child_shard` UId. + { + let mut store_update = store.store_update(); + store_update.decrement_refcount_by(child_shard, &dummy_hash, ONE); + store_update.commit().unwrap(); + } + // The data is not visible to any shard again. + assert_matches!( + store.get(child_shard, &dummy_hash), + Err(StorageError::MissingTrieValue(_, _)) + ); + } +} diff --git a/core/store/src/cold_storage.rs b/core/store/src/cold_storage.rs index ef64b3fdecb..afb75cf04f2 100644 --- a/core/store/src/cold_storage.rs +++ b/core/store/src/cold_storage.rs @@ -326,6 +326,7 @@ pub fn copy_all_data_to_cold( tracing::debug!(target: "cold_store", "stopping copy_all_data_to_cold"); return Ok(CopyAllDataToColdStatus::Interrupted); } + // TODO(reshardingV3) Should do mapping here? let (key, value) = result?; transaction.set_and_write_if_full(col, key.to_vec(), value.to_vec())?; } diff --git a/core/store/src/lib.rs b/core/store/src/lib.rs index 0747bbdddcb..71eb56d107a 100644 --- a/core/store/src/lib.rs +++ b/core/store/src/lib.rs @@ -327,7 +327,7 @@ impl Store { } pub fn store_update(&self) -> StoreUpdate { - StoreUpdate { transaction: DBTransaction::new(), storage: Arc::clone(&self.storage) } + StoreUpdate { transaction: DBTransaction::new(), store: self.clone() } } pub fn iter<'a>(&'a self, col: DBCol) -> DBIterator<'a> { @@ -459,7 +459,7 @@ impl Store { /// Keeps track of current changes to the database and can commit all of them to the database. pub struct StoreUpdate { transaction: DBTransaction, - storage: Arc, + store: Store, } impl StoreUpdateAdapter for StoreUpdate { @@ -618,7 +618,10 @@ impl StoreUpdate { /// /// Panics if `self`’s and `other`’s storage are incompatible. pub fn merge(&mut self, other: StoreUpdate) { - assert!(core::ptr::addr_eq(Arc::as_ptr(&self.storage), Arc::as_ptr(&other.storage))); + assert!(core::ptr::addr_eq( + Arc::as_ptr(&self.store.storage), + Arc::as_ptr(&other.store.storage) + )); self.transaction.merge(other.transaction) } @@ -735,8 +738,7 @@ impl StoreUpdate { } } } - // TODO(reshardingV3) Map shard_uid for ops referencing State column. - self.storage.write(self.transaction) + self.store.storage.write(self.transaction) } } diff --git a/nearcore/src/entity_debug.rs b/nearcore/src/entity_debug.rs index 37394ddc873..d246da87d43 100644 --- a/nearcore/src/entity_debug.rs +++ b/nearcore/src/entity_debug.rs @@ -33,7 +33,7 @@ use near_primitives::views::{ BlockHeaderView, BlockView, ChunkView, ExecutionOutcomeView, ReceiptView, SignedTransactionView, }; use near_store::adapter::flat_store::encode_flat_state_db_key; -use near_store::adapter::trie_store::get_key_from_shard_uid_and_hash; +use near_store::adapter::StoreAdapter; use near_store::db::GENESIS_CONGESTION_INFO_KEY; use near_store::flat::delta::KeyForFlatStateDelta; use near_store::flat::{FlatStateChanges, FlatStateDeltaMetadata, FlatStorageStatus}; @@ -249,11 +249,9 @@ impl EntityDebugHandlerImpl { } EntityQuery::RawTrieNodeByHash { trie_node_hash, shard_uid } => { let node = store - .get_ser::( - DBCol::State, - &get_key_from_shard_uid_and_hash(shard_uid, &trie_node_hash), - )? - .ok_or_else(|| anyhow!("Trie node not found"))?; + .trie_store() + .get_ser::(shard_uid, &trie_node_hash) + .map_err(|e| anyhow!("Trie node not found: {e}"))?; Ok(serialize_raw_trie_node(node)) } EntityQuery::RawTrieRootByChunkHash { chunk_hash } => { @@ -270,21 +268,17 @@ impl EntityDebugHandlerImpl { .nth(shard_index) .ok_or_else(|| anyhow!("Shard {} not found", chunk.shard_id()))?; let node = store - .get_ser::( - DBCol::State, - &get_key_from_shard_uid_and_hash(shard_uid, &chunk.prev_state_root()), - )? - .ok_or_else(|| anyhow!("State root not found"))?; + .trie_store() + .get_ser::(shard_uid, &chunk.prev_state_root()) + .map_err(|e| anyhow!("State root not found: {e}"))?; Ok(serialize_raw_trie_node(node)) } EntityQuery::RawTrieValueByHash { trie_value_hash, shard_uid } => { let value = store - .get( - DBCol::State, - &get_key_from_shard_uid_and_hash(shard_uid, &trie_value_hash), - )? - .ok_or_else(|| anyhow!("Trie value not found"))?; - Ok(serialize_entity(&hex::encode(value.as_slice()))) + .trie_store() + .get(shard_uid, &trie_value_hash) + .map_err(|e| anyhow!("Trie value not found: {e}"))?; + Ok(serialize_entity(&hex::encode(value))) } EntityQuery::ReceiptById { receipt_id } => { let receipt = store @@ -461,8 +455,9 @@ impl EntityDebugHandlerImpl { ) -> anyhow::Result> { Ok(match state { FlatStateValue::Ref(value) => store - .get(DBCol::State, &get_key_from_shard_uid_and_hash(shard_uid, &value.hash))? - .ok_or_else(|| anyhow!("ValueRef could not be dereferenced"))? + .trie_store() + .get(shard_uid, &value.hash) + .map_err(|e| anyhow!("ValueRef could not be dereferenced: {e}"))? .to_vec(), FlatStateValue::Inlined(data) => data, })