Skip to content

Commit

Permalink
fix: panic and incorrect results in LogFunc::output_ordering() (#11571
Browse files Browse the repository at this point in the history
)

* fix: panic and incorrect results in `LogFunc::output_ordering()`

* fix for nulls_first
  • Loading branch information
jonahgao authored Jul 24, 2024
1 parent 5c37d00 commit c8ef545
Show file tree
Hide file tree
Showing 2 changed files with 138 additions and 16 deletions.
123 changes: 119 additions & 4 deletions datafusion/functions/src/math/log.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,10 +82,16 @@ impl ScalarUDFImpl for LogFunc {
}

fn output_ordering(&self, input: &[ExprProperties]) -> Result<SortProperties> {
match (input[0].sort_properties, input[1].sort_properties) {
(first @ SortProperties::Ordered(value), SortProperties::Ordered(base))
if !value.descending && base.descending
|| value.descending && !base.descending =>
let (base_sort_properties, num_sort_properties) = if input.len() == 1 {
// log(x) defaults to log(10, x)
(SortProperties::Singleton, input[0].sort_properties)
} else {
(input[0].sort_properties, input[1].sort_properties)
};
match (num_sort_properties, base_sort_properties) {
(first @ SortProperties::Ordered(num), SortProperties::Ordered(base))
if num.descending != base.descending
&& num.nulls_first == base.nulls_first =>
{
Ok(first)
}
Expand Down Expand Up @@ -230,6 +236,7 @@ mod tests {

use super::*;

use arrow::compute::SortOptions;
use datafusion_common::cast::{as_float32_array, as_float64_array};
use datafusion_common::DFSchema;
use datafusion_expr::execution_props::ExecutionProps;
Expand Down Expand Up @@ -334,4 +341,112 @@ mod tests {
assert_eq!(args[0], lit(2));
assert_eq!(args[1], lit(3));
}

#[test]
fn test_log_output_ordering() {
// [Unordered, Ascending, Descending, Literal]
let orders = vec![
ExprProperties::new_unknown(),
ExprProperties::new_unknown().with_order(SortProperties::Ordered(
SortOptions {
descending: false,
nulls_first: true,
},
)),
ExprProperties::new_unknown().with_order(SortProperties::Ordered(
SortOptions {
descending: true,
nulls_first: true,
},
)),
ExprProperties::new_unknown().with_order(SortProperties::Singleton),
];

let log = LogFunc::new();

// Test log(num)
for order in orders.iter().cloned() {
let result = log.output_ordering(&[order.clone()]).unwrap();
assert_eq!(result, order.sort_properties);
}

// Test log(base, num), where `nulls_first` is the same
let mut results = Vec::with_capacity(orders.len() * orders.len());
for base_order in orders.iter() {
for num_order in orders.iter().cloned() {
let result = log
.output_ordering(&[base_order.clone(), num_order])
.unwrap();
results.push(result);
}
}
let expected = vec![
// base: Unordered
SortProperties::Unordered,
SortProperties::Unordered,
SortProperties::Unordered,
SortProperties::Unordered,
// base: Ascending, num: Unordered
SortProperties::Unordered,
// base: Ascending, num: Ascending
SortProperties::Unordered,
// base: Ascending, num: Descending
SortProperties::Ordered(SortOptions {
descending: true,
nulls_first: true,
}),
// base: Ascending, num: Literal
SortProperties::Ordered(SortOptions {
descending: true,
nulls_first: true,
}),
// base: Descending, num: Unordered
SortProperties::Unordered,
// base: Descending, num: Ascending
SortProperties::Ordered(SortOptions {
descending: false,
nulls_first: true,
}),
// base: Descending, num: Descending
SortProperties::Unordered,
// base: Descending, num: Literal
SortProperties::Ordered(SortOptions {
descending: false,
nulls_first: true,
}),
// base: Literal, num: Unordered
SortProperties::Unordered,
// base: Literal, num: Ascending
SortProperties::Ordered(SortOptions {
descending: false,
nulls_first: true,
}),
// base: Literal, num: Descending
SortProperties::Ordered(SortOptions {
descending: true,
nulls_first: true,
}),
// base: Literal, num: Literal
SortProperties::Singleton,
];
assert_eq!(results, expected);

// Test with different `nulls_first`
let base_order = ExprProperties::new_unknown().with_order(
SortProperties::Ordered(SortOptions {
descending: true,
nulls_first: true,
}),
);
let num_order = ExprProperties::new_unknown().with_order(
SortProperties::Ordered(SortOptions {
descending: false,
nulls_first: false,
}),
);
assert_eq!(
log.output_ordering(&[base_order, num_order]).unwrap(),
SortProperties::Unordered
);
}
}
31 changes: 19 additions & 12 deletions datafusion/sqllogictest/test_files/order.slt
Original file line number Diff line number Diff line change
Expand Up @@ -326,6 +326,13 @@ select column1 + column2 from foo group by column1, column2 ORDER BY column2 des
7
3

# Test issue: https://github.com/apache/datafusion/issues/11549
query I
select column1 from foo order by log(column2);
----
1
3
5

# Cleanup
statement ok
Expand Down Expand Up @@ -512,7 +519,7 @@ CREATE EXTERNAL TABLE aggregate_test_100 (
)
STORED AS CSV
WITH ORDER(c11)
WITH ORDER(c12 DESC)
WITH ORDER(c12 DESC NULLS LAST)
LOCATION '../../testing/data/csv/aggregate_test_100.csv'
OPTIONS ('format.has_header' 'true');

Expand Down Expand Up @@ -547,34 +554,34 @@ physical_plan
04)------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c11], output_ordering=[c11@0 ASC NULLS LAST], has_header=true

query TT
EXPLAIN SELECT LOG(c11, c12) as log_c11_base_c12
EXPLAIN SELECT LOG(c12, c11) as log_c11_base_c12
FROM aggregate_test_100
ORDER BY log_c11_base_c12;
----
logical_plan
01)Sort: log_c11_base_c12 ASC NULLS LAST
02)--Projection: log(CAST(aggregate_test_100.c11 AS Float64), aggregate_test_100.c12) AS log_c11_base_c12
02)--Projection: log(aggregate_test_100.c12, CAST(aggregate_test_100.c11 AS Float64)) AS log_c11_base_c12
03)----TableScan: aggregate_test_100 projection=[c11, c12]
physical_plan
01)SortPreservingMergeExec: [log_c11_base_c12@0 ASC NULLS LAST]
02)--ProjectionExec: expr=[log(CAST(c11@0 AS Float64), c12@1) as log_c11_base_c12]
02)--ProjectionExec: expr=[log(c12@1, CAST(c11@0 AS Float64)) as log_c11_base_c12]
03)----RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
04)------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c11, c12], output_orderings=[[c11@0 ASC NULLS LAST], [c12@1 DESC]], has_header=true
04)------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c11, c12], output_orderings=[[c11@0 ASC NULLS LAST], [c12@1 DESC NULLS LAST]], has_header=true

