diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index e77534a..3af7d45 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -23,6 +23,7 @@ jobs:
         ]
     steps:
       - uses: actions/checkout@v4
+      - uses: arduino/setup-protoc@v2
       - uses: Swatinem/rust-cache@v2
       - name: Default on nightly Rust
        run: rustup default nightly
diff --git a/Cargo.lock b/Cargo.lock
index 6776095..e035f03 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -5044,12 +5044,15 @@ dependencies = [
  "async-trait",
  "bincode",
  "blake2 0.10.6 (registry+https://github.com/rust-lang/crates.io-index)",
+ "bytes",
  "chrono",
  "clap 4.4.8",
  "ethers",
  "eyre",
  "hex",
  "indexmap 2.1.0",
+ "prost",
+ "prost-build",
  "rocksdb",
  "serde",
  "serde_json",
diff --git a/Cargo.toml b/Cargo.toml
index 249e951..3f0e520 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -11,12 +11,14 @@ members = ["state-reconstruct-fetcher"]
 async-trait = "0.1.74"
 bincode = "1"
 blake2 = "0.10.6"
+bytes = "1.5"
 chrono = "0.4.31"
 clap = { version = "4.4.7", features = ["derive", "env"] }
 ethers = "1.0.2"
 eyre = "0.6.8"
 hex = "0.4.3"
 indexmap = { version = "2.0.2" }
+prost = "0.12"
 rocksdb = "0.21"
 serde = { version = "1.0.189", features = ["derive"] }
 serde_json = { version = "1.0.107", features = ["std"] }
@@ -27,3 +29,6 @@ tracing = "0.1.40"
 tracing-subscriber = "0.3.17"
 zksync_merkle_tree = { git = "https://github.com/matter-labs/zksync-era.git" }
 zkevm_opcode_defs = { git = "https://github.com/matter-labs/era-zkevm_opcode_defs.git" }
+
+[build-dependencies]
+prost-build = "0.12"
diff --git a/build.rs b/build.rs
new file mode 100644
index 0000000..df17b8e
--- /dev/null
+++ b/build.rs
@@ -0,0 +1,4 @@
+fn main() -> Result<(), Box<dyn std::error::Error>> {
+    prost_build::compile_protos(&["proto/snapshot.proto"], &["proto/"])?;
+    Ok(())
+}
diff --git a/proto/snapshot.proto b/proto/snapshot.proto
new file mode 100644
index 0000000..3a656ef
--- /dev/null
+++ b/proto/snapshot.proto
@@ -0,0 +1,23 @@
+syntax = "proto3";
+
+package protobuf;
+
+message SnapshotStorageLogsChunk {
+  repeated SnapshotStorageLog storage_logs = 1;
+}
+
+message SnapshotStorageLog {
+  optional bytes account_address = 1; // required; H160
+  optional bytes storage_key = 2; // required; H256
+  optional bytes storage_value = 3; // required; H256
+  optional uint32 l1_batch_number_of_initial_write = 4; // required
+  optional uint64 enumeration_index = 5; // required
+}
+
+message SnapshotFactoryDependencies {
+  repeated SnapshotFactoryDependency factory_deps = 1;
+}
+
+message SnapshotFactoryDependency {
+  optional bytes bytecode = 1; // required
+}
diff --git a/src/cli.rs b/src/cli.rs
index 0935bb3..edd3946 100644
--- a/src/cli.rs
+++ b/src/cli.rs
@@ -1,6 +1,8 @@
 use clap::{Args, Parser, Subcommand, ValueEnum};
 use state_reconstruct_fetcher::constants::ethereum;
 
+use crate::processor::snapshot;
+
 #[derive(Args)]
 pub struct L1FetcherOptions {
     /// The Ethereum JSON-RPC HTTP URL to use.
@@ -73,12 +75,22 @@ pub enum Command {
         db_path: Option<String>,
     },
 
-    /// Testing.
-    ExportSnapshot {
+    PrepareSnapshot {
         #[command(flatten)]
         l1_fetcher_options: L1FetcherOptions,
-        /// The path of the file to export the snapshot to.
-        file: Option<String>,
+        /// The path to the storage solution.
+        #[arg(short, long)]
+        db_path: Option<String>,
+    },
+
+    ExportSnapshot {
+        /// The path to the storage solution.
+        #[arg(short, long, default_value = snapshot::DEFAULT_DB_PATH)]
+        db_path: Option<String>,
+        /// Number of storage logs to stuff into one chunk.
+        #[arg(short, long, default_value_t = 1_000_000)]
+        chunk_size: u64,
+        /// The directory to export the snapshot files to.
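+        /// The directory is created (along with any missing parents) if it does not exist.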
+        directory: String,
     },
 }
diff --git a/src/main.rs b/src/main.rs
index 17205a7..4be9674 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -16,7 +16,7 @@ use std::{
 use clap::Parser;
 use cli::{Cli, Command, ReconstructSource};
 use eyre::Result;
-use processor::snapshot::SnapshotBuilder;
+use processor::snapshot::{SnapshotBuilder, SnapshotExporter};
 use state_reconstruct_fetcher::{
     constants::storage,
     l1_fetcher::{L1Fetcher, L1FetcherOptions},
@@ -152,9 +152,9 @@ async fn main() -> Result<()> {
                 println!("{result}");
             }
         }
-        Command::ExportSnapshot {
+        Command::PrepareSnapshot {
             l1_fetcher_options,
-            file,
+            db_path,
         } => {
             let fetcher_options = L1FetcherOptions {
                 http_url: l1_fetcher_options.http_url,
@@ -165,7 +165,7 @@ async fn main() -> Result<()> {
             };
 
             let fetcher = L1Fetcher::new(fetcher_options, None)?;
-            let processor = SnapshotBuilder::new(file);
+            let processor = SnapshotBuilder::new(db_path);
 
             let (tx, rx) = mpsc::channel::<CommitBlockInfoV1>(5);
             let processor_handle = tokio::spawn(async move {
@@ -175,6 +175,16 @@ async fn main() -> Result<()> {
             fetcher.run(tx).await?;
             processor_handle.await?;
         }
+        Command::ExportSnapshot {
+            db_path,
+            chunk_size,
+            directory,
+        } => {
+            let export_path = Path::new(&directory);
+            std::fs::create_dir_all(export_path)?;
+            let exporter = SnapshotExporter::new(export_path, db_path);
+            exporter.export_snapshot(chunk_size)?;
+        }
     }
 
     Ok(())
diff --git a/src/processor/snapshot/database.rs b/src/processor/snapshot/database.rs
index 44c5d85..a248dc6 100644
--- a/src/processor/snapshot/database.rs
+++ b/src/processor/snapshot/database.rs
@@ -11,9 +11,9 @@ use thiserror::Error;
 
 use super::types::{SnapshotFactoryDependency, SnapshotStorageLog};
 
-const STORAGE_LOGS: &str = "storage_logs";
-const INDEX_TO_KEY_MAP: &str = "index_to_key_map";
-const FACTORY_DEPS: &str = "factory_deps";
+pub const STORAGE_LOGS: &str = "storage_logs";
+pub const INDEX_TO_KEY_MAP: &str = "index_to_key_map";
+pub const FACTORY_DEPS: &str = "factory_deps";
 const METADATA: &str = "metadata";
 
 const LAST_REPEATED_KEY_INDEX: &str = "LAST_REPEATED_KEY_INDEX";
@@ -50,6 +50,21 @@ impl SnapshotDB {
         Ok(Self(db))
     }
 
+    pub fn new_read_only(db_path: PathBuf) -> Result<Self> {
+        let mut db_opts = Options::default();
+        db_opts.create_missing_column_families(true);
+        db_opts.create_if_missing(true);
+
+        let db = DB::open_cf_for_read_only(
+            &db_opts,
+            db_path,
+            vec![METADATA, STORAGE_LOGS, INDEX_TO_KEY_MAP, FACTORY_DEPS],
+            false,
+        )?;
+
+        Ok(Self(db))
+    }
+
     pub fn get_last_repeated_key_index(&self) -> Result<u64> {
         // Unwrapping column family handle here is safe because presence of
         // those CFs is ensured in construction of this DB.
@@ -90,7 +105,7 @@ impl SnapshotDB {
             .map_err(Into::into)
     }
 
-    pub fn insert_storage_log(&self, storage_log_entry: &SnapshotStorageLog) -> Result<()> {
+    pub fn insert_storage_log(&self, storage_log_entry: &mut SnapshotStorageLog) -> Result<()> {
         // Unwrapping column family handle here is safe because presence of
         // those CFs is ensured in construction of this DB.
         let index_to_key_map = self.cf_handle(INDEX_TO_KEY_MAP).unwrap();
@@ -102,6 +117,9 @@ impl SnapshotDB {
         // XXX: These should really be inside a transaction...
         let idx = self.get_last_repeated_key_index()? + 1;
 
+        // Update the enumeration index.
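+        // The exported enumeration index therefore always matches the
+        // big-endian key the log is registered under in `INDEX_TO_KEY_MAP`.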
+        storage_log_entry.enumeration_index = idx;
+
         self.put_cf(index_to_key_map, idx.to_be_bytes(), key)?;
         self.set_last_repeated_key_index(idx)?;
diff --git a/src/processor/snapshot/mod.rs b/src/processor/snapshot/mod.rs
index 7dedd5b..5d4cc35 100644
--- a/src/processor/snapshot/mod.rs
+++ b/src/processor/snapshot/mod.rs
@@ -1,4 +1,9 @@
-use std::{fs, path::PathBuf, str::FromStr};
+use std::{
+    fs,
+    io::Write,
+    path::{Path, PathBuf},
+    str::FromStr,
+};
 
 mod bytecode;
 mod database;
@@ -6,8 +11,10 @@ mod types;
 
 use async_trait::async_trait;
 use blake2::{Blake2s256, Digest};
+use bytes::BytesMut;
 use ethers::types::{Address, H256, U256, U64};
 use eyre::Result;
+use prost::Message;
 use state_reconstruct_fetcher::{
     constants::{ethereum, storage},
     types::CommitBlockInfoV1,
@@ -16,12 +23,16 @@ use tokio::sync::mpsc;
 
 use self::{
     database::SnapshotDB,
-    types::{SnapshotFactoryDependency, SnapshotStorageLog},
+    types::{SnapshotFactoryDependency, SnapshotHeader, SnapshotStorageLog},
 };
 use super::Processor;
-use crate::processor::snapshot::types::{MiniblockNumber, StorageValue};
+use crate::processor::snapshot::types::MiniblockNumber;
+
+pub mod protobuf {
+    include!(concat!(env!("OUT_DIR"), "/protobuf.rs"));
+}
 
-const DEFAULT_DB_PATH: &str = "snapshot_db";
+pub const DEFAULT_DB_PATH: &str = "snapshot_db";
 
 pub struct SnapshotBuilder {
     database: SnapshotDB,
@@ -36,7 +47,15 @@ impl SnapshotBuilder {
 
         let mut database = SnapshotDB::new(db_path).unwrap();
 
-        reconstruct_genesis_state(&mut database, storage::INITAL_STATE_PATH).unwrap();
+        let idx = database
+            .get_last_repeated_key_index()
+            .expect("failed to read last repeated key index");
+
+        // When the last repeated key index is 0, there is no content in the DB;
+        // every write of a new storage log key increases the index by one.
+        if idx == 0 {
+            reconstruct_genesis_state(&mut database, storage::INITAL_STATE_PATH).unwrap();
+        }
 
         Self { database }
     }
@@ -49,7 +68,7 @@ impl Processor for SnapshotBuilder {
         // Initial calldata.
         for (key, value) in &block.initial_storage_changes {
             self.database
-                .insert_storage_log(&SnapshotStorageLog {
+                .insert_storage_log(&mut SnapshotStorageLog {
                     key: U256::from_little_endian(key),
                     value: H256::from(value),
                     miniblock_number_of_initial_write: U64::from(0),
@@ -187,9 +206,9 @@ fn reconstruct_genesis_state(database: &mut SnapshotDB, path: &str) -> Result<()> {
             let value = H256::from(tmp);
 
             if database.get_storage_log(&derived_key)?.is_none() {
-                database.insert_storage_log(&SnapshotStorageLog {
+                database.insert_storage_log(&mut SnapshotStorageLog {
                     key,
-                    value: StorageValue::default(),
+                    value,
                     miniblock_number_of_initial_write: MiniblockNumber::from(miniblock_number),
                     l1_batch_number_of_initial_write: U64::from(ethereum::GENESIS_BLOCK),
                     enumeration_index: 0,
@@ -213,4 +232,161 @@ fn derive_final_address_for_params(address: &Address, key: &U256) -> [u8; 32] {
     result
 }
 
-// TODO: SnapshotExporter which iterates over the SnapshotDB and exports snapshot chunks.
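+/// Exports the contents of a read-only [`SnapshotDB`] as protobuf-encoded
+/// snapshot chunk files plus a JSON `SnapshotHeader` describing them.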
+pub struct SnapshotExporter {
+    basedir: PathBuf,
+    database: SnapshotDB,
+}
+
+impl SnapshotExporter {
+    pub fn new(basedir: &Path, db_path: Option<String>) -> Self {
+        let db_path = match db_path {
+            Some(p) => PathBuf::from(p),
+            None => PathBuf::from(DEFAULT_DB_PATH),
+        };
+
+        let database = SnapshotDB::new_read_only(db_path).unwrap();
+        Self {
+            basedir: basedir.to_path_buf(),
+            database,
+        }
+    }
+
+    pub fn export_snapshot(&self, chunk_size: u64) -> Result<()> {
+        let mut header = SnapshotHeader::default();
+        self.export_storage_logs(chunk_size, &mut header)?;
+        self.export_factory_deps(&mut header)?;
+
+        let path = PathBuf::new()
+            .join(&self.basedir)
+            .join("snapshot-header.json");
+
+        let outfile = std::fs::OpenOptions::new()
+            .write(true)
+            .create(true)
+            .open(path)?;
+
+        serde_json::to_writer(outfile, &header)?;
+
+        Ok(())
+    }
+
+    fn export_factory_deps(&self, header: &mut SnapshotHeader) -> Result<()> {
+        let mut buf = BytesMut::new();
+
+        let storage_logs = self.database.cf_handle(database::FACTORY_DEPS).unwrap();
+        let mut iterator = self
+            .database
+            .iterator_cf(storage_logs, rocksdb::IteratorMode::Start);
+
+        let mut factory_deps = protobuf::SnapshotFactoryDependencies::default();
+        while let Some(Ok((_, bs))) = iterator.next() {
+            let factory_dep: SnapshotFactoryDependency = bincode::deserialize(bs.as_ref())?;
+            factory_deps
+                .factory_deps
+                .push(protobuf::SnapshotFactoryDependency {
+                    bytecode: Some(factory_dep.bytecode),
+                });
+        }
+
+        let fd_len = factory_deps.encoded_len();
+        if buf.capacity() < fd_len {
+            buf.reserve(fd_len - buf.capacity());
+        }
+
+        let path = PathBuf::new().join(&self.basedir).join("factory_deps.dat");
+        header.factory_deps_filepath = path
+            .clone()
+            .into_os_string()
+            .into_string()
+            .expect("path to string");
+
+        // TODO: Wrap gzip compression around the outfile.
+        let mut outfile = std::fs::OpenOptions::new()
+            .write(true)
+            .create(true)
+            .open(path)?;
+
+        // Serialize the factory dependencies.
+        factory_deps.encode(&mut buf)?;
+        outfile.write_all(&buf)?;
+        outfile.flush()?;
+
+        Ok(())
+    }
+
+    fn export_storage_logs(&self, chunk_size: u64, header: &mut SnapshotHeader) -> Result<()> {
+        let mut buf = BytesMut::new();
+        let mut chunk_index = 0;
+
+        let index_to_key_map = self.database.cf_handle(database::INDEX_TO_KEY_MAP).unwrap();
+        let mut iterator = self
+            .database
+            .iterator_cf(index_to_key_map, rocksdb::IteratorMode::Start);
+
+        let mut has_more = true;
+
+        while has_more {
+            let mut chunk = protobuf::SnapshotStorageLogsChunk {
+                storage_logs: vec![],
+            };
+
+            for _ in 0..chunk_size {
+                if let Some(Ok((_, key))) = iterator.next() {
+                    if let Ok(Some(entry)) = self.database.get_storage_log(key.as_ref()) {
+                        let pb = protobuf::SnapshotStorageLog {
+                            account_address: None,
+                            storage_key: Some(key.to_vec()),
+                            storage_value: Some(entry.value.0.to_vec()),
+                            l1_batch_number_of_initial_write: Some(
+                                entry.l1_batch_number_of_initial_write.as_u32(),
+                            ),
+                            enumeration_index: Some(entry.enumeration_index),
+                        };
+
+                        chunk.storage_logs.push(pb);
+                    }
+                } else {
+                    has_more = false;
+                }
+            }
+
+            // Ensure that the write buffer has enough capacity.
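+            // (`BytesMut` grows on demand while prost encodes into it; reserving
+            // up front just avoids repeated incremental reallocations.)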
+            let chunk_len = chunk.encoded_len();
+            if buf.capacity() < chunk_len {
+                buf.reserve(chunk_len - buf.capacity());
+            }
+
+            chunk_index += 1;
+            let path = PathBuf::new()
+                .join(&self.basedir)
+                .join(format!("{chunk_index}.chunk"));
+
+            header
+                .storage_logs_chunks
+                .push(types::SnapshotStorageLogsChunkMetadata {
+                    chunk_id: chunk_index,
+                    filepath: path
+                        .clone()
+                        .into_os_string()
+                        .into_string()
+                        .expect("path to string"),
+                });
+
+            // TODO: Wrap gzip compression around the outfile.
+            let mut outfile = std::fs::OpenOptions::new()
+                .write(true)
+                .create(true)
+                .open(path)?;
+
+            // Serialize the chunk.
+            chunk.encode(&mut buf)?;
+            outfile.write_all(&buf)?;
+            outfile.flush()?;
+
+            // Clear the buffer in preparation for the next chunk.
+            buf.truncate(0);
+        }
+
+        Ok(())
+    }
+}
diff --git a/src/processor/snapshot/types.rs b/src/processor/snapshot/types.rs
index 0590983..2392b91 100644
--- a/src/processor/snapshot/types.rs
+++ b/src/processor/snapshot/types.rs
@@ -16,17 +16,18 @@ pub type StorageValue = H256;
 pub struct SnapshotHeader {
     pub l1_batch_number: L1BatchNumber,
     pub miniblock_number: MiniblockNumber,
-    /// Chunk metadata ordered by chunk_id
-    pub chunks: Vec<SnapshotChunkMetadata>,
-    // TODO:
-    // pub last_l1_batch_with_metadata: L1BatchWithMetadata,
+    // Ordered by chunk_id.
+    pub storage_logs_chunks: Vec<SnapshotStorageLogsChunkMetadata>,
+    pub factory_deps_filepath: String,
+    // The `L1BatchWithMetadata` type has no definition here; ignoring it.
+    //pub last_l1_batch_with_metadata: L1BatchWithMetadata,
     pub generated_at: DateTime<Utc>,
 }
 
 #[derive(Default, Debug, Serialize, Deserialize)]
-pub struct SnapshotChunkMetadata {
-    pub key: SnapshotStorageKey,
-    /// Can be either a gs or filesystem path
+pub struct SnapshotStorageLogsChunkMetadata {
+    pub chunk_id: u64,
+    // Can be either a gs or a filesystem path.
     pub filepath: String,
 }