diff --git a/core/lib/config/src/configs/api.rs b/core/lib/config/src/configs/api.rs index d80c3b387aa..e1f79d05226 100644 --- a/core/lib/config/src/configs/api.rs +++ b/core/lib/config/src/configs/api.rs @@ -1,10 +1,8 @@ -/// External uses use anyhow::Context as _; use serde::Deserialize; -/// Built-in uses -use std::net::SocketAddr; -use std::time::Duration; -// Local uses + +use std::{net::SocketAddr, time::Duration}; + use super::envy_load; pub use crate::configs::PrometheusConfig; use zksync_basic_types::H256; @@ -20,6 +18,8 @@ pub struct ApiConfig { pub prometheus: PrometheusConfig, /// Configuration options for the Health check. pub healthcheck: HealthCheckConfig, + /// Configuration options for Merkle tree API. + pub merkle_tree: MerkleTreeApiConfig, } impl ApiConfig { @@ -30,6 +30,7 @@ impl ApiConfig { .context("ContractVerificationApiConfig")?, prometheus: PrometheusConfig::from_env().context("PrometheusConfig")?, healthcheck: HealthCheckConfig::from_env().context("HealthCheckConfig")?, + merkle_tree: MerkleTreeApiConfig::from_env().context("MerkleTreeApiConfig")?, }) } } @@ -225,6 +226,25 @@ impl ContractVerificationApiConfig { } } +/// Configuration for the Merkle tree API. +#[derive(Debug, Clone, PartialEq, Deserialize)] +pub struct MerkleTreeApiConfig { + /// Port to bind the Merkle tree API server to. + #[serde(default = "MerkleTreeApiConfig::default_port")] + pub port: u16, +} + +impl MerkleTreeApiConfig { + const fn default_port() -> u16 { + 3_072 + } + + /// Loads configuration from env variables. + pub fn from_env() -> anyhow::Result { + envy_load("merkle_tree_api", "API_MERKLE_TREE_") + } +} + #[cfg(test)] mod tests { use std::net::IpAddr; @@ -280,6 +300,7 @@ mod tests { push_interval_ms: Some(100), }, healthcheck: HealthCheckConfig { port: 8081 }, + merkle_tree: MerkleTreeApiConfig { port: 8082 }, } } @@ -321,6 +342,7 @@ mod tests { API_PROMETHEUS_PUSHGATEWAY_URL="http://127.0.0.1:9091" API_PROMETHEUS_PUSH_INTERVAL_MS=100 API_HEALTHCHECK_PORT=8081 + API_MERKLE_TREE_PORT=8082 "#; lock.set_env(config); diff --git a/core/lib/merkle_tree/src/domain.rs b/core/lib/merkle_tree/src/domain.rs index 6b26bbd873f..fb6d3d870e1 100644 --- a/core/lib/merkle_tree/src/domain.rs +++ b/core/lib/merkle_tree/src/domain.rs @@ -4,8 +4,8 @@ use rayon::{ThreadPool, ThreadPoolBuilder}; use crate::{ storage::{MerkleTreeColumnFamily, PatchSet, Patched, RocksDBWrapper}, - types::{Key, Root, TreeInstruction, TreeLogEntry, ValueHash, TREE_DEPTH}, - BlockOutput, HashTree, MerkleTree, + types::{Key, Root, TreeEntryWithProof, TreeInstruction, TreeLogEntry, ValueHash, TREE_DEPTH}, + BlockOutput, HashTree, MerkleTree, NoVersionError, }; use zksync_crypto::hasher::blake2::Blake2Hasher; use zksync_storage::RocksDB; @@ -98,6 +98,13 @@ impl ZkSyncTree { } } + /// Returns a readonly handle to the tree. The handle **does not** see uncommitted changes to the tree, + /// only ones flushed to RocksDB. + pub fn reader(&self) -> ZkSyncTreeReader { + let db = self.tree.db.inner().clone(); + ZkSyncTreeReader(MerkleTree::new(db)) + } + /// Sets the chunk size for multi-get operations. The requested keys will be split /// into chunks of this size and requested in parallel using `rayon`. Setting chunk size /// to a large value (e.g., `usize::MAX`) will effectively disable parallelism. @@ -373,3 +380,50 @@ impl ZkSyncTree { self.tree.db.reset(); } } + +/// Readonly handle to a [`ZkSyncTree`]. +#[derive(Debug)] +pub struct ZkSyncTreeReader(MerkleTree<'static, RocksDBWrapper>); + +// While cloning `MerkleTree` is logically unsound, cloning a reader is reasonable since it is readonly. +impl Clone for ZkSyncTreeReader { + fn clone(&self) -> Self { + Self(MerkleTree::new(self.0.db.clone())) + } +} + +impl ZkSyncTreeReader { + /// Returns the current root hash of this tree. + pub fn root_hash(&self) -> ValueHash { + self.0.latest_root_hash() + } + + /// Returns the next L1 batch number that should be processed by the tree. + #[allow(clippy::missing_panics_doc)] + pub fn next_l1_batch_number(&self) -> L1BatchNumber { + let number = self.0.latest_version().map_or(0, |version| { + u32::try_from(version + 1).expect("integer overflow for L1 batch number") + }); + L1BatchNumber(number) + } + + /// Returns the number of leaves in the tree. + pub fn leaf_count(&self) -> u64 { + self.0.latest_root().leaf_count() + } + + /// Reads entries together with Merkle proofs with the specified keys from the tree. The entries are returned + /// in the same order as requested. + /// + /// # Errors + /// + /// Returns an error if the tree `version` is missing. + pub fn entries_with_proofs( + &self, + l1_batch_number: L1BatchNumber, + keys: &[Key], + ) -> Result, NoVersionError> { + let version = u64::from(l1_batch_number.0); + self.0.entries_with_proofs(version, keys) + } +} diff --git a/core/lib/merkle_tree/src/storage/database.rs b/core/lib/merkle_tree/src/storage/database.rs index 368b1cb8474..0c1e494dc21 100644 --- a/core/lib/merkle_tree/src/storage/database.rs +++ b/core/lib/merkle_tree/src/storage/database.rs @@ -175,6 +175,11 @@ impl Patched { .map_or_else(Vec::new, |patch| patch.roots.keys().copied().collect()) } + /// Provides readonly access to the wrapped DB. + pub(crate) fn inner(&self) -> &DB { + &self.inner + } + /// Provides access to the wrapped DB. Should not be used to mutate DB data. pub(crate) fn inner_mut(&mut self) -> &mut DB { &mut self.inner diff --git a/core/lib/zksync_core/src/api_server/mod.rs b/core/lib/zksync_core/src/api_server/mod.rs index a224c371502..b05214e6972 100644 --- a/core/lib/zksync_core/src/api_server/mod.rs +++ b/core/lib/zksync_core/src/api_server/mod.rs @@ -1,6 +1,8 @@ // Everywhere in this module the word "block" actually means "miniblock". + pub mod contract_verification; pub mod execution_sandbox; pub mod healthcheck; +pub mod tree; pub mod tx_sender; pub mod web3; diff --git a/core/lib/zksync_core/src/api_server/tree/metrics.rs b/core/lib/zksync_core/src/api_server/tree/metrics.rs new file mode 100644 index 00000000000..e6b552468d8 --- /dev/null +++ b/core/lib/zksync_core/src/api_server/tree/metrics.rs @@ -0,0 +1,24 @@ +//! Metrics for the Merkle tree API. + +use vise::{Buckets, EncodeLabelSet, EncodeLabelValue, Family, Histogram, Metrics, Unit}; + +use std::time::Duration; + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, EncodeLabelValue, EncodeLabelSet)] +#[metrics(label = "method", rename_all = "snake_case")] +pub(super) enum MerkleTreeApiMethod { + Info, + GetProofs, +} + +/// Metrics for Merkle tree API. +#[derive(Debug, Metrics)] +#[metrics(prefix = "server_merkle_tree_api")] +pub(super) struct MerkleTreeApiMetrics { + /// Server latency of the Merkle tree API methods. + #[metrics(buckets = Buckets::LATENCIES, unit = Unit::Seconds)] + pub latency: Family>, +} + +#[vise::register] +pub(super) static API_METRICS: vise::Global = vise::Global::new(); diff --git a/core/lib/zksync_core/src/api_server/tree/mod.rs b/core/lib/zksync_core/src/api_server/tree/mod.rs new file mode 100644 index 00000000000..f59cd171192 --- /dev/null +++ b/core/lib/zksync_core/src/api_server/tree/mod.rs @@ -0,0 +1,291 @@ +//! Primitive Merkle tree API used internally to fetch proofs. + +use anyhow::Context as _; +use async_trait::async_trait; +use axum::{ + extract::State, + http::{header, StatusCode}, + response::{IntoResponse, Response}, + routing, Json, Router, +}; +use serde::{Deserialize, Serialize}; +use tokio::sync::watch; + +use std::{fmt, future::Future, net::SocketAddr, pin::Pin}; + +use zksync_merkle_tree::NoVersionError; +use zksync_types::{L1BatchNumber, H256, U256}; + +mod metrics; +#[cfg(test)] +mod tests; + +use self::metrics::{MerkleTreeApiMethod, API_METRICS}; +use crate::metadata_calculator::{AsyncTreeReader, MerkleTreeInfo}; + +#[derive(Debug, Serialize, Deserialize)] +struct TreeProofsRequest { + l1_batch_number: L1BatchNumber, + hashed_keys: Vec, +} + +#[derive(Debug, Serialize, Deserialize)] +struct TreeProofsResponse { + entries: Vec, +} + +#[derive(Debug, Serialize, Deserialize)] +pub(crate) struct TreeEntryWithProof { + #[serde(default, skip_serializing_if = "H256::is_zero")] + pub value: H256, + #[serde(default, skip_serializing_if = "TreeEntryWithProof::is_zero")] + pub index: u64, + pub merkle_path: Vec, +} + +impl TreeEntryWithProof { + fn is_zero(&value: &u64) -> bool { + value == 0 + } +} + +impl TreeEntryWithProof { + fn new(src: zksync_merkle_tree::TreeEntryWithProof) -> Self { + let mut merkle_path = src.merkle_path; + merkle_path.reverse(); // Use root-to-leaf enumeration direction as in Ethereum + Self { + value: src.base.value_hash, + index: src.base.leaf_index, + merkle_path, + } + } +} + +#[derive(Debug)] +enum TreeApiError { + NoTreeVersion(NoVersionError), +} + +impl IntoResponse for TreeApiError { + fn into_response(self) -> Response { + let (status, title, detail) = match self { + Self::NoTreeVersion(err) => { + (StatusCode::NOT_FOUND, "L1 batch not found", err.to_string()) + } + }; + + // Loosely conforms to HTTP Problem Details RFC: https://datatracker.ietf.org/doc/html/rfc7807 + let body = serde_json::json!({ + "type": "/errors#l1-batch-not-found", + "title": title, + "detail": detail, + }); + let headers = [(header::CONTENT_TYPE, "application/problem+json")]; + (status, headers, Json(body)).into_response() + } +} + +/// Client accessing Merkle tree API. +#[async_trait] +pub(crate) trait TreeApiClient { + /// Obtains general information about the tree. + async fn get_info(&self) -> anyhow::Result; + + /// Obtains proofs for the specified `hashed_keys` at the specified tree version (= L1 batch number). + async fn get_proofs( + &self, + l1_batch_number: L1BatchNumber, + hashed_keys: Vec, + ) -> anyhow::Result>; +} + +/// In-memory client implementation. +#[async_trait] +impl TreeApiClient for AsyncTreeReader { + async fn get_info(&self) -> anyhow::Result { + Ok(self.clone().info().await) + } + + async fn get_proofs( + &self, + l1_batch_number: L1BatchNumber, + hashed_keys: Vec, + ) -> anyhow::Result> { + self.get_proofs_inner(l1_batch_number, hashed_keys) + .await + .map_err(Into::into) + } +} + +/// [`TreeApiClient`] implementation requesting data from a Merkle tree API server. +#[derive(Debug, Clone)] +pub(crate) struct TreeApiHttpClient { + inner: reqwest::Client, + info_url: String, + proofs_url: String, +} + +impl TreeApiHttpClient { + #[cfg(test)] // temporary measure until `TreeApiClient` is required by other components + pub fn new(url_base: &str) -> Self { + Self { + inner: reqwest::Client::new(), + info_url: url_base.to_owned(), + proofs_url: format!("{url_base}/proofs"), + } + } +} + +#[async_trait] +impl TreeApiClient for TreeApiHttpClient { + async fn get_info(&self) -> anyhow::Result { + let response = self + .inner + .get(&self.info_url) + .send() + .await + .context("Failed requesting tree info")?; + let response = response + .error_for_status() + .context("Requesting tree info returned non-OK response")?; + response + .json() + .await + .context("Failed deserializing tree info") + } + + async fn get_proofs( + &self, + l1_batch_number: L1BatchNumber, + hashed_keys: Vec, + ) -> anyhow::Result> { + let response = self + .inner + .post(&self.proofs_url) + .json(&TreeProofsRequest { + l1_batch_number, + hashed_keys, + }) + .send() + .await + .with_context(|| format!("Failed requesting proofs for L1 batch #{l1_batch_number}"))?; + let response = response.error_for_status().with_context(|| { + format!("Requesting proofs for L1 batch #{l1_batch_number} returned non-OK response") + })?; + let response: TreeProofsResponse = response.json().await.with_context(|| { + format!("Failed deserializing proofs for L1 batch #{l1_batch_number}") + })?; + Ok(response.entries) + } +} + +impl AsyncTreeReader { + async fn info_handler(State(this): State) -> Json { + let latency = API_METRICS.latency[&MerkleTreeApiMethod::Info].start(); + let info = this.info().await; + latency.observe(); + Json(info) + } + + async fn get_proofs_inner( + &self, + l1_batch_number: L1BatchNumber, + hashed_keys: Vec, + ) -> Result, NoVersionError> { + let proofs = self + .clone() + .entries_with_proofs(l1_batch_number, hashed_keys) + .await?; + Ok(proofs.into_iter().map(TreeEntryWithProof::new).collect()) + } + + async fn get_proofs_handler( + State(this): State, + Json(request): Json, + ) -> Result, TreeApiError> { + let latency = API_METRICS.latency[&MerkleTreeApiMethod::GetProofs].start(); + let entries = this + .get_proofs_inner(request.l1_batch_number, request.hashed_keys) + .await + .map_err(TreeApiError::NoTreeVersion)?; + let response = TreeProofsResponse { entries }; + latency.observe(); + Ok(Json(response)) + } + + fn create_api_server( + self, + bind_address: &SocketAddr, + mut stop_receiver: watch::Receiver, + ) -> anyhow::Result { + tracing::debug!("Starting Merkle tree API server on {bind_address}"); + + let app = Router::new() + .route("/", routing::get(Self::info_handler)) + .route("/proofs", routing::post(Self::get_proofs_handler)) + .with_state(self); + + let server = axum::Server::try_bind(bind_address) + .with_context(|| format!("Failed binding Merkle tree API server to {bind_address}"))? + .serve(app.into_make_service()); + let local_addr = server.local_addr(); + let server_future = async move { + server.with_graceful_shutdown(async move { + if stop_receiver.changed().await.is_err() { + tracing::warn!( + "Stop signal sender for Merkle tree API server was dropped without sending a signal" + ); + } + tracing::info!("Stop signal received, Merkle tree API server is shutting down"); + }) + .await + .context("Merkle tree API server failed")?; + + tracing::info!("Merkle tree API server shut down"); + Ok(()) + }; + + Ok(MerkleTreeServer { + local_addr, + server_future: Box::pin(server_future), + }) + } + + /// Runs the HTTP API server. + pub async fn run_api_server( + self, + bind_address: SocketAddr, + stop_receiver: watch::Receiver, + ) -> anyhow::Result<()> { + self.create_api_server(&bind_address, stop_receiver)? + .run() + .await + } +} + +/// `axum`-powered REST server for Merkle tree API. +#[must_use = "Server must be `run()`"] +struct MerkleTreeServer { + local_addr: SocketAddr, + server_future: Pin> + Send>>, +} + +impl fmt::Debug for MerkleTreeServer { + fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result { + formatter + .debug_struct("MerkleTreeServer") + .field("local_addr", &self.local_addr) + .finish_non_exhaustive() + } +} + +impl MerkleTreeServer { + #[cfg(test)] + pub fn local_addr(&self) -> &SocketAddr { + &self.local_addr + } + + async fn run(self) -> anyhow::Result<()> { + self.server_future.await + } +} diff --git a/core/lib/zksync_core/src/api_server/tree/tests.rs b/core/lib/zksync_core/src/api_server/tree/tests.rs new file mode 100644 index 00000000000..ea057b8bcc4 --- /dev/null +++ b/core/lib/zksync_core/src/api_server/tree/tests.rs @@ -0,0 +1,77 @@ +//! Tests for the Merkle tree API. + +use tempfile::TempDir; + +use std::net::Ipv4Addr; + +use db_test_macro::db_test; +use zksync_dal::ConnectionPool; + +use super::*; +use crate::metadata_calculator::tests::{ + gen_storage_logs, reset_db_state, run_calculator, setup_calculator, +}; + +#[db_test] +async fn merkle_tree_api(pool: ConnectionPool, prover_pool: ConnectionPool) { + let temp_dir = TempDir::new().expect("failed get temporary directory for RocksDB"); + let (calculator, _) = setup_calculator(temp_dir.path(), &pool).await; + let api_addr = (Ipv4Addr::LOCALHOST, 0).into(); + let (stop_sender, stop_receiver) = watch::channel(false); + let api_server = calculator + .tree_reader() + .create_api_server(&api_addr, stop_receiver.clone()) + .unwrap(); + let local_addr = *api_server.local_addr(); + let api_server_task = tokio::spawn(api_server.run()); + let api_client = TreeApiHttpClient::new(&format!("http://{local_addr}")); + + reset_db_state(&pool, 5).await; + // Wait until the calculator processes initial L1 batches. + run_calculator(calculator, pool, prover_pool).await; + + // Query the API. + let tree_info = api_client.get_info().await.unwrap(); + assert!(tree_info.leaf_count > 20); + assert_eq!(tree_info.next_l1_batch_number, L1BatchNumber(6)); + + let mut hashed_keys: Vec<_> = gen_storage_logs(20..30, 1)[0] + .iter() + .map(|log| log.key.hashed_key_u256()) + .collect(); + // Extend with some non-existing keys. + hashed_keys.extend((0_u8..10).map(|byte| U256::from_big_endian(&[byte; 32]))); + + let proofs = api_client + .get_proofs(L1BatchNumber(5), hashed_keys) + .await + .unwrap(); + assert_eq!(proofs.len(), 20); + for (i, proof) in proofs.into_iter().enumerate() { + let should_be_present = i < 10; + assert_eq!(proof.index == 0, !should_be_present); + assert!(!proof.merkle_path.is_empty()); + } + + let err = api_client + .get_proofs(L1BatchNumber(10), vec![]) + .await + .unwrap_err(); + let err = format!("{err:?}"); + // Check that the error message contains all necessary info to troubleshoot it. + assert!( + err.contains("Requesting proofs for L1 batch #10 returned non-OK response"), + "{}", + err + ); + assert!(err.contains("404 Not Found"), "{}", err); + assert!( + err.contains(&format!("http://{local_addr}/proofs")), + "{}", + err + ); + + // Stop the calculator and the tree API server. + stop_sender.send_replace(true); + api_server_task.await.unwrap().unwrap(); +} diff --git a/core/lib/zksync_core/src/lib.rs b/core/lib/zksync_core/src/lib.rs index f64eaffefc2..fd5714d0436 100644 --- a/core/lib/zksync_core/src/lib.rs +++ b/core/lib/zksync_core/src/lib.rs @@ -1,6 +1,6 @@ #![allow(clippy::upper_case_acronyms, clippy::derive_partial_eq_without_eq)] -use std::{str::FromStr, sync::Arc, time::Instant}; +use std::{net::Ipv4Addr, str::FromStr, sync::Arc, time::Instant}; use anyhow::Context as _; use futures::channel::oneshot; @@ -12,6 +12,7 @@ use zksync_circuit_breaker::{ replication_lag::ReplicationLagChecker, CircuitBreaker, CircuitBreakerChecker, CircuitBreakerError, }; +use zksync_config::configs::api::MerkleTreeApiConfig; use zksync_config::configs::{ api::{HealthCheckConfig, Web3JsonRpcConfig}, chain::{ @@ -175,34 +176,36 @@ pub fn setup_sigint_handler() -> oneshot::Receiver<()> { #[derive(Debug, Clone, Copy, PartialEq)] pub enum Component { - // Public Web3 API running on HTTP server. + /// Public Web3 API running on HTTP server. HttpApi, - // Public Web3 API running on HTTP/WebSocket server and redirect eth_getLogs to another method. + /// Public Web3 API running on HTTP/WebSocket server and redirect eth_getLogs to another method. ApiTranslator, - // Public Web3 API (including PubSub) running on WebSocket server. + /// Public Web3 API (including PubSub) running on WebSocket server. WsApi, - // REST API for contract verification. + /// REST API for contract verification. ContractVerificationApi, - // Metadata Calculator. + /// Metadata calculator. Tree, // TODO(BFT-273): Remove `TreeLightweight` component as obsolete TreeLightweight, TreeBackup, + /// Merkle tree API. + TreeApi, EthWatcher, - // Eth tx generator + /// Eth tx generator. EthTxAggregator, - // Manager for eth tx + /// Manager for eth tx. EthTxManager, - // Data fetchers: list fetcher, volume fetcher, price fetcher. + /// Data fetchers: list fetcher, volume fetcher, price fetcher. DataFetcher, - // State keeper. + /// State keeper. StateKeeper, - // Witness Generator. The first argument is a number of jobs to process. If None, runs indefinitely. - // The second argument is the type of the witness-generation performed + /// Witness Generator. The first argument is a number of jobs to process. If None, runs indefinitely. + /// The second argument is the type of the witness-generation performed WitnessGenerator(Option, AggregationRound), - // Component for housekeeping task such as cleaning blobs from GCS, reporting metrics etc. + /// Component for housekeeping task such as cleaning blobs from GCS, reporting metrics etc. Housekeeper, - // Component for exposing API's to prover for providing proof generation data and accepting proofs. + /// Component for exposing API's to prover for providing proof generation data and accepting proofs. ProofDataHandler, } @@ -228,6 +231,7 @@ impl FromStr for Components { Ok(Components(vec![Component::TreeLightweight])) } "tree_backup" => Ok(Components(vec![Component::TreeBackup])), + "tree_api" => Ok(Components(vec![Component::TreeApi])), "data_fetcher" => Ok(Components(vec![Component::DataFetcher])), "state_keeper" => Ok(Components(vec![Component::StateKeeper])), "housekeeper" => Ok(Components(vec![Component::Housekeeper])), @@ -726,6 +730,13 @@ async fn add_trees_to_task_futures( let db_config = DBConfig::from_env().context("DBConfig::from_env()")?; let operation_config = OperationsManagerConfig::from_env().context("OperationManagerConfig::from_env()")?; + let api_config = ApiConfig::from_env() + .context("ApiConfig::from_env()")? + .merkle_tree; + let api_config = components + .contains(&Component::TreeApi) + .then_some(&api_config); + let has_tree_component = components.contains(&Component::Tree); let has_lightweight_component = components.contains(&Component::TreeLightweight); let mode = match (has_tree_component, has_lightweight_component) { @@ -739,22 +750,37 @@ async fn add_trees_to_task_futures( MerkleTreeMode::Lightweight => MetadataCalculatorModeConfig::Lightweight, MerkleTreeMode::Full => MetadataCalculatorModeConfig::Full { store_factory }, }, - (false, false) => return Ok(()), + (false, false) => { + anyhow::ensure!( + !components.contains(&Component::TreeApi), + "Merkle tree API cannot be started without a tree component" + ); + return Ok(()); + } }; - let (future, tree_health_check) = run_tree(&db_config, &operation_config, mode, stop_receiver) - .await - .context("run_tree()")?; - task_futures.push(future); - healthchecks.push(Box::new(tree_health_check)); - Ok(()) + + run_tree( + task_futures, + healthchecks, + &db_config, + api_config, + &operation_config, + mode, + stop_receiver, + ) + .await + .context("run_tree()") } async fn run_tree( - config: &DBConfig, + task_futures: &mut Vec>>, + healthchecks: &mut Vec>, + db_config: &DBConfig, + api_config: Option<&MerkleTreeApiConfig>, operation_manager: &OperationsManagerConfig, mode: MetadataCalculatorModeConfig<'_>, stop_receiver: watch::Receiver, -) -> anyhow::Result<(JoinHandle>, ReactiveHealthCheck)> { +) -> anyhow::Result<()> { let started_at = Instant::now(); let mode_str = if matches!(mode, MetadataCalculatorModeConfig::Full { .. }) { "full" @@ -763,9 +789,18 @@ async fn run_tree( }; tracing::info!("Initializing Merkle tree in {mode_str} mode"); - let config = MetadataCalculatorConfig::for_main_node(config, operation_manager, mode); + let config = MetadataCalculatorConfig::for_main_node(db_config, operation_manager, mode); let metadata_calculator = MetadataCalculator::new(&config).await; + if let Some(api_config) = api_config { + let address = (Ipv4Addr::UNSPECIFIED, api_config.port).into(); + let server_task = metadata_calculator + .tree_reader() + .run_api_server(address, stop_receiver.clone()); + task_futures.push(tokio::spawn(server_task)); + } + let tree_health_check = metadata_calculator.tree_health_check(); + healthchecks.push(Box::new(tree_health_check)); let pool = ConnectionPool::singleton(DbVariant::Master) .build() .await @@ -774,12 +809,13 @@ async fn run_tree( .build() .await .context("failed to build prover_pool")?; - let future = tokio::spawn(metadata_calculator.run(pool, prover_pool, stop_receiver)); + let tree_task = tokio::spawn(metadata_calculator.run(pool, prover_pool, stop_receiver)); + task_futures.push(tree_task); let elapsed = started_at.elapsed(); APP_METRICS.init_latency[&InitStage::Tree].set(elapsed); tracing::info!("Initialized {mode_str} tree in {elapsed:?}"); - Ok((future, tree_health_check)) + Ok(()) } async fn add_witness_generator_to_task_futures( diff --git a/core/lib/zksync_core/src/metadata_calculator/helpers.rs b/core/lib/zksync_core/src/metadata_calculator/helpers.rs index 3d7246d78c5..ad4dea3023b 100644 --- a/core/lib/zksync_core/src/metadata_calculator/helpers.rs +++ b/core/lib/zksync_core/src/metadata_calculator/helpers.rs @@ -1,13 +1,12 @@ //! Various helpers for the metadata calculator. -use serde::Serialize; +use serde::{Deserialize, Serialize}; #[cfg(test)] use tokio::sync::mpsc; use std::{ collections::BTreeMap, future::Future, - mem, path::{Path, PathBuf}, time::Duration, }; @@ -16,23 +15,26 @@ use zksync_config::configs::database::MerkleTreeMode; use zksync_dal::StorageProcessor; use zksync_health_check::{Health, HealthStatus}; use zksync_merkle_tree::{ - domain::{TreeMetadata, ZkSyncTree}, - MerkleTreeColumnFamily, + domain::{TreeMetadata, ZkSyncTree, ZkSyncTreeReader}, + Key, MerkleTreeColumnFamily, NoVersionError, TreeEntryWithProof, }; use zksync_storage::RocksDB; use zksync_types::{block::L1BatchHeader, L1BatchNumber, StorageLog, H256}; use super::metrics::{LoadChangesStage, TreeUpdateStage, METRICS}; -#[derive(Debug, Serialize)] -pub(super) struct TreeHealthCheckDetails { +/// General information about the Merkle tree. +#[derive(Debug, Serialize, Deserialize)] +pub(crate) struct MerkleTreeInfo { pub mode: MerkleTreeMode, - pub next_l1_batch_to_seal: L1BatchNumber, + pub root_hash: H256, + pub next_l1_batch_number: L1BatchNumber, + pub leaf_count: u64, } -impl From for Health { - fn from(details: TreeHealthCheckDetails) -> Self { - Self::from(HealthStatus::Ready).with_details(details) +impl From for Health { + fn from(tree_info: MerkleTreeInfo) -> Self { + Self::from(HealthStatus::Ready).with_details(tree_info) } } @@ -44,12 +46,15 @@ impl From for Health { /// at least not explicitly), all `MetadataCalculator` data including `ZkSyncTree` is discarded. /// In the unlikely case you get a "`ZkSyncTree` is in inconsistent state" panic, /// cancellation is most probably the reason. -#[derive(Debug, Default)] -pub(super) struct AsyncTree(Option); +#[derive(Debug)] +pub(super) struct AsyncTree { + inner: Option, + mode: MerkleTreeMode, +} impl AsyncTree { const INCONSISTENT_MSG: &'static str = - "`ZkSyncTree` is in inconsistent state, which could occur after one of its blocking futures was cancelled"; + "`ZkSyncTree` is in inconsistent state, which could occur after one of its async methods was cancelled"; pub async fn new( db_path: PathBuf, @@ -74,7 +79,10 @@ impl AsyncTree { .unwrap(); tree.set_multi_get_chunk_size(multi_get_chunk_size); - Self(Some(tree)) + Self { + inner: Some(tree), + mode, + } } fn create_db(path: &Path, block_cache_capacity: usize) -> RocksDB { @@ -89,11 +97,22 @@ impl AsyncTree { } fn as_ref(&self) -> &ZkSyncTree { - self.0.as_ref().expect(Self::INCONSISTENT_MSG) + self.inner.as_ref().expect(Self::INCONSISTENT_MSG) } fn as_mut(&mut self) -> &mut ZkSyncTree { - self.0.as_mut().expect(Self::INCONSISTENT_MSG) + self.inner.as_mut().expect(Self::INCONSISTENT_MSG) + } + + pub fn mode(&self) -> MerkleTreeMode { + self.mode + } + + pub fn reader(&self) -> AsyncTreeReader { + AsyncTreeReader { + inner: self.inner.as_ref().expect(Self::INCONSISTENT_MSG).reader(), + mode: self.mode, + } } pub fn is_empty(&self) -> bool { @@ -109,26 +128,28 @@ impl AsyncTree { } pub async fn process_l1_batch(&mut self, storage_logs: Vec) -> TreeMetadata { - let mut tree = mem::take(self); + let mut tree = self.inner.take().expect(Self::INCONSISTENT_MSG); let (tree, metadata) = tokio::task::spawn_blocking(move || { - let metadata = tree.as_mut().process_l1_batch(&storage_logs); + let metadata = tree.process_l1_batch(&storage_logs); (tree, metadata) }) .await .unwrap(); - *self = tree; + self.inner = Some(tree); metadata } pub async fn save(&mut self) { - let mut tree = mem::take(self); - *self = tokio::task::spawn_blocking(|| { - tree.as_mut().save(); - tree - }) - .await - .unwrap(); + let mut tree = self.inner.take().expect(Self::INCONSISTENT_MSG); + self.inner = Some( + tokio::task::spawn_blocking(|| { + tree.save(); + tree + }) + .await + .unwrap(), + ); } pub fn revert_logs(&mut self, last_l1_batch_to_keep: L1BatchNumber) { @@ -136,6 +157,36 @@ impl AsyncTree { } } +/// Async version of [`ZkSyncTreeReader`]. +#[derive(Debug, Clone)] +pub(crate) struct AsyncTreeReader { + inner: ZkSyncTreeReader, + mode: MerkleTreeMode, +} + +impl AsyncTreeReader { + pub async fn info(self) -> MerkleTreeInfo { + tokio::task::spawn_blocking(move || MerkleTreeInfo { + mode: self.mode, + root_hash: self.inner.root_hash(), + next_l1_batch_number: self.inner.next_l1_batch_number(), + leaf_count: self.inner.leaf_count(), + }) + .await + .unwrap() + } + + pub async fn entries_with_proofs( + self, + l1_batch_number: L1BatchNumber, + keys: Vec, + ) -> Result, NoVersionError> { + tokio::task::spawn_blocking(move || self.inner.entries_with_proofs(l1_batch_number, &keys)) + .await + .unwrap() + } +} + /// Component implementing the delay policy in [`MetadataCalculator`] when there are no /// L1 batches to seal. #[derive(Debug, Clone)] diff --git a/core/lib/zksync_core/src/metadata_calculator/mod.rs b/core/lib/zksync_core/src/metadata_calculator/mod.rs index d5bba74ff24..d0359c754f2 100644 --- a/core/lib/zksync_core/src/metadata_calculator/mod.rs +++ b/core/lib/zksync_core/src/metadata_calculator/mod.rs @@ -22,10 +22,10 @@ use zksync_types::{ mod helpers; mod metrics; #[cfg(test)] -mod tests; +pub(crate) mod tests; mod updater; -pub(crate) use self::helpers::L1BatchWithLogs; +pub(crate) use self::helpers::{AsyncTreeReader, L1BatchWithLogs, MerkleTreeInfo}; use self::{ helpers::Delayer, metrics::{TreeUpdateStage, METRICS}, @@ -111,6 +111,7 @@ impl MetadataCalculator { MetadataCalculatorModeConfig::Lightweight => None, }; let updater = TreeUpdater::new(mode, config, object_store).await; + let (_, health_updater) = ReactiveHealthCheck::new("tree"); Self { updater, @@ -124,6 +125,11 @@ impl MetadataCalculator { self.health_updater.subscribe() } + /// Returns a reference to the tree reader. + pub(crate) fn tree_reader(&self) -> AsyncTreeReader { + self.updater.tree().reader() + } + pub async fn run( self, pool: ConnectionPool, diff --git a/core/lib/zksync_core/src/metadata_calculator/tests.rs b/core/lib/zksync_core/src/metadata_calculator/tests.rs index fa8df01d909..b8ebc0e988e 100644 --- a/core/lib/zksync_core/src/metadata_calculator/tests.rs +++ b/core/lib/zksync_core/src/metadata_calculator/tests.rs @@ -196,7 +196,7 @@ async fn running_metadata_calculator_with_additional_blocks( let calculator_handle = tokio::spawn(calculator.run(pool.clone(), prover_pool.clone(), stop_rx)); - // Wait until the calculator has processed initial blocks. + // Wait until the calculator has processed initial L1 batches. let (next_l1_batch, _) = tokio::time::timeout(RUN_TIMEOUT, delay_rx.recv()) .await .expect("metadata calculator timed out processing initial blocks") @@ -356,7 +356,7 @@ async fn postgres_backup_recovery_with_excluded_metadata( test_postgres_backup_recovery(pool, prover_pool, false, true).await; } -async fn setup_calculator( +pub(crate) async fn setup_calculator( db_path: &Path, pool: &ConnectionPool, ) -> (MetadataCalculator, Box) { @@ -426,7 +426,7 @@ fn path_to_string(path: &Path) -> String { path.to_str().unwrap().to_owned() } -async fn run_calculator( +pub(crate) async fn run_calculator( mut calculator: MetadataCalculator, pool: ConnectionPool, prover_pool: ConnectionPool, @@ -451,7 +451,7 @@ async fn run_calculator( delayer_handle.await.unwrap() } -pub(super) async fn reset_db_state(pool: &ConnectionPool, num_batches: usize) { +pub(crate) async fn reset_db_state(pool: &ConnectionPool, num_batches: usize) { let mut storage = pool.access_storage().await.unwrap(); // Drops all L1 batches (except the L1 batch with number 0) and their storage logs. storage @@ -572,7 +572,7 @@ async fn insert_initial_writes_for_batch( .await; } -pub(super) fn gen_storage_logs( +pub(crate) fn gen_storage_logs( indices: ops::Range, num_batches: usize, ) -> Vec> { diff --git a/core/lib/zksync_core/src/metadata_calculator/updater.rs b/core/lib/zksync_core/src/metadata_calculator/updater.rs index c2d8b007659..0298ac7871b 100644 --- a/core/lib/zksync_core/src/metadata_calculator/updater.rs +++ b/core/lib/zksync_core/src/metadata_calculator/updater.rs @@ -1,4 +1,5 @@ //! Tree updater trait and its implementations. + use anyhow::Context as _; use futures::{future, FutureExt}; use tokio::sync::watch; @@ -14,14 +15,13 @@ use zksync_object_store::ObjectStore; use zksync_types::{block::L1BatchHeader, writes::InitialStorageWrite, L1BatchNumber, H256, U256}; use super::{ - helpers::{AsyncTree, Delayer, L1BatchWithLogs, TreeHealthCheckDetails}, + helpers::{AsyncTree, Delayer, L1BatchWithLogs}, metrics::{TreeUpdateStage, METRICS}, MetadataCalculator, MetadataCalculatorConfig, }; #[derive(Debug)] pub(super) struct TreeUpdater { - mode: MerkleTreeMode, tree: AsyncTree, max_l1_batches_per_iter: usize, object_store: Option>, @@ -47,14 +47,12 @@ impl TreeUpdater { ) .await; Self { - mode, tree, max_l1_batches_per_iter: config.max_l1_batches_per_iter, object_store, } } - #[cfg(test)] pub fn tree(&self) -> &AsyncTree { &self.tree } @@ -144,7 +142,11 @@ impl TreeUpdater { check_consistency_latency.observe(); let (events_queue_commitment, bootloader_initial_content_commitment) = - self.calculate_commitments(storage, &header).await; + if self.tree.mode() == MerkleTreeMode::Full { + self.calculate_commitments(storage, &header).await + } else { + (None, None) + }; let build_metadata_latency = METRICS.start_stage(TreeUpdateStage::BuildMetadata); let metadata = MetadataCalculator::build_l1_batch_metadata( @@ -224,40 +226,36 @@ impl TreeUpdater { conn: &mut StorageProcessor<'_>, header: &L1BatchHeader, ) -> (Option, Option) { - if self.mode == MerkleTreeMode::Full { - let events_queue_commitment_latency = - METRICS.start_stage(TreeUpdateStage::EventsCommitment); - let events_queue = conn - .blocks_dal() - .get_events_queue(header.number) - .await - .unwrap() - .unwrap(); - let events_queue_commitment = - events_queue_commitment(&events_queue, header.protocol_version.unwrap()); - events_queue_commitment_latency.observe(); + let events_queue_commitment_latency = + METRICS.start_stage(TreeUpdateStage::EventsCommitment); + let events_queue = conn + .blocks_dal() + .get_events_queue(header.number) + .await + .unwrap() + .unwrap(); + let events_queue_commitment = + events_queue_commitment(&events_queue, header.protocol_version.unwrap()); + events_queue_commitment_latency.observe(); - let bootloader_commitment_latency = - METRICS.start_stage(TreeUpdateStage::BootloaderCommitment); - let initial_bootloader_contents = conn - .blocks_dal() - .get_initial_bootloader_heap(header.number) - .await - .unwrap() - .unwrap(); - let bootloader_initial_content_commitment = bootloader_initial_content_commitment( - &initial_bootloader_contents, - header.protocol_version.unwrap(), - ); - bootloader_commitment_latency.observe(); + let bootloader_commitment_latency = + METRICS.start_stage(TreeUpdateStage::BootloaderCommitment); + let initial_bootloader_contents = conn + .blocks_dal() + .get_initial_bootloader_heap(header.number) + .await + .unwrap() + .unwrap(); + let bootloader_initial_content_commitment = bootloader_initial_content_commitment( + &initial_bootloader_contents, + header.protocol_version.unwrap(), + ); + bootloader_commitment_latency.observe(); - ( - events_queue_commitment, - bootloader_initial_content_commitment, - ) - } else { - (None, None) - } + ( + events_queue_commitment, + bootloader_initial_content_commitment, + ) } async fn step( @@ -334,11 +332,8 @@ impl TreeUpdater { (last_l1_batch_with_metadata.0 + 1).saturating_sub(next_l1_batch_to_seal.0); METRICS.backup_lag.set(backup_lag.into()); - let health = TreeHealthCheckDetails { - mode: self.mode, - next_l1_batch_to_seal, - }; - health_updater.update(health.into()); + let tree_info = tree.reader().info().await; + health_updater.update(tree_info.into()); if next_l1_batch_to_seal > last_l1_batch_with_metadata + 1 { // Check stop signal before proceeding with a potentially time-consuming operation. @@ -357,11 +352,8 @@ impl TreeUpdater { next_l1_batch_to_seal = tree.next_l1_batch_number(); tracing::info!("Truncated Merkle tree to L1 batch #{next_l1_batch_to_seal}"); - let health = TreeHealthCheckDetails { - mode: self.mode, - next_l1_batch_to_seal, - }; - health_updater.update(health.into()); + let tree_info = tree.reader().info().await; + health_updater.update(tree_info.into()); } loop { @@ -388,11 +380,8 @@ impl TreeUpdater { ); delayer.wait(&self.tree).left_future() } else { - let health = TreeHealthCheckDetails { - mode: self.mode, - next_l1_batch_to_seal, - }; - health_updater.update(health.into()); + let tree_info = self.tree.reader().info().await; + health_updater.update(tree_info.into()); tracing::trace!( "Metadata calculator (next L1 batch: #{next_l1_batch_to_seal}) made progress from #{snapshot}" diff --git a/etc/env/base/api.toml b/etc/env/base/api.toml index 3bfe29ab543..186e2cfb2b0 100644 --- a/etc/env/base/api.toml +++ b/etc/env/base/api.toml @@ -59,3 +59,7 @@ push_interval_ms=100 # Configuration for the healtcheck server. [api.healthcheck] port=3071 + +# Configuration for the Merkle tree API server +[api.merkle_tree] +port=3072