Skip to content

Commit

Permalink
Add Attributes API (apache#5329)
Browse files Browse the repository at this point in the history
  • Loading branch information
tustvold committed Apr 15, 2024
1 parent 0124307 commit 56374ff
Show file tree
Hide file tree
Showing 12 changed files with 402 additions and 60 deletions.
211 changes: 211 additions & 0 deletions object_store/src/attributes.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,211 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use std::borrow::Cow;
use std::collections::HashMap;
use std::ops::Deref;

/// The kinds of additional attribute that can be attached to an object
///
/// This enum is `#[non_exhaustive]`: further kinds may be added later
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
#[non_exhaustive]
pub enum Attribute {
    /// The MIME type of the object
    ///
    /// When set, this takes precedence over any
    /// [ClientOptions](crate::ClientOptions) configuration
    ///
    /// See [Content-Type](https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Content-Type)
    ContentType,
    /// Overrides the cache control policy of the object
    ///
    /// See [Cache-Control](https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Cache-Control)
    CacheControl,
}

/// The value associated with an [`Attribute`]
///
/// Can be built cheaply from either a `&'static str` or an owned [`String`]
///
/// ```
/// # use object_store::AttributeValue;
/// // A static string is stored by reference, so nothing is allocated
/// let value = AttributeValue::from("bar");
/// // An owned string is moved in as-is
/// let value = AttributeValue::from("foo".to_string());
/// ```
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub struct AttributeValue(Cow<'static, str>);

impl AsRef<str> for AttributeValue {
    /// Borrows the underlying string slice
    fn as_ref(&self) -> &str {
        // `&Cow<str>` deref-coerces to `&str`
        &self.0
    }
}

impl From<&'static str> for AttributeValue {
    /// Wraps a static string as `Cow::Borrowed`, without copying it
    fn from(value: &'static str) -> Self {
        Self(Cow::from(value))
    }
}

impl From<String> for AttributeValue {
    /// Takes ownership of `value`, storing it as `Cow::Owned`
    fn from(value: String) -> Self {
        Self(Cow::from(value))
    }
}

impl Deref for AttributeValue {
    type Target = str;

    /// Exposes the `str` methods directly on an [`AttributeValue`]
    fn deref(&self) -> &Self::Target {
        Deref::deref(&self.0)
    }
}

/// Additional attributes of an object
///
/// Attributes can be specified in [PutOptions](crate::PutOptions) and retrieved
/// from APIs returning [GetResult](crate::GetResult).
///
/// Unlike [`ObjectMeta`](crate::ObjectMeta), [`Attributes`] are not returned by
/// listing APIs
#[derive(Debug, Default, Eq, PartialEq, Clone)]
pub struct Attributes(HashMap<Attribute, AttributeValue>);

impl Attributes {
    /// Create a new empty [`Attributes`]
    pub fn new() -> Self {
        Self::default()
    }

    /// Create a new empty [`Attributes`] with space for at least `capacity` [`Attribute`]
    pub fn with_capacity(capacity: usize) -> Self {
        Self(HashMap::with_capacity(capacity))
    }

    /// Insert a new [`Attribute`], [`AttributeValue`] pair
    ///
    /// Returns the previous value for `key` if any
    pub fn insert(&mut self, key: Attribute, value: AttributeValue) -> Option<AttributeValue> {
        self.0.insert(key, value)
    }

    /// Returns the [`AttributeValue`] for `key` if any
    pub fn get(&self, key: &Attribute) -> Option<&AttributeValue> {
        // `key` is already a reference; borrowing it again would ask the map to
        // look up a `&Attribute`, which `Attribute` cannot be borrowed as
        self.0.get(key)
    }

    /// Removes the [`AttributeValue`] for `key` if any
    ///
    /// Returns the removed value, or `None` if `key` was not present
    pub fn remove(&mut self, key: &Attribute) -> Option<AttributeValue> {
        // Pass the reference through unchanged, for the same reason as in `get`
        self.0.remove(key)
    }

    /// Returns an [`AttributesIter`] over this
    ///
    /// Equivalent to calling `into_iter` on a shared reference to this collection
    pub fn iter(&self) -> AttributesIter<'_> {
        self.into_iter()
    }

    /// Returns the number of [`Attribute`] in this collection
    #[inline]
    pub fn len(&self) -> usize {
        self.0.len()
    }

    /// Returns true if this contains no [`Attribute`]
    #[inline]
    pub fn is_empty(&self) -> bool {
        self.0.is_empty()
    }
}

impl<K, V> FromIterator<(K, V)> for Attributes
where
K: Into<Attribute>,
V: Into<AttributeValue>,
{
fn from_iter<T: IntoIterator<Item = (K, V)>>(iter: T) -> Self {
Self(
iter.into_iter()
.map(|(k, v)| (k.into(), v.into()))
.collect(),
)
}
}

