Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[store] Flat store adapter #12123

Draft
wants to merge 3 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions chain/chain/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,9 @@ use near_primitives::views::{
FinalExecutionOutcomeView, FinalExecutionOutcomeWithReceiptView, FinalExecutionStatus,
LightClientBlockView, SignedTransactionView,
};
use near_store::adapter::StoreUpdateAdapter;
use near_store::config::StateSnapshotType;
use near_store::flat::{store_helper, FlatStorageReadyStatus, FlatStorageStatus};
use near_store::flat::{FlatStorageReadyStatus, FlatStorageStatus};
use near_store::trie::mem::resharding::RetainMode;
use near_store::DBCol;
use near_store::{get_genesis_state_roots, PartialStorage};
Expand Down Expand Up @@ -482,7 +483,7 @@ impl Chain {
// must be set here.
let flat_storage_manager = runtime_adapter.get_flat_storage_manager();
let genesis_epoch_id = genesis.header().epoch_id();
let mut tmp_store_update = store_update.store().store_update();
let mut tmp_store_update = store_update.store().store_update().flat_store_update();
for shard_uid in epoch_manager.get_shard_layout(genesis_epoch_id)?.shard_uids() {
flat_storage_manager.set_flat_storage_for_genesis(
&mut tmp_store_update,
Expand All @@ -491,7 +492,7 @@ impl Chain {
genesis.header().height(),
)
}
store_update.merge(tmp_store_update);
store_update.merge(tmp_store_update.store_update());

info!(target: "chain", "Init: saved genesis: #{} {} / {:?}", block_head.height, block_head.last_block_hash, state_roots);

Expand Down Expand Up @@ -3021,9 +3022,8 @@ impl Chain {

tracing::debug!(target: "store", ?shard_uid, ?flat_head_hash, flat_head_height, "set_state_finalize - initialized flat storage");

let mut store_update = self.runtime_adapter.store().store_update();
store_helper::set_flat_storage_status(
&mut store_update,
let mut store_update = self.runtime_adapter.store().store_update().flat_store_update();
store_update.set_flat_storage_status(
shard_uid,
FlatStorageStatus::Ready(FlatStorageReadyStatus {
flat_head: near_store::flat::BlockInfo {
Expand Down
9 changes: 5 additions & 4 deletions chain/chain/src/chain_update.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use near_primitives::state_sync::{ReceiptProofResponse, ShardStateSyncResponseHe
use near_primitives::types::chunk_extra::ChunkExtra;
use near_primitives::types::{BlockExtra, BlockHeight, BlockHeightDelta, ShardId};
use near_primitives::views::LightClientBlockView;
use near_store::adapter::StoreUpdateAdapter;
use std::sync::Arc;
use tracing::{debug, info, warn};

Expand Down Expand Up @@ -134,7 +135,7 @@ impl<'a> ChainUpdate<'a> {
shard_uid,
apply_result.trie_changes.state_changes(),
)?;
self.chain_store_update.merge(store_update);
self.chain_store_update.merge(store_update.store_update());

self.chain_store_update.save_trie_changes(apply_result.trie_changes);
self.chain_store_update.save_outgoing_receipt(
Expand Down Expand Up @@ -174,7 +175,7 @@ impl<'a> ChainUpdate<'a> {
shard_uid,
apply_result.trie_changes.state_changes(),
)?;
self.chain_store_update.merge(store_update);
self.chain_store_update.merge(store_update.store_update());

self.chain_store_update.save_chunk_extra(block_hash, &shard_uid, new_extra);
self.chain_store_update.save_trie_changes(apply_result.trie_changes);
Expand Down Expand Up @@ -544,7 +545,7 @@ impl<'a> ChainUpdate<'a> {
shard_uid,
apply_result.trie_changes.state_changes(),
)?;
self.chain_store_update.merge(store_update);
self.chain_store_update.merge(store_update.store_update());

self.chain_store_update.save_trie_changes(apply_result.trie_changes);

Expand Down Expand Up @@ -643,7 +644,7 @@ impl<'a> ChainUpdate<'a> {
shard_uid,
apply_result.trie_changes.state_changes(),
)?;
self.chain_store_update.merge(store_update);
self.chain_store_update.merge(store_update.store_update());
self.chain_store_update.save_trie_changes(apply_result.trie_changes);

// The chunk is missing but some fields may need to be updated
Expand Down
83 changes: 29 additions & 54 deletions chain/chain/src/flat_storage_creator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,13 @@ use near_primitives::shard_layout::ShardUId;
use near_primitives::state::FlatStateValue;
use near_primitives::state_part::PartId;
use near_primitives::types::{BlockHeight, StateRoot};
use near_store::adapter::flat_store::FlatStoreAdapter;
use near_store::adapter::{StoreAdapter, StoreUpdateAdapter};
use near_store::flat::{
store_helper, BlockInfo, FetchingStateStatus, FlatStateChanges, FlatStorageCreationMetrics,
BlockInfo, FetchingStateStatus, FlatStateChanges, FlatStorageCreationMetrics,
FlatStorageCreationStatus, FlatStorageManager, FlatStorageReadyStatus, FlatStorageStatus,
NUM_PARTS_IN_ONE_STEP, STATE_PART_MEMORY_LIMIT,
};
use near_store::Store;
use near_store::{Trie, TrieDBStorage, TrieTraversalItem};
use std::collections::HashMap;
use std::sync::atomic::AtomicU64;
Expand Down Expand Up @@ -88,14 +89,14 @@ impl FlatStorageShardCreator {

/// Fetch state part, write all state items to flat storage and send the number of items to the given channel.
fn fetch_state_part(
store: Store,
store: FlatStoreAdapter,
shard_uid: ShardUId,
state_root: StateRoot,
part_id: PartId,
progress: Arc<AtomicU64>,
result_sender: Sender<u64>,
) {
let trie_storage = TrieDBStorage::new(store.clone(), shard_uid);
let trie_storage = TrieDBStorage::new(store.store(), shard_uid);
let trie = Trie::new(Arc::new(trie_storage), state_root, None);
let path_begin = trie.find_state_part_boundary(part_id.idx, part_id.total).unwrap();
let path_end = trie.find_state_part_boundary(part_id.idx + 1, part_id.total).unwrap();
Expand All @@ -110,12 +111,7 @@ impl FlatStorageShardCreator {
{
if let Some(key) = key {
let value = trie.retrieve_value(&hash).unwrap();
store_helper::set_flat_state_value(
&mut store_update,
shard_uid,
key,
Some(FlatStateValue::value_ref(&value)),
);
store_update.set(shard_uid, key, Some(FlatStateValue::value_ref(&value)));
num_items += 1;
}
}
Expand Down Expand Up @@ -149,16 +145,16 @@ impl FlatStorageShardCreator {
chain_store: &ChainStore,
thread_pool: &rayon::ThreadPool,
) -> Result<bool, Error> {
let store = chain_store.store().flat_store();
let shard_id = self.shard_uid.shard_id();
let current_status =
store_helper::get_flat_storage_status(chain_store.store(), self.shard_uid)
.expect("failed to read flat storage status");
let current_status = store
.get_flat_storage_status(self.shard_uid)
.expect("failed to read flat storage status");
self.metrics.set_status(&current_status);
match &current_status {
FlatStorageStatus::Empty => {
let mut store_update = chain_store.store().store_update();
store_helper::set_flat_storage_status(
&mut store_update,
let mut store_update = store.store_update();
store_update.set_flat_storage_status(
self.shard_uid,
FlatStorageStatus::Creation(FlatStorageCreationStatus::SavingDeltas),
);
Expand All @@ -179,11 +175,7 @@ impl FlatStorageShardCreator {
for hash in hashes {
debug!(target: "store", %shard_id, %height, %hash, "Checking delta existence");
assert_matches!(
store_helper::get_delta_changes(
chain_store.store(),
self.shard_uid,
*hash
),
store.get_delta(self.shard_uid, *hash),
Ok(Some(_))
);
}
Expand All @@ -192,10 +184,9 @@ impl FlatStorageShardCreator {

// We continue saving deltas, and also start fetching state.
let block_hash = final_head.last_block_hash;
let store = self.runtime.store().clone();
let epoch_id = self.epoch_manager.get_epoch_id(&block_hash)?;
let shard_uid = self.epoch_manager.shard_id_to_uid(shard_id, &epoch_id)?;
let trie_storage = TrieDBStorage::new(store, shard_uid);
let trie_storage = TrieDBStorage::new(store.store(), shard_uid);
let state_root =
*chain_store.get_chunk_extra(&block_hash, &shard_uid)?.state_root();
let trie = Trie::new(Arc::new(trie_storage), state_root, None);
Expand All @@ -210,10 +201,9 @@ impl FlatStorageShardCreator {
};
info!(target: "store", %shard_id, %final_height, ?status, "Switching status to fetching state");

let mut store_update = chain_store.store().store_update();
let mut store_update = store.store_update();
self.metrics.set_flat_head_height(final_head.height);
store_helper::set_flat_storage_status(
&mut store_update,
store_update.set_flat_storage_status(
self.shard_uid,
FlatStorageStatus::Creation(FlatStorageCreationStatus::FetchingState(
status,
Expand All @@ -225,7 +215,6 @@ impl FlatStorageShardCreator {
FlatStorageStatus::Creation(FlatStorageCreationStatus::FetchingState(
fetching_state_status,
)) => {
let store = self.runtime.store().clone();
let block_hash = fetching_state_status.block_hash;
let start_part_id = fetching_state_status.part_id;
let num_parts_in_step = fetching_state_status.num_parts_in_step;
Expand Down Expand Up @@ -280,7 +269,7 @@ impl FlatStorageShardCreator {
// Mark that we don't wait for new state parts.
self.remaining_state_parts = None;

let mut store_update = chain_store.store().store_update();
let mut store_update = store.store_update();
if next_start_part_id < num_parts {
// If there are still remaining state parts, switch status to the new range of state parts.
// We will spawn new rayon tasks on the next status update.
Expand All @@ -291,8 +280,7 @@ impl FlatStorageShardCreator {
num_parts,
};
debug!(target: "chain", %shard_id, %block_hash, ?new_status);
store_helper::set_flat_storage_status(
&mut store_update,
store_update.set_flat_storage_status(
self.shard_uid,
FlatStorageStatus::Creation(
FlatStorageCreationStatus::FetchingState(new_status),
Expand All @@ -302,13 +290,8 @@ impl FlatStorageShardCreator {
// If all parts were fetched, we can start catchup.
info!(target: "chain", %shard_id, %block_hash, "Finished fetching state");
self.metrics.set_remaining_state_parts(0);
store_helper::remove_delta(
&mut store_update,
self.shard_uid,
block_hash,
);
store_helper::set_flat_storage_status(
&mut store_update,
store_update.remove_delta(self.shard_uid, block_hash);
store_update.set_flat_storage_status(
self.shard_uid,
FlatStorageStatus::Creation(FlatStorageCreationStatus::CatchingUp(
block_hash,
Expand All @@ -320,11 +303,10 @@ impl FlatStorageShardCreator {
}
}
FlatStorageStatus::Creation(FlatStorageCreationStatus::CatchingUp(old_flat_head)) => {
let store = self.runtime.store();
let mut flat_head = *old_flat_head;
let chain_final_head = chain_store.final_head()?;
let mut merged_changes = FlatStateChanges::default();
let mut store_update = self.runtime.store().store_update();
let mut store_update = store.store_update();

// Merge up to 50 deltas of the next blocks until we reach chain final head.
// TODO: consider merging 10 deltas at once to limit memory usage
Expand All @@ -338,11 +320,9 @@ impl FlatStorageShardCreator {
break;
}
flat_head = chain_store.get_next_block_hash(&flat_head).unwrap();
let changes = store_helper::get_delta_changes(store, self.shard_uid, flat_head)
.unwrap()
.unwrap();
let changes = store.get_delta(self.shard_uid, flat_head).unwrap().unwrap();
merged_changes.merge(changes);
store_helper::remove_delta(&mut store_update, self.shard_uid, flat_head);
store_update.remove_delta(self.shard_uid, flat_head);
}

if (old_flat_head != &flat_head) || (flat_head == chain_final_head.last_block_hash)
Expand All @@ -356,8 +336,7 @@ impl FlatStorageShardCreator {
debug!(target: "chain", %shard_id, %old_flat_head, %old_height, %flat_head, %height, "Catching up flat head");
self.metrics.set_flat_head_height(height);
merged_changes.apply_to_flat_state(&mut store_update, shard_uid);
store_helper::set_flat_storage_status(
&mut store_update,
store_update.set_flat_storage_status(
shard_uid,
FlatStorageStatus::Creation(FlatStorageCreationStatus::CatchingUp(
flat_head,
Expand All @@ -370,26 +349,22 @@ impl FlatStorageShardCreator {
// GC deltas from forks which could have appeared on chain during catchup.
// Assuming that flat storage creation finishes in < 2 days, all deltas metadata cannot occupy
// more than 2 * (Blocks per day = 48 * 60 * 60) * (BlockInfo size = 72) ~= 12.4 MB.
let mut store_update = self.runtime.store().store_update();
let deltas_metadata = store_helper::get_all_deltas_metadata(&store, shard_uid)
let mut store_update = store.store_update();
let deltas_metadata = store.get_all_deltas_metadata(shard_uid)
.unwrap_or_else(|_| {
panic!("Cannot read flat state deltas metadata for shard {shard_id} from storage")
});
let mut gc_count = 0;
for delta_metadata in deltas_metadata {
if delta_metadata.block.height <= chain_final_head.height {
store_helper::remove_delta(
&mut store_update,
self.shard_uid,
delta_metadata.block.hash,
);
store_update
.remove_delta(self.shard_uid, delta_metadata.block.hash);
gc_count += 1;
}
}

// If we reached chain final head, we can finish catchup and finally create flat storage.
store_helper::set_flat_storage_status(
&mut store_update,
store_update.set_flat_storage_status(
self.shard_uid,
FlatStorageStatus::Ready(FlatStorageReadyStatus {
flat_head: BlockInfo {
Expand Down
8 changes: 4 additions & 4 deletions chain/chain/src/garbage_collection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use near_primitives::shard_layout::get_block_shard_uid;
use near_primitives::state_sync::{StateHeaderKey, StatePartKey};
use near_primitives::types::{BlockHeight, BlockHeightDelta, EpochId, NumBlocks, ShardId};
use near_primitives::utils::{get_block_shard_id, get_outcome_id_block_hash, index_to_bytes};
use near_store::flat::store_helper;
use near_store::adapter::StoreUpdateAdapter;
use near_store::{DBCol, KeyForStateChanges, ShardTries, ShardUId};

use crate::types::RuntimeAdapter;
Expand Down Expand Up @@ -710,9 +710,9 @@ impl<'a> ChainStoreUpdate<'a> {
}

// delete flat storage columns: FlatStateChanges and FlatStateDeltaMetadata
let mut store_update = self.store().store_update();
store_helper::remove_delta(&mut store_update, shard_uid, block_hash);
self.merge(store_update);
let mut store_update = self.store().store_update().flat_store_update();
store_update.remove_delta(shard_uid, block_hash);
self.merge(store_update.store_update());
}

// 2. Delete block_hash-indexed data
Expand Down
4 changes: 3 additions & 1 deletion chain/chain/src/runtime/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ use near_primitives::views::{
AccessKeyInfoView, CallResult, ContractCodeView, QueryRequest, QueryResponse,
QueryResponseKind, ViewStateResult,
};
use near_store::adapter::{StoreAdapter, StoreUpdateAdapter};
use near_store::config::StateSnapshotType;
use near_store::flat::FlatStorageManager;
use near_store::metadata::DbKind;
Expand Down Expand Up @@ -99,7 +100,7 @@ impl NightshadeRuntime {

let runtime = Runtime::new();
let trie_viewer = TrieViewer::new(trie_viewer_state_size_limit, max_gas_burnt_view);
let flat_storage_manager = FlatStorageManager::new(store.clone());
let flat_storage_manager = FlatStorageManager::new(store.flat_store());
let shard_uids: Vec<_> = genesis_config.shard_layout.shard_uids().collect();
let tries = ShardTries::new(
store.clone(),
Expand Down Expand Up @@ -1244,6 +1245,7 @@ impl RuntimeAdapter for NightshadeRuntime {
debug!(target: "chain", %shard_id, "Inserting {} values to flat storage", flat_state_delta.len());
// TODO: `apply_to_flat_state` inserts values with random writes, which can be time consuming.
// Optimize taking into account that flat state values always correspond to a consecutive range of keys.
let mut store_update = store_update.flat_store_update();
flat_state_delta.apply_to_flat_state(&mut store_update, shard_uid);
self.precompile_contracts(epoch_id, contract_codes)?;
Ok(store_update.commit()?)
Expand Down
4 changes: 2 additions & 2 deletions chain/chain/src/runtime/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ impl TestEnv {
for shard_uid in
epoch_manager.get_shard_layout(&EpochId::default()).unwrap().shard_uids()
{
let mut store_update = store.store_update();
let mut store_update = store.store_update().flat_store_update();
flat_storage_manager.set_flat_storage_for_genesis(
&mut store_update,
shard_uid,
Expand Down Expand Up @@ -301,7 +301,7 @@ impl TestEnv {
},
};
let new_store_update = flat_storage.add_delta(delta).unwrap();
store_update.merge(new_store_update);
store_update.merge(new_store_update.store_update());
}
store_update.commit().unwrap();

Expand Down
Loading
Loading