From 9045a1fca1d5e3f481367a0d9c64809cc2a12b86 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tuomas=20M=C3=A4kinen?= Date: Thu, 19 Oct 2023 15:14:10 +0300 Subject: [PATCH 1/8] Add some metrics for L1 processing Prints total progress and number of processed blocks. --- src/l1_fetcher.rs | 108 ++++++++++++++++++++++++++++++++++++++++------ 1 file changed, 94 insertions(+), 14 deletions(-) diff --git a/src/l1_fetcher.rs b/src/l1_fetcher.rs index ade1e3d..a683123 100644 --- a/src/l1_fetcher.rs +++ b/src/l1_fetcher.rs @@ -23,6 +23,8 @@ use crate::{ const MAX_RETRIES: u8 = 5; /// The interval in seconds in which to poll for new blocks. const LONG_POLLING_INTERVAL_S: u64 = 120; +/// The interval in seconds in which to print metrics. +const METRICS_PRINT_INTERVAL_S: u64 = 10; #[allow(clippy::enum_variant_names)] #[derive(Error, Debug)] @@ -34,6 +36,46 @@ pub enum L1FetchError { GetTx, } +#[derive(Default)] +struct L1Metrics { + // Metrics variables. + l1_blocks_processed: u64, + l2_blocks_processed: u64, + latest_l1_block_nbr: u64, + latest_l2_block_nbr: u64, + + first_l1_block: Option, + last_l1_block: Option, +} + +impl L1Metrics { + fn print(&mut self) { + if self.first_l1_block.is_none() { + return; + } + + let first_l1_block = self.first_l1_block.unwrap(); + let progress = match self.last_l1_block { + Some(last_l1_block) => { + let total = last_l1_block - first_l1_block; + let cur = self.latest_l1_block_nbr - first_l1_block; + let perc: u64 = cur / total; + format!("{perc}%") + } + None => "∞".to_string(), + }; + + println!( + "PROGRESS: [{}] CUR BLOCK L1: {} L2: {} TOTAL BLOCKS PROCESSED L1: {} L2: {}", + progress, + self.latest_l1_block_nbr, + self.latest_l2_block_nbr, + self.l1_blocks_processed, + self.l2_blocks_processed + ); + } +} + pub struct L1Fetcher { provider: Provider, contract: Contract, @@ -81,6 +123,21 @@ impl L1Fetcher { }; } + let metrics = Arc::new(Mutex::new(L1Metrics { + first_l1_block: Some(current_l1_block_number.as_u64()), + ..Default::default() + })); + + tokio::spawn({ + let metrics = metrics.clone(); + async move { + loop { + tokio::time::sleep(Duration::from_secs(METRICS_PRINT_INTERVAL_S)).await; + metrics.lock().await.print(); + } + } + }); + let event = self.contract.events_by_name("BlockCommit")?[0].clone(); let function = self.contract.functions_by_name("commitBlocks")?[0].clone(); @@ -92,36 +149,55 @@ impl L1Fetcher { // - Referred L1 block fetch. // - Calldata parsing. let tx_handle = tokio::spawn({ + let mut last_block = current_l1_block_number.as_u64().clone(); + let metrics = metrics.clone(); let provider = self.provider.clone(); async move { while let Some(hash) = hash_rx.recv().await { - let data = match L1Fetcher::retry_call( + let tx = match L1Fetcher::retry_call( || provider.get_transaction(hash), L1FetchError::GetTx, ) .await { - Ok(Some(tx)) => tx.input, + Ok(Some(tx)) => tx, _ => continue, }; - calldata_tx.send(data).await.unwrap(); + if let Some(current_block) = tx.block_number { + let current_block = current_block.as_u64(); + if last_block < current_block { + let mut metrics = metrics.lock().await; + metrics.l1_blocks_processed += current_block - last_block; + metrics.latest_l1_block_nbr = current_block; + last_block = current_block; + } + } + + calldata_tx.send(tx.input).await.unwrap(); } } }); - let parse_handle = tokio::spawn(async move { - while let Some(calldata) = calldata_rx.recv().await { - let blocks = match parse_calldata(&function, &calldata) { - Ok(blks) => blks, - Err(e) => { - tracing::error!("failed to parse calldata: {e}"); - continue; - } - }; + let parse_handle = tokio::spawn({ + let metrics = metrics.clone(); + async move { + while let Some(calldata) = calldata_rx.recv().await { + let blocks = match parse_calldata(&function, &calldata) { + Ok(blks) => blks, + Err(e) => { + tracing::error!("failed to parse calldata: {e}"); + continue; + } + }; - for blk in blocks { - sink.send(blk).await.unwrap(); + for blk in blocks { + // NOTE: Let's see if we want to increment this in batches, instead of each block individually. + let mut metrics = metrics.lock().await; + metrics.l2_blocks_processed += 1; + metrics.latest_l2_block_nbr = blk.block_number; + sink.send(blk).await.unwrap(); + } } } }); @@ -145,6 +221,10 @@ impl L1Fetcher { .unwrap(), ); + if disable_polling { + metrics.clone().lock().await.last_l1_block = Some(end_block_number.as_u64()); + } + loop { if disable_polling && current_l1_block_number > end_block_number { tracing::info!("Successfully reached end block. Shutting down..."); From c3ab76374fbea367b035658236e14da347c72e32 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tuomas=20M=C3=A4kinen?= Date: Thu, 19 Oct 2023 16:16:56 +0300 Subject: [PATCH 2/8] Fixes --- src/l1_fetcher.rs | 13 ++++++++++--- src/processor/tree/tree_wrapper.rs | 2 +- 2 files changed, 11 insertions(+), 4 deletions(-) diff --git a/src/l1_fetcher.rs b/src/l1_fetcher.rs index 0a88d4b..9b077d8 100644 --- a/src/l1_fetcher.rs +++ b/src/l1_fetcher.rs @@ -50,7 +50,10 @@ struct L1Metrics { impl L1Metrics { fn print(&mut self) { - if self.first_l1_block.is_none() { + if self.first_l1_block.is_none() + || self.latest_l1_block_nbr == 0 + || self.latest_l2_block_nbr == 0 + { return; } @@ -59,7 +62,7 @@ impl L1Metrics { Some(last_l1_block) => { let total = last_l1_block - first_l1_block; let cur = self.latest_l1_block_nbr - first_l1_block; - let perc: u64 = cur / total; + let perc = (cur * 100) / total; format!("{perc}%") } None => "∞".to_string(), @@ -169,7 +172,6 @@ impl L1Fetcher { if last_block < current_block { let mut metrics = metrics.lock().await; metrics.l1_blocks_processed += current_block - last_block; - metrics.latest_l1_block_nbr = current_block; last_block = current_block; } } @@ -206,6 +208,7 @@ impl L1Fetcher { let main_handle = tokio::spawn({ let provider_clone = self.provider.clone(); let snapshot_clone = self.snapshot.clone(); + let metrics = metrics.clone(); async move { let mut latest_l2_block_number = U256::zero(); @@ -279,6 +282,8 @@ impl L1Fetcher { snapshot.lock().await.latest_l1_block_number = current_l1_block_number; } + metrics.lock().await.latest_l1_block_nbr = current_l1_block_number.as_u64(); + // Increment current block index. current_l1_block_number += BLOCK_STEP.into(); } @@ -296,6 +301,8 @@ impl L1Fetcher { tx_handle.await?; parse_handle.await?; + metrics.lock().await.print(); + Ok(()) } diff --git a/src/processor/tree/tree_wrapper.rs b/src/processor/tree/tree_wrapper.rs index 219277b..26f5b54 100644 --- a/src/processor/tree/tree_wrapper.rs +++ b/src/processor/tree/tree_wrapper.rs @@ -65,7 +65,7 @@ impl TreeWrapper<'static> { hex::encode(root_hash) ); - tracing::info!("Successfully processed block {}", block.block_number); + tracing::debug!("Successfully processed block {}", block.block_number); root_hash } From d58f7b8b63790d0564c6a7037c44759a7acc7a19 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tuomas=20M=C3=A4kinen?= Date: Thu, 19 Oct 2023 16:20:58 +0300 Subject: [PATCH 3/8] Use tracing instead of println --- src/l1_fetcher.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/l1_fetcher.rs b/src/l1_fetcher.rs index 9b077d8..678d6f3 100644 --- a/src/l1_fetcher.rs +++ b/src/l1_fetcher.rs @@ -68,7 +68,7 @@ impl L1Metrics { None => "∞".to_string(), }; - println!( + tracing::info!( "PROGRESS: [{}] CUR BLOCK L1: {} L2: {} TOTAL BLOCKS PROCESSED L1: {} L2: {}", progress, self.latest_l1_block_nbr, From e9c1e8eae9f8934bcfe02074c300f4058f65ef71 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tuomas=20M=C3=A4kinen?= <1947505+tuommaki@users.noreply.github.com> Date: Fri, 20 Oct 2023 08:07:26 +0300 Subject: [PATCH 4/8] Update src/l1_fetcher.rs Co-authored-by: Jonathan <94441036+zeapoz@users.noreply.github.com> --- src/l1_fetcher.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/l1_fetcher.rs b/src/l1_fetcher.rs index 678d6f3..7fefd3a 100644 --- a/src/l1_fetcher.rs +++ b/src/l1_fetcher.rs @@ -152,7 +152,7 @@ impl L1Fetcher { // - Referred L1 block fetch. // - Calldata parsing. let tx_handle = tokio::spawn({ - let mut last_block = current_l1_block_number.as_u64().clone(); + let mut last_block = current_l1_block_number.as_u64(); let metrics = metrics.clone(); let provider = self.provider.clone(); async move { From a85a6f542888f396c7b6502a3f70b2c25669e13c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tuomas=20M=C3=A4kinen?= Date: Fri, 20 Oct 2023 08:10:18 +0300 Subject: [PATCH 5/8] Match pattern differently --- src/l1_fetcher.rs | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/src/l1_fetcher.rs b/src/l1_fetcher.rs index 7fefd3a..a5aa062 100644 --- a/src/l1_fetcher.rs +++ b/src/l1_fetcher.rs @@ -157,14 +157,13 @@ impl L1Fetcher { let provider = self.provider.clone(); async move { while let Some(hash) = hash_rx.recv().await { - let tx = match L1Fetcher::retry_call( + let Ok(Some(tx)) = L1Fetcher::retry_call( || provider.get_transaction(hash), L1FetchError::GetTx, ) .await - { - Ok(Some(tx)) => tx, - _ => continue, + else { + continue; }; if let Some(current_block) = tx.block_number { From 688fea1ebbb317bdb80273dbb5d3bd65ef2e6793 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tuomas=20M=C3=A4kinen?= Date: Fri, 20 Oct 2023 10:55:19 +0300 Subject: [PATCH 6/8] Padding for percentages --- src/l1_fetcher.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/l1_fetcher.rs b/src/l1_fetcher.rs index a5aa062..6a4d674 100644 --- a/src/l1_fetcher.rs +++ b/src/l1_fetcher.rs @@ -63,7 +63,7 @@ impl L1Metrics { let total = last_l1_block - first_l1_block; let cur = self.latest_l1_block_nbr - first_l1_block; let perc = (cur * 100) / total; - format!("{perc}%") + format!("{perc:>2}%") } None => "∞".to_string(), }; From 7401b0b0c29f841e0bfa00cfc4eead2b6d10109f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tuomas=20M=C3=A4kinen?= Date: Fri, 20 Oct 2023 12:20:24 +0300 Subject: [PATCH 7/8] Always display percentages --- src/l1_fetcher.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/l1_fetcher.rs b/src/l1_fetcher.rs index 6a4d674..4690872 100644 --- a/src/l1_fetcher.rs +++ b/src/l1_fetcher.rs @@ -62,7 +62,8 @@ impl L1Metrics { Some(last_l1_block) => { let total = last_l1_block - first_l1_block; let cur = self.latest_l1_block_nbr - first_l1_block; - let perc = (cur * 100) / total; + // If polling past `last_l1_block`, stop at 100%. + let perc = std::cmp::min((cur * 100) / total, 100); format!("{perc:>2}%") } None => "∞".to_string(), @@ -226,9 +227,8 @@ impl L1Fetcher { .unwrap(), ); - if disable_polling { - metrics.clone().lock().await.last_l1_block = Some(end_block_number.as_u64()); - } + // Update last L1 block to metrics calculation. + metrics.clone().lock().await.last_l1_block = Some(end_block_number.as_u64()); loop { // Break when reaching the `end_block` or on the receivement of a `ctrl_c` signal. From 4328bbfdcd9f48a26377c972830760ee0b0e6e28 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tuomas=20M=C3=A4kinen?= Date: Fri, 20 Oct 2023 12:39:16 +0300 Subject: [PATCH 8/8] Drop Option<> from last_l1_block --- src/l1_fetcher.rs | 19 ++++++++----------- 1 file changed, 8 insertions(+), 11 deletions(-) diff --git a/src/l1_fetcher.rs b/src/l1_fetcher.rs index 4690872..3cd2183 100644 --- a/src/l1_fetcher.rs +++ b/src/l1_fetcher.rs @@ -45,7 +45,7 @@ struct L1Metrics { latest_l2_block_nbr: u64, first_l1_block: Option, - last_l1_block: Option, + last_l1_block: u64, } impl L1Metrics { @@ -58,15 +58,12 @@ impl L1Metrics { } let first_l1_block = self.first_l1_block.unwrap(); - let progress = match self.last_l1_block { - Some(last_l1_block) => { - let total = last_l1_block - first_l1_block; - let cur = self.latest_l1_block_nbr - first_l1_block; - // If polling past `last_l1_block`, stop at 100%. - let perc = std::cmp::min((cur * 100) / total, 100); - format!("{perc:>2}%") - } - None => "∞".to_string(), + let progress = { + let total = self.last_l1_block - first_l1_block; + let cur = self.latest_l1_block_nbr - first_l1_block; + // If polling past `last_l1_block`, stop at 100%. + let perc = std::cmp::min((cur * 100) / total, 100); + format!("{perc:>2}%") }; tracing::info!( @@ -228,7 +225,7 @@ impl L1Fetcher { ); // Update last L1 block to metrics calculation. - metrics.clone().lock().await.last_l1_block = Some(end_block_number.as_u64()); + metrics.clone().lock().await.last_l1_block = end_block_number.as_u64(); loop { // Break when reaching the `end_block` or on the receivement of a `ctrl_c` signal.