Skip to content

Commit

Permalink
Add configurable max elapsed time
Browse files Browse the repository at this point in the history
  • Loading branch information
dispanser committed Dec 27, 2023
1 parent 2c45ffe commit 82eeaca
Show file tree
Hide file tree
Showing 5 changed files with 133 additions and 70 deletions.
9 changes: 9 additions & 0 deletions crates/deltalake-aws/src/errors.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
//! Errors for S3 log store backed by DynamoDb
use std::num::ParseIntError;

use rusoto_core::RusotoError;
use rusoto_dynamodb::{CreateTableError, GetItemError, PutItemError, QueryError, UpdateItemError};

Expand All @@ -24,6 +26,13 @@ pub enum DynamoDbConfigError {
/// Billing mode string invalid
#[error("Invalid billing mode : {0}, supported values : ['provided', 'pay_per_request']")]
InvalidBillingMode(String),

/// Cannot parse max_elapsed_request_time value into u64
#[error("Cannot parse max elapsed request time into u64: {source}")]
ParseMaxElapsedRequestTime {
// config_value: String,
source: ParseIntError,
},
}

/// Errors produced by `DynamoDbLockClient`
Expand Down
129 changes: 72 additions & 57 deletions crates/deltalake-aws/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,32 +65,37 @@ pub struct DynamoDbLockClient {
impl DynamoDbLockClient {
/// Creates a new DynamoDbLockClient from the supplied storage options.
pub fn try_new(
lock_table_name: Option<&String>,
billing_mode: Option<&String>,
lock_table_name: Option<String>,
billing_mode: Option<String>,
max_elapsed_request_time: Option<String>,
region: Region,
use_web_identity: bool,
) -> Result<Self, DynamoDbConfigError> {
let dynamodb_client = create_dynamodb_client(region.clone(), use_web_identity)?;
let lock_table_name = lock_table_name.map_or_else(
|| {
std::env::var(constants::LOCK_TABLE_KEY_NAME)
.unwrap_or(constants::DEFAULT_LOCK_TABLE_NAME.to_owned())
},
Clone::clone,
);

let billing_mode: BillingMode = billing_mode.map_or_else(
|| {
std::env::var(constants::BILLING_MODE_KEY_NAME).map_or_else(
|_| Ok(BillingMode::PayPerRequest),
|bm| BillingMode::from_str(&bm),
)
},
|bm| BillingMode::from_str(bm),
)?;
let lock_table_name = lock_table_name
.or_else(|| std::env::var(constants::LOCK_TABLE_KEY_NAME).ok())
.unwrap_or(constants::DEFAULT_LOCK_TABLE_NAME.to_owned());

let billing_mode = billing_mode
.or_else(|| std::env::var(constants::BILLING_MODE_KEY_NAME).ok())
.map_or_else(
|| Ok(BillingMode::PayPerRequest),
|bm| BillingMode::from_str(&bm),
)?;

let max_elapsed_request_time = max_elapsed_request_time
.or_else(|| std::env::var(constants::MAX_ELAPSED_REQUEST_TIME_KEY_NAME).ok())
.map_or_else(
|| Ok(Duration::from_secs(60)),
|secs| u64::from_str(&secs).map(Duration::from_secs),
)
.map_err(|err| DynamoDbConfigError::ParseMaxElapsedRequestTime { source: err })?;

let config = DynamoDbConfig {
billing_mode,
lock_table_name,
max_elapsed_request_time,
use_web_identity,
region,
};
Expand Down Expand Up @@ -150,6 +155,10 @@ impl DynamoDbLockClient {
self.config.lock_table_name.clone()
}

pub fn get_dynamodb_config(&self) -> &DynamoDbConfig {
&self.config
}

fn get_primary_key(&self, version: i64, table_path: &str) -> HashMap<String, AttributeValue> {
maplit::hashmap! {
constants::ATTR_TABLE_PATH.to_owned() => string_attr(table_path),
Expand All @@ -169,16 +178,19 @@ impl DynamoDbLockClient {
key: self.get_primary_key(version, table_path),
..Default::default()
};
let item = retry(|| async {
match self.dynamodb_client.get_item(input.clone()).await {
Ok(x) => Ok(x),
Err(RusotoError::Service(GetItemError::ProvisionedThroughputExceeded(_))) => Err(
backoff::Error::transient(LockClientError::ProvisionedThroughputExceeded),
),
Err(err) => Err(backoff::Error::permanent(err.into())),
}
})
.await?;
let item = self
.retry(|| async {
match self.dynamodb_client.get_item(input.clone()).await {
Ok(x) => Ok(x),
Err(RusotoError::Service(GetItemError::ProvisionedThroughputExceeded(_))) => {
Err(backoff::Error::transient(
LockClientError::ProvisionedThroughputExceeded,
))
}
Err(err) => Err(backoff::Error::permanent(err.into())),
}
})
.await?;
item.item.as_ref().map(CommitEntry::try_from).transpose()
}

Expand All @@ -195,7 +207,7 @@ impl DynamoDbLockClient {
item,
..Default::default()
};
retry(|| async {
self.retry(|| async {
match self.dynamodb_client.put_item(input.clone()).await {
Ok(_) => Ok(()),
Err(RusotoError::Service(PutItemError::ProvisionedThroughputExceeded(_))) => Err(
Expand Down Expand Up @@ -245,16 +257,17 @@ impl DynamoDbLockClient {
),
..Default::default()
};
let query_result = retry(|| async {
match self.dynamodb_client.query(input.clone()).await {
Ok(result) => Ok(result),
Err(RusotoError::Service(QueryError::ProvisionedThroughputExceeded(_))) => Err(
backoff::Error::transient(LockClientError::ProvisionedThroughputExceeded),
),
Err(err) => Err(backoff::Error::permanent(err.into())),
}
})
.await?;
let query_result = self
.retry(|| async {
match self.dynamodb_client.query(input.clone()).await {
Ok(result) => Ok(result),
Err(RusotoError::Service(QueryError::ProvisionedThroughputExceeded(_))) => Err(
backoff::Error::transient(LockClientError::ProvisionedThroughputExceeded),
),
Err(err) => Err(backoff::Error::permanent(err.into())),
}
})
.await?;

query_result
.items
Expand Down Expand Up @@ -288,7 +301,7 @@ impl DynamoDbLockClient {
..Default::default()
};

retry(|| async {
self.retry(|| async {
match self.dynamodb_client.update_item(input.clone()).await {
Ok(_) => Ok(UpdateLogEntryResult::UpdatePerformed),
Err(RusotoError::Service(UpdateItemError::ConditionalCheckFailed(_))) => {
Expand All @@ -304,19 +317,19 @@ impl DynamoDbLockClient {
})
.await
}
}

async fn retry<I, E, Fn, Fut>(operation: Fn) -> Result<I, E>
where
Fn: FnMut() -> Fut,
Fut: std::future::Future<Output = Result<I, backoff::Error<E>>>,
{
let backoff = backoff::ExponentialBackoffBuilder::new()
.with_multiplier(2.)
.with_max_interval(Duration::from_secs(15))
.with_max_elapsed_time(Some(Duration::from_secs(60)))
.build();
backoff::future::retry(backoff, operation).await
/// Hands `operation` to `backoff::future::retry` under an exponential
/// backoff policy and returns whatever that call resolves to.
///
/// The policy uses a multiplier of 2, caps the interval at 15 seconds,
/// and takes its total elapsed-time budget from
/// `self.config.max_elapsed_request_time`.
async fn retry<I, E, Fn, Fut>(&self, operation: Fn) -> Result<I, E>
where
    Fn: FnMut() -> Fut,
    Fut: std::future::Future<Output = Result<I, backoff::Error<E>>>,
{
    // Name the policy inputs so the builder chain reads as configuration.
    let max_interval = Duration::from_secs(15);
    let time_budget = Some(self.config.max_elapsed_request_time);

    let policy = backoff::ExponentialBackoffBuilder::new()
        .with_multiplier(2.)
        .with_max_interval(max_interval)
        .with_max_elapsed_time(time_budget)
        .build();

    backoff::future::retry(policy, operation).await
}
}

#[derive(Debug, PartialEq)]
Expand Down Expand Up @@ -389,7 +402,7 @@ fn create_value_map(
}

#[derive(Debug, PartialEq)]
enum BillingMode {
pub enum BillingMode {
PayPerRequest,
Provisioned,
}
Expand Down Expand Up @@ -417,10 +430,11 @@ impl FromStr for BillingMode {

#[derive(Debug, PartialEq)]
pub struct DynamoDbConfig {
billing_mode: BillingMode,
lock_table_name: String,
use_web_identity: bool,
region: Region,
pub billing_mode: BillingMode,
pub lock_table_name: String,
pub max_elapsed_request_time: Duration,
pub use_web_identity: bool,
pub region: Region,
}

/// Represents the possible, positive outcomes of calling `DynamoDbClient::try_create_lock_table()`
Expand All @@ -441,6 +455,7 @@ pub mod constants {
pub const DEFAULT_LOCK_TABLE_NAME: &str = "delta_log";
pub const LOCK_TABLE_KEY_NAME: &str = "DELTA_DYNAMO_TABLE_NAME";
pub const BILLING_MODE_KEY_NAME: &str = "DELTA_DYNAMO_BILLING_MODE";
pub const MAX_ELAPSED_REQUEST_TIME_KEY_NAME: &str = "DELTA_DYNAMO_MAX_ELAPSED_REQUEST_TIME";

pub const ATTR_TABLE_PATH: &str = "tablePath";
pub const ATTR_FILE_NAME: &str = "fileName";
Expand Down
15 changes: 13 additions & 2 deletions crates/deltalake-core/src/logstore/s3/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,18 @@ impl S3DynamoDbLogStore {
object_store: ObjectStoreRef,
) -> DeltaResult<Self> {
let lock_client = DynamoDbLockClient::try_new(
s3_options.extra_opts.get(constants::LOCK_TABLE_KEY_NAME),
s3_options.extra_opts.get(constants::BILLING_MODE_KEY_NAME),
s3_options
.extra_opts
.get(constants::LOCK_TABLE_KEY_NAME)
.cloned(),
s3_options
.extra_opts
.get(constants::BILLING_MODE_KEY_NAME)
.cloned(),
s3_options
.extra_opts
.get(constants::MAX_ELAPSED_REQUEST_TIME_KEY_NAME)
.cloned(),
s3_options.region.clone(),
s3_options.use_web_identity,
)
Expand All @@ -50,6 +60,7 @@ impl S3DynamoDbLogStore {
source: err.into(),
},
})?;

let table_path = super::to_uri(&location, &Path::from(""));
Ok(Self {
storage: object_store,
Expand Down
11 changes: 7 additions & 4 deletions crates/deltalake-core/src/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
use crate::storage::utils::copy_table;
use crate::DeltaTableBuilder;
use chrono::Utc;
use deltalake_aws::constants;
use fs_extra::dir::{copy, CopyOptions};
use object_store::DynObjectStore;
use rand::Rng;
Expand Down Expand Up @@ -240,8 +241,8 @@ impl StorageIntegration {
Self::Onelake => Ok(()),
Self::OnelakeAbfs => Ok(()),
Self::Amazon => {
std::env::set_var(
"DELTA_DYNAMO_TABLE_NAME",
set_env_if_not_set(
constants::LOCK_TABLE_KEY_NAME,
format!("delta_log_it_{}", rand::thread_rng().gen::<u16>()),
);
s3_cli::create_bucket(format!("s3://{}", name.as_ref()))?;
Expand Down Expand Up @@ -409,6 +410,8 @@ pub mod az_cli {

/// small wrapper around s3 cli
pub mod s3_cli {
use deltalake_aws::constants;

use super::set_env_if_not_set;
use crate::table::builder::s3_storage_options;
use std::process::{Command, ExitStatus, Stdio};
Expand Down Expand Up @@ -521,7 +524,7 @@ pub mod s3_cli {

pub fn create_lock_table() -> std::io::Result<ExitStatus> {
let table_name =
std::env::var("DELTA_DYNAMO_TABLE_NAME").unwrap_or_else(|_| "delta_log".into());
std::env::var(constants::LOCK_TABLE_KEY_NAME).unwrap_or_else(|_| "delta_log".into());
create_dynamodb_table(
&table_name,
&[
Expand All @@ -546,7 +549,7 @@ pub mod s3_cli {

pub fn delete_lock_table() -> std::io::Result<ExitStatus> {
let table_name =
std::env::var("DELTA_DYNAMO_TABLE_NAME").unwrap_or_else(|_| "delta_log".into());
std::env::var(constants::LOCK_TABLE_KEY_NAME).unwrap_or_else(|_| "delta_log".into());
delete_dynamodb_table(&table_name)
}
}
Expand Down
39 changes: 32 additions & 7 deletions crates/deltalake-core/tests/integration_s3_dynamodb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
use std::collections::HashMap;
use std::time::{Duration, SystemTime, UNIX_EPOCH};

use deltalake_aws::{CommitEntry, DynamoDbLockClient};
use deltalake_aws::{constants, CommitEntry, DynamoDbConfig, DynamoDbLockClient};
use deltalake_core::kernel::{Action, Add, DataType, PrimitiveType, StructField, StructType};
use deltalake_core::logstore::s3::{RepairLogEntryResult, S3DynamoDbLogStore};
use deltalake_core::logstore::LogStore;
Expand Down Expand Up @@ -38,21 +38,46 @@ lazy_static! {
}

fn make_client() -> TestResult<DynamoDbLockClient> {
let options: S3StorageOptions = S3StorageOptions::default();
Ok(DynamoDbLockClient::try_new(
None,
None,
S3_OPTIONS.region.clone(),
None,
options.region.clone(),
false,
)?)
}

#[test]
#[serial]
fn client_config_picks_up_lock_table_name() -> TestResult<()> {
let _context = IntegrationContext::new(StorageIntegration::Amazon)?;
assert!(make_client()?
.get_lock_table_name()
.starts_with("delta_log_it_"));
fn client_configs_via_env_variables() -> TestResult<()> {
    /// Removes the listed environment variables when dropped, so they are
    /// cleared on every exit path: normal return, early `?` return, or an
    /// assertion panic that unwinds through this function.
    struct EnvVarCleanup([&'static str; 3]);

    impl Drop for EnvVarCleanup {
        fn drop(&mut self) {
            for key in self.0.iter() {
                std::env::remove_var(key);
            }
        }
    }

    // Arm the cleanup before setting anything. Declared first, it is dropped
    // last, after `client` and `config` are gone.
    let _cleanup = EnvVarCleanup([
        deltalake_aws::constants::LOCK_TABLE_KEY_NAME,
        deltalake_aws::constants::MAX_ELAPSED_REQUEST_TIME_KEY_NAME,
        deltalake_aws::constants::BILLING_MODE_KEY_NAME,
    ]);

    std::env::set_var(
        deltalake_aws::constants::MAX_ELAPSED_REQUEST_TIME_KEY_NAME,
        "64",
    );
    std::env::set_var(deltalake_aws::constants::LOCK_TABLE_KEY_NAME, "some_table");
    std::env::set_var(
        deltalake_aws::constants::BILLING_MODE_KEY_NAME,
        "PAY_PER_REQUEST",
    );

    // All three optional arguments are passed as `None` by `make_client`, so
    // the values asserted below can only have come from the environment.
    let client = make_client()?;
    let config = client.get_dynamodb_config();
    assert_eq!(
        DynamoDbConfig {
            billing_mode: deltalake_aws::BillingMode::PayPerRequest,
            lock_table_name: "some_table".to_owned(),
            max_elapsed_request_time: Duration::from_secs(64),
            use_web_identity: false,
            // The region is not under test here: it is copied from the
            // actual config so the comparison covers only the other fields.
            region: config.region.clone(),
        },
        *config,
    );
    Ok(())
}

Expand Down

0 comments on commit 82eeaca

Please sign in to comment.