diff --git a/core/src/layers/async_backtrace.rs b/core/src/layers/async_backtrace.rs index 20e83bf7cf02..f144453a148c 100644 --- a/core/src/layers/async_backtrace.rs +++ b/core/src/layers/async_backtrace.rs @@ -15,6 +15,8 @@ // specific language governing permissions and limitations // under the License. +use futures::{Future, FutureExt}; + use crate::raw::*; use crate::*; @@ -58,12 +60,12 @@ pub struct AsyncBacktraceAccessor { impl LayeredAccess for AsyncBacktraceAccessor { type Inner = A; - type Reader = A::Reader; - type BlockingReader = A::BlockingReader; - type Writer = A::Writer; - type BlockingWriter = A::BlockingWriter; - type Lister = A::Lister; - type BlockingLister = A::BlockingLister; + type Reader = AsyncBacktraceWrapper; + type BlockingReader = AsyncBacktraceWrapper; + type Writer = AsyncBacktraceWrapper; + type BlockingWriter = AsyncBacktraceWrapper; + type Lister = AsyncBacktraceWrapper; + type BlockingLister = AsyncBacktraceWrapper; fn inner(&self) -> &Self::Inner { &self.inner @@ -71,12 +73,17 @@ impl LayeredAccess for AsyncBacktraceAccessor { #[async_backtrace::framed] async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> { - self.inner.read(path, args).await + self.inner + .read(path, args) + .map(|v| v.map(|(rp, r)| (rp, AsyncBacktraceWrapper::new(r)))) + .await } #[async_backtrace::framed] async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> { - self.inner.write(path, args).await + self.inner.write(path, args) + .map(|v| v.map(|(rp, r)| (rp, AsyncBacktraceWrapper::new(r)))) + .await } #[async_backtrace::framed] @@ -101,7 +108,9 @@ impl LayeredAccess for AsyncBacktraceAccessor { #[async_backtrace::framed] async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> { - self.inner.list(path, args).await + self.inner.list(path, args) + .map(|v| v.map(|(rp, r)| (rp, AsyncBacktraceWrapper::new(r)))) + .await } #[async_backtrace::framed] @@ -115,14 +124,77 @@ impl LayeredAccess for AsyncBacktraceAccessor { } fn blocking_read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::BlockingReader)> { - self.inner.blocking_read(path, args) + self.inner.blocking_read(path, args).map(|(rp, r)| (rp, AsyncBacktraceWrapper::new(r))) } fn blocking_write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::BlockingWriter)> { - self.inner.blocking_write(path, args) + self.inner.blocking_write(path, args).map(|(rp, r)| (rp, AsyncBacktraceWrapper::new(r))) } fn blocking_list(&self, path: &str, args: OpList) -> Result<(RpList, Self::BlockingLister)> { - self.inner.blocking_list(path, args) + self.inner.blocking_list(path, args).map(|(rp, r)| (rp, AsyncBacktraceWrapper::new(r))) + } +} + +pub struct AsyncBacktraceWrapper { + inner: R, +} + +impl AsyncBacktraceWrapper { + fn new(inner: R) -> Self { + Self { inner } + } +} + +impl oio::Read for AsyncBacktraceWrapper { + #[async_backtrace::framed] + async fn read(&mut self) -> Result { + self.inner.read().await + } +} + +impl oio::BlockingRead for AsyncBacktraceWrapper { + fn read(&mut self) -> Result { + self.inner.read() + } +} + +impl oio::Write for AsyncBacktraceWrapper { + #[async_backtrace::framed] + fn write(&mut self, bs: Buffer) -> impl Future> + MaybeSend { + self.inner.write(bs) + } + + #[async_backtrace::framed] + fn abort(&mut self) -> impl Future> + MaybeSend { + self.inner.abort() + } + + #[async_backtrace::framed] + fn close(&mut self) -> impl Future> + MaybeSend { + self.inner.close() + } +} + +impl oio::BlockingWrite for AsyncBacktraceWrapper { + fn write(&mut self, bs: Buffer) -> Result { + self.inner.write(bs) + } + + fn close(&mut self) -> Result<()> { + self.inner.close() + } +} + +impl oio::List for AsyncBacktraceWrapper { + #[async_backtrace::framed] + async fn next(&mut self) -> Result> { + self.inner.next().await + } +} + +impl oio::BlockingList for AsyncBacktraceWrapper { + fn next(&mut self) -> Result> { + self.inner.next() } }