ref: abstract away protobuf from snapshot
Introduces a new trait, `Proto`, that defines methods for converting
between native structs and their protobuf-generated counterparts, as
well as methods for encoding/decoding those structs to/from gzipped
protobuf files.
zeapoz committed Apr 11, 2024
1 parent 6f7f973 commit 6b73a92
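
For orientation, here is a minimal sketch of what the `Proto` trait could look like, inferred from how it is used in the diff below (`factory_deps.encode(&path)?`, `SnapshotFactoryDependencies::decode(&bytes)`). The associated type and the `to_proto`/`from_proto` names are assumptions, since the actual `types.rs` changes are not shown in this excerpt.

use std::{
    io::{Read, Write},
    path::Path,
};

use bytes::BytesMut;
use eyre::Result;
use flate2::{read::GzDecoder, write::GzEncoder, Compression};
use prost::Message;

/// Sketch of the `Proto` trait; the exact shape is an assumption inferred from usage.
pub trait Proto: Sized {
    /// The prost-generated counterpart of this native struct.
    type ProtoStruct: Message + Default;

    /// Convert the native struct into its protobuf representation.
    fn to_proto(&self) -> Self::ProtoStruct;

    /// Convert a decoded protobuf struct back into the native representation.
    fn from_proto(proto: Self::ProtoStruct) -> Result<Self>;

    /// Serialize to protobuf and write the result as a gzipped file at `path`,
    /// mirroring the BytesMut + GzEncoder logic removed from the exporter below.
    fn encode(&self, path: &Path) -> Result<()> {
        let proto = self.to_proto();

        // Serialize the protobuf message into an in-memory buffer.
        let mut buf = BytesMut::with_capacity(proto.encoded_len());
        proto.encode(&mut buf)?;

        // Wrap the output file in gzip compression before writing.
        let outfile = std::fs::OpenOptions::new()
            .write(true)
            .create(true)
            .truncate(false)
            .open(path)?;
        let mut encoder = GzEncoder::new(outfile, Compression::default());
        encoder.write_all(&buf)?;
        encoder.finish()?;
        Ok(())
    }

    /// Decompress raw gzipped-protobuf file contents and decode them into the
    /// native struct, mirroring the GzDecoder logic removed from the importer below.
    fn decode(bytes: &[u8]) -> Result<Self> {
        let mut decoder = GzDecoder::new(bytes);
        let mut decompressed = Vec::new();
        decoder.read_to_end(&mut decompressed)?;

        let proto = <Self::ProtoStruct as Message>::decode(&decompressed[..])?;
        Self::from_proto(proto)
    }
}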
Showing 5 changed files with 184 additions and 136 deletions.
106 changes: 16 additions & 90 deletions src/processor/snapshot/exporter.rs
@@ -1,23 +1,16 @@
use std::{
io::Write,
path::{Path, PathBuf},
};
use std::path::{Path, PathBuf};

use bytes::BytesMut;
use ethers::types::U64;
use eyre::Result;
use flate2::{write::GzEncoder, Compression};
use prost::Message;

use super::{
database::{self, SnapshotDB},
types::{self, SnapshotFactoryDependency, SnapshotHeader},
types::{SnapshotFactoryDependency, SnapshotHeader},
DEFAULT_DB_PATH, SNAPSHOT_FACTORY_DEPS_FILE_NAME_SUFFIX, SNAPSHOT_HEADER_FILE_NAME,
};

pub mod protobuf {
include!(concat!(env!("OUT_DIR"), "/protobuf.rs"));
}
use crate::processor::snapshot::types::{
Proto, SnapshotFactoryDependencies, SnapshotStorageLogsChunk, SnapshotStorageLogsChunkMetadata,
};

pub struct SnapshotExporter {
basedir: PathBuf,
@@ -68,26 +61,15 @@ impl SnapshotExporter {
fn export_factory_deps(&self, header: &mut SnapshotHeader) -> Result<()> {
tracing::info!("Exporting factory dependencies...");

let mut buf = BytesMut::new();

let storage_logs = self.database.cf_handle(database::FACTORY_DEPS).unwrap();
let mut iterator = self
.database
.iterator_cf(storage_logs, rocksdb::IteratorMode::Start);

let mut factory_deps = protobuf::SnapshotFactoryDependencies::default();
let mut factory_deps = SnapshotFactoryDependencies::default();
while let Some(Ok((_, bs))) = iterator.next() {
let factory_dep: SnapshotFactoryDependency = bincode::deserialize(&bs)?;
factory_deps
.factory_deps
.push(protobuf::SnapshotFactoryDependency {
bytecode: Some(factory_dep.bytecode),
});
}

let fd_len = factory_deps.encoded_len();
if buf.capacity() < fd_len {
buf.reserve(fd_len - buf.capacity());
factory_deps.factory_deps.push(factory_dep);
}

let path = self.basedir.join(format!(
@@ -100,83 +82,44 @@ impl SnapshotExporter {
.into_string()
.expect("path to string");

// Serialize chunk.
factory_deps.encode(&mut buf)?;

let outfile = std::fs::OpenOptions::new()
.write(true)
.create(true)
.truncate(false)
.open(path)?;

// Wrap in gzip compression before writing.
let mut encoder = GzEncoder::new(outfile, Compression::default());
encoder.write_all(&buf)?;
encoder.finish()?;

factory_deps.encode(&path)?;
tracing::info!("All factory dependencies were successfully serialized!");
Ok(())
}

fn export_storage_logs(&self, chunk_size: u64, header: &mut SnapshotHeader) -> Result<()> {
tracing::info!("Exporting storage logs...");

let mut buf = BytesMut::new();
let mut chunk_id = 0;

let num_logs = self.database.get_last_repeated_key_index()?;
tracing::info!("Found {num_logs} logs.");

let total_num_chunks = (num_logs / chunk_size) + 1;

let index_to_key_map = self.database.cf_handle(database::INDEX_TO_KEY_MAP).unwrap();
let mut iterator = self
.database
.iterator_cf(index_to_key_map, rocksdb::IteratorMode::Start);

let mut has_more = true;

while has_more {
let total_num_chunks = (num_logs / chunk_size) + 1;
for chunk_id in 0..total_num_chunks {
tracing::info!("Serializing chunk {}/{}...", chunk_id + 1, total_num_chunks);

let mut chunk = protobuf::SnapshotStorageLogsChunk {
storage_logs: vec![],
};

let mut chunk = SnapshotStorageLogsChunk::default();
for _ in 0..chunk_size {
if let Some(Ok((_, key))) = iterator.next() {
if let Ok(Some(entry)) = self.database.get_storage_log(key.as_ref()) {
let pb = protobuf::SnapshotStorageLog {
account_address: None,
storage_key: Some(key.to_vec()),
storage_value: Some(entry.value.0.to_vec()),
l1_batch_number_of_initial_write: Some(
entry.l1_batch_number_of_initial_write.as_u32(),
),
enumeration_index: Some(entry.enumeration_index),
};

chunk.storage_logs.push(pb);
chunk.storage_logs.push(entry);
}
} else {
has_more = false;
break;
}
}

// Ensure that write buffer has enough capacity.
let chunk_len = chunk.encoded_len();
if buf.capacity() < chunk_len {
buf.reserve(chunk_len - buf.capacity());
}

let path = &self.basedir.join(format!(
let path = self.basedir.join(format!(
"snapshot_l1_batch_{}_storage_logs_part_{:0>4}.proto.gzip",
header.l1_batch_number, chunk_id
));

header
.storage_logs_chunks
.push(types::SnapshotStorageLogsChunkMetadata {
.push(SnapshotStorageLogsChunkMetadata {
chunk_id,
filepath: path
.clone()
@@ -185,25 +128,8 @@ impl SnapshotExporter {
.expect("path to string"),
});

// Serialize chunk.
chunk.encode(&mut buf)?;

let outfile = std::fs::OpenOptions::new()
.write(true)
.create(true)
.truncate(false)
.open(path)?;

// Wrap in gzip compression before writing.
let mut encoder = GzEncoder::new(outfile, Compression::default());
encoder.write_all(&buf)?;
encoder.finish()?;

// Clear $tmp buffer.
buf.truncate(0);

chunk.encode(&path)?;
tracing::info!("Chunk {} was successfully serialized!", chunk_id + 1);
chunk_id += 1;
}

tracing::info!("All storage logs were successfully serialized!");
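
As an example of the conversion half of the trait, an implementation for the factory-dependency types could reuse the field mapping the exporter previously performed inline (removed above). The module path, field types, and the assumption that the native struct has only a `factory_deps` field are sketch-level guesses, not the commit's actual `types.rs` code.

use eyre::{eyre, Result};

use crate::processor::snapshot::{
    protobuf,
    types::{Proto, SnapshotFactoryDependencies, SnapshotFactoryDependency},
};

impl Proto for SnapshotFactoryDependencies {
    type ProtoStruct = protobuf::SnapshotFactoryDependencies;

    fn to_proto(&self) -> Self::ProtoStruct {
        Self::ProtoStruct {
            // Same mapping the exporter used to do inline: wrap each bytecode in `Some`.
            factory_deps: self
                .factory_deps
                .iter()
                .map(|dep| protobuf::SnapshotFactoryDependency {
                    bytecode: Some(dep.bytecode.clone()),
                })
                .collect(),
        }
    }

    fn from_proto(proto: Self::ProtoStruct) -> Result<Self> {
        let factory_deps = proto
            .factory_deps
            .into_iter()
            .map(|dep| {
                Ok(SnapshotFactoryDependency {
                    bytecode: dep.bytecode.ok_or_else(|| eyre!("missing bytecode"))?,
                })
            })
            .collect::<Result<Vec<_>>>()?;
        Ok(Self { factory_deps })
    }
}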
24 changes: 3 additions & 21 deletions src/processor/snapshot/importer.rs
@@ -1,19 +1,15 @@
use std::{
fs,
io::Read,
path::{Path, PathBuf},
sync::Arc,
};

use eyre::Result;
use flate2::read::GzDecoder;
use prost::Message;
use state_reconstruct_fetcher::{constants::storage::INNER_DB_NAME, database::InnerDB};
use tokio::sync::Mutex;

use super::{
exporter::protobuf::{SnapshotFactoryDependencies, SnapshotStorageLogsChunk},
types::SnapshotHeader,
types::{Proto, SnapshotFactoryDependencies, SnapshotHeader, SnapshotStorageLogsChunk},
SNAPSHOT_FACTORY_DEPS_FILE_NAME_SUFFIX, SNAPSHOT_HEADER_FILE_NAME,
};
use crate::processor::tree::tree_wrapper::TreeWrapper;
@@ -59,13 +55,7 @@ impl SnapshotImporter {
header.l1_batch_number, SNAPSHOT_FACTORY_DEPS_FILE_NAME_SUFFIX
));
let bytes = fs::read(factory_deps_path)?;
let mut decoder = GzDecoder::new(&bytes[..]);

let mut decompressed_bytes = Vec::new();
decoder.read_to_end(&mut decompressed_bytes)?;

let factory_deps = SnapshotFactoryDependencies::decode(&decompressed_bytes[..])?;
Ok(factory_deps)
SnapshotFactoryDependencies::decode(&bytes)
}

fn read_storage_logs_chunks(
@@ -85,15 +75,7 @@
.directory
.join(path.file_name().expect("path has no file name"));
let bytes = fs::read(factory_deps_path)?;
let mut decoder = GzDecoder::new(&bytes[..]);

let mut decompressed_bytes = Vec::new();
decoder.read_to_end(&mut decompressed_bytes)?;

// TODO: It would be nice to avoid the intermediary step of decoding. Something like
// implementing a method on the types::* that does it automatically. Will improve
// readabitly for the export code too as a bonus.
let storage_logs_chunk = SnapshotStorageLogsChunk::decode(&decompressed_bytes[..])?;
let storage_logs_chunk = SnapshotStorageLogsChunk::decode(&bytes)?;
chunks.push(storage_logs_chunk);
}
Ok(chunks)
6 changes: 5 additions & 1 deletion src/processor/snapshot/mod.rs
@@ -3,9 +3,9 @@ use std::{fs, path::PathBuf, str::FromStr};
pub mod database;
pub mod exporter;
pub mod importer;
pub mod types;

mod bytecode;
mod types;

use async_trait::async_trait;
use blake2::{Blake2s256, Digest};
@@ -28,6 +28,10 @@ pub const DEFAULT_DB_PATH: &str = "snapshot_db";
pub const SNAPSHOT_HEADER_FILE_NAME: &str = "snapshot-header.json";
pub const SNAPSHOT_FACTORY_DEPS_FILE_NAME_SUFFIX: &str = "factory_deps.proto.gzip";

pub mod protobuf {
include!(concat!(env!("OUT_DIR"), "/protobuf.rs"));
}

pub struct SnapshotBuilder {
database: SnapshotDB,
}
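
The `protobuf` module relocated into `mod.rs` above still depends on a prost build script that generates `$OUT_DIR/protobuf.rs` (prost-build names the file after the protobuf package). That script is untouched by this commit; a typical setup, with assumed `.proto` paths, looks roughly like:

// build.rs (sketch; the .proto paths are assumptions)
fn main() -> Result<(), Box<dyn std::error::Error>> {
    // prost-build writes the generated code to $OUT_DIR, named after the protobuf
    // package (here `protobuf`, matching the include! in mod.rs).
    prost_build::compile_protos(&["proto/snapshot.proto"], &["proto/"])?;
    Ok(())
}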