diff --git a/bindings/haskell/test/BasicTest.hs b/bindings/haskell/test/BasicTest.hs index 1311f9a05f2f..6c817d046e53 100644 --- a/bindings/haskell/test/BasicTest.hs +++ b/bindings/haskell/test/BasicTest.hs @@ -109,7 +109,7 @@ testLogger = do let logFn = LogAction $ logger state Right _ <- newOperator "memory" {ocLogAction = Just logFn} logStr <- readIORef state - T.take 77 logStr @?= "service=memory operation=metadata -> startedservice=memory operation=metadata" + T.take 78 logStr @?= "service=memory operation=metadata -> startedservice=memory operation=metadata" -- helper function diff --git a/core/src/layers/logging.rs b/core/src/layers/logging.rs index 40294204f6bc..1dfdba85ef0d 100644 --- a/core/src/layers/logging.rs +++ b/core/src/layers/logging.rs @@ -23,9 +23,7 @@ use std::sync::Arc; use bytes::Buf; use futures::FutureExt; use futures::TryFutureExt; -use log::debug; use log::log; -use log::trace; use log::Level; use crate::raw::*; @@ -80,74 +78,73 @@ use crate::*; /// ```shell /// RUST_LOG="info,opendal::services=debug" ./app /// ``` -#[derive(Debug, Copy, Clone)] -pub struct LoggingLayer { - error_level: Option, - failure_level: Option, - backtrace_output: bool, +/// +/// # Logging Interceptor +/// +/// You can implement your own logging interceptor to customize the logging behavior. +/// +/// ```no_run +/// use opendal::layers::LoggingInterceptor; +/// use opendal::layers::LoggingLayer; +/// use opendal::raw::Operation; +/// use opendal::services; +/// use opendal::Error; +/// use opendal::Operator; +/// use opendal::Scheme; +/// +/// #[derive(Debug, Clone)] +/// struct MyLoggingInterceptor; +/// +/// impl LoggingInterceptor for MyLoggingInterceptor { +/// fn log( +/// &self, +/// scheme: Scheme, +/// operation: Operation, +/// context: &str, +/// message: &str, +/// err: Option<&Error>, +/// ) { +/// // log something +/// } +/// } +/// +/// let _ = Operator::new(services::Memory::default()) +/// .expect("must init") +/// .layer(LoggingLayer::new(MyLoggingInterceptor)) +/// .finish(); +/// ``` +#[derive(Debug)] +pub struct LoggingLayer { + notify: Arc, } -impl Default for LoggingLayer { - fn default() -> Self { +impl Clone for LoggingLayer { + fn clone(&self) -> Self { Self { - error_level: Some(Level::Warn), - failure_level: Some(Level::Error), - backtrace_output: false, + notify: self.notify.clone(), } } } -impl LoggingLayer { - /// Setting the log level while expected error happened. - /// - /// For example: accessor returns NotFound. - /// - /// `None` means disable the log for error. - pub fn with_error_level(mut self, level: Option<&str>) -> Result { - if let Some(level_str) = level { - let level = level_str.parse().map_err(|_| { - Error::new(ErrorKind::ConfigInvalid, "invalid log level") - .with_context("level", level_str) - })?; - self.error_level = Some(level); - } else { - self.error_level = None; +impl Default for LoggingLayer { + fn default() -> Self { + Self { + notify: Arc::new(DefaultLoggingInterceptor), } - Ok(self) } +} - /// Setting the log level while unexpected failure happened. - /// - /// For example: accessor returns Unexpected network error. - /// - /// `None` means disable the log for failure. - pub fn with_failure_level(mut self, level: Option<&str>) -> Result { - if let Some(level_str) = level { - let level = level_str.parse().map_err(|_| { - Error::new(ErrorKind::ConfigInvalid, "invalid log level") - .with_context("level", level_str) - })?; - self.failure_level = Some(level); - } else { - self.failure_level = None; +impl LoggingLayer { + /// Create the layer with specific logging interceptor. + pub fn new(notify: I) -> LoggingLayer { + LoggingLayer { + notify: Arc::new(notify), } - Ok(self) - } - - /// Setting whether to output backtrace while unexpected failure happened. - /// - /// # Notes - /// - /// - When the error is an expected error, backtrace will not be output. - /// - backtrace output is disable by default. - pub fn with_backtrace_output(mut self, enable: bool) -> Self { - self.backtrace_output = enable; - self } } -impl Layer for LoggingLayer { - type LayeredAccess = LoggingAccessor; +impl Layer for LoggingLayer { + type LayeredAccess = LoggingAccessor; fn layer(&self, inner: A) -> Self::LayeredAccess { let meta = inner.info(); @@ -156,400 +153,342 @@ impl Layer for LoggingLayer { ctx: LoggingContext { scheme: meta.scheme(), - error_level: self.error_level, - failure_level: self.failure_level, - backtrace_output: self.backtrace_output, + notify: self.notify.clone(), }, } } } -#[derive(Clone, Debug)] -pub struct LoggingContext { +#[derive(Debug)] +pub struct LoggingContext { scheme: Scheme, - error_level: Option, - failure_level: Option, - backtrace_output: bool, + notify: Arc, } -impl LoggingContext { - #[inline] - fn error_level(&self, err: &Error) -> Option { - if err.kind() == ErrorKind::Unexpected { - self.failure_level - } else { - self.error_level +impl Clone for LoggingContext { + fn clone(&self) -> Self { + Self { + scheme: self.scheme, + notify: self.notify.clone(), } } +} - /// Print error with backtrace if it's unexpected error. - #[inline] - fn error_print(&self, err: &Error) -> String { - // Don't print backtrace if it's not unexpected error. - if err.kind() != ErrorKind::Unexpected { - return format!("{err}"); +impl LoggingContext { + fn log(&self, operation: Operation, context: &str, message: &str, err: Option<&Error>) { + self.notify + .log(self.scheme, operation, context, message, err) + } + + fn log_with_path(&self, operation: Operation, path: &str, message: &str, err: Option<&Error>) { + self.notify.log( + self.scheme, + operation, + &format!("path={path}"), + message, + err, + ) + } +} + +/// LoggingInterceptor is used to intercept the log. +pub trait LoggingInterceptor: Debug + Send + Sync + 'static { + /// Everytime there is a log, this function will be called. + /// + /// # Inputs + /// + /// - scheme: The service generates the log. + /// - operation: The operation to log. + /// - context: Additional context of the log. + /// - message: The log message. + /// - err: The error to log. + /// + /// # Note + /// + /// Users should avoid calling resource-intensive operations such as I/O or network + /// functions here, especially anything that takes longer than 10ms. Otherwise, Opendal + /// could perform unexpectedly slow. + fn log( + &self, + scheme: Scheme, + operation: Operation, + context: &str, + message: &str, + err: Option<&Error>, + ); +} + +/// The DefaultLoggingInterceptor will log the message by the standard logging macro. +#[derive(Debug)] +pub struct DefaultLoggingInterceptor; + +impl LoggingInterceptor for DefaultLoggingInterceptor { + fn log( + &self, + scheme: Scheme, + operation: Operation, + context: &str, + message: &str, + err: Option<&Error>, + ) { + let Some(err) = err else { + let lvl = self.operation_level(operation); + log!( + target: LOGGING_TARGET, + lvl, + "service={} operation={} {} -> {}", + scheme, + operation, + context, + message, + ); + return; + }; + + let lvl = self.error_level(err); + log!( + target: LOGGING_TARGET, + lvl, + "service={} operation={} {} -> {} {}", + scheme, + operation, + context, + message, + err, + ); + } +} + +impl DefaultLoggingInterceptor { + fn operation_level(&self, operation: Operation) -> Level { + match operation { + Operation::ReaderRead + | Operation::BlockingReaderRead + | Operation::WriterWrite + | Operation::BlockingWriterWrite => Level::Trace, + _ => Level::Debug, } + } - if self.backtrace_output { - format!("{err:?}") + #[inline] + fn error_level(&self, err: &Error) -> Level { + if err.kind() == ErrorKind::Unexpected { + Level::Error } else { - format!("{err}") + Level::Warn } } } #[derive(Clone, Debug)] -pub struct LoggingAccessor { +pub struct LoggingAccessor { inner: A, - ctx: LoggingContext, + ctx: LoggingContext, } static LOGGING_TARGET: &str = "opendal::services"; -impl LayeredAccess for LoggingAccessor { +impl LayeredAccess for LoggingAccessor { type Inner = A; - type Reader = LoggingReader; - type BlockingReader = LoggingReader; - type Writer = LoggingWriter; - type BlockingWriter = LoggingWriter; - type Lister = LoggingLister; - type BlockingLister = LoggingLister; + type Reader = LoggingReader; + type BlockingReader = LoggingReader; + type Writer = LoggingWriter; + type BlockingWriter = LoggingWriter; + type Lister = LoggingLister; + type BlockingLister = LoggingLister; fn inner(&self) -> &Self::Inner { &self.inner } fn metadata(&self) -> Arc { - debug!( - target: LOGGING_TARGET, - "service={} operation={} -> started", - self.ctx.scheme, - Operation::Info - ); + self.ctx.log(Operation::Info, "", "started", None); let result = self.inner.info(); - debug!( - target: LOGGING_TARGET, - "service={} operation={} -> finished: {:?}", - self.ctx.scheme, + self.ctx.log( Operation::Info, - result + "", + &format!("finished: {:?}", result), + None, ); result } async fn create_dir(&self, path: &str, args: OpCreateDir) -> Result { - debug!( - target: LOGGING_TARGET, - "service={} operation={} path={} -> started", - self.ctx.scheme, - Operation::CreateDir, - path - ); + self.ctx + .log_with_path(Operation::CreateDir, path, "started", None); self.inner .create_dir(path, args) .await .map(|v| { - debug!( - target: LOGGING_TARGET, - "service={} operation={} path={} -> finished", - self.ctx.scheme, - Operation::CreateDir, - path - ); + self.ctx + .log_with_path(Operation::CreateDir, path, "finished", None); v }) .map_err(|err| { - if let Some(lvl) = self.ctx.error_level(&err) { - log!( - target: LOGGING_TARGET, - lvl, - "service={} operation={} path={} -> {}", - self.ctx.scheme, - Operation::CreateDir, - path, - self.ctx.error_print(&err) - ) - }; + self.ctx + .log_with_path(Operation::CreateDir, path, "", Some(&err)); err }) } async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> { - debug!( - target: LOGGING_TARGET, - "service={} operation={} path={} -> started", - self.ctx.scheme, - Operation::Read, - path, - ); + self.ctx + .log_with_path(Operation::Read, path, "started", None); self.inner .read(path, args) .await .map(|(rp, r)| { - debug!( - target: LOGGING_TARGET, - "service={} operation={} path={} -> got reader", - self.ctx.scheme, - Operation::Read, - path, - ); + self.ctx + .log_with_path(Operation::Read, path, "got reader", None); ( rp, LoggingReader::new(self.ctx.clone(), Operation::Read, path, r), ) }) .map_err(|err| { - if let Some(lvl) = self.ctx.error_level(&err) { - log!( - target: LOGGING_TARGET, - lvl, - "service={} operation={} path={} -> {}", - self.ctx.scheme, - Operation::Read, - path, - self.ctx.error_print(&err) - ) - } + self.ctx + .log_with_path(Operation::Read, path, "", Some(&err)); err }) } async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> { - debug!( - target: LOGGING_TARGET, - "service={} operation={} path={} -> started", - self.ctx.scheme, - Operation::Write, - path - ); + self.ctx + .log_with_path(Operation::Write, path, "started", None); self.inner .write(path, args) .await .map(|(rp, w)| { - debug!( - target: LOGGING_TARGET, - "service={} operation={} path={} -> start writing", - self.ctx.scheme, - Operation::Write, - path, - ); + self.ctx + .log_with_path(Operation::Write, path, "start writing", None); let w = LoggingWriter::new(self.ctx.clone(), Operation::Write, path, w); (rp, w) }) .map_err(|err| { - if let Some(lvl) = self.ctx.error_level(&err) { - log!( - target: LOGGING_TARGET, - lvl, - "service={} operation={} path={} -> {}", - self.ctx.scheme, - Operation::Write, - path, - self.ctx.error_print(&err) - ) - }; + self.ctx + .log_with_path(Operation::Write, path, "", Some(&err)); err }) } async fn copy(&self, from: &str, to: &str, args: OpCopy) -> Result { - debug!( - target: LOGGING_TARGET, - "service={} operation={} from={} to={} -> started", - self.ctx.scheme, + self.ctx.log( Operation::Copy, - from, - to + &format!("from={from} to={to}"), + "started", + None, ); self.inner .copy(from, to, args) .await .map(|v| { - debug!( - target: LOGGING_TARGET, - "service={} operation={} from={} to={} -> finished", - self.ctx.scheme, + self.ctx.log( Operation::Copy, - from, - to + &format!("from={from} to={to}"), + "finished", + None, ); v }) .map_err(|err| { - if let Some(lvl) = self.ctx.error_level(&err) { - log!( - target: LOGGING_TARGET, - lvl, - "service={} operation={} from={} to={} -> {}", - self.ctx.scheme, - Operation::Copy, - from, - to, - self.ctx.error_print(&err), - ) - }; + self.ctx.log( + Operation::Copy, + &format!("from={from} to={to}"), + "", + Some(&err), + ); err }) } async fn rename(&self, from: &str, to: &str, args: OpRename) -> Result { - debug!( - target: LOGGING_TARGET, - "service={} operation={} from={} to={} -> started", - self.ctx.scheme, + self.ctx.log( Operation::Rename, - from, - to + &format!("from={from} to={to}"), + "started", + None, ); self.inner .rename(from, to, args) .await .map(|v| { - debug!( - target: LOGGING_TARGET, - "service={} operation={} from={} to={} -> finished", - self.ctx.scheme, + self.ctx.log( Operation::Rename, - from, - to + &format!("from={from} to={to}"), + "finished", + None, ); v }) .map_err(|err| { - if let Some(lvl) = self.ctx.error_level(&err) { - log!( - target: LOGGING_TARGET, - lvl, - "service={} operation={} from={} to={} -> {}", - self.ctx.scheme, - Operation::Rename, - from, - to, - self.ctx.error_print(&err) - ) - }; + self.ctx.log( + Operation::Rename, + &format!("from={from} to={to}"), + "", + Some(&err), + ); err }) } async fn stat(&self, path: &str, args: OpStat) -> Result { - debug!( - target: LOGGING_TARGET, - "service={} operation={} path={} -> started", - self.ctx.scheme, - Operation::Stat, - path - ); + self.ctx + .log_with_path(Operation::Stat, path, "started", None); self.inner .stat(path, args) .await .map(|v| { - debug!( - target: LOGGING_TARGET, - "service={} operation={} path={} -> finished: {v:?}", - self.ctx.scheme, - Operation::Stat, - path - ); + self.ctx + .log_with_path(Operation::Stat, path, "finished", None); v }) .map_err(|err| { - if let Some(lvl) = self.ctx.error_level(&err) { - log!( - target: LOGGING_TARGET, - lvl, - "service={} operation={} path={} -> {}", - self.ctx.scheme, - Operation::Stat, - path, - self.ctx.error_print(&err) - ); - }; + self.ctx + .log_with_path(Operation::Stat, path, "", Some(&err)); err }) } async fn delete(&self, path: &str, args: OpDelete) -> Result { - debug!( - target: LOGGING_TARGET, - "service={} operation={} path={} -> started", - self.ctx.scheme, - Operation::Delete, - path - ); + self.ctx + .log_with_path(Operation::Delete, path, "started", None); self.inner .delete(path, args.clone()) .inspect(|v| match v { Ok(_) => { - debug!( - target: LOGGING_TARGET, - "service={} operation={} path={} -> finished", - self.ctx.scheme, - Operation::Delete, - path - ); + self.ctx + .log_with_path(Operation::Delete, path, "finished", None); } Err(err) => { - if let Some(lvl) = self.ctx.error_level(err) { - log!( - target: LOGGING_TARGET, - lvl, - "service={} operation={} path={} -> {}", - self.ctx.scheme, - Operation::Delete, - path, - self.ctx.error_print(err) - ); - } + self.ctx + .log_with_path(Operation::Delete, path, "", Some(err)); } }) .await } async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> { - debug!( - target: LOGGING_TARGET, - "service={} operation={} path={} -> started", - self.ctx.scheme, - Operation::List, - path - ); - self.inner .list(path, args) .map(|v| match v { Ok((rp, v)) => { - debug!( - target: LOGGING_TARGET, - "service={} operation={} path={} -> start listing dir", - self.ctx.scheme, - Operation::List, - path - ); + self.ctx + .log_with_path(Operation::List, path, "start listing dir", None); let streamer = LoggingLister::new(self.ctx.clone(), path, Operation::List, v); Ok((rp, streamer)) } Err(err) => { - if let Some(lvl) = self.ctx.error_level(&err) { - log!( - target: LOGGING_TARGET, - lvl, - "service={} operation={} path={} -> {}", - self.ctx.scheme, - Operation::List, - path, - self.ctx.error_print(&err) - ); - } + self.ctx + .log_with_path(Operation::List, path, "", Some(&err)); Err(err) } }) @@ -557,39 +496,20 @@ impl LayeredAccess for LoggingAccessor { } async fn presign(&self, path: &str, args: OpPresign) -> Result { - debug!( - target: LOGGING_TARGET, - "service={} operation={} path={} -> started", - self.ctx.scheme, - Operation::Presign, - path - ); + self.ctx + .log_with_path(Operation::Presign, path, "started", None); self.inner .presign(path, args) .await .map(|v| { - debug!( - target: LOGGING_TARGET, - "service={} operation={} path={} -> finished: {v:?}", - self.ctx.scheme, - Operation::Presign, - path - ); + self.ctx + .log_with_path(Operation::Presign, path, "finished", None); v }) .map_err(|err| { - if let Some(lvl) = self.ctx.error_level(&err) { - log!( - target: LOGGING_TARGET, - lvl, - "service={} operation={} path={} -> {}", - self.ctx.scheme, - Operation::Presign, - path, - self.ctx.error_print(&err) - ); - } + self.ctx + .log_with_path(Operation::Presign, path, "", Some(&err)); err }) } @@ -597,352 +517,216 @@ impl LayeredAccess for LoggingAccessor { async fn batch(&self, args: OpBatch) -> Result { let (op, count) = (args.operation()[0].1.operation(), args.operation().len()); - debug!( - target: LOGGING_TARGET, - "service={} operation={}-{op} count={count} -> started", - self.ctx.scheme, + self.ctx.log( Operation::Batch, + &format!("op={op} count={count} -> started"), + "started", + None, ); self.inner .batch(args) .map_ok(|v| { - debug!( - target: LOGGING_TARGET, - "service={} operation={}-{op} count={count} -> finished: {}, succeed: {}, failed: {}", - self.ctx.scheme, + self.ctx.log( Operation::Batch, - v.results().len(), - v.results().iter().filter(|(_, v)|v.is_ok()).count(), - v.results().iter().filter(|(_, v)|v.is_err()).count(), + &format!("op={op} count={count}"), + &format!( + "finished: {}, succeed: {}, failed: {}", + v.results().len(), + v.results().iter().filter(|(_, v)| v.is_ok()).count(), + v.results().iter().filter(|(_, v)| v.is_err()).count(), + ), + None, ); v }) .map_err(|err| { - if let Some(lvl) = self.ctx.error_level(&err) { - log!( - target: LOGGING_TARGET, - lvl, - "service={} operation={}-{op} count={count} -> {}", - self.ctx.scheme, - Operation::Batch, - self.ctx.error_print(&err) - ); - } + self.ctx.log( + Operation::Batch, + &format!("op={op} count={count}"), + "", + Some(&err), + ); err }) .await } fn blocking_create_dir(&self, path: &str, args: OpCreateDir) -> Result { - debug!( - target: LOGGING_TARGET, - "service={} operation={} path={} -> started", - self.ctx.scheme, - Operation::BlockingCreateDir, - path - ); + self.ctx + .log_with_path(Operation::BlockingCreateDir, path, "started", None); self.inner .blocking_create_dir(path, args) .map(|v| { - debug!( - target: LOGGING_TARGET, - "service={} operation={} path={} -> finished", - self.ctx.scheme, - Operation::BlockingCreateDir, - path - ); + self.ctx + .log_with_path(Operation::BlockingCreateDir, path, "finished", None); v }) .map_err(|err| { - if let Some(lvl) = self.ctx.error_level(&err) { - log!( - target: LOGGING_TARGET, - lvl, - "service={} operation={} path={} -> {}", - self.ctx.scheme, - Operation::BlockingCreateDir, - path, - self.ctx.error_print(&err) - ); - } + self.ctx + .log_with_path(Operation::BlockingCreateDir, path, "", Some(&err)); err }) } fn blocking_read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::BlockingReader)> { - debug!( - target: LOGGING_TARGET, - "service={} operation={} path={} -> started", - self.ctx.scheme, - Operation::BlockingRead, - path, - ); + self.ctx + .log_with_path(Operation::BlockingRead, path, "started", None); self.inner .blocking_read(path, args.clone()) .map(|(rp, r)| { - debug!( - target: LOGGING_TARGET, - "service={} operation={} path={} -> got reader", - self.ctx.scheme, - Operation::BlockingRead, - path, - ); + self.ctx + .log_with_path(Operation::BlockingRead, path, "got reader", None); let r = LoggingReader::new(self.ctx.clone(), Operation::BlockingRead, path, r); (rp, r) }) .map_err(|err| { - if let Some(lvl) = self.ctx.error_level(&err) { - log!( - target: LOGGING_TARGET, - lvl, - "service={} operation={} path={} -> {}", - self.ctx.scheme, - Operation::BlockingRead, - path, - self.ctx.error_print(&err) - ); - } + self.ctx + .log_with_path(Operation::BlockingRead, path, "", Some(&err)); err }) } fn blocking_write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::BlockingWriter)> { - debug!( - target: LOGGING_TARGET, - "service={} operation={} path={} -> started", - self.ctx.scheme, - Operation::BlockingWrite, - path, - ); + self.ctx + .log_with_path(Operation::BlockingWrite, path, "started", None); self.inner .blocking_write(path, args) .map(|(rp, w)| { - debug!( - target: LOGGING_TARGET, - "service={} operation={} path={} -> start writing", - self.ctx.scheme, - Operation::BlockingWrite, - path, - ); + self.ctx + .log_with_path(Operation::BlockingWrite, path, "start writing", None); let w = LoggingWriter::new(self.ctx.clone(), Operation::BlockingWrite, path, w); (rp, w) }) .map_err(|err| { - if let Some(lvl) = self.ctx.error_level(&err) { - log!( - target: LOGGING_TARGET, - lvl, - "service={} operation={} path={} -> {}", - self.ctx.scheme, - Operation::BlockingWrite, - path, - self.ctx.error_print(&err) - ); - } + self.ctx + .log_with_path(Operation::BlockingWrite, path, "", Some(&err)); err }) } fn blocking_copy(&self, from: &str, to: &str, args: OpCopy) -> Result { - debug!( - target: LOGGING_TARGET, - "service={} operation={} from={} to={} -> started", - self.ctx.scheme, + self.ctx.log( Operation::BlockingCopy, - from, - to, + &format!("from={from} to={to}"), + "started", + None, ); self.inner .blocking_copy(from, to, args) .map(|v| { - debug!( - target: LOGGING_TARGET, - "service={} operation={} from={} to={} -> finished", - self.ctx.scheme, + self.ctx.log( Operation::BlockingCopy, - from, - to, + &format!("from={from} to={to}"), + "finished", + None, ); v }) .map_err(|err| { - if let Some(lvl) = self.ctx.error_level(&err) { - log!( - target: LOGGING_TARGET, - lvl, - "service={} operation={} from={} to={} -> {}", - self.ctx.scheme, - Operation::BlockingCopy, - from, - to, - self.ctx.error_print(&err) - ); - } + self.ctx.log( + Operation::BlockingCopy, + &format!("from={from} to={to}"), + "", + Some(&err), + ); err }) } fn blocking_rename(&self, from: &str, to: &str, args: OpRename) -> Result { - debug!( - target: LOGGING_TARGET, - "service={} operation={} from={} to={} -> started", - self.ctx.scheme, + self.ctx.log( Operation::BlockingRename, - from, - to, + &format!("from={from} to={to}"), + "started", + None, ); self.inner .blocking_rename(from, to, args) .map(|v| { - debug!( - target: LOGGING_TARGET, - "service={} operation={} from={} to={} -> finished", - self.ctx.scheme, + self.ctx.log( Operation::BlockingRename, - from, - to, + &format!("from={from} to={to}"), + "finished", + None, ); v }) .map_err(|err| { - if let Some(lvl) = self.ctx.error_level(&err) { - log!( - target: LOGGING_TARGET, - lvl, - "service={} operation={} from={} to={} -> {}", - self.ctx.scheme, - Operation::BlockingRename, - from, - to, - self.ctx.error_print(&err) - ); - } + self.ctx.log( + Operation::BlockingRename, + &format!("from={from} to={to}"), + "", + Some(&err), + ); err }) } fn blocking_stat(&self, path: &str, args: OpStat) -> Result { - debug!( - target: LOGGING_TARGET, - "service={} operation={} path={} -> started", - self.ctx.scheme, - Operation::BlockingStat, - path - ); + self.ctx + .log_with_path(Operation::BlockingStat, path, "started", None); self.inner .blocking_stat(path, args) .map(|v| { - debug!( - target: LOGGING_TARGET, - "service={} operation={} path={} -> finished: {v:?}", - self.ctx.scheme, - Operation::BlockingStat, - path - ); + self.ctx + .log_with_path(Operation::BlockingStat, path, "finished", None); v }) .map_err(|err| { - if let Some(lvl) = self.ctx.error_level(&err) { - log!( - target: LOGGING_TARGET, - lvl, - "service={} operation={} path={} -> {}", - self.ctx.scheme, - Operation::BlockingStat, - path, - self.ctx.error_print(&err) - ); - } + self.ctx + .log_with_path(Operation::BlockingStat, path, "", Some(&err)); err }) } fn blocking_delete(&self, path: &str, args: OpDelete) -> Result { - debug!( - target: LOGGING_TARGET, - "service={} operation={} path={} -> started", - self.ctx.scheme, - Operation::BlockingDelete, - path - ); + self.ctx + .log_with_path(Operation::BlockingDelete, path, "started", None); self.inner .blocking_delete(path, args) .map(|v| { - debug!( - target: LOGGING_TARGET, - "service={} operation={} path={} -> finished", - self.ctx.scheme, - Operation::BlockingDelete, - path - ); + self.ctx + .log_with_path(Operation::BlockingDelete, path, "finished", None); v }) .map_err(|err| { - if let Some(lvl) = self.ctx.error_level(&err) { - log!( - target: LOGGING_TARGET, - lvl, - "service={} operation={} path={} -> {}", - self.ctx.scheme, - Operation::BlockingDelete, - path, - self.ctx.error_print(&err) - ); - } + self.ctx + .log_with_path(Operation::BlockingDelete, path, "", Some(&err)); err }) } fn blocking_list(&self, path: &str, args: OpList) -> Result<(RpList, Self::BlockingLister)> { - debug!( - target: LOGGING_TARGET, - "service={} operation={} path={} -> started", - self.ctx.scheme, - Operation::BlockingList, - path - ); + self.ctx + .log_with_path(Operation::BlockingList, path, "started", None); self.inner .blocking_list(path, args) .map(|(rp, v)| { - debug!( - target: LOGGING_TARGET, - "service={} operation={} path={} -> got dir", - self.ctx.scheme, - Operation::BlockingList, - path - ); + self.ctx + .log_with_path(Operation::BlockingList, path, "got dir", None); let li = LoggingLister::new(self.ctx.clone(), path, Operation::BlockingList, v); (rp, li) }) .map_err(|err| { - if let Some(lvl) = self.ctx.error_level(&err) { - log!( - target: LOGGING_TARGET, - lvl, - "service={} operation={} path={} -> {}", - self.ctx.scheme, - Operation::BlockingList, - path, - self.ctx.error_print(&err) - ); - } + self.ctx + .log_with_path(Operation::BlockingList, path, "", Some(&err)); err }) } } /// `LoggingReader` is a wrapper of `BytesReader`, with logging functionality. -pub struct LoggingReader { - ctx: LoggingContext, +pub struct LoggingReader { + ctx: LoggingContext, path: String, op: Operation, @@ -950,8 +734,8 @@ pub struct LoggingReader { inner: R, } -impl LoggingReader { - fn new(ctx: LoggingContext, op: Operation, path: &str, reader: R) -> Self { +impl LoggingReader { + fn new(ctx: LoggingContext, op: Operation, path: &str, reader: R) -> Self { Self { ctx, op, @@ -963,93 +747,93 @@ impl LoggingReader { } } -impl Drop for LoggingReader { +impl Drop for LoggingReader { fn drop(&mut self) { - debug!( - target: LOGGING_TARGET, - "service={} operation={} path={} read={} -> data read finished", - self.ctx.scheme, + self.ctx.log( self.op, - self.path, - self.read.load(Ordering::Relaxed) + &format!( + "path={} read={}", + self.path, + self.read.load(Ordering::Relaxed) + ), + "data read finished", + None, ); } } -impl oio::Read for LoggingReader { +impl oio::Read for LoggingReader { async fn read(&mut self) -> Result { match self.inner.read().await { Ok(bs) => { self.read .fetch_add(bs.remaining() as u64, Ordering::Relaxed); - trace!( - target: LOGGING_TARGET, - "service={} operation={} path={} read={} -> read returns {}B", - self.ctx.scheme, + self.ctx.log( Operation::ReaderRead, - self.path, - self.read.load(Ordering::Relaxed), - bs.remaining() + &format!( + "path={} read={}", + self.path, + self.read.load(Ordering::Relaxed), + ), + &format!("read returns {}B", bs.remaining()), + None, ); Ok(bs) } Err(err) => { - if let Some(lvl) = self.ctx.error_level(&err) { - log!( - target: LOGGING_TARGET, - lvl, - "service={} operation={} path={} read={} -> read failed: {}", - self.ctx.scheme, - Operation::ReaderRead, + self.ctx.log( + Operation::ReaderRead, + &format!( + "path={} read={}", self.path, - self.read.load(Ordering::Relaxed), - self.ctx.error_print(&err), - ) - } + self.read.load(Ordering::Relaxed) + ), + "read failed:", + Some(&err), + ); Err(err) } } } } -impl oio::BlockingRead for LoggingReader { +impl oio::BlockingRead for LoggingReader { fn read(&mut self) -> Result { match self.inner.read() { Ok(bs) => { self.read .fetch_add(bs.remaining() as u64, Ordering::Relaxed); - trace!( - target: LOGGING_TARGET, - "service={} operation={} path={} read={} -> read returns {}B", - self.ctx.scheme, + self.ctx.log( Operation::BlockingReaderRead, - self.path, - self.read.load(Ordering::Relaxed), - bs.remaining() + &format!( + "path={} read={}", + self.path, + self.read.load(Ordering::Relaxed), + ), + &format!("read returns {}B", bs.remaining()), + None, ); Ok(bs) } Err(err) => { - if let Some(lvl) = self.ctx.error_level(&err) { - log!( - target: LOGGING_TARGET, - lvl, - "service={} operation={} path={} read={} -> read failed: {}", - self.ctx.scheme, - Operation::BlockingReaderRead, + self.ctx.log( + Operation::BlockingReaderRead, + &format!( + "path={} read={}", self.path, - self.read.load(Ordering::Relaxed), - self.ctx.error_print(&err), - ); - } + self.read.load(Ordering::Relaxed) + ), + "read failed:", + Some(&err), + ); Err(err) } } } } -pub struct LoggingWriter { - ctx: LoggingContext, +pub struct LoggingWriter { + ctx: LoggingContext, op: Operation, path: String, @@ -1057,8 +841,8 @@ pub struct LoggingWriter { inner: W, } -impl LoggingWriter { - fn new(ctx: LoggingContext, op: Operation, path: &str, writer: W) -> Self { +impl LoggingWriter { + fn new(ctx: LoggingContext, op: Operation, path: &str, writer: W) -> Self { Self { ctx, op, @@ -1070,35 +854,26 @@ impl LoggingWriter { } } -impl oio::Write for LoggingWriter { +impl oio::Write for LoggingWriter { async fn write(&mut self, bs: Buffer) -> Result<()> { let size = bs.len(); match self.inner.write(bs).await { Ok(_) => { - trace!( - target: LOGGING_TARGET, - "service={} operation={} path={} written={}B -> data write {}B", - self.ctx.scheme, + self.ctx.log( Operation::WriterWrite, - self.path, - self.written, - size, + &format!("path={} written={}B", self.path, self.written), + &format!("data write {}B", size), + None, ); Ok(()) } Err(err) => { - if let Some(lvl) = self.ctx.error_level(&err) { - log!( - target: LOGGING_TARGET, - lvl, - "service={} operation={} path={} written={}B -> data write failed: {}", - self.ctx.scheme, - Operation::WriterWrite, - self.path, - self.written, - self.ctx.error_print(&err), - ) - } + self.ctx.log( + Operation::WriterWrite, + &format!("path={} written={}B", self.path, self.written), + "data write failed:", + Some(&err), + ); Err(err) } } @@ -1107,29 +882,21 @@ impl oio::Write for LoggingWriter { async fn abort(&mut self) -> Result<()> { match self.inner.abort().await { Ok(_) => { - trace!( - target: LOGGING_TARGET, - "service={} operation={} path={} written={}B -> abort writer", - self.ctx.scheme, + self.ctx.log( Operation::WriterAbort, - self.path, - self.written, + &format!("path={} written={}B", self.path, self.written), + "abort writer", + None, ); Ok(()) } Err(err) => { - if let Some(lvl) = self.ctx.error_level(&err) { - log!( - target: LOGGING_TARGET, - lvl, - "service={} operation={} path={} written={}B -> abort writer failed: {}", - self.ctx.scheme, - Operation::WriterAbort, - self.path, - self.written, - self.ctx.error_print(&err), - ) - } + self.ctx.log( + Operation::WriterAbort, + &format!("path={} written={}B", self.path, self.written), + "abort writer failed:", + Some(&err), + ); Err(err) } } @@ -1138,63 +905,46 @@ impl oio::Write for LoggingWriter { async fn close(&mut self) -> Result<()> { match self.inner.close().await { Ok(_) => { - debug!( - target: LOGGING_TARGET, - "service={} operation={} path={} written={}B -> data written finished", - self.ctx.scheme, + self.ctx.log( self.op, - self.path, - self.written + &format!("path={} written={}B", self.path, self.written), + "data written finished", + None, ); Ok(()) } Err(err) => { - if let Some(lvl) = self.ctx.error_level(&err) { - log!( - target: LOGGING_TARGET, - lvl, - "service={} operation={} path={} written={}B -> data close failed: {}", - self.ctx.scheme, - Operation::WriterClose, - self.path, - self.written, - self.ctx.error_print(&err), - ) - } + self.ctx.log( + Operation::WriterClose, + &format!("path={} written={}B", self.path, self.written), + "data close failed:", + Some(&err), + ); Err(err) } } } } -impl oio::BlockingWrite for LoggingWriter { +impl oio::BlockingWrite for LoggingWriter { fn write(&mut self, bs: Buffer) -> Result<()> { match self.inner.write(bs.clone()) { Ok(_) => { - trace!( - target: LOGGING_TARGET, - "service={} operation={} path={} written={}B -> data write {}B", - self.ctx.scheme, + self.ctx.log( Operation::BlockingWriterWrite, - self.path, - self.written, - bs.len(), + &format!("path={} written={}B", self.path, self.written), + &format!("data write {}B", bs.len()), + None, ); Ok(()) } Err(err) => { - if let Some(lvl) = self.ctx.error_level(&err) { - log!( - target: LOGGING_TARGET, - lvl, - "service={} operation={} path={} written={}B -> data write failed: {}", - self.ctx.scheme, - Operation::BlockingWriterWrite, - self.path, - self.written, - self.ctx.error_print(&err), - ) - } + self.ctx.log( + Operation::BlockingWriterWrite, + &format!("path={} written={}B", self.path, self.written), + "data write failed:", + Some(&err), + ); Err(err) } } @@ -1203,37 +953,29 @@ impl oio::BlockingWrite for LoggingWriter { fn close(&mut self) -> Result<()> { match self.inner.close() { Ok(_) => { - debug!( - target: LOGGING_TARGET, - "service={} operation={} path={} written={}B -> data written finished", - self.ctx.scheme, + self.ctx.log( self.op, - self.path, - self.written + &format!("path={} written={}B", self.path, self.written), + "data written finished", + None, ); Ok(()) } Err(err) => { - if let Some(lvl) = self.ctx.error_level(&err) { - log!( - target: LOGGING_TARGET, - lvl, - "service={} operation={} path={} written={}B -> data close failed: {}", - self.ctx.scheme, - Operation::BlockingWriterClose, - self.path, - self.written, - self.ctx.error_print(&err), - ) - } + self.ctx.log( + Operation::BlockingWriterClose, + &format!("path={} written={}B", self.path, self.written), + "data close failed:", + Some(&err), + ); Err(err) } } } } -pub struct LoggingLister

