Skip to content

Commit

Permalink
feat(resharding) - Resharding mapping state update (#12232)
Browse files Browse the repository at this point in the history
The [previous PR](#12084)
introduced mapping for read operations.

This PR extends that functionality to write operations and adds some
testing for State mapping.

Following the [Zulip
discussion](https://near.zulipchat.com/#narrow/stream/407288-core.2Fresharding/topic/State.20mapping/near/476959235),
we decided to implement a panic inside the `TrieStoreUpdateAdapter`
methods. Other strategies considered were:
1. Propagating the error instead of panicking: This was rejected because
the error would need to be propagated through multiple layers that
currently don't expect errors. Additionally, an error here would
indicate a misconfiguration in the database, justifying the use of
panic.
2. Performing the mapping later in `TrieStoreUpdateAdapter::commit()`:
This would require iterating through all `DBOp`s, parsing each
operation, extracting the `shard_uid` from the database key, mapping it,
and re-encoding. This approach would make `TrieStoreUpdateAdapter`
dependent on the internal workings of `DBTransaction`. Also,
`StoreUpdate::merge()` makes me feel uneasy.

---------

Co-authored-by: Waclaw Banasik <[email protected]>
  • Loading branch information
staffik and wacban authored Oct 18, 2024
1 parent 3e1d923 commit 31ad13e
Show file tree
Hide file tree
Showing 4 changed files with 184 additions and 46 deletions.
184 changes: 162 additions & 22 deletions core/store/src/adapter/trie_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use std::io;
use std::num::NonZero;
use std::sync::Arc;

use borsh::BorshDeserialize;
use near_primitives::errors::{MissingTrieValueContext, StorageError};
use near_primitives::hash::CryptoHash;
use near_primitives::shard_layout::{get_block_shard_uid, ShardUId};
Expand Down Expand Up @@ -31,24 +32,12 @@ impl TrieStoreAdapter {
TrieStoreUpdateAdapter { store_update: StoreUpdateHolder::Owned(self.store.store_update()) }
}

/// Reads shard_uid mapping for given shard.
/// If the mapping does not exist, it means that `shard_uid` maps to itself.
pub(crate) fn read_shard_uid_mapping_from_db(
&self,
shard_uid: ShardUId,
) -> Result<ShardUId, StorageError> {
let mapped_shard_uid =
self.store.get_ser::<ShardUId>(DBCol::StateShardUIdMapping, &shard_uid.to_bytes());
let mapped_shard_uid = mapped_shard_uid
.map_err(|err| StorageError::StorageInconsistentState(err.to_string()))?;
Ok(mapped_shard_uid.unwrap_or(shard_uid))
}

/// Replaces shard_uid prefix with a mapped value according to mapping strategy in Resharding V3.
/// For this, it does extra read from `DBCol::StateShardUIdMapping`.
///
/// For more details, see `get_key_from_shard_uid_and_hash()` docs.
pub fn get(&self, shard_uid: ShardUId, hash: &CryptoHash) -> Result<Arc<[u8]>, StorageError> {
let mapped_shard_uid = self.read_shard_uid_mapping_from_db(shard_uid)?;
let key = get_key_from_shard_uid_and_hash(mapped_shard_uid, hash);
let key = get_key_from_shard_uid_and_hash(&self.store, shard_uid, hash);
let val = self
.store
.get(DBCol::State, key.as_ref())
Expand All @@ -57,6 +46,15 @@ impl TrieStoreAdapter {
Ok(val.into())
}

pub fn get_ser<T: BorshDeserialize>(
&self,
shard_uid: ShardUId,
hash: &CryptoHash,
) -> Result<T, StorageError> {
let bytes = self.get(shard_uid, hash)?;
T::try_from_slice(&bytes).map_err(|e| StorageError::StorageInconsistentState(e.to_string()))
}

pub fn get_state_snapshot_hash(&self) -> Result<CryptoHash, StorageError> {
let val = self
.store
Expand Down Expand Up @@ -100,18 +98,22 @@ impl<'a> TrieStoreUpdateAdapter<'a> {
Self { store_update: StoreUpdateHolder::Reference(store_update) }
}

fn get_key_from_shard_uid_and_hash(&self, shard_uid: ShardUId, hash: &CryptoHash) -> [u8; 40] {
get_key_from_shard_uid_and_hash(&self.store_update.store, shard_uid, hash)
}

pub fn decrement_refcount_by(
&mut self,
shard_uid: ShardUId,
hash: &CryptoHash,
decrement: NonZero<u32>,
) {
let key = get_key_from_shard_uid_and_hash(shard_uid, hash);
let key = self.get_key_from_shard_uid_and_hash(shard_uid, hash);
self.store_update.decrement_refcount_by(DBCol::State, key.as_ref(), decrement);
}

pub fn decrement_refcount(&mut self, shard_uid: ShardUId, hash: &CryptoHash) {
let key = get_key_from_shard_uid_and_hash(shard_uid, hash);
let key = self.get_key_from_shard_uid_and_hash(shard_uid, hash);
self.store_update.decrement_refcount(DBCol::State, key.as_ref());
}

Expand All @@ -120,10 +122,10 @@ impl<'a> TrieStoreUpdateAdapter<'a> {
shard_uid: ShardUId,
hash: &CryptoHash,
data: &[u8],
decrement: NonZero<u32>,
increment: NonZero<u32>,
) {
let key = get_key_from_shard_uid_and_hash(shard_uid, hash);
self.store_update.increment_refcount_by(DBCol::State, key.as_ref(), data, decrement);
let key = self.get_key_from_shard_uid_and_hash(shard_uid, hash);
self.store_update.increment_refcount_by(DBCol::State, key.as_ref(), data, increment);
}

pub fn set_state_snapshot_hash(&mut self, hash: Option<CryptoHash>) {
Expand Down Expand Up @@ -156,14 +158,152 @@ impl<'a> TrieStoreUpdateAdapter<'a> {
)
}

/// Set the mapping from `child_shard_uid` to `parent_shard_uid`.
/// Used by Resharding V3 for State mapping.
#[cfg(test)]
fn set_shard_uid_mapping(&mut self, child_shard_uid: ShardUId, parent_shard_uid: ShardUId) {
self.store_update.set(
DBCol::StateShardUIdMapping,
child_shard_uid.to_bytes().as_ref(),
&borsh::to_vec(&parent_shard_uid).expect("Borsh serialize cannot fail"),
)
}

pub fn delete_all_state(&mut self) {
self.store_update.delete_all(DBCol::State)
}
}

pub fn get_key_from_shard_uid_and_hash(shard_uid: ShardUId, hash: &CryptoHash) -> [u8; 40] {
/// Constructs db key to be used to access the State column.
/// First, it consults the `StateShardUIdMapping` column to map the `shard_uid` prefix
/// to its ancestor in the resharding tree (according to Resharding V3)
/// or map to itself if the mapping does not exist.
///
/// Please note that the mapped shard uid is read from db each time which may seem slow.
/// In practice the `StateShardUIdMapping` is very small and should always be stored in the RocksDB cache.
/// The deserialization of ShardUId is also very cheap.
fn get_key_from_shard_uid_and_hash(
store: &Store,
shard_uid: ShardUId,
hash: &CryptoHash,
) -> [u8; 40] {
let mapped_shard_uid = store
.get_ser::<ShardUId>(DBCol::StateShardUIdMapping, &shard_uid.to_bytes())
.expect("get_key_from_shard_uid_and_hash() failed")
.unwrap_or(shard_uid);
let mut key = [0; 40];
key[0..8].copy_from_slice(&shard_uid.to_bytes());
key[0..8].copy_from_slice(&mapped_shard_uid.to_bytes());
key[8..].copy_from_slice(hash.as_ref());
key
}

#[cfg(test)]
mod tests {
use assert_matches::assert_matches;
use near_primitives::errors::StorageError;
use near_primitives::hash::CryptoHash;
use near_primitives::shard_layout::ShardUId;

use crate::adapter::trie_store::TrieStoreAdapter;
use crate::NodeStorage;

const ONE: std::num::NonZeroU32 = match std::num::NonZeroU32::new(1) {
Some(num) => num,
None => panic!(),
};

#[test]
fn test_trie_store_adapter() {
let (_tmp_dir, opener) = NodeStorage::test_opener();
let store = TrieStoreAdapter::new(opener.open().unwrap().get_hot_store());
let shard_uids: Vec<ShardUId> =
(0..3).map(|i| ShardUId { version: 0, shard_id: i }).collect();
let dummy_hash = CryptoHash::default();

assert_matches!(
store.get(shard_uids[0], &dummy_hash),
Err(StorageError::MissingTrieValue(_, _))
);
{
let mut store_update = store.store_update();
store_update.increment_refcount_by(shard_uids[0], &dummy_hash, &[0], ONE);
store_update.increment_refcount_by(shard_uids[1], &dummy_hash, &[1], ONE);
store_update.increment_refcount_by(shard_uids[2], &dummy_hash, &[2], ONE);
store_update.commit().unwrap();
}
assert_eq!(*store.get(shard_uids[0], &dummy_hash).unwrap(), [0]);
{
let mut store_update = store.store_update();
store_update.delete_all_state();
store_update.commit().unwrap();
}
assert_matches!(
store.get(shard_uids[0], &dummy_hash),
Err(StorageError::MissingTrieValue(_, _))
);
}

#[test]
fn test_shard_uid_mapping() {
let (_tmp_dir, opener) = NodeStorage::test_opener();
let store = TrieStoreAdapter::new(opener.open().unwrap().get_hot_store());
let parent_shard = ShardUId { version: 0, shard_id: 0 };
let child_shard = ShardUId { version: 0, shard_id: 1 };
let dummy_hash = CryptoHash::default();
// Write some data to `parent_shard`.
{
let mut store_update = store.store_update();
store_update.increment_refcount_by(parent_shard, &dummy_hash, &[0], ONE);
store_update.commit().unwrap();
}
// The data is not yet visible to child shard, because the mapping has not been set yet.
assert_matches!(
store.get(child_shard, &dummy_hash),
Err(StorageError::MissingTrieValue(_, _))
);
// Set the shard_uid mapping from `child_shard` to `parent_shard`.
{
let mut store_update = store.store_update();
store_update.set_shard_uid_mapping(child_shard, parent_shard);
store_update.commit().unwrap();
}
// The data is now visible to both `parent_shard` and `child_shard`.
assert_eq!(*store.get(child_shard, &dummy_hash).unwrap(), [0]);
assert_eq!(*store.get(parent_shard, &dummy_hash).unwrap(), [0]);
// Remove the data using `parent_shard` UId.
{
let mut store_update = store.store_update();
store_update.decrement_refcount(parent_shard, &dummy_hash);
store_update.commit().unwrap();
}
// The data is now not visible to any shard.
assert_matches!(
store.get(child_shard, &dummy_hash),
Err(StorageError::MissingTrieValue(_, _))
);
assert_matches!(
store.get(parent_shard, &dummy_hash),
Err(StorageError::MissingTrieValue(_, _))
);
// Restore the data now using the `child_shard` UId.
{
let mut store_update = store.store_update();
store_update.increment_refcount_by(child_shard, &dummy_hash, &[0], ONE);
store_update.commit().unwrap();
}
// The data is now visible to both shards again.
assert_eq!(*store.get(child_shard, &dummy_hash).unwrap(), [0]);
assert_eq!(*store.get(parent_shard, &dummy_hash).unwrap(), [0]);
// Remove the data using `child_shard` UId.
{
let mut store_update = store.store_update();
store_update.decrement_refcount_by(child_shard, &dummy_hash, ONE);
store_update.commit().unwrap();
}
// The data is not visible to any shard again.
assert_matches!(
store.get(child_shard, &dummy_hash),
Err(StorageError::MissingTrieValue(_, _))
);
}
}
1 change: 1 addition & 0 deletions core/store/src/cold_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -326,6 +326,7 @@ pub fn copy_all_data_to_cold(
tracing::debug!(target: "cold_store", "stopping copy_all_data_to_cold");
return Ok(CopyAllDataToColdStatus::Interrupted);
}
// TODO(reshardingV3) Should do mapping here?
let (key, value) = result?;
transaction.set_and_write_if_full(col, key.to_vec(), value.to_vec())?;
}
Expand Down
12 changes: 7 additions & 5 deletions core/store/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -327,7 +327,7 @@ impl Store {
}

pub fn store_update(&self) -> StoreUpdate {
StoreUpdate { transaction: DBTransaction::new(), storage: Arc::clone(&self.storage) }
StoreUpdate { transaction: DBTransaction::new(), store: self.clone() }
}

pub fn iter<'a>(&'a self, col: DBCol) -> DBIterator<'a> {
Expand Down Expand Up @@ -459,7 +459,7 @@ impl Store {
/// Keeps track of current changes to the database and can commit all of them to the database.
pub struct StoreUpdate {
transaction: DBTransaction,
storage: Arc<dyn Database>,
store: Store,
}

impl StoreUpdateAdapter for StoreUpdate {
Expand Down Expand Up @@ -618,7 +618,10 @@ impl StoreUpdate {
///
/// Panics if `self`’s and `other`’s storage are incompatible.
pub fn merge(&mut self, other: StoreUpdate) {
assert!(core::ptr::addr_eq(Arc::as_ptr(&self.storage), Arc::as_ptr(&other.storage)));
assert!(core::ptr::addr_eq(
Arc::as_ptr(&self.store.storage),
Arc::as_ptr(&other.store.storage)
));
self.transaction.merge(other.transaction)
}

Expand Down Expand Up @@ -735,8 +738,7 @@ impl StoreUpdate {
}
}
}
// TODO(reshardingV3) Map shard_uid for ops referencing State column.
self.storage.write(self.transaction)
self.store.storage.write(self.transaction)
}
}

Expand Down
33 changes: 14 additions & 19 deletions nearcore/src/entity_debug.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ use near_primitives::views::{
BlockHeaderView, BlockView, ChunkView, ExecutionOutcomeView, ReceiptView, SignedTransactionView,
};
use near_store::adapter::flat_store::encode_flat_state_db_key;
use near_store::adapter::trie_store::get_key_from_shard_uid_and_hash;
use near_store::adapter::StoreAdapter;
use near_store::db::GENESIS_CONGESTION_INFO_KEY;
use near_store::flat::delta::KeyForFlatStateDelta;
use near_store::flat::{FlatStateChanges, FlatStateDeltaMetadata, FlatStorageStatus};
Expand Down Expand Up @@ -249,11 +249,9 @@ impl EntityDebugHandlerImpl {
}
EntityQuery::RawTrieNodeByHash { trie_node_hash, shard_uid } => {
let node = store
.get_ser::<RawTrieNodeWithSize>(
DBCol::State,
&get_key_from_shard_uid_and_hash(shard_uid, &trie_node_hash),
)?
.ok_or_else(|| anyhow!("Trie node not found"))?;
.trie_store()
.get_ser::<RawTrieNodeWithSize>(shard_uid, &trie_node_hash)
.map_err(|e| anyhow!("Trie node not found: {e}"))?;
Ok(serialize_raw_trie_node(node))
}
EntityQuery::RawTrieRootByChunkHash { chunk_hash } => {
Expand All @@ -270,21 +268,17 @@ impl EntityDebugHandlerImpl {
.nth(shard_index)
.ok_or_else(|| anyhow!("Shard {} not found", chunk.shard_id()))?;
let node = store
.get_ser::<RawTrieNodeWithSize>(
DBCol::State,
&get_key_from_shard_uid_and_hash(shard_uid, &chunk.prev_state_root()),
)?
.ok_or_else(|| anyhow!("State root not found"))?;
.trie_store()
.get_ser::<RawTrieNodeWithSize>(shard_uid, &chunk.prev_state_root())
.map_err(|e| anyhow!("State root not found: {e}"))?;
Ok(serialize_raw_trie_node(node))
}
EntityQuery::RawTrieValueByHash { trie_value_hash, shard_uid } => {
let value = store
.get(
DBCol::State,
&get_key_from_shard_uid_and_hash(shard_uid, &trie_value_hash),
)?
.ok_or_else(|| anyhow!("Trie value not found"))?;
Ok(serialize_entity(&hex::encode(value.as_slice())))
.trie_store()
.get(shard_uid, &trie_value_hash)
.map_err(|e| anyhow!("Trie value not found: {e}"))?;
Ok(serialize_entity(&hex::encode(value)))
}
EntityQuery::ReceiptById { receipt_id } => {
let receipt = store
Expand Down Expand Up @@ -461,8 +455,9 @@ impl EntityDebugHandlerImpl {
) -> anyhow::Result<Vec<u8>> {
Ok(match state {
FlatStateValue::Ref(value) => store
.get(DBCol::State, &get_key_from_shard_uid_and_hash(shard_uid, &value.hash))?
.ok_or_else(|| anyhow!("ValueRef could not be dereferenced"))?
.trie_store()
.get(shard_uid, &value.hash)
.map_err(|e| anyhow!("ValueRef could not be dereferenced: {e}"))?
.to_vec(),
FlatStateValue::Inlined(data) => data,
})
Expand Down

0 comments on commit 31ad13e

Please sign in to comment.