Skip to content

Commit

Permalink
Consolidate Example: simplify_udwf_expression.rs into advanced_udwf.rs
Browse files Browse the repository at this point in the history
  • Loading branch information
xarus01 committed Dec 23, 2024
1 parent 8fd792f commit c7f2723
Show file tree
Hide file tree
Showing 3 changed files with 90 additions and 139 deletions.
94 changes: 89 additions & 5 deletions datafusion-examples/examples/advanced_udwf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,14 @@ use arrow::{
};
use arrow_schema::Field;
use datafusion::error::Result;
use datafusion::functions_aggregate::average::avg_udaf;
use datafusion::prelude::*;
use datafusion_common::ScalarValue;
use datafusion_expr::function::WindowUDFFieldArgs;
use datafusion_expr::expr::WindowFunction;
use datafusion_expr::function::{WindowFunctionSimplification, WindowUDFFieldArgs};
use datafusion_expr::simplify::SimplifyInfo;
use datafusion_expr::{
PartitionEvaluator, Signature, WindowFrame, WindowUDF, WindowUDFImpl,
Expr, PartitionEvaluator, Signature, WindowFrame, WindowUDF, WindowUDFImpl,
};
use datafusion_functions_window_common::partition::PartitionEvaluatorArgs;

Expand Down Expand Up @@ -142,6 +145,67 @@ impl PartitionEvaluator for MyPartitionEvaluator {
}
}

/// A user-defined window function (UDWF) that demonstrates the
/// `WindowUDFImpl::simplify()` API.
///
/// Its `simplify()` implementation rewrites calls to this function into a
/// window function backed by the `avg` aggregate UDF.
#[derive(Debug, Clone)]
struct SimplifySmoothItUdf {
// Argument types and volatility of this function; returned by `signature()`.
signature: Signature,
}

impl SimplifySmoothItUdf {
    /// Creates the UDWF with its fixed signature: exactly one `Float64`
    /// argument, with `Immutable` volatility.
    fn new() -> Self {
        // The function always takes a single argument of type f64.
        let accepted_args = vec![DataType::Float64];
        // `Immutable`: the function is deterministic and always returns the
        // same result for the same input.
        let signature = Signature::exact(accepted_args, Volatility::Immutable);

        Self { signature }
    }
}
impl WindowUDFImpl for SimplifySmoothItUdf {
    /// Returns `self` as `&dyn Any`.
    fn as_any(&self) -> &dyn Any {
        self
    }

    /// Name of this window function.
    fn name(&self) -> &str {
        "simplify_smooth_it"
    }

    /// Signature built in `SimplifySmoothItUdf::new`.
    fn signature(&self) -> &Signature {
        &self.signature
    }

    /// Intentionally left as `todo!()`: this example expects every call to be
    /// rewritten by `simplify()` below, so no evaluator is provided here.
    fn partition_evaluator(
        &self,
        _partition_evaluator_args: PartitionEvaluatorArgs,
    ) -> Result<Box<dyn PartitionEvaluator>> {
        todo!()
    }

    /// Returns a rewrite that replaces a call to this UDWF with a window
    /// function over the `avg` aggregate UDF.
    ///
    /// Only the function definition is swapped; the arguments, partitioning,
    /// ordering, window frame and null treatment of the original call are
    /// carried over unchanged.
    fn simplify(&self) -> Option<WindowFunctionSimplification> {
        let rewrite_as_avg = |window_function: WindowFunction, _info: &dyn SimplifyInfo| {
            let rewritten = WindowFunction {
                fun: datafusion_expr::WindowFunctionDefinition::AggregateUDF(avg_udaf()),
                // every remaining field is moved over from the original call
                ..window_function
            };
            Ok(Expr::WindowFunction(rewritten))
        };

        Some(Box::new(rewrite_as_avg))
    }

    /// Describes the output column: a nullable `Float64` field named by
    /// `field_args.name()`.
    fn field(&self, field_args: WindowUDFFieldArgs) -> Result<Field> {
        let nullable = true;
        let output = Field::new(field_args.name(), DataType::Float64, nullable);
        Ok(output)
    }
}

// create local execution context with `cars.csv` registered as a table named `cars`
async fn create_context() -> Result<SessionContext> {
// declare a new context. In spark API, this corresponds to a new spark SQL session
Expand All @@ -162,12 +226,15 @@ async fn main() -> Result<()> {
let smooth_it = WindowUDF::from(SmoothItUdf::new());
ctx.register_udwf(smooth_it.clone());

// Use SQL to run the new window function
let simplify_smooth_it = WindowUDF::from(SimplifySmoothItUdf::new());
ctx.register_udwf(simplify_smooth_it.clone());

// Use SQL to retrieve entire table
let df = ctx.sql("SELECT * from cars").await?;
// print the results
df.show().await?;

// Use SQL to run the new window function:
// Use SQL to run smooth_it:
//
// `PARTITION BY car`: each distinct value of car (red and green)
// should be treated as a separate partition (and will result in
Expand Down Expand Up @@ -201,7 +268,7 @@ async fn main() -> Result<()> {
// print the results
df.show().await?;

// this time, call the new widow function with an explicit
// this time, call the function with an explicit
// window so evaluate will be invoked with each window.
//
// `ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING`: each invocation
Expand Down Expand Up @@ -232,5 +299,22 @@ async fn main() -> Result<()> {
// print the results
df.show().await?;

// Use SQL to run simplify_smooth_it
let df = ctx
.sql(
"SELECT \
car, \
speed, \
simplify_smooth_it(speed) OVER (PARTITION BY car ORDER BY time) AS smooth_speed,\
time \
from cars \
ORDER BY \
car",
)
.await?;

// print the results
df.show().await?;

Ok(())
}
133 changes: 0 additions & 133 deletions datafusion-examples/examples/simplify_udwf_expression.rs

This file was deleted.

2 changes: 1 addition & 1 deletion datafusion/expr/src/udwf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -344,7 +344,7 @@ pub trait WindowUDFImpl: Debug + Send + Sync {
/// optimizations manually for specific UDFs.
///
/// Example:
/// [`simplify_udwf_expression.rs`]: <https://github.com/apache/arrow-datafusion/blob/main/datafusion-examples/examples/simplify_udwf_expression.rs>
/// [`advanced_udwf.rs`]: <https://github.com/apache/datafusion/blob/main/datafusion-examples/examples/advanced_udwf.rs>
///
/// # Returns
/// [None] if simplify is not defined or,
Expand Down

0 comments on commit c7f2723

Please sign in to comment.