diff --git a/core/bin/external_node/src/main.rs b/core/bin/external_node/src/main.rs index 97ce975969ad..50de6c48f8db 100644 --- a/core/bin/external_node/src/main.rs +++ b/core/bin/external_node/src/main.rs @@ -160,10 +160,10 @@ async fn run_tree( .await .context("failed creating DB pool for Merkle tree recovery")?; - let metadata_calculator = - MetadataCalculator::new(metadata_calculator_config, None, tree_pool, recovery_pool) - .await - .context("failed initializing metadata calculator")?; + let metadata_calculator = MetadataCalculator::new(metadata_calculator_config, None, tree_pool) + .await + .context("failed initializing metadata calculator")? + .with_recovery_pool(recovery_pool); let tree_reader = Arc::new(metadata_calculator.tree_reader()); app_health.insert_component(metadata_calculator.tree_health_check()); diff --git a/core/lib/merkle_tree/examples/loadtest/main.rs b/core/lib/merkle_tree/examples/loadtest/main.rs index 7651cc59c28d..0e51a0d956a7 100644 --- a/core/lib/merkle_tree/examples/loadtest/main.rs +++ b/core/lib/merkle_tree/examples/loadtest/main.rs @@ -107,7 +107,7 @@ impl Cli { } if self.prune { - let (mut pruner, pruner_handle) = MerkleTreePruner::new(rocksdb.clone(), 0); + let (mut pruner, pruner_handle) = MerkleTreePruner::new(rocksdb.clone()); pruner.set_poll_interval(Duration::from_secs(10)); let pruner_thread = thread::spawn(|| pruner.run()); pruner_handles = Some((pruner_handle, pruner_thread)); @@ -160,6 +160,16 @@ impl Cli { let output = tree.extend(kvs.collect()); output.root_hash }; + + if let Some((pruner_handle, _)) = &pruner_handles { + if pruner_handle.set_target_retained_version(version).is_err() { + tracing::error!("Pruner unexpectedly stopped"); + let (_, pruner_thread) = pruner_handles.unwrap(); + pruner_thread.join().expect("Pruner panicked"); + return; // unreachable + } + } + let elapsed = start.elapsed(); tracing::info!("Processed block #{version} in {elapsed:?}, root hash = {root_hash:?}"); } @@ -172,8 +182,8 @@ impl Cli { tracing::info!("Verified tree consistency in {elapsed:?}"); if let Some((pruner_handle, pruner_thread)) = pruner_handles { - pruner_handle.abort(); - pruner_thread.join().unwrap(); + drop(pruner_handle); + pruner_thread.join().expect("Pruner panicked"); } } diff --git a/core/lib/merkle_tree/src/domain.rs b/core/lib/merkle_tree/src/domain.rs index e8a634057785..09587a6ce2c4 100644 --- a/core/lib/merkle_tree/src/domain.rs +++ b/core/lib/merkle_tree/src/domain.rs @@ -6,12 +6,13 @@ use zksync_prover_interface::inputs::{PrepareBasicCircuitsJob, StorageLogMetadat use zksync_types::{L1BatchNumber, StorageKey}; use crate::{ + consistency::ConsistencyError, storage::{PatchSet, Patched, RocksDBWrapper}, types::{ Key, Root, TreeEntry, TreeEntryWithProof, TreeInstruction, TreeLogEntry, ValueHash, TREE_DEPTH, }, - BlockOutput, HashTree, MerkleTree, NoVersionError, + BlockOutput, HashTree, MerkleTree, MerkleTreePruner, MerkleTreePrunerHandle, NoVersionError, }; /// Metadata for the current tree state. @@ -42,6 +43,7 @@ pub struct ZkSyncTree { tree: MerkleTree>, thread_pool: Option, mode: TreeMode, + pruning_enabled: bool, } impl ZkSyncTree { @@ -93,9 +95,26 @@ impl ZkSyncTree { tree: MerkleTree::new(Patched::new(db)), thread_pool: None, mode, + pruning_enabled: false, } } + /// Returns tree pruner and a handle to stop it. + /// + /// # Panics + /// + /// Panics if this method was already called for the tree instance; it's logically unsound to run + /// multiple pruners for the same tree concurrently. + pub fn pruner(&mut self) -> (MerkleTreePruner, MerkleTreePrunerHandle) { + assert!( + !self.pruning_enabled, + "pruner was already obtained for the tree" + ); + self.pruning_enabled = true; + let db = self.tree.db.inner().clone(); + MerkleTreePruner::new(db) + } + /// Returns a readonly handle to the tree. The handle **does not** see uncommitted changes to the tree, /// only ones flushed to RocksDB. pub fn reader(&self) -> ZkSyncTreeReader { @@ -360,6 +379,14 @@ impl ZkSyncTreeReader { L1BatchNumber(number) } + /// Returns the minimum L1 batch number retained by the tree. + #[allow(clippy::missing_panics_doc)] + pub fn min_l1_batch_number(&self) -> Option { + self.0.first_retained_version().map(|version| { + L1BatchNumber(u32::try_from(version).expect("integer overflow for L1 batch number")) + }) + } + /// Returns the number of leaves in the tree. pub fn leaf_count(&self) -> u64 { self.0.latest_root().leaf_count() @@ -379,4 +406,17 @@ impl ZkSyncTreeReader { let version = u64::from(l1_batch_number.0); self.0.entries_with_proofs(version, keys) } + + /// Verifies consistency of the tree at the specified L1 batch number. + /// + /// # Errors + /// + /// Returns the first encountered verification error, should one occur. + pub fn verify_consistency( + &self, + l1_batch_number: L1BatchNumber, + ) -> Result<(), ConsistencyError> { + let version = l1_batch_number.0.into(); + self.0.verify_consistency(version, true) + } } diff --git a/core/lib/merkle_tree/src/lib.rs b/core/lib/merkle_tree/src/lib.rs index 738c79c3718a..caa965751578 100644 --- a/core/lib/merkle_tree/src/lib.rs +++ b/core/lib/merkle_tree/src/lib.rs @@ -241,6 +241,19 @@ impl MerkleTree { } } +impl MerkleTree { + /// Returns the first retained version of the tree. + pub fn first_retained_version(&self) -> Option { + match self.db.min_stale_key_version() { + // Min stale key version is next after the first retained version since at least + // the root is updated on each version. + Some(version) => version.checked_sub(1), + // No stale keys means all past versions of the tree have been pruned + None => self.latest_version(), + } + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/core/lib/merkle_tree/src/metrics.rs b/core/lib/merkle_tree/src/metrics.rs index ef1e94f9b050..8c8fdc4aeaa4 100644 --- a/core/lib/merkle_tree/src/metrics.rs +++ b/core/lib/merkle_tree/src/metrics.rs @@ -334,7 +334,7 @@ pub struct PruningStats { } impl PruningStats { - pub fn report(self) { + pub fn report(&self) { PRUNING_METRICS .target_retained_version .set(self.target_retained_version); diff --git a/core/lib/merkle_tree/src/pruning.rs b/core/lib/merkle_tree/src/pruning.rs index 5b1911ca6005..9e99098cbfd4 100644 --- a/core/lib/merkle_tree/src/pruning.rs +++ b/core/lib/merkle_tree/src/pruning.rs @@ -1,26 +1,54 @@ //! Tree pruning logic. -use std::{fmt, sync::mpsc, time::Duration}; +use std::{ + fmt, + sync::{ + atomic::{AtomicU64, Ordering}, + mpsc, Arc, Weak, + }, + time::Duration, +}; use crate::{ metrics::{PruningStats, PRUNING_TIMINGS}, storage::{PruneDatabase, PrunePatchSet}, }; +/// Error returned by [`MerkleTreePrunerHandle::set_target_retained_version()`]. +#[derive(Debug)] +pub struct PrunerStoppedError(()); + +impl fmt::Display for PrunerStoppedError { + fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result { + formatter.write_str("Merkle tree pruner stopped") + } +} + /// Handle for a [`MerkleTreePruner`] allowing to abort its operation. /// /// The pruner is aborted once the handle is dropped. #[must_use = "Pruner is aborted once handle is dropped"] #[derive(Debug)] pub struct MerkleTreePrunerHandle { - aborted_sender: mpsc::Sender<()>, + _aborted_sender: mpsc::Sender<()>, + target_retained_version: Weak, } impl MerkleTreePrunerHandle { - /// Aborts the pruner that this handle is attached to. If the pruner has already terminated - /// (e.g., due to a panic), this is a no-op. - pub fn abort(self) { - self.aborted_sender.send(()).ok(); + /// Sets the version of the tree the pruner should attempt to prune to. Calls should provide + /// monotonically increasing versions; call with a lesser version will have no effect. + /// + /// Returns the previously set target retained version. + /// + /// # Errors + /// + /// If the pruner has stopped (e.g., due to a panic), this method will return an error. + pub fn set_target_retained_version(&self, new_version: u64) -> Result { + if let Some(version) = self.target_retained_version.upgrade() { + Ok(version.fetch_max(new_version, Ordering::Relaxed)) + } else { + Err(PrunerStoppedError(())) + } } } @@ -35,44 +63,45 @@ impl MerkleTreePrunerHandle { /// stale keys are recorded in a separate column family. A pruner takes stale keys that were produced /// by a certain range of tree versions, and removes the corresponding nodes from the tree /// (in RocksDB, this uses simple pointwise `delete_cf()` operations). The range of versions -/// depends on pruning policies; for now, it's "remove versions older than `latest_version - N`", -/// where `N` is a configurable number set when the pruner [is created](Self::new()). +/// depends on pruning policies; for now, it's passed via the pruner handle. pub struct MerkleTreePruner { db: DB, - past_versions_to_keep: u64, target_pruned_key_count: usize, poll_interval: Duration, aborted_receiver: mpsc::Receiver<()>, + target_retained_version: Arc, } impl fmt::Debug for MerkleTreePruner { fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result { formatter .debug_struct("MerkleTreePruner") - .field("past_versions_to_keep", &self.past_versions_to_keep) .field("target_pruned_key_count", &self.target_pruned_key_count) .field("poll_interval", &self.poll_interval) + .field("target_retained_version", &self.target_retained_version) .finish_non_exhaustive() } } impl MerkleTreePruner { - /// Creates a pruner with the specified database and the number of past tree versions to keep. - /// E.g., 0 means keeping only the latest version. + /// Creates a pruner with the specified database. /// /// # Return value /// - /// Returns the created pruner and a handle to it. *The pruner will be aborted when its handle - /// is dropped.* - pub fn new(db: DB, past_versions_to_keep: u64) -> (Self, MerkleTreePrunerHandle) { + /// Returns the created pruner and a handle to it. *The pruner will be aborted when its handle is dropped.* + pub fn new(db: DB) -> (Self, MerkleTreePrunerHandle) { let (aborted_sender, aborted_receiver) = mpsc::channel(); - let handle = MerkleTreePrunerHandle { aborted_sender }; + let target_retained_version = Arc::new(AtomicU64::new(0)); + let handle = MerkleTreePrunerHandle { + _aborted_sender: aborted_sender, + target_retained_version: Arc::downgrade(&target_retained_version), + }; let this = Self { db, - past_versions_to_keep, target_pruned_key_count: 500_000, poll_interval: Duration::from_secs(60), aborted_receiver, + target_retained_version, }; (this, handle) } @@ -94,18 +123,33 @@ impl MerkleTreePruner { self.poll_interval = poll_interval; } - fn target_retained_version(&self) -> Option { + /// Returns max version number that can be safely pruned, so that there is at least one version present after pruning. + #[doc(hidden)] // Used in integration tests; logically private + pub fn last_prunable_version(&self) -> Option { let manifest = self.db.manifest()?; - let latest_version = manifest.version_count.checked_sub(1)?; - latest_version.checked_sub(self.past_versions_to_keep) + manifest.version_count.checked_sub(1) } #[doc(hidden)] // Used in integration tests; logically private #[allow(clippy::range_plus_one)] // exclusive range is required by `PrunePatchSet` constructor - pub fn run_once(&mut self) -> Option { - let target_retained_version = self.target_retained_version()?; + pub fn prune_up_to(&mut self, target_retained_version: u64) -> Option { let min_stale_key_version = self.db.min_stale_key_version()?; + + // We must retain at least one tree version. + let last_prunable_version = self.last_prunable_version(); + if last_prunable_version.is_none() { + tracing::info!("Nothing to prune; skipping"); + return None; + } + let target_retained_version = last_prunable_version?.min(target_retained_version); let stale_key_new_versions = min_stale_key_version..=target_retained_version; + if stale_key_new_versions.is_empty() { + tracing::info!( + "No Merkle tree versions can be pruned; min stale key version is {min_stale_key_version}, \ + target retained version is {target_retained_version}" + ); + return None; + } tracing::info!("Collecting stale keys with new versions in {stale_key_new_versions:?}"); let load_stale_keys_latency = PRUNING_TIMINGS.load_stale_keys.start(); @@ -142,33 +186,38 @@ impl MerkleTreePruner { Some(stats) } - /// Runs this pruner indefinitely until it is aborted by dropping its handle. + fn wait_for_abort(&mut self, timeout: Duration) -> bool { + match self.aborted_receiver.recv_timeout(timeout) { + Ok(()) | Err(mpsc::RecvTimeoutError::Disconnected) => true, + Err(mpsc::RecvTimeoutError::Timeout) => { + // The pruner handle is alive and wasn't used to abort the pruner. + false + } + } + } + + /// Runs this pruner indefinitely until it is aborted. pub fn run(mut self) { tracing::info!("Started Merkle tree pruner {self:?}"); - loop { - let timeout = if let Some(stats) = self.run_once() { - let has_more_work = stats.has_more_work(); + + let mut wait_interval = Duration::ZERO; + while !self.wait_for_abort(wait_interval) { + let retained_version = self.target_retained_version.load(Ordering::Relaxed); + if let Some(stats) = self.prune_up_to(retained_version) { + tracing::debug!( + "Performed pruning for target retained version {retained_version}: {stats:?}" + ); stats.report(); - if has_more_work { - Duration::ZERO - } else { - self.poll_interval + if stats.has_more_work() { + continue; } } else { - tracing::debug!("No pruning required per specified policies; waiting"); - self.poll_interval - }; - - match self.aborted_receiver.recv_timeout(timeout) { - Ok(()) => break, // Abort was requested - Err(mpsc::RecvTimeoutError::Disconnected) => { - tracing::warn!("Pruner handle is dropped without calling `abort()`; exiting"); - break; - } - Err(mpsc::RecvTimeoutError::Timeout) => { - // The pruner handle is alive and wasn't used to abort the pruner. - } + tracing::debug!( + "Pruning was not performed; waiting {:?}", + self.poll_interval + ); } + wait_interval = self.poll_interval; } } } @@ -203,9 +252,12 @@ mod tests { #[test] fn pruner_basics() { let mut db = create_db(); - let (mut pruner, _handle) = MerkleTreePruner::new(&mut db, 0); + assert_eq!(MerkleTree::new(&mut db).first_retained_version(), Some(0)); - let stats = pruner.run_once().unwrap(); + let (mut pruner, _handle) = MerkleTreePruner::new(&mut db); + let stats = pruner + .prune_up_to(pruner.last_prunable_version().unwrap()) + .unwrap(); assert!(stats.pruned_key_count > 0); assert_eq!(stats.deleted_stale_key_versions, 1..5); assert_eq!(stats.target_retained_version, 4); @@ -216,16 +268,20 @@ mod tests { assert!(db.root_mut(version).is_none()); } assert!(db.root_mut(4).is_some()); + + assert_eq!(MerkleTree::new(&mut db).first_retained_version(), Some(4)); } #[test] fn pruner_with_intermediate_commits() { let mut db = create_db(); - let (mut pruner, _handle) = MerkleTreePruner::new(&mut db, 0); + let (mut pruner, _handle) = MerkleTreePruner::new(&mut db); pruner.set_target_pruned_key_count(1); for i in 1..5 { - let stats = pruner.run_once().unwrap(); + let stats = pruner + .prune_up_to(pruner.last_prunable_version().unwrap()) + .unwrap(); assert!(stats.pruned_key_count > 0); assert_eq!(stats.deleted_stale_key_versions, i..(i + 1)); assert_eq!(stats.target_retained_version, 4); @@ -235,11 +291,11 @@ mod tests { #[test] fn pruner_is_aborted_immediately_when_requested() { - let (mut pruner, pruner_handle) = MerkleTreePruner::new(PatchSet::default(), 0); + let (mut pruner, pruner_handle) = MerkleTreePruner::new(PatchSet::default()); pruner.set_poll_interval(Duration::from_secs(30)); let join_handle = thread::spawn(|| pruner.run()); - pruner_handle.abort(); + drop(pruner_handle); let start = Instant::now(); join_handle.join().unwrap(); assert!(start.elapsed() < Duration::from_secs(10)); @@ -260,8 +316,10 @@ mod tests { } let latest_version = tree.latest_version().unwrap(); - let (mut pruner, _handle) = MerkleTreePruner::new(&mut db, past_versions_to_keep); - let stats = pruner.run_once().unwrap(); + let (mut pruner, _handle) = MerkleTreePruner::new(&mut db); + let stats = pruner + .prune_up_to(pruner.last_prunable_version().unwrap() - past_versions_to_keep) + .unwrap(); assert!(stats.pruned_key_count > 0); let first_retained_version = latest_version.saturating_sub(past_versions_to_keep); assert_eq!(stats.target_retained_version, first_retained_version); @@ -272,6 +330,7 @@ mod tests { assert_no_stale_keys(&db, first_retained_version); let mut tree = MerkleTree::new(&mut db); + assert_eq!(tree.first_retained_version(), Some(first_retained_version)); for version in first_retained_version..=latest_version { tree.verify_consistency(version, true).unwrap(); } @@ -282,8 +341,10 @@ mod tests { } let latest_version = tree.latest_version().unwrap(); - let (mut pruner, _handle) = MerkleTreePruner::new(&mut db, past_versions_to_keep); - let stats = pruner.run_once().unwrap(); + let (mut pruner, _handle) = MerkleTreePruner::new(&mut db); + let stats = pruner + .prune_up_to(pruner.last_prunable_version().unwrap() - past_versions_to_keep) + .unwrap(); assert!(stats.pruned_key_count > 0); let first_retained_version = latest_version.saturating_sub(past_versions_to_keep); assert_eq!(stats.target_retained_version, first_retained_version); @@ -344,7 +405,10 @@ mod tests { let new_keys_in_db: HashSet<_> = db.nodes_mut().map(|(key, _)| *key).collect(); assert!(new_keys_in_db.is_superset(&keys_in_db)); - let stats = MerkleTreePruner::new(&mut db, 0).0.run_once().unwrap(); + let (mut pruner, _handle) = MerkleTreePruner::new(&mut db); + let stats = pruner + .prune_up_to(pruner.last_prunable_version().unwrap()) + .unwrap(); assert_eq!(stats.pruned_key_count, keys_in_db.len() + batch_count); // ^ roots are not counted in `keys_in_db` @@ -378,12 +442,18 @@ mod tests { for chunk in new_kvs.chunks(20) { MerkleTree::new(&mut db).extend(chunk.to_vec()); if prune_iteratively { - MerkleTreePruner::new(&mut db, 0).0.run_once().unwrap(); + let (mut pruner, _handle) = MerkleTreePruner::new(&mut db); + pruner + .prune_up_to(pruner.last_prunable_version().unwrap()) + .unwrap(); } } if !prune_iteratively { - MerkleTreePruner::new(&mut db, 0).0.run_once().unwrap(); + let (mut pruner, _handle) = MerkleTreePruner::new(&mut db); + pruner + .prune_up_to(pruner.last_prunable_version().unwrap()) + .unwrap(); } let new_leaf_keys_in_db = leaf_keys(&mut db); assert!(new_leaf_keys_in_db.is_disjoint(&leaf_keys_in_db)); diff --git a/core/lib/merkle_tree/tests/integration/merkle_tree.rs b/core/lib/merkle_tree/tests/integration/merkle_tree.rs index 68cceeedef91..823f5be21303 100644 --- a/core/lib/merkle_tree/tests/integration/merkle_tree.rs +++ b/core/lib/merkle_tree/tests/integration/merkle_tree.rs @@ -622,8 +622,8 @@ mod rocksdb { fn snapshot_for_pruned_tree(chunk_size: usize) { let Harness { mut db, dir: _dir } = Harness::new(); test_intermediate_commits(&mut db, chunk_size); - let (mut pruner, _) = MerkleTreePruner::new(&mut db, 0); - pruner.run_once(); + let (mut pruner, _handle) = MerkleTreePruner::new(&mut db); + pruner.prune_up_to(pruner.last_prunable_version().unwrap()); let raw_db = db.into_inner(); let snapshot_name = format!("db-snapshot-{chunk_size}-chunked-commits-pruned"); diff --git a/core/lib/zksync_core/src/lib.rs b/core/lib/zksync_core/src/lib.rs index f1a078ac8922..b16c088ef428 100644 --- a/core/lib/zksync_core/src/lib.rs +++ b/core/lib/zksync_core/src/lib.rs @@ -1016,9 +1016,10 @@ async fn run_tree( .build() .await .context("failed to build connection pool for Merkle tree recovery")?; - let metadata_calculator = MetadataCalculator::new(config, object_store, pool, recovery_pool) + let metadata_calculator = MetadataCalculator::new(config, object_store, pool) .await - .context("failed initializing metadata_calculator")?; + .context("failed initializing metadata_calculator")? + .with_recovery_pool(recovery_pool); if let Some(api_config) = api_config { let address = (Ipv4Addr::UNSPECIFIED, api_config.port).into(); @@ -1035,7 +1036,6 @@ async fn run_tree( let tree_health_check = metadata_calculator.tree_health_check(); app_health.insert_component(tree_health_check); - let tree_task = tokio::spawn(metadata_calculator.run(stop_receiver)); task_futures.push(tree_task); diff --git a/core/lib/zksync_core/src/metadata_calculator/helpers.rs b/core/lib/zksync_core/src/metadata_calculator/helpers.rs index 49f51a95116a..2ca68039b14d 100644 --- a/core/lib/zksync_core/src/metadata_calculator/helpers.rs +++ b/core/lib/zksync_core/src/metadata_calculator/helpers.rs @@ -26,6 +26,7 @@ use zksync_types::{block::L1BatchHeader, L1BatchNumber, StorageKey, H256}; use super::{ metrics::{LoadChangesStage, TreeUpdateStage, METRICS}, + pruning::PruningHandles, MetadataCalculatorConfig, }; @@ -35,6 +36,7 @@ pub struct MerkleTreeInfo { pub mode: MerkleTreeMode, pub root_hash: H256, pub next_l1_batch_number: L1BatchNumber, + pub min_l1_batch_number: Option, pub leaf_count: u64, } @@ -156,6 +158,10 @@ impl AsyncTree { self.mode } + pub fn pruner(&mut self) -> PruningHandles { + self.as_mut().pruner() + } + pub fn reader(&self) -> AsyncTreeReader { AsyncTreeReader { inner: self.inner.as_ref().expect(Self::INCONSISTENT_MSG).reader(), @@ -235,12 +241,21 @@ impl AsyncTreeReader { mode: self.mode, root_hash: self.inner.root_hash(), next_l1_batch_number: self.inner.next_l1_batch_number(), + min_l1_batch_number: self.inner.min_l1_batch_number(), leaf_count: self.inner.leaf_count(), }) .await .unwrap() } + #[cfg(test)] + pub async fn verify_consistency(self, l1_batch_number: L1BatchNumber) -> anyhow::Result<()> { + tokio::task::spawn_blocking(move || self.inner.verify_consistency(l1_batch_number)) + .await + .context("tree consistency verification panicked")? + .map_err(Into::into) + } + pub async fn entries_with_proofs( self, l1_batch_number: L1BatchNumber, diff --git a/core/lib/zksync_core/src/metadata_calculator/mod.rs b/core/lib/zksync_core/src/metadata_calculator/mod.rs index 54d4163ae61b..14c130da4b54 100644 --- a/core/lib/zksync_core/src/metadata_calculator/mod.rs +++ b/core/lib/zksync_core/src/metadata_calculator/mod.rs @@ -8,7 +8,7 @@ use std::{ }; use anyhow::Context as _; -use tokio::sync::watch; +use tokio::sync::{oneshot, watch}; use zksync_config::configs::{ chain::OperationsManagerConfig, database::{MerkleTreeConfig, MerkleTreeMode}, @@ -17,16 +17,18 @@ use zksync_dal::{ConnectionPool, Core}; use zksync_health_check::{HealthUpdater, ReactiveHealthCheck}; use zksync_object_store::ObjectStore; -pub use self::helpers::LazyAsyncTreeReader; pub(crate) use self::helpers::{AsyncTreeReader, L1BatchWithLogs, MerkleTreeInfo}; +pub use self::{helpers::LazyAsyncTreeReader, pruning::MerkleTreePruningTask}; use self::{ helpers::{create_db, Delayer, GenericAsyncTree, MerkleTreeHealth}, metrics::{ConfigLabels, METRICS}, + pruning::PruningHandles, updater::TreeUpdater, }; mod helpers; mod metrics; +mod pruning; mod recovery; #[cfg(test)] pub(crate) mod tests; @@ -86,6 +88,7 @@ impl MetadataCalculatorConfig { pub struct MetadataCalculator { config: MetadataCalculatorConfig, tree_reader: watch::Sender>, + pruning_handles_sender: oneshot::Sender, object_store: Option>, pool: ConnectionPool, recovery_pool: ConnectionPool, @@ -99,14 +102,11 @@ impl MetadataCalculator { /// /// # Arguments /// - /// - `pool` can have a single connection. - /// - `recovery_pool` will only be used in case of snapshot recovery. It should have multiple connections (e.g., 10) - /// to speed up recovery. + /// - `pool` can have a single connection (but then you should set a separate recovery pool). pub async fn new( config: MetadataCalculatorConfig, object_store: Option>, pool: ConnectionPool, - recovery_pool: ConnectionPool, ) -> anyhow::Result { if let Err(err) = METRICS.info.set(ConfigLabels::new(&config)) { tracing::warn!( @@ -129,9 +129,10 @@ impl MetadataCalculator { let (_, health_updater) = ReactiveHealthCheck::new("tree"); Ok(Self { tree_reader: watch::channel(None).0, + pruning_handles_sender: oneshot::channel().0, object_store, + recovery_pool: pool.clone(), pool, - recovery_pool, delayer: Delayer::new(config.delay_interval), health_updater, max_l1_batches_per_iter: config.max_l1_batches_per_iter, @@ -139,6 +140,13 @@ impl MetadataCalculator { }) } + /// Sets a separate pool that will be used in case of snapshot recovery. It should have multiple connections + /// (e.g., 10) to speed up recovery. + pub fn with_recovery_pool(mut self, recovery_pool: ConnectionPool) -> Self { + self.recovery_pool = recovery_pool; + self + } + /// Returns a health check for this calculator. pub fn tree_health_check(&self) -> ReactiveHealthCheck { self.health_updater.subscribe() @@ -149,6 +157,15 @@ impl MetadataCalculator { LazyAsyncTreeReader(self.tree_reader.subscribe()) } + /// Returns a task that can be used to prune the Merkle tree according to the pruning logs in Postgres. + /// This method should be called once; only the latest returned task will do any job, all previous ones + /// will terminate immediately. + pub fn pruning_task(&mut self, poll_interval: Duration) -> MerkleTreePruningTask { + let (pruning_handles_sender, pruning_handles) = oneshot::channel(); + self.pruning_handles_sender = pruning_handles_sender; + MerkleTreePruningTask::new(pruning_handles, self.pool.clone(), poll_interval) + } + async fn create_tree(&self) -> anyhow::Result { self.health_updater .update(MerkleTreeHealth::Initialization.into()); @@ -179,7 +196,7 @@ impl MetadataCalculator { &self.health_updater, ) .await?; - let Some(tree) = tree else { + let Some(mut tree) = tree else { return Ok(()); // recovery was aborted because a stop signal was received }; let tree_reader = tree.reader(); @@ -187,6 +204,10 @@ impl MetadataCalculator { "Merkle tree is initialized and ready to process L1 batches: {:?}", tree_reader.clone().info().await ); + + if !self.pruning_handles_sender.is_closed() { + self.pruning_handles_sender.send(tree.pruner()).ok(); + } self.tree_reader.send_replace(Some(tree_reader)); let updater = TreeUpdater::new(tree, self.max_l1_batches_per_iter, self.object_store); diff --git a/core/lib/zksync_core/src/metadata_calculator/pruning.rs b/core/lib/zksync_core/src/metadata_calculator/pruning.rs new file mode 100644 index 000000000000..9346a5be2c86 --- /dev/null +++ b/core/lib/zksync_core/src/metadata_calculator/pruning.rs @@ -0,0 +1,248 @@ +//! Merkle tree pruning logic. + +use std::time::Duration; + +use anyhow::Context as _; +use tokio::sync::{oneshot, watch}; +use zksync_dal::{ConnectionPool, Core, CoreDal}; +use zksync_merkle_tree::{MerkleTreePruner, MerkleTreePrunerHandle, RocksDBWrapper}; + +pub(super) type PruningHandles = (MerkleTreePruner, MerkleTreePrunerHandle); + +/// Task performing Merkle tree pruning according to the pruning entries in Postgres. +#[derive(Debug)] +#[must_use = "Task should `run()` in a managed Tokio task"] +pub struct MerkleTreePruningTask { + handles: oneshot::Receiver, + pool: ConnectionPool, + poll_interval: Duration, +} + +impl MerkleTreePruningTask { + pub(super) fn new( + handles: oneshot::Receiver, + pool: ConnectionPool, + poll_interval: Duration, + ) -> Self { + Self { + handles, + pool, + poll_interval, + } + } + + pub async fn run(self, mut stop_receiver: watch::Receiver) -> anyhow::Result<()> { + let (mut pruner, pruner_handle); + tokio::select! { + res = self.handles => { + match res { + Ok(handles) => (pruner, pruner_handle) = handles, + Err(_) => { + tracing::info!("Merkle tree dropped; shutting down tree pruning"); + return Ok(()); + } + } + } + _ = stop_receiver.changed() => { + tracing::info!("Stop signal received before Merkle tree is initialized; shutting down tree pruning"); + return Ok(()); + } + } + tracing::info!("Obtained pruning handles; starting Merkle tree pruning"); + + // Pruner is not allocated a managed task because it is blocking; its cancellation awareness inherently + // depends on the pruner handle (i.e., this task). + pruner.set_poll_interval(self.poll_interval); + let pruner_task_handle = tokio::task::spawn_blocking(|| pruner.run()); + + while !*stop_receiver.borrow_and_update() { + let mut storage = self.pool.connection_tagged("metadata_calculator").await?; + let pruning_info = storage.pruning_dal().get_pruning_info().await?; + drop(storage); + + if let Some(l1_batch_number) = pruning_info.last_hard_pruned_l1_batch { + let target_retained_version = u64::from(l1_batch_number.0) + 1; + let Ok(prev_target_version) = + pruner_handle.set_target_retained_version(target_retained_version) + else { + tracing::error!("Merkle tree pruning thread unexpectedly stopped"); + return pruner_task_handle + .await + .context("Merkle tree pruning thread panicked"); + }; + if prev_target_version != target_retained_version { + tracing::info!("Set target retained tree version from {prev_target_version} to {target_retained_version}"); + } + } + + if tokio::time::timeout(self.poll_interval, stop_receiver.changed()) + .await + .is_ok() + { + break; + } + } + + tracing::info!("Stop signal received, Merkle tree pruning is shutting down"); + drop(pruner_handle); + pruner_task_handle + .await + .context("Merkle tree pruning thread panicked") + } +} + +#[cfg(test)] +mod tests { + use tempfile::TempDir; + use zksync_types::{L1BatchNumber, L2BlockNumber}; + + use super::*; + use crate::{ + genesis::{insert_genesis_batch, GenesisParams}, + metadata_calculator::{ + tests::{extend_db_state_from_l1_batch, gen_storage_logs, mock_config, reset_db_state}, + MetadataCalculator, + }, + utils::testonly::prepare_recovery_snapshot, + }; + + const POLL_INTERVAL: Duration = Duration::from_millis(50); + + #[tokio::test] + async fn basic_tree_pruning_workflow() { + let pool = ConnectionPool::::test_pool().await; + let temp_dir = TempDir::new().expect("failed get temporary directory for RocksDB"); + let config = mock_config(temp_dir.path()); + let mut storage = pool.connection().await.unwrap(); + insert_genesis_batch(&mut storage, &GenesisParams::mock()) + .await + .unwrap(); + reset_db_state(&pool, 5).await; + + let mut calculator = MetadataCalculator::new(config, None, pool.clone()) + .await + .unwrap(); + let reader = calculator.tree_reader(); + let pruning_task = calculator.pruning_task(POLL_INTERVAL); + let (stop_sender, stop_receiver) = watch::channel(false); + let calculator_handle = tokio::spawn(calculator.run(stop_receiver.clone())); + let pruning_task_handle = tokio::spawn(pruning_task.run(stop_receiver)); + + // Wait until the calculator is initialized. + let reader = reader.wait().await; + while reader.clone().info().await.next_l1_batch_number < L1BatchNumber(6) { + tokio::time::sleep(POLL_INTERVAL).await; + } + + // Add a pruning log to force pruning. + storage + .pruning_dal() + .hard_prune_batches_range(L1BatchNumber(3), L2BlockNumber(3)) + .await + .unwrap(); + + while reader.clone().info().await.min_l1_batch_number.unwrap() <= L1BatchNumber(3) { + tokio::time::sleep(POLL_INTERVAL).await; + } + + reader.verify_consistency(L1BatchNumber(5)).await.unwrap(); + + stop_sender.send_replace(true); + calculator_handle.await.unwrap().unwrap(); + pruning_task_handle.await.unwrap().unwrap(); + } + + #[tokio::test] + async fn pruning_after_snapshot_recovery() { + let pool = ConnectionPool::::test_pool().await; + let snapshot_logs = gen_storage_logs(100..300, 1).pop().unwrap(); + let mut storage = pool.connection().await.unwrap(); + let snapshot_recovery = prepare_recovery_snapshot( + &mut storage, + L1BatchNumber(23), + L2BlockNumber(23), + &snapshot_logs, + ) + .await; + + let temp_dir = TempDir::new().expect("failed get temporary directory for RocksDB"); + let config = mock_config(temp_dir.path()); + + let mut calculator = MetadataCalculator::new(config, None, pool.clone()) + .await + .unwrap(); + let reader = calculator.tree_reader(); + let pruning_task = calculator.pruning_task(POLL_INTERVAL); + let (stop_sender, stop_receiver) = watch::channel(false); + let calculator_handle = tokio::spawn(calculator.run(stop_receiver.clone())); + let pruning_task_handle = tokio::spawn(pruning_task.run(stop_receiver)); + + // Wait until the calculator is initialized. + let reader = reader.wait().await; + let tree_info = reader.clone().info().await; + assert_eq!( + tree_info.next_l1_batch_number, + snapshot_recovery.l1_batch_number + 1 + ); + assert_eq!( + tree_info.min_l1_batch_number, + Some(snapshot_recovery.l1_batch_number) + ); + + // Add some new L1 batches and wait for them to be processed. + let mut new_logs = gen_storage_logs(500..600, 5); + { + let mut storage = storage.start_transaction().await.unwrap(); + // Logs must be sorted by `log.key` to match their enum index assignment + for batch_logs in &mut new_logs { + batch_logs.sort_unstable_by_key(|log| log.key); + } + + extend_db_state_from_l1_batch( + &mut storage, + snapshot_recovery.l1_batch_number + 1, + new_logs, + ) + .await; + storage.commit().await.unwrap(); + } + + let original_tree_info = loop { + let tree_info = reader.clone().info().await; + if tree_info.next_l1_batch_number == snapshot_recovery.l1_batch_number + 6 { + break tree_info; + } + tokio::time::sleep(POLL_INTERVAL).await; + }; + + // Prune first 3 created batches in Postgres. + storage + .pruning_dal() + .hard_prune_batches_range( + snapshot_recovery.l1_batch_number + 3, + snapshot_recovery.l2_block_number + 3, + ) + .await + .unwrap(); + + // Check that the batches are pruned in the tree. + let pruned_tree_info = loop { + let tree_info = reader.clone().info().await; + if tree_info.min_l1_batch_number.unwrap() > snapshot_recovery.l1_batch_number + 3 { + break tree_info; + } + tokio::time::sleep(POLL_INTERVAL).await; + }; + + assert_eq!(pruned_tree_info.root_hash, original_tree_info.root_hash); + assert_eq!(pruned_tree_info.leaf_count, original_tree_info.leaf_count); + reader + .verify_consistency(pruned_tree_info.next_l1_batch_number - 1) + .await + .unwrap(); + + stop_sender.send_replace(true); + calculator_handle.await.unwrap().unwrap(); + pruning_task_handle.await.unwrap().unwrap(); + } +} diff --git a/core/lib/zksync_core/src/metadata_calculator/recovery/tests.rs b/core/lib/zksync_core/src/metadata_calculator/recovery/tests.rs index 5e34153db26c..ac7225409e39 100644 --- a/core/lib/zksync_core/src/metadata_calculator/recovery/tests.rs +++ b/core/lib/zksync_core/src/metadata_calculator/recovery/tests.rs @@ -252,10 +252,9 @@ async fn entire_recovery_workflow(case: RecoveryWorkflowCase) { &merkle_tree_config, &OperationsManagerConfig { delay_interval: 50 }, ); - let mut calculator = - MetadataCalculator::new(calculator_config, None, pool.clone(), pool.clone()) - .await - .unwrap(); + let mut calculator = MetadataCalculator::new(calculator_config, None, pool.clone()) + .await + .unwrap(); let (delay_sx, mut delay_rx) = mpsc::unbounded_channel(); calculator.delayer.delay_notifier = delay_sx; diff --git a/core/lib/zksync_core/src/metadata_calculator/tests.rs b/core/lib/zksync_core/src/metadata_calculator/tests.rs index fad46873433d..fd942bc5a351 100644 --- a/core/lib/zksync_core/src/metadata_calculator/tests.rs +++ b/core/lib/zksync_core/src/metadata_calculator/tests.rs @@ -424,7 +424,7 @@ async fn setup_calculator_with_options( let calculator_config = MetadataCalculatorConfig::for_main_node(merkle_tree_config, operation_config); - MetadataCalculator::new(calculator_config, object_store, pool.clone(), pool) + MetadataCalculator::new(calculator_config, object_store, pool) .await .unwrap() } diff --git a/core/node/node_framework/src/implementations/layers/metadata_calculator.rs b/core/node/node_framework/src/implementations/layers/metadata_calculator.rs index b6f65542202d..7277dfa666a5 100644 --- a/core/node/node_framework/src/implementations/layers/metadata_calculator.rs +++ b/core/node/node_framework/src/implementations/layers/metadata_calculator.rs @@ -1,3 +1,4 @@ +use anyhow::Context as _; use zksync_core::metadata_calculator::{MetadataCalculator, MetadataCalculatorConfig}; use zksync_storage::RocksDB; @@ -56,9 +57,9 @@ impl WiringLayer for MetadataCalculatorLayer { self.0, object_store.map(|store_resource| store_resource.0), main_pool, - recovery_pool, ) - .await?; + .await? + .with_recovery_pool(recovery_pool); let AppHealthCheckResource(app_health) = context.get_resource_or_default().await; app_health.insert_component(metadata_calculator.tree_health_check()); @@ -83,8 +84,7 @@ impl Task for MetadataCalculatorTask { // Wait for all the instances of RocksDB to be destroyed. tokio::task::spawn_blocking(RocksDB::await_rocksdb_termination) .await - .unwrap(); - + .context("failed terminating RocksDB instances")?; result } }