Handle ordering of first last aggregation inside aggregator (#8662)
* Initial commit

* Update tests in distinct_on

* Update group by joins slt

* Remove unused code

* Minor changes

* Minor changes

* Simplifications

* Update comments

* Review

* Fix clippy

---------

Co-authored-by: Mehmet Ozan Kabak <[email protected]>
mustafasrepo and ozankabak authored Dec 28, 2023
1 parent b2cbc78 commit 06ed3dd
Showing 13 changed files with 373 additions and 376 deletions.
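The gist of the change, before the per-file diffs: the FIRST_VALUE/LAST_VALUE accumulators used to assume their input already arrived in the aggregate's required order; they now compare ordering keys on every update and keep the extremal row themselves. A minimal sketch of that rule, not the actual accumulator code (`keep_earliest` is a hypothetical helper; `compare_rows`, `SortOptions`, and `ScalarValue` are the real items used in first_last.rs below):

use arrow_schema::SortOptions;
use datafusion_common::utils::compare_rows;
use datafusion_common::{Result, ScalarValue};

/// Keep the stored "first" only while no incoming row sorts strictly
/// earlier under the given sort options (mirrors the logic this commit
/// adds to `FirstValueAccumulator::update_with_new_row`).
fn keep_earliest(
    state: &mut Option<(ScalarValue, Vec<ScalarValue>)>,
    value: ScalarValue,
    orderings: Vec<ScalarValue>,
    sort_options: &[SortOptions],
) -> Result<()> {
    let replace = match state {
        None => true, // no entry yet: always take the incoming row
        // Replace when the stored ordering key sorts *after* the candidate.
        Some((_, stored)) => compare_rows(stored, &orderings, sort_options)?.is_gt(),
    };
    if replace {
        *state = Some((value, orderings));
    }
    Ok(())
}

LAST_VALUE applies the mirrored rule (`is_lt`, with inverted sort options when scanning batches), as the diff below shows.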
2 changes: 1 addition & 1 deletion datafusion-cli/src/functions.rs
@@ -297,7 +297,7 @@ pub struct ParquetMetadataFunc {}
 
 impl TableFunctionImpl for ParquetMetadataFunc {
     fn call(&self, exprs: &[Expr]) -> Result<Arc<dyn TableProvider>> {
-        let filename = match exprs.get(0) {
+        let filename = match exprs.first() {
             Some(Expr::Literal(ScalarValue::Utf8(Some(s)))) => s, // single quote: parquet_metadata('x.parquet')
             Some(Expr::Column(Column { name, .. })) => name, // double quote: parquet_metadata("x.parquet")
             _ => {
1 change: 0 additions & 1 deletion datafusion/common/src/error.rs
@@ -558,7 +558,6 @@ macro_rules! arrow_err {
 
 // To avoid compiler error when using macro in the same crate:
 // macros from the current crate cannot be referred to by absolute paths
-pub use exec_err as _exec_err;
 pub use internal_datafusion_err as _internal_datafusion_err;
 pub use internal_err as _internal_err;
 pub use not_impl_err as _not_impl_err;
4 changes: 4 additions & 0 deletions datafusion/core/src/physical_optimizer/projection_pushdown.rs
@@ -990,6 +990,10 @@ fn update_join_on(
     proj_right_exprs: &[(Column, String)],
     hash_join_on: &[(Column, Column)],
 ) -> Option<Vec<(Column, Column)>> {
+    // TODO: Clippy wants the "map" call removed, but doing so generates
+    // a compilation error. Remove the clippy directive once this
+    // issue is fixed.
+    #[allow(clippy::map_identity)]
     let (left_idx, right_idx): (Vec<_>, Vec<_>) = hash_join_on
         .iter()
         .map(|(left, right)| (left, right))
4 changes: 4 additions & 0 deletions datafusion/optimizer/src/simplify_expressions/guarantees.rs
@@ -47,6 +47,10 @@ impl<'a> GuaranteeRewriter<'a> {
         guarantees: impl IntoIterator<Item = &'a (Expr, NullableInterval)>,
     ) -> Self {
         Self {
+            // TODO: Clippy wants the "map" call removed, but doing so generates
+            // a compilation error. Remove the clippy directive once this
+            // issue is fixed.
+            #[allow(clippy::map_identity)]
            guarantees: guarantees.into_iter().map(|(k, v)| (k, v)).collect(),
         }
     }
131 changes: 98 additions & 33 deletions datafusion/physical-expr/src/aggregate/first_last.rs
@@ -20,7 +20,7 @@
 use std::any::Any;
 use std::sync::Arc;
 
-use crate::aggregate::utils::{down_cast_any_ref, ordering_fields};
+use crate::aggregate::utils::{down_cast_any_ref, get_sort_options, ordering_fields};
 use crate::expressions::format_state_name;
 use crate::{
     reverse_order_bys, AggregateExpr, LexOrdering, PhysicalExpr, PhysicalSortExpr,
@@ -29,9 +29,10 @@ use crate::{
 use arrow::array::{Array, ArrayRef, AsArray, BooleanArray};
 use arrow::compute::{self, lexsort_to_indices, SortColumn};
 use arrow::datatypes::{DataType, Field};
-use arrow_schema::SortOptions;
 use datafusion_common::utils::{compare_rows, get_arrayref_at_indices, get_row_at_idx};
-use datafusion_common::{arrow_datafusion_err, DataFusionError, Result, ScalarValue};
+use datafusion_common::{
+    arrow_datafusion_err, internal_err, DataFusionError, Result, ScalarValue,
+};
 use datafusion_expr::Accumulator;
 
 /// FIRST_VALUE aggregate expression
@@ -211,10 +212,45 @@ impl FirstValueAccumulator {
     }
 
     // Updates state with the values in the given row.
-    fn update_with_new_row(&mut self, row: &[ScalarValue]) {
-        self.first = row[0].clone();
-        self.orderings = row[1..].to_vec();
-        self.is_set = true;
+    fn update_with_new_row(&mut self, row: &[ScalarValue]) -> Result<()> {
+        let [value, orderings @ ..] = row else {
+            return internal_err!("Empty row in FIRST_VALUE");
+        };
+        // Update when there is no entry in the state, or we have an "earlier"
+        // entry according to sort requirements.
+        if !self.is_set
+            || compare_rows(
+                &self.orderings,
+                orderings,
+                &get_sort_options(&self.ordering_req),
+            )?
+            .is_gt()
+        {
+            self.first = value.clone();
+            self.orderings = orderings.to_vec();
+            self.is_set = true;
+        }
+        Ok(())
     }
+
+    fn get_first_idx(&self, values: &[ArrayRef]) -> Result<Option<usize>> {
+        let [value, ordering_values @ ..] = values else {
+            return internal_err!("Empty row in FIRST_VALUE");
+        };
+        if self.ordering_req.is_empty() {
+            // Get first entry according to receive order (0th index)
+            return Ok((!value.is_empty()).then_some(0));
+        }
+        let sort_columns = ordering_values
+            .iter()
+            .zip(self.ordering_req.iter())
+            .map(|(values, req)| SortColumn {
+                values: values.clone(),
+                options: Some(req.options),
+            })
+            .collect::<Vec<_>>();
+        let indices = lexsort_to_indices(&sort_columns, Some(1))?;
+        Ok((!indices.is_empty()).then_some(indices.value(0) as _))
+    }
 }

@@ -227,11 +263,9 @@ impl Accumulator for FirstValueAccumulator {
     }
 
     fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
-        // If we have seen first value, we shouldn't update it
-        if !values[0].is_empty() && !self.is_set {
-            let row = get_row_at_idx(values, 0)?;
-            // Update with first value in the array.
-            self.update_with_new_row(&row);
+        if let Some(first_idx) = self.get_first_idx(values)? {
+            let row = get_row_at_idx(values, first_idx)?;
+            self.update_with_new_row(&row)?;
         }
         Ok(())
     }
@@ -265,7 +299,7 @@ impl Accumulator for FirstValueAccumulator {
                 // Update with first value in the state. Note that we should exclude the
                 // is_set flag from the state. Otherwise, we will end up with a state
                 // containing two is_set flags.
-                self.update_with_new_row(&first_row[0..is_set_idx]);
+                self.update_with_new_row(&first_row[0..is_set_idx])?;
             }
         }
         Ok(())
@@ -459,10 +493,50 @@ impl LastValueAccumulator {
     }
 
     // Updates state with the values in the given row.
-    fn update_with_new_row(&mut self, row: &[ScalarValue]) {
-        self.last = row[0].clone();
-        self.orderings = row[1..].to_vec();
-        self.is_set = true;
+    fn update_with_new_row(&mut self, row: &[ScalarValue]) -> Result<()> {
+        let [value, orderings @ ..] = row else {
+            return internal_err!("Empty row in LAST_VALUE");
+        };
+        // Update when there is no entry in the state, or we have a "later"
+        // entry (either according to sort requirements or the order of execution).
+        if !self.is_set
+            || self.orderings.is_empty()
+            || compare_rows(
+                &self.orderings,
+                orderings,
+                &get_sort_options(&self.ordering_req),
+            )?
+            .is_lt()
+        {
+            self.last = value.clone();
+            self.orderings = orderings.to_vec();
+            self.is_set = true;
+        }
+        Ok(())
    }
+
+    fn get_last_idx(&self, values: &[ArrayRef]) -> Result<Option<usize>> {
+        let [value, ordering_values @ ..] = values else {
+            return internal_err!("Empty row in LAST_VALUE");
+        };
+        if self.ordering_req.is_empty() {
+            // Get last entry according to the order of data:
+            return Ok((!value.is_empty()).then_some(value.len() - 1));
+        }
+        let sort_columns = ordering_values
+            .iter()
+            .zip(self.ordering_req.iter())
+            .map(|(values, req)| {
+                // Take the reverse ordering requirement. This enables us to
+                // use "fetch = 1" to get the last value.
+                SortColumn {
+                    values: values.clone(),
+                    options: Some(!req.options),
+                }
+            })
+            .collect::<Vec<_>>();
+        let indices = lexsort_to_indices(&sort_columns, Some(1))?;
+        Ok((!indices.is_empty()).then_some(indices.value(0) as _))
+    }
 }

@@ -475,10 +549,9 @@ impl Accumulator for LastValueAccumulator {
     }
 
     fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
-        if !values[0].is_empty() {
-            let row = get_row_at_idx(values, values[0].len() - 1)?;
-            // Update with last value in the array.
-            self.update_with_new_row(&row);
+        if let Some(last_idx) = self.get_last_idx(values)? {
+            let row = get_row_at_idx(values, last_idx)?;
+            self.update_with_new_row(&row)?;
         }
         Ok(())
     }
@@ -515,7 +588,7 @@ impl Accumulator for LastValueAccumulator {
                 // Update with last value in the state. Note that we should exclude the
                 // is_set flag from the state. Otherwise, we will end up with a state
                 // containing two is_set flags.
-                self.update_with_new_row(&last_row[0..is_set_idx]);
+                self.update_with_new_row(&last_row[0..is_set_idx])?;
             }
         }
         Ok(())
@@ -559,26 +632,18 @@ fn convert_to_sort_cols(
         .collect::<Vec<_>>()
 }
 
-/// Selects the sort option attribute from all the given `PhysicalSortExpr`s.
-fn get_sort_options(ordering_req: &[PhysicalSortExpr]) -> Vec<SortOptions> {
-    ordering_req
-        .iter()
-        .map(|item| item.options)
-        .collect::<Vec<_>>()
-}
-
 #[cfg(test)]
 mod tests {
+    use std::sync::Arc;
+
     use crate::aggregate::first_last::{FirstValueAccumulator, LastValueAccumulator};
+
+    use arrow::compute::concat;
     use arrow_array::{ArrayRef, Int64Array};
     use arrow_schema::DataType;
     use datafusion_common::{Result, ScalarValue};
     use datafusion_expr::Accumulator;
 
-    use arrow::compute::concat;
-    use std::sync::Arc;
-
     #[test]
     fn test_first_last_value_value() -> Result<()> {
         let mut first_accumulator =
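Both new index helpers above (`get_first_idx`, `get_last_idx`) lean on the same arrow kernel: lexicographically sorting only the ordering columns with a limit of one yields just the extremal row's index. A standalone illustration of the "fetch = 1" trick, not part of the commit (the `!` negation on `SortOptions` is the same inversion `get_last_idx` uses):

use std::sync::Arc;

use arrow::compute::{lexsort_to_indices, SortColumn};
use arrow::error::ArrowError;
use arrow_array::{ArrayRef, Int64Array};
use arrow_schema::SortOptions;

fn main() -> Result<(), ArrowError> {
    // Ordering column: ts = [3, 1, 2]; under ascending order, row 1 comes first.
    let ts: ArrayRef = Arc::new(Int64Array::from(vec![3, 1, 2]));
    let asc = SortOptions { descending: false, nulls_first: false };

    // fetch = 1: only the single smallest row index is materialized.
    let first = lexsort_to_indices(
        &[SortColumn { values: ts.clone(), options: Some(asc) }],
        Some(1),
    )?;
    assert_eq!(first.value(0), 1);

    // For the "last" row, invert the requirement rather than sorting fully.
    let last = lexsort_to_indices(
        &[SortColumn { values: ts, options: Some(!asc) }],
        Some(1),
    )?;
    assert_eq!(last.value(0), 0); // ts = 3 is last under ascending order
    Ok(())
}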
30 changes: 15 additions & 15 deletions datafusion/physical-expr/src/aggregate/mod.rs
@@ -15,16 +15,20 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use crate::expressions::{FirstValue, LastValue, OrderSensitiveArrayAgg};
-use crate::{PhysicalExpr, PhysicalSortExpr};
-use arrow::datatypes::Field;
-use datafusion_common::{not_impl_err, DataFusionError, Result};
-use datafusion_expr::Accumulator;
 use std::any::Any;
 use std::fmt::Debug;
 use std::sync::Arc;
 
 use self::groups_accumulator::GroupsAccumulator;
+use crate::expressions::OrderSensitiveArrayAgg;
+use crate::{PhysicalExpr, PhysicalSortExpr};
+
+use arrow::datatypes::Field;
+use datafusion_common::{not_impl_err, DataFusionError, Result};
+use datafusion_expr::Accumulator;
 
-mod hyperloglog;
-mod tdigest;
-
 pub(crate) mod approx_distinct;
 pub(crate) mod approx_median;
@@ -46,19 +50,18 @@ pub(crate) mod median;
 pub(crate) mod string_agg;
 #[macro_use]
 pub(crate) mod min_max;
-pub mod build_in;
 pub(crate) mod groups_accumulator;
+mod hyperloglog;
-pub mod moving_min_max;
 pub(crate) mod regr;
 pub(crate) mod stats;
 pub(crate) mod stddev;
 pub(crate) mod sum;
 pub(crate) mod sum_distinct;
+mod tdigest;
-pub mod utils;
 pub(crate) mod variance;
+
+pub mod build_in;
+pub mod moving_min_max;
+pub mod utils;
 
 /// An aggregate expression that:
 /// * knows its resulting field
 /// * knows how to create its accumulator
@@ -134,10 +137,7 @@ pub trait AggregateExpr: Send + Sync + Debug + PartialEq<dyn Any> {
 
 /// Checks whether the given aggregate expression is order-sensitive.
 /// For instance, a `SUM` aggregation doesn't depend on the order of its inputs.
-/// However, a `FirstValue` depends on the input ordering (if the order changes,
-/// the first value in the list would change).
+/// However, an `ARRAY_AGG` with `ORDER BY` depends on the input ordering.
 pub fn is_order_sensitive(aggr_expr: &Arc<dyn AggregateExpr>) -> bool {
-    aggr_expr.as_any().is::<FirstValue>()
-        || aggr_expr.as_any().is::<LastValue>()
-        || aggr_expr.as_any().is::<OrderSensitiveArrayAgg>()
+    aggr_expr.as_any().is::<OrderSensitiveArrayAgg>()
 }
18 changes: 12 additions & 6 deletions datafusion/physical-expr/src/aggregate/utils.rs
@@ -17,20 +17,21 @@
 
 //! Utilities used in aggregates
+use std::any::Any;
+use std::sync::Arc;
+
 use crate::{AggregateExpr, PhysicalSortExpr};
-use arrow::array::ArrayRef;
+
+use arrow::array::{ArrayRef, ArrowNativeTypeOp};
 use arrow_array::cast::AsArray;
 use arrow_array::types::{
     Decimal128Type, DecimalType, TimestampMicrosecondType, TimestampMillisecondType,
     TimestampNanosecondType, TimestampSecondType,
 };
-use arrow_array::ArrowNativeTypeOp;
 use arrow_buffer::ArrowNativeType;
-use arrow_schema::{DataType, Field};
+use arrow_schema::{DataType, Field, SortOptions};
 use datafusion_common::{exec_err, DataFusionError, Result};
 use datafusion_expr::Accumulator;
-use std::any::Any;
-use std::sync::Arc;
 
 /// Convert scalar values from an accumulator into arrays.
 pub fn get_accum_scalar_values_as_arrays(
@@ -40,7 +41,7 @@ pub fn get_accum_scalar_values_as_arrays(
         .state()?
         .iter()
         .map(|s| s.to_array_of_size(1))
-        .collect::<Result<Vec<_>>>()
+        .collect()
 }
 
 /// Computes averages for `Decimal128`/`Decimal256` values, checking for overflow
@@ -205,3 +206,8 @@ pub(crate) fn ordering_fields(
         })
         .collect()
 }
+
+/// Selects the sort option attribute from all the given `PhysicalSortExpr`s.
+pub fn get_sort_options(ordering_req: &[PhysicalSortExpr]) -> Vec<SortOptions> {
+    ordering_req.iter().map(|item| item.options).collect()
+}
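A quick usage sketch for the relocated helper, standalone and not part of the diff (the import path assumes `aggregate::utils` stays publicly reachable, as the `pub mod utils;` above suggests):

use std::sync::Arc;

use arrow_schema::SortOptions;
use datafusion_physical_expr::aggregate::utils::get_sort_options;
use datafusion_physical_expr::expressions::Column;
use datafusion_physical_expr::PhysicalSortExpr;

fn main() {
    // One sort requirement: "ts" descending, nulls first.
    let ordering_req = vec![PhysicalSortExpr {
        expr: Arc::new(Column::new("ts", 0)),
        options: SortOptions { descending: true, nulls_first: true },
    }];
    // One `SortOptions` per sort key, in requirement order: exactly the
    // shape `compare_rows` expects in the accumulators above.
    let opts = get_sort_options(&ordering_req);
    assert!(opts[0].descending && opts[0].nulls_first);
}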
2 changes: 1 addition & 1 deletion datafusion/physical-expr/src/array_expressions.rs
@@ -2453,7 +2453,7 @@ pub fn general_array_distinct<OffsetSize: OffsetSizeTrait>(
     let last_offset: OffsetSize = offsets.last().copied().unwrap();
     offsets.push(last_offset + OffsetSize::usize_as(rows.len()));
     let arrays = converter.convert_rows(rows)?;
-    let array = match arrays.get(0) {
+    let array = match arrays.first() {
         Some(array) => array.clone(),
         None => {
             return internal_err!("array_distinct: failed to get array from rows")