From ece83c94d6cf173c69e68a7ed81e69b1f174579c Mon Sep 17 00:00:00 2001 From: Chojan Shang Date: Sun, 23 Jun 2024 23:51:56 +0800 Subject: [PATCH] feat: make AsyncBacktraceLayer covers oio::Read and oio::Write (#4789) * feat: make AsyncBacktraceLayer covers oio::Read and oio::Write Signed-off-by: Chojan Shang * feat: make cargo fmt happy Signed-off-by: Chojan Shang * refactor: use async fn directly Signed-off-by: Chojan Shang --------- Signed-off-by: Chojan Shang --- core/src/layers/async_backtrace.rs | 104 +++++++++++++++++++++++++---- 1 file changed, 92 insertions(+), 12 deletions(-) diff --git a/core/src/layers/async_backtrace.rs b/core/src/layers/async_backtrace.rs index 20e83bf7cf02..6d77591ca003 100644 --- a/core/src/layers/async_backtrace.rs +++ b/core/src/layers/async_backtrace.rs @@ -15,6 +15,8 @@ // specific language governing permissions and limitations // under the License. +use futures::FutureExt; + use crate::raw::*; use crate::*; @@ -58,12 +60,12 @@ pub struct AsyncBacktraceAccessor { impl LayeredAccess for AsyncBacktraceAccessor { type Inner = A; - type Reader = A::Reader; - type BlockingReader = A::BlockingReader; - type Writer = A::Writer; - type BlockingWriter = A::BlockingWriter; - type Lister = A::Lister; - type BlockingLister = A::BlockingLister; + type Reader = AsyncBacktraceWrapper; + type BlockingReader = AsyncBacktraceWrapper; + type Writer = AsyncBacktraceWrapper; + type BlockingWriter = AsyncBacktraceWrapper; + type Lister = AsyncBacktraceWrapper; + type BlockingLister = AsyncBacktraceWrapper; fn inner(&self) -> &Self::Inner { &self.inner @@ -71,12 +73,18 @@ impl LayeredAccess for AsyncBacktraceAccessor { #[async_backtrace::framed] async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> { - self.inner.read(path, args).await + self.inner + .read(path, args) + .map(|v| v.map(|(rp, r)| (rp, AsyncBacktraceWrapper::new(r)))) + .await } #[async_backtrace::framed] async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> { - self.inner.write(path, args).await + self.inner + .write(path, args) + .map(|v| v.map(|(rp, r)| (rp, AsyncBacktraceWrapper::new(r)))) + .await } #[async_backtrace::framed] @@ -101,7 +109,10 @@ impl LayeredAccess for AsyncBacktraceAccessor { #[async_backtrace::framed] async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> { - self.inner.list(path, args).await + self.inner + .list(path, args) + .map(|v| v.map(|(rp, r)| (rp, AsyncBacktraceWrapper::new(r)))) + .await } #[async_backtrace::framed] @@ -115,14 +126,83 @@ impl LayeredAccess for AsyncBacktraceAccessor { } fn blocking_read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::BlockingReader)> { - self.inner.blocking_read(path, args) + self.inner + .blocking_read(path, args) + .map(|(rp, r)| (rp, AsyncBacktraceWrapper::new(r))) } fn blocking_write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::BlockingWriter)> { - self.inner.blocking_write(path, args) + self.inner + .blocking_write(path, args) + .map(|(rp, r)| (rp, AsyncBacktraceWrapper::new(r))) } fn blocking_list(&self, path: &str, args: OpList) -> Result<(RpList, Self::BlockingLister)> { - self.inner.blocking_list(path, args) + self.inner + .blocking_list(path, args) + .map(|(rp, r)| (rp, AsyncBacktraceWrapper::new(r))) + } +} + +pub struct AsyncBacktraceWrapper { + inner: R, +} + +impl AsyncBacktraceWrapper { + fn new(inner: R) -> Self { + Self { inner } + } +} + +impl oio::Read for AsyncBacktraceWrapper { + #[async_backtrace::framed] + async fn read(&mut self) -> Result { + self.inner.read().await + } +} + +impl oio::BlockingRead for AsyncBacktraceWrapper { + fn read(&mut self) -> Result { + self.inner.read() + } +} + +impl oio::Write for AsyncBacktraceWrapper { + #[async_backtrace::framed] + async fn write(&mut self, bs: Buffer) -> Result { + self.inner.write(bs).await + } + + #[async_backtrace::framed] + async fn abort(&mut self) -> Result<()> { + self.inner.abort().await + } + + #[async_backtrace::framed] + async fn close(&mut self) -> Result<()> { + self.inner.close().await + } +} + +impl oio::BlockingWrite for AsyncBacktraceWrapper { + fn write(&mut self, bs: Buffer) -> Result { + self.inner.write(bs) + } + + fn close(&mut self) -> Result<()> { + self.inner.close() + } +} + +impl oio::List for AsyncBacktraceWrapper { + #[async_backtrace::framed] + async fn next(&mut self) -> Result> { + self.inner.next().await + } +} + +impl oio::BlockingList for AsyncBacktraceWrapper { + fn next(&mut self) -> Result> { + self.inner.next() } }