From a41943ca93711995a2c7ce4ee4505dd71bebc113 Mon Sep 17 00:00:00 2001 From: tison Date: Mon, 23 Dec 2024 15:35:00 +0800 Subject: [PATCH 1/9] chore: README Signed-off-by: tison --- bin/oli/Cargo.toml | 1 - bin/oli/README.md | 22 +++++++++++----------- 2 files changed, 11 insertions(+), 12 deletions(-) diff --git a/bin/oli/Cargo.toml b/bin/oli/Cargo.toml index 7420f42fafc2..51f40a678fc1 100644 --- a/bin/oli/Cargo.toml +++ b/bin/oli/Cargo.toml @@ -44,7 +44,6 @@ opendal = { version = "0.51.0", path = "../../core", features = [ "services-ghac", "services-http", "services-ipmfs", - "services-memory", "services-obs", "services-oss", "services-s3", diff --git a/bin/oli/README.md b/bin/oli/README.md index 0a44e5a0ab5c..ad0467f0ef8a 100644 --- a/bin/oli/README.md +++ b/bin/oli/README.md @@ -19,7 +19,7 @@ `oli` could be installed by `cargo`: ```bash -cargo install oli --all-features +cargo install oli ``` > `cargo` is the Rust package manager. `cargo` could be installed by following the [Installation](https://www.rust-lang.org/tools/install) from Rust official website. @@ -67,23 +67,23 @@ access_key_id = "" secret_access_key = "" ``` -For different services, you could find the configuration keys in the corresponding [service document](https://opendal.apache.org/docs/services/). +For different services, you could find the configuration keys in the corresponding [service document](https://docs.rs/opendal/0.50.2/opendal/services/index.html). ### Example: use `oli` to upload file to AWS S3 -```text -$ oli cp ./update-ecs-loadbalancer.json s3:/update-ecs-loadbalancer.json -$ oli ls s3:/ -fleet.png -update-ecs-loadbalancer.json +```shell +oli cp ./update-ecs-loadbalancer.json s3:/update-ecs-loadbalancer.json +oli ls s3:/ +# fleet.png +# update-ecs-loadbalancer.json ``` ### Example: use `oli` copy file from S3 to R2 -```text -$ oli cp s3:/fleet.png r2:/fleet.png -$ oli ls r2:/ -fleet.png +```shell +oli cp s3:/fleet.png r2:/fleet.png +oli ls r2:/ +# fleet.png ``` ## Contribute to `oli` From 1f86fbffd9de4e6ff4991bc2bad35ec714df734a Mon Sep 17 00:00:00 2001 From: tison Date: Mon, 23 Dec 2024 17:29:32 +0800 Subject: [PATCH 2/9] add obench Signed-off-by: tison --- bin/oli/Cargo.lock | 29 ++++ bin/oli/Cargo.toml | 4 + bin/oli/src/bin/oli.rs | 4 + bin/oli/src/commands/bench/mod.rs | 39 +++++ bin/oli/src/commands/bench/report.rs | 216 +++++++++++++++++++++++++++ bin/oli/src/commands/bench/suite.rs | 166 ++++++++++++++++++++ bin/oli/src/commands/cat.rs | 2 +- bin/oli/src/commands/cp.rs | 2 +- bin/oli/src/commands/ls.rs | 2 +- bin/oli/src/commands/mod.rs | 5 +- bin/oli/src/commands/mv.rs | 2 +- bin/oli/src/commands/rm.rs | 2 +- bin/oli/src/commands/stat.rs | 2 +- bin/oli/src/config/mod.rs | 10 +- bin/oli/src/params/config.rs | 2 +- 15 files changed, 475 insertions(+), 12 deletions(-) create mode 100644 bin/oli/src/commands/bench/mod.rs create mode 100644 bin/oli/src/commands/bench/report.rs create mode 100644 bin/oli/src/commands/bench/suite.rs diff --git a/bin/oli/Cargo.lock b/bin/oli/Cargo.lock index 3e402ab24e35..3592df241f77 100644 --- a/bin/oli/Cargo.lock +++ b/bin/oli/Cargo.lock @@ -747,6 +747,31 @@ version = "1.9.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7d71d3574edd2771538b901e6549113b4006ece66150fb69c0fb6d9a2adae946" +[[package]] +name = "humansize" +version = "2.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6cb51c9a029ddc91b07a787f1d86b53ccfa49b0e86688c946ebe8d3555685dd7" +dependencies = [ + "libm", +] + +[[package]] +name = "humantime" +version = "2.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9a3a5bfb195931eeb336b2a7b4d761daec841b97f947d34394601737a7bba5e4" + +[[package]] +name = "humantime-serde" +version = "1.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "57a3db5ea5923d99402c94e9feb261dc5ee9b4efa158b0315f788cf549cc200c" +dependencies = [ + "humantime", + "serde", +] + [[package]] name = "hyper" version = "1.5.2" @@ -1229,6 +1254,9 @@ dependencies = [ "clap", "dirs", "futures", + "humansize", + "humantime", + "humantime-serde", "indicatif", "opendal", "serde", @@ -1236,6 +1264,7 @@ dependencies = [ "tokio", "toml", "url", + "uuid", ] [[package]] diff --git a/bin/oli/Cargo.toml b/bin/oli/Cargo.toml index 51f40a678fc1..3853051ebc5b 100644 --- a/bin/oli/Cargo.toml +++ b/bin/oli/Cargo.toml @@ -34,6 +34,9 @@ anyhow = { version = "1.0" } clap = { version = "4.5", features = ["cargo", "string", "derive"] } dirs = { version = "5.0" } futures = { version = "0.3" } +humansize = { version = "2.1" } +humantime = { version = "2.1" } +humantime-serde = { version = "1.1" } indicatif = { version = "0.17" } opendal = { version = "0.51.0", path = "../../core", features = [ "services-azblob", @@ -55,6 +58,7 @@ serde = { version = "1.0", features = ["derive"] } tokio = { version = "1.42", features = ["full"] } toml = { version = "0.8" } url = { version = "2.5" } +uuid = { version = "1.11" } [dev-dependencies] assert_cmd = { version = "2.0" } diff --git a/bin/oli/src/bin/oli.rs b/bin/oli/src/bin/oli.rs index 8ae115602f4a..08232b1c8a55 100644 --- a/bin/oli/src/bin/oli.rs +++ b/bin/oli/src/bin/oli.rs @@ -54,6 +54,10 @@ async fn main() -> Result<()> { let cmd: Oli = clap::Parser::parse(); cmd.subcommand.run().await?; } + Some("obench") => { + let cmd: oli::commands::bench::BenchCmd = clap::Parser::parse(); + cmd.run().await?; + } Some("ocat") => { let cmd: oli::commands::cat::CatCmd = clap::Parser::parse(); cmd.run().await?; diff --git a/bin/oli/src/commands/bench/mod.rs b/bin/oli/src/commands/bench/mod.rs new file mode 100644 index 000000000000..275217f3b7d6 --- /dev/null +++ b/bin/oli/src/commands/bench/mod.rs @@ -0,0 +1,39 @@ +use crate::config::Config; +use crate::params::config::ConfigParams; +use anyhow::Result; +use std::path::PathBuf; + +mod report; +mod suite; + +#[derive(Debug, clap::Parser)] +#[command( + name = "bench", + about = "Run benchmark against the storage backend", + disable_version_flag = true +)] +pub struct BenchCmd { + #[command(flatten)] + pub config_params: ConfigParams, + /// Name of the profile to use. + #[arg()] + pub profile: String, + /// Path to the benchmark config. + #[arg( + value_parser = clap::value_parser!(PathBuf), + )] + pub bench: PathBuf, +} + +impl BenchCmd { + pub async fn run(self) -> Result<()> { + let cfg = Config::load(&self.config_params.config)?; + let suite = suite::BenchSuite::load(&self.bench)?; + + let op = cfg.operator(&self.profile)?; + let report = suite.run(op).await?; + println!("{report}"); + + Ok(()) + } +} diff --git a/bin/oli/src/commands/bench/report.rs b/bin/oli/src/commands/bench/report.rs new file mode 100644 index 000000000000..a48a5dc52b76 --- /dev/null +++ b/bin/oli/src/commands/bench/report.rs @@ -0,0 +1,216 @@ +use std::fmt::{Display, Formatter}; +use std::time::Duration; + +#[derive(Debug)] +pub(crate) struct Report { + // bench suite infos + parallelism: u32, + file_size: u32, + workload: String, + + // bench result metrics + /// Throughput (bytes per second). + bandwidth: Metric, + /// Latency (microseconds). + latency: Metric, + /// IOPS (operations per second). + iops: Metric, +} + +impl Report { + pub fn new( + parallelism: u32, + file_size: u32, + workload: String, + bandwidth: SampleSet, + latency: SampleSet, + iops: SampleSet, + ) -> Self { + Self { + parallelism, + file_size, + workload, + bandwidth: bandwidth.to_metric(), + latency: latency.to_metric(), + iops: iops.to_metric(), + } + } +} + +impl Display for Report { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + writeln!(f, "Parallel tasks: {}", self.parallelism)?; + writeln!(f, "Workload: {}", self.workload)?; + writeln!( + f, + "File size: {}", + humansize::format_size(self.file_size, humansize::BINARY) + )?; + + writeln!(f)?; + writeln!(f, "Bandwidth:")?; + writeln!( + f, + "{}/s", + self.bandwidth.format(2, |x| { + format!("{}", humansize::format_size_i(x, humansize::BINARY)) + }) + )?; + + writeln!(f)?; + writeln!(f, "Latency:")?; + writeln!( + f, + "{}", + self.latency.format(2, |x| { + let dur = Duration::from_micros(x as u64); + format!("{}", humantime::format_duration(dur)) + }) + )?; + + writeln!(f)?; + writeln!(f, "IOPS:")?; + writeln!(f, "{}", self.iops.format(2, |x| { format!("{x:.3}") }))?; + + Ok(()) + } +} + +#[derive(Debug)] +pub(crate) struct Metric { + count: u32, + min: f64, + max: f64, + avg: f64, + stddev: f64, + p99: f64, + p95: f64, + p50: f64, +} + +impl Metric { + fn format(&self, indent: usize, formatter: fn(f64) -> String) -> String { + format!( + "{:indent$}count: {}\n\ + {:indent$}min: {}\n\ + {:indent$}max: {}\n\ + {:indent$}avg: {}\n\ + {:indent$}stddev: {}\n\ + {:indent$}p99: {}\n\ + {:indent$}p95: {}\n\ + {:indent$}p50: {}", + "", + self.count, + "", + formatter(self.min), + "", + formatter(self.max), + "", + formatter(self.avg), + "", + formatter(self.stddev), + "", + formatter(self.p99), + "", + formatter(self.p95), + "", + formatter(self.p50), + ) + } +} + +#[derive(Debug, Default)] +pub(crate) struct SampleSet { + values: Vec, +} + +impl SampleSet { + /// Add a new sample value. + pub fn add(&mut self, sample: f64) { + assert!(!sample.is_finite(), "sample value must not be finite"); + assert!(!sample.is_nan(), "sample value must not be NaN"); + self.values.push(sample); + } + + /// Merge two sample sets. + pub fn merge(&mut self, other: SampleSet) { + self.values.extend(other.values); + } + + /// Get the minimum value. + fn min(&self) -> Option { + self.values.iter().copied().min_by(|a, b| a.total_cmp(b)) + } + + /// Get the maximum value. + fn max(&self) -> Option { + self.values.iter().copied().max_by(|a, b| a.total_cmp(b)) + } + + /// Get number of samples. + fn count(&self) -> usize { + self.values.len() + } + + /// Get the average of values. + fn avg(&self) -> Option { + let count = self.count(); + if count == 0 { + return None; + } + + let sum: f64 = self.values.iter().copied().sum(); + Some(sum / (count as f64)) + } + + /// Get the standard deviation of values. + fn stddev(&self) -> Option { + let count = self.count(); + if count == 0 { + return None; + } + + let avg = self.avg()?; + let sum = self + .values + .iter() + .copied() + .map(|x| (x - avg).powi(2)) + .sum::(); + Some((sum / count as f64).sqrt()) + } + + /// Get the percentile value. + /// + /// The percentile value must between 0.0 and 100.0 (both inclusive). + fn percentile(&self, percentile: f64) -> Option { + assert!( + percentile >= 0.0 && percentile <= 100.0, + "percentile must be between 0.0 and 100.0" + ); + + let count = self.count(); + if count == 0 { + return None; + } + + let index = ((count - 1) as f64 * percentile / 100.0).trunc() as usize; + let mut sorted = self.values.clone(); + sorted.sort_by(|a, b| a.total_cmp(b)); + sorted.get(index).copied() + } + + /// Create a metric from the sample set. + fn to_metric(&self) -> Metric { + Metric { + count: self.count() as u32, + min: self.min().unwrap_or(f64::NAN), + max: self.max().unwrap_or(f64::NAN), + avg: self.avg().unwrap_or(f64::NAN), + stddev: self.stddev().unwrap_or(f64::NAN), + p99: self.percentile(99.0).unwrap_or(f64::NAN), + p95: self.percentile(95.0).unwrap_or(f64::NAN), + p50: self.percentile(50.0).unwrap_or(f64::NAN), + } + } +} diff --git a/bin/oli/src/commands/bench/suite.rs b/bin/oli/src/commands/bench/suite.rs new file mode 100644 index 000000000000..7470adf29637 --- /dev/null +++ b/bin/oli/src/commands/bench/suite.rs @@ -0,0 +1,166 @@ +use crate::commands::bench::report::{Report, SampleSet}; +use anyhow::{ensure, Context, Result}; +use opendal::Operator; +use serde::Deserialize; +use std::path::Path; +use std::time::{Duration, Instant}; + +#[derive(Deserialize, Debug, Clone)] +struct BenchSuiteConfig { + /// Workload to run. + workload: Workload, + + /// Number of parallel tasks to run. + /// + /// Default to 1. + parallelism: Option, + + /// Size of file in bytes. + file_size: u32, + + /// Maximum time to run the bench suite. + #[serde(with = "humantime_serde")] + timout: Duration, +} + +#[derive(Deserialize, Debug, Copy, Clone)] +enum Workload { + #[serde(rename = "upload")] + Upload, + #[serde(rename = "download")] + Download, +} + +pub struct BenchSuite { + config: BenchSuiteConfig, +} + +impl BenchSuite { + pub fn load(path: &Path) -> Result { + let content = std::fs::read_to_string(path)?; + let config = toml::from_str::(&content)?; + ensure!( + config.file_size >= 4096, + "file_size must be greater or equal to 4096" + ); + Ok(BenchSuite { config }) + } + + pub async fn run(self, op: Operator) -> Result { + let start = Instant::now(); + + let timeout = self.config.timout; + let parallelism = self.config.parallelism.unwrap_or(1); + let file_size = self.config.file_size; + let workload = match self.config.workload { + Workload::Upload => "upload".to_string(), + Workload::Download => "download".to_string(), + }; + + let rt = tokio::runtime::Builder::new_multi_thread() + .worker_threads(parallelism as usize) + .enable_all() + .build()?; + + let task = { + let config = self.config.clone(); + let op = op.clone(); + rt.spawn(async move { Task::prepare(config, op).await }) + .await + .context("failed to prepare bench task")?? + }; + + let mut results = vec![]; + for _ in 0..parallelism { + let op = op.clone(); + let task = task.clone(); + results.push(rt.spawn(async move { + let mut bandwidth = SampleSet::default(); + let mut latency = SampleSet::default(); + let mut iops = SampleSet::default(); + let mut count = 0; + + loop { + if start.elapsed() > timeout { + return Ok::<_, anyhow::Error>((bandwidth, latency, iops)); + } + + let iter_start = Instant::now(); + let iter_bytes = task.run(&op).await?; + let iter_latency = iter_start.elapsed(); + count += 1; + latency.add(iter_latency.as_micros() as f64); + bandwidth.add(iter_bytes as f64 / iter_latency.as_secs_f64()); + iops.add(count as f64 / start.elapsed().as_secs_f64()); + } + })) + } + + let mut bandwidth = SampleSet::default(); + let mut latency = SampleSet::default(); + let mut iops = SampleSet::default(); + + for result in results { + let (iter_bandwidth, iter_latency, iter_iops) = result.await??; + bandwidth.merge(iter_bandwidth); + latency.merge(iter_latency); + iops.merge(iter_iops); + } + Ok(Report::new( + parallelism, + file_size, + workload, + bandwidth, + latency, + iops, + )) + } +} + +#[derive(Clone, Debug)] +enum Task { + Upload { path: String, file_size: u32 }, + Download { path: String }, +} + +const BATCH_SIZE: u32 = 4096; + +impl Task { + async fn prepare(config: BenchSuiteConfig, op: Operator) -> Result { + let BenchSuiteConfig { + workload, + file_size, + .. + } = config; + let path = format!("obench-test-{}", uuid::Uuid::new_v4()); + match workload { + Workload::Upload => Ok(Task::Upload { path, file_size }), + Workload::Download => { + let mut writer = op.writer(&path).await?; + let batch_cnt = config.file_size / (BATCH_SIZE); + for _ in 0..batch_cnt { + writer.write(vec![139u8; BATCH_SIZE as usize]).await? + } + writer.close().await?; + Ok(Task::Download { path }) + } + } + } + + async fn run(&self, op: &Operator) -> Result { + match self { + Task::Upload { path, file_size } => { + let mut writer = op.writer(path).await?; + for _ in 0..(*file_size / BATCH_SIZE) { + writer.write(vec![254u8; *file_size as usize]).await?; + } + writer.close().await?; + Ok((*file_size / BATCH_SIZE) * BATCH_SIZE) + } + Task::Download { path } => { + let bytes = op.read_with(path).await?; + Ok(bytes.len() as u32) + } + } + } +} diff --git a/bin/oli/src/commands/cat.rs b/bin/oli/src/commands/cat.rs index 567ebf9f1cba..da57b90e1dc5 100644 --- a/bin/oli/src/commands/cat.rs +++ b/bin/oli/src/commands/cat.rs @@ -36,7 +36,7 @@ pub struct CatCmd { } impl CatCmd { - pub async fn run(&self) -> Result<()> { + pub async fn run(self) -> Result<()> { let cfg = Config::load(&self.config_params.config)?; let (op, path) = cfg.parse_location(&self.target)?; diff --git a/bin/oli/src/commands/cp.rs b/bin/oli/src/commands/cp.rs index ed954a3df1ae..07f9ccf31ef8 100644 --- a/bin/oli/src/commands/cp.rs +++ b/bin/oli/src/commands/cp.rs @@ -59,7 +59,7 @@ pub struct CopyCmd { } impl CopyCmd { - pub async fn run(&self) -> Result<()> { + pub async fn run(self) -> Result<()> { let cfg = Config::load(&self.config_params.config)?; let (src_op, src_path) = cfg.parse_location(&self.source)?; diff --git a/bin/oli/src/commands/ls.rs b/bin/oli/src/commands/ls.rs index 10050982936d..4aa756769a04 100644 --- a/bin/oli/src/commands/ls.rs +++ b/bin/oli/src/commands/ls.rs @@ -35,7 +35,7 @@ pub struct LsCmd { } impl LsCmd { - pub async fn run(&self) -> Result<()> { + pub async fn run(self) -> Result<()> { let cfg = Config::load(&self.config_params.config)?; let (op, path) = cfg.parse_location(&self.target)?; diff --git a/bin/oli/src/commands/mod.rs b/bin/oli/src/commands/mod.rs index 0ab829edb647..c2334430a4f7 100644 --- a/bin/oli/src/commands/mod.rs +++ b/bin/oli/src/commands/mod.rs @@ -17,6 +17,7 @@ //! Provides the implementation of each command. +pub mod bench; pub mod cat; pub mod cp; pub mod ls; @@ -26,6 +27,7 @@ pub mod stat; #[derive(Debug, clap::Subcommand)] pub enum OliSubcommand { + Bench(bench::BenchCmd), Cat(cat::CatCmd), Cp(cp::CopyCmd), Ls(ls::LsCmd), @@ -35,8 +37,9 @@ pub enum OliSubcommand { } impl OliSubcommand { - pub async fn run(&self) -> anyhow::Result<()> { + pub async fn run(self) -> anyhow::Result<()> { match self { + Self::Bench(cmd) => cmd.run().await, Self::Cat(cmd) => cmd.run().await, Self::Cp(cmd) => cmd.run().await, Self::Ls(cmd) => cmd.run().await, diff --git a/bin/oli/src/commands/mv.rs b/bin/oli/src/commands/mv.rs index 3f5fa093a010..81a9eed65b80 100644 --- a/bin/oli/src/commands/mv.rs +++ b/bin/oli/src/commands/mv.rs @@ -37,7 +37,7 @@ pub struct MoveCmd { } impl MoveCmd { - pub async fn run(&self) -> Result<()> { + pub async fn run(self) -> Result<()> { let cfg = Config::load(&self.config_params.config)?; let (src_op, src_path) = cfg.parse_location(&self.source)?; diff --git a/bin/oli/src/commands/rm.rs b/bin/oli/src/commands/rm.rs index 5c38bf1a001e..e1af31167055 100644 --- a/bin/oli/src/commands/rm.rs +++ b/bin/oli/src/commands/rm.rs @@ -34,7 +34,7 @@ pub struct RmCmd { } impl RmCmd { - pub async fn run(&self) -> Result<()> { + pub async fn run(self) -> Result<()> { let cfg = Config::load(&self.config_params.config)?; let (op, path) = cfg.parse_location(&self.target)?; diff --git a/bin/oli/src/commands/stat.rs b/bin/oli/src/commands/stat.rs index 04cc2431bfca..11839f2ebac8 100644 --- a/bin/oli/src/commands/stat.rs +++ b/bin/oli/src/commands/stat.rs @@ -35,7 +35,7 @@ pub struct StatCmd { } impl StatCmd { - pub async fn run(&self) -> Result<()> { + pub async fn run(self) -> Result<()> { let cfg = Config::load(&self.config_params.config)?; let target = &self.target; diff --git a/bin/oli/src/config/mod.rs b/bin/oli/src/config/mod.rs index 3ed44e0246cc..23e1ecf3a142 100644 --- a/bin/oli/src/config/mod.rs +++ b/bin/oli/src/config/mod.rs @@ -150,19 +150,21 @@ impl Config { Err(anyhow!("Host part in a location is not supported. Hint: are you typing `://` instead of `:/`?"))?; } - let profile_name = location.scheme(); + let op = self.operator(location.scheme())?; let path = location.path().to_string(); + Ok((op, path)) + } + + pub fn operator(&self, profile_name: &str) -> Result { let profile = self .profiles .get(profile_name) .ok_or_else(|| anyhow!("unknown profile: {}", profile_name))?; - let svc = profile .get("type") .ok_or_else(|| anyhow!("missing 'type' in profile"))?; let scheme = Scheme::from_str(svc)?; - let op = Operator::via_iter(scheme, profile.clone())?; - Ok((op, path)) + Ok(Operator::via_iter(scheme, profile.clone())?) } } diff --git a/bin/oli/src/params/config.rs b/bin/oli/src/params/config.rs index 3ceb8f951bfc..f3b9b9f72d32 100644 --- a/bin/oli/src/params/config.rs +++ b/bin/oli/src/params/config.rs @@ -24,7 +24,7 @@ pub struct ConfigParams { #[arg( long, default_value = default_config_path(), - value_parser = clap::value_parser!(PathBuf) + value_parser = clap::value_parser!(PathBuf), )] pub config: PathBuf, } From f4ff6554eefdf340318b60673d8b6d6a5d70d5c2 Mon Sep 17 00:00:00 2001 From: tison Date: Mon, 23 Dec 2024 18:14:12 +0800 Subject: [PATCH 3/9] test Signed-off-by: tison --- bin/oli/Cargo.lock | 33 +++++++++++++++ bin/oli/Cargo.toml | 2 + bin/oli/dev/config.toml | 9 ++++ bin/oli/dev/suite.toml | 4 ++ bin/oli/src/commands/bench/mod.rs | 9 ++-- bin/oli/src/commands/bench/report.rs | 15 ++++--- bin/oli/src/commands/bench/suite.rs | 61 +++++++++++++--------------- 7 files changed, 89 insertions(+), 44 deletions(-) create mode 100644 bin/oli/dev/config.toml create mode 100644 bin/oli/dev/suite.toml diff --git a/bin/oli/Cargo.lock b/bin/oli/Cargo.lock index 3592df241f77..dd7a5ec0622d 100644 --- a/bin/oli/Cargo.lock +++ b/bin/oli/Cargo.lock @@ -1041,6 +1041,31 @@ version = "1.0.14" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d75a2a4b1b190afb6f5425f10f6a8f959d2ea0b9c2b1d79553551850539e4674" +[[package]] +name = "jiff" +version = "0.1.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "db69f08d4fb10524cacdb074c10b296299d71274ddbc830a8ee65666867002e9" +dependencies = [ + "jiff-tzdb-platform", + "windows-sys 0.59.0", +] + +[[package]] +name = "jiff-tzdb" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "91335e575850c5c4c673b9bd467b0e025f164ca59d0564f69d0c2ee0ffad4653" + +[[package]] +name = "jiff-tzdb-platform" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9835f0060a626fe59f160437bc725491a6af23133ea906500027d1bd2f8f4329" +dependencies = [ + "jiff-tzdb", +] + [[package]] name = "js-sys" version = "0.3.76" @@ -1258,7 +1283,9 @@ dependencies = [ "humantime", "humantime-serde", "indicatif", + "jiff", "opendal", + "pollster", "serde", "tempfile", "tokio", @@ -1425,6 +1452,12 @@ dependencies = [ "spki", ] +[[package]] +name = "pollster" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2f3a9f18d041e6d0e102a0a46750538147e5e8992d3b4873aaafee2520b00ce3" + [[package]] name = "portable-atomic" version = "1.10.0" diff --git a/bin/oli/Cargo.toml b/bin/oli/Cargo.toml index 3853051ebc5b..ae79d0201f7b 100644 --- a/bin/oli/Cargo.toml +++ b/bin/oli/Cargo.toml @@ -37,6 +37,7 @@ futures = { version = "0.3" } humansize = { version = "2.1" } humantime = { version = "2.1" } humantime-serde = { version = "1.1" } +jiff = { version = "0.1.15" } indicatif = { version = "0.17" } opendal = { version = "0.51.0", path = "../../core", features = [ "services-azblob", @@ -54,6 +55,7 @@ opendal = { version = "0.51.0", path = "../../core", features = [ "services-webhdfs", "services-azfile", ] } +pollster = { version = "0.4" } serde = { version = "1.0", features = ["derive"] } tokio = { version = "1.42", features = ["full"] } toml = { version = "0.8" } diff --git a/bin/oli/dev/config.toml b/bin/oli/dev/config.toml new file mode 100644 index 000000000000..528d12d145fc --- /dev/null +++ b/bin/oli/dev/config.toml @@ -0,0 +1,9 @@ +[profiles.demo] +type = "s3" +root = "/benchmarks" +bucket = "test-bucket" +region = "us-east-1" +endpoint = "http://127.0.0.1:9000" +access_key_id = "minioadmin" +secret_access_key = "minioadmin" +virtual_host_style = "false" diff --git a/bin/oli/dev/suite.toml b/bin/oli/dev/suite.toml new file mode 100644 index 000000000000..d36623ede3a1 --- /dev/null +++ b/bin/oli/dev/suite.toml @@ -0,0 +1,4 @@ +workload = "upload" +parallelism = 4 +file_size = 67108864 # 64MiB +timeout = "60s" diff --git a/bin/oli/src/commands/bench/mod.rs b/bin/oli/src/commands/bench/mod.rs index 275217f3b7d6..012d68b69c73 100644 --- a/bin/oli/src/commands/bench/mod.rs +++ b/bin/oli/src/commands/bench/mod.rs @@ -28,12 +28,13 @@ pub struct BenchCmd { impl BenchCmd { pub async fn run(self) -> Result<()> { let cfg = Config::load(&self.config_params.config)?; - let suite = suite::BenchSuite::load(&self.bench)?; - let op = cfg.operator(&self.profile)?; - let report = suite.run(op).await?; - println!("{report}"); + let suite = suite::BenchSuite::load(&self.bench)?; + tokio::task::spawn_blocking(move || { + suite.run(op).expect("failed to run bench suite"); + }) + .await?; Ok(()) } } diff --git a/bin/oli/src/commands/bench/report.rs b/bin/oli/src/commands/bench/report.rs index a48a5dc52b76..83c07e91a1ec 100644 --- a/bin/oli/src/commands/bench/report.rs +++ b/bin/oli/src/commands/bench/report.rs @@ -51,9 +51,9 @@ impl Display for Report { writeln!(f, "Bandwidth:")?; writeln!( f, - "{}/s", + "{}", self.bandwidth.format(2, |x| { - format!("{}", humansize::format_size_i(x, humansize::BINARY)) + format!("{}/s", humansize::format_size_i(x, humansize::BINARY)) }) )?; @@ -78,7 +78,7 @@ impl Display for Report { #[derive(Debug)] pub(crate) struct Metric { - count: u32, + num_samples: u32, min: f64, max: f64, avg: f64, @@ -91,7 +91,7 @@ pub(crate) struct Metric { impl Metric { fn format(&self, indent: usize, formatter: fn(f64) -> String) -> String { format!( - "{:indent$}count: {}\n\ + "{:indent$}num_samples: {}\n\ {:indent$}min: {}\n\ {:indent$}max: {}\n\ {:indent$}avg: {}\n\ @@ -100,7 +100,7 @@ impl Metric { {:indent$}p95: {}\n\ {:indent$}p50: {}", "", - self.count, + self.num_samples, "", formatter(self.min), "", @@ -127,8 +127,7 @@ pub(crate) struct SampleSet { impl SampleSet { /// Add a new sample value. pub fn add(&mut self, sample: f64) { - assert!(!sample.is_finite(), "sample value must not be finite"); - assert!(!sample.is_nan(), "sample value must not be NaN"); + assert!(sample.is_finite(), "sample value must be finite"); self.values.push(sample); } @@ -203,7 +202,7 @@ impl SampleSet { /// Create a metric from the sample set. fn to_metric(&self) -> Metric { Metric { - count: self.count() as u32, + num_samples: self.count() as u32, min: self.min().unwrap_or(f64::NAN), max: self.max().unwrap_or(f64::NAN), avg: self.avg().unwrap_or(f64::NAN), diff --git a/bin/oli/src/commands/bench/suite.rs b/bin/oli/src/commands/bench/suite.rs index 7470adf29637..26ca843d512e 100644 --- a/bin/oli/src/commands/bench/suite.rs +++ b/bin/oli/src/commands/bench/suite.rs @@ -5,7 +5,7 @@ use serde::Deserialize; use std::path::Path; use std::time::{Duration, Instant}; -#[derive(Deserialize, Debug, Clone)] +#[derive(Deserialize, Debug)] struct BenchSuiteConfig { /// Workload to run. workload: Workload, @@ -20,10 +20,10 @@ struct BenchSuiteConfig { /// Maximum time to run the bench suite. #[serde(with = "humantime_serde")] - timout: Duration, + timeout: Duration, } -#[derive(Deserialize, Debug, Copy, Clone)] +#[derive(Deserialize, Debug)] enum Workload { #[serde(rename = "upload")] Upload, @@ -43,13 +43,15 @@ impl BenchSuite { config.file_size >= 4096, "file_size must be greater or equal to 4096" ); + println!("Create bench suite with config: {config:?}"); Ok(BenchSuite { config }) } - pub async fn run(self, op: Operator) -> Result { + pub fn run(self, op: Operator) -> Result<()> { + println!("Start running bench suite ..."); let start = Instant::now(); - let timeout = self.config.timout; + let timeout = self.config.timeout; let parallelism = self.config.parallelism.unwrap_or(1); let file_size = self.config.file_size; let workload = match self.config.workload { @@ -62,13 +64,9 @@ impl BenchSuite { .enable_all() .build()?; - let task = { - let config = self.config.clone(); - let op = op.clone(); - rt.spawn(async move { Task::prepare(config, op).await }) - .await - .context("failed to prepare bench task")?? - }; + let task = rt + .block_on(Task::prepare(&self.config, &op)) + .context("failed to prepare task")?; let mut results = vec![]; for _ in 0..parallelism { @@ -86,7 +84,7 @@ impl BenchSuite { } let iter_start = Instant::now(); - let iter_bytes = task.run(&op).await?; + let iter_bytes = task.run(&op).await.context("failed to execute task")?; let iter_latency = iter_start.elapsed(); count += 1; latency.add(iter_latency.as_micros() as f64); @@ -101,19 +99,16 @@ impl BenchSuite { let mut iops = SampleSet::default(); for result in results { - let (iter_bandwidth, iter_latency, iter_iops) = result.await??; + let (iter_bandwidth, iter_latency, iter_iops) = pollster::block_on(result)??; bandwidth.merge(iter_bandwidth); latency.merge(iter_latency); iops.merge(iter_iops); } - Ok(Report::new( - parallelism, - file_size, - workload, - bandwidth, - latency, - iops, - )) + + let report = Report::new(parallelism, file_size, workload, bandwidth, latency, iops); + println!("Bench suite completed in {:?}; result:\n", start.elapsed()); + println!("{report}"); + Ok(()) } } @@ -126,18 +121,20 @@ enum Task { const BATCH_SIZE: u32 = 4096; impl Task { - async fn prepare(config: BenchSuiteConfig, op: Operator) -> Result { - let BenchSuiteConfig { - workload, - file_size, - .. - } = config; - let path = format!("obench-test-{}", uuid::Uuid::new_v4()); - match workload { + async fn prepare(config: &BenchSuiteConfig, op: &Operator) -> Result { + let now = jiff::Timestamp::now(); + let path = format!( + "obench-test-{}-{}", + now.as_millisecond(), + uuid::Uuid::new_v4() + ); + println!("Prepare task with path: {path}"); + let file_size = config.file_size; + match config.workload { Workload::Upload => Ok(Task::Upload { path, file_size }), Workload::Download => { let mut writer = op.writer(&path).await?; - let batch_cnt = config.file_size / (BATCH_SIZE); + let batch_cnt = file_size / (BATCH_SIZE); for _ in 0..batch_cnt { writer.write(vec![139u8; BATCH_SIZE as usize]).await? } @@ -152,7 +149,7 @@ impl Task { Task::Upload { path, file_size } => { let mut writer = op.writer(path).await?; for _ in 0..(*file_size / BATCH_SIZE) { - writer.write(vec![254u8; *file_size as usize]).await?; + writer.write(vec![254u8; BATCH_SIZE as usize]).await?; } writer.close().await?; Ok((*file_size / BATCH_SIZE) * BATCH_SIZE) From 5b963ab098cde624924bca576eabc7576c7a94d0 Mon Sep 17 00:00:00 2001 From: tison Date: Mon, 23 Dec 2024 18:16:08 +0800 Subject: [PATCH 4/9] coauthor Signed-off-by: tison Co-authored-by: Yu Lei From 54d67f8935c7c6907ebe501a1512bfd18e0349fd Mon Sep 17 00:00:00 2001 From: tison Date: Mon, 23 Dec 2024 18:17:15 +0800 Subject: [PATCH 5/9] license header Signed-off-by: tison --- bin/oli/dev/config.toml | 17 +++++++++++++++++ bin/oli/dev/suite.toml | 17 +++++++++++++++++ bin/oli/src/commands/bench/mod.rs | 17 +++++++++++++++++ bin/oli/src/commands/bench/report.rs | 17 +++++++++++++++++ bin/oli/src/commands/bench/suite.rs | 17 +++++++++++++++++ 5 files changed, 85 insertions(+) diff --git a/bin/oli/dev/config.toml b/bin/oli/dev/config.toml index 528d12d145fc..9a0af22fc536 100644 --- a/bin/oli/dev/config.toml +++ b/bin/oli/dev/config.toml @@ -1,3 +1,20 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + [profiles.demo] type = "s3" root = "/benchmarks" diff --git a/bin/oli/dev/suite.toml b/bin/oli/dev/suite.toml index d36623ede3a1..42fa0ab07f32 100644 --- a/bin/oli/dev/suite.toml +++ b/bin/oli/dev/suite.toml @@ -1,3 +1,20 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + workload = "upload" parallelism = 4 file_size = 67108864 # 64MiB diff --git a/bin/oli/src/commands/bench/mod.rs b/bin/oli/src/commands/bench/mod.rs index 012d68b69c73..36e7d81caf8f 100644 --- a/bin/oli/src/commands/bench/mod.rs +++ b/bin/oli/src/commands/bench/mod.rs @@ -1,3 +1,20 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + use crate::config::Config; use crate::params::config::ConfigParams; use anyhow::Result; diff --git a/bin/oli/src/commands/bench/report.rs b/bin/oli/src/commands/bench/report.rs index 83c07e91a1ec..60c3a779735e 100644 --- a/bin/oli/src/commands/bench/report.rs +++ b/bin/oli/src/commands/bench/report.rs @@ -1,3 +1,20 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + use std::fmt::{Display, Formatter}; use std::time::Duration; diff --git a/bin/oli/src/commands/bench/suite.rs b/bin/oli/src/commands/bench/suite.rs index 26ca843d512e..182943d2372e 100644 --- a/bin/oli/src/commands/bench/suite.rs +++ b/bin/oli/src/commands/bench/suite.rs @@ -1,3 +1,20 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + use crate::commands::bench::report::{Report, SampleSet}; use anyhow::{ensure, Context, Result}; use opendal::Operator; From f6f50ed780019f1d660251d1fd4a29ae0bb25ca6 Mon Sep 17 00:00:00 2001 From: tison Date: Mon, 23 Dec 2024 18:32:55 +0800 Subject: [PATCH 6/9] fmt clippy Signed-off-by: tison --- bin/oli/src/commands/bench/report.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/bin/oli/src/commands/bench/report.rs b/bin/oli/src/commands/bench/report.rs index 60c3a779735e..bec1115cd3e9 100644 --- a/bin/oli/src/commands/bench/report.rs +++ b/bin/oli/src/commands/bench/report.rs @@ -201,7 +201,7 @@ impl SampleSet { /// The percentile value must between 0.0 and 100.0 (both inclusive). fn percentile(&self, percentile: f64) -> Option { assert!( - percentile >= 0.0 && percentile <= 100.0, + (0.0..=100.0).contains(&percentile), "percentile must be between 0.0 and 100.0" ); From ecc57d72a0ed59ab1e39b950d7ce3d7ade14583d Mon Sep 17 00:00:00 2001 From: tison Date: Tue, 24 Dec 2024 21:52:13 +0800 Subject: [PATCH 7/9] rework oli main Signed-off-by: tison --- bin/oli/src/bin/oli.rs | 26 +++++++++++++------------- bin/oli/src/commands/bench/mod.rs | 10 +++------- bin/oli/src/commands/cat.rs | 7 ++++++- bin/oli/src/commands/cp.rs | 6 +++++- bin/oli/src/commands/ls.rs | 6 +++++- bin/oli/src/commands/mod.rs | 16 ++++++++-------- bin/oli/src/commands/mv.rs | 7 ++++++- bin/oli/src/commands/rm.rs | 7 ++++++- bin/oli/src/commands/stat.rs | 7 ++++++- bin/oli/src/lib.rs | 8 ++++++++ 10 files changed, 66 insertions(+), 34 deletions(-) diff --git a/bin/oli/src/bin/oli.rs b/bin/oli/src/bin/oli.rs index 08232b1c8a55..ab07e8b64c8e 100644 --- a/bin/oli/src/bin/oli.rs +++ b/bin/oli/src/bin/oli.rs @@ -26,7 +26,7 @@ use std::env; use std::ffi::OsStr; use std::path::PathBuf; -use anyhow::anyhow; +use anyhow::bail; use anyhow::Result; use oli::commands::OliSubcommand; @@ -37,8 +37,7 @@ pub struct Oli { subcommand: OliSubcommand, } -#[tokio::main] -async fn main() -> Result<()> { +fn main() -> Result<()> { // Guard against infinite proxy recursion. This mostly happens due to // bugs in oli. do_recursion_guard()?; @@ -52,40 +51,40 @@ async fn main() -> Result<()> { { Some("oli") => { let cmd: Oli = clap::Parser::parse(); - cmd.subcommand.run().await?; + cmd.subcommand.run()?; } Some("obench") => { let cmd: oli::commands::bench::BenchCmd = clap::Parser::parse(); - cmd.run().await?; + cmd.run()?; } Some("ocat") => { let cmd: oli::commands::cat::CatCmd = clap::Parser::parse(); - cmd.run().await?; + cmd.run()?; } Some("ocp") => { let cmd: oli::commands::cp::CopyCmd = clap::Parser::parse(); - cmd.run().await?; + cmd.run()?; } Some("ols") => { let cmd: oli::commands::ls::LsCmd = clap::Parser::parse(); - cmd.run().await?; + cmd.run()?; } Some("orm") => { let cmd: oli::commands::rm::RmCmd = clap::Parser::parse(); - cmd.run().await?; + cmd.run()?; } Some("ostat") => { let cmd: oli::commands::stat::StatCmd = clap::Parser::parse(); - cmd.run().await?; + cmd.run()?; } Some("omv") => { let cmd: oli::commands::mv::MoveCmd = clap::Parser::parse(); - cmd.run().await?; + cmd.run()?; } Some(v) => { println!("{v} is not supported") } - None => return Err(anyhow!("couldn't determine self executable name")), + None => bail!("couldn't determine self executable name"), } Ok(()) @@ -98,8 +97,9 @@ fn do_recursion_guard() -> Result<()> { .ok() .and_then(|s| s.parse().ok()) .unwrap_or(0); + if recursion_count > OLI_RECURSION_COUNT_MAX { - return Err(anyhow!("infinite recursion detected")); + bail!("infinite recursion detected"); } Ok(()) diff --git a/bin/oli/src/commands/bench/mod.rs b/bin/oli/src/commands/bench/mod.rs index 36e7d81caf8f..7d6906a98652 100644 --- a/bin/oli/src/commands/bench/mod.rs +++ b/bin/oli/src/commands/bench/mod.rs @@ -43,15 +43,11 @@ pub struct BenchCmd { } impl BenchCmd { - pub async fn run(self) -> Result<()> { + pub fn run(self) -> Result<()> { let cfg = Config::load(&self.config_params.config)?; - let op = cfg.operator(&self.profile)?; let suite = suite::BenchSuite::load(&self.bench)?; - - tokio::task::spawn_blocking(move || { - suite.run(op).expect("failed to run bench suite"); - }) - .await?; + let op = cfg.operator(&self.profile)?; + suite.run(op)?; Ok(()) } } diff --git a/bin/oli/src/commands/cat.rs b/bin/oli/src/commands/cat.rs index da57b90e1dc5..5bce96e357e9 100644 --- a/bin/oli/src/commands/cat.rs +++ b/bin/oli/src/commands/cat.rs @@ -19,6 +19,7 @@ use anyhow::Result; use futures::io; use crate::config::Config; +use crate::make_tokio_runtime; use crate::params::config::ConfigParams; #[derive(Debug, clap::Parser)] @@ -36,7 +37,11 @@ pub struct CatCmd { } impl CatCmd { - pub async fn run(self) -> Result<()> { + pub fn run(self) -> Result<()> { + make_tokio_runtime(1).block_on(self.do_run()) + } + + async fn do_run(self) -> Result<()> { let cfg = Config::load(&self.config_params.config)?; let (op, path) = cfg.parse_location(&self.target)?; diff --git a/bin/oli/src/commands/cp.rs b/bin/oli/src/commands/cp.rs index 07f9ccf31ef8..5f4cafb64bd4 100644 --- a/bin/oli/src/commands/cp.rs +++ b/bin/oli/src/commands/cp.rs @@ -26,6 +26,7 @@ use futures::AsyncWriteExt; use futures::TryStreamExt; use crate::config::Config; +use crate::make_tokio_runtime; use crate::params::config::ConfigParams; /// Template for the progress bar display. @@ -59,7 +60,10 @@ pub struct CopyCmd { } impl CopyCmd { - pub async fn run(self) -> Result<()> { + pub fn run(self) -> Result<()> { + make_tokio_runtime(1).block_on(self.do_run()) + } + async fn do_run(self) -> Result<()> { let cfg = Config::load(&self.config_params.config)?; let (src_op, src_path) = cfg.parse_location(&self.source)?; diff --git a/bin/oli/src/commands/ls.rs b/bin/oli/src/commands/ls.rs index 4aa756769a04..f9805577a8bd 100644 --- a/bin/oli/src/commands/ls.rs +++ b/bin/oli/src/commands/ls.rs @@ -19,6 +19,7 @@ use anyhow::Result; use futures::TryStreamExt; use crate::config::Config; +use crate::make_tokio_runtime; use crate::params::config::ConfigParams; #[derive(Debug, clap::Parser)] @@ -35,7 +36,10 @@ pub struct LsCmd { } impl LsCmd { - pub async fn run(self) -> Result<()> { + pub fn run(self) -> Result<()> { + make_tokio_runtime(1).block_on(self.do_run()) + } + async fn do_run(self) -> Result<()> { let cfg = Config::load(&self.config_params.config)?; let (op, path) = cfg.parse_location(&self.target)?; diff --git a/bin/oli/src/commands/mod.rs b/bin/oli/src/commands/mod.rs index c2334430a4f7..9ecba9731256 100644 --- a/bin/oli/src/commands/mod.rs +++ b/bin/oli/src/commands/mod.rs @@ -37,15 +37,15 @@ pub enum OliSubcommand { } impl OliSubcommand { - pub async fn run(self) -> anyhow::Result<()> { + pub fn run(self) -> anyhow::Result<()> { match self { - Self::Bench(cmd) => cmd.run().await, - Self::Cat(cmd) => cmd.run().await, - Self::Cp(cmd) => cmd.run().await, - Self::Ls(cmd) => cmd.run().await, - Self::Rm(cmd) => cmd.run().await, - Self::Stat(cmd) => cmd.run().await, - Self::Mv(cmd) => cmd.run().await, + Self::Bench(cmd) => cmd.run(), + Self::Cat(cmd) => cmd.run(), + Self::Cp(cmd) => cmd.run(), + Self::Ls(cmd) => cmd.run(), + Self::Rm(cmd) => cmd.run(), + Self::Stat(cmd) => cmd.run(), + Self::Mv(cmd) => cmd.run(), } } } diff --git a/bin/oli/src/commands/mv.rs b/bin/oli/src/commands/mv.rs index 81a9eed65b80..4a6942930c1e 100644 --- a/bin/oli/src/commands/mv.rs +++ b/bin/oli/src/commands/mv.rs @@ -21,6 +21,7 @@ use anyhow::{Error, Result}; use futures::{AsyncWriteExt, TryStreamExt}; use opendal::Operator; use std::path::Path; +use crate::make_tokio_runtime; #[derive(Debug, clap::Parser)] #[command(name = "mv", about = "Move object", disable_version_flag = true)] @@ -37,7 +38,11 @@ pub struct MoveCmd { } impl MoveCmd { - pub async fn run(self) -> Result<()> { + pub fn run(self) -> Result<()> { + make_tokio_runtime(1).block_on(self.do_run()) + } + + async fn do_run(self) -> Result<()> { let cfg = Config::load(&self.config_params.config)?; let (src_op, src_path) = cfg.parse_location(&self.source)?; diff --git a/bin/oli/src/commands/rm.rs b/bin/oli/src/commands/rm.rs index e1af31167055..f3949a864a5b 100644 --- a/bin/oli/src/commands/rm.rs +++ b/bin/oli/src/commands/rm.rs @@ -18,6 +18,7 @@ use anyhow::Result; use crate::config::Config; +use crate::make_tokio_runtime; use crate::params::config::ConfigParams; #[derive(Debug, clap::Parser)] @@ -34,7 +35,11 @@ pub struct RmCmd { } impl RmCmd { - pub async fn run(self) -> Result<()> { + pub fn run(self) -> Result<()> { + make_tokio_runtime(1).block_on(self.do_run()) + } + + async fn do_run(self) -> Result<()> { let cfg = Config::load(&self.config_params.config)?; let (op, path) = cfg.parse_location(&self.target)?; diff --git a/bin/oli/src/commands/stat.rs b/bin/oli/src/commands/stat.rs index 11839f2ebac8..7842545a90b7 100644 --- a/bin/oli/src/commands/stat.rs +++ b/bin/oli/src/commands/stat.rs @@ -18,6 +18,7 @@ use anyhow::Result; use crate::config::Config; +use crate::make_tokio_runtime; use crate::params::config::ConfigParams; #[derive(Debug, clap::Parser)] @@ -35,7 +36,11 @@ pub struct StatCmd { } impl StatCmd { - pub async fn run(self) -> Result<()> { + pub fn run(self) -> Result<()> { + make_tokio_runtime(1).block_on(self.do_run()) + } + + async fn do_run(self) -> Result<()> { let cfg = Config::load(&self.config_params.config)?; let target = &self.target; diff --git a/bin/oli/src/lib.rs b/bin/oli/src/lib.rs index e829973afc58..ecc00aaf4fa1 100644 --- a/bin/oli/src/lib.rs +++ b/bin/oli/src/lib.rs @@ -18,3 +18,11 @@ pub mod commands; pub mod config; pub mod params; + +fn make_tokio_runtime(n_threads: usize) -> tokio::runtime::Runtime { + tokio::runtime::Builder::new_multi_thread() + .worker_threads(n_threads) + .enable_all() + .build() + .expect("failed to create tokio runtime") +} From 9a24a99961d23bfd3d2d8e942f7653db0c9d1aab Mon Sep 17 00:00:00 2001 From: tison Date: Tue, 24 Dec 2024 22:04:22 +0800 Subject: [PATCH 8/9] parse-size Signed-off-by: tison --- bin/oli/Cargo.lock | 7 +++++++ bin/oli/Cargo.toml | 1 + bin/oli/dev/suite.toml | 2 +- bin/oli/src/commands/bench/report.rs | 4 ++-- bin/oli/src/commands/bench/suite.rs | 23 ++++++++++++++++------- 5 files changed, 27 insertions(+), 10 deletions(-) diff --git a/bin/oli/Cargo.lock b/bin/oli/Cargo.lock index dd7a5ec0622d..313eab1d5536 100644 --- a/bin/oli/Cargo.lock +++ b/bin/oli/Cargo.lock @@ -1285,6 +1285,7 @@ dependencies = [ "indicatif", "jiff", "opendal", + "parse-size", "pollster", "serde", "tempfile", @@ -1367,6 +1368,12 @@ dependencies = [ "windows-targets 0.52.6", ] +[[package]] +name = "parse-size" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "487f2ccd1e17ce8c1bfab3a65c89525af41cfad4c8659021a1e9a2aacd73b89b" + [[package]] name = "pbkdf2" version = "0.12.2" diff --git a/bin/oli/Cargo.toml b/bin/oli/Cargo.toml index ae79d0201f7b..98e517b8bbb4 100644 --- a/bin/oli/Cargo.toml +++ b/bin/oli/Cargo.toml @@ -61,6 +61,7 @@ tokio = { version = "1.42", features = ["full"] } toml = { version = "0.8" } url = { version = "2.5" } uuid = { version = "1.11" } +parse-size = "1.0.0" [dev-dependencies] assert_cmd = { version = "2.0" } diff --git a/bin/oli/dev/suite.toml b/bin/oli/dev/suite.toml index 42fa0ab07f32..7b0eec1c8a9e 100644 --- a/bin/oli/dev/suite.toml +++ b/bin/oli/dev/suite.toml @@ -17,5 +17,5 @@ workload = "upload" parallelism = 4 -file_size = 67108864 # 64MiB +file_size = "64MiB" timeout = "60s" diff --git a/bin/oli/src/commands/bench/report.rs b/bin/oli/src/commands/bench/report.rs index bec1115cd3e9..ddae5b255efa 100644 --- a/bin/oli/src/commands/bench/report.rs +++ b/bin/oli/src/commands/bench/report.rs @@ -22,7 +22,7 @@ use std::time::Duration; pub(crate) struct Report { // bench suite infos parallelism: u32, - file_size: u32, + file_size: u64, workload: String, // bench result metrics @@ -37,7 +37,7 @@ pub(crate) struct Report { impl Report { pub fn new( parallelism: u32, - file_size: u32, + file_size: u64, workload: String, bandwidth: SampleSet, latency: SampleSet, diff --git a/bin/oli/src/commands/bench/suite.rs b/bin/oli/src/commands/bench/suite.rs index 182943d2372e..65dc2a1d349a 100644 --- a/bin/oli/src/commands/bench/suite.rs +++ b/bin/oli/src/commands/bench/suite.rs @@ -18,7 +18,7 @@ use crate::commands::bench::report::{Report, SampleSet}; use anyhow::{ensure, Context, Result}; use opendal::Operator; -use serde::Deserialize; +use serde::{Deserialize, Deserializer}; use std::path::Path; use std::time::{Duration, Instant}; @@ -32,14 +32,23 @@ struct BenchSuiteConfig { /// Default to 1. parallelism: Option, - /// Size of file in bytes. - file_size: u32, + /// Size of file. + #[serde(deserialize_with = "deserialize_file_size")] + file_size: u64, /// Maximum time to run the bench suite. #[serde(with = "humantime_serde")] timeout: Duration, } +fn deserialize_file_size<'de, D>(deserializer: D) -> Result +where + D: Deserializer<'de>, +{ + let s = String::deserialize(deserializer)?; + parse_size::parse_size(&s).map_err(serde::de::Error::custom) +} + #[derive(Deserialize, Debug)] enum Workload { #[serde(rename = "upload")] @@ -131,11 +140,11 @@ impl BenchSuite { #[derive(Clone, Debug)] enum Task { - Upload { path: String, file_size: u32 }, + Upload { path: String, file_size: u64 }, Download { path: String }, } -const BATCH_SIZE: u32 = 4096; +const BATCH_SIZE: u64 = 4096; impl Task { async fn prepare(config: &BenchSuiteConfig, op: &Operator) -> Result { @@ -161,7 +170,7 @@ impl Task { } } - async fn run(&self, op: &Operator) -> Result { + async fn run(&self, op: &Operator) -> Result { match self { Task::Upload { path, file_size } => { let mut writer = op.writer(path).await?; @@ -173,7 +182,7 @@ impl Task { } Task::Download { path } => { let bytes = op.read_with(path).await?; - Ok(bytes.len() as u32) + Ok(bytes.len() as u64) } } } From 5354f74ec885b7fd66f707f74796396c9b1c1821 Mon Sep 17 00:00:00 2001 From: tison Date: Tue, 24 Dec 2024 22:16:58 +0800 Subject: [PATCH 9/9] fixup usability Signed-off-by: tison --- bin/oli/Cargo.lock | 26 ------------------- bin/oli/Cargo.toml | 3 +-- bin/oli/src/commands/bench/report.rs | 4 +-- bin/oli/src/commands/bench/suite.rs | 39 ++++++++++++++++++---------- bin/oli/src/commands/mv.rs | 2 +- 5 files changed, 29 insertions(+), 45 deletions(-) diff --git a/bin/oli/Cargo.lock b/bin/oli/Cargo.lock index 313eab1d5536..91b109ea7548 100644 --- a/bin/oli/Cargo.lock +++ b/bin/oli/Cargo.lock @@ -1041,31 +1041,6 @@ version = "1.0.14" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d75a2a4b1b190afb6f5425f10f6a8f959d2ea0b9c2b1d79553551850539e4674" -[[package]] -name = "jiff" -version = "0.1.15" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "db69f08d4fb10524cacdb074c10b296299d71274ddbc830a8ee65666867002e9" -dependencies = [ - "jiff-tzdb-platform", - "windows-sys 0.59.0", -] - -[[package]] -name = "jiff-tzdb" -version = "0.1.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "91335e575850c5c4c673b9bd467b0e025f164ca59d0564f69d0c2ee0ffad4653" - -[[package]] -name = "jiff-tzdb-platform" -version = "0.1.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9835f0060a626fe59f160437bc725491a6af23133ea906500027d1bd2f8f4329" -dependencies = [ - "jiff-tzdb", -] - [[package]] name = "js-sys" version = "0.3.76" @@ -1283,7 +1258,6 @@ dependencies = [ "humantime", "humantime-serde", "indicatif", - "jiff", "opendal", "parse-size", "pollster", diff --git a/bin/oli/Cargo.toml b/bin/oli/Cargo.toml index 98e517b8bbb4..84b91d6247bc 100644 --- a/bin/oli/Cargo.toml +++ b/bin/oli/Cargo.toml @@ -37,7 +37,6 @@ futures = { version = "0.3" } humansize = { version = "2.1" } humantime = { version = "2.1" } humantime-serde = { version = "1.1" } -jiff = { version = "0.1.15" } indicatif = { version = "0.17" } opendal = { version = "0.51.0", path = "../../core", features = [ "services-azblob", @@ -61,7 +60,7 @@ tokio = { version = "1.42", features = ["full"] } toml = { version = "0.8" } url = { version = "2.5" } uuid = { version = "1.11" } -parse-size = "1.0.0" +parse-size = { version = "1.1" } [dev-dependencies] assert_cmd = { version = "2.0" } diff --git a/bin/oli/src/commands/bench/report.rs b/bin/oli/src/commands/bench/report.rs index ddae5b255efa..a6a731424bc7 100644 --- a/bin/oli/src/commands/bench/report.rs +++ b/bin/oli/src/commands/bench/report.rs @@ -21,7 +21,7 @@ use std::time::Duration; #[derive(Debug)] pub(crate) struct Report { // bench suite infos - parallelism: u32, + parallelism: usize, file_size: u64, workload: String, @@ -36,7 +36,7 @@ pub(crate) struct Report { impl Report { pub fn new( - parallelism: u32, + parallelism: usize, file_size: u64, workload: String, bandwidth: SampleSet, diff --git a/bin/oli/src/commands/bench/suite.rs b/bin/oli/src/commands/bench/suite.rs index 65dc2a1d349a..1f43965d2800 100644 --- a/bin/oli/src/commands/bench/suite.rs +++ b/bin/oli/src/commands/bench/suite.rs @@ -16,6 +16,7 @@ // under the License. use crate::commands::bench::report::{Report, SampleSet}; +use crate::make_tokio_runtime; use anyhow::{ensure, Context, Result}; use opendal::Operator; use serde::{Deserialize, Deserializer}; @@ -39,6 +40,13 @@ struct BenchSuiteConfig { /// Maximum time to run the bench suite. #[serde(with = "humantime_serde")] timeout: Duration, + + /// Whether retain the object on success. + /// + /// Default to false, which means the object used in the suite will be deleted + /// after the suite successfully returned. + #[serde(default)] + retain_on_success: bool, } fn deserialize_file_size<'de, D>(deserializer: D) -> Result @@ -78,20 +86,20 @@ impl BenchSuite { let start = Instant::now(); let timeout = self.config.timeout; - let parallelism = self.config.parallelism.unwrap_or(1); + let parallelism = self.config.parallelism.unwrap_or(1) as usize; let file_size = self.config.file_size; let workload = match self.config.workload { Workload::Upload => "upload".to_string(), Workload::Download => "download".to_string(), }; + let retain_on_success = self.config.retain_on_success; - let rt = tokio::runtime::Builder::new_multi_thread() - .worker_threads(parallelism as usize) - .enable_all() - .build()?; + let rt = make_tokio_runtime(parallelism); + let path = format!("obench-test-{}", uuid::Uuid::new_v4()); + println!("Prepare task with path: {path}"); let task = rt - .block_on(Task::prepare(&self.config, &op)) + .block_on(Task::prepare(&self.config, &path, &op)) .context("failed to prepare task")?; let mut results = vec![]; @@ -131,6 +139,15 @@ impl BenchSuite { iops.merge(iter_iops); } + if !retain_on_success { + rt.block_on(async { + println!("Deleting object at path: {path}"); + if let Err(err) = op.delete(&path).await { + eprintln!("failed to delete object: {}", err); + } + }); + } + let report = Report::new(parallelism, file_size, workload, bandwidth, latency, iops); println!("Bench suite completed in {:?}; result:\n", start.elapsed()); println!("{report}"); @@ -147,14 +164,8 @@ enum Task { const BATCH_SIZE: u64 = 4096; impl Task { - async fn prepare(config: &BenchSuiteConfig, op: &Operator) -> Result { - let now = jiff::Timestamp::now(); - let path = format!( - "obench-test-{}-{}", - now.as_millisecond(), - uuid::Uuid::new_v4() - ); - println!("Prepare task with path: {path}"); + async fn prepare(config: &BenchSuiteConfig, path: &str, op: &Operator) -> Result { + let path = path.to_string(); let file_size = config.file_size; match config.workload { Workload::Upload => Ok(Task::Upload { path, file_size }), diff --git a/bin/oli/src/commands/mv.rs b/bin/oli/src/commands/mv.rs index 4a6942930c1e..1a72ecb90ddf 100644 --- a/bin/oli/src/commands/mv.rs +++ b/bin/oli/src/commands/mv.rs @@ -16,12 +16,12 @@ // under the License. use crate::config::Config; +use crate::make_tokio_runtime; use crate::params::config::ConfigParams; use anyhow::{Error, Result}; use futures::{AsyncWriteExt, TryStreamExt}; use opendal::Operator; use std::path::Path; -use crate::make_tokio_runtime; #[derive(Debug, clap::Parser)] #[command(name = "mv", about = "Move object", disable_version_flag = true)]