Skip to content

Commit

Permalink
[store] Introduce Flat store adapter (#12123)
Browse files Browse the repository at this point in the history
This is the first concept PR of having adapters on top of store. Most of
the details for how it works can be found in
core/store/src/adapter/mod.rs

The functions in core/store/src/adapter/flat_store.rs are moved from
store_helper file.
  • Loading branch information
shreyan-gupta authored Sep 26, 2024
1 parent ec3b627 commit c9def16
Show file tree
Hide file tree
Showing 44 changed files with 800 additions and 736 deletions.
8 changes: 4 additions & 4 deletions chain/chain/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,9 @@ use near_primitives::views::{
FinalExecutionOutcomeView, FinalExecutionOutcomeWithReceiptView, FinalExecutionStatus,
LightClientBlockView, SignedTransactionView,
};
use near_store::adapter::StoreUpdateAdapter;
use near_store::config::StateSnapshotType;
use near_store::flat::{store_helper, FlatStorageReadyStatus, FlatStorageStatus};
use near_store::flat::{FlatStorageReadyStatus, FlatStorageStatus};
use near_store::trie::mem::resharding::RetainMode;
use near_store::DBCol;
use near_store::{get_genesis_state_roots, PartialStorage};
Expand Down Expand Up @@ -485,7 +486,7 @@ impl Chain {
let mut tmp_store_update = store_update.store().store_update();
for shard_uid in epoch_manager.get_shard_layout(genesis_epoch_id)?.shard_uids() {
flat_storage_manager.set_flat_storage_for_genesis(
&mut tmp_store_update,
&mut tmp_store_update.flat_store_update(),
shard_uid,
genesis.hash(),
genesis.header().height(),
Expand Down Expand Up @@ -3022,8 +3023,7 @@ impl Chain {
tracing::debug!(target: "store", ?shard_uid, ?flat_head_hash, flat_head_height, "set_state_finalize - initialized flat storage");

let mut store_update = self.runtime_adapter.store().store_update();
store_helper::set_flat_storage_status(
&mut store_update,
store_update.flat_store_update().set_flat_storage_status(
shard_uid,
FlatStorageStatus::Ready(FlatStorageReadyStatus {
flat_head: near_store::flat::BlockInfo {
Expand Down
8 changes: 4 additions & 4 deletions chain/chain/src/chain_update.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ impl<'a> ChainUpdate<'a> {
shard_uid,
apply_result.trie_changes.state_changes(),
)?;
self.chain_store_update.merge(store_update);
self.chain_store_update.merge(store_update.into());

self.chain_store_update.save_trie_changes(apply_result.trie_changes);
self.chain_store_update.save_outgoing_receipt(
Expand Down Expand Up @@ -174,7 +174,7 @@ impl<'a> ChainUpdate<'a> {
shard_uid,
apply_result.trie_changes.state_changes(),
)?;
self.chain_store_update.merge(store_update);
self.chain_store_update.merge(store_update.into());

self.chain_store_update.save_chunk_extra(block_hash, &shard_uid, new_extra);
self.chain_store_update.save_trie_changes(apply_result.trie_changes);
Expand Down Expand Up @@ -544,7 +544,7 @@ impl<'a> ChainUpdate<'a> {
shard_uid,
apply_result.trie_changes.state_changes(),
)?;
self.chain_store_update.merge(store_update);
self.chain_store_update.merge(store_update.into());

self.chain_store_update.save_trie_changes(apply_result.trie_changes);

Expand Down Expand Up @@ -643,7 +643,7 @@ impl<'a> ChainUpdate<'a> {
shard_uid,
apply_result.trie_changes.state_changes(),
)?;
self.chain_store_update.merge(store_update);
self.chain_store_update.merge(store_update.into());
self.chain_store_update.save_trie_changes(apply_result.trie_changes);

// The chunk is missing but some fields may need to be updated
Expand Down
83 changes: 29 additions & 54 deletions chain/chain/src/flat_storage_creator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,13 @@ use near_primitives::shard_layout::ShardUId;
use near_primitives::state::FlatStateValue;
use near_primitives::state_part::PartId;
use near_primitives::types::{BlockHeight, StateRoot};
use near_store::adapter::flat_store::FlatStoreAdapter;
use near_store::adapter::StoreAdapter;
use near_store::flat::{
store_helper, BlockInfo, FetchingStateStatus, FlatStateChanges, FlatStorageCreationMetrics,
BlockInfo, FetchingStateStatus, FlatStateChanges, FlatStorageCreationMetrics,
FlatStorageCreationStatus, FlatStorageManager, FlatStorageReadyStatus, FlatStorageStatus,
NUM_PARTS_IN_ONE_STEP, STATE_PART_MEMORY_LIMIT,
};
use near_store::Store;
use near_store::{Trie, TrieDBStorage, TrieTraversalItem};
use std::collections::HashMap;
use std::sync::atomic::AtomicU64;
Expand Down Expand Up @@ -88,14 +89,14 @@ impl FlatStorageShardCreator {

/// Fetch state part, write all state items to flat storage and send the number of items to the given channel.
fn fetch_state_part(
store: Store,
store: FlatStoreAdapter,
shard_uid: ShardUId,
state_root: StateRoot,
part_id: PartId,
progress: Arc<AtomicU64>,
result_sender: Sender<u64>,
) {
let trie_storage = TrieDBStorage::new(store.clone(), shard_uid);
let trie_storage = TrieDBStorage::new(store.store(), shard_uid);
let trie = Trie::new(Arc::new(trie_storage), state_root, None);
let path_begin = trie.find_state_part_boundary(part_id.idx, part_id.total).unwrap();
let path_end = trie.find_state_part_boundary(part_id.idx + 1, part_id.total).unwrap();
Expand All @@ -110,12 +111,7 @@ impl FlatStorageShardCreator {
{
if let Some(key) = key {
let value = trie.retrieve_value(&hash).unwrap();
store_helper::set_flat_state_value(
&mut store_update,
shard_uid,
key,
Some(FlatStateValue::value_ref(&value)),
);
store_update.set(shard_uid, key, Some(FlatStateValue::value_ref(&value)));
num_items += 1;
}
}
Expand Down Expand Up @@ -149,16 +145,16 @@ impl FlatStorageShardCreator {
chain_store: &ChainStore,
thread_pool: &rayon::ThreadPool,
) -> Result<bool, Error> {
let store = chain_store.store().flat_store();
let shard_id = self.shard_uid.shard_id();
let current_status =
store_helper::get_flat_storage_status(chain_store.store(), self.shard_uid)
.expect("failed to read flat storage status");
let current_status = store
.get_flat_storage_status(self.shard_uid)
.expect("failed to read flat storage status");
self.metrics.set_status(&current_status);
match &current_status {
FlatStorageStatus::Empty => {
let mut store_update = chain_store.store().store_update();
store_helper::set_flat_storage_status(
&mut store_update,
let mut store_update = store.store_update();
store_update.set_flat_storage_status(
self.shard_uid,
FlatStorageStatus::Creation(FlatStorageCreationStatus::SavingDeltas),
);
Expand All @@ -179,11 +175,7 @@ impl FlatStorageShardCreator {
for hash in hashes {
debug!(target: "store", %shard_id, %height, %hash, "Checking delta existence");
assert_matches!(
store_helper::get_delta_changes(
chain_store.store(),
self.shard_uid,
*hash
),
store.get_delta(self.shard_uid, *hash),
Ok(Some(_))
);
}
Expand All @@ -192,10 +184,9 @@ impl FlatStorageShardCreator {

// We continue saving deltas, and also start fetching state.
let block_hash = final_head.last_block_hash;
let store = self.runtime.store().clone();
let epoch_id = self.epoch_manager.get_epoch_id(&block_hash)?;
let shard_uid = self.epoch_manager.shard_id_to_uid(shard_id, &epoch_id)?;
let trie_storage = TrieDBStorage::new(store, shard_uid);
let trie_storage = TrieDBStorage::new(store.store(), shard_uid);
let state_root =
*chain_store.get_chunk_extra(&block_hash, &shard_uid)?.state_root();
let trie = Trie::new(Arc::new(trie_storage), state_root, None);
Expand All @@ -210,10 +201,9 @@ impl FlatStorageShardCreator {
};
info!(target: "store", %shard_id, %final_height, ?status, "Switching status to fetching state");

let mut store_update = chain_store.store().store_update();
let mut store_update = store.store_update();
self.metrics.set_flat_head_height(final_head.height);
store_helper::set_flat_storage_status(
&mut store_update,
store_update.set_flat_storage_status(
self.shard_uid,
FlatStorageStatus::Creation(FlatStorageCreationStatus::FetchingState(
status,
Expand All @@ -225,7 +215,6 @@ impl FlatStorageShardCreator {
FlatStorageStatus::Creation(FlatStorageCreationStatus::FetchingState(
fetching_state_status,
)) => {
let store = self.runtime.store().clone();
let block_hash = fetching_state_status.block_hash;
let start_part_id = fetching_state_status.part_id;
let num_parts_in_step = fetching_state_status.num_parts_in_step;
Expand Down Expand Up @@ -280,7 +269,7 @@ impl FlatStorageShardCreator {
// Mark that we don't wait for new state parts.
self.remaining_state_parts = None;

let mut store_update = chain_store.store().store_update();
let mut store_update = store.store_update();
if next_start_part_id < num_parts {
// If there are still remaining state parts, switch status to the new range of state parts.
// We will spawn new rayon tasks on the next status update.
Expand All @@ -291,8 +280,7 @@ impl FlatStorageShardCreator {
num_parts,
};
debug!(target: "chain", %shard_id, %block_hash, ?new_status);
store_helper::set_flat_storage_status(
&mut store_update,
store_update.set_flat_storage_status(
self.shard_uid,
FlatStorageStatus::Creation(
FlatStorageCreationStatus::FetchingState(new_status),
Expand All @@ -302,13 +290,8 @@ impl FlatStorageShardCreator {
// If all parts were fetched, we can start catchup.
info!(target: "chain", %shard_id, %block_hash, "Finished fetching state");
self.metrics.set_remaining_state_parts(0);
store_helper::remove_delta(
&mut store_update,
self.shard_uid,
block_hash,
);
store_helper::set_flat_storage_status(
&mut store_update,
store_update.remove_delta(self.shard_uid, block_hash);
store_update.set_flat_storage_status(
self.shard_uid,
FlatStorageStatus::Creation(FlatStorageCreationStatus::CatchingUp(
block_hash,
Expand All @@ -320,11 +303,10 @@ impl FlatStorageShardCreator {
}
}
FlatStorageStatus::Creation(FlatStorageCreationStatus::CatchingUp(old_flat_head)) => {
let store = self.runtime.store();
let mut flat_head = *old_flat_head;
let chain_final_head = chain_store.final_head()?;
let mut merged_changes = FlatStateChanges::default();
let mut store_update = self.runtime.store().store_update();
let mut store_update = store.store_update();

// Merge up to 50 deltas of the next blocks until we reach chain final head.
// TODO: consider merging 10 deltas at once to limit memory usage
Expand All @@ -338,11 +320,9 @@ impl FlatStorageShardCreator {
break;
}
flat_head = chain_store.get_next_block_hash(&flat_head).unwrap();
let changes = store_helper::get_delta_changes(store, self.shard_uid, flat_head)
.unwrap()
.unwrap();
let changes = store.get_delta(self.shard_uid, flat_head).unwrap().unwrap();
merged_changes.merge(changes);
store_helper::remove_delta(&mut store_update, self.shard_uid, flat_head);
store_update.remove_delta(self.shard_uid, flat_head);
}

if (old_flat_head != &flat_head) || (flat_head == chain_final_head.last_block_hash)
Expand All @@ -356,8 +336,7 @@ impl FlatStorageShardCreator {
debug!(target: "chain", %shard_id, %old_flat_head, %old_height, %flat_head, %height, "Catching up flat head");
self.metrics.set_flat_head_height(height);
merged_changes.apply_to_flat_state(&mut store_update, shard_uid);
store_helper::set_flat_storage_status(
&mut store_update,
store_update.set_flat_storage_status(
shard_uid,
FlatStorageStatus::Creation(FlatStorageCreationStatus::CatchingUp(
flat_head,
Expand All @@ -370,26 +349,22 @@ impl FlatStorageShardCreator {
// GC deltas from forks which could have appeared on chain during catchup.
// Assuming that flat storage creation finishes in < 2 days, all deltas metadata cannot occupy
// more than 2 * (Blocks per day = 48 * 60 * 60) * (BlockInfo size = 72) ~= 12.4 MB.
let mut store_update = self.runtime.store().store_update();
let deltas_metadata = store_helper::get_all_deltas_metadata(&store, shard_uid)
let mut store_update = store.store_update();
let deltas_metadata = store.get_all_deltas_metadata(shard_uid)
.unwrap_or_else(|_| {
panic!("Cannot read flat state deltas metadata for shard {shard_id} from storage")
});
let mut gc_count = 0;
for delta_metadata in deltas_metadata {
if delta_metadata.block.height <= chain_final_head.height {
store_helper::remove_delta(
&mut store_update,
self.shard_uid,
delta_metadata.block.hash,
);
store_update
.remove_delta(self.shard_uid, delta_metadata.block.hash);
gc_count += 1;
}
}

// If we reached chain final head, we can finish catchup and finally create flat storage.
store_helper::set_flat_storage_status(
&mut store_update,
store_update.set_flat_storage_status(
self.shard_uid,
FlatStorageStatus::Ready(FlatStorageReadyStatus {
flat_head: BlockInfo {
Expand Down
4 changes: 2 additions & 2 deletions chain/chain/src/garbage_collection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use near_primitives::shard_layout::get_block_shard_uid;
use near_primitives::state_sync::{StateHeaderKey, StatePartKey};
use near_primitives::types::{BlockHeight, BlockHeightDelta, EpochId, NumBlocks, ShardId};
use near_primitives::utils::{get_block_shard_id, get_outcome_id_block_hash, index_to_bytes};
use near_store::flat::store_helper;
use near_store::adapter::StoreUpdateAdapter;
use near_store::{DBCol, KeyForStateChanges, ShardTries, ShardUId};

use crate::types::RuntimeAdapter;
Expand Down Expand Up @@ -711,7 +711,7 @@ impl<'a> ChainStoreUpdate<'a> {

// delete flat storage columns: FlatStateChanges and FlatStateDeltaMetadata
let mut store_update = self.store().store_update();
store_helper::remove_delta(&mut store_update, shard_uid, block_hash);
store_update.flat_store_update().remove_delta(shard_uid, block_hash);
self.merge(store_update);
}

Expand Down
5 changes: 3 additions & 2 deletions chain/chain/src/runtime/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ use near_primitives::views::{
AccessKeyInfoView, CallResult, ContractCodeView, QueryRequest, QueryResponse,
QueryResponseKind, ViewStateResult,
};
use near_store::adapter::{StoreAdapter, StoreUpdateAdapter};
use near_store::config::StateSnapshotType;
use near_store::flat::FlatStorageManager;
use near_store::metadata::DbKind;
Expand Down Expand Up @@ -99,7 +100,7 @@ impl NightshadeRuntime {

let runtime = Runtime::new();
let trie_viewer = TrieViewer::new(trie_viewer_state_size_limit, max_gas_burnt_view);
let flat_storage_manager = FlatStorageManager::new(store.clone());
let flat_storage_manager = FlatStorageManager::new(store.flat_store());
let shard_uids: Vec<_> = genesis_config.shard_layout.shard_uids().collect();
let tries = ShardTries::new(
store.clone(),
Expand Down Expand Up @@ -1244,7 +1245,7 @@ impl RuntimeAdapter for NightshadeRuntime {
debug!(target: "chain", %shard_id, "Inserting {} values to flat storage", flat_state_delta.len());
// TODO: `apply_to_flat_state` inserts values with random writes, which can be time consuming.
// Optimize taking into account that flat state values always correspond to a consecutive range of keys.
flat_state_delta.apply_to_flat_state(&mut store_update, shard_uid);
flat_state_delta.apply_to_flat_state(&mut store_update.flat_store_update(), shard_uid);
self.precompile_contracts(epoch_id, contract_codes)?;
Ok(store_update.commit()?)
}
Expand Down
4 changes: 2 additions & 2 deletions chain/chain/src/runtime/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ impl TestEnv {
{
let mut store_update = store.store_update();
flat_storage_manager.set_flat_storage_for_genesis(
&mut store_update,
&mut store_update.flat_store_update(),
shard_uid,
&genesis_hash,
0,
Expand Down Expand Up @@ -301,7 +301,7 @@ impl TestEnv {
},
};
let new_store_update = flat_storage.add_delta(delta).unwrap();
store_update.merge(new_store_update);
store_update.merge(new_store_update.into());
}
store_update.commit().unwrap();

Expand Down
3 changes: 2 additions & 1 deletion chain/client/src/sync_jobs_actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use near_performance_metrics_macros::perf;
use near_primitives::state_part::PartId;
use near_primitives::state_sync::StatePartKey;
use near_primitives::types::ShardId;
use near_store::adapter::StoreUpdateAdapter;
use near_store::DBCol;

// Set the mailbox capacity for the SyncJobsActor from default 16 to 100.
Expand Down Expand Up @@ -100,7 +101,7 @@ impl SyncJobsActor {
let success = msg
.runtime_adapter
.get_flat_storage_manager()
.remove_flat_storage_for_shard(msg.shard_uid, &mut store_update)?;
.remove_flat_storage_for_shard(msg.shard_uid, &mut store_update.flat_store_update())?;
store_update.commit()?;
Ok(success)
}
Expand Down
Loading

0 comments on commit c9def16

Please sign in to comment.