diff --git a/crates/core/src/operations/drop_column.rs b/crates/core/src/operations/drop_column.rs index 6f0b22ca6e..7463427ee8 100644 --- a/crates/core/src/operations/drop_column.rs +++ b/crates/core/src/operations/drop_column.rs @@ -1,5 +1,6 @@ //! Drop a column from the table +use delta_kernel::column_mapping::ColumnMappingMode; use delta_kernel::schema::DataType; use delta_kernel::schema::StructType; use futures::future::BoxFuture; @@ -9,9 +10,10 @@ use itertools::Itertools; use sqlparser::dialect::GenericDialect; use sqlparser::parser::Parser; -use super::transaction::{CommitBuilder, CommitProperties}; +use super::transaction::TransactionError; +use super::transaction::{CommitBuilder, CommitProperties, PROTOCOL}; -use crate::kernel::StructField; +use crate::kernel::{StructField, WriterFeatures}; use crate::logstore::LogStoreRef; use crate::protocol::DeltaOperation; use crate::table::state::DeltaTableState; @@ -72,6 +74,32 @@ impl std::future::IntoFuture for DropColumnBuilder { let this = self; Box::pin(async move { + let protocol = this.snapshot.protocol(); + + // Check if column mapping is enabled + if vec![5, 6].contains(&protocol.min_writer_version) + && this.snapshot.table_config().column_mapping_mode() == ColumnMappingMode::None + { + return Err(DeltaTableError::Generic( + "Column mapping mode shouldn't be None".to_string(), + )); + } else if protocol.min_writer_version == 7 + && !protocol + .writer_features + .as_ref() + .map(|v| v.contains(&crate::kernel::WriterFeatures::ColumnMapping)) + .unwrap_or_default() + { + return Err(DeltaTableError::Transaction { + source: TransactionError::WriterFeaturesRequired(WriterFeatures::ColumnMapping), + }); + } else if protocol.min_writer_version < 5 { + return Err(DeltaTableError::Generic(format!( + "Min writer >= 5, current version is ({})", + protocol.min_writer_version + ))); + }; + let dialect = GenericDialect {}; let mut metadata = this.snapshot.metadata().clone(); let fields = match this.fields { @@ -138,7 +166,9 @@ impl std::future::IntoFuture for DropColumnBuilder { .collect_vec(), ); - if !not_found.is_empty() && this.raise_if_not_exists { + if (!not_found.is_empty() || table_schema.ne(&new_table_schema)) + && this.raise_if_not_exists + { return Err(DeltaTableError::Generic(format!( "Column(s) with name: {:#?} doesn't exist", ¬_found