diff --git a/Cargo.lock b/Cargo.lock
index 2fc6a654b64..3bb8b46a928 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -5113,6 +5113,7 @@ dependencies = [
  "lru 0.12.3",
  "near-chain",
  "near-chain-configs",
+ "near-chain-primitives",
  "near-chunks",
  "near-crypto",
  "near-fmt",
diff --git a/chain/chain/src/chain.rs b/chain/chain/src/chain.rs
index 4add60f4015..9016cfdd1fa 100644
--- a/chain/chain/src/chain.rs
+++ b/chain/chain/src/chain.rs
@@ -569,7 +569,7 @@ impl Chain {
         // of blocks_in_processing, which is set to 5 now.
         let (sc, rc) = unbounded();
         let resharding_manager = ReshardingManager::new(
-            chain_store.store().clone(),
+            chain_store.store(),
             epoch_manager.clone(),
             runtime_adapter.clone(),
             chain_config.resharding_config,
@@ -664,7 +664,7 @@ impl Chain {
         congestion_info: Option<CongestionInfo>,
     ) -> Result<ChunkExtra, Error> {
         let shard_index = shard_layout.get_shard_index(shard_id)?;
-        let state_root = *get_genesis_state_roots(self.chain_store.store())?
+        let state_root = *get_genesis_state_roots(&self.chain_store.store())?
             .ok_or_else(|| Error::Other("genesis state roots do not exist in the db".to_owned()))?
             .get(shard_index)
             .ok_or_else(|| {
diff --git a/chain/chain/src/garbage_collection.rs b/chain/chain/src/garbage_collection.rs
index ea574264f55..7fa0f5ce6a5 100644
--- a/chain/chain/src/garbage_collection.rs
+++ b/chain/chain/src/garbage_collection.rs
@@ -413,7 +413,6 @@ impl<'a> ChainStoreUpdate<'a> {
             let mut store_update = self.store().store_update();
             let key: &[u8] = header_hash.as_bytes();
             store_update.delete(DBCol::BlockHeader, key);
-            self.chain_store().headers.pop(key);
             self.merge(store_update);
         }
         let key = index_to_bytes(height);
@@ -816,10 +815,8 @@ impl<'a> ChainStoreUpdate<'a> {
         let key = &index_to_bytes(height)[..];
         if epoch_to_hashes.is_empty() {
             store_update.delete(DBCol::BlockPerHeight, key);
-            self.chain_store().block_hash_per_height.pop(key);
         } else {
             store_update.set_ser(DBCol::BlockPerHeight, key, &epoch_to_hashes)?;
-            self.chain_store().block_hash_per_height.put(key.to_vec(), Arc::new(epoch_to_hashes));
         }
         if self.is_height_processed(height)? {
             self.gc_col(DBCol::ProcessedBlockHeights, key);
@@ -845,7 +842,6 @@ impl<'a> ChainStoreUpdate<'a> {
         let mut store_update = self.store().store_update();
         let key = get_block_shard_id(block_hash, shard_id);
         store_update.delete(DBCol::OutgoingReceipts, &key);
-        self.chain_store().outgoing_receipts.pop(&key);
         self.merge(store_update);
     }
@@ -882,7 +878,6 @@ impl<'a> ChainStoreUpdate<'a> {
             }
             DBCol::IncomingReceipts => {
                 store_update.delete(col, key);
-                self.chain_store().incoming_receipts.pop(key);
             }
             DBCol::StateHeaders => {
                 store_update.delete(col, key);
@@ -893,20 +888,16 @@ impl<'a> ChainStoreUpdate<'a> {
                 // When that happens we should make sure that block headers is
                 // copied to the cold storage.
                 store_update.delete(col, key);
-                self.chain_store().headers.pop(key);
                 unreachable!();
             }
             DBCol::Block => {
                 store_update.delete(col, key);
-                self.chain_store().blocks.pop(key);
             }
             DBCol::BlockExtra => {
                 store_update.delete(col, key);
-                self.chain_store().block_extras.pop(key);
             }
             DBCol::NextBlockHashes => {
                 store_update.delete(col, key);
-                self.chain_store().next_block_hashes.pop(key);
             }
             DBCol::ChallengedBlocks => {
                 store_update.delete(col, key);
@@ -919,31 +910,24 @@ impl<'a> ChainStoreUpdate<'a> {
             }
             DBCol::BlockRefCount => {
                 store_update.delete(col, key);
-                self.chain_store().block_refcounts.pop(key);
             }
             DBCol::Transactions => {
                 store_update.decrement_refcount(col, key);
-                self.chain_store().transactions.pop(key);
             }
             DBCol::Receipts => {
                 store_update.decrement_refcount(col, key);
-                self.chain_store().receipts.pop(key);
             }
             DBCol::Chunks => {
                 store_update.delete(col, key);
-                self.chain_store().chunks.pop(key);
             }
             DBCol::ChunkExtra => {
                 store_update.delete(col, key);
-                self.chain_store().chunk_extras.pop(key);
             }
             DBCol::PartialChunks => {
                 store_update.delete(col, key);
-                self.chain_store().partial_chunks.pop(key);
             }
             DBCol::InvalidChunks => {
                 store_update.delete(col, key);
-                self.chain_store().invalid_chunks.pop(key);
             }
             DBCol::ChunkHashesByHeight => {
                 store_update.delete(col, key);
@@ -974,7 +958,6 @@ impl<'a> ChainStoreUpdate<'a> {
             }
             DBCol::ProcessedBlockHeights => {
                 store_update.delete(col, key);
-                self.chain_store().processed_block_heights.pop(key);
             }
             DBCol::HeaderHashesByHeight => {
                 store_update.delete(col, key);
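Note: every `gc_col` arm above drops its cache-invalidation call because the per-column LRU caches are removed from `ChainStore` later in this patch; a delete now touches only the `StoreUpdate`. A minimal sketch of the bookkeeping this eliminates (toy types, not nearcore's API):

    use std::collections::HashMap;

    struct Store(HashMap<Vec<u8>, Vec<u8>>);
    struct Cache(HashMap<Vec<u8>, Vec<u8>>);

    // Before: a write-through cache had to be invalidated on every delete,
    // or a later read could return data that is gone from the store.
    fn delete_with_cache(store: &mut Store, cache: &mut Cache, key: &[u8]) {
        store.0.remove(key);
        cache.0.remove(key); // forgetting this line means stale reads
    }

    // After: with no sidecar cache there is nothing to keep in sync.
    fn delete(store: &mut Store, key: &[u8]) {
        store.0.remove(key);
    }

    fn main() {
        let (mut store, mut cache) = (Store(HashMap::new()), Cache(HashMap::new()));
        store.0.insert(b"k".to_vec(), b"v".to_vec());
        cache.0.insert(b"k".to_vec(), b"v".to_vec());
        delete_with_cache(&mut store, &mut cache, b"k");
        delete(&mut store, b"k");
    }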
diff --git a/chain/chain/src/state_sync.rs b/chain/chain/src/state_sync.rs
index f6ebcff7a76..c1ff1b1bf06 100644
--- a/chain/chain/src/state_sync.rs
+++ b/chain/chain/src/state_sync.rs
@@ -37,7 +37,7 @@ fn save_epoch_new_chunks(
     header: &BlockHeader,
 ) -> Result<(), Error> {
     let Some(mut num_new_chunks) =
-        get_state_sync_new_chunks(chain_store.store(), header.prev_hash())?
+        get_state_sync_new_chunks(&chain_store.store(), header.prev_hash())?
     else {
         // This might happen in the case of epoch sync where we save individual headers without having all
         // headers that belong to the epoch.
@@ -112,7 +112,7 @@ fn remove_old_blocks(
         Err(Error::DBNotFoundErr(_)) => return Ok(()),
         Err(e) => return Err(e),
     };
-    for block_hash in iter_state_sync_new_chunks_keys(chain_store.store()) {
+    for block_hash in iter_state_sync_new_chunks_keys(&chain_store.store()) {
         let block_hash = block_hash?;
         let old_header = chain_store.get_block_header(&block_hash)?;
         if old_header.height() < last_final_header.height() {
@@ -151,7 +151,7 @@ pub(crate) fn update_sync_hashes(
         // column for this block. This means we will no longer remember sync hashes for these old epochs, which
         // should be fine as we only care to state sync to (and provide state parts for) the latest state
         on_new_epoch(store_update, header)?;
-        return remove_old_epochs(chain_store.store(), store_update, header, &prev_header);
+        return remove_old_epochs(&chain_store.store(), store_update, header, &prev_header);
     }
 
     save_epoch_new_chunks(chain_store, store_update, header)?;
diff --git a/chain/chain/src/store/latest_witnesses.rs b/chain/chain/src/store/latest_witnesses.rs
index 2b61626b749..a779a5bcde4 100644
--- a/chain/chain/src/store/latest_witnesses.rs
+++ b/chain/chain/src/store/latest_witnesses.rs
@@ -147,8 +147,8 @@ impl ChainStore {
         // Go over witnesses with increasing indexes and remove them until the limits are satisfied.
         while !info.is_within_limits() && info.lowest_index < info.next_witness_index {
-            let key_to_delete = self
-                .store()
+            let store = self.store();
+            let key_to_delete = store
                 .get(DBCol::LatestWitnessesByIndex, &info.lowest_index.to_be_bytes())?
                 .ok_or_else(|| {
                     std::io::Error::new(
diff --git a/chain/chain/src/store/mod.rs b/chain/chain/src/store/mod.rs
index 77479990705..20c471cb902 100644
--- a/chain/chain/src/store/mod.rs
+++ b/chain/chain/src/store/mod.rs
@@ -1,10 +1,10 @@
 use std::collections::hash_map::Entry;
 use std::collections::{BTreeMap, HashMap, HashSet};
 use std::io;
+use std::ops::Deref;
 
 use borsh::{BorshDeserialize, BorshSerialize};
 use chrono::Utc;
-use near_cache::CellLruCache;
 use near_chain_primitives::error::Error;
 use near_epoch_manager::EpochManagerAdapter;
@@ -19,9 +19,7 @@ use near_primitives::sharding::{
     ChunkHash, EncodedShardChunk, PartialEncodedChunk, ReceiptProof, ShardChunk, ShardChunkHeader,
     StateSyncInfo,
 };
-use near_primitives::state_sync::{
-    ReceiptProofResponse, ShardStateSyncResponseHeader, StateHeaderKey, StateSyncDumpProgress,
-};
+use near_primitives::state_sync::{ReceiptProofResponse, StateSyncDumpProgress};
 use near_primitives::stateless_validation::contract_distribution::ContractUpdates;
 use near_primitives::stateless_validation::stored_chunk_state_transition_data::{
     StoredChunkStateTransitionData, StoredChunkStateTransitionDataV1,
@@ -42,6 +40,7 @@ use near_primitives::utils::{
 };
 use near_primitives::version::ProtocolVersion;
 use near_primitives::views::LightClientBlockView;
+use near_store::adapter::chain_store::ChainStoreAdapter;
 use near_store::adapter::{StoreAdapter, StoreUpdateAdapter};
 use near_store::{
     DBCol, KeyForStateChanges, PartialStorage, Store, StoreUpdate, WrappedTrieChanges,
@@ -59,10 +58,6 @@ mod merkle_proof;
 pub use latest_witnesses::LatestWitnessesInfo;
 pub use merkle_proof::MerkleProofAccess;
 
-// TODO: Get rid of caches in chain store
-const CACHE_SIZE: usize = 1;
-const CHUNK_CACHE_SIZE: usize = 1;
-
 /// Filter receipts mode for incoming receipts collection.
 pub enum ReceiptFilter {
     /// Leave receipts unchanged. Needed for receipt proof generation, because
@@ -80,7 +75,7 @@ pub trait ChainStoreAccess {
     /// Returns underlying chain store
     fn chain_store(&self) -> &ChainStore;
     /// Returns underlying store.
-    fn store(&self) -> &Store;
+    fn store(&self) -> Store;
     /// The chain head.
     fn head(&self) -> Result<Tip, Error>;
     /// The chain Blocks Tail height.
@@ -404,64 +399,20 @@ pub fn filter_incoming_receipts_for_shard(
 
 /// All chain-related database operations.
 pub struct ChainStore {
-    store: Store,
-    /// Genesis block height.
-    genesis_height: BlockHeight,
-    /// Latest known.
+    store: ChainStoreAdapter,
     latest_known: once_cell::unsync::OnceCell<LatestKnown>,
-    /// Current head of the chain
-    head: Option<Tip>,
-    /// Tail height of the chain,
-    tail: Option<BlockHeight>,
-    /// Cache with headers.
-    pub(crate) headers: CellLruCache<Vec<u8>, BlockHeader>,
-    /// Cache with blocks.
-    pub(crate) blocks: CellLruCache<Vec<u8>, Block>,
-    /// Cache with chunks
-    pub(crate) chunks: CellLruCache<Vec<u8>, Arc<ShardChunk>>,
-    /// Cache with partial chunks
-    pub(crate) partial_chunks: CellLruCache<Vec<u8>, Arc<PartialEncodedChunk>>,
-    /// Cache with block extra.
-    pub(crate) block_extras: CellLruCache<Vec<u8>, Arc<BlockExtra>>,
-    /// Cache with chunk extra.
-    pub(crate) chunk_extras: CellLruCache<Vec<u8>, Arc<ChunkExtra>>,
-    /// Cache with height to hash on the main chain.
-    height: CellLruCache<Vec<u8>, CryptoHash>,
-    /// Cache with height to block hash on any chain.
-    pub(crate) block_hash_per_height:
-        CellLruCache<Vec<u8>, Arc<HashMap<EpochId, HashSet<CryptoHash>>>>,
-    /// Next block hashes for each block on the canonical chain
-    pub(crate) next_block_hashes: CellLruCache<Vec<u8>, CryptoHash>,
-    /// Light client blocks corresponding to the last finalized block of each epoch
-    epoch_light_client_blocks: CellLruCache<Vec<u8>, Arc<LightClientBlockView>>,
-    /// Cache with outgoing receipts.
-    pub(crate) outgoing_receipts: CellLruCache<Vec<u8>, Arc<Vec<Receipt>>>,
-    /// Cache with incoming receipts.
-    pub(crate) incoming_receipts: CellLruCache<Vec<u8>, Arc<Vec<ReceiptProof>>>,
-    /// Invalid chunks.
-    pub(crate) invalid_chunks: CellLruCache<Vec<u8>, Arc<EncodedShardChunk>>,
-    /// Transactions
-    pub(crate) transactions: CellLruCache<Vec<u8>, Arc<SignedTransaction>>,
-    /// Receipts
-    pub(crate) receipts: CellLruCache<Vec<u8>, Arc<Receipt>>,
-    /// Cache with Block Refcounts
-    pub(crate) block_refcounts: CellLruCache<Vec<u8>, u64>,
-    /// Cache of block hash -> block merkle tree at the current block
-    block_merkle_tree: CellLruCache<Vec<u8>, Arc<PartialMerkleTree>>,
-    /// Cache of block ordinal to block hash.
-    block_ordinal_to_hash: CellLruCache<Vec<u8>, CryptoHash>,
-    /// Processed block heights.
-    pub(crate) processed_block_heights: CellLruCache<Vec<u8>, ()>,
-    /// save_trie_changes should be set to true iff
-    /// - archive is false - non-archival nodes need trie changes to perform garbage collection
-    /// - archive is true, cold_store is configured and migration to split_storage is finished - node
-    /// working in split storage mode needs trie changes in order to do garbage collection on hot.
-    save_trie_changes: bool,
-
     /// The maximum number of blocks for which a transaction is valid since its creation.
     pub(super) transaction_validity_period: BlockHeightDelta,
 }
 
+impl Deref for ChainStore {
+    type Target = ChainStoreAdapter;
+
+    fn deref(&self) -> &Self::Target {
+        &self.store
+    }
+}
+
 fn option_to_not_found<T, F>(res: io::Result<Option<T>>, field_name: F) -> Result<T, Error>
 where
     F: std::string::ToString,
@@ -481,31 +432,8 @@ impl ChainStore {
         transaction_validity_period: BlockHeightDelta,
     ) -> ChainStore {
         ChainStore {
-            store,
-            genesis_height,
+            store: store.chain_store(genesis_height, save_trie_changes),
             latest_known: once_cell::unsync::OnceCell::new(),
-            head: None,
-            tail: None,
-            blocks: CellLruCache::new(CACHE_SIZE),
-            headers: CellLruCache::new(CACHE_SIZE),
-            chunks: CellLruCache::new(CHUNK_CACHE_SIZE),
-            partial_chunks: CellLruCache::new(CHUNK_CACHE_SIZE),
-            block_extras: CellLruCache::new(CACHE_SIZE),
-            chunk_extras: CellLruCache::new(CACHE_SIZE),
-            height: CellLruCache::new(CACHE_SIZE),
-            block_hash_per_height: CellLruCache::new(CACHE_SIZE),
-            block_refcounts: CellLruCache::new(CACHE_SIZE),
-            next_block_hashes: CellLruCache::new(CACHE_SIZE),
-            epoch_light_client_blocks: CellLruCache::new(CACHE_SIZE),
-            outgoing_receipts: CellLruCache::new(CACHE_SIZE),
-            incoming_receipts: CellLruCache::new(CACHE_SIZE),
-            invalid_chunks: CellLruCache::new(CACHE_SIZE),
-            transactions: CellLruCache::new(CHUNK_CACHE_SIZE),
-            receipts: CellLruCache::new(CHUNK_CACHE_SIZE),
-            block_merkle_tree: CellLruCache::new(CACHE_SIZE),
-            block_ordinal_to_hash: CellLruCache::new(CACHE_SIZE),
-            processed_block_heights: CellLruCache::new(CACHE_SIZE),
-            save_trie_changes,
             transaction_validity_period,
         }
     }
@@ -516,6 +444,7 @@ impl ChainStore {
     pub fn iterate_state_sync_infos(&self) -> Result<Vec<(CryptoHash, StateSyncInfo)>, Error> {
         self.store
+            .store()
             .iter(DBCol::StateDlInfos)
             .map(|item| match item {
                 Ok((k, v)) => Ok((
@@ -767,6 +696,7 @@ impl ChainStore {
         id: &CryptoHash,
     ) -> Result<Vec<ExecutionOutcomeWithIdAndProof>, Error> {
         self.store
+            .store()
             .iter_prefix_ser::<ExecutionOutcomeWithProof>(
                 DBCol::TransactionResultForBlock,
                 id.as_ref(),
@@ -786,29 +716,6 @@ impl ChainStore {
             .collect()
     }
 
-    pub fn get_outcome_by_id_and_block_hash(
-        &self,
-        id: &CryptoHash,
-        block_hash: &CryptoHash,
-    ) -> Result<Option<ExecutionOutcomeWithProof>, Error> {
-        Ok(self.store.get_ser(
-            DBCol::TransactionResultForBlock,
-            &get_outcome_id_block_hash(id, block_hash),
-        )?)
-    }
-
-    /// Returns a vector of Outcome ids for given block and shard id
-    pub fn get_outcomes_by_block_hash_and_shard_id(
-        &self,
-        block_hash: &CryptoHash,
-        shard_id: ShardId,
-    ) -> Result<Vec<CryptoHash>, Error> {
-        Ok(self
-            .store
-            .get_ser(DBCol::OutcomeIds, &get_block_shard_id(block_hash, shard_id))?
-            .unwrap_or_default())
-    }
-
     /// Get all execution outcomes generated when the chunk are applied
     pub fn get_block_execution_outcomes(
         &self,
@@ -841,60 +748,13 @@ impl ChainStore {
         Ok(res)
     }
 
-    /// Returns a hashmap of epoch id -> set of all blocks got for current (height, epoch_id)
-    pub fn get_all_block_hashes_by_height(
-        &self,
-        height: BlockHeight,
-    ) -> Result<Arc<HashMap<EpochId, HashSet<CryptoHash>>>, Error> {
-        Ok(self
-            .read_with_cache(
-                DBCol::BlockPerHeight,
-                &self.block_hash_per_height,
-                &index_to_bytes(height),
-            )?
-            .unwrap_or_default())
-    }
-
-    /// Returns a HashSet of Chunk Hashes for current Height
-    pub fn get_all_chunk_hashes_by_height(
-        &self,
-        height: BlockHeight,
-    ) -> Result<HashSet<ChunkHash>, Error> {
-        Ok(self
-            .store
-            .get_ser(DBCol::ChunkHashesByHeight, &index_to_bytes(height))?
-            .unwrap_or_default())
-    }
-
-    /// Returns a HashSet of Header Hashes for current Height
-    pub fn get_all_header_hashes_by_height(
-        &self,
-        height: BlockHeight,
-    ) -> Result<HashSet<CryptoHash>, Error> {
-        Ok(self
-            .store
-            .get_ser(DBCol::HeaderHashesByHeight, &index_to_bytes(height))?
-            .unwrap_or_default())
-    }
-
-    pub fn get_state_header(
-        &self,
-        shard_id: ShardId,
-        block_hash: CryptoHash,
-    ) -> Result<ShardStateSyncResponseHeader, Error> {
-        let key = borsh::to_vec(&StateHeaderKey(shard_id, block_hash))?;
-        match self.store.get_ser(DBCol::StateHeaders, &key) {
-            Ok(Some(header)) => Ok(header),
-            _ => Err(Error::Other("Cannot get shard_state_header".into())),
-        }
-    }
-
     /// Returns latest known height and time it was seen.
+    /// TODO(store): What is this doing here? Cleanup
     pub fn get_latest_known(&self) -> Result<LatestKnown, Error> {
         self.latest_known
             .get_or_try_init(|| {
                 option_to_not_found(
-                    self.store.get_ser(DBCol::BlockMisc, LATEST_KNOWN_KEY),
+                    self.store.store().get_ser(DBCol::BlockMisc, LATEST_KNOWN_KEY),
                     "LATEST_KNOWN_KEY",
                 )
             })
             .cloned()
     }
 
     /// Save the latest known.
+    /// TODO(store): What is this doing here? Cleanup
     pub fn save_latest_known(&mut self, latest_known: LatestKnown) -> Result<(), Error> {
-        let mut store_update = self.store.store_update();
+        let mut store_update = self.store.store().store_update();
         store_update.set_ser(DBCol::BlockMisc, LATEST_KNOWN_KEY, &latest_known)?;
         self.latest_known = once_cell::unsync::OnceCell::from(latest_known);
         store_update.commit().map_err(|err| err.into())
     }
@@ -932,7 +793,8 @@ impl ChainStore {
 
         let storage_key = KeyForStateChanges::for_block(block_hash);
 
-        let mut block_changes = storage_key.find_iter(&self.store);
+        let store = self.store.store();
+        let mut block_changes = storage_key.find_iter(&store);
 
         Ok(StateChangesKinds::from_changes(&mut block_changes)?)
     }
@@ -943,7 +805,8 @@ impl ChainStore {
     ) -> Result<StateChanges, Error> {
         let storage_key = KeyForStateChanges::for_block(block_hash);
 
-        let mut block_changes = storage_key.find_iter(&self.store);
+        let store = self.store.store();
+        let mut block_changes = storage_key.find_iter(&store);
 
         Ok(StateChanges::from_changes(&mut block_changes)?)
     }
@@ -980,13 +843,14 @@ impl ChainStore {
         // left working with a key that was used in the trie.
         // 2.2. Parse the trie key with a relevant KeyFor* implementation to ensure consistency
 
+        let store = self.store.store();
         Ok(match state_changes_request {
             StateChangesRequest::AccountChanges { account_ids } => {
                 let mut changes = StateChanges::new();
                 for account_id in account_ids {
                     let data_key = TrieKey::Account { account_id: account_id.clone() };
                     let storage_key = KeyForStateChanges::from_trie_key(block_hash, &data_key);
-                    let changes_per_key = storage_key.find_exact_iter(&self.store);
+                    let changes_per_key = storage_key.find_exact_iter(&store);
                     changes.extend(StateChanges::from_account_changes(changes_per_key)?);
                 }
                 changes
@@ -999,7 +863,7 @@ impl ChainStore {
                         public_key: key.public_key.clone(),
                     };
                     let storage_key = KeyForStateChanges::from_trie_key(block_hash, &data_key);
-                    let changes_per_key = storage_key.find_exact_iter(&self.store);
+                    let changes_per_key = storage_key.find_exact_iter(&store);
                     changes.extend(StateChanges::from_access_key_changes(changes_per_key)?);
                 }
                 changes
@@ -1009,7 +873,7 @@ impl ChainStore {
                 for account_id in account_ids {
                     let data_key = trie_key_parsers::get_raw_prefix_for_access_keys(account_id);
                     let storage_key = KeyForStateChanges::from_raw_key(block_hash, &data_key);
-                    let changes_per_key_prefix = storage_key.find_iter(&self.store);
+                    let changes_per_key_prefix = storage_key.find_iter(&store);
                     changes.extend(StateChanges::from_access_key_changes(changes_per_key_prefix)?);
                 }
                 changes
@@ -1019,7 +883,7 @@ impl ChainStore {
                 for account_id in account_ids {
                     let data_key = TrieKey::ContractCode { account_id: account_id.clone() };
                     let storage_key = KeyForStateChanges::from_trie_key(block_hash, &data_key);
-                    let changes_per_key = storage_key.find_exact_iter(&self.store);
+                    let changes_per_key = storage_key.find_exact_iter(&store);
                     changes.extend(StateChanges::from_contract_code_changes(changes_per_key)?);
                 }
                 changes
@@ -1032,7 +896,7 @@ impl ChainStore {
                         key_prefix.as_ref(),
                     );
                     let storage_key = KeyForStateChanges::from_raw_key(block_hash, &data_key);
-                    let changes_per_key_prefix = storage_key.find_iter(&self.store);
+                    let changes_per_key_prefix = storage_key.find_iter(&store);
                     changes.extend(StateChanges::from_data_changes(changes_per_key_prefix)?);
                 }
                 changes
@@ -1041,23 +905,7 @@ impl ChainStore {
     }
 
     pub fn get_store_statistics(&self) -> Option<StoreStatistics> {
-        self.store.get_store_statistics()
-    }
-
-    fn read_with_cache<'a, T: BorshDeserialize + Clone + 'a>(
-        &self,
-        col: DBCol,
-        cache: &'a CellLruCache<Vec<u8>, T>,
-        key: &[u8],
-    ) -> io::Result<Option<T>> {
-        if let Some(value) = cache.get(key) {
-            return Ok(Some(value));
-        }
-        if let Some(result) = self.store.get_ser::<T>(col, key)? {
-            cache.put(key.to_vec(), result.clone());
-            return Ok(Some(result));
-        }
-        Ok(None)
+        self.store.store().get_store_statistics()
     }
 
     /// Constructs key 'STATE_SYNC_DUMP:<shard_id>',
     pub fn state_sync_dump_progress_key(shard_id: ShardId) -> Vec<u8> {
         key
     }
@@ -1070,12 +918,27 @@ impl ChainStore {
         key
     }
 
-    /// For each value stored, this returs an (EpochId, bool), where the bool tells whether it's finished
+    /// Retrieves STATE_SYNC_DUMP for the given shard.
+    /// TODO(store): What is this doing here? Cleanup
+    pub fn get_state_sync_dump_progress(
+        &self,
+        shard_id: ShardId,
+    ) -> Result<StateSyncDumpProgress, Error> {
+        option_to_not_found(
+            self.store
+                .store()
+                .get_ser(DBCol::BlockMisc, &ChainStore::state_sync_dump_progress_key(shard_id)),
+            format!("STATE_SYNC_DUMP:{}", shard_id),
+        )
+    }
+
+    /// For each value stored, this returns an (EpochId, bool), where the bool tells whether it's finished
     /// because those are the only fields we really care about.
     pub fn iter_state_sync_dump_progress<'a>(
         &'a self,
     ) -> impl Iterator<Item = Result<(ShardId, (EpochId, bool)), Error>> + 'a {
         self.store
+            .store_ref()
             .iter_prefix_ser::<StateSyncDumpProgress>(DBCol::BlockMisc, STATE_SYNC_DUMP_KEY)
             .map(|item| {
                 item.and_then(|(key, progress)| {
@@ -1102,12 +965,13 @@ impl ChainStore {
     }
 
     /// Updates STATE_SYNC_DUMP for the given shard.
+    /// TODO(store): What is this doing here? Cleanup
     pub fn set_state_sync_dump_progress(
         &self,
         shard_id: ShardId,
         value: Option<StateSyncDumpProgress>,
     ) -> Result<(), Error> {
-        let mut store_update = self.store.store_update();
+        let mut store_update = self.store.store().store_update();
         let key = ChainStore::state_sync_dump_progress_key(shard_id);
         match value {
             None => store_update.delete(DBCol::BlockMisc, &key),
@@ -1122,118 +986,80 @@ impl ChainStoreAccess for ChainStore {
         &self
     }
 
-    fn store(&self) -> &Store {
-        &self.store
+    fn store(&self) -> Store {
+        self.store.store()
     }
 
     /// The chain head.
     fn head(&self) -> Result<Tip, Error> {
-        if let Some(ref tip) = self.head {
-            Ok(tip.clone())
-        } else {
-            option_to_not_found(self.store.get_ser(DBCol::BlockMisc, HEAD_KEY), "HEAD")
-        }
+        ChainStoreAdapter::head(self)
    }
 
     /// The chain Blocks Tail height, used by GC.
     fn tail(&self) -> Result<BlockHeight, Error> {
-        if let Some(tail) = self.tail.as_ref() {
-            Ok(*tail)
-        } else {
-            self.store
-                .get_ser(DBCol::BlockMisc, TAIL_KEY)
-                .map(|option| option.unwrap_or(self.genesis_height))
-                .map_err(|e| e.into())
-        }
+        ChainStoreAdapter::tail(self)
     }
 
     /// The chain Chunks Tail height, used by GC.
     fn chunk_tail(&self) -> Result<BlockHeight, Error> {
-        self.store
-            .get_ser(DBCol::BlockMisc, CHUNK_TAIL_KEY)
-            .map(|option| option.unwrap_or(self.genesis_height))
-            .map_err(|e| e.into())
+        ChainStoreAdapter::chunk_tail(self)
     }
 
     fn fork_tail(&self) -> Result<BlockHeight, Error> {
-        self.store
-            .get_ser(DBCol::BlockMisc, FORK_TAIL_KEY)
-            .map(|option| option.unwrap_or(self.genesis_height))
-            .map_err(|e| e.into())
+        ChainStoreAdapter::fork_tail(self)
     }
 
     /// Header of the block at the head of the block chain (not the same thing as header_head).
     fn head_header(&self) -> Result<BlockHeader, Error> {
-        self.get_block_header(&self.head()?.last_block_hash)
+        ChainStoreAdapter::head_header(self)
     }
 
     /// Largest height for which we created a doomslug endorsement
     fn largest_target_height(&self) -> Result<BlockHeight, Error> {
-        match self.store.get_ser(DBCol::BlockMisc, LARGEST_TARGET_HEIGHT_KEY) {
-            Ok(Some(o)) => Ok(o),
-            Ok(None) => Ok(0),
-            Err(e) => Err(e.into()),
-        }
+        ChainStoreAdapter::largest_target_height(self)
     }
 
     /// Head of the header chain (not the same thing as head_header).
     fn header_head(&self) -> Result<Tip, Error> {
-        option_to_not_found(self.store.get_ser(DBCol::BlockMisc, HEADER_HEAD_KEY), "HEADER_HEAD")
+        ChainStoreAdapter::header_head(self)
     }
 
     /// Final head of the chain.
     fn final_head(&self) -> Result<Tip, Error> {
-        option_to_not_found(self.store.get_ser(DBCol::BlockMisc, FINAL_HEAD_KEY), "FINAL HEAD")
+        ChainStoreAdapter::final_head(self)
     }
 
     /// Get full block.
     fn get_block(&self, h: &CryptoHash) -> Result<Block, Error> {
-        option_to_not_found(
-            self.read_with_cache(DBCol::Block, &self.blocks, h.as_ref()),
-            format_args!("BLOCK: {}", h),
-        )
+        ChainStoreAdapter::get_block(self, h)
     }
 
     /// Get full chunk.
     fn get_chunk(&self, chunk_hash: &ChunkHash) -> Result<Arc<ShardChunk>, Error> {
-        match self.read_with_cache(DBCol::Chunks, &self.chunks, chunk_hash.as_ref()) {
-            Ok(Some(shard_chunk)) => Ok(shard_chunk),
-            _ => Err(Error::ChunkMissing(chunk_hash.clone())),
-        }
+        ChainStoreAdapter::get_chunk(self, chunk_hash)
     }
 
     /// Get partial chunk.
     fn get_partial_chunk(&self, chunk_hash: &ChunkHash) -> Result<Arc<PartialEncodedChunk>, Error> {
-        match self.read_with_cache(DBCol::PartialChunks, &self.partial_chunks, chunk_hash.as_ref())
-        {
-            Ok(Some(shard_chunk)) => Ok(shard_chunk),
-            _ => Err(Error::ChunkMissing(chunk_hash.clone())),
-        }
+        ChainStoreAdapter::get_partial_chunk(self, chunk_hash)
     }
 
     /// Does this full block exist?
     fn block_exists(&self, h: &CryptoHash) -> Result<bool, Error> {
-        self.store.exists(DBCol::Block, h.as_ref()).map_err(|e| e.into())
+        ChainStoreAdapter::block_exists(self, h)
     }
 
     fn chunk_exists(&self, h: &ChunkHash) -> Result<bool, Error> {
-        if self.chunks.get(h.as_ref()).is_some() {
-            Ok(true)
-        } else {
-            self.store.exists(DBCol::Chunks, h.as_ref()).map_err(|e| e.into())
-        }
+        ChainStoreAdapter::chunk_exists(self, h)
     }
 
     /// Get previous header.
     fn get_previous_header(&self, header: &BlockHeader) -> Result<BlockHeader, Error> {
-        self.get_block_header(header.prev_hash())
+        ChainStoreAdapter::get_previous_header(self, header)
     }
 
     /// Information from applying block.
     fn get_block_extra(&self, block_hash: &CryptoHash) -> Result<Arc<BlockExtra>, Error> {
-        option_to_not_found(
-            self.read_with_cache(DBCol::BlockExtra, &self.block_extras, block_hash.as_ref()),
-            format_args!("BLOCK EXTRA: {}", block_hash),
-        )
+        ChainStoreAdapter::get_block_extra(self, block_hash)
     }
 
     /// Information from applying chunk.
@@ -1242,67 +1068,32 @@ impl ChainStoreAccess for ChainStore {
         block_hash: &CryptoHash,
         shard_uid: &ShardUId,
     ) -> Result<Arc<ChunkExtra>, Error> {
-        option_to_not_found(
-            self.read_with_cache(
-                DBCol::ChunkExtra,
-                &self.chunk_extras,
-                &get_block_shard_uid(block_hash, shard_uid),
-            ),
-            format_args!("CHUNK EXTRA: {}:{:?}", block_hash, shard_uid),
-        )
+        ChainStoreAdapter::get_chunk_extra(self, block_hash, shard_uid)
     }
 
     /// Get block header.
     fn get_block_header(&self, h: &CryptoHash) -> Result<BlockHeader, Error> {
-        option_to_not_found(
-            self.read_with_cache(DBCol::BlockHeader, &self.headers, h.as_ref()),
-            format_args!("BLOCK HEADER: {}", h),
-        )
+        ChainStoreAdapter::get_block_header(self, h)
     }
 
     /// Returns hash of the block on the main chain for given height.
     fn get_block_hash_by_height(&self, height: BlockHeight) -> Result<CryptoHash, Error> {
-        option_to_not_found(
-            self.store.get_ser(DBCol::BlockHeight, &index_to_bytes(height)),
-            format_args!("BLOCK HEIGHT: {}", height),
-        )
-        // TODO: cache needs to be deleted when things get updated.
-        // option_to_not_found(
-        //     self.read_with_cache(
-        //         DBCol::BlockHeight,
-        //         &mut self.height,
-        //         &index_to_bytes(height),
-        //     ),
-        //     format_args!("BLOCK HEIGHT: {}", height),
-        // )
+        ChainStoreAdapter::get_block_hash_by_height(self, height)
     }
 
     fn get_next_block_hash(&self, hash: &CryptoHash) -> Result<CryptoHash, Error> {
-        option_to_not_found(
-            self.read_with_cache(DBCol::NextBlockHashes, &self.next_block_hashes, hash.as_ref()),
-            format_args!("NEXT BLOCK HASH: {}", hash),
-        )
+        ChainStoreAdapter::get_next_block_hash(self, hash)
     }
 
     fn get_epoch_light_client_block(
         &self,
         hash: &CryptoHash,
     ) -> Result<Arc<LightClientBlockView>, Error> {
-        option_to_not_found(
-            self.read_with_cache(
-                DBCol::EpochLightClientBlocks,
-                &self.epoch_light_client_blocks,
-                hash.as_ref(),
-            ),
-            format_args!("EPOCH LIGHT CLIENT BLOCK: {}", hash),
-        )
+        ChainStoreAdapter::get_epoch_light_client_block(self, hash)
     }
 
     fn get_block_refcount(&self, block_hash: &CryptoHash) -> Result<u64, Error> {
-        option_to_not_found(
-            self.read_with_cache(DBCol::BlockRefCount, &self.block_refcounts, block_hash.as_ref()),
-            format_args!("BLOCK REFCOUNT: {}", block_hash),
-        )
+        ChainStoreAdapter::get_block_refcount(self, block_hash)
     }
 
     /// Get outgoing receipts *generated* from shard `shard_id` in block `prev_hash`
@@ -1312,14 +1103,7 @@ impl ChainStoreAccess for ChainStore {
         prev_block_hash: &CryptoHash,
         shard_id: ShardId,
     ) -> Result<Arc<Vec<Receipt>>, Error> {
-        option_to_not_found(
-            self.read_with_cache(
-                DBCol::OutgoingReceipts,
-                &self.outgoing_receipts,
-                &get_block_shard_id(prev_block_hash, shard_id),
-            ),
-            format_args!("OUTGOING RECEIPT: {} {}", prev_block_hash, shard_id),
-        )
+        ChainStoreAdapter::get_outgoing_receipts(self, prev_block_hash, shard_id)
     }
 
     fn get_incoming_receipts(
@@ -1327,86 +1111,56 @@ impl ChainStoreAccess for ChainStore {
         block_hash: &CryptoHash,
         shard_id: ShardId,
     ) -> Result<Arc<Vec<ReceiptProof>>, Error> {
-        option_to_not_found(
-            self.read_with_cache(
-                DBCol::IncomingReceipts,
-                &self.incoming_receipts,
-                &get_block_shard_id(block_hash, shard_id),
-            ),
-            format_args!("INCOMING RECEIPT: {} {}", block_hash, shard_id),
-        )
+        ChainStoreAdapter::get_incoming_receipts(self, block_hash, shard_id)
     }
 
     fn get_blocks_to_catchup(&self, hash: &CryptoHash) -> Result<Vec<CryptoHash>, Error> {
-        Ok(self.store.get_ser(DBCol::BlocksToCatchup, hash.as_ref())?.unwrap_or_default())
+        ChainStoreAdapter::get_blocks_to_catchup(self, hash)
     }
 
     fn is_block_challenged(&self, hash: &CryptoHash) -> Result<bool, Error> {
-        Ok(self.store.get_ser(DBCol::ChallengedBlocks, hash.as_ref())?.unwrap_or_default())
+        ChainStoreAdapter::is_block_challenged(self, hash)
    }
 
     fn is_invalid_chunk(
         &self,
         chunk_hash: &ChunkHash,
     ) -> Result<Option<Arc<EncodedShardChunk>>, Error> {
-        self.read_with_cache(DBCol::InvalidChunks, &self.invalid_chunks, chunk_hash.as_ref())
-            .map_err(|err| err.into())
+        ChainStoreAdapter::is_invalid_chunk(self, chunk_hash)
     }
 
     fn get_transaction(
         &self,
         tx_hash: &CryptoHash,
     ) -> Result<Option<Arc<SignedTransaction>>, Error> {
-        self.read_with_cache(DBCol::Transactions, &self.transactions, tx_hash.as_ref())
-            .map_err(|e| e.into())
+        ChainStoreAdapter::get_transaction(self, tx_hash)
     }
 
     fn get_receipt(&self, receipt_id: &CryptoHash) -> Result<Option<Arc<Receipt>>, Error> {
-        self.read_with_cache(DBCol::Receipts, &self.receipts, receipt_id.as_ref())
-            .map_err(|e| e.into())
+        ChainStoreAdapter::get_receipt(self, receipt_id)
     }
 
     fn get_genesis_height(&self) -> BlockHeight {
-        self.genesis_height
+        ChainStoreAdapter::get_genesis_height(self)
     }
 
     fn get_block_merkle_tree(
         &self,
         block_hash: &CryptoHash,
     ) -> Result<Arc<PartialMerkleTree>, Error> {
-        option_to_not_found(
-            self.read_with_cache(
-                DBCol::BlockMerkleTree,
-                &self.block_merkle_tree,
-                block_hash.as_ref(),
-            ),
-            format_args!("BLOCK MERKLE TREE: {}", block_hash),
-        )
+        ChainStoreAdapter::get_block_merkle_tree(self, block_hash)
     }
 
     fn get_block_hash_from_ordinal(&self, block_ordinal: NumBlocks) -> Result<CryptoHash, Error> {
-        option_to_not_found(
-            self.read_with_cache(
-                DBCol::BlockOrdinal,
-                &self.block_ordinal_to_hash,
-                &index_to_bytes(block_ordinal),
-            ),
-            format_args!("BLOCK ORDINAL: {}", block_ordinal),
-        )
+        ChainStoreAdapter::get_block_hash_from_ordinal(self, block_ordinal)
     }
 
     fn is_height_processed(&self, height: BlockHeight) -> Result<bool, Error> {
-        self.read_with_cache(
-            DBCol::ProcessedBlockHeights,
-            &self.processed_block_heights,
-            &index_to_bytes(height),
-        )
-        .map(|r| r.is_some())
-        .map_err(|e| e.into())
+        ChainStoreAdapter::is_height_processed(self, height)
     }
 
     fn get_current_epoch_sync_hash(&self, epoch_id: &EpochId) -> Result<Option<CryptoHash>, Error> {
-        Ok(self.store.get_ser(DBCol::StateSyncHashes, epoch_id.as_ref())?)
+        ChainStoreAdapter::get_current_epoch_sync_hash(self, epoch_id)
     }
 }
@@ -1492,8 +1246,8 @@ impl<'a> ChainStoreAccess for ChainStoreUpdate<'a> {
         &self.chain_store
     }
 
-    fn store(&self) -> &Store {
-        &self.chain_store.store
+    fn store(&self) -> Store {
+        self.chain_store.store.store()
     }
 
     /// The chain head.
@@ -1765,7 +1519,7 @@ impl<'a> ChainStoreAccess for ChainStoreUpdate<'a> {
     }
 
     fn get_genesis_height(&self) -> BlockHeight {
-        self.chain_store.genesis_height
+        self.chain_store.get_genesis_height()
     }
 
     fn get_block_merkle_tree(
@@ -1885,7 +1639,7 @@ impl<'a> ChainStoreUpdate<'a> {
 
     /// Update header head and height to hash index for this branch.
     pub fn save_header_head_if_not_challenged(&mut self, t: &Tip) -> Result<(), Error> {
-        if t.height > self.chain_store.genesis_height {
+        if t.height > self.chain_store.get_genesis_height() {
             self.update_height_if_not_challenged(t.height, t.prev_block_hash)?;
         }
         self.try_save_latest_known(t.height)?;
@@ -2308,8 +2062,8 @@ impl<'a> ChainStoreUpdate<'a> {
 
         // other information not directly related to this block
         chain_store_update.chain_store_cache_update.height_to_hashes.insert(
-            source_store.genesis_height,
-            Some(source_store.get_block_hash_by_height(source_store.genesis_height)?),
+            source_store.get_genesis_height(),
+            Some(source_store.get_block_hash_by_height(source_store.get_genesis_height())?),
         );
         Ok(chain_store_update)
     }
@@ -2557,7 +2311,7 @@ impl<'a> ChainStoreUpdate<'a> {
             wrapped_trie_changes.deletions_into(&mut deletions_store_update);
             wrapped_trie_changes.state_changes_into(&mut store_update.trie_store_update());
 
-            if self.chain_store.save_trie_changes {
+            if self.chain_store.save_trie_changes() {
                 wrapped_trie_changes.trie_changes_into(&mut store_update.trie_store_update());
             }
         }
@@ -2668,102 +2422,6 @@ impl<'a> ChainStoreUpdate<'a> {
     pub fn commit(mut self) -> Result<(), Error> {
         let store_update = self.finalize()?;
         store_update.commit()?;
-        let ChainStoreCacheUpdate {
-            blocks,
-            headers,
-            block_extras,
-            chunk_extras,
-            chunks,
-            partial_chunks,
-            block_hash_per_height,
-            height_to_hashes,
-            next_block_hashes,
-            epoch_light_client_blocks,
-            outgoing_receipts,
-            incoming_receipts,
-            invalid_chunks,
-            transactions,
-            receipts,
-            block_refcounts,
-            block_merkle_tree,
-            block_ordinal_to_hash,
-            processed_block_heights,
-
-            outcomes: _,
-            outcome_ids: _,
-        } = self.chain_store_cache_update;
-        for (hash, block) in blocks {
-            self.chain_store.blocks.put(hash.into(), block);
-        }
-        for (hash, header) in headers {
-            self.chain_store.headers.put(hash.into(), header);
-        }
-        for (hash, block_extra) in block_extras {
-            self.chain_store.block_extras.put(hash.into(), block_extra);
-        }
-        for ((block_hash, shard_uid), chunk_extra) in chunk_extras {
-            let key = get_block_shard_uid(&block_hash, &shard_uid);
-            self.chain_store.chunk_extras.put(key, chunk_extra);
-        }
-        for (hash, chunk) in chunks {
-            self.chain_store.chunks.put(hash.into(), chunk);
-        }
-        for (hash, partial_chunk) in partial_chunks {
-            self.chain_store.partial_chunks.put(hash.into(), partial_chunk);
-        }
-        for (height, epoch_id_to_hash) in block_hash_per_height {
-            self.chain_store
-                .block_hash_per_height
-                .put(index_to_bytes(height).to_vec(), Arc::new(epoch_id_to_hash));
-        }
-        for (height, block_hash) in height_to_hashes {
-            let bytes = index_to_bytes(height);
-            if let Some(hash) = block_hash {
-                self.chain_store.height.put(bytes.to_vec(), hash);
-            } else {
-                self.chain_store.height.pop(&bytes.to_vec());
-            }
-        }
-        for (block_hash, next_hash) in next_block_hashes {
-            self.chain_store.next_block_hashes.put(block_hash.into(), next_hash);
-        }
-        for (epoch_hash, light_client_block) in epoch_light_client_blocks {
-            self.chain_store.epoch_light_client_blocks.put(epoch_hash.into(), light_client_block);
-        }
-        for ((block_hash, shard_id), shard_outgoing_receipts) in outgoing_receipts {
-            let key = get_block_shard_id(&block_hash, shard_id);
-            self.chain_store.outgoing_receipts.put(key, shard_outgoing_receipts);
-        }
-        for ((block_hash, shard_id), shard_incoming_receipts) in incoming_receipts {
-            let key = get_block_shard_id(&block_hash, shard_id);
-            self.chain_store.incoming_receipts.put(key, shard_incoming_receipts);
-        }
-        for (hash, invalid_chunk) in invalid_chunks {
-            self.chain_store.invalid_chunks.put(hash.into(), invalid_chunk);
-        }
-        for (hash, transaction) in transactions {
-            self.chain_store.transactions.put(hash.into(), transaction);
-        }
-        for (receipt_id, receipt) in receipts {
-            self.chain_store.receipts.put(receipt_id.into(), receipt);
-        }
-        for (block_hash, refcount) in block_refcounts {
-            self.chain_store.block_refcounts.put(block_hash.into(), refcount);
-        }
-        for (block_hash, merkle_tree) in block_merkle_tree {
-            self.chain_store.block_merkle_tree.put(block_hash.into(), merkle_tree);
-        }
-        for (block_ordinal, block_hash) in block_ordinal_to_hash {
-            self.chain_store
-                .block_ordinal_to_hash
-                .put(index_to_bytes(block_ordinal).to_vec(), block_hash);
-        }
-        for block_height in processed_block_heights {
-            self.chain_store.processed_block_heights.put(index_to_bytes(block_height).to_vec(), ());
-        }
-        self.chain_store.head = self.head;
-        self.chain_store.tail = self.tail;
-
         Ok(())
     }
 }
@@ -2775,11 +2433,8 @@ mod tests {
     use crate::test_utils::get_chain;
     use near_primitives::errors::InvalidTxError;
-    use near_primitives::hash::hash;
     use near_primitives::test_utils::create_test_signer;
     use near_primitives::test_utils::TestBlockBuilder;
-    use near_primitives::types::EpochId;
-    use near_primitives::utils::index_to_bytes;
 
     #[test]
     fn test_tx_validity_long_fork() {
@@ -2915,42 +2570,4 @@ mod tests {
             Err(InvalidTxError::Expired)
         );
     }
-
-    #[test]
-    fn test_cache_invalidation() {
-        let mut chain = get_chain(Clock::real());
-        let genesis = chain.get_block_by_height(0).unwrap();
-        let signer = Arc::new(create_test_signer("test1"));
-        let block1 = TestBlockBuilder::new(Clock::real(), &genesis, signer.clone()).build();
-        let mut block2 = block1.clone();
-        block2.mut_header().set_epoch_id(EpochId(hash(&[1, 2, 3])));
-        block2.mut_header().resign(&*signer);
-
-        let mut store_update = chain.mut_chain_store().store_update();
-        store_update.chain_store_cache_update.height_to_hashes.insert(1, Some(hash(&[1])));
-        store_update
-            .chain_store_cache_update
-            .blocks
-            .insert(*block1.header().hash(), block1.clone());
-        store_update.commit().unwrap();
-
-        let block_hash = chain.mut_chain_store().height.get(&index_to_bytes(1).to_vec());
-        let epoch_id_to_hash =
-            chain.mut_chain_store().block_hash_per_height.get(&index_to_bytes(1).to_vec());
-
-        let mut store_update = chain.mut_chain_store().store_update();
-        store_update.chain_store_cache_update.height_to_hashes.insert(1, Some(hash(&[2])));
-        store_update
-            .chain_store_cache_update
-            .blocks
-            .insert(*block2.header().hash(), block2.clone());
-        store_update.commit().unwrap();
-
-        let block_hash1 = chain.mut_chain_store().height.get(&index_to_bytes(1).to_vec());
-        let epoch_id_to_hash1 =
-            chain.mut_chain_store().block_hash_per_height.get(&index_to_bytes(1).to_vec());
-
-        assert_ne!(block_hash, block_hash1);
-        assert_ne!(epoch_id_to_hash, epoch_id_to_hash1);
-    }
 }
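Note: `ChainStore` now wraps a `ChainStoreAdapter` and derefs to it, so the read-only accessors live in one place; the `ChainStoreAccess` impl above calls them in the explicit `ChainStoreAdapter::method(self)` form, which works because `&ChainStore` deref-coerces to `&ChainStoreAdapter` at the call site. A self-contained sketch of the pattern (simplified stand-in types):

    use std::ops::Deref;

    struct ChainStoreAdapter {
        genesis_height: u64,
    }

    impl ChainStoreAdapter {
        fn get_genesis_height(&self) -> u64 {
            self.genesis_height
        }
    }

    struct ChainStore {
        store: ChainStoreAdapter,
    }

    impl Deref for ChainStore {
        type Target = ChainStoreAdapter;

        fn deref(&self) -> &Self::Target {
            &self.store
        }
    }

    fn main() {
        let chain_store = ChainStore { store: ChainStoreAdapter { genesis_height: 0 } };
        // Auto-deref: the method is found on the adapter.
        assert_eq!(chain_store.get_genesis_height(), 0);
        // The explicit form used in the trait impl above; &ChainStore coerces
        // to &ChainStoreAdapter at the call site.
        assert_eq!(ChainStoreAdapter::get_genesis_height(&chain_store), 0);
    }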
diff --git a/chain/chain/src/test_utils.rs b/chain/chain/src/test_utils.rs
index d82e9ba549d..cb2fb9daf4c 100644
--- a/chain/chain/src/test_utils.rs
+++ b/chain/chain/src/test_utils.rs
@@ -190,7 +190,7 @@ pub fn display_chain(me: &Option<AccountId>, chain: &mut Chain, tail: bool) {
         head.last_block_hash
     );
     let mut headers = vec![];
-    for (key, _) in chain_store.store().clone().iter(DBCol::BlockHeader).map(Result::unwrap) {
+    for (key, _) in chain_store.store().iter(DBCol::BlockHeader).map(Result::unwrap) {
         let header = chain_store
             .get_block_header(&CryptoHash::try_from(key.as_ref()).unwrap())
             .unwrap()
diff --git a/chain/client/src/client.rs b/chain/client/src/client.rs
index e93b70e79cc..1963affd0e1 100644
--- a/chain/client/src/client.rs
+++ b/chain/client/src/client.rs
@@ -291,7 +291,7 @@ impl Client {
             chain.genesis().clone(),
             async_computation_spawner.clone(),
             config.epoch_sync.clone(),
-            chain.chain_store.store(),
+            &chain.chain_store.store(),
         );
         let header_sync = HeaderSync::new(
             clock.clone(),
@@ -339,10 +339,8 @@ impl Client {
             config.max_block_wait_delay,
             doomslug_threshold_mode,
         );
-        let chunk_endorsement_tracker = ChunkEndorsementTracker::new(
-            epoch_manager.clone(),
-            chain.chain_store().store().clone(),
-        );
+        let chunk_endorsement_tracker =
+            ChunkEndorsementTracker::new(epoch_manager.clone(), chain.chain_store().store());
         let chunk_validator = ChunkValidator::new(
             epoch_manager.clone(),
             network_adapter.clone().into_sender(),
diff --git a/chain/client/src/client_actor.rs b/chain/client/src/client_actor.rs
index b90cf04e5e3..75c1904dd97 100644
--- a/chain/client/src/client_actor.rs
+++ b/chain/client/src/client_actor.rs
@@ -492,7 +492,7 @@ impl Handler for ClientActorInner {
             self.client.epoch_manager.clone(),
             self.client.shard_tracker.clone(),
             self.client.runtime_adapter.clone(),
-            self.client.chain.chain_store().store().clone(),
+            self.client.chain.chain_store().store(),
             self.adv.is_archival(),
         );
         store_validator.set_timeout(timeout);
diff --git a/chain/client/src/sync/epoch.rs b/chain/client/src/sync/epoch.rs
index f66670f7b1b..cb6e5a41f32 100644
--- a/chain/client/src/sync/epoch.rs
+++ b/chain/client/src/sync/epoch.rs
@@ -990,7 +990,7 @@ impl Handler for ClientActorInner {
             // Temporary killswitch for the rare case there were issues with this network request.
             return;
         }
-        let store = self.client.chain.chain_store.store().clone();
+        let store = self.client.chain.chain_store.store();
         let network_adapter = self.client.network_adapter.clone();
         let requester_peer_id = msg.from_peer;
         let cache = self.client.epoch_sync.last_epoch_sync_response_cache.clone();
diff --git a/core/store/Cargo.toml b/core/store/Cargo.toml
index 971ff0fe733..369360eeab2 100644
--- a/core/store/Cargo.toml
+++ b/core/store/Cargo.toml
@@ -43,6 +43,7 @@ tracing.workspace = true
 near-time.workspace = true
 
 near-chain-configs = { workspace = true, features = ["metrics"] }
+near-chain-primitives.workspace = true
 near-crypto.workspace = true
 near-fmt.workspace = true
 near-o11y.workspace = true
diff --git a/core/store/src/adapter/chain_store.rs b/core/store/src/adapter/chain_store.rs
new file mode 100644
index 00000000000..a5c25f40bc4
--- /dev/null
+++ b/core/store/src/adapter/chain_store.rs
@@ -0,0 +1,409 @@
+use std::collections::{HashMap, HashSet};
+use std::io;
+use std::sync::Arc;
+
+use near_chain_primitives::Error;
+use near_primitives::block::{Block, BlockHeader, Tip};
+use near_primitives::hash::CryptoHash;
+use near_primitives::merkle::PartialMerkleTree;
+use near_primitives::receipt::Receipt;
+use near_primitives::shard_layout::{get_block_shard_uid, ShardUId};
+use near_primitives::sharding::{
+    ChunkHash, EncodedShardChunk, PartialEncodedChunk, ReceiptProof, ShardChunk,
+};
+use near_primitives::state_sync::{ShardStateSyncResponseHeader, StateHeaderKey};
+use near_primitives::transaction::{ExecutionOutcomeWithProof, SignedTransaction};
+use near_primitives::types::chunk_extra::ChunkExtra;
+use near_primitives::types::{BlockExtra, BlockHeight, EpochId, NumBlocks, ShardId};
+use near_primitives::utils::{get_block_shard_id, get_outcome_id_block_hash, index_to_bytes};
+use near_primitives::views::LightClientBlockView;
+
+use crate::{
+    DBCol, Store, CHUNK_TAIL_KEY, FINAL_HEAD_KEY, FORK_TAIL_KEY, HEADER_HEAD_KEY, HEAD_KEY,
+    LARGEST_TARGET_HEIGHT_KEY, TAIL_KEY,
+};
+
+use super::StoreAdapter;
+
+/// TODO: having genesis_height and save_trie_changes here doesn't make sense
+#[derive(Clone)]
+pub struct ChainStoreAdapter {
+    store: Store,
+    /// Genesis block height.
+    genesis_height: BlockHeight,
+    /// save_trie_changes should be set to true iff
+    /// - archive is false - non-archival nodes need trie changes to perform garbage collection
+    /// - archive is true, cold_store is configured and migration to split_storage is finished - node
+    /// working in split storage mode needs trie changes in order to do garbage collection on hot.
+    save_trie_changes: bool,
+}
+
+impl StoreAdapter for ChainStoreAdapter {
+    fn store_ref(&self) -> &Store {
+        &self.store
+    }
+}
+
+impl ChainStoreAdapter {
+    pub fn new(store: Store, genesis_height: BlockHeight, save_trie_changes: bool) -> Self {
+        Self { store, genesis_height, save_trie_changes }
+    }
+
+    /// The chain head.
+    pub fn head(&self) -> Result<Tip, Error> {
+        option_to_not_found(self.store.get_ser(DBCol::BlockMisc, HEAD_KEY), "HEAD")
+    }
+
+    /// The chain Blocks Tail height.
+    pub fn tail(&self) -> Result<BlockHeight, Error> {
+        self.store
+            .get_ser(DBCol::BlockMisc, TAIL_KEY)
+            .map(|option| option.unwrap_or(self.genesis_height))
+            .map_err(|e| e.into())
+    }
+
+    /// The chain Chunks Tail height.
+    pub fn chunk_tail(&self) -> Result<BlockHeight, Error> {
+        self.store
+            .get_ser(DBCol::BlockMisc, CHUNK_TAIL_KEY)
+            .map(|option| option.unwrap_or(self.genesis_height))
+            .map_err(|e| e.into())
+    }
+
+    /// Tail height of the fork cleaning process.
+    pub fn fork_tail(&self) -> Result<BlockHeight, Error> {
+        self.store
+            .get_ser(DBCol::BlockMisc, FORK_TAIL_KEY)
+            .map(|option| option.unwrap_or(self.genesis_height))
+            .map_err(|e| e.into())
+    }
+
+    /// Head of the header chain (not the same thing as head_header).
+    pub fn header_head(&self) -> Result<Tip, Error> {
+        option_to_not_found(self.store.get_ser(DBCol::BlockMisc, HEADER_HEAD_KEY), "HEADER_HEAD")
+    }
+
+    /// Header of the block at the head of the block chain (not the same thing as header_head).
+    pub fn head_header(&self) -> Result<BlockHeader, Error> {
+        let last_block_hash = self.head()?.last_block_hash;
+        option_to_not_found(
+            self.store.get_ser(DBCol::BlockHeader, last_block_hash.as_ref()),
+            format_args!("BLOCK HEADER: {}", last_block_hash),
+        )
+    }
+
+    /// The chain final head. It is guaranteed to be monotonically increasing.
+    pub fn final_head(&self) -> Result<Tip, Error> {
+        option_to_not_found(self.store.get_ser(DBCol::BlockMisc, FINAL_HEAD_KEY), "FINAL HEAD")
+    }
+
+    /// Largest approval target height sent by us
+    pub fn largest_target_height(&self) -> Result<BlockHeight, Error> {
+        match self.store.get_ser(DBCol::BlockMisc, LARGEST_TARGET_HEIGHT_KEY) {
+            Ok(Some(o)) => Ok(o),
+            Ok(None) => Ok(0),
+            Err(e) => Err(e.into()),
+        }
+    }
+
+    /// Get full block.
+    pub fn get_block(&self, block_hash: &CryptoHash) -> Result<Block, Error> {
+        option_to_not_found(
+            self.store.get_ser(DBCol::Block, block_hash.as_ref()),
+            format_args!("BLOCK: {}", block_hash),
+        )
+    }
+
+    /// Returns a number of references for Block with `block_hash`
+    pub fn get_block_refcount(&self, block_hash: &CryptoHash) -> Result<u64, Error> {
+        option_to_not_found(
+            self.store.get_ser(DBCol::BlockRefCount, block_hash.as_ref()),
+            format_args!("BLOCK REFCOUNT: {}", block_hash),
+        )
+    }
+
+    /// Does this full block exist?
+    pub fn block_exists(&self, h: &CryptoHash) -> Result<bool, Error> {
+        self.store.exists(DBCol::Block, h.as_ref()).map_err(|e| e.into())
+    }
+
+    /// Get block header.
+    pub fn get_block_header(&self, h: &CryptoHash) -> Result<BlockHeader, Error> {
+        option_to_not_found(
+            self.store.get_ser(DBCol::BlockHeader, h.as_ref()),
+            format_args!("BLOCK HEADER: {}", h),
+        )
+    }
+
+    /// Get block height.
+    pub fn get_block_height(&self, hash: &CryptoHash) -> Result<BlockHeight, Error> {
+        if hash == &CryptoHash::default() {
+            Ok(self.genesis_height)
+        } else {
+            Ok(self.get_block_header(hash)?.height())
+        }
+    }
+
+    /// Get previous header.
+    pub fn get_previous_header(&self, header: &BlockHeader) -> Result<BlockHeader, Error> {
+        self.get_block_header(header.prev_hash())
+    }
+
+    /// Returns hash of the block on the main chain for given height.
+    pub fn get_block_hash_by_height(&self, height: BlockHeight) -> Result<CryptoHash, Error> {
+        option_to_not_found(
+            self.store.get_ser(DBCol::BlockHeight, &index_to_bytes(height)),
+            format_args!("BLOCK HEIGHT: {}", height),
+        )
+    }
+
+    /// Returns a hashmap of epoch id -> set of all blocks got for current (height, epoch_id)
+    pub fn get_all_block_hashes_by_height(
+        &self,
+        height: BlockHeight,
+    ) -> Result<Arc<HashMap<EpochId, HashSet<CryptoHash>>>, Error> {
+        Ok(self.store.get_ser(DBCol::BlockPerHeight, &index_to_bytes(height))?.unwrap_or_default())
+    }
+
+    /// Returns a HashSet of Header Hashes for current Height
+    pub fn get_all_header_hashes_by_height(
+        &self,
+        height: BlockHeight,
+    ) -> Result<HashSet<CryptoHash>, Error> {
+        Ok(self
+            .store
+            .get_ser(DBCol::HeaderHashesByHeight, &index_to_bytes(height))?
+            .unwrap_or_default())
+    }
+
+    /// Returns a HashSet of Chunk Hashes for current Height
+    pub fn get_all_chunk_hashes_by_height(
+        &self,
+        height: BlockHeight,
+    ) -> Result<HashSet<ChunkHash>, Error> {
+        Ok(self
+            .store
+            .get_ser(DBCol::ChunkHashesByHeight, &index_to_bytes(height))?
+            .unwrap_or_default())
+    }
+
+    /// Returns block header from the current chain for given height if present.
+    pub fn get_block_header_by_height(&self, height: BlockHeight) -> Result<BlockHeader, Error> {
+        let hash = self.get_block_hash_by_height(height)?;
+        self.get_block_header(&hash)
+    }
+
+    pub fn get_next_block_hash(&self, hash: &CryptoHash) -> Result<CryptoHash, Error> {
+        option_to_not_found(
+            self.store.get_ser(DBCol::NextBlockHashes, hash.as_ref()),
+            format_args!("NEXT BLOCK HASH: {}", hash),
+        )
+    }
+
+    /// Information from applying block.
+    pub fn get_block_extra(&self, block_hash: &CryptoHash) -> Result<Arc<BlockExtra>, Error> {
+        option_to_not_found(
+            self.store.get_ser(DBCol::BlockExtra, block_hash.as_ref()),
+            format_args!("BLOCK EXTRA: {}", block_hash),
+        )
+    }
+
+    /// Get full chunk.
+    pub fn get_chunk(&self, chunk_hash: &ChunkHash) -> Result<Arc<ShardChunk>, Error> {
+        match self.store.get_ser(DBCol::Chunks, chunk_hash.as_ref()) {
+            Ok(Some(shard_chunk)) => Ok(shard_chunk),
+            _ => Err(Error::ChunkMissing(chunk_hash.clone())),
+        }
+    }
+
+    /// Get partial chunk.
+    pub fn get_partial_chunk(
+        &self,
+        chunk_hash: &ChunkHash,
+    ) -> Result<Arc<PartialEncodedChunk>, Error> {
+        match self.store.get_ser(DBCol::PartialChunks, chunk_hash.as_ref()) {
+            Ok(Some(shard_chunk)) => Ok(shard_chunk),
+            _ => Err(Error::ChunkMissing(chunk_hash.clone())),
+        }
+    }
+
+    /// Does this chunk exist?
+    pub fn chunk_exists(&self, h: &ChunkHash) -> Result<bool, Error> {
+        self.store.exists(DBCol::Chunks, h.as_ref()).map_err(|e| e.into())
+    }
+
+    /// Returns encoded chunk if it's invalid otherwise None.
+    pub fn is_invalid_chunk(
+        &self,
+        chunk_hash: &ChunkHash,
+    ) -> Result<Option<Arc<EncodedShardChunk>>, Error> {
+        self.store.get_ser(DBCol::InvalidChunks, chunk_hash.as_ref()).map_err(|err| err.into())
+    }
+
+    /// Information from applying chunk.
+    pub fn get_chunk_extra(
+        &self,
+        block_hash: &CryptoHash,
+        shard_uid: &ShardUId,
+    ) -> Result<Arc<ChunkExtra>, Error> {
+        option_to_not_found(
+            self.store.get_ser(DBCol::ChunkExtra, &get_block_shard_uid(block_hash, shard_uid)),
+            format_args!("CHUNK EXTRA: {}:{:?}", block_hash, shard_uid),
+        )
+    }
+
+    pub fn get_outgoing_receipts(
+        &self,
+        prev_block_hash: &CryptoHash,
+        shard_id: ShardId,
+    ) -> Result<Arc<Vec<Receipt>>, Error> {
+        option_to_not_found(
+            self.store
+                .get_ser(DBCol::OutgoingReceipts, &get_block_shard_id(prev_block_hash, shard_id)),
+            format_args!("OUTGOING RECEIPT: {} {}", prev_block_hash, shard_id),
+        )
+    }
+
+    pub fn get_incoming_receipts(
+        &self,
+        block_hash: &CryptoHash,
+        shard_id: ShardId,
+    ) -> Result<Arc<Vec<ReceiptProof>>, Error> {
+        option_to_not_found(
+            self.store.get_ser(DBCol::IncomingReceipts, &get_block_shard_id(block_hash, shard_id)),
+            format_args!("INCOMING RECEIPT: {} {}", block_hash, shard_id),
+        )
+    }
+
+    /// Returns whether the block with the given hash was challenged
+    pub fn is_block_challenged(&self, hash: &CryptoHash) -> Result<bool, Error> {
+        Ok(self.store.get_ser(DBCol::ChallengedBlocks, hash.as_ref())?.unwrap_or_default())
+    }
+
+    pub fn get_blocks_to_catchup(&self, prev_hash: &CryptoHash) -> Result<Vec<CryptoHash>, Error> {
+        Ok(self.store.get_ser(DBCol::BlocksToCatchup, prev_hash.as_ref())?.unwrap_or_default())
+    }
+
+    pub fn get_transaction(
+        &self,
+        tx_hash: &CryptoHash,
+    ) -> Result<Option<Arc<SignedTransaction>>, Error> {
+        self.store.get_ser(DBCol::Transactions, tx_hash.as_ref()).map_err(|e| e.into())
+    }
+
+    /// Fetch a receipt by id, if it is stored in the store.
+    ///
+    /// Note that not _all_ receipts are persisted. Some receipts are ephemeral,
+    /// get processed immediately after creation and don't even get to the
+    /// database.
+    pub fn get_receipt(&self, receipt_id: &CryptoHash) -> Result<Option<Arc<Receipt>>, Error> {
+        self.store.get_ser(DBCol::Receipts, receipt_id.as_ref()).map_err(|e| e.into())
+    }
+
+    pub fn get_block_merkle_tree(
+        &self,
+        block_hash: &CryptoHash,
+    ) -> Result<Arc<PartialMerkleTree>, Error> {
+        option_to_not_found(
+            self.store.get_ser(DBCol::BlockMerkleTree, block_hash.as_ref()),
+            format_args!("BLOCK MERKLE TREE: {}", block_hash),
+        )
+    }
+
+    pub fn get_block_hash_from_ordinal(
+        &self,
+        block_ordinal: NumBlocks,
+    ) -> Result<CryptoHash, Error> {
+        option_to_not_found(
+            self.store.get_ser(DBCol::BlockOrdinal, &index_to_bytes(block_ordinal)),
+            format_args!("BLOCK ORDINAL: {}", block_ordinal),
+        )
+    }
+
+    pub fn get_block_merkle_tree_from_ordinal(
+        &self,
+        block_ordinal: NumBlocks,
+    ) -> Result<Arc<PartialMerkleTree>, Error> {
+        let block_hash = self.get_block_hash_from_ordinal(block_ordinal)?;
+        self.get_block_merkle_tree(&block_hash)
+    }
+
+    pub fn get_epoch_light_client_block(
+        &self,
+        hash: &CryptoHash,
+    ) -> Result<Arc<LightClientBlockView>, Error> {
+        option_to_not_found(
+            self.store.get_ser(DBCol::EpochLightClientBlocks, hash.as_ref()),
+            format_args!("EPOCH LIGHT CLIENT BLOCK: {}", hash),
+        )
+    }
+
+    pub fn is_height_processed(&self, height: BlockHeight) -> Result<bool, Error> {
+        self.store
+            .get(DBCol::ProcessedBlockHeights, &index_to_bytes(height))
+            .map(|r| r.is_some())
+            .map_err(|e| e.into())
+    }
+
+    pub fn get_outcome_by_id_and_block_hash(
+        &self,
+        id: &CryptoHash,
+        block_hash: &CryptoHash,
+    ) -> Result<Option<ExecutionOutcomeWithProof>, Error> {
+        Ok(self.store.get_ser(
+            DBCol::TransactionResultForBlock,
+            &get_outcome_id_block_hash(id, block_hash),
+        )?)
+    }
+
+    /// Returns a vector of Outcome ids for given block and shard id
+    pub fn get_outcomes_by_block_hash_and_shard_id(
+        &self,
+        block_hash: &CryptoHash,
+        shard_id: ShardId,
+    ) -> Result<Vec<CryptoHash>, Error> {
+        Ok(self
+            .store
+            .get_ser(DBCol::OutcomeIds, &get_block_shard_id(block_hash, shard_id))?
+            .unwrap_or_default())
+    }
+
+    pub fn get_state_header(
+        &self,
+        shard_id: ShardId,
+        block_hash: CryptoHash,
+    ) -> Result<ShardStateSyncResponseHeader, Error> {
+        let key = borsh::to_vec(&StateHeaderKey(shard_id, block_hash))?;
+        match self.store.get_ser(DBCol::StateHeaders, &key) {
+            Ok(Some(header)) => Ok(header),
+            _ => Err(Error::Other("Cannot get shard_state_header".into())),
+        }
+    }
+
+    pub fn get_current_epoch_sync_hash(
+        &self,
+        epoch_id: &EpochId,
+    ) -> Result<Option<CryptoHash>, Error> {
+        Ok(self.store.get_ser(DBCol::StateSyncHashes, epoch_id.as_ref())?)
+    }
+
+    /// Get height of genesis
+    pub fn get_genesis_height(&self) -> BlockHeight {
+        self.genesis_height
+    }
+
+    pub fn save_trie_changes(&self) -> bool {
+        self.save_trie_changes
+    }
+}
+
+fn option_to_not_found<T, F>(res: io::Result<Option<T>>, field_name: F) -> Result<T, Error>
+where
+    F: std::string::ToString,
+{
+    match res {
+        Ok(Some(o)) => Ok(o),
+        Ok(None) => Err(Error::DBNotFoundErr(field_name.to_string())),
+        Err(e) => Err(e.into()),
+    }
+}
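Note: the new adapter is handed out by `StoreAdapter::chain_store(genesis_height, save_trie_changes)` (see the `adapter/mod.rs` hunk below), mirroring the existing `trie_store()`/`flat_store()`/`chunk_store()` constructors except that this one carries extra state, which the TODO above flags as a temporary wart. A toy sketch of the extension-trait shape (stand-in types, not the real `Store`):

    #[derive(Clone)]
    struct Store;

    struct ChainStoreAdapter {
        store: Store,
        genesis_height: u64,
        save_trie_changes: bool,
    }

    impl ChainStoreAdapter {
        fn new(store: Store, genesis_height: u64, save_trie_changes: bool) -> Self {
            Self { store, genesis_height, save_trie_changes }
        }
    }

    trait StoreAdapter {
        fn store_ref(&self) -> &Store;

        fn store(&self) -> Store {
            self.store_ref().clone()
        }

        // Provided method: any store handle can mint a chain-store view.
        fn chain_store(&self, genesis_height: u64, save_trie_changes: bool) -> ChainStoreAdapter {
            ChainStoreAdapter::new(self.store(), genesis_height, save_trie_changes)
        }
    }

    impl StoreAdapter for Store {
        fn store_ref(&self) -> &Store {
            self
        }
    }

    fn main() {
        let chain_store = Store.chain_store(0, true);
        assert_eq!(chain_store.genesis_height, 0);
        assert!(chain_store.save_trie_changes);
        let _ = chain_store.store;
    }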
diff --git a/core/store/src/adapter/mod.rs b/core/store/src/adapter/mod.rs
index fe975dc11c9..214fd92e594 100644
--- a/core/store/src/adapter/mod.rs
+++ b/core/store/src/adapter/mod.rs
@@ -1,9 +1,12 @@
+pub mod chain_store;
 pub mod chunk_store;
 pub mod flat_store;
 pub mod trie_store;
 
 use std::ops::{Deref, DerefMut};
 
+use near_primitives::types::BlockHeight;
+
 use crate::{Store, StoreUpdate};
 
 /// Internal enum that can store either an owned StoreUpdate to a reference to StoreUpdate.
@@ -91,16 +94,24 @@ pub trait StoreAdapter {
         self.store_ref().clone()
     }
 
-    fn trie_store(&self) -> trie_store::TrieStoreAdapter {
-        trie_store::TrieStoreAdapter::new(self.store())
+    fn chain_store(
+        &self,
+        genesis_height: BlockHeight,
+        save_trie_changes: bool,
+    ) -> chain_store::ChainStoreAdapter {
+        chain_store::ChainStoreAdapter::new(self.store(), genesis_height, save_trie_changes)
+    }
+
+    fn chunk_store(&self) -> chunk_store::ChunkStoreAdapter {
+        chunk_store::ChunkStoreAdapter::new(self.store())
     }
 
     fn flat_store(&self) -> flat_store::FlatStoreAdapter {
         flat_store::FlatStoreAdapter::new(self.store())
     }
 
-    fn chunk_store(&self) -> chunk_store::ChunkStoreAdapter {
-        chunk_store::ChunkStoreAdapter::new(self.store())
+    fn trie_store(&self) -> trie_store::TrieStoreAdapter {
+        trie_store::TrieStoreAdapter::new(self.store())
     }
 }
@@ -112,11 +123,11 @@ pub trait StoreUpdateAdapter: Sized {
     fn store_update(&mut self) -> &mut StoreUpdate;
 
-    fn trie_store_update(&mut self) -> trie_store::TrieStoreUpdateAdapter {
-        trie_store::TrieStoreUpdateAdapter::new(self.store_update())
-    }
-
     fn flat_store_update(&mut self) -> flat_store::FlatStoreUpdateAdapter {
         flat_store::FlatStoreUpdateAdapter::new(self.store_update())
     }
+
+    fn trie_store_update(&mut self) -> trie_store::TrieStoreUpdateAdapter {
+        trie_store::TrieStoreUpdateAdapter::new(self.store_update())
+    }
 }
diff --git a/integration-tests/src/test_loop/tests/congestion_control_genesis_bootstrap.rs b/integration-tests/src/test_loop/tests/congestion_control_genesis_bootstrap.rs
index 7a9f84c3e75..151ea0bb334 100644
--- a/integration-tests/src/test_loop/tests/congestion_control_genesis_bootstrap.rs
+++ b/integration-tests/src/test_loop/tests/congestion_control_genesis_bootstrap.rs
@@ -68,7 +68,7 @@ fn check_genesis_congestion_info_in_store(client: &mut Client) {
     let gc_config = client.config.gc.clone();
     client.chain.clear_data(&gc_config).unwrap();
 
-    let infos = near_store::get_genesis_congestion_infos(client.chain.chain_store().store())
+    let infos = near_store::get_genesis_congestion_infos(&client.chain.chain_store().store())
         .unwrap()
         .unwrap();
     assert_eq!(infos.len(), NUM_SHARDS);
diff --git a/integration-tests/src/test_loop/utils/trie_sanity.rs b/integration-tests/src/test_loop/utils/trie_sanity.rs
index b30352a219a..c6d3ea311a1 100644
--- a/integration-tests/src/test_loop/utils/trie_sanity.rs
+++ b/integration-tests/src/test_loop/utils/trie_sanity.rs
@@ -363,7 +363,7 @@ pub fn check_state_shard_uid_mapping_after_resharding(
     let mut tracked_mapped_children = vec![];
     let store = client.chain.chain_store.store();
     for child_shard_uid in &children_shard_uids {
-        let mapped_shard_uid = get_shard_uid_mapping(store, *child_shard_uid);
+        let mapped_shard_uid = get_shard_uid_mapping(&store, *child_shard_uid);
         if &mapped_shard_uid == child_shard_uid {
             continue;
         }
diff --git a/integration-tests/src/tests/client/process_blocks.rs b/integration-tests/src/tests/client/process_blocks.rs
index 1047b74a659..52ecf0672a0 100644
--- a/integration-tests/src/tests/client/process_blocks.rs
+++ b/integration-tests/src/tests/client/process_blocks.rs
@@ -2335,7 +2335,7 @@ fn test_validate_chunk_extra() {
     env.pause_block_processing(&mut capture, block2.hash());
 
     let mut chain_store = ChainStore::new(
-        env.clients[0].chain.chain_store().store().clone(),
+        env.clients[0].chain.chain_store().store(),
         genesis_height,
         true,
         genesis.config.transaction_validity_period,
@@ -3667,7 +3667,7 @@ fn test_long_chain_with_restart_from_snapshot() {
     }
 
     let mut env2 = TestEnv::builder(&genesis.config)
-        .stores(vec![env1.clients[0].chain.chain_store().store().clone()])
+        .stores(vec![env1.clients[0].chain.chain_store().store()])
         .nightshade_runtimes(&genesis)
         .archive(false)
         .build();
diff --git a/integration-tests/src/tests/client/state_dump.rs b/integration-tests/src/tests/client/state_dump.rs
index e3db99a5a9c..179288eab58 100644
--- a/integration-tests/src/tests/client/state_dump.rs
+++ b/integration-tests/src/tests/client/state_dump.rs
@@ -342,8 +342,8 @@ fn run_state_sync_with_dumped_parts(
     {
         let store0 = env.clients[0].chain.chain_store().store();
         let store1 = env.clients[1].chain.chain_store().store();
-        let (num_inlined_before, num_ref_before) = count_flat_state_value_kinds(store0);
-        let (num_inlined_after, num_ref_after) = count_flat_state_value_kinds(store1);
+        let (num_inlined_before, num_ref_before) = count_flat_state_value_kinds(&store0);
+        let (num_inlined_after, num_ref_after) = count_flat_state_value_kinds(&store1);
         // Nothing new created, number of flat state values should be identical.
         assert_eq!(num_inlined_before, num_inlined_after);
         assert_eq!(num_ref_before, num_ref_after);
@@ -361,8 +361,8 @@ fn run_state_sync_with_dumped_parts(
     {
         let store0 = env.clients[0].chain.chain_store().store();
         let store1 = env.clients[1].chain.chain_store().store();
-        let (num_inlined_before, _num_ref_before) = count_flat_state_value_kinds(store0);
-        let (num_inlined_after, _num_ref_after) = count_flat_state_value_kinds(store1);
+        let (num_inlined_before, _num_ref_before) = count_flat_state_value_kinds(&store0);
+        let (num_inlined_after, _num_ref_after) = count_flat_state_value_kinds(&store1);
         // Created a new entry, but inlined values should stay inlined.
         assert!(num_inlined_before >= num_inlined_after);
         assert!(num_inlined_after > 0);
diff --git a/integration-tests/src/tests/client/state_snapshot.rs b/integration-tests/src/tests/client/state_snapshot.rs
index adf7a96852d..1d3387ed00e 100644
--- a/integration-tests/src/tests/client/state_snapshot.rs
+++ b/integration-tests/src/tests/client/state_snapshot.rs
@@ -207,7 +207,7 @@ fn slow_test_make_state_snapshot() {
     let mut blocks = vec![];
 
     let store = env.clients[0].chain.chain_store().store();
-    let state_snapshot_test_env = set_up_test_env_for_state_snapshots(store);
+    let state_snapshot_test_env = set_up_test_env_for_state_snapshots(&store);
 
     for i in 1..=5 {
         let new_account_id = format!("test_account_{i}");
diff --git a/tools/replay-archive/src/cli.rs b/tools/replay-archive/src/cli.rs
index 4e738077da2..d5005771c83 100644
--- a/tools/replay-archive/src/cli.rs
+++ b/tools/replay-archive/src/cli.rs
@@ -512,7 +512,7 @@ impl ReplayController {
     /// from the information in the genesis block without applying any transactions or receipts.
     fn save_genesis_chunk_extras(&mut self, genesis_block: &Block) -> Result<()> {
         let chain_genesis = ChainGenesis::new(&self.near_config.genesis.config);
-        let state_roots = get_genesis_state_roots(self.chain_store.store())?
+        let state_roots = get_genesis_state_roots(&self.chain_store.store())?
             .ok_or_else(|| anyhow!("genesis state roots do not exist in the db".to_owned()))?;
         let mut store_update = self.chain_store.store_update();
         Chain::save_genesis_chunk_extras(
diff --git a/tools/state-viewer/src/commands.rs b/tools/state-viewer/src/commands.rs
index ae9ef8860e8..d5d169c870a 100644
--- a/tools/state-viewer/src/commands.rs
+++ b/tools/state-viewer/src/commands.rs
@@ -826,7 +826,7 @@ pub(crate) fn view_genesis(
     if view_config || compare {
         tracing::info!(target: "state_viewer", "Computing genesis from config...");
         let state_roots =
-            near_store::get_genesis_state_roots(chain_store.store()).unwrap().unwrap();
+            near_store::get_genesis_state_roots(&chain_store.store()).unwrap().unwrap();
         let (genesis_block, genesis_chunks) = Chain::make_genesis_block(
             epoch_manager.as_ref(),
             runtime_adapter.as_ref(),
diff --git a/tools/state-viewer/src/state_dump.rs b/tools/state-viewer/src/state_dump.rs
index 6c2c621fe95..ac18b13f36a 100644
--- a/tools/state-viewer/src/state_dump.rs
+++ b/tools/state-viewer/src/state_dump.rs
@@ -383,7 +383,7 @@ mod test {
         )
         .unwrap();
 
-        let store = env.clients[0].chain.chain_store().store().clone();
+        let store = env.clients[0].chain.chain_store().store();
         (store, genesis, env, near_config)
     }
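Note: the recurring call-site churn in this patch (`store()` → `&store()`, dropped `.clone()` calls, and the `let store = self.store();` binding in `latest_witnesses.rs`) all follow from `ChainStoreAccess::store` now returning `Store` by value. `Store` is assumed cheap to clone (a handle around the underlying database), but a chained call would borrow from a temporary that is dropped at the end of the statement. A toy model of why the binding is needed (stand-in types):

    use std::sync::Arc;

    #[derive(Clone)]
    struct Store(Arc<Vec<(Vec<u8>, Vec<u8>)>>);

    impl Store {
        fn get(&self, key: &[u8]) -> Option<&Vec<u8>> {
            self.0.iter().find(|(k, _)| k.as_slice() == key).map(|(_, v)| v)
        }
    }

    struct ChainStore {
        store: Store,
    }

    impl ChainStore {
        // By-value return, mirroring `fn store(&self) -> Store`.
        fn store(&self) -> Store {
            self.store.clone()
        }
    }

    fn main() {
        let chain_store =
            ChainStore { store: Store(Arc::new(vec![(b"k".to_vec(), b"v".to_vec())])) };
        // `chain_store.store().get(b"k")` would not compile: the owned `Store`
        // temporary is dropped while `get`'s result still borrows from it.
        let store = chain_store.store();
        assert_eq!(store.get(b"k"), Some(&b"v".to_vec()));
    }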