Skip to content

Commit

Permalink
downloading & parsing blobs (#75)
Browse files Browse the repository at this point in the history
Co-authored-by: Jonathan <[email protected]>
  • Loading branch information
vbar and zeapoz authored Mar 21, 2024
1 parent f55dc80 commit 424cdfd
Show file tree
Hide file tree
Showing 13 changed files with 329 additions and 192 deletions.
30 changes: 16 additions & 14 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

21 changes: 20 additions & 1 deletion src/cli.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
use clap::{Args, Parser, Subcommand, ValueEnum};
use state_reconstruct_fetcher::constants::ethereum;
use state_reconstruct_fetcher::{
constants::ethereum, l1_fetcher::L1FetcherOptions as FetcherOptions,
};

use crate::processor::snapshot;

Expand All @@ -8,6 +10,9 @@ pub struct L1FetcherOptions {
/// The Ethereum JSON-RPC HTTP URL to use.
#[arg(long)]
pub http_url: String,
/// The Ethereum blob storage URL base.
#[arg(long, default_value_t = ethereum::BLOBS_URL.to_string())]
pub blobs_url: String,
/// Ethereum block number to start state import from.
#[arg(short, long, default_value_t = ethereum::GENESIS_BLOCK)]
pub start_block: u64,
Expand All @@ -22,6 +27,20 @@ pub struct L1FetcherOptions {
pub disable_polling: bool,
}

/// Allow conversion into `l1_fetcher::L1FetcherOptions`, for use at lower level.
impl From<L1FetcherOptions> for FetcherOptions {
fn from(opt: L1FetcherOptions) -> Self {
FetcherOptions {
http_url: opt.http_url,
blobs_url: opt.blobs_url,
start_block: opt.start_block,
block_step: opt.block_step,
block_count: opt.block_count,
disable_polling: opt.disable_polling,
}
}
}

#[derive(Subcommand)]
pub enum ReconstructSource {
/// Fetch data from L1.
Expand Down
33 changes: 4 additions & 29 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,7 @@ use clap::Parser;
use cli::{Cli, Command, ReconstructSource};
use eyre::Result;
use processor::snapshot::{SnapshotBuilder, SnapshotExporter};
use state_reconstruct_fetcher::{
constants::storage,
l1_fetcher::{L1Fetcher, L1FetcherOptions},
types::CommitBlock,
};
use state_reconstruct_fetcher::{constants::storage, l1_fetcher::L1Fetcher, types::CommitBlock};
use tikv_jemallocator::Jemalloc;
use tokio::sync::mpsc;
use tracing_subscriber::{filter::LevelFilter, EnvFilter};
Expand Down Expand Up @@ -71,14 +67,7 @@ async fn main() -> Result<()> {

match source {
ReconstructSource::L1 { l1_fetcher_options } => {
let fetcher_options = L1FetcherOptions {
http_url: l1_fetcher_options.http_url,
start_block: l1_fetcher_options.start_block,
block_step: l1_fetcher_options.block_step,
block_count: l1_fetcher_options.block_count,
disable_polling: l1_fetcher_options.disable_polling,
};

let fetcher_options = l1_fetcher_options.into();
let processor = TreeProcessor::new(db_path.clone()).await?;
let fetcher = L1Fetcher::new(fetcher_options, Some(processor.get_snapshot()))?;
let (tx, rx) = mpsc::channel::<CommitBlock>(5);
Expand Down Expand Up @@ -114,14 +103,7 @@ async fn main() -> Result<()> {
l1_fetcher_options,
file,
} => {
let fetcher_options = L1FetcherOptions {
http_url: l1_fetcher_options.http_url,
start_block: l1_fetcher_options.start_block,
block_step: l1_fetcher_options.block_step,
block_count: l1_fetcher_options.block_count,
disable_polling: l1_fetcher_options.disable_polling,
};

let fetcher_options = l1_fetcher_options.into();
let fetcher = L1Fetcher::new(fetcher_options, None)?;
let processor = JsonSerializationProcessor::new(Path::new(&file))?;
let (tx, rx) = mpsc::channel::<CommitBlock>(5);
Expand Down Expand Up @@ -158,14 +140,7 @@ async fn main() -> Result<()> {
l1_fetcher_options,
db_path,
} => {
let fetcher_options = L1FetcherOptions {
http_url: l1_fetcher_options.http_url,
start_block: l1_fetcher_options.start_block,
block_step: l1_fetcher_options.block_step,
block_count: l1_fetcher_options.block_count,
disable_polling: l1_fetcher_options.disable_polling,
};

let fetcher_options = l1_fetcher_options.into();
let fetcher = L1Fetcher::new(fetcher_options, None)?;
let processor = SnapshotBuilder::new(db_path);

Expand Down
2 changes: 2 additions & 0 deletions state-reconstruct-fetcher/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ ethers = "1.0.2"
eyre = "0.6.8"
indexmap = { version = "2.0.2", features = ["serde"] }
rand = "0.8.5"
reqwest = "0.11.24"
serde = { version = "1.0.189", features = ["derive"] }
serde_json = { version = "1.0.107", features = ["std"] }
serde_json_any_key = "2.0.0"
Expand All @@ -20,3 +21,4 @@ tracing = "0.1.40"
rocksdb = "0.21.0"
hex = "0.4.3"
chrono = "0.4.31"
zkevm_circuits = { git = "https://github.com/matter-labs/era-zkevm_circuits", branch = "v1.4.1" }
90 changes: 90 additions & 0 deletions state-reconstruct-fetcher/src/blob_http_client.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
use serde::Deserialize;
use tokio::time::{sleep, Duration};

use crate::types::ParseError;

/// `MAX_RETRIES` is the maximum number of retries on failed blob retrieval.
const MAX_RETRIES: u8 = 5;
/// The interval in seconds to wait before retrying to fetch a blob.
const FAILED_FETCH_RETRY_INTERVAL_S: u64 = 10;

/// Shape of the JSON document returned by the blob-storage endpoint; only the
/// `data` field (a hex string, possibly "0x"-prefixed) is consumed.
#[derive(Deserialize)]
struct JsonResponse {
    data: String,
}

/// HTTP client for fetching blobs from a blob-storage API, keyed by KZG commitment.
pub struct BlobHttpClient {
    // Reusable connection-pooling client; configured in `new` to accept JSON.
    client: reqwest::Client,
    // URL prefix to which the "0x"-prefixed hex commitment is appended.
    url_base: String,
}

impl BlobHttpClient {
pub fn new(blob_url: String) -> eyre::Result<Self> {
let mut headers = reqwest::header::HeaderMap::new();
headers.insert(
"Accept",
reqwest::header::HeaderValue::from_static("application/json"),
);
let client = reqwest::Client::builder()
.default_headers(headers)
.build()?;
Ok(Self {
client,
url_base: blob_url,
})
}

pub async fn get_blob(&self, kzg_commitment: &[u8]) -> Result<Vec<u8>, ParseError> {
let url = self.format_url(kzg_commitment);
for attempt in 1..=MAX_RETRIES {
match self.retrieve_url(&url).await {
Ok(response) => match response.text().await {
Ok(text) => match get_blob_data(&text) {
Ok(data) => {
let plain = if let Some(p) = data.strip_prefix("0x") {
p
} else {
&data
};
return hex::decode(plain).map_err(|e| {
ParseError::BlobFormatError(plain.to_string(), e.to_string())
});
}
Err(e) => {
tracing::error!("failed parsing response of {url}");
return Err(e);
}
},
Err(e) => {
tracing::error!("attempt {}: {} failed: {:?}", attempt, url, e);
sleep(Duration::from_secs(FAILED_FETCH_RETRY_INTERVAL_S)).await;
}
},
Err(e) => {
tracing::error!("attempt {}: GET {} failed: {:?}", attempt, url, e);
sleep(Duration::from_secs(FAILED_FETCH_RETRY_INTERVAL_S)).await;
}
}
}
Err(ParseError::BlobStorageError(url))
}

fn format_url(&self, kzg_commitment: &[u8]) -> String {
format!("{}0x{}", self.url_base, hex::encode(kzg_commitment))
}

async fn retrieve_url(&self, url: &str) -> eyre::Result<reqwest::Response> {
let result = self.client.get(url).send().await?;
Ok(result)
}
}

/// Extract the `data` field from a blob-storage JSON response.
///
/// Any deserialization failure is reported as a `BlobFormatError` carrying the
/// offending payload together with the serde error message.
fn get_blob_data(json_str: &str) -> Result<String, ParseError> {
    serde_json::from_str::<JsonResponse>(json_str)
        .map(|response| response.data)
        .map_err(|e| ParseError::BlobFormatError(json_str.to_string(), e.to_string()))
}
7 changes: 7 additions & 0 deletions state-reconstruct-fetcher/src/constants.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@ pub mod ethereum {

/// zkSync smart contract address.
pub const ZK_SYNC_ADDR: &str = "0x32400084C286CF3E17e7B677ea9583e60a000324";

/// Default Ethereum blob storage URL base.
pub const BLOBS_URL: &str = "https://api.blobscan.com/blobs/";
}

pub mod storage {
Expand All @@ -33,4 +36,8 @@ pub mod zksync {
pub const OPERATION_BITMASK: u8 = 7;
// The number of bits shifting the compressed state diff metadata by which we retrieve its length.
pub const LENGTH_BITS_OFFSET: u8 = 3;
// Size of `CommitBatchInfo.pubdataCommitments` item.
pub const PUBDATA_COMMITMENT_SIZE: usize = 144;
// The number of trailing bytes to ignore when using calldata post-blobs. Contains unused blob commitments.
pub const CALLDATA_SOURCE_TAIL_SIZE: usize = 32;
}
Loading

0 comments on commit 424cdfd

Please sign in to comment.