Skip to content

Commit

Permalink
Compress the snapshot
Browse files Browse the repository at this point in the history
  • Loading branch information
mamcx committed Dec 3, 2024
1 parent 03cf2f4 commit 48b1fe2
Show file tree
Hide file tree
Showing 12 changed files with 573 additions and 34 deletions.
38 changes: 38 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,10 @@ uuid = { version = "1.2.1", features = ["v4"] }
walkdir = "2.2.5"
wasmbin = "0.6"
webbrowser = "1.0.2"
zstd = { version = "0.13.2", features = ["arrays", "zdict_builder"] }
snap = "1.1.1"
lz4_flex = { version = "0.11" }

xdg = "2.5"

# Vendor the openssl we rely on, rather than depend on a
Expand Down
2 changes: 2 additions & 0 deletions crates/bench/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,11 @@ spacetimedb-paths.workspace = true
spacetimedb-primitives = { path = "../primitives" }
spacetimedb-sats = { path = "../sats" }
spacetimedb-schema = { workspace = true, features = ["test"] }
spacetimedb-snapshot = { path = "../snapshot" }
spacetimedb-standalone = { path = "../standalone" }
spacetimedb-table = { path = "../table" }
spacetimedb-testing = { path = "../testing" }
spacetimedb-fs-utils.workspace = true

anyhow.workspace = true
anymap.workspace = true
Expand Down
134 changes: 133 additions & 1 deletion crates/bench/benches/special.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,28 @@
use criterion::async_executor::AsyncExecutor;
use criterion::{criterion_group, criterion_main, Criterion, SamplingMode};
use mimalloc::MiMalloc;
use spacetimedb::db::datastore::traits::IsolationLevel;
use spacetimedb::db::relational_db::tests_utils::{make_snapshot, TestDB};
use spacetimedb::db::relational_db::{open_snapshot_repo, RelationalDB};
use spacetimedb::execution_context::Workload;
use spacetimedb_bench::{
database::BenchDatabase,
schemas::{create_sequential, u32_u64_str, u32_u64_u64, u64_u64_u32, BenchTable, RandomTable},
spacetime_module::SpacetimeModule,
};
use spacetimedb_fs_utils::compression::CompressType;
use spacetimedb_lib::sats::{self, bsatn};
use spacetimedb_lib::{bsatn::ToBsatn as _, ProductValue};
use spacetimedb_lib::{bsatn::ToBsatn as _, Identity, ProductValue};
use spacetimedb_paths::server::{ReplicaDir, SnapshotsPath};
use spacetimedb_paths::FromPathUnchecked;
use spacetimedb_schema::schema::TableSchema;
use spacetimedb_snapshot::{SnapshotRepository, SnapshotSize};
use spacetimedb_testing::modules::{Csharp, ModuleLanguage, Rust};
use std::path::PathBuf;
use std::sync::Arc;
use std::sync::OnceLock;
use std::time::Duration;
use tempdir::TempDir;

#[global_allocator]
static GLOBAL: MiMalloc = MiMalloc;
Expand All @@ -30,6 +41,9 @@ fn custom_benchmarks<L: ModuleLanguage>(c: &mut Criterion) {

custom_module_benchmarks(&db, c);
custom_db_benchmarks(&db, c);

snapshot(c);
snapshot_existing(c);
}

fn custom_module_benchmarks<L: ModuleLanguage>(m: &SpacetimeModule<L>, c: &mut Criterion) {
Expand Down Expand Up @@ -186,5 +200,123 @@ fn serialize_benchmarks<
// TODO: deserialize benches (needs a typespace)
}

/// Shared driver for the snapshot compression benchmarks.
///
/// For each supported compression algorithm this:
/// 1. takes one snapshot via `take` and prints its on-disk size to stderr, and
/// 2. registers `save_compression_*` / `open_compression_*` benchmark
///    functions that write and re-read snapshots under that algorithm.
///
/// Throughput is reported against the uncompressed (`CompressType::None`)
/// snapshot size, so compressed runs show an effective (pre-compression) rate.
fn _snapshot<F>(c: &mut Criterion, name: &str, dir: SnapshotsPath, take: F)
where
    F: Fn(&SnapshotRepository),
{
    let algos = [
        CompressType::None,
        CompressType::Zstd,
        CompressType::Lz4,
        CompressType::Snap,
    ];

    // Take one snapshot per algorithm up front so the resulting sizes can be
    // inspected, and remember the uncompressed size as the throughput baseline.
    let mut uncompressed_size = None;
    for compress in &algos {
        let (_, repo) = make_snapshot(dir.clone(), Identity::ZERO, 0, *compress, true);
        take(&repo);
        let size = repo.size_on_disk_last_snapshot().unwrap();
        if size.compressed_type == CompressType::None {
            uncompressed_size = Some(size.clone());
        }
        // Intentional diagnostic output (was a committed `dbg!`).
        eprintln!("snapshot size: {size:#?}");
    }

    // Fix: the group name previously contained a stray `]` ("…{name}]").
    let mut group = c.benchmark_group(format!("special/snapshot/{name}"));
    group.throughput(criterion::Throughput::Bytes(
        uncompressed_size.unwrap().total_size,
    ));
    group.sample_size(50);
    group.warm_up_time(Duration::from_secs(10));
    group.sampling_mode(SamplingMode::Flat);

    for compress in &algos {
        // Cost of taking (writing) a snapshot with this compression.
        group.bench_function(format!("save_compression_{compress:?}"), |b| {
            b.iter_batched(
                || {},
                |_| {
                    let (_, repo) = make_snapshot(dir.clone(), Identity::ZERO, 0, *compress, true);
                    take(&repo);
                },
                criterion::BatchSize::NumIterations(100),
            );
        });

        // Cost of opening and reading back the latest snapshot.
        group.bench_function(format!("open_compression_{compress:?}"), |b| {
            b.iter_batched(
                || {},
                |_| {
                    let (_, repo) = make_snapshot(dir.clone(), Identity::ZERO, 0, *compress, false);
                    let last = repo.latest_snapshot().unwrap().unwrap();
                    repo.read_snapshot(last).unwrap()
                },
                criterion::BatchSize::NumIterations(100),
            );
        });
    }
}

