From c7f2723054b6af273e81be5ef6187b14ce087d92 Mon Sep 17 00:00:00 2001 From: Jack Park Date: Sun, 22 Dec 2024 23:41:51 -0800 Subject: [PATCH] Consolidate Example: simplify_udwf_expression.rs into advanced_udwf.rs --- datafusion-examples/examples/advanced_udwf.rs | 94 ++++++++++++- .../examples/simplify_udwf_expression.rs | 133 ------------------ datafusion/expr/src/udwf.rs | 2 +- 3 files changed, 90 insertions(+), 139 deletions(-) delete mode 100644 datafusion-examples/examples/simplify_udwf_expression.rs diff --git a/datafusion-examples/examples/advanced_udwf.rs b/datafusion-examples/examples/advanced_udwf.rs index 1c20e292f091..49e890467d21 100644 --- a/datafusion-examples/examples/advanced_udwf.rs +++ b/datafusion-examples/examples/advanced_udwf.rs @@ -24,11 +24,14 @@ use arrow::{ }; use arrow_schema::Field; use datafusion::error::Result; +use datafusion::functions_aggregate::average::avg_udaf; use datafusion::prelude::*; use datafusion_common::ScalarValue; -use datafusion_expr::function::WindowUDFFieldArgs; +use datafusion_expr::expr::WindowFunction; +use datafusion_expr::function::{WindowFunctionSimplification, WindowUDFFieldArgs}; +use datafusion_expr::simplify::SimplifyInfo; use datafusion_expr::{ - PartitionEvaluator, Signature, WindowFrame, WindowUDF, WindowUDFImpl, + Expr, PartitionEvaluator, Signature, WindowFrame, WindowUDF, WindowUDFImpl, }; use datafusion_functions_window_common::partition::PartitionEvaluatorArgs; @@ -142,6 +145,67 @@ impl PartitionEvaluator for MyPartitionEvaluator { } } +/// This UDWF will show how to use the WindowUDFImpl::simplify() API +#[derive(Debug, Clone)] +struct SimplifySmoothItUdf { + signature: Signature, +} + +impl SimplifySmoothItUdf { + fn new() -> Self { + Self { + signature: Signature::exact( + // this function will always take one arguments of type f64 + vec![DataType::Float64], + // this function is deterministic and will always return the same + // result for the same input + Volatility::Immutable, + ), + } + } +} +impl WindowUDFImpl for SimplifySmoothItUdf { + fn as_any(&self) -> &dyn Any { + self + } + + fn name(&self) -> &str { + "simplify_smooth_it" + } + + fn signature(&self) -> &Signature { + &self.signature + } + + fn partition_evaluator( + &self, + _partition_evaluator_args: PartitionEvaluatorArgs, + ) -> Result> { + todo!() + } + + /// this function will simplify `SimplifySmoothItUdf` to `AggregateUDF` for `Avg` + /// default implementation will not be called (left as `todo!()`) + fn simplify(&self) -> Option { + let simplify = |window_function: WindowFunction, _: &dyn SimplifyInfo| { + Ok(Expr::WindowFunction(WindowFunction { + fun: datafusion_expr::WindowFunctionDefinition::AggregateUDF(avg_udaf()), + args: window_function.args, + partition_by: window_function.partition_by, + order_by: window_function.order_by, + window_frame: window_function.window_frame, + null_treatment: window_function.null_treatment, + })) + }; + + Some(Box::new(simplify)) + } + + fn field(&self, field_args: WindowUDFFieldArgs) -> Result { + Ok(Field::new(field_args.name(), DataType::Float64, true)) + } +} + // create local execution context with `cars.csv` registered as a table named `cars` async fn create_context() -> Result { // declare a new context. In spark API, this corresponds to a new spark SQL session @@ -162,12 +226,15 @@ async fn main() -> Result<()> { let smooth_it = WindowUDF::from(SmoothItUdf::new()); ctx.register_udwf(smooth_it.clone()); - // Use SQL to run the new window function + let simplify_smooth_it = WindowUDF::from(SimplifySmoothItUdf::new()); + ctx.register_udwf(simplify_smooth_it.clone()); + + // Use SQL to retrieve entire table let df = ctx.sql("SELECT * from cars").await?; // print the results df.show().await?; - // Use SQL to run the new window function: + // Use SQL to run smooth_it: // // `PARTITION BY car`:each distinct value of car (red, and green) // should be treated as a separate partition (and will result in @@ -201,7 +268,7 @@ async fn main() -> Result<()> { // print the results df.show().await?; - // this time, call the new widow function with an explicit + // this time, call the function with an explicit // window so evaluate will be invoked with each window. // // `ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING`: each invocation @@ -232,5 +299,22 @@ async fn main() -> Result<()> { // print the results df.show().await?; + // Use SQL to run simplify_smooth_it + let df = ctx + .sql( + "SELECT \ + car, \ + speed, \ + simplify_smooth_it(speed) OVER (PARTITION BY car ORDER BY time) AS smooth_speed,\ + time \ + from cars \ + ORDER BY \ + car", + ) + .await?; + + // print the results + df.show().await?; + Ok(()) } diff --git a/datafusion-examples/examples/simplify_udwf_expression.rs b/datafusion-examples/examples/simplify_udwf_expression.rs deleted file mode 100644 index 117063df4e0d..000000000000 --- a/datafusion-examples/examples/simplify_udwf_expression.rs +++ /dev/null @@ -1,133 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -use std::any::Any; - -use arrow_schema::{DataType, Field}; - -use datafusion::execution::context::SessionContext; -use datafusion::functions_aggregate::average::avg_udaf; -use datafusion::{error::Result, execution::options::CsvReadOptions}; -use datafusion_expr::function::{WindowFunctionSimplification, WindowUDFFieldArgs}; -use datafusion_expr::{ - expr::WindowFunction, simplify::SimplifyInfo, Expr, PartitionEvaluator, Signature, - Volatility, WindowUDF, WindowUDFImpl, -}; -use datafusion_functions_window_common::partition::PartitionEvaluatorArgs; - -/// This UDWF will show how to use the WindowUDFImpl::simplify() API -#[derive(Debug, Clone)] -struct SimplifySmoothItUdf { - signature: Signature, -} - -impl SimplifySmoothItUdf { - fn new() -> Self { - Self { - signature: Signature::exact( - // this function will always take one arguments of type f64 - vec![DataType::Float64], - // this function is deterministic and will always return the same - // result for the same input - Volatility::Immutable, - ), - } - } -} -impl WindowUDFImpl for SimplifySmoothItUdf { - fn as_any(&self) -> &dyn Any { - self - } - - fn name(&self) -> &str { - "simplify_smooth_it" - } - - fn signature(&self) -> &Signature { - &self.signature - } - - fn partition_evaluator( - &self, - _partition_evaluator_args: PartitionEvaluatorArgs, - ) -> Result> { - todo!() - } - - /// this function will simplify `SimplifySmoothItUdf` to `SmoothItUdf`. - fn simplify(&self) -> Option { - let simplify = |window_function: WindowFunction, _: &dyn SimplifyInfo| { - Ok(Expr::WindowFunction(WindowFunction { - fun: datafusion_expr::WindowFunctionDefinition::AggregateUDF(avg_udaf()), - args: window_function.args, - partition_by: window_function.partition_by, - order_by: window_function.order_by, - window_frame: window_function.window_frame, - null_treatment: window_function.null_treatment, - })) - }; - - Some(Box::new(simplify)) - } - - fn field(&self, field_args: WindowUDFFieldArgs) -> Result { - Ok(Field::new(field_args.name(), DataType::Float64, true)) - } -} - -// create local execution context with `cars.csv` registered as a table named `cars` -async fn create_context() -> Result { - // declare a new context. In spark API, this corresponds to a new spark SQL session - let ctx = SessionContext::new(); - - // declare a table in memory. In spark API, this corresponds to createDataFrame(...). - println!("pwd: {}", std::env::current_dir().unwrap().display()); - let csv_path = "../../datafusion/core/tests/data/cars.csv".to_string(); - let read_options = CsvReadOptions::default().has_header(true); - - ctx.register_csv("cars", &csv_path, read_options).await?; - Ok(ctx) -} - -#[tokio::main] -async fn main() -> Result<()> { - let ctx = create_context().await?; - let simplify_smooth_it = WindowUDF::from(SimplifySmoothItUdf::new()); - ctx.register_udwf(simplify_smooth_it.clone()); - - // Use SQL to run the new window function - let df = ctx.sql("SELECT * from cars").await?; - // print the results - df.show().await?; - - let df = ctx - .sql( - "SELECT \ - car, \ - speed, \ - simplify_smooth_it(speed) OVER (PARTITION BY car ORDER BY time) AS smooth_speed,\ - time \ - from cars \ - ORDER BY \ - car", - ) - .await?; - // print the results - df.show().await?; - - Ok(()) -} diff --git a/datafusion/expr/src/udwf.rs b/datafusion/expr/src/udwf.rs index 4bfc3f07bb14..39e1e8f261a2 100644 --- a/datafusion/expr/src/udwf.rs +++ b/datafusion/expr/src/udwf.rs @@ -344,7 +344,7 @@ pub trait WindowUDFImpl: Debug + Send + Sync { /// optimizations manually for specific UDFs. /// /// Example: - /// [`simplify_udwf_expression.rs`]: + /// [`advanced_udwf.rs`]: /// /// # Returns /// [None] if simplify is not defined or,