From 1e67eb7856df506f0d7765b72eec09f54680217f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Adam=20Chuda=C5=9B?= Date: Mon, 23 Sep 2024 17:46:01 +0200 Subject: [PATCH 01/13] state mapping --- chain/chain/src/garbage_collection.rs | 1 + core/store/src/cold_storage.rs | 1 + core/store/src/columns.rs | 10 ++- core/store/src/flat/storage.rs | 2 +- core/store/src/lib.rs | 88 ++++++++++++++++++---- core/store/src/opener.rs | 8 +- core/store/src/shard_uid_mapping.rs | 69 +++++++++++++++++ core/store/src/trie/mem/parallel_loader.rs | 1 + 8 files changed, 158 insertions(+), 22 deletions(-) create mode 100644 core/store/src/shard_uid_mapping.rs diff --git a/chain/chain/src/garbage_collection.rs b/chain/chain/src/garbage_collection.rs index 777d83edaef..7a94e862c69 100644 --- a/chain/chain/src/garbage_collection.rs +++ b/chain/chain/src/garbage_collection.rs @@ -997,6 +997,7 @@ impl<'a> ChainStoreUpdate<'a> { | DBCol::EpochSyncProof | DBCol::Misc | DBCol::_ReceiptIdToShardId + | DBCol::ShardUIdMapping => unreachable!(), } self.merge(store_update); diff --git a/core/store/src/cold_storage.rs b/core/store/src/cold_storage.rs index e4ea95f4378..ef64b3fdecb 100644 --- a/core/store/src/cold_storage.rs +++ b/core/store/src/cold_storage.rs @@ -186,6 +186,7 @@ fn copy_state_from_store( let Some(trie_changes) = trie_changes else { continue }; for op in trie_changes.insertions() { + // TODO(reshardingV3) Handle shard_uid not mapped there let key = join_two_keys(&shard_uid_key, op.hash().as_bytes()); let value = op.payload().to_vec(); diff --git a/core/store/src/columns.rs b/core/store/src/columns.rs index 48b2a7f34fb..8f8c61902f2 100644 --- a/core/store/src/columns.rs +++ b/core/store/src/columns.rs @@ -300,6 +300,12 @@ pub enum DBCol { /// - *Rows*: only one key with 0 bytes. /// - *Column type*: `EpochSyncProof` EpochSyncProof, + /// Mapping of ShardUId to the underlying ShardUId database key prefix for the State column. 
+ /// It could be parent shard after resharding, ancestor shard in case of many resharding, + /// or just map shard to itself if there was no resharding or we updated the mapping during state sync. + /// - *Rows*: `ShardUId` + /// - *Column type*: `ShardUId` + ShardUIdMapping, } /// Defines different logical parts of a db key. @@ -502,7 +508,8 @@ impl DBCol { | DBCol::FlatStateChanges | DBCol::FlatStateDeltaMetadata | DBCol::FlatStorageStatus - | DBCol::EpochSyncProof => false, + | DBCol::EpochSyncProof + | DBCol::ShardUIdMapping => false, } } @@ -575,6 +582,7 @@ impl DBCol { DBCol::LatestChunkStateWitnesses => &[DBKeyType::LatestWitnessesKey], DBCol::LatestWitnessesByIndex => &[DBKeyType::LatestWitnessIndex], DBCol::EpochSyncProof => &[DBKeyType::Empty], + DBCol::ShardUIdMapping => &[DBKeyType::ShardUId], } } } diff --git a/core/store/src/flat/storage.rs b/core/store/src/flat/storage.rs index 79f7001aacf..fcc6f223e2d 100644 --- a/core/store/src/flat/storage.rs +++ b/core/store/src/flat/storage.rs @@ -460,7 +460,7 @@ impl FlatStorage { return Err(guard.create_block_not_supported_error(&block_hash)); } let mut store_update = guard.store.store_update(); - store_update.set_delta(shard_uid, &delta); + store_helper::set_delta(&mut store_update, shard_uid, &delta); let cached_changes: CachedFlatStateChanges = delta.changes.into(); guard.deltas.insert( block_hash, diff --git a/core/store/src/lib.rs b/core/store/src/lib.rs index 3804b60a47b..e5292998239 100644 --- a/core/store/src/lib.rs +++ b/core/store/src/lib.rs @@ -36,6 +36,8 @@ pub use near_primitives::shard_layout::ShardUId; use near_primitives::trie_key::{trie_key_parsers, TrieKey}; use near_primitives::types::{AccountId, BlockHeight, StateRoot}; use near_vm_runner::{CompiledContractInfo, ContractCode, ContractRuntimeCache}; +use shard_uid_mapping::{replace_shard_uid_key_prefix, retrieve_shard_uid_from_db_key}; +use std::borrow::Cow; use std::fs::File; use std::path::Path; use std::str::FromStr; @@ -57,6 +59,7 @@ 
pub mod metrics; pub mod migrations; mod opener; mod rocksdb_metrics; +mod shard_uid_mapping; mod sync_utils; pub mod test_utils; pub mod trie; @@ -65,6 +68,7 @@ pub use crate::config::{Mode, StoreConfig}; pub use crate::opener::{ checkpoint_hot_storage_and_cleanup_columns, StoreMigrator, StoreOpener, StoreOpenerError, }; +use crate::shard_uid_mapping::ShardUIdMapping; /// Specifies temperature of a storage. /// @@ -112,6 +116,7 @@ pub struct NodeStorage { #[derive(Clone)] pub struct Store { storage: Arc, + shard_uid_mapping: ShardUIdMapping, } impl StoreAdapter for Store { @@ -193,7 +198,7 @@ impl NodeStorage { /// store, the view client should use the split store and the cold store /// loop should use cold store. pub fn get_hot_store(&self) -> Store { - Store { storage: self.hot_storage.clone() } + Store::new(self.hot_storage.clone()) } /// Returns the cold store. The cold store is only available in archival @@ -205,7 +210,7 @@ impl NodeStorage { /// loop should use cold store. pub fn get_cold_store(&self) -> Option { match &self.cold_storage { - Some(cold_storage) => Some(Store { storage: cold_storage.clone() }), + Some(cold_storage) => Some(Store::new(cold_storage.clone())), None => None, } } @@ -217,7 +222,7 @@ impl NodeStorage { pub fn get_recovery_store(&self) -> Option { match &self.cold_storage { Some(cold_storage) => { - Some(Store { storage: Arc::new(crate::db::RecoveryDB::new(cold_storage.clone())) }) + Some(Store::new(Arc::new(crate::db::RecoveryDB::new(cold_storage.clone())))) } None => None, } @@ -231,7 +236,7 @@ impl NodeStorage { /// store, the view client should use the split store and the cold store /// loop should use cold store. 
pub fn get_split_store(&self) -> Option { - self.get_split_db().map(|split_db| Store { storage: split_db }) + self.get_split_db().map(|split_db| Store::new(split_db)) } pub fn get_split_db(&self) -> Option> { @@ -293,26 +298,68 @@ impl NodeStorage { impl Store { pub fn new(storage: Arc) -> Self { - Self { storage } + let shard_uid_mapping = ShardUIdMapping::new(); + Self { storage, shard_uid_mapping } + } + + fn get_impl(&self, column: DBCol, key: &[u8]) -> io::Result>> { + let value = if column.is_rc() { + self.storage.get_with_rc_stripped(column, &key) + } else { + self.storage.get_raw_bytes(column, &key) + }?; + Ok(value) + } + + fn read_shard_uid_mapping_from_db(&self, shard_uid: ShardUId) -> io::Result { + let mapped_shard_uid = + self.get_ser::(DBCol::ShardUIdMapping, &shard_uid.to_bytes())?; + Ok(self.shard_uid_mapping.update(&shard_uid, mapped_shard_uid)) + } + + fn get_impl_state(&self, key: &[u8]) -> io::Result>> { + let shard_uid = retrieve_shard_uid_from_db_key(key)?; + let mut mapping_synchronized_with_db = false; + let mapped_shard_uid = match self.shard_uid_mapping.map(&shard_uid) { + Some(mapped_shard_uid) => mapped_shard_uid, + None => { + mapping_synchronized_with_db = true; + self.read_shard_uid_mapping_from_db(shard_uid)? + } + }; + let mapped_key = if shard_uid != mapped_shard_uid { + Cow::Owned(replace_shard_uid_key_prefix(key, mapped_shard_uid)) + } else { + Cow::Borrowed(key) + }; + let mut value = self.get_impl(DBCol::State, &mapped_key)?; + if value.is_none() && !mapping_synchronized_with_db { + let db_mapped_shard_uid = self.read_shard_uid_mapping_from_db(shard_uid)?; + if db_mapped_shard_uid != mapped_shard_uid { + let mapped_key = replace_shard_uid_key_prefix(key, db_mapped_shard_uid); + value = self.get_impl(DBCol::State, &mapped_key)?; + } + } + Ok(value) } /// Fetches value from given column. /// - /// If the key does not exist in the column returns `None`. Otherwise + /// If the key does not exist in the column returns `None`. 
Otherwise /// returns the data as [`DBSlice`] object. The object dereferences into /// a slice, for cases when caller doesn’t need to own the value, and /// provides conversion into a vector or an Arc. pub fn get(&self, column: DBCol, key: &[u8]) -> io::Result>> { - let value = if column.is_rc() { - self.storage.get_with_rc_stripped(column, key) + let value = if column == DBCol::State { + self.get_impl_state(key)? } else { - self.storage.get_raw_bytes(column, key) - }?; + self.get_impl(column, key)? + }; tracing::trace!( target: "store", db_op = "get", col = %column, - key = %StorageKey(key), + key = %StorageKey(&key), size = value.as_deref().map(<[u8]>::len) ); Ok(value) @@ -327,7 +374,11 @@ impl Store { } pub fn store_update(&self) -> StoreUpdate { - StoreUpdate::new(Arc::clone(&self.storage)) + StoreUpdate { + transaction: DBTransaction::new(), + storage: Arc::clone(&self.storage), + _shard_uid_mapping: self.shard_uid_mapping.clone(), + } } pub fn iter<'a>(&'a self, col: DBCol) -> DBIterator<'a> { @@ -345,6 +396,7 @@ impl Store { } pub fn iter_prefix<'a>(&'a self, col: DBCol, key_prefix: &'a [u8]) -> DBIterator<'a> { + assert!(col != DBCol::State, "can't iter prefix of State column"); self.storage.iter_prefix(col, key_prefix) } @@ -355,6 +407,8 @@ impl Store { lower_bound: Option<&[u8]>, upper_bound: Option<&[u8]>, ) -> DBIterator<'a> { + // That would fail if called ScanDbColumnCmd for State column. 
+ assert!(col != DBCol::State, "can't range iter State column"); self.storage.iter_range(col, lower_bound, upper_bound) } @@ -363,6 +417,7 @@ impl Store { col: DBCol, key_prefix: &'a [u8], ) -> impl Iterator, T)>> + 'a { + assert!(col != DBCol::State, "can't iter prefix ser of State column"); self.storage .iter_prefix(col, key_prefix) .map(|item| item.and_then(|(key, value)| Ok((key, T::try_from_slice(value.as_ref())?)))) @@ -456,6 +511,8 @@ impl Store { pub struct StoreUpdate { transaction: DBTransaction, storage: Arc, + // TODO(reshardingV3) Currently unused, see TODO inside `commit()` method. + _shard_uid_mapping: ShardUIdMapping, } impl StoreUpdateAdapter for StoreUpdate { @@ -470,10 +527,6 @@ impl StoreUpdate { None => panic!(), }; - pub(crate) fn new(db: Arc) -> Self { - StoreUpdate { transaction: DBTransaction::new(), storage: db } - } - /// Inserts a new value into the database. /// /// It is a programming error if `insert` overwrites an existing, different @@ -598,6 +651,7 @@ impl StoreUpdate { /// Must not be used for reference-counted columns; use /// ['Self::increment_refcount'] or [`Self::decrement_refcount`] instead. pub fn delete(&mut self, column: DBCol, key: &[u8]) { + // It would panic if called with `State` column. assert!(!column.is_rc(), "can't delete: {column}"); self.transaction.delete(column, key.to_vec()); } @@ -609,6 +663,7 @@ impl StoreUpdate { /// Deletes the given key range from the database including `from` /// and excluding `to` keys. pub fn delete_range(&mut self, column: DBCol, from: &[u8], to: &[u8]) { + assert!(column != DBCol::State, "can't range delete State column"); self.transaction.delete_range(column, from.to_vec(), to.to_vec()); } @@ -733,6 +788,7 @@ impl StoreUpdate { } } } + // TODO(reshardingV3) Use shard_uid_mapping for ops referencing State column. 
self.storage.write(self.transaction) } } diff --git a/core/store/src/opener.rs b/core/store/src/opener.rs index 41a855031cb..27e264f0c10 100644 --- a/core/store/src/opener.rs +++ b/core/store/src/opener.rs @@ -339,7 +339,7 @@ impl<'a> StoreOpener<'a> { tracing::info!(target: "db_opener", path=%opener.path.display(), "The database doesn't exist, creating it."); let db = opener.create()?; - let store = Store { storage: Arc::new(db) }; + let store = Store::new(Arc::new(db)); store.set_db_version(DB_VERSION)?; return Ok(()); } @@ -467,13 +467,13 @@ impl<'a> StoreOpener<'a> { version: DbVersion, ) -> Result { let (db, _) = opener.open(mode, version)?; - let store = Store { storage: Arc::new(db) }; + let store = Store::new(Arc::new(db)); Ok(store) } fn open_store_unsafe(mode: Mode, opener: &DBOpener) -> Result { let db = opener.open_unsafe(mode)?; - let store = Store { storage: Arc::new(db) }; + let store = Store::new(Arc::new(db)); Ok(store) } } @@ -640,7 +640,7 @@ mod tests { fn test_checkpoint_hot_storage_and_cleanup_columns() { let (home_dir, opener) = NodeStorage::test_opener(); let node_storage = opener.open().unwrap(); - let hot_store = Store { storage: node_storage.hot_storage.clone() }; + let hot_store = Store::new(node_storage.hot_storage.clone()); assert_eq!(hot_store.get_db_kind().unwrap(), Some(DbKind::RPC)); let keys = vec![vec![0], vec![1], vec![2], vec![3]]; diff --git a/core/store/src/shard_uid_mapping.rs b/core/store/src/shard_uid_mapping.rs new file mode 100644 index 00000000000..0cdae9bfef9 --- /dev/null +++ b/core/store/src/shard_uid_mapping.rs @@ -0,0 +1,69 @@ +use crate::{ + flat::POISONED_LOCK_ERR, + io::{Error, Result}, +}; +use near_primitives::shard_layout::ShardUId; +use std::{ + collections::HashMap, + sync::{Arc, Mutex}, +}; + +#[derive(Clone)] +pub(crate) struct ShardUIdMapping(Arc>); + +impl ShardUIdMapping { + pub fn new() -> Self { + Self(Arc::new(Mutex::new(ShardUIdMappingInner::new()))) + } + + pub fn map(&self, shard_uid: &ShardUId) 
-> Option { + self.lock().map(shard_uid) + } + + pub fn update(&self, shard_uid: &ShardUId, db_mapped_shard_uid: Option) -> ShardUId { + self.lock().update(shard_uid, db_mapped_shard_uid) + } + + fn lock(&self) -> std::sync::MutexGuard { + self.0.lock().expect(POISONED_LOCK_ERR) + } +} + +pub fn retrieve_shard_uid_from_db_key(key: &[u8]) -> Result { + // TODO(reshardingV3) Consider changing the Error type to `StorageError`? + // Would need changing error types for `Store` methods as well. + ShardUId::try_from(&key[..8]) + .map_err(|e| Error::other(format!("Could not retrieve ShardUId from db key: {}", e))) +} + +pub fn replace_shard_uid_key_prefix(key: &[u8], shard_uid: ShardUId) -> Vec { + let mut mapped_key = [0u8; 40]; + mapped_key[..8].copy_from_slice(&shard_uid.to_bytes()); + mapped_key[8..].copy_from_slice(&key[8..]); + mapped_key.to_vec() +} + +struct ShardUIdMappingInner { + mapping: HashMap, +} + +impl ShardUIdMappingInner { + pub fn new() -> Self { + Self { mapping: HashMap::new() } + } + + pub fn map(&self, shard_uid: &ShardUId) -> Option { + self.mapping.get(shard_uid).copied() + } + + pub fn update( + &mut self, + shard_uid: &ShardUId, + db_mapped_shard_uid: Option, + ) -> ShardUId { + // No mapping means we map shard_uid to itself + let mapped_shard_uid = db_mapped_shard_uid.unwrap_or(*shard_uid); + self.mapping.insert(*shard_uid, mapped_shard_uid); + mapped_shard_uid + } +} diff --git a/core/store/src/trie/mem/parallel_loader.rs b/core/store/src/trie/mem/parallel_loader.rs index e38254a3031..ee0a4ddee41 100644 --- a/core/store/src/trie/mem/parallel_loader.rs +++ b/core/store/src/trie/mem/parallel_loader.rs @@ -191,6 +191,7 @@ impl ParallelMemTrieLoader { arena: &mut impl ArenaMut, ) -> Result { // Figure out which range corresponds to the prefix of this subtree. + // TODO(reshardingV3) This seems fragile, potentially does not work with mapping. 
let (start, end) = subtree_to_load.to_iter_range(self.shard_uid); // Load all the keys in this range from the FlatState column. From d72b0c6de6e29a05e0c71bac1ea83b0ca58a26d4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Adam=20Chuda=C5=9B?= Date: Tue, 24 Sep 2024 15:20:42 +0200 Subject: [PATCH 02/13] comments --- core/store/src/lib.rs | 10 ++++++++-- core/store/src/shard_uid_mapping.rs | 4 ++++ 2 files changed, 12 insertions(+), 2 deletions(-) diff --git a/core/store/src/lib.rs b/core/store/src/lib.rs index e5292998239..29f3e3ee344 100644 --- a/core/store/src/lib.rs +++ b/core/store/src/lib.rs @@ -302,6 +302,7 @@ impl Store { Self { storage, shard_uid_mapping } } + /// Underlying `get()` implementation for all columns. fn get_impl(&self, column: DBCol, key: &[u8]) -> io::Result>> { let value = if column.is_rc() { self.storage.get_with_rc_stripped(column, &key) @@ -311,12 +312,17 @@ impl Store { Ok(value) } + /// Reads shard_uid mapping for given shard and updates the in-memory mapping. fn read_shard_uid_mapping_from_db(&self, shard_uid: ShardUId) -> io::Result { let mapped_shard_uid = self.get_ser::(DBCol::ShardUIdMapping, &shard_uid.to_bytes())?; Ok(self.shard_uid_mapping.update(&shard_uid, mapped_shard_uid)) } + /// Specialized `get` implementation for State column. + /// + /// Replace db key shard_uid prefix using in-memory mapping, + /// falling back to a mapping stored in the database. fn get_impl_state(&self, key: &[u8]) -> io::Result>> { let shard_uid = retrieve_shard_uid_from_db_key(key)?; let mut mapping_synchronized_with_db = false; @@ -407,7 +413,7 @@ impl Store { lower_bound: Option<&[u8]>, upper_bound: Option<&[u8]>, ) -> DBIterator<'a> { - // That would fail if called ScanDbColumnCmd for State column. + // That would fail if called `ScanDbColumnCmd`` for the `State` column. 
assert!(col != DBCol::State, "can't range iter State column"); self.storage.iter_range(col, lower_bound, upper_bound) } @@ -651,7 +657,7 @@ impl StoreUpdate { /// Must not be used for reference-counted columns; use /// ['Self::increment_refcount'] or [`Self::decrement_refcount`] instead. pub fn delete(&mut self, column: DBCol, key: &[u8]) { - // It would panic if called with `State` column. + // It would panic if called with `State` column, as it is refcounted. assert!(!column.is_rc(), "can't delete: {column}"); self.transaction.delete(column, key.to_vec()); } diff --git a/core/store/src/shard_uid_mapping.rs b/core/store/src/shard_uid_mapping.rs index 0cdae9bfef9..48269f3ae31 100644 --- a/core/store/src/shard_uid_mapping.rs +++ b/core/store/src/shard_uid_mapping.rs @@ -8,6 +8,10 @@ use std::{ sync::{Arc, Mutex}, }; +/// Stores a mapping from ShardUId to ShardUId. +/// +/// Protected with mutex for concurrent access. +/// That is for resharding V3 purposes, where we use the mapping strategy for State column. 
#[derive(Clone)] pub(crate) struct ShardUIdMapping(Arc>); From 961871d75ff63da04497b9a660f2a598a9063111 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Adam=20Chuda=C5=9B?= Date: Tue, 24 Sep 2024 15:54:17 +0200 Subject: [PATCH 03/13] fix test --- core/store/src/lib.rs | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/core/store/src/lib.rs b/core/store/src/lib.rs index 29f3e3ee344..b25d4ec06b7 100644 --- a/core/store/src/lib.rs +++ b/core/store/src/lib.rs @@ -1233,21 +1233,21 @@ mod tests { } fn test_clear_column(store: Store) { - assert_eq!(store.get(DBCol::State, &[1]).unwrap(), None); + assert_eq!(store.get(DBCol::State, &[1; 8]).unwrap(), None); { let mut store_update = store.store_update(); - store_update.increment_refcount(DBCol::State, &[1], &[1]); - store_update.increment_refcount(DBCol::State, &[2], &[2]); - store_update.increment_refcount(DBCol::State, &[3], &[3]); + store_update.increment_refcount(DBCol::State, &[1; 8], &[1]); + store_update.increment_refcount(DBCol::State, &[2; 8], &[2]); + store_update.increment_refcount(DBCol::State, &[3; 8], &[3]); store_update.commit().unwrap(); } - assert_eq!(store.get(DBCol::State, &[1]).unwrap().as_deref(), Some(&[1][..])); + assert_eq!(store.get(DBCol::State, &[1; 8]).unwrap().as_deref(), Some(&[1][..])); { let mut store_update = store.store_update(); store_update.delete_all(DBCol::State); store_update.commit().unwrap(); } - assert_eq!(store.get(DBCol::State, &[1]).unwrap(), None); + assert_eq!(store.get(DBCol::State, &[1; 8]).unwrap(), None); } #[test] From b5fb4edcf7936b7501eac6f6eb295dfaad096989 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Adam=20Chuda=C5=9B?= Date: Tue, 24 Sep 2024 16:25:12 +0200 Subject: [PATCH 04/13] fix test --- core/store/src/lib.rs | 35 +++++++++++++++++++---------------- 1 file changed, 19 insertions(+), 16 deletions(-) diff --git a/core/store/src/lib.rs b/core/store/src/lib.rs index b25d4ec06b7..cdfc3bfe8ec 100644 --- a/core/store/src/lib.rs +++ 
b/core/store/src/lib.rs @@ -319,10 +319,13 @@ impl Store { Ok(self.shard_uid_mapping.update(&shard_uid, mapped_shard_uid)) } - /// Specialized `get` implementation for State column. + /// Specialized `get` implementation for State column that replaces shard_uid prefix + /// with a mapped value according to mapping strategy in Resharding V3. /// - /// Replace db key shard_uid prefix using in-memory mapping, - /// falling back to a mapping stored in the database. + /// It first uses the in-memory `shard_uid_mapping` and attempts to read from storage. + /// If the node was not found in storage, it tries to update the in-memory mapping with + /// the mapping stored in `DBCol::ShardUIdMapping`. + /// If it was different, then we do the second attempt to read from storage. fn get_impl_state(&self, key: &[u8]) -> io::Result>> { let shard_uid = retrieve_shard_uid_from_db_key(key)?; let mut mapping_synchronized_with_db = false; @@ -1363,9 +1366,9 @@ mod tests { { let store = crate::test_utils::create_test_store(); let mut store_update = store.store_update(); - store_update.increment_refcount(DBCol::State, &[1], &[1]); - store_update.increment_refcount(DBCol::State, &[2], &[2]); - store_update.increment_refcount(DBCol::State, &[2], &[2]); + store_update.increment_refcount(DBCol::State, &[1; 8], &[1]); + store_update.increment_refcount(DBCol::State, &[2; 8], &[2]); + store_update.increment_refcount(DBCol::State, &[2; 8], &[2]); store_update.commit().unwrap(); store.save_state_to_file(tmp.path()).unwrap(); } @@ -1376,9 +1379,9 @@ mod tests { std::io::Read::read_to_end(tmp.as_file_mut(), &mut buffer).unwrap(); #[rustfmt::skip] assert_eq!(&[ - /* column: */ 0, /* key len: */ 1, 0, 0, 0, /* key: */ 1, + /* column: */ 0, /* key len: */ 8, 0, 0, 0, /* key: */ 1, 1, 1, 1, 1, 1, 1, 1, /* val len: */ 9, 0, 0, 0, /* val: */ 1, 1, 0, 0, 0, 0, 0, 0, 0, - /* column: */ 0, /* key len: */ 1, 0, 0, 0, /* key: */ 2, + /* column: */ 0, /* key len: */ 8, 0, 0, 0, /* key: */ 2, 2, 2, 2, 2, 2, 2, 
2, /* val len: */ 9, 0, 0, 0, /* val: */ 2, 2, 0, 0, 0, 0, 0, 0, 0, /* end mark: */ 255, ][..], buffer.as_slice()); @@ -1387,22 +1390,22 @@ mod tests { { // Fresh storage, should have no data. let store = crate::test_utils::create_test_store(); - assert_eq!(None, store.get(DBCol::State, &[1]).unwrap()); - assert_eq!(None, store.get(DBCol::State, &[2]).unwrap()); + assert_eq!(None, store.get(DBCol::State, &[1; 8]).unwrap()); + assert_eq!(None, store.get(DBCol::State, &[2; 8]).unwrap()); // Read data from file. store.load_state_from_file(tmp.path()).unwrap(); - assert_eq!(Some(&[1u8][..]), store.get(DBCol::State, &[1]).unwrap().as_deref()); - assert_eq!(Some(&[2u8][..]), store.get(DBCol::State, &[2]).unwrap().as_deref()); + assert_eq!(Some(&[1u8][..]), store.get(DBCol::State, &[1; 8]).unwrap().as_deref()); + assert_eq!(Some(&[2u8][..]), store.get(DBCol::State, &[2; 8]).unwrap().as_deref()); // Key &[2] should have refcount of two so once decreased it should // still exist. let mut store_update = store.store_update(); - store_update.decrement_refcount(DBCol::State, &[1]); - store_update.decrement_refcount(DBCol::State, &[2]); + store_update.decrement_refcount(DBCol::State, &[1; 8]); + store_update.decrement_refcount(DBCol::State, &[2; 8]); store_update.commit().unwrap(); - assert_eq!(None, store.get(DBCol::State, &[1]).unwrap()); - assert_eq!(Some(&[2u8][..]), store.get(DBCol::State, &[2]).unwrap().as_deref()); + assert_eq!(None, store.get(DBCol::State, &[1; 8]).unwrap()); + assert_eq!(Some(&[2u8][..]), store.get(DBCol::State, &[2; 8]).unwrap().as_deref()); } // Verify detection of corrupt file. 
From 80df67cd39cdc6a450e042a2a093553dc8d9af49 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Adam=20Chuda=C5=9B?= Date: Tue, 24 Sep 2024 16:45:13 +0200 Subject: [PATCH 05/13] fix test --- core/store/src/db/rocksdb.rs | 30 +++++++++++++++--------------- 1 file changed, 15 insertions(+), 15 deletions(-) diff --git a/core/store/src/db/rocksdb.rs b/core/store/src/db/rocksdb.rs index a39428e6a12..1cca0e3166b 100644 --- a/core/store/src/db/rocksdb.rs +++ b/core/store/src/db/rocksdb.rs @@ -811,41 +811,41 @@ mod tests { let store = opener.open().unwrap().get_hot_store(); let ptr = (&*store.storage) as *const (dyn Database + 'static); let rocksdb = unsafe { &*(ptr as *const RocksDB) }; - assert_eq!(store.get(DBCol::State, &[1]).unwrap(), None); + assert_eq!(store.get(DBCol::State, &[1; 8]).unwrap(), None); { let mut store_update = store.store_update(); - store_update.increment_refcount(DBCol::State, &[1], &[1]); + store_update.increment_refcount(DBCol::State, &[1; 8], &[1]); store_update.commit().unwrap(); } { let mut store_update = store.store_update(); - store_update.increment_refcount(DBCol::State, &[1], &[1]); + store_update.increment_refcount(DBCol::State, &[1; 8], &[1]); store_update.commit().unwrap(); } - assert_eq!(store.get(DBCol::State, &[1]).unwrap().as_deref(), Some(&[1][..])); + assert_eq!(store.get(DBCol::State, &[1; 8]).unwrap().as_deref(), Some(&[1][..])); assert_eq!( - rocksdb.get_raw_bytes(DBCol::State, &[1]).unwrap().as_deref(), + rocksdb.get_raw_bytes(DBCol::State, &[1; 8]).unwrap().as_deref(), Some(&[1, 2, 0, 0, 0, 0, 0, 0, 0][..]) ); { let mut store_update = store.store_update(); - store_update.decrement_refcount(DBCol::State, &[1]); + store_update.decrement_refcount(DBCol::State, &[1; 8]); store_update.commit().unwrap(); } - assert_eq!(store.get(DBCol::State, &[1]).unwrap().as_deref(), Some(&[1][..])); + assert_eq!(store.get(DBCol::State, &[1; 8]).unwrap().as_deref(), Some(&[1][..])); assert_eq!( - rocksdb.get_raw_bytes(DBCol::State, 
&[1]).unwrap().as_deref(), + rocksdb.get_raw_bytes(DBCol::State, &[1; 8]).unwrap().as_deref(), Some(&[1, 1, 0, 0, 0, 0, 0, 0, 0][..]) ); { let mut store_update = store.store_update(); - store_update.decrement_refcount(DBCol::State, &[1]); + store_update.decrement_refcount(DBCol::State, &[1; 8]); store_update.commit().unwrap(); } // Refcount goes to 0 -> get() returns None - assert_eq!(store.get(DBCol::State, &[1]).unwrap(), None); + assert_eq!(store.get(DBCol::State, &[1; 8]).unwrap(), None); // Internally there is an empty value - assert_eq!(rocksdb.get_raw_bytes(DBCol::State, &[1]).unwrap().as_deref(), Some(&[][..])); + assert_eq!(rocksdb.get_raw_bytes(DBCol::State, &[1; 8]).unwrap().as_deref(), Some(&[][..])); // single_thread_rocksdb makes compact hang forever if !cfg!(feature = "single_thread_rocksdb") { @@ -858,14 +858,14 @@ mod tests { // empty values. rocksdb.db.compact_range_cf(cf, none, none); assert_eq!( - rocksdb.get_raw_bytes(DBCol::State, &[1]).unwrap().as_deref(), + rocksdb.get_raw_bytes(DBCol::State, &[1; 8]).unwrap().as_deref(), Some(&[][..]) ); - assert_eq!(store.get(DBCol::State, &[1]).unwrap(), None); + assert_eq!(store.get(DBCol::State, &[1; 8]).unwrap(), None); rocksdb.db.compact_range_cf(cf, none, none); - assert_eq!(rocksdb.get_raw_bytes(DBCol::State, &[1]).unwrap(), None); - assert_eq!(store.get(DBCol::State, &[1]).unwrap(), None); + assert_eq!(rocksdb.get_raw_bytes(DBCol::State, &[1; 8]).unwrap(), None); + assert_eq!(store.get(DBCol::State, &[1; 8]).unwrap(), None); } } From 519dd7c6bad6e8a4c4274cb48dcdea9a467eef5a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Adam=20Chuda=C5=9B?= Date: Tue, 24 Sep 2024 17:19:30 +0200 Subject: [PATCH 06/13] Save ShardUIdMapping to cold storage --- core/store/src/columns.rs | 6 +++--- integration-tests/src/tests/client/cold_storage.rs | 3 ++- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/core/store/src/columns.rs b/core/store/src/columns.rs index 8f8c61902f2..17e27258f58 100644 --- 
a/core/store/src/columns.rs +++ b/core/store/src/columns.rs @@ -450,7 +450,8 @@ impl DBCol { | DBCol::StateChangesForSplitStates | DBCol::StateHeaders | DBCol::TransactionResultForBlock - | DBCol::Transactions => true, + | DBCol::Transactions + | DBCol::ShardUIdMapping => true, // TODO DBCol::ChallengedBlocks => false, @@ -508,8 +509,7 @@ impl DBCol { | DBCol::FlatStateChanges | DBCol::FlatStateDeltaMetadata | DBCol::FlatStorageStatus - | DBCol::EpochSyncProof - | DBCol::ShardUIdMapping => false, + | DBCol::EpochSyncProof => false, } } diff --git a/integration-tests/src/tests/client/cold_storage.rs b/integration-tests/src/tests/client/cold_storage.rs index 10ffd533856..3c89da3312d 100644 --- a/integration-tests/src/tests/client/cold_storage.rs +++ b/integration-tests/src/tests/client/cold_storage.rs @@ -186,10 +186,11 @@ fn test_storage_after_commit_of_cold_update() { let cold_store = &storage.get_cold_store().unwrap(); let num_checks = check_iter(client_store, cold_store, col, &no_check_rules); // assert that this test actually checks something - // apart from StateChangesForSplitStates and StateHeaders, that are empty + // apart from StateChangesForSplitStates, StateHeaders, and ShardUIdMapping, that are empty assert!( col == DBCol::StateChangesForSplitStates || col == DBCol::StateHeaders + || col == DBCol::ShardUIdMapping || num_checks > 0 ); } From 395d9fe45170260417ad7a3216a2ca6c36bc3cab Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Adam=20Chuda=C5=9B?= Date: Tue, 24 Sep 2024 17:44:38 +0200 Subject: [PATCH 07/13] fix test --- integration-tests/src/tests/client/cold_storage.rs | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/integration-tests/src/tests/client/cold_storage.rs b/integration-tests/src/tests/client/cold_storage.rs index 3c89da3312d..d10a7aaed5d 100644 --- a/integration-tests/src/tests/client/cold_storage.rs +++ b/integration-tests/src/tests/client/cold_storage.rs @@ -309,10 +309,11 @@ fn test_cold_db_copy_with_height_skips() 
{ let cold_store = storage.get_cold_store().unwrap(); let num_checks = check_iter(&client_store, &cold_store, col, &no_check_rules); // assert that this test actually checks something - // apart from StateChangesForSplitStates and StateHeaders, that are empty + // apart from StateChangesForSplitStates, StateHeaders, and ShardUIdMapping, that are empty assert!( col == DBCol::StateChangesForSplitStates || col == DBCol::StateHeaders + || col == DBCol::ShardUIdMapping || num_checks > 0 ); } @@ -362,8 +363,11 @@ fn test_initial_copy_to_cold(batch_size: usize) { continue; } let num_checks = check_iter(&client_store, &cold_store, col, &vec![]); - // StateChangesForSplitStates and StateHeaders are empty - if col == DBCol::StateChangesForSplitStates || col == DBCol::StateHeaders { + // StateChangesForSplitStates, StateHeaders, and ShardUIdMapping are empty + if col == DBCol::StateChangesForSplitStates + || col == DBCol::StateHeaders + || col == DBCol::ShardUIdMapping + { continue; } // assert that this test actually checks something From 61d4642e937824f8dff195b182de8da220e96606 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Adam=20Chuda=C5=9B?= Date: Tue, 24 Sep 2024 19:04:53 +0200 Subject: [PATCH 08/13] Create ShardUIdMapping in NodeStorage --- core/store/src/lib.rs | 43 +++++++++++++++++------- core/store/src/opener.rs | 10 +++--- core/store/src/shard_uid_mapping.rs | 2 +- tools/replay-archive/src/cli.rs | 4 +-- tools/state-viewer/src/replay_headers.rs | 3 +- 5 files changed, 42 insertions(+), 20 deletions(-) diff --git a/core/store/src/lib.rs b/core/store/src/lib.rs index cdfc3bfe8ec..ed2d85d2eae 100644 --- a/core/store/src/lib.rs +++ b/core/store/src/lib.rs @@ -68,7 +68,7 @@ pub use crate::config::{Mode, StoreConfig}; pub use crate::opener::{ checkpoint_hot_storage_and_cleanup_columns, StoreMigrator, StoreOpener, StoreOpenerError, }; -use crate::shard_uid_mapping::ShardUIdMapping; +pub use crate::shard_uid_mapping::ShardUIdMapping; /// Specifies temperature of a storage. 
/// @@ -102,9 +102,12 @@ const STATE_FILE_END_MARK: u8 = 255; /// /// Provides access to hot storage, cold storage and split storage. Typically /// users will want to use one of the above via the Store abstraction. +/// It holds ShardUId mapping to be used for State mapping in Resharding V3. +/// It acts as a singleton that is passed around when constructing `Store` object. pub struct NodeStorage { hot_storage: Arc, cold_storage: Option>, + shard_uid_mapping: ShardUIdMapping, } /// Node’s single storage source. @@ -116,6 +119,8 @@ pub struct NodeStorage { #[derive(Clone)] pub struct Store { storage: Arc, + /// Store holds ShardUId mapping to be used for State mapping in Resharding V3. + /// See `Store::get_impl_state` for how it is used. shard_uid_mapping: ShardUIdMapping, } @@ -151,7 +156,9 @@ impl NodeStorage { None }; - Self { hot_storage, cold_storage: cold_db } + let shard_uid_mapping = ShardUIdMapping::new(); + + Self { hot_storage, cold_storage: cold_db, shard_uid_mapping } } /// Initialises an opener for a new temporary test store. @@ -179,7 +186,8 @@ impl NodeStorage { /// possibly [`crate::test_utils::create_test_store`] (depending whether you /// need [`NodeStorage`] or [`Store`] object. pub fn new(storage: Arc) -> Self { - Self { hot_storage: storage, cold_storage: None } + let shard_uid_mapping = ShardUIdMapping::new(); + Self { hot_storage: storage, cold_storage: None, shard_uid_mapping } } } @@ -198,7 +206,7 @@ impl NodeStorage { /// store, the view client should use the split store and the cold store /// loop should use cold store. pub fn get_hot_store(&self) -> Store { - Store::new(self.hot_storage.clone()) + Store::new(self.hot_storage.clone(), self.shard_uid_mapping.clone()) } /// Returns the cold store. The cold store is only available in archival @@ -210,7 +218,9 @@ impl NodeStorage { /// loop should use cold store. 
pub fn get_cold_store(&self) -> Option { match &self.cold_storage { - Some(cold_storage) => Some(Store::new(cold_storage.clone())), + Some(cold_storage) => { + Some(Store::new(cold_storage.clone(), self.shard_uid_mapping.clone())) + } None => None, } } @@ -221,9 +231,10 @@ impl NodeStorage { /// Recovery store should be use only to perform data recovery on archival nodes. pub fn get_recovery_store(&self) -> Option { match &self.cold_storage { - Some(cold_storage) => { - Some(Store::new(Arc::new(crate::db::RecoveryDB::new(cold_storage.clone())))) - } + Some(cold_storage) => Some(Store::new( + Arc::new(crate::db::RecoveryDB::new(cold_storage.clone())), + self.shard_uid_mapping.clone(), + )), None => None, } } @@ -236,7 +247,7 @@ impl NodeStorage { /// store, the view client should use the split store and the cold store /// loop should use cold store. pub fn get_split_store(&self) -> Option { - self.get_split_db().map(|split_db| Store::new(split_db)) + self.get_split_db().map(|split_db| Store::new(split_db, self.shard_uid_mapping.clone())) } pub fn get_split_db(&self) -> Option> { @@ -245,6 +256,10 @@ impl NodeStorage { .map(|cold_db| SplitDB::new(self.hot_storage.clone(), cold_db.clone())) } + pub fn get_shard_uid_mapping(&self) -> ShardUIdMapping { + self.shard_uid_mapping.clone() + } + /// Returns underlying database for given temperature. 
/// /// This allows accessing underlying hot and cold databases directly @@ -288,7 +303,12 @@ impl NodeStorage { } pub fn new_with_cold(hot: Arc, cold: Arc) -> Self { - Self { hot_storage: hot, cold_storage: Some(Arc::new(crate::db::ColdDB::new(cold))) } + let shard_uid_mapping = ShardUIdMapping::new(); + Self { + hot_storage: hot, + cold_storage: Some(Arc::new(crate::db::ColdDB::new(cold))), + shard_uid_mapping, + } } pub fn cold_db(&self) -> Option<&Arc> { @@ -297,8 +317,7 @@ impl NodeStorage { } impl Store { - pub fn new(storage: Arc) -> Self { - let shard_uid_mapping = ShardUIdMapping::new(); + pub fn new(storage: Arc, shard_uid_mapping: ShardUIdMapping) -> Self { Self { storage, shard_uid_mapping } } diff --git a/core/store/src/opener.rs b/core/store/src/opener.rs index 27e264f0c10..c5c4fbfbfee 100644 --- a/core/store/src/opener.rs +++ b/core/store/src/opener.rs @@ -1,6 +1,7 @@ use crate::db::rocksdb::snapshot::{Snapshot, SnapshotError, SnapshotRemoveError}; use crate::db::rocksdb::RocksDB; use crate::metadata::{DbKind, DbMetadata, DbVersion, DB_VERSION}; +use crate::shard_uid_mapping::ShardUIdMapping; use crate::{DBCol, DBTransaction, Mode, NodeStorage, Store, StoreConfig, Temperature}; use std::sync::Arc; @@ -339,7 +340,7 @@ impl<'a> StoreOpener<'a> { tracing::info!(target: "db_opener", path=%opener.path.display(), "The database doesn't exist, creating it."); let db = opener.create()?; - let store = Store::new(Arc::new(db)); + let store = Store::new(Arc::new(db), ShardUIdMapping::new()); store.set_db_version(DB_VERSION)?; return Ok(()); } @@ -467,13 +468,13 @@ impl<'a> StoreOpener<'a> { version: DbVersion, ) -> Result { let (db, _) = opener.open(mode, version)?; - let store = Store::new(Arc::new(db)); + let store = Store::new(Arc::new(db), ShardUIdMapping::new()); Ok(store) } fn open_store_unsafe(mode: Mode, opener: &DBOpener) -> Result { let db = opener.open_unsafe(mode)?; - let store = Store::new(Arc::new(db)); + let store = Store::new(Arc::new(db), 
ShardUIdMapping::new()); Ok(store) } } @@ -640,7 +641,8 @@ mod tests { fn test_checkpoint_hot_storage_and_cleanup_columns() { let (home_dir, opener) = NodeStorage::test_opener(); let node_storage = opener.open().unwrap(); - let hot_store = Store::new(node_storage.hot_storage.clone()); + let hot_store = + Store::new(node_storage.hot_storage.clone(), node_storage.shard_uid_mapping.clone()); assert_eq!(hot_store.get_db_kind().unwrap(), Some(DbKind::RPC)); let keys = vec![vec![0], vec![1], vec![2], vec![3]]; diff --git a/core/store/src/shard_uid_mapping.rs b/core/store/src/shard_uid_mapping.rs index 48269f3ae31..284c63bc915 100644 --- a/core/store/src/shard_uid_mapping.rs +++ b/core/store/src/shard_uid_mapping.rs @@ -13,7 +13,7 @@ use std::{ /// Protected with mutex for concurrent access. /// That is for resharding V3 purposes, where we use the mapping strategy for State column. #[derive(Clone)] -pub(crate) struct ShardUIdMapping(Arc>); +pub struct ShardUIdMapping(Arc>); impl ShardUIdMapping { pub fn new() -> Self { diff --git a/tools/replay-archive/src/cli.rs b/tools/replay-archive/src/cli.rs index fd4651c10c3..b84c90c5f94 100644 --- a/tools/replay-archive/src/cli.rs +++ b/tools/replay-archive/src/cli.rs @@ -28,7 +28,7 @@ use near_primitives::types::chunk_extra::ChunkExtra; use near_primitives::types::{BlockHeight, Gas, ProtocolVersion, ShardId}; use near_primitives::version::ProtocolFeature; use near_state_viewer::progress_reporter::{timestamp_ms, ProgressReporter}; -use near_store::{get_genesis_state_roots, ShardUId, Store}; +use near_store::{get_genesis_state_roots, ShardUId, ShardUIdMapping, Store}; use nearcore::{load_config, NearConfig, NightshadeRuntime, NightshadeRuntimeExt}; use std::collections::HashMap; use std::path::Path; @@ -111,7 +111,7 @@ impl ReplayController { end_height: Option, ) -> Result { let storage = open_storage_for_replay(home_dir, &near_config)?; - let store = Store::new(storage.clone()); + let store = Store::new(storage.clone(), 
ShardUIdMapping::new()); let genesis_height = near_config.genesis.config.genesis_height; let chain_store = ChainStore::new(store.clone(), genesis_height, false); diff --git a/tools/state-viewer/src/replay_headers.rs b/tools/state-viewer/src/replay_headers.rs index c830b54aa4b..7e433a8f81c 100644 --- a/tools/state-viewer/src/replay_headers.rs +++ b/tools/state-viewer/src/replay_headers.rs @@ -282,6 +282,7 @@ fn create_replay_store(home_dir: &Path, near_config: &NearConfig) -> Store { near_config.config.cold_store.as_ref(), ); let storage = store_opener.open_in_mode(Mode::ReadOnly).unwrap(); + let shard_uid_mapping = storage.get_shard_uid_mapping(); let read_db = if storage.has_cold() { storage.get_split_db().unwrap() @@ -289,5 +290,5 @@ fn create_replay_store(home_dir: &Path, near_config: &NearConfig) -> Store { storage.into_inner(Temperature::Hot) }; let write_db = TestDB::new(); - Store::new(MixedDB::new(read_db, write_db, ReadOrder::WriteDBFirst)) + Store::new(MixedDB::new(read_db, write_db, ReadOrder::WriteDBFirst), shard_uid_mapping) } From 5d53a7c643d758d6e6f309c030c510efc3bee24d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Adam=20Chuda=C5=9B?= Date: Wed, 25 Sep 2024 08:30:13 +0200 Subject: [PATCH 09/13] addressing review comments --- core/store/src/shard_uid_mapping.rs | 24 ++++++++---------------- 1 file changed, 8 insertions(+), 16 deletions(-) diff --git a/core/store/src/shard_uid_mapping.rs b/core/store/src/shard_uid_mapping.rs index 284c63bc915..b90a10c7109 100644 --- a/core/store/src/shard_uid_mapping.rs +++ b/core/store/src/shard_uid_mapping.rs @@ -1,35 +1,27 @@ -use crate::{ - flat::POISONED_LOCK_ERR, - io::{Error, Result}, -}; +use crate::flat::POISONED_LOCK_ERR; +use crate::io::{Error, Result}; use near_primitives::shard_layout::ShardUId; -use std::{ - collections::HashMap, - sync::{Arc, Mutex}, -}; +use std::collections::HashMap; +use std::sync::{Arc, RwLock}; /// Stores a mapping from ShardUId to ShardUId. 
/// /// Protected with mutex for concurrent access. /// That is for resharding V3 purposes, where we use the mapping strategy for State column. #[derive(Clone)] -pub struct ShardUIdMapping(Arc>); +pub struct ShardUIdMapping(Arc>); impl ShardUIdMapping { pub fn new() -> Self { - Self(Arc::new(Mutex::new(ShardUIdMappingInner::new()))) + Self(Arc::new(RwLock::new(ShardUIdMappingInner::new()))) } pub fn map(&self, shard_uid: &ShardUId) -> Option { - self.lock().map(shard_uid) + self.0.read().expect(POISONED_LOCK_ERR).map(shard_uid) } pub fn update(&self, shard_uid: &ShardUId, db_mapped_shard_uid: Option) -> ShardUId { - self.lock().update(shard_uid, db_mapped_shard_uid) - } - - fn lock(&self) -> std::sync::MutexGuard { - self.0.lock().expect(POISONED_LOCK_ERR) + self.0.write().expect(POISONED_LOCK_ERR).update(shard_uid, db_mapped_shard_uid) } } From 69ea15df2bbccab3b9fff3484b936fa7c31cf9cd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Adam=20Chuda=C5=9B?= Date: Fri, 11 Oct 2024 11:12:02 +0200 Subject: [PATCH 10/13] Get rid of in-mem mapping --- core/store/src/flat/storage.rs | 2 +- core/store/src/lib.rs | 107 +++++++---------------- core/store/src/opener.rs | 10 +-- core/store/src/shard_uid_mapping.rs | 65 -------------- tools/replay-archive/src/cli.rs | 4 +- tools/state-viewer/src/replay_headers.rs | 3 +- 6 files changed, 42 insertions(+), 149 deletions(-) delete mode 100644 core/store/src/shard_uid_mapping.rs diff --git a/core/store/src/flat/storage.rs b/core/store/src/flat/storage.rs index fcc6f223e2d..79f7001aacf 100644 --- a/core/store/src/flat/storage.rs +++ b/core/store/src/flat/storage.rs @@ -460,7 +460,7 @@ impl FlatStorage { return Err(guard.create_block_not_supported_error(&block_hash)); } let mut store_update = guard.store.store_update(); - store_helper::set_delta(&mut store_update, shard_uid, &delta); + store_update.set_delta(shard_uid, &delta); let cached_changes: CachedFlatStateChanges = delta.changes.into(); guard.deltas.insert( block_hash, diff --git 
a/core/store/src/lib.rs b/core/store/src/lib.rs index ed2d85d2eae..e53ae28add2 100644 --- a/core/store/src/lib.rs +++ b/core/store/src/lib.rs @@ -36,8 +36,6 @@ pub use near_primitives::shard_layout::ShardUId; use near_primitives::trie_key::{trie_key_parsers, TrieKey}; use near_primitives::types::{AccountId, BlockHeight, StateRoot}; use near_vm_runner::{CompiledContractInfo, ContractCode, ContractRuntimeCache}; -use shard_uid_mapping::{replace_shard_uid_key_prefix, retrieve_shard_uid_from_db_key}; -use std::borrow::Cow; use std::fs::File; use std::path::Path; use std::str::FromStr; @@ -59,7 +57,6 @@ pub mod metrics; pub mod migrations; mod opener; mod rocksdb_metrics; -mod shard_uid_mapping; mod sync_utils; pub mod test_utils; pub mod trie; @@ -68,7 +65,6 @@ pub use crate::config::{Mode, StoreConfig}; pub use crate::opener::{ checkpoint_hot_storage_and_cleanup_columns, StoreMigrator, StoreOpener, StoreOpenerError, }; -pub use crate::shard_uid_mapping::ShardUIdMapping; /// Specifies temperature of a storage. /// @@ -102,12 +98,9 @@ const STATE_FILE_END_MARK: u8 = 255; /// /// Provides access to hot storage, cold storage and split storage. Typically /// users will want to use one of the above via the Store abstraction. -/// It holds ShardUId mapping to be used for State mapping in Resharding V3. -/// It acts as a singleton that is passed around when constructing `Store` object. pub struct NodeStorage { hot_storage: Arc, cold_storage: Option>, - shard_uid_mapping: ShardUIdMapping, } /// Node’s single storage source. @@ -119,9 +112,6 @@ pub struct NodeStorage { #[derive(Clone)] pub struct Store { storage: Arc, - /// Store holds ShardUId mapping to be used for State mapping in Resharding V3. - /// See `Store::get_impl_state` for how it is used. 
- shard_uid_mapping: ShardUIdMapping, } impl StoreAdapter for Store { @@ -156,9 +146,7 @@ impl NodeStorage { None }; - let shard_uid_mapping = ShardUIdMapping::new(); - - Self { hot_storage, cold_storage: cold_db, shard_uid_mapping } + Self { hot_storage, cold_storage: cold_db } } /// Initialises an opener for a new temporary test store. @@ -186,8 +174,7 @@ impl NodeStorage { /// possibly [`crate::test_utils::create_test_store`] (depending whether you /// need [`NodeStorage`] or [`Store`] object. pub fn new(storage: Arc) -> Self { - let shard_uid_mapping = ShardUIdMapping::new(); - Self { hot_storage: storage, cold_storage: None, shard_uid_mapping } + Self { hot_storage: storage, cold_storage: None } } } @@ -206,7 +193,7 @@ impl NodeStorage { /// store, the view client should use the split store and the cold store /// loop should use cold store. pub fn get_hot_store(&self) -> Store { - Store::new(self.hot_storage.clone(), self.shard_uid_mapping.clone()) + Store { storage: self.hot_storage.clone() } } /// Returns the cold store. The cold store is only available in archival @@ -218,9 +205,7 @@ impl NodeStorage { /// loop should use cold store. pub fn get_cold_store(&self) -> Option { match &self.cold_storage { - Some(cold_storage) => { - Some(Store::new(cold_storage.clone(), self.shard_uid_mapping.clone())) - } + Some(cold_storage) => Some(Store { storage: cold_storage.clone() }), None => None, } } @@ -231,10 +216,9 @@ impl NodeStorage { /// Recovery store should be use only to perform data recovery on archival nodes. 
pub fn get_recovery_store(&self) -> Option { match &self.cold_storage { - Some(cold_storage) => Some(Store::new( - Arc::new(crate::db::RecoveryDB::new(cold_storage.clone())), - self.shard_uid_mapping.clone(), - )), + Some(cold_storage) => { + Some(Store { storage: Arc::new(crate::db::RecoveryDB::new(cold_storage.clone())) }) + } None => None, } } @@ -247,7 +231,7 @@ impl NodeStorage { /// store, the view client should use the split store and the cold store /// loop should use cold store. pub fn get_split_store(&self) -> Option { - self.get_split_db().map(|split_db| Store::new(split_db, self.shard_uid_mapping.clone())) + self.get_split_db().map(|split_db| Store { storage: split_db }) } pub fn get_split_db(&self) -> Option> { @@ -256,10 +240,6 @@ impl NodeStorage { .map(|cold_db| SplitDB::new(self.hot_storage.clone(), cold_db.clone())) } - pub fn get_shard_uid_mapping(&self) -> ShardUIdMapping { - self.shard_uid_mapping.clone() - } - /// Returns underlying database for given temperature. /// /// This allows accessing underlying hot and cold databases directly @@ -303,12 +283,7 @@ impl NodeStorage { } pub fn new_with_cold(hot: Arc, cold: Arc) -> Self { - let shard_uid_mapping = ShardUIdMapping::new(); - Self { - hot_storage: hot, - cold_storage: Some(Arc::new(crate::db::ColdDB::new(cold))), - shard_uid_mapping, - } + Self { hot_storage: hot, cold_storage: Some(Arc::new(crate::db::ColdDB::new(cold))) } } pub fn cold_db(&self) -> Option<&Arc> { @@ -316,9 +291,23 @@ impl NodeStorage { } } +pub fn retrieve_shard_uid_from_db_key(key: &[u8]) -> io::Result { + // TODO(reshardingV3) Consider changing the Error type to `StorageError`? + // Would need changing error types for `Store` methods as well. 
+ ShardUId::try_from(&key[..8]) + .map_err(|e| io::Error::other(format!("Could not retrieve ShardUId from db key: {}", e))) +} + +pub fn replace_shard_uid_key_prefix(key: &[u8], shard_uid: ShardUId) -> [u8; 40] { + let mut mapped_key = [0u8; 40]; + mapped_key[..8].copy_from_slice(&shard_uid.to_bytes()); + mapped_key[8..].copy_from_slice(&key[8..]); + mapped_key +} + impl Store { - pub fn new(storage: Arc, shard_uid_mapping: ShardUIdMapping) -> Self { - Self { storage, shard_uid_mapping } + pub fn new(storage: Arc) -> Self { + Self { storage } } /// Underlying `get()` implementation for all columns. @@ -331,43 +320,21 @@ impl Store { Ok(value) } - /// Reads shard_uid mapping for given shard and updates the in-memory mapping. - fn read_shard_uid_mapping_from_db(&self, shard_uid: ShardUId) -> io::Result { - let mapped_shard_uid = - self.get_ser::(DBCol::ShardUIdMapping, &shard_uid.to_bytes())?; - Ok(self.shard_uid_mapping.update(&shard_uid, mapped_shard_uid)) - } - /// Specialized `get` implementation for State column that replaces shard_uid prefix /// with a mapped value according to mapping strategy in Resharding V3. /// - /// It first uses the in-memory `shard_uid_mapping` and attempts to read from storage. - /// If the node was not found in storage, it tries to update the in-memory mapping with - /// the mapping stored in `DBCol::ShardUIdMapping`. - /// If it was different, then we do the second attempt to read from storage. + /// It does extra read from `DBCol::ShardUIdMapping` to map the shard_uid key prefix. fn get_impl_state(&self, key: &[u8]) -> io::Result>> { let shard_uid = retrieve_shard_uid_from_db_key(key)?; - let mut mapping_synchronized_with_db = false; - let mapped_shard_uid = match self.shard_uid_mapping.map(&shard_uid) { - Some(mapped_shard_uid) => mapped_shard_uid, - None => { - mapping_synchronized_with_db = true; - self.read_shard_uid_mapping_from_db(shard_uid)? 
- } - }; + let mapped_shard_uid = self + .get_ser::(DBCol::ShardUIdMapping, &shard_uid.to_bytes())? + .unwrap_or(shard_uid); let mapped_key = if shard_uid != mapped_shard_uid { - Cow::Owned(replace_shard_uid_key_prefix(key, mapped_shard_uid)) + &replace_shard_uid_key_prefix(key, mapped_shard_uid) } else { - Cow::Borrowed(key) + key }; - let mut value = self.get_impl(DBCol::State, &mapped_key)?; - if value.is_none() && !mapping_synchronized_with_db { - let db_mapped_shard_uid = self.read_shard_uid_mapping_from_db(shard_uid)?; - if db_mapped_shard_uid != mapped_shard_uid { - let mapped_key = replace_shard_uid_key_prefix(key, db_mapped_shard_uid); - value = self.get_impl(DBCol::State, &mapped_key)?; - } - } + let value = self.get_impl(DBCol::State, &mapped_key)?; Ok(value) } @@ -402,11 +369,7 @@ impl Store { } pub fn store_update(&self) -> StoreUpdate { - StoreUpdate { - transaction: DBTransaction::new(), - storage: Arc::clone(&self.storage), - _shard_uid_mapping: self.shard_uid_mapping.clone(), - } + StoreUpdate { transaction: DBTransaction::new(), storage: Arc::clone(&self.storage) } } pub fn iter<'a>(&'a self, col: DBCol) -> DBIterator<'a> { @@ -539,8 +502,6 @@ impl Store { pub struct StoreUpdate { transaction: DBTransaction, storage: Arc, - // TODO(reshardingV3) Currently unused, see TODO inside `commit()` method. - _shard_uid_mapping: ShardUIdMapping, } impl StoreUpdateAdapter for StoreUpdate { @@ -816,7 +777,7 @@ impl StoreUpdate { } } } - // TODO(reshardingV3) Use shard_uid_mapping for ops referencing State column. + // TODO(reshardingV3) Map shard_uid for ops referencing State column. 
self.storage.write(self.transaction) } } diff --git a/core/store/src/opener.rs b/core/store/src/opener.rs index c5c4fbfbfee..41a855031cb 100644 --- a/core/store/src/opener.rs +++ b/core/store/src/opener.rs @@ -1,7 +1,6 @@ use crate::db::rocksdb::snapshot::{Snapshot, SnapshotError, SnapshotRemoveError}; use crate::db::rocksdb::RocksDB; use crate::metadata::{DbKind, DbMetadata, DbVersion, DB_VERSION}; -use crate::shard_uid_mapping::ShardUIdMapping; use crate::{DBCol, DBTransaction, Mode, NodeStorage, Store, StoreConfig, Temperature}; use std::sync::Arc; @@ -340,7 +339,7 @@ impl<'a> StoreOpener<'a> { tracing::info!(target: "db_opener", path=%opener.path.display(), "The database doesn't exist, creating it."); let db = opener.create()?; - let store = Store::new(Arc::new(db), ShardUIdMapping::new()); + let store = Store { storage: Arc::new(db) }; store.set_db_version(DB_VERSION)?; return Ok(()); } @@ -468,13 +467,13 @@ impl<'a> StoreOpener<'a> { version: DbVersion, ) -> Result { let (db, _) = opener.open(mode, version)?; - let store = Store::new(Arc::new(db), ShardUIdMapping::new()); + let store = Store { storage: Arc::new(db) }; Ok(store) } fn open_store_unsafe(mode: Mode, opener: &DBOpener) -> Result { let db = opener.open_unsafe(mode)?; - let store = Store::new(Arc::new(db), ShardUIdMapping::new()); + let store = Store { storage: Arc::new(db) }; Ok(store) } } @@ -641,8 +640,7 @@ mod tests { fn test_checkpoint_hot_storage_and_cleanup_columns() { let (home_dir, opener) = NodeStorage::test_opener(); let node_storage = opener.open().unwrap(); - let hot_store = - Store::new(node_storage.hot_storage.clone(), node_storage.shard_uid_mapping.clone()); + let hot_store = Store { storage: node_storage.hot_storage.clone() }; assert_eq!(hot_store.get_db_kind().unwrap(), Some(DbKind::RPC)); let keys = vec![vec![0], vec![1], vec![2], vec![3]]; diff --git a/core/store/src/shard_uid_mapping.rs b/core/store/src/shard_uid_mapping.rs deleted file mode 100644 index b90a10c7109..00000000000 
--- a/core/store/src/shard_uid_mapping.rs +++ /dev/null @@ -1,65 +0,0 @@ -use crate::flat::POISONED_LOCK_ERR; -use crate::io::{Error, Result}; -use near_primitives::shard_layout::ShardUId; -use std::collections::HashMap; -use std::sync::{Arc, RwLock}; - -/// Stores a mapping from ShardUId to ShardUId. -/// -/// Protected with mutex for concurrent access. -/// That is for resharding V3 purposes, where we use the mapping strategy for State column. -#[derive(Clone)] -pub struct ShardUIdMapping(Arc>); - -impl ShardUIdMapping { - pub fn new() -> Self { - Self(Arc::new(RwLock::new(ShardUIdMappingInner::new()))) - } - - pub fn map(&self, shard_uid: &ShardUId) -> Option { - self.0.read().expect(POISONED_LOCK_ERR).map(shard_uid) - } - - pub fn update(&self, shard_uid: &ShardUId, db_mapped_shard_uid: Option) -> ShardUId { - self.0.write().expect(POISONED_LOCK_ERR).update(shard_uid, db_mapped_shard_uid) - } -} - -pub fn retrieve_shard_uid_from_db_key(key: &[u8]) -> Result { - // TODO(reshardingV3) Consider changing the Error type to `StorageError`? - // Would need changing error types for `Store` methods as well. 
- ShardUId::try_from(&key[..8]) - .map_err(|e| Error::other(format!("Could not retrieve ShardUId from db key: {}", e))) -} - -pub fn replace_shard_uid_key_prefix(key: &[u8], shard_uid: ShardUId) -> Vec { - let mut mapped_key = [0u8; 40]; - mapped_key[..8].copy_from_slice(&shard_uid.to_bytes()); - mapped_key[8..].copy_from_slice(&key[8..]); - mapped_key.to_vec() -} - -struct ShardUIdMappingInner { - mapping: HashMap, -} - -impl ShardUIdMappingInner { - pub fn new() -> Self { - Self { mapping: HashMap::new() } - } - - pub fn map(&self, shard_uid: &ShardUId) -> Option { - self.mapping.get(shard_uid).copied() - } - - pub fn update( - &mut self, - shard_uid: &ShardUId, - db_mapped_shard_uid: Option, - ) -> ShardUId { - // No mapping means we map shard_uid to itself - let mapped_shard_uid = db_mapped_shard_uid.unwrap_or(*shard_uid); - self.mapping.insert(*shard_uid, mapped_shard_uid); - mapped_shard_uid - } -} diff --git a/tools/replay-archive/src/cli.rs b/tools/replay-archive/src/cli.rs index b84c90c5f94..fd4651c10c3 100644 --- a/tools/replay-archive/src/cli.rs +++ b/tools/replay-archive/src/cli.rs @@ -28,7 +28,7 @@ use near_primitives::types::chunk_extra::ChunkExtra; use near_primitives::types::{BlockHeight, Gas, ProtocolVersion, ShardId}; use near_primitives::version::ProtocolFeature; use near_state_viewer::progress_reporter::{timestamp_ms, ProgressReporter}; -use near_store::{get_genesis_state_roots, ShardUId, ShardUIdMapping, Store}; +use near_store::{get_genesis_state_roots, ShardUId, Store}; use nearcore::{load_config, NearConfig, NightshadeRuntime, NightshadeRuntimeExt}; use std::collections::HashMap; use std::path::Path; @@ -111,7 +111,7 @@ impl ReplayController { end_height: Option, ) -> Result { let storage = open_storage_for_replay(home_dir, &near_config)?; - let store = Store::new(storage.clone(), ShardUIdMapping::new()); + let store = Store::new(storage.clone()); let genesis_height = near_config.genesis.config.genesis_height; let chain_store = 
ChainStore::new(store.clone(), genesis_height, false); diff --git a/tools/state-viewer/src/replay_headers.rs b/tools/state-viewer/src/replay_headers.rs index 7e433a8f81c..c830b54aa4b 100644 --- a/tools/state-viewer/src/replay_headers.rs +++ b/tools/state-viewer/src/replay_headers.rs @@ -282,7 +282,6 @@ fn create_replay_store(home_dir: &Path, near_config: &NearConfig) -> Store { near_config.config.cold_store.as_ref(), ); let storage = store_opener.open_in_mode(Mode::ReadOnly).unwrap(); - let shard_uid_mapping = storage.get_shard_uid_mapping(); let read_db = if storage.has_cold() { storage.get_split_db().unwrap() @@ -290,5 +289,5 @@ fn create_replay_store(home_dir: &Path, near_config: &NearConfig) -> Store { storage.into_inner(Temperature::Hot) }; let write_db = TestDB::new(); - Store::new(MixedDB::new(read_db, write_db, ReadOrder::WriteDBFirst), shard_uid_mapping) + Store::new(MixedDB::new(read_db, write_db, ReadOrder::WriteDBFirst)) } From ea4e51194443aa5dd0ca7ffd8a635b0be3698e7a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Adam=20Chuda=C5=9B?= Date: Fri, 11 Oct 2024 12:20:45 +0200 Subject: [PATCH 11/13] refactor --- core/store/src/lib.rs | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) diff --git a/core/store/src/lib.rs b/core/store/src/lib.rs index e53ae28add2..fef97336d8f 100644 --- a/core/store/src/lib.rs +++ b/core/store/src/lib.rs @@ -320,15 +320,24 @@ impl Store { Ok(value) } + /// Reads shard_uid mapping for given shard. + /// If the mapping does not exist, it means that `shard_uid` maps to itself. + pub(crate) fn read_shard_uid_mapping_from_db( + &self, + shard_uid: ShardUId, + ) -> io::Result { + let mapped_shard_uid = + self.get_ser::(DBCol::ShardUIdMapping, &shard_uid.to_bytes())?; + Ok(mapped_shard_uid.unwrap_or(shard_uid)) + } + /// Specialized `get` implementation for State column that replaces shard_uid prefix /// with a mapped value according to mapping strategy in Resharding V3. 
/// /// It does extra read from `DBCol::ShardUIdMapping` to map the shard_uid key prefix. fn get_impl_state(&self, key: &[u8]) -> io::Result>> { let shard_uid = retrieve_shard_uid_from_db_key(key)?; - let mapped_shard_uid = self - .get_ser::(DBCol::ShardUIdMapping, &shard_uid.to_bytes())? - .unwrap_or(shard_uid); + let mapped_shard_uid = self.read_shard_uid_mapping_from_db(shard_uid)?; let mapped_key = if shard_uid != mapped_shard_uid { &replace_shard_uid_key_prefix(key, mapped_shard_uid) } else { From 606e4fb64d8e8701d8392badc7ee500f5d6020a8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Adam=20Chuda=C5=9B?= Date: Fri, 11 Oct 2024 12:43:59 +0200 Subject: [PATCH 12/13] Move shard_uid mapping to TrieStoreAdapter --- core/store/src/adapter/trie_store.rs | 16 ++++++- core/store/src/lib.rs | 63 +++------------------------- 2 files changed, 21 insertions(+), 58 deletions(-) diff --git a/core/store/src/adapter/trie_store.rs b/core/store/src/adapter/trie_store.rs index e188cddc4f1..7d3a67f48f8 100644 --- a/core/store/src/adapter/trie_store.rs +++ b/core/store/src/adapter/trie_store.rs @@ -31,8 +31,22 @@ impl TrieStoreAdapter { TrieStoreUpdateAdapter { store_update: StoreUpdateHolder::Owned(self.store.store_update()) } } + /// Reads shard_uid mapping for given shard. + /// If the mapping does not exist, it means that `shard_uid` maps to itself. + pub(crate) fn read_shard_uid_mapping_from_db( + &self, + shard_uid: ShardUId, + ) -> io::Result { + let mapped_shard_uid = + self.get_ser::(DBCol::ShardUIdMapping, &shard_uid.to_bytes())?; + Ok(mapped_shard_uid.unwrap_or(shard_uid)) + } + + /// Replaces shard_uid prefix with a mapped value according to mapping strategy in Resharding V3. + /// For this, it does extra read from `DBCol::ShardUIdMapping`. 
pub fn get(&self, shard_uid: ShardUId, hash: &CryptoHash) -> Result, StorageError> { - let key = get_key_from_shard_uid_and_hash(shard_uid, hash); + let mapped_shard_uid = self.read_shard_uid_mapping_from_db(shard_uid)?; + let key = get_key_from_shard_uid_and_hash(mapped_shard_uid, hash); let val = self .store .get(DBCol::State, key.as_ref()) diff --git a/core/store/src/lib.rs b/core/store/src/lib.rs index fef97336d8f..0747bbdddcb 100644 --- a/core/store/src/lib.rs +++ b/core/store/src/lib.rs @@ -291,79 +291,28 @@ impl NodeStorage { } } -pub fn retrieve_shard_uid_from_db_key(key: &[u8]) -> io::Result { - // TODO(reshardingV3) Consider changing the Error type to `StorageError`? - // Would need changing error types for `Store` methods as well. - ShardUId::try_from(&key[..8]) - .map_err(|e| io::Error::other(format!("Could not retrieve ShardUId from db key: {}", e))) -} - -pub fn replace_shard_uid_key_prefix(key: &[u8], shard_uid: ShardUId) -> [u8; 40] { - let mut mapped_key = [0u8; 40]; - mapped_key[..8].copy_from_slice(&shard_uid.to_bytes()); - mapped_key[8..].copy_from_slice(&key[8..]); - mapped_key -} - impl Store { pub fn new(storage: Arc) -> Self { Self { storage } } - /// Underlying `get()` implementation for all columns. - fn get_impl(&self, column: DBCol, key: &[u8]) -> io::Result>> { - let value = if column.is_rc() { - self.storage.get_with_rc_stripped(column, &key) - } else { - self.storage.get_raw_bytes(column, &key) - }?; - Ok(value) - } - - /// Reads shard_uid mapping for given shard. - /// If the mapping does not exist, it means that `shard_uid` maps to itself. 
- pub(crate) fn read_shard_uid_mapping_from_db( - &self, - shard_uid: ShardUId, - ) -> io::Result { - let mapped_shard_uid = - self.get_ser::(DBCol::ShardUIdMapping, &shard_uid.to_bytes())?; - Ok(mapped_shard_uid.unwrap_or(shard_uid)) - } - - /// Specialized `get` implementation for State column that replaces shard_uid prefix - /// with a mapped value according to mapping strategy in Resharding V3. - /// - /// It does extra read from `DBCol::ShardUIdMapping` to map the shard_uid key prefix. - fn get_impl_state(&self, key: &[u8]) -> io::Result>> { - let shard_uid = retrieve_shard_uid_from_db_key(key)?; - let mapped_shard_uid = self.read_shard_uid_mapping_from_db(shard_uid)?; - let mapped_key = if shard_uid != mapped_shard_uid { - &replace_shard_uid_key_prefix(key, mapped_shard_uid) - } else { - key - }; - let value = self.get_impl(DBCol::State, &mapped_key)?; - Ok(value) - } - /// Fetches value from given column. /// - /// If the key does not exist in the column returns `None`. Otherwise + /// If the key does not exist in the column returns `None`. Otherwise /// returns the data as [`DBSlice`] object. The object dereferences into /// a slice, for cases when caller doesn’t need to own the value, and /// provides conversion into a vector or an Arc. pub fn get(&self, column: DBCol, key: &[u8]) -> io::Result>> { - let value = if column == DBCol::State { - self.get_impl_state(key)? + let value = if column.is_rc() { + self.storage.get_with_rc_stripped(column, &key) } else { - self.get_impl(column, key)? 
- }; + self.storage.get_raw_bytes(column, &key) + }?; tracing::trace!( target: "store", db_op = "get", col = %column, - key = %StorageKey(&key), + key = %StorageKey(key), size = value.as_deref().map(<[u8]>::len) ); Ok(value) From fee1c6ae69a756976890a48d2ceaefa721943204 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Adam=20Chuda=C5=9B?= Date: Fri, 11 Oct 2024 15:00:07 +0200 Subject: [PATCH 13/13] address review comments --- chain/chain/src/garbage_collection.rs | 2 +- core/store/src/adapter/trie_store.rs | 8 +++++--- core/store/src/columns.rs | 13 +++++++------ integration-tests/src/tests/client/cold_storage.rs | 12 ++++++------ 4 files changed, 19 insertions(+), 16 deletions(-) diff --git a/chain/chain/src/garbage_collection.rs b/chain/chain/src/garbage_collection.rs index 7a94e862c69..690f1d5467e 100644 --- a/chain/chain/src/garbage_collection.rs +++ b/chain/chain/src/garbage_collection.rs @@ -997,7 +997,7 @@ impl<'a> ChainStoreUpdate<'a> { | DBCol::EpochSyncProof | DBCol::Misc | DBCol::_ReceiptIdToShardId - | DBCol::ShardUIdMapping + | DBCol::StateShardUIdMapping => unreachable!(), } self.merge(store_update); diff --git a/core/store/src/adapter/trie_store.rs b/core/store/src/adapter/trie_store.rs index 7d3a67f48f8..331efdfe211 100644 --- a/core/store/src/adapter/trie_store.rs +++ b/core/store/src/adapter/trie_store.rs @@ -36,14 +36,16 @@ impl TrieStoreAdapter { pub(crate) fn read_shard_uid_mapping_from_db( &self, shard_uid: ShardUId, - ) -> io::Result { + ) -> Result { let mapped_shard_uid = - self.get_ser::(DBCol::ShardUIdMapping, &shard_uid.to_bytes())?; + self.store.get_ser::(DBCol::StateShardUIdMapping, &shard_uid.to_bytes()); + let mapped_shard_uid = mapped_shard_uid + .map_err(|err| StorageError::StorageInconsistentState(err.to_string()))?; Ok(mapped_shard_uid.unwrap_or(shard_uid)) } /// Replaces shard_uid prefix with a mapped value according to mapping strategy in Resharding V3. - /// For this, it does extra read from `DBCol::ShardUIdMapping`. 
+    /// For this, it does extra read from `DBCol::StateShardUIdMapping`.
     pub fn get(&self, shard_uid: ShardUId, hash: &CryptoHash) -> Result<Arc<[u8]>, StorageError> {
         let mapped_shard_uid = self.read_shard_uid_mapping_from_db(shard_uid)?;
         let key = get_key_from_shard_uid_and_hash(mapped_shard_uid, hash);
diff --git a/core/store/src/columns.rs b/core/store/src/columns.rs
index 17e27258f58..8ac8ceb4217 100644
--- a/core/store/src/columns.rs
+++ b/core/store/src/columns.rs
@@ -300,12 +300,12 @@ pub enum DBCol {
     /// - *Rows*: only one key with 0 bytes.
     /// - *Column type*: `EpochSyncProof`
     EpochSyncProof,
-    /// Mapping of ShardUId to the underlying ShardUId database key prefix for the State column.
-    /// It could be parent shard after resharding, ancestor shard in case of many resharding,
-    /// or just map shard to itself if there was no resharding or we updated the mapping during state sync.
+    /// Mapping of ShardUId to the ShardUId that should be used as the database key prefix for the State column.
+    /// The mapped ShardUId value can be the parent shard after resharding, an ancestor shard after many reshardings,
+    /// or the shard itself if there was no resharding or the mapping was removed after the node stopped tracking the shard.
     /// - *Rows*: `ShardUId`
     /// - *Column type*: `ShardUId`
-    ShardUIdMapping,
+    StateShardUIdMapping,
 }

 /// Defines different logical parts of a db key.
@@ -451,7 +451,8 @@ impl DBCol {
         | DBCol::StateHeaders
         | DBCol::TransactionResultForBlock
         | DBCol::Transactions
-        | DBCol::ShardUIdMapping => true,
+        // TODO(reshardingV3) How will the mapping work with split storage?
+ | DBCol::StateShardUIdMapping => true, // TODO DBCol::ChallengedBlocks => false, @@ -582,7 +583,7 @@ impl DBCol { DBCol::LatestChunkStateWitnesses => &[DBKeyType::LatestWitnessesKey], DBCol::LatestWitnessesByIndex => &[DBKeyType::LatestWitnessIndex], DBCol::EpochSyncProof => &[DBKeyType::Empty], - DBCol::ShardUIdMapping => &[DBKeyType::ShardUId], + DBCol::StateShardUIdMapping => &[DBKeyType::ShardUId], } } } diff --git a/integration-tests/src/tests/client/cold_storage.rs b/integration-tests/src/tests/client/cold_storage.rs index d10a7aaed5d..e6e06a52349 100644 --- a/integration-tests/src/tests/client/cold_storage.rs +++ b/integration-tests/src/tests/client/cold_storage.rs @@ -186,11 +186,11 @@ fn test_storage_after_commit_of_cold_update() { let cold_store = &storage.get_cold_store().unwrap(); let num_checks = check_iter(client_store, cold_store, col, &no_check_rules); // assert that this test actually checks something - // apart from StateChangesForSplitStates, StateHeaders, and ShardUIdMapping, that are empty + // apart from StateChangesForSplitStates, StateHeaders, and StateShardUIdMapping, that are empty assert!( col == DBCol::StateChangesForSplitStates || col == DBCol::StateHeaders - || col == DBCol::ShardUIdMapping + || col == DBCol::StateShardUIdMapping || num_checks > 0 ); } @@ -309,11 +309,11 @@ fn test_cold_db_copy_with_height_skips() { let cold_store = storage.get_cold_store().unwrap(); let num_checks = check_iter(&client_store, &cold_store, col, &no_check_rules); // assert that this test actually checks something - // apart from StateChangesForSplitStates, StateHeaders, and ShardUIdMapping, that are empty + // apart from StateChangesForSplitStates, StateHeaders, and StateShardUIdMapping, that are empty assert!( col == DBCol::StateChangesForSplitStates || col == DBCol::StateHeaders - || col == DBCol::ShardUIdMapping + || col == DBCol::StateShardUIdMapping || num_checks > 0 ); } @@ -363,10 +363,10 @@ fn test_initial_copy_to_cold(batch_size: usize) { 
continue; } let num_checks = check_iter(&client_store, &cold_store, col, &vec![]); - // StateChangesForSplitStates, StateHeaders, and ShardUIdMapping are empty + // StateChangesForSplitStates, StateHeaders, and StateShardUIdMapping are empty if col == DBCol::StateChangesForSplitStates || col == DBCol::StateHeaders - || col == DBCol::ShardUIdMapping + || col == DBCol::StateShardUIdMapping { continue; }