From 851b81a532dea432be6a3f89ae7841f2784fdb38 Mon Sep 17 00:00:00 2001 From: zeapoz Date: Wed, 8 Nov 2023 12:39:34 +0100 Subject: [PATCH 1/2] feat: initial export snapshot template --- Cargo.lock | 1 + Cargo.toml | 1 + src/cli.rs | 8 ++ src/main.rs | 25 ++++ src/processor/mod.rs | 3 +- src/processor/snapshot/mod.rs | 238 ++++++++++++++++++++++++++++++++ src/processor/snapshot/types.rs | 74 ++++++++++ 7 files changed, 349 insertions(+), 1 deletion(-) create mode 100644 src/processor/snapshot/mod.rs create mode 100644 src/processor/snapshot/types.rs diff --git a/Cargo.lock b/Cargo.lock index 1ec3b40..4158af8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4631,6 +4631,7 @@ version = "0.1.0" dependencies = [ "async-trait", "blake2 0.10.6", + "chrono", "clap 4.4.7", "ethers", "eyre", diff --git a/Cargo.toml b/Cargo.toml index 2be4481..d4ed50c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -10,6 +10,7 @@ members = ["state-reconstruct-fetcher"] [dependencies] async-trait = "0.1.74" blake2 = "0.10.6" +chrono = "0.4.31" clap = { version = "4.4.7", features = ["derive", "env"] } ethers = "1.0.2" eyre = "0.6.8" diff --git a/src/cli.rs b/src/cli.rs index a049726..0935bb3 100644 --- a/src/cli.rs +++ b/src/cli.rs @@ -72,6 +72,14 @@ pub enum Command { #[arg(short, long, env = "ZK_SYNC_DB_PATH")] db_path: Option, }, + + /// Testing. + ExportSnapshot { + #[command(flatten)] + l1_fetcher_options: L1FetcherOptions, + /// The path of the file to export the snapshot to. + file: Option, + }, } #[derive(Parser)] diff --git a/src/main.rs b/src/main.rs index 024152e..c0eae2b 100644 --- a/src/main.rs +++ b/src/main.rs @@ -16,6 +16,7 @@ use std::{ use clap::Parser; use cli::{Cli, Command, ReconstructSource}; use eyre::Result; +use processor::snapshot::SnapshotExporter; use state_reconstruct_fetcher::{ constants::storage, l1_fetcher::{L1Fetcher, L1FetcherOptions}, @@ -52,6 +53,7 @@ fn start_logger(default_level: LevelFilter) { } #[tokio::main] +#[allow(clippy::too_many_lines)] async fn main() -> Result<()> { start_logger(LevelFilter::INFO); @@ -150,6 +152,29 @@ async fn main() -> Result<()> { println!("{result}"); } } + Command::ExportSnapshot { + l1_fetcher_options, + file, + } => { + let fetcher_options = L1FetcherOptions { + http_url: l1_fetcher_options.http_url, + start_block: l1_fetcher_options.start_block, + block_step: l1_fetcher_options.block_step, + block_count: l1_fetcher_options.block_count, + disable_polling: l1_fetcher_options.disable_polling, + }; + + let fetcher = L1Fetcher::new(fetcher_options, None)?; + let processor = SnapshotExporter::new(file); + + let (tx, rx) = mpsc::channel::(5); + let processor_handle = tokio::spawn(async move { + processor.run(rx).await; + }); + + fetcher.run(tx).await?; + processor_handle.await?; + } } Ok(()) diff --git a/src/processor/mod.rs b/src/processor/mod.rs index 316d05d..e88ffae 100644 --- a/src/processor/mod.rs +++ b/src/processor/mod.rs @@ -3,9 +3,10 @@ use state_reconstruct_fetcher::types::CommitBlockInfoV1; use tokio::sync::mpsc; pub mod json; +pub mod snapshot; pub mod tree; #[async_trait] pub trait Processor { - async fn run(self, rx: mpsc::Receiver); + async fn run(self, mut rx: mpsc::Receiver); } diff --git a/src/processor/snapshot/mod.rs b/src/processor/snapshot/mod.rs new file mode 100644 index 0000000..f794f13 --- /dev/null +++ b/src/processor/snapshot/mod.rs @@ -0,0 +1,238 @@ +use std::{collections::HashMap, fmt, fs, path::PathBuf, str::FromStr}; + +mod types; + +use async_trait::async_trait; +use blake2::{Blake2s256, Digest}; +use ethers::types::{Address, H256, U256, U64}; +use eyre::Result; +use indexmap::IndexSet; +use state_reconstruct_fetcher::{ + constants::{ethereum, storage}, + types::CommitBlockInfoV1, +}; +use tokio::sync::mpsc; + +use self::types::{SnapshotStorageLog, StorageKey, StorageValue}; +use super::Processor; +use crate::processor::snapshot::types::MiniblockNumber; + +// NOTE: What file extension to use? +const DEFAULT_EXPORT_PATH: &str = "snapshot_export"; + +pub struct SnapshotExporter { + storage_log_entries: HashMap, + index_to_key_map: IndexSet, + path: PathBuf, +} + +impl SnapshotExporter { + pub fn new(path: Option) -> Self { + let path = match path { + Some(p) => PathBuf::from(p), + None => PathBuf::from(DEFAULT_EXPORT_PATH), + }; + + let mut index_to_key_map = IndexSet::new(); + let mut storage_log_entries = HashMap::new(); + + reconstruct_genesis_state( + &mut storage_log_entries, + &mut index_to_key_map, + storage::INITAL_STATE_PATH, + ) + .unwrap(); + + Self { + storage_log_entries, + index_to_key_map, + path, + } + } +} + +impl fmt::Display for SnapshotExporter { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + let mut s = String::new(); + + for entry in &self.storage_log_entries { + s.push_str(&entry.1.to_string()); + s.push('\n'); + } + + write!(f, "{s}") + } +} + +#[async_trait] +impl Processor for SnapshotExporter { + async fn run(mut self, mut rx: mpsc::Receiver) { + // TODO: Send these from fetcher. + let miniblock_number = U64::from(0); + let l1_block_number = U64::from(0); + + while let Some(block) = rx.recv().await { + // Initial calldata. + for (key, value) in &block.initial_storage_changes { + let key = U256::from_little_endian(key); + let value = H256::from(value); + self.index_to_key_map.insert(key); + + let log = self + .storage_log_entries + .entry(key) + .or_insert(SnapshotStorageLog { + key, + value: StorageValue::default(), + miniblock_number_of_initial_write: miniblock_number, + l1_batch_number_of_initial_write: l1_block_number, + enumeration_index: 0, + }); + log.value = value; + } + + // Repeated calldata. + for (index, value) in &block.repeated_storage_changes { + let index = usize::try_from(*index).expect("truncation failed"); + // Index is 1-based so we subtract 1. + let key = *self.index_to_key_map.get_index(index - 1).unwrap(); + let value = H256::from(value); + + self.storage_log_entries + .entry(key) + .and_modify(|log| log.value = value); + } + + // TODO: We need to index these by hash. + // Factory dependencies. + // for dep in &block.factory_deps {} + } + + fs::write(&self.path, self.to_string()).expect("failed to export snapshot"); + tracing::info!("Successfully exported snapshot to {}", self.path.display()); + } +} + +// TODO: Can this be made somewhat generic? +/// Attempts to reconstruct the genesis state from a CSV file. +fn reconstruct_genesis_state( + storage_log_entries: &mut HashMap, + index_to_key: &mut IndexSet, + path: &str, +) -> Result<()> { + fn cleanup_encoding(input: &'_ str) -> &'_ str { + input + .strip_prefix("E'\\\\x") + .unwrap() + .strip_suffix('\'') + .unwrap() + } + + let mut block_batched_accesses = vec![]; + + let input = fs::read_to_string(path)?; + for line in input.lines() { + let mut separated = line.split(','); + let _derived_key = separated.next().unwrap(); + let address = separated.next().unwrap(); + let key = separated.next().unwrap(); + let value = separated.next().unwrap(); + let op_number: u32 = separated.next().unwrap().parse()?; + let _ = separated.next().unwrap(); + let miniblock_number: u32 = separated.next().unwrap().parse()?; + + if miniblock_number != 0 { + break; + } + + let address = Address::from_str(cleanup_encoding(address))?; + let key = U256::from_str_radix(cleanup_encoding(key), 16)?; + let value = U256::from_str_radix(cleanup_encoding(value), 16)?; + + let record = (address, key, value, op_number, miniblock_number); + block_batched_accesses.push(record); + } + + // Sort in block block. + block_batched_accesses.sort_by(|a, b| match a.0.cmp(&b.0) { + std::cmp::Ordering::Equal => match a.1.cmp(&b.1) { + std::cmp::Ordering::Equal => match a.3.cmp(&b.3) { + std::cmp::Ordering::Equal => { + panic!("must be unique") + } + a => a, + }, + a => a, + }, + a => a, + }); + + let mut key_set = std::collections::HashSet::new(); + + // Batch. + for el in &block_batched_accesses { + let derived_key = derive_final_address_for_params(&el.0, &el.1); + key_set.insert(derived_key); + } + + let mut batched = vec![]; + let mut it = block_batched_accesses.into_iter(); + let mut previous = it.next().unwrap(); + for el in it { + if el.0 != previous.0 || el.1 != previous.1 { + batched.push((previous.0, previous.1, previous.2, previous.4)); + } + + previous = el; + } + + // Finalize. + batched.push((previous.0, previous.1, previous.2, previous.4)); + + tracing::trace!("Have {} unique keys in the tree", key_set.len()); + + for (address, key, value, miniblock_number) in batched { + let derived_key = derive_final_address_for_params(&address, &key); + // TODO: what to do here? + // let version = tree.latest_version().unwrap_or_default(); + // let _leaf = tree.read_leaves(version, &[key]); + + // let existing_value = U256::from_big_endian(existing_leaf.leaf.value()); + // if existing_value == value { + // // we downgrade to read + // // println!("Downgrading to read") + // } else { + // we write + let mut tmp = [0u8; 32]; + value.to_big_endian(&mut tmp); + + let key = U256::from_little_endian(&derived_key); + let value = H256::from(tmp); + + let log = storage_log_entries + .entry(key) + .or_insert(SnapshotStorageLog { + key, + value: StorageValue::default(), + miniblock_number_of_initial_write: MiniblockNumber::from(miniblock_number), + l1_batch_number_of_initial_write: U64::from(ethereum::GENESIS_BLOCK), + enumeration_index: 0, + }); + + log.value = value; + index_to_key.insert(key); + } + + Ok(()) +} + +fn derive_final_address_for_params(address: &Address, key: &U256) -> [u8; 32] { + let mut buffer = [0u8; 64]; + buffer[12..32].copy_from_slice(&address.0); + key.to_big_endian(&mut buffer[32..64]); + + let mut result = [0u8; 32]; + result.copy_from_slice(Blake2s256::digest(buffer).as_slice()); + + result +} diff --git a/src/processor/snapshot/types.rs b/src/processor/snapshot/types.rs new file mode 100644 index 0000000..bda619d --- /dev/null +++ b/src/processor/snapshot/types.rs @@ -0,0 +1,74 @@ +// FIXME: +#![allow(dead_code)] +use std::fmt; + +use chrono::{offset::Utc, DateTime}; +use ethers::types::{H256, U256, U64}; + +pub type L1BatchNumber = U64; +pub type MiniblockNumber = U64; + +pub type StorageKey = U256; +pub type StorageValue = H256; + +#[derive(Default, Debug)] +pub struct SnapshotHeader { + pub l1_batch_number: L1BatchNumber, + pub miniblock_number: MiniblockNumber, + /// Chunk metadata ordered by chunk_id + pub chunks: Vec, + // TODO: + // pub last_l1_batch_with_metadata: L1BatchWithMetadata, + pub generated_at: DateTime, +} + +#[derive(Default, Debug)] +pub struct SnapshotChunkMetadata { + pub key: SnapshotStorageKey, + /// Can be either a gs or filesystem path + pub filepath: String, +} + +#[derive(Default, Debug)] +pub struct SnapshotStorageKey { + pub l1_batch_number: L1BatchNumber, + /// Chunks with smaller id's must contain storage_logs with smaller hashed_keys + pub chunk_id: u64, +} + +#[derive(Default, Debug)] +pub struct SnapshotChunk { + // Sorted by hashed_keys interpreted as little-endian numbers + pub storage_logs: Vec, + pub factory_deps: Vec, +} + +// "most recent" for each key together with info when the key was first used +#[derive(Default, Debug)] +pub struct SnapshotStorageLog { + pub key: StorageKey, + pub value: StorageValue, + pub miniblock_number_of_initial_write: MiniblockNumber, + pub l1_batch_number_of_initial_write: L1BatchNumber, + pub enumeration_index: u64, +} + +impl fmt::Display for SnapshotStorageLog { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!( + f, + "{},{},{},{},{}", + self.key, + hex::encode(self.value), + self.miniblock_number_of_initial_write, + self.l1_batch_number_of_initial_write, + self.enumeration_index + ) + } +} + +#[derive(Default, Debug)] +pub struct SnapshotFactoryDependency { + pub bytecode_hash: H256, + pub bytecode: Vec, +} From 141526db155c64b02cb52ce8bc057f0f003f52d1 Mon Sep 17 00:00:00 2001 From: zeapoz Date: Thu, 16 Nov 2023 11:25:22 +0100 Subject: [PATCH 2/2] doc: clarify miniblocks --- src/processor/snapshot/mod.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/processor/snapshot/mod.rs b/src/processor/snapshot/mod.rs index f794f13..03f3a53 100644 --- a/src/processor/snapshot/mod.rs +++ b/src/processor/snapshot/mod.rs @@ -67,8 +67,7 @@ impl fmt::Display for SnapshotExporter { #[async_trait] impl Processor for SnapshotExporter { async fn run(mut self, mut rx: mpsc::Receiver) { - // TODO: Send these from fetcher. - let miniblock_number = U64::from(0); + // TODO: Send from fetcher. let l1_block_number = U64::from(0); while let Some(block) = rx.recv().await { @@ -84,7 +83,8 @@ impl Processor for SnapshotExporter { .or_insert(SnapshotStorageLog { key, value: StorageValue::default(), - miniblock_number_of_initial_write: miniblock_number, + // NOTE: This isn't stored in L1, can we procure it some other way? + miniblock_number_of_initial_write: U64::from(0), l1_batch_number_of_initial_write: l1_block_number, enumeration_index: 0, });