{ - ctx: LoggingContext, +pub struct LoggingLister { + ctx: LoggingContext, path: String, op: Operation, @@ -1241,8 +983,8 @@ pub struct LoggingLister

{ inner: P, } -impl

LoggingLister

{ - fn new(ctx: LoggingContext, path: &str, op: Operation, inner: P) -> Self { +impl LoggingLister { + fn new(ctx: LoggingContext, path: &str, op: Operation, inner: P) -> Self { Self { ctx, path: path.to_string(), @@ -1253,65 +995,38 @@ impl

LoggingLister

{ } } -impl

Drop for LoggingLister

{ +impl Drop for LoggingLister { fn drop(&mut self) { if self.finished { - debug!( - target: LOGGING_TARGET, - "service={} operation={} path={} -> all entries read finished", - self.ctx.scheme, - self.op, - self.path - ); + self.ctx + .log_with_path(self.op, &self.path, "all entries read finished", None); } else { - debug!( - target: LOGGING_TARGET, - "service={} operation={} path={} -> partial entries read finished", - self.ctx.scheme, - self.op, - self.path - ); + self.ctx + .log_with_path(self.op, &self.path, "partial entries read finished", None); } } } -impl oio::List for LoggingLister

{ +impl oio::List for LoggingLister { async fn next(&mut self) -> Result> { let res = self.inner.next().await; match &res { Ok(Some(de)) => { - debug!( - target: LOGGING_TARGET, - "service={} operation={} path={} -> listed entry: {}", - self.ctx.scheme, + self.ctx.log_with_path( self.op, - self.path, - de.path(), + &self.path, + &format!("listed entry: {}", de.path()), + None, ); } Ok(None) => { - debug!( - target: LOGGING_TARGET, - "service={} operation={} path={} -> finished", - self.ctx.scheme, - self.op, - self.path - ); + self.ctx + .log_with_path(self.op, &self.path, "finished", None); self.finished = true; } Err(err) => { - if let Some(lvl) = self.ctx.error_level(err) { - log!( - target: LOGGING_TARGET, - lvl, - "service={} operation={} path={} -> {}", - self.ctx.scheme, - self.op, - self.path, - self.ctx.error_print(err) - ) - } + self.ctx.log_with_path(self.op, &self.path, "", Some(err)); } }; @@ -1319,43 +1034,26 @@ impl oio::List for LoggingLister

{ } } -impl oio::BlockingList for LoggingLister

{ +impl oio::BlockingList for LoggingLister { fn next(&mut self) -> Result> { let res = self.inner.next(); match &res { Ok(Some(des)) => { - debug!( - target: LOGGING_TARGET, - "service={} operation={} path={} -> listed entry: {}", - self.ctx.scheme, + self.ctx.log_with_path( self.op, - self.path, - des.path(), + &self.path, + &format!("listed entry: {}", des.path()), + None, ); } Ok(None) => { - debug!( - target: LOGGING_TARGET, - "service={} operation={} path={} -> finished", - self.ctx.scheme, - self.op, - self.path - ); + self.ctx + .log_with_path(self.op, &self.path, "finished", None); self.finished = true; } Err(err) => { - if let Some(lvl) = self.ctx.error_level(err) { - log!( - target: LOGGING_TARGET, - lvl, - "service={} operation={} path={} -> {}", - self.ctx.scheme, - self.op, - self.path, - self.ctx.error_print(err) - ) - } + self.ctx.log_with_path(self.op, &self.path, "", Some(err)); } }; diff --git a/core/src/layers/mod.rs b/core/src/layers/mod.rs index eff0bfb8b746..98372b220389 100644 --- a/core/src/layers/mod.rs +++ b/core/src/layers/mod.rs @@ -33,6 +33,7 @@ mod immutable_index; pub use immutable_index::ImmutableIndexLayer; mod logging; +pub use logging::LoggingInterceptor; pub use logging::LoggingLayer; mod timeout; diff --git a/core/src/raw/tests/utils.rs b/core/src/raw/tests/utils.rs index c9eb875d2070..18545764281e 100644 --- a/core/src/raw/tests/utils.rs +++ b/core/src/raw/tests/utils.rs @@ -19,8 +19,11 @@ use std::collections::HashMap; use std::env; use std::str::FromStr; +use log::{log, Level}; use once_cell::sync::Lazy; +use crate::layers::LoggingInterceptor; +use crate::raw::Operation; use crate::*; /// TEST_RUNTIME is the runtime used for running tests. @@ -73,7 +76,7 @@ pub fn init_test_service() -> Result> { let op = { op.layer(layers::ChaosLayer::new(0.1)) }; let mut op = op - .layer(layers::LoggingLayer::default().with_backtrace_output(true)) + .layer(layers::LoggingLayer::new(BacktraceLoggingInterceptor)) .layer(layers::TimeoutLayer::new()) .layer(layers::RetryLayer::new().with_max_times(4)); @@ -88,3 +91,66 @@ pub fn init_test_service() -> Result> { Ok(Some(op)) } + +/// A logging interceptor that logs the backtrace. +#[derive(Debug, Clone)] +struct BacktraceLoggingInterceptor; + +impl LoggingInterceptor for BacktraceLoggingInterceptor { + fn log( + &self, + scheme: Scheme, + operation: raw::Operation, + context: &str, + message: &str, + err: Option<&Error>, + ) { + let Some(err) = err else { + let lvl = self.operation_level(operation); + log!( + target: "opendal::services", + lvl, + "service={} operation={} {} -> {}", + scheme, + operation, + context, + message, + ); + return; + }; + + let err_msg = if err.kind() != ErrorKind::Unexpected { + format!("{err}") + } else { + format!("{err:?}") + }; + let lvl = if err.kind() == ErrorKind::Unexpected { + Level::Error + } else { + Level::Warn + }; + + log!( + target: "opendal::services", + lvl, + "service={} operation={} {} -> {} {}", + scheme, + operation, + context, + message, + err_msg, + ); + } +} + +impl BacktraceLoggingInterceptor { + fn operation_level(&self, operation: Operation) -> Level { + match operation { + Operation::ReaderRead + | Operation::BlockingReaderRead + | Operation::WriterWrite + | Operation::BlockingWriterWrite => Level::Trace, + _ => Level::Debug, + } + } +}