Skip to content

Commit

Permalink
state mapping
Browse files Browse the repository at this point in the history
  • Loading branch information
staffik committed Sep 24, 2024
1 parent d6d6e47 commit be91b49
Show file tree
Hide file tree
Showing 10 changed files with 160 additions and 72 deletions.
1 change: 1 addition & 0 deletions chain/chain/src/garbage_collection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -994,6 +994,7 @@ impl<'a> ChainStoreUpdate<'a> {
| DBCol::EpochSyncProof
| DBCol::Misc
| DBCol::_ReceiptIdToShardId
| DBCol::ShardUIdMapping
=> unreachable!(),
}
self.merge(store_update);
Expand Down
1 change: 1 addition & 0 deletions core/store/src/cold_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,7 @@ fn copy_state_from_store(

let Some(trie_changes) = trie_changes else { continue };
for op in trie_changes.insertions() {
// TODO(reshardingV3) Handle shard_uid not mapped there
let key = join_two_keys(&shard_uid_key, op.hash().as_bytes());
let value = op.payload().to_vec();

Expand Down
10 changes: 9 additions & 1 deletion core/store/src/columns.rs
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,12 @@ pub enum DBCol {
/// - *Rows*: only one key with 0 bytes.
/// - *Column type*: `EpochSyncProof`
EpochSyncProof,
/// Mapping of ShardUId to the underlying ShardUId database key prefix for the State column.
/// It could be parent shard after resharding, ancestor shard in case of many resharding,
/// or just map shard to itself if there was no resharding or we updated the mapping during state sync.
/// - *Rows*: `ShardUId`
/// - *Column type*: `ShardUId`
ShardUIdMapping,
}

/// Defines different logical parts of a db key.
Expand Down Expand Up @@ -502,7 +508,8 @@ impl DBCol {
| DBCol::FlatStateChanges
| DBCol::FlatStateDeltaMetadata
| DBCol::FlatStorageStatus
| DBCol::EpochSyncProof => false,
| DBCol::EpochSyncProof
| DBCol::ShardUIdMapping => false,
}
}

Expand Down Expand Up @@ -575,6 +582,7 @@ impl DBCol {
DBCol::LatestChunkStateWitnesses => &[DBKeyType::LatestWitnessesKey],
DBCol::LatestWitnessesByIndex => &[DBKeyType::LatestWitnessIndex],
DBCol::EpochSyncProof => &[DBKeyType::Empty],
DBCol::ShardUIdMapping => &[DBKeyType::ShardUId],
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions core/store/src/flat/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -389,7 +389,7 @@ impl FlatStorage {
let blocks = guard.get_blocks_to_head(&new_head)?;

for block_hash in blocks.into_iter().rev() {
let mut store_update = StoreUpdate::new(guard.store.storage.clone());
let mut store_update = guard.store.store_update();
// Delta must exist because flat storage is locked and we could retrieve
// path from old to new head. Otherwise we return internal error.
let changes = store_helper::get_delta_changes(&guard.store, shard_uid, block_hash)?
Expand Down Expand Up @@ -460,7 +460,7 @@ impl FlatStorage {
if block.prev_hash != guard.flat_head.hash && !guard.deltas.contains_key(&block.prev_hash) {
return Err(guard.create_block_not_supported_error(&block_hash));
}
let mut store_update = StoreUpdate::new(guard.store.storage.clone());
let mut store_update = guard.store.store_update();
store_helper::set_delta(&mut store_update, shard_uid, &delta);
let cached_changes: CachedFlatStateChanges = delta.changes.into();
guard.deltas.insert(
Expand Down
4 changes: 0 additions & 4 deletions core/store/src/flat/store_helper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,10 +126,6 @@ pub fn remove_all_flat_state_values(store_update: &mut StoreUpdate, shard_uid: S
remove_range_by_shard_uid(store_update, shard_uid, DBCol::FlatState);
}

pub fn remove_all_state_values(store_update: &mut StoreUpdate, shard_uid: ShardUId) {
remove_range_by_shard_uid(store_update, shard_uid, DBCol::State);
}

pub fn encode_flat_state_db_key(shard_uid: ShardUId, key: &[u8]) -> Vec<u8> {
let mut buffer = vec![];
buffer.extend_from_slice(&shard_uid.to_bytes());
Expand Down
88 changes: 72 additions & 16 deletions core/store/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ pub use near_primitives::shard_layout::ShardUId;
use near_primitives::trie_key::{trie_key_parsers, TrieKey};
use near_primitives::types::{AccountId, BlockHeight, StateRoot};
use near_vm_runner::{CompiledContractInfo, ContractCode, ContractRuntimeCache};
use shard_uid_mapping::{replace_shard_uid_key_prefix, retrieve_shard_uid_from_db_key};
use std::borrow::Cow;
use std::fs::File;
use std::path::Path;
use std::str::FromStr;
Expand All @@ -54,6 +56,7 @@ pub mod metrics;
pub mod migrations;
mod opener;
mod rocksdb_metrics;
mod shard_uid_mapping;
mod sync_utils;
pub mod test_utils;
pub mod trie;
Expand All @@ -62,6 +65,7 @@ pub use crate::config::{Mode, StoreConfig};
pub use crate::opener::{
checkpoint_hot_storage_and_cleanup_columns, StoreMigrator, StoreOpener, StoreOpenerError,
};
use crate::shard_uid_mapping::ShardUIdMapping;

/// Specifies temperature of a storage.
///
Expand Down Expand Up @@ -109,6 +113,7 @@ pub struct NodeStorage {
#[derive(Clone)]
pub struct Store {
storage: Arc<dyn Database>,
shard_uid_mapping: ShardUIdMapping,
}

impl NodeStorage {
Expand Down Expand Up @@ -184,7 +189,7 @@ impl NodeStorage {
/// store, the view client should use the split store and the cold store
/// loop should use cold store.
pub fn get_hot_store(&self) -> Store {
Store { storage: self.hot_storage.clone() }
Store::new(self.hot_storage.clone())
}

/// Returns the cold store. The cold store is only available in archival
Expand All @@ -196,7 +201,7 @@ impl NodeStorage {
/// loop should use cold store.
pub fn get_cold_store(&self) -> Option<Store> {
match &self.cold_storage {
Some(cold_storage) => Some(Store { storage: cold_storage.clone() }),
Some(cold_storage) => Some(Store::new(cold_storage.clone())),
None => None,
}
}
Expand All @@ -208,7 +213,7 @@ impl NodeStorage {
pub fn get_recovery_store(&self) -> Option<Store> {
match &self.cold_storage {
Some(cold_storage) => {
Some(Store { storage: Arc::new(crate::db::RecoveryDB::new(cold_storage.clone())) })
Some(Store::new(Arc::new(crate::db::RecoveryDB::new(cold_storage.clone()))))
}
None => None,
}
Expand All @@ -222,7 +227,7 @@ impl NodeStorage {
/// store, the view client should use the split store and the cold store
/// loop should use cold store.
pub fn get_split_store(&self) -> Option<Store> {
self.get_split_db().map(|split_db| Store { storage: split_db })
self.get_split_db().map(|split_db| Store::new(split_db))
}

pub fn get_split_db(&self) -> Option<Arc<SplitDB>> {
Expand Down Expand Up @@ -284,26 +289,68 @@ impl NodeStorage {

impl Store {
pub fn new(storage: Arc<dyn Database>) -> Self {
Self { storage }
let shard_uid_mapping = ShardUIdMapping::new();
Self { storage, shard_uid_mapping }
}

fn get_impl(&self, column: DBCol, key: &[u8]) -> io::Result<Option<DBSlice<'_>>> {
let value = if column.is_rc() {
self.storage.get_with_rc_stripped(column, &key)
} else {
self.storage.get_raw_bytes(column, &key)
}?;
Ok(value)
}

fn read_shard_uid_mapping_from_db(&self, shard_uid: ShardUId) -> io::Result<ShardUId> {
let mapped_shard_uid =
self.get_ser::<ShardUId>(DBCol::ShardUIdMapping, &shard_uid.to_bytes())?;
Ok(self.shard_uid_mapping.update(&shard_uid, mapped_shard_uid))
}

fn get_impl_state(&self, key: &[u8]) -> io::Result<Option<DBSlice<'_>>> {
let shard_uid = retrieve_shard_uid_from_db_key(key)?;
let mut mapping_synchronized_with_db = false;
let mapped_shard_uid = match self.shard_uid_mapping.map(&shard_uid) {
Some(mapped_shard_uid) => mapped_shard_uid,
None => {
mapping_synchronized_with_db = true;
self.read_shard_uid_mapping_from_db(shard_uid)?
}
};
let mapped_key = if shard_uid != mapped_shard_uid {
Cow::Owned(replace_shard_uid_key_prefix(key, mapped_shard_uid))
} else {
Cow::Borrowed(key)
};
let mut value = self.get_impl(DBCol::State, &mapped_key)?;
if value.is_none() && !mapping_synchronized_with_db {
let db_mapped_shard_uid = self.read_shard_uid_mapping_from_db(shard_uid)?;
if db_mapped_shard_uid != mapped_shard_uid {
let mapped_key = replace_shard_uid_key_prefix(key, db_mapped_shard_uid);
value = self.get_impl(DBCol::State, &mapped_key)?;
}
}
Ok(value)
}

/// Fetches value from given column.
///
/// If the key does not exist in the column returns `None`. Otherwise
/// If the key does not exist in the column returns `None`. Otherwise
/// returns the data as [`DBSlice`] object. The object dereferences into
/// a slice, for cases when caller doesn’t need to own the value, and
/// provides conversion into a vector or an Arc.
pub fn get(&self, column: DBCol, key: &[u8]) -> io::Result<Option<DBSlice<'_>>> {
let value = if column.is_rc() {
self.storage.get_with_rc_stripped(column, key)
let value = if column == DBCol::State {
self.get_impl_state(key)?
} else {
self.storage.get_raw_bytes(column, key)
}?;
self.get_impl(column, key)?
};
tracing::trace!(
target: "store",
db_op = "get",
col = %column,
key = %StorageKey(key),
key = %StorageKey(&key),
size = value.as_deref().map(<[u8]>::len)
);
Ok(value)
Expand All @@ -318,7 +365,11 @@ impl Store {
}

pub fn store_update(&self) -> StoreUpdate {
StoreUpdate::new(Arc::clone(&self.storage))
StoreUpdate {
transaction: DBTransaction::new(),
storage: Arc::clone(&self.storage),
_shard_uid_mapping: self.shard_uid_mapping.clone(),
}
}

pub fn iter<'a>(&'a self, col: DBCol) -> DBIterator<'a> {
Expand All @@ -336,6 +387,7 @@ impl Store {
}

pub fn iter_prefix<'a>(&'a self, col: DBCol, key_prefix: &'a [u8]) -> DBIterator<'a> {
assert!(col != DBCol::State, "can't iter prefix of State column");
self.storage.iter_prefix(col, key_prefix)
}

Expand All @@ -346,6 +398,8 @@ impl Store {
lower_bound: Option<&[u8]>,
upper_bound: Option<&[u8]>,
) -> DBIterator<'a> {
// That would fail if called ScanDbColumnCmd for State column.
assert!(col != DBCol::State, "can't range iter State column");
self.storage.iter_range(col, lower_bound, upper_bound)
}

Expand All @@ -354,6 +408,7 @@ impl Store {
col: DBCol,
key_prefix: &'a [u8],
) -> impl Iterator<Item = io::Result<(Box<[u8]>, T)>> + 'a {
assert!(col != DBCol::State, "can't iter prefix ser of State column");
self.storage
.iter_prefix(col, key_prefix)
.map(|item| item.and_then(|(key, value)| Ok((key, T::try_from_slice(value.as_ref())?))))
Expand Down Expand Up @@ -447,6 +502,8 @@ impl Store {
pub struct StoreUpdate {
transaction: DBTransaction,
storage: Arc<dyn Database>,
// TODO(reshardingV3) Currently unused, see TODO inside `commit()` method.
_shard_uid_mapping: ShardUIdMapping,
}

impl StoreUpdate {
Expand All @@ -455,10 +512,6 @@ impl StoreUpdate {
None => panic!(),
};

pub(crate) fn new(db: Arc<dyn Database>) -> Self {
StoreUpdate { transaction: DBTransaction::new(), storage: db }
}

/// Inserts a new value into the database.
///
/// It is a programming error if `insert` overwrites an existing, different
Expand Down Expand Up @@ -583,6 +636,7 @@ impl StoreUpdate {
/// Must not be used for reference-counted columns; use
/// ['Self::increment_refcount'] or [`Self::decrement_refcount`] instead.
pub fn delete(&mut self, column: DBCol, key: &[u8]) {
// It would panic if called with `State` column.
assert!(!column.is_rc(), "can't delete: {column}");
self.transaction.delete(column, key.to_vec());
}
Expand All @@ -594,6 +648,7 @@ impl StoreUpdate {
/// Deletes the given key range from the database including `from`
/// and excluding `to` keys.
pub fn delete_range(&mut self, column: DBCol, from: &[u8], to: &[u8]) {
assert!(column != DBCol::State, "can't range delete State column");
self.transaction.delete_range(column, from.to_vec(), to.to_vec());
}

Expand Down Expand Up @@ -718,6 +773,7 @@ impl StoreUpdate {
}
}
}
// TODO(reshardingV3) Use shard_uid_mapping for ops referencing State column.
self.storage.write(self.transaction)
}
}
Expand Down
8 changes: 4 additions & 4 deletions core/store/src/opener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -339,7 +339,7 @@ impl<'a> StoreOpener<'a> {
tracing::info!(target: "db_opener", path=%opener.path.display(), "The database doesn't exist, creating it.");

let db = opener.create()?;
let store = Store { storage: Arc::new(db) };
let store = Store::new(Arc::new(db));
store.set_db_version(DB_VERSION)?;
return Ok(());
}
Expand Down Expand Up @@ -467,13 +467,13 @@ impl<'a> StoreOpener<'a> {
version: DbVersion,
) -> Result<Store, StoreOpenerError> {
let (db, _) = opener.open(mode, version)?;
let store = Store { storage: Arc::new(db) };
let store = Store::new(Arc::new(db));
Ok(store)
}

fn open_store_unsafe(mode: Mode, opener: &DBOpener) -> Result<Store, StoreOpenerError> {
let db = opener.open_unsafe(mode)?;
let store = Store { storage: Arc::new(db) };
let store = Store::new(Arc::new(db));
Ok(store)
}
}
Expand Down Expand Up @@ -640,7 +640,7 @@ mod tests {
fn test_checkpoint_hot_storage_and_cleanup_columns() {
let (home_dir, opener) = NodeStorage::test_opener();
let node_storage = opener.open().unwrap();
let hot_store = Store { storage: node_storage.hot_storage.clone() };
let hot_store = Store::new(node_storage.hot_storage.clone());
assert_eq!(hot_store.get_db_kind().unwrap(), Some(DbKind::RPC));

let keys = vec![vec![0], vec![1], vec![2], vec![3]];
Expand Down
Loading

0 comments on commit be91b49

Please sign in to comment.