diff --git a/rust/src/operations/delete.rs b/rust/src/operations/delete.rs index bb524aa1db..11abfe9a3c 100644 --- a/rust/src/operations/delete.rs +++ b/rust/src/operations/delete.rs @@ -312,11 +312,13 @@ impl std::future::IntoFuture for DeleteBuilder { #[cfg(test)] mod tests { - use crate::operations::DeltaOps; use crate::protocol::*; use crate::writer::test_utils::datafusion::get_data; - use crate::writer::test_utils::{get_arrow_schema, get_delta_schema}; + use crate::writer::test_utils::{ + get_arrow_schema, get_delta_schema, get_record_batch, setup_table_with_configuration, write_batch + }; + use crate::DeltaConfigKey; use crate::DeltaTable; use arrow::array::Int32Array; use arrow::datatypes::{Field, Schema}; @@ -339,6 +341,19 @@ mod tests { table } + #[tokio::test] + async fn test_delete_when_delta_table_is_append_only() { + let table = setup_table_with_configuration(DeltaConfigKey::AppendOnly, Some("true")).await; + let batch = get_record_batch(None, false); + // append some data + let table = write_batch(table, batch).await; + // delete + let _err = DeltaOps(table) + .delete() + .await + .expect_err("Remove action is included when Delta table is append-only. Should error"); + } + #[tokio::test] async fn test_delete_default() { let schema = get_arrow_schema(&None); diff --git a/rust/src/operations/merge.rs b/rust/src/operations/merge.rs index dc8e108ab7..a51e7649fc 100644 --- a/rust/src/operations/merge.rs +++ b/rust/src/operations/merge.rs @@ -532,7 +532,7 @@ impl MergeOperationConfig { } } -#[derive(Default, Serialize)] +#[derive(Default, Serialize, Debug)] /// Metrics for the Merge Operation pub struct MergeMetrics { /// Number of rows in the source data @@ -1245,12 +1245,13 @@ impl std::future::IntoFuture for MergeBuilder { #[cfg(test)] mod tests { - use crate::operations::DeltaOps; use crate::protocol::*; use crate::writer::test_utils::datafusion::get_data; use crate::writer::test_utils::get_arrow_schema; use crate::writer::test_utils::get_delta_schema; + use crate::writer::test_utils::setup_table_with_configuration; + use crate::DeltaConfigKey; use crate::DeltaTable; use arrow::datatypes::Schema as ArrowSchema; use arrow::record_batch::RecordBatch; @@ -1277,6 +1278,21 @@ mod tests { table } + #[tokio::test] + async fn test_merge_when_delta_table_is_append_only() { + let schema = get_arrow_schema(&None); + let table = setup_table_with_configuration(DeltaConfigKey::AppendOnly, Some("true")).await; + // append some data + let table = write_data(table, &schema).await; + // merge + let _err = DeltaOps(table) + .merge(merge_source(schema), col("target.id").eq(col("source.id"))) + .with_source_alias("source") + .with_target_alias("target") + .await + .expect_err("Remove action is included when Delta table is append-only. Should error"); + } + async fn write_data(table: DeltaTable, schema: &Arc) -> DeltaTable { let batch = RecordBatch::try_new( Arc::clone(schema), @@ -1300,14 +1316,7 @@ mod tests { .unwrap() } - async fn setup() -> (DeltaTable, DataFrame) { - let schema = get_arrow_schema(&None); - let table = setup_table(None).await; - - let table = write_data(table, &schema).await; - assert_eq!(table.version(), 1); - assert_eq!(table.get_file_uris().count(), 1); - + fn merge_source(schema: Arc) -> DataFrame { let ctx = SessionContext::new(); let batch = RecordBatch::try_new( Arc::clone(&schema), @@ -1322,8 +1331,18 @@ mod tests { ], ) .unwrap(); - let source = ctx.read_batch(batch).unwrap(); - (table, source) + ctx.read_batch(batch).unwrap() + } + + async fn setup() -> (DeltaTable, DataFrame) { + let schema = get_arrow_schema(&None); + let table = setup_table(None).await; + + let table = write_data(table, &schema).await; + assert_eq!(table.version(), 1); + assert_eq!(table.get_file_uris().count(), 1); + + (table, merge_source(schema)) } async fn assert_merge(table: DeltaTable, metrics: MergeMetrics) { diff --git a/rust/src/operations/restore.rs b/rust/src/operations/restore.rs index c450554fea..59aceaa98f 100644 --- a/rust/src/operations/restore.rs +++ b/rust/src/operations/restore.rs @@ -229,6 +229,7 @@ async fn execute( datetime: datetime_to_restore.map(|time| -> i64 { time.timestamp_millis() }), }, &actions, + &snapshot, None, ) .await?; diff --git a/rust/src/operations/transaction/conflict_checker.rs b/rust/src/operations/transaction/conflict_checker.rs index d7a9d3fb86..6bbc2a9d45 100644 --- a/rust/src/operations/transaction/conflict_checker.rs +++ b/rust/src/operations/transaction/conflict_checker.rs @@ -691,7 +691,7 @@ mod tests { actions: Vec, read_whole_table: bool, ) -> Result<(), CommitConflictError> { - let setup_actions = setup.unwrap_or_else(init_table_actions); + let setup_actions = setup.unwrap_or_else(|| init_table_actions(None)); let state = DeltaTableState::from_actions(setup_actions, 0).unwrap(); let transaction_info = TransactionInfo::new(&state, reads, &actions, read_whole_table); let summary = WinningCommitSummary { @@ -717,7 +717,7 @@ mod tests { // the concurrent transaction deletes a file that the current transaction did NOT read let file_not_read = tu::create_add_action("file_not_read", true, get_stats(1, 10)); let file_read = tu::create_add_action("file_read", true, get_stats(100, 10000)); - let mut setup_actions = init_table_actions(); + let mut setup_actions = init_table_actions(None); setup_actions.push(file_not_read); setup_actions.push(file_read); let result = execute_test( @@ -733,7 +733,7 @@ mod tests { // concurrently add file, that the current transaction would not have read let file_added = tu::create_add_action("file_added", true, get_stats(1, 10)); let file_read = tu::create_add_action("file_read", true, get_stats(100, 10000)); - let mut setup_actions = init_table_actions(); + let mut setup_actions = init_table_actions(None); setup_actions.push(file_read); let result = execute_test( Some(setup_actions), @@ -797,7 +797,7 @@ mod tests { // delete / read // transaction reads a file that is removed by concurrent transaction let file_read = tu::create_add_action("file_read", true, get_stats(1, 10)); - let mut setup_actions = init_table_actions(); + let mut setup_actions = init_table_actions(None); setup_actions.push(file_read); let result = execute_test( Some(setup_actions), @@ -842,7 +842,7 @@ mod tests { let file_part1 = tu::create_add_action("file_part1", true, get_stats(1, 10)); let file_part2 = tu::create_add_action("file_part2", true, get_stats(11, 100)); let file_part3 = tu::create_add_action("file_part3", true, get_stats(101, 1000)); - let mut setup_actions = init_table_actions(); + let mut setup_actions = init_table_actions(None); setup_actions.push(file_part1); let result = execute_test( Some(setup_actions), @@ -858,7 +858,7 @@ mod tests { // `read_whole_table` should disallow any concurrent remove actions let file_part1 = tu::create_add_action("file_part1", true, get_stats(1, 10)); let file_part2 = tu::create_add_action("file_part2", true, get_stats(11, 100)); - let mut setup_actions = init_table_actions(); + let mut setup_actions = init_table_actions(None); setup_actions.push(file_part1); let result = execute_test( Some(setup_actions), diff --git a/rust/src/operations/transaction/mod.rs b/rust/src/operations/transaction/mod.rs index cc26e75fc2..738ae404ec 100644 --- a/rust/src/operations/transaction/mod.rs +++ b/rust/src/operations/transaction/mod.rs @@ -48,6 +48,11 @@ pub enum TransactionError { /// Error returned when maximum number of commit trioals is exceeded #[error("Failed to commit transaction: {0}")] MaxCommitAttempts(i32), + /// The transaction includes Remove action with data change but Delta table is append-only + #[error( + "The transaction includes Remove action with data change but Delta table is append-only" + )] + DeltaTableAppendOnly, } impl From for DeltaTableError { @@ -68,9 +73,18 @@ impl From for DeltaTableError { // Convert actions to their json representation fn log_entry_from_actions<'a>( actions: impl IntoIterator, + read_snapshot: &DeltaTableState, ) -> Result { + let append_only = read_snapshot.table_config().append_only(); let mut jsons = Vec::::new(); for action in actions { + if append_only { + if let Action::remove(remove) = action { + if remove.data_change { + return Err(TransactionError::DeltaTableAppendOnly); + } + } + } let json = serde_json::to_string(action) .map_err(|e| TransactionError::SerializeLogJson { json_err: e })?; jsons.push(json); @@ -81,6 +95,7 @@ fn log_entry_from_actions<'a>( pub(crate) fn get_commit_bytes( operation: &DeltaOperation, actions: &Vec, + read_snapshot: &DeltaTableState, app_metadata: Option>, ) -> Result { if !actions.iter().any(|a| matches!(a, Action::commitInfo(..))) { @@ -99,9 +114,13 @@ pub(crate) fn get_commit_bytes( actions .iter() .chain(std::iter::once(&Action::commitInfo(commit_info))), + read_snapshot, )?)) } else { - Ok(bytes::Bytes::from(log_entry_from_actions(actions)?)) + Ok(bytes::Bytes::from(log_entry_from_actions( + actions, + read_snapshot, + )?)) } } @@ -112,10 +131,11 @@ pub(crate) async fn prepare_commit<'a>( storage: &dyn ObjectStore, operation: &DeltaOperation, actions: &Vec, + read_snapshot: &DeltaTableState, app_metadata: Option>, ) -> Result { // Serialize all actions that are part of this log entry. - let log_entry = get_commit_bytes(operation, actions, app_metadata)?; + let log_entry = get_commit_bytes(operation, actions, read_snapshot, app_metadata)?; // Write delta log entry as temporary file to storage. For the actual commit, // the temporary file is moved (atomic rename) to the delta log folder within `commit` function. @@ -177,7 +197,8 @@ pub async fn commit_with_retries( app_metadata: Option>, max_retries: usize, ) -> DeltaResult { - let tmp_commit = prepare_commit(storage, &operation, actions, app_metadata).await?; + let tmp_commit = + prepare_commit(storage, &operation, actions, read_snapshot, app_metadata).await?; let mut attempt_number = 1; @@ -218,9 +239,11 @@ pub async fn commit_with_retries( #[cfg(all(test, feature = "parquet"))] mod tests { - use self::test_utils::init_table_actions; + use self::test_utils::{create_remove_action, init_table_actions}; use super::*; + use crate::DeltaConfigKey; use object_store::memory::InMemory; + use std::collections::HashMap; #[test] fn test_commit_uri_from_version() { @@ -232,13 +255,36 @@ mod tests { #[test] fn test_log_entry_from_actions() { - let actions = init_table_actions(); - let entry = log_entry_from_actions(&actions).unwrap(); + let actions = init_table_actions(None); + let state = DeltaTableState::from_actions(actions.clone(), 0).unwrap(); + let entry = log_entry_from_actions(&actions, &state).unwrap(); let lines: Vec<_> = entry.lines().collect(); // writes every action to a line assert_eq!(actions.len(), lines.len()) } + fn remove_action_exists_when_delta_table_is_append_only( + data_change: bool, + ) -> Result { + let remove = create_remove_action("test_append_only", data_change); + let mut actions = init_table_actions(Some(HashMap::from([( + DeltaConfigKey::AppendOnly.as_ref().to_string(), + Some("true".to_string()), + )]))); + actions.push(remove); + let state = + DeltaTableState::from_actions(actions.clone(), 0).expect("Failed to get table state"); + log_entry_from_actions(&actions, &state) + } + + #[test] + fn test_remove_action_exists_when_delta_table_is_append_only() { + let _err = remove_action_exists_when_delta_table_is_append_only(true) + .expect_err("Remove action is included when Delta table is append-only. Should error"); + let _actions = remove_action_exists_when_delta_table_is_append_only(false) + .expect("Data is not changed by the Remove action. Should succeed"); + } + #[tokio::test] async fn test_try_commit_transaction() { let store = InMemory::new(); diff --git a/rust/src/operations/transaction/state.rs b/rust/src/operations/transaction/state.rs index 32c386cbdc..bb9c3ff35e 100644 --- a/rust/src/operations/transaction/state.rs +++ b/rust/src/operations/transaction/state.rs @@ -334,7 +334,7 @@ mod tests { #[test] fn test_parse_predicate_expression() { - let snapshot = DeltaTableState::from_actions(init_table_actions(), 0).unwrap(); + let snapshot = DeltaTableState::from_actions(init_table_actions(None), 0).unwrap(); let session = SessionContext::new(); let state = session.state(); @@ -361,7 +361,7 @@ mod tests { #[test] fn test_files_matching_predicate() { - let mut actions = init_table_actions(); + let mut actions = init_table_actions(None); actions.push(create_add_action("excluded", true, Some("{\"numRecords\":10,\"minValues\":{\"value\":1},\"maxValues\":{\"value\":10},\"nullCount\":{\"value\":0}}".into()))); actions.push(create_add_action("included-1", true, Some("{\"numRecords\":10,\"minValues\":{\"value\":1},\"maxValues\":{\"value\":100},\"nullCount\":{\"value\":0}}".into()))); actions.push(create_add_action("included-2", true, Some("{\"numRecords\":10,\"minValues\":{\"value\":-10},\"maxValues\":{\"value\":3},\"nullCount\":{\"value\":0}}".into()))); diff --git a/rust/src/operations/transaction/test_utils.rs b/rust/src/operations/transaction/test_utils.rs index cdd98f8d1f..ec7848b95a 100644 --- a/rust/src/operations/transaction/test_utils.rs +++ b/rust/src/operations/transaction/test_utils.rs @@ -72,7 +72,7 @@ pub fn create_metadata_action( Action::metaData(MetaData::try_from(metadata).unwrap()) } -pub fn init_table_actions() -> Vec { +pub fn init_table_actions(configuration: Option>>) -> Vec { let raw = r#" { "timestamp": 1670892998177, @@ -96,7 +96,7 @@ pub fn init_table_actions() -> Vec { vec![ Action::commitInfo(commit_info), create_protocol_action(None, None), - create_metadata_action(None, None), + create_metadata_action(None, configuration), ] } @@ -127,7 +127,7 @@ pub async fn create_initialized_table( HashMap::new(), ), ]); - let state = DeltaTableState::from_actions(init_table_actions(), 0).unwrap(); + let state = DeltaTableState::from_actions(init_table_actions(None), 0).unwrap(); let operation = DeltaOperation::Create { mode: SaveMode::ErrorIfExists, location: "location".into(), @@ -144,8 +144,8 @@ pub async fn create_initialized_table( configuration.unwrap_or_default(), ), }; - let actions = init_table_actions(); - let prepared_commit = prepare_commit(storage.as_ref(), &operation, &actions, None) + let actions = init_table_actions(None); + let prepared_commit = prepare_commit(storage.as_ref(), &operation, &actions, &state, None) .await .unwrap(); try_commit_transaction(storage.as_ref(), &prepared_commit, 0) diff --git a/rust/src/operations/update.rs b/rust/src/operations/update.rs index 67cccb3387..4764736eb2 100644 --- a/rust/src/operations/update.rs +++ b/rust/src/operations/update.rs @@ -79,7 +79,7 @@ pub struct UpdateBuilder { safe_cast: bool, } -#[derive(Default, Serialize)] +#[derive(Default, Serialize, Debug)] /// Metrics collected during the Update operation pub struct UpdateMetrics { /// Number of files added. @@ -459,12 +459,14 @@ impl std::future::IntoFuture for UpdateBuilder { #[cfg(test)] mod tests { - use crate::operations::DeltaOps; use crate::writer::test_utils::datafusion::get_data; - use crate::writer::test_utils::{get_arrow_schema, get_delta_schema}; + use crate::writer::test_utils::{ + get_arrow_schema, get_delta_schema, get_record_batch, setup_table_with_configuration, + write_batch, + }; + use crate::DeltaConfigKey; use crate::DeltaTable; - use crate::{protocol::SaveMode, DeltaResult}; use arrow::datatypes::{Field, Schema}; use arrow::record_batch::RecordBatch; use arrow_array::Int32Array; @@ -486,13 +488,6 @@ mod tests { table } - async fn write_batch(table: DeltaTable, batch: RecordBatch) -> DeltaResult { - DeltaOps(table) - .write(vec![batch.clone()]) - .with_save_mode(SaveMode::Append) - .await - } - async fn prepare_values_table() -> DeltaTable { let schema = Arc::new(Schema::new(vec![Field::new( "value", @@ -515,6 +510,19 @@ mod tests { DeltaOps::new_in_memory().write(vec![batch]).await.unwrap() } + #[tokio::test] + async fn test_update_when_delta_table_is_append_only() { + let table = setup_table_with_configuration(DeltaConfigKey::AppendOnly, Some("true")).await; + let batch = get_record_batch(None, false); + // Append + let table = write_batch(table, batch).await; + let _err = DeltaOps(table) + .update() + .with_update("modified", lit("2023-05-14")) + .await + .expect_err("Remove action is included when Delta table is append-only. Should error"); + } + #[tokio::test] async fn test_update_no_predicate() { let schema = get_arrow_schema(&None); @@ -535,7 +543,7 @@ mod tests { ) .unwrap(); - let table = write_batch(table, batch).await.unwrap(); + let table = write_batch(table, batch).await; assert_eq!(table.version(), 1); assert_eq!(table.get_file_uris().count(), 1); @@ -589,7 +597,7 @@ mod tests { // Update a partitioned table where the predicate contains only partition column // The expectation is that a physical scan of data is not required - let table = write_batch(table, batch).await.unwrap(); + let table = write_batch(table, batch).await; assert_eq!(table.version(), 1); assert_eq!(table.get_file_uris().count(), 1); @@ -646,7 +654,7 @@ mod tests { ) .unwrap(); - let table = write_batch(table, batch.clone()).await.unwrap(); + let table = write_batch(table, batch.clone()).await; assert_eq!(table.version(), 1); assert_eq!(table.get_file_uris().count(), 2); @@ -681,7 +689,7 @@ mod tests { // Update a partitioned table where the predicate contains a partition column and non-partition column let table = setup_table(Some(vec!["modified"])).await; - let table = write_batch(table, batch).await.unwrap(); + let table = write_batch(table, batch).await; assert_eq!(table.version(), 1); assert_eq!(table.get_file_uris().count(), 2); diff --git a/rust/src/operations/write.rs b/rust/src/operations/write.rs index a04821a531..7472d02d07 100644 --- a/rust/src/operations/write.rs +++ b/rust/src/operations/write.rs @@ -567,8 +567,9 @@ mod tests { use crate::writer::test_utils::datafusion::get_data; use crate::writer::test_utils::{ get_delta_schema, get_delta_schema_with_nested_struct, get_record_batch, - get_record_batch_with_nested_struct, + get_record_batch_with_nested_struct, setup_table_with_configuration, write_batch, }; + use crate::DeltaConfigKey; use arrow::datatypes::Field; use arrow::datatypes::Schema as ArrowSchema; use arrow_array::{Int32Array, StringArray, TimestampMicrosecondArray}; @@ -576,6 +577,20 @@ mod tests { use datafusion::{assert_batches_eq, assert_batches_sorted_eq}; use serde_json::{json, Value}; + #[tokio::test] + async fn test_write_when_delta_table_is_append_only() { + let table = setup_table_with_configuration(DeltaConfigKey::AppendOnly, Some("true")).await; + let batch = get_record_batch(None, false); + // Append + let table = write_batch(table, batch.clone()).await; + // Overwrite + let _err = DeltaOps(table) + .write(vec![batch]) + .with_save_mode(SaveMode::Overwrite) + .await + .expect_err("Remove action is included when Delta table is append-only. Should error"); + } + #[tokio::test] async fn test_create_write() { let table_schema = get_delta_schema(); diff --git a/rust/src/writer/test_utils.rs b/rust/src/writer/test_utils.rs index d6c79bee94..0e2770759d 100644 --- a/rust/src/writer/test_utils.rs +++ b/rust/src/writer/test_utils.rs @@ -7,10 +7,11 @@ use arrow::compute::take; use arrow_array::{Int32Array, Int64Array, RecordBatch, StringArray, StructArray, UInt32Array}; use arrow_schema::{DataType, Field, Schema as ArrowSchema}; -use crate::operations::create::CreateBuilder; +use crate::operations::{create::CreateBuilder, DeltaOps}; use crate::schema::{Schema, SchemaTypeStruct}; use crate::table::DeltaTableMetaData; -use crate::{DeltaTable, DeltaTableBuilder, SchemaDataType, SchemaField}; +use crate::writer::SaveMode; +use crate::{DeltaConfigKey, DeltaTable, DeltaTableBuilder, SchemaDataType, SchemaField}; pub type TestResult = Result<(), Box>; @@ -48,6 +49,14 @@ pub fn get_record_batch(part: Option, with_null: bool) -> RecordBatch { } } +pub async fn write_batch(table: DeltaTable, batch: RecordBatch) -> DeltaTable { + DeltaOps(table) + .write(vec![batch.clone()]) + .with_save_mode(SaveMode::Append) + .await + .expect("Failed to append") +} + pub fn get_arrow_schema(part: &Option) -> Arc { match part { Some(key) if key.contains("/id=") => Arc::new(ArrowSchema::new(vec![Field::new( @@ -284,6 +293,19 @@ pub fn get_delta_schema_with_nested_struct() -> Schema { ]) } +pub async fn setup_table_with_configuration( + key: DeltaConfigKey, + value: Option>, +) -> DeltaTable { + let table_schema = get_delta_schema(); + DeltaOps::new_in_memory() + .create() + .with_columns(table_schema.get_fields().clone()) + .with_configuration_property(key, value) + .await + .expect("Failed to create table") +} + pub fn create_bare_table() -> DeltaTable { let table_dir = tempfile::tempdir().unwrap(); let table_path = table_dir.path();