
[reshardingV3] State ShardUIdMapping - initial implementation #12084

Merged (13 commits, Oct 11, 2024)
chain/chain/src/garbage_collection.rs (1 addition, 0 deletions)

@@ -997,6 +997,7 @@ impl<'a> ChainStoreUpdate<'a> {
| DBCol::EpochSyncProof
| DBCol::Misc
| DBCol::_ReceiptIdToShardId
| DBCol::ShardUIdMapping
=> unreachable!(),
}
self.merge(store_update);
core/store/src/cold_storage.rs (1 addition, 0 deletions)

@@ -186,6 +186,7 @@ fn copy_state_from_store(

let Some(trie_changes) = trie_changes else { continue };
for op in trie_changes.insertions() {
// TODO(reshardingV3) Handle shard_uid not mapped there
Contributor:
This TODO will be tricky: we will need to be careful when adding and removing a mapping. It's good that you spotted it.

Unrelated to this PR: I wonder if we can keep the original shard uids when moving data to cold storage. That would keep the cold storage sharded (as it is today).

let key = join_two_keys(&shard_uid_key, op.hash().as_bytes());
let value = op.payload().to_vec();

core/store/src/columns.rs (9 additions, 1 deletion)

@@ -300,6 +300,12 @@ pub enum DBCol {
/// - *Rows*: only one key with 0 bytes.
/// - *Column type*: `EpochSyncProof`
EpochSyncProof,
    /// Mapping from a ShardUId to the underlying ShardUId database key prefix for the State column.
    /// The mapped value can be the parent shard after a resharding, an ancestor shard after multiple
    /// reshardings, or the shard itself if there was no resharding or the mapping was updated during state sync.
/// - *Rows*: `ShardUId`
/// - *Column type*: `ShardUId`
ShardUIdMapping,
}
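As a rough illustration of the mapping strategy described in the new doc comment, a lookup that falls back to the identity mapping could look like the sketch below. This is a standalone model, not the actual implementation: the `ShardUId` struct here is a simplified stand-in for the real type in `near_primitives`, and `resolve_db_shard_uid` is a hypothetical helper.

```rust
use std::collections::HashMap;

// Simplified stand-in for the real ShardUId in near_primitives (assumption).
#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)]
struct ShardUId {
    version: u32,
    shard_id: u32,
}

/// Resolve the database key prefix for a shard: if ShardUIdMapping has an
/// entry, use the mapped (parent/ancestor) shard; otherwise the shard maps
/// to itself.
fn resolve_db_shard_uid(mapping: &HashMap<ShardUId, ShardUId>, shard_uid: ShardUId) -> ShardUId {
    mapping.get(&shard_uid).copied().unwrap_or(shard_uid)
}
```

The identity fallback is what lets shards that never went through a resharding skip having an explicit row in the column.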

/// Defines different logical parts of a db key.
@@ -444,7 +450,8 @@ impl DBCol {
| DBCol::StateChangesForSplitStates
| DBCol::StateHeaders
| DBCol::TransactionResultForBlock
| DBCol::Transactions => true,
| DBCol::Transactions
| DBCol::ShardUIdMapping => true,
Contributor:
Can you explain how the mapping will work on split-storage nodes? I can't say whether this is good or not without the full picture. For the early MVP it doesn't matter too much, so feel free to just leave a TODO here and proceed.

Contributor Author (@staffik, Oct 11, 2024):
My understanding was that we want to copy ShardUIdMapping to cold storage because cold_store.get(state_key) would not work otherwise. Marked it as a TODO to understand it more deeply after the early MVP.


// TODO
DBCol::ChallengedBlocks => false,
@@ -575,6 +582,7 @@ impl DBCol {
DBCol::LatestChunkStateWitnesses => &[DBKeyType::LatestWitnessesKey],
DBCol::LatestWitnessesByIndex => &[DBKeyType::LatestWitnessIndex],
DBCol::EpochSyncProof => &[DBKeyType::Empty],
DBCol::ShardUIdMapping => &[DBKeyType::ShardUId],
}
}
}
core/store/src/db/rocksdb.rs (15 additions, 15 deletions)

@@ -811,41 +811,41 @@ mod tests {
let store = opener.open().unwrap().get_hot_store();
let ptr = (&*store.storage) as *const (dyn Database + 'static);
let rocksdb = unsafe { &*(ptr as *const RocksDB) };
assert_eq!(store.get(DBCol::State, &[1]).unwrap(), None);
assert_eq!(store.get(DBCol::State, &[1; 8]).unwrap(), None);
{
let mut store_update = store.store_update();
store_update.increment_refcount(DBCol::State, &[1], &[1]);
store_update.increment_refcount(DBCol::State, &[1; 8], &[1]);
store_update.commit().unwrap();
}
{
let mut store_update = store.store_update();
store_update.increment_refcount(DBCol::State, &[1], &[1]);
store_update.increment_refcount(DBCol::State, &[1; 8], &[1]);
store_update.commit().unwrap();
}
assert_eq!(store.get(DBCol::State, &[1]).unwrap().as_deref(), Some(&[1][..]));
assert_eq!(store.get(DBCol::State, &[1; 8]).unwrap().as_deref(), Some(&[1][..]));
assert_eq!(
rocksdb.get_raw_bytes(DBCol::State, &[1]).unwrap().as_deref(),
rocksdb.get_raw_bytes(DBCol::State, &[1; 8]).unwrap().as_deref(),
Some(&[1, 2, 0, 0, 0, 0, 0, 0, 0][..])
);
{
let mut store_update = store.store_update();
store_update.decrement_refcount(DBCol::State, &[1]);
store_update.decrement_refcount(DBCol::State, &[1; 8]);
store_update.commit().unwrap();
}
assert_eq!(store.get(DBCol::State, &[1]).unwrap().as_deref(), Some(&[1][..]));
assert_eq!(store.get(DBCol::State, &[1; 8]).unwrap().as_deref(), Some(&[1][..]));
assert_eq!(
rocksdb.get_raw_bytes(DBCol::State, &[1]).unwrap().as_deref(),
rocksdb.get_raw_bytes(DBCol::State, &[1; 8]).unwrap().as_deref(),
Some(&[1, 1, 0, 0, 0, 0, 0, 0, 0][..])
);
{
let mut store_update = store.store_update();
store_update.decrement_refcount(DBCol::State, &[1]);
store_update.decrement_refcount(DBCol::State, &[1; 8]);
store_update.commit().unwrap();
}
// Refcount goes to 0 -> get() returns None
assert_eq!(store.get(DBCol::State, &[1]).unwrap(), None);
assert_eq!(store.get(DBCol::State, &[1; 8]).unwrap(), None);
// Internally there is an empty value
assert_eq!(rocksdb.get_raw_bytes(DBCol::State, &[1]).unwrap().as_deref(), Some(&[][..]));
assert_eq!(rocksdb.get_raw_bytes(DBCol::State, &[1; 8]).unwrap().as_deref(), Some(&[][..]));

// single_thread_rocksdb makes compact hang forever
if !cfg!(feature = "single_thread_rocksdb") {
@@ -858,14 +858,14 @@ mod tests {
// empty values.
rocksdb.db.compact_range_cf(cf, none, none);
assert_eq!(
rocksdb.get_raw_bytes(DBCol::State, &[1]).unwrap().as_deref(),
rocksdb.get_raw_bytes(DBCol::State, &[1; 8]).unwrap().as_deref(),
Some(&[][..])
);
assert_eq!(store.get(DBCol::State, &[1]).unwrap(), None);
assert_eq!(store.get(DBCol::State, &[1; 8]).unwrap(), None);

rocksdb.db.compact_range_cf(cf, none, none);
assert_eq!(rocksdb.get_raw_bytes(DBCol::State, &[1]).unwrap(), None);
assert_eq!(store.get(DBCol::State, &[1]).unwrap(), None);
assert_eq!(rocksdb.get_raw_bytes(DBCol::State, &[1; 8]).unwrap(), None);
assert_eq!(store.get(DBCol::State, &[1; 8]).unwrap(), None);
}
}
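The raw-byte assertions in the test above follow from the refcounted value encoding: the payload bytes are followed by a little-endian `i64` refcount, and a refcount of zero is stored as an empty value until compaction removes it. A minimal sketch of that encoding (hypothetical helper functions, not the actual refcount module):

```rust
// Encode a refcounted value: payload bytes + little-endian i64 refcount.
// A refcount of zero is represented by an empty value (tombstone until
// compaction drops the key entirely).
fn encode_rc(payload: &[u8], rc: i64) -> Vec<u8> {
    if rc == 0 {
        return Vec::new();
    }
    let mut out = payload.to_vec();
    out.extend_from_slice(&rc.to_le_bytes());
    out
}

// Strip the 8-byte refcount suffix, treating an empty value as "deleted".
// (Sketch only: assumes non-empty input is at least 8 bytes long.)
fn strip_rc(raw: &[u8]) -> Option<&[u8]> {
    if raw.is_empty() {
        return None;
    }
    Some(&raw[..raw.len() - 8])
}
```

This is why `get_raw_bytes` in the test sees `[1, 2, 0, 0, 0, 0, 0, 0, 0]` after two increments of value `[1]`, while `get` (which strips the refcount) returns just `[1]`.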

core/store/src/lib.rs (75 additions, 30 deletions)

@@ -291,28 +291,70 @@ impl NodeStorage {
}
}

pub fn retrieve_shard_uid_from_db_key(key: &[u8]) -> io::Result<ShardUId> {
// TODO(reshardingV3) Consider changing the Error type to `StorageError`?
// Would need changing error types for `Store` methods as well.
ShardUId::try_from(&key[..8])
.map_err(|e| io::Error::other(format!("Could not retrieve ShardUId from db key: {}", e)))
}

pub fn replace_shard_uid_key_prefix(key: &[u8], shard_uid: ShardUId) -> [u8; 40] {
let mut mapped_key = [0u8; 40];
mapped_key[..8].copy_from_slice(&shard_uid.to_bytes());
mapped_key[8..].copy_from_slice(&key[8..]);
mapped_key
}
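The 40-byte State key layout that `replace_shard_uid_key_prefix` relies on is an 8-byte `ShardUId` prefix followed by a 32-byte trie node hash. A standalone sketch of the same operation (`replace_prefix` is a hypothetical copy of the diff's helper, with the prefix passed as raw bytes):

```rust
// Replace the 8-byte ShardUId prefix of a 40-byte State key, keeping the
// 32-byte node hash suffix intact.
fn replace_prefix(key: &[u8], new_prefix: [u8; 8]) -> [u8; 40] {
    assert_eq!(key.len(), 40, "State keys are an 8-byte shard prefix + 32-byte hash");
    let mut mapped = [0u8; 40];
    mapped[..8].copy_from_slice(&new_prefix);
    mapped[8..].copy_from_slice(&key[8..]);
    mapped
}
```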

impl Store {
pub fn new(storage: Arc<dyn Database>) -> Self {
Self { storage }
}

/// Underlying `get()` implementation for all columns.
fn get_impl(&self, column: DBCol, key: &[u8]) -> io::Result<Option<DBSlice<'_>>> {
let value = if column.is_rc() {
self.storage.get_with_rc_stripped(column, &key)
} else {
self.storage.get_raw_bytes(column, &key)
}?;
Ok(value)
}

/// Specialized `get` implementation for State column that replaces shard_uid prefix
/// with a mapped value according to mapping strategy in Resharding V3.
///
/// It does extra read from `DBCol::ShardUIdMapping` to map the shard_uid key prefix.
fn get_impl_state(&self, key: &[u8]) -> io::Result<Option<DBSlice<'_>>> {
let shard_uid = retrieve_shard_uid_from_db_key(key)?;
let mapped_shard_uid = self
.get_ser::<ShardUId>(DBCol::ShardUIdMapping, &shard_uid.to_bytes())?
Contributor:
Could we do the mapping in `core/store/src/adapter/trie_store.rs`?

Contributor Author:
Having TrieStoreAdapter is now so nice :)

.unwrap_or(shard_uid);
let mapped_key = if shard_uid != mapped_shard_uid {
&replace_shard_uid_key_prefix(key, mapped_shard_uid)
} else {
key
};
let value = self.get_impl(DBCol::State, &mapped_key)?;
Ok(value)
}

/// Fetches value from given column.
///
/// If the key does not exist in the column returns `None`. Otherwise
/// returns the data as [`DBSlice`] object. The object dereferences into
/// a slice, for cases when caller doesn’t need to own the value, and
/// provides conversion into a vector or an Arc.
pub fn get(&self, column: DBCol, key: &[u8]) -> io::Result<Option<DBSlice<'_>>> {
let value = if column.is_rc() {
self.storage.get_with_rc_stripped(column, key)
let value = if column == DBCol::State {
self.get_impl_state(key)?
} else {
self.storage.get_raw_bytes(column, key)
}?;
self.get_impl(column, key)?
};
tracing::trace!(
target: "store",
db_op = "get",
col = %column,
key = %StorageKey(key),
key = %StorageKey(&key),
size = value.as_deref().map(<[u8]>::len)
);
Ok(value)
@@ -327,7 +369,7 @@ impl Store {
}

pub fn store_update(&self) -> StoreUpdate {
StoreUpdate::new(Arc::clone(&self.storage))
StoreUpdate { transaction: DBTransaction::new(), storage: Arc::clone(&self.storage) }
}

pub fn iter<'a>(&'a self, col: DBCol) -> DBIterator<'a> {
@@ -345,6 +387,7 @@ impl Store {
}

pub fn iter_prefix<'a>(&'a self, col: DBCol, key_prefix: &'a [u8]) -> DBIterator<'a> {
assert!(col != DBCol::State, "can't iter prefix of State column");
Contributor Author: Luckily, that is not currently used for the State column.

self.storage.iter_prefix(col, key_prefix)
}

@@ -355,6 +398,8 @@ impl Store {
lower_bound: Option<&[u8]>,
upper_bound: Option<&[u8]>,
) -> DBIterator<'a> {
        // This would fail if `ScanDbColumnCmd` were called for the `State` column.
assert!(col != DBCol::State, "can't range iter State column");
self.storage.iter_range(col, lower_bound, upper_bound)
}

@@ -363,6 +408,7 @@ impl Store {
col: DBCol,
key_prefix: &'a [u8],
) -> impl Iterator<Item = io::Result<(Box<[u8]>, T)>> + 'a {
assert!(col != DBCol::State, "can't iter prefix ser of State column");
Contributor Author: Luckily, that is not currently used for the State column.

self.storage
.iter_prefix(col, key_prefix)
.map(|item| item.and_then(|(key, value)| Ok((key, T::try_from_slice(value.as_ref())?))))
@@ -470,10 +516,6 @@ impl StoreUpdate {
None => panic!(),
};

pub(crate) fn new(db: Arc<dyn Database>) -> Self {
StoreUpdate { transaction: DBTransaction::new(), storage: db }
}

/// Inserts a new value into the database.
///
/// It is a programming error if `insert` overwrites an existing, different
@@ -598,6 +640,7 @@ impl StoreUpdate {
/// Must not be used for reference-counted columns; use
/// ['Self::increment_refcount'] or [`Self::decrement_refcount`] instead.
pub fn delete(&mut self, column: DBCol, key: &[u8]) {
        // This would panic if called for the `State` column, as it is refcounted.
assert!(!column.is_rc(), "can't delete: {column}");
self.transaction.delete(column, key.to_vec());
}
Expand All @@ -609,6 +652,7 @@ impl StoreUpdate {
/// Deletes the given key range from the database including `from`
/// and excluding `to` keys.
pub fn delete_range(&mut self, column: DBCol, from: &[u8], to: &[u8]) {
assert!(column != DBCol::State, "can't range delete State column");
Contributor Author: Luckily, that is not currently used for the State column.

self.transaction.delete_range(column, from.to_vec(), to.to_vec());
}

@@ -733,6 +777,7 @@ impl StoreUpdate {
}
}
}
// TODO(reshardingV3) Map shard_uid for ops referencing State column.
self.storage.write(self.transaction)
}
}
@@ -1171,21 +1216,21 @@ mod tests {
}

fn test_clear_column(store: Store) {
assert_eq!(store.get(DBCol::State, &[1]).unwrap(), None);
assert_eq!(store.get(DBCol::State, &[1; 8]).unwrap(), None);
{
let mut store_update = store.store_update();
store_update.increment_refcount(DBCol::State, &[1], &[1]);
store_update.increment_refcount(DBCol::State, &[2], &[2]);
store_update.increment_refcount(DBCol::State, &[3], &[3]);
store_update.increment_refcount(DBCol::State, &[1; 8], &[1]);
store_update.increment_refcount(DBCol::State, &[2; 8], &[2]);
store_update.increment_refcount(DBCol::State, &[3; 8], &[3]);
store_update.commit().unwrap();
}
assert_eq!(store.get(DBCol::State, &[1]).unwrap().as_deref(), Some(&[1][..]));
assert_eq!(store.get(DBCol::State, &[1; 8]).unwrap().as_deref(), Some(&[1][..]));
{
let mut store_update = store.store_update();
store_update.delete_all(DBCol::State);
store_update.commit().unwrap();
}
assert_eq!(store.get(DBCol::State, &[1]).unwrap(), None);
assert_eq!(store.get(DBCol::State, &[1; 8]).unwrap(), None);
}

#[test]
@@ -1301,9 +1346,9 @@ mod tests {
{
let store = crate::test_utils::create_test_store();
let mut store_update = store.store_update();
store_update.increment_refcount(DBCol::State, &[1], &[1]);
store_update.increment_refcount(DBCol::State, &[2], &[2]);
store_update.increment_refcount(DBCol::State, &[2], &[2]);
store_update.increment_refcount(DBCol::State, &[1; 8], &[1]);
store_update.increment_refcount(DBCol::State, &[2; 8], &[2]);
store_update.increment_refcount(DBCol::State, &[2; 8], &[2]);
store_update.commit().unwrap();
store.save_state_to_file(tmp.path()).unwrap();
}
@@ -1314,9 +1359,9 @@ mod tests {
std::io::Read::read_to_end(tmp.as_file_mut(), &mut buffer).unwrap();
#[rustfmt::skip]
assert_eq!(&[
/* column: */ 0, /* key len: */ 1, 0, 0, 0, /* key: */ 1,
/* column: */ 0, /* key len: */ 8, 0, 0, 0, /* key: */ 1, 1, 1, 1, 1, 1, 1, 1,
/* val len: */ 9, 0, 0, 0, /* val: */ 1, 1, 0, 0, 0, 0, 0, 0, 0,
/* column: */ 0, /* key len: */ 1, 0, 0, 0, /* key: */ 2,
/* column: */ 0, /* key len: */ 8, 0, 0, 0, /* key: */ 2, 2, 2, 2, 2, 2, 2, 2,
/* val len: */ 9, 0, 0, 0, /* val: */ 2, 2, 0, 0, 0, 0, 0, 0, 0,
/* end mark: */ 255,
][..], buffer.as_slice());
@@ -1325,22 +1370,22 @@ mod tests {
{
// Fresh storage, should have no data.
let store = crate::test_utils::create_test_store();
assert_eq!(None, store.get(DBCol::State, &[1]).unwrap());
assert_eq!(None, store.get(DBCol::State, &[2]).unwrap());
assert_eq!(None, store.get(DBCol::State, &[1; 8]).unwrap());
assert_eq!(None, store.get(DBCol::State, &[2; 8]).unwrap());

// Read data from file.
store.load_state_from_file(tmp.path()).unwrap();
assert_eq!(Some(&[1u8][..]), store.get(DBCol::State, &[1]).unwrap().as_deref());
assert_eq!(Some(&[2u8][..]), store.get(DBCol::State, &[2]).unwrap().as_deref());
assert_eq!(Some(&[1u8][..]), store.get(DBCol::State, &[1; 8]).unwrap().as_deref());
assert_eq!(Some(&[2u8][..]), store.get(DBCol::State, &[2; 8]).unwrap().as_deref());

            // Key &[2; 8] should have a refcount of two, so after one decrement
            // it should still exist.
let mut store_update = store.store_update();
store_update.decrement_refcount(DBCol::State, &[1]);
store_update.decrement_refcount(DBCol::State, &[2]);
store_update.decrement_refcount(DBCol::State, &[1; 8]);
store_update.decrement_refcount(DBCol::State, &[2; 8]);
store_update.commit().unwrap();
assert_eq!(None, store.get(DBCol::State, &[1]).unwrap());
assert_eq!(Some(&[2u8][..]), store.get(DBCol::State, &[2]).unwrap().as_deref());
assert_eq!(None, store.get(DBCol::State, &[1; 8]).unwrap());
assert_eq!(Some(&[2u8][..]), store.get(DBCol::State, &[2; 8]).unwrap().as_deref());
}

// Verify detection of corrupt file.
core/store/src/trie/mem/parallel_loader.rs (1 addition, 0 deletions)

@@ -191,6 +191,7 @@ impl ParallelMemTrieLoader {
arena: &mut impl ArenaMut,
) -> Result<MemTrieNodeId, StorageError> {
// Figure out which range corresponds to the prefix of this subtree.
// TODO(reshardingV3) This seems fragile, potentially does not work with mapping.
let (start, end) = subtree_to_load.to_iter_range(self.shard_uid);

// Load all the keys in this range from the FlatState column.
integration-tests/src/tests/client/cold_storage.rs (9 additions, 4 deletions)

@@ -186,10 +186,11 @@ fn test_storage_after_commit_of_cold_update() {
let cold_store = &storage.get_cold_store().unwrap();
let num_checks = check_iter(client_store, cold_store, col, &no_check_rules);
// assert that this test actually checks something
// apart from StateChangesForSplitStates and StateHeaders, that are empty
// apart from StateChangesForSplitStates, StateHeaders, and ShardUIdMapping, that are empty
assert!(
col == DBCol::StateChangesForSplitStates
|| col == DBCol::StateHeaders
|| col == DBCol::ShardUIdMapping
|| num_checks > 0
);
}
@@ -308,10 +309,11 @@ fn test_cold_db_copy_with_height_skips() {
let cold_store = storage.get_cold_store().unwrap();
let num_checks = check_iter(&client_store, &cold_store, col, &no_check_rules);
// assert that this test actually checks something
// apart from StateChangesForSplitStates and StateHeaders, that are empty
// apart from StateChangesForSplitStates, StateHeaders, and ShardUIdMapping, that are empty
assert!(
col == DBCol::StateChangesForSplitStates
|| col == DBCol::StateHeaders
|| col == DBCol::ShardUIdMapping
|| num_checks > 0
);
}
@@ -361,8 +363,11 @@ fn test_initial_copy_to_cold(batch_size: usize) {
continue;
}
let num_checks = check_iter(&client_store, &cold_store, col, &vec![]);
// StateChangesForSplitStates and StateHeaders are empty
if col == DBCol::StateChangesForSplitStates || col == DBCol::StateHeaders {
// StateChangesForSplitStates, StateHeaders, and ShardUIdMapping are empty
if col == DBCol::StateChangesForSplitStates
|| col == DBCol::StateHeaders
|| col == DBCol::ShardUIdMapping
{
continue;
}
// assert that this test actually checks something