Skip to content

Commit

Permalink
feat: inital support for Boojum format
Browse files Browse the repository at this point in the history
Introduces structs and calldata parsing for blocks formatted with the newer format.
  • Loading branch information
zeapoz committed Dec 20, 2023
1 parent 261cb54 commit 18e725e
Show file tree
Hide file tree
Showing 13 changed files with 1,171 additions and 165 deletions.
File renamed without changes.
641 changes: 641 additions & 0 deletions abi/IZkSyncV2.json

Large diffs are not rendered by default.

12 changes: 6 additions & 6 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use state_reconstruct_fetcher::{
constants::storage::{self, STATE_FILE_NAME},
l1_fetcher::{L1Fetcher, L1FetcherOptions},
snapshot::StateSnapshot,
types::CommitBlockInfoV1,
types::CommitBlock,
};
use tokio::sync::{mpsc, Mutex};
use tracing_subscriber::{filter::LevelFilter, EnvFilter};
Expand Down Expand Up @@ -79,7 +79,7 @@ async fn main() -> Result<()> {

let fetcher = L1Fetcher::new(fetcher_options, Some(snapshot.clone()))?;
let processor = TreeProcessor::new(db_path.clone(), snapshot.clone()).await?;
let (tx, rx) = mpsc::channel::<CommitBlockInfoV1>(5);
let (tx, rx) = mpsc::channel::<CommitBlock>(5);

let processor_handle = tokio::spawn(async move {
processor.run(rx).await;
Expand All @@ -98,13 +98,13 @@ async fn main() -> Result<()> {

let reader = BufReader::new(File::open(&file)?);
let processor = TreeProcessor::new(db_path, snapshot).await?;
let (tx, rx) = mpsc::channel::<CommitBlockInfoV1>(5);
let (tx, rx) = mpsc::channel::<CommitBlock>(5);

tokio::spawn(async move {
processor.run(rx).await;
});

let json_iter = json::iter_json_array::<CommitBlockInfoV1, _>(reader);
let json_iter = json::iter_json_array::<CommitBlock, _>(reader);
let mut num_objects = 0;
for blk in json_iter {
tx.send(blk.expect("parsing")).await?;
Expand All @@ -129,7 +129,7 @@ async fn main() -> Result<()> {

let fetcher = L1Fetcher::new(fetcher_options, None)?;
let processor = JsonSerializationProcessor::new(Path::new(&file))?;
let (tx, rx) = mpsc::channel::<CommitBlockInfoV1>(5);
let (tx, rx) = mpsc::channel::<CommitBlock>(5);

let processor_handle = tokio::spawn(async move {
processor.run(rx).await;
Expand Down Expand Up @@ -172,7 +172,7 @@ async fn main() -> Result<()> {
let fetcher = L1Fetcher::new(fetcher_options, None)?;
let processor = SnapshotBuilder::new(db_path);

let (tx, rx) = mpsc::channel::<CommitBlockInfoV1>(5);
let (tx, rx) = mpsc::channel::<CommitBlock>(5);
let processor_handle = tokio::spawn(async move {
processor.run(rx).await;
});
Expand Down
4 changes: 2 additions & 2 deletions src/processor/json/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use async_trait::async_trait;
use eyre::Result;
use serde::ser::{SerializeSeq, Serializer};
use serde_json;
use state_reconstruct_fetcher::types::CommitBlockInfoV1;
use state_reconstruct_fetcher::types::CommitBlock;
use tokio::sync::mpsc;

use super::Processor;
Expand All @@ -23,7 +23,7 @@ impl JsonSerializationProcessor {

#[async_trait]
impl Processor for JsonSerializationProcessor {
async fn run(mut self, mut rx: mpsc::Receiver<CommitBlockInfoV1>) {
async fn run(mut self, mut rx: mpsc::Receiver<CommitBlock>) {
let mut seq = self
.serializer
.serialize_seq(None)
Expand Down
4 changes: 2 additions & 2 deletions src/processor/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use async_trait::async_trait;
use state_reconstruct_fetcher::types::CommitBlockInfoV1;
use state_reconstruct_fetcher::types::CommitBlock;
use tokio::sync::mpsc;

pub mod json;
Expand All @@ -8,5 +8,5 @@ pub mod tree;

#[async_trait]
pub trait Processor {
async fn run(self, mut rx: mpsc::Receiver<CommitBlockInfoV1>);
async fn run(self, mut rx: mpsc::Receiver<CommitBlock>);
}
4 changes: 2 additions & 2 deletions src/processor/snapshot/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use eyre::Result;
use prost::Message;
use state_reconstruct_fetcher::{
constants::{ethereum, storage},
types::CommitBlockInfoV1,
types::CommitBlock,
};
use tokio::sync::mpsc;

Expand Down Expand Up @@ -64,7 +64,7 @@ impl SnapshotBuilder {

#[async_trait]
impl Processor for SnapshotBuilder {
async fn run(mut self, mut rx: mpsc::Receiver<CommitBlockInfoV1>) {
async fn run(mut self, mut rx: mpsc::Receiver<CommitBlock>) {
while let Some(block) = rx.recv().await {
// Initial calldata.
for (key, value) in &block.initial_storage_changes {
Expand Down
10 changes: 5 additions & 5 deletions src/processor/tree/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use async_trait::async_trait;
use ethers::types::H256;
use eyre::Result;
use state_reconstruct_fetcher::{
constants::storage::STATE_FILE_NAME, snapshot::StateSnapshot, types::CommitBlockInfoV1,
constants::storage::STATE_FILE_NAME, snapshot::StateSnapshot, types::CommitBlock,
};
use tokio::sync::{mpsc, Mutex};

Expand Down Expand Up @@ -48,22 +48,22 @@ impl TreeProcessor {

#[async_trait]
impl Processor for TreeProcessor {
async fn run(mut self, mut rx: mpsc::Receiver<CommitBlockInfoV1>) {
async fn run(mut self, mut rx: mpsc::Receiver<CommitBlock>) {
while let Some(block) = rx.recv().await {
let mut snapshot = self.snapshot.lock().await;
// Check if we've already processed this block.
if snapshot.latest_l2_block_number >= block.block_number {
if snapshot.latest_l2_block_number >= block.l2_block_number {
tracing::debug!(
"Block {} has already been processed, skipping.",
block.block_number
block.l2_block_number
);
continue;
}

self.tree.insert_block(&block);

// Update snapshot values.
snapshot.latest_l2_block_number = block.block_number;
snapshot.latest_l2_block_number = block.l2_block_number;
snapshot.index_to_key_map = self.tree.index_to_key_map.clone();
}
}
Expand Down
8 changes: 4 additions & 4 deletions src/processor/tree/tree_wrapper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use blake2::{Blake2s256, Digest};
use ethers::types::{Address, H256, U256};
use eyre::Result;
use indexmap::IndexSet;
use state_reconstruct_fetcher::{constants::storage::INITAL_STATE_PATH, types::CommitBlockInfoV1};
use state_reconstruct_fetcher::{constants::storage::INITAL_STATE_PATH, types::CommitBlock};
use zksync_merkle_tree::{Database, MerkleTree, RocksDBWrapper};

use super::RootHash;
Expand Down Expand Up @@ -33,7 +33,7 @@ impl TreeWrapper {
}

/// Inserts a block into the tree and returns the root hash of the resulting state tree.
pub fn insert_block(&mut self, block: &CommitBlockInfoV1) -> RootHash {
pub fn insert_block(&mut self, block: &CommitBlock) -> RootHash {
// INITIAL CALLDATA.
let mut key_value_pairs: Vec<(U256, H256)> =
Vec::with_capacity(block.initial_storage_changes.len());
Expand Down Expand Up @@ -61,11 +61,11 @@ impl TreeWrapper {
assert_eq!(root_hash.as_bytes(), block.new_state_root);
tracing::debug!(
"Root hash of block {} = {}",
block.block_number,
block.l2_block_number,
hex::encode(root_hash)
);

tracing::debug!("Successfully processed block {}", block.block_number);
tracing::debug!("Successfully processed block {}", block.l2_block_number);

root_hash
}
Expand Down
12 changes: 12 additions & 0 deletions state-reconstruct-fetcher/src/constants.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@ pub mod ethereum {
/// Block number in Ethereum for zkSync genesis block.
pub const GENESIS_BLOCK: u64 = 16_627_460;

/// Block number in Ethereum of the first Boojum-formatted block.
pub const BOOJUM_BLOCK: u64 = 18_711_784;

/// zkSync smart contract address.
pub const ZK_SYNC_ADDR: &str = "0x32400084C286CF3E17e7B677ea9583e60a000324";
}
Expand All @@ -19,3 +22,12 @@ pub mod storage {
/// The name of the state file.
pub const STATE_FILE_NAME: &str = "StateSnapshot.json";
}

pub mod zksync {
/// Bytes in raw L2 to L1 log.
pub const L2_TO_L1_LOG_SERIALIZE_SIZE: usize = 88;
// The bitmask by applying which to the compressed state diff metadata we retrieve its operation.
pub const OPERATION_BITMASK: u8 = 7;
// The number of bits shifting the compressed state diff metadata by which we retrieve its length.
pub const LENGTH_BITS_OFFSET: u8 = 3;
}
97 changes: 66 additions & 31 deletions state-reconstruct-fetcher/src/l1_fetcher.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::{future::Future, ops::Fn, sync::Arc};
use std::{fs::File, future::Future, ops::Fn, sync::Arc};

use ethers::{
abi::{Contract, Function},
Expand All @@ -15,9 +15,9 @@ use tokio::{
use tokio_util::sync::CancellationToken;

use crate::{
constants::ethereum::{BLOCK_STEP, GENESIS_BLOCK, ZK_SYNC_ADDR},
constants::ethereum::{BLOCK_STEP, BOOJUM_BLOCK, GENESIS_BLOCK, ZK_SYNC_ADDR},
snapshot::StateSnapshot,
types::{CommitBlockInfoV1, ParseError},
types::{v1, v2, CommitBlock, CommitBlockInfo, ParseError},
};

/// `MAX_RETRIES` is the maximum number of retries on failed L1 call.
Expand Down Expand Up @@ -89,9 +89,15 @@ impl L1Metrics {
}
}

#[derive(Clone)]
struct Contracts {
v1: Contract,
v2: Contract,
}

pub struct L1Fetcher {
provider: Provider<Http>,
contract: Contract,
contracts: Contracts,
config: L1FetcherOptions,
snapshot: Option<Arc<Mutex<StateSnapshot>>>,
metrics: Arc<Mutex<L1Metrics>>,
Expand All @@ -105,21 +111,22 @@ impl L1Fetcher {
let provider = Provider::<Http>::try_from(&config.http_url)
.expect("could not instantiate HTTP Provider");

let abi_file = std::fs::File::open("./IZkSync.json")?;
let contract = Contract::load(abi_file)?;
let v1 = Contract::load(File::open("./abi/IZkSync.json")?)?;
let v2 = Contract::load(File::open("./abi/IZkSyncV2.json")?)?;
let contracts = Contracts { v1, v2 };

let metrics = Arc::new(Mutex::new(L1Metrics::default()));

Ok(L1Fetcher {
provider,
contract,
contracts,
config,
snapshot,
metrics,
})
}

pub async fn run(&self, sink: mpsc::Sender<CommitBlockInfoV1>) -> Result<()> {
pub async fn run(&self, sink: mpsc::Sender<CommitBlock>) -> Result<()> {
// Start fetching from the `GENESIS_BLOCK` unless the `start_block` argument is supplied,
// in which case, start from that instead. If no argument was supplied and a state snapshot
// exists, start from the block number specified in that snapshot.
Expand Down Expand Up @@ -237,7 +244,7 @@ impl L1Fetcher {
disable_polling: bool,
) -> Result<tokio::task::JoinHandle<()>> {
let metrics = self.metrics.clone();
let event = self.contract.events_by_name("BlockCommit")?[0].clone();
let event = self.contracts.v1.events_by_name("BlockCommit")?[0].clone();
let provider_clone = self.provider.clone();

Ok(tokio::spawn({
Expand Down Expand Up @@ -357,21 +364,33 @@ impl L1Fetcher {
})
}

// FIXME:
#[allow(clippy::absurd_extreme_comparisons)]
fn spawn_parsing_handler(
&self,
mut l1_tx_rx: mpsc::Receiver<Transaction>,
sink: mpsc::Sender<CommitBlockInfoV1>,
sink: mpsc::Sender<CommitBlock>,
) -> Result<tokio::task::JoinHandle<Option<u64>>> {
let metrics = self.metrics.clone();
let function = self.contract.functions_by_name("commitBlocks")?[0].clone();

let contracts = self.contracts.clone();
Ok(tokio::spawn({
async move {
let mut function =
contracts.v1.functions_by_name("commitBlocks").unwrap()[0].clone();
let mut last_block_number_processed = None;

while let Some(tx) = l1_tx_rx.recv().await {
let block_number = tx.block_number.map(|v| v.as_u64());
let blocks = match parse_calldata(block_number, &function, &tx.input) {

if let Some(block_number) = block_number {
if block_number >= BOOJUM_BLOCK {
function =
contracts.v2.functions_by_name("commitBatches").unwrap()[0].clone();
tracing::debug!("Reached `BOOJUM_BLOCK`, changing commit block format");
}
};

let blocks = match parse_calldata(block_number, &function, &tx.input).await {
Ok(blks) => blks,
Err(e) => {
tracing::error!("failed to parse calldata: {e}");
Expand All @@ -383,15 +402,14 @@ impl L1Fetcher {
// NOTE: Let's see if we want to increment this in batches, instead of each block individually.
let mut metrics = metrics.lock().await;
metrics.l2_blocks_processed += 1;
metrics.latest_l2_block_nbr = blk.block_number;
metrics.latest_l2_block_nbr = blk.l2_block_number;
sink.send(blk).await.unwrap();
}

last_block_number_processed = block_number;
}

// Return the last processed l1 block number,
// so we can resume from the same point later on.
// Return the last processed l1 block number, so we can resume from the same point later on.
last_block_number_processed
}
}))
Expand All @@ -414,11 +432,11 @@ impl L1Fetcher {
}
}

pub fn parse_calldata(
pub async fn parse_calldata(
l1_block_number: Option<u64>,
commit_blocks_fn: &Function,
calldata: &[u8],
) -> Result<Vec<CommitBlockInfoV1>> {
) -> Result<Vec<CommitBlock>> {
let mut parsed_input = commit_blocks_fn
.decode_input(&calldata[4..])
.map_err(|e| ParseError::InvalidCalldata(e.to_string()))?;
Expand Down Expand Up @@ -460,16 +478,26 @@ pub fn parse_calldata(
// TODO: What to do here?
// assert_eq!(previous_enumeration_index, tree.next_enumeration_index());

// Supplement every CommitBlockInfoV1 element with L1 block number information.
parse_commit_block_info(&new_blocks_data).map(|mut vec| {
vec.iter_mut()
.for_each(|e| e.l1_block_number = l1_block_number);
vec
})
// Supplement every `CommitBlock` element with L1 block number information.
parse_commit_block_info(&new_blocks_data, l1_block_number)
.await
.map(|mut vec| {
vec.iter_mut()
.for_each(|e| e.l1_block_number = l1_block_number);
vec
})
}

fn parse_commit_block_info(data: &abi::Token) -> Result<Vec<CommitBlockInfoV1>> {
let mut res = vec![];
async fn parse_commit_block_info(
data: &abi::Token,
l1_block_number: Option<u64>,
) -> Result<Vec<CommitBlock>> {
// By default parse blocks using [`CommitBlockInfoV1`]; if we have reached [`BOOJUM_BLOCK`], use [`CommitBlockInfoV2`].
let use_new_format = if let Some(block_number) = l1_block_number {
block_number >= BOOJUM_BLOCK
} else {
false
};

let abi::Token::Array(data) = data else {
return Err(ParseError::InvalidCommitBlockInfo(
Expand All @@ -478,12 +506,19 @@ fn parse_commit_block_info(data: &abi::Token) -> Result<Vec<CommitBlockInfoV1>>
.into());
};

let mut result = vec![];
for d in data {
match CommitBlockInfoV1::try_from(d) {
Ok(blk) => res.push(blk),
Err(e) => tracing::error!("failed to parse commit block info: {e}"),
}
let block_info = {
if use_new_format {
CommitBlockInfo::V2(v2::CommitBlockInfo::try_from(d)?)
} else {
CommitBlockInfo::V1(v1::CommitBlockInfo::try_from(d)?)
}
};

let commit_block = CommitBlock::from_commit_block(block_info).await;
result.push(commit_block);
}

Ok(res)
Ok(result)
}
Loading

0 comments on commit 18e725e

Please sign in to comment.