From b4316009d9568e091e9a5bb364eb2728edbf1a7c Mon Sep 17 00:00:00 2001 From: Thomas <12407096+thomas-chauvet@users.noreply.github.com> Date: Thu, 21 Nov 2024 12:16:00 +0100 Subject: [PATCH 1/2] feat: override dynamodb config Signed-off-by: Thomas <12407096+thomas-chauvet@users.noreply.github.com> --- crates/aws/src/constants.rs | 16 +- crates/aws/src/credentials.rs | 2 +- crates/aws/src/lib.rs | 139 +++++++++++++++--- crates/aws/src/logstore/dynamodb_logstore.rs | 4 + crates/aws/src/storage.rs | 23 +++ crates/aws/tests/integration_s3_dynamodb.rs | 4 + .../writing-to-s3-with-locking-provider.md | 43 +++++- 7 files changed, 204 insertions(+), 27 deletions(-) diff --git a/crates/aws/src/constants.rs b/crates/aws/src/constants.rs index ca73b69819..28be65e6c6 100644 --- a/crates/aws/src/constants.rs +++ b/crates/aws/src/constants.rs @@ -7,11 +7,25 @@ use std::time::Duration; /// Custom S3 endpoint. pub const AWS_ENDPOINT_URL: &str = "AWS_ENDPOINT_URL"; -/// Custom DynamoDB endpoint. + +/// Custom DynamoDB endpoint. /// /// If DynamoDB endpoint is not supplied, will use S3 endpoint (AWS_ENDPOINT_URL) /// If it is supplied, this endpoint takes precedence over the global endpoint set in AWS_ENDPOINT_URL for DynamoDB pub const AWS_ENDPOINT_URL_DYNAMODB: &str = "AWS_ENDPOINT_URL_DYNAMODB"; +/// If DynamoDB region is different from S3 region, set this to the DynamoDB region. +/// If it is supplied, this region takes precedence over the global region set in AWS_REGION for DynamoDB +pub const AWS_REGION_DYNAMODB: &str = "AWS_REGION_DYNAMODB"; +/// If DynamoDB access key is different from S3 access key, set this to the DynamoDB access key. +/// If it is supplied, this access key takes precedence over the global access key set in AWS_ACCESS_KEY_ID for DynamoDB +pub const AWS_ACCESS_KEY_ID_DYNAMODB: &str = "AWS_ACCESS_KEY_ID_DYNAMODB"; +/// If DynamoDB secret key is different from S3 secret key, set this to the DynamoDB secret key.
+/// If it is supplied, this secret key takes precedence over the global secret key set in AWS_SECRET_ACCESS_KEY for DynamoDB +pub const AWS_SECRET_ACCESS_KEY_DYNAMODB: &str = "AWS_SECRET_ACCESS_KEY_DYNAMODB"; +/// If DynamoDB session token is different from S3 session token, set this to the DynamoDB session token. +/// If it is supplied, this session token takes precedence over the global session token set in AWS_SESSION_TOKEN for DynamoDB +pub const AWS_SESSION_TOKEN_DYNAMODB: &str = "AWS_SESSION_TOKEN_DYNAMODB"; + /// The AWS region. pub const AWS_REGION: &str = "AWS_REGION"; /// The AWS profile. diff --git a/crates/aws/src/credentials.rs b/crates/aws/src/credentials.rs index 2a10d2825b..cdaf3f2999 100644 --- a/crates/aws/src/credentials.rs +++ b/crates/aws/src/credentials.rs @@ -372,7 +372,7 @@ mod tests { }); let sdk_config = resolve_credentials(options) .await - .expect("Failed to resolve credentijals for the test"); + .expect("Failed to resolve credentials for the test"); let provider = AWSForObjectStore::new(sdk_config); let _credential = provider .get_credential() diff --git a/crates/aws/src/lib.rs b/crates/aws/src/lib.rs index 2f2bc1b472..69ac891838 100644 --- a/crates/aws/src/lib.rs +++ b/crates/aws/src/lib.rs @@ -10,7 +10,9 @@ pub mod logstore; #[cfg(feature = "native-tls")] mod native; pub mod storage; +use aws_config::Region; use aws_config::SdkConfig; +pub use aws_credential_types::provider::SharedCredentialsProvider; use aws_sdk_dynamodb::error::SdkError; use aws_sdk_dynamodb::{ operation::{ @@ -23,6 +25,10 @@ use aws_sdk_dynamodb::{ }, Client, }; +use deltalake_core::logstore::{default_logstore, logstores, LogStore, LogStoreFactory}; +use deltalake_core::storage::{factories, url_prefix_handler, ObjectStoreRef, StorageOptions}; +use deltalake_core::{DeltaResult, Path}; +use errors::{DynamoDbConfigError, LockClientError}; use lazy_static::lazy_static; use object_store::aws::AmazonS3ConfigKey; use regex::Regex; @@ -32,16 +38,10 @@ use std::{ 
sync::Arc, time::{Duration, SystemTime}, }; +use storage::{S3ObjectStoreFactory, S3StorageOptions}; use tracing::debug; - -use deltalake_core::logstore::{default_logstore, logstores, LogStore, LogStoreFactory}; -use deltalake_core::storage::{factories, url_prefix_handler, ObjectStoreRef, StorageOptions}; -use deltalake_core::{DeltaResult, Path}; use url::Url; -use errors::{DynamoDbConfigError, LockClientError}; -use storage::{S3ObjectStoreFactory, S3StorageOptions}; - #[derive(Clone, Debug, Default)] pub struct S3LogStoreFactory {} @@ -160,9 +160,19 @@ impl DynamoDbLockClient { billing_mode: Option, max_elapsed_request_time: Option, dynamodb_override_endpoint: Option, + dynamodb_override_region: Option, + dynamodb_override_access_key_id: Option, + dynamodb_override_secret_access_key: Option, + dynamodb_override_session_token: Option, ) -> Result { - let dynamodb_sdk_config = - Self::create_dynamodb_sdk_config(sdk_config, dynamodb_override_endpoint); + let dynamodb_sdk_config = Self::create_dynamodb_sdk_config( + sdk_config, + dynamodb_override_endpoint, + dynamodb_override_region, + dynamodb_override_access_key_id, + dynamodb_override_secret_access_key, + dynamodb_override_session_token, + ); let dynamodb_client = aws_sdk_dynamodb::Client::new(&dynamodb_sdk_config); @@ -202,20 +212,45 @@ impl DynamoDbLockClient { fn create_dynamodb_sdk_config( sdk_config: &SdkConfig, dynamodb_override_endpoint: Option, + dynamodb_override_region: Option, + dynamodb_override_access_key_id: Option, + dynamodb_override_secret_access_key: Option, + dynamodb_override_session_token: Option, ) -> SdkConfig { /* if dynamodb_override_endpoint exists/AWS_ENDPOINT_URL_DYNAMODB is specified by user - use dynamodb_override_endpoint to create dynamodb client + override the endpoint in the sdk_config + if dynamodb_override_region exists/AWS_REGION_DYNAMODB is specified by user + override the region in the sdk_config + if dynamodb_override_access_key_id exists/AWS_ACCESS_KEY_ID_DYNAMODB is 
specified by user + override the access_key_id in the sdk_config + if dynamodb_override_secret_access_key exists/AWS_SECRET_ACCESS_KEY_DYNAMODB is specified by user + override the secret_access_key in the sdk_config */ - match dynamodb_override_endpoint { - Some(dynamodb_endpoint_url) => sdk_config - .to_owned() - .to_builder() - .endpoint_url(dynamodb_endpoint_url) - .build(), - None => sdk_config.to_owned(), + let mut config_builder = sdk_config.to_owned().to_builder(); + + if let Some(dynamodb_endpoint_url) = dynamodb_override_endpoint { + config_builder = config_builder.endpoint_url(dynamodb_endpoint_url); } + + if let Some(dynamodb_region) = dynamodb_override_region { + config_builder = config_builder.region(Region::new(dynamodb_region)); + } + + if let (Some(access_key_id), Some(secret_access_key)) = ( + dynamodb_override_access_key_id, + dynamodb_override_secret_access_key, + ) { + config_builder = config_builder.credentials_provider(SharedCredentialsProvider::new( + aws_credential_types::Credentials::from_keys( + access_key_id, + secret_access_key, + dynamodb_override_session_token, + ), + )); + } + config_builder.build() } /// Create the lock table where DynamoDb stores the commit information for all delta tables. 
@@ -692,9 +727,11 @@ fn extract_version_from_filename(name: &str) -> Option { #[cfg(test)] mod tests { use super::*; - use aws_config::Region; + use aws_sdk_sts::config::{ProvideCredentials, ResolveCachedIdentity}; + use futures::future::Shared; use object_store::memory::InMemory; use serial_test::serial; + use tracing::instrument::WithSubscriber; fn commit_entry_roundtrip(c: &CommitEntry) -> Result<(), LockClientError> { let item_data: HashMap = create_value_map(c, "some_table"); @@ -752,6 +789,10 @@ mod tests { let dynamodb_sdk_config = DynamoDbLockClient::create_dynamodb_sdk_config( &sdk_config, Some("http://localhost:2345".to_string()), + None, + None, + None, + None, ); assert_eq!( dynamodb_sdk_config.endpoint_url(), @@ -761,8 +802,66 @@ mod tests { dynamodb_sdk_config.region().unwrap().to_string(), "eu-west-1".to_string(), ); - let dynamodb_sdk_no_override_config = - DynamoDbLockClient::create_dynamodb_sdk_config(&sdk_config, None); + let dynamodb_sdk_no_override_config = DynamoDbLockClient::create_dynamodb_sdk_config( + &sdk_config, + None, + None, + None, + None, + None, + ); + assert_eq!( + dynamodb_sdk_no_override_config.endpoint_url(), + Some("http://localhost:1234"), + ); + } + + #[tokio::test] + #[serial] + async fn test_create_dynamodb_sdk_config_override_credentials() { + let sdk_config = SdkConfig::builder() + .region(Region::from_static("eu-west-1")) + .endpoint_url("http://localhost:1234") + .build(); + let dynamodb_sdk_config = DynamoDbLockClient::create_dynamodb_sdk_config( + &sdk_config, + Some("http://localhost:2345".to_string()), + Some("us-west-1".to_string()), + Some("access_key_dynamodb".to_string()), + Some("secret_access_key_dynamodb".to_string()), + None, + ); + assert_eq!( + dynamodb_sdk_config.endpoint_url(), + Some("http://localhost:2345"), + ); + assert_eq!( + dynamodb_sdk_config.region().unwrap().to_string(), + "us-west-1".to_string(), + ); + + // check that access key and secret access key are overridden + let credentials_provider 
= dynamodb_sdk_config + .credentials_provider() + .unwrap() + .provide_credentials() + .await + .unwrap(); + + assert_eq!(credentials_provider.access_key_id(), "access_key_dynamodb"); + assert_eq!( + credentials_provider.secret_access_key(), + "secret_access_key_dynamodb" + ); + + let dynamodb_sdk_no_override_config = DynamoDbLockClient::create_dynamodb_sdk_config( + &sdk_config, + None, + None, + None, + None, + None, + ); assert_eq!( dynamodb_sdk_no_override_config.endpoint_url(), Some("http://localhost:1234"), diff --git a/crates/aws/src/logstore/dynamodb_logstore.rs b/crates/aws/src/logstore/dynamodb_logstore.rs index fccb8c9060..e08520c2a1 100644 --- a/crates/aws/src/logstore/dynamodb_logstore.rs +++ b/crates/aws/src/logstore/dynamodb_logstore.rs @@ -59,6 +59,10 @@ impl S3DynamoDbLogStore { .get(constants::MAX_ELAPSED_REQUEST_TIME_KEY_NAME) .cloned(), s3_options.dynamodb_endpoint.clone(), + s3_options.dynamodb_region.clone(), + s3_options.dynamodb_access_key_id.clone(), + s3_options.dynamodb_secret_access_key.clone(), + s3_options.dynamodb_session_token.clone(), ) .map_err(|err| DeltaTableError::ObjectStore { source: ObjectStoreError::Generic { diff --git a/crates/aws/src/storage.rs b/crates/aws/src/storage.rs index 604404e79b..909f4da2e2 100644 --- a/crates/aws/src/storage.rs +++ b/crates/aws/src/storage.rs @@ -154,6 +154,10 @@ pub struct S3StorageOptions { pub virtual_hosted_style_request: bool, pub locking_provider: Option, pub dynamodb_endpoint: Option, + pub dynamodb_region: Option, + pub dynamodb_access_key_id: Option, + pub dynamodb_secret_access_key: Option, + pub dynamodb_session_token: Option, pub s3_pool_idle_timeout: Duration, pub sts_pool_idle_timeout: Duration, pub s3_get_internal_server_error_retries: usize, @@ -168,6 +172,10 @@ impl PartialEq for S3StorageOptions { self.virtual_hosted_style_request == other.virtual_hosted_style_request && self.locking_provider == other.locking_provider && self.dynamodb_endpoint == other.dynamodb_endpoint + && 
self.dynamodb_region == other.dynamodb_region + && self.dynamodb_access_key_id == other.dynamodb_access_key_id + && self.dynamodb_secret_access_key == other.dynamodb_secret_access_key + && self.dynamodb_session_token == other.dynamodb_session_token && self.s3_pool_idle_timeout == other.s3_pool_idle_timeout && self.sts_pool_idle_timeout == other.sts_pool_idle_timeout && self.s3_get_internal_server_error_retries @@ -231,6 +239,13 @@ impl S3StorageOptions { virtual_hosted_style_request, locking_provider: str_option(options, constants::AWS_S3_LOCKING_PROVIDER), dynamodb_endpoint: str_option(options, constants::AWS_ENDPOINT_URL_DYNAMODB), + dynamodb_region: str_option(options, constants::AWS_REGION_DYNAMODB), + dynamodb_access_key_id: str_option(options, constants::AWS_ACCESS_KEY_ID_DYNAMODB), + dynamodb_secret_access_key: str_option( + options, + constants::AWS_SECRET_ACCESS_KEY_DYNAMODB, + ), + dynamodb_session_token: str_option(options, constants::AWS_SESSION_TOKEN_DYNAMODB), s3_pool_idle_timeout: Duration::from_secs(s3_pool_idle_timeout), sts_pool_idle_timeout: Duration::from_secs(sts_pool_idle_timeout), s3_get_internal_server_error_retries, @@ -550,6 +565,10 @@ mod tests { virtual_hosted_style_request: false, locking_provider: Some("dynamodb".to_string()), dynamodb_endpoint: None, + dynamodb_region: Some("us-west-1".to_string()), + dynamodb_access_key_id: None, + dynamodb_secret_access_key: None, + dynamodb_session_token: None, s3_pool_idle_timeout: Duration::from_secs(15), sts_pool_idle_timeout: Duration::from_secs(10), s3_get_internal_server_error_retries: 10, @@ -696,6 +715,10 @@ mod tests { virtual_hosted_style_request: false, locking_provider: Some("dynamodb".to_string()), dynamodb_endpoint: Some("http://localhost:dynamodb".to_string()), + dynamodb_region: Some("us-west-2".to_string()), + dynamodb_access_key_id: None, + dynamodb_secret_access_key: None, + dynamodb_session_token: None, s3_pool_idle_timeout: Duration::from_secs(1), sts_pool_idle_timeout: 
Duration::from_secs(2), s3_get_internal_server_error_retries: 3, diff --git a/crates/aws/tests/integration_s3_dynamodb.rs b/crates/aws/tests/integration_s3_dynamodb.rs index da0b0e06c8..1e0110fcb2 100644 --- a/crates/aws/tests/integration_s3_dynamodb.rs +++ b/crates/aws/tests/integration_s3_dynamodb.rs @@ -49,6 +49,10 @@ fn make_client() -> TestResult { None, None, None, + None, + None, + None, + None, )?) } diff --git a/docs/usage/writing/writing-to-s3-with-locking-provider.md b/docs/usage/writing/writing-to-s3-with-locking-provider.md index 6a275d685a..861f25aa2c 100644 --- a/docs/usage/writing/writing-to-s3-with-locking-provider.md +++ b/docs/usage/writing/writing-to-s3-with-locking-provider.md @@ -6,7 +6,7 @@ When writing to S3, delta-rs provides a locking mechanism to ensure that concurr To enable safe concurrent writes to AWS S3, we must provide an external locking mechanism. -### DynamoDB +## DynamoDB DynamoDB is the only available locking provider at the moment in delta-rs. To enable DynamoDB as the locking provider, you need to set the `AWS_S3_LOCKING_PROVIDER` to 'dynamodb' as a `storage_options` or as an environment variable. @@ -74,12 +74,45 @@ aws dynamodb create-table \ You can find additional information in the [Delta Lake documentation](https://docs.delta.io/latest/delta-storage.html#multi-cluster-setup), which also includes recommendations on configuring a time-to-live (TTL) for the table to avoid growing the table indefinitely. -### Enable unsafe writes in S3 (opt-in) +### Override DynamoDB config + +In some cases, you may want to override the default DynamoDB configuration. For instance, you may be using S3-compatible storage from a provider other than AWS, or you may need a separate set of credentials for DynamoDB.
In this case, you can configure the `storage_options` with extra environment variables: + +- `AWS_ENDPOINT_URL_DYNAMODB` overrides `AWS_ENDPOINT_URL`, +- `AWS_REGION_DYNAMODB` overrides `AWS_REGION`, +- `AWS_ACCESS_KEY_ID_DYNAMODB` overrides `AWS_ACCESS_KEY_ID`, +- `AWS_SECRET_ACCESS_KEY_DYNAMODB` overrides `AWS_SECRET_ACCESS_KEY` (the access key ID and the secret access key must both be set for the credentials override to apply; `AWS_SESSION_TOKEN_DYNAMODB` can optionally supply a session token for them). + +Example: + +```python +from deltalake import write_deltalake +df = pd.DataFrame({'x': [1, 2, 3]}) +storage_options = { + "endpoint_url": "https://<your-s3-compatible-storage>", + "REGION": "<s3-region>", + "AWS_ACCESS_KEY_ID": "<s3-access-key-id>", + "AWS_SECRET_ACCESS_KEY": "<s3-secret-access-key>", + # override dynamodb config + "AWS_S3_LOCKING_PROVIDER": "dynamodb", + "AWS_ENDPOINT_URL_DYNAMODB": "https://dynamodb.<dynamodb-region>.amazonaws.com", + "AWS_REGION_DYNAMODB": "<dynamodb-region>", + "AWS_ACCESS_KEY_ID_DYNAMODB": "<dynamodb-access-key-id>", + "AWS_SECRET_ACCESS_KEY_DYNAMODB": "<dynamodb-secret-access-key>", +} +write_deltalake( + 's3a://path/to/table', + df, + storage_options=storage_options +) +``` + +## Enable unsafe writes in S3 (opt-in) If for some reason you don't want to use dynamodb as your locking mechanism you can choose to set the `AWS_S3_ALLOW_UNSAFE_RENAME` variable to `true` in order to enable S3 unsafe writes. -### Required permissions +## Required permissions You need to have permissions to get, put and delete objects in the S3 bucket you're storing your data in. Please note that you must be allowed to delete objects even if you're just appending to the deltalake, because there are temporary files into the log folder that are deleted after usage. @@ -95,9 +128,9 @@ In DynamoDB, you need those permissions: - dynamodb:Query - dynamodb:PutItem - dynamodb:UpdateItem -- dynamodb:DeleteItem +- dynamodb:DeleteItem -### Enabling concurrent writes for alternative clients +## Enabling concurrent writes for alternative clients Unlike AWS S3, some S3 clients support atomic renames by passing some headers in requests.
From d04e317a38885babac5c8f5841522008cca84ed5 Mon Sep 17 00:00:00 2001 From: Thomas <12407096+thomas-chauvet@users.noreply.github.com> Date: Fri, 22 Nov 2024 07:33:17 +0100 Subject: [PATCH 2/2] fix(test): fix previous workaround Signed-off-by: Thomas <12407096+thomas-chauvet@users.noreply.github.com> --- crates/aws/src/storage.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/crates/aws/src/storage.rs b/crates/aws/src/storage.rs index 909f4da2e2..b2ad64d0c1 100644 --- a/crates/aws/src/storage.rs +++ b/crates/aws/src/storage.rs @@ -565,7 +565,7 @@ mod tests { virtual_hosted_style_request: false, locking_provider: Some("dynamodb".to_string()), dynamodb_endpoint: None, - dynamodb_region: Some("us-west-1".to_string()), + dynamodb_region: None, dynamodb_access_key_id: None, dynamodb_secret_access_key: None, dynamodb_session_token: None, @@ -715,7 +715,7 @@ mod tests { virtual_hosted_style_request: false, locking_provider: Some("dynamodb".to_string()), dynamodb_endpoint: Some("http://localhost:dynamodb".to_string()), - dynamodb_region: Some("us-west-2".to_string()), + dynamodb_region: None, dynamodb_access_key_id: None, dynamodb_secret_access_key: None, dynamodb_session_token: None,