diff --git a/.github/workflows/object_store.yml b/.github/workflows/object_store.yml index 1857b330326..2c86e6c8eb2 100644 --- a/.github/workflows/object_store.yml +++ b/.github/workflows/object_store.yml @@ -138,7 +138,7 @@ jobs: - name: Setup LocalStack (AWS emulation) run: | - echo "LOCALSTACK_CONTAINER=$(docker run -d -p 4566:4566 localstack/localstack:3.8.1)" >> $GITHUB_ENV + echo "LOCALSTACK_CONTAINER=$(docker run -d -p 4566:4566 localstack/localstack:4.0.3)" >> $GITHUB_ENV echo "EC2_METADATA_CONTAINER=$(docker run -d -p 1338:1338 amazon/amazon-ec2-metadata-mock:v1.9.2 --imdsv2)" >> $GITHUB_ENV aws --endpoint-url=http://localhost:4566 s3 mb s3://test-bucket aws --endpoint-url=http://localhost:4566 dynamodb create-table --table-name test-table --key-schema AttributeName=path,KeyType=HASH AttributeName=etag,KeyType=RANGE --attribute-definitions AttributeName=path,AttributeType=S AttributeName=etag,AttributeType=S --provisioned-throughput ReadCapacityUnits=5,WriteCapacityUnits=5 @@ -164,7 +164,7 @@ jobs: - name: Run object_store tests (AWS native conditional put) run: cargo test --features=aws env: - AWS_CONDITIONAL_PUT: etag-put-if-not-exists + AWS_CONDITIONAL_PUT: etag AWS_COPY_IF_NOT_EXISTS: multipart - name: GCS Output diff --git a/object_store/src/aws/client.rs b/object_store/src/aws/client.rs index 51c917723ee..47249685b7b 100644 --- a/object_store/src/aws/client.rs +++ b/object_store/src/aws/client.rs @@ -290,6 +290,7 @@ pub(crate) struct Request<'a> { payload: Option, use_session_creds: bool, idempotent: bool, + retry_on_conflict: bool, retry_error_body: bool, } @@ -317,6 +318,13 @@ impl<'a> Request<'a> { Self { idempotent, ..self } } + pub(crate) fn retry_on_conflict(self, retry_on_conflict: bool) -> Self { + Self { + retry_on_conflict, + ..self + } + } + pub(crate) fn retry_error_body(self, retry_error_body: bool) -> Self { Self { retry_error_body, @@ -412,6 +420,7 @@ impl<'a> Request<'a> { self.builder .with_aws_sigv4(credential.authorizer(), sha) .retryable(&self.config.retry_config) + .retry_on_conflict(self.retry_on_conflict) .idempotent(self.idempotent) .retry_error_body(self.retry_error_body) .payload(self.payload) @@ -448,6 +457,7 @@ impl S3Client { config: &self.config, use_session_creds: true, idempotent: false, + retry_on_conflict: false, retry_error_body: false, } } diff --git a/object_store/src/aws/mod.rs b/object_store/src/aws/mod.rs index 81511bad7b0..d7c8c9b546e 100644 --- a/object_store/src/aws/mod.rs +++ b/object_store/src/aws/mod.rs @@ -170,10 +170,7 @@ impl ObjectStore for AmazonS3 { match (opts.mode, &self.client.config.conditional_put) { (PutMode::Overwrite, _) => request.idempotent(true).do_put().await, (PutMode::Create | PutMode::Update(_), None) => Err(Error::NotImplemented), - ( - PutMode::Create, - Some(S3ConditionalPut::ETagMatch | S3ConditionalPut::ETagPutIfNotExists), - ) => { + (PutMode::Create, Some(S3ConditionalPut::ETagMatch)) => { match request.header(&IF_NONE_MATCH, "*").do_put().await { // Technically If-None-Match should return NotModified but some stores, // such as R2, instead return PreconditionFailed @@ -197,9 +194,26 @@ impl ObjectStore for AmazonS3 { source: "ETag required for conditional put".to_string().into(), })?; match put { - S3ConditionalPut::ETagPutIfNotExists => Err(Error::NotImplemented), S3ConditionalPut::ETagMatch => { - request.header(&IF_MATCH, etag.as_str()).do_put().await + match request + .header(&IF_MATCH, etag.as_str()) + // Real S3 will occasionally report 409 Conflict + // if there are concurrent `If-Match` requests + // in flight, so we need to be prepared to retry + // 409 responses. + .retry_on_conflict(true) + .do_put() + .await + { + // Real S3 reports NotFound rather than PreconditionFailed when the + // object doesn't exist. Convert to PreconditionFailed for + // consistency with R2. This also matches what the HTTP spec + // says the behavior should be. + Err(Error::NotFound { path, source }) => { + Err(Error::Precondition { path, source }) + } + r => r, + } } S3ConditionalPut::Dynamo(d) => { d.conditional_op(&self.client, location, Some(&etag), move || { @@ -487,6 +501,7 @@ mod tests { let integration = config.build().unwrap(); let config = &integration.client.config; let test_not_exists = config.copy_if_not_exists.is_some(); + let test_conditional_put = config.conditional_put.is_some(); put_get_delete_list(&integration).await; get_opts(&integration).await; @@ -517,9 +532,8 @@ mod tests { if test_not_exists { copy_if_not_exists(&integration).await; } - if let Some(conditional_put) = &config.conditional_put { - let supports_update = !matches!(conditional_put, S3ConditionalPut::ETagPutIfNotExists); - put_opts(&integration, supports_update).await; + if test_conditional_put { + put_opts(&integration, true).await; } // run integration test with unsigned payload enabled diff --git a/object_store/src/aws/precondition.rs b/object_store/src/aws/precondition.rs index e5058052790..b261ad0dbfb 100644 --- a/object_store/src/aws/precondition.rs +++ b/object_store/src/aws/precondition.rs @@ -138,17 +138,6 @@ pub enum S3ConditionalPut { /// [HTTP precondition]: https://datatracker.ietf.org/doc/html/rfc9110#name-preconditions ETagMatch, - /// Like `ETagMatch`, but with support for `PutMode::Create` and not - /// `PutMode::Option`. - /// - /// This is the limited form of conditional put supported by Amazon S3 - /// as of August 2024 ([announcement]). - /// - /// Encoded as `etag-put-if-not-exists` ignoring whitespace. - /// - /// [announcement]: https://aws.amazon.com/about-aws/whats-new/2024/08/amazon-s3-conditional-writes/ - ETagPutIfNotExists, - /// The name of a DynamoDB table to use for coordination /// /// Encoded as either `dynamo:` or `dynamo::` @@ -164,7 +153,6 @@ impl std::fmt::Display for S3ConditionalPut { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { Self::ETagMatch => write!(f, "etag"), - Self::ETagPutIfNotExists => write!(f, "etag-put-if-not-exists"), Self::Dynamo(lock) => write!(f, "dynamo: {}", lock.table_name()), } } @@ -174,7 +162,6 @@ impl S3ConditionalPut { fn from_str(s: &str) -> Option { match s.trim() { "etag" => Some(Self::ETagMatch), - "etag-put-if-not-exists" => Some(Self::ETagPutIfNotExists), trimmed => match trimmed.split_once(':')? { ("dynamo", s) => Some(Self::Dynamo(DynamoCommit::from_str(s)?)), _ => None, diff --git a/object_store/src/client/retry.rs b/object_store/src/client/retry.rs index 601bffdec15..a8a8e58de4d 100644 --- a/object_store/src/client/retry.rs +++ b/object_store/src/client/retry.rs @@ -200,6 +200,7 @@ pub(crate) struct RetryableRequest { sensitive: bool, idempotent: Option, + retry_on_conflict: bool, payload: Option, retry_error_body: bool, @@ -217,6 +218,15 @@ impl RetryableRequest { } } + /// Set whether this request should be retried on a 409 Conflict response. + #[cfg(feature = "aws")] + pub(crate) fn retry_on_conflict(self, retry_on_conflict: bool) -> Self { + Self { + retry_on_conflict, + ..self + } + } + /// Set whether this request contains sensitive data /// /// This will avoid printing out the URL in error messages @@ -340,7 +350,8 @@ impl RetryableRequest { let status = r.status(); if retries == max_retries || now.elapsed() > retry_timeout - || !status.is_server_error() + || !(status.is_server_error() + || (self.retry_on_conflict && status == StatusCode::CONFLICT)) { return Err(match status.is_client_error() { true => match r.text().await { @@ -467,6 +478,7 @@ impl RetryExt for reqwest::RequestBuilder { idempotent: None, payload: None, sensitive: false, + retry_on_conflict: false, retry_error_body: false, } }