From 14224812a911c051f762a8ce07483c7afcf17a02 Mon Sep 17 00:00:00 2001 From: Alex Ostrovski Date: Wed, 13 Mar 2024 20:57:40 +0200 Subject: [PATCH] chore(db): Remove storage logs dedup migration artifacts (#1391) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ## What ❔ - Removes artifacts associated with the storage logs dedup migration. - Does some other minor chores regarding DAL (more error propagation, guaranteeing lossless conversions, etc.). ## Why ❔ The storage logs dedup migration was completed on all environments. Artifacts supporting it are now obsolete and can be removed from the codebase. ## Checklist - [x] PR title corresponds to the body of PR (we generate changelog entries from PRs). - [x] Tests for the changes have been added / updated. - [x] Documentation comments have been added / updated. - [x] Code has been formatted via `zk fmt` and `zk lint`. - [x] Spellcheck has been run via `zk spellcheck`. - [x] Linkcheck has been run via `zk linkcheck`. --- Cargo.lock | 12 -- Cargo.toml | 1 - .../storage_logs_dedup_migration/Cargo.toml | 21 -- .../src/consistency.rs | 66 ------- .../storage_logs_dedup_migration/src/main.rs | 172 ----------------- ...dcde721ca1c652ae2f8db41fb75cecdecb674.json | 22 --- ...7b84c5fd52f0352f0f0e311d054cb9e45b07e.json | 22 --- ...fe400fcba3bfc5cf1b7c2b801508f6673d94e.json | 23 +++ ...78f42a877039094c3d6354928a03dad29451a.json | 15 -- ...98129b44534062f524823666ed432d2fcd345.json | 12 -- ...f1719fd1284af1dbb60ea128550224b52da93.json | 24 +++ ...8aa0f8525832cb4075e831c0d4b23c5675b99.json | 24 --- ...a58f82acd817932415f04bcbd05442ad80c2b.json | 23 --- .../src/basic_witness_input_producer_dal.rs | 8 +- core/lib/dal/src/blocks_dal.rs | 96 +++++----- core/lib/dal/src/blocks_web3_dal.rs | 24 +-- core/lib/dal/src/eth_sender_dal.rs | 10 +- core/lib/dal/src/events_dal.rs | 8 +- core/lib/dal/src/events_web3_dal.rs | 7 +- core/lib/dal/src/factory_deps_dal.rs | 6 +- core/lib/dal/src/fri_gpu_prover_queue_dal.rs | 18 +- core/lib/dal/src/fri_proof_compressor_dal.rs | 12 +- core/lib/dal/src/fri_prover_dal.rs | 31 +-- .../fri_scheduler_dependency_tracker_dal.rs | 6 +- core/lib/dal/src/fri_witness_generator_dal.rs | 60 +++--- core/lib/dal/src/lib.rs | 3 + .../lib/dal/src/models/storage_transaction.rs | 4 +- core/lib/dal/src/models/tests.rs | 10 +- core/lib/dal/src/proof_generation_dal.rs | 6 +- core/lib/dal/src/protocol_versions_dal.rs | 2 +- .../lib/dal/src/protocol_versions_web3_dal.rs | 2 +- core/lib/dal/src/snapshot_recovery_dal.rs | 8 +- core/lib/dal/src/snapshots_creator_dal.rs | 39 +--- core/lib/dal/src/storage_logs_dal.rs | 181 +++++------------- core/lib/dal/src/storage_logs_dedup_dal.rs | 52 ++--- core/lib/dal/src/storage_web3_dal.rs | 135 +++++-------- core/lib/dal/src/sync_dal.rs | 2 +- core/lib/dal/src/transactions_dal.rs | 26 +-- core/lib/dal/src/transactions_web3_dal.rs | 4 +- core/lib/snapshots_applier/src/lib.rs | 4 +- core/lib/state/src/postgres/mod.rs | 59 ++++-- core/lib/state/src/postgres/tests.rs | 28 +-- core/lib/state/src/rocksdb/recovery.rs | 2 +- core/lib/state/src/rocksdb/tests.rs | 1 + .../src/metadata_calculator/recovery/mod.rs | 2 +- .../src/metadata_calculator/tests.rs | 3 +- .../src/metadata_calculator/updater.rs | 3 +- .../batch_executor/tests/tester.rs | 1 + .../src/state_keeper/io/seal_logic.rs | 3 +- 49 files changed, 431 insertions(+), 872 deletions(-) delete mode 100644 core/bin/storage_logs_dedup_migration/Cargo.toml delete mode 100644 core/bin/storage_logs_dedup_migration/src/consistency.rs 
delete mode 100644 core/bin/storage_logs_dedup_migration/src/main.rs delete mode 100644 core/lib/dal/.sqlx/query-0a3c928a616b5ebc0b977bd773edcde721ca1c652ae2f8db41fb75cecdecb674.json delete mode 100644 core/lib/dal/.sqlx/query-1ad3bbd791f3ff0d31683bf59187b84c5fd52f0352f0f0e311d054cb9e45b07e.json create mode 100644 core/lib/dal/.sqlx/query-2b1136c7781bcdbd9d5d1e6f900fe400fcba3bfc5cf1b7c2b801508f6673d94e.json delete mode 100644 core/lib/dal/.sqlx/query-2d87b294817859e42258136b1cb78f42a877039094c3d6354928a03dad29451a.json delete mode 100644 core/lib/dal/.sqlx/query-72ff9df79e78129cb96d14ece0198129b44534062f524823666ed432d2fcd345.json create mode 100644 core/lib/dal/.sqlx/query-be2c8e525d6867c0d2bd254c73ef1719fd1284af1dbb60ea128550224b52da93.json delete mode 100644 core/lib/dal/.sqlx/query-c192377c08abab9306c5b0844368aa0f8525832cb4075e831c0d4b23c5675b99.json delete mode 100644 core/lib/dal/.sqlx/query-fde16cd2d3de03f4b61625fa453a58f82acd817932415f04bcbd05442ad80c2b.json diff --git a/Cargo.lock b/Cargo.lock index 26b509410e19..048d0cd7f9cf 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6365,18 +6365,6 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a2eb9349b6444b326872e140eb1cf5e7c522154d69e7a0ffb0fb81c06b37543f" -[[package]] -name = "storage_logs_dedup_migration" -version = "0.1.0" -dependencies = [ - "clap 4.4.6", - "tokio", - "zksync_config", - "zksync_dal", - "zksync_env_config", - "zksync_types", -] - [[package]] name = "stringprep" version = "0.1.4" diff --git a/Cargo.toml b/Cargo.toml index a7dab6c4b367..fba46b99db54 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -6,7 +6,6 @@ members = [ "core/bin/external_node", "core/bin/merkle_tree_consistency_checker", "core/bin/snapshots_creator", - "core/bin/storage_logs_dedup_migration", "core/bin/system-constants-generator", "core/bin/verified_sources_fetcher", "core/bin/zksync_server", diff --git a/core/bin/storage_logs_dedup_migration/Cargo.toml b/core/bin/storage_logs_dedup_migration/Cargo.toml deleted file mode 100644 index fb2c5c1d68e0..000000000000 --- a/core/bin/storage_logs_dedup_migration/Cargo.toml +++ /dev/null @@ -1,21 +0,0 @@ -[package] -name = "storage_logs_dedup_migration" -version = "0.1.0" -edition = "2021" -license = "MIT OR Apache-2.0" - -[[bin]] -name = "storage_logs_dedup_migration" -path = "src/main.rs" - -[[bin]] -name = "storage_logs_dedup_migration_consistency_checker" -path = "src/consistency.rs" - -[dependencies] -tokio = { version = "1" } -zksync_dal = { path = "../../lib/dal" } -zksync_types = { path = "../../lib/types" } -zksync_config = { path = "../../lib/config" } -zksync_env_config = { path = "../../lib/env_config" } -clap = { version = "4.2.4", features = ["derive"] } diff --git a/core/bin/storage_logs_dedup_migration/src/consistency.rs b/core/bin/storage_logs_dedup_migration/src/consistency.rs deleted file mode 100644 index dc0b3da389c2..000000000000 --- a/core/bin/storage_logs_dedup_migration/src/consistency.rs +++ /dev/null @@ -1,66 +0,0 @@ -use clap::Parser; -use zksync_config::PostgresConfig; -use zksync_dal::ConnectionPool; -use zksync_env_config::FromEnv; -use zksync_types::MiniblockNumber; - -const MIGRATED_TABLE: &str = "storage_logs"; -const NOT_MIGRATED_TABLE: &str = "storage_logs_backup"; - -#[derive(Debug, Parser)] -#[command( - author = "Matter Labs", - about = "Consistency checker for the migration" -)] -struct Cli { - /// Miniblock number to start check from. - #[arg(long)] - from_miniblock: u32, - /// Miniblock number to check up to. 
- #[arg(long)] - to_miniblock: u32, -} - -#[tokio::main] -async fn main() { - let config = PostgresConfig::from_env().unwrap(); - let opt = Cli::parse(); - let pool = ConnectionPool::singleton(config.replica_url().unwrap()) - .build() - .await - .unwrap(); - let mut connection = pool.access_storage().await.unwrap(); - - println!( - "Consistency check started for miniblock range {}..={}", - opt.from_miniblock, opt.to_miniblock - ); - - for miniblock_number in opt.from_miniblock..=opt.to_miniblock { - let miniblock_number = MiniblockNumber(miniblock_number); - // Load all storage logs of miniblock. - let storage_logs = connection - .storage_logs_dal() - .get_miniblock_storage_logs_from_table(miniblock_number, NOT_MIGRATED_TABLE) - .await; - - for (hashed_key, _, _) in storage_logs { - let value_before_migration = connection - .storage_logs_dal() - .get_storage_value_from_table(hashed_key, miniblock_number, NOT_MIGRATED_TABLE) - .await; - let value_after_migration = connection - .storage_logs_dal() - .get_storage_value_from_table(hashed_key, miniblock_number, MIGRATED_TABLE) - .await; - assert_eq!( - value_before_migration, value_after_migration, - "Found divergency for hashed_key = {hashed_key:?}, miniblock {miniblock_number}" - ); - } - - println!("Processed miniblock {miniblock_number}"); - } - - println!("Finished"); -} diff --git a/core/bin/storage_logs_dedup_migration/src/main.rs b/core/bin/storage_logs_dedup_migration/src/main.rs deleted file mode 100644 index fcaebafc0fe1..000000000000 --- a/core/bin/storage_logs_dedup_migration/src/main.rs +++ /dev/null @@ -1,172 +0,0 @@ -use std::collections::hash_map::{Entry, HashMap}; - -use clap::Parser; -use zksync_config::PostgresConfig; -use zksync_dal::ConnectionPool; -use zksync_env_config::FromEnv; -use zksync_types::{MiniblockNumber, H256}; - -/// When the threshold is reached then the migration is blocked on vacuuming. -const UNVACUUMED_ROWS_THRESHOLD: usize = 2_000_000; - -#[derive(Debug, Parser)] -#[command( - author = "Matter Labs", - about = "Migration that deduplicates rows in storage_logs DB table" -)] -struct Cli { - /// Miniblock number to start migration from. - #[arg(long)] - start_from_miniblock: u32, -} - -/// Blockchain state cache -struct StateCache { - /// (hashed_key => value) mapping. - pub storage: HashMap, - /// Miniblock number cache is valid for. - pub miniblock: Option, - /// Flag indicating if state is initially empty. - pub is_state_initially_empty: bool, -} - -impl StateCache { - /// Loads value from state if present. 
- pub fn get_value(&mut self, hashed_key: H256) -> Option { - if let Entry::Vacant(e) = self.storage.entry(hashed_key) { - if self.is_state_initially_empty { - e.insert(H256::zero()); - } - } - - self.storage.get(&hashed_key).copied() - } -} - -#[tokio::main] -async fn main() { - let config = PostgresConfig::from_env().unwrap(); - let opt = Cli::parse(); - let pool = ConnectionPool::singleton(config.master_url().unwrap()) - .build() - .await - .unwrap(); - let mut connection = pool.access_storage().await.unwrap(); - - let sealed_miniblock = connection - .blocks_dal() - .get_sealed_miniblock_number() - .await - .unwrap() - .expect("Cannot start migration for Postgres recovered from snapshot"); - println!( - "Migration started for miniblock range {}..={}", - opt.start_from_miniblock, sealed_miniblock - ); - - let (previous_miniblock, is_state_initially_empty) = if opt.start_from_miniblock == 0 { - (None, true) - } else { - (Some((opt.start_from_miniblock - 1).into()), false) - }; - - let mut state_cache = StateCache { - storage: HashMap::new(), - miniblock: previous_miniblock, - is_state_initially_empty, - }; - - let mut number_of_unvacuum_rows = 0; - - for miniblock_number in opt.start_from_miniblock..=sealed_miniblock.0 { - let miniblock_number = MiniblockNumber(miniblock_number); - - // Load all storage logs of miniblock. - let storage_logs = connection - .storage_logs_dal() - .get_miniblock_storage_logs(miniblock_number) - .await; - let initial_storage_logs_count = storage_logs.len(); - - // Load previous values from memory. - let prev_values: HashMap<_, _> = storage_logs - .iter() - .map(|(hashed_key, _, _)| (*hashed_key, state_cache.get_value(*hashed_key))) - .collect(); - - // Load missing previous values from database. - let missing_keys: Vec<_> = prev_values - .iter() - .filter_map(|(key, value)| (value.is_none()).then_some(*key)) - .collect(); - - let in_memory_prev_values_iter = prev_values.into_iter().filter_map(|(k, v)| Some((k, v?))); - let prev_values: HashMap<_, _> = if miniblock_number.0 == 0 || missing_keys.is_empty() { - assert!(missing_keys.is_empty()); - in_memory_prev_values_iter.collect() - } else { - let values_for_missing_keys: HashMap<_, _> = connection - .storage_logs_dal() - .get_storage_values(&missing_keys, miniblock_number - 1) - .await - .expect("failed getting storage values for missing keys"); - - in_memory_prev_values_iter - .chain( - values_for_missing_keys - .into_iter() - .map(|(k, v)| (k, v.unwrap_or_else(H256::zero))), - ) - .collect() - }; - - // Effective state for keys that were touched in the current miniblock. - let current_values: HashMap<_, _> = storage_logs - .into_iter() - .map(|(hashed_key, value, operation_number)| (hashed_key, (value, operation_number))) - .collect(); - - // Collect effective storage logs of the miniblock and their operation numbers. - let (effective_logs, op_numbers_to_retain): (Vec<_>, Vec<_>) = current_values - .into_iter() - .filter_map(|(hashed_key, (value, operation_number))| { - let prev_value = prev_values[&hashed_key]; - (value != prev_value).then_some(((hashed_key, value), operation_number as i32)) - }) - .unzip(); - - // Remove others, i.e. non-effective logs from DB. - connection - .storage_logs_dal() - .retain_storage_logs(miniblock_number, &op_numbers_to_retain) - .await; - number_of_unvacuum_rows += initial_storage_logs_count - op_numbers_to_retain.len(); - - // Update state cache. 
- for (key, value) in effective_logs { - state_cache.storage.insert(key, value); - } - state_cache.miniblock = Some(miniblock_number); - - if miniblock_number.0 < 100 || miniblock_number.0 % 100 == 0 { - println!("Deduplicated logs for miniblock {miniblock_number}, number of unvacuumed rows {number_of_unvacuum_rows}"); - } - - if number_of_unvacuum_rows > UNVACUUMED_ROWS_THRESHOLD { - let started_at = std::time::Instant::now(); - println!("Starting vacuuming"); - connection.storage_logs_dal().vacuum_storage_logs().await; - number_of_unvacuum_rows = 0; - println!("Vacuum finished in {:?}", started_at.elapsed()); - } - } - - if number_of_unvacuum_rows > 0 { - let started_at = std::time::Instant::now(); - println!("Starting vacuuming"); - connection.storage_logs_dal().vacuum_storage_logs().await; - println!("Vacuum finished in {:?}", started_at.elapsed()); - } - - println!("Finished"); -} diff --git a/core/lib/dal/.sqlx/query-0a3c928a616b5ebc0b977bd773edcde721ca1c652ae2f8db41fb75cecdecb674.json b/core/lib/dal/.sqlx/query-0a3c928a616b5ebc0b977bd773edcde721ca1c652ae2f8db41fb75cecdecb674.json deleted file mode 100644 index f0e439d0e0b8..000000000000 --- a/core/lib/dal/.sqlx/query-0a3c928a616b5ebc0b977bd773edcde721ca1c652ae2f8db41fb75cecdecb674.json +++ /dev/null @@ -1,22 +0,0 @@ -{ - "db_name": "PostgreSQL", - "query": "SELECT COUNT(*) FROM storage_logs WHERE miniblock_number = $1", - "describe": { - "columns": [ - { - "ordinal": 0, - "name": "count", - "type_info": "Int8" - } - ], - "parameters": { - "Left": [ - "Int8" - ] - }, - "nullable": [ - null - ] - }, - "hash": "0a3c928a616b5ebc0b977bd773edcde721ca1c652ae2f8db41fb75cecdecb674" -} diff --git a/core/lib/dal/.sqlx/query-1ad3bbd791f3ff0d31683bf59187b84c5fd52f0352f0f0e311d054cb9e45b07e.json b/core/lib/dal/.sqlx/query-1ad3bbd791f3ff0d31683bf59187b84c5fd52f0352f0f0e311d054cb9e45b07e.json deleted file mode 100644 index 460f81615bf4..000000000000 --- a/core/lib/dal/.sqlx/query-1ad3bbd791f3ff0d31683bf59187b84c5fd52f0352f0f0e311d054cb9e45b07e.json +++ /dev/null @@ -1,22 +0,0 @@ -{ - "db_name": "PostgreSQL", - "query": "\n SELECT DISTINCT\n ON (hashed_key) hashed_key\n FROM\n (\n SELECT\n *\n FROM\n storage_logs\n WHERE\n miniblock_number > $1\n ) inn\n ", - "describe": { - "columns": [ - { - "ordinal": 0, - "name": "hashed_key", - "type_info": "Bytea" - } - ], - "parameters": { - "Left": [ - "Int8" - ] - }, - "nullable": [ - false - ] - }, - "hash": "1ad3bbd791f3ff0d31683bf59187b84c5fd52f0352f0f0e311d054cb9e45b07e" -} diff --git a/core/lib/dal/.sqlx/query-2b1136c7781bcdbd9d5d1e6f900fe400fcba3bfc5cf1b7c2b801508f6673d94e.json b/core/lib/dal/.sqlx/query-2b1136c7781bcdbd9d5d1e6f900fe400fcba3bfc5cf1b7c2b801508f6673d94e.json new file mode 100644 index 000000000000..58b1236e6f6d --- /dev/null +++ b/core/lib/dal/.sqlx/query-2b1136c7781bcdbd9d5d1e6f900fe400fcba3bfc5cf1b7c2b801508f6673d94e.json @@ -0,0 +1,23 @@ +{ + "db_name": "PostgreSQL", + "query": "\n SELECT\n bytecode\n FROM\n factory_deps\n WHERE\n bytecode_hash = $1\n AND miniblock_number <= $2\n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "bytecode", + "type_info": "Bytea" + } + ], + "parameters": { + "Left": [ + "Bytea", + "Int8" + ] + }, + "nullable": [ + false + ] + }, + "hash": "2b1136c7781bcdbd9d5d1e6f900fe400fcba3bfc5cf1b7c2b801508f6673d94e" +} diff --git a/core/lib/dal/.sqlx/query-2d87b294817859e42258136b1cb78f42a877039094c3d6354928a03dad29451a.json b/core/lib/dal/.sqlx/query-2d87b294817859e42258136b1cb78f42a877039094c3d6354928a03dad29451a.json deleted file mode 100644 
index dbd2e21c1a3e..000000000000 --- a/core/lib/dal/.sqlx/query-2d87b294817859e42258136b1cb78f42a877039094c3d6354928a03dad29451a.json +++ /dev/null @@ -1,15 +0,0 @@ -{ - "db_name": "PostgreSQL", - "query": "\n DELETE FROM storage_logs\n WHERE\n miniblock_number = $1\n AND operation_number != ALL ($2)\n ", - "describe": { - "columns": [], - "parameters": { - "Left": [ - "Int8", - "Int4Array" - ] - }, - "nullable": [] - }, - "hash": "2d87b294817859e42258136b1cb78f42a877039094c3d6354928a03dad29451a" -} diff --git a/core/lib/dal/.sqlx/query-72ff9df79e78129cb96d14ece0198129b44534062f524823666ed432d2fcd345.json b/core/lib/dal/.sqlx/query-72ff9df79e78129cb96d14ece0198129b44534062f524823666ed432d2fcd345.json deleted file mode 100644 index 75f288ee14f6..000000000000 --- a/core/lib/dal/.sqlx/query-72ff9df79e78129cb96d14ece0198129b44534062f524823666ed432d2fcd345.json +++ /dev/null @@ -1,12 +0,0 @@ -{ - "db_name": "PostgreSQL", - "query": "\n VACUUM storage_logs\n ", - "describe": { - "columns": [], - "parameters": { - "Left": [] - }, - "nullable": [] - }, - "hash": "72ff9df79e78129cb96d14ece0198129b44534062f524823666ed432d2fcd345" -} diff --git a/core/lib/dal/.sqlx/query-be2c8e525d6867c0d2bd254c73ef1719fd1284af1dbb60ea128550224b52da93.json b/core/lib/dal/.sqlx/query-be2c8e525d6867c0d2bd254c73ef1719fd1284af1dbb60ea128550224b52da93.json new file mode 100644 index 000000000000..21964d27d60a --- /dev/null +++ b/core/lib/dal/.sqlx/query-be2c8e525d6867c0d2bd254c73ef1719fd1284af1dbb60ea128550224b52da93.json @@ -0,0 +1,24 @@ +{ + "db_name": "PostgreSQL", + "query": "\n SELECT\n bytecode\n FROM\n (\n SELECT\n *\n FROM\n storage_logs\n WHERE\n storage_logs.hashed_key = $1\n AND storage_logs.miniblock_number <= $2\n ORDER BY\n storage_logs.miniblock_number DESC,\n storage_logs.operation_number DESC\n LIMIT\n 1\n ) t\n JOIN factory_deps ON value = factory_deps.bytecode_hash\n WHERE\n value != $3\n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "bytecode", + "type_info": "Bytea" + } + ], + "parameters": { + "Left": [ + "Bytea", + "Int8", + "Bytea" + ] + }, + "nullable": [ + false + ] + }, + "hash": "be2c8e525d6867c0d2bd254c73ef1719fd1284af1dbb60ea128550224b52da93" +} diff --git a/core/lib/dal/.sqlx/query-c192377c08abab9306c5b0844368aa0f8525832cb4075e831c0d4b23c5675b99.json b/core/lib/dal/.sqlx/query-c192377c08abab9306c5b0844368aa0f8525832cb4075e831c0d4b23c5675b99.json deleted file mode 100644 index 58c336bb8328..000000000000 --- a/core/lib/dal/.sqlx/query-c192377c08abab9306c5b0844368aa0f8525832cb4075e831c0d4b23c5675b99.json +++ /dev/null @@ -1,24 +0,0 @@ -{ - "db_name": "PostgreSQL", - "query": "\n SELECT\n bytecode\n FROM\n (\n SELECT\n *\n FROM\n storage_logs\n WHERE\n storage_logs.hashed_key = $1\n AND storage_logs.miniblock_number <= $2\n ORDER BY\n storage_logs.miniblock_number DESC,\n storage_logs.operation_number DESC\n LIMIT\n 1\n ) t\n JOIN factory_deps ON value = factory_deps.bytecode_hash\n WHERE\n value != $3\n ", - "describe": { - "columns": [ - { - "ordinal": 0, - "name": "bytecode", - "type_info": "Bytea" - } - ], - "parameters": { - "Left": [ - "Bytea", - "Int8", - "Bytea" - ] - }, - "nullable": [ - false - ] - }, - "hash": "c192377c08abab9306c5b0844368aa0f8525832cb4075e831c0d4b23c5675b99" -} diff --git a/core/lib/dal/.sqlx/query-fde16cd2d3de03f4b61625fa453a58f82acd817932415f04bcbd05442ad80c2b.json b/core/lib/dal/.sqlx/query-fde16cd2d3de03f4b61625fa453a58f82acd817932415f04bcbd05442ad80c2b.json deleted file mode 100644 index f8ad468d70db..000000000000 --- 
a/core/lib/dal/.sqlx/query-fde16cd2d3de03f4b61625fa453a58f82acd817932415f04bcbd05442ad80c2b.json +++ /dev/null @@ -1,23 +0,0 @@ -{ - "db_name": "PostgreSQL", - "query": "\n SELECT\n bytecode\n FROM\n factory_deps\n WHERE\n bytecode_hash = $1\n AND miniblock_number <= $2\n ", - "describe": { - "columns": [ - { - "ordinal": 0, - "name": "bytecode", - "type_info": "Bytea" - } - ], - "parameters": { - "Left": [ - "Bytea", - "Int8" - ] - }, - "nullable": [ - false - ] - }, - "hash": "fde16cd2d3de03f4b61625fa453a58f82acd817932415f04bcbd05442ad80c2b" -} diff --git a/core/lib/dal/src/basic_witness_input_producer_dal.rs b/core/lib/dal/src/basic_witness_input_producer_dal.rs index ec4c35633d2f..36536ee7cab5 100644 --- a/core/lib/dal/src/basic_witness_input_producer_dal.rs +++ b/core/lib/dal/src/basic_witness_input_producer_dal.rs @@ -58,7 +58,7 @@ impl BasicWitnessInputProducerDal<'_, '_> { ($1, $2, NOW(), NOW()) ON CONFLICT (l1_batch_number) DO NOTHING "#, - l1_batch_number.0 as i64, + i64::from(l1_batch_number.0), BasicWitnessInputProducerJobStatus::Queued as BasicWitnessInputProducerJobStatus, ) .instrument("create_basic_witness_input_producer_job") @@ -134,7 +134,7 @@ impl BasicWitnessInputProducerDal<'_, '_> { WHERE l1_batch_number = $1 "#, - l1_batch_number.0 as i64, + i64::from(l1_batch_number.0), ) .fetch_optional(self.storage.conn()) .await? @@ -161,7 +161,7 @@ impl BasicWitnessInputProducerDal<'_, '_> { l1_batch_number = $2 "#, BasicWitnessInputProducerJobStatus::Successful as BasicWitnessInputProducerJobStatus, - l1_batch_number.0 as i64, + i64::from(l1_batch_number.0), duration_to_naive_time(started_at.elapsed()), object_path, ) @@ -194,7 +194,7 @@ impl BasicWitnessInputProducerDal<'_, '_> { basic_witness_input_producer_jobs.attempts "#, BasicWitnessInputProducerJobStatus::Failed as BasicWitnessInputProducerJobStatus, - l1_batch_number.0 as i64, + i64::from(l1_batch_number.0), duration_to_naive_time(started_at.elapsed()), error, BasicWitnessInputProducerJobStatus::Successful as BasicWitnessInputProducerJobStatus, diff --git a/core/lib/dal/src/blocks_dal.rs b/core/lib/dal/src/blocks_dal.rs index 68b3409e456d..8b2463acc1fb 100644 --- a/core/lib/dal/src/blocks_dal.rs +++ b/core/lib/dal/src/blocks_dal.rs @@ -249,7 +249,7 @@ impl BlocksDal<'_, '_> { WHERE number = $1 "#, - number.0 as i64 + i64::from(number.0) ) .instrument("get_storage_l1_batch") .with_arg("number", &number) @@ -285,7 +285,7 @@ impl BlocksDal<'_, '_> { WHERE number = $1 "#, - number.0 as i64 + i64::from(number.0) ) .instrument("get_l1_batch_header") .with_arg("number", &number) @@ -308,7 +308,7 @@ impl BlocksDal<'_, '_> { WHERE number = $1 "#, - number.0 as i64 + i64::from(number.0) ) .instrument("get_initial_bootloader_heap") .report_latency() @@ -337,7 +337,7 @@ impl BlocksDal<'_, '_> { WHERE number = $1 "#, - number.0 as i64 + i64::from(number.0) ) .instrument("get_storage_refunds") .report_latency() @@ -369,7 +369,7 @@ impl BlocksDal<'_, '_> { WHERE l1_batch_number = $1 "#, - number.0 as i64 + i64::from(number.0) ) .instrument("get_events_queue") .report_latency() @@ -409,8 +409,8 @@ impl BlocksDal<'_, '_> { number BETWEEN $2 AND $3 "#, eth_tx_id as i32, - number_range.start().0 as i64, - number_range.end().0 as i64 + i64::from(number_range.start().0), + i64::from(number_range.end().0) ) .execute(self.storage.conn()) .await?; @@ -426,8 +426,8 @@ impl BlocksDal<'_, '_> { number BETWEEN $2 AND $3 "#, eth_tx_id as i32, - number_range.start().0 as i64, - number_range.end().0 as i64 + i64::from(number_range.start().0), + 
i64::from(number_range.end().0) ) .execute(self.storage.conn()) .await?; @@ -443,8 +443,8 @@ impl BlocksDal<'_, '_> { number BETWEEN $2 AND $3 "#, eth_tx_id as i32, - number_range.start().0 as i64, - number_range.end().0 as i64 + i64::from(number_range.start().0), + i64::from(number_range.end().0) ) .execute(self.storage.conn()) .await?; @@ -485,7 +485,7 @@ impl BlocksDal<'_, '_> { // Serialization should always succeed. let used_contract_hashes = serde_json::to_value(&header.used_contract_hashes) .expect("failed to serialize used_contract_hashes to JSON value"); - let storage_refunds: Vec<_> = storage_refunds.iter().map(|n| *n as i64).collect(); + let storage_refunds: Vec<_> = storage_refunds.iter().copied().map(i64::from).collect(); let mut transaction = self.storage.start_transaction().await?; sqlx::query!( @@ -541,17 +541,17 @@ impl BlocksDal<'_, '_> { NOW() ) "#, - header.number.0 as i64, - header.l1_tx_count as i32, - header.l2_tx_count as i32, + i64::from(header.number.0), + i32::from(header.l1_tx_count), + i32::from(header.l2_tx_count), header.timestamp as i64, &l2_to_l1_logs, &header.l2_to_l1_messages, header.bloom.as_bytes(), &priority_onchain_data, - predicted_block_gas.commit as i64, - predicted_block_gas.prove as i64, - predicted_block_gas.execute as i64, + i64::from(predicted_block_gas.commit), + i64::from(predicted_block_gas.prove), + i64::from(predicted_block_gas.execute), initial_bootloader_contents, used_contract_hashes, header.base_system_contracts_hashes.bootloader.as_bytes(), @@ -574,7 +574,7 @@ impl BlocksDal<'_, '_> { VALUES ($1, '{}', $2) "#, - header.number.0 as i64, + i64::from(header.number.0), &events_queue ) .execute(transaction.conn()) @@ -616,11 +616,11 @@ impl BlocksDal<'_, '_> { VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, NOW(), NOW()) "#, - miniblock_header.number.0 as i64, + i64::from(miniblock_header.number.0), miniblock_header.timestamp as i64, miniblock_header.hash.as_bytes(), - miniblock_header.l1_tx_count as i32, - miniblock_header.l2_tx_count as i32, + i32::from(miniblock_header.l1_tx_count), + i32::from(miniblock_header.l2_tx_count), miniblock_header.fee_account_address.as_bytes(), base_fee_per_gas, miniblock_header.batch_fee_input.l1_gas_price() as i64, @@ -635,7 +635,7 @@ impl BlocksDal<'_, '_> { .default_aa .as_bytes(), miniblock_header.protocol_version.map(|v| v as i32), - miniblock_header.virtual_blocks as i64, + i64::from(miniblock_header.virtual_blocks), miniblock_header.batch_fee_input.fair_pubdata_price() as i64, ) .execute(self.storage.conn()) @@ -716,7 +716,7 @@ impl BlocksDal<'_, '_> { WHERE number = $1 "#, - miniblock_number.0 as i64, + i64::from(miniblock_number.0), ) .fetch_optional(self.storage.conn()) .await?; @@ -771,7 +771,7 @@ impl BlocksDal<'_, '_> { "#, tree_data.hash.as_bytes(), tree_data.rollup_last_leaf_index as i64, - number.0 as i64, + i64::from(number.0), ) .instrument("save_batch_tree_data") .with_arg("number", &number) @@ -793,7 +793,7 @@ impl BlocksDal<'_, '_> { number = $1 AND hash = $2 "#, - number.0 as i64, + i64::from(number.0), tree_data.hash.as_bytes(), ) .instrument("get_matching_batch_hash") @@ -854,7 +854,7 @@ impl BlocksDal<'_, '_> { commitment_artifacts.compressed_state_diffs, commitment_artifacts.compressed_initial_writes, commitment_artifacts.compressed_repeated_writes, - number.0 as i64, + i64::from(number.0), ) .instrument("save_l1_batch_commitment_artifacts") .with_arg("number", &number) @@ -877,7 +877,7 @@ impl BlocksDal<'_, '_> { number = $1 AND commitment = $2 "#, - 
number.0 as i64, + i64::from(number.0), commitment_artifacts.commitment_hash.commitment.as_bytes(), ) .instrument("get_matching_batch_commitment") @@ -904,7 +904,7 @@ impl BlocksDal<'_, '_> { ($1, $2, $3) ON CONFLICT (l1_batch_number) DO NOTHING "#, - number.0 as i64, + i64::from(number.0), commitment_artifacts.aux_commitments.map(|a| a.events_queue_commitment.0.to_vec()), commitment_artifacts.aux_commitments .map(|a| a.bootloader_initial_content_commitment.0.to_vec()), @@ -1039,7 +1039,7 @@ impl BlocksDal<'_, '_> { WHERE number = $1 "#, - l1_batch_number.0 as i64 + i64::from(l1_batch_number.0) ) .fetch_optional(self.storage.conn()) .await?; @@ -1190,7 +1190,7 @@ impl BlocksDal<'_, '_> { WHERE number = $1 "#, - l1_batch_number.0 as i64 + i64::from(l1_batch_number.0) ) .execute(self.storage.conn()) .await?; @@ -1651,7 +1651,7 @@ impl BlocksDal<'_, '_> { WHERE number = $1 "#, - number.0 as i64 + i64::from(number.0) ) .fetch_optional(self.storage.conn()) .await? @@ -1673,7 +1673,7 @@ impl BlocksDal<'_, '_> { WHERE number = $1 "#, - number.0 as i64 + i64::from(number.0) ) .fetch_optional(self.storage.conn()) .await? @@ -1716,7 +1716,7 @@ impl BlocksDal<'_, '_> { WHERE number = $1 "#, - number.0 as i64 + i64::from(number.0) ) .fetch_optional(self.storage.conn()) .await?; @@ -1769,7 +1769,7 @@ impl BlocksDal<'_, '_> { WHERE miniblocks.l1_batch_number = $1 "#, - l1_batch_number.0 as i64 + i64::from(l1_batch_number.0) ) .fetch_all(self.storage.conn()) .await? @@ -1790,7 +1790,7 @@ impl BlocksDal<'_, '_> { &mut self, last_batch_to_keep: Option, ) -> sqlx::Result<()> { - let block_number = last_batch_to_keep.map_or(-1, |number| number.0 as i64); + let block_number = last_batch_to_keep.map_or(-1, |number| i64::from(number.0)); sqlx::query!( r#" DELETE FROM initial_writes @@ -1815,7 +1815,7 @@ impl BlocksDal<'_, '_> { &mut self, last_batch_to_keep: Option, ) -> sqlx::Result<()> { - let block_number = last_batch_to_keep.map_or(-1, |number| number.0 as i64); + let block_number = last_batch_to_keep.map_or(-1, |number| i64::from(number.0)); sqlx::query!( r#" DELETE FROM l1_batches @@ -1842,7 +1842,7 @@ impl BlocksDal<'_, '_> { &mut self, last_miniblock_to_keep: Option, ) -> sqlx::Result<()> { - let block_number = last_miniblock_to_keep.map_or(-1, |number| number.0 as i64); + let block_number = last_miniblock_to_keep.map_or(-1, |number| i64::from(number.0)); sqlx::query!( r#" DELETE FROM miniblocks @@ -1868,8 +1868,8 @@ impl BlocksDal<'_, '_> { sum: BigDecimal, } - let start = number_range.start().0 as i64; - let end = number_range.end().0 as i64; + let start = i64::from(number_range.start().0); + let end = i64::from(number_range.end().0); let query = match_query_as!( SumRow, [ @@ -1904,7 +1904,7 @@ impl BlocksDal<'_, '_> { WHERE l1_batch_number = $1 "#, - l1_batch_number.0 as i64 + i64::from(l1_batch_number.0) ) .fetch_one(self.storage.conn()) .await?; @@ -2007,7 +2007,7 @@ impl BlocksDal<'_, '_> { WHERE number = $1 "#, - l1_batch_number.0 as i64 + i64::from(l1_batch_number.0) ) .fetch_optional(self.storage.conn()) .await? @@ -2033,7 +2033,7 @@ impl BlocksDal<'_, '_> { WHERE number = $1 "#, - miniblock_number.0 as i64 + i64::from(miniblock_number.0) ) .fetch_optional(self.storage.conn()) .await? @@ -2059,7 +2059,7 @@ impl BlocksDal<'_, '_> { WHERE number = $1 "#, - miniblock_number.0 as i64, + i64::from(miniblock_number.0) ) .fetch_optional(self.storage.conn()) .await? 
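
Most hunks in this file replace `number.0 as i64` with `i64::from(number.0)`; the motivation is the `#![warn(clippy::cast_lossless)]` lint this patch enables in `core/lib/dal/src/lib.rs`, which flags `as` casts that have an infallible `From` equivalent. A minimal standalone sketch (illustrative values, not code from this patch) of what the lossless form buys:

```rust
fn main() {
    let miniblock_number: u32 = 4_000_000_000;

    // An `as` cast compiles for any pair of numeric types and silently
    // wraps when the target is too narrow:
    let wrapped = miniblock_number as i32;
    assert!(wrapped < 0); // 4_000_000_000 does not fit in i32

    // `From` is only implemented for conversions that can never lose
    // information, so `u32 -> i64` compiles while a hypothetical
    // `i32::from(miniblock_number)` would be rejected at compile time:
    let lossless = i64::from(miniblock_number);
    assert_eq!(lossless, 4_000_000_000_i64);
}
```

`as` remains only where no `From` impl exists (e.g. `u64` → `i64` for timestamps), which is why a few casts survive the patch.
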
@@ -2256,8 +2256,8 @@ impl BlocksDal<'_, '_> { AND miniblocks.number BETWEEN $1 AND $2 AND miniblocks.fee_account_address = '\x0000000000000000000000000000000000000000'::bytea "#, - numbers.start().0 as i64, - numbers.end().0 as i64 + i64::from(numbers.start().0), + i64::from(numbers.end().0) ) .execute(self.storage.conn()) .await?; @@ -2280,7 +2280,7 @@ impl BlocksDal<'_, '_> { number = $2 "#, fee_account_address.as_bytes(), - l1_batch.0 as i64 + i64::from(l1_batch.0) ) .execute(self.storage.conn()) .await?; @@ -2305,7 +2305,7 @@ impl BlocksDal<'_, '_> { number = $2 "#, hash.as_bytes(), - batch_num.0 as i64 + i64::from(batch_num.0) ) .execute(self.storage.conn()) .await?; diff --git a/core/lib/dal/src/blocks_web3_dal.rs b/core/lib/dal/src/blocks_web3_dal.rs index dc3a1cc40d99..a8cc7a3f8795 100644 --- a/core/lib/dal/src/blocks_web3_dal.rs +++ b/core/lib/dal/src/blocks_web3_dal.rs @@ -52,7 +52,7 @@ impl BlocksWeb3Dal<'_, '_> { ORDER BY transactions.index_in_block ASC "#, - block_number.0 as i64 + i64::from(block_number.0) ) .fetch_all(self.storage.conn()) .await?; @@ -100,7 +100,7 @@ impl BlocksWeb3Dal<'_, '_> { SELECT l1_tx_count + l2_tx_count AS tx_count FROM miniblocks WHERE number = $1 "#, - block_number.0 as i64 + i64::from(block_number.0) ) .fetch_optional(self.storage.conn()) .await? @@ -129,7 +129,7 @@ impl BlocksWeb3Dal<'_, '_> { LIMIT $2 "#, - from_block.0 as i64, + i64::from(from_block.0), limit as i32 ) .fetch_all(self.storage.conn()) @@ -158,7 +158,7 @@ impl BlocksWeb3Dal<'_, '_> { ORDER BY number ASC "#, - from_block.0 as i64, + i64::from(from_block.0), ) .fetch_all(self.storage.conn()) .await?; @@ -329,7 +329,7 @@ impl BlocksWeb3Dal<'_, '_> { WHERE number = $1 "#, - block_number.0 as i64 + i64::from(block_number.0) ) .fetch_optional(self.storage.conn()) .await? @@ -350,7 +350,7 @@ impl BlocksWeb3Dal<'_, '_> { WHERE number = $1 "#, - block_number.0 as i64 + i64::from(block_number.0) ) .fetch_optional(self.storage.conn()) .await? @@ -376,7 +376,7 @@ impl BlocksWeb3Dal<'_, '_> { WHERE number = $1 "#, - miniblock_number.0 as i64 + i64::from(miniblock_number.0) ) .fetch_optional(self.storage.conn()) .await? @@ -399,7 +399,7 @@ impl BlocksWeb3Dal<'_, '_> { WHERE l1_batch_number = $1 "#, - l1_batch_number.0 as i64 + i64::from(l1_batch_number.0) ) .fetch_one(self.storage.conn()) .await?; @@ -460,7 +460,7 @@ impl BlocksWeb3Dal<'_, '_> { ORDER BY transactions.index_in_block "#, - block_number.0 as i64 + i64::from(block_number.0) ) .fetch_all(self.storage.conn()) .await? 
@@ -489,7 +489,7 @@ impl BlocksWeb3Dal<'_, '_> { LIMIT $2 "#, - newest_block.0 as i64, + i64::from(newest_block.0), block_count as i64 ) .fetch_all(self.storage.conn()) @@ -553,7 +553,7 @@ impl BlocksWeb3Dal<'_, '_> { WHERE miniblocks.number = $1 "#, - block_number.0 as i64 + i64::from(block_number.0) ) .instrument("get_block_details") .with_arg("block_number", &block_number) @@ -628,7 +628,7 @@ impl BlocksWeb3Dal<'_, '_> { WHERE l1_batches.number = $1 "#, - l1_batch_number.0 as i64 + i64::from(l1_batch_number.0) ) .instrument("get_l1_batch_details") .with_arg("l1_batch_number", &l1_batch_number) diff --git a/core/lib/dal/src/eth_sender_dal.rs b/core/lib/dal/src/eth_sender_dal.rs index f349aebad5f6..1dfbae8f0a74 100644 --- a/core/lib/dal/src/eth_sender_dal.rs +++ b/core/lib/dal/src/eth_sender_dal.rs @@ -210,8 +210,8 @@ impl EthSenderDal<'_, '_> { nonce as i64, tx_type.to_string(), address, - predicted_gas_cost as i64, - from_address.map(|a| a.0.to_vec()), + i64::from(predicted_gas_cost), + from_address.as_ref().map(Address::as_bytes), blob_sidecar.map(|sidecar| bincode::serialize(&sidecar) .expect("can always bincode serialize EthTxBlobSidecar; qed")), ) @@ -296,7 +296,7 @@ impl EthSenderDal<'_, '_> { WHERE id = $1 "#, - eth_txs_history_id as i64 + eth_txs_history_id as i32 ) .execute(self.storage.conn()) .await?; @@ -363,7 +363,7 @@ impl EthSenderDal<'_, '_> { eth_tx_id = $1 AND confirmed_at IS NOT NULL "#, - eth_tx_id as i64 + eth_tx_id as i32 ) .fetch_optional(self.storage.conn()) .await?; @@ -654,7 +654,7 @@ impl EthSenderDal<'_, '_> { ) ) "#, - last_batch_to_keep.0 as i64 + i64::from(last_batch_to_keep.0) ) .execute(self.storage.conn()) .await?; diff --git a/core/lib/dal/src/events_dal.rs b/core/lib/dal/src/events_dal.rs index fa44b187cace..1231b34e949b 100644 --- a/core/lib/dal/src/events_dal.rs +++ b/core/lib/dal/src/events_dal.rs @@ -104,7 +104,7 @@ impl EventsDal<'_, '_> { WHERE miniblock_number > $1 "#, - block_number.0 as i64 + i64::from(block_number.0) ) .execute(self.storage.conn()) .await @@ -181,7 +181,7 @@ impl EventsDal<'_, '_> { WHERE miniblock_number > $1 "#, - block_number.0 as i64 + i64::from(block_number.0) ) .execute(self.storage.conn()) .await @@ -263,8 +263,8 @@ impl EventsDal<'_, '_> { miniblock_number, event_index_in_block "#, - from_miniblock.0 as i64, - to_miniblock.0 as i64, + i64::from(from_miniblock.0), + i64::from(to_miniblock.0), L1_MESSENGER_ADDRESS.as_bytes(), L1_MESSENGER_BYTECODE_PUBLICATION_EVENT_SIGNATURE.as_bytes() ) diff --git a/core/lib/dal/src/events_web3_dal.rs b/core/lib/dal/src/events_web3_dal.rs index c203232e4861..8b5d5053aea1 100644 --- a/core/lib/dal/src/events_web3_dal.rs +++ b/core/lib/dal/src/events_web3_dal.rs @@ -127,9 +127,8 @@ impl EventsWeb3Dal<'_, '_> { fn build_get_logs_where_clause(&self, filter: &GetLogsFilter) -> (String, u8) { let mut arg_index = 1; - let mut where_sql = format!("(miniblock_number >= {})", filter.from_block.0 as i64); - - where_sql += &format!(" AND (miniblock_number <= {})", filter.to_block.0 as i64); + let mut where_sql = format!("(miniblock_number >= {})", filter.from_block.0); + where_sql += &format!(" AND (miniblock_number <= {})", filter.to_block.0); // Add filters for address (like `address = ANY($1)` or `address = $1`) if let Some(filter_sql) = @@ -245,7 +244,7 @@ impl EventsWeb3Dal<'_, '_> { miniblock_number ASC, event_index_in_block ASC "#, - from_block.0 as i64 + i64::from(from_block.0) ) .fetch_all(self.storage.conn()) .await?; diff --git a/core/lib/dal/src/factory_deps_dal.rs 
b/core/lib/dal/src/factory_deps_dal.rs index bb5f2937c18d..b8dc1bbbfe75 100644 --- a/core/lib/dal/src/factory_deps_dal.rs +++ b/core/lib/dal/src/factory_deps_dal.rs @@ -43,7 +43,7 @@ impl FactoryDepsDal<'_, '_> { "#, &bytecode_hashes as &[&[u8]], &bytecodes as &[&[u8]], - block_number.0 as i64, + i64::from(block_number.0) ) .execute(self.storage.conn()) .await?; @@ -147,7 +147,7 @@ impl FactoryDepsDal<'_, '_> { WHERE miniblock_number > $1 "#, - block_number.0 as i64 + i64::from(block_number.0) ) .fetch_all(self.storage.conn()) .await? @@ -167,7 +167,7 @@ impl FactoryDepsDal<'_, '_> { WHERE miniblock_number > $1 "#, - block_number.0 as i64 + i64::from(block_number.0) ) .execute(self.storage.conn()) .await?; diff --git a/core/lib/dal/src/fri_gpu_prover_queue_dal.rs b/core/lib/dal/src/fri_gpu_prover_queue_dal.rs index 56baa32ba9c8..8ea262f74030 100644 --- a/core/lib/dal/src/fri_gpu_prover_queue_dal.rs +++ b/core/lib/dal/src/fri_gpu_prover_queue_dal.rs @@ -53,7 +53,7 @@ impl FriGpuProverQueueDal<'_, '_> { gpu_prover_queue_fri.* "#, &processing_timeout, - specialized_prover_group_id as i16, + i16::from(specialized_prover_group_id), zone ) .fetch_optional(self.storage.conn()) @@ -95,9 +95,9 @@ impl FriGpuProverQueueDal<'_, '_> { zone = $4, updated_at = NOW() "#, - format!("{}", address.host), - address.port as i32, - specialized_prover_group_id as i16, + address.host.to_string(), + i32::from(address.port), + i16::from(specialized_prover_group_id), zone ) .execute(self.storage.conn()) @@ -122,9 +122,9 @@ impl FriGpuProverQueueDal<'_, '_> { AND instance_port = $3 AND zone = $4 "#, - format!("{:?}", status).to_lowercase(), - format!("{}", address.host), - address.port as i32, + format!("{status:?}").to_lowercase(), + address.host.to_string(), + i32::from(address.port), zone ) .execute(self.storage.conn()) @@ -149,8 +149,8 @@ impl FriGpuProverQueueDal<'_, '_> { AND instance_status = 'full' AND zone = $3 "#, - format!("{}", address.host), - address.port as i32, + address.host.to_string(), + i32::from(address.port), zone ) .execute(self.storage.conn()) diff --git a/core/lib/dal/src/fri_proof_compressor_dal.rs b/core/lib/dal/src/fri_proof_compressor_dal.rs index 959e4304b761..6aecc65d9a4c 100644 --- a/core/lib/dal/src/fri_proof_compressor_dal.rs +++ b/core/lib/dal/src/fri_proof_compressor_dal.rs @@ -45,7 +45,7 @@ impl FriProofCompressorDal<'_, '_> { ($1, $2, $3, NOW(), NOW()) ON CONFLICT (l1_batch_number) DO NOTHING "#, - block_number.0 as i64, + i64::from(block_number.0), fri_proof_blob_url, ProofCompressionJobStatus::Queued.to_string(), ) @@ -63,7 +63,7 @@ impl FriProofCompressorDal<'_, '_> { ($1, $2, NOW(), NOW()) ON CONFLICT (l1_batch_number) DO NOTHING "#, - block_number.0 as i64, + i64::from(block_number.0), ProofCompressionJobStatus::Skipped.to_string(), ) .fetch_optional(self.storage.conn()) @@ -125,7 +125,7 @@ impl FriProofCompressorDal<'_, '_> { WHERE l1_batch_number = $1 "#, - l1_batch_number.0 as i64, + i64::from(l1_batch_number.0) ) .fetch_optional(self.storage.conn()) .await? 
@@ -154,7 +154,7 @@ impl FriProofCompressorDal<'_, '_> { ProofCompressionJobStatus::Successful.to_string(), duration_to_naive_time(time_taken), l1_proof_blob_url, - block_number.0 as i64, + i64::from(block_number.0) ) .execute(self.storage.conn()) .await @@ -178,7 +178,7 @@ impl FriProofCompressorDal<'_, '_> { "#, ProofCompressionJobStatus::Failed.to_string(), error, - block_number.0 as i64 + i64::from(block_number.0) ) .execute(self.storage.conn()) .await @@ -232,7 +232,7 @@ impl FriProofCompressorDal<'_, '_> { l1_batch_number = $2 "#, ProofCompressionJobStatus::SentToServer.to_string(), - block_number.0 as i64 + i64::from(block_number.0) ) .execute(self.storage.conn()) .await diff --git a/core/lib/dal/src/fri_prover_dal.rs b/core/lib/dal/src/fri_prover_dal.rs index f1e3091e9095..9c5fe8393991 100644 --- a/core/lib/dal/src/fri_prover_dal.rs +++ b/core/lib/dal/src/fri_prover_dal.rs @@ -324,7 +324,8 @@ impl FriProverDal<'_, '_> { id: row.id as u32, block_number: L1BatchNumber(row.l1_batch_number as u32), circuit_id: row.circuit_id as u8, - aggregation_round: AggregationRound::try_from(row.aggregation_round as i32).unwrap(), + aggregation_round: AggregationRound::try_from(i32::from(row.aggregation_round)) + .unwrap(), sequence_number: row.sequence_number as usize, depth: row.depth as u16, is_node_final_proof: row.is_node_final_proof, @@ -339,12 +340,12 @@ impl FriProverDal<'_, '_> { ) -> Option { let circuit_ids: Vec<_> = circuits_to_pick .iter() - .map(|tuple| tuple.circuit_id as i16) + .map(|tuple| i16::from(tuple.circuit_id)) .collect(); let protocol_versions: Vec = protocol_versions.iter().map(|&id| id as i32).collect(); let aggregation_rounds: Vec<_> = circuits_to_pick .iter() - .map(|tuple| tuple.aggregation_round as i16) + .map(|tuple| i16::from(tuple.aggregation_round)) .collect(); sqlx::query!( r#" @@ -412,7 +413,8 @@ impl FriProverDal<'_, '_> { id: row.id as u32, block_number: L1BatchNumber(row.l1_batch_number as u32), circuit_id: row.circuit_id as u8, - aggregation_round: AggregationRound::try_from(row.aggregation_round as i32).unwrap(), + aggregation_round: AggregationRound::try_from(i32::from(row.aggregation_round)) + .unwrap(), sequence_number: row.sequence_number as usize, depth: row.depth as u16, is_node_final_proof: row.is_node_final_proof, @@ -432,7 +434,7 @@ impl FriProverDal<'_, '_> { id = $2 "#, error, - id as i64, + i64::from(id) ) .execute(self.storage.conn()) .await @@ -450,7 +452,7 @@ impl FriProverDal<'_, '_> { WHERE id = $1 "#, - id as i64, + i64::from(id) ) .fetch_optional(self.storage.conn()) .await? 
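
The same lossless-cast rule composes with fallible enum conversions in the prover hunks: the raw `SMALLINT` column is widened infallibly with `i32::from`, and only the enum lookup itself can fail. A self-contained sketch using a stand-in enum (hypothetical; the real `AggregationRound` and its `TryFrom` impl live in `zksync_types`):

```rust
// Stand-in for zksync_types' AggregationRound, for illustration only.
#[derive(Debug, PartialEq)]
enum AggregationRound {
    BasicCircuits = 0,
    LeafAggregation = 1,
}

impl TryFrom<i32> for AggregationRound {
    type Error = ();

    fn try_from(value: i32) -> Result<Self, Self::Error> {
        match value {
            0 => Ok(Self::BasicCircuits),
            1 => Ok(Self::LeafAggregation),
            _ => Err(()),
        }
    }
}

fn main() {
    let raw_column: i16 = 1; // as read from a Postgres SMALLINT column
    // Infallible widening first, fallible domain conversion second:
    let round = AggregationRound::try_from(i32::from(raw_column)).unwrap();
    assert_eq!(round, AggregationRound::LeafAggregation);
}
```
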
@@ -486,7 +488,7 @@ impl FriProverDal<'_, '_> { "#, duration_to_naive_time(time_taken), blob_url, - id as i64, + i64::from(id) ) .instrument("save_fri_proof") .report_latency() @@ -498,7 +500,8 @@ impl FriProverDal<'_, '_> { id: row.id as u32, block_number: L1BatchNumber(row.l1_batch_number as u32), circuit_id: row.circuit_id as u8, - aggregation_round: AggregationRound::try_from(row.aggregation_round as i32).unwrap(), + aggregation_round: AggregationRound::try_from(i32::from(row.aggregation_round)) + .unwrap(), sequence_number: row.sequence_number as usize, depth: row.depth as u16, is_node_final_proof: row.is_node_final_proof, @@ -600,12 +603,12 @@ impl FriProverDal<'_, '_> { SET updated_at = NOW() "#, - l1_batch_number.0 as i64, - circuit_id as i16, + i64::from(l1_batch_number.0), + i16::from(circuit_id), circuit_blob_url, aggregation_round as i64, sequence_number as i64, - depth as i32, + i32::from(depth), is_node_final_proof, protocol_version_id as i32, ) @@ -739,7 +742,7 @@ impl FriProverDal<'_, '_> { id = $2 "#, status, - id as i64, + i64::from(id) ) .execute(self.storage.conn()) .await @@ -756,7 +759,7 @@ impl FriProverDal<'_, '_> { WHERE l1_batch_number = $1 "#, - l1_batch_number.0 as i64, + i64::from(l1_batch_number.0) ) .execute(self.storage.conn()) .await @@ -778,7 +781,7 @@ impl FriProverDal<'_, '_> { AND status = 'successful' AND aggregation_round = $2 "#, - l1_batch_number.0 as i64, + i64::from(l1_batch_number.0), AggregationRound::Scheduler as i16, ) .fetch_optional(self.storage.conn()) diff --git a/core/lib/dal/src/fri_scheduler_dependency_tracker_dal.rs b/core/lib/dal/src/fri_scheduler_dependency_tracker_dal.rs index 3312e4265179..b6f2058f6cf9 100644 --- a/core/lib/dal/src/fri_scheduler_dependency_tracker_dal.rs +++ b/core/lib/dal/src/fri_scheduler_dependency_tracker_dal.rs @@ -96,8 +96,8 @@ impl FriSchedulerDependencyTrackerDal<'_, '_> { ) }; sqlx::query(&query) - .bind(final_prover_job_id as i64) - .bind(l1_batch_number.0 as i64) + .bind(i64::from(final_prover_job_id)) + .bind(i64::from(l1_batch_number.0)) .execute(self.storage.conn()) .await .unwrap(); @@ -116,7 +116,7 @@ impl FriSchedulerDependencyTrackerDal<'_, '_> { WHERE l1_batch_number = $1 "#, - l1_batch_number.0 as i64, + i64::from(l1_batch_number.0) ) .fetch_all(self.storage.conn()) .await diff --git a/core/lib/dal/src/fri_witness_generator_dal.rs b/core/lib/dal/src/fri_witness_generator_dal.rs index 5fc34b98063b..b3b22e871886 100644 --- a/core/lib/dal/src/fri_witness_generator_dal.rs +++ b/core/lib/dal/src/fri_witness_generator_dal.rs @@ -60,7 +60,7 @@ impl FriWitnessGeneratorDal<'_, '_> { ($1, $2, $3, $4, 'queued', NOW(), NOW()) ON CONFLICT (l1_batch_number) DO NOTHING "#, - block_number.0 as i64, + i64::from(block_number.0), object_key, protocol_version_id as i32, blobs_raw, @@ -108,7 +108,7 @@ impl FriWitnessGeneratorDal<'_, '_> { RETURNING witness_inputs_fri.* "#, - last_l1_batch_to_process as i64, + i64::from(last_l1_batch_to_process), &protocol_versions[..], picked_by, ) @@ -138,7 +138,7 @@ impl FriWitnessGeneratorDal<'_, '_> { WHERE l1_batch_number = $1 "#, - l1_batch_number.0 as i64, + i64::from(l1_batch_number.0) ) .fetch_optional(self.storage.conn()) .await? 
@@ -161,8 +161,8 @@ impl FriWitnessGeneratorDal<'_, '_> { WHERE l1_batch_number = $2 "#, - format!("{}", status), - block_number.0 as i64 + status.to_string(), + i64::from(block_number.0) ) .execute(self.storage.conn()) .await @@ -185,7 +185,7 @@ impl FriWitnessGeneratorDal<'_, '_> { l1_batch_number = $2 "#, duration_to_naive_time(time_taken), - block_number.0 as i64 + i64::from(block_number.0) ) .execute(self.storage.conn()) .await @@ -204,7 +204,7 @@ impl FriWitnessGeneratorDal<'_, '_> { l1_batch_number = $2 "#, error, - block_number.0 as i64 + i64::from(block_number.0) ) .execute(self.storage.conn()) .await @@ -223,7 +223,7 @@ impl FriWitnessGeneratorDal<'_, '_> { id = $2 "#, error, - id as i64 + i64::from(id) ) .execute(self.storage.conn()) .await @@ -242,7 +242,7 @@ impl FriWitnessGeneratorDal<'_, '_> { id = $2 "#, duration_to_naive_time(time_taken), - id as i64 + i64::from(id) ) .execute(self.storage.conn()) .await @@ -330,8 +330,8 @@ impl FriWitnessGeneratorDal<'_, '_> { SET updated_at = NOW() "#, - block_number.0 as i64, - *circuit_id as i16, + i64::from(block_number.0), + i16::from(*circuit_id), closed_form_inputs_url, *number_of_basic_circuits as i32, protocol_version_id as i32, @@ -369,7 +369,7 @@ impl FriWitnessGeneratorDal<'_, '_> { SET updated_at = NOW() "#, - block_number.0 as i64, + i64::from(block_number.0), scheduler_partial_input_blob_url, protocol_version_id as i32, ) @@ -388,7 +388,7 @@ impl FriWitnessGeneratorDal<'_, '_> { SET updated_at = NOW() "#, - block_number.0 as i64, + i64::from(block_number.0) ) .execute(self.storage.conn()) .await @@ -470,7 +470,7 @@ impl FriWitnessGeneratorDal<'_, '_> { WHERE id = $1 "#, - id as i64, + i64::from(id) ) .fetch_optional(self.storage.conn()) .await @@ -502,10 +502,10 @@ impl FriWitnessGeneratorDal<'_, '_> { ORDER BY sequence_number ASC; "#, - block_number.0 as i64, - circuit_id as i16, + i64::from(block_number.0), + i16::from(circuit_id), round as i16, - depth as i32, + i32::from(depth) ) .fetch_all(self.storage.conn()) .await @@ -575,9 +575,9 @@ impl FriWitnessGeneratorDal<'_, '_> { AND depth = $4 "#, url, - block_number.0 as i64, - circuit_id as i16, - depth as i32, + i64::from(block_number.0), + i16::from(circuit_id), + i32::from(depth), number_of_dependent_jobs as i32, ) .execute(self.storage.conn()) @@ -662,7 +662,7 @@ impl FriWitnessGeneratorDal<'_, '_> { WHERE id = $1 "#, - id as i64, + i64::from(id) ) .fetch_optional(self.storage.conn()) .await @@ -684,7 +684,7 @@ impl FriWitnessGeneratorDal<'_, '_> { id = $2 "#, error, - id as i64 + i64::from(id) ) .execute(self.storage.conn()) .await @@ -703,7 +703,7 @@ impl FriWitnessGeneratorDal<'_, '_> { id = $2 "#, duration_to_naive_time(time_taken), - id as i64 + i64::from(id) ) .execute(self.storage.conn()) .await @@ -740,9 +740,9 @@ impl FriWitnessGeneratorDal<'_, '_> { SET updated_at = NOW() "#, - block_number.0 as i64, - circuit_id as i16, - depth as i32, + i64::from(block_number.0), + i16::from(circuit_id), + i32::from(depth), aggregations_url, number_of_dependent_jobs, protocol_version_id as i32, @@ -1042,7 +1042,7 @@ impl FriWitnessGeneratorDal<'_, '_> { WHERE l1_batch_number = $1 "#, - l1_batch_number.0 as i64, + i64::from(l1_batch_number.0) ) .fetch_optional(self.storage.conn()) .await? 
@@ -1067,7 +1067,7 @@ impl FriWitnessGeneratorDal<'_, '_> { l1_batch_number = $2 "#, duration_to_naive_time(time_taken), - block_number.0 as i64 + i64::from(block_number.0) ) .execute(self.storage.conn()) .await @@ -1086,7 +1086,7 @@ impl FriWitnessGeneratorDal<'_, '_> { l1_batch_number = $2 "#, error, - block_number.0 as i64 + i64::from(block_number.0) ) .execute(self.storage.conn()) .await @@ -1144,7 +1144,7 @@ impl FriWitnessGeneratorDal<'_, '_> { WHERE l1_batch_number = $1 "#, - l1_batch_number.0 as i64, + i64::from(l1_batch_number.0) ) .fetch_one(self.storage.conn()) .await diff --git a/core/lib/dal/src/lib.rs b/core/lib/dal/src/lib.rs index 912cb75961fc..345691c80170 100644 --- a/core/lib/dal/src/lib.rs +++ b/core/lib/dal/src/lib.rs @@ -1,5 +1,8 @@ //! Data access layer (DAL) for zkSync Era. +// Linter settings. +#![warn(clippy::cast_lossless)] + pub use sqlx::{types::BigDecimal, Error as SqlxError}; pub use crate::connection::{ConnectionPool, StorageProcessor}; diff --git a/core/lib/dal/src/models/storage_transaction.rs b/core/lib/dal/src/models/storage_transaction.rs index e40b7a67ba66..9f4921e78c67 100644 --- a/core/lib/dal/src/models/storage_transaction.rs +++ b/core/lib/dal/src/models/storage_transaction.rs @@ -296,13 +296,13 @@ impl From for Transaction { .unwrap_or_else(|_| panic!("invalid json in database for tx {:?}", hash)); let received_timestamp_ms = tx.received_at.timestamp_millis() as u64; match tx.tx_format { - Some(t) if t == PRIORITY_OPERATION_L2_TX_TYPE as i32 => Transaction { + Some(t) if t == i32::from(PRIORITY_OPERATION_L2_TX_TYPE) => Transaction { common_data: ExecuteTransactionCommon::L1(tx.into()), execute, received_timestamp_ms, raw_bytes: None, }, - Some(t) if t == PROTOCOL_UPGRADE_TX_TYPE as i32 => Transaction { + Some(t) if t == i32::from(PROTOCOL_UPGRADE_TX_TYPE) => Transaction { common_data: ExecuteTransactionCommon::ProtocolUpgrade(tx.into()), execute, received_timestamp_ms, diff --git a/core/lib/dal/src/models/tests.rs b/core/lib/dal/src/models/tests.rs index 18b84cf956fc..43ee8eabb73e 100644 --- a/core/lib/dal/src/models/tests.rs +++ b/core/lib/dal/src/models/tests.rs @@ -30,7 +30,7 @@ fn protocol_upgrade_storage_tx() -> StorageTransaction { hash: H256::random().as_bytes().to_vec(), data: serde_json::to_value(default_execute().clone()).expect("invalid value"), received_at: Utc::now().naive_utc(), - tx_format: Some(PROTOCOL_UPGRADE_TX_TYPE as i32), + tx_format: Some(PROTOCOL_UPGRADE_TX_TYPE.into()), gas_limit: Some(BigDecimal::from(999)), l1_tx_mint: Some(BigDecimal::from(666)), l1_tx_refund_recipient: Some(Address::random().as_bytes().to_vec()), @@ -59,7 +59,7 @@ fn l1_storage_tx() -> StorageTransaction { l1_block_number: Some(1), data: serde_json::to_value(default_execute().clone()).expect("invalid value"), received_at: Utc::now().naive_utc(), - tx_format: Some(PRIORITY_OPERATION_L2_TX_TYPE as i32), + tx_format: Some(PRIORITY_OPERATION_L2_TX_TYPE.into()), ..StorageTransaction::default() } } @@ -278,7 +278,7 @@ fn storage_tx_to_l2_tx(i_tx_format: i32, o_tx_format: i32) { #[test] fn storage_tx_to_l2_tx_eip712() { storage_tx_to_l2_tx( - EIP_712_TX_TYPE as i32, + EIP_712_TX_TYPE.into(), TransactionType::EIP712Transaction as i32, ); } @@ -286,7 +286,7 @@ fn storage_tx_to_l2_tx_eip712() { #[test] fn storage_tx_to_l2_tx_eip2930() { storage_tx_to_l2_tx( - EIP_2930_TX_TYPE as i32, + EIP_2930_TX_TYPE.into(), TransactionType::EIP2930Transaction as i32, ); } @@ -294,7 +294,7 @@ fn storage_tx_to_l2_tx_eip2930() { #[test] fn storage_tx_to_l2_tx_eip1559() { 
storage_tx_to_l2_tx( - EIP_1559_TX_TYPE as i32, + EIP_1559_TX_TYPE.into(), TransactionType::EIP1559Transaction as i32, ); } diff --git a/core/lib/dal/src/proof_generation_dal.rs b/core/lib/dal/src/proof_generation_dal.rs index cdcfd70880b1..8714f43ebf19 100644 --- a/core/lib/dal/src/proof_generation_dal.rs +++ b/core/lib/dal/src/proof_generation_dal.rs @@ -83,7 +83,7 @@ impl ProofGenerationDal<'_, '_> { l1_batch_number = $2 "#, proof_blob_url, - block_number.0 as i64, + i64::from(block_number.0) ) .execute(self.storage.conn()) .await? @@ -106,7 +106,7 @@ impl ProofGenerationDal<'_, '_> { ($1, 'ready_to_be_proven', $2, NOW(), NOW()) ON CONFLICT (l1_batch_number) DO NOTHING "#, - block_number.0 as i64, + i64::from(block_number.0), proof_gen_data_blob_url, ) .execute(self.storage.conn()) @@ -128,7 +128,7 @@ impl ProofGenerationDal<'_, '_> { l1_batch_number = $2 "#, ProofGenerationJobStatus::Skipped.to_string(), - block_number.0 as i64, + i64::from(block_number.0) ) .execute(self.storage.conn()) .await? diff --git a/core/lib/dal/src/protocol_versions_dal.rs b/core/lib/dal/src/protocol_versions_dal.rs index 40da0d554da7..6ddb533cb9e6 100644 --- a/core/lib/dal/src/protocol_versions_dal.rs +++ b/core/lib/dal/src/protocol_versions_dal.rs @@ -193,7 +193,7 @@ impl ProtocolVersionsDal<'_, '_> { WHERE id = $1 "#, - version_id as i32 + i32::from(version_id) ) .fetch_optional(self.storage.conn()) .await diff --git a/core/lib/dal/src/protocol_versions_web3_dal.rs b/core/lib/dal/src/protocol_versions_web3_dal.rs index 7c4b4d256a2a..893a7e041df1 100644 --- a/core/lib/dal/src/protocol_versions_web3_dal.rs +++ b/core/lib/dal/src/protocol_versions_web3_dal.rs @@ -19,7 +19,7 @@ impl ProtocolVersionsWeb3Dal<'_, '_> { WHERE id = $1 "#, - version_id as i32 + i32::from(version_id) ) .fetch_optional(self.storage.conn()) .await diff --git a/core/lib/dal/src/snapshot_recovery_dal.rs b/core/lib/dal/src/snapshot_recovery_dal.rs index 5d5e727b38b7..c0fe61802de4 100644 --- a/core/lib/dal/src/snapshot_recovery_dal.rs +++ b/core/lib/dal/src/snapshot_recovery_dal.rs @@ -32,12 +32,12 @@ impl SnapshotRecoveryDal<'_, '_> { VALUES ($1, $2, $3, $4, $5, $6, $7, $8, NOW(), NOW()) "#, - status.l1_batch_number.0 as i64, + i64::from(status.l1_batch_number.0), status.l1_batch_timestamp as i64, - status.l1_batch_root_hash.0.as_slice(), - status.miniblock_number.0 as i64, + status.l1_batch_root_hash.as_bytes(), + i64::from(status.miniblock_number.0), status.miniblock_timestamp as i64, - status.miniblock_hash.0.as_slice(), + status.miniblock_hash.as_bytes(), status.protocol_version as i32, &status.storage_logs_chunks_processed, ) diff --git a/core/lib/dal/src/snapshots_creator_dal.rs b/core/lib/dal/src/snapshots_creator_dal.rs index 7940ecb7cdf2..30797ef1cdd4 100644 --- a/core/lib/dal/src/snapshots_creator_dal.rs +++ b/core/lib/dal/src/snapshots_creator_dal.rs @@ -40,33 +40,6 @@ impl SnapshotsCreatorDal<'_, '_> { Ok(count as u64) } - /// Returns the total number of rows in the `storage_logs` table before and at the specified miniblock. - /// - /// **Warning.** This method is slow (requires a full table scan). 
- pub async fn get_storage_logs_row_count( - &mut self, - at_miniblock: MiniblockNumber, - ) -> sqlx::Result { - let row = sqlx::query!( - r#" - SELECT - COUNT(*) AS COUNT - FROM - storage_logs - WHERE - miniblock_number <= $1 - "#, - at_miniblock.0 as i64 - ) - .instrument("get_storage_logs_row_count") - .with_arg("miniblock_number", &at_miniblock) - .report_latency() - .expect_slow_query() - .fetch_one(self.storage) - .await?; - Ok(row.count.unwrap_or(0) as u64) - } - pub async fn get_storage_logs_chunk( &mut self, miniblock_number: MiniblockNumber, @@ -108,8 +81,8 @@ impl SnapshotsCreatorDal<'_, '_> { WHERE initial_writes.l1_batch_number <= $2 "#, - miniblock_number.0 as i64, - l1_batch_number.0 as i64, + i64::from(miniblock_number.0), + i64::from(l1_batch_number.0), hashed_keys_range.start().as_bytes(), hashed_keys_range.end().as_bytes() ) @@ -150,7 +123,7 @@ impl SnapshotsCreatorDal<'_, '_> { WHERE miniblock_number <= $1 "#, - miniblock_number.0 as i64, + i64::from(miniblock_number.0), ) .instrument("get_all_factory_deps") .report_latency() @@ -199,7 +172,7 @@ mod tests { .unwrap(); let log_row_count = conn - .snapshots_creator_dal() + .storage_logs_dal() .get_storage_logs_row_count(MiniblockNumber(1)) .await .unwrap(); @@ -231,13 +204,13 @@ mod tests { .unwrap(); let log_row_count = conn - .snapshots_creator_dal() + .storage_logs_dal() .get_storage_logs_row_count(MiniblockNumber(1)) .await .unwrap(); assert_eq!(log_row_count, logs.len() as u64); let log_row_count = conn - .snapshots_creator_dal() + .storage_logs_dal() .get_storage_logs_row_count(MiniblockNumber(2)) .await .unwrap(); diff --git a/core/lib/dal/src/storage_logs_dal.rs b/core/lib/dal/src/storage_logs_dal.rs index 51d6e168b3a0..e9c11c0fec16 100644 --- a/core/lib/dal/src/storage_logs_dal.rs +++ b/core/lib/dal/src/storage_logs_dal.rs @@ -1,6 +1,6 @@ use std::{collections::HashMap, ops, time::Instant}; -use sqlx::{types::chrono::Utc, Row}; +use sqlx::types::chrono::Utc; use zksync_types::{ get_code_key, snapshots::SnapshotStorageLog, AccountTreeId, Address, L1BatchNumber, MiniblockNumber, StorageKey, StorageLog, FAILED_CONTRACT_DEPLOYMENT_BYTECODE_HASH, H160, H256, @@ -122,7 +122,7 @@ impl StorageLogsDal<'_, '_> { WHERE miniblock_number = $1 "#, - block_number.0 as i64 + i64::from(block_number.0) ) .fetch_one(self.storage.conn()) .await? @@ -142,7 +142,7 @@ impl StorageLogsDal<'_, '_> { ) -> sqlx::Result<()> { let stage_start = Instant::now(); let modified_keys = self - .modified_keys_since_miniblock(last_miniblock_to_keep) + .modified_keys_in_miniblocks(last_miniblock_to_keep.next()..=MiniblockNumber(u32::MAX)) .await?; tracing::info!( "Loaded {} keys changed after miniblock #{last_miniblock_to_keep} in {:?}", @@ -221,32 +221,30 @@ impl StorageLogsDal<'_, '_> { Ok(()) } - /// Returns all storage keys that were modified after the specified miniblock. - async fn modified_keys_since_miniblock( + /// Returns distinct hashed storage keys that were modified in the specified miniblock range. 
+    /// Returns distinct hashed storage keys that were modified in the specified miniblock range.
+    pub async fn modified_keys_in_miniblocks(
         &mut self,
-        miniblock_number: MiniblockNumber,
+        miniblock_numbers: ops::RangeInclusive<MiniblockNumber>,
     ) -> sqlx::Result<Vec<H256>> {
-        Ok(sqlx::query!(
+        let rows = sqlx::query!(
             r#"
             SELECT DISTINCT
-                ON (hashed_key) hashed_key
+                hashed_key
             FROM
-                (
-                    SELECT
-                        *
-                    FROM
-                        storage_logs
-                    WHERE
-                        miniblock_number > $1
-                ) inn
+                storage_logs
+            WHERE
+                miniblock_number BETWEEN $1 AND $2
             "#,
-            miniblock_number.0 as i64
+            i64::from(miniblock_numbers.start().0),
+            i64::from(miniblock_numbers.end().0)
         )
         .fetch_all(self.storage.conn())
-        .await?
-        .into_iter()
-        .map(|row| H256::from_slice(&row.hashed_key))
-        .collect())
+        .await?;
+
+        Ok(rows
+            .into_iter()
+            .map(|row| H256::from_slice(&row.hashed_key))
+            .collect())
     }
     /// Removes all storage logs with a miniblock number strictly greater than the specified `block_number`.
@@ -260,7 +258,7 @@ impl StorageLogsDal<'_, '_> {
             WHERE
                 miniblock_number > $1
             "#,
-            block_number.0 as i64
+            i64::from(block_number.0)
         )
         .execute(self.storage.conn())
         .await?;
@@ -333,7 +331,7 @@ impl StorageLogsDal<'_, '_> {
                 operation_number DESC
             "#,
             &bytecode_hashed_keys as &[_],
-            max_miniblock_number as i64
+            i64::from(max_miniblock_number)
         )
         .fetch_all(self.storage.conn())
         .await?;
@@ -383,7 +381,7 @@ impl StorageLogsDal<'_, '_> {
                 miniblock_number,
                 operation_number
             "#,
-            l1_batch_number.0 as i64
+            i64::from(l1_batch_number.0)
         )
         .fetch_all(self.storage.conn())
         .await?;
@@ -414,7 +412,9 @@ impl StorageLogsDal<'_, '_> {
         };
         let stage_start = Instant::now();
-        let mut modified_keys = self.modified_keys_since_miniblock(last_miniblock).await?;
+        let mut modified_keys = self
+            .modified_keys_in_miniblocks(last_miniblock.next()..=MiniblockNumber(u32::MAX))
+            .await?;
         let modified_keys_count = modified_keys.len();
         tracing::info!(
             "Fetched {modified_keys_count} keys changed after miniblock #{last_miniblock} in {:?}",
@@ -580,7 +580,7 @@ impl StorageLogsDal<'_, '_> {
                 UNNEST($1::bytea[]) AS u (hashed_key)
             "#,
             &hashed_keys as &[&[u8]],
-            miniblock_number.0 as i64
+            i64::from(miniblock_number.0)
         )
         .fetch_all(self.storage.conn())
         .await?;
@@ -628,27 +628,31 @@ impl StorageLogsDal<'_, '_> {
         .collect()
     }
-    pub async fn get_miniblock_storage_logs(
-        &mut self,
-        miniblock_number: MiniblockNumber,
-    ) -> Vec<(H256, H256, u32)> {
-        self.get_miniblock_storage_logs_from_table(miniblock_number, "storage_logs")
-            .await
-    }
-
-    /// Counts the total number of storage logs in the specified miniblock,
-    // TODO(PLA-596): add storage log count to snapshot metadata instead?
-    pub async fn count_miniblock_storage_logs(
+    /// Returns the total number of rows in the `storage_logs` table before and at the specified miniblock.
+    ///
+    /// **Warning.** This method is slow (requires a full table scan).
+    pub async fn get_storage_logs_row_count(
         &mut self,
-        miniblock_number: MiniblockNumber,
+        at_miniblock: MiniblockNumber,
     ) -> sqlx::Result<u64> {
-        let count = sqlx::query_scalar!(
-            "SELECT COUNT(*) FROM storage_logs WHERE miniblock_number = $1",
-            miniblock_number.0 as i32
+        let row = sqlx::query!(
+            r#"
+            SELECT
+                COUNT(*) AS COUNT
+            FROM
+                storage_logs
+            WHERE
+                miniblock_number <= $1
+            "#,
+            i64::from(at_miniblock.0)
         )
-        .fetch_one(self.storage.conn())
+        .instrument("get_storage_logs_row_count")
+        .with_arg("miniblock_number", &at_miniblock)
+        .report_latency()
+        .expect_slow_query()
+        .fetch_one(self.storage)
         .await?;
-        Ok(count.unwrap_or(0) as u64)
+        Ok(row.count.unwrap_or(0) as u64)
     }
     /// Gets a starting tree entry for each of the supplied `key_ranges` for the specified
@@ -692,7 +696,7 @@ impl StorageLogsDal<'_, '_> {
                 sl
                 LEFT OUTER JOIN initial_writes ON initial_writes.hashed_key = sl.kv[1]
             "#,
-            miniblock_number.0 as i64,
+            i64::from(miniblock_number.0),
             &start_keys as &[&[u8]],
             &end_keys as &[&[u8]],
         )
@@ -732,7 +736,7 @@ impl StorageLogsDal<'_, '_> {
             ORDER BY
                 storage_logs.hashed_key
             "#,
-            miniblock_number.0 as i64,
+            i64::from(miniblock_number.0),
             key_range.start().as_bytes(),
             key_range.end().as_bytes()
         )
@@ -746,90 +750,6 @@ impl StorageLogsDal<'_, '_> {
         });
         Ok(rows.collect())
     }
-
-    pub async fn retain_storage_logs(
-        &mut self,
-        miniblock_number: MiniblockNumber,
-        operation_numbers: &[i32],
-    ) {
-        sqlx::query!(
-            r#"
-            DELETE FROM storage_logs
-            WHERE
-                miniblock_number = $1
-                AND operation_number != ALL ($2)
-            "#,
-            miniblock_number.0 as i64,
-            &operation_numbers
-        )
-        .execute(self.storage.conn())
-        .await
-        .unwrap();
-    }
-
-    /// Loads (hashed_key, value, operation_number) tuples for given miniblock_number.
-    /// Uses provided DB table.
-    /// Shouldn't be used in production.
-    pub async fn get_miniblock_storage_logs_from_table(
-        &mut self,
-        miniblock_number: MiniblockNumber,
-        table_name: &str,
-    ) -> Vec<(H256, H256, u32)> {
-        sqlx::query(&format!(
-            "SELECT hashed_key, value, operation_number FROM {table_name} \
-             WHERE miniblock_number = $1 \
-             ORDER BY operation_number"
-        ))
-        .bind(miniblock_number.0 as i64)
-        .fetch_all(self.storage.conn())
-        .await
-        .unwrap()
-        .into_iter()
-        .map(|row| {
-            let hashed_key = H256::from_slice(row.get("hashed_key"));
-            let value = H256::from_slice(row.get("value"));
-            let operation_number: u32 = row.get::<i64, _>("operation_number") as u32;
-            (hashed_key, value, operation_number)
-        })
-        .collect()
-    }
-
-    /// Loads value for given hashed_key at given miniblock_number.
-    /// Uses provided DB table.
-    /// Shouldn't be used in production.
-    pub async fn get_storage_value_from_table(
-        &mut self,
-        hashed_key: H256,
-        miniblock_number: MiniblockNumber,
-        table_name: &str,
-    ) -> H256 {
-        let query_str = format!(
-            "SELECT value FROM {table_name} \
-             WHERE hashed_key = $1 AND miniblock_number <= $2 \
-             ORDER BY miniblock_number DESC, operation_number DESC LIMIT 1",
-        );
-        sqlx::query(&query_str)
-            .bind(hashed_key.as_bytes())
-            .bind(miniblock_number.0 as i64)
-            .fetch_optional(self.storage.conn())
-            .await
-            .unwrap()
-            .map(|row| H256::from_slice(row.get("value")))
-            .unwrap_or_else(H256::zero)
-    }
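The removed helpers only existed to compare the dedup migration's side tables against `storage_logs`. A hedged sketch (assuming a `zksync_dal::StorageProcessor` named `storage`; the helper name is hypothetical) of the equivalent read against the canonical table, via the web3 DAL query this patch also touches further down:

    use zksync_dal::StorageProcessor;
    use zksync_types::{MiniblockNumber, StorageKey, H256};

    /// Hypothetical stand-in for `get_storage_value_from_table(.., "storage_logs")`:
    /// reads the latest value of `key` as of miniblock `at`.
    async fn value_at(
        storage: &mut StorageProcessor<'_>,
        key: &StorageKey,
        at: MiniblockNumber,
    ) -> sqlx::Result<H256> {
        storage
            .storage_web3_dal()
            .get_historical_value_unchecked(key, at)
            .await
    }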
-
-    /// Vacuums `storage_logs` table.
-    /// Shouldn't be used in production.
-    pub async fn vacuum_storage_logs(&mut self) {
-        sqlx::query!(
-            r#"
-            VACUUM storage_logs
-            "#
-        )
-        .execute(self.storage.conn())
-        .await
-        .unwrap();
-    }
 }
 #[cfg(test)]
@@ -1071,7 +991,8 @@ mod tests {
         let non_initial = conn
             .storage_logs_dedup_dal()
             .filter_written_slots(&all_keys)
-            .await;
+            .await
+            .unwrap();
         // Pretend that dedup logic eliminates all writes with zero values.
         let initial_keys: Vec<_> = logs
             .iter()
diff --git a/core/lib/dal/src/storage_logs_dedup_dal.rs b/core/lib/dal/src/storage_logs_dedup_dal.rs
index 3409588aabca..0994193c8975 100644
--- a/core/lib/dal/src/storage_logs_dedup_dal.rs
+++ b/core/lib/dal/src/storage_logs_dedup_dal.rs
@@ -90,7 +90,7 @@ impl StorageLogsDedupDal<'_, '_> {
             .map(|key| StorageKey::raw_hashed_key(key.address(), key.key()).to_vec())
             .collect();
-        let last_index = self.max_enumeration_index().await.unwrap_or(0);
+        let last_index = self.max_enumeration_index().await?.unwrap_or(0);
         let indices: Vec<_> = ((last_index + 1)..=(last_index + hashed_keys.len() as u64))
             .map(|x| x as i64)
             .collect();
@@ -110,7 +110,7 @@ impl StorageLogsDedupDal<'_, '_> {
             "#,
             &hashed_keys,
             &indices,
-            l1_batch_number.0 as i64,
+            i64::from(l1_batch_number.0)
         )
         .execute(self.storage.conn())
         .await?;
@@ -132,7 +132,7 @@ impl StorageLogsDedupDal<'_, '_> {
             WHERE
                 l1_batch_number = $1
             "#,
-            l1_batch_number.0 as i64
+            i64::from(l1_batch_number.0)
         )
         .fetch_all(self.storage.conn())
         .await
@@ -147,8 +147,8 @@ impl StorageLogsDedupDal<'_, '_> {
         .collect()
     }
-    pub async fn max_enumeration_index(&mut self) -> Option<u64> {
-        sqlx::query!(
+    async fn max_enumeration_index(&mut self) -> sqlx::Result<Option<u64>> {
+        Ok(sqlx::query!(
             r#"
             SELECT
                 MAX(INDEX) AS "max?"
             FROM
                 initial_writes
             "#,
         )
         .fetch_one(self.storage.conn())
-        .await
-        .unwrap()
+        .await?
         .max
-        .map(|max| max as u64)
+        .map(|max| max as u64))
     }
     pub async fn initial_writes_for_batch(
         &mut self,
         l1_batch_number: L1BatchNumber,
-    ) -> Vec<(H256, u64)> {
-        sqlx::query!(
+    ) -> sqlx::Result<Vec<(H256, u64)>> {
+        Ok(sqlx::query!(
             r#"
             SELECT
                 hashed_key,
                 INDEX
             FROM
                 initial_writes
             WHERE
                 l1_batch_number = $1
             ORDER BY
                 INDEX
             "#,
-            l1_batch_number.0 as i64
+            i64::from(l1_batch_number.0)
         )
         .fetch_all(self.storage.conn())
-        .await
-        .unwrap()
+        .await?
         .into_iter()
         .map(|row| (H256::from_slice(&row.hashed_key), row.index as u64))
-        .collect()
+        .collect())
     }
-    pub async fn get_enumeration_index_for_key(&mut self, key: StorageKey) -> Option<u64> {
-        sqlx::query!(
+    pub async fn get_enumeration_index_for_key(
+        &mut self,
+        hashed_key: H256,
+    ) -> sqlx::Result<Option<u64>> {
+        Ok(sqlx::query!(
             r#"
             SELECT
                 INDEX
             FROM
                 initial_writes
             WHERE
                 hashed_key = $1
             "#,
-            key.hashed_key().0.to_vec()
+            hashed_key.as_bytes()
         )
         .fetch_optional(self.storage.conn())
-        .await
-        .unwrap()
-        .map(|row| row.index as u64)
+        .await?
+        .map(|row| row.index as u64))
     }
     /// Returns `hashed_keys` that are both present in the input and in `initial_writes` table.
-    pub async fn filter_written_slots(&mut self, hashed_keys: &[H256]) -> HashSet<H256> {
+    pub async fn filter_written_slots(
+        &mut self,
+        hashed_keys: &[H256],
+    ) -> sqlx::Result<HashSet<H256>> {
         let hashed_keys: Vec<_> = hashed_keys.iter().map(H256::as_bytes).collect();
-        sqlx::query!(
+        Ok(sqlx::query!(
             r#"
             SELECT
                 hashed_key
             FROM
                 initial_writes
             WHERE
                 hashed_key = ANY ($1)
             "#,
             &hashed_keys as &[&[u8]],
         )
         .fetch_all(self.storage.conn())
-        .await
-        .unwrap()
+        .await?
         .into_iter()
         .map(|row| H256::from_slice(&row.hashed_key))
-        .collect()
+        .collect())
     }
     /// Retrieves all initial write entries for testing purposes.
diff --git a/core/lib/dal/src/storage_web3_dal.rs b/core/lib/dal/src/storage_web3_dal.rs
index 573fbe9f53a8..211abfe2a6c2 100644
--- a/core/lib/dal/src/storage_web3_dal.rs
+++ b/core/lib/dal/src/storage_web3_dal.rs
@@ -1,4 +1,4 @@
-use std::{collections::HashMap, ops};
+use std::collections::HashMap;
 use zksync_types::{
     get_code_key, get_nonce_key,
@@ -96,9 +96,6 @@ impl StorageWeb3Dal<'_, '_> {
         key: &StorageKey,
         block_number: MiniblockNumber,
     ) -> sqlx::Result<H256> {
-        // We need to proper distinguish if the value is zero or None
-        // for the VM to correctly determine initial writes.
-        // So, we accept that the value is None if it's zero and it wasn't initially written at the moment.
         let hashed_key = key.hashed_key();
         sqlx::query!(
@@ -117,7 +114,7 @@ impl StorageWeb3Dal<'_, '_> {
                 1
             "#,
             hashed_key.as_bytes(),
-            block_number.0 as i64
+            i64::from(block_number.0)
         )
         .instrument("get_historical_value_unchecked")
         .report_latency()
@@ -165,7 +162,7 @@ impl StorageWeb3Dal<'_, '_> {
                     0
                 ) AS "pending_batch!"
             "#,
-            miniblock_number.0 as i64
+            i64::from(miniblock_number.0)
         )
         .fetch_one(self.storage.conn())
         .await?;
@@ -202,71 +199,44 @@ impl StorageWeb3Dal<'_, '_> {
         Ok(l1_batch_number)
     }
-    /// Returns distinct hashed storage keys that were modified in the specified miniblock range.
-    pub async fn modified_keys_in_miniblocks(
-        &mut self,
-        miniblock_numbers: ops::RangeInclusive<MiniblockNumber>,
-    ) -> Vec<H256> {
-        sqlx::query!(
-            r#"
-            SELECT DISTINCT
-                hashed_key
-            FROM
-                storage_logs
-            WHERE
-                miniblock_number BETWEEN $1 AND $2
-            "#,
-            miniblock_numbers.start().0 as i64,
-            miniblock_numbers.end().0 as i64
-        )
-        .fetch_all(self.storage.conn())
-        .await
-        .unwrap()
-        .into_iter()
-        .map(|row| H256::from_slice(&row.hashed_key))
-        .collect()
-    }
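The dedup and web3 DAL methods in this patch now return `sqlx::Result` instead of unwrapping internally. A hedged caller-side sketch (assuming a `zksync_dal::StorageProcessor` named `storage` and the `anyhow` crate; the helper name is hypothetical) of the pattern the metadata calculator and state keeper hunks below use to attach context and bubble errors up:

    use std::collections::HashSet;
    use anyhow::Context as _;
    use zksync_dal::StorageProcessor;
    use zksync_types::H256;

    /// Hypothetical wrapper: propagates the DB error with context instead of panicking.
    async fn written_subset(
        storage: &mut StorageProcessor<'_>,
        keys: &[H256],
    ) -> anyhow::Result<HashSet<H256>> {
        storage
            .storage_logs_dedup_dal()
            .filter_written_slots(keys)
            .await
            .context("cannot filter out previously written storage slots")
    }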
     /// This method doesn't check if block with number equals to `block_number`
     /// is present in the database. For such blocks `None` will be returned.
     pub async fn get_contract_code_unchecked(
         &mut self,
         address: Address,
         block_number: MiniblockNumber,
-    ) -> Result<Option<Vec<u8>>, SqlxError> {
+    ) -> sqlx::Result<Option<Vec<u8>>> {
         let hashed_key = get_code_key(&address).hashed_key();
-        {
-            sqlx::query!(
-                r#"
-                SELECT
-                    bytecode
-                FROM
-                    (
-                        SELECT
-                            *
-                        FROM
-                            storage_logs
-                        WHERE
-                            storage_logs.hashed_key = $1
-                            AND storage_logs.miniblock_number <= $2
-                        ORDER BY
-                            storage_logs.miniblock_number DESC,
-                            storage_logs.operation_number DESC
-                        LIMIT
-                            1
-                    ) t
-                    JOIN factory_deps ON value = factory_deps.bytecode_hash
-                WHERE
-                    value != $3
-                "#,
-                hashed_key.as_bytes(),
-                block_number.0 as i64,
-                FAILED_CONTRACT_DEPLOYMENT_BYTECODE_HASH.as_bytes(),
-            )
-            .fetch_optional(self.storage.conn())
-            .await
-            .map(|option_row| option_row.map(|row| row.bytecode))
-        }
+        let row = sqlx::query!(
+            r#"
+            SELECT
+                bytecode
+            FROM
+                (
+                    SELECT
+                        *
+                    FROM
+                        storage_logs
+                    WHERE
+                        storage_logs.hashed_key = $1
+                        AND storage_logs.miniblock_number <= $2
+                    ORDER BY
+                        storage_logs.miniblock_number DESC,
+                        storage_logs.operation_number DESC
+                    LIMIT
+                        1
+                ) t
+                JOIN factory_deps ON value = factory_deps.bytecode_hash
+            WHERE
+                value != $3
+            "#,
+            hashed_key.as_bytes(),
+            i64::from(block_number.0),
+            FAILED_CONTRACT_DEPLOYMENT_BYTECODE_HASH.as_bytes(),
+        )
+        .fetch_optional(self.storage.conn())
+        .await?;
+        Ok(row.map(|row| row.bytecode))
     }
     /// This method doesn't check if block with number equals to `block_number`
@@ -275,25 +245,24 @@ impl StorageWeb3Dal<'_, '_> {
         &mut self,
         hash: H256,
         block_number: MiniblockNumber,
-    ) -> Result<Option<Vec<u8>>, SqlxError> {
-        {
-            sqlx::query!(
-                r#"
-                SELECT
-                    bytecode
-                FROM
-                    factory_deps
-                WHERE
-                    bytecode_hash = $1
-                    AND miniblock_number <= $2
-                "#,
-                hash.as_bytes(),
-                block_number.0 as i64
-            )
-            .fetch_optional(self.storage.conn())
-            .await
-            .map(|option_row| option_row.map(|row| row.bytecode))
-        }
+    ) -> sqlx::Result<Option<Vec<u8>>> {
+        let row = sqlx::query!(
+            r#"
+            SELECT
+                bytecode
+            FROM
+                factory_deps
+            WHERE
+                bytecode_hash = $1
+                AND miniblock_number <= $2
+            "#,
+            hash.as_bytes(),
+            i64::from(block_number.0)
+        )
+        .fetch_optional(self.storage.conn())
+        .await?;
+
+        Ok(row.map(|row| row.bytecode))
     }
 }
diff --git a/core/lib/dal/src/sync_dal.rs b/core/lib/dal/src/sync_dal.rs
index 28d1d71f92c9..ebb4e9a15dc9 100644
--- a/core/lib/dal/src/sync_dal.rs
+++ b/core/lib/dal/src/sync_dal.rs
@@ -61,7 +61,7 @@ impl SyncDal<'_, '_> {
             WHERE
                 miniblocks.number = $1
             "#,
-            block_number.0 as i64
+            i64::from(block_number.0)
         )
         .instrument("sync_dal_sync_block.block")
         .with_arg("block_number", &block_number)
diff --git a/core/lib/dal/src/transactions_dal.rs b/core/lib/dal/src/transactions_dal.rs
index a37eb72da89c..eada9dfa44ab 100644
--- a/core/lib/dal/src/transactions_dal.rs
+++ b/core/lib/dal/src/transactions_dal.rs
@@ -289,7 +289,7 @@ impl TransactionsDal<'_, '_> {
             u256_to_big_decimal(tx.common_data.fee.gas_per_pubdata_limit);
         let tx_format = tx.common_data.transaction_type as i32;
         let signature = tx.common_data.signature;
-        let nonce = tx.common_data.nonce.0 as i64;
+        let nonce = i64::from(tx.common_data.nonce.0);
         let input_data = tx.common_data.input.expect("Data is mandatory").data;
         let value = u256_to_big_decimal(tx.execute.value);
         let paymaster = tx.common_data.paymaster_params.paymaster.0.as_ref();
@@ -479,7 +479,7 @@ impl TransactionsDal<'_, '_> {
             "#,
             &l1_batch_tx_indexes,
             &hashes as &[&[u8]],
-            block_number.0 as i64
+            i64::from(block_number.0)
         )
         .execute(self.storage.conn())
         .await
@@ -565,7 +565,7 @@ impl TransactionsDal<'_, '_> {
                         l1_indices_in_block.push(index_in_block as i32);
l1_errors.push(error.unwrap_or_default()); l1_execution_infos.push(serde_json::to_value(execution_info).unwrap()); - l1_refunded_gas.push(*refunded_gas as i64); + l1_refunded_gas.push(i64::from(*refunded_gas)); l1_effective_gas_prices .push(u256_to_big_decimal(common_data.max_fee_per_gas)); } @@ -602,7 +602,7 @@ impl TransactionsDal<'_, '_> { )); l2_gas_per_pubdata_limit .push(u256_to_big_decimal(common_data.fee.gas_per_pubdata_limit)); - l2_refunded_gas.push(*refunded_gas as i64); + l2_refunded_gas.push(i64::from(*refunded_gas)); } ExecuteTransactionCommon::ProtocolUpgrade(common_data) => { upgrade_hashes.push(hash.0.to_vec()); @@ -610,7 +610,7 @@ impl TransactionsDal<'_, '_> { upgrade_errors.push(error.unwrap_or_default()); upgrade_execution_infos .push(serde_json::to_value(execution_info).unwrap()); - upgrade_refunded_gas.push(*refunded_gas as i64); + upgrade_refunded_gas.push(i64::from(*refunded_gas)); upgrade_effective_gas_prices .push(u256_to_big_decimal(common_data.max_fee_per_gas)); } @@ -855,7 +855,7 @@ impl TransactionsDal<'_, '_> { RETURNING hash "#, - miniblock_number.0 as i64 + i64::from(miniblock_number.0) ) .fetch_all(self.storage.conn()) .await @@ -985,7 +985,7 @@ impl TransactionsDal<'_, '_> { limit as i32, BigDecimal::from(fee_per_gas), BigDecimal::from(gas_per_pubdata), - PROTOCOL_UPGRADE_TX_TYPE as i32, + i32::from(PROTOCOL_UPGRADE_TX_TYPE) ) .fetch_all(self.storage.conn()) .await?; @@ -1122,7 +1122,7 @@ impl TransactionsDal<'_, '_> { miniblock_number, index_in_block "#, - l1_batch_number.0 as i64, + i64::from(l1_batch_number.0) ) .fetch_all(self.storage.conn()) .await?; @@ -1164,8 +1164,8 @@ impl TransactionsDal<'_, '_> { ORDER BY number "#, - from_miniblock.0 as i64, - to_miniblock.0 as i64, + i64::from(from_miniblock.0), + i64::from(to_miniblock.0) ) .fetch_all(self.storage.conn()) .await?; @@ -1187,8 +1187,8 @@ impl TransactionsDal<'_, '_> { ORDER BY number "#, - from_miniblock.0 as i64 - 1, - to_miniblock.0 as i64 - 1, + i64::from(from_miniblock.0) - 1, + i64::from(to_miniblock.0) - 1, ) .fetch_all(self.storage.conn()) .await?; @@ -1263,7 +1263,7 @@ impl TransactionsDal<'_, '_> { miniblock_number, index_in_block "#, - l1_batch_number.0 as i64 + i64::from(l1_batch_number.0) ) .fetch_all(self.storage.conn()) .await diff --git a/core/lib/dal/src/transactions_web3_dal.rs b/core/lib/dal/src/transactions_web3_dal.rs index f4d22ebaf8ad..034983b2b889 100644 --- a/core/lib/dal/src/transactions_web3_dal.rs +++ b/core/lib/dal/src/transactions_web3_dal.rs @@ -183,7 +183,7 @@ impl TransactionsWeb3Dal<'_, '_> { ), TransactionSelector::Position(block_number, idx) => ( "transactions.miniblock_number = $1 AND transactions.index_in_block = $2"; - block_number.0 as i64, + i64::from(block_number.0), idx as i32 ), } @@ -377,7 +377,7 @@ impl TransactionsWeb3Dal<'_, '_> { ORDER BY index_in_block "#, - miniblock.0 as i64 + i64::from(miniblock.0) ) .fetch_all(self.storage.conn()) .await?; diff --git a/core/lib/snapshots_applier/src/lib.rs b/core/lib/snapshots_applier/src/lib.rs index b0abbfc504c0..1dd7793d456c 100644 --- a/core/lib/snapshots_applier/src/lib.rs +++ b/core/lib/snapshots_applier/src/lib.rs @@ -285,7 +285,7 @@ impl<'a> SnapshotsApplier<'a> { SnapshotsApplier::create_fresh_recovery_status(main_node_client).await?; let storage_logs_count = storage - .snapshots_creator_dal() + .storage_logs_dal() .get_storage_logs_row_count(recovery_status.miniblock_number) .await .map_err(|err| { @@ -612,7 +612,7 @@ impl<'a> SnapshotsApplier<'a> { .await?; // This DB query is slow, but this is 
fine for verification purposes. let total_log_count = storage - .snapshots_creator_dal() + .storage_logs_dal() .get_storage_logs_row_count(self.applied_snapshot_status.miniblock_number) .await .map_err(|err| SnapshotsApplierError::db(err, "cannot get storage_logs row count"))?; diff --git a/core/lib/state/src/postgres/mod.rs b/core/lib/state/src/postgres/mod.rs index 91e59b68fea5..100882263c8a 100644 --- a/core/lib/state/src/postgres/mod.rs +++ b/core/lib/state/src/postgres/mod.rs @@ -140,14 +140,13 @@ impl ValuesCache { } } - #[allow(clippy::cast_precision_loss)] // acceptable for metrics fn update( &self, from_miniblock: MiniblockNumber, to_miniblock: MiniblockNumber, rt_handle: &Handle, connection: &mut StorageProcessor<'_>, - ) { + ) -> anyhow::Result<()> { const MAX_MINIBLOCKS_LAG: u32 = 5; tracing::debug!( @@ -161,8 +160,16 @@ impl ValuesCache { "Storage values cache is too far behind (current miniblock is {from_miniblock}; \ requested update to {to_miniblock}); resetting the cache" ); - let mut lock = self.0.write().expect("values cache is poisoned"); - assert_eq!(lock.valid_for, from_miniblock); + let mut lock = self + .0 + .write() + .map_err(|_| anyhow::anyhow!("values cache is poisoned"))?; + anyhow::ensure!( + lock.valid_for == from_miniblock, + "sanity check failed: values cache was expected to be valid for miniblock #{from_miniblock}, but it's actually \ + valid for miniblock #{}", + lock.valid_for + ); lock.valid_for = to_miniblock; lock.values.clear(); @@ -170,11 +177,15 @@ impl ValuesCache { } else { let update_latency = CACHE_METRICS.values_update[&ValuesUpdateStage::LoadKeys].start(); let miniblocks = (from_miniblock + 1)..=to_miniblock; - let modified_keys = rt_handle.block_on( - connection - .storage_web3_dal() - .modified_keys_in_miniblocks(miniblocks.clone()), - ); + let modified_keys = rt_handle + .block_on( + connection + .storage_logs_dal() + .modified_keys_in_miniblocks(miniblocks.clone()), + ) + .with_context(|| { + format!("failed loading modified keys for miniblocks {miniblocks:?}") + })?; let elapsed = update_latency.observe(); CACHE_METRICS @@ -188,11 +199,19 @@ impl ValuesCache { let update_latency = CACHE_METRICS.values_update[&ValuesUpdateStage::RemoveStaleKeys].start(); - let mut lock = self.0.write().expect("values cache is poisoned"); + let mut lock = self + .0 + .write() + .map_err(|_| anyhow::anyhow!("values cache is poisoned"))?; // The code below holding onto the write `lock` is the only code that can theoretically poison the `RwLock` // (other than emptying the cache above). Thus, it's kept as simple and tight as possible. // E.g., we load data from Postgres beforehand. 
-            assert_eq!(lock.valid_for, from_miniblock);
+            anyhow::ensure!(
+                lock.valid_for == from_miniblock,
+                "sanity check failed: values cache was expected to be valid for miniblock #{from_miniblock}, but it's actually \
+                 valid for miniblock #{}",
+                lock.valid_for
+            );
             lock.valid_for = to_miniblock;
             for modified_key in &modified_keys {
                 lock.values.remove(modified_key);
@@ -201,9 +220,11 @@
             drop(lock);
             update_latency.observe();
         }
+
         CACHE_METRICS
             .values_valid_for_miniblock
             .set(u64::from(to_miniblock.0));
+        Ok(())
     }
 }
@@ -298,9 +319,13 @@
                 continue;
             }
             let mut connection = rt_handle
-                .block_on(connection_pool.access_storage_tagged("values_cache_updater"))
-                .unwrap();
-            values_cache.update(current_miniblock, to_miniblock, &rt_handle, &mut connection);
+                .block_on(connection_pool.access_storage_tagged("values_cache_updater"))?;
+            values_cache.update(
+                current_miniblock,
+                to_miniblock,
+                &rt_handle,
+                &mut connection,
+            )?;
             current_miniblock = to_miniblock;
         }
         Ok(())
@@ -519,11 +544,9 @@ impl ReadStorage for PostgresStorage<'_> {
     fn get_enumeration_index(&mut self, key: &StorageKey) -> Option<u64> {
         let mut dal = self.connection.storage_logs_dedup_dal();
-
         let value = self
             .rt_handle
-            .block_on(dal.get_enumeration_index_for_key(*key));
-
-        value
+            .block_on(dal.get_enumeration_index_for_key(key.hashed_key()));
+        value.expect("failed getting enumeration index for key")
     }
 }
diff --git a/core/lib/state/src/postgres/tests.rs b/core/lib/state/src/postgres/tests.rs
index 5ad4364b4769..f007f9c0bc29 100644
--- a/core/lib/state/src/postgres/tests.rs
+++ b/core/lib/state/src/postgres/tests.rs
@@ -451,12 +451,14 @@ fn test_values_cache(pool: &ConnectionPool, rt_handle: Handle) {
         (non_existing_key, Some(H256::zero())),
     ]);
-    values_cache.update(
-        MiniblockNumber(0),
-        MiniblockNumber(1),
-        &storage.rt_handle,
-        &mut storage.connection,
-    );
+    values_cache
+        .update(
+            MiniblockNumber(0),
+            MiniblockNumber(1),
+            &storage.rt_handle,
+            &mut storage.connection,
+        )
+        .unwrap();
     assert_eq!(values_cache.0.read().unwrap().valid_for, MiniblockNumber(1));
     assert_eq!(storage.read_value(&existing_key), H256::repeat_byte(1));
@@ -534,12 +536,14 @@ fn mini_fuzz_values_cache_inner(rng: &mut impl Rng, pool: &ConnectionPool, mut r
     let cache_valid_for = values_cache.valid_for();
     assert!(cache_valid_for < MiniblockNumber(latest_block_number));
-    values_cache.update(
-        cache_valid_for,
-        MiniblockNumber(latest_block_number),
-        &rt_handle,
-        &mut connection,
-    );
+    values_cache
+        .update(
+            cache_valid_for,
+            MiniblockNumber(latest_block_number),
+            &rt_handle,
+            &mut connection,
+        )
+        .unwrap();
     cache_updated = true;
 }
diff --git a/core/lib/state/src/rocksdb/recovery.rs b/core/lib/state/src/rocksdb/recovery.rs
index dae4ae144be0..1584a23822cf 100644
--- a/core/lib/state/src/rocksdb/recovery.rs
+++ b/core/lib/state/src/rocksdb/recovery.rs
@@ -176,7 +176,7 @@ impl RocksdbStorage {
         let snapshot_miniblock = snapshot_recovery.miniblock_number;
         let log_count = storage
             .storage_logs_dal()
-            .count_miniblock_storage_logs(snapshot_miniblock)
+            .get_storage_logs_row_count(snapshot_miniblock)
             .await
             .with_context(|| {
                 format!("Failed getting number of logs for miniblock #{snapshot_miniblock}")
diff --git a/core/lib/state/src/rocksdb/tests.rs b/core/lib/state/src/rocksdb/tests.rs
index c95325cdea69..a7bd03a76f86 100644
--- a/core/lib/state/src/rocksdb/tests.rs
+++ b/core/lib/state/src/rocksdb/tests.rs
@@ -253,6 +253,7 @@ async fn rocksdb_enum_index_migration() {
         .storage_logs_dedup_dal()
.initial_writes_for_batch(L1BatchNumber(1)) .await + .unwrap() .into_iter() .collect(); diff --git a/core/lib/zksync_core/src/metadata_calculator/recovery/mod.rs b/core/lib/zksync_core/src/metadata_calculator/recovery/mod.rs index f6b6f74fb2b2..4ca9215f3e6a 100644 --- a/core/lib/zksync_core/src/metadata_calculator/recovery/mod.rs +++ b/core/lib/zksync_core/src/metadata_calculator/recovery/mod.rs @@ -127,7 +127,7 @@ impl SnapshotParameters { let mut storage = pool.access_storage().await?; let log_count = storage .storage_logs_dal() - .count_miniblock_storage_logs(miniblock) + .get_storage_logs_row_count(miniblock) .await .with_context(|| format!("Failed getting number of logs for miniblock #{miniblock}"))?; diff --git a/core/lib/zksync_core/src/metadata_calculator/tests.rs b/core/lib/zksync_core/src/metadata_calculator/tests.rs index ece4c4e66fce..df6b849a72c3 100644 --- a/core/lib/zksync_core/src/metadata_calculator/tests.rs +++ b/core/lib/zksync_core/src/metadata_calculator/tests.rs @@ -540,7 +540,8 @@ async fn insert_initial_writes_for_batch( let pre_written_slots = connection .storage_logs_dedup_dal() .filter_written_slots(&hashed_keys) - .await; + .await + .unwrap(); let keys_to_insert: Vec<_> = written_non_zero_slots .into_iter() diff --git a/core/lib/zksync_core/src/metadata_calculator/updater.rs b/core/lib/zksync_core/src/metadata_calculator/updater.rs index 081fa2458899..a7b37533c66f 100644 --- a/core/lib/zksync_core/src/metadata_calculator/updater.rs +++ b/core/lib/zksync_core/src/metadata_calculator/updater.rs @@ -319,7 +319,8 @@ impl TreeUpdater { let pg_initial_writes = connection .storage_logs_dedup_dal() .initial_writes_for_batch(l1_batch_number) - .await; + .await + .expect("cannot get initial writes for L1 batch"); let pg_initial_writes: Vec<_> = pg_initial_writes .into_iter() diff --git a/core/lib/zksync_core/src/state_keeper/batch_executor/tests/tester.rs b/core/lib/zksync_core/src/state_keeper/batch_executor/tests/tester.rs index 224f8cd691b5..4769dc8037c1 100644 --- a/core/lib/zksync_core/src/state_keeper/batch_executor/tests/tester.rs +++ b/core/lib/zksync_core/src/state_keeper/batch_executor/tests/tester.rs @@ -206,6 +206,7 @@ impl Tester { .storage_logs_dedup_dal() .filter_written_slots(&[storage_log.key.hashed_key()]) .await + .unwrap() .is_empty() { storage diff --git a/core/lib/zksync_core/src/state_keeper/io/seal_logic.rs b/core/lib/zksync_core/src/state_keeper/io/seal_logic.rs index df97610dd381..e4cf2abfbe90 100644 --- a/core/lib/zksync_core/src/state_keeper/io/seal_logic.rs +++ b/core/lib/zksync_core/src/state_keeper/io/seal_logic.rs @@ -198,7 +198,8 @@ impl UpdatesManager { let non_initial_writes = transaction .storage_logs_dedup_dal() .filter_written_slots(&deduplicated_writes_hashed_keys) - .await; + .await + .expect("cannot filter out previously written VM storage slots"); progress.observe(deduplicated_writes.len()); let progress = L1_BATCH_METRICS.start(L1BatchSealStage::InsertInitialWrites);
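A standalone sketch (assuming only the `anyhow` crate) of the panic-to-error conversion applied to the values cache above: `anyhow::ensure!` turns a violated invariant into an `Err` that the updater task can report, where `assert_eq!` would panic while the write lock is held and poison the `RwLock` for every other reader.

    use anyhow::ensure;

    // Simplified stand-in for the cache's `valid_for` sanity check.
    fn check_valid_for(actual: u32, expected: u32) -> anyhow::Result<()> {
        ensure!(
            actual == expected,
            "sanity check failed: cache expected to be valid for miniblock #{expected}, \
             but it is valid for #{actual}"
        );
        Ok(())
    }

    fn main() -> anyhow::Result<()> {
        check_valid_for(42, 42)?;
        // A mismatch is now a recoverable error rather than a panic.
        assert!(check_valid_for(41, 42).is_err());
        Ok(())
    }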