diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index 93b3cbdc3e..ff650eff98 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -7,6 +7,9 @@ on: branches: [main, "rust-v*"] merge_group: +env: + DEFAULT_FEATURES: "azure,datafusion,s3,gcs,glue,hdfs " + jobs: format: runs-on: ubuntu-latest @@ -17,12 +20,37 @@ jobs: uses: actions-rs/toolchain@v1 with: profile: default - toolchain: stable + toolchain: '1.80' override: true - name: Format run: cargo fmt -- --check + coverage: + runs-on: ubuntu-latest + env: + CARGO_TERM_COLOR: always + steps: + - uses: actions/checkout@v4 + - name: Install rust + uses: actions-rs/toolchain@v1 + with: + profile: default + toolchain: '1.80' + override: true + - name: Install cargo-llvm-cov + uses: taiki-e/install-action@cargo-llvm-cov + - uses: Swatinem/rust-cache@v2 + - name: Generate code coverage + run: cargo llvm-cov --features ${DEFAULT_FEATURES} --workspace --codecov --output-path codecov.json -- --skip read_table_version_hdfs + - name: Upload coverage to Codecov + uses: codecov/codecov-action@v4 + with: + files: codecov.json + fail_ci_if_error: true + env: + CODECOV_TOKEN: ${{ secrets.CODECOV_TOKEN }} + build: strategy: fail-fast: false @@ -39,17 +67,17 @@ jobs: uses: actions-rs/toolchain@v1 with: profile: default - toolchain: stable + toolchain: '1.80' override: true - name: build and lint with clippy - run: cargo clippy --features azure,datafusion,s3,gcs,glue,hdfs --tests + run: cargo clippy --features ${{ env.DEFAULT_FEATURES }} --tests - name: Spot-check build for native-tls features run: cargo clippy --no-default-features --features azure,datafusion,s3-native-tls,gcs,glue --tests - name: Check docs - run: cargo doc --features azure,datafusion,s3,gcs,glue,hdfs + run: cargo doc --features ${{ env.DEFAULT_FEATURES }} - name: Check no default features (except rustls) run: cargo check --no-default-features --features rustls @@ -76,11 +104,11 @@ jobs: uses: actions-rs/toolchain@v1 with: profile: default - toolchain: "stable" + toolchain: '1.80' override: true - name: Run tests - run: cargo test --verbose --features datafusion,azure + run: cargo test --verbose --features ${{ env.DEFAULT_FEATURES }} integration_test: name: Integration Tests @@ -110,7 +138,7 @@ jobs: uses: actions-rs/toolchain@v1 with: profile: default - toolchain: stable + toolchain: '1.80' override: true # Install Java and Hadoop for HDFS integration tests @@ -130,7 +158,7 @@ jobs: - name: Run tests with rustls (default) run: | - cargo test --features integration_test,azure,s3,gcs,datafusion,hdfs + cargo test --features integration_test,${{ env.DEFAULT_FEATURES }} - name: Run tests with native-tls run: | diff --git a/Cargo.toml b/Cargo.toml index e8cb698318..74a4c73597 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -5,7 +5,7 @@ resolver = "2" [workspace.package] authors = ["Qingping Hou "] -rust-version = "1.75" +rust-version = "1.80" keywords = ["deltalake", "delta", "datalake"] readme = "README.md" edition = "2021" diff --git a/crates/aws/Cargo.toml b/crates/aws/Cargo.toml index c47065dce4..e79d92a3d2 100644 --- a/crates/aws/Cargo.toml +++ b/crates/aws/Cargo.toml @@ -33,7 +33,7 @@ tokio = { workspace = true } regex = { workspace = true } uuid = { workspace = true, features = ["serde", "v4"] } url = { workspace = true } -backoff = { version = "0.4", features = [ "tokio" ] } +backon = { version = "1",default-features = false, features = [ "tokio-sleep" ] } hyper-tls = { version = "0.5", optional = true } [dev-dependencies] diff --git 
a/crates/aws/src/lib.rs b/crates/aws/src/lib.rs index 720a1e6a07..187462cb12 100644 --- a/crates/aws/src/lib.rs +++ b/crates/aws/src/lib.rs @@ -7,6 +7,7 @@ pub mod logstore; mod native; pub mod storage; use aws_config::SdkConfig; +use aws_sdk_dynamodb::error::SdkError; use aws_sdk_dynamodb::{ operation::{ create_table::CreateTableError, delete_item::DeleteItemError, get_item::GetItemError, @@ -283,28 +284,28 @@ impl DynamoDbLockClient { version: i64, ) -> Result, LockClientError> { let item = self - .retry(|| async { - match self - .dynamodb_client - .get_item() - .consistent_read(true) - .table_name(&self.config.lock_table_name) - .set_key(Some(self.get_primary_key(version, table_path))) - .send() - .await - { - Ok(x) => Ok(x), - Err(sdk_err) => match sdk_err.as_service_error() { - Some(GetItemError::ProvisionedThroughputExceededException(_)) => { - Err(backoff::Error::transient( - LockClientError::ProvisionedThroughputExceeded, - )) - } - _ => Err(backoff::Error::permanent(sdk_err.into())), - }, + .retry( + || async { + self.dynamodb_client + .get_item() + .consistent_read(true) + .table_name(&self.config.lock_table_name) + .set_key(Some(self.get_primary_key(version, table_path))) + .send() + .await + }, + |err| match err.as_service_error() { + Some(GetItemError::ProvisionedThroughputExceededException(_)) => true, + _ => false, + }, + ) + .await + .map_err(|err| match err.as_service_error() { + Some(GetItemError::ProvisionedThroughputExceededException(_)) => { + LockClientError::ProvisionedThroughputExceeded } - }) - .await?; + _ => err.into(), + })?; item.item.as_ref().map(CommitEntry::try_from).transpose() } @@ -314,36 +315,38 @@ impl DynamoDbLockClient { table_path: &str, entry: &CommitEntry, ) -> Result<(), LockClientError> { - self.retry(|| async { - let item = create_value_map(entry, table_path); - match self - .dynamodb_client - .put_item() - .condition_expression(constants::CONDITION_EXPR_CREATE.as_str()) - .table_name(self.get_lock_table_name()) - .set_item(Some(item)) - .send() - .await - { - Ok(_) => Ok(()), - Err(err) => match err.as_service_error() { - Some(PutItemError::ProvisionedThroughputExceededException(_)) => Err( - backoff::Error::transient(LockClientError::ProvisionedThroughputExceeded), - ), - Some(PutItemError::ConditionalCheckFailedException(_)) => Err( - backoff::Error::permanent(LockClientError::VersionAlreadyExists { - table_path: table_path.to_owned(), - version: entry.version, - }), - ), - Some(PutItemError::ResourceNotFoundException(_)) => Err( - backoff::Error::permanent(LockClientError::LockTableNotFound), - ), - _ => Err(backoff::Error::permanent(err.into())), - }, + self.retry( + || async { + let item = create_value_map(entry, table_path); + let _ = self + .dynamodb_client + .put_item() + .condition_expression(constants::CONDITION_EXPR_CREATE.as_str()) + .table_name(self.get_lock_table_name()) + .set_item(Some(item)) + .send() + .await?; + Ok(()) + }, + |err: &SdkError<_, _>| match err.as_service_error() { + Some(PutItemError::ProvisionedThroughputExceededException(_)) => true, + _ => false, + }, + ) + .await + .map_err(|err| match err.as_service_error() { + Some(PutItemError::ProvisionedThroughputExceededException(_)) => { + LockClientError::ProvisionedThroughputExceeded } + Some(PutItemError::ConditionalCheckFailedException(_)) => { + LockClientError::VersionAlreadyExists { + table_path: table_path.to_owned(), + version: entry.version, + } + } + Some(PutItemError::ResourceNotFoundException(_)) => LockClientError::LockTableNotFound, + _ => err.into(), 
}) - .await } /// Get the latest entry (entry with highest version). @@ -365,33 +368,33 @@ impl DynamoDbLockClient { limit: i64, ) -> Result, LockClientError> { let query_result = self - .retry(|| async { - match self - .dynamodb_client - .query() - .table_name(self.get_lock_table_name()) - .consistent_read(true) - .limit(limit.try_into().unwrap_or(i32::MAX)) - .scan_index_forward(false) - .key_condition_expression(format!("{} = :tn", constants::ATTR_TABLE_PATH)) - .set_expression_attribute_values(Some( - maplit::hashmap!(":tn".into() => string_attr(table_path)), - )) - .send() - .await - { - Ok(result) => Ok(result), - Err(sdk_err) => match sdk_err.as_service_error() { - Some(QueryError::ProvisionedThroughputExceededException(_)) => { - Err(backoff::Error::transient( - LockClientError::ProvisionedThroughputExceeded, - )) - } - _ => Err(backoff::Error::permanent(sdk_err.into())), - }, + .retry( + || async { + self.dynamodb_client + .query() + .table_name(self.get_lock_table_name()) + .consistent_read(true) + .limit(limit.try_into().unwrap_or(i32::MAX)) + .scan_index_forward(false) + .key_condition_expression(format!("{} = :tn", constants::ATTR_TABLE_PATH)) + .set_expression_attribute_values(Some( + maplit::hashmap!(":tn".into() => string_attr(table_path)), + )) + .send() + .await + }, + |err: &SdkError<_, _>| match err.as_service_error() { + Some(QueryError::ProvisionedThroughputExceededException(_)) => true, + _ => false, + }, + ) + .await + .map_err(|err| match err.as_service_error() { + Some(QueryError::ProvisionedThroughputExceededException(_)) => { + LockClientError::ProvisionedThroughputExceeded } - }) - .await?; + _ => err.into(), + })?; query_result .items @@ -412,35 +415,44 @@ impl DynamoDbLockClient { .duration_since(SystemTime::UNIX_EPOCH) .unwrap() .as_secs(); - self.retry(|| async { - match self - .dynamodb_client - .update_item() - .table_name(self.get_lock_table_name()) - .set_key(Some(self.get_primary_key(version, table_path))) - .update_expression("SET complete = :c, expireTime = :e".to_owned()) - .set_expression_attribute_values(Some(maplit::hashmap! { - ":c".to_owned() => string_attr("true"), - ":e".to_owned() => num_attr(seconds_since_epoch), - ":f".into() => string_attr("false"), - })) - .condition_expression(constants::CONDITION_UPDATE_INCOMPLETE) - .send() - .await - { - Ok(_) => Ok(UpdateLogEntryResult::UpdatePerformed), - Err(err) => match err.as_service_error() { - Some(UpdateItemError::ProvisionedThroughputExceededException(_)) => Err( - backoff::Error::transient(LockClientError::ProvisionedThroughputExceeded), - ), - Some(UpdateItemError::ConditionalCheckFailedException(_)) => { - Ok(UpdateLogEntryResult::AlreadyCompleted) - } - _ => Err(backoff::Error::permanent(err.into())), + let res = self + .retry( + || async { + let _ = self + .dynamodb_client + .update_item() + .table_name(self.get_lock_table_name()) + .set_key(Some(self.get_primary_key(version, table_path))) + .update_expression("SET complete = :c, expireTime = :e".to_owned()) + .set_expression_attribute_values(Some(maplit::hashmap! 
{ + ":c".to_owned() => string_attr("true"), + ":e".to_owned() => num_attr(seconds_since_epoch), + ":f".into() => string_attr("false"), + })) + .condition_expression(constants::CONDITION_UPDATE_INCOMPLETE) + .send() + .await?; + Ok(()) }, - } - }) - .await + |err: &SdkError<_, _>| match err.as_service_error() { + Some(UpdateItemError::ProvisionedThroughputExceededException(_)) => true, + _ => false, + }, + ) + .await; + + match res { + Ok(()) => Ok(UpdateLogEntryResult::UpdatePerformed), + Err(err) => match err.as_service_error() { + Some(UpdateItemError::ProvisionedThroughputExceededException(_)) => { + Err(LockClientError::ProvisionedThroughputExceeded) + } + Some(UpdateItemError::ConditionalCheckFailedException(_)) => { + Ok(UpdateLogEntryResult::AlreadyCompleted) + } + _ => Err(err.into()), + }, + } } /// Delete existing log entry if it is not already complete @@ -449,48 +461,52 @@ impl DynamoDbLockClient { version: i64, table_path: &str, ) -> Result<(), LockClientError> { - self.retry(|| async { - match self - .dynamodb_client - .delete_item() - .table_name(self.get_lock_table_name()) - .set_key(Some(self.get_primary_key(version, table_path))) - .set_expression_attribute_values(Some(maplit::hashmap! { - ":f".into() => string_attr("false"), - })) - .condition_expression(constants::CONDITION_DELETE_INCOMPLETE.as_str()) - .send() - .await - { - Ok(_) => Ok(()), - Err(err) => match err.as_service_error() { - Some(DeleteItemError::ProvisionedThroughputExceededException(_)) => Err( - backoff::Error::transient(LockClientError::ProvisionedThroughputExceeded), - ), - Some(DeleteItemError::ConditionalCheckFailedException(_)) => Err( - backoff::Error::permanent(LockClientError::VersionAlreadyCompleted { - table_path: table_path.to_owned(), - version, - }), - ), - _ => Err(backoff::Error::permanent(err.into())), - }, + self.retry( + || async { + let _ = self + .dynamodb_client + .delete_item() + .table_name(self.get_lock_table_name()) + .set_key(Some(self.get_primary_key(version, table_path))) + .set_expression_attribute_values(Some(maplit::hashmap! { + ":f".into() => string_attr("false"), + })) + .condition_expression(constants::CONDITION_DELETE_INCOMPLETE.as_str()) + .send() + .await?; + Ok(()) + }, + |err: &SdkError<_, _>| match err.as_service_error() { + Some(DeleteItemError::ProvisionedThroughputExceededException(_)) => true, + _ => false, + }, + ) + .await + .map_err(|err| match err.as_service_error() { + Some(DeleteItemError::ProvisionedThroughputExceededException(_)) => { + LockClientError::ProvisionedThroughputExceeded + } + Some(DeleteItemError::ConditionalCheckFailedException(_)) => { + LockClientError::VersionAlreadyCompleted { + table_path: table_path.to_owned(), + version, + } } + _ => err.into(), }) - .await } - async fn retry(&self, operation: Fn) -> Result + async fn retry(&self, operation: F, when: Wn) -> Result where - Fn: FnMut() -> Fut, - Fut: std::future::Future>>, + F: FnMut() -> Fut, + Fut: std::future::Future>, + Wn: Fn(&E) -> bool, { - let backoff = backoff::ExponentialBackoffBuilder::new() - .with_multiplier(2.) - .with_max_interval(Duration::from_secs(15)) - .with_max_elapsed_time(Some(self.config.max_elapsed_request_time)) - .build(); - backoff::future::retry(backoff, operation).await + use backon::Retryable; + let backoff = backon::ExponentialBuilder::default() + .with_factor(2.) 
+ .with_max_delay(self.config.max_elapsed_request_time); + operation.retry(backoff).when(when).await } } diff --git a/crates/core/src/operations/mod.rs b/crates/core/src/operations/mod.rs index 515143f088..676098c832 100644 --- a/crates/core/src/operations/mod.rs +++ b/crates/core/src/operations/mod.rs @@ -283,6 +283,22 @@ pub fn get_num_idx_cols_and_stats_columns( ) } +/// Get the target_file_size from the table configuration in the sates +/// If table_config does not exist (only can occur in the first write action) it takes +/// the configuration that was passed to the writerBuilder. +pub(crate) fn get_target_file_size( + config: &Option>, + configuration: &HashMap>, +) -> i64 { + match &config { + Some(conf) => conf.target_file_size(), + _ => configuration + .get("delta.targetFileSize") + .and_then(|v| v.clone().map(|v| v.parse::().unwrap())) + .unwrap_or(crate::table::config::DEFAULT_TARGET_FILE_SIZE), + } +} + #[cfg(feature = "datafusion")] mod datafusion_utils { use datafusion::execution::context::SessionState; diff --git a/crates/core/src/operations/write.rs b/crates/core/src/operations/write.rs index 00be7be869..48fc2df368 100644 --- a/crates/core/src/operations/write.rs +++ b/crates/core/src/operations/write.rs @@ -27,7 +27,7 @@ use std::collections::HashMap; use std::str::FromStr; use std::sync::Arc; -use std::time::{SystemTime, UNIX_EPOCH}; +use std::time::{Instant, SystemTime, UNIX_EPOCH}; use std::vec; use arrow_array::RecordBatch; @@ -46,6 +46,7 @@ use futures::future::BoxFuture; use futures::StreamExt; use object_store::prefix::PrefixStore; use parquet::file::properties::WriterProperties; +use serde::{Deserialize, Serialize}; use tracing::log::*; use super::cdc::should_write_cdc; @@ -60,7 +61,9 @@ use crate::delta_datafusion::{ }; use crate::delta_datafusion::{DataFusionMixins, DeltaDataChecker}; use crate::errors::{DeltaResult, DeltaTableError}; -use crate::kernel::{Action, Add, AddCDCFile, Metadata, PartitionsExt, Remove, StructType}; +use crate::kernel::{ + Action, ActionType, Add, AddCDCFile, Metadata, PartitionsExt, Remove, StructType, +}; use crate::logstore::LogStoreRef; use crate::operations::cast::{cast_record_batch, merge_schema::merge_arrow_schema}; use crate::protocol::{DeltaOperation, SaveMode}; @@ -162,6 +165,21 @@ pub struct WriteBuilder { configuration: HashMap>, } +#[derive(Default, Debug, Serialize, Deserialize)] +/// Metrics for the Write Operation +pub struct WriteMetrics { + /// Number of files added + pub num_added_files: usize, + /// Number of files removed + pub num_removed_files: usize, + /// Number of partitions + pub num_partitions: usize, + /// Number of rows added + pub num_added_rows: usize, + /// Time taken to execute the entire operation + pub execution_time_ms: u64, +} + impl super::Operation<()> for WriteBuilder {} impl WriteBuilder { @@ -398,7 +416,6 @@ async fn write_execution_plan_with_predicate( } _ => checker, }; - // Write data to disk let mut tasks = vec![]; for i in 0..plan.properties().output_partitioning().partition_count() { @@ -766,6 +783,9 @@ impl std::future::IntoFuture for WriteBuilder { let this = self; Box::pin(async move { + let mut metrics = WriteMetrics::default(); + let exec_start = Instant::now(); + if this.mode == SaveMode::Overwrite { if let Some(snapshot) = &this.snapshot { PROTOCOL.check_append_only(&snapshot.snapshot)?; @@ -854,6 +874,8 @@ impl std::future::IntoFuture for WriteBuilder { let data = if !partition_columns.is_empty() { // TODO partitioning should probably happen in its own plan ... 
let mut partitions: HashMap> = HashMap::new(); + let mut num_partitions = 0; + let mut num_added_rows = 0; for batch in batches { let real_batch = match new_schema.clone() { Some(new_schema) => cast_record_batch( @@ -870,7 +892,9 @@ impl std::future::IntoFuture for WriteBuilder { partition_columns.clone(), &real_batch, )?; + num_partitions += divided.len(); for part in divided { + num_added_rows += part.record_batch.num_rows(); let key = part.partition_values.hive_partition_path(); match partitions.get_mut(&key) { Some(part_batches) => { @@ -882,11 +906,14 @@ impl std::future::IntoFuture for WriteBuilder { } } } + metrics.num_partitions = num_partitions; + metrics.num_added_rows = num_added_rows; partitions.into_values().collect::>() } else { match new_schema { Some(ref new_schema) => { let mut new_batches = vec![]; + let mut num_added_rows = 0; for batch in batches { new_batches.push(cast_record_batch( &batch, @@ -894,10 +921,15 @@ impl std::future::IntoFuture for WriteBuilder { this.safe_cast, schema_drift, // Schema drifted so we have to add the missing columns/structfields. )?); + num_added_rows += batch.num_rows(); } + metrics.num_added_rows = num_added_rows; vec![new_batches] } - None => vec![batches], + None => { + metrics.num_added_rows = batches.iter().map(|b| b.num_rows()).sum(); + vec![batches] + } } }; @@ -977,6 +1009,9 @@ impl std::future::IntoFuture for WriteBuilder { .as_ref() .map(|snapshot| snapshot.table_config()); + let target_file_size = this.target_file_size.or_else(|| { + Some(super::get_target_file_size(&config, &this.configuration) as usize) + }); let (num_indexed_cols, stats_columns) = super::get_num_idx_cols_and_stats_columns(config, this.configuration); @@ -984,6 +1019,7 @@ impl std::future::IntoFuture for WriteBuilder { num_indexed_cols, stats_columns, }; + // Here we need to validate if the new data conforms to a predicate if one is provided let add_actions = write_execution_plan_with_predicate( predicate.clone(), @@ -992,13 +1028,14 @@ impl std::future::IntoFuture for WriteBuilder { plan.clone(), partition_columns.clone(), this.log_store.object_store().clone(), - this.target_file_size, + target_file_size, this.write_batch_size, this.writer_properties.clone(), writer_stats_config.clone(), None, ) .await?; + metrics.num_added_files = add_actions.len(); actions.extend(add_actions); // Collect remove actions if we are overwriting the table @@ -1074,8 +1111,15 @@ impl std::future::IntoFuture for WriteBuilder { } }; } + metrics.num_removed_files = actions + .iter() + .filter(|a| a.action_type() == ActionType::Remove) + .count(); } + metrics.execution_time_ms = + Instant::now().duration_since(exec_start).as_millis() as u64; + let operation = DeltaOperation::Write { mode: this.mode, partition_by: if !partition_columns.is_empty() { @@ -1086,7 +1130,13 @@ impl std::future::IntoFuture for WriteBuilder { predicate: predicate_str, }; - let commit = CommitBuilder::from(this.commit_properties) + let mut commit_properties = this.commit_properties.clone(); + commit_properties.app_metadata.insert( + "operationMetrics".to_owned(), + serde_json::to_value(&metrics)?, + ); + + let commit = CommitBuilder::from(commit_properties) .with_actions(actions) .build( this.snapshot.as_ref().map(|f| f as &dyn TableReference), @@ -1186,12 +1236,33 @@ mod tests { use itertools::Itertools; use serde_json::{json, Value}; + async fn get_write_metrics(table: DeltaTable) -> WriteMetrics { + let mut commit_info = table.history(Some(1)).await.unwrap(); + let metrics = commit_info + .first_mut() + 
.unwrap() + .info + .remove("operationMetrics") + .unwrap(); + serde_json::from_value(metrics).unwrap() + } + + fn assert_common_write_metrics(write_metrics: WriteMetrics) { + assert!(write_metrics.execution_time_ms > 0); + assert!(write_metrics.num_added_files > 0); + } + #[tokio::test] async fn test_write_when_delta_table_is_append_only() { let table = setup_table_with_configuration(DeltaConfigKey::AppendOnly, Some("true")).await; let batch = get_record_batch(None, false); // Append let table = write_batch(table, batch.clone()).await; + let write_metrics: WriteMetrics = get_write_metrics(table.clone()).await; + assert_eq!(write_metrics.num_added_rows, batch.num_rows()); + assert_eq!(write_metrics.num_removed_files, 0); + assert_common_write_metrics(write_metrics); + // Overwrite let _err = DeltaOps(table) .write(vec![batch]) @@ -1223,6 +1294,12 @@ mod tests { .unwrap(); assert_eq!(table.version(), 1); assert_eq!(table.get_files_count(), 1); + + let write_metrics: WriteMetrics = get_write_metrics(table.clone()).await; + assert_eq!(write_metrics.num_added_rows, batch.num_rows()); + assert_eq!(write_metrics.num_added_files, table.get_files_count()); + assert_common_write_metrics(write_metrics); + table.load().await.unwrap(); assert_eq!(table.history(None).await.unwrap().len(), 2); assert_eq!( @@ -1230,7 +1307,7 @@ mod tests { .info .clone() .into_iter() - .filter(|(k, _)| k != "clientVersion") + .filter(|(k, _)| k == "k1") .collect::>(), metadata ); @@ -1246,6 +1323,11 @@ mod tests { .unwrap(); assert_eq!(table.version(), 2); assert_eq!(table.get_files_count(), 2); + let write_metrics: WriteMetrics = get_write_metrics(table.clone()).await; + assert_eq!(write_metrics.num_added_rows, batch.num_rows()); + assert_eq!(write_metrics.num_added_files, 1); + assert_common_write_metrics(write_metrics); + table.load().await.unwrap(); assert_eq!(table.history(None).await.unwrap().len(), 3); assert_eq!( @@ -1253,7 +1335,7 @@ mod tests { .info .clone() .into_iter() - .filter(|(k, _)| k != "clientVersion") + .filter(|(k, _)| k == "k1") .collect::>(), metadata ); @@ -1262,13 +1344,18 @@ mod tests { let metadata: HashMap = HashMap::from_iter(vec![("k2".to_string(), json!("v2.1"))]); let mut table = DeltaOps(table) - .write(vec![batch]) + .write(vec![batch.clone()]) .with_save_mode(SaveMode::Overwrite) .with_commit_properties(CommitProperties::default().with_metadata(metadata.clone())) .await .unwrap(); assert_eq!(table.version(), 3); assert_eq!(table.get_files_count(), 1); + let write_metrics: WriteMetrics = get_write_metrics(table.clone()).await; + assert_eq!(write_metrics.num_added_rows, batch.num_rows()); + assert!(write_metrics.num_removed_files > 0); + assert_common_write_metrics(write_metrics); + table.load().await.unwrap(); assert_eq!(table.history(None).await.unwrap().len(), 4); assert_eq!( @@ -1276,7 +1363,7 @@ mod tests { .info .clone() .into_iter() - .filter(|(k, _)| k != "clientVersion") + .filter(|(k, _)| k == "k2") .collect::>(), metadata ); @@ -1299,6 +1386,9 @@ mod tests { ) .unwrap(); let table = DeltaOps::new_in_memory().write(vec![batch]).await.unwrap(); + let write_metrics: WriteMetrics = get_write_metrics(table.clone()).await; + assert_eq!(write_metrics.num_added_rows, 2); + assert_common_write_metrics(write_metrics); let schema = Arc::new(ArrowSchema::new(vec![Field::new( "value", @@ -1323,6 +1413,10 @@ mod tests { .await .unwrap(); + let write_metrics: WriteMetrics = get_write_metrics(table.clone()).await; + assert_eq!(write_metrics.num_added_rows, 3); + 
assert_common_write_metrics(write_metrics); + let expected = [ "+-------+", "| value |", @@ -1356,6 +1450,10 @@ mod tests { .unwrap(); let table = DeltaOps::new_in_memory().write(vec![batch]).await.unwrap(); + let write_metrics: WriteMetrics = get_write_metrics(table.clone()).await; + assert_eq!(write_metrics.num_added_rows, 1); + assert_common_write_metrics(write_metrics); + let schema = Arc::new(ArrowSchema::new(vec![Field::new( "value", DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".to_string().into())), @@ -1391,7 +1489,9 @@ mod tests { .await .unwrap(); assert_eq!(table.version(), 0); - assert_eq!(table.get_files_count(), 1) + assert_eq!(table.get_files_count(), 1); + let write_metrics: WriteMetrics = get_write_metrics(table.clone()).await; + assert_common_write_metrics(write_metrics); } #[tokio::test] @@ -1405,6 +1505,10 @@ mod tests { .unwrap(); assert_eq!(table.version(), 0); assert_eq!(table.get_files_count(), 2); + let write_metrics: WriteMetrics = get_write_metrics(table.clone()).await; + assert!(write_metrics.num_partitions > 0); + assert_eq!(write_metrics.num_added_files, 2); + assert_common_write_metrics(write_metrics); let table = DeltaOps::new_in_memory() .write(vec![batch]) @@ -1413,7 +1517,12 @@ mod tests { .await .unwrap(); assert_eq!(table.version(), 0); - assert_eq!(table.get_files_count(), 4) + assert_eq!(table.get_files_count(), 4); + + let write_metrics: WriteMetrics = get_write_metrics(table.clone()).await; + assert!(write_metrics.num_partitions > 0); + assert_eq!(write_metrics.num_added_files, 4); + assert_common_write_metrics(write_metrics); } #[tokio::test] @@ -1426,6 +1535,9 @@ mod tests { .unwrap(); assert_eq!(table.version(), 0); + let write_metrics: WriteMetrics = get_write_metrics(table.clone()).await; + assert_common_write_metrics(write_metrics); + let mut new_schema_builder = arrow_schema::SchemaBuilder::new(); for field in batch.schema().fields() { if field.name() != "modified" { @@ -1472,6 +1584,9 @@ mod tests { let fields = new_schema.fields(); let names = fields.map(|f| f.name()).collect::>(); assert_eq!(names, vec!["id", "value", "modified", "inserted_by"]); + + let write_metrics: WriteMetrics = get_write_metrics(table.clone()).await; + assert_common_write_metrics(write_metrics); } #[tokio::test] @@ -1485,6 +1600,10 @@ mod tests { .unwrap(); assert_eq!(table.version(), 0); + let write_metrics: WriteMetrics = get_write_metrics(table.clone()).await; + assert!(write_metrics.num_partitions > 0); + assert_common_write_metrics(write_metrics); + let mut new_schema_builder = arrow_schema::SchemaBuilder::new(); for field in batch.schema().fields() { if field.name() != "modified" { @@ -1533,6 +1652,10 @@ mod tests { assert_eq!(names, vec!["id", "inserted_by", "modified", "value"]); let part_cols = table.metadata().unwrap().partition_columns.clone(); assert_eq!(part_cols, vec!["id", "value"]); // we want to preserve partitions + + let write_metrics: WriteMetrics = get_write_metrics(table.clone()).await; + assert!(write_metrics.num_partitions > 0); + assert_common_write_metrics(write_metrics); } #[tokio::test] @@ -1544,7 +1667,8 @@ mod tests { .await .unwrap(); assert_eq!(table.version(), 0); - + let write_metrics: WriteMetrics = get_write_metrics(table.clone()).await; + assert_common_write_metrics(write_metrics); let mut new_schema_builder = arrow_schema::SchemaBuilder::new(); for field in batch.schema().fields() { if field.name() != "modified" { @@ -1597,6 +1721,8 @@ mod tests { .await .unwrap(); assert_eq!(table.version(), 0); + let write_metrics: 
WriteMetrics = get_write_metrics(table.clone()).await; + assert_common_write_metrics(write_metrics); let mut new_schema_builder = arrow_schema::SchemaBuilder::new(); @@ -1652,6 +1778,8 @@ mod tests { let table = DeltaOps(table).write(vec![batch.clone()]).await.unwrap(); assert_eq!(table.version(), 1); + let write_metrics: WriteMetrics = get_write_metrics(table.clone()).await; + assert_common_write_metrics(write_metrics); let schema: StructType = serde_json::from_value(json!({ "type": "struct", @@ -1673,7 +1801,7 @@ mod tests { assert_eq!(table.version(), 0); let table = DeltaOps(table).write(vec![batch.clone()]).await; - assert!(table.is_err()) + assert!(table.is_err()); } #[tokio::test] @@ -1694,6 +1822,8 @@ mod tests { .await .unwrap(); assert_eq!(table.version(), 1); + let write_metrics: WriteMetrics = get_write_metrics(table.clone()).await; + assert_common_write_metrics(write_metrics); let actual = get_data(&table).await; let expected = DataType::Struct(Fields::from(vec![Field::new( @@ -1732,6 +1862,8 @@ mod tests { .with_partition_columns(["string"]) .await .unwrap(); + let write_metrics: WriteMetrics = get_write_metrics(_table.clone()).await; + assert_common_write_metrics(write_metrics); let table = crate::open_table(tmp_path.as_os_str().to_str().unwrap()) .await @@ -1775,6 +1907,9 @@ mod tests { .await .unwrap(); assert_eq!(table.version(), 0); + let write_metrics: WriteMetrics = get_write_metrics(table.clone()).await; + assert_eq!(write_metrics.num_added_rows, 4); + assert_common_write_metrics(write_metrics); let batch_add = RecordBatch::try_new( Arc::clone(&schema), @@ -1793,6 +1928,9 @@ mod tests { .await .unwrap(); assert_eq!(table.version(), 1); + let write_metrics: WriteMetrics = get_write_metrics(table.clone()).await; + assert_eq!(write_metrics.num_added_rows, 1); + assert_common_write_metrics(write_metrics); let expected = [ "+----+-------+------------+", @@ -1831,6 +1969,8 @@ mod tests { .await .unwrap(); assert_eq!(table.version(), 0); + let write_metrics: WriteMetrics = get_write_metrics(table.clone()).await; + assert_common_write_metrics(write_metrics); // Take clones of these before an operation resulting in error, otherwise it will // be impossible to refer to an in-memory table @@ -1873,6 +2013,8 @@ mod tests { .await .unwrap(); assert_eq!(table.version(), 0); + let write_metrics: WriteMetrics = get_write_metrics(table.clone()).await; + assert_common_write_metrics(write_metrics); let batch_add = RecordBatch::try_new( Arc::clone(&schema), @@ -1895,6 +2037,9 @@ mod tests { .await .unwrap(); assert_eq!(table.version(), 1); + let write_metrics: WriteMetrics = get_write_metrics(table.clone()).await; + assert_eq!(write_metrics.num_added_rows, 3); + assert_common_write_metrics(write_metrics); let expected = [ "+----+-------+------------+", @@ -1956,6 +2101,9 @@ mod tests { .await .expect("Failed to write first batch"); assert_eq!(table.version(), 1); + let write_metrics: WriteMetrics = get_write_metrics(table.clone()).await; + assert_eq!(write_metrics.num_added_rows, 3); + assert_common_write_metrics(write_metrics); let table = DeltaOps(table) .write([second_batch]) @@ -1963,6 +2111,10 @@ mod tests { .await .unwrap(); assert_eq!(table.version(), 2); + let write_metrics: WriteMetrics = get_write_metrics(table.clone()).await; + assert_eq!(write_metrics.num_added_rows, 1); + assert!(write_metrics.num_removed_files > 0); + assert_common_write_metrics(write_metrics); let snapshot_bytes = table .log_store @@ -2022,6 +2174,10 @@ mod tests { .await .expect("Failed to write first 
batch"); assert_eq!(table.version(), 1); + let write_metrics: WriteMetrics = get_write_metrics(table.clone()).await; + assert_eq!(write_metrics.num_added_rows, 3); + assert!(write_metrics.num_partitions > 0); + assert_common_write_metrics(write_metrics); let table = DeltaOps(table) .write([second_batch]) @@ -2030,6 +2186,12 @@ mod tests { .await .unwrap(); assert_eq!(table.version(), 2); + let write_metrics: WriteMetrics = get_write_metrics(table.clone()).await; + assert_eq!(write_metrics.num_added_rows, 1); + assert!(write_metrics.num_partitions > 0); + assert!(write_metrics.num_removed_files > 0); + assert_common_write_metrics(write_metrics); + let snapshot_bytes = table .log_store .read_commit_entry(2) diff --git a/crates/core/src/table/config.rs b/crates/core/src/table/config.rs index 47307cfecd..68b41d6f67 100644 --- a/crates/core/src/table/config.rs +++ b/crates/core/src/table/config.rs @@ -210,6 +210,8 @@ pub struct TableConfig<'a>(pub(crate) &'a HashMap>); /// Default num index cols pub const DEFAULT_NUM_INDEX_COLS: i32 = 32; +/// Default target file size +pub const DEFAULT_TARGET_FILE_SIZE: i64 = 104857600; impl<'a> TableConfig<'a> { table_config!( diff --git a/crates/core/src/writer/stats.rs b/crates/core/src/writer/stats.rs index 28a089ae1c..e4b93a54f5 100644 --- a/crates/core/src/writer/stats.rs +++ b/crates/core/src/writer/stats.rs @@ -5,6 +5,7 @@ use std::{collections::HashMap, ops::AddAssign}; use delta_kernel::expressions::Scalar; use indexmap::IndexMap; +use itertools::Itertools; use parquet::file::metadata::ParquetMetaData; use parquet::format::FileMetaData; use parquet::schema::types::{ColumnDescriptor, SchemaDescriptor}; @@ -130,8 +131,29 @@ fn stats_from_metadata( let mut min_values: HashMap = HashMap::new(); let mut max_values: HashMap = HashMap::new(); let mut null_count: HashMap = HashMap::new(); + let dialect = sqlparser::dialect::GenericDialect {}; let idx_to_iterate = if let Some(stats_cols) = stats_columns { + let stats_cols = stats_cols + .into_iter() + .map(|v| { + match sqlparser::parser::Parser::new(&dialect) + .try_with_sql(v) + .map_err(|e| DeltaTableError::generic(e.to_string()))? 
+ .parse_multipart_identifier() + { + Ok(parts) => Ok(parts.into_iter().map(|v| v.value).join(".")), + Err(e) => { + return Err(DeltaWriterError::DeltaTable( + DeltaTableError::GenericError { + source: Box::new(e), + }, + )) + } + } + }) + .collect::, DeltaWriterError>>()?; + schema_descriptor .columns() .iter() diff --git a/crates/mount/src/file.rs b/crates/mount/src/file.rs index 29285a4a96..090562d442 100644 --- a/crates/mount/src/file.rs +++ b/crates/mount/src/file.rs @@ -9,7 +9,7 @@ use object_store::{ GetResult, ListResult, ObjectMeta, ObjectStore, PutOptions, PutResult, Result as ObjectStoreResult, }; -use object_store::{MultipartUpload, PutMultipartOpts, PutPayload}; +use object_store::{MultipartUpload, PutMode, PutMultipartOpts, PutPayload}; use std::ops::Range; use std::sync::Arc; use url::Url; @@ -168,8 +168,11 @@ impl ObjectStore for MountFileStorageBackend { &self, location: &ObjectStorePath, bytes: PutPayload, - options: PutOptions, + mut options: PutOptions, ) -> ObjectStoreResult { + // In mounted storage we do an unsafe rename/overwrite + // We don't conditionally check whether the file already exists + options.mode = PutMode::Overwrite; self.inner.put_opts(location, bytes, options).await } diff --git a/docs/usage/writing/writing-to-s3-with-locking-provider.md b/docs/usage/writing/writing-to-s3-with-locking-provider.md index bbe2fa958c..6a275d685a 100644 --- a/docs/usage/writing/writing-to-s3-with-locking-provider.md +++ b/docs/usage/writing/writing-to-s3-with-locking-provider.md @@ -95,6 +95,7 @@ In DynamoDB, you need those permissions: - dynamodb:Query - dynamodb:PutItem - dynamodb:UpdateItem +- dynamodb:DeleteItem ### Enabling concurrent writes for alternative clients diff --git a/python/Cargo.toml b/python/Cargo.toml index 70eb378e20..d4c1597277 100644 --- a/python/Cargo.toml +++ b/python/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "deltalake-python" -version = "0.19.1" +version = "0.19.2" authors = ["Qingping Hou ", "Will Jones "] homepage = "https://github.com/delta-io/delta-rs" license = "Apache-2.0" @@ -55,3 +55,6 @@ features = ["azure", "gcs", "python", "datafusion", "unity-experimental", "hdfs" default = ["rustls"] native-tls = ["deltalake/s3-native-tls", "deltalake/glue"] rustls = ["deltalake/s3", "deltalake/glue"] + +[build-dependencies] +openssl-src = "=300.3.1" diff --git a/python/deltalake/_internal.pyi b/python/deltalake/_internal.pyi index 5aedd5e162..ceac16e7f8 100644 --- a/python/deltalake/_internal.pyi +++ b/python/deltalake/_internal.pyi @@ -200,6 +200,7 @@ def write_to_deltalake( table: Optional[RawDeltaTable], schema_mode: Optional[str], predicate: Optional[str], + target_file_size: Optional[int], name: Optional[str], description: Optional[str], configuration: Optional[Mapping[str, Optional[str]]], diff --git a/python/deltalake/writer.py b/python/deltalake/writer.py index 95368dbf79..e08d9cc9b8 100644 --- a/python/deltalake/writer.py +++ b/python/deltalake/writer.py @@ -178,6 +178,7 @@ def write_deltalake( schema_mode: Optional[Literal["merge", "overwrite"]] = ..., storage_options: Optional[Dict[str, str]] = ..., predicate: Optional[str] = ..., + target_file_size: Optional[int] = ..., large_dtypes: bool = ..., engine: Literal["rust"] = ..., writer_properties: WriterProperties = ..., @@ -214,6 +215,7 @@ def write_deltalake( storage_options: Optional[Dict[str, str]] = None, partition_filters: Optional[List[Tuple[str, str, Any]]] = None, predicate: Optional[str] = None, + target_file_size: Optional[int] = None, large_dtypes: bool = False, engine: 
Literal["pyarrow", "rust"] = "rust", writer_properties: Optional[WriterProperties] = None, @@ -267,7 +269,8 @@ def write_deltalake( configuration: A map containing configuration options for the metadata action. schema_mode: If set to "overwrite", allows replacing the schema of the table. Set to "merge" to merge with existing schema. storage_options: options passed to the native delta filesystem. - predicate: When using `Overwrite` mode, replace data that matches a predicate. Only used in rust engine. + predicate: When using `Overwrite` mode, replace data that matches a predicate. Only used in rust engine.' + target_file_size: Override for target file size for data files written to the delta table. If not passed, it's taken from `delta.targetFileSize`. partition_filters: the partition filters that will be used for partition overwrite. Only used in pyarrow engine. large_dtypes: Only used for pyarrow engine engine: writer engine to write the delta table. PyArrow engine is deprecated, and will be removed in v1.0. @@ -308,6 +311,7 @@ def write_deltalake( table=table._table if table is not None else None, schema_mode=schema_mode, predicate=predicate, + target_file_size=target_file_size, name=name, description=description, configuration=configuration, diff --git a/python/src/lib.rs b/python/src/lib.rs index 787d321d08..aeb1b3c429 100644 --- a/python/src/lib.rs +++ b/python/src/lib.rs @@ -1602,6 +1602,7 @@ fn write_to_deltalake( schema_mode: Option, partition_by: Option>, predicate: Option, + target_file_size: Option, name: Option, description: Option, configuration: Option>>, @@ -1650,6 +1651,10 @@ fn write_to_deltalake( builder = builder.with_replace_where(predicate); }; + if let Some(target_file_size) = target_file_size { + builder = builder.with_target_file_size(target_file_size) + }; + if let Some(config) = configuration { builder = builder.with_configuration(config); }; diff --git a/python/tests/test_writer.py b/python/tests/test_writer.py index 1534d42789..0186500032 100644 --- a/python/tests/test_writer.py +++ b/python/tests/test_writer.py @@ -1699,8 +1699,7 @@ def _check_stats(dt: DeltaTable): _check_stats(dt) -@pytest.mark.parametrize("engine", ["pyarrow", "rust"]) -def test_write_stats_columns_stats_provided(tmp_path: pathlib.Path, engine): +def test_write_stats_columns_stats_provided(tmp_path: pathlib.Path): def _check_stats(dt: DeltaTable): add_actions_table = dt.get_add_actions(flatten=True) stats = add_actions_table.to_pylist()[0] @@ -1726,15 +1725,14 @@ def _check_stats(dt: DeltaTable): tmp_path, data, mode="append", - engine=engine, - configuration={"delta.dataSkippingStatsColumns": "foo,baz"}, + configuration={"delta.dataSkippingStatsColumns": "foo,`baz`"}, ) dt = DeltaTable(tmp_path) _check_stats(dt) # Check if it properly takes skippingNumIndexCols from the config in the table - write_deltalake(tmp_path, data, mode="overwrite", engine=engine) + write_deltalake(tmp_path, data, mode="overwrite") dt = DeltaTable(tmp_path) assert dt.version() == 1
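
The switch from `backoff` to `backon` changes `DynamoDbLockClient::retry` from mapping errors into `backoff::Error::transient`/`permanent` inside the operation to a separate `when` predicate that marks which errors are retryable, with the `LockClientError` mapping applied afterwards via `map_err`. A minimal, self-contained sketch of that pattern, assuming the `backon` 1.x `Retryable` trait and `tokio` with the `macros` feature; `fetch_value` and its string error are made up for illustration only:

```rust
use std::sync::atomic::{AtomicU32, Ordering};
use std::time::Duration;

use backon::{ExponentialBuilder, Retryable};

// Hypothetical fallible call standing in for a DynamoDB request: it fails
// twice with a "throughput exceeded" style error, then succeeds.
async fn fetch_value(attempts: &AtomicU32) -> Result<u32, String> {
    if attempts.fetch_add(1, Ordering::SeqCst) < 2 {
        Err("ProvisionedThroughputExceeded".to_string())
    } else {
        Ok(42)
    }
}

#[tokio::main]
async fn main() -> Result<(), String> {
    let attempts = AtomicU32::new(0);

    // Same shape as the builder in DynamoDbLockClient::retry: exponential
    // backoff with factor 2, capped by a maximum delay rather than
    // backoff's max_elapsed_time.
    let policy = ExponentialBuilder::default()
        .with_factor(2.0)
        .with_max_delay(Duration::from_secs(15));

    // The `when` predicate replaces backoff::Error::transient: only errors it
    // accepts are retried, everything else is returned immediately.
    let value = (|| fetch_value(&attempts))
        .retry(policy)
        .when(|err: &String| err.contains("ProvisionedThroughputExceeded"))
        .await?;

    assert_eq!(value, 42);
    println!("succeeded after {} attempts", attempts.load(Ordering::SeqCst));
    Ok(())
}
```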
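The new `get_target_file_size` helper resolves the target file size in a fixed order: an existing table's `TableConfig`, else a `delta.targetFileSize` entry supplied with the first write's configuration, else the new `DEFAULT_TARGET_FILE_SIZE` constant (100 MiB). A standalone restatement of that fallback, assuming a plain `Option<i64>` in place of `TableConfig`; unlike the patched helper, which unwraps the parse, this sketch falls back to the default when the configured value does not parse:

```rust
use std::collections::HashMap;

/// Mirrors the new constant in crates/core/src/table/config.rs: 100 MiB.
const DEFAULT_TARGET_FILE_SIZE: i64 = 104_857_600;

/// Fallback order used by the new get_target_file_size helper:
/// table config value, then `delta.targetFileSize`, then the default.
fn resolve_target_file_size(
    table_config_value: Option<i64>,
    configuration: &HashMap<String, Option<String>>,
) -> i64 {
    table_config_value.unwrap_or_else(|| {
        configuration
            .get("delta.targetFileSize")
            .and_then(|v| v.as_deref())
            .and_then(|v| v.parse::<i64>().ok())
            .unwrap_or(DEFAULT_TARGET_FILE_SIZE)
    })
}

fn main() {
    let mut configuration = HashMap::new();
    configuration.insert(
        "delta.targetFileSize".to_string(),
        Some("134217728".to_string()),
    );
    // Configuration entry wins when no table config exists (first write).
    assert_eq!(resolve_target_file_size(None, &configuration), 134_217_728);
    // Nothing set anywhere: default applies.
    assert_eq!(
        resolve_target_file_size(None, &HashMap::new()),
        DEFAULT_TARGET_FILE_SIZE
    );
    // An existing table config takes precedence over the passed configuration.
    assert_eq!(
        resolve_target_file_size(Some(52_428_800), &configuration),
        52_428_800
    );
}
```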
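Write commits now carry the serialized `WriteMetrics` under an `"operationMetrics"` key in the commit's app metadata, and `WriteBuilder` gains a target file size override that the Python binding forwards through `with_target_file_size`. A sketch of both from the Rust side, assuming the `deltalake` crate with the `datafusion` feature plus `arrow-array`, `arrow-schema`, and `tokio`; the read-back mirrors the `get_write_metrics` helper added to the write tests:

```rust
use std::sync::Arc;

use arrow_array::{Int32Array, RecordBatch};
use arrow_schema::{DataType, Field, Schema};
use deltalake::DeltaOps;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let schema = Arc::new(Schema::new(vec![Field::new("value", DataType::Int32, false)]));
    let batch = RecordBatch::try_new(
        schema,
        vec![Arc::new(Int32Array::from(vec![1, 2, 3]))],
    )?;

    // Write to an in-memory table, overriding the target file size the same
    // way the Python `target_file_size` argument does through the binding.
    let table = DeltaOps::new_in_memory()
        .write(vec![batch])
        .with_target_file_size(128 * 1024 * 1024)
        .await?;

    // Same pattern as the new get_write_metrics test helper: the latest
    // commit's info map now contains a serialized WriteMetrics value.
    let mut history = table.history(Some(1)).await?;
    let metrics = history
        .first_mut()
        .and_then(|commit| commit.info.remove("operationMetrics"))
        .expect("write commit should carry operationMetrics");
    println!("operationMetrics: {metrics}");
    Ok(())
}
```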
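The stats collection change runs each `delta.dataSkippingStatsColumns` entry through sqlparser's multipart-identifier parser, so a backtick-quoted name resolves to the same physical column as its unquoted form, which is what the updated Python test's `"foo,`baz`"` value exercises. A minimal sketch of that normalization, assuming the `sqlparser` and `itertools` versions already in the workspace:

```rust
use itertools::Itertools;
use sqlparser::dialect::GenericDialect;
use sqlparser::parser::{Parser, ParserError};

/// Normalize one stats-column entry the way the patched stats collection does:
/// parse it as a (possibly backtick-quoted) multipart identifier and rejoin
/// the unquoted parts with dots.
fn normalize_stats_column(raw: &str) -> Result<String, ParserError> {
    let dialect = GenericDialect {};
    let parts = Parser::new(&dialect)
        .try_with_sql(raw)?
        .parse_multipart_identifier()?;
    Ok(parts.into_iter().map(|ident| ident.value).join("."))
}

fn main() -> Result<(), ParserError> {
    // Backticks are stripped, so "`baz`" matches the plain column name "baz".
    assert_eq!(normalize_stats_column("`baz`")?, "baz");
    // Nested column references keep their dotted structure.
    assert_eq!(normalize_stats_column("foo.`bar`")?, "foo.bar");
    Ok(())
}
```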
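`MountFileStorageBackend::put_opts` now forces `PutMode::Overwrite`, since mounted storage is written through an unsafe rename and a conditional "already exists" check would only produce spurious failures. A sketch of the same override expressed as a free function over any `object_store` backend, assuming `object_store` 0.10 or later (where `PutPayload` exists); this is an illustrative helper, not part of the patch:

```rust
use object_store::path::Path;
use object_store::{ObjectStore, PutMode, PutOptions, PutPayload, PutResult, Result};

/// Force overwrite semantics regardless of what the caller asked for,
/// mirroring the behaviour of MountFileStorageBackend::put_opts.
async fn put_overwrite(
    store: &dyn ObjectStore,
    location: &Path,
    payload: PutPayload,
    mut options: PutOptions,
) -> Result<PutResult> {
    // Drop any conditional mode (e.g. PutMode::Create) before delegating.
    options.mode = PutMode::Overwrite;
    store.put_opts(location, payload, options).await
}
```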