From 25ee88e6068cc1610503350baa55a373287d1150 Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Mon, 26 Sep 2022 19:04:54 +0800 Subject: [PATCH] feat(layers/retry): Add warning log while retry happened (#721) * feat(layers/retry): Add warning log while retry happened Signed-off-by: Xuanwo * feat(layers/retry): Add logging while retry happens Signed-off-by: Xuanwo Signed-off-by: Xuanwo --- src/layers/metrics.rs | 3 +- src/layers/retry.rs | 79 +++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 81 insertions(+), 1 deletion(-) diff --git a/src/layers/metrics.rs b/src/layers/metrics.rs index a0238f90fe87..2aea1b6aca7a 100644 --- a/src/layers/metrics.rs +++ b/src/layers/metrics.rs @@ -12,7 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::fmt::{Debug, Formatter}; +use std::fmt::Debug; +use std::fmt::Formatter; use std::io::ErrorKind; use std::io::Read; use std::io::Result; diff --git a/src/layers/retry.rs b/src/layers/retry.rs index 800a0b853591..77336f315bb1 100644 --- a/src/layers/retry.rs +++ b/src/layers/retry.rs @@ -21,6 +21,7 @@ use std::thread::sleep; use async_trait::async_trait; use backon::Backoff; use backon::Retryable; +use log::warn; use crate::multipart::ObjectPart; use crate::ops::OpAbortMultipart; @@ -34,6 +35,7 @@ use crate::ops::OpRead; use crate::ops::OpStat; use crate::ops::OpWrite; use crate::ops::OpWriteMultipart; +use crate::ops::Operation; use crate::ops::PresignedRequest; use crate::Accessor; use crate::AccessorMetadata; @@ -115,34 +117,68 @@ where { || self.inner.create(path, args.clone()) } .retry(self.backoff.clone()) .when(|e| e.kind() == ErrorKind::Interrupted) + .notify(|err, dur| { + warn!( + target: "opendal::service", + "operation={} -> retry after {}s: error={:?}", + Operation::Create, dur.as_secs_f64(), err) + }) .await } + async fn read(&self, path: &str, args: OpRead) -> Result { { || self.inner.read(path, args.clone()) } .retry(self.backoff.clone()) .when(|e| e.kind() == ErrorKind::Interrupted) + .notify(|err, dur| { + warn!( + target: "opendal::service", + "operation={} -> retry after {}s: error={:?}", + Operation::Read, dur.as_secs_f64(), err) + }) .await } + async fn write(&self, path: &str, args: OpWrite, r: BytesReader) -> Result { // Write can't retry, until can reset this reader. self.inner.write(path, args.clone(), r).await } + async fn stat(&self, path: &str, args: OpStat) -> Result { { || self.inner.stat(path, args.clone()) } .retry(self.backoff.clone()) .when(|e| e.kind() == ErrorKind::Interrupted) + .notify(|err, dur| { + warn!( + target: "opendal::service", + "operation={} -> retry after {}s: error={:?}", + Operation::Stat, dur.as_secs_f64(), err) + }) .await } + async fn delete(&self, path: &str, args: OpDelete) -> Result<()> { { || self.inner.delete(path, args.clone()) } .retry(self.backoff.clone()) .when(|e| e.kind() == ErrorKind::Interrupted) + .notify(|err, dur| { + warn!( + target: "opendal::service", + "operation={} -> retry after {}s: error={:?}", + Operation::Delete, dur.as_secs_f64(), err) + }) .await } async fn list(&self, path: &str, args: OpList) -> Result { { || self.inner.list(path, args.clone()) } .retry(self.backoff.clone()) .when(|e| e.kind() == ErrorKind::Interrupted) + .notify(|err, dur| { + warn!( + target: "opendal::service", + "operation={} -> retry after {}s: error={:?}", + Operation::List, dur.as_secs_f64(), err) + }) .await } @@ -154,8 +190,15 @@ where { || self.inner.create_multipart(path, args.clone()) } .retry(self.backoff.clone()) .when(|e| e.kind() == ErrorKind::Interrupted) + .notify(|err, dur| { + warn!( + target: "opendal::service", + "operation={} -> retry after {}s: error={:?}", + Operation::CreateMultipart, dur.as_secs_f64(), err) + }) .await } + async fn write_multipart( &self, path: &str, @@ -165,16 +208,30 @@ where // Write can't retry, until can reset this reader. self.inner.write_multipart(path, args.clone(), r).await } + async fn complete_multipart(&self, path: &str, args: OpCompleteMultipart) -> Result<()> { { || self.inner.complete_multipart(path, args.clone()) } .retry(self.backoff.clone()) .when(|e| e.kind() == ErrorKind::Interrupted) + .notify(|err, dur| { + warn!( + target: "opendal::service", + "operation={} -> retry after {}s: error={:?}", + Operation::CompleteMultipart, dur.as_secs_f64(), err) + }) .await } + async fn abort_multipart(&self, path: &str, args: OpAbortMultipart) -> Result<()> { { || self.inner.abort_multipart(path, args.clone()) } .retry(self.backoff.clone()) .when(|e| e.kind() == ErrorKind::Interrupted) + .notify(|err, dur| { + warn!( + target: "opendal::service", + "operation={} -> retry after {}s: error={:?}", + Operation::AbortMultipart, dur.as_secs_f64(), err) + }) .await } @@ -194,6 +251,10 @@ where if kind == ErrorKind::Interrupted { sleep(dur); + warn!( + target: "opendal::service", + "operation={} path={} -> retry after {}s: error={:?}", + Operation::BlockingCreate, path, dur.as_secs_f64(), e); continue; } else { return Err(e.unwrap()); @@ -221,6 +282,10 @@ where if kind == ErrorKind::Interrupted { sleep(dur); + warn!( + target: "opendal::service", + "operation={} path={} -> retry after {}s: error={:?}", + Operation::BlockingRead, path, dur.as_secs_f64(), e); continue; } else { return Err(e.unwrap()); @@ -252,6 +317,10 @@ where if kind == ErrorKind::Interrupted { sleep(dur); + warn!( + target: "opendal::service", + "operation={} path={} -> retry after {}s: error={:?}", + Operation::BlockingStat, path, dur.as_secs_f64(), e); continue; } else { return Err(e.unwrap()); @@ -279,6 +348,10 @@ where if kind == ErrorKind::Interrupted { sleep(dur); + warn!( + target: "opendal::service", + "operation={} path={} -> retry after {}s: error={:?}", + Operation::BlockingDelete, path, dur.as_secs_f64(), e); continue; } else { return Err(e.unwrap()); @@ -306,6 +379,10 @@ where if kind == ErrorKind::Interrupted { sleep(dur); + warn!( + target: "opendal::service", + "operation={} path={} -> retry after {}s: error={:?}", + Operation::BlockingList, path, dur.as_secs_f64(), e); continue; } else { return Err(e.unwrap()); @@ -359,6 +436,8 @@ mod tests { #[tokio::test] async fn test_retry_retryable_error() -> anyhow::Result<()> { + let _ = env_logger::try_init(); + let srv = Arc::new(MockService::default()); let backoff = ConstantBackoff::default()