/// Allows iterating over a borrowed [`Attributes`], e.g. `for (k, v) in &attributes`
impl<'a> IntoIterator for &'a Attributes {
    type Item = (&'a Attribute, &'a AttributeValue);
    type IntoIter = AttributesIter<'a>;

    fn into_iter(self) -> AttributesIter<'a> {
        // Wrap the map's own borrowing iterator so the concrete type stays private
        let entries = self.0.iter();
        AttributesIter(entries)
    }
}

/// Borrowing iterator over the entries of an [`Attributes`]
///
/// Yields `(&Attribute, &AttributeValue)` pairs by forwarding to the
/// underlying hash map iterator
#[derive(Debug)]
pub struct AttributesIter<'a>(std::collections::hash_map::Iter<'a, Attribute, AttributeValue>);

impl<'a> Iterator for AttributesIter<'a> {
    type Item = (&'a Attribute, &'a AttributeValue);

    /// Advances the wrapped map iterator
    fn next(&mut self) -> Option<Self::Item> {
        let Self(inner) = self;
        inner.next()
    }

    /// Reports the wrapped map iterator's bounds unchanged
    fn size_hint(&self) -> (usize, Option<usize>) {
        let Self(inner) = self;
        inner.size_hint()
    }
}

// Unit tests for the `Attributes` collection
#[cfg(test)]
mod tests {
use super::*;

// Exercises construction, lookup, overwrite, removal and re-insertion on a
// single `Attributes`, checking `len` after each mutation
#[test]
fn test_attributes_basic() {
// Build from (Attribute, &'static str) pairs via the `FromIterator` impl
let mut attributes = Attributes::from_iter([
(Attribute::ContentType, "test"),
(Attribute::CacheControl, "control"),
]);

assert!(!attributes.is_empty());
assert_eq!(attributes.len(), 2);

// Lookup returns a reference to the stored value
assert_eq!(
attributes.get(&Attribute::ContentType),
Some(&"test".into())
);

// Inserting over an existing key hands back the previous value and
// leaves the number of entries unchanged
let metav = "control".into();
assert_eq!(attributes.get(&Attribute::CacheControl), Some(&metav));
assert_eq!(
attributes.insert(Attribute::CacheControl, "v1".into()),
Some(metav)
);
assert_eq!(attributes.len(), 2);

// Removing yields the most recently inserted value and shrinks the collection
assert_eq!(
attributes.remove(&Attribute::CacheControl).unwrap(),
"v1".into()
);
assert_eq!(attributes.len(), 1);

// A removed key can be inserted again
let metav: AttributeValue = "v2".into();
attributes.insert(Attribute::CacheControl, metav.clone());
assert_eq!(attributes.get(&Attribute::CacheControl), Some(&metav));
assert_eq!(attributes.len(), 2);
}
}
30 changes: 21 additions & 9 deletions object_store/src/aws/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,23 +35,21 @@ use crate::client::GetOptionsExt;
use crate::multipart::PartId;
use crate::path::DELIMITER;
use crate::{
ClientOptions, GetOptions, ListResult, MultipartId, Path, PutPayload, PutResult, Result,
RetryConfig,
Attribute, Attributes, ClientOptions, GetOptions, ListResult, MultipartId, Path, PutPayload,
PutResult, Result, RetryConfig,
};
use async_trait::async_trait;
use base64::prelude::BASE64_STANDARD;
use base64::Engine;
use bytes::{Buf, Bytes};
use hyper::header::{CACHE_CONTROL, CONTENT_LENGTH};
use hyper::http;
use hyper::http::HeaderName;
use itertools::Itertools;
use md5::{Digest, Md5};
use percent_encoding::{utf8_percent_encode, PercentEncode};
use quick_xml::events::{self as xml_events};
use reqwest::{
header::{CONTENT_LENGTH, CONTENT_TYPE},
Client as ReqwestClient, Method, RequestBuilder, Response,
};
use reqwest::{header::CONTENT_TYPE, Client as ReqwestClient, Method, RequestBuilder, Response};
use ring::digest;
use ring::digest::Context;
use serde::{Deserialize, Serialize};
Expand Down Expand Up @@ -344,6 +342,7 @@ impl S3Client {
&'a self,
path: &'a Path,
payload: PutPayload,
attributes: Attributes,
with_encryption_headers: bool,
) -> Request<'a> {
let url = self.config.path_url(path);
Expand All @@ -363,8 +362,21 @@ impl S3Client {
)
}

