From 627646d7170ddb974c331a9e3e534e4cbaa78cbb Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Mon, 25 Mar 2024 20:54:36 +0800 Subject: [PATCH] RFC-4382: Range Based Read (#4382) * RFC: Add Range based Read Signed-off-by: Xuanwo * Assign number Signed-off-by: Xuanwo * Add Completion-based IO Signed-off-by: Xuanwo * FIx Signed-off-by: Xuanwo * reorg Signed-off-by: Xuanwo * polish api design Signed-off-by: Xuanwo * Polish API Signed-off-by: Xuanwo --------- Signed-off-by: Xuanwo --- core/src/docs/rfcs/4382_range_based_read.md | 213 ++++++++++++++++++++ core/src/docs/rfcs/mod.rs | 4 + 2 files changed, 217 insertions(+) create mode 100644 core/src/docs/rfcs/4382_range_based_read.md diff --git a/core/src/docs/rfcs/4382_range_based_read.md b/core/src/docs/rfcs/4382_range_based_read.md new file mode 100644 index 000000000000..1342eb7896f0 --- /dev/null +++ b/core/src/docs/rfcs/4382_range_based_read.md @@ -0,0 +1,213 @@ +- Proposal Name: `range_based_read` +- Start Date: 2024-03-20 +- RFC PR: [apache/opendal#4382](https://github.com/apache/opendal/pull/4382) +- Tracking Issue: [apache/opendal#4383](https://github.com/apache/opendal/issues/4383) + +# Summary + +Convert `oio::Read` into a stateless, range-based reading pattern. + +# Motivation + +The current `oio::Read` API is stateful: + +```rust +pub trait Read: Unpin + Send + Sync { + fn read(&mut self, limit: usize) -> impl Future> + Send; + fn seek(&mut self, pos: io::SeekFrom) -> impl Future> + Send; +} +``` + +Users use `read` to retrieve data from storage and can use `seek` to navigate to specific positions. OpenDAL manages the underlying state. This design is good for users from `std::io::Read`, `futures::AsyncRead` and `tokio::io::AsyncRead`. + +OpenDAL also provides `range` option at the `Operator` level for users to read a specific range of data. The most common usage will be like: + +```rust +let r: Reader = op.reader_with(path).range(1024..2048).await?; +``` + +However, after observing our users, we found that: + +- `AsyncSeek` in `Reader` is prone to misuse. +- `Reader` does not support concurrent reading. +- `Reader` can't adopt Completion-based IO + +## Misuse of `AsyncSeek` + +When designing `Reader`, I expected users to check the `read_can_seek` capability to determine if the underlying storage services natively support `seek`. However, many users are unaware of this and directly use `seek`, leading to suboptimal performance. + +For example, `s3` storage does not support `seek` natively. When users call `seek`, opendal will drop current reader and sending a new request. This behavior is hidden from users and can lead to unexpected performance issues like [What's going on in my parquet stream](https://github.com/apache/opendal/issues/3725). + +## Lack of concurrent reading + +`oio::Read` complicates supporting concurrent reading. Users must implement a feature similar to merge IO, as discussed in [support merge io read api by settings](https://github.com/apache/opendal/issues/3675). + +There is no way for opendal to support this feature. + +## Can't adopt Completion-based IO + +Completion-based IO requires take the buffer's owner ship. But API that take `&mut [u8]` can't do that. + +# Guide-level explanation + +So I propose to convert `Reader` into a stateless, range-based reading pattern. + +We will remove the following `impl` from `Reader`: + +- `futures::AsyncRead` +- `futures::AsyncSeek` +- `futures::Stream` +- `tokio::AsyncRead` +- `tokio::AsyncSeek` + +We will add the following new APIs to `Reader`: + +```rust +impl Reader { + /// Read data from the storage at the specified offset. + pub async fn read(&self, buf: &mut impl BufMut, offset: u64, limit: usize) -> Result; + + /// Read data from the storage at the specified range. + pub async fn read_range( + &self, + buf: &mut impl BufMut, + range: impl RangeBounds, + ) -> Result; + + /// Read all data from the storage into given buf. + pub async fn read_to_end(&self, buf: &mut impl BufMut) -> Result; + + /// Copy data from the storage into given writer. + pub async fn copy(&mut self, write_into: &mut impl futures::AsyncWrite) -> Result; + + /// Sink date from the storage into given sink. + pub async fn sink(&mut self, sink_from: &mut S) -> Result + where + S: futures::Sink, + T: Into, +} +``` + +Apart from `Reader`'s own API, we will also provide convert to existing IO APIs like: + +```rust +impl Reader { + /// Convert Reader into `futures::AsyncRead` + pub fn into_futures_io_async_read(self, range: Range) -> FuturesIoAsyncReader; + + /// Convert Reader into `futures::Stream` + pub fn into_futures_bytes_stream(self, range: Range) -> FuturesBytesStream; +} +``` + +After this change, users will be able to use `Reader` to read data from storage in a stateless, range-based pattern. Users can also convert `Reader` into `futures::AsyncRead`, `futures::AsyncSeek` and `futures::Stream` as needed. + +# Reference-level explanation + +The new raw API will be: + +```rust +pub trait Read: Unpin + Send + Sync { + fn read_at( + &self, + offset: u64, + limit: usize, + ) -> impl Future> + Send; +} +``` + +The API is similar to [`ReadAt`](https://doc.rust-lang.org/std/fs/struct.File.html#method.read_at), but with following changes: + +```diff +- async fn read_at(&self, buf: &mut [u8], offset: u64) -> Result ++ async fn read_at(&self, offset: u64, limit: usize) -> Result +``` + +- opendal chooses to use `oio::Buffer` instead of `&mut [u8]` to avoid lifetime issues. +- opendal chooses to return `oio::Buffer` to let services itself manage the buffer. + +For example, http based storage services like `s3` is a stream that generating data on the fly. + +# Drawbacks + +## Breaking changes to `Reader` + +This change will break the existing `Reader` API. Users will need to update their code to use the new `Reader` API. + +Users wishing to migrate to the new range-based API will need to update their code. Those who simply want to use `futures::AsyncRead` can instead utilize `Reader::into_futures_read`. + +# Rationale and alternatives + +None. + +# Prior art + +## `object_store`'s API design + +Current API design inspired from `object_store`'s `ObjectStore` a lot: + +```rust +#[async_trait] +pub trait ObjectStore: std::fmt::Display + Send + Sync + Debug + 'static { + /// Return the bytes that are stored at the specified location. + async fn get(&self, location: &Path) -> Result { + self.get_opts(location, GetOptions::default()).await + } + + /// Perform a get request with options + async fn get_opts(&self, location: &Path, options: GetOptions) -> Result; + + /// Return the bytes that are stored at the specified location + /// in the given byte range. + /// + /// See [`GetRange::Bounded`] for more details on how `range` gets interpreted + async fn get_range(&self, location: &Path, range: Range) -> Result { + let options = GetOptions { + range: Some(range.into()), + ..Default::default() + }; + self.get_opts(location, options).await?.bytes().await + } + + /// Return the bytes that are stored at the specified location + /// in the given byte ranges + async fn get_ranges(&self, location: &Path, ranges: &[Range]) -> Result> { + coalesce_ranges( + ranges, + |range| self.get_range(location, range), + OBJECT_STORE_COALESCE_DEFAULT, + ) + .await + } +} +``` + +We can add support that similar to `get_ranges` in the future. + +OpenDAL opts to return a `Reader` rather than directly implementing `read` to allow for optimization with storage services like `fs` to reduce the extra `open` syscall. + +# Unresolved questions + +## Buffer + +After switching to range-based reading, we can no longer keep a buffer within the reader. As of writing this proposal, users should use `into_async_buf_read` instead. + +# Future possibilities + +## Read Ranges + +We can implement `read_ranges` support in the future. This will allow users to read multiple ranges of data in less requests. + +## Native `read_at` for fs and hdfs + +We can reduce unnecessary `open` and `seek` syscalls by using the `read_at` API across different platforms. + +## Auto Range Read + +We can implement [Auto ranged read support](https://github.com/apache/opendal/issues/1105) like AWS S3 Crt Client. For examples, split the range into multiple ranges and read them concurrently. + +Services can define the preferred io size as default, and users can override it. For example, s3 can use `8 MiB` as preferred io size, while fs can use `4 KiB` instead. + +## Completion-based IO + +`oio::Read` is designed with Completion-based IO in mind. We can add IOCP/io_uring support in the future. diff --git a/core/src/docs/rfcs/mod.rs b/core/src/docs/rfcs/mod.rs index 1d665ef27394..7383cae36314 100644 --- a/core/src/docs/rfcs/mod.rs +++ b/core/src/docs/rfcs/mod.rs @@ -228,3 +228,7 @@ pub mod rfc_3898_concurrent_writer {} /// Deleter API #[doc = include_str!("3911_deleter_api.md")] pub mod rfc_3911_deleter_api {} + +/// Range Based Read API +#[doc = include_str!("4382_range_based_read.md")] +pub mod rfc_4382_range_based_read {}