From d7b5691e1e0fda879da8f4dff5b691f51c523a12 Mon Sep 17 00:00:00 2001 From: Alex Ostrovski Date: Tue, 13 Aug 2024 20:00:18 +0300 Subject: [PATCH] refactor(state-keeper): Make batch executor and storage factory parametric (#2599) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ## What ❔ Makes `BatchExecutor` and `ReadStorageFactory` parametric by the storage type. Encapsulates this storage type using a private helper trait (essentially an async closure) used by the state keeper. ## Why ❔ Avoids the workarounds previously needed in the mock storage factory (which had to create a temporary RocksDB instance). Potentially extends customizability of batch execution. ## Checklist - [x] PR title corresponds to the body of PR (we generate changelog entries from PRs). - [x] Tests for the changes have been added / updated. - [x] Documentation comments have been added / updated. - [x] Code has been formatted via `zk fmt` and `zk lint`. --- core/lib/state/src/storage_factory.rs | 11 +-- core/node/consensus/src/testonly.rs | 7 +- .../layers/state_keeper/mod.rs | 6 +- core/node/node_sync/src/tests.rs | 4 +- core/node/state_keeper/Cargo.toml | 2 +- .../src/batch_executor/main_executor.rs | 2 +- .../state_keeper/src/batch_executor/mod.rs | 6 +- core/node/state_keeper/src/keeper.rs | 88 +++++++++++++------ core/node/state_keeper/src/testonly/mod.rs | 5 +- .../src/testonly/test_batch_executor.rs | 29 ++---- core/node/state_keeper/src/tests/mod.rs | 2 +- 11 files changed, 94 insertions(+), 68 deletions(-) diff --git a/core/lib/state/src/storage_factory.rs b/core/lib/state/src/storage_factory.rs index d3b978356a50..4792200a4637 100644 --- a/core/lib/state/src/storage_factory.rs +++ b/core/lib/state/src/storage_factory.rs @@ -10,11 +10,12 @@ use zksync_vm_interface::storage::ReadStorage; use crate::{PostgresStorage, RocksdbStorage, RocksdbStorageBuilder, StateKeeperColumnFamily}; -/// Factory that can produce [`OwnedStorage`] instances on demand. +/// Factory that can produce storage instances on demand. 
The storage type is encapsulated as a type param +/// (mostly for testing purposes); the default is [`OwnedStorage`]. #[async_trait] -pub trait ReadStorageFactory: Debug + Send + Sync + 'static { - /// Creates an [`OwnedStorage`] entity over either a Postgres connection or RocksDB - /// instance. The specific criteria on which one are left up to the implementation. +pub trait ReadStorageFactory: Debug + Send + Sync + 'static { + /// Creates a storage instance, e.g. over a Postgres connection or a RocksDB instance. + /// The specific criteria on which one are left up to the implementation. /// /// Implementations may be cancel-aware and return `Ok(None)` iff `stop_receiver` receives /// a stop signal; this is the only case in which `Ok(None)` should be returned. @@ -22,7 +23,7 @@ pub trait ReadStorageFactory: Debug + Send + Sync + 'static { &self, stop_receiver: &watch::Receiver, l1_batch_number: L1BatchNumber, - ) -> anyhow::Result>; + ) -> anyhow::Result>; } /// [`ReadStorageFactory`] producing Postgres-backed storage instances. 
Hence, it is slower than more advanced diff --git a/core/node/consensus/src/testonly.rs b/core/node/consensus/src/testonly.rs index 2c6fdc79a521..9cf06b992e87 100644 --- a/core/node/consensus/src/testonly.rs +++ b/core/node/consensus/src/testonly.rs @@ -32,7 +32,10 @@ use zksync_node_test_utils::{create_l1_batch_metadata, l1_batch_metadata_to_comm use zksync_state_keeper::{ io::{IoCursor, L1BatchParams, L2BlockParams}, seal_criteria::NoopSealer, - testonly::{fund, l1_transaction, l2_transaction, MockBatchExecutor}, + testonly::{ + fund, l1_transaction, l2_transaction, test_batch_executor::MockReadStorageFactory, + MockBatchExecutor, + }, AsyncRocksdbCache, MainBatchExecutor, OutputHandler, StateKeeperPersistence, TreeWritesPersistence, ZkSyncStateKeeper, }; @@ -631,7 +634,7 @@ impl StateKeeperRunner { .with_handler(Box::new(tree_writes_persistence)) .with_handler(Box::new(self.sync_state.clone())), Arc::new(NoopSealer), - Arc::new(self.pool.0.clone()), + Arc::new(MockReadStorageFactory), ) .run() .await diff --git a/core/node/node_framework/src/implementations/layers/state_keeper/mod.rs b/core/node/node_framework/src/implementations/layers/state_keeper/mod.rs index b0dfe0f1600c..a77344f3706e 100644 --- a/core/node/node_framework/src/implementations/layers/state_keeper/mod.rs +++ b/core/node/node_framework/src/implementations/layers/state_keeper/mod.rs @@ -102,7 +102,7 @@ impl WiringLayer for StateKeeperLayer { let state_keeper = StateKeeperTask { io, - batch_executor_base, + batch_executor: batch_executor_base, output_handler, sealer, storage_factory: Arc::new(storage_factory), @@ -125,7 +125,7 @@ impl WiringLayer for StateKeeperLayer { #[derive(Debug)] pub struct StateKeeperTask { io: Box, - batch_executor_base: Box, + batch_executor: Box, output_handler: OutputHandler, sealer: Arc, storage_factory: Arc, @@ -141,7 +141,7 @@ impl Task for StateKeeperTask { let state_keeper = ZkSyncStateKeeper::new( stop_receiver.0, self.io, - self.batch_executor_base, + 
self.batch_executor, self.output_handler, self.sealer, self.storage_factory, diff --git a/core/node/node_sync/src/tests.rs b/core/node/node_sync/src/tests.rs index e091472ad512..edd8306e72e0 100644 --- a/core/node/node_sync/src/tests.rs +++ b/core/node/node_sync/src/tests.rs @@ -13,7 +13,7 @@ use zksync_node_test_utils::{ use zksync_state_keeper::{ io::{L1BatchParams, L2BlockParams}, seal_criteria::NoopSealer, - testonly::test_batch_executor::TestBatchExecutorBuilder, + testonly::test_batch_executor::{MockReadStorageFactory, TestBatchExecutorBuilder}, OutputHandler, StateKeeperPersistence, TreeWritesPersistence, ZkSyncStateKeeper, }; use zksync_types::{ @@ -132,7 +132,7 @@ impl StateKeeperHandles { Box::new(batch_executor_base), output_handler, Arc::new(NoopSealer), - Arc::new(pool), + Arc::new(MockReadStorageFactory), ); Self { diff --git a/core/node/state_keeper/Cargo.toml b/core/node/state_keeper/Cargo.toml index 890543bcd910..d1cd88ee277a 100644 --- a/core/node/state_keeper/Cargo.toml +++ b/core/node/state_keeper/Cargo.toml @@ -33,7 +33,6 @@ zksync_base_token_adjuster.workspace = true anyhow.workspace = true async-trait.workspace = true -tempfile.workspace = true # used in `testonly` module tokio = { workspace = true, features = ["time"] } thiserror.workspace = true tracing.workspace = true @@ -44,6 +43,7 @@ hex.workspace = true [dev-dependencies] assert_matches.workspace = true +tempfile.workspace = true test-casing.workspace = true futures.workspace = true diff --git a/core/node/state_keeper/src/batch_executor/main_executor.rs b/core/node/state_keeper/src/batch_executor/main_executor.rs index cc05da9235b5..5335b960dce5 100644 --- a/core/node/state_keeper/src/batch_executor/main_executor.rs +++ b/core/node/state_keeper/src/batch_executor/main_executor.rs @@ -58,7 +58,7 @@ impl MainBatchExecutor { } } -impl BatchExecutor for MainBatchExecutor { +impl BatchExecutor for MainBatchExecutor { fn init_batch( &mut self, storage: OwnedStorage, diff --git 
a/core/node/state_keeper/src/batch_executor/mod.rs b/core/node/state_keeper/src/batch_executor/mod.rs index b6f57694afa0..f5b66fc24682 100644 --- a/core/node/state_keeper/src/batch_executor/mod.rs +++ b/core/node/state_keeper/src/batch_executor/mod.rs @@ -55,10 +55,12 @@ impl TxExecutionResult { /// An abstraction that allows us to create different kinds of batch executors. /// The only requirement is to return a [`BatchExecutorHandle`], which does its work /// by communicating with the externally initialized thread. -pub trait BatchExecutor: 'static + Send + Sync + fmt::Debug { +/// +/// This type is generic over the storage type accepted to create the VM instance, mostly for testing purposes. +pub trait BatchExecutor: 'static + Send + Sync + fmt::Debug { fn init_batch( &mut self, - storage: OwnedStorage, + storage: S, l1_batch_params: L1BatchEnv, system_env: SystemEnv, ) -> BatchExecutorHandle; diff --git a/core/node/state_keeper/src/keeper.rs b/core/node/state_keeper/src/keeper.rs index 934ed9493f86..2871d474e4f6 100644 --- a/core/node/state_keeper/src/keeper.rs +++ b/core/node/state_keeper/src/keeper.rs @@ -1,10 +1,12 @@ use std::{ convert::Infallible, + fmt, sync::Arc, time::{Duration, Instant}, }; use anyhow::Context as _; +use async_trait::async_trait; use tokio::sync::watch; use tracing::{info_span, Instrument}; use zksync_multivm::interface::{Halt, L1BatchEnv, SystemEnv}; @@ -48,6 +50,45 @@ impl Error { } } +/// Functionality [`BatchExecutor`] + [`ReadStorageFactory`] with an erased storage type. This allows to keep +/// [`ZkSyncStateKeeper`] not parameterized by the storage type, simplifying its dependency injection and usage in tests. +#[async_trait] +trait ErasedBatchExecutor: fmt::Debug + Send { + async fn init_batch( + &mut self, + l1_batch_env: L1BatchEnv, + system_env: SystemEnv, + stop_receiver: &watch::Receiver, + ) -> Result; +} + +/// The only [`ErasedBatchExecutor`] implementation. 
+#[derive(Debug)] +struct ErasedBatchExecutorImpl { + batch_executor: Box>, + storage_factory: Arc>, +} + +#[async_trait] +impl ErasedBatchExecutor for ErasedBatchExecutorImpl { + async fn init_batch( + &mut self, + l1_batch_env: L1BatchEnv, + system_env: SystemEnv, + stop_receiver: &watch::Receiver, + ) -> Result { + let storage = self + .storage_factory + .access_storage(stop_receiver, l1_batch_env.number - 1) + .await + .context("failed creating VM storage")? + .ok_or(Error::Canceled)?; + Ok(self + .batch_executor + .init_batch(storage, l1_batch_env, system_env)) + } +} + /// State keeper represents a logic layer of L1 batch / L2 block processing flow. /// It's responsible for taking all the data from the `StateKeeperIO`, feeding it into `BatchExecutor` objects /// and calling `SealManager` to decide whether an L2 block or L1 batch should be sealed. @@ -62,27 +103,28 @@ pub struct ZkSyncStateKeeper { stop_receiver: watch::Receiver, io: Box, output_handler: OutputHandler, - batch_executor_base: Box, + batch_executor: Box, sealer: Arc, - storage_factory: Arc, } impl ZkSyncStateKeeper { - pub fn new( + pub fn new( stop_receiver: watch::Receiver, sequencer: Box, - batch_executor_base: Box, + batch_executor: Box>, output_handler: OutputHandler, sealer: Arc, - storage_factory: Arc, + storage_factory: Arc>, ) -> Self { Self { stop_receiver, io: sequencer, - batch_executor_base, + batch_executor: Box::new(ErasedBatchExecutorImpl { + batch_executor, + storage_factory, + }), output_handler, sealer, - storage_factory, } } @@ -146,7 +188,12 @@ impl ZkSyncStateKeeper { .await?; let mut batch_executor = self - .create_batch_executor(l1_batch_env.clone(), system_env.clone()) + .batch_executor + .init_batch( + l1_batch_env.clone(), + system_env.clone(), + &self.stop_receiver, + ) .await?; self.restore_state(&mut batch_executor, &mut updates_manager, pending_l2_blocks) .await?; @@ -195,7 +242,12 @@ impl ZkSyncStateKeeper { (system_env, l1_batch_env) = 
self.wait_for_new_batch_env(&next_cursor).await?; updates_manager = UpdatesManager::new(&l1_batch_env, &system_env); batch_executor = self - .create_batch_executor(l1_batch_env.clone(), system_env.clone()) + .batch_executor + .init_batch( + l1_batch_env.clone(), + system_env.clone(), + &self.stop_receiver, + ) .await?; let version_changed = system_env.version != sealed_batch_protocol_version; @@ -208,24 +260,6 @@ impl ZkSyncStateKeeper { Err(Error::Canceled) } - async fn create_batch_executor( - &mut self, - l1_batch_env: L1BatchEnv, - system_env: SystemEnv, - ) -> Result { - let Some(storage) = self - .storage_factory - .access_storage(&self.stop_receiver, l1_batch_env.number - 1) - .await - .context("failed creating VM storage")? - else { - return Err(Error::Canceled); - }; - Ok(self - .batch_executor_base - .init_batch(storage, l1_batch_env, system_env)) - } - /// This function is meant to be called only once during the state-keeper initialization. /// It will check if we should load a protocol upgrade or a `setChainId` transaction, /// perform some checks and return it. 
diff --git a/core/node/state_keeper/src/testonly/mod.rs b/core/node/state_keeper/src/testonly/mod.rs index 02b0043b97cf..d17261a3a0f7 100644 --- a/core/node/state_keeper/src/testonly/mod.rs +++ b/core/node/state_keeper/src/testonly/mod.rs @@ -9,7 +9,6 @@ use zksync_multivm::interface::{ storage::StorageViewCache, CurrentExecutionState, ExecutionResult, FinishedL1Batch, L1BatchEnv, Refunds, SystemEnv, VmExecutionLogs, VmExecutionResultAndLogs, VmExecutionStatistics, }; -use zksync_state::OwnedStorage; use zksync_test_account::Account; use zksync_types::{ fee::Fee, utils::storage_key_for_standard_token_balance, AccountTreeId, Address, Execute, @@ -78,10 +77,10 @@ pub(crate) fn storage_view_cache() -> StorageViewCache { #[derive(Debug)] pub struct MockBatchExecutor; -impl BatchExecutor for MockBatchExecutor { +impl BatchExecutor<()> for MockBatchExecutor { fn init_batch( &mut self, - _storage: OwnedStorage, + _storage: (), _l1batch_params: L1BatchEnv, _system_env: SystemEnv, ) -> BatchExecutorHandle { diff --git a/core/node/state_keeper/src/testonly/test_batch_executor.rs b/core/node/state_keeper/src/testonly/test_batch_executor.rs index aefc8d50bc7d..d8ee36990a1c 100644 --- a/core/node/state_keeper/src/testonly/test_batch_executor.rs +++ b/core/node/state_keeper/src/testonly/test_batch_executor.rs @@ -20,7 +20,7 @@ use zksync_multivm::{ vm_latest::constants::BATCH_COMPUTATIONAL_GAS_LIMIT, }; use zksync_node_test_utils::create_l2_transaction; -use zksync_state::{OwnedStorage, PgOrRocksdbStorage, ReadStorageFactory, RocksdbStorage}; +use zksync_state::ReadStorageFactory; use zksync_types::{ fee_model::BatchFeeInput, protocol_upgrade::ProtocolUpgradeTx, Address, L1BatchNumber, L2BlockNumber, L2ChainId, ProtocolVersionId, Transaction, H256, @@ -207,7 +207,7 @@ impl TestScenario { Box::new(batch_executor_base), output_handler, Arc::new(sealer), - Arc::::default(), + Arc::new(MockReadStorageFactory), ); let sk_thread = tokio::spawn(state_keeper.run()); @@ -410,10 +410,10 
@@ impl TestBatchExecutorBuilder { } } -impl BatchExecutor for TestBatchExecutorBuilder { +impl BatchExecutor<()> for TestBatchExecutorBuilder { fn init_batch( &mut self, - _storage: OwnedStorage, + _storage: (), _l1_batch_params: L1BatchEnv, _system_env: SystemEnv, ) -> BatchExecutorHandle { @@ -806,28 +806,15 @@ impl StateKeeperIO for TestIO { /// Storage factory that produces empty VM storage for any batch. Should only be used with a mock batch executor /// that doesn't read from the storage. Prefer using `ConnectionPool` as a factory if it's available. #[derive(Debug)] -pub struct MockReadStorageFactory(tempfile::TempDir); - -impl Default for MockReadStorageFactory { - fn default() -> Self { - Self( - tempfile::TempDir::new() - .expect("failed creating temporary directory for `MockReadStorageFactory`"), - ) - } -} +pub struct MockReadStorageFactory; #[async_trait] -impl ReadStorageFactory for MockReadStorageFactory { +impl ReadStorageFactory<()> for MockReadStorageFactory { async fn access_storage( &self, _stop_receiver: &watch::Receiver, _l1_batch_number: L1BatchNumber, - ) -> anyhow::Result> { - let storage = RocksdbStorage::builder(self.0.path()) - .await - .expect("Cannot create mock RocksDB storage") - .build_unchecked(); - Ok(Some(PgOrRocksdbStorage::Rocksdb(storage).into())) + ) -> anyhow::Result> { + Ok(Some(())) } } diff --git a/core/node/state_keeper/src/tests/mod.rs b/core/node/state_keeper/src/tests/mod.rs index a5239f444832..eaab9dd193dc 100644 --- a/core/node/state_keeper/src/tests/mod.rs +++ b/core/node/state_keeper/src/tests/mod.rs @@ -438,7 +438,7 @@ async fn load_upgrade_tx() { Box::new(batch_executor_base), output_handler, Arc::new(sealer), - Arc::::default(), + Arc::new(MockReadStorageFactory), ); // Since the version hasn't changed, and we are not using shared bridge, we should not load any