Skip to content

Commit

Permalink
[store] Add functionality to delete trie from store (#10136)
Browse files Browse the repository at this point in the history
This PR has two main changes

1. It adds a function `delete_trie_for_shard` to remove data associated
with a trie from the store. We clear the trie cache and view cache and
remove all data associated with the shard_uid from RocksDB.
2. It changes the flat storage functions `clear_state` and
`remove_flat_storage_for_shard` to accept a `store_update` instead of
directly committing the changes to the store. This makes it possible to
combine these RocksDB changes with changes from other store_updates into
a single atomic commit.
  • Loading branch information
Shreyan Gupta authored Nov 9, 2023
1 parent 8a317a6 commit b63dd6c
Show file tree
Hide file tree
Showing 10 changed files with 117 additions and 54 deletions.
7 changes: 5 additions & 2 deletions chain/client/src/sync_jobs_actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,10 +75,13 @@ impl SyncJobsActor {
msg: &ApplyStatePartsRequest,
) -> Result<bool, near_chain_primitives::error::Error> {
let _span = tracing::debug_span!(target: "client", "clear_flat_state").entered();
Ok(msg
let mut store_update = msg.runtime_adapter.store().store_update();
let success = msg
.runtime_adapter
.get_flat_storage_manager()
.remove_flat_storage_for_shard(msg.shard_uid)?)
.remove_flat_storage_for_shard(msg.shard_uid, &mut store_update)?;
store_update.commit()?;
Ok(success)
}
}

