diff --git a/chain/chain/src/garbage_collection.rs b/chain/chain/src/garbage_collection.rs index 777d83edaef..690f1d5467e 100644 --- a/chain/chain/src/garbage_collection.rs +++ b/chain/chain/src/garbage_collection.rs @@ -997,6 +997,7 @@ impl<'a> ChainStoreUpdate<'a> { | DBCol::EpochSyncProof | DBCol::Misc | DBCol::_ReceiptIdToShardId + | DBCol::StateShardUIdMapping => unreachable!(), } self.merge(store_update); diff --git a/core/store/src/adapter/trie_store.rs b/core/store/src/adapter/trie_store.rs index e188cddc4f1..331efdfe211 100644 --- a/core/store/src/adapter/trie_store.rs +++ b/core/store/src/adapter/trie_store.rs @@ -31,8 +31,24 @@ impl TrieStoreAdapter { TrieStoreUpdateAdapter { store_update: StoreUpdateHolder::Owned(self.store.store_update()) } } + /// Reads shard_uid mapping for given shard. + /// If the mapping does not exist, it means that `shard_uid` maps to itself. + pub(crate) fn read_shard_uid_mapping_from_db( + &self, + shard_uid: ShardUId, + ) -> Result { + let mapped_shard_uid = + self.store.get_ser::(DBCol::StateShardUIdMapping, &shard_uid.to_bytes()); + let mapped_shard_uid = mapped_shard_uid + .map_err(|err| StorageError::StorageInconsistentState(err.to_string()))?; + Ok(mapped_shard_uid.unwrap_or(shard_uid)) + } + + /// Replaces shard_uid prefix with a mapped value according to mapping strategy in Resharding V3. + /// For this, it does extra read from `DBCol::StateShardUIdMapping`. pub fn get(&self, shard_uid: ShardUId, hash: &CryptoHash) -> Result, StorageError> { - let key = get_key_from_shard_uid_and_hash(shard_uid, hash); + let mapped_shard_uid = self.read_shard_uid_mapping_from_db(shard_uid)?; + let key = get_key_from_shard_uid_and_hash(mapped_shard_uid, hash); let val = self .store .get(DBCol::State, key.as_ref()) diff --git a/core/store/src/cold_storage.rs b/core/store/src/cold_storage.rs index e4ea95f4378..ef64b3fdecb 100644 --- a/core/store/src/cold_storage.rs +++ b/core/store/src/cold_storage.rs @@ -186,6 +186,7 @@ fn copy_state_from_store( let Some(trie_changes) = trie_changes else { continue }; for op in trie_changes.insertions() { + // TODO(reshardingV3) Handle shard_uid not mapped there let key = join_two_keys(&shard_uid_key, op.hash().as_bytes()); let value = op.payload().to_vec(); diff --git a/core/store/src/columns.rs b/core/store/src/columns.rs index 48b2a7f34fb..8ac8ceb4217 100644 --- a/core/store/src/columns.rs +++ b/core/store/src/columns.rs @@ -300,6 +300,12 @@ pub enum DBCol { /// - *Rows*: only one key with 0 bytes. /// - *Column type*: `EpochSyncProof` EpochSyncProof, + /// Mapping of ShardUId to the ShardUId that should be used as the database key prefix for the State column. + /// The mapped ShardUId value can be the parent shard after resharding, an ancestor shard after many resharding + /// or just map shard to itself if there was no resharding or the mapping was removed after node stopped tracking the shard. + /// - *Rows*: `ShardUId` + /// - *Column type*: `ShardUId` + StateShardUIdMapping, } /// Defines different logical parts of a db key. @@ -444,7 +450,9 @@ impl DBCol { | DBCol::StateChangesForSplitStates | DBCol::StateHeaders | DBCol::TransactionResultForBlock - | DBCol::Transactions => true, + | DBCol::Transactions + // TODO(reshardingV3) How the mapping will work with split storage? + | DBCol::StateShardUIdMapping => true, // TODO DBCol::ChallengedBlocks => false, @@ -575,6 +583,7 @@ impl DBCol { DBCol::LatestChunkStateWitnesses => &[DBKeyType::LatestWitnessesKey], DBCol::LatestWitnessesByIndex => &[DBKeyType::LatestWitnessIndex], DBCol::EpochSyncProof => &[DBKeyType::Empty], + DBCol::StateShardUIdMapping => &[DBKeyType::ShardUId], } } } diff --git a/core/store/src/db/rocksdb.rs b/core/store/src/db/rocksdb.rs index a39428e6a12..1cca0e3166b 100644 --- a/core/store/src/db/rocksdb.rs +++ b/core/store/src/db/rocksdb.rs @@ -811,41 +811,41 @@ mod tests { let store = opener.open().unwrap().get_hot_store(); let ptr = (&*store.storage) as *const (dyn Database + 'static); let rocksdb = unsafe { &*(ptr as *const RocksDB) }; - assert_eq!(store.get(DBCol::State, &[1]).unwrap(), None); + assert_eq!(store.get(DBCol::State, &[1; 8]).unwrap(), None); { let mut store_update = store.store_update(); - store_update.increment_refcount(DBCol::State, &[1], &[1]); + store_update.increment_refcount(DBCol::State, &[1; 8], &[1]); store_update.commit().unwrap(); } { let mut store_update = store.store_update(); - store_update.increment_refcount(DBCol::State, &[1], &[1]); + store_update.increment_refcount(DBCol::State, &[1; 8], &[1]); store_update.commit().unwrap(); } - assert_eq!(store.get(DBCol::State, &[1]).unwrap().as_deref(), Some(&[1][..])); + assert_eq!(store.get(DBCol::State, &[1; 8]).unwrap().as_deref(), Some(&[1][..])); assert_eq!( - rocksdb.get_raw_bytes(DBCol::State, &[1]).unwrap().as_deref(), + rocksdb.get_raw_bytes(DBCol::State, &[1; 8]).unwrap().as_deref(), Some(&[1, 2, 0, 0, 0, 0, 0, 0, 0][..]) ); { let mut store_update = store.store_update(); - store_update.decrement_refcount(DBCol::State, &[1]); + store_update.decrement_refcount(DBCol::State, &[1; 8]); store_update.commit().unwrap(); } - assert_eq!(store.get(DBCol::State, &[1]).unwrap().as_deref(), Some(&[1][..])); + assert_eq!(store.get(DBCol::State, &[1; 8]).unwrap().as_deref(), Some(&[1][..])); assert_eq!( - rocksdb.get_raw_bytes(DBCol::State, &[1]).unwrap().as_deref(), + rocksdb.get_raw_bytes(DBCol::State, &[1; 8]).unwrap().as_deref(), Some(&[1, 1, 0, 0, 0, 0, 0, 0, 0][..]) ); { let mut store_update = store.store_update(); - store_update.decrement_refcount(DBCol::State, &[1]); + store_update.decrement_refcount(DBCol::State, &[1; 8]); store_update.commit().unwrap(); } // Refcount goes to 0 -> get() returns None - assert_eq!(store.get(DBCol::State, &[1]).unwrap(), None); + assert_eq!(store.get(DBCol::State, &[1; 8]).unwrap(), None); // Internally there is an empty value - assert_eq!(rocksdb.get_raw_bytes(DBCol::State, &[1]).unwrap().as_deref(), Some(&[][..])); + assert_eq!(rocksdb.get_raw_bytes(DBCol::State, &[1; 8]).unwrap().as_deref(), Some(&[][..])); // single_thread_rocksdb makes compact hang forever if !cfg!(feature = "single_thread_rocksdb") { @@ -858,14 +858,14 @@ mod tests { // empty values. rocksdb.db.compact_range_cf(cf, none, none); assert_eq!( - rocksdb.get_raw_bytes(DBCol::State, &[1]).unwrap().as_deref(), + rocksdb.get_raw_bytes(DBCol::State, &[1; 8]).unwrap().as_deref(), Some(&[][..]) ); - assert_eq!(store.get(DBCol::State, &[1]).unwrap(), None); + assert_eq!(store.get(DBCol::State, &[1; 8]).unwrap(), None); rocksdb.db.compact_range_cf(cf, none, none); - assert_eq!(rocksdb.get_raw_bytes(DBCol::State, &[1]).unwrap(), None); - assert_eq!(store.get(DBCol::State, &[1]).unwrap(), None); + assert_eq!(rocksdb.get_raw_bytes(DBCol::State, &[1; 8]).unwrap(), None); + assert_eq!(store.get(DBCol::State, &[1; 8]).unwrap(), None); } } diff --git a/core/store/src/lib.rs b/core/store/src/lib.rs index 3804b60a47b..0747bbdddcb 100644 --- a/core/store/src/lib.rs +++ b/core/store/src/lib.rs @@ -304,9 +304,9 @@ impl Store { /// provides conversion into a vector or an Arc. pub fn get(&self, column: DBCol, key: &[u8]) -> io::Result>> { let value = if column.is_rc() { - self.storage.get_with_rc_stripped(column, key) + self.storage.get_with_rc_stripped(column, &key) } else { - self.storage.get_raw_bytes(column, key) + self.storage.get_raw_bytes(column, &key) }?; tracing::trace!( target: "store", @@ -327,7 +327,7 @@ impl Store { } pub fn store_update(&self) -> StoreUpdate { - StoreUpdate::new(Arc::clone(&self.storage)) + StoreUpdate { transaction: DBTransaction::new(), storage: Arc::clone(&self.storage) } } pub fn iter<'a>(&'a self, col: DBCol) -> DBIterator<'a> { @@ -345,6 +345,7 @@ impl Store { } pub fn iter_prefix<'a>(&'a self, col: DBCol, key_prefix: &'a [u8]) -> DBIterator<'a> { + assert!(col != DBCol::State, "can't iter prefix of State column"); self.storage.iter_prefix(col, key_prefix) } @@ -355,6 +356,8 @@ impl Store { lower_bound: Option<&[u8]>, upper_bound: Option<&[u8]>, ) -> DBIterator<'a> { + // That would fail if called `ScanDbColumnCmd`` for the `State` column. + assert!(col != DBCol::State, "can't range iter State column"); self.storage.iter_range(col, lower_bound, upper_bound) } @@ -363,6 +366,7 @@ impl Store { col: DBCol, key_prefix: &'a [u8], ) -> impl Iterator, T)>> + 'a { + assert!(col != DBCol::State, "can't iter prefix ser of State column"); self.storage .iter_prefix(col, key_prefix) .map(|item| item.and_then(|(key, value)| Ok((key, T::try_from_slice(value.as_ref())?)))) @@ -470,10 +474,6 @@ impl StoreUpdate { None => panic!(), }; - pub(crate) fn new(db: Arc) -> Self { - StoreUpdate { transaction: DBTransaction::new(), storage: db } - } - /// Inserts a new value into the database. /// /// It is a programming error if `insert` overwrites an existing, different @@ -598,6 +598,7 @@ impl StoreUpdate { /// Must not be used for reference-counted columns; use /// ['Self::increment_refcount'] or [`Self::decrement_refcount`] instead. pub fn delete(&mut self, column: DBCol, key: &[u8]) { + // It would panic if called with `State` column, as it is refcounted. assert!(!column.is_rc(), "can't delete: {column}"); self.transaction.delete(column, key.to_vec()); } @@ -609,6 +610,7 @@ impl StoreUpdate { /// Deletes the given key range from the database including `from` /// and excluding `to` keys. pub fn delete_range(&mut self, column: DBCol, from: &[u8], to: &[u8]) { + assert!(column != DBCol::State, "can't range delete State column"); self.transaction.delete_range(column, from.to_vec(), to.to_vec()); } @@ -733,6 +735,7 @@ impl StoreUpdate { } } } + // TODO(reshardingV3) Map shard_uid for ops referencing State column. self.storage.write(self.transaction) } } @@ -1171,21 +1174,21 @@ mod tests { } fn test_clear_column(store: Store) { - assert_eq!(store.get(DBCol::State, &[1]).unwrap(), None); + assert_eq!(store.get(DBCol::State, &[1; 8]).unwrap(), None); { let mut store_update = store.store_update(); - store_update.increment_refcount(DBCol::State, &[1], &[1]); - store_update.increment_refcount(DBCol::State, &[2], &[2]); - store_update.increment_refcount(DBCol::State, &[3], &[3]); + store_update.increment_refcount(DBCol::State, &[1; 8], &[1]); + store_update.increment_refcount(DBCol::State, &[2; 8], &[2]); + store_update.increment_refcount(DBCol::State, &[3; 8], &[3]); store_update.commit().unwrap(); } - assert_eq!(store.get(DBCol::State, &[1]).unwrap().as_deref(), Some(&[1][..])); + assert_eq!(store.get(DBCol::State, &[1; 8]).unwrap().as_deref(), Some(&[1][..])); { let mut store_update = store.store_update(); store_update.delete_all(DBCol::State); store_update.commit().unwrap(); } - assert_eq!(store.get(DBCol::State, &[1]).unwrap(), None); + assert_eq!(store.get(DBCol::State, &[1; 8]).unwrap(), None); } #[test] @@ -1301,9 +1304,9 @@ mod tests { { let store = crate::test_utils::create_test_store(); let mut store_update = store.store_update(); - store_update.increment_refcount(DBCol::State, &[1], &[1]); - store_update.increment_refcount(DBCol::State, &[2], &[2]); - store_update.increment_refcount(DBCol::State, &[2], &[2]); + store_update.increment_refcount(DBCol::State, &[1; 8], &[1]); + store_update.increment_refcount(DBCol::State, &[2; 8], &[2]); + store_update.increment_refcount(DBCol::State, &[2; 8], &[2]); store_update.commit().unwrap(); store.save_state_to_file(tmp.path()).unwrap(); } @@ -1314,9 +1317,9 @@ mod tests { std::io::Read::read_to_end(tmp.as_file_mut(), &mut buffer).unwrap(); #[rustfmt::skip] assert_eq!(&[ - /* column: */ 0, /* key len: */ 1, 0, 0, 0, /* key: */ 1, + /* column: */ 0, /* key len: */ 8, 0, 0, 0, /* key: */ 1, 1, 1, 1, 1, 1, 1, 1, /* val len: */ 9, 0, 0, 0, /* val: */ 1, 1, 0, 0, 0, 0, 0, 0, 0, - /* column: */ 0, /* key len: */ 1, 0, 0, 0, /* key: */ 2, + /* column: */ 0, /* key len: */ 8, 0, 0, 0, /* key: */ 2, 2, 2, 2, 2, 2, 2, 2, /* val len: */ 9, 0, 0, 0, /* val: */ 2, 2, 0, 0, 0, 0, 0, 0, 0, /* end mark: */ 255, ][..], buffer.as_slice()); @@ -1325,22 +1328,22 @@ mod tests { { // Fresh storage, should have no data. let store = crate::test_utils::create_test_store(); - assert_eq!(None, store.get(DBCol::State, &[1]).unwrap()); - assert_eq!(None, store.get(DBCol::State, &[2]).unwrap()); + assert_eq!(None, store.get(DBCol::State, &[1; 8]).unwrap()); + assert_eq!(None, store.get(DBCol::State, &[2; 8]).unwrap()); // Read data from file. store.load_state_from_file(tmp.path()).unwrap(); - assert_eq!(Some(&[1u8][..]), store.get(DBCol::State, &[1]).unwrap().as_deref()); - assert_eq!(Some(&[2u8][..]), store.get(DBCol::State, &[2]).unwrap().as_deref()); + assert_eq!(Some(&[1u8][..]), store.get(DBCol::State, &[1; 8]).unwrap().as_deref()); + assert_eq!(Some(&[2u8][..]), store.get(DBCol::State, &[2; 8]).unwrap().as_deref()); // Key &[2] should have refcount of two so once decreased it should // still exist. let mut store_update = store.store_update(); - store_update.decrement_refcount(DBCol::State, &[1]); - store_update.decrement_refcount(DBCol::State, &[2]); + store_update.decrement_refcount(DBCol::State, &[1; 8]); + store_update.decrement_refcount(DBCol::State, &[2; 8]); store_update.commit().unwrap(); - assert_eq!(None, store.get(DBCol::State, &[1]).unwrap()); - assert_eq!(Some(&[2u8][..]), store.get(DBCol::State, &[2]).unwrap().as_deref()); + assert_eq!(None, store.get(DBCol::State, &[1; 8]).unwrap()); + assert_eq!(Some(&[2u8][..]), store.get(DBCol::State, &[2; 8]).unwrap().as_deref()); } // Verify detection of corrupt file. diff --git a/core/store/src/trie/mem/parallel_loader.rs b/core/store/src/trie/mem/parallel_loader.rs index e38254a3031..ee0a4ddee41 100644 --- a/core/store/src/trie/mem/parallel_loader.rs +++ b/core/store/src/trie/mem/parallel_loader.rs @@ -191,6 +191,7 @@ impl ParallelMemTrieLoader { arena: &mut impl ArenaMut, ) -> Result { // Figure out which range corresponds to the prefix of this subtree. + // TODO(reshardingV3) This seems fragile, potentially does not work with mapping. let (start, end) = subtree_to_load.to_iter_range(self.shard_uid); // Load all the keys in this range from the FlatState column. diff --git a/integration-tests/src/tests/client/cold_storage.rs b/integration-tests/src/tests/client/cold_storage.rs index 10ffd533856..e6e06a52349 100644 --- a/integration-tests/src/tests/client/cold_storage.rs +++ b/integration-tests/src/tests/client/cold_storage.rs @@ -186,10 +186,11 @@ fn test_storage_after_commit_of_cold_update() { let cold_store = &storage.get_cold_store().unwrap(); let num_checks = check_iter(client_store, cold_store, col, &no_check_rules); // assert that this test actually checks something - // apart from StateChangesForSplitStates and StateHeaders, that are empty + // apart from StateChangesForSplitStates, StateHeaders, and StateShardUIdMapping, that are empty assert!( col == DBCol::StateChangesForSplitStates || col == DBCol::StateHeaders + || col == DBCol::StateShardUIdMapping || num_checks > 0 ); } @@ -308,10 +309,11 @@ fn test_cold_db_copy_with_height_skips() { let cold_store = storage.get_cold_store().unwrap(); let num_checks = check_iter(&client_store, &cold_store, col, &no_check_rules); // assert that this test actually checks something - // apart from StateChangesForSplitStates and StateHeaders, that are empty + // apart from StateChangesForSplitStates, StateHeaders, and StateShardUIdMapping, that are empty assert!( col == DBCol::StateChangesForSplitStates || col == DBCol::StateHeaders + || col == DBCol::StateShardUIdMapping || num_checks > 0 ); } @@ -361,8 +363,11 @@ fn test_initial_copy_to_cold(batch_size: usize) { continue; } let num_checks = check_iter(&client_store, &cold_store, col, &vec![]); - // StateChangesForSplitStates and StateHeaders are empty - if col == DBCol::StateChangesForSplitStates || col == DBCol::StateHeaders { + // StateChangesForSplitStates, StateHeaders, and StateShardUIdMapping are empty + if col == DBCol::StateChangesForSplitStates + || col == DBCol::StateHeaders + || col == DBCol::StateShardUIdMapping + { continue; } // assert that this test actually checks something