Get rid of in-mem mapping
staffik committed Oct 11, 2024
1 parent 5d53a7c commit 69ea15d
Showing 6 changed files with 42 additions and 149 deletions.
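In short, `Store` and `NodeStorage` no longer carry an in-memory `ShardUIdMapping`: `Store::new` now takes only the database handle, and the Resharding V3 mapping is read from `DBCol::ShardUIdMapping` whenever a State key is accessed (see core/store/src/lib.rs below). A minimal sketch of what call sites look like after this change (a hypothetical helper, not part of the diff):

use near_store::{NodeStorage, Store};

// Hypothetical helper: obtaining the hot and cold stores after this change.
// No ShardUIdMapping is created or passed anywhere; the ShardUId mapping lives
// only in the DBCol::ShardUIdMapping column and is looked up at read time.
fn open_stores(node_storage: &NodeStorage) -> (Store, Option<Store>) {
    (node_storage.get_hot_store(), node_storage.get_cold_store())
}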
2 changes: 1 addition & 1 deletion core/store/src/flat/storage.rs
@@ -460,7 +460,7 @@ impl FlatStorage {
return Err(guard.create_block_not_supported_error(&block_hash));
}
let mut store_update = guard.store.store_update();
- store_helper::set_delta(&mut store_update, shard_uid, &delta);
+ store_update.set_delta(shard_uid, &delta);
let cached_changes: CachedFlatStateChanges = delta.changes.into();
guard.deltas.insert(
block_hash,
107 changes: 34 additions & 73 deletions core/store/src/lib.rs
@@ -36,8 +36,6 @@ pub use near_primitives::shard_layout::ShardUId;
use near_primitives::trie_key::{trie_key_parsers, TrieKey};
use near_primitives::types::{AccountId, BlockHeight, StateRoot};
use near_vm_runner::{CompiledContractInfo, ContractCode, ContractRuntimeCache};
- use shard_uid_mapping::{replace_shard_uid_key_prefix, retrieve_shard_uid_from_db_key};
- use std::borrow::Cow;
use std::fs::File;
use std::path::Path;
use std::str::FromStr;
@@ -59,7 +57,6 @@ pub mod metrics;
pub mod migrations;
mod opener;
mod rocksdb_metrics;
- mod shard_uid_mapping;
mod sync_utils;
pub mod test_utils;
pub mod trie;
@@ -68,7 +65,6 @@ pub use crate::config::{Mode, StoreConfig};
pub use crate::opener::{
checkpoint_hot_storage_and_cleanup_columns, StoreMigrator, StoreOpener, StoreOpenerError,
};
- pub use crate::shard_uid_mapping::ShardUIdMapping;

/// Specifies temperature of a storage.
///
@@ -102,12 +98,9 @@ const STATE_FILE_END_MARK: u8 = 255;
///
/// Provides access to hot storage, cold storage and split storage. Typically
/// users will want to use one of the above via the Store abstraction.
- /// It holds ShardUId mapping to be used for State mapping in Resharding V3.
- /// It acts as a singleton that is passed around when constructing `Store` object.
pub struct NodeStorage {
hot_storage: Arc<dyn Database>,
cold_storage: Option<Arc<crate::db::ColdDB>>,
- shard_uid_mapping: ShardUIdMapping,
}

/// Node’s single storage source.
@@ -119,9 +112,6 @@ pub struct NodeStorage {
#[derive(Clone)]
pub struct Store {
storage: Arc<dyn Database>,
- /// Store holds ShardUId mapping to be used for State mapping in Resharding V3.
- /// See `Store::get_impl_state` for how it is used.
- shard_uid_mapping: ShardUIdMapping,
}

impl StoreAdapter for Store {
@@ -156,9 +146,7 @@ impl NodeStorage {
None
};

- let shard_uid_mapping = ShardUIdMapping::new();
-
- Self { hot_storage, cold_storage: cold_db, shard_uid_mapping }
+ Self { hot_storage, cold_storage: cold_db }
}

/// Initialises an opener for a new temporary test store.
@@ -186,8 +174,7 @@ impl NodeStorage {
/// possibly [`crate::test_utils::create_test_store`] (depending whether you
/// need [`NodeStorage`] or [`Store`] object.
pub fn new(storage: Arc<dyn Database>) -> Self {
- let shard_uid_mapping = ShardUIdMapping::new();
- Self { hot_storage: storage, cold_storage: None, shard_uid_mapping }
+ Self { hot_storage: storage, cold_storage: None }
}
}

@@ -206,7 +193,7 @@ impl NodeStorage {
/// store, the view client should use the split store and the cold store
/// loop should use cold store.
pub fn get_hot_store(&self) -> Store {
- Store::new(self.hot_storage.clone(), self.shard_uid_mapping.clone())
+ Store { storage: self.hot_storage.clone() }
}

/// Returns the cold store. The cold store is only available in archival
@@ -218,9 +205,7 @@ impl NodeStorage {
/// loop should use cold store.
pub fn get_cold_store(&self) -> Option<Store> {
match &self.cold_storage {
- Some(cold_storage) => {
- Some(Store::new(cold_storage.clone(), self.shard_uid_mapping.clone()))
- }
+ Some(cold_storage) => Some(Store { storage: cold_storage.clone() }),
None => None,
}
}
@@ -231,10 +216,9 @@ impl NodeStorage {
/// Recovery store should be use only to perform data recovery on archival nodes.
pub fn get_recovery_store(&self) -> Option<Store> {
match &self.cold_storage {
- Some(cold_storage) => Some(Store::new(
- Arc::new(crate::db::RecoveryDB::new(cold_storage.clone())),
- self.shard_uid_mapping.clone(),
- )),
+ Some(cold_storage) => {
+ Some(Store { storage: Arc::new(crate::db::RecoveryDB::new(cold_storage.clone())) })
+ }
None => None,
}
}
@@ -247,7 +231,7 @@ impl NodeStorage {
/// store, the view client should use the split store and the cold store
/// loop should use cold store.
pub fn get_split_store(&self) -> Option<Store> {
- self.get_split_db().map(|split_db| Store::new(split_db, self.shard_uid_mapping.clone()))
+ self.get_split_db().map(|split_db| Store { storage: split_db })
}

pub fn get_split_db(&self) -> Option<Arc<SplitDB>> {
@@ -256,10 +240,6 @@ impl NodeStorage {
.map(|cold_db| SplitDB::new(self.hot_storage.clone(), cold_db.clone()))
}

- pub fn get_shard_uid_mapping(&self) -> ShardUIdMapping {
- self.shard_uid_mapping.clone()
- }

/// Returns underlying database for given temperature.
///
/// This allows accessing underlying hot and cold databases directly
@@ -303,22 +283,31 @@ impl NodeStorage {
}

pub fn new_with_cold(hot: Arc<dyn Database>, cold: Arc<dyn Database>) -> Self {
- let shard_uid_mapping = ShardUIdMapping::new();
- Self {
- hot_storage: hot,
- cold_storage: Some(Arc::new(crate::db::ColdDB::new(cold))),
- shard_uid_mapping,
- }
+ Self { hot_storage: hot, cold_storage: Some(Arc::new(crate::db::ColdDB::new(cold))) }
}

pub fn cold_db(&self) -> Option<&Arc<crate::db::ColdDB>> {
self.cold_storage.as_ref()
}
}

+ pub fn retrieve_shard_uid_from_db_key(key: &[u8]) -> io::Result<ShardUId> {
+ // TODO(reshardingV3) Consider changing the Error type to `StorageError`?
+ // Would need changing error types for `Store` methods as well.
+ ShardUId::try_from(&key[..8])
+ .map_err(|e| io::Error::other(format!("Could not retrieve ShardUId from db key: {}", e)))
+ }
+
+ pub fn replace_shard_uid_key_prefix(key: &[u8], shard_uid: ShardUId) -> [u8; 40] {
+ let mut mapped_key = [0u8; 40];
+ mapped_key[..8].copy_from_slice(&shard_uid.to_bytes());
+ mapped_key[8..].copy_from_slice(&key[8..]);
+ mapped_key
+ }

impl Store {
- pub fn new(storage: Arc<dyn Database>, shard_uid_mapping: ShardUIdMapping) -> Self {
- Self { storage, shard_uid_mapping }
+ pub fn new(storage: Arc<dyn Database>) -> Self {
+ Self { storage }
}

/// Underlying `get()` implementation for all columns.
@@ -331,43 +320,21 @@ impl Store {
Ok(value)
}

- /// Reads shard_uid mapping for given shard and updates the in-memory mapping.
- fn read_shard_uid_mapping_from_db(&self, shard_uid: ShardUId) -> io::Result<ShardUId> {
- let mapped_shard_uid =
- self.get_ser::<ShardUId>(DBCol::ShardUIdMapping, &shard_uid.to_bytes())?
- Ok(self.shard_uid_mapping.update(&shard_uid, mapped_shard_uid))
- }

/// Specialized `get` implementation for State column that replaces shard_uid prefix
/// with a mapped value according to mapping strategy in Resharding V3.
///
- /// It first uses the in-memory `shard_uid_mapping` and attempts to read from storage.
- /// If the node was not found in storage, it tries to update the in-memory mapping with
- /// the mapping stored in `DBCol::ShardUIdMapping`.
- /// If it was different, then we do the second attempt to read from storage.
+ /// It does extra read from `DBCol::ShardUIdMapping` to map the shard_uid key prefix.
fn get_impl_state(&self, key: &[u8]) -> io::Result<Option<DBSlice<'_>>> {
let shard_uid = retrieve_shard_uid_from_db_key(key)?;
- let mut mapping_synchronized_with_db = false;
- let mapped_shard_uid = match self.shard_uid_mapping.map(&shard_uid) {
- Some(mapped_shard_uid) => mapped_shard_uid,
- None => {
- mapping_synchronized_with_db = true;
- self.read_shard_uid_mapping_from_db(shard_uid)?
- }
- };
+ let mapped_shard_uid = self
+ .get_ser::<ShardUId>(DBCol::ShardUIdMapping, &shard_uid.to_bytes())?
+ .unwrap_or(shard_uid);
let mapped_key = if shard_uid != mapped_shard_uid {
- Cow::Owned(replace_shard_uid_key_prefix(key, mapped_shard_uid))
+ &replace_shard_uid_key_prefix(key, mapped_shard_uid)
} else {
- Cow::Borrowed(key)
+ key
};
- let mut value = self.get_impl(DBCol::State, &mapped_key)?;
- if value.is_none() && !mapping_synchronized_with_db {
- let db_mapped_shard_uid = self.read_shard_uid_mapping_from_db(shard_uid)?;
- if db_mapped_shard_uid != mapped_shard_uid {
- let mapped_key = replace_shard_uid_key_prefix(key, db_mapped_shard_uid);
- value = self.get_impl(DBCol::State, &mapped_key)?;
- }
- }
+ let value = self.get_impl(DBCol::State, &mapped_key)?;
Ok(value)
}

@@ -402,11 +369,7 @@ impl Store {
}

pub fn store_update(&self) -> StoreUpdate {
- StoreUpdate {
- transaction: DBTransaction::new(),
- storage: Arc::clone(&self.storage),
- _shard_uid_mapping: self.shard_uid_mapping.clone(),
- }
+ StoreUpdate { transaction: DBTransaction::new(), storage: Arc::clone(&self.storage) }
}

pub fn iter<'a>(&'a self, col: DBCol) -> DBIterator<'a> {
@@ -539,8 +502,6 @@ impl Store {
pub struct StoreUpdate {
transaction: DBTransaction,
storage: Arc<dyn Database>,
- // TODO(reshardingV3) Currently unused, see TODO inside `commit()` method.
- _shard_uid_mapping: ShardUIdMapping,
}

impl StoreUpdateAdapter for StoreUpdate {
@@ -816,7 +777,7 @@ impl StoreUpdate {
}
}
}
- // TODO(reshardingV3) Use shard_uid_mapping for ops referencing State column.
+ // TODO(reshardingV3) Map shard_uid for ops referencing State column.
self.storage.write(self.transaction)
}
}
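Taken together, the lib.rs changes above move `retrieve_shard_uid_from_db_key` and `replace_shard_uid_key_prefix` to top-level helpers and make `get_impl_state` resolve the mapping by reading `DBCol::ShardUIdMapping` directly, one extra read per State access, as the updated doc comment notes. A State key is an 8-byte ShardUId prefix followed by a 32-byte hash, hence the `[u8; 40]` mapped key. Below is a minimal sketch of the lookup written against the public `Store` API rather than the private `get_impl_state` (simplified: the real method skips the copy when the prefix is unchanged):

use near_store::{replace_shard_uid_key_prefix, retrieve_shard_uid_from_db_key, DBCol, ShardUId, Store};
use std::io;

// Sketch: compute the key actually used for a DBCol::State read, mirroring the new
// Store::get_impl_state. The mapping is fetched from the database on every call;
// the old in-memory ShardUIdMapping cache no longer exists.
fn mapped_state_key(store: &Store, key: &[u8]) -> io::Result<[u8; 40]> {
    let shard_uid = retrieve_shard_uid_from_db_key(key)?;
    let mapped_shard_uid = store
        .get_ser::<ShardUId>(DBCol::ShardUIdMapping, &shard_uid.to_bytes())?
        .unwrap_or(shard_uid); // no mapping entry: the shard reads its own data
    Ok(replace_shard_uid_key_prefix(key, mapped_shard_uid))
}

The trade-off is the one spelled out in the new doc comment: an extra `ShardUIdMapping` read on every State access, in exchange for not sharing a lazily synchronized cache between `Store` clones.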
10 changes: 4 additions & 6 deletions core/store/src/opener.rs
@@ -1,7 +1,6 @@
use crate::db::rocksdb::snapshot::{Snapshot, SnapshotError, SnapshotRemoveError};
use crate::db::rocksdb::RocksDB;
use crate::metadata::{DbKind, DbMetadata, DbVersion, DB_VERSION};
- use crate::shard_uid_mapping::ShardUIdMapping;
use crate::{DBCol, DBTransaction, Mode, NodeStorage, Store, StoreConfig, Temperature};
use std::sync::Arc;

@@ -340,7 +339,7 @@ impl<'a> StoreOpener<'a> {
tracing::info!(target: "db_opener", path=%opener.path.display(), "The database doesn't exist, creating it.");

let db = opener.create()?;
- let store = Store::new(Arc::new(db), ShardUIdMapping::new());
+ let store = Store { storage: Arc::new(db) };
store.set_db_version(DB_VERSION)?;
return Ok(());
}
@@ -468,13 +467,13 @@ impl<'a> StoreOpener<'a> {
version: DbVersion,
) -> Result<Store, StoreOpenerError> {
let (db, _) = opener.open(mode, version)?;
- let store = Store::new(Arc::new(db), ShardUIdMapping::new());
+ let store = Store { storage: Arc::new(db) };
Ok(store)
}

fn open_store_unsafe(mode: Mode, opener: &DBOpener) -> Result<Store, StoreOpenerError> {
let db = opener.open_unsafe(mode)?;
- let store = Store::new(Arc::new(db), ShardUIdMapping::new());
+ let store = Store { storage: Arc::new(db) };
Ok(store)
}
}
@@ -641,8 +640,7 @@ mod tests {
fn test_checkpoint_hot_storage_and_cleanup_columns() {
let (home_dir, opener) = NodeStorage::test_opener();
let node_storage = opener.open().unwrap();
- let hot_store =
- Store::new(node_storage.hot_storage.clone(), node_storage.shard_uid_mapping.clone());
+ let hot_store = Store { storage: node_storage.hot_storage.clone() };
assert_eq!(hot_store.get_db_kind().unwrap(), Some(DbKind::RPC));

let keys = vec![vec![0], vec![1], vec![2], vec![3]];
65 changes: 0 additions & 65 deletions core/store/src/shard_uid_mapping.rs

This file was deleted.
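For context, the deleted `ShardUIdMapping` was a small shared cache of ShardUId-to-ShardUId mappings: created with `ShardUIdMapping::new()`, cloned into every `Store`, and queried through `map()` and `update()` at the call sites removed in lib.rs above. A rough reconstruction of its shape, inferred from those call sites rather than copied from the deleted file:

use near_primitives::shard_layout::ShardUId;
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

/// Rough reconstruction of the removed in-memory cache (shape inferred from call sites).
#[derive(Clone)]
pub struct ShardUIdMapping(Arc<Mutex<HashMap<ShardUId, ShardUId>>>);

impl ShardUIdMapping {
    pub fn new() -> Self {
        Self(Arc::new(Mutex::new(HashMap::new())))
    }

    /// Returns the cached mapping for `shard_uid`, if one was recorded.
    pub fn map(&self, shard_uid: &ShardUId) -> Option<ShardUId> {
        self.0.lock().unwrap().get(shard_uid).copied()
    }

    /// Records the mapping read from `DBCol::ShardUIdMapping` (identity if absent)
    /// and returns the ShardUId whose prefix should be used for State reads.
    pub fn update(&self, shard_uid: &ShardUId, db_mapped_shard_uid: Option<ShardUId>) -> ShardUId {
        let mapped = db_mapped_shard_uid.unwrap_or(*shard_uid);
        self.0.lock().unwrap().insert(*shard_uid, mapped);
        mapped
    }
}

Because this cache was shared across `Store` clones and filled lazily, `get_impl_state` needed the second-attempt logic removed in this commit; reading the column directly makes that synchronization unnecessary.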

4 changes: 2 additions & 2 deletions tools/replay-archive/src/cli.rs
@@ -28,7 +28,7 @@ use near_primitives::types::chunk_extra::ChunkExtra;
use near_primitives::types::{BlockHeight, Gas, ProtocolVersion, ShardId};
use near_primitives::version::ProtocolFeature;
use near_state_viewer::progress_reporter::{timestamp_ms, ProgressReporter};
- use near_store::{get_genesis_state_roots, ShardUId, ShardUIdMapping, Store};
+ use near_store::{get_genesis_state_roots, ShardUId, Store};
use nearcore::{load_config, NearConfig, NightshadeRuntime, NightshadeRuntimeExt};
use std::collections::HashMap;
use std::path::Path;
@@ -111,7 +111,7 @@ impl ReplayController {
end_height: Option<BlockHeight>,
) -> Result<Self> {
let storage = open_storage_for_replay(home_dir, &near_config)?;
- let store = Store::new(storage.clone(), ShardUIdMapping::new());
+ let store = Store::new(storage.clone());

let genesis_height = near_config.genesis.config.genesis_height;
let chain_store = ChainStore::new(store.clone(), genesis_height, false);
3 changes: 1 addition & 2 deletions tools/state-viewer/src/replay_headers.rs
@@ -282,13 +282,12 @@ fn create_replay_store(home_dir: &Path, near_config: &NearConfig) -> Store {
near_config.config.cold_store.as_ref(),
);
let storage = store_opener.open_in_mode(Mode::ReadOnly).unwrap();
- let shard_uid_mapping = storage.get_shard_uid_mapping();

let read_db = if storage.has_cold() {
storage.get_split_db().unwrap()
} else {
storage.into_inner(Temperature::Hot)
};
let write_db = TestDB::new();
- Store::new(MixedDB::new(read_db, write_db, ReadOrder::WriteDBFirst), shard_uid_mapping)
+ Store::new(MixedDB::new(read_db, write_db, ReadOrder::WriteDBFirst))
}
