Skip to content

Commit

Permalink
[backport] Adds send_retry_with_idempotency and retry more kinds of t…
Browse files Browse the repository at this point in the history
…ransport errors (apache#5609)
  • Loading branch information
andrebsguedes committed Apr 9, 2024
1 parent 386bf49 commit 95eacf4
Show file tree
Hide file tree
Showing 10 changed files with 230 additions and 131 deletions.
17 changes: 13 additions & 4 deletions object_store/src/aws/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,7 @@ pub(crate) struct Request<'a> {
builder: RequestBuilder,
payload_sha256: Option<Vec<u8>>,
use_session_creds: bool,
idempotent: bool,
}

impl<'a> Request<'a> {
Expand All @@ -284,6 +285,11 @@ impl<'a> Request<'a> {
Self { builder, ..self }
}

pub fn set_idempotent(mut self, idempotent: bool) -> Self {
self.idempotent = idempotent;
self
}

pub async fn send(self) -> Result<Response, RequestError> {
let credential = match self.use_session_creds {
true => self.config.get_session_credential().await?,
Expand All @@ -297,7 +303,7 @@ impl<'a> Request<'a> {
let path = self.path.as_ref();
self.builder
.with_aws_sigv4(credential.authorizer(), self.payload_sha256.as_deref())
.send_retry(&self.config.retry_config)
.send_retry_with_idempotency(&self.config.retry_config, self.idempotent)
.await
.context(RetrySnafu { path })
}
Expand Down Expand Up @@ -359,6 +365,7 @@ impl S3Client {
payload_sha256,
config: &self.config,
use_session_creds: true,
idempotent: false,
}
}

Expand Down Expand Up @@ -461,7 +468,7 @@ impl S3Client {
.header(CONTENT_TYPE, "application/xml")
.body(body)
.with_aws_sigv4(credential.authorizer(), payload_sha256.as_deref())
.send_retry(&self.config.retry_config)
.send_retry_with_idempotency(&self.config.retry_config, false)
.await
.context(DeleteObjectsRequestSnafu {})?
.bytes()
Expand Down Expand Up @@ -509,6 +516,7 @@ impl S3Client {
config: &self.config,
payload_sha256: None,
use_session_creds: false,
idempotent: false,
}
}

Expand All @@ -521,7 +529,7 @@ impl S3Client {
.request(Method::POST, url)
.headers(self.config.encryption_headers.clone().into())
.with_aws_sigv4(credential.authorizer(), None)
.send_retry(&self.config.retry_config)
.send_retry_with_idempotency(&self.config.retry_config, true)
.await
.context(CreateMultipartRequestSnafu)?
.bytes()
Expand All @@ -546,6 +554,7 @@ impl S3Client {
let response = self
.put_request(path, data, false)
.query(&[("partNumber", &part), ("uploadId", upload_id)])
.set_idempotent(true)
.send()
.await?;

Expand Down Expand Up @@ -581,7 +590,7 @@ impl S3Client {
.query(&[("uploadId", upload_id)])
.body(body)
.with_aws_sigv4(credential.authorizer(), None)
.send_retry(&self.config.retry_config)
.send_retry_with_idempotency(&self.config.retry_config, true)
.await
.context(CompleteMultipartRequestSnafu)?;

Expand Down
4 changes: 2 additions & 2 deletions object_store/src/aws/credential.rs
Original file line number Diff line number Diff line change
Expand Up @@ -534,7 +534,7 @@ async fn instance_creds(
let token_result = client
.request(Method::PUT, token_url)
.header("X-aws-ec2-metadata-token-ttl-seconds", "600") // 10 minute TTL
.send_retry(retry_config)
.send_retry_with_idempotency(retry_config, true)
.await;

let token = match token_result {
Expand Down Expand Up @@ -624,7 +624,7 @@ async fn web_identity(
("Version", "2011-06-15"),
("WebIdentityToken", &token),
])
.send_retry(retry_config)
.send_retry_with_idempotency(retry_config, true)
.await?
.bytes()
.await?;
Expand Down
6 changes: 5 additions & 1 deletion object_store/src/aws/dynamo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,11 @@ impl DynamoCommit {
to: &Path,
) -> Result<()> {
self.conditional_op(client, to, None, || async {
client.copy_request(from, to).send().await?;
client
.copy_request(from, to)
.set_idempotent(false)
.send()
.await?;
Ok(())
})
.await
Expand Down
8 changes: 6 additions & 2 deletions object_store/src/aws/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ impl ObjectStore for AmazonS3 {
}

match (opts.mode, &self.client.config.conditional_put) {
(PutMode::Overwrite, _) => request.do_put().await,
(PutMode::Overwrite, _) => request.set_idempotent(true).do_put().await,
(PutMode::Create | PutMode::Update(_), None) => Err(Error::NotImplemented),
(PutMode::Create, Some(S3ConditionalPut::ETagMatch)) => {
match request.header(&IF_NONE_MATCH, "*").do_put().await {
Expand Down Expand Up @@ -287,7 +287,11 @@ impl ObjectStore for AmazonS3 {
}

async fn copy(&self, from: &Path, to: &Path) -> Result<()> {
self.client.copy_request(from, to).send().await?;
self.client
.copy_request(from, to)
.set_idempotent(true)
.send()
.await?;
Ok(())
}

Expand Down
17 changes: 13 additions & 4 deletions object_store/src/azure/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,7 @@ struct PutRequest<'a> {
path: &'a Path,
config: &'a AzureConfig,
builder: RequestBuilder,
idempotent: bool,
}

impl<'a> PutRequest<'a> {
Expand All @@ -185,12 +186,17 @@ impl<'a> PutRequest<'a> {
Self { builder, ..self }
}

fn set_idempotent(mut self, idempotent: bool) -> Self {
self.idempotent = idempotent;
self
}

async fn send(self) -> Result<Response> {
let credential = self.config.get_credential().await?;
let response = self
.builder
.with_azure_authorization(&credential, &self.config.account)
.send_retry(&self.config.retry_config)
.send_retry_with_idempotency(&self.config.retry_config, self.idempotent)
.await
.context(PutRequestSnafu {
path: self.path.as_ref(),
Expand Down Expand Up @@ -239,6 +245,7 @@ impl AzureClient {
path,
builder,
config: &self.config,
idempotent: false,
}
}

Expand All @@ -247,7 +254,7 @@ impl AzureClient {
let builder = self.put_request(path, bytes);

let builder = match &opts.mode {
PutMode::Overwrite => builder,
PutMode::Overwrite => builder.set_idempotent(true),
PutMode::Create => builder.header(&IF_NONE_MATCH, "*"),
PutMode::Update(v) => {
let etag = v.e_tag.as_ref().context(MissingETagSnafu)?;
Expand All @@ -271,6 +278,7 @@ impl AzureClient {

self.put_request(path, data)
.query(&[("comp", "block"), ("blockid", &block_id)])
.set_idempotent(true)
.send()
.await?;

Expand All @@ -287,6 +295,7 @@ impl AzureClient {
let response = self
.put_request(path, BlockList { blocks }.to_xml().into())
.query(&[("comp", "blocklist")])
.set_idempotent(true)
.send()
.await?;

Expand Down Expand Up @@ -340,7 +349,7 @@ impl AzureClient {

builder
.with_azure_authorization(&credential, &self.config.account)
.send_retry(&self.config.retry_config)
.send_retry_with_idempotency(&self.config.retry_config, true)
.await
.map_err(|err| err.error(STORE, from.to_string()))?;

Expand Down Expand Up @@ -373,7 +382,7 @@ impl AzureClient {
.body(body)
.query(&[("restype", "service"), ("comp", "userdelegationkey")])
.with_azure_authorization(&credential, &self.config.account)
.send_retry(&self.config.retry_config)
.send_retry_with_idempotency(&self.config.retry_config, true)
.await
.context(DelegationKeyRequestSnafu)?
.bytes()
Expand Down
4 changes: 2 additions & 2 deletions object_store/src/azure/credential.rs
Original file line number Diff line number Diff line change
Expand Up @@ -615,7 +615,7 @@ impl TokenProvider for ClientSecretOAuthProvider {
("scope", AZURE_STORAGE_SCOPE),
("grant_type", "client_credentials"),
])
.send_retry(retry)
.send_retry_with_idempotency(retry, true)
.await
.context(TokenRequestSnafu)?
.json()
Expand Down Expand Up @@ -797,7 +797,7 @@ impl TokenProvider for WorkloadIdentityOAuthProvider {
("scope", AZURE_STORAGE_SCOPE),
("grant_type", "client_credentials"),
])
.send_retry(retry)
.send_retry_with_idempotency(retry, true)
.await
.context(TokenRequestSnafu)?
.json()
Expand Down
Loading

0 comments on commit 95eacf4

Please sign in to comment.