From 53a3590da66b837803af2a92b84c240c7854a1bd Mon Sep 17 00:00:00 2001 From: Ritchie Vink Date: Mon, 30 Sep 2024 10:12:35 +0200 Subject: [PATCH] fix: Fix projection pushdown bug in IEJOINS (#19015) --- crates/polars-lazy/src/frame/mod.rs | 2 +- crates/polars-lazy/src/tests/mod.rs | 18 ------------------ crates/polars-ops/src/frame/join/args.rs | 11 +++++++++++ crates/polars-plan/src/plans/ir/format.rs | 12 ++++++++++++ .../optimizer/projection_pushdown/joins.rs | 16 +++++++++++++--- .../unit/operations/test_inequality_join.py | 19 +++++++++++++++++++ 6 files changed, 56 insertions(+), 22 deletions(-) diff --git a/crates/polars-lazy/src/frame/mod.rs b/crates/polars-lazy/src/frame/mod.rs index 54163fe33544..99ddd979d65b 100644 --- a/crates/polars-lazy/src/frame/mod.rs +++ b/crates/polars-lazy/src/frame/mod.rs @@ -2091,7 +2091,7 @@ impl JoinBuilder { /// Finish builder pub fn finish(self) -> LazyFrame { let mut opt_state = self.lf.opt_state; - let other = self.other.expect("with not set"); + let other = self.other.expect("'with' not set in join builder"); // If any of the nodes reads from files we must activate this plan as well. if other.opt_state.contains(OptFlags::FILE_CACHING) { diff --git a/crates/polars-lazy/src/tests/mod.rs b/crates/polars-lazy/src/tests/mod.rs index f4ba3e876a65..a30203b24fe6 100644 --- a/crates/polars-lazy/src/tests/mod.rs +++ b/crates/polars-lazy/src/tests/mod.rs @@ -186,21 +186,3 @@ pub(crate) fn get_df() -> DataFrame { .finish() .unwrap() } - -#[test] -fn test_foo() -> PolarsResult<()> { - let df = df![ - "A" => [1], - "B" => [1], - ]?; - - let q = df.lazy(); - - let out = q - .group_by([col("A")]) - .agg([cols(["A", "B"]).name().prefix("_agg")]) - .explain(false)?; - - println!("{out}"); - Ok(()) -} diff --git a/crates/polars-ops/src/frame/join/args.rs b/crates/polars-ops/src/frame/join/args.rs index 10eee5d765df..b4d347170cbb 100644 --- a/crates/polars-ops/src/frame/join/args.rs +++ b/crates/polars-ops/src/frame/join/args.rs @@ -171,6 +171,17 @@ impl JoinType { false } } + + pub fn is_ie(&self) -> bool { + #[cfg(feature = "iejoin")] + { + matches!(self, JoinType::IEJoin(_)) + } + #[cfg(not(feature = "iejoin"))] + { + false + } + } } #[derive(Copy, Clone, PartialEq, Eq, Default, Hash)] diff --git a/crates/polars-plan/src/plans/ir/format.rs b/crates/polars-plan/src/plans/ir/format.rs index 76de9f3beb24..c461b525cb9a 100644 --- a/crates/polars-plan/src/plans/ir/format.rs +++ b/crates/polars-plan/src/plans/ir/format.rs @@ -452,6 +452,12 @@ impl<'a, T: AsExpr> Display for ExprIRSliceDisplay<'a, T> { } } +impl<'a, T: AsExpr> fmt::Debug for ExprIRSliceDisplay<'a, T> { + fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { + Display::fmt(self, f) + } +} + impl<'a> Display for ExprIRDisplay<'a> { #[recursive] fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { @@ -657,6 +663,12 @@ impl<'a> Display for ExprIRDisplay<'a> { } } +impl<'a> fmt::Debug for ExprIRDisplay<'a> { + fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { + Display::fmt(self, f) + } +} + pub(crate) struct ColumnsDisplay<'a>(pub(crate) &'a Schema); impl fmt::Display for ColumnsDisplay<'_> { diff --git a/crates/polars-plan/src/plans/optimizer/projection_pushdown/joins.rs b/crates/polars-plan/src/plans/optimizer/projection_pushdown/joins.rs index 6eb8bc033015..f8ee048c2464 100644 --- a/crates/polars-plan/src/plans/optimizer/projection_pushdown/joins.rs +++ b/crates/polars-plan/src/plans/optimizer/projection_pushdown/joins.rs @@ -10,11 +10,11 @@ fn add_keys_to_accumulated_state( local_projection: &mut Vec, projected_names: &mut PlHashSet, expr_arena: &mut Arena, - // only for left hand side table we add local names + // Only for left hand side table we add local names. add_local: bool, ) -> Option { add_expr_to_accumulated(expr, acc_projections, projected_names, expr_arena); - // the projections may do more than simply project. + // The projections may do more than simply project. // e.g. col("foo").truncate() * col("bar") // that means we don't want to execute the projection as that is already done by // the JOIN executor @@ -234,7 +234,7 @@ pub(super) fn process_join( let mut names_right = PlHashSet::with_capacity(n); let mut local_projection = Vec::with_capacity(n); - // if there are no projections we don't have to do anything (all columns are projected) + // If there are no projections we don't have to do anything (all columns are projected) // otherwise we build local projections to sort out proper column names due to the // join operation // @@ -253,6 +253,16 @@ pub(super) fn process_join( // We need the join columns so we push the projection downwards for e in &left_on { if !local_projected_names.insert(e.output_name().clone()) { + // A join can have multiple leaf names, so we must still ensure all leaf names are projected. + if options.args.how.is_ie() { + add_expr_to_accumulated( + e.node(), + &mut pushdown_left, + &mut names_left, + expr_arena, + ); + } + continue; } diff --git a/py-polars/tests/unit/operations/test_inequality_join.py b/py-polars/tests/unit/operations/test_inequality_join.py index 7492cf4a05be..89a81a4c0923 100644 --- a/py-polars/tests/unit/operations/test_inequality_join.py +++ b/py-polars/tests/unit/operations/test_inequality_join.py @@ -548,3 +548,22 @@ def test_single_inequality_with_slice(offset: int, length: int) -> None: expected_rows = set(expected_full.iter_rows()) for row in actual.iter_rows(): assert row in expected_rows, f"{row} not in expected rows" + + +def test_ie_join_projection_pd_19005() -> None: + lf = pl.LazyFrame({"a": [1, 2], "b": [3, 4]}).with_row_index() + q = ( + lf.join_where( + lf, + pl.col.index < pl.col.index_right, + pl.col.index.cast(pl.Int64) + pl.col.a > pl.col.a_right, + ) + .group_by(pl.col.index) + .agg(pl.col.index_right) + ) + + out = q.collect() + assert out.schema == pl.Schema( + [("index", pl.get_index_type()), ("index_right", pl.List(pl.get_index_type()))] + ) + assert out.shape == (0, 2)