From 48b1fe253b16ab1965cde4f7942890da5fa86505 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mario=20Alejandro=20Montoya=20Corte=CC=81s?= Date: Wed, 27 Nov 2024 10:18:42 -0500 Subject: [PATCH] Compress the snapshot --- Cargo.lock | 38 ++++++++ Cargo.toml | 4 + crates/bench/Cargo.toml | 2 + crates/bench/benches/special.rs | 134 ++++++++++++++++++++++++++- crates/commitlog/Cargo.toml | 1 + crates/core/Cargo.toml | 1 + crates/core/src/db/relational_db.rs | 102 +++++++++++++++++++- crates/fs-utils/Cargo.toml | 3 + crates/fs-utils/src/compression.rs | 133 +++++++++++++++++++++++++++ crates/fs-utils/src/dir_trie.rs | 50 ++++++---- crates/fs-utils/src/lib.rs | 1 + crates/snapshot/src/lib.rs | 138 +++++++++++++++++++++++++--- 12 files changed, 573 insertions(+), 34 deletions(-) create mode 100644 crates/fs-utils/src/compression.rs diff --git a/Cargo.lock b/Cargo.lock index 8fb139ea9a..f4c5380ba9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2778,6 +2778,15 @@ dependencies = [ "tracing-subscriber", ] +[[package]] +name = "lz4_flex" +version = "0.11.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "75761162ae2b0e580d7e7c390558127e5f01b4194debd6221fd8c207fc80e3f5" +dependencies = [ + "twox-hash", +] + [[package]] name = "mach2" version = "0.4.2" @@ -4596,6 +4605,12 @@ dependencies = [ "serde", ] +[[package]] +name = "snap" +version = "1.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1b6b67fb9a61334225b5b790716f609cd58395f895b3fe8b328786812a40bc3b" + [[package]] name = "socket2" version = "0.5.7" @@ -4658,11 +4673,13 @@ dependencies = [ "spacetimedb-client-api", "spacetimedb-core", "spacetimedb-data-structures", + "spacetimedb-fs-utils", "spacetimedb-lib", "spacetimedb-paths", "spacetimedb-primitives", "spacetimedb-sats", "spacetimedb-schema", + "spacetimedb-snapshot", "spacetimedb-standalone", "spacetimedb-table", "spacetimedb-testing", @@ -4832,6 +4849,7 @@ dependencies = [ "proptest-derive", "rand 0.8.5", "serde", + "spacetimedb-fs-utils", "spacetimedb-paths", "spacetimedb-primitives", "spacetimedb-sats", @@ -4910,6 +4928,7 @@ dependencies = [ "spacetimedb-data-structures", "spacetimedb-durability", "spacetimedb-expr", + "spacetimedb-fs-utils", "spacetimedb-jsonwebtoken", "spacetimedb-jwks", "spacetimedb-lib", @@ -4999,9 +5018,12 @@ version = "1.0.0-rc2" dependencies = [ "anyhow", "hex", + "lz4_flex", "rand 0.8.5", + "snap", "tempdir", "thiserror", + "zstd", ] [[package]] @@ -5431,6 +5453,12 @@ version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a8f112729512f8e442d81f95a8a7ddf2b7c6b8a1a6f509a95864142b30cab2d3" +[[package]] +name = "static_assertions" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a2eb9349b6444b326872e140eb1cf5e7c522154d69e7a0ffb0fb81c06b37543f" + [[package]] name = "str-buf" version = "1.0.6" @@ -6252,6 +6280,16 @@ dependencies = [ "utf-8", ] +[[package]] +name = "twox-hash" +version = "1.6.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "97fee6b57c6a41524a810daee9286c02d7752c4253064d0b05472833a438f675" +dependencies = [ + "cfg-if", + "static_assertions", +] + [[package]] name = "typed-arena" version = "2.0.2" diff --git a/Cargo.toml b/Cargo.toml index 7f700269cf..e5ff2ac878 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -257,6 +257,10 @@ uuid = { version = "1.2.1", features = ["v4"] } walkdir = "2.2.5" wasmbin = "0.6" webbrowser = "1.0.2" +zstd = { version = "0.13.2", features = ["arrays", 
"zdict_builder"] } +snap = "1.1.1" +lz4_flex = { version = "0.11" } + xdg = "2.5" # Vendor the openssl we rely on, rather than depend on a diff --git a/crates/bench/Cargo.toml b/crates/bench/Cargo.toml index 53bc3f3298..7331facbbf 100644 --- a/crates/bench/Cargo.toml +++ b/crates/bench/Cargo.toml @@ -36,9 +36,11 @@ spacetimedb-paths.workspace = true spacetimedb-primitives = { path = "../primitives" } spacetimedb-sats = { path = "../sats" } spacetimedb-schema = { workspace = true, features = ["test"] } +spacetimedb-snapshot = { path = "../snapshot" } spacetimedb-standalone = { path = "../standalone" } spacetimedb-table = { path = "../table" } spacetimedb-testing = { path = "../testing" } +spacetimedb-fs-utils.workspace = true anyhow.workspace = true anymap.workspace = true diff --git a/crates/bench/benches/special.rs b/crates/bench/benches/special.rs index c62e4f709e..7ad8e8c0b3 100644 --- a/crates/bench/benches/special.rs +++ b/crates/bench/benches/special.rs @@ -1,17 +1,28 @@ use criterion::async_executor::AsyncExecutor; use criterion::{criterion_group, criterion_main, Criterion, SamplingMode}; use mimalloc::MiMalloc; +use spacetimedb::db::datastore::traits::IsolationLevel; +use spacetimedb::db::relational_db::tests_utils::{make_snapshot, TestDB}; +use spacetimedb::db::relational_db::{open_snapshot_repo, RelationalDB}; +use spacetimedb::execution_context::Workload; use spacetimedb_bench::{ database::BenchDatabase, schemas::{create_sequential, u32_u64_str, u32_u64_u64, u64_u64_u32, BenchTable, RandomTable}, spacetime_module::SpacetimeModule, }; +use spacetimedb_fs_utils::compression::CompressType; use spacetimedb_lib::sats::{self, bsatn}; -use spacetimedb_lib::{bsatn::ToBsatn as _, ProductValue}; +use spacetimedb_lib::{bsatn::ToBsatn as _, Identity, ProductValue}; +use spacetimedb_paths::server::{ReplicaDir, SnapshotsPath}; +use spacetimedb_paths::FromPathUnchecked; use spacetimedb_schema::schema::TableSchema; +use spacetimedb_snapshot::{SnapshotRepository, SnapshotSize}; use spacetimedb_testing::modules::{Csharp, ModuleLanguage, Rust}; +use std::path::PathBuf; use std::sync::Arc; use std::sync::OnceLock; +use std::time::Duration; +use tempdir::TempDir; #[global_allocator] static GLOBAL: MiMalloc = MiMalloc; @@ -30,6 +41,9 @@ fn custom_benchmarks(c: &mut Criterion) { custom_module_benchmarks(&db, c); custom_db_benchmarks(&db, c); + + snapshot(c); + snapshot_existing(c); } fn custom_module_benchmarks(m: &SpacetimeModule, c: &mut Criterion) { @@ -186,5 +200,123 @@ fn serialize_benchmarks< // TODO: deserialize benches (needs a typespace) } +fn _snapshot(c: &mut Criterion, name: &str, dir: SnapshotsPath, take: F) +where + F: Fn(&SnapshotRepository), +{ + let mut disk_size = None; + let mut size_on_disk = |size: SnapshotSize| { + if size.compressed_type == CompressType::None { + // Save the size of the last snapshot to use as throughput + disk_size = Some(size.clone()); + } + dbg!(&size); + }; + + let algos = [ + CompressType::None, + CompressType::Zstd, + CompressType::Lz4, + CompressType::Snap, + ]; + // For show the size of the last snapshot + for compress in &algos { + let (_, repo) = make_snapshot(dir.clone(), Identity::ZERO, 0, *compress, true); + take(&repo); + size_on_disk(repo.size_on_disk_last_snapshot().unwrap()); + } + + let mut group = c.benchmark_group(&format!("special/snapshot/{name}]")); + group.throughput(criterion::Throughput::Bytes(disk_size.unwrap().total_size)); + group.sample_size(50); + group.warm_up_time(Duration::from_secs(10)); + 
+    group.sampling_mode(SamplingMode::Flat);
+
+    for compress in &algos {
+        group.bench_function(format!("save_compression_{compress:?}"), |b| {
+            b.iter_batched(
+                || {},
+                |_| {
+                    let (_, repo) = make_snapshot(dir.clone(), Identity::ZERO, 0, *compress, true);
+                    take(&repo);
+                },
+                criterion::BatchSize::NumIterations(100),
+            );
+        });
+
+        group.bench_function(format!("open_compression_{compress:?}"), |b| {
+            b.iter_batched(
+                || {},
+                |_| {
+                    let (_, repo) = make_snapshot(dir.clone(), Identity::ZERO, 0, *compress, false);
+                    let last = repo.latest_snapshot().unwrap().unwrap();
+                    repo.read_snapshot(last).unwrap()
+                },
+                criterion::BatchSize::NumIterations(100),
+            );
+        });
+    }
+}
+
+fn snapshot(c: &mut Criterion) {
+    let db = TestDB::in_memory().unwrap();
+
+    let dir = db.path().snapshots();
+    dir.create().unwrap();
+    let mut t1 = TableSchema::from_product_type(u32_u64_str::product_type());
+    t1.table_name = "u32_u64_str".into();
+
+    let mut t2 = TableSchema::from_product_type(u32_u64_u64::product_type());
+    t2.table_name = "u32_u64_u64".into();
+
+    let mut tx = db.begin_mut_tx(IsolationLevel::Serializable, Workload::Internal);
+    let t1 = db.create_table(&mut tx, t1).unwrap();
+    let t2 = db.create_table(&mut tx, t2).unwrap();
+
+    let data = create_sequential::<u32_u64_str>(0xdeadbeef, 1_000, 100);
+    for row in data.into_iter() {
+        db.insert(&mut tx, t1, row.into_product_value()).unwrap();
+    }
+
+    let data = create_sequential::<u32_u64_u64>(0xdeadbeef, 1_000, 100);
+    for row in data.into_iter() {
+        db.insert(&mut tx, t2, row.into_product_value()).unwrap();
+    }
+    db.commit_tx(tx).unwrap();
+
+    _snapshot(c, "synthetic", dir, |repo| {
+        db.take_snapshot(repo).unwrap();
+    });
+}
+
+// Benchmark compression against an existing database.
+// Supply the path to the database and the identity of the replica via environment variables:
+// - `SNAPSHOT`: the path to the database, like `/tmp/db/replicas/.../8/database`
+// - `IDENTITY`: the identity in hex format
+fn snapshot_existing(c: &mut Criterion) {
+    let path_db = if let Ok(path) = std::env::var("SNAPSHOT") {
+        PathBuf::from(path)
+    } else {
+        eprintln!("SNAPSHOT must be set to a valid path to the database");
+        return;
+    };
+    let identity =
+        Identity::from_hex(std::env::var("IDENTITY").expect("IDENTITY must be set to a valid hex identity")).unwrap();
+
+    let path = ReplicaDir::from_path_unchecked(path_db);
+    let repo = open_snapshot_repo(path.snapshots(), Identity::ZERO, 0).unwrap();
+
+    let last = repo.latest_snapshot().unwrap();
+    let db = RelationalDB::restore_from_snapshot_or_bootstrap(identity, Some(&repo), last).unwrap();
+
+    let out = TempDir::new("snapshots").unwrap();
+
+    let dir = SnapshotsPath::from_path_unchecked(out.path());
+
+    _snapshot(c, "existing", dir, |repo| {
+        db.take_snapshot(repo).unwrap();
+    });
+}
+
 criterion_group!(benches, criterion_benchmark);
 criterion_main!(benches);
diff --git a/crates/commitlog/Cargo.toml b/crates/commitlog/Cargo.toml
index ea3826ffca..c3c859d94d 100644
--- a/crates/commitlog/Cargo.toml
+++ b/crates/commitlog/Cargo.toml
@@ -22,6 +22,7 @@ serde = { workspace = true, optional = true }
 spacetimedb-primitives.workspace = true
 spacetimedb-paths.workspace = true
 spacetimedb-sats.workspace = true
+spacetimedb-fs-utils.workspace = true
 tempfile.workspace = true
 thiserror.workspace = true
 
diff --git a/crates/core/Cargo.toml b/crates/core/Cargo.toml
index 0baa3eae7a..75d78d5203 100644
--- a/crates/core/Cargo.toml
+++ b/crates/core/Cargo.toml
@@ -29,6 +29,7 @@ spacetimedb-table.workspace = true
 spacetimedb-vm.workspace = true
 spacetimedb-snapshot.workspace = true
 spacetimedb-expr.workspace = true
+spacetimedb-fs-utils.workspace = true
 
 anyhow = { workspace = true, features = ["backtrace"] }
 arrayvec.workspace = true
diff --git a/crates/core/src/db/relational_db.rs b/crates/core/src/db/relational_db.rs
index 801bd63976..75f6e4d0cc 100644
--- a/crates/core/src/db/relational_db.rs
+++ b/crates/core/src/db/relational_db.rs
@@ -417,7 +417,7 @@ impl RelationalDB {
         self.inner.update_program(tx, program_kind, program)
     }
 
-    fn restore_from_snapshot_or_bootstrap(
+    pub fn restore_from_snapshot_or_bootstrap(
        database_identity: Identity,
        snapshot_repo: Option<&SnapshotRepository>,
        durable_tx_offset: Option<TxOffset>,
@@ -1317,6 +1317,8 @@ pub mod tests_utils {
     use super::*;
     use core::ops::Deref;
     use durability::EmptyHistory;
+    use spacetimedb_fs_utils::compression::CompressType;
+    use spacetimedb_paths::server::SnapshotDirPath;
     use spacetimedb_paths::FromPathUnchecked;
     use tempfile::TempDir;
@@ -1525,6 +1527,10 @@ pub mod tests_utils {
         fn row_count_fn() -> RowCountFn {
             Arc::new(|_, _| i64::MAX)
         }
+
+        pub fn take_snapshot(&self, repo: &SnapshotRepository) -> Result<Option<TxOffset>, DBError> {
+            self.inner.take_snapshot(repo)
+        }
     }
 
     impl Deref for TestDB {
@@ -1534,6 +1540,27 @@ pub mod tests_utils {
             &self.db
         }
     }
+
+    pub fn make_snapshot(
+        dir: SnapshotsPath,
+        identity: Identity,
+        replica: u64,
+        compress: CompressType,
+        delete_if_exists: bool,
+    ) -> (SnapshotsPath, SnapshotRepository) {
+        let path = dir.0.join(format!("{replica}_{compress:?}"));
+        if delete_if_exists && path.exists() {
+            std::fs::remove_dir_all(&path).unwrap();
+        }
+        let dir = SnapshotsPath::from_path_unchecked(path);
+        dir.create().unwrap();
+        (
+            dir.clone(),
+            SnapshotRepository::open(dir, identity, replica)
+                .unwrap()
+                .with_compression(compress),
+        )
+    }
 }
 
 #[cfg(test)]
@@ -1541,6 +1568,7 @@ mod tests {
     #![allow(clippy::disallowed_macros)]
 
     use std::cell::RefCell;
+    use std::path::PathBuf;
     use std::rc::Rc;
 
     use super::*;
@@ -1548,7 +1576,7 @@ mod tests {
         system_tables, StConstraintRow, StIndexRow, StSequenceRow, StTableRow, ST_CONSTRAINT_ID, ST_INDEX_ID,
         ST_SEQUENCE_ID, ST_TABLE_ID,
     };
-    use crate::db::relational_db::tests_utils::TestDB;
+    use crate::db::relational_db::tests_utils::{make_snapshot, TestDB};
     use crate::error::IndexError;
     use crate::execution_context::ReducerContext;
     use anyhow::bail;
@@ -1559,14 +1587,17 @@ mod tests {
     use pretty_assertions::assert_eq;
     use spacetimedb_client_api_messages::timestamp::Timestamp;
     use spacetimedb_data_structures::map::IntMap;
+    use spacetimedb_fs_utils::compression::CompressType;
     use spacetimedb_lib::db::raw_def::v9::RawTableDefBuilder;
     use spacetimedb_lib::error::ResultTest;
     use spacetimedb_lib::Identity;
+    use spacetimedb_paths::FromPathUnchecked;
     use spacetimedb_sats::buffer::BufReader;
     use spacetimedb_sats::product;
     use spacetimedb_schema::schema::RowLevelSecuritySchema;
     use spacetimedb_table::read_column::ReadColumn;
     use spacetimedb_table::table::RowRef;
+    use tempfile::TempDir;
 
     fn my_table(col_type: AlgebraicType) -> TableSchema {
         table("MyTable", ProductType::from([("my_col", col_type)]), |builder| builder)
@@ -2441,4 +2472,71 @@ mod tests {
             assert_eq!(reducer_timestamp, timestamp);
         }
     }
+
+    #[test]
+    fn snapshot_test() -> ResultTest<()> {
+        let stdb = TestDB::durable()?;
+
+        let mut tx = stdb.begin_mut_tx(IsolationLevel::Serializable, Workload::ForTests);
+        let schema = my_table(AlgebraicType::I32);
+        let table_id = stdb.create_table(&mut tx, schema)?;
+
+        insert_three_i32s(&stdb, &mut tx, table_id)?;
+        stdb.commit_tx(tx)?;
+        let dir = stdb.path().snapshots();
+
+        for compress in [
+            CompressType::None,
+            CompressType::Zstd,
+            CompressType::Lz4,
+            CompressType::Snap,
+        ] {
+            let (dir, repo) = make_snapshot(dir.clone(), Identity::ZERO, 0, compress, false);
+            stdb.take_snapshot(&repo)?;
+
+            let size = repo.size_on_disk_last_snapshot()?;
+            dbg!(&size);
+            assert!(size.total_size > 0, "Snapshot size should be greater than 0");
+            let repo = open_snapshot_repo(dir, Identity::ZERO, 0)?;
+            let last = repo.latest_snapshot()?;
+            RelationalDB::restore_from_snapshot_or_bootstrap(Identity::ZERO, Some(&repo), last)?;
+        }
+
+        Ok(())
+    }
+
+    // Test compression against an existing database.
+    // Supply the path to the database and the identity of the replica via environment variables:
+    // - `SNAPSHOT`: the path to the database, like `/tmp/db/replicas/.../8/database`
+    // - `IDENTITY`: the identity in hex format
+    #[tokio::test]
+    #[ignore]
+    async fn read_existing() -> ResultTest<()> {
+        let path_db = PathBuf::from(std::env::var("SNAPSHOT").expect("SNAPSHOT must be set to a valid path"));
+        let identity =
+            Identity::from_hex(std::env::var("IDENTITY").expect("IDENTITY must be set to a valid hex identity"))?;
+        let path = ReplicaDir::from_path_unchecked(path_db);
+
+        let repo = open_snapshot_repo(path.snapshots(), Identity::ZERO, 0)?;
+        dbg!(repo.size_on_disk_last_snapshot()?);
+        assert!(
+            repo.size_on_disk()?.total_size > 0,
+            "Snapshot size should be greater than 0"
+        );
+
+        let last = repo.latest_snapshot()?;
+        let stdb = RelationalDB::restore_from_snapshot_or_bootstrap(identity, Some(&repo), last)?;
+
+        let out = TempDir::with_prefix("snapshot_test")?;
+        let dir = SnapshotsPath::from_path_unchecked(out.path());
+
+        let (_, repo) = make_snapshot(dir.clone(), Identity::ZERO, 0, CompressType::Zstd, false);
+
+        stdb.take_snapshot(&repo)?;
+        let size = repo.size_on_disk_last_snapshot()?;
+        dbg!(&size);
+        assert!(size.total_size > 0, "Snapshot size should be greater than 0");
+
+        Ok(())
+    }
 }
diff --git a/crates/fs-utils/Cargo.toml b/crates/fs-utils/Cargo.toml
index 033969ea81..f10aa40cba 100644
--- a/crates/fs-utils/Cargo.toml
+++ b/crates/fs-utils/Cargo.toml
@@ -11,6 +11,9 @@ anyhow.workspace = true
 thiserror.workspace = true
 hex.workspace = true
 rand.workspace = true
+zstd.workspace = true
+snap.workspace = true
+lz4_flex.workspace = true
 
 [dev-dependencies]
 tempdir.workspace = true
diff --git a/crates/fs-utils/src/compression.rs b/crates/fs-utils/src/compression.rs
new file mode 100644
index 0000000000..a7646777db
--- /dev/null
+++ b/crates/fs-utils/src/compression.rs
@@ -0,0 +1,133 @@
+use lz4_flex::frame::{AutoFinishEncoder as Lz4Encoder, FrameDecoder, FrameEncoder};
+use snap::read::FrameDecoder as SnapDecoder;
+use snap::write::FrameEncoder as SnapEncoder;
+use std::fs::{File, Metadata};
+use std::io;
+use std::io::{BufReader, BufWriter, Read, Seek, SeekFrom, Write};
+use zstd::stream::AutoFinishEncoder;
+use zstd::{Decoder, Encoder};
+
+const ZSTD_MAGIC_BYTES: [u8; 4] = [0x28, 0xB5, 0x2F, 0xFD];
+const LZ4_MAGIC_BYTES: [u8; 4] = [0x04, 0x22, 0x4D, 0x18];
+const SNAP_MAGIC_BYTES: [u8; 4] = [0xFF, 0x06, 0x00, 0x00];
+
+/// Compression type.
+///
+/// If `None`, the file is not compressed; otherwise it is compressed with the specified algorithm.
+#[derive(Debug, Copy, Clone, PartialEq)]
+pub enum CompressType {
+    None,
+    Zstd,
+    Lz4,
+    Snap,
+}
+
+/// A reader that can read compressed files.
+pub enum CompressReader<'a> {
+    None(BufReader<File>),
+    Zstd(Decoder<'a, BufReader<File>>),
+    Lz4(FrameDecoder<BufReader<File>>),
+    Snap(SnapDecoder<BufReader<File>>),
+}
+
+impl<'a> CompressReader<'a> {
+    /// Create a new [`CompressReader`] from a [`File`].
+    ///
+    /// It detects the compression type using the magic bytes and returns the appropriate reader.
+    ///
+    /// **Note**: The file is returned to its original position after the compression type is detected.
+    pub fn new(mut inner: File) -> io::Result<Self> {
+        let current_pos = inner.stream_position()?;
+
+        let mut magic_bytes = [0u8; 4];
+        let bytes_read = inner.read(&mut magic_bytes)?;
+
+        // Restore the original position
+        inner.seek(SeekFrom::Start(current_pos))?;
+
+        // Determine compression type
+        Ok(if bytes_read == 4 {
+            match magic_bytes {
+                ZSTD_MAGIC_BYTES => {
+                    let decoder = Decoder::new(inner)?;
+                    CompressReader::Zstd(decoder)
+                }
+                LZ4_MAGIC_BYTES => {
+                    let decoder = FrameDecoder::new(BufReader::new(inner));
+                    CompressReader::Lz4(decoder)
+                }
+                SNAP_MAGIC_BYTES => {
+                    let decoder = SnapDecoder::new(BufReader::new(inner));
+                    CompressReader::Snap(decoder)
+                }
+                _ => CompressReader::None(BufReader::new(inner)),
+            }
+        } else {
+            CompressReader::None(BufReader::new(inner))
+        })
+    }
+
+    pub fn metadata(&self) -> io::Result<Metadata> {
+        match self {
+            CompressReader::None(inner) => inner.get_ref().metadata(),
+            CompressReader::Zstd(inner) => inner.get_ref().get_ref().metadata(),
+            CompressReader::Lz4(inner) => inner.get_ref().get_ref().metadata(),
+            CompressReader::Snap(inner) => inner.get_ref().get_ref().metadata(),
+        }
+    }
+}
+
+impl<'a> Read for CompressReader<'a> {
+    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
+        match self {
+            CompressReader::None(inner) => inner.read(buf),
+            CompressReader::Zstd(inner) => inner.read(buf),
+            CompressReader::Lz4(inner) => inner.read(buf),
+            CompressReader::Snap(inner) => inner.read(buf),
+        }
+    }
+}
+
+/// A writer that can write compressed files.
+pub enum CompressWriter<'a> {
+    None(BufWriter<File>),
+    Zstd(AutoFinishEncoder<'a, BufWriter<File>>),
+    Lz4(Lz4Encoder<BufWriter<File>>),
+    Snap(Box<SnapEncoder<File>>),
+}
+
+impl<'a> CompressWriter<'a> {
+    pub fn new(inner: File, compress_type: CompressType) -> io::Result<Self> {
+        match compress_type {
+            CompressType::None => Ok(CompressWriter::None(BufWriter::new(inner))),
+            CompressType::Zstd => Ok(CompressWriter::Zstd(
+                Encoder::new(BufWriter::new(inner), 0)?.auto_finish(),
+            )),
+            CompressType::Lz4 => Ok(CompressWriter::Lz4(
+                FrameEncoder::new(BufWriter::new(inner)).auto_finish(),
+            )),
+            // SnapEncoder buffers internally, so we don't need to wrap it in a BufWriter
+            CompressType::Snap => Ok(CompressWriter::Snap(Box::new(SnapEncoder::new(inner)))),
+        }
+    }
+}
+
+impl<'a> Write for CompressWriter<'a> {
+    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
+        match self {
+            CompressWriter::None(inner) => inner.write(buf),
+            CompressWriter::Zstd(inner) => inner.write(buf),
+            CompressWriter::Lz4(inner) => inner.write(buf),
+            CompressWriter::Snap(inner) => inner.write(buf),
+        }
+    }
+
+    fn flush(&mut self) -> io::Result<()> {
+        match self {
+            CompressWriter::None(inner) => inner.flush(),
+            CompressWriter::Zstd(inner) => inner.flush(),
+            CompressWriter::Lz4(inner) => inner.flush(),
+            CompressWriter::Snap(inner) => inner.flush(),
+        }
+    }
+}
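
To make the detection scheme above concrete, here is a minimal round-trip sketch against the new
module (editorial example, not part of the diff; it assumes `spacetimedb-fs-utils` is a dependency,
and the file path is a placeholder):

    use spacetimedb_fs_utils::compression::{CompressReader, CompressType, CompressWriter};
    use std::fs::File;
    use std::io::{Read, Write};

    fn main() -> std::io::Result<()> {
        let path = std::env::temp_dir().join("compress_demo.bin"); // placeholder path

        // Write compressed: the zstd frame header becomes the file's magic bytes.
        {
            let mut w = CompressWriter::new(File::create(&path)?, CompressType::Zstd)?;
            w.write_all(b"snapshot bytes")?;
        } // the auto-finish encoder completes the frame on drop

        // Read back: `CompressReader::new` sniffs the magic bytes and picks the decoder,
        // so the caller never has to say which codec was used.
        let mut r = CompressReader::new(File::open(&path)?)?;
        let mut buf = Vec::new();
        r.read_to_end(&mut buf)?;
        assert_eq!(buf, b"snapshot bytes");
        Ok(())
    }

Note the asymmetry: only the writer names a codec; the reader is always codec-agnostic.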
diff --git a/crates/fs-utils/src/dir_trie.rs b/crates/fs-utils/src/dir_trie.rs
index f2b62d04ac..e9d807bbb7 100644
--- a/crates/fs-utils/src/dir_trie.rs
+++ b/crates/fs-utils/src/dir_trie.rs
@@ -13,8 +13,9 @@
 //! The trie structure implemented here is still O(n), but with a drastically reduced constant factor (1/128th),
 //! which we expect to shrink the linear lookups to an acceptable size.
 
+use crate::compression::{CompressReader, CompressType, CompressWriter};
 use std::{
-    fs::{create_dir_all, File, OpenOptions},
+    fs::{create_dir_all, OpenOptions},
     io::{self, Read, Write},
     path::{Path, PathBuf},
 };
@@ -49,6 +50,8 @@ pub struct CountCreated {
 pub struct DirTrie {
     /// The directory name at which the dir trie is stored.
     root: PathBuf,
+    /// The [`CompressType`] used for files in this trie.
+    compress_type: CompressType,
 }
 
 const FILE_ID_BYTES: usize = 32;
@@ -70,9 +73,9 @@ impl DirTrie {
     ///
     /// Returns an error if the `root` cannot be created as a directory.
     /// See documentation on [`create_dir_all`] for more details.
-    pub fn open(root: PathBuf) -> Result<Self, io::Error> {
+    pub fn open(root: PathBuf, compress_type: CompressType) -> Result<Self, io::Error> {
         create_dir_all(&root)?;
-        Ok(Self { root })
+        Ok(Self { root, compress_type })
     }
 
     fn file_path(&self, file_id: &FileId) -> PathBuf {
@@ -164,27 +167,42 @@
             }
         }
 
-        let mut file = self.open_entry(file_id, &o_excl())?;
+        let mut file = self.open_entry_writer(file_id, self.compress_type)?;
         let contents = contents();
         file.write_all(contents.as_ref())?;
         counter.objects_written += 1;
         Ok(())
     }
 
-    /// Open the file keyed with `file_id` with the given `options`.
+    /// Open the file keyed with `file_id` for reading.
     ///
-    /// Sensible choices for `options` are:
-    /// - [`o_excl`], to create a new entry.
-    /// - [`o_rdonly`], to read an existing entry.
-    pub fn open_entry(&self, file_id: &FileId, options: &OpenOptions) -> Result<File, io::Error> {
+    /// The contents are decompressed based on the file's magic bytes.
+    ///
+    /// The file is opened with [`o_rdonly`].
+    pub fn open_entry_reader(&self, file_id: &FileId) -> Result<CompressReader<'static>, io::Error> {
+        let path = self.file_path(file_id);
+        Self::create_parent(&path)?;
+        CompressReader::new(o_rdonly().open(path)?)
+    }
+
+    /// Open the file keyed with `file_id` for writing.
+    ///
+    /// If `compress_type` is [`CompressType::None`], the file is written uncompressed.
+    ///
+    /// The file is opened with [`o_excl`].
+    pub fn open_entry_writer(
+        &self,
+        file_id: &FileId,
+        compress_type: CompressType,
+    ) -> Result<CompressWriter<'static>, io::Error> {
         let path = self.file_path(file_id);
         Self::create_parent(&path)?;
-        options.open(path)
+        CompressWriter::new(o_excl().open(path)?, compress_type)
     }
 
     /// Open the entry keyed with `file_id` and read it into a `Vec<u8>`.
     pub fn read_entry(&self, file_id: &FileId) -> Result<Vec<u8>, io::Error> {
-        let mut file = self.open_entry(file_id, &o_rdonly())?;
+        let mut file = self.open_entry_reader(file_id)?;
         let mut buf = Vec::with_capacity(file.metadata()?.len() as usize);
         // TODO(perf): Async IO?
         file.read_to_end(&mut buf)?;
@@ -202,19 +220,19 @@ mod test {
 
     fn with_test_dir_trie(f: impl FnOnce(DirTrie)) {
         let root = tempdir::TempDir::new("test_dir_trie").unwrap();
-        let trie = DirTrie::open(root.path().to_path_buf()).unwrap();
+        let trie = DirTrie::open(root.path().to_path_buf(), CompressType::None).unwrap();
         f(trie)
     }
 
     /// Write the [`TEST_STRING`] into the entry [`TEST_ID`].
     fn write_test_string(trie: &DirTrie) {
-        let mut file = trie.open_entry(&TEST_ID, &o_excl()).unwrap();
+        let mut file = trie.open_entry_writer(&TEST_ID, CompressType::None).unwrap();
         file.write_all(TEST_STRING).unwrap();
     }
 
     /// Read the entry [`TEST_ID`] and assert that its contents match the [`TEST_STRING`].
     fn read_test_string(trie: &DirTrie) {
-        let mut file = trie.open_entry(&TEST_ID, &o_rdonly()).unwrap();
+        let mut file = trie.open_entry_reader(&TEST_ID).unwrap();
         let mut contents = Vec::new();
         file.read_to_end(&mut contents).unwrap();
         assert_eq!(&contents, TEST_STRING);
@@ -274,7 +292,7 @@ mod test {
             assert!(!trie.contains_entry(&TEST_ID));
 
             // Because the file isn't there, we can't open it.
-            assert!(trie.open_entry(&TEST_ID, &o_rdonly()).is_err());
+            assert!(trie.open_entry_reader(&TEST_ID).is_err());
 
             // Create an entry in the trie and write some data to it.
             write_test_string(&trie);
@@ -283,7 +301,7 @@ mod test {
             assert!(trie.contains_entry(&TEST_ID));
 
             // Because the file is there, we can't create it.
-            assert!(trie.open_entry(&TEST_ID, &o_excl()).is_err());
+            assert!(trie.open_entry_writer(&TEST_ID, CompressType::Zstd).is_err());
         })
     }
 }
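
The same split shows up one level higher in `DirTrie`: writers pick a codec per entry, readers stay
codec-agnostic. A sketch under the assumption that `FileId` is the plain 32-byte array implied by
`FILE_ID_BYTES` (editorial example, not part of the diff; the directory is a placeholder):

    use spacetimedb_fs_utils::compression::CompressType;
    use spacetimedb_fs_utils::dir_trie::DirTrie;
    use std::io::Write;

    fn main() -> std::io::Result<()> {
        // The trie's default codec applies to entries written via `write_entry`.
        let trie = DirTrie::open(std::env::temp_dir().join("trie_demo"), CompressType::Lz4)?;

        let id = [0xAB; 32]; // stands in for a content hash
        {
            // `o_excl` semantics: creating the same entry twice fails.
            let mut w = trie.open_entry_writer(&id, CompressType::Lz4)?;
            w.write_all(b"object bytes")?;
        } // the LZ4 frame is finished on drop

        // `read_entry` goes through `CompressReader`, so the codec is detected, not passed.
        assert_eq!(trie.read_entry(&id)?, b"object bytes");
        Ok(())
    }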
diff --git a/crates/fs-utils/src/lib.rs b/crates/fs-utils/src/lib.rs
index 489b3bb3cf..990d0155fe 100644
--- a/crates/fs-utils/src/lib.rs
+++ b/crates/fs-utils/src/lib.rs
@@ -2,6 +2,7 @@ use rand::Rng;
 use std::io::Write;
 use std::path::Path;
 
+pub mod compression;
 pub mod dir_trie;
 pub mod lockfile;
 
diff --git a/crates/snapshot/src/lib.rs b/crates/snapshot/src/lib.rs
index 2972043652..423c7027e5 100644
--- a/crates/snapshot/src/lib.rs
+++ b/crates/snapshot/src/lib.rs
@@ -24,6 +24,7 @@
 #![allow(clippy::result_large_err)]
 
 use spacetimedb_durability::TxOffset;
+use spacetimedb_fs_utils::compression::{CompressReader, CompressType, CompressWriter};
 use spacetimedb_fs_utils::{
     dir_trie::{o_excl, o_rdonly, CountCreated, DirTrie},
     lockfile::{Lockfile, LockfileError},
@@ -41,12 +42,8 @@ use spacetimedb_table::{
     page::Page,
     table::Table,
 };
-use std::{
-    collections::BTreeMap,
-    ffi::OsStr,
-    io::{Read, Write},
-    path::PathBuf,
-};
+use std::io::Write;
+use std::{collections::BTreeMap, ffi::OsStr, fmt, io::Read, path::PathBuf};
 
 #[derive(Debug, Copy, Clone)]
 /// An object which may be associated with an error during snapshotting.
@@ -300,8 +297,11 @@ impl Snapshot {
 
     /// Read a [`Snapshot`] from the file at `path`, verify its hash, and return it.
     ///
+    /// **NOTE**: It detects whether the file is compressed.
+    ///
     /// Fails if:
     /// - `path` does not refer to a readable file.
+    /// - The compression type cannot be detected.
     /// - The file at `path` is corrupted,
     ///   as detected by comparing the hash of its bytes to a hash recorded in the file.
     pub fn read_from_file(path: &SnapshotFilePath) -> Result<Self, SnapshotError> {
         let err_read_object = |cause| SnapshotError::ReadObject {
             source_repo: path.0.clone(),
             cause,
         };
-        let mut snapshot_file = path.open_file(&o_rdonly()).map_err(err_read_object)?;
+        let snapshot_file = path.open_file(&o_rdonly()).map_err(err_read_object)?;
+        let mut snapshot_file = CompressReader::new(snapshot_file)?;
 
         // The snapshot file is prefixed with the hash of the `Snapshot`'s BSATN.
         // Read that hash.
@@ -462,6 +463,31 @@ impl Snapshot {
     }
 }
 
+#[derive(Clone)]
+pub struct SnapshotSize {
+    /// The compression applied to the snapshot's files.
+    pub compressed_type: CompressType,
+    /// The size of the snapshot file in bytes.
+    pub file_size: u64,
+    /// The size of the snapshot's objects in bytes.
+    pub object_size: u64,
+    /// The number of objects in the snapshot.
+    pub object_count: u64,
+    /// Total size of the snapshot in bytes.
+    pub total_size: u64,
+}
+
+impl fmt::Debug for SnapshotSize {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        f.debug_struct("SnapshotSize")
+            .field("compressed_type", &self.compressed_type)
+            .field("object_count ", &self.object_count)
+            .field("file_size ", &format_args!("{:>8} bytes", self.file_size))
+            .field("object_size ", &format_args!("{:>8} bytes", self.object_size))
+            .field("total_size ", &format_args!("{:>8} bytes", self.total_size))
+            .finish()
+    }
+}
+
 /// A repository of snapshots of a particular database instance.
 pub struct SnapshotRepository {
     /// The directory which contains all the snapshots.
@@ -472,12 +498,20 @@ pub struct SnapshotRepository {
 
     /// The database instance ID of the database instance for which this repository stores snapshots.
     replica_id: u64,
+
+    /// The compression used when *writing* snapshots.
+    compress_type: CompressType,
     // TODO(deduplication): track the most recent successful snapshot
     // (possibly in a file)
     // and hardlink its objects into the next snapshot for deduplication.
 }
 
 impl SnapshotRepository {
+    /// Enable compression with the specified [`CompressType`] algorithm.
+    pub fn with_compression(mut self, compress_type: CompressType) -> Self {
+        self.compress_type = compress_type;
+        self
+    }
+
     /// Returns [`Address`] of the database this [`SnapshotRepository`] is configured to snapshot.
     pub fn database_identity(&self) -> Identity {
         self.database_identity
     }
@@ -504,7 +538,7 @@ impl SnapshotRepository {
                 prev_snapshot.0.is_dir(),
                 "prev_snapshot {prev_snapshot:?} is not a directory"
             );
-            let object_repo = Self::object_repo(&prev_snapshot)?;
+            let object_repo = Self::object_repo(&prev_snapshot, self.compress_type)?;
             Some(object_repo)
         } else {
             None
@@ -523,7 +557,7 @@ impl SnapshotRepository {
         snapshot_dir.create()?;
 
         // Create a new `DirTrie` to hold all the content-addressed objects in the snapshot.
-        let object_repo = Self::object_repo(&snapshot_dir)?;
+        let object_repo = Self::object_repo(&snapshot_dir, self.compress_type)?;
 
         // Build the in-memory `Snapshot` object.
         let mut snapshot = self.empty_snapshot(tx_offset);
@@ -542,9 +576,10 @@ impl SnapshotRepository {
 
         // Create the snapshot file, containing first the hash, then the `Snapshot`.
         {
-            let mut snapshot_file = snapshot_dir.snapshot_file(tx_offset).open_file(&o_excl())?;
-            snapshot_file.write_all(hash.as_bytes())?;
-            snapshot_file.write_all(&snapshot_bsatn)?;
+            let snapshot_file = snapshot_dir.snapshot_file(tx_offset).open_file(&o_excl())?;
+            let mut compress = CompressWriter::new(snapshot_file, self.compress_type)?;
+            compress.write_all(hash.as_bytes())?;
+            compress.write_all(&snapshot_bsatn)?;
         }
 
         log::info!(
@@ -603,8 +638,8 @@ impl SnapshotRepository {
     /// Any mutations to the returned [`DirTrie`] or its contents
     /// will likely render the snapshot corrupted,
     /// causing future attempts to reconstruct it to fail.
-    pub fn object_repo(snapshot_dir: &SnapshotDirPath) -> Result<DirTrie, std::io::Error> {
-        DirTrie::open(snapshot_dir.objects().0)
+    pub fn object_repo(snapshot_dir: &SnapshotDirPath, compress_type: CompressType) -> Result<DirTrie, std::io::Error> {
+        DirTrie::open(snapshot_dir.objects().0, compress_type)
     }
 
     /// Read a snapshot contained in self referring to `tx_offset`,
@@ -656,7 +691,7 @@ impl SnapshotRepository {
             });
         }
 
-        let object_repo = Self::object_repo(&snapshot_dir)?;
+        let object_repo = Self::object_repo(&snapshot_dir, self.compress_type)?;
 
         let blob_store = snapshot.reconstruct_blob_store(&object_repo)?;
 
@@ -684,6 +719,7 @@ impl SnapshotRepository {
             root,
             database_identity,
             replica_id,
+            compress_type: CompressType::None,
         })
     }
 
@@ -757,6 +793,78 @@ impl SnapshotRepository {
         }
         Ok(())
     }
+
+    /// Calculate the size of the snapshot repository in bytes.
+    pub fn size_on_disk(&self) -> Result<SnapshotSize, std::io::Error> {
+        let mut size = SnapshotSize {
+            compressed_type: self.compress_type,
+            file_size: 0,
+            object_size: 0,
+            object_count: 0,
+            total_size: 0,
+        };
+
+        for snapshot in self.all_snapshots()? {
+            let snap = self.size_on_disk_snapshot(snapshot)?;
+            size.file_size += snap.file_size;
+            size.object_size += snap.object_size;
+            size.object_count += snap.object_count;
+            size.total_size += snap.total_size;
+        }
+        Ok(size)
+    }
+
+    /// Calculate the size of the snapshot at `offset` in bytes.
+    pub fn size_on_disk_snapshot(&self, offset: TxOffset) -> Result<SnapshotSize, std::io::Error> {
+        let mut size = SnapshotSize {
+            compressed_type: self.compress_type,
+            file_size: 0,
+            object_size: 0,
+            object_count: 0,
+            total_size: 0,
+        };
+
+        let snapshot_dir = self.snapshot_dir_path(offset);
+        let snapshot_file = snapshot_dir.snapshot_file(offset);
+        let snapshot_file_size = snapshot_file.metadata()?.len();
+        size.file_size += snapshot_file_size;
+        size.total_size += snapshot_file_size;
+        let objects = snapshot_dir.objects().read_dir()?;
+        // Search the subdirectories...
+        for object in objects {
+            let object = object?;
+            // ...then the files within them.
+            let object_files = object.path().read_dir()?;
+            for object_file in object_files {
+                let object_file = object_file?;
+                let file_size = object_file.metadata()?.len();
+                size.object_size += file_size;
+                size.total_size += file_size;
+                size.object_count += 1;
+            }
+        }
+
+        Ok(size)
+    }
+
+    /// Calculate the size of the latest snapshot in bytes.
+    pub fn size_on_disk_last_snapshot(&self) -> Result<SnapshotSize, std::io::Error> {
+        let mut size = SnapshotSize {
+            compressed_type: self.compress_type,
+            file_size: 0,
+            object_size: 0,
+            object_count: 0,
+            total_size: 0,
+        };
+
+        if let Some(snapshot) = self.latest_snapshot()? {
+            let snap = self.size_on_disk_snapshot(snapshot)?;
+            size.file_size += snap.file_size;
+            size.object_size += snap.object_size;
+            size.object_count += snap.object_count;
+            size.total_size += snap.total_size;
+        }
+        Ok(size)
+    }
 }
 
 pub struct ReconstructedSnapshot {
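
Taken together, the new surface is small: open a repository, opt into a codec for writes, snapshot,
and measure. A closing sketch of the intended call pattern (editorial example, not part of the
diff; the path and identity are placeholders, and `take_snapshot` is assumed reachable on the
database handle as in the benchmarks above):

    use spacetimedb::db::relational_db::RelationalDB;
    use spacetimedb_fs_utils::compression::CompressType;
    use spacetimedb_lib::Identity;
    use spacetimedb_paths::server::SnapshotsPath;
    use spacetimedb_paths::FromPathUnchecked;
    use spacetimedb_snapshot::SnapshotRepository;

    fn compressed_snapshot(db: &RelationalDB) -> anyhow::Result<()> {
        let dir = SnapshotsPath::from_path_unchecked("/tmp/snapshots"); // placeholder
        dir.create()?;

        // Only writers choose a codec; readers detect it per file from magic bytes,
        // so snapshots written with different codecs can coexist in one repository.
        let repo = SnapshotRepository::open(dir, Identity::ZERO, 0)?.with_compression(CompressType::Zstd);

        db.take_snapshot(&repo)?;
        dbg!(repo.size_on_disk_last_snapshot()?);
        Ok(())
    }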