Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(core): Implement into_stream for Reader #4473

Merged
merged 3 commits into from
Apr 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion core/src/docs/internals/accessor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@
//! pub trait Accessor {
//! fn create_dir<'async>(
//! &'async self,
//! ) -> Pin<Box<dyn core::future::Future<Output = Result()> + Send + 'async>>
//! ) -> Pin<Box<dyn core::future::Future<Output = Result()> + MaybeSend + 'async>>
//! where Self: Sync + 'async;
//! }
//! ```
Expand Down
2 changes: 1 addition & 1 deletion core/src/layers/complete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -734,7 +734,7 @@ mod tests {
}

async fn read(&self, _: &str, _: OpRead) -> Result<(RpRead, Self::Reader)> {
Ok((RpRead::new(), Box::new(bytes::Bytes::new())))
Ok((RpRead::new(), Arc::new(bytes::Bytes::new())))
}

async fn write(&self, _: &str, _: OpWrite) -> Result<(RpWrite, Self::Writer)> {
Expand Down
8 changes: 4 additions & 4 deletions core/src/layers/minitrace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ use crate::*;
/// use opendal::services;
/// use opendal::Operator;
///
/// fn main() -> Result<(), Box<dyn Error + Send + Sync + 'static>> {
/// fn main() -> Result<(), Box<dyn Error + MaybeSend + Sync + 'static>> {
/// let reporter =
/// minitrace_jaeger::JaegerReporter::new("127.0.0.1:6831".parse().unwrap(), "opendal")
/// .unwrap();
Expand Down Expand Up @@ -309,19 +309,19 @@ impl<R: oio::BlockingRead> oio::BlockingRead for MinitraceWrapper<R> {
}

impl<R: oio::Write> oio::Write for MinitraceWrapper<R> {
fn write(&mut self, bs: Buffer) -> impl Future<Output = Result<usize>> + Send {
fn write(&mut self, bs: Buffer) -> impl Future<Output = Result<usize>> + MaybeSend {
let _g = self.span.set_local_parent();
let _span = LocalSpan::enter_with_local_parent(WriteOperation::Write.into_static());
self.inner.write(bs)
}

fn abort(&mut self) -> impl Future<Output = Result<()>> + Send {
fn abort(&mut self) -> impl Future<Output = Result<()>> + MaybeSend {
let _g = self.span.set_local_parent();
let _span = LocalSpan::enter_with_local_parent(WriteOperation::Abort.into_static());
self.inner.abort()
}

fn close(&mut self) -> impl Future<Output = Result<()>> + Send {
fn close(&mut self) -> impl Future<Output = Result<()>> + MaybeSend {
let _g = self.span.set_local_parent();
let _span = LocalSpan::enter_with_local_parent(WriteOperation::Close.into_static());
self.inner.close()
Expand Down
6 changes: 3 additions & 3 deletions core/src/layers/oteltrace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -287,15 +287,15 @@ impl<R: oio::BlockingRead> oio::BlockingRead for OtelTraceWrapper<R> {
}

impl<R: oio::Write> oio::Write for OtelTraceWrapper<R> {
fn write(&mut self, bs: Buffer) -> impl Future<Output = Result<usize>> + Send {
fn write(&mut self, bs: Buffer) -> impl Future<Output = Result<usize>> + MaybeSend {
self.inner.write(bs)
}

fn abort(&mut self) -> impl Future<Output = Result<()>> + Send {
fn abort(&mut self) -> impl Future<Output = Result<()>> + MaybeSend {
self.inner.abort()
}

fn close(&mut self) -> impl Future<Output = Result<()>> + Send {
fn close(&mut self) -> impl Future<Output = Result<()>> + MaybeSend {
self.inner.close()
}
}
Expand Down
8 changes: 4 additions & 4 deletions core/src/layers/tracing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ use crate::*;
/// use tracing_subscriber::prelude::*;
/// use tracing_subscriber::EnvFilter;
///
/// fn main() -> Result<(), Box<dyn Error + Send + Sync + 'static>> {
/// fn main() -> Result<(), Box<dyn Error + MaybeSend + Sync + 'static>> {
/// let tracer = opentelemetry_jaeger::new_pipeline()
/// .with_service_name("opendal_example")
/// .install_simple()?;
Expand Down Expand Up @@ -289,23 +289,23 @@ impl<R: oio::Write> oio::Write for TracingWrapper<R> {
parent = &self.span,
level = "trace",
skip_all)]
fn write(&mut self, bs: Buffer) -> impl Future<Output = Result<usize>> + Send {
fn write(&mut self, bs: Buffer) -> impl Future<Output = Result<usize>> + MaybeSend {
self.inner.write(bs)
}

#[tracing::instrument(
parent = &self.span,
level = "trace",
skip_all)]
fn abort(&mut self) -> impl Future<Output = Result<()>> + Send {
fn abort(&mut self) -> impl Future<Output = Result<()>> + MaybeSend {
self.inner.abort()
}

#[tracing::instrument(
parent = &self.span,
level = "trace",
skip_all)]
fn close(&mut self) -> impl Future<Output = Result<()>> + Send {
fn close(&mut self) -> impl Future<Output = Result<()>> + MaybeSend {
self.inner.close()
}
}
Expand Down
3 changes: 2 additions & 1 deletion core/src/layers/type_eraser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

use std::fmt::Debug;
use std::fmt::Formatter;
use std::sync::Arc;

use async_trait::async_trait;

Expand Down Expand Up @@ -71,7 +72,7 @@ impl<A: Accessor> LayeredAccessor for TypeEraseAccessor<A> {
self.inner
.read(path, args)
.await
.map(|(rp, r)| (rp, Box::new(r) as oio::Reader))
.map(|(rp, r)| (rp, Arc::new(r) as oio::Reader))
}

async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
Expand Down
17 changes: 17 additions & 0 deletions core/src/raw/futures_util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,23 @@ pub type BoxedStaticFuture<T> = futures::future::BoxFuture<'static, T>;
#[cfg(target_arch = "wasm32")]
pub type BoxedStaticFuture<T> = futures::future::LocalBoxFuture<'static, T>;

/// MaybeSend is a marker to determine whether a type is `Send` or not.
/// We use this trait to wrap the `Send` requirement for wasm32 target.
///
/// # Safety
///
/// MaybeSend equivalent to `Send` on non-wasm32 target. And it's empty
/// on wasm32 target.
#[cfg(not(target_arch = "wasm32"))]
pub unsafe trait MaybeSend: Send {}
#[cfg(target_arch = "wasm32")]
pub unsafe trait MaybeSend {}

#[cfg(not(target_arch = "wasm32"))]
unsafe impl<T: Send> MaybeSend for T {}
#[cfg(target_arch = "wasm32")]
unsafe impl<T> MaybeSend for T {}

/// CONCURRENT_LARGE_THRESHOLD is the threshold to determine whether to use
/// [`FuturesOrdered`] or not.
///
Expand Down
11 changes: 0 additions & 11 deletions core/src/raw/http_util/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@ impl HttpClient {
}

/// Build a new http client in async context.
#[cfg(not(target_arch = "wasm32"))]
pub fn build(builder: reqwest::ClientBuilder) -> Result<Self> {
Ok(Self {
client: builder.build().map_err(|err| {
Expand All @@ -66,16 +65,6 @@ impl HttpClient {
})
}

/// Build a new http client in async context.
#[cfg(target_arch = "wasm32")]
pub fn build(mut builder: reqwest::ClientBuilder) -> Result<Self> {
Ok(Self {
client: builder.build().map_err(|err| {
Error::new(ErrorKind::Unexpected, "http client build failed").set_source(err)
})?,
})
}

/// Get the async client from http client.
pub fn client(&self) -> reqwest::Client {
self.client.clone()
Expand Down
1 change: 1 addition & 0 deletions core/src/raw/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ mod futures_util;
pub use futures_util::BoxedFuture;
pub use futures_util::BoxedStaticFuture;
pub use futures_util::ConcurrentFutures;
pub use futures_util::MaybeSend;

mod enum_utils;
pub use enum_utils::*;
Expand Down
7 changes: 2 additions & 5 deletions core/src/raw/oio/list/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use std::future::Future;
use std::ops::DerefMut;

use crate::raw::oio::Entry;
use crate::raw::BoxedFuture;
use crate::raw::*;
use crate::*;

/// PageOperation is the name for APIs of lister.
Expand Down Expand Up @@ -67,10 +67,7 @@ pub trait List: Unpin + Send + Sync {
///
/// `Ok(None)` means all pages have been returned. Any following call
/// to `next` will always get the same result.
#[cfg(not(target_arch = "wasm32"))]
fn next(&mut self) -> impl Future<Output = Result<Option<Entry>>> + Send;
#[cfg(target_arch = "wasm32")]
fn next(&mut self) -> impl Future<Output = Result<Option<Entry>>>;
fn next(&mut self) -> impl Future<Output = Result<Option<Entry>>> + MaybeSend;
}

impl List for () {
Expand Down
2 changes: 1 addition & 1 deletion core/src/raw/oio/list/page_list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ use crate::*;
pub trait PageList: Send + Sync + Unpin + 'static {
/// next_page is used to fetch next page of entries from underlying storage.
#[cfg(not(target_arch = "wasm32"))]
fn next_page(&self, ctx: &mut PageContext) -> impl Future<Output = Result<()>> + Send;
fn next_page(&self, ctx: &mut PageContext) -> impl Future<Output = Result<()>> + MaybeSend;
#[cfg(target_arch = "wasm32")]
fn next_page(&self, ctx: &mut PageContext) -> impl Future<Output = Result<()>>;
}
Expand Down
39 changes: 37 additions & 2 deletions core/src/raw/oio/read/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
use std::fmt::Display;
use std::fmt::Formatter;
use std::ops::Deref;
use std::sync::Arc;

use bytes::Bytes;
use futures::Future;
Expand Down Expand Up @@ -60,7 +61,7 @@ impl From<ReadOperation> for &'static str {
}

/// Reader is a type erased [`Read`].
pub type Reader = Box<dyn ReadDyn>;
pub type Reader = Arc<dyn ReadDyn>;

/// Read is the internal trait used by OpenDAL to read data from storage.
///
Expand All @@ -84,7 +85,11 @@ pub trait Read: Unpin + Send + Sync {
/// Storage services should try to read as much as possible, only return bytes less than the
/// limit while reaching the end of the file.
#[cfg(not(target_arch = "wasm32"))]
fn read_at(&self, offset: u64, limit: usize) -> impl Future<Output = Result<Buffer>> + Send;
fn read_at(
&self,
offset: u64,
limit: usize,
) -> impl Future<Output = Result<Buffer>> + MaybeSend;
#[cfg(target_arch = "wasm32")]
fn read_at(&self, offset: u64, limit: usize) -> impl Future<Output = Result<Buffer>>;
}
Expand Down Expand Up @@ -112,14 +117,38 @@ impl Read for Bytes {
}
}

/// ReadDyn is the dyn version of [`Read`] make it possible to use as
/// `Box<dyn ReadDyn>`.
pub trait ReadDyn: Unpin + Send + Sync {
/// The dyn version of [`Read::read_at`].
///
/// This function returns a boxed future to make it object safe.
fn read_at_dyn(&self, offset: u64, limit: usize) -> BoxedFuture<Result<Buffer>>;

/// The static version of [`Read::read_at`].
///
/// This function returns a `'static` future by moving `self` into the
/// future. Caller can call `Box::pin` to build a static boxed future.
fn read_at_static(
self,
offset: u64,
limit: usize,
) -> impl Future<Output = Result<Buffer>> + MaybeSend + 'static
where
Self: Sized + 'static;
}

impl<T: Read + ?Sized> ReadDyn for T {
fn read_at_dyn(&self, offset: u64, limit: usize) -> BoxedFuture<Result<Buffer>> {
Box::pin(self.read_at(offset, limit))
}

async fn read_at_static(self, offset: u64, limit: usize) -> Result<Buffer>
where
Self: Sized + 'static,
{
self.read_at(offset, limit).await
}
}

/// # NOTE
Expand All @@ -132,6 +161,12 @@ impl<T: ReadDyn + ?Sized> Read for Box<T> {
}
}

impl<T: ReadDyn + ?Sized> Read for Arc<T> {
async fn read_at(&self, offset: u64, limit: usize) -> Result<Buffer> {
self.deref().read_at_dyn(offset, limit).await
}
}

/// BlockingReader is a boxed dyn `BlockingRead`.
pub type BlockingReader = Box<dyn BlockingRead>;

Expand Down
1 change: 1 addition & 0 deletions core/src/raw/oio/read/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,5 +19,6 @@ mod api;
pub use api::BlockingRead;
pub use api::BlockingReader;
pub use api::Read;
pub use api::ReadDyn;
pub use api::ReadOperation;
pub use api::Reader;
6 changes: 3 additions & 3 deletions core/src/raw/oio/write/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,19 +83,19 @@ pub trait Write: Unpin + Send + Sync {
/// It's possible that `n < bs.len()`, caller should pass the remaining bytes
/// repeatedly until all bytes has been written.
#[cfg(not(target_arch = "wasm32"))]
fn write(&mut self, bs: Buffer) -> impl Future<Output = Result<usize>> + Send;
fn write(&mut self, bs: Buffer) -> impl Future<Output = Result<usize>> + MaybeSend;
#[cfg(target_arch = "wasm32")]
fn write(&mut self, bs: Buffer) -> impl Future<Output = Result<usize>>;

/// Close the writer and make sure all data has been flushed.
#[cfg(not(target_arch = "wasm32"))]
fn close(&mut self) -> impl Future<Output = Result<()>> + Send;
fn close(&mut self) -> impl Future<Output = Result<()>> + MaybeSend;
#[cfg(target_arch = "wasm32")]
fn close(&mut self) -> impl Future<Output = Result<()>>;

/// Abort the pending writer.
#[cfg(not(target_arch = "wasm32"))]
fn abort(&mut self) -> impl Future<Output = Result<()>> + Send;
fn abort(&mut self) -> impl Future<Output = Result<()>> + MaybeSend;
#[cfg(target_arch = "wasm32")]
fn abort(&mut self) -> impl Future<Output = Result<()>>;
}
Expand Down
10 changes: 2 additions & 8 deletions core/src/raw/oio/write/append_write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,21 +40,15 @@ pub trait AppendWrite: Send + Sync + Unpin + 'static {
/// Get the current offset of the append object.
///
/// Returns `0` if the object is not exist.
#[cfg(not(target_arch = "wasm32"))]
fn offset(&self) -> impl Future<Output = Result<u64>> + Send;
#[cfg(target_arch = "wasm32")]
fn offset(&self) -> impl Future<Output = Result<u64>>;
fn offset(&self) -> impl Future<Output = Result<u64>> + MaybeSend;

/// Append the data to the end of this object.
#[cfg(not(target_arch = "wasm32"))]
fn append(
&self,
offset: u64,
size: u64,
body: Buffer,
) -> impl Future<Output = Result<()>> + Send;
#[cfg(target_arch = "wasm32")]
fn append(&self, offset: u64, size: u64, body: Buffer) -> impl Future<Output = Result<()>>;
) -> impl Future<Output = Result<()>> + MaybeSend;
}

/// AppendWriter will implements [`Write`] based on append object.
Expand Down
25 changes: 4 additions & 21 deletions core/src/raw/oio/write/block_write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,10 +65,7 @@ pub trait BlockWrite: Send + Sync + Unpin + 'static {
/// BlockWriter will call this API when:
///
/// - All the data has been written to the buffer and we can perform the upload at once.
#[cfg(not(target_arch = "wasm32"))]
fn write_once(&self, size: u64, body: Buffer) -> impl Future<Output = Result<()>> + Send;
#[cfg(target_arch = "wasm32")]
fn write_once(&self, size: u64, body: Buffer) -> impl Future<Output = Result<()>>;
fn write_once(&self, size: u64, body: Buffer) -> impl Future<Output = Result<()>> + MaybeSend;

/// write_block will write a block of the data and returns the result
/// [`Block`].
Expand All @@ -77,33 +74,19 @@ pub trait BlockWrite: Send + Sync + Unpin + 'static {
/// order.
///
/// - block_id is the id of the block.
#[cfg(not(target_arch = "wasm32"))]
fn write_block(
&self,
block_id: Uuid,
size: u64,
body: Buffer,
) -> impl Future<Output = Result<()>> + Send;
#[cfg(target_arch = "wasm32")]
fn write_block(
&self,
block_id: Uuid,
size: u64,
body: Buffer,
) -> impl Future<Output = Result<()>>;
) -> impl Future<Output = Result<()>> + MaybeSend;

/// complete_block will complete the block upload to build the final
/// file.
#[cfg(not(target_arch = "wasm32"))]
fn complete_block(&self, block_ids: Vec<Uuid>) -> impl Future<Output = Result<()>> + Send;
#[cfg(target_arch = "wasm32")]
fn complete_block(&self, block_ids: Vec<Uuid>) -> impl Future<Output = Result<()>>;
fn complete_block(&self, block_ids: Vec<Uuid>) -> impl Future<Output = Result<()>> + MaybeSend;

/// abort_block will cancel the block upload and purge all data.
#[cfg(not(target_arch = "wasm32"))]
fn abort_block(&self, block_ids: Vec<Uuid>) -> impl Future<Output = Result<()>> + Send;
#[cfg(target_arch = "wasm32")]
fn abort_block(&self, block_ids: Vec<Uuid>) -> impl Future<Output = Result<()>>;
fn abort_block(&self, block_ids: Vec<Uuid>) -> impl Future<Output = Result<()>> + MaybeSend;
}

/// WriteBlockResult is the result returned by [`WriteBlockFuture`].
Expand Down
Loading
Loading