Skip to content

Commit

Permalink
Move lock client into separate crate deltalake-aws.
Browse files Browse the repository at this point in the history
  • Loading branch information
dispanser committed Dec 6, 2023
1 parent ede3540 commit b12aaff
Show file tree
Hide file tree
Showing 6 changed files with 171 additions and 103 deletions.
35 changes: 35 additions & 0 deletions crates/deltalake-aws/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
[package]
name = "deltalake-aws"
version = "0.1.0"
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
rusoto_core = { version = "0.47", default-features = false, optional = true }
rusoto_credential = { version = "0.47", optional = true }
rusoto_sts = { version = "0.47", default-features = false, optional = true }
rusoto_dynamodb = { version = "0.47", default-features = false, optional = true }
object_store = "0.7"
lazy_static = "1"
maplit = "1"
thiserror = { workspace = true }
regex = { workspace = true }

[dev-dependencies]

[features]
native-tls = [
"rusoto_core/native-tls",
"rusoto_credential",
"rusoto_sts/native-tls",
"rusoto_dynamodb/native-tls",
"object_store/aws",
]
rustls = [
"rusoto_core/rustls",
"rusoto_credential",
"rusoto_sts/rustls",
"rusoto_dynamodb/rustls",
"object_store/aws",
]
File renamed without changes.
Original file line number Diff line number Diff line change
@@ -1,12 +1,17 @@
//! Lock client implementation based on DynamoDb.
pub mod errors;

use lazy_static::lazy_static;
use regex::Regex;
use std::{
collections::HashMap,
str::FromStr,
time::{Duration, SystemTime},
};

