Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(python, rust): add drop column operation [BLOCKED] #2710

Draft
wants to merge 9 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
243 changes: 243 additions & 0 deletions crates/core/src/operations/drop_column.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,243 @@
//! Drop a column from the table

use delta_kernel::column_mapping::ColumnMappingMode;
use delta_kernel::schema::DataType;
use delta_kernel::schema::StructType;
use futures::future::BoxFuture;
use std::collections::HashMap;

use itertools::Itertools;
use sqlparser::dialect::GenericDialect;
use sqlparser::parser::Parser;

use super::transaction::TransactionError;
use super::transaction::{CommitBuilder, CommitProperties, PROTOCOL};

use crate::kernel::{StructField, WriterFeatures};
use crate::logstore::LogStoreRef;
use crate::protocol::DeltaOperation;
use crate::table::state::DeltaTableState;
use crate::{DeltaResult, DeltaTable, DeltaTableError};

/// Add new columns and/or nested fields to a table
pub struct DropColumnBuilder {
/// A snapshot of the table's state
snapshot: DeltaTableState,
/// Fields to drop from the schema
fields: Option<Vec<String>>,
/// Raise if constraint doesn't exist
raise_if_not_exists: bool,
/// Delta object store for handling data files
log_store: LogStoreRef,
/// Additional information to add to the commit
commit_properties: CommitProperties,
}

impl super::Operation<()> for DropColumnBuilder {}

impl DropColumnBuilder {
/// Create a new builder
pub fn new(log_store: LogStoreRef, snapshot: DeltaTableState) -> Self {
Self {
snapshot,
log_store,
raise_if_not_exists: true,
fields: None,
commit_properties: CommitProperties::default(),
}
}

/// Specify the fields to be removed
pub fn with_fields(mut self, fields: impl IntoIterator<Item = String> + Clone) -> Self {
self.fields = Some(fields.into_iter().collect());
self
}
/// Additional metadata to be added to commit info
pub fn with_commit_properties(mut self, commit_properties: CommitProperties) -> Self {
self.commit_properties = commit_properties;
self
}

/// Specify if you want to raise if the specified column does not exist
pub fn with_raise_if_not_exists(mut self, raise: bool) -> Self {
self.raise_if_not_exists = raise;
self
}
Comment on lines +61 to +65
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Question out of curiosity: What would be the use case or reason to allow drop column commits for columns that are missing from the schema?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's say I drop 10 columns, and one is a typo, it could still be useful for the commit to pass.

I probably need to add another check though to check if the schema actually changed, otherwise the commits are pointless

}

impl std::future::IntoFuture for DropColumnBuilder {
type Output = DeltaResult<DeltaTable>;

type IntoFuture = BoxFuture<'static, Self::Output>;

fn into_future(self) -> Self::IntoFuture {
let this = self;

Box::pin(async move {
let protocol = this.snapshot.protocol();

// Check if column mapping is enabled
if vec![5, 6].contains(&protocol.min_writer_version)
&& this.snapshot.table_config().column_mapping_mode() == ColumnMappingMode::None
{
return Err(DeltaTableError::Generic(
"Column mapping mode shouldn't be None".to_string(),
));
} else if protocol.min_writer_version == 7
&& !protocol
.writer_features
.as_ref()
.map(|v| v.contains(&crate::kernel::WriterFeatures::ColumnMapping))
.unwrap_or_default()
{
return Err(DeltaTableError::Transaction {
source: TransactionError::WriterFeaturesRequired(WriterFeatures::ColumnMapping),
});
} else if protocol.min_writer_version < 5 {
return Err(DeltaTableError::Generic(format!(
"Min writer >= 5, current version is ({})",
protocol.min_writer_version
)));
};

let dialect = GenericDialect {};
let mut metadata = this.snapshot.metadata().clone();
let fields = match this.fields {
Some(v) => v,
None => return Err(DeltaTableError::Generic("No fields provided".to_string())),
};

let table_schema = this.snapshot.schema();
let mut fields_not_found = HashMap::new();

let fields_map = fields
.iter()
.map(|field_name| {
let identifiers = Parser::new(&dialect)
.try_with_sql(field_name.as_str())
.unwrap()
.parse_multipart_identifier()
.unwrap()
.iter()
.map(|v| v.value.to_owned())
.collect_vec();
// Root field, field path
(identifiers[0].clone(), identifiers)
})
.collect::<HashMap<String, Vec<String>>>();

let new_table_schema = StructType::new(
table_schema
.fields()
.filter_map(|field| {
if let Some(identifiers) = fields_map.get(field.name()) {
if identifiers.len() == 1 {
None
} else {
drop_nested_fields(field, &identifiers[1..], &mut fields_not_found)
}
} else {
Some(field.clone())
}
})
.collect::<Vec<StructField>>(),
);

let mut not_found: Vec<String> = fields_not_found
.iter()
.map(|(key, value)| format!("{}.{}", key, value.join(".")))
.collect();

// Catch root fields that do not exist
not_found.append(
&mut fields_map
.values()
.filter_map(|v| {
if v.len() == 1 {
if table_schema.field(&v[0]).is_none() {
Some(v[0].clone())
} else {
None
}
} else {
None
}
})
.collect_vec(),
);

if (!not_found.is_empty() || table_schema.ne(&new_table_schema))
&& this.raise_if_not_exists
{
return Err(DeltaTableError::Generic(format!(
"Column(s) with name: {:#?} doesn't exist",
&not_found
)));
}

let operation = DeltaOperation::DropColumn { fields };

metadata.schema_string = serde_json::to_string(&new_table_schema)?;

let actions = vec![metadata.into()];

let commit = CommitBuilder::from(this.commit_properties)
.with_actions(actions)
.build(Some(&this.snapshot), this.log_store.clone(), operation)
.await?;

Ok(DeltaTable::new_with_state(
this.log_store,
commit.snapshot(),
))
})
}
}

