Retry Safe/Read-Only Requests on Timeout #5278

Merged 2 commits on Jan 4, 2024
21 changes: 17 additions & 4 deletions object_store/src/client/mock_server.rs
@@ -15,17 +15,20 @@
 // specific language governing permissions and limitations
 // under the License.

+use futures::future::BoxFuture;
+use futures::FutureExt;
 use hyper::service::{make_service_fn, service_fn};
 use hyper::{Body, Request, Response, Server};
 use parking_lot::Mutex;
 use std::collections::VecDeque;
 use std::convert::Infallible;
+use std::future::Future;
 use std::net::SocketAddr;
 use std::sync::Arc;
 use tokio::sync::oneshot;
 use tokio::task::JoinHandle;

-pub type ResponseFn = Box<dyn FnOnce(Request<Body>) -> Response<Body> + Send>;
+pub type ResponseFn = Box<dyn FnOnce(Request<Body>) -> BoxFuture<'static, Response<Body>> + Send>;

 /// A mock server
 pub struct MockServer {
@@ -46,9 +49,10 @@ impl MockServer {
             async move {
                 Ok::<_, Infallible>(service_fn(move |req| {
                     let r = Arc::clone(&r);
+                    let next = r.lock().pop_front();
                     async move {
-                        Ok::<_, Infallible>(match r.lock().pop_front() {
-                            Some(r) => r(req),
+                        Ok::<_, Infallible>(match next {
+                            Some(r) => r(req).await,
                             None => Response::new(Body::from("Hello World")),
                         })
                     }
@@ -93,7 +97,16 @@ impl MockServer {
     where
         F: FnOnce(Request<Body>) -> Response<Body> + Send + 'static,
     {
-        self.responses.lock().push_back(Box::new(f))
+        let f = Box::new(|req| async move { f(req) }.boxed());
+        self.responses.lock().push_back(f)
+    }
+
+    pub fn push_async_fn<F, Fut>(&self, f: F)
+    where
+        F: FnOnce(Request<Body>) -> Fut + Send + 'static,
+        Fut: Future<Output = Response<Body>> + Send + 'static,
+    {
+        self.responses.lock().push_back(Box::new(|r| f(r).boxed()))
     }

     /// Shutdown the mock server
50 changes: 43 additions & 7 deletions object_store/src/client/retry.rs
@@ -119,11 +119,19 @@ impl From<Error> for std::io::Error {

 pub type Result<T, E = Error> = std::result::Result<T, E>;

-/// Contains the configuration for how to respond to server errors
+/// The configuration for how to respond to request errors
 ///
-/// By default they will be retried up to some limit, using exponential
+/// The following categories of error will be retried:
+///
+/// * 5xx server errors
+/// * Connection errors
+/// * Dropped connections
+/// * Timeouts for [safe] / read-only requests
+///
+/// Requests will be retried up to some limit, using exponential
 /// backoff with jitter. See [`BackoffConfig`] for more information
 ///
+/// [safe]: https://datatracker.ietf.org/doc/html/rfc7231#section-4.2.1
 #[derive(Debug, Clone)]
 pub struct RetryConfig {
     /// The backoff configuration
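
The retry categories listed in the new `RetryConfig` doc comment can be illustrated with a small, std-only sketch. This is not the code from `retry.rs`: the `Failure` enum, `should_retry`, and `run_with_retries` are hypothetical names invented for illustration, and the backoff sleep between attempts is deliberately left out. It only shows one way the decision "retry or give up" might be structured, assuming a timeout is retried only when the request method is safe.

```rust
use std::time::{Duration, Instant};

/// Hypothetical failure categories, loosely mirroring the doc comment's list.
#[derive(Debug, Clone, Copy, PartialEq)]
enum Failure {
    /// The server answered with an error status code.
    Status(u16),
    /// The connection could not be established.
    Connect,
    /// The connection was dropped mid-request.
    ConnectionDropped,
    /// The client-side timeout elapsed.
    Timeout,
}

/// Decides whether a failure falls into a retryable category.
fn should_retry(failure: Failure, method_is_safe: bool) -> bool {
    match failure {
        // Only 5xx statuses are retried; 4xx are treated as final.
        Failure::Status(code) => (500..600).contains(&code),
        Failure::Connect | Failure::ConnectionDropped => true,
        // Timeouts are retried only for safe / read-only requests.
        Failure::Timeout => method_is_safe,
    }
}

/// Calls `attempt` until it succeeds, hits a non-retryable failure,
/// or exhausts `max_retries` / `retry_timeout`.
/// On failure, returns the last failure and the number of retries performed.
fn run_with_retries<F>(
    mut attempt: F,
    method_is_safe: bool,
    max_retries: usize,
    retry_timeout: Duration,
) -> Result<u16, (Failure, usize)>
where
    F: FnMut() -> Result<u16, Failure>,
{
    let start = Instant::now();
    let mut retries = 0;
    loop {
        match attempt() {
            Ok(status) => return Ok(status),
            Err(failure) => {
                let exhausted = retries == max_retries || start.elapsed() > retry_timeout;
                if exhausted || !should_retry(failure, method_is_safe) {
                    return Err((failure, retries));
                }
                retries += 1;
                // A real client would sleep here using backoff with jitter.
            }
        }
    }
}
```

Under these assumptions, a safe request that times out twice and then succeeds returns `Ok`, while a non-safe request that times out fails immediately with zero retries, which matches the shape of the "Error after 0 retries" assertion in the PR's PUT test.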
@@ -173,13 +181,16 @@ impl RetryExt for reqwest::RequestBuilder {
         let max_retries = config.max_retries;
         let retry_timeout = config.retry_timeout;

+        let (client, req) = self.build_split();
+        let req = req.expect("request must be valid");
Comment from the PR author:

Previously this would have panicked in `try_clone` below, so this doesn't change the behaviour. The clients shouldn't be constructing requests that fail validation, so panicking here is the "correct" thing to do.


+
         async move {
             let mut retries = 0;
             let now = Instant::now();

             loop {
-                let s = self.try_clone().expect("request body must be cloneable");
-                match s.send().await {
+                let s = req.try_clone().expect("request body must be cloneable");
+                match client.execute(s).await {
                     Ok(r) => match r.error_for_status_ref() {
                         Ok(_) if r.status().is_success() => return Ok(r),
                         Ok(r) if r.status() == StatusCode::NOT_MODIFIED => {
@@ -242,7 +253,9 @@ impl RetryExt for reqwest::RequestBuilder {
                     Err(e) =>
                     {
                         let mut do_retry = false;
-                        if let Some(source) = e.source() {
+                        if req.method().is_safe() && e.is_timeout() {
Comment from the PR author:

Technically we could retry PUT requests as they are idempotent (that is, even if repeated they yield the same server-side state). However, in the presence of request preconditions I think this behaviour might be surprising to users, even if it is technically correct.
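
The distinction this comment draws between safe and idempotent methods follows RFC 7231 section 4.2, and can be sketched without any dependencies. The `Method` enum and `retry_on_timeout` below are hypothetical stand-ins written for illustration; the PR itself calls `req.method().is_safe()` on the request's real method type.

```rust
/// Hypothetical stand-in for an HTTP method type.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Method {
    Get,
    Head,
    Options,
    Trace,
    Put,
    Delete,
    Post,
    Patch,
}

impl Method {
    /// Safe methods are essentially read-only (RFC 7231, section 4.2.1).
    fn is_safe(self) -> bool {
        matches!(
            self,
            Method::Get | Method::Head | Method::Options | Method::Trace
        )
    }

    /// Idempotent methods may be repeated with the same intended effect
    /// (RFC 7231, section 4.2.2): all safe methods, plus PUT and DELETE.
    fn is_idempotent(self) -> bool {
        self.is_safe() || matches!(self, Method::Put | Method::Delete)
    }
}

/// The conservative policy described in the comment: retry a timed-out
/// request only if its method is safe, not merely idempotent.
fn retry_on_timeout(method: Method) -> bool {
    method.is_safe()
}
```

With this policy a timed-out `Method::Put` is not retried even though `is_idempotent` returns `true` for it, which is the trade-off the comment describes for requests carrying preconditions.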

+                            do_retry = true
+                        } else if let Some(source) = e.source() {
                             if let Some(e) = source.downcast_ref::<hyper::Error>() {
                                 if e.is_connect() || e.is_closed() || e.is_incomplete_message() {
                                     do_retry = true;
@@ -294,7 +307,11 @@ mod tests {
             retry_timeout: Duration::from_secs(1000),
         };

-        let client = Client::new();
+        let client = Client::builder()
+            .timeout(Duration::from_millis(100))
+            .build()
+            .unwrap();
+
         let do_request = || client.request(Method::GET, mock.url()).send_retry(&retry);

         // Simple request should work
@@ -419,7 +436,7 @@ mod tests {

         let e = do_request().await.unwrap_err().to_string();
         assert!(
-            e.contains("Error after 2 retries in") &&
+            e.contains("Error after 2 retries in") &&
             e.contains("max_retries:2, retry_timeout:1000s, source:HTTP status server error (502 Bad Gateway) for url"),
             "{e}"
         );
@@ -442,6 +459,25 @@ mod tests {
             "{e}"
         );

+        // Retries on client timeout
+        mock.push_async_fn(|_| async move {
+            tokio::time::sleep(Duration::from_secs(10)).await;
+            panic!()
+        });
+        do_request().await.unwrap();
+
+        // Does not retry PUT request
+        mock.push_async_fn(|_| async move {
+            tokio::time::sleep(Duration::from_secs(10)).await;
+            panic!()
+        });
+        let res = client.request(Method::PUT, mock.url()).send_retry(&retry);
+        let e = res.await.unwrap_err().to_string();
+        assert!(
+            e.contains("Error after 0 retries in") && e.contains("operation timed out"),
+            "{e}"
+        );
+
         // Shutdown
         mock.shutdown().await
     }