diff --git a/src/l1_fetcher.rs b/src/l1_fetcher.rs index 69edc2f..3cd2183 100644 --- a/src/l1_fetcher.rs +++ b/src/l1_fetcher.rs @@ -23,6 +23,8 @@ use crate::{ const MAX_RETRIES: u8 = 5; /// The interval in seconds in which to poll for new blocks. const LONG_POLLING_INTERVAL_S: u64 = 120; +/// The interval in seconds in which to print metrics. +const METRICS_PRINT_INTERVAL_S: u64 = 10; #[allow(clippy::enum_variant_names)] #[derive(Error, Debug)] @@ -34,6 +36,47 @@ pub enum L1FetchError { GetTx, } +#[derive(Default)] +struct L1Metrics { + // Metrics variables. + l1_blocks_processed: u64, + l2_blocks_processed: u64, + latest_l1_block_nbr: u64, + latest_l2_block_nbr: u64, + + first_l1_block: Option, + last_l1_block: u64, +} + +impl L1Metrics { + fn print(&mut self) { + if self.first_l1_block.is_none() + || self.latest_l1_block_nbr == 0 + || self.latest_l2_block_nbr == 0 + { + return; + } + + let first_l1_block = self.first_l1_block.unwrap(); + let progress = { + let total = self.last_l1_block - first_l1_block; + let cur = self.latest_l1_block_nbr - first_l1_block; + // If polling past `last_l1_block`, stop at 100%. + let perc = std::cmp::min((cur * 100) / total, 100); + format!("{perc:>2}%") + }; + + tracing::info!( + "PROGRESS: [{}] CUR BLOCK L1: {} L2: {} TOTAL BLOCKS PROCESSED L1: {} L2: {}", + progress, + self.latest_l1_block_nbr, + self.latest_l2_block_nbr, + self.l1_blocks_processed, + self.l2_blocks_processed + ); + } +} + pub struct L1Fetcher { provider: Provider, contract: Contract, @@ -81,6 +124,21 @@ impl L1Fetcher { }; } + let metrics = Arc::new(Mutex::new(L1Metrics { + first_l1_block: Some(current_l1_block_number.as_u64()), + ..Default::default() + })); + + tokio::spawn({ + let metrics = metrics.clone(); + async move { + loop { + tokio::time::sleep(Duration::from_secs(METRICS_PRINT_INTERVAL_S)).await; + metrics.lock().await.print(); + } + } + }); + let event = self.contract.events_by_name("BlockCommit")?[0].clone(); let function = self.contract.functions_by_name("commitBlocks")?[0].clone(); @@ -92,36 +150,53 @@ impl L1Fetcher { // - Referred L1 block fetch. // - Calldata parsing. let tx_handle = tokio::spawn({ + let mut last_block = current_l1_block_number.as_u64(); + let metrics = metrics.clone(); let provider = self.provider.clone(); async move { while let Some(hash) = hash_rx.recv().await { - let data = match L1Fetcher::retry_call( + let Ok(Some(tx)) = L1Fetcher::retry_call( || provider.get_transaction(hash), L1FetchError::GetTx, ) .await - { - Ok(Some(tx)) => tx.input, - _ => continue, + else { + continue; }; - calldata_tx.send(data).await.unwrap(); + if let Some(current_block) = tx.block_number { + let current_block = current_block.as_u64(); + if last_block < current_block { + let mut metrics = metrics.lock().await; + metrics.l1_blocks_processed += current_block - last_block; + last_block = current_block; + } + } + + calldata_tx.send(tx.input).await.unwrap(); } } }); - let parse_handle = tokio::spawn(async move { - while let Some(calldata) = calldata_rx.recv().await { - let blocks = match parse_calldata(&function, &calldata) { - Ok(blks) => blks, - Err(e) => { - tracing::error!("failed to parse calldata: {e}"); - continue; - } - }; + let parse_handle = tokio::spawn({ + let metrics = metrics.clone(); + async move { + while let Some(calldata) = calldata_rx.recv().await { + let blocks = match parse_calldata(&function, &calldata) { + Ok(blks) => blks, + Err(e) => { + tracing::error!("failed to parse calldata: {e}"); + continue; + } + }; - for blk in blocks { - sink.send(blk).await.unwrap(); + for blk in blocks { + // NOTE: Let's see if we want to increment this in batches, instead of each block individually. + let mut metrics = metrics.lock().await; + metrics.l2_blocks_processed += 1; + metrics.latest_l2_block_nbr = blk.block_number; + sink.send(blk).await.unwrap(); + } } } }); @@ -130,6 +205,7 @@ impl L1Fetcher { let main_handle = tokio::spawn({ let provider_clone = self.provider.clone(); let snapshot_clone = self.snapshot.clone(); + let metrics = metrics.clone(); async move { let mut latest_l2_block_number = U256::zero(); @@ -148,6 +224,9 @@ impl L1Fetcher { .unwrap(), ); + // Update last L1 block to metrics calculation. + metrics.clone().lock().await.last_l1_block = end_block_number.as_u64(); + loop { // Break when reaching the `end_block` or on the receivement of a `ctrl_c` signal. if (disable_polling && current_l1_block_number > end_block_number) @@ -199,6 +278,8 @@ impl L1Fetcher { snapshot.lock().await.latest_l1_block_number = current_l1_block_number; } + metrics.lock().await.latest_l1_block_nbr = current_l1_block_number.as_u64(); + // Increment current block index. current_l1_block_number += BLOCK_STEP.into(); } @@ -216,6 +297,8 @@ impl L1Fetcher { tx_handle.await?; parse_handle.await?; + metrics.lock().await.print(); + Ok(()) } diff --git a/src/processor/tree/tree_wrapper.rs b/src/processor/tree/tree_wrapper.rs index 219277b..26f5b54 100644 --- a/src/processor/tree/tree_wrapper.rs +++ b/src/processor/tree/tree_wrapper.rs @@ -65,7 +65,7 @@ impl TreeWrapper<'static> { hex::encode(root_hash) ); - tracing::info!("Successfully processed block {}", block.block_number); + tracing::debug!("Successfully processed block {}", block.block_number); root_hash }