diff --git a/chain/chain/src/garbage_collection.rs b/chain/chain/src/garbage_collection.rs index a6712c46809..4059996a814 100644 --- a/chain/chain/src/garbage_collection.rs +++ b/chain/chain/src/garbage_collection.rs @@ -16,7 +16,6 @@ use near_store::{DBCol, KeyForStateChanges, ShardTries, ShardUId}; use crate::types::RuntimeAdapter; use crate::{metrics, Chain, ChainStore, ChainStoreAccess, ChainStoreUpdate}; -use near_primitives::sharding::ReceiptProof; #[derive(Clone)] pub enum GCMode { @@ -435,6 +434,15 @@ impl<'a> ChainStoreUpdate<'a> { self.gc_col(DBCol::Transactions, transaction.get_hash().as_bytes()); } + let partial_chunk = self.get_partial_chunk(&chunk_hash); + if let Ok(partial_chunk) = partial_chunk { + for receipts in partial_chunk.prev_outgoing_receipts() { + for receipt in &receipts.0 { + self.gc_col(DBCol::Receipts, receipt.receipt_id().as_bytes()); + } + } + } + // 2. Delete chunk_hash-indexed data let chunk_hash = chunk_hash.as_bytes(); self.gc_col(DBCol::Chunks, chunk_hash); @@ -597,7 +605,7 @@ impl<'a> ChainStoreUpdate<'a> { for shard_id in shard_layout.shard_ids() { let block_shard_id = get_block_shard_id(&block_hash, shard_id); self.gc_outgoing_receipts(&block_hash, shard_id); - self.gc_incoming_receipts(&block_hash, shard_id); + self.gc_col(DBCol::IncomingReceipts, &block_shard_id); self.gc_col(DBCol::StateTransitionData, &block_shard_id); // For incoming State Parts it's done in chain.clear_downloaded_parts() @@ -698,7 +706,7 @@ impl<'a> ChainStoreUpdate<'a> { // delete Receipts self.gc_outgoing_receipts(&block_hash, shard_id); - self.gc_incoming_receipts(&block_hash, shard_id); + self.gc_col(DBCol::IncomingReceipts, &block_shard_id); self.gc_col(DBCol::StateTransitionData, &block_shard_id); @@ -766,6 +774,15 @@ impl<'a> ChainStoreUpdate<'a> { self.gc_col(DBCol::Transactions, transaction.get_hash().as_bytes()); } + let partial_chunk = self.get_partial_chunk(&chunk_hash); + if let Ok(partial_chunk) = partial_chunk { + for receipts in 
partial_chunk.prev_outgoing_receipts() { + for receipt in &receipts.0 { + self.gc_col(DBCol::Receipts, receipt.receipt_id().as_bytes()); + } + } + } + // 2. Delete chunk_hash-indexed data let chunk_hash = chunk_hash.as_bytes(); self.gc_col(DBCol::Chunks, chunk_hash); @@ -832,27 +849,6 @@ impl<'a> ChainStoreUpdate<'a> { self.merge(store_update); } - fn gc_incoming_receipts(&mut self, block_hash: &CryptoHash, shard_id: ShardId) { - let mut store_update = self.store().store_update(); - let key = get_block_shard_id(block_hash, shard_id); - // IncomingReceipts and Receipts are equivalent but keyed differently. So as we clean up - // IncomingReceipts, also clean up Receipts. - if let Ok(incoming_receipts) = - self.store().get_ser::<Vec<ReceiptProof>>(DBCol::IncomingReceipts, &key) - { - if let Some(incoming_receipts) = incoming_receipts { - for receipts in incoming_receipts { - for receipt in receipts.0 { - self.gc_col(DBCol::Receipts, receipt.receipt_id().as_bytes()); - } - } - } - } - store_update.delete(DBCol::IncomingReceipts, &key); - self.chain_store().incoming_receipts.pop(&key); - self.merge(store_update); - } - fn gc_outcomes(&mut self, block: &Block) -> Result<(), Error> { let block_hash = block.hash(); let store_update = self.store().store_update(); @@ -885,7 +881,8 @@ impl<'a> ChainStoreUpdate<'a> { panic!("Outgoing receipts must be garbage collected by calling gc_outgoing_receipts"); } DBCol::IncomingReceipts => { - panic!("Incoming receipts must be garbage collected by calling gc_incoming_receipts"); + store_update.delete(col, key); + self.chain_store().incoming_receipts.pop(key); } DBCol::StateHeaders => { store_update.delete(col, key); diff --git a/chain/chain/src/store/mod.rs b/chain/chain/src/store/mod.rs index e7b796b38e9..0a59b46c640 100644 --- a/chain/chain/src/store/mod.rs +++ b/chain/chain/src/store/mod.rs @@ -2381,6 +2381,20 @@ impl<'a> ChainStoreUpdate<'a> { chunk_hash.as_ref(), partial_chunk, )?; + + // We'd like the Receipts column to be exactly the same 
collection of receipts as + // the partial encoded chunks. This way, if we only track a subset of shards, we + // can still have all the incoming receipts for the shards we do track. + for receipts in partial_chunk.prev_outgoing_receipts() { + for receipt in &receipts.0 { + let bytes = borsh::to_vec(&receipt).expect("Borsh cannot fail"); + store_update.increment_refcount( + DBCol::Receipts, + receipt.get_hash().as_ref(), + &bytes, + ); + } + } } } @@ -2425,16 +2439,6 @@ impl<'a> ChainStoreUpdate<'a> { &get_block_shard_id(block_hash, *shard_id), receipt, )?; - for receipts in receipt.iter() { - for receipt in &receipts.0 { - let bytes = borsh::to_vec(&receipt).expect("Borsh cannot fail"); - store_update.increment_refcount( - DBCol::Receipts, - receipt.get_hash().as_ref(), - &bytes, - ); - } - } } } diff --git a/chain/chain/src/store_validator.rs b/chain/chain/src/store_validator.rs index ba5d1446fd5..fb44b18721e 100644 --- a/chain/chain/src/store_validator.rs +++ b/chain/chain/src/store_validator.rs @@ -13,7 +13,7 @@ use near_primitives::epoch_manager::AGGREGATOR_KEY; use near_primitives::epoch_sync::EpochSyncProof; use near_primitives::hash::CryptoHash; use near_primitives::shard_layout::get_block_shard_uid_rev; -use near_primitives::sharding::{ChunkHash, ShardChunk, StateSyncInfo}; +use near_primitives::sharding::{ChunkHash, PartialEncodedChunk, ShardChunk, StateSyncInfo}; use near_primitives::state_sync::{ShardStateSyncResponseHeader, StateHeaderKey, StatePartKey}; use near_primitives::transaction::ExecutionOutcomeWithProof; use near_primitives::types::chunk_extra::ChunkExtra; @@ -160,13 +160,6 @@ impl StoreValidator { self.check(&validate::block_header_exists, &block_hash, &block, col); // Chunks for current Block exist self.check(&validate::block_chunks_exist, &block_hash, &block, col); - // IncomingReceipts for the Block are an exact mapping with the receipts in the Receipts column - self.check( - &validate::receipts_contain_block_incoming_receipts, - 
&block_hash, - &block, - col, - ); // Chunks for current Block have Height Created not higher than Block Height self.check(&validate::block_chunks_height_validity, &block_hash, &block, col); // BlockInfo for current Block exists @@ -253,6 +246,17 @@ impl StoreValidator { // Block which can be indexed by Outcome block_hash exists self.check(&validate::outcome_id_block_exists, &block_hash, &outcome_ids, col); } + DBCol::PartialChunks => { + let chunk_hash = ChunkHash::try_from_slice(key_ref)?; + let shard_chunk = PartialEncodedChunk::try_from_slice(value_ref)?; + // Receipts column contain exactly the receipts from PartialEncodedChunk. + self.check( + &validate::partial_chunk_receipts_exist_in_receipts, + &chunk_hash, + &shard_chunk, + col, + ); + } DBCol::TransactionResultForBlock => { let (outcome_id, block_hash) = get_outcome_id_block_hash_rev(key_ref)?; let outcome = <ExecutionOutcomeWithProof>::try_from_slice(value_ref)?; diff --git a/chain/chain/src/store_validator/validate.rs b/chain/chain/src/store_validator/validate.rs index 90f025b3be2..6f1a3f6fda7 100644 --- a/chain/chain/src/store_validator/validate.rs +++ b/chain/chain/src/store_validator/validate.rs @@ -6,7 +6,7 @@ use near_primitives::epoch_info::EpochInfo; use near_primitives::hash::CryptoHash; use near_primitives::receipt::Receipt; use near_primitives::shard_layout::{get_block_shard_uid, ShardUId}; -use near_primitives::sharding::{ChunkHash, ReceiptProof, ShardChunk, StateSyncInfo}; +use near_primitives::sharding::{ChunkHash, PartialEncodedChunk, ShardChunk, StateSyncInfo}; use near_primitives::state_sync::{ShardStateSyncResponseHeader, StateHeaderKey, StatePartKey}; use near_primitives::transaction::{ExecutionOutcomeWithProof, SignedTransaction}; use near_primitives::types::chunk_extra::ChunkExtra; @@ -295,6 +295,25 @@ pub(crate) fn chunk_indexed_by_height_created( Ok(()) } +pub(crate) fn partial_chunk_receipts_exist_in_receipts( + sv: &mut StoreValidator, + _chunk_hash: &ChunkHash, + partial_chunk: &PartialEncodedChunk, +) 
-> Result<(), StoreValidatorError> { + for receipt_proof in partial_chunk.prev_outgoing_receipts() { + for receipt in &receipt_proof.0 { + unwrap_or_err_db!( + sv.store.get_ser::<Receipt>(DBCol::Receipts, receipt.receipt_id().as_bytes()), + "IncomingReceipt has {:?} but it doesn't exist in Receipts column", + receipt + ); + // This is verified later when we verify the Receipts column. + *sv.inner.receipt_refcount.entry(*receipt.receipt_id()).or_insert(0) += 1; + } + } + Ok(()) +} + pub(crate) fn header_hash_indexed_by_height( sv: &mut StoreValidator, _hash: &CryptoHash, @@ -412,43 +431,6 @@ pub(crate) fn block_chunks_exist( Ok(()) } -pub(crate) fn receipts_contain_block_incoming_receipts( - sv: &mut StoreValidator, - block_hash: &CryptoHash, - block: &Block, -) -> Result<(), StoreValidatorError> { - let Ok(shard_ids) = sv.epoch_manager.shard_ids(block.header().epoch_id()) else { - err!("Error getting shard ids for epoch {:?}", block.header().epoch_id()); - }; - for shard_id in shard_ids { - let Ok(incoming_receipts) = sv.store.get_ser::<Vec<ReceiptProof>>( - DBCol::IncomingReceipts, - &get_block_shard_id(block_hash, shard_id), - ) else { - err!( - "DB error when getting incoming receipts for block {:?} shard {:?}", - block_hash, - shard_id - ); - }; - if let Some(incoming_receipts) = incoming_receipts { - for receipt_proof in incoming_receipts { - for receipt in receipt_proof.0 { - unwrap_or_err_db!( - sv.store - .get_ser::<Receipt>(DBCol::Receipts, receipt.receipt_id().as_bytes()), - "IncomingReceipt has {:?} but it doesn't exist in Receipts column", - receipt - ); - // This is verified later when we verify the Receipts column. 
- *sv.inner.receipt_refcount.entry(*receipt.receipt_id()).or_insert(0) += 1; - } - } - } - } - Ok(()) -} - pub(crate) fn block_chunks_height_validity( _sv: &mut StoreValidator, _block_hash: &CryptoHash, @@ -862,7 +844,6 @@ pub(crate) fn receipt_refcount( err!("Invalid receipt refcount, expected {:?}, found {:?}", expected, refcount) } else { sv.inner.receipt_refcount.remove(receipt_id); - tracing::error!("Receipt {:?} refcount verified", receipt_id); return Ok(()); } } diff --git a/chain/chunks/src/logic.rs b/chain/chunks/src/logic.rs index 06cbb30327b..935579b1b6b 100644 --- a/chain/chunks/src/logic.rs +++ b/chain/chunks/src/logic.rs @@ -1,3 +1,4 @@ +use near_chain::ChainStoreAccess; use near_chain::{ types::EpochManagerAdapter, validate::validate_chunk_proofs, BlockHeader, Chain, ChainStore, }; @@ -258,10 +259,17 @@ pub fn persist_chunk( shard_chunk: Option<ShardChunk>, store: &mut ChainStore, ) -> Result<(), Error> { + let need_persist_partial_chunk = store.get_partial_chunk(&partial_chunk.chunk_hash()).is_err(); + let need_persist_shard_chunk = shard_chunk.is_some() + && store.get_chunk(&shard_chunk.as_ref().unwrap().chunk_hash()).is_err(); let mut update = store.store_update(); - update.save_partial_chunk(partial_chunk); + if need_persist_partial_chunk { + update.save_partial_chunk(partial_chunk); + }; if let Some(shard_chunk) = shard_chunk { - update.save_chunk(shard_chunk); + if need_persist_shard_chunk { + update.save_chunk(shard_chunk); + } } update.commit()?; Ok(()) diff --git a/chain/client/src/client.rs b/chain/client/src/client.rs index 39857b4ef10..57bb66497f4 100644 --- a/chain/client/src/client.rs +++ b/chain/client/src/client.rs @@ -1501,7 +1501,6 @@ impl Client { // TODO(#10569) We would like a proper error handling here instead of `expect`. persist_chunk(partial_chunk, shard_chunk, self.chain.mut_chain_store()) .expect("Could not persist chunk"); - // We're marking chunk as accepted. 
self.chain.blocks_with_missing_chunks.accept_chunk(&chunk_header.chunk_hash()); // If this was the last chunk that was missing for a block, it will be processed now.