Skip to content

Commit

Permalink
feat: Honor appendOnly table config
Browse files Browse the repository at this point in the history
Throw an error if a transaction includes Remove action with data change
but the Delta Table is append-only.
  • Loading branch information
junjunjd committed Oct 23, 2023
1 parent 7681ef0 commit aa8062d
Show file tree
Hide file tree
Showing 10 changed files with 177 additions and 51 deletions.
19 changes: 17 additions & 2 deletions rust/src/operations/delete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -312,11 +312,13 @@ impl std::future::IntoFuture for DeleteBuilder {

#[cfg(test)]
mod tests {

use crate::operations::DeltaOps;
use crate::protocol::*;
use crate::writer::test_utils::datafusion::get_data;
use crate::writer::test_utils::{get_arrow_schema, get_delta_schema};
use crate::writer::test_utils::{
get_arrow_schema, get_delta_schema, get_record_batch, setup_table_with_configuration, write_batch
};
use crate::DeltaConfigKey;
use crate::DeltaTable;
use arrow::array::Int32Array;
use arrow::datatypes::{Field, Schema};
Expand All @@ -339,6 +341,19 @@ mod tests {
table
}

#[tokio::test]
async fn test_delete_when_delta_table_is_append_only() {
let table = setup_table_with_configuration(DeltaConfigKey::AppendOnly, Some("true")).await;
let batch = get_record_batch(None, false);
// append some data
let table = write_batch(table, batch).await;
// delete
let _err = DeltaOps(table)
.delete()
.await
.expect_err("Remove action is included when Delta table is append-only. Should error");
}

#[tokio::test]
async fn test_delete_default() {
let schema = get_arrow_schema(&None);
Expand Down
43 changes: 31 additions & 12 deletions rust/src/operations/merge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -532,7 +532,7 @@ impl MergeOperationConfig {
}
}

#[derive(Default, Serialize)]
#[derive(Default, Serialize, Debug)]
/// Metrics for the Merge Operation
pub struct MergeMetrics {
/// Number of rows in the source data
Expand Down Expand Up @@ -1245,12 +1245,13 @@ impl std::future::IntoFuture for MergeBuilder {

#[cfg(test)]
mod tests {

use crate::operations::DeltaOps;
use crate::protocol::*;
use crate::writer::test_utils::datafusion::get_data;
use crate::writer::test_utils::get_arrow_schema;
use crate::writer::test_utils::get_delta_schema;
use crate::writer::test_utils::setup_table_with_configuration;
use crate::DeltaConfigKey;
use crate::DeltaTable;
use arrow::datatypes::Schema as ArrowSchema;
use arrow::record_batch::RecordBatch;
Expand All @@ -1277,6 +1278,21 @@ mod tests {
table
}

#[tokio::test]
async fn test_merge_when_delta_table_is_append_only() {
let schema = get_arrow_schema(&None);
let table = setup_table_with_configuration(DeltaConfigKey::AppendOnly, Some("true")).await;
// append some data
let table = write_data(table, &schema).await;
// merge
let _err = DeltaOps(table)
.merge(merge_source(schema), col("target.id").eq(col("source.id")))
.with_source_alias("source")
.with_target_alias("target")
.await
.expect_err("Remove action is included when Delta table is append-only. Should error");
}

async fn write_data(table: DeltaTable, schema: &Arc<ArrowSchema>) -> DeltaTable {
let batch = RecordBatch::try_new(
Arc::clone(schema),
Expand All @@ -1300,14 +1316,7 @@ mod tests {
.unwrap()
}

async fn setup() -> (DeltaTable, DataFrame) {
let schema = get_arrow_schema(&None);
let table = setup_table(None).await;

let table = write_data(table, &schema).await;
assert_eq!(table.version(), 1);
assert_eq!(table.get_file_uris().count(), 1);

fn merge_source(schema: Arc<ArrowSchema>) -> DataFrame {
let ctx = SessionContext::new();
let batch = RecordBatch::try_new(
Arc::clone(&schema),
Expand All @@ -1322,8 +1331,18 @@ mod tests {
],
)
.unwrap();
let source = ctx.read_batch(batch).unwrap();
(table, source)
ctx.read_batch(batch).unwrap()
}

async fn setup() -> (DeltaTable, DataFrame) {
let schema = get_arrow_schema(&None);
let table = setup_table(None).await;

let table = write_data(table, &schema).await;
assert_eq!(table.version(), 1);
assert_eq!(table.get_file_uris().count(), 1);

(table, merge_source(schema))
}

async fn assert_merge(table: DeltaTable, metrics: MergeMetrics) {
Expand Down
1 change: 1 addition & 0 deletions rust/src/operations/restore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,7 @@ async fn execute(
datetime: datetime_to_restore.map(|time| -> i64 { time.timestamp_millis() }),
},
&actions,
&snapshot,
None,
)
.await?;
Expand Down
12 changes: 6 additions & 6 deletions rust/src/operations/transaction/conflict_checker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -691,7 +691,7 @@ mod tests {
actions: Vec<Action>,
read_whole_table: bool,
) -> Result<(), CommitConflictError> {
let setup_actions = setup.unwrap_or_else(init_table_actions);
let setup_actions = setup.unwrap_or_else(|| init_table_actions(None));
let state = DeltaTableState::from_actions(setup_actions, 0).unwrap();
let transaction_info = TransactionInfo::new(&state, reads, &actions, read_whole_table);
let summary = WinningCommitSummary {
Expand All @@ -717,7 +717,7 @@ mod tests {
// the concurrent transaction deletes a file that the current transaction did NOT read
let file_not_read = tu::create_add_action("file_not_read", true, get_stats(1, 10));
let file_read = tu::create_add_action("file_read", true, get_stats(100, 10000));
let mut setup_actions = init_table_actions();
let mut setup_actions = init_table_actions(None);
setup_actions.push(file_not_read);
setup_actions.push(file_read);
let result = execute_test(
Expand All @@ -733,7 +733,7 @@ mod tests {
// concurrently add file, that the current transaction would not have read
let file_added = tu::create_add_action("file_added", true, get_stats(1, 10));
let file_read = tu::create_add_action("file_read", true, get_stats(100, 10000));
let mut setup_actions = init_table_actions();
let mut setup_actions = init_table_actions(None);
setup_actions.push(file_read);
let result = execute_test(
Some(setup_actions),
Expand Down Expand Up @@ -797,7 +797,7 @@ mod tests {
// delete / read
// transaction reads a file that is removed by concurrent transaction
let file_read = tu::create_add_action("file_read", true, get_stats(1, 10));
let mut setup_actions = init_table_actions();
let mut setup_actions = init_table_actions(None);
setup_actions.push(file_read);
let result = execute_test(
Some(setup_actions),
Expand Down Expand Up @@ -842,7 +842,7 @@ mod tests {
let file_part1 = tu::create_add_action("file_part1", true, get_stats(1, 10));
let file_part2 = tu::create_add_action("file_part2", true, get_stats(11, 100));
let file_part3 = tu::create_add_action("file_part3", true, get_stats(101, 1000));
let mut setup_actions = init_table_actions();
let mut setup_actions = init_table_actions(None);
setup_actions.push(file_part1);
let result = execute_test(
Some(setup_actions),
Expand All @@ -858,7 +858,7 @@ mod tests {
// `read_whole_table` should disallow any concurrent remove actions
let file_part1 = tu::create_add_action("file_part1", true, get_stats(1, 10));
let file_part2 = tu::create_add_action("file_part2", true, get_stats(11, 100));
let mut setup_actions = init_table_actions();
let mut setup_actions = init_table_actions(None);
setup_actions.push(file_part1);
let result = execute_test(
Some(setup_actions),
Expand Down
58 changes: 52 additions & 6 deletions rust/src/operations/transaction/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,11 @@ pub enum TransactionError {
/// Error returned when maximum number of commit trioals is exceeded
#[error("Failed to commit transaction: {0}")]
MaxCommitAttempts(i32),
/// The transaction includes Remove action with data change but Delta table is append-only
#[error(
"The transaction includes Remove action with data change but Delta table is append-only"
)]
DeltaTableAppendOnly,
}

impl From<TransactionError> for DeltaTableError {
Expand All @@ -68,9 +73,18 @@ impl From<TransactionError> for DeltaTableError {
// Convert actions to their json representation
fn log_entry_from_actions<'a>(
actions: impl IntoIterator<Item = &'a Action>,
read_snapshot: &DeltaTableState,
) -> Result<String, TransactionError> {
let append_only = read_snapshot.table_config().append_only();
let mut jsons = Vec::<String>::new();
for action in actions {
if append_only {
if let Action::remove(remove) = action {
if remove.data_change {
return Err(TransactionError::DeltaTableAppendOnly);
}
}
}
let json = serde_json::to_string(action)
.map_err(|e| TransactionError::SerializeLogJson { json_err: e })?;
jsons.push(json);
Expand All @@ -81,6 +95,7 @@ fn log_entry_from_actions<'a>(
pub(crate) fn get_commit_bytes(
operation: &DeltaOperation,
actions: &Vec<Action>,
read_snapshot: &DeltaTableState,
app_metadata: Option<Map<String, Value>>,
) -> Result<bytes::Bytes, TransactionError> {
if !actions.iter().any(|a| matches!(a, Action::commitInfo(..))) {
Expand All @@ -99,9 +114,13 @@ pub(crate) fn get_commit_bytes(
actions
.iter()
.chain(std::iter::once(&Action::commitInfo(commit_info))),
read_snapshot,
)?))
} else {
Ok(bytes::Bytes::from(log_entry_from_actions(actions)?))
Ok(bytes::Bytes::from(log_entry_from_actions(
actions,
read_snapshot,
)?))
}
}

Expand All @@ -112,10 +131,11 @@ pub(crate) async fn prepare_commit<'a>(
storage: &dyn ObjectStore,
operation: &DeltaOperation,
actions: &Vec<Action>,
read_snapshot: &DeltaTableState,
app_metadata: Option<Map<String, Value>>,
) -> Result<Path, TransactionError> {
// Serialize all actions that are part of this log entry.
let log_entry = get_commit_bytes(operation, actions, app_metadata)?;
let log_entry = get_commit_bytes(operation, actions, read_snapshot, app_metadata)?;

// Write delta log entry as temporary file to storage. For the actual commit,
// the temporary file is moved (atomic rename) to the delta log folder within `commit` function.
Expand Down Expand Up @@ -177,7 +197,8 @@ pub async fn commit_with_retries(
app_metadata: Option<Map<String, Value>>,
max_retries: usize,
) -> DeltaResult<i64> {
let tmp_commit = prepare_commit(storage, &operation, actions, app_metadata).await?;
let tmp_commit =
prepare_commit(storage, &operation, actions, read_snapshot, app_metadata).await?;

let mut attempt_number = 1;

Expand Down Expand Up @@ -218,9 +239,11 @@ pub async fn commit_with_retries(

#[cfg(all(test, feature = "parquet"))]
mod tests {
use self::test_utils::init_table_actions;
use self::test_utils::{create_remove_action, init_table_actions};
use super::*;
use crate::DeltaConfigKey;
use object_store::memory::InMemory;
use std::collections::HashMap;

#[test]
fn test_commit_uri_from_version() {
Expand All @@ -232,13 +255,36 @@ mod tests {

#[test]
fn test_log_entry_from_actions() {
let actions = init_table_actions();
let entry = log_entry_from_actions(&actions).unwrap();
let actions = init_table_actions(None);
let state = DeltaTableState::from_actions(actions.clone(), 0).unwrap();
let entry = log_entry_from_actions(&actions, &state).unwrap();
let lines: Vec<_> = entry.lines().collect();
// writes every action to a line
assert_eq!(actions.len(), lines.len())
}

fn remove_action_exists_when_delta_table_is_append_only(
data_change: bool,
) -> Result<String, TransactionError> {
let remove = create_remove_action("test_append_only", data_change);
let mut actions = init_table_actions(Some(HashMap::from([(
DeltaConfigKey::AppendOnly.as_ref().to_string(),
Some("true".to_string()),
)])));
actions.push(remove);
let state =
DeltaTableState::from_actions(actions.clone(), 0).expect("Failed to get table state");
log_entry_from_actions(&actions, &state)
}

#[test]
fn test_remove_action_exists_when_delta_table_is_append_only() {
let _err = remove_action_exists_when_delta_table_is_append_only(true)
.expect_err("Remove action is included when Delta table is append-only. Should error");
let _actions = remove_action_exists_when_delta_table_is_append_only(false)
.expect("Data is not changed by the Remove action. Should succeed");
}

#[tokio::test]
async fn test_try_commit_transaction() {
let store = InMemory::new();
Expand Down
4 changes: 2 additions & 2 deletions rust/src/operations/transaction/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -334,7 +334,7 @@ mod tests {

#[test]
fn test_parse_predicate_expression() {
let snapshot = DeltaTableState::from_actions(init_table_actions(), 0).unwrap();
let snapshot = DeltaTableState::from_actions(init_table_actions(None), 0).unwrap();
let session = SessionContext::new();
let state = session.state();

Expand All @@ -361,7 +361,7 @@ mod tests {

#[test]
fn test_files_matching_predicate() {
let mut actions = init_table_actions();
let mut actions = init_table_actions(None);
actions.push(create_add_action("excluded", true, Some("{\"numRecords\":10,\"minValues\":{\"value\":1},\"maxValues\":{\"value\":10},\"nullCount\":{\"value\":0}}".into())));
actions.push(create_add_action("included-1", true, Some("{\"numRecords\":10,\"minValues\":{\"value\":1},\"maxValues\":{\"value\":100},\"nullCount\":{\"value\":0}}".into())));
actions.push(create_add_action("included-2", true, Some("{\"numRecords\":10,\"minValues\":{\"value\":-10},\"maxValues\":{\"value\":3},\"nullCount\":{\"value\":0}}".into())));
Expand Down
10 changes: 5 additions & 5 deletions rust/src/operations/transaction/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ pub fn create_metadata_action(
Action::metaData(MetaData::try_from(metadata).unwrap())
}

pub fn init_table_actions() -> Vec<Action> {
pub fn init_table_actions(configuration: Option<HashMap<String, Option<String>>>) -> Vec<Action> {
let raw = r#"
{
"timestamp": 1670892998177,
Expand All @@ -96,7 +96,7 @@ pub fn init_table_actions() -> Vec<Action> {
vec![
Action::commitInfo(commit_info),
create_protocol_action(None, None),
create_metadata_action(None, None),
create_metadata_action(None, configuration),
]
}

Expand Down Expand Up @@ -127,7 +127,7 @@ pub async fn create_initialized_table(
HashMap::new(),
),
]);
let state = DeltaTableState::from_actions(init_table_actions(), 0).unwrap();
let state = DeltaTableState::from_actions(init_table_actions(None), 0).unwrap();
let operation = DeltaOperation::Create {
mode: SaveMode::ErrorIfExists,
location: "location".into(),
Expand All @@ -144,8 +144,8 @@ pub async fn create_initialized_table(
configuration.unwrap_or_default(),
),
};
let actions = init_table_actions();
let prepared_commit = prepare_commit(storage.as_ref(), &operation, &actions, None)
let actions = init_table_actions(None);
let prepared_commit = prepare_commit(storage.as_ref(), &operation, &actions, &state, None)
.await
.unwrap();
try_commit_transaction(storage.as_ref(), &prepared_commit, 0)
Expand Down
Loading

0 comments on commit aa8062d

Please sign in to comment.