Skip to content

Commit

Permalink
feat: Convert CumeDist to UDWF (apache#13051)
Browse files Browse the repository at this point in the history
* Transferred cumedist

* fixes

* remove expr tests

* small fix

* small fix

* check

* clippy fix

* roundtrip fix
  • Loading branch information
jonathanc-n authored Oct 23, 2024
1 parent 521966a commit a4e6b07
Show file tree
Hide file tree
Showing 18 changed files with 195 additions and 199 deletions.
6 changes: 0 additions & 6 deletions datafusion/expr/src/built_in_window_function.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,6 @@ impl fmt::Display for BuiltInWindowFunction {
/// [Window Function]: https://en.wikipedia.org/wiki/Window_function_(SQL)
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash, EnumIter)]
pub enum BuiltInWindowFunction {
/// Relative rank of the current row: (number of rows preceding or peer with current row) / (total rows)
CumeDist,
/// Integer ranging from 1 to the argument value, dividing the partition as equally as possible
Ntile,
/// returns value evaluated at the row that is the first row of the window frame
Expand All @@ -56,7 +54,6 @@ impl BuiltInWindowFunction {
pub fn name(&self) -> &str {
use BuiltInWindowFunction::*;
match self {
CumeDist => "CUME_DIST",
Ntile => "NTILE",
FirstValue => "first_value",
LastValue => "last_value",
Expand All @@ -69,7 +66,6 @@ impl FromStr for BuiltInWindowFunction {
type Err = DataFusionError;
fn from_str(name: &str) -> Result<BuiltInWindowFunction> {
Ok(match name.to_uppercase().as_str() {
"CUME_DIST" => BuiltInWindowFunction::CumeDist,
"NTILE" => BuiltInWindowFunction::Ntile,
"FIRST_VALUE" => BuiltInWindowFunction::FirstValue,
"LAST_VALUE" => BuiltInWindowFunction::LastValue,
Expand Down Expand Up @@ -102,7 +98,6 @@ impl BuiltInWindowFunction {

match self {
BuiltInWindowFunction::Ntile => Ok(DataType::UInt64),
BuiltInWindowFunction::CumeDist => Ok(DataType::Float64),
BuiltInWindowFunction::FirstValue
| BuiltInWindowFunction::LastValue
| BuiltInWindowFunction::NthValue => Ok(input_expr_types[0].clone()),
Expand All @@ -113,7 +108,6 @@ impl BuiltInWindowFunction {
pub fn signature(&self) -> Signature {
// Note: The physical expression must accept the type returned by this function or the execution panics.
match self {
BuiltInWindowFunction::CumeDist => Signature::any(0, Volatility::Immutable),
BuiltInWindowFunction::FirstValue | BuiltInWindowFunction::LastValue => {
Signature::any(1, Volatility::Immutable)
}
Expand Down
23 changes: 1 addition & 22 deletions datafusion/expr/src/expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2567,15 +2567,6 @@ mod test {
Ok(())
}

// Verifies that the window function looked up by the name "cume_dist"
// reports `Float64` as its return type when given no argument types.
#[test]
fn test_cume_dist_return_type() -> Result<()> {
let fun = find_df_window_func("cume_dist").unwrap();
let observed = fun.return_type(&[], &[], "")?;
assert_eq!(DataType::Float64, observed);

Ok(())
}

#[test]
fn test_ntile_return_type() -> Result<()> {
let fun = find_df_window_func("ntile").unwrap();
Expand All @@ -2587,13 +2578,7 @@ mod test {

#[test]
fn test_window_function_case_insensitive() -> Result<()> {
let names = vec![
"cume_dist",
"ntile",
"first_value",
"last_value",
"nth_value",
];
let names = vec!["ntile", "first_value", "last_value", "nth_value"];
for name in names {
let fun = find_df_window_func(name).unwrap();
let fun2 = find_df_window_func(name.to_uppercase().as_str()).unwrap();
Expand All @@ -2609,12 +2594,6 @@ mod test {

#[test]
fn test_find_df_window_function() {
assert_eq!(
find_df_window_func("cume_dist"),
Some(WindowFunctionDefinition::BuiltInWindowFunction(
built_in_window_function::BuiltInWindowFunction::CumeDist
))
);
assert_eq!(
find_df_window_func("first_value"),
Some(WindowFunctionDefinition::BuiltInWindowFunction(
Expand Down
5 changes: 0 additions & 5 deletions datafusion/expr/src/window_function.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,6 @@

use crate::{expr::WindowFunction, BuiltInWindowFunction, Expr, Literal};

/// Create an expression to represent the `cume_dist` window function.
///
/// The returned [`Expr::WindowFunction`] wraps the built-in
/// `BuiltInWindowFunction::CumeDist` variant with an empty argument list.
pub fn cume_dist() -> Expr {
Expr::WindowFunction(WindowFunction::new(BuiltInWindowFunction::CumeDist, vec![]))
}

/// Create an expression to represent the `ntile` window function
pub fn ntile(arg: Expr) -> Expr {
Expr::WindowFunction(WindowFunction::new(BuiltInWindowFunction::Ntile, vec![arg]))
Expand Down
170 changes: 170 additions & 0 deletions datafusion/functions-window/src/cume_dist.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,170 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! `cume_dist` window function implementation

use datafusion_common::arrow::array::{ArrayRef, Float64Array};
use datafusion_common::arrow::datatypes::DataType;
use datafusion_common::arrow::datatypes::Field;
use datafusion_common::Result;
use datafusion_expr::window_doc_sections::DOC_SECTION_RANKING;
use datafusion_expr::{
Documentation, PartitionEvaluator, Signature, Volatility, WindowUDFImpl,
};
use datafusion_functions_window_common::field;
use datafusion_functions_window_common::partition::PartitionEvaluatorArgs;
use field::WindowUDFFieldArgs;
use std::any::Any;
use std::fmt::Debug;
use std::iter;
use std::ops::Range;
use std::sync::{Arc, OnceLock};

// Declares the public entry points for this window function. Elsewhere in the
// crate they are referenced as `cume_dist::cume_dist` (re-exported from
// `expr_fn`) and `cume_dist::cume_dist_udwf()` (listed among the default
// window functions).
// NOTE(review): the exact expansion is defined by the macro in the `macros`
// module — confirm there which items are generated from these arguments.
define_udwf_and_expr!(
CumeDist,
cume_dist,
"Calculates the cumulative distribution of a value in a group of values."
);

/// Window function implementation of `cume_dist`, intended for window
/// functions with an ORDER BY.
///
/// For each row it yields the relative rank of that row:
/// (number of rows preceding or peer with the current row) / (total rows).
#[derive(Debug)]
pub struct CumeDist {
/// Argument signature reported by `signature()`; `new` sets it to accept
/// zero arguments with immutable volatility.
signature: Signature,
}

impl CumeDist {
    /// Creates the `cume_dist` window function.
    ///
    /// The function takes no arguments and is declared
    /// [`Volatility::Immutable`].
    pub fn new() -> Self {
        let signature = Signature::any(0, Volatility::Immutable);
        Self { signature }
    }
}

impl Default for CumeDist {
fn default() -> Self {
Self::new()
}
}

impl WindowUDFImpl for CumeDist {
/// Return a reference to Any that can be used for downcasting
fn as_any(&self) -> &dyn Any {
self
}

fn name(&self) -> &str {
"cume_dist"
}

fn signature(&self) -> &Signature {
&self.signature
}

fn partition_evaluator(
&self,
_partition_evaluator_args: PartitionEvaluatorArgs,
) -> Result<Box<dyn PartitionEvaluator>> {
Ok(Box::<CumeDistEvaluator>::default())
}

fn field(&self, field_args: WindowUDFFieldArgs) -> Result<Field> {
Ok(Field::new(field_args.name(), DataType::Float64, false))
}

fn documentation(&self) -> Option<&Documentation> {
Some(get_cume_dist_doc())
}
}

/// Holds the `cume_dist` documentation once it has been built.
static DOCUMENTATION: OnceLock<Documentation> = OnceLock::new();

/// Returns the documentation for `cume_dist`, building it on first use.
///
/// # Panics
///
/// Panics if the documentation builder returns an error.
fn get_cume_dist_doc() -> &'static Documentation {
    // Runs at most once; later calls reuse the stored value.
    let build_doc = || {
        Documentation::builder()
            .with_doc_section(DOC_SECTION_RANKING)
            .with_description(
                "Relative rank of the current row: (number of rows preceding or peer with current row) / (total rows).",
            )
            .with_syntax_example("cume_dist()")
            .build()
            .unwrap()
    };
    DOCUMENTATION.get_or_init(build_doc)
}

/// Partition evaluator for `cume_dist`.
///
/// It is a unit struct with no state; the result is computed from the row
/// count and rank ranges passed to `evaluate_all_with_rank`.
#[derive(Debug, Default)]
pub(crate) struct CumeDistEvaluator;

impl PartitionEvaluator for CumeDistEvaluator {
    /// Computes the cumulative distribution for every row in the partition.
    ///
    /// Each entry of `ranks_in_partition` is one range of row indices. Every
    /// row in a range receives the same value: the number of rows covered by
    /// that range and all earlier ranges, divided by `num_rows`.
    fn evaluate_all_with_rank(
        &self,
        num_rows: usize,
        ranks_in_partition: &[Range<usize>],
    ) -> Result<ArrayRef> {
        let total_rows = num_rows as f64;
        // Running count of rows covered by the ranges visited so far.
        let mut rows_so_far: u64 = 0;

        let values = ranks_in_partition.iter().flat_map(|peer_group| {
            let group_len = peer_group.end - peer_group.start;
            rows_so_far += group_len as u64;
            let cume = (rows_so_far as f64) / total_rows;
            // Every row in the range shares the same value.
            iter::repeat(cume).take(group_len)
        });

        let array = Float64Array::from_iter_values(values);
        Ok(Arc::new(array))
    }

    /// This evaluator is driven through `evaluate_all_with_rank`.
    fn include_rank(&self) -> bool {
        true
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use datafusion_common::cast::as_float64_array;

    /// Runs [`CumeDistEvaluator`] on the given rank ranges and asserts that
    /// the resulting `Float64` values equal `expected`.
    fn test_f64_result(
        num_rows: usize,
        ranks: Vec<Range<usize>>,
        expected: Vec<f64>,
    ) -> Result<()> {
        let array = CumeDistEvaluator.evaluate_all_with_rank(num_rows, &ranks)?;
        let actual = as_float64_array(&array)?.values().to_vec();
        assert_eq!(expected, actual);
        Ok(())
    }

    #[test]
    #[allow(clippy::single_range_in_vec_init)]
    fn test_cume_dist() -> Result<()> {
        // (num_rows, rank ranges, expected output), checked in order.
        let cases: Vec<(usize, Vec<Range<usize>>, Vec<f64>)> = vec![
            // Empty partition.
            (0, vec![], vec![]),
            // Single row.
            (1, vec![0..1], vec![1.0]),
            // Two rows in one range.
            (2, vec![0..2], vec![1.0, 1.0]),
            // Two ranges of two rows each.
            (4, vec![0..2, 2..4], vec![0.5, 0.5, 1.0, 1.0]),
        ];

        for (num_rows, ranks, expected) in cases {
            test_f64_result(num_rows, ranks, expected)?;
        }

        Ok(())
    }
}
3 changes: 3 additions & 0 deletions datafusion/functions-window/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ use datafusion_expr::WindowUDF;
#[macro_use]
pub mod macros;

pub mod cume_dist;
pub mod lead_lag;

pub mod rank;
Expand All @@ -40,6 +41,7 @@ mod utils;

/// Fluent-style API for creating `Expr`s
pub mod expr_fn {
pub use super::cume_dist::cume_dist;
pub use super::lead_lag::lag;
pub use super::lead_lag::lead;
pub use super::rank::{dense_rank, percent_rank, rank};
Expand All @@ -49,6 +51,7 @@ pub mod expr_fn {
/// Returns all default window functions
pub fn all_default_window_functions() -> Vec<Arc<WindowUDF>> {
vec![
cume_dist::cume_dist_udwf(),
row_number::row_number_udwf(),
lead_lag::lead_udwf(),
lead_lag::lag_udwf(),
Expand Down
1 change: 0 additions & 1 deletion datafusion/physical-expr/src/expressions/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ mod unknown_column;

/// Module with some convenient methods used in expression building
pub use crate::aggregate::stats::StatsType;
pub use crate::window::cume_dist::{cume_dist, CumeDist};
pub use crate::window::nth_value::NthValue;
pub use crate::window::ntile::Ntile;
pub use crate::PhysicalSortExpr;
Expand Down
Loading

0 comments on commit a4e6b07

Please sign in to comment.