diff --git a/crates/polars-expr/src/expressions/aggregation.rs b/crates/polars-expr/src/expressions/aggregation.rs index a2e868c59fa2..367fdbe3ad2d 100644 --- a/crates/polars-expr/src/expressions/aggregation.rs +++ b/crates/polars-expr/src/expressions/aggregation.rs @@ -468,6 +468,10 @@ impl PhysicalExpr for AggregationExpr { } } + fn collect_live_columns(&self, lv: &mut PlIndexSet) { + self.input.collect_live_columns(lv); + } + fn is_scalar(&self) -> bool { true } @@ -757,6 +761,11 @@ impl PhysicalExpr for AggQuantileExpr { )) } + fn collect_live_columns(&self, lv: &mut PlIndexSet) { + self.input.collect_live_columns(lv); + self.quantile.collect_live_columns(lv); + } + fn to_field(&self, input_schema: &Schema) -> PolarsResult { self.input.to_field(input_schema) } diff --git a/crates/polars-expr/src/expressions/alias.rs b/crates/polars-expr/src/expressions/alias.rs index 131d2ca2f16c..410ca00448a4 100644 --- a/crates/polars-expr/src/expressions/alias.rs +++ b/crates/polars-expr/src/expressions/alias.rs @@ -59,6 +59,11 @@ impl PhysicalExpr for AliasExpr { Ok(ac) } + fn collect_live_columns(&self, lv: &mut PlIndexSet) { + self.physical_expr.collect_live_columns(lv); + lv.insert(self.name.clone()); + } + fn to_field(&self, input_schema: &Schema) -> PolarsResult { Ok(Field::new( self.name.clone(), diff --git a/crates/polars-expr/src/expressions/apply.rs b/crates/polars-expr/src/expressions/apply.rs index e6c5ae856ae7..7fc0739f131e 100644 --- a/crates/polars-expr/src/expressions/apply.rs +++ b/crates/polars-expr/src/expressions/apply.rs @@ -16,6 +16,7 @@ use crate::expressions::{ AggState, AggregationContext, PartitionedAggregation, PhysicalExpr, UpdateGroups, }; +#[derive(Clone)] pub struct ApplyExpr { inputs: Vec>, function: SpecialEq>, @@ -426,6 +427,50 @@ impl PhysicalExpr for ApplyExpr { } } } + + fn collect_live_columns(&self, lv: &mut PlIndexSet) { + for i in &self.inputs { + i.collect_live_columns(lv); + } + } + fn replace_elementwise_const_columns( + &self, + const_columns: &PlHashMap>, + ) -> Option> { + if self.collect_groups == ApplyOptions::ElementWise { + let mut new_inputs = Vec::new(); + for i in 0..self.inputs.len() { + match self.inputs[i].replace_elementwise_const_columns(const_columns) { + None => continue, + Some(new) => { + new_inputs.reserve(self.inputs.len()); + new_inputs.extend(self.inputs[..i].iter().cloned()); + new_inputs.push(new); + break; + }, + } + } + + // Only copy inputs if it is actually needed + if new_inputs.is_empty() { + return None; + } + + new_inputs.extend(self.inputs[new_inputs.len()..].iter().map(|i| { + match i.replace_elementwise_const_columns(const_columns) { + None => i.clone(), + Some(new) => new, + } + })); + + let mut slf = self.clone(); + slf.inputs = new_inputs; + return Some(Arc::new(slf)); + } + + None + } + fn to_field(&self, input_schema: &Schema) -> PolarsResult { self.expr.to_field(input_schema, Context::Default) } diff --git a/crates/polars-expr/src/expressions/binary.rs b/crates/polars-expr/src/expressions/binary.rs index 29db6dcd643a..3696b0d9bf46 100644 --- a/crates/polars-expr/src/expressions/binary.rs +++ b/crates/polars-expr/src/expressions/binary.rs @@ -8,6 +8,7 @@ use crate::expressions::{ AggState, AggregationContext, PartitionedAggregation, PhysicalExpr, UpdateGroups, }; +#[derive(Clone)] pub struct BinaryExpr { left: Arc, op: Operator, @@ -265,6 +266,31 @@ impl PhysicalExpr for BinaryExpr { } } + fn collect_live_columns(&self, lv: &mut PlIndexSet) { + self.left.collect_live_columns(lv); + self.right.collect_live_columns(lv); + } + fn replace_elementwise_const_columns( + &self, + const_columns: &PlHashMap>, + ) -> Option> { + let rcc_left = self.left.replace_elementwise_const_columns(const_columns); + let rcc_right = self.right.replace_elementwise_const_columns(const_columns); + + if rcc_left.is_some() || rcc_right.is_some() { + let mut slf = self.clone(); + if let Some(left) = rcc_left { + slf.left = left; + } + if let Some(right) = rcc_right { + slf.right = right; + } + return Some(Arc::new(slf)); + } + + None + } + fn to_field(&self, input_schema: &Schema) -> PolarsResult { self.expr.to_field(input_schema, Context::Default) } diff --git a/crates/polars-expr/src/expressions/cast.rs b/crates/polars-expr/src/expressions/cast.rs index 1bc230ceab8f..623854d35b11 100644 --- a/crates/polars-expr/src/expressions/cast.rs +++ b/crates/polars-expr/src/expressions/cast.rs @@ -87,6 +87,10 @@ impl PhysicalExpr for CastExpr { Ok(ac) } + fn collect_live_columns(&self, lv: &mut PlIndexSet) { + self.input.collect_live_columns(lv); + } + fn to_field(&self, input_schema: &Schema) -> PolarsResult { self.input.to_field(input_schema).map(|mut fld| { fld.coerce(self.dtype.clone()); diff --git a/crates/polars-expr/src/expressions/column.rs b/crates/polars-expr/src/expressions/column.rs index 99b5ba9fe262..4c730663b339 100644 --- a/crates/polars-expr/src/expressions/column.rs +++ b/crates/polars-expr/src/expressions/column.rs @@ -133,6 +133,7 @@ impl PhysicalExpr for ColumnExpr { fn as_expression(&self) -> Option<&Expr> { Some(&self.expr) } + fn evaluate(&self, df: &DataFrame, state: &ExecutionState) -> PolarsResult { let out = match self.schema.get_full(&self.name) { Some((idx, _, _)) => { @@ -178,6 +179,22 @@ impl PhysicalExpr for ColumnExpr { Some(self) } + fn collect_live_columns(&self, lv: &mut PlIndexSet) { + lv.insert(self.name.clone()); + } + fn replace_elementwise_const_columns( + &self, + const_columns: &PlHashMap>, + ) -> Option> { + if let Some(av) = const_columns.get(&self.name) { + let lv = LiteralValue::from(av.clone()); + let le = LiteralExpr::new(lv, self.expr.clone()); + return Some(Arc::new(le)); + } + + None + } + fn to_field(&self, input_schema: &Schema) -> PolarsResult { input_schema.get_field(&self.name).ok_or_else(|| { polars_err!( diff --git a/crates/polars-expr/src/expressions/count.rs b/crates/polars-expr/src/expressions/count.rs index db25f0d9e73b..118334126ecf 100644 --- a/crates/polars-expr/src/expressions/count.rs +++ b/crates/polars-expr/src/expressions/count.rs @@ -36,6 +36,8 @@ impl PhysicalExpr for CountExpr { Ok(AggregationContext::new(c, Cow::Borrowed(groups), true)) } + fn collect_live_columns(&self, _lv: &mut PlIndexSet) {} + fn to_field(&self, _input_schema: &Schema) -> PolarsResult { Ok(Field::new(PlSmallStr::from_static(LEN), IDX_DTYPE)) } diff --git a/crates/polars-expr/src/expressions/filter.rs b/crates/polars-expr/src/expressions/filter.rs index f2b1383059ee..240e5a83be62 100644 --- a/crates/polars-expr/src/expressions/filter.rs +++ b/crates/polars-expr/src/expressions/filter.rs @@ -24,6 +24,7 @@ impl PhysicalExpr for FilterExpr { fn as_expression(&self) -> Option<&Expr> { Some(&self.expr) } + fn evaluate(&self, df: &DataFrame, state: &ExecutionState) -> PolarsResult { let s_f = || self.input.evaluate(df, state); let predicate_f = || self.by.evaluate(df, state); @@ -145,6 +146,11 @@ impl PhysicalExpr for FilterExpr { } } + fn collect_live_columns(&self, lv: &mut PlIndexSet) { + self.input.collect_live_columns(lv); + self.by.collect_live_columns(lv); + } + fn to_field(&self, input_schema: &Schema) -> PolarsResult { self.input.to_field(input_schema) } diff --git a/crates/polars-expr/src/expressions/gather.rs b/crates/polars-expr/src/expressions/gather.rs index 5c0ccae4f2bc..e38b27aaeacc 100644 --- a/crates/polars-expr/src/expressions/gather.rs +++ b/crates/polars-expr/src/expressions/gather.rs @@ -18,6 +18,7 @@ impl PhysicalExpr for GatherExpr { fn as_expression(&self) -> Option<&Expr> { Some(&self.expr) } + fn evaluate(&self, df: &DataFrame, state: &ExecutionState) -> PolarsResult { let series = self.phys_expr.evaluate(df, state)?; self.finish(df, state, series) @@ -88,6 +89,11 @@ impl PhysicalExpr for GatherExpr { Ok(ac) } + fn collect_live_columns(&self, lv: &mut PlIndexSet) { + self.phys_expr.collect_live_columns(lv); + self.idx.collect_live_columns(lv); + } + fn to_field(&self, input_schema: &Schema) -> PolarsResult { self.phys_expr.to_field(input_schema) } diff --git a/crates/polars-expr/src/expressions/literal.rs b/crates/polars-expr/src/expressions/literal.rs index 66a3e02e1834..8b152803bb64 100644 --- a/crates/polars-expr/src/expressions/literal.rs +++ b/crates/polars-expr/src/expressions/literal.rs @@ -121,6 +121,7 @@ impl PhysicalExpr for LiteralExpr { fn as_expression(&self) -> Option<&Expr> { Some(&self.1) } + fn evaluate(&self, _df: &DataFrame, _state: &ExecutionState) -> PolarsResult { self.as_column() } @@ -148,6 +149,8 @@ impl PhysicalExpr for LiteralExpr { Some(self) } + fn collect_live_columns(&self, _lv: &mut PlIndexSet) {} + fn to_field(&self, _input_schema: &Schema) -> PolarsResult { let dtype = self.0.get_datatype(); Ok(Field::new(PlSmallStr::from_static("literal"), dtype)) diff --git a/crates/polars-expr/src/expressions/mod.rs b/crates/polars-expr/src/expressions/mod.rs index 70963dde7eec..c309991990ee 100644 --- a/crates/polars-expr/src/expressions/mod.rs +++ b/crates/polars-expr/src/expressions/mod.rs @@ -590,6 +590,22 @@ pub trait PhysicalExpr: Send + Sync { None } + /// Get the variables that are used in the expression i.e. live variables. + /// This can contain duplicates. + fn collect_live_columns(&self, lv: &mut PlIndexSet); + + /// Replace columns that are known to be a constant value with their const value. + /// + /// This should not replace values that are calculated non-elementwise e.g. col.max(), + /// col.std(), etc. + fn replace_elementwise_const_columns( + &self, + const_columns: &PlHashMap>, + ) -> Option> { + _ = const_columns; + None + } + /// Can take &dyn Statistics and determine of a file should be /// read -> `true` /// or not -> `false` @@ -630,8 +646,8 @@ impl PhysicalIoExpr for PhysicalIoHelper { .map(|c| c.take_materialized_series()) } - fn live_variables(&self) -> Option> { - Some(expr_to_leaf_column_names(self.expr.as_expression()?)) + fn collect_live_columns(&self, live_columns: &mut PlIndexSet) { + self.expr.collect_live_columns(live_columns); } #[cfg(feature = "parquet")] diff --git a/crates/polars-expr/src/expressions/rolling.rs b/crates/polars-expr/src/expressions/rolling.rs index 7e9897d7328c..2ec32069a30f 100644 --- a/crates/polars-expr/src/expressions/rolling.rs +++ b/crates/polars-expr/src/expressions/rolling.rs @@ -59,6 +59,10 @@ impl PhysicalExpr for RollingExpr { polars_bail!(InvalidOperation: "rolling expression not allowed in aggregation"); } + fn collect_live_columns(&self, lv: &mut PlIndexSet) { + self.phys_function.collect_live_columns(lv); + } + fn to_field(&self, input_schema: &Schema) -> PolarsResult { self.function.to_field(input_schema, Context::Default) } diff --git a/crates/polars-expr/src/expressions/slice.rs b/crates/polars-expr/src/expressions/slice.rs index 72bb6376466c..62df859460f8 100644 --- a/crates/polars-expr/src/expressions/slice.rs +++ b/crates/polars-expr/src/expressions/slice.rs @@ -268,6 +268,12 @@ impl PhysicalExpr for SliceExpr { Ok(ac) } + fn collect_live_columns(&self, lv: &mut PlIndexSet) { + self.input.collect_live_columns(lv); + self.offset.collect_live_columns(lv); + self.length.collect_live_columns(lv); + } + fn to_field(&self, input_schema: &Schema) -> PolarsResult { self.input.to_field(input_schema) } diff --git a/crates/polars-expr/src/expressions/sort.rs b/crates/polars-expr/src/expressions/sort.rs index df816f9b48e7..746978b760a9 100644 --- a/crates/polars-expr/src/expressions/sort.rs +++ b/crates/polars-expr/src/expressions/sort.rs @@ -46,6 +46,7 @@ impl PhysicalExpr for SortExpr { fn as_expression(&self) -> Option<&Expr> { Some(&self.expr) } + fn evaluate(&self, df: &DataFrame, state: &ExecutionState) -> PolarsResult { let series = self.physical_expr.evaluate(df, state)?; series.sort_with(self.options) @@ -104,6 +105,10 @@ impl PhysicalExpr for SortExpr { Ok(ac) } + fn collect_live_columns(&self, lv: &mut PlIndexSet) { + self.physical_expr.collect_live_columns(lv); + } + fn to_field(&self, input_schema: &Schema) -> PolarsResult { self.physical_expr.to_field(input_schema) } diff --git a/crates/polars-expr/src/expressions/sortby.rs b/crates/polars-expr/src/expressions/sortby.rs index ed34ed6414cd..3d9877038adc 100644 --- a/crates/polars-expr/src/expressions/sortby.rs +++ b/crates/polars-expr/src/expressions/sortby.rs @@ -201,6 +201,7 @@ impl PhysicalExpr for SortByExpr { fn as_expression(&self) -> Option<&Expr> { Some(&self.expr) } + fn evaluate(&self, df: &DataFrame, state: &ExecutionState) -> PolarsResult { let series_f = || self.input.evaluate(df, state); if self.by.is_empty() { @@ -374,6 +375,13 @@ impl PhysicalExpr for SortByExpr { Ok(ac_in) } + fn collect_live_columns(&self, lv: &mut PlIndexSet) { + self.input.collect_live_columns(lv); + for i in &self.by { + i.collect_live_columns(lv); + } + } + fn to_field(&self, input_schema: &Schema) -> PolarsResult { self.input.to_field(input_schema) } diff --git a/crates/polars-expr/src/expressions/ternary.rs b/crates/polars-expr/src/expressions/ternary.rs index bbd0c5f7d936..e7ec666eda50 100644 --- a/crates/polars-expr/src/expressions/ternary.rs +++ b/crates/polars-expr/src/expressions/ternary.rs @@ -328,6 +328,12 @@ impl PhysicalExpr for TernaryExpr { Some(self) } + fn collect_live_columns(&self, lv: &mut PlIndexSet) { + self.predicate.collect_live_columns(lv); + self.truthy.collect_live_columns(lv); + self.falsy.collect_live_columns(lv); + } + fn is_scalar(&self) -> bool { self.returns_scalar } diff --git a/crates/polars-expr/src/expressions/window.rs b/crates/polars-expr/src/expressions/window.rs index bbb9a1cface1..d833278a12cb 100644 --- a/crates/polars-expr/src/expressions/window.rs +++ b/crates/polars-expr/src/expressions/window.rs @@ -641,6 +641,16 @@ impl PhysicalExpr for WindowExpr { false } + fn collect_live_columns(&self, lv: &mut PlIndexSet) { + for i in &self.group_by { + i.collect_live_columns(lv); + } + if let Some((i, _)) = &self.order_by { + i.collect_live_columns(lv); + } + self.phys_function.collect_live_columns(lv); + } + #[allow(clippy::ptr_arg)] fn evaluate_on_groups<'a>( &self, diff --git a/crates/polars-io/src/parquet/read/mod.rs b/crates/polars-io/src/parquet/read/mod.rs index cc0020cc7857..75b9f51dc4ba 100644 --- a/crates/polars-io/src/parquet/read/mod.rs +++ b/crates/polars-io/src/parquet/read/mod.rs @@ -33,6 +33,8 @@ or set 'streaming'", pub use options::{ParallelStrategy, ParquetOptions}; use polars_error::{ErrString, PolarsError}; +pub use polars_parquet::arrow::read::infer_schema; +pub use polars_parquet::read::FileMetadata; pub use read_impl::{create_sorting_map, try_set_sorted_flag}; #[cfg(feature = "cloud")] pub use reader::ParquetAsyncReader; diff --git a/crates/polars-io/src/parquet/read/read_impl.rs b/crates/polars-io/src/parquet/read/read_impl.rs index 5f4f381a2e9b..3b931d5e80f4 100644 --- a/crates/polars-io/src/parquet/read/read_impl.rs +++ b/crates/polars-io/src/parquet/read/read_impl.rs @@ -1,6 +1,6 @@ use std::borrow::Cow; use std::collections::VecDeque; -use std::ops::{Deref, Range}; +use std::ops::Range; use arrow::array::BooleanArray; use arrow::bitmap::MutableBitmap; @@ -236,7 +236,9 @@ fn rg_to_dfs( if parallel == S::Prefiltered { if let Some(predicate) = predicate { - if let Some(live_variables) = predicate.live_variables() { + let mut live_columns = PlIndexSet::new(); + predicate.collect_live_columns(&mut live_columns); + if !live_columns.is_empty() { return rg_to_dfs_prefiltered( store, previous_row_count, @@ -244,7 +246,7 @@ fn rg_to_dfs( row_group_end, file_metadata, schema, - live_variables, + live_columns, predicate, row_index, projection, @@ -308,7 +310,7 @@ fn rg_to_dfs_prefiltered( row_group_end: usize, file_metadata: &FileMetadata, schema: &ArrowSchemaRef, - live_variables: Vec, + live_columns: PlIndexSet, predicate: &dyn PhysicalIoExpr, row_index: Option, projection: &[usize], @@ -335,14 +337,8 @@ fn rg_to_dfs_prefiltered( .collect(), }; - // Deduplicate the live variables - let live_variables = live_variables - .iter() - .map(Deref::deref) - .collect::>(); - // Get the number of live columns - let num_live_columns = live_variables.len(); + let num_live_columns = live_columns.len(); let num_dead_columns = projection.len() + hive_partition_columns.map_or(0, |x| x.len()) - num_live_columns; @@ -358,7 +354,7 @@ fn rg_to_dfs_prefiltered( for &i in projection.iter() { let name = schema.get_at_index(i).unwrap().0.as_str(); - if live_variables.contains(name) { + if live_columns.contains(name) { live_idx_to_col_idx.push(i); } else { dead_idx_to_col_idx.push(i); @@ -899,7 +895,9 @@ pub fn read_parquet( let prefilter_env = std::env::var("POLARS_PARQUET_PREFILTER"); let prefilter_env = prefilter_env.as_deref(); - let num_live_variables = predicate.live_variables().map_or(0, |v| v.len()); + let mut live_columns = PlIndexSet::new(); + predicate.collect_live_columns(&mut live_columns); + let num_live_variables = live_columns.len(); let mut do_prefilter = false; do_prefilter |= prefilter_env == Ok("1"); // Force enable diff --git a/crates/polars-io/src/predicates.rs b/crates/polars-io/src/predicates.rs index 77872e708e40..75ebc922a4b9 100644 --- a/crates/polars-io/src/predicates.rs +++ b/crates/polars-io/src/predicates.rs @@ -9,7 +9,7 @@ pub trait PhysicalIoExpr: Send + Sync { /// Get the variables that are used in the expression i.e. live variables. /// This can contain duplicates. - fn live_variables(&self) -> Option>; + fn collect_live_columns(&self, live_columns: &mut PlIndexSet); /// Can take &dyn Statistics and determine of a file should be /// read -> `true` @@ -214,6 +214,16 @@ pub struct BatchStats { num_rows: Option, } +impl Default for BatchStats { + fn default() -> Self { + Self { + schema: Arc::new(Schema::default()), + stats: Vec::new(), + num_rows: None, + } + } +} + impl BatchStats { /// Constructs a new [`BatchStats`]. /// diff --git a/crates/polars-lazy/src/physical_plan/streaming/construct_pipeline.rs b/crates/polars-lazy/src/physical_plan/streaming/construct_pipeline.rs index 0700f5f767e7..e36a8a7565a8 100644 --- a/crates/polars-lazy/src/physical_plan/streaming/construct_pipeline.rs +++ b/crates/polars-lazy/src/physical_plan/streaming/construct_pipeline.rs @@ -26,9 +26,8 @@ impl PhysicalIoExpr for Wrap { }; h.evaluate_io(df) } - fn live_variables(&self) -> Option> { - // @TODO: This should not unwrap - Some(expr_to_leaf_column_names(self.0.as_expression()?)) + fn collect_live_columns(&self, live_columns: &mut PlIndexSet) { + self.0.collect_live_columns(live_columns); } fn as_stats_evaluator(&self) -> Option<&dyn StatsEvaluator> { self.0.as_stats_evaluator() diff --git a/crates/polars-mem-engine/src/executors/hive_scan.rs b/crates/polars-mem-engine/src/executors/hive_scan.rs new file mode 100644 index 000000000000..538ab10bbb9e --- /dev/null +++ b/crates/polars-mem-engine/src/executors/hive_scan.rs @@ -0,0 +1,535 @@ +use std::borrow::Cow; +use std::cell::OnceCell; + +use hive::HivePartitions; +use polars_core::config; +use polars_core::frame::column::ScalarColumn; +use polars_core::utils::{ + accumulate_dataframes_vertical, accumulate_dataframes_vertical_unchecked, +}; +use polars_io::predicates::BatchStats; +use polars_io::RowIndex; + +use super::Executor; +#[cfg(feature = "csv")] +use crate::executors::CsvExec; +#[cfg(feature = "parquet")] +use crate::executors::ParquetExec; +use crate::prelude::*; + +pub trait IOFileMetadata: Send + Sync { + fn as_any(&self) -> &dyn std::any::Any; + fn num_rows(&self) -> PolarsResult; + fn schema(&self) -> PolarsResult; +} + +pub(super) struct BasicFileMetadata { + pub schema: Schema, + pub num_rows: IdxSize, +} + +impl IOFileMetadata for BasicFileMetadata { + fn as_any(&self) -> &dyn std::any::Any { + self + } + + fn num_rows(&self) -> PolarsResult { + Ok(self.num_rows) + } + + fn schema(&self) -> PolarsResult { + Ok(self.schema.clone()) + } +} + +pub trait ScanExec { + fn read( + &mut self, + with_columns: Option>, + slice: Option<(usize, usize)>, + predicate: Option>, + row_index: Option, + metadata: Option>, + schema: Schema, + ) -> PolarsResult; + + fn metadata(&mut self) -> PolarsResult>; +} + +fn source_to_scan_exec( + source: ScanSourceRef, + scan_type: &FileScan, + file_info: &FileInfo, + file_options: &FileScanOptions, + metadata: Option<&dyn IOFileMetadata>, +) -> PolarsResult> { + let source = match source { + ScanSourceRef::Path(path) => ScanSources::Paths([path.to_path_buf()].into()), + ScanSourceRef::File(_) | ScanSourceRef::Buffer(_) => { + ScanSources::Buffers([source.to_memslice()?].into()) + }, + }; + + Ok(match scan_type { + #[cfg(feature = "parquet")] + FileScan::Parquet { + options, + cloud_options, + .. + } => Box::new(ParquetExec::new( + source, + file_info.clone(), + None, + None, + options.clone(), + cloud_options.clone(), + file_options.clone(), + metadata.map(|md| { + md.as_any() + .downcast_ref::>() + .unwrap() + .clone() + }), + )) as _, + #[cfg(feature = "csv")] + FileScan::Csv { options, .. } => Box::new(CsvExec { + sources: source, + file_info: file_info.clone(), + options: options.clone(), + file_options: file_options.clone(), + predicate: None, + }), + _ => todo!(), + }) +} + +pub struct Source { + scan_exec: Box, + metadata: OnceCell>, +} + +impl Source { + fn new( + source: ScanSourceRef, + scan_type: &FileScan, + file_info: &FileInfo, + file_options: &FileScanOptions, + metadata: Option<&dyn IOFileMetadata>, + ) -> PolarsResult { + let scan_exec = source_to_scan_exec(source, scan_type, file_info, file_options, metadata)?; + Ok(Self { + scan_exec, + metadata: OnceCell::new(), + }) + } + + fn get_metadata(&mut self) -> PolarsResult<&dyn IOFileMetadata> { + match self.metadata.get() { + None => { + let metadata = self.scan_exec.metadata()?; + Ok(self.metadata.get_or_init(|| metadata).as_ref()) + }, + Some(metadata) => Ok(metadata.as_ref()), + } + } + + fn num_unfiltered_rows(&mut self) -> PolarsResult { + self.get_metadata()?.num_rows() + } + + fn schema(&mut self) -> PolarsResult { + self.get_metadata()?.schema() + } +} + +/// Scan over multiple sources and combine their results. +pub struct MultiScanExec { + sources: ScanSources, + file_info: FileInfo, + hive_parts: Option>>, + predicate: Option>, + file_options: FileScanOptions, + scan_type: FileScan, + + first_file_metadata: Option>, +} + +impl MultiScanExec { + pub fn new( + sources: ScanSources, + file_info: FileInfo, + hive_parts: Option>>, + predicate: Option>, + file_options: FileScanOptions, + mut scan_type: FileScan, + ) -> Self { + let first_file_metadata = match &mut scan_type { + #[cfg(feature = "parquet")] + FileScan::Parquet { metadata, .. } => metadata.take().map(|md| Box::new(md) as _), + _ => None, + }; + + Self { + sources, + file_info, + hive_parts, + predicate, + file_options, + scan_type, + first_file_metadata, + } + } + + pub fn resolve_negative_slice( + &mut self, + offset: i64, + length: usize, + ) -> PolarsResult<(usize, usize)> { + // Walk the files in reverse until we find the first file, and then translate the + // slice into a positive-offset equivalent. + let mut offset_remaining = -offset as usize; + + for i in (0..self.sources.len()).rev() { + let source = self.sources.get(i).unwrap(); + let mut exec_source = Source::new( + source, + &self.scan_type, + &self.file_info, + &self.file_options, + self.first_file_metadata.as_deref().filter(|_| i == 0), + )?; + + let num_rows = exec_source.num_unfiltered_rows()? as usize; + + if num_rows >= offset_remaining { + return Ok((i, num_rows - offset_remaining)); + } + offset_remaining -= num_rows; + } + + Ok((0, length - offset_remaining)) + } + + pub fn read(&mut self) -> PolarsResult { + let include_file_paths = self.file_options.include_file_paths.take(); + let predicate = self.predicate.take(); + + // Create a index set of the hive columns. + let mut hive_column_set = PlIndexSet::default(); + if let Some(hive_parts) = &self.hive_parts { + assert_eq!(self.sources.len(), hive_parts.len()); + + if let Some(fst_hive_part) = hive_parts.first() { + hive_column_set.extend( + fst_hive_part + .get_statistics() + .column_stats() + .iter() + .map(|c| c.field_name().clone()), + ); + } + } + + // Look through the predicate and assess whether hive columns are being used in it. + let mut has_live_hive_columns = false; + if let Some(predicate) = &predicate { + let mut live_columns = PlIndexSet::new(); + predicate.collect_live_columns(&mut live_columns); + + for hive_column in &hive_column_set { + has_live_hive_columns |= live_columns.contains(hive_column); + } + } + + // Remove the hive columns for each file load. + let mut file_with_columns = self.file_options.with_columns.take(); + if self.hive_parts.is_some() { + if let Some(with_columns) = &self.file_options.with_columns { + file_with_columns = Some( + with_columns + .iter() + .filter(|&c| !hive_column_set.contains(c)) + .cloned() + .collect(), + ); + } + } + + let allow_missing_columns = self.file_options.allow_missing_columns; + self.file_options.allow_missing_columns = false; + let mut row_index = self.file_options.row_index.take(); + let slice = self.file_options.slice.take(); + + let current_schema = self.file_info.schema.clone(); + let output_schema = current_schema.clone(); + let mut missing_columns = Vec::new(); + + let mut first_slice_file = None; + let mut slice = match slice { + None => None, + Some((offset, length)) => Some({ + if offset >= 0 { + (offset as usize, length) + } else { + let (first_file, offset) = self.resolve_negative_slice(offset, length)?; + first_slice_file = Some(first_file); + (offset, length) + } + }), + }; + + let verbose = config::verbose(); + let mut dfs = Vec::with_capacity(self.sources.len()); + + let mut const_columns = PlHashMap::new(); + + // @TODO: This should be moved outside of the FileScan::Parquet + let use_statistics = match &self.scan_type { + #[cfg(feature = "parquet")] + FileScan::Parquet { options, .. } => options.use_statistics, + _ => true, + }; + + for (i, source) in self.sources.iter().enumerate() { + let hive_part = self.hive_parts.as_ref().and_then(|h| h.get(i)); + if slice.is_some_and(|s| s.1 == 0) { + break; + } + + let mut exec_source = Source::new( + source, + &self.scan_type, + &self.file_info, + &self.file_options, + self.first_file_metadata.as_deref().filter(|_| i == 0), + )?; + + if verbose { + eprintln!( + "Multi-file / Hive read: currently reading '{}'", + source.to_include_path_name() + ); + } + + // @TODO: There are cases where we can ignore reading. E.g. no row index + empty with columns + no predicate + let mut schema = exec_source.schema()?; + let mut extra_columns = Vec::new(); + + if let Some(file_with_columns) = &file_with_columns { + if allow_missing_columns { + schema = schema.try_project( + file_with_columns + .iter() + .filter(|c| schema.contains(c.as_str())), + )?; + } else { + schema = schema.try_project(file_with_columns.iter())?; + } + } + + if allow_missing_columns { + missing_columns.clear(); + extra_columns.clear(); + + current_schema.as_ref().field_compare( + &schema, + &mut missing_columns, + &mut extra_columns, + ); + + if !extra_columns.is_empty() { + // @TODO: Better error + polars_bail!(InvalidOperation: "More schema in file after first"); + } + } + + // Insert the hive partition values into the predicate. This allows the predicate + // to function even when there is a combination of hive and non-hive columns being + // used. + let mut file_predicate = predicate.clone(); + if has_live_hive_columns { + let hive_part = hive_part.unwrap(); + let predicate = predicate.as_ref().unwrap(); + const_columns.clear(); + for (idx, column) in hive_column_set.iter().enumerate() { + let value = hive_part.get_statistics().column_stats()[idx] + .to_min() + .unwrap() + .get(0) + .unwrap() + .into_static(); + const_columns.insert(column.clone(), value); + } + for (_, (missing_column, _)) in &missing_columns { + const_columns.insert((*missing_column).clone(), AnyValue::Null); + } + + file_predicate = predicate.replace_elementwise_const_columns(&const_columns); + + // @TODO: Set predicate to `None` if it's constant evaluated to true. + + // At this point the file_predicate should not contain any references to the + // hive columns anymore. + // + // Note that, replace_elementwise_const_columns does not actually guarantee the + // replacement of all reference to the const columns. But any expression which + // does not guarantee this should not be pushed down as an IO predicate. + if cfg!(debug_assertions) { + let mut live_columns = PlIndexSet::new(); + file_predicate + .as_ref() + .unwrap() + .collect_live_columns(&mut live_columns); + for hive_column in hive_part.get_statistics().column_stats() { + assert!( + !live_columns.contains(hive_column.field_name()), + "Predicate still contains hive column" + ); + } + } + } + + let mut do_skip_file = false; + if let Some(slice) = &slice { + let allow_slice_skip = match first_slice_file { + None => slice.0 as IdxSize >= exec_source.num_unfiltered_rows()?, + Some(f) => i < f, + }; + + if allow_slice_skip && verbose { + eprintln!( + "Slice allows skipping of '{}'", + source.to_include_path_name() + ); + } + do_skip_file |= allow_slice_skip; + } + + let stats_evaluator = file_predicate.as_ref().and_then(|p| p.as_stats_evaluator()); + let stats_evaluator = stats_evaluator.filter(|_| use_statistics); + + if let Some(stats_evaluator) = stats_evaluator { + let allow_predicate_skip = !stats_evaluator + .should_read(&BatchStats::default()) + .unwrap_or(true); + if allow_predicate_skip && verbose { + eprintln!( + "File statistics allows skipping of '{}'", + source.to_include_path_name() + ); + } + do_skip_file |= allow_predicate_skip; + } + + if do_skip_file { + // Update the row_index to the proper offset. + if let Some(row_index) = row_index.as_mut() { + row_index.offset += exec_source.num_unfiltered_rows()?; + } + // Update the slice offset. + if let Some(slice) = slice.as_mut() { + if first_slice_file.is_none_or(|f| i >= f) { + slice.0 = slice + .0 + .saturating_sub(exec_source.num_unfiltered_rows()? as usize); + } + } + + continue; + } + + let with_columns = if allow_missing_columns { + file_with_columns + .as_ref() + .map(|_| schema.iter_names().cloned().collect()) + } else { + file_with_columns.clone() + }; + + // Read the DataFrame and needed metadata. + let num_unfiltered_rows = exec_source.num_unfiltered_rows()?; + let mut df = exec_source.scan_exec.read( + with_columns, + slice, + file_predicate, + row_index.clone(), + exec_source.metadata.take(), + schema, + )?; + + // Update the row_index to the proper offset. + if let Some(row_index) = row_index.as_mut() { + row_index.offset += num_unfiltered_rows; + } + // Update the slice. + if let Some(slice) = slice.as_mut() { + if first_slice_file.is_none_or(|f| i >= f) { + slice.1 = slice + .1 + .saturating_sub(num_unfiltered_rows as usize - slice.0); + slice.0 = slice.0.saturating_sub(num_unfiltered_rows as usize); + } + } + + // Add all the missing columns. + if allow_missing_columns && !missing_columns.is_empty() { + for (_, (name, field)) in &missing_columns { + df.with_column(Column::full_null((*name).clone(), df.height(), field))?; + } + } + // Materialize the hive columns and add them back in. + if let Some(hive_part) = hive_part { + for hive_col in hive_part.get_statistics().column_stats() { + df.with_column( + ScalarColumn::from_single_value_series( + hive_col + .to_min() + .unwrap() + .clone() + .with_name(hive_col.field_name().clone()), + df.height(), + ) + .into_column(), + )?; + } + } + // Add the `include_file_paths` column + if let Some(include_file_paths) = &include_file_paths { + df.with_column(ScalarColumn::new( + include_file_paths.clone(), + PlSmallStr::from_str(source.to_include_path_name()).into(), + df.height(), + ))?; + } + + // Project to ensure that all DataFrames have the proper order. + df = df.select(output_schema.iter_names().cloned())?; + dfs.push(df); + } + + let out = if cfg!(debug_assertions) { + accumulate_dataframes_vertical(dfs)? + } else { + accumulate_dataframes_vertical_unchecked(dfs) + }; + + Ok(out) + } +} + +impl Executor for MultiScanExec { + fn execute(&mut self, state: &mut ExecutionState) -> PolarsResult { + let profile_name = if state.has_node_timer() { + let mut ids = vec![self.sources.id()]; + if self.predicate.is_some() { + ids.push("predicate".into()) + } + let name = comma_delimited("hive".to_string(), &ids); + Cow::Owned(name) + } else { + Cow::Borrowed("") + }; + + state.record(|| self.read(), profile_name) + } +} diff --git a/crates/polars-mem-engine/src/executors/mod.rs b/crates/polars-mem-engine/src/executors/mod.rs index 5c9d093d986a..7dc6ed65e545 100644 --- a/crates/polars-mem-engine/src/executors/mod.rs +++ b/crates/polars-mem-engine/src/executors/mod.rs @@ -7,6 +7,7 @@ mod group_by_dynamic; mod group_by_partitioned; pub(super) mod group_by_rolling; mod hconcat; +mod hive_scan; mod join; mod projection; mod projection_simple; @@ -38,6 +39,7 @@ pub(super) use self::group_by_partitioned::*; #[cfg(feature = "dynamic_group_by")] pub(super) use self::group_by_rolling::GroupByRollingExec; pub(super) use self::hconcat::*; +pub(super) use self::hive_scan::*; pub(super) use self::join::*; pub(super) use self::projection::*; pub(super) use self::projection_simple::*; diff --git a/crates/polars-mem-engine/src/executors/scan/csv.rs b/crates/polars-mem-engine/src/executors/scan/csv.rs index 6f4448667130..6caf40019c8d 100644 --- a/crates/polars-mem-engine/src/executors/scan/csv.rs +++ b/crates/polars-mem-engine/src/executors/scan/csv.rs @@ -17,7 +17,7 @@ pub struct CsvExec { } impl CsvExec { - fn read(&self) -> PolarsResult { + fn read_impl(&self) -> PolarsResult { let with_columns = self .file_options .with_columns @@ -209,6 +209,72 @@ impl CsvExec { } } +impl ScanExec for CsvExec { + fn read( + &mut self, + with_columns: Option>, + slice: Option<(usize, usize)>, + predicate: Option>, + row_index: Option, + metadata: Option>, + schema: Schema, + ) -> PolarsResult { + self.file_options.with_columns = with_columns; + self.file_options.slice = slice.map(|(o, l)| (o as i64, l)); + self.predicate = predicate; + self.file_options.row_index = row_index; + + let schema = Arc::new(schema); + self.file_info.reader_schema = Some(arrow::Either::Right(schema.clone())); + self.file_info.schema = schema.clone(); + + self.options.schema.take(); + // self.options.schema_overwrite.take(); + + // Use the metadata somehow + _ = metadata; + + self.read_impl() + } + + fn metadata(&mut self) -> PolarsResult> { + let force_async = config::force_async(); + let run_async = (self.sources.is_paths() && force_async) || self.sources.is_cloud_url(); + + let source = self.sources.at(0); + let owned = &mut vec![]; + + let memslice = source.to_memslice_async_assume_latest(run_async)?; + + let popt = self.options.parse_options.as_ref(); + + let bytes = maybe_decompress_bytes(&memslice, owned)?; + let num_rows = count_rows_from_slice( + bytes, + popt.separator, + popt.quote_char, + popt.comment_prefix.as_ref(), + popt.eol_char, + self.options.has_header, + )? as IdxSize; + let schema = infer_file_schema( + &get_reader_bytes(&mut std::io::Cursor::new(bytes))?, + self.options.parse_options.as_ref(), + self.options.infer_schema_length, + self.options.has_header, + self.options.schema_overwrite.as_deref(), + self.options.skip_rows, + self.options.skip_lines, + self.options.skip_rows_after_header, + self.options.raise_if_empty, + &mut self.options.n_threads, + )? + .0; + + Ok(Box::new(BasicFileMetadata { schema, num_rows }) as _) + } +} + impl Executor for CsvExec { fn execute(&mut self, state: &mut ExecutionState) -> PolarsResult { let profile_name = if state.has_node_timer() { @@ -222,6 +288,6 @@ impl Executor for CsvExec { Cow::Borrowed("") }; - state.record(|| self.read(), profile_name) + state.record(|| self.read_impl(), profile_name) } } diff --git a/crates/polars-mem-engine/src/executors/scan/parquet.rs b/crates/polars-mem-engine/src/executors/scan/parquet.rs index 61a89693cfe0..25595ff5d998 100644 --- a/crates/polars-mem-engine/src/executors/scan/parquet.rs +++ b/crates/polars-mem-engine/src/executors/scan/parquet.rs @@ -14,7 +14,9 @@ use super::*; pub struct ParquetExec { sources: ScanSources, file_info: FileInfo, + hive_parts: Option>>, + predicate: Option>, options: ParquetOptions, #[allow(dead_code)] @@ -39,7 +41,9 @@ impl ParquetExec { ParquetExec { sources, file_info, + hive_parts, + predicate, options, cloud_options, @@ -473,7 +477,7 @@ impl ParquetExec { Ok(result) } - fn read(&mut self) -> PolarsResult { + fn read_with_num_unfiltered_rows(&mut self) -> PolarsResult<(IdxSize, DataFrame)> { // FIXME: The row index implementation is incorrect when a predicate is // applied. This code mitigates that by applying the predicate after the // collection of the entire dataframe if a row index is requested. This is @@ -502,12 +506,107 @@ impl ParquetExec { let mut out = accumulate_dataframes_vertical(out)?; + let num_unfiltered_rows = out.height() as IdxSize; + polars_io::predicates::apply_predicate(&mut out, post_predicate.as_deref(), true)?; if self.file_options.rechunk { out.as_single_chunk_par(); } - Ok(out) + Ok((num_unfiltered_rows, out)) + } + + fn metadata_sync(&mut self) -> PolarsResult> { + Ok(Box::new(match &self.metadata { + None => { + let memslice = self.sources.get(0).unwrap().to_memslice()?; + ParquetReader::new(std::io::Cursor::new(memslice)) + .get_metadata()? + .clone() + }, + Some(md) => md.clone(), + }) as _) + } + + #[cfg(feature = "cloud")] + async fn metadata_async(&mut self) -> PolarsResult> { + let ScanSourceRef::Path(path) = self.sources.get(0).unwrap() else { + unreachable!(); + }; + + Ok(Box::new(match &self.metadata { + None => { + let mut reader = ParquetAsyncReader::from_uri( + path.to_str().unwrap(), + self.cloud_options.as_ref(), + None, + ) + .await?; + + reader.get_metadata().await?.clone() + }, + Some(md) => md.clone(), + }) as _) + } +} + +impl IOFileMetadata for Arc { + fn as_any(&self) -> &dyn std::any::Any { + self + } + + fn num_rows(&self) -> PolarsResult { + Ok(self.num_rows as IdxSize) + } + + fn schema(&self) -> PolarsResult { + let arrow_schema = polars_io::parquet::read::infer_schema(self)?; + Ok(Schema::from_iter(arrow_schema.iter().map( + |(name, field)| (name.clone(), DataType::from_arrow_field(field)), + ))) + } +} + +impl ScanExec for ParquetExec { + fn read( + &mut self, + with_columns: Option>, + slice: Option<(usize, usize)>, + predicate: Option>, + row_index: Option, + metadata: Option>, + schema: Schema, + ) -> PolarsResult { + self.file_options.with_columns = with_columns; + self.file_options.slice = slice.map(|(o, l)| (o as i64, l)); + self.predicate = predicate; + self.file_options.row_index = row_index; + + self.file_info.reader_schema = Some(arrow::Either::Left(Arc::new( + schema.to_arrow(CompatLevel::newest()), + ))); + self.file_info.schema = Arc::new(schema); + if let Some(metadata) = metadata { + self.metadata = Some( + metadata + .as_any() + .downcast_ref::>() + .unwrap() + .clone(), + ); + } + + self.read_with_num_unfiltered_rows().map(|(_, df)| df) + } + + fn metadata(&mut self) -> PolarsResult> { + #[cfg(feature = "cloud")] + if self.sources.is_cloud_url() { + return polars_io::pl_async::get_runtime() + .block_on_potential_spawn(self.metadata_async()); + } + + self.metadata_sync() } } @@ -524,6 +623,9 @@ impl Executor for ParquetExec { Cow::Borrowed("") }; - state.record(|| self.read(), profile_name) + state.record( + || self.read_with_num_unfiltered_rows().map(|(_, df)| df), + profile_name, + ) } } diff --git a/crates/polars-mem-engine/src/planner/lp.rs b/crates/polars-mem-engine/src/planner/lp.rs index 25ad3eb48293..e473ea413794 100644 --- a/crates/polars-mem-engine/src/planner/lp.rs +++ b/crates/polars-mem-engine/src/planner/lp.rs @@ -300,15 +300,30 @@ fn create_physical_plan_impl( }) .map_or(Ok(None), |v| v.map(Some))?; - match scan_type { + match scan_type.clone() { #[cfg(feature = "csv")] - FileScan::Csv { options, .. } => Ok(Box::new(executors::CsvExec { - sources, - file_info, - options, - predicate, - file_options, - })), + FileScan::Csv { options, .. } => { + if sources.len() > 1 + && std::env::var("POLARS_NEW_MULTIFILE").as_deref() == Ok("1") + { + Ok(Box::new(executors::MultiScanExec::new( + sources, + file_info, + hive_parts, + predicate, + file_options, + scan_type, + ))) + } else { + Ok(Box::new(executors::CsvExec { + sources, + file_info, + options, + predicate, + file_options, + })) + } + }, #[cfg(feature = "ipc")] FileScan::Ipc { options, @@ -328,16 +343,31 @@ fn create_physical_plan_impl( options, cloud_options, metadata, - } => Ok(Box::new(executors::ParquetExec::new( - sources, - file_info, - hive_parts, - predicate, - options, - cloud_options, - file_options, - metadata, - ))), + } => { + if sources.len() > 1 + && std::env::var("POLARS_NEW_MULTIFILE").as_deref() == Ok("1") + { + Ok(Box::new(executors::MultiScanExec::new( + sources, + file_info, + hive_parts, + predicate, + file_options, + scan_type, + ))) + } else { + Ok(Box::new(executors::ParquetExec::new( + sources, + file_info, + hive_parts, + predicate, + options, + cloud_options, + file_options, + metadata, + ))) + } + }, #[cfg(feature = "json")] FileScan::NDJson { options, .. } => Ok(Box::new(executors::JsonExec::new( sources, diff --git a/crates/polars-pipe/src/executors/sinks/group_by/aggregates/convert.rs b/crates/polars-pipe/src/executors/sinks/group_by/aggregates/convert.rs index 14ce4836096c..d00094fae4b6 100644 --- a/crates/polars-pipe/src/executors/sinks/group_by/aggregates/convert.rs +++ b/crates/polars-pipe/src/executors/sinks/group_by/aggregates/convert.rs @@ -3,7 +3,7 @@ use std::sync::Arc; use polars_core::datatypes::Field; use polars_core::error::PolarsResult; use polars_core::frame::DataFrame; -use polars_core::prelude::{DataType, SchemaRef, Series, IDX_DTYPE}; +use polars_core::prelude::{DataType, PlIndexSet, SchemaRef, Series, IDX_DTYPE}; use polars_core::schema::Schema; use polars_expr::state::ExecutionState; use polars_io::predicates::PhysicalIoExpr; @@ -32,9 +32,7 @@ impl PhysicalIoExpr for Len { unimplemented!() } - fn live_variables(&self) -> Option> { - Some(vec![]) - } + fn collect_live_columns(&self, _live_columns: &mut PlIndexSet) {} } impl PhysicalPipedExpr for Len { fn evaluate(&self, chunk: &DataChunk, _lazy_state: &ExecutionState) -> PolarsResult { diff --git a/crates/polars-pipe/src/pipeline/convert.rs b/crates/polars-pipe/src/pipeline/convert.rs index 9775518fffb6..a195e1f15766 100644 --- a/crates/polars-pipe/src/pipeline/convert.rs +++ b/crates/polars-pipe/src/pipeline/convert.rs @@ -137,8 +137,11 @@ where self.p.evaluate_io(df) } - fn live_variables(&self) -> Option> { - None + fn collect_live_columns( + &self, + live_columns: &mut PlIndexSet, + ) { + self.p.collect_live_columns(live_columns); } fn as_stats_evaluator(&self) -> Option<&dyn StatsEvaluator> { diff --git a/crates/polars-plan/src/plans/lit.rs b/crates/polars-plan/src/plans/lit.rs index 3e18f44703fb..8c6aab394140 100644 --- a/crates/polars-plan/src/plans/lit.rs +++ b/crates/polars-plan/src/plans/lit.rs @@ -233,7 +233,7 @@ impl LiteralValue { } } - pub(crate) fn new_idxsize(value: IdxSize) -> Self { + pub fn new_idxsize(value: IdxSize) -> Self { #[cfg(feature = "bigidx")] { LiteralValue::UInt64(value) diff --git a/crates/polars-schema/src/schema.rs b/crates/polars-schema/src/schema.rs index d29113635de8..1e63fbc78b5f 100644 --- a/crates/polars-schema/src/schema.rs +++ b/crates/polars-schema/src/schema.rs @@ -285,6 +285,26 @@ impl Schema { Ok(i) } + + /// Compare the fields between two schema returning the additional columns that each schema has. + pub fn field_compare<'a, 'b>( + &'a self, + other: &'b Self, + self_extra: &mut Vec<(usize, (&'a PlSmallStr, &'a D))>, + other_extra: &mut Vec<(usize, (&'b PlSmallStr, &'b D))>, + ) { + self_extra.extend( + self.iter() + .enumerate() + .filter(|(_, (n, _))| !other.contains(n)), + ); + other_extra.extend( + other + .iter() + .enumerate() + .filter(|(_, (n, _))| !self.contains(n)), + ); + } } impl Schema diff --git a/crates/polars-stream/src/nodes/io_sources/parquet/init.rs b/crates/polars-stream/src/nodes/io_sources/parquet/init.rs index 5b344644199e..2c45fd0cc5d4 100644 --- a/crates/polars-stream/src/nodes/io_sources/parquet/init.rs +++ b/crates/polars-stream/src/nodes/io_sources/parquet/init.rs @@ -5,6 +5,7 @@ use std::sync::Arc; use futures::stream::FuturesUnordered; use futures::StreamExt; use polars_core::frame::DataFrame; +use polars_core::prelude::PlIndexSet; use polars_error::PolarsResult; use polars_io::prelude::ParallelStrategy; use polars_io::prelude::_internal::PrefilterMaskSetting; @@ -264,25 +265,26 @@ impl ParquetSourceNode { ); let predicate_arrow_field_indices = if use_prefiltered { - let v = physical_predicate + let mut live_columns = PlIndexSet::default(); + physical_predicate .as_ref() .unwrap() - .live_variables() - .and_then(|x| { - let mut out = x + .collect_live_columns(&mut live_columns); + let v = (!live_columns.is_empty()) + .then(|| { + let out = live_columns .iter() // Can be `None` - if the column is e.g. a hive column, or the row index column. .filter_map(|x| projected_arrow_schema.index_of(x)) .collect::>(); - out.sort_unstable(); - out.dedup(); // There is at least one non-predicate column, or pre-filtering was // explicitly requested (only useful for testing). (out.len() < projected_arrow_schema.len() || matches!(self.options.parallel, ParallelStrategy::Prefiltered)) .then_some(out) - }); + }) + .flatten(); use_prefiltered &= v.is_some();