Skip to content

Commit

Permalink
Object tagging (apache#4754)
Browse files Browse the repository at this point in the history
  • Loading branch information
tustvold committed Oct 27, 2023
1 parent e3cce56 commit 631c2cb
Show file tree
Hide file tree
Showing 6 changed files with 206 additions and 3 deletions.
22 changes: 22 additions & 0 deletions object_store/src/aws/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -588,6 +588,28 @@ impl S3Client {
version,
})
}

#[cfg(test)]
pub async fn get_object_tagging(&self, path: &Path) -> Result<Response> {
let credential = self.config.get_credential().await?;
let url = format!("{}?tagging", self.config.path_url(path));
let response = self
.client
.request(Method::GET, url)
.with_aws_sigv4(
credential.as_deref(),
&self.config.region,
"s3",
self.config.sign_payload,
None,
)
.send_retry(&self.config.retry_config)
.await
.context(GetRequestSnafu {
path: path.as_ref(),
})?;
Ok(response)
}
}

#[async_trait]
Expand Down
15 changes: 14 additions & 1 deletion object_store/src/aws/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ use async_trait::async_trait;
use bytes::Bytes;
use futures::stream::BoxStream;
use futures::{StreamExt, TryStreamExt};
use reqwest::header::{IF_MATCH, IF_NONE_MATCH};
use reqwest::header::{HeaderName, IF_MATCH, IF_NONE_MATCH};
use reqwest::Method;
use std::{sync::Arc, time::Duration};
use tokio::io::AsyncWrite;
Expand All @@ -52,6 +52,8 @@ use crate::{
PutOptions, PutResult, Result,
};

static TAGS_HEADER: HeaderName = HeaderName::from_static("x-amz-tagging");

