Skip to content
This repository has been archived by the owner on Nov 5, 2023. It is now read-only.

Commit

Permalink
feat: create a NippyJar snapshot from multiple Table (paradigmxyz…
Browse files Browse the repository at this point in the history
  • Loading branch information
joshieDo committed Sep 27, 2023
1 parent ac6570f commit 0cdd302
Show file tree
Hide file tree
Showing 19 changed files with 409 additions and 59 deletions.
4 changes: 4 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 10 additions & 4 deletions bin/reth/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use reth_consensus_common::validation::validate_block_standalone;
use reth_db::{
cursor::DbCursorRO,
database::Database,
table::{Table, TableRow},
table::{Decode, Decompress, Table, TableRow},
transaction::{DbTx, DbTxMut},
DatabaseError, RawTable, TableRawRow,
};
Expand Down Expand Up @@ -128,16 +128,22 @@ impl<'a, DB: Database> DbTool<'a, DB> {

let map_filter = |row: Result<TableRawRow<T>, _>| {
if let Ok((k, v)) = row {
let (key, value) = (k.into_key(), v.into_value());

let result = || {
if filter.only_count {
return None
}
Some((k.key().unwrap(), v.value().unwrap()))
Some((
<T as Table>::Key::decode(&key).unwrap(),
<T as Table>::Value::decompress(&value).unwrap(),
))
};

match &*bmb {
Some(searcher) => {
if searcher.find_first_in(v.raw_value()).is_some() ||
searcher.find_first_in(k.raw_key()).is_some()
if searcher.find_first_in(&value).is_some() ||
searcher.find_first_in(&key).is_some()
{
hits += 1;
return result()
Expand Down
1 change: 1 addition & 0 deletions crates/interfaces/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ repository.workspace = true

[dependencies]
reth-codecs = { path = "../storage/codecs" }
reth-nippy-jar = { path = "../storage/nippy-jar" }
reth-primitives.workspace = true
reth-rpc-types.workspace = true
reth-network-api.workspace = true
Expand Down
6 changes: 6 additions & 0 deletions crates/interfaces/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,3 +29,9 @@ pub enum RethError {
#[error("{0}")]
Custom(String),
}

/// Lets a [`reth_nippy_jar::NippyJarError`] be propagated with `?` wherever a [`RethError`] is
/// expected.
///
/// The original error value is not kept: only its `Display` text survives, wrapped in
/// [`RethError::Custom`].
impl From<reth_nippy_jar::NippyJarError> for RethError {
    fn from(jar_error: reth_nippy_jar::NippyJarError) -> Self {
        let message = jar_error.to_string();
        Self::Custom(message)
    }
}
2 changes: 2 additions & 0 deletions crates/storage/db/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ reth-primitives.workspace = true
reth-interfaces.workspace = true
reth-codecs = { path = "../codecs" }
reth-libmdbx = { path = "../libmdbx-rs", optional = true, features = ["return-borrowed"] }
reth-nippy-jar = { path = "../nippy-jar" }

# codecs
serde = { workspace = true, default-features = false }
Expand Down Expand Up @@ -42,6 +43,7 @@ tempfile = { version = "3.3.0", optional = true }
parking_lot.workspace = true
derive_more = "0.99"
eyre = "0.6.8"
paste = "1.0"

# arbitrary utils
arbitrary = { workspace = true, features = ["derive"], optional = true }
Expand Down
5 changes: 5 additions & 0 deletions crates/storage/db/src/abstraction/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,11 @@ pub trait Compress: Send + Sync + Sized + Debug {
pub trait Decompress: Send + Sync + Sized + Debug {
    /// Builds `Self` from bytes read out of the database.
    fn decompress<B: AsRef<[u8]>>(value: B) -> Result<Self, DatabaseError>;

    /// Builds `Self` from an owned byte buffer read out of the database.
    ///
    /// The default implementation hands the buffer straight to [`Decompress::decompress`].
    /// Implementors may override it, e.g. to take ownership of the buffer.
    fn decompress_owned(value: Vec<u8>) -> Result<Self, DatabaseError> {
        <Self as Decompress>::decompress(value)
    }
}

/// Trait that will transform the data to be saved in the DB.
Expand Down
1 change: 1 addition & 0 deletions crates/storage/db/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@
pub mod abstraction;

mod implementation;
pub mod snapshot;
pub mod tables;
mod utils;
pub mod version;
Expand Down
87 changes: 87 additions & 0 deletions crates/storage/db/src/snapshot.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
//! reth's snapshot creation from database tables

use crate::{
abstraction::cursor::DbCursorRO,
table::{Key, Table},
transaction::DbTx,
RawKey, RawTable,
};
use reth_interfaces::RethResult;
use reth_nippy_jar::{ColumnResult, NippyJar, PHFKey};
use std::{error::Error as StdError, ops::RangeInclusive};

/// Macro that generates snapshot creation functions that take an arbitrary number of [`Table`]s
/// and create a [`NippyJar`] file out of their [`Table::Value`]s. The list of [`Table::Value`]s
/// read from one table forms one column of values.
///
/// The input is a comma-separated list of parenthesised identifier groups. One function is
/// generated per group; its name is `create_snapshot` followed by `_<ident>` for every identifier
/// in the group, and every identifier becomes a generic type parameter of that function.
///
/// Has membership filter set and compression dictionary support.
macro_rules! generate_snapshot_func {
($(($($tbl:ident),+)),+ $(,)? ) => {
$(
paste::item! {
/// Creates a snapshot from the specified tables. Each table's `Value` iterator represents a column.
///
/// **Ensure the range contains the same number of rows.**
///
/// * `tx`: Database transaction used to open one read cursor per table.
/// * `range`: Inclusive key range to read from every table. All tables share the key type `K`.
/// * `dict_compression_set`: Sets of column data for compression dictionaries. Max size is 2GB. Row count is independent. Skipped when `None`.
/// * `keys`: Iterator of keys (eg. `TxHash` or `BlockHash`) with length equal to `row_count` and ordered by future column insertion from `range`. Skipped when `None`.
/// * `row_count`: Total rows to add to `NippyJar`. Must match row count in `range`.
/// * `nippy_jar`: Snapshot object responsible for file generation.
///
/// Returns an error if preparing the index, preparing the compression, opening a cursor,
/// walking the range or freezing the jar fails.
// The generated function name and its `<ident>_cursor` / `<ident>_iter` locals embed the
// table identifiers verbatim, so they are not snake case.
#[allow(non_snake_case)]
pub fn [<create_snapshot$(_ $tbl)+>]<'tx,
$($tbl: Table<Key=K>,)+
K
>
(
tx: &impl DbTx<'tx>,
range: RangeInclusive<K>,
dict_compression_set: Option<Vec<impl Iterator<Item = Vec<u8>>>>,
keys: Option<impl Iterator<Item = ColumnResult<impl PHFKey>>>,
row_count: usize,
nippy_jar: &mut NippyJar
) -> RethResult<()>
where K: Key + Copy
{
// The cursors below walk `RawTable`s, so the typed bounds are re-wrapped as raw keys.
let range: RangeInclusive<RawKey<K>> = RawKey::new(*range.start())..=RawKey::new(*range.end());

// Create PHF and Filter if required
if let Some(keys) = keys {
nippy_jar.prepare_index(keys, row_count)?;
}

// Create compression dictionaries if required
if let Some(data_sets) = dict_compression_set {
nippy_jar.prepare_compression(data_sets)?;
}

// Creates the cursors for the columns: one per table, each yielding only the raw value bytes
// of its rows (keys are dropped) and boxing any database error.
$(
let mut [< $tbl _cursor>] = tx.cursor_read::<RawTable<$tbl>>()?;
let [< $tbl _iter>] = [< $tbl _cursor>]
.walk_range(range.clone())?
.into_iter()
.map(|row|
row
.map(|(_key, val)| val.into_value())
.map_err(|e| Box::new(e) as Box<dyn StdError + Send + Sync>)
);

)+

// Create the snapshot from the data. Columns are listed in the same order as the table
// identifiers of this group.
let col_iterators: Vec<Box<dyn Iterator<Item = Result<Vec<u8>,_>>>> = vec![
$(Box::new([< $tbl _iter>]),)+
];

nippy_jar.freeze(col_iterators, row_count as u64)?;

Ok(())
}
}
)+
};
}

// Generates `create_snapshot_T1`, `create_snapshot_T1_T2`, `create_snapshot_T1_T2_T3` and
// `create_snapshot_T1_T2_T3_T4`, i.e. snapshot functions for one to four tables.
generate_snapshot_func!((T1), (T1, T2), (T1, T2, T3), (T1, T2, T3, T4),);
20 changes: 19 additions & 1 deletion crates/storage/db/src/tables/raw.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,14 +54,21 @@ impl<K: Key> RawKey<K> {
pub fn new(key: K) -> Self {
    // Encode the typed key, then copy the encoded bytes into an owned buffer.
    let encoded = K::encode(key);
    let key: Vec<u8> = encoded.as_ref().to_vec();
    Self { key, _phantom: std::marker::PhantomData }
}

/// Decodes the stored raw bytes back into the typed key `K`.
///
/// Returns whatever error `K::decode` reports for the stored bytes.
pub fn key(&self) -> Result<K, DatabaseError> {
    let raw = &self.key;
    K::decode(raw)
}

/// Returns the raw key as seen on the database.
pub fn raw_key(&self) -> &Vec<u8> {
&self.key
}

/// Consumes [`Self`] and returns the inner raw key.
pub fn into_key(self) -> Vec<u8> {
self.key
}
}

impl<K: Key> From<K> for RawKey<K> {
Expand Down Expand Up @@ -105,14 +112,21 @@ impl<V: Value> RawValue<V> {
pub fn new(value: V) -> Self {
    // Compress the typed value, then copy the compressed bytes into an owned buffer.
    let compressed = V::compress(value);
    let value: Vec<u8> = compressed.as_ref().to_vec();
    Self { value, _phantom: std::marker::PhantomData }
}

/// Decompresses the stored raw bytes back into the typed value `V`.
///
/// Returns whatever error `V::decompress` reports for the stored bytes.
pub fn value(&self) -> Result<V, DatabaseError> {
    let raw = &self.value;
    V::decompress(raw)
}

/// Borrows the raw value bytes as seen on the database, without decompressing them.
pub fn raw_value(&self) -> &Vec<u8> {
pub fn raw_value(&self) -> &[u8] {
&self.value
}

/// Consumes [`Self`] and returns the inner raw value.
pub fn into_value(self) -> Vec<u8> {
self.value
}
}

impl AsRef<[u8]> for RawValue<Vec<u8>> {
Expand Down Expand Up @@ -142,4 +156,8 @@ impl<V: Value> Decompress for RawValue<V> {
fn decompress<B: AsRef<[u8]>>(value: B) -> Result<Self, DatabaseError> {
    // No real decompression happens for a raw value: the bytes are copied verbatim into an
    // owned buffer.
    let bytes: &[u8] = value.as_ref();
    Ok(Self { value: bytes.to_vec(), _phantom: std::marker::PhantomData })
}

fn decompress_owned(value: Vec<u8>) -> Result<Self, DatabaseError> {
    // The owned buffer is moved in as-is, so no copy of the bytes is made here.
    let raw = Self { value, _phantom: std::marker::PhantomData };
    Ok(raw)
}
}
6 changes: 3 additions & 3 deletions crates/storage/db/src/tables/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ where
};
let value = match kv.1 {
Cow::Borrowed(v) => Decompress::decompress(v)?,
Cow::Owned(v) => Decompress::decompress(v)?,
Cow::Owned(v) => Decompress::decompress_owned(v)?,
};
Ok((key, value))
}
Expand All @@ -68,7 +68,7 @@ where
{
Ok(match kv.1 {
Cow::Borrowed(v) => Decompress::decompress(v)?,
Cow::Owned(v) => Decompress::decompress(v)?,
Cow::Owned(v) => Decompress::decompress_owned(v)?,
})
}

Expand All @@ -79,6 +79,6 @@ where
{
Ok(match value {
Cow::Borrowed(v) => Decompress::decompress(v)?,
Cow::Owned(v) => Decompress::decompress(v)?,
Cow::Owned(v) => Decompress::decompress_owned(v)?,
})
}
2 changes: 1 addition & 1 deletion crates/storage/nippy-jar/src/cursor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use sucds::int_vectors::Access;
use zstd::bulk::Decompressor;

/// Simple cursor implementation to retrieve data from [`NippyJar`].
pub struct NippyJarCursor<'a, H> {
pub struct NippyJarCursor<'a, H = ()> {
/// [`NippyJar`] which holds most of the required configuration to read from the file.
jar: &'a NippyJar<H>,
/// Optional dictionary decompressors.
Expand Down
2 changes: 2 additions & 0 deletions crates/storage/nippy-jar/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ use thiserror::Error;
/// Errors associated with [`crate::NippyJar`].
#[derive(Debug, Error)]
pub enum NippyJarError {
#[error(transparent)]
Internal(#[from] Box<dyn std::error::Error + Send + Sync>),
#[error(transparent)]
Disconnect(#[from] std::io::Error),
#[error(transparent)]
Expand Down
Loading

0 comments on commit 0cdd302

Please sign in to comment.