Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ref: pass l1_fetcher_options directly to the fetcher #33

Merged
merged 2 commits into from
Oct 23, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 17 additions & 11 deletions src/l1_fetcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use tokio::{
};

use crate::{
cli::L1FetcherOptions,
constants::ethereum::{BLOCK_STEP, GENESIS_BLOCK, ZK_SYNC_ADDR},
snapshot::StateSnapshot,
types::{CommitBlockInfoV1, ParseError},
Expand Down Expand Up @@ -80,36 +81,35 @@ impl L1Metrics {
pub struct L1Fetcher {
provider: Provider<Http>,
contract: Contract,
config: L1FetcherOptions,
snapshot: Option<Arc<Mutex<StateSnapshot>>>,
}

impl L1Fetcher {
pub fn new(http_url: &str, snapshot: Option<Arc<Mutex<StateSnapshot>>>) -> Result<Self> {
let provider =
Provider::<Http>::try_from(http_url).expect("could not instantiate HTTP Provider");
pub fn new(
config: L1FetcherOptions,
snapshot: Option<Arc<Mutex<StateSnapshot>>>,
) -> Result<Self> {
let provider = Provider::<Http>::try_from(&config.http_url)
.expect("could not instantiate HTTP Provider");

let abi_file = std::fs::File::open("./IZkSync.json")?;
let contract = Contract::load(abi_file)?;

Ok(L1Fetcher {
provider,
contract,
config,
snapshot,
})
}

#[allow(clippy::too_many_lines)]
pub async fn fetch(
&self,
sink: mpsc::Sender<CommitBlockInfoV1>,
start_block: Option<U64>,
end_block: Option<U64>,
mut disable_polling: bool,
) -> Result<()> {
pub async fn run(&self, sink: mpsc::Sender<CommitBlockInfoV1>) -> Result<()> {
// Start fetching from the `GENESIS_BLOCK` unless the `start_block` argument is supplied,
// in which case, start from that instead. If no argument was supplied and a state snapshot
// exists, start from the block number specified in that snapshot.
let mut current_l1_block_number = start_block.unwrap_or(GENESIS_BLOCK.into());
let mut current_l1_block_number = U64::from(self.config.start_block);
// User might have supplied their own start block, in that case we shouldn't enforce the
// use of the snapshot value.
if current_l1_block_number == GENESIS_BLOCK.into() {
Expand Down Expand Up @@ -206,6 +206,12 @@ impl L1Fetcher {
let provider_clone = self.provider.clone();
let snapshot_clone = self.snapshot.clone();
let metrics = metrics.clone();
let mut disable_polling = self.config.disable_polling;
let end_block = self
.config
.block_count
.map(|count| U64::from(self.config.start_block + count));

async move {
let mut latest_l2_block_number = U256::zero();

Expand Down
39 changes: 7 additions & 32 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,8 @@ use std::{
};

use clap::Parser;
use cli::{Cli, Command, L1FetcherOptions, Query, ReconstructSource};
use cli::{Cli, Command, Query, ReconstructSource};
use constants::storage;
use ethers::types::U64;
use eyre::Result;
use snapshot::StateSnapshot;
use tokio::sync::{mpsc, Mutex};
Expand Down Expand Up @@ -70,31 +69,18 @@ async fn main() -> Result<()> {
};

match source {
ReconstructSource::L1 {
l1_fetcher_options:
L1FetcherOptions {
http_url,
start_block,
block_step: _,
block_count,
disable_polling,
},
} => {
ReconstructSource::L1 { l1_fetcher_options } => {
let snapshot = Arc::new(Mutex::new(StateSnapshot::default()));

let fetcher = L1Fetcher::new(&http_url, Some(snapshot.clone()))?;
let fetcher = L1Fetcher::new(l1_fetcher_options, Some(snapshot.clone()))?;
let processor = TreeProcessor::new(db_path, snapshot.clone()).await?;
let (tx, rx) = mpsc::channel::<CommitBlockInfoV1>(5);

let processor_handle = tokio::spawn(async move {
processor.run(rx).await;
});

let end_block = block_count.map(|n| U64([start_block + n]));

fetcher
.fetch(tx, Some(U64([start_block])), end_block, disable_polling)
.await?;
fetcher.run(tx).await?;
processor_handle.await?;
}
ReconstructSource::File { file } => {
Expand All @@ -120,29 +106,18 @@ async fn main() -> Result<()> {
}
}
Command::Download {
l1_fetcher_options:
L1FetcherOptions {
http_url,
start_block,
block_step: _,
block_count,
disable_polling,
},
l1_fetcher_options,
file,
} => {
let fetcher = L1Fetcher::new(&http_url, None)?;
let fetcher = L1Fetcher::new(l1_fetcher_options, None)?;
let processor = JsonSerializationProcessor::new(Path::new(&file))?;
let (tx, rx) = mpsc::channel::<CommitBlockInfoV1>(5);

let processor_handle = tokio::spawn(async move {
processor.run(rx).await;
});

let end_block = block_count.map(|n| U64([start_block + n]));

fetcher
.fetch(tx, Some(U64([start_block])), end_block, disable_polling)
.await?;
fetcher.run(tx).await?;
processor_handle.await?;
}
Command::Query {
Expand Down