diff --git a/crates/deltalake-core/src/operations/restore.rs b/crates/deltalake-core/src/operations/restore.rs index a356b5b312..caa9b65afc 100644 --- a/crates/deltalake-core/src/operations/restore.rs +++ b/crates/deltalake-core/src/operations/restore.rs @@ -244,7 +244,6 @@ async fn execute( datetime: datetime_to_restore.map(|time| -> i64 { time.timestamp_millis() }), }, &actions, - &snapshot, None, ) .await?; diff --git a/crates/deltalake-core/src/operations/transaction/mod.rs b/crates/deltalake-core/src/operations/transaction/mod.rs index bd150ee25c..bb874f722d 100644 --- a/crates/deltalake-core/src/operations/transaction/mod.rs +++ b/crates/deltalake-core/src/operations/transaction/mod.rs @@ -69,6 +69,14 @@ pub enum TransactionError { /// Error returned when unsupported writer features are required #[error("Unsupported writer features required: {0:?}")] UnsupportedWriterFeatures(Vec), + + /// Error returned when writer features are required but not specified + #[error("Writer features must be specified for writerversion >= 7")] + WriterFeaturesRequired, + + /// Error returned when reader features are required but not specified + #[error("Reader features must be specified for reader version >= 3")] + ReaderFeaturesRequired, } impl From for DeltaTableError { @@ -89,18 +97,9 @@ impl From for DeltaTableError { // Convert actions to their json representation fn log_entry_from_actions<'a>( actions: impl IntoIterator, - read_snapshot: &DeltaTableState, ) -> Result { - let append_only = read_snapshot.table_config().append_only(); let mut jsons = Vec::::new(); for action in actions { - if append_only { - if let Action::Remove(remove) = action { - if remove.data_change { - return Err(TransactionError::DeltaTableAppendOnly); - } - } - } let json = serde_json::to_string(action) .map_err(|e| TransactionError::SerializeLogJson { json_err: e })?; jsons.push(json); @@ -111,7 +110,6 @@ fn log_entry_from_actions<'a>( pub(crate) fn get_commit_bytes( operation: &DeltaOperation, actions: &Vec, - read_snapshot: &DeltaTableState, app_metadata: Option>, ) -> Result { if !actions.iter().any(|a| matches!(a, Action::CommitInfo(..))) { @@ -130,13 +128,9 @@ pub(crate) fn get_commit_bytes( actions .iter() .chain(std::iter::once(&Action::CommitInfo(commit_info))), - read_snapshot, )?)) } else { - Ok(bytes::Bytes::from(log_entry_from_actions( - actions, - read_snapshot, - )?)) + Ok(bytes::Bytes::from(log_entry_from_actions(actions)?)) } } @@ -147,11 +141,10 @@ pub(crate) async fn prepare_commit<'a>( storage: &dyn ObjectStore, operation: &DeltaOperation, actions: &Vec, - read_snapshot: &DeltaTableState, app_metadata: Option>, ) -> Result { // Serialize all actions that are part of this log entry. - let log_entry = get_commit_bytes(operation, actions, read_snapshot, app_metadata)?; + let log_entry = get_commit_bytes(operation, actions, app_metadata)?; // Write delta log entry as temporary file to storage. For the actual commit, // the temporary file is moved (atomic rename) to the delta log folder within `commit` function. @@ -213,8 +206,8 @@ pub async fn commit_with_retries( app_metadata: Option>, max_retries: usize, ) -> DeltaResult { - let tmp_commit = - prepare_commit(storage, &operation, actions, read_snapshot, app_metadata).await?; + PROTOCOL.can_commit(read_snapshot, actions)?; + let tmp_commit = prepare_commit(storage, &operation, actions, app_metadata).await?; let mut attempt_number = 1; @@ -255,11 +248,9 @@ pub async fn commit_with_retries( #[cfg(all(test, feature = "parquet"))] mod tests { - use self::test_utils::{create_remove_action, init_table_actions}; + use self::test_utils::init_table_actions; use super::*; - use crate::DeltaConfigKey; use object_store::memory::InMemory; - use std::collections::HashMap; #[test] fn test_commit_uri_from_version() { @@ -272,35 +263,12 @@ mod tests { #[test] fn test_log_entry_from_actions() { let actions = init_table_actions(None); - let state = DeltaTableState::from_actions(actions.clone(), 0).unwrap(); - let entry = log_entry_from_actions(&actions, &state).unwrap(); + let entry = log_entry_from_actions(&actions).unwrap(); let lines: Vec<_> = entry.lines().collect(); // writes every action to a line assert_eq!(actions.len(), lines.len()) } - fn remove_action_exists_when_delta_table_is_append_only( - data_change: bool, - ) -> Result { - let remove = create_remove_action("test_append_only", data_change); - let mut actions = init_table_actions(Some(HashMap::from([( - DeltaConfigKey::AppendOnly.as_ref().to_string(), - Some("true".to_string()), - )]))); - actions.push(remove); - let state = - DeltaTableState::from_actions(actions.clone(), 0).expect("Failed to get table state"); - log_entry_from_actions(&actions, &state) - } - - #[test] - fn test_remove_action_exists_when_delta_table_is_append_only() { - let _err = remove_action_exists_when_delta_table_is_append_only(true) - .expect_err("Remove action is included when Delta table is append-only. Should error"); - let _actions = remove_action_exists_when_delta_table_is_append_only(false) - .expect("Data is not changed by the Remove action. Should succeed"); - } - #[tokio::test] async fn test_try_commit_transaction() { let store = InMemory::new(); diff --git a/crates/deltalake-core/src/operations/transaction/protocol.rs b/crates/deltalake-core/src/operations/transaction/protocol.rs index 8bceba74ce..d551f47de5 100644 --- a/crates/deltalake-core/src/operations/transaction/protocol.rs +++ b/crates/deltalake-core/src/operations/transaction/protocol.rs @@ -3,7 +3,7 @@ use std::collections::HashSet; use once_cell::sync::Lazy; use super::TransactionError; -use crate::kernel::{ReaderFeatures, WriterFeatures}; +use crate::kernel::{Action, ReaderFeatures, WriterFeatures}; use crate::table::state::DeltaTableState; static READER_V2: Lazy> = Lazy::new(|| { @@ -62,7 +62,7 @@ impl ProtocolChecker { } pub fn default_writer_version(&self) -> i32 { - 1 + 2 } /// Check if delta-rs can read form the given delta table. @@ -111,6 +111,37 @@ impl ProtocolChecker { }; Ok(()) } + + pub fn can_commit( + &self, + snapshot: &DeltaTableState, + actions: &[Action], + ) -> Result<(), TransactionError> { + self.can_write_to(snapshot)?; + + // https://github.com/delta-io/delta/blob/master/PROTOCOL.md#append-only-tables + let append_only_enabled = if snapshot.min_writer_version() < 2 { + false + } else if snapshot.min_writer_version() < 7 { + snapshot.table_config().append_only() + } else { + snapshot + .writer_features() + .ok_or(TransactionError::WriterFeaturesRequired)? + .contains(&WriterFeatures::AppendOnly) + && snapshot.table_config().append_only() + }; + if append_only_enabled { + actions.iter().try_for_each(|action| match action { + Action::Remove(remove) if remove.data_change => { + Err(TransactionError::DeltaTableAppendOnly) + } + _ => Ok(()), + })?; + } + + Ok(()) + } } /// The global protocol checker instance to validate table versions and features. @@ -132,8 +163,100 @@ pub static INSTANCE: Lazy = Lazy::new(|| { #[cfg(test)] mod tests { + use super::super::test_utils::create_metadata_action; use super::*; - use crate::kernel::{Action, Protocol}; + use crate::kernel::{Action, Add, Protocol, Remove}; + use crate::DeltaConfigKey; + use std::collections::HashMap; + + #[test] + fn test_can_commit_append_only() { + let append_actions = vec![Action::Add(Add { + path: "test".to_string(), + data_change: true, + ..Default::default() + })]; + let change_actions = vec![ + Action::Add(Add { + path: "test".to_string(), + data_change: true, + ..Default::default() + }), + Action::Remove(Remove { + path: "test".to_string(), + data_change: true, + ..Default::default() + }), + ]; + let neutral_actions = vec![ + Action::Add(Add { + path: "test".to_string(), + data_change: false, + ..Default::default() + }), + Action::Remove(Remove { + path: "test".to_string(), + data_change: false, + ..Default::default() + }), + ]; + + let create_actions = |writer: i32, append: &str, feat: Vec| { + vec![ + Action::Protocol(Protocol { + min_reader_version: 1, + min_writer_version: writer, + writer_features: Some(feat.into_iter().collect()), + ..Default::default() + }), + create_metadata_action( + None, + Some(HashMap::from([( + DeltaConfigKey::AppendOnly.as_ref().to_string(), + Some(append.to_string()), + )])), + ), + ] + }; + + let checker = ProtocolChecker::new(HashSet::new(), WRITER_V2.clone()); + + let actions = create_actions(1, "true", vec![]); + let snapshot = DeltaTableState::from_actions(actions, 1).unwrap(); + assert!(checker.can_commit(&snapshot, &append_actions).is_ok()); + assert!(checker.can_commit(&snapshot, &change_actions).is_ok()); + assert!(checker.can_commit(&snapshot, &neutral_actions).is_ok()); + + let actions = create_actions(2, "true", vec![]); + let snapshot = DeltaTableState::from_actions(actions, 1).unwrap(); + assert!(checker.can_commit(&snapshot, &append_actions).is_ok()); + assert!(checker.can_commit(&snapshot, &change_actions).is_err()); + assert!(checker.can_commit(&snapshot, &neutral_actions).is_ok()); + + let actions = create_actions(2, "false", vec![]); + let snapshot = DeltaTableState::from_actions(actions, 1).unwrap(); + assert!(checker.can_commit(&snapshot, &append_actions).is_ok()); + assert!(checker.can_commit(&snapshot, &change_actions).is_ok()); + assert!(checker.can_commit(&snapshot, &neutral_actions).is_ok()); + + let actions = create_actions(7, "true", vec![WriterFeatures::AppendOnly]); + let snapshot = DeltaTableState::from_actions(actions, 1).unwrap(); + assert!(checker.can_commit(&snapshot, &append_actions).is_ok()); + assert!(checker.can_commit(&snapshot, &change_actions).is_err()); + assert!(checker.can_commit(&snapshot, &neutral_actions).is_ok()); + + let actions = create_actions(7, "false", vec![WriterFeatures::AppendOnly]); + let snapshot = DeltaTableState::from_actions(actions, 1).unwrap(); + assert!(checker.can_commit(&snapshot, &append_actions).is_ok()); + assert!(checker.can_commit(&snapshot, &change_actions).is_ok()); + assert!(checker.can_commit(&snapshot, &neutral_actions).is_ok()); + + let actions = create_actions(7, "true", vec![]); + let snapshot = DeltaTableState::from_actions(actions, 1).unwrap(); + assert!(checker.can_commit(&snapshot, &append_actions).is_ok()); + assert!(checker.can_commit(&snapshot, &change_actions).is_ok()); + assert!(checker.can_commit(&snapshot, &neutral_actions).is_ok()); + } #[test] fn test_versions() { diff --git a/crates/deltalake-core/src/operations/transaction/test_utils.rs b/crates/deltalake-core/src/operations/transaction/test_utils.rs index a1e06388e6..2ac5355fe0 100644 --- a/crates/deltalake-core/src/operations/transaction/test_utils.rs +++ b/crates/deltalake-core/src/operations/transaction/test_utils.rs @@ -162,7 +162,7 @@ pub async fn create_initialized_table( ), }; let actions = init_table_actions(None); - let prepared_commit = prepare_commit(storage.as_ref(), &operation, &actions, &state, None) + let prepared_commit = prepare_commit(storage.as_ref(), &operation, &actions, None) .await .unwrap(); try_commit_transaction(storage.as_ref(), &prepared_commit, 0)