From c79dec0df54882abd1884ff4aa2b889d96a0c1ef Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Wed, 25 Oct 2023 21:21:59 +0100 Subject: [PATCH] Object tagging (#4754) --- object_store/src/aws/client.rs | 22 +++++++++ object_store/src/aws/mod.rs | 15 +++++- object_store/src/azure/client.rs | 26 ++++++++++- object_store/src/azure/mod.rs | 9 ++++ object_store/src/lib.rs | 80 +++++++++++++++++++++++++++++++- object_store/src/tags.rs | 57 +++++++++++++++++++++++ 6 files changed, 206 insertions(+), 3 deletions(-) create mode 100644 object_store/src/tags.rs diff --git a/object_store/src/aws/client.rs b/object_store/src/aws/client.rs index 20c2a96b57cd..129294aa62b4 100644 --- a/object_store/src/aws/client.rs +++ b/object_store/src/aws/client.rs @@ -588,6 +588,28 @@ impl S3Client { version, }) } + + #[cfg(test)] + pub async fn get_object_tagging(&self, path: &Path) -> Result { + let credential = self.config.get_credential().await?; + let url = format!("{}?tagging", self.config.path_url(path)); + let response = self + .client + .request(Method::GET, url) + .with_aws_sigv4( + credential.as_deref(), + &self.config.region, + "s3", + self.config.sign_payload, + None, + ) + .send_retry(&self.config.retry_config) + .await + .context(GetRequestSnafu { + path: path.as_ref(), + })?; + Ok(response) + } } #[async_trait] diff --git a/object_store/src/aws/mod.rs b/object_store/src/aws/mod.rs index 99e637695059..d367cf8fbe6e 100644 --- a/object_store/src/aws/mod.rs +++ b/object_store/src/aws/mod.rs @@ -35,7 +35,7 @@ use async_trait::async_trait; use bytes::Bytes; use futures::stream::BoxStream; use futures::{StreamExt, TryStreamExt}; -use reqwest::header::{IF_MATCH, IF_NONE_MATCH}; +use reqwest::header::{HeaderName, IF_MATCH, IF_NONE_MATCH}; use reqwest::Method; use std::{sync::Arc, time::Duration}; use tokio::io::AsyncWrite; @@ -52,6 +52,8 @@ use crate::{ PutOptions, PutResult, Result, }; +static TAGS_HEADER: HeaderName = HeaderName::from_static("x-amz-tagging"); + mod builder; mod checksum; mod client; @@ -161,6 +163,11 @@ impl Signer for AmazonS3 { impl ObjectStore for AmazonS3 { async fn put_opts(&self, location: &Path, bytes: Bytes, opts: PutOptions) -> Result { let request = self.client.put_request(location, bytes); + let request = match opts.tags.encoded() { + "" => request, + tags => request.header(&TAGS_HEADER, tags), + }; + match (opts.mode, &self.client.config().conditional_put) { (PutMode::Overwrite, _) => request.send().await, (PutMode::Create | PutMode::Update(_), None) => Err(Error::NotImplemented), @@ -342,6 +349,12 @@ mod tests { stream_get(&integration).await; multipart(&integration, &integration).await; + tagging(&integration, |p| { + let client = Arc::clone(&integration.client); + async move { client.get_object_tagging(&p).await } + }) + .await; + if test_not_exists { copy_if_not_exists(&integration).await; } diff --git a/object_store/src/azure/client.rs b/object_store/src/azure/client.rs index c7bd79149872..4462c43d7817 100644 --- a/object_store/src/azure/client.rs +++ b/object_store/src/azure/client.rs @@ -50,6 +50,8 @@ use url::Url; const VERSION_HEADER: &str = "x-ms-version-id"; +static TAGS_HEADER: HeaderName = HeaderName::from_static("x-ms-tags"); + /// A specialized `Error` for object store-related errors #[derive(Debug, Snafu)] #[allow(missing_docs)] @@ -128,7 +130,7 @@ pub(crate) struct AzureConfig { } impl AzureConfig { - fn path_url(&self, path: &Path) -> Url { + pub(crate) fn path_url(&self, path: &Path) -> Url { let mut url = self.service.clone(); { let mut path_mut = url.path_segments_mut().unwrap(); @@ -229,6 +231,11 @@ impl AzureClient { } }; + let builder = match opts.tags.encoded() { + "" => builder, + tags => builder.header(&TAGS_HEADER, tags), + }; + let response = builder.header(&BLOB_TYPE, "BlockBlob").send().await?; Ok(get_put_result(response.headers(), VERSION_HEADER).context(MetadataSnafu)?) } @@ -315,6 +322,23 @@ impl AzureClient { Ok(()) } + + #[cfg(test)] + pub async fn get_blob_tagging(&self, path: &Path) -> Result { + let credential = self.get_credential().await?; + let url = self.config.path_url(path); + let response = self + .client + .request(Method::GET, url) + .query(&[("comp", "tags")]) + .with_azure_authorization(&credential, &self.config.account) + .send_retry(&self.config.retry_config) + .await + .context(GetRequestSnafu { + path: path.as_ref(), + })?; + Ok(response) + } } #[async_trait] diff --git a/object_store/src/azure/mod.rs b/object_store/src/azure/mod.rs index 762a51dd9d60..5bcc662872da 100644 --- a/object_store/src/azure/mod.rs +++ b/object_store/src/azure/mod.rs @@ -202,6 +202,15 @@ mod tests { stream_get(&integration).await; put_opts(&integration, true).await; multipart(&integration, &integration).await; + + // Azure hierarchical namespaces don't support tagging + if std::env::var("AZURE_HIERARCHICAL").is_err() { + tagging(&integration, |p| { + let client = Arc::clone(&integration.client); + async move { client.get_blob_tagging(&p).await } + }) + .await + } } #[test] diff --git a/object_store/src/lib.rs b/object_store/src/lib.rs index 13526c336880..59b98ce11aa9 100644 --- a/object_store/src/lib.rs +++ b/object_store/src/lib.rs @@ -263,6 +263,10 @@ pub use client::{ #[cfg(feature = "cloud")] mod config; +mod tags; + +pub use tags::TagSet; + pub mod multipart; mod parse; mod util; @@ -893,11 +897,27 @@ impl From for UpdateVersion { pub struct PutOptions { /// Configure the [`PutMode`] for this operation pub mode: PutMode, + /// Provide a [`TagSet`] for this object + /// + /// Implementations that don't support object tagging should ignore this + pub tags: TagSet, } impl From for PutOptions { fn from(mode: PutMode) -> Self { - Self { mode } + Self { + mode, + ..Default::default() + } + } +} + +impl From for PutOptions { + fn from(tags: TagSet) -> Self { + Self { + tags, + ..Default::default() + } } } @@ -1014,6 +1034,7 @@ mod tests { use crate::test_util::flatten_list_stream; use chrono::TimeZone; use rand::{thread_rng, Rng}; + use std::future::Future; use tokio::io::AsyncWriteExt; pub(crate) async fn put_get_delete_list(storage: &DynObjectStore) { @@ -1835,6 +1856,63 @@ mod tests { assert_eq!(meta.size, chunk_size * 2); } + #[cfg(any(feature = "aws", feature = "azure"))] + pub(crate) async fn tagging(storage: &dyn ObjectStore, get_tags: F) + where + F: Fn(Path) -> Fut, + Fut: Future>, + { + use bytes::Buf; + use serde::Deserialize; + + #[derive(Deserialize)] + struct Tagging { + #[serde(rename = "TagSet")] + list: TagList, + } + + #[derive(Debug, Deserialize)] + struct TagList { + #[serde(rename = "Tag")] + tags: Vec, + } + + #[derive(Debug, Deserialize, Eq, PartialEq)] + #[serde(rename_all = "PascalCase")] + struct Tag { + key: String, + value: String, + } + + let tags = vec![ + Tag { + key: "foo.com=bar/s".to_string(), + value: "bananas/foo.com-_".to_string(), + }, + Tag { + key: "namespace/key.foo".to_string(), + value: "value with a space".to_string(), + }, + ]; + let mut tag_set = TagSet::default(); + for t in &tags { + tag_set.push(&t.key, &t.value) + } + + let path = Path::from("tag_test"); + storage + .put_opts(&path, "test".into(), tag_set.into()) + .await + .unwrap(); + + let resp = get_tags(path.clone()).await.unwrap(); + let body = resp.bytes().await.unwrap(); + + let mut resp: Tagging = quick_xml::de::from_reader(body.reader()).unwrap(); + resp.list.tags.sort_by(|a, b| a.key.cmp(&b.key)); + assert_eq!(resp.list.tags, tags); + } + async fn delete_fixtures(storage: &DynObjectStore) { let paths = storage.list(None).map_ok(|meta| meta.location).boxed(); storage diff --git a/object_store/src/tags.rs b/object_store/src/tags.rs new file mode 100644 index 000000000000..30c2067ef581 --- /dev/null +++ b/object_store/src/tags.rs @@ -0,0 +1,57 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use url::form_urlencoded::Serializer; + +/// A collection of key value pairs used to annotate objects +/// +/// +/// +#[derive(Debug, Clone, Default, Eq, PartialEq)] +pub struct TagSet(String); + +impl TagSet { + /// Append a key value pair to this [`TagSet`] + /// + /// Stores have different restrictions on what characters are permitted, + /// for portability it is recommended applications use no more than 10 tags, + /// and stick to alphanumeric characters, and `+ - = . _ : /` + /// + /// + /// + pub fn push(&mut self, key: &str, value: &str) { + Serializer::new(&mut self.0).append_pair(key, value); + } + + /// Return this [`TagSet`] as a URL-encoded string + pub fn encoded(&self) -> &str { + &self.0 + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_tag_set() { + let mut set = TagSet::default(); + set.push("test/foo", "value sdlks"); + set.push("foo", " sdf _ /+./sd"); + assert_eq!(set.encoded(), "test%2Ffoo=value+sdlks&foo=+sdf+_+%2F%2B.%2Fsd"); + } +}