diff --git a/betree/haura-benchmarks/Cargo.toml b/betree/haura-benchmarks/Cargo.toml
index dd821acd..31b0a053 100644
--- a/betree/haura-benchmarks/Cargo.toml
+++ b/betree/haura-benchmarks/Cargo.toml
@@ -24,3 +24,4 @@ log = "0.4"
 # Dependent on versions from haura
 parking_lot = "0.11"
 zip = "0.5"
+zipf = "7.0.1"
diff --git a/betree/haura-benchmarks/haura-plots/haura_plots/__init__.py b/betree/haura-benchmarks/haura-plots/haura_plots/__init__.py
index 4fcac1f3..45cf5253 100755
--- a/betree/haura-benchmarks/haura-plots/haura_plots/__init__.py
+++ b/betree/haura-benchmarks/haura-plots/haura_plots/__init__.py
@@ -14,6 +14,7 @@ from . import util
 from . import metrics_plots
 from . import cache_plots
+from . import ycsb_plots
 
 
 def sort_by_o_id(key):
     """
@@ -263,7 +264,12 @@ def main():
     # Prep the color scheme
     util.init_colormap()
 
+    ycsb_c = []
+
+    # Individual Plots
     for path in sys.argv[1:]:
+        if os.path.isfile(path):
+            continue
         with open(f"{path}/betree-metrics.jsonl", 'r', encoding="UTF-8") as metrics:
             data = util.read_jsonl(metrics)
         # Plot actions
@@ -274,7 +280,13 @@ def main():
         plot_object_distribution(path)
         metrics_plots.plot_system(path)
         cache_plots.plot_cache(data, path)
+        ycsb_c.append(ycsb_plots.plot_c(path))
     #plot_filesystem_test()
 
+    # Grouped Plots
+    for group in list({run["group"] for run in filter(lambda x: x is not None, ycsb_c)}):
+        ycsb_plots.plot_grouped_c(group, list(filter(lambda x: x["group"] == group, ycsb_c)))
+
+
 if __name__ == "__main__":
     main()
diff --git a/betree/haura-benchmarks/haura-plots/haura_plots/ycsb_plots.py b/betree/haura-benchmarks/haura-plots/haura_plots/ycsb_plots.py
new file mode 100644
index 00000000..9d9cd8d2
--- /dev/null
+++ b/betree/haura-benchmarks/haura-plots/haura_plots/ycsb_plots.py
@@ -0,0 +1,57 @@
+import os
+import pandas as pd
+import matplotlib.pyplot as plt
+
+def plot_c(path):
+    """
+    Bar chart for YCSB-C-esque scalability of a single run.
+    """
+    if not os.path.exists(f"{path}/ycsb_c.csv"):
+        return
+
+    data = pd.read_csv(f"{path}/ycsb_c.csv")
+    fig, ax = plt.subplots()
+    # op / s
+    first = data["ops"][0] / (data["time_ns"][0] / 10**9)
+    second = data["ops"][1] / (data["time_ns"][1] / 10**9)
+    # In some local tests the proper scaling behavior only appeared with 2 or
+    # more threads. This is uncommon, but checking it like this keeps the
+    # optimal scaling curve from becoming entirely useless.
+    if first < second / 2:
+        first = second / 2
+    optimal_scaling = [x * first for x in data["threads"]]
+    ax.plot(data["threads"], optimal_scaling, linestyle=":", label="Optimal", color='grey')
+    ax.bar(data["threads"], data["ops"] / (data["time_ns"] / 10**9))
+    ax.set_ylabel("Throughput [op/s]")
+    ax.set_title(f"YCSB-C Scaling | {' | '.join(path.split('/')[-2:])}")
+    ax.set_xlabel("Threads [#]")
+    fig.legend()
+    fig.savefig(f"{path}/ycsb_c.svg")
+    return {
+        "title": path.split('/')[-1],
+        "group": '/'.join(path.split('/')[:-1]),
+        "threads": data["threads"],
+        "results": data["ops"] / (data["time_ns"] / 10**9),
+    }
+
+def plot_grouped_c(path, runs):
+    """
+    Bar chart for YCSB-C-esque scalability over multiple runs.
+ """ + if not os.path.exists(path): + return + + fig, ax = plt.subplots() + off = 1 / (len(runs) + 1) + for idx, run in enumerate(runs): + ax.bar( + [l - off * ((len(runs)-1)/2) + idx * off for l in run["threads"]], + run["results"], + off, + label=run["title"] + ) + + group = runs[0]["group"].split('/')[-1:][0] + ax.set_title(f'YCSB Scaling | {group}') + extra = fig.legend(loc="upper left", bbox_to_anchor=(0.9, 0.89)) + fig.savefig(f"{path}/ycsb_c_comparison.svg", bbox_extra_artists=(extra,), bbox_inches="tight") diff --git a/betree/haura-benchmarks/run.sh b/betree/haura-benchmarks/run.sh index f6edbe72..cbbf8989 100755 --- a/betree/haura-benchmarks/run.sh +++ b/betree/haura-benchmarks/run.sh @@ -208,6 +208,11 @@ function ci() { run "$RUN_IDENT" switchover_small switchover 4 "$((128 * 1024 * 1024))" } +function ycsb() { + run "$RUN_IDENT" ycsb_c_block ycsb-c "$((8 * 1024 * 1024 * 1024))" 0 8 + run "$RUN_IDENT" ycsb_c_memory ycsb-c "$((8 * 1024 * 1024 * 1024))" 1 8 +} + cargo build --release if [ -z "$BETREE_CONFIG" ] @@ -252,3 +257,4 @@ ensure_config #checkpoints #switchover #ingest +#ycsb diff --git a/betree/haura-benchmarks/src/lib.rs b/betree/haura-benchmarks/src/lib.rs index 4b8e9f24..ae8949f4 100644 --- a/betree/haura-benchmarks/src/lib.rs +++ b/betree/haura-benchmarks/src/lib.rs @@ -44,7 +44,7 @@ impl Control { cfg.access_mode = AccessMode::AlwaysCreateNew; - cfg.sync_interval_ms = Some(1000); + cfg.sync_interval_ms = None; cfg.metrics = Some(metrics::MetricsConfiguration { enabled: true, @@ -77,6 +77,43 @@ impl Control { object_store: os, } } + + pub fn kv_client(&mut self, id: u32) -> KvClient { + KvClient::new( + self.database.clone(), + Xoshiro256Plus::seed_from_u64(id as u64), + ) + } +} + +pub struct KvClient { + pub db: Arc>, + pub rng: Xoshiro256Plus, + pub ds: Dataset, +} + +impl KvClient { + pub fn new(db: Arc>, rng: Xoshiro256Plus) -> Self { + let ds = db.write().open_or_create_dataset(b"FOOBAR").unwrap(); + Self { db, ds, rng } + } + + pub fn fill_entries(&mut self, entries: u64, entry_size: u32) -> Vec<[u8; 8]> { + let mut keys = vec![]; + let mut value = vec![0u8; entry_size as usize]; + for idx in 0..entries { + self.rng.fill(&mut value[..]); + let k = (idx as u64).to_be_bytes(); + self.ds.insert(&k[..], &value).unwrap(); + keys.push(k); + } + self.db.write().sync().unwrap(); + keys + } + + pub fn rng(&mut self) -> &mut Xoshiro256Plus { + &mut self.rng + } } pub struct Client { diff --git a/betree/haura-benchmarks/src/main.rs b/betree/haura-benchmarks/src/main.rs index 2a4629d6..c8b87b60 100644 --- a/betree/haura-benchmarks/src/main.rs +++ b/betree/haura-benchmarks/src/main.rs @@ -11,6 +11,7 @@ mod rewrite; mod scientific_evaluation; mod switchover; mod tiered1; +mod ycsb; mod zip; #[global_allocator] @@ -60,6 +61,13 @@ enum Mode { object_size: u64, rewrite_count: u64, }, + YcsbC { + size: u64, + kind: u8, + threads: u32, + #[structopt(default_value = "120")] + runtime: u64, + }, } fn run_all(mode: Mode) -> Result<(), Box> { @@ -162,6 +170,15 @@ fn run_all(mode: Mode) -> Result<(), Box> { let mut client = control.client(0, b"rewrite"); rewrite::run(&mut client, object_size, rewrite_count)?; } + Mode::YcsbC { + size, + kind, + threads, + runtime, + } => { + let client = control.kv_client(0); + ycsb::c(client, size, threads as usize, runtime) + } } thread::sleep(Duration::from_millis(2000)); diff --git a/betree/haura-benchmarks/src/ycsb.rs b/betree/haura-benchmarks/src/ycsb.rs new file mode 100644 index 00000000..59888798 --- /dev/null +++ 
@@ -0,0 +1,79 @@
+//! Benchmarks based on the YCSB-{A,B,C,D,E} workloads.
+//!
+//! Link: https://web.archive.org/web/20170809211159id_/http://www.cs.toronto.edu/~delara/courses/csc2231/papers/cooper.pdf
+
+use betree_perf::KvClient;
+use rand::distributions::Distribution;
+use rand::prelude::SliceRandom;
+use rand::SeedableRng;
+use std::io::Write;
+
+// Default in YCSB, 10 x 100 bytes fields in one struct.
+const ENTRY_SIZE: usize = 1000;
+// Default of YCSB
+const ZIPF_EXP: f64 = 0.99;
+
+/// C - Read heavy
+/// Operations: Read 100%
+/// Distribution: Zipfian
+/// Application example: User profile cache, where profiles are constructed elsewhere (e.g., Hadoop)
+pub fn c(mut client: KvClient, size: u64, threads: usize, runtime: u64) {
+    println!("Running YCSB Workload C");
+    println!("Filling KV store...");
+    let mut keys = client.fill_entries(size / ENTRY_SIZE as u64, ENTRY_SIZE as u32);
+    keys.shuffle(client.rng());
+    println!("Creating distribution...");
+    let f = std::fs::OpenOptions::new()
+        .write(true)
+        .create(true)
+        .open("ycsb_c.csv")
+        .unwrap();
+    let mut w = std::io::BufWriter::new(f);
+    w.write_all(b"threads,ops,time_ns\n").unwrap();
+
+    for workers in 1..=threads {
+        println!("Running benchmark with {workers} threads...");
+        let threads = (0..workers)
+            .map(|_| std::sync::mpsc::channel::<std::time::Instant>())
+            .enumerate()
+            .map(|(id, (tx, rx))| {
+                let keys = keys.clone();
+                let ds = client.ds.clone();
+                (
+                    std::thread::spawn(move || {
+                        let mut rng = rand_xoshiro::Xoshiro256Plus::seed_from_u64(id as u64);
+                        let dist = zipf::ZipfDistribution::new(keys.len(), ZIPF_EXP).unwrap();
+                        let mut total = 0;
+                        while let Ok(start) = rx.recv() {
+                            while start.elapsed().as_secs() < runtime {
+                                for _ in 0..100 {
+                                    ds.get(&keys[dist.sample(&mut rng) - 1][..])
+                                        .unwrap()
+                                        .unwrap();
+                                    total += 1;
+                                }
+                            }
+                        }
+                        total
+                    }),
+                    tx,
+                )
+            })
+            .collect::<Vec<_>>();
+        let start = std::time::Instant::now();
+        for (_t, tx) in threads.iter() {
+            tx.send(start).unwrap();
+        }
+        let mut total = 0;
+        for (t, tx) in threads.into_iter() {
+            drop(tx);
+            total += t.join().unwrap();
+        }
+        let end = start.elapsed();
+        w.write_fmt(format_args!("{workers},{total},{}\n", end.as_nanos()))
+            .unwrap();
+        w.flush().unwrap();
+        println!("Achieved: {} ops/sec", total as f32 / end.as_secs_f32());
+        println!("          {} ns avg", end.as_nanos() / total);
+    }
+}
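
Note on the access pattern (not part of the patch): `ycsb::c` fills the store with sequentially numbered 8-byte keys, shuffles them once, and then lets every worker draw ranks from a Zipf(0.99) distribution over the key count; since `zipf::ZipfDistribution` samples ranks in `1..=n`, the code subtracts 1 before indexing. The following standalone sketch reproduces only that sampling logic with the crates this change pulls in (`zipf = "7.0.1"`, `rand`, `rand_xoshiro`); the key count, seed, and sample count are made up for illustration.

```rust
use rand::distributions::Distribution;
use rand::prelude::SliceRandom;
use rand::SeedableRng;

fn main() {
    let n = 1_000;
    let mut rng = rand_xoshiro::Xoshiro256Plus::seed_from_u64(42);

    // Keys are written as big-endian u64 counters and shuffled once, so the
    // "hot" Zipf ranks are spread over the whole keyspace instead of hitting
    // neighbouring keys in the tree.
    let mut keys: Vec<[u8; 8]> = (0u64..n as u64).map(|i| i.to_be_bytes()).collect();
    keys.shuffle(&mut rng);

    // ZipfDistribution::new(n, exponent) samples ranks in 1..=n, hence the -1
    // before indexing (mirrors `ds.get(&keys[dist.sample(&mut rng) - 1][..])`).
    let dist = zipf::ZipfDistribution::new(keys.len(), 0.99).unwrap();
    let mut hits = vec![0u64; n];
    for _ in 0..100_000 {
        let rank = dist.sample(&mut rng);
        let key: &[u8] = &keys[rank - 1][..];
        hits[rank - 1] += 1;
        let _ = key; // a real worker would issue a dataset read for `key` here
    }
    println!("rank 1 was sampled {} times out of 100000", hits[0]);
}
```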