Skip to content

Commit

Permalink
Add some metrics for L1 processing (#29)
Browse files Browse the repository at this point in the history
Prints total progress and number of processed blocks.

Co-authored-by: Jonathan <[email protected]>
  • Loading branch information
tuommaki and zeapoz authored Oct 20, 2023
1 parent ef16d49 commit 1918eb1
Show file tree
Hide file tree
Showing 2 changed files with 100 additions and 17 deletions.
115 changes: 99 additions & 16 deletions src/l1_fetcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ use crate::{
const MAX_RETRIES: u8 = 5;
/// The interval in seconds in which to poll for new blocks.
const LONG_POLLING_INTERVAL_S: u64 = 120;
/// The interval in seconds in which to print metrics.
const METRICS_PRINT_INTERVAL_S: u64 = 10;

#[allow(clippy::enum_variant_names)]
#[derive(Error, Debug)]
Expand All @@ -34,6 +36,47 @@ pub enum L1FetchError {
GetTx,
}

/// Counters and block-range markers used to report L1 fetch progress.
///
/// An instance is shared between the fetcher tasks behind an
/// `Arc<Mutex<_>>`; `print` renders a one-line progress summary
/// from these values.
#[derive(Default)]
struct L1Metrics {
    // Running totals, updated as blocks are processed.
    /// Total number of L1 blocks processed so far.
    l1_blocks_processed: u64,
    /// Total number of L2 blocks processed so far.
    l2_blocks_processed: u64,
    /// Most recently processed L1 block number.
    latest_l1_block_nbr: u64,
    /// Most recently processed L2 block number.
    latest_l2_block_nbr: u64,

    // Bounds of the fetch range, used for percentage calculation.
    /// First L1 block of the fetch range; `None` until fetching starts.
    first_l1_block: Option<u64>,
    /// Last L1 block of the fetch range (0 until it has been determined).
    last_l1_block: u64,
}

impl L1Metrics {
    /// Logs a one-line progress summary: percentage of the L1 block range
    /// covered, the latest L1/L2 block numbers seen, and the total number
    /// of L1/L2 blocks processed.
    ///
    /// Does nothing until fetching has actually started (i.e. until
    /// `first_l1_block` is set and at least one L1 and L2 block have been
    /// observed).
    fn print(&mut self) {
        // Nothing meaningful to report yet.
        if self.first_l1_block.is_none()
            || self.latest_l1_block_nbr == 0
            || self.latest_l2_block_nbr == 0
        {
            return;
        }

        let first_l1_block = self.first_l1_block.unwrap();
        let progress = {
            // `last_l1_block` is written by a separate task and may still be
            // 0 (or equal to `first_l1_block` for a single-block range) when
            // this runs, so guard against underflow and division by zero.
            let total = self.last_l1_block.saturating_sub(first_l1_block);
            let cur = self.latest_l1_block_nbr.saturating_sub(first_l1_block);
            let perc = if total == 0 {
                100
            } else {
                // If polling past `last_l1_block`, stop at 100%.
                std::cmp::min((cur * 100) / total, 100)
            };
            format!("{perc:>2}%")
        };

        tracing::info!(
            "PROGRESS: [{}] CUR BLOCK L1: {} L2: {} TOTAL BLOCKS PROCESSED L1: {} L2: {}",
            progress,
            self.latest_l1_block_nbr,
            self.latest_l2_block_nbr,
            self.l1_blocks_processed,
            self.l2_blocks_processed
        );
    }
}

pub struct L1Fetcher {
provider: Provider<Http>,
contract: Contract,
Expand Down Expand Up @@ -81,6 +124,21 @@ impl L1Fetcher {
};
}

let metrics = Arc::new(Mutex::new(L1Metrics {
first_l1_block: Some(current_l1_block_number.as_u64()),
..Default::default()
}));

tokio::spawn({
let metrics = metrics.clone();
async move {
loop {
tokio::time::sleep(Duration::from_secs(METRICS_PRINT_INTERVAL_S)).await;
metrics.lock().await.print();
}
}
});

let event = self.contract.events_by_name("BlockCommit")?[0].clone();
let function = self.contract.functions_by_name("commitBlocks")?[0].clone();

Expand All @@ -92,36 +150,53 @@ impl L1Fetcher {
// - Referred L1 block fetch.
// - Calldata parsing.
let tx_handle = tokio::spawn({
let mut last_block = current_l1_block_number.as_u64();
let metrics = metrics.clone();
let provider = self.provider.clone();
async move {
while let Some(hash) = hash_rx.recv().await {
let data = match L1Fetcher::retry_call(
let Ok(Some(tx)) = L1Fetcher::retry_call(
|| provider.get_transaction(hash),
L1FetchError::GetTx,
)
.await
{
Ok(Some(tx)) => tx.input,
_ => continue,
else {
continue;
};

calldata_tx.send(data).await.unwrap();
if let Some(current_block) = tx.block_number {
let current_block = current_block.as_u64();
if last_block < current_block {
let mut metrics = metrics.lock().await;
metrics.l1_blocks_processed += current_block - last_block;
last_block = current_block;
}
}

calldata_tx.send(tx.input).await.unwrap();
}
}
});

let parse_handle = tokio::spawn(async move {
while let Some(calldata) = calldata_rx.recv().await {
let blocks = match parse_calldata(&function, &calldata) {
Ok(blks) => blks,
Err(e) => {
tracing::error!("failed to parse calldata: {e}");
continue;
}
};
let parse_handle = tokio::spawn({
let metrics = metrics.clone();
async move {
while let Some(calldata) = calldata_rx.recv().await {
let blocks = match parse_calldata(&function, &calldata) {
Ok(blks) => blks,
Err(e) => {
tracing::error!("failed to parse calldata: {e}");
continue;
}
};

for blk in blocks {
sink.send(blk).await.unwrap();
for blk in blocks {
// NOTE: Let's see if we want to increment this in batches, instead of each block individually.
let mut metrics = metrics.lock().await;
metrics.l2_blocks_processed += 1;
metrics.latest_l2_block_nbr = blk.block_number;
sink.send(blk).await.unwrap();
}
}
}
});
Expand All @@ -130,6 +205,7 @@ impl L1Fetcher {
let main_handle = tokio::spawn({
let provider_clone = self.provider.clone();
let snapshot_clone = self.snapshot.clone();
let metrics = metrics.clone();
async move {
let mut latest_l2_block_number = U256::zero();

Expand All @@ -148,6 +224,9 @@ impl L1Fetcher {
.unwrap(),
);

// Update last L1 block to metrics calculation.
metrics.clone().lock().await.last_l1_block = end_block_number.as_u64();

loop {
// Break when reaching the `end_block` or on the receivement of a `ctrl_c` signal.
if (disable_polling && current_l1_block_number > end_block_number)
Expand Down Expand Up @@ -199,6 +278,8 @@ impl L1Fetcher {
snapshot.lock().await.latest_l1_block_number = current_l1_block_number;
}

metrics.lock().await.latest_l1_block_nbr = current_l1_block_number.as_u64();

// Increment current block index.
current_l1_block_number += BLOCK_STEP.into();
}
Expand All @@ -216,6 +297,8 @@ impl L1Fetcher {
tx_handle.await?;
parse_handle.await?;

metrics.lock().await.print();

Ok(())
}

Expand Down
2 changes: 1 addition & 1 deletion src/processor/tree/tree_wrapper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ impl TreeWrapper<'static> {
hex::encode(root_hash)
);

tracing::info!("Successfully processed block {}", block.block_number);
tracing::debug!("Successfully processed block {}", block.block_number);

root_hash
}
Expand Down

0 comments on commit 1918eb1

Please sign in to comment.