From 0cdd30251b2931f5c2f759efbad79619ab08a2de Mon Sep 17 00:00:00 2001
From: joshieDo <93316087+joshieDo@users.noreply.github.com>
Date: Wed, 27 Sep 2023 15:13:18 +0100
Subject: [PATCH] feat: create a `NippyJar` snapshot from multiple `Table`
 (#4716)

---
 Cargo.lock                                    |   4 +
 bin/reth/src/utils.rs                         |  14 +-
 crates/interfaces/Cargo.toml                  |   1 +
 crates/interfaces/src/error.rs                |   6 +
 crates/storage/db/Cargo.toml                  |   2 +
 crates/storage/db/src/abstraction/table.rs    |   5 +
 crates/storage/db/src/lib.rs                  |   1 +
 crates/storage/db/src/snapshot.rs             |  87 ++++++++
 crates/storage/db/src/tables/raw.rs           |  20 +-
 crates/storage/db/src/tables/utils.rs         |   6 +-
 crates/storage/nippy-jar/src/cursor.rs        |   2 +-
 crates/storage/nippy-jar/src/error.rs         |   2 +
 crates/storage/nippy-jar/src/lib.rs           |  79 ++++---
 crates/storage/nippy-jar/src/phf/fmph.rs      |   8 +-
 crates/storage/nippy-jar/src/phf/go_fmph.rs   |   8 +-
 crates/storage/nippy-jar/src/phf/mod.rs       |  14 +-
 crates/storage/provider/Cargo.toml            |   2 +
 crates/storage/provider/src/providers/mod.rs  |   2 +
 .../provider/src/providers/snapshot.rs        | 205 ++++++++++++++++++
 19 files changed, 409 insertions(+), 59 deletions(-)
 create mode 100644 crates/storage/db/src/snapshot.rs
 create mode 100644 crates/storage/provider/src/providers/snapshot.rs

diff --git a/Cargo.lock b/Cargo.lock
index 7dae426b43d3..54046e1e2462 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -5527,6 +5527,7 @@ dependencies = [
  "reth-interfaces",
  "reth-libmdbx",
  "reth-metrics",
+ "reth-nippy-jar",
  "reth-primitives",
  "secp256k1",
  "serde",
@@ -5697,6 +5698,7 @@ dependencies = [
  "reth-db",
  "reth-eth-wire",
  "reth-network-api",
+ "reth-nippy-jar",
  "reth-primitives",
  "reth-rpc-types",
  "revm-primitives",
@@ -5971,9 +5973,11 @@ dependencies = [
  "itertools 0.11.0",
  "parking_lot 0.12.1",
  "pin-project",
+ "rand 0.8.5",
  "rayon",
  "reth-db",
  "reth-interfaces",
+ "reth-nippy-jar",
  "reth-primitives",
  "reth-revm-primitives",
  "reth-rlp",
diff --git a/bin/reth/src/utils.rs b/bin/reth/src/utils.rs
index 64a798dc9e1e..57519da6428f 100644
--- a/bin/reth/src/utils.rs
+++ b/bin/reth/src/utils.rs
@@ -6,7 +6,7 @@ use reth_consensus_common::validation::validate_block_standalone;
 use reth_db::{
     cursor::DbCursorRO,
     database::Database,
-    table::{Table, TableRow},
+    table::{Decode, Decompress, Table, TableRow},
     transaction::{DbTx, DbTxMut},
     DatabaseError, RawTable, TableRawRow,
 };
@@ -128,16 +128,22 @@ impl<'a, DB: Database> DbTool<'a, DB> {
 
         let map_filter = |row: Result<TableRawRow<T>, _>| {
             if let Ok((k, v)) = row {
+                let (key, value) = (k.into_key(), v.into_value());
+
                 let result = || {
                     if filter.only_count {
                         return None
                     }
-                    Some((k.key().unwrap(), v.value().unwrap()))
+                    Some((
+                        <T as Table>::Key::decode(&key).unwrap(),
+                        <T as Table>::Value::decompress(&value).unwrap(),
+                    ))
                 };
+
                 match &*bmb {
                     Some(searcher) => {
-                        if searcher.find_first_in(v.raw_value()).is_some() ||
-                            searcher.find_first_in(k.raw_key()).is_some()
+                        if searcher.find_first_in(&value).is_some() ||
+                            searcher.find_first_in(&key).is_some()
                         {
                             hits += 1;
                             return result()
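Note on the `bin/reth/src/utils.rs` hunk above: the search now runs over the raw, still-encoded key/value bytes, and a row is only decoded/decompressed on a hit. A minimal sketch of that pattern, not taken from the patch; `memchr::memmem::Finder` stands in for the searcher (`bmb`) used by `DbTool`, and error handling is reduced to `Option`:

    use reth_db::table::{Decode, Decompress, Table};

    /// Returns the decoded row only if the raw bytes contain the needle.
    fn filter_raw<T: Table>(
        key: Vec<u8>,
        value: Vec<u8>,
        searcher: &memchr::memmem::Finder<'_>,
    ) -> Option<(T::Key, T::Value)> {
        if searcher.find(&value).is_some() || searcher.find(&key).is_some() {
            // Decode/decompress lazily, only for matching rows.
            let k = <T as Table>::Key::decode(&key).ok()?;
            let v = <T as Table>::Value::decompress(&value).ok()?;
            return Some((k, v))
        }
        None
    }
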
diff --git a/crates/interfaces/Cargo.toml b/crates/interfaces/Cargo.toml
index 158d03611664..bc2f35a4d6ae 100644
--- a/crates/interfaces/Cargo.toml
+++ b/crates/interfaces/Cargo.toml
@@ -9,6 +9,7 @@ repository.workspace = true
 
 [dependencies]
 reth-codecs = { path = "../storage/codecs" }
+reth-nippy-jar = { path = "../storage/nippy-jar" }
 reth-primitives.workspace = true
 reth-rpc-types.workspace = true
 reth-network-api.workspace = true
diff --git a/crates/interfaces/src/error.rs b/crates/interfaces/src/error.rs
index 32d81e4f9823..85d0cdca6fb9 100644
--- a/crates/interfaces/src/error.rs
+++ b/crates/interfaces/src/error.rs
@@ -29,3 +29,9 @@ pub enum RethError {
     #[error("{0}")]
     Custom(String),
 }
+
+impl From<reth_nippy_jar::NippyJarError> for RethError {
+    fn from(err: reth_nippy_jar::NippyJarError) -> Self {
+        RethError::Custom(err.to_string())
+    }
+}
diff --git a/crates/storage/db/Cargo.toml b/crates/storage/db/Cargo.toml
index 029039aa29ce..a04d895088f7 100644
--- a/crates/storage/db/Cargo.toml
+++ b/crates/storage/db/Cargo.toml
@@ -14,6 +14,7 @@ reth-primitives.workspace = true
 reth-interfaces.workspace = true
 reth-codecs = { path = "../codecs" }
 reth-libmdbx = { path = "../libmdbx-rs", optional = true, features = ["return-borrowed"] }
+reth-nippy-jar = { path = "../nippy-jar" }
 
 # codecs
 serde = { workspace = true, default-features = false }
@@ -42,6 +43,7 @@ tempfile = { version = "3.3.0", optional = true }
 parking_lot.workspace = true
 derive_more = "0.99"
 eyre = "0.6.8"
+paste = "1.0"
 
 # arbitrary utils
 arbitrary = { workspace = true, features = ["derive"], optional = true }
diff --git a/crates/storage/db/src/abstraction/table.rs b/crates/storage/db/src/abstraction/table.rs
index 668bdf6998bb..81193bcc7386 100644
--- a/crates/storage/db/src/abstraction/table.rs
+++ b/crates/storage/db/src/abstraction/table.rs
@@ -35,6 +35,11 @@ pub trait Compress: Send + Sync + Sized + Debug {
 pub trait Decompress: Send + Sync + Sized + Debug {
     /// Decompresses data coming from the database.
     fn decompress<B: AsRef<[u8]>>(value: B) -> Result<Self, DatabaseError>;
+
+    /// Decompresses owned data coming from the database.
+    fn decompress_owned(value: Vec<u8>) -> Result<Self, DatabaseError> {
+        Self::decompress(value)
+    }
 }
 
 /// Trait that will transform the data to be saved in the DB.
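The new `decompress_owned` hook above defaults to the borrowing `decompress`, so existing implementors are untouched; a type that can take ownership of the database buffer overrides it to skip a copy. A self-contained sketch of the idea, with a hypothetical `Bytes` type and local trait/error mirrors that are not part of this patch:

    use std::fmt::Debug;

    #[derive(Debug)]
    pub struct DatabaseError;

    pub trait Decompress: Send + Sync + Sized + Debug {
        fn decompress<B: AsRef<[u8]>>(value: B) -> Result<Self, DatabaseError>;

        /// Default falls back to the borrowing path.
        fn decompress_owned(value: Vec<u8>) -> Result<Self, DatabaseError> {
            Self::decompress(value)
        }
    }

    #[derive(Debug)]
    struct Bytes(Vec<u8>);

    impl Decompress for Bytes {
        fn decompress<B: AsRef<[u8]>>(value: B) -> Result<Self, DatabaseError> {
            Ok(Bytes(value.as_ref().to_vec())) // copies the slice
        }

        fn decompress_owned(value: Vec<u8>) -> Result<Self, DatabaseError> {
            Ok(Bytes(value)) // reuses the allocation
        }
    }

`RawValue` later in this patch overrides it exactly this way, and `tables/utils.rs` routes `Cow::Owned` buffers through it.
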
diff --git a/crates/storage/db/src/lib.rs b/crates/storage/db/src/lib.rs
index cb705871d874..d6e0da017c68 100644
--- a/crates/storage/db/src/lib.rs
+++ b/crates/storage/db/src/lib.rs
@@ -68,6 +68,7 @@ pub mod abstraction;
 
 mod implementation;
+pub mod snapshot;
 pub mod tables;
 mod utils;
 pub mod version;
diff --git a/crates/storage/db/src/snapshot.rs b/crates/storage/db/src/snapshot.rs
new file mode 100644
index 000000000000..cea6350d68ca
--- /dev/null
+++ b/crates/storage/db/src/snapshot.rs
@@ -0,0 +1,87 @@
+//! reth's snapshot creation from database tables
+
+use crate::{
+    abstraction::cursor::DbCursorRO,
+    table::{Key, Table},
+    transaction::DbTx,
+    RawKey, RawTable,
+};
+use reth_interfaces::RethResult;
+use reth_nippy_jar::{ColumnResult, NippyJar, PHFKey};
+use std::{error::Error as StdError, ops::RangeInclusive};
+
+/// Macro that generates snapshot creation functions that take an arbitrary number of [`Table`]
+/// and create a [`NippyJar`] file out of their [`Table::Value`]. Each list of [`Table::Value`]
+/// from a table is a column of values.
+///
+/// Has membership filter set and compression dictionary support.
+macro_rules! generate_snapshot_func {
+    ($(($($tbl:ident),+)),+ $(,)? ) => {
+        $(
+            paste::item! {
+                /// Creates a snapshot from the specified tables. Each table's `Value` iterator represents a column.
+                ///
+                /// **Ensure the range contains the same number of rows.**
+                ///
+                /// * `tx`: Database transaction.
+                /// * `range`: Data range for columns in tables.
+                /// * `keys`: Iterator of keys (e.g. `TxHash` or `BlockHash`) with length equal to `row_count` and ordered by future column insertion from `range`.
+                /// * `dict_compression_set`: Sets of column data for compression dictionaries. Max size is 2GB. Row count is independent.
+                /// * `row_count`: Total rows to add to `NippyJar`. Must match the row count in `range`.
+                /// * `nippy_jar`: Snapshot object responsible for file generation.
+                #[allow(non_snake_case)]
+                pub fn [<create_snapshot$(_ $tbl)+>]<'tx,
+                    $($tbl: Table<Key = K>,)+
+                    K
+                >
+                (
+                    tx: &impl DbTx<'tx>,
+                    range: RangeInclusive<K>,
+                    dict_compression_set: Option<Vec<impl Iterator<Item = Vec<u8>>>>,
+                    keys: Option<impl Iterator<Item = ColumnResult<impl PHFKey>>>,
+                    row_count: usize,
+                    nippy_jar: &mut NippyJar
+                ) -> RethResult<()>
+                    where K: Key + Copy
+                {
+                    let range: RangeInclusive<RawKey<K>> = RawKey::new(*range.start())..=RawKey::new(*range.end());
+
+                    // Create PHF and Filter if required
+                    if let Some(keys) = keys {
+                        nippy_jar.prepare_index(keys, row_count)?;
+                    }
+
+                    // Create compression dictionaries if required
+                    if let Some(data_sets) = dict_compression_set {
+                        nippy_jar.prepare_compression(data_sets)?;
+                    }
+
+                    // Creates the cursors for the columns
+                    $(
+                        let mut [< $tbl _cursor>] = tx.cursor_read::<RawTable<$tbl>>()?;
+                        let [< $tbl _iter>] = [< $tbl _cursor>]
+                            .walk_range(range.clone())?
+                            .into_iter()
+                            .map(|row|
+                                row
+                                    .map(|(_key, val)| val.into_value())
+                                    .map_err(|e| Box::new(e) as Box<dyn StdError + Send + Sync>)
+                            );
+
+                    )+
+
+                    // Create the snapshot from the data
+                    let col_iterators: Vec<Box<dyn Iterator<Item = Result<Vec<u8>, _>>>> = vec![
+                        $(Box::new([< $tbl _iter>]),)+
+                    ];
+
+                    nippy_jar.freeze(col_iterators, row_count as u64)?;
+
+                    Ok(())
+                }
+            }
+        )+
+    };
+}
+
+generate_snapshot_func!((T1), (T1, T2), (T1, T2, T3), (T1, T2, T3, T4),);
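For reference, a condensed usage sketch of the generated `create_snapshot_T1_T2` (illustrative, not part of the patch; the test at the end of this PR does the same with real data, and `freeze_headers`/`snapshot_path` are made-up names here):

    use reth_db::{snapshot::create_snapshot_T1_T2, transaction::DbTx, HeaderTD, Headers};
    use reth_interfaces::RethResult;
    use reth_nippy_jar::{ColumnResult, NippyJar};
    use reth_primitives::BlockNumber;

    /// Freezes blocks 0..=99 of `Headers` + `HeaderTD` into one two-column jar.
    fn freeze_headers<'tx>(
        tx: &impl DbTx<'tx>,
        snapshot_path: &std::path::Path,
    ) -> RethResult<()> {
        let mut jar = NippyJar::new_without_header(2, snapshot_path);

        // Passing a bare `None` here would leave the `impl Iterator` parameter
        // without a concrete type; the test below works around the same issue
        // with a pre-drained `Some(..)` ("hacky type inference").
        let mut no_dicts = Some(vec![vec![vec![0u8]].into_iter()]);
        let _ = no_dicts.take();

        create_snapshot_T1_T2::<Headers, HeaderTD, BlockNumber>(
            tx,
            0..=99,   // key range; must hold exactly `row_count` rows
            no_dicts, // no compression dictionaries
            None::<std::iter::Empty<ColumnResult<Vec<u8>>>>, // no filter/PHF keys
            100,      // row_count
            &mut jar,
        )
    }
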
diff --git a/crates/storage/db/src/tables/raw.rs b/crates/storage/db/src/tables/raw.rs
index 00f5db2a142f..6e1db5f68e60 100644
--- a/crates/storage/db/src/tables/raw.rs
+++ b/crates/storage/db/src/tables/raw.rs
@@ -54,14 +54,21 @@ impl<K: Key> RawKey<K> {
     pub fn new(key: K) -> Self {
         Self { key: K::encode(key).as_ref().to_vec(), _phantom: std::marker::PhantomData }
     }
+
     /// Returns the decoded value.
     pub fn key(&self) -> Result<K, DatabaseError> {
         K::decode(&self.key)
     }
+
     /// Returns the raw key as seen on the database.
     pub fn raw_key(&self) -> &Vec<u8> {
         &self.key
     }
+
+    /// Consumes [`Self`] and returns the inner raw key.
+    pub fn into_key(self) -> Vec<u8> {
+        self.key
+    }
 }
 
 impl<K: Key> From<K> for RawKey<K> {
@@ -105,14 +112,21 @@ impl<V: Value> RawValue<V> {
     pub fn new(value: V) -> Self {
         Self { value: V::compress(value).as_ref().to_vec(), _phantom: std::marker::PhantomData }
     }
+
     /// Returns the decompressed value.
     pub fn value(&self) -> Result<V, DatabaseError> {
         V::decompress(&self.value)
     }
+
     /// Returns the raw value as seen on the database.
-    pub fn raw_value(&self) -> &Vec<u8> {
+    pub fn raw_value(&self) -> &[u8] {
         &self.value
     }
+
+    /// Consumes [`Self`] and returns the inner raw value.
+    pub fn into_value(self) -> Vec<u8> {
+        self.value
+    }
 }
 
 impl<V: Value> AsRef<[u8]> for RawValue<V> {
@@ -142,4 +156,8 @@ impl<V: Value> Decompress for RawValue<V> {
     fn decompress<B: AsRef<[u8]>>(value: B) -> Result<Self, DatabaseError> {
         Ok(Self { value: value.as_ref().to_vec(), _phantom: std::marker::PhantomData })
     }
+
+    fn decompress_owned(value: Vec<u8>) -> Result<Self, DatabaseError> {
+        Ok(Self { value, _phantom: std::marker::PhantomData })
+    }
 }
diff --git a/crates/storage/db/src/tables/utils.rs b/crates/storage/db/src/tables/utils.rs
index 13bd1ce278e6..5e82eab62611 100644
--- a/crates/storage/db/src/tables/utils.rs
+++ b/crates/storage/db/src/tables/utils.rs
@@ -54,7 +54,7 @@ where
     };
     let value = match kv.1 {
         Cow::Borrowed(v) => Decompress::decompress(v)?,
-        Cow::Owned(v) => Decompress::decompress(v)?,
+        Cow::Owned(v) => Decompress::decompress_owned(v)?,
     };
     Ok((key, value))
 }
@@ -68,7 +68,7 @@ where
 {
     Ok(match kv.1 {
         Cow::Borrowed(v) => Decompress::decompress(v)?,
-        Cow::Owned(v) => Decompress::decompress(v)?,
+        Cow::Owned(v) => Decompress::decompress_owned(v)?,
     })
 }
@@ -79,6 +79,6 @@ where
 {
     Ok(match value {
         Cow::Borrowed(v) => Decompress::decompress(v)?,
-        Cow::Owned(v) => Decompress::decompress(v)?,
+        Cow::Owned(v) => Decompress::decompress_owned(v)?,
     })
 }
diff --git a/crates/storage/nippy-jar/src/cursor.rs b/crates/storage/nippy-jar/src/cursor.rs
index 1ea0fea106cf..1fd7170c9010 100644
--- a/crates/storage/nippy-jar/src/cursor.rs
+++ b/crates/storage/nippy-jar/src/cursor.rs
@@ -9,7 +9,7 @@ use sucds::int_vectors::Access;
 use zstd::bulk::Decompressor;
 
 /// Simple cursor implementation to retrieve data from [`NippyJar`].
-pub struct NippyJarCursor<'a, H> {
+pub struct NippyJarCursor<'a, H = ()> {
     /// [`NippyJar`] which holds most of the required configuration to read from the file.
     jar: &'a NippyJar<H>,
     /// Optional dictionary decompressors.
diff --git a/crates/storage/nippy-jar/src/error.rs b/crates/storage/nippy-jar/src/error.rs
index 86353d0a794d..1ea1b0f4d094 100644
--- a/crates/storage/nippy-jar/src/error.rs
+++ b/crates/storage/nippy-jar/src/error.rs
@@ -3,6 +3,8 @@ use thiserror::Error;
 /// Errors associated with [`crate::NippyJar`].
 #[derive(Debug, Error)]
 pub enum NippyJarError {
+    #[error(transparent)]
+    Internal(#[from] Box<dyn std::error::Error + Send + Sync>),
     #[error(transparent)]
     Disconnect(#[from] std::io::Error),
     #[error(transparent)]
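The `Internal` variant above is what lets `freeze` surface errors produced by the column iterators themselves: each column now yields `ColumnResult<Vec<u8>>` (defined in `lib.rs` below as `Result<T, Box<dyn Error + Send + Sync>>`), and any `Err` is forwarded via `err.into()`. A small self-contained sketch of that plumbing, with illustrative names:

    use std::error::Error as StdError;

    pub type ColumnResult<T> = Result<T, Box<dyn StdError + Send + Sync>>;

    /// Adapts a fallible database walk into a column iterator by boxing the
    /// concrete error type, mirroring what `create_snapshot_*` does above.
    fn as_column<E: StdError + Send + Sync + 'static>(
        rows: impl Iterator<Item = Result<Vec<u8>, E>>,
    ) -> impl Iterator<Item = ColumnResult<Vec<u8>>> {
        rows.map(|r| r.map_err(|e| Box::new(e) as Box<dyn StdError + Send + Sync>))
    }
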
diff --git a/crates/storage/nippy-jar/src/lib.rs b/crates/storage/nippy-jar/src/lib.rs
index 2626bbeb78c5..f2dfd0dabf2c 100644
--- a/crates/storage/nippy-jar/src/lib.rs
+++ b/crates/storage/nippy-jar/src/lib.rs
@@ -13,8 +13,8 @@ use serde::{Deserialize, Serialize};
 use std::{
     clone::Clone,
+    error::Error as StdError,
     fs::File,
-    hash::Hash,
     io::{Seek, Write},
     marker::Sync,
     path::{Path, PathBuf},
@@ -32,6 +32,7 @@ pub mod compression;
 use compression::{Compression, Compressors};
 
 pub mod phf;
+pub use phf::PHFKey;
 use phf::{Fmph, Functions, GoFmph, PerfectHashingFunction};
 
 mod error;
@@ -45,6 +46,9 @@ const NIPPY_JAR_VERSION: usize = 1;
 /// A [`Row`] is a list of its selected column values.
 type Row = Vec<Vec<u8>>;
 
+/// Alias type for a column value wrapped in `Result`.
+pub type ColumnResult<T> = Result<T, Box<dyn StdError + Send + Sync>>;
+
 /// `NippyJar` is a specialized storage format designed for immutable data.
 ///
 /// Data is organized into a columnar format, enabling column-based compression. Data retrieval
@@ -106,6 +110,11 @@ impl NippyJar<()> {
     pub fn load_without_header(path: &Path) -> Result<Self, NippyJarError> {
         NippyJar::<()>::load(path)
     }
+
+    /// Whether this [`NippyJar`] uses [`InclusionFilters`] and [`Functions`].
+    pub fn uses_filters(&self) -> bool {
+        self.filter.is_some() && self.phf.is_some()
+    }
 }
 
 impl<H> NippyJar<H>
 where
@@ -210,19 +219,23 @@ where
     /// Prepares beforehand the offsets index for querying rows based on `values` (eg. transaction
     /// hash). Expects `values` to be sorted in the same way as the data that is going to be
     /// later on inserted.
-    pub fn prepare_index<T: AsRef<[u8]> + Sync + Clone + Hash>(
+    ///
+    /// Currently collects all items before acting on them.
+    pub fn prepare_index<T: PHFKey>(
         &mut self,
-        values: &[T],
+        values: impl IntoIterator<Item = ColumnResult<T>>,
+        row_count: usize,
     ) -> Result<(), NippyJarError> {
-        let mut offsets_index = vec![0; values.len()];
+        let values = values.into_iter().collect::<Result<Vec<_>, _>>()?;
+        let mut offsets_index = vec![0; row_count];
 
         // Builds perfect hashing function from the values
         if let Some(phf) = self.phf.as_mut() {
-            phf.set_keys(values)?;
+            phf.set_keys(&values)?;
         }
 
         if self.filter.is_some() || self.phf.is_some() {
-            for (row_num, v) in values.iter().enumerate() {
+            for (row_num, v) in values.into_iter().enumerate() {
                 if let Some(filter) = self.filter.as_mut() {
                     filter.add(v.as_ref())?;
                 }
@@ -242,7 +255,7 @@ where
     /// Writes all data and configuration to a file and the offset index to another.
     pub fn freeze(
         &mut self,
-        columns: Vec<impl IntoIterator<Item = Vec<u8>>>,
+        columns: Vec<impl IntoIterator<Item = ColumnResult<Vec<u8>>>>,
         total_rows: u64,
     ) -> Result<(), NippyJarError> {
         let mut file = self.freeze_check(&columns)?;
@@ -275,7 +288,7 @@ where
                 offsets.push(file.stream_position()? as usize);
 
                 match column_iter.next() {
-                    Some(value) => {
+                    Some(Ok(value)) => {
                         if let Some(compression) = &self.compressor {
                             // Special zstd case with dictionaries
                             if let (Some(dict_compressors), Compressors::Zstd(_)) =
@@ -300,6 +313,7 @@ where
                             column_number as u64,
                         ))
                     }
+                    Some(Err(err)) => return Err(err.into()),
                 }
 
                 iterators.push(column_iter);
@@ -339,7 +353,7 @@ where
     /// Safety checks before creating and returning a [`File`] handle to write data to.
     fn freeze_check(
         &mut self,
-        columns: &Vec<impl IntoIterator<Item = Vec<u8>>>,
+        columns: &Vec<impl IntoIterator<Item = ColumnResult<Vec<u8>>>>,
     ) -> Result<File, NippyJarError> {
         if columns.len() != self.columns {
             return Err(NippyJarError::ColumnLenMismatch(self.columns, columns.len()))
@@ -384,10 +398,7 @@ impl<H> PerfectHashingFunction for NippyJar<H>
 where
     H: Send + Sync + Serialize + for<'a> Deserialize<'a>,
 {
-    fn set_keys<T: AsRef<[u8]> + Sync + Clone + Hash>(
-        &mut self,
-        keys: &[T],
-    ) -> Result<(), NippyJarError> {
+    fn set_keys<T: PHFKey>(&mut self, keys: &[T]) -> Result<(), NippyJarError> {
         self.phf.as_mut().ok_or(NippyJarError::PHFMissing)?.set_keys(keys)
     }
 
@@ -402,6 +413,7 @@ mod tests {
     use rand::{rngs::SmallRng, seq::SliceRandom, RngCore, SeedableRng};
     use std::collections::HashSet;
 
+    type ColumnResults<T> = Vec<ColumnResult<T>>;
     type ColumnValues = Vec<Vec<u8>>;
 
     fn test_data(seed: Option<u64>) -> (ColumnValues, ColumnValues) {
@@ -423,6 +435,10 @@ mod tests {
         (gen(), gen())
     }
 
+    fn clone_with_result(col: &ColumnValues) -> ColumnResults<Vec<u8>> {
+        col.iter().map(|v| Ok(v.clone())).collect()
+    }
+
     #[test]
     fn test_phf() {
         let (col1, col2) = test_data(None);
@@ -455,8 +471,10 @@ mod tests {
             assert_eq!(indexes, collect_indexes(nippy));
 
             // Ensure that loaded phf provides the same function outputs
-            nippy.prepare_index(&col1).unwrap();
-            nippy.freeze(vec![col1.clone(), col2.clone()], num_rows).unwrap();
+            nippy.prepare_index(clone_with_result(&col1), col1.len()).unwrap();
+            nippy
+                .freeze(vec![clone_with_result(&col1), clone_with_result(&col2)], num_rows)
+                .unwrap();
             let loaded_nippy = NippyJar::load_without_header(file_path.path()).unwrap();
             assert_eq!(indexes, collect_indexes(&loaded_nippy));
         };
@@ -504,7 +522,7 @@ mod tests {
             Err(NippyJarError::FilterMaxCapacity)
         ));
 
-        nippy.freeze(vec![col1.clone(), col2.clone()], num_rows).unwrap();
+        nippy.freeze(vec![clone_with_result(&col1), clone_with_result(&col2)], num_rows).unwrap();
         let loaded_nippy = NippyJar::load_without_header(file_path.path()).unwrap();
 
         assert_eq!(nippy, loaded_nippy);
@@ -540,16 +558,14 @@ mod tests {
             ));
         }
 
-        let data = vec![col1.clone(), col2.clone()];
-
         // If ZSTD is enabled, do not write to the file unless the column dictionaries have been
         // calculated.
         assert!(matches!(
-            nippy.freeze(data.clone(), num_rows),
+            nippy.freeze(vec![clone_with_result(&col1), clone_with_result(&col2)], num_rows),
             Err(NippyJarError::CompressorNotReady)
         ));
 
-        nippy.prepare_compression(data.clone()).unwrap();
+        nippy.prepare_compression(vec![col1.clone(), col2.clone()]).unwrap();
 
         if let Some(Compressors::Zstd(zstd)) = &nippy.compressor {
             assert!(matches!(
@@ -558,7 +574,7 @@ mod tests {
             ));
         }
 
-        nippy.freeze(data.clone(), num_rows).unwrap();
+        nippy.freeze(vec![clone_with_result(&col1), clone_with_result(&col2)], num_rows).unwrap();
 
         let mut loaded_nippy = NippyJar::load_without_header(file_path.path()).unwrap();
         assert_eq!(nippy, loaded_nippy);
@@ -578,7 +594,7 @@ mod tests {
             // Iterate over compressed values and compare
             let mut row_index = 0usize;
             while let Some(row) = cursor.next_row().unwrap() {
-                assert_eq!((&row[0], &row[1]), (&data[0][row_index], &data[1][row_index]));
+                assert_eq!((&row[0], &row[1]), (&col1[row_index], &col2[row_index]));
                 row_index += 1;
             }
         }
@@ -598,9 +614,7 @@ mod tests {
             NippyJar::new_without_header(num_columns, file_path.path()).with_zstd(false, 5000);
         assert!(nippy.compressor.is_some());
 
-        let data = vec![col1.clone(), col2.clone()];
-
-        nippy.freeze(data.clone(), num_rows).unwrap();
+        nippy.freeze(vec![clone_with_result(&col1), clone_with_result(&col2)], num_rows).unwrap();
 
         let loaded_nippy = NippyJar::load_without_header(file_path.path()).unwrap();
         assert_eq!(nippy, loaded_nippy);
@@ -613,7 +627,7 @@ mod tests {
             // Iterate over compressed values and compare
             let mut row_index = 0usize;
             while let Some(row) = cursor.next_row().unwrap() {
-                assert_eq!((&row[0], &row[1]), (&data[0][row_index], &data[1][row_index]));
+                assert_eq!((&row[0], &row[1]), (&col1[row_index], &col2[row_index]));
                 row_index += 1;
             }
         } else {
@@ -629,6 +643,7 @@ mod tests {
         let num_columns = 2;
         let file_path = tempfile::NamedTempFile::new().unwrap();
         let data = vec![col1.clone(), col2.clone()];
+
         let block_start = 500;
 
         #[derive(Serialize, Deserialize, Debug)]
@@ -645,8 +660,10 @@ mod tests {
                 .with_mphf();
 
             nippy.prepare_compression(data.clone()).unwrap();
-            nippy.prepare_index(&col1).unwrap();
-            nippy.freeze(data.clone(), num_rows).unwrap();
+            nippy.prepare_index(clone_with_result(&col1), col1.len()).unwrap();
+            nippy
+                .freeze(vec![clone_with_result(&col1), clone_with_result(&col2)], num_rows)
+                .unwrap();
         }
 
         // Read file
@@ -710,8 +727,10 @@ mod tests {
                 .with_mphf();
 
             nippy.prepare_compression(data.clone()).unwrap();
-            nippy.prepare_index(&col1).unwrap();
-            nippy.freeze(data.clone(), num_rows).unwrap();
+            nippy.prepare_index(clone_with_result(&col1), col1.len()).unwrap();
+            nippy
+                .freeze(vec![clone_with_result(&col1), clone_with_result(&col2)], num_rows)
+                .unwrap();
         }
 
         // Read file
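Condensing what the tests above exercise, the intended write/read round trip looks roughly like this. A sketch only, assuming two small in-memory columns, the default `()` header type, and generous filter capacity:

    use reth_nippy_jar::{ColumnResult, NippyJar, NippyJarCursor};

    fn roundtrip(path: &std::path::Path) -> Result<(), Box<dyn std::error::Error>> {
        let col1: Vec<Vec<u8>> = (0u64..10).map(|n| n.to_be_bytes().to_vec()).collect();
        let col2 = col1.clone();

        let mut jar = NippyJar::new_without_header(2, path)
            .with_cuckoo_filter(100) // capacity with slack, as in the tests
            .with_mphf();

        // Index on the first column's values, then freeze both columns.
        let ok = |col: &Vec<Vec<u8>>| -> Vec<ColumnResult<Vec<u8>>> {
            col.iter().map(|v| Ok(v.clone())).collect()
        };
        jar.prepare_index(ok(&col1), col1.len())?;
        jar.freeze(vec![ok(&col1), ok(&col2)], 10)?;

        // Reload the configuration and walk the rows back.
        let loaded = NippyJar::load_without_header(path)?;
        let mut cursor = NippyJarCursor::new(&loaded, None)?;
        let mut n = 0;
        while let Some(row) = cursor.next_row()? {
            assert_eq!((&row[0], &row[1]), (&col1[n], &col2[n]));
            n += 1;
        }
        Ok(())
    }
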
diff --git a/crates/storage/nippy-jar/src/phf/fmph.rs b/crates/storage/nippy-jar/src/phf/fmph.rs
index 62740e48a25b..e540bae64fe5 100644
--- a/crates/storage/nippy-jar/src/phf/fmph.rs
+++ b/crates/storage/nippy-jar/src/phf/fmph.rs
@@ -1,10 +1,9 @@
-use crate::{NippyJarError, PerfectHashingFunction};
+use crate::{NippyJarError, PHFKey, PerfectHashingFunction};
 use ph::fmph::{BuildConf, Function};
 use serde::{
     de::Error as DeSerdeError, ser::Error as SerdeError, Deserialize, Deserializer, Serialize,
     Serializer,
 };
-use std::{clone::Clone, hash::Hash, marker::Sync};
 
 /// Wrapper struct for [`Function`]. Implementation of the following [paper](https://dl.acm.org/doi/10.1145/3596453).
 #[derive(Default)]
@@ -19,10 +18,7 @@ impl Fmph {
 }
 
 impl PerfectHashingFunction for Fmph {
-    fn set_keys<T: AsRef<[u8]> + Sync + Clone + Hash>(
-        &mut self,
-        keys: &[T],
-    ) -> Result<(), NippyJarError> {
+    fn set_keys<T: PHFKey>(&mut self, keys: &[T]) -> Result<(), NippyJarError> {
         self.function = Some(Function::from_slice_with_conf(
             keys,
             BuildConf { use_multiple_threads: true, ..Default::default() },
diff --git a/crates/storage/nippy-jar/src/phf/go_fmph.rs b/crates/storage/nippy-jar/src/phf/go_fmph.rs
index fc7b8fa89ec7..fd244cd1fc01 100644
--- a/crates/storage/nippy-jar/src/phf/go_fmph.rs
+++ b/crates/storage/nippy-jar/src/phf/go_fmph.rs
@@ -1,10 +1,9 @@
-use crate::{NippyJarError, PerfectHashingFunction};
+use crate::{NippyJarError, PHFKey, PerfectHashingFunction};
 use ph::fmph::{GOBuildConf, GOFunction};
 use serde::{
     de::Error as DeSerdeError, ser::Error as SerdeError, Deserialize, Deserializer, Serialize,
     Serializer,
 };
-use std::{clone::Clone, hash::Hash, marker::Sync};
 
 /// Wrapper struct for [`GOFunction`]. Implementation of the following [paper](https://dl.acm.org/doi/10.1145/3596453).
 #[derive(Default)]
@@ -19,10 +18,7 @@ impl GoFmph {
 }
 
 impl PerfectHashingFunction for GoFmph {
-    fn set_keys<T: AsRef<[u8]> + Sync + Clone + Hash>(
-        &mut self,
-        keys: &[T],
-    ) -> Result<(), NippyJarError> {
+    fn set_keys<T: PHFKey>(&mut self, keys: &[T]) -> Result<(), NippyJarError> {
         self.function = Some(GOFunction::from_slice_with_conf(
             keys,
             GOBuildConf { use_multiple_threads: true, ..Default::default() },
diff --git a/crates/storage/nippy-jar/src/phf/mod.rs b/crates/storage/nippy-jar/src/phf/mod.rs
index 84113181bcca..d04d4fc2db49 100644
--- a/crates/storage/nippy-jar/src/phf/mod.rs
+++ b/crates/storage/nippy-jar/src/phf/mod.rs
@@ -8,13 +8,14 @@ pub use fmph::Fmph;
 mod go_fmph;
 pub use go_fmph::GoFmph;
 
+/// Trait alias for [`PerfectHashingFunction`] keys.
+pub trait PHFKey: AsRef<[u8]> + Sync + Clone + Hash {}
+impl<T: AsRef<[u8]> + Sync + Clone + Hash> PHFKey for T {}
+
 /// Trait to build and query a perfect hashing function.
 pub trait PerfectHashingFunction: Serialize + for<'a> Deserialize<'a> {
     /// Adds the key set and builds the perfect hashing function.
-    fn set_keys<T: AsRef<[u8]> + Sync + Clone + Hash>(
-        &mut self,
-        keys: &[T],
-    ) -> Result<(), NippyJarError>;
+    fn set_keys<T: PHFKey>(&mut self, keys: &[T]) -> Result<(), NippyJarError>;
 
     /// Get corresponding associated integer. There might be false positives.
     fn get_index(&self, key: &[u8]) -> Result<Option<u64>, NippyJarError>;
@@ -29,10 +30,7 @@ pub enum Functions {
 }
 
 impl PerfectHashingFunction for Functions {
-    fn set_keys<T: AsRef<[u8]> + Sync + Clone + Hash>(
-        &mut self,
-        keys: &[T],
-    ) -> Result<(), NippyJarError> {
+    fn set_keys<T: PHFKey>(&mut self, keys: &[T]) -> Result<(), NippyJarError> {
         match self {
             Functions::Fmph(f) => f.set_keys(keys),
             Functions::GoFmph(f) => f.set_keys(keys),
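`PHFKey` above is the blanket-impl "trait alias" pattern: the trait carries no methods, and the single blanket impl makes every type satisfying the bounds a `PHFKey` automatically, so the long `AsRef<[u8]> + Sync + Clone + Hash` bound is written once instead of at every `set_keys` signature. The pattern in isolation:

    use std::hash::Hash;

    /// Alias-style trait: no methods, one blanket impl.
    pub trait PHFKey: AsRef<[u8]> + Sync + Clone + Hash {}
    impl<T: AsRef<[u8]> + Sync + Clone + Hash> PHFKey for T {}

    // Callers now write the short bound...
    fn takes_keys<T: PHFKey>(keys: &[T]) -> usize {
        keys.len()
    }

    // ...and ordinary types qualify with no extra code.
    fn demo() {
        let keys: Vec<Vec<u8>> = vec![b"a".to_vec(), b"b".to_vec()];
        assert_eq!(takes_keys(&keys), 2);
    }
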
diff --git a/crates/storage/provider/Cargo.toml b/crates/storage/provider/Cargo.toml
index ee3616e73666..a54e4cfda9f4 100644
--- a/crates/storage/provider/Cargo.toml
+++ b/crates/storage/provider/Cargo.toml
@@ -15,6 +15,7 @@ reth-interfaces.workspace = true
 reth-revm-primitives = { path = "../../revm/revm-primitives" }
 reth-db.workspace = true
 reth-trie = { path = "../../trie" }
+reth-nippy-jar = { path = "../nippy-jar" }
 
 # async
 tokio = { workspace = true, features = ["sync", "macros", "rt-multi-thread"] }
@@ -45,6 +46,7 @@ reth-interfaces = { workspace = true, features = ["test-utils"] }
 parking_lot.workspace = true
 tempfile = "3.3"
 assert_matches.workspace = true
+rand.workspace = true
 
 [features]
 test-utils = ["reth-rlp"]
diff --git a/crates/storage/provider/src/providers/mod.rs b/crates/storage/provider/src/providers/mod.rs
index bb58b7887acd..9db626f0bfc3 100644
--- a/crates/storage/provider/src/providers/mod.rs
+++ b/crates/storage/provider/src/providers/mod.rs
@@ -35,6 +35,8 @@ use tracing::trace;
 mod bundle_state_provider;
 mod chain_info;
 mod database;
+mod snapshot;
+pub use snapshot::SnapshotProvider;
 mod state;
 use crate::{providers::chain_info::ChainInfoTracker, traits::BlockSource};
 pub use bundle_state_provider::BundleStateProvider;
diff --git a/crates/storage/provider/src/providers/snapshot.rs b/crates/storage/provider/src/providers/snapshot.rs
new file mode 100644
index 000000000000..f7fe3b4a1cc8
--- /dev/null
+++ b/crates/storage/provider/src/providers/snapshot.rs
@@ -0,0 +1,205 @@
+use crate::HeaderProvider;
+use reth_db::{
+    table::{Decompress, Table},
+    HeaderTD,
+};
+use reth_interfaces::RethResult;
+use reth_nippy_jar::{NippyJar, NippyJarCursor};
+use reth_primitives::{BlockHash, BlockNumber, Header, SealedHeader, U256};
+use std::ops::RangeBounds;
+
+/// SnapshotProvider
+///
+/// WIP: rudimentary implementation, just for tests.
+/// TODO: should be able to walk through snapshot files/block_ranges
+/// TODO: Arc over NippyJars and/or NippyJarCursors (LRU)
+#[derive(Debug)]
+pub struct SnapshotProvider<'a> {
+    /// NippyJar
+    pub jar: &'a NippyJar,
+}
+
+impl<'a> SnapshotProvider<'a> {
+    fn cursor(&self) -> NippyJarCursor<'a> {
+        NippyJarCursor::new(self.jar, None).unwrap()
+    }
+}
+
+impl<'a> HeaderProvider for SnapshotProvider<'a> {
+    fn header(&self, block_hash: &BlockHash) -> RethResult<Option<Header>> {
+        // WIP
+        let mut cursor = self.cursor();
+
+        let header = Header::decompress(
+            &cursor.row_by_key_with_cols::<0b01, 2>(&block_hash.0).unwrap().unwrap()[0],
+        )
+        .unwrap();
+
+        if &header.hash_slow() == block_hash {
+            return Ok(Some(header))
+        } else {
+            // check next snapshot
+        }
+        Ok(None)
+    }
+
+    fn header_by_number(&self, _num: BlockNumber) -> RethResult<Option<Header>> {
+        unimplemented!();
+    }
+
+    fn header_td(&self, block_hash: &BlockHash) -> RethResult<Option<U256>> {
+        // WIP
+        let mut cursor = self.cursor();
+
+        let row = cursor.row_by_key_with_cols::<0b11, 2>(&block_hash.0).unwrap().unwrap();
+
+        let header = Header::decompress(&row[0]).unwrap();
+        let td = <HeaderTD as Table>::Value::decompress(&row[1]).unwrap();
+
+        if &header.hash_slow() == block_hash {
+            return Ok(Some(td.0))
+        } else {
+            // check next snapshot
+        }
+
+        Ok(None)
+    }
+
+    fn header_td_by_number(&self, _number: BlockNumber) -> RethResult<Option<U256>> {
+        unimplemented!();
+    }
+
+    fn headers_range(&self, _range: impl RangeBounds<BlockNumber>) -> RethResult<Vec<Header>> {
+        unimplemented!();
+    }
+
+    fn sealed_headers_range(
+        &self,
+        _range: impl RangeBounds<BlockNumber>,
+    ) -> RethResult<Vec<SealedHeader>> {
+        unimplemented!();
+    }
+
+    fn sealed_header(&self, _number: BlockNumber) -> RethResult<Option<SealedHeader>> {
+        unimplemented!();
+    }
+}
+
+#[cfg(test)]
+mod test {
+    use super::*;
+    use crate::ProviderFactory;
+    use rand::{self, seq::SliceRandom};
+    use reth_db::{
+        cursor::DbCursorRO,
+        database::Database,
+        snapshot::create_snapshot_T1_T2,
+        test_utils::create_test_rw_db,
+        transaction::{DbTx, DbTxMut},
+        CanonicalHeaders, DatabaseError, HeaderNumbers, HeaderTD, Headers, RawTable,
+    };
+    use reth_interfaces::test_utils::generators::{self, random_header_range};
+    use reth_nippy_jar::NippyJar;
+    use reth_primitives::{H256, MAINNET};
+
+    #[test]
+    fn test_snap() {
+        // Ranges
+        let row_count = 100u64;
+        let range = 0..=(row_count - 1);
+
+        // Data sources
+        let db = create_test_rw_db();
+        let factory = ProviderFactory::new(&db, MAINNET.clone());
+        let snap_file = tempfile::NamedTempFile::new().unwrap();
+
+        // Setup data
+        let mut headers = random_header_range(
+            &mut generators::rng(),
+            *range.start()..(*range.end() + 1),
+            H256::random(),
+        );
+
+        db.update(|tx| -> std::result::Result<(), DatabaseError> {
+            let mut td = U256::ZERO;
+            for header in headers.clone() {
+                td += header.header.difficulty;
+                let hash = header.hash();
+
+                tx.put::<CanonicalHeaders>(header.number, hash)?;
+                tx.put::<Headers>(header.number, header.clone().unseal())?;
+                tx.put::<HeaderTD>(header.number, td.into())?;
+                tx.put::<HeaderNumbers>(hash, header.number)?;
+            }
+            Ok(())
+        })
+        .unwrap()
+        .unwrap();
+
+        // Create Snapshot
+        {
+            let with_compression = true;
+            let with_filter = true;
+
+            let mut nippy_jar = NippyJar::new_without_header(2, snap_file.path());
+
+            if with_compression {
+                nippy_jar = nippy_jar.with_zstd(false, 0);
+            }
+
+            if with_filter {
+                nippy_jar = nippy_jar.with_cuckoo_filter(row_count as usize + 10).with_mphf();
+            }
+
+            let tx = db.tx().unwrap();
+
+            // Hacky type inference. TODO fix
+            let mut none_vec = Some(vec![vec![vec![0u8]].into_iter()]);
+            let _ = none_vec.take();
+
+            // Generate list of hashes for filters & PHF
+            let mut cursor = tx.cursor_read::<RawTable<CanonicalHeaders>>().unwrap();
+            let hashes = cursor
+                .walk(None)
+                .unwrap()
+                .map(|row| row.map(|(_key, value)| value.into_value()).map_err(|e| e.into()));
+
+            create_snapshot_T1_T2::<Headers, HeaderTD, BlockNumber>(
+                &tx,
+                range,
+                none_vec,
+                Some(hashes),
+                row_count as usize,
+                &mut nippy_jar,
+            )
+            .unwrap();
+        }
+
+        // Use providers to query Header data and compare if it matches
+        {
+            let jar = NippyJar::load_without_header(snap_file.path()).unwrap();
+
+            let db_provider = factory.provider().unwrap();
+            let snap_provider = SnapshotProvider { jar: &jar };
+
+            assert!(!headers.is_empty());
+
+            // Shuffled for chaos.
+            headers.shuffle(&mut generators::rng());
+
+            for header in headers {
+                let header_hash = header.hash();
+                let header = header.unseal();
+
+                // Compare Header
+                assert_eq!(header, db_provider.header(&header_hash).unwrap().unwrap());
+                assert_eq!(header, snap_provider.header(&header_hash).unwrap().unwrap());
+
+                // Compare HeaderTD
+                assert_eq!(
+                    db_provider.header_td(&header_hash).unwrap().unwrap(),
+                    snap_provider.header_td(&header_hash).unwrap().unwrap()
+                );
+            }
+        }
+    }
+}
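Finally, a hedged sketch of how the provider above is meant to be driven outside of the test, including a reading of the const-generic mask on `row_by_key_with_cols::<MASK, COLUMNS>` as used in this file: bit `i` of `MASK` selects column `i`, so `0b01` fetches only the header column while `0b11` fetches header plus total difficulty. The `header_and_td` helper is illustrative, not part of the patch, and assumes the default `()` header type:

    use reth_db::table::{Decompress, Table};
    use reth_db::HeaderTD;
    use reth_nippy_jar::{NippyJar, NippyJarCursor};
    use reth_primitives::{BlockHash, Header};

    fn header_and_td(
        jar: &NippyJar,
        block_hash: &BlockHash,
    ) -> Result<Option<(Header, <HeaderTD as Table>::Value)>, Box<dyn std::error::Error>> {
        let mut cursor = NippyJarCursor::new(jar, None)?;
        // The jar has 2 columns; mask 0b11 selects both of them.
        let Some(row) = cursor.row_by_key_with_cols::<0b11, 2>(&block_hash.0)? else {
            return Ok(None)
        };
        let header = Header::decompress(&row[0])?;
        let td = <HeaderTD as Table>::Value::decompress(&row[1])?;
        Ok(Some((header, td)))
    }

As in `SnapshotProvider::header`, a caller should still verify `header.hash_slow()` against the requested hash, since the filter and PHF lookups can yield false positives.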