Skip to content

Commit

Permalink
add hive predicate evaluation
Browse files Browse the repository at this point in the history
  • Loading branch information
coastalwhite committed Dec 7, 2024
1 parent 4aa65e4 commit 341493d
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 17 deletions.
31 changes: 21 additions & 10 deletions crates/polars-mem-engine/src/executors/hive_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ impl<T: Copy, E, F: FnMut() -> Result<T, E>> LazyTryCell<T, E, F> {

match (self.f)() {
Ok(v) => {
self.evaluated.insert(v);
self.evaluated = Some(v);
self.get()
},
Err(e) => Err(e),
Expand All @@ -44,11 +44,8 @@ impl<T: Copy, E, F: FnMut() -> Result<T, E>> LazyTryCell<T, E, F> {
}

pub trait ScanExec {
fn read(&mut self) -> PolarsResult<DataFrame> {
self.read_with_num_unfiltered_rows().map(|(_, df)| df)
}
fn read(&mut self) -> PolarsResult<DataFrame>;
fn num_unfiltered_rows(&mut self) -> PolarsResult<IdxSize>;
fn read_with_num_unfiltered_rows(&mut self) -> PolarsResult<(IdxSize, DataFrame)>;
}

pub struct HiveExec {
Expand Down Expand Up @@ -96,6 +93,8 @@ impl Executor for HiveExec {
state.record(
|| {
let include_file_paths = self.file_options.include_file_paths.take();
let predicate = self.predicate.take();
let stats_evaluator = predicate.as_ref().and_then(|p| p.as_stats_evaluator());
let mut row_index = self.file_options.row_index.take();
let mut slice = self.file_options.slice.take();

Expand All @@ -115,6 +114,8 @@ impl Executor for HiveExec {
todo!()
};

let stats_evaluator = stats_evaluator.filter(|_| options.use_statistics);

let mut dfs = Vec::with_capacity(self.sources.len());

for (source, hive_part) in self.sources.iter().zip(self.hive_parts.iter()) {
Expand All @@ -139,7 +140,7 @@ impl Executor for HiveExec {
part_source,
self.file_info.clone(),
None,
self.predicate.clone(),
None, // @TODO: add predicate with hive columns replaced
options.clone(),
cloud_options.clone(),
file_options,
Expand All @@ -153,14 +154,22 @@ impl Executor for HiveExec {
if let Some(slice) = &slice {
do_skip_file |= slice.0 >= num_unfiltered_rows.get()? as i64;
}
// @TODO: Skipping based on the predicate
if let Some(stats_evaluator) = stats_evaluator {
do_skip_file |= !stats_evaluator
.should_read(hive_part.get_statistics())
.unwrap_or(true);
}

// Update the row_index to the proper offset.
if let Some(row_index) = row_index.as_mut() {
row_index.offset += num_unfiltered_rows.get()?;
}

if do_skip_file {
// Update the row_index to the proper offset.
if let Some(row_index) = row_index.as_mut() {
row_index.offset += num_unfiltered_rows.get()?;
}

// Update the slice offset.
if let Some(slice) = slice.as_mut() {
slice.0 = slice.0.saturating_sub(num_unfiltered_rows.get()? as i64);
Expand All @@ -169,13 +178,15 @@ impl Executor for HiveExec {
continue;
}

let (num_unfiltered_rows, mut df) = exec.read_with_num_unfiltered_rows()?;
// Read the DataFrame and needed metadata.
// @TODO: these should be merged into one call
let num_unfiltered_rows = num_unfiltered_rows.get()?;
let mut df = exec.read()?;

// Update the row_index to the proper offset.
if let Some(row_index) = row_index.as_mut() {
row_index.offset += num_unfiltered_rows;
}

// Update the slice.
if let Some(slice) = slice.as_mut() {
slice.0 = slice.0.saturating_sub(num_unfiltered_rows as i64);
Expand Down
18 changes: 11 additions & 7 deletions crates/polars-mem-engine/src/executors/scan/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -472,13 +472,6 @@ impl ParquetExec {

Ok(result)
}
}

impl ScanExec for ParquetExec {
fn num_unfiltered_rows(&mut self) -> PolarsResult<IdxSize> {
// @FIXME! This is extremely inefficient.
self.read_with_num_unfiltered_rows().map(|(n, _)| n)
}

fn read_with_num_unfiltered_rows(&mut self) -> PolarsResult<(IdxSize, DataFrame)> {
// FIXME: The row index implementation is incorrect when a predicate is
Expand Down Expand Up @@ -520,6 +513,17 @@ impl ScanExec for ParquetExec {
}
}

impl ScanExec for ParquetExec {
fn num_unfiltered_rows(&mut self) -> PolarsResult<IdxSize> {
// @FIXME! This is extremely inefficient.
self.read_with_num_unfiltered_rows().map(|(n, _)| n)
}

fn read(&mut self) -> PolarsResult<DataFrame> {
self.read_with_num_unfiltered_rows().map(|(_, df)| df)
}
}

impl Executor for ParquetExec {
fn execute(&mut self, state: &mut ExecutionState) -> PolarsResult<DataFrame> {
let profile_name = if state.has_node_timer() {
Expand Down

0 comments on commit 341493d

Please sign in to comment.