/// Benchmark snapshot compression against a synthetic in-memory database.
///
/// Builds two tables (`u32_u64_str` and `u32_u64_u64`), fills each with
/// 1_000 deterministic rows in a single committed transaction, and then
/// hands the database to `_snapshot` under the name "synthetic".
fn snapshot(c: &mut Criterion) {
    let db = TestDB::in_memory().unwrap();

    let snapshots_dir = db.path().snapshots();
    snapshots_dir.create().unwrap();

    // Derive the two table schemas from the benchmark row types.
    let mut schema_str = TableSchema::from_product_type(u32_u64_str::product_type());
    schema_str.table_name = "u32_u64_str".into();
    let mut schema_u64 = TableSchema::from_product_type(u32_u64_u64::product_type());
    schema_u64.table_name = "u32_u64_u64".into();

    // Create and populate both tables inside one transaction.
    let mut tx = db.begin_mut_tx(IsolationLevel::Serializable, Workload::Internal);
    let table_str = db.create_table(&mut tx, schema_str).unwrap();
    let table_u64 = db.create_table(&mut tx, schema_u64).unwrap();

    for row in create_sequential::<u32_u64_str>(0xdeadbeef, 1_000, 100) {
        db.insert(&mut tx, table_str, row.into_product_value()).unwrap();
    }
    for row in create_sequential::<u32_u64_u64>(0xdeadbeef, 1_000, 100) {
        db.insert(&mut tx, table_u64, row.into_product_value()).unwrap();
    }
    db.commit_tx(tx).unwrap();

    _snapshot(c, "synthetic", snapshots_dir, |repo| {
        db.take_snapshot(repo).unwrap();
    });
}

// For test compression into an existing database.
// Must supply the path to the database and the identity of the replica using the `ENV`:
// - `SNAPSHOT` the path to the database, like `/tmp/db/replicas/.../8/database`
// - `IDENTITY` the identity in hex format
// Benchmark compression against an existing database on disk.
//
// Skipped (with a message to stderr) unless both env vars are set:
// - `SNAPSHOT`: the path to the database, like `/tmp/db/replicas/.../8/database`
// - `IDENTITY`: the identity of the replica, in hex format
fn snapshot_existing(c: &mut Criterion) {
    let Ok(path_db) = std::env::var("SNAPSHOT") else {
        eprintln!("SNAPSHOT must be set to a valid path to the database");
        return;
    };
    // Treat a missing or invalid IDENTITY like a missing SNAPSHOT: skip this
    // benchmark instead of panicking, so the rest of the suite still runs.
    let identity = match std::env::var("IDENTITY") {
        Ok(hex) => match Identity::from_hex(&hex) {
            Ok(identity) => identity,
            Err(e) => {
                eprintln!("IDENTITY is not a valid hex identity: {e:?}");
                return;
            }
        },
        Err(_) => {
            eprintln!("IDENTITY must be set to a valid hex identity");
            return;
        }
    };

    let path = ReplicaDir::from_path_unchecked(PathBuf::from(path_db));
    // NOTE(review): the repo is opened with `Identity::ZERO` while the restore
    // below uses the env-provided identity — confirm this asymmetry is intended.
    let repo = open_snapshot_repo(path.snapshots(), Identity::ZERO, 0).unwrap();

    let last = repo.latest_snapshot().unwrap();
    let db = RelationalDB::restore_from_snapshot_or_bootstrap(identity, Some(&repo), last).unwrap();

    // Write benchmark snapshots into a temp dir so the source DB is untouched.
    let out = TempDir::new("snapshots").unwrap();
    let dir = SnapshotsPath::from_path_unchecked(out.path());

    _snapshot(c, "existing", dir, |repo| {
        db.take_snapshot(repo).unwrap();
    });
}

// Register the benchmark entry point with criterion and emit the harness
// `main` — this file is the whole benchmark binary.
criterion_group!(benches, criterion_benchmark);
criterion_main!(benches);
1 change: 1 addition & 0 deletions crates/commitlog/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ serde = { workspace = true, optional = true }
spacetimedb-primitives.workspace = true
spacetimedb-paths.workspace = true
spacetimedb-sats.workspace = true
spacetimedb-fs-utils.workspace = true
tempfile.workspace = true
thiserror.workspace = true

Expand Down
1 change: 1 addition & 0 deletions crates/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ spacetimedb-table.workspace = true
spacetimedb-vm.workspace = true
spacetimedb-snapshot.workspace = true
spacetimedb-expr.workspace = true
spacetimedb-fs-utils.workspace = true

anyhow = { workspace = true, features = ["backtrace"] }
arrayvec.workspace = true
Expand Down
Loading

0 comments on commit 48b1fe2

Please sign in to comment.