From 9e30635eb7408928b919de3c7979ba99f683bf80 Mon Sep 17 00:00:00 2001 From: zeapoz Date: Fri, 2 Aug 2024 12:45:32 +0200 Subject: [PATCH 1/5] feat: map l1 batch numbers to their l1 block number equivalent --- src/cli.rs | 3 - src/main.rs | 33 ++-- src/processor/snapshot/exporter.rs | 17 +- src/processor/snapshot/importer.rs | 11 +- src/processor/snapshot/mod.rs | 7 +- src/processor/tree/tree_wrapper.rs | 9 +- state-reconstruct-fetcher/src/l1_fetcher.rs | 190 +++++++++++++++----- state-reconstruct-fetcher/src/metrics.rs | 2 +- state-reconstruct-storage/src/snapshot.rs | 6 +- state-reconstruct-storage/src/types.rs | 2 +- 10 files changed, 192 insertions(+), 88 deletions(-) diff --git a/src/cli.rs b/src/cli.rs index f5c5380..3b756e4 100644 --- a/src/cli.rs +++ b/src/cli.rs @@ -109,9 +109,6 @@ pub enum Command { /// The path to the storage solution. #[arg(short, long, default_value = snapshot::DEFAULT_DB_PATH)] db_path: Option, - /// Number of chunks to split storage chunks into. - #[arg(short, long, default_value_t = snapshot::DEFAULT_NUM_CHUNKS)] - num_chunks: usize, /// The directory to export the snapshot files to. directory: String, }, diff --git a/src/main.rs b/src/main.rs index 78c544c..0648b4a 100644 --- a/src/main.rs +++ b/src/main.rs @@ -12,9 +12,9 @@ use std::{ path::{Path, PathBuf}, }; +use ::eyre::Result; use clap::Parser; use cli::{Cli, Command, ReconstructSource}; -use eyre::Result; use processor::snapshot::{ exporter::SnapshotExporter, importer::SnapshotImporter, SnapshotBuilder, }; @@ -75,24 +75,29 @@ async fn main() -> Result<()> { None => env::current_dir()?.join(storage::DEFAULT_DB_NAME), }; - if let Some(directory) = snapshot { - tracing::info!("Trying to restore state from snapshot..."); - let importer = SnapshotImporter::new(PathBuf::from(directory)); - importer.run(&db_path.clone()).await?; - } + let snapshot_end_batch = match snapshot { + Some(directory) => { + tracing::info!("Trying to restore state from snapshot..."); + let importer = SnapshotImporter::new(PathBuf::from(directory)); + let end_batch = importer.run(&db_path.clone()).await?; + Some(end_batch) + } + None => None, + }; match source { ReconstructSource::L1 { l1_fetcher_options } => { let fetcher_options = l1_fetcher_options.into(); let processor = TreeProcessor::new(db_path.clone()).await?; let fetcher = L1Fetcher::new(fetcher_options, Some(processor.get_inner_db()))?; + let (tx, rx) = mpsc::channel::(5); let processor_handle = tokio::spawn(async move { processor.run(rx).await; }); - fetcher.run(tx).await?; + fetcher.run(tx, snapshot_end_batch).await?; processor_handle.await?; } ReconstructSource::File { file } => { @@ -128,7 +133,7 @@ async fn main() -> Result<()> { processor.run(rx).await; }); - fetcher.run(tx).await?; + fetcher.run(tx, None).await?; processor_handle.await?; tracing::info!("Successfully downloaded CommitBlocks to {}", file); @@ -159,7 +164,7 @@ async fn main() -> Result<()> { let processor = SnapshotBuilder::new(db_path); let mut fetcher_options: L1FetcherOptions = l1_fetcher_options.into(); - if let Ok(batch_number) = processor.get_latest_l1_batch_number() { + if let Ok(batch_number) = processor.get_latest_l1_block_number() { let batch_number = batch_number.as_u64(); if batch_number > ethereum::GENESIS_BLOCK { tracing::info!( @@ -176,18 +181,14 @@ async fn main() -> Result<()> { processor.run(rx).await; }); - fetcher.run(tx).await?; + fetcher.run(tx, None).await?; processor_handle.await?; } - Command::ExportSnapshot { - db_path, - num_chunks, - directory, - } => { + Command::ExportSnapshot { db_path, directory } => { let export_path = Path::new(&directory); std::fs::create_dir_all(export_path)?; let exporter = SnapshotExporter::new(export_path, db_path)?; - exporter.export_snapshot(num_chunks)?; + exporter.export_snapshot()?; tracing::info!("Succesfully exported snapshot files to \"{directory}\"!"); } diff --git a/src/processor/snapshot/exporter.rs b/src/processor/snapshot/exporter.rs index a37ab82..5f5abec 100644 --- a/src/processor/snapshot/exporter.rs +++ b/src/processor/snapshot/exporter.rs @@ -16,6 +16,9 @@ use crate::processor::snapshot::{ DEFAULT_DB_PATH, SNAPSHOT_FACTORY_DEPS_FILE_NAME_SUFFIX, SNAPSHOT_HEADER_FILE_NAME, }; +/// Number of storage logs included in each chunk. +const SNAPSHOT_CHUNK_SIZE: usize = 1_000_000; + pub struct SnapshotExporter { basedir: PathBuf, database: SnapshotDatabase, @@ -35,7 +38,7 @@ impl SnapshotExporter { }) } - pub fn export_snapshot(&self, num_chunks: usize) -> Result<()> { + pub fn export_snapshot(&self) -> Result<()> { let l1_batch_number = self .database .get_latest_l1_batch_number()? @@ -50,7 +53,7 @@ impl SnapshotExporter { ..Default::default() }; - self.export_storage_logs(num_chunks, &mut header)?; + self.export_storage_logs(&mut header)?; self.export_factory_deps(&mut header)?; let path = self.basedir.join(SNAPSHOT_HEADER_FILE_NAME); @@ -97,7 +100,7 @@ impl SnapshotExporter { Ok(()) } - fn export_storage_logs(&self, num_chunks: usize, header: &mut SnapshotHeader) -> Result<()> { + fn export_storage_logs(&self, header: &mut SnapshotHeader) -> Result<()> { tracing::info!("Exporting storage logs..."); let num_logs = self.database.get_last_repeated_key_index()?; @@ -108,17 +111,17 @@ impl SnapshotExporter { .database .iterator_cf(index_to_key_map, rocksdb::IteratorMode::Start); - let chunk_size = num_logs / num_chunks as u64; + let num_chunks = (num_logs / SNAPSHOT_CHUNK_SIZE as u64) + 1; for chunk_id in 0..num_chunks { tracing::info!("Serializing chunk {}/{}...", chunk_id + 1, num_chunks); let mut chunk = SnapshotStorageLogsChunk::default(); - for _ in 0..chunk_size { + for _ in 0..SNAPSHOT_CHUNK_SIZE { if let Some(Ok((_, key))) = iterator.next() { let key = U256::from_big_endian(&key); if let Ok(Some(entry)) = self.database.get_storage_log(&key) { chunk.storage_logs.push(entry); - } + }; } else { break; } @@ -131,7 +134,7 @@ impl SnapshotExporter { header .storage_logs_chunks .push(SnapshotStorageLogsChunkMetadata { - chunk_id: chunk_id as u64, + chunk_id, filepath: path .clone() .into_os_string() diff --git a/src/processor/snapshot/importer.rs b/src/processor/snapshot/importer.rs index 140a55f..9a39457 100644 --- a/src/processor/snapshot/importer.rs +++ b/src/processor/snapshot/importer.rs @@ -6,7 +6,6 @@ use std::{ use ethers::types::U64; use eyre::Result; use regex::{Captures, Regex}; -use state_reconstruct_fetcher::constants::ethereum::GENESIS_BLOCK; use state_reconstruct_storage::types::{ Proto, SnapshotFactoryDependencies, SnapshotHeader, SnapshotStorageLogsChunk, SnapshotStorageLogsChunkMetadata, @@ -29,7 +28,8 @@ impl SnapshotImporter { Self { directory } } - pub async fn run(self, db_path: &Path) -> Result<()> { + /// Run the snapshot importer task. Returns the batch number contained in the header. + pub async fn run(self, db_path: &Path) -> Result { let (tx, rx) = mpsc::channel(1); let header = self.read_header().expect("failed to read header filepath"); @@ -46,14 +46,13 @@ impl SnapshotImporter { } }); - let l1_batch_number = header.l1_batch_number + GENESIS_BLOCK; + let l1_batch_number = U64::from(header.l1_batch_number); let mut tree = TreeWrapper::new_snapshot_wrapper(db_path) .await .expect("can't create tree"); - tree.restore_from_snapshot(rx, U64::from(l1_batch_number)) - .await?; + tree.restore_from_snapshot(rx, l1_batch_number).await?; - Ok(()) + Ok(l1_batch_number) } fn read_header(&self) -> Result { diff --git a/src/processor/snapshot/mod.rs b/src/processor/snapshot/mod.rs index 07c2943..eda7a65 100644 --- a/src/processor/snapshot/mod.rs +++ b/src/processor/snapshot/mod.rs @@ -25,7 +25,6 @@ use crate::util::{h256_to_u256, unpack_block_info}; pub const DEFAULT_DB_PATH: &str = "snapshot_db"; pub const SNAPSHOT_HEADER_FILE_NAME: &str = "snapshot-header.json"; pub const SNAPSHOT_FACTORY_DEPS_FILE_NAME_SUFFIX: &str = "factory_deps.proto.gzip"; -pub const DEFAULT_NUM_CHUNKS: usize = 10; pub struct SnapshotBuilder { database: SnapshotDatabase, @@ -53,8 +52,8 @@ impl SnapshotBuilder { Self { database } } - // Gets the next L1 batch number to be processed for ues in state recovery. - pub fn get_latest_l1_batch_number(&self) -> Result { + // Gets the next L1 block number to be processed for ues in state recovery. + pub fn get_latest_l1_block_number(&self) -> Result { self.database .get_latest_l1_block_number() .map(|o| o.unwrap_or(U64::from(0))) @@ -107,7 +106,7 @@ impl Processor for SnapshotBuilder { if self .database - .update_storage_log_value(index as u64, &value.to_fixed_bytes()) + .update_storage_log_value(index as u64, value) .is_err() { let max_idx = self diff --git a/src/processor/tree/tree_wrapper.rs b/src/processor/tree/tree_wrapper.rs index 26bbc50..3577d0c 100644 --- a/src/processor/tree/tree_wrapper.rs +++ b/src/processor/tree/tree_wrapper.rs @@ -168,18 +168,21 @@ impl TreeWrapper { total_tree_entries += tree_entries.len(); self.tree.extend(tree_entries); - tracing::info!("Chunk {} was succesfully imported!", i + 1); + tracing::info!("Chunk {} was successfully imported!", i + 1); i += 1; } tracing::info!( - "Succesfully imported snapshot containing {total_tree_entries} storage logs!", + "Successfully imported snapshot containing {total_tree_entries} storage logs!", ); + let root_hash = hex::encode(self.tree.latest_root_hash()); + tracing::debug!("Current root hash is: {}", root_hash); + self.inner_db .lock() .await - .set_latest_l1_batch_number(l1_batch_number.as_u64() + 1)?; + .set_latest_l1_batch_number(l1_batch_number.as_u64())?; Ok(()) } diff --git a/state-reconstruct-fetcher/src/l1_fetcher.rs b/state-reconstruct-fetcher/src/l1_fetcher.rs index 522ff31..f37dd52 100644 --- a/state-reconstruct-fetcher/src/l1_fetcher.rs +++ b/state-reconstruct-fetcher/src/l1_fetcher.rs @@ -4,7 +4,7 @@ use ethers::{ abi::{Contract, Function}, prelude::*, }; -use eyre::Result; +use eyre::{eyre, OptionExt, Result}; use rand::random; use state_reconstruct_storage::reconstruction::ReconstructionDatabase; use thiserror::Error; @@ -16,7 +16,7 @@ use tokio_util::sync::CancellationToken; use crate::{ blob_http_client::BlobHttpClient, - constants::ethereum::{BLOB_BLOCK, BOOJUM_BLOCK, GENESIS_BLOCK, ZK_SYNC_ADDR}, + constants::ethereum::{BLOB_BLOCK, BLOCK_STEP, BOOJUM_BLOCK, GENESIS_BLOCK, ZK_SYNC_ADDR}, metrics::L1Metrics, types::{v1::V1, v2::V2, CommitBlock, ParseError}, }; @@ -124,26 +124,56 @@ impl L1Fetcher { }) } - pub async fn run(&self, sink: mpsc::Sender) -> Result<()> { + /// Decide which block to start fetching from based on the following criteria: + /// - Has the tool already made progress before? + /// - Was a snapshot just imported? + /// - Did the user set an explicit start block? + /// + /// Returns the block number to start fetching from. + async fn decide_start_block(&self, snapshot_end_batch: Option) -> Result { // Start fetching from the `GENESIS_BLOCK` unless the `start_block` argument is supplied, // in which case, start from that instead. If no argument was supplied and a state snapshot // exists, start from the block number specified in that snapshot. - let mut current_l1_block_number = U64::from(self.config.start_block); + let mut start_block = U64::from(self.config.start_block); + + // We also have to check if a snapshot was recently imported. If so we + // should continue from the last imported batch. + if let Some(target_batch_number) = snapshot_end_batch { + tracing::info!( + "Trying to map snapshots latest L1 batch number, this might take a while..." + ); + match self.map_l1_batch_to_l1_block(U256::from(target_batch_number.as_u64())).await { + Ok(block_number) => return Ok(block_number), + Err(e) => tracing::error!("Unable to find a corresponding L1 block number for the latest imported L1 batch: {e}"), + } + } + // User might have supplied their own start block, in that case we shouldn't enforce the // use of the snapshot value. - if current_l1_block_number == GENESIS_BLOCK.into() { + if start_block == GENESIS_BLOCK.into() { if let Some(snapshot) = &self.inner_db { let snapshot_latest_l1_block_number = snapshot.lock().await.get_latest_l1_block_number()?; - if snapshot_latest_l1_block_number > current_l1_block_number { - current_l1_block_number = snapshot_latest_l1_block_number; - tracing::info!( - "Found snapshot, starting from L1 block {current_l1_block_number}" - ); + if snapshot_latest_l1_block_number > start_block { + start_block = snapshot_latest_l1_block_number; } }; } + Ok(start_block) + } + + pub async fn run( + &self, + sink: mpsc::Sender, + snapshot_end_batch: Option, + ) -> Result<()> { + let current_l1_block_number = self.decide_start_block(snapshot_end_batch).await?; + tracing::info!( + "Starting fetching from block number: {}", + current_l1_block_number + ); + let end_block = self .config .block_count @@ -263,7 +293,7 @@ impl L1Fetcher { let metrics = self.metrics.clone(); let event = self.contracts.v1.events_by_name("BlockCommit")?[0].clone(); - let provider_clone = self.provider.clone(); + let provider = self.provider.clone(); let block_step = self.config.block_step; Ok(tokio::spawn({ @@ -279,28 +309,20 @@ impl L1Fetcher { } let Some(end_block_number) = end_block else { - if let Ok(new_end) = L1Fetcher::retry_call( - || provider_clone.get_block(BlockNumber::Finalized), - L1FetchError::GetEndBlockNumber, - ) - .await + if let Ok(new_end_block_number_candidate) = + L1Fetcher::get_last_l1_block_number(&provider).await { - if let Some(found_block) = new_end { - if let Some(ebn) = found_block.number { - let end_block_number = - if let Some(end_block_limit) = max_end_block { - if end_block_limit < ebn { - end_block_limit - } else { - ebn - } - } else { - ebn - }; - end_block = Some(end_block_number); - metrics.lock().await.last_l1_block = end_block_number.as_u64(); + let end_block_number = if let Some(end_block_limit) = max_end_block { + if end_block_limit < new_end_block_number_candidate { + end_block_limit + } else { + new_end_block_number_candidate } - } + } else { + new_end_block_number_candidate + }; + end_block = Some(end_block_number); + metrics.lock().await.last_l1_block = end_block_number.as_u64(); } else { tracing::debug!("Cannot get latest block number..."); cancellation_token.cancelled_else_long_timeout().await; @@ -335,11 +357,9 @@ impl L1Fetcher { // Grab all relevant logs. let before = Instant::now(); - if let Ok(logs) = L1Fetcher::retry_call( - || provider_clone.get_logs(&filter), - L1FetchError::GetLogs, - ) - .await + if let Ok(logs) = + L1Fetcher::retry_call(|| provider.get_logs(&filter), L1FetchError::GetLogs) + .await { let duration = before.elapsed(); metrics.lock().await.log_acquisition.add(duration); @@ -437,13 +457,8 @@ impl L1Fetcher { while let Some(hash) = hash_rx.recv().await { let tx = loop { let before = Instant::now(); - match L1Fetcher::retry_call( - || provider.get_transaction(hash), - L1FetchError::GetTx, - ) - .await - { - Ok(Some(tx)) => { + match L1Fetcher::get_transaction_by_hash(&provider, hash).await { + Ok(tx) => { let duration = before.elapsed(); metrics.lock().await.tx_acquisition.add(duration); break tx; @@ -582,6 +597,94 @@ impl L1Fetcher { })) } + /// Use binary-search to find the Ethereum block on which a particular batch + /// was published. + pub async fn map_l1_batch_to_l1_block(&self, target_batch_number: U256) -> Result { + let event = self.contracts.v1.events_by_name("BlockCommit")?[0].clone(); + let provider = self.provider.clone(); + + let mut lower_block_number = U64::from(GENESIS_BLOCK); + let mut upper_block_number = L1Fetcher::get_last_l1_block_number(&provider).await?; + + let mut current_block_number = (upper_block_number + lower_block_number) / 2; + loop { + let mut target_is_higher = false; + let mut target_is_lower = false; + + let filter = Filter::new() + .address(ZK_SYNC_ADDR.parse::
().unwrap()) + .topic0(event.signature()) + .from_block(current_block_number) + .to_block(current_block_number + BLOCK_STEP); + + if let Ok(logs) = + L1Fetcher::retry_call(|| provider.get_logs(&filter), L1FetchError::GetLogs).await + { + for log in logs { + let l1_batch_number = U256::from_big_endian(log.topics[1].as_fixed_bytes()); + let tx_hash = if let Some(hash) = log.transaction_hash { + hash + } else { + continue; + }; + + match l1_batch_number.cmp(&target_batch_number) { + cmp::Ordering::Equal => { + let block_number = + L1Fetcher::get_transaction_by_hash(&provider, tx_hash) + .await + .map(|tx| tx.block_number)?; + return block_number + .ok_or_eyre("found transaction, but it has no block number"); + } + cmp::Ordering::Less => target_is_higher = true, + cmp::Ordering::Greater => target_is_lower = true, + } + } + } + + if target_is_higher { + lower_block_number = current_block_number; + current_block_number = (upper_block_number + lower_block_number) / 2; + } else if target_is_lower { + upper_block_number = current_block_number; + current_block_number = (upper_block_number + lower_block_number) / 2; + } + + // Batch number was not found. + if upper_block_number.saturating_sub(lower_block_number) <= U64::from(1) { + return Err(eyre!( + "provided batch number ({target_batch_number}) does not exist yet!" + )); + }; + } + } + + /// Get a specified transaction on L1 by its hash. + async fn get_transaction_by_hash(provider: &Provider, hash: H256) -> Result { + match L1Fetcher::retry_call(|| provider.get_transaction(hash), L1FetchError::GetTx).await { + Ok(Some(tx)) => Ok(tx), + Ok(None) => Err(eyre!("unable to find transaction with hash: {}", hash)), + Err(e) => Err(e), + } + } + + /// Get the last published L1 block marked as `Finalized`. + async fn get_last_l1_block_number(provider: &Provider) -> Result { + let Some(last_block) = L1Fetcher::retry_call( + || provider.get_block(BlockNumber::Finalized), + L1FetchError::GetEndBlockNumber, + ) + .await? + else { + return Err(eyre!("latest finalized block was not found")); + }; + + last_block + .number + .ok_or_eyre("found latest finalized block, but it contained no block number") + } + async fn retry_call(callback: impl Fn() -> Fut, err: L1FetchError) -> Result where Fut: Future>, @@ -598,7 +701,6 @@ impl L1Fetcher { Err(err.into()) } } - pub async fn parse_calldata( l1_block_number: u64, commit_candidates: &[Function], diff --git a/state-reconstruct-fetcher/src/metrics.rs b/state-reconstruct-fetcher/src/metrics.rs index f2dd3be..3c8a487 100644 --- a/state-reconstruct-fetcher/src/metrics.rs +++ b/state-reconstruct-fetcher/src/metrics.rs @@ -62,7 +62,7 @@ impl L1Metrics { }; tracing::info!( - "PROGRESS: [{}] CUR L1 BLOCK: {} L2 BATCH: {} TOTAL PROCESSED L1 BLOCKS: {} L2 BATCHES: {}", + "PROGRESS: [{}] CUR L1 BLOCK: {} L1 BATCH: {} TOTAL PROCESSED L1 BLOCKS: {} L1 BATCHES: {}", progress, self.latest_l1_block_num, self.latest_l1_batch_num, diff --git a/state-reconstruct-storage/src/snapshot.rs b/state-reconstruct-storage/src/snapshot.rs index c974369..7078cd1 100644 --- a/state-reconstruct-storage/src/snapshot.rs +++ b/state-reconstruct-storage/src/snapshot.rs @@ -74,7 +74,7 @@ impl SnapshotDatabase { PackingType::NoCompression(v) | PackingType::Transform(v) => v, PackingType::Add(_) | PackingType::Sub(_) => { let existing_value = if let Some(log) = self.get_storage_log(&key)? { - U256::from(log.value.to_fixed_bytes()) + U256::from_big_endian(log.value.as_bytes()) } else { U256::from(0) }; @@ -134,7 +134,7 @@ impl SnapshotDatabase { } } - pub fn update_storage_log_value(&self, key_idx: u64, value: &[u8]) -> Result<()> { + pub fn update_storage_log_value(&self, key_idx: u64, value: H256) -> Result<()> { // Unwrapping column family handle here is safe because presence of // those CFs is ensured in construction of this DB. let storage_logs = self.cf_handle(snapshot_columns::STORAGE_LOGS).unwrap(); @@ -143,7 +143,7 @@ impl SnapshotDatabase { // XXX: These should really be inside a transaction... let entry_bs = self.get_cf(storage_logs, &key)?.unwrap(); let mut entry: SnapshotStorageLog = bincode::deserialize(&entry_bs)?; - entry.value = H256::from(<&[u8; 32]>::try_from(value).unwrap()); + entry.value = value; self.put_cf(storage_logs, key, bincode::serialize(&entry)?) .map_err(Into::into) } diff --git a/state-reconstruct-storage/src/types.rs b/state-reconstruct-storage/src/types.rs index e0c4806..e837539 100644 --- a/state-reconstruct-storage/src/types.rs +++ b/state-reconstruct-storage/src/types.rs @@ -169,7 +169,7 @@ impl Proto for SnapshotStorageLog { fn from_proto(proto: Self::ProtoStruct) -> Result { let value_bytes: [u8; 32] = proto.storage_value().try_into()?; Ok(Self { - key: U256::from_big_endian(proto.storage_key()), + key: U256::from_big_endian(proto.hashed_key()), value: StorageValue::from(&value_bytes), l1_batch_number_of_initial_write: proto.l1_batch_number_of_initial_write().into(), enumeration_index: proto.enumeration_index(), From 29bf715a33abf9d32bea99e67a47a742ada935c1 Mon Sep 17 00:00:00 2001 From: zeapoz Date: Tue, 6 Aug 2024 15:26:22 +0200 Subject: [PATCH 2/5] chore: ceil instead of add one --- src/processor/snapshot/exporter.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/processor/snapshot/exporter.rs b/src/processor/snapshot/exporter.rs index 5f5abec..d5f9d61 100644 --- a/src/processor/snapshot/exporter.rs +++ b/src/processor/snapshot/exporter.rs @@ -111,7 +111,8 @@ impl SnapshotExporter { .database .iterator_cf(index_to_key_map, rocksdb::IteratorMode::Start); - let num_chunks = (num_logs / SNAPSHOT_CHUNK_SIZE as u64) + 1; + let num_chunks = num_logs.div_ceil(SNAPSHOT_CHUNK_SIZE as u64); + println!("{num_chunks}"); for chunk_id in 0..num_chunks { tracing::info!("Serializing chunk {}/{}...", chunk_id + 1, num_chunks); From a24e3c774263bdc3950cd48eb952c2cacb024202 Mon Sep 17 00:00:00 2001 From: zeapoz Date: Tue, 6 Aug 2024 15:50:24 +0200 Subject: [PATCH 3/5] chore: make mapping task cancellable --- state-reconstruct-fetcher/src/l1_fetcher.rs | 52 +++++++++++++-------- 1 file changed, 32 insertions(+), 20 deletions(-) diff --git a/state-reconstruct-fetcher/src/l1_fetcher.rs b/state-reconstruct-fetcher/src/l1_fetcher.rs index f37dd52..1deb92d 100644 --- a/state-reconstruct-fetcher/src/l1_fetcher.rs +++ b/state-reconstruct-fetcher/src/l1_fetcher.rs @@ -94,6 +94,7 @@ pub struct L1Fetcher { config: L1FetcherOptions, inner_db: Option>>, metrics: Arc>, + cancellation_token: FetcherCancellationToken, } impl L1Fetcher { @@ -114,6 +115,7 @@ impl L1Fetcher { config.start_block }; let metrics = Arc::new(Mutex::new(L1Metrics::new(initial_l1_block))); + let cancellation_token = FetcherCancellationToken::new(); Ok(L1Fetcher { provider, @@ -121,6 +123,7 @@ impl L1Fetcher { config, inner_db, metrics, + cancellation_token, }) } @@ -168,6 +171,9 @@ impl L1Fetcher { sink: mpsc::Sender, snapshot_end_batch: Option, ) -> Result<()> { + // Wait for shutdown signal in background. + self.spawn_sigint_handler(); + let current_l1_block_number = self.decide_start_block(snapshot_end_batch).await?; tracing::info!( "Starting fetching from block number: {}", @@ -200,22 +206,6 @@ impl L1Fetcher { } }); - // Wait for shutdown signal in background. - let token = FetcherCancellationToken::new(); - let cloned_token = token.clone(); - tokio::spawn(async move { - match tokio::signal::ctrl_c().await { - Ok(()) => { - tracing::info!("Shutdown signal received, finishing up and shutting down..."); - } - Err(err) => { - tracing::error!("Shutdown signal failed: {err}"); - } - }; - - cloned_token.cancel(); - }); - let (hash_tx, hash_rx) = mpsc::channel(5); let (calldata_tx, calldata_rx) = mpsc::channel(5); @@ -232,13 +222,14 @@ impl L1Fetcher { let tx_handle = self.spawn_tx_handler( hash_rx, calldata_tx, - token.clone(), + self.cancellation_token.clone(), current_l1_block_number.as_u64(), ); - let parse_handle = self.spawn_parsing_handler(calldata_rx, sink, token.clone())?; + let parse_handle = + self.spawn_parsing_handler(calldata_rx, sink, self.cancellation_token.clone())?; let main_handle = self.spawn_main_handler( hash_tx, - token, + self.cancellation_token.clone(), current_l1_block_number, end_block, disable_polling, @@ -597,6 +588,23 @@ impl L1Fetcher { })) } + /// Spawn the handler that will wait for shutdown signal in background. + fn spawn_sigint_handler(&self) { + let cloned_token = self.cancellation_token.clone(); + tokio::spawn(async move { + match tokio::signal::ctrl_c().await { + Ok(()) => { + tracing::info!("Shutdown signal received, finishing up and shutting down..."); + } + Err(err) => { + tracing::error!("Shutdown signal failed: {err}"); + } + }; + + cloned_token.cancel(); + }); + } + /// Use binary-search to find the Ethereum block on which a particular batch /// was published. pub async fn map_l1_batch_to_l1_block(&self, target_batch_number: U256) -> Result { @@ -607,7 +615,7 @@ impl L1Fetcher { let mut upper_block_number = L1Fetcher::get_last_l1_block_number(&provider).await?; let mut current_block_number = (upper_block_number + lower_block_number) / 2; - loop { + while !self.cancellation_token.is_cancelled() { let mut target_is_higher = false; let mut target_is_lower = false; @@ -641,6 +649,8 @@ impl L1Fetcher { cmp::Ordering::Greater => target_is_lower = true, } } + } else if self.cancellation_token.is_cancelled() { + break; } if target_is_higher { @@ -658,6 +668,8 @@ impl L1Fetcher { )); }; } + + Err(eyre!("l1 batch to block task was cancelled")) } /// Get a specified transaction on L1 by its hash. From b76b99ac38258228578b67a0da9943c549fee1c1 Mon Sep 17 00:00:00 2001 From: zeapoz Date: Wed, 7 Aug 2024 08:58:06 +0200 Subject: [PATCH 4/5] chore: remove `println!` --- src/processor/snapshot/exporter.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/src/processor/snapshot/exporter.rs b/src/processor/snapshot/exporter.rs index d5f9d61..62db039 100644 --- a/src/processor/snapshot/exporter.rs +++ b/src/processor/snapshot/exporter.rs @@ -112,7 +112,6 @@ impl SnapshotExporter { .iterator_cf(index_to_key_map, rocksdb::IteratorMode::Start); let num_chunks = num_logs.div_ceil(SNAPSHOT_CHUNK_SIZE as u64); - println!("{num_chunks}"); for chunk_id in 0..num_chunks { tracing::info!("Serializing chunk {}/{}...", chunk_id + 1, num_chunks); From 753f6314f035452f563e8e17b9145e2bf427dc39 Mon Sep 17 00:00:00 2001 From: zeapoz Date: Wed, 7 Aug 2024 09:04:16 +0200 Subject: [PATCH 5/5] chore: clone tokens directly instead of passing around --- state-reconstruct-fetcher/src/l1_fetcher.rs | 27 +++++++-------------- 1 file changed, 9 insertions(+), 18 deletions(-) diff --git a/state-reconstruct-fetcher/src/l1_fetcher.rs b/state-reconstruct-fetcher/src/l1_fetcher.rs index 1deb92d..e0a6ad7 100644 --- a/state-reconstruct-fetcher/src/l1_fetcher.rs +++ b/state-reconstruct-fetcher/src/l1_fetcher.rs @@ -219,21 +219,11 @@ impl L1Fetcher { // - BlockCommit event filter (main). // - Referred L1 block fetch (tx). // - Calldata parsing (parse). - let tx_handle = self.spawn_tx_handler( - hash_rx, - calldata_tx, - self.cancellation_token.clone(), - current_l1_block_number.as_u64(), - ); - let parse_handle = - self.spawn_parsing_handler(calldata_rx, sink, self.cancellation_token.clone())?; - let main_handle = self.spawn_main_handler( - hash_tx, - self.cancellation_token.clone(), - current_l1_block_number, - end_block, - disable_polling, - )?; + let tx_handle = + self.spawn_tx_handler(hash_rx, calldata_tx, current_l1_block_number.as_u64()); + let parse_handle = self.spawn_parsing_handler(calldata_rx, sink)?; + let main_handle = + self.spawn_main_handler(hash_tx, current_l1_block_number, end_block, disable_polling)?; tx_handle.await?; let last_processed_l1_block_num = parse_handle.await?; @@ -273,7 +263,6 @@ impl L1Fetcher { fn spawn_main_handler( &self, hash_tx: mpsc::Sender, - cancellation_token: FetcherCancellationToken, mut current_l1_block_number: U64, max_end_block: Option, disable_polling: bool, @@ -286,6 +275,7 @@ impl L1Fetcher { let event = self.contracts.v1.events_by_name("BlockCommit")?[0].clone(); let provider = self.provider.clone(); let block_step = self.config.block_step; + let cancellation_token = self.cancellation_token.clone(); Ok(tokio::spawn({ async move { @@ -437,11 +427,11 @@ impl L1Fetcher { &self, mut hash_rx: mpsc::Receiver, l1_tx_tx: mpsc::Sender, - cancellation_token: FetcherCancellationToken, mut last_block: u64, ) -> tokio::task::JoinHandle<()> { let metrics = self.metrics.clone(); let provider = self.provider.clone(); + let cancellation_token = self.cancellation_token.clone(); tokio::spawn({ async move { @@ -499,11 +489,12 @@ impl L1Fetcher { &self, mut l1_tx_rx: mpsc::Receiver, sink: mpsc::Sender, - cancellation_token: FetcherCancellationToken, ) -> Result>> { let metrics = self.metrics.clone(); let contracts = self.contracts.clone(); let client = BlobHttpClient::new(self.config.blobs_url.clone())?; + let cancellation_token = self.cancellation_token.clone(); + Ok(tokio::spawn({ async move { let mut boojum_mode = false;