Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Object tagging (#4754) #4999

Merged
merged 3 commits into from
Oct 30, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions object_store/src/aws/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,8 @@ pub struct AmazonS3Builder {
copy_if_not_exists: Option<ConfigValue<S3CopyIfNotExists>>,
/// Put precondition
conditional_put: Option<ConfigValue<S3ConditionalPut>>,
/// Ignore tags
skip_tagging: ConfigValue<bool>,
}

/// Configuration keys for [`AmazonS3Builder`]
Expand Down Expand Up @@ -299,6 +301,15 @@ pub enum AmazonS3ConfigKey {
/// Skip signing request
SkipSignature,

/// Skips tagging objects
///
/// This can be desirable if not supported by the backing store
///
/// Supported keys:
/// - `aws_skip_tagging`
/// - `skip_tagging`
SkipTagging,
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't feel strongly about this name

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe DisableTagging

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

but I don't feel strongly either :)


/// Client options
Client(ClientConfigKey),
}
Expand All @@ -322,6 +333,7 @@ impl AsRef<str> for AmazonS3ConfigKey {
Self::SkipSignature => "aws_skip_signature",
Self::CopyIfNotExists => "aws_copy_if_not_exists",
Self::ConditionalPut => "aws_conditional_put",
Self::SkipTagging => "aws_skip_tagging",
Self::Client(opt) => opt.as_ref(),
}
}
Expand Down Expand Up @@ -350,6 +362,7 @@ impl FromStr for AmazonS3ConfigKey {
"aws_skip_signature" | "skip_signature" => Ok(Self::SkipSignature),
"aws_copy_if_not_exists" | "copy_if_not_exists" => Ok(Self::CopyIfNotExists),
"aws_conditional_put" | "conditional_put" => Ok(Self::ConditionalPut),
"aws_skip_tagging" | "skip_tagging" => Ok(Self::SkipTagging),
// Backwards compatibility
"aws_allow_http" => Ok(Self::Client(ClientConfigKey::AllowHttp)),
_ => match s.parse() {
Expand Down Expand Up @@ -453,6 +466,7 @@ impl AmazonS3Builder {
self.client_options = self.client_options.with_config(key, value)
}
AmazonS3ConfigKey::SkipSignature => self.skip_signature.parse(value),
AmazonS3ConfigKey::SkipTagging => self.skip_tagging.parse(value),
AmazonS3ConfigKey::CopyIfNotExists => {
self.copy_if_not_exists = Some(ConfigValue::Deferred(value.into()))
}
Expand Down Expand Up @@ -525,6 +539,7 @@ impl AmazonS3Builder {
AmazonS3ConfigKey::ConditionalPut => {
self.conditional_put.as_ref().map(ToString::to_string)
}
AmazonS3ConfigKey::SkipTagging => Some(self.skip_tagging.to_string()),
}
}

Expand Down Expand Up @@ -735,6 +750,12 @@ impl AmazonS3Builder {
self
}

/// If set to `true` will ignore any tags provided to put_opts
pub fn with_skip_tagging(mut self, ignore: bool) -> Self {
self.skip_tagging = ignore.into();
self
}

/// Create a [`AmazonS3`] instance from the provided values,
/// consuming `self`.
pub fn build(mut self) -> Result<AmazonS3> {
Expand Down Expand Up @@ -851,6 +872,7 @@ impl AmazonS3Builder {
client_options: self.client_options,
sign_payload: !self.unsigned_payload.get()?,
skip_signature: self.skip_signature.get()?,
skip_tagging: self.skip_tagging.get()?,
checksum,
copy_if_not_exists,
conditional_put: put_precondition,
Expand Down
23 changes: 23 additions & 0 deletions object_store/src/aws/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,7 @@ pub struct S3Config {
pub client_options: ClientOptions,
pub sign_payload: bool,
pub skip_signature: bool,
pub skip_tagging: bool,
pub checksum: Option<Checksum>,
pub copy_if_not_exists: Option<S3CopyIfNotExists>,
pub conditional_put: Option<S3ConditionalPut>,
Expand Down Expand Up @@ -588,6 +589,28 @@ impl S3Client {
version,
})
}

#[cfg(test)]
pub async fn get_object_tagging(&self, path: &Path) -> Result<Response> {
let credential = self.config.get_credential().await?;
let url = format!("{}?tagging", self.config.path_url(path));
let response = self
.client
.request(Method::GET, url)
.with_aws_sigv4(
credential.as_deref(),
&self.config.region,
"s3",
self.config.sign_payload,
None,
)
.send_retry(&self.config.retry_config)
.await
.context(GetRequestSnafu {
path: path.as_ref(),
})?;
Ok(response)
}
}

#[async_trait]
Expand Down
16 changes: 14 additions & 2 deletions object_store/src/aws/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ use async_trait::async_trait;
use bytes::Bytes;
use futures::stream::BoxStream;
use futures::{StreamExt, TryStreamExt};
use reqwest::header::{IF_MATCH, IF_NONE_MATCH};
use reqwest::header::{HeaderName, IF_MATCH, IF_NONE_MATCH};
use reqwest::Method;
use std::{sync::Arc, time::Duration};
use tokio::io::AsyncWrite;
Expand All @@ -52,6 +52,8 @@ use crate::{
PutOptions, PutResult, Result,
};

static TAGS_HEADER: HeaderName = HeaderName::from_static("x-amz-tagging");

mod builder;
mod checksum;
mod client;
Expand Down Expand Up @@ -160,7 +162,12 @@ impl Signer for AmazonS3 {
#[async_trait]
impl ObjectStore for AmazonS3 {
async fn put_opts(&self, location: &Path, bytes: Bytes, opts: PutOptions) -> Result<PutResult> {
let request = self.client.put_request(location, bytes);
let mut request = self.client.put_request(location, bytes);
let tags = opts.tags.encoded();
if !tags.is_empty() && !self.client.config().skip_tagging {
request = request.header(&TAGS_HEADER, tags);
}

match (opts.mode, &self.client.config().conditional_put) {
(PutMode::Overwrite, _) => request.send().await,
(PutMode::Create | PutMode::Update(_), None) => Err(Error::NotImplemented),
Expand Down Expand Up @@ -342,6 +349,11 @@ mod tests {
stream_get(&integration).await;
multipart(&integration, &integration).await;

tagging(&integration, !config.skip_tagging, |p| {
let client = Arc::clone(&integration.client);
async move { client.get_object_tagging(&p).await }
})
.await;
if test_not_exists {
copy_if_not_exists(&integration).await;
}
Expand Down
22 changes: 22 additions & 0 deletions object_store/src/azure/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,8 @@ pub struct MicrosoftAzureBuilder {
///
/// i.e. https://{account_name}.dfs.fabric.microsoft.com
use_fabric_endpoint: ConfigValue<bool>,
/// When set to true, skips tagging objects
skip_tagging: ConfigValue<bool>,
}

/// Configuration keys for [`MicrosoftAzureBuilder`]
Expand Down Expand Up @@ -321,6 +323,15 @@ pub enum AzureConfigKey {
/// - `container_name`
ContainerName,

/// Skips tagging objects
///
/// This can be desirable if not supported by the backing store
///
/// Supported keys:
/// - `azure_skip_tagging`
/// - `skip_tagging`
SkipTagging,

/// Client options
Client(ClientConfigKey),
}
Expand All @@ -344,6 +355,7 @@ impl AsRef<str> for AzureConfigKey {
Self::FederatedTokenFile => "azure_federated_token_file",
Self::UseAzureCli => "azure_use_azure_cli",
Self::ContainerName => "azure_container_name",
Self::SkipTagging => "azure_skip_tagging",
Self::Client(key) => key.as_ref(),
}
}
Expand Down Expand Up @@ -387,6 +399,7 @@ impl FromStr for AzureConfigKey {
"azure_use_fabric_endpoint" | "use_fabric_endpoint" => Ok(Self::UseFabricEndpoint),
"azure_use_azure_cli" | "use_azure_cli" => Ok(Self::UseAzureCli),
"azure_container_name" | "container_name" => Ok(Self::ContainerName),
"azure_skip_tagging" | "skip_tagging" => Ok(Self::SkipTagging),
// Backwards compatibility
"azure_allow_http" => Ok(Self::Client(ClientConfigKey::AllowHttp)),
_ => match s.parse() {
Expand Down Expand Up @@ -503,6 +516,7 @@ impl MicrosoftAzureBuilder {
self.client_options = self.client_options.with_config(key, value)
}
AzureConfigKey::ContainerName => self.container_name = Some(value.into()),
AzureConfigKey::SkipTagging => self.skip_tagging.parse(value),
};
self
}
Expand Down Expand Up @@ -556,6 +570,7 @@ impl MicrosoftAzureBuilder {
AzureConfigKey::UseAzureCli => Some(self.use_azure_cli.to_string()),
AzureConfigKey::Client(key) => self.client_options.get_config_value(key),
AzureConfigKey::ContainerName => self.container_name.clone(),
AzureConfigKey::SkipTagging => Some(self.skip_tagging.to_string()),
}
}

Expand Down Expand Up @@ -781,6 +796,12 @@ impl MicrosoftAzureBuilder {
self
}

/// If set to `true` will ignore any tags provided to put_opts
pub fn with_skip_tagging(mut self, ignore: bool) -> Self {
self.skip_tagging = ignore.into();
self
}

/// Configure a connection to container with given name on Microsoft Azure Blob store.
pub fn build(mut self) -> Result<MicrosoftAzure> {
if let Some(url) = self.url.take() {
Expand Down Expand Up @@ -885,6 +906,7 @@ impl MicrosoftAzureBuilder {
account,
is_emulator,
container,
skip_tagging: self.skip_tagging.get()?,
retry_config: self.retry_config,
client_options: self.client_options,
service: storage_url,
Expand Down
27 changes: 26 additions & 1 deletion object_store/src/azure/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ use url::Url;

const VERSION_HEADER: &str = "x-ms-version-id";

static TAGS_HEADER: HeaderName = HeaderName::from_static("x-ms-tags");

/// A specialized `Error` for object store-related errors
#[derive(Debug, Snafu)]
#[allow(missing_docs)]
Expand Down Expand Up @@ -124,11 +126,12 @@ pub(crate) struct AzureConfig {
pub retry_config: RetryConfig,
pub service: Url,
pub is_emulator: bool,
pub skip_tagging: bool,
pub client_options: ClientOptions,
}

impl AzureConfig {
fn path_url(&self, path: &Path) -> Url {
pub(crate) fn path_url(&self, path: &Path) -> Url {
let mut url = self.service.clone();
{
let mut path_mut = url.path_segments_mut().unwrap();
Expand Down Expand Up @@ -229,6 +232,11 @@ impl AzureClient {
}
};

let builder = match (opts.tags.encoded(), self.config.skip_tagging) {
("", _) | (_, true) => builder,
(tags, false) => builder.header(&TAGS_HEADER, tags),
};

let response = builder.header(&BLOB_TYPE, "BlockBlob").send().await?;
Ok(get_put_result(response.headers(), VERSION_HEADER).context(MetadataSnafu)?)
}
Expand Down Expand Up @@ -315,6 +323,23 @@ impl AzureClient {

Ok(())
}

#[cfg(test)]
pub async fn get_blob_tagging(&self, path: &Path) -> Result<Response> {
let credential = self.get_credential().await?;
let url = self.config.path_url(path);
let response = self
.client
.request(Method::GET, url)
.query(&[("comp", "tags")])
.with_azure_authorization(&credential, &self.config.account)
.send_retry(&self.config.retry_config)
.await
.context(GetRequestSnafu {
path: path.as_ref(),
})?;
Ok(response)
}
}

#[async_trait]
Expand Down
7 changes: 7 additions & 0 deletions object_store/src/azure/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,13 @@ mod tests {
stream_get(&integration).await;
put_opts(&integration, true).await;
multipart(&integration, &integration).await;

let validate = !integration.client.config().skip_tagging;
tagging(&integration, validate, |p| {
let client = Arc::clone(&integration.client);
async move { client.get_blob_tagging(&p).await }
})
.await
}

#[test]
Expand Down
Loading