Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: override dynamodb config #3011

Merged
merged 5 commits into from
Nov 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 15 additions & 1 deletion crates/aws/src/constants.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,25 @@ use std::time::Duration;

/// Custom S3 endpoint.
pub const AWS_ENDPOINT_URL: &str = "AWS_ENDPOINT_URL";
/// Custom DynamoDB endpoint.

/// Custom DynamoDB
///
/// If DynamoDB endpoint is not supplied, will use S3 endpoint (AWS_ENDPOINT_URL)
/// If it is supplied, this endpoint takes precedence over the global endpoint set in AWS_ENDPOINT_URL for DynamoDB
pub const AWS_ENDPOINT_URL_DYNAMODB: &str = "AWS_ENDPOINT_URL_DYNAMODB";
/// If DynamoDB region is different from S3 region, set this to the DynamoDB region.
/// If it is supplied, this region takes precedence over the global region set in AWS_REGION for DynamoDB
pub const AWS_REGION_DYNAMODB: &str = "AWS_REGION_DYNAMODB";
/// If DynamoDB access key is different from S3 access key, set this to the DynamoDB access key.
/// If it is supplied, this access key takes precedence over the global access key set in AWS_ACCESS_KEY_ID for DynamoDB
pub const AWS_ACCESS_KEY_ID_DYNAMODB: &str = "AWS_ACCESS_KEY_ID_DYNAMODB";
/// If DynamoDB secret key is different from S3 secret key, set this to the DynamoDB secret key.
/// If it is supplied, this secret key takes precedence over the global secret key set in AWS_SECRET_ACCESS_KEY for DynamoDB
pub const AWS_SECRET_ACCESS_KEY_DYNAMODB: &str = "AWS_SECRET_ACCESS_KEY_DYNAMODB";
/// If DynamoDB session token is different from S3 session token, set this to the DynamoDB session token.
/// If it is supplied, this session token takes precedence over the global session token set in AWS_SESSION_TOKEN for DynamoDB
pub const AWS_SESSION_TOKEN_DYNAMODB: &str = "AWS_SESSION_TOKEN_DYNAMODB";

