Skip to content

Commit

Permalink
feat: Add benchmark vs aws sdk s3 (#3620)
Browse files Browse the repository at this point in the history
* feat: Add benchmark vs aws sdk s3

Signed-off-by: Xuanwo <[email protected]>

* Fix test

Signed-off-by: Xuanwo <[email protected]>

* Fix unit test

Signed-off-by: Xuanwo <[email protected]>

---------

Signed-off-by: Xuanwo <[email protected]>
  • Loading branch information
Xuanwo authored Nov 18, 2023
1 parent 4a6d1f6 commit a53a7e5
Show file tree
Hide file tree
Showing 10 changed files with 934 additions and 76 deletions.
644 changes: 621 additions & 23 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ members = [
"core",
"core/fuzz",
"core/edge/*",
"core/benches/vs_fs",
"core/benches/vs_*",

"bindings/c",
"bindings/nodejs",
Expand Down
40 changes: 40 additions & 0 deletions core/benches/vs_s3/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

[package]
name = "opendal-benchmark-vs-s3"
description = "OpenDAL Benchmark vs s3"
version = "0.0.0"
publish = false

authors.workspace = true
edition.workspace = true
homepage.workspace = true
license.workspace = true
repository.workspace = true
rust-version.workspace = true

[dependencies]
opendal = { path = "../..", features = ["tests"] }
tokio = { version = "1", features = ["full"] }
uuid = { version = "1", features = ["v4"] }
criterion = { version = "0.4", features = ["async", "async_tokio"] }
rand = "0.8"
aws-sdk-s3 = "0.38"
aws-config = { version = "0.101.0", features = ["behavior-version-latest"] }
dotenvy = "0.15"
aws-credential-types = { version = "0.101.0", features = ["hardcoded-credentials"] }
46 changes: 46 additions & 0 deletions core/benches/vs_s3/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
# OpenDAL Benchmark VS AWS SDK S3

This benchmark compares the performance of OpenDAL with that of the `aws_sdk_s3` crate.

## Goal

We expect OpenDAL to match `aws_sdk_s3` in speed: the throughput of OpenDAL should be within a `10%` range of `aws_sdk_s3`.

## Usage

For a quick smoke test (runs each case once): `cargo run`

```shell
Testing read/aws_s3_sdk_collect
Success
Testing read/aws_s3_sdk_into_async_read
Success
Testing read/aws_s3_sdk_into_async_read_with_size_known
Success
Testing read/opendal_s3
Success
Testing read/opendal_s3_with_range
Success
```

For the full benchmark: `cargo run --release -- --bench`

```shell
read/aws_s3_sdk_collect time: [47.264 ms 47.378 ms 47.504 ms]
thrpt: [336.82 MiB/s 337.71 MiB/s 338.53 MiB/s]

read/aws_s3_sdk_into_async_read
time: [9.8422 ms 11.607 ms 13.703 ms]
thrpt: [1.1403 GiB/s 1.3462 GiB/s 1.5876 GiB/s]

read/aws_s3_sdk_into_async_read_with_size_known
time: [7.9572 ms 8.1055 ms 8.2552 ms]
thrpt: [1.8927 GiB/s 1.9277 GiB/s 1.9636 GiB/s]

read/opendal_s3 time: [8.9068 ms 9.2614 ms 9.6912 ms]
thrpt: [1.6123 GiB/s 1.6871 GiB/s 1.7543 GiB/s]

read/opendal_s3_with_range
time: [8.5459 ms 8.7592 ms 8.9739 ms]
thrpt: [1.7412 GiB/s 1.7838 GiB/s 1.8284 GiB/s]
```
140 changes: 140 additions & 0 deletions core/benches/vs_s3/src/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use aws_config::{BehaviorVersion, Region};
use aws_credential_types::Credentials;
use criterion::Criterion;
use opendal::raw::tests::TEST_RUNTIME;
use opendal::services;
use opendal::Operator;
use rand::prelude::*;
use std::env;
use tokio::io::AsyncReadExt;

/// Entry point of the OpenDAL-vs-aws-sdk-s3 read benchmark.
///
/// All connection settings are read from `OPENDAL_S3_*` environment
/// variables (optionally loaded from a `.env` file), so the benchmark can
/// target any S3-compatible service. It builds both an OpenDAL `Operator`
/// and a raw `aws_sdk_s3::Client` against the same endpoint/credentials,
/// then registers the read benchmarks with criterion.
fn main() {
    // Best-effort: load a `.env` file if present; absence is not an error.
    let _ = dotenvy::dotenv();

    // `expect` with the variable name so a missing setting produces an
    // actionable panic message instead of a bare `Option::unwrap` failure.
    let endpoint = env::var("OPENDAL_S3_ENDPOINT").expect("OPENDAL_S3_ENDPOINT must be set");
    let access_key =
        env::var("OPENDAL_S3_ACCESS_KEY_ID").expect("OPENDAL_S3_ACCESS_KEY_ID must be set");
    let secret_key =
        env::var("OPENDAL_S3_SECRET_ACCESS_KEY").expect("OPENDAL_S3_SECRET_ACCESS_KEY must be set");
    let bucket = env::var("OPENDAL_S3_BUCKET").expect("OPENDAL_S3_BUCKET must be set");
    let region = env::var("OPENDAL_S3_REGION").expect("OPENDAL_S3_REGION must be set");

    // Init OpenDAL Operator.
    let mut cfg = services::S3::default();
    cfg.endpoint(&endpoint);
    cfg.access_key_id(&access_key);
    cfg.secret_access_key(&secret_key);
    cfg.bucket(&bucket);
    cfg.region(&region);
    let op = Operator::new(cfg)
        .expect("operator must build from S3 config")
        .finish();

    // Init AWS S3 SDK client against the same endpoint and credentials.
    let mut config_loader = aws_config::defaults(BehaviorVersion::latest());
    config_loader = config_loader.endpoint_url(&endpoint);
    // `region` is not used after this point, so move it instead of cloning.
    config_loader = config_loader.region(Region::new(region));
    config_loader =
        config_loader.credentials_provider(Credentials::from_keys(&access_key, &secret_key, None));
    let config = TEST_RUNTIME.block_on(config_loader.load());
    let s3_client = aws_sdk_s3::Client::new(&config);

    let mut c = Criterion::default().configure_from_args();
    bench_read(&mut c, op, s3_client, &bucket);

    c.final_summary();
}

/// Register the `read` benchmark group: every case downloads the same 16 MiB
/// object, three times through the raw AWS SDK (collect, `into_async_read`,
/// `into_async_read` with a pre-sized buffer) and twice through OpenDAL
/// (plain read and read with an explicit range).
fn bench_read(c: &mut Criterion, op: Operator, s3_client: aws_sdk_s3::Client, bucket: &str) {
    // Size of the object uploaded by `prepare`; also drives throughput math.
    const OBJECT_SIZE: usize = 16 * 1024 * 1024;

    let mut group = c.benchmark_group("read");
    group.throughput(criterion::Throughput::Bytes(OBJECT_SIZE as u64));

    // Upload one shared fixture object and reuse its key across all cases.
    let object_key = TEST_RUNTIME.block_on(prepare(op.clone()));

    group.bench_function("aws_s3_sdk_collect", |b| {
        b.to_async(&*TEST_RUNTIME).iter(|| async {
            let resp = s3_client
                .get_object()
                .bucket(bucket)
                .key(&object_key)
                .send()
                .await
                .unwrap();
            // Buffer the whole body in memory, then copy it out.
            let _: Vec<u8> = resp.body.collect().await.unwrap().to_vec();
        });
    });
    group.bench_function("aws_s3_sdk_into_async_read", |b| {
        b.to_async(&*TEST_RUNTIME).iter(|| async {
            let resp = s3_client
                .get_object()
                .bucket(bucket)
                .key(&object_key)
                .send()
                .await
                .unwrap();
            // Stream the body into a growing Vec with no size hint.
            let mut reader = resp.body.into_async_read();
            let mut buf = Vec::new();
            let _ = reader.read_to_end(&mut buf).await.unwrap();
        });
    });
    group.bench_function("aws_s3_sdk_into_async_read_with_size_known", |b| {
        b.to_async(&*TEST_RUNTIME).iter(|| async {
            let resp = s3_client
                .get_object()
                .bucket(bucket)
                .key(&object_key)
                .send()
                .await
                .unwrap();
            // Same as above, but pre-size the buffer to avoid regrowth.
            let mut reader = resp.body.into_async_read();
            let mut buf = Vec::with_capacity(OBJECT_SIZE);
            let _ = reader.read_to_end(&mut buf).await.unwrap();
        });
    });
    group.bench_function("opendal_s3", |b| {
        b.to_async(&*TEST_RUNTIME).iter(|| async {
            let _: Vec<u8> = op.read(&object_key).await.unwrap();
        });
    });
    group.bench_function("opendal_s3_with_range", |b| {
        b.to_async(&*TEST_RUNTIME).iter(|| async {
            // An explicit range tells OpenDAL the size up front.
            let _: Vec<u8> = op
                .read_with(&object_key)
                .range(0..OBJECT_SIZE as u64)
                .await
                .unwrap();
        });
    });

    group.finish()
}

/// Upload a fresh 16 MiB object filled with random bytes and return its key.
///
/// The key is a random UUID so concurrent benchmark runs against the same
/// bucket never collide.
async fn prepare(op: Operator) -> String {
    const CONTENT_SIZE: usize = 16 * 1024 * 1024;

    let mut buf = vec![0u8; CONTENT_SIZE];
    thread_rng().fill_bytes(&mut buf);

    let key = uuid::Uuid::new_v4().to_string();
    op.write(&key, buf).await.unwrap();

    key
}
1 change: 1 addition & 0 deletions core/src/layers/complete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -835,6 +835,7 @@ mod tests {

let op = new_test_operator(Capability {
read: true,
stat: true,
..Default::default()
});
let res = op.read("path").await;
Expand Down
20 changes: 13 additions & 7 deletions core/src/raw/http_util/body.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,13 +159,19 @@ impl oio::Read for IncomingAsyncBody {
return Poll::Ready(Ok(0));
}

// We must get a valid bytes from underlying stream
let mut bs = loop {
match ready!(self.poll_next(cx)) {
Some(Ok(bs)) if bs.is_empty() => continue,
Some(Ok(bs)) => break bs,
Some(Err(err)) => return Poll::Ready(Err(err)),
None => return Poll::Ready(Ok(0)),
// Avoid extra poll of next if we already have chunks.
let mut bs = if let Some(chunk) = self.chunk.take() {
chunk
} else {
loop {
match ready!(self.poll_next(cx)) {
// It's possible for underlying stream to return empty bytes, we should continue
// to fetch next one.
Some(Ok(bs)) if bs.is_empty() => continue,
Some(Ok(bs)) => break bs,
Some(Err(err)) => return Poll::Ready(Err(err)),
None => return Poll::Ready(Ok(0)),
}
}
};

Expand Down
79 changes: 35 additions & 44 deletions core/src/raw/oio/read/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
// specific language governing permissions and limitations
// under the License.

use std::cmp;
use std::fmt::Display;
use std::fmt::Formatter;
use std::io;
Expand Down Expand Up @@ -204,14 +203,7 @@ pub trait ReadExt: Read {

/// Build a future for `read_to_end`.
fn read_to_end<'a>(&'a mut self, buf: &'a mut Vec<u8>) -> ReadToEndFuture<'a, Self> {
let start = buf.len();
ReadToEndFuture {
reader: self,
buf,
start,
length: start,
next: MIN_READ_TO_END_GROW_SIZE,
}
ReadToEndFuture { reader: self, buf }
}
}

Expand Down Expand Up @@ -271,19 +263,11 @@ where
}
}

/// The MIN read to end grow size.
const MIN_READ_TO_END_GROW_SIZE: usize = 8 * 1024;
/// The MAX read to end grow size.
const MAX_READ_TO_END_GROW_SIZE: usize = 4 * 1024 * 1024;

/// Make this future `!Unpin` for compatibility with async trait methods.
#[pin_project(!Unpin)]
pub struct ReadToEndFuture<'a, R: Read + Unpin + ?Sized> {
reader: &'a mut R,
buf: &'a mut Vec<u8>,
start: usize,
length: usize,
next: usize,
}

impl<R> Future for ReadToEndFuture<'_, R>
Expand All @@ -294,43 +278,50 @@ where

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<usize>> {
let this = self.project();
let start_len = this.buf.len();
let start_cap = this.buf.capacity();

loop {
if this.buf.capacity() == *this.length {
this.buf.reserve(*this.next);
// # Safety
//
// We make sure that the length of buf is maintained correctly.
#[allow(clippy::uninit_vec)]
unsafe {
this.buf.set_len(this.buf.capacity());
}
if this.buf.len() == this.buf.capacity() {
this.buf.reserve(32); // buf is full, need more space
}

let buf = &mut this.buf[*this.length..];
match ready!(this.reader.poll_read(cx, buf)) {
let spare = this.buf.spare_capacity_mut();
let mut read_buf: ReadBuf = ReadBuf::uninit(spare);

// SAFETY: These bytes were initialized but not filled in the previous loop
unsafe {
read_buf.assume_init(read_buf.capacity());
}

match ready!(this.reader.poll_read(cx, read_buf.initialize_unfilled())) {
Ok(0) => {
unsafe {
this.buf.set_len(*this.length);
}
return Poll::Ready(Ok(*this.length - *this.start));
return Poll::Ready(Ok(this.buf.len() - start_len));
}
Ok(n) => {
*this.next = if n >= *this.next {
cmp::min((*this.next).saturating_mul(2), MAX_READ_TO_END_GROW_SIZE)
} else if n >= *this.next / 2 {
*this.next
} else {
cmp::max((*this.next).saturating_div(2), MIN_READ_TO_END_GROW_SIZE)
};
// We can't allow bogus values from read. If it is too large, the returned vec could have its length
// set past its capacity, or if it overflows the vec could be shortened which could create an invalid
// string if this is called via read_to_string.
assert!(n <= buf.len());
*this.length += n;
// SAFETY: Read API makes sure that returning `n` is correct.
unsafe {
this.buf.set_len(this.buf.len() + n);
}
}
Err(e) => return Poll::Ready(Err(e)),
}

// The buffer might be an exact fit. Let's read into a probe buffer
// and see if it returns `Ok(0)`. If so, we've avoided an
// unnecessary doubling of the capacity. But if not, append the
// probe buffer to the primary buffer and let its capacity grow.
if this.buf.len() == this.buf.capacity() && this.buf.capacity() == start_cap {
let mut probe = [0u8; 32];

match ready!(this.reader.poll_read(cx, &mut probe)) {
Ok(0) => return Poll::Ready(Ok(this.buf.len() - start_len)),
Ok(n) => {
this.buf.extend_from_slice(&probe[..n]);
}
Err(e) => return Poll::Ready(Err(e)),
}
}
}
}
}
Expand Down
Loading

0 comments on commit a53a7e5

Please sign in to comment.