query TT
EXPLAIN SELECT LOG(c12, c11) as log_c12_base_c11
EXPLAIN SELECT LOG(c11, c12) as log_c12_base_c11
FROM aggregate_test_100
ORDER BY log_c12_base_c11 DESC;
ORDER BY log_c12_base_c11 DESC NULLS LAST;
----
logical_plan
01)Sort: log_c12_base_c11 DESC NULLS FIRST
02)--Projection: log(aggregate_test_100.c12, CAST(aggregate_test_100.c11 AS Float64)) AS log_c12_base_c11
01)Sort: log_c12_base_c11 DESC NULLS LAST
02)--Projection: log(CAST(aggregate_test_100.c11 AS Float64), aggregate_test_100.c12) AS log_c12_base_c11
03)----TableScan: aggregate_test_100 projection=[c11, c12]
physical_plan
01)SortPreservingMergeExec: [log_c12_base_c11@0 DESC]
02)--ProjectionExec: expr=[log(c12@1, CAST(c11@0 AS Float64)) as log_c12_base_c11]
01)SortPreservingMergeExec: [log_c12_base_c11@0 DESC NULLS LAST]
02)--ProjectionExec: expr=[log(CAST(c11@0 AS Float64), c12@1) as log_c12_base_c11]
03)----RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
04)------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c11, c12], output_orderings=[[c11@0 ASC NULLS LAST], [c12@1 DESC]], has_header=true
04)------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c11, c12], output_orderings=[[c11@0 ASC NULLS LAST], [c12@1 DESC NULLS LAST]], has_header=true

statement ok
drop table aggregate_test_100;
Expand Down

0 comments on commit c8ef545

Please sign in to comment.