[store] Introduce Chain store adapter (#12159)
For now, this PR only moves some of the functionality from ChainStore to ChainStoreAdapter: the simpler functions.

We also do away with the cache in ChainStore, which was originally disabled in #12459.

In future PRs, we will work towards a better-suited interface with helper functions for the operations that are too complex to be part of ChainStoreAdapter.

One big open issue is how ChainStoreUpdateAdapter can see a consistent view of all the updates; we will need to figure out how to make that happen!
shreyan-gupta authored Jan 16, 2025
1 parent 5ae3789 commit b42bdc9
Showing 21 changed files with 546 additions and 526 deletions.
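Before the per-file diffs, here is a minimal sketch of the adapter shape this refactor moves towards. The struct and method bodies are illustrative assumptions, not the actual nearcore definitions; the one detail grounded in the hunks below is that store() hands back an owned Store rather than a &Store:

    // Hypothetical sketch of ChainStoreAdapter; names and bodies are illustrative.
    use near_store::Store;

    #[derive(Clone)]
    pub struct ChainStoreAdapter {
        store: Store,
    }

    impl ChainStoreAdapter {
        pub fn new(store: Store) -> Self {
            Self { store }
        }

        // Returns an owned handle. Store is assumed to be cheap to clone
        // (it shares the underlying database internally), so handing out
        // owned copies avoids borrow-lifetime tangles at call sites.
        pub fn store(&self) -> Store {
            self.store.clone()
        }
    }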
1 change: 1 addition & 0 deletions Cargo.lock

(Generated file; diff not rendered by default.)

4 changes: 2 additions & 2 deletions chain/chain/src/chain.rs
@@ -569,7 +569,7 @@ impl Chain {
// of blocks_in_processing, which is set to 5 now.
let (sc, rc) = unbounded();
let resharding_manager = ReshardingManager::new(
-chain_store.store().clone(),
+chain_store.store(),
epoch_manager.clone(),
runtime_adapter.clone(),
chain_config.resharding_config,
@@ -664,7 +664,7 @@ impl Chain {
congestion_info: Option<CongestionInfo>,
) -> Result<ChunkExtra, Error> {
let shard_index = shard_layout.get_shard_index(shard_id)?;
-let state_root = *get_genesis_state_roots(self.chain_store.store())?
+let state_root = *get_genesis_state_roots(&self.chain_store.store())?
.ok_or_else(|| Error::Other("genesis state roots do not exist in the db".to_owned()))?
.get(shard_index)
.ok_or_else(|| {
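Both hunks in chain.rs reflect the same assumed signature change: store() now returns an owned Store instead of &Store, so one call site drops its .clone() and the other borrows the returned value where a &Store is expected (the same pattern recurs in state_sync.rs below). Roughly:

    // Before (assumed): fn store(&self) -> &Store
    let state_roots = get_genesis_state_roots(self.chain_store.store())?;

    // After (assumed): fn store(&self) -> Store
    // The temporary owned Store lives to the end of the statement,
    // so borrowing it inline with `&` is fine here.
    let state_roots = get_genesis_state_roots(&self.chain_store.store())?;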
17 changes: 0 additions & 17 deletions chain/chain/src/garbage_collection.rs
@@ -413,7 +413,6 @@ impl<'a> ChainStoreUpdate<'a> {
let mut store_update = self.store().store_update();
let key: &[u8] = header_hash.as_bytes();
store_update.delete(DBCol::BlockHeader, key);
-self.chain_store().headers.pop(key);
self.merge(store_update);
}
let key = index_to_bytes(height);
@@ -816,10 +815,8 @@ impl<'a> ChainStoreUpdate<'a> {
let key = &index_to_bytes(height)[..];
if epoch_to_hashes.is_empty() {
store_update.delete(DBCol::BlockPerHeight, key);
-self.chain_store().block_hash_per_height.pop(key);
} else {
store_update.set_ser(DBCol::BlockPerHeight, key, &epoch_to_hashes)?;
-self.chain_store().block_hash_per_height.put(key.to_vec(), Arc::new(epoch_to_hashes));
}
if self.is_height_processed(height)? {
self.gc_col(DBCol::ProcessedBlockHeights, key);
@@ -845,7 +842,6 @@ impl<'a> ChainStoreUpdate<'a> {
let mut store_update = self.store().store_update();
let key = get_block_shard_id(block_hash, shard_id);
store_update.delete(DBCol::OutgoingReceipts, &key);
-self.chain_store().outgoing_receipts.pop(&key);
self.merge(store_update);
}

@@ -882,7 +878,6 @@ impl<'a> ChainStoreUpdate<'a> {
}
DBCol::IncomingReceipts => {
store_update.delete(col, key);
-self.chain_store().incoming_receipts.pop(key);
}
DBCol::StateHeaders => {
store_update.delete(col, key);
@@ -893,20 +888,16 @@ impl<'a> ChainStoreUpdate<'a> {
// When that happens we should make sure that block headers is
// copied to the cold storage.
store_update.delete(col, key);
-self.chain_store().headers.pop(key);
unreachable!();
}
DBCol::Block => {
store_update.delete(col, key);
-self.chain_store().blocks.pop(key);
}
DBCol::BlockExtra => {
store_update.delete(col, key);
-self.chain_store().block_extras.pop(key);
}
DBCol::NextBlockHashes => {
store_update.delete(col, key);
-self.chain_store().next_block_hashes.pop(key);
}
DBCol::ChallengedBlocks => {
store_update.delete(col, key);
@@ -919,31 +910,24 @@ impl<'a> ChainStoreUpdate<'a> {
}
DBCol::BlockRefCount => {
store_update.delete(col, key);
-self.chain_store().block_refcounts.pop(key);
}
DBCol::Transactions => {
store_update.decrement_refcount(col, key);
-self.chain_store().transactions.pop(key);
}
DBCol::Receipts => {
store_update.decrement_refcount(col, key);
-self.chain_store().receipts.pop(key);
}
DBCol::Chunks => {
store_update.delete(col, key);
-self.chain_store().chunks.pop(key);
}
DBCol::ChunkExtra => {
store_update.delete(col, key);
-self.chain_store().chunk_extras.pop(key);
}
DBCol::PartialChunks => {
store_update.delete(col, key);
-self.chain_store().partial_chunks.pop(key);
}
DBCol::InvalidChunks => {
store_update.delete(col, key);
-self.chain_store().invalid_chunks.pop(key);
}
DBCol::ChunkHashesByHeight => {
store_update.delete(col, key);
@@ -974,7 +958,6 @@ impl<'a> ChainStoreUpdate<'a> {
}
DBCol::ProcessedBlockHeights => {
store_update.delete(col, key);
-self.chain_store().processed_block_heights.pop(key);
}
DBCol::HeaderHashesByHeight => {
store_update.delete(col, key);
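All 17 deletions in this file follow one pattern: a database delete that used to be paired with an eviction from one of ChainStore's in-memory caches. With the caches removed (they had been disabled since #12459 anyway), the store update alone is the whole operation. Schematically, for the Block column:

    // Before: delete the row, then evict the (already disabled) cache entry.
    store_update.delete(DBCol::Block, key);
    self.chain_store().blocks.pop(key);

    // After: the database delete suffices; there is no cache to keep in sync.
    store_update.delete(DBCol::Block, key);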
6 changes: 3 additions & 3 deletions chain/chain/src/state_sync.rs
@@ -37,7 +37,7 @@ fn save_epoch_new_chunks<T: ChainStoreAccess>(
header: &BlockHeader,
) -> Result<(), Error> {
let Some(mut num_new_chunks) =
-get_state_sync_new_chunks(chain_store.store(), header.prev_hash())?
+get_state_sync_new_chunks(&chain_store.store(), header.prev_hash())?
else {
// This might happen in the case of epoch sync where we save individual headers without having all
// headers that belong to the epoch.
@@ -112,7 +112,7 @@ fn remove_old_blocks<T: ChainStoreAccess>(
Err(Error::DBNotFoundErr(_)) => return Ok(()),
Err(e) => return Err(e),
};
-for block_hash in iter_state_sync_new_chunks_keys(chain_store.store()) {
+for block_hash in iter_state_sync_new_chunks_keys(&chain_store.store()) {
let block_hash = block_hash?;
let old_header = chain_store.get_block_header(&block_hash)?;
if old_header.height() < last_final_header.height() {
@@ -151,7 +151,7 @@ pub(crate) fn update_sync_hashes<T: ChainStoreAccess>(
// column for this block. This means we will no longer remember sync hashes for these old epochs, which
// should be fine as we only care to state sync to (and provide state parts for) the latest state
on_new_epoch(store_update, header)?;
-return remove_old_epochs(chain_store.store(), store_update, header, &prev_header);
+return remove_old_epochs(&chain_store.store(), store_update, header, &prev_header);
}

save_epoch_new_chunks(chain_store, store_update, header)?;
4 changes: 2 additions & 2 deletions chain/chain/src/store/latest_witnesses.rs
@@ -147,8 +147,8 @@ impl ChainStore {

// Go over witnesses with increasing indexes and remove them until the limits are satisfied.
while !info.is_within_limits() && info.lowest_index < info.next_witness_index {
-let key_to_delete = self
-    .store()
+let store = self.store();
+let key_to_delete = store
.get(DBCol::LatestWitnessesByIndex, &info.lowest_index.to_be_bytes())?
.ok_or_else(|| {
std::io::Error::new(
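The reshuffle here likely follows from the same signature change: store() now returns an owned Store, and assuming the value that get() returns borrows from the store, the handle must be bound to a local so it outlives the lookup; chaining .get() directly on the temporary self.store() would not borrow-check once the result is used later. Condensed, with the error handling from the hunk elided:

    // Bind the owned handle so it outlives the borrowed lookup result.
    let store = self.store();
    let maybe_key = store.get(DBCol::LatestWitnessesByIndex, &info.lowest_index.to_be_bytes())?;
    // `maybe_key` is an Option; the real code maps a missing entry to an io::Error.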