Expand Down
9 changes: 6 additions & 3 deletions core/store/src/flat/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -218,11 +218,14 @@ impl FlatStorageManager {
/// Removes FlatStorage object from FlatStorageManager.
/// If FlatStorageManager did have that object, then removes all information about Flat State and returns Ok(true).
/// Otherwise does nothing and returns Ok(false).
pub fn remove_flat_storage_for_shard(&self, shard_uid: ShardUId) -> Result<bool, StorageError> {
pub fn remove_flat_storage_for_shard(
&self,
shard_uid: ShardUId,
store_update: &mut StoreUpdate,
) -> Result<bool, StorageError> {
let mut flat_storages = self.0.flat_storages.lock().expect(POISONED_LOCK_ERR);

if let Some(flat_store) = flat_storages.remove(&shard_uid) {
flat_store.clear_state()?;
flat_store.clear_state(store_update)?;
Ok(true)
} else {
Ok(false)
Expand Down
18 changes: 5 additions & 13 deletions core/store/src/flat/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -436,21 +436,13 @@ impl FlatStorage {
}

/// Clears all State key-value pairs from flat storage.
pub fn clear_state(&self) -> Result<(), StorageError> {
pub fn clear_state(&self, store_update: &mut StoreUpdate) -> Result<(), StorageError> {
let guard = self.0.write().expect(super::POISONED_LOCK_ERR);

let mut store_update = guard.store.store_update();
store_helper::remove_all_flat_state_values(&mut store_update, guard.shard_uid);
store_helper::remove_all_deltas(&mut store_update, guard.shard_uid);

store_helper::set_flat_storage_status(
&mut store_update,
guard.shard_uid,
FlatStorageStatus::Empty,
);
store_update.commit().map_err(|_| StorageError::StorageInternalError)?;
let shard_uid = guard.shard_uid;
store_helper::remove_all_flat_state_values(store_update, shard_uid);
store_helper::remove_all_deltas(store_update, shard_uid);
store_helper::set_flat_storage_status(store_update, shard_uid, FlatStorageStatus::Empty);
guard.update_delta_metrics();

Ok(())
}

Expand Down
4 changes: 4 additions & 0 deletions core/store/src/flat/store_helper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,10 @@ pub fn remove_all_flat_state_values(store_update: &mut StoreUpdate, shard_uid: S
remove_range_by_shard_uid(store_update, shard_uid, DBCol::FlatState);
}

pub fn remove_all_state_values(store_update: &mut StoreUpdate, shard_uid: ShardUId) {
remove_range_by_shard_uid(store_update, shard_uid, DBCol::State);
}

pub fn encode_flat_state_db_key(shard_uid: ShardUId, key: &[u8]) -> Vec<u8> {
let mut buffer = vec![];
buffer.extend_from_slice(&shard_uid.to_bytes());
Expand Down
96 changes: 69 additions & 27 deletions core/store/src/trie/shard_tries.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use super::state_snapshot::{StateSnapshot, StateSnapshotConfig};
use super::TrieRefcountSubtraction;
use crate::flat::store_helper::remove_all_state_values;
use crate::flat::{FlatStorageManager, FlatStorageStatus};
use crate::trie::config::TrieConfig;
use crate::trie::prefetching_trie_storage::PrefetchingThreadsHandle;
Expand Down Expand Up @@ -333,6 +334,16 @@ impl ShardTries {
let (_store, manager) = self.get_state_snapshot(&sync_prev_prev_hash)?;
Ok(manager.get_flat_storage_status(shard_uid))
}

/// Removes all trie state values from store for a given shard_uid
/// Useful when we are trying to delete state of parent shard after resharding
/// Note that flat storage needs to be handled separately
pub fn delete_trie_for_shard(&self, shard_uid: ShardUId, store_update: &mut StoreUpdate) {
// Clear both caches and remove state values from store
self.0.caches.write().expect(POISONED_LOCK_ERR).remove(&shard_uid);
self.0.view_caches.write().expect(POISONED_LOCK_ERR).remove(&shard_uid);
remove_all_state_values(store_update, shard_uid);
}
}

pub struct WrappedTrieChanges {
Expand Down Expand Up @@ -580,6 +591,30 @@ mod test {
use super::*;
use std::{assert_eq, str::FromStr};

fn create_trie() -> ShardTries {
let store = create_test_store();
let trie_cache_config = TrieCacheConfig {
default_max_bytes: DEFAULT_SHARD_CACHE_TOTAL_SIZE_LIMIT,
per_shard_max_bytes: Default::default(),
shard_cache_deletions_queue_capacity: 0,
};
let trie_config = TrieConfig {
shard_cache_config: trie_cache_config.clone(),
view_shard_cache_config: trie_cache_config,
enable_receipt_prefetching: false,
sweat_prefetch_receivers: Vec::new(),
sweat_prefetch_senders: Vec::new(),
};
let shard_uids = Vec::from([ShardUId::single_shard()]);
ShardTries::new(
store.clone(),
trie_config,
&shard_uids,
FlatStorageManager::new(store),
StateSnapshotConfig::default(),
)
}

#[test]
fn test_delayed_receipt_row_key() {
let trie_key1 = TrieKey::DelayedReceipt { index: 1 };
Expand Down Expand Up @@ -647,31 +682,9 @@ mod test {
//TODO(jbajic) Simplify logic for creating configuration
#[test]
fn test_insert_delete_trie_cache() {
let store = create_test_store();
let trie_cache_config = TrieCacheConfig {
default_max_bytes: DEFAULT_SHARD_CACHE_TOTAL_SIZE_LIMIT,
per_shard_max_bytes: Default::default(),
shard_cache_deletions_queue_capacity: 0,
};
let trie_config = TrieConfig {
shard_cache_config: trie_cache_config.clone(),
view_shard_cache_config: trie_cache_config,
enable_receipt_prefetching: false,
sweat_prefetch_receivers: Vec::new(),
sweat_prefetch_senders: Vec::new(),
};
let shard_uids = Vec::from([ShardUId { shard_id: 0, version: 0 }]);
let shard_uid = *shard_uids.first().unwrap();

let trie = ShardTries::new(
store.clone(),
trie_config,
&shard_uids,
FlatStorageManager::new(store),
StateSnapshotConfig::default(),
);

let trie_caches = &trie.0.caches;
let shard_uid = ShardUId::single_shard();
let tries = create_trie();
let trie_caches = &tries.0.caches;
// Assert only one cache for one shard exists
assert_eq!(trie_caches.read().unwrap().len(), 1);
// Assert the shard uid is correct
Expand All @@ -684,14 +697,14 @@ mod test {
assert!(trie_caches.read().unwrap().get(&shard_uid).unwrap().get(&key).is_none());

let insert_ops = Vec::from([(&key, Some(val.as_slice()))]);
trie.update_cache(insert_ops, shard_uid);
tries.update_cache(insert_ops, shard_uid);
assert_eq!(
trie_caches.read().unwrap().get(&shard_uid).unwrap().get(&key).unwrap().to_vec(),
val
);

let deletions_ops = Vec::from([(&key, None)]);
trie.update_cache(deletions_ops, shard_uid);
tries.update_cache(deletions_ops, shard_uid);
assert!(trie_caches.read().unwrap().get(&shard_uid).unwrap().get(&key).is_none());
}

Expand Down Expand Up @@ -740,4 +753,33 @@ mod test {
trie.update_cache(insert_ops, shard_uid);
assert!(trie_caches.read().unwrap().get(&shard_uid).unwrap().get(&key).is_none());
}

#[test]
fn test_delete_trie_for_shard() {
let shard_uid = ShardUId::single_shard();
let tries = create_trie();

let key = CryptoHash::hash_borsh("alice").as_bytes().to_vec();
let val: Vec<u8> = Vec::from([0, 1, 2, 3, 4]);

// insert some data
let trie = tries.get_trie_for_shard(shard_uid, CryptoHash::default());
let trie_changes = trie.update(vec![(key, Some(val))]).unwrap();
let mut store_update = tries.store_update();
tries.apply_insertions(&trie_changes, shard_uid, &mut store_update);
store_update.commit().unwrap();

// delete trie for shard_uid
let mut store_update = tries.store_update();
tries.delete_trie_for_shard(shard_uid, &mut store_update);
store_update.commit().unwrap();

// verify if data and caches are deleted
assert!(tries.0.caches.read().unwrap().get(&shard_uid).is_none());
assert!(tries.0.view_caches.read().unwrap().get(&shard_uid).is_none());
let store = tries.get_store();
let key_prefix = shard_uid.to_bytes();
let mut iter = store.iter_prefix(DBCol::State, &key_prefix);
assert!(iter.next().is_none());
}
}
18 changes: 15 additions & 3 deletions integration-tests/src/tests/client/flat_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,11 @@ fn test_flat_storage_creation_sanity() {
);
}

get_flat_storage_manager(&env).remove_flat_storage_for_shard(shard_uid).unwrap();
let mut store_update = store.store_update();
get_flat_storage_manager(&env)
.remove_flat_storage_for_shard(shard_uid, &mut store_update)
.unwrap();
store_update.commit().unwrap();
}

// Create new chain and runtime using the same store. It should produce next blocks normally, but now it should
Expand Down Expand Up @@ -274,7 +278,11 @@ fn test_flat_storage_creation_two_shards() {
);
}

get_flat_storage_manager(&env).remove_flat_storage_for_shard(shard_uids[0]).unwrap();
let mut store_update = store.store_update();
get_flat_storage_manager(&env)
.remove_flat_storage_for_shard(shard_uids[0], &mut store_update)
.unwrap();
store_update.commit().unwrap();
}

// Check that flat storage is not ready for shard 0 but ready for shard 1.
Expand Down Expand Up @@ -416,7 +424,11 @@ fn test_catchup_succeeds_even_if_no_new_blocks() {
env.produce_block(0, height);
}
// Remove flat storage.
get_flat_storage_manager(&env).remove_flat_storage_for_shard(shard_uid).unwrap();
let mut store_update = store.store_update();
get_flat_storage_manager(&env)
.remove_flat_storage_for_shard(shard_uid, &mut store_update)
.unwrap();
store_update.commit().unwrap();
}
let mut env = setup_env(&genesis, store.clone());
assert!(get_flat_storage_manager(&env).get_flat_storage_for_shard(shard_uid).is_none());
Expand Down
4 changes: 3 additions & 1 deletion integration-tests/src/tests/client/process_blocks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2466,10 +2466,12 @@ fn test_catchup_gas_price_change() {
let store = rt.store();

let shard_id = msg.shard_uid.shard_id as ShardId;
let mut store_update = store.store_update();
assert!(rt
.get_flat_storage_manager()
.remove_flat_storage_for_shard(msg.shard_uid)
.remove_flat_storage_for_shard(msg.shard_uid, &mut store_update)
.unwrap());
store_update.commit().unwrap();
for part_id in 0..msg.num_parts {
let key = borsh::to_vec(&StatePartKey(msg.sync_hash, shard_id, part_id)).unwrap();
let part = store.get(DBCol::StateParts, &key).unwrap().unwrap();
Expand Down
4 changes: 3 additions & 1 deletion integration-tests/src/tests/client/state_dump.rs
Original file line number Diff line number Diff line change
Expand Up @@ -294,10 +294,12 @@ fn run_state_sync_with_dumped_parts(
let runtime_client_1 = Arc::clone(&env.clients[1].runtime_adapter);
let runtime_client_0 = Arc::clone(&env.clients[0].runtime_adapter);
let client_0_store = runtime_client_0.store();
let mut store_update = runtime_client_1.store().store_update();
assert!(runtime_client_1
.get_flat_storage_manager()
.remove_flat_storage_for_shard(ShardUId::single_shard())
.remove_flat_storage_for_shard(ShardUId::single_shard(), &mut store_update)
.unwrap());
store_update.commit().unwrap();

for part_id in 0..num_parts {
let key = borsh::to_vec(&StatePartKey(sync_hash, 0, part_id)).unwrap();
Expand Down
5 changes: 3 additions & 2 deletions integration-tests/src/tests/client/sync_state_nodes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -665,11 +665,12 @@ fn test_dump_epoch_missing_chunk_in_last_block() {
let store = rt.store();

let shard_id = msg.shard_uid.shard_id as ShardId;

let mut store_update = store.store_update();
assert!(rt
.get_flat_storage_manager()
.remove_flat_storage_for_shard(msg.shard_uid)
.remove_flat_storage_for_shard(msg.shard_uid, &mut store_update)
.unwrap());
store_update.commit().unwrap();

for part_id in 0..msg.num_parts {
let key =
Expand Down
6 changes: 4 additions & 2 deletions tools/flat-storage/src/commands.rs
Original file line number Diff line number Diff line change
Expand Up @@ -217,15 +217,17 @@ impl FlatStorageCommand {
near_config: &NearConfig,
opener: StoreOpener,
) -> anyhow::Result<()> {
let (_, epoch_manager, rw_hot_runtime, rw_chain_store, _) =
let (_, epoch_manager, rw_hot_runtime, rw_chain_store, store) =
Self::get_db(&opener, home_dir, &near_config, near_store::Mode::ReadWriteExisting);
let tip = rw_chain_store.final_head()?;

// TODO: there should be a method that 'loads' the current flat storage state based on Storage.
let shard_uid = epoch_manager.shard_id_to_uid(cmd.shard_id, &tip.epoch_id)?;
let flat_storage_manager = rw_hot_runtime.get_flat_storage_manager();
flat_storage_manager.create_flat_storage_for_shard(shard_uid)?;
flat_storage_manager.remove_flat_storage_for_shard(shard_uid)?;
let mut store_update = store.store_update();
flat_storage_manager.remove_flat_storage_for_shard(shard_uid, &mut store_update)?;
store_update.commit()?;
Ok(())
}

Expand Down

0 comments on commit b63dd6c

Please sign in to comment.