From 43a16c9c24b7e5b7aa47555a984775997ab0950d Mon Sep 17 00:00:00 2001 From: Goksel Kabadayi <45314116+gokselk@users.noreply.github.com> Date: Tue, 19 Nov 2024 09:58:41 +0300 Subject: [PATCH 01/25] Add value tracking to ConstExpr for improved union optimization --- .../physical-expr/src/equivalence/class.rs | 17 ++- .../src/equivalence/properties.rs | 114 ++++++++++++------ 2 files changed, 96 insertions(+), 35 deletions(-) diff --git a/datafusion/physical-expr/src/equivalence/class.rs b/datafusion/physical-expr/src/equivalence/class.rs index cc26d12fb029..0ce19eae3a65 100644 --- a/datafusion/physical-expr/src/equivalence/class.rs +++ b/datafusion/physical-expr/src/equivalence/class.rs @@ -24,7 +24,7 @@ use std::fmt::Display; use std::sync::Arc; use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode}; -use datafusion_common::JoinType; +use datafusion_common::{JoinType, ScalarValue}; use datafusion_physical_expr_common::physical_expr::format_physical_expr_list; use indexmap::{IndexMap, IndexSet}; @@ -62,6 +62,7 @@ pub struct ConstExpr { /// Does the constant have the same value across all partitions? See /// struct docs for more details across_partitions: bool, + value: Option } impl PartialEq for ConstExpr { @@ -80,9 +81,15 @@ impl ConstExpr { expr, // By default, assume constant expressions are not same across partitions. across_partitions: false, + value: None, } } + pub fn with_value(mut self, value: ScalarValue) -> Self { + self.value = Some(value); + self + } + /// Set the `across_partitions` flag /// /// See struct docs for more details @@ -106,6 +113,10 @@ impl ConstExpr { self.expr } + pub fn value(&self) -> Option<&ScalarValue> { + self.value.as_ref() + } + pub fn map(&self, f: F) -> Option where F: Fn(&Arc) -> Option>, @@ -114,6 +125,7 @@ impl ConstExpr { maybe_expr.map(|expr| Self { expr, across_partitions: self.across_partitions, + value: self.value.clone(), }) } @@ -152,6 +164,9 @@ impl Display for ConstExpr { if self.across_partitions { write!(f, "(across_partitions)")?; } + if let Some(value) = self.value.as_ref() { + write!(f, "({})", value)?; + } Ok(()) } } diff --git a/datafusion/physical-expr/src/equivalence/properties.rs b/datafusion/physical-expr/src/equivalence/properties.rs index fe866450b2b2..ce9ce3e30ec2 100644 --- a/datafusion/physical-expr/src/equivalence/properties.rs +++ b/datafusion/physical-expr/src/equivalence/properties.rs @@ -293,30 +293,34 @@ impl EquivalenceProperties { mut self, constants: impl IntoIterator, ) -> Self { - let (const_exprs, across_partition_flags): ( - Vec>, - Vec, - ) = constants - .into_iter() - .map(|const_expr| { - let across_partitions = const_expr.across_partitions(); - let expr = const_expr.owned_expr(); - (expr, across_partitions) - }) - .unzip(); - for (expr, across_partitions) in self - .eq_group - .normalize_exprs(const_exprs) - .into_iter() - .zip(across_partition_flags) - { - if !const_exprs_contains(&self.constants, &expr) { - let const_expr = - ConstExpr::from(expr).with_across_partitions(across_partitions); - self.constants.push(const_expr); + let normalized_constants: Vec<_> = constants + .into_iter() + .filter_map(|c| { + let across_partitions = c.across_partitions(); + let value = c.value().cloned(); + let expr = c.owned_expr(); + let normalized_expr = self.eq_group.normalize_expr(expr); + + if const_exprs_contains(&self.constants, &normalized_expr) { + return None; } - } + let mut const_expr = ConstExpr::from(normalized_expr) + .with_across_partitions(across_partitions); + + + if let Some(value) = value { + const_expr = const_expr.with_value(value); + } + + Some(const_expr) + }) + .collect(); + + // Add all new normalized constants + self.constants.extend(normalized_constants); + + // Discover any new orderings based on the constants for ordering in self.normalized_oeq_class().iter() { if let Err(e) = self.discover_new_orderings(&ordering[0].expr) { log::debug!("error discovering new orderings: {e}"); @@ -1861,22 +1865,32 @@ fn calculate_union_binary( } // First, calculate valid constants for the union. An expression is constant - // at the output of the union if it is constant in both sides. - let constants: Vec<_> = lhs + // at the output of the union if it is constant in both sides with matching values. + let constants = lhs .constants() .iter() - .filter(|const_expr| const_exprs_contains(rhs.constants(), const_expr.expr())) - .map(|const_expr| { - // TODO: When both sides have a constant column, and the actual - // constant value is the same, then the output properties could - // reflect the constant is valid across all partitions. However we - // don't track the actual value that the ConstExpr takes on, so we - // can't determine that yet - ConstExpr::new(Arc::clone(const_expr.expr())).with_across_partitions(false) + .filter_map(|lhs_const| { + // Find matching constant expression in RHS + rhs.constants() + .iter() + .find(|rhs_const| rhs_const.expr().eq(lhs_const.expr())) + .map(|rhs_const| { + // Create new constant with across_partitions=false since we're in a union + let mut const_expr = ConstExpr::new(Arc::clone(lhs_const.expr())) + .with_across_partitions(false); + + // If both sides have matching constant values, preserve the value + if let (Some(lhs_val), Some(rhs_val)) = (lhs_const.value(), rhs_const.value()) { + if lhs_val == rhs_val { + const_expr = const_expr.with_value(lhs_val.clone()); + } + } + const_expr + }) }) - .collect(); + .collect::>(); - // remove any constants that are shared in both outputs (avoid double counting them) + // Remove any constants that are shared in both outputs (avoid double counting them) for c in &constants { lhs = lhs.remove_constant(c); rhs = rhs.remove_constant(c); @@ -2146,6 +2160,7 @@ mod tests { use arrow::datatypes::{DataType, Field, Schema}; use arrow_schema::{Fields, TimeUnit}; + use datafusion_common::ScalarValue; use datafusion_expr::Operator; #[test] @@ -3684,4 +3699,35 @@ mod tests { sort_expr } + + #[test] + fn test_union_constant_value_preservation() -> Result<()> { + let schema = Arc::new(Schema::new(vec![ + Field::new("a", DataType::Int32, true), + Field::new("b", DataType::Int32, true), + ])); + + let col_a = col("a", &schema)?; + let literal_10 = ScalarValue::Int32(Some(10)); + + // Create first input with a=10 + let const_expr1 = ConstExpr::new(Arc::clone(&col_a)).with_value(literal_10.clone()); + let input1 = EquivalenceProperties::new(Arc::clone(&schema)) + .with_constants(vec![const_expr1]); + + // Create second input with a=10 + let const_expr2 = ConstExpr::new(Arc::clone(&col_a)).with_value(literal_10.clone()); + let input2 = EquivalenceProperties::new(Arc::clone(&schema)) + .with_constants(vec![const_expr2]); + + // Calculate union properties + let union_props = calculate_union(vec![input1, input2], schema)?; + + // Verify column 'a' remains constant with value 10 + let const_a = &union_props.constants()[0]; + assert!(const_a.expr().eq(&col_a)); + assert_eq!(const_a.value(), Some(&literal_10)); + + Ok(()) + } } From 8549b4f18542d4ccaee7f8d1865f87db89456e56 Mon Sep 17 00:00:00 2001 From: Goksel Kabadayi <45314116+gokselk@users.noreply.github.com> Date: Mon, 25 Nov 2024 08:57:49 +0300 Subject: [PATCH 02/25] Update PartialEq impl --- datafusion/physical-expr/src/equivalence/class.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/physical-expr/src/equivalence/class.rs b/datafusion/physical-expr/src/equivalence/class.rs index 0ce19eae3a65..83dfb39f9828 100644 --- a/datafusion/physical-expr/src/equivalence/class.rs +++ b/datafusion/physical-expr/src/equivalence/class.rs @@ -67,7 +67,7 @@ pub struct ConstExpr { impl PartialEq for ConstExpr { fn eq(&self, other: &Self) -> bool { - self.across_partitions == other.across_partitions && self.expr.eq(&other.expr) + self.across_partitions == other.across_partitions && self.expr.eq(&other.expr) && self.value == other.value } } From ca97df570ba8d2323080d9252fe124603b205653 Mon Sep 17 00:00:00 2001 From: Goksel Kabadayi <45314116+gokselk@users.noreply.github.com> Date: Mon, 25 Nov 2024 08:57:55 +0300 Subject: [PATCH 03/25] Minor change --- datafusion/physical-expr/src/equivalence/properties.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/datafusion/physical-expr/src/equivalence/properties.rs b/datafusion/physical-expr/src/equivalence/properties.rs index ce9ce3e30ec2..215b75e04d91 100644 --- a/datafusion/physical-expr/src/equivalence/properties.rs +++ b/datafusion/physical-expr/src/equivalence/properties.rs @@ -293,7 +293,7 @@ impl EquivalenceProperties { mut self, constants: impl IntoIterator, ) -> Self { - let normalized_constants: Vec<_> = constants + let normalized_constants = constants .into_iter() .filter_map(|c| { let across_partitions = c.across_partitions(); @@ -315,7 +315,7 @@ impl EquivalenceProperties { Some(const_expr) }) - .collect(); + .collect::>(); // Add all new normalized constants self.constants.extend(normalized_constants); From 1ae670125d1b1997bae83644f8fe6fa2927729d9 Mon Sep 17 00:00:00 2001 From: Goksel Kabadayi <45314116+gokselk@users.noreply.github.com> Date: Fri, 13 Dec 2024 13:45:56 +0300 Subject: [PATCH 04/25] Add docstring for ConstExpr value --- datafusion/physical-expr/src/equivalence/class.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/datafusion/physical-expr/src/equivalence/class.rs b/datafusion/physical-expr/src/equivalence/class.rs index 83dfb39f9828..53a8d8bc47fa 100644 --- a/datafusion/physical-expr/src/equivalence/class.rs +++ b/datafusion/physical-expr/src/equivalence/class.rs @@ -62,6 +62,7 @@ pub struct ConstExpr { /// Does the constant have the same value across all partitions? See /// struct docs for more details across_partitions: bool, + /// The value of the constant expression value: Option } From c9b4869aa72410df101e4a1f1c2cc5f208d74b2d Mon Sep 17 00:00:00 2001 From: Goksel Kabadayi <45314116+gokselk@users.noreply.github.com> Date: Fri, 13 Dec 2024 13:59:25 +0300 Subject: [PATCH 05/25] Improve constant propagation across union partitions --- datafusion/physical-expr/src/equivalence/properties.rs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/datafusion/physical-expr/src/equivalence/properties.rs b/datafusion/physical-expr/src/equivalence/properties.rs index 215b75e04d91..fd5caf45ee53 100644 --- a/datafusion/physical-expr/src/equivalence/properties.rs +++ b/datafusion/physical-expr/src/equivalence/properties.rs @@ -1875,14 +1875,14 @@ fn calculate_union_binary( .iter() .find(|rhs_const| rhs_const.expr().eq(lhs_const.expr())) .map(|rhs_const| { - // Create new constant with across_partitions=false since we're in a union - let mut const_expr = ConstExpr::new(Arc::clone(lhs_const.expr())) - .with_across_partitions(false); + let mut const_expr = ConstExpr::new(Arc::clone(lhs_const.expr())); - // If both sides have matching constant values, preserve the value + // If both sides have matching constant values, preserve the value and set across_partitions=true if let (Some(lhs_val), Some(rhs_val)) = (lhs_const.value(), rhs_const.value()) { if lhs_val == rhs_val { - const_expr = const_expr.with_value(lhs_val.clone()); + const_expr = const_expr + .with_across_partitions(true) + .with_value(lhs_val.clone()); } } const_expr From 5a1838efd63c2d764a31127b10ad2413465e3625 Mon Sep 17 00:00:00 2001 From: Goksel Kabadayi <45314116+gokselk@users.noreply.github.com> Date: Fri, 13 Dec 2024 14:04:46 +0300 Subject: [PATCH 06/25] Add assertion for across_partitions --- datafusion/physical-expr/src/equivalence/properties.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/datafusion/physical-expr/src/equivalence/properties.rs b/datafusion/physical-expr/src/equivalence/properties.rs index fd5caf45ee53..874b19dd8ddd 100644 --- a/datafusion/physical-expr/src/equivalence/properties.rs +++ b/datafusion/physical-expr/src/equivalence/properties.rs @@ -3726,6 +3726,7 @@ mod tests { // Verify column 'a' remains constant with value 10 let const_a = &union_props.constants()[0]; assert!(const_a.expr().eq(&col_a)); + assert_eq!(const_a.across_partitions(), true); assert_eq!(const_a.value(), Some(&literal_10)); Ok(()) From 8e773c3237a03f81248c7277faa2aba75bea3c10 Mon Sep 17 00:00:00 2001 From: berkaysynnada Date: Tue, 17 Dec 2024 09:36:13 +0300 Subject: [PATCH 07/25] fix fmt --- .../physical-expr/src/equivalence/class.rs | 6 ++- .../src/equivalence/properties.rs | 47 ++++++++++--------- 2 files changed, 29 insertions(+), 24 deletions(-) diff --git a/datafusion/physical-expr/src/equivalence/class.rs b/datafusion/physical-expr/src/equivalence/class.rs index 53a8d8bc47fa..a5b291e35a98 100644 --- a/datafusion/physical-expr/src/equivalence/class.rs +++ b/datafusion/physical-expr/src/equivalence/class.rs @@ -63,12 +63,14 @@ pub struct ConstExpr { /// struct docs for more details across_partitions: bool, /// The value of the constant expression - value: Option + value: Option, } impl PartialEq for ConstExpr { fn eq(&self, other: &Self) -> bool { - self.across_partitions == other.across_partitions && self.expr.eq(&other.expr) && self.value == other.value + self.across_partitions == other.across_partitions + && self.expr.eq(&other.expr) + && self.value == other.value } } diff --git a/datafusion/physical-expr/src/equivalence/properties.rs b/datafusion/physical-expr/src/equivalence/properties.rs index 874b19dd8ddd..20c5bca8d7c4 100644 --- a/datafusion/physical-expr/src/equivalence/properties.rs +++ b/datafusion/physical-expr/src/equivalence/properties.rs @@ -294,28 +294,27 @@ impl EquivalenceProperties { constants: impl IntoIterator, ) -> Self { let normalized_constants = constants - .into_iter() - .filter_map(|c| { - let across_partitions = c.across_partitions(); - let value = c.value().cloned(); - let expr = c.owned_expr(); - let normalized_expr = self.eq_group.normalize_expr(expr); - - if const_exprs_contains(&self.constants, &normalized_expr) { - return None; - } - - let mut const_expr = ConstExpr::from(normalized_expr) - .with_across_partitions(across_partitions); + .into_iter() + .filter_map(|c| { + let across_partitions = c.across_partitions(); + let value = c.value().cloned(); + let expr = c.owned_expr(); + let normalized_expr = self.eq_group.normalize_expr(expr); + + if const_exprs_contains(&self.constants, &normalized_expr) { + return None; + } + let mut const_expr = ConstExpr::from(normalized_expr) + .with_across_partitions(across_partitions); - if let Some(value) = value { - const_expr = const_expr.with_value(value); - } + if let Some(value) = value { + const_expr = const_expr.with_value(value); + } - Some(const_expr) - }) - .collect::>(); + Some(const_expr) + }) + .collect::>(); // Add all new normalized constants self.constants.extend(normalized_constants); @@ -1878,7 +1877,9 @@ fn calculate_union_binary( let mut const_expr = ConstExpr::new(Arc::clone(lhs_const.expr())); // If both sides have matching constant values, preserve the value and set across_partitions=true - if let (Some(lhs_val), Some(rhs_val)) = (lhs_const.value(), rhs_const.value()) { + if let (Some(lhs_val), Some(rhs_val)) = + (lhs_const.value(), rhs_const.value()) + { if lhs_val == rhs_val { const_expr = const_expr .with_across_partitions(true) @@ -3711,12 +3712,14 @@ mod tests { let literal_10 = ScalarValue::Int32(Some(10)); // Create first input with a=10 - let const_expr1 = ConstExpr::new(Arc::clone(&col_a)).with_value(literal_10.clone()); + let const_expr1 = + ConstExpr::new(Arc::clone(&col_a)).with_value(literal_10.clone()); let input1 = EquivalenceProperties::new(Arc::clone(&schema)) .with_constants(vec![const_expr1]); // Create second input with a=10 - let const_expr2 = ConstExpr::new(Arc::clone(&col_a)).with_value(literal_10.clone()); + let const_expr2 = + ConstExpr::new(Arc::clone(&col_a)).with_value(literal_10.clone()); let input2 = EquivalenceProperties::new(Arc::clone(&schema)) .with_constants(vec![const_expr2]); From f876889d2260892688124ad4608e680a8d35ffa9 Mon Sep 17 00:00:00 2001 From: berkaysynnada Date: Tue, 17 Dec 2024 09:57:00 +0300 Subject: [PATCH 08/25] Update properties.rs --- datafusion/physical-expr/src/equivalence/properties.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/physical-expr/src/equivalence/properties.rs b/datafusion/physical-expr/src/equivalence/properties.rs index 20c5bca8d7c4..1f4f205dd6ce 100644 --- a/datafusion/physical-expr/src/equivalence/properties.rs +++ b/datafusion/physical-expr/src/equivalence/properties.rs @@ -3729,7 +3729,7 @@ mod tests { // Verify column 'a' remains constant with value 10 let const_a = &union_props.constants()[0]; assert!(const_a.expr().eq(&col_a)); - assert_eq!(const_a.across_partitions(), true); + assert!(const_a.across_partitions()); assert_eq!(const_a.value(), Some(&literal_10)); Ok(()) From f09364885b02058ef04c6fc19e1cbac22aa876df Mon Sep 17 00:00:00 2001 From: Goksel Kabadayi <45314116+gokselk@users.noreply.github.com> Date: Tue, 17 Dec 2024 11:32:57 +0300 Subject: [PATCH 09/25] Remove redundant constant removal loop --- datafusion/physical-expr/src/equivalence/properties.rs | 6 ------ 1 file changed, 6 deletions(-) diff --git a/datafusion/physical-expr/src/equivalence/properties.rs b/datafusion/physical-expr/src/equivalence/properties.rs index 1f4f205dd6ce..ba62e9aa589d 100644 --- a/datafusion/physical-expr/src/equivalence/properties.rs +++ b/datafusion/physical-expr/src/equivalence/properties.rs @@ -1891,12 +1891,6 @@ fn calculate_union_binary( }) .collect::>(); - // Remove any constants that are shared in both outputs (avoid double counting them) - for c in &constants { - lhs = lhs.remove_constant(c); - rhs = rhs.remove_constant(c); - } - // Next, calculate valid orderings for the union by searching for prefixes // in both sides. let mut orderings = UnionEquivalentOrderingBuilder::new(); From a3bcfa85ce235ca95278de64c456408f1c46fc81 Mon Sep 17 00:00:00 2001 From: Goksel Kabadayi <45314116+gokselk@users.noreply.github.com> Date: Tue, 17 Dec 2024 12:17:19 +0300 Subject: [PATCH 10/25] Remove unnecessary mut --- datafusion/physical-expr/src/equivalence/properties.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/physical-expr/src/equivalence/properties.rs b/datafusion/physical-expr/src/equivalence/properties.rs index ba62e9aa589d..7d3c54929c92 100644 --- a/datafusion/physical-expr/src/equivalence/properties.rs +++ b/datafusion/physical-expr/src/equivalence/properties.rs @@ -1855,7 +1855,7 @@ impl Hash for ExprWrapper { /// *all* output partitions, that is the same as being true for all *input* /// partitions fn calculate_union_binary( - mut lhs: EquivalenceProperties, + lhs: EquivalenceProperties, mut rhs: EquivalenceProperties, ) -> Result { // Harmonize the schema of the rhs with the schema of the lhs (which is the accumulator schema): From f370b82f09e846f9595827223dd69e9a1cdb5de1 Mon Sep 17 00:00:00 2001 From: Goksel Kabadayi <45314116+gokselk@users.noreply.github.com> Date: Wed, 18 Dec 2024 02:42:39 +0300 Subject: [PATCH 11/25] Set across_partitions=true when both sides are constant --- datafusion/physical-expr/src/equivalence/properties.rs | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/datafusion/physical-expr/src/equivalence/properties.rs b/datafusion/physical-expr/src/equivalence/properties.rs index 7d3c54929c92..41c36b819c7b 100644 --- a/datafusion/physical-expr/src/equivalence/properties.rs +++ b/datafusion/physical-expr/src/equivalence/properties.rs @@ -1876,6 +1876,11 @@ fn calculate_union_binary( .map(|rhs_const| { let mut const_expr = ConstExpr::new(Arc::clone(lhs_const.expr())); + // If both sides are constant across partitions, set across_partitions=true + if lhs_const.across_partitions() && rhs_const.across_partitions() { + const_expr = const_expr.with_across_partitions(true); + } + // If both sides have matching constant values, preserve the value and set across_partitions=true if let (Some(lhs_val), Some(rhs_val)) = (lhs_const.value(), rhs_const.value()) From dc0014786cf65b0fae311cded259dae1ddc39d89 Mon Sep 17 00:00:00 2001 From: Goksel Kabadayi <45314116+gokselk@users.noreply.github.com> Date: Wed, 18 Dec 2024 02:43:18 +0300 Subject: [PATCH 12/25] Extract and use constant values in filter expressions --- datafusion/physical-plan/src/filter.rs | 30 ++++++++++++++++++++------ 1 file changed, 23 insertions(+), 7 deletions(-) diff --git a/datafusion/physical-plan/src/filter.rs b/datafusion/physical-plan/src/filter.rs index 07898e8d22d8..e6503c8e597c 100644 --- a/datafusion/physical-plan/src/filter.rs +++ b/datafusion/physical-plan/src/filter.rs @@ -41,7 +41,7 @@ use datafusion_common::{ use datafusion_execution::TaskContext; use datafusion_expr::Operator; use datafusion_physical_expr::equivalence::ProjectionMapping; -use datafusion_physical_expr::expressions::BinaryExpr; +use datafusion_physical_expr::expressions::{BinaryExpr, Literal}; use datafusion_physical_expr::intervals::utils::check_support; use datafusion_physical_expr::utils::collect_columns; use datafusion_physical_expr::{ @@ -218,13 +218,29 @@ impl FilterExec { if binary.op() == &Operator::Eq { // Filter evaluates to single value for all partitions if input_eqs.is_expr_constant(binary.left()) { - res_constants.push( - ConstExpr::from(binary.right()).with_across_partitions(true), - ) + // When left side is constant, extract value from right side if it's a literal + let (expr, lit) = ( + binary.right(), + binary.right().as_any().downcast_ref::(), + ); + let mut const_expr = + ConstExpr::from(expr).with_across_partitions(true); + if let Some(lit) = lit { + const_expr = const_expr.with_value(lit.value().clone()); + } + res_constants.push(const_expr); } else if input_eqs.is_expr_constant(binary.right()) { - res_constants.push( - ConstExpr::from(binary.left()).with_across_partitions(true), - ) + // When right side is constant, extract value from left side if it's a literal + let (expr, lit) = ( + binary.left(), + binary.left().as_any().downcast_ref::(), + ); + let mut const_expr = + ConstExpr::from(expr).with_across_partitions(true); + if let Some(lit) = lit { + const_expr = const_expr.with_value(lit.value().clone()); + } + res_constants.push(const_expr); } } } From 16a7de69418583cc39cf028de5be5c9d717da7a6 Mon Sep 17 00:00:00 2001 From: Goksel Kabadayi <45314116+gokselk@users.noreply.github.com> Date: Wed, 18 Dec 2024 02:45:39 +0300 Subject: [PATCH 13/25] Add initial SLT for constant value tracking across UNION ALL --- .../const_value_tracking_across_union.slt | 50 +++++++++++++++++++ 1 file changed, 50 insertions(+) create mode 100644 datafusion/sqllogictest/test_files/const_value_tracking_across_union.slt diff --git a/datafusion/sqllogictest/test_files/const_value_tracking_across_union.slt b/datafusion/sqllogictest/test_files/const_value_tracking_across_union.slt new file mode 100644 index 000000000000..c6b4895ef00d --- /dev/null +++ b/datafusion/sqllogictest/test_files/const_value_tracking_across_union.slt @@ -0,0 +1,50 @@ +statement ok +CREATE EXTERNAL TABLE aggregate_test_100 ( + c1 VARCHAR NOT NULL, + c2 TINYINT NOT NULL, + c3 SMALLINT NOT NULL, + c4 SMALLINT, + c5 INT, + c6 BIGINT NOT NULL, + c7 SMALLINT NOT NULL, + c8 INT NOT NULL, + c9 BIGINT UNSIGNED NOT NULL, + c10 VARCHAR NOT NULL, + c11 FLOAT NOT NULL, + c12 DOUBLE NOT NULL, + c13 VARCHAR NOT NULL +) +STORED AS CSV +LOCATION '../../testing/data/csv/aggregate_test_100.csv' +OPTIONS ('format.has_header' 'true'); + +statement ok +set datafusion.explain.physical_plan_only = true; + +statement ok +set datafusion.execution.batch_size = 2; + +# Constant value tracking across union +query TT +explain +SELECT * FROM( +( + SELECT * FROM aggregate_test_100 WHERE c1='a' +) +UNION ALL +( + SELECT * FROM aggregate_test_100 WHERE c1='a' +)) +ORDER BY c1 +---- +physical_plan +01)CoalescePartitionsExec +02)--UnionExec +03)----CoalesceBatchesExec: target_batch_size=2 +04)------FilterExec: c1@0 = a +05)--------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 +06)----------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1, c2, c3, c4, c5, c6, c7, c8, c9, c10, c11, c12, c13], has_header=true +07)----CoalesceBatchesExec: target_batch_size=2 +08)------FilterExec: c1@0 = a +09)--------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 +10)----------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1, c2, c3, c4, c5, c6, c7, c8, c9, c10, c11, c12, c13], has_header=true From b3bc40d870c36ef3f1df48f19fd97e201a92d37a Mon Sep 17 00:00:00 2001 From: Goksel Kabadayi <45314116+gokselk@users.noreply.github.com> Date: Thu, 19 Dec 2024 01:09:44 +0300 Subject: [PATCH 14/25] Assign values to ConstExpr where possible --- .../src/equivalence/properties.rs | 64 ++++++++++++++++--- datafusion/physical-plan/src/filter.rs | 9 ++- 2 files changed, 62 insertions(+), 11 deletions(-) diff --git a/datafusion/physical-expr/src/equivalence/properties.rs b/datafusion/physical-expr/src/equivalence/properties.rs index 41c36b819c7b..eb0e28566196 100644 --- a/datafusion/physical-expr/src/equivalence/properties.rs +++ b/datafusion/physical-expr/src/equivalence/properties.rs @@ -257,14 +257,34 @@ impl EquivalenceProperties { if self.is_expr_constant(left) { // Left expression is constant, add right as constant if !const_exprs_contains(&self.constants, right) { - self.constants - .push(ConstExpr::from(right).with_across_partitions(true)); + // Try to get value from left constant expression + let value = self + .constants + .iter() + .find(|c| c.expr().eq(left)) + .and_then(|c| c.value().cloned()); + + let mut const_expr = ConstExpr::from(right).with_across_partitions(true); + if let Some(val) = value { + const_expr = const_expr.with_value(val); + } + self.constants.push(const_expr); } } else if self.is_expr_constant(right) { // Right expression is constant, add left as constant if !const_exprs_contains(&self.constants, left) { - self.constants - .push(ConstExpr::from(left).with_across_partitions(true)); + // Try to get value from right constant expression + let value = self + .constants + .iter() + .find(|c| c.expr().eq(right)) + .and_then(|c| c.value().cloned()); + + let mut const_expr = ConstExpr::from(left).with_across_partitions(true); + if let Some(val) = value { + const_expr = const_expr.with_value(val); + } + self.constants.push(const_expr); } } @@ -878,19 +898,38 @@ impl EquivalenceProperties { .constants .iter() .flat_map(|const_expr| { - const_expr.map(|expr| self.eq_group.project_expr(mapping, expr)) + const_expr + .map(|expr| self.eq_group.project_expr(mapping, expr)) + .map(|projected_expr| { + let mut new_const = ConstExpr::from(projected_expr) + .with_across_partitions(const_expr.across_partitions()); + if let Some(value) = const_expr.value() { + new_const = new_const.with_value(value.clone()); + } + new_const + }) }) .collect::>(); + // Add projection expressions that are known to be constant: for (source, target) in mapping.iter() { if self.is_expr_constant(source) && !const_exprs_contains(&projected_constants, target) { let across_partitions = self.is_expr_constant_accross_partitions(source); + // Try to get value from source constant expression + let value = self + .constants + .iter() + .find(|c| c.expr().eq(source)) + .and_then(|c| c.value().cloned()); + // Expression evaluates to single value - projected_constants.push( - ConstExpr::from(target).with_across_partitions(across_partitions), - ); + let mut const_expr = ConstExpr::from(target).with_across_partitions(across_partitions); + if let Some(val) = value { + const_expr = const_expr.with_value(val); + } + projected_constants.push(const_expr); } } projected_constants @@ -1102,9 +1141,14 @@ impl EquivalenceProperties { .into_iter() .map(|const_expr| { let across_partitions = const_expr.across_partitions(); + let value = const_expr.value().cloned(); let new_const_expr = with_new_schema(const_expr.owned_expr(), &schema)?; - Ok(ConstExpr::new(new_const_expr) - .with_across_partitions(across_partitions)) + let mut new_const_expr = ConstExpr::new(new_const_expr) + .with_across_partitions(across_partitions); + if let Some(value) = value { + new_const_expr = new_const_expr.with_value(value.clone()); + } + Ok(new_const_expr) }) .collect::>>()?; diff --git a/datafusion/physical-plan/src/filter.rs b/datafusion/physical-plan/src/filter.rs index e6503c8e597c..5999836a6dfe 100644 --- a/datafusion/physical-plan/src/filter.rs +++ b/datafusion/physical-plan/src/filter.rs @@ -268,8 +268,15 @@ impl FilterExec { .into_iter() .filter(|column| stats.column_statistics[column.index()].is_singleton()) .map(|column| { + let value = stats.column_statistics[column.index()] + .min_value + .get_value(); let expr = Arc::new(column) as _; - ConstExpr::new(expr).with_across_partitions(true) + let mut const_expr = ConstExpr::new(expr).with_across_partitions(true); + if let Some(value) = value { + const_expr = const_expr.with_value(value.clone()); + } + return const_expr; }); // This is for statistics eq_properties = eq_properties.with_constants(constants); From d00706738d9a050530b83076b9c543f10b021c0d Mon Sep 17 00:00:00 2001 From: Goksel Kabadayi <45314116+gokselk@users.noreply.github.com> Date: Thu, 19 Dec 2024 01:09:54 +0300 Subject: [PATCH 15/25] Revert "Set across_partitions=true when both sides are constant" This reverts commit 3051cd470b0ad4a70cd8bd3518813f5ce0b3a449. --- datafusion/physical-expr/src/equivalence/properties.rs | 5 ----- 1 file changed, 5 deletions(-) diff --git a/datafusion/physical-expr/src/equivalence/properties.rs b/datafusion/physical-expr/src/equivalence/properties.rs index eb0e28566196..7f7316ddcb79 100644 --- a/datafusion/physical-expr/src/equivalence/properties.rs +++ b/datafusion/physical-expr/src/equivalence/properties.rs @@ -1920,11 +1920,6 @@ fn calculate_union_binary( .map(|rhs_const| { let mut const_expr = ConstExpr::new(Arc::clone(lhs_const.expr())); - // If both sides are constant across partitions, set across_partitions=true - if lhs_const.across_partitions() && rhs_const.across_partitions() { - const_expr = const_expr.with_across_partitions(true); - } - // If both sides have matching constant values, preserve the value and set across_partitions=true if let (Some(lhs_val), Some(rhs_val)) = (lhs_const.value(), rhs_const.value()) From 0292f326be278fd6c9c640afe704caa71f2aeba5 Mon Sep 17 00:00:00 2001 From: Goksel Kabadayi <45314116+gokselk@users.noreply.github.com> Date: Thu, 19 Dec 2024 09:13:34 +0300 Subject: [PATCH 16/25] Temporarily take value from literal --- .../src/equivalence/properties.rs | 20 +++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/datafusion/physical-expr/src/equivalence/properties.rs b/datafusion/physical-expr/src/equivalence/properties.rs index 7f7316ddcb79..ec86e21dd793 100644 --- a/datafusion/physical-expr/src/equivalence/properties.rs +++ b/datafusion/physical-expr/src/equivalence/properties.rs @@ -258,11 +258,11 @@ impl EquivalenceProperties { // Left expression is constant, add right as constant if !const_exprs_contains(&self.constants, right) { // Try to get value from left constant expression - let value = self - .constants - .iter() - .find(|c| c.expr().eq(left)) - .and_then(|c| c.value().cloned()); + let value = if let Some(lit) = left.as_any().downcast_ref::() { + Some(lit.value().clone()) + } else { + None + }; let mut const_expr = ConstExpr::from(right).with_across_partitions(true); if let Some(val) = value { @@ -274,11 +274,11 @@ impl EquivalenceProperties { // Right expression is constant, add left as constant if !const_exprs_contains(&self.constants, left) { // Try to get value from right constant expression - let value = self - .constants - .iter() - .find(|c| c.expr().eq(right)) - .and_then(|c| c.value().cloned()); + let value = if let Some(lit) = right.as_any().downcast_ref::() { + Some(lit.value().clone()) + } else { + None + }; let mut const_expr = ConstExpr::from(left).with_across_partitions(true); if let Some(val) = value { From f737c656b4d8b2cd222a79ae413ced18c806916e Mon Sep 17 00:00:00 2001 From: Goksel Kabadayi <45314116+gokselk@users.noreply.github.com> Date: Thu, 19 Dec 2024 10:18:56 +0300 Subject: [PATCH 17/25] Lint fixes --- .../src/equivalence/properties.rs | 24 +++++++++---------- datafusion/physical-plan/src/filter.rs | 2 +- 2 files changed, 12 insertions(+), 14 deletions(-) diff --git a/datafusion/physical-expr/src/equivalence/properties.rs b/datafusion/physical-expr/src/equivalence/properties.rs index ec86e21dd793..bf661bcf9bdf 100644 --- a/datafusion/physical-expr/src/equivalence/properties.rs +++ b/datafusion/physical-expr/src/equivalence/properties.rs @@ -258,11 +258,10 @@ impl EquivalenceProperties { // Left expression is constant, add right as constant if !const_exprs_contains(&self.constants, right) { // Try to get value from left constant expression - let value = if let Some(lit) = left.as_any().downcast_ref::() { - Some(lit.value().clone()) - } else { - None - }; + let value = left + .as_any() + .downcast_ref::() + .map(|lit| lit.value().clone()); let mut const_expr = ConstExpr::from(right).with_across_partitions(true); if let Some(val) = value { @@ -274,11 +273,10 @@ impl EquivalenceProperties { // Right expression is constant, add left as constant if !const_exprs_contains(&self.constants, left) { // Try to get value from right constant expression - let value = if let Some(lit) = right.as_any().downcast_ref::() { - Some(lit.value().clone()) - } else { - None - }; + let value = right + .as_any() + .downcast_ref::() + .map(|lit| lit.value().clone()); let mut const_expr = ConstExpr::from(left).with_across_partitions(true); if let Some(val) = value { @@ -901,12 +899,12 @@ impl EquivalenceProperties { const_expr .map(|expr| self.eq_group.project_expr(mapping, expr)) .map(|projected_expr| { - let mut new_const = ConstExpr::from(projected_expr) + let mut new_const_expr = projected_expr .with_across_partitions(const_expr.across_partitions()); if let Some(value) = const_expr.value() { - new_const = new_const.with_value(value.clone()); + new_const_expr = new_const_expr.with_value(value.clone()); } - new_const + new_const_expr }) }) .collect::>(); diff --git a/datafusion/physical-plan/src/filter.rs b/datafusion/physical-plan/src/filter.rs index 5999836a6dfe..e56fa6ca73cd 100644 --- a/datafusion/physical-plan/src/filter.rs +++ b/datafusion/physical-plan/src/filter.rs @@ -276,7 +276,7 @@ impl FilterExec { if let Some(value) = value { const_expr = const_expr.with_value(value.clone()); } - return const_expr; + const_expr }); // This is for statistics eq_properties = eq_properties.with_constants(constants); From b974d89b92643c4fab21e1754bffeb1d66ffff80 Mon Sep 17 00:00:00 2001 From: Goksel Kabadayi <45314116+gokselk@users.noreply.github.com> Date: Thu, 19 Dec 2024 11:24:27 +0300 Subject: [PATCH 18/25] Cargo fmt --- datafusion/physical-expr/src/equivalence/properties.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/datafusion/physical-expr/src/equivalence/properties.rs b/datafusion/physical-expr/src/equivalence/properties.rs index bf661bcf9bdf..732925af808b 100644 --- a/datafusion/physical-expr/src/equivalence/properties.rs +++ b/datafusion/physical-expr/src/equivalence/properties.rs @@ -923,7 +923,8 @@ impl EquivalenceProperties { .and_then(|c| c.value().cloned()); // Expression evaluates to single value - let mut const_expr = ConstExpr::from(target).with_across_partitions(across_partitions); + let mut const_expr = + ConstExpr::from(target).with_across_partitions(across_partitions); if let Some(val) = value { const_expr = const_expr.with_value(val); } From 57913f876e5d358620dfe83f0e7a5bae04a88fa8 Mon Sep 17 00:00:00 2001 From: Goksel Kabadayi <45314116+gokselk@users.noreply.github.com> Date: Mon, 23 Dec 2024 07:06:11 +0300 Subject: [PATCH 19/25] Add get_expr_constant_value --- .../src/equivalence/properties.rs | 38 ++++++++++++++++++- datafusion/physical-plan/src/filter.rs | 18 ++++----- 2 files changed, 46 insertions(+), 10 deletions(-) diff --git a/datafusion/physical-expr/src/equivalence/properties.rs b/datafusion/physical-expr/src/equivalence/properties.rs index 732925af808b..751edb1f72ed 100644 --- a/datafusion/physical-expr/src/equivalence/properties.rs +++ b/datafusion/physical-expr/src/equivalence/properties.rs @@ -36,7 +36,9 @@ use crate::{ use arrow_schema::{SchemaRef, SortOptions}; use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode}; -use datafusion_common::{internal_err, plan_err, JoinSide, JoinType, Result}; +use datafusion_common::{ + internal_err, plan_err, JoinSide, JoinType, Result, ScalarValue, +}; use datafusion_expr::interval_arithmetic::Interval; use datafusion_expr::sort_properties::{ExprProperties, SortProperties}; use datafusion_physical_expr_common::utils::ExprPropertiesNode; @@ -1088,6 +1090,40 @@ impl EquivalenceProperties { is_constant_recurse(&normalized_constants, &normalized_expr) } + /// Returns the constant value of the given expression if it is known to be constant. + /// + /// This function first normalizes the expression using the equivalence group, then: + /// - If the normalized expression is a literal, returns its value + /// - If the normalized expression matches a known constant expression, returns its value + /// - Otherwise returns None + /// + /// # Arguments + /// + /// - `expr`: A reference to a `Arc` representing the expression + /// to check for a constant value. + /// + /// # Returns + /// + /// Returns `Some(ScalarValue)` if the expression is known to have a constant value, + /// `None` otherwise. + pub fn get_expr_constant_value( + &self, + expr: &Arc, + ) -> Option { + let normalized_expr = self.eq_group.normalize_expr(Arc::clone(expr)); + + if let Some(lit) = normalized_expr.as_any().downcast_ref::() { + return Some(lit.value().clone()); + } + + for const_expr in self.constants.iter() { + if normalized_expr.eq(const_expr.expr()) { + return const_expr.value().cloned(); + } + } + None + } + /// Retrieves the properties for a given physical expression. /// /// This function constructs an [`ExprProperties`] object for the given diff --git a/datafusion/physical-plan/src/filter.rs b/datafusion/physical-plan/src/filter.rs index e56fa6ca73cd..8539cbaac8b6 100644 --- a/datafusion/physical-plan/src/filter.rs +++ b/datafusion/physical-plan/src/filter.rs @@ -41,7 +41,7 @@ use datafusion_common::{ use datafusion_execution::TaskContext; use datafusion_expr::Operator; use datafusion_physical_expr::equivalence::ProjectionMapping; -use datafusion_physical_expr::expressions::{BinaryExpr, Literal}; +use datafusion_physical_expr::expressions::BinaryExpr; use datafusion_physical_expr::intervals::utils::check_support; use datafusion_physical_expr::utils::collect_columns; use datafusion_physical_expr::{ @@ -219,26 +219,26 @@ impl FilterExec { // Filter evaluates to single value for all partitions if input_eqs.is_expr_constant(binary.left()) { // When left side is constant, extract value from right side if it's a literal - let (expr, lit) = ( + let (expr, value) = ( binary.right(), - binary.right().as_any().downcast_ref::(), + input_eqs.get_expr_constant_value(binary.right()), ); let mut const_expr = ConstExpr::from(expr).with_across_partitions(true); - if let Some(lit) = lit { - const_expr = const_expr.with_value(lit.value().clone()); + if let Some(value) = value { + const_expr = const_expr.with_value(value.clone()); } res_constants.push(const_expr); } else if input_eqs.is_expr_constant(binary.right()) { // When right side is constant, extract value from left side if it's a literal - let (expr, lit) = ( + let (expr, value) = ( binary.left(), - binary.left().as_any().downcast_ref::(), + input_eqs.get_expr_constant_value(binary.left()), ); let mut const_expr = ConstExpr::from(expr).with_across_partitions(true); - if let Some(lit) = lit { - const_expr = const_expr.with_value(lit.value().clone()); + if let Some(value) = value { + const_expr = const_expr.with_value(value.clone()); } res_constants.push(const_expr); } From 1917c0ea36243810caafc18182a2efe983f6e319 Mon Sep 17 00:00:00 2001 From: Goksel Kabadayi <45314116+gokselk@users.noreply.github.com> Date: Mon, 23 Dec 2024 07:22:48 +0300 Subject: [PATCH 20/25] Make `with_value()` accept optional value --- .../physical-expr/src/equivalence/class.rs | 4 +- .../src/equivalence/properties.rs | 77 ++++++------------- datafusion/physical-plan/src/filter.rs | 32 ++++---- 3 files changed, 39 insertions(+), 74 deletions(-) diff --git a/datafusion/physical-expr/src/equivalence/class.rs b/datafusion/physical-expr/src/equivalence/class.rs index a5b291e35a98..d67813ea5329 100644 --- a/datafusion/physical-expr/src/equivalence/class.rs +++ b/datafusion/physical-expr/src/equivalence/class.rs @@ -88,8 +88,8 @@ impl ConstExpr { } } - pub fn with_value(mut self, value: ScalarValue) -> Self { - self.value = Some(value); + pub fn with_value(mut self, value: Option) -> Self { + self.value = value; self } diff --git a/datafusion/physical-expr/src/equivalence/properties.rs b/datafusion/physical-expr/src/equivalence/properties.rs index 751edb1f72ed..759b2c17ceb4 100644 --- a/datafusion/physical-expr/src/equivalence/properties.rs +++ b/datafusion/physical-expr/src/equivalence/properties.rs @@ -259,31 +259,17 @@ impl EquivalenceProperties { if self.is_expr_constant(left) { // Left expression is constant, add right as constant if !const_exprs_contains(&self.constants, right) { - // Try to get value from left constant expression - let value = left - .as_any() - .downcast_ref::() - .map(|lit| lit.value().clone()); - - let mut const_expr = ConstExpr::from(right).with_across_partitions(true); - if let Some(val) = value { - const_expr = const_expr.with_value(val); - } + let const_expr = ConstExpr::from(right) + .with_across_partitions(true) + .with_value(self.get_expr_constant_value(left)); self.constants.push(const_expr); } } else if self.is_expr_constant(right) { // Right expression is constant, add left as constant if !const_exprs_contains(&self.constants, left) { - // Try to get value from right constant expression - let value = right - .as_any() - .downcast_ref::() - .map(|lit| lit.value().clone()); - - let mut const_expr = ConstExpr::from(left).with_across_partitions(true); - if let Some(val) = value { - const_expr = const_expr.with_value(val); - } + let const_expr = ConstExpr::from(left) + .with_across_partitions(true) + .with_value(self.get_expr_constant_value(right)); self.constants.push(const_expr); } } @@ -325,12 +311,9 @@ impl EquivalenceProperties { return None; } - let mut const_expr = ConstExpr::from(normalized_expr) - .with_across_partitions(across_partitions); - - if let Some(value) = value { - const_expr = const_expr.with_value(value); - } + let const_expr = ConstExpr::from(normalized_expr) + .with_across_partitions(across_partitions) + .with_value(value); Some(const_expr) }) @@ -901,12 +884,9 @@ impl EquivalenceProperties { const_expr .map(|expr| self.eq_group.project_expr(mapping, expr)) .map(|projected_expr| { - let mut new_const_expr = projected_expr - .with_across_partitions(const_expr.across_partitions()); - if let Some(value) = const_expr.value() { - new_const_expr = new_const_expr.with_value(value.clone()); - } - new_const_expr + projected_expr + .with_across_partitions(const_expr.across_partitions()) + .with_value(const_expr.value().cloned()) }) }) .collect::>(); @@ -917,20 +897,14 @@ impl EquivalenceProperties { && !const_exprs_contains(&projected_constants, target) { let across_partitions = self.is_expr_constant_accross_partitions(source); - // Try to get value from source constant expression - let value = self - .constants - .iter() - .find(|c| c.expr().eq(source)) - .and_then(|c| c.value().cloned()); + let value = self.get_expr_constant_value(source); // Expression evaluates to single value - let mut const_expr = - ConstExpr::from(target).with_across_partitions(across_partitions); - if let Some(val) = value { - const_expr = const_expr.with_value(val); - } - projected_constants.push(const_expr); + projected_constants.push( + ConstExpr::from(target) + .with_across_partitions(across_partitions) + .with_value(value), + ); } } projected_constants @@ -1178,12 +1152,9 @@ impl EquivalenceProperties { let across_partitions = const_expr.across_partitions(); let value = const_expr.value().cloned(); let new_const_expr = with_new_schema(const_expr.owned_expr(), &schema)?; - let mut new_const_expr = ConstExpr::new(new_const_expr) - .with_across_partitions(across_partitions); - if let Some(value) = value { - new_const_expr = new_const_expr.with_value(value.clone()); - } - Ok(new_const_expr) + Ok(ConstExpr::new(new_const_expr) + .with_across_partitions(across_partitions) + .with_value(value)) }) .collect::>>()?; @@ -1962,7 +1933,7 @@ fn calculate_union_binary( if lhs_val == rhs_val { const_expr = const_expr .with_across_partitions(true) - .with_value(lhs_val.clone()); + .with_value(Some(lhs_val.clone())); } } const_expr @@ -3786,13 +3757,13 @@ mod tests { // Create first input with a=10 let const_expr1 = - ConstExpr::new(Arc::clone(&col_a)).with_value(literal_10.clone()); + ConstExpr::new(Arc::clone(&col_a)).with_value(Some(literal_10.clone())); let input1 = EquivalenceProperties::new(Arc::clone(&schema)) .with_constants(vec![const_expr1]); // Create second input with a=10 let const_expr2 = - ConstExpr::new(Arc::clone(&col_a)).with_value(literal_10.clone()); + ConstExpr::new(Arc::clone(&col_a)).with_value(Some(literal_10.clone())); let input2 = EquivalenceProperties::new(Arc::clone(&schema)) .with_constants(vec![const_expr2]); diff --git a/datafusion/physical-plan/src/filter.rs b/datafusion/physical-plan/src/filter.rs index 8539cbaac8b6..3b21bb0b8f6f 100644 --- a/datafusion/physical-plan/src/filter.rs +++ b/datafusion/physical-plan/src/filter.rs @@ -218,29 +218,25 @@ impl FilterExec { if binary.op() == &Operator::Eq { // Filter evaluates to single value for all partitions if input_eqs.is_expr_constant(binary.left()) { - // When left side is constant, extract value from right side if it's a literal let (expr, value) = ( binary.right(), input_eqs.get_expr_constant_value(binary.right()), ); - let mut const_expr = - ConstExpr::from(expr).with_across_partitions(true); - if let Some(value) = value { - const_expr = const_expr.with_value(value.clone()); - } - res_constants.push(const_expr); + res_constants.push( + ConstExpr::new(Arc::clone(expr)) + .with_across_partitions(true) + .with_value(value), + ); } else if input_eqs.is_expr_constant(binary.right()) { - // When right side is constant, extract value from left side if it's a literal let (expr, value) = ( binary.left(), input_eqs.get_expr_constant_value(binary.left()), ); - let mut const_expr = - ConstExpr::from(expr).with_across_partitions(true); - if let Some(value) = value { - const_expr = const_expr.with_value(value.clone()); - } - res_constants.push(const_expr); + res_constants.push( + ConstExpr::new(Arc::clone(expr)) + .with_across_partitions(true) + .with_value(value), + ); } } } @@ -272,11 +268,9 @@ impl FilterExec { .min_value .get_value(); let expr = Arc::new(column) as _; - let mut const_expr = ConstExpr::new(expr).with_across_partitions(true); - if let Some(value) = value { - const_expr = const_expr.with_value(value.clone()); - } - const_expr + ConstExpr::new(expr) + .with_across_partitions(true) + .with_value(value.cloned()) }); // This is for statistics eq_properties = eq_properties.with_constants(constants); From c72f19a5feb030ccc9d8fc2ecbb66f529df237f8 Mon Sep 17 00:00:00 2001 From: Goksel Kabadayi <45314116+gokselk@users.noreply.github.com> Date: Mon, 23 Dec 2024 13:48:32 +0300 Subject: [PATCH 21/25] Add todo --- datafusion/physical-expr/src/equivalence/class.rs | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/datafusion/physical-expr/src/equivalence/class.rs b/datafusion/physical-expr/src/equivalence/class.rs index d67813ea5329..6a30a49013ce 100644 --- a/datafusion/physical-expr/src/equivalence/class.rs +++ b/datafusion/physical-expr/src/equivalence/class.rs @@ -55,6 +55,18 @@ use indexmap::{IndexMap, IndexSet}; /// // create a constant expression from a physical expression /// let const_expr = ConstExpr::from(col); /// ``` +// TODO: Consider refactoring the `across_partitions` and `value` fields into an enum: +// +// ``` +// enum PartitionValues { +// Uniform(Option), // Same value across all partitions +// Heterogeneous(Vec>) // Different values per partition +// } +// ``` +// +// This would provide a more type-safe representation of partition values. +// Note: This is a breaking change for the equivalence API and should be +// addressed in a separate issue/PR. #[derive(Debug, Clone)] pub struct ConstExpr { /// The expression that is known to be constant (e.g. a `Column`) From 5c1a8b860fb9cab423c571d6c2e2a5dd3eaf4873 Mon Sep 17 00:00:00 2001 From: Goksel Kabadayi <45314116+gokselk@users.noreply.github.com> Date: Mon, 23 Dec 2024 14:18:04 +0300 Subject: [PATCH 22/25] Move test to union.slt --- .../const_value_tracking_across_union.slt | 50 ---------------- datafusion/sqllogictest/test_files/union.slt | 59 +++++++++++++++++++ 2 files changed, 59 insertions(+), 50 deletions(-) delete mode 100644 datafusion/sqllogictest/test_files/const_value_tracking_across_union.slt diff --git a/datafusion/sqllogictest/test_files/const_value_tracking_across_union.slt b/datafusion/sqllogictest/test_files/const_value_tracking_across_union.slt deleted file mode 100644 index c6b4895ef00d..000000000000 --- a/datafusion/sqllogictest/test_files/const_value_tracking_across_union.slt +++ /dev/null @@ -1,50 +0,0 @@ -statement ok -CREATE EXTERNAL TABLE aggregate_test_100 ( - c1 VARCHAR NOT NULL, - c2 TINYINT NOT NULL, - c3 SMALLINT NOT NULL, - c4 SMALLINT, - c5 INT, - c6 BIGINT NOT NULL, - c7 SMALLINT NOT NULL, - c8 INT NOT NULL, - c9 BIGINT UNSIGNED NOT NULL, - c10 VARCHAR NOT NULL, - c11 FLOAT NOT NULL, - c12 DOUBLE NOT NULL, - c13 VARCHAR NOT NULL -) -STORED AS CSV -LOCATION '../../testing/data/csv/aggregate_test_100.csv' -OPTIONS ('format.has_header' 'true'); - -statement ok -set datafusion.explain.physical_plan_only = true; - -statement ok -set datafusion.execution.batch_size = 2; - -# Constant value tracking across union -query TT -explain -SELECT * FROM( -( - SELECT * FROM aggregate_test_100 WHERE c1='a' -) -UNION ALL -( - SELECT * FROM aggregate_test_100 WHERE c1='a' -)) -ORDER BY c1 ----- -physical_plan -01)CoalescePartitionsExec -02)--UnionExec -03)----CoalesceBatchesExec: target_batch_size=2 -04)------FilterExec: c1@0 = a -05)--------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 -06)----------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1, c2, c3, c4, c5, c6, c7, c8, c9, c10, c11, c12, c13], has_header=true -07)----CoalesceBatchesExec: target_batch_size=2 -08)------FilterExec: c1@0 = a -09)--------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 -10)----------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1, c2, c3, c4, c5, c6, c7, c8, c9, c10, c11, c12, c13], has_header=true diff --git a/datafusion/sqllogictest/test_files/union.slt b/datafusion/sqllogictest/test_files/union.slt index d94780744db9..7b8992b966ad 100644 --- a/datafusion/sqllogictest/test_files/union.slt +++ b/datafusion/sqllogictest/test_files/union.slt @@ -777,3 +777,62 @@ select make_array(make_array(1)) x UNION ALL SELECT make_array(arrow_cast(make_a ---- [[-1]] [[1]] + +statement ok +CREATE EXTERNAL TABLE aggregate_test_100 ( + c1 VARCHAR NOT NULL, + c2 TINYINT NOT NULL, + c3 SMALLINT NOT NULL, + c4 SMALLINT, + c5 INT, + c6 BIGINT NOT NULL, + c7 SMALLINT NOT NULL, + c8 INT NOT NULL, + c9 BIGINT UNSIGNED NOT NULL, + c10 VARCHAR NOT NULL, + c11 FLOAT NOT NULL, + c12 DOUBLE NOT NULL, + c13 VARCHAR NOT NULL +) +STORED AS CSV +LOCATION '../../testing/data/csv/aggregate_test_100.csv' +OPTIONS ('format.has_header' 'true'); + +statement ok +set datafusion.execution.batch_size = 2; + +# Constant value tracking across union +query TT +explain +SELECT * FROM( +( + SELECT * FROM aggregate_test_100 WHERE c1='a' +) +UNION ALL +( + SELECT * FROM aggregate_test_100 WHERE c1='a' +)) +ORDER BY c1 +---- +logical_plan +01)Sort: aggregate_test_100.c1 ASC NULLS LAST +02)--Union +03)----Filter: aggregate_test_100.c1 = Utf8("a") +04)------TableScan: aggregate_test_100 projection=[c1, c2, c3, c4, c5, c6, c7, c8, c9, c10, c11, c12, c13], partial_filters=[aggregate_test_100.c1 = Utf8("a")] +05)----Filter: aggregate_test_100.c1 = Utf8("a") +06)------TableScan: aggregate_test_100 projection=[c1, c2, c3, c4, c5, c6, c7, c8, c9, c10, c11, c12, c13], partial_filters=[aggregate_test_100.c1 = Utf8("a")] +physical_plan +01)CoalescePartitionsExec +02)--UnionExec +03)----CoalesceBatchesExec: target_batch_size=2 +04)------FilterExec: c1@0 = a +05)--------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 +06)----------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1, c2, c3, c4, c5, c6, c7, c8, c9, c10, c11, c12, c13], has_header=true +07)----CoalesceBatchesExec: target_batch_size=2 +08)------FilterExec: c1@0 = a +09)--------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 +10)----------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1, c2, c3, c4, c5, c6, c7, c8, c9, c10, c11, c12, c13], has_header=true + +# Clean up after the test +statement ok +drop table aggregate_test_100; From 25e95f4894f7ee9907b6d497d23684efbd464549 Mon Sep 17 00:00:00 2001 From: Goksel Kabadayi <45314116+gokselk@users.noreply.github.com> Date: Mon, 23 Dec 2024 14:18:22 +0300 Subject: [PATCH 23/25] Fix changed slt after merge --- datafusion/sqllogictest/test_files/aggregate.slt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/sqllogictest/test_files/aggregate.slt b/datafusion/sqllogictest/test_files/aggregate.slt index 94c629707379..cd62e5625342 100644 --- a/datafusion/sqllogictest/test_files/aggregate.slt +++ b/datafusion/sqllogictest/test_files/aggregate.slt @@ -259,7 +259,7 @@ logical_plan 15)------------EmptyRelation physical_plan 01)ProjectionExec: expr=[array_length(array_agg(DISTINCT a.foo)@1) as array_length(array_agg(DISTINCT a.foo)), sum(DISTINCT Int64(1))@2 as sum(DISTINCT Int64(1))] -02)--AggregateExec: mode=FinalPartitioned, gby=[id@0 as id], aggr=[array_agg(DISTINCT a.foo), sum(DISTINCT Int64(1))] +02)--AggregateExec: mode=FinalPartitioned, gby=[id@0 as id], aggr=[array_agg(DISTINCT a.foo), sum(DISTINCT Int64(1))], ordering_mode=Sorted 03)----CoalesceBatchesExec: target_batch_size=8192 04)------RepartitionExec: partitioning=Hash([id@0], 4), input_partitions=5 05)--------AggregateExec: mode=Partial, gby=[id@0 as id], aggr=[array_agg(DISTINCT a.foo), sum(DISTINCT Int64(1))], ordering_mode=Sorted From bee8667615a8181bf2dfb23504b1bbb9d54cf091 Mon Sep 17 00:00:00 2001 From: berkaysynnada Date: Wed, 25 Dec 2024 14:42:49 +0300 Subject: [PATCH 24/25] Simplify constexpr --- .../physical-expr/src/equivalence/class.rs | 72 +++++----- .../physical-expr/src/equivalence/mod.rs | 2 +- .../physical-expr/src/equivalence/ordering.rs | 9 +- .../src/equivalence/properties.rs | 132 +++++++++--------- datafusion/physical-expr/src/lib.rs | 4 +- datafusion/physical-plan/src/filter.rs | 16 +-- 6 files changed, 124 insertions(+), 111 deletions(-) diff --git a/datafusion/physical-expr/src/equivalence/class.rs b/datafusion/physical-expr/src/equivalence/class.rs index e1306bc96d4c..9e535a94eb6e 100644 --- a/datafusion/physical-expr/src/equivalence/class.rs +++ b/datafusion/physical-expr/src/equivalence/class.rs @@ -64,7 +64,7 @@ use indexmap::{IndexMap, IndexSet}; // } // ``` // -// This would provide a more type-safe representation of partition values. +// This would provide more flexible representation of partition values. // Note: This is a breaking change for the equivalence API and should be // addressed in a separate issue/PR. #[derive(Debug, Clone)] @@ -73,16 +73,32 @@ pub struct ConstExpr { expr: Arc, /// Does the constant have the same value across all partitions? See /// struct docs for more details - across_partitions: bool, - /// The value of the constant expression - value: Option, + across_partitions: AcrossPartitions, +} + +#[derive(PartialEq, Clone, Debug)] +/// Represents whether a constant expression's value is uniform or varies across partitions. +/// +/// The `AcrossPartitions` enum is used to describe the nature of a constant expression +/// in a physical execution plan: +/// +/// - `Heterogeneous`: The constant expression may have different values for different partitions. +/// - `Uniform(Option)`: The constant expression has the same value across all partitions, +/// or is `None` if the value is not specified. +pub enum AcrossPartitions { + Heterogeneous, + Uniform(Option), +} + +impl Default for AcrossPartitions { + fn default() -> Self { + Self::Heterogeneous + } } impl PartialEq for ConstExpr { fn eq(&self, other: &Self) -> bool { - self.across_partitions == other.across_partitions - && self.expr.eq(&other.expr) - && self.value == other.value + self.across_partitions == other.across_partitions && self.expr.eq(&other.expr) } } @@ -95,20 +111,14 @@ impl ConstExpr { Self { expr, // By default, assume constant expressions are not same across partitions. - across_partitions: false, - value: None, + across_partitions: Default::default(), } } - pub fn with_value(mut self, value: Option) -> Self { - self.value = value; - self - } - /// Set the `across_partitions` flag /// /// See struct docs for more details - pub fn with_across_partitions(mut self, across_partitions: bool) -> Self { + pub fn with_across_partitions(mut self, across_partitions: AcrossPartitions) -> Self { self.across_partitions = across_partitions; self } @@ -116,8 +126,8 @@ impl ConstExpr { /// Is the expression the same across all partitions? /// /// See struct docs for more details - pub fn across_partitions(&self) -> bool { - self.across_partitions + pub fn across_partitions(&self) -> AcrossPartitions { + self.across_partitions.clone() } pub fn expr(&self) -> &Arc { @@ -128,10 +138,6 @@ impl ConstExpr { self.expr } - pub fn value(&self) -> Option<&ScalarValue> { - self.value.as_ref() - } - pub fn map(&self, f: F) -> Option where F: Fn(&Arc) -> Option>, @@ -139,8 +145,7 @@ impl ConstExpr { let maybe_expr = f(&self.expr); maybe_expr.map(|expr| Self { expr, - across_partitions: self.across_partitions, - value: self.value.clone(), + across_partitions: self.across_partitions.clone(), }) } @@ -170,17 +175,20 @@ impl ConstExpr { } } -/// Display implementation for `ConstExpr` -/// -/// Example `c` or `c(across_partitions)` impl Display for ConstExpr { - fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { write!(f, "{}", self.expr)?; - if self.across_partitions { - write!(f, "(across_partitions)")?; - } - if let Some(value) = self.value.as_ref() { - write!(f, "({})", value)?; + match &self.across_partitions { + AcrossPartitions::Heterogeneous => { + write!(f, "(heterogeneous)")?; + } + AcrossPartitions::Uniform(value) => { + if let Some(val) = value { + write!(f, "(uniform: {})", val)?; + } else { + write!(f, "(uniform: unknown)")?; + } + } } Ok(()) } diff --git a/datafusion/physical-expr/src/equivalence/mod.rs b/datafusion/physical-expr/src/equivalence/mod.rs index 902e53a7f236..93d42adbe5de 100644 --- a/datafusion/physical-expr/src/equivalence/mod.rs +++ b/datafusion/physical-expr/src/equivalence/mod.rs @@ -27,7 +27,7 @@ mod ordering; mod projection; mod properties; -pub use class::{ConstExpr, EquivalenceClass, EquivalenceGroup}; +pub use class::{AcrossPartitions, ConstExpr, EquivalenceClass, EquivalenceGroup}; pub use ordering::OrderingEquivalenceClass; pub use projection::ProjectionMapping; pub use properties::{ diff --git a/datafusion/physical-expr/src/equivalence/ordering.rs b/datafusion/physical-expr/src/equivalence/ordering.rs index 06f85b657e09..24e2fc7dbaf5 100644 --- a/datafusion/physical-expr/src/equivalence/ordering.rs +++ b/datafusion/physical-expr/src/equivalence/ordering.rs @@ -262,7 +262,7 @@ mod tests { }; use crate::expressions::{col, BinaryExpr, Column}; use crate::utils::tests::TestScalarUDF; - use crate::{ConstExpr, PhysicalExpr, PhysicalSortExpr}; + use crate::{AcrossPartitions, ConstExpr, PhysicalExpr, PhysicalSortExpr}; use arrow::datatypes::{DataType, Field, Schema}; use arrow_schema::SortOptions; @@ -583,9 +583,10 @@ mod tests { let eq_group = EquivalenceGroup::new(eq_group); eq_properties.add_equivalence_group(eq_group); - let constants = constants - .into_iter() - .map(|expr| ConstExpr::from(expr).with_across_partitions(true)); + let constants = constants.into_iter().map(|expr| { + ConstExpr::from(expr) + .with_across_partitions(AcrossPartitions::Uniform(None)) + }); eq_properties = eq_properties.with_constants(constants); let reqs = convert_to_sort_exprs(&reqs); diff --git a/datafusion/physical-expr/src/equivalence/properties.rs b/datafusion/physical-expr/src/equivalence/properties.rs index ec4929c8d95e..24e77c899ee1 100755 --- a/datafusion/physical-expr/src/equivalence/properties.rs +++ b/datafusion/physical-expr/src/equivalence/properties.rs @@ -23,7 +23,7 @@ use std::sync::Arc; use std::{fmt, mem}; use super::ordering::collapse_lex_ordering; -use crate::equivalence::class::const_exprs_contains; +use crate::equivalence::class::{const_exprs_contains, AcrossPartitions}; use crate::equivalence::{ collapse_lex_req, EquivalenceClass, EquivalenceGroup, OrderingEquivalenceClass, ProjectionMapping, @@ -36,9 +36,7 @@ use crate::{ use arrow_schema::{SchemaRef, SortOptions}; use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode}; -use datafusion_common::{ - internal_err, plan_err, JoinSide, JoinType, Result, ScalarValue, -}; +use datafusion_common::{internal_err, plan_err, JoinSide, JoinType, Result}; use datafusion_expr::interval_arithmetic::Interval; use datafusion_expr::sort_properties::{ExprProperties, SortProperties}; use datafusion_physical_expr_common::utils::ExprPropertiesNode; @@ -219,7 +217,9 @@ impl EquivalenceProperties { /// Removes constant expressions that may change across partitions. /// This method should be used when data from different partitions are merged. pub fn clear_per_partition_constants(&mut self) { - self.constants.retain(|item| item.across_partitions()); + self.constants.retain(|item| { + matches!(item.across_partitions(), AcrossPartitions::Uniform(_)) + }) } /// Extends this `EquivalenceProperties` by adding the orderings inside the @@ -260,16 +260,14 @@ impl EquivalenceProperties { // Left expression is constant, add right as constant if !const_exprs_contains(&self.constants, right) { let const_expr = ConstExpr::from(right) - .with_across_partitions(true) - .with_value(self.get_expr_constant_value(left)); + .with_across_partitions(self.get_expr_constant_value(left)); self.constants.push(const_expr); } } else if self.is_expr_constant(right) { // Right expression is constant, add left as constant if !const_exprs_contains(&self.constants, left) { let const_expr = ConstExpr::from(left) - .with_across_partitions(true) - .with_value(self.get_expr_constant_value(right)); + .with_across_partitions(self.get_expr_constant_value(right)); self.constants.push(const_expr); } } @@ -303,7 +301,6 @@ impl EquivalenceProperties { .into_iter() .filter_map(|c| { let across_partitions = c.across_partitions(); - let value = c.value().cloned(); let expr = c.owned_expr(); let normalized_expr = self.eq_group.normalize_expr(expr); @@ -312,8 +309,7 @@ impl EquivalenceProperties { } let const_expr = ConstExpr::from(normalized_expr) - .with_across_partitions(across_partitions) - .with_value(value); + .with_across_partitions(across_partitions); Some(const_expr) }) @@ -557,7 +553,7 @@ impl EquivalenceProperties { /// is satisfied based on the orderings within, equivalence classes, and /// constant expressions. /// - /// # Arguments + /// # Parameters /// /// - `req`: A reference to a `PhysicalSortRequirement` for which the ordering /// satisfaction check will be done. @@ -925,7 +921,7 @@ impl EquivalenceProperties { /// constants based on the existing constants and the mapping. It ensures /// that constants are appropriately propagated through the projection. /// - /// # Arguments + /// # Parameters /// /// - `mapping`: A reference to a `ProjectionMapping` representing the /// mapping of source expressions to target expressions in the projection. @@ -946,7 +942,6 @@ impl EquivalenceProperties { .map(|projected_expr| { projected_expr .with_across_partitions(const_expr.across_partitions()) - .with_value(const_expr.value().cloned()) }) }) .collect::>(); @@ -956,15 +951,17 @@ impl EquivalenceProperties { if self.is_expr_constant(source) && !const_exprs_contains(&projected_constants, target) { - let across_partitions = self.is_expr_constant_accross_partitions(source); - let value = self.get_expr_constant_value(source); - - // Expression evaluates to single value - projected_constants.push( - ConstExpr::from(target) - .with_across_partitions(across_partitions) - .with_value(value), - ); + if self.is_expr_constant_accross_partitions(source) { + projected_constants.push( + ConstExpr::from(target) + .with_across_partitions(self.get_expr_constant_value(source)), + ) + } else { + projected_constants.push( + ConstExpr::from(target) + .with_across_partitions(AcrossPartitions::Heterogeneous), + ) + } } } projected_constants @@ -1071,7 +1068,7 @@ impl EquivalenceProperties { /// This function determines whether the provided expression is constant /// based on the known constants. /// - /// # Arguments + /// # Parameters /// /// - `expr`: A reference to a `Arc` representing the /// expression to be checked. @@ -1096,7 +1093,7 @@ impl EquivalenceProperties { /// This function determines whether the provided expression is constant /// across partitions based on the known constants. /// - /// # Arguments + /// # Parameters /// /// - `expr`: A reference to a `Arc` representing the /// expression to be checked. @@ -1112,50 +1109,55 @@ impl EquivalenceProperties { // As an example, assume that we know columns `a` and `b` are constant. // Then, `a`, `b` and `a + b` will all return `true` whereas `c` will // return `false`. - let const_exprs = self.constants.iter().flat_map(|const_expr| { - if const_expr.across_partitions() { - Some(Arc::clone(const_expr.expr())) - } else { - None - } - }); + let const_exprs = self + .constants + .iter() + .filter_map(|const_expr| { + if matches!( + const_expr.across_partitions(), + AcrossPartitions::Uniform { .. } + ) { + Some(Arc::clone(const_expr.expr())) + } else { + None + } + }) + .collect::>(); let normalized_constants = self.eq_group.normalize_exprs(const_exprs); let normalized_expr = self.eq_group.normalize_expr(Arc::clone(expr)); is_constant_recurse(&normalized_constants, &normalized_expr) } - /// Returns the constant value of the given expression if it is known to be constant. - /// - /// This function first normalizes the expression using the equivalence group, then: - /// - If the normalized expression is a literal, returns its value - /// - If the normalized expression matches a known constant expression, returns its value - /// - Otherwise returns None + /// Retrieves the constant value of a given physical expression, if it exists. /// - /// # Arguments + /// Normalizes the input expression and checks if it matches any known constants + /// in the current context. Returns whether the expression has a uniform value, + /// varies across partitions, or is not constant. /// - /// - `expr`: A reference to a `Arc` representing the expression - /// to check for a constant value. + /// # Parameters + /// - `expr`: A reference to the physical expression to evaluate. /// /// # Returns - /// - /// Returns `Some(ScalarValue)` if the expression is known to have a constant value, - /// `None` otherwise. + /// - `AcrossPartitions::Uniform(value)`: If the expression has the same value across partitions. + /// - `AcrossPartitions::Heterogeneous`: If the expression varies across partitions. + /// - `None`: If the expression is not recognized as constant. pub fn get_expr_constant_value( &self, expr: &Arc, - ) -> Option { + ) -> AcrossPartitions { let normalized_expr = self.eq_group.normalize_expr(Arc::clone(expr)); if let Some(lit) = normalized_expr.as_any().downcast_ref::() { - return Some(lit.value().clone()); + return AcrossPartitions::Uniform(Some(lit.value().clone())); } for const_expr in self.constants.iter() { if normalized_expr.eq(const_expr.expr()) { - return const_expr.value().cloned(); + return const_expr.across_partitions(); } } - None + + AcrossPartitions::Heterogeneous } /// Retrieves the properties for a given physical expression. @@ -1210,11 +1212,9 @@ impl EquivalenceProperties { .into_iter() .map(|const_expr| { let across_partitions = const_expr.across_partitions(); - let value = const_expr.value().cloned(); let new_const_expr = with_new_schema(const_expr.owned_expr(), &schema)?; Ok(ConstExpr::new(new_const_expr) - .with_across_partitions(across_partitions) - .with_value(value)) + .with_across_partitions(across_partitions)) }) .collect::>>()?; @@ -1335,7 +1335,7 @@ fn update_properties( /// This function determines whether the provided expression is constant /// based on the known constants. /// -/// # Arguments +/// # Parameters /// /// - `constants`: A `&[Arc]` containing expressions known to /// be a constant. @@ -1990,13 +1990,15 @@ fn calculate_union_binary( let mut const_expr = ConstExpr::new(Arc::clone(lhs_const.expr())); // If both sides have matching constant values, preserve the value and set across_partitions=true - if let (Some(lhs_val), Some(rhs_val)) = - (lhs_const.value(), rhs_const.value()) + if let ( + AcrossPartitions::Uniform(Some(lhs_val)), + AcrossPartitions::Uniform(Some(rhs_val)), + ) = (lhs_const.across_partitions(), rhs_const.across_partitions()) { if lhs_val == rhs_val { - const_expr = const_expr - .with_across_partitions(true) - .with_value(Some(lhs_val.clone())); + const_expr = const_expr.with_across_partitions( + AcrossPartitions::Uniform(Some(lhs_val)), + ) } } const_expr @@ -4205,14 +4207,14 @@ mod tests { let literal_10 = ScalarValue::Int32(Some(10)); // Create first input with a=10 - let const_expr1 = - ConstExpr::new(Arc::clone(&col_a)).with_value(Some(literal_10.clone())); + let const_expr1 = ConstExpr::new(Arc::clone(&col_a)) + .with_across_partitions(AcrossPartitions::Uniform(Some(literal_10.clone()))); let input1 = EquivalenceProperties::new(Arc::clone(&schema)) .with_constants(vec![const_expr1]); // Create second input with a=10 - let const_expr2 = - ConstExpr::new(Arc::clone(&col_a)).with_value(Some(literal_10.clone())); + let const_expr2 = ConstExpr::new(Arc::clone(&col_a)) + .with_across_partitions(AcrossPartitions::Uniform(Some(literal_10.clone()))); let input2 = EquivalenceProperties::new(Arc::clone(&schema)) .with_constants(vec![const_expr2]); @@ -4222,8 +4224,10 @@ mod tests { // Verify column 'a' remains constant with value 10 let const_a = &union_props.constants()[0]; assert!(const_a.expr().eq(&col_a)); - assert!(const_a.across_partitions()); - assert_eq!(const_a.value(), Some(&literal_10)); + assert_eq!( + const_a.across_partitions(), + AcrossPartitions::Uniform(Some(literal_10)) + ); Ok(()) } diff --git a/datafusion/physical-expr/src/lib.rs b/datafusion/physical-expr/src/lib.rs index 405b6bbd69f4..4c55f4ddba93 100644 --- a/datafusion/physical-expr/src/lib.rs +++ b/datafusion/physical-expr/src/lib.rs @@ -45,7 +45,9 @@ pub mod execution_props { pub use aggregate::groups_accumulator::{GroupsAccumulatorAdapter, NullState}; pub use analysis::{analyze, AnalysisContext, ExprBoundaries}; -pub use equivalence::{calculate_union, ConstExpr, EquivalenceProperties}; +pub use equivalence::{ + calculate_union, AcrossPartitions, ConstExpr, EquivalenceProperties, +}; pub use partitioning::{Distribution, Partitioning}; pub use physical_expr::{ physical_exprs_bag_equal, physical_exprs_contains, physical_exprs_equal, diff --git a/datafusion/physical-plan/src/filter.rs b/datafusion/physical-plan/src/filter.rs index 77d7a0963952..8e7c14f0baed 100644 --- a/datafusion/physical-plan/src/filter.rs +++ b/datafusion/physical-plan/src/filter.rs @@ -45,7 +45,8 @@ use datafusion_physical_expr::expressions::BinaryExpr; use datafusion_physical_expr::intervals::utils::check_support; use datafusion_physical_expr::utils::collect_columns; use datafusion_physical_expr::{ - analyze, split_conjunction, AnalysisContext, ConstExpr, ExprBoundaries, PhysicalExpr, + analyze, split_conjunction, AcrossPartitions, AnalysisContext, ConstExpr, + ExprBoundaries, PhysicalExpr, }; use crate::execution_plan::CardinalityEffect; @@ -218,24 +219,22 @@ impl FilterExec { if binary.op() == &Operator::Eq { // Filter evaluates to single value for all partitions if input_eqs.is_expr_constant(binary.left()) { - let (expr, value) = ( + let (expr, across_parts) = ( binary.right(), input_eqs.get_expr_constant_value(binary.right()), ); res_constants.push( ConstExpr::new(Arc::clone(expr)) - .with_across_partitions(true) - .with_value(value), + .with_across_partitions(across_parts), ); } else if input_eqs.is_expr_constant(binary.right()) { - let (expr, value) = ( + let (expr, across_parts) = ( binary.left(), input_eqs.get_expr_constant_value(binary.left()), ); res_constants.push( ConstExpr::new(Arc::clone(expr)) - .with_across_partitions(true) - .with_value(value), + .with_across_partitions(across_parts), ); } } @@ -269,8 +268,7 @@ impl FilterExec { .get_value(); let expr = Arc::new(column) as _; ConstExpr::new(expr) - .with_across_partitions(true) - .with_value(value.cloned()) + .with_across_partitions(AcrossPartitions::Uniform(value.cloned())) }); // This is for statistics eq_properties = eq_properties.with_constants(constants); From 6cccb57033bb5f19290578b5c5ddce717d525a40 Mon Sep 17 00:00:00 2001 From: berkaysynnada Date: Wed, 25 Dec 2024 15:23:21 +0300 Subject: [PATCH 25/25] Update properties.rs --- datafusion/physical-expr/src/equivalence/properties.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/physical-expr/src/equivalence/properties.rs b/datafusion/physical-expr/src/equivalence/properties.rs index 24e77c899ee1..78e9fd59f166 100755 --- a/datafusion/physical-expr/src/equivalence/properties.rs +++ b/datafusion/physical-expr/src/equivalence/properties.rs @@ -120,7 +120,7 @@ use itertools::Itertools; /// PhysicalSortExpr::new_default(col_c).desc(), /// ])); /// -/// assert_eq!(eq_properties.to_string(), "order: [[a@0 ASC, c@2 DESC]], const: [b@1]") +/// assert_eq!(eq_properties.to_string(), "order: [[a@0 ASC, c@2 DESC]], const: [b@1(heterogeneous)]") /// ``` #[derive(Debug, Clone)] pub struct EquivalenceProperties {