Skip to content

Commit

Permalink
Remove legacy S3 locking based on dynamodb_lock crate
Browse files Browse the repository at this point in the history
  • Loading branch information
dispanser authored and rtyler committed Dec 11, 2023
1 parent 2d2f038 commit f87afce
Show file tree
Hide file tree
Showing 14 changed files with 46 additions and 331 deletions.
7 changes: 7 additions & 0 deletions crates/deltalake-aws/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,13 @@ pub enum LockClientError {
Credentials {
source: rusoto_credential::CredentialsError,
},

#[error(
"Atomic rename requires a LockClient for S3 backends. \
Either configure the LockClient, or set AWS_S3_ALLOW_UNSAFE_RENAME=true \
to opt out of support for concurrent writers."
)]
LockClientRequired,
}

impl From<GetItemError> for LockClientError {
Expand Down
4 changes: 1 addition & 3 deletions crates/deltalake-aws/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use errors::{DynamoDbConfigError, LockClientError};
/// - fileName: String - commit version.json (part of primary key), stored as i64 in this struct
/// - tempPath: String - name of temporary file containing commit info
/// - complete: bool - operation completed, i.e. atomic rename from `tempPath` to `fileName` succeeded
/// - expireTime: Option<SystemTime> - epoch seconds at which this external commit entry is safe to be deleted
/// - expireTime: `Option<SystemTime>` - epoch seconds at which this external commit entry is safe to be deleted
#[derive(Debug, PartialEq)]
pub struct CommitEntry {
/// Commit version, stored as file name (e.g., 00000N.json) in dynamodb (relative to `_delta_log/`
Expand Down Expand Up @@ -64,8 +64,6 @@ pub struct DynamoDbLockClient {

impl DynamoDbLockClient {
/// Creates a new DynamoDbLockClient from the supplied storage options.
///
/// Options are described in [crate::table::builder::s3_storage_options].
pub fn try_new(
lock_table_name: Option<&String>,
billing_mode: Option<&String>,
Expand Down
7 changes: 0 additions & 7 deletions crates/deltalake-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,6 @@ datafusion-objectstore-hdfs = { version = "0.1.3", default-features = false, fea
rusoto_core = { version = "0.47", default-features = false, optional = true }
rusoto_credential = { version = "0.47", optional = true }
rusoto_sts = { version = "0.47", default-features = false, optional = true }
rusoto_dynamodb = { version = "0.47", default-features = false, optional = true }
deltalake-aws = { path = "../deltalake-aws", default-features = false, optional = true }

# Glue
Expand All @@ -119,8 +118,6 @@ sqlparser = { version = "0.39", optional = true }
fs_extra = { version = "1.3.0", optional = true }
tempdir = { version = "0", optional = true }

dynamodb_lock = { version = "0.6", default-features = false, optional = true }

[dev-dependencies]
ctor = "0"
dotenvy = "0"
Expand Down Expand Up @@ -173,17 +170,13 @@ s3-native-tls = [
"rusoto_core/native-tls",
"rusoto_credential",
"rusoto_sts/native-tls",
"rusoto_dynamodb/native-tls",
"dynamodb_lock/native-tls",
"object_store/aws",
"deltalake-aws/native-tls",
]
s3 = [
"rusoto_core/rustls",
"rusoto_credential",
"rusoto_sts/rustls",
"rusoto_dynamodb/rustls",
"dynamodb_lock/rustls",
"object_store/aws",
"deltalake-aws/rustls",
]
Expand Down
6 changes: 4 additions & 2 deletions crates/deltalake-core/src/logstore/s3/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use deltalake_aws::{constants, CommitEntry, DynamoDbLockClient, UpdateLogEntryRe

use bytes::Bytes;
use object_store::path::Path;
use object_store::Error as ObjectStoreError;
use url::Url;

use crate::{
Expand Down Expand Up @@ -79,7 +80,9 @@ impl S3DynamoDbLogStore {
return self.try_complete_entry(entry, true).await;
}
// `N.json` has already been moved, complete the entry in DynamoDb just in case
Err(TransactionError::VersionAlreadyExists(_)) => {
Err(TransactionError::ObjectStore {
source: ObjectStoreError::NotFound { .. },
}) => {
return self.try_complete_entry(entry, false).await;
}
Err(err) if retry == MAX_REPAIR_RETRIES => return Err(err),
Expand Down Expand Up @@ -202,7 +205,6 @@ impl LogStore for S3DynamoDbLogStore {
.map_err(|err| DeltaTableError::GenericError {
source: Box::new(err),
})?;
println!("twh; fetched entry for {current_version}: {:?}", entry);
// when there is a latest entry in DynamoDb, we can avoid the file listing in S3.
if let Some(entry) = entry {
self.repair_entry(&entry).await?;
Expand Down
2 changes: 0 additions & 2 deletions crates/deltalake-core/src/operations/convert_to_delta.rs
Original file line number Diff line number Diff line change
Expand Up @@ -152,8 +152,6 @@ impl ConvertToDeltaBuilder {
/// Options may be passed in the HashMap or set as environment variables.
///
/// [crate::table::builder::s3_storage_options] describes the available options for the AWS or S3-compliant backend.
/// [dynamodb_lock::DynamoDbLockClient] describes additional options for the AWS atomic rename client.
///
/// If an object store is also passed using `with_log_store()`, these options will be ignored.
pub fn with_storage_options(mut self, storage_options: HashMap<String, String>) -> Self {
self.storage_options = Some(storage_options);
Expand Down
2 changes: 0 additions & 2 deletions crates/deltalake-core/src/operations/create.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,8 +148,6 @@ impl CreateBuilder {
/// Options may be passed in the HashMap or set as environment variables.
///
/// [crate::table::builder::s3_storage_options] describes the available options for the AWS or S3-compliant backend.
/// [dynamodb_lock::DynamoDbLockClient] describes additional options for the AWS atomic rename client.
///
/// If an object store is also passed using `with_object_store()` these options will be ignored.
pub fn with_storage_options(mut self, storage_options: HashMap<String, String>) -> Self {
self.storage_options = Some(storage_options);
Expand Down
16 changes: 12 additions & 4 deletions crates/deltalake-core/src/storage/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -296,16 +296,24 @@ pub fn configure_store(
ObjectStoreScheme::AmazonS3 => {
options.with_env_s3();
let (store, prefix) = parse_url_opts(url, options.as_s3_options())?;
let s3_options = S3StorageOptions::from_map(&options.0);
if options
.as_s3_options()
.contains_key(&AmazonS3ConfigKey::CopyIfNotExists)
{
url_prefix_handler(store, prefix)
} else if Some("dynamodb".to_owned())
== s3_options
.locking_provider
.as_ref()
.map(|v| v.to_lowercase())
{
// if a lock client is requested, unsafe rename is always safe
let store = S3StorageBackend::try_new(Arc::new(store), true)?;
url_prefix_handler(store, prefix)
} else {
let store = S3StorageBackend::try_new(
Arc::new(store),
S3StorageOptions::from_map(&options.0),
)?;
let store =
S3StorageBackend::try_new(Arc::new(store), s3_options.allow_unsafe_rename)?;
url_prefix_handler(store, prefix)
}
}
Expand Down
Loading

0 comments on commit f87afce

Please sign in to comment.