fn drop_nested_fields<'a>(
field: &StructField,
path: &'a [String],
unmatched_paths: &mut HashMap<String, &'a [String]>,
) -> Option<StructField> {
match field.data_type() {
DataType::Struct(inner_struct) => {
let remaining_fields = inner_struct
.fields()
.filter_map(|nested_field| {
if nested_field.name() == &path[0] {
if path.len() > 1 {
drop_nested_fields(nested_field, &path[1..], unmatched_paths)
} else {
None
}
} else {
Some(nested_field.clone())
}
})
.collect::<Vec<StructField>>();

// If field was the same, we push the missing paths recursively into the hashmap
// we also remove the subpaths from the hashmap if we see that it's a subset
// (might need to find better way )
if remaining_fields.eq(&inner_struct.fields().cloned().collect_vec()) {
if let Some(part_path) = unmatched_paths.get(&path[0]) {
if part_path == &&path[1..] {
unmatched_paths.remove(&path[0]);
}
}
unmatched_paths.insert(field.name().to_owned(), path);
};

if remaining_fields.is_empty() {
None
} else {
Some(StructField::new(
field.name(),
DataType::Struct(Box::new(StructType::new(remaining_fields))),
field.is_nullable(),
))
}
}
_ => Some(field.clone()),
}
}
7 changes: 7 additions & 0 deletions crates/core/src/operations/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use std::collections::HashMap;
pub mod cast;
pub mod convert_to_delta;
pub mod create;
pub mod drop_column;
pub mod drop_constraints;
pub mod filesystem_check;
pub mod optimize;
Expand All @@ -35,6 +36,7 @@ use self::{
pub use ::datafusion::physical_plan::common::collect as collect_sendable_stream;
#[cfg(feature = "datafusion")]
use arrow::record_batch::RecordBatch;
use drop_column::DropColumnBuilder;
use optimize::OptimizeBuilder;
use restore::RestoreBuilder;
use set_tbl_properties::SetTablePropertiesBuilder;
Expand Down Expand Up @@ -224,6 +226,11 @@ impl DeltaOps {
DropConstraintBuilder::new(self.0.log_store, self.0.state.unwrap())
}

/// Add new columns
pub fn drop_columns(self) -> DropColumnBuilder {
DropColumnBuilder::new(self.0.log_store, self.0.state.unwrap())
}

/// Set table properties
pub fn set_tbl_properties(self) -> SetTablePropertiesBuilder {
SetTablePropertiesBuilder::new(self.0.log_store, self.0.state.unwrap())
Expand Down
9 changes: 9 additions & 0 deletions crates/core/src/protocol/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -377,6 +377,13 @@ pub enum DeltaOperation {
name: String,
},

/// Represents a Delta `Drop Column` operation.
/// Used to drop columns or fields in a struct
DropColumn {
/// Fields to be dropped from the schema
fields: Vec<String>,
},

/// Merge data with a source data with the following predicate
#[serde(rename_all = "camelCase")]
Merge {
Expand Down Expand Up @@ -476,6 +483,7 @@ impl DeltaOperation {
DeltaOperation::VacuumEnd { .. } => "VACUUM END",
DeltaOperation::AddConstraint { .. } => "ADD CONSTRAINT",
DeltaOperation::DropConstraint { .. } => "DROP CONSTRAINT",
DeltaOperation::DropColumn { .. } => "DROP COLUMN",
}
}

Expand Down Expand Up @@ -516,6 +524,7 @@ impl DeltaOperation {
| Self::VacuumStart { .. }
| Self::VacuumEnd { .. }
| Self::AddConstraint { .. }
| Self::DropColumn { .. }
| Self::DropConstraint { .. } => false,
Self::Create { .. }
| Self::FileSystemCheck {}
Expand Down
7 changes: 7 additions & 0 deletions python/deltalake/_internal.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,13 @@ class RawDeltaTable:
custom_metadata: Optional[Dict[str, str]],
post_commithook_properties: Optional[Dict[str, Optional[bool]]],
) -> None: ...
def drop_columns(
self,
fields: List[str],
raise_if_not_exists: bool,
custom_metadata: Optional[Dict[str, str]],
post_commithook_properties: Optional[Dict[str, Optional[bool]]],
) -> None: ...
def set_table_properties(
self,
properties: Dict[str, str],
Expand Down
34 changes: 34 additions & 0 deletions python/deltalake/table.py
Original file line number Diff line number Diff line change
Expand Up @@ -1868,6 +1868,40 @@ def drop_constraint(
post_commithook_properties.__dict__ if post_commithook_properties else None,
)

def drop_columns(
self,
fields: Union[str, List[str]],
raise_if_not_exists: bool = True,
custom_metadata: Optional[Dict[str, str]] = None,
post_commithook_properties: Optional[PostCommitHookProperties] = None,
) -> None:
"""Drop columns and/or update the fields of a stuct column
Args:
fields: fields to drop from table schema
custom_metadata: custom metadata that will be added to the transaction commit.
post_commithook_properties: properties for the post commit hook. If None, default values are used.
Example:
```python
from deltalake import DeltaTable
dt = DeltaTable("test_table")
dt.alter.drop_columns(
[
"foo",
"bar.baz"
]
)
```
"""
if isinstance(fields, str):
fields = [fields]

self.table._table.drop_columns(
fields,
raise_if_not_exists,
custom_metadata,
post_commithook_properties.__dict__ if post_commithook_properties else None,
)

def set_table_properties(
self,
properties: Dict[str, str],
Expand Down
Loading
Loading