Skip to content

Commit

Permalink
ref: decouple state saves from tree inserts
Browse files Browse the repository at this point in the history
  • Loading branch information
zeapoz committed Oct 20, 2023
1 parent ebed5d7 commit 235b077
Show file tree
Hide file tree
Showing 4 changed files with 30 additions and 20 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ serde_json = { version = "1.0.107", features = ["std"] }
serde_json_any_key = "2.0.0"
thiserror = "1.0"
tokio = { version = "1.32.0", features = ["macros", "signal"] }
tokio-util = "0.7.9"
tracing = "0.1.37"
tracing-subscriber = "0.3.17"
zksync_merkle_tree = { git = "https://github.com/matter-labs/zksync-era.git" }
2 changes: 1 addition & 1 deletion src/l1_fetcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use eyre::Result;
use rand::random;
use thiserror::Error;
use tokio::{
sync::{mpsc, oneshot, Mutex},
sync::{mpsc, Mutex, oneshot},
time::{sleep, Duration},
};

Expand Down
46 changes: 27 additions & 19 deletions src/processor/tree/mod.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
pub mod query_tree;
mod tree_wrapper;

use std::{path::PathBuf, sync::Arc};
use std::{io, path::PathBuf, sync::Arc};

use async_trait::async_trait;
use ethers::types::H256;
Expand Down Expand Up @@ -50,31 +50,39 @@ impl TreeProcessor<'static> {
snapshot,
})
}

pub async fn write_state(&self) -> Result<(), io::Error> {
let snapshot = self.snapshot.lock().await;
// Write the current state to a file.
let state_file_path = self.db_path.join(STATE_FILE_NAME);
snapshot.write(&state_file_path)
}
}

#[async_trait]
impl Processor for TreeProcessor<'static> {
async fn run(mut self, mut rx: mpsc::Receiver<CommitBlockInfoV1>) {
while let Some(block) = rx.recv().await {
let mut snapshot = self.snapshot.lock().await;
// Check if we've already processed this block.
if snapshot.latest_l2_block_number >= block.block_number {
tracing::debug!(
"Block {} has already been processed, skipping.",
block.block_number
);
continue;
}
loop {
if let Some(block) = rx.recv().await {
let mut snapshot = self.snapshot.lock().await;
// Check if we've already processed this block.
if snapshot.latest_l2_block_number >= block.block_number {
tracing::debug!(
"Block {} has already been processed, skipping.",
block.block_number
);
continue;
}

self.tree.insert_block(&block);
self.tree.insert_block(&block);

// Update snapshot values.
snapshot.latest_l2_block_number = block.block_number;
snapshot.index_to_key_map = self.tree.index_to_key_map.clone();

// Write the current state to a file.
let state_file_path = self.db_path.join(STATE_FILE_NAME);
snapshot.write(&state_file_path).unwrap();
// Update snapshot values.
snapshot.latest_l2_block_number = block.block_number;
snapshot.index_to_key_map = self.tree.index_to_key_map.clone();
} else {
self.write_state().await.unwrap();
break;
}
}
}
}

0 comments on commit 235b077

Please sign in to comment.