From ae2222d9ee0e75208c08dcb268198b7c8e2a1f0d Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Tue, 24 Oct 2023 10:11:05 +0100 Subject: [PATCH 01/12] Add version to PutResult --- object_store/src/aws/client.rs | 19 +++++++++++++------ object_store/src/aws/mod.rs | 3 +-- object_store/src/azure/client.rs | 16 +++++++++++----- object_store/src/azure/mod.rs | 11 +---------- object_store/src/client/header.rs | 17 ++++++++++++++++- object_store/src/gcp/client.rs | 18 ++++++++++-------- object_store/src/gcp/mod.rs | 3 +-- object_store/src/http/mod.rs | 6 +++++- object_store/src/lib.rs | 4 +++- object_store/src/local.rs | 1 + object_store/src/memory.rs | 1 + 11 files changed, 63 insertions(+), 36 deletions(-) diff --git a/object_store/src/aws/client.rs b/object_store/src/aws/client.rs index 4e98f259f8dd..9fdcc6c64934 100644 --- a/object_store/src/aws/client.rs +++ b/object_store/src/aws/client.rs @@ -19,8 +19,8 @@ use crate::aws::checksum::Checksum; use crate::aws::credential::{AwsCredential, CredentialExt}; use crate::aws::{AwsCredentialProvider, S3CopyIfNotExists, STORE, STRICT_PATH_ENCODE_SET}; use crate::client::get::GetClient; -use crate::client::header::get_etag; use crate::client::header::HeaderConfig; +use crate::client::header::{get_put_result, get_version}; use crate::client::list::ListClient; use crate::client::list_response::ListResponse; use crate::client::retry::RetryExt; @@ -45,6 +45,8 @@ use serde::{Deserialize, Serialize}; use snafu::{ResultExt, Snafu}; use std::sync::Arc; +const VERSION_HEADER: &str = "x-amz-version-id"; + /// A specialized `Error` for object store-related errors #[derive(Debug, Snafu)] #[allow(missing_docs)] @@ -265,7 +267,7 @@ impl S3Client { path: &Path, bytes: Bytes, query: &T, - ) -> Result { + ) -> Result { let credential = self.get_credential().await?; let url = self.config.path_url(path); let mut builder = self.client.request(Method::PUT, url); @@ -303,7 +305,7 @@ impl S3Client { path: path.as_ref(), })?; - Ok(get_etag(response.headers()).context(MetadataSnafu)?) + Ok(get_put_result(response.headers(), VERSION_HEADER).context(MetadataSnafu)?) } /// Make an S3 Delete request @@ -527,7 +529,7 @@ impl S3Client { ) -> Result { let part = (part_idx + 1).to_string(); - let content_id = self + let result = self .put_request( path, data, @@ -535,7 +537,9 @@ impl S3Client { ) .await?; - Ok(PartId { content_id }) + Ok(PartId { + content_id: result.e_tag.unwrap(), + }) } pub async fn complete_multipart( @@ -575,6 +579,8 @@ impl S3Client { .await .context(CompleteMultipartRequestSnafu)?; + let version = get_version(response.headers(), VERSION_HEADER).context(MetadataSnafu)?; + let data = response .bytes() .await @@ -585,6 +591,7 @@ impl S3Client { Ok(PutResult { e_tag: Some(response.e_tag), + version, }) } } @@ -596,7 +603,7 @@ impl GetClient for S3Client { const HEADER_CONFIG: HeaderConfig = HeaderConfig { etag_required: false, last_modified_required: false, - version_header: Some("x-amz-version-id"), + version_header: Some(VERSION_HEADER), }; /// Make an S3 GET request diff --git a/object_store/src/aws/mod.rs b/object_store/src/aws/mod.rs index 57254c7cf4e8..2254fb54cedb 100644 --- a/object_store/src/aws/mod.rs +++ b/object_store/src/aws/mod.rs @@ -159,8 +159,7 @@ impl Signer for AmazonS3 { #[async_trait] impl ObjectStore for AmazonS3 { async fn put(&self, location: &Path, bytes: Bytes) -> Result { - let e_tag = self.client.put_request(location, bytes, &()).await?; - Ok(PutResult { e_tag: Some(e_tag) }) + self.client.put_request(location, bytes, &()).await } async fn put_multipart( diff --git a/object_store/src/azure/client.rs b/object_store/src/azure/client.rs index 9f47b9a8152b..e99cfb0ce6bd 100644 --- a/object_store/src/azure/client.rs +++ b/object_store/src/azure/client.rs @@ -19,7 +19,7 @@ use super::credential::AzureCredential; use crate::azure::credential::*; use crate::azure::{AzureCredentialProvider, STORE}; use crate::client::get::GetClient; -use crate::client::header::{get_etag, HeaderConfig}; +use crate::client::header::{get_put_result, HeaderConfig}; use crate::client::list::ListClient; use crate::client::retry::RetryExt; use crate::client::GetOptionsExt; @@ -46,6 +46,8 @@ use std::collections::HashMap; use std::sync::Arc; use url::Url; +const VERSION_HEADER: &str = "x-ms-version-id"; + /// A specialized `Error` for object store-related errors #[derive(Debug, Snafu)] #[allow(missing_docs)] @@ -156,7 +158,6 @@ impl AzureClient { self.config.credentials.get_credential().await } - /// Make an Azure PUT request pub async fn put_request( &self, path: &Path, @@ -198,6 +199,12 @@ impl AzureClient { Ok(response) } + /// Make an Azure PUT request + pub async fn put_blob(&self, path: &Path, bytes: Bytes) -> Result { + let response = self.put_request(path, Some(bytes), false, &()).await?; + Ok(get_put_result(response.headers(), VERSION_HEADER).context(MetadataSnafu)?) + } + /// PUT a block pub async fn put_block(&self, path: &Path, part_idx: usize, data: Bytes) -> Result { let content_id = format!("{part_idx:20}"); @@ -231,8 +238,7 @@ impl AzureClient { .put_request(path, Some(block_xml.into()), true, &[("comp", "blocklist")]) .await?; - let e_tag = get_etag(response.headers()).context(MetadataSnafu)?; - Ok(PutResult { e_tag: Some(e_tag) }) + Ok(get_put_result(response.headers(), VERSION_HEADER).context(MetadataSnafu)?) } /// Make an Azure Delete request @@ -303,7 +309,7 @@ impl GetClient for AzureClient { const HEADER_CONFIG: HeaderConfig = HeaderConfig { etag_required: true, last_modified_required: true, - version_header: Some("x-ms-version-id"), + version_header: Some(VERSION_HEADER), }; /// Make an Azure GET request diff --git a/object_store/src/azure/mod.rs b/object_store/src/azure/mod.rs index 779ac2f71ff8..4b6e4fa44253 100644 --- a/object_store/src/azure/mod.rs +++ b/object_store/src/azure/mod.rs @@ -49,7 +49,6 @@ mod credential; /// [`CredentialProvider`] for [`MicrosoftAzure`] pub type AzureCredentialProvider = Arc>; -use crate::client::header::get_etag; use crate::multipart::MultiPartStore; pub use builder::{AzureConfigKey, MicrosoftAzureBuilder}; pub use credential::AzureCredential; @@ -83,15 +82,7 @@ impl std::fmt::Display for MicrosoftAzure { #[async_trait] impl ObjectStore for MicrosoftAzure { async fn put(&self, location: &Path, bytes: Bytes) -> Result { - let response = self - .client - .put_request(location, Some(bytes), false, &()) - .await?; - let e_tag = get_etag(response.headers()).map_err(|e| crate::Error::Generic { - store: STORE, - source: Box::new(e), - })?; - Ok(PutResult { e_tag: Some(e_tag) }) + Ok(self.client.put_blob(location, bytes).await?) } async fn put_multipart( diff --git a/object_store/src/client/header.rs b/object_store/src/client/header.rs index e67496833b99..38aca8e2f013 100644 --- a/object_store/src/client/header.rs +++ b/object_store/src/client/header.rs @@ -18,7 +18,7 @@ //! Logic for extracting ObjectMeta from headers used by AWS, GCP and Azure use crate::path::Path; -use crate::ObjectMeta; +use crate::{ObjectMeta, PutResult}; use chrono::{DateTime, TimeZone, Utc}; use hyper::header::{CONTENT_LENGTH, ETAG, LAST_MODIFIED}; use hyper::HeaderMap; @@ -67,6 +67,21 @@ pub enum Error { }, } +/// Extracts a [`PutResult`] from the provided [`HeaderMap`] +pub fn get_put_result(headers: &HeaderMap, version: &str) -> Result { + let e_tag = Some(get_etag(headers)?); + let version = get_version(headers, version)?; + Ok(PutResult { e_tag, version }) +} + +/// Extracts a optional version from the provided [`HeaderMap`] +pub fn get_version(headers: &HeaderMap, version: &str) -> Result, Error> { + Ok(match headers.get(version) { + Some(x) => Some(x.to_str().context(BadHeaderSnafu)?.to_string()), + None => None, + }) +} + /// Extracts an etag from the provided [`HeaderMap`] pub fn get_etag(headers: &HeaderMap) -> Result { let e_tag = headers.get(ETAG).ok_or(Error::MissingEtag)?; diff --git a/object_store/src/gcp/client.rs b/object_store/src/gcp/client.rs index 8c44f9016480..5c3936c73fe4 100644 --- a/object_store/src/gcp/client.rs +++ b/object_store/src/gcp/client.rs @@ -16,7 +16,7 @@ // under the License. use crate::client::get::GetClient; -use crate::client::header::{get_etag, HeaderConfig}; +use crate::client::header::{get_put_result, HeaderConfig}; use crate::client::list::ListClient; use crate::client::list_response::ListResponse; use crate::client::retry::RetryExt; @@ -33,6 +33,7 @@ use serde::Serialize; use snafu::{ResultExt, Snafu}; use std::sync::Arc; +const VERSION_HEADER: &str = "x-goog-generation"; #[derive(Debug, Snafu)] enum Error { #[snafu(display("Error performing list request: {}", source))] @@ -157,7 +158,7 @@ impl GoogleCloudStorageClient { path: &Path, payload: Bytes, query: &T, - ) -> Result { + ) -> Result { let credential = self.get_credential().await?; let url = self.object_url(path); @@ -181,7 +182,7 @@ impl GoogleCloudStorageClient { path: path.as_ref(), })?; - Ok(get_etag(response.headers()).context(MetadataSnafu)?) + Ok(get_put_result(response.headers(), VERSION_HEADER).context(MetadataSnafu)?) } /// Perform a put part request @@ -194,7 +195,7 @@ impl GoogleCloudStorageClient { part_idx: usize, data: Bytes, ) -> Result { - let content_id = self + let result = self .put_request( path, data, @@ -205,7 +206,9 @@ impl GoogleCloudStorageClient { ) .await?; - Ok(PartId { content_id }) + Ok(PartId { + content_id: result.e_tag.unwrap(), + }) } /// Initiate a multi-part upload @@ -299,8 +302,7 @@ impl GoogleCloudStorageClient { path: path.as_ref(), })?; - let etag = get_etag(result.headers()).context(MetadataSnafu)?; - Ok(PutResult { e_tag: Some(etag) }) + Ok(get_put_result(result.headers(), VERSION_HEADER).context(MetadataSnafu)?) } /// Perform a delete request @@ -362,7 +364,7 @@ impl GetClient for GoogleCloudStorageClient { const HEADER_CONFIG: HeaderConfig = HeaderConfig { etag_required: true, last_modified_required: true, - version_header: Some("x-goog-generation"), + version_header: Some(VERSION_HEADER), }; /// Perform a get request diff --git a/object_store/src/gcp/mod.rs b/object_store/src/gcp/mod.rs index 0eb3e9c23c43..d3a84f838c03 100644 --- a/object_store/src/gcp/mod.rs +++ b/object_store/src/gcp/mod.rs @@ -108,8 +108,7 @@ impl PutPart for GCSMultipartUpload { #[async_trait] impl ObjectStore for GoogleCloudStorage { async fn put(&self, location: &Path, bytes: Bytes) -> Result { - let e_tag = self.client.put_request(location, bytes, &()).await?; - Ok(PutResult { e_tag: Some(e_tag) }) + self.client.put_request(location, bytes, &()).await } async fn put_multipart( diff --git a/object_store/src/http/mod.rs b/object_store/src/http/mod.rs index 8f61011ccae1..3929af9fd1f9 100644 --- a/object_store/src/http/mod.rs +++ b/object_store/src/http/mod.rs @@ -103,7 +103,11 @@ impl ObjectStore for HttpStore { Err(crate::client::header::Error::MissingEtag) => None, Err(source) => return Err(Error::Metadata { source }.into()), }; - Ok(PutResult { e_tag }) + + Ok(PutResult { + e_tag, + version: None, + }) } async fn put_multipart( diff --git a/object_store/src/lib.rs b/object_store/src/lib.rs index 9a0667229803..6d53c3e1afd0 100644 --- a/object_store/src/lib.rs +++ b/object_store/src/lib.rs @@ -840,10 +840,12 @@ impl GetResult { /// Result for a put request #[derive(Debug, Clone, PartialEq, Eq)] pub struct PutResult { - /// The unique identifier for the object + /// The unique identifier for the newly created object /// /// pub e_tag: Option, + /// A version indicator for the newly created object + pub version: Option, } /// A specialized `Result` for object store-related errors diff --git a/object_store/src/local.rs b/object_store/src/local.rs index ce9aa4683499..3df486d506ae 100644 --- a/object_store/src/local.rs +++ b/object_store/src/local.rs @@ -293,6 +293,7 @@ impl ObjectStore for LocalFileSystem { Ok(PutResult { e_tag: Some(get_etag(&metadata)), + version: None, }) }) .await diff --git a/object_store/src/memory.rs b/object_store/src/memory.rs index 8b9522e48de8..3401ceb7c463 100644 --- a/object_store/src/memory.rs +++ b/object_store/src/memory.rs @@ -127,6 +127,7 @@ impl ObjectStore for InMemory { let etag = self.storage.write().insert(location, bytes); Ok(PutResult { e_tag: Some(etag.to_string()), + version: None, }) } From c62a6780f0693408fe2aaf5c7f996796ed628178 Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Tue, 24 Oct 2023 11:52:43 +0100 Subject: [PATCH 02/12] Conditional Put (#4879) --- object_store/src/aws/mod.rs | 11 ++- object_store/src/azure/client.rs | 38 +++++++-- object_store/src/azure/mod.rs | 8 +- object_store/src/chunked.rs | 7 +- object_store/src/gcp/client.rs | 67 +++++++++++---- object_store/src/gcp/mod.rs | 8 +- object_store/src/http/client.rs | 27 +++++- object_store/src/http/mod.rs | 7 +- object_store/src/lib.rs | 119 ++++++++++++++++++++++++++- object_store/src/limit.rs | 6 +- object_store/src/local.rs | 47 ++++++++--- object_store/src/memory.rs | 66 +++++++++++++-- object_store/src/prefix.rs | 8 +- object_store/src/throttle.rs | 9 +- object_store/tests/get_range_file.rs | 32 +++---- 15 files changed, 375 insertions(+), 85 deletions(-) diff --git a/object_store/src/aws/mod.rs b/object_store/src/aws/mod.rs index 2254fb54cedb..1a64e6aa0876 100644 --- a/object_store/src/aws/mod.rs +++ b/object_store/src/aws/mod.rs @@ -47,8 +47,8 @@ use crate::client::CredentialProvider; use crate::multipart::{MultiPartStore, PartId, PutPart, WriteMultiPart}; use crate::signer::Signer; use crate::{ - GetOptions, GetResult, ListResult, MultipartId, ObjectMeta, ObjectStore, Path, PutResult, - Result, + Error, GetOptions, GetResult, ListResult, MultipartId, ObjectMeta, ObjectStore, Path, PutMode, + PutOptions, PutResult, Result, }; mod builder; @@ -158,8 +158,11 @@ impl Signer for AmazonS3 { #[async_trait] impl ObjectStore for AmazonS3 { - async fn put(&self, location: &Path, bytes: Bytes) -> Result { - self.client.put_request(location, bytes, &()).await + async fn put_opts(&self, location: &Path, bytes: Bytes, opts: PutOptions) -> Result { + match opts.mode { + PutMode::Overwrite => self.client.put_request(location, bytes, &()).await, + PutMode::Create | PutMode::Update(_) => Err(Error::NotImplemented), + } } async fn put_multipart( diff --git a/object_store/src/azure/client.rs b/object_store/src/azure/client.rs index e99cfb0ce6bd..94d81cca2f93 100644 --- a/object_store/src/azure/client.rs +++ b/object_store/src/azure/client.rs @@ -27,7 +27,8 @@ use crate::multipart::PartId; use crate::path::DELIMITER; use crate::util::deserialize_rfc1123; use crate::{ - ClientOptions, GetOptions, ListResult, ObjectMeta, Path, PutResult, Result, RetryConfig, + ClientOptions, GetOptions, ListResult, ObjectMeta, Path, PutMode, PutOptions, PutResult, + Result, RetryConfig, }; use async_trait::async_trait; use base64::prelude::BASE64_STANDARD; @@ -37,11 +38,11 @@ use chrono::{DateTime, Utc}; use itertools::Itertools; use reqwest::header::CONTENT_TYPE; use reqwest::{ - header::{HeaderValue, CONTENT_LENGTH, IF_NONE_MATCH}, + header::{HeaderValue, CONTENT_LENGTH, IF_MATCH, IF_NONE_MATCH}, Client as ReqwestClient, Method, Response, StatusCode, }; use serde::{Deserialize, Serialize}; -use snafu::{ResultExt, Snafu}; +use snafu::{OptionExt, ResultExt, Snafu}; use std::collections::HashMap; use std::sync::Arc; use url::Url; @@ -94,6 +95,9 @@ pub(crate) enum Error { Metadata { source: crate::client::header::Error, }, + + #[snafu(display("ETag required for conditional update"))] + MissingETag, } impl From for crate::Error { @@ -163,6 +167,7 @@ impl AzureClient { path: &Path, bytes: Option, is_block_op: bool, + opts: PutOptions, query: &T, ) -> Result { let credential = self.get_credential().await?; @@ -171,9 +176,7 @@ impl AzureClient { let mut builder = self.client.request(Method::PUT, url); if !is_block_op { - builder = builder.header(&BLOB_TYPE, "BlockBlob").query(query); - } else { - builder = builder.query(query); + builder = builder.header(&BLOB_TYPE, "BlockBlob"); } if let Some(value) = self.config().client_options.get_content_type(path) { @@ -188,7 +191,14 @@ impl AzureClient { builder = builder.header(CONTENT_LENGTH, HeaderValue::from_static("0")); } + builder = match opts.mode { + PutMode::Overwrite => builder, + PutMode::Create => builder.header(IF_NONE_MATCH, "*"), + PutMode::Update(v) => builder.header(IF_MATCH, v.e_tag.context(MissingETagSnafu)?), + }; + let response = builder + .query(query) .with_azure_authorization(&credential, &self.config.account) .send_retry(&self.config.retry_config) .await @@ -200,8 +210,11 @@ impl AzureClient { } /// Make an Azure PUT request - pub async fn put_blob(&self, path: &Path, bytes: Bytes) -> Result { - let response = self.put_request(path, Some(bytes), false, &()).await?; + pub async fn put_blob(&self, path: &Path, bytes: Bytes, opts: PutOptions) -> Result { + let response = self + .put_request(path, Some(bytes), false, opts, &()) + .await?; + Ok(get_put_result(response.headers(), VERSION_HEADER).context(MetadataSnafu)?) } @@ -214,6 +227,7 @@ impl AzureClient { path, Some(data), true, + PutOptions::default(), &[ ("comp", "block"), ("blockid", &BASE64_STANDARD.encode(block_id)), @@ -235,7 +249,13 @@ impl AzureClient { let block_xml = block_list.to_xml(); let response = self - .put_request(path, Some(block_xml.into()), true, &[("comp", "blocklist")]) + .put_request( + path, + Some(block_xml.into()), + true, + PutOptions::default(), + &[("comp", "blocklist")], + ) .await?; Ok(get_put_result(response.headers(), VERSION_HEADER).context(MetadataSnafu)?) diff --git a/object_store/src/azure/mod.rs b/object_store/src/azure/mod.rs index 4b6e4fa44253..762a51dd9d60 100644 --- a/object_store/src/azure/mod.rs +++ b/object_store/src/azure/mod.rs @@ -29,7 +29,8 @@ use crate::{ multipart::{PartId, PutPart, WriteMultiPart}, path::Path, - GetOptions, GetResult, ListResult, MultipartId, ObjectMeta, ObjectStore, PutResult, Result, + GetOptions, GetResult, ListResult, MultipartId, ObjectMeta, ObjectStore, PutOptions, PutResult, + Result, }; use async_trait::async_trait; use bytes::Bytes; @@ -81,8 +82,8 @@ impl std::fmt::Display for MicrosoftAzure { #[async_trait] impl ObjectStore for MicrosoftAzure { - async fn put(&self, location: &Path, bytes: Bytes) -> Result { - Ok(self.client.put_blob(location, bytes).await?) + async fn put_opts(&self, location: &Path, bytes: Bytes, opts: PutOptions) -> Result { + self.client.put_blob(location, bytes, opts).await } async fn put_multipart( @@ -199,6 +200,7 @@ mod tests { rename_and_copy(&integration).await; copy_if_not_exists(&integration).await; stream_get(&integration).await; + put_opts(&integration, true).await; multipart(&integration, &integration).await; } diff --git a/object_store/src/chunked.rs b/object_store/src/chunked.rs index 021f9f50156b..d33556f4b12e 100644 --- a/object_store/src/chunked.rs +++ b/object_store/src/chunked.rs @@ -29,7 +29,8 @@ use tokio::io::AsyncWrite; use crate::path::Path; use crate::{ - GetOptions, GetResult, GetResultPayload, ListResult, ObjectMeta, ObjectStore, PutResult, + GetOptions, GetResult, GetResultPayload, ListResult, ObjectMeta, ObjectStore, PutOptions, + PutResult, }; use crate::{MultipartId, Result}; @@ -62,8 +63,8 @@ impl Display for ChunkedStore { #[async_trait] impl ObjectStore for ChunkedStore { - async fn put(&self, location: &Path, bytes: Bytes) -> Result { - self.inner.put(location, bytes).await + async fn put_opts(&self, location: &Path, bytes: Bytes, opts: PutOptions) -> Result { + self.inner.put_opts(location, bytes, opts).await } async fn put_multipart( diff --git a/object_store/src/gcp/client.rs b/object_store/src/gcp/client.rs index 5c3936c73fe4..98cb1704cb45 100644 --- a/object_store/src/gcp/client.rs +++ b/object_store/src/gcp/client.rs @@ -24,16 +24,22 @@ use crate::client::GetOptionsExt; use crate::gcp::{GcpCredential, GcpCredentialProvider, STORE}; use crate::multipart::PartId; use crate::path::{Path, DELIMITER}; -use crate::{ClientOptions, GetOptions, ListResult, MultipartId, PutResult, Result, RetryConfig}; +use crate::{ + ClientOptions, GetOptions, ListResult, MultipartId, PutMode, PutOptions, PutResult, Result, + RetryConfig, +}; use async_trait::async_trait; use bytes::{Buf, Bytes}; use percent_encoding::{percent_encode, utf8_percent_encode, NON_ALPHANUMERIC}; use reqwest::{header, Client, Method, Response, StatusCode}; use serde::Serialize; -use snafu::{ResultExt, Snafu}; +use snafu::{OptionExt, ResultExt, Snafu}; use std::sync::Arc; const VERSION_HEADER: &str = "x-goog-generation"; + +const VERSION_MATCH: &str = "x-goog-if-generation-match"; + #[derive(Debug, Snafu)] enum Error { #[snafu(display("Error performing list request: {}", source))] @@ -79,6 +85,9 @@ enum Error { Metadata { source: crate::client::header::Error, }, + + #[snafu(display("Version required for conditional update"))] + MissingVersion, } impl From for crate::Error { @@ -157,6 +166,7 @@ impl GoogleCloudStorageClient { &self, path: &Path, payload: Bytes, + opts: PutOptions, query: &T, ) -> Result { let credential = self.get_credential().await?; @@ -168,21 +178,44 @@ impl GoogleCloudStorageClient { .get_content_type(path) .unwrap_or("application/octet-stream"); - let response = self - .client - .request(Method::PUT, url) - .query(query) + let mut builder = self.client.request(Method::PUT, url).query(query); + + builder = match &opts.mode { + PutMode::Overwrite => builder, + PutMode::Create => builder.header(VERSION_MATCH, 0), + PutMode::Update(v) => builder.header( + VERSION_MATCH, + v.version.as_ref().context(MissingVersionSnafu)?, + ), + }; + + let result = builder .bearer_auth(&credential.bearer) .header(header::CONTENT_TYPE, content_type) .header(header::CONTENT_LENGTH, payload.len()) .body(payload) .send_retry(&self.config.retry_config) - .await - .context(PutRequestSnafu { - path: path.as_ref(), - })?; - - Ok(get_put_result(response.headers(), VERSION_HEADER).context(MetadataSnafu)?) + .await; + + match result { + Ok(response) => { + Ok(get_put_result(response.headers(), VERSION_HEADER).context(MetadataSnafu)?) + } + Err(source) => { + return Err(match (opts.mode, source.status()) { + (PutMode::Create, Some(StatusCode::PRECONDITION_FAILED)) => { + crate::Error::AlreadyExists { + source: Box::new(source), + path: path.to_string(), + } + } + _ => crate::Error::from(Error::PutRequest { + source, + path: path.to_string(), + }), + }); + } + } } /// Perform a put part request @@ -199,6 +232,7 @@ impl GoogleCloudStorageClient { .put_request( path, data, + PutOptions::default(), &[ ("partNumber", &format!("{}", part_idx + 1)), ("uploadId", upload_id), @@ -336,7 +370,7 @@ impl GoogleCloudStorageClient { .header("x-goog-copy-source", source); if if_not_exists { - builder = builder.header("x-goog-if-generation-match", 0); + builder = builder.header(VERSION_MATCH, 0); } builder @@ -377,13 +411,18 @@ impl GetClient for GoogleCloudStorageClient { false => Method::GET, }; - let mut request = self.client.request(method, url).with_get_options(options); + let mut request = self.client.request(method, url); + + if let Some(version) = &options.version { + request = request.query(&[("generation", version)]); + } if !credential.bearer.is_empty() { request = request.bearer_auth(&credential.bearer); } let response = request + .with_get_options(options) .send_retry(&self.config.retry_config) .await .context(GetRequestSnafu { diff --git a/object_store/src/gcp/mod.rs b/object_store/src/gcp/mod.rs index d3a84f838c03..43d1162adb43 100644 --- a/object_store/src/gcp/mod.rs +++ b/object_store/src/gcp/mod.rs @@ -35,7 +35,8 @@ use crate::client::CredentialProvider; use crate::{ multipart::{PartId, PutPart, WriteMultiPart}, path::Path, - GetOptions, GetResult, ListResult, MultipartId, ObjectMeta, ObjectStore, PutResult, Result, + GetOptions, GetResult, ListResult, MultipartId, ObjectMeta, ObjectStore, PutOptions, PutResult, + Result, }; use async_trait::async_trait; use bytes::Bytes; @@ -107,8 +108,8 @@ impl PutPart for GCSMultipartUpload { #[async_trait] impl ObjectStore for GoogleCloudStorage { - async fn put(&self, location: &Path, bytes: Bytes) -> Result { - self.client.put_request(location, bytes, &()).await + async fn put_opts(&self, location: &Path, bytes: Bytes, opts: PutOptions) -> Result { + self.client.put_request(location, bytes, opts, &()).await } async fn put_multipart( @@ -220,6 +221,7 @@ mod test { multipart(&integration, &integration).await; // Fake GCS server doesn't currently honor preconditions get_opts(&integration).await; + put_opts(&integration, true).await; } } diff --git a/object_store/src/http/client.rs b/object_store/src/http/client.rs index a7dbdfcbe844..dd90b0da5552 100644 --- a/object_store/src/http/client.rs +++ b/object_store/src/http/client.rs @@ -21,12 +21,12 @@ use crate::client::retry::{self, RetryConfig, RetryExt}; use crate::client::GetOptionsExt; use crate::path::{Path, DELIMITER}; use crate::util::deserialize_rfc1123; -use crate::{ClientOptions, GetOptions, ObjectMeta, Result}; +use crate::{ClientOptions, GetOptions, ObjectMeta, PutMode, PutOptions, Result}; use async_trait::async_trait; use bytes::{Buf, Bytes}; use chrono::{DateTime, Utc}; use percent_encoding::percent_decode_str; -use reqwest::header::CONTENT_TYPE; +use reqwest::header::{CONTENT_TYPE, IF_MATCH, IF_NONE_MATCH}; use reqwest::{Method, Response, StatusCode}; use serde::Deserialize; use snafu::{OptionExt, ResultExt, Snafu}; @@ -69,6 +69,9 @@ enum Error { path: String, source: crate::path::Error, }, + + #[snafu(display("ETag required for conditional update"))] + MissingETag, } impl From for crate::Error { @@ -156,7 +159,7 @@ impl Client { Ok(()) } - pub async fn put(&self, location: &Path, bytes: Bytes) -> Result { + pub async fn put(&self, location: &Path, bytes: Bytes, opts: PutOptions) -> Result { let mut retry = false; loop { let url = self.path_url(location); @@ -165,6 +168,14 @@ impl Client { builder = builder.header(CONTENT_TYPE, value); } + let builder = match &opts.mode { + PutMode::Overwrite => builder, + PutMode::Create => builder.header(IF_NONE_MATCH, "*"), + PutMode::Update(v) => { + builder.header(IF_MATCH, v.e_tag.as_ref().context(MissingETagSnafu)?) + } + }; + match builder.send_retry(&self.retry_config).await { Ok(response) => return Ok(response), Err(source) => match source.status() { @@ -173,6 +184,12 @@ impl Client { retry = true; self.create_parent_directories(location).await? } + Some(StatusCode::NOT_MODIFIED) if matches!(opts.mode, PutMode::Create) => { + return Err(crate::Error::AlreadyExists { + path: location.to_string(), + source: Box::new(source), + }) + } _ => return Err(Error::Request { source }.into()), }, } @@ -243,6 +260,10 @@ impl Client { .header("Destination", self.path_url(to).as_str()); if !overwrite { + // While the Overwrite header appears to duplicate + // the functionality of the If-Match: * header of HTTP/1.1, If-Match + // applies only to the Request-URI, and not to the Destination of a COPY + // or MOVE. builder = builder.header("Overwrite", "F"); } diff --git a/object_store/src/http/mod.rs b/object_store/src/http/mod.rs index 3929af9fd1f9..abe03e061746 100644 --- a/object_store/src/http/mod.rs +++ b/object_store/src/http/mod.rs @@ -46,7 +46,7 @@ use crate::http::client::Client; use crate::path::Path; use crate::{ ClientConfigKey, ClientOptions, GetOptions, GetResult, ListResult, MultipartId, ObjectMeta, - ObjectStore, PutResult, Result, RetryConfig, + ObjectStore, PutOptions, PutResult, Result, RetryConfig, }; mod client; @@ -96,8 +96,8 @@ impl std::fmt::Display for HttpStore { #[async_trait] impl ObjectStore for HttpStore { - async fn put(&self, location: &Path, bytes: Bytes) -> Result { - let response = self.client.put(location, bytes).await?; + async fn put_opts(&self, location: &Path, bytes: Bytes, opts: PutOptions) -> Result { + let response = self.client.put(location, bytes, opts).await?; let e_tag = match get_etag(response.headers()) { Ok(e_tag) => Some(e_tag), Err(crate::client::header::Error::MissingEtag) => None, @@ -264,5 +264,6 @@ mod tests { list_with_delimiter(&integration).await; rename_and_copy(&integration).await; copy_if_not_exists(&integration).await; + put_opts(&integration, true).await; } } diff --git a/object_store/src/lib.rs b/object_store/src/lib.rs index 6d53c3e1afd0..a7d402de8f51 100644 --- a/object_store/src/lib.rs +++ b/object_store/src/lib.rs @@ -299,7 +299,12 @@ pub trait ObjectStore: std::fmt::Display + Send + Sync + Debug + 'static { /// The operation is guaranteed to be atomic, it will either successfully /// write the entirety of `bytes` to `location`, or fail. No clients /// should be able to observe a partially written object - async fn put(&self, location: &Path, bytes: Bytes) -> Result; + async fn put(&self, location: &Path, bytes: Bytes) -> Result { + self.put_opts(location, bytes, PutOptions::default()).await + } + + /// Save the provided bytes to the specified location with the given options + async fn put_opts(&self, location: &Path, bytes: Bytes, opts: PutOptions) -> Result; /// Get a multi-part upload that allows writing data in chunks. /// @@ -531,6 +536,15 @@ macro_rules! as_ref_impl { self.as_ref().put(location, bytes).await } + async fn put_opts( + &self, + location: &Path, + bytes: Bytes, + opts: PutOptions, + ) -> Result { + self.as_ref().put_opts(location, bytes, opts).await + } + async fn put_multipart( &self, location: &Path, @@ -837,6 +851,56 @@ impl GetResult { } } +/// Configure preconditions for the put operation +#[derive(Debug, Clone, PartialEq, Eq, Default)] +pub enum PutMode { + /// Perform an atomic write operation, overwriting any object present at the provided path + #[default] + Overwrite, + /// Perform an atomic write operation, returning [`Error::AlreadyExists`] if an + /// object already exists at the provided path + Create, + /// Perform an atomic write operation if the current version of the object matches the + /// provided [`UpdateVersion`], returning [`Error::Precondition`] otherwise + Update(UpdateVersion), +} + +/// Uniquely identifies a version of an object to update +/// +/// Stores will use differing combinations of `e_tag` and `version` to provide conditional +/// updates, and it is therefore recommended applications preserve both +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct UpdateVersion { + /// The unique identifier for the newly created object + /// + /// + pub e_tag: Option, + /// A version indicator for the newly created object + pub version: Option, +} + +impl From for UpdateVersion { + fn from(value: PutResult) -> Self { + Self { + e_tag: value.e_tag, + version: value.version, + } + } +} + +/// Options for a put request +#[derive(Debug, Clone, PartialEq, Eq, Default)] +pub struct PutOptions { + /// Configure the [`PutMode`] for this operation + pub mode: PutMode, +} + +impl From for PutOptions { + fn from(mode: PutMode) -> Self { + Self { mode } + } +} + /// Result for a put request #[derive(Debug, Clone, PartialEq, Eq)] pub struct PutResult { @@ -1408,7 +1472,7 @@ mod tests { // Can retrieve previous version let get_opts = storage.get_opts(&path, options).await.unwrap(); let old = get_opts.bytes().await.unwrap(); - assert_eq!(old, b"foo".as_slice()); + assert_eq!(old, b"test".as_slice()); // Current version contains the updated data let current = storage.get(&path).await.unwrap().bytes().await.unwrap(); @@ -1416,6 +1480,57 @@ mod tests { } } + pub(crate) async fn put_opts(storage: &dyn ObjectStore, supports_update: bool) { + let path = Path::from("put_opts"); + let v1 = storage + .put_opts(&path, "a".into(), PutMode::Create.into()) + .await + .unwrap(); + + let err = storage + .put_opts(&path, "b".into(), PutMode::Create.into()) + .await + .unwrap_err(); + assert!(matches!(err, Error::AlreadyExists { .. }), "{err}"); + + let b = storage.get(&path).await.unwrap().bytes().await.unwrap(); + assert_eq!(b.as_ref(), b"a"); + + if !supports_update { + return; + } + + let v2 = storage + .put_opts(&path, "c".into(), PutMode::Update(v1.clone().into()).into()) + .await + .unwrap(); + + let b = storage.get(&path).await.unwrap().bytes().await.unwrap(); + assert_eq!(b.as_ref(), b"c"); + + let err = storage + .put_opts(&path, "d".into(), PutMode::Update(v1.into()).into()) + .await + .unwrap_err(); + assert!(matches!(err, Error::Precondition { .. }), "{err}"); + + storage + .put_opts(&path, "e".into(), PutMode::Update(v2.clone().into()).into()) + .await + .unwrap(); + + let b = storage.get(&path).await.unwrap().bytes().await.unwrap(); + assert_eq!(b.as_ref(), b"e"); + + // Update not exists + let path = Path::from("I don't exist"); + let err = storage + .put_opts(&path, "e".into(), PutMode::Update(v2.into()).into()) + .await + .unwrap_err(); + assert!(matches!(err, Error::Precondition { .. }), "{err}"); + } + /// Returns a chunk of length `chunk_length` fn get_chunk(chunk_length: usize) -> Bytes { let mut data = vec![0_u8; chunk_length]; diff --git a/object_store/src/limit.rs b/object_store/src/limit.rs index cd01a964dc3e..39cc605c4768 100644 --- a/object_store/src/limit.rs +++ b/object_store/src/limit.rs @@ -19,7 +19,7 @@ use crate::{ BoxStream, GetOptions, GetResult, GetResultPayload, ListResult, MultipartId, ObjectMeta, - ObjectStore, Path, PutResult, Result, StreamExt, + ObjectStore, Path, PutOptions, PutResult, Result, StreamExt, }; use async_trait::async_trait; use bytes::Bytes; @@ -77,6 +77,10 @@ impl ObjectStore for LimitStore { self.inner.put(location, bytes).await } + async fn put_opts(&self, location: &Path, bytes: Bytes, opts: PutOptions) -> Result { + let _permit = self.semaphore.acquire().await.unwrap(); + self.inner.put_opts(location, bytes, opts).await + } async fn put_multipart( &self, location: &Path, diff --git a/object_store/src/local.rs b/object_store/src/local.rs index 3df486d506ae..919baf71b0a8 100644 --- a/object_store/src/local.rs +++ b/object_store/src/local.rs @@ -20,7 +20,7 @@ use crate::{ maybe_spawn_blocking, path::{absolute_path_to_url, Path}, GetOptions, GetResult, GetResultPayload, ListResult, MultipartId, ObjectMeta, ObjectStore, - PutResult, Result, + PutMode, PutOptions, PutResult, Result, }; use async_trait::async_trait; use bytes::Bytes; @@ -271,20 +271,44 @@ impl Config { #[async_trait] impl ObjectStore for LocalFileSystem { - async fn put(&self, location: &Path, bytes: Bytes) -> Result { + async fn put_opts(&self, location: &Path, bytes: Bytes, opts: PutOptions) -> Result { + if matches!(opts.mode, PutMode::Update(_)) { + return Err(crate::Error::NotImplemented); + } + let path = self.config.path_to_filesystem(location)?; maybe_spawn_blocking(move || { let (mut file, suffix) = new_staged_upload(&path)?; let staging_path = staged_upload_path(&path, &suffix); - file.write_all(&bytes) - .context(UnableToCopyDataToFileSnafu) - .and_then(|_| { - std::fs::rename(&staging_path, &path).context(UnableToRenameFileSnafu) - }) - .map_err(|e| { - let _ = std::fs::remove_file(&staging_path); // Attempt to cleanup - e - })?; + + let err = match file.write_all(&bytes) { + Ok(_) => match opts.mode { + PutMode::Overwrite => match std::fs::rename(&staging_path, &path) { + Ok(_) => None, + Err(source) => Some(Error::UnableToRenameFile { source }), + }, + PutMode::Create => match std::fs::hard_link(&staging_path, &path) { + Ok(_) => { + let _ = std::fs::remove_file(&staging_path); // Attempt to cleanup + None + } + Err(source) => match source.kind() { + ErrorKind::AlreadyExists => Some(Error::AlreadyExists { + path: path.to_str().unwrap().to_string(), + source, + }), + _ => Some(Error::UnableToRenameFile { source }), + }, + }, + PutMode::Update(_) => unreachable!(), + }, + Err(source) => Some(Error::UnableToCopyDataToFile { source }), + }; + + if let Some(err) = err { + let _ = std::fs::remove_file(&staging_path); // Attempt to cleanup + return Err(err.into()); + } let metadata = file.metadata().map_err(|e| Error::Metadata { source: e.into(), @@ -1055,6 +1079,7 @@ mod tests { rename_and_copy(&integration).await; copy_if_not_exists(&integration).await; stream_get(&integration).await; + put_opts(&integration, false).await; } #[test] diff --git a/object_store/src/memory.rs b/object_store/src/memory.rs index 3401ceb7c463..9d79a798ad1f 100644 --- a/object_store/src/memory.rs +++ b/object_store/src/memory.rs @@ -17,7 +17,8 @@ //! An in-memory object store implementation use crate::{ - path::Path, GetResult, GetResultPayload, ListResult, ObjectMeta, ObjectStore, PutResult, Result, + path::Path, GetResult, GetResultPayload, ListResult, ObjectMeta, ObjectStore, PutMode, + PutOptions, PutResult, Result, UpdateVersion, }; use crate::{GetOptions, MultipartId}; use async_trait::async_trait; @@ -52,6 +53,9 @@ enum Error { #[snafu(display("Object already exists at that location: {path}"))] AlreadyExists { path: String }, + + #[snafu(display("ETag required for conditional update"))] + MissingETag, } impl From for super::Error { @@ -110,9 +114,50 @@ impl Storage { let etag = self.next_etag; self.next_etag += 1; let entry = Entry::new(bytes, Utc::now(), etag); - self.map.insert(location.clone(), entry); + self.overwrite(location, entry); etag } + + fn overwrite(&mut self, location: &Path, entry: Entry) { + self.map.insert(location.clone(), entry); + } + + fn create(&mut self, location: &Path, entry: Entry) -> Result<()> { + use std::collections::btree_map; + match self.map.entry(location.clone()) { + btree_map::Entry::Occupied(_) => Err(Error::AlreadyExists { + path: location.to_string(), + } + .into()), + btree_map::Entry::Vacant(v) => { + v.insert(entry); + Ok(()) + } + } + } + + fn update(&mut self, location: &Path, v: UpdateVersion, entry: Entry) -> Result<()> { + match self.map.get_mut(location) { + // Return Precondition instead of NotFound for consistency with stores + None => Err(crate::Error::Precondition { + path: location.to_string(), + source: format!("Object at location {location} not found").into(), + }), + Some(e) => { + let existing = e.e_tag.to_string(); + let expected = v.e_tag.context(MissingETagSnafu)?; + if existing == expected { + *e = entry; + Ok(()) + } else { + Err(crate::Error::Precondition { + path: location.to_string(), + source: format!("{existing} does not match {expected}").into(), + }) + } + } + } + } } impl std::fmt::Display for InMemory { @@ -123,8 +168,18 @@ impl std::fmt::Display for InMemory { #[async_trait] impl ObjectStore for InMemory { - async fn put(&self, location: &Path, bytes: Bytes) -> Result { - let etag = self.storage.write().insert(location, bytes); + async fn put_opts(&self, location: &Path, bytes: Bytes, opts: PutOptions) -> Result { + let mut storage = self.storage.write(); + let etag = storage.next_etag; + let entry = Entry::new(bytes, Utc::now(), etag); + + match opts.mode { + PutMode::Overwrite => storage.overwrite(location, entry), + PutMode::Create => storage.create(location, entry)?, + PutMode::Update(v) => storage.update(location, v, entry)?, + } + storage.next_etag += 1; + Ok(PutResult { e_tag: Some(etag.to_string()), version: None, @@ -426,7 +481,7 @@ impl AsyncWrite for InMemoryAppend { fn poll_shutdown( self: Pin<&mut Self>, cx: &mut std::task::Context<'_>, - ) -> std::task::Poll> { + ) -> Poll> { self.poll_flush(cx) } } @@ -450,6 +505,7 @@ mod tests { rename_and_copy(&integration).await; copy_if_not_exists(&integration).await; stream_get(&integration).await; + put_opts(&integration, true).await; } #[tokio::test] diff --git a/object_store/src/prefix.rs b/object_store/src/prefix.rs index b5bff8b12dd7..68101307fbdf 100644 --- a/object_store/src/prefix.rs +++ b/object_store/src/prefix.rs @@ -23,7 +23,8 @@ use tokio::io::AsyncWrite; use crate::path::Path; use crate::{ - GetOptions, GetResult, ListResult, MultipartId, ObjectMeta, ObjectStore, PutResult, Result, + GetOptions, GetResult, ListResult, MultipartId, ObjectMeta, ObjectStore, PutOptions, PutResult, + Result, }; #[doc(hidden)] @@ -85,6 +86,11 @@ impl ObjectStore for PrefixStore { self.inner.put(&full_path, bytes).await } + async fn put_opts(&self, location: &Path, bytes: Bytes, opts: PutOptions) -> Result { + let full_path = self.full_path(location); + self.inner.put_opts(&full_path, bytes, opts).await + } + async fn put_multipart( &self, location: &Path, diff --git a/object_store/src/throttle.rs b/object_store/src/throttle.rs index c5521256b8a6..dcd2c04bcf05 100644 --- a/object_store/src/throttle.rs +++ b/object_store/src/throttle.rs @@ -21,7 +21,8 @@ use std::ops::Range; use std::{convert::TryInto, sync::Arc}; use crate::{ - path::Path, GetResult, GetResultPayload, ListResult, ObjectMeta, ObjectStore, PutResult, Result, + path::Path, GetResult, GetResultPayload, ListResult, ObjectMeta, ObjectStore, PutOptions, + PutResult, Result, }; use crate::{GetOptions, MultipartId}; use async_trait::async_trait; @@ -149,10 +150,14 @@ impl std::fmt::Display for ThrottledStore { impl ObjectStore for ThrottledStore { async fn put(&self, location: &Path, bytes: Bytes) -> Result { sleep(self.config().wait_put_per_call).await; - self.inner.put(location, bytes).await } + async fn put_opts(&self, location: &Path, bytes: Bytes, opts: PutOptions) -> Result { + sleep(self.config().wait_put_per_call).await; + self.inner.put_opts(location, bytes, opts).await + } + async fn put_multipart( &self, _location: &Path, diff --git a/object_store/tests/get_range_file.rs b/object_store/tests/get_range_file.rs index 3fa1cc7104b3..85231a5a5b9b 100644 --- a/object_store/tests/get_range_file.rs +++ b/object_store/tests/get_range_file.rs @@ -22,9 +22,7 @@ use bytes::Bytes; use futures::stream::BoxStream; use object_store::local::LocalFileSystem; use object_store::path::Path; -use object_store::{ - GetOptions, GetResult, ListResult, MultipartId, ObjectMeta, ObjectStore, PutResult, -}; +use object_store::*; use std::fmt::Formatter; use tempfile::tempdir; use tokio::io::AsyncWrite; @@ -40,50 +38,42 @@ impl std::fmt::Display for MyStore { #[async_trait] impl ObjectStore for MyStore { - async fn put(&self, path: &Path, data: Bytes) -> object_store::Result { - self.0.put(path, data).await + async fn put_opts(&self, path: &Path, data: Bytes, opts: PutOptions) -> Result { + self.0.put_opts(path, data, opts).await } async fn put_multipart( &self, _: &Path, - ) -> object_store::Result<(MultipartId, Box)> { + ) -> Result<(MultipartId, Box)> { todo!() } - async fn abort_multipart(&self, _: &Path, _: &MultipartId) -> object_store::Result<()> { + async fn abort_multipart(&self, _: &Path, _: &MultipartId) -> Result<()> { todo!() } - async fn get_opts( - &self, - location: &Path, - options: GetOptions, - ) -> object_store::Result { + async fn get_opts(&self, location: &Path, options: GetOptions) -> Result { self.0.get_opts(location, options).await } - async fn head(&self, _: &Path) -> object_store::Result { - todo!() - } - - async fn delete(&self, _: &Path) -> object_store::Result<()> { + async fn delete(&self, _: &Path) -> Result<()> { todo!() } - fn list(&self, _: Option<&Path>) -> BoxStream<'_, object_store::Result> { + fn list(&self, _: Option<&Path>) -> BoxStream<'_, Result> { todo!() } - async fn list_with_delimiter(&self, _: Option<&Path>) -> object_store::Result { + async fn list_with_delimiter(&self, _: Option<&Path>) -> Result { todo!() } - async fn copy(&self, _: &Path, _: &Path) -> object_store::Result<()> { + async fn copy(&self, _: &Path, _: &Path) -> Result<()> { todo!() } - async fn copy_if_not_exists(&self, _: &Path, _: &Path) -> object_store::Result<()> { + async fn copy_if_not_exists(&self, _: &Path, _: &Path) -> Result<()> { todo!() } } From f8613395b50d849b8bf332e83c63f346bd5acbd3 Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Tue, 24 Oct 2023 18:02:41 +0100 Subject: [PATCH 03/12] Don't support HttpStore --- object_store/src/http/client.rs | 10 +--------- object_store/src/http/mod.rs | 10 +++++++--- 2 files changed, 8 insertions(+), 12 deletions(-) diff --git a/object_store/src/http/client.rs b/object_store/src/http/client.rs index dd90b0da5552..db07692e112f 100644 --- a/object_store/src/http/client.rs +++ b/object_store/src/http/client.rs @@ -159,7 +159,7 @@ impl Client { Ok(()) } - pub async fn put(&self, location: &Path, bytes: Bytes, opts: PutOptions) -> Result { + pub async fn put(&self, location: &Path, bytes: Bytes) -> Result { let mut retry = false; loop { let url = self.path_url(location); @@ -168,14 +168,6 @@ impl Client { builder = builder.header(CONTENT_TYPE, value); } - let builder = match &opts.mode { - PutMode::Overwrite => builder, - PutMode::Create => builder.header(IF_NONE_MATCH, "*"), - PutMode::Update(v) => { - builder.header(IF_MATCH, v.e_tag.as_ref().context(MissingETagSnafu)?) - } - }; - match builder.send_retry(&self.retry_config).await { Ok(response) => return Ok(response), Err(source) => match source.status() { diff --git a/object_store/src/http/mod.rs b/object_store/src/http/mod.rs index abe03e061746..cfcde27fd781 100644 --- a/object_store/src/http/mod.rs +++ b/object_store/src/http/mod.rs @@ -46,7 +46,7 @@ use crate::http::client::Client; use crate::path::Path; use crate::{ ClientConfigKey, ClientOptions, GetOptions, GetResult, ListResult, MultipartId, ObjectMeta, - ObjectStore, PutOptions, PutResult, Result, RetryConfig, + ObjectStore, PutMode, PutOptions, PutResult, Result, RetryConfig, }; mod client; @@ -97,7 +97,12 @@ impl std::fmt::Display for HttpStore { #[async_trait] impl ObjectStore for HttpStore { async fn put_opts(&self, location: &Path, bytes: Bytes, opts: PutOptions) -> Result { - let response = self.client.put(location, bytes, opts).await?; + if opts.mode != PutMode::Overwrite { + // TODO: Add support for If header - https://datatracker.ietf.org/doc/html/rfc2518#section-9.4 + return Err(crate::Error::NotImplemented); + } + + let response = self.client.put(location, bytes).await?; let e_tag = match get_etag(response.headers()) { Ok(e_tag) => Some(e_tag), Err(crate::client::header::Error::MissingEtag) => None, @@ -264,6 +269,5 @@ mod tests { list_with_delimiter(&integration).await; rename_and_copy(&integration).await; copy_if_not_exists(&integration).await; - put_opts(&integration, true).await; } } From 642a77eeed1d56a2f1714ff632553473939d8633 Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Tue, 24 Oct 2023 18:45:02 +0100 Subject: [PATCH 04/12] Add R2 Support --- object_store/src/aws/builder.rs | 30 +++++++++++-- object_store/src/aws/client.rs | 11 ++++- object_store/src/aws/mod.rs | 37 ++++++++++++--- .../src/aws/{copy.rs => precondition.rs} | 45 ++++++++++++++++++- object_store/src/lib.rs | 1 + 5 files changed, 113 insertions(+), 11 deletions(-) rename object_store/src/aws/{copy.rs => precondition.rs} (68%) diff --git a/object_store/src/aws/builder.rs b/object_store/src/aws/builder.rs index 75a5299a0859..79ea75b5aba2 100644 --- a/object_store/src/aws/builder.rs +++ b/object_store/src/aws/builder.rs @@ -20,7 +20,8 @@ use crate::aws::credential::{ InstanceCredentialProvider, TaskCredentialProvider, WebIdentityProvider, }; use crate::aws::{ - AmazonS3, AwsCredential, AwsCredentialProvider, Checksum, S3CopyIfNotExists, STORE, + AmazonS3, AwsCredential, AwsCredentialProvider, Checksum, S3ConditionalPut, S3CopyIfNotExists, + STORE, }; use crate::client::TokenCredentialProvider; use crate::config::ConfigValue; @@ -152,6 +153,8 @@ pub struct AmazonS3Builder { skip_signature: ConfigValue, /// Copy if not exists copy_if_not_exists: Option>, + /// Put precondition + conditional_put: Option>, } /// Configuration keys for [`AmazonS3Builder`] @@ -288,6 +291,11 @@ pub enum AmazonS3ConfigKey { /// See [`S3CopyIfNotExists`] CopyIfNotExists, + /// Configure how to provide conditional put operations + /// + /// See [`S3ConditionalPut`] + ConditionalPut, + /// Skip signing request SkipSignature, @@ -312,7 +320,8 @@ impl AsRef for AmazonS3ConfigKey { Self::Checksum => "aws_checksum_algorithm", Self::ContainerCredentialsRelativeUri => "aws_container_credentials_relative_uri", Self::SkipSignature => "aws_skip_signature", - Self::CopyIfNotExists => "copy_if_not_exists", + Self::CopyIfNotExists => "aws_copy_if_not_exists", + Self::ConditionalPut => "aws_conditional_put", Self::Client(opt) => opt.as_ref(), } } @@ -339,7 +348,8 @@ impl FromStr for AmazonS3ConfigKey { "aws_checksum_algorithm" | "checksum_algorithm" => Ok(Self::Checksum), "aws_container_credentials_relative_uri" => Ok(Self::ContainerCredentialsRelativeUri), "aws_skip_signature" | "skip_signature" => Ok(Self::SkipSignature), - "copy_if_not_exists" => Ok(Self::CopyIfNotExists), + "aws_copy_if_not_exists" | "copy_if_not_exists" => Ok(Self::CopyIfNotExists), + "aws_conditional_put" | "conditional_put" => Ok(Self::ConditionalPut), // Backwards compatibility "aws_allow_http" => Ok(Self::Client(ClientConfigKey::AllowHttp)), _ => match s.parse() { @@ -446,6 +456,9 @@ impl AmazonS3Builder { AmazonS3ConfigKey::CopyIfNotExists => { self.copy_if_not_exists = Some(ConfigValue::Deferred(value.into())) } + AmazonS3ConfigKey::ConditionalPut => { + self.conditional_put = Some(ConfigValue::Deferred(value.into())) + } }; self } @@ -509,6 +522,9 @@ impl AmazonS3Builder { AmazonS3ConfigKey::CopyIfNotExists => { self.copy_if_not_exists.as_ref().map(ToString::to_string) } + AmazonS3ConfigKey::ConditionalPut => { + self.conditional_put.as_ref().map(ToString::to_string) + } } } @@ -713,6 +729,12 @@ impl AmazonS3Builder { self } + /// Configure how to provide conditional put operations + pub fn with_conditional_put(mut self, config: S3ConditionalPut) -> Self { + self.conditional_put = Some(config.into()); + self + } + /// Create a [`AmazonS3`] instance from the provided values, /// consuming `self`. pub fn build(mut self) -> Result { @@ -724,6 +746,7 @@ impl AmazonS3Builder { let region = self.region.context(MissingRegionSnafu)?; let checksum = self.checksum_algorithm.map(|x| x.get()).transpose()?; let copy_if_not_exists = self.copy_if_not_exists.map(|x| x.get()).transpose()?; + let put_precondition = self.conditional_put.map(|x| x.get()).transpose()?; let credentials = if let Some(credentials) = self.credentials { credentials @@ -830,6 +853,7 @@ impl AmazonS3Builder { skip_signature: self.skip_signature.get()?, checksum, copy_if_not_exists, + conditional_put: put_precondition, }; let client = Arc::new(S3Client::new(config)?); diff --git a/object_store/src/aws/client.rs b/object_store/src/aws/client.rs index 9fdcc6c64934..46be1e189d62 100644 --- a/object_store/src/aws/client.rs +++ b/object_store/src/aws/client.rs @@ -17,7 +17,9 @@ use crate::aws::checksum::Checksum; use crate::aws::credential::{AwsCredential, CredentialExt}; -use crate::aws::{AwsCredentialProvider, S3CopyIfNotExists, STORE, STRICT_PATH_ENCODE_SET}; +use crate::aws::{ + AwsCredentialProvider, S3ConditionalPut, S3CopyIfNotExists, STORE, STRICT_PATH_ENCODE_SET, +}; use crate::client::get::GetClient; use crate::client::header::HeaderConfig; use crate::client::header::{get_put_result, get_version}; @@ -227,6 +229,7 @@ pub struct S3Config { pub skip_signature: bool, pub checksum: Option, pub copy_if_not_exists: Option, + pub conditional_put: Option, } impl S3Config { @@ -267,6 +270,7 @@ impl S3Client { path: &Path, bytes: Bytes, query: &T, + header: Option<(&str, &str)>, ) -> Result { let credential = self.get_credential().await?; let url = self.config.path_url(path); @@ -290,6 +294,10 @@ impl S3Client { builder = builder.header(CONTENT_TYPE, value); } + if let Some((k, v)) = header { + builder = builder.header(k, v); + } + let response = builder .query(query) .with_aws_sigv4( @@ -534,6 +542,7 @@ impl S3Client { path, data, &[("partNumber", &part), ("uploadId", upload_id)], + None, ) .await?; diff --git a/object_store/src/aws/mod.rs b/object_store/src/aws/mod.rs index 1a64e6aa0876..b04c26968253 100644 --- a/object_store/src/aws/mod.rs +++ b/object_store/src/aws/mod.rs @@ -54,13 +54,13 @@ use crate::{ mod builder; mod checksum; mod client; -mod copy; mod credential; +mod precondition; mod resolve; pub use builder::{AmazonS3Builder, AmazonS3ConfigKey}; pub use checksum::Checksum; -pub use copy::S3CopyIfNotExists; +pub use precondition::{S3ConditionalPut, S3CopyIfNotExists}; pub use resolve::resolve_bucket_region; // http://docs.aws.amazon.com/general/latest/gr/sigv4-create-canonical-request.html @@ -159,9 +159,32 @@ impl Signer for AmazonS3 { #[async_trait] impl ObjectStore for AmazonS3 { async fn put_opts(&self, location: &Path, bytes: Bytes, opts: PutOptions) -> Result { - match opts.mode { - PutMode::Overwrite => self.client.put_request(location, bytes, &()).await, - PutMode::Create | PutMode::Update(_) => Err(Error::NotImplemented), + match (opts.mode, &self.client.config().conditional_put) { + (PutMode::Overwrite, _) => self.client.put_request(location, bytes, &(), None).await, + (PutMode::Create | PutMode::Update(_), None) => Err(Error::NotImplemented), + (PutMode::Create, Some(S3ConditionalPut::ETagMatch)) => { + let header = Some(("If-None-Match", "*")); + match self.client.put_request(location, bytes, &(), header).await { + // Technically If-None-Match should return NotModified but some stores, + // such as R2, instead return PreconditionFailed + // https://developers.cloudflare.com/r2/api/s3/extensions/#conditional-operations-in-putobject + Err(e @ Error::NotModified { .. } | e @ Error::Precondition { .. }) => { + Err(Error::AlreadyExists { + path: location.to_string(), + source: Box::new(e), + }) + } + r => r, + } + } + (PutMode::Update(v), Some(S3ConditionalPut::ETagMatch)) => { + let etag = v.e_tag.ok_or_else(|| Error::Generic { + store: STORE, + source: "ETag required for conditional put".to_string().into(), + })?; + let header = Some(("If-Match", etag.as_str())); + self.client.put_request(location, bytes, &(), header).await + } } } @@ -308,6 +331,7 @@ mod tests { let config = integration.client.config(); let is_local = config.endpoint.starts_with("http://"); let test_not_exists = config.copy_if_not_exists.is_some(); + let test_conditional_put = config.conditional_put.is_some(); // Localstack doesn't support listing with spaces https://github.com/localstack/localstack/issues/6328 put_get_delete_list_opts(&integration, is_local).await; @@ -321,6 +345,9 @@ mod tests { if test_not_exists { copy_if_not_exists(&integration).await; } + if test_conditional_put { + put_opts(&integration, true).await; + } // run integration test with unsigned payload enabled let builder = AmazonS3Builder::from_env().with_unsigned_payload(true); diff --git a/object_store/src/aws/copy.rs b/object_store/src/aws/precondition.rs similarity index 68% rename from object_store/src/aws/copy.rs rename to object_store/src/aws/precondition.rs index da4e2809be1a..a50b57fe23f7 100644 --- a/object_store/src/aws/copy.rs +++ b/object_store/src/aws/precondition.rs @@ -17,8 +17,7 @@ use crate::config::Parse; -/// Configure how to provide [`ObjectStore::copy_if_not_exists`] for -/// [`AmazonS3`]. +/// Configure how to provide [`ObjectStore::copy_if_not_exists`] for [`AmazonS3`]. /// /// [`ObjectStore::copy_if_not_exists`]: crate::ObjectStore::copy_if_not_exists /// [`AmazonS3`]: super::AmazonS3 @@ -70,3 +69,45 @@ impl Parse for S3CopyIfNotExists { }) } } + +/// Configure how to provide conditional put support for [`AmazonS3`]. +/// +/// [`AmazonS3`]: super::AmazonS3 +#[derive(Debug, Clone)] +#[allow(missing_copy_implementations)] +#[non_exhaustive] +pub enum S3ConditionalPut { + /// Some S3-compatible stores, such as Cloudflare R2 and minio support conditional + /// put using the standard [HTTP precondition] headers If-Match and If-None-Match + /// + /// Encoded as `etag` ignoring whitespace + /// + /// [HTTP precondition]: https://datatracker.ietf.org/doc/html/rfc9110#name-preconditions + ETagMatch, +} + +impl std::fmt::Display for S3ConditionalPut { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Self::ETagMatch => write!(f, "etag"), + } + } +} + +impl S3ConditionalPut { + fn from_str(s: &str) -> Option { + match s.trim() { + "etag" => Some(Self::ETagMatch), + _ => None, + } + } +} + +impl Parse for S3ConditionalPut { + fn parse(v: &str) -> crate::Result { + Self::from_str(v).ok_or_else(|| crate::Error::Generic { + store: "Config", + source: format!("Failed to parse \"{v}\" as S3PutConditional").into(), + }) + } +} diff --git a/object_store/src/lib.rs b/object_store/src/lib.rs index a7d402de8f51..13526c336880 100644 --- a/object_store/src/lib.rs +++ b/object_store/src/lib.rs @@ -1481,6 +1481,7 @@ mod tests { } pub(crate) async fn put_opts(storage: &dyn ObjectStore, supports_update: bool) { + delete_fixtures(storage).await; let path = Path::from("put_opts"); let v1 = storage .put_opts(&path, "a".into(), PutMode::Create.into()) From 57707a537c93b1518a83b4bd7024db9ddace7982 Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Tue, 24 Oct 2023 18:59:36 +0100 Subject: [PATCH 05/12] Update Azure StatusCode --- object_store/src/azure/client.rs | 31 ++++++++++++++++++++++--------- 1 file changed, 22 insertions(+), 9 deletions(-) diff --git a/object_store/src/azure/client.rs b/object_store/src/azure/client.rs index 94d81cca2f93..b6ababa2d016 100644 --- a/object_store/src/azure/client.rs +++ b/object_store/src/azure/client.rs @@ -191,22 +191,35 @@ impl AzureClient { builder = builder.header(CONTENT_LENGTH, HeaderValue::from_static("0")); } - builder = match opts.mode { + builder = match &opts.mode { PutMode::Overwrite => builder, PutMode::Create => builder.header(IF_NONE_MATCH, "*"), - PutMode::Update(v) => builder.header(IF_MATCH, v.e_tag.context(MissingETagSnafu)?), + PutMode::Update(v) => { + builder.header(IF_MATCH, v.e_tag.as_ref().context(MissingETagSnafu)?) + } }; - let response = builder + let result = builder .query(query) .with_azure_authorization(&credential, &self.config.account) .send_retry(&self.config.retry_config) - .await - .context(PutRequestSnafu { - path: path.as_ref(), - })?; - - Ok(response) + .await; + + match result { + Ok(response) => Ok(response), + Err(source) => { + return Err(match (opts.mode, source.status()) { + (PutMode::Create, Some(StatusCode::CONFLICT)) => crate::Error::AlreadyExists { + source: Box::new(source), + path: path.to_string(), + }, + _ => crate::Error::from(Error::PutRequest { + source, + path: path.to_string(), + }), + }); + } + } } /// Make an Azure PUT request From fba437d5f996e19a7fa60b276c821bc00a82308a Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Tue, 24 Oct 2023 19:03:56 +0100 Subject: [PATCH 06/12] Fixes --- object_store/src/http/client.rs | 13 ++----------- 1 file changed, 2 insertions(+), 11 deletions(-) diff --git a/object_store/src/http/client.rs b/object_store/src/http/client.rs index db07692e112f..8700775fb243 100644 --- a/object_store/src/http/client.rs +++ b/object_store/src/http/client.rs @@ -21,12 +21,12 @@ use crate::client::retry::{self, RetryConfig, RetryExt}; use crate::client::GetOptionsExt; use crate::path::{Path, DELIMITER}; use crate::util::deserialize_rfc1123; -use crate::{ClientOptions, GetOptions, ObjectMeta, PutMode, PutOptions, Result}; +use crate::{ClientOptions, GetOptions, ObjectMeta, Result}; use async_trait::async_trait; use bytes::{Buf, Bytes}; use chrono::{DateTime, Utc}; use percent_encoding::percent_decode_str; -use reqwest::header::{CONTENT_TYPE, IF_MATCH, IF_NONE_MATCH}; +use reqwest::header::CONTENT_TYPE; use reqwest::{Method, Response, StatusCode}; use serde::Deserialize; use snafu::{OptionExt, ResultExt, Snafu}; @@ -69,9 +69,6 @@ enum Error { path: String, source: crate::path::Error, }, - - #[snafu(display("ETag required for conditional update"))] - MissingETag, } impl From for crate::Error { @@ -176,12 +173,6 @@ impl Client { retry = true; self.create_parent_directories(location).await? } - Some(StatusCode::NOT_MODIFIED) if matches!(opts.mode, PutMode::Create) => { - return Err(crate::Error::AlreadyExists { - path: location.to_string(), - source: Box::new(source), - }) - } _ => return Err(Error::Request { source }.into()), }, } From f961f4ac572bc08e3846d4e07d9f54cdb19112ed Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Tue, 24 Oct 2023 19:05:01 +0100 Subject: [PATCH 07/12] Clippy --- object_store/src/azure/client.rs | 22 ++++++++++------------ object_store/src/gcp/client.rs | 24 +++++++++++------------- 2 files changed, 21 insertions(+), 25 deletions(-) diff --git a/object_store/src/azure/client.rs b/object_store/src/azure/client.rs index b6ababa2d016..4bfb3ae33835 100644 --- a/object_store/src/azure/client.rs +++ b/object_store/src/azure/client.rs @@ -207,18 +207,16 @@ impl AzureClient { match result { Ok(response) => Ok(response), - Err(source) => { - return Err(match (opts.mode, source.status()) { - (PutMode::Create, Some(StatusCode::CONFLICT)) => crate::Error::AlreadyExists { - source: Box::new(source), - path: path.to_string(), - }, - _ => crate::Error::from(Error::PutRequest { - source, - path: path.to_string(), - }), - }); - } + Err(source) => Err(match (opts.mode, source.status()) { + (PutMode::Create, Some(StatusCode::CONFLICT)) => crate::Error::AlreadyExists { + source: Box::new(source), + path: path.to_string(), + }, + _ => crate::Error::from(Error::PutRequest { + source, + path: path.to_string(), + }), + }), } } diff --git a/object_store/src/gcp/client.rs b/object_store/src/gcp/client.rs index 98cb1704cb45..0ce2b70c37a3 100644 --- a/object_store/src/gcp/client.rs +++ b/object_store/src/gcp/client.rs @@ -201,20 +201,18 @@ impl GoogleCloudStorageClient { Ok(response) => { Ok(get_put_result(response.headers(), VERSION_HEADER).context(MetadataSnafu)?) } - Err(source) => { - return Err(match (opts.mode, source.status()) { - (PutMode::Create, Some(StatusCode::PRECONDITION_FAILED)) => { - crate::Error::AlreadyExists { - source: Box::new(source), - path: path.to_string(), - } - } - _ => crate::Error::from(Error::PutRequest { - source, + Err(source) => Err(match (opts.mode, source.status()) { + (PutMode::Create, Some(StatusCode::PRECONDITION_FAILED)) => { + crate::Error::AlreadyExists { + source: Box::new(source), path: path.to_string(), - }), - }); - } + } + } + _ => crate::Error::from(Error::PutRequest { + source, + path: path.to_string(), + }), + }), } } From be3499397ca7a218e4f3af7ae4e78920336278ea Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Tue, 24 Oct 2023 19:11:34 +0100 Subject: [PATCH 08/12] Clippy --- object_store/src/client/header.rs | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/object_store/src/client/header.rs b/object_store/src/client/header.rs index 38aca8e2f013..e85bf6ba52d0 100644 --- a/object_store/src/client/header.rs +++ b/object_store/src/client/header.rs @@ -18,7 +18,7 @@ //! Logic for extracting ObjectMeta from headers used by AWS, GCP and Azure use crate::path::Path; -use crate::{ObjectMeta, PutResult}; +use crate::ObjectMeta; use chrono::{DateTime, TimeZone, Utc}; use hyper::header::{CONTENT_LENGTH, ETAG, LAST_MODIFIED}; use hyper::HeaderMap; @@ -67,14 +67,16 @@ pub enum Error { }, } -/// Extracts a [`PutResult`] from the provided [`HeaderMap`] -pub fn get_put_result(headers: &HeaderMap, version: &str) -> Result { +/// Extracts a PutResult from the provided [`HeaderMap`] +#[cfg(any(feature = "aws", feature = "gcp", feature = "azure"))] +pub fn get_put_result(headers: &HeaderMap, version: &str) -> Result { let e_tag = Some(get_etag(headers)?); let version = get_version(headers, version)?; - Ok(PutResult { e_tag, version }) + Ok(crate::PutResult { e_tag, version }) } /// Extracts a optional version from the provided [`HeaderMap`] +#[cfg(any(feature = "aws", feature = "gcp", feature = "azure"))] pub fn get_version(headers: &HeaderMap, version: &str) -> Result, Error> { Ok(match headers.get(version) { Some(x) => Some(x.to_str().context(BadHeaderSnafu)?.to_string()), From 56d95d7f6dfe309a6f21a58341bddd7fda52e5c8 Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Wed, 25 Oct 2023 21:53:52 +0100 Subject: [PATCH 09/12] PutRequestBuilder --- object_store/src/aws/client.rs | 161 +++++++-------- object_store/src/aws/mod.rs | 10 +- object_store/src/azure/client.rs | 142 ++++++------- object_store/src/client/mod.rs | 2 +- object_store/src/client/retry.rs | 4 + .../src/client/{list_response.rs => s3.rs} | 46 ++++- object_store/src/gcp/client.rs | 188 +++++++++--------- object_store/src/gcp/mod.rs | 2 +- 8 files changed, 290 insertions(+), 265 deletions(-) rename object_store/src/client/{list_response.rs => s3.rs} (68%) diff --git a/object_store/src/aws/client.rs b/object_store/src/aws/client.rs index 46be1e189d62..20c2a96b57cd 100644 --- a/object_store/src/aws/client.rs +++ b/object_store/src/aws/client.rs @@ -24,8 +24,11 @@ use crate::client::get::GetClient; use crate::client::header::HeaderConfig; use crate::client::header::{get_put_result, get_version}; use crate::client::list::ListClient; -use crate::client::list_response::ListResponse; use crate::client::retry::RetryExt; +use crate::client::s3::{ + CompleteMultipartUpload, CompleteMultipartUploadResult, InitiateMultipartUploadResult, + ListResponse, +}; use crate::client::GetOptionsExt; use crate::multipart::PartId; use crate::path::DELIMITER; @@ -36,12 +39,13 @@ use async_trait::async_trait; use base64::prelude::BASE64_STANDARD; use base64::Engine; use bytes::{Buf, Bytes}; +use hyper::http::HeaderName; use itertools::Itertools; use percent_encoding::{utf8_percent_encode, PercentEncode}; use quick_xml::events::{self as xml_events}; use reqwest::{ header::{CONTENT_LENGTH, CONTENT_TYPE}, - Client as ReqwestClient, Method, Response, StatusCode, + Client as ReqwestClient, Method, RequestBuilder, Response, StatusCode, }; use serde::{Deserialize, Serialize}; use snafu::{ResultExt, Snafu}; @@ -151,33 +155,6 @@ impl From for crate::Error { } } -#[derive(Debug, Deserialize)] -#[serde(rename_all = "PascalCase")] -struct InitiateMultipart { - upload_id: String, -} - -#[derive(Debug, Serialize)] -#[serde(rename_all = "PascalCase", rename = "CompleteMultipartUpload")] -struct CompleteMultipart { - part: Vec, -} - -#[derive(Debug, Serialize)] -struct MultipartPart { - #[serde(rename = "ETag")] - e_tag: String, - #[serde(rename = "PartNumber")] - part_number: usize, -} - -#[derive(Debug, Deserialize)] -#[serde(rename_all = "PascalCase", rename = "CompleteMultipartUploadResult")] -struct CompleteMultipartResult { - #[serde(rename = "ETag")] - e_tag: String, -} - #[derive(Deserialize)] #[serde(rename_all = "PascalCase", rename = "DeleteResult")] struct BatchDeleteResponse { @@ -236,6 +213,54 @@ impl S3Config { pub(crate) fn path_url(&self, path: &Path) -> String { format!("{}/{}", self.bucket_endpoint, encode_path(path)) } + + async fn get_credential(&self) -> Result>> { + Ok(match self.skip_signature { + false => Some(self.credentials.get_credential().await?), + true => None, + }) + } +} + +/// A builder for a put request allowing customisation of the headers and query string +pub(crate) struct PutRequest<'a> { + path: &'a Path, + config: &'a S3Config, + builder: RequestBuilder, + payload_sha256: Option>, +} + +impl<'a> PutRequest<'a> { + pub fn query(self, query: &T) -> Self { + let builder = self.builder.query(query); + Self { builder, ..self } + } + + pub fn header(self, k: &HeaderName, v: &str) -> Self { + let builder = self.builder.header(k, v); + Self { builder, ..self } + } + + pub async fn send(self) -> Result { + let credential = self.config.get_credential().await?; + + let response = self + .builder + .with_aws_sigv4( + credential.as_deref(), + &self.config.region, + "s3", + self.config.sign_payload, + self.payload_sha256.as_deref(), + ) + .send_retry(&self.config.retry_config) + .await + .context(PutRequestSnafu { + path: self.path.as_ref(), + })?; + + Ok(get_put_result(response.headers(), VERSION_HEADER).context(MetadataSnafu)?) + } } #[derive(Debug)] @@ -255,24 +280,10 @@ impl S3Client { &self.config } - async fn get_credential(&self) -> Result>> { - Ok(match self.config.skip_signature { - false => Some(self.config.credentials.get_credential().await?), - true => None, - }) - } - /// Make an S3 PUT request /// /// Returns the ETag - pub async fn put_request( - &self, - path: &Path, - bytes: Bytes, - query: &T, - header: Option<(&str, &str)>, - ) -> Result { - let credential = self.get_credential().await?; + pub fn put_request<'a>(&'a self, path: &'a Path, bytes: Bytes) -> PutRequest<'a> { let url = self.config.path_url(path); let mut builder = self.client.request(Method::PUT, url); let mut payload_sha256 = None; @@ -294,26 +305,12 @@ impl S3Client { builder = builder.header(CONTENT_TYPE, value); } - if let Some((k, v)) = header { - builder = builder.header(k, v); + PutRequest { + path, + builder, + payload_sha256, + config: &self.config, } - - let response = builder - .query(query) - .with_aws_sigv4( - credential.as_deref(), - &self.config.region, - "s3", - self.config.sign_payload, - payload_sha256.as_deref(), - ) - .send_retry(&self.config.retry_config) - .await - .context(PutRequestSnafu { - path: path.as_ref(), - })?; - - Ok(get_put_result(response.headers(), VERSION_HEADER).context(MetadataSnafu)?) } /// Make an S3 Delete request @@ -322,7 +319,7 @@ impl S3Client { path: &Path, query: &T, ) -> Result<()> { - let credential = self.get_credential().await?; + let credential = self.config.get_credential().await?; let url = self.config.path_url(path); self.client @@ -356,7 +353,7 @@ impl S3Client { return Ok(Vec::new()); } - let credential = self.get_credential().await?; + let credential = self.config.get_credential().await?; let url = format!("{}?delete", self.config.bucket_endpoint); let mut buffer = Vec::new(); @@ -454,7 +451,7 @@ impl S3Client { /// Make an S3 Copy request pub async fn copy_request(&self, from: &Path, to: &Path, overwrite: bool) -> Result<()> { - let credential = self.get_credential().await?; + let credential = self.config.get_credential().await?; let url = self.config.path_url(to); let source = format!("{}/{}", self.config.bucket, encode_path(from)); @@ -502,7 +499,7 @@ impl S3Client { } pub async fn create_multipart(&self, location: &Path) -> Result { - let credential = self.get_credential().await?; + let credential = self.config.get_credential().await?; let url = format!("{}?uploads=", self.config.path_url(location),); let response = self @@ -522,7 +519,7 @@ impl S3Client { .await .context(CreateMultipartResponseBodySnafu)?; - let response: InitiateMultipart = + let response: InitiateMultipartUploadResult = quick_xml::de::from_reader(response.reader()).context(InvalidMultipartResponseSnafu)?; Ok(response.upload_id) @@ -538,12 +535,9 @@ impl S3Client { let part = (part_idx + 1).to_string(); let result = self - .put_request( - path, - data, - &[("partNumber", &part), ("uploadId", upload_id)], - None, - ) + .put_request(path, data) + .query(&[("partNumber", &part), ("uploadId", upload_id)]) + .send() .await?; Ok(PartId { @@ -557,19 +551,10 @@ impl S3Client { upload_id: &str, parts: Vec, ) -> Result { - let parts = parts - .into_iter() - .enumerate() - .map(|(part_idx, part)| MultipartPart { - e_tag: part.content_id, - part_number: part_idx + 1, - }) - .collect(); - - let request = CompleteMultipart { part: parts }; + let request = CompleteMultipartUpload::from(parts); let body = quick_xml::se::to_string(&request).unwrap(); - let credential = self.get_credential().await?; + let credential = self.config.get_credential().await?; let url = self.config.path_url(location); let response = self @@ -595,7 +580,7 @@ impl S3Client { .await .context(CompleteMultipartResponseBodySnafu)?; - let response: CompleteMultipartResult = + let response: CompleteMultipartUploadResult = quick_xml::de::from_reader(data.reader()).context(InvalidMultipartResponseSnafu)?; Ok(PutResult { @@ -617,7 +602,7 @@ impl GetClient for S3Client { /// Make an S3 GET request async fn get_request(&self, path: &Path, options: GetOptions) -> Result { - let credential = self.get_credential().await?; + let credential = self.config.get_credential().await?; let url = self.config.path_url(path); let method = match options.head { true => Method::HEAD, @@ -659,7 +644,7 @@ impl ListClient for S3Client { token: Option<&str>, offset: Option<&str>, ) -> Result<(ListResult, Option)> { - let credential = self.get_credential().await?; + let credential = self.config.get_credential().await?; let url = self.config.bucket_endpoint.clone(); let mut query = Vec::with_capacity(4); diff --git a/object_store/src/aws/mod.rs b/object_store/src/aws/mod.rs index b04c26968253..99e637695059 100644 --- a/object_store/src/aws/mod.rs +++ b/object_store/src/aws/mod.rs @@ -35,6 +35,7 @@ use async_trait::async_trait; use bytes::Bytes; use futures::stream::BoxStream; use futures::{StreamExt, TryStreamExt}; +use reqwest::header::{IF_MATCH, IF_NONE_MATCH}; use reqwest::Method; use std::{sync::Arc, time::Duration}; use tokio::io::AsyncWrite; @@ -159,12 +160,12 @@ impl Signer for AmazonS3 { #[async_trait] impl ObjectStore for AmazonS3 { async fn put_opts(&self, location: &Path, bytes: Bytes, opts: PutOptions) -> Result { + let request = self.client.put_request(location, bytes); match (opts.mode, &self.client.config().conditional_put) { - (PutMode::Overwrite, _) => self.client.put_request(location, bytes, &(), None).await, + (PutMode::Overwrite, _) => request.send().await, (PutMode::Create | PutMode::Update(_), None) => Err(Error::NotImplemented), (PutMode::Create, Some(S3ConditionalPut::ETagMatch)) => { - let header = Some(("If-None-Match", "*")); - match self.client.put_request(location, bytes, &(), header).await { + match request.header(&IF_NONE_MATCH, "*").send().await { // Technically If-None-Match should return NotModified but some stores, // such as R2, instead return PreconditionFailed // https://developers.cloudflare.com/r2/api/s3/extensions/#conditional-operations-in-putobject @@ -182,8 +183,7 @@ impl ObjectStore for AmazonS3 { store: STORE, source: "ETag required for conditional put".to_string().into(), })?; - let header = Some(("If-Match", etag.as_str())); - self.client.put_request(location, bytes, &(), header).await + request.header(&IF_MATCH, etag.as_str()).send().await } } } diff --git a/object_store/src/azure/client.rs b/object_store/src/azure/client.rs index 4bfb3ae33835..c7bd79149872 100644 --- a/object_store/src/azure/client.rs +++ b/object_store/src/azure/client.rs @@ -35,11 +35,12 @@ use base64::prelude::BASE64_STANDARD; use base64::Engine; use bytes::{Buf, Bytes}; use chrono::{DateTime, Utc}; +use hyper::http::HeaderName; use itertools::Itertools; use reqwest::header::CONTENT_TYPE; use reqwest::{ header::{HeaderValue, CONTENT_LENGTH, IF_MATCH, IF_NONE_MATCH}, - Client as ReqwestClient, Method, Response, StatusCode, + Client as ReqwestClient, Method, RequestBuilder, Response, }; use serde::{Deserialize, Serialize}; use snafu::{OptionExt, ResultExt, Snafu}; @@ -140,6 +141,39 @@ impl AzureConfig { } } +/// A builder for a put request allowing customisation of the headers and query string +struct PutRequest<'a> { + path: &'a Path, + config: &'a AzureConfig, + builder: RequestBuilder, +} + +impl<'a> PutRequest<'a> { + fn header(self, k: &HeaderName, v: &str) -> Self { + let builder = self.builder.header(k, v); + Self { builder, ..self } + } + + fn query(self, query: &T) -> Self { + let builder = self.builder.query(query); + Self { builder, ..self } + } + + async fn send(self) -> Result { + let credential = self.config.credentials.get_credential().await?; + let response = self + .builder + .with_azure_authorization(&credential, &self.config.account) + .send_retry(&self.config.retry_config) + .await + .context(PutRequestSnafu { + path: self.path.as_ref(), + })?; + + Ok(response) + } +} + #[derive(Debug)] pub(crate) struct AzureClient { config: AzureConfig, @@ -162,89 +196,52 @@ impl AzureClient { self.config.credentials.get_credential().await } - pub async fn put_request( - &self, - path: &Path, - bytes: Option, - is_block_op: bool, - opts: PutOptions, - query: &T, - ) -> Result { - let credential = self.get_credential().await?; + fn put_request<'a>(&'a self, path: &'a Path, bytes: Bytes) -> PutRequest<'a> { let url = self.config.path_url(path); let mut builder = self.client.request(Method::PUT, url); - if !is_block_op { - builder = builder.header(&BLOB_TYPE, "BlockBlob"); - } - if let Some(value) = self.config().client_options.get_content_type(path) { builder = builder.header(CONTENT_TYPE, value); } - if let Some(bytes) = bytes { - builder = builder - .header(CONTENT_LENGTH, HeaderValue::from(bytes.len())) - .body(bytes) - } else { - builder = builder.header(CONTENT_LENGTH, HeaderValue::from_static("0")); - } + builder = builder + .header(CONTENT_LENGTH, HeaderValue::from(bytes.len())) + .body(bytes); - builder = match &opts.mode { - PutMode::Overwrite => builder, - PutMode::Create => builder.header(IF_NONE_MATCH, "*"), - PutMode::Update(v) => { - builder.header(IF_MATCH, v.e_tag.as_ref().context(MissingETagSnafu)?) - } - }; - - let result = builder - .query(query) - .with_azure_authorization(&credential, &self.config.account) - .send_retry(&self.config.retry_config) - .await; - - match result { - Ok(response) => Ok(response), - Err(source) => Err(match (opts.mode, source.status()) { - (PutMode::Create, Some(StatusCode::CONFLICT)) => crate::Error::AlreadyExists { - source: Box::new(source), - path: path.to_string(), - }, - _ => crate::Error::from(Error::PutRequest { - source, - path: path.to_string(), - }), - }), + PutRequest { + path, + builder, + config: &self.config, } } /// Make an Azure PUT request pub async fn put_blob(&self, path: &Path, bytes: Bytes, opts: PutOptions) -> Result { - let response = self - .put_request(path, Some(bytes), false, opts, &()) - .await?; + let builder = self.put_request(path, bytes); + + let builder = match &opts.mode { + PutMode::Overwrite => builder, + PutMode::Create => builder.header(&IF_NONE_MATCH, "*"), + PutMode::Update(v) => { + let etag = v.e_tag.as_ref().context(MissingETagSnafu)?; + builder.header(&IF_MATCH, etag) + } + }; + let response = builder.header(&BLOB_TYPE, "BlockBlob").send().await?; Ok(get_put_result(response.headers(), VERSION_HEADER).context(MetadataSnafu)?) } /// PUT a block pub async fn put_block(&self, path: &Path, part_idx: usize, data: Bytes) -> Result { let content_id = format!("{part_idx:20}"); - let block_id: BlockId = content_id.clone().into(); + let block_id = BASE64_STANDARD.encode(&content_id); - self.put_request( - path, - Some(data), - true, - PutOptions::default(), - &[ - ("comp", "block"), - ("blockid", &BASE64_STANDARD.encode(block_id)), - ], - ) - .await?; + self.put_request(path, data) + .query(&[("comp", "block"), ("blockid", &block_id)]) + .send() + .await?; Ok(PartId { content_id }) } @@ -256,17 +253,10 @@ impl AzureClient { .map(|part| BlockId::from(part.content_id)) .collect(); - let block_list = BlockList { blocks }; - let block_xml = block_list.to_xml(); - let response = self - .put_request( - path, - Some(block_xml.into()), - true, - PutOptions::default(), - &[("comp", "blocklist")], - ) + .put_request(path, BlockList { blocks }.to_xml().into()) + .query(&[("comp", "blocklist")]) + .send() .await?; Ok(get_put_result(response.headers(), VERSION_HEADER).context(MetadataSnafu)?) @@ -321,13 +311,7 @@ impl AzureClient { .with_azure_authorization(&credential, &self.config.account) .send_retry(&self.config.retry_config) .await - .map_err(|err| match err.status() { - Some(StatusCode::CONFLICT) => crate::Error::AlreadyExists { - source: Box::new(err), - path: to.to_string(), - }, - _ => err.error(STORE, from.to_string()), - })?; + .map_err(|err| err.error(STORE, from.to_string()))?; Ok(()) } diff --git a/object_store/src/client/mod.rs b/object_store/src/client/mod.rs index 77eee7fc92f3..ae092edac095 100644 --- a/object_store/src/client/mod.rs +++ b/object_store/src/client/mod.rs @@ -38,7 +38,7 @@ pub mod token; pub mod header; #[cfg(any(feature = "aws", feature = "gcp"))] -pub mod list_response; +pub mod s3; use async_trait::async_trait; use std::collections::HashMap; diff --git a/object_store/src/client/retry.rs b/object_store/src/client/retry.rs index d70d6d88de32..789103c0f74f 100644 --- a/object_store/src/client/retry.rs +++ b/object_store/src/client/retry.rs @@ -79,6 +79,10 @@ impl Error { path, source: Box::new(self), }, + Some(StatusCode::CONFLICT) => crate::Error::AlreadyExists { + path, + source: Box::new(self), + }, _ => crate::Error::Generic { store, source: Box::new(self), diff --git a/object_store/src/client/list_response.rs b/object_store/src/client/s3.rs similarity index 68% rename from object_store/src/client/list_response.rs rename to object_store/src/client/s3.rs index 7a170c584156..61237dc4beab 100644 --- a/object_store/src/client/list_response.rs +++ b/object_store/src/client/s3.rs @@ -14,12 +14,13 @@ // specific language governing permissions and limitations // under the License. -//! The list response format used by GCP and AWS +//! The list and multipart API used by both GCS and S3 +use crate::multipart::PartId; use crate::path::Path; use crate::{ListResult, ObjectMeta, Result}; use chrono::{DateTime, Utc}; -use serde::Deserialize; +use serde::{Deserialize, Serialize}; #[derive(Debug, Deserialize)] #[serde(rename_all = "PascalCase")] @@ -84,3 +85,44 @@ impl TryFrom for ObjectMeta { }) } } + +#[derive(Debug, Deserialize)] +#[serde(rename_all = "PascalCase")] +pub struct InitiateMultipartUploadResult { + pub upload_id: String, +} + +#[derive(Debug, Serialize)] +#[serde(rename_all = "PascalCase")] +pub struct CompleteMultipartUpload { + pub part: Vec, +} + +impl From> for CompleteMultipartUpload { + fn from(value: Vec) -> Self { + let part = value + .into_iter() + .enumerate() + .map(|(part_number, part)| MultipartPart { + e_tag: part.content_id, + part_number: part_number + 1, + }) + .collect(); + Self { part } + } +} + +#[derive(Debug, Serialize)] +pub struct MultipartPart { + #[serde(rename = "ETag")] + pub e_tag: String, + #[serde(rename = "PartNumber")] + pub part_number: usize, +} + +#[derive(Debug, Deserialize)] +#[serde(rename_all = "PascalCase")] +pub struct CompleteMultipartUploadResult { + #[serde(rename = "ETag")] + pub e_tag: String, +} diff --git a/object_store/src/gcp/client.rs b/object_store/src/gcp/client.rs index 0ce2b70c37a3..85d194c5febd 100644 --- a/object_store/src/gcp/client.rs +++ b/object_store/src/gcp/client.rs @@ -16,10 +16,13 @@ // under the License. use crate::client::get::GetClient; -use crate::client::header::{get_put_result, HeaderConfig}; +use crate::client::header::{get_put_result, get_version, HeaderConfig}; use crate::client::list::ListClient; -use crate::client::list_response::ListResponse; use crate::client::retry::RetryExt; +use crate::client::s3::{ + CompleteMultipartUpload, CompleteMultipartUploadResult, InitiateMultipartUploadResult, + ListResponse, +}; use crate::client::GetOptionsExt; use crate::gcp::{GcpCredential, GcpCredentialProvider, STORE}; use crate::multipart::PartId; @@ -31,14 +34,15 @@ use crate::{ use async_trait::async_trait; use bytes::{Buf, Bytes}; use percent_encoding::{percent_encode, utf8_percent_encode, NON_ALPHANUMERIC}; -use reqwest::{header, Client, Method, Response, StatusCode}; +use reqwest::header::HeaderName; +use reqwest::{header, Client, Method, RequestBuilder, Response, StatusCode}; use serde::Serialize; use snafu::{OptionExt, ResultExt, Snafu}; use std::sync::Arc; const VERSION_HEADER: &str = "x-goog-generation"; -const VERSION_MATCH: &str = "x-goog-if-generation-match"; +const VERSION_MATCH: HeaderName = HeaderName::from_static("x-goog-if-generation-match"); #[derive(Debug, Snafu)] enum Error { @@ -88,6 +92,15 @@ enum Error { #[snafu(display("Version required for conditional update"))] MissingVersion, + + #[snafu(display("Error performing complete multipart request: {}", source))] + CompleteMultipartRequest { source: crate::client::retry::Error }, + + #[snafu(display("Error getting complete multipart response body: {}", source))] + CompleteMultipartResponseBody { source: reqwest::Error }, + + #[snafu(display("Got invalid multipart response: {}", source))] + InvalidMultipartResponse { source: quick_xml::de::DeError }, } impl From for crate::Error { @@ -117,6 +130,39 @@ pub struct GoogleCloudStorageConfig { pub client_options: ClientOptions, } +/// A builder for a put request allowing customisation of the headers and query string +pub struct PutRequest<'a> { + path: &'a Path, + config: &'a GoogleCloudStorageConfig, + builder: RequestBuilder, +} + +impl<'a> PutRequest<'a> { + fn header(self, k: &HeaderName, v: &str) -> Self { + let builder = self.builder.header(k, v); + Self { builder, ..self } + } + + fn query(self, query: &T) -> Self { + let builder = self.builder.query(query); + Self { builder, ..self } + } + + async fn send(self) -> Result { + let credential = self.config.credentials.get_credential().await?; + let response = self + .builder + .bearer_auth(&credential.bearer) + .send_retry(&self.config.retry_config) + .await + .context(PutRequestSnafu { + path: self.path.as_ref(), + })?; + + Ok(get_put_result(response.headers(), VERSION_HEADER).context(MetadataSnafu)?) + } +} + #[derive(Debug)] pub struct GoogleCloudStorageClient { config: GoogleCloudStorageConfig, @@ -162,14 +208,7 @@ impl GoogleCloudStorageClient { /// Perform a put request /// /// Returns the new ETag - pub async fn put_request( - &self, - path: &Path, - payload: Bytes, - opts: PutOptions, - query: &T, - ) -> Result { - let credential = self.get_credential().await?; + pub fn put_request<'a>(&'a self, path: &'a Path, payload: Bytes) -> PutRequest<'a> { let url = self.object_url(path); let content_type = self @@ -178,41 +217,37 @@ impl GoogleCloudStorageClient { .get_content_type(path) .unwrap_or("application/octet-stream"); - let mut builder = self.client.request(Method::PUT, url).query(query); + let builder = self + .client + .request(Method::PUT, url) + .header(header::CONTENT_TYPE, content_type) + .header(header::CONTENT_LENGTH, payload.len()) + .body(payload); + + PutRequest { + path, + builder, + config: &self.config, + } + } - builder = match &opts.mode { + pub async fn put(&self, path: &Path, data: Bytes, opts: PutOptions) -> Result { + let builder = self.put_request(path, data); + + let builder = match &opts.mode { PutMode::Overwrite => builder, - PutMode::Create => builder.header(VERSION_MATCH, 0), - PutMode::Update(v) => builder.header( - VERSION_MATCH, - v.version.as_ref().context(MissingVersionSnafu)?, - ), + PutMode::Create => builder.header(&VERSION_MATCH, "0"), + PutMode::Update(v) => { + let etag = v.version.as_ref().context(MissingVersionSnafu)?; + builder.header(&VERSION_MATCH, etag) + } }; - let result = builder - .bearer_auth(&credential.bearer) - .header(header::CONTENT_TYPE, content_type) - .header(header::CONTENT_LENGTH, payload.len()) - .body(payload) - .send_retry(&self.config.retry_config) - .await; - - match result { - Ok(response) => { - Ok(get_put_result(response.headers(), VERSION_HEADER).context(MetadataSnafu)?) + match (opts.mode, builder.send().await) { + (PutMode::Create, Err(crate::Error::Precondition { path, source })) => { + Err(crate::Error::AlreadyExists { path, source }) } - Err(source) => Err(match (opts.mode, source.status()) { - (PutMode::Create, Some(StatusCode::PRECONDITION_FAILED)) => { - crate::Error::AlreadyExists { - source: Box::new(source), - path: path.to_string(), - } - } - _ => crate::Error::from(Error::PutRequest { - source, - path: path.to_string(), - }), - }), + (_, r) => r, } } @@ -226,17 +261,11 @@ impl GoogleCloudStorageClient { part_idx: usize, data: Bytes, ) -> Result { - let result = self - .put_request( - path, - data, - PutOptions::default(), - &[ - ("partNumber", &format!("{}", part_idx + 1)), - ("uploadId", upload_id), - ], - ) - .await?; + let query = &[ + ("partNumber", &format!("{}", part_idx + 1)), + ("uploadId", upload_id), + ]; + let result = self.put_request(path, data).query(query).send().await?; Ok(PartId { content_id: result.e_tag.unwrap(), @@ -303,17 +332,8 @@ impl GoogleCloudStorageClient { let upload_id = multipart_id.clone(); let url = self.object_url(path); - let parts = completed_parts - .into_iter() - .enumerate() - .map(|(part_number, part)| MultipartPart { - e_tag: part.content_id, - part_number: part_number + 1, - }) - .collect(); - + let upload_info = CompleteMultipartUpload::from(completed_parts); let credential = self.get_credential().await?; - let upload_info = CompleteMultipartUpload { parts }; let data = quick_xml::se::to_string(&upload_info) .context(InvalidPutResponseSnafu)? @@ -322,7 +342,7 @@ impl GoogleCloudStorageClient { // https://github.com/tafia/quick-xml/issues/350 .replace(""", "\""); - let result = self + let response = self .client .request(Method::POST, &url) .bearer_auth(&credential.bearer) @@ -330,11 +350,22 @@ impl GoogleCloudStorageClient { .body(data) .send_retry(&self.config.retry_config) .await - .context(PostRequestSnafu { - path: path.as_ref(), - })?; + .context(CompleteMultipartRequestSnafu)?; + + let version = get_version(response.headers(), VERSION_HEADER).context(MetadataSnafu)?; - Ok(get_put_result(result.headers(), VERSION_HEADER).context(MetadataSnafu)?) + let data = response + .bytes() + .await + .context(CompleteMultipartResponseBodySnafu)?; + + let response: CompleteMultipartUploadResult = + quick_xml::de::from_reader(data.reader()).context(InvalidMultipartResponseSnafu)?; + + Ok(PutResult { + e_tag: Some(response.e_tag), + version, + }) } /// Perform a delete request @@ -483,24 +514,3 @@ impl ListClient for GoogleCloudStorageClient { Ok((response.try_into()?, token)) } } - -#[derive(serde::Deserialize, Debug)] -#[serde(rename_all = "PascalCase")] -struct InitiateMultipartUploadResult { - upload_id: String, -} - -#[derive(serde::Serialize, Debug)] -#[serde(rename_all = "PascalCase", rename(serialize = "Part"))] -struct MultipartPart { - #[serde(rename = "PartNumber")] - part_number: usize, - e_tag: String, -} - -#[derive(serde::Serialize, Debug)] -#[serde(rename_all = "PascalCase")] -struct CompleteMultipartUpload { - #[serde(rename = "Part", default)] - parts: Vec, -} diff --git a/object_store/src/gcp/mod.rs b/object_store/src/gcp/mod.rs index 43d1162adb43..7721b1278a80 100644 --- a/object_store/src/gcp/mod.rs +++ b/object_store/src/gcp/mod.rs @@ -109,7 +109,7 @@ impl PutPart for GCSMultipartUpload { #[async_trait] impl ObjectStore for GoogleCloudStorage { async fn put_opts(&self, location: &Path, bytes: Bytes, opts: PutOptions) -> Result { - self.client.put_request(location, bytes, opts, &()).await + self.client.put(location, bytes, opts).await } async fn put_multipart( From d7570119ea9fa965d1ff5f17e918ede91076c0c6 Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Wed, 25 Oct 2023 23:27:04 +0100 Subject: [PATCH 10/12] Clippy --- object_store/src/gcp/client.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/object_store/src/gcp/client.rs b/object_store/src/gcp/client.rs index 85d194c5febd..78964077e2fe 100644 --- a/object_store/src/gcp/client.rs +++ b/object_store/src/gcp/client.rs @@ -42,7 +42,7 @@ use std::sync::Arc; const VERSION_HEADER: &str = "x-goog-generation"; -const VERSION_MATCH: HeaderName = HeaderName::from_static("x-goog-if-generation-match"); +static VERSION_MATCH: HeaderName = HeaderName::from_static("x-goog-if-generation-match"); #[derive(Debug, Snafu)] enum Error { @@ -399,7 +399,7 @@ impl GoogleCloudStorageClient { .header("x-goog-copy-source", source); if if_not_exists { - builder = builder.header(VERSION_MATCH, 0); + builder = builder.header(&VERSION_MATCH, 0); } builder From ad0336127f50d2e1cb283950362760b95d3eb879 Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Thu, 26 Oct 2023 16:58:32 +0100 Subject: [PATCH 11/12] Add stress test --- object_store/src/lib.rs | 47 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 47 insertions(+) diff --git a/object_store/src/lib.rs b/object_store/src/lib.rs index 13526c336880..17a833466b1e 100644 --- a/object_store/src/lib.rs +++ b/object_store/src/lib.rs @@ -1013,6 +1013,7 @@ mod tests { use crate::multipart::MultiPartStore; use crate::test_util::flatten_list_stream; use chrono::TimeZone; + use futures::stream::FuturesUnordered; use rand::{thread_rng, Rng}; use tokio::io::AsyncWriteExt; @@ -1530,6 +1531,52 @@ mod tests { .await .unwrap_err(); assert!(matches!(err, Error::Precondition { .. }), "{err}"); + + const NUM_WORKERS: usize = 5; + const NUM_INCREMENTS: usize = 10; + + let path = Path::from("RACE"); + let mut futures: FuturesUnordered<_> = (0..NUM_WORKERS) + .map(|_| async { + for _ in 0..NUM_INCREMENTS { + loop { + match storage.get(&path).await { + Ok(r) => { + let mode = PutMode::Update(UpdateVersion { + e_tag: r.meta.e_tag.clone(), + version: r.meta.version.clone(), + }); + + let b = r.bytes().await.unwrap(); + let v: usize = std::str::from_utf8(&b).unwrap().parse().unwrap(); + let new = (v + 1).to_string(); + + match storage.put_opts(&path, new.into(), mode.into()).await { + Ok(_) => break, + Err(Error::Precondition { .. }) => continue, + Err(e) => return Err(e), + } + } + Err(Error::NotFound { .. }) => { + let mode = PutMode::Create; + match storage.put_opts(&path, "1".into(), mode.into()).await { + Ok(_) => break, + Err(Error::AlreadyExists { .. }) => continue, + Err(e) => return Err(e), + } + } + Err(e) => return Err(e), + } + } + } + Ok(()) + }) + .collect(); + + while let Some(_) = futures.next().await.transpose().unwrap() {} + let b = storage.get(&path).await.unwrap().bytes().await.unwrap(); + let v = std::str::from_utf8(&b).unwrap().parse::().unwrap(); + assert_eq!(v, NUM_WORKERS * NUM_INCREMENTS); } /// Returns a chunk of length `chunk_length` From ca8e0a4a8b5973b68952a50f7da56c7b81ce8e0f Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Thu, 26 Oct 2023 17:12:11 +0100 Subject: [PATCH 12/12] Clippy --- object_store/src/lib.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/object_store/src/lib.rs b/object_store/src/lib.rs index 17a833466b1e..66964304e853 100644 --- a/object_store/src/lib.rs +++ b/object_store/src/lib.rs @@ -1573,7 +1573,7 @@ mod tests { }) .collect(); - while let Some(_) = futures.next().await.transpose().unwrap() {} + while futures.next().await.transpose().unwrap().is_some() {} let b = storage.get(&path).await.unwrap().bytes().await.unwrap(); let v = std::str::from_utf8(&b).unwrap().parse::().unwrap(); assert_eq!(v, NUM_WORKERS * NUM_INCREMENTS);