diff --git a/datafusion/physical-expr/src/equivalence/class.rs b/datafusion/physical-expr/src/equivalence/class.rs
index 03b3c7761ac6..9e535a94eb6e 100644
--- a/datafusion/physical-expr/src/equivalence/class.rs
+++ b/datafusion/physical-expr/src/equivalence/class.rs
@@ -24,7 +24,7 @@ use std::fmt::Display;
 use std::sync::Arc;

 use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
-use datafusion_common::JoinType;
+use datafusion_common::{JoinType, ScalarValue};
 use datafusion_physical_expr_common::physical_expr::format_physical_expr_list;

 use indexmap::{IndexMap, IndexSet};
@@ -55,13 +55,45 @@ use indexmap::{IndexMap, IndexSet};
 /// // create a constant expression from a physical expression
 /// let const_expr = ConstExpr::from(col);
 /// ```
+// TODO: Consider refactoring the `across_partitions` and `value` fields into an enum:
+//
+// ```
+// enum PartitionValues {
+//     Uniform(Option<ScalarValue>),           // Same value across all partitions
+//     Heterogeneous(Vec<Option<ScalarValue>>) // Different values per partition
+// }
+// ```
+//
+// This would provide a more flexible representation of partition values.
+// Note: This is a breaking change for the equivalence API and should be
+// addressed in a separate issue/PR.
 #[derive(Debug, Clone)]
 pub struct ConstExpr {
     /// The expression that is known to be constant (e.g. a `Column`)
     expr: Arc<dyn PhysicalExpr>,
     /// Does the constant have the same value across all partitions? See
     /// struct docs for more details
-    across_partitions: bool,
+    across_partitions: AcrossPartitions,
+}
+
+#[derive(PartialEq, Clone, Debug)]
+/// Represents whether a constant expression's value is uniform or varies across partitions.
+///
+/// The `AcrossPartitions` enum is used to describe the nature of a constant expression
+/// in a physical execution plan:
+///
+/// - `Heterogeneous`: The constant expression may have different values for different partitions.
+/// - `Uniform(Option<ScalarValue>)`: The constant expression has the same value across all
+///   partitions; the inner value is `None` when that shared value is not known.
+pub enum AcrossPartitions {
+    Heterogeneous,
+    Uniform(Option<ScalarValue>),
+}
+
+impl Default for AcrossPartitions {
+    fn default() -> Self {
+        Self::Heterogeneous
+    }
 }

 impl PartialEq for ConstExpr {
@@ -79,14 +111,14 @@ impl ConstExpr {
         Self {
             expr,
             // By default, assume constant expressions are not same across partitions.
-            across_partitions: false,
+            across_partitions: Default::default(),
         }
     }

     /// Set the `across_partitions` flag
     ///
     /// See struct docs for more details
-    pub fn with_across_partitions(mut self, across_partitions: bool) -> Self {
+    pub fn with_across_partitions(mut self, across_partitions: AcrossPartitions) -> Self {
         self.across_partitions = across_partitions;
         self
     }
@@ -94,8 +126,8 @@ impl ConstExpr {
     /// Is the expression the same across all partitions?
     ///
     /// See struct docs for more details
-    pub fn across_partitions(&self) -> bool {
-        self.across_partitions
+    pub fn across_partitions(&self) -> AcrossPartitions {
+        self.across_partitions.clone()
     }

     pub fn expr(&self) -> &Arc<dyn PhysicalExpr> {
@@ -113,7 +145,7 @@ impl ConstExpr {
         let maybe_expr = f(&self.expr);
         maybe_expr.map(|expr| Self {
             expr,
-            across_partitions: self.across_partitions,
+            across_partitions: self.across_partitions.clone(),
         })
     }

@@ -143,14 +175,20 @@ impl ConstExpr {
     }
 }

-/// Display implementation for `ConstExpr`
-///
-/// Example `c` or `c(across_partitions)`
 impl Display for ConstExpr {
-    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
         write!(f, "{}", self.expr)?;
-        if self.across_partitions {
-            write!(f, "(across_partitions)")?;
+        match &self.across_partitions {
+            AcrossPartitions::Heterogeneous => {
+                write!(f, "(heterogeneous)")?;
+            }
+            AcrossPartitions::Uniform(value) => {
+                if let Some(val) = value {
+                    write!(f, "(uniform: {})", val)?;
+                } else {
+                    write!(f, "(uniform: unknown)")?;
+                }
+            }
         }
         Ok(())
     }
diff --git a/datafusion/physical-expr/src/equivalence/mod.rs b/datafusion/physical-expr/src/equivalence/mod.rs
index 902e53a7f236..93d42adbe5de 100644
--- a/datafusion/physical-expr/src/equivalence/mod.rs
+++ b/datafusion/physical-expr/src/equivalence/mod.rs
@@ -27,7 +27,7 @@ mod ordering;
 mod projection;
 mod properties;

-pub use class::{ConstExpr, EquivalenceClass, EquivalenceGroup};
+pub use class::{AcrossPartitions, ConstExpr, EquivalenceClass, EquivalenceGroup};
 pub use ordering::OrderingEquivalenceClass;
 pub use projection::ProjectionMapping;
 pub use properties::{
diff --git a/datafusion/physical-expr/src/equivalence/ordering.rs b/datafusion/physical-expr/src/equivalence/ordering.rs
index 06f85b657e09..24e2fc7dbaf5 100644
--- a/datafusion/physical-expr/src/equivalence/ordering.rs
+++ b/datafusion/physical-expr/src/equivalence/ordering.rs
@@ -262,7 +262,7 @@ mod tests {
     };
     use crate::expressions::{col, BinaryExpr, Column};
     use crate::utils::tests::TestScalarUDF;
-    use crate::{ConstExpr, PhysicalExpr, PhysicalSortExpr};
+    use crate::{AcrossPartitions, ConstExpr, PhysicalExpr, PhysicalSortExpr};

     use arrow::datatypes::{DataType, Field, Schema};
     use arrow_schema::SortOptions;
@@ -583,9 +583,10 @@ mod tests {
         let eq_group = EquivalenceGroup::new(eq_group);
         eq_properties.add_equivalence_group(eq_group);

-        let constants = constants
-            .into_iter()
-            .map(|expr| ConstExpr::from(expr).with_across_partitions(true));
+        let constants = constants.into_iter().map(|expr| {
+            ConstExpr::from(expr)
+                .with_across_partitions(AcrossPartitions::Uniform(None))
+        });
         eq_properties = eq_properties.with_constants(constants);

         let reqs = convert_to_sort_exprs(&reqs);
diff --git a/datafusion/physical-expr/src/equivalence/properties.rs b/datafusion/physical-expr/src/equivalence/properties.rs
index f019b2e570ff..78e9fd59f166 100755
--- a/datafusion/physical-expr/src/equivalence/properties.rs
+++ b/datafusion/physical-expr/src/equivalence/properties.rs
@@ -23,7 +23,7 @@ use std::sync::Arc;
 use std::{fmt, mem};

 use super::ordering::collapse_lex_ordering;
-use crate::equivalence::class::const_exprs_contains;
+use crate::equivalence::class::{const_exprs_contains, AcrossPartitions};
 use crate::equivalence::{
     collapse_lex_req, EquivalenceClass, EquivalenceGroup, OrderingEquivalenceClass,
     ProjectionMapping,
@@ -120,7 +120,7 @@ use itertools::Itertools;
 ///     PhysicalSortExpr::new_default(col_c).desc(),
 /// ]));
 ///
-/// assert_eq!(eq_properties.to_string(), "order: [[a@0 ASC, c@2 DESC]], const: [b@1]")
+/// assert_eq!(eq_properties.to_string(), "order: [[a@0 ASC, c@2 DESC]], const: [b@1(heterogeneous)]")
 /// ```
 #[derive(Debug, Clone)]
 pub struct EquivalenceProperties {
@@ -217,7 +217,9 @@ impl EquivalenceProperties {
     /// Removes constant expressions that may change across partitions.
     /// This method should be used when data from different partitions are merged.
     pub fn clear_per_partition_constants(&mut self) {
-        self.constants.retain(|item| item.across_partitions());
+        self.constants.retain(|item| {
+            matches!(item.across_partitions(), AcrossPartitions::Uniform(_))
+        })
     }

     /// Extends this `EquivalenceProperties` by adding the orderings inside the
@@ -257,14 +259,16 @@ impl EquivalenceProperties {
         if self.is_expr_constant(left) {
             // Left expression is constant, add right as constant
             if !const_exprs_contains(&self.constants, right) {
-                self.constants
-                    .push(ConstExpr::from(right).with_across_partitions(true));
+                let const_expr = ConstExpr::from(right)
+                    .with_across_partitions(self.get_expr_constant_value(left));
+                self.constants.push(const_expr);
             }
         } else if self.is_expr_constant(right) {
             // Right expression is constant, add left as constant
             if !const_exprs_contains(&self.constants, left) {
-                self.constants
-                    .push(ConstExpr::from(left).with_across_partitions(true));
+                let const_expr = ConstExpr::from(left)
+                    .with_across_partitions(self.get_expr_constant_value(right));
+                self.constants.push(const_expr);
             }
         }

@@ -293,30 +297,28 @@ impl EquivalenceProperties {
         mut self,
         constants: impl IntoIterator<Item = ConstExpr>,
     ) -> Self {
-        let (const_exprs, across_partition_flags): (
-            Vec<Arc<dyn PhysicalExpr>>,
-            Vec<bool>,
-        ) = constants
+        let normalized_constants = constants
             .into_iter()
-            .map(|const_expr| {
-                let across_partitions = const_expr.across_partitions();
-                let expr = const_expr.owned_expr();
-                (expr, across_partitions)
+            .filter_map(|c| {
+                let across_partitions = c.across_partitions();
+                let expr = c.owned_expr();
+                let normalized_expr = self.eq_group.normalize_expr(expr);
+
+                if const_exprs_contains(&self.constants, &normalized_expr) {
+                    return None;
+                }
+
+                let const_expr = ConstExpr::from(normalized_expr)
+                    .with_across_partitions(across_partitions);
+
+                Some(const_expr)
             })
-            .unzip();
-        for (expr, across_partitions) in self
-            .eq_group
-            .normalize_exprs(const_exprs)
-            .into_iter()
-            .zip(across_partition_flags)
-        {
-            if !const_exprs_contains(&self.constants, &expr) {
-                let const_expr =
-                    ConstExpr::from(expr).with_across_partitions(across_partitions);
-                self.constants.push(const_expr);
-            }
-        }
+            .collect::<Vec<_>>();
+
+        // Add all new normalized constants
+        self.constants.extend(normalized_constants);
+
+        // Discover any new orderings based on the constants
         for ordering in self.normalized_oeq_class().iter() {
             if let Err(e) = self.discover_new_orderings(&ordering[0].expr) {
                 log::debug!("error discovering new orderings: {e}");
@@ -551,7 +553,7 @@ impl EquivalenceProperties {
     /// is satisfied based on the orderings within, equivalence classes, and
     /// constant expressions.
     ///
-    /// # Arguments
+    /// # Parameters
     ///
     /// - `req`: A reference to a `PhysicalSortRequirement` for which the ordering
     ///   satisfaction check will be done.
@@ -919,7 +921,7 @@ impl EquivalenceProperties {
     /// constants based on the existing constants and the mapping. It ensures
     /// that constants are appropriately propagated through the projection.
     ///
-    /// # Arguments
+    /// # Parameters
     ///
     /// - `mapping`: A reference to a `ProjectionMapping` representing the
     ///   mapping of source expressions to target expressions in the projection.
@@ -935,19 +937,31 @@ impl EquivalenceProperties {
             .constants
             .iter()
             .flat_map(|const_expr| {
-                const_expr.map(|expr| self.eq_group.project_expr(mapping, expr))
+                const_expr
+                    .map(|expr| self.eq_group.project_expr(mapping, expr))
+                    .map(|projected_expr| {
+                        projected_expr
+                            .with_across_partitions(const_expr.across_partitions())
+                    })
             })
             .collect::<Vec<_>>();

+        // Add projection expressions that are known to be constant:
         for (source, target) in mapping.iter() {
             if self.is_expr_constant(source)
                 && !const_exprs_contains(&projected_constants, target)
             {
-                let across_partitions = self.is_expr_constant_accross_partitions(source);
-                // Expression evaluates to single value
-                projected_constants.push(
-                    ConstExpr::from(target).with_across_partitions(across_partitions),
-                );
+                if self.is_expr_constant_accross_partitions(source) {
+                    projected_constants.push(
+                        ConstExpr::from(target)
+                            .with_across_partitions(self.get_expr_constant_value(source)),
+                    )
+                } else {
+                    projected_constants.push(
+                        ConstExpr::from(target)
+                            .with_across_partitions(AcrossPartitions::Heterogeneous),
+                    )
+                }
             }
         }
         projected_constants
@@ -1054,7 +1068,7 @@ impl EquivalenceProperties {
     /// This function determines whether the provided expression is constant
     /// based on the known constants.
     ///
-    /// # Arguments
+    /// # Parameters
     ///
     /// - `expr`: A reference to a `Arc<dyn PhysicalExpr>` representing the
     ///   expression to be checked.
@@ -1079,7 +1093,7 @@ impl EquivalenceProperties {
     /// This function determines whether the provided expression is constant
     /// across partitions based on the known constants.
     ///
-    /// # Arguments
+    /// # Parameters
     ///
     /// - `expr`: A reference to a `Arc<dyn PhysicalExpr>` representing the
     ///   expression to be checked.
@@ -1095,18 +1109,57 @@ impl EquivalenceProperties {
         // As an example, assume that we know columns `a` and `b` are constant.
         // Then, `a`, `b` and `a + b` will all return `true` whereas `c` will
        // return `false`.
-        let const_exprs = self.constants.iter().flat_map(|const_expr| {
-            if const_expr.across_partitions() {
-                Some(Arc::clone(const_expr.expr()))
-            } else {
-                None
-            }
-        });
+        let const_exprs = self
+            .constants
+            .iter()
+            .filter_map(|const_expr| {
+                if matches!(
+                    const_expr.across_partitions(),
+                    AcrossPartitions::Uniform { .. }
+                ) {
+                    Some(Arc::clone(const_expr.expr()))
+                } else {
+                    None
+                }
+            })
+            .collect::<Vec<_>>();
         let normalized_constants = self.eq_group.normalize_exprs(const_exprs);
         let normalized_expr = self.eq_group.normalize_expr(Arc::clone(expr));
         is_constant_recurse(&normalized_constants, &normalized_expr)
     }

+    /// Retrieves the constant value of a given physical expression, if it exists.
+    ///
+    /// Normalizes the input expression and checks if it matches any known constants
+    /// in the current context. Returns whether the expression has a uniform value,
+    /// varies across partitions, or is not known to be constant.
+    ///
+    /// # Parameters
+    /// - `expr`: A reference to the physical expression to evaluate.
+    ///
+    /// # Returns
+    /// - `AcrossPartitions::Uniform(value)`: If the expression has the same value across all partitions.
+    /// - `AcrossPartitions::Heterogeneous`: If the expression varies across partitions or is not
+    ///   recognized as constant.
+    pub fn get_expr_constant_value(
+        &self,
+        expr: &Arc<dyn PhysicalExpr>,
+    ) -> AcrossPartitions {
+        let normalized_expr = self.eq_group.normalize_expr(Arc::clone(expr));
+
+        if let Some(lit) = normalized_expr.as_any().downcast_ref::<Literal>() {
+            return AcrossPartitions::Uniform(Some(lit.value().clone()));
+        }
+
+        for const_expr in self.constants.iter() {
+            if normalized_expr.eq(const_expr.expr()) {
+                return const_expr.across_partitions();
+            }
+        }
+
+        AcrossPartitions::Heterogeneous
+    }
+
     /// Retrieves the properties for a given physical expression.
     ///
     /// This function constructs an [`ExprProperties`] object for the given
@@ -1282,7 +1335,7 @@ fn update_properties(
 /// This function determines whether the provided expression is constant
 /// based on the known constants.
 ///
-/// # Arguments
+/// # Parameters
 ///
 /// - `constants`: A `&[Arc<dyn PhysicalExpr>]` containing expressions known to
 ///   be a constant.
@@ -1915,7 +1968,7 @@ impl Hash for ExprWrapper {
 /// *all* output partitions, that is the same as being true for all *input*
 /// partitions
 fn calculate_union_binary(
-    mut lhs: EquivalenceProperties,
+    lhs: EquivalenceProperties,
     mut rhs: EquivalenceProperties,
 ) -> Result<EquivalenceProperties> {
     // Harmonize the schema of the rhs with the schema of the lhs (which is the accumulator schema):
@@ -1924,26 +1977,34 @@ fn calculate_union_binary(
     }

     // First, calculate valid constants for the union. An expression is constant
-    // at the output of the union if it is constant in both sides.
-    let constants: Vec<_> = lhs
+    // at the output of the union if it is constant in both sides with matching values.
+    let constants = lhs
         .constants()
         .iter()
-        .filter(|const_expr| const_exprs_contains(rhs.constants(), const_expr.expr()))
-        .map(|const_expr| {
-            // TODO: When both sides have a constant column, and the actual
-            // constant value is the same, then the output properties could
-            // reflect the constant is valid across all partitions. However we
-            // don't track the actual value that the ConstExpr takes on, so we
-            // can't determine that yet
-            ConstExpr::new(Arc::clone(const_expr.expr())).with_across_partitions(false)
+        .filter_map(|lhs_const| {
+            // Find matching constant expression in RHS
+            rhs.constants()
+                .iter()
+                .find(|rhs_const| rhs_const.expr().eq(lhs_const.expr()))
+                .map(|rhs_const| {
+                    let mut const_expr = ConstExpr::new(Arc::clone(lhs_const.expr()));
+
+                    // If both sides have matching constant values, preserve the value
+                    // and mark the constant as uniform across partitions
+                    if let (
+                        AcrossPartitions::Uniform(Some(lhs_val)),
+                        AcrossPartitions::Uniform(Some(rhs_val)),
+                    ) = (lhs_const.across_partitions(), rhs_const.across_partitions())
+                    {
+                        if lhs_val == rhs_val {
+                            const_expr = const_expr.with_across_partitions(
+                                AcrossPartitions::Uniform(Some(lhs_val)),
+                            )
+                        }
+                    }
+                    const_expr
+                })
         })
-        .collect();
-
-    // remove any constants that are shared in both outputs (avoid double counting them)
-    for c in &constants {
-        lhs = lhs.remove_constant(c);
-        rhs = rhs.remove_constant(c);
-    }
+        .collect::<Vec<_>>();

     // Next, calculate valid orderings for the union by searching for prefixes
     // in both sides.
@@ -2210,6 +2271,7 @@ mod tests {
     use arrow::datatypes::{DataType, Field, Schema};
     use arrow_schema::{Fields, TimeUnit};
+    use datafusion_common::ScalarValue;
     use datafusion_expr::Operator;

     use datafusion_functions::string::concat;
@@ -4133,4 +4195,40 @@ mod tests {

         Ok(())
     }
+
+    #[test]
+    fn test_union_constant_value_preservation() -> Result<()> {
+        let schema = Arc::new(Schema::new(vec![
+            Field::new("a", DataType::Int32, true),
+            Field::new("b", DataType::Int32, true),
+        ]));
+
+        let col_a = col("a", &schema)?;
+        let literal_10 = ScalarValue::Int32(Some(10));
+
+        // Create first input with a=10
+        let const_expr1 = ConstExpr::new(Arc::clone(&col_a))
+            .with_across_partitions(AcrossPartitions::Uniform(Some(literal_10.clone())));
+        let input1 = EquivalenceProperties::new(Arc::clone(&schema))
+            .with_constants(vec![const_expr1]);
+
+        // Create second input with a=10
+        let const_expr2 = ConstExpr::new(Arc::clone(&col_a))
+            .with_across_partitions(AcrossPartitions::Uniform(Some(literal_10.clone())));
+        let input2 = EquivalenceProperties::new(Arc::clone(&schema))
+            .with_constants(vec![const_expr2]);
+
+        // Calculate union properties
+        let union_props = calculate_union(vec![input1, input2], schema)?;
+
+        // Verify column 'a' remains constant with value 10
+        let const_a = &union_props.constants()[0];
+        assert!(const_a.expr().eq(&col_a));
+        assert_eq!(
+            const_a.across_partitions(),
+            AcrossPartitions::Uniform(Some(literal_10))
+        );
+
+        Ok(())
+    }
 }
diff --git a/datafusion/physical-expr/src/lib.rs b/datafusion/physical-expr/src/lib.rs
index 405b6bbd69f4..4c55f4ddba93 100644
--- a/datafusion/physical-expr/src/lib.rs
+++ b/datafusion/physical-expr/src/lib.rs
@@ -45,7 +45,9 @@ pub mod execution_props {

 pub use aggregate::groups_accumulator::{GroupsAccumulatorAdapter, NullState};
 pub use analysis::{analyze, AnalysisContext, ExprBoundaries};
-pub use equivalence::{calculate_union, ConstExpr, EquivalenceProperties};
+pub use equivalence::{
+    calculate_union, AcrossPartitions, ConstExpr, EquivalenceProperties,
+};
 pub use partitioning::{Distribution, Partitioning};
 pub use physical_expr::{
     physical_exprs_bag_equal, physical_exprs_contains, physical_exprs_equal,
diff --git a/datafusion/physical-plan/src/filter.rs b/datafusion/physical-plan/src/filter.rs
index 901907cf38fa..8e7c14f0baed 100644
--- a/datafusion/physical-plan/src/filter.rs
+++ b/datafusion/physical-plan/src/filter.rs
@@ -45,7 +45,8 @@ use datafusion_physical_expr::expressions::BinaryExpr;
 use datafusion_physical_expr::intervals::utils::check_support;
 use datafusion_physical_expr::utils::collect_columns;
 use datafusion_physical_expr::{
-    analyze, split_conjunction, AnalysisContext, ConstExpr, ExprBoundaries, PhysicalExpr,
+    analyze, split_conjunction, AcrossPartitions, AnalysisContext, ConstExpr,
+    ExprBoundaries, PhysicalExpr,
 };

 use crate::execution_plan::CardinalityEffect;
@@ -218,13 +219,23 @@ impl FilterExec {
         if binary.op() == &Operator::Eq {
             // Filter evaluates to single value for all partitions
             if input_eqs.is_expr_constant(binary.left()) {
+                let (expr, across_parts) = (
+                    binary.right(),
+                    input_eqs.get_expr_constant_value(binary.right()),
+                );
                 res_constants.push(
-                    ConstExpr::from(binary.right()).with_across_partitions(true),
-                )
+                    ConstExpr::new(Arc::clone(expr))
+                        .with_across_partitions(across_parts),
+                );
             } else if input_eqs.is_expr_constant(binary.right()) {
+                let (expr, across_parts) = (
+                    binary.left(),
+                    input_eqs.get_expr_constant_value(binary.left()),
+                );
                 res_constants.push(
-                    ConstExpr::from(binary.left()).with_across_partitions(true),
-                )
+                    ConstExpr::new(Arc::clone(expr))
+                        .with_across_partitions(across_parts),
+                );
             }
         }
     }
@@ -252,8 +263,12 @@ impl FilterExec {
             .into_iter()
             .filter(|column| stats.column_statistics[column.index()].is_singleton())
             .map(|column| {
+                let value = stats.column_statistics[column.index()]
+                    .min_value
+                    .get_value();
                 let expr = Arc::new(column) as _;
-                ConstExpr::new(expr).with_across_partitions(true)
+                ConstExpr::new(expr)
+                    .with_across_partitions(AcrossPartitions::Uniform(value.cloned()))
             });
         // This is for statistics
         eq_properties = eq_properties.with_constants(constants);
diff --git a/datafusion/sqllogictest/test_files/aggregate.slt b/datafusion/sqllogictest/test_files/aggregate.slt
index 94c629707379..cd62e5625342 100644
--- a/datafusion/sqllogictest/test_files/aggregate.slt
+++ b/datafusion/sqllogictest/test_files/aggregate.slt
@@ -259,7 +259,7 @@ logical_plan
 15)------------EmptyRelation
 physical_plan
 01)ProjectionExec: expr=[array_length(array_agg(DISTINCT a.foo)@1) as array_length(array_agg(DISTINCT a.foo)), sum(DISTINCT Int64(1))@2 as sum(DISTINCT Int64(1))]
-02)--AggregateExec: mode=FinalPartitioned, gby=[id@0 as id], aggr=[array_agg(DISTINCT a.foo), sum(DISTINCT Int64(1))]
+02)--AggregateExec: mode=FinalPartitioned, gby=[id@0 as id], aggr=[array_agg(DISTINCT a.foo), sum(DISTINCT Int64(1))], ordering_mode=Sorted
 03)----CoalesceBatchesExec: target_batch_size=8192
 04)------RepartitionExec: partitioning=Hash([id@0], 4), input_partitions=5
 05)--------AggregateExec: mode=Partial, gby=[id@0 as id], aggr=[array_agg(DISTINCT a.foo), sum(DISTINCT Int64(1))], ordering_mode=Sorted
diff --git a/datafusion/sqllogictest/test_files/union.slt b/datafusion/sqllogictest/test_files/union.slt
index d94780744db9..7b8992b966ad 100644
--- a/datafusion/sqllogictest/test_files/union.slt
+++ b/datafusion/sqllogictest/test_files/union.slt
@@ -777,3 +777,62 @@ select make_array(make_array(1)) x UNION ALL SELECT make_array(arrow_cast(make_a
 ----
 [[-1]]
 [[1]]
+
+statement ok
+CREATE EXTERNAL TABLE aggregate_test_100 (
+  c1  VARCHAR NOT NULL,
+  c2  TINYINT NOT NULL,
+  c3  SMALLINT NOT NULL,
+  c4  SMALLINT,
+  c5  INT,
+  c6  BIGINT NOT NULL,
+  c7  SMALLINT NOT NULL,
+  c8  INT NOT NULL,
+  c9  BIGINT UNSIGNED NOT NULL,
+  c10 VARCHAR NOT NULL,
+  c11 FLOAT NOT NULL,
+  c12 DOUBLE NOT NULL,
+  c13 VARCHAR NOT NULL
+)
+STORED AS CSV
+LOCATION '../../testing/data/csv/aggregate_test_100.csv'
+OPTIONS ('format.has_header' 'true');
+
+statement ok
+set datafusion.execution.batch_size = 2;
+
+# Constant value tracking across union
+query TT
+explain
+SELECT * FROM(
+(
+    SELECT * FROM aggregate_test_100 WHERE c1='a'
+)
+UNION ALL
+(
+    SELECT * FROM aggregate_test_100 WHERE c1='a'
+))
+ORDER BY c1
+----
+logical_plan
+01)Sort: aggregate_test_100.c1 ASC NULLS LAST
+02)--Union
+03)----Filter: aggregate_test_100.c1 = Utf8("a")
+04)------TableScan: aggregate_test_100 projection=[c1, c2, c3, c4, c5, c6, c7, c8, c9, c10, c11, c12, c13], partial_filters=[aggregate_test_100.c1 = Utf8("a")]
+05)----Filter: aggregate_test_100.c1 = Utf8("a")
+06)------TableScan: aggregate_test_100 projection=[c1, c2, c3, c4, c5, c6, c7, c8, c9, c10, c11, c12, c13], partial_filters=[aggregate_test_100.c1 = Utf8("a")]
+physical_plan
+01)CoalescePartitionsExec
+02)--UnionExec
+03)----CoalesceBatchesExec: target_batch_size=2
+04)------FilterExec: c1@0 = a
+05)--------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
+06)----------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1, c2, c3, c4, c5, c6, c7, c8, c9, c10, c11, c12, c13], has_header=true
+07)----CoalesceBatchesExec: target_batch_size=2
+08)------FilterExec: c1@0 = a
+09)--------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
+10)----------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1, c2, c3, c4, c5, c6, c7, c8, c9, c10, c11, c12, c13], has_header=true
+
+# Clean up after the test
+statement ok
+drop table aggregate_test_100;
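For reference, a minimal sketch (not part of the patch) of how the `AcrossPartitions` API introduced above reads from the caller side. It uses only items added or re-exported by this diff (`AcrossPartitions`, `ConstExpr`, `EquivalenceProperties`, `get_expr_constant_value`); the schema, the column name `a`, and the literal value `10` are illustrative assumptions.

```rust
use std::sync::Arc;

use arrow::datatypes::{DataType, Field, Schema};
use datafusion_common::{Result, ScalarValue};
use datafusion_physical_expr::expressions::col;
use datafusion_physical_expr::{AcrossPartitions, ConstExpr, EquivalenceProperties};

fn main() -> Result<()> {
    // Illustrative schema and column; not taken from the patch.
    let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, true)]));
    let col_a = col("a", &schema)?;

    // A constant known to be 10 in every partition: the value travels with the ConstExpr.
    let uniform = ConstExpr::new(Arc::clone(&col_a)).with_across_partitions(
        AcrossPartitions::Uniform(Some(ScalarValue::Int32(Some(10)))),
    );
    assert_eq!(uniform.to_string(), "a@0(uniform: 10)");

    // A constant whose value may differ from partition to partition.
    let per_partition = ConstExpr::new(Arc::clone(&col_a))
        .with_across_partitions(AcrossPartitions::Heterogeneous);
    assert_eq!(per_partition.to_string(), "a@0(heterogeneous)");

    // Equivalence properties retain the value, so callers can query it back.
    let props = EquivalenceProperties::new(schema).with_constants(vec![uniform]);
    assert_eq!(
        props.get_expr_constant_value(&col_a),
        AcrossPartitions::Uniform(Some(ScalarValue::Int32(Some(10))))
    );
    Ok(())
}
```

Carrying the actual `ScalarValue` instead of a bare flag is what allows `calculate_union_binary` above to keep a constant uniform across partitions when both union inputs agree on its value, which in turn lets the planner drop the sort in the new `union.slt` test.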