Skip to content

Commit

Permalink
feat: ctrl-c handling (#28)
Browse files Browse the repository at this point in the history
  • Loading branch information
zeapoz authored Oct 19, 2023
1 parent 5b74094 commit 20c70f1
Show file tree
Hide file tree
Showing 4 changed files with 88 additions and 63 deletions.
10 changes: 10 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ serde = { version = "1.0.188", features = ["derive"] }
serde_json = { version = "1.0.107", features = ["std"] }
serde_json_any_key = "2.0.0"
thiserror = "1.0"
tokio = { version = "1.32.0", features = ["macros"] }
tokio = { version = "1.32.0", features = ["macros", "signal"] }
tracing = "0.1.37"
tracing-subscriber = "0.3.17"
zksync_merkle_tree = { git = "https://github.com/matter-labs/zksync-era.git" }
136 changes: 76 additions & 60 deletions src/l1_fetcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use eyre::Result;
use rand::random;
use thiserror::Error;
use tokio::{
sync::{mpsc, Mutex},
sync::{mpsc, oneshot, Mutex},
time::{sleep, Duration},
};

Expand Down Expand Up @@ -126,77 +126,93 @@ impl L1Fetcher {
}
});

{
// NOTE: The channel should close once it goes out of scope we move it here.
let hash_tx = hash_tx;
let mut latest_l2_block_number = U256::zero();

// If an `end_block` was supplied we shouldn't poll for newer blocks.
if end_block.is_some() {
disable_polling = true;
}
let (shutdown_tx, mut shutdown_rx) = oneshot::channel();
let main_handle = tokio::spawn({
let provider_clone = self.provider.clone();
let snapshot_clone = self.snapshot.clone();
async move {
let mut latest_l2_block_number = U256::zero();

let end_block_number = end_block.unwrap_or(
self.provider
.get_block(BlockNumber::Latest)
.await?
.unwrap()
.number
.unwrap(),
);

loop {
if disable_polling && current_l1_block_number > end_block_number {
tracing::info!("Successfully reached end block. Shutting down...");
break;
// If an `end_block` was supplied we shouldn't poll for newer blocks.
if end_block.is_some() {
disable_polling = true;
}

// Create a filter showing only `BlockCommit`s from the [`ZK_SYNC_ADDR`].
// TODO: Filter by executed blocks too.
let filter = Filter::new()
.address(ZK_SYNC_ADDR.parse::<Address>()?)
.topic0(event.signature())
.from_block(current_l1_block_number)
.to_block(current_l1_block_number + BLOCK_STEP);

// Grab all relevant logs.
if let Ok(logs) =
L1Fetcher::retry_call(|| self.provider.get_logs(&filter), L1FetchError::GetLogs)
let end_block_number = end_block.unwrap_or(
provider_clone
.get_block(BlockNumber::Latest)
.await
{
for log in logs {
// log.topics:
// topics[1]: L2 block number.
// topics[2]: L2 block hash.
// topics[3]: L2 commitment.

let new_l2_block_number =
U256::from_big_endian(log.topics[1].as_fixed_bytes());
if new_l2_block_number <= latest_l2_block_number {
continue;
}
.unwrap()
.unwrap()
.number
.unwrap(),
);

loop {
// Break when reaching the `end_block` or on the receivement of a `ctrl_c` signal.
if (disable_polling && current_l1_block_number > end_block_number)
|| shutdown_rx.try_recv().is_ok()
{
break;
}

if let Some(tx_hash) = log.transaction_hash {
hash_tx.send(tx_hash).await?;
// Create a filter showing only `BlockCommit`s from the [`ZK_SYNC_ADDR`].
// TODO: Filter by executed blocks too.
let filter = Filter::new()
.address(ZK_SYNC_ADDR.parse::<Address>().unwrap())
.topic0(event.signature())
.from_block(current_l1_block_number)
.to_block(current_l1_block_number + BLOCK_STEP);

// Grab all relevant logs.
if let Ok(logs) = L1Fetcher::retry_call(
|| provider_clone.get_logs(&filter),
L1FetchError::GetLogs,
)
.await
{
for log in logs {
// log.topics:
// topics[1]: L2 block number.
// topics[2]: L2 block hash.
// topics[3]: L2 commitment.

let new_l2_block_number =
U256::from_big_endian(log.topics[1].as_fixed_bytes());
if new_l2_block_number <= latest_l2_block_number {
continue;
}

if let Some(tx_hash) = log.transaction_hash {
hash_tx.send(tx_hash).await.unwrap();
}

latest_l2_block_number = new_l2_block_number;
}
} else {
tokio::time::sleep(Duration::from_secs(LONG_POLLING_INTERVAL_S)).await;
continue;
};

latest_l2_block_number = new_l2_block_number;
// Store our current L1 block number so we can resume if the process exits.
if let Some(snapshot) = &snapshot_clone {
snapshot.lock().await.latest_l1_block_number = current_l1_block_number;
}
} else {
tokio::time::sleep(Duration::from_secs(LONG_POLLING_INTERVAL_S)).await;
continue;
};

// Store our current L1 block number so we can resume if the process exits.
if let Some(snapshot) = &self.snapshot {
snapshot.lock().await.latest_l1_block_number = current_l1_block_number;
// Increment current block index.
current_l1_block_number += BLOCK_STEP.into();
}

// Increment current block index.
current_l1_block_number += BLOCK_STEP.into();
}
}
});

// Wait for shutdown signal in background.
tokio::spawn(async move {
let _ = tokio::signal::ctrl_c().await;
tracing::info!("Shutdown signal received, finishing up and shutting down...");
let _ = shutdown_tx.send("");
});

main_handle.await?;
tx_handle.await?;
parse_handle.await?;

Expand Down
3 changes: 1 addition & 2 deletions src/processor/tree/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ pub struct TreeProcessor<'a> {

impl TreeProcessor<'static> {
pub async fn new(db_path: PathBuf, snapshot: Arc<Mutex<StateSnapshot>>) -> Result<Self> {
// TODO: Implement graceful shutdown.
// If database directory already exists, we try to restore the latest state.
// The state contains the last processed block and a mapping of index to key
// values, if a state file does not exist, we simply use the defaults instead.
Expand Down Expand Up @@ -60,7 +59,7 @@ impl Processor for TreeProcessor<'static> {
let mut snapshot = self.snapshot.lock().await;
// Check if we've already processed this block.
if snapshot.latest_l2_block_number >= block.block_number {
tracing::info!(
tracing::debug!(
"Block {} has already been processed, skipping.",
block.block_number
);
Expand Down

0 comments on commit 20c70f1

Please sign in to comment.