benchmark: Add simple ycsb-c-esque benchmark limit test (#56)
* benchmark: rudimentary ycsb c workload

* benchmark: fix ycsb-c

* benchmark: add ycsb like to run script

* benchmarks: add workers to ycsb

* benchmark: make ycsb threads and time configurable

* benchmark: shuffle keys in ycsb to disperse front locality

* benchmark: add ycsb-c-esque plot

* benchmark: add comment to workaround

* benchmarks: add grouped plot for ycsb-c

* benchmark: remove orphan doc comment
jwuensche authored Apr 12, 2024
1 parent 76f6319 commit 2ce688e
Showing 7 changed files with 210 additions and 1 deletion.
1 change: 1 addition & 0 deletions betree/haura-benchmarks/Cargo.toml
@@ -24,3 +24,4 @@ log = "0.4"
# Dependent on versions from haura
parking_lot = "0.11"
zip = "0.5"
zipf = "7.0.1"
12 changes: 12 additions & 0 deletions betree/haura-benchmarks/haura-plots/haura_plots/__init__.py
@@ -14,6 +14,7 @@
from . import util
from . import metrics_plots
from . import cache_plots
from . import ycsb_plots

def sort_by_o_id(key):
"""
@@ -263,7 +264,12 @@ def main():
    # Prep the color scheme
    util.init_colormap()

    ycsb_c = []

    # Individual Plots
    for path in sys.argv[1:]:
        if os.path.isfile(path):
            continue
        with open(f"{path}/betree-metrics.jsonl", 'r', encoding="UTF-8") as metrics:
            data = util.read_jsonl(metrics)
            # Plot actions
@@ -274,7 +280,13 @@ def main():
            plot_object_distribution(path)
            metrics_plots.plot_system(path)
            cache_plots.plot_cache(data, path)
            ycsb_c.append(ycsb_plots.plot_c(path))
    #plot_filesystem_test()

    # Grouped Plots: one comparison chart per group, where a run's group is
    # its parent directory. Runs without YCSB data (plot_c returned None)
    # must be dropped before grouping as well.
    runs = [run for run in ycsb_c if run is not None]
    for group in {run["group"] for run in runs}:
        ycsb_plots.plot_grouped_c(group, [run for run in runs if run["group"] == group])


if __name__ == "__main__":
    main()
57 changes: 57 additions & 0 deletions betree/haura-benchmarks/haura-plots/haura_plots/ycsb_plots.py
@@ -0,0 +1,57 @@
import os
import pandas as pd
import matplotlib.pyplot as plt

def plot_c(path):
    """
    Bar chart for YCSB-C-esque scalability of a singular run.
    """
    if not os.path.exists(f"{path}/ycsb_c.csv"):
        return

    data = pd.read_csv(f"{path}/ycsb_c.csv")
    fig, ax = plt.subplots()
    # op/s of the one- and two-thread rounds
    first = data["ops"][0] / (data["time_ns"][0] / 10**9)
    second = data["ops"][1] / (data["time_ns"][1] / 10**9)
    # In some local tests the proper scaling behavior only appeared with 2 or
    # more threads. This is uncommon, but checking for it here keeps the
    # optimal-scaling curve from becoming entirely useless.
    if first < second / 2:
        first = second / 2
    optimal_scaling = [x * first for x in data["threads"]]
    ax.plot(data["threads"], optimal_scaling, linestyle=":", label="Optimal", color='grey')
    ax.bar(data["threads"], data["ops"] / (data["time_ns"] / 10**9))
    ax.set_ylabel("Throughput [op/s]")
    ax.set_title(f"YCSB-C Scaling | {' | '.join(path.split('/')[-2:])}")
    ax.set_xlabel("Threads [#]")
    fig.legend()
    fig.savefig(f"{path}/ycsb_c.svg")
    return {
        "title": path.split('/')[-1],
        "group": '/'.join(path.split('/')[:-1]),
        "threads": data["threads"],
        "results": data["ops"] / (data["time_ns"] / 10**9),
    }

def plot_grouped_c(path, runs):
    """
    Bar chart for YCSB-C-esque scalability over multiple runs.
    """
    if not os.path.exists(path):
        return

    fig, ax = plt.subplots()
    # Width of one bar; the bars of all runs for a given thread count are
    # centered around that thread count on the x-axis.
    off = 1 / (len(runs) + 1)
    for idx, run in enumerate(runs):
        ax.bar(
            [l - off * ((len(runs) - 1) / 2) + idx * off for l in run["threads"]],
            run["results"],
            off,
            label=run["title"],
        )

    group = runs[0]["group"].split('/')[-1]
    ax.set_title(f'YCSB Scaling | {group}')
    extra = fig.legend(loc="upper left", bbox_to_anchor=(0.9, 0.89))
    fig.savefig(f"{path}/ycsb_c_comparison.svg", bbox_extra_artists=(extra,), bbox_inches="tight")
6 changes: 6 additions & 0 deletions betree/haura-benchmarks/run.sh
@@ -208,6 +208,11 @@ function ci() {
    run "$RUN_IDENT" switchover_small switchover 4 "$((128 * 1024 * 1024))"
}

function ycsb() {
    run "$RUN_IDENT" ycsb_c_block ycsb-c "$((8 * 1024 * 1024 * 1024))" 0 8
    run "$RUN_IDENT" ycsb_c_memory ycsb-c "$((8 * 1024 * 1024 * 1024))" 1 8
}

cargo build --release

if [ -z "$BETREE_CONFIG" ]
@@ -252,3 +257,4 @@ ensure_config
#checkpoints
#switchover
#ingest
#ycsb
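
The positional arguments after the mode name in the ycsb function mirror the fields of Mode::YcsbC in src/main.rs: store size in bytes, a kind selector, and the thread count, with the runtime defaulting to 120 seconds. Judging purely by the run names ycsb_c_block and ycsb_c_memory, kind 0 appears to select the block-backed configuration and 1 the memory-backed one; that mapping is an inference, not confirmed by this diff. A hypothetical call with an explicit runtime, assuming the run helper forwards everything after the run name to the benchmark binary:

    # 8 GiB store, block kind, up to 8 threads, 240 s per thread count
    run "$RUN_IDENT" ycsb_c_long ycsb-c "$((8 * 1024 * 1024 * 1024))" 0 8 240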
39 changes: 38 additions & 1 deletion betree/haura-benchmarks/src/lib.rs
@@ -44,7 +44,7 @@ impl Control {

        cfg.access_mode = AccessMode::AlwaysCreateNew;

-        cfg.sync_interval_ms = Some(1000);
+        cfg.sync_interval_ms = None;

        cfg.metrics = Some(metrics::MetricsConfiguration {
            enabled: true,
@@ -77,6 +77,43 @@ impl Control {
            object_store: os,
        }
    }

    pub fn kv_client(&mut self, id: u32) -> KvClient {
        KvClient::new(
            self.database.clone(),
            Xoshiro256Plus::seed_from_u64(id as u64),
        )
    }
}

pub struct KvClient {
    pub db: Arc<RwLock<Database>>,
    pub rng: Xoshiro256Plus,
    pub ds: Dataset,
}

impl KvClient {
    pub fn new(db: Arc<RwLock<Database>>, rng: Xoshiro256Plus) -> Self {
        let ds = db.write().open_or_create_dataset(b"FOOBAR").unwrap();
        Self { db, ds, rng }
    }

    pub fn fill_entries(&mut self, entries: u64, entry_size: u32) -> Vec<[u8; 8]> {
        let mut keys = vec![];
        let mut value = vec![0u8; entry_size as usize];
        for idx in 0..entries {
            self.rng.fill(&mut value[..]);
            let k = idx.to_be_bytes();
            self.ds.insert(&k[..], &value).unwrap();
            keys.push(k);
        }
        self.db.write().sync().unwrap();
        keys
    }

    pub fn rng(&mut self) -> &mut Xoshiro256Plus {
        &mut self.rng
    }
}

pub struct Client {
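
A minimal usage sketch for the new KvClient, assuming a configured Control as constructed elsewhere in this file:

    // Sketch: fill the store, then read one entry back.
    let mut kv = control.kv_client(0); // the id deterministically seeds the client's RNG
    let keys = kv.fill_entries(1_000_000, 1000); // 1M entries of 1000 B each, synced afterwards
    assert!(kv.ds.get(&keys[0][..]).unwrap().is_some());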
17 changes: 17 additions & 0 deletions betree/haura-benchmarks/src/main.rs
@@ -11,6 +11,7 @@ mod rewrite;
mod scientific_evaluation;
mod switchover;
mod tiered1;
mod ycsb;
mod zip;

#[global_allocator]
@@ -60,6 +61,13 @@ enum Mode {
        object_size: u64,
        rewrite_count: u64,
    },
    YcsbC {
        size: u64,
        kind: u8,
        threads: u32,
        #[structopt(default_value = "120")]
        runtime: u64,
    },
}

fn run_all(mode: Mode) -> Result<(), Box<dyn Error>> {
@@ -162,6 +170,15 @@ fn run_all(mode: Mode) -> Result<(), Box<dyn Error>> {
let mut client = control.client(0, b"rewrite");
rewrite::run(&mut client, object_size, rewrite_count)?;
}
Mode::YcsbC {
size,
kind,
threads,
runtime,
} => {
let client = control.kv_client(0);
ycsb::c(client, size, threads as usize, runtime)
}
}

thread::sleep(Duration::from_millis(2000));
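
structopt derives a kebab-cased subcommand from the variant name, so the new mode is reachable as ycsb-c, with size, kind, and threads as positional arguments and an optional runtime (default 120 seconds), matching the calls in run.sh. A hypothetical invocation through cargo:

    cargo run --release -- ycsb-c "$((8 * 1024 * 1024 * 1024))" 0 8 240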
79 changes: 79 additions & 0 deletions betree/haura-benchmarks/src/ycsb.rs
@@ -0,0 +1,79 @@
//! Benchmarks based on the YCSB-{A,B,C,D,E} workloads.
//!
//! Link: <https://web.archive.org/web/20170809211159id_/http://www.cs.toronto.edu/~delara/courses/csc2231/papers/cooper.pdf>
use betree_perf::KvClient;
use rand::distributions::Distribution;
use rand::prelude::SliceRandom;
use rand::SeedableRng;
use std::io::Write;

// Default in YCSB: 10 fields of 100 bytes each per record.
const ENTRY_SIZE: usize = 1000;
// Default of YCSB
const ZIPF_EXP: f64 = 0.99;

/// C - Read heavy
/// Operations: Read 100%
/// Distribution: Zipfian
/// Application example: user profile cache, where profiles are constructed elsewhere (e.g., Hadoop)
pub fn c(mut client: KvClient, size: u64, threads: usize, runtime: u64) {
    println!("Running YCSB Workload C");
    println!("Filling KV store...");
    let mut keys = client.fill_entries(size / ENTRY_SIZE as u64, ENTRY_SIZE as u32);
    keys.shuffle(client.rng());
    println!("Creating distribution...");
    let f = std::fs::OpenOptions::new()
        .write(true)
        .create(true)
        .truncate(true)
        .open("ycsb_c.csv")
        .unwrap();
    let mut w = std::io::BufWriter::new(f);
    w.write_all(b"threads,ops,time_ns\n").unwrap();

    for workers in 1..=threads {
        println!("Running benchmark with {workers} threads...");
        let threads = (0..workers)
            .map(|_| std::sync::mpsc::channel::<std::time::Instant>())
            .enumerate()
            .map(|(id, (tx, rx))| {
                let keys = keys.clone();
                let ds = client.ds.clone();
                (
                    std::thread::spawn(move || {
                        let mut rng = rand_xoshiro::Xoshiro256Plus::seed_from_u64(id as u64);
                        let dist = zipf::ZipfDistribution::new(keys.len(), ZIPF_EXP).unwrap();
                        let mut total = 0;
                        while let Ok(start) = rx.recv() {
                            while start.elapsed().as_secs() < runtime {
                                for _ in 0..100 {
                                    // Samples are 1-based ranks; shift to a 0-based index.
                                    ds.get(&keys[dist.sample(&mut rng) - 1][..])
                                        .unwrap()
                                        .unwrap();
                                    total += 1;
                                }
                            }
                        }
                        total
                    }),
                    tx,
                )
            })
            .collect::<Vec<_>>();
        let start = std::time::Instant::now();
        for (_t, tx) in threads.iter() {
            tx.send(start).unwrap();
        }
        let mut total = 0;
        for (t, tx) in threads.into_iter() {
            drop(tx);
            total += t.join().unwrap();
        }
        let end = start.elapsed();
        w.write_fmt(format_args!("{workers},{total},{}\n", end.as_nanos()))
            .unwrap();
        w.flush().unwrap();
        println!("Achieved: {} ops/sec", total as f32 / end.as_secs_f32());
        println!("          {} ns avg", end.as_nanos() / total);
    }
}
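
To make the reported numbers concrete, with hypothetical totals: if the 8-thread round completes total = 2,400,000 reads in end = 120 s (120,000,000,000 ns), the benchmark appends the line `8,2400000,120000000000` to ycsb_c.csv and prints 2,400,000 / 120 = 20,000 ops/sec and 1.2e11 / 2.4e6 = 50,000 ns avg. Since end is wall-clock time, the "ns avg" figure is wall time per completed operation aggregated across all threads, not per-thread latency.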
