Skip to content

Commit

Permalink
feat(layers/retry): Add warning log while retry happened (#721)
Browse files Browse the repository at this point in the history
* feat(layers/retry): Add warning log while retry happened

Signed-off-by: Xuanwo <[email protected]>

* feat(layers/retry): Add logging while retry happens

Signed-off-by: Xuanwo <[email protected]>

Signed-off-by: Xuanwo <[email protected]>
  • Loading branch information
Xuanwo authored Sep 26, 2022
1 parent 9bb7e8a commit 25ee88e
Show file tree
Hide file tree
Showing 2 changed files with 81 additions and 1 deletion.
3 changes: 2 additions & 1 deletion src/layers/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::fmt::{Debug, Formatter};
use std::fmt::Debug;
use std::fmt::Formatter;
use std::io::ErrorKind;
use std::io::Read;
use std::io::Result;
Expand Down
79 changes: 79 additions & 0 deletions src/layers/retry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use std::thread::sleep;
use async_trait::async_trait;
use backon::Backoff;
use backon::Retryable;
use log::warn;

use crate::multipart::ObjectPart;
use crate::ops::OpAbortMultipart;
Expand All @@ -34,6 +35,7 @@ use crate::ops::OpRead;
use crate::ops::OpStat;
use crate::ops::OpWrite;
use crate::ops::OpWriteMultipart;
use crate::ops::Operation;
use crate::ops::PresignedRequest;
use crate::Accessor;
use crate::AccessorMetadata;
Expand Down Expand Up @@ -115,34 +117,68 @@ where
{ || self.inner.create(path, args.clone()) }
.retry(self.backoff.clone())
.when(|e| e.kind() == ErrorKind::Interrupted)
.notify(|err, dur| {
warn!(
target: "opendal::service",
"operation={} -> retry after {}s: error={:?}",
Operation::Create, dur.as_secs_f64(), err)
})
.await
}

async fn read(&self, path: &str, args: OpRead) -> Result<BytesReader> {
{ || self.inner.read(path, args.clone()) }
.retry(self.backoff.clone())
.when(|e| e.kind() == ErrorKind::Interrupted)
.notify(|err, dur| {
warn!(
target: "opendal::service",
"operation={} -> retry after {}s: error={:?}",
Operation::Read, dur.as_secs_f64(), err)
})
.await
}

async fn write(&self, path: &str, args: OpWrite, r: BytesReader) -> Result<u64> {
// Write can't retry, until can reset this reader.
self.inner.write(path, args.clone(), r).await
}

async fn stat(&self, path: &str, args: OpStat) -> Result<ObjectMetadata> {
{ || self.inner.stat(path, args.clone()) }
.retry(self.backoff.clone())
.when(|e| e.kind() == ErrorKind::Interrupted)
.notify(|err, dur| {
warn!(
target: "opendal::service",
"operation={} -> retry after {}s: error={:?}",
Operation::Stat, dur.as_secs_f64(), err)
})
.await
}

async fn delete(&self, path: &str, args: OpDelete) -> Result<()> {
{ || self.inner.delete(path, args.clone()) }
.retry(self.backoff.clone())
.when(|e| e.kind() == ErrorKind::Interrupted)
.notify(|err, dur| {
warn!(
target: "opendal::service",
"operation={} -> retry after {}s: error={:?}",
Operation::Delete, dur.as_secs_f64(), err)
})
.await
}
async fn list(&self, path: &str, args: OpList) -> Result<DirStreamer> {
{ || self.inner.list(path, args.clone()) }
.retry(self.backoff.clone())
.when(|e| e.kind() == ErrorKind::Interrupted)
.notify(|err, dur| {
warn!(
target: "opendal::service",
"operation={} -> retry after {}s: error={:?}",
Operation::List, dur.as_secs_f64(), err)
})
.await
}

Expand All @@ -154,8 +190,15 @@ where
{ || self.inner.create_multipart(path, args.clone()) }
.retry(self.backoff.clone())
.when(|e| e.kind() == ErrorKind::Interrupted)
.notify(|err, dur| {
warn!(
target: "opendal::service",
"operation={} -> retry after {}s: error={:?}",
Operation::CreateMultipart, dur.as_secs_f64(), err)
})
.await
}

async fn write_multipart(
&self,
path: &str,
Expand All @@ -165,16 +208,30 @@ where
// Write can't retry, until can reset this reader.
self.inner.write_multipart(path, args.clone(), r).await
}

async fn complete_multipart(&self, path: &str, args: OpCompleteMultipart) -> Result<()> {
{ || self.inner.complete_multipart(path, args.clone()) }
.retry(self.backoff.clone())
.when(|e| e.kind() == ErrorKind::Interrupted)
.notify(|err, dur| {
warn!(
target: "opendal::service",
"operation={} -> retry after {}s: error={:?}",
Operation::CompleteMultipart, dur.as_secs_f64(), err)
})
.await
}

async fn abort_multipart(&self, path: &str, args: OpAbortMultipart) -> Result<()> {
{ || self.inner.abort_multipart(path, args.clone()) }
.retry(self.backoff.clone())
.when(|e| e.kind() == ErrorKind::Interrupted)
.notify(|err, dur| {
warn!(
target: "opendal::service",
"operation={} -> retry after {}s: error={:?}",
Operation::AbortMultipart, dur.as_secs_f64(), err)
})
.await
}

Expand All @@ -194,6 +251,10 @@ where

if kind == ErrorKind::Interrupted {
sleep(dur);
warn!(
target: "opendal::service",
"operation={} path={} -> retry after {}s: error={:?}",
Operation::BlockingCreate, path, dur.as_secs_f64(), e);
continue;
} else {
return Err(e.unwrap());
Expand Down Expand Up @@ -221,6 +282,10 @@ where

if kind == ErrorKind::Interrupted {
sleep(dur);
warn!(
target: "opendal::service",
"operation={} path={} -> retry after {}s: error={:?}",
Operation::BlockingRead, path, dur.as_secs_f64(), e);
continue;
} else {
return Err(e.unwrap());
Expand Down Expand Up @@ -252,6 +317,10 @@ where

if kind == ErrorKind::Interrupted {
sleep(dur);
warn!(
target: "opendal::service",
"operation={} path={} -> retry after {}s: error={:?}",
Operation::BlockingStat, path, dur.as_secs_f64(), e);
continue;
} else {
return Err(e.unwrap());
Expand Down Expand Up @@ -279,6 +348,10 @@ where

if kind == ErrorKind::Interrupted {
sleep(dur);
warn!(
target: "opendal::service",
"operation={} path={} -> retry after {}s: error={:?}",
Operation::BlockingDelete, path, dur.as_secs_f64(), e);
continue;
} else {
return Err(e.unwrap());
Expand Down Expand Up @@ -306,6 +379,10 @@ where

if kind == ErrorKind::Interrupted {
sleep(dur);
warn!(
target: "opendal::service",
"operation={} path={} -> retry after {}s: error={:?}",
Operation::BlockingList, path, dur.as_secs_f64(), e);
continue;
} else {
return Err(e.unwrap());
Expand Down Expand Up @@ -359,6 +436,8 @@ mod tests {

#[tokio::test]
async fn test_retry_retryable_error() -> anyhow::Result<()> {
let _ = env_logger::try_init();

let srv = Arc::new(MockService::default());

let backoff = ConstantBackoff::default()
Expand Down

0 comments on commit 25ee88e

Please sign in to comment.