Skip to content

Commit

Permalink
[Memtrie] (5/5) Integrate memtries into the protocol. (#10087)
Browse files Browse the repository at this point in the history
This is a large PR, but it does a few independent things - these
independent things could in theory be split, but their tests are
somewhat interdependent so splitting is tricky.

* Incorporates MemTries into Trie, so that lookups, writes, and
recording are done via memtries if it is available. This logic is done
by adding a `lookup_from_memory` function, which is invoked by
`get_optimized_ref` as a first choice.
* The testing of this logic is in `trie_recording.rs`; previously this
test performs T (access via on-disk trie), F (access via flat storage),
RT (access via recorded state proof, with trie-like gas accounting), and
RF (access via recorded state proof, with flat storage-like gas
accounting), and asserts that T = RT, and F = RF. Now, we additionally
perform in-memory versions of these setups (use M for that), thereby
asserting that T = MT = RT, and F = MF = RF.
* Memtrie updates are produced via `Trie::update`, which uses the
memtrie to produce the update if available.
* Incorporates a map<shard uid, MemTries> into ShardTries, so that a
collection of memtries can be maintained.
* Memtrie updates are applied via
`WrappedTrieChanges::apply_memtrie_changes`; the block height is
additionally included into `WrappedTrieChanges` to facilitate memtrie
GC.
* Initial loading is done by calling runtime's
load_mem_tries_on_startup. It is called from the chain after ensuring
that flat storage is available (which is critical when populating
genesis for the first time).
* Three configuration parameters are added to configure the memtrie
loading behavior. These configs should immediately be usable.
* GC is called from the chain at the same time as when flat storage GC
happens.
* An integration test has been added in
`integration-tests/.../in_memory_tries.rs`, which brings up two nodes
with different memtrie loading configs and asserts that under a workload
of various cross-shard transactions the two nodes remain consistent,
even with frequent forks. It also tests both the initial loading
behavior as well as behavior after restarts, and also sanity checks GC
logic.
  • Loading branch information
robin-near authored Nov 14, 2023
1 parent ca852b7 commit eb82408
Show file tree
Hide file tree
Showing 19 changed files with 1,070 additions and 139 deletions.
31 changes: 31 additions & 0 deletions chain/chain/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -699,6 +699,16 @@ impl Chain {
};
store_update.commit()?;

// We must load in-memory tries here, and not inside runtime, because
// if we were initializing from genesis, the runtime would be
// initialized when no blocks or flat storage were initialized. We
// require flat storage in order to load in-memory tries.
// TODO(#9511): The calculation of shard UIDs is not precise in the case
// of resharding. We need to revisit this.
let tip = store.head()?;
let shard_uids = epoch_manager.get_shard_layout(&tip.epoch_id)?.get_shard_uids();
runtime_adapter.load_mem_tries_on_startup(&shard_uids)?;

info!(target: "chain", "Init: header head @ #{} {}; block head @ #{} {}",
header_head.height, header_head.last_block_hash,
block_head.height, block_head.last_block_hash);
Expand Down Expand Up @@ -2401,6 +2411,7 @@ impl Chain {
let shard_uid = self.epoch_manager.shard_id_to_uid(shard_id, epoch_id)?;
let flat_storage_manager = self.runtime_adapter.get_flat_storage_manager();
flat_storage_manager.update_flat_storage_for_shard(shard_uid, &block)?;
self.garbage_collect_memtrie_roots(&block, shard_uid);
}
}

Expand Down Expand Up @@ -2447,6 +2458,17 @@ impl Chain {
Ok(AcceptedBlock { hash: *block.hash(), status: block_status, provenance })
}

fn garbage_collect_memtrie_roots(&self, block: &Block, shard_uid: ShardUId) {
let tries = self.runtime_adapter.get_tries();
let last_final_block = block.header().last_final_block();
if last_final_block != &CryptoHash::default() {
let header = self.store.get_block_header(last_final_block).unwrap();
if let Some(prev_height) = header.prev_height() {
tries.delete_memtrie_roots_up_to_height(shard_uid, prev_height);
}
}
}

/// Preprocess a block before applying chunks, verify that we have the necessary information
/// to process the block an the block is valid.
/// Note that this function does NOT introduce any changes to chain state.
Expand Down Expand Up @@ -3564,6 +3586,7 @@ impl Chain {
let shard_uid = self.epoch_manager.shard_id_to_uid(shard_id, epoch_id)?;
let flat_storage_manager = self.runtime_adapter.get_flat_storage_manager();
flat_storage_manager.update_flat_storage_for_shard(shard_uid, &block)?;
self.garbage_collect_memtrie_roots(&block, shard_uid);
}
}

Expand Down Expand Up @@ -4084,6 +4107,7 @@ impl Chain {
)?;

