From 353e08be0202c45334dcdceee65a8679f35de710 Mon Sep 17 00:00:00 2001
From: Ion Koutsouris <15728914+ion-elgreco@users.noreply.github.com>
Date: Sat, 11 May 2024 08:43:23 +0200
Subject: [PATCH] feat(python, rust): respect column stats collection
 configurations (#2428)

# Description
All of the Rust and Python write actions now properly adhere to the table
configuration that controls which columns statistics are collected for:
either the first N columns via dataSkippingNumIndexedCols, or an explicit
column list via dataSkippingStatsColumns (see the illustrative usage sketch
further below).

# Related Issue(s)
- closes https://github.com/delta-io/delta-rs/issues/2427

---------

Co-authored-by: R. Tyler Croy
---
 crates/core/src/operations/delete.rs    | 10 +++
 crates/core/src/operations/merge/mod.rs | 11 +++-
 crates/core/src/operations/optimize.rs  | 16 ++++-
 crates/core/src/operations/update.rs    | 11 +++-
 crates/core/src/operations/write.rs     | 71 +++++++++++++++++++++
 crates/core/src/operations/writer.rs    | 32 +++++++++-
 crates/core/src/table/config.rs         |  3 +
 crates/core/src/writer/json.rs          |  3 +
 crates/core/src/writer/record_batch.rs  |  3 +
 crates/core/src/writer/stats.rs         | 39 ++++++++++--
 python/deltalake/_internal.pyi          |  5 ++
 python/deltalake/writer.py              | 35 ++++++++++-
 python/src/lib.rs                       | 47 +++++++++++++-
 python/tests/test_delete.py             | 48 ++++++++++++++
 python/tests/test_merge.py              | 61 ++++++++++++++++++
 python/tests/test_update.py             | 48 ++++++++++++++
 python/tests/test_writer.py             | 84 +++++++++++++++++++++++++
 17 files changed, 513 insertions(+), 14 deletions(-)

diff --git a/crates/core/src/operations/delete.rs b/crates/core/src/operations/delete.rs
index e20005c69d..5a666b924f 100644
--- a/crates/core/src/operations/delete.rs
+++ b/crates/core/src/operations/delete.rs
@@ -34,6 +34,7 @@ use serde::Serialize;
 
 use super::datafusion_utils::Expression;
 use super::transaction::{CommitBuilder, CommitProperties, PROTOCOL};
+use super::write::WriterStatsConfig;
 use crate::delta_datafusion::expr::fmt_expr_to_sql;
 use crate::delta_datafusion::{
     create_physical_expr_fix, find_files, register_store, DataFusionMixins, DeltaScanBuilder,
@@ -153,6 +154,14 @@ async fn excute_non_empty_expr(
     let filter: Arc<dyn ExecutionPlan> =
         Arc::new(FilterExec::try_new(predicate_expr, scan.clone())?);
 
+    let writer_stats_config = WriterStatsConfig::new(
+        snapshot.table_config().num_indexed_cols(),
+        snapshot
+            .table_config()
+            .stats_columns()
+            .map(|v| v.iter().map(|v| v.to_string()).collect::<Vec<String>>()),
+    );
+
     let add_actions = write_execution_plan(
         Some(snapshot),
         state.clone(),
@@ -164,6 +173,7 @@ async fn excute_non_empty_expr(
         writer_properties,
         false,
         None,
+        writer_stats_config,
     )
     .await?
.into_iter() diff --git a/crates/core/src/operations/merge/mod.rs b/crates/core/src/operations/merge/mod.rs index dd5d433ebd..ce1ee7b223 100644 --- a/crates/core/src/operations/merge/mod.rs +++ b/crates/core/src/operations/merge/mod.rs @@ -74,7 +74,7 @@ use crate::kernel::Action; use crate::logstore::LogStoreRef; use crate::operations::merge::barrier::find_barrier_node; use crate::operations::transaction::CommitBuilder; -use crate::operations::write::write_execution_plan; +use crate::operations::write::{write_execution_plan, WriterStatsConfig}; use crate::protocol::{DeltaOperation, MergePredicate}; use crate::table::state::DeltaTableState; use crate::{DeltaResult, DeltaTable, DeltaTableError}; @@ -1368,6 +1368,14 @@ async fn execute( // write projected records let table_partition_cols = current_metadata.partition_columns.clone(); + let writer_stats_config = WriterStatsConfig::new( + snapshot.table_config().num_indexed_cols(), + snapshot + .table_config() + .stats_columns() + .map(|v| v.iter().map(|v| v.to_string()).collect::>()), + ); + let rewrite_start = Instant::now(); let add_actions = write_execution_plan( Some(&snapshot), @@ -1380,6 +1388,7 @@ async fn execute( writer_properties, safe_cast, None, + writer_stats_config, ) .await?; diff --git a/crates/core/src/operations/optimize.rs b/crates/core/src/operations/optimize.rs index 614fdf0d99..73155f4f19 100644 --- a/crates/core/src/operations/optimize.rs +++ b/crates/core/src/operations/optimize.rs @@ -424,6 +424,10 @@ pub struct MergeTaskParameters { file_schema: ArrowSchemaRef, /// Properties passed to parquet writer writer_properties: WriterProperties, + /// Num index cols to collect stats for + num_indexed_cols: i32, + /// Stats columns, specific columns to collect stats from, takes precedence over num_indexed_cols + stats_columns: Option>, } /// A stream of record batches, with a ParquetError on failure. 
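Aside, not part of the diff: the behaviour implemented in this patch is driven by two
table properties that users supply through the write configuration. A minimal Python
sketch of how they are typically set, mirroring the tests added at the end of this
patch; the table path and column names are illustrative:

# Illustrative usage only; the path and column names are made up.
import pyarrow as pa
from deltalake import DeltaTable, write_deltalake

data = pa.table(
    {
        "foo": pa.array(["a", "b", None]),
        "bar": pa.array([1, 2, 3]),
        "baz": pa.array([1, 1, None]),
    }
)

# Collect file statistics only for the listed columns ...
write_deltalake(
    "/tmp/stats_example",
    data,
    mode="append",
    configuration={"delta.dataSkippingStatsColumns": "foo,baz"},
)

# ... or only for the first N columns (-1 would mean all columns):
# configuration={"delta.dataSkippingNumIndexedCols": "2"}

# The per-file stats end up in the add actions of the Delta log.
dt = DeltaTable("/tmp/stats_example")
print(dt.get_add_actions(flatten=True).to_pylist()[0])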
@@ -483,7 +487,12 @@ impl MergePlan { Some(task_parameters.input_parameters.target_size as usize), None, )?; - let mut writer = PartitionWriter::try_with_config(object_store, writer_config)?; + let mut writer = PartitionWriter::try_with_config( + object_store, + writer_config, + task_parameters.num_indexed_cols, + task_parameters.stats_columns.clone(), + )?; let mut read_stream = read_stream.await?; @@ -841,6 +850,11 @@ pub fn create_merge_plan( input_parameters, file_schema, writer_properties, + num_indexed_cols: snapshot.table_config().num_indexed_cols(), + stats_columns: snapshot + .table_config() + .stats_columns() + .map(|v| v.iter().map(|v| v.to_string()).collect::>()), }), read_table_version: snapshot.version(), }) diff --git a/crates/core/src/operations/update.rs b/crates/core/src/operations/update.rs index 700e23a411..29a1495e47 100644 --- a/crates/core/src/operations/update.rs +++ b/crates/core/src/operations/update.rs @@ -41,12 +41,12 @@ use futures::future::BoxFuture; use parquet::file::properties::WriterProperties; use serde::Serialize; -use super::transaction::PROTOCOL; use super::write::write_execution_plan; use super::{ datafusion_utils::Expression, transaction::{CommitBuilder, CommitProperties}, }; +use super::{transaction::PROTOCOL, write::WriterStatsConfig}; use crate::delta_datafusion::{ create_physical_expr_fix, expr::fmt_expr_to_sql, physical::MetricObserverExec, DataFusionMixins, DeltaColumn, DeltaSessionContext, @@ -348,6 +348,14 @@ async fn execute( projection_update.clone(), )?); + let writer_stats_config = WriterStatsConfig::new( + snapshot.table_config().num_indexed_cols(), + snapshot + .table_config() + .stats_columns() + .map(|v| v.iter().map(|v| v.to_string()).collect::>()), + ); + let add_actions = write_execution_plan( Some(&snapshot), state.clone(), @@ -359,6 +367,7 @@ async fn execute( writer_properties, safe_cast, None, + writer_stats_config, ) .await?; diff --git a/crates/core/src/operations/write.rs b/crates/core/src/operations/write.rs index 8ecfb3078b..f87037fa16 100644 --- a/crates/core/src/operations/write.rs +++ b/crates/core/src/operations/write.rs @@ -58,6 +58,7 @@ use crate::logstore::LogStoreRef; use crate::operations::cast::{cast_record_batch, merge_schema}; use crate::protocol::{DeltaOperation, SaveMode}; use crate::storage::ObjectStoreRef; +use crate::table::config::DEFAULT_NUM_INDEX_COLS; use crate::table::state::DeltaTableState; use crate::table::Constraint as DeltaConstraint; use crate::writer::record_batch::divide_by_partition_values; @@ -337,6 +338,24 @@ impl WriteBuilder { } } } +/// Configuration for the writer on how to collect stats +#[derive(Clone)] +pub struct WriterStatsConfig { + /// Number of columns to collect stats for, idx based + num_indexed_cols: i32, + /// Optional list of columns which to collect stats for, takes precedende over num_index_cols + stats_columns: Option>, +} + +impl WriterStatsConfig { + /// Create new writer stats config + pub fn new(num_indexed_cols: i32, stats_columns: Option>) -> Self { + Self { + num_indexed_cols, + stats_columns, + } + } +} #[allow(clippy::too_many_arguments)] async fn write_execution_plan_with_predicate( @@ -351,6 +370,7 @@ async fn write_execution_plan_with_predicate( writer_properties: Option, safe_cast: bool, schema_mode: Option, + writer_stats_config: WriterStatsConfig, ) -> DeltaResult> { let schema: ArrowSchemaRef = if schema_mode.is_some() { plan.schema() @@ -386,6 +406,8 @@ async fn write_execution_plan_with_predicate( writer_properties.clone(), target_file_size, 
write_batch_size, + writer_stats_config.num_indexed_cols, + writer_stats_config.stats_columns.clone(), ); let mut writer = DeltaWriter::new(object_store.clone(), config); let checker_stream = checker.clone(); @@ -438,6 +460,7 @@ pub(crate) async fn write_execution_plan( writer_properties: Option, safe_cast: bool, schema_mode: Option, + writer_stats_config: WriterStatsConfig, ) -> DeltaResult> { write_execution_plan_with_predicate( None, @@ -451,10 +474,12 @@ pub(crate) async fn write_execution_plan( writer_properties, safe_cast, schema_mode, + writer_stats_config, ) .await } +#[allow(clippy::too_many_arguments)] async fn execute_non_empty_expr( snapshot: &DeltaTableState, log_store: LogStoreRef, @@ -463,6 +488,7 @@ async fn execute_non_empty_expr( expression: &Expr, rewrite: &[Add], writer_properties: Option, + writer_stats_config: WriterStatsConfig, ) -> DeltaResult> { // For each identified file perform a parquet scan + filter + limit (1) + count. // If returned count is not zero then append the file to be rewritten and removed from the log. Otherwise do nothing to the file. @@ -496,6 +522,7 @@ async fn execute_non_empty_expr( writer_properties, false, None, + writer_stats_config, ) .await?; @@ -503,6 +530,7 @@ async fn execute_non_empty_expr( } // This should only be called wth a valid predicate +#[allow(clippy::too_many_arguments)] async fn prepare_predicate_actions( predicate: Expr, log_store: LogStoreRef, @@ -511,6 +539,7 @@ async fn prepare_predicate_actions( partition_columns: Vec, writer_properties: Option, deletion_timestamp: i64, + writer_stats_config: WriterStatsConfig, ) -> DeltaResult> { let candidates = find_files(snapshot, log_store.clone(), &state, Some(predicate.clone())).await?; @@ -526,6 +555,7 @@ async fn prepare_predicate_actions( &predicate, &candidates.candidates, writer_properties, + writer_stats_config, ) .await? }; @@ -723,6 +753,18 @@ impl std::future::IntoFuture for WriteBuilder { _ => (None, None), }; + let config: Option> = this + .snapshot + .as_ref() + .map(|snapshot| snapshot.table_config()); + + let (num_indexed_cols, stats_columns) = + get_num_idx_cols_and_stats_columns(config, this.configuration); + + let writer_stats_config = WriterStatsConfig { + num_indexed_cols, + stats_columns, + }; // Here we need to validate if the new data conforms to a predicate if one is provided let add_actions = write_execution_plan_with_predicate( predicate.clone(), @@ -736,6 +778,7 @@ impl std::future::IntoFuture for WriteBuilder { this.writer_properties.clone(), this.safe_cast, this.schema_mode, + writer_stats_config.clone(), ) .await?; actions.extend(add_actions); @@ -772,6 +815,7 @@ impl std::future::IntoFuture for WriteBuilder { partition_columns.clone(), this.writer_properties, deletion_timestamp, + writer_stats_config, ) .await?; if !predicate_actions.is_empty() { @@ -878,6 +922,33 @@ fn try_cast_batch(from_fields: &Fields, to_fields: &Fields) -> Result<(), ArrowE Ok(()) } +/// Get the num_idx_columns and stats_columns from the table configuration in the state +/// If table_config does not exist (only can occur in the first write action) it takes +/// the configuration that was passed to the writerBuilder. 
+pub fn get_num_idx_cols_and_stats_columns( + config: Option>, + configuration: HashMap>, +) -> (i32, Option>) { + let (num_index_cols, stats_columns) = match &config { + Some(conf) => (conf.num_indexed_cols(), conf.stats_columns()), + _ => ( + configuration + .get("delta.dataSkippingNumIndexedCols") + .and_then(|v| v.clone().map(|v| v.parse::().unwrap())) + .unwrap_or(DEFAULT_NUM_INDEX_COLS), + configuration + .get("delta.dataSkippingStatsColumns") + .and_then(|v| v.as_ref().map(|v| v.split(',').collect::>())), + ), + }; + ( + num_index_cols, + stats_columns + .clone() + .map(|v| v.iter().map(|v| v.to_string()).collect::>()), + ) +} + #[cfg(test)] mod tests { use super::*; diff --git a/crates/core/src/operations/writer.rs b/crates/core/src/operations/writer.rs index c778ddfad5..6c22cf6828 100644 --- a/crates/core/src/operations/writer.rs +++ b/crates/core/src/operations/writer.rs @@ -79,6 +79,10 @@ pub struct WriterConfig { /// Row chunks passed to parquet writer. This and the internal parquet writer settings /// determine how fine granular we can track / control the size of resulting files. write_batch_size: usize, + /// Num index cols to collect stats for + num_indexed_cols: i32, + /// Stats columns, specific columns to collect stats from, takes precedence over num_indexed_cols + stats_columns: Option>, } impl WriterConfig { @@ -89,6 +93,8 @@ impl WriterConfig { writer_properties: Option, target_file_size: Option, write_batch_size: Option, + num_indexed_cols: i32, + stats_columns: Option>, ) -> Self { let writer_properties = writer_properties.unwrap_or_else(|| { WriterProperties::builder() @@ -104,6 +110,8 @@ impl WriterConfig { writer_properties, target_file_size, write_batch_size, + num_indexed_cols, + stats_columns, } } @@ -177,8 +185,12 @@ impl DeltaWriter { Some(self.config.target_file_size), Some(self.config.write_batch_size), )?; - let mut writer = - PartitionWriter::try_with_config(self.object_store.clone(), config)?; + let mut writer = PartitionWriter::try_with_config( + self.object_store.clone(), + config, + self.config.num_indexed_cols, + self.config.stats_columns.clone(), + )?; writer.write(&record_batch).await?; let _ = self.partition_writers.insert(partition_key, writer); } @@ -269,6 +281,10 @@ pub(crate) struct PartitionWriter { arrow_writer: ArrowWriter, part_counter: usize, files_written: Vec, + /// Num index cols to collect stats for + num_indexed_cols: i32, + /// Stats columns, specific columns to collect stats from, takes precedence over num_indexed_cols + stats_columns: Option>, } impl PartitionWriter { @@ -276,6 +292,8 @@ impl PartitionWriter { pub fn try_with_config( object_store: ObjectStoreRef, config: PartitionWriterConfig, + num_indexed_cols: i32, + stats_columns: Option>, ) -> DeltaResult { let buffer = ShareableBuffer::default(); let arrow_writer = ArrowWriter::try_new( @@ -292,6 +310,8 @@ impl PartitionWriter { arrow_writer, part_counter: 0, files_written: Vec::new(), + num_indexed_cols, + stats_columns, }) } @@ -349,6 +369,8 @@ impl PartitionWriter { path.to_string(), file_size, &metadata, + self.num_indexed_cols, + &self.stats_columns, ) .map_err(|err| WriteError::CreateAdd { source: Box::new(err), @@ -400,6 +422,7 @@ impl PartitionWriter { mod tests { use super::*; use crate::storage::utils::flatten_list_stream as list; + use crate::table::config::DEFAULT_NUM_INDEX_COLS; use crate::writer::test_utils::*; use crate::DeltaTableBuilder; use arrow::array::{Int32Array, StringArray}; @@ -419,6 +442,8 @@ mod tests { writer_properties, target_file_size, 
write_batch_size, + DEFAULT_NUM_INDEX_COLS, + None, ); DeltaWriter::new(object_store, config) } @@ -438,7 +463,8 @@ mod tests { write_batch_size, ) .unwrap(); - PartitionWriter::try_with_config(object_store, config).unwrap() + PartitionWriter::try_with_config(object_store, config, DEFAULT_NUM_INDEX_COLS, None) + .unwrap() } #[tokio::test] diff --git a/crates/core/src/table/config.rs b/crates/core/src/table/config.rs index 28b06e8f79..05fb0c53ca 100644 --- a/crates/core/src/table/config.rs +++ b/crates/core/src/table/config.rs @@ -208,6 +208,9 @@ macro_rules! table_config { /// Well known delta table configuration pub struct TableConfig<'a>(pub(crate) &'a HashMap>); +/// Default num index cols +pub const DEFAULT_NUM_INDEX_COLS: i32 = 32; + impl<'a> TableConfig<'a> { table_config!( ( diff --git a/crates/core/src/writer/json.rs b/crates/core/src/writer/json.rs index 8cc908320e..72d6ffff42 100644 --- a/crates/core/src/writer/json.rs +++ b/crates/core/src/writer/json.rs @@ -27,6 +27,7 @@ use crate::errors::DeltaTableError; use crate::kernel::{Add, PartitionsExt, Scalar, StructType}; use crate::storage::ObjectStoreRetryExt; use crate::table::builder::DeltaTableBuilder; +use crate::table::config::DEFAULT_NUM_INDEX_COLS; use crate::writer::utils::ShareableBuffer; use crate::DeltaTable; @@ -368,6 +369,8 @@ impl DeltaWriter> for JsonWriter { path.to_string(), file_size, &metadata, + DEFAULT_NUM_INDEX_COLS, + &None, )?); } Ok(actions) diff --git a/crates/core/src/writer/record_batch.rs b/crates/core/src/writer/record_batch.rs index 56ca7642e4..28db1b7072 100644 --- a/crates/core/src/writer/record_batch.rs +++ b/crates/core/src/writer/record_batch.rs @@ -32,6 +32,7 @@ use crate::kernel::{Action, Add, PartitionsExt, Scalar, StructType}; use crate::operations::cast::merge_schema; use crate::storage::ObjectStoreRetryExt; use crate::table::builder::DeltaTableBuilder; +use crate::table::config::DEFAULT_NUM_INDEX_COLS; use crate::DeltaTable; /// Writes messages to a delta lake table. 
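Aside, not part of the diff: the writer/stats.rs and python/deltalake/writer.py hunks
below both apply the same rule for deciding which columns get file statistics. A
simplified, standalone Python sketch of that rule (the function name is illustrative):

# Simplified sketch of the column-selection rule introduced by this patch.
from typing import List, Optional


def columns_to_collect_stats_for(
    schema_columns: List[str],
    num_indexed_cols: int,
    stats_columns: Optional[List[str]],
) -> List[int]:
    if stats_columns is not None:
        # An explicit column list takes precedence over num_indexed_cols;
        # names not present in the schema are silently skipped.
        return [schema_columns.index(c) for c in stats_columns if c in schema_columns]
    if num_indexed_cols == -1:
        # -1 means collect statistics for every column.
        return list(range(len(schema_columns)))
    if num_indexed_cols >= 0:
        # Otherwise only the first N columns are indexed (default N is 32).
        return list(range(min(num_indexed_cols, len(schema_columns))))
    raise ValueError("delta.dataSkippingNumIndexedCols valid values are >=-1")


# columns_to_collect_stats_for(["foo", "bar", "baz"], 2, None)            -> [0, 1]
# columns_to_collect_stats_for(["foo", "bar", "baz"], 32, ["foo", "baz"]) -> [0, 2]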
@@ -230,6 +231,8 @@ impl DeltaWriter for RecordBatchWriter { path.to_string(), file_size, &metadata, + DEFAULT_NUM_INDEX_COLS, + &None, )?); } Ok(actions) diff --git a/crates/core/src/writer/stats.rs b/crates/core/src/writer/stats.rs index 312de6f9e3..849179a973 100644 --- a/crates/core/src/writer/stats.rs +++ b/crates/core/src/writer/stats.rs @@ -1,3 +1,4 @@ +use std::cmp::min; use std::sync::Arc; use std::time::{SystemTime, UNIX_EPOCH}; use std::{collections::HashMap, ops::AddAssign}; @@ -21,8 +22,15 @@ pub fn create_add( path: String, size: i64, file_metadata: &FileMetaData, + num_indexed_cols: i32, + stats_columns: &Option>, ) -> Result { - let stats = stats_from_file_metadata(partition_values, file_metadata)?; + let stats = stats_from_file_metadata( + partition_values, + file_metadata, + num_indexed_cols, + stats_columns, + )?; let stats_string = serde_json::to_string(&stats)?; // Determine the modification timestamp to include in the add action - milliseconds since epoch @@ -61,6 +69,8 @@ pub fn create_add( fn stats_from_file_metadata( partition_values: &IndexMap, file_metadata: &FileMetaData, + num_indexed_cols: i32, + stats_columns: &Option>, ) -> Result { let type_ptr = parquet::schema::types::from_thrift(file_metadata.schema.as_slice()); let schema_descriptor = type_ptr.map(|type_| Arc::new(SchemaDescriptor::new(type_)))?; @@ -75,9 +85,30 @@ fn stats_from_file_metadata( .map(|rg| RowGroupMetaData::from_thrift(schema_descriptor.clone(), rg.clone())) .collect(); let row_group_metadata = row_group_metadata?; + let schema_cols = file_metadata + .schema + .iter() + .map(|v| &v.name) + .collect::>(); + + let idx_to_iterate = if let Some(stats_cols) = stats_columns { + stats_cols + .iter() + .map(|col| schema_cols[1..].iter().position(|value| *value == col)) + .flatten() + .collect() + } else if num_indexed_cols == -1 { + (0..schema_descriptor.num_columns()).collect::>() + } else if num_indexed_cols >= 0 { + (0..min(num_indexed_cols as usize, schema_descriptor.num_columns())).collect::>() + } else { + return Err(DeltaWriterError::DeltaTable(DeltaTableError::Generic( + "delta.dataSkippingNumIndexedCols valid values are >=-1".to_string(), + ))); + }; - for i in 0..schema_descriptor.num_columns() { - let column_descr = schema_descriptor.column(i); + for idx in idx_to_iterate { + let column_descr = schema_descriptor.column(idx); let column_path = column_descr.path(); let column_path_parts = column_path.parts(); @@ -90,7 +121,7 @@ fn stats_from_file_metadata( let maybe_stats: Option = row_group_metadata .iter() .map(|g| { - g.column(i) + g.column(idx) .statistics() .map(|s| AggregatedStats::from((s, &column_descr.logical_type()))) }) diff --git a/python/deltalake/_internal.pyi b/python/deltalake/_internal.pyi index d97331c094..8cf2262d2c 100644 --- a/python/deltalake/_internal.pyi +++ b/python/deltalake/_internal.pyi @@ -43,6 +43,8 @@ class RawDeltaTable: def table_uri(self) -> str: ... def version(self) -> int: ... def get_latest_version(self) -> int: ... + def get_num_index_cols(self) -> int: ... + def get_stats_columns(self) -> Optional[List[str]]: ... def metadata(self) -> RawDeltaTableMetaData: ... def protocol_versions(self) -> List[Any]: ... def load_version(self, version: int) -> None: ... @@ -213,6 +215,9 @@ def create_deltalake( custom_metadata: Optional[Dict[str, str]], ) -> None: ... def batch_distinct(batch: pyarrow.RecordBatch) -> pyarrow.RecordBatch: ... 
+def get_num_idx_cols_and_stats_columns( + table: Optional[RawDeltaTable], configuration: Optional[Mapping[str, Optional[str]]] +) -> Tuple[int, Optional[List[str]]]: ... # Can't implement inheritance (see note in src/schema.rs), so this is next # best thing. diff --git a/python/deltalake/writer.py b/python/deltalake/writer.py index eaf95650ea..5cd2128a17 100644 --- a/python/deltalake/writer.py +++ b/python/deltalake/writer.py @@ -41,6 +41,9 @@ from ._internal import DeltaDataChecker as _DeltaDataChecker from ._internal import batch_distinct from ._internal import convert_to_deltalake as _convert_to_deltalake +from ._internal import ( + get_num_idx_cols_and_stats_columns as get_num_idx_cols_and_stats_columns, +) from ._internal import write_new_deltalake as write_deltalake_pyarrow from ._internal import write_to_deltalake as write_deltalake_rust from .exceptions import DeltaProtocolError, TableNotFoundError @@ -66,6 +69,7 @@ _has_pandas = True PYARROW_MAJOR_VERSION = int(pa.__version__.split(".", maxsplit=1)[0]) +DEFAULT_DATA_SKIPPING_NUM_INDEX_COLS = 32 @dataclass @@ -262,7 +266,6 @@ def write_deltalake( if table is not None: storage_options = table._storage_options or {} storage_options.update(storage_options or {}) - table.update_incremental() __enforce_append_only(table=table, configuration=configuration, mode=mode) @@ -340,6 +343,10 @@ def write_deltalake( ) # We need to write against the latest table version + num_indexed_cols, stats_cols = get_num_idx_cols_and_stats_columns( + table._table if table is not None else None, configuration + ) + def sort_arrow_schema(schema: pa.schema) -> pa.schema: sorted_cols = sorted(iter(schema), key=lambda x: (x.name, str(x.type))) return pa.schema(sorted_cols) @@ -413,7 +420,11 @@ def _large_to_normal_dtype(dtype: pa.DataType) -> pa.DataType: def visitor(written_file: Any) -> None: path, partition_values = get_partitions_from_path(written_file.path) - stats = get_file_stats_from_metadata(written_file.metadata) + stats = get_file_stats_from_metadata( + written_file.metadata, + num_indexed_cols=num_indexed_cols, + columns_to_collect_stats=stats_cols, + ) # PyArrow added support for written_file.size in 9.0.0 if PYARROW_MAJOR_VERSION >= 9: @@ -711,6 +722,8 @@ def get_partitions_from_path(path: str) -> Tuple[str, Dict[str, Optional[str]]]: def get_file_stats_from_metadata( metadata: Any, + num_indexed_cols: int, + columns_to_collect_stats: Optional[List[str]], ) -> Dict[str, Union[int, Dict[str, Any]]]: stats = { "numRecords": metadata.num_rows, @@ -724,8 +737,24 @@ def iter_groups(metadata: Any) -> Iterator[Any]: if metadata.row_group(i).num_rows > 0: yield metadata.row_group(i) - for column_idx in range(metadata.num_columns): + schema_columns = metadata.schema.names + if columns_to_collect_stats is not None: + idx_to_iterate = [] + for col in columns_to_collect_stats: + try: + idx_to_iterate.append(schema_columns.index(col)) + except ValueError: + pass + elif num_indexed_cols == -1: + idx_to_iterate = list(range(metadata.num_columns)) + elif num_indexed_cols >= 0: + idx_to_iterate = list(range(min(num_indexed_cols, metadata.num_columns))) + else: + raise ValueError("delta.dataSkippingNumIndexedCols valid values are >=-1") + + for column_idx in idx_to_iterate: name = metadata.row_group(0).column(column_idx).path_in_schema + # If stats missing, then we can't know aggregate stats if all( group.column(column_idx).is_stats_set for group in iter_groups(metadata) diff --git a/python/src/lib.rs b/python/src/lib.rs index 2bada08943..faea1f236b 100644 --- 
a/python/src/lib.rs +++ b/python/src/lib.rs @@ -37,7 +37,9 @@ use deltalake::operations::load_cdf::CdfLoadBuilder; use deltalake::operations::merge::MergeBuilder; use deltalake::operations::optimize::{OptimizeBuilder, OptimizeType}; use deltalake::operations::restore::RestoreBuilder; -use deltalake::operations::transaction::{CommitBuilder, CommitProperties, PROTOCOL}; +use deltalake::operations::transaction::{ + CommitBuilder, CommitProperties, TableReference, PROTOCOL, +}; use deltalake::operations::update::UpdateBuilder; use deltalake::operations::vacuum::VacuumBuilder; use deltalake::parquet::basic::Compression; @@ -203,6 +205,25 @@ impl RawDeltaTable { .map_err(PythonError::from)?) } + pub fn get_num_index_cols(&mut self) -> PyResult { + Ok(self + ._table + .snapshot() + .map_err(PythonError::from)? + .config() + .num_indexed_cols()) + } + + pub fn get_stats_columns(&mut self) -> PyResult>> { + Ok(self + ._table + .snapshot() + .map_err(PythonError::from)? + .config() + .stats_columns() + .map(|v| v.iter().map(|v| v.to_string()).collect::>())) + } + pub fn load_with_datetime(&mut self, ds: &str) -> PyResult<()> { let datetime = DateTime::::from(DateTime::::parse_from_rfc3339(ds).map_err( @@ -1699,6 +1720,26 @@ fn convert_to_deltalake( Ok(()) } +#[pyfunction] +fn get_num_idx_cols_and_stats_columns( + table: Option<&RawDeltaTable>, + configuration: Option>>, +) -> PyResult<(i32, Option>)> { + let config = table + .as_ref() + .map(|table| table._table.snapshot()) + .transpose() + .map_err(PythonError::from)? + .map(|snapshot| snapshot.table_config()); + + Ok( + deltalake::operations::write::get_num_idx_cols_and_stats_columns( + config, + configuration.unwrap_or_default(), + ), + ) +} + #[pyclass(name = "DeltaDataChecker", module = "deltalake._internal")] struct PyDeltaDataChecker { inner: DeltaDataChecker, @@ -1755,6 +1796,10 @@ fn _internal(py: Python, m: &PyModule) -> PyResult<()> { m.add_function(pyo3::wrap_pyfunction!(write_to_deltalake, m)?)?; m.add_function(pyo3::wrap_pyfunction!(convert_to_deltalake, m)?)?; m.add_function(pyo3::wrap_pyfunction!(batch_distinct, m)?)?; + m.add_function(pyo3::wrap_pyfunction!( + get_num_idx_cols_and_stats_columns, + m + )?)?; m.add_class::()?; m.add_class::()?; m.add_class::()?; diff --git a/python/tests/test_delete.py b/python/tests/test_delete.py index 519af0c935..65b5ebdec3 100644 --- a/python/tests/test_delete.py +++ b/python/tests/test_delete.py @@ -81,3 +81,51 @@ def test_delete_large_dtypes( table = dt.to_pyarrow_table() assert table.equals(expected_table) + + +@pytest.mark.parametrize("engine", ["pyarrow", "rust"]) +def test_delete_stats_columns_stats_provided(tmp_path: pathlib.Path, engine): + data = pa.table( + { + "foo": pa.array(["a", "b", None, None]), + "bar": pa.array([1, 2, 3, None]), + "baz": pa.array([1, 1, None, None]), + } + ) + write_deltalake( + tmp_path, + data, + mode="append", + engine=engine, + configuration={"delta.dataSkippingStatsColumns": "foo,baz"}, + ) + dt = DeltaTable(tmp_path) + add_actions_table = dt.get_add_actions(flatten=True) + stats = add_actions_table.to_pylist()[0] + + assert stats["null_count.foo"] == 2 + assert stats["min.foo"] == "a" + assert stats["max.foo"] == "b" + assert stats["null_count.bar"] is None + assert stats["min.bar"] is None + assert stats["max.bar"] is None + assert stats["null_count.baz"] == 2 + assert stats["min.baz"] == 1 + assert stats["max.baz"] == 1 + + dt.delete("bar == 3") + + dt = DeltaTable(tmp_path) + add_actions_table = dt.get_add_actions(flatten=True) + stats = 
add_actions_table.to_pylist()[0] + + assert dt.version() == 1 + assert stats["null_count.foo"] == 1 + assert stats["min.foo"] == "a" + assert stats["max.foo"] == "b" + assert stats["null_count.bar"] is None + assert stats["min.bar"] is None + assert stats["max.bar"] is None + assert stats["null_count.baz"] == 1 + assert stats["min.baz"] == 1 + assert stats["max.baz"] == 1 diff --git a/python/tests/test_merge.py b/python/tests/test_merge.py index 608a551069..2349e68963 100644 --- a/python/tests/test_merge.py +++ b/python/tests/test_merge.py @@ -857,6 +857,67 @@ def test_merge_timestamps_partitioned_2344(tmp_path: pathlib.Path, timezone, pre assert last_action["operationParameters"].get("predicate") == predicate +@pytest.mark.parametrize("engine", ["pyarrow", "rust"]) +def test_merge_stats_columns_stats_provided(tmp_path: pathlib.Path, engine): + data = pa.table( + { + "foo": pa.array(["a", "b", None, None]), + "bar": pa.array([1, 2, 3, None]), + "baz": pa.array([1, 1, None, None]), + } + ) + write_deltalake( + tmp_path, + data, + mode="append", + engine=engine, + configuration={"delta.dataSkippingStatsColumns": "foo,baz"}, + ) + dt = DeltaTable(tmp_path) + add_actions_table = dt.get_add_actions(flatten=True) + stats = add_actions_table.to_pylist()[0] + + assert stats["null_count.foo"] == 2 + assert stats["min.foo"] == "a" + assert stats["max.foo"] == "b" + assert stats["null_count.bar"] is None + assert stats["min.bar"] is None + assert stats["max.bar"] is None + assert stats["null_count.baz"] == 2 + assert stats["min.baz"] == 1 + assert stats["max.baz"] == 1 + + data = pa.table( + { + "foo": pa.array(["a"]), + "bar": pa.array([10]), + "baz": pa.array([10]), + } + ) + + dt.merge( + data, + predicate="source.foo = target.foo", + source_alias="source", + target_alias="target", + ).when_matched_update_all().execute() + + dt = DeltaTable(tmp_path) + add_actions_table = dt.get_add_actions(flatten=True) + stats = add_actions_table.to_pylist()[0] + + assert dt.version() == 1 + assert stats["null_count.foo"] == 2 + assert stats["min.foo"] == "a" + assert stats["max.foo"] == "b" + assert stats["null_count.bar"] is None + assert stats["min.bar"] is None + assert stats["max.bar"] is None + assert stats["null_count.baz"] == 2 + assert stats["min.baz"] == 1 + assert stats["max.baz"] == 10 + + def test_merge_field_special_characters_delete_2438(tmp_path: pathlib.Path): ## See issue: https://github.com/delta-io/delta-rs/issues/2438 data = pa.table({"x": [1, 2, 3], "y--1": [4, 5, 6]}) diff --git a/python/tests/test_update.py b/python/tests/test_update.py index fcc17cf027..74ae130224 100644 --- a/python/tests/test_update.py +++ b/python/tests/test_update.py @@ -234,3 +234,51 @@ def test_update_with_incorrect_updates_input( str(excinfo.value) == "Invalid datatype provided in new_values, only int, float, bool, list, str or datetime or accepted." 
) + + +@pytest.mark.parametrize("engine", ["pyarrow", "rust"]) +def test_update_stats_columns_stats_provided(tmp_path: pathlib.Path, engine): + data = pa.table( + { + "foo": pa.array(["a", "b", None, None]), + "bar": pa.array([1, 2, 3, None]), + "baz": pa.array([1, 1, None, None]), + } + ) + write_deltalake( + tmp_path, + data, + mode="append", + engine=engine, + configuration={"delta.dataSkippingStatsColumns": "foo,baz"}, + ) + dt = DeltaTable(tmp_path) + add_actions_table = dt.get_add_actions(flatten=True) + stats = add_actions_table.to_pylist()[0] + + assert stats["null_count.foo"] == 2 + assert stats["min.foo"] == "a" + assert stats["max.foo"] == "b" + assert stats["null_count.bar"] is None + assert stats["min.bar"] is None + assert stats["max.bar"] is None + assert stats["null_count.baz"] == 2 + assert stats["min.baz"] == 1 + assert stats["max.baz"] == 1 + + dt.update({"foo": "'hello world'"}) + + dt = DeltaTable(tmp_path) + add_actions_table = dt.get_add_actions(flatten=True) + stats = add_actions_table.to_pylist()[0] + + assert dt.version() == 1 + assert stats["null_count.foo"] == 0 + assert stats["min.foo"] == "hello world" + assert stats["max.foo"] == "hello world" + assert stats["null_count.bar"] is None + assert stats["min.bar"] is None + assert stats["max.bar"] is None + assert stats["null_count.baz"] == 2 + assert stats["min.baz"] == 1 + assert stats["max.baz"] == 1 diff --git a/python/tests/test_writer.py b/python/tests/test_writer.py index 41fb876c12..6e973ab18d 100644 --- a/python/tests/test_writer.py +++ b/python/tests/test_writer.py @@ -1586,6 +1586,90 @@ def test_rust_decimal_cast(tmp_path: pathlib.Path): ) +@pytest.mark.parametrize("engine", ["pyarrow", "rust"]) +def test_write_stats_column_idx(tmp_path: pathlib.Path, engine): + def _check_stats(dt: DeltaTable): + add_actions_table = dt.get_add_actions(flatten=True) + stats = add_actions_table.to_pylist()[0] + + assert stats["null_count.foo"] == 2 + assert stats["min.foo"] == "a" + assert stats["max.foo"] == "b" + assert stats["null_count.bar"] == 1 + assert stats["min.bar"] == 1 + assert stats["max.bar"] == 3 + assert stats["null_count.baz"] is None + assert stats["min.baz"] is None + assert stats["max.baz"] is None + + data = pa.table( + { + "foo": pa.array(["a", "b", None, None]), + "bar": pa.array([1, 2, 3, None]), + "baz": pa.array([1, 1, None, None]), + } + ) + write_deltalake( + tmp_path, + data, + mode="append", + engine=engine, + configuration={"delta.dataSkippingNumIndexedCols": "2"}, + ) + + dt = DeltaTable(tmp_path) + _check_stats(dt) + + # Check if it properly takes skippingNumIndexCols from the config in the table + write_deltalake(tmp_path, data, mode="overwrite", engine=engine) + + dt = DeltaTable(tmp_path) + assert dt.version() == 1 + _check_stats(dt) + + +@pytest.mark.parametrize("engine", ["pyarrow", "rust"]) +def test_write_stats_columns_stats_provided(tmp_path: pathlib.Path, engine): + def _check_stats(dt: DeltaTable): + add_actions_table = dt.get_add_actions(flatten=True) + stats = add_actions_table.to_pylist()[0] + + assert stats["null_count.foo"] == 2 + assert stats["min.foo"] == "a" + assert stats["max.foo"] == "b" + assert stats["null_count.bar"] is None + assert stats["min.bar"] is None + assert stats["max.bar"] is None + assert stats["null_count.baz"] == 2 + assert stats["min.baz"] == 1 + assert stats["max.baz"] == 1 + + data = pa.table( + { + "foo": pa.array(["a", "b", None, None]), + "bar": pa.array([1, 2, 3, None]), + "baz": pa.array([1, 1, None, None]), + } + ) + write_deltalake( + 
tmp_path, + data, + mode="append", + engine=engine, + configuration={"delta.dataSkippingStatsColumns": "foo,baz"}, + ) + + dt = DeltaTable(tmp_path) + _check_stats(dt) + + # Check if it properly takes skippingNumIndexCols from the config in the table + write_deltalake(tmp_path, data, mode="overwrite", engine=engine) + + dt = DeltaTable(tmp_path) + assert dt.version() == 1 + _check_stats(dt) + + @pytest.mark.parametrize( "array", [