diff --git a/crates/deltalake-core/Cargo.toml b/crates/deltalake-core/Cargo.toml index ce1c7490ad..a670456363 100644 --- a/crates/deltalake-core/Cargo.toml +++ b/crates/deltalake-core/Cargo.toml @@ -130,6 +130,7 @@ tempfile = "3" tokio = { version = "1", features = ["macros", "rt-multi-thread"] } utime = "0.3" hyper = { version = "0.14", features = ["server"] } +criterion = "0.5" [features] azure = ["object_store/azure"] diff --git a/crates/deltalake-core/benches/read_checkpoint.rs b/crates/deltalake-core/benches/read_checkpoint.rs index 2ecbee661b..0db72c3e17 100644 --- a/crates/deltalake-core/benches/read_checkpoint.rs +++ b/crates/deltalake-core/benches/read_checkpoint.rs @@ -1,6 +1,6 @@ use criterion::{criterion_group, criterion_main, Criterion}; -use deltalake::table::state::DeltaTableState; -use deltalake::DeltaTableConfig; +use deltalake_core::table::state::DeltaTableState; +use deltalake_core::DeltaTableConfig; use std::fs::File; use std::io::Read; diff --git a/crates/deltalake-core/src/kernel/actions/types.rs b/crates/deltalake-core/src/kernel/actions/types.rs index 166dbc98ef..81c8a6cc64 100644 --- a/crates/deltalake-core/src/kernel/actions/types.rs +++ b/crates/deltalake-core/src/kernel/actions/types.rs @@ -1,4 +1,5 @@ use std::collections::{HashMap, HashSet}; +use std::fmt; use std::str::FromStr; // use std::io::{Cursor, Read}; // use std::sync::Arc; @@ -225,6 +226,24 @@ impl From for ReaderFeatures { } } +impl AsRef for ReaderFeatures { + fn as_ref(&self) -> &str { + match self { + ReaderFeatures::ColumnMapping => "columnMapping", + ReaderFeatures::DeleteionVecotrs => "deletionVectors", + ReaderFeatures::TimestampWithoutTimezone => "timestampNtz", + ReaderFeatures::V2Checkpoint => "v2Checkpoint", + ReaderFeatures::Other(f) => f, + } + } +} + +impl fmt::Display for ReaderFeatures { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "{}", self.as_ref()) + } +} + /// Features table writers can support as well as let users know /// what is supported #[derive(Serialize, Deserialize, Debug, Clone, Eq, PartialEq, Hash)] @@ -303,6 +322,33 @@ impl From for WriterFeatures { } } +impl AsRef for WriterFeatures { + fn as_ref(&self) -> &str { + match self { + WriterFeatures::AppendOnly => "appendOnly", + WriterFeatures::Invariants => "invariants", + WriterFeatures::CheckConstraints => "checkConstraints", + WriterFeatures::ChangeDataFeed => "changeDataFeed", + WriterFeatures::GeneratedColumns => "generatedColumns", + WriterFeatures::ColumnMapping => "columnMapping", + WriterFeatures::IdentityColumns => "identityColumns", + WriterFeatures::DeleteionVecotrs => "deletionVectors", + WriterFeatures::RowTracking => "rowTracking", + WriterFeatures::TimestampWithoutTimezone => "timestampNtz", + WriterFeatures::DomainMetadata => "domainMetadata", + WriterFeatures::V2Checkpoint => "v2Checkpoint", + WriterFeatures::IcebergCompatV1 => "icebergCompatV1", + WriterFeatures::Other(f) => f, + } + } +} + +impl fmt::Display for WriterFeatures { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "{}", self.as_ref()) + } +} + #[cfg(all(not(feature = "parquet2"), feature = "parquet"))] impl From<&parquet::record::Field> for WriterFeatures { fn from(value: &parquet::record::Field) -> Self { diff --git a/crates/deltalake-core/src/operations/create.rs b/crates/deltalake-core/src/operations/create.rs index 1dc9fdf8b2..c0479e323b 100644 --- a/crates/deltalake-core/src/operations/create.rs +++ b/crates/deltalake-core/src/operations/create.rs @@ -7,8 +7,7 @@ use std::sync::Arc; use futures::future::BoxFuture; use serde_json::{Map, Value}; -use super::transaction::commit; -use super::{MAX_SUPPORTED_READER_VERSION, MAX_SUPPORTED_WRITER_VERSION}; +use super::transaction::{commit, PROTOCOL}; use crate::errors::{DeltaResult, DeltaTableError}; use crate::kernel::{Action, DataType, Metadata, Protocol, StructField, StructType}; use crate::protocol::{DeltaOperation, SaveMode}; @@ -247,8 +246,8 @@ impl CreateBuilder { _ => unreachable!(), }) .unwrap_or_else(|| Protocol { - min_reader_version: MAX_SUPPORTED_READER_VERSION, - min_writer_version: MAX_SUPPORTED_WRITER_VERSION, + min_reader_version: PROTOCOL.default_reader_version(), + min_writer_version: PROTOCOL.default_writer_version(), writer_features: None, reader_features: None, }); @@ -392,8 +391,14 @@ mod tests { .await .unwrap(); assert_eq!(table.version(), 0); - assert_eq!(table.get_min_reader_version(), MAX_SUPPORTED_READER_VERSION); - assert_eq!(table.get_min_writer_version(), MAX_SUPPORTED_WRITER_VERSION); + assert_eq!( + table.get_min_reader_version(), + PROTOCOL.default_reader_version() + ); + assert_eq!( + table.get_min_writer_version(), + PROTOCOL.default_writer_version() + ); assert_eq!(table.schema().unwrap(), &schema); // check we can overwrite default settings via adding actions diff --git a/crates/deltalake-core/src/operations/delete.rs b/crates/deltalake-core/src/operations/delete.rs index 7f8be1f293..62315af80b 100644 --- a/crates/deltalake-core/src/operations/delete.rs +++ b/crates/deltalake-core/src/operations/delete.rs @@ -33,6 +33,8 @@ use parquet::file::properties::WriterProperties; use serde::Serialize; use serde_json::Value; +use super::datafusion_utils::Expression; +use super::transaction::PROTOCOL; use crate::delta_datafusion::expr::fmt_expr_to_sql; use crate::delta_datafusion::{find_files, register_store, DeltaScanBuilder}; use crate::errors::{DeltaResult, DeltaTableError}; @@ -44,8 +46,6 @@ use crate::storage::{DeltaObjectStore, ObjectStoreRef}; use crate::table::state::DeltaTableState; use crate::DeltaTable; -use super::datafusion_utils::Expression; - /// Delete Records from the Delta Table. /// See this module's documentaiton for more information pub struct DeleteBuilder { @@ -274,6 +274,8 @@ impl std::future::IntoFuture for DeleteBuilder { let mut this = self; Box::pin(async move { + PROTOCOL.can_write_to(&this.snapshot)?; + let state = this.state.unwrap_or_else(|| { let session = SessionContext::new(); diff --git a/crates/deltalake-core/src/operations/load.rs b/crates/deltalake-core/src/operations/load.rs index 7baa59e3e1..46ed797ed7 100644 --- a/crates/deltalake-core/src/operations/load.rs +++ b/crates/deltalake-core/src/operations/load.rs @@ -6,6 +6,7 @@ use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec; use datafusion::physical_plan::{ExecutionPlan, SendableRecordBatchStream}; use futures::future::BoxFuture; +use super::transaction::PROTOCOL; use crate::errors::{DeltaResult, DeltaTableError}; use crate::storage::DeltaObjectStore; use crate::table::state::DeltaTableState; @@ -46,6 +47,8 @@ impl std::future::IntoFuture for LoadBuilder { let this = self; Box::pin(async move { + PROTOCOL.can_read_from(&this.snapshot)?; + let table = DeltaTable::new_with_state(this.store, this.snapshot); let schema = table.state.arrow_schema()?; let projection = this diff --git a/crates/deltalake-core/src/operations/merge.rs b/crates/deltalake-core/src/operations/merge.rs index 57621cb316..e48af3a0c2 100644 --- a/crates/deltalake-core/src/operations/merge.rs +++ b/crates/deltalake-core/src/operations/merge.rs @@ -64,7 +64,7 @@ use serde::Serialize; use serde_json::Value; use super::datafusion_utils::{into_expr, maybe_into_expr, Expression}; -use super::transaction::commit; +use super::transaction::{commit, PROTOCOL}; use crate::delta_datafusion::expr::{fmt_expr_to_sql, parse_predicate_expression}; use crate::delta_datafusion::{register_store, DeltaScanBuilder}; use crate::kernel::{Action, Remove}; @@ -1208,6 +1208,8 @@ impl std::future::IntoFuture for MergeBuilder { let mut this = self; Box::pin(async move { + PROTOCOL.can_write_to(&this.snapshot)?; + let state = this.state.unwrap_or_else(|| { let session = SessionContext::new(); diff --git a/crates/deltalake-core/src/operations/mod.rs b/crates/deltalake-core/src/operations/mod.rs index 35301f067e..f274eb6cdf 100644 --- a/crates/deltalake-core/src/operations/mod.rs +++ b/crates/deltalake-core/src/operations/mod.rs @@ -48,11 +48,6 @@ pub mod write; #[cfg(all(feature = "arrow", feature = "parquet"))] pub mod writer; -/// Maximum supported writer version -pub const MAX_SUPPORTED_WRITER_VERSION: i32 = 1; -/// Maximum supported reader version -pub const MAX_SUPPORTED_READER_VERSION: i32 = 1; - /// High level interface for executing commands against a DeltaTable pub struct DeltaOps(pub DeltaTable); diff --git a/crates/deltalake-core/src/operations/optimize.rs b/crates/deltalake-core/src/operations/optimize.rs index 7feecd1e56..93171d5435 100644 --- a/crates/deltalake-core/src/operations/optimize.rs +++ b/crates/deltalake-core/src/operations/optimize.rs @@ -38,7 +38,7 @@ use parquet::errors::ParquetError; use parquet::file::properties::WriterProperties; use serde::{Deserialize, Serialize}; -use super::transaction::commit; +use super::transaction::{commit, PROTOCOL}; use super::writer::{PartitionWriter, PartitionWriterConfig}; use crate::errors::{DeltaResult, DeltaTableError}; use crate::kernel::{Action, Remove}; @@ -259,6 +259,8 @@ impl<'a> std::future::IntoFuture for OptimizeBuilder<'a> { let this = self; Box::pin(async move { + PROTOCOL.can_write_to(&this.snapshot)?; + let writer_properties = this.writer_properties.unwrap_or_else(|| { WriterProperties::builder() .set_compression(Compression::ZSTD(ZstdLevel::try_new(4).unwrap())) diff --git a/crates/deltalake-core/src/operations/transaction/mod.rs b/crates/deltalake-core/src/operations/transaction/mod.rs index c31c349fd7..bd150ee25c 100644 --- a/crates/deltalake-core/src/operations/transaction/mod.rs +++ b/crates/deltalake-core/src/operations/transaction/mod.rs @@ -7,21 +7,23 @@ use object_store::path::Path; use object_store::{Error as ObjectStoreError, ObjectStore}; use serde_json::Value; +use self::conflict_checker::{CommitConflictError, TransactionInfo, WinningCommitSummary}; use crate::crate_version; use crate::errors::{DeltaResult, DeltaTableError}; -use crate::kernel::{Action, CommitInfo}; +use crate::kernel::{Action, CommitInfo, ReaderFeatures, WriterFeatures}; use crate::protocol::DeltaOperation; use crate::storage::commit_uri_from_version; use crate::table::state::DeltaTableState; +pub use self::protocol::INSTANCE as PROTOCOL; + mod conflict_checker; +mod protocol; #[cfg(feature = "datafusion")] mod state; #[cfg(test)] pub(crate) mod test_utils; -use self::conflict_checker::{CommitConflictError, TransactionInfo, WinningCommitSummary}; - const DELTA_LOG_FOLDER: &str = "_delta_log"; /// Error raised while commititng transaction @@ -45,17 +47,28 @@ pub enum TransactionError { #[from] source: ObjectStoreError, }, + /// Error returned when a commit conflict ocurred #[error("Failed to commit transaction: {0}")] CommitConflict(#[from] CommitConflictError), + /// Error returned when maximum number of commit trioals is exceeded #[error("Failed to commit transaction: {0}")] MaxCommitAttempts(i32), + /// The transaction includes Remove action with data change but Delta table is append-only #[error( "The transaction includes Remove action with data change but Delta table is append-only" )] DeltaTableAppendOnly, + + /// Error returned when unsupported reader features are required + #[error("Unsupported reader features required: {0:?}")] + UnsupportedReaderFeatures(Vec), + + /// Error returned when unsupported writer features are required + #[error("Unsupported writer features required: {0:?}")] + UnsupportedWriterFeatures(Vec), } impl From for DeltaTableError { diff --git a/crates/deltalake-core/src/operations/transaction/protocol.rs b/crates/deltalake-core/src/operations/transaction/protocol.rs new file mode 100644 index 0000000000..8bceba74ce --- /dev/null +++ b/crates/deltalake-core/src/operations/transaction/protocol.rs @@ -0,0 +1,273 @@ +use std::collections::HashSet; + +use once_cell::sync::Lazy; + +use super::TransactionError; +use crate::kernel::{ReaderFeatures, WriterFeatures}; +use crate::table::state::DeltaTableState; + +static READER_V2: Lazy> = Lazy::new(|| { + let mut set = HashSet::new(); + set.insert(ReaderFeatures::ColumnMapping); + set +}); + +static WRITER_V2: Lazy> = Lazy::new(|| { + let mut set = HashSet::new(); + set.insert(WriterFeatures::AppendOnly); + set.insert(WriterFeatures::Invariants); + set +}); +static WRITER_V3: Lazy> = Lazy::new(|| { + let mut set = WRITER_V2.clone(); + set.insert(WriterFeatures::CheckConstraints); + set +}); +static WRITER_V4: Lazy> = Lazy::new(|| { + let mut set = WRITER_V3.clone(); + set.insert(WriterFeatures::ChangeDataFeed); + set.insert(WriterFeatures::GeneratedColumns); + set +}); +static WRITER_V5: Lazy> = Lazy::new(|| { + let mut set = WRITER_V4.clone(); + set.insert(WriterFeatures::ColumnMapping); + set +}); +static WRITER_V6: Lazy> = Lazy::new(|| { + let mut set = WRITER_V5.clone(); + set.insert(WriterFeatures::IdentityColumns); + set +}); + +pub struct ProtocolChecker { + reader_features: HashSet, + writer_features: HashSet, +} + +impl ProtocolChecker { + /// Create a new protocol checker. + pub fn new( + reader_features: HashSet, + writer_features: HashSet, + ) -> Self { + Self { + reader_features, + writer_features, + } + } + + pub fn default_reader_version(&self) -> i32 { + 1 + } + + pub fn default_writer_version(&self) -> i32 { + 1 + } + + /// Check if delta-rs can read form the given delta table. + pub fn can_read_from(&self, snapshot: &DeltaTableState) -> Result<(), TransactionError> { + let required_features = match snapshot.min_reader_version() { + 0 | 1 => None, + 2 => Some(READER_V2.clone()), + _ => snapshot.reader_features().cloned(), + }; + + if let Some(features) = required_features { + let diff: Vec<_> = features + .difference(&self.reader_features) + .cloned() + .collect(); + if !diff.is_empty() { + return Err(TransactionError::UnsupportedReaderFeatures(diff)); + } + }; + Ok(()) + } + + /// Check if delta-rs can write to the given delta table. + pub fn can_write_to(&self, snapshot: &DeltaTableState) -> Result<(), TransactionError> { + // NOTE: writers must always support all required reader features + self.can_read_from(snapshot)?; + + let required_features = match snapshot.min_writer_version() { + 0 | 1 => None, + 2 => Some(WRITER_V2.clone()), + 3 => Some(WRITER_V3.clone()), + 4 => Some(WRITER_V4.clone()), + 5 => Some(WRITER_V5.clone()), + 6 => Some(WRITER_V6.clone()), + _ => snapshot.writer_features().cloned(), + }; + + if let Some(features) = required_features { + let diff: Vec<_> = features + .difference(&self.writer_features) + .cloned() + .collect(); + if !diff.is_empty() { + return Err(TransactionError::UnsupportedWriterFeatures(diff)); + } + }; + Ok(()) + } +} + +/// The global protocol checker instance to validate table versions and features. +pub static INSTANCE: Lazy = Lazy::new(|| { + let reader_features = HashSet::new(); + // reader_features.insert(ReaderFeatures::ColumnMapping); + + let mut writer_features = HashSet::new(); + writer_features.insert(WriterFeatures::AppendOnly); + writer_features.insert(WriterFeatures::Invariants); + // writer_features.insert(WriterFeatures::CheckConstraints); + // writer_features.insert(WriterFeatures::ChangeDataFeed); + // writer_features.insert(WriterFeatures::GeneratedColumns); + // writer_features.insert(WriterFeatures::ColumnMapping); + // writer_features.insert(WriterFeatures::IdentityColumns); + + ProtocolChecker::new(reader_features, writer_features) +}); + +#[cfg(test)] +mod tests { + use super::*; + use crate::kernel::{Action, Protocol}; + + #[test] + fn test_versions() { + let checker_1 = ProtocolChecker::new(HashSet::new(), HashSet::new()); + let actions = vec![Action::Protocol(Protocol { + min_reader_version: 1, + min_writer_version: 1, + ..Default::default() + })]; + let snapshot_1 = DeltaTableState::from_actions(actions, 1).unwrap(); + assert!(checker_1.can_read_from(&snapshot_1).is_ok()); + assert!(checker_1.can_write_to(&snapshot_1).is_ok()); + + let checker_2 = ProtocolChecker::new(READER_V2.clone(), HashSet::new()); + let actions = vec![Action::Protocol(Protocol { + min_reader_version: 2, + min_writer_version: 1, + ..Default::default() + })]; + let snapshot_2 = DeltaTableState::from_actions(actions, 1).unwrap(); + assert!(checker_1.can_read_from(&snapshot_2).is_err()); + assert!(checker_1.can_write_to(&snapshot_2).is_err()); + assert!(checker_2.can_read_from(&snapshot_1).is_ok()); + assert!(checker_2.can_read_from(&snapshot_2).is_ok()); + assert!(checker_2.can_write_to(&snapshot_2).is_ok()); + + let checker_3 = ProtocolChecker::new(READER_V2.clone(), WRITER_V2.clone()); + let actions = vec![Action::Protocol(Protocol { + min_reader_version: 2, + min_writer_version: 2, + ..Default::default() + })]; + let snapshot_3 = DeltaTableState::from_actions(actions, 1).unwrap(); + assert!(checker_1.can_read_from(&snapshot_3).is_err()); + assert!(checker_1.can_write_to(&snapshot_3).is_err()); + assert!(checker_2.can_read_from(&snapshot_3).is_ok()); + assert!(checker_2.can_write_to(&snapshot_3).is_err()); + assert!(checker_3.can_read_from(&snapshot_1).is_ok()); + assert!(checker_3.can_read_from(&snapshot_2).is_ok()); + assert!(checker_3.can_read_from(&snapshot_3).is_ok()); + assert!(checker_3.can_write_to(&snapshot_3).is_ok()); + + let checker_4 = ProtocolChecker::new(READER_V2.clone(), WRITER_V3.clone()); + let actions = vec![Action::Protocol(Protocol { + min_reader_version: 2, + min_writer_version: 3, + ..Default::default() + })]; + let snapshot_4 = DeltaTableState::from_actions(actions, 1).unwrap(); + assert!(checker_1.can_read_from(&snapshot_4).is_err()); + assert!(checker_1.can_write_to(&snapshot_4).is_err()); + assert!(checker_2.can_read_from(&snapshot_4).is_ok()); + assert!(checker_2.can_write_to(&snapshot_4).is_err()); + assert!(checker_3.can_read_from(&snapshot_4).is_ok()); + assert!(checker_3.can_write_to(&snapshot_4).is_err()); + assert!(checker_4.can_read_from(&snapshot_1).is_ok()); + assert!(checker_4.can_read_from(&snapshot_2).is_ok()); + assert!(checker_4.can_read_from(&snapshot_3).is_ok()); + assert!(checker_4.can_read_from(&snapshot_4).is_ok()); + assert!(checker_4.can_write_to(&snapshot_4).is_ok()); + + let checker_5 = ProtocolChecker::new(READER_V2.clone(), WRITER_V4.clone()); + let actions = vec![Action::Protocol(Protocol { + min_reader_version: 2, + min_writer_version: 4, + ..Default::default() + })]; + let snapshot_5 = DeltaTableState::from_actions(actions, 1).unwrap(); + assert!(checker_1.can_read_from(&snapshot_5).is_err()); + assert!(checker_1.can_write_to(&snapshot_5).is_err()); + assert!(checker_2.can_read_from(&snapshot_5).is_ok()); + assert!(checker_2.can_write_to(&snapshot_5).is_err()); + assert!(checker_3.can_read_from(&snapshot_5).is_ok()); + assert!(checker_3.can_write_to(&snapshot_5).is_err()); + assert!(checker_4.can_read_from(&snapshot_5).is_ok()); + assert!(checker_4.can_write_to(&snapshot_5).is_err()); + assert!(checker_5.can_read_from(&snapshot_1).is_ok()); + assert!(checker_5.can_read_from(&snapshot_2).is_ok()); + assert!(checker_5.can_read_from(&snapshot_3).is_ok()); + assert!(checker_5.can_read_from(&snapshot_4).is_ok()); + assert!(checker_5.can_read_from(&snapshot_5).is_ok()); + assert!(checker_5.can_write_to(&snapshot_5).is_ok()); + + let checker_6 = ProtocolChecker::new(READER_V2.clone(), WRITER_V5.clone()); + let actions = vec![Action::Protocol(Protocol { + min_reader_version: 2, + min_writer_version: 5, + ..Default::default() + })]; + let snapshot_6 = DeltaTableState::from_actions(actions, 1).unwrap(); + assert!(checker_1.can_read_from(&snapshot_6).is_err()); + assert!(checker_1.can_write_to(&snapshot_6).is_err()); + assert!(checker_2.can_read_from(&snapshot_6).is_ok()); + assert!(checker_2.can_write_to(&snapshot_6).is_err()); + assert!(checker_3.can_read_from(&snapshot_6).is_ok()); + assert!(checker_3.can_write_to(&snapshot_6).is_err()); + assert!(checker_4.can_read_from(&snapshot_6).is_ok()); + assert!(checker_4.can_write_to(&snapshot_6).is_err()); + assert!(checker_5.can_read_from(&snapshot_6).is_ok()); + assert!(checker_5.can_write_to(&snapshot_6).is_err()); + assert!(checker_6.can_read_from(&snapshot_1).is_ok()); + assert!(checker_6.can_read_from(&snapshot_2).is_ok()); + assert!(checker_6.can_read_from(&snapshot_3).is_ok()); + assert!(checker_6.can_read_from(&snapshot_4).is_ok()); + assert!(checker_6.can_read_from(&snapshot_5).is_ok()); + assert!(checker_6.can_read_from(&snapshot_6).is_ok()); + assert!(checker_6.can_write_to(&snapshot_6).is_ok()); + + let checker_7 = ProtocolChecker::new(READER_V2.clone(), WRITER_V6.clone()); + let actions = vec![Action::Protocol(Protocol { + min_reader_version: 2, + min_writer_version: 6, + ..Default::default() + })]; + let snapshot_7 = DeltaTableState::from_actions(actions, 1).unwrap(); + assert!(checker_1.can_read_from(&snapshot_7).is_err()); + assert!(checker_1.can_write_to(&snapshot_7).is_err()); + assert!(checker_2.can_read_from(&snapshot_7).is_ok()); + assert!(checker_2.can_write_to(&snapshot_7).is_err()); + assert!(checker_3.can_read_from(&snapshot_7).is_ok()); + assert!(checker_3.can_write_to(&snapshot_7).is_err()); + assert!(checker_4.can_read_from(&snapshot_7).is_ok()); + assert!(checker_4.can_write_to(&snapshot_7).is_err()); + assert!(checker_5.can_read_from(&snapshot_7).is_ok()); + assert!(checker_5.can_write_to(&snapshot_7).is_err()); + assert!(checker_6.can_read_from(&snapshot_7).is_ok()); + assert!(checker_6.can_write_to(&snapshot_7).is_err()); + assert!(checker_7.can_read_from(&snapshot_1).is_ok()); + assert!(checker_7.can_read_from(&snapshot_2).is_ok()); + assert!(checker_7.can_read_from(&snapshot_3).is_ok()); + assert!(checker_7.can_read_from(&snapshot_4).is_ok()); + assert!(checker_7.can_read_from(&snapshot_5).is_ok()); + assert!(checker_7.can_read_from(&snapshot_6).is_ok()); + assert!(checker_7.can_read_from(&snapshot_7).is_ok()); + assert!(checker_7.can_write_to(&snapshot_7).is_ok()); + } +} diff --git a/crates/deltalake-core/src/operations/transaction/test_utils.rs b/crates/deltalake-core/src/operations/transaction/test_utils.rs index b52b1a1c7b..a1e06388e6 100644 --- a/crates/deltalake-core/src/operations/transaction/test_utils.rs +++ b/crates/deltalake-core/src/operations/transaction/test_utils.rs @@ -6,6 +6,7 @@ use crate::kernel::{ Action, Add, CommitInfo, DataType, Metadata, PrimitiveType, Protocol, Remove, StructField, StructType, }; +use crate::operations::transaction::PROTOCOL; use crate::protocol::{DeltaOperation, SaveMode}; use crate::table::state::DeltaTableState; use crate::table::DeltaTableMetaData; @@ -49,8 +50,8 @@ pub fn create_remove_action(path: impl Into, data_change: bool) -> Actio pub fn create_protocol_action(max_reader: Option, max_writer: Option) -> Action { let protocol = Protocol { - min_reader_version: max_reader.unwrap_or(crate::operations::MAX_SUPPORTED_READER_VERSION), - min_writer_version: max_writer.unwrap_or(crate::operations::MAX_SUPPORTED_WRITER_VERSION), + min_reader_version: max_reader.unwrap_or(PROTOCOL.default_reader_version()), + min_writer_version: max_writer.unwrap_or(PROTOCOL.default_writer_version()), writer_features: None, reader_features: None, }; diff --git a/crates/deltalake-core/src/operations/update.rs b/crates/deltalake-core/src/operations/update.rs index 1723d287a2..665ea47eef 100644 --- a/crates/deltalake-core/src/operations/update.rs +++ b/crates/deltalake-core/src/operations/update.rs @@ -44,7 +44,7 @@ use serde::Serialize; use serde_json::Value; use super::datafusion_utils::{Expression, MetricObserverExec}; -use super::transaction::commit; +use super::transaction::{commit, PROTOCOL}; use super::write::write_execution_plan; use crate::delta_datafusion::expr::fmt_expr_to_sql; use crate::delta_datafusion::{find_files, register_store, DeltaScanBuilder}; @@ -426,6 +426,8 @@ impl std::future::IntoFuture for UpdateBuilder { let mut this = self; Box::pin(async move { + PROTOCOL.can_write_to(&this.snapshot)?; + let state = this.state.unwrap_or_else(|| { let session = SessionContext::new(); diff --git a/crates/deltalake-core/src/operations/write.rs b/crates/deltalake-core/src/operations/write.rs index 45bdaaeff5..79de12de66 100644 --- a/crates/deltalake-core/src/operations/write.rs +++ b/crates/deltalake-core/src/operations/write.rs @@ -38,8 +38,8 @@ use futures::future::BoxFuture; use futures::StreamExt; use parquet::file::properties::WriterProperties; +use super::transaction::PROTOCOL; use super::writer::{DeltaWriter, WriterConfig}; -use super::MAX_SUPPORTED_WRITER_VERSION; use super::{transaction::commit, CreateBuilder}; use crate::delta_datafusion::DeltaDataChecker; use crate::errors::{DeltaResult, DeltaTableError}; @@ -59,16 +59,11 @@ enum WriteError { #[error("Failed to execute write task: {source}")] WriteTask { source: tokio::task::JoinError }, - #[error("Delta-rs does not support writer version requirement: {0}")] - UnsupportedWriterVersion(i32), - #[error("A table already exists at: {0}")] AlreadyExists(String), #[error( - "Specified table partitioning does not match table partitioning: expected: {:?}, got: {:?}", - expected, - got + "Specified table partitioning does not match table partitioning: expected: {expected:?}, got: {got:?}", )] PartitionColumnMismatch { expected: Vec, @@ -212,16 +207,12 @@ impl WriteBuilder { async fn check_preconditions(&self) -> DeltaResult> { match self.store.is_delta_table_location().await? { true => { - let min_writer = self.snapshot.min_writer_version(); - if min_writer > MAX_SUPPORTED_WRITER_VERSION { - Err(WriteError::UnsupportedWriterVersion(min_writer).into()) - } else { - match self.mode { - SaveMode::ErrorIfExists => { - Err(WriteError::AlreadyExists(self.store.root_uri()).into()) - } - _ => Ok(vec![]), + PROTOCOL.can_write_to(&self.snapshot)?; + match self.mode { + SaveMode::ErrorIfExists => { + Err(WriteError::AlreadyExists(self.store.root_uri()).into()) } + _ => Ok(vec![]), } } false => {