From 4f5c52a103e1b452b26b20b1e8770ce86b6f5b4f Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Mon, 22 Apr 2024 17:19:31 -0400 Subject: [PATCH] Add example of using `Expr::field` without SessionContext and SessionState --- datafusion-examples/examples/expr_api.rs | 65 +++++++++++++++++++++-- datafusion/functions-array/src/rewrite.rs | 12 ++++- 2 files changed, 72 insertions(+), 5 deletions(-) diff --git a/datafusion-examples/examples/expr_api.rs b/datafusion-examples/examples/expr_api.rs index 5f9f3106e14d..e5dc2a6d9b0f 100644 --- a/datafusion-examples/examples/expr_api.rs +++ b/datafusion-examples/examples/expr_api.rs @@ -18,20 +18,24 @@ use std::collections::HashMap; use std::sync::Arc; -use arrow::array::{BooleanArray, Int32Array}; +use arrow::array::{ArrayRef, BooleanArray, Int32Array, StringArray, StructArray}; use arrow::record_batch::RecordBatch; use datafusion::arrow::datatypes::{DataType, Field, Schema, TimeUnit}; use datafusion::common::{DFField, DFSchema}; use datafusion::error::Result; +use datafusion::functions_array::rewrite::ArrayFunctionRewriter; use datafusion::optimizer::simplify_expressions::ExprSimplifier; use datafusion::physical_expr::{ analyze, create_physical_expr, AnalysisContext, ExprBoundaries, PhysicalExpr, }; use datafusion::prelude::*; +use datafusion_common::config::ConfigOptions; +use datafusion_common::tree_node::TreeNode; use datafusion_common::{ScalarValue, ToDFSchema}; use datafusion_expr::execution_props::ExecutionProps; use datafusion_expr::expr::BinaryExpr; +use datafusion_expr::expr_rewriter::FunctionRewrite; use datafusion_expr::interval_arithmetic::Interval; use datafusion_expr::simplify::SimplifyContext; use datafusion_expr::{ColumnarValue, ExprSchemable, Operator}; @@ -68,6 +72,9 @@ async fn main() -> Result<()> { // See how to evaluate expressions evaluate_demo()?; + // See how to evaluate expressions on structured types + evaluate_structured_demo()?; + // See how to simplify expressions simplify_demo()?; @@ -110,6 
+117,48 @@ fn evaluate_demo() -> Result<()> { Ok(()) } +/// DataFusion can also handle structured types such as '{"a": 5, "b": "foo"}' +fn evaluate_structured_demo() -> Result<()> { + // For example, let's say you have an array of structs such as + // [ + // {"a": 4, "b": "foo"}, + // {"a": 5, "b": "bar"}, + // {"a": 6, "b": "baz"} + // ] + let struct_array = StructArray::from(vec![ + ( + Arc::new(Field::new("a", DataType::Int32, false)), + Arc::new(Int32Array::from(vec![4, 5, 6])) as ArrayRef, + ), + ( + Arc::new(Field::new("b", DataType::Utf8, false)), + Arc::new(StringArray::from(vec!["foo", "bar", "baz"])) as ArrayRef, + ), + ]); + let batch = + RecordBatch::try_from_iter([("struct_col", Arc::new(struct_array) as _)])?; + + // To evaluate `struct_col.a < 6`: extract the value of the field `a` and + // check if it is less than 6 + let expr = col("struct_col").field("a").lt(lit(6)); + + // First, you make a "physical expression" from the logical `Expr` + let physical_expr = physical_expr(&batch.schema(), expr)?; + + // Now, you can evaluate the expression against the RecordBatch + let result = physical_expr.evaluate(&batch)?; + + // The result contains an array that is true only where `a < 6` + let expected_result = Arc::new(BooleanArray::from(vec![true, true, false])) as _; + assert!( + matches!(&result, ColumnarValue::Array(r) if r == &expected_result), + "result: {:?}", + result + ); + + Ok(()) +} + /// In addition to easy construction, DataFusion exposes APIs for simplifying /// such expression so they are more efficient to evaluate. This code is also /// used by the query engine to optimize queries. 
@@ -248,11 +297,12 @@ fn make_ts_field(name: &str) -> Field { make_field(name, DataType::Timestamp(TimeUnit::Nanosecond, tz)) } -/// Build a physical expression from a logical one, after applying simplification and type coercion +/// Build a physical expression from a logical one, after applying +/// simplification, type coercion, and applying function rewrites pub fn physical_expr(schema: &Schema, expr: Expr) -> Result> { let df_schema = schema.clone().to_dfschema_ref()?; - // Simplify + // register the standard DataFusion function library let props = ExecutionProps::new(); let simplifier = ExprSimplifier::new(SimplifyContext::new(&props).with_schema(df_schema.clone())); @@ -260,6 +310,15 @@ pub fn physical_expr(schema: &Schema, expr: Expr) -> Result Self { + Self::default() + } +} impl FunctionRewrite for ArrayFunctionRewriter { fn name(&self) -> &str {