diff --git a/object_store/src/aws/client.rs b/object_store/src/aws/client.rs index 4d101456fd16..838bef8ac23b 100644 --- a/object_store/src/aws/client.rs +++ b/object_store/src/aws/client.rs @@ -268,6 +268,7 @@ pub(crate) struct Request<'a> { builder: RequestBuilder, payload_sha256: Option>, use_session_creds: bool, + idempotent: bool, } impl<'a> Request<'a> { @@ -285,6 +286,11 @@ impl<'a> Request<'a> { Self { builder, ..self } } + pub fn set_idempotent(mut self, idempotent: bool) -> Self { + self.idempotent = idempotent; + self + } + pub async fn send(self) -> Result { let credential = match self.use_session_creds { true => self.config.get_session_credential().await?, @@ -298,7 +304,7 @@ impl<'a> Request<'a> { let path = self.path.as_ref(); self.builder .with_aws_sigv4(credential.authorizer(), self.payload_sha256.as_deref()) - .send_retry(&self.config.retry_config) + .send_retry_with_idempotency(&self.config.retry_config, self.idempotent) .await .context(RetrySnafu { path }) } @@ -360,6 +366,7 @@ impl S3Client { payload_sha256, config: &self.config, use_session_creds: true, + idempotent: false, } } @@ -462,7 +469,7 @@ impl S3Client { .header(CONTENT_TYPE, "application/xml") .body(body) .with_aws_sigv4(credential.authorizer(), payload_sha256.as_deref()) - .send_retry(&self.config.retry_config) + .send_retry_with_idempotency(&self.config.retry_config, false) .await .context(DeleteObjectsRequestSnafu {})? .bytes() @@ -510,6 +517,7 @@ impl S3Client { config: &self.config, payload_sha256: None, use_session_creds: false, + idempotent: false, } } @@ -522,7 +530,7 @@ impl S3Client { .request(Method::POST, url) .headers(self.config.encryption_headers.clone().into()) .with_aws_sigv4(credential.authorizer(), None) - .send_retry(&self.config.retry_config) + .send_retry_with_idempotency(&self.config.retry_config, true) .await .context(CreateMultipartRequestSnafu)? .bytes() @@ -547,6 +555,7 @@ impl S3Client { let response = self .put_request(path, data, false) .query(&[("partNumber", &part), ("uploadId", upload_id)]) + .set_idempotent(true) .send() .await?; @@ -582,7 +591,7 @@ impl S3Client { .query(&[("uploadId", upload_id)]) .body(body) .with_aws_sigv4(credential.authorizer(), None) - .send_retry(&self.config.retry_config) + .send_retry_with_idempotency(&self.config.retry_config, true) .await .context(CompleteMultipartRequestSnafu)?; diff --git a/object_store/src/aws/credential.rs b/object_store/src/aws/credential.rs index 478e56dd09c2..a7d1a9772aa1 100644 --- a/object_store/src/aws/credential.rs +++ b/object_store/src/aws/credential.rs @@ -517,7 +517,7 @@ async fn instance_creds( let token_result = client .request(Method::PUT, token_url) .header("X-aws-ec2-metadata-token-ttl-seconds", "600") // 10 minute TTL - .send_retry(retry_config) + .send_retry_with_idempotency(retry_config, true) .await; let token = match token_result { @@ -607,7 +607,7 @@ async fn web_identity( ("Version", "2011-06-15"), ("WebIdentityToken", &token), ]) - .send_retry(retry_config) + .send_retry_with_idempotency(retry_config, true) .await? .bytes() .await?; diff --git a/object_store/src/aws/dynamo.rs b/object_store/src/aws/dynamo.rs index 2390187e7f72..2e60bbad2226 100644 --- a/object_store/src/aws/dynamo.rs +++ b/object_store/src/aws/dynamo.rs @@ -186,7 +186,11 @@ impl DynamoCommit { to: &Path, ) -> Result<()> { self.conditional_op(client, to, None, || async { - client.copy_request(from, to).send().await?; + client + .copy_request(from, to) + .set_idempotent(false) + .send() + .await?; Ok(()) }) .await diff --git a/object_store/src/aws/mod.rs b/object_store/src/aws/mod.rs index 76d01d597042..16af4d3b4107 100644 --- a/object_store/src/aws/mod.rs +++ b/object_store/src/aws/mod.rs @@ -159,7 +159,7 @@ impl ObjectStore for AmazonS3 { } match (opts.mode, &self.client.config.conditional_put) { - (PutMode::Overwrite, _) => request.do_put().await, + (PutMode::Overwrite, _) => request.set_idempotent(true).do_put().await, (PutMode::Create | PutMode::Update(_), None) => Err(Error::NotImplemented), (PutMode::Create, Some(S3ConditionalPut::ETagMatch)) => { match request.header(&IF_NONE_MATCH, "*").do_put().await { @@ -268,7 +268,11 @@ impl ObjectStore for AmazonS3 { } async fn copy(&self, from: &Path, to: &Path) -> Result<()> { - self.client.copy_request(from, to).send().await?; + self.client + .copy_request(from, to) + .set_idempotent(true) + .send() + .await?; Ok(()) } diff --git a/object_store/src/azure/client.rs b/object_store/src/azure/client.rs index 5be6658beff2..0e6af50fbf94 100644 --- a/object_store/src/azure/client.rs +++ b/object_store/src/azure/client.rs @@ -172,6 +172,7 @@ struct PutRequest<'a> { path: &'a Path, config: &'a AzureConfig, builder: RequestBuilder, + idempotent: bool, } impl<'a> PutRequest<'a> { @@ -185,12 +186,17 @@ impl<'a> PutRequest<'a> { Self { builder, ..self } } + fn set_idempotent(mut self, idempotent: bool) -> Self { + self.idempotent = idempotent; + self + } + async fn send(self) -> Result { let credential = self.config.get_credential().await?; let response = self .builder .with_azure_authorization(&credential, &self.config.account) - .send_retry(&self.config.retry_config) + .send_retry_with_idempotency(&self.config.retry_config, self.idempotent) .await .context(PutRequestSnafu { path: self.path.as_ref(), @@ -239,6 +245,7 @@ impl AzureClient { path, builder, config: &self.config, + idempotent: false, } } @@ -247,7 +254,7 @@ impl AzureClient { let builder = self.put_request(path, bytes); let builder = match &opts.mode { - PutMode::Overwrite => builder, + PutMode::Overwrite => builder.set_idempotent(true), PutMode::Create => builder.header(&IF_NONE_MATCH, "*"), PutMode::Update(v) => { let etag = v.e_tag.as_ref().context(MissingETagSnafu)?; @@ -271,6 +278,7 @@ impl AzureClient { self.put_request(path, data) .query(&[("comp", "block"), ("blockid", &block_id)]) + .set_idempotent(true) .send() .await?; @@ -287,6 +295,7 @@ impl AzureClient { let response = self .put_request(path, BlockList { blocks }.to_xml().into()) .query(&[("comp", "blocklist")]) + .set_idempotent(true) .send() .await?; @@ -340,7 +349,7 @@ impl AzureClient { builder .with_azure_authorization(&credential, &self.config.account) - .send_retry(&self.config.retry_config) + .send_retry_with_idempotency(&self.config.retry_config, true) .await .map_err(|err| err.error(STORE, from.to_string()))?; @@ -373,7 +382,7 @@ impl AzureClient { .body(body) .query(&[("restype", "service"), ("comp", "userdelegationkey")]) .with_azure_authorization(&credential, &self.config.account) - .send_retry(&self.config.retry_config) + .send_retry_with_idempotency(&self.config.retry_config, true) .await .context(DelegationKeyRequestSnafu)? .bytes() diff --git a/object_store/src/azure/credential.rs b/object_store/src/azure/credential.rs index 6dc3141b08c8..36845bd1d646 100644 --- a/object_store/src/azure/credential.rs +++ b/object_store/src/azure/credential.rs @@ -615,7 +615,7 @@ impl TokenProvider for ClientSecretOAuthProvider { ("scope", AZURE_STORAGE_SCOPE), ("grant_type", "client_credentials"), ]) - .send_retry(retry) + .send_retry_with_idempotency(retry, true) .await .context(TokenRequestSnafu)? .json() @@ -797,7 +797,7 @@ impl TokenProvider for WorkloadIdentityOAuthProvider { ("scope", AZURE_STORAGE_SCOPE), ("grant_type", "client_credentials"), ]) - .send_retry(retry) + .send_retry_with_idempotency(retry, true) .await .context(TokenRequestSnafu)? .json() diff --git a/object_store/src/client/retry.rs b/object_store/src/client/retry.rs index e4bb5c9e731a..f3fa7153e930 100644 --- a/object_store/src/client/retry.rs +++ b/object_store/src/client/retry.rs @@ -166,128 +166,83 @@ impl Default for RetryConfig { } } -pub trait RetryExt { - /// Dispatch a request with the given retry configuration - /// - /// # Panic - /// - /// This will panic if the request body is a stream - fn send_retry(self, config: &RetryConfig) -> BoxFuture<'static, Result>; -} - -impl RetryExt for reqwest::RequestBuilder { - fn send_retry(self, config: &RetryConfig) -> BoxFuture<'static, Result> { - let mut backoff = Backoff::new(&config.backoff); - let max_retries = config.max_retries; - let retry_timeout = config.retry_timeout; - - let (client, req) = self.build_split(); - let req = req.expect("request must be valid"); - - async move { - let mut retries = 0; - let now = Instant::now(); - - loop { - let s = req.try_clone().expect("request body must be cloneable"); - match client.execute(s).await { - Ok(r) => match r.error_for_status_ref() { - Ok(_) if r.status().is_success() => return Ok(r), - Ok(r) if r.status() == StatusCode::NOT_MODIFIED => { - return Err(Error::Client { +fn send_retry_impl( + builder: reqwest::RequestBuilder, + config: &RetryConfig, + is_idempotent: Option, +) -> BoxFuture<'static, Result> { + let mut backoff = Backoff::new(&config.backoff); + let max_retries = config.max_retries; + let retry_timeout = config.retry_timeout; + + let (client, req) = builder.build_split(); + let req = req.expect("request must be valid"); + let is_idempotent = is_idempotent.unwrap_or(req.method().is_safe()); + + async move { + let mut retries = 0; + let now = Instant::now(); + + loop { + let s = req.try_clone().expect("request body must be cloneable"); + match client.execute(s).await { + Ok(r) => match r.error_for_status_ref() { + Ok(_) if r.status().is_success() => return Ok(r), + Ok(r) if r.status() == StatusCode::NOT_MODIFIED => { + return Err(Error::Client { + body: None, + status: StatusCode::NOT_MODIFIED, + }) + } + Ok(r) => { + let is_bare_redirect = r.status().is_redirection() && !r.headers().contains_key(LOCATION); + return match is_bare_redirect { + true => Err(Error::BareRedirect), + // Not actually sure if this is reachable, but here for completeness + false => Err(Error::Client { body: None, - status: StatusCode::NOT_MODIFIED, + status: r.status(), }) } - Ok(r) => { - let is_bare_redirect = r.status().is_redirection() && !r.headers().contains_key(LOCATION); - return match is_bare_redirect { - true => Err(Error::BareRedirect), - // Not actually sure if this is reachable, but here for completeness - false => Err(Error::Client { - body: None, - status: r.status(), - }) - } - } - Err(e) => { - let status = r.status(); - if retries == max_retries - || now.elapsed() > retry_timeout - || !status.is_server_error() { - - return Err(match status.is_client_error() { - true => match r.text().await { - Ok(body) => { - Error::Client { - body: Some(body).filter(|b| !b.is_empty()), - status, - } - } - Err(e) => { - Error::Reqwest { - retries, - max_retries, - elapsed: now.elapsed(), - retry_timeout, - source: e, - } + } + Err(e) => { + let status = r.status(); + if retries == max_retries + || now.elapsed() > retry_timeout + || !status.is_server_error() { + + return Err(match status.is_client_error() { + true => match r.text().await { + Ok(body) => { + Error::Client { + body: Some(body).filter(|b| !b.is_empty()), + status, } } - false => Error::Reqwest { - retries, - max_retries, - elapsed: now.elapsed(), - retry_timeout, - source: e, + Err(e) => { + Error::Reqwest { + retries, + max_retries, + elapsed: now.elapsed(), + retry_timeout, + source: e, + } } - }); - } - - let sleep = backoff.next(); - retries += 1; - info!( - "Encountered server error, backing off for {} seconds, retry {} of {}: {}", - sleep.as_secs_f32(), - retries, - max_retries, - e, - ); - tokio::time::sleep(sleep).await; - } - }, - Err(e) => - { - let mut do_retry = false; - if e.is_connect() || ( req.method().is_safe() && e.is_timeout()) { - do_retry = true - } else { - let mut source = e.source(); - while let Some(e) = source { - if let Some(e) = e.downcast_ref::() { - do_retry = e.is_closed() || e.is_incomplete_message(); - break } - source = e.source(); - } + false => Error::Reqwest { + retries, + max_retries, + elapsed: now.elapsed(), + retry_timeout, + source: e, + } + }); } - if retries == max_retries - || now.elapsed() > retry_timeout - || !do_retry { - - return Err(Error::Reqwest { - retries, - max_retries, - elapsed: now.elapsed(), - retry_timeout, - source: e, - }) - } let sleep = backoff.next(); retries += 1; info!( - "Encountered transport error backing off for {} seconds, retry {} of {}: {}", + "Encountered server error, backing off for {} seconds, retry {} of {}: {}", sleep.as_secs_f32(), retries, max_retries, @@ -295,10 +250,102 @@ impl RetryExt for reqwest::RequestBuilder { ); tokio::time::sleep(sleep).await; } + }, + Err(e) => + { + let mut do_retry = false; + if e.is_connect() + || e.is_body() + || (e.is_request() && !e.is_timeout()) + || (is_idempotent && e.is_timeout()) { + do_retry = true + } else { + let mut source = e.source(); + while let Some(e) = source { + if let Some(e) = e.downcast_ref::() { + do_retry = e.is_closed() + || e.is_incomplete_message() + || e.is_body_write_aborted() + || (is_idempotent && e.is_timeout()); + break + } + if let Some(e) = e.downcast_ref::() { + if e.kind() == std::io::ErrorKind::TimedOut { + do_retry = is_idempotent; + } else { + do_retry = matches!( + e.kind(), + std::io::ErrorKind::ConnectionReset + | std::io::ErrorKind::ConnectionAborted + | std::io::ErrorKind::BrokenPipe + | std::io::ErrorKind::UnexpectedEof + ); + } + break; + } + source = e.source(); + } + } + + if retries == max_retries + || now.elapsed() > retry_timeout + || !do_retry { + + return Err(Error::Reqwest { + retries, + max_retries, + elapsed: now.elapsed(), + retry_timeout, + source: e, + }) + } + let sleep = backoff.next(); + retries += 1; + info!( + "Encountered transport error backing off for {} seconds, retry {} of {}: {}", + sleep.as_secs_f32(), + retries, + max_retries, + e, + ); + tokio::time::sleep(sleep).await; } } } - .boxed() + } + .boxed() +} + +pub trait RetryExt { + /// Dispatch a request with the given retry configuration + /// + /// # Panic + /// + /// This will panic if the request body is a stream + fn send_retry(self, config: &RetryConfig) -> BoxFuture<'static, Result>; + + /// Dispatch a request with the given retry configuration and idempotency + /// + /// # Panic + /// + /// This will panic if the request body is a stream + fn send_retry_with_idempotency( + self, + config: &RetryConfig, + is_idempotent: bool, + ) -> BoxFuture<'static, Result>; +} + +impl RetryExt for reqwest::RequestBuilder { + fn send_retry(self, config: &RetryConfig) -> BoxFuture<'static, Result> { + send_retry_impl(self, config, None) + } + fn send_retry_with_idempotency( + self, + config: &RetryConfig, + is_idempotent: bool, + ) -> BoxFuture<'static, Result> { + send_retry_impl(self, config, Some(is_idempotent)) } } diff --git a/object_store/src/gcp/client.rs b/object_store/src/gcp/client.rs index 3762915b0b0c..17404f9d5acd 100644 --- a/object_store/src/gcp/client.rs +++ b/object_store/src/gcp/client.rs @@ -173,6 +173,7 @@ pub struct PutRequest<'a> { path: &'a Path, config: &'a GoogleCloudStorageConfig, builder: RequestBuilder, + idempotent: bool, } impl<'a> PutRequest<'a> { @@ -186,12 +187,17 @@ impl<'a> PutRequest<'a> { Self { builder, ..self } } + fn set_idempotent(mut self, idempotent: bool) -> Self { + self.idempotent = idempotent; + self + } + async fn send(self) -> Result { let credential = self.config.credentials.get_credential().await?; let response = self .builder .bearer_auth(&credential.bearer) - .send_retry(&self.config.retry_config) + .send_retry_with_idempotency(&self.config.retry_config, self.idempotent) .await .context(PutRequestSnafu { path: self.path.as_ref(), @@ -281,7 +287,7 @@ impl GoogleCloudStorageClient { .post(&url) .bearer_auth(&credential.bearer) .json(&body) - .send_retry(&self.config.retry_config) + .send_retry_with_idempotency(&self.config.retry_config, true) .await .context(SignBlobRequestSnafu)?; @@ -329,6 +335,7 @@ impl GoogleCloudStorageClient { path, builder, config: &self.config, + idempotent: false, } } @@ -336,7 +343,7 @@ impl GoogleCloudStorageClient { let builder = self.put_request(path, data); let builder = match &opts.mode { - PutMode::Overwrite => builder, + PutMode::Overwrite => builder.set_idempotent(true), PutMode::Create => builder.header(&VERSION_MATCH, "0"), PutMode::Update(v) => { let etag = v.version.as_ref().context(MissingVersionSnafu)?; @@ -366,7 +373,12 @@ impl GoogleCloudStorageClient { ("partNumber", &format!("{}", part_idx + 1)), ("uploadId", upload_id), ]; - let result = self.put_request(path, data).query(query).send().await?; + let result = self + .put_request(path, data) + .query(query) + .set_idempotent(true) + .send() + .await?; Ok(PartId { content_id: result.e_tag.unwrap(), @@ -391,7 +403,7 @@ impl GoogleCloudStorageClient { .header(header::CONTENT_TYPE, content_type) .header(header::CONTENT_LENGTH, "0") .query(&[("uploads", "")]) - .send_retry(&self.config.retry_config) + .send_retry_with_idempotency(&self.config.retry_config, true) .await .context(PutRequestSnafu { path: path.as_ref(), @@ -432,7 +444,11 @@ impl GoogleCloudStorageClient { ) -> Result { if completed_parts.is_empty() { // GCS doesn't allow empty multipart uploads - let result = self.put_request(path, Default::default()).send().await?; + let result = self + .put_request(path, Default::default()) + .set_idempotent(true) + .send() + .await?; self.multipart_cleanup(path, multipart_id).await?; return Ok(result); } @@ -456,7 +472,7 @@ impl GoogleCloudStorageClient { .bearer_auth(&credential.bearer) .query(&[("uploadId", upload_id)]) .body(data) - .send_retry(&self.config.retry_config) + .send_retry_with_idempotency(&self.config.retry_config, true) .await .context(CompleteMultipartRequestSnafu)?; @@ -515,7 +531,7 @@ impl GoogleCloudStorageClient { // Needed if reqwest is compiled with native-tls instead of rustls-tls // See https://github.com/apache/arrow-rs/pull/3921 .header(header::CONTENT_LENGTH, 0) - .send_retry(&self.config.retry_config) + .send_retry_with_idempotency(&self.config.retry_config, !if_not_exists) .await .map_err(|err| match err.status() { Some(StatusCode::PRECONDITION_FAILED) => crate::Error::AlreadyExists { diff --git a/object_store/src/gcp/credential.rs b/object_store/src/gcp/credential.rs index fcd516a1bf1a..158716ce4c18 100644 --- a/object_store/src/gcp/credential.rs +++ b/object_store/src/gcp/credential.rs @@ -623,7 +623,7 @@ impl TokenProvider for AuthorizedUserCredentials { ("client_secret", &self.client_secret), ("refresh_token", &self.refresh_token), ]) - .send_retry(retry) + .send_retry_with_idempotency(retry, true) .await .context(TokenRequestSnafu)? .json::() @@ -709,12 +709,12 @@ impl GCSAuthorizer { /// Canonicalizes query parameters into the GCP canonical form /// form like: ///```plaintext - ///HTTP_VERB - ///PATH_TO_RESOURCE - ///CANONICAL_QUERY_STRING - ///CANONICAL_HEADERS + ///HTTP_VERB + ///PATH_TO_RESOURCE + ///CANONICAL_QUERY_STRING + ///CANONICAL_HEADERS /// - ///SIGNED_HEADERS + ///SIGNED_HEADERS ///PAYLOAD ///``` /// @@ -780,9 +780,9 @@ impl GCSAuthorizer { ///construct the string to sign ///form like: ///```plaintext - ///SIGNING_ALGORITHM - ///ACTIVE_DATETIME - ///CREDENTIAL_SCOPE + ///SIGNING_ALGORITHM + ///ACTIVE_DATETIME + ///CREDENTIAL_SCOPE ///HASHED_CANONICAL_REQUEST ///``` ///`ACTIVE_DATETIME` format:`YYYYMMDD'T'HHMMSS'Z'` diff --git a/object_store/src/http/client.rs b/object_store/src/http/client.rs index 8700775fb243..fdc8751c1ca1 100644 --- a/object_store/src/http/client.rs +++ b/object_store/src/http/client.rs @@ -189,7 +189,7 @@ impl Client { .client .request(method, url) .header("Depth", depth) - .send_retry(&self.retry_config) + .send_retry_with_idempotency(&self.retry_config, true) .await; let response = match result {