Skip to content

Commit

Permalink
test
Browse files Browse the repository at this point in the history
Signed-off-by: tison <[email protected]>
  • Loading branch information
tisonkun committed Dec 23, 2024
1 parent 1f86fbf commit f4ff655
Show file tree
Hide file tree
Showing 7 changed files with 89 additions and 44 deletions.
33 changes: 33 additions & 0 deletions bin/oli/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions bin/oli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ futures = { version = "0.3" }
humansize = { version = "2.1" }
humantime = { version = "2.1" }
humantime-serde = { version = "1.1" }
jiff = { version = "0.1.15" }
indicatif = { version = "0.17" }
opendal = { version = "0.51.0", path = "../../core", features = [
"services-azblob",
Expand All @@ -54,6 +55,7 @@ opendal = { version = "0.51.0", path = "../../core", features = [
"services-webhdfs",
"services-azfile",
] }
pollster = { version = "0.4" }
serde = { version = "1.0", features = ["derive"] }
tokio = { version = "1.42", features = ["full"] }
toml = { version = "0.8" }
Expand Down
9 changes: 9 additions & 0 deletions bin/oli/dev/config.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
[profiles.demo]
type = "s3"
root = "/benchmarks"
bucket = "test-bucket"
region = "us-east-1"
endpoint = "http://127.0.0.1:9000"
access_key_id = "minioadmin"
secret_access_key = "minioadmin"
virtual_host_style = "false"
4 changes: 4 additions & 0 deletions bin/oli/dev/suite.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
workload = "upload"
parallelism = 4
file_size = 67108864 # 64MiB
timeout = "60s"
9 changes: 5 additions & 4 deletions bin/oli/src/commands/bench/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,13 @@ pub struct BenchCmd {
impl BenchCmd {
pub async fn run(self) -> Result<()> {
let cfg = Config::load(&self.config_params.config)?;
let suite = suite::BenchSuite::load(&self.bench)?;

let op = cfg.operator(&self.profile)?;
let report = suite.run(op).await?;
println!("{report}");
let suite = suite::BenchSuite::load(&self.bench)?;

tokio::task::spawn_blocking(move || {
suite.run(op).expect("failed to run bench suite");
})
.await?;
Ok(())
}
}
15 changes: 7 additions & 8 deletions bin/oli/src/commands/bench/report.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,9 @@ impl Display for Report {
writeln!(f, "Bandwidth:")?;
writeln!(
f,
"{}/s",
"{}",
self.bandwidth.format(2, |x| {
format!("{}", humansize::format_size_i(x, humansize::BINARY))
format!("{}/s", humansize::format_size_i(x, humansize::BINARY))
})
)?;

Expand All @@ -78,7 +78,7 @@ impl Display for Report {

#[derive(Debug)]
pub(crate) struct Metric {
count: u32,
num_samples: u32,
min: f64,
max: f64,
avg: f64,
Expand All @@ -91,7 +91,7 @@ pub(crate) struct Metric {
impl Metric {
fn format(&self, indent: usize, formatter: fn(f64) -> String) -> String {
format!(
"{:indent$}count: {}\n\
"{:indent$}num_samples: {}\n\
{:indent$}min: {}\n\
{:indent$}max: {}\n\
{:indent$}avg: {}\n\
Expand All @@ -100,7 +100,7 @@ impl Metric {
{:indent$}p95: {}\n\
{:indent$}p50: {}",
"",
self.count,
self.num_samples,
"",
formatter(self.min),
"",
Expand All @@ -127,8 +127,7 @@ pub(crate) struct SampleSet {
impl SampleSet {
/// Add a new sample value.
pub fn add(&mut self, sample: f64) {
assert!(!sample.is_finite(), "sample value must not be finite");
assert!(!sample.is_nan(), "sample value must not be NaN");
assert!(sample.is_finite(), "sample value must be finite");
self.values.push(sample);
}

Expand Down Expand Up @@ -203,7 +202,7 @@ impl SampleSet {
/// Create a metric from the sample set.
fn to_metric(&self) -> Metric {
Metric {
count: self.count() as u32,
num_samples: self.count() as u32,
min: self.min().unwrap_or(f64::NAN),
max: self.max().unwrap_or(f64::NAN),
avg: self.avg().unwrap_or(f64::NAN),
Expand Down
61 changes: 29 additions & 32 deletions bin/oli/src/commands/bench/suite.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use serde::Deserialize;
use std::path::Path;
use std::time::{Duration, Instant};

#[derive(Deserialize, Debug, Clone)]
#[derive(Deserialize, Debug)]
struct BenchSuiteConfig {
/// Workload to run.
workload: Workload,
Expand All @@ -20,10 +20,10 @@ struct BenchSuiteConfig {

/// Maximum time to run the bench suite.
#[serde(with = "humantime_serde")]
timout: Duration,
timeout: Duration,
}

#[derive(Deserialize, Debug, Copy, Clone)]
#[derive(Deserialize, Debug)]
enum Workload {
#[serde(rename = "upload")]
Upload,
Expand All @@ -43,13 +43,15 @@ impl BenchSuite {
config.file_size >= 4096,
"file_size must be greater or equal to 4096"
);
println!("Create bench suite with config: {config:?}");
Ok(BenchSuite { config })
}

pub async fn run(self, op: Operator) -> Result<Report> {
pub fn run(self, op: Operator) -> Result<()> {
println!("Start running bench suite ...");
let start = Instant::now();

let timeout = self.config.timout;
let timeout = self.config.timeout;
let parallelism = self.config.parallelism.unwrap_or(1);
let file_size = self.config.file_size;
let workload = match self.config.workload {
Expand All @@ -62,13 +64,9 @@ impl BenchSuite {
.enable_all()
.build()?;

let task = {
let config = self.config.clone();
let op = op.clone();
rt.spawn(async move { Task::prepare(config, op).await })
.await
.context("failed to prepare bench task")??
};
let task = rt
.block_on(Task::prepare(&self.config, &op))
.context("failed to prepare task")?;

let mut results = vec![];
for _ in 0..parallelism {
Expand All @@ -86,7 +84,7 @@ impl BenchSuite {
}

let iter_start = Instant::now();
let iter_bytes = task.run(&op).await?;
let iter_bytes = task.run(&op).await.context("failed to execute task")?;
let iter_latency = iter_start.elapsed();
count += 1;
latency.add(iter_latency.as_micros() as f64);
Expand All @@ -101,19 +99,16 @@ impl BenchSuite {
let mut iops = SampleSet::default();

for result in results {
let (iter_bandwidth, iter_latency, iter_iops) = result.await??;
let (iter_bandwidth, iter_latency, iter_iops) = pollster::block_on(result)??;
bandwidth.merge(iter_bandwidth);
latency.merge(iter_latency);
iops.merge(iter_iops);
}
Ok(Report::new(
parallelism,
file_size,
workload,
bandwidth,
latency,
iops,
))

let report = Report::new(parallelism, file_size, workload, bandwidth, latency, iops);
println!("Bench suite completed in {:?}; result:\n", start.elapsed());
println!("{report}");
Ok(())
}
}

Expand All @@ -126,18 +121,20 @@ enum Task {
const BATCH_SIZE: u32 = 4096;

impl Task {
async fn prepare(config: BenchSuiteConfig, op: Operator) -> Result<Task> {
let BenchSuiteConfig {
workload,
file_size,
..
} = config;
let path = format!("obench-test-{}", uuid::Uuid::new_v4());
match workload {
async fn prepare(config: &BenchSuiteConfig, op: &Operator) -> Result<Task> {
let now = jiff::Timestamp::now();
let path = format!(
"obench-test-{}-{}",
now.as_millisecond(),
uuid::Uuid::new_v4()
);
println!("Prepare task with path: {path}");
let file_size = config.file_size;
match config.workload {
Workload::Upload => Ok(Task::Upload { path, file_size }),
Workload::Download => {
let mut writer = op.writer(&path).await?;
let batch_cnt = config.file_size / (BATCH_SIZE);
let batch_cnt = file_size / (BATCH_SIZE);
for _ in 0..batch_cnt {
writer.write(vec![139u8; BATCH_SIZE as usize]).await?
}
Expand All @@ -152,7 +149,7 @@ impl Task {
Task::Upload { path, file_size } => {
let mut writer = op.writer(path).await?;
for _ in 0..(*file_size / BATCH_SIZE) {
writer.write(vec![254u8; *file_size as usize]).await?;
writer.write(vec![254u8; BATCH_SIZE as usize]).await?;
}
writer.close().await?;
Ok((*file_size / BATCH_SIZE) * BATCH_SIZE)
Expand Down

0 comments on commit f4ff655

Please sign in to comment.