Optimize state sync snapshot format #3701

Merged 7 commits on Aug 28, 2024

@@ -0,0 +1,4 @@
- Optimize the format of snapshots taken for state syncing purposes.
Snapshots are taken over the entire RocksDB database, packaged into
a `zstd` compressed `tar` archive, and split into 10 MB chunks.
([\#3701](https://github.com/anoma/namada/pull/3701))
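
A minimal sketch of the packaging flow the changelog entry above describes, using the `tar` and `zstd` crates that this PR adds as dependencies. The function and constant names (`package_snapshot`, `chunk_snapshot`, `CHUNK_SIZE`) are illustrative assumptions, not the PR's actual implementation:

```rust
use std::fs::File;
use std::io::{BufReader, Read};
use std::path::Path;

/// Size of each state-sync chunk (10 MB, per the changelog entry).
const CHUNK_SIZE: usize = 10 * 1024 * 1024;

/// Archive the RocksDB directory into a zstd-compressed tarball.
fn package_snapshot(db_dir: &Path, tarball: &Path) -> std::io::Result<()> {
    let out = File::create(tarball)?;
    // Stream the tar archive through a zstd encoder (level 0 = zstd's default).
    let encoder = zstd::Encoder::new(out, 0)?;
    let mut builder = tar::Builder::new(encoder);
    builder.append_dir_all("db", db_dir)?;
    // Finish the tar stream, then the zstd frame.
    builder.into_inner()?.finish()?;
    Ok(())
}

/// Split the compressed archive into fixed-size chunks for state sync.
fn chunk_snapshot(tarball: &Path) -> std::io::Result<Vec<Vec<u8>>> {
    let mut reader = BufReader::new(File::open(tarball)?);
    let mut chunks = Vec::new();
    loop {
        let mut buf = vec![0u8; CHUNK_SIZE];
        let mut filled = 0;
        // Fill the buffer up to CHUNK_SIZE bytes or until EOF.
        while filled < CHUNK_SIZE {
            let n = reader.read(&mut buf[filled..])?;
            if n == 0 {
                break;
            }
            filled += n;
        }
        if filled == 0 {
            break;
        }
        buf.truncate(filled);
        chunks.push(buf);
    }
    Ok(chunks)
}
```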
24 changes: 22 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions Cargo.toml
@@ -214,6 +214,7 @@ winapi = "0.3.9"
xorf = { version = "0.11.0", features = ["serde"] }
yansi = "0.5.1"
zeroize = { version = "1.5.5", features = ["zeroize_derive"] }
zstd = "0.13.2"

[patch.crates-io]
# Patch to the fork containing the correct personalization and basepoints for masp
2 changes: 2 additions & 0 deletions crates/node/Cargo.toml
@@ -91,6 +91,7 @@ serde_json = {workspace = true, features = ["raw_value"]}
sha2.workspace = true
smooth-operator.workspace = true
sysinfo.workspace = true
tar.workspace = true
tempfile.workspace = true
thiserror.workspace = true
tokio = {workspace = true, features = ["full"]}
@@ -100,6 +101,7 @@ tower.workspace = true
tracing-subscriber = { workspace = true, optional = true, features = ["std", "json", "ansi", "tracing-log"]}
tracing.workspace = true
warp = "0.3.2"
zstd.workspace = true

[dev-dependencies]
namada_apps_lib = {path = "../apps_lib", features = ["testing"]}
131 changes: 130 additions & 1 deletion crates/node/src/shell/mod.rs
@@ -348,6 +348,7 @@ pub struct SnapshotSync {
pub height: BlockHeight,
pub expected: Vec<Hash>,
pub strikes: u64,
pub snapshot: std::fs::File,
}

#[derive(Debug)]
@@ -421,6 +422,48 @@ impl EthereumOracleChannels {
}
}

impl Shell<crate::storage::PersistentDB, Sha256Hasher> {
/// Restore the database with data fetched from the State Sync protocol.
pub fn restore_database_from_state_sync(&mut self) {
let Some(syncing) = self.syncing.as_mut() else {
return;
};

let db_block_cache_size_bytes = {
let config = crate::config::Config::load(
&self.base_dir,
&self.chain_id,
None,
);

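// Fall back to one third of the available system memory when no block cache size is configured.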
config.ledger.shell.block_cache_bytes.unwrap_or_else(|| {
use sysinfo::{RefreshKind, System, SystemExt};

let sys = System::new_with_specifics(
RefreshKind::new().with_memory(),
);
let available_memory_bytes = sys.available_memory();

available_memory_bytes / 3
})
};

let db_cache = rocksdb::Cache::new_lru_cache(
usize::try_from(db_block_cache_size_bytes).expect(
"`db_block_cache_size_bytes` must not exceed `usize::MAX`",
),
);

self.state
.db_mut()
.restore_from((&db_cache, &mut syncing.snapshot))
.expect("Failed to restore state from snapshot");

// rebuild the in-memory state
self.state.load_last_state();
}
}
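
For orientation, here is a hedged sketch of the inverse of the packaging flow, assuming the same `tar`/`zstd` crates: reassembling fetched chunks into the compressed archive and unpacking it into a database directory. The PR's actual restore path goes through `DB::restore_from` as in `restore_database_from_state_sync` above; the names `reassemble_chunks` and `unpack_snapshot` are hypothetical:

```rust
use std::fs::File;
use std::io::{BufWriter, Write};
use std::path::Path;

/// Concatenate the fetched 10 MB chunks back into the zstd-compressed tarball.
fn reassemble_chunks(chunks: &[Vec<u8>], tarball: &Path) -> std::io::Result<()> {
    let mut out = BufWriter::new(File::create(tarball)?);
    for chunk in chunks {
        out.write_all(chunk)?;
    }
    out.flush()
}

/// Decompress the zstd stream and extract the archived RocksDB files.
fn unpack_snapshot(tarball: &Path, dest_dir: &Path) -> std::io::Result<()> {
    let decoder = zstd::Decoder::new(File::open(tarball)?)?;
    let mut archive = tar::Archive::new(decoder);
    archive.unpack(dest_dir)
}
```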

impl<D, H> Shell<D, H>
where
D: DB + for<'iter> DBIter<'iter> + Sync + 'static,
@@ -774,7 +817,11 @@ where
_ => false,
};
if take_snapshot {
self.state.db().path().into()
self.state
.db()
.path()
.map(|p| (p, self.state.in_mem().get_last_block_height()))
.into()
} else {
TakeSnapshot::No
}
@@ -2001,7 +2048,10 @@ pub mod test_utils {

#[cfg(test)]
mod shell_tests {
use std::fs::File;

use eth_bridge::storage::eth_bridge_queries::is_bridge_comptime_enabled;
use namada_apps_lib::state::StorageWrite;
use namada_sdk::address;
use namada_sdk::chain::Epoch;
use namada_sdk::token::read_denom;
@@ -2011,10 +2061,13 @@
use namada_vote_ext::{
bridge_pool_roots, ethereum_events, ethereum_tx_data_variants,
};
use tempfile::tempdir;
use {namada_replay_protection as replay_protection, wallet};

use super::*;
use crate::shell::test_utils::top_level_directory;
use crate::shell::token::DenominatedAmount;
use crate::storage::{DbSnapshot, PersistentDB, SnapshotPath};

const GAS_LIMIT_MULTIPLIER: u64 = 100_000;

@@ -2870,4 +2923,80 @@
);
assert_eq!(result.code, ResultCode::TooLarge.into());
}

/// Test that the shell restores its state from a snapshot when it is
/// syncing, and that restoring is a no-op when it is not syncing
#[test]
fn test_restore_database_from_snapshot() {
let (sender, _receiver) = tokio::sync::mpsc::unbounded_channel();

let base_dir = tempdir().unwrap().as_ref().canonicalize().unwrap();
let vp_wasm_compilation_cache = 50 * 1024 * 1024; // 50 MiB
let tx_wasm_compilation_cache = 50 * 1024 * 1024; // 50 MiB
let config = config::Ledger::new(
base_dir.clone(),
Default::default(),
TendermintMode::Validator,
);
let mut shell = Shell::<PersistentDB, Sha256Hasher>::new(
config.clone(),
top_level_directory().join("wasm"),
sender,
None,
None,
None,
vp_wasm_compilation_cache,
tx_wasm_compilation_cache,
);
shell.state.in_mem_mut().block.height = BlockHeight::first();

shell.state.commit_block().expect("Test failed");
shell.state.db_mut().flush(true).expect("Test failed");
let original_root = shell.state.in_mem().merkle_root();
let snapshot = make_snapshot(config.db_dir(), base_dir);
shell
.state
.write(
&Key::parse("bing/fucking/bong").expect("Test failed"),
[1u8; 64],
)
.expect("Test failed");
shell.state.commit_block().expect("Test failed");
let new_root = shell.state.in_mem().merkle_root();
assert_ne!(new_root, original_root);

shell.restore_database_from_state_sync();
assert_eq!(shell.state.in_mem().merkle_root(), new_root,);
shell.syncing = Some(SnapshotSync {
next_chunk: 0,
height: BlockHeight::first(),
expected: vec![],
strikes: 0,
snapshot,
});
shell.restore_database_from_state_sync();
assert_eq!(shell.state.in_mem().merkle_root(), original_root,);
}

/// Helper function for the `test_restore_database_from_snapshot` test
fn make_snapshot(db_dir: PathBuf, base_dir: PathBuf) -> File {
let snapshot =
DbSnapshot(SnapshotPath(base_dir.clone(), BlockHeight::first()));
std::fs::create_dir_all(base_dir.join("snapshots"))
.expect("Test failed");
std::fs::create_dir_all(snapshot.0.base()).expect("Test failed");
std::fs::create_dir_all(snapshot.0.temp_rocksdb())
.expect("Test failed");
for entry in std::fs::read_dir(db_dir).expect("Test failed") {
let entry = entry.expect("Test failed");
let dest_file = snapshot
.0
.base()
.join("db")
.join(entry.file_name().to_string_lossy().to_string());
std::fs::copy(entry.path(), dest_file).expect("Test failed");
}
snapshot.clone().build_tarball().expect("Test failed");
File::open(snapshot.0.temp_tarball("zst")).expect("Test failed")
}
}