Skip to content

Commit

Permalink
feat: make AwaitTreeLayer covers oio::Read and oio::Write (#4787)
Browse files Browse the repository at this point in the history
Signed-off-by: Chojan Shang <[email protected]>
  • Loading branch information
PsiACE authored Jun 23, 2024
1 parent cf7580e commit e6c2e11
Showing 1 changed file with 90 additions and 9 deletions.
99 changes: 90 additions & 9 deletions core/src/layers/await_tree.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
// under the License.

use await_tree::InstrumentAwait;
use futures::{Future, FutureExt};
use oio::{ListOperation, ReadOperation, WriteOperation};

use crate::raw::*;
use crate::*;
Expand Down Expand Up @@ -67,12 +69,12 @@ pub struct AwaitTreeAccessor<A: Access> {

impl<A: Access> LayeredAccess for AwaitTreeAccessor<A> {
type Inner = A;
type Reader = A::Reader;
type BlockingReader = A::BlockingReader;
type Writer = A::Writer;
type BlockingWriter = A::BlockingWriter;
type Lister = A::Lister;
type BlockingLister = A::BlockingLister;
type Reader = AwaitTreeWrapper<A::Reader>;
type BlockingReader = AwaitTreeWrapper<A::BlockingReader>;
type Writer = AwaitTreeWrapper<A::Writer>;
type BlockingWriter = AwaitTreeWrapper<A::BlockingWriter>;
type Lister = AwaitTreeWrapper<A::Lister>;
type BlockingLister = AwaitTreeWrapper<A::BlockingLister>;

fn inner(&self) -> &Self::Inner {
&self.inner
Expand All @@ -82,13 +84,15 @@ impl<A: Access> LayeredAccess for AwaitTreeAccessor<A> {
self.inner
.read(path, args)
.instrument_await(format!("opendal::{}", Operation::Read))
.map(|v| v.map(|(rp, r)| (rp, AwaitTreeWrapper::new(r))))
.await
}

async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
self.inner
.write(path, args)
.instrument_await(format!("opendal::{}", Operation::Write))
.map(|v| v.map(|(rp, r)| (rp, AwaitTreeWrapper::new(r))))
.await
}

Expand Down Expand Up @@ -124,6 +128,7 @@ impl<A: Access> LayeredAccess for AwaitTreeAccessor<A> {
self.inner
.list(path, args)
.instrument_await(format!("opendal::{}", Operation::List))
.map(|v| v.map(|(rp, r)| (rp, AwaitTreeWrapper::new(r))))
.await
}

Expand All @@ -142,14 +147,90 @@ impl<A: Access> LayeredAccess for AwaitTreeAccessor<A> {
}

fn blocking_read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::BlockingReader)> {
self.inner.blocking_read(path, args)
self.inner
.blocking_read(path, args)
.map(|(rp, r)| (rp, AwaitTreeWrapper::new(r)))
}

fn blocking_write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::BlockingWriter)> {
self.inner.blocking_write(path, args)
self.inner
.blocking_write(path, args)
.map(|(rp, r)| (rp, AwaitTreeWrapper::new(r)))
}

fn blocking_list(&self, path: &str, args: OpList) -> Result<(RpList, Self::BlockingLister)> {
self.inner.blocking_list(path, args)
self.inner
.blocking_list(path, args)
.map(|(rp, r)| (rp, AwaitTreeWrapper::new(r)))
}
}

pub struct AwaitTreeWrapper<R> {
inner: R,
}

impl<R> AwaitTreeWrapper<R> {
fn new(inner: R) -> Self {
Self { inner }
}
}

impl<R: oio::Read> oio::Read for AwaitTreeWrapper<R> {
async fn read(&mut self) -> Result<Buffer> {
self.inner
.read()
.instrument_await(format!("opendal::{}", ReadOperation::Read))
.await
}
}

impl<R: oio::BlockingRead> oio::BlockingRead for AwaitTreeWrapper<R> {
fn read(&mut self) -> Result<Buffer> {
self.inner.read()
}
}

impl<R: oio::Write> oio::Write for AwaitTreeWrapper<R> {
fn write(&mut self, bs: Buffer) -> impl Future<Output = Result<usize>> + MaybeSend {
self.inner
.write(bs)
.instrument_await(format!("opendal::{}", WriteOperation::Write.into_static()))
}

fn abort(&mut self) -> impl Future<Output = Result<()>> + MaybeSend {
self.inner
.abort()
.instrument_await(format!("opendal::{}", WriteOperation::Abort.into_static()))
}

fn close(&mut self) -> impl Future<Output = Result<()>> + MaybeSend {
self.inner
.close()
.instrument_await(format!("opendal::{}", WriteOperation::Close.into_static()))
}
}

impl<R: oio::BlockingWrite> oio::BlockingWrite for AwaitTreeWrapper<R> {
fn write(&mut self, bs: Buffer) -> Result<usize> {
self.inner.write(bs)
}

fn close(&mut self) -> Result<()> {
self.inner.close()
}
}

impl<R: oio::List> oio::List for AwaitTreeWrapper<R> {
async fn next(&mut self) -> Result<Option<oio::Entry>> {
self.inner
.next()
.instrument_await(format!("opendal::{}", ListOperation::Next))
.await
}
}

impl<R: oio::BlockingList> oio::BlockingList for AwaitTreeWrapper<R> {
fn next(&mut self) -> Result<Option<oio::Entry>> {
self.inner.next()
}
}

0 comments on commit e6c2e11

Please sign in to comment.