diff --git a/crates/polars-mem-engine/src/executors/hive_scan.rs b/crates/polars-mem-engine/src/executors/hive_scan.rs index b2879ec26473..c3a348f466ff 100644 --- a/crates/polars-mem-engine/src/executors/hive_scan.rs +++ b/crates/polars-mem-engine/src/executors/hive_scan.rs @@ -35,7 +35,7 @@ impl Result> LazyTryCell { match (self.f)() { Ok(v) => { - self.evaluated.insert(v); + self.evaluated = Some(v); self.get() }, Err(e) => Err(e), @@ -44,11 +44,8 @@ impl Result> LazyTryCell { } pub trait ScanExec { - fn read(&mut self) -> PolarsResult { - self.read_with_num_unfiltered_rows().map(|(_, df)| df) - } + fn read(&mut self) -> PolarsResult; fn num_unfiltered_rows(&mut self) -> PolarsResult; - fn read_with_num_unfiltered_rows(&mut self) -> PolarsResult<(IdxSize, DataFrame)>; } pub struct HiveExec { @@ -96,6 +93,8 @@ impl Executor for HiveExec { state.record( || { let include_file_paths = self.file_options.include_file_paths.take(); + let predicate = self.predicate.take(); + let stats_evaluator = predicate.as_ref().and_then(|p| p.as_stats_evaluator()); let mut row_index = self.file_options.row_index.take(); let mut slice = self.file_options.slice.take(); @@ -115,6 +114,8 @@ impl Executor for HiveExec { todo!() }; + let stats_evaluator = stats_evaluator.filter(|_| options.use_statistics); + let mut dfs = Vec::with_capacity(self.sources.len()); for (source, hive_part) in self.sources.iter().zip(self.hive_parts.iter()) { @@ -139,7 +140,7 @@ impl Executor for HiveExec { part_source, self.file_info.clone(), None, - self.predicate.clone(), + None, // @TODO: add predicate with hive columns replaced options.clone(), cloud_options.clone(), file_options, @@ -153,14 +154,22 @@ impl Executor for HiveExec { if let Some(slice) = &slice { do_skip_file |= slice.0 >= num_unfiltered_rows.get()? as i64; } - // @TODO: Skipping based on the predicate + if let Some(stats_evaluator) = stats_evaluator { + do_skip_file |= !stats_evaluator + .should_read(hive_part.get_statistics()) + .unwrap_or(true); + } + + // Update the row_index to the proper offset. + if let Some(row_index) = row_index.as_mut() { + row_index.offset += num_unfiltered_rows.get()?; + } if do_skip_file { // Update the row_index to the proper offset. if let Some(row_index) = row_index.as_mut() { row_index.offset += num_unfiltered_rows.get()?; } - // Update the slice offset. if let Some(slice) = slice.as_mut() { slice.0 = slice.0.saturating_sub(num_unfiltered_rows.get()? as i64); @@ -169,13 +178,15 @@ impl Executor for HiveExec { continue; } - let (num_unfiltered_rows, mut df) = exec.read_with_num_unfiltered_rows()?; + // Read the DataFrame and needed metadata. + // @TODO: these should be merged into one call + let num_unfiltered_rows = num_unfiltered_rows.get()?; + let mut df = exec.read()?; // Update the row_index to the proper offset. if let Some(row_index) = row_index.as_mut() { row_index.offset += num_unfiltered_rows; } - // Update the slice. if let Some(slice) = slice.as_mut() { slice.0 = slice.0.saturating_sub(num_unfiltered_rows as i64); diff --git a/crates/polars-mem-engine/src/executors/scan/parquet.rs b/crates/polars-mem-engine/src/executors/scan/parquet.rs index 93dea4f2cd9e..34153b56a96c 100644 --- a/crates/polars-mem-engine/src/executors/scan/parquet.rs +++ b/crates/polars-mem-engine/src/executors/scan/parquet.rs @@ -472,13 +472,6 @@ impl ParquetExec { Ok(result) } -} - -impl ScanExec for ParquetExec { - fn num_unfiltered_rows(&mut self) -> PolarsResult { - // @FIXME! This is extremely inefficient. - self.read_with_num_unfiltered_rows().map(|(n, _)| n) - } fn read_with_num_unfiltered_rows(&mut self) -> PolarsResult<(IdxSize, DataFrame)> { // FIXME: The row index implementation is incorrect when a predicate is @@ -520,6 +513,17 @@ impl ScanExec for ParquetExec { } } +impl ScanExec for ParquetExec { + fn num_unfiltered_rows(&mut self) -> PolarsResult { + // @FIXME! This is extremely inefficient. + self.read_with_num_unfiltered_rows().map(|(n, _)| n) + } + + fn read(&mut self) -> PolarsResult { + self.read_with_num_unfiltered_rows().map(|(_, df)| df) + } +} + impl Executor for ParquetExec { fn execute(&mut self, state: &mut ExecutionState) -> PolarsResult { let profile_name = if state.has_node_timer() {