Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Object tagging (#4754) #4999

Merged
merged 3 commits into from
Oct 30, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions object_store/src/aws/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,8 @@ pub struct AmazonS3Builder {
copy_if_not_exists: Option<ConfigValue<S3CopyIfNotExists>>,
/// Put precondition
conditional_put: Option<ConfigValue<S3ConditionalPut>>,
/// Ignore tags
disable_tagging: ConfigValue<bool>,
}

/// Configuration keys for [`AmazonS3Builder`]
Expand Down Expand Up @@ -299,6 +301,15 @@ pub enum AmazonS3ConfigKey {
/// Skip signing request
SkipSignature,

/// Disable tagging objects
///
/// This can be desirable if not supported by the backing store
///
/// Supported keys:
/// - `aws_disable_tagging`
/// - `disable_tagging`
DisableTagging,

/// Client options
Client(ClientConfigKey),
}
Expand All @@ -322,6 +333,7 @@ impl AsRef<str> for AmazonS3ConfigKey {
Self::SkipSignature => "aws_skip_signature",
Self::CopyIfNotExists => "aws_copy_if_not_exists",
Self::ConditionalPut => "aws_conditional_put",
Self::DisableTagging => "aws_disable_tagging",
Self::Client(opt) => opt.as_ref(),
}
}
Expand Down Expand Up @@ -350,6 +362,7 @@ impl FromStr for AmazonS3ConfigKey {
"aws_skip_signature" | "skip_signature" => Ok(Self::SkipSignature),
"aws_copy_if_not_exists" | "copy_if_not_exists" => Ok(Self::CopyIfNotExists),
"aws_conditional_put" | "conditional_put" => Ok(Self::ConditionalPut),
"aws_disable_tagging" | "disable_tagging" => Ok(Self::DisableTagging),
// Backwards compatibility
"aws_allow_http" => Ok(Self::Client(ClientConfigKey::AllowHttp)),
_ => match s.parse() {
Expand Down Expand Up @@ -453,6 +466,7 @@ impl AmazonS3Builder {
self.client_options = self.client_options.with_config(key, value)
}
AmazonS3ConfigKey::SkipSignature => self.skip_signature.parse(value),
AmazonS3ConfigKey::DisableTagging => self.disable_tagging.parse(value),
AmazonS3ConfigKey::CopyIfNotExists => {
self.copy_if_not_exists = Some(ConfigValue::Deferred(value.into()))
}
Expand Down Expand Up @@ -525,6 +539,7 @@ impl AmazonS3Builder {
AmazonS3ConfigKey::ConditionalPut => {
self.conditional_put.as_ref().map(ToString::to_string)
}
AmazonS3ConfigKey::DisableTagging => Some(self.disable_tagging.to_string()),
}
}

Expand Down Expand Up @@ -735,6 +750,12 @@ impl AmazonS3Builder {
self
}

/// If set to `true` will ignore any tags provided to put_opts
pub fn with_disable_tagging(mut self, ignore: bool) -> Self {
self.disable_tagging = ignore.into();
self
}

/// Create a [`AmazonS3`] instance from the provided values,
/// consuming `self`.
pub fn build(mut self) -> Result<AmazonS3> {
Expand Down Expand Up @@ -851,6 +872,7 @@ impl AmazonS3Builder {
client_options: self.client_options,
sign_payload: !self.unsigned_payload.get()?,
skip_signature: self.skip_signature.get()?,
disable_tagging: self.disable_tagging.get()?,
checksum,
copy_if_not_exists,
conditional_put: put_precondition,
Expand Down
23 changes: 23 additions & 0 deletions object_store/src/aws/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,7 @@ pub struct S3Config {
pub client_options: ClientOptions,
pub sign_payload: bool,
pub skip_signature: bool,
pub disable_tagging: bool,
pub checksum: Option<Checksum>,
pub copy_if_not_exists: Option<S3CopyIfNotExists>,
pub conditional_put: Option<S3ConditionalPut>,
Expand Down Expand Up @@ -588,6 +589,28 @@ impl S3Client {
version,
})
}

/// Test-only helper: issues a signed `GET <object url>?tagging` request
/// for `path` and hands back the raw response.
///
/// # Errors
///
/// Fails if the credential cannot be obtained, or if the request (after
/// retries) fails; the latter is reported as a get-request error that
/// carries `path`.
#[cfg(test)]
pub async fn get_object_tagging(&self, path: &Path) -> Result<Response> {
    let config = &self.config;
    let credential = config.get_credential().await?;
    let url = format!("{}?tagging", config.path_url(path));

    let request = self.client.request(Method::GET, url).with_aws_sigv4(
        credential.as_deref(),
        &config.region,
        "s3",
        config.sign_payload,
        None,
    );

    let outcome = request.send_retry(&config.retry_config).await;
    Ok(outcome.context(GetRequestSnafu {
        path: path.as_ref(),
    })?)
}
}

#[async_trait]
Expand Down
16 changes: 14 additions & 2 deletions object_store/src/aws/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ use async_trait::async_trait;
use bytes::Bytes;
use futures::stream::BoxStream;
use futures::{StreamExt, TryStreamExt};
use reqwest::header::{IF_MATCH, IF_NONE_MATCH};
use reqwest::header::{HeaderName, IF_MATCH, IF_NONE_MATCH};
use reqwest::Method;
use std::{sync::Arc, time::Duration};
use tokio::io::AsyncWrite;
Expand All @@ -52,6 +52,8 @@ use crate::{
PutOptions, PutResult, Result,
};

/// Request header used to attach the encoded tag set to a put request.
///
/// It is only added when the tag set is non-empty and tagging has not
/// been disabled in the client configuration.
static TAGS_HEADER: HeaderName = HeaderName::from_static("x-amz-tagging");

mod builder;
mod checksum;
mod client;
Expand Down Expand Up @@ -160,7 +162,12 @@ impl Signer for AmazonS3 {
#[async_trait]
impl ObjectStore for AmazonS3 {
async fn put_opts(&self, location: &Path, bytes: Bytes, opts: PutOptions) -> Result<PutResult> {
let request = self.client.put_request(location, bytes);
let mut request = self.client.put_request(location, bytes);
let tags = opts.tags.encoded();
if !tags.is_empty() && !self.client.config().disable_tagging {
request = request.header(&TAGS_HEADER, tags);
}

match (opts.mode, &self.client.config().conditional_put) {
(PutMode::Overwrite, _) => request.send().await,
(PutMode::Create | PutMode::Update(_), None) => Err(Error::NotImplemented),
Expand Down Expand Up @@ -342,6 +349,11 @@ mod tests {
stream_get(&integration).await;
multipart(&integration, &integration).await;

tagging(&integration, !config.disable_tagging, |p| {
let client = Arc::clone(&integration.client);
async move { client.get_object_tagging(&p).await }
})
.await;
if test_not_exists {
copy_if_not_exists(&integration).await;
}
Expand Down
22 changes: 22 additions & 0 deletions object_store/src/azure/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,8 @@ pub struct MicrosoftAzureBuilder {
///
/// i.e. https://{account_name}.dfs.fabric.microsoft.com
use_fabric_endpoint: ConfigValue<bool>,
/// When set to true, skips tagging objects
disable_tagging: ConfigValue<bool>,
}

/// Configuration keys for [`MicrosoftAzureBuilder`]
Expand Down Expand Up @@ -321,6 +323,15 @@ pub enum AzureConfigKey {
/// - `container_name`
ContainerName,

/// Disables tagging objects
///
/// This can be desirable if not supported by the backing store
///
/// Supported keys:
/// - `azure_disable_tagging`
/// - `disable_tagging`
DisableTagging,

/// Client options
Client(ClientConfigKey),
}
Expand All @@ -344,6 +355,7 @@ impl AsRef<str> for AzureConfigKey {
Self::FederatedTokenFile => "azure_federated_token_file",
Self::UseAzureCli => "azure_use_azure_cli",
Self::ContainerName => "azure_container_name",
Self::DisableTagging => "azure_disable_tagging",
Self::Client(key) => key.as_ref(),
}
}
Expand Down Expand Up @@ -387,6 +399,7 @@ impl FromStr for AzureConfigKey {
"azure_use_fabric_endpoint" | "use_fabric_endpoint" => Ok(Self::UseFabricEndpoint),
"azure_use_azure_cli" | "use_azure_cli" => Ok(Self::UseAzureCli),
"azure_container_name" | "container_name" => Ok(Self::ContainerName),
"azure_disable_tagging" | "disable_tagging" => Ok(Self::DisableTagging),
// Backwards compatibility
"azure_allow_http" => Ok(Self::Client(ClientConfigKey::AllowHttp)),
_ => match s.parse() {
Expand Down Expand Up @@ -503,6 +516,7 @@ impl MicrosoftAzureBuilder {
self.client_options = self.client_options.with_config(key, value)
}
AzureConfigKey::ContainerName => self.container_name = Some(value.into()),
AzureConfigKey::DisableTagging => self.disable_tagging.parse(value),
};
self
}
Expand Down Expand Up @@ -556,6 +570,7 @@ impl MicrosoftAzureBuilder {
AzureConfigKey::UseAzureCli => Some(self.use_azure_cli.to_string()),
AzureConfigKey::Client(key) => self.client_options.get_config_value(key),
AzureConfigKey::ContainerName => self.container_name.clone(),
AzureConfigKey::DisableTagging => Some(self.disable_tagging.to_string()),
}
}

Expand Down Expand Up @@ -781,6 +796,12 @@ impl MicrosoftAzureBuilder {
self
}

/// If set to `true` will ignore any tags provided to put_opts
pub fn with_disable_tagging(mut self, ignore: bool) -> Self {
self.disable_tagging = ignore.into();
self
}

/// Configure a connection to container with given name on Microsoft Azure Blob store.
pub fn build(mut self) -> Result<MicrosoftAzure> {
if let Some(url) = self.url.take() {
Expand Down Expand Up @@ -885,6 +906,7 @@ impl MicrosoftAzureBuilder {
account,
is_emulator,
container,
disable_tagging: self.disable_tagging.get()?,
retry_config: self.retry_config,
client_options: self.client_options,
service: storage_url,
Expand Down
27 changes: 26 additions & 1 deletion object_store/src/azure/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ use url::Url;

const VERSION_HEADER: &str = "x-ms-version-id";

/// Request header used to attach the encoded tag set to a blob put.
///
/// It is only added when the tag set is non-empty and tagging has not
/// been disabled in the client configuration.
static TAGS_HEADER: HeaderName = HeaderName::from_static("x-ms-tags");

/// A specialized `Error` for object store-related errors
#[derive(Debug, Snafu)]
#[allow(missing_docs)]
Expand Down Expand Up @@ -124,11 +126,12 @@ pub(crate) struct AzureConfig {
pub retry_config: RetryConfig,
pub service: Url,
pub is_emulator: bool,
pub disable_tagging: bool,
pub client_options: ClientOptions,
}

impl AzureConfig {
fn path_url(&self, path: &Path) -> Url {
pub(crate) fn path_url(&self, path: &Path) -> Url {
let mut url = self.service.clone();
{
let mut path_mut = url.path_segments_mut().unwrap();
Expand Down Expand Up @@ -229,6 +232,11 @@ impl AzureClient {
}
};

let builder = match (opts.tags.encoded(), self.config.disable_tagging) {
("", _) | (_, true) => builder,
(tags, false) => builder.header(&TAGS_HEADER, tags),
};

let response = builder.header(&BLOB_TYPE, "BlockBlob").send().await?;
Ok(get_put_result(response.headers(), VERSION_HEADER).context(MetadataSnafu)?)
}
Expand Down Expand Up @@ -315,6 +323,23 @@ impl AzureClient {

Ok(())
}

/// Test-only helper: issues an authorized `GET <blob url>?comp=tags`
/// request for `path` and hands back the raw response.
///
/// # Errors
///
/// Fails if the credential cannot be obtained, or if the request (after
/// retries) fails; the latter is reported as a get-request error that
/// carries `path`.
#[cfg(test)]
pub async fn get_blob_tagging(&self, path: &Path) -> Result<Response> {
    let config = &self.config;
    let credential = self.get_credential().await?;

    let request = self
        .client
        .request(Method::GET, config.path_url(path))
        .query(&[("comp", "tags")])
        .with_azure_authorization(&credential, &config.account);

    let outcome = request.send_retry(&config.retry_config).await;
    Ok(outcome.context(GetRequestSnafu {
        path: path.as_ref(),
    })?)
}
}

#[async_trait]
Expand Down
7 changes: 7 additions & 0 deletions object_store/src/azure/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,13 @@ mod tests {
stream_get(&integration).await;
put_opts(&integration, true).await;
multipart(&integration, &integration).await;

let validate = !integration.client.config().disable_tagging;
tagging(&integration, validate, |p| {
let client = Arc::clone(&integration.client);
async move { client.get_blob_tagging(&p).await }
})
.await
}

#[test]
Expand Down
Loading
Loading