From 0d2ba09c5d4b607bd9da31fc4bf0ea8ca2b4d7b8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tomasz=20Grze=C5=9Bkiewicz?= Date: Thu, 25 Jan 2024 14:36:31 +0100 Subject: [PATCH] feat: Adding EN snapshots applier (#882) Parts of this component that I decided to do in separate PRs: -resilience --- Cargo.lock | 17 + Cargo.toml | 1 + core/bin/snapshots_creator/src/tests.rs | 3 +- ...ae942bab5d71daaed7017e3fa62dc5e42ab0a.json | 32 ++ ...00f7b1176d59a83520288f5428b67ebd52130.json | 14 + ...1397212b4767039a6c0e22697cf40969729af.json | 56 +++ ...d913ed9fbd69b8354b7d18b01d3fb62f6be8.json} | 14 +- ...8f8af02aa297d85a2695c5f448ed14b2d7386.json | 19 - ...940bd2aee55b66f6780ceae06c3e1ff92eb8b.json | 18 + ...92dfb0d28a797bee69872634f3105d2d51996.json | 14 + ...er_processed_chunks_format_change.down.sql | 11 + ...lier_processed_chunks_format_change.up.sql | 8 + core/lib/dal/src/blocks_dal.rs | 36 ++ core/lib/dal/src/blocks_web3_dal.rs | 5 +- core/lib/dal/src/snapshot_recovery_dal.rs | 72 ++-- core/lib/dal/src/storage_dal.rs | 7 +- core/lib/dal/src/storage_logs_dal.rs | 78 +++- core/lib/dal/src/storage_logs_dedup_dal.rs | 61 ++- core/lib/dal/src/storage_web3_dal.rs | 5 +- core/lib/snapshots_applier/Cargo.toml | 26 ++ core/lib/snapshots_applier/src/lib.rs | 376 ++++++++++++++++++ core/lib/snapshots_applier/src/tests.rs | 228 +++++++++++ core/lib/state/src/postgres/tests.rs | 15 +- core/lib/state/src/rocksdb/tests.rs | 6 +- core/lib/state/src/test_utils.rs | 9 +- core/lib/types/src/snapshots.rs | 27 +- .../src/api_server/web3/tests/mod.rs | 2 +- .../lib/zksync_core/src/block_reverter/mod.rs | 5 + core/lib/zksync_core/src/genesis.rs | 6 +- .../src/metadata_calculator/recovery/tests.rs | 3 +- .../src/metadata_calculator/tests.rs | 10 + .../src/state_keeper/io/seal_logic.rs | 3 +- .../zksync_core/src/sync_layer/external_io.rs | 3 +- core/lib/zksync_core/src/utils/testonly.rs | 10 +- 34 files changed, 1095 insertions(+), 105 deletions(-) create mode 100644 core/lib/dal/.sqlx/query-2506e9edfd4b41ca1e187909631ae942bab5d71daaed7017e3fa62dc5e42ab0a.json create mode 100644 core/lib/dal/.sqlx/query-2c71a819c6ed22a3ab79675840e00f7b1176d59a83520288f5428b67ebd52130.json create mode 100644 core/lib/dal/.sqlx/query-555f396946bdb8b84a5d77abbfc1397212b4767039a6c0e22697cf40969729af.json rename core/lib/dal/.sqlx/{query-47c2f23d9209d155f3f32fd21ef7931a02fe5ffaf2c4dc2f1e7a48c0e932c060.json => query-73f0401ac19c4e1efd73d02b8dcdd913ed9fbd69b8354b7d18b01d3fb62f6be8.json} (68%) delete mode 100644 core/lib/dal/.sqlx/query-df3b08549a11729fb475341b8f38f8af02aa297d85a2695c5f448ed14b2d7386.json create mode 100644 core/lib/dal/.sqlx/query-eb83e9175b4f8c0351ac2d4b4d2940bd2aee55b66f6780ceae06c3e1ff92eb8b.json create mode 100644 core/lib/dal/.sqlx/query-f2f852a340c45ff69cbca42d7c592dfb0d28a797bee69872634f3105d2d51996.json create mode 100644 core/lib/dal/migrations/20240112194527_snapshots_applier_processed_chunks_format_change.down.sql create mode 100644 core/lib/dal/migrations/20240112194527_snapshots_applier_processed_chunks_format_change.up.sql create mode 100644 core/lib/snapshots_applier/Cargo.toml create mode 100644 core/lib/snapshots_applier/src/lib.rs create mode 100644 core/lib/snapshots_applier/src/tests.rs diff --git a/Cargo.lock b/Cargo.lock index 626f2a7e6c5a..5d00d9a8b097 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8849,6 +8849,23 @@ dependencies = [ "zksync_utils", ] +[[package]] +name = "zksync_snapshots_applier" +version = "0.1.0" +dependencies = [ + "anyhow", + "async-trait", + "thiserror", + "tokio", + "tracing", + "vise", + "zksync_dal", + "zksync_object_store", + "zksync_types", + "zksync_utils", + "zksync_web3_decl", +] + [[package]] name = "zksync_state" version = "0.1.0" diff --git a/Cargo.toml b/Cargo.toml index e7b94164a1d9..f7ddf706e6ad 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -38,6 +38,7 @@ members = [ "core/lib/multivm", "core/lib/vm_utils", "core/lib/web3_decl", + "core/lib/snapshots_applier", # Test infrastructure "core/tests/test_account", diff --git a/core/bin/snapshots_creator/src/tests.rs b/core/bin/snapshots_creator/src/tests.rs index 987976fc9168..2761769a2155 100644 --- a/core/bin/snapshots_creator/src/tests.rs +++ b/core/bin/snapshots_creator/src/tests.rs @@ -205,7 +205,8 @@ async fn prepare_postgres( let factory_deps = gen_factory_deps(rng, 10); conn.storage_dal() .insert_factory_deps(MiniblockNumber(block_number), &factory_deps) - .await; + .await + .unwrap(); // Since we generate `logs` randomly, all of them are written the first time. create_l1_batch(conn, L1BatchNumber(block_number), &logs).await; diff --git a/core/lib/dal/.sqlx/query-2506e9edfd4b41ca1e187909631ae942bab5d71daaed7017e3fa62dc5e42ab0a.json b/core/lib/dal/.sqlx/query-2506e9edfd4b41ca1e187909631ae942bab5d71daaed7017e3fa62dc5e42ab0a.json new file mode 100644 index 000000000000..c4f8057011d4 --- /dev/null +++ b/core/lib/dal/.sqlx/query-2506e9edfd4b41ca1e187909631ae942bab5d71daaed7017e3fa62dc5e42ab0a.json @@ -0,0 +1,32 @@ +{ + "db_name": "PostgreSQL", + "query": "\n SELECT\n hashed_key,\n l1_batch_number,\n INDEX\n FROM\n initial_writes\n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "hashed_key", + "type_info": "Bytea" + }, + { + "ordinal": 1, + "name": "l1_batch_number", + "type_info": "Int8" + }, + { + "ordinal": 2, + "name": "index", + "type_info": "Int8" + } + ], + "parameters": { + "Left": [] + }, + "nullable": [ + false, + false, + false + ] + }, + "hash": "2506e9edfd4b41ca1e187909631ae942bab5d71daaed7017e3fa62dc5e42ab0a" +} diff --git a/core/lib/dal/.sqlx/query-2c71a819c6ed22a3ab79675840e00f7b1176d59a83520288f5428b67ebd52130.json b/core/lib/dal/.sqlx/query-2c71a819c6ed22a3ab79675840e00f7b1176d59a83520288f5428b67ebd52130.json new file mode 100644 index 000000000000..e8e6d8e760b4 --- /dev/null +++ b/core/lib/dal/.sqlx/query-2c71a819c6ed22a3ab79675840e00f7b1176d59a83520288f5428b67ebd52130.json @@ -0,0 +1,14 @@ +{ + "db_name": "PostgreSQL", + "query": "\n DELETE FROM initial_writes\n WHERE\n l1_batch_number > $1\n ", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Int8" + ] + }, + "nullable": [] + }, + "hash": "2c71a819c6ed22a3ab79675840e00f7b1176d59a83520288f5428b67ebd52130" +} diff --git a/core/lib/dal/.sqlx/query-555f396946bdb8b84a5d77abbfc1397212b4767039a6c0e22697cf40969729af.json b/core/lib/dal/.sqlx/query-555f396946bdb8b84a5d77abbfc1397212b4767039a6c0e22697cf40969729af.json new file mode 100644 index 000000000000..1cb61dc4460e --- /dev/null +++ b/core/lib/dal/.sqlx/query-555f396946bdb8b84a5d77abbfc1397212b4767039a6c0e22697cf40969729af.json @@ -0,0 +1,56 @@ +{ + "db_name": "PostgreSQL", + "query": "\n SELECT\n hashed_key,\n address,\n key,\n value,\n operation_number,\n tx_hash,\n miniblock_number\n FROM\n storage_logs\n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "hashed_key", + "type_info": "Bytea" + }, + { + "ordinal": 1, + "name": "address", + "type_info": "Bytea" + }, + { + "ordinal": 2, + "name": "key", + "type_info": "Bytea" + }, + { + "ordinal": 3, + "name": "value", + "type_info": "Bytea" + }, + { + "ordinal": 4, + "name": "operation_number", + "type_info": "Int4" + }, + { + "ordinal": 5, + "name": "tx_hash", + "type_info": "Bytea" + }, + { + "ordinal": 6, + "name": "miniblock_number", + "type_info": "Int8" + } + ], + "parameters": { + "Left": [] + }, + "nullable": [ + false, + false, + false, + false, + false, + false, + false + ] + }, + "hash": "555f396946bdb8b84a5d77abbfc1397212b4767039a6c0e22697cf40969729af" +} diff --git a/core/lib/dal/.sqlx/query-47c2f23d9209d155f3f32fd21ef7931a02fe5ffaf2c4dc2f1e7a48c0e932c060.json b/core/lib/dal/.sqlx/query-73f0401ac19c4e1efd73d02b8dcdd913ed9fbd69b8354b7d18b01d3fb62f6be8.json similarity index 68% rename from core/lib/dal/.sqlx/query-47c2f23d9209d155f3f32fd21ef7931a02fe5ffaf2c4dc2f1e7a48c0e932c060.json rename to core/lib/dal/.sqlx/query-73f0401ac19c4e1efd73d02b8dcdd913ed9fbd69b8354b7d18b01d3fb62f6be8.json index fe8a346d1e21..7c366776a5ab 100644 --- a/core/lib/dal/.sqlx/query-47c2f23d9209d155f3f32fd21ef7931a02fe5ffaf2c4dc2f1e7a48c0e932c060.json +++ b/core/lib/dal/.sqlx/query-73f0401ac19c4e1efd73d02b8dcdd913ed9fbd69b8354b7d18b01d3fb62f6be8.json @@ -1,6 +1,6 @@ { "db_name": "PostgreSQL", - "query": "\n SELECT\n l1_batch_number,\n l1_batch_root_hash,\n miniblock_number,\n miniblock_root_hash,\n last_finished_chunk_id,\n total_chunk_count\n FROM\n snapshot_recovery\n ", + "query": "\n SELECT\n l1_batch_number,\n l1_batch_root_hash,\n miniblock_number,\n miniblock_root_hash,\n storage_logs_chunks_processed\n FROM\n snapshot_recovery\n ", "describe": { "columns": [ { @@ -25,13 +25,8 @@ }, { "ordinal": 4, - "name": "last_finished_chunk_id", - "type_info": "Int4" - }, - { - "ordinal": 5, - "name": "total_chunk_count", - "type_info": "Int4" + "name": "storage_logs_chunks_processed", + "type_info": "BoolArray" } ], "parameters": { @@ -42,9 +37,8 @@ false, false, false, - true, false ] }, - "hash": "47c2f23d9209d155f3f32fd21ef7931a02fe5ffaf2c4dc2f1e7a48c0e932c060" + "hash": "73f0401ac19c4e1efd73d02b8dcdd913ed9fbd69b8354b7d18b01d3fb62f6be8" } diff --git a/core/lib/dal/.sqlx/query-df3b08549a11729fb475341b8f38f8af02aa297d85a2695c5f448ed14b2d7386.json b/core/lib/dal/.sqlx/query-df3b08549a11729fb475341b8f38f8af02aa297d85a2695c5f448ed14b2d7386.json deleted file mode 100644 index a04523bc07b8..000000000000 --- a/core/lib/dal/.sqlx/query-df3b08549a11729fb475341b8f38f8af02aa297d85a2695c5f448ed14b2d7386.json +++ /dev/null @@ -1,19 +0,0 @@ -{ - "db_name": "PostgreSQL", - "query": "\n INSERT INTO\n snapshot_recovery (\n l1_batch_number,\n l1_batch_root_hash,\n miniblock_number,\n miniblock_root_hash,\n last_finished_chunk_id,\n total_chunk_count,\n updated_at,\n created_at\n )\n VALUES\n ($1, $2, $3, $4, $5, $6, NOW(), NOW())\n ON CONFLICT (l1_batch_number) DO\n UPDATE\n SET\n l1_batch_number = excluded.l1_batch_number,\n l1_batch_root_hash = excluded.l1_batch_root_hash,\n miniblock_number = excluded.miniblock_number,\n miniblock_root_hash = excluded.miniblock_root_hash,\n last_finished_chunk_id = excluded.last_finished_chunk_id,\n total_chunk_count = excluded.total_chunk_count,\n updated_at = excluded.updated_at\n ", - "describe": { - "columns": [], - "parameters": { - "Left": [ - "Int8", - "Bytea", - "Int8", - "Bytea", - "Int4", - "Int4" - ] - }, - "nullable": [] - }, - "hash": "df3b08549a11729fb475341b8f38f8af02aa297d85a2695c5f448ed14b2d7386" -} diff --git a/core/lib/dal/.sqlx/query-eb83e9175b4f8c0351ac2d4b4d2940bd2aee55b66f6780ceae06c3e1ff92eb8b.json b/core/lib/dal/.sqlx/query-eb83e9175b4f8c0351ac2d4b4d2940bd2aee55b66f6780ceae06c3e1ff92eb8b.json new file mode 100644 index 000000000000..250e5beb89a1 --- /dev/null +++ b/core/lib/dal/.sqlx/query-eb83e9175b4f8c0351ac2d4b4d2940bd2aee55b66f6780ceae06c3e1ff92eb8b.json @@ -0,0 +1,18 @@ +{ + "db_name": "PostgreSQL", + "query": "\n INSERT INTO\n snapshot_recovery (\n l1_batch_number,\n l1_batch_root_hash,\n miniblock_number,\n miniblock_root_hash,\n storage_logs_chunks_processed,\n updated_at,\n created_at\n )\n VALUES\n ($1, $2, $3, $4, $5, NOW(), NOW())\n ", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Int8", + "Bytea", + "Int8", + "Bytea", + "BoolArray" + ] + }, + "nullable": [] + }, + "hash": "eb83e9175b4f8c0351ac2d4b4d2940bd2aee55b66f6780ceae06c3e1ff92eb8b" +} diff --git a/core/lib/dal/.sqlx/query-f2f852a340c45ff69cbca42d7c592dfb0d28a797bee69872634f3105d2d51996.json b/core/lib/dal/.sqlx/query-f2f852a340c45ff69cbca42d7c592dfb0d28a797bee69872634f3105d2d51996.json new file mode 100644 index 000000000000..11c409c21802 --- /dev/null +++ b/core/lib/dal/.sqlx/query-f2f852a340c45ff69cbca42d7c592dfb0d28a797bee69872634f3105d2d51996.json @@ -0,0 +1,14 @@ +{ + "db_name": "PostgreSQL", + "query": "\n UPDATE snapshot_recovery\n SET\n storage_logs_chunks_processed[$1] = TRUE,\n updated_at = NOW()\n ", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Int4" + ] + }, + "nullable": [] + }, + "hash": "f2f852a340c45ff69cbca42d7c592dfb0d28a797bee69872634f3105d2d51996" +} diff --git a/core/lib/dal/migrations/20240112194527_snapshots_applier_processed_chunks_format_change.down.sql b/core/lib/dal/migrations/20240112194527_snapshots_applier_processed_chunks_format_change.down.sql new file mode 100644 index 000000000000..598af2037d6c --- /dev/null +++ b/core/lib/dal/migrations/20240112194527_snapshots_applier_processed_chunks_format_change.down.sql @@ -0,0 +1,11 @@ +ALTER TABLE snapshot_recovery ADD COLUMN last_finished_chunk_id NOT NULL; +ALTER TABLE snapshot_recovery ADD COLUMN total_chunk_count NOT NULL; + +ALTER TABLE snapshot_recovery DROP COLUMN storage_logs_chunks_processed; + +ALTER TABLE factory_deps ADD CONSTRAINT factory_deps_miniblock_number_fkey + FOREIGN KEY (miniblock_number) REFERENCES miniblocks (number); +ALTER TABLE initial_writes ADD CONSTRAINT initial_writes_l1_batch_number_fkey + FOREIGN KEY (l1_batch_number) REFERENCES l1_batches (number); +ALTER TABLE storage_logs ADD CONSTRAINT storage_logs_miniblock_number_fkey + FOREIGN KEY (miniblock_number) REFERENCES miniblocks (number); diff --git a/core/lib/dal/migrations/20240112194527_snapshots_applier_processed_chunks_format_change.up.sql b/core/lib/dal/migrations/20240112194527_snapshots_applier_processed_chunks_format_change.up.sql new file mode 100644 index 000000000000..3f484c38638f --- /dev/null +++ b/core/lib/dal/migrations/20240112194527_snapshots_applier_processed_chunks_format_change.up.sql @@ -0,0 +1,8 @@ +ALTER TABLE snapshot_recovery DROP COLUMN last_finished_chunk_id; +ALTER TABLE snapshot_recovery DROP COLUMN total_chunk_count; + +ALTER TABLE snapshot_recovery ADD COLUMN storage_logs_chunks_processed BOOL[] NOT NULL; + +ALTER TABLE factory_deps DROP CONSTRAINT factory_deps_miniblock_number_fkey; +ALTER TABLE initial_writes DROP CONSTRAINT initial_writes_l1_batch_number_fkey; +ALTER TABLE storage_logs DROP CONSTRAINT storage_logs_miniblock_number_fkey; diff --git a/core/lib/dal/src/blocks_dal.rs b/core/lib/dal/src/blocks_dal.rs index 82813bb8b939..097fdd344ffb 100644 --- a/core/lib/dal/src/blocks_dal.rs +++ b/core/lib/dal/src/blocks_dal.rs @@ -1812,6 +1812,31 @@ impl BlocksDal<'_, '_> { .collect()) } + pub async fn delete_initial_writes( + &mut self, + last_batch_to_keep: L1BatchNumber, + ) -> sqlx::Result<()> { + self.delete_initial_writes_inner(Some(last_batch_to_keep)) + .await + } + + pub async fn delete_initial_writes_inner( + &mut self, + last_batch_to_keep: Option, + ) -> sqlx::Result<()> { + let block_number = last_batch_to_keep.map_or(-1, |number| number.0 as i64); + sqlx::query!( + r#" + DELETE FROM initial_writes + WHERE + l1_batch_number > $1 + "#, + block_number + ) + .execute(self.storage.conn()) + .await?; + Ok(()) + } /// Deletes all L1 batches from the storage so that the specified batch number is the last one left. pub async fn delete_l1_batches( &mut self, @@ -2181,6 +2206,9 @@ impl BlocksDal<'_, '_> { self.delete_l1_batches_inner(None) .await .context("delete_l1_batches_inner()")?; + self.delete_initial_writes_inner(None) + .await + .context("delete_initial_writes_inner()")?; Ok(()) } } @@ -2204,6 +2232,10 @@ mod tests { .delete_l1_batches(L1BatchNumber(0)) .await .unwrap(); + conn.blocks_dal() + .delete_initial_writes(L1BatchNumber(0)) + .await + .unwrap(); conn.protocol_versions_dal() .save_protocol_version_with_tx(ProtocolVersion::default()) .await; @@ -2265,6 +2297,10 @@ mod tests { .delete_l1_batches(L1BatchNumber(0)) .await .unwrap(); + conn.blocks_dal() + .delete_initial_writes(L1BatchNumber(0)) + .await + .unwrap(); conn.protocol_versions_dal() .save_protocol_version_with_tx(ProtocolVersion::default()) .await; diff --git a/core/lib/dal/src/blocks_web3_dal.rs b/core/lib/dal/src/blocks_web3_dal.rs index 17a38d12588f..0a44ebcf9be1 100644 --- a/core/lib/dal/src/blocks_web3_dal.rs +++ b/core/lib/dal/src/blocks_web3_dal.rs @@ -767,11 +767,10 @@ mod tests { l1_batch_root_hash: H256::zero(), miniblock_number: MiniblockNumber(42), miniblock_root_hash: H256::zero(), - last_finished_chunk_id: None, - total_chunk_count: 100, + storage_logs_chunks_processed: vec![true; 100], }; conn.snapshot_recovery_dal() - .set_applied_snapshot_status(&snapshot_recovery) + .insert_initial_recovery_status(&snapshot_recovery) .await .unwrap(); diff --git a/core/lib/dal/src/snapshot_recovery_dal.rs b/core/lib/dal/src/snapshot_recovery_dal.rs index abf6ceb44069..af6f6a25439c 100644 --- a/core/lib/dal/src/snapshot_recovery_dal.rs +++ b/core/lib/dal/src/snapshot_recovery_dal.rs @@ -8,7 +8,7 @@ pub struct SnapshotRecoveryDal<'a, 'c> { } impl SnapshotRecoveryDal<'_, '_> { - pub async fn set_applied_snapshot_status( + pub async fn insert_initial_recovery_status( &mut self, status: &SnapshotRecoveryStatus, ) -> sqlx::Result<()> { @@ -20,36 +20,43 @@ impl SnapshotRecoveryDal<'_, '_> { l1_batch_root_hash, miniblock_number, miniblock_root_hash, - last_finished_chunk_id, - total_chunk_count, + storage_logs_chunks_processed, updated_at, created_at ) VALUES - ($1, $2, $3, $4, $5, $6, NOW(), NOW()) - ON CONFLICT (l1_batch_number) DO - UPDATE - SET - l1_batch_number = excluded.l1_batch_number, - l1_batch_root_hash = excluded.l1_batch_root_hash, - miniblock_number = excluded.miniblock_number, - miniblock_root_hash = excluded.miniblock_root_hash, - last_finished_chunk_id = excluded.last_finished_chunk_id, - total_chunk_count = excluded.total_chunk_count, - updated_at = excluded.updated_at + ($1, $2, $3, $4, $5, NOW(), NOW()) "#, status.l1_batch_number.0 as i64, status.l1_batch_root_hash.0.as_slice(), status.miniblock_number.0 as i64, status.miniblock_root_hash.0.as_slice(), - status.last_finished_chunk_id.map(|v| v as i32), - status.total_chunk_count as i64, + &status.storage_logs_chunks_processed, ) .execute(self.storage.conn()) .await?; Ok(()) } + pub async fn mark_storage_logs_chunk_as_processed( + &mut self, + chunk_id: u64, + ) -> sqlx::Result<()> { + sqlx::query!( + r#" + UPDATE snapshot_recovery + SET + storage_logs_chunks_processed[$1] = TRUE, + updated_at = NOW() + "#, + chunk_id as i32 + 1 + ) + .execute(self.storage.conn()) + .await?; + + Ok(()) + } + pub async fn get_applied_snapshot_status( &mut self, ) -> sqlx::Result> { @@ -60,8 +67,7 @@ impl SnapshotRecoveryDal<'_, '_> { l1_batch_root_hash, miniblock_number, miniblock_root_hash, - last_finished_chunk_id, - total_chunk_count + storage_logs_chunks_processed FROM snapshot_recovery "#, @@ -74,8 +80,7 @@ impl SnapshotRecoveryDal<'_, '_> { l1_batch_root_hash: H256::from_slice(&r.l1_batch_root_hash), miniblock_number: MiniblockNumber(r.miniblock_number as u32), miniblock_root_hash: H256::from_slice(&r.miniblock_root_hash), - last_finished_chunk_id: r.last_finished_chunk_id.map(|v| v as u64), - total_chunk_count: r.total_chunk_count as u64, + storage_logs_chunks_processed: r.storage_logs_chunks_processed.into_iter().collect(), })) } } @@ -96,40 +101,37 @@ mod tests { .await .unwrap(); assert_eq!(None, empty_status); - let status = SnapshotRecoveryStatus { + let mut status = SnapshotRecoveryStatus { l1_batch_number: L1BatchNumber(123), l1_batch_root_hash: H256::random(), miniblock_number: MiniblockNumber(234), miniblock_root_hash: H256::random(), - last_finished_chunk_id: None, - total_chunk_count: 345, + storage_logs_chunks_processed: vec![false, false, true, false], }; applied_status_dal - .set_applied_snapshot_status(&status) + .insert_initial_recovery_status(&status) .await .unwrap(); let status_from_db = applied_status_dal .get_applied_snapshot_status() .await .unwrap(); - assert_eq!(Some(status), status_from_db); + assert_eq!(status, status_from_db.unwrap()); - let updated_status = SnapshotRecoveryStatus { - l1_batch_number: L1BatchNumber(123), - l1_batch_root_hash: H256::random(), - miniblock_number: MiniblockNumber(234), - miniblock_root_hash: H256::random(), - last_finished_chunk_id: Some(2345), - total_chunk_count: 345, - }; + status.storage_logs_chunks_processed = vec![false, true, true, true]; applied_status_dal - .set_applied_snapshot_status(&updated_status) + .mark_storage_logs_chunk_as_processed(1) .await .unwrap(); + applied_status_dal + .mark_storage_logs_chunk_as_processed(3) + .await + .unwrap(); + let updated_status_from_db = applied_status_dal .get_applied_snapshot_status() .await .unwrap(); - assert_eq!(Some(updated_status), updated_status_from_db); + assert_eq!(status, updated_status_from_db.unwrap()); } } diff --git a/core/lib/dal/src/storage_dal.rs b/core/lib/dal/src/storage_dal.rs index 5ad1fd8eeb15..1155cae4a3be 100644 --- a/core/lib/dal/src/storage_dal.rs +++ b/core/lib/dal/src/storage_dal.rs @@ -19,7 +19,7 @@ impl StorageDal<'_, '_> { &mut self, block_number: MiniblockNumber, factory_deps: &HashMap>, - ) { + ) -> sqlx::Result<()> { let (bytecode_hashes, bytecodes): (Vec<_>, Vec<_>) = factory_deps .iter() .map(|dep| (dep.0.as_bytes(), dep.1.as_slice())) @@ -45,8 +45,9 @@ impl StorageDal<'_, '_> { block_number.0 as i64, ) .execute(self.storage.conn()) - .await - .unwrap(); + .await?; + + Ok(()) } /// Returns bytecode for a factory dependency with the specified bytecode `hash`. diff --git a/core/lib/dal/src/storage_logs_dal.rs b/core/lib/dal/src/storage_logs_dal.rs index 27707c50d3a4..b80d3063609b 100644 --- a/core/lib/dal/src/storage_logs_dal.rs +++ b/core/lib/dal/src/storage_logs_dal.rs @@ -2,8 +2,10 @@ use std::{collections::HashMap, ops, time::Instant}; use sqlx::{types::chrono::Utc, Row}; use zksync_types::{ - get_code_key, AccountTreeId, Address, L1BatchNumber, MiniblockNumber, StorageKey, StorageLog, - FAILED_CONTRACT_DEPLOYMENT_BYTECODE_HASH, H256, + get_code_key, + snapshots::{SnapshotStorageLog, StorageLogDbRow}, + AccountTreeId, Address, L1BatchNumber, MiniblockNumber, StorageKey, StorageLog, + FAILED_CONTRACT_DEPLOYMENT_BYTECODE_HASH, H160, H256, }; pub use crate::models::storage_log::StorageRecoveryLogEntry; @@ -68,6 +70,46 @@ impl StorageLogsDal<'_, '_> { copy.finish().await.unwrap(); } + pub async fn insert_storage_logs_from_snapshot( + &mut self, + miniblock_number: MiniblockNumber, + snapshot_storage_logs: &[SnapshotStorageLog], + ) -> sqlx::Result<()> { + let mut copy = self + .storage + .conn() + .copy_in_raw( + "COPY storage_logs( + hashed_key, address, key, value, operation_number, tx_hash, miniblock_number, + created_at, updated_at + ) + FROM STDIN WITH (DELIMITER '|')", + ) + .await?; + + let mut buffer = String::new(); + let now = Utc::now().naive_utc().to_string(); + for log in snapshot_storage_logs.iter() { + write_str!( + &mut buffer, + r"\\x{hashed_key:x}|\\x{address:x}|\\x{key:x}|\\x{value:x}|", + hashed_key = log.key.hashed_key(), + address = log.key.address(), + key = log.key.key(), + value = log.value + ); + writeln_str!( + &mut buffer, + r"{}|\\x{:x}|{miniblock_number}|{now}|{now}", + log.enumeration_index, + H256::zero() + ); + } + copy.send(buffer.as_bytes()).await?; + copy.finish().await?; + Ok(()) + } + pub async fn append_storage_logs( &mut self, block_number: MiniblockNumber, @@ -503,6 +545,38 @@ impl StorageLogsDal<'_, '_> { .collect()) } + /// Retrieves all storage log entries for testing purposes. + pub async fn dump_all_storage_logs_for_tests(&mut self) -> Vec { + let rows = sqlx::query!( + r#" + SELECT + hashed_key, + address, + key, + value, + operation_number, + tx_hash, + miniblock_number + FROM + storage_logs + "# + ) + .fetch_all(self.storage.conn()) + .await + .expect("get_all_storage_logs_for_tests"); + rows.into_iter() + .map(|row| StorageLogDbRow { + hashed_key: H256::from_slice(&row.hashed_key), + address: H160::from_slice(&row.address), + key: H256::from_slice(&row.key), + value: H256::from_slice(&row.value), + operation_number: row.operation_number as u64, + tx_hash: H256::from_slice(&row.tx_hash), + miniblock_number: MiniblockNumber(row.miniblock_number as u32), + }) + .collect() + } + pub async fn get_miniblock_storage_logs( &mut self, miniblock_number: MiniblockNumber, diff --git a/core/lib/dal/src/storage_logs_dedup_dal.rs b/core/lib/dal/src/storage_logs_dedup_dal.rs index 4b603742705f..980989a9ccf4 100644 --- a/core/lib/dal/src/storage_logs_dedup_dal.rs +++ b/core/lib/dal/src/storage_logs_dedup_dal.rs @@ -2,7 +2,9 @@ use std::collections::HashSet; use sqlx::types::chrono::Utc; use zksync_types::{ - zk_evm_types::LogQuery, AccountTreeId, Address, L1BatchNumber, StorageKey, H256, + snapshots::{InitialWriteDbRow, SnapshotStorageLog}, + zk_evm_types::LogQuery, + AccountTreeId, Address, L1BatchNumber, StorageKey, H256, }; use zksync_utils::u256_to_h256; @@ -46,6 +48,38 @@ impl StorageLogsDedupDal<'_, '_> { /// Insert initial writes and assigns indices to them. /// Assumes indices are already assigned for all saved initial_writes, so must be called only after the migration. + pub async fn insert_initial_writes_from_snapshot( + &mut self, + snapshot_storage_logs: &[SnapshotStorageLog], + ) -> sqlx::Result<()> { + let mut copy = self + .storage + .conn() + .copy_in_raw( + "COPY initial_writes (hashed_key, index, l1_batch_number, created_at, updated_at) \ + FROM STDIN WITH (DELIMITER '|')", + ) + .await?; + + let mut bytes: Vec = Vec::new(); + let now = Utc::now().naive_utc().to_string(); + for log in snapshot_storage_logs.iter() { + let row = format!( + "\\\\x{:x}|{}|{}|{}|{}\n", + log.key.hashed_key(), + log.enumeration_index, + log.l1_batch_number_of_initial_write, + now, + now, + ); + bytes.extend_from_slice(row.as_bytes()); + } + copy.send(bytes).await?; + copy.finish().await?; + + Ok(()) + } + pub async fn insert_initial_writes( &mut self, l1_batch_number: L1BatchNumber, @@ -193,4 +227,29 @@ impl StorageLogsDedupDal<'_, '_> { .map(|row| H256::from_slice(&row.hashed_key)) .collect() } + + /// Retrieves all initial write entries for testing purposes. + pub async fn dump_all_initial_writes_for_tests(&mut self) -> Vec { + let rows = sqlx::query!( + r#" + SELECT + hashed_key, + l1_batch_number, + INDEX + FROM + initial_writes + "# + ) + .fetch_all(self.storage.conn()) + .await + .expect("get_all_initial_writes_for_tests"); + + rows.into_iter() + .map(|row| InitialWriteDbRow { + hashed_key: H256::from_slice(&row.hashed_key), + l1_batch_number: L1BatchNumber(row.l1_batch_number as u32), + index: row.index as u64, + }) + .collect() + } } diff --git a/core/lib/dal/src/storage_web3_dal.rs b/core/lib/dal/src/storage_web3_dal.rs index e5cc4fc8b975..eac597a19a89 100644 --- a/core/lib/dal/src/storage_web3_dal.rs +++ b/core/lib/dal/src/storage_web3_dal.rs @@ -346,11 +346,10 @@ mod tests { l1_batch_root_hash: H256::zero(), miniblock_number: MiniblockNumber(42), miniblock_root_hash: H256::zero(), - last_finished_chunk_id: None, - total_chunk_count: 100, + storage_logs_chunks_processed: vec![true; 100], }; conn.snapshot_recovery_dal() - .set_applied_snapshot_status(&snapshot_recovery) + .insert_initial_recovery_status(&snapshot_recovery) .await .unwrap(); diff --git a/core/lib/snapshots_applier/Cargo.toml b/core/lib/snapshots_applier/Cargo.toml new file mode 100644 index 000000000000..2acbc1c07cac --- /dev/null +++ b/core/lib/snapshots_applier/Cargo.toml @@ -0,0 +1,26 @@ +[package] +name = "zksync_snapshots_applier" +version = "0.1.0" +edition = "2018" +authors = ["The Matter Labs Team "] +homepage = "https://zksync.io/" +repository = "https://github.com/matter-labs/zksync-era" +license = "MIT OR Apache-2.0" +keywords = ["blockchain", "zksync"] +categories = ["cryptography"] + + +[dependencies] +zksync_dal = { path = "../../lib/dal" } +zksync_types = { path = "../../lib/types" } +zksync_object_store = { path = "../../lib/object_store" } +zksync_web3_decl = { path = "../../lib/web3_decl" } +zksync_utils = { path = "../../lib/utils" } + +vise = { git = "https://github.com/matter-labs/vise.git", version = "0.1.0", rev = "1c9cc500e92cf9ea052b230e114a6f9cce4fb2c1" } + +anyhow = "1.0" +async-trait = "0.1" +tokio = { version = "1", features = ["time"] } +tracing = "0.1" +thiserror = "1.0" diff --git a/core/lib/snapshots_applier/src/lib.rs b/core/lib/snapshots_applier/src/lib.rs new file mode 100644 index 000000000000..470462213261 --- /dev/null +++ b/core/lib/snapshots_applier/src/lib.rs @@ -0,0 +1,376 @@ +use std::{collections::HashMap, fmt, time::Duration}; + +use async_trait::async_trait; +use vise::{Buckets, EncodeLabelSet, EncodeLabelValue, Family, Gauge, Histogram, Metrics, Unit}; +use zksync_dal::{ConnectionPool, SqlxError, StorageProcessor}; +use zksync_object_store::{ObjectStore, ObjectStoreError}; +use zksync_types::{ + api::en::SyncBlock, + snapshots::{ + SnapshotFactoryDependencies, SnapshotHeader, SnapshotRecoveryStatus, SnapshotStorageLog, + SnapshotStorageLogsChunk, SnapshotStorageLogsStorageKey, + }, + MiniblockNumber, H256, +}; +use zksync_utils::bytecode::hash_bytecode; +use zksync_web3_decl::jsonrpsee::core::{client::Error, ClientError as RpcError}; + +#[cfg(test)] +mod tests; + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, EncodeLabelValue, EncodeLabelSet)] +#[metrics(label = "stage", rename_all = "snake_case")] +pub(crate) enum StorageLogsChunksStage { + LoadFromGcs, + SaveToPostgres, +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, EncodeLabelValue, EncodeLabelSet)] +#[metrics(label = "stage", rename_all = "snake_case")] +pub(crate) enum InitialStage { + FetchMetadataFromMainNode, + ApplyFactoryDeps, +} + +#[derive(Debug, Metrics)] +#[metrics(prefix = "snapshots_applier")] +pub(crate) struct SnapshotsApplierMetrics { + /// Number of chunks in the applied snapshot. Set when snapshots applier starts. + pub storage_logs_chunks_count: Gauge, + + /// Number of chunks left to apply. + pub storage_logs_chunks_left_to_process: Gauge, + + /// Total latency of applying snapshot. + #[metrics(buckets = Buckets::LATENCIES, unit = Unit::Seconds)] + pub snapshot_applying_duration: Histogram, + + /// Latency of initial recovery operation split by stage. + #[metrics(buckets = Buckets::LATENCIES, unit = Unit::Seconds)] + pub initial_stage_duration: Family>, + + /// Latency of storage log chunk processing split by stage. + #[metrics(buckets = Buckets::LATENCIES, unit = Unit::Seconds)] + pub storage_logs_chunks_duration: Family>, +} + +#[vise::register] +pub(crate) static METRICS: vise::Global = vise::Global::new(); + +#[derive(thiserror::Error, Debug)] +pub enum SnapshotsApplierError { + #[error("canceled")] + Canceled(String), + #[error(transparent)] + Fatal(#[from] anyhow::Error), + #[error(transparent)] + Retryable(anyhow::Error), +} + +#[derive(Debug)] +pub struct SnapshotsApplier<'a, 'b> { + connection_pool: &'a ConnectionPool, + blob_store: &'b dyn ObjectStore, + applied_snapshot_status: SnapshotRecoveryStatus, +} + +impl From for SnapshotsApplierError { + fn from(error: ObjectStoreError) -> Self { + match error { + ObjectStoreError::KeyNotFound(_) | ObjectStoreError::Serialization(_) => { + Self::Fatal(error.into()) + } + ObjectStoreError::Other(_) => Self::Retryable(error.into()), + } + } +} + +impl From for SnapshotsApplierError { + fn from(error: SqlxError) -> Self { + match error { + SqlxError::Database(_) + | SqlxError::RowNotFound + | SqlxError::ColumnNotFound(_) + | SqlxError::Configuration(_) + | SqlxError::TypeNotFound { .. } => Self::Fatal(error.into()), + _ => Self::Retryable(error.into()), + } + } +} + +impl From for SnapshotsApplierError { + fn from(error: RpcError) -> Self { + match error { + Error::Transport(_) | Error::RequestTimeout | Error::RestartNeeded(_) => { + Self::Retryable(error.into()) + } + _ => Self::Fatal(error.into()), + } + } +} + +#[async_trait] +pub trait SnapshotsApplierMainNodeClient: fmt::Debug + Send + Sync { + async fn fetch_l2_block(&self, number: MiniblockNumber) -> Result, RpcError>; + + async fn fetch_newest_snapshot(&self) -> Result, RpcError>; +} +impl<'a, 'b> SnapshotsApplier<'a, 'b> { + pub async fn prepare_applied_snapshot_status( + storage: &mut StorageProcessor<'_>, + main_node_client: &'_ dyn SnapshotsApplierMainNodeClient, + ) -> Result<(SnapshotRecoveryStatus, bool), SnapshotsApplierError> { + let latency = + METRICS.initial_stage_duration[&InitialStage::FetchMetadataFromMainNode].start(); + + let applied_snapshot_status = storage + .snapshot_recovery_dal() + .get_applied_snapshot_status() + .await?; + + if let Some(applied_snapshot_status) = applied_snapshot_status { + if !applied_snapshot_status + .storage_logs_chunks_processed + .contains(&false) + { + return Err(SnapshotsApplierError::Canceled( + "This node has already been initialized from a snapshot".to_string(), + )); + } + + let latency = latency.observe(); + tracing::info!("Re-initialized snapshots applier after reset/failure in {latency:?}"); + + Ok((applied_snapshot_status, false)) + } else { + if !storage.blocks_dal().is_genesis_needed().await? { + return Err(SnapshotsApplierError::Canceled( + "This node has already been initialized without a snapshot".to_string(), + )); + } + + let latency = latency.observe(); + + tracing::info!("Initialized fresh snapshots applier in {latency:?}"); + + Ok(( + SnapshotsApplier::create_fresh_recovery_status(main_node_client).await?, + true, + )) + } + } + + pub async fn load_snapshot( + connection_pool: &'a ConnectionPool, + main_node_client: &'_ dyn SnapshotsApplierMainNodeClient, + blob_store: &'b dyn ObjectStore, + ) -> Result<(), SnapshotsApplierError> { + let mut storage = connection_pool + .access_storage_tagged("snapshots_applier") + .await?; + let mut storage = storage.start_transaction().await?; + + let (applied_snapshot_status, created_from_scratch) = + SnapshotsApplier::prepare_applied_snapshot_status(&mut storage, main_node_client) + .await?; + + let mut recovery = Self { + connection_pool, + blob_store, + applied_snapshot_status, + }; + + METRICS.storage_logs_chunks_count.set( + recovery + .applied_snapshot_status + .storage_logs_chunks_processed + .len(), + ); + + METRICS.storage_logs_chunks_left_to_process.set( + recovery + .applied_snapshot_status + .storage_logs_chunks_processed + .iter() + .filter(|x| !(**x)) + .count(), + ); + + if created_from_scratch { + recovery.recover_factory_deps(&mut storage).await?; + + storage + .snapshot_recovery_dal() + .insert_initial_recovery_status(&recovery.applied_snapshot_status) + .await?; + } + + storage.commit().await?; + + recovery.recover_storage_logs().await?; + + Ok(()) + } + + async fn create_fresh_recovery_status( + main_node_client: &'_ dyn SnapshotsApplierMainNodeClient, + ) -> Result { + let snapshot_response = main_node_client.fetch_newest_snapshot().await?; + + let snapshot = snapshot_response.ok_or(SnapshotsApplierError::Canceled( + "Main node does not have any ready snapshots, skipping initialization from snapshot!" + .to_string(), + ))?; + + let l1_batch_number = snapshot.l1_batch_number; + tracing::info!( + "Found snapshot with data up to l1_batch {}, storage_logs are divided into {} chunk(s)", + l1_batch_number, + snapshot.storage_logs_chunks.len() + ); + + let miniblock = main_node_client + .fetch_l2_block(snapshot.miniblock_number) + .await? + .ok_or(SnapshotsApplierError::Fatal(anyhow::anyhow!( + "Miniblock {} is missing", + snapshot.miniblock_number + )))?; + let miniblock_root_hash = miniblock.hash.unwrap(); + + Ok(SnapshotRecoveryStatus { + l1_batch_number, + l1_batch_root_hash: snapshot.last_l1_batch_with_metadata.metadata.root_hash, + miniblock_number: snapshot.miniblock_number, + miniblock_root_hash, + storage_logs_chunks_processed: vec![false; snapshot.storage_logs_chunks.len()], + }) + } + + async fn recover_factory_deps( + &mut self, + storage: &mut StorageProcessor<'_>, + ) -> Result<(), SnapshotsApplierError> { + let latency = METRICS.initial_stage_duration[&InitialStage::ApplyFactoryDeps].start(); + + let factory_deps: SnapshotFactoryDependencies = self + .blob_store + .get(self.applied_snapshot_status.l1_batch_number) + .await?; + + let all_deps_hashmap: HashMap> = factory_deps + .factory_deps + .into_iter() + .map(|dep| (hash_bytecode(&dep.bytecode.0), dep.bytecode.0)) + .collect(); + storage + .storage_dal() + .insert_factory_deps( + self.applied_snapshot_status.miniblock_number, + &all_deps_hashmap, + ) + .await?; + + let latency = latency.observe(); + tracing::info!("Applied factory dependencies in {latency:?}"); + + Ok(()) + } + + async fn insert_initial_writes_chunk( + &mut self, + storage_logs: &[SnapshotStorageLog], + storage: &mut StorageProcessor<'_>, + ) -> Result<(), SnapshotsApplierError> { + storage + .storage_logs_dedup_dal() + .insert_initial_writes_from_snapshot(storage_logs) + .await?; + Ok(()) + } + async fn insert_storage_logs_chunk( + &mut self, + storage_logs: &[SnapshotStorageLog], + storage: &mut StorageProcessor<'_>, + ) -> Result<(), SnapshotsApplierError> { + storage + .storage_logs_dal() + .insert_storage_logs_from_snapshot( + self.applied_snapshot_status.miniblock_number, + storage_logs, + ) + .await?; + Ok(()) + } + + async fn recover_storage_logs_single_chunk( + &mut self, + chunk_id: u64, + ) -> Result<(), SnapshotsApplierError> { + let latency = + METRICS.storage_logs_chunks_duration[&StorageLogsChunksStage::LoadFromGcs].start(); + + let storage_key = SnapshotStorageLogsStorageKey { + chunk_id, + l1_batch_number: self.applied_snapshot_status.l1_batch_number, + }; + + tracing::info!("Processing chunk {chunk_id}"); + + let storage_snapshot_chunk: SnapshotStorageLogsChunk = + self.blob_store.get(storage_key).await?; + + let latency = latency.observe(); + tracing::info!("Loaded storage logs from GCS for chunk {chunk_id} in {latency:?}"); + + let latency = + METRICS.storage_logs_chunks_duration[&StorageLogsChunksStage::SaveToPostgres].start(); + + let mut storage = self + .connection_pool + .access_storage_tagged("snapshots_applier") + .await?; + let mut storage = storage.start_transaction().await?; + + let storage_logs = &storage_snapshot_chunk.storage_logs; + + tracing::info!("Loading {} storage logs into postgres", storage_logs.len()); + + self.insert_storage_logs_chunk(storage_logs, &mut storage) + .await?; + + self.insert_initial_writes_chunk(storage_logs, &mut storage) + .await?; + + self.applied_snapshot_status.storage_logs_chunks_processed[chunk_id as usize] = true; + storage + .snapshot_recovery_dal() + .mark_storage_logs_chunk_as_processed(chunk_id) + .await?; + + storage.commit().await?; + + let chunks_left = METRICS.storage_logs_chunks_left_to_process.dec_by(1) - 1; + + let latency = latency.observe(); + tracing::info!("Saved storage logs for chunk {chunk_id} in {latency:?}, there are {chunks_left} left to process"); + + Ok(()) + } + + pub async fn recover_storage_logs(mut self) -> Result<(), SnapshotsApplierError> { + for chunk_id in 0..self + .applied_snapshot_status + .storage_logs_chunks_processed + .len() + { + //TODO Add retries and parallelize this step + if !self.applied_snapshot_status.storage_logs_chunks_processed[chunk_id] { + self.recover_storage_logs_single_chunk(chunk_id as u64) + .await?; + } + } + + Ok(()) + } +} diff --git a/core/lib/snapshots_applier/src/tests.rs b/core/lib/snapshots_applier/src/tests.rs new file mode 100644 index 000000000000..12495a4a5389 --- /dev/null +++ b/core/lib/snapshots_applier/src/tests.rs @@ -0,0 +1,228 @@ +mod utils { + use std::collections::HashMap; + + use async_trait::async_trait; + use zksync_types::{ + api::en::SyncBlock, + block::L1BatchHeader, + commitment::{L1BatchMetaParameters, L1BatchMetadata, L1BatchWithMetadata}, + snapshots::{SnapshotHeader, SnapshotStorageLog}, + AccountTreeId, L1BatchNumber, MiniblockNumber, StorageKey, StorageValue, H160, H256, + }; + use zksync_web3_decl::jsonrpsee::core::ClientError as RpcError; + + use crate::SnapshotsApplierMainNodeClient; + + #[derive(Debug, Default)] + pub(crate) struct MockMainNodeClient { + pub(crate) fetch_l2_block_responses: HashMap, + pub(crate) fetch_newest_snapshot_response: Option, + } + + #[async_trait] + impl SnapshotsApplierMainNodeClient for MockMainNodeClient { + async fn fetch_l2_block( + &self, + number: MiniblockNumber, + ) -> Result, RpcError> { + if let Some(response) = self.fetch_l2_block_responses.get(&number) { + Ok(Some((*response).clone())) + } else { + Ok(None) + } + } + + async fn fetch_newest_snapshot(&self) -> Result, RpcError> { + Ok(self.fetch_newest_snapshot_response.clone()) + } + } + + pub(crate) fn miniblock_metadata( + miniblock_number: MiniblockNumber, + l1_batch_number: L1BatchNumber, + root_hash: H256, + ) -> SyncBlock { + SyncBlock { + number: miniblock_number, + l1_batch_number, + last_in_batch: true, + timestamp: 0, + l1_gas_price: 0, + l2_fair_gas_price: 0, + fair_pubdata_price: None, + base_system_contracts_hashes: Default::default(), + operator_address: Default::default(), + transactions: None, + virtual_blocks: None, + hash: Some(root_hash), + protocol_version: Default::default(), + } + } + + pub(crate) fn l1_block_metadata( + l1_batch_number: L1BatchNumber, + root_hash: H256, + ) -> L1BatchWithMetadata { + L1BatchWithMetadata { + header: L1BatchHeader::new( + l1_batch_number, + 0, + Default::default(), + Default::default(), + Default::default(), + ), + metadata: L1BatchMetadata { + root_hash, + rollup_last_leaf_index: 0, + merkle_root_hash: Default::default(), + initial_writes_compressed: vec![], + repeated_writes_compressed: vec![], + commitment: Default::default(), + l2_l1_messages_compressed: vec![], + l2_l1_merkle_root: Default::default(), + block_meta_params: L1BatchMetaParameters { + zkporter_is_available: false, + bootloader_code_hash: Default::default(), + default_aa_code_hash: Default::default(), + }, + aux_data_hash: Default::default(), + meta_parameters_hash: Default::default(), + pass_through_data_hash: Default::default(), + events_queue_commitment: None, + bootloader_initial_content_commitment: None, + state_diffs_compressed: vec![], + }, + factory_deps: vec![], + } + } + + pub(crate) fn random_storage_logs( + l1_batch_number: L1BatchNumber, + chunk_id: u64, + logs_per_chunk: u64, + ) -> Vec { + (0..logs_per_chunk) + .map(|x| SnapshotStorageLog { + key: StorageKey::new( + AccountTreeId::from_fixed_bytes(H160::random().to_fixed_bytes()), + H256::random(), + ), + value: StorageValue::random(), + l1_batch_number_of_initial_write: l1_batch_number, + enumeration_index: x + chunk_id * logs_per_chunk, + }) + .collect() + } +} + +mod snapshots_applier_tests { + use zksync_dal::ConnectionPool; + use zksync_object_store::ObjectStoreFactory; + use zksync_types::{ + snapshots::{ + SnapshotFactoryDependencies, SnapshotFactoryDependency, SnapshotHeader, + SnapshotRecoveryStatus, SnapshotStorageLog, SnapshotStorageLogsChunk, + SnapshotStorageLogsChunkMetadata, SnapshotStorageLogsStorageKey, + }, + Bytes, L1BatchNumber, MiniblockNumber, H256, + }; + + use crate::{ + tests::utils::{ + l1_block_metadata, miniblock_metadata, random_storage_logs, MockMainNodeClient, + }, + SnapshotsApplier, + }; + + #[tokio::test] + async fn snapshots_creator_can_successfully_recover_db() { + let pool = ConnectionPool::test_pool().await; + let object_store_factory = ObjectStoreFactory::mock(); + let object_store = object_store_factory.create_store().await; + let mut client = MockMainNodeClient::default(); + let miniblock_number = MiniblockNumber(1234); + let l1_batch_number = L1BatchNumber(123); + let l1_root_hash = H256::random(); + let l2_root_hash = H256::random(); + let factory_dep_bytes: Vec = (0..32).collect(); + let factory_deps = SnapshotFactoryDependencies { + factory_deps: vec![SnapshotFactoryDependency { + bytecode: Bytes::from(factory_dep_bytes), + }], + }; + object_store + .put(l1_batch_number, &factory_deps) + .await + .unwrap(); + + let mut all_snapshot_storage_logs: Vec = vec![]; + for chunk_id in 0..2 { + let mut chunk_storage_logs = SnapshotStorageLogsChunk { + storage_logs: random_storage_logs(l1_batch_number, chunk_id, 10), + }; + let chunk_key = SnapshotStorageLogsStorageKey { + l1_batch_number, + chunk_id, + }; + object_store + .put(chunk_key, &chunk_storage_logs) + .await + .unwrap(); + all_snapshot_storage_logs.append(&mut chunk_storage_logs.storage_logs); + } + + let snapshot_header = SnapshotHeader { + l1_batch_number, + miniblock_number, + last_l1_batch_with_metadata: l1_block_metadata(l1_batch_number, l1_root_hash), + storage_logs_chunks: vec![ + SnapshotStorageLogsChunkMetadata { + chunk_id: 0, + filepath: "file0".to_string(), + }, + SnapshotStorageLogsChunkMetadata { + chunk_id: 1, + filepath: "file1".to_string(), + }, + ], + factory_deps_filepath: "some_filepath".to_string(), + }; + client.fetch_newest_snapshot_response = Some(snapshot_header); + client.fetch_l2_block_responses.insert( + miniblock_number, + miniblock_metadata(miniblock_number, l1_batch_number, l2_root_hash), + ); + + SnapshotsApplier::load_snapshot(&pool, &client, &object_store) + .await + .unwrap(); + + let mut storage = pool.access_storage().await.unwrap(); + let mut recovery_dal = storage.snapshot_recovery_dal(); + + let expected_status = SnapshotRecoveryStatus { + l1_batch_number, + l1_batch_root_hash: l1_root_hash, + miniblock_number, + miniblock_root_hash: l2_root_hash, + storage_logs_chunks_processed: vec![true, true], + }; + + let current_db_status = recovery_dal.get_applied_snapshot_status().await.unwrap(); + assert_eq!(current_db_status.unwrap(), expected_status); + + let all_initial_writes = storage + .storage_logs_dedup_dal() + .dump_all_initial_writes_for_tests() + .await; + + assert_eq!(all_initial_writes.len(), all_snapshot_storage_logs.len()); + + let all_storage_logs = storage + .storage_logs_dal() + .dump_all_storage_logs_for_tests() + .await; + + assert_eq!(all_storage_logs.len(), all_snapshot_storage_logs.len()); + } +} diff --git a/core/lib/state/src/postgres/tests.rs b/core/lib/state/src/postgres/tests.rs index 6514da136d56..75adcbba8c63 100644 --- a/core/lib/state/src/postgres/tests.rs +++ b/core/lib/state/src/postgres/tests.rs @@ -221,12 +221,15 @@ fn test_factory_deps_cache(pool: &ConnectionPool, rt_handle: Handle) { // insert the contracts let mut contracts = HashMap::new(); contracts.insert(H256::zero(), vec![1, 2, 3]); - storage.rt_handle.block_on( - storage - .connection - .storage_dal() - .insert_factory_deps(MiniblockNumber(0), &contracts), - ); + storage + .rt_handle + .block_on( + storage + .connection + .storage_dal() + .insert_factory_deps(MiniblockNumber(0), &contracts), + ) + .unwrap(); // Create the storage that should have the cache filled. let mut storage = PostgresStorage::new( diff --git a/core/lib/state/src/rocksdb/tests.rs b/core/lib/state/src/rocksdb/tests.rs index 0d127d79c901..38ca942e6795 100644 --- a/core/lib/state/src/rocksdb/tests.rs +++ b/core/lib/state/src/rocksdb/tests.rs @@ -165,7 +165,8 @@ async fn insert_factory_deps( .collect(); conn.storage_dal() .insert_factory_deps(miniblock_number, &factory_deps) - .await; + .await + .unwrap(); } #[tokio::test] @@ -372,7 +373,8 @@ async fn recovering_factory_deps_from_snapshot() { create_miniblock(&mut conn, number, vec![]).await; conn.storage_dal() .insert_factory_deps(number, &HashMap::from([(bytecode_hash, bytecode)])) - .await; + .await + .unwrap(); } let dir = TempDir::new().expect("cannot create temporary dir for state keeper"); diff --git a/core/lib/state/src/test_utils.rs b/core/lib/state/src/test_utils.rs index 87157d05fa98..1e12f82f8dfd 100644 --- a/core/lib/state/src/test_utils.rs +++ b/core/lib/state/src/test_utils.rs @@ -33,6 +33,10 @@ pub(crate) async fn prepare_postgres(conn: &mut StorageProcessor<'_>) { .delete_l1_batches(L1BatchNumber(0)) .await .unwrap(); + conn.blocks_dal() + .delete_initial_writes(L1BatchNumber(0)) + .await + .unwrap(); } pub(crate) fn gen_storage_logs(indices: ops::Range) -> Vec { @@ -133,11 +137,10 @@ pub(crate) async fn prepare_postgres_for_snapshot_recovery( l1_batch_root_hash: H256::zero(), // not used miniblock_number: MiniblockNumber(42), miniblock_root_hash: H256::zero(), // not used - last_finished_chunk_id: None, - total_chunk_count: 100, + storage_logs_chunks_processed: vec![true; 100], }; conn.snapshot_recovery_dal() - .set_applied_snapshot_status(&snapshot_recovery) + .insert_initial_recovery_status(&snapshot_recovery) .await .unwrap(); diff --git a/core/lib/types/src/snapshots.rs b/core/lib/types/src/snapshots.rs index 5d7b14fb6e96..db653ebd44aa 100644 --- a/core/lib/types/src/snapshots.rs +++ b/core/lib/types/src/snapshots.rs @@ -6,7 +6,7 @@ use zksync_basic_types::{AccountTreeId, L1BatchNumber, MiniblockNumber, H256}; use zksync_protobuf::{required, ProtoFmt}; use zksync_utils::u256_to_h256; -use crate::{commitment::L1BatchWithMetadata, Bytes, StorageKey, StorageValue, U256}; +use crate::{commitment::L1BatchWithMetadata, Bytes, StorageKey, StorageValue, H160, U256}; /// Information about all snapshots persisted by the node. #[derive(Debug, Clone, Serialize, Deserialize)] @@ -49,7 +49,7 @@ pub struct SnapshotHeader { pub last_l1_batch_with_metadata: L1BatchWithMetadata, } -#[derive(Debug, Clone, Serialize, Deserialize)] +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] #[serde(rename_all = "camelCase")] pub struct SnapshotStorageLogsChunkMetadata { pub chunk_id: u64, @@ -194,8 +194,27 @@ pub struct SnapshotRecoveryStatus { pub l1_batch_root_hash: H256, pub miniblock_number: MiniblockNumber, pub miniblock_root_hash: H256, - pub last_finished_chunk_id: Option, - pub total_chunk_count: u64, + pub storage_logs_chunks_processed: Vec, +} + +// Used only in tests +#[derive(Debug, PartialEq)] +pub struct InitialWriteDbRow { + pub hashed_key: H256, + pub l1_batch_number: L1BatchNumber, + pub index: u64, +} + +// Used only in tests +#[derive(Debug, PartialEq)] +pub struct StorageLogDbRow { + pub hashed_key: H256, + pub address: H160, + pub key: H256, + pub value: H256, + pub operation_number: u64, + pub tx_hash: H256, + pub miniblock_number: MiniblockNumber, } /// Returns a chunk of `hashed_keys` with 0-based index `chunk_id` among `count`. Chunks do not intersect and jointly cover diff --git a/core/lib/zksync_core/src/api_server/web3/tests/mod.rs b/core/lib/zksync_core/src/api_server/web3/tests/mod.rs index 9b8e3707dfd1..b80f4bc5020f 100644 --- a/core/lib/zksync_core/src/api_server/web3/tests/mod.rs +++ b/core/lib/zksync_core/src/api_server/web3/tests/mod.rs @@ -227,7 +227,7 @@ impl StorageInitialization { MiniblockNumber(Self::SNAPSHOT_RECOVERY_BLOCK), factory_deps, ) - .await; + .await?; } } Ok(()) diff --git a/core/lib/zksync_core/src/block_reverter/mod.rs b/core/lib/zksync_core/src/block_reverter/mod.rs index 6a97d1c99c19..9b3c23dfc844 100644 --- a/core/lib/zksync_core/src/block_reverter/mod.rs +++ b/core/lib/zksync_core/src/block_reverter/mod.rs @@ -278,6 +278,11 @@ impl BlockReverter { .delete_l1_batches(last_l1_batch_to_keep) .await .unwrap(); + transaction + .blocks_dal() + .delete_initial_writes(last_l1_batch_to_keep) + .await + .unwrap(); tracing::info!("rolling back miniblocks..."); transaction .blocks_dal() diff --git a/core/lib/zksync_core/src/genesis.rs b/core/lib/zksync_core/src/genesis.rs index f40904aa87a7..9444d35aca78 100644 --- a/core/lib/zksync_core/src/genesis.rs +++ b/core/lib/zksync_core/src/genesis.rs @@ -167,7 +167,8 @@ async fn insert_base_system_contracts_to_factory_deps( storage .storage_dal() .insert_factory_deps(MiniblockNumber(0), &factory_deps) - .await; + .await + .unwrap(); } async fn insert_system_contracts( @@ -275,7 +276,8 @@ async fn insert_system_contracts( transaction .storage_dal() .insert_factory_deps(MiniblockNumber(0), &factory_deps) - .await; + .await + .unwrap(); transaction.commit().await.unwrap(); } diff --git a/core/lib/zksync_core/src/metadata_calculator/recovery/tests.rs b/core/lib/zksync_core/src/metadata_calculator/recovery/tests.rs index 3ba91da56ae0..d3eff132a860 100644 --- a/core/lib/zksync_core/src/metadata_calculator/recovery/tests.rs +++ b/core/lib/zksync_core/src/metadata_calculator/recovery/tests.rs @@ -124,8 +124,7 @@ async fn prepare_recovery_snapshot_with_genesis( l1_batch_root_hash, miniblock_number: MiniblockNumber(1), miniblock_root_hash: H256::zero(), // not used - last_finished_chunk_id: Some(0), - total_chunk_count: 1, + storage_logs_chunks_processed: vec![], } } diff --git a/core/lib/zksync_core/src/metadata_calculator/tests.rs b/core/lib/zksync_core/src/metadata_calculator/tests.rs index 8a2b9beb6fb6..ef8f7e1d8868 100644 --- a/core/lib/zksync_core/src/metadata_calculator/tests.rs +++ b/core/lib/zksync_core/src/metadata_calculator/tests.rs @@ -458,6 +458,11 @@ pub(crate) async fn reset_db_state(pool: &ConnectionPool, num_batches: usize) { .delete_l1_batches(L1BatchNumber(0)) .await .unwrap(); + storage + .blocks_dal() + .delete_initial_writes(L1BatchNumber(0)) + .await + .unwrap(); storage .basic_witness_input_producer_dal() .delete_all_jobs() @@ -616,6 +621,11 @@ async fn remove_l1_batches( .delete_l1_batches(last_l1_batch_to_keep) .await .unwrap(); + storage + .blocks_dal() + .delete_initial_writes(last_l1_batch_to_keep) + .await + .unwrap(); batch_headers } diff --git a/core/lib/zksync_core/src/state_keeper/io/seal_logic.rs b/core/lib/zksync_core/src/state_keeper/io/seal_logic.rs index 59d03f364db7..defb420a939e 100644 --- a/core/lib/zksync_core/src/state_keeper/io/seal_logic.rs +++ b/core/lib/zksync_core/src/state_keeper/io/seal_logic.rs @@ -408,7 +408,8 @@ impl MiniblockSealCommand { transaction .storage_dal() .insert_factory_deps(miniblock_number, new_factory_deps) - .await; + .await + .unwrap(); } progress.observe(new_factory_deps_count); diff --git a/core/lib/zksync_core/src/sync_layer/external_io.rs b/core/lib/zksync_core/src/sync_layer/external_io.rs index c6ca76026ff7..8057ef77b723 100644 --- a/core/lib/zksync_core/src/sync_layer/external_io.rs +++ b/core/lib/zksync_core/src/sync_layer/external_io.rs @@ -211,7 +211,8 @@ impl ExternalIO { self.current_miniblock_number, &HashMap::from_iter([(contract.hash, be_words_to_bytes(&contract.code))]), ) - .await; + .await + .unwrap(); contract } } diff --git a/core/lib/zksync_core/src/utils/testonly.rs b/core/lib/zksync_core/src/utils/testonly.rs index e6883f2585e8..935024d05a13 100644 --- a/core/lib/zksync_core/src/utils/testonly.rs +++ b/core/lib/zksync_core/src/utils/testonly.rs @@ -153,12 +153,11 @@ pub(crate) async fn prepare_recovery_snapshot( l1_batch_root_hash, miniblock_number: miniblock.number, miniblock_root_hash: H256::zero(), // not used - last_finished_chunk_id: None, - total_chunk_count: 100, + storage_logs_chunks_processed: vec![true; 100], }; storage .snapshot_recovery_dal() - .set_applied_snapshot_status(&snapshot_recovery) + .insert_initial_recovery_status(&snapshot_recovery) .await .unwrap(); storage.commit().await.unwrap(); @@ -180,12 +179,11 @@ pub(crate) async fn prepare_empty_recovery_snapshot( l1_batch_root_hash: H256::zero(), miniblock_number: l1_batch_number.into(), miniblock_root_hash: H256::zero(), // not used - last_finished_chunk_id: None, - total_chunk_count: 100, + storage_logs_chunks_processed: vec![true; 100], }; storage .snapshot_recovery_dal() - .set_applied_snapshot_status(&snapshot_recovery) + .insert_initial_recovery_status(&snapshot_recovery) .await .unwrap(); snapshot_recovery