refactor(state-keeper): Make batch executor and storage factory parametric (matter-labs#2599)

## What ❔

Makes `BatchExecutor` and `ReadStorageFactory` generic over the storage
type. The state keeper encapsulates this storage type behind a private
helper trait (essentially an async closure), so the state keeper itself
stays non-generic.
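
In sketch form (heavily simplified; the real traits also thread `L1BatchEnv` / `SystemEnv` and a stop receiver through and default `S` to `OwnedStorage`, as the diff below shows):

```rust
use std::{fmt, sync::Arc};

use async_trait::async_trait;

// Stand-in for the real `BatchExecutorHandle`.
#[derive(Debug)]
struct BatchExecutorHandle;

// Both traits gain a storage type parameter.
trait BatchExecutor<S>: fmt::Debug + Send + Sync + 'static {
    fn init_batch(&mut self, storage: S) -> BatchExecutorHandle;
}

#[async_trait]
trait ReadStorageFactory<S>: fmt::Debug + Send + Sync + 'static {
    /// `Ok(None)` signals cancellation, mirroring the real trait.
    async fn access_storage(&self) -> anyhow::Result<Option<S>>;
}

/// Private helper trait used by the state keeper: the storage type is erased here,
/// so `ZkSyncStateKeeper` itself does not need a type parameter.
#[async_trait]
trait ErasedBatchExecutor: fmt::Debug + Send {
    async fn init_batch(&mut self) -> anyhow::Result<BatchExecutorHandle>;
}

#[derive(Debug)]
struct ErasedBatchExecutorImpl<S> {
    batch_executor: Box<dyn BatchExecutor<S>>,
    storage_factory: Arc<dyn ReadStorageFactory<S>>,
}

#[async_trait]
impl<S: fmt::Debug + 'static> ErasedBatchExecutor for ErasedBatchExecutorImpl<S> {
    async fn init_batch(&mut self) -> anyhow::Result<BatchExecutorHandle> {
        // Obtain storage from the factory, then hand it to the executor; the caller
        // never sees the concrete storage type `S`.
        let storage = self
            .storage_factory
            .access_storage()
            .await?
            .ok_or_else(|| anyhow::anyhow!("canceled"))?;
        Ok(self.batch_executor.init_batch(storage))
    }
}
```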

## Why ❔

Avoids workarounds in the mock storage factory (it previously had to build
a real RocksDB instance in a temporary directory). Potentially extends the
customizability of batch execution.

## Checklist

- [x] PR title corresponds to the body of PR (we generate changelog
entries from PRs).
- [x] Tests for the changes have been added / updated.
- [x] Documentation comments have been added / updated.
- [x] Code has been formatted via `zk fmt` and `zk lint`.
slowli authored Aug 13, 2024
1 parent 3d02946 commit d7b5691
Showing 11 changed files with 94 additions and 68 deletions.
11 changes: 6 additions & 5 deletions core/lib/state/src/storage_factory.rs
@@ -10,19 +10,20 @@ use zksync_vm_interface::storage::ReadStorage;

use crate::{PostgresStorage, RocksdbStorage, RocksdbStorageBuilder, StateKeeperColumnFamily};

/// Factory that can produce [`OwnedStorage`] instances on demand.
/// Factory that can produce storage instances on demand. The storage type is encapsulated as a type param
/// (mostly for testing purposes); the default is [`OwnedStorage`].
#[async_trait]
pub trait ReadStorageFactory: Debug + Send + Sync + 'static {
/// Creates an [`OwnedStorage`] entity over either a Postgres connection or RocksDB
/// instance. The specific criteria on which one are left up to the implementation.
pub trait ReadStorageFactory<S = OwnedStorage>: Debug + Send + Sync + 'static {
/// Creates a storage instance, e.g. over a Postgres connection or a RocksDB instance.
/// The specific criteria on which one are left up to the implementation.
///
/// Implementations may be cancel-aware and return `Ok(None)` iff `stop_receiver` receives
/// a stop signal; this is the only case in which `Ok(None)` should be returned.
async fn access_storage(
&self,
stop_receiver: &watch::Receiver<bool>,
l1_batch_number: L1BatchNumber,
) -> anyhow::Result<Option<OwnedStorage>>;
) -> anyhow::Result<Option<S>>;
}

/// [`ReadStorageFactory`] producing Postgres-backed storage instances. Hence, it is slower than more advanced
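Since `S` defaults to `OwnedStorage`, existing implementations such as the Postgres-backed factory above keep working unchanged, while a test can pick a trivial storage type instead. A sketch along the lines of the `MockReadStorageFactory` introduced later in this diff (the struct name here is illustrative):

```rust
use async_trait::async_trait;
use tokio::sync::watch;
use zksync_state::ReadStorageFactory;
use zksync_types::L1BatchNumber;

/// Factory producing unit "storage"; only useful together with a batch executor
/// that never reads from storage.
#[derive(Debug)]
struct NoopStorageFactory;

#[async_trait]
impl ReadStorageFactory<()> for NoopStorageFactory {
    async fn access_storage(
        &self,
        _stop_receiver: &watch::Receiver<bool>,
        _l1_batch_number: L1BatchNumber,
    ) -> anyhow::Result<Option<()>> {
        // Always succeeds and never observes the stop signal.
        Ok(Some(()))
    }
}
```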
7 changes: 5 additions & 2 deletions core/node/consensus/src/testonly.rs
@@ -32,7 +32,10 @@ use zksync_node_test_utils::{create_l1_batch_metadata, l1_batch_metadata_to_comm
use zksync_state_keeper::{
io::{IoCursor, L1BatchParams, L2BlockParams},
seal_criteria::NoopSealer,
testonly::{fund, l1_transaction, l2_transaction, MockBatchExecutor},
testonly::{
fund, l1_transaction, l2_transaction, test_batch_executor::MockReadStorageFactory,
MockBatchExecutor,
},
AsyncRocksdbCache, MainBatchExecutor, OutputHandler, StateKeeperPersistence,
TreeWritesPersistence, ZkSyncStateKeeper,
};
@@ -631,7 +634,7 @@ impl StateKeeperRunner {
.with_handler(Box::new(tree_writes_persistence))
.with_handler(Box::new(self.sync_state.clone())),
Arc::new(NoopSealer),
Arc::new(self.pool.0.clone()),
Arc::new(MockReadStorageFactory),
)
.run()
.await
@@ -102,7 +102,7 @@ impl WiringLayer for StateKeeperLayer

let state_keeper = StateKeeperTask {
io,
batch_executor_base,
batch_executor: batch_executor_base,
output_handler,
sealer,
storage_factory: Arc::new(storage_factory),
@@ -125,7 +125,7 @@ impl WiringLayer for StateKeeperLayer {
#[derive(Debug)]
pub struct StateKeeperTask {
io: Box<dyn StateKeeperIO>,
batch_executor_base: Box<dyn BatchExecutor>,
batch_executor: Box<dyn BatchExecutor>,
output_handler: OutputHandler,
sealer: Arc<dyn ConditionalSealer>,
storage_factory: Arc<dyn ReadStorageFactory>,
@@ -141,7 +141,7 @@ impl Task for StateKeeperTask {
let state_keeper = ZkSyncStateKeeper::new(
stop_receiver.0,
self.io,
self.batch_executor_base,
self.batch_executor,
self.output_handler,
self.sealer,
self.storage_factory,
4 changes: 2 additions & 2 deletions core/node/node_sync/src/tests.rs
@@ -13,7 +13,7 @@ use zksync_node_test_utils::{
use zksync_state_keeper::{
io::{L1BatchParams, L2BlockParams},
seal_criteria::NoopSealer,
testonly::test_batch_executor::TestBatchExecutorBuilder,
testonly::test_batch_executor::{MockReadStorageFactory, TestBatchExecutorBuilder},
OutputHandler, StateKeeperPersistence, TreeWritesPersistence, ZkSyncStateKeeper,
};
use zksync_types::{
@@ -132,7 +132,7 @@ impl StateKeeperHandles {
Box::new(batch_executor_base),
output_handler,
Arc::new(NoopSealer),
Arc::new(pool),
Arc::new(MockReadStorageFactory),
);

Self {
2 changes: 1 addition & 1 deletion core/node/state_keeper/Cargo.toml
@@ -33,7 +33,6 @@ zksync_base_token_adjuster.workspace = true

anyhow.workspace = true
async-trait.workspace = true
tempfile.workspace = true # used in `testonly` module
tokio = { workspace = true, features = ["time"] }
thiserror.workspace = true
tracing.workspace = true
@@ -44,6 +43,7 @@ hex.workspace = true

[dev-dependencies]
assert_matches.workspace = true
tempfile.workspace = true
test-casing.workspace = true
futures.workspace = true

2 changes: 1 addition & 1 deletion core/node/state_keeper/src/batch_executor/main_executor.rs
@@ -58,7 +58,7 @@ impl MainBatchExecutor {
}
}

impl BatchExecutor for MainBatchExecutor {
impl BatchExecutor<OwnedStorage> for MainBatchExecutor {
fn init_batch(
&mut self,
storage: OwnedStorage,
6 changes: 4 additions & 2 deletions core/node/state_keeper/src/batch_executor/mod.rs
@@ -55,10 +55,12 @@ impl TxExecutionResult {
/// An abstraction that allows us to create different kinds of batch executors.
/// The only requirement is to return a [`BatchExecutorHandle`], which does its work
/// by communicating with the externally initialized thread.
pub trait BatchExecutor: 'static + Send + Sync + fmt::Debug {
///
/// This type is generic over the storage type accepted to create the VM instance, mostly for testing purposes.
pub trait BatchExecutor<S = OwnedStorage>: 'static + Send + Sync + fmt::Debug {
fn init_batch(
&mut self,
storage: OwnedStorage,
storage: S,
l1_batch_params: L1BatchEnv,
system_env: SystemEnv,
) -> BatchExecutorHandle;
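Because the parameter defaults to `OwnedStorage`, `BatchExecutor` and `BatchExecutor<OwnedStorage>` name the same trait, so existing trait objects such as `Box<dyn BatchExecutor>` compile unchanged. A minimal illustration, assuming the trait above is in scope:

```rust
// `BatchExecutor` is the trait defined above; `OwnedStorage` comes from `zksync_state`.
use zksync_state::OwnedStorage;

// With the default parameter, `dyn BatchExecutor` is shorthand for
// `dyn BatchExecutor<OwnedStorage>`, so this conversion is an identity.
fn as_default_storage_executor(
    executor: Box<dyn BatchExecutor>,
) -> Box<dyn BatchExecutor<OwnedStorage>> {
    executor
}
```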
88 changes: 61 additions & 27 deletions core/node/state_keeper/src/keeper.rs
@@ -1,10 +1,12 @@
use std::{
convert::Infallible,
fmt,
sync::Arc,
time::{Duration, Instant},
};

use anyhow::Context as _;
use async_trait::async_trait;
use tokio::sync::watch;
use tracing::{info_span, Instrument};
use zksync_multivm::interface::{Halt, L1BatchEnv, SystemEnv};
@@ -48,6 +50,45 @@ impl Error {
}
}

/// Functionality [`BatchExecutor`] + [`ReadStorageFactory`] with an erased storage type. This allows to keep
/// [`ZkSyncStateKeeper`] not parameterized by the storage type, simplifying its dependency injection and usage in tests.
#[async_trait]
trait ErasedBatchExecutor: fmt::Debug + Send {
async fn init_batch(
&mut self,
l1_batch_env: L1BatchEnv,
system_env: SystemEnv,
stop_receiver: &watch::Receiver<bool>,
) -> Result<BatchExecutorHandle, Error>;
}

/// The only [`ErasedBatchExecutor`] implementation.
#[derive(Debug)]
struct ErasedBatchExecutorImpl<S> {
batch_executor: Box<dyn BatchExecutor<S>>,
storage_factory: Arc<dyn ReadStorageFactory<S>>,
}

#[async_trait]
impl<S: 'static + fmt::Debug> ErasedBatchExecutor for ErasedBatchExecutorImpl<S> {
async fn init_batch(
&mut self,
l1_batch_env: L1BatchEnv,
system_env: SystemEnv,
stop_receiver: &watch::Receiver<bool>,
) -> Result<BatchExecutorHandle, Error> {
let storage = self
.storage_factory
.access_storage(stop_receiver, l1_batch_env.number - 1)
.await
.context("failed creating VM storage")?
.ok_or(Error::Canceled)?;
Ok(self
.batch_executor
.init_batch(storage, l1_batch_env, system_env))
}
}

/// State keeper represents a logic layer of L1 batch / L2 block processing flow.
/// It's responsible for taking all the data from the `StateKeeperIO`, feeding it into `BatchExecutor` objects
/// and calling `SealManager` to decide whether an L2 block or L1 batch should be sealed.
@@ -62,27 +103,28 @@ pub struct ZkSyncStateKeeper {
stop_receiver: watch::Receiver<bool>,
io: Box<dyn StateKeeperIO>,
output_handler: OutputHandler,
batch_executor_base: Box<dyn BatchExecutor>,
batch_executor: Box<dyn ErasedBatchExecutor>,
sealer: Arc<dyn ConditionalSealer>,
storage_factory: Arc<dyn ReadStorageFactory>,
}

impl ZkSyncStateKeeper {
pub fn new(
pub fn new<S: 'static + fmt::Debug>(
stop_receiver: watch::Receiver<bool>,
sequencer: Box<dyn StateKeeperIO>,
batch_executor_base: Box<dyn BatchExecutor>,
batch_executor: Box<dyn BatchExecutor<S>>,
output_handler: OutputHandler,
sealer: Arc<dyn ConditionalSealer>,
storage_factory: Arc<dyn ReadStorageFactory>,
storage_factory: Arc<dyn ReadStorageFactory<S>>,
) -> Self {
Self {
stop_receiver,
io: sequencer,
batch_executor_base,
batch_executor: Box::new(ErasedBatchExecutorImpl {
batch_executor,
storage_factory,
}),
output_handler,
sealer,
storage_factory,
}
}

@@ -146,7 +188,12 @@ impl ZkSyncStateKeeper {
.await?;

let mut batch_executor = self
.create_batch_executor(l1_batch_env.clone(), system_env.clone())
.batch_executor
.init_batch(
l1_batch_env.clone(),
system_env.clone(),
&self.stop_receiver,
)
.await?;
self.restore_state(&mut batch_executor, &mut updates_manager, pending_l2_blocks)
.await?;
@@ -195,7 +242,12 @@ impl ZkSyncStateKeeper {
(system_env, l1_batch_env) = self.wait_for_new_batch_env(&next_cursor).await?;
updates_manager = UpdatesManager::new(&l1_batch_env, &system_env);
batch_executor = self
.create_batch_executor(l1_batch_env.clone(), system_env.clone())
.batch_executor
.init_batch(
l1_batch_env.clone(),
system_env.clone(),
&self.stop_receiver,
)
.await?;

let version_changed = system_env.version != sealed_batch_protocol_version;
@@ -208,24 +260,6 @@ impl ZkSyncStateKeeper {
Err(Error::Canceled)
}

async fn create_batch_executor(
&mut self,
l1_batch_env: L1BatchEnv,
system_env: SystemEnv,
) -> Result<BatchExecutorHandle, Error> {
let Some(storage) = self
.storage_factory
.access_storage(&self.stop_receiver, l1_batch_env.number - 1)
.await
.context("failed creating VM storage")?
else {
return Err(Error::Canceled);
};
Ok(self
.batch_executor_base
.init_batch(storage, l1_batch_env, system_env))
}

/// This function is meant to be called only once during the state-keeper initialization.
/// It will check if we should load a protocol upgrade or a `setChainId` transaction,
/// perform some checks and return it.
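Since the erasure happens inside `new`, callers select the storage type simply through the executor/factory pair they pass, and the constructed keeper is not generic. A hedged usage sketch for the test configuration used in this PR (the function is illustrative; import paths follow the test modules changed here, though the exact re-export of `StateKeeperIO` may differ):

```rust
use std::sync::Arc;

use tokio::sync::watch;
use zksync_state_keeper::{
    io::StateKeeperIO,
    seal_criteria::NoopSealer,
    testonly::{test_batch_executor::MockReadStorageFactory, MockBatchExecutor},
    OutputHandler, ZkSyncStateKeeper,
};

/// Builds a state keeper over unit storage (`S = ()`). The storage type is inferred from
/// the `MockBatchExecutor` / `MockReadStorageFactory` pair and erased inside `new`, so the
/// returned keeper itself is not generic.
fn mock_state_keeper(
    stop_receiver: watch::Receiver<bool>,
    io: Box<dyn StateKeeperIO>,
    output_handler: OutputHandler,
) -> ZkSyncStateKeeper {
    ZkSyncStateKeeper::new(
        stop_receiver,
        io,
        Box::new(MockBatchExecutor),
        output_handler,
        Arc::new(NoopSealer),
        Arc::new(MockReadStorageFactory),
    )
}
```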
5 changes: 2 additions & 3 deletions core/node/state_keeper/src/testonly/mod.rs
@@ -9,7 +9,6 @@ use zksync_multivm::interface::{
storage::StorageViewCache, CurrentExecutionState, ExecutionResult, FinishedL1Batch, L1BatchEnv,
Refunds, SystemEnv, VmExecutionLogs, VmExecutionResultAndLogs, VmExecutionStatistics,
};
use zksync_state::OwnedStorage;
use zksync_test_account::Account;
use zksync_types::{
fee::Fee, utils::storage_key_for_standard_token_balance, AccountTreeId, Address, Execute,
@@ -78,10 +77,10 @@ pub(crate) fn storage_view_cache() -> StorageViewCache {
#[derive(Debug)]
pub struct MockBatchExecutor;

impl BatchExecutor for MockBatchExecutor {
impl BatchExecutor<()> for MockBatchExecutor {
fn init_batch(
&mut self,
_storage: OwnedStorage,
_storage: (),
_l1batch_params: L1BatchEnv,
_system_env: SystemEnv,
) -> BatchExecutorHandle {
29 changes: 8 additions & 21 deletions core/node/state_keeper/src/testonly/test_batch_executor.rs
@@ -20,7 +20,7 @@ use zksync_multivm::{
vm_latest::constants::BATCH_COMPUTATIONAL_GAS_LIMIT,
};
use zksync_node_test_utils::create_l2_transaction;
use zksync_state::{OwnedStorage, PgOrRocksdbStorage, ReadStorageFactory, RocksdbStorage};
use zksync_state::ReadStorageFactory;
use zksync_types::{
fee_model::BatchFeeInput, protocol_upgrade::ProtocolUpgradeTx, Address, L1BatchNumber,
L2BlockNumber, L2ChainId, ProtocolVersionId, Transaction, H256,
@@ -207,7 +207,7 @@ impl TestScenario {
Box::new(batch_executor_base),
output_handler,
Arc::new(sealer),
Arc::<MockReadStorageFactory>::default(),
Arc::new(MockReadStorageFactory),
);
let sk_thread = tokio::spawn(state_keeper.run());

@@ -410,10 +410,10 @@ impl TestBatchExecutorBuilder {
}
}

impl BatchExecutor for TestBatchExecutorBuilder {
impl BatchExecutor<()> for TestBatchExecutorBuilder {
fn init_batch(
&mut self,
_storage: OwnedStorage,
_storage: (),
_l1_batch_params: L1BatchEnv,
_system_env: SystemEnv,
) -> BatchExecutorHandle {
@@ -806,28 +806,15 @@ impl StateKeeperIO for TestIO {
/// Storage factory that produces empty VM storage for any batch. Should only be used with a mock batch executor
/// that doesn't read from the storage. Prefer using `ConnectionPool` as a factory if it's available.
#[derive(Debug)]
pub struct MockReadStorageFactory(tempfile::TempDir);

impl Default for MockReadStorageFactory {
fn default() -> Self {
Self(
tempfile::TempDir::new()
.expect("failed creating temporary directory for `MockReadStorageFactory`"),
)
}
}
pub struct MockReadStorageFactory;

#[async_trait]
impl ReadStorageFactory for MockReadStorageFactory {
impl ReadStorageFactory<()> for MockReadStorageFactory {
async fn access_storage(
&self,
_stop_receiver: &watch::Receiver<bool>,
_l1_batch_number: L1BatchNumber,
) -> anyhow::Result<Option<OwnedStorage>> {
let storage = RocksdbStorage::builder(self.0.path())
.await
.expect("Cannot create mock RocksDB storage")
.build_unchecked();
Ok(Some(PgOrRocksdbStorage::Rocksdb(storage).into()))
) -> anyhow::Result<Option<()>> {
Ok(Some(()))
}
}
2 changes: 1 addition & 1 deletion core/node/state_keeper/src/tests/mod.rs
@@ -438,7 +438,7 @@ async fn load_upgrade_tx() {
Box::new(batch_executor_base),
output_handler,
Arc::new(sealer),
Arc::<MockReadStorageFactory>::default(),
Arc::new(MockReadStorageFactory),
);

// Since the version hasn't changed, and we are not using shared bridge, we should not load any