diff --git a/object_store/src/aws/client.rs b/object_store/src/aws/client.rs index 3e47abd4bcc5..ecbe556c6dfe 100644 --- a/object_store/src/aws/client.rs +++ b/object_store/src/aws/client.rs @@ -45,7 +45,7 @@ use percent_encoding::{utf8_percent_encode, PercentEncode}; use quick_xml::events::{self as xml_events}; use reqwest::{ header::{CONTENT_LENGTH, CONTENT_TYPE}, - Client as ReqwestClient, Method, RequestBuilder, Response, StatusCode, + Client as ReqwestClient, Method, RequestBuilder, Response, }; use serde::{Deserialize, Serialize}; use snafu::{ResultExt, Snafu}; @@ -466,6 +466,9 @@ impl S3Client { Some(S3CopyIfNotExists::Header(k, v)) => { builder = builder.header(k, v); } + Some(S3CopyIfNotExists::HeaderWithStatus(k, v, _)) => { + builder = builder.header(k, v); + } None => { return Err(crate::Error::NotSupported { source: "S3 does not support copy-if-not-exists".to_string().into(), @@ -474,6 +477,11 @@ impl S3Client { } } + let precondition_failure = match &self.config.copy_if_not_exists { + Some(S3CopyIfNotExists::HeaderWithStatus(_, _, code)) => *code, + _ => reqwest::StatusCode::PRECONDITION_FAILED, + }; + builder .with_aws_sigv4( credential.as_deref(), @@ -485,7 +493,7 @@ impl S3Client { .send_retry(&self.config.retry_config) .await .map_err(|source| match source.status() { - Some(StatusCode::PRECONDITION_FAILED) => crate::Error::AlreadyExists { + Some(error) if error == precondition_failure => crate::Error::AlreadyExists { source: Box::new(source), path: to.to_string(), }, diff --git a/object_store/src/aws/precondition.rs b/object_store/src/aws/precondition.rs index a50b57fe23f7..ada5f3b83f07 100644 --- a/object_store/src/aws/precondition.rs +++ b/object_store/src/aws/precondition.rs @@ -17,11 +17,13 @@ use crate::config::Parse; +use itertools::Itertools; + /// Configure how to provide [`ObjectStore::copy_if_not_exists`] for [`AmazonS3`]. /// /// [`ObjectStore::copy_if_not_exists`]: crate::ObjectStore::copy_if_not_exists /// [`AmazonS3`]: super::AmazonS3 -#[derive(Debug, Clone)] +#[derive(Debug, Clone, PartialEq, Eq)] #[non_exhaustive] pub enum S3CopyIfNotExists { /// Some S3-compatible stores, such as Cloudflare R2, support copy if not exists @@ -29,7 +31,7 @@ pub enum S3CopyIfNotExists { /// /// If set, [`ObjectStore::copy_if_not_exists`] will perform a normal copy operation /// with the provided header pair, and expect the store to fail with `412 Precondition Failed` - /// if the destination file already exists + /// if the destination file already exists. /// /// Encoded as `header::` ignoring whitespace /// @@ -38,12 +40,20 @@ pub enum S3CopyIfNotExists { /// /// [`ObjectStore::copy_if_not_exists`]: crate::ObjectStore::copy_if_not_exists Header(String, String), + /// The same as [`S3CopyIfNotExists::Header`] but allows custom status code checking, for object stores that return values + /// other than 412. + /// + /// Encoded as `header-with-status:::` ignoring whitespace + HeaderWithStatus(String, String, reqwest::StatusCode), } impl std::fmt::Display for S3CopyIfNotExists { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { Self::Header(k, v) => write!(f, "header: {}: {}", k, v), + Self::HeaderWithStatus(k, v, code) => { + write!(f, "header-with-status: {k}: {v}: {}", code.as_u16()) + } } } } @@ -56,6 +66,17 @@ impl S3CopyIfNotExists { let (k, v) = value.split_once(':')?; Some(Self::Header(k.trim().to_string(), v.trim().to_string())) } + "header-with-status" => { + let (k, v, status) = value.split(':').collect_tuple()?; + + let code = status.trim().parse().ok()?; + + Some(Self::HeaderWithStatus( + k.trim().to_string(), + v.trim().to_string(), + code, + )) + } _ => None, } } @@ -111,3 +132,76 @@ impl Parse for S3ConditionalPut { }) } } + +#[cfg(test)] +mod tests { + use super::S3CopyIfNotExists; + + #[test] + fn parse_s3_copy_if_not_exists_header() { + let input = "header: cf-copy-destination-if-none-match: *"; + let expected = Some(S3CopyIfNotExists::Header( + "cf-copy-destination-if-none-match".to_owned(), + "*".to_owned(), + )); + + assert_eq!(expected, S3CopyIfNotExists::from_str(input)); + } + + #[test] + fn parse_s3_copy_if_not_exists_header_with_status() { + let input = "header-with-status:key:value:403"; + let expected = Some(S3CopyIfNotExists::HeaderWithStatus( + "key".to_owned(), + "value".to_owned(), + reqwest::StatusCode::FORBIDDEN, + )); + + assert_eq!(expected, S3CopyIfNotExists::from_str(input)); + } + + #[test] + fn parse_s3_copy_if_not_exists_header_whitespace_invariant() { + let expected = Some(S3CopyIfNotExists::Header( + "cf-copy-destination-if-none-match".to_owned(), + "*".to_owned(), + )); + + const INPUTS: &[&str] = &[ + "header:cf-copy-destination-if-none-match:*", + "header: cf-copy-destination-if-none-match:*", + "header: cf-copy-destination-if-none-match: *", + "header : cf-copy-destination-if-none-match: *", + "header : cf-copy-destination-if-none-match : *", + "header : cf-copy-destination-if-none-match : * ", + ]; + + for input in INPUTS { + assert_eq!(expected, S3CopyIfNotExists::from_str(input)); + } + } + + #[test] + fn parse_s3_copy_if_not_exists_header_with_status_whitespace_invariant() { + let expected = Some(S3CopyIfNotExists::HeaderWithStatus( + "key".to_owned(), + "value".to_owned(), + reqwest::StatusCode::FORBIDDEN, + )); + + const INPUTS: &[&str] = &[ + "header-with-status:key:value:403", + "header-with-status: key:value:403", + "header-with-status: key: value:403", + "header-with-status: key: value: 403", + "header-with-status : key: value: 403", + "header-with-status : key : value: 403", + "header-with-status : key : value : 403", + "header-with-status : key : value : 403 ", + ]; + + for input in INPUTS { + assert_eq!(expected, S3CopyIfNotExists::from_str(input)); + } + } +}