From 86e189ca2f4be2a3c431694a8938af073b81b298 Mon Sep 17 00:00:00 2001 From: Alex Ostrovski Date: Thu, 8 Feb 2024 15:18:48 +0200 Subject: [PATCH] feat(api): Start API server after first L1 batch (#1026) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ## What ❔ - Delays the API server start until there's at least one L1 batch present in the node DB. - Refactors the VM sandbox so that it's more modular / maintainable. - Tests VM-related API server methods. ## Why ❔ Fixes API server operation when working on the DB after snapshot recovery. ## Checklist - [x] PR title corresponds to the body of PR (we generate changelog entries from PRs). - [x] Tests for the changes have been added / updated. - [x] Documentation comments have been added / updated. - [x] Code has been formatted via `zk fmt` and `zk lint`. - [x] Spellcheck has been run via `zk spellcheck`. - [x] Linkcheck has been run via `zk linkcheck`. --- core/lib/state/src/postgres/mod.rs | 46 +- .../src/api_server/execution_sandbox/apply.rs | 517 ++++++++++-------- .../api_server/execution_sandbox/execute.rs | 2 +- .../src/api_server/execution_sandbox/mod.rs | 39 +- .../api_server/execution_sandbox/testonly.rs | 102 ++-- .../src/api_server/execution_sandbox/tests.rs | 44 +- .../api_server/execution_sandbox/validate.rs | 2 +- .../src/api_server/tx_sender/mod.rs | 17 +- .../zksync_core/src/api_server/web3/mod.rs | 47 +- .../src/api_server/web3/namespaces/eth.rs | 12 +- .../zksync_core/src/api_server/web3/state.rs | 37 +- .../src/api_server/web3/tests/debug.rs | 2 +- .../src/api_server/web3/tests/filters.rs | 8 +- .../src/api_server/web3/tests/mod.rs | 70 +-- .../src/api_server/web3/tests/vm.rs | 348 ++++++++++-- .../src/api_server/web3/tests/ws.rs | 16 +- core/lib/zksync_core/src/lib.rs | 8 +- .../src/state_keeper/mempool_actor.rs | 12 +- core/lib/zksync_core/src/sync_layer/tests.rs | 67 ++- core/lib/zksync_core/src/utils/mod.rs | 41 +- 20 files changed, 954 insertions(+), 483 deletions(-) diff --git a/core/lib/state/src/postgres/mod.rs b/core/lib/state/src/postgres/mod.rs index 7208877abb3f..0bdc01851779 100644 --- a/core/lib/state/src/postgres/mod.rs +++ b/core/lib/state/src/postgres/mod.rs @@ -3,6 +3,7 @@ use std::{ sync::{Arc, RwLock}, }; +use anyhow::Context as _; use tokio::{runtime::Handle, sync::mpsc}; use zksync_dal::{ConnectionPool, StorageProcessor}; use zksync_types::{L1BatchNumber, MiniblockNumber, StorageKey, StorageValue, H256}; @@ -342,23 +343,46 @@ pub struct PostgresStorage<'a> { impl<'a> PostgresStorage<'a> { /// Creates a new storage using the specified connection. + /// /// # Panics + /// /// Panics on Postgres errors. pub fn new( rt_handle: Handle, - mut connection: StorageProcessor<'a>, + connection: StorageProcessor<'a>, block_number: MiniblockNumber, consider_new_l1_batch: bool, - ) -> PostgresStorage<'a> { - let resolved = rt_handle - .block_on( - connection - .storage_web3_dal() - .resolve_l1_batch_number_of_miniblock(block_number), - ) - .expect("Failed resolving L1 batch number for miniblock"); + ) -> Self { + rt_handle + .clone() + .block_on(Self::new_async( + rt_handle, + connection, + block_number, + consider_new_l1_batch, + )) + .unwrap() + } - Self { + /// Asynchronous version of [`Self::new()`] that also propagates errors instead of panicking. + /// + /// # Errors + /// + /// Propagates Postgres errors. + pub async fn new_async( + rt_handle: Handle, + mut connection: StorageProcessor<'a>, + block_number: MiniblockNumber, + consider_new_l1_batch: bool, + ) -> anyhow::Result> { + let resolved = connection + .storage_web3_dal() + .resolve_l1_batch_number_of_miniblock(block_number) + .await + .with_context(|| { + format!("failed resolving L1 batch number for miniblock #{block_number}") + })?; + Ok(Self { rt_handle, connection, miniblock_number: block_number, @@ -366,7 +390,7 @@ impl<'a> PostgresStorage<'a> { pending_l1_batch_number: resolved.pending_l1_batch, consider_new_l1_batch, caches: None, - } + }) } /// Sets the caches to use with the storage. diff --git a/core/lib/zksync_core/src/api_server/execution_sandbox/apply.rs b/core/lib/zksync_core/src/api_server/execution_sandbox/apply.rs index f987af3aa4bd..9eaa698aef95 100644 --- a/core/lib/zksync_core/src/api_server/execution_sandbox/apply.rs +++ b/core/lib/zksync_core/src/api_server/execution_sandbox/apply.rs @@ -15,8 +15,9 @@ use multivm::{ vm_latest::{constants::BLOCK_GAS_LIMIT, HistoryDisabled}, VmInstance, }; +use tokio::runtime::Handle; use zksync_dal::{ConnectionPool, StorageProcessor}; -use zksync_state::{PostgresStorage, ReadStorage, StorageView, WriteStorage}; +use zksync_state::{PostgresStorage, ReadStorage, StoragePtr, StorageView, WriteStorage}; use zksync_system_constants::{ SYSTEM_CONTEXT_ADDRESS, SYSTEM_CONTEXT_CURRENT_L2_BLOCK_INFO_POSITION, SYSTEM_CONTEXT_CURRENT_TX_ROLLING_HASH_POSITION, ZKPORTER_IS_AVAILABLE, @@ -36,7 +37,249 @@ use super::{ vm_metrics::{self, SandboxStage, SANDBOX_METRICS}, BlockArgs, TxExecutionArgs, TxSharedArgs, VmPermit, }; -use crate::utils::projected_first_l1_batch; + +type BoxedVm<'a> = Box>, HistoryDisabled>>; + +#[derive(Debug)] +struct Sandbox<'a> { + system_env: SystemEnv, + l1_batch_env: L1BatchEnv, + execution_args: &'a TxExecutionArgs, + l2_block_info_to_reset: Option, + storage_view: StorageView>, +} + +impl<'a> Sandbox<'a> { + async fn new( + mut connection: StorageProcessor<'a>, + shared_args: TxSharedArgs, + execution_args: &'a TxExecutionArgs, + block_args: BlockArgs, + ) -> anyhow::Result> { + let resolve_started_at = Instant::now(); + let resolved_block_info = block_args + .resolve_block_info(&mut connection) + .await + .with_context(|| format!("cannot resolve block numbers for {block_args:?}"))?; + let resolve_time = resolve_started_at.elapsed(); + // We don't want to emit too many logs. + if resolve_time > Duration::from_millis(10) { + tracing::debug!("Resolved block numbers (took {resolve_time:?})"); + } + + if block_args.resolves_to_latest_sealed_miniblock() { + shared_args + .caches + .schedule_values_update(resolved_block_info.state_l2_block_number); + } + + let (next_l2_block_info, l2_block_info_to_reset) = Self::load_l2_block_info( + &mut connection, + block_args.is_pending_miniblock(), + &resolved_block_info, + ) + .await?; + + let storage = PostgresStorage::new_async( + Handle::current(), + connection, + resolved_block_info.state_l2_block_number, + false, + ) + .await + .context("cannot create `PostgresStorage`")? + .with_caches(shared_args.caches.clone()); + + let storage_view = StorageView::new(storage); + let (system_env, l1_batch_env) = Self::prepare_env( + shared_args, + execution_args, + &resolved_block_info, + next_l2_block_info, + ); + + Ok(Self { + system_env, + l1_batch_env, + storage_view, + execution_args, + l2_block_info_to_reset, + }) + } + + async fn load_l2_block_info( + connection: &mut StorageProcessor<'_>, + is_pending_block: bool, + resolved_block_info: &ResolvedBlockInfo, + ) -> anyhow::Result<(L2BlockEnv, Option)> { + let mut l2_block_info_to_reset = None; + let current_l2_block_info = StoredL2BlockInfo::new( + connection, + resolved_block_info.state_l2_block_number, + Some(resolved_block_info.state_l2_block_hash), + ) + .await + .context("failed reading L2 block info")?; + + let next_l2_block_info = if is_pending_block { + L2BlockEnv { + number: current_l2_block_info.l2_block_number + 1, + timestamp: resolved_block_info.l1_batch_timestamp, + prev_block_hash: current_l2_block_info.l2_block_hash, + // For simplicity we assume each miniblock create one virtual block. + // This may be wrong only during transition period. + max_virtual_blocks_to_create: 1, + } + } else if current_l2_block_info.l2_block_number == 0 { + // Special case: + // - For environments, where genesis block was created before virtual block upgrade it doesn't matter what we put here. + // - Otherwise, we need to put actual values here. We cannot create next L2 block with block_number=0 and `max_virtual_blocks_to_create=0` + // because of SystemContext requirements. But, due to intrinsics of SystemContext, block.number still will be resolved to 0. + L2BlockEnv { + number: 1, + timestamp: 0, + prev_block_hash: MiniblockHasher::legacy_hash(MiniblockNumber(0)), + max_virtual_blocks_to_create: 1, + } + } else { + // We need to reset L2 block info in storage to process transaction in the current block context. + // Actual resetting will be done after `storage_view` is created. + let prev_l2_block_info = StoredL2BlockInfo::new( + connection, + resolved_block_info.state_l2_block_number - 1, + None, + ) + .await + .context("failed reading previous L2 block info")?; + + l2_block_info_to_reset = Some(prev_l2_block_info); + L2BlockEnv { + number: current_l2_block_info.l2_block_number, + timestamp: current_l2_block_info.l2_block_timestamp, + prev_block_hash: prev_l2_block_info.l2_block_hash, + max_virtual_blocks_to_create: 1, + } + }; + + Ok((next_l2_block_info, l2_block_info_to_reset)) + } + + /// This method is blocking. + fn setup_storage_view(&mut self, tx: &Transaction) { + let storage_view_setup_started_at = Instant::now(); + if let Some(nonce) = self.execution_args.enforced_nonce { + let nonce_key = get_nonce_key(&tx.initiator_account()); + let full_nonce = self.storage_view.read_value(&nonce_key); + let (_, deployment_nonce) = decompose_full_nonce(h256_to_u256(full_nonce)); + let enforced_full_nonce = nonces_to_full_nonce(U256::from(nonce.0), deployment_nonce); + self.storage_view + .set_value(nonce_key, u256_to_h256(enforced_full_nonce)); + } + + let payer = tx.payer(); + let balance_key = storage_key_for_eth_balance(&payer); + let mut current_balance = h256_to_u256(self.storage_view.read_value(&balance_key)); + current_balance += self.execution_args.added_balance; + self.storage_view + .set_value(balance_key, u256_to_h256(current_balance)); + + // Reset L2 block info if necessary. + if let Some(l2_block_info_to_reset) = self.l2_block_info_to_reset { + let l2_block_info_key = StorageKey::new( + AccountTreeId::new(SYSTEM_CONTEXT_ADDRESS), + SYSTEM_CONTEXT_CURRENT_L2_BLOCK_INFO_POSITION, + ); + let l2_block_info = pack_block_info( + l2_block_info_to_reset.l2_block_number as u64, + l2_block_info_to_reset.l2_block_timestamp, + ); + self.storage_view + .set_value(l2_block_info_key, u256_to_h256(l2_block_info)); + + let l2_block_txs_rolling_hash_key = StorageKey::new( + AccountTreeId::new(SYSTEM_CONTEXT_ADDRESS), + SYSTEM_CONTEXT_CURRENT_TX_ROLLING_HASH_POSITION, + ); + self.storage_view.set_value( + l2_block_txs_rolling_hash_key, + l2_block_info_to_reset.txs_rolling_hash, + ); + } + + let storage_view_setup_time = storage_view_setup_started_at.elapsed(); + // We don't want to emit too many logs. + if storage_view_setup_time > Duration::from_millis(10) { + tracing::debug!("Prepared the storage view (took {storage_view_setup_time:?})",); + } + } + + fn prepare_env( + shared_args: TxSharedArgs, + execution_args: &TxExecutionArgs, + resolved_block_info: &ResolvedBlockInfo, + next_l2_block_info: L2BlockEnv, + ) -> (SystemEnv, L1BatchEnv) { + let TxSharedArgs { + operator_account, + fee_input, + base_system_contracts, + validation_computational_gas_limit, + chain_id, + .. + } = shared_args; + + // In case we are executing in a past block, we'll use the historical fee data. + let fee_input = resolved_block_info + .historical_fee_input + .unwrap_or(fee_input); + let system_env = SystemEnv { + zk_porter_available: ZKPORTER_IS_AVAILABLE, + version: resolved_block_info.protocol_version, + base_system_smart_contracts: base_system_contracts + .get_by_protocol_version(resolved_block_info.protocol_version), + gas_limit: BLOCK_GAS_LIMIT, + execution_mode: execution_args.execution_mode, + default_validation_computational_gas_limit: validation_computational_gas_limit, + chain_id, + }; + let l1_batch_env = L1BatchEnv { + previous_batch_hash: None, + number: resolved_block_info.vm_l1_batch_number, + timestamp: resolved_block_info.l1_batch_timestamp, + fee_input, + fee_account: *operator_account.address(), + enforced_base_fee: execution_args.enforced_base_fee, + first_l2_block: next_l2_block_info, + }; + (system_env, l1_batch_env) + } + + /// This method is blocking. + fn into_vm( + mut self, + tx: &Transaction, + adjust_pubdata_price: bool, + ) -> (BoxedVm<'a>, StoragePtr>>) { + self.setup_storage_view(tx); + let protocol_version = self.system_env.version; + if adjust_pubdata_price { + self.l1_batch_env.fee_input = adjust_pubdata_price_for_tx( + self.l1_batch_env.fee_input, + tx.gas_per_pubdata_byte_limit(), + protocol_version.into(), + ); + }; + + let storage_view = self.storage_view.to_rc_ptr(); + let vm = Box::new(VmInstance::new_with_specific_version( + self.l1_batch_env, + self.system_env, + storage_view.clone(), + protocol_version.into_api_vm_version(), + )); + (vm, storage_view) + } +} #[allow(clippy::too_many_arguments)] pub(super) fn apply_vm_in_sandbox( @@ -59,185 +302,22 @@ pub(super) fn apply_vm_in_sandbox( let span = tracing::debug_span!("initialization").entered(); let rt_handle = vm_permit.rt_handle(); - let mut connection = rt_handle + let connection = rt_handle .block_on(connection_pool.access_storage_tagged("api")) .context("failed acquiring DB connection")?; let connection_acquire_time = stage_started_at.elapsed(); // We don't want to emit too many logs. if connection_acquire_time > Duration::from_millis(10) { - tracing::debug!( - "Obtained connection (took {:?})", - stage_started_at.elapsed() - ); - } - - let resolve_started_at = Instant::now(); - let ResolvedBlockInfo { - state_l2_block_number, - vm_l1_batch_number, - l1_batch_timestamp, - protocol_version, - historical_fee_input, - } = rt_handle - .block_on(block_args.resolve_block_info(&mut connection)) - .with_context(|| format!("failed resolving block numbers for {block_args:?}"))?; - let resolve_time = resolve_started_at.elapsed(); - // We don't want to emit too many logs. - if resolve_time > Duration::from_millis(10) { - tracing::debug!( - "Resolved block numbers (took {:?})", - resolve_started_at.elapsed() - ); - } - - if block_args.resolves_to_latest_sealed_miniblock() { - shared_args - .caches - .schedule_values_update(state_l2_block_number); - } - - let mut l2_block_info_to_reset = None; - let current_l2_block_info = rt_handle - .block_on(StoredL2BlockInfo::new( - &mut connection, - state_l2_block_number, - )) - .context("failed reading L2 block info")?; - let next_l2_block_info = if block_args.is_pending_miniblock() { - L2BlockEnv { - number: current_l2_block_info.l2_block_number + 1, - timestamp: l1_batch_timestamp, - prev_block_hash: current_l2_block_info.l2_block_hash, - // For simplicity we assume each miniblock create one virtual block. - // This may be wrong only during transition period. - max_virtual_blocks_to_create: 1, - } - } else if current_l2_block_info.l2_block_number == 0 { - // Special case: - // - For environments, where genesis block was created before virtual block upgrade it doesn't matter what we put here. - // - Otherwise, we need to put actual values here. We cannot create next L2 block with block_number=0 and `max_virtual_blocks_to_create=0` - // because of SystemContext requirements. But, due to intrinsics of SystemContext, block.number still will be resolved to 0. - L2BlockEnv { - number: 1, - timestamp: 0, - prev_block_hash: MiniblockHasher::legacy_hash(MiniblockNumber(0)), - max_virtual_blocks_to_create: 1, - } - } else { - // We need to reset L2 block info in storage to process transaction in the current block context. - // Actual resetting will be done after `storage_view` is created. - let prev_l2_block_info = rt_handle - .block_on(StoredL2BlockInfo::new( - &mut connection, - state_l2_block_number - 1, - )) - .context("failed reading previous L2 block info")?; - - l2_block_info_to_reset = Some(prev_l2_block_info); - L2BlockEnv { - number: current_l2_block_info.l2_block_number, - timestamp: current_l2_block_info.l2_block_timestamp, - prev_block_hash: prev_l2_block_info.l2_block_hash, - max_virtual_blocks_to_create: 1, - } - }; - - let storage = PostgresStorage::new(rt_handle.clone(), connection, state_l2_block_number, false) - .with_caches(shared_args.caches); - let mut storage_view = StorageView::new(storage); - - let storage_view_setup_started_at = Instant::now(); - if let Some(nonce) = execution_args.enforced_nonce { - let nonce_key = get_nonce_key(&tx.initiator_account()); - let full_nonce = storage_view.read_value(&nonce_key); - let (_, deployment_nonce) = decompose_full_nonce(h256_to_u256(full_nonce)); - let enforced_full_nonce = nonces_to_full_nonce(U256::from(nonce.0), deployment_nonce); - storage_view.set_value(nonce_key, u256_to_h256(enforced_full_nonce)); - } - - let payer = tx.payer(); - let balance_key = storage_key_for_eth_balance(&payer); - let mut current_balance = h256_to_u256(storage_view.read_value(&balance_key)); - current_balance += execution_args.added_balance; - storage_view.set_value(balance_key, u256_to_h256(current_balance)); - - // Reset L2 block info. - if let Some(l2_block_info_to_reset) = l2_block_info_to_reset { - let l2_block_info_key = StorageKey::new( - AccountTreeId::new(SYSTEM_CONTEXT_ADDRESS), - SYSTEM_CONTEXT_CURRENT_L2_BLOCK_INFO_POSITION, - ); - let l2_block_info = pack_block_info( - l2_block_info_to_reset.l2_block_number as u64, - l2_block_info_to_reset.l2_block_timestamp, - ); - storage_view.set_value(l2_block_info_key, u256_to_h256(l2_block_info)); - - let l2_block_txs_rolling_hash_key = StorageKey::new( - AccountTreeId::new(SYSTEM_CONTEXT_ADDRESS), - SYSTEM_CONTEXT_CURRENT_TX_ROLLING_HASH_POSITION, - ); - storage_view.set_value( - l2_block_txs_rolling_hash_key, - l2_block_info_to_reset.txs_rolling_hash, - ); - } - - let storage_view_setup_time = storage_view_setup_started_at.elapsed(); - // We don't want to emit too many logs. - if storage_view_setup_time > Duration::from_millis(10) { - tracing::debug!("Prepared the storage view (took {storage_view_setup_time:?})",); + tracing::debug!("Obtained connection (took {connection_acquire_time:?})"); } - let TxSharedArgs { - operator_account, - fee_input, - base_system_contracts, - validation_computational_gas_limit, - chain_id, - .. - } = shared_args; - - // In case we are executing in a past block, we'll - // use the historical fee data. - let fee_input = historical_fee_input.unwrap_or(fee_input); - let fee_input = if adjust_pubdata_price { - adjust_pubdata_price_for_tx( - fee_input, - tx.gas_per_pubdata_byte_limit(), - protocol_version.into(), - ) - } else { - fee_input - }; - - let system_env = SystemEnv { - zk_porter_available: ZKPORTER_IS_AVAILABLE, - version: protocol_version, - base_system_smart_contracts: base_system_contracts - .get_by_protocol_version(protocol_version), - gas_limit: BLOCK_GAS_LIMIT, - execution_mode: execution_args.execution_mode, - default_validation_computational_gas_limit: validation_computational_gas_limit, - chain_id, - }; - let l1_batch_env = L1BatchEnv { - previous_batch_hash: None, - number: vm_l1_batch_number, - timestamp: l1_batch_timestamp, - fee_input, - fee_account: *operator_account.address(), - enforced_base_fee: execution_args.enforced_base_fee, - first_l2_block: next_l2_block_info, - }; - - let storage_view = storage_view.to_rc_ptr(); - let mut vm = Box::new(VmInstance::new_with_specific_version( - l1_batch_env, - system_env, - storage_view.clone(), - protocol_version.into_api_vm_version(), - )); + let sandbox = rt_handle.block_on(Sandbox::new( + connection, + shared_args, + execution_args, + block_args, + ))?; + let (mut vm, storage_view) = sandbox.into_vm(&tx, adjust_pubdata_price); SANDBOX_METRICS.sandbox[&SandboxStage::Initialization].observe(stage_started_at.elapsed()); span.exit(); @@ -258,8 +338,6 @@ pub(super) fn apply_vm_in_sandbox( vm_execution_took, storage_view.as_ref().borrow_mut().metrics(), ); - drop(vm_permit); // Ensure that the permit lives until this point - Ok(result) } @@ -272,9 +350,11 @@ struct StoredL2BlockInfo { } impl StoredL2BlockInfo { + /// If `miniblock_hash` is `None`, it needs to be fetched from the storage. async fn new( connection: &mut StorageProcessor<'_>, miniblock_number: MiniblockNumber, + miniblock_hash: Option, ) -> anyhow::Result { let l2_block_info_key = StorageKey::new( AccountTreeId::new(SYSTEM_CONTEXT_ADDRESS), @@ -297,12 +377,16 @@ impl StoredL2BlockInfo { .await .context("failed reading transaction rolling hash from VM state")?; - let l2_block_hash = connection - .blocks_web3_dal() - .get_miniblock_hash(miniblock_number) - .await - .with_context(|| format!("failed getting hash for miniblock #{miniblock_number}"))? - .with_context(|| format!("miniblock #{miniblock_number} disappeared from Postgres"))?; + let l2_block_hash = if let Some(hash) = miniblock_hash { + hash + } else { + connection + .blocks_web3_dal() + .get_miniblock_hash(miniblock_number) + .await + .with_context(|| format!("failed getting hash for miniblock #{miniblock_number}"))? + .with_context(|| format!("miniblock #{miniblock_number} not present in storage"))? + }; Ok(Self { l2_block_number: l2_block_number as u32, @@ -314,23 +398,24 @@ impl StoredL2BlockInfo { } #[derive(Debug)] -pub(crate) struct ResolvedBlockInfo { - pub state_l2_block_number: MiniblockNumber, - pub vm_l1_batch_number: L1BatchNumber, - pub l1_batch_timestamp: u64, - pub protocol_version: ProtocolVersionId, - pub historical_fee_input: Option, +struct ResolvedBlockInfo { + state_l2_block_number: MiniblockNumber, + state_l2_block_hash: H256, + vm_l1_batch_number: L1BatchNumber, + l1_batch_timestamp: u64, + protocol_version: ProtocolVersionId, + historical_fee_input: Option, } impl BlockArgs { - pub(crate) fn is_pending_miniblock(&self) -> bool { + fn is_pending_miniblock(&self) -> bool { matches!( self.block_id, api::BlockId::Number(api::BlockNumber::Pending) ) } - pub(crate) fn is_estimate_like(&self) -> bool { + fn is_estimate_like(&self) -> bool { matches!( self.block_id, api::BlockId::Number(api::BlockNumber::Pending) @@ -339,50 +424,57 @@ impl BlockArgs { ) } - pub(crate) async fn resolve_block_info( + async fn resolve_block_info( &self, connection: &mut StorageProcessor<'_>, ) -> anyhow::Result { let (state_l2_block_number, vm_l1_batch_number, l1_batch_timestamp); - if self.is_pending_miniblock() { - let sealed_l1_batch_number = - connection.blocks_dal().get_sealed_l1_batch_number().await?; + let miniblock_header = if self.is_pending_miniblock() { + vm_l1_batch_number = connection + .blocks_dal() + .get_sealed_l1_batch_number() + .await + .context("failed getting sealed L1 batch number")? + .context("no L1 batches in storage")?; let sealed_miniblock_header = connection .blocks_dal() .get_last_sealed_miniblock_header() - .await? + .await + .context("failed getting sealed miniblock header")? .context("no miniblocks in storage")?; - vm_l1_batch_number = match sealed_l1_batch_number { - Some(number) => number + 1, - None => projected_first_l1_batch(connection).await?, - }; state_l2_block_number = sealed_miniblock_header.number; // Timestamp of the next L1 batch must be greater than the timestamp of the last miniblock. l1_batch_timestamp = seconds_since_epoch().max(sealed_miniblock_header.timestamp + 1); + sealed_miniblock_header } else { vm_l1_batch_number = connection .storage_web3_dal() .resolve_l1_batch_number_of_miniblock(self.resolved_block_number) - .await? + .await + .context("failed resolving L1 batch for miniblock")? .expected_l1_batch(); - l1_batch_timestamp = self.l1_batch_timestamp_s.unwrap_or_else(|| { - panic!( - "L1 batch timestamp is `None`, `block_id`: {:?}, `resolved_block_number`: {}", - self.block_id, self.resolved_block_number.0 - ); - }); + l1_batch_timestamp = self + .l1_batch_timestamp_s + .context("L1 batch timestamp is `None` for non-pending block args")?; state_l2_block_number = self.resolved_block_number; + + connection + .blocks_dal() + .get_miniblock_header(self.resolved_block_number) + .await + .context("failed getting header of resolved miniblock")? + .context("resolved miniblock disappeared from storage")? }; let historical_fee_input = if !self.is_estimate_like() { let miniblock_header = connection .blocks_dal() .get_miniblock_header(self.resolved_block_number) - .await? - .context("The resolved miniblock is not in storage")?; - + .await + .context("failed getting resolved miniblock header")? + .context("resolved miniblock is not in storage")?; Some(miniblock_header.batch_fee_input) } else { None @@ -390,14 +482,13 @@ impl BlockArgs { // Blocks without version specified are considered to be of `Version9`. // TODO: remove `unwrap_or` when protocol version ID will be assigned for each block. - let protocol_version = connection - .blocks_dal() - .get_miniblock_protocol_version_id(state_l2_block_number) - .await? + let protocol_version = miniblock_header + .protocol_version .unwrap_or(ProtocolVersionId::last_potentially_undefined()); Ok(ResolvedBlockInfo { state_l2_block_number, + state_l2_block_hash: miniblock_header.hash, vm_l1_batch_number, l1_batch_timestamp, protocol_version, diff --git a/core/lib/zksync_core/src/api_server/execution_sandbox/execute.rs b/core/lib/zksync_core/src/api_server/execution_sandbox/execute.rs index 20b5d7815507..f194ce3bbb1b 100644 --- a/core/lib/zksync_core/src/api_server/execution_sandbox/execute.rs +++ b/core/lib/zksync_core/src/api_server/execution_sandbox/execute.rs @@ -115,7 +115,7 @@ impl TransactionExecutor { ) -> anyhow::Result { #[cfg(test)] if let Self::Mock(mock_executor) = self { - return mock_executor.execute_tx(&tx); + return mock_executor.execute_tx(&tx, &block_args); } let total_factory_deps = tx diff --git a/core/lib/zksync_core/src/api_server/execution_sandbox/mod.rs b/core/lib/zksync_core/src/api_server/execution_sandbox/mod.rs index 3c3a8e9a36ad..5a208f57a190 100644 --- a/core/lib/zksync_core/src/api_server/execution_sandbox/mod.rs +++ b/core/lib/zksync_core/src/api_server/execution_sandbox/mod.rs @@ -219,14 +219,33 @@ pub(crate) struct TxSharedArgs { pub chain_id: L2ChainId, } +impl TxSharedArgs { + #[cfg(test)] + pub fn mock(base_system_contracts: MultiVMBaseSystemContracts, pool: ConnectionPool) -> Self { + let mut caches = PostgresStorageCaches::new(1, 1); + tokio::task::spawn_blocking(caches.configure_storage_values_cache( + 1, + pool, + Handle::current(), + )); + + Self { + operator_account: AccountTreeId::default(), + fee_input: BatchFeeInput::l1_pegged(55, 555), + base_system_contracts, + caches, + validation_computational_gas_limit: u32::MAX, + chain_id: L2ChainId::default(), + } + } +} + /// Information about first L1 batch / miniblock in the node storage. #[derive(Debug, Clone, Copy)] pub(crate) struct BlockStartInfo { - /// Projected number of the first locally available miniblock. This miniblock is **not** - /// guaranteed to be present in the storage! + /// Number of the first locally available miniblock. pub first_miniblock: MiniblockNumber, - /// Projected number of the first locally available L1 batch. This L1 batch is **not** - /// guaranteed to be present in the storage! + /// Number of the first locally available L1 batch. pub first_l1_batch: L1BatchNumber, } @@ -326,20 +345,16 @@ impl BlockArgs { .with_context(|| { format!("failed resolving L1 batch number of miniblock #{resolved_block_number}") })?; - let l1_batch_timestamp_s = connection + let l1_batch_timestamp = connection .blocks_web3_dal() .get_expected_l1_batch_timestamp(&l1_batch) .await - .with_context(|| format!("failed getting timestamp for {l1_batch:?}"))?; - if l1_batch_timestamp_s.is_none() { - // Can happen after snapshot recovery if no miniblocks are persisted yet. In this case, - // we cannot proceed; the issue will be resolved shortly. - return Err(BlockArgsError::Missing); - } + .with_context(|| format!("failed getting timestamp for {l1_batch:?}"))? + .context("missing timestamp for non-pending block")?; Ok(Self { block_id, resolved_block_number, - l1_batch_timestamp_s, + l1_batch_timestamp_s: Some(l1_batch_timestamp), }) } diff --git a/core/lib/zksync_core/src/api_server/execution_sandbox/testonly.rs b/core/lib/zksync_core/src/api_server/execution_sandbox/testonly.rs index 4eb33d45f2c5..12c6ac6aae44 100644 --- a/core/lib/zksync_core/src/api_server/execution_sandbox/testonly.rs +++ b/core/lib/zksync_core/src/api_server/execution_sandbox/testonly.rs @@ -1,37 +1,78 @@ -use std::collections::HashMap; +use std::fmt; use multivm::interface::{ExecutionResult, VmExecutionResultAndLogs}; use zksync_types::{ - fee::TransactionExecutionMetrics, l2::L2Tx, ExecuteTransactionCommon, Transaction, H256, + fee::TransactionExecutionMetrics, l2::L2Tx, ExecuteTransactionCommon, Transaction, }; use super::{ execute::{TransactionExecutionOutput, TransactionExecutor}, validate::ValidationError, + BlockArgs, }; -#[derive(Debug, Default)] +type TxResponseFn = dyn Fn(&Transaction, &BlockArgs) -> ExecutionResult + Send + Sync; + pub(crate) struct MockTransactionExecutor { - call_responses: HashMap, TransactionExecutionOutput>, - tx_responses: HashMap, + call_responses: Box, + tx_responses: Box, +} + +impl fmt::Debug for MockTransactionExecutor { + fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result { + formatter + .debug_struct("MockTransactionExecutor") + .finish_non_exhaustive() + } +} + +impl Default for MockTransactionExecutor { + fn default() -> Self { + Self { + call_responses: Box::new(|tx, _| { + panic!( + "Unexpected call with data {}", + hex::encode(tx.execute.calldata()) + ); + }), + tx_responses: Box::new(|tx, _| { + panic!("Unexpect transaction call: {tx:?}"); + }), + } + } } impl MockTransactionExecutor { - pub fn insert_call_response(&mut self, calldata: Vec, result: ExecutionResult) { - let output = TransactionExecutionOutput { - vm: VmExecutionResultAndLogs { - result, - logs: Default::default(), - statistics: Default::default(), - refunds: Default::default(), - }, - metrics: TransactionExecutionMetrics::default(), - are_published_bytecodes_ok: true, - }; - self.call_responses.insert(calldata, output); + pub fn set_call_responses(&mut self, responses: F) + where + F: Fn(&Transaction, &BlockArgs) -> ExecutionResult + 'static + Send + Sync, + { + self.call_responses = Box::new(responses); + } + + pub fn set_tx_responses(&mut self, responses: F) + where + F: Fn(&Transaction, &BlockArgs) -> ExecutionResult + 'static + Send + Sync, + { + self.tx_responses = Box::new(responses); } - pub fn insert_tx_response(&mut self, tx_hash: H256, result: ExecutionResult) { + pub fn validate_tx(&self, tx: L2Tx, block_args: &BlockArgs) -> Result<(), ValidationError> { + let result = (self.tx_responses)(&tx.into(), block_args); + match result { + ExecutionResult::Success { .. } => Ok(()), + other => Err(ValidationError::Internal(anyhow::anyhow!( + "transaction validation failed: {other:?}" + ))), + } + } + + pub fn execute_tx( + &self, + tx: &Transaction, + block_args: &BlockArgs, + ) -> anyhow::Result { + let result = self.get_execution_result(tx, block_args); let output = TransactionExecutionOutput { vm: VmExecutionResultAndLogs { result, @@ -42,33 +83,16 @@ impl MockTransactionExecutor { metrics: TransactionExecutionMetrics::default(), are_published_bytecodes_ok: true, }; - self.tx_responses.insert(tx_hash, output); + Ok(output) } - pub fn validate_tx(&self, tx: &L2Tx) -> Result<(), ValidationError> { - self.tx_responses - .get(&tx.hash()) - .unwrap_or_else(|| panic!("Validating unexpected transaction: {tx:?}")); - Ok(()) - } - - pub fn execute_tx(&self, tx: &Transaction) -> anyhow::Result { + fn get_execution_result(&self, tx: &Transaction, block_args: &BlockArgs) -> ExecutionResult { if let ExecuteTransactionCommon::L2(data) = &tx.common_data { if data.input.is_none() { - // `Transaction` was obtained from a `CallRequest` - return Ok(self - .call_responses - .get(tx.execute.calldata()) - .unwrap_or_else(|| panic!("Executing unexpected call: {tx:?}")) - .clone()); + return (self.call_responses)(tx, block_args); } } - - Ok(self - .tx_responses - .get(&tx.hash()) - .unwrap_or_else(|| panic!("Executing unexpected transaction: {tx:?}")) - .clone()) + (self.tx_responses)(tx, block_args) } } diff --git a/core/lib/zksync_core/src/api_server/execution_sandbox/tests.rs b/core/lib/zksync_core/src/api_server/execution_sandbox/tests.rs index 525b2a26b431..f66455ce0dca 100644 --- a/core/lib/zksync_core/src/api_server/execution_sandbox/tests.rs +++ b/core/lib/zksync_core/src/api_server/execution_sandbox/tests.rs @@ -4,8 +4,9 @@ use assert_matches::assert_matches; use super::*; use crate::{ + api_server::{execution_sandbox::apply::apply_vm_in_sandbox, tx_sender::ApiContracts}, genesis::{ensure_genesis_state, GenesisParams}, - utils::testonly::{create_miniblock, prepare_recovery_snapshot}, + utils::testonly::{create_l2_transaction, create_miniblock, prepare_recovery_snapshot}, }; #[tokio::test] @@ -154,3 +155,44 @@ async fn creating_block_args_after_snapshot_recovery() { assert_matches!(err, BlockArgsError::Missing); } } + +#[tokio::test] +async fn instantiating_vm() { + let pool = ConnectionPool::test_pool().await; + let mut storage = pool.access_storage().await.unwrap(); + ensure_genesis_state(&mut storage, L2ChainId::default(), &GenesisParams::mock()) + .await + .unwrap(); + + let block_args = BlockArgs::pending(&mut storage).await.unwrap(); + test_instantiating_vm(pool.clone(), block_args).await; + let start_info = BlockStartInfo::new(&mut storage).await.unwrap(); + let block_args = BlockArgs::new(&mut storage, api::BlockId::Number(0.into()), start_info) + .await + .unwrap(); + test_instantiating_vm(pool.clone(), block_args).await; +} + +async fn test_instantiating_vm(pool: ConnectionPool, block_args: BlockArgs) { + let (vm_concurrency_limiter, _) = VmConcurrencyLimiter::new(1); + let vm_permit = vm_concurrency_limiter.acquire().await.unwrap(); + let transaction = create_l2_transaction(10, 100).into(); + + tokio::task::spawn_blocking(move || { + apply_vm_in_sandbox( + vm_permit, + TxSharedArgs::mock(ApiContracts::load_from_disk().estimate_gas, pool.clone()), + true, + &TxExecutionArgs::for_gas_estimate(None, &transaction, 123), + &pool, + transaction.clone(), + block_args, + |_, received_tx| { + assert_eq!(received_tx, transaction); + }, + ) + }) + .await + .expect("VM instantiation panicked") + .expect("VM instantiation errored"); +} diff --git a/core/lib/zksync_core/src/api_server/execution_sandbox/validate.rs b/core/lib/zksync_core/src/api_server/execution_sandbox/validate.rs index a2439816d817..462977a8d36c 100644 --- a/core/lib/zksync_core/src/api_server/execution_sandbox/validate.rs +++ b/core/lib/zksync_core/src/api_server/execution_sandbox/validate.rs @@ -42,7 +42,7 @@ impl TransactionExecutor { ) -> Result<(), ValidationError> { #[cfg(test)] if let Self::Mock(mock) = self { - return mock.validate_tx(&tx); + return mock.validate_tx(tx, &block_args); } let stage_latency = SANDBOX_METRICS.sandbox[&SandboxStage::ValidateInSandbox].start(); diff --git a/core/lib/zksync_core/src/api_server/tx_sender/mod.rs b/core/lib/zksync_core/src/api_server/tx_sender/mod.rs index f93e67aa8caf..51c817b8fbe6 100644 --- a/core/lib/zksync_core/src/api_server/tx_sender/mod.rs +++ b/core/lib/zksync_core/src/api_server/tx_sender/mod.rs @@ -39,6 +39,7 @@ use crate::{ fee_model::BatchFeeModelInputProvider, metrics::{TxStage, APP_METRICS}, state_keeper::seal_criteria::{ConditionalSealer, NoopSealer, SealData}, + utils::pending_protocol_version, }; mod proxy; @@ -375,8 +376,7 @@ impl TxSender { .as_ref() .unwrap() // Checked above .access_storage_tagged("api") - .await - .unwrap() + .await? .transactions_dal() .insert_transaction_l2(tx, execution_output.metrics) .await; @@ -664,11 +664,9 @@ impl TxSender { let mut connection = self.acquire_replica_connection().await?; let block_args = BlockArgs::pending(&mut connection).await?; - let protocol_version = block_args - .resolve_block_info(&mut connection) + let protocol_version = pending_protocol_version(&mut connection) .await - .with_context(|| format!("failed resolving block info for {block_args:?}"))? - .protocol_version; + .context("failed getting pending protocol version")?; drop(connection); let fee_input = { @@ -917,12 +915,9 @@ impl TxSender { pub async fn gas_price(&self) -> anyhow::Result { let mut connection = self.acquire_replica_connection().await?; - let block_args = BlockArgs::pending(&mut connection).await?; - let protocol_version = block_args - .resolve_block_info(&mut connection) + let protocol_version = pending_protocol_version(&mut connection) .await - .with_context(|| format!("failed resolving block info for {block_args:?}"))? - .protocol_version; + .context("failed obtaining pending protocol version")?; drop(connection); let (base_fee, _) = derive_base_fee_and_gas_per_pubdata( diff --git a/core/lib/zksync_core/src/api_server/web3/mod.rs b/core/lib/zksync_core/src/api_server/web3/mod.rs index 0f88382a1d18..7b304f4f2dfb 100644 --- a/core/lib/zksync_core/src/api_server/web3/mod.rs +++ b/core/lib/zksync_core/src/api_server/web3/mod.rs @@ -41,6 +41,7 @@ use crate::{ web3::backend_jsonrpsee::batch_limiter_middleware::LimitMiddleware, }, sync_layer::SyncState, + utils::wait_for_l1_batch, }; pub mod backend_jsonrpsee; @@ -98,9 +99,10 @@ impl Namespace { /// Handles to the initialized API server. #[derive(Debug)] pub struct ApiServerHandles { - pub local_addr: SocketAddr, pub tasks: Vec>>, pub health_check: ReactiveHealthCheck, + #[allow(unused)] // only used in tests + pub(crate) local_addr: future::TryMaybeDone>, } /// Optional part of the API server parameters. @@ -441,22 +443,11 @@ impl FullApiParams { health_updater, )); - let local_addr = match local_addr.await { - Ok(addr) => addr, - Err(_) => { - // If the local address was not transmitted, `server_task` must have failed. - let err = server_task - .await - .with_context(|| format!("{health_check_name} server panicked"))? - .unwrap_err(); - return Err(err); - } - }; tasks.push(server_task); Ok(ApiServerHandles { - local_addr, health_check, tasks, + local_addr: future::try_maybe_done(local_addr), }) } @@ -469,6 +460,29 @@ impl FullApiParams { health_updater: HealthUpdater, ) -> anyhow::Result<()> { let transport = self.transport; + let (transport_str, is_http, addr) = match transport { + ApiTransport::Http(addr) => ("HTTP", true, addr), + ApiTransport::WebSocket(addr) => ("WS", false, addr), + }; + let transport_label = (&transport).into(); + + tracing::info!( + "Waiting for at least one L1 batch in Postgres to start {transport_str} API server" + ); + // Starting the server before L1 batches are present in Postgres can lead to some invariants the server logic + // implicitly assumes not being upheld. The only case when we'll actually wait here is immediately after snapshot recovery. + let earliest_l1_batch_number = + wait_for_l1_batch(&self.pool, self.polling_interval, &mut stop_receiver) + .await + .context("error while waiting for L1 batch in Postgres")?; + + if let Some(number) = earliest_l1_batch_number { + tracing::info!("Successfully waited for at least one L1 batch in Postgres; the earliest one is #{number}"); + } else { + tracing::info!("Received shutdown signal before {transport_str} API server is started; shutting down"); + return Ok(()); + } + let batch_request_config = self .optional .batch_request_size_limit @@ -487,12 +501,6 @@ impl FullApiParams { .build_rpc_module(pub_sub, last_sealed_miniblock) .await?; - let (transport_str, is_http, addr) = match transport { - ApiTransport::Http(addr) => ("HTTP", true, addr), - ApiTransport::WebSocket(addr) => ("WS", false, addr), - }; - let transport_label = (&transport).into(); - // Setup CORS. let cors = is_http.then(|| { CorsLayer::new() @@ -549,6 +557,7 @@ impl FullApiParams { let local_addr = local_addr.with_context(|| { format!("Failed getting local address for {transport_str} JSON-RPC server") })?; + tracing::info!("Initialized {transport_str} API on {local_addr:?}"); local_addr_sender.send(local_addr).ok(); let close_handle = server_handle.clone(); diff --git a/core/lib/zksync_core/src/api_server/web3/namespaces/eth.rs b/core/lib/zksync_core/src/api_server/web3/namespaces/eth.rs index 0098eacdbf03..86f7e8a56764 100644 --- a/core/lib/zksync_core/src/api_server/web3/namespaces/eth.rs +++ b/core/lib/zksync_core/src/api_server/web3/namespaces/eth.rs @@ -109,7 +109,7 @@ impl EthNamespace { .set_nonce_for_call_request(&mut request_with_gas_per_pubdata_overridden) .await?; - if let Some(ref mut eip712_meta) = request_with_gas_per_pubdata_overridden.eip712_meta { + if let Some(eip712_meta) = &mut request_with_gas_per_pubdata_overridden.eip712_meta { if eip712_meta.gas_per_pubdata == U256::zero() { eip712_meta.gas_per_pubdata = DEFAULT_L2_TX_GAS_PER_PUBDATA_BYTE.into(); } @@ -132,7 +132,6 @@ impl EthNamespace { // When we're estimating fee, we are trying to deduce values related to fee, so we should // not consider provided ones. - let gas_price = self.state.tx_sender.gas_price().await; let gas_price = gas_price.map_err(|err| internal_error(METHOD_NAME, err))?; tx.common_data.fee.max_fee_per_gas = gas_price.into(); @@ -580,12 +579,9 @@ impl EthNamespace { .blocks_dal() .get_sealed_miniblock_number() .await - .map_err(|err| internal_error(METHOD_NAME, err))?; - let next_block_number = match last_block_number { - Some(number) => number + 1, - // If we don't have miniblocks in the storage, use the first projected miniblock number as the cursor - None => self.state.start_info.first_miniblock, - }; + .map_err(|err| internal_error(METHOD_NAME, err))? + .ok_or_else(|| internal_error(METHOD_NAME, "no miniblocks in storage"))?; + let next_block_number = last_block_number + 1; drop(storage); let idx = self diff --git a/core/lib/zksync_core/src/api_server/web3/state.rs b/core/lib/zksync_core/src/api_server/web3/state.rs index 180e2de7211f..d930880c4be7 100644 --- a/core/lib/zksync_core/src/api_server/web3/state.rs +++ b/core/lib/zksync_core/src/api_server/web3/state.rs @@ -343,24 +343,27 @@ impl RpcState { ) -> Result<(), Web3Error> { const METHOD_NAME: &str = "set_nonce_for_call_request"; - if call_request.nonce.is_none() { - let from = call_request.from.unwrap_or_default(); - let block_id = api::BlockId::Number(api::BlockNumber::Latest); - let mut connection = self - .connection_pool - .access_storage_tagged("api") - .await - .unwrap(); - let block_number = self - .resolve_block(&mut connection, block_id, METHOD_NAME) - .await?; - let address_historical_nonce = connection - .storage_web3_dal() - .get_address_historical_nonce(from, block_number) - .await - .map_err(|err| internal_error(METHOD_NAME, err))?; - call_request.nonce = Some(address_historical_nonce); + if call_request.nonce.is_some() { + return Ok(()); } + let mut connection = self + .connection_pool + .access_storage_tagged("api") + .await + .map_err(|err| internal_error(METHOD_NAME, err))?; + + let latest_block_id = api::BlockId::Number(api::BlockNumber::Latest); + let latest_block_number = self + .resolve_block(&mut connection, latest_block_id, METHOD_NAME) + .await?; + + let from = call_request.from.unwrap_or_default(); + let address_historical_nonce = connection + .storage_web3_dal() + .get_address_historical_nonce(from, latest_block_number) + .await + .map_err(|err| internal_error(METHOD_NAME, err))?; + call_request.nonce = Some(address_historical_nonce); Ok(()) } } diff --git a/core/lib/zksync_core/src/api_server/web3/tests/debug.rs b/core/lib/zksync_core/src/api_server/web3/tests/debug.rs index e8821d03e696..0da58d28cf9d 100644 --- a/core/lib/zksync_core/src/api_server/web3/tests/debug.rs +++ b/core/lib/zksync_core/src/api_server/web3/tests/debug.rs @@ -152,7 +152,7 @@ impl HttpTest for TraceBlockTestWithSnapshotRecovery { assert_pruned_block_error(&error, snapshot_miniblock_number + 1); } - TraceBlockTest(snapshot_miniblock_number + 1) + TraceBlockTest(snapshot_miniblock_number + 2) .test(client, pool) .await?; Ok(()) diff --git a/core/lib/zksync_core/src/api_server/web3/tests/filters.rs b/core/lib/zksync_core/src/api_server/web3/tests/filters.rs index 2b202be8c028..1321b7241b64 100644 --- a/core/lib/zksync_core/src/api_server/web3/tests/filters.rs +++ b/core/lib/zksync_core/src/api_server/web3/tests/filters.rs @@ -31,7 +31,7 @@ impl HttpTest for BasicFilterChangesTest { let new_miniblock = store_miniblock( &mut pool.access_storage().await?, if self.snapshot_recovery { - StorageInitialization::SNAPSHOT_RECOVERY_BLOCK + 1 + StorageInitialization::SNAPSHOT_RECOVERY_BLOCK + 2 } else { MiniblockNumber(1) }, @@ -115,12 +115,12 @@ impl HttpTest for LogFilterChangesTest { let topics_filter_id = client.new_filter(topics_filter).await?; let mut storage = pool.access_storage().await?; - let first_local_miniblock = if self.snapshot_recovery { - StorageInitialization::SNAPSHOT_RECOVERY_BLOCK.0 + 1 + let next_local_miniblock = if self.snapshot_recovery { + StorageInitialization::SNAPSHOT_RECOVERY_BLOCK.0 + 2 } else { 1 }; - let (_, events) = store_events(&mut storage, first_local_miniblock, 0).await?; + let (_, events) = store_events(&mut storage, next_local_miniblock, 0).await?; drop(storage); let events: Vec<_> = events.iter().collect(); diff --git a/core/lib/zksync_core/src/api_server/web3/tests/mod.rs b/core/lib/zksync_core/src/api_server/web3/tests/mod.rs index 4292ab8c0c21..33d578cc3d4e 100644 --- a/core/lib/zksync_core/src/api_server/web3/tests/mod.rs +++ b/core/lib/zksync_core/src/api_server/web3/tests/mod.rs @@ -1,4 +1,4 @@ -use std::{collections::HashMap, time::Instant}; +use std::{collections::HashMap, pin::Pin, time::Instant}; use assert_matches::assert_matches; use async_trait::async_trait; @@ -14,7 +14,6 @@ use zksync_dal::{transactions_dal::L2TxSubmissionResult, ConnectionPool, Storage use zksync_health_check::CheckHealth; use zksync_types::{ api, - api::BlockId, block::MiniblockHeader, fee::TransactionExecutionMetrics, get_nonce_key, @@ -57,8 +56,8 @@ const TEST_TIMEOUT: Duration = Duration::from_secs(10); const POLL_INTERVAL: Duration = Duration::from_millis(50); impl ApiServerHandles { - /// Waits until the server health check reports the ready state. - pub(crate) async fn wait_until_ready(&self) { + /// Waits until the server health check reports the ready state. Must be called once per server instance. + pub(crate) async fn wait_until_ready(&mut self) -> SocketAddr { let started_at = Instant::now(); loop { assert!( @@ -71,6 +70,13 @@ impl ApiServerHandles { } tokio::time::sleep(POLL_INTERVAL).await; } + + let mut local_addr_future = Pin::new(&mut self.local_addr); + local_addr_future + .as_mut() + .await + .expect("API server panicked"); + local_addr_future.output_mut().copied().unwrap() } pub(crate) async fn shutdown(self) { @@ -233,6 +239,10 @@ impl StorageInitialization { .factory_deps_dal() .insert_factory_deps(Self::SNAPSHOT_RECOVERY_BLOCK, factory_deps) .await?; + + // Insert the next L1 batch in the storage so that the API server doesn't hang up. + store_miniblock(storage, Self::SNAPSHOT_RECOVERY_BLOCK + 1, &[]).await?; + seal_l1_batch(storage, Self::SNAPSHOT_RECOVERY_BATCH + 1).await?; } } Ok(()) @@ -250,17 +260,17 @@ async fn test_http_server(test: impl HttpTest) { drop(storage); let (stop_sender, stop_receiver) = watch::channel(false); - let server_handles = spawn_http_server( + let mut server_handles = spawn_http_server( &network_config, pool.clone(), test.transaction_executor(), stop_receiver, ) .await; - server_handles.wait_until_ready().await; + let local_addr = server_handles.wait_until_ready().await; let client = ::builder() - .build(format!("http://{}/", server_handles.local_addr)) + .build(format!("http://{local_addr}/")) .unwrap(); test.test(&client, &pool).await.unwrap(); @@ -440,27 +450,12 @@ impl HttpTest for BlockMethodsWithSnapshotRecovery { StorageInitialization::empty_recovery() } - async fn test(&self, client: &HttpClient, pool: &ConnectionPool) -> anyhow::Result<()> { - let error = client.get_block_number().await.unwrap_err(); - if let ClientError::Call(error) = error { - assert_eq!(error.code(), ErrorCode::InvalidParams.code()); - } else { - panic!("Unexpected error: {error:?}"); - } - - let block = client - .get_block_by_number(api::BlockNumber::Latest, false) - .await?; - assert!(block.is_none()); + async fn test(&self, client: &HttpClient, _pool: &ConnectionPool) -> anyhow::Result<()> { let block = client.get_block_by_number(1_000.into(), false).await?; assert!(block.is_none()); - let mut storage = pool.access_storage().await?; - store_miniblock(&mut storage, MiniblockNumber(24), &[]).await?; - drop(storage); - - let block_number = client.get_block_number().await?; let expected_block_number = StorageInitialization::SNAPSHOT_RECOVERY_BLOCK + 1; + let block_number = client.get_block_number().await?; assert_eq!(block_number, expected_block_number.0.into()); for block_number in [api::BlockNumber::Latest, expected_block_number.0.into()] { @@ -471,7 +466,7 @@ impl HttpTest for BlockMethodsWithSnapshotRecovery { assert_eq!(block.number, expected_block_number.0.into()); } - for number in [0, 1, expected_block_number.0 - 1] { + for number in [0, 1, StorageInitialization::SNAPSHOT_RECOVERY_BLOCK.0] { let error = client .get_block_details(MiniblockNumber(number)) .await @@ -528,21 +523,9 @@ impl HttpTest for L1BatchMethodsWithSnapshotRecovery { StorageInitialization::empty_recovery() } - async fn test(&self, client: &HttpClient, pool: &ConnectionPool) -> anyhow::Result<()> { - let error = client.get_l1_batch_number().await.unwrap_err(); - if let ClientError::Call(error) = error { - assert_eq!(error.code(), ErrorCode::InvalidParams.code()); - } else { - panic!("Unexpected error: {error:?}"); - } - - let mut storage = pool.access_storage().await?; + async fn test(&self, client: &HttpClient, _pool: &ConnectionPool) -> anyhow::Result<()> { let miniblock_number = StorageInitialization::SNAPSHOT_RECOVERY_BLOCK + 1; let l1_batch_number = StorageInitialization::SNAPSHOT_RECOVERY_BATCH + 1; - store_miniblock(&mut storage, miniblock_number, &[]).await?; - seal_l1_batch(&mut storage, l1_batch_number).await?; - drop(storage); - assert_eq!( client.get_l1_batch_number().await?, l1_batch_number.0.into() @@ -631,9 +614,7 @@ impl HttpTest for StorageAccessWithSnapshotRecovery { StorageInitialization::Recovery { logs, factory_deps } } - async fn test(&self, client: &HttpClient, pool: &ConnectionPool) -> anyhow::Result<()> { - let mut storage = pool.access_storage().await?; - + async fn test(&self, client: &HttpClient, _pool: &ConnectionPool) -> anyhow::Result<()> { let address = Address::repeat_byte(1); let first_local_miniblock = StorageInitialization::SNAPSHOT_RECOVERY_BLOCK + 1; for number in [0, 1, first_local_miniblock.0 - 1] { @@ -649,9 +630,6 @@ impl HttpTest for StorageAccessWithSnapshotRecovery { assert_pruned_block_error(&error, first_local_miniblock); } - store_miniblock(&mut storage, first_local_miniblock, &[]).await?; - drop(storage); - for number in [api::BlockNumber::Latest, first_local_miniblock.0.into()] { let number = api::BlockIdVariant::BlockNumber(number); let code = client.get_code(address, Some(number)).await?; @@ -806,8 +784,6 @@ impl HttpTest for TransactionCountAfterSnapshotRecoveryTest { } let latest_miniblock_number = StorageInitialization::SNAPSHOT_RECOVERY_BLOCK + 1; - store_miniblock(&mut storage, latest_miniblock_number, &[]).await?; - let latest_block_numbers = [api::BlockNumber::Latest, latest_miniblock_number.0.into()]; for number in latest_block_numbers { let number = api::BlockIdVariant::BlockNumber(number); @@ -860,7 +836,7 @@ impl HttpTest for TransactionReceiptsTest { } let receipts = client - .get_block_receipts(BlockId::Number(miniblock_number.0.into())) + .get_block_receipts(api::BlockId::Number(miniblock_number.0.into())) .await?; assert_eq!(receipts.len(), 2); for (receipt, expected_receipt) in receipts.iter().zip(&expected_receipts) { diff --git a/core/lib/zksync_core/src/api_server/web3/tests/vm.rs b/core/lib/zksync_core/src/api_server/web3/tests/vm.rs index 060c5cafeee8..f8afa306e785 100644 --- a/core/lib/zksync_core/src/api_server/web3/tests/vm.rs +++ b/core/lib/zksync_core/src/api_server/web3/tests/vm.rs @@ -1,12 +1,13 @@ //! Tests for the VM-instantiating methods (e.g., `eth_call`). -// TODO: Test other VM methods (`debug_traceCall`, `eth_estimateGas`) +use std::sync::atomic::{AtomicU32, Ordering}; -use multivm::interface::ExecutionResult; +use multivm::interface::{ExecutionResult, VmRevertReason}; use zksync_types::{ get_intrinsic_constants, transaction_request::CallRequest, L2ChainId, PackedEthSignature, U256, }; use zksync_utils::u256_to_h256; +use zksync_web3_decl::namespaces::DebugNamespaceClient; use super::*; @@ -14,48 +15,62 @@ use super::*; struct CallTest; impl CallTest { - fn call_request() -> CallRequest { + fn call_request(data: &[u8]) -> CallRequest { CallRequest { from: Some(Address::repeat_byte(1)), to: Some(Address::repeat_byte(2)), - data: Some(b"call".to_vec().into()), + data: Some(data.to_vec().into()), + value: Some(4321.into()), + gas: Some(123.into()), ..CallRequest::default() } } -} -#[async_trait] -impl HttpTest for CallTest { - fn transaction_executor(&self) -> MockTransactionExecutor { + fn create_executor(only_block: MiniblockNumber) -> MockTransactionExecutor { let mut tx_executor = MockTransactionExecutor::default(); - tx_executor.insert_call_response( - Self::call_request().data.unwrap().0, + tx_executor.set_call_responses(move |tx, block_args| { + let expected_block_number = match tx.execute.calldata() { + b"pending" => only_block + 1, + b"first" => only_block, + data => panic!("Unexpected calldata: {data:?}"), + }; + assert_eq!(block_args.resolved_block_number(), expected_block_number); + ExecutionResult::Success { output: b"output".to_vec(), - }, - ); + } + }); tx_executor } +} + +#[async_trait] +impl HttpTest for CallTest { + fn transaction_executor(&self) -> MockTransactionExecutor { + Self::create_executor(MiniblockNumber(0)) + } async fn test(&self, client: &HttpClient, _pool: &ConnectionPool) -> anyhow::Result<()> { - let call_result = client.call(Self::call_request(), None).await?; + let call_result = client.call(Self::call_request(b"pending"), None).await?; assert_eq!(call_result.0, b"output"); - let valid_block_numbers = [ - api::BlockNumber::Pending, - api::BlockNumber::Latest, - 0.into(), + let valid_block_numbers_and_calldata = [ + (api::BlockNumber::Pending, b"pending" as &[_]), + (api::BlockNumber::Latest, b"first"), + (0.into(), b"first"), ]; - for number in valid_block_numbers { + for (number, calldata) in valid_block_numbers_and_calldata { let number = api::BlockIdVariant::BlockNumber(number); - let call_result = client.call(Self::call_request(), Some(number)).await?; + let call_result = client + .call(Self::call_request(calldata), Some(number)) + .await?; assert_eq!(call_result.0, b"output"); } let invalid_block_number = api::BlockNumber::from(100); let number = api::BlockIdVariant::BlockNumber(invalid_block_number); let error = client - .call(Self::call_request(), Some(number)) + .call(Self::call_request(b"100"), Some(number)) .await .unwrap_err(); if let ClientError::Call(error) = error { @@ -83,50 +98,41 @@ impl HttpTest for CallTestAfterSnapshotRecovery { } fn transaction_executor(&self) -> MockTransactionExecutor { - CallTest.transaction_executor() + let first_local_miniblock = StorageInitialization::SNAPSHOT_RECOVERY_BLOCK + 1; + CallTest::create_executor(first_local_miniblock) } - async fn test(&self, client: &HttpClient, pool: &ConnectionPool) -> anyhow::Result<()> { - let call_result = client.call(CallTest::call_request(), None).await?; + async fn test(&self, client: &HttpClient, _pool: &ConnectionPool) -> anyhow::Result<()> { + let call_result = client + .call(CallTest::call_request(b"pending"), None) + .await?; assert_eq!(call_result.0, b"output"); let pending_block_number = api::BlockIdVariant::BlockNumber(api::BlockNumber::Pending); let call_result = client - .call(CallTest::call_request(), Some(pending_block_number)) + .call( + CallTest::call_request(b"pending"), + Some(pending_block_number), + ) .await?; assert_eq!(call_result.0, b"output"); let first_local_miniblock = StorageInitialization::SNAPSHOT_RECOVERY_BLOCK + 1; - let first_miniblock_numbers = [api::BlockNumber::Latest, first_local_miniblock.0.into()]; - for number in first_miniblock_numbers { - let number = api::BlockIdVariant::BlockNumber(number); - let error = client - .call(CallTest::call_request(), Some(number)) - .await - .unwrap_err(); - if let ClientError::Call(error) = error { - assert_eq!(error.code(), ErrorCode::InvalidParams.code()); - } else { - panic!("Unexpected error: {error:?}"); - } - } - let pruned_block_numbers = [0, 1, StorageInitialization::SNAPSHOT_RECOVERY_BLOCK.0]; for number in pruned_block_numbers { let number = api::BlockIdVariant::BlockNumber(number.into()); let error = client - .call(CallTest::call_request(), Some(number)) + .call(CallTest::call_request(b"pruned"), Some(number)) .await .unwrap_err(); - assert_pruned_block_error(&error, StorageInitialization::SNAPSHOT_RECOVERY_BLOCK + 1); + assert_pruned_block_error(&error, first_local_miniblock); } - let mut storage = pool.access_storage().await?; - store_miniblock(&mut storage, first_local_miniblock, &[]).await?; - drop(storage); - + let first_miniblock_numbers = [api::BlockNumber::Latest, first_local_miniblock.0.into()]; for number in first_miniblock_numbers { let number = api::BlockIdVariant::BlockNumber(number); - let call_result = client.call(CallTest::call_request(), Some(number)).await?; + let call_result = client + .call(CallTest::call_request(b"first"), Some(number)) + .await?; assert_eq!(call_result.0, b"output"); } Ok(()) @@ -145,9 +151,7 @@ struct SendRawTransactionTest { impl SendRawTransactionTest { fn transaction_bytes_and_hash() -> (Vec, H256) { - let private_key = H256::repeat_byte(11); - let address = PackedEthSignature::address_from_private_key(&private_key).unwrap(); - + let (private_key, address) = Self::private_key_and_address(); let tx_request = api::TransactionRequest { chain_id: Some(L2ChainId::default().as_u64()), from: Some(address), @@ -172,9 +176,14 @@ impl SendRawTransactionTest { (data.into(), tx_hash) } - fn balance_storage_log() -> StorageLog { + fn private_key_and_address() -> (H256, Address) { let private_key = H256::repeat_byte(11); let address = PackedEthSignature::address_from_private_key(&private_key).unwrap(); + (private_key, address) + } + + fn balance_storage_log() -> StorageLog { + let (_, address) = Self::private_key_and_address(); let balance_key = storage_key_for_eth_balance(&address); StorageLog::new_write_log(balance_key, u256_to_h256(U256::one() << 64)) } @@ -196,10 +205,16 @@ impl HttpTest for SendRawTransactionTest { fn transaction_executor(&self) -> MockTransactionExecutor { let mut tx_executor = MockTransactionExecutor::default(); - tx_executor.insert_tx_response( - Self::transaction_bytes_and_hash().1, - ExecutionResult::Success { output: vec![] }, - ); + let pending_block = if self.snapshot_recovery { + StorageInitialization::SNAPSHOT_RECOVERY_BLOCK + 2 + } else { + MiniblockNumber(1) + }; + tx_executor.set_tx_responses(move |tx, block_args| { + assert_eq!(tx.hash(), Self::transaction_bytes_and_hash().1); + assert_eq!(block_args.resolved_block_number(), pending_block); + ExecutionResult::Success { output: vec![] } + }); tx_executor } @@ -238,3 +253,230 @@ async fn send_raw_transaction_after_snapshot_recovery() { }) .await; } + +#[derive(Debug)] +struct TraceCallTest; + +impl TraceCallTest { + fn assert_debug_call(call_request: &CallRequest, call_result: &api::DebugCall) { + assert_eq!(call_result.from, Address::zero()); + assert_eq!(call_result.gas, call_request.gas.unwrap()); + assert_eq!(call_result.value, call_request.value.unwrap()); + assert_eq!(call_result.input, *call_request.data.as_ref().unwrap()); + assert_eq!(call_result.output.0, b"output"); + } +} + +#[async_trait] +impl HttpTest for TraceCallTest { + fn transaction_executor(&self) -> MockTransactionExecutor { + CallTest::create_executor(MiniblockNumber(0)) + } + + async fn test(&self, client: &HttpClient, _pool: &ConnectionPool) -> anyhow::Result<()> { + let call_request = CallTest::call_request(b"pending"); + let call_result = client.trace_call(call_request.clone(), None, None).await?; + Self::assert_debug_call(&call_request, &call_result); + let pending_block_number = api::BlockId::Number(api::BlockNumber::Pending); + let call_result = client + .trace_call(call_request.clone(), Some(pending_block_number), None) + .await?; + Self::assert_debug_call(&call_request, &call_result); + + let genesis_block_numbers = [ + api::BlockNumber::Earliest, + api::BlockNumber::Latest, + 0.into(), + ]; + let call_request = CallTest::call_request(b"first"); + for number in genesis_block_numbers { + let call_result = client + .trace_call( + call_request.clone(), + Some(api::BlockId::Number(number)), + None, + ) + .await?; + Self::assert_debug_call(&call_request, &call_result); + } + + let invalid_block_number = api::BlockNumber::from(100); + let error = client + .trace_call( + CallTest::call_request(b"100"), + Some(api::BlockId::Number(invalid_block_number)), + None, + ) + .await + .unwrap_err(); + if let ClientError::Call(error) = error { + assert_eq!(error.code(), ErrorCode::InvalidParams.code()); + } else { + panic!("Unexpected error: {error:?}"); + } + + Ok(()) + } +} + +#[tokio::test] +async fn trace_call_basics() { + test_http_server(TraceCallTest).await; +} + +#[derive(Debug)] +struct TraceCallTestAfterSnapshotRecovery; + +#[async_trait] +impl HttpTest for TraceCallTestAfterSnapshotRecovery { + fn storage_initialization(&self) -> StorageInitialization { + StorageInitialization::empty_recovery() + } + + fn transaction_executor(&self) -> MockTransactionExecutor { + let number = StorageInitialization::SNAPSHOT_RECOVERY_BLOCK + 1; + CallTest::create_executor(number) + } + + async fn test(&self, client: &HttpClient, _pool: &ConnectionPool) -> anyhow::Result<()> { + let call_request = CallTest::call_request(b"pending"); + let call_result = client.trace_call(call_request.clone(), None, None).await?; + TraceCallTest::assert_debug_call(&call_request, &call_result); + let pending_block_number = api::BlockId::Number(api::BlockNumber::Pending); + let call_result = client + .trace_call(call_request.clone(), Some(pending_block_number), None) + .await?; + TraceCallTest::assert_debug_call(&call_request, &call_result); + + let first_local_miniblock = StorageInitialization::SNAPSHOT_RECOVERY_BLOCK + 1; + let pruned_block_numbers = [0, 1, StorageInitialization::SNAPSHOT_RECOVERY_BLOCK.0]; + for number in pruned_block_numbers { + let number = api::BlockIdVariant::BlockNumber(number.into()); + let error = client + .call(CallTest::call_request(b"pruned"), Some(number)) + .await + .unwrap_err(); + assert_pruned_block_error(&error, first_local_miniblock); + } + + let call_request = CallTest::call_request(b"first"); + let first_miniblock_numbers = [api::BlockNumber::Latest, first_local_miniblock.0.into()]; + for number in first_miniblock_numbers { + let number = api::BlockId::Number(number); + let call_result = client + .trace_call(call_request.clone(), Some(number), None) + .await?; + TraceCallTest::assert_debug_call(&call_request, &call_result); + } + Ok(()) + } +} + +#[tokio::test] +async fn trace_call_after_snapshot_recovery() { + test_http_server(TraceCallTestAfterSnapshotRecovery).await; +} + +#[derive(Debug)] +struct EstimateGasTest { + gas_limit_threshold: Arc, + snapshot_recovery: bool, +} + +impl EstimateGasTest { + fn new(snapshot_recovery: bool) -> Self { + Self { + gas_limit_threshold: Arc::default(), + snapshot_recovery, + } + } +} + +#[async_trait] +impl HttpTest for EstimateGasTest { + fn storage_initialization(&self) -> StorageInitialization { + let snapshot_recovery = self.snapshot_recovery; + SendRawTransactionTest { snapshot_recovery }.storage_initialization() + } + + fn transaction_executor(&self) -> MockTransactionExecutor { + let mut tx_executor = MockTransactionExecutor::default(); + let pending_block_number = if self.snapshot_recovery { + StorageInitialization::SNAPSHOT_RECOVERY_BLOCK + 2 + } else { + MiniblockNumber(1) + }; + let gas_limit_threshold = self.gas_limit_threshold.clone(); + tx_executor.set_call_responses(move |tx, block_args| { + assert_eq!(tx.execute.calldata(), [] as [u8; 0]); + assert_eq!(tx.nonce(), Some(Nonce(0))); + assert_eq!(block_args.resolved_block_number(), pending_block_number); + + let gas_limit_threshold = gas_limit_threshold.load(Ordering::SeqCst); + if tx.gas_limit() >= U256::from(gas_limit_threshold) { + ExecutionResult::Success { output: vec![] } + } else { + ExecutionResult::Revert { + output: VmRevertReason::VmError, + } + } + }); + tx_executor + } + + async fn test(&self, client: &HttpClient, pool: &ConnectionPool) -> anyhow::Result<()> { + let l2_transaction = create_l2_transaction(10, 100); + for threshold in [10_000, 50_000, 100_000, 1_000_000] { + self.gas_limit_threshold.store(threshold, Ordering::Relaxed); + let output = client + .estimate_gas(l2_transaction.clone().into(), None) + .await?; + assert!( + output >= U256::from(threshold), + "{output} for threshold {threshold}" + ); + assert!( + output < U256::from(threshold) * 2, + "{output} for threshold {threshold}" + ); + } + + // Check transaction with value. + if !self.snapshot_recovery { + // Manually set sufficient balance for the transaction account. + let storage_log = SendRawTransactionTest::balance_storage_log(); + let mut storage = pool.access_storage().await?; + storage + .storage_logs_dal() + .append_storage_logs(MiniblockNumber(0), &[(H256::zero(), vec![storage_log])]) + .await; + } + let mut call_request = CallRequest::from(l2_transaction); + call_request.from = Some(SendRawTransactionTest::private_key_and_address().1); + call_request.value = Some(1_000_000.into()); + client.estimate_gas(call_request.clone(), None).await?; + + call_request.value = Some(U256::max_value()); + let error = client.estimate_gas(call_request, None).await.unwrap_err(); + if let ClientError::Call(error) = error { + let error_msg = error.message(); + assert!( + error_msg.to_lowercase().contains("insufficient"), + "{error_msg}" + ); + } else { + panic!("Unexpected error: {error:?}"); + } + Ok(()) + } +} + +#[tokio::test] +async fn estimate_gas_basics() { + test_http_server(EstimateGasTest::new(false)).await; +} + +#[tokio::test] +async fn estimate_gas_after_snapshot_recovery() { + test_http_server(EstimateGasTest::new(true)).await; +} diff --git a/core/lib/zksync_core/src/api_server/web3/tests/ws.rs b/core/lib/zksync_core/src/api_server/web3/tests/ws.rs index 818a3d34564c..8294915f2702 100644 --- a/core/lib/zksync_core/src/api_server/web3/tests/ws.rs +++ b/core/lib/zksync_core/src/api_server/web3/tests/ws.rs @@ -172,17 +172,17 @@ async fn test_ws_server(test: impl WsTest) { drop(storage); let (stop_sender, stop_receiver) = watch::channel(false); - let (server_handles, pub_sub_events) = spawn_ws_server( + let (mut server_handles, pub_sub_events) = spawn_ws_server( &network_config, pool.clone(), stop_receiver, test.websocket_requests_per_minute_limit(), ) .await; - server_handles.wait_until_ready().await; + let local_addr = server_handles.wait_until_ready().await; let client = WsClientBuilder::default() - .build(format!("ws://{}", server_handles.local_addr)) + .build(format!("ws://{local_addr}")) .await .unwrap(); test.test(&client, &pool, pub_sub_events).await.unwrap(); @@ -267,7 +267,7 @@ impl WsTest for BasicSubscriptionsTest { let tx_result = execute_l2_transaction(create_l2_transaction(1, 2)); let new_tx_hash = tx_result.hash; let miniblock_number = if self.snapshot_recovery { - StorageInitialization::SNAPSHOT_RECOVERY_BLOCK + 1 + StorageInitialization::SNAPSHOT_RECOVERY_BLOCK + 2 } else { MiniblockNumber(1) }; @@ -389,12 +389,12 @@ impl WsTest for LogSubscriptionsTest { } = LogSubscriptions::new(client, &mut pub_sub_events).await?; let mut storage = pool.access_storage().await?; - let miniblock_number = if self.snapshot_recovery { - StorageInitialization::SNAPSHOT_RECOVERY_BLOCK.0 + 1 + let next_miniblock_number = if self.snapshot_recovery { + StorageInitialization::SNAPSHOT_RECOVERY_BLOCK.0 + 2 } else { 1 }; - let (tx_location, events) = store_events(&mut storage, miniblock_number, 0).await?; + let (tx_location, events) = store_events(&mut storage, next_miniblock_number, 0).await?; drop(storage); let events: Vec<_> = events.iter().collect(); @@ -403,7 +403,7 @@ impl WsTest for LogSubscriptionsTest { assert_eq!(log.transaction_index, Some(0.into())); assert_eq!(log.log_index, Some(i.into())); assert_eq!(log.transaction_hash, Some(tx_location.tx_hash)); - assert_eq!(log.block_number, Some(miniblock_number.into())); + assert_eq!(log.block_number, Some(next_miniblock_number.into())); } assert_logs_match(&all_logs, &events); diff --git a/core/lib/zksync_core/src/lib.rs b/core/lib/zksync_core/src/lib.rs index 4a3f1b0e593d..de90e5213e5d 100644 --- a/core/lib/zksync_core/src/lib.rs +++ b/core/lib/zksync_core/src/lib.rs @@ -415,8 +415,8 @@ pub async fn initialize_components( let elapsed = started_at.elapsed(); APP_METRICS.init_latency[&InitStage::HttpApi].set(elapsed); tracing::info!( - "Initialized HTTP API on {:?} in {elapsed:?}", - server_handles.local_addr + "Initialized HTTP API on port {:?} in {elapsed:?}", + api_config.web3_json_rpc.http_port ); } @@ -453,8 +453,8 @@ pub async fn initialize_components( let elapsed = started_at.elapsed(); APP_METRICS.init_latency[&InitStage::WsApi].set(elapsed); tracing::info!( - "initialized WS API on {:?} in {elapsed:?}", - server_handles.local_addr + "Initialized WS API on port {} in {elapsed:?}", + api_config.web3_json_rpc.ws_port ); } diff --git a/core/lib/zksync_core/src/state_keeper/mempool_actor.rs b/core/lib/zksync_core/src/state_keeper/mempool_actor.rs index d4f3e4961945..c00da131bab5 100644 --- a/core/lib/zksync_core/src/state_keeper/mempool_actor.rs +++ b/core/lib/zksync_core/src/state_keeper/mempool_actor.rs @@ -13,7 +13,7 @@ use zksync_types::H256; use zksync_types::{get_nonce_key, Address, Nonce, Transaction, VmVersion}; use super::{metrics::KEEPER_METRICS, types::MempoolGuard}; -use crate::{api_server::execution_sandbox::BlockArgs, fee_model::BatchFeeModelInputProvider}; +use crate::{fee_model::BatchFeeModelInputProvider, utils::pending_protocol_version}; /// Creates a mempool filter for L2 transactions based on the current L1 gas price. /// The filter is used to filter out transactions from the mempool that do not cover expenses @@ -89,15 +89,9 @@ impl MempoolFetcher { let latency = KEEPER_METRICS.mempool_sync.start(); let mut storage = pool.access_storage_tagged("state_keeper").await?; let mempool_info = self.mempool.get_mempool_info(); - - let latest_miniblock = BlockArgs::pending(&mut storage) - .await - .context("failed obtaining latest miniblock")?; - let protocol_version = latest_miniblock - .resolve_block_info(&mut storage) + let protocol_version = pending_protocol_version(&mut storage) .await - .with_context(|| format!("failed resolving block info for {latest_miniblock:?}"))? - .protocol_version; + .context("failed getting pending protocol version")?; let l2_tx_filter = l2_tx_filter( self.batch_fee_input_provider.as_ref(), diff --git a/core/lib/zksync_core/src/sync_layer/tests.rs b/core/lib/zksync_core/src/sync_layer/tests.rs index fc78b7d9678b..1c57bc075ad7 100644 --- a/core/lib/zksync_core/src/sync_layer/tests.rs +++ b/core/lib/zksync_core/src/sync_layer/tests.rs @@ -489,24 +489,40 @@ pub(super) async fn mock_l1_batch_hash_computation(pool: ConnectionPool, number: /// Returns tx hashes of all generated transactions, grouped by the L1 batch. pub(super) async fn run_state_keeper_with_multiple_l1_batches( pool: ConnectionPool, -) -> Vec> { - ensure_genesis(&mut pool.access_storage().await.unwrap()).await; + snapshot_recovery: bool, +) -> (SnapshotRecoveryStatus, Vec>) { + let mut storage = pool.access_storage().await.unwrap(); + let snapshot = if snapshot_recovery { + prepare_recovery_snapshot(&mut storage, L1BatchNumber(23), MiniblockNumber(42), &[]).await + } else { + ensure_genesis(&mut storage).await; + genesis_snapshot_recovery_status() + }; + drop(storage); - let l1_batch = open_l1_batch(1, 1, 1); + let l1_batch = open_l1_batch( + snapshot.l1_batch_number.0 + 1, + snapshot.miniblock_timestamp + 1, + snapshot.miniblock_number.0 + 1, + ); let first_tx = create_l2_transaction(10, 100); let first_tx_hash = first_tx.hash(); let first_tx = SyncAction::Tx(Box::new(first_tx.into())); let first_l1_batch_actions = vec![l1_batch, first_tx, SyncAction::SealMiniblock]; let fictive_miniblock = SyncAction::Miniblock { - number: MiniblockNumber(2), - timestamp: 2, + number: snapshot.miniblock_number + 2, + timestamp: snapshot.miniblock_timestamp + 2, virtual_blocks: 0, }; let seal_l1_batch = SyncAction::SealBatch { virtual_blocks: 0 }; let fictive_miniblock_actions = vec![fictive_miniblock, seal_l1_batch]; - let l1_batch = open_l1_batch(2, 3, 3); + let l1_batch = open_l1_batch( + snapshot.l1_batch_number.0 + 2, + snapshot.miniblock_timestamp + 3, + snapshot.miniblock_number.0 + 3, + ); let second_tx = create_l2_transaction(10, 100); let second_tx_hash = second_tx.hash(); let second_tx = SyncAction::Tx(Box::new(second_tx.into())); @@ -524,20 +540,23 @@ pub(super) async fn run_state_keeper_with_multiple_l1_batches( actions_sender.push_actions(fictive_miniblock_actions).await; actions_sender.push_actions(second_l1_batch_actions).await; - let hash_task = tokio::spawn(mock_l1_batch_hash_computation(pool.clone(), 1)); + let hash_task = tokio::spawn(mock_l1_batch_hash_computation( + pool.clone(), + snapshot.l1_batch_number.0 + 1, + )); // Wait until the miniblocks are sealed. state_keeper - .wait(|state| state.get_local_block() == MiniblockNumber(3)) + .wait(|state| state.get_local_block() == snapshot.miniblock_number + 3) .await; hash_task.await.unwrap(); - vec![vec![first_tx_hash], vec![second_tx_hash]] + (snapshot, vec![vec![first_tx_hash], vec![second_tx_hash]]) } #[tokio::test] async fn external_io_with_multiple_l1_batches() { let pool = ConnectionPool::test_pool().await; - run_state_keeper_with_multiple_l1_batches(pool.clone()).await; + run_state_keeper_with_multiple_l1_batches(pool.clone(), false).await; let mut storage = pool.access_storage().await.unwrap(); let l1_batch_header = storage @@ -650,23 +669,23 @@ async fn fetcher_basics() { #[tokio::test] async fn fetcher_with_real_server(snapshot_recovery: bool) { let pool = ConnectionPool::test_pool().await; - // Fill in transactions grouped in multiple miniblocks in the storage. + // Fill in transactions grouped in multiple L1 batches in the storage. We need at least one L1 batch, + // so that the API server doesn't hang up waiting for it. let (snapshot, tx_hashes) = - run_state_keeper_with_multiple_miniblocks(pool.clone(), snapshot_recovery).await; - let mut tx_hashes = VecDeque::from(tx_hashes); + run_state_keeper_with_multiple_l1_batches(pool.clone(), snapshot_recovery).await; + let mut tx_hashes: VecDeque<_> = tx_hashes.into_iter().flatten().collect(); // Start the API server. let network_config = NetworkConfig::for_tests(); let (stop_sender, stop_receiver) = watch::channel(false); - let server_handles = spawn_http_server( + let mut server_handles = spawn_http_server( &network_config, pool.clone(), Default::default(), stop_receiver.clone(), ) .await; - server_handles.wait_until_ready().await; - let server_addr = &server_handles.local_addr; + let server_addr = &server_handles.wait_until_ready().await; // Start the fetcher connected to the API server. let sync_state = SyncState::default(); @@ -688,10 +707,12 @@ async fn fetcher_with_real_server(snapshot_recovery: bool) { // Check generated actions. let mut current_miniblock_number = snapshot.miniblock_number; + let mut current_l1_batch_number = snapshot.l1_batch_number + 1; let mut tx_count_in_miniblock = 0; let miniblock_number_to_tx_count = HashMap::from([ - (snapshot.miniblock_number + 1, 5), - (snapshot.miniblock_number + 2, 3), + (snapshot.miniblock_number + 1, 1), + (snapshot.miniblock_number + 2, 0), + (snapshot.miniblock_number + 3, 1), ]); let started_at = Instant::now(); let deadline = started_at + TEST_TIMEOUT; @@ -699,18 +720,20 @@ async fn fetcher_with_real_server(snapshot_recovery: bool) { let action = tokio::time::timeout_at(deadline.into(), actions.recv_action()) .await .unwrap(); - match action { + match dbg!(action) { SyncAction::OpenBatch { number, first_miniblock_info, .. } => { - assert_eq!(number, snapshot.l1_batch_number + 1); + assert_eq!(number, current_l1_batch_number); current_miniblock_number += 1; // First miniblock is implicitly opened tx_count_in_miniblock = 0; assert_eq!(first_miniblock_info.0, current_miniblock_number); } - SyncAction::SealBatch { .. } => unreachable!("L1 batches are not sealed in test"), + SyncAction::SealBatch { .. } => { + current_l1_batch_number += 1; + } SyncAction::Miniblock { number, .. } => { current_miniblock_number += 1; tx_count_in_miniblock = 0; @@ -725,7 +748,7 @@ async fn fetcher_with_real_server(snapshot_recovery: bool) { tx_count_in_miniblock, miniblock_number_to_tx_count[¤t_miniblock_number] ); - if current_miniblock_number == snapshot.miniblock_number + 2 { + if current_miniblock_number == snapshot.miniblock_number + 3 { break; } } diff --git a/core/lib/zksync_core/src/utils/mod.rs b/core/lib/zksync_core/src/utils/mod.rs index 9b3fa3da7991..ad2e4bd05412 100644 --- a/core/lib/zksync_core/src/utils/mod.rs +++ b/core/lib/zksync_core/src/utils/mod.rs @@ -1,12 +1,16 @@ //! Miscellaneous utils used by multiple components. -use std::{future::Future, time::Duration}; +use std::{ + future::Future, + sync::atomic::{AtomicBool, Ordering}, + time::Duration, +}; use anyhow::Context as _; use async_trait::async_trait; use tokio::sync::watch; use zksync_dal::{ConnectionPool, StorageProcessor}; -use zksync_types::L1BatchNumber; +use zksync_types::{L1BatchNumber, ProtocolVersionId}; #[cfg(test)] pub(crate) mod testonly; @@ -126,6 +130,39 @@ pub(crate) async fn projected_first_l1_batch( Ok(snapshot_recovery.map_or(L1BatchNumber(0), |recovery| recovery.l1_batch_number + 1)) } +/// Obtains a protocol version projected to be applied for the next miniblock. This is either the version used by the last +/// sealed miniblock, or (if there are no miniblocks), one referenced in the snapshot recovery record. +pub(crate) async fn pending_protocol_version( + storage: &mut StorageProcessor<'_>, +) -> anyhow::Result { + static WARNED_ABOUT_NO_VERSION: AtomicBool = AtomicBool::new(false); + + let last_miniblock = storage + .blocks_dal() + .get_last_sealed_miniblock_header() + .await + .context("failed getting last sealed miniblock")?; + if let Some(last_miniblock) = last_miniblock { + return Ok(last_miniblock.protocol_version.unwrap_or_else(|| { + // Protocol version should be set for the most recent miniblock even in cases it's not filled + // for old miniblocks, hence the warning. We don't want to rely on this assumption, so we treat + // the lack of it as in other similar places, replacing with the default value. + if !WARNED_ABOUT_NO_VERSION.fetch_or(true, Ordering::Relaxed) { + tracing::warn!("Protocol version not set for recent miniblock: {last_miniblock:?}"); + } + ProtocolVersionId::last_potentially_undefined() + })); + } + // No miniblocks in the storage; use snapshot recovery information. + let snapshot_recovery = storage + .snapshot_recovery_dal() + .get_applied_snapshot_status() + .await + .context("failed getting snapshot recovery status")? + .context("storage contains neither miniblocks, nor snapshot recovery info")?; + Ok(snapshot_recovery.protocol_version) +} + #[cfg(test)] mod tests { use zksync_types::L2ChainId;