Skip to content

Commit

Permalink
Revise #12613 by using PartialEncodedChunk instead of IncomingReceipts (
Browse files Browse the repository at this point in the history
#12616)

IncomingReceipts double-counts receipts when we do catchup state sync, because
we persist the state sync boundary's IncomingReceipts a second time. So switch
to using PartialEncodedChunk to populate the DBCol::Receipts column. The two
sources are equivalent.
  • Loading branch information
robin-near authored Dec 13, 2024
1 parent d81a85c commit 859da4b
Show file tree
Hide file tree
Showing 6 changed files with 78 additions and 85 deletions.
47 changes: 22 additions & 25 deletions chain/chain/src/garbage_collection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ use near_store::{DBCol, KeyForStateChanges, ShardTries, ShardUId};

use crate::types::RuntimeAdapter;
use crate::{metrics, Chain, ChainStore, ChainStoreAccess, ChainStoreUpdate};
use near_primitives::sharding::ReceiptProof;

#[derive(Clone)]
pub enum GCMode {
Expand Down Expand Up @@ -435,6 +434,15 @@ impl<'a> ChainStoreUpdate<'a> {
self.gc_col(DBCol::Transactions, transaction.get_hash().as_bytes());
}

let partial_chunk = self.get_partial_chunk(&chunk_hash);
if let Ok(partial_chunk) = partial_chunk {
for receipts in partial_chunk.prev_outgoing_receipts() {
for receipt in &receipts.0 {
self.gc_col(DBCol::Receipts, receipt.receipt_id().as_bytes());
}
}
}

// 2. Delete chunk_hash-indexed data
let chunk_hash = chunk_hash.as_bytes();
self.gc_col(DBCol::Chunks, chunk_hash);
Expand Down Expand Up @@ -597,7 +605,7 @@ impl<'a> ChainStoreUpdate<'a> {
for shard_id in shard_layout.shard_ids() {
let block_shard_id = get_block_shard_id(&block_hash, shard_id);
self.gc_outgoing_receipts(&block_hash, shard_id);
self.gc_incoming_receipts(&block_hash, shard_id);
self.gc_col(DBCol::IncomingReceipts, &block_shard_id);
self.gc_col(DBCol::StateTransitionData, &block_shard_id);

// For incoming State Parts it's done in chain.clear_downloaded_parts()
Expand Down Expand Up @@ -698,7 +706,7 @@ impl<'a> ChainStoreUpdate<'a> {

// delete Receipts
self.gc_outgoing_receipts(&block_hash, shard_id);
self.gc_incoming_receipts(&block_hash, shard_id);
self.gc_col(DBCol::IncomingReceipts, &block_shard_id);

self.gc_col(DBCol::StateTransitionData, &block_shard_id);

Expand Down Expand Up @@ -766,6 +774,15 @@ impl<'a> ChainStoreUpdate<'a> {
self.gc_col(DBCol::Transactions, transaction.get_hash().as_bytes());
}

let partial_chunk = self.get_partial_chunk(&chunk_hash);
if let Ok(partial_chunk) = partial_chunk {
for receipts in partial_chunk.prev_outgoing_receipts() {
for receipt in &receipts.0 {
self.gc_col(DBCol::Receipts, receipt.receipt_id().as_bytes());
}
}
}

// 2. Delete chunk_hash-indexed data
let chunk_hash = chunk_hash.as_bytes();
self.gc_col(DBCol::Chunks, chunk_hash);
Expand Down Expand Up @@ -832,27 +849,6 @@ impl<'a> ChainStoreUpdate<'a> {
self.merge(store_update);
}

/// Garbage-collects the `DBCol::IncomingReceipts` entry stored for
/// (`block_hash`, `shard_id`).
///
/// Before the entry itself is deleted, every receipt it contains is passed to
/// `gc_col` for `DBCol::Receipts`, keyed by its receipt id, because the two
/// columns hold the same receipts under different keys. If the entry cannot be
/// read or does not exist, the receipt cleanup is skipped; the delete of the
/// `IncomingReceipts` key and the `incoming_receipts.pop` still run.
fn gc_incoming_receipts(&mut self, block_hash: &CryptoHash, shard_id: ShardId) {
    let mut store_update = self.store().store_update();
    let key = get_block_shard_id(block_hash, shard_id);

    // Finish the read first so nothing from the lookup is held while `gc_col`
    // takes `&mut self` below.
    let stored_proofs = self.store().get_ser::<Vec<ReceiptProof>>(DBCol::IncomingReceipts, &key);
    if let Ok(Some(proofs)) = stored_proofs {
        // Each proof carries its receipts in field 0; walk all of them in order.
        for receipt in proofs.into_iter().flat_map(|proof| proof.0) {
            self.gc_col(DBCol::Receipts, receipt.receipt_id().as_bytes());
        }
    }

    // Remove the IncomingReceipts entry itself, from the store and from the
    // chain store's `incoming_receipts` (presumably an in-memory cache).
    store_update.delete(DBCol::IncomingReceipts, &key);
    self.chain_store().incoming_receipts.pop(&key);
    self.merge(store_update);
}

fn gc_outcomes(&mut self, block: &Block) -> Result<(), Error> {
let block_hash = block.hash();
let store_update = self.store().store_update();
Expand Down Expand Up @@ -885,7 +881,8 @@ impl<'a> ChainStoreUpdate<'a> {
panic!("Outgoing receipts must be garbage collected by calling gc_outgoing_receipts");
}
DBCol::IncomingReceipts => {
panic!("Incoming receipts must be garbage collected by calling gc_incoming_receipts");
store_update.delete(col, key);
self.chain_store().incoming_receipts.pop(key);
}
DBCol::StateHeaders => {
store_update.delete(col, key);
Expand Down
24 changes: 14 additions & 10 deletions chain/chain/src/store/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2381,6 +2381,20 @@ impl<'a> ChainStoreUpdate<'a> {
chunk_hash.as_ref(),
partial_chunk,
)?;

// We'd like the Receipts column to be exactly the same collection of receipts as
// the partial encoded chunks. This way, if we only track a subset of shards, we
// can still have all the incoming receipts for the shards we do track.
for receipts in partial_chunk.prev_outgoing_receipts() {
for receipt in &receipts.0 {
let bytes = borsh::to_vec(&receipt).expect("Borsh cannot fail");
store_update.increment_refcount(
DBCol::Receipts,
receipt.get_hash().as_ref(),
&bytes,
);
}
}
}
}

Expand Down Expand Up @@ -2425,16 +2439,6 @@ impl<'a> ChainStoreUpdate<'a> {
&get_block_shard_id(block_hash, *shard_id),
receipt,
)?;
for receipts in receipt.iter() {
for receipt in &receipts.0 {
let bytes = borsh::to_vec(&receipt).expect("Borsh cannot fail");
store_update.increment_refcount(
DBCol::Receipts,
receipt.get_hash().as_ref(),
&bytes,
);
}
}
}
}

Expand Down
20 changes: 12 additions & 8 deletions chain/chain/src/store_validator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use near_primitives::epoch_manager::AGGREGATOR_KEY;
use near_primitives::epoch_sync::EpochSyncProof;
use near_primitives::hash::CryptoHash;
use near_primitives::shard_layout::get_block_shard_uid_rev;
use near_primitives::sharding::{ChunkHash, ShardChunk, StateSyncInfo};
use near_primitives::sharding::{ChunkHash, PartialEncodedChunk, ShardChunk, StateSyncInfo};
use near_primitives::state_sync::{ShardStateSyncResponseHeader, StateHeaderKey, StatePartKey};
use near_primitives::transaction::ExecutionOutcomeWithProof;
use near_primitives::types::chunk_extra::ChunkExtra;
Expand Down Expand Up @@ -160,13 +160,6 @@ impl StoreValidator {
self.check(&validate::block_header_exists, &block_hash, &block, col);
// Chunks for current Block exist
self.check(&validate::block_chunks_exist, &block_hash, &block, col);
// IncomingReceipts for the Block are an exact mapping with the receipts in the Receipts column
self.check(
&validate::receipts_contain_block_incoming_receipts,
&block_hash,
&block,
col,
);
// Chunks for current Block have Height Created not higher than Block Height
self.check(&validate::block_chunks_height_validity, &block_hash, &block, col);
// BlockInfo for current Block exists
Expand Down Expand Up @@ -253,6 +246,17 @@ impl StoreValidator {
// Block which can be indexed by Outcome block_hash exists
self.check(&validate::outcome_id_block_exists, &block_hash, &outcome_ids, col);
}
DBCol::PartialChunks => {
let chunk_hash = ChunkHash::try_from_slice(key_ref)?;
let shard_chunk = PartialEncodedChunk::try_from_slice(value_ref)?;
// The Receipts column contains exactly the receipts from the PartialEncodedChunk.
self.check(
&validate::partial_chunk_receipts_exist_in_receipts,
&chunk_hash,
&shard_chunk,
col,
);
}
DBCol::TransactionResultForBlock => {
let (outcome_id, block_hash) = get_outcome_id_block_hash_rev(key_ref)?;
let outcome = <ExecutionOutcomeWithProof>::try_from_slice(value_ref)?;
Expand Down
59 changes: 20 additions & 39 deletions chain/chain/src/store_validator/validate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use near_primitives::epoch_info::EpochInfo;
use near_primitives::hash::CryptoHash;
use near_primitives::receipt::Receipt;
use near_primitives::shard_layout::{get_block_shard_uid, ShardUId};
use near_primitives::sharding::{ChunkHash, ReceiptProof, ShardChunk, StateSyncInfo};
use near_primitives::sharding::{ChunkHash, PartialEncodedChunk, ShardChunk, StateSyncInfo};
use near_primitives::state_sync::{ShardStateSyncResponseHeader, StateHeaderKey, StatePartKey};
use near_primitives::transaction::{ExecutionOutcomeWithProof, SignedTransaction};
use near_primitives::types::chunk_extra::ChunkExtra;
Expand Down Expand Up @@ -295,6 +295,25 @@ pub(crate) fn chunk_indexed_by_height_created(
Ok(())
}

/// Validates that every receipt carried by `partial_chunk` is present in
/// `DBCol::Receipts`.
///
/// For each receipt in `partial_chunk.prev_outgoing_receipts()`, the receipt is
/// looked up in the `Receipts` column by its receipt id. On a successful lookup
/// the expected reference count for that receipt id in
/// `sv.inner.receipt_refcount` is incremented by one; those counts are checked
/// later, when the `Receipts` column itself is validated.
///
/// `_chunk_hash` is the key of the partial chunk being validated; it is not
/// used by this check.
///
/// # Errors
///
/// Propagates whatever `unwrap_or_err_db!` produces when the lookup fails
/// (presumably a validation error for a DB failure or a missing receipt;
/// confirm against the macro definition).
pub(crate) fn partial_chunk_receipts_exist_in_receipts(
    sv: &mut StoreValidator,
    _chunk_hash: &ChunkHash,
    partial_chunk: &PartialEncodedChunk,
) -> Result<(), StoreValidatorError> {
    for receipt_proof in partial_chunk.prev_outgoing_receipts() {
        for receipt in &receipt_proof.0 {
            // The message names the actual source of the receipt (the partial
            // chunk), not the IncomingReceipts column this check used to read.
            unwrap_or_err_db!(
                sv.store.get_ser::<Receipt>(DBCol::Receipts, receipt.receipt_id().as_bytes()),
                "PartialEncodedChunk has receipt {:?} but it doesn't exist in Receipts column",
                receipt
            );
            // This is verified later when we verify the Receipts column.
            *sv.inner.receipt_refcount.entry(*receipt.receipt_id()).or_insert(0) += 1;
        }
    }
    Ok(())
}

pub(crate) fn header_hash_indexed_by_height(
sv: &mut StoreValidator,
_hash: &CryptoHash,
Expand Down Expand Up @@ -412,43 +431,6 @@ pub(crate) fn block_chunks_exist(
Ok(())
}

pub(crate) fn receipts_contain_block_incoming_receipts(
sv: &mut StoreValidator,
block_hash: &CryptoHash,
block: &Block,
) -> Result<(), StoreValidatorError> {
let Ok(shard_ids) = sv.epoch_manager.shard_ids(block.header().epoch_id()) else {
err!("Error getting shard ids for epoch {:?}", block.header().epoch_id());
};
for shard_id in shard_ids {
let Ok(incoming_receipts) = sv.store.get_ser::<Vec<ReceiptProof>>(
DBCol::IncomingReceipts,
&get_block_shard_id(block_hash, shard_id),
) else {
err!(
"DB error when getting incoming receipts for block {:?} shard {:?}",
block_hash,
shard_id
);
};
if let Some(incoming_receipts) = incoming_receipts {
for receipt_proof in incoming_receipts {
for receipt in receipt_proof.0 {
unwrap_or_err_db!(
sv.store
.get_ser::<Receipt>(DBCol::Receipts, receipt.receipt_id().as_bytes()),
"IncomingReceipt has {:?} but it doesn't exist in Receipts column",
receipt
);
// This is verified later when we verify the Receipts column.
*sv.inner.receipt_refcount.entry(*receipt.receipt_id()).or_insert(0) += 1;
}
}
}
}
Ok(())
}

pub(crate) fn block_chunks_height_validity(
_sv: &mut StoreValidator,
_block_hash: &CryptoHash,
Expand Down Expand Up @@ -862,7 +844,6 @@ pub(crate) fn receipt_refcount(
err!("Invalid receipt refcount, expected {:?}, found {:?}", expected, refcount)
} else {
sv.inner.receipt_refcount.remove(receipt_id);
tracing::error!("Receipt {:?} refcount verified", receipt_id);
return Ok(());
}
}
Expand Down
12 changes: 10 additions & 2 deletions chain/chunks/src/logic.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use near_chain::ChainStoreAccess;
use near_chain::{
types::EpochManagerAdapter, validate::validate_chunk_proofs, BlockHeader, Chain, ChainStore,
};
Expand Down Expand Up @@ -258,10 +259,17 @@ pub fn persist_chunk(
shard_chunk: Option<ShardChunk>,
store: &mut ChainStore,
) -> Result<(), Error> {
let need_persist_partial_chunk = store.get_partial_chunk(&partial_chunk.chunk_hash()).is_err();
let need_persist_shard_chunk = shard_chunk.is_some()
&& store.get_chunk(&shard_chunk.as_ref().unwrap().chunk_hash()).is_err();
let mut update = store.store_update();
update.save_partial_chunk(partial_chunk);
if need_persist_partial_chunk {
update.save_partial_chunk(partial_chunk);
};
if let Some(shard_chunk) = shard_chunk {
update.save_chunk(shard_chunk);
if need_persist_shard_chunk {
update.save_chunk(shard_chunk);
}
}
update.commit()?;
Ok(())
Expand Down
1 change: 0 additions & 1 deletion chain/client/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1501,7 +1501,6 @@ impl Client {
// TODO(#10569) We would like a proper error handling here instead of `expect`.
persist_chunk(partial_chunk, shard_chunk, self.chain.mut_chain_store())
.expect("Could not persist chunk");

// We're marking chunk as accepted.
self.chain.blocks_with_missing_chunks.accept_chunk(&chunk_header.chunk_hash());
// If this was the last chunk that was missing for a block, it will be processed now.
Expand Down

0 comments on commit 859da4b

Please sign in to comment.