[reshardingV3] State ShardUIdMapping - initial implementation #12084

Merged · 13 commits · Oct 11, 2024
1 change: 1 addition & 0 deletions chain/chain/src/garbage_collection.rs
@@ -997,6 +997,7 @@ impl<'a> ChainStoreUpdate<'a> {
| DBCol::EpochSyncProof
| DBCol::Misc
| DBCol::_ReceiptIdToShardId
| DBCol::ShardUIdMapping
=> unreachable!(),
}
self.merge(store_update);
16 changes: 15 additions & 1 deletion core/store/src/adapter/trie_store.rs
@@ -31,8 +31,22 @@ impl TrieStoreAdapter {
TrieStoreUpdateAdapter { store_update: StoreUpdateHolder::Owned(self.store.store_update()) }
}

/// Reads shard_uid mapping for given shard.
/// If the mapping does not exist, it means that `shard_uid` maps to itself.
pub(crate) fn read_shard_uid_mapping_from_db(
&self,
shard_uid: ShardUId,
) -> io::Result<ShardUId> {
let mapped_shard_uid =
self.get_ser::<ShardUId>(DBCol::ShardUIdMapping, &shard_uid.to_bytes())?;
Ok(mapped_shard_uid.unwrap_or(shard_uid))
}

/// Replaces the shard_uid prefix with a mapped value according to the mapping strategy in Resharding V3.
/// For this, it does an extra read from `DBCol::ShardUIdMapping`.
pub fn get(&self, shard_uid: ShardUId, hash: &CryptoHash) -> Result<Arc<[u8]>, StorageError> {
let key = get_key_from_shard_uid_and_hash(shard_uid, hash);
let mapped_shard_uid = self.read_shard_uid_mapping_from_db(shard_uid)?;
let key = get_key_from_shard_uid_and_hash(mapped_shard_uid, hash);
Comment on lines +50 to +51
Contributor:
nice

let val = self
.store
.get(DBCol::State, key.as_ref())
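To make the new read path concrete, here is a minimal, self-contained sketch of the resolution that `read_shard_uid_mapping_from_db` and `get` perform above. The types are toy stand-ins (plain byte arrays and `HashMap`s) rather than nearcore's `ShardUId`, `CryptoHash`, and store adapters, so this illustrates the idea rather than the crate's API:

```rust
use std::collections::HashMap;

/// Toy stand-ins: 8-byte ShardUId prefix, 32-byte node hash.
type ShardUIdBytes = [u8; 8];
type Hash = [u8; 32];

struct Db {
    /// Stand-in for `DBCol::ShardUIdMapping`.
    shard_uid_mapping: HashMap<ShardUIdBytes, ShardUIdBytes>,
    /// Stand-in for `DBCol::State`; keys are `shard prefix ++ hash`.
    state: HashMap<Vec<u8>, Vec<u8>>,
}

impl Db {
    /// Mirrors `read_shard_uid_mapping_from_db`: a missing entry means the shard maps to itself.
    fn resolve(&self, shard_uid: ShardUIdBytes) -> ShardUIdBytes {
        self.shard_uid_mapping.get(&shard_uid).copied().unwrap_or(shard_uid)
    }

    /// Mirrors `TrieStoreAdapter::get`: the `State` key is built from the mapped
    /// prefix, so a child shard transparently reads data stored under its
    /// ancestor's prefix after a resharding.
    fn get_state(&self, shard_uid: ShardUIdBytes, hash: &Hash) -> Option<&[u8]> {
        let mapped = self.resolve(shard_uid);
        let mut key = Vec::with_capacity(40);
        key.extend_from_slice(&mapped);
        key.extend_from_slice(hash);
        self.state.get(&key).map(|v| v.as_slice())
    }
}

fn main() {
    let parent: ShardUIdBytes = [3, 0, 0, 0, 0, 0, 0, 0];
    let child: ShardUIdBytes = [4, 0, 0, 0, 2, 0, 0, 0];
    let hash = [0xAB; 32];

    let mut db = Db { shard_uid_mapping: HashMap::new(), state: HashMap::new() };
    // Data physically lives under the parent's prefix.
    db.state.insert([&parent[..], &hash[..]].concat(), vec![42]);
    // Resharding recorded: child -> parent.
    db.shard_uid_mapping.insert(child, parent);

    // Reading with the child's ShardUId still finds the value.
    assert_eq!(db.get_state(child, &hash), Some(&[42u8][..]));
    // A shard with no mapping entry maps to itself.
    assert_eq!(db.get_state(parent, &hash), Some(&[42u8][..]));
}
```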
1 change: 1 addition & 0 deletions core/store/src/cold_storage.rs
@@ -186,6 +186,7 @@ fn copy_state_from_store(

let Some(trie_changes) = trie_changes else { continue };
for op in trie_changes.insertions() {
// TODO(reshardingV3) Handle shard_uid not mapped there
Contributor:
This TODO will be tricky; we will need to be careful when adding and removing a mapping. It's good that you spotted it.

Unrelated to this PR: I wonder if we can keep the original shard uids when moving data to cold storage. This would keep the cold storage sharded (as it is today).

let key = join_two_keys(&shard_uid_key, op.hash().as_bytes());
let value = op.payload().to_vec();

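For context on the TODO and the review note above, here is one possible shape of a mapping-aware copy loop. It is purely illustrative: `join_two_keys`, `op.hash()`, and `op.payload()` come from the diff, the types below are local stand-ins, and whether cold storage should apply the mapping at all (versus keeping the original shard uids) is exactly the open question raised in the review:

```rust
use std::collections::HashMap;

/// Illustrative sketch of copying one shard's State insertions to cold storage
/// while honouring the ShardUId mapping; not the eventual implementation.
fn copy_state_for_shard(
    shard_uid_key: [u8; 8],
    // (node hash, payload) pairs, standing in for `trie_changes.insertions()`.
    insertions: &[([u8; 32], Vec<u8>)],
    // Stand-in for reading `DBCol::ShardUIdMapping`.
    mapping: &HashMap<[u8; 8], [u8; 8]>,
    // Stand-in for the cold-store State column.
    cold_state: &mut HashMap<Vec<u8>, Vec<u8>>,
) {
    // The mapping is per shard, so it only needs resolving once per call;
    // a missing entry means the shard maps to itself.
    let mapped_key = mapping.get(&shard_uid_key).copied().unwrap_or(shard_uid_key);
    for (hash, payload) in insertions {
        // Same layout as `join_two_keys(&shard_uid_key, op.hash().as_bytes())`,
        // but with the mapped prefix, matching what the hot store wrote.
        let key = [mapped_key.as_slice(), hash.as_slice()].concat();
        cold_state.insert(key, payload.clone());
    }
}
```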
10 changes: 9 additions & 1 deletion core/store/src/columns.rs
@@ -300,6 +300,12 @@ pub enum DBCol {
/// - *Rows*: only one key with 0 bytes.
/// - *Column type*: `EpochSyncProof`
EpochSyncProof,
/// Mapping of ShardUId to the underlying ShardUId database key prefix for the State column.
/// It could be the parent shard after resharding, an ancestor shard in the case of multiple reshardings,
/// or the shard itself if there was no resharding or if the mapping was updated during state sync.
/// - *Rows*: `ShardUId`
/// - *Column type*: `ShardUId`
ShardUIdMapping,
}

/// Defines different logical parts of a db key.
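A hedged sketch of how the new column might be written and read, based on the `get_ser` and `to_bytes` calls visible elsewhere in this PR; the `set_ser` helper, the `Store`/`DBCol` import paths, and the exact `ShardUId` handling are assumptions about the surrounding near-store and near-primitives APIs, not code from this change:

```rust
use near_primitives::shard_layout::ShardUId;
use near_store::{DBCol, Store};

/// Record that `child` should read State data under `ancestor`'s prefix.
fn set_shard_uid_mapping(store: &Store, child: ShardUId, ancestor: ShardUId) -> std::io::Result<()> {
    let mut update = store.store_update();
    // Rows are keyed by the 8-byte ShardUId and hold a ShardUId value,
    // matching the column doc above.
    update.set_ser(DBCol::ShardUIdMapping, &child.to_bytes(), &ancestor)?;
    update.commit()
}

/// Resolve the mapping; an absent row means the shard maps to itself.
fn resolve_shard_uid(store: &Store, shard_uid: ShardUId) -> std::io::Result<ShardUId> {
    Ok(store
        .get_ser::<ShardUId>(DBCol::ShardUIdMapping, &shard_uid.to_bytes())?
        .unwrap_or(shard_uid))
}
```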
@@ -444,7 +450,8 @@ impl DBCol {
| DBCol::StateChangesForSplitStates
| DBCol::StateHeaders
| DBCol::TransactionResultForBlock
| DBCol::Transactions => true,
| DBCol::Transactions
| DBCol::ShardUIdMapping => true,
Contributor:
Can you explain how the mapping will work on split storage nodes? I can't say if this is good or not without the full picture. For the early MVP it doesn't matter too much, so feel free to just leave a TODO here and proceed.

Contributor Author (@staffik, Oct 11, 2024):
My understanding was that we want to copy ShardUIdMapping to cold storage because cold_store.get(state_key) would not work otherwise. Marked it as a TODO to understand it more deeply after the early MVP.


// TODO
DBCol::ChallengedBlocks => false,
@@ -575,6 +582,7 @@ impl DBCol {
DBCol::LatestChunkStateWitnesses => &[DBKeyType::LatestWitnessesKey],
DBCol::LatestWitnessesByIndex => &[DBKeyType::LatestWitnessIndex],
DBCol::EpochSyncProof => &[DBKeyType::Empty],
DBCol::ShardUIdMapping => &[DBKeyType::ShardUId],
}
}
}
30 changes: 15 additions & 15 deletions core/store/src/db/rocksdb.rs
@@ -811,41 +811,41 @@ mod tests {
let store = opener.open().unwrap().get_hot_store();
let ptr = (&*store.storage) as *const (dyn Database + 'static);
let rocksdb = unsafe { &*(ptr as *const RocksDB) };
assert_eq!(store.get(DBCol::State, &[1]).unwrap(), None);
assert_eq!(store.get(DBCol::State, &[1; 8]).unwrap(), None);
{
let mut store_update = store.store_update();
store_update.increment_refcount(DBCol::State, &[1], &[1]);
store_update.increment_refcount(DBCol::State, &[1; 8], &[1]);
store_update.commit().unwrap();
}
{
let mut store_update = store.store_update();
store_update.increment_refcount(DBCol::State, &[1], &[1]);
store_update.increment_refcount(DBCol::State, &[1; 8], &[1]);
store_update.commit().unwrap();
}
assert_eq!(store.get(DBCol::State, &[1]).unwrap().as_deref(), Some(&[1][..]));
assert_eq!(store.get(DBCol::State, &[1; 8]).unwrap().as_deref(), Some(&[1][..]));
assert_eq!(
rocksdb.get_raw_bytes(DBCol::State, &[1]).unwrap().as_deref(),
rocksdb.get_raw_bytes(DBCol::State, &[1; 8]).unwrap().as_deref(),
Some(&[1, 2, 0, 0, 0, 0, 0, 0, 0][..])
);
{
let mut store_update = store.store_update();
store_update.decrement_refcount(DBCol::State, &[1]);
store_update.decrement_refcount(DBCol::State, &[1; 8]);
store_update.commit().unwrap();
}
assert_eq!(store.get(DBCol::State, &[1]).unwrap().as_deref(), Some(&[1][..]));
assert_eq!(store.get(DBCol::State, &[1; 8]).unwrap().as_deref(), Some(&[1][..]));
assert_eq!(
rocksdb.get_raw_bytes(DBCol::State, &[1]).unwrap().as_deref(),
rocksdb.get_raw_bytes(DBCol::State, &[1; 8]).unwrap().as_deref(),
Some(&[1, 1, 0, 0, 0, 0, 0, 0, 0][..])
);
{
let mut store_update = store.store_update();
store_update.decrement_refcount(DBCol::State, &[1]);
store_update.decrement_refcount(DBCol::State, &[1; 8]);
store_update.commit().unwrap();
}
// Refcount goes to 0 -> get() returns None
assert_eq!(store.get(DBCol::State, &[1]).unwrap(), None);
assert_eq!(store.get(DBCol::State, &[1; 8]).unwrap(), None);
// Internally there is an empty value
assert_eq!(rocksdb.get_raw_bytes(DBCol::State, &[1]).unwrap().as_deref(), Some(&[][..]));
assert_eq!(rocksdb.get_raw_bytes(DBCol::State, &[1; 8]).unwrap().as_deref(), Some(&[][..]));

// single_thread_rocksdb makes compact hang forever
if !cfg!(feature = "single_thread_rocksdb") {
@@ -858,14 +858,14 @@
// empty values.
rocksdb.db.compact_range_cf(cf, none, none);
assert_eq!(
rocksdb.get_raw_bytes(DBCol::State, &[1]).unwrap().as_deref(),
rocksdb.get_raw_bytes(DBCol::State, &[1; 8]).unwrap().as_deref(),
Some(&[][..])
);
assert_eq!(store.get(DBCol::State, &[1]).unwrap(), None);
assert_eq!(store.get(DBCol::State, &[1; 8]).unwrap(), None);

rocksdb.db.compact_range_cf(cf, none, none);
assert_eq!(rocksdb.get_raw_bytes(DBCol::State, &[1]).unwrap(), None);
assert_eq!(store.get(DBCol::State, &[1]).unwrap(), None);
assert_eq!(rocksdb.get_raw_bytes(DBCol::State, &[1; 8]).unwrap(), None);
assert_eq!(store.get(DBCol::State, &[1; 8]).unwrap(), None);
}
}

55 changes: 29 additions & 26 deletions core/store/src/lib.rs
@@ -304,9 +304,9 @@ impl Store {
/// provides conversion into a vector or an Arc.
pub fn get(&self, column: DBCol, key: &[u8]) -> io::Result<Option<DBSlice<'_>>> {
let value = if column.is_rc() {
self.storage.get_with_rc_stripped(column, key)
self.storage.get_with_rc_stripped(column, &key)
} else {
self.storage.get_raw_bytes(column, key)
self.storage.get_raw_bytes(column, &key)
}?;
tracing::trace!(
target: "store",
Expand All @@ -327,7 +327,7 @@ impl Store {
}

pub fn store_update(&self) -> StoreUpdate {
StoreUpdate::new(Arc::clone(&self.storage))
StoreUpdate { transaction: DBTransaction::new(), storage: Arc::clone(&self.storage) }
}

pub fn iter<'a>(&'a self, col: DBCol) -> DBIterator<'a> {
@@ -345,6 +345,7 @@
}

pub fn iter_prefix<'a>(&'a self, col: DBCol, key_prefix: &'a [u8]) -> DBIterator<'a> {
assert!(col != DBCol::State, "can't iter prefix of State column");
Contributor Author:
Luckily, that is not currently used for the State column.

self.storage.iter_prefix(col, key_prefix)
}

@@ -355,6 +356,8 @@
lower_bound: Option<&[u8]>,
upper_bound: Option<&[u8]>,
) -> DBIterator<'a> {
// This would fail if `ScanDbColumnCmd` were called for the `State` column.
assert!(col != DBCol::State, "can't range iter State column");
self.storage.iter_range(col, lower_bound, upper_bound)
}

@@ -363,6 +366,7 @@
col: DBCol,
key_prefix: &'a [u8],
) -> impl Iterator<Item = io::Result<(Box<[u8]>, T)>> + 'a {
assert!(col != DBCol::State, "can't iter prefix ser of State column");
Contributor Author:
Luckily, that is not currently used for the State column.

self.storage
.iter_prefix(col, key_prefix)
.map(|item| item.and_then(|(key, value)| Ok((key, T::try_from_slice(value.as_ref())?))))
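The three asserts added in this file guard every iteration entry point over `State`. A self-contained toy showing why: after a resharding, a child shard's data physically sits under its ancestor's key prefix, so a naive prefix scan keyed by the child's own `ShardUId` bytes would silently return nothing. The 8-byte prefixes below are made up for illustration, not real `ShardUId` encodings:

```rust
use std::collections::BTreeMap;

fn main() {
    // Hypothetical ShardUId byte prefixes (not real encodings).
    let parent_prefix = [3u8, 0, 0, 0, 1, 0, 0, 0];
    let child_prefix = [4u8, 0, 0, 0, 2, 0, 0, 0];

    // Toy stand-in for DBCol::State, ordered like RocksDB keys.
    let mut state: BTreeMap<Vec<u8>, Vec<u8>> = BTreeMap::new();
    // The mapping-aware write path stored the child's node under the parent's prefix.
    state.insert([&parent_prefix[..], &[0xAAu8; 32][..]].concat(), vec![1]);

    // A prefix scan keyed by the child's own ShardUId finds nothing...
    let naive = state
        .range(child_prefix.to_vec()..)
        .take_while(|(k, _)| k.starts_with(&child_prefix))
        .count();
    assert_eq!(naive, 0);

    // ...while scanning under the resolved (mapped) prefix finds the data.
    let mapped = state
        .range(parent_prefix.to_vec()..)
        .take_while(|(k, _)| k.starts_with(&parent_prefix))
        .count();
    assert_eq!(mapped, 1);
}
```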
@@ -470,10 +474,6 @@ impl StoreUpdate {
None => panic!(),
};

pub(crate) fn new(db: Arc<dyn Database>) -> Self {
StoreUpdate { transaction: DBTransaction::new(), storage: db }
}

/// Inserts a new value into the database.
///
/// It is a programming error if `insert` overwrites an existing, different
@@ -598,6 +598,7 @@ impl StoreUpdate {
/// Must not be used for reference-counted columns; use
/// ['Self::increment_refcount'] or [`Self::decrement_refcount`] instead.
pub fn delete(&mut self, column: DBCol, key: &[u8]) {
// This would panic if called with the `State` column, as it is refcounted.
assert!(!column.is_rc(), "can't delete: {column}");
self.transaction.delete(column, key.to_vec());
}
@@ -609,6 +610,7 @@
/// Deletes the given key range from the database including `from`
/// and excluding `to` keys.
pub fn delete_range(&mut self, column: DBCol, from: &[u8], to: &[u8]) {
assert!(column != DBCol::State, "can't range delete State column");
Contributor Author:
Luckily, that is not currently used for the State column.

self.transaction.delete_range(column, from.to_vec(), to.to_vec());
}

@@ -733,6 +735,7 @@ impl StoreUpdate {
}
}
}
// TODO(reshardingV3) Map shard_uid for ops referencing State column.
self.storage.write(self.transaction)
}
}
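The TODO above is about rewriting keys once at commit time rather than at every call site. One possible shape of that idea, heavily simplified and using local stand-in types instead of the crate's `DBTransaction` and its ops, so this is a sketch of the concept rather than the eventual implementation:

```rust
/// Stand-ins for a transaction op touching some column.
#[derive(Clone, Copy, PartialEq)]
enum Col {
    State,
    Other,
}

struct Op {
    col: Col,
    key: Vec<u8>,
}

/// Rewrite the 8-byte ShardUId prefix of every op that targets the State
/// column, using a caller-supplied resolver (e.g. a ShardUIdMapping lookup).
fn map_state_keys(ops: &mut [Op], resolve: impl Fn(&[u8; 8]) -> [u8; 8]) {
    for op in ops.iter_mut().filter(|op| op.col == Col::State) {
        debug_assert!(op.key.len() >= 8, "State keys start with an 8-byte ShardUId");
        let mut prefix = [0u8; 8];
        prefix.copy_from_slice(&op.key[..8]);
        op.key[..8].copy_from_slice(&resolve(&prefix));
    }
}
```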
@@ -1171,21 +1174,21 @@ mod tests {
}

fn test_clear_column(store: Store) {
assert_eq!(store.get(DBCol::State, &[1]).unwrap(), None);
assert_eq!(store.get(DBCol::State, &[1; 8]).unwrap(), None);
{
let mut store_update = store.store_update();
store_update.increment_refcount(DBCol::State, &[1], &[1]);
store_update.increment_refcount(DBCol::State, &[2], &[2]);
store_update.increment_refcount(DBCol::State, &[3], &[3]);
store_update.increment_refcount(DBCol::State, &[1; 8], &[1]);
store_update.increment_refcount(DBCol::State, &[2; 8], &[2]);
store_update.increment_refcount(DBCol::State, &[3; 8], &[3]);
store_update.commit().unwrap();
}
assert_eq!(store.get(DBCol::State, &[1]).unwrap().as_deref(), Some(&[1][..]));
assert_eq!(store.get(DBCol::State, &[1; 8]).unwrap().as_deref(), Some(&[1][..]));
{
let mut store_update = store.store_update();
store_update.delete_all(DBCol::State);
store_update.commit().unwrap();
}
assert_eq!(store.get(DBCol::State, &[1]).unwrap(), None);
assert_eq!(store.get(DBCol::State, &[1; 8]).unwrap(), None);
}

#[test]
@@ -1301,9 +1304,9 @@ mod tests {
{
let store = crate::test_utils::create_test_store();
let mut store_update = store.store_update();
store_update.increment_refcount(DBCol::State, &[1], &[1]);
store_update.increment_refcount(DBCol::State, &[2], &[2]);
store_update.increment_refcount(DBCol::State, &[2], &[2]);
store_update.increment_refcount(DBCol::State, &[1; 8], &[1]);
store_update.increment_refcount(DBCol::State, &[2; 8], &[2]);
store_update.increment_refcount(DBCol::State, &[2; 8], &[2]);
store_update.commit().unwrap();
store.save_state_to_file(tmp.path()).unwrap();
}
@@ -1314,9 +1317,9 @@
std::io::Read::read_to_end(tmp.as_file_mut(), &mut buffer).unwrap();
#[rustfmt::skip]
assert_eq!(&[
/* column: */ 0, /* key len: */ 1, 0, 0, 0, /* key: */ 1,
/* column: */ 0, /* key len: */ 8, 0, 0, 0, /* key: */ 1, 1, 1, 1, 1, 1, 1, 1,
/* val len: */ 9, 0, 0, 0, /* val: */ 1, 1, 0, 0, 0, 0, 0, 0, 0,
/* column: */ 0, /* key len: */ 1, 0, 0, 0, /* key: */ 2,
/* column: */ 0, /* key len: */ 8, 0, 0, 0, /* key: */ 2, 2, 2, 2, 2, 2, 2, 2,
/* val len: */ 9, 0, 0, 0, /* val: */ 2, 2, 0, 0, 0, 0, 0, 0, 0,
/* end mark: */ 255,
][..], buffer.as_slice());
Expand All @@ -1325,22 +1328,22 @@ mod tests {
{
// Fresh storage, should have no data.
let store = crate::test_utils::create_test_store();
assert_eq!(None, store.get(DBCol::State, &[1]).unwrap());
assert_eq!(None, store.get(DBCol::State, &[2]).unwrap());
assert_eq!(None, store.get(DBCol::State, &[1; 8]).unwrap());
assert_eq!(None, store.get(DBCol::State, &[2; 8]).unwrap());

// Read data from file.
store.load_state_from_file(tmp.path()).unwrap();
assert_eq!(Some(&[1u8][..]), store.get(DBCol::State, &[1]).unwrap().as_deref());
assert_eq!(Some(&[2u8][..]), store.get(DBCol::State, &[2]).unwrap().as_deref());
assert_eq!(Some(&[1u8][..]), store.get(DBCol::State, &[1; 8]).unwrap().as_deref());
assert_eq!(Some(&[2u8][..]), store.get(DBCol::State, &[2; 8]).unwrap().as_deref());

// Key &[2] should have refcount of two so once decreased it should
// still exist.
let mut store_update = store.store_update();
store_update.decrement_refcount(DBCol::State, &[1]);
store_update.decrement_refcount(DBCol::State, &[2]);
store_update.decrement_refcount(DBCol::State, &[1; 8]);
store_update.decrement_refcount(DBCol::State, &[2; 8]);
store_update.commit().unwrap();
assert_eq!(None, store.get(DBCol::State, &[1]).unwrap());
assert_eq!(Some(&[2u8][..]), store.get(DBCol::State, &[2]).unwrap().as_deref());
assert_eq!(None, store.get(DBCol::State, &[1; 8]).unwrap());
assert_eq!(Some(&[2u8][..]), store.get(DBCol::State, &[2; 8]).unwrap().as_deref());
}

// Verify detection of corrupt file.
1 change: 1 addition & 0 deletions core/store/src/trie/mem/parallel_loader.rs
@@ -191,6 +191,7 @@ impl ParallelMemTrieLoader {
arena: &mut impl ArenaMut,
) -> Result<MemTrieNodeId, StorageError> {
// Figure out which range corresponds to the prefix of this subtree.
// TODO(reshardingV3) This seems fragile, potentially does not work with mapping.
let (start, end) = subtree_to_load.to_iter_range(self.shard_uid);

// Load all the keys in this range from the FlatState column.
13 changes: 9 additions & 4 deletions integration-tests/src/tests/client/cold_storage.rs
@@ -186,10 +186,11 @@ fn test_storage_after_commit_of_cold_update() {
let cold_store = &storage.get_cold_store().unwrap();
let num_checks = check_iter(client_store, cold_store, col, &no_check_rules);
// assert that this test actually checks something
// apart from StateChangesForSplitStates and StateHeaders, that are empty
// apart from StateChangesForSplitStates, StateHeaders, and ShardUIdMapping, that are empty
assert!(
col == DBCol::StateChangesForSplitStates
|| col == DBCol::StateHeaders
|| col == DBCol::ShardUIdMapping
|| num_checks > 0
);
}
@@ -308,10 +309,11 @@ fn test_cold_db_copy_with_height_skips() {
let cold_store = storage.get_cold_store().unwrap();
let num_checks = check_iter(&client_store, &cold_store, col, &no_check_rules);
// assert that this test actually checks something
// apart from StateChangesForSplitStates and StateHeaders, that are empty
// apart from StateChangesForSplitStates, StateHeaders, and ShardUIdMapping, that are empty
assert!(
col == DBCol::StateChangesForSplitStates
|| col == DBCol::StateHeaders
|| col == DBCol::ShardUIdMapping
|| num_checks > 0
);
}
@@ -361,8 +363,11 @@ fn test_initial_copy_to_cold(batch_size: usize) {
continue;
}
let num_checks = check_iter(&client_store, &cold_store, col, &vec![]);
// StateChangesForSplitStates and StateHeaders are empty
if col == DBCol::StateChangesForSplitStates || col == DBCol::StateHeaders {
// StateChangesForSplitStates, StateHeaders, and ShardUIdMapping are empty
if col == DBCol::StateChangesForSplitStates
|| col == DBCol::StateHeaders
|| col == DBCol::ShardUIdMapping
{
continue;
}
// assert that this test actually checks something