Skip to content

Commit

Permalink
object-store: remove S3ConditionalPut::ETagPutIfNotExists (#6802)
Browse files Browse the repository at this point in the history
* Support real S3's If-Match semantics

As of today [0] S3 now supports the If-Match for in-place conditional
writes. This commit adjusts the existing support for
S3ConditionalPut::Etag mode for compatibility with real S3's particular
semantics, which vary slightly from MinIO and R2. Specifically:

  * Real S3 can occasionally return 409 Conflict when concurrent
    If-Match requests are in progress. These requests need to be
    retried.

  * Real S3 returns 404 Not Found instead of 412 Precondition Failed
    when issuing an If-Match request against an object that does not
    exist.

Fix #6799.

[0]: https://aws.amazon.com/about-aws/whats-new/2024/11/amazon-s3-functionality-conditional-writes/

* object-store: remove S3ConditionalPut::ETagPutIfNotExists

Now that real S3 supports `If-Match`, we no longer need this special
conditional put mode for real S3.

* [XXX put in real release version] Upgrade localstack

* Update .github/workflows/object_store.yml

---------

Co-authored-by: Raphael Taylor-Davies <[email protected]>
  • Loading branch information
benesch and tustvold authored Nov 30, 2024
1 parent e2a2beb commit c60ce14
Show file tree
Hide file tree
Showing 5 changed files with 48 additions and 25 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/object_store.yml
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ jobs:

- name: Setup LocalStack (AWS emulation)
run: |
echo "LOCALSTACK_CONTAINER=$(docker run -d -p 4566:4566 localstack/localstack:3.8.1)" >> $GITHUB_ENV
echo "LOCALSTACK_CONTAINER=$(docker run -d -p 4566:4566 localstack/localstack:4.0.3)" >> $GITHUB_ENV
echo "EC2_METADATA_CONTAINER=$(docker run -d -p 1338:1338 amazon/amazon-ec2-metadata-mock:v1.9.2 --imdsv2)" >> $GITHUB_ENV
aws --endpoint-url=http://localhost:4566 s3 mb s3://test-bucket
aws --endpoint-url=http://localhost:4566 dynamodb create-table --table-name test-table --key-schema AttributeName=path,KeyType=HASH AttributeName=etag,KeyType=RANGE --attribute-definitions AttributeName=path,AttributeType=S AttributeName=etag,AttributeType=S --provisioned-throughput ReadCapacityUnits=5,WriteCapacityUnits=5
Expand All @@ -164,7 +164,7 @@ jobs:
- name: Run object_store tests (AWS native conditional put)
run: cargo test --features=aws
env:
AWS_CONDITIONAL_PUT: etag-put-if-not-exists
AWS_CONDITIONAL_PUT: etag
AWS_COPY_IF_NOT_EXISTS: multipart

- name: GCS Output
Expand Down
10 changes: 10 additions & 0 deletions object_store/src/aws/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,7 @@ pub(crate) struct Request<'a> {
payload: Option<PutPayload>,
use_session_creds: bool,
idempotent: bool,
retry_on_conflict: bool,
retry_error_body: bool,
}

Expand Down Expand Up @@ -317,6 +318,13 @@ impl<'a> Request<'a> {
Self { idempotent, ..self }
}

pub(crate) fn retry_on_conflict(self, retry_on_conflict: bool) -> Self {
Self {
retry_on_conflict,
..self
}
}

