Support native S3 conditional writes
Add support for `PutMode::Create` and `copy_if_not_exists` on native AWS
S3, using the underlying conditional write primitive that Amazon
launched earlier this year [0].

The conditional write primitive is simpler than what's available in
other S3-like products (e.g., R2), so new modes for
`s3_copy_if_not_exists` and `s3_conditional_put` are added to select the
native S3-specific behavior.

To maintain strict backwards compatibility (e.g., with older versions of
LocalStack), the new behavior is not enabled by default; it must be
explicitly requested by the end user.
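
As an illustration only (not part of this commit), a user might opt in
roughly as follows, assuming the existing `AmazonS3Builder` setters
`with_conditional_put` and `with_copy_if_not_exists`; the bucket name is
a placeholder. The equivalent string forms, usable via config keys or
the `AWS_CONDITIONAL_PUT` / `AWS_COPY_IF_NOT_EXISTS` environment
variables, are `etag-create-only` and `multipart`:

    use object_store::aws::{AmazonS3Builder, S3ConditionalPut, S3CopyIfNotExists};

    fn main() -> Result<(), Box<dyn std::error::Error>> {
        // Credentials and region are read from the environment; the bucket
        // name is a placeholder.
        let store = AmazonS3Builder::from_env()
            .with_bucket_name("my-bucket")
            // Opt in to native S3 conditional writes (If-None-Match on PUT).
            .with_conditional_put(S3ConditionalPut::ETagCreateOnly)
            // Opt in to copy_if_not_exists via a single-part multipart copy.
            .with_copy_if_not_exists(S3CopyIfNotExists::Multipart)
            .build()?;
        let _ = store;
        Ok(())
    }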

The implementation for `PutMode::Create` is straightforward. The
implementation of `copy_if_not_exists` is a bit more involved, as it
requires managing a multipart upload that uses the UploadPartCopy
operation, which was not previously supported by this crate's S3 client.
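
For context, a rough usage sketch (not part of this diff) of what the
new behavior looks like through the `ObjectStore` API, assuming the
`put_opts` / `copy_if_not_exists` methods and the `From<PutMode> for
PutOptions` conversion; paths are placeholders:

    use object_store::{path::Path, Error, ObjectStore, PutMode, PutPayload};

    async fn demo(store: &dyn ObjectStore) -> object_store::Result<()> {
        let src = Path::from("example/data");

        // The first create succeeds; a second create of the same key is
        // rejected by S3's `If-None-Match: *` precondition and surfaces as
        // AlreadyExists.
        store.put_opts(&src, PutPayload::default(), PutMode::Create.into()).await?;
        match store.put_opts(&src, PutPayload::default(), PutMode::Create.into()).await {
            Err(Error::AlreadyExists { .. }) => {}
            other => panic!("expected AlreadyExists, got {other:?}"),
        }

        // copy_if_not_exists drives the same precondition through a one-part
        // multipart copy (UploadPartCopy + CompleteMultipartUpload with
        // If-None-Match: *).
        let dst = Path::from("example/copy");
        store.copy_if_not_exists(&src, &dst).await?;
        Ok(())
    }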

To ensure test coverage, the object store workflow now runs the AWS
integration tests with conditional put both disabled and enabled.

Fix apache#6285.

[0]: https://aws.amazon.com/about-aws/whats-new/2024/08/amazon-s3-conditional-writes/
benesch committed Nov 5, 2024
1 parent 22bc772 commit 00fb032
Showing 6 changed files with 172 additions and 22 deletions.
6 changes: 6 additions & 0 deletions .github/workflows/object_store.yml
@@ -161,6 +161,12 @@ jobs:
- name: Run object_store tests
run: cargo test --features=aws,azure,gcp,http

- name: Run object_store tests (AWS native conditional put)
run: cargo test --features=aws
env:
AWS_CONDITIONAL_PUT: etag-create-only
AWS_COPY_IF_NOT_EXISTS: multipart

- name: GCS Output
if: ${{ !cancelled() }}
run: docker logs $GCS_CONTAINER
83 changes: 70 additions & 13 deletions object_store/src/aws/client.rs
@@ -28,8 +28,8 @@ use crate::client::header::{get_put_result, get_version};
use crate::client::list::ListClient;
use crate::client::retry::RetryExt;
use crate::client::s3::{
CompleteMultipartUpload, CompleteMultipartUploadResult, InitiateMultipartUploadResult,
ListResponse,
CompleteMultipartUpload, CompleteMultipartUploadResult, CopyPartResult,
InitiateMultipartUploadResult, ListResponse,
};
use crate::client::GetOptionsExt;
use crate::multipart::PartId;
@@ -99,7 +99,10 @@ pub(crate) enum Error {
CreateMultipartResponseBody { source: reqwest::Error },

#[snafu(display("Error performing complete multipart request: {}", source))]
CompleteMultipartRequest { source: crate::client::retry::Error },
CompleteMultipartRequest {
source: crate::client::retry::Error,
path: String,
},

#[snafu(display("Error getting complete multipart response body: {}", source))]
CompleteMultipartResponseBody { source: reqwest::Error },
@@ -118,13 +121,32 @@ pub(crate) enum Error {

impl From<Error> for crate::Error {
fn from(err: Error) -> Self {
Self::Generic {
store: STORE,
source: Box::new(err),
match err {
Error::CompleteMultipartRequest { source, path } => source.error(STORE, path),
_ => Self::Generic {
store: STORE,
source: Box::new(err),
},
}
}
}

pub(crate) enum PutPartPayload<'a> {
Inline(PutPayload),
Copy(&'a Path),
}

impl Default for PutPartPayload<'_> {
fn default() -> Self {
Self::Inline(PutPayload::default())
}
}

pub(crate) enum CompleteMultipartMode {
Overwrite,
Create,
}

#[derive(Deserialize)]
#[serde(rename_all = "PascalCase", rename = "DeleteResult")]
struct BatchDeleteResponse {
@@ -605,15 +627,24 @@ impl S3Client {
path: &Path,
upload_id: &MultipartId,
part_idx: usize,
data: PutPayload,
data: PutPartPayload<'_>,
) -> Result<PartId> {
let is_copy = matches!(data, PutPartPayload::Copy(_));
let part = (part_idx + 1).to_string();

let mut request = self
.request(Method::PUT, path)
.with_payload(data)
.query(&[("partNumber", &part), ("uploadId", upload_id)])
.idempotent(true);

request = match data {
PutPartPayload::Inline(payload) => request.with_payload(payload),
PutPartPayload::Copy(path) => request.header(
"x-amz-copy-source",
&format!("{}/{}", self.config.bucket, path),
),
};

if self
.config
.encryption_headers
@@ -625,7 +656,18 @@
}
let response = request.send().await?;

let content_id = get_etag(response.headers()).context(MetadataSnafu)?;
let content_id = match is_copy {
false => get_etag(response.headers()).context(MetadataSnafu)?,
true => {
let response = response
.bytes()
.await
.context(CreateMultipartResponseBodySnafu)?;
let response: CopyPartResult = quick_xml::de::from_reader(response.reader())
.context(InvalidMultipartResponseSnafu)?;
response.e_tag
}
};
Ok(PartId { content_id })
}

@@ -634,12 +676,18 @@
location: &Path,
upload_id: &str,
parts: Vec<PartId>,
mode: CompleteMultipartMode,
) -> Result<PutResult> {
let parts = if parts.is_empty() {
// If no parts were uploaded, upload an empty part
// otherwise the completion request will fail
let part = self
.put_part(location, &upload_id.to_string(), 0, PutPayload::default())
.put_part(
location,
&upload_id.to_string(),
0,
PutPartPayload::default(),
)
.await?;
vec![part]
} else {
@@ -651,18 +699,27 @@
let credential = self.config.get_session_credential().await?;
let url = self.config.path_url(location);

let response = self
let request = self
.client
.request(Method::POST, url)
.query(&[("uploadId", upload_id)])
.body(body)
.with_aws_sigv4(credential.authorizer(), None)
.with_aws_sigv4(credential.authorizer(), None);

let request = match mode {
CompleteMultipartMode::Overwrite => request,
CompleteMultipartMode::Create => request.header("If-None-Match", "*"),
};

let response = request
.retryable(&self.config.retry_config)
.idempotent(true)
.retry_error_body(true)
.send()
.await
.context(CompleteMultipartRequestSnafu)?;
.context(CompleteMultipartRequestSnafu {
path: location.as_ref(),
})?;

let version = get_version(response.headers(), VERSION_HEADER).context(MetadataSnafu)?;

64 changes: 55 additions & 9 deletions object_store/src/aws/mod.rs
@@ -36,7 +36,7 @@ use reqwest::{Method, StatusCode};
use std::{sync::Arc, time::Duration};
use url::Url;

use crate::aws::client::{RequestError, S3Client};
use crate::aws::client::{CompleteMultipartMode, PutPartPayload, RequestError, S3Client};
use crate::client::get::GetClientExt;
use crate::client::list::ListClientExt;
use crate::client::CredentialProvider;
@@ -169,7 +169,10 @@ impl ObjectStore for AmazonS3 {
match (opts.mode, &self.client.config.conditional_put) {
(PutMode::Overwrite, _) => request.idempotent(true).do_put().await,
(PutMode::Create | PutMode::Update(_), None) => Err(Error::NotImplemented),
(PutMode::Create, Some(S3ConditionalPut::ETagMatch)) => {
(
PutMode::Create,
Some(S3ConditionalPut::ETagMatch | S3ConditionalPut::ETagCreateOnly),
) => {
match request.header(&IF_NONE_MATCH, "*").do_put().await {
// Technically If-None-Match should return NotModified but some stores,
// such as R2, instead return PreconditionFailed
@@ -193,6 +196,7 @@
source: "ETag required for conditional put".to_string().into(),
})?;
match put {
S3ConditionalPut::ETagCreateOnly => Err(Error::NotImplemented),
S3ConditionalPut::ETagMatch => {
request.header(&IF_MATCH, etag.as_str()).do_put().await
}
@@ -293,6 +297,34 @@ impl ObjectStore for AmazonS3 {
let (k, v, status) = match &self.client.config.copy_if_not_exists {
Some(S3CopyIfNotExists::Header(k, v)) => (k, v, StatusCode::PRECONDITION_FAILED),
Some(S3CopyIfNotExists::HeaderWithStatus(k, v, status)) => (k, v, *status),
Some(S3CopyIfNotExists::Multipart) => {
let upload_id = self
.client
.create_multipart(to, PutMultipartOpts::default())
.await?;
let part_id = self
.client
.put_part(to, &upload_id, 0, PutPartPayload::Copy(from))
.await?;
let res = match self
.client
.complete_multipart(
to,
&upload_id,
vec![part_id],
CompleteMultipartMode::Create,
)
.await
{
Err(e @ Error::Precondition { .. }) => Err(Error::AlreadyExists {
path: to.to_string(),
source: Box::new(e),
}),
Ok(_) => Ok(()),
Err(e) => Err(e.into()),
};
return res;
}
Some(S3CopyIfNotExists::Dynamo(lock)) => {
return lock.copy_if_not_exists(&self.client, from, to).await
}
@@ -340,7 +372,12 @@ impl MultipartUpload for S3MultiPartUpload {
Box::pin(async move {
let part = state
.client
.put_part(&state.location, &state.upload_id, idx, data)
.put_part(
&state.location,
&state.upload_id,
idx,
PutPartPayload::Inline(data),
)
.await?;
state.parts.put(idx, part);
Ok(())
@@ -352,7 +389,12 @@

self.state
.client
.complete_multipart(&self.state.location, &self.state.upload_id, parts)
.complete_multipart(
&self.state.location,
&self.state.upload_id,
parts,
CompleteMultipartMode::Overwrite,
)
.await
}

@@ -384,7 +426,9 @@ impl MultipartStore for AmazonS3 {
part_idx: usize,
data: PutPayload,
) -> Result<PartId> {
self.client.put_part(path, id, part_idx, data).await
self.client
.put_part(path, id, part_idx, PutPartPayload::Inline(data))
.await
}

async fn complete_multipart(
@@ -393,7 +437,9 @@
id: &MultipartId,
parts: Vec<PartId>,
) -> Result<PutResult> {
self.client.complete_multipart(path, id, parts).await
self.client
.complete_multipart(path, id, parts, CompleteMultipartMode::Overwrite)
.await
}

async fn abort_multipart(&self, path: &Path, id: &MultipartId) -> Result<()> {
@@ -427,7 +473,6 @@ mod tests {
let integration = config.build().unwrap();
let config = &integration.client.config;
let test_not_exists = config.copy_if_not_exists.is_some();
let test_conditional_put = config.conditional_put.is_some();

put_get_delete_list(&integration).await;
get_opts(&integration).await;
@@ -458,8 +503,9 @@
if test_not_exists {
copy_if_not_exists(&integration).await;
}
if test_conditional_put {
put_opts(&integration, true).await;
if let Some(conditional_put) = &config.conditional_put {
let supports_update = !matches!(conditional_put, S3ConditionalPut::ETagCreateOnly);
put_opts(&integration, supports_update).await;
}

// run integration test with unsigned payload enabled
28 changes: 28 additions & 0 deletions object_store/src/aws/precondition.rs
@@ -46,6 +46,15 @@ pub enum S3CopyIfNotExists {
///
/// Encoded as `header-with-status:<HEADER_NAME>:<HEADER_VALUE>:<STATUS>` ignoring whitespace
HeaderWithStatus(String, String, reqwest::StatusCode),
/// Native Amazon S3 supports copy if not exists through a multipart upload
/// where the upload copies an existing object and is completed only if
/// the new object does not already exist.
///
/// WARNING: When using this mode, `copy_if_not_exists` does not copy
/// tags or attributes from the source object.
///
/// Encoded as `multipart` ignoring whitespace.
Multipart,
/// The name of a DynamoDB table to use for coordination
///
/// Encoded as either `dynamo:<TABLE_NAME>` or `dynamo:<TABLE_NAME>:<TIMEOUT_MILLIS>`
@@ -64,13 +73,19 @@ impl std::fmt::Display for S3CopyIfNotExists {
Self::HeaderWithStatus(k, v, code) => {
write!(f, "header-with-status: {k}: {v}: {}", code.as_u16())
}
Self::Multipart => f.write_str("multipart"),
Self::Dynamo(lock) => write!(f, "dynamo: {}", lock.table_name()),
}
}
}

impl S3CopyIfNotExists {
fn from_str(s: &str) -> Option<Self> {
match s.trim() {
"multipart" => return Some(Self::Multipart),
_ => (),
};

let (variant, value) = s.split_once(':')?;
match variant.trim() {
"header" => {
@@ -118,6 +133,17 @@ pub enum S3ConditionalPut {
/// [HTTP precondition]: https://datatracker.ietf.org/doc/html/rfc9110#name-preconditions
ETagMatch,

/// Like `ETagMatch`, but with support for `PutMode::Create` and not
/// `PutMode::Update`.
///
/// This is the limited form of conditional put supported by Amazon S3
/// as of August 2024 ([announcement]).
///
/// Encoded as `etag-create-only` ignoring whitespace.
///
/// [announcement]: https://aws.amazon.com/about-aws/whats-new/2024/08/amazon-s3-conditional-writes/
ETagCreateOnly,

/// The name of a DynamoDB table to use for coordination
///
/// Encoded as either `dynamo:<TABLE_NAME>` or `dynamo:<TABLE_NAME>:<TIMEOUT_MILLIS>`
@@ -133,6 +159,7 @@ impl std::fmt::Display for S3ConditionalPut {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::ETagMatch => write!(f, "etag"),
Self::ETagCreateOnly => write!(f, "etag-create-only"),
Self::Dynamo(lock) => write!(f, "dynamo: {}", lock.table_name()),
}
}
@@ -142,6 +169,7 @@ impl S3ConditionalPut {
fn from_str(s: &str) -> Option<Self> {
match s.trim() {
"etag" => Some(Self::ETagMatch),
"etag-create-only" => Some(Self::ETagCreateOnly),
trimmed => match trimmed.split_once(':')? {
("dynamo", s) => Some(Self::Dynamo(DynamoCommit::from_str(s)?)),
_ => None,
7 changes: 7 additions & 0 deletions object_store/src/client/s3.rs
@@ -92,6 +92,13 @@ pub(crate) struct InitiateMultipartUploadResult {
pub upload_id: String,
}

#[derive(Debug, Deserialize)]
#[serde(rename_all = "PascalCase")]
pub(crate) struct CopyPartResult {
#[serde(rename = "ETag")]
pub e_tag: String,
}

#[derive(Debug, Serialize)]
#[serde(rename_all = "PascalCase")]
pub(crate) struct CompleteMultipartUpload {