diff --git a/object_store/src/attributes.rs b/object_store/src/attributes.rs new file mode 100644 index 000000000000..9b90b5325850 --- /dev/null +++ b/object_store/src/attributes.rs @@ -0,0 +1,211 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::borrow::Cow; +use std::collections::HashMap; +use std::ops::Deref; + +/// Additional object attribute types +#[non_exhaustive] +#[derive(Debug, Hash, Eq, PartialEq, Clone)] +pub enum Attribute { + /// Specifies the MIME type of the object + /// + /// This takes precedence over any [ClientOptions](crate::ClientOptions) configuration + /// + /// See [Content-Type](https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Content-Type) + ContentType, + /// Overrides cache control policy of the object + /// + /// See [Cache-Control](https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Cache-Control) + CacheControl, +} + +/// The value of an [`Attribute`] +/// +/// Provides efficient conversion from both static and owned strings +/// +/// ``` +/// # use object_store::AttributeValue; +/// // Can use static strings without needing an allocation +/// let value = AttributeValue::from("bar"); +/// // Can also store owned strings +/// let value = AttributeValue::from("foo".to_string()); +/// ``` +#[derive(Debug, Hash, Eq, PartialEq, Clone)] +pub struct AttributeValue(Cow<'static, str>); + +impl AsRef for AttributeValue { + fn as_ref(&self) -> &str { + &self.0 + } +} + +impl From<&'static str> for AttributeValue { + fn from(value: &'static str) -> Self { + Self(Cow::Borrowed(value)) + } +} + +impl From for AttributeValue { + fn from(value: String) -> Self { + Self(Cow::Owned(value)) + } +} + +impl Deref for AttributeValue { + type Target = str; + + fn deref(&self) -> &Self::Target { + self.0.as_ref() + } +} + +/// Additional attributes of an object +/// +/// Attributes can be specified in [PutOptions](crate::PutOptions) and retrieved +/// from APIs returning [GetResult](crate::GetResult). +/// +/// Unlike [`ObjectMeta`](crate::ObjectMeta), [`Attributes`] are not returned by +/// listing APIs +#[derive(Debug, Default, Eq, PartialEq, Clone)] +pub struct Attributes(HashMap); + +impl Attributes { + /// Create a new empty [`Attributes`] + pub fn new() -> Self { + Self::default() + } + + /// Create a new [`Attributes`] with space for `capacity` [`Attribute`] + pub fn with_capacity(capacity: usize) -> Self { + Self(HashMap::with_capacity(capacity)) + } + + /// Insert a new [`Attribute`], [`AttributeValue`] pair + /// + /// Returns the previous value for `key` if any + pub fn insert(&mut self, key: Attribute, value: AttributeValue) -> Option { + self.0.insert(key, value) + } + + /// Returns the [`AttributeValue`] for `key` if any + pub fn get(&self, key: &Attribute) -> Option<&AttributeValue> { + self.0.get(key) + } + + /// Removes the [`AttributeValue`] for `key` if any + pub fn remove(&mut self, key: &Attribute) -> Option { + self.0.remove(key) + } + + /// Returns an [`AttributesIter`] over this + pub fn iter(&self) -> AttributesIter<'_> { + self.into_iter() + } + + /// Returns the number of [`Attribute`] in this collection + #[inline] + pub fn len(&self) -> usize { + self.0.len() + } + + /// Returns true if this contains no [`Attribute`] + #[inline] + pub fn is_empty(&self) -> bool { + self.0.is_empty() + } +} + +impl FromIterator<(K, V)> for Attributes +where + K: Into, + V: Into, +{ + fn from_iter>(iter: T) -> Self { + Self( + iter.into_iter() + .map(|(k, v)| (k.into(), v.into())) + .collect(), + ) + } +} + +impl<'a> IntoIterator for &'a Attributes { + type Item = (&'a Attribute, &'a AttributeValue); + type IntoIter = AttributesIter<'a>; + + fn into_iter(self) -> Self::IntoIter { + AttributesIter(self.0.iter()) + } +} + +/// Iterator over [`Attributes`] +#[derive(Debug)] +pub struct AttributesIter<'a>(std::collections::hash_map::Iter<'a, Attribute, AttributeValue>); + +impl<'a> Iterator for AttributesIter<'a> { + type Item = (&'a Attribute, &'a AttributeValue); + + fn next(&mut self) -> Option { + self.0.next() + } + + fn size_hint(&self) -> (usize, Option) { + self.0.size_hint() + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_attributes_basic() { + let mut attributes = Attributes::from_iter([ + (Attribute::ContentType, "test"), + (Attribute::CacheControl, "control"), + ]); + + assert!(!attributes.is_empty()); + assert_eq!(attributes.len(), 2); + + assert_eq!( + attributes.get(&Attribute::ContentType), + Some(&"test".into()) + ); + + let metav = "control".into(); + assert_eq!(attributes.get(&Attribute::CacheControl), Some(&metav)); + assert_eq!( + attributes.insert(Attribute::CacheControl, "v1".into()), + Some(metav) + ); + assert_eq!(attributes.len(), 2); + + assert_eq!( + attributes.remove(&Attribute::CacheControl).unwrap(), + "v1".into() + ); + assert_eq!(attributes.len(), 1); + + let metav: AttributeValue = "v2".into(); + attributes.insert(Attribute::CacheControl, metav.clone()); + assert_eq!(attributes.get(&Attribute::CacheControl), Some(&metav)); + assert_eq!(attributes.len(), 2); + } +} diff --git a/object_store/src/aws/client.rs b/object_store/src/aws/client.rs index c1789ed143e4..e81ef6aa220c 100644 --- a/object_store/src/aws/client.rs +++ b/object_store/src/aws/client.rs @@ -35,23 +35,21 @@ use crate::client::GetOptionsExt; use crate::multipart::PartId; use crate::path::DELIMITER; use crate::{ - ClientOptions, GetOptions, ListResult, MultipartId, Path, PutPayload, PutResult, Result, - RetryConfig, + Attribute, Attributes, ClientOptions, GetOptions, ListResult, MultipartId, Path, PutPayload, + PutResult, Result, RetryConfig, }; use async_trait::async_trait; use base64::prelude::BASE64_STANDARD; use base64::Engine; use bytes::{Buf, Bytes}; +use hyper::header::{CACHE_CONTROL, CONTENT_LENGTH}; use hyper::http; use hyper::http::HeaderName; use itertools::Itertools; use md5::{Digest, Md5}; use percent_encoding::{utf8_percent_encode, PercentEncode}; use quick_xml::events::{self as xml_events}; -use reqwest::{ - header::{CONTENT_LENGTH, CONTENT_TYPE}, - Client as ReqwestClient, Method, RequestBuilder, Response, -}; +use reqwest::{header::CONTENT_TYPE, Client as ReqwestClient, Method, RequestBuilder, Response}; use ring::digest; use ring::digest::Context; use serde::{Deserialize, Serialize}; @@ -344,6 +342,7 @@ impl S3Client { &'a self, path: &'a Path, payload: PutPayload, + attributes: Attributes, with_encryption_headers: bool, ) -> Request<'a> { let url = self.config.path_url(path); @@ -363,8 +362,21 @@ impl S3Client { ) } - if let Some(value) = self.config.client_options.get_content_type(path) { - builder = builder.header(CONTENT_TYPE, value); + let mut has_content_type = false; + for (k, v) in &attributes { + builder = match k { + Attribute::CacheControl => builder.header(CACHE_CONTROL, v.as_ref()), + Attribute::ContentType => { + has_content_type = true; + builder.header(CONTENT_TYPE, v.as_ref()) + } + }; + } + + if !has_content_type { + if let Some(value) = self.config.client_options.get_content_type(path) { + builder = builder.header(CONTENT_TYPE, value); + } } Request { @@ -556,7 +568,7 @@ impl S3Client { let part = (part_idx + 1).to_string(); let response = self - .put_request(path, data, false) + .put_request(path, data, Attributes::default(), false) .query(&[("partNumber", &part), ("uploadId", upload_id)]) .idempotent(true) .send() diff --git a/object_store/src/aws/mod.rs b/object_store/src/aws/mod.rs index 9e741c9142dd..43bd38a6de2e 100644 --- a/object_store/src/aws/mod.rs +++ b/object_store/src/aws/mod.rs @@ -156,7 +156,8 @@ impl ObjectStore for AmazonS3 { payload: PutPayload, opts: PutOptions, ) -> Result { - let mut request = self.client.put_request(location, payload, true); + let attrs = opts.attributes; + let mut request = self.client.put_request(location, payload, attrs, true); let tags = opts.tags.encoded(); if !tags.is_empty() && !self.client.config.disable_tagging { request = request.header(&TAGS_HEADER, tags); @@ -403,7 +404,7 @@ mod tests { let test_not_exists = config.copy_if_not_exists.is_some(); let test_conditional_put = config.conditional_put.is_some(); - put_get_delete_list_opts(&integration).await; + put_get_delete_list(&integration).await; get_opts(&integration).await; list_uses_directories_correctly(&integration).await; list_with_delimiter(&integration).await; @@ -412,6 +413,7 @@ mod tests { multipart(&integration, &integration).await; signing(&integration).await; s3_encryption(&integration).await; + put_get_attributes(&integration).await; // Object tagging is not supported by S3 Express One Zone if config.session_provider.is_none() { @@ -432,12 +434,12 @@ mod tests { // run integration test with unsigned payload enabled let builder = AmazonS3Builder::from_env().with_unsigned_payload(true); let integration = builder.build().unwrap(); - put_get_delete_list_opts(&integration).await; + put_get_delete_list(&integration).await; // run integration test with checksum set to sha256 let builder = AmazonS3Builder::from_env().with_checksum_algorithm(Checksum::SHA256); let integration = builder.build().unwrap(); - put_get_delete_list_opts(&integration).await; + put_get_delete_list(&integration).await; match &integration.client.config.copy_if_not_exists { Some(S3CopyIfNotExists::Dynamo(d)) => dynamo::integration_test(&integration, d).await, diff --git a/object_store/src/azure/client.rs b/object_store/src/azure/client.rs index d5972d0a8c16..134609eb262e 100644 --- a/object_store/src/azure/client.rs +++ b/object_store/src/azure/client.rs @@ -27,14 +27,15 @@ use crate::multipart::PartId; use crate::path::DELIMITER; use crate::util::{deserialize_rfc1123, GetRange}; use crate::{ - ClientOptions, GetOptions, ListResult, ObjectMeta, Path, PutMode, PutOptions, PutPayload, - PutResult, Result, RetryConfig, + Attribute, Attributes, ClientOptions, GetOptions, ListResult, ObjectMeta, Path, PutMode, + PutOptions, PutPayload, PutResult, Result, RetryConfig, }; use async_trait::async_trait; use base64::prelude::BASE64_STANDARD; use base64::Engine; use bytes::{Buf, Bytes}; use chrono::{DateTime, Utc}; +use hyper::header::CACHE_CONTROL; use hyper::http::HeaderName; use reqwest::header::CONTENT_TYPE; use reqwest::{ @@ -187,9 +188,8 @@ impl<'a> PutRequest<'a> { Self { builder, ..self } } - fn set_idempotent(mut self, idempotent: bool) -> Self { - self.idempotent = idempotent; - self + fn set_idempotent(self, idempotent: bool) -> Self { + Self { idempotent, ..self } } async fn send(self) -> Result { @@ -199,7 +199,7 @@ impl<'a> PutRequest<'a> { .header(CONTENT_LENGTH, self.payload.content_length()) .with_azure_authorization(&credential, &self.config.account) .retryable(&self.config.retry_config) - .idempotent(true) + .idempotent(self.idempotent) .payload(Some(self.payload)) .send() .await @@ -233,13 +233,31 @@ impl AzureClient { self.config.get_credential().await } - fn put_request<'a>(&'a self, path: &'a Path, payload: PutPayload) -> PutRequest<'a> { + fn put_request<'a>( + &'a self, + path: &'a Path, + payload: PutPayload, + attributes: Attributes, + ) -> PutRequest<'a> { let url = self.config.path_url(path); let mut builder = self.client.request(Method::PUT, url); - if let Some(value) = self.config().client_options.get_content_type(path) { - builder = builder.header(CONTENT_TYPE, value); + let mut has_content_type = false; + for (k, v) in &attributes { + builder = match k { + Attribute::CacheControl => builder.header(CACHE_CONTROL, v.as_ref()), + Attribute::ContentType => { + has_content_type = true; + builder.header(CONTENT_TYPE, v.as_ref()) + } + }; + } + + if !has_content_type { + if let Some(value) = self.config.client_options.get_content_type(path) { + builder = builder.header(CONTENT_TYPE, value); + } } PutRequest { @@ -258,7 +276,7 @@ impl AzureClient { payload: PutPayload, opts: PutOptions, ) -> Result { - let builder = self.put_request(path, payload); + let builder = self.put_request(path, payload, opts.attributes); let builder = match &opts.mode { PutMode::Overwrite => builder.set_idempotent(true), @@ -288,7 +306,7 @@ impl AzureClient { let content_id = format!("{part_idx:20}"); let block_id = BASE64_STANDARD.encode(&content_id); - self.put_request(path, payload) + self.put_request(path, payload, Attributes::default()) .query(&[("comp", "block"), ("blockid", &block_id)]) .set_idempotent(true) .send() @@ -304,8 +322,9 @@ impl AzureClient { .map(|part| BlockId::from(part.content_id)) .collect(); + let payload = BlockList { blocks }.to_xml().into(); let response = self - .put_request(path, BlockList { blocks }.to_xml().into()) + .put_request(path, payload, Attributes::default()) .query(&[("comp", "blocklist")]) .set_idempotent(true) .send() diff --git a/object_store/src/azure/mod.rs b/object_store/src/azure/mod.rs index 8dc52422b7de..3bb57c45aa6b 100644 --- a/object_store/src/azure/mod.rs +++ b/object_store/src/azure/mod.rs @@ -276,7 +276,7 @@ mod tests { crate::test_util::maybe_skip_integration!(); let integration = MicrosoftAzureBuilder::from_env().build().unwrap(); - put_get_delete_list_opts(&integration).await; + put_get_delete_list(&integration).await; get_opts(&integration).await; list_uses_directories_correctly(&integration).await; list_with_delimiter(&integration).await; @@ -292,7 +292,12 @@ mod tests { let client = Arc::clone(&integration.client); async move { client.get_blob_tagging(&p).await } }) - .await + .await; + + // Azurite doesn't support attributes properly + if !integration.client.config().is_emulator { + put_get_attributes(&integration).await; + } } #[ignore = "Used for manual testing against a real storage account."] diff --git a/object_store/src/client/get.rs b/object_store/src/client/get.rs index 2e399e523ed4..f700457611fa 100644 --- a/object_store/src/client/get.rs +++ b/object_store/src/client/get.rs @@ -19,10 +19,10 @@ use std::ops::Range; use crate::client::header::{header_meta, HeaderConfig}; use crate::path::Path; -use crate::{GetOptions, GetRange, GetResult, GetResultPayload, Result}; +use crate::{Attribute, Attributes, GetOptions, GetRange, GetResult, GetResultPayload, Result}; use async_trait::async_trait; use futures::{StreamExt, TryStreamExt}; -use hyper::header::CONTENT_RANGE; +use hyper::header::{CACHE_CONTROL, CONTENT_RANGE, CONTENT_TYPE}; use hyper::StatusCode; use reqwest::header::ToStrError; use reqwest::Response; @@ -117,6 +117,12 @@ enum GetResultError { #[snafu(display("Content-Range header contained non UTF-8 characters"))] InvalidContentRange { source: ToStrError }, + #[snafu(display("Cache-Control header contained non UTF-8 characters"))] + InvalidCacheControl { source: ToStrError }, + + #[snafu(display("Content-Type header contained non UTF-8 characters"))] + InvalidContentType { source: ToStrError }, + #[snafu(display("Requested {expected:?}, got {actual:?}"))] UnexpectedRange { expected: Range, @@ -161,6 +167,16 @@ fn get_result( 0..meta.size }; + let mut attributes = Attributes::new(); + if let Some(x) = response.headers().get(CACHE_CONTROL) { + let x = x.to_str().context(InvalidCacheControlSnafu)?; + attributes.insert(Attribute::CacheControl, x.to_string().into()); + } + if let Some(x) = response.headers().get(CONTENT_TYPE) { + let x = x.to_str().context(InvalidContentTypeSnafu)?; + attributes.insert(Attribute::ContentType, x.to_string().into()); + } + let stream = response .bytes_stream() .map_err(|source| crate::Error::Generic { @@ -172,6 +188,7 @@ fn get_result( Ok(GetResult { range, meta, + attributes, payload: GetResultPayload::Stream(stream), }) } diff --git a/object_store/src/client/mod.rs b/object_store/src/client/mod.rs index 7728f38954f9..3fefbb568343 100644 --- a/object_store/src/client/mod.rs +++ b/object_store/src/client/mod.rs @@ -485,7 +485,7 @@ impl ClientOptions { /// mime type if it was defined initially through /// `ClientOptions::with_content_type_for_suffix` /// - /// Otherwise returns the default mime type if it was defined + /// Otherwise, returns the default mime type if it was defined /// earlier through `ClientOptions::with_default_content_type` pub fn get_content_type(&self, path: &Path) -> Option<&str> { match path.extension() { diff --git a/object_store/src/gcp/client.rs b/object_store/src/gcp/client.rs index f91217f6f9a8..4ee03eaad629 100644 --- a/object_store/src/gcp/client.rs +++ b/object_store/src/gcp/client.rs @@ -29,14 +29,14 @@ use crate::multipart::PartId; use crate::path::{Path, DELIMITER}; use crate::util::hex_encode; use crate::{ - ClientOptions, GetOptions, ListResult, MultipartId, PutMode, PutOptions, PutPayload, PutResult, - Result, RetryConfig, + Attribute, Attributes, ClientOptions, GetOptions, ListResult, MultipartId, PutMode, PutOptions, + PutPayload, PutResult, Result, RetryConfig, }; use async_trait::async_trait; use base64::prelude::BASE64_STANDARD; use base64::Engine; use bytes::Buf; -use hyper::header::CONTENT_LENGTH; +use hyper::header::{CACHE_CONTROL, CONTENT_LENGTH, CONTENT_TYPE}; use percent_encoding::{percent_encode, utf8_percent_encode, NON_ALPHANUMERIC}; use reqwest::header::HeaderName; use reqwest::{header, Client, Method, RequestBuilder, Response, StatusCode}; @@ -45,6 +45,7 @@ use snafu::{OptionExt, ResultExt, Snafu}; use std::sync::Arc; const VERSION_HEADER: &str = "x-goog-generation"; +const DEFAULT_CONTENT_TYPE: &str = "application/octet-stream"; static VERSION_MATCH: HeaderName = HeaderName::from_static("x-goog-if-generation-match"); @@ -323,19 +324,31 @@ impl GoogleCloudStorageClient { /// Perform a put request /// /// Returns the new ETag - pub fn put_request<'a>(&'a self, path: &'a Path, payload: PutPayload) -> PutRequest<'a> { + pub fn put_request<'a>( + &'a self, + path: &'a Path, + payload: PutPayload, + attributes: Attributes, + ) -> PutRequest<'a> { let url = self.object_url(path); + let mut builder = self.client.request(Method::PUT, url); + + let mut has_content_type = false; + for (k, v) in &attributes { + builder = match k { + Attribute::CacheControl => builder.header(CACHE_CONTROL, v.as_ref()), + Attribute::ContentType => { + has_content_type = true; + builder.header(CONTENT_TYPE, v.as_ref()) + } + }; + } - let content_type = self - .config - .client_options - .get_content_type(path) - .unwrap_or("application/octet-stream"); - - let builder = self - .client - .request(Method::PUT, url) - .header(header::CONTENT_TYPE, content_type); + if !has_content_type { + let opts = &self.config.client_options; + let value = opts.get_content_type(path).unwrap_or(DEFAULT_CONTENT_TYPE); + builder = builder.header(CONTENT_TYPE, value) + } PutRequest { path, @@ -352,7 +365,7 @@ impl GoogleCloudStorageClient { payload: PutPayload, opts: PutOptions, ) -> Result { - let builder = self.put_request(path, payload); + let builder = self.put_request(path, payload, opts.attributes); let builder = match &opts.mode { PutMode::Overwrite => builder.set_idempotent(true), @@ -386,7 +399,7 @@ impl GoogleCloudStorageClient { ("uploadId", upload_id), ]; let result = self - .put_request(path, data) + .put_request(path, data, Attributes::new()) .query(query) .set_idempotent(true) .send() @@ -459,7 +472,7 @@ impl GoogleCloudStorageClient { if completed_parts.is_empty() { // GCS doesn't allow empty multipart uploads let result = self - .put_request(path, Default::default()) + .put_request(path, Default::default(), Attributes::new()) .set_idempotent(true) .send() .await?; diff --git a/object_store/src/gcp/mod.rs b/object_store/src/gcp/mod.rs index 149da76f559a..af6e671cbc35 100644 --- a/object_store/src/gcp/mod.rs +++ b/object_store/src/gcp/mod.rs @@ -292,6 +292,8 @@ mod test { // Fake GCS server doesn't currently honor preconditions get_opts(&integration).await; put_opts(&integration, true).await; + // Fake GCS server doesn't currently support attributes + put_get_attributes(&integration).await; } } diff --git a/object_store/src/http/client.rs b/object_store/src/http/client.rs index 39f68ece65a3..cf259196ba40 100644 --- a/object_store/src/http/client.rs +++ b/object_store/src/http/client.rs @@ -21,11 +21,11 @@ use crate::client::retry::{self, RetryConfig, RetryExt}; use crate::client::GetOptionsExt; use crate::path::{Path, DELIMITER}; use crate::util::deserialize_rfc1123; -use crate::{ClientOptions, GetOptions, ObjectMeta, PutPayload, Result}; +use crate::{Attribute, Attributes, ClientOptions, GetOptions, ObjectMeta, PutPayload, Result}; use async_trait::async_trait; use bytes::Buf; use chrono::{DateTime, Utc}; -use hyper::header::CONTENT_LENGTH; +use hyper::header::{CACHE_CONTROL, CONTENT_LENGTH}; use percent_encoding::percent_decode_str; use reqwest::header::CONTENT_TYPE; use reqwest::{Method, Response, StatusCode}; @@ -157,13 +157,32 @@ impl Client { Ok(()) } - pub async fn put(&self, location: &Path, payload: PutPayload) -> Result { + pub async fn put( + &self, + location: &Path, + payload: PutPayload, + attributes: Attributes, + ) -> Result { let mut retry = false; loop { let url = self.path_url(location); let mut builder = self.client.put(url); - if let Some(value) = self.client_options.get_content_type(location) { - builder = builder.header(CONTENT_TYPE, value); + + let mut has_content_type = false; + for (k, v) in &attributes { + builder = match k { + Attribute::CacheControl => builder.header(CACHE_CONTROL, v.as_ref()), + Attribute::ContentType => { + has_content_type = true; + builder.header(CONTENT_TYPE, v.as_ref()) + } + }; + } + + if !has_content_type { + if let Some(value) = self.client_options.get_content_type(location) { + builder = builder.header(CONTENT_TYPE, value); + } } let resp = builder diff --git a/object_store/src/http/mod.rs b/object_store/src/http/mod.rs index a838a0f479d9..d6ba4f4d913d 100644 --- a/object_store/src/http/mod.rs +++ b/object_store/src/http/mod.rs @@ -105,7 +105,7 @@ impl ObjectStore for HttpStore { return Err(crate::Error::NotImplemented); } - let response = self.client.put(location, payload).await?; + let response = self.client.put(location, payload, opts.attributes).await?; let e_tag = match get_etag(response.headers()) { Ok(e_tag) => Some(e_tag), Err(crate::client::header::Error::MissingEtag) => None, @@ -260,7 +260,7 @@ mod tests { .build() .unwrap(); - put_get_delete_list_opts(&integration).await; + put_get_delete_list(&integration).await; list_uses_directories_correctly(&integration).await; list_with_delimiter(&integration).await; rename_and_copy(&integration).await; diff --git a/object_store/src/lib.rs b/object_store/src/lib.rs index 157852ff9a6e..b492d93894a7 100644 --- a/object_store/src/lib.rs +++ b/object_store/src/lib.rs @@ -543,6 +543,10 @@ mod payload; mod upload; mod util; +mod attributes; + +pub use attributes::*; + pub use parse::{parse_url, parse_url_opts}; pub use payload::*; pub use upload::*; @@ -989,6 +993,8 @@ pub struct GetResult { pub meta: ObjectMeta, /// The range of bytes returned by this request pub range: Range, + /// Additional object attributes + pub attributes: Attributes, } /// The kind of a [`GetResult`] @@ -1114,6 +1120,10 @@ pub struct PutOptions { /// /// Implementations that don't support object tagging should ignore this pub tags: TagSet, + /// Provide a set of [`Attributes`] + /// + /// Implementations that don't support an attribute should return an error + pub attributes: Attributes, } impl From for PutOptions { @@ -1251,10 +1261,6 @@ mod tests { use rand::{thread_rng, Rng}; pub(crate) async fn put_get_delete_list(storage: &DynObjectStore) { - put_get_delete_list_opts(storage).await - } - - pub(crate) async fn put_get_delete_list_opts(storage: &DynObjectStore) { delete_fixtures(storage).await; let content_list = flatten_list_stream(storage, None).await.unwrap(); @@ -1674,6 +1680,28 @@ mod tests { storage.delete(&path).await.unwrap(); } + pub(crate) async fn put_get_attributes(integration: &dyn ObjectStore) { + // Test handling of attributes + let attributes = Attributes::from_iter([ + (Attribute::ContentType, "text/html; charset=utf-8"), + (Attribute::CacheControl, "max-age=604800"), + ]); + + let path = Path::from("attributes"); + let opts = PutOptions { + attributes: attributes.clone(), + ..Default::default() + }; + match integration.put_opts(&path, "foo".into(), opts).await { + Ok(_) => { + let r = integration.get(&path).await.unwrap(); + assert_eq!(r.attributes, attributes); + } + Err(Error::NotImplemented) => {} + Err(e) => panic!("{e}"), + } + } + pub(crate) async fn get_opts(storage: &dyn ObjectStore) { let path = Path::from("test"); storage.put(&path, "foo".into()).await.unwrap(); diff --git a/object_store/src/local.rs b/object_store/src/local.rs index d5581cdc8f59..a3695ad91744 100644 --- a/object_store/src/local.rs +++ b/object_store/src/local.rs @@ -38,8 +38,8 @@ use crate::{ maybe_spawn_blocking, path::{absolute_path_to_url, Path}, util::InvalidGetRange, - GetOptions, GetResult, GetResultPayload, ListResult, MultipartUpload, ObjectMeta, ObjectStore, - PutMode, PutOptions, PutPayload, PutResult, Result, UploadPart, + Attributes, GetOptions, GetResult, GetResultPayload, ListResult, MultipartUpload, ObjectMeta, + ObjectStore, PutMode, PutOptions, PutPayload, PutResult, Result, UploadPart, }; /// A specialized `Error` for filesystem object store-related errors @@ -346,6 +346,10 @@ impl ObjectStore for LocalFileSystem { return Err(crate::Error::NotImplemented); } + if !opts.attributes.is_empty() { + return Err(crate::Error::NotImplemented); + } + let path = self.path_to_filesystem(location)?; maybe_spawn_blocking(move || { let (mut file, staging_path) = new_staged_upload(&path)?; @@ -421,6 +425,7 @@ impl ObjectStore for LocalFileSystem { Ok(GetResult { payload: GetResultPayload::File(file, path), + attributes: Attributes::default(), range, meta, }) diff --git a/object_store/src/memory.rs b/object_store/src/memory.rs index d42e6f231c04..e34b28fd27c5 100644 --- a/object_store/src/memory.rs +++ b/object_store/src/memory.rs @@ -30,8 +30,9 @@ use snafu::{OptionExt, ResultExt, Snafu}; use crate::multipart::{MultipartStore, PartId}; use crate::util::InvalidGetRange; use crate::{ - path::Path, GetRange, GetResult, GetResultPayload, ListResult, MultipartId, MultipartUpload, - ObjectMeta, ObjectStore, PutMode, PutOptions, PutResult, Result, UpdateVersion, UploadPart, + path::Path, Attributes, GetRange, GetResult, GetResultPayload, ListResult, MultipartId, + MultipartUpload, ObjectMeta, ObjectStore, PutMode, PutOptions, PutResult, Result, + UpdateVersion, UploadPart, }; use crate::{GetOptions, PutPayload}; @@ -88,15 +89,22 @@ pub struct InMemory { struct Entry { data: Bytes, last_modified: DateTime, + attributes: Attributes, e_tag: usize, } impl Entry { - fn new(data: Bytes, last_modified: DateTime, e_tag: usize) -> Self { + fn new( + data: Bytes, + last_modified: DateTime, + e_tag: usize, + attributes: Attributes, + ) -> Self { Self { data, last_modified, e_tag, + attributes, } } } @@ -116,10 +124,10 @@ struct PartStorage { type SharedStorage = Arc>; impl Storage { - fn insert(&mut self, location: &Path, bytes: Bytes) -> usize { + fn insert(&mut self, location: &Path, bytes: Bytes, attributes: Attributes) -> usize { let etag = self.next_etag; self.next_etag += 1; - let entry = Entry::new(bytes, Utc::now(), etag); + let entry = Entry::new(bytes, Utc::now(), etag, attributes); self.overwrite(location, entry); etag } @@ -200,7 +208,7 @@ impl ObjectStore for InMemory { ) -> Result { let mut storage = self.storage.write(); let etag = storage.next_etag; - let entry = Entry::new(payload.into(), Utc::now(), etag); + let entry = Entry::new(payload.into(), Utc::now(), etag, opts.attributes); match opts.mode { PutMode::Overwrite => storage.overwrite(location, entry), @@ -247,6 +255,7 @@ impl ObjectStore for InMemory { Ok(GetResult { payload: GetResultPayload::Stream(stream.boxed()), + attributes: entry.attributes, meta, range, }) @@ -363,7 +372,9 @@ impl ObjectStore for InMemory { async fn copy(&self, from: &Path, to: &Path) -> Result<()> { let entry = self.entry(from).await?; - self.storage.write().insert(to, entry.data); + self.storage + .write() + .insert(to, entry.data, entry.attributes); Ok(()) } @@ -376,7 +387,7 @@ impl ObjectStore for InMemory { } .into()); } - storage.insert(to, entry.data); + storage.insert(to, entry.data, entry.attributes); Ok(()) } } @@ -426,7 +437,7 @@ impl MultipartStore for InMemory { for x in &upload.parts { buf.extend_from_slice(x.as_ref().unwrap()) } - let etag = storage.insert(path, buf.into()); + let etag = storage.insert(path, buf.into(), Default::default()); Ok(PutResult { e_tag: Some(etag.to_string()), version: None, @@ -492,7 +503,11 @@ impl MultipartUpload for InMemoryUpload { let mut buf = Vec::with_capacity(cap); let parts = self.parts.iter().flatten(); parts.for_each(|x| buf.extend_from_slice(x)); - let etag = self.storage.write().insert(&self.location, buf.into()); + let etag = self + .storage + .write() + .insert(&self.location, buf.into(), Attributes::new()); + Ok(PutResult { e_tag: Some(etag.to_string()), version: None, @@ -523,6 +538,7 @@ mod tests { stream_get(&integration).await; put_opts(&integration, true).await; multipart(&integration, &integration).await; + put_get_attributes(&integration).await; } #[tokio::test]