diff --git a/crates/core/src/operations/delete.rs b/crates/core/src/operations/delete.rs
index e20005c69d..5a666b924f 100644
--- a/crates/core/src/operations/delete.rs
+++ b/crates/core/src/operations/delete.rs
@@ -34,6 +34,7 @@ use serde::Serialize;
 
 use super::datafusion_utils::Expression;
 use super::transaction::{CommitBuilder, CommitProperties, PROTOCOL};
+use super::write::WriterStatsConfig;
 use crate::delta_datafusion::expr::fmt_expr_to_sql;
 use crate::delta_datafusion::{
     create_physical_expr_fix, find_files, register_store, DataFusionMixins, DeltaScanBuilder,
@@ -153,6 +154,14 @@ async fn excute_non_empty_expr(
 
     let filter: Arc<dyn ExecutionPlan> = Arc::new(FilterExec::try_new(predicate_expr, scan.clone())?);
 
+    let writer_stats_config = WriterStatsConfig::new(
+        snapshot.table_config().num_indexed_cols(),
+        snapshot
+            .table_config()
+            .stats_columns()
+            .map(|v| v.iter().map(|v| v.to_string()).collect::<Vec<String>>()),
+    );
+
     let add_actions = write_execution_plan(
         Some(snapshot),
         state.clone(),
@@ -164,6 +173,7 @@ async fn excute_non_empty_expr(
         writer_properties,
         false,
         None,
+        writer_stats_config,
     )
     .await?
     .into_iter()
diff --git a/crates/core/src/operations/merge/mod.rs b/crates/core/src/operations/merge/mod.rs
index dd5d433ebd..ce1ee7b223 100644
--- a/crates/core/src/operations/merge/mod.rs
+++ b/crates/core/src/operations/merge/mod.rs
@@ -74,7 +74,7 @@ use crate::kernel::Action;
 use crate::logstore::LogStoreRef;
 use crate::operations::merge::barrier::find_barrier_node;
 use crate::operations::transaction::CommitBuilder;
-use crate::operations::write::write_execution_plan;
+use crate::operations::write::{write_execution_plan, WriterStatsConfig};
 use crate::protocol::{DeltaOperation, MergePredicate};
 use crate::table::state::DeltaTableState;
 use crate::{DeltaResult, DeltaTable, DeltaTableError};
@@ -1368,6 +1368,14 @@ async fn execute(
     // write projected records
     let table_partition_cols = current_metadata.partition_columns.clone();
 
+    let writer_stats_config = WriterStatsConfig::new(
+        snapshot.table_config().num_indexed_cols(),
+        snapshot
+            .table_config()
+            .stats_columns()
+            .map(|v| v.iter().map(|v| v.to_string()).collect::<Vec<String>>()),
+    );
+
     let rewrite_start = Instant::now();
     let add_actions = write_execution_plan(
         Some(&snapshot),
@@ -1380,6 +1388,7 @@ async fn execute(
         writer_properties,
         safe_cast,
         None,
+        writer_stats_config,
     )
     .await?;
diff --git a/crates/core/src/operations/optimize.rs b/crates/core/src/operations/optimize.rs
index 614fdf0d99..73155f4f19 100644
--- a/crates/core/src/operations/optimize.rs
+++ b/crates/core/src/operations/optimize.rs
@@ -424,6 +424,10 @@ pub struct MergeTaskParameters {
     file_schema: ArrowSchemaRef,
     /// Properties passed to parquet writer
     writer_properties: WriterProperties,
+    /// Num index cols to collect stats for
+    num_indexed_cols: i32,
+    /// Stats columns, specific columns to collect stats from, takes precedence over num_indexed_cols
+    stats_columns: Option<Vec<String>>,
 }
 
 /// A stream of record batches, with a ParquetError on failure.
@@ -483,7 +487,12 @@ impl MergePlan {
                 Some(task_parameters.input_parameters.target_size as usize),
                 None,
             )?;
-            let mut writer = PartitionWriter::try_with_config(object_store, writer_config)?;
+            let mut writer = PartitionWriter::try_with_config(
+                object_store,
+                writer_config,
+                task_parameters.num_indexed_cols,
+                task_parameters.stats_columns.clone(),
+            )?;
 
             let mut read_stream = read_stream.await?;
 
@@ -841,6 +850,11 @@ pub fn create_merge_plan(
             input_parameters,
             file_schema,
             writer_properties,
+            num_indexed_cols: snapshot.table_config().num_indexed_cols(),
+            stats_columns: snapshot
+                .table_config()
+                .stats_columns()
+                .map(|v| v.iter().map(|v| v.to_string()).collect::<Vec<String>>()),
         }),
         read_table_version: snapshot.version(),
     })
diff --git a/crates/core/src/operations/update.rs b/crates/core/src/operations/update.rs
index 700e23a411..29a1495e47 100644
--- a/crates/core/src/operations/update.rs
+++ b/crates/core/src/operations/update.rs
@@ -41,12 +41,12 @@ use futures::future::BoxFuture;
 use parquet::file::properties::WriterProperties;
 use serde::Serialize;
 
-use super::transaction::PROTOCOL;
 use super::write::write_execution_plan;
 use super::{
     datafusion_utils::Expression,
     transaction::{CommitBuilder, CommitProperties},
 };
+use super::{transaction::PROTOCOL, write::WriterStatsConfig};
 use crate::delta_datafusion::{
     create_physical_expr_fix, expr::fmt_expr_to_sql, physical::MetricObserverExec,
     DataFusionMixins, DeltaColumn, DeltaSessionContext,
@@ -348,6 +348,14 @@ async fn execute(
         projection_update.clone(),
     )?);
 
+    let writer_stats_config = WriterStatsConfig::new(
+        snapshot.table_config().num_indexed_cols(),
+        snapshot
+            .table_config()
+            .stats_columns()
+            .map(|v| v.iter().map(|v| v.to_string()).collect::<Vec<String>>()),
+    );
+
     let add_actions = write_execution_plan(
         Some(&snapshot),
         state.clone(),
@@ -359,6 +367,7 @@ async fn execute(
         writer_properties,
         safe_cast,
         None,
+        writer_stats_config,
    )
     .await?;
diff --git a/crates/core/src/operations/write.rs b/crates/core/src/operations/write.rs
index 8ecfb3078b..f87037fa16 100644
--- a/crates/core/src/operations/write.rs
+++ b/crates/core/src/operations/write.rs
@@ -58,6 +58,7 @@ use crate::logstore::LogStoreRef;
 use crate::operations::cast::{cast_record_batch, merge_schema};
 use crate::protocol::{DeltaOperation, SaveMode};
 use crate::storage::ObjectStoreRef;
+use crate::table::config::DEFAULT_NUM_INDEX_COLS;
 use crate::table::state::DeltaTableState;
 use crate::table::Constraint as DeltaConstraint;
 use crate::writer::record_batch::divide_by_partition_values;
@@ -337,6 +338,24 @@ impl WriteBuilder {
         }
     }
 }
+/// Configuration for the writer on how to collect stats
+#[derive(Clone)]
+pub struct WriterStatsConfig {
+    /// Number of columns to collect stats for, idx based
+    num_indexed_cols: i32,
+    /// Optional list of columns which to collect stats for, takes precedende over num_index_cols
+    stats_columns: Option<Vec<String>>,
+}
+
+impl WriterStatsConfig {
+    /// Create new writer stats config
+    pub fn new(num_indexed_cols: i32, stats_columns: Option<Vec<String>>) -> Self {
+        Self {
+            num_indexed_cols,
+            stats_columns,
+        }
+    }
+}
 
 #[allow(clippy::too_many_arguments)]
 async fn write_execution_plan_with_predicate(
@@ -351,6 +370,7 @@ async fn write_execution_plan_with_predicate(
     writer_properties: Option<WriterProperties>,
     safe_cast: bool,
     schema_mode: Option<SchemaMode>,
+    writer_stats_config: WriterStatsConfig,
 ) -> DeltaResult<Vec<Action>> {
     let schema: ArrowSchemaRef = if schema_mode.is_some() {
         plan.schema()
@@ -386,6 +406,8 @@ async fn write_execution_plan_with_predicate(
             writer_properties.clone(),
             target_file_size,
             write_batch_size,
+            writer_stats_config.num_indexed_cols,
+            writer_stats_config.stats_columns.clone(),
         );
         let mut writer = DeltaWriter::new(object_store.clone(), config);
         let checker_stream = checker.clone();
@@ -438,6 +460,7 @@ pub(crate) async fn write_execution_plan(
     writer_properties: Option<WriterProperties>,
     safe_cast: bool,
     schema_mode: Option<SchemaMode>,
+    writer_stats_config: WriterStatsConfig,
 ) -> DeltaResult<Vec<Action>> {
     write_execution_plan_with_predicate(
         None,
@@ -451,10 +474,12 @@ pub(crate) async fn write_execution_plan(
         writer_properties,
         safe_cast,
         schema_mode,
+        writer_stats_config,
     )
     .await
 }
 
+#[allow(clippy::too_many_arguments)]
 async fn execute_non_empty_expr(
     snapshot: &DeltaTableState,
     log_store: LogStoreRef,
@@ -463,6 +488,7 @@ async fn execute_non_empty_expr(
     expression: &Expr,
     rewrite: &[Add],
     writer_properties: Option<WriterProperties>,
+    writer_stats_config: WriterStatsConfig,
 ) -> DeltaResult<Vec<Action>> {
     // For each identified file perform a parquet scan + filter + limit (1) + count.
     // If returned count is not zero then append the file to be rewritten and removed from the log. Otherwise do nothing to the file.
@@ -496,6 +522,7 @@ async fn execute_non_empty_expr(
         writer_properties,
         false,
         None,
+        writer_stats_config,
    )
     .await?;
 
@@ -503,6 +530,7 @@ async fn execute_non_empty_expr(
 }
 
 // This should only be called wth a valid predicate
+#[allow(clippy::too_many_arguments)]
 async fn prepare_predicate_actions(
     predicate: Expr,
     log_store: LogStoreRef,
@@ -511,6 +539,7 @@ async fn prepare_predicate_actions(
     partition_columns: Vec<String>,
     writer_properties: Option<WriterProperties>,
     deletion_timestamp: i64,
+    writer_stats_config: WriterStatsConfig,
 ) -> DeltaResult<Vec<Action>> {
     let candidates =
         find_files(snapshot, log_store.clone(), &state, Some(predicate.clone())).await?;
@@ -526,6 +555,7 @@ async fn prepare_predicate_actions(
             &predicate,
             &candidates.candidates,
             writer_properties,
+            writer_stats_config,
         )
         .await?
     };
@@ -723,6 +753,18 @@ impl std::future::IntoFuture for WriteBuilder {
                     _ => (None, None),
                 };
 
+                let config: Option<TableConfig<'_>> = this
+                    .snapshot
+                    .as_ref()
+                    .map(|snapshot| snapshot.table_config());
+
+                let (num_indexed_cols, stats_columns) =
+                    get_num_idx_cols_and_stats_columns(config, this.configuration);
+
+                let writer_stats_config = WriterStatsConfig {
+                    num_indexed_cols,
+                    stats_columns,
+                };
                 // Here we need to validate if the new data conforms to a predicate if one is provided
                 let add_actions = write_execution_plan_with_predicate(
                     predicate.clone(),
@@ -736,6 +778,7 @@ impl std::future::IntoFuture for WriteBuilder {
                     this.writer_properties.clone(),
                     this.safe_cast,
                     this.schema_mode,
+                    writer_stats_config.clone(),
                 )
                 .await?;
                 actions.extend(add_actions);
@@ -772,6 +815,7 @@ impl std::future::IntoFuture for WriteBuilder {
                         partition_columns.clone(),
                         this.writer_properties,
                         deletion_timestamp,
+                        writer_stats_config,
                     )
                     .await?;
                     if !predicate_actions.is_empty() {
@@ -878,6 +922,33 @@ fn try_cast_batch(from_fields: &Fields, to_fields: &Fields) -> Result<(), ArrowError> {
     Ok(())
 }
 
+/// Get the num_idx_columns and stats_columns from the table configuration in the state
+/// If table_config does not exist (only can occur in the first write action) it takes
+/// the configuration that was passed to the writerBuilder.
+pub fn get_num_idx_cols_and_stats_columns(
+    config: Option<TableConfig<'_>>,
+    configuration: HashMap<String, Option<String>>,
+) -> (i32, Option<Vec<String>>) {
+    let (num_index_cols, stats_columns) = match &config {
+        Some(conf) => (conf.num_indexed_cols(), conf.stats_columns()),
+        _ => (
+            configuration
+                .get("delta.dataSkippingNumIndexedCols")
+                .and_then(|v| v.clone().map(|v| v.parse::<i32>().unwrap()))
+                .unwrap_or(DEFAULT_NUM_INDEX_COLS),
+            configuration
+                .get("delta.dataSkippingStatsColumns")
+                .and_then(|v| v.as_ref().map(|v| v.split(',').collect::<Vec<_>>())),
+        ),
+    };
+    (
+        num_index_cols,
+        stats_columns
+            .clone()
+            .map(|v| v.iter().map(|v| v.to_string()).collect::<Vec<String>>()),
+    )
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;
diff --git a/crates/core/src/operations/writer.rs b/crates/core/src/operations/writer.rs
index c778ddfad5..6c22cf6828 100644
--- a/crates/core/src/operations/writer.rs
+++ b/crates/core/src/operations/writer.rs
@@ -79,6 +79,10 @@ pub struct WriterConfig {
     /// Row chunks passed to parquet writer. This and the internal parquet writer settings
     /// determine how fine granular we can track / control the size of resulting files.
     write_batch_size: usize,
+    /// Num index cols to collect stats for
+    num_indexed_cols: i32,
+    /// Stats columns, specific columns to collect stats from, takes precedence over num_indexed_cols
+    stats_columns: Option<Vec<String>>,
 }
 
 impl WriterConfig {
@@ -89,6 +93,8 @@ impl WriterConfig {
         writer_properties: Option<WriterProperties>,
         target_file_size: Option<usize>,
         write_batch_size: Option<usize>,
+        num_indexed_cols: i32,
+        stats_columns: Option<Vec<String>>,
     ) -> Self {
         let writer_properties = writer_properties.unwrap_or_else(|| {
             WriterProperties::builder()
@@ -104,6 +110,8 @@ impl WriterConfig {
             writer_properties,
             target_file_size,
             write_batch_size,
+            num_indexed_cols,
+            stats_columns,
         }
     }
 
@@ -177,8 +185,12 @@ impl DeltaWriter {
                 Some(self.config.target_file_size),
                 Some(self.config.write_batch_size),
             )?;
-            let mut writer =
-                PartitionWriter::try_with_config(self.object_store.clone(), config)?;
+            let mut writer = PartitionWriter::try_with_config(
+                self.object_store.clone(),
+                config,
+                self.config.num_indexed_cols,
+                self.config.stats_columns.clone(),
+            )?;
             writer.write(&record_batch).await?;
             let _ = self.partition_writers.insert(partition_key, writer);
         }
@@ -269,6 +281,10 @@ pub(crate) struct PartitionWriter {
     arrow_writer: ArrowWriter<ShareableBuffer>,
     part_counter: usize,
     files_written: Vec<Add>,
+    /// Num index cols to collect stats for
+    num_indexed_cols: i32,
+    /// Stats columns, specific columns to collect stats from, takes precedence over num_indexed_cols
+    stats_columns: Option<Vec<String>>,
 }
 
 impl PartitionWriter {
@@ -276,6 +292,8 @@ impl PartitionWriter {
     pub fn try_with_config(
         object_store: ObjectStoreRef,
         config: PartitionWriterConfig,
+        num_indexed_cols: i32,
+        stats_columns: Option<Vec<String>>,
     ) -> DeltaResult<Self> {
         let buffer = ShareableBuffer::default();
         let arrow_writer = ArrowWriter::try_new(
@@ -292,6 +310,8 @@ impl PartitionWriter {
             arrow_writer,
             part_counter: 0,
             files_written: Vec::new(),
+            num_indexed_cols,
+            stats_columns,
         })
     }
 
@@ -349,6 +369,8 @@ impl PartitionWriter {
             path.to_string(),
             file_size,
             &metadata,
+            self.num_indexed_cols,
+            &self.stats_columns,
         )
         .map_err(|err| WriteError::CreateAdd {
             source: Box::new(err),
@@ -400,6 +422,7 @@ impl PartitionWriter {
 mod tests {
     use super::*;
     use crate::storage::utils::flatten_list_stream as list;
+    use crate::table::config::DEFAULT_NUM_INDEX_COLS;
     use crate::writer::test_utils::*;
     use crate::DeltaTableBuilder;
     use arrow::array::{Int32Array, StringArray};
@@ -419,6 +442,8 @@ mod tests {
             writer_properties,
             target_file_size,
             write_batch_size,
+            DEFAULT_NUM_INDEX_COLS,
+            None,
         );
         DeltaWriter::new(object_store, config)
     }
@@ -438,7 +463,8 @@ mod tests {
             write_batch_size,
         )
         .unwrap();
-        PartitionWriter::try_with_config(object_store, config).unwrap()
+        PartitionWriter::try_with_config(object_store, config, DEFAULT_NUM_INDEX_COLS, None)
+            .unwrap()
     }
 
     #[tokio::test]
diff --git a/crates/core/src/table/config.rs b/crates/core/src/table/config.rs
index 28b06e8f79..05fb0c53ca 100644
--- a/crates/core/src/table/config.rs
+++ b/crates/core/src/table/config.rs
@@ -208,6 +208,9 @@ macro_rules! table_config {
 /// Well known delta table configuration
 pub struct TableConfig<'a>(pub(crate) &'a HashMap<String, Option<String>>);
 
+/// Default num index cols
+pub const DEFAULT_NUM_INDEX_COLS: i32 = 32;
+
 impl<'a> TableConfig<'a> {
     table_config!(
         (
diff --git a/crates/core/src/writer/json.rs b/crates/core/src/writer/json.rs
index 8cc908320e..72d6ffff42 100644
--- a/crates/core/src/writer/json.rs
+++ b/crates/core/src/writer/json.rs
@@ -27,6 +27,7 @@ use crate::errors::DeltaTableError;
 use crate::kernel::{Add, PartitionsExt, Scalar, StructType};
 use crate::storage::ObjectStoreRetryExt;
 use crate::table::builder::DeltaTableBuilder;
+use crate::table::config::DEFAULT_NUM_INDEX_COLS;
 use crate::writer::utils::ShareableBuffer;
 use crate::DeltaTable;
 
@@ -368,6 +369,8 @@ impl DeltaWriter<Vec<Value>> for JsonWriter {
                 path.to_string(),
                 file_size,
                 &metadata,
+                DEFAULT_NUM_INDEX_COLS,
+                &None,
             )?);
         }
         Ok(actions)
diff --git a/crates/core/src/writer/record_batch.rs b/crates/core/src/writer/record_batch.rs
index 56ca7642e4..28db1b7072 100644
--- a/crates/core/src/writer/record_batch.rs
+++ b/crates/core/src/writer/record_batch.rs
@@ -32,6 +32,7 @@ use crate::kernel::{Action, Add, PartitionsExt, Scalar, StructType};
 use crate::operations::cast::merge_schema;
 use crate::storage::ObjectStoreRetryExt;
 use crate::table::builder::DeltaTableBuilder;
+use crate::table::config::DEFAULT_NUM_INDEX_COLS;
 use crate::DeltaTable;
 
 /// Writes messages to a delta lake table.
@@ -230,6 +231,8 @@ impl DeltaWriter<RecordBatch> for RecordBatchWriter {
                 path.to_string(),
                 file_size,
                 &metadata,
+                DEFAULT_NUM_INDEX_COLS,
+                &None,
             )?);
         }
         Ok(actions)
diff --git a/crates/core/src/writer/stats.rs b/crates/core/src/writer/stats.rs
index 312de6f9e3..849179a973 100644
--- a/crates/core/src/writer/stats.rs
+++ b/crates/core/src/writer/stats.rs
@@ -1,3 +1,4 @@
+use std::cmp::min;
 use std::sync::Arc;
 use std::time::{SystemTime, UNIX_EPOCH};
 use std::{collections::HashMap, ops::AddAssign};
@@ -21,8 +22,15 @@ pub fn create_add(
     path: String,
     size: i64,
     file_metadata: &FileMetaData,
+    num_indexed_cols: i32,
+    stats_columns: &Option<Vec<String>>,
 ) -> Result<Add, DeltaWriterError> {
-    let stats = stats_from_file_metadata(partition_values, file_metadata)?;
+    let stats = stats_from_file_metadata(
+        partition_values,
+        file_metadata,
+        num_indexed_cols,
+        stats_columns,
+    )?;
     let stats_string = serde_json::to_string(&stats)?;
 
     // Determine the modification timestamp to include in the add action - milliseconds since epoch
@@ -61,6 +69,8 @@ pub fn create_add(
 fn stats_from_file_metadata(
     partition_values: &IndexMap<String, Scalar>,
     file_metadata: &FileMetaData,
+    num_indexed_cols: i32,
+    stats_columns: &Option<Vec<String>>,
 ) -> Result<Stats, DeltaWriterError> {
     let type_ptr = parquet::schema::types::from_thrift(file_metadata.schema.as_slice());
     let schema_descriptor = type_ptr.map(|type_| Arc::new(SchemaDescriptor::new(type_)))?;
@@ -75,9 +85,30 @@ fn stats_from_file_metadata(
         .map(|rg| RowGroupMetaData::from_thrift(schema_descriptor.clone(), rg.clone()))
         .collect();
     let row_group_metadata = row_group_metadata?;
+    let schema_cols = file_metadata
+        .schema
+        .iter()
+        .map(|v| &v.name)
+        .collect::<Vec<_>>();
+
+    let idx_to_iterate = if let Some(stats_cols) = stats_columns {
+        stats_cols
+            .iter()
+            .map(|col| schema_cols[1..].iter().position(|value| *value == col))
+            .flatten()
+            .collect()
+    } else if num_indexed_cols == -1 {
+        (0..schema_descriptor.num_columns()).collect::<Vec<_>>()
+    } else if num_indexed_cols >= 0 {
+        (0..min(num_indexed_cols as usize, schema_descriptor.num_columns())).collect::<Vec<_>>()
+    } else {
+        return Err(DeltaWriterError::DeltaTable(DeltaTableError::Generic(
+            "delta.dataSkippingNumIndexedCols valid values are >=-1".to_string(),
+        )));
+    };
 
-    for i in 0..schema_descriptor.num_columns() {
-        let column_descr = schema_descriptor.column(i);
+    for idx in idx_to_iterate {
+        let column_descr = schema_descriptor.column(idx);
 
         let column_path = column_descr.path();
         let column_path_parts = column_path.parts();
@@ -90,7 +121,7 @@ fn stats_from_file_metadata(
         let maybe_stats: Option<AggregatedStats> = row_group_metadata
             .iter()
             .map(|g| {
-                g.column(i)
+                g.column(idx)
                     .statistics()
                     .map(|s| AggregatedStats::from((s, &column_descr.logical_type())))
             })
diff --git a/python/deltalake/_internal.pyi b/python/deltalake/_internal.pyi
index d97331c094..8cf2262d2c 100644
--- a/python/deltalake/_internal.pyi
+++ b/python/deltalake/_internal.pyi
@@ -43,6 +43,8 @@ class RawDeltaTable:
     def table_uri(self) -> str: ...
     def version(self) -> int: ...
     def get_latest_version(self) -> int: ...
+    def get_num_index_cols(self) -> int: ...
+    def get_stats_columns(self) -> Optional[List[str]]: ...
     def metadata(self) -> RawDeltaTableMetaData: ...
     def protocol_versions(self) -> List[Any]: ...
     def load_version(self, version: int) -> None: ...
@@ -213,6 +215,9 @@ def create_deltalake(
     custom_metadata: Optional[Dict[str, str]],
 ) -> None: ...
 def batch_distinct(batch: pyarrow.RecordBatch) -> pyarrow.RecordBatch: ...
+def get_num_idx_cols_and_stats_columns(
+    table: Optional[RawDeltaTable], configuration: Optional[Mapping[str, Optional[str]]]
+) -> Tuple[int, Optional[List[str]]]: ...
 
 # Can't implement inheritance (see note in src/schema.rs), so this is next
 # best thing.
diff --git a/python/deltalake/writer.py b/python/deltalake/writer.py
index eaf95650ea..5cd2128a17 100644
--- a/python/deltalake/writer.py
+++ b/python/deltalake/writer.py
@@ -41,6 +41,9 @@ from ._internal import DeltaDataChecker as _DeltaDataChecker
 from ._internal import batch_distinct
 from ._internal import convert_to_deltalake as _convert_to_deltalake
+from ._internal import (
+    get_num_idx_cols_and_stats_columns as get_num_idx_cols_and_stats_columns,
+)
 from ._internal import write_new_deltalake as write_deltalake_pyarrow
 from ._internal import write_to_deltalake as write_deltalake_rust
 from .exceptions import DeltaProtocolError, TableNotFoundError
@@ -66,6 +69,7 @@ _has_pandas = True
 
 PYARROW_MAJOR_VERSION = int(pa.__version__.split(".", maxsplit=1)[0])
+DEFAULT_DATA_SKIPPING_NUM_INDEX_COLS = 32
 
 
 @dataclass
@@ -262,7 +266,6 @@ def write_deltalake(
     if table is not None:
         storage_options = table._storage_options or {}
         storage_options.update(storage_options or {})
-        table.update_incremental()
 
     __enforce_append_only(table=table, configuration=configuration, mode=mode)
 
@@ -340,6 +343,10 @@ def write_deltalake(
         )
 
         # We need to write against the latest table version
+        num_indexed_cols, stats_cols = get_num_idx_cols_and_stats_columns(
+            table._table if table is not None else None, configuration
+        )
+
         def sort_arrow_schema(schema: pa.schema) -> pa.schema:
             sorted_cols = sorted(iter(schema), key=lambda x: (x.name, str(x.type)))
             return pa.schema(sorted_cols)
@@ -413,7 +420,11 @@ def _large_to_normal_dtype(dtype: pa.DataType) -> pa.DataType:
 
         def visitor(written_file: Any) -> None:
             path, partition_values = get_partitions_from_path(written_file.path)
-            stats = get_file_stats_from_metadata(written_file.metadata)
+            stats = get_file_stats_from_metadata(
+                written_file.metadata,
+                num_indexed_cols=num_indexed_cols,
+                columns_to_collect_stats=stats_cols,
+            )
 
             # PyArrow added support for written_file.size in 9.0.0
             if PYARROW_MAJOR_VERSION >= 9:
@@ -711,6 +722,8 @@ def get_partitions_from_path(path: str) -> Tuple[str, Dict[str, Optional[str]]]:
 
 def get_file_stats_from_metadata(
     metadata: Any,
+    num_indexed_cols: int,
+    columns_to_collect_stats: Optional[List[str]],
 ) -> Dict[str, Union[int, Dict[str, Any]]]:
     stats = {
         "numRecords": metadata.num_rows,
@@ -724,8 +737,24 @@ def iter_groups(metadata: Any) -> Iterator[Any]:
         if metadata.row_group(i).num_rows > 0:
             yield metadata.row_group(i)
 
-    for column_idx in range(metadata.num_columns):
+    schema_columns = metadata.schema.names
+    if columns_to_collect_stats is not None:
+        idx_to_iterate = []
+        for col in columns_to_collect_stats:
+            try:
+                idx_to_iterate.append(schema_columns.index(col))
+            except ValueError:
+                pass
+    elif num_indexed_cols == -1:
+        idx_to_iterate = list(range(metadata.num_columns))
+    elif num_indexed_cols >= 0:
+        idx_to_iterate = list(range(min(num_indexed_cols, metadata.num_columns)))
+    else:
+        raise ValueError("delta.dataSkippingNumIndexedCols valid values are >=-1")
+
+    for column_idx in idx_to_iterate:
         name = metadata.row_group(0).column(column_idx).path_in_schema
+        # If stats missing, then we can't know aggregate stats
         if all(
             group.column(column_idx).is_stats_set for group in iter_groups(metadata)
diff --git a/python/src/lib.rs b/python/src/lib.rs
index 2bada08943..faea1f236b 100644
--- a/python/src/lib.rs
+++ b/python/src/lib.rs
@@ -37,7 +37,9 @@ use deltalake::operations::load_cdf::CdfLoadBuilder;
 use deltalake::operations::merge::MergeBuilder;
 use deltalake::operations::optimize::{OptimizeBuilder, OptimizeType};
 use deltalake::operations::restore::RestoreBuilder;
-use deltalake::operations::transaction::{CommitBuilder, CommitProperties, PROTOCOL};
+use deltalake::operations::transaction::{
+    CommitBuilder, CommitProperties, TableReference, PROTOCOL,
+};
 use deltalake::operations::update::UpdateBuilder;
 use deltalake::operations::vacuum::VacuumBuilder;
 use deltalake::parquet::basic::Compression;
@@ -203,6 +205,25 @@ impl RawDeltaTable {
             .map_err(PythonError::from)?)
     }
 
+    pub fn get_num_index_cols(&mut self) -> PyResult<i32> {
+        Ok(self
+            ._table
+            .snapshot()
+            .map_err(PythonError::from)?
+            .config()
+            .num_indexed_cols())
+    }
+
+    pub fn get_stats_columns(&mut self) -> PyResult<Option<Vec<String>>> {
+        Ok(self
+            ._table
+            .snapshot()
+            .map_err(PythonError::from)?
+            .config()
+            .stats_columns()
+            .map(|v| v.iter().map(|v| v.to_string()).collect::<Vec<String>>()))
+    }
+
     pub fn load_with_datetime(&mut self, ds: &str) -> PyResult<()> {
         let datetime =
             DateTime::<Utc>::from(DateTime::<FixedOffset>::parse_from_rfc3339(ds).map_err(
@@ -1699,6 +1720,26 @@ fn convert_to_deltalake(
     Ok(())
 }
 
+#[pyfunction]
+fn get_num_idx_cols_and_stats_columns(
+    table: Option<&RawDeltaTable>,
+    configuration: Option<HashMap<String, Option<String>>>,
+) -> PyResult<(i32, Option<Vec<String>>)> {
+    let config = table
+        .as_ref()
+        .map(|table| table._table.snapshot())
+        .transpose()
+        .map_err(PythonError::from)?
+        .map(|snapshot| snapshot.table_config());
+
+    Ok(
+        deltalake::operations::write::get_num_idx_cols_and_stats_columns(
+            config,
+            configuration.unwrap_or_default(),
+        ),
+    )
+}
+
 #[pyclass(name = "DeltaDataChecker", module = "deltalake._internal")]
 struct PyDeltaDataChecker {
     inner: DeltaDataChecker,
@@ -1755,6 +1796,10 @@ fn _internal(py: Python, m: &PyModule) -> PyResult<()> {
     m.add_function(pyo3::wrap_pyfunction!(write_to_deltalake, m)?)?;
     m.add_function(pyo3::wrap_pyfunction!(convert_to_deltalake, m)?)?;
     m.add_function(pyo3::wrap_pyfunction!(batch_distinct, m)?)?;
+    m.add_function(pyo3::wrap_pyfunction!(
+        get_num_idx_cols_and_stats_columns,
+        m
+    )?)?;
     m.add_class::()?;
     m.add_class::()?;
     m.add_class::()?;
diff --git a/python/tests/test_delete.py b/python/tests/test_delete.py
index 519af0c935..65b5ebdec3 100644
--- a/python/tests/test_delete.py
+++ b/python/tests/test_delete.py
@@ -81,3 +81,51 @@ def test_delete_large_dtypes(
     table = dt.to_pyarrow_table()
 
     assert table.equals(expected_table)
+
+
+@pytest.mark.parametrize("engine", ["pyarrow", "rust"])
+def test_delete_stats_columns_stats_provided(tmp_path: pathlib.Path, engine):
+    data = pa.table(
+        {
+            "foo": pa.array(["a", "b", None, None]),
+            "bar": pa.array([1, 2, 3, None]),
+            "baz": pa.array([1, 1, None, None]),
+        }
+    )
+    write_deltalake(
+        tmp_path,
+        data,
+        mode="append",
+        engine=engine,
+        configuration={"delta.dataSkippingStatsColumns": "foo,baz"},
+    )
+    dt = DeltaTable(tmp_path)
+    add_actions_table = dt.get_add_actions(flatten=True)
+    stats = add_actions_table.to_pylist()[0]
+
+    assert stats["null_count.foo"] == 2
+    assert stats["min.foo"] == "a"
+    assert stats["max.foo"] == "b"
+    assert stats["null_count.bar"] is None
+    assert stats["min.bar"] is None
+    assert stats["max.bar"] is None
+    assert stats["null_count.baz"] == 2
+    assert stats["min.baz"] == 1
+    assert stats["max.baz"] == 1
+
+    dt.delete("bar == 3")
+
+    dt = DeltaTable(tmp_path)
+    add_actions_table = dt.get_add_actions(flatten=True)
+    stats = add_actions_table.to_pylist()[0]
+
+    assert dt.version() == 1
+    assert stats["null_count.foo"] == 1
+    assert stats["min.foo"] == "a"
+    assert stats["max.foo"] == "b"
+    assert stats["null_count.bar"] is None
+    assert stats["min.bar"] is None
+    assert stats["max.bar"] is None
+    assert stats["null_count.baz"] == 1
+    assert stats["min.baz"] == 1
+    assert stats["max.baz"] == 1
diff --git a/python/tests/test_merge.py b/python/tests/test_merge.py
index 608a551069..2349e68963 100644
--- a/python/tests/test_merge.py
+++ b/python/tests/test_merge.py
@@ -857,6 +857,67 @@ def test_merge_timestamps_partitioned_2344(tmp_path: pathlib.Path, timezone, pre
     assert last_action["operationParameters"].get("predicate") == predicate
 
 
+@pytest.mark.parametrize("engine", ["pyarrow", "rust"])
+def test_merge_stats_columns_stats_provided(tmp_path: pathlib.Path, engine):
+    data = pa.table(
+        {
+            "foo": pa.array(["a", "b", None, None]),
+            "bar": pa.array([1, 2, 3, None]),
+            "baz": pa.array([1, 1, None, None]),
+        }
+    )
+    write_deltalake(
+        tmp_path,
+        data,
+        mode="append",
+        engine=engine,
+        configuration={"delta.dataSkippingStatsColumns": "foo,baz"},
+    )
+    dt = DeltaTable(tmp_path)
+    add_actions_table = dt.get_add_actions(flatten=True)
+    stats = add_actions_table.to_pylist()[0]
+
+    assert stats["null_count.foo"] == 2
+    assert stats["min.foo"] == "a"
+    assert stats["max.foo"] == "b"
+    assert stats["null_count.bar"] is None
+    assert stats["min.bar"] is None
+    assert stats["max.bar"] is None
+    assert stats["null_count.baz"] == 2
+    assert stats["min.baz"] == 1
+    assert stats["max.baz"] == 1
+
+    data = pa.table(
+        {
+            "foo": pa.array(["a"]),
+            "bar": pa.array([10]),
+            "baz": pa.array([10]),
+        }
+    )
+
+    dt.merge(
+        data,
+        predicate="source.foo = target.foo",
+        source_alias="source",
+        target_alias="target",
+    ).when_matched_update_all().execute()
+
+    dt = DeltaTable(tmp_path)
+    add_actions_table = dt.get_add_actions(flatten=True)
+    stats = add_actions_table.to_pylist()[0]
+
+    assert dt.version() == 1
+    assert stats["null_count.foo"] == 2
+    assert stats["min.foo"] == "a"
+    assert stats["max.foo"] == "b"
+    assert stats["null_count.bar"] is None
+    assert stats["min.bar"] is None
+    assert stats["max.bar"] is None
+    assert stats["null_count.baz"] == 2
+    assert stats["min.baz"] == 1
+    assert stats["max.baz"] == 10
+
+
 def test_merge_field_special_characters_delete_2438(tmp_path: pathlib.Path):
     ## See issue: https://github.com/delta-io/delta-rs/issues/2438
     data = pa.table({"x": [1, 2, 3], "y--1": [4, 5, 6]})
diff --git a/python/tests/test_update.py b/python/tests/test_update.py
index fcc17cf027..74ae130224 100644
--- a/python/tests/test_update.py
+++ b/python/tests/test_update.py
@@ -234,3 +234,51 @@ def test_update_with_incorrect_updates_input(
         str(excinfo.value)
         == "Invalid datatype provided in new_values, only int, float, bool, list, str or datetime or accepted."
     )
+
+
+@pytest.mark.parametrize("engine", ["pyarrow", "rust"])
+def test_update_stats_columns_stats_provided(tmp_path: pathlib.Path, engine):
+    data = pa.table(
+        {
+            "foo": pa.array(["a", "b", None, None]),
+            "bar": pa.array([1, 2, 3, None]),
+            "baz": pa.array([1, 1, None, None]),
+        }
+    )
+    write_deltalake(
+        tmp_path,
+        data,
+        mode="append",
+        engine=engine,
+        configuration={"delta.dataSkippingStatsColumns": "foo,baz"},
+    )
+    dt = DeltaTable(tmp_path)
+    add_actions_table = dt.get_add_actions(flatten=True)
+    stats = add_actions_table.to_pylist()[0]
+
+    assert stats["null_count.foo"] == 2
+    assert stats["min.foo"] == "a"
+    assert stats["max.foo"] == "b"
+    assert stats["null_count.bar"] is None
+    assert stats["min.bar"] is None
+    assert stats["max.bar"] is None
+    assert stats["null_count.baz"] == 2
+    assert stats["min.baz"] == 1
+    assert stats["max.baz"] == 1
+
+    dt.update({"foo": "'hello world'"})
+
+    dt = DeltaTable(tmp_path)
+    add_actions_table = dt.get_add_actions(flatten=True)
+    stats = add_actions_table.to_pylist()[0]
+
+    assert dt.version() == 1
+    assert stats["null_count.foo"] == 0
+    assert stats["min.foo"] == "hello world"
+    assert stats["max.foo"] == "hello world"
+    assert stats["null_count.bar"] is None
+    assert stats["min.bar"] is None
+    assert stats["max.bar"] is None
+    assert stats["null_count.baz"] == 2
+    assert stats["min.baz"] == 1
+    assert stats["max.baz"] == 1
diff --git a/python/tests/test_writer.py b/python/tests/test_writer.py
index 41fb876c12..6e973ab18d 100644
--- a/python/tests/test_writer.py
+++ b/python/tests/test_writer.py
@@ -1586,6 +1586,90 @@ def test_rust_decimal_cast(tmp_path: pathlib.Path):
     )
 
 
+@pytest.mark.parametrize("engine", ["pyarrow", "rust"])
+def test_write_stats_column_idx(tmp_path: pathlib.Path, engine):
+    def _check_stats(dt: DeltaTable):
+        add_actions_table = dt.get_add_actions(flatten=True)
+        stats = add_actions_table.to_pylist()[0]
+
+        assert stats["null_count.foo"] == 2
+        assert stats["min.foo"] == "a"
+        assert stats["max.foo"] == "b"
+        assert stats["null_count.bar"] == 1
+        assert stats["min.bar"] == 1
+        assert stats["max.bar"] == 3
+        assert stats["null_count.baz"] is None
+        assert stats["min.baz"] is None
+        assert stats["max.baz"] is None
+
+    data = pa.table(
+        {
+            "foo": pa.array(["a", "b", None, None]),
+            "bar": pa.array([1, 2, 3, None]),
+            "baz": pa.array([1, 1, None, None]),
+        }
+    )
+    write_deltalake(
+        tmp_path,
+        data,
+        mode="append",
+        engine=engine,
+        configuration={"delta.dataSkippingNumIndexedCols": "2"},
+    )
+
+    dt = DeltaTable(tmp_path)
+    _check_stats(dt)
+
+    # Check if it properly takes skippingNumIndexCols from the config in the table
+    write_deltalake(tmp_path, data, mode="overwrite", engine=engine)
+
+    dt = DeltaTable(tmp_path)
+    assert dt.version() == 1
+    _check_stats(dt)
+
+
+@pytest.mark.parametrize("engine", ["pyarrow", "rust"])
+def test_write_stats_columns_stats_provided(tmp_path: pathlib.Path, engine):
+    def _check_stats(dt: DeltaTable):
+        add_actions_table = dt.get_add_actions(flatten=True)
+        stats = add_actions_table.to_pylist()[0]
+
+        assert stats["null_count.foo"] == 2
+        assert stats["min.foo"] == "a"
+        assert stats["max.foo"] == "b"
+        assert stats["null_count.bar"] is None
+        assert stats["min.bar"] is None
+        assert stats["max.bar"] is None
+        assert stats["null_count.baz"] == 2
+        assert stats["min.baz"] == 1
+        assert stats["max.baz"] == 1
+
+    data = pa.table(
+        {
+            "foo": pa.array(["a", "b", None, None]),
+            "bar": pa.array([1, 2, 3, None]),
+            "baz": pa.array([1, 1, None, None]),
+        }
+    )
+    write_deltalake(
tmp_path, + data, + mode="append", + engine=engine, + configuration={"delta.dataSkippingStatsColumns": "foo,baz"}, + ) + + dt = DeltaTable(tmp_path) + _check_stats(dt) + + # Check if it properly takes skippingNumIndexCols from the config in the table + write_deltalake(tmp_path, data, mode="overwrite", engine=engine) + + dt = DeltaTable(tmp_path) + assert dt.version() == 1 + _check_stats(dt) + + @pytest.mark.parametrize( "array", [