if let Some(value) = self.config.client_options.get_content_type(path) {
builder = builder.header(CONTENT_TYPE, value);
let mut has_content_type = false;
for (k, v) in &attributes {
builder = match k {
Attribute::CacheControl => builder.header(CACHE_CONTROL, v.as_ref()),
Attribute::ContentType => {
has_content_type = true;
builder.header(CONTENT_TYPE, v.as_ref())
}
};
}

if !has_content_type {
if let Some(value) = self.config.client_options.get_content_type(path) {
builder = builder.header(CONTENT_TYPE, value);
}
}

Request {
Expand Down Expand Up @@ -556,7 +568,7 @@ impl S3Client {
let part = (part_idx + 1).to_string();

let response = self
.put_request(path, data, false)
.put_request(path, data, Attributes::default(), false)
.query(&[("partNumber", &part), ("uploadId", upload_id)])
.idempotent(true)
.send()
Expand Down
3 changes: 2 additions & 1 deletion object_store/src/aws/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,8 @@ impl ObjectStore for AmazonS3 {
payload: PutPayload,
opts: PutOptions,
) -> Result<PutResult> {
let mut request = self.client.put_request(location, payload, true);
let attrs = opts.attributes;
let mut request = self.client.put_request(location, payload, attrs, true);
let tags = opts.tags.encoded();
if !tags.is_empty() && !self.client.config.disable_tagging {
request = request.header(&TAGS_HEADER, tags);
Expand Down
43 changes: 31 additions & 12 deletions object_store/src/azure/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,15 @@ use crate::multipart::PartId;
use crate::path::DELIMITER;
use crate::util::{deserialize_rfc1123, GetRange};
use crate::{
ClientOptions, GetOptions, ListResult, ObjectMeta, Path, PutMode, PutOptions, PutPayload,
PutResult, Result, RetryConfig,
Attribute, Attributes, ClientOptions, GetOptions, ListResult, ObjectMeta, Path, PutMode,
PutOptions, PutPayload, PutResult, Result, RetryConfig,
};
use async_trait::async_trait;
use base64::prelude::BASE64_STANDARD;
use base64::Engine;
use bytes::{Buf, Bytes};
use chrono::{DateTime, Utc};
use hyper::header::CACHE_CONTROL;
use hyper::http::HeaderName;
use reqwest::header::CONTENT_TYPE;
use reqwest::{
Expand Down Expand Up @@ -187,9 +188,8 @@ impl<'a> PutRequest<'a> {
Self { builder, ..self }
}

fn set_idempotent(mut self, idempotent: bool) -> Self {
self.idempotent = idempotent;
self
fn set_idempotent(self, idempotent: bool) -> Self {
Self { idempotent, ..self }
}

async fn send(self) -> Result<Response> {
Expand All @@ -199,7 +199,7 @@ impl<'a> PutRequest<'a> {
.header(CONTENT_LENGTH, self.payload.content_length())
.with_azure_authorization(&credential, &self.config.account)
.retryable(&self.config.retry_config)
.idempotent(true)
.idempotent(self.idempotent)
.payload(Some(self.payload))
.send()
.await
Expand Down Expand Up @@ -233,13 +233,31 @@ impl AzureClient {
self.config.get_credential().await
}

fn put_request<'a>(&'a self, path: &'a Path, payload: PutPayload) -> PutRequest<'a> {
fn put_request<'a>(
&'a self,
path: &'a Path,
payload: PutPayload,
attributes: Attributes,
) -> PutRequest<'a> {
let url = self.config.path_url(path);

let mut builder = self.client.request(Method::PUT, url);

if let Some(value) = self.config().client_options.get_content_type(path) {
builder = builder.header(CONTENT_TYPE, value);
let mut has_content_type = false;
for (k, v) in &attributes {
builder = match k {
Attribute::CacheControl => builder.header(CACHE_CONTROL, v.as_ref()),
Attribute::ContentType => {
has_content_type = true;
builder.header(CONTENT_TYPE, v.as_ref())
}
};
}

if !has_content_type {
if let Some(value) = self.config.client_options.get_content_type(path) {
builder = builder.header(CONTENT_TYPE, value);
}
}

PutRequest {
Expand All @@ -258,7 +276,7 @@ impl AzureClient {
payload: PutPayload,
opts: PutOptions,
) -> Result<PutResult> {
let builder = self.put_request(path, payload);
let builder = self.put_request(path, payload, opts.attributes);

let builder = match &opts.mode {
PutMode::Overwrite => builder.set_idempotent(true),
Expand Down Expand Up @@ -288,7 +306,7 @@ impl AzureClient {
let content_id = format!("{part_idx:20}");
let block_id = BASE64_STANDARD.encode(&content_id);

self.put_request(path, payload)
self.put_request(path, payload, Attributes::default())
.query(&[("comp", "block"), ("blockid", &block_id)])
.set_idempotent(true)
.send()
Expand All @@ -304,8 +322,9 @@ impl AzureClient {
.map(|part| BlockId::from(part.content_id))
.collect();

let payload = BlockList { blocks }.to_xml().into();
let response = self
.put_request(path, BlockList { blocks }.to_xml().into())
.put_request(path, payload, Attributes::default())
.query(&[("comp", "blocklist")])
.set_idempotent(true)
.send()
Expand Down
Loading

0 comments on commit 56374ff

Please sign in to comment.