diff --git a/chain/chain/src/garbage_collection.rs b/chain/chain/src/garbage_collection.rs index 00da87b8ffe..753f757c423 100644 --- a/chain/chain/src/garbage_collection.rs +++ b/chain/chain/src/garbage_collection.rs @@ -994,6 +994,7 @@ impl<'a> ChainStoreUpdate<'a> { | DBCol::EpochSyncProof | DBCol::Misc | DBCol::_ReceiptIdToShardId + | DBCol::ShardUIdMapping => unreachable!(), } self.merge(store_update); diff --git a/core/store/src/cold_storage.rs b/core/store/src/cold_storage.rs index e4ea95f4378..ef64b3fdecb 100644 --- a/core/store/src/cold_storage.rs +++ b/core/store/src/cold_storage.rs @@ -186,6 +186,7 @@ fn copy_state_from_store( let Some(trie_changes) = trie_changes else { continue }; for op in trie_changes.insertions() { + // TODO(reshardingV3) Handle shard_uid not mapped there let key = join_two_keys(&shard_uid_key, op.hash().as_bytes()); let value = op.payload().to_vec(); diff --git a/core/store/src/columns.rs b/core/store/src/columns.rs index 48b2a7f34fb..8f8c61902f2 100644 --- a/core/store/src/columns.rs +++ b/core/store/src/columns.rs @@ -300,6 +300,12 @@ pub enum DBCol { /// - *Rows*: only one key with 0 bytes. /// - *Column type*: `EpochSyncProof` EpochSyncProof, + /// Mapping of ShardUId to the underlying ShardUId database key prefix for the State column. + /// It could be parent shard after resharding, ancestor shard in case of many resharding, + /// or just map shard to itself if there was no resharding or we updated the mapping during state sync. + /// - *Rows*: `ShardUId` + /// - *Column type*: `ShardUId` + ShardUIdMapping, } /// Defines different logical parts of a db key. @@ -502,7 +508,8 @@ impl DBCol { | DBCol::FlatStateChanges | DBCol::FlatStateDeltaMetadata | DBCol::FlatStorageStatus - | DBCol::EpochSyncProof => false, + | DBCol::EpochSyncProof + | DBCol::ShardUIdMapping => false, } } @@ -575,6 +582,7 @@ impl DBCol { DBCol::LatestChunkStateWitnesses => &[DBKeyType::LatestWitnessesKey], DBCol::LatestWitnessesByIndex => &[DBKeyType::LatestWitnessIndex], DBCol::EpochSyncProof => &[DBKeyType::Empty], + DBCol::ShardUIdMapping => &[DBKeyType::ShardUId], } } } diff --git a/core/store/src/flat/storage.rs b/core/store/src/flat/storage.rs index e16d4a6113a..48faa5da380 100644 --- a/core/store/src/flat/storage.rs +++ b/core/store/src/flat/storage.rs @@ -389,7 +389,7 @@ impl FlatStorage { let blocks = guard.get_blocks_to_head(&new_head)?; for block_hash in blocks.into_iter().rev() { - let mut store_update = StoreUpdate::new(guard.store.storage.clone()); + let mut store_update = guard.store.store_update(); // Delta must exist because flat storage is locked and we could retrieve // path from old to new head. Otherwise we return internal error. let changes = store_helper::get_delta_changes(&guard.store, shard_uid, block_hash)? @@ -460,7 +460,7 @@ impl FlatStorage { if block.prev_hash != guard.flat_head.hash && !guard.deltas.contains_key(&block.prev_hash) { return Err(guard.create_block_not_supported_error(&block_hash)); } - let mut store_update = StoreUpdate::new(guard.store.storage.clone()); + let mut store_update = guard.store.store_update(); store_helper::set_delta(&mut store_update, shard_uid, &delta); let cached_changes: CachedFlatStateChanges = delta.changes.into(); guard.deltas.insert( diff --git a/core/store/src/flat/store_helper.rs b/core/store/src/flat/store_helper.rs index ad05b92e7ae..ddec23f919e 100644 --- a/core/store/src/flat/store_helper.rs +++ b/core/store/src/flat/store_helper.rs @@ -126,10 +126,6 @@ pub fn remove_all_flat_state_values(store_update: &mut StoreUpdate, shard_uid: S remove_range_by_shard_uid(store_update, shard_uid, DBCol::FlatState); } -pub fn remove_all_state_values(store_update: &mut StoreUpdate, shard_uid: ShardUId) { - remove_range_by_shard_uid(store_update, shard_uid, DBCol::State); -} - pub fn encode_flat_state_db_key(shard_uid: ShardUId, key: &[u8]) -> Vec { let mut buffer = vec![]; buffer.extend_from_slice(&shard_uid.to_bytes()); diff --git a/core/store/src/lib.rs b/core/store/src/lib.rs index 415869d695f..349c1626ff4 100644 --- a/core/store/src/lib.rs +++ b/core/store/src/lib.rs @@ -35,6 +35,8 @@ pub use near_primitives::shard_layout::ShardUId; use near_primitives::trie_key::{trie_key_parsers, TrieKey}; use near_primitives::types::{AccountId, BlockHeight, StateRoot}; use near_vm_runner::{CompiledContractInfo, ContractCode, ContractRuntimeCache}; +use shard_uid_mapping::{replace_shard_uid_key_prefix, retrieve_shard_uid_from_db_key}; +use std::borrow::Cow; use std::fs::File; use std::path::Path; use std::str::FromStr; @@ -54,6 +56,7 @@ pub mod metrics; pub mod migrations; mod opener; mod rocksdb_metrics; +mod shard_uid_mapping; mod sync_utils; pub mod test_utils; pub mod trie; @@ -62,6 +65,7 @@ pub use crate::config::{Mode, StoreConfig}; pub use crate::opener::{ checkpoint_hot_storage_and_cleanup_columns, StoreMigrator, StoreOpener, StoreOpenerError, }; +use crate::shard_uid_mapping::ShardUIdMapping; /// Specifies temperature of a storage. /// @@ -109,6 +113,7 @@ pub struct NodeStorage { #[derive(Clone)] pub struct Store { storage: Arc, + shard_uid_mapping: ShardUIdMapping, } impl NodeStorage { @@ -184,7 +189,7 @@ impl NodeStorage { /// store, the view client should use the split store and the cold store /// loop should use cold store. pub fn get_hot_store(&self) -> Store { - Store { storage: self.hot_storage.clone() } + Store::new(self.hot_storage.clone()) } /// Returns the cold store. The cold store is only available in archival @@ -196,7 +201,7 @@ impl NodeStorage { /// loop should use cold store. pub fn get_cold_store(&self) -> Option { match &self.cold_storage { - Some(cold_storage) => Some(Store { storage: cold_storage.clone() }), + Some(cold_storage) => Some(Store::new(cold_storage.clone())), None => None, } } @@ -208,7 +213,7 @@ impl NodeStorage { pub fn get_recovery_store(&self) -> Option { match &self.cold_storage { Some(cold_storage) => { - Some(Store { storage: Arc::new(crate::db::RecoveryDB::new(cold_storage.clone())) }) + Some(Store::new(Arc::new(crate::db::RecoveryDB::new(cold_storage.clone())))) } None => None, } @@ -222,7 +227,7 @@ impl NodeStorage { /// store, the view client should use the split store and the cold store /// loop should use cold store. pub fn get_split_store(&self) -> Option { - self.get_split_db().map(|split_db| Store { storage: split_db }) + self.get_split_db().map(|split_db| Store::new(split_db)) } pub fn get_split_db(&self) -> Option> { @@ -284,26 +289,68 @@ impl NodeStorage { impl Store { pub fn new(storage: Arc) -> Self { - Self { storage } + let shard_uid_mapping = ShardUIdMapping::new(); + Self { storage, shard_uid_mapping } + } + + fn get_impl(&self, column: DBCol, key: &[u8]) -> io::Result>> { + let value = if column.is_rc() { + self.storage.get_with_rc_stripped(column, &key) + } else { + self.storage.get_raw_bytes(column, &key) + }?; + Ok(value) + } + + fn read_shard_uid_mapping_from_db(&self, shard_uid: ShardUId) -> io::Result { + let mapped_shard_uid = + self.get_ser::(DBCol::ShardUIdMapping, &shard_uid.to_bytes())?; + Ok(self.shard_uid_mapping.update(&shard_uid, mapped_shard_uid)) + } + + fn get_impl_state(&self, key: &[u8]) -> io::Result>> { + let shard_uid = retrieve_shard_uid_from_db_key(key)?; + let mut mapping_synchronized_with_db = false; + let mapped_shard_uid = match self.shard_uid_mapping.map(&shard_uid) { + Some(mapped_shard_uid) => mapped_shard_uid, + None => { + mapping_synchronized_with_db = true; + self.read_shard_uid_mapping_from_db(shard_uid)? + } + }; + let mapped_key = if shard_uid != mapped_shard_uid { + Cow::Owned(replace_shard_uid_key_prefix(key, mapped_shard_uid)) + } else { + Cow::Borrowed(key) + }; + let mut value = self.get_impl(DBCol::State, &mapped_key)?; + if value.is_none() && !mapping_synchronized_with_db { + let db_mapped_shard_uid = self.read_shard_uid_mapping_from_db(shard_uid)?; + if db_mapped_shard_uid != mapped_shard_uid { + let mapped_key = replace_shard_uid_key_prefix(key, db_mapped_shard_uid); + value = self.get_impl(DBCol::State, &mapped_key)?; + } + } + Ok(value) } /// Fetches value from given column. /// - /// If the key does not exist in the column returns `None`. Otherwise + /// If the key does not exist in the column returns `None`. Otherwise /// returns the data as [`DBSlice`] object. The object dereferences into /// a slice, for cases when caller doesn’t need to own the value, and /// provides conversion into a vector or an Arc. pub fn get(&self, column: DBCol, key: &[u8]) -> io::Result>> { - let value = if column.is_rc() { - self.storage.get_with_rc_stripped(column, key) + let value = if column == DBCol::State { + self.get_impl_state(key)? } else { - self.storage.get_raw_bytes(column, key) - }?; + self.get_impl(column, key)? + }; tracing::trace!( target: "store", db_op = "get", col = %column, - key = %StorageKey(key), + key = %StorageKey(&key), size = value.as_deref().map(<[u8]>::len) ); Ok(value) @@ -318,7 +365,11 @@ impl Store { } pub fn store_update(&self) -> StoreUpdate { - StoreUpdate::new(Arc::clone(&self.storage)) + StoreUpdate { + transaction: DBTransaction::new(), + storage: Arc::clone(&self.storage), + _shard_uid_mapping: self.shard_uid_mapping.clone(), + } } pub fn iter<'a>(&'a self, col: DBCol) -> DBIterator<'a> { @@ -336,6 +387,7 @@ impl Store { } pub fn iter_prefix<'a>(&'a self, col: DBCol, key_prefix: &'a [u8]) -> DBIterator<'a> { + assert!(col != DBCol::State, "can't iter prefix of State column"); self.storage.iter_prefix(col, key_prefix) } @@ -346,6 +398,8 @@ impl Store { lower_bound: Option<&[u8]>, upper_bound: Option<&[u8]>, ) -> DBIterator<'a> { + // That would fail if called ScanDbColumnCmd for State column. + assert!(col != DBCol::State, "can't range iter State column"); self.storage.iter_range(col, lower_bound, upper_bound) } @@ -354,6 +408,7 @@ impl Store { col: DBCol, key_prefix: &'a [u8], ) -> impl Iterator, T)>> + 'a { + assert!(col != DBCol::State, "can't iter prefix ser of State column"); self.storage .iter_prefix(col, key_prefix) .map(|item| item.and_then(|(key, value)| Ok((key, T::try_from_slice(value.as_ref())?)))) @@ -447,6 +502,8 @@ impl Store { pub struct StoreUpdate { transaction: DBTransaction, storage: Arc, + // TODO(reshardingV3) Currently unused, see TODO inside `commit()` method. + _shard_uid_mapping: ShardUIdMapping, } impl StoreUpdate { @@ -455,10 +512,6 @@ impl StoreUpdate { None => panic!(), }; - pub(crate) fn new(db: Arc) -> Self { - StoreUpdate { transaction: DBTransaction::new(), storage: db } - } - /// Inserts a new value into the database. /// /// It is a programming error if `insert` overwrites an existing, different @@ -583,6 +636,7 @@ impl StoreUpdate { /// Must not be used for reference-counted columns; use /// ['Self::increment_refcount'] or [`Self::decrement_refcount`] instead. pub fn delete(&mut self, column: DBCol, key: &[u8]) { + // It would panic if called with `State` column. assert!(!column.is_rc(), "can't delete: {column}"); self.transaction.delete(column, key.to_vec()); } @@ -594,6 +648,7 @@ impl StoreUpdate { /// Deletes the given key range from the database including `from` /// and excluding `to` keys. pub fn delete_range(&mut self, column: DBCol, from: &[u8], to: &[u8]) { + assert!(column != DBCol::State, "can't range delete State column"); self.transaction.delete_range(column, from.to_vec(), to.to_vec()); } @@ -718,6 +773,7 @@ impl StoreUpdate { } } } + // TODO(reshardingV3) Use shard_uid_mapping for ops referencing State column. self.storage.write(self.transaction) } } diff --git a/core/store/src/opener.rs b/core/store/src/opener.rs index 41a855031cb..27e264f0c10 100644 --- a/core/store/src/opener.rs +++ b/core/store/src/opener.rs @@ -339,7 +339,7 @@ impl<'a> StoreOpener<'a> { tracing::info!(target: "db_opener", path=%opener.path.display(), "The database doesn't exist, creating it."); let db = opener.create()?; - let store = Store { storage: Arc::new(db) }; + let store = Store::new(Arc::new(db)); store.set_db_version(DB_VERSION)?; return Ok(()); } @@ -467,13 +467,13 @@ impl<'a> StoreOpener<'a> { version: DbVersion, ) -> Result { let (db, _) = opener.open(mode, version)?; - let store = Store { storage: Arc::new(db) }; + let store = Store::new(Arc::new(db)); Ok(store) } fn open_store_unsafe(mode: Mode, opener: &DBOpener) -> Result { let db = opener.open_unsafe(mode)?; - let store = Store { storage: Arc::new(db) }; + let store = Store::new(Arc::new(db)); Ok(store) } } @@ -640,7 +640,7 @@ mod tests { fn test_checkpoint_hot_storage_and_cleanup_columns() { let (home_dir, opener) = NodeStorage::test_opener(); let node_storage = opener.open().unwrap(); - let hot_store = Store { storage: node_storage.hot_storage.clone() }; + let hot_store = Store::new(node_storage.hot_storage.clone()); assert_eq!(hot_store.get_db_kind().unwrap(), Some(DbKind::RPC)); let keys = vec![vec![0], vec![1], vec![2], vec![3]]; diff --git a/core/store/src/shard_uid_mapping.rs b/core/store/src/shard_uid_mapping.rs new file mode 100644 index 00000000000..0cdae9bfef9 --- /dev/null +++ b/core/store/src/shard_uid_mapping.rs @@ -0,0 +1,69 @@ +use crate::{ + flat::POISONED_LOCK_ERR, + io::{Error, Result}, +}; +use near_primitives::shard_layout::ShardUId; +use std::{ + collections::HashMap, + sync::{Arc, Mutex}, +}; + +#[derive(Clone)] +pub(crate) struct ShardUIdMapping(Arc>); + +impl ShardUIdMapping { + pub fn new() -> Self { + Self(Arc::new(Mutex::new(ShardUIdMappingInner::new()))) + } + + pub fn map(&self, shard_uid: &ShardUId) -> Option { + self.lock().map(shard_uid) + } + + pub fn update(&self, shard_uid: &ShardUId, db_mapped_shard_uid: Option) -> ShardUId { + self.lock().update(shard_uid, db_mapped_shard_uid) + } + + fn lock(&self) -> std::sync::MutexGuard { + self.0.lock().expect(POISONED_LOCK_ERR) + } +} + +pub fn retrieve_shard_uid_from_db_key(key: &[u8]) -> Result { + // TODO(reshardingV3) Consider changing the Error type to `StorageError`? + // Would need changing error types for `Store` methods as well. + ShardUId::try_from(&key[..8]) + .map_err(|e| Error::other(format!("Could not retrieve ShardUId from db key: {}", e))) +} + +pub fn replace_shard_uid_key_prefix(key: &[u8], shard_uid: ShardUId) -> Vec { + let mut mapped_key = [0u8; 40]; + mapped_key[..8].copy_from_slice(&shard_uid.to_bytes()); + mapped_key[8..].copy_from_slice(&key[8..]); + mapped_key.to_vec() +} + +struct ShardUIdMappingInner { + mapping: HashMap, +} + +impl ShardUIdMappingInner { + pub fn new() -> Self { + Self { mapping: HashMap::new() } + } + + pub fn map(&self, shard_uid: &ShardUId) -> Option { + self.mapping.get(shard_uid).copied() + } + + pub fn update( + &mut self, + shard_uid: &ShardUId, + db_mapped_shard_uid: Option, + ) -> ShardUId { + // No mapping means we map shard_uid to itself + let mapped_shard_uid = db_mapped_shard_uid.unwrap_or(*shard_uid); + self.mapping.insert(*shard_uid, mapped_shard_uid); + mapped_shard_uid + } +} diff --git a/core/store/src/trie/mem/parallel_loader.rs b/core/store/src/trie/mem/parallel_loader.rs index 03945976434..d4e1701db5c 100644 --- a/core/store/src/trie/mem/parallel_loader.rs +++ b/core/store/src/trie/mem/parallel_loader.rs @@ -214,6 +214,7 @@ impl ParallelMemTrieLoader { arena: &mut impl ArenaMut, ) -> Result { // Figure out which range corresponds to the prefix of this subtree. + // TODO(reshardingV3) This seems fragile, potentially does not work with mapping. let (start, end) = subtree_to_load.to_iter_range(self.shard_uid); // Load all the keys in this range from the FlatState column. diff --git a/core/store/src/trie/shard_tries.rs b/core/store/src/trie/shard_tries.rs index 8b83b6d02ff..ac8bb05968e 100644 --- a/core/store/src/trie/shard_tries.rs +++ b/core/store/src/trie/shard_tries.rs @@ -1,7 +1,6 @@ use super::mem::mem_tries::MemTries; use super::state_snapshot::{StateSnapshot, StateSnapshotConfig}; use super::TrieRefcountSubtraction; -use crate::flat::store_helper::remove_all_state_values; use crate::flat::{FlatStorageManager, FlatStorageStatus}; use crate::trie::config::TrieConfig; use crate::trie::mem::loading::load_trie_from_flat_state_and_delta; @@ -167,17 +166,13 @@ impl ShardTries { } pub fn store_update(&self) -> StoreUpdate { - StoreUpdate::new(self.get_db().clone()) + self.0.store.store_update() } pub fn get_store(&self) -> Store { self.0.store.clone() } - pub(crate) fn get_db(&self) -> &Arc { - &self.0.store.storage - } - pub fn get_flat_storage_manager(&self) -> FlatStorageManager { self.0.flat_storage_manager.clone() } @@ -397,16 +392,6 @@ impl ShardTries { Ok(manager.get_flat_storage_status(shard_uid)) } - /// Removes all trie state values from store for a given shard_uid - /// Useful when we are trying to delete state of parent shard after resharding - /// Note that flat storage needs to be handled separately - pub fn delete_trie_for_shard(&self, shard_uid: ShardUId, store_update: &mut StoreUpdate) { - // Clear both caches and remove state values from store - let _cache = self.0.caches.lock().expect(POISONED_LOCK_ERR).remove(&shard_uid); - let _view_cache = self.0.view_caches.lock().expect(POISONED_LOCK_ERR).remove(&shard_uid); - remove_all_state_values(store_update, shard_uid); - } - /// Retains in-memory tries for given shards, i.e. unload tries from memory for shards that are NOT /// in the given list. Should be called to unload obsolete tries from memory. pub fn retain_mem_tries(&self, shard_uids: &[ShardUId]) { @@ -909,33 +894,4 @@ mod test { trie.update_cache(insert_ops, shard_uid); assert!(trie_caches.lock().unwrap().get(&shard_uid).unwrap().get(&key).is_none()); } - - #[test] - fn test_delete_trie_for_shard() { - let shard_uid = ShardUId::single_shard(); - let tries = create_trie(); - - let key = CryptoHash::hash_borsh("alice").as_bytes().to_vec(); - let val: Vec = Vec::from([0, 1, 2, 3, 4]); - - // insert some data - let trie = tries.get_trie_for_shard(shard_uid, CryptoHash::default()); - let trie_changes = trie.update(vec![(key, Some(val))]).unwrap(); - let mut store_update = tries.store_update(); - tries.apply_insertions(&trie_changes, shard_uid, &mut store_update); - store_update.commit().unwrap(); - - // delete trie for shard_uid - let mut store_update = tries.store_update(); - tries.delete_trie_for_shard(shard_uid, &mut store_update); - store_update.commit().unwrap(); - - // verify if data and caches are deleted - assert!(tries.0.caches.lock().unwrap().get(&shard_uid).is_none()); - assert!(tries.0.view_caches.lock().unwrap().get(&shard_uid).is_none()); - let store = tries.get_store(); - let key_prefix = shard_uid.to_bytes(); - let mut iter = store.iter_prefix(DBCol::State, &key_prefix); - assert!(iter.next().is_none()); - } }