From e86870fc3d2509bf1eea9413bc1d55a4c905fd4d Mon Sep 17 00:00:00 2001 From: aldenhu Date: Mon, 2 Dec 2024 22:27:15 +0000 Subject: [PATCH] restructure shared state between StateStore and BufferedState --- storage/aptosdb/src/backup/restore_utils.rs | 5 +- .../aptosdb/src/db/include/aptosdb_reader.rs | 3 +- .../aptosdb/src/db/include/aptosdb_writer.rs | 5 +- .../aptosdb/src/state_store/buffered_state.rs | 46 ++++++---- .../aptosdb/src/state_store/current_state.rs | 28 ++++++ storage/aptosdb/src/state_store/mod.rs | 87 +++++++++++-------- .../src/state_store/persisted_state.rs | 44 ++++++++++ .../state_merkle_batch_committer.rs | 13 ++- .../state_store/state_snapshot_committer.rs | 8 +- storage/scratchpad/src/lib.rs | 2 +- .../scratchpad/src/sparse_merkle/ancestors.rs | 46 ---------- .../scratchpad/src/sparse_merkle/dropper.rs | 2 +- storage/scratchpad/src/sparse_merkle/mod.rs | 3 +- 13 files changed, 169 insertions(+), 123 deletions(-) create mode 100644 storage/aptosdb/src/state_store/current_state.rs create mode 100644 storage/aptosdb/src/state_store/persisted_state.rs delete mode 100644 storage/scratchpad/src/sparse_merkle/ancestors.rs diff --git a/storage/aptosdb/src/backup/restore_utils.rs b/storage/aptosdb/src/backup/restore_utils.rs index 851c3d1489b6b..df8a7400eff62 100644 --- a/storage/aptosdb/src/backup/restore_utils.rs +++ b/storage/aptosdb/src/backup/restore_utils.rs @@ -164,8 +164,9 @@ pub(crate) fn save_transactions( ledger_db.write_schemas(ledger_db_batch)?; - *state_store.current_state().lock() = - StateDelta::new_empty_with_version(Some(last_version)); + state_store + .current_state() + .set(StateDelta::new_empty_with_version(Some(last_version))); } Ok(()) diff --git a/storage/aptosdb/src/db/include/aptosdb_reader.rs b/storage/aptosdb/src/db/include/aptosdb_reader.rs index f4cd864c909cf..7a29fee5eaf8f 100644 --- a/storage/aptosdb/src/db/include/aptosdb_reader.rs +++ b/storage/aptosdb/src/db/include/aptosdb_reader.rs @@ -63,7 +63,7 @@ impl DbReader for AptosDB { fn get_pre_committed_version(&self) -> Result> { gauged_api("get_pre_committed_version", || { - Ok(self.state_store.current_state().lock().current_version) + Ok(self.state_store.current_state().current_version) }) } @@ -641,7 +641,6 @@ impl DbReader for AptosDB { Ok(self .state_store .current_state() - .lock() .base_version ) }) diff --git a/storage/aptosdb/src/db/include/aptosdb_writer.rs b/storage/aptosdb/src/db/include/aptosdb_writer.rs index 19212ab188798..493b6e61c7cde 100644 --- a/storage/aptosdb/src/db/include/aptosdb_writer.rs +++ b/storage/aptosdb/src/db/include/aptosdb_writer.rs @@ -238,8 +238,7 @@ impl AptosDB { ); { - let current_state_guard = self.state_store.current_state(); - let current_state = current_state_guard.lock(); + let current_state = self.state_store.current_state(); ensure!( chunk.base_state_version == current_state.base_version, "base_state_version {:?} does not equal to the base_version {:?} in buffered state with current version {:?}", @@ -555,7 +554,7 @@ impl AptosDB { version_to_commit: Version, ) -> Result> { let old_committed_ver = self.ledger_db.metadata_db().get_synced_version()?; - let pre_committed_ver = self.state_store.current_state().lock().current_version; + let pre_committed_ver = self.state_store.current_state().current_version; ensure!( old_committed_ver.is_none() || version_to_commit >= old_committed_ver.unwrap(), "Version too old to commit. Committed: {:?}; Trying to commit with LI: {}", diff --git a/storage/aptosdb/src/state_store/buffered_state.rs b/storage/aptosdb/src/state_store/buffered_state.rs index 89f1e305e6f51..dbda93e92dc63 100644 --- a/storage/aptosdb/src/state_store/buffered_state.rs +++ b/storage/aptosdb/src/state_store/buffered_state.rs @@ -5,17 +5,19 @@ use crate::{ metrics::{LATEST_CHECKPOINT_VERSION, OTHER_TIMERS_SECONDS}, - state_store::{state_snapshot_committer::StateSnapshotCommitter, CurrentState, StateDb}, + state_store::{ + persisted_state::PersistedState, state_snapshot_committer::StateSnapshotCommitter, + CurrentState, StateDb, + }, }; +use aptos_infallible::Mutex; use aptos_logger::info; use aptos_metrics_core::TimerHelper; -use aptos_scratchpad::SmtAncestors; use aptos_storage_interface::{ db_ensure as ensure, state_store::{sharded_state_updates::ShardedStateUpdates, state_delta::StateDelta}, AptosDbError, Result, }; -use aptos_types::state_store::state_value::StateValue; use std::{ sync::{ mpsc, @@ -41,7 +43,7 @@ pub struct BufferedState { /// state after the latest checkpoint. The `current` is the latest speculative state. /// n.b. this is an `Arc` shared with the StateStore so that merely querying the latest state /// does not require locking the buffered state. - state_after_checkpoint: CurrentState, + state_after_checkpoint: Arc>, state_commit_sender: SyncSender>>, target_items: usize, join_handle: Option>, @@ -58,12 +60,16 @@ impl BufferedState { state_db: &Arc, state_after_checkpoint: StateDelta, target_items: usize, - ) -> (Self, SmtAncestors, CurrentState) { + current_state: Arc>, + persisted_state: Arc>, + ) -> Self { let (state_commit_sender, state_commit_receiver) = mpsc::sync_channel(ASYNC_COMMIT_CHANNEL_BUFFER_SIZE as usize); let arc_state_db = Arc::clone(state_db); - let smt_ancestors = SmtAncestors::new(state_after_checkpoint.base.clone()); - let smt_ancestors_clone = smt_ancestors.clone(); + persisted_state + .lock() + .set(state_after_checkpoint.base.clone()); + let persisted_state_clone = persisted_state.clone(); // Create a new thread with receiver subscribing to state commit changes let join_handle = std::thread::Builder::new() .name("state-committer".to_string()) @@ -71,12 +77,12 @@ impl BufferedState { let committer = StateSnapshotCommitter::new( arc_state_db, state_commit_receiver, - smt_ancestors_clone, + persisted_state_clone, ); committer.run(); }) .expect("Failed to spawn state committer thread."); - let current_state = CurrentState::new(state_after_checkpoint.clone()); + current_state.lock().set(state_after_checkpoint.clone()); let myself = Self { state_until_checkpoint: None, state_after_checkpoint: current_state.clone(), @@ -86,7 +92,7 @@ impl BufferedState { join_handle: Some(join_handle), }; myself.report_latest_committed_version(); - (myself, smt_ancestors, current_state) + myself } /// This method checks whether a commit is needed based on the target_items value and the number of items in state_until_checkpoint. @@ -194,7 +200,7 @@ impl BufferedState { new_state_after_checkpoint.base_version == state_after_checkpoint.base_version, "Diff between base and latest checkpoints not provided.", ); - *state_after_checkpoint = new_state_after_checkpoint.clone(); + state_after_checkpoint.set(new_state_after_checkpoint.clone()); } } @@ -205,16 +211,20 @@ impl BufferedState { self.report_latest_committed_version(); Ok(()) } + + pub(crate) fn drain(&mut self) { + self.sync_commit(); + self.state_commit_sender.send(CommitMessage::Exit).unwrap(); + if let Some(handle) = self.join_handle.take() { + handle + .join() + .expect("snapshot commit thread should join peacefully."); + } + } } impl Drop for BufferedState { fn drop(&mut self) { - self.sync_commit(); - self.state_commit_sender.send(CommitMessage::Exit).unwrap(); - self.join_handle - .take() - .expect("snapshot commit thread must exist.") - .join() - .expect("snapshot commit thread should join peacefully."); + self.drain() } } diff --git a/storage/aptosdb/src/state_store/current_state.rs b/storage/aptosdb/src/state_store/current_state.rs new file mode 100644 index 0000000000000..6b251c9dc4f2f --- /dev/null +++ b/storage/aptosdb/src/state_store/current_state.rs @@ -0,0 +1,28 @@ +// Copyright (c) Aptos Foundation +// SPDX-License-Identifier: Apache-2.0 + +use aptos_storage_interface::state_store::state_delta::StateDelta; +use derive_more::{Deref, DerefMut}; + +#[derive(Clone, Debug, Deref, DerefMut)] +pub(crate) struct CurrentState { + #[deref] + #[deref_mut] + from_latest_checkpoint_to_current: StateDelta, +} + +impl CurrentState { + pub fn new_dummy() -> Self { + Self { + from_latest_checkpoint_to_current: StateDelta::new_empty(), + } + } + + pub fn set(&mut self, from_latest_checkpoint_to_current: StateDelta) { + self.from_latest_checkpoint_to_current = from_latest_checkpoint_to_current; + } + + pub fn get(&self) -> &StateDelta { + &self.from_latest_checkpoint_to_current + } +} diff --git a/storage/aptosdb/src/state_store/mod.rs b/storage/aptosdb/src/state_store/mod.rs index 3f32b78f6d482..7e44359e91f4e 100644 --- a/storage/aptosdb/src/state_store/mod.rs +++ b/storage/aptosdb/src/state_store/mod.rs @@ -21,7 +21,9 @@ use crate::{ state_kv_db::StateKvDb, state_merkle_db::StateMerkleDb, state_restore::{StateSnapshotRestore, StateSnapshotRestoreMode, StateValueWriter}, - state_store::buffered_state::BufferedState, + state_store::{ + buffered_state::BufferedState, current_state::CurrentState, persisted_state::PersistedState, + }, utils::{ iterators::PrefixedStateValueIterator, new_sharded_kv_schema_batch, @@ -49,7 +51,7 @@ use aptos_jellyfish_merkle::iterator::JellyfishMerkleIterator; use aptos_logger::info; use aptos_metrics_core::TimerHelper; use aptos_schemadb::SchemaBatch; -use aptos_scratchpad::{SmtAncestors, SparseMerkleTree}; +use aptos_scratchpad::SparseMerkleTree; use aptos_storage_interface::{ db_ensure as ensure, db_other_bail as bail, state_store::{ @@ -78,7 +80,6 @@ use aptos_types::{ write_set::WriteSet, }; use claims::{assert_ge, assert_le}; -use derive_more::Deref; use itertools::Itertools; use rayon::prelude::*; use std::{ @@ -91,6 +92,8 @@ pub(crate) mod buffered_state; mod state_merkle_batch_committer; mod state_snapshot_committer; +mod current_state; +mod persisted_state; #[cfg(test)] mod state_store_test; @@ -103,15 +106,6 @@ const MAX_WRITE_SETS_AFTER_SNAPSHOT: LeafCount = buffered_state::TARGET_SNAPSHOT pub const MAX_COMMIT_PROGRESS_DIFFERENCE: u64 = 1_000_000; -#[derive(Clone, Debug, Deref)] -pub(crate) struct CurrentState(#[deref] Arc>); - -impl CurrentState { - pub fn new(from_latest_checkpoint_to_current: StateDelta) -> Self { - Self(Arc::new(Mutex::new(from_latest_checkpoint_to_current))) - } -} - pub(crate) struct StateDb { pub ledger_db: Arc, pub state_merkle_db: Arc, @@ -130,9 +124,9 @@ pub(crate) struct StateStore { buffered_state: Mutex, /// CurrentState is shared between this and the buffered_state. /// On read, we don't need to lock the `buffered_state` to get the latest state. - current_state: Mutex, + pub(crate) current_state: Arc>, /// Tracks a persisted smt, any state older than that is guaranteed to be found in RocksDB - smt_ancestors: Mutex>, + persisted_state: Arc>, buffered_state_target_items: usize, internal_indexer_db: Option, } @@ -232,7 +226,7 @@ impl DbReader for StateDb { impl DbReader for StateStore { fn get_buffered_state_base(&self) -> Result> { - Ok(self.smt_ancestors.lock().get_youngest()) + Ok(self.persisted_state().clone()) } /// Returns the latest state snapshot strictly before `next_version` if any. @@ -337,11 +331,15 @@ impl StateStore { state_kv_pruner, skip_usage, }); - let (buffered_state, smt_ancestors, current_state) = if empty_buffered_state_for_restore { + let current_state = Arc::new(Mutex::new(CurrentState::new_dummy())); + let persisted_state = Arc::new(Mutex::new(PersistedState::new_dummy())); + let buffered_state = if empty_buffered_state_for_restore { BufferedState::new( &state_db, StateDelta::new_empty(), buffered_state_target_items, + current_state.clone(), + persisted_state.clone(), ) } else { Self::create_buffered_state_from_latest_snapshot( @@ -349,6 +347,8 @@ impl StateStore { buffered_state_target_items, hack_for_tests, /*check_max_versions_after_snapshot=*/ true, + current_state.clone(), + persisted_state.clone(), ) .expect("buffered state creation failed.") }; @@ -357,8 +357,8 @@ impl StateStore { state_db, buffered_state: Mutex::new(buffered_state), buffered_state_target_items, - current_state: Mutex::new(current_state), - smt_ancestors: Mutex::new(smt_ancestors), + current_state, + persisted_state, internal_indexer_db, } } @@ -486,9 +486,15 @@ impl StateStore { state_kv_pruner, skip_usage: false, }); - let (_, _, current_state) = Self::create_buffered_state_from_latest_snapshot( - &state_db, 0, /*hack_for_tests=*/ false, + let current_state = Arc::new(Mutex::new(CurrentState::new_dummy())); + let persisted_state = Arc::new(Mutex::new(PersistedState::new_dummy())); + let _ = Self::create_buffered_state_from_latest_snapshot( + &state_db, + 0, + /*hack_for_tests=*/ false, /*check_max_versions_after_snapshot=*/ false, + current_state.clone(), + persisted_state, )?; let base_version = current_state.lock().base_version; Ok(base_version) @@ -499,7 +505,9 @@ impl StateStore { buffered_state_target_items: usize, hack_for_tests: bool, check_max_versions_after_snapshot: bool, - ) -> Result<(BufferedState, SmtAncestors, CurrentState)> { + current_state: Arc>, + persisted_state: Arc>, + ) -> Result { let num_transactions = state_db .ledger_db .metadata_db() @@ -525,7 +533,7 @@ impl StateStore { *SPARSE_MERKLE_PLACEHOLDER_HASH }; let usage = state_db.get_state_storage_usage(latest_snapshot_version)?; - let (mut buffered_state, smt_ancestors, current_state) = BufferedState::new( + let mut buffered_state = BufferedState::new( state_db, StateDelta::new_at_checkpoint( latest_snapshot_root_hash, @@ -533,11 +541,13 @@ impl StateStore { latest_snapshot_version, ), buffered_state_target_items, + current_state.clone(), + persisted_state, ); // In some backup-restore tests we hope to open the db without consistency check. if hack_for_tests { - return Ok((buffered_state, smt_ancestors, current_state)); + return Ok(buffered_state); } // Make sure the committed transactions is ahead of the latest snapshot. @@ -563,7 +573,7 @@ impl StateStore { ); } let snapshot = state_db.get_state_snapshot_before(num_transactions)?; - let current_state_cloned = current_state.lock().clone(); + let current_state_cloned = current_state.lock().get().clone(); let speculative_state = current_state_cloned .current .freeze(¤t_state_cloned.base); @@ -620,21 +630,20 @@ impl StateStore { "StateStore initialization finished.", ); } - Ok((buffered_state, smt_ancestors, current_state)) + Ok(buffered_state) } pub fn reset(&self) { - let (buffered_state, smt_ancestors, current_state) = - Self::create_buffered_state_from_latest_snapshot( - &self.state_db, - self.buffered_state_target_items, - false, - true, - ) - .expect("buffered state creation failed."); - *self.current_state.lock() = current_state; - *self.buffered_state.lock() = buffered_state; - *self.smt_ancestors.lock() = smt_ancestors; + self.buffered_state.lock().drain(); + *self.buffered_state.lock() = Self::create_buffered_state_from_latest_snapshot( + &self.state_db, + self.buffered_state_target_items, + false, + true, + self.current_state.clone(), + self.persisted_state.clone(), + ) + .expect("buffered state creation failed."); } pub fn buffered_state(&self) -> &Mutex { @@ -645,8 +654,12 @@ impl StateStore { self.current_state.lock() } + pub fn persisted_state(&self) -> MutexGuard { + self.persisted_state.lock() + } + pub fn current_state_cloned(&self) -> StateDelta { - self.current_state().lock().clone() + self.current_state().get().clone() } /// Returns the key, value pairs for a particular state key prefix at at desired version. This diff --git a/storage/aptosdb/src/state_store/persisted_state.rs b/storage/aptosdb/src/state_store/persisted_state.rs new file mode 100644 index 0000000000000..a2655a1a796fb --- /dev/null +++ b/storage/aptosdb/src/state_store/persisted_state.rs @@ -0,0 +1,44 @@ +// Copyright (c) Aptos Foundation +// SPDX-License-Identifier: Apache-2.0 + +use crate::metrics::OTHER_TIMERS_SECONDS; +use aptos_metrics_core::TimerHelper; +use aptos_scratchpad::{SparseMerkleTree, SUBTREE_DROPPER}; +use aptos_types::state_store::state_value::StateValue; +use std::ops::Deref; + +pub struct PersistedState { + smt: SparseMerkleTree, +} + +impl PersistedState { + const MAX_PENDING_DROPS: usize = 8; + + pub fn new_dummy() -> Self { + Self { + smt: SparseMerkleTree::new_empty(), + } + } + + pub fn get(&self) -> &SparseMerkleTree { + let _timer = OTHER_TIMERS_SECONDS.timer_with(&["get_persisted_state"]); + + // The back pressure is on the getting side (which is the execution side) so that it's less + // likely for a lot of blocks locking the same old base SMT. + SUBTREE_DROPPER.wait_for_backlog_drop(Self::MAX_PENDING_DROPS); + + &self.smt + } + + pub fn set(&mut self, smt: SparseMerkleTree) { + self.smt = smt + } +} + +impl Deref for PersistedState { + type Target = SparseMerkleTree; + + fn deref(&self) -> &Self::Target { + self.get() + } +} diff --git a/storage/aptosdb/src/state_store/state_merkle_batch_committer.rs b/storage/aptosdb/src/state_store/state_merkle_batch_committer.rs index 85e5c4dcbfcf3..63c4c39d9656a 100644 --- a/storage/aptosdb/src/state_store/state_merkle_batch_committer.rs +++ b/storage/aptosdb/src/state_store/state_merkle_batch_committer.rs @@ -7,17 +7,16 @@ use crate::{ metrics::{LATEST_SNAPSHOT_VERSION, OTHER_TIMERS_SECONDS}, pruner::PrunerManager, schema::jellyfish_merkle_node::JellyfishMerkleNodeSchema, - state_store::{buffered_state::CommitMessage, StateDb}, + state_store::{buffered_state::CommitMessage, persisted_state::PersistedState, StateDb}, }; use anyhow::{anyhow, ensure, Result}; use aptos_crypto::HashValue; +use aptos_infallible::Mutex; use aptos_jellyfish_merkle::node_type::NodeKey; use aptos_logger::{info, trace}; use aptos_metrics_core::TimerHelper; use aptos_schemadb::SchemaBatch; -use aptos_scratchpad::SmtAncestors; use aptos_storage_interface::state_store::state_delta::StateDelta; -use aptos_types::state_store::state_value::StateValue; use std::sync::{mpsc::Receiver, Arc}; pub struct StateMerkleBatch { @@ -30,19 +29,19 @@ pub struct StateMerkleBatch { pub(crate) struct StateMerkleBatchCommitter { state_db: Arc, state_merkle_batch_receiver: Receiver>, - smt_ancestors: SmtAncestors, + persisted_state: Arc>, } impl StateMerkleBatchCommitter { pub fn new( state_db: Arc, state_merkle_batch_receiver: Receiver>, - smt_ancestors: SmtAncestors, + persisted_state: Arc>, ) -> Self { Self { state_db, state_merkle_batch_receiver, - smt_ancestors, + persisted_state, } } @@ -100,7 +99,7 @@ impl StateMerkleBatchCommitter { .current .log_generation("buffered_state_in_mem_base"); - self.smt_ancestors.add(state_delta.current.clone()); + self.persisted_state.lock().set(state_delta.current.clone()); }, CommitMessage::Sync(finish_sender) => finish_sender.send(()).unwrap(), CommitMessage::Exit => { diff --git a/storage/aptosdb/src/state_store/state_snapshot_committer.rs b/storage/aptosdb/src/state_store/state_snapshot_committer.rs index b45c49fa784fd..f7c14dcc9738c 100644 --- a/storage/aptosdb/src/state_store/state_snapshot_committer.rs +++ b/storage/aptosdb/src/state_store/state_snapshot_committer.rs @@ -7,20 +7,20 @@ use crate::{ metrics::OTHER_TIMERS_SECONDS, state_store::{ buffered_state::CommitMessage, + persisted_state::PersistedState, state_merkle_batch_committer::{StateMerkleBatch, StateMerkleBatchCommitter}, StateDb, }, versioned_node_cache::VersionedNodeCache, }; use aptos_experimental_runtimes::thread_manager::THREAD_MANAGER; +use aptos_infallible::Mutex; use aptos_logger::trace; -use aptos_scratchpad::SmtAncestors; use aptos_storage_interface::{ jmt_update_refs, jmt_updates, state_store::{state_delta::StateDelta, NUM_STATE_SHARDS}, Result, }; -use aptos_types::state_store::state_value::StateValue; use rayon::prelude::*; use static_assertions::const_assert; use std::{ @@ -45,7 +45,7 @@ impl StateSnapshotCommitter { pub fn new( state_db: Arc, state_snapshot_commit_receiver: Receiver>>, - smt_ancestors: SmtAncestors, + persisted_state: Arc>, ) -> Self { // Note: This is to ensure we cache nodes in memory from previous batches before they get committed to DB. const_assert!( @@ -61,7 +61,7 @@ impl StateSnapshotCommitter { let committer = StateMerkleBatchCommitter::new( arc_state_db, state_merkle_batch_commit_receiver, - smt_ancestors, + persisted_state, ); committer.run(); }) diff --git a/storage/scratchpad/src/lib.rs b/storage/scratchpad/src/lib.rs index 4f2fd28dd2725..3fd9508e01f86 100644 --- a/storage/scratchpad/src/lib.rs +++ b/storage/scratchpad/src/lib.rs @@ -9,6 +9,6 @@ mod sparse_merkle; #[cfg(any(test, feature = "bench", feature = "fuzzing"))] pub use crate::sparse_merkle::test_utils; pub use crate::sparse_merkle::{ - ancestors::SmtAncestors, utils::get_state_shard_id, FrozenSparseMerkleTree, ProofRead, + dropper::SUBTREE_DROPPER, utils::get_state_shard_id, FrozenSparseMerkleTree, ProofRead, SparseMerkleTree, StateStoreStatus, }; diff --git a/storage/scratchpad/src/sparse_merkle/ancestors.rs b/storage/scratchpad/src/sparse_merkle/ancestors.rs deleted file mode 100644 index 54c6eacb421b5..0000000000000 --- a/storage/scratchpad/src/sparse_merkle/ancestors.rs +++ /dev/null @@ -1,46 +0,0 @@ -// Copyright © Aptos Foundation -// SPDX-License-Identifier: Apache-2.0 - -#![forbid(unsafe_code)] - -use crate::{ - sparse_merkle::{dropper::SUBTREE_DROPPER, metrics::TIMER}, - SparseMerkleTree, -}; -use aptos_crypto::hash::CryptoHash; -use aptos_infallible::Mutex; -use aptos_metrics_core::TimerHelper; -use std::sync::Arc; - -/// A container to track the ancestor of the SMTs that represent a committed state (older state is -/// guaranteed to be found in persisted storage. -/// When being queried, back pressure (a slow down) is provided in order to make sure not too many -/// SMTs are being kept in memory. -#[derive(Clone, Debug)] -pub struct SmtAncestors { - youngest: Arc>>, -} - -impl SmtAncestors { - const MAX_PENDING_DROPS: usize = 8; - - pub fn new(ancestor: SparseMerkleTree) -> Self { - Self { - youngest: Arc::new(Mutex::new(ancestor)), - } - } - - pub fn get_youngest(&self) -> SparseMerkleTree { - let _timer = TIMER.timer_with(&["get_youngest_ancestor"]); - - // The back pressure is on the getting side (which is the execution side) so that it's less - // likely for a lot of blocks locking the same old base SMT. - SUBTREE_DROPPER.wait_for_backlog_drop(Self::MAX_PENDING_DROPS); - - self.youngest.lock().clone() - } - - pub fn add(&self, youngest: SparseMerkleTree) { - *self.youngest.lock() = youngest; - } -} diff --git a/storage/scratchpad/src/sparse_merkle/dropper.rs b/storage/scratchpad/src/sparse_merkle/dropper.rs index 5abdbe57be1e9..6ad426f59feb8 100644 --- a/storage/scratchpad/src/sparse_merkle/dropper.rs +++ b/storage/scratchpad/src/sparse_merkle/dropper.rs @@ -6,5 +6,5 @@ use aptos_drop_helper::async_concurrent_dropper::AsyncConcurrentDropper; use once_cell::sync::Lazy; -pub(crate) static SUBTREE_DROPPER: Lazy = +pub static SUBTREE_DROPPER: Lazy = Lazy::new(|| AsyncConcurrentDropper::new("smt_subtree", 32, 8)); diff --git a/storage/scratchpad/src/sparse_merkle/mod.rs b/storage/scratchpad/src/sparse_merkle/mod.rs index 6b9f35e145439..e600b2dda2d78 100644 --- a/storage/scratchpad/src/sparse_merkle/mod.rs +++ b/storage/scratchpad/src/sparse_merkle/mod.rs @@ -70,8 +70,7 @@ // See https://play.rust-lang.org/?version=stable&mode=debug&edition=2018&gist=795cd4f459f1d4a0005a99650726834b #![allow(clippy::while_let_loop)] -pub mod ancestors; -mod dropper; +pub mod dropper; mod metrics; mod node; #[cfg(test)]