/// The AWS region.
pub const AWS_REGION: &str = "AWS_REGION";
/// The AWS profile.
Expand Down
2 changes: 1 addition & 1 deletion crates/aws/src/credentials.rs
Original file line number Diff line number Diff line change
Expand Up @@ -372,7 +372,7 @@ mod tests {
});
let sdk_config = resolve_credentials(options)
.await
.expect("Failed to resolve credentijals for the test");
.expect("Failed to resolve credentials for the test");
let provider = AWSForObjectStore::new(sdk_config);
let _credential = provider
.get_credential()
Expand Down
139 changes: 119 additions & 20 deletions crates/aws/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@ pub mod logstore;
#[cfg(feature = "native-tls")]
mod native;
pub mod storage;
use aws_config::Region;
use aws_config::SdkConfig;
pub use aws_credential_types::provider::SharedCredentialsProvider;
use aws_sdk_dynamodb::error::SdkError;
use aws_sdk_dynamodb::{
operation::{
Expand All @@ -23,6 +25,10 @@ use aws_sdk_dynamodb::{
},
Client,
};
use deltalake_core::logstore::{default_logstore, logstores, LogStore, LogStoreFactory};
use deltalake_core::storage::{factories, url_prefix_handler, ObjectStoreRef, StorageOptions};
use deltalake_core::{DeltaResult, Path};
use errors::{DynamoDbConfigError, LockClientError};
use lazy_static::lazy_static;
use object_store::aws::AmazonS3ConfigKey;
use regex::Regex;
Expand All @@ -32,16 +38,10 @@ use std::{
sync::Arc,
time::{Duration, SystemTime},
};
use storage::{S3ObjectStoreFactory, S3StorageOptions};
use tracing::debug;

use deltalake_core::logstore::{default_logstore, logstores, LogStore, LogStoreFactory};
use deltalake_core::storage::{factories, url_prefix_handler, ObjectStoreRef, StorageOptions};
use deltalake_core::{DeltaResult, Path};
use url::Url;

use errors::{DynamoDbConfigError, LockClientError};
use storage::{S3ObjectStoreFactory, S3StorageOptions};

#[derive(Clone, Debug, Default)]
pub struct S3LogStoreFactory {}

Expand Down Expand Up @@ -160,9 +160,19 @@ impl DynamoDbLockClient {
billing_mode: Option<String>,
max_elapsed_request_time: Option<String>,
dynamodb_override_endpoint: Option<String>,
dynamodb_override_region: Option<String>,
dynamodb_override_access_key_id: Option<String>,
dynamodb_override_secret_access_key: Option<String>,
dynamodb_override_session_token: Option<String>,
) -> Result<Self, DynamoDbConfigError> {
let dynamodb_sdk_config =
Self::create_dynamodb_sdk_config(sdk_config, dynamodb_override_endpoint);
let dynamodb_sdk_config = Self::create_dynamodb_sdk_config(
sdk_config,
dynamodb_override_endpoint,
dynamodb_override_region,
dynamodb_override_access_key_id,
dynamodb_override_secret_access_key,
dynamodb_override_session_token,
);

let dynamodb_client = aws_sdk_dynamodb::Client::new(&dynamodb_sdk_config);

Expand Down Expand Up @@ -202,20 +212,45 @@ impl DynamoDbLockClient {
fn create_dynamodb_sdk_config(
sdk_config: &SdkConfig,
dynamodb_override_endpoint: Option<String>,
dynamodb_override_region: Option<String>,
dynamodb_override_access_key_id: Option<String>,
dynamodb_override_secret_access_key: Option<String>,
dynamodb_override_session_token: Option<String>,
) -> SdkConfig {
/*
if dynamodb_override_endpoint exists/AWS_ENDPOINT_URL_DYNAMODB is specified by user
use dynamodb_override_endpoint to create dynamodb client
override the endpoint in the sdk_config
if dynamodb_override_region exists/AWS_REGION_DYNAMODB is specified by user
override the region in the sdk_config
if dynamodb_override_access_key_id exists/AWS_ACCESS_KEY_ID_DYNAMODB is specified by user
override the access_key_id in the sdk_config
if dynamodb_override_secret_access_key exists/AWS_SECRET_ACCESS_KEY_DYNAMODB is specified by user
override the secret_access_key in the sdk_config
*/

match dynamodb_override_endpoint {
Some(dynamodb_endpoint_url) => sdk_config
.to_owned()
.to_builder()
.endpoint_url(dynamodb_endpoint_url)
.build(),
None => sdk_config.to_owned(),
let mut config_builder = sdk_config.to_owned().to_builder();

if let Some(dynamodb_endpoint_url) = dynamodb_override_endpoint {
config_builder = config_builder.endpoint_url(dynamodb_endpoint_url);
}

if let Some(dynamodb_region) = dynamodb_override_region {
config_builder = config_builder.region(Region::new(dynamodb_region));
}

if let (Some(access_key_id), Some(secret_access_key)) = (
dynamodb_override_access_key_id,
dynamodb_override_secret_access_key,
) {
config_builder = config_builder.credentials_provider(SharedCredentialsProvider::new(
aws_credential_types::Credentials::from_keys(
access_key_id,
secret_access_key,
dynamodb_override_session_token,
),
));
}
config_builder.build()
}

/// Create the lock table where DynamoDb stores the commit information for all delta tables.
Expand Down Expand Up @@ -692,9 +727,11 @@ fn extract_version_from_filename(name: &str) -> Option<i64> {
#[cfg(test)]
mod tests {
use super::*;
use aws_config::Region;
use aws_sdk_sts::config::{ProvideCredentials, ResolveCachedIdentity};
use futures::future::Shared;
use object_store::memory::InMemory;
use serial_test::serial;
use tracing::instrument::WithSubscriber;

fn commit_entry_roundtrip(c: &CommitEntry) -> Result<(), LockClientError> {
let item_data: HashMap<String, AttributeValue> = create_value_map(c, "some_table");
Expand Down Expand Up @@ -752,6 +789,10 @@ mod tests {
let dynamodb_sdk_config = DynamoDbLockClient::create_dynamodb_sdk_config(
&sdk_config,
Some("http://localhost:2345".to_string()),
None,
None,
None,
None,
);
assert_eq!(
dynamodb_sdk_config.endpoint_url(),
Expand All @@ -761,8 +802,66 @@ mod tests {
dynamodb_sdk_config.region().unwrap().to_string(),
"eu-west-1".to_string(),
);
let dynamodb_sdk_no_override_config =
DynamoDbLockClient::create_dynamodb_sdk_config(&sdk_config, None);
let dynamodb_sdk_no_override_config = DynamoDbLockClient::create_dynamodb_sdk_config(
&sdk_config,
None,
None,
None,
None,
None,
);
assert_eq!(
dynamodb_sdk_no_override_config.endpoint_url(),
Some("http://localhost:1234"),
);
}

#[tokio::test]
#[serial]
async fn test_create_dynamodb_sdk_config_override_credentials() {
let sdk_config = SdkConfig::builder()
.region(Region::from_static("eu-west-1"))
.endpoint_url("http://localhost:1234")
.build();
let dynamodb_sdk_config = DynamoDbLockClient::create_dynamodb_sdk_config(
&sdk_config,
Some("http://localhost:2345".to_string()),
Some("us-west-1".to_string()),
Some("access_key_dynamodb".to_string()),
Some("secret_access_key_dynamodb".to_string()),
None,
);
assert_eq!(
dynamodb_sdk_config.endpoint_url(),
Some("http://localhost:2345"),
);
assert_eq!(
dynamodb_sdk_config.region().unwrap().to_string(),
"us-west-1".to_string(),
);

// check that access key and secret access key are overridden
let credentials_provider = dynamodb_sdk_config
.credentials_provider()
.unwrap()
.provide_credentials()
.await
.unwrap();

assert_eq!(credentials_provider.access_key_id(), "access_key_dynamodb");
assert_eq!(
credentials_provider.secret_access_key(),
"secret_access_key_dynamodb"
);

let dynamodb_sdk_no_override_config = DynamoDbLockClient::create_dynamodb_sdk_config(
&sdk_config,
None,
None,
None,
None,
None,
);
assert_eq!(
dynamodb_sdk_no_override_config.endpoint_url(),
Some("http://localhost:1234"),
Expand Down
4 changes: 4 additions & 0 deletions crates/aws/src/logstore/dynamodb_logstore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,10 @@ impl S3DynamoDbLogStore {
.get(constants::MAX_ELAPSED_REQUEST_TIME_KEY_NAME)
.cloned(),
s3_options.dynamodb_endpoint.clone(),
s3_options.dynamodb_region.clone(),
s3_options.dynamodb_access_key_id.clone(),
s3_options.dynamodb_secret_access_key.clone(),
s3_options.dynamodb_session_token.clone(),
)
.map_err(|err| DeltaTableError::ObjectStore {
source: ObjectStoreError::Generic {
Expand Down
23 changes: 23 additions & 0 deletions crates/aws/src/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,10 @@ pub struct S3StorageOptions {
pub virtual_hosted_style_request: bool,
pub locking_provider: Option<String>,
pub dynamodb_endpoint: Option<String>,
pub dynamodb_region: Option<String>,
pub dynamodb_access_key_id: Option<String>,
pub dynamodb_secret_access_key: Option<String>,
pub dynamodb_session_token: Option<String>,
pub s3_pool_idle_timeout: Duration,
pub sts_pool_idle_timeout: Duration,
pub s3_get_internal_server_error_retries: usize,
Expand All @@ -168,6 +172,10 @@ impl PartialEq for S3StorageOptions {
self.virtual_hosted_style_request == other.virtual_hosted_style_request
&& self.locking_provider == other.locking_provider
&& self.dynamodb_endpoint == other.dynamodb_endpoint
&& self.dynamodb_region == other.dynamodb_region
&& self.dynamodb_access_key_id == other.dynamodb_access_key_id
&& self.dynamodb_secret_access_key == other.dynamodb_secret_access_key
&& self.dynamodb_session_token == other.dynamodb_session_token
&& self.s3_pool_idle_timeout == other.s3_pool_idle_timeout
&& self.sts_pool_idle_timeout == other.sts_pool_idle_timeout
&& self.s3_get_internal_server_error_retries
Expand Down Expand Up @@ -231,6 +239,13 @@ impl S3StorageOptions {
virtual_hosted_style_request,
locking_provider: str_option(options, constants::AWS_S3_LOCKING_PROVIDER),
dynamodb_endpoint: str_option(options, constants::AWS_ENDPOINT_URL_DYNAMODB),
dynamodb_region: str_option(options, constants::AWS_REGION_DYNAMODB),
dynamodb_access_key_id: str_option(options, constants::AWS_ACCESS_KEY_ID_DYNAMODB),
dynamodb_secret_access_key: str_option(
options,
constants::AWS_SECRET_ACCESS_KEY_DYNAMODB,
),
dynamodb_session_token: str_option(options, constants::AWS_SESSION_TOKEN_DYNAMODB),
s3_pool_idle_timeout: Duration::from_secs(s3_pool_idle_timeout),
sts_pool_idle_timeout: Duration::from_secs(sts_pool_idle_timeout),
s3_get_internal_server_error_retries,
Expand Down Expand Up @@ -550,6 +565,10 @@ mod tests {
virtual_hosted_style_request: false,
locking_provider: Some("dynamodb".to_string()),
dynamodb_endpoint: None,
dynamodb_region: None,
dynamodb_access_key_id: None,
dynamodb_secret_access_key: None,
dynamodb_session_token: None,
s3_pool_idle_timeout: Duration::from_secs(15),
sts_pool_idle_timeout: Duration::from_secs(10),
s3_get_internal_server_error_retries: 10,
Expand Down Expand Up @@ -696,6 +715,10 @@ mod tests {
virtual_hosted_style_request: false,
locking_provider: Some("dynamodb".to_string()),
dynamodb_endpoint: Some("http://localhost:dynamodb".to_string()),
dynamodb_region: None,
dynamodb_access_key_id: None,
dynamodb_secret_access_key: None,
dynamodb_session_token: None,
s3_pool_idle_timeout: Duration::from_secs(1),
sts_pool_idle_timeout: Duration::from_secs(2),
s3_get_internal_server_error_retries: 3,
Expand Down
4 changes: 4 additions & 0 deletions crates/aws/tests/integration_s3_dynamodb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,10 @@ fn make_client() -> TestResult<DynamoDbLockClient> {
None,
None,
None,
None,
None,
None,
None,
)?)
}

Expand Down
43 changes: 38 additions & 5 deletions docs/usage/writing/writing-to-s3-with-locking-provider.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ When writing to S3, delta-rs provides a locking mechanism to ensure that concurr

To enable safe concurrent writes to AWS S3, we must provide an external locking mechanism.

### DynamoDB
## DynamoDB

DynamoDB is the only available locking provider at the moment in delta-rs. To enable DynamoDB as the locking provider, you need to set the `AWS_S3_LOCKING_PROVIDER` to 'dynamodb' as a `storage_options` or as an environment variable.

Expand Down Expand Up @@ -76,12 +76,45 @@ aws dynamodb create-table \

You can find additional information in the [Delta Lake documentation](https://docs.delta.io/latest/delta-storage.html#multi-cluster-setup), which also includes recommendations on configuring a time-to-live (TTL) for the table to avoid growing the table indefinitely.

### Enable unsafe writes in S3 (opt-in)
### Override DynamoDB config

In some cases, you may want to override the default DynamoDB configuration. For instance, you use an S3-compatible storage on another cloud provider which is not AWS. Or you need to have another set of credentials for DynamoDB. In this case, you can configure the `storage_options` with extra environment variables:

- `AWS_ENDPOINT_URL_DYNAMODB` overrides `AWS_ENDPOINT_URL`,
- `AWS_REGION_DYNAMODB` overrides `AWS_REGION`,
- `AWS_ACCESS_KEY_ID_DYNAMODB` overrides `AWS_ACCESS_KEY_ID`,
- `AWS_SECRET_ACCESS_KEY_DYNAMODB` overrides `AWS_SECRET_ACCESS_KEY`.

Example:

```python
from deltalake import write_deltalake
df = pd.DataFrame({'x': [1, 2, 3]})
storage_options = {
"endpoint_url": "https://<your-s3-compatible-storage>",
"REGION": "<s3-region>",
"AWS_ACCESS_KEY_ID": "<s3-access-key-id>",
"AWS_SECRET_ACCESS_KEY": "<s3-secret-access-key>",
# override dynamodb config
"AWS_S3_LOCKING_PROVIDER": "dynamodb",
"AWS_ENDPOINT_URL_DYNAMODB": "https://dynamodb.<dynamodb-region>.amazonaws.com",
"AWS_REGION_DYNAMODB": "<dynamodb-region>",
"AWS_ACCESS_KEY_ID_DYNAMODB": "<dynamodb-access-key-id>",
"AWS_SECRET_ACCESS_KEY_DYNAMODB": "<dynamodb-secret-access-key>",
}
write_deltalake(
's3a://path/to/table',
df,
storage_options=storage_options
)
```

## Enable unsafe writes in S3 (opt-in)

If for some reason you don't want to use dynamodb as your locking mechanism you can
choose to set the `AWS_S3_ALLOW_UNSAFE_RENAME` variable to `true` in order to enable S3 unsafe writes.

### Required permissions
## Required permissions

You need to have permissions to get, put and delete objects in the S3 bucket you're storing your data in. Please note that you must be allowed to delete objects even if you're just appending to the deltalake, because there are temporary files into the log folder that are deleted after usage.

Expand All @@ -97,9 +130,9 @@ In DynamoDB, you need those permissions:
- dynamodb:Query
- dynamodb:PutItem
- dynamodb:UpdateItem
- dynamodb:DeleteItem
- dynamodb:DeleteItem

### Enabling concurrent writes for alternative clients
## Enabling concurrent writes for alternative clients

Unlike AWS S3, some S3 clients support atomic renames by passing some headers
in requests.
Expand Down
Loading