pub(crate) fn retry_error_body(self, retry_error_body: bool) -> Self {
Self {
retry_error_body,
Expand Down Expand Up @@ -412,6 +420,7 @@ impl<'a> Request<'a> {
self.builder
.with_aws_sigv4(credential.authorizer(), sha)
.retryable(&self.config.retry_config)
.retry_on_conflict(self.retry_on_conflict)
.idempotent(self.idempotent)
.retry_error_body(self.retry_error_body)
.payload(self.payload)
Expand Down Expand Up @@ -448,6 +457,7 @@ impl S3Client {
config: &self.config,
use_session_creds: true,
idempotent: false,
retry_on_conflict: false,
retry_error_body: false,
}
}
Expand Down
32 changes: 23 additions & 9 deletions object_store/src/aws/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -170,10 +170,7 @@ impl ObjectStore for AmazonS3 {
match (opts.mode, &self.client.config.conditional_put) {
(PutMode::Overwrite, _) => request.idempotent(true).do_put().await,
(PutMode::Create | PutMode::Update(_), None) => Err(Error::NotImplemented),
(
PutMode::Create,
Some(S3ConditionalPut::ETagMatch | S3ConditionalPut::ETagPutIfNotExists),
) => {
(PutMode::Create, Some(S3ConditionalPut::ETagMatch)) => {
match request.header(&IF_NONE_MATCH, "*").do_put().await {
// Technically If-None-Match should return NotModified but some stores,
// such as R2, instead return PreconditionFailed
Expand All @@ -197,9 +194,26 @@ impl ObjectStore for AmazonS3 {
source: "ETag required for conditional put".to_string().into(),
})?;
match put {
S3ConditionalPut::ETagPutIfNotExists => Err(Error::NotImplemented),
S3ConditionalPut::ETagMatch => {
request.header(&IF_MATCH, etag.as_str()).do_put().await
match request
.header(&IF_MATCH, etag.as_str())
// Real S3 will occasionally report 409 Conflict
// if there are concurrent `If-Match` requests
// in flight, so we need to be prepared to retry
// 409 responses.
.retry_on_conflict(true)
.do_put()
.await
{
// Real S3 reports NotFound rather than PreconditionFailed when the
// object doesn't exist. Convert to PreconditionFailed for
// consistency with R2. This also matches what the HTTP spec
// says the behavior should be.
Err(Error::NotFound { path, source }) => {
Err(Error::Precondition { path, source })
}
r => r,
}
}
S3ConditionalPut::Dynamo(d) => {
d.conditional_op(&self.client, location, Some(&etag), move || {
Expand Down Expand Up @@ -487,6 +501,7 @@ mod tests {
let integration = config.build().unwrap();
let config = &integration.client.config;
let test_not_exists = config.copy_if_not_exists.is_some();
let test_conditional_put = config.conditional_put.is_some();

put_get_delete_list(&integration).await;
get_opts(&integration).await;
Expand Down Expand Up @@ -517,9 +532,8 @@ mod tests {
if test_not_exists {
copy_if_not_exists(&integration).await;
}
if let Some(conditional_put) = &config.conditional_put {
let supports_update = !matches!(conditional_put, S3ConditionalPut::ETagPutIfNotExists);
put_opts(&integration, supports_update).await;
if test_conditional_put {
put_opts(&integration, true).await;
}

// run integration test with unsigned payload enabled
Expand Down
13 changes: 0 additions & 13 deletions object_store/src/aws/precondition.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,17 +138,6 @@ pub enum S3ConditionalPut {
/// [HTTP precondition]: https://datatracker.ietf.org/doc/html/rfc9110#name-preconditions
ETagMatch,

/// Like `ETagMatch`, but with support for `PutMode::Create` and not
/// `PutMode::Option`.
///
/// This is the limited form of conditional put supported by Amazon S3
/// as of August 2024 ([announcement]).
///
/// Encoded as `etag-put-if-not-exists` ignoring whitespace.
///
/// [announcement]: https://aws.amazon.com/about-aws/whats-new/2024/08/amazon-s3-conditional-writes/
ETagPutIfNotExists,

/// The name of a DynamoDB table to use for coordination
///
/// Encoded as either `dynamo:<TABLE_NAME>` or `dynamo:<TABLE_NAME>:<TIMEOUT_MILLIS>`
Expand All @@ -164,7 +153,6 @@ impl std::fmt::Display for S3ConditionalPut {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::ETagMatch => write!(f, "etag"),
Self::ETagPutIfNotExists => write!(f, "etag-put-if-not-exists"),
Self::Dynamo(lock) => write!(f, "dynamo: {}", lock.table_name()),
}
}
Expand All @@ -174,7 +162,6 @@ impl S3ConditionalPut {
fn from_str(s: &str) -> Option<Self> {
match s.trim() {
"etag" => Some(Self::ETagMatch),
"etag-put-if-not-exists" => Some(Self::ETagPutIfNotExists),
trimmed => match trimmed.split_once(':')? {
("dynamo", s) => Some(Self::Dynamo(DynamoCommit::from_str(s)?)),
_ => None,
Expand Down
14 changes: 13 additions & 1 deletion object_store/src/client/retry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,7 @@ pub(crate) struct RetryableRequest {

sensitive: bool,
idempotent: Option<bool>,
retry_on_conflict: bool,
payload: Option<PutPayload>,

retry_error_body: bool,
Expand All @@ -217,6 +218,15 @@ impl RetryableRequest {
}
}

/// Set whether this request should be retried on a 409 Conflict response.
#[cfg(feature = "aws")]
pub(crate) fn retry_on_conflict(self, retry_on_conflict: bool) -> Self {
Self {
retry_on_conflict,
..self
}
}

/// Set whether this request contains sensitive data
///
/// This will avoid printing out the URL in error messages
Expand Down Expand Up @@ -340,7 +350,8 @@ impl RetryableRequest {
let status = r.status();
if retries == max_retries
|| now.elapsed() > retry_timeout
|| !status.is_server_error()
|| !(status.is_server_error()
|| (self.retry_on_conflict && status == StatusCode::CONFLICT))
{
return Err(match status.is_client_error() {
true => match r.text().await {
Expand Down Expand Up @@ -467,6 +478,7 @@ impl RetryExt for reqwest::RequestBuilder {
idempotent: None,
payload: None,
sensitive: false,
retry_on_conflict: false,
retry_error_body: false,
}
}
Expand Down

0 comments on commit c60ce14

Please sign in to comment.