Skip to content

Commit

Permalink
ExprBuilder for Physical Aggregate Expr (apache#11617)
Browse files Browse the repository at this point in the history
* aggregate expr builder

Signed-off-by: jayzhan211 <[email protected]>

* replace parts of test

Signed-off-by: jayzhan211 <[email protected]>

* continue

Signed-off-by: jayzhan211 <[email protected]>

* cleanup all

Signed-off-by: jayzhan211 <[email protected]>

* clipp

Signed-off-by: jayzhan211 <[email protected]>

* add sort

Signed-off-by: jayzhan211 <[email protected]>

* rm field

Signed-off-by: jayzhan211 <[email protected]>

* address comment

Signed-off-by: jayzhan211 <[email protected]>

* fix import path

Signed-off-by: jayzhan211 <[email protected]>

---------

Signed-off-by: jayzhan211 <[email protected]>
  • Loading branch information
jayzhan211 authored Jul 24, 2024
1 parent e90b3ac commit 1356934
Show file tree
Hide file tree
Showing 9 changed files with 369 additions and 367 deletions.
5 changes: 5 additions & 0 deletions datafusion/core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -545,6 +545,11 @@ pub mod optimizer {
pub use datafusion_optimizer::*;
}

/// re-export of [`datafusion_physical_expr`] crate
pub mod physical_expr_common {
pub use datafusion_physical_expr_common::*;
}

/// re-export of [`datafusion_physical_expr`] crate
pub mod physical_expr {
pub use datafusion_physical_expr::*;
Expand Down
20 changes: 6 additions & 14 deletions datafusion/core/src/physical_optimizer/aggregate_statistics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -326,7 +326,7 @@ pub(crate) mod tests {
use datafusion_functions_aggregate::count::count_udaf;
use datafusion_physical_expr::expressions::cast;
use datafusion_physical_expr::PhysicalExpr;
use datafusion_physical_expr_common::aggregate::create_aggregate_expr;
use datafusion_physical_expr_common::aggregate::AggregateExprBuilder;
use datafusion_physical_plan::aggregates::AggregateMode;

/// Mock data using a MemoryExec which has an exact count statistic
Expand Down Expand Up @@ -419,19 +419,11 @@ pub(crate) mod tests {

// Return appropriate expr depending if COUNT is for col or table (*)
pub(crate) fn count_expr(&self, schema: &Schema) -> Arc<dyn AggregateExpr> {
create_aggregate_expr(
&count_udaf(),
&[self.column()],
&[],
&[],
&[],
schema,
self.column_name(),
false,
false,
false,
)
.unwrap()
AggregateExprBuilder::new(count_udaf(), vec![self.column()])
.schema(Arc::new(schema.clone()))
.name(self.column_name())
.build()
.unwrap()
}

/// what argument would this aggregate need in the plan?
Expand Down
41 changes: 14 additions & 27 deletions datafusion/core/src/physical_optimizer/combine_partial_final_agg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ mod tests {
use datafusion_functions_aggregate::count::count_udaf;
use datafusion_functions_aggregate::sum::sum_udaf;
use datafusion_physical_expr::expressions::col;
use datafusion_physical_plan::udaf::create_aggregate_expr;
use datafusion_physical_expr_common::aggregate::AggregateExprBuilder;

/// Runs the CombinePartialFinalAggregate optimizer and asserts the plan against the expected
macro_rules! assert_optimized {
Expand Down Expand Up @@ -278,19 +278,11 @@ mod tests {
name: &str,
schema: &Schema,
) -> Arc<dyn AggregateExpr> {
create_aggregate_expr(
&count_udaf(),
&[expr],
&[],
&[],
&[],
schema,
name,
false,
false,
false,
)
.unwrap()
AggregateExprBuilder::new(count_udaf(), vec![expr])
.schema(Arc::new(schema.clone()))
.name(name)
.build()
.unwrap()
}

#[test]
Expand Down Expand Up @@ -368,19 +360,14 @@ mod tests {
#[test]
fn aggregations_with_group_combined() -> Result<()> {
let schema = schema();

let aggr_expr = vec![create_aggregate_expr(
&sum_udaf(),
&[col("b", &schema)?],
&[],
&[],
&[],
&schema,
"Sum(b)",
false,
false,
false,
)?];
let aggr_expr =
vec![
AggregateExprBuilder::new(sum_udaf(), vec![col("b", &schema)?])
.schema(Arc::clone(&schema))
.name("Sum(b)")
.build()
.unwrap(),
];
let groups: Vec<(Arc<dyn PhysicalExpr>, String)> =
vec![(col("c", &schema)?, "c".to_string())];

Expand Down
23 changes: 9 additions & 14 deletions datafusion/core/tests/fuzz_cases/aggregate_fuzz.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion, TreeNodeVisitor}
use datafusion_functions_aggregate::sum::sum_udaf;
use datafusion_physical_expr::expressions::col;
use datafusion_physical_expr::PhysicalSortExpr;
use datafusion_physical_plan::udaf::create_aggregate_expr;
use datafusion_physical_expr_common::aggregate::AggregateExprBuilder;
use datafusion_physical_plan::InputOrderMode;
use test_utils::{add_empty_batches, StringBatchGenerator};

Expand Down Expand Up @@ -103,19 +103,14 @@ async fn run_aggregate_test(input1: Vec<RecordBatch>, group_by_columns: Vec<&str
.with_sort_information(vec![sort_keys]),
);

let aggregate_expr = vec![create_aggregate_expr(
&sum_udaf(),
&[col("d", &schema).unwrap()],
&[],
&[],
&[],
&schema,
"sum1",
false,
false,
false,
)
.unwrap()];
let aggregate_expr =
vec![
AggregateExprBuilder::new(sum_udaf(), vec![col("d", &schema).unwrap()])
.schema(Arc::clone(&schema))
.name("sum1")
.build()
.unwrap(),
];
let expr = group_by_columns
.iter()
.map(|elem| (col(elem, &schema).unwrap(), elem.to_string()))
Expand Down
Loading

0 comments on commit 1356934

Please sign in to comment.