diff --git a/crates/deltalake-core/Cargo.toml b/crates/deltalake-core/Cargo.toml index fc3e21c38c..95331eb93e 100644 --- a/crates/deltalake-core/Cargo.toml +++ b/crates/deltalake-core/Cargo.toml @@ -130,6 +130,7 @@ tempfile = "3" tokio = { version = "1", features = ["macros", "rt-multi-thread"] } utime = "0.3" hyper = { version = "0.14", features = ["server"] } +criterion = "0.5" [features] azure = ["object_store/azure"] diff --git a/crates/deltalake-core/benches/read_checkpoint.rs b/crates/deltalake-core/benches/read_checkpoint.rs index 2ecbee661b..0db72c3e17 100644 --- a/crates/deltalake-core/benches/read_checkpoint.rs +++ b/crates/deltalake-core/benches/read_checkpoint.rs @@ -1,6 +1,6 @@ use criterion::{criterion_group, criterion_main, Criterion}; -use deltalake::table::state::DeltaTableState; -use deltalake::DeltaTableConfig; +use deltalake_core::table::state::DeltaTableState; +use deltalake_core::DeltaTableConfig; use std::fs::File; use std::io::Read; diff --git a/crates/deltalake-core/src/kernel/actions/types.rs b/crates/deltalake-core/src/kernel/actions/types.rs index e8060f145e..a788315b82 100644 --- a/crates/deltalake-core/src/kernel/actions/types.rs +++ b/crates/deltalake-core/src/kernel/actions/types.rs @@ -1,4 +1,5 @@ use std::collections::{HashMap, HashSet}; +use std::fmt; use std::str::FromStr; // use std::io::{Cursor, Read}; // use std::sync::Arc; @@ -225,6 +226,24 @@ impl From for ReaderFeatures { } } +impl AsRef for ReaderFeatures { + fn as_ref(&self) -> &str { + match self { + ReaderFeatures::ColumnMapping => "columnMapping", + ReaderFeatures::DeleteionVecotrs => "deletionVectors", + ReaderFeatures::TimestampWithoutTimezone => "timestampNtz", + ReaderFeatures::V2Checkpoint => "v2Checkpoint", + ReaderFeatures::Other(f) => f, + } + } +} + +impl fmt::Display for ReaderFeatures { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "{}", self.as_ref()) + } +} + /// Features table writers can support as well as let users know /// what is supported #[derive(Serialize, Deserialize, Debug, Clone, Eq, PartialEq, Hash)] @@ -303,6 +322,33 @@ impl From for WriterFeatures { } } +impl AsRef for WriterFeatures { + fn as_ref(&self) -> &str { + match self { + WriterFeatures::AppendOnly => "appendOnly", + WriterFeatures::Invariants => "invariants", + WriterFeatures::CheckConstraints => "checkConstraints", + WriterFeatures::ChangeDataFeed => "changeDataFeed", + WriterFeatures::GeneratedColumns => "generatedColumns", + WriterFeatures::ColumnMapping => "columnMapping", + WriterFeatures::IdentityColumns => "identityColumns", + WriterFeatures::DeleteionVecotrs => "deletionVectors", + WriterFeatures::RowTracking => "rowTracking", + WriterFeatures::TimestampWithoutTimezone => "timestampNtz", + WriterFeatures::DomainMetadata => "domainMetadata", + WriterFeatures::V2Checkpoint => "v2Checkpoint", + WriterFeatures::IcebergCompatV1 => "icebergCompatV1", + WriterFeatures::Other(f) => f, + } + } +} + +impl fmt::Display for WriterFeatures { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "{}", self.as_ref()) + } +} + #[cfg(all(not(feature = "parquet2"), feature = "parquet"))] impl From<&parquet::record::Field> for WriterFeatures { fn from(value: &parquet::record::Field) -> Self { diff --git a/crates/deltalake-core/src/operations/create.rs b/crates/deltalake-core/src/operations/create.rs index 71398faf97..84c2e03627 100644 --- a/crates/deltalake-core/src/operations/create.rs +++ b/crates/deltalake-core/src/operations/create.rs @@ -7,8 +7,7 @@ use std::sync::Arc; use futures::future::BoxFuture; use serde_json::{Map, Value}; -use super::transaction::commit; -use super::{MAX_SUPPORTED_READER_VERSION, MAX_SUPPORTED_WRITER_VERSION}; +use super::transaction::{commit, PROTOCOL}; use crate::errors::{DeltaResult, DeltaTableError}; use crate::kernel::{Action, DataType, Metadata, Protocol, StructField, StructType}; use crate::logstore::{LogStore, LogStoreRef}; @@ -245,8 +244,8 @@ impl CreateBuilder { _ => unreachable!(), }) .unwrap_or_else(|| Protocol { - min_reader_version: MAX_SUPPORTED_READER_VERSION, - min_writer_version: MAX_SUPPORTED_WRITER_VERSION, + min_reader_version: PROTOCOL.default_reader_version(), + min_writer_version: PROTOCOL.default_writer_version(), writer_features: None, reader_features: None, }); @@ -391,8 +390,14 @@ mod tests { .await .unwrap(); assert_eq!(table.version(), 0); - assert_eq!(table.get_min_reader_version(), MAX_SUPPORTED_READER_VERSION); - assert_eq!(table.get_min_writer_version(), MAX_SUPPORTED_WRITER_VERSION); + assert_eq!( + table.get_min_reader_version(), + PROTOCOL.default_reader_version() + ); + assert_eq!( + table.get_min_writer_version(), + PROTOCOL.default_writer_version() + ); assert_eq!(table.schema().unwrap(), &schema); // check we can overwrite default settings via adding actions diff --git a/crates/deltalake-core/src/operations/delete.rs b/crates/deltalake-core/src/operations/delete.rs index bd361c9707..b6c94f423b 100644 --- a/crates/deltalake-core/src/operations/delete.rs +++ b/crates/deltalake-core/src/operations/delete.rs @@ -34,6 +34,8 @@ use parquet::file::properties::WriterProperties; use serde::Serialize; use serde_json::Value; +use super::datafusion_utils::Expression; +use super::transaction::PROTOCOL; use crate::delta_datafusion::expr::fmt_expr_to_sql; use crate::delta_datafusion::{find_files, register_store, DeltaScanBuilder}; use crate::errors::{DeltaResult, DeltaTableError}; @@ -44,8 +46,6 @@ use crate::protocol::DeltaOperation; use crate::table::state::DeltaTableState; use crate::DeltaTable; -use super::datafusion_utils::Expression; - /// Delete Records from the Delta Table. /// See this module's documentation for more information pub struct DeleteBuilder { @@ -274,6 +274,8 @@ impl std::future::IntoFuture for DeleteBuilder { let mut this = self; Box::pin(async move { + PROTOCOL.can_write_to(&this.snapshot)?; + let state = this.state.unwrap_or_else(|| { let session = SessionContext::new(); diff --git a/crates/deltalake-core/src/operations/load.rs b/crates/deltalake-core/src/operations/load.rs index 1a4c5c4cc6..610f86dee6 100644 --- a/crates/deltalake-core/src/operations/load.rs +++ b/crates/deltalake-core/src/operations/load.rs @@ -6,6 +6,7 @@ use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec; use datafusion::physical_plan::{ExecutionPlan, SendableRecordBatchStream}; use futures::future::BoxFuture; +use super::transaction::PROTOCOL; use crate::errors::{DeltaResult, DeltaTableError}; use crate::logstore::LogStoreRef; use crate::table::state::DeltaTableState; @@ -46,6 +47,8 @@ impl std::future::IntoFuture for LoadBuilder { let this = self; Box::pin(async move { + PROTOCOL.can_read_from(&this.snapshot)?; + let table = DeltaTable::new_with_state(this.log_store, this.snapshot); let schema = table.state.arrow_schema()?; let projection = this diff --git a/crates/deltalake-core/src/operations/merge.rs b/crates/deltalake-core/src/operations/merge.rs index d38ddf0efb..a9ad6a8655 100644 --- a/crates/deltalake-core/src/operations/merge.rs +++ b/crates/deltalake-core/src/operations/merge.rs @@ -64,7 +64,7 @@ use serde::Serialize; use serde_json::Value; use super::datafusion_utils::{into_expr, maybe_into_expr, Expression}; -use super::transaction::commit; +use super::transaction::{commit, PROTOCOL}; use crate::delta_datafusion::expr::{fmt_expr_to_sql, parse_predicate_expression}; use crate::delta_datafusion::{register_store, DeltaScanBuilder}; use crate::kernel::{Action, Remove}; @@ -1208,6 +1208,8 @@ impl std::future::IntoFuture for MergeBuilder { let mut this = self; Box::pin(async move { + PROTOCOL.can_write_to(&this.snapshot)?; + let state = this.state.unwrap_or_else(|| { let session = SessionContext::new(); diff --git a/crates/deltalake-core/src/operations/mod.rs b/crates/deltalake-core/src/operations/mod.rs index 863a5a8b63..a0dbfd0239 100644 --- a/crates/deltalake-core/src/operations/mod.rs +++ b/crates/deltalake-core/src/operations/mod.rs @@ -50,11 +50,6 @@ pub mod write; #[cfg(all(feature = "arrow", feature = "parquet"))] pub mod writer; -/// Maximum supported writer version -pub const MAX_SUPPORTED_WRITER_VERSION: i32 = 1; -/// Maximum supported reader version -pub const MAX_SUPPORTED_READER_VERSION: i32 = 1; - /// High level interface for executing commands against a DeltaTable pub struct DeltaOps(pub DeltaTable); diff --git a/crates/deltalake-core/src/operations/optimize.rs b/crates/deltalake-core/src/operations/optimize.rs index e7d6a1b11d..ef8905e0c9 100644 --- a/crates/deltalake-core/src/operations/optimize.rs +++ b/crates/deltalake-core/src/operations/optimize.rs @@ -38,7 +38,7 @@ use parquet::errors::ParquetError; use parquet::file::properties::WriterProperties; use serde::{Deserialize, Serialize}; -use super::transaction::commit; +use super::transaction::{commit, PROTOCOL}; use super::writer::{PartitionWriter, PartitionWriterConfig}; use crate::errors::{DeltaResult, DeltaTableError}; use crate::kernel::{Action, Remove}; @@ -260,6 +260,8 @@ impl<'a> std::future::IntoFuture for OptimizeBuilder<'a> { let this = self; Box::pin(async move { + PROTOCOL.can_write_to(&this.snapshot)?; + let writer_properties = this.writer_properties.unwrap_or_else(|| { WriterProperties::builder() .set_compression(Compression::ZSTD(ZstdLevel::try_new(4).unwrap())) diff --git a/crates/deltalake-core/src/operations/restore.rs b/crates/deltalake-core/src/operations/restore.rs index c391de6f04..be43bacf5f 100644 --- a/crates/deltalake-core/src/operations/restore.rs +++ b/crates/deltalake-core/src/operations/restore.rs @@ -245,7 +245,6 @@ async fn execute( datetime: datetime_to_restore.map(|time| -> i64 { time.timestamp_millis() }), }, &actions, - &snapshot, None, ) .await?; diff --git a/crates/deltalake-core/src/operations/transaction/mod.rs b/crates/deltalake-core/src/operations/transaction/mod.rs index e5e808d2d5..2f62f663cf 100644 --- a/crates/deltalake-core/src/operations/transaction/mod.rs +++ b/crates/deltalake-core/src/operations/transaction/mod.rs @@ -7,21 +7,23 @@ use object_store::path::Path; use object_store::{Error as ObjectStoreError, ObjectStore}; use serde_json::Value; +use self::conflict_checker::{CommitConflictError, TransactionInfo, WinningCommitSummary}; use crate::crate_version; use crate::errors::{DeltaResult, DeltaTableError}; -use crate::kernel::{Action, CommitInfo}; +use crate::kernel::{Action, CommitInfo, ReaderFeatures, WriterFeatures}; use crate::logstore::LogStore; use crate::protocol::DeltaOperation; use crate::table::state::DeltaTableState; +pub use self::protocol::INSTANCE as PROTOCOL; + mod conflict_checker; +mod protocol; #[cfg(feature = "datafusion")] mod state; #[cfg(test)] pub(crate) mod test_utils; -use self::conflict_checker::{CommitConflictError, TransactionInfo, WinningCommitSummary}; - const DELTA_LOG_FOLDER: &str = "_delta_log"; /// Error raised while commititng transaction @@ -45,17 +47,36 @@ pub enum TransactionError { #[from] source: ObjectStoreError, }, + /// Error returned when a commit conflict ocurred #[error("Failed to commit transaction: {0}")] CommitConflict(#[from] CommitConflictError), + /// Error returned when maximum number of commit trioals is exceeded #[error("Failed to commit transaction: {0}")] MaxCommitAttempts(i32), + /// The transaction includes Remove action with data change but Delta table is append-only #[error( "The transaction includes Remove action with data change but Delta table is append-only" )] DeltaTableAppendOnly, + + /// Error returned when unsupported reader features are required + #[error("Unsupported reader features required: {0:?}")] + UnsupportedReaderFeatures(Vec), + + /// Error returned when unsupported writer features are required + #[error("Unsupported writer features required: {0:?}")] + UnsupportedWriterFeatures(Vec), + + /// Error returned when writer features are required but not specified + #[error("Writer features must be specified for writerversion >= 7")] + WriterFeaturesRequired, + + /// Error returned when reader features are required but not specified + #[error("Reader features must be specified for reader version >= 3")] + ReaderFeaturesRequired, } impl From for DeltaTableError { @@ -76,18 +97,9 @@ impl From for DeltaTableError { // Convert actions to their json representation fn log_entry_from_actions<'a>( actions: impl IntoIterator, - read_snapshot: &DeltaTableState, ) -> Result { - let append_only = read_snapshot.table_config().append_only(); let mut jsons = Vec::::new(); for action in actions { - if append_only { - if let Action::Remove(remove) = action { - if remove.data_change { - return Err(TransactionError::DeltaTableAppendOnly); - } - } - } let json = serde_json::to_string(action) .map_err(|e| TransactionError::SerializeLogJson { json_err: e })?; jsons.push(json); @@ -98,7 +110,6 @@ fn log_entry_from_actions<'a>( pub(crate) fn get_commit_bytes( operation: &DeltaOperation, actions: &Vec, - read_snapshot: &DeltaTableState, app_metadata: Option>, ) -> Result { if !actions.iter().any(|a| matches!(a, Action::CommitInfo(..))) { @@ -117,13 +128,9 @@ pub(crate) fn get_commit_bytes( actions .iter() .chain(std::iter::once(&Action::CommitInfo(commit_info))), - read_snapshot, )?)) } else { - Ok(bytes::Bytes::from(log_entry_from_actions( - actions, - read_snapshot, - )?)) + Ok(bytes::Bytes::from(log_entry_from_actions(actions)?)) } } @@ -135,11 +142,10 @@ pub(crate) async fn prepare_commit<'a>( storage: &dyn ObjectStore, operation: &DeltaOperation, actions: &Vec, - read_snapshot: &DeltaTableState, app_metadata: Option>, ) -> Result { // Serialize all actions that are part of this log entry. - let log_entry = get_commit_bytes(operation, actions, read_snapshot, app_metadata)?; + let log_entry = get_commit_bytes(operation, actions, app_metadata)?; // Write delta log entry as temporary file to storage. For the actual commit, // the temporary file is moved (atomic rename) to the delta log folder within `commit` function. @@ -185,11 +191,11 @@ pub async fn commit_with_retries( app_metadata: Option>, max_retries: usize, ) -> DeltaResult { + PROTOCOL.can_commit(read_snapshot, actions)?; let tmp_commit = prepare_commit( log_store.object_store().as_ref(), &operation, actions, - read_snapshot, app_metadata, ) .await?; @@ -240,12 +246,9 @@ pub async fn commit_with_retries( mod tests { use std::{collections::HashMap, sync::Arc}; - use self::test_utils::{create_remove_action, init_table_actions}; + use self::test_utils::init_table_actions; use super::*; - use crate::{ - logstore::default_logstore::DefaultLogStore, storage::commit_uri_from_version, - DeltaConfigKey, - }; + use crate::{logstore::default_logstore::DefaultLogStore, storage::commit_uri_from_version}; use object_store::memory::InMemory; use url::Url; @@ -260,35 +263,12 @@ mod tests { #[test] fn test_log_entry_from_actions() { let actions = init_table_actions(None); - let state = DeltaTableState::from_actions(actions.clone(), 0).unwrap(); - let entry = log_entry_from_actions(&actions, &state).unwrap(); + let entry = log_entry_from_actions(&actions).unwrap(); let lines: Vec<_> = entry.lines().collect(); // writes every action to a line assert_eq!(actions.len(), lines.len()) } - fn remove_action_exists_when_delta_table_is_append_only( - data_change: bool, - ) -> Result { - let remove = create_remove_action("test_append_only", data_change); - let mut actions = init_table_actions(Some(HashMap::from([( - DeltaConfigKey::AppendOnly.as_ref().to_string(), - Some("true".to_string()), - )]))); - actions.push(remove); - let state = - DeltaTableState::from_actions(actions.clone(), 0).expect("Failed to get table state"); - log_entry_from_actions(&actions, &state) - } - - #[test] - fn test_remove_action_exists_when_delta_table_is_append_only() { - let _err = remove_action_exists_when_delta_table_is_append_only(true) - .expect_err("Remove action is included when Delta table is append-only. Should error"); - let _actions = remove_action_exists_when_delta_table_is_append_only(false) - .expect("Data is not changed by the Remove action. Should succeed"); - } - #[tokio::test] async fn test_try_commit_transaction() { let store = Arc::new(InMemory::new()); diff --git a/crates/deltalake-core/src/operations/transaction/protocol.rs b/crates/deltalake-core/src/operations/transaction/protocol.rs new file mode 100644 index 0000000000..47e4d0a41a --- /dev/null +++ b/crates/deltalake-core/src/operations/transaction/protocol.rs @@ -0,0 +1,404 @@ +use std::collections::HashSet; + +use lazy_static::lazy_static; +use once_cell::sync::Lazy; + +use super::TransactionError; +use crate::kernel::{Action, ReaderFeatures, WriterFeatures}; +use crate::table::state::DeltaTableState; + +lazy_static! { + static ref READER_V2: HashSet = + HashSet::from_iter([ReaderFeatures::ColumnMapping]); + static ref WRITER_V2: HashSet = + HashSet::from_iter([WriterFeatures::AppendOnly, WriterFeatures::Invariants]); + static ref WRITER_V3: HashSet = HashSet::from_iter([ + WriterFeatures::AppendOnly, + WriterFeatures::Invariants, + WriterFeatures::CheckConstraints + ]); + static ref WRITER_V4: HashSet = HashSet::from_iter([ + WriterFeatures::AppendOnly, + WriterFeatures::Invariants, + WriterFeatures::CheckConstraints, + WriterFeatures::ChangeDataFeed, + WriterFeatures::GeneratedColumns + ]); + static ref WRITER_V5: HashSet = HashSet::from_iter([ + WriterFeatures::AppendOnly, + WriterFeatures::Invariants, + WriterFeatures::CheckConstraints, + WriterFeatures::ChangeDataFeed, + WriterFeatures::GeneratedColumns, + WriterFeatures::ColumnMapping, + ]); + static ref WRITER_V6: HashSet = HashSet::from_iter([ + WriterFeatures::AppendOnly, + WriterFeatures::Invariants, + WriterFeatures::CheckConstraints, + WriterFeatures::ChangeDataFeed, + WriterFeatures::GeneratedColumns, + WriterFeatures::ColumnMapping, + WriterFeatures::IdentityColumns, + ]); +} + +pub struct ProtocolChecker { + reader_features: HashSet, + writer_features: HashSet, +} + +impl ProtocolChecker { + /// Create a new protocol checker. + pub fn new( + reader_features: HashSet, + writer_features: HashSet, + ) -> Self { + Self { + reader_features, + writer_features, + } + } + + pub fn default_reader_version(&self) -> i32 { + 1 + } + + pub fn default_writer_version(&self) -> i32 { + 2 + } + + /// Check if delta-rs can read form the given delta table. + pub fn can_read_from(&self, snapshot: &DeltaTableState) -> Result<(), TransactionError> { + let required_features: Option<&HashSet> = + match snapshot.min_reader_version() { + 0 | 1 => None, + 2 => Some(&READER_V2), + _ => snapshot.reader_features(), + }; + if let Some(features) = required_features { + let mut diff = features.difference(&self.reader_features).peekable(); + if diff.peek().is_some() { + return Err(TransactionError::UnsupportedReaderFeatures( + diff.cloned().collect(), + )); + } + }; + Ok(()) + } + + /// Check if delta-rs can write to the given delta table. + pub fn can_write_to(&self, snapshot: &DeltaTableState) -> Result<(), TransactionError> { + // NOTE: writers must always support all required reader features + self.can_read_from(snapshot)?; + + let required_features: Option<&HashSet> = + match snapshot.min_writer_version() { + 0 | 1 => None, + 2 => Some(&WRITER_V2), + 3 => Some(&WRITER_V3), + 4 => Some(&WRITER_V4), + 5 => Some(&WRITER_V5), + 6 => Some(&WRITER_V6), + _ => snapshot.writer_features(), + }; + + if let Some(features) = required_features { + let mut diff = features.difference(&self.writer_features).peekable(); + if diff.peek().is_some() { + return Err(TransactionError::UnsupportedWriterFeatures( + diff.cloned().collect(), + )); + } + }; + Ok(()) + } + + pub fn can_commit( + &self, + snapshot: &DeltaTableState, + actions: &[Action], + ) -> Result<(), TransactionError> { + self.can_write_to(snapshot)?; + + // https://github.com/delta-io/delta/blob/master/PROTOCOL.md#append-only-tables + let append_only_enabled = if snapshot.min_writer_version() < 2 { + false + } else if snapshot.min_writer_version() < 7 { + snapshot.table_config().append_only() + } else { + snapshot + .writer_features() + .ok_or(TransactionError::WriterFeaturesRequired)? + .contains(&WriterFeatures::AppendOnly) + && snapshot.table_config().append_only() + }; + if append_only_enabled { + actions.iter().try_for_each(|action| match action { + Action::Remove(remove) if remove.data_change => { + Err(TransactionError::DeltaTableAppendOnly) + } + _ => Ok(()), + })?; + } + + Ok(()) + } +} + +/// The global protocol checker instance to validate table versions and features. +/// +/// This instance is used by default in all transaction operations, since feature +/// support is not configurable but rather decided at compile time. +/// +/// As we implement new features, we need to update this instance accordingly. +/// resulting version support is determined by the supported table feature set. +pub static INSTANCE: Lazy = Lazy::new(|| { + let reader_features = HashSet::new(); + // reader_features.insert(ReaderFeatures::ColumnMapping); + + let mut writer_features = HashSet::new(); + writer_features.insert(WriterFeatures::AppendOnly); + writer_features.insert(WriterFeatures::Invariants); + // writer_features.insert(WriterFeatures::CheckConstraints); + // writer_features.insert(WriterFeatures::ChangeDataFeed); + // writer_features.insert(WriterFeatures::GeneratedColumns); + // writer_features.insert(WriterFeatures::ColumnMapping); + // writer_features.insert(WriterFeatures::IdentityColumns); + + ProtocolChecker::new(reader_features, writer_features) +}); + +#[cfg(test)] +mod tests { + use super::super::test_utils::create_metadata_action; + use super::*; + use crate::kernel::{Action, Add, Protocol, Remove}; + use crate::DeltaConfigKey; + use std::collections::HashMap; + + #[test] + fn test_can_commit_append_only() { + let append_actions = vec![Action::Add(Add { + path: "test".to_string(), + data_change: true, + ..Default::default() + })]; + let change_actions = vec![ + Action::Add(Add { + path: "test".to_string(), + data_change: true, + ..Default::default() + }), + Action::Remove(Remove { + path: "test".to_string(), + data_change: true, + ..Default::default() + }), + ]; + let neutral_actions = vec![ + Action::Add(Add { + path: "test".to_string(), + data_change: false, + ..Default::default() + }), + Action::Remove(Remove { + path: "test".to_string(), + data_change: false, + ..Default::default() + }), + ]; + + let create_actions = |writer: i32, append: &str, feat: Vec| { + vec![ + Action::Protocol(Protocol { + min_reader_version: 1, + min_writer_version: writer, + writer_features: Some(feat.into_iter().collect()), + ..Default::default() + }), + create_metadata_action( + None, + Some(HashMap::from([( + DeltaConfigKey::AppendOnly.as_ref().to_string(), + Some(append.to_string()), + )])), + ), + ] + }; + + let checker = ProtocolChecker::new(HashSet::new(), WRITER_V2.clone()); + + let actions = create_actions(1, "true", vec![]); + let snapshot = DeltaTableState::from_actions(actions, 1).unwrap(); + assert!(checker.can_commit(&snapshot, &append_actions).is_ok()); + assert!(checker.can_commit(&snapshot, &change_actions).is_ok()); + assert!(checker.can_commit(&snapshot, &neutral_actions).is_ok()); + + let actions = create_actions(2, "true", vec![]); + let snapshot = DeltaTableState::from_actions(actions, 1).unwrap(); + assert!(checker.can_commit(&snapshot, &append_actions).is_ok()); + assert!(checker.can_commit(&snapshot, &change_actions).is_err()); + assert!(checker.can_commit(&snapshot, &neutral_actions).is_ok()); + + let actions = create_actions(2, "false", vec![]); + let snapshot = DeltaTableState::from_actions(actions, 1).unwrap(); + assert!(checker.can_commit(&snapshot, &append_actions).is_ok()); + assert!(checker.can_commit(&snapshot, &change_actions).is_ok()); + assert!(checker.can_commit(&snapshot, &neutral_actions).is_ok()); + + let actions = create_actions(7, "true", vec![WriterFeatures::AppendOnly]); + let snapshot = DeltaTableState::from_actions(actions, 1).unwrap(); + assert!(checker.can_commit(&snapshot, &append_actions).is_ok()); + assert!(checker.can_commit(&snapshot, &change_actions).is_err()); + assert!(checker.can_commit(&snapshot, &neutral_actions).is_ok()); + + let actions = create_actions(7, "false", vec![WriterFeatures::AppendOnly]); + let snapshot = DeltaTableState::from_actions(actions, 1).unwrap(); + assert!(checker.can_commit(&snapshot, &append_actions).is_ok()); + assert!(checker.can_commit(&snapshot, &change_actions).is_ok()); + assert!(checker.can_commit(&snapshot, &neutral_actions).is_ok()); + + let actions = create_actions(7, "true", vec![]); + let snapshot = DeltaTableState::from_actions(actions, 1).unwrap(); + assert!(checker.can_commit(&snapshot, &append_actions).is_ok()); + assert!(checker.can_commit(&snapshot, &change_actions).is_ok()); + assert!(checker.can_commit(&snapshot, &neutral_actions).is_ok()); + } + + #[test] + fn test_versions() { + let checker_1 = ProtocolChecker::new(HashSet::new(), HashSet::new()); + let actions = vec![Action::Protocol(Protocol { + min_reader_version: 1, + min_writer_version: 1, + ..Default::default() + })]; + let snapshot_1 = DeltaTableState::from_actions(actions, 1).unwrap(); + assert!(checker_1.can_read_from(&snapshot_1).is_ok()); + assert!(checker_1.can_write_to(&snapshot_1).is_ok()); + + let checker_2 = ProtocolChecker::new(READER_V2.clone(), HashSet::new()); + let actions = vec![Action::Protocol(Protocol { + min_reader_version: 2, + min_writer_version: 1, + ..Default::default() + })]; + let snapshot_2 = DeltaTableState::from_actions(actions, 1).unwrap(); + assert!(checker_1.can_read_from(&snapshot_2).is_err()); + assert!(checker_1.can_write_to(&snapshot_2).is_err()); + assert!(checker_2.can_read_from(&snapshot_1).is_ok()); + assert!(checker_2.can_read_from(&snapshot_2).is_ok()); + assert!(checker_2.can_write_to(&snapshot_2).is_ok()); + + let checker_3 = ProtocolChecker::new(READER_V2.clone(), WRITER_V2.clone()); + let actions = vec![Action::Protocol(Protocol { + min_reader_version: 2, + min_writer_version: 2, + ..Default::default() + })]; + let snapshot_3 = DeltaTableState::from_actions(actions, 1).unwrap(); + assert!(checker_1.can_read_from(&snapshot_3).is_err()); + assert!(checker_1.can_write_to(&snapshot_3).is_err()); + assert!(checker_2.can_read_from(&snapshot_3).is_ok()); + assert!(checker_2.can_write_to(&snapshot_3).is_err()); + assert!(checker_3.can_read_from(&snapshot_1).is_ok()); + assert!(checker_3.can_read_from(&snapshot_2).is_ok()); + assert!(checker_3.can_read_from(&snapshot_3).is_ok()); + assert!(checker_3.can_write_to(&snapshot_3).is_ok()); + + let checker_4 = ProtocolChecker::new(READER_V2.clone(), WRITER_V3.clone()); + let actions = vec![Action::Protocol(Protocol { + min_reader_version: 2, + min_writer_version: 3, + ..Default::default() + })]; + let snapshot_4 = DeltaTableState::from_actions(actions, 1).unwrap(); + assert!(checker_1.can_read_from(&snapshot_4).is_err()); + assert!(checker_1.can_write_to(&snapshot_4).is_err()); + assert!(checker_2.can_read_from(&snapshot_4).is_ok()); + assert!(checker_2.can_write_to(&snapshot_4).is_err()); + assert!(checker_3.can_read_from(&snapshot_4).is_ok()); + assert!(checker_3.can_write_to(&snapshot_4).is_err()); + assert!(checker_4.can_read_from(&snapshot_1).is_ok()); + assert!(checker_4.can_read_from(&snapshot_2).is_ok()); + assert!(checker_4.can_read_from(&snapshot_3).is_ok()); + assert!(checker_4.can_read_from(&snapshot_4).is_ok()); + assert!(checker_4.can_write_to(&snapshot_4).is_ok()); + + let checker_5 = ProtocolChecker::new(READER_V2.clone(), WRITER_V4.clone()); + let actions = vec![Action::Protocol(Protocol { + min_reader_version: 2, + min_writer_version: 4, + ..Default::default() + })]; + let snapshot_5 = DeltaTableState::from_actions(actions, 1).unwrap(); + assert!(checker_1.can_read_from(&snapshot_5).is_err()); + assert!(checker_1.can_write_to(&snapshot_5).is_err()); + assert!(checker_2.can_read_from(&snapshot_5).is_ok()); + assert!(checker_2.can_write_to(&snapshot_5).is_err()); + assert!(checker_3.can_read_from(&snapshot_5).is_ok()); + assert!(checker_3.can_write_to(&snapshot_5).is_err()); + assert!(checker_4.can_read_from(&snapshot_5).is_ok()); + assert!(checker_4.can_write_to(&snapshot_5).is_err()); + assert!(checker_5.can_read_from(&snapshot_1).is_ok()); + assert!(checker_5.can_read_from(&snapshot_2).is_ok()); + assert!(checker_5.can_read_from(&snapshot_3).is_ok()); + assert!(checker_5.can_read_from(&snapshot_4).is_ok()); + assert!(checker_5.can_read_from(&snapshot_5).is_ok()); + assert!(checker_5.can_write_to(&snapshot_5).is_ok()); + + let checker_6 = ProtocolChecker::new(READER_V2.clone(), WRITER_V5.clone()); + let actions = vec![Action::Protocol(Protocol { + min_reader_version: 2, + min_writer_version: 5, + ..Default::default() + })]; + let snapshot_6 = DeltaTableState::from_actions(actions, 1).unwrap(); + assert!(checker_1.can_read_from(&snapshot_6).is_err()); + assert!(checker_1.can_write_to(&snapshot_6).is_err()); + assert!(checker_2.can_read_from(&snapshot_6).is_ok()); + assert!(checker_2.can_write_to(&snapshot_6).is_err()); + assert!(checker_3.can_read_from(&snapshot_6).is_ok()); + assert!(checker_3.can_write_to(&snapshot_6).is_err()); + assert!(checker_4.can_read_from(&snapshot_6).is_ok()); + assert!(checker_4.can_write_to(&snapshot_6).is_err()); + assert!(checker_5.can_read_from(&snapshot_6).is_ok()); + assert!(checker_5.can_write_to(&snapshot_6).is_err()); + assert!(checker_6.can_read_from(&snapshot_1).is_ok()); + assert!(checker_6.can_read_from(&snapshot_2).is_ok()); + assert!(checker_6.can_read_from(&snapshot_3).is_ok()); + assert!(checker_6.can_read_from(&snapshot_4).is_ok()); + assert!(checker_6.can_read_from(&snapshot_5).is_ok()); + assert!(checker_6.can_read_from(&snapshot_6).is_ok()); + assert!(checker_6.can_write_to(&snapshot_6).is_ok()); + + let checker_7 = ProtocolChecker::new(READER_V2.clone(), WRITER_V6.clone()); + let actions = vec![Action::Protocol(Protocol { + min_reader_version: 2, + min_writer_version: 6, + ..Default::default() + })]; + let snapshot_7 = DeltaTableState::from_actions(actions, 1).unwrap(); + assert!(checker_1.can_read_from(&snapshot_7).is_err()); + assert!(checker_1.can_write_to(&snapshot_7).is_err()); + assert!(checker_2.can_read_from(&snapshot_7).is_ok()); + assert!(checker_2.can_write_to(&snapshot_7).is_err()); + assert!(checker_3.can_read_from(&snapshot_7).is_ok()); + assert!(checker_3.can_write_to(&snapshot_7).is_err()); + assert!(checker_4.can_read_from(&snapshot_7).is_ok()); + assert!(checker_4.can_write_to(&snapshot_7).is_err()); + assert!(checker_5.can_read_from(&snapshot_7).is_ok()); + assert!(checker_5.can_write_to(&snapshot_7).is_err()); + assert!(checker_6.can_read_from(&snapshot_7).is_ok()); + assert!(checker_6.can_write_to(&snapshot_7).is_err()); + assert!(checker_7.can_read_from(&snapshot_1).is_ok()); + assert!(checker_7.can_read_from(&snapshot_2).is_ok()); + assert!(checker_7.can_read_from(&snapshot_3).is_ok()); + assert!(checker_7.can_read_from(&snapshot_4).is_ok()); + assert!(checker_7.can_read_from(&snapshot_5).is_ok()); + assert!(checker_7.can_read_from(&snapshot_6).is_ok()); + assert!(checker_7.can_read_from(&snapshot_7).is_ok()); + assert!(checker_7.can_write_to(&snapshot_7).is_ok()); + } +} diff --git a/crates/deltalake-core/src/operations/transaction/test_utils.rs b/crates/deltalake-core/src/operations/transaction/test_utils.rs index 56b0894019..2efdcde2ea 100644 --- a/crates/deltalake-core/src/operations/transaction/test_utils.rs +++ b/crates/deltalake-core/src/operations/transaction/test_utils.rs @@ -6,6 +6,7 @@ use crate::kernel::{ Action, Add, CommitInfo, DataType, Metadata, PrimitiveType, Protocol, Remove, StructField, StructType, }; +use crate::operations::transaction::PROTOCOL; use crate::protocol::{DeltaOperation, SaveMode}; use crate::table::state::DeltaTableState; use crate::table::DeltaTableMetaData; @@ -49,8 +50,8 @@ pub fn create_remove_action(path: impl Into, data_change: bool) -> Actio pub fn create_protocol_action(max_reader: Option, max_writer: Option) -> Action { let protocol = Protocol { - min_reader_version: max_reader.unwrap_or(crate::operations::MAX_SUPPORTED_READER_VERSION), - min_writer_version: max_writer.unwrap_or(crate::operations::MAX_SUPPORTED_WRITER_VERSION), + min_reader_version: max_reader.unwrap_or(PROTOCOL.default_reader_version()), + min_writer_version: max_writer.unwrap_or(PROTOCOL.default_writer_version()), writer_features: None, reader_features: None, }; @@ -165,7 +166,6 @@ pub async fn create_initialized_table( log_store.object_store().as_ref(), &operation, &actions, - &state, None, ) .await diff --git a/crates/deltalake-core/src/operations/update.rs b/crates/deltalake-core/src/operations/update.rs index 9f51912579..7583ed6b39 100644 --- a/crates/deltalake-core/src/operations/update.rs +++ b/crates/deltalake-core/src/operations/update.rs @@ -44,7 +44,7 @@ use serde::Serialize; use serde_json::Value; use super::datafusion_utils::{Expression, MetricObserverExec}; -use super::transaction::commit; +use super::transaction::{commit, PROTOCOL}; use super::write::write_execution_plan; use crate::delta_datafusion::expr::fmt_expr_to_sql; use crate::delta_datafusion::{find_files, register_store, DeltaScanBuilder}; @@ -426,6 +426,8 @@ impl std::future::IntoFuture for UpdateBuilder { let mut this = self; Box::pin(async move { + PROTOCOL.can_write_to(&this.snapshot)?; + let state = this.state.unwrap_or_else(|| { let session = SessionContext::new(); diff --git a/crates/deltalake-core/src/operations/write.rs b/crates/deltalake-core/src/operations/write.rs index dec4b7ced7..cb68b72bb2 100644 --- a/crates/deltalake-core/src/operations/write.rs +++ b/crates/deltalake-core/src/operations/write.rs @@ -38,8 +38,8 @@ use futures::future::BoxFuture; use futures::StreamExt; use parquet::file::properties::WriterProperties; +use super::transaction::PROTOCOL; use super::writer::{DeltaWriter, WriterConfig}; -use super::MAX_SUPPORTED_WRITER_VERSION; use super::{transaction::commit, CreateBuilder}; use crate::delta_datafusion::DeltaDataChecker; use crate::errors::{DeltaResult, DeltaTableError}; @@ -60,16 +60,11 @@ enum WriteError { #[error("Failed to execute write task: {source}")] WriteTask { source: tokio::task::JoinError }, - #[error("Delta-rs does not support writer version requirement: {0}")] - UnsupportedWriterVersion(i32), - #[error("A table already exists at: {0}")] AlreadyExists(String), #[error( - "Specified table partitioning does not match table partitioning: expected: {:?}, got: {:?}", - expected, - got + "Specified table partitioning does not match table partitioning: expected: {expected:?}, got: {got:?}", )] PartitionColumnMismatch { expected: Vec, @@ -213,16 +208,12 @@ impl WriteBuilder { async fn check_preconditions(&self) -> DeltaResult> { match self.log_store.is_delta_table_location().await? { true => { - let min_writer = self.snapshot.min_writer_version(); - if min_writer > MAX_SUPPORTED_WRITER_VERSION { - Err(WriteError::UnsupportedWriterVersion(min_writer).into()) - } else { - match self.mode { - SaveMode::ErrorIfExists => { - Err(WriteError::AlreadyExists(self.log_store.root_uri()).into()) - } - _ => Ok(vec![]), + PROTOCOL.can_write_to(&self.snapshot)?; + match self.mode { + SaveMode::ErrorIfExists => { + Err(WriteError::AlreadyExists(self.log_store.root_uri()).into()) } + _ => Ok(vec![]), } } false => { diff --git a/crates/deltalake-core/tests/integration_concurrent_writes.rs b/crates/deltalake-core/tests/integration_concurrent_writes.rs index 90dba7659a..79c16e85dc 100644 --- a/crates/deltalake-core/tests/integration_concurrent_writes.rs +++ b/crates/deltalake-core/tests/integration_concurrent_writes.rs @@ -69,7 +69,7 @@ async fn prepare_table( assert_eq!(0, table.version()); assert_eq!(1, table.get_min_reader_version()); - assert_eq!(1, table.get_min_writer_version()); + assert_eq!(2, table.get_min_writer_version()); assert_eq!(0, table.get_files().len()); Ok((table, table_uri))