Skip to content

Commit

Permalink
Cleanup client error handling
Browse files Browse the repository at this point in the history
  • Loading branch information
tustvold committed Oct 10, 2023
1 parent c6387c1 commit dad5388
Showing 1 changed file with 93 additions and 86 deletions.
179 changes: 93 additions & 86 deletions object_store/src/client/retry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,46 +23,50 @@ use futures::FutureExt;
use reqwest::header::LOCATION;
use reqwest::{Response, StatusCode};
use snafu::Error as SnafuError;
use snafu::Snafu;
use std::time::{Duration, Instant};
use tracing::info;

/// Retry request error
#[derive(Debug)]
pub struct Error {
retries: usize,
message: String,
source: Option<reqwest::Error>,
status: Option<StatusCode>,
}

impl std::fmt::Display for Error {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
f,
"response error \"{}\", after {} retries",
self.message, self.retries
)?;
if let Some(source) = &self.source {
write!(f, ": {source}")?;
}
Ok(())
}
}

impl std::error::Error for Error {
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
self.source.as_ref().map(|e| e as _)
}
#[derive(Debug, Snafu)]
pub enum Error {
#[snafu(display("Received redirect without LOCATION, this normally indicates an incorrectly configured region"))]
BareRedirect,

#[snafu(display("Client error with status {status}: {}", body.as_deref().unwrap_or("No Body")))]
Client {
status: StatusCode,
body: Option<String>,
},

#[snafu(display("Response error after {retries} retries: {source}"))]
Response {
retries: usize,
source: reqwest::Error,
},
}

