Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: inital support for Boojum format #48

Merged
merged 5 commits into from
Jan 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
File renamed without changes.
641 changes: 641 additions & 0 deletions abi/IZkSyncV2.json

Large diffs are not rendered by default.

12 changes: 6 additions & 6 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use state_reconstruct_fetcher::{
constants::storage::{self, STATE_FILE_NAME},
l1_fetcher::{L1Fetcher, L1FetcherOptions},
snapshot::StateSnapshot,
types::CommitBlockInfoV1,
types::CommitBlock,
};
use tokio::sync::{mpsc, Mutex};
use tracing_subscriber::{filter::LevelFilter, EnvFilter};
Expand Down Expand Up @@ -79,7 +79,7 @@ async fn main() -> Result<()> {

let fetcher = L1Fetcher::new(fetcher_options, Some(snapshot.clone()))?;
let processor = TreeProcessor::new(db_path.clone(), snapshot.clone()).await?;
let (tx, rx) = mpsc::channel::<CommitBlockInfoV1>(5);
let (tx, rx) = mpsc::channel::<CommitBlock>(5);

let processor_handle = tokio::spawn(async move {
processor.run(rx).await;
Expand All @@ -98,13 +98,13 @@ async fn main() -> Result<()> {

let reader = BufReader::new(File::open(&file)?);
let processor = TreeProcessor::new(db_path, snapshot).await?;
let (tx, rx) = mpsc::channel::<CommitBlockInfoV1>(5);
let (tx, rx) = mpsc::channel::<CommitBlock>(5);

tokio::spawn(async move {
processor.run(rx).await;
});

let json_iter = json::iter_json_array::<CommitBlockInfoV1, _>(reader);
let json_iter = json::iter_json_array::<CommitBlock, _>(reader);
let mut num_objects = 0;
for blk in json_iter {
tx.send(blk.expect("parsing")).await?;
Expand All @@ -129,7 +129,7 @@ async fn main() -> Result<()> {

let fetcher = L1Fetcher::new(fetcher_options, None)?;
let processor = JsonSerializationProcessor::new(Path::new(&file))?;
let (tx, rx) = mpsc::channel::<CommitBlockInfoV1>(5);
let (tx, rx) = mpsc::channel::<CommitBlock>(5);

let processor_handle = tokio::spawn(async move {
processor.run(rx).await;
Expand Down Expand Up @@ -172,7 +172,7 @@ async fn main() -> Result<()> {
let fetcher = L1Fetcher::new(fetcher_options, None)?;
let processor = SnapshotBuilder::new(db_path);

let (tx, rx) = mpsc::channel::<CommitBlockInfoV1>(5);
let (tx, rx) = mpsc::channel::<CommitBlock>(5);
let processor_handle = tokio::spawn(async move {
processor.run(rx).await;
});
Expand Down
4 changes: 2 additions & 2 deletions src/processor/json/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use async_trait::async_trait;
use eyre::Result;
use serde::ser::{SerializeSeq, Serializer};
use serde_json;
use state_reconstruct_fetcher::types::CommitBlockInfoV1;
use state_reconstruct_fetcher::types::CommitBlock;
use tokio::sync::mpsc;

use super::Processor;
Expand All @@ -23,7 +23,7 @@ impl JsonSerializationProcessor {

#[async_trait]
impl Processor for JsonSerializationProcessor {
async fn run(mut self, mut rx: mpsc::Receiver<CommitBlockInfoV1>) {
async fn run(mut self, mut rx: mpsc::Receiver<CommitBlock>) {
let mut seq = self
.serializer
.serialize_seq(None)
Expand Down
4 changes: 2 additions & 2 deletions src/processor/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use async_trait::async_trait;
use state_reconstruct_fetcher::types::CommitBlockInfoV1;
use state_reconstruct_fetcher::types::CommitBlock;
use tokio::sync::mpsc;

pub mod json;
Expand All @@ -8,5 +8,5 @@ pub mod tree;

#[async_trait]
pub trait Processor {
async fn run(self, mut rx: mpsc::Receiver<CommitBlockInfoV1>);
async fn run(self, mut rx: mpsc::Receiver<CommitBlock>);
}
4 changes: 2 additions & 2 deletions src/processor/snapshot/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use eyre::Result;
use prost::Message;
use state_reconstruct_fetcher::{
constants::{ethereum, storage},
types::CommitBlockInfoV1,
types::CommitBlock,
};
use tokio::sync::mpsc;

Expand Down Expand Up @@ -64,7 +64,7 @@ impl SnapshotBuilder {

#[async_trait]
impl Processor for SnapshotBuilder {
async fn run(mut self, mut rx: mpsc::Receiver<CommitBlockInfoV1>) {
async fn run(mut self, mut rx: mpsc::Receiver<CommitBlock>) {
while let Some(block) = rx.recv().await {
// Initial calldata.
for (key, value) in &block.initial_storage_changes {
Expand Down
10 changes: 5 additions & 5 deletions src/processor/tree/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use async_trait::async_trait;
use ethers::types::H256;
use eyre::Result;
use state_reconstruct_fetcher::{
constants::storage::STATE_FILE_NAME, snapshot::StateSnapshot, types::CommitBlockInfoV1,
constants::storage::STATE_FILE_NAME, snapshot::StateSnapshot, types::CommitBlock,
};
use tokio::sync::{mpsc, Mutex};

Expand Down Expand Up @@ -48,22 +48,22 @@ impl TreeProcessor {

#[async_trait]
impl Processor for TreeProcessor {
async fn run(mut self, mut rx: mpsc::Receiver<CommitBlockInfoV1>) {
async fn run(mut self, mut rx: mpsc::Receiver<CommitBlock>) {
while let Some(block) = rx.recv().await {
let mut snapshot = self.snapshot.lock().await;
// Check if we've already processed this block.
if snapshot.latest_l2_block_number >= block.block_number {
if snapshot.latest_l2_block_number >= block.l2_block_number {
tracing::debug!(
"Block {} has already been processed, skipping.",
block.block_number
block.l2_block_number
);
continue;
}

self.tree.insert_block(&block);

// Update snapshot values.
snapshot.latest_l2_block_number = block.block_number;
snapshot.latest_l2_block_number = block.l2_block_number;
snapshot.index_to_key_map = self.tree.index_to_key_map.clone();
}
}
Expand Down
8 changes: 4 additions & 4 deletions src/processor/tree/tree_wrapper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use blake2::{Blake2s256, Digest};
use ethers::types::{Address, H256, U256};
use eyre::Result;
use indexmap::IndexSet;
use state_reconstruct_fetcher::{constants::storage::INITAL_STATE_PATH, types::CommitBlockInfoV1};
use state_reconstruct_fetcher::{constants::storage::INITAL_STATE_PATH, types::CommitBlock};
use zksync_merkle_tree::{Database, MerkleTree, RocksDBWrapper};

use super::RootHash;
Expand Down Expand Up @@ -33,7 +33,7 @@ impl TreeWrapper {
}

/// Inserts a block into the tree and returns the root hash of the resulting state tree.
pub fn insert_block(&mut self, block: &CommitBlockInfoV1) -> RootHash {
pub fn insert_block(&mut self, block: &CommitBlock) -> RootHash {
// INITIAL CALLDATA.
let mut key_value_pairs: Vec<(U256, H256)> =
Vec::with_capacity(block.initial_storage_changes.len());
Expand Down Expand Up @@ -61,11 +61,11 @@ impl TreeWrapper {
assert_eq!(root_hash.as_bytes(), block.new_state_root);
tracing::debug!(
"Root hash of block {} = {}",
block.block_number,
block.l2_block_number,
hex::encode(root_hash)
);

tracing::debug!("Successfully processed block {}", block.block_number);
tracing::debug!("Successfully processed block {}", block.l2_block_number);

root_hash
}
Expand Down
12 changes: 12 additions & 0 deletions state-reconstruct-fetcher/src/constants.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@ pub mod ethereum {
/// Block number in Ethereum for zkSync genesis block.
pub const GENESIS_BLOCK: u64 = 16_627_460;

/// Block number in Ethereum of the first Boojum-formatted block.
pub const BOOJUM_BLOCK: u64 = 18_711_784;

/// zkSync smart contract address.
pub const ZK_SYNC_ADDR: &str = "0x32400084C286CF3E17e7B677ea9583e60a000324";
}
Expand All @@ -19,3 +22,12 @@ pub mod storage {
/// The name of the state file.
pub const STATE_FILE_NAME: &str = "StateSnapshot.json";
}

pub mod zksync {
/// Bytes in raw L2 to L1 log.
pub const L2_TO_L1_LOG_SERIALIZE_SIZE: usize = 88;
// The bitmask by applying which to the compressed state diff metadata we retrieve its operation.
pub const OPERATION_BITMASK: u8 = 7;
// The number of bits shifting the compressed state diff metadata by which we retrieve its length.
pub const LENGTH_BITS_OFFSET: u8 = 3;
}
94 changes: 63 additions & 31 deletions state-reconstruct-fetcher/src/l1_fetcher.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::{future::Future, ops::Fn, sync::Arc};
use std::{fs::File, future::Future, ops::Fn, sync::Arc};

use ethers::{
abi::{Contract, Function},
Expand All @@ -15,9 +15,9 @@ use tokio::{
use tokio_util::sync::CancellationToken;

use crate::{
constants::ethereum::{BLOCK_STEP, GENESIS_BLOCK, ZK_SYNC_ADDR},
constants::ethereum::{BLOCK_STEP, BOOJUM_BLOCK, GENESIS_BLOCK, ZK_SYNC_ADDR},
snapshot::StateSnapshot,
types::{CommitBlockInfoV1, ParseError},
types::{v1::V1, v2::V2, CommitBlock, ParseError},
};

/// `MAX_RETRIES` is the maximum number of retries on failed L1 call.
Expand Down Expand Up @@ -89,9 +89,15 @@ impl L1Metrics {
}
}

#[derive(Clone)]
struct Contracts {
v1: Contract,
v2: Contract,
}
Comment on lines +93 to +96
Copy link
Member Author

@zeapoz zeapoz Dec 20, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not a huge fan of this sort of structure, but it was the easiest to implement without merging the two ABI-files. If you have any thoughts/ideas, do let me know

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is a good middle-ground solution. I have related notes below where I elaborate on this.


pub struct L1Fetcher {
provider: Provider<Http>,
contract: Contract,
contracts: Contracts,
config: L1FetcherOptions,
snapshot: Option<Arc<Mutex<StateSnapshot>>>,
metrics: Arc<Mutex<L1Metrics>>,
Expand All @@ -105,21 +111,22 @@ impl L1Fetcher {
let provider = Provider::<Http>::try_from(&config.http_url)
.expect("could not instantiate HTTP Provider");

let abi_file = std::fs::File::open("./IZkSync.json")?;
let contract = Contract::load(abi_file)?;
let v1 = Contract::load(File::open("./abi/IZkSync.json")?)?;
let v2 = Contract::load(File::open("./abi/IZkSyncV2.json")?)?;
let contracts = Contracts { v1, v2 };

let metrics = Arc::new(Mutex::new(L1Metrics::default()));

Ok(L1Fetcher {
provider,
contract,
contracts,
config,
snapshot,
metrics,
})
}

pub async fn run(&self, sink: mpsc::Sender<CommitBlockInfoV1>) -> Result<()> {
pub async fn run(&self, sink: mpsc::Sender<CommitBlock>) -> Result<()> {
// Start fetching from the `GENESIS_BLOCK` unless the `start_block` argument is supplied,
// in which case, start from that instead. If no argument was supplied and a state snapshot
// exists, start from the block number specified in that snapshot.
Expand Down Expand Up @@ -237,7 +244,7 @@ impl L1Fetcher {
disable_polling: bool,
) -> Result<tokio::task::JoinHandle<()>> {
let metrics = self.metrics.clone();
let event = self.contract.events_by_name("BlockCommit")?[0].clone();
let event = self.contracts.v1.events_by_name("BlockCommit")?[0].clone();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

e.g. this reads quite well for me and I like this structure.

let provider_clone = self.provider.clone();

Ok(tokio::spawn({
Expand Down Expand Up @@ -357,21 +364,33 @@ impl L1Fetcher {
})
}

// FIXME:
#[allow(clippy::absurd_extreme_comparisons)]
fn spawn_parsing_handler(
&self,
mut l1_tx_rx: mpsc::Receiver<Transaction>,
sink: mpsc::Sender<CommitBlockInfoV1>,
sink: mpsc::Sender<CommitBlock>,
) -> Result<tokio::task::JoinHandle<Option<u64>>> {
let metrics = self.metrics.clone();
let function = self.contract.functions_by_name("commitBlocks")?[0].clone();

let contracts = self.contracts.clone();
Ok(tokio::spawn({
async move {
let mut function =
contracts.v1.functions_by_name("commitBlocks").unwrap()[0].clone();
let mut last_block_number_processed = None;

while let Some(tx) = l1_tx_rx.recv().await {
let block_number = tx.block_number.map(|v| v.as_u64());
let blocks = match parse_calldata(block_number, &function, &tx.input) {

if let Some(block_number) = block_number {
if block_number >= BOOJUM_BLOCK {
function =
contracts.v2.functions_by_name("commitBatches").unwrap()[0].clone();
tracing::debug!("Reached `BOOJUM_BLOCK`, changing commit block format");
}
};

let blocks = match parse_calldata(block_number, &function, &tx.input).await {
Ok(blks) => blks,
Err(e) => {
tracing::error!("failed to parse calldata: {e}");
Expand All @@ -383,15 +402,14 @@ impl L1Fetcher {
// NOTE: Let's see if we want to increment this in batches, instead of each block individually.
let mut metrics = metrics.lock().await;
metrics.l2_blocks_processed += 1;
metrics.latest_l2_block_nbr = blk.block_number;
metrics.latest_l2_block_nbr = blk.l2_block_number;
sink.send(blk).await.unwrap();
}

last_block_number_processed = block_number;
}

// Return the last processed l1 block number,
// so we can resume from the same point later on.
// Return the last processed l1 block number, so we can resume from the same point later on.
last_block_number_processed
}
}))
Expand All @@ -414,11 +432,11 @@ impl L1Fetcher {
}
}

pub fn parse_calldata(
pub async fn parse_calldata(
l1_block_number: Option<u64>,
commit_blocks_fn: &Function,
calldata: &[u8],
) -> Result<Vec<CommitBlockInfoV1>> {
) -> Result<Vec<CommitBlock>> {
let mut parsed_input = commit_blocks_fn
.decode_input(&calldata[4..])
.map_err(|e| ParseError::InvalidCalldata(e.to_string()))?;
Expand Down Expand Up @@ -460,16 +478,26 @@ pub fn parse_calldata(
// TODO: What to do here?
// assert_eq!(previous_enumeration_index, tree.next_enumeration_index());

// Supplement every CommitBlockInfoV1 element with L1 block number information.
parse_commit_block_info(&new_blocks_data).map(|mut vec| {
vec.iter_mut()
.for_each(|e| e.l1_block_number = l1_block_number);
vec
})
// Supplement every `CommitBlock` element with L1 block number information.
parse_commit_block_info(&new_blocks_data, l1_block_number)
.await
.map(|mut vec| {
vec.iter_mut()
.for_each(|e| e.l1_block_number = l1_block_number);
vec
})
}

fn parse_commit_block_info(data: &abi::Token) -> Result<Vec<CommitBlockInfoV1>> {
let mut res = vec![];
async fn parse_commit_block_info(
data: &abi::Token,
l1_block_number: Option<u64>,
) -> Result<Vec<CommitBlock>> {
// By default parse blocks using [`CommitBlockInfoV1`]; if we have reached [`BOOJUM_BLOCK`], use [`CommitBlockInfoV2`].
let use_new_format = if let Some(block_number) = l1_block_number {
block_number >= BOOJUM_BLOCK
} else {
false
};

let abi::Token::Array(data) = data else {
return Err(ParseError::InvalidCommitBlockInfo(
Expand All @@ -478,12 +506,16 @@ fn parse_commit_block_info(data: &abi::Token) -> Result<Vec<CommitBlockInfoV1>>
.into());
};

let mut result = vec![];
for d in data {
match CommitBlockInfoV1::try_from(d) {
Ok(blk) => res.push(blk),
Err(e) => tracing::error!("failed to parse commit block info: {e}"),
}
let commit_block = if use_new_format {
CommitBlock::try_from_token::<V2>(d)?
} else {
CommitBlock::try_from_token::<V1>(d)?
};

result.push(commit_block);
}

Ok(res)
Ok(result)
}
Loading