let block_hash = *block.hash();
let block_height = block.header().height();
let challenges_result = block.header().challenges_result().clone();
let block_timestamp = block.header().raw_timestamp();
let next_gas_price = prev_block.header().next_gas_price();
Expand Down Expand Up @@ -4129,6 +4153,7 @@ impl Chain {
epoch_manager.as_ref(),
runtime.as_ref(),
&block_hash,
block_height,
&prev_block_hash,
&apply_result,
split_state_roots,
Expand Down Expand Up @@ -4165,6 +4190,7 @@ impl Chain {
let new_extra = self.get_chunk_extra(&prev_block_hash, &shard_uid)?;

let block_hash = *block.hash();
let block_height = block.header().height();
let challenges_result = block.header().challenges_result().clone();
let block_timestamp = block.header().raw_timestamp();
let epoch_id = self.epoch_manager.get_epoch_id_from_prev_block(&prev_block_hash)?;
Expand Down Expand Up @@ -4216,6 +4242,7 @@ impl Chain {
epoch_manager.as_ref(),
runtime.as_ref(),
&block_hash,
block_height,
&prev_block_hash,
&apply_result,
split_state_roots,
Expand Down Expand Up @@ -4249,6 +4276,7 @@ impl Chain {
let state_changes =
self.store().get_state_changes_for_split_states(block.hash(), shard_id)?;
let block_hash = *block.hash();
let block_height = block.header().height();
Ok(Some(Box::new(move |parent_span| -> Result<ApplyChunkResult, Error> {
let _span = tracing::debug_span!(
target: "chain",
Expand All @@ -4259,6 +4287,7 @@ impl Chain {
.entered();
let results = runtime.apply_update_to_split_states(
&block_hash,
block_height,
split_state_roots,
&next_epoch_shard_layout,
state_changes,
Expand Down Expand Up @@ -5154,6 +5183,7 @@ impl<'a> ChainUpdate<'a> {
epoch_manager: &dyn EpochManagerAdapter,
runtime_adapter: &dyn RuntimeAdapter,
block_hash: &CryptoHash,
block_height: BlockHeight,
prev_block_hash: &CryptoHash,
apply_result: &ApplyTransactionResult,
split_state_roots: Option<HashMap<ShardUId, StateRoot>>,
Expand All @@ -5170,6 +5200,7 @@ impl<'a> ChainUpdate<'a> {
if let Some(state_roots) = split_state_roots {
let split_state_results = runtime_adapter.apply_update_to_split_states(
block_hash,
block_height,
state_roots,
&next_epoch_shard_layout,
state_changes,
Expand Down
1 change: 1 addition & 0 deletions chain/chain/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3174,6 +3174,7 @@ impl<'a> ChainStoreUpdate<'a> {
// from the store.
let mut deletions_store_update = self.store().store_update();
for mut wrapped_trie_changes in self.trie_changes.drain(..) {
wrapped_trie_changes.apply_mem_changes();
wrapped_trie_changes.insertions_into(&mut store_update);
wrapped_trie_changes.deletions_into(&mut deletions_store_update);
wrapped_trie_changes.state_changes_into(&mut store_update);
Expand Down
46 changes: 23 additions & 23 deletions chain/chain/src/test_utils/kv_runtime.rs
Original file line number Diff line number Diff line change
@@ -1,32 +1,30 @@
use std::cmp::Ordering;
use std::collections::{BTreeMap, HashMap, HashSet};
use std::sync::{Arc, RwLock};

use super::ValidatorSchedule;
use crate::types::{
ApplySplitStateResult, ApplyTransactionResult, RuntimeAdapter, RuntimeStorageConfig,
};
use crate::BlockHeader;
use borsh::{BorshDeserialize, BorshSerialize};

use near_epoch_manager::types::BlockHeaderInfo;
use near_epoch_manager::{EpochManagerAdapter, RngSeed};
use near_primitives::state_part::PartId;
use near_store::test_utils::TestTriesBuilder;
use num_rational::Ratio;

use near_chain_configs::{ProtocolConfig, DEFAULT_GC_NUM_EPOCHS_TO_KEEP};
use near_chain_primitives::Error;
use near_crypto::{KeyType, PublicKey, SecretKey, Signature};
use near_epoch_manager::types::BlockHeaderInfo;
use near_epoch_manager::{EpochManagerAdapter, RngSeed};
use near_pool::types::PoolIterator;
use near_primitives::account::{AccessKey, Account};
use near_primitives::block_header::{Approval, ApprovalInner};
use near_primitives::challenge::ChallengesResult;
use near_primitives::epoch_manager::block_info::BlockInfo;
use near_primitives::epoch_manager::epoch_info::EpochInfo;
use near_primitives::epoch_manager::EpochConfig;
use near_primitives::epoch_manager::ShardConfig;
use near_primitives::epoch_manager::ValidatorSelectionConfig;
use near_primitives::errors::{EpochError, InvalidTxError};
use near_primitives::hash::{hash, CryptoHash};
use near_primitives::receipt::{ActionReceipt, Receipt, ReceiptEnum};
use near_primitives::shard_layout;
use near_primitives::shard_layout::{ShardLayout, ShardUId};
use near_primitives::sharding::ChunkHash;
use near_primitives::state_part::PartId;
use near_primitives::transaction::{
Action, ExecutionMetadata, ExecutionOutcome, ExecutionOutcomeWithId, ExecutionStatus,
SignedTransaction, TransferAction,
Expand All @@ -41,19 +39,15 @@ use near_primitives::views::{
AccessKeyInfoView, AccessKeyList, CallResult, ContractCodeView, EpochValidatorInfo,
QueryRequest, QueryResponse, QueryResponseKind, ViewStateResult,
};
use near_store::test_utils::TestTriesBuilder;
use near_store::{
set_genesis_hash, set_genesis_state_roots, DBCol, ShardTries, Store, StoreUpdate, Trie,
TrieChanges, WrappedTrieChanges,
};

use crate::types::{
ApplySplitStateResult, ApplyTransactionResult, RuntimeAdapter, RuntimeStorageConfig,
set_genesis_hash, set_genesis_state_roots, DBCol, ShardTries, StorageError, Store, StoreUpdate,
Trie, TrieChanges, WrappedTrieChanges,
};
use crate::BlockHeader;

use near_primitives::epoch_manager::ShardConfig;

use super::ValidatorSchedule;
use num_rational::Ratio;
use std::cmp::Ordering;
use std::collections::{BTreeMap, HashMap, HashSet};
use std::sync::{Arc, RwLock};

/// Simple key value runtime for tests.
///
Expand Down Expand Up @@ -1038,7 +1032,7 @@ impl RuntimeAdapter for KeyValueRuntime {
&self,
shard_id: ShardId,
storage_config: RuntimeStorageConfig,
_height: BlockHeight,
height: BlockHeight,
_block_timestamp: u64,
_prev_block_hash: &CryptoHash,
block_hash: &CryptoHash,
Expand Down Expand Up @@ -1191,6 +1185,7 @@ impl RuntimeAdapter for KeyValueRuntime {
TrieChanges::empty(state_root),
Default::default(),
*block_hash,
height,
),
new_root: state_root,
outcomes: tx_results,
Expand Down Expand Up @@ -1382,10 +1377,15 @@ impl RuntimeAdapter for KeyValueRuntime {
fn apply_update_to_split_states(
&self,
_block_hash: &CryptoHash,
_block_height: BlockHeight,
_state_roots: HashMap<ShardUId, StateRoot>,
_next_shard_layout: &ShardLayout,
_state_changes: StateChangesForSplitStates,
) -> Result<Vec<ApplySplitStateResult>, Error> {
Ok(vec![])
}

fn load_mem_tries_on_startup(&self, _shard_uids: &[ShardUId]) -> Result<(), StorageError> {
Ok(())
}
}
1 change: 1 addition & 0 deletions chain/chain/src/tests/gc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@ fn do_fork(
trie_changes,
Default::default(),
*block.hash(),
block.header().height(),
);
store_update.save_trie_changes(wrapped_trie_changes);

Expand Down
7 changes: 7 additions & 0 deletions chain/chain/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use chrono::Utc;
use near_chain_configs::StateSplitConfig;
use near_primitives::sandbox::state_patch::SandboxStatePatch;
use near_store::flat::FlatStorageManager;
use near_store::StorageError;
use num_rational::Rational32;

use near_chain_configs::{Genesis, ProtocolConfig};
Expand Down Expand Up @@ -392,6 +393,7 @@ pub trait RuntimeAdapter: Send + Sync {
fn apply_update_to_split_states(
&self,
block_hash: &CryptoHash,
block_height: BlockHeight,
state_roots: HashMap<ShardUId, StateRoot>,
next_shard_layout: &ShardLayout,
state_changes: StateChangesForSplitStates,
Expand Down Expand Up @@ -426,6 +428,11 @@ pub trait RuntimeAdapter: Send + Sync {
) -> bool;

fn get_protocol_config(&self, epoch_id: &EpochId) -> Result<ProtocolConfig, Error>;

/// Loads in-memory tries upon startup. The given shard_uids are possible candidates to load,
/// but which exact shards to load depends on configuration. This may only be called when flat
/// storage is ready.
fn load_mem_tries_on_startup(&self, shard_uids: &[ShardUId]) -> Result<(), StorageError>;
}

/// The last known / checked height and time when we have processed it.
Expand Down
9 changes: 6 additions & 3 deletions chain/client/src/test_utils/test_env_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use near_network::test_utils::MockPeerManagerAdapter;
use near_primitives::epoch_manager::{AllEpochConfigTestOverrides, RngSeed};
use near_primitives::types::{AccountId, NumShards};
use near_store::test_utils::create_test_store;
use near_store::{NodeStorage, ShardUId, Store, StoreConfig};
use near_store::{NodeStorage, ShardUId, Store, StoreConfig, TrieConfig};

use super::setup::{setup_client_with_runtime, setup_synchronous_shards_manager};
use super::test_env::TestEnv;
Expand Down Expand Up @@ -299,11 +299,13 @@ impl TestEnvBuilder {
pub fn internal_initialize_nightshade_runtimes(
self,
runtime_configs: Vec<RuntimeConfigStore>,
trie_configs: Vec<TrieConfig>,
nightshade_runtime_creator: impl Fn(
PathBuf,
Store,
Arc<EpochManagerHandle>,
RuntimeConfigStore,
TrieConfig,
) -> Arc<dyn RuntimeAdapter>,
) -> Self {
let builder = self.ensure_home_dirs().ensure_epoch_managers().ensure_stores();
Expand All @@ -312,15 +314,16 @@ impl TestEnvBuilder {
builder.stores.clone().unwrap(),
builder.epoch_managers.clone().unwrap(),
runtime_configs,
trie_configs,
))
.map(|(home_dir, store, epoch_manager, runtime_config)| {
.map(|(home_dir, store, epoch_manager, runtime_config, trie_config)| {
let epoch_manager = match epoch_manager {
EpochManagerKind::Mock(_) => {
panic!("NightshadeRuntime can only be instantiated with EpochManagerHandle")
}
EpochManagerKind::Handle(handle) => handle,
};
nightshade_runtime_creator(home_dir, store, epoch_manager, runtime_config)
nightshade_runtime_creator(home_dir, store, epoch_manager, runtime_config, trie_config)
})
.collect();
builder.runtimes(runtimes)
Expand Down
10 changes: 10 additions & 0 deletions core/store/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,13 @@ pub struct StoreConfig {
/// This config option is temporary and will be removed once flat storage is implemented.
pub sweat_prefetch_senders: Vec<String>,

/// List of shard UIDs for which we should load the tries in memory.
/// TODO(#9511): This does not automatically survive resharding. We may need to figure out a
/// strategy for that.
pub load_mem_tries_for_shards: Vec<ShardUId>,
/// If true, load mem tries for all shards; this has priority over `load_mem_tries_for_shards`.
pub load_mem_tries_for_all_shards: bool,

/// Path where to create RocksDB checkpoints during database migrations or
/// `false` to disable that feature.
///
Expand Down Expand Up @@ -262,6 +269,9 @@ impl Default for StoreConfig {
"sweat_the_oracle.testnet".to_owned(),
],

load_mem_tries_for_shards: vec![ShardUId { shard_id: 3, version: 1 }],
load_mem_tries_for_all_shards: false,

migration_snapshot: Default::default(),

// We checked that this number of threads doesn't impact
Expand Down
22 changes: 20 additions & 2 deletions core/store/src/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,11 +69,18 @@ pub struct TestTriesBuilder {
shard_version: ShardVersion,
num_shards: NumShards,
enable_flat_storage: bool,
enable_in_memory_tries: bool,
}

impl TestTriesBuilder {
pub fn new() -> Self {
Self { store: None, shard_version: 0, num_shards: 1, enable_flat_storage: false }
Self {
store: None,
shard_version: 0,
num_shards: 1,
enable_flat_storage: false,
enable_in_memory_tries: false,
}
}

pub fn with_store(mut self, store: Store) -> Self {
Expand All @@ -92,6 +99,11 @@ impl TestTriesBuilder {
self
}

pub fn with_in_memory_tries(mut self) -> Self {
self.enable_in_memory_tries = true;
self
}

pub fn build(self) -> ShardTries {
let store = self.store.unwrap_or_else(create_test_store);
let shard_uids = (0..self.num_shards)
Expand All @@ -100,7 +112,10 @@ impl TestTriesBuilder {
let flat_storage_manager = FlatStorageManager::new(store.clone());
let tries = ShardTries::new(
store,
TrieConfig::default(),
TrieConfig {
load_mem_tries_for_all_shards: self.enable_in_memory_tries,
..Default::default()
},
&shard_uids,
flat_storage_manager,
StateSnapshotConfig::default(),
Expand Down Expand Up @@ -131,6 +146,9 @@ impl TestTriesBuilder {
flat_storage_manager.create_flat_storage_for_shard(shard_uid).unwrap();
}
}
if self.enable_in_memory_tries {
tries.load_mem_tries_for_enabled_shards(&shard_uids).unwrap();
}
tries
}
}
Expand Down
Loading

0 comments on commit eb82408

Please sign in to comment.