mod builder;
mod checksum;
mod client;
Expand Down Expand Up @@ -161,6 +163,11 @@ impl Signer for AmazonS3 {
impl ObjectStore for AmazonS3 {
async fn put_opts(&self, location: &Path, bytes: Bytes, opts: PutOptions) -> Result<PutResult> {
let request = self.client.put_request(location, bytes);
let request = match opts.tags.encoded() {
"" => request,
tags => request.header(&TAGS_HEADER, tags),
};

match (opts.mode, &self.client.config().conditional_put) {
(PutMode::Overwrite, _) => request.send().await,
(PutMode::Create | PutMode::Update(_), None) => Err(Error::NotImplemented),
Expand Down Expand Up @@ -342,6 +349,12 @@ mod tests {
stream_get(&integration).await;
multipart(&integration, &integration).await;

tagging(&integration, |p| {
let client = Arc::clone(&integration.client);
async move { client.get_object_tagging(&p).await }
})
.await;

if test_not_exists {
copy_if_not_exists(&integration).await;
}
Expand Down
26 changes: 25 additions & 1 deletion object_store/src/azure/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ use url::Url;

const VERSION_HEADER: &str = "x-ms-version-id";

static TAGS_HEADER: HeaderName = HeaderName::from_static("x-ms-tags");

/// A specialized `Error` for object store-related errors
#[derive(Debug, Snafu)]
#[allow(missing_docs)]
Expand Down Expand Up @@ -128,7 +130,7 @@ pub(crate) struct AzureConfig {
}

impl AzureConfig {
fn path_url(&self, path: &Path) -> Url {
pub(crate) fn path_url(&self, path: &Path) -> Url {
let mut url = self.service.clone();
{
let mut path_mut = url.path_segments_mut().unwrap();
Expand Down Expand Up @@ -229,6 +231,11 @@ impl AzureClient {
}
};

let builder = match opts.tags.encoded() {
"" => builder,
tags => builder.header(&TAGS_HEADER, tags),
};

let response = builder.header(&BLOB_TYPE, "BlockBlob").send().await?;
Ok(get_put_result(response.headers(), VERSION_HEADER).context(MetadataSnafu)?)
}
Expand Down Expand Up @@ -315,6 +322,23 @@ impl AzureClient {

Ok(())
}

#[cfg(test)]
pub async fn get_blob_tagging(&self, path: &Path) -> Result<Response> {
let credential = self.get_credential().await?;
let url = self.config.path_url(path);
let response = self
.client
.request(Method::GET, url)
.query(&[("comp", "tags")])
.with_azure_authorization(&credential, &self.config.account)
.send_retry(&self.config.retry_config)
.await
.context(GetRequestSnafu {
path: path.as_ref(),
})?;
Ok(response)
}
}

#[async_trait]
Expand Down
9 changes: 9 additions & 0 deletions object_store/src/azure/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,15 @@ mod tests {
stream_get(&integration).await;
put_opts(&integration, true).await;
multipart(&integration, &integration).await;

// Azure hierarchical namespaces don't support tagging
if std::env::var("AZURE_HIERARCHICAL").is_err() {
tagging(&integration, |p| {
let client = Arc::clone(&integration.client);
async move { client.get_blob_tagging(&p).await }
})
.await
}
}

#[test]
Expand Down
80 changes: 79 additions & 1 deletion object_store/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,10 @@ pub use client::{
#[cfg(feature = "cloud")]
mod config;

mod tags;

pub use tags::TagSet;

pub mod multipart;
mod parse;
mod util;
Expand Down Expand Up @@ -893,11 +897,27 @@ impl From<PutResult> for UpdateVersion {
pub struct PutOptions {
/// Configure the [`PutMode`] for this operation
pub mode: PutMode,
/// Provide a [`TagSet`] for this object
///
/// Implementations that don't support object tagging should ignore this
pub tags: TagSet,
}

impl From<PutMode> for PutOptions {
fn from(mode: PutMode) -> Self {
Self { mode }
Self {
mode,
..Default::default()
}
}
}

impl From<TagSet> for PutOptions {
fn from(tags: TagSet) -> Self {
Self {
tags,
..Default::default()
}
}
}

Expand Down Expand Up @@ -1015,6 +1035,7 @@ mod tests {
use chrono::TimeZone;
use futures::stream::FuturesUnordered;
use rand::{thread_rng, Rng};
use std::future::Future;
use tokio::io::AsyncWriteExt;

pub(crate) async fn put_get_delete_list(storage: &DynObjectStore) {
Expand Down Expand Up @@ -1882,6 +1903,63 @@ mod tests {
assert_eq!(meta.size, chunk_size * 2);
}

#[cfg(any(feature = "aws", feature = "azure"))]
pub(crate) async fn tagging<F, Fut>(storage: &dyn ObjectStore, get_tags: F)
where
F: Fn(Path) -> Fut + Send + Sync,
Fut: Future<Output = Result<reqwest::Response>> + Send,
{
use bytes::Buf;
use serde::Deserialize;

#[derive(Deserialize)]
struct Tagging {
#[serde(rename = "TagSet")]
list: TagList,
}

#[derive(Debug, Deserialize)]
struct TagList {
#[serde(rename = "Tag")]
tags: Vec<Tag>,
}

#[derive(Debug, Deserialize, Eq, PartialEq)]
#[serde(rename_all = "PascalCase")]
struct Tag {
key: String,
value: String,
}

let tags = vec![
Tag {
key: "foo.com=bar/s".to_string(),
value: "bananas/foo.com-_".to_string(),
},
Tag {
key: "namespace/key.foo".to_string(),
value: "value with a space".to_string(),
},
];
let mut tag_set = TagSet::default();
for t in &tags {
tag_set.push(&t.key, &t.value)
}

let path = Path::from("tag_test");
storage
.put_opts(&path, "test".into(), tag_set.into())
.await
.unwrap();

let resp = get_tags(path.clone()).await.unwrap();
let body = resp.bytes().await.unwrap();

let mut resp: Tagging = quick_xml::de::from_reader(body.reader()).unwrap();
resp.list.tags.sort_by(|a, b| a.key.cmp(&b.key));
assert_eq!(resp.list.tags, tags);
}

async fn delete_fixtures(storage: &DynObjectStore) {
let paths = storage.list(None).map_ok(|meta| meta.location).boxed();
storage
Expand Down
57 changes: 57 additions & 0 deletions object_store/src/tags.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use url::form_urlencoded::Serializer;

/// A collection of key value pairs used to annotate objects
///
/// <https://docs.aws.amazon.com/AmazonS3/latest/userguide/object-tagging.html>
/// <https://learn.microsoft.com/en-us/rest/api/storageservices/set-blob-tags>
#[derive(Debug, Clone, Default, Eq, PartialEq)]
pub struct TagSet(String);

impl TagSet {
/// Append a key value pair to this [`TagSet`]
///
/// Stores have different restrictions on what characters are permitted,
/// for portability it is recommended applications use no more than 10 tags,
/// and stick to alphanumeric characters, and `+ - = . _ : /`
///
/// <https://docs.aws.amazon.com/AmazonS3/latest/API/API_PutObjectTagging.html>
/// <https://learn.microsoft.com/en-us/rest/api/storageservices/set-blob-tags?tabs=azure-ad#request-body>
pub fn push(&mut self, key: &str, value: &str) {
Serializer::new(&mut self.0).append_pair(key, value);
}

/// Return this [`TagSet`] as a URL-encoded string
pub fn encoded(&self) -> &str {
&self.0
}
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn test_tag_set() {
let mut set = TagSet::default();
set.push("test/foo", "value sdlks");
set.push("foo", " sdf _ /+./sd");
assert_eq!(set.encoded(), "test%2Ffoo=value+sdlks&foo=+sdf+_+%2F%2B.%2Fsd");
}
}

0 comments on commit 631c2cb

Please sign in to comment.