Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add simple ycsb-c-esque benchmark limit test #56

Merged
merged 10 commits into from
Apr 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions betree/haura-benchmarks/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,3 +24,4 @@ log = "0.4"
# Dependent on versions from haura
parking_lot = "0.11"
zip = "0.5"
zipf = "7.0.1"
12 changes: 12 additions & 0 deletions betree/haura-benchmarks/haura-plots/haura_plots/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
from . import util
from . import metrics_plots
from . import cache_plots
from . import ycsb_plots

def sort_by_o_id(key):
"""
Expand Down Expand Up @@ -263,7 +264,12 @@ def main():
# Prep the color scheme
util.init_colormap()

ycsb_c = []

# Individual Plots
for path in sys.argv[1:]:
if os.path.isfile(path):
continue
with open(f"{path}/betree-metrics.jsonl", 'r', encoding="UTF-8") as metrics:
data = util.read_jsonl(metrics)
# Plot actions
Expand All @@ -274,7 +280,13 @@ def main():
plot_object_distribution(path)
metrics_plots.plot_system(path)
cache_plots.plot_cache(data, path)
ycsb_c.append(ycsb_plots.plot_c(path))
#plot_filesystem_test()

# Grouped Plots
for group in list({run["group"] for run in filter(lambda x: x is not None, ycsb_c)}):
ycsb_plots.plot_grouped_c(group, list(filter(lambda x: x["group"] == group, ycsb_c)))


if __name__ == "__main__":
main()
57 changes: 57 additions & 0 deletions betree/haura-benchmarks/haura-plots/haura_plots/ycsb_plots.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
import os
import pandas as pd
import matplotlib.pyplot as plt

def plot_c(path):
    """
    Bar chart for YCSB-C-esque scalability of a singular run.

    :param path: Directory of a single run; must contain "ycsb_c.csv" with
                 columns "threads", "ops" and "time_ns".
    :return: Dict with "title", "group", "threads" and "results" (throughput
             in op/s) for later grouped plotting, or None when no
             ycsb_c.csv is present in the given path.
    """
    if not os.path.exists(f"{path}/ycsb_c.csv"):
        return None

    data = pd.read_csv(f"{path}/ycsb_c.csv")
    fig, ax = plt.subplots()
    # Throughput (op/s) of the run with the fewest threads anchors the
    # "optimal" linear-scaling reference line.
    first = data["ops"][0] / (data["time_ns"][0] / 10**9)
    # In some cases in local tests the proper scaling behavior only happened
    # with 2 or more threads; this is uncommon but can be easily checked like
    # this to not make the optimal scaling curve entirely useless.  Guarded so
    # a single-row CSV does not crash on the missing second row.
    if len(data) > 1:
        second = data["ops"][1] / (data["time_ns"][1] / 10**9)
        if first < second / 2:
            first = second / 2
    optimal_scaling = [x * first for x in data["threads"]]
    ax.plot(data["threads"], optimal_scaling, linestyle=":", label="Optimal", color='grey')
    ax.bar(data["threads"], data["ops"] / (data["time_ns"] / 10**9))
    ax.set_ylabel("Throughput [op/s]")
    ax.set_title(f"YCSB-C Scaling | {' | '.join(path.split('/')[-2:])}")
    ax.set_xlabel("Threads [#]")
    fig.legend()
    fig.savefig(f"{path}/ycsb_c.svg")
    return {
        "title": path.split('/')[-1:][0],
        "group": '/'.join(path.split('/')[:-1]),
        "threads": data["threads"],
        "results": data["ops"] / (data["time_ns"] / 10**9),
    }

def plot_grouped_c(path, runs):
    """
    Bar chart comparing YCSB-C-esque scalability over multiple runs.

    :param path: Directory of the run group; the figure is written to
                 "<path>/ycsb_c_comparison.svg".
    :param runs: Non-empty list of dicts as returned by plot_c.
    """
    # Guard against an empty group (runs[0] below) and a missing target dir.
    if not runs or not os.path.exists(path):
        return

    fig, ax = plt.subplots()
    # Width of one bar, chosen so all runs of a thread count fit side by side.
    off = 1 / (len(runs) + 1)
    for idx, run in enumerate(runs):
        ax.bar(
            # Shift each run's bars so the cluster is centered on the
            # thread count it belongs to.
            [l - off * ((len(runs)-1)/2) + idx * off for l in run["threads"]],
            run["results"],
            off,
            label=run["title"]
        )

    group = runs[0]["group"].split('/')[-1:][0]
    ax.set_title(f'YCSB Scaling | {group}')
    # Legend sits outside the axes; hand it to savefig so it is not clipped.
    extra = fig.legend(loc="upper left", bbox_to_anchor=(0.9, 0.89))
    fig.savefig(f"{path}/ycsb_c_comparison.svg", bbox_extra_artists=(extra,), bbox_inches="tight")
6 changes: 6 additions & 0 deletions betree/haura-benchmarks/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,11 @@ function ci() {
run "$RUN_IDENT" switchover_small switchover 4 "$((128 * 1024 * 1024))"
}

function ycsb() {
    # 8 GiB store size, shared by both variants.
    local ycsb_size
    ycsb_size="$((8 * 1024 * 1024 * 1024))"
    # Trailing args map to the YcsbC mode: size, kind, threads.
    # kind 0/1 presumably selects block- vs memory-backed storage
    # (names ycsb_c_block / ycsb_c_memory) — confirm in src/main.rs.
    run "$RUN_IDENT" ycsb_c_block ycsb-c "$ycsb_size" 0 8
    run "$RUN_IDENT" ycsb_c_memory ycsb-c "$ycsb_size" 1 8
}

cargo build --release

if [ -z "$BETREE_CONFIG" ]
Expand Down Expand Up @@ -252,3 +257,4 @@ ensure_config
#checkpoints
#switchover
#ingest
#ycsb
39 changes: 38 additions & 1 deletion betree/haura-benchmarks/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ impl Control {

cfg.access_mode = AccessMode::AlwaysCreateNew;

cfg.sync_interval_ms = Some(1000);
cfg.sync_interval_ms = None;

cfg.metrics = Some(metrics::MetricsConfiguration {
enabled: true,
Expand Down Expand Up @@ -77,6 +77,43 @@ impl Control {
object_store: os,
}
}

/// Build a [`KvClient`] sharing this control's database handle.
///
/// `id` seeds the client RNG deterministically, so equal ids yield
/// reproducible key/value streams across runs.
pub fn kv_client(&mut self, id: u32) -> KvClient {
    KvClient::new(
        self.database.clone(),
        Xoshiro256Plus::seed_from_u64(id as u64),
    )
}
}

/// Key-value benchmark client: a shared database handle plus a
/// per-client deterministic RNG and the dataset operated on.
pub struct KvClient {
    // Shared database handle; also used to sync after bulk fills.
    pub db: Arc<RwLock<Database>>,
    // Deterministic RNG (seeded per client id) for reproducible workloads.
    pub rng: Xoshiro256Plus,
    // Dataset opened in KvClient::new (key b"FOOBAR").
    pub ds: Dataset,
}

impl KvClient {
    /// Open (or create) the benchmark dataset `b"FOOBAR"` and wrap it
    /// together with the given database handle and RNG.
    pub fn new(db: Arc<RwLock<Database>>, rng: Xoshiro256Plus) -> Self {
        let ds = db.write().open_or_create_dataset(b"FOOBAR").unwrap();
        Self { db, ds, rng }
    }

    /// Insert `entries` random values of `entry_size` bytes under
    /// sequential big-endian `u64` keys, then sync the database.
    ///
    /// Returns the keys in insertion order.
    pub fn fill_entries(&mut self, entries: u64, entry_size: u32) -> Vec<[u8; 8]> {
        // Preallocate so the push loop never reallocates.
        let mut keys = Vec::with_capacity(entries as usize);
        // One reusable buffer, re-randomized per entry, avoids an
        // allocation per insert.
        let mut value = vec![0u8; entry_size as usize];
        for idx in 0..entries {
            self.rng.fill(&mut value[..]);
            let k = idx.to_be_bytes();
            self.ds.insert(&k[..], &value).unwrap();
            keys.push(k);
        }
        self.db.write().sync().unwrap();
        keys
    }

    /// Mutable access to the client's RNG.
    pub fn rng(&mut self) -> &mut Xoshiro256Plus {
        &mut self.rng
    }
}

pub struct Client {
Expand Down
17 changes: 17 additions & 0 deletions betree/haura-benchmarks/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ mod rewrite;
mod scientific_evaluation;
mod switchover;
mod tiered1;
mod ycsb;
mod zip;

#[global_allocator]
Expand Down Expand Up @@ -60,6 +61,13 @@ enum Mode {
object_size: u64,
rewrite_count: u64,
},
YcsbC {
size: u64,
kind: u8,
threads: u32,
#[structopt(default_value = "120")]
runtime: u64,
},
}

fn run_all(mode: Mode) -> Result<(), Box<dyn Error>> {
Expand Down Expand Up @@ -162,6 +170,15 @@ fn run_all(mode: Mode) -> Result<(), Box<dyn Error>> {
let mut client = control.client(0, b"rewrite");
rewrite::run(&mut client, object_size, rewrite_count)?;
}
Mode::YcsbC {
size,
kind,
threads,
runtime,
} => {
let client = control.kv_client(0);
ycsb::c(client, size, threads as usize, runtime)
}
}

thread::sleep(Duration::from_millis(2000));
Expand Down
79 changes: 79 additions & 0 deletions betree/haura-benchmarks/src/ycsb.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
//! Benchmarks based on the YCSB-{A,B,C,D,E} workloads.
//!
//! Link: https://web.archive.org/web/20170809211159id_/http://www.cs.toronto.edu/~delara/courses/csc2231/papers/cooper.pdf

use betree_perf::KvClient;
use rand::distributions::Distribution;
use rand::prelude::SliceRandom;
use rand::SeedableRng;
use std::io::Write;

// Default in YCSB, 10 x 100 bytes field in one struct.
const ENTRY_SIZE: usize = 1000;
// Default of YCSB
const ZIPF_EXP: f64 = 0.99;

/// C - Read heavy
/// Operations: Read 100%
/// Distribution: Zipfian
/// Application example: User profile cache, where profiles are constructed elsewhere (e.g., Hadoop)
///
/// For each worker count in `1..=threads`, runs read-only Zipfian lookups for
/// `runtime` seconds and appends `threads,ops,time_ns` rows to `ycsb_c.csv`.
pub fn c(mut client: KvClient, size: u64, threads: usize, runtime: u64) {
    println!("Running YCSB Workload C");
    println!("Filling KV store...");
    let mut keys = client.fill_entries(size / ENTRY_SIZE as u64, ENTRY_SIZE as u32);
    keys.shuffle(client.rng());
    println!("Creating distribution...");
    let f = std::fs::OpenOptions::new()
        .write(true)
        .create(true)
        // Truncate so a rerun does not leave stale trailing bytes from a
        // longer previous results file.
        .truncate(true)
        .open("ycsb_c.csv")
        .unwrap();
    let mut w = std::io::BufWriter::new(f);
    w.write_all(b"threads,ops,time_ns\n").unwrap();

    for workers in 1..=threads {
        println!("Running benchmark with {workers} threads...");
        let threads = (0..workers)
            .map(|_| std::sync::mpsc::channel::<std::time::Instant>())
            .enumerate()
            .map(|(id, (tx, rx))| {
                let keys = keys.clone();
                let ds = client.ds.clone();
                (
                    std::thread::spawn(move || {
                        // Per-worker deterministic RNG stream.
                        let mut rng = rand_xoshiro::Xoshiro256Plus::seed_from_u64(id as u64);
                        let dist = zipf::ZipfDistribution::new(keys.len(), ZIPF_EXP).unwrap();
                        let mut total: u128 = 0;
                        while let Ok(start) = rx.recv() {
                            while start.elapsed().as_secs() < runtime {
                                // Batch 100 ops per clock check to keep
                                // Instant::elapsed off the hot path.
                                for _ in 0..100 {
                                    // ZipfDistribution samples 1..=len; keys are 0-indexed.
                                    ds.get(&keys[dist.sample(&mut rng) - 1][..])
                                        .unwrap()
                                        .unwrap();
                                    total += 1;
                                }
                            }
                        }
                        total
                    }),
                    tx,
                )
            })
            .collect::<Vec<_>>();
        // Broadcast one common start instant so all workers share the window.
        let start = std::time::Instant::now();
        for (_t, tx) in threads.iter() {
            tx.send(start).unwrap();
        }
        let mut total: u128 = 0;
        for (t, tx) in threads.into_iter() {
            // Dropping the sender ends the worker's recv loop after its window.
            drop(tx);
            total += t.join().unwrap();
        }
        let end = start.elapsed();
        w.write_fmt(format_args!("{workers},{total},{}\n", end.as_nanos()))
            .unwrap();
        w.flush().unwrap();
        println!("Achieved: {} ops/sec", total as f32 / end.as_secs_f32());
        // max(1) guards the zero-op case (e.g. runtime == 0) against division by zero.
        println!(" {} ns avg", end.as_nanos() / total.max(1));
    }
}
Loading