Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add some metrics for L1 processing #29

Merged
merged 10 commits into from
Oct 20, 2023
Merged
115 changes: 99 additions & 16 deletions src/l1_fetcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ use crate::{
const MAX_RETRIES: u8 = 5;
/// The interval in seconds in which to poll for new blocks.
const LONG_POLLING_INTERVAL_S: u64 = 120;
/// The interval in seconds in which to print metrics.
const METRICS_PRINT_INTERVAL_S: u64 = 10;

#[allow(clippy::enum_variant_names)]
#[derive(Error, Debug)]
Expand All @@ -34,6 +36,47 @@ pub enum L1FetchError {
GetTx,
}

#[derive(Default)]
struct L1Metrics {
// Metrics variables.
l1_blocks_processed: u64,
l2_blocks_processed: u64,
latest_l1_block_nbr: u64,
latest_l2_block_nbr: u64,

first_l1_block: Option<u64>,
last_l1_block: u64,
}

impl L1Metrics {
fn print(&mut self) {
if self.first_l1_block.is_none()
|| self.latest_l1_block_nbr == 0
|| self.latest_l2_block_nbr == 0
{
return;
}

let first_l1_block = self.first_l1_block.unwrap();
let progress = {
let total = self.last_l1_block - first_l1_block;
let cur = self.latest_l1_block_nbr - first_l1_block;
// If polling past `last_l1_block`, stop at 100%.
let perc = std::cmp::min((cur * 100) / total, 100);
format!("{perc:>2}%")
};

tracing::info!(
"PROGRESS: [{}] CUR BLOCK L1: {} L2: {} TOTAL BLOCKS PROCESSED L1: {} L2: {}",
tuommaki marked this conversation as resolved.
Show resolved Hide resolved
progress,
self.latest_l1_block_nbr,
self.latest_l2_block_nbr,
self.l1_blocks_processed,
self.l2_blocks_processed
);
}
}

pub struct L1Fetcher {
provider: Provider<Http>,
contract: Contract,
Expand Down Expand Up @@ -81,6 +124,21 @@ impl L1Fetcher {
};
}

let metrics = Arc::new(Mutex::new(L1Metrics {
first_l1_block: Some(current_l1_block_number.as_u64()),
..Default::default()
}));

tokio::spawn({
let metrics = metrics.clone();
async move {
loop {
tokio::time::sleep(Duration::from_secs(METRICS_PRINT_INTERVAL_S)).await;
metrics.lock().await.print();
}
}
});

let event = self.contract.events_by_name("BlockCommit")?[0].clone();
let function = self.contract.functions_by_name("commitBlocks")?[0].clone();

Expand All @@ -92,36 +150,53 @@ impl L1Fetcher {
// - Referred L1 block fetch.
// - Calldata parsing.
let tx_handle = tokio::spawn({
let mut last_block = current_l1_block_number.as_u64();
let metrics = metrics.clone();
let provider = self.provider.clone();
async move {
while let Some(hash) = hash_rx.recv().await {
let data = match L1Fetcher::retry_call(
let Ok(Some(tx)) = L1Fetcher::retry_call(
|| provider.get_transaction(hash),
L1FetchError::GetTx,
)
.await
{
Ok(Some(tx)) => tx.input,
_ => continue,
else {
continue;
};

calldata_tx.send(data).await.unwrap();
if let Some(current_block) = tx.block_number {
let current_block = current_block.as_u64();
if last_block < current_block {
let mut metrics = metrics.lock().await;
metrics.l1_blocks_processed += current_block - last_block;
last_block = current_block;
}
}

calldata_tx.send(tx.input).await.unwrap();
}
}
});

let parse_handle = tokio::spawn(async move {
while let Some(calldata) = calldata_rx.recv().await {
let blocks = match parse_calldata(&function, &calldata) {
Ok(blks) => blks,
Err(e) => {
tracing::error!("failed to parse calldata: {e}");
continue;
}
};
let parse_handle = tokio::spawn({
let metrics = metrics.clone();
async move {
while let Some(calldata) = calldata_rx.recv().await {
let blocks = match parse_calldata(&function, &calldata) {
Ok(blks) => blks,
Err(e) => {
tracing::error!("failed to parse calldata: {e}");
continue;
}
};

for blk in blocks {
sink.send(blk).await.unwrap();
for blk in blocks {
// NOTE: Let's see if we want to increment this in batches, instead of each block individually.
let mut metrics = metrics.lock().await;
metrics.l2_blocks_processed += 1;
metrics.latest_l2_block_nbr = blk.block_number;
sink.send(blk).await.unwrap();
}
}
}
});
Expand All @@ -130,6 +205,7 @@ impl L1Fetcher {
let main_handle = tokio::spawn({
let provider_clone = self.provider.clone();
let snapshot_clone = self.snapshot.clone();
let metrics = metrics.clone();
async move {
let mut latest_l2_block_number = U256::zero();

Expand All @@ -148,6 +224,9 @@ impl L1Fetcher {
.unwrap(),
);

// Update last L1 block to metrics calculation.
metrics.clone().lock().await.last_l1_block = end_block_number.as_u64();

loop {
// Break when reaching the `end_block` or on the receivement of a `ctrl_c` signal.
if (disable_polling && current_l1_block_number > end_block_number)
Expand Down Expand Up @@ -199,6 +278,8 @@ impl L1Fetcher {
snapshot.lock().await.latest_l1_block_number = current_l1_block_number;
}

metrics.lock().await.latest_l1_block_nbr = current_l1_block_number.as_u64();

// Increment current block index.
current_l1_block_number += BLOCK_STEP.into();
}
Expand All @@ -216,6 +297,8 @@ impl L1Fetcher {
tx_handle.await?;
parse_handle.await?;

metrics.lock().await.print();

Ok(())
}

Expand Down
2 changes: 1 addition & 1 deletion src/processor/tree/tree_wrapper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ impl TreeWrapper<'static> {
hex::encode(root_hash)
);

tracing::info!("Successfully processed block {}", block.block_number);
tracing::debug!("Successfully processed block {}", block.block_number);

root_hash
}
Expand Down