Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Deprecate builtin first/last aggregate function and use UDAF #10091

Closed
wants to merge 105 commits into from
Closed
Show file tree
Hide file tree
Changes from 102 commits
Commits
Show all changes
105 commits
Select commit Hold shift + click to select a range
0eaf289
backup
jayzhan211 Apr 3, 2024
5338f61
move PhysicalExpr
jayzhan211 Apr 3, 2024
450ae4b
cleanup
jayzhan211 Apr 3, 2024
3624964
move physical sort
jayzhan211 Apr 3, 2024
835f147
cleanup dependencies
jayzhan211 Apr 3, 2024
c5d80c8
add readme
jayzhan211 Apr 3, 2024
7851de7
disable doc test
jayzhan211 Apr 3, 2024
f5aafb3
move column
jayzhan211 Apr 4, 2024
7bfc074
fmt
jayzhan211 Apr 4, 2024
675d2fe
move aggregatexp
jayzhan211 Apr 4, 2024
5220087
move other two utils
jayzhan211 Apr 4, 2024
113a000
license
jayzhan211 Apr 4, 2024
fea87e3
switch to ignore
jayzhan211 Apr 4, 2024
06d87bc
move reverse order
jayzhan211 Apr 4, 2024
26e5782
rename to common
jayzhan211 Apr 4, 2024
26f852c
cleanup
jayzhan211 Apr 5, 2024
65bf4a1
Merge branch 'physical-expr-core' into move-agg-crate-2
jayzhan211 Apr 5, 2024
2bc58c1
backup
jayzhan211 Apr 5, 2024
ae9db96
Merge branch 'physical-expr-core' into move-agg-crate-2
jayzhan211 Apr 5, 2024
30d5576
move acc to first value
jayzhan211 Apr 5, 2024
672edc7
move builtin expr too
jayzhan211 Apr 5, 2024
109b790
use macro
jayzhan211 Apr 5, 2024
87d589f
fmt
jayzhan211 Apr 5, 2024
398e4e2
fix doc
jayzhan211 Apr 5, 2024
04c7f5e
add todo
jayzhan211 Apr 5, 2024
01a1ddf
rm comments
jayzhan211 Apr 5, 2024
4871414
rm unused
jayzhan211 Apr 5, 2024
1ef212b
rm unused code
jayzhan211 Apr 5, 2024
b6d53a5
change to private
jayzhan211 Apr 5, 2024
9aa15a2
fix lock
jayzhan211 Apr 5, 2024
e90464b
cleanup
jayzhan211 Apr 5, 2024
ece925f
cleanup
jayzhan211 Apr 5, 2024
89ccc89
support roundtrip
jayzhan211 Apr 5, 2024
41a830a
remmove old format state
jayzhan211 Apr 6, 2024
ec3baa7
replace with last-udf
jayzhan211 Apr 6, 2024
6801bc2
introduce expr with arguments
jayzhan211 Apr 6, 2024
224bce0
remove proto
jayzhan211 Apr 6, 2024
7a38494
fix signature
jayzhan211 Apr 6, 2024
8de10c7
todo move AggregateFunctionExpr
jayzhan211 Apr 6, 2024
d235d2a
move aggregate related things to aggr crate
jayzhan211 Apr 6, 2024
dc886e4
Merge branch 'move-agg-crate-2' into last-udf
jayzhan211 Apr 6, 2024
8caf8b8
backup
jayzhan211 Apr 6, 2024
51cd272
move back to common
jayzhan211 Apr 6, 2024
38b2ce7
taplo
jayzhan211 Apr 6, 2024
7205bdc
Merge branch 'move-agg-crate-2' into last-udf
jayzhan211 Apr 6, 2024
35fcc26
backup
jayzhan211 Apr 6, 2024
ecbfe88
backup
jayzhan211 Apr 7, 2024
1b70b34
Merge remote-tracking branch 'upstream/main' into last-udf
jayzhan211 Apr 7, 2024
e506662
move out the ordering ruel
jayzhan211 Apr 10, 2024
2812886
introduce rule
jayzhan211 Apr 10, 2024
d20dde3
revert test result
jayzhan211 Apr 10, 2024
c671033
pass mulit order test
jayzhan211 Apr 10, 2024
3f43dc1
cleanup
jayzhan211 Apr 10, 2024
c8b46a4
with new childes
jayzhan211 Apr 10, 2024
fcb297b
revert slt
jayzhan211 Apr 10, 2024
ab10168
revert back
jayzhan211 Apr 10, 2024
fffb54a
rm rewrite in new child
jayzhan211 Apr 10, 2024
fec3eaf
backup
jayzhan211 Apr 10, 2024
5b68b48
only move conversion to optimizer
jayzhan211 Apr 10, 2024
2ab74c9
find test that do reverse
jayzhan211 Apr 11, 2024
03abf19
add test for first and last
jayzhan211 Apr 11, 2024
83c721a
pass all test
jayzhan211 Apr 11, 2024
34b8f01
upd test
jayzhan211 Apr 11, 2024
b17e8f7
upd test
jayzhan211 Apr 12, 2024
a313af0
cleanup
jayzhan211 Apr 12, 2024
a37fb3e
cleanup
jayzhan211 Apr 12, 2024
8de501c
cleanup
jayzhan211 Apr 12, 2024
107f4c8
add aggregate test
jayzhan211 Apr 12, 2024
345d4c6
cleanup
jayzhan211 Apr 12, 2024
030d7b9
final draft
jayzhan211 Apr 12, 2024
cf3f33e
cleanup again
jayzhan211 Apr 12, 2024
47699fe
backup
jayzhan211 Apr 12, 2024
9d59703
Merge remote-tracking branch 'origin/aggr-rule' into last-udf
jayzhan211 Apr 12, 2024
6d9ee9f
pull out finer ordering code and reuse
jayzhan211 Apr 13, 2024
abd40c6
clippy
jayzhan211 Apr 13, 2024
c1a7f60
Merge remote-tracking branch 'origin/aggr-rule' into last-udf
jayzhan211 Apr 13, 2024
ca99822
Merge remote-tracking branch 'upstream/main' into last-udf
jayzhan211 Apr 13, 2024
bf9c05b
fix convert firstlast
jayzhan211 Apr 13, 2024
a3a773d
pass all test
jayzhan211 Apr 13, 2024
3e2c186
remove finer in optimize rule
jayzhan211 Apr 13, 2024
d7c2590
add comments and clenaup
jayzhan211 Apr 13, 2024
fed356a
rename fun
jayzhan211 Apr 13, 2024
0d3d461
rename fun
jayzhan211 Apr 13, 2024
d3daa4e
fmt
jayzhan211 Apr 13, 2024
dcb3e90
avoid unnecessary recursion and rename
jayzhan211 Apr 13, 2024
2951fbc
remove unnecessary rule
jayzhan211 Apr 15, 2024
af45a22
Merge remote-tracking branch 'upstream/main' into aggr-rule
jayzhan211 Apr 15, 2024
35b84ae
Merge remote-tracking branch 'upstream/main' into aggr-rule
jayzhan211 Apr 15, 2024
fffcd3b
fix merge
jayzhan211 Apr 15, 2024
1b12274
Merge remote-tracking branch 'upstream/main' into last-udf-3
jayzhan211 Apr 15, 2024
2aae217
Merge branch 'aggr-rule' into last-udf-3
jayzhan211 Apr 15, 2024
55f6011
cleanup
jayzhan211 Apr 15, 2024
38f5fa5
remove builtin first and last
jayzhan211 Apr 15, 2024
3eb1cf6
remove expr
jayzhan211 Apr 15, 2024
6b07330
rm test1
jayzhan211 Apr 15, 2024
16120e6
cleanup
jayzhan211 Apr 15, 2024
acba534
Merge remote-tracking branch 'upstream/main' into last-udf-3
jayzhan211 Apr 15, 2024
e36b3a3
cleanup
jayzhan211 Apr 15, 2024
c98b6c6
Merge remote-tracking branch 'upstream/main' into last-udf-3
jayzhan211 Apr 15, 2024
51aa6f9
fix func name
jayzhan211 Apr 15, 2024
8edad9e
pull out reverse sort exprs
jayzhan211 Apr 16, 2024
431df45
support all args for first/last expr_fn
jayzhan211 Apr 20, 2024
dae4a28
rm once-cell
jayzhan211 Apr 20, 2024
38b7c96
Merge remote-tracking branch 'upstream/main' into last-udf-3
jayzhan211 May 1, 2024
c4a62f5
fix name
jayzhan211 May 1, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions datafusion-cli/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions datafusion/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ datafusion-functions-aggregate = { workspace = true }
datafusion-functions-array = { workspace = true, optional = true }
datafusion-optimizer = { workspace = true }
datafusion-physical-expr = { workspace = true }
datafusion-physical-expr-common = { workspace = true }
datafusion-physical-plan = { workspace = true }
datafusion-sql = { workspace = true }
flate2 = { version = "1.0.24", optional = true }
Expand Down
83 changes: 65 additions & 18 deletions datafusion/core/src/physical_optimizer/convert_first_last.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,12 @@ use datafusion_common::{
config::ConfigOptions,
tree_node::{Transformed, TransformedResult, TreeNode},
};
use datafusion_physical_expr::expressions::{FirstValue, LastValue};
use datafusion_physical_expr::{
equivalence::ProjectionMapping, reverse_order_bys, AggregateExpr,
EquivalenceProperties, PhysicalSortRequirement,
};
use datafusion_expr::Expr;
use datafusion_functions_aggregate::first_last::{FirstValue, LastValue};
use datafusion_physical_expr::{equivalence::ProjectionMapping, EquivalenceProperties};
use datafusion_physical_expr_common::aggregate::{AggregateExpr, AggregateFunctionExpr};
use datafusion_physical_expr_common::sort_expr::PhysicalSortRequirement;
use datafusion_physical_expr_common::utils::reverse_order_bys;
use datafusion_physical_plan::aggregates::concat_slices;
use datafusion_physical_plan::{
aggregates::{AggregateExec, AggregateMode},
Expand Down Expand Up @@ -199,15 +200,15 @@ fn try_convert_first_last_if_better(
eq_properties: &EquivalenceProperties,
) -> Result<()> {
for aggr_expr in aggr_exprs.iter_mut() {
let aggr_req = aggr_expr.order_bys().unwrap_or(&[]);
let reverse_aggr_req = reverse_order_bys(aggr_req);
let aggr_req = PhysicalSortRequirement::from_sort_exprs(aggr_req);
let aggr_fun_expr = aggr_expr.as_any().downcast_ref::<AggregateFunctionExpr>();
let aggr_sort_expr = aggr_expr.order_bys().unwrap_or(&[]);
let reversed_ordering = reverse_order_bys(aggr_sort_expr);
let aggr_req = PhysicalSortRequirement::from_sort_exprs(aggr_sort_expr);
let reverse_aggr_req =
PhysicalSortRequirement::from_sort_exprs(&reverse_aggr_req);

if let Some(first_value) = aggr_expr.as_any().downcast_ref::<FirstValue>() {
let mut first_value = first_value.clone();
PhysicalSortRequirement::from_sort_exprs(&reversed_ordering);

if aggr_fun_expr.is_some_and(|e| e.fun().name() == "FIRST_VALUE") {
let mut first_value = aggr_fun_expr.unwrap().clone();
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

where derive(Clone) is needed

if eq_properties.ordering_satisfy_requirement(&concat_slices(
prefix_requirement,
&aggr_req,
Expand All @@ -218,10 +219,26 @@ fn try_convert_first_last_if_better(
prefix_requirement,
&reverse_aggr_req,
)) {
let name = first_value.name();

let name = if let Some(n) = name.strip_prefix("FIRST") {
format!("LAST{}", n)
} else {
let expr = first_value.expressions().swap_remove(0);
format!("LAST_VALUE({})", expr)
};

let reversed_sort_exprs = reverse_sort_exprs(&first_value);

// Converting to LAST_VALUE enables more efficient execution
// given the existing ordering:
let mut last_value = first_value.convert_to_last();
last_value = last_value.with_requirement_satisfied(true);
let last_value = first_value
.with_name(name)
.with_fun(LastValue::new().into())
.with_sort_exprs(reversed_sort_exprs)
.with_ordering_req(reversed_ordering)
.with_requirement_satisfied(true);

*aggr_expr = Arc::new(last_value) as _;
} else {
// Requirement is not satisfied with existing ordering.
Expand All @@ -230,8 +247,8 @@ fn try_convert_first_last_if_better(
}
continue;
}
if let Some(last_value) = aggr_expr.as_any().downcast_ref::<LastValue>() {
let mut last_value = last_value.clone();
if aggr_fun_expr.is_some_and(|e| e.fun().name() == "LAST_VALUE") {
let mut last_value = aggr_fun_expr.unwrap().clone();
if eq_properties.ordering_satisfy_requirement(&concat_slices(
prefix_requirement,
&aggr_req,
Expand All @@ -244,8 +261,25 @@ fn try_convert_first_last_if_better(
)) {
// Converting to FIRST_VALUE enables more efficient execution
// given the existing ordering:
let mut first_value = last_value.convert_to_first();
first_value = first_value.with_requirement_satisfied(true);

let name = last_value.name();

let name = if let Some(n) = name.strip_prefix("LAST") {
format!("FIRST{}", n)
} else {
let expr = last_value.expressions().swap_remove(0);
format!("FIRST_VALUE({})", expr)
};

let reversed_sort_exprs = reverse_sort_exprs(&last_value);

let first_value = last_value
.with_name(name)
.with_fun(FirstValue::new().into())
.with_sort_exprs(reversed_sort_exprs)
.with_ordering_req(reversed_ordering)
.with_requirement_satisfied(true);

*aggr_expr = Arc::new(first_value) as _;
} else {
// Requirement is not satisfied with existing ordering.
Expand All @@ -258,3 +292,16 @@ fn try_convert_first_last_if_better(

Ok(())
}

fn reverse_sort_exprs(e: &AggregateFunctionExpr) -> Vec<Expr> {
e.sort_exprs()
.iter()
.map(|e| {
if let Expr::Sort(s) = e {
Expr::Sort(s.reverse())
} else {
e.clone()
}
})
.collect::<Vec<_>>()
}
1 change: 1 addition & 0 deletions datafusion/core/src/prelude.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ pub use datafusion_expr::{
Expr,
};
pub use datafusion_functions::expr_fn::*;
pub use datafusion_functions_aggregate::expr_fn::*;
#[cfg(feature = "array_expressions")]
pub use datafusion_functions_array::expr_fn::*;

Expand Down
16 changes: 2 additions & 14 deletions datafusion/expr/src/aggregate_function.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,6 @@ pub enum AggregateFunction {
ApproxDistinct,
/// Aggregation into an array
ArrayAgg,
/// First value in a group according to some ordering
FirstValue,
/// Last value in a group according to some ordering
LastValue,
/// N'th value in a group according to some ordering
NthValue,
/// Variance (Sample)
Expand Down Expand Up @@ -121,8 +117,6 @@ impl AggregateFunction {
Median => "MEDIAN",
ApproxDistinct => "APPROX_DISTINCT",
ArrayAgg => "ARRAY_AGG",
FirstValue => "FIRST_VALUE",
LastValue => "LAST_VALUE",
NthValue => "NTH_VALUE",
Variance => "VAR",
VariancePop => "VAR_POP",
Expand Down Expand Up @@ -178,8 +172,6 @@ impl FromStr for AggregateFunction {
"min" => AggregateFunction::Min,
"sum" => AggregateFunction::Sum,
"array_agg" => AggregateFunction::ArrayAgg,
"first_value" => AggregateFunction::FirstValue,
"last_value" => AggregateFunction::LastValue,
"nth_value" => AggregateFunction::NthValue,
"string_agg" => AggregateFunction::StringAgg,
// statistical
Expand Down Expand Up @@ -294,9 +286,7 @@ impl AggregateFunction {
Ok(coerced_data_types[0].clone())
}
AggregateFunction::Grouping => Ok(DataType::Int32),
AggregateFunction::FirstValue
| AggregateFunction::LastValue
| AggregateFunction::NthValue => Ok(coerced_data_types[0].clone()),
AggregateFunction::NthValue => Ok(coerced_data_types[0].clone()),
AggregateFunction::StringAgg => Ok(DataType::LargeUtf8),
}
}
Expand Down Expand Up @@ -351,9 +341,7 @@ impl AggregateFunction {
| AggregateFunction::Stddev
| AggregateFunction::StddevPop
| AggregateFunction::Median
| AggregateFunction::ApproxMedian
| AggregateFunction::FirstValue
| AggregateFunction::LastValue => {
| AggregateFunction::ApproxMedian => {
Signature::uniform(1, NUMERICS.to_vec(), Volatility::Immutable)
}
AggregateFunction::NthValue => Signature::any(2, Volatility::Immutable),
Expand Down
9 changes: 9 additions & 0 deletions datafusion/expr/src/expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -520,6 +520,15 @@ impl Sort {
nulls_first,
}
}

/// Create a new Sort expression with the opposite sort direction
pub fn reverse(&self) -> Self {
Self {
expr: self.expr.clone(),
asc: !self.asc,
nulls_first: self.nulls_first,
}
}
}

#[derive(Debug, Clone, PartialEq, Eq, Hash)]
Expand Down
3 changes: 2 additions & 1 deletion datafusion/expr/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,8 @@ pub use logical_plan::*;
pub use operator::Operator;
pub use partition_evaluator::PartitionEvaluator;
pub use signature::{
FuncMonotonicity, Signature, TypeSignature, Volatility, TIMEZONE_WILDCARD,
ArrayFunctionSignature, FuncMonotonicity, Signature, TypeSignature, Volatility,
TIMEZONE_WILDCARD,
};
pub use table_source::{TableProviderFilterPushDown, TableSource, TableType};
pub use udaf::{AggregateUDF, AggregateUDFImpl};
Expand Down
28 changes: 12 additions & 16 deletions datafusion/expr/src/type_coercion/aggregates.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ pub fn coerce_types(
) -> Result<Vec<DataType>> {
use DataType::*;
// Validate input_types matches (at least one of) the func signature.
check_arg_count(agg_fun, input_types, &signature.type_signature)?;
check_arg_count(agg_fun.name(), input_types, &signature.type_signature)?;

match agg_fun {
AggregateFunction::Count | AggregateFunction::ApproxDistinct => {
Expand Down Expand Up @@ -293,10 +293,9 @@ pub fn coerce_types(
}
Ok(input_types.to_vec())
}
AggregateFunction::Median
| AggregateFunction::FirstValue
| AggregateFunction::LastValue => Ok(input_types.to_vec()),
AggregateFunction::NthValue => Ok(input_types.to_vec()),
AggregateFunction::Median | AggregateFunction::NthValue => {
Ok(input_types.to_vec())
}
AggregateFunction::Grouping => Ok(vec![input_types[0].clone()]),
AggregateFunction::StringAgg => {
if !is_string_agg_supported_arg_type(&input_types[0]) {
Expand All @@ -323,17 +322,16 @@ pub fn coerce_types(
/// This method DOES NOT validate the argument types - only that (at least one,
/// in the case of [`TypeSignature::OneOf`]) signature matches the desired
/// number of input types.
fn check_arg_count(
agg_fun: &AggregateFunction,
pub fn check_arg_count(
func_name: &str,
input_types: &[DataType],
signature: &TypeSignature,
) -> Result<()> {
match signature {
TypeSignature::Uniform(agg_count, _) | TypeSignature::Any(agg_count) => {
if input_types.len() != *agg_count {
return plan_err!(
"The function {:?} expects {:?} arguments, but {:?} were provided",
agg_fun,
"The function {func_name} expects {:?} arguments, but {:?} were provided",
agg_count,
input_types.len()
);
Expand All @@ -342,8 +340,7 @@ fn check_arg_count(
TypeSignature::Exact(types) => {
if types.len() != input_types.len() {
return plan_err!(
"The function {:?} expects {:?} arguments, but {:?} were provided",
agg_fun,
"The function {func_name} expects {:?} arguments, but {:?} were provided",
types.len(),
input_types.len()
);
Expand All @@ -352,19 +349,18 @@ fn check_arg_count(
TypeSignature::OneOf(variants) => {
let ok = variants
.iter()
.any(|v| check_arg_count(agg_fun, input_types, v).is_ok());
.any(|v| check_arg_count(func_name, input_types, v).is_ok());
if !ok {
return plan_err!(
"The function {:?} does not accept {:?} function arguments.",
agg_fun,
"The function {func_name} does not accept {:?} function arguments.",
input_types.len()
);
}
}
TypeSignature::VariadicAny => {
if input_types.is_empty() {
return plan_err!(
"The function {agg_fun:?} expects at least one argument"
"The function {func_name} expects at least one argument"
);
}
}
Expand Down Expand Up @@ -594,7 +590,7 @@ mod tests {
let input_types = vec![DataType::Int64, DataType::Int32];
let signature = fun.signature();
let result = coerce_types(&fun, &input_types, &signature);
assert_eq!("Error during planning: The function Min expects 1 arguments, but 2 were provided", result.unwrap_err().strip_backtrace());
assert_eq!("Error during planning: The function MIN expects 1 arguments, but 2 were provided", result.unwrap_err().strip_backtrace());

// test input args is invalid data type for sum or avg
let fun = AggregateFunction::Sum;
Expand Down
1 change: 1 addition & 0 deletions datafusion/functions-aggregate/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -42,3 +42,4 @@ datafusion-expr = { workspace = true }
datafusion-physical-expr-common = { workspace = true }
log = { workspace = true }
paste = "1.0.14"
sqlparser = { workspace = true }
Loading
Loading