From b2bad8ac249d79281d416ac7ca8b4ded8dd44378 Mon Sep 17 00:00:00 2001 From: "R. Tyler Croy" Date: Thu, 12 Sep 2024 15:02:01 +0000 Subject: [PATCH] feat: improve AWS credential loading between S3 and DynamoDb code paths This change implements a number of improvements to the code paths for loading credentials. This is a prerequisite to fixing assume role support #2879 but should also address a number of bugs I noticed: * Python libraries can pass in keys via `storage_options` which are used for configuration of the AmazonS3 ObjectStore, but those credentials would not be used in the construction of the DynamoDB connection * Using AWS credentials such as those from ~/.aws/profile or SSO would not be properly dropped into the AmazonS3 object store creation There is some additional work that needs to come in to clean up how various options overrides are managed still. Sponsored-by: Scribd Inc. --- crates/aws/Cargo.toml | 2 +- crates/aws/src/constants.rs | 138 +++++++++++ crates/aws/src/credentials.rs | 306 +++++++++++++++++------- crates/aws/src/lib.rs | 42 +--- crates/aws/src/storage.rs | 436 +++++++++++++--------------------- 5 files changed, 518 insertions(+), 406 deletions(-) create mode 100644 crates/aws/src/constants.rs diff --git a/crates/aws/Cargo.toml b/crates/aws/Cargo.toml index 9fe0c05934..c68af0f6c5 100644 --- a/crates/aws/Cargo.toml +++ b/crates/aws/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "deltalake-aws" -version = "0.2.0" +version = "0.2.1" authors.workspace = true keywords.workspace = true readme.workspace = true diff --git a/crates/aws/src/constants.rs b/crates/aws/src/constants.rs new file mode 100644 index 0000000000..73d2da1b48 --- /dev/null +++ b/crates/aws/src/constants.rs @@ -0,0 +1,138 @@ +//! Constants used for modifying and configuring various AWS S3 (or similar) connections with +//! delta-rs +//! + +use lazy_static::lazy_static; +use std::time::Duration; + +/// Custom S3 endpoint. +pub const AWS_ENDPOINT_URL: &str = "AWS_ENDPOINT_URL"; +/// Custom DynamoDB endpoint. +/// If DynamoDB endpoint is not supplied, will use S3 endpoint (AWS_ENDPOINT_URL) +/// If it is supplied, this endpoint takes precedence over the global endpoint set in AWS_ENDPOINT_URL for DynamoDB +pub const AWS_ENDPOINT_URL_DYNAMODB: &str = "AWS_ENDPOINT_URL_DYNAMODB"; +/// The AWS region. +pub const AWS_REGION: &str = "AWS_REGION"; +/// The AWS profile. +pub const AWS_PROFILE: &str = "AWS_PROFILE"; +/// The AWS_ACCESS_KEY_ID to use for S3. +pub const AWS_ACCESS_KEY_ID: &str = "AWS_ACCESS_KEY_ID"; +/// The AWS_SECRET_ACCESS_KEY to use for S3. +pub const AWS_SECRET_ACCESS_KEY: &str = "AWS_SECRET_ACCESS_KEY"; +/// The AWS_SESSION_TOKEN to use for S3. +pub const AWS_SESSION_TOKEN: &str = "AWS_SESSION_TOKEN"; +/// Uses either "path" (the default) or "virtual", which turns on +/// [virtual host addressing](http://docs.aws.amazon.com/AmazonS3/latest/dev/VirtualHosting.html). +pub const AWS_S3_ADDRESSING_STYLE: &str = "AWS_S3_ADDRESSING_STYLE"; +/// Locking provider to use for safe atomic rename. +/// `dynamodb` is currently the only supported locking provider. +/// If not set, safe atomic rename is not available. +pub const AWS_S3_LOCKING_PROVIDER: &str = "AWS_S3_LOCKING_PROVIDER"; +/// The role to assume for S3 writes. +pub const AWS_IAM_ROLE_ARN: &str = "AWS_IAM_ROLE_ARN"; +/// The role to assume. Please use [AWS_IAM_ROLE_ARN] instead +#[deprecated(since = "0.20.0", note = "Please use AWS_IAM_ROLE_ARN instead")] +pub const AWS_S3_ASSUME_ROLE_ARN: &str = "AWS_S3_ASSUME_ROLE_ARN"; +/// The role session name to use when a role is assumed. If not provided a random session name is generated. +pub const AWS_IAM_ROLE_SESSION_NAME: &str = "AWS_IAM_ROLE_SESSION_NAME"; +/// The role session name to use when a role is assumed. If not provided a random session name is generated. +#[deprecated( + since = "0.20.0", + note = "Please use AWS_IAM_ROLE_SESSION_NAME instead" +)] +pub const AWS_S3_ROLE_SESSION_NAME: &str = "AWS_S3_ROLE_SESSION_NAME"; +/// The `pool_idle_timeout` option of aws http client. Has to be lower than 20 seconds, which is +/// default S3 server timeout . +/// However, since rusoto uses hyper as a client, its default timeout is 90 seconds +/// . +/// Hence, the `connection closed before message completed` could occur. +/// To avoid that, the default value of this setting is 15 seconds if it's not set otherwise. +pub const AWS_S3_POOL_IDLE_TIMEOUT_SECONDS: &str = "AWS_S3_POOL_IDLE_TIMEOUT_SECONDS"; +/// The `pool_idle_timeout` for the as3_constants sts client. See +/// the reasoning in `AWS_S3_POOL_IDLE_TIMEOUT_SECONDS`. +pub const AWS_STS_POOL_IDLE_TIMEOUT_SECONDS: &str = "AWS_STS_POOL_IDLE_TIMEOUT_SECONDS"; +/// The number of retries for S3 GET requests failed with 500 Internal Server Error. +pub const AWS_S3_GET_INTERNAL_SERVER_ERROR_RETRIES: &str = + "AWS_S3_GET_INTERNAL_SERVER_ERROR_RETRIES"; +/// The web identity token file to use when using a web identity provider. +/// NOTE: web identity related options are set in the environment when +/// creating an instance of [crate::storage::s3::S3StorageOptions]. +/// See also . +pub const AWS_WEB_IDENTITY_TOKEN_FILE: &str = "AWS_WEB_IDENTITY_TOKEN_FILE"; +/// The role name to use for web identity. +/// NOTE: web identity related options are set in the environment when +/// creating an instance of [crate::storage::s3::S3StorageOptions]. +/// See also . +pub const AWS_ROLE_ARN: &str = "AWS_ROLE_ARN"; +/// The role session name to use for web identity. +/// NOTE: web identity related options are set in the environment when +/// creating an instance of [crate::storage::s3::S3StorageOptions]. +/// See also . +pub const AWS_ROLE_SESSION_NAME: &str = "AWS_ROLE_SESSION_NAME"; +/// Allow http connections - mainly useful for integration tests +pub const AWS_ALLOW_HTTP: &str = "AWS_ALLOW_HTTP"; + +/// If set to "true", allows creating commits without concurrent writer protection. +/// Only safe if there is one writer to a given table. +pub const AWS_S3_ALLOW_UNSAFE_RENAME: &str = "AWS_S3_ALLOW_UNSAFE_RENAME"; + +/// If set to "true", disables the imds client +/// Defaults to "true" +pub const AWS_EC2_METADATA_DISABLED: &str = "AWS_EC2_METADATA_DISABLED"; + +/// The timeout in milliseconds for the EC2 metadata endpoint +/// Defaults to 100 +pub const AWS_EC2_METADATA_TIMEOUT: &str = "AWS_EC2_METADATA_TIMEOUT"; + +/// The list of option keys owned by the S3 module. +/// Option keys not contained in this list will be added to the `extra_opts` +/// field of [crate::storage::s3::S3StorageOptions]. +pub const S3_OPTS: &[&str] = &[ + AWS_ENDPOINT_URL, + AWS_ENDPOINT_URL_DYNAMODB, + AWS_REGION, + AWS_PROFILE, + AWS_ACCESS_KEY_ID, + AWS_SECRET_ACCESS_KEY, + AWS_SESSION_TOKEN, + AWS_S3_LOCKING_PROVIDER, + AWS_S3_ASSUME_ROLE_ARN, + AWS_S3_ROLE_SESSION_NAME, + AWS_WEB_IDENTITY_TOKEN_FILE, + AWS_ROLE_ARN, + AWS_ROLE_SESSION_NAME, + AWS_S3_POOL_IDLE_TIMEOUT_SECONDS, + AWS_STS_POOL_IDLE_TIMEOUT_SECONDS, + AWS_S3_GET_INTERNAL_SERVER_ERROR_RETRIES, + AWS_EC2_METADATA_DISABLED, + AWS_EC2_METADATA_TIMEOUT, +]; + +pub const DEFAULT_LOCK_TABLE_NAME: &str = "delta_log"; +pub const LOCK_TABLE_KEY_NAME: &str = "DELTA_DYNAMO_TABLE_NAME"; +pub const BILLING_MODE_KEY_NAME: &str = "DELTA_DYNAMO_BILLING_MODE"; +pub const MAX_ELAPSED_REQUEST_TIME_KEY_NAME: &str = "DELTA_DYNAMO_MAX_ELAPSED_REQUEST_TIME"; + +pub const ATTR_TABLE_PATH: &str = "tablePath"; +pub const ATTR_FILE_NAME: &str = "fileName"; +pub const ATTR_TEMP_PATH: &str = "tempPath"; +pub const ATTR_COMPLETE: &str = "complete"; +pub const ATTR_EXPIRE_TIME: &str = "expireTime"; + +pub const STRING_TYPE: &str = "S"; + +pub const KEY_TYPE_HASH: &str = "HASH"; +pub const KEY_TYPE_RANGE: &str = "RANGE"; + +lazy_static! { + pub static ref CONDITION_EXPR_CREATE: String = format!( + "attribute_not_exists({ATTR_TABLE_PATH}) and attribute_not_exists({ATTR_FILE_NAME})" + ); + + pub static ref CONDITION_DELETE_INCOMPLETE: String = format!( + "(complete = :f) or (attribute_not_exists({ATTR_TABLE_PATH}) and attribute_not_exists({ATTR_FILE_NAME}))" + ); +} + +pub const CONDITION_UPDATE_INCOMPLETE: &str = "complete = :f"; +pub const DEFAULT_COMMIT_ENTRY_EXPIRATION_DELAY: Duration = Duration::from_secs(86_400); diff --git a/crates/aws/src/credentials.rs b/crates/aws/src/credentials.rs index 9ddf19b74c..171548de87 100644 --- a/crates/aws/src/credentials.rs +++ b/crates/aws/src/credentials.rs @@ -1,118 +1,240 @@ -use std::{sync::Arc, time::Duration}; - -use aws_config::{ - ecs::EcsCredentialsProvider, - environment::{EnvironmentVariableCredentialsProvider, EnvironmentVariableRegionProvider}, - imds::credentials::ImdsCredentialsProvider, - meta::{credentials::CredentialsProviderChain, region::RegionProviderChain}, - profile::ProfileFileCredentialsProvider, - provider_config::ProviderConfig, - web_identity_token::WebIdentityTokenCredentialsProvider, +//! Custom AWS credential providers used by delta-rs +//! + +use std::sync::Arc; + +use aws_config::default_provider::credentials::DefaultCredentialsChain; +use aws_config::meta::credentials::CredentialsProviderChain; +use aws_config::sts::AssumeRoleProvider; +use aws_config::SdkConfig; +use aws_credential_types::provider::error::CredentialsError; +use aws_credential_types::provider::{future, ProvideCredentials}; +use aws_credential_types::Credentials; + +use deltalake_core::storage::object_store::aws::{AmazonS3ConfigKey, AwsCredential}; +use deltalake_core::storage::object_store::{ + CredentialProvider, Error as ObjectStoreError, Result as ObjectStoreResult, }; -use aws_credential_types::provider::{self, ProvideCredentials}; -use tracing::Instrument; +use deltalake_core::storage::StorageOptions; +use deltalake_core::DeltaResult; -const IMDS_PROVIDER_NAME: &str = "Ec2InstanceMetadata"; +use crate::constants; -#[derive(Debug)] -pub struct ConfiguredCredentialChain { - provider_chain: CredentialsProviderChain, +/// An [object_store::CredentialProvider] which handles converting a populated [SdkConfig] +/// into a necessary [AwsCredential] type for configuring [object_store::aws::AmazonS3] +#[derive(Clone, Debug)] +pub(crate) struct AWSForObjectStore { + sdk_config: SdkConfig, } -#[derive(Debug)] -pub struct NoOpCredentials {} +impl AWSForObjectStore { + pub(crate) fn new(sdk_config: SdkConfig) -> Self { + Self { sdk_config } + } +} + +#[async_trait::async_trait] +impl CredentialProvider for AWSForObjectStore { + type Credential = AwsCredential; -pub fn new_region_provider(disable_imds: bool, imds_timeout: u64) -> RegionProviderChain { - let env_provider = EnvironmentVariableRegionProvider::new(); - let profile_file = aws_config::profile::region::ProfileFileRegionProvider::default(); - if disable_imds { - return RegionProviderChain::first_try(env_provider).or_else(profile_file); + /// Invoke the underlying [AssumeRoleProvider] to retrieve the temporary credentials associated + /// with the role assumed + async fn get_credential(&self) -> ObjectStoreResult> { + let provider = self + .sdk_config + .credentials_provider() + .ok_or(ObjectStoreError::NotImplemented)?; + let credentials = + provider + .provide_credentials() + .await + .map_err(|e| ObjectStoreError::NotSupported { + source: Box::new(e), + })?; + + Ok(Arc::new(Self::Credential { + key_id: credentials.access_key_id().into(), + secret_key: credentials.secret_access_key().into(), + token: None, + })) } +} - RegionProviderChain::first_try(env_provider) - .or_else(profile_file) - .or_else( - aws_config::imds::region::Builder::default() - .imds_client( - aws_config::imds::Client::builder() - .connect_timeout(Duration::from_millis(imds_timeout)) - .read_timeout(Duration::from_millis(imds_timeout)) - .build(), - ) - .build(), - ) +/// An [object_store::CredentialProvider] which handles retrieving the necessary +/// temporary credentials associated with the assumed role +#[derive(Debug)] +pub(crate) struct AssumeRoleCredentialProvider { + sdk_config: SdkConfig, } -impl ConfiguredCredentialChain { - pub fn new(disable_imds: bool, imds_timeout: u64, conf: &ProviderConfig) -> Self { - let imds_provider = Self::build_imds_provider(conf, disable_imds, imds_timeout); - let env_provider = EnvironmentVariableCredentialsProvider::default(); - let profile_provider = ProfileFileCredentialsProvider::builder() - .configure(conf) - .with_custom_provider(IMDS_PROVIDER_NAME, imds_provider.clone()) - .build(); - let web_identity_token_provider = WebIdentityTokenCredentialsProvider::builder() - .configure(conf) - .build(); - - let ecs_provider = EcsCredentialsProvider::builder().configure(conf).build(); - - let provider_chain = CredentialsProviderChain::first_try("Environment", env_provider) - .or_else("Profile", profile_provider) - .or_else("WebIdentityToken", web_identity_token_provider) - .or_else("EcsContainer", ecs_provider) - .or_else(IMDS_PROVIDER_NAME, imds_provider); - - Self { provider_chain } +impl AssumeRoleCredentialProvider { + fn session_name(&self) -> String { + /* + if let Some(_) = str_option(options, s3_constants::AWS_S3_ROLE_SESSION_NAME) { + warn!( + "AWS_S3_ROLE_SESSION_NAME is deprecated please AWS_IAM_ROLE_SESSION_NAME instead!" + ); + } + str_option(options, s3_constants::AWS_IAM_ROLE_SESSION_NAME) + .or(str_option(options, s3_constants::AWS_S3_ROLE_SESSION_NAME)) + .unwrap_or("delta-rs".into()) + */ + todo!() } - async fn credentials(&self) -> provider::Result { - self.provider_chain - .provide_credentials() - .instrument(tracing::debug_span!("provide_credentials", provider = %"default_chain")) - .await + fn iam_role(&self) -> String { + todo!() } +} - fn build_imds_provider( - conf: &ProviderConfig, - disable_imds: bool, - imds_timeout: u64, - ) -> Arc { - if disable_imds { - return Arc::new(NoOpCredentials {}); - } +#[async_trait::async_trait] +impl CredentialProvider for AssumeRoleCredentialProvider { + type Credential = AwsCredential; - let imds_provider = ImdsCredentialsProvider::builder() - .configure(conf) - .imds_client( - aws_config::imds::Client::builder() - .connect_timeout(Duration::from_millis(imds_timeout)) - .read_timeout(Duration::from_millis(imds_timeout)) - .build(), - ) - .build(); - Arc::new(imds_provider) + /// Invoke the underlying [AssumeRoleProvider] to retrieve the temporary credentials associated + /// with the role assumed + async fn get_credential(&self) -> ObjectStoreResult> { + let provider = AssumeRoleProvider::builder(self.iam_role()) + .configure(&self.sdk_config) + .session_name(self.session_name()) + .build() + .await; + let credentials = + provider + .provide_credentials() + .await + .map_err(|e| ObjectStoreError::NotSupported { + source: Box::new(e), + })?; + + Ok(Arc::new(Self::Credential { + key_id: credentials.access_key_id().into(), + secret_key: credentials.secret_access_key().into(), + token: None, + })) } } -impl ProvideCredentials for ConfiguredCredentialChain { - fn provide_credentials<'a>( - &'a self, - ) -> aws_credential_types::provider::future::ProvideCredentials<'a> - where - Self: 'a, - { - aws_credential_types::provider::future::ProvideCredentials::new(self.credentials()) +/// Name of the [OptionsCredentialsProvider] for AWS SDK use +const OPTS_PROVIDER: &str = "DeltaStorageOptionsProvider"; + +/// The [OptionsCredentialsProvider] helps users plug specific AWS credentials into their +/// [StorageOptions] in such a way that the AWS SDK code will be properly +/// loaded with those credentials before following the +/// [aws_config::default_provider::credentials::DefaultCredentialsChain] +#[derive(Clone, Debug)] +pub(crate) struct OptionsCredentialsProvider { + options: StorageOptions, +} + +impl OptionsCredentialsProvider { + /// Look at the options configured on the provider and return an appropriate + /// [Credentials] instance for AWS SDK credential resolution + fn credentials(&self) -> aws_credential_types::provider::Result { + let access_key = self.options.0.get(constants::AWS_ACCESS_KEY_ID).ok_or( + CredentialsError::not_loaded("access key not in StorageOptions"), + )?; + let secret_key = self.options.0.get(constants::AWS_SECRET_ACCESS_KEY).ok_or( + CredentialsError::not_loaded("secret key not in StorageOptions"), + )?; + let session_token = self.options.0.get(constants::AWS_SESSION_TOKEN).cloned(); + + Ok(Credentials::new( + access_key, + secret_key, + session_token, + None, + OPTS_PROVIDER, + )) } } -impl ProvideCredentials for NoOpCredentials { - fn provide_credentials<'a>(&'a self) -> provider::future::ProvideCredentials<'a> +impl ProvideCredentials for OptionsCredentialsProvider { + fn provide_credentials<'a>(&'a self) -> future::ProvideCredentials<'a> where Self: 'a, { - aws_credential_types::provider::future::ProvideCredentials::new(std::future::ready(Err( - provider::error::CredentialsError::not_loaded_no_source(), - ))) + future::ProvideCredentials::ready(self.credentials()) + } +} + +/// Take a set of [StorageOptions] and produce an appropriate AWS SDK [SdkConfig] +/// for use with various AWS SDK APIs, such as in our [crate::logstore::S3DynamoDbLogStore] +pub async fn resolve_credentials(options: StorageOptions) -> DeltaResult { + let options_provider = OptionsCredentialsProvider { options }; + + let default_provider = DefaultCredentialsChain::builder().build().await; + let credentials_provider = + CredentialsProviderChain::first_try("StorageOptions", options_provider) + .or_else("DefaultChain", default_provider); + + Ok(aws_config::from_env() + .credentials_provider(credentials_provider) + .load() + .await) +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::constants; + use maplit::hashmap; + + #[tokio::test] + async fn test_options_credentials_provider() { + let options = StorageOptions(hashmap! { + constants::AWS_ACCESS_KEY_ID.to_string() => "test_id".to_string(), + constants::AWS_SECRET_ACCESS_KEY.to_string() => "test_secret".to_string(), + }); + + let config = resolve_credentials(options).await; + assert!(config.is_ok(), "{config:?}"); + let config = config.unwrap(); + + if let Some(provider) = &config.credentials_provider() { + let credentials = provider + .provide_credentials() + .await + .expect("Failed to provide credentials"); + assert_eq!( + "test_id", + credentials.access_key_id(), + "The access key should come from our options! {credentials:?}" + ); + assert_eq!( + "test_secret", + credentials.secret_access_key(), + "The secret should come from our options! {credentials:?}" + ); + } else { + panic!("Could not retrieve credentials from the SdkConfig: {config:?}"); + } + } + + #[tokio::test] + async fn test_options_credentials_provider_session_token() { + let options = StorageOptions(hashmap! { + constants::AWS_ACCESS_KEY_ID.to_string() => "test_id".to_string(), + constants::AWS_SECRET_ACCESS_KEY.to_string() => "test_secret".to_string(), + constants::AWS_SESSION_TOKEN.to_string() => "test_token".to_string(), + }); + + let config = resolve_credentials(options) + .await + .expect("Failed to resolve_credentials"); + + if let Some(provider) = &config.credentials_provider() { + let credentials = provider + .provide_credentials() + .await + .expect("Failed to provide credentials"); + assert_eq!( + Some("test_token"), + credentials.session_token(), + "The session token should come from our options! {credentials:?}" + ); + } else { + panic!("Could not retrieve credentials from the SdkConfig: {config:?}"); + } } } diff --git a/crates/aws/src/lib.rs b/crates/aws/src/lib.rs index 187462cb12..9834fc8b54 100644 --- a/crates/aws/src/lib.rs +++ b/crates/aws/src/lib.rs @@ -1,5 +1,9 @@ -//! Lock client implementation based on DynamoDb. +//! AWS S3 and similar tooling for delta-rs +//! +//! This module also contains the [S3DynamoDbLogStore] implemtnation for concurrent writer support +//! with AWS S3 specifically. +pub mod constants; mod credentials; pub mod errors; pub mod logstore; @@ -608,42 +612,6 @@ pub enum CreateLockTableResult { TableAlreadyExists, } -pub mod constants { - use std::time::Duration; - - use lazy_static::lazy_static; - - pub const DEFAULT_LOCK_TABLE_NAME: &str = "delta_log"; - pub const LOCK_TABLE_KEY_NAME: &str = "DELTA_DYNAMO_TABLE_NAME"; - pub const BILLING_MODE_KEY_NAME: &str = "DELTA_DYNAMO_BILLING_MODE"; - pub const MAX_ELAPSED_REQUEST_TIME_KEY_NAME: &str = "DELTA_DYNAMO_MAX_ELAPSED_REQUEST_TIME"; - - pub const ATTR_TABLE_PATH: &str = "tablePath"; - pub const ATTR_FILE_NAME: &str = "fileName"; - pub const ATTR_TEMP_PATH: &str = "tempPath"; - pub const ATTR_COMPLETE: &str = "complete"; - pub const ATTR_EXPIRE_TIME: &str = "expireTime"; - - pub const STRING_TYPE: &str = "S"; - - pub const KEY_TYPE_HASH: &str = "HASH"; - pub const KEY_TYPE_RANGE: &str = "RANGE"; - - lazy_static! { - pub static ref CONDITION_EXPR_CREATE: String = format!( - "attribute_not_exists({ATTR_TABLE_PATH}) and attribute_not_exists({ATTR_FILE_NAME})" - ); - - pub static ref CONDITION_DELETE_INCOMPLETE: String = format!( - "(complete = :f) or (attribute_not_exists({ATTR_TABLE_PATH}) and attribute_not_exists({ATTR_FILE_NAME}))" - ); - } - - pub const CONDITION_UPDATE_INCOMPLETE: &str = "complete = :f"; - - pub const DEFAULT_COMMIT_ENTRY_EXPIRATION_DELAY: Duration = Duration::from_secs(86_400); -} - /// Extract a field from an item's attribute value map, producing a descriptive error /// of the various failure cases. fn extract_required_string_field<'a>( diff --git a/crates/aws/src/storage.rs b/crates/aws/src/storage.rs index 563e2de284..738b5aff64 100644 --- a/crates/aws/src/storage.rs +++ b/crates/aws/src/storage.rs @@ -1,29 +1,29 @@ //! AWS S3 storage backend. -use aws_config::default_provider::token::DefaultTokenChain; -use aws_config::meta::region::ProvideRegion; -use aws_config::provider_config::ProviderConfig; use aws_config::{Region, SdkConfig}; use bytes::Bytes; +use deltalake_core::storage::object_store::aws::{AmazonS3Builder, AmazonS3ConfigKey}; use deltalake_core::storage::object_store::{ - aws::AmazonS3ConfigKey, parse_url_opts, GetOptions, GetResult, ListResult, ObjectMeta, - ObjectStore, PutOptions, PutResult, Result as ObjectStoreResult, + parse_url_opts, GetOptions, GetResult, ListResult, MultipartUpload, ObjectMeta, ObjectStore, + ObjectStoreScheme, PutMultipartOpts, PutOptions, PutPayload, PutResult, + Result as ObjectStoreResult, }; use deltalake_core::storage::{ limit_store_handler, str_is_truthy, ObjectStoreFactory, ObjectStoreRef, StorageOptions, }; -use deltalake_core::{DeltaResult, ObjectStoreError, Path}; +use deltalake_core::{DeltaResult, DeltaTableError, ObjectStoreError, Path}; use futures::stream::BoxStream; use futures::Future; -use object_store::{MultipartUpload, PutMultipartOpts, PutPayload}; use std::collections::HashMap; use std::fmt::Debug; use std::ops::Range; use std::str::FromStr; use std::sync::Arc; use std::time::Duration; +use tracing::log::*; use url::Url; +use crate::constants; use crate::errors::DynamoDbConfigError; #[cfg(feature = "native-tls")] use crate::native; @@ -72,14 +72,29 @@ impl ObjectStoreFactory for S3ObjectStoreFactory { storage_options: &StorageOptions, ) -> DeltaResult<(ObjectStoreRef, Path)> { let options = self.with_env_s3(storage_options); - let (inner, prefix) = parse_url_opts( - url, - options.0.iter().filter_map(|(key, value)| { - let s3_key = AmazonS3ConfigKey::from_str(&key.to_ascii_lowercase()).ok()?; - Some((s3_key, value.clone())) - }), - )?; + let sdk_config = execute_sdk_future(crate::credentials::resolve_credentials( + storage_options.clone(), + ))??; + + let os_credentials = Arc::new(crate::credentials::AWSForObjectStore::new(sdk_config)); + + let mut builder = AmazonS3Builder::new().with_url(url.to_string()); + + for (key, value) in options.0.iter() { + if let Ok(key) = AmazonS3ConfigKey::from_str(&key.to_ascii_lowercase()) { + builder = builder.with_config(key, value.clone()); + } + } + + let (_scheme, path) = + ObjectStoreScheme::parse(url).map_err(|e| DeltaTableError::GenericError { + source: Box::new(e), + })?; + let prefix = Path::parse(path)?; + let inner = builder.with_credentials(os_credentials).build()?; + let store = aws_storage_handler(limit_store_handler(inner, &options), &options)?; + debug!("Initialized the object store: {store:?}"); Ok((store, prefix)) } @@ -147,29 +162,29 @@ impl PartialEq for S3StorageOptions { impl S3StorageOptions { /// Creates an instance of S3StorageOptions from the given HashMap. pub fn from_map(options: &HashMap) -> DeltaResult { - let extra_opts = options + let extra_opts: HashMap = options .iter() .filter(|(k, _)| !s3_constants::S3_OPTS.contains(&k.as_str())) .map(|(k, v)| (k.to_owned(), v.to_owned())) .collect(); // Copy web identity values provided in options but not the environment into the environment // to get picked up by the `from_k8s_env` call in `get_web_identity_provider`. - Self::ensure_env_var(options, s3_constants::AWS_REGION); - Self::ensure_env_var(options, s3_constants::AWS_PROFILE); - Self::ensure_env_var(options, s3_constants::AWS_ACCESS_KEY_ID); - Self::ensure_env_var(options, s3_constants::AWS_SECRET_ACCESS_KEY); - Self::ensure_env_var(options, s3_constants::AWS_SESSION_TOKEN); - Self::ensure_env_var(options, s3_constants::AWS_WEB_IDENTITY_TOKEN_FILE); - Self::ensure_env_var(options, s3_constants::AWS_ROLE_ARN); - Self::ensure_env_var(options, s3_constants::AWS_ROLE_SESSION_NAME); + Self::ensure_env_var(options, constants::AWS_REGION); + Self::ensure_env_var(options, constants::AWS_PROFILE); + Self::ensure_env_var(options, constants::AWS_ACCESS_KEY_ID); + Self::ensure_env_var(options, constants::AWS_SECRET_ACCESS_KEY); + Self::ensure_env_var(options, constants::AWS_SESSION_TOKEN); + Self::ensure_env_var(options, constants::AWS_WEB_IDENTITY_TOKEN_FILE); + Self::ensure_env_var(options, constants::AWS_ROLE_ARN); + Self::ensure_env_var(options, constants::AWS_ROLE_SESSION_NAME); let s3_pool_idle_timeout = - Self::u64_or_default(options, s3_constants::AWS_S3_POOL_IDLE_TIMEOUT_SECONDS, 15); + Self::u64_or_default(options, constants::AWS_S3_POOL_IDLE_TIMEOUT_SECONDS, 15); let sts_pool_idle_timeout = - Self::u64_or_default(options, s3_constants::AWS_STS_POOL_IDLE_TIMEOUT_SECONDS, 10); + Self::u64_or_default(options, constants::AWS_STS_POOL_IDLE_TIMEOUT_SECONDS, 10); let s3_get_internal_server_error_retries = Self::u64_or_default( options, - s3_constants::AWS_S3_GET_INTERNAL_SERVER_ERROR_RETRIES, + constants::AWS_S3_GET_INTERNAL_SERVER_ERROR_RETRIES, 10, ) as usize; @@ -178,74 +193,18 @@ impl S3StorageOptions { .map(|addressing_style| addressing_style == "virtual") .unwrap_or(false); - let allow_unsafe_rename = str_option(options, s3_constants::AWS_S3_ALLOW_UNSAFE_RENAME) + let allow_unsafe_rename = str_option(options, constants::AWS_S3_ALLOW_UNSAFE_RENAME) .map(|val| str_is_truthy(&val)) .unwrap_or(false); - let disable_imds = str_option(options, s3_constants::AWS_EC2_METADATA_DISABLED) - .map(|val| str_is_truthy(&val)) - .unwrap_or(true); - let imds_timeout = - Self::u64_or_default(options, s3_constants::AWS_EC2_METADATA_TIMEOUT, 100); - let (loader, provider_config) = - if let Some(endpoint_url) = str_option(options, s3_constants::AWS_ENDPOINT_URL) { - let (region_provider, provider_config) = Self::create_provider_config( - str_option(options, s3_constants::AWS_REGION) - .or_else(|| std::env::var("AWS_DEFAULT_REGION").ok()) - .map_or(Region::from_static("custom"), Region::new), - )?; - let loader = aws_config::from_env() - .endpoint_url(endpoint_url) - .region(region_provider); - (loader, provider_config) - } else { - let (region_provider, provider_config) = Self::create_provider_config( - crate::credentials::new_region_provider(disable_imds, imds_timeout), - )?; - ( - aws_config::from_env().region(region_provider), - provider_config, - ) - }; - let credentials_provider = crate::credentials::ConfiguredCredentialChain::new( - disable_imds, - imds_timeout, - &provider_config, - ); - - let token_provider: DefaultTokenChain = execute_sdk_future( - DefaultTokenChain::builder() - .region(crate::credentials::new_region_provider( - disable_imds, - imds_timeout, - )) - .build(), - )?; - - #[cfg(feature = "native-tls")] - let sdk_config = execute_sdk_future( - loader - .http_client(native::use_native_tls_client( - str_option(options, s3_constants::AWS_ALLOW_HTTP) - .map(|val| str_is_truthy(&val)) - .unwrap_or(false), - )) - .credentials_provider(credentials_provider) - .token_provider(token_provider) - .load(), - )?; - #[cfg(feature = "rustls")] - let sdk_config = execute_sdk_future( - loader - .credentials_provider(credentials_provider) - .token_provider(token_provider) - .load(), - )?; + let storage_options = StorageOptions(options.clone()); + let sdk_config = + execute_sdk_future(crate::credentials::resolve_credentials(storage_options))??; Ok(Self { virtual_hosted_style_request, - locking_provider: str_option(options, s3_constants::AWS_S3_LOCKING_PROVIDER), - dynamodb_endpoint: str_option(options, s3_constants::AWS_ENDPOINT_URL_DYNAMODB), + locking_provider: str_option(options, constants::AWS_S3_LOCKING_PROVIDER), + dynamodb_endpoint: str_option(options, constants::AWS_ENDPOINT_URL_DYNAMODB), s3_pool_idle_timeout: Duration::from_secs(s3_pool_idle_timeout), sts_pool_idle_timeout: Duration::from_secs(sts_pool_idle_timeout), s3_get_internal_server_error_retries, @@ -255,24 +214,16 @@ impl S3StorageOptions { }) } + /// Return the configured endpoint URL for S3 operations pub fn endpoint_url(&self) -> Option<&str> { self.sdk_config.endpoint_url() } + /// Return the configured region used for S3 operations pub fn region(&self) -> Option<&Region> { self.sdk_config.region() } - fn create_provider_config( - region_provider: T, - ) -> DeltaResult<(T, ProviderConfig)> { - let region = execute_sdk_future(region_provider.region())?; - Ok(( - region_provider, - ProviderConfig::default().with_region(region), - )) - } - fn u64_or_default(map: &HashMap, key: &str, default: u64) -> u64 { str_option(map, key) .and_then(|v| v.parse().ok()) @@ -281,7 +232,9 @@ impl S3StorageOptions { fn ensure_env_var(map: &HashMap, key: &str) { if let Some(val) = str_option(map, key) { - std::env::set_var(key, val); + unsafe { + std::env::set_var(key, val); + } } } @@ -307,7 +260,7 @@ where cfg = Some(handle.block_on(future)); }); }); - cfg.ok_or(deltalake_core::DeltaTableError::ObjectStore { + cfg.ok_or(DeltaTableError::ObjectStore { source: ObjectStoreError::Generic { store: STORE_NAME, source: Box::new(DynamoDbConfigError::InitializationError), @@ -334,7 +287,11 @@ pub struct S3StorageBackend { impl std::fmt::Display for S3StorageBackend { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "S3StorageBackend") + write!( + f, + "S3StorageBackend {{ allow_unsafe_rename: {}, inner: {} }}", + self.allow_unsafe_rename, self.inner + ) } } @@ -352,7 +309,11 @@ impl S3StorageBackend { impl std::fmt::Debug for S3StorageBackend { fn fmt(&self, fmt: &mut std::fmt::Formatter<'_>) -> Result<(), std::fmt::Error> { - write!(fmt, "S3StorageBackend") + write!( + fmt, + "S3StorageBackend {{ allow_unsafe_rename: {}, inner: {:?} }}", + self.allow_unsafe_rename, self.inner + ) } } @@ -442,100 +403,12 @@ impl ObjectStore for S3StorageBackend { /// Storage option keys to use when creating [crate::storage::s3::S3StorageOptions]. /// The same key should be used whether passing a key in the hashmap or setting it as an environment variable. /// Provided keys may include configuration for the S3 backend and also the optional DynamoDb lock used for atomic rename. +#[deprecated( + since = "0.20.0", + note = "s3_constants has moved up to deltalake_aws::constants::*" +)] pub mod s3_constants { - /// Custom S3 endpoint. - pub const AWS_ENDPOINT_URL: &str = "AWS_ENDPOINT_URL"; - /// Custom DynamoDB endpoint. - /// If DynamoDB endpoint is not supplied, will use S3 endpoint (AWS_ENDPOINT_URL) - /// If it is supplied, this endpoint takes precedence over the global endpoint set in AWS_ENDPOINT_URL for DynamoDB - pub const AWS_ENDPOINT_URL_DYNAMODB: &str = "AWS_ENDPOINT_URL_DYNAMODB"; - /// The AWS region. - pub const AWS_REGION: &str = "AWS_REGION"; - /// The AWS profile. - pub const AWS_PROFILE: &str = "AWS_PROFILE"; - /// The AWS_ACCESS_KEY_ID to use for S3. - pub const AWS_ACCESS_KEY_ID: &str = "AWS_ACCESS_KEY_ID"; - /// The AWS_SECRET_ACCESS_KEY to use for S3. - pub const AWS_SECRET_ACCESS_KEY: &str = "AWS_SECRET_ACCESS_KEY"; - /// The AWS_SESSION_TOKEN to use for S3. - pub const AWS_SESSION_TOKEN: &str = "AWS_SESSION_TOKEN"; - /// Uses either "path" (the default) or "virtual", which turns on - /// [virtual host addressing](http://docs.aws.amazon.com/AmazonS3/latest/dev/VirtualHosting.html). - pub const AWS_S3_ADDRESSING_STYLE: &str = "AWS_S3_ADDRESSING_STYLE"; - /// Locking provider to use for safe atomic rename. - /// `dynamodb` is currently the only supported locking provider. - /// If not set, safe atomic rename is not available. - pub const AWS_S3_LOCKING_PROVIDER: &str = "AWS_S3_LOCKING_PROVIDER"; - /// The role to assume for S3 writes. - pub const AWS_S3_ASSUME_ROLE_ARN: &str = "AWS_S3_ASSUME_ROLE_ARN"; - /// The role session name to use when a role is assumed. If not provided a random session name is generated. - pub const AWS_S3_ROLE_SESSION_NAME: &str = "AWS_S3_ROLE_SESSION_NAME"; - /// The `pool_idle_timeout` option of aws http client. Has to be lower than 20 seconds, which is - /// default S3 server timeout . - /// However, since rusoto uses hyper as a client, its default timeout is 90 seconds - /// . - /// Hence, the `connection closed before message completed` could occur. - /// To avoid that, the default value of this setting is 15 seconds if it's not set otherwise. - pub const AWS_S3_POOL_IDLE_TIMEOUT_SECONDS: &str = "AWS_S3_POOL_IDLE_TIMEOUT_SECONDS"; - /// The `pool_idle_timeout` for the as3_constants sts client. See - /// the reasoning in `AWS_S3_POOL_IDLE_TIMEOUT_SECONDS`. - pub const AWS_STS_POOL_IDLE_TIMEOUT_SECONDS: &str = "AWS_STS_POOL_IDLE_TIMEOUT_SECONDS"; - /// The number of retries for S3 GET requests failed with 500 Internal Server Error. - pub const AWS_S3_GET_INTERNAL_SERVER_ERROR_RETRIES: &str = - "AWS_S3_GET_INTERNAL_SERVER_ERROR_RETRIES"; - /// The web identity token file to use when using a web identity provider. - /// NOTE: web identity related options are set in the environment when - /// creating an instance of [crate::storage::s3::S3StorageOptions]. - /// See also . - pub const AWS_WEB_IDENTITY_TOKEN_FILE: &str = "AWS_WEB_IDENTITY_TOKEN_FILE"; - /// The role name to use for web identity. - /// NOTE: web identity related options are set in the environment when - /// creating an instance of [crate::storage::s3::S3StorageOptions]. - /// See also . - pub const AWS_ROLE_ARN: &str = "AWS_ROLE_ARN"; - /// The role session name to use for web identity. - /// NOTE: web identity related options are set in the environment when - /// creating an instance of [crate::storage::s3::S3StorageOptions]. - /// See also . - pub const AWS_ROLE_SESSION_NAME: &str = "AWS_ROLE_SESSION_NAME"; - /// Allow http connections - mainly useful for integration tests - pub const AWS_ALLOW_HTTP: &str = "AWS_ALLOW_HTTP"; - - /// If set to "true", allows creating commits without concurrent writer protection. - /// Only safe if there is one writer to a given table. - pub const AWS_S3_ALLOW_UNSAFE_RENAME: &str = "AWS_S3_ALLOW_UNSAFE_RENAME"; - - /// If set to "true", disables the imds client - /// Defaults to "true" - pub const AWS_EC2_METADATA_DISABLED: &str = "AWS_EC2_METADATA_DISABLED"; - - /// The timeout in milliseconds for the EC2 metadata endpoint - /// Defaults to 100 - pub const AWS_EC2_METADATA_TIMEOUT: &str = "AWS_EC2_METADATA_TIMEOUT"; - - /// The list of option keys owned by the S3 module. - /// Option keys not contained in this list will be added to the `extra_opts` - /// field of [crate::storage::s3::S3StorageOptions]. - pub const S3_OPTS: &[&str] = &[ - AWS_ENDPOINT_URL, - AWS_ENDPOINT_URL_DYNAMODB, - AWS_REGION, - AWS_PROFILE, - AWS_ACCESS_KEY_ID, - AWS_SECRET_ACCESS_KEY, - AWS_SESSION_TOKEN, - AWS_S3_LOCKING_PROVIDER, - AWS_S3_ASSUME_ROLE_ARN, - AWS_S3_ROLE_SESSION_NAME, - AWS_WEB_IDENTITY_TOKEN_FILE, - AWS_ROLE_ARN, - AWS_ROLE_SESSION_NAME, - AWS_S3_POOL_IDLE_TIMEOUT_SECONDS, - AWS_STS_POOL_IDLE_TIMEOUT_SECONDS, - AWS_S3_GET_INTERNAL_SERVER_ERROR_RETRIES, - AWS_EC2_METADATA_DISABLED, - AWS_EC2_METADATA_TIMEOUT, - ]; + pub use crate::constants::*; } pub(crate) fn str_option(map: &HashMap, key: &str) -> Option { @@ -556,6 +429,7 @@ mod tests { use super::*; + use crate::constants; use aws_sdk_sts::config::ProvideCredentials; use maplit::hashmap; use serial_test::serial; @@ -592,10 +466,14 @@ mod tests { .filter(|k| !self.vars.contains_key(k)) .collect(); for k in to_remove { - std::env::remove_var(k); + unsafe { + std::env::remove_var(k); + } } for (key, value) in self.vars.drain() { - std::env::set_var(key, value); + unsafe { + std::env::set_var(key, value); + } } } } @@ -610,7 +488,9 @@ mod tests { }); for k in keys_to_clear { - std::env::remove_var(k); + unsafe { + std::env::remove_var(k); + } } } @@ -620,18 +500,18 @@ mod tests { ScopedEnv::run(|| { clear_env_of_aws_keys(); - std::env::set_var(s3_constants::AWS_ENDPOINT_URL, "http://localhost"); - std::env::set_var(s3_constants::AWS_REGION, "us-west-1"); - std::env::set_var(s3_constants::AWS_PROFILE, "default"); - std::env::set_var(s3_constants::AWS_ACCESS_KEY_ID, "default_key_id"); - std::env::set_var(s3_constants::AWS_SECRET_ACCESS_KEY, "default_secret_key"); - std::env::set_var(s3_constants::AWS_S3_LOCKING_PROVIDER, "dynamodb"); + std::env::set_var(constants::AWS_ENDPOINT_URL, "http://localhost"); + std::env::set_var(constants::AWS_REGION, "us-west-1"); + std::env::set_var(constants::AWS_PROFILE, "default"); + std::env::set_var(constants::AWS_ACCESS_KEY_ID, "default_key_id"); + std::env::set_var(constants::AWS_SECRET_ACCESS_KEY, "default_secret_key"); + std::env::set_var(constants::AWS_S3_LOCKING_PROVIDER, "dynamodb"); std::env::set_var( - s3_constants::AWS_S3_ASSUME_ROLE_ARN, + constants::AWS_S3_ASSUME_ROLE_ARN, "arn:aws:iam::123456789012:role/some_role", ); - std::env::set_var(s3_constants::AWS_S3_ROLE_SESSION_NAME, "session_name"); - std::env::set_var(s3_constants::AWS_WEB_IDENTITY_TOKEN_FILE, "token_file"); + std::env::set_var(constants::AWS_S3_ROLE_SESSION_NAME, "session_name"); + std::env::set_var(constants::AWS_WEB_IDENTITY_TOKEN_FILE, "token_file"); let options = S3StorageOptions::try_default().unwrap(); assert_eq!( @@ -681,27 +561,37 @@ mod tests { ScopedEnv::run(|| { clear_env_of_aws_keys(); let options = S3StorageOptions::from_map(&hashmap! { - s3_constants::AWS_ENDPOINT_URL.to_string() => "http://localhost:1234".to_string(), - s3_constants::AWS_REGION.to_string() => "us-west-2".to_string(), - s3_constants::AWS_PROFILE.to_string() => "default".to_string(), - s3_constants::AWS_S3_ADDRESSING_STYLE.to_string() => "virtual".to_string(), - s3_constants::AWS_S3_LOCKING_PROVIDER.to_string() => "another_locking_provider".to_string(), - s3_constants::AWS_S3_ASSUME_ROLE_ARN.to_string() => "arn:aws:iam::123456789012:role/another_role".to_string(), - s3_constants::AWS_S3_ROLE_SESSION_NAME.to_string() => "another_session_name".to_string(), - s3_constants::AWS_WEB_IDENTITY_TOKEN_FILE.to_string() => "another_token_file".to_string(), - s3_constants::AWS_S3_POOL_IDLE_TIMEOUT_SECONDS.to_string() => "1".to_string(), - s3_constants::AWS_STS_POOL_IDLE_TIMEOUT_SECONDS.to_string() => "2".to_string(), - s3_constants::AWS_S3_GET_INTERNAL_SERVER_ERROR_RETRIES.to_string() => "3".to_string(), - s3_constants::AWS_ACCESS_KEY_ID.to_string() => "test_id".to_string(), - s3_constants::AWS_SECRET_ACCESS_KEY.to_string() => "test_secret".to_string(), + constants::AWS_ENDPOINT_URL.to_string() => "http://localhost:1234".to_string(), + constants::AWS_REGION.to_string() => "us-west-2".to_string(), + constants::AWS_PROFILE.to_string() => "default".to_string(), + constants::AWS_S3_ADDRESSING_STYLE.to_string() => "virtual".to_string(), + constants::AWS_S3_LOCKING_PROVIDER.to_string() => "another_locking_provider".to_string(), + constants::AWS_S3_ASSUME_ROLE_ARN.to_string() => "arn:aws:iam::123456789012:role/another_role".to_string(), + constants::AWS_S3_ROLE_SESSION_NAME.to_string() => "another_session_name".to_string(), + constants::AWS_WEB_IDENTITY_TOKEN_FILE.to_string() => "another_token_file".to_string(), + constants::AWS_S3_POOL_IDLE_TIMEOUT_SECONDS.to_string() => "1".to_string(), + constants::AWS_STS_POOL_IDLE_TIMEOUT_SECONDS.to_string() => "2".to_string(), + constants::AWS_S3_GET_INTERNAL_SERVER_ERROR_RETRIES.to_string() => "3".to_string(), + constants::AWS_ACCESS_KEY_ID.to_string() => "test_id".to_string(), + constants::AWS_SECRET_ACCESS_KEY.to_string() => "test_secret".to_string(), }).unwrap(); + // Get a default SdkConfig first, this ensures that if there are environment or profile + // information in the default load of credentials for the test run that it will pass + // the equivalence below + let storage_options = StorageOptions(HashMap::new()); + let sdk_config = + execute_sdk_future(crate::credentials::resolve_credentials(storage_options)) + .expect("Failed to run future") + .expect("Failed to load default SdkConfig") + .to_builder() + .endpoint_url("http://localhost:1234".to_string()) + .region(Region::from_static("us-west-2")) + .build(); + assert_eq!( S3StorageOptions { - sdk_config: SdkConfig::builder() - .endpoint_url("http://localhost:1234".to_string()) - .region(Region::from_static("us-west-2")) - .build(), + sdk_config, virtual_hosted_style_request: true, locking_provider: Some("another_locking_provider".to_string()), dynamodb_endpoint: None, @@ -724,20 +614,20 @@ mod tests { ScopedEnv::run(|| { clear_env_of_aws_keys(); let options = S3StorageOptions::from_map(&hashmap! { - s3_constants::AWS_ENDPOINT_URL.to_string() => "http://localhost:1234".to_string(), - s3_constants::AWS_ENDPOINT_URL_DYNAMODB.to_string() => "http://localhost:2345".to_string(), - s3_constants::AWS_REGION.to_string() => "us-west-2".to_string(), - s3_constants::AWS_PROFILE.to_string() => "default".to_string(), - s3_constants::AWS_S3_ADDRESSING_STYLE.to_string() => "virtual".to_string(), - s3_constants::AWS_S3_LOCKING_PROVIDER.to_string() => "another_locking_provider".to_string(), - s3_constants::AWS_S3_ASSUME_ROLE_ARN.to_string() => "arn:aws:iam::123456789012:role/another_role".to_string(), - s3_constants::AWS_S3_ROLE_SESSION_NAME.to_string() => "another_session_name".to_string(), - s3_constants::AWS_WEB_IDENTITY_TOKEN_FILE.to_string() => "another_token_file".to_string(), - s3_constants::AWS_S3_POOL_IDLE_TIMEOUT_SECONDS.to_string() => "1".to_string(), - s3_constants::AWS_STS_POOL_IDLE_TIMEOUT_SECONDS.to_string() => "2".to_string(), - s3_constants::AWS_S3_GET_INTERNAL_SERVER_ERROR_RETRIES.to_string() => "3".to_string(), - s3_constants::AWS_ACCESS_KEY_ID.to_string() => "test_id".to_string(), - s3_constants::AWS_SECRET_ACCESS_KEY.to_string() => "test_secret".to_string(), + constants::AWS_ENDPOINT_URL.to_string() => "http://localhost:1234".to_string(), + constants::AWS_ENDPOINT_URL_DYNAMODB.to_string() => "http://localhost:2345".to_string(), + constants::AWS_REGION.to_string() => "us-west-2".to_string(), + constants::AWS_PROFILE.to_string() => "default".to_string(), + constants::AWS_S3_ADDRESSING_STYLE.to_string() => "virtual".to_string(), + constants::AWS_S3_LOCKING_PROVIDER.to_string() => "another_locking_provider".to_string(), + constants::AWS_S3_ASSUME_ROLE_ARN.to_string() => "arn:aws:iam::123456789012:role/another_role".to_string(), + constants::AWS_S3_ROLE_SESSION_NAME.to_string() => "another_session_name".to_string(), + constants::AWS_WEB_IDENTITY_TOKEN_FILE.to_string() => "another_token_file".to_string(), + constants::AWS_S3_POOL_IDLE_TIMEOUT_SECONDS.to_string() => "1".to_string(), + constants::AWS_STS_POOL_IDLE_TIMEOUT_SECONDS.to_string() => "2".to_string(), + constants::AWS_S3_GET_INTERNAL_SERVER_ERROR_RETRIES.to_string() => "3".to_string(), + constants::AWS_ACCESS_KEY_ID.to_string() => "test_id".to_string(), + constants::AWS_SECRET_ACCESS_KEY.to_string() => "test_secret".to_string(), }).unwrap(); assert_eq!( @@ -767,30 +657,30 @@ mod tests { fn storage_options_mixed_test() { ScopedEnv::run(|| { clear_env_of_aws_keys(); - std::env::set_var(s3_constants::AWS_ENDPOINT_URL, "http://localhost"); + std::env::set_var(constants::AWS_ENDPOINT_URL, "http://localhost"); std::env::set_var( - s3_constants::AWS_ENDPOINT_URL_DYNAMODB, + constants::AWS_ENDPOINT_URL_DYNAMODB, "http://localhost:dynamodb", ); - std::env::set_var(s3_constants::AWS_REGION, "us-west-1"); - std::env::set_var(s3_constants::AWS_PROFILE, "default"); - std::env::set_var(s3_constants::AWS_ACCESS_KEY_ID, "wrong_key_id"); - std::env::set_var(s3_constants::AWS_SECRET_ACCESS_KEY, "wrong_secret_key"); - std::env::set_var(s3_constants::AWS_S3_LOCKING_PROVIDER, "dynamodb"); + std::env::set_var(constants::AWS_REGION, "us-west-1"); + std::env::set_var(constants::AWS_PROFILE, "default"); + std::env::set_var(constants::AWS_ACCESS_KEY_ID, "wrong_key_id"); + std::env::set_var(constants::AWS_SECRET_ACCESS_KEY, "wrong_secret_key"); + std::env::set_var(constants::AWS_S3_LOCKING_PROVIDER, "dynamodb"); std::env::set_var( - s3_constants::AWS_S3_ASSUME_ROLE_ARN, + constants::AWS_S3_ASSUME_ROLE_ARN, "arn:aws:iam::123456789012:role/some_role", ); - std::env::set_var(s3_constants::AWS_S3_ROLE_SESSION_NAME, "session_name"); - std::env::set_var(s3_constants::AWS_WEB_IDENTITY_TOKEN_FILE, "token_file"); + std::env::set_var(constants::AWS_S3_ROLE_SESSION_NAME, "session_name"); + std::env::set_var(constants::AWS_WEB_IDENTITY_TOKEN_FILE, "token_file"); - std::env::set_var(s3_constants::AWS_S3_POOL_IDLE_TIMEOUT_SECONDS, "1"); - std::env::set_var(s3_constants::AWS_STS_POOL_IDLE_TIMEOUT_SECONDS, "2"); - std::env::set_var(s3_constants::AWS_S3_GET_INTERNAL_SERVER_ERROR_RETRIES, "3"); + std::env::set_var(constants::AWS_S3_POOL_IDLE_TIMEOUT_SECONDS, "1"); + std::env::set_var(constants::AWS_STS_POOL_IDLE_TIMEOUT_SECONDS, "2"); + std::env::set_var(constants::AWS_S3_GET_INTERNAL_SERVER_ERROR_RETRIES, "3"); let options = S3StorageOptions::from_map(&hashmap! { - s3_constants::AWS_ACCESS_KEY_ID.to_string() => "test_id_mixed".to_string(), - s3_constants::AWS_SECRET_ACCESS_KEY.to_string() => "test_secret_mixed".to_string(), - s3_constants::AWS_REGION.to_string() => "us-west-2".to_string(), + constants::AWS_ACCESS_KEY_ID.to_string() => "test_id_mixed".to_string(), + constants::AWS_SECRET_ACCESS_KEY.to_string() => "test_secret_mixed".to_string(), + constants::AWS_REGION.to_string() => "us-west-2".to_string(), "AWS_S3_GET_INTERNAL_SERVER_ERROR_RETRIES".to_string() => "3".to_string(), }) .unwrap(); @@ -821,30 +711,27 @@ mod tests { ScopedEnv::run(|| { clear_env_of_aws_keys(); let _options = S3StorageOptions::from_map(&hashmap! { - s3_constants::AWS_REGION.to_string() => "eu-west-1".to_string(), - s3_constants::AWS_WEB_IDENTITY_TOKEN_FILE.to_string() => "web_identity_token_file".to_string(), - s3_constants::AWS_ROLE_ARN.to_string() => "arn:aws:iam::123456789012:role/web_identity_role".to_string(), - s3_constants::AWS_ROLE_SESSION_NAME.to_string() => "web_identity_session_name".to_string(), + constants::AWS_REGION.to_string() => "eu-west-1".to_string(), + constants::AWS_WEB_IDENTITY_TOKEN_FILE.to_string() => "web_identity_token_file".to_string(), + constants::AWS_ROLE_ARN.to_string() => "arn:aws:iam::123456789012:role/web_identity_role".to_string(), + constants::AWS_ROLE_SESSION_NAME.to_string() => "web_identity_session_name".to_string(), }).unwrap(); - assert_eq!( - "eu-west-1", - std::env::var(s3_constants::AWS_REGION).unwrap() - ); + assert_eq!("eu-west-1", std::env::var(constants::AWS_REGION).unwrap()); assert_eq!( "web_identity_token_file", - std::env::var(s3_constants::AWS_WEB_IDENTITY_TOKEN_FILE).unwrap() + std::env::var(constants::AWS_WEB_IDENTITY_TOKEN_FILE).unwrap() ); assert_eq!( "arn:aws:iam::123456789012:role/web_identity_role", - std::env::var(s3_constants::AWS_ROLE_ARN).unwrap() + std::env::var(constants::AWS_ROLE_ARN).unwrap() ); assert_eq!( "web_identity_session_name", - std::env::var(s3_constants::AWS_ROLE_SESSION_NAME).unwrap() + std::env::var(constants::AWS_ROLE_SESSION_NAME).unwrap() ); }); } @@ -856,10 +743,10 @@ mod tests { clear_env_of_aws_keys(); let raw_options = hashmap! {}; - std::env::set_var(s3_constants::AWS_ACCESS_KEY_ID, "env_key"); - std::env::set_var(s3_constants::AWS_ENDPOINT_URL, "env_key"); - std::env::set_var(s3_constants::AWS_SECRET_ACCESS_KEY, "env_key"); - std::env::set_var(s3_constants::AWS_REGION, "env_key"); + std::env::set_var(constants::AWS_ACCESS_KEY_ID, "env_key"); + std::env::set_var(constants::AWS_ENDPOINT_URL, "env_key"); + std::env::set_var(constants::AWS_SECRET_ACCESS_KEY, "env_key"); + std::env::set_var(constants::AWS_REGION, "env_key"); let combined_options = S3ObjectStoreFactory {}.with_env_s3(&StorageOptions(raw_options)); @@ -921,20 +808,17 @@ mod tests { async fn storage_options_configure_imds(value: Option<&str>) -> Duration { let _options = match value { Some(value) => S3StorageOptions::from_map(&hashmap! { - s3_constants::AWS_REGION.to_string() => "eu-west-1".to_string(), - s3_constants::AWS_EC2_METADATA_DISABLED.to_string() => value.to_string(), + constants::AWS_REGION.to_string() => "eu-west-1".to_string(), + constants::AWS_EC2_METADATA_DISABLED.to_string() => value.to_string(), }) .unwrap(), None => S3StorageOptions::from_map(&hashmap! { - s3_constants::AWS_REGION.to_string() => "eu-west-1".to_string(), + constants::AWS_REGION.to_string() => "eu-west-1".to_string(), }) .unwrap(), }; - assert_eq!( - "eu-west-1", - std::env::var(s3_constants::AWS_REGION).unwrap() - ); + assert_eq!("eu-west-1", std::env::var(constants::AWS_REGION).unwrap()); let provider = _options.sdk_config.credentials_provider().unwrap(); let now = SystemTime::now();