From 99bb2eb14a2182544b1eee4429f9c7262d6ccbb9 Mon Sep 17 00:00:00 2001 From: ion-elgreco <15728914+ion-elgreco@users.noreply.github.com> Date: Fri, 26 Jul 2024 17:59:05 +0300 Subject: [PATCH 1/9] prepare drop columns skeleton --- crates/core/src/operations/drop_column.rs | 88 +++++++++++++++++++++++ crates/core/src/operations/mod.rs | 7 ++ crates/core/src/protocol/mod.rs | 9 +++ python/deltalake/_internal.pyi | 6 ++ python/deltalake/table.py | 32 +++++++++ python/src/lib.rs | 49 +++++++++++++ 6 files changed, 191 insertions(+) create mode 100644 crates/core/src/operations/drop_column.rs diff --git a/crates/core/src/operations/drop_column.rs b/crates/core/src/operations/drop_column.rs new file mode 100644 index 0000000000..657ca2b083 --- /dev/null +++ b/crates/core/src/operations/drop_column.rs @@ -0,0 +1,88 @@ +//! Drop a column from the table + +use delta_kernel::schema::StructType; +use futures::future::BoxFuture; +use itertools::Itertools; + +use super::transaction::{CommitBuilder, CommitProperties, PROTOCOL}; + +use crate::kernel::StructField; +use crate::logstore::LogStoreRef; +use crate::protocol::DeltaOperation; +use crate::table::state::DeltaTableState; +use crate::{DeltaResult, DeltaTable, DeltaTableError}; + +/// Add new columns and/or nested fields to a table +pub struct DropColumnBuilder { + /// A snapshot of the table's state + snapshot: DeltaTableState, + /// Fields to drop from the schema + fields: Option>, + /// Delta object store for handling data files + log_store: LogStoreRef, + /// Additional information to add to the commit + commit_properties: CommitProperties, +} + +impl super::Operation<()> for DropColumnBuilder {} + +impl DropColumnBuilder { + /// Create a new builder + pub fn new(log_store: LogStoreRef, snapshot: DeltaTableState) -> Self { + Self { + snapshot, + log_store, + fields: None, + commit_properties: CommitProperties::default(), + } + } + + /// Specify the fields to be added + pub fn with_fields(mut self, fields: impl IntoIterator 
+ Clone) -> Self { + self.fields = Some(fields.into_iter().collect()); + self + } + /// Additional metadata to be added to commit info + pub fn with_commit_properties(mut self, commit_properties: CommitProperties) -> Self { + self.commit_properties = commit_properties; + self + } +} + +impl std::future::IntoFuture for DropColumnBuilder { + type Output = DeltaResult; + + type IntoFuture = BoxFuture<'static, Self::Output>; + + fn into_future(self) -> Self::IntoFuture { + let this = self; + + Box::pin(async move { + let mut metadata = this.snapshot.metadata().clone(); + let fields = match this.fields { + Some(v) => v, + None => return Err(DeltaTableError::Generic("No fields provided".to_string())), + }; + + let table_schema = this.snapshot.schema(); + + let new_table_schema = table_schema; + + let operation = DeltaOperation::DropColumn { fields: fields }; + + metadata.schema_string = serde_json::to_string(&new_table_schema)?; + + let actions = vec![metadata.into()]; + + let commit = CommitBuilder::from(this.commit_properties) + .with_actions(actions) + .build(Some(&this.snapshot), this.log_store.clone(), operation) + .await?; + + Ok(DeltaTable::new_with_state( + this.log_store, + commit.snapshot(), + )) + }) + } +} diff --git a/crates/core/src/operations/mod.rs b/crates/core/src/operations/mod.rs index 761ebd7b4e..8dbf18a4ed 100644 --- a/crates/core/src/operations/mod.rs +++ b/crates/core/src/operations/mod.rs @@ -18,6 +18,7 @@ use std::collections::HashMap; pub mod cast; pub mod convert_to_delta; pub mod create; +pub mod drop_column; pub mod drop_constraints; pub mod filesystem_check; pub mod optimize; @@ -35,6 +36,7 @@ use self::{ pub use ::datafusion::physical_plan::common::collect as collect_sendable_stream; #[cfg(feature = "datafusion")] use arrow::record_batch::RecordBatch; +use drop_column::DropColumnBuilder; use optimize::OptimizeBuilder; use restore::RestoreBuilder; use set_tbl_properties::SetTablePropertiesBuilder; @@ -224,6 +226,11 @@ impl DeltaOps { 
DropConstraintBuilder::new(self.0.log_store, self.0.state.unwrap()) } + /// Add new columns + pub fn drop_columns(self) -> DropColumnBuilder { + DropColumnBuilder::new(self.0.log_store, self.0.state.unwrap()) + } + /// Set table properties pub fn set_tbl_properties(self) -> SetTablePropertiesBuilder { SetTablePropertiesBuilder::new(self.0.log_store, self.0.state.unwrap()) diff --git a/crates/core/src/protocol/mod.rs b/crates/core/src/protocol/mod.rs index 9cfa429fde..d5ec73ae77 100644 --- a/crates/core/src/protocol/mod.rs +++ b/crates/core/src/protocol/mod.rs @@ -377,6 +377,13 @@ pub enum DeltaOperation { name: String, }, + /// Represents a Delta `Drop Column` operation. + /// Used to drop columns or fields in a struct + DropColumn { + /// Fields to be dropped from the schema + fields: Vec, + }, + /// Merge data with a source data with the following predicate #[serde(rename_all = "camelCase")] Merge { @@ -476,6 +483,7 @@ impl DeltaOperation { DeltaOperation::VacuumEnd { .. } => "VACUUM END", DeltaOperation::AddConstraint { .. } => "ADD CONSTRAINT", DeltaOperation::DropConstraint { .. } => "DROP CONSTRAINT", + DeltaOperation::DropColumn { .. } => "DROP COLUMN", } } @@ -516,6 +524,7 @@ impl DeltaOperation { | Self::VacuumStart { .. } | Self::VacuumEnd { .. } | Self::AddConstraint { .. } + | Self::DropColumn { .. } | Self::DropConstraint { .. } => false, Self::Create { .. } | Self::FileSystemCheck {} diff --git a/python/deltalake/_internal.pyi b/python/deltalake/_internal.pyi index 3bfe017eb0..8fbf5f2b53 100644 --- a/python/deltalake/_internal.pyi +++ b/python/deltalake/_internal.pyi @@ -91,6 +91,12 @@ class RawDeltaTable: custom_metadata: Optional[Dict[str, str]], post_commithook_properties: Optional[Dict[str, Optional[bool]]], ) -> None: ... + def drop_columns( + self, + fields: List[str], + custom_metadata: Optional[Dict[str, str]], + post_commithook_properties: Optional[Dict[str, Optional[bool]]], + ) -> None: ... 
def set_table_properties( self, properties: Dict[str, str], diff --git a/python/deltalake/table.py b/python/deltalake/table.py index 4c3c540639..2f97d42183 100644 --- a/python/deltalake/table.py +++ b/python/deltalake/table.py @@ -1868,6 +1868,38 @@ def drop_constraint( post_commithook_properties.__dict__ if post_commithook_properties else None, ) + def drop_columns( + self, + fields: Union[str, List[str]], + custom_metadata: Optional[Dict[str, str]] = None, + post_commithook_properties: Optional[PostCommitHookProperties] = None, + ) -> None: + """Drop columns and/or update the fields of a stuct column + Args: + fields: fields to drop from table schema + custom_metadata: custom metadata that will be added to the transaction commit. + post_commithook_properties: properties for the post commit hook. If None, default values are used. + Example: + ```python + from deltalake import DeltaTable + dt = DeltaTable("test_table") + dt.alter.drop_columns( + [ + "foo", + "bar.baz" + ] + ) + ``` + """ + if isinstance(fields, str): + fields = [fields] + + self.table._table.drop_columns( + fields, + custom_metadata, + post_commithook_properties.__dict__ if post_commithook_properties else None, + ) + def set_table_properties( self, properties: Dict[str, str], diff --git a/python/src/lib.rs b/python/src/lib.rs index 6784ae8735..d051e49ecc 100644 --- a/python/src/lib.rs +++ b/python/src/lib.rs @@ -32,6 +32,7 @@ use deltalake::operations::collect_sendable_stream; use deltalake::operations::constraints::ConstraintBuilder; use deltalake::operations::convert_to_delta::{ConvertToDeltaBuilder, PartitionStrategy}; use deltalake::operations::delete::DeleteBuilder; +use deltalake::operations::drop_column::DropColumnBuilder; use deltalake::operations::drop_constraints::DropConstraintBuilder; use deltalake::operations::filesystem_check::FileSystemCheckBuilder; use deltalake::operations::load_cdf::CdfLoadBuilder; @@ -630,6 +631,33 @@ impl RawDeltaTable { Ok(()) } + #[pyo3(signature = (fields, 
custom_metadata=None, post_commithook_properties=None))] + pub fn drop_columns( + &mut self, + py: Python, + fields: Vec, + custom_metadata: Option>, + post_commithook_properties: Option>>, + ) -> PyResult<()> { + let table = py.allow_threads(|| { + let mut cmd = DropColumnBuilder::new( + self._table.log_store(), + self._table.snapshot().map_err(PythonError::from)?.clone(), + ); + cmd = cmd.with_fields(fields); + + if let Some(commit_properties) = + maybe_create_commit_properties(custom_metadata, post_commithook_properties) + { + cmd = cmd.with_commit_properties(commit_properties); + } + + rt().block_on(cmd.into_future()).map_err(PythonError::from) + })?; + self._table.state = table.state; + Ok(()) + } + #[pyo3(signature = (starting_version = 0, ending_version = None, starting_timestamp = None, ending_timestamp = None, columns = None))] pub fn load_cdf( &mut self, @@ -1430,6 +1458,27 @@ fn convert_partition_filters( .collect() } +fn maybe_create_commit_properties( + custom_metadata: Option>, + post_commithook_properties: Option>>, +) -> Option { + if custom_metadata.is_none() && post_commithook_properties.is_none() { + return None; + } + let mut commit_properties = CommitProperties::default(); + if let Some(metadata) = custom_metadata { + let json_metadata: Map = + metadata.into_iter().map(|(k, v)| (k, v.into())).collect(); + commit_properties = commit_properties.with_metadata(json_metadata); + }; + + if let Some(post_commit_hook_props) = post_commithook_properties { + commit_properties = + set_post_commithook_properties(commit_properties, post_commit_hook_props) + } + Some(commit_properties) +} + fn scalar_to_py<'py>(value: &Scalar, py_date: &Bound<'py, PyAny>) -> PyResult> { use Scalar::*; From eb54143818452711945d7d2b1ea274a12b43baca Mon Sep 17 00:00:00 2001 From: ion-elgreco <15728914+ion-elgreco@users.noreply.github.com> Date: Sat, 27 Jul 2024 15:33:12 +0300 Subject: [PATCH 2/9] functional drop columns --- crates/core/src/operations/drop_column.rs | 91 
++++++++++++++++++++++- 1 file changed, 89 insertions(+), 2 deletions(-) diff --git a/crates/core/src/operations/drop_column.rs b/crates/core/src/operations/drop_column.rs index 657ca2b083..e3e61f6c44 100644 --- a/crates/core/src/operations/drop_column.rs +++ b/crates/core/src/operations/drop_column.rs @@ -1,10 +1,15 @@ //! Drop a column from the table +use delta_kernel::schema::DataType; use delta_kernel::schema::StructType; use futures::future::BoxFuture; +use std::collections::HashMap; + use itertools::Itertools; +use sqlparser::dialect::GenericDialect; +use sqlparser::parser::Parser; -use super::transaction::{CommitBuilder, CommitProperties, PROTOCOL}; +use super::transaction::{CommitBuilder, CommitProperties}; use crate::kernel::StructField; use crate::logstore::LogStoreRef; @@ -18,6 +23,8 @@ pub struct DropColumnBuilder { snapshot: DeltaTableState, /// Fields to drop from the schema fields: Option>, + /// Raise if constraint doesn't exist + raise_if_not_exists: bool, /// Delta object store for handling data files log_store: LogStoreRef, /// Additional information to add to the commit @@ -32,6 +39,7 @@ impl DropColumnBuilder { Self { snapshot, log_store, + raise_if_not_exists: true, fields: None, commit_properties: CommitProperties::default(), } @@ -47,6 +55,12 @@ impl DropColumnBuilder { self.commit_properties = commit_properties; self } + + /// Specify if you want to raise if the specified column does not exist + pub fn with_raise_if_not_exists(mut self, raise: bool) -> Self { + self.raise_if_not_exists = raise; + self + } } impl std::future::IntoFuture for DropColumnBuilder { @@ -58,6 +72,7 @@ impl std::future::IntoFuture for DropColumnBuilder { let this = self; Box::pin(async move { + let dialect = GenericDialect {}; let mut metadata = this.snapshot.metadata().clone(); let fields = match this.fields { Some(v) => v, @@ -66,7 +81,47 @@ impl std::future::IntoFuture for DropColumnBuilder { let table_schema = this.snapshot.schema(); - let new_table_schema = 
table_schema; + let fields_map = fields + .iter() + .map(|field_name| { + let identifiers = Parser::new(&dialect) + .try_with_sql(field_name.as_str()) + .unwrap() + .parse_multipart_identifier() + .unwrap() + .iter() + .map(|v| v.value.to_owned()) + .collect_vec(); + // Root field, field path + (identifiers[0].clone(), identifiers) + }) + .collect::>>(); + + let new_table_schema = StructType::new( + table_schema + .fields() + .filter_map(|field| { + if let Some(identifiers) = fields_map.get(field.name()) { + if identifiers.len() == 1 { + None + } else { + drop_nested_fields(field, &identifiers[1..]) + } + } else { + Some(field.clone()) + } + }) + .collect::>(), + ); + + dbg!(&new_table_schema); + + if new_table_schema.eq(table_schema) && this.raise_if_not_exists { + return Err(DeltaTableError::Generic(format!( + "Schema remained unchanges, column with name: {:#?} doesn't exists", + &fields + ))); + } let operation = DeltaOperation::DropColumn { fields: fields }; @@ -86,3 +141,35 @@ impl std::future::IntoFuture for DropColumnBuilder { }) } } + +fn drop_nested_fields(field: &StructField, path: &[String]) -> Option { + match field.data_type() { + DataType::Struct(inner_struct) => { + let remaining_fields = inner_struct + .fields() + .filter_map(|nested_field| { + if nested_field.name() == &path[0] { + if path.len() > 1 { + drop_nested_fields(nested_field, &path[1..]) + } else { + None + } + } else { + Some(nested_field.clone()) + } + }) + .collect::>(); + + if remaining_fields.is_empty() { + None + } else { + Some(StructField::new( + field.name(), + DataType::Struct(Box::new(StructType::new(remaining_fields))), + field.is_nullable(), + )) + } + } + _ => Some(field.clone()), + } +} From 067ff56261548614ffb8912ea6b26996b146af14 Mon Sep 17 00:00:00 2001 From: ion-elgreco <15728914+ion-elgreco@users.noreply.github.com> Date: Sat, 27 Jul 2024 17:53:50 +0300 Subject: [PATCH 3/9] remember columns that do not exist, add option to raise then --- 
crates/core/src/operations/drop_column.rs | 36 ++++++++++++++++++----- python/deltalake/_internal.pyi | 1 + python/deltalake/table.py | 2 ++ python/src/lib.rs | 7 +++-- 4 files changed, 36 insertions(+), 10 deletions(-) diff --git a/crates/core/src/operations/drop_column.rs b/crates/core/src/operations/drop_column.rs index e3e61f6c44..e414afe5d9 100644 --- a/crates/core/src/operations/drop_column.rs +++ b/crates/core/src/operations/drop_column.rs @@ -80,6 +80,7 @@ impl std::future::IntoFuture for DropColumnBuilder { }; let table_schema = this.snapshot.schema(); + let mut fields_not_found = HashMap::new(); let fields_map = fields .iter() @@ -96,7 +97,7 @@ impl std::future::IntoFuture for DropColumnBuilder { (identifiers[0].clone(), identifiers) }) .collect::>>(); - + let new_table_schema = StructType::new( table_schema .fields() @@ -105,7 +106,7 @@ impl std::future::IntoFuture for DropColumnBuilder { if identifiers.len() == 1 { None } else { - drop_nested_fields(field, &identifiers[1..]) + drop_nested_fields(field, &identifiers[1..], &mut fields_not_found) } } else { Some(field.clone()) @@ -114,12 +115,15 @@ impl std::future::IntoFuture for DropColumnBuilder { .collect::>(), ); - dbg!(&new_table_schema); + let not_found: Vec = fields_not_found + .iter() + .map(|(key, value)| format!("{}.{}", key, value.join("."))) + .collect(); - if new_table_schema.eq(table_schema) && this.raise_if_not_exists { + if !not_found.is_empty() && this.raise_if_not_exists { return Err(DeltaTableError::Generic(format!( - "Schema remained unchanges, column with name: {:#?} doesn't exists", - &fields + "Column(s) with name: {:#?} doesn't exists", + ¬_found ))); } @@ -142,7 +146,11 @@ impl std::future::IntoFuture for DropColumnBuilder { } } -fn drop_nested_fields(field: &StructField, path: &[String]) -> Option { +fn drop_nested_fields<'a>( + field: &StructField, + path: &'a [String], + unmatched_paths: &mut HashMap, +) -> Option { match field.data_type() { DataType::Struct(inner_struct) => { 
let remaining_fields = inner_struct @@ -150,7 +158,7 @@ fn drop_nested_fields(field: &StructField, path: &[String]) -> Option 1 { - drop_nested_fields(nested_field, &path[1..]) + drop_nested_fields(nested_field, &path[1..], unmatched_paths) } else { None } @@ -160,6 +168,18 @@ fn drop_nested_fields(field: &StructField, path: &[String]) -> Option>(); + // If field was the same, we push the missing paths recursively into the hashmap + // we also remove the subpaths from the hashmap if we see that it's a subset + // (might need to find better way ) + if remaining_fields.eq(&inner_struct.fields().cloned().collect_vec()) { + if let Some(part_path) = unmatched_paths.get(&path[0]) { + if part_path == &&path[1..] { + unmatched_paths.remove(&path[0]); + } + } + unmatched_paths.insert(field.name().to_owned(), path); + }; + if remaining_fields.is_empty() { None } else { diff --git a/python/deltalake/_internal.pyi b/python/deltalake/_internal.pyi index 8fbf5f2b53..e66340f147 100644 --- a/python/deltalake/_internal.pyi +++ b/python/deltalake/_internal.pyi @@ -94,6 +94,7 @@ class RawDeltaTable: def drop_columns( self, fields: List[str], + raise_if_not_exists: bool, custom_metadata: Optional[Dict[str, str]], post_commithook_properties: Optional[Dict[str, Optional[bool]]], ) -> None: ... 
diff --git a/python/deltalake/table.py b/python/deltalake/table.py index 2f97d42183..dde4db16de 100644 --- a/python/deltalake/table.py +++ b/python/deltalake/table.py @@ -1871,6 +1871,7 @@ def drop_constraint( def drop_columns( self, fields: Union[str, List[str]], + raise_if_not_exists: bool = True, custom_metadata: Optional[Dict[str, str]] = None, post_commithook_properties: Optional[PostCommitHookProperties] = None, ) -> None: @@ -1896,6 +1897,7 @@ def drop_columns( self.table._table.drop_columns( fields, + raise_if_not_exists, custom_metadata, post_commithook_properties.__dict__ if post_commithook_properties else None, ) diff --git a/python/src/lib.rs b/python/src/lib.rs index d051e49ecc..6dca948360 100644 --- a/python/src/lib.rs +++ b/python/src/lib.rs @@ -631,11 +631,12 @@ impl RawDeltaTable { Ok(()) } - #[pyo3(signature = (fields, custom_metadata=None, post_commithook_properties=None))] + #[pyo3(signature = (fields, raise_if_not_exists, custom_metadata=None, post_commithook_properties=None))] pub fn drop_columns( &mut self, py: Python, fields: Vec, + raise_if_not_exists: bool, custom_metadata: Option>, post_commithook_properties: Option>>, ) -> PyResult<()> { @@ -644,7 +645,9 @@ impl RawDeltaTable { self._table.log_store(), self._table.snapshot().map_err(PythonError::from)?.clone(), ); - cmd = cmd.with_fields(fields); + cmd = cmd + .with_fields(fields) + .with_raise_if_not_exists(raise_if_not_exists); if let Some(commit_properties) = maybe_create_commit_properties(custom_metadata, post_commithook_properties) From d8ea776884ba3ac4ec30ae5195a4d8a0eed59cc5 Mon Sep 17 00:00:00 2001 From: ion-elgreco <15728914+ion-elgreco@users.noreply.github.com> Date: Sat, 27 Jul 2024 18:59:27 +0300 Subject: [PATCH 4/9] catch also root columns not there --- crates/core/src/operations/drop_column.rs | 22 ++++++++++++++++++++-- 1 file changed, 20 insertions(+), 2 deletions(-) diff --git a/crates/core/src/operations/drop_column.rs b/crates/core/src/operations/drop_column.rs index 
e414afe5d9..d3c48260a9 100644 --- a/crates/core/src/operations/drop_column.rs +++ b/crates/core/src/operations/drop_column.rs @@ -115,14 +115,32 @@ impl std::future::IntoFuture for DropColumnBuilder { .collect::>(), ); - let not_found: Vec = fields_not_found + let mut not_found: Vec = fields_not_found .iter() .map(|(key, value)| format!("{}.{}", key, value.join("."))) .collect(); + + // Catch root fields that do not exist + not_found.append( + &mut fields_map + .values() + .filter_map(|v| { + if v.len() == 1 { + if table_schema.field(&v[0]).is_none() { + Some(v[0].clone()) + } else { + None + } + } else { + None + } + }) + .collect_vec(), + ); if !not_found.is_empty() && this.raise_if_not_exists { return Err(DeltaTableError::Generic(format!( - "Column(s) with name: {:#?} doesn't exists", + "Column(s) with name: {:#?} doesn't exist", ¬_found ))); } From 4f53d87f4c76409b2290f9c6dc86c415ebc79300 Mon Sep 17 00:00:00 2001 From: ion-elgreco <15728914+ion-elgreco@users.noreply.github.com> Date: Sat, 27 Jul 2024 19:12:12 +0300 Subject: [PATCH 5/9] add tests --- python/tests/test_alter.py | 205 +++++++++++++++++++++++++++++++++++++ 1 file changed, 205 insertions(+) diff --git a/python/tests/test_alter.py b/python/tests/test_alter.py index fb03acd23b..946d5607e9 100644 --- a/python/tests/test_alter.py +++ b/python/tests/test_alter.py @@ -1,3 +1,4 @@ +import json import pathlib import pyarrow as pa @@ -305,3 +306,207 @@ def test_set_table_properties_enable_dv(tmp_path: pathlib.Path, sample_table: pa assert protocol.min_writer_version == 7 assert protocol.writer_features == ["deletionVectors"] assert protocol.reader_features == ["deletionVectors"] + + +data = pa.Table.from_pydict( + { + "foo": [1], + "bar": [{"baz": {"nested": 1, "nested_02": 1}, "foo": 2}], + "barz": [{"baz": {"nested": 1}}], + } +) + + +def test_drop_columns_single_col(tmp_path): + write_deltalake(tmp_path, data) + + dt = DeltaTable(tmp_path) + dt.alter.drop_columns("foo") + expected = { + "type": "struct", 
+ "fields": [ + { + "name": "bar", + "type": { + "type": "struct", + "fields": [ + { + "name": "baz", + "type": { + "type": "struct", + "fields": [ + { + "name": "nested", + "type": "long", + "nullable": True, + "metadata": {}, + }, + { + "name": "nested_02", + "type": "long", + "nullable": True, + "metadata": {}, + }, + ], + }, + "nullable": True, + "metadata": {}, + }, + { + "name": "foo", + "type": "long", + "nullable": True, + "metadata": {}, + }, + ], + }, + "nullable": True, + "metadata": {}, + }, + { + "name": "barz", + "type": { + "type": "struct", + "fields": [ + { + "name": "baz", + "type": { + "type": "struct", + "fields": [ + { + "name": "nested", + "type": "long", + "nullable": True, + "metadata": {}, + } + ], + }, + "nullable": True, + "metadata": {}, + } + ], + }, + "nullable": True, + "metadata": {}, + }, + ], + } + assert json.loads(dt.schema().to_json()) == expected + + +def test_drop_columns_multiple_cols(tmp_path): + write_deltalake(tmp_path, data) + + dt = DeltaTable(tmp_path) + dt.alter.drop_columns(["foo", "bar"]) + expected = { + "type": "struct", + "fields": [ + { + "name": "barz", + "type": { + "type": "struct", + "fields": [ + { + "name": "baz", + "type": { + "type": "struct", + "fields": [ + { + "name": "nested", + "type": "long", + "nullable": True, + "metadata": {}, + } + ], + }, + "nullable": True, + "metadata": {}, + } + ], + }, + "nullable": True, + "metadata": {}, + } + ], + } + assert json.loads(dt.schema().to_json()) == expected + + +def test_drop_columns_multiple_cols_nested(tmp_path): + write_deltalake(tmp_path, data) + + dt = DeltaTable(tmp_path) + dt.alter.drop_columns(["foo", "bar.baz.nested"]) + expected = { + "type": "struct", + "fields": [ + { + "name": "bar", + "type": { + "type": "struct", + "fields": [ + { + "name": "baz", + "type": { + "type": "struct", + "fields": [ + { + "name": "nested_02", + "type": "long", + "nullable": True, + "metadata": {}, + } + ], + }, + "nullable": True, + "metadata": {}, + }, + { + "name": 
"foo", + "type": "long", + "nullable": True, + "metadata": {}, + }, + ], + }, + "nullable": True, + "metadata": {}, + }, + { + "name": "barz", + "type": { + "type": "struct", + "fields": [ + { + "name": "baz", + "type": { + "type": "struct", + "fields": [ + { + "name": "nested", + "type": "long", + "nullable": True, + "metadata": {}, + } + ], + }, + "nullable": True, + "metadata": {}, + } + ], + }, + "nullable": True, + "metadata": {}, + }, + ], + } + assert json.loads(dt.schema().to_json()) == expected + + +def test_drop_columns_raise_not_exists(tmp_path): + write_deltalake(tmp_path, data) + + dt = DeltaTable(tmp_path) + with pytest.raises(DeltaError): + dt.alter.drop_columns(["foo", "not_exists"], raise_if_not_exists=True) From 6dc71e3bb6e9f63e5ea29400e94e0e943a744d71 Mon Sep 17 00:00:00 2001 From: ion-elgreco <15728914+ion-elgreco@users.noreply.github.com> Date: Sat, 27 Jul 2024 22:13:16 +0300 Subject: [PATCH 6/9] fmt --- crates/core/src/operations/drop_column.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/core/src/operations/drop_column.rs b/crates/core/src/operations/drop_column.rs index d3c48260a9..b952460916 100644 --- a/crates/core/src/operations/drop_column.rs +++ b/crates/core/src/operations/drop_column.rs @@ -119,7 +119,7 @@ impl std::future::IntoFuture for DropColumnBuilder { .iter() .map(|(key, value)| format!("{}.{}", key, value.join("."))) .collect(); - + // Catch root fields that do not exist not_found.append( &mut fields_map From c3ca4c3afaa3dad6a4e71b9faf7855853f9a6ed2 Mon Sep 17 00:00:00 2001 From: ion-elgreco <15728914+ion-elgreco@users.noreply.github.com> Date: Sat, 27 Jul 2024 22:23:27 +0300 Subject: [PATCH 7/9] fmt --- crates/core/src/operations/drop_column.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/core/src/operations/drop_column.rs b/crates/core/src/operations/drop_column.rs index b952460916..e01838b9a1 100644 --- a/crates/core/src/operations/drop_column.rs +++ 
b/crates/core/src/operations/drop_column.rs @@ -145,7 +145,7 @@ impl std::future::IntoFuture for DropColumnBuilder { ))); } - let operation = DeltaOperation::DropColumn { fields: fields }; + let operation = DeltaOperation::DropColumn { fields }; metadata.schema_string = serde_json::to_string(&new_table_schema)?; From da58838526becc9eb842a6e2a95ef1d3d0c08386 Mon Sep 17 00:00:00 2001 From: Ion Koutsouris <15728914+ion-elgreco@users.noreply.github.com> Date: Mon, 29 Jul 2024 12:39:12 +0300 Subject: [PATCH 8/9] Update crates/core/src/operations/drop_column.rs Co-authored-by: Guilhem de Viry --- crates/core/src/operations/drop_column.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/core/src/operations/drop_column.rs b/crates/core/src/operations/drop_column.rs index e01838b9a1..6f0b22ca6e 100644 --- a/crates/core/src/operations/drop_column.rs +++ b/crates/core/src/operations/drop_column.rs @@ -45,7 +45,7 @@ impl DropColumnBuilder { } } - /// Specify the fields to be added + /// Specify the fields to be removed pub fn with_fields(mut self, fields: impl IntoIterator + Clone) -> Self { self.fields = Some(fields.into_iter().collect()); self From dcca23c6663ea1340c6d7c9801de8ef332d962c7 Mon Sep 17 00:00:00 2001 From: ion-elgreco <15728914+ion-elgreco@users.noreply.github.com> Date: Tue, 30 Jul 2024 17:48:13 +0300 Subject: [PATCH 9/9] check if column mapping is enabled --- crates/core/src/operations/drop_column.rs | 36 +++++++++++++++++++++-- 1 file changed, 33 insertions(+), 3 deletions(-) diff --git a/crates/core/src/operations/drop_column.rs b/crates/core/src/operations/drop_column.rs index 6f0b22ca6e..7463427ee8 100644 --- a/crates/core/src/operations/drop_column.rs +++ b/crates/core/src/operations/drop_column.rs @@ -1,5 +1,6 @@ //! 
Drop a column from the table +use delta_kernel::column_mapping::ColumnMappingMode; use delta_kernel::schema::DataType; use delta_kernel::schema::StructType; use futures::future::BoxFuture; @@ -9,9 +10,10 @@ use itertools::Itertools; use sqlparser::dialect::GenericDialect; use sqlparser::parser::Parser; -use super::transaction::{CommitBuilder, CommitProperties}; +use super::transaction::TransactionError; +use super::transaction::{CommitBuilder, CommitProperties, PROTOCOL}; -use crate::kernel::StructField; +use crate::kernel::{StructField, WriterFeatures}; use crate::logstore::LogStoreRef; use crate::protocol::DeltaOperation; use crate::table::state::DeltaTableState; @@ -72,6 +74,32 @@ impl std::future::IntoFuture for DropColumnBuilder { let this = self; Box::pin(async move { + let protocol = this.snapshot.protocol(); + + // Check if column mapping is enabled + if vec![5, 6].contains(&protocol.min_writer_version) + && this.snapshot.table_config().column_mapping_mode() == ColumnMappingMode::None + { + return Err(DeltaTableError::Generic( + "Column mapping mode shouldn't be None".to_string(), + )); + } else if protocol.min_writer_version == 7 + && !protocol + .writer_features + .as_ref() + .map(|v| v.contains(&crate::kernel::WriterFeatures::ColumnMapping)) + .unwrap_or_default() + { + return Err(DeltaTableError::Transaction { + source: TransactionError::WriterFeaturesRequired(WriterFeatures::ColumnMapping), + }); + } else if protocol.min_writer_version < 5 { + return Err(DeltaTableError::Generic(format!( + "Min writer >= 5, current version is ({})", + protocol.min_writer_version + ))); + }; + let dialect = GenericDialect {}; let mut metadata = this.snapshot.metadata().clone(); let fields = match this.fields { @@ -138,7 +166,9 @@ impl std::future::IntoFuture for DropColumnBuilder { .collect_vec(), ); - if !not_found.is_empty() && this.raise_if_not_exists { + if (!not_found.is_empty() || table_schema.ne(&new_table_schema)) + && this.raise_if_not_exists + { return 
Err(DeltaTableError::Generic(format!( "Column(s) with name: {:#?} doesn't exist", &not_found