From c9def1635ec7b7d4230d275c0f31342cbb35f7dc Mon Sep 17 00:00:00 2001 From: Shreyan Gupta Date: Thu, 26 Sep 2024 11:19:46 -0700 Subject: [PATCH] [store] Introduce Flat store adapter (#12123) This is the first concept PR of having adapters on top of store. Most of the details for how it works can be found in core/store/src/adapter/mod.rs The functions in core/store/src/adapter/flat_store.rs are moved from store_helper file. --- chain/chain/src/chain.rs | 8 +- chain/chain/src/chain_update.rs | 8 +- chain/chain/src/flat_storage_creator.rs | 83 ++--- chain/chain/src/garbage_collection.rs | 4 +- chain/chain/src/runtime/mod.rs | 5 +- chain/chain/src/runtime/tests.rs | 4 +- chain/client/src/sync_jobs_actor.rs | 3 +- core/store/src/adapter/flat_store.rs | 332 ++++++++++++++++++ core/store/src/adapter/mod.rs | 104 ++++++ core/store/src/flat/chunk_view.rs | 16 +- core/store/src/flat/delta.rs | 13 +- core/store/src/flat/manager.rs | 38 +- core/store/src/flat/mod.rs | 1 - core/store/src/flat/storage.rs | 275 ++++++--------- core/store/src/flat/store_helper.rs | 285 --------------- core/store/src/genesis/initialization.rs | 8 +- core/store/src/genesis/state_applier.rs | 3 +- core/store/src/lib.rs | 14 + core/store/src/test_utils.rs | 18 +- core/store/src/trie/from_flat.rs | 9 +- core/store/src/trie/mem/loading.rs | 30 +- core/store/src/trie/resharding_v2.rs | 5 +- core/store/src/trie/shard_tries.rs | 14 +- core/store/src/trie/state_parts.rs | 8 +- core/store/src/trie/state_snapshot.rs | 5 +- genesis-tools/genesis-populate/src/lib.rs | 3 +- .../src/tests/client/flat_storage.rs | 99 ++---- .../src/tests/client/process_blocks.rs | 3 +- .../src/tests/client/state_dump.rs | 9 +- .../src/tests/client/state_snapshot.rs | 3 +- .../src/tests/client/sync_state_nodes.rs | 6 +- integration-tests/src/user/runtime_user.rs | 3 +- nearcore/src/entity_debug.rs | 2 +- .../src/estimator_context.rs | 16 +- runtime/runtime/src/prefetch.rs | 3 +- tools/database/src/analyze_delayed_receipt.rs | 3 +- tools/database/src/corrupt.rs | 8 +- tools/database/src/state_perf.rs | 16 +- tools/flat-storage/src/commands.rs | 41 ++- tools/fork-network/src/cli.rs | 9 +- .../src/single_shard_storage_mutator.rs | 5 +- tools/state-viewer/src/apply_chain_range.rs | 6 +- tools/state-viewer/src/commands.rs | 4 +- tools/state-viewer/src/scan_db.rs | 4 +- 44 files changed, 800 insertions(+), 736 deletions(-) create mode 100644 core/store/src/adapter/flat_store.rs create mode 100644 core/store/src/adapter/mod.rs delete mode 100644 core/store/src/flat/store_helper.rs diff --git a/chain/chain/src/chain.rs b/chain/chain/src/chain.rs index 1be35c74e95..f2e8be752f5 100644 --- a/chain/chain/src/chain.rs +++ b/chain/chain/src/chain.rs @@ -90,8 +90,9 @@ use near_primitives::views::{ FinalExecutionOutcomeView, FinalExecutionOutcomeWithReceiptView, FinalExecutionStatus, LightClientBlockView, SignedTransactionView, }; +use near_store::adapter::StoreUpdateAdapter; use near_store::config::StateSnapshotType; -use near_store::flat::{store_helper, FlatStorageReadyStatus, FlatStorageStatus}; +use near_store::flat::{FlatStorageReadyStatus, FlatStorageStatus}; use near_store::trie::mem::resharding::RetainMode; use near_store::DBCol; use near_store::{get_genesis_state_roots, PartialStorage}; @@ -485,7 +486,7 @@ impl Chain { let mut tmp_store_update = store_update.store().store_update(); for shard_uid in epoch_manager.get_shard_layout(genesis_epoch_id)?.shard_uids() { flat_storage_manager.set_flat_storage_for_genesis( - &mut tmp_store_update, + &mut tmp_store_update.flat_store_update(), shard_uid, genesis.hash(), genesis.header().height(), @@ -3022,8 +3023,7 @@ impl Chain { tracing::debug!(target: "store", ?shard_uid, ?flat_head_hash, flat_head_height, "set_state_finalize - initialized flat storage"); let mut store_update = self.runtime_adapter.store().store_update(); - store_helper::set_flat_storage_status( - &mut store_update, + store_update.flat_store_update().set_flat_storage_status( shard_uid, FlatStorageStatus::Ready(FlatStorageReadyStatus { flat_head: near_store::flat::BlockInfo { diff --git a/chain/chain/src/chain_update.rs b/chain/chain/src/chain_update.rs index 6130e5a4729..9d0a089d767 100644 --- a/chain/chain/src/chain_update.rs +++ b/chain/chain/src/chain_update.rs @@ -134,7 +134,7 @@ impl<'a> ChainUpdate<'a> { shard_uid, apply_result.trie_changes.state_changes(), )?; - self.chain_store_update.merge(store_update); + self.chain_store_update.merge(store_update.into()); self.chain_store_update.save_trie_changes(apply_result.trie_changes); self.chain_store_update.save_outgoing_receipt( @@ -174,7 +174,7 @@ impl<'a> ChainUpdate<'a> { shard_uid, apply_result.trie_changes.state_changes(), )?; - self.chain_store_update.merge(store_update); + self.chain_store_update.merge(store_update.into()); self.chain_store_update.save_chunk_extra(block_hash, &shard_uid, new_extra); self.chain_store_update.save_trie_changes(apply_result.trie_changes); @@ -544,7 +544,7 @@ impl<'a> ChainUpdate<'a> { shard_uid, apply_result.trie_changes.state_changes(), )?; - self.chain_store_update.merge(store_update); + self.chain_store_update.merge(store_update.into()); self.chain_store_update.save_trie_changes(apply_result.trie_changes); @@ -643,7 +643,7 @@ impl<'a> ChainUpdate<'a> { shard_uid, apply_result.trie_changes.state_changes(), )?; - self.chain_store_update.merge(store_update); + self.chain_store_update.merge(store_update.into()); self.chain_store_update.save_trie_changes(apply_result.trie_changes); // The chunk is missing but some fields may need to be updated diff --git a/chain/chain/src/flat_storage_creator.rs b/chain/chain/src/flat_storage_creator.rs index 70064dc908a..edda5a5e6b9 100644 --- a/chain/chain/src/flat_storage_creator.rs +++ b/chain/chain/src/flat_storage_creator.rs @@ -20,12 +20,13 @@ use near_primitives::shard_layout::ShardUId; use near_primitives::state::FlatStateValue; use near_primitives::state_part::PartId; use near_primitives::types::{BlockHeight, StateRoot}; +use near_store::adapter::flat_store::FlatStoreAdapter; +use near_store::adapter::StoreAdapter; use near_store::flat::{ - store_helper, BlockInfo, FetchingStateStatus, FlatStateChanges, FlatStorageCreationMetrics, + BlockInfo, FetchingStateStatus, FlatStateChanges, FlatStorageCreationMetrics, FlatStorageCreationStatus, FlatStorageManager, FlatStorageReadyStatus, FlatStorageStatus, NUM_PARTS_IN_ONE_STEP, STATE_PART_MEMORY_LIMIT, }; -use near_store::Store; use near_store::{Trie, TrieDBStorage, TrieTraversalItem}; use std::collections::HashMap; use std::sync::atomic::AtomicU64; @@ -88,14 +89,14 @@ impl FlatStorageShardCreator { /// Fetch state part, write all state items to flat storage and send the number of items to the given channel. fn fetch_state_part( - store: Store, + store: FlatStoreAdapter, shard_uid: ShardUId, state_root: StateRoot, part_id: PartId, progress: Arc, result_sender: Sender, ) { - let trie_storage = TrieDBStorage::new(store.clone(), shard_uid); + let trie_storage = TrieDBStorage::new(store.store(), shard_uid); let trie = Trie::new(Arc::new(trie_storage), state_root, None); let path_begin = trie.find_state_part_boundary(part_id.idx, part_id.total).unwrap(); let path_end = trie.find_state_part_boundary(part_id.idx + 1, part_id.total).unwrap(); @@ -110,12 +111,7 @@ impl FlatStorageShardCreator { { if let Some(key) = key { let value = trie.retrieve_value(&hash).unwrap(); - store_helper::set_flat_state_value( - &mut store_update, - shard_uid, - key, - Some(FlatStateValue::value_ref(&value)), - ); + store_update.set(shard_uid, key, Some(FlatStateValue::value_ref(&value))); num_items += 1; } } @@ -149,16 +145,16 @@ impl FlatStorageShardCreator { chain_store: &ChainStore, thread_pool: &rayon::ThreadPool, ) -> Result { + let store = chain_store.store().flat_store(); let shard_id = self.shard_uid.shard_id(); - let current_status = - store_helper::get_flat_storage_status(chain_store.store(), self.shard_uid) - .expect("failed to read flat storage status"); + let current_status = store + .get_flat_storage_status(self.shard_uid) + .expect("failed to read flat storage status"); self.metrics.set_status(¤t_status); match ¤t_status { FlatStorageStatus::Empty => { - let mut store_update = chain_store.store().store_update(); - store_helper::set_flat_storage_status( - &mut store_update, + let mut store_update = store.store_update(); + store_update.set_flat_storage_status( self.shard_uid, FlatStorageStatus::Creation(FlatStorageCreationStatus::SavingDeltas), ); @@ -179,11 +175,7 @@ impl FlatStorageShardCreator { for hash in hashes { debug!(target: "store", %shard_id, %height, %hash, "Checking delta existence"); assert_matches!( - store_helper::get_delta_changes( - chain_store.store(), - self.shard_uid, - *hash - ), + store.get_delta(self.shard_uid, *hash), Ok(Some(_)) ); } @@ -192,10 +184,9 @@ impl FlatStorageShardCreator { // We continue saving deltas, and also start fetching state. let block_hash = final_head.last_block_hash; - let store = self.runtime.store().clone(); let epoch_id = self.epoch_manager.get_epoch_id(&block_hash)?; let shard_uid = self.epoch_manager.shard_id_to_uid(shard_id, &epoch_id)?; - let trie_storage = TrieDBStorage::new(store, shard_uid); + let trie_storage = TrieDBStorage::new(store.store(), shard_uid); let state_root = *chain_store.get_chunk_extra(&block_hash, &shard_uid)?.state_root(); let trie = Trie::new(Arc::new(trie_storage), state_root, None); @@ -210,10 +201,9 @@ impl FlatStorageShardCreator { }; info!(target: "store", %shard_id, %final_height, ?status, "Switching status to fetching state"); - let mut store_update = chain_store.store().store_update(); + let mut store_update = store.store_update(); self.metrics.set_flat_head_height(final_head.height); - store_helper::set_flat_storage_status( - &mut store_update, + store_update.set_flat_storage_status( self.shard_uid, FlatStorageStatus::Creation(FlatStorageCreationStatus::FetchingState( status, @@ -225,7 +215,6 @@ impl FlatStorageShardCreator { FlatStorageStatus::Creation(FlatStorageCreationStatus::FetchingState( fetching_state_status, )) => { - let store = self.runtime.store().clone(); let block_hash = fetching_state_status.block_hash; let start_part_id = fetching_state_status.part_id; let num_parts_in_step = fetching_state_status.num_parts_in_step; @@ -280,7 +269,7 @@ impl FlatStorageShardCreator { // Mark that we don't wait for new state parts. self.remaining_state_parts = None; - let mut store_update = chain_store.store().store_update(); + let mut store_update = store.store_update(); if next_start_part_id < num_parts { // If there are still remaining state parts, switch status to the new range of state parts. // We will spawn new rayon tasks on the next status update. @@ -291,8 +280,7 @@ impl FlatStorageShardCreator { num_parts, }; debug!(target: "chain", %shard_id, %block_hash, ?new_status); - store_helper::set_flat_storage_status( - &mut store_update, + store_update.set_flat_storage_status( self.shard_uid, FlatStorageStatus::Creation( FlatStorageCreationStatus::FetchingState(new_status), @@ -302,13 +290,8 @@ impl FlatStorageShardCreator { // If all parts were fetched, we can start catchup. info!(target: "chain", %shard_id, %block_hash, "Finished fetching state"); self.metrics.set_remaining_state_parts(0); - store_helper::remove_delta( - &mut store_update, - self.shard_uid, - block_hash, - ); - store_helper::set_flat_storage_status( - &mut store_update, + store_update.remove_delta(self.shard_uid, block_hash); + store_update.set_flat_storage_status( self.shard_uid, FlatStorageStatus::Creation(FlatStorageCreationStatus::CatchingUp( block_hash, @@ -320,11 +303,10 @@ impl FlatStorageShardCreator { } } FlatStorageStatus::Creation(FlatStorageCreationStatus::CatchingUp(old_flat_head)) => { - let store = self.runtime.store(); let mut flat_head = *old_flat_head; let chain_final_head = chain_store.final_head()?; let mut merged_changes = FlatStateChanges::default(); - let mut store_update = self.runtime.store().store_update(); + let mut store_update = store.store_update(); // Merge up to 50 deltas of the next blocks until we reach chain final head. // TODO: consider merging 10 deltas at once to limit memory usage @@ -338,11 +320,9 @@ impl FlatStorageShardCreator { break; } flat_head = chain_store.get_next_block_hash(&flat_head).unwrap(); - let changes = store_helper::get_delta_changes(store, self.shard_uid, flat_head) - .unwrap() - .unwrap(); + let changes = store.get_delta(self.shard_uid, flat_head).unwrap().unwrap(); merged_changes.merge(changes); - store_helper::remove_delta(&mut store_update, self.shard_uid, flat_head); + store_update.remove_delta(self.shard_uid, flat_head); } if (old_flat_head != &flat_head) || (flat_head == chain_final_head.last_block_hash) @@ -356,8 +336,7 @@ impl FlatStorageShardCreator { debug!(target: "chain", %shard_id, %old_flat_head, %old_height, %flat_head, %height, "Catching up flat head"); self.metrics.set_flat_head_height(height); merged_changes.apply_to_flat_state(&mut store_update, shard_uid); - store_helper::set_flat_storage_status( - &mut store_update, + store_update.set_flat_storage_status( shard_uid, FlatStorageStatus::Creation(FlatStorageCreationStatus::CatchingUp( flat_head, @@ -370,26 +349,22 @@ impl FlatStorageShardCreator { // GC deltas from forks which could have appeared on chain during catchup. // Assuming that flat storage creation finishes in < 2 days, all deltas metadata cannot occupy // more than 2 * (Blocks per day = 48 * 60 * 60) * (BlockInfo size = 72) ~= 12.4 MB. - let mut store_update = self.runtime.store().store_update(); - let deltas_metadata = store_helper::get_all_deltas_metadata(&store, shard_uid) + let mut store_update = store.store_update(); + let deltas_metadata = store.get_all_deltas_metadata(shard_uid) .unwrap_or_else(|_| { panic!("Cannot read flat state deltas metadata for shard {shard_id} from storage") }); let mut gc_count = 0; for delta_metadata in deltas_metadata { if delta_metadata.block.height <= chain_final_head.height { - store_helper::remove_delta( - &mut store_update, - self.shard_uid, - delta_metadata.block.hash, - ); + store_update + .remove_delta(self.shard_uid, delta_metadata.block.hash); gc_count += 1; } } // If we reached chain final head, we can finish catchup and finally create flat storage. - store_helper::set_flat_storage_status( - &mut store_update, + store_update.set_flat_storage_status( self.shard_uid, FlatStorageStatus::Ready(FlatStorageReadyStatus { flat_head: BlockInfo { diff --git a/chain/chain/src/garbage_collection.rs b/chain/chain/src/garbage_collection.rs index 00da87b8ffe..1b08236231e 100644 --- a/chain/chain/src/garbage_collection.rs +++ b/chain/chain/src/garbage_collection.rs @@ -11,7 +11,7 @@ use near_primitives::shard_layout::get_block_shard_uid; use near_primitives::state_sync::{StateHeaderKey, StatePartKey}; use near_primitives::types::{BlockHeight, BlockHeightDelta, EpochId, NumBlocks, ShardId}; use near_primitives::utils::{get_block_shard_id, get_outcome_id_block_hash, index_to_bytes}; -use near_store::flat::store_helper; +use near_store::adapter::StoreUpdateAdapter; use near_store::{DBCol, KeyForStateChanges, ShardTries, ShardUId}; use crate::types::RuntimeAdapter; @@ -711,7 +711,7 @@ impl<'a> ChainStoreUpdate<'a> { // delete flat storage columns: FlatStateChanges and FlatStateDeltaMetadata let mut store_update = self.store().store_update(); - store_helper::remove_delta(&mut store_update, shard_uid, block_hash); + store_update.flat_store_update().remove_delta(shard_uid, block_hash); self.merge(store_update); } diff --git a/chain/chain/src/runtime/mod.rs b/chain/chain/src/runtime/mod.rs index e79b8e8ea8e..1fd28b915fe 100644 --- a/chain/chain/src/runtime/mod.rs +++ b/chain/chain/src/runtime/mod.rs @@ -37,6 +37,7 @@ use near_primitives::views::{ AccessKeyInfoView, CallResult, ContractCodeView, QueryRequest, QueryResponse, QueryResponseKind, ViewStateResult, }; +use near_store::adapter::{StoreAdapter, StoreUpdateAdapter}; use near_store::config::StateSnapshotType; use near_store::flat::FlatStorageManager; use near_store::metadata::DbKind; @@ -99,7 +100,7 @@ impl NightshadeRuntime { let runtime = Runtime::new(); let trie_viewer = TrieViewer::new(trie_viewer_state_size_limit, max_gas_burnt_view); - let flat_storage_manager = FlatStorageManager::new(store.clone()); + let flat_storage_manager = FlatStorageManager::new(store.flat_store()); let shard_uids: Vec<_> = genesis_config.shard_layout.shard_uids().collect(); let tries = ShardTries::new( store.clone(), @@ -1244,7 +1245,7 @@ impl RuntimeAdapter for NightshadeRuntime { debug!(target: "chain", %shard_id, "Inserting {} values to flat storage", flat_state_delta.len()); // TODO: `apply_to_flat_state` inserts values with random writes, which can be time consuming. // Optimize taking into account that flat state values always correspond to a consecutive range of keys. - flat_state_delta.apply_to_flat_state(&mut store_update, shard_uid); + flat_state_delta.apply_to_flat_state(&mut store_update.flat_store_update(), shard_uid); self.precompile_contracts(epoch_id, contract_codes)?; Ok(store_update.commit()?) } diff --git a/chain/chain/src/runtime/tests.rs b/chain/chain/src/runtime/tests.rs index a852bbdf51e..47d25c449cd 100644 --- a/chain/chain/src/runtime/tests.rs +++ b/chain/chain/src/runtime/tests.rs @@ -150,7 +150,7 @@ impl TestEnv { { let mut store_update = store.store_update(); flat_storage_manager.set_flat_storage_for_genesis( - &mut store_update, + &mut store_update.flat_store_update(), shard_uid, &genesis_hash, 0, @@ -301,7 +301,7 @@ impl TestEnv { }, }; let new_store_update = flat_storage.add_delta(delta).unwrap(); - store_update.merge(new_store_update); + store_update.merge(new_store_update.into()); } store_update.commit().unwrap(); diff --git a/chain/client/src/sync_jobs_actor.rs b/chain/client/src/sync_jobs_actor.rs index f19370ba73c..176151823ad 100644 --- a/chain/client/src/sync_jobs_actor.rs +++ b/chain/client/src/sync_jobs_actor.rs @@ -10,6 +10,7 @@ use near_performance_metrics_macros::perf; use near_primitives::state_part::PartId; use near_primitives::state_sync::StatePartKey; use near_primitives::types::ShardId; +use near_store::adapter::StoreUpdateAdapter; use near_store::DBCol; // Set the mailbox capacity for the SyncJobsActor from default 16 to 100. @@ -100,7 +101,7 @@ impl SyncJobsActor { let success = msg .runtime_adapter .get_flat_storage_manager() - .remove_flat_storage_for_shard(msg.shard_uid, &mut store_update)?; + .remove_flat_storage_for_shard(msg.shard_uid, &mut store_update.flat_store_update())?; store_update.commit()?; Ok(success) } diff --git a/core/store/src/adapter/flat_store.rs b/core/store/src/adapter/flat_store.rs new file mode 100644 index 00000000000..07eabea27e6 --- /dev/null +++ b/core/store/src/adapter/flat_store.rs @@ -0,0 +1,332 @@ +use std::io; + +use borsh::BorshDeserialize; +use near_primitives::hash::CryptoHash; +use near_primitives::shard_layout::ShardUId; +use near_primitives::state::FlatStateValue; + +use crate::flat::delta::{BlockWithChangesInfo, KeyForFlatStateDelta}; +use crate::flat::{ + FlatStateChanges, FlatStateDelta, FlatStateDeltaMetadata, FlatStateIterator, FlatStorageError, + FlatStorageReadyStatus, FlatStorageStatus, +}; +use crate::{DBCol, Store, StoreUpdate}; + +use super::{StoreAdapter, StoreUpdateAdapter, StoreUpdateHolder}; + +#[derive(Clone)] +pub struct FlatStoreAdapter { + store: Store, +} + +impl StoreAdapter for FlatStoreAdapter { + fn store(&self) -> Store { + self.store.clone() + } +} + +impl FlatStoreAdapter { + pub fn new(store: Store) -> Self { + Self { store } + } + + pub fn store_update(&self) -> FlatStoreUpdateAdapter<'static> { + FlatStoreUpdateAdapter { store_update: StoreUpdateHolder::Owned(self.store.store_update()) } + } + + pub fn exists(&self, shard_uid: ShardUId, key: &[u8]) -> Result { + let db_key = encode_flat_state_db_key(shard_uid, key); + self.store.exists(DBCol::FlatState, &db_key).map_err(|err| { + FlatStorageError::StorageInternalError(format!("failed to read FlatState value: {err}")) + }) + } + + pub fn get( + &self, + shard_uid: ShardUId, + key: &[u8], + ) -> Result, FlatStorageError> { + let db_key = encode_flat_state_db_key(shard_uid, key); + self.store.get_ser(DBCol::FlatState, &db_key).map_err(|err| { + FlatStorageError::StorageInternalError(format!("failed to read FlatState value: {err}")) + }) + } + + pub fn get_flat_storage_status( + &self, + shard_uid: ShardUId, + ) -> Result { + self.store + .get_ser(DBCol::FlatStorageStatus, &shard_uid.to_bytes()) + .map(|status| status.unwrap_or(FlatStorageStatus::Empty)) + .map_err(|err| { + FlatStorageError::StorageInternalError(format!( + "failed to read flat storage status: {err}" + )) + }) + } + + pub fn get_delta( + &self, + shard_uid: ShardUId, + block_hash: CryptoHash, + ) -> Result, FlatStorageError> { + let key = KeyForFlatStateDelta { shard_uid, block_hash }; + self.store.get_ser::(DBCol::FlatStateChanges, &key.to_bytes()).map_err( + |err| { + FlatStorageError::StorageInternalError(format!( + "failed to read delta changes for {key:?}: {err}" + )) + }, + ) + } + + pub fn get_all_deltas_metadata( + &self, + shard_uid: ShardUId, + ) -> Result, FlatStorageError> { + self.store + .iter_prefix_ser(DBCol::FlatStateDeltaMetadata, &shard_uid.to_bytes()) + .map(|res| { + res.map(|(_, value)| value).map_err(|err| { + FlatStorageError::StorageInternalError(format!( + "failed to read delta metadata: {err}" + )) + }) + }) + .collect() + } + + pub fn get_prev_block_with_changes( + &self, + shard_uid: ShardUId, + block_hash: CryptoHash, + prev_hash: CryptoHash, + ) -> Result, FlatStorageError> { + let key = KeyForFlatStateDelta { shard_uid, block_hash: prev_hash }.to_bytes(); + let prev_delta_metadata: Option = + self.store.get_ser(DBCol::FlatStateDeltaMetadata, &key).map_err(|err| { + FlatStorageError::StorageInternalError(format!( + "failed to read delta metadata for {key:?}: {err}" + )) + })?; + + let prev_block_with_changes = match prev_delta_metadata { + None => { + // DeltaMetadata not found, which means the prev block is the flat head. + let flat_storage_status = self.get_flat_storage_status(shard_uid)?; + match flat_storage_status { + FlatStorageStatus::Ready(FlatStorageReadyStatus { flat_head }) => { + if flat_head.hash == prev_hash { + Some(BlockWithChangesInfo { hash: prev_hash, height: flat_head.height }) + } else { + tracing::error!(target: "store", ?block_hash, ?prev_hash, "Missing delta metadata"); + None + } + } + // Don't do any performance optimizations while flat storage is not ready. + _ => None, + } + } + Some(metadata) => { + // If the prev block contains `prev_block_with_changes`, then use that value. + // Otherwise reference the prev block. + Some(metadata.prev_block_with_changes.unwrap_or(BlockWithChangesInfo { + hash: metadata.block.hash, + height: metadata.block.height, + })) + } + }; + Ok(prev_block_with_changes) + } + + /// Returns iterator over entire range of flat storage entries. + /// It reads data only from `FlatState` column which represents the state at + /// flat storage head. Reads only committed changes. + pub fn iter<'a>(&'a self, shard_uid: ShardUId) -> FlatStateIterator<'a> { + self.iter_range(shard_uid, None, None) + } + + /// Returns iterator over flat storage entries for a given shard and range of state keys. + /// It reads data only from `FlatState` column which represents the state at + /// flat storage head. Reads only committed changes. + pub fn iter_range<'a>( + &'a self, + shard_uid: ShardUId, + from: Option<&[u8]>, + to: Option<&[u8]>, + ) -> FlatStateIterator<'a> { + // If left direction is unbounded, encoded `shard_uid` serves as the + // smallest possible key in DB for the shard. + let db_key_from = match from { + Some(from) => encode_flat_state_db_key(shard_uid, from), + None => shard_uid.to_bytes().to_vec(), + }; + // If right direction is unbounded, `ShardUId::next_shard_prefix` serves as + // the key which is strictly bigger than all keys in DB for this shard and + // still doesn't include keys from other shards. + let db_key_to = match to { + Some(to) => encode_flat_state_db_key(shard_uid, to), + None => ShardUId::next_shard_prefix(&shard_uid.to_bytes()).to_vec(), + }; + let iter = self + .store + .iter_range(DBCol::FlatState, Some(&db_key_from), Some(&db_key_to)) + .map(|result| match result { + Ok((key, value)) => Ok(( + decode_flat_state_db_key(&key) + .map_err(|err| { + FlatStorageError::StorageInternalError(format!( + "invalid FlatState key format: {err}" + )) + })? + .1, + FlatStateValue::try_from_slice(&value).map_err(|err| { + FlatStorageError::StorageInternalError(format!( + "invalid FlatState value format: {err}" + )) + })?, + )), + Err(err) => Err(FlatStorageError::StorageInternalError(format!( + "FlatState iterator error: {err}" + ))), + }); + Box::new(iter) + } +} + +pub struct FlatStoreUpdateAdapter<'a> { + store_update: StoreUpdateHolder<'a>, +} + +impl Into for FlatStoreUpdateAdapter<'static> { + fn into(self) -> StoreUpdate { + self.store_update.into() + } +} + +impl FlatStoreUpdateAdapter<'static> { + pub fn commit(self) -> io::Result<()> { + let store_update: StoreUpdate = self.into(); + store_update.commit() + } +} + +impl<'a> StoreUpdateAdapter for FlatStoreUpdateAdapter<'a> { + fn store_update(&mut self) -> &mut StoreUpdate { + &mut self.store_update + } +} + +impl<'a> FlatStoreUpdateAdapter<'a> { + pub fn new(store_update: &'a mut StoreUpdate) -> Self { + Self { store_update: StoreUpdateHolder::Reference(store_update) } + } + + pub fn set(&mut self, shard_uid: ShardUId, key: Vec, value: Option) { + let db_key = encode_flat_state_db_key(shard_uid, &key); + match value { + Some(value) => self + .store_update + .set_ser(DBCol::FlatState, &db_key, &value) + .expect("Borsh should not have failed here"), + None => self.store_update.delete(DBCol::FlatState, &db_key), + } + } + + pub fn remove_all(&mut self, shard_uid: ShardUId) { + self.remove_range_by_shard_uid(shard_uid, DBCol::FlatState); + } + + pub fn set_flat_storage_status(&mut self, shard_uid: ShardUId, status: FlatStorageStatus) { + self.store_update + .set_ser(DBCol::FlatStorageStatus, &shard_uid.to_bytes(), &status) + .expect("Borsh should not have failed here") + } + + pub fn set_delta(&mut self, shard_uid: ShardUId, delta: &FlatStateDelta) { + let key = + KeyForFlatStateDelta { shard_uid, block_hash: delta.metadata.block.hash }.to_bytes(); + self.store_update + .set_ser(DBCol::FlatStateChanges, &key, &delta.changes) + .expect("Borsh should not have failed here"); + self.store_update + .set_ser(DBCol::FlatStateDeltaMetadata, &key, &delta.metadata) + .expect("Borsh should not have failed here"); + } + + pub fn remove_delta(&mut self, shard_uid: ShardUId, block_hash: CryptoHash) { + let key = KeyForFlatStateDelta { shard_uid, block_hash }.to_bytes(); + self.store_update.delete(DBCol::FlatStateChanges, &key); + self.store_update.delete(DBCol::FlatStateDeltaMetadata, &key); + } + + pub fn remove_all_deltas(&mut self, shard_uid: ShardUId) { + self.remove_range_by_shard_uid(shard_uid, DBCol::FlatStateChanges); + self.remove_range_by_shard_uid(shard_uid, DBCol::FlatStateDeltaMetadata); + } + + // helper + fn remove_range_by_shard_uid(&mut self, shard_uid: ShardUId, col: DBCol) { + let key_from = shard_uid.to_bytes(); + let key_to = ShardUId::next_shard_prefix(&key_from); + self.store_update.delete_range(col, &key_from, &key_to); + } +} + +pub fn encode_flat_state_db_key(shard_uid: ShardUId, key: &[u8]) -> Vec { + let mut buffer = vec![]; + buffer.extend_from_slice(&shard_uid.to_bytes()); + buffer.extend_from_slice(key); + buffer +} + +pub fn decode_flat_state_db_key(key: &[u8]) -> io::Result<(ShardUId, Vec)> { + let (shard_uid_bytes, trie_key) = key.split_at_checked(8).ok_or_else(|| { + io::Error::other(format!("expected FlatState key length to be at least 8: {key:?}")) + })?; + let shard_uid = shard_uid_bytes.try_into().map_err(|err| { + io::Error::other(format!("failed to decode shard_uid as part of FlatState key: {err}")) + })?; + Ok((shard_uid, trie_key.to_vec())) +} + +#[cfg(test)] +mod tests { + use near_primitives::shard_layout::ShardUId; + use near_primitives::state::FlatStateValue; + + use crate::adapter::{StoreAdapter, StoreUpdateAdapter}; + use crate::test_utils::create_test_store; + + #[test] + fn iter_flat_state_entries() { + // Setup shards and store + let store = create_test_store().flat_store(); + let shard_uids = [0, 1, 2].map(|id| ShardUId { version: 0, shard_id: id }); + + for (i, shard_uid) in shard_uids.iter().enumerate() { + let mut store_update = store.store_update(); + let key: Vec = vec![0, 1, i as u8]; + let val: Vec = vec![0, 1, 2, i as u8]; + + // Add value to FlatState + store_update.flat_store_update().set( + *shard_uid, + key.clone(), + Some(FlatStateValue::inlined(&val)), + ); + + store_update.commit().unwrap(); + } + + for (i, shard_uid) in shard_uids.iter().enumerate() { + let entries: Vec<_> = store.iter(*shard_uid).collect(); + assert_eq!(entries.len(), 1); + let key: Vec = vec![0, 1, i as u8]; + let val: Vec = vec![0, 1, 2, i as u8]; + + assert_eq!(entries, vec![Ok((key, FlatStateValue::inlined(&val)))]); + } + } +} diff --git a/core/store/src/adapter/mod.rs b/core/store/src/adapter/mod.rs new file mode 100644 index 00000000000..4475d866489 --- /dev/null +++ b/core/store/src/adapter/mod.rs @@ -0,0 +1,104 @@ +pub mod flat_store; + +use std::ops::{Deref, DerefMut}; + +use crate::{Store, StoreUpdate}; + +/// Internal enum that can store either an owned StoreUpdate to a reference to StoreUpdate. +/// +/// While dealing with store update, the typical pattern is to do something like: +/// ```rust, ignore +/// let store_update: StoreUpdate = store.store_update(); +/// +/// store_update.set_foo("bar"); +/// some_large_update_function(&mut store_update); +/// +/// store_update.commit()?; +/// ``` +/// Now with StoreAdapters, store could be of any of the type of the adapters, example `FlatStoreAdapter`. +/// In that case, we expect the above pattern to look similar, however we would expect calls to +/// `flat_store.store_update()` to return type `FlatStoreUpdateAdapter` instead of `StoreUpdate`. +/// +/// At the same time we would like to allow conversion of `StoreUpdate` to `FlatStoreUpdateAdapter`. +/// +/// ```rust, ignore +/// fn update_flat_store(flat_store_update: &mut FlatStoreUpdateAdapter) { +/// ... +/// } +/// +/// // Pattern 1: reference to store_update +/// let store_update: StoreUpdate = store.store_update(); +/// update_flat_store(&mut store_update.flat_store_update()); +/// store_update.commit()?; +/// +/// // Pattern 2: owned store_update +/// let flat_store: FlatStoreAdapter = store.flat_store(); +/// let flat_store_update: FlatStoreUpdateAdapter<'static> = flat_store.store_update(); +/// update_flat_store(&mut flat_store_update); +/// flat_store_update.commit()?; +/// ``` +/// +/// To make both these patterns possible, where in pattern 1, flat_store_update holds a reference to store_update +/// and in pattern 2, flat_store_update owns the instance of store_update, we use this enum. +/// +/// Note that owned versions of flat_store_update have a static lifetime as compared to borrowed versions. +enum StoreUpdateHolder<'a> { + Reference(&'a mut StoreUpdate), + Owned(StoreUpdate), +} + +// Seamless conversion from &store_update_holder to &store_update. +impl Deref for StoreUpdateHolder<'_> { + type Target = StoreUpdate; + + fn deref(&self) -> &Self::Target { + match self { + StoreUpdateHolder::Reference(store_update) => store_update, + StoreUpdateHolder::Owned(store_update) => store_update, + } + } +} + +// Seamless conversion from &mut store_update_holder to &mut store_update. +impl DerefMut for StoreUpdateHolder<'_> { + fn deref_mut(&mut self) -> &mut Self::Target { + match self { + StoreUpdateHolder::Reference(store_update) => store_update, + StoreUpdateHolder::Owned(store_update) => store_update, + } + } +} + +// Static instances of StoreUpdateHolder always hold an owned StoreUpdate instance. +// In such case it should be possible to convert it to StoreUpdate. +impl Into for StoreUpdateHolder<'static> { + fn into(self) -> StoreUpdate { + match self { + StoreUpdateHolder::Reference(_) => panic!("converting borrowed store update"), + StoreUpdateHolder::Owned(store_update) => store_update, + } + } +} + +/// Simple adapter wrapper on top of Store to provide a more ergonomic interface for different store types. +/// We provide simple inter-convertibility between different store types like FlatStoreAdapter and TrieStoreAdapter. +pub trait StoreAdapter { + fn store(&self) -> Store; + + fn flat_store(&self) -> flat_store::FlatStoreAdapter { + flat_store::FlatStoreAdapter::new(self.store()) + } +} + +/// Simple adapter wrapper on top of StoreUpdate to provide a more ergonomic interface for +/// different store update types. +/// We provide simple inter-convertibility between different store update types like FlatStoreUpdateAdapter +/// and TrieStoreUpdateAdapter, however these are conversions by reference only. +/// The underlying StoreUpdate instance remains the same. +pub trait StoreUpdateAdapter: Sized { + fn store_update(&mut self) -> &mut StoreUpdate; + + fn flat_store_update(&mut self) -> flat_store::FlatStoreUpdateAdapter { + flat_store::FlatStoreUpdateAdapter::new(self.store_update()) + } +} diff --git a/core/store/src/flat/chunk_view.rs b/core/store/src/flat/chunk_view.rs index c7cc583dcfb..d704e2a013c 100644 --- a/core/store/src/flat/chunk_view.rs +++ b/core/store/src/flat/chunk_view.rs @@ -1,10 +1,8 @@ -use crate::flat::store_helper; +use crate::adapter::flat_store::FlatStoreAdapter; use near_primitives::hash::CryptoHash; use near_primitives::shard_layout::ShardUId; use near_primitives::state::FlatStateValue; -use crate::Store; - use super::types::FlatStateIterator; use super::FlatStorage; @@ -21,7 +19,7 @@ pub struct FlatStorageChunkView { /// Used to access flat state stored at the head of flat storage. /// It should store all trie keys and values/value refs for the state on top of /// flat_storage.head, except for delayed receipt keys. - store: Store, + store: FlatStoreAdapter, /// The block for which key-value pairs of its state will be retrieved. The flat state /// will reflect the state AFTER the block is applied. block_hash: CryptoHash, @@ -31,7 +29,7 @@ pub struct FlatStorageChunkView { } impl FlatStorageChunkView { - pub fn new(store: Store, block_hash: CryptoHash, flat_storage: FlatStorage) -> Self { + pub fn new(store: FlatStoreAdapter, block_hash: CryptoHash, flat_storage: FlatStorage) -> Self { Self { store, block_hash, flat_storage } } /// Returns value reference using raw trie key, taken from the state @@ -49,12 +47,8 @@ impl FlatStorageChunkView { self.flat_storage.contains_key(&self.block_hash, key) } - pub fn iter_flat_state_entries<'a>( - &'a self, - from: Option<&[u8]>, - to: Option<&[u8]>, - ) -> FlatStateIterator<'a> { - store_helper::iter_flat_state_entries(self.flat_storage.shard_uid(), &self.store, from, to) + pub fn iter_range(&self, from: Option<&[u8]>, to: Option<&[u8]>) -> FlatStateIterator { + self.store.iter_range(self.flat_storage.shard_uid(), from, to) } pub fn get_head_hash(&self) -> CryptoHash { diff --git a/core/store/src/flat/delta.rs b/core/store/src/flat/delta.rs index 1fb086c3501..6cc5363782f 100644 --- a/core/store/src/flat/delta.rs +++ b/core/store/src/flat/delta.rs @@ -9,8 +9,9 @@ use near_schema_checker_lib::ProtocolSchema; use std::collections::HashMap; use std::sync::Arc; -use super::{store_helper, BlockInfo}; -use crate::{CryptoHash, StoreUpdate}; +use super::BlockInfo; +use crate::adapter::flat_store::FlatStoreUpdateAdapter; +use crate::CryptoHash; #[derive(Debug)] pub struct FlatStateDelta { @@ -132,9 +133,13 @@ impl FlatStateChanges { } /// Applies delta to the flat state. - pub fn apply_to_flat_state(self, store_update: &mut StoreUpdate, shard_uid: ShardUId) { + pub fn apply_to_flat_state( + self, + store_update: &mut FlatStoreUpdateAdapter, + shard_uid: ShardUId, + ) { for (key, value) in self.0.into_iter() { - store_helper::set_flat_state_value(store_update, shard_uid, key, value); + store_update.set(shard_uid, key, value); } } } diff --git a/core/store/src/flat/manager.rs b/core/store/src/flat/manager.rs index 558f9798d5a..47168512acb 100644 --- a/core/store/src/flat/manager.rs +++ b/core/store/src/flat/manager.rs @@ -1,6 +1,5 @@ -use crate::flat::{ - store_helper, BlockInfo, FlatStorageReadyStatus, FlatStorageStatus, POISONED_LOCK_ERR, -}; +use crate::adapter::flat_store::{FlatStoreAdapter, FlatStoreUpdateAdapter}; +use crate::flat::{BlockInfo, FlatStorageReadyStatus, FlatStorageStatus, POISONED_LOCK_ERR}; use near_primitives::errors::StorageError; use near_primitives::hash::CryptoHash; use near_primitives::shard_layout::ShardUId; @@ -9,8 +8,6 @@ use std::collections::HashMap; use std::sync::{Arc, Mutex}; use tracing::debug; -use crate::{Store, StoreUpdate}; - use super::chunk_view::FlatStorageChunkView; use super::{ FlatStateChanges, FlatStateDelta, FlatStateDeltaMetadata, FlatStorage, FlatStorageError, @@ -23,7 +20,7 @@ use super::{ pub struct FlatStorageManager(Arc); pub struct FlatStorageManagerInner { - store: Store, + store: FlatStoreAdapter, /// Here we store the flat_storage per shard. The reason why we don't use the same /// FlatStorage for all shards is that there are two modes of block processing, /// normal block processing and block catchups. Since these are performed on different range @@ -36,7 +33,7 @@ pub struct FlatStorageManagerInner { } impl FlatStorageManager { - pub fn new(store: Store) -> Self { + pub fn new(store: FlatStoreAdapter) -> Self { Self(Arc::new(FlatStorageManagerInner { store, flat_storages: Default::default() })) } @@ -47,15 +44,14 @@ impl FlatStorageManager { /// an empty database. pub fn set_flat_storage_for_genesis( &self, - store_update: &mut StoreUpdate, + store_update: &mut FlatStoreUpdateAdapter, shard_uid: ShardUId, genesis_block: &CryptoHash, genesis_height: BlockHeight, ) { let flat_storages = self.0.flat_storages.lock().expect(POISONED_LOCK_ERR); assert!(!flat_storages.contains_key(&shard_uid)); - store_helper::set_flat_storage_status( - store_update, + store_update.set_flat_storage_status( shard_uid, FlatStorageStatus::Ready(FlatStorageReadyStatus { flat_head: BlockInfo::genesis(*genesis_block, genesis_height), @@ -136,18 +132,15 @@ impl FlatStorageManager { height: BlockHeight, shard_uid: ShardUId, state_changes: &[RawStateChangesWithTrieKey], - ) -> Result { + ) -> Result, StorageError> { let prev_block_with_changes = if state_changes.is_empty() { // The current block has no flat state changes. // Find the last block with flat state changes by looking it up in // the prev block. - store_helper::get_prev_block_with_changes( - &self.0.store, - shard_uid, - block_hash, - prev_hash, - ) - .map_err(|e| StorageError::from(e))? + self.0 + .store + .get_prev_block_with_changes(shard_uid, block_hash, prev_hash) + .map_err(|e| StorageError::from(e))? } else { // The current block has flat state changes. None @@ -167,8 +160,8 @@ impl FlatStorageManager { } else { // Otherwise, save delta to disk so it will be used for flat storage creation later. debug!(target: "store", %shard_uid, "Add delta for flat storage creation"); - let mut store_update: StoreUpdate = self.0.store.store_update(); - store_helper::set_delta(&mut store_update, shard_uid, &delta); + let mut store_update = self.0.store.store_update(); + store_update.set_delta(shard_uid, &delta); store_update }; @@ -176,8 +169,7 @@ impl FlatStorageManager { } pub fn get_flat_storage_status(&self, shard_uid: ShardUId) -> FlatStorageStatus { - store_helper::get_flat_storage_status(&self.0.store, shard_uid) - .expect("failed to read flat storage status") + self.0.store.get_flat_storage_status(shard_uid).expect("failed to read flat storage status") } /// Creates `FlatStorageChunkView` to access state for `shard_uid` and block `block_hash`. @@ -216,7 +208,7 @@ impl FlatStorageManager { pub fn remove_flat_storage_for_shard( &self, shard_uid: ShardUId, - store_update: &mut StoreUpdate, + store_update: &mut FlatStoreUpdateAdapter, ) -> Result { let mut flat_storages = self.0.flat_storages.lock().expect(POISONED_LOCK_ERR); if let Some(flat_store) = flat_storages.remove(&shard_uid) { diff --git a/core/store/src/flat/mod.rs b/core/store/src/flat/mod.rs index ff714ace1e6..1e0b1d967ab 100644 --- a/core/store/src/flat/mod.rs +++ b/core/store/src/flat/mod.rs @@ -30,7 +30,6 @@ pub mod delta; mod manager; mod metrics; mod storage; -pub mod store_helper; #[cfg(test)] pub mod test_utils; mod types; diff --git a/core/store/src/flat/storage.rs b/core/store/src/flat/storage.rs index e16d4a6113a..172928d35eb 100644 --- a/core/store/src/flat/storage.rs +++ b/core/store/src/flat/storage.rs @@ -8,14 +8,13 @@ use near_primitives::state::FlatStateValue; use near_primitives::types::BlockHeight; use tracing::{debug, warn}; +use crate::adapter::flat_store::{FlatStoreAdapter, FlatStoreUpdateAdapter}; use crate::flat::delta::{BlockWithChangesInfo, CachedFlatStateChanges}; use crate::flat::BlockInfo; use crate::flat::{FlatStorageReadyStatus, FlatStorageStatus}; -use crate::{Store, StoreUpdate}; use super::delta::{CachedFlatStateDelta, FlatStateDelta}; use super::metrics::FlatStorageMetrics; -use super::store_helper; use super::types::FlatStorageError; /// FlatStorage stores information on which blocks flat storage current supports key lookups on. @@ -35,7 +34,7 @@ pub struct FlatStorage(pub(crate) Arc>); // This makes sure that when a node restarts, FlatStorage can load changes for all blocks // after the `flat_head` block successfully. pub(crate) struct FlatStorageInner { - store: Store, + store: FlatStoreAdapter, /// UId of the shard which state is accessed by this flat storage. shard_uid: ShardUId, /// The block for which we store the key value pairs of the state after it is applied. @@ -235,9 +234,9 @@ impl FlatStorage { /// Create a new FlatStorage for `shard_uid` using flat head if it is stored on storage. /// We also load all blocks with height between flat head to `latest_block_height` /// including those on forks into the returned FlatStorage. - pub fn new(store: Store, shard_uid: ShardUId) -> Result { + pub fn new(store: FlatStoreAdapter, shard_uid: ShardUId) -> Result { let shard_id = shard_uid.shard_id(); - let flat_head = match store_helper::get_flat_storage_status(&store, shard_uid) { + let flat_head = match store.get_flat_storage_status(shard_uid) { Ok(FlatStorageStatus::Ready(ready_status)) => ready_status.flat_head, status => { return Err(StorageError::StorageInconsistentState(format!( @@ -248,15 +247,15 @@ impl FlatStorage { let metrics = FlatStorageMetrics::new(shard_uid); metrics.set_flat_head_height(flat_head.height); - let deltas_metadata = store_helper::get_all_deltas_metadata(&store, shard_uid) - .unwrap_or_else(|_| { - panic!("Cannot read flat state deltas metadata for shard {shard_id} from storage") - }); + let deltas_metadata = store.get_all_deltas_metadata(shard_uid).unwrap_or_else(|_| { + panic!("Cannot read flat state deltas metadata for shard {shard_id} from storage") + }); let mut deltas = HashMap::new(); for delta_metadata in deltas_metadata { let block_hash = delta_metadata.block.hash; let changes: CachedFlatStateChanges = if delta_metadata.has_changes() { - store_helper::get_delta_changes(&store, shard_uid, block_hash) + store + .get_delta(shard_uid, block_hash) .expect("failed to read flat state delta changes") .unwrap_or_else(|| { panic!("cannot find block delta for block {block_hash:?} shard {shard_id}") @@ -301,8 +300,7 @@ impl FlatStorage { key: &[u8], ) -> Result, crate::StorageError> { let guard = self.0.read().expect(super::POISONED_LOCK_ERR); - let blocks_to_head = - guard.get_blocks_to_head(block_hash).map_err(|e| StorageError::from(e))?; + let blocks_to_head = guard.get_blocks_to_head(block_hash)?; for block_hash in blocks_to_head.iter() { // If we found a key in changes, we can return a value because it is the most recent key update. let changes = guard.get_block_changes(block_hash)?; @@ -314,7 +312,7 @@ impl FlatStorage { }; } - let value = store_helper::get_flat_state_value(&guard.store, guard.shard_uid, key)?; + let value = guard.store.get(guard.shard_uid, key)?; Ok(value) } @@ -336,10 +334,7 @@ impl FlatStorage { }; } - let db_key = store_helper::encode_flat_state_db_key(guard.shard_uid, key); - Ok(guard.store.exists(crate::DBCol::FlatState, &db_key).map_err(|err| { - FlatStorageError::StorageInternalError(format!("failed to read FlatState value: {err}")) - })?) + Ok(guard.store.exists(guard.shard_uid, key)?) } // TODO(#11601): Direct call is DEPRECATED, consider removing non-strict mode. @@ -389,10 +384,12 @@ impl FlatStorage { let blocks = guard.get_blocks_to_head(&new_head)?; for block_hash in blocks.into_iter().rev() { - let mut store_update = StoreUpdate::new(guard.store.storage.clone()); + let mut store_update = guard.store.store_update(); // Delta must exist because flat storage is locked and we could retrieve // path from old to new head. Otherwise we return internal error. - let changes = store_helper::get_delta_changes(&guard.store, shard_uid, block_hash)? + let changes = guard + .store + .get_delta(shard_uid, block_hash)? .ok_or_else(|| missing_delta_error(&block_hash))?; changes.apply_to_flat_state(&mut store_update, guard.shard_uid); let metadata = guard @@ -402,8 +399,7 @@ impl FlatStorage { .metadata; let block = metadata.block; let block_height = block.height; - store_helper::set_flat_storage_status( - &mut store_update, + store_update.set_flat_storage_status( shard_uid, FlatStorageStatus::Ready(FlatStorageReadyStatus { flat_head: block }), ); @@ -425,7 +421,7 @@ impl FlatStorage { .cloned() .collect(); for hash in hashes_to_remove { - store_helper::remove_delta(&mut store_update, shard_uid, hash); + store_update.remove_delta(shard_uid, hash); guard.deltas.remove(&hash); } @@ -450,7 +446,10 @@ impl FlatStorage { /// committed to disk in one db transaction together with the rest of changes caused by block, /// in case the node stopped or crashed in between and a block is on chain but its delta is not /// stored or vice versa. - pub fn add_delta(&self, delta: FlatStateDelta) -> Result { + pub fn add_delta( + &self, + delta: FlatStateDelta, + ) -> Result, FlatStorageError> { let mut guard = self.0.write().expect(super::POISONED_LOCK_ERR); let shard_uid = guard.shard_uid; let block = &delta.metadata.block; @@ -460,8 +459,8 @@ impl FlatStorage { if block.prev_hash != guard.flat_head.hash && !guard.deltas.contains_key(&block.prev_hash) { return Err(guard.create_block_not_supported_error(&block_hash)); } - let mut store_update = StoreUpdate::new(guard.store.storage.clone()); - store_helper::set_delta(&mut store_update, shard_uid, &delta); + let mut store_update = guard.store.store_update(); + store_update.set_delta(shard_uid, &delta); let cached_changes: CachedFlatStateChanges = delta.changes.into(); guard.deltas.insert( block_hash, @@ -473,12 +472,15 @@ impl FlatStorage { } /// Clears all State key-value pairs from flat storage. - pub fn clear_state(&self, store_update: &mut StoreUpdate) -> Result<(), StorageError> { + pub fn clear_state( + &self, + store_update: &mut FlatStoreUpdateAdapter, + ) -> Result<(), StorageError> { let guard = self.0.write().expect(super::POISONED_LOCK_ERR); let shard_uid = guard.shard_uid; - store_helper::remove_all_flat_state_values(store_update, shard_uid); - store_helper::remove_all_deltas(store_update, shard_uid); - store_helper::set_flat_storage_status(store_update, shard_uid, FlatStorageStatus::Empty); + store_update.remove_all(shard_uid); + store_update.remove_all_deltas(shard_uid); + store_update.set_flat_storage_status(shard_uid, FlatStorageStatus::Empty); guard.update_delta_metrics(); Ok(()) } @@ -511,6 +513,7 @@ fn missing_delta_error(block_hash: &CryptoHash) -> FlatStorageError { #[cfg(test)] mod tests { + use crate::adapter::StoreAdapter; use crate::flat::delta::{ BlockWithChangesInfo, FlatStateChanges, FlatStateDelta, FlatStateDeltaMetadata, }; @@ -518,7 +521,7 @@ mod tests { use crate::flat::storage::FlatStorageInner; use crate::flat::test_utils::MockChain; use crate::flat::types::FlatStorageError; - use crate::flat::{store_helper, FlatStorageReadyStatus, FlatStorageStatus}; + use crate::flat::{FlatStorageReadyStatus, FlatStorageStatus}; use crate::test_utils::create_test_store; use crate::StorageError; use assert_matches::assert_matches; @@ -536,10 +539,9 @@ mod tests { // Create a chain with two forks. Set flat head to be at block 0. let chain = MockChain::chain_with_two_forks(5); let shard_uid = ShardUId::single_shard(); - let store = create_test_store(); + let store = create_test_store().flat_store(); let mut store_update = store.store_update(); - store_helper::set_flat_storage_status( - &mut store_update, + store_update.set_flat_storage_status( shard_uid, FlatStorageStatus::Ready(FlatStorageReadyStatus { flat_head: chain.get_block(0) }), ); @@ -551,7 +553,7 @@ mod tests { prev_block_with_changes: None, }, }; - store_helper::set_delta(&mut store_update, shard_uid, &delta); + store_update.set_delta(shard_uid, &delta); } store_update.commit().unwrap(); @@ -584,7 +586,7 @@ mod tests { // Corrupt DB state for block 3 and try moving flat head to it. // Should result in `StorageInternalError` indicating that flat storage is broken. let mut store_update = store.store_update(); - store_helper::remove_delta(&mut store_update, shard_uid, chain.get_block_hash(3)); + store_update.remove_delta(shard_uid, chain.get_block_hash(3)); store_update.commit().unwrap(); assert_matches!( flat_storage.update_flat_head_impl(&chain.get_block_hash(3), true), @@ -615,10 +617,9 @@ mod tests { } }); let shard_uid = ShardUId::single_shard(); - let store = create_test_store(); + let store = create_test_store().flat_store(); let mut store_update = store.store_update(); - store_helper::set_flat_storage_status( - &mut store_update, + store_update.set_flat_storage_status( shard_uid, FlatStorageStatus::Ready(FlatStorageReadyStatus { flat_head: chain.get_block(0) }), ); @@ -630,7 +631,7 @@ mod tests { prev_block_with_changes: None, }, }; - store_helper::set_delta(&mut store_update, shard_uid, &delta); + store_update.set_delta(shard_uid, &delta); } store_update.commit().unwrap(); @@ -658,10 +659,9 @@ mod tests { // Create a linear chain where some heights are skipped. let chain = MockChain::linear_chain_with_skips(5); let shard_uid = ShardUId::single_shard(); - let store = create_test_store(); + let store = create_test_store().flat_store(); let mut store_update = store.store_update(); - store_helper::set_flat_storage_status( - &mut store_update, + store_update.set_flat_storage_status( shard_uid, FlatStorageStatus::Ready(FlatStorageReadyStatus { flat_head: chain.get_block(0) }), ); @@ -673,7 +673,7 @@ mod tests { prev_block_with_changes: None, }, }; - store_helper::set_delta(&mut store_update, shard_uid, &delta); + store_update.set_delta(shard_uid, &delta); } store_update.commit().unwrap(); @@ -697,19 +697,13 @@ mod tests { // Block i sets value for key &[1] to &[i]. let mut chain = MockChain::linear_chain(10); let shard_uid = ShardUId::single_shard(); - let store = create_test_store(); + let store = create_test_store().flat_store(); let mut store_update = store.store_update(); - store_helper::set_flat_storage_status( - &mut store_update, + store_update.set_flat_storage_status( shard_uid, FlatStorageStatus::Ready(FlatStorageReadyStatus { flat_head: chain.get_block(0) }), ); - store_helper::set_flat_state_value( - &mut store_update, - shard_uid, - vec![1], - Some(FlatStateValue::value_ref(&[0])), - ); + store_update.set(shard_uid, vec![1], Some(FlatStateValue::value_ref(&[0]))); for i in 1..10 { let delta = FlatStateDelta { changes: FlatStateChanges::from([( @@ -721,7 +715,7 @@ mod tests { prev_block_with_changes: None, }, }; - store_helper::set_delta(&mut store_update, shard_uid, &delta); + store_update.set_delta(shard_uid, &delta); } store_update.commit().unwrap(); @@ -770,22 +764,13 @@ mod tests { assert_eq!(chunk_view0.get_value(&[2]).unwrap(), Some(FlatStateValue::value_ref(&[1]))); assert_eq!(chunk_view1.get_value(&[1]).unwrap(), Some(FlatStateValue::value_ref(&[4]))); assert_eq!(chunk_view1.get_value(&[2]).unwrap(), None); - assert_matches!( - store_helper::get_delta_changes(&store, shard_uid, chain.get_block_hash(5)).unwrap(), - Some(_) - ); - assert_matches!( - store_helper::get_delta_changes(&store, shard_uid, chain.get_block_hash(10)).unwrap(), - Some(_) - ); + assert_matches!(store.get_delta(shard_uid, chain.get_block_hash(5)).unwrap(), Some(_)); + assert_matches!(store.get_delta(shard_uid, chain.get_block_hash(10)).unwrap(), Some(_)); // 5. Move the flat head to block 5, verify that chunk_view0 still returns the same values // and chunk_view1 returns an error. Also check that DBCol::FlatState is updated correctly flat_storage.update_flat_head_impl(&chain.get_block_hash(5), true).unwrap(); - assert_eq!( - store_helper::get_flat_state_value(&store, shard_uid, &[1]).unwrap(), - Some(FlatStateValue::value_ref(&[5])) - ); + assert_eq!(store.get(shard_uid, &[1]).unwrap(), Some(FlatStateValue::value_ref(&[5]))); let blocks = flat_storage.get_blocks_to_head(&chain.get_block_hash(10)).unwrap(); assert_eq!(blocks.len(), 5); assert_eq!(chunk_view0.get_value(&[1]).unwrap(), None); @@ -794,31 +779,19 @@ mod tests { chunk_view1.get_value(&[1]), Err(StorageError::FlatStorageBlockNotSupported(_)) ); - assert_matches!( - store_helper::get_delta_changes(&store, shard_uid, chain.get_block_hash(5)).unwrap(), - None - ); - assert_matches!( - store_helper::get_delta_changes(&store, shard_uid, chain.get_block_hash(10)).unwrap(), - Some(_) - ); + assert_matches!(store.get_delta(shard_uid, chain.get_block_hash(5)).unwrap(), None); + assert_matches!(store.get_delta(shard_uid, chain.get_block_hash(10)).unwrap(), Some(_)); // 6. Move the flat head to block 10, verify that chunk_view0 still returns the same values // Also checks that DBCol::FlatState is updated correctly. flat_storage.update_flat_head_impl(&chain.get_block_hash(10), true).unwrap(); let blocks = flat_storage.get_blocks_to_head(&chain.get_block_hash(10)).unwrap(); assert_eq!(blocks.len(), 0); - assert_eq!(store_helper::get_flat_state_value(&store, shard_uid, &[1]).unwrap(), None); - assert_eq!( - store_helper::get_flat_state_value(&store, shard_uid, &[2]).unwrap(), - Some(FlatStateValue::value_ref(&[1])) - ); + assert_eq!(store.get(shard_uid, &[1]).unwrap(), None); + assert_eq!(store.get(shard_uid, &[2]).unwrap(), Some(FlatStateValue::value_ref(&[1]))); assert_eq!(chunk_view0.get_value(&[1]).unwrap(), None); assert_eq!(chunk_view0.get_value(&[2]).unwrap(), Some(FlatStateValue::value_ref(&[1]))); - assert_matches!( - store_helper::get_delta_changes(&store, shard_uid, chain.get_block_hash(10)).unwrap(), - None - ); + assert_matches!(store.get_delta(shard_uid, chain.get_block_hash(10)).unwrap(), None); } #[test] @@ -828,19 +801,13 @@ mod tests { let num_blocks = 15; let chain = MockChain::linear_chain(num_blocks); let shard_uid = ShardUId::single_shard(); - let store = create_test_store(); + let store = create_test_store().flat_store(); let mut store_update = store.store_update(); - store_helper::set_flat_storage_status( - &mut store_update, + store_update.set_flat_storage_status( shard_uid, FlatStorageStatus::Ready(FlatStorageReadyStatus { flat_head: chain.get_block(0) }), ); - store_helper::set_flat_state_value( - &mut store_update, - shard_uid, - vec![1], - Some(FlatStateValue::value_ref(&[0])), - ); + store_update.set(shard_uid, vec![1], Some(FlatStateValue::value_ref(&[0]))); store_update.commit().unwrap(); for i in 1..num_blocks as BlockHeight { @@ -855,13 +822,13 @@ mod tests { // Simulates `Chain::save_flat_state_changes()`. let prev_block_with_changes = if changes.0.is_empty() { - store_helper::get_prev_block_with_changes( - &store, - shard_uid, - chain.get_block(i).hash, - chain.get_block(i).prev_hash, - ) - .unwrap() + store + .get_prev_block_with_changes( + shard_uid, + chain.get_block(i).hash, + chain.get_block(i).prev_hash, + ) + .unwrap() } else { None }; @@ -873,7 +840,7 @@ mod tests { }, }; tracing::info!(?i, ?delta); - store_helper::set_delta(&mut store_update, shard_uid, &delta); + store_update.set_delta(shard_uid, &delta); store_update.commit().unwrap(); } @@ -893,9 +860,7 @@ mod tests { // Don't check the first block because it may be a block with no changes. for i in 1..blocks.len() { let block_hash = blocks[i]; - let delta = store_helper::get_delta_changes(&store.clone(), shard_uid, block_hash) - .unwrap() - .unwrap(); + let delta = store.get_delta(shard_uid, block_hash).unwrap().unwrap(); assert!( !delta.0.is_empty(), "i: {i}, block_hash: {block_hash:?}, delta: {delta:?}" @@ -926,19 +891,13 @@ mod tests { let num_blocks = 10; let chain = MockChain::linear_chain(num_blocks); let shard_uid = ShardUId::single_shard(); - let store = create_test_store(); + let store = create_test_store().flat_store(); let mut store_update = store.store_update(); - store_helper::set_flat_storage_status( - &mut store_update, + store_update.set_flat_storage_status( shard_uid, FlatStorageStatus::Ready(FlatStorageReadyStatus { flat_head: chain.get_block(0) }), ); - store_helper::set_flat_state_value( - &mut store_update, - shard_uid, - vec![1], - Some(FlatStateValue::value_ref(&[0])), - ); + store_update.set(shard_uid, vec![1], Some(FlatStateValue::value_ref(&[0]))); store_update.commit().unwrap(); for i in 1..num_blocks as BlockHeight { @@ -946,13 +905,13 @@ mod tests { // No changes. let changes = FlatStateChanges::default(); // Simulates `Chain::save_flat_state_changes()`. - let prev_block_with_changes = store_helper::get_prev_block_with_changes( - &store, - shard_uid, - chain.get_block(i).hash, - chain.get_block(i).prev_hash, - ) - .unwrap(); + let prev_block_with_changes = store + .get_prev_block_with_changes( + shard_uid, + chain.get_block(i).hash, + chain.get_block(i).prev_hash, + ) + .unwrap(); let delta = FlatStateDelta { changes, metadata: FlatStateDeltaMetadata { @@ -961,7 +920,7 @@ mod tests { }, }; tracing::info!(?i, ?delta); - store_helper::set_delta(&mut store_update, shard_uid, &delta); + store_update.set_delta(shard_uid, &delta); store_update.commit().unwrap(); } @@ -990,19 +949,13 @@ mod tests { let mut rng = thread_rng(); let chain = MockChain::linear_chain(num_blocks); let shard_uid = ShardUId::single_shard(); - let store = create_test_store(); + let store = create_test_store().flat_store(); let mut store_update = store.store_update(); - store_helper::set_flat_storage_status( - &mut store_update, + store_update.set_flat_storage_status( shard_uid, FlatStorageStatus::Ready(FlatStorageReadyStatus { flat_head: chain.get_block(0) }), ); - store_helper::set_flat_state_value( - &mut store_update, - shard_uid, - vec![1], - Some(FlatStateValue::value_ref(&[0])), - ); + store_update.set(shard_uid, vec![1], Some(FlatStateValue::value_ref(&[0]))); store_update.commit().unwrap(); for i in 1..num_blocks as BlockHeight { @@ -1017,13 +970,13 @@ mod tests { // Simulates `Chain::save_flat_state_changes()`. let prev_block_with_changes = if changes.0.is_empty() { - store_helper::get_prev_block_with_changes( - &store, - shard_uid, - chain.get_block(i).hash, - chain.get_block(i).prev_hash, - ) - .unwrap() + store + .get_prev_block_with_changes( + shard_uid, + chain.get_block(i).hash, + chain.get_block(i).prev_hash, + ) + .unwrap() } else { None }; @@ -1035,7 +988,7 @@ mod tests { }, }; tracing::info!(?i, ?delta); - store_helper::set_delta(&mut store_update, shard_uid, &delta); + store_update.set_delta(shard_uid, &delta); store_update.commit().unwrap(); } @@ -1061,9 +1014,7 @@ mod tests { // Don't check the first block because it may be a block with no changes. for i in 1..blocks.len() { let block_hash = blocks[i]; - let delta = store_helper::get_delta_changes(&store.clone(), shard_uid, block_hash) - .unwrap() - .unwrap(); + let delta = store.get_delta(shard_uid, block_hash).unwrap().unwrap(); assert!( !delta.0.is_empty(), "i: {i}, block_hash: {block_hash:?}, delta: {delta:?}" @@ -1074,9 +1025,7 @@ mod tests { let flat_head_height = hashes.get(&flat_head_hash).unwrap(); let flat_head_lag = i - flat_head_height; - let delta = store_helper::get_delta_changes(&store.clone(), shard_uid, block_hash) - .unwrap() - .unwrap(); + let delta = store.get_delta(shard_uid, block_hash).unwrap().unwrap(); let has_changes = !delta.0.is_empty(); tracing::info!(?i, has_changes, ?flat_head_lag); max_lag = max_lag.max(Some(flat_head_lag)); @@ -1095,19 +1044,13 @@ mod tests { tracing::info!("Case 1"); let num_blocks = 10; let chain = MockChain::linear_chain(num_blocks); - let store = create_test_store(); + let store = create_test_store().flat_store(); let mut store_update = store.store_update(); - store_helper::set_flat_storage_status( - &mut store_update, + store_update.set_flat_storage_status( shard_uid, FlatStorageStatus::Ready(FlatStorageReadyStatus { flat_head: chain.get_block(0) }), ); - store_helper::set_flat_state_value( - &mut store_update, - shard_uid, - vec![1], - Some(FlatStateValue::value_ref(&[0])), - ); + store_update.set(shard_uid, vec![1], Some(FlatStateValue::value_ref(&[0]))); store_update.commit().unwrap(); for i in 1..num_blocks as BlockHeight { @@ -1125,7 +1068,7 @@ mod tests { }, }; tracing::info!(?i, ?delta); - store_helper::set_delta(&mut store_update, shard_uid, &delta); + store_update.set_delta(shard_uid, &delta); store_update.commit().unwrap(); } @@ -1151,19 +1094,13 @@ mod tests { tracing::info!("Case 2"); let num_blocks = 20; let chain = MockChain::linear_chain(num_blocks); - let store = create_test_store(); + let store = create_test_store().flat_store(); let mut store_update = store.store_update(); - store_helper::set_flat_storage_status( - &mut store_update, + store_update.set_flat_storage_status( shard_uid, FlatStorageStatus::Ready(FlatStorageReadyStatus { flat_head: chain.get_block(0) }), ); - store_helper::set_flat_state_value( - &mut store_update, - shard_uid, - vec![1], - Some(FlatStateValue::value_ref(&[0])), - ); + store_update.set(shard_uid, vec![1], Some(FlatStateValue::value_ref(&[0]))); store_update.commit().unwrap(); for i in 1..num_blocks as BlockHeight { @@ -1196,7 +1133,7 @@ mod tests { }, }; tracing::info!(?i, ?delta); - store_helper::set_delta(&mut store_update, shard_uid, &delta); + store_update.set_delta(shard_uid, &delta); store_update.commit().unwrap(); } @@ -1242,19 +1179,13 @@ mod tests { tracing::info!("Case 3"); let num_blocks = 20; let chain = MockChain::linear_chain(num_blocks); - let store = create_test_store(); + let store = create_test_store().flat_store(); let mut store_update = store.store_update(); - store_helper::set_flat_storage_status( - &mut store_update, + store_update.set_flat_storage_status( shard_uid, FlatStorageStatus::Ready(FlatStorageReadyStatus { flat_head: chain.get_block(0) }), ); - store_helper::set_flat_state_value( - &mut store_update, - shard_uid, - vec![1], - Some(FlatStateValue::value_ref(&[0])), - ); + store_update.set(shard_uid, vec![1], Some(FlatStateValue::value_ref(&[0]))); store_update.commit().unwrap(); for i in 1..num_blocks as BlockHeight { @@ -1287,7 +1218,7 @@ mod tests { }, }; tracing::info!(?i, ?delta); - store_helper::set_delta(&mut store_update, shard_uid, &delta); + store_update.set_delta(shard_uid, &delta); store_update.commit().unwrap(); } diff --git a/core/store/src/flat/store_helper.rs b/core/store/src/flat/store_helper.rs deleted file mode 100644 index 472c7b0816d..00000000000 --- a/core/store/src/flat/store_helper.rs +++ /dev/null @@ -1,285 +0,0 @@ -//! This file contains helper functions for accessing flat storage data in DB -//! TODO(#8577): remove this file and move functions to the corresponding structs - -use super::delta::{FlatStateDelta, FlatStateDeltaMetadata}; -use super::types::{FlatStateIterator, FlatStorageResult, FlatStorageStatus}; -use crate::flat::delta::{BlockWithChangesInfo, FlatStateChanges, KeyForFlatStateDelta}; -use crate::flat::types::FlatStorageError; -use crate::flat::FlatStorageReadyStatus; -use crate::{DBCol, Store, StoreUpdate}; -use borsh::BorshDeserialize; -use near_primitives::hash::CryptoHash; -use near_primitives::shard_layout::ShardUId; -use near_primitives::state::FlatStateValue; -use std::io; - -pub fn get_delta_changes( - store: &Store, - shard_uid: ShardUId, - block_hash: CryptoHash, -) -> FlatStorageResult> { - let key = KeyForFlatStateDelta { shard_uid, block_hash }; - store.get_ser::(DBCol::FlatStateChanges, &key.to_bytes()).map_err(|err| { - FlatStorageError::StorageInternalError(format!( - "failed to read delta changes for {key:?}: {err}" - )) - }) -} - -pub fn get_all_deltas_metadata( - store: &Store, - shard_uid: ShardUId, -) -> FlatStorageResult> { - store - .iter_prefix_ser(DBCol::FlatStateDeltaMetadata, &shard_uid.to_bytes()) - .map(|res| { - res.map(|(_, value)| value).map_err(|err| { - FlatStorageError::StorageInternalError(format!( - "failed to read delta metadata: {err}" - )) - }) - }) - .collect() -} - -/// Retrieves a row of `FlatStateDeltaMetadata` for the given key. -fn get_delta_metadata( - store: &Store, - shard_uid: ShardUId, - block_hash: CryptoHash, -) -> FlatStorageResult> { - let key = KeyForFlatStateDelta { shard_uid, block_hash }.to_bytes(); - store.get_ser(DBCol::FlatStateDeltaMetadata, &key).map_err(|err| { - FlatStorageError::StorageInternalError(format!( - "failed to read delta metadata for {key:?}: {err}" - )) - }) -} - -pub fn get_prev_block_with_changes( - store: &Store, - shard_uid: ShardUId, - block_hash: CryptoHash, - prev_hash: CryptoHash, -) -> FlatStorageResult> { - let prev_delta_metadata = get_delta_metadata(store, shard_uid, prev_hash)?; - let prev_block_with_changes = match prev_delta_metadata { - None => { - // DeltaMetadata not found, which means the prev block is the flat head. - let flat_storage_status = get_flat_storage_status(store, shard_uid)?; - match flat_storage_status { - FlatStorageStatus::Ready(FlatStorageReadyStatus { flat_head }) => { - if flat_head.hash == prev_hash { - Some(BlockWithChangesInfo { hash: prev_hash, height: flat_head.height }) - } else { - tracing::error!(target: "store", ?block_hash, ?prev_hash, "Missing delta metadata"); - None - } - } - // Don't do any performance optimizations while flat storage is not ready. - _ => None, - } - } - Some(metadata) => { - // If the prev block contains `prev_block_with_changes`, then use that value. - // Otherwise reference the prev block. - Some(metadata.prev_block_with_changes.unwrap_or(BlockWithChangesInfo { - hash: metadata.block.hash, - height: metadata.block.height, - })) - } - }; - Ok(prev_block_with_changes) -} - -pub fn set_delta(store_update: &mut StoreUpdate, shard_uid: ShardUId, delta: &FlatStateDelta) { - let key = KeyForFlatStateDelta { shard_uid, block_hash: delta.metadata.block.hash }.to_bytes(); - store_update - .set_ser(DBCol::FlatStateChanges, &key, &delta.changes) - .expect("Borsh should not have failed here"); - store_update - .set_ser(DBCol::FlatStateDeltaMetadata, &key, &delta.metadata) - .expect("Borsh should not have failed here"); -} - -pub fn remove_delta(store_update: &mut StoreUpdate, shard_uid: ShardUId, block_hash: CryptoHash) { - let key = KeyForFlatStateDelta { shard_uid, block_hash }.to_bytes(); - store_update.delete(DBCol::FlatStateChanges, &key); - store_update.delete(DBCol::FlatStateDeltaMetadata, &key); -} - -fn remove_range_by_shard_uid(store_update: &mut StoreUpdate, shard_uid: ShardUId, col: DBCol) { - let key_from = shard_uid.to_bytes(); - let key_to = ShardUId::next_shard_prefix(&key_from); - store_update.delete_range(col, &key_from, &key_to); -} - -pub fn remove_all_deltas(store_update: &mut StoreUpdate, shard_uid: ShardUId) { - remove_range_by_shard_uid(store_update, shard_uid, DBCol::FlatStateChanges); - remove_range_by_shard_uid(store_update, shard_uid, DBCol::FlatStateDeltaMetadata); -} - -pub fn remove_all_flat_state_values(store_update: &mut StoreUpdate, shard_uid: ShardUId) { - remove_range_by_shard_uid(store_update, shard_uid, DBCol::FlatState); -} - -pub fn remove_all_state_values(store_update: &mut StoreUpdate, shard_uid: ShardUId) { - remove_range_by_shard_uid(store_update, shard_uid, DBCol::State); -} - -pub fn encode_flat_state_db_key(shard_uid: ShardUId, key: &[u8]) -> Vec { - let mut buffer = vec![]; - buffer.extend_from_slice(&shard_uid.to_bytes()); - buffer.extend_from_slice(key); - buffer -} - -pub fn decode_flat_state_db_key(key: &[u8]) -> io::Result<(ShardUId, Vec)> { - let (shard_uid_bytes, trie_key) = key.split_at_checked(8).ok_or_else(|| { - io::Error::other(format!("expected FlatState key length to be at least 8: {key:?}")) - })?; - let shard_uid = shard_uid_bytes.try_into().map_err(|err| { - io::Error::other(format!("failed to decode shard_uid as part of FlatState key: {err}")) - })?; - Ok((shard_uid, trie_key.to_vec())) -} - -pub fn get_flat_state_value( - store: &Store, - shard_uid: ShardUId, - key: &[u8], -) -> FlatStorageResult> { - let db_key = encode_flat_state_db_key(shard_uid, key); - store.get_ser(DBCol::FlatState, &db_key).map_err(|err| { - FlatStorageError::StorageInternalError(format!("failed to read FlatState value: {err}")) - }) -} - -// TODO(#8577): make pub(crate) once flat storage creator is moved inside `flat` module. -pub fn set_flat_state_value( - store_update: &mut StoreUpdate, - shard_uid: ShardUId, - key: Vec, - value: Option, -) { - let db_key = encode_flat_state_db_key(shard_uid, &key); - match value { - Some(value) => store_update - .set_ser(DBCol::FlatState, &db_key, &value) - .expect("Borsh should not have failed here"), - None => store_update.delete(DBCol::FlatState, &db_key), - } -} - -pub fn get_flat_storage_status( - store: &Store, - shard_uid: ShardUId, -) -> FlatStorageResult { - store - .get_ser(DBCol::FlatStorageStatus, &shard_uid.to_bytes()) - .map(|status| status.unwrap_or(FlatStorageStatus::Empty)) - .map_err(|err| { - FlatStorageError::StorageInternalError(format!( - "failed to read flat storage status: {err}" - )) - }) -} - -pub fn set_flat_storage_status( - store_update: &mut StoreUpdate, - shard_uid: ShardUId, - status: FlatStorageStatus, -) { - store_update - .set_ser(DBCol::FlatStorageStatus, &shard_uid.to_bytes(), &status) - .expect("Borsh should not have failed here") -} - -/// Returns iterator over flat storage entries for a given shard and range of -/// state keys. `None` means that there is no bound in respective direction. -/// It reads data only from `FlatState` column which represents the state at -/// flat storage head. Reads only committed changes. -pub fn iter_flat_state_entries<'a>( - shard_uid: ShardUId, - store: &'a Store, - from: Option<&[u8]>, - to: Option<&[u8]>, -) -> FlatStateIterator<'a> { - // If left direction is unbounded, encoded `shard_uid` serves as the - // smallest possible key in DB for the shard. - let db_key_from = match from { - Some(from) => encode_flat_state_db_key(shard_uid, from), - None => shard_uid.to_bytes().to_vec(), - }; - // If right direction is unbounded, `ShardUId::next_shard_prefix` serves as - // the key which is strictly bigger than all keys in DB for this shard and - // still doesn't include keys from other shards. - let db_key_to = match to { - Some(to) => encode_flat_state_db_key(shard_uid, to), - None => ShardUId::next_shard_prefix(&shard_uid.to_bytes()).to_vec(), - }; - let iter = - store.iter_range(DBCol::FlatState, Some(&db_key_from), Some(&db_key_to)).map(|result| { - match result { - Ok((key, value)) => Ok(( - decode_flat_state_db_key(&key) - .map_err(|err| { - FlatStorageError::StorageInternalError(format!( - "invalid FlatState key format: {err}" - )) - })? - .1, - FlatStateValue::try_from_slice(&value).map_err(|err| { - FlatStorageError::StorageInternalError(format!( - "invalid FlatState value format: {err}" - )) - })?, - )), - Err(err) => Err(FlatStorageError::StorageInternalError(format!( - "FlatState iterator error: {err}" - ))), - } - }); - Box::new(iter) -} - -#[cfg(test)] -mod tests { - use crate::flat::store_helper::set_flat_state_value; - use crate::test_utils::create_test_store; - use near_primitives::shard_layout::ShardUId; - use near_primitives::state::FlatStateValue; - - #[test] - fn iter_flat_state_entries() { - // Setup shards and store - let store = create_test_store(); - let shard_uids = [0, 1, 2].map(|id| ShardUId { version: 0, shard_id: id }); - - for (i, shard_uid) in shard_uids.iter().enumerate() { - let mut store_update = store.store_update(); - let key: Vec = vec![0, 1, i as u8]; - let val: Vec = vec![0, 1, 2, i as u8]; - - // Add value to FlatState - set_flat_state_value( - &mut store_update, - *shard_uid, - key.clone(), - Some(FlatStateValue::inlined(&val)), - ); - - store_update.commit().unwrap(); - } - - for (i, shard_uid) in shard_uids.iter().enumerate() { - let entries: Vec<_> = - super::iter_flat_state_entries(*shard_uid, &store, None, None).collect(); - assert_eq!(entries.len(), 1); - let key: Vec = vec![0, 1, i as u8]; - let val: Vec = vec![0, 1, 2, i as u8]; - - assert_eq!(entries, vec![Ok((key, FlatStateValue::inlined(&val)))]); - } - } -} diff --git a/core/store/src/genesis/initialization.rs b/core/store/src/genesis/initialization.rs index ed35943ce95..fcf77179a74 100644 --- a/core/store/src/genesis/initialization.rs +++ b/core/store/src/genesis/initialization.rs @@ -16,9 +16,9 @@ use near_primitives::{ use tracing::{error, info, warn}; use crate::{ - flat::FlatStorageManager, genesis::GenesisStateApplier, get_genesis_hash, - get_genesis_state_roots, set_genesis_hash, set_genesis_state_roots, ShardTries, - StateSnapshotConfig, Store, TrieConfig, + adapter::StoreAdapter, flat::FlatStorageManager, genesis::GenesisStateApplier, + get_genesis_hash, get_genesis_state_roots, set_genesis_hash, set_genesis_state_roots, + ShardTries, StateSnapshotConfig, Store, TrieConfig, }; const STATE_DUMP_FILE: &str = "state_dump"; @@ -132,7 +132,7 @@ fn genesis_state_from_genesis( store.clone(), TrieConfig::default(), &shard_uids, - FlatStorageManager::new(store), + FlatStorageManager::new(store.flat_store()), StateSnapshotConfig::default(), ); diff --git a/core/store/src/genesis/state_applier.rs b/core/store/src/genesis/state_applier.rs index e0c4a187883..349c19b432c 100644 --- a/core/store/src/genesis/state_applier.rs +++ b/core/store/src/genesis/state_applier.rs @@ -1,3 +1,4 @@ +use crate::adapter::StoreUpdateAdapter; use crate::flat::FlatStateChanges; use crate::{ get_account, has_received_data, set, set_access_key, set_account, set_code, @@ -143,7 +144,7 @@ impl<'a> AutoFlushingTrieUpdate<'a> { let mut store_update = self.tries.store_update(); *state_root = self.tries.apply_all(&trie_changes, self.shard_uid, &mut store_update); FlatStateChanges::from_state_changes(&state_changes) - .apply_to_flat_state(&mut store_update, self.shard_uid); + .apply_to_flat_state(&mut store_update.flat_store_update(), self.shard_uid); store_update.commit().expect("Store update failed on genesis initialization"); *state_update = Some(self.tries.new_trie_update(self.shard_uid, *state_root)); *changes = 0; diff --git a/core/store/src/lib.rs b/core/store/src/lib.rs index d03e710d8c6..cec9740cfdd 100644 --- a/core/store/src/lib.rs +++ b/core/store/src/lib.rs @@ -12,6 +12,7 @@ pub use crate::trie::{ TrieChanges, TrieConfig, TrieDBStorage, TrieStorage, WrappedTrieChanges, STATE_SNAPSHOT_COLUMNS, }; +use adapter::{StoreAdapter, StoreUpdateAdapter}; use borsh::{BorshDeserialize, BorshSerialize}; pub use columns::DBCol; use db::{SplitDB, GENESIS_CONGESTION_INFO_KEY}; @@ -43,6 +44,7 @@ use std::sync::LazyLock; use std::{fmt, io}; use strum; +pub mod adapter; pub mod cold_storage; mod columns; pub mod config; @@ -112,6 +114,12 @@ pub struct Store { storage: Arc, } +impl StoreAdapter for Store { + fn store(&self) -> Store { + self.clone() + } +} + impl NodeStorage { /// Initialises a new opener with given home directory and hot and cold /// store config. @@ -450,6 +458,12 @@ pub struct StoreUpdate { storage: Arc, } +impl StoreUpdateAdapter for StoreUpdate { + fn store_update(&mut self) -> &mut StoreUpdate { + self + } +} + impl StoreUpdate { const ONE: std::num::NonZeroU32 = match std::num::NonZeroU32::new(1) { Some(num) => num, diff --git a/core/store/src/test_utils.rs b/core/store/src/test_utils.rs index 1c66a7b29a2..577294c66c5 100644 --- a/core/store/src/test_utils.rs +++ b/core/store/src/test_utils.rs @@ -1,7 +1,6 @@ +use crate::adapter::{StoreAdapter, StoreUpdateAdapter}; use crate::db::TestDB; -use crate::flat::{ - store_helper, BlockInfo, FlatStorageManager, FlatStorageReadyStatus, FlatStorageStatus, -}; +use crate::flat::{BlockInfo, FlatStorageManager, FlatStorageReadyStatus, FlatStorageStatus}; use crate::metadata::{DbKind, DbVersion, DB_VERSION}; use crate::{ get, get_delayed_receipt_indices, get_promise_yield_indices, DBCol, NodeStorage, ShardTries, @@ -123,7 +122,7 @@ impl TestTriesBuilder { let shard_uids = (0..self.num_shards) .map(|shard_id| ShardUId { shard_id: shard_id as u32, version: self.shard_version }) .collect::>(); - let flat_storage_manager = FlatStorageManager::new(store.clone()); + let flat_storage_manager = FlatStorageManager::new(store.flat_store()); let tries = ShardTries::new( store.clone(), TrieConfig { @@ -141,8 +140,7 @@ impl TestTriesBuilder { version: self.shard_version, shard_id: shard_id.try_into().unwrap(), }; - store_helper::set_flat_storage_status( - &mut store_update, + store_update.flat_store_update().set_flat_storage_status( shard_uid, FlatStorageStatus::Ready(FlatStorageReadyStatus { flat_head: BlockInfo::genesis(CryptoHash::default(), 0), @@ -220,17 +218,15 @@ pub fn test_populate_flat_storage( prev_block_hash: &CryptoHash, changes: &Vec<(Vec, Option>)>, ) { - let mut store_update = tries.store_update(); - store_helper::set_flat_storage_status( - &mut store_update, + let mut store_update = tries.get_store().flat_store().store_update(); + store_update.set_flat_storage_status( shard_uid, crate::flat::FlatStorageStatus::Ready(FlatStorageReadyStatus { flat_head: BlockInfo { hash: *block_hash, prev_hash: *prev_block_hash, height: 1 }, }), ); for (key, value) in changes { - store_helper::set_flat_state_value( - &mut store_update, + store_update.set( shard_uid, key.clone(), value.as_ref().map(|value| FlatStateValue::on_disk(value)), diff --git a/core/store/src/trie/from_flat.rs b/core/store/src/trie/from_flat.rs index 71cf52684ea..b476ee25492 100644 --- a/core/store/src/trie/from_flat.rs +++ b/core/store/src/trie/from_flat.rs @@ -1,4 +1,5 @@ -use crate::flat::{store_helper, FlatStorageError, FlatStorageManager}; +use crate::adapter::StoreAdapter; +use crate::flat::{FlatStorageError, FlatStorageManager}; use crate::{ShardTries, StateSnapshotConfig, Store, Trie, TrieConfig, TrieDBStorage, TrieStorage}; use near_primitives::{shard_layout::ShardUId, state::FlatStateValue}; use std::time::Instant; @@ -24,15 +25,15 @@ pub fn construct_trie_from_flat(store: Store, write_store: Store, shard_uid: Sha (key, value) }; - let mut iter = store_helper::iter_flat_state_entries(shard_uid, &store, None, None) - .map(flat_state_to_trie_kv); + let store = store.flat_store(); + let mut iter = store.iter(shard_uid).map(flat_state_to_trie_kv); // new ShardTries for write storage location let tries = ShardTries::new( write_store.clone(), TrieConfig::default(), &[shard_uid], - FlatStorageManager::new(write_store), + FlatStorageManager::new(write_store.flat_store()), StateSnapshotConfig::default(), ); let mut trie_root = Trie::EMPTY_ROOT; diff --git a/core/store/src/trie/mem/loading.rs b/core/store/src/trie/mem/loading.rs index b98e3b15d5c..727fef5a6de 100644 --- a/core/store/src/trie/mem/loading.rs +++ b/core/store/src/trie/mem/loading.rs @@ -1,9 +1,8 @@ use super::arena::single_thread::STArena; use super::mem_tries::MemTries; use super::node::MemTrieNodeId; -use crate::flat::store_helper::{ - decode_flat_state_db_key, get_all_deltas_metadata, get_delta_changes, get_flat_storage_status, -}; +use crate::adapter::flat_store::decode_flat_state_db_key; +use crate::adapter::StoreAdapter; use crate::flat::{FlatStorageError, FlatStorageStatus}; use crate::trie::mem::arena::Arena; use crate::trie::mem::construction::TrieConstructor; @@ -129,7 +128,8 @@ pub fn load_trie_from_flat_state_and_delta( parallelize: bool, ) -> Result { debug!(target: "memtrie", %shard_uid, "Loading base trie from flat state..."); - let flat_head = match get_flat_storage_status(&store, shard_uid)? { + let flat_store = store.flat_store(); + let flat_head = match flat_store.get_flat_storage_status(shard_uid)? { FlatStorageStatus::Ready(status) => status.flat_head, other => { return Err(StorageError::MemTrieLoadingError(format!( @@ -152,13 +152,13 @@ pub fn load_trie_from_flat_state_and_delta( // We load the deltas in order of height, so that we always have the previous state root // already loaded. let mut sorted_deltas: BTreeSet<(BlockHeight, CryptoHash, CryptoHash)> = Default::default(); - for delta in get_all_deltas_metadata(&store, shard_uid).unwrap() { + for delta in flat_store.get_all_deltas_metadata(shard_uid).unwrap() { sorted_deltas.insert((delta.block.height, delta.block.hash, delta.block.prev_hash)); } debug!(target: "memtrie", %shard_uid, "{} deltas to apply", sorted_deltas.len()); for (height, hash, prev_hash) in sorted_deltas.into_iter() { - let delta = get_delta_changes(&store, shard_uid, hash).unwrap(); + let delta = flat_store.get_delta(shard_uid, hash).unwrap(); if let Some(changes) = delta { let old_state_root = get_state_root(store, prev_hash, shard_uid)?; let new_state_root = get_state_root(store, hash, shard_uid)?; @@ -187,8 +187,9 @@ pub fn load_trie_from_flat_state_and_delta( #[cfg(test)] mod tests { use super::load_trie_from_flat_state_and_delta; + use crate::adapter::StoreAdapter; use crate::flat::test_utils::MockChain; - use crate::flat::{store_helper, BlockInfo, FlatStorageReadyStatus, FlatStorageStatus}; + use crate::flat::{BlockInfo, FlatStorageReadyStatus, FlatStorageStatus}; use crate::test_utils::{ create_test_store, simplify_changes, test_populate_flat_storage, test_populate_trie, TestTriesBuilder, @@ -392,18 +393,12 @@ mod tests { let shard_uid = ShardUId { version: 1, shard_id: 1 }; // Populate the initial flat storage state at block 0. - let mut store_update = shard_tries.store_update(); - store_helper::set_flat_storage_status( - &mut store_update, + let mut store_update = shard_tries.get_store().flat_store().store_update(); + store_update.set_flat_storage_status( shard_uid, FlatStorageStatus::Ready(FlatStorageReadyStatus { flat_head: chain.get_block(0) }), ); - store_helper::set_flat_state_value( - &mut store_update, - shard_uid, - test_key.to_vec(), - Some(FlatStateValue::inlined(&test_val0)), - ); + store_update.set(shard_uid, test_key.to_vec(), Some(FlatStateValue::inlined(&test_val0))); store_update.commit().unwrap(); // Populate the initial trie at block 0 too. @@ -511,7 +506,8 @@ mod tests { shard_uid, &state_changes, ) - .unwrap(), + .unwrap() + .into(), ); store_update.commit().unwrap(); diff --git a/core/store/src/trie/resharding_v2.rs b/core/store/src/trie/resharding_v2.rs index e0455893768..d02055001cb 100644 --- a/core/store/src/trie/resharding_v2.rs +++ b/core/store/src/trie/resharding_v2.rs @@ -1,3 +1,4 @@ +use crate::adapter::StoreUpdateAdapter; use crate::flat::FlatStateChanges; use crate::{ get, get_delayed_receipt_indices, get_promise_yield_indices, set, ShardTries, StoreUpdate, @@ -72,7 +73,7 @@ impl ShardTries { let mut store_update = self.store_update(); for (shard_uid, changes) in changes_by_shard { FlatStateChanges::from_raw_key_value(&changes) - .apply_to_flat_state(&mut store_update, shard_uid); + .apply_to_flat_state(&mut store_update.flat_store_update(), shard_uid); // Here we assume that state_roots contains shard_uid, the caller of this method will guarantee that. let trie_changes = self.get_trie_for_shard(shard_uid, state_roots[&shard_uid]).update(changes)?; @@ -136,7 +137,7 @@ impl ShardTries { let (_, trie_changes, state_changes) = update.finalize()?; let state_root = self.apply_all(&trie_changes, shard_uid, &mut store_update); FlatStateChanges::from_state_changes(&state_changes) - .apply_to_flat_state(&mut store_update, shard_uid); + .apply_to_flat_state(&mut store_update.flat_store_update(), shard_uid); new_state_roots.insert(shard_uid, state_root); } Ok((store_update, new_state_roots)) diff --git a/core/store/src/trie/shard_tries.rs b/core/store/src/trie/shard_tries.rs index 8b83b6d02ff..f6990b191ea 100644 --- a/core/store/src/trie/shard_tries.rs +++ b/core/store/src/trie/shard_tries.rs @@ -1,7 +1,6 @@ use super::mem::mem_tries::MemTries; use super::state_snapshot::{StateSnapshot, StateSnapshotConfig}; use super::TrieRefcountSubtraction; -use crate::flat::store_helper::remove_all_state_values; use crate::flat::{FlatStorageManager, FlatStorageStatus}; use crate::trie::config::TrieConfig; use crate::trie::mem::loading::load_trie_from_flat_state_and_delta; @@ -404,7 +403,13 @@ impl ShardTries { // Clear both caches and remove state values from store let _cache = self.0.caches.lock().expect(POISONED_LOCK_ERR).remove(&shard_uid); let _view_cache = self.0.view_caches.lock().expect(POISONED_LOCK_ERR).remove(&shard_uid); - remove_all_state_values(store_update, shard_uid); + Self::remove_all_state_values(store_update, shard_uid); + } + + fn remove_all_state_values(store_update: &mut StoreUpdate, shard_uid: ShardUId) { + let key_from = shard_uid.to_bytes(); + let key_to = ShardUId::next_shard_prefix(&key_from); + store_update.delete_range(DBCol::State, &key_from, &key_to); } /// Retains in-memory tries for given shards, i.e. unload tries from memory for shards that are NOT @@ -743,6 +748,7 @@ impl KeyForStateChanges { #[cfg(test)] mod test { + use crate::adapter::StoreAdapter; use crate::{ config::TrieCacheConfig, test_utils::create_test_store, trie::DEFAULT_SHARD_CACHE_TOTAL_SIZE_LIMIT, TrieConfig, @@ -768,7 +774,7 @@ mod test { store.clone(), trie_config, &shard_uids, - FlatStorageManager::new(store), + FlatStorageManager::new(store.flat_store()), StateSnapshotConfig::default(), ) } @@ -886,7 +892,7 @@ mod test { store.clone(), trie_config, &shard_uids, - FlatStorageManager::new(store), + FlatStorageManager::new(store.flat_store()), StateSnapshotConfig::default(), ); diff --git a/core/store/src/trie/state_parts.rs b/core/store/src/trie/state_parts.rs index 7a0ee49ad20..23c89b19b6f 100644 --- a/core/store/src/trie/state_parts.rs +++ b/core/store/src/trie/state_parts.rs @@ -119,8 +119,7 @@ impl Trie { Some(NibbleSlice::nibbles_to_bytes(&nibbles_end)) }; - Ok(flat_storage_chunk_view - .iter_flat_state_entries(key_begin.as_deref(), key_end.as_deref())) + Ok(flat_storage_chunk_view.iter_range(key_begin.as_deref(), key_end.as_deref())) } /// Determines the boundaries of a state part by accessing the Trie (i.e. State column). @@ -518,6 +517,7 @@ mod tests { use near_primitives::hash::{hash, CryptoHash}; + use crate::adapter::StoreUpdateAdapter; use crate::test_utils::{gen_changes, test_populate_trie, TestTriesBuilder}; use crate::trie::iterator::CrumbStatus; use crate::trie::{ @@ -1205,7 +1205,7 @@ mod tests { state_items.into_iter().map(|(k, v)| (k, Some(FlatStateValue::inlined(&v)))); let delta = FlatStateChanges::from(changes_for_delta); let mut store_update = tries.store_update(); - delta.apply_to_flat_state(&mut store_update, shard_uid); + delta.apply_to_flat_state(&mut store_update.flat_store_update(), shard_uid); store_update.commit().unwrap(); let (partial_state, nibbles_begin, nibbles_end) = @@ -1253,7 +1253,7 @@ mod tests { // is invalid. let mut store_update = tries.store_update(); let delta = FlatStateChanges::from(vec![(b"ba".to_vec(), None)]); - delta.apply_to_flat_state(&mut store_update, shard_uid); + delta.apply_to_flat_state(&mut store_update.flat_store_update(), shard_uid); store_update.commit().unwrap(); assert_matches!( diff --git a/core/store/src/trie/state_snapshot.rs b/core/store/src/trie/state_snapshot.rs index 43e2c1f75d9..d1874af6724 100644 --- a/core/store/src/trie/state_snapshot.rs +++ b/core/store/src/trie/state_snapshot.rs @@ -1,3 +1,4 @@ +use crate::adapter::StoreAdapter; use crate::config::StateSnapshotType; use crate::db::STATE_SNAPSHOT_KEY; use crate::flat::{FlatStorageManager, FlatStorageStatus}; @@ -218,7 +219,7 @@ impl ShardTries { // It is fine to create a separate FlatStorageManager, because // it is used only for reading flat storage in the snapshot a // doesn't introduce memory overhead. - let flat_storage_manager = FlatStorageManager::new(store.clone()); + let flat_storage_manager = FlatStorageManager::new(store.flat_store()); *state_snapshot_lock = Some(StateSnapshot::new( store, prev_block_hash, @@ -361,7 +362,7 @@ impl ShardTries { let opener = NodeStorage::opener(&snapshot_path, false, &store_config, None); let storage = opener.open_in_mode(Mode::ReadOnly)?; let store = storage.get_hot_store(); - let flat_storage_manager = FlatStorageManager::new(store.clone()); + let flat_storage_manager = FlatStorageManager::new(store.flat_store()); let shard_uids = get_shard_uids_fn(snapshot_hash)?; let mut guard = self.state_snapshot().write().unwrap(); diff --git a/genesis-tools/genesis-populate/src/lib.rs b/genesis-tools/genesis-populate/src/lib.rs index 72b1bf3e3aa..905dc8fa469 100644 --- a/genesis-tools/genesis-populate/src/lib.rs +++ b/genesis-tools/genesis-populate/src/lib.rs @@ -21,6 +21,7 @@ use near_primitives::types::chunk_extra::ChunkExtra; use near_primitives::types::{AccountId, Balance, EpochId, ShardId, StateChangeCause, StateRoot}; use near_primitives::utils::to_timestamp; use near_primitives::version::ProtocolFeature; +use near_store::adapter::StoreUpdateAdapter; use near_store::genesis::{compute_storage_usage, initialize_genesis_state}; use near_store::{ get_account, get_genesis_state_roots, set_access_key, set_account, set_code, Store, TrieUpdate, @@ -203,7 +204,7 @@ impl GenesisBuilder { let mut store_update = tries.store_update(); let root = tries.apply_all(&trie_changes, shard_uid, &mut store_update); near_store::flat::FlatStateChanges::from_state_changes(&state_changes) - .apply_to_flat_state(&mut store_update, shard_uid); + .apply_to_flat_state(&mut store_update.flat_store_update(), shard_uid); store_update.commit()?; self.roots.insert(shard_idx, root); diff --git a/integration-tests/src/tests/client/flat_storage.rs b/integration-tests/src/tests/client/flat_storage.rs index fd3d30e114a..1e63f0e357c 100644 --- a/integration-tests/src/tests/client/flat_storage.rs +++ b/integration-tests/src/tests/client/flat_storage.rs @@ -13,9 +13,10 @@ use near_primitives::transaction::SignedTransaction; use near_primitives::trie_key::TrieKey; use near_primitives::types::AccountId; use near_primitives_core::types::BlockHeight; +use near_store::adapter::StoreAdapter; use near_store::flat::{ - store_helper, FetchingStateStatus, FlatStorageCreationStatus, FlatStorageManager, - FlatStorageReadyStatus, FlatStorageStatus, NUM_PARTS_IN_ONE_STEP, + FetchingStateStatus, FlatStorageCreationStatus, FlatStorageManager, FlatStorageReadyStatus, + FlatStorageStatus, NUM_PARTS_IN_ONE_STEP, }; use near_store::test_utils::create_test_store; use near_store::trie::TrieNodesCount; @@ -46,16 +47,16 @@ fn wait_for_flat_storage_creation( shard_uid: ShardUId, produce_blocks: bool, ) -> BlockHeight { - let store = env.clients[0].runtime_adapter.store().clone(); + let store = env.clients[0].runtime_adapter.store().flat_store(); let mut next_height = start_height; - let mut prev_status = store_helper::get_flat_storage_status(&store, shard_uid).unwrap(); + let mut prev_status = store.get_flat_storage_status(shard_uid).unwrap(); while next_height < start_height + CREATION_TIMEOUT { if produce_blocks { env.produce_block(0, next_height); } env.clients[0].run_flat_storage_creation_step().unwrap(); - let status = store_helper::get_flat_storage_status(&store, shard_uid).unwrap(); + let status = store.get_flat_storage_status(shard_uid).unwrap(); // Check validity of state transition for flat storage creation. match &prev_status { FlatStorageStatus::Empty => assert_matches!( @@ -109,8 +110,7 @@ fn wait_for_flat_storage_creation( // We don't expect any forks in the chain after flat storage head, so the number of // deltas stored on DB should be exactly 2, as there are only 2 blocks after // the final block. - let deltas_in_metadata = - store_helper::get_all_deltas_metadata(&store, shard_uid).unwrap().len() as u64; + let deltas_in_metadata = store.get_all_deltas_metadata(shard_uid).unwrap().len() as u64; assert_eq!(deltas_in_metadata, 2); next_height @@ -122,11 +122,11 @@ fn test_flat_storage_creation_sanity() { init_test_logger(); let genesis = Genesis::test(vec!["test0".parse().unwrap()], 1); let shard_uid = genesis.config.shard_layout.shard_uids().next().unwrap(); - let store = create_test_store(); + let store = create_test_store().flat_store(); // Process some blocks with flat storage. Then remove flat storage data from disk. { - let mut env = setup_env(&genesis, store.clone()); + let mut env = setup_env(&genesis, store.store()); let signer = InMemorySigner::from_seed("test0".parse().unwrap(), KeyType::ED25519, "test0").into(); let genesis_hash = *env.clients[0].chain.genesis().hash(); @@ -149,7 +149,7 @@ fn test_flat_storage_creation_sanity() { let flat_head_height = START_HEIGHT - 4; let expected_flat_storage_head = env.clients[0].chain.get_block_hash_by_height(flat_head_height).unwrap(); - let status = store_helper::get_flat_storage_status(&store, shard_uid); + let status = store.get_flat_storage_status(shard_uid); if let Ok(FlatStorageStatus::Ready(FlatStorageReadyStatus { flat_head })) = status { assert_eq!(flat_head.hash, expected_flat_storage_head); assert_eq!(flat_head.height, flat_head_height); @@ -160,14 +160,14 @@ fn test_flat_storage_creation_sanity() { // Deltas for blocks until `flat_head_height` should not exist. for height in 0..=flat_head_height { let block_hash = env.clients[0].chain.get_block_hash_by_height(height).unwrap(); - assert_eq!(store_helper::get_delta_changes(&store, shard_uid, block_hash), Ok(None)); + assert_eq!(store.get_delta(shard_uid, block_hash), Ok(None)); } // Deltas for blocks until `START_HEIGHT` should still exist, // because they come after flat storage head. for height in flat_head_height + 1..START_HEIGHT { let block_hash = env.clients[0].chain.get_block_hash_by_height(height).unwrap(); assert_matches!( - store_helper::get_delta_changes(&store, shard_uid, block_hash), + store.get_delta(shard_uid, block_hash), Ok(Some(_)), "height: {height}" ); @@ -182,21 +182,18 @@ fn test_flat_storage_creation_sanity() { // Create new chain and runtime using the same store. It should produce next blocks normally, but now it should // think that flat storage does not exist and background creation should be initiated. - let mut env = setup_env(&genesis, store.clone()); + let mut env = setup_env(&genesis, store.store()); for height in START_HEIGHT..START_HEIGHT + 2 { env.produce_block(0, height); } assert!(get_flat_storage_manager(&env).get_flat_storage_for_shard(shard_uid).is_none()); - assert_eq!( - store_helper::get_flat_storage_status(&store, shard_uid), - Ok(FlatStorageStatus::Empty) - ); + assert_eq!(store.get_flat_storage_status(shard_uid), Ok(FlatStorageStatus::Empty)); assert!(!env.clients[0].run_flat_storage_creation_step().unwrap()); // At first, flat storage state should start saving deltas. Deltas for all newly processed blocks should be saved to // disk. assert_eq!( - store_helper::get_flat_storage_status(&store, shard_uid), + store.get_flat_storage_status(shard_uid), Ok(FlatStorageStatus::Creation(FlatStorageCreationStatus::SavingDeltas)) ); // Introduce fork block to check that deltas for it will be GC-d later. @@ -207,14 +204,8 @@ fn test_flat_storage_creation_sanity() { env.process_block(0, fork_block, Provenance::PRODUCED); env.process_block(0, next_block, Provenance::PRODUCED); - assert_matches!( - store_helper::get_delta_changes(&store, shard_uid, fork_block_hash), - Ok(Some(_)) - ); - assert_matches!( - store_helper::get_delta_changes(&store, shard_uid, next_block_hash), - Ok(Some(_)) - ); + assert_matches!(store.get_delta(shard_uid, fork_block_hash), Ok(Some(_))); + assert_matches!(store.get_delta(shard_uid, next_block_hash), Ok(Some(_))); // Produce new block and run flat storage creation step. // We started the node from height `START_HEIGHT - 1`, and now final head should move to height `START_HEIGHT`. @@ -224,7 +215,7 @@ fn test_flat_storage_creation_sanity() { assert!(!env.clients[0].run_flat_storage_creation_step().unwrap()); let final_block_hash = env.clients[0].chain.get_block_hash_by_height(START_HEIGHT).unwrap(); assert_eq!( - store_helper::get_flat_storage_status(&store, shard_uid), + store.get_flat_storage_status(shard_uid), Ok(FlatStorageStatus::Creation(FlatStorageCreationStatus::FetchingState( FetchingStateStatus { block_hash: final_block_hash, @@ -246,11 +237,11 @@ fn test_flat_storage_creation_two_shards() { let genesis = Genesis::test_sharded_new_version(vec!["test0".parse().unwrap()], 1, vec![1; num_shards]); let shard_uids: Vec<_> = genesis.config.shard_layout.shard_uids().collect(); - let store = create_test_store(); + let store = create_test_store().flat_store(); // Process some blocks with flat storages for two shards. Then remove flat storage data from disk for shard 0. { - let mut env = setup_env(&genesis, store.clone()); + let mut env = setup_env(&genesis, store.store()); let signer = InMemorySigner::from_seed("test0".parse().unwrap(), KeyType::ED25519, "test0").into(); let genesis_hash = *env.clients[0].chain.genesis().hash(); @@ -270,7 +261,7 @@ fn test_flat_storage_creation_two_shards() { for &shard_uid in shard_uids.iter() { assert_matches!( - store_helper::get_flat_storage_status(&store, shard_uid), + store.get_flat_storage_status(shard_uid), Ok(FlatStorageStatus::Ready(_)) ); } @@ -283,17 +274,11 @@ fn test_flat_storage_creation_two_shards() { } // Check that flat storage is not ready for shard 0 but ready for shard 1. - let mut env = setup_env(&genesis, store.clone()); + let mut env = setup_env(&genesis, store.store()); assert!(get_flat_storage_manager(&env).get_flat_storage_for_shard(shard_uids[0]).is_none()); - assert_matches!( - store_helper::get_flat_storage_status(&store, shard_uids[0]), - Ok(FlatStorageStatus::Empty) - ); + assert_matches!(store.get_flat_storage_status(shard_uids[0]), Ok(FlatStorageStatus::Empty)); assert!(get_flat_storage_manager(&env).get_flat_storage_for_shard(shard_uids[1]).is_some()); - assert_matches!( - store_helper::get_flat_storage_status(&store, shard_uids[1]), - Ok(FlatStorageStatus::Ready(_)) - ); + assert_matches!(store.get_flat_storage_status(shard_uids[1]), Ok(FlatStorageStatus::Ready(_))); wait_for_flat_storage_creation(&mut env, START_HEIGHT, shard_uids[0], true); } @@ -308,21 +293,18 @@ fn test_flat_storage_creation_start_from_state_part() { (0..4).map(|i| AccountId::from_str(&format!("test{}", i)).unwrap()).collect::>(); let genesis = Genesis::test(accounts, 1); let shard_uid = genesis.config.shard_layout.shard_uids().next().unwrap(); - let store = create_test_store(); + let store = create_test_store().flat_store(); // Process some blocks with flat storage. // Reshard into two parts and return trie keys corresponding to each part. const NUM_PARTS: u64 = 2; let trie_keys: Vec<_> = { - let mut env = setup_env(&genesis, store.clone()); + let mut env = setup_env(&genesis, store.store()); for height in 1..START_HEIGHT { env.produce_block(0, height); } - assert_matches!( - store_helper::get_flat_storage_status(&store, shard_uid), - Ok(FlatStorageStatus::Ready(_)) - ); + assert_matches!(store.get_flat_storage_status(shard_uid), Ok(FlatStorageStatus::Ready(_))); let block_hash = env.clients[0].chain.get_block_hash_by_height(START_HEIGHT - 1).unwrap(); let state_root = @@ -353,7 +335,7 @@ fn test_flat_storage_creation_start_from_state_part() { { // Remove keys of part 1 from the flat state. // Manually set flat storage creation status to the step when it should start from fetching part 1. - let status = store_helper::get_flat_storage_status(&store, shard_uid); + let status = store.get_flat_storage_status(shard_uid); let flat_head = if let Ok(FlatStorageStatus::Ready(ready_status)) = status { ready_status.flat_head.hash } else { @@ -361,10 +343,9 @@ fn test_flat_storage_creation_start_from_state_part() { }; let mut store_update = store.store_update(); for key in trie_keys[1].iter() { - store_helper::set_flat_state_value(&mut store_update, shard_uid, key.clone(), None); + store_update.set(shard_uid, key.clone(), None); } - store_helper::set_flat_storage_status( - &mut store_update, + store_update.set_flat_storage_status( shard_uid, FlatStorageStatus::Creation(FlatStorageCreationStatus::FetchingState( FetchingStateStatus { @@ -378,7 +359,7 @@ fn test_flat_storage_creation_start_from_state_part() { store_update.commit().unwrap(); // Re-create runtime, check that flat storage is not created yet. - let mut env = setup_env(&genesis, store); + let mut env = setup_env(&genesis, store.store()); assert!(get_flat_storage_manager(&env).get_flat_storage_for_shard(shard_uid).is_none()); // Run chain for a couple of blocks and check that flat storage for shard 0 is eventually created. @@ -411,12 +392,12 @@ fn test_flat_storage_creation_start_from_state_part() { fn test_catchup_succeeds_even_if_no_new_blocks() { init_test_logger(); let genesis = Genesis::test(vec!["test0".parse().unwrap()], 1); - let store = create_test_store(); + let store = create_test_store().flat_store(); let shard_uid = ShardLayout::v0_single_shard().shard_uids().next().unwrap(); // Process some blocks with flat storage. Then remove flat storage data from disk. { - let mut env = setup_env(&genesis, store.clone()); + let mut env = setup_env(&genesis, store.store()); for height in 1..START_HEIGHT { env.produce_block(0, height); } @@ -427,12 +408,9 @@ fn test_catchup_succeeds_even_if_no_new_blocks() { .unwrap(); store_update.commit().unwrap(); } - let mut env = setup_env(&genesis, store.clone()); + let mut env = setup_env(&genesis, store.store()); assert!(get_flat_storage_manager(&env).get_flat_storage_for_shard(shard_uid).is_none()); - assert_eq!( - store_helper::get_flat_storage_status(&store, shard_uid), - Ok(FlatStorageStatus::Empty) - ); + assert_eq!(store.get_flat_storage_status(shard_uid), Ok(FlatStorageStatus::Empty)); // Create 3 more blocks (so that the deltas are generated) - and assume that no new blocks are received. // In the future, we should also support the scenario where no new blocks are created. @@ -460,17 +438,16 @@ fn test_flat_storage_iter() { shard_layout.clone(), ); - let store = create_test_store(); + let store = create_test_store().flat_store(); - let mut env = setup_env(&genesis, store.clone()); + let mut env = setup_env(&genesis, store.store()); for height in 1..START_HEIGHT { env.produce_block(0, height); } for shard_id in 0..3 { let shard_uid = ShardUId::from_shard_id_and_layout(shard_id, &shard_layout); - let items: Vec<_> = - store_helper::iter_flat_state_entries(shard_uid, &store, None, None).collect(); + let items: Vec<_> = store.iter(shard_uid).collect(); match shard_id { 0 => { diff --git a/integration-tests/src/tests/client/process_blocks.rs b/integration-tests/src/tests/client/process_blocks.rs index a05c2419c44..c4c991c0110 100644 --- a/integration-tests/src/tests/client/process_blocks.rs +++ b/integration-tests/src/tests/client/process_blocks.rs @@ -65,6 +65,7 @@ use near_primitives::views::{ }; use near_primitives_core::num_rational::{Ratio, Rational32}; use near_primitives_core::types::ShardId; +use near_store::adapter::StoreUpdateAdapter; use near_store::cold_storage::{update_cold_db, update_cold_head}; use near_store::metadata::DbKind; use near_store::metadata::DB_VERSION; @@ -2420,7 +2421,7 @@ fn test_catchup_gas_price_change() { let mut store_update = store.store_update(); assert!(rt .get_flat_storage_manager() - .remove_flat_storage_for_shard(msg.shard_uid, &mut store_update) + .remove_flat_storage_for_shard(msg.shard_uid, &mut store_update.flat_store_update()) .unwrap()); store_update.commit().unwrap(); for part_id in 0..msg.num_parts { diff --git a/integration-tests/src/tests/client/state_dump.rs b/integration-tests/src/tests/client/state_dump.rs index d64e692de6a..266f426458c 100644 --- a/integration-tests/src/tests/client/state_dump.rs +++ b/integration-tests/src/tests/client/state_dump.rs @@ -20,7 +20,7 @@ use near_primitives::transaction::SignedTransaction; use near_primitives::types::BlockHeight; use near_primitives::validator_signer::{EmptyValidatorSigner, InMemoryValidatorSigner}; use near_primitives::views::{QueryRequest, QueryResponseKind}; -use near_store::flat::store_helper; +use near_store::adapter::{StoreAdapter, StoreUpdateAdapter}; use near_store::DBCol; use near_store::Store; use nearcore::state_sync::StateSyncDumper; @@ -305,7 +305,10 @@ fn run_state_sync_with_dumped_parts( let mut store_update = runtime_client_1.store().store_update(); assert!(runtime_client_1 .get_flat_storage_manager() - .remove_flat_storage_for_shard(ShardUId::single_shard(), &mut store_update) + .remove_flat_storage_for_shard( + ShardUId::single_shard(), + &mut store_update.flat_store_update() + ) .unwrap()); store_update.commit().unwrap(); @@ -395,7 +398,7 @@ fn test_state_sync_w_dumped_parts() { fn count_flat_state_value_kinds(store: &Store) -> (u64, u64) { let mut num_inlined_values = 0; let mut num_ref_values = 0; - for item in store_helper::iter_flat_state_entries(ShardUId::single_shard(), store, None, None) { + for item in store.flat_store().iter(ShardUId::single_shard()) { match item { Ok((_, FlatStateValue::Ref(_))) => { num_ref_values += 1; diff --git a/integration-tests/src/tests/client/state_snapshot.rs b/integration-tests/src/tests/client/state_snapshot.rs index 6d1e6b7f85b..07fbfcef016 100644 --- a/integration-tests/src/tests/client/state_snapshot.rs +++ b/integration-tests/src/tests/client/state_snapshot.rs @@ -8,6 +8,7 @@ use near_primitives::block::Block; use near_primitives::hash::CryptoHash; use near_primitives::shard_layout::ShardUId; use near_primitives::transaction::SignedTransaction; +use near_store::adapter::StoreAdapter; use near_store::config::StateSnapshotType; use near_store::flat::FlatStorageManager; use near_store::{ @@ -42,7 +43,7 @@ impl StateSnaptshotTestEnv { view_shard_cache_config: trie_cache_config, ..TrieConfig::default() }; - let flat_storage_manager = FlatStorageManager::new(store.clone()); + let flat_storage_manager = FlatStorageManager::new(store.flat_store()); let shard_uids = [ShardUId::single_shard()]; let state_snapshot_config = StateSnapshotConfig { state_snapshot_type: StateSnapshotType::EveryEpoch, diff --git a/integration-tests/src/tests/client/sync_state_nodes.rs b/integration-tests/src/tests/client/sync_state_nodes.rs index d7473ce51fe..71129748dbe 100644 --- a/integration-tests/src/tests/client/sync_state_nodes.rs +++ b/integration-tests/src/tests/client/sync_state_nodes.rs @@ -23,6 +23,7 @@ use near_primitives::transaction::SignedTransaction; use near_primitives::types::{BlockId, BlockReference, EpochId, EpochReference}; use near_primitives::utils::MaybeValidated; use near_primitives_core::types::ShardId; +use near_store::adapter::StoreUpdateAdapter; use near_store::DBCol; use nearcore::test_utils::TestEnvNightshadeSetupExt; use nearcore::{load_test_config, start_with_config}; @@ -678,7 +679,10 @@ fn test_dump_epoch_missing_chunk_in_last_block() { let mut store_update = store.store_update(); assert!(rt .get_flat_storage_manager() - .remove_flat_storage_for_shard(msg.shard_uid, &mut store_update) + .remove_flat_storage_for_shard( + msg.shard_uid, + &mut store_update.flat_store_update() + ) .unwrap()); store_update.commit().unwrap(); diff --git a/integration-tests/src/user/runtime_user.rs b/integration-tests/src/user/runtime_user.rs index 7be65456a57..ef9df7afa0d 100644 --- a/integration-tests/src/user/runtime_user.rs +++ b/integration-tests/src/user/runtime_user.rs @@ -20,6 +20,7 @@ use near_primitives::views::{ ExecutionOutcomeView, ExecutionOutcomeWithIdView, ExecutionStatusView, FinalExecutionOutcomeView, FinalExecutionStatus, ViewStateResult, }; +use near_store::adapter::StoreUpdateAdapter; use near_store::{ShardTries, TrieUpdate}; use node_runtime::state_viewer::TrieViewer; use node_runtime::{state_viewer::ViewApplyState, ApplyState, Runtime}; @@ -136,7 +137,7 @@ impl RuntimeUser { ); if use_flat_storage { near_store::flat::FlatStateChanges::from_state_changes(&apply_result.state_changes) - .apply_to_flat_state(&mut update, ShardUId::single_shard()); + .apply_to_flat_state(&mut update.flat_store_update(), ShardUId::single_shard()); } update.commit().unwrap(); client.state_root = apply_result.state_root; diff --git a/nearcore/src/entity_debug.rs b/nearcore/src/entity_debug.rs index 70bca547610..e074e572ebe 100644 --- a/nearcore/src/entity_debug.rs +++ b/nearcore/src/entity_debug.rs @@ -30,9 +30,9 @@ use near_primitives::utils::{get_block_shard_id, get_outcome_id_block_hash}; use near_primitives::views::{ BlockHeaderView, BlockView, ChunkView, ExecutionOutcomeView, ReceiptView, SignedTransactionView, }; +use near_store::adapter::flat_store::encode_flat_state_db_key; use near_store::db::GENESIS_CONGESTION_INFO_KEY; use near_store::flat::delta::KeyForFlatStateDelta; -use near_store::flat::store_helper::encode_flat_state_db_key; use near_store::flat::{FlatStateChanges, FlatStateDeltaMetadata, FlatStorageStatus}; use near_store::{ DBCol, NibbleSlice, RawTrieNode, RawTrieNodeWithSize, ShardUId, Store, TrieCachingStorage, diff --git a/runtime/runtime-params-estimator/src/estimator_context.rs b/runtime/runtime-params-estimator/src/estimator_context.rs index 071527c26eb..5b72701e05e 100644 --- a/runtime/runtime-params-estimator/src/estimator_context.rs +++ b/runtime/runtime-params-estimator/src/estimator_context.rs @@ -15,8 +15,9 @@ use near_primitives::test_utils::MockEpochInfoProvider; use near_primitives::transaction::{ExecutionStatus, SignedTransaction}; use near_primitives::types::{Gas, MerkleHash}; use near_primitives::version::{ProtocolFeature, PROTOCOL_VERSION}; +use near_store::adapter::{StoreAdapter, StoreUpdateAdapter}; use near_store::flat::{ - store_helper, BlockInfo, FlatStateChanges, FlatStateDelta, FlatStateDeltaMetadata, FlatStorage, + BlockInfo, FlatStateChanges, FlatStateDelta, FlatStateDeltaMetadata, FlatStorage, FlatStorageManager, FlatStorageReadyStatus, FlatStorageStatus, }; use near_store::{ShardTries, ShardUId, StateSnapshotConfig, TrieUpdate}; @@ -77,10 +78,10 @@ impl<'c> EstimatorContext<'c> { let root = roots[0]; let shard_uid = ShardUId::single_shard(); - let flat_storage_manager = FlatStorageManager::new(store.clone()); - let mut store_update = store.store_update(); - store_helper::set_flat_storage_status( - &mut store_update, + let flat_store = store.flat_store(); + let flat_storage_manager = FlatStorageManager::new(flat_store.clone()); + let mut store_update = flat_store.store_update(); + store_update.set_flat_storage_status( shard_uid, FlatStorageStatus::Ready(FlatStorageReadyStatus { flat_head: BlockInfo::genesis(CryptoHash::hash_borsh(0usize), 0), @@ -358,7 +359,8 @@ impl Testbed<'_> { ) .unwrap(); - let mut store_update = self.tries.store_update(); + let store = self.tries.get_store(); + let mut store_update = store.store_update(); let shard_uid = ShardUId::single_shard(); self.root = self.tries.apply_all(&apply_result.trie_changes, shard_uid, &mut store_update); if self.config.memtrie { @@ -375,7 +377,7 @@ impl Testbed<'_> { assert_eq!(self.root, memtrie_root); } near_store::flat::FlatStateChanges::from_state_changes(&apply_result.state_changes) - .apply_to_flat_state(&mut store_update, shard_uid); + .apply_to_flat_state(&mut store_update.flat_store_update(), shard_uid); store_update.commit().unwrap(); self.apply_state.block_height += 1; if let Some(congestion_info) = apply_result.congestion_info { diff --git a/runtime/runtime/src/prefetch.rs b/runtime/runtime/src/prefetch.rs index e75e51f83ce..033d48f7e50 100644 --- a/runtime/runtime/src/prefetch.rs +++ b/runtime/runtime/src/prefetch.rs @@ -386,6 +386,7 @@ impl TriePrefetcher { mod tests { use super::TriePrefetcher; use near_primitives::{trie_key::TrieKey, types::AccountId}; + use near_store::adapter::StoreAdapter; use near_store::test_utils::{create_test_store, test_populate_trie}; use near_store::{ShardTries, ShardUId, StateSnapshotConfig, Trie, TrieConfig}; use std::str::FromStr; @@ -474,7 +475,7 @@ mod tests { let shard_uids = vec![ShardUId::single_shard()]; let trie_config = TrieConfig { enable_receipt_prefetching: true, ..TrieConfig::default() }; let store = create_test_store(); - let flat_storage_manager = near_store::flat::FlatStorageManager::new(store.clone()); + let flat_storage_manager = near_store::flat::FlatStorageManager::new(store.flat_store()); let tries = ShardTries::new( store, trie_config, diff --git a/tools/database/src/analyze_delayed_receipt.rs b/tools/database/src/analyze_delayed_receipt.rs index 13942a8d0b1..b589649f82e 100644 --- a/tools/database/src/analyze_delayed_receipt.rs +++ b/tools/database/src/analyze_delayed_receipt.rs @@ -1,4 +1,5 @@ use clap::Parser; +use near_store::adapter::StoreAdapter; use near_store::flat::FlatStorageManager; use near_store::{get_delayed_receipt_indices, ShardTries, StateSnapshotConfig, TrieConfig}; use std::collections::HashMap; @@ -61,7 +62,7 @@ impl AnalyzeDelayedReceiptCommand { store.clone(), TrieConfig::default(), &shard_uids, - FlatStorageManager::new(store), + FlatStorageManager::new(store.flat_store()), StateSnapshotConfig::default(), ); // Create an iterator over the blocks that should be analysed diff --git a/tools/database/src/corrupt.rs b/tools/database/src/corrupt.rs index af299efa908..5fe5ae360d9 100644 --- a/tools/database/src/corrupt.rs +++ b/tools/database/src/corrupt.rs @@ -2,7 +2,9 @@ use crate::utils::open_state_snapshot; use anyhow::anyhow; use clap::Parser; use near_primitives::shard_layout::{ShardLayout, ShardVersion}; -use near_store::{flat::FlatStorageManager, ShardUId, StoreUpdate}; +use near_store::adapter::flat_store::FlatStoreUpdateAdapter; +use near_store::adapter::StoreAdapter; +use near_store::{flat::FlatStorageManager, ShardUId}; use std::path::PathBuf; #[derive(Parser)] @@ -13,7 +15,7 @@ pub(crate) struct CorruptStateSnapshotCommand { impl CorruptStateSnapshotCommand { pub(crate) fn run(&self, home: &PathBuf) -> anyhow::Result<()> { - let store = open_state_snapshot(home, near_store::Mode::ReadWrite)?; + let store = open_state_snapshot(home, near_store::Mode::ReadWrite)?.flat_store(); let flat_storage_manager = FlatStorageManager::new(store.clone()); let mut store_update = store.store_update(); @@ -41,7 +43,7 @@ impl CorruptStateSnapshotCommand { } fn corrupt( - store_update: &mut StoreUpdate, + store_update: &mut FlatStoreUpdateAdapter, flat_storage_manager: &FlatStorageManager, shard_uid: ShardUId, ) -> Result<(), anyhow::Error> { diff --git a/tools/database/src/state_perf.rs b/tools/database/src/state_perf.rs index 9e6af54ccef..2b35612fb88 100644 --- a/tools/database/src/state_perf.rs +++ b/tools/database/src/state_perf.rs @@ -1,5 +1,7 @@ use clap::Parser; use indicatif::{ProgressBar, ProgressIterator}; +use near_store::adapter::flat_store::FlatStoreAdapter; +use near_store::adapter::StoreAdapter; use std::collections::BTreeMap; use std::fmt::{Display, Write}; use std::path::Path; @@ -12,8 +14,7 @@ use rand::rngs::StdRng; use rand::seq::SliceRandom; use rand::SeedableRng; -use near_store::flat::store_helper::iter_flat_state_entries; -use near_store::{Store, TrieStorage}; +use near_store::TrieStorage; use crate::utils::open_rocksdb; @@ -38,7 +39,10 @@ impl StatePerfCommand { let mut perf_context = PerfContext::new(); let total_samples = self.warmup_samples + self.samples; for (sample_i, (shard_uid, value_ref)) in - generate_state_requests(store.clone(), total_samples).into_iter().enumerate().progress() + generate_state_requests(store.flat_store(), total_samples) + .into_iter() + .enumerate() + .progress() { let trie_storage = near_store::TrieDBStorage::new(store.clone(), shard_uid); let include_sample = sample_i >= self.warmup_samples; @@ -159,7 +163,7 @@ impl PerfContext { } } -fn generate_state_requests(store: Store, samples: usize) -> Vec<(ShardUId, ValueRef)> { +fn generate_state_requests(store: FlatStoreAdapter, samples: usize) -> Vec<(ShardUId, ValueRef)> { eprintln!("Generate {samples} requests to State"); let shard_uids = ShardLayout::get_simple_nightshade_layout().shard_uids().collect::>(); let num_shards = shard_uids.len(); @@ -168,8 +172,8 @@ fn generate_state_requests(store: Store, samples: usize) -> Vec<(ShardUId, Value for shard_uid in shard_uids { let shard_samples = samples / num_shards; let mut keys_read = std::collections::HashSet::new(); - for value_ref in iter_flat_state_entries(shard_uid, &store, None, None) - .flat_map(|res| res.map(|(_, value)| value.to_value_ref())) + for value_ref in + store.iter(shard_uid).flat_map(|res| res.map(|(_, value)| value.to_value_ref())) { if value_ref.length > 4096 || !keys_read.insert(value_ref.hash) { continue; diff --git a/tools/flat-storage/src/commands.rs b/tools/flat-storage/src/commands.rs index ef6527411c4..9c2a2b9f12b 100644 --- a/tools/flat-storage/src/commands.rs +++ b/tools/flat-storage/src/commands.rs @@ -9,8 +9,10 @@ use near_epoch_manager::{EpochManager, EpochManagerAdapter, EpochManagerHandle}; use near_primitives::shard_layout::{account_id_to_shard_id, ShardVersion}; use near_primitives::state::FlatStateValue; use near_primitives::types::{BlockHeight, ShardId}; +use near_store::adapter::flat_store::FlatStoreAdapter; +use near_store::adapter::StoreAdapter; use near_store::flat::{ - store_helper, FlatStateChanges, FlatStateDelta, FlatStateDeltaMetadata, FlatStorageStatus, + FlatStateChanges, FlatStateDelta, FlatStateDeltaMetadata, FlatStorageStatus, }; use near_store::{DBCol, Mode, NodeStorage, ShardUId, Store, StoreOpener}; use nearcore::{load_config, NearConfig, NightshadeRuntime, NightshadeRuntimeExt}; @@ -125,14 +127,13 @@ pub struct MoveFlatHeadCmd { mode: MoveFlatHeadMode, } -fn print_delta(store: &Store, shard_uid: ShardUId, metadata: FlatStateDeltaMetadata) { - let changes = - store_helper::get_delta_changes(store, shard_uid, metadata.block.hash).unwrap().unwrap(); +fn print_delta(store: &FlatStoreAdapter, shard_uid: ShardUId, metadata: FlatStateDeltaMetadata) { + let changes = store.get_delta(shard_uid, metadata.block.hash).unwrap().unwrap(); println!("{:?}", FlatStateDelta { metadata, changes }); } -fn print_deltas(store: &Store, shard_uid: ShardUId) { - let deltas_metadata = store_helper::get_all_deltas_metadata(store, shard_uid).unwrap(); +fn print_deltas(store: &FlatStoreAdapter, shard_uid: ShardUId) { + let deltas_metadata = store.get_all_deltas_metadata(shard_uid).unwrap(); let num_deltas = deltas_metadata.len(); println!("Deltas: {}", num_deltas); @@ -202,7 +203,7 @@ impl FlatStorageCommand { "Shard: {shard_uid:?} - flat storage @{:?} ({})", ready_status.flat_head.height, ready_status.flat_head.hash, ); - print_deltas(&hot_store, shard_uid); + print_deltas(&hot_store.flat_store(), shard_uid); } status => { println!("Shard: {shard_uid:?} - no flat storage: {status:?}"); @@ -239,7 +240,7 @@ impl FlatStorageCommand { let shard_uid = epoch_manager.shard_id_to_uid(cmd.shard_id, &tip.epoch_id)?; let flat_storage_manager = rw_hot_runtime.get_flat_storage_manager(); flat_storage_manager.create_flat_storage_for_shard(shard_uid)?; - let mut store_update = store.store_update(); + let mut store_update = store.flat_store().store_update(); flat_storage_manager.remove_flat_storage_for_shard(shard_uid, &mut store_update)?; store_update.commit()?; Ok(()) @@ -266,7 +267,7 @@ impl FlatStorageCommand { if status { break; } - let current_status = store_helper::get_flat_storage_status(&rw_hot_store, shard_uid); + let current_status = rw_hot_store.flat_store().get_flat_storage_status(shard_uid); println!("Status: {:?}", current_status); std::thread::sleep(Duration::from_secs(1)); @@ -287,8 +288,10 @@ impl FlatStorageCommand { Self::get_db(&opener, home_dir, &near_config, near_store::Mode::ReadOnly); let tip = chain_store.final_head()?; let shard_uid = epoch_manager.shard_id_to_uid(cmd.shard_id, &tip.epoch_id)?; + let hot_store = hot_store.flat_store(); - let head_hash = match store_helper::get_flat_storage_status(&hot_store, shard_uid) + let head_hash = match hot_store + .get_flat_storage_status(shard_uid) .expect("falied to read flat storage status") { FlatStorageStatus::Ready(ready_status) => ready_status.flat_head.hash, @@ -321,8 +324,7 @@ impl FlatStorageCommand { let trie = hot_runtime.get_view_trie_for_shard(cmd.shard_id, &head_hash, *state_root)?; - let flat_state_entries_iter = - store_helper::iter_flat_state_entries(shard_uid, &hot_store, None, None); + let flat_state_entries_iter = hot_store.iter(shard_uid); let trie_iter = trie.disk_iter()?; let mut verified = 0; @@ -428,9 +430,9 @@ impl FlatStorageCommand { }), ); - let iter = store_helper::iter_flat_state_entries( + let flat_store = store.flat_store(); + let iter = flat_store.iter_range( shard_uid, - &store, Some(missing_keys_left_boundary), Some(missing_keys_right_boundary), ); @@ -462,7 +464,8 @@ impl FlatStorageCommand { blocks: usize, ) -> anyhow::Result<()> { let store = chain_store.store(); - let flat_head = match store_helper::get_flat_storage_status(&store, shard_uid) { + let flat_store = store.flat_store(); + let flat_head = match flat_store.get_flat_storage_status(shard_uid) { Ok(FlatStorageStatus::Ready(ready_status)) => ready_status.flat_head, status => { panic!("invalid flat storage status for shard {shard_uid:?}: {status:?}") @@ -529,7 +532,8 @@ impl FlatStorageCommand { .map(|value_ref| { near_primitives::state::FlatStateValue::Ref(value_ref.into_value_ref()) }); - let value = store_helper::get_flat_state_value(&store, shard_uid, trie_key)? + let value = flat_store + .get(shard_uid, trie_key)? .map(|val| near_primitives::state::FlatStateValue::Ref(val.to_value_ref())); if prev_value != value { prev_delta.insert(trie_key.to_vec(), prev_value); @@ -547,10 +551,9 @@ impl FlatStorageCommand { // Note that we don't write delta to DB, because this command is // used to simulate applying chunks from past blocks, and in that // simulations future deltas should not exist. - let mut store_update = store.store_update(); + let mut store_update = flat_store.store_update(); prev_delta.apply_to_flat_state(&mut store_update, shard_uid); - store_helper::set_flat_storage_status( - &mut store_update, + store_update.set_flat_storage_status( shard_uid, FlatStorageStatus::Ready(near_store::flat::FlatStorageReadyStatus { flat_head: near_store::flat::BlockInfo { diff --git a/tools/fork-network/src/cli.rs b/tools/fork-network/src/cli.rs index 8c17e41f9e3..b5bbccebeea 100644 --- a/tools/fork-network/src/cli.rs +++ b/tools/fork-network/src/cli.rs @@ -25,8 +25,9 @@ use near_primitives::types::{ AccountId, AccountInfo, Balance, BlockHeight, EpochId, NumBlocks, ShardId, StateRoot, }; use near_primitives::version::{ProtocolVersion, PROTOCOL_VERSION}; +use near_store::adapter::StoreAdapter; use near_store::db::RocksDB; -use near_store::flat::{store_helper, BlockInfo, FlatStorageManager, FlatStorageStatus}; +use near_store::flat::{BlockInfo, FlatStorageManager, FlatStorageStatus}; use near_store::{ checkpoint_hot_storage_and_cleanup_columns, DBCol, Store, TrieDBStorage, TrieStorage, FINAL_HEAD_KEY, @@ -268,7 +269,7 @@ impl ForkNetworkCommand { let desired_block_header = chain.get_block_header(&desired_block_hash)?; let epoch_id = desired_block_header.epoch_id(); - let flat_storage_manager = FlatStorageManager::new(store.clone()); + let flat_storage_manager = FlatStorageManager::new(store.flat_store()); // Advance flat heads to the same (max) block height to ensure // consistency of state across the shards. @@ -519,7 +520,7 @@ impl ForkNetworkCommand { let mut postponed_receipts_updated = 0; let mut received_data_updated = 0; let mut fake_block_height = block_height + 1; - for item in store_helper::iter_flat_state_entries(shard_uid, &store, None, None) { + for item in store.flat_store().iter(shard_uid) { let (key, value) = match item { Ok((key, FlatStateValue::Ref(ref_value))) => { ref_keys_retrieved += 1; @@ -660,7 +661,7 @@ impl ForkNetworkCommand { // Iterating over the whole flat state is very fast compared to writing all the updates. let mut num_added = 0; let mut num_accounts = 0; - for item in store_helper::iter_flat_state_entries(shard_uid, &store, None, None) { + for item in store.flat_store().iter(shard_uid) { if let Ok((key, _)) = item { if key[0] == col::ACCOUNT { num_accounts += 1; diff --git a/tools/fork-network/src/single_shard_storage_mutator.rs b/tools/fork-network/src/single_shard_storage_mutator.rs index 70bf695c689..76e74fee5fd 100644 --- a/tools/fork-network/src/single_shard_storage_mutator.rs +++ b/tools/fork-network/src/single_shard_storage_mutator.rs @@ -8,6 +8,7 @@ use near_primitives::shard_layout::ShardUId; use near_primitives::trie_key::TrieKey; use near_primitives::types::{AccountId, StateRoot}; use near_primitives::types::{StoreKey, StoreValue}; +use near_store::adapter::StoreUpdateAdapter; use near_store::{flat::FlatStateChanges, DBCol, ShardTries}; use nearcore::NightshadeRuntime; @@ -150,9 +151,9 @@ impl SingleShardStorageMutator { ) -> anyhow::Result { let num_updates = self.updates.len(); tracing::info!(?shard_uid, num_updates, "commit"); - let mut update = self.shard_tries.store_update(); let flat_state_changes = FlatStateChanges::from_raw_key_value(&self.updates); - flat_state_changes.apply_to_flat_state(&mut update, *shard_uid); + let mut update = self.shard_tries.store_update(); + flat_state_changes.apply_to_flat_state(&mut update.flat_store_update(), *shard_uid); let trie_changes = self .shard_tries diff --git a/tools/state-viewer/src/apply_chain_range.rs b/tools/state-viewer/src/apply_chain_range.rs index 77925fbfc8d..2a07740b234 100644 --- a/tools/state-viewer/src/apply_chain_range.rs +++ b/tools/state-viewer/src/apply_chain_range.rs @@ -15,6 +15,7 @@ use near_primitives::transaction::{Action, ExecutionOutcomeWithId, ExecutionOutc use near_primitives::trie_key::TrieKey; use near_primitives::types::chunk_extra::ChunkExtra; use near_primitives::types::{BlockHeight, ShardId}; +use near_store::adapter::StoreAdapter; use near_store::flat::{BlockInfo, FlatStateChanges, FlatStorageStatus}; use near_store::{DBCol, Store}; use nearcore::NightshadeRuntime; @@ -360,10 +361,7 @@ pub fn apply_chain_range( shard_id, &shard_layout, ); - let flat_head = match near_store::flat::store_helper::get_flat_storage_status( - &read_store, - shard_uid, - ) { + let flat_head = match read_store.flat_store().get_flat_storage_status(shard_uid) { Ok(FlatStorageStatus::Ready(ready_status)) => ready_status.flat_head, status => { panic!("cannot create flat storage for shard {shard_id} with status {status:?}") diff --git a/tools/state-viewer/src/commands.rs b/tools/state-viewer/src/commands.rs index 6c61cbdc109..2d9bc200101 100644 --- a/tools/state-viewer/src/commands.rs +++ b/tools/state-viewer/src/commands.rs @@ -1311,9 +1311,7 @@ fn get_state_stats_group_by<'a>( // the account id. let type_iters = COLUMNS_WITH_ACCOUNT_ID_IN_KEY .iter() - .map(|(type_byte, _)| { - chunk_view.iter_flat_state_entries(Some(&[*type_byte]), Some(&[*type_byte + 1])) - }) + .map(|(type_byte, _)| chunk_view.iter_range(Some(&[*type_byte]), Some(&[*type_byte + 1]))) .into_iter(); // Filter out any errors. diff --git a/tools/state-viewer/src/scan_db.rs b/tools/state-viewer/src/scan_db.rs index 5a6276fd71c..db8395e4de5 100644 --- a/tools/state-viewer/src/scan_db.rs +++ b/tools/state-viewer/src/scan_db.rs @@ -18,6 +18,7 @@ use near_primitives::types::{EpochId, StateRoot}; use near_primitives::utils::{get_block_shard_id_rev, get_outcome_id_block_hash_rev}; use near_primitives_core::hash::CryptoHash; use near_primitives_core::types::BlockHeight; +use near_store::adapter::flat_store::decode_flat_state_db_key; use near_store::flat::delta::KeyForFlatStateDelta; use near_store::flat::{FlatStateChanges, FlatStateDeltaMetadata}; use near_store::{DBCol, RawTrieNodeWithSize, Store, TrieChanges}; @@ -133,8 +134,7 @@ fn format_key_and_value<'a>( Box::new(BlockHeight::try_from_slice(value).unwrap()), ), DBCol::FlatState => { - let (shard_uid, key) = - near_store::flat::store_helper::decode_flat_state_db_key(key).unwrap(); + let (shard_uid, key) = decode_flat_state_db_key(key).unwrap(); (Box::new((shard_uid, key)), Box::new(FlatStateValue::try_from_slice(value).unwrap())) } DBCol::FlatStateChanges => (