From a663eafe9c1c1ffeef4077f6f7d1002654e7666e Mon Sep 17 00:00:00 2001 From: Chojan Shang Date: Sat, 22 Jun 2024 14:11:34 +0800 Subject: [PATCH] feat: make AwaitTreeLayer covers oio::Read and oio::Write Signed-off-by: Chojan Shang --- core/src/layers/await_tree.rs | 99 +++++++++++++++++++++++++++++++---- 1 file changed, 90 insertions(+), 9 deletions(-) diff --git a/core/src/layers/await_tree.rs b/core/src/layers/await_tree.rs index 49834d6dc30b..7bb20d42fb49 100644 --- a/core/src/layers/await_tree.rs +++ b/core/src/layers/await_tree.rs @@ -16,6 +16,8 @@ // under the License. use await_tree::InstrumentAwait; +use futures::{Future, FutureExt}; +use oio::{ListOperation, ReadOperation, WriteOperation}; use crate::raw::*; use crate::*; @@ -67,12 +69,12 @@ pub struct AwaitTreeAccessor { impl LayeredAccess for AwaitTreeAccessor { type Inner = A; - type Reader = A::Reader; - type BlockingReader = A::BlockingReader; - type Writer = A::Writer; - type BlockingWriter = A::BlockingWriter; - type Lister = A::Lister; - type BlockingLister = A::BlockingLister; + type Reader = AwaitTreeWrapper; + type BlockingReader = AwaitTreeWrapper; + type Writer = AwaitTreeWrapper; + type BlockingWriter = AwaitTreeWrapper; + type Lister = AwaitTreeWrapper; + type BlockingLister = AwaitTreeWrapper; fn inner(&self) -> &Self::Inner { &self.inner @@ -82,6 +84,7 @@ impl LayeredAccess for AwaitTreeAccessor { self.inner .read(path, args) .instrument_await(format!("opendal::{}", Operation::Read)) + .map(|v| v.map(|(rp, r)| (rp, AwaitTreeWrapper::new(r)))) .await } @@ -89,6 +92,7 @@ impl LayeredAccess for AwaitTreeAccessor { self.inner .write(path, args) .instrument_await(format!("opendal::{}", Operation::Write)) + .map(|v| v.map(|(rp, r)| (rp, AwaitTreeWrapper::new(r)))) .await } @@ -124,6 +128,7 @@ impl LayeredAccess for AwaitTreeAccessor { self.inner .list(path, args) .instrument_await(format!("opendal::{}", Operation::List)) + .map(|v| v.map(|(rp, r)| (rp, AwaitTreeWrapper::new(r)))) .await } @@ -142,14 +147,90 @@ impl LayeredAccess for AwaitTreeAccessor { } fn blocking_read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::BlockingReader)> { - self.inner.blocking_read(path, args) + self.inner + .blocking_read(path, args) + .map(|(rp, r)| (rp, AwaitTreeWrapper::new(r))) } fn blocking_write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::BlockingWriter)> { - self.inner.blocking_write(path, args) + self.inner + .blocking_write(path, args) + .map(|(rp, r)| (rp, AwaitTreeWrapper::new(r))) } fn blocking_list(&self, path: &str, args: OpList) -> Result<(RpList, Self::BlockingLister)> { - self.inner.blocking_list(path, args) + self.inner + .blocking_list(path, args) + .map(|(rp, r)| (rp, AwaitTreeWrapper::new(r))) + } +} + +pub struct AwaitTreeWrapper { + inner: R, +} + +impl AwaitTreeWrapper { + fn new(inner: R) -> Self { + Self { inner } + } +} + +impl oio::Read for AwaitTreeWrapper { + async fn read(&mut self) -> Result { + self.inner + .read() + .instrument_await(format!("opendal::{}", ReadOperation::Read)) + .await + } +} + +impl oio::BlockingRead for AwaitTreeWrapper { + fn read(&mut self) -> Result { + self.inner.read() + } +} + +impl oio::Write for AwaitTreeWrapper { + fn write(&mut self, bs: Buffer) -> impl Future> + MaybeSend { + self.inner + .write(bs) + .instrument_await(format!("opendal::{}", WriteOperation::Write.into_static())) + } + + fn abort(&mut self) -> impl Future> + MaybeSend { + self.inner + .abort() + .instrument_await(format!("opendal::{}", WriteOperation::Abort.into_static())) + } + + fn close(&mut self) -> impl Future> + MaybeSend { + self.inner + .close() + .instrument_await(format!("opendal::{}", WriteOperation::Close.into_static())) + } +} + +impl oio::BlockingWrite for AwaitTreeWrapper { + fn write(&mut self, bs: Buffer) -> Result { + self.inner.write(bs) + } + + fn close(&mut self) -> Result<()> { + self.inner.close() + } +} + +impl oio::List for AwaitTreeWrapper { + async fn next(&mut self) -> Result> { + self.inner + .next() + .instrument_await(format!("opendal::{}", ListOperation::Next)) + .await + } +} + +impl oio::BlockingList for AwaitTreeWrapper { + fn next(&mut self) -> Result> { + self.inner.next() } }