From 260fcd4c6049dd06e2c3c7e894aa4edd8b98517a Mon Sep 17 00:00:00 2001 From: tison Date: Tue, 24 Dec 2024 23:23:24 +0800 Subject: [PATCH] feat(bin/oli): implement oli bench (#5443) Co-authored-by: Yu Lei --- bin/oli/Cargo.lock | 43 +++++ bin/oli/Cargo.toml | 7 +- bin/oli/README.md | 22 +-- bin/oli/dev/config.toml | 26 +++ bin/oli/dev/suite.toml | 21 +++ bin/oli/src/bin/oli.rs | 28 ++-- bin/oli/src/commands/bench/mod.rs | 53 ++++++ bin/oli/src/commands/bench/report.rs | 232 +++++++++++++++++++++++++++ bin/oli/src/commands/bench/suite.rs | 200 +++++++++++++++++++++++ bin/oli/src/commands/cat.rs | 7 +- bin/oli/src/commands/cp.rs | 6 +- bin/oli/src/commands/ls.rs | 6 +- bin/oli/src/commands/mod.rs | 17 +- bin/oli/src/commands/mv.rs | 7 +- bin/oli/src/commands/rm.rs | 7 +- bin/oli/src/commands/stat.rs | 7 +- bin/oli/src/config/mod.rs | 10 +- bin/oli/src/lib.rs | 8 + bin/oli/src/params/config.rs | 2 +- 19 files changed, 667 insertions(+), 42 deletions(-) create mode 100644 bin/oli/dev/config.toml create mode 100644 bin/oli/dev/suite.toml create mode 100644 bin/oli/src/commands/bench/mod.rs create mode 100644 bin/oli/src/commands/bench/report.rs create mode 100644 bin/oli/src/commands/bench/suite.rs diff --git a/bin/oli/Cargo.lock b/bin/oli/Cargo.lock index 3e402ab24e35..91b109ea7548 100644 --- a/bin/oli/Cargo.lock +++ b/bin/oli/Cargo.lock @@ -747,6 +747,31 @@ version = "1.9.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7d71d3574edd2771538b901e6549113b4006ece66150fb69c0fb6d9a2adae946" +[[package]] +name = "humansize" +version = "2.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6cb51c9a029ddc91b07a787f1d86b53ccfa49b0e86688c946ebe8d3555685dd7" +dependencies = [ + "libm", +] + +[[package]] +name = "humantime" +version = "2.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9a3a5bfb195931eeb336b2a7b4d761daec841b97f947d34394601737a7bba5e4" + +[[package]] +name = "humantime-serde" +version = "1.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "57a3db5ea5923d99402c94e9feb261dc5ee9b4efa158b0315f788cf549cc200c" +dependencies = [ + "humantime", + "serde", +] + [[package]] name = "hyper" version = "1.5.2" @@ -1229,13 +1254,19 @@ dependencies = [ "clap", "dirs", "futures", + "humansize", + "humantime", + "humantime-serde", "indicatif", "opendal", + "parse-size", + "pollster", "serde", "tempfile", "tokio", "toml", "url", + "uuid", ] [[package]] @@ -1311,6 +1342,12 @@ dependencies = [ "windows-targets 0.52.6", ] +[[package]] +name = "parse-size" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "487f2ccd1e17ce8c1bfab3a65c89525af41cfad4c8659021a1e9a2aacd73b89b" + [[package]] name = "pbkdf2" version = "0.12.2" @@ -1396,6 +1433,12 @@ dependencies = [ "spki", ] +[[package]] +name = "pollster" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2f3a9f18d041e6d0e102a0a46750538147e5e8992d3b4873aaafee2520b00ce3" + [[package]] name = "portable-atomic" version = "1.10.0" diff --git a/bin/oli/Cargo.toml b/bin/oli/Cargo.toml index 7420f42fafc2..84b91d6247bc 100644 --- a/bin/oli/Cargo.toml +++ b/bin/oli/Cargo.toml @@ -34,6 +34,9 @@ anyhow = { version = "1.0" } clap = { version = "4.5", features = ["cargo", "string", "derive"] } dirs = { version = "5.0" } futures = { version = "0.3" } +humansize = { version = "2.1" } +humantime = { version = "2.1" } +humantime-serde = { version = "1.1" } indicatif = { version = "0.17" } opendal = { version = "0.51.0", path = "../../core", features = [ "services-azblob", @@ -44,7 +47,6 @@ opendal = { version = "0.51.0", path = "../../core", features = [ "services-ghac", "services-http", "services-ipmfs", - "services-memory", "services-obs", "services-oss", "services-s3", @@ -52,10 +54,13 @@ opendal = { version = "0.51.0", path = "../../core", features = [ "services-webhdfs", "services-azfile", ] } +pollster = { version = "0.4" } serde = { version = "1.0", features = ["derive"] } tokio = { version = "1.42", features = ["full"] } toml = { version = "0.8" } url = { version = "2.5" } +uuid = { version = "1.11" } +parse-size = { version = "1.1" } [dev-dependencies] assert_cmd = { version = "2.0" } diff --git a/bin/oli/README.md b/bin/oli/README.md index 0a44e5a0ab5c..ad0467f0ef8a 100644 --- a/bin/oli/README.md +++ b/bin/oli/README.md @@ -19,7 +19,7 @@ `oli` could be installed by `cargo`: ```bash -cargo install oli --all-features +cargo install oli ``` > `cargo` is the Rust package manager. `cargo` could be installed by following the [Installation](https://www.rust-lang.org/tools/install) from Rust official website. @@ -67,23 +67,23 @@ access_key_id = "" secret_access_key = "" ``` -For different services, you could find the configuration keys in the corresponding [service document](https://opendal.apache.org/docs/services/). +For different services, you could find the configuration keys in the corresponding [service document](https://docs.rs/opendal/0.50.2/opendal/services/index.html). ### Example: use `oli` to upload file to AWS S3 -```text -$ oli cp ./update-ecs-loadbalancer.json s3:/update-ecs-loadbalancer.json -$ oli ls s3:/ -fleet.png -update-ecs-loadbalancer.json +```shell +oli cp ./update-ecs-loadbalancer.json s3:/update-ecs-loadbalancer.json +oli ls s3:/ +# fleet.png +# update-ecs-loadbalancer.json ``` ### Example: use `oli` copy file from S3 to R2 -```text -$ oli cp s3:/fleet.png r2:/fleet.png -$ oli ls r2:/ -fleet.png +```shell +oli cp s3:/fleet.png r2:/fleet.png +oli ls r2:/ +# fleet.png ``` ## Contribute to `oli` diff --git a/bin/oli/dev/config.toml b/bin/oli/dev/config.toml new file mode 100644 index 000000000000..9a0af22fc536 --- /dev/null +++ b/bin/oli/dev/config.toml @@ -0,0 +1,26 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +[profiles.demo] +type = "s3" +root = "/benchmarks" +bucket = "test-bucket" +region = "us-east-1" +endpoint = "http://127.0.0.1:9000" +access_key_id = "minioadmin" +secret_access_key = "minioadmin" +virtual_host_style = "false" diff --git a/bin/oli/dev/suite.toml b/bin/oli/dev/suite.toml new file mode 100644 index 000000000000..7b0eec1c8a9e --- /dev/null +++ b/bin/oli/dev/suite.toml @@ -0,0 +1,21 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +workload = "upload" +parallelism = 4 +file_size = "64MiB" +timeout = "60s" diff --git a/bin/oli/src/bin/oli.rs b/bin/oli/src/bin/oli.rs index 8ae115602f4a..ab07e8b64c8e 100644 --- a/bin/oli/src/bin/oli.rs +++ b/bin/oli/src/bin/oli.rs @@ -26,7 +26,7 @@ use std::env; use std::ffi::OsStr; use std::path::PathBuf; -use anyhow::anyhow; +use anyhow::bail; use anyhow::Result; use oli::commands::OliSubcommand; @@ -37,8 +37,7 @@ pub struct Oli { subcommand: OliSubcommand, } -#[tokio::main] -async fn main() -> Result<()> { +fn main() -> Result<()> { // Guard against infinite proxy recursion. This mostly happens due to // bugs in oli. do_recursion_guard()?; @@ -52,36 +51,40 @@ async fn main() -> Result<()> { { Some("oli") => { let cmd: Oli = clap::Parser::parse(); - cmd.subcommand.run().await?; + cmd.subcommand.run()?; + } + Some("obench") => { + let cmd: oli::commands::bench::BenchCmd = clap::Parser::parse(); + cmd.run()?; } Some("ocat") => { let cmd: oli::commands::cat::CatCmd = clap::Parser::parse(); - cmd.run().await?; + cmd.run()?; } Some("ocp") => { let cmd: oli::commands::cp::CopyCmd = clap::Parser::parse(); - cmd.run().await?; + cmd.run()?; } Some("ols") => { let cmd: oli::commands::ls::LsCmd = clap::Parser::parse(); - cmd.run().await?; + cmd.run()?; } Some("orm") => { let cmd: oli::commands::rm::RmCmd = clap::Parser::parse(); - cmd.run().await?; + cmd.run()?; } Some("ostat") => { let cmd: oli::commands::stat::StatCmd = clap::Parser::parse(); - cmd.run().await?; + cmd.run()?; } Some("omv") => { let cmd: oli::commands::mv::MoveCmd = clap::Parser::parse(); - cmd.run().await?; + cmd.run()?; } Some(v) => { println!("{v} is not supported") } - None => return Err(anyhow!("couldn't determine self executable name")), + None => bail!("couldn't determine self executable name"), } Ok(()) @@ -94,8 +97,9 @@ fn do_recursion_guard() -> Result<()> { .ok() .and_then(|s| s.parse().ok()) .unwrap_or(0); + if recursion_count > OLI_RECURSION_COUNT_MAX { - return Err(anyhow!("infinite recursion detected")); + bail!("infinite recursion detected"); } Ok(()) diff --git a/bin/oli/src/commands/bench/mod.rs b/bin/oli/src/commands/bench/mod.rs new file mode 100644 index 000000000000..7d6906a98652 --- /dev/null +++ b/bin/oli/src/commands/bench/mod.rs @@ -0,0 +1,53 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::config::Config; +use crate::params::config::ConfigParams; +use anyhow::Result; +use std::path::PathBuf; + +mod report; +mod suite; + +#[derive(Debug, clap::Parser)] +#[command( + name = "bench", + about = "Run benchmark against the storage backend", + disable_version_flag = true +)] +pub struct BenchCmd { + #[command(flatten)] + pub config_params: ConfigParams, + /// Name of the profile to use. + #[arg()] + pub profile: String, + /// Path to the benchmark config. + #[arg( + value_parser = clap::value_parser!(PathBuf), + )] + pub bench: PathBuf, +} + +impl BenchCmd { + pub fn run(self) -> Result<()> { + let cfg = Config::load(&self.config_params.config)?; + let suite = suite::BenchSuite::load(&self.bench)?; + let op = cfg.operator(&self.profile)?; + suite.run(op)?; + Ok(()) + } +} diff --git a/bin/oli/src/commands/bench/report.rs b/bin/oli/src/commands/bench/report.rs new file mode 100644 index 000000000000..a6a731424bc7 --- /dev/null +++ b/bin/oli/src/commands/bench/report.rs @@ -0,0 +1,232 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::fmt::{Display, Formatter}; +use std::time::Duration; + +#[derive(Debug)] +pub(crate) struct Report { + // bench suite infos + parallelism: usize, + file_size: u64, + workload: String, + + // bench result metrics + /// Throughput (bytes per second). + bandwidth: Metric, + /// Latency (microseconds). + latency: Metric, + /// IOPS (operations per second). + iops: Metric, +} + +impl Report { + pub fn new( + parallelism: usize, + file_size: u64, + workload: String, + bandwidth: SampleSet, + latency: SampleSet, + iops: SampleSet, + ) -> Self { + Self { + parallelism, + file_size, + workload, + bandwidth: bandwidth.to_metric(), + latency: latency.to_metric(), + iops: iops.to_metric(), + } + } +} + +impl Display for Report { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + writeln!(f, "Parallel tasks: {}", self.parallelism)?; + writeln!(f, "Workload: {}", self.workload)?; + writeln!( + f, + "File size: {}", + humansize::format_size(self.file_size, humansize::BINARY) + )?; + + writeln!(f)?; + writeln!(f, "Bandwidth:")?; + writeln!( + f, + "{}", + self.bandwidth.format(2, |x| { + format!("{}/s", humansize::format_size_i(x, humansize::BINARY)) + }) + )?; + + writeln!(f)?; + writeln!(f, "Latency:")?; + writeln!( + f, + "{}", + self.latency.format(2, |x| { + let dur = Duration::from_micros(x as u64); + format!("{}", humantime::format_duration(dur)) + }) + )?; + + writeln!(f)?; + writeln!(f, "IOPS:")?; + writeln!(f, "{}", self.iops.format(2, |x| { format!("{x:.3}") }))?; + + Ok(()) + } +} + +#[derive(Debug)] +pub(crate) struct Metric { + num_samples: u32, + min: f64, + max: f64, + avg: f64, + stddev: f64, + p99: f64, + p95: f64, + p50: f64, +} + +impl Metric { + fn format(&self, indent: usize, formatter: fn(f64) -> String) -> String { + format!( + "{:indent$}num_samples: {}\n\ + {:indent$}min: {}\n\ + {:indent$}max: {}\n\ + {:indent$}avg: {}\n\ + {:indent$}stddev: {}\n\ + {:indent$}p99: {}\n\ + {:indent$}p95: {}\n\ + {:indent$}p50: {}", + "", + self.num_samples, + "", + formatter(self.min), + "", + formatter(self.max), + "", + formatter(self.avg), + "", + formatter(self.stddev), + "", + formatter(self.p99), + "", + formatter(self.p95), + "", + formatter(self.p50), + ) + } +} + +#[derive(Debug, Default)] +pub(crate) struct SampleSet { + values: Vec, +} + +impl SampleSet { + /// Add a new sample value. + pub fn add(&mut self, sample: f64) { + assert!(sample.is_finite(), "sample value must be finite"); + self.values.push(sample); + } + + /// Merge two sample sets. + pub fn merge(&mut self, other: SampleSet) { + self.values.extend(other.values); + } + + /// Get the minimum value. + fn min(&self) -> Option { + self.values.iter().copied().min_by(|a, b| a.total_cmp(b)) + } + + /// Get the maximum value. + fn max(&self) -> Option { + self.values.iter().copied().max_by(|a, b| a.total_cmp(b)) + } + + /// Get number of samples. + fn count(&self) -> usize { + self.values.len() + } + + /// Get the average of values. + fn avg(&self) -> Option { + let count = self.count(); + if count == 0 { + return None; + } + + let sum: f64 = self.values.iter().copied().sum(); + Some(sum / (count as f64)) + } + + /// Get the standard deviation of values. + fn stddev(&self) -> Option { + let count = self.count(); + if count == 0 { + return None; + } + + let avg = self.avg()?; + let sum = self + .values + .iter() + .copied() + .map(|x| (x - avg).powi(2)) + .sum::(); + Some((sum / count as f64).sqrt()) + } + + /// Get the percentile value. + /// + /// The percentile value must between 0.0 and 100.0 (both inclusive). + fn percentile(&self, percentile: f64) -> Option { + assert!( + (0.0..=100.0).contains(&percentile), + "percentile must be between 0.0 and 100.0" + ); + + let count = self.count(); + if count == 0 { + return None; + } + + let index = ((count - 1) as f64 * percentile / 100.0).trunc() as usize; + let mut sorted = self.values.clone(); + sorted.sort_by(|a, b| a.total_cmp(b)); + sorted.get(index).copied() + } + + /// Create a metric from the sample set. + fn to_metric(&self) -> Metric { + Metric { + num_samples: self.count() as u32, + min: self.min().unwrap_or(f64::NAN), + max: self.max().unwrap_or(f64::NAN), + avg: self.avg().unwrap_or(f64::NAN), + stddev: self.stddev().unwrap_or(f64::NAN), + p99: self.percentile(99.0).unwrap_or(f64::NAN), + p95: self.percentile(95.0).unwrap_or(f64::NAN), + p50: self.percentile(50.0).unwrap_or(f64::NAN), + } + } +} diff --git a/bin/oli/src/commands/bench/suite.rs b/bin/oli/src/commands/bench/suite.rs new file mode 100644 index 000000000000..1f43965d2800 --- /dev/null +++ b/bin/oli/src/commands/bench/suite.rs @@ -0,0 +1,200 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::commands::bench::report::{Report, SampleSet}; +use crate::make_tokio_runtime; +use anyhow::{ensure, Context, Result}; +use opendal::Operator; +use serde::{Deserialize, Deserializer}; +use std::path::Path; +use std::time::{Duration, Instant}; + +#[derive(Deserialize, Debug)] +struct BenchSuiteConfig { + /// Workload to run. + workload: Workload, + + /// Number of parallel tasks to run. + /// + /// Default to 1. + parallelism: Option, + + /// Size of file. + #[serde(deserialize_with = "deserialize_file_size")] + file_size: u64, + + /// Maximum time to run the bench suite. + #[serde(with = "humantime_serde")] + timeout: Duration, + + /// Whether retain the object on success. + /// + /// Default to false, which means the object used in the suite will be deleted + /// after the suite successfully returned. + #[serde(default)] + retain_on_success: bool, +} + +fn deserialize_file_size<'de, D>(deserializer: D) -> Result +where + D: Deserializer<'de>, +{ + let s = String::deserialize(deserializer)?; + parse_size::parse_size(&s).map_err(serde::de::Error::custom) +} + +#[derive(Deserialize, Debug)] +enum Workload { + #[serde(rename = "upload")] + Upload, + #[serde(rename = "download")] + Download, +} + +pub struct BenchSuite { + config: BenchSuiteConfig, +} + +impl BenchSuite { + pub fn load(path: &Path) -> Result { + let content = std::fs::read_to_string(path)?; + let config = toml::from_str::(&content)?; + ensure!( + config.file_size >= 4096, + "file_size must be greater or equal to 4096" + ); + println!("Create bench suite with config: {config:?}"); + Ok(BenchSuite { config }) + } + + pub fn run(self, op: Operator) -> Result<()> { + println!("Start running bench suite ..."); + let start = Instant::now(); + + let timeout = self.config.timeout; + let parallelism = self.config.parallelism.unwrap_or(1) as usize; + let file_size = self.config.file_size; + let workload = match self.config.workload { + Workload::Upload => "upload".to_string(), + Workload::Download => "download".to_string(), + }; + let retain_on_success = self.config.retain_on_success; + + let rt = make_tokio_runtime(parallelism); + + let path = format!("obench-test-{}", uuid::Uuid::new_v4()); + println!("Prepare task with path: {path}"); + let task = rt + .block_on(Task::prepare(&self.config, &path, &op)) + .context("failed to prepare task")?; + + let mut results = vec![]; + for _ in 0..parallelism { + let op = op.clone(); + let task = task.clone(); + results.push(rt.spawn(async move { + let mut bandwidth = SampleSet::default(); + let mut latency = SampleSet::default(); + let mut iops = SampleSet::default(); + let mut count = 0; + + loop { + if start.elapsed() > timeout { + return Ok::<_, anyhow::Error>((bandwidth, latency, iops)); + } + + let iter_start = Instant::now(); + let iter_bytes = task.run(&op).await.context("failed to execute task")?; + let iter_latency = iter_start.elapsed(); + count += 1; + latency.add(iter_latency.as_micros() as f64); + bandwidth.add(iter_bytes as f64 / iter_latency.as_secs_f64()); + iops.add(count as f64 / start.elapsed().as_secs_f64()); + } + })) + } + + let mut bandwidth = SampleSet::default(); + let mut latency = SampleSet::default(); + let mut iops = SampleSet::default(); + + for result in results { + let (iter_bandwidth, iter_latency, iter_iops) = pollster::block_on(result)??; + bandwidth.merge(iter_bandwidth); + latency.merge(iter_latency); + iops.merge(iter_iops); + } + + if !retain_on_success { + rt.block_on(async { + println!("Deleting object at path: {path}"); + if let Err(err) = op.delete(&path).await { + eprintln!("failed to delete object: {}", err); + } + }); + } + + let report = Report::new(parallelism, file_size, workload, bandwidth, latency, iops); + println!("Bench suite completed in {:?}; result:\n", start.elapsed()); + println!("{report}"); + Ok(()) + } +} + +#[derive(Clone, Debug)] +enum Task { + Upload { path: String, file_size: u64 }, + Download { path: String }, +} + +const BATCH_SIZE: u64 = 4096; + +impl Task { + async fn prepare(config: &BenchSuiteConfig, path: &str, op: &Operator) -> Result { + let path = path.to_string(); + let file_size = config.file_size; + match config.workload { + Workload::Upload => Ok(Task::Upload { path, file_size }), + Workload::Download => { + let mut writer = op.writer(&path).await?; + let batch_cnt = file_size / (BATCH_SIZE); + for _ in 0..batch_cnt { + writer.write(vec![139u8; BATCH_SIZE as usize]).await? + } + writer.close().await?; + Ok(Task::Download { path }) + } + } + } + + async fn run(&self, op: &Operator) -> Result { + match self { + Task::Upload { path, file_size } => { + let mut writer = op.writer(path).await?; + for _ in 0..(*file_size / BATCH_SIZE) { + writer.write(vec![254u8; BATCH_SIZE as usize]).await?; + } + writer.close().await?; + Ok((*file_size / BATCH_SIZE) * BATCH_SIZE) + } + Task::Download { path } => { + let bytes = op.read_with(path).await?; + Ok(bytes.len() as u64) + } + } + } +} diff --git a/bin/oli/src/commands/cat.rs b/bin/oli/src/commands/cat.rs index 567ebf9f1cba..5bce96e357e9 100644 --- a/bin/oli/src/commands/cat.rs +++ b/bin/oli/src/commands/cat.rs @@ -19,6 +19,7 @@ use anyhow::Result; use futures::io; use crate::config::Config; +use crate::make_tokio_runtime; use crate::params::config::ConfigParams; #[derive(Debug, clap::Parser)] @@ -36,7 +37,11 @@ pub struct CatCmd { } impl CatCmd { - pub async fn run(&self) -> Result<()> { + pub fn run(self) -> Result<()> { + make_tokio_runtime(1).block_on(self.do_run()) + } + + async fn do_run(self) -> Result<()> { let cfg = Config::load(&self.config_params.config)?; let (op, path) = cfg.parse_location(&self.target)?; diff --git a/bin/oli/src/commands/cp.rs b/bin/oli/src/commands/cp.rs index ed954a3df1ae..5f4cafb64bd4 100644 --- a/bin/oli/src/commands/cp.rs +++ b/bin/oli/src/commands/cp.rs @@ -26,6 +26,7 @@ use futures::AsyncWriteExt; use futures::TryStreamExt; use crate::config::Config; +use crate::make_tokio_runtime; use crate::params::config::ConfigParams; /// Template for the progress bar display. @@ -59,7 +60,10 @@ pub struct CopyCmd { } impl CopyCmd { - pub async fn run(&self) -> Result<()> { + pub fn run(self) -> Result<()> { + make_tokio_runtime(1).block_on(self.do_run()) + } + async fn do_run(self) -> Result<()> { let cfg = Config::load(&self.config_params.config)?; let (src_op, src_path) = cfg.parse_location(&self.source)?; diff --git a/bin/oli/src/commands/ls.rs b/bin/oli/src/commands/ls.rs index 10050982936d..f9805577a8bd 100644 --- a/bin/oli/src/commands/ls.rs +++ b/bin/oli/src/commands/ls.rs @@ -19,6 +19,7 @@ use anyhow::Result; use futures::TryStreamExt; use crate::config::Config; +use crate::make_tokio_runtime; use crate::params::config::ConfigParams; #[derive(Debug, clap::Parser)] @@ -35,7 +36,10 @@ pub struct LsCmd { } impl LsCmd { - pub async fn run(&self) -> Result<()> { + pub fn run(self) -> Result<()> { + make_tokio_runtime(1).block_on(self.do_run()) + } + async fn do_run(self) -> Result<()> { let cfg = Config::load(&self.config_params.config)?; let (op, path) = cfg.parse_location(&self.target)?; diff --git a/bin/oli/src/commands/mod.rs b/bin/oli/src/commands/mod.rs index 0ab829edb647..9ecba9731256 100644 --- a/bin/oli/src/commands/mod.rs +++ b/bin/oli/src/commands/mod.rs @@ -17,6 +17,7 @@ //! Provides the implementation of each command. +pub mod bench; pub mod cat; pub mod cp; pub mod ls; @@ -26,6 +27,7 @@ pub mod stat; #[derive(Debug, clap::Subcommand)] pub enum OliSubcommand { + Bench(bench::BenchCmd), Cat(cat::CatCmd), Cp(cp::CopyCmd), Ls(ls::LsCmd), @@ -35,14 +37,15 @@ pub enum OliSubcommand { } impl OliSubcommand { - pub async fn run(&self) -> anyhow::Result<()> { + pub fn run(self) -> anyhow::Result<()> { match self { - Self::Cat(cmd) => cmd.run().await, - Self::Cp(cmd) => cmd.run().await, - Self::Ls(cmd) => cmd.run().await, - Self::Rm(cmd) => cmd.run().await, - Self::Stat(cmd) => cmd.run().await, - Self::Mv(cmd) => cmd.run().await, + Self::Bench(cmd) => cmd.run(), + Self::Cat(cmd) => cmd.run(), + Self::Cp(cmd) => cmd.run(), + Self::Ls(cmd) => cmd.run(), + Self::Rm(cmd) => cmd.run(), + Self::Stat(cmd) => cmd.run(), + Self::Mv(cmd) => cmd.run(), } } } diff --git a/bin/oli/src/commands/mv.rs b/bin/oli/src/commands/mv.rs index 3f5fa093a010..1a72ecb90ddf 100644 --- a/bin/oli/src/commands/mv.rs +++ b/bin/oli/src/commands/mv.rs @@ -16,6 +16,7 @@ // under the License. use crate::config::Config; +use crate::make_tokio_runtime; use crate::params::config::ConfigParams; use anyhow::{Error, Result}; use futures::{AsyncWriteExt, TryStreamExt}; @@ -37,7 +38,11 @@ pub struct MoveCmd { } impl MoveCmd { - pub async fn run(&self) -> Result<()> { + pub fn run(self) -> Result<()> { + make_tokio_runtime(1).block_on(self.do_run()) + } + + async fn do_run(self) -> Result<()> { let cfg = Config::load(&self.config_params.config)?; let (src_op, src_path) = cfg.parse_location(&self.source)?; diff --git a/bin/oli/src/commands/rm.rs b/bin/oli/src/commands/rm.rs index 5c38bf1a001e..f3949a864a5b 100644 --- a/bin/oli/src/commands/rm.rs +++ b/bin/oli/src/commands/rm.rs @@ -18,6 +18,7 @@ use anyhow::Result; use crate::config::Config; +use crate::make_tokio_runtime; use crate::params::config::ConfigParams; #[derive(Debug, clap::Parser)] @@ -34,7 +35,11 @@ pub struct RmCmd { } impl RmCmd { - pub async fn run(&self) -> Result<()> { + pub fn run(self) -> Result<()> { + make_tokio_runtime(1).block_on(self.do_run()) + } + + async fn do_run(self) -> Result<()> { let cfg = Config::load(&self.config_params.config)?; let (op, path) = cfg.parse_location(&self.target)?; diff --git a/bin/oli/src/commands/stat.rs b/bin/oli/src/commands/stat.rs index 04cc2431bfca..7842545a90b7 100644 --- a/bin/oli/src/commands/stat.rs +++ b/bin/oli/src/commands/stat.rs @@ -18,6 +18,7 @@ use anyhow::Result; use crate::config::Config; +use crate::make_tokio_runtime; use crate::params::config::ConfigParams; #[derive(Debug, clap::Parser)] @@ -35,7 +36,11 @@ pub struct StatCmd { } impl StatCmd { - pub async fn run(&self) -> Result<()> { + pub fn run(self) -> Result<()> { + make_tokio_runtime(1).block_on(self.do_run()) + } + + async fn do_run(self) -> Result<()> { let cfg = Config::load(&self.config_params.config)?; let target = &self.target; diff --git a/bin/oli/src/config/mod.rs b/bin/oli/src/config/mod.rs index 3ed44e0246cc..23e1ecf3a142 100644 --- a/bin/oli/src/config/mod.rs +++ b/bin/oli/src/config/mod.rs @@ -150,19 +150,21 @@ impl Config { Err(anyhow!("Host part in a location is not supported. Hint: are you typing `://` instead of `:/`?"))?; } - let profile_name = location.scheme(); + let op = self.operator(location.scheme())?; let path = location.path().to_string(); + Ok((op, path)) + } + + pub fn operator(&self, profile_name: &str) -> Result { let profile = self .profiles .get(profile_name) .ok_or_else(|| anyhow!("unknown profile: {}", profile_name))?; - let svc = profile .get("type") .ok_or_else(|| anyhow!("missing 'type' in profile"))?; let scheme = Scheme::from_str(svc)?; - let op = Operator::via_iter(scheme, profile.clone())?; - Ok((op, path)) + Ok(Operator::via_iter(scheme, profile.clone())?) } } diff --git a/bin/oli/src/lib.rs b/bin/oli/src/lib.rs index e829973afc58..ecc00aaf4fa1 100644 --- a/bin/oli/src/lib.rs +++ b/bin/oli/src/lib.rs @@ -18,3 +18,11 @@ pub mod commands; pub mod config; pub mod params; + +fn make_tokio_runtime(n_threads: usize) -> tokio::runtime::Runtime { + tokio::runtime::Builder::new_multi_thread() + .worker_threads(n_threads) + .enable_all() + .build() + .expect("failed to create tokio runtime") +} diff --git a/bin/oli/src/params/config.rs b/bin/oli/src/params/config.rs index 3ceb8f951bfc..f3b9b9f72d32 100644 --- a/bin/oli/src/params/config.rs +++ b/bin/oli/src/params/config.rs @@ -24,7 +24,7 @@ pub struct ConfigParams { #[arg( long, default_value = default_config_path(), - value_parser = clap::value_parser!(PathBuf) + value_parser = clap::value_parser!(PathBuf), )] pub config: PathBuf, }