Skip to content

Commit

Permalink
ref: pass l1_fetcher_options directly to the fetcher (#33)
Browse files Browse the repository at this point in the history
* ref: pass `l1_fetcher_options` directly to the fetcher

* chore: cargo fmt
  • Loading branch information
zeapoz authored Oct 23, 2023
1 parent 83a8956 commit 314028f
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 43 deletions.
28 changes: 17 additions & 11 deletions src/l1_fetcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use tokio::{
};

use crate::{
cli::L1FetcherOptions,
constants::ethereum::{BLOCK_STEP, GENESIS_BLOCK, ZK_SYNC_ADDR},
snapshot::StateSnapshot,
types::{CommitBlockInfoV1, ParseError},
Expand Down Expand Up @@ -80,36 +81,35 @@ impl L1Metrics {
pub struct L1Fetcher {
provider: Provider<Http>,
contract: Contract,
config: L1FetcherOptions,
snapshot: Option<Arc<Mutex<StateSnapshot>>>,
}

impl L1Fetcher {
pub fn new(http_url: &str, snapshot: Option<Arc<Mutex<StateSnapshot>>>) -> Result<Self> {
let provider =
Provider::<Http>::try_from(http_url).expect("could not instantiate HTTP Provider");
pub fn new(
config: L1FetcherOptions,
snapshot: Option<Arc<Mutex<StateSnapshot>>>,
) -> Result<Self> {
let provider = Provider::<Http>::try_from(&config.http_url)
.expect("could not instantiate HTTP Provider");

let abi_file = std::fs::File::open("./IZkSync.json")?;
let contract = Contract::load(abi_file)?;

Ok(L1Fetcher {
provider,
contract,
config,
snapshot,
})
}

#[allow(clippy::too_many_lines)]
pub async fn fetch(
&self,
sink: mpsc::Sender<CommitBlockInfoV1>,
start_block: Option<U64>,
end_block: Option<U64>,
mut disable_polling: bool,
) -> Result<()> {
pub async fn run(&self, sink: mpsc::Sender<CommitBlockInfoV1>) -> Result<()> {
// Start fetching from the `GENESIS_BLOCK` unless the `start_block` argument is supplied,
// in which case, start from that instead. If no argument was supplied and a state snapshot
// exists, start from the block number specified in that snapshot.
let mut current_l1_block_number = start_block.unwrap_or(GENESIS_BLOCK.into());
let mut current_l1_block_number = U64::from(self.config.start_block);
// User might have supplied their own start block, in that case we shouldn't enforce the
// use of the snapshot value.
if current_l1_block_number == GENESIS_BLOCK.into() {
Expand Down Expand Up @@ -206,6 +206,12 @@ impl L1Fetcher {
let provider_clone = self.provider.clone();
let snapshot_clone = self.snapshot.clone();
let metrics = metrics.clone();
let mut disable_polling = self.config.disable_polling;
let end_block = self
.config
.block_count
.map(|count| U64::from(self.config.start_block + count));

async move {
let mut latest_l2_block_number = U256::zero();

Expand Down
39 changes: 7 additions & 32 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,8 @@ use std::{
};

use clap::Parser;
use cli::{Cli, Command, L1FetcherOptions, Query, ReconstructSource};
use cli::{Cli, Command, Query, ReconstructSource};
use constants::storage;
use ethers::types::U64;
use eyre::Result;
use snapshot::StateSnapshot;
use tokio::sync::{mpsc, Mutex};
Expand Down Expand Up @@ -70,31 +69,18 @@ async fn main() -> Result<()> {
};

match source {
ReconstructSource::L1 {
l1_fetcher_options:
L1FetcherOptions {
http_url,
start_block,
block_step: _,
block_count,
disable_polling,
},
} => {
ReconstructSource::L1 { l1_fetcher_options } => {
let snapshot = Arc::new(Mutex::new(StateSnapshot::default()));

let fetcher = L1Fetcher::new(&http_url, Some(snapshot.clone()))?;
let fetcher = L1Fetcher::new(l1_fetcher_options, Some(snapshot.clone()))?;
let processor = TreeProcessor::new(db_path, snapshot.clone()).await?;
let (tx, rx) = mpsc::channel::<CommitBlockInfoV1>(5);

let processor_handle = tokio::spawn(async move {
processor.run(rx).await;
});

let end_block = block_count.map(|n| U64([start_block + n]));

fetcher
.fetch(tx, Some(U64([start_block])), end_block, disable_polling)
.await?;
fetcher.run(tx).await?;
processor_handle.await?;
}
ReconstructSource::File { file } => {
Expand All @@ -120,29 +106,18 @@ async fn main() -> Result<()> {
}
}
Command::Download {
l1_fetcher_options:
L1FetcherOptions {
http_url,
start_block,
block_step: _,
block_count,
disable_polling,
},
l1_fetcher_options,
file,
} => {
let fetcher = L1Fetcher::new(&http_url, None)?;
let fetcher = L1Fetcher::new(l1_fetcher_options, None)?;
let processor = JsonSerializationProcessor::new(Path::new(&file))?;
let (tx, rx) = mpsc::channel::<CommitBlockInfoV1>(5);

let processor_handle = tokio::spawn(async move {
processor.run(rx).await;
});

let end_block = block_count.map(|n| U64([start_block + n]));

fetcher
.fetch(tx, Some(U64([start_block])), end_block, disable_polling)
.await?;
fetcher.run(tx).await?;
processor_handle.await?;
}
Command::Query {
Expand Down

0 comments on commit 314028f

Please sign in to comment.