diff --git a/object_store/src/aws/client.rs b/object_store/src/aws/client.rs index aa9f6bf3320c..a22f9464a425 100644 --- a/object_store/src/aws/client.rs +++ b/object_store/src/aws/client.rs @@ -267,6 +267,7 @@ pub(crate) struct Request<'a> { builder: RequestBuilder, payload_sha256: Option>, use_session_creds: bool, + idempotent: bool, } impl<'a> Request<'a> { @@ -284,6 +285,11 @@ impl<'a> Request<'a> { Self { builder, ..self } } + pub fn set_idempotent(mut self, idempotent: bool) -> Self { + self.idempotent = idempotent; + self + } + pub async fn send(self) -> Result { let credential = match self.use_session_creds { true => self.config.get_session_credential().await?, @@ -297,7 +303,7 @@ impl<'a> Request<'a> { let path = self.path.as_ref(); self.builder .with_aws_sigv4(credential.authorizer(), self.payload_sha256.as_deref()) - .send_retry(&self.config.retry_config) + .send_retry_with_idempotency(&self.config.retry_config, self.idempotent) .await .context(RetrySnafu { path }) } @@ -359,6 +365,7 @@ impl S3Client { payload_sha256, config: &self.config, use_session_creds: true, + idempotent: false, } } @@ -461,7 +468,7 @@ impl S3Client { .header(CONTENT_TYPE, "application/xml") .body(body) .with_aws_sigv4(credential.authorizer(), payload_sha256.as_deref()) - .send_retry(&self.config.retry_config) + .send_retry_with_idempotency(&self.config.retry_config, false) .await .context(DeleteObjectsRequestSnafu {})? .bytes() @@ -509,6 +516,7 @@ impl S3Client { config: &self.config, payload_sha256: None, use_session_creds: false, + idempotent: false, } } @@ -521,7 +529,7 @@ impl S3Client { .request(Method::POST, url) .headers(self.config.encryption_headers.clone().into()) .with_aws_sigv4(credential.authorizer(), None) - .send_retry(&self.config.retry_config) + .send_retry_with_idempotency(&self.config.retry_config, true) .await .context(CreateMultipartRequestSnafu)? .bytes() @@ -546,6 +554,7 @@ impl S3Client { let response = self .put_request(path, data, false) .query(&[("partNumber", &part), ("uploadId", upload_id)]) + .set_idempotent(true) .send() .await?; @@ -581,7 +590,7 @@ impl S3Client { .query(&[("uploadId", upload_id)]) .body(body) .with_aws_sigv4(credential.authorizer(), None) - .send_retry(&self.config.retry_config) + .send_retry_with_idempotency(&self.config.retry_config, true) .await .context(CompleteMultipartRequestSnafu)?; diff --git a/object_store/src/aws/credential.rs b/object_store/src/aws/credential.rs index f8614f4f563c..baa40cd64b1c 100644 --- a/object_store/src/aws/credential.rs +++ b/object_store/src/aws/credential.rs @@ -534,7 +534,7 @@ async fn instance_creds( let token_result = client .request(Method::PUT, token_url) .header("X-aws-ec2-metadata-token-ttl-seconds", "600") // 10 minute TTL - .send_retry(retry_config) + .send_retry_with_idempotency(retry_config, true) .await; let token = match token_result { @@ -624,7 +624,7 @@ async fn web_identity( ("Version", "2011-06-15"), ("WebIdentityToken", &token), ]) - .send_retry(retry_config) + .send_retry_with_idempotency(retry_config, true) .await? .bytes() .await?; diff --git a/object_store/src/aws/dynamo.rs b/object_store/src/aws/dynamo.rs index 2390187e7f72..2e60bbad2226 100644 --- a/object_store/src/aws/dynamo.rs +++ b/object_store/src/aws/dynamo.rs @@ -186,7 +186,11 @@ impl DynamoCommit { to: &Path, ) -> Result<()> { self.conditional_op(client, to, None, || async { - client.copy_request(from, to).send().await?; + client + .copy_request(from, to) + .set_idempotent(false) + .send() + .await?; Ok(()) }) .await diff --git a/object_store/src/aws/mod.rs b/object_store/src/aws/mod.rs index b11f4513b6df..643c4eeb397c 100644 --- a/object_store/src/aws/mod.rs +++ b/object_store/src/aws/mod.rs @@ -171,7 +171,7 @@ impl ObjectStore for AmazonS3 { } match (opts.mode, &self.client.config.conditional_put) { - (PutMode::Overwrite, _) => request.do_put().await, + (PutMode::Overwrite, _) => request.set_idempotent(true).do_put().await, (PutMode::Create | PutMode::Update(_), None) => Err(Error::NotImplemented), (PutMode::Create, Some(S3ConditionalPut::ETagMatch)) => { match request.header(&IF_NONE_MATCH, "*").do_put().await { @@ -287,7 +287,11 @@ impl ObjectStore for AmazonS3 { } async fn copy(&self, from: &Path, to: &Path) -> Result<()> { - self.client.copy_request(from, to).send().await?; + self.client + .copy_request(from, to) + .set_idempotent(true) + .send() + .await?; Ok(()) } diff --git a/object_store/src/azure/client.rs b/object_store/src/azure/client.rs index 5be6658beff2..0e6af50fbf94 100644 --- a/object_store/src/azure/client.rs +++ b/object_store/src/azure/client.rs @@ -172,6 +172,7 @@ struct PutRequest<'a> { path: &'a Path, config: &'a AzureConfig, builder: RequestBuilder, + idempotent: bool, } impl<'a> PutRequest<'a> { @@ -185,12 +186,17 @@ impl<'a> PutRequest<'a> { Self { builder, ..self } } + fn set_idempotent(mut self, idempotent: bool) -> Self { + self.idempotent = idempotent; + self + } + async fn send(self) -> Result { let credential = self.config.get_credential().await?; let response = self .builder .with_azure_authorization(&credential, &self.config.account) - .send_retry(&self.config.retry_config) + .send_retry_with_idempotency(&self.config.retry_config, self.idempotent) .await .context(PutRequestSnafu { path: self.path.as_ref(), @@ -239,6 +245,7 @@ impl AzureClient { path, builder, config: &self.config, + idempotent: false, } } @@ -247,7 +254,7 @@ impl AzureClient { let builder = self.put_request(path, bytes); let builder = match &opts.mode { - PutMode::Overwrite => builder, + PutMode::Overwrite => builder.set_idempotent(true), PutMode::Create => builder.header(&IF_NONE_MATCH, "*"), PutMode::Update(v) => { let etag = v.e_tag.as_ref().context(MissingETagSnafu)?; @@ -271,6 +278,7 @@ impl AzureClient { self.put_request(path, data) .query(&[("comp", "block"), ("blockid", &block_id)]) + .set_idempotent(true) .send() .await?; @@ -287,6 +295,7 @@ impl AzureClient { let response = self .put_request(path, BlockList { blocks }.to_xml().into()) .query(&[("comp", "blocklist")]) + .set_idempotent(true) .send() .await?; @@ -340,7 +349,7 @@ impl AzureClient { builder .with_azure_authorization(&credential, &self.config.account) - .send_retry(&self.config.retry_config) + .send_retry_with_idempotency(&self.config.retry_config, true) .await .map_err(|err| err.error(STORE, from.to_string()))?; @@ -373,7 +382,7 @@ impl AzureClient { .body(body) .query(&[("restype", "service"), ("comp", "userdelegationkey")]) .with_azure_authorization(&credential, &self.config.account) - .send_retry(&self.config.retry_config) + .send_retry_with_idempotency(&self.config.retry_config, true) .await .context(DelegationKeyRequestSnafu)? .bytes() diff --git a/object_store/src/azure/credential.rs b/object_store/src/azure/credential.rs index 9360831974ca..beefe75a5be5 100644 --- a/object_store/src/azure/credential.rs +++ b/object_store/src/azure/credential.rs @@ -615,7 +615,7 @@ impl TokenProvider for ClientSecretOAuthProvider { ("scope", AZURE_STORAGE_SCOPE), ("grant_type", "client_credentials"), ]) - .send_retry(retry) + .send_retry_with_idempotency(retry, true) .await .context(TokenRequestSnafu)? .json() @@ -797,7 +797,7 @@ impl TokenProvider for WorkloadIdentityOAuthProvider { ("scope", AZURE_STORAGE_SCOPE), ("grant_type", "client_credentials"), ]) - .send_retry(retry) + .send_retry_with_idempotency(retry, true) .await .context(TokenRequestSnafu)? .json() diff --git a/object_store/src/client/retry.rs b/object_store/src/client/retry.rs index fbd3645d2780..e305c5e36313 100644 --- a/object_store/src/client/retry.rs +++ b/object_store/src/client/retry.rs @@ -166,125 +166,83 @@ impl Default for RetryConfig { } } -pub trait RetryExt { - /// Dispatch a request with the given retry configuration - /// - /// # Panic - /// - /// This will panic if the request body is a stream - fn send_retry(self, config: &RetryConfig) -> BoxFuture<'static, Result>; -} - -impl RetryExt for reqwest::RequestBuilder { - fn send_retry(self, config: &RetryConfig) -> BoxFuture<'static, Result> { - let mut backoff = Backoff::new(&config.backoff); - let max_retries = config.max_retries; - let retry_timeout = config.retry_timeout; - - let (client, req) = self.build_split(); - let req = req.expect("request must be valid"); - - async move { - let mut retries = 0; - let now = Instant::now(); - - loop { - let s = req.try_clone().expect("request body must be cloneable"); - match client.execute(s).await { - Ok(r) => match r.error_for_status_ref() { - Ok(_) if r.status().is_success() => return Ok(r), - Ok(r) if r.status() == StatusCode::NOT_MODIFIED => { - return Err(Error::Client { +fn send_retry_impl( + builder: reqwest::RequestBuilder, + config: &RetryConfig, + is_idempotent: Option, +) -> BoxFuture<'static, Result> { + let mut backoff = Backoff::new(&config.backoff); + let max_retries = config.max_retries; + let retry_timeout = config.retry_timeout; + + let (client, req) = builder.build_split(); + let req = req.expect("request must be valid"); + let is_idempotent = is_idempotent.unwrap_or(req.method().is_safe()); + + async move { + let mut retries = 0; + let now = Instant::now(); + + loop { + let s = req.try_clone().expect("request body must be cloneable"); + match client.execute(s).await { + Ok(r) => match r.error_for_status_ref() { + Ok(_) if r.status().is_success() => return Ok(r), + Ok(r) if r.status() == StatusCode::NOT_MODIFIED => { + return Err(Error::Client { + body: None, + status: StatusCode::NOT_MODIFIED, + }) + } + Ok(r) => { + let is_bare_redirect = r.status().is_redirection() && !r.headers().contains_key(LOCATION); + return match is_bare_redirect { + true => Err(Error::BareRedirect), + // Not actually sure if this is reachable, but here for completeness + false => Err(Error::Client { body: None, - status: StatusCode::NOT_MODIFIED, + status: r.status(), }) } - Ok(r) => { - let is_bare_redirect = r.status().is_redirection() && !r.headers().contains_key(LOCATION); - return match is_bare_redirect { - true => Err(Error::BareRedirect), - // Not actually sure if this is reachable, but here for completeness - false => Err(Error::Client { - body: None, - status: r.status(), - }) - } - } - Err(e) => { - let status = r.status(); - if retries == max_retries - || now.elapsed() > retry_timeout - || !status.is_server_error() { - - return Err(match status.is_client_error() { - true => match r.text().await { - Ok(body) => { - Error::Client { - body: Some(body).filter(|b| !b.is_empty()), - status, - } - } - Err(e) => { - Error::Reqwest { - retries, - max_retries, - elapsed: now.elapsed(), - retry_timeout, - source: e, - } + } + Err(e) => { + let status = r.status(); + if retries == max_retries + || now.elapsed() > retry_timeout + || !status.is_server_error() { + + return Err(match status.is_client_error() { + true => match r.text().await { + Ok(body) => { + Error::Client { + body: Some(body).filter(|b| !b.is_empty()), + status, } } - false => Error::Reqwest { - retries, - max_retries, - elapsed: now.elapsed(), - retry_timeout, - source: e, + Err(e) => { + Error::Reqwest { + retries, + max_retries, + elapsed: now.elapsed(), + retry_timeout, + source: e, + } } - }); - } - - let sleep = backoff.next(); - retries += 1; - info!( - "Encountered server error, backing off for {} seconds, retry {} of {}: {}", - sleep.as_secs_f32(), - retries, - max_retries, - e, - ); - tokio::time::sleep(sleep).await; - } - }, - Err(e) => - { - let mut do_retry = false; - if req.method().is_safe() && e.is_timeout() { - do_retry = true - } else if let Some(source) = e.source() { - if let Some(e) = source.downcast_ref::() { - if e.is_connect() || e.is_closed() || e.is_incomplete_message() { - do_retry = true; } - } + false => Error::Reqwest { + retries, + max_retries, + elapsed: now.elapsed(), + retry_timeout, + source: e, + } + }); } - if retries == max_retries - || now.elapsed() > retry_timeout - || !do_retry { - - return Err(Error::Reqwest { - retries, - max_retries, - elapsed: now.elapsed(), - retry_timeout, - source: e, - }) - } let sleep = backoff.next(); retries += 1; info!( - "Encountered transport error backing off for {} seconds, retry {} of {}: {}", + "Encountered server error, backing off for {} seconds, retry {} of {}: {}", sleep.as_secs_f32(), retries, max_retries, @@ -292,10 +250,102 @@ impl RetryExt for reqwest::RequestBuilder { ); tokio::time::sleep(sleep).await; } + }, + Err(e) => + { + let mut do_retry = false; + if e.is_connect() + || e.is_body() + || (e.is_request() && !e.is_timeout()) + || (is_idempotent && e.is_timeout()) { + do_retry = true + } else { + let mut source = e.source(); + while let Some(e) = source { + if let Some(e) = e.downcast_ref::() { + do_retry = e.is_closed() + || e.is_incomplete_message() + || e.is_body_write_aborted() + || (is_idempotent && e.is_timeout()); + break + } + if let Some(e) = e.downcast_ref::() { + if e.kind() == std::io::ErrorKind::TimedOut { + do_retry = is_idempotent; + } else { + do_retry = matches!( + e.kind(), + std::io::ErrorKind::ConnectionReset + | std::io::ErrorKind::ConnectionAborted + | std::io::ErrorKind::BrokenPipe + | std::io::ErrorKind::UnexpectedEof + ); + } + break; + } + source = e.source(); + } + } + + if retries == max_retries + || now.elapsed() > retry_timeout + || !do_retry { + + return Err(Error::Reqwest { + retries, + max_retries, + elapsed: now.elapsed(), + retry_timeout, + source: e, + }) + } + let sleep = backoff.next(); + retries += 1; + info!( + "Encountered transport error backing off for {} seconds, retry {} of {}: {}", + sleep.as_secs_f32(), + retries, + max_retries, + e, + ); + tokio::time::sleep(sleep).await; } } } - .boxed() + } + .boxed() +} + +pub trait RetryExt { + /// Dispatch a request with the given retry configuration + /// + /// # Panic + /// + /// This will panic if the request body is a stream + fn send_retry(self, config: &RetryConfig) -> BoxFuture<'static, Result>; + + /// Dispatch a request with the given retry configuration and idempotency + /// + /// # Panic + /// + /// This will panic if the request body is a stream + fn send_retry_with_idempotency( + self, + config: &RetryConfig, + is_idempotent: bool, + ) -> BoxFuture<'static, Result>; +} + +impl RetryExt for reqwest::RequestBuilder { + fn send_retry(self, config: &RetryConfig) -> BoxFuture<'static, Result> { + send_retry_impl(self, config, None) + } + fn send_retry_with_idempotency( + self, + config: &RetryConfig, + is_idempotent: bool, + ) -> BoxFuture<'static, Result> { + send_retry_impl(self, config, Some(is_idempotent)) } } diff --git a/object_store/src/gcp/client.rs b/object_store/src/gcp/client.rs index e4b0f9af7d15..2bd061cc5184 100644 --- a/object_store/src/gcp/client.rs +++ b/object_store/src/gcp/client.rs @@ -135,6 +135,7 @@ pub struct PutRequest<'a> { path: &'a Path, config: &'a GoogleCloudStorageConfig, builder: RequestBuilder, + idempotent: bool, } impl<'a> PutRequest<'a> { @@ -148,12 +149,17 @@ impl<'a> PutRequest<'a> { Self { builder, ..self } } + fn set_idempotent(mut self, idempotent: bool) -> Self { + self.idempotent = idempotent; + self + } + async fn send(self) -> Result { let credential = self.config.credentials.get_credential().await?; let response = self .builder .bearer_auth(&credential.bearer) - .send_retry(&self.config.retry_config) + .send_retry_with_idempotency(&self.config.retry_config, self.idempotent) .await .context(PutRequestSnafu { path: self.path.as_ref(), @@ -228,6 +234,7 @@ impl GoogleCloudStorageClient { path, builder, config: &self.config, + idempotent: false, } } @@ -235,7 +242,7 @@ impl GoogleCloudStorageClient { let builder = self.put_request(path, data); let builder = match &opts.mode { - PutMode::Overwrite => builder, + PutMode::Overwrite => builder.set_idempotent(true), PutMode::Create => builder.header(&VERSION_MATCH, "0"), PutMode::Update(v) => { let etag = v.version.as_ref().context(MissingVersionSnafu)?; @@ -265,7 +272,12 @@ impl GoogleCloudStorageClient { ("partNumber", &format!("{}", part_idx + 1)), ("uploadId", upload_id), ]; - let result = self.put_request(path, data).query(query).send().await?; + let result = self + .put_request(path, data) + .query(query) + .set_idempotent(true) + .send() + .await?; Ok(PartId { content_id: result.e_tag.unwrap(), @@ -290,7 +302,7 @@ impl GoogleCloudStorageClient { .header(header::CONTENT_TYPE, content_type) .header(header::CONTENT_LENGTH, "0") .query(&[("uploads", "")]) - .send_retry(&self.config.retry_config) + .send_retry_with_idempotency(&self.config.retry_config, true) .await .context(PutRequestSnafu { path: path.as_ref(), @@ -329,6 +341,17 @@ impl GoogleCloudStorageClient { multipart_id: &MultipartId, completed_parts: Vec, ) -> Result { + if completed_parts.is_empty() { + // GCS doesn't allow empty multipart uploads + let result = self + .put_request(path, Default::default()) + .set_idempotent(true) + .send() + .await?; + self.multipart_cleanup(path, multipart_id).await?; + return Ok(result); + } + let upload_id = multipart_id.clone(); let url = self.object_url(path); @@ -348,7 +371,7 @@ impl GoogleCloudStorageClient { .bearer_auth(&credential.bearer) .query(&[("uploadId", upload_id)]) .body(data) - .send_retry(&self.config.retry_config) + .send_retry_with_idempotency(&self.config.retry_config, true) .await .context(CompleteMultipartRequestSnafu)?; @@ -407,7 +430,7 @@ impl GoogleCloudStorageClient { // Needed if reqwest is compiled with native-tls instead of rustls-tls // See https://github.com/apache/arrow-rs/pull/3921 .header(header::CONTENT_LENGTH, 0) - .send_retry(&self.config.retry_config) + .send_retry_with_idempotency(&self.config.retry_config, !if_not_exists) .await .map_err(|err| match err.status() { Some(StatusCode::PRECONDITION_FAILED) => crate::Error::AlreadyExists { diff --git a/object_store/src/gcp/credential.rs b/object_store/src/gcp/credential.rs index dc504da05723..486e9108d065 100644 --- a/object_store/src/gcp/credential.rs +++ b/object_store/src/gcp/credential.rs @@ -441,7 +441,7 @@ impl TokenProvider for AuthorizedUserCredentials { ("client_secret", &self.client_secret), ("refresh_token", &self.refresh_token), ]) - .send_retry(retry) + .send_retry_with_idempotency(retry, true) .await .context(TokenRequestSnafu)? .json::() diff --git a/object_store/src/http/client.rs b/object_store/src/http/client.rs index 8700775fb243..fdc8751c1ca1 100644 --- a/object_store/src/http/client.rs +++ b/object_store/src/http/client.rs @@ -189,7 +189,7 @@ impl Client { .client .request(method, url) .header("Depth", depth) - .send_retry(&self.retry_config) + .send_retry_with_idempotency(&self.retry_config, true) .await; let response = match result {