impl Error {
/// Returns the status code associated with this error if any
pub fn status(&self) -> Option<StatusCode> {
self.status
match self {
Error::BareRedirect => None,
Error::Client { status, .. } => Some(*status),
Error::Response { source, .. } => source.status(),
}
}

/// Returns the error body if any
pub fn body(&self) -> Option<&str> {
match self {
Error::Client { body, .. } => body.as_deref(),
Error::BareRedirect => None,
Error::Response { .. } => None,
}
}

pub fn error(self, store: &'static str, path: String) -> crate::Error {
match self.status {
match self.status() {
Some(StatusCode::NOT_FOUND) => crate::Error::NotFound {
path,
source: Box::new(self),
Expand All @@ -86,16 +90,19 @@ impl Error {
impl From<Error> for std::io::Error {
fn from(err: Error) -> Self {
use std::io::ErrorKind;
match (&err.source, err.status()) {
(Some(source), _) if source.is_builder() || source.is_request() => {
Self::new(ErrorKind::InvalidInput, err)
}
(_, Some(StatusCode::NOT_FOUND)) => Self::new(ErrorKind::NotFound, err),
(_, Some(StatusCode::BAD_REQUEST)) => Self::new(ErrorKind::InvalidInput, err),
(Some(source), None) if source.is_timeout() => {
match &err {
Error::Client {
status: StatusCode::NOT_FOUND,
..
} => Self::new(ErrorKind::NotFound, err),
Error::Client {
status: StatusCode::BAD_REQUEST,
..
} => Self::new(ErrorKind::InvalidInput, err),
Error::Response { source, .. } if source.is_timeout() => {
Self::new(ErrorKind::TimedOut, err)
}
(Some(source), None) if source.is_connect() => {
Error::Response { source, .. } if source.is_connect() => {
Self::new(ErrorKind::NotConnected, err)
}
_ => Self::new(ErrorKind::Other, err),
Expand Down Expand Up @@ -169,27 +176,21 @@ impl RetryExt for reqwest::RequestBuilder {
Ok(r) => match r.error_for_status_ref() {
Ok(_) if r.status().is_success() => return Ok(r),
Ok(r) if r.status() == StatusCode::NOT_MODIFIED => {
return Err(Error{
message: "not modified".to_string(),
retries,
status: Some(r.status()),
source: None,
return Err(Error::Client {
body: None,
status: StatusCode::NOT_MODIFIED,
})
}
Ok(r) => {
let is_bare_redirect = r.status().is_redirection() && !r.headers().contains_key(LOCATION);
let message = match is_bare_redirect {
true => "Received redirect without LOCATION, this normally indicates an incorrectly configured region".to_string(),
return match is_bare_redirect {
true => Err(Error::BareRedirect),
// Not actually sure if this is reachable, but here for completeness
false => format!("request unsuccessful: {}", r.status()),
};

return Err(Error{
message,
retries,
status: Some(r.status()),
source: None,
})
false => Err(Error::Client {
body: None,
status: r.status(),
})
}
}
Err(e) => {
let status = r.status();
Expand All @@ -198,28 +199,31 @@ impl RetryExt for reqwest::RequestBuilder {
|| now.elapsed() > retry_timeout
|| !status.is_server_error() {

// Get the response message if returned a client error
let message = match status.is_client_error() {
return Err(match status.is_client_error() {
true => match r.text().await {
Ok(message) if !message.is_empty() => message,
Ok(_) => "No Body".to_string(),
Err(e) => format!("error getting response body: {e}")
Ok(body) => {
Error::Client {
body: Some(body).filter(|b| !b.is_empty()),
status,
}
}
Err(e) => {
Error::Response {
retries,
source: e,
}
}
}
false => status.to_string(),
};

return Err(Error{
message,
retries,
status: Some(status),
source: Some(e),
})

false => Error::Response {
retries,
source: e,
}
});
}

let sleep = backoff.next();
retries += 1;
info!("Encountered server error, backing off for {} seconds, retry {} of {}", sleep.as_secs_f32(), retries, max_retries);
info!("Encountered response error, backing off for {} seconds, retry {} of {}", sleep.as_secs_f32(), retries, max_retries);
tokio::time::sleep(sleep).await;
}
},
Expand All @@ -238,16 +242,14 @@ impl RetryExt for reqwest::RequestBuilder {
|| now.elapsed() > retry_timeout
|| !do_retry {

return Err(Error{
return Err(Error::Response {
retries,
message: "request error".to_string(),
status: e.status(),
source: Some(e),
source: e,
})
}
let sleep = backoff.next();
retries += 1;
info!("Encountered request error ({}) backing off for {} seconds, retry {} of {}", e, sleep.as_secs_f32(), retries, max_retries);
info!("Encountered transport error ({}) backing off for {} seconds, retry {} of {}", e, sleep.as_secs_f32(), retries, max_retries);
tokio::time::sleep(sleep).await;
}
}
Expand All @@ -260,7 +262,7 @@ impl RetryExt for reqwest::RequestBuilder {
#[cfg(test)]
mod tests {
use crate::client::mock_server::MockServer;
use crate::client::retry::RetryExt;
use crate::client::retry::{Error, RetryExt};
use crate::RetryConfig;
use hyper::header::LOCATION;
use hyper::{Body, Response};
Expand Down Expand Up @@ -294,8 +296,11 @@ mod tests {

let e = do_request().await.unwrap_err();
assert_eq!(e.status().unwrap(), StatusCode::BAD_REQUEST);
assert_eq!(e.retries, 0);
assert_eq!(&e.message, "cupcakes");
assert_eq!(e.body(), Some("cupcakes"));
assert_eq!(
e.to_string(),
"Client error with status 400 Bad Request: cupcakes"
);

// Handles client errors with no payload
mock.push(
Expand All @@ -307,8 +312,11 @@ mod tests {

let e = do_request().await.unwrap_err();
assert_eq!(e.status().unwrap(), StatusCode::BAD_REQUEST);
assert_eq!(e.retries, 0);
assert_eq!(&e.message, "No Body");
assert_eq!(e.body(), None);
assert_eq!(
e.to_string(),
"Client error with status 400 Bad Request: No Body"
);

// Should retry server error request
mock.push(
Expand Down Expand Up @@ -381,7 +389,8 @@ mod tests {
);

let e = do_request().await.unwrap_err();
assert_eq!(e.message, "Received redirect without LOCATION, this normally indicates an incorrectly configured region");
assert!(matches!(e, Error::BareRedirect));
assert_eq!(e.to_string(), "Received redirect without LOCATION, this normally indicates an incorrectly configured region");

// Gives up after the retrying the specified number of times
for _ in 0..=retry.max_retries {
Expand All @@ -393,22 +402,20 @@ mod tests {
);
}

let e = do_request().await.unwrap_err();
assert_eq!(e.retries, retry.max_retries);
assert_eq!(e.message, "502 Bad Gateway");
let e = do_request().await.unwrap_err().to_string();
assert!(e.starts_with("Response error after 2 retries: HTTP status server error (502 Bad Gateway) for url"), "{e}");

// Panic results in an incomplete message error in the client
mock.push_fn(|_| panic!());
let r = do_request().await.unwrap();
assert_eq!(r.status(), StatusCode::OK);

// Gives up after retrying mulitiple panics
// Gives up after retrying multiple panics
for _ in 0..=retry.max_retries {
mock.push_fn(|_| panic!());
}
let e = do_request().await.unwrap_err();
assert_eq!(e.retries, retry.max_retries);
assert_eq!(e.message, "request error");
let e = do_request().await.unwrap_err().to_string();
assert!(e.starts_with("Response error after 2 retries: error sending request for url"), "{e}");

// Shutdown
mock.shutdown().await
Expand Down

0 comments on commit dad5388

Please sign in to comment.