use object_store::path::Path;
use rusoto_core::{HttpClient, RusotoError};
use rusoto_core::{HttpClient, Region, RusotoError};
use rusoto_credential::AutoRefreshingProvider;
use rusoto_dynamodb::{
AttributeDefinition, AttributeValue, CreateTableError, CreateTableInput, DynamoDb,
Expand All @@ -15,12 +20,39 @@ use rusoto_dynamodb::{
};
use rusoto_sts::WebIdentityProvider;

use crate::{logstore::extract_version_from_filename, storage::s3::S3StorageOptions};
use errors::{DynamoDbConfigError, LockClientError};

use super::{
errors::{DynamoDbConfigError, LockClientError},
CommitEntry, CreateLockTableResult,
};
/// Representation of a log entry stored in DynamoDb.
///
/// The DynamoDb item consists of:
/// - tablePath: String - tracked in the log store implementation
/// - fileName: String - commit version.json (part of primary key), stored as i64 in this struct
/// - tempPath: String - name of temporary file containing commit info
/// - complete: bool - operation completed, i.e. atomic rename from `tempPath` to `fileName` succeeded
/// - expireTime: `Option<SystemTime>` - epoch seconds at which this external commit entry is safe to be deleted
#[derive(Debug, PartialEq)]
pub struct CommitEntry {
/// Commit version, stored as file name (e.g., 00000N.json) in dynamodb (relative to `_delta_log/`)
pub version: i64,
/// Path to temp file for this commit, relative to the `_delta_log` directory.
///
/// NOTE(review): the `TryFrom<&HashMap<String, AttributeValue>>` conversion in this file
/// builds this path by prefixing `_delta_log` to the stored `tempPath` - confirm which
/// base this path is meant to be relative to.
pub temp_path: Path,
/// true if delta json file is successfully copied to its destination location, else false
pub complete: bool,
/// If complete = true, epoch seconds at which this external commit entry is safe to be deleted
pub expire_time: Option<SystemTime>,
}

impl CommitEntry {
/// Create a new log entry for the given version.
/// Initial log entry state is incomplete.
pub fn new(version: i64, temp_path: Path) -> CommitEntry {
Self {
version,
temp_path,
complete: false,
expire_time: None,
}
}
}

/// Lock client backed by DynamoDb.
pub struct DynamoDbLockClient {
Expand All @@ -34,37 +66,39 @@ impl DynamoDbLockClient {
/// Creates a new DynamoDbLockClient from the supplied storage options.
///
/// Options are described in [crate::table::builder::s3_storage_options].
pub fn try_new(options: &S3StorageOptions) -> Result<Self, DynamoDbConfigError> {
let dynamodb_client = create_dynamodb_client(options)?;
let lock_table_name = options
.extra_opts
.get(constants::LOCK_TABLE_KEY_NAME)
.map_or_else(
|| {
std::env::var(constants::LOCK_TABLE_KEY_NAME)
.unwrap_or(constants::DEFAULT_LOCK_TABLE_NAME.to_owned())
},
Clone::clone,
);
pub fn try_new(
lock_table_name: Option<&String>,
billing_mode: Option<&String>,
region: Region,
use_web_identity: bool,
) -> Result<Self, DynamoDbConfigError> {
let dynamodb_client = create_dynamodb_client(region.clone(), use_web_identity)?;
let lock_table_name = lock_table_name.map_or_else(
|| {
std::env::var(constants::LOCK_TABLE_KEY_NAME)
.unwrap_or(constants::DEFAULT_LOCK_TABLE_NAME.to_owned())
},
Clone::clone,
);

let billing_mode: BillingMode = options
.extra_opts
.get(constants::BILLING_MODE_KEY_NAME)
.map_or_else(
|| {
std::env::var(constants::BILLING_MODE_KEY_NAME).map_or_else(
|_| Ok(BillingMode::PayPerRequest),
|bm| BillingMode::from_str(&bm),
)
},
|bm| BillingMode::from_str(bm),
)?;
let billing_mode: BillingMode = billing_mode.map_or_else(
|| {
std::env::var(constants::BILLING_MODE_KEY_NAME).map_or_else(
|_| Ok(BillingMode::PayPerRequest),
|bm| BillingMode::from_str(&bm),
)
},
|bm| BillingMode::from_str(bm),
)?;
let config = DynamoDbConfig {
billing_mode,
lock_table_name,
use_web_identity,
region,
};
Ok(Self {
dynamodb_client,
config: DynamoDbConfig {
lock_table_name,
billing_mode,
},
config,
})
}

Expand Down Expand Up @@ -211,7 +245,7 @@ impl DynamoDbLockClient {
}

/// Update existing log entry
pub(super) async fn update_commit_entry(
pub async fn update_commit_entry(
&self,
version: i64,
table_path: &str,
Expand Down Expand Up @@ -245,7 +279,7 @@ impl DynamoDbLockClient {
}

#[derive(Debug, PartialEq)]
pub(super) enum UpdateLogEntryResult {
pub enum UpdateLogEntryResult {
UpdatePerformed,
AlreadyCompleted,
}
Expand All @@ -263,11 +297,8 @@ impl TryFrom<&HashMap<String, AttributeValue>> for CommitEntry {
}
})?;
let temp_path = extract_required_string_field(item, constants::ATTR_TEMP_PATH)?;
let temp_path = Path::from_iter(
super::DELTA_LOG_PATH
.parts()
.chain(Path::from(temp_path).parts()),
);
let temp_path =
Path::from_iter(DELTA_LOG_PATH.parts().chain(Path::from(temp_path).parts()));
let expire_time: Option<SystemTime> =
extract_optional_number_field(item, constants::ATTR_EXPIRE_TIME)?
.map(|s| {
Expand Down Expand Up @@ -343,13 +374,25 @@ impl FromStr for BillingMode {
}
}

#[derive(Debug)]
struct DynamoDbConfig {
#[derive(Debug, PartialEq)]
pub struct DynamoDbConfig {
billing_mode: BillingMode,
lock_table_name: String,
use_web_identity: bool,
region: Region,
}

mod constants {
/// Represents the possible, positive outcomes of calling `DynamoDbClient::try_create_lock_table()`
///
/// NOTE(review): the lock client type declared in this file is `DynamoDbLockClient`;
/// confirm which type actually defines `try_create_lock_table()` and adjust the
/// reference above if needed.
#[derive(Debug, PartialEq)]
pub enum CreateLockTableResult {
/// Table created successfully.
TableCreated,
/// Table was not created because it already exists.
/// Does not imply that the table has the correct schema.
TableAlreadyExists,
}

pub mod constants {
use std::time::Duration;

use lazy_static::lazy_static;
Expand Down Expand Up @@ -381,18 +424,19 @@ mod constants {
}

fn create_dynamodb_client(
options: &S3StorageOptions,
region: Region,
use_web_identity: bool,
) -> Result<DynamoDbClient, DynamoDbConfigError> {
Ok(match options.use_web_identity {
Ok(match use_web_identity {
true => {
let dispatcher = HttpClient::new()?;
rusoto_dynamodb::DynamoDbClient::new_with(
dispatcher,
get_web_identity_provider()?,
options.region.clone(),
region,
)
}
false => rusoto_dynamodb::DynamoDbClient::new(options.region.clone()),
false => rusoto_dynamodb::DynamoDbClient::new(region),
})
}

Expand Down Expand Up @@ -458,6 +502,18 @@ fn get_web_identity_provider(
Ok(AutoRefreshingProvider::new(provider)?)
}

lazy_static! {
// Object store path of the log directory, `_delta_log`.
static ref DELTA_LOG_PATH: Path = Path::from("_delta_log");
// Matches exactly 20 consecutive digits followed by `.json` or `.checkpoint` and any
// trailing characters. The pattern is not anchored at the start, so it may match in the
// middle of a longer name. Capture group 1 holds the 20 digits.
// The `unwrap()` only fails if this constant pattern does not compile.
static ref DELTA_LOG_REGEX: Regex = Regex::new(r"(\d{20})\.(json|checkpoint).*$").unwrap();
}

/// Extract version from a file name in the delta log.
///
/// Returns `Some(version)` when `name` matches `DELTA_LOG_REGEX` and the captured
/// digits fit into an `i64`. Returns `None` when the name does not match, or when
/// the captured number is too large to be represented as `i64`.
fn extract_version_from_filename(name: &str) -> Option<i64> {
    DELTA_LOG_REGEX
        .captures(name)
        // Group 1 is 20 decimal digits, but `i64::MAX` only has 19 digits, so a
        // name such as `99999999999999999999.json` matches the pattern and still
        // fails to parse. Treat that as "no version" instead of panicking.
        .and_then(|captures| captures.get(1)?.as_str().parse::<i64>().ok())
}

#[cfg(test)]
mod tests {

Expand Down
3 changes: 3 additions & 0 deletions crates/deltalake-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ rusoto_core = { version = "0.47", default-features = false, optional = true }
rusoto_credential = { version = "0.47", optional = true }
rusoto_sts = { version = "0.47", default-features = false, optional = true }
rusoto_dynamodb = { version = "0.47", default-features = false, optional = true }
deltalake-aws = { path = "../deltalake-aws", default-features = false, optional = true }

# Glue
rusoto_glue = { version = "0.47", default-features = false, optional = true }
Expand Down Expand Up @@ -175,6 +176,7 @@ s3-native-tls = [
"rusoto_dynamodb/native-tls",
"dynamodb_lock/native-tls",
"object_store/aws",
"deltalake-aws/native-tls",
]
s3 = [
"rusoto_core/rustls",
Expand All @@ -183,6 +185,7 @@ s3 = [
"rusoto_dynamodb/rustls",
"dynamodb_lock/rustls",
"object_store/aws",
"deltalake-aws/rustls",
]
unity-experimental = ["reqwest", "tracing", "hyper"]

Expand Down
61 changes: 14 additions & 47 deletions crates/deltalake-core/src/logstore/s3/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,24 +3,18 @@
//! when the underlying object storage does not support atomic `put_if_absent`
//! or `rename_if_absent` operations, as is the case for S3.
mod errors;
pub mod lock_client;

use std::time::SystemTime;
use deltalake_aws::errors::LockClientError;
use deltalake_aws::{constants, CommitEntry, DynamoDbLockClient, UpdateLogEntryResult};

use bytes::Bytes;
use object_store::path::Path;
use url::Url;

use crate::logstore::DELTA_LOG_PATH;
use crate::{
operations::transaction::TransactionError,
storage::{config::StorageOptions, s3::S3StorageOptions, ObjectStoreRef},
DeltaResult, DeltaTableError,
};
use errors::LockClientError;

use self::lock_client::{DynamoDbLockClient, UpdateLogEntryResult};

use super::{LogStore, LogStoreConfig};

Expand All @@ -43,13 +37,17 @@ impl S3DynamoDbLogStore {
s3_options: &S3StorageOptions,
object_store: ObjectStoreRef,
) -> DeltaResult<Self> {
let lock_client = DynamoDbLockClient::try_new(s3_options).map_err(|err| {
DeltaTableError::ObjectStore {
source: object_store::Error::Generic {
store: STORE_NAME,
source: err.into(),
},
}
let lock_client = DynamoDbLockClient::try_new(
s3_options.extra_opts.get(constants::LOCK_TABLE_KEY_NAME),
s3_options.extra_opts.get(constants::BILLING_MODE_KEY_NAME),
s3_options.region.clone(),
s3_options.use_web_identity,
)
.map_err(|err| DeltaTableError::ObjectStore {
source: object_store::Error::Generic {
store: STORE_NAME,
source: err.into(),
},
})?;
let table_path = super::to_uri(&location, &Path::from(""));
Ok(Self {
Expand Down Expand Up @@ -204,6 +202,7 @@ impl LogStore for S3DynamoDbLogStore {
.map_err(|err| DeltaTableError::GenericError {
source: Box::new(err),
})?;
println!("twh; fetched entry for {current_version}: {:?}", entry);
// when there is a latest entry in DynamoDb, we can avoid the file listing in S3.
if let Some(entry) = entry {
self.repair_entry(&entry).await?;
Expand Down Expand Up @@ -231,38 +230,6 @@ impl LogStore for S3DynamoDbLogStore {
}
}

/// Representation of a log entry stored in DynamoDb.
///
/// The DynamoDb item consists of:
/// - tablePath: String - tracked in the log store implementation
/// - fileName: String - commit version.json (part of primary key), stored as i64 in this struct
/// - tempPath: String - name of temporary file containing commit info
/// - complete: bool - operation completed, i.e. atomic rename from `tempPath` to `fileName` succeeded
/// - expireTime: `Option<SystemTime>` - epoch seconds at which this external commit entry is safe to be deleted
#[derive(Debug, PartialEq)]
pub struct CommitEntry {
/// Commit version, stored as file name (e.g., 00000N.json) in dynamodb (relative to `_delta_log/`)
pub version: i64,
/// Path to temp file for this commit, relative to the `_delta_log` directory.
pub temp_path: Path,
/// true if delta json file is successfully copied to its destination location, else false
pub complete: bool,
/// If complete = true, epoch seconds at which this external commit entry is safe to be deleted
pub expire_time: Option<SystemTime>,
}

impl CommitEntry {
/// Create a new log entry for the given version.
/// Initial log entry state is incomplete.
pub fn new(version: i64, temp_path: Path) -> CommitEntry {
Self {
version,
temp_path,
complete: false,
expire_time: None,
}
}
}

/// Represents the possible outcomes of calling `DynamoDbLockClient::repair_entry()`.
#[derive(Debug, PartialEq)]
pub enum RepairLogEntryResult {
Expand Down
Loading

0 comments on commit b12aaff

Please sign in to comment.