Skip to content

Commit

Permalink
Add example of using Expr::field without SessionContext and Session…
Browse files Browse the repository at this point in the history
…State
  • Loading branch information
alamb committed Apr 22, 2024
1 parent aee976a commit 4f5c52a
Show file tree
Hide file tree
Showing 2 changed files with 72 additions and 5 deletions.
65 changes: 62 additions & 3 deletions datafusion-examples/examples/expr_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,20 +18,24 @@
use std::collections::HashMap;
use std::sync::Arc;

use arrow::array::{BooleanArray, Int32Array};
use arrow::array::{ArrayRef, BooleanArray, Int32Array, StringArray, StructArray};
use arrow::record_batch::RecordBatch;

use datafusion::arrow::datatypes::{DataType, Field, Schema, TimeUnit};
use datafusion::common::{DFField, DFSchema};
use datafusion::error::Result;
use datafusion::functions_array::rewrite::ArrayFunctionRewriter;
use datafusion::optimizer::simplify_expressions::ExprSimplifier;
use datafusion::physical_expr::{
analyze, create_physical_expr, AnalysisContext, ExprBoundaries, PhysicalExpr,
};
use datafusion::prelude::*;
use datafusion_common::config::ConfigOptions;
use datafusion_common::tree_node::TreeNode;
use datafusion_common::{ScalarValue, ToDFSchema};
use datafusion_expr::execution_props::ExecutionProps;
use datafusion_expr::expr::BinaryExpr;
use datafusion_expr::expr_rewriter::FunctionRewrite;
use datafusion_expr::interval_arithmetic::Interval;
use datafusion_expr::simplify::SimplifyContext;
use datafusion_expr::{ColumnarValue, ExprSchemable, Operator};
Expand Down Expand Up @@ -68,6 +72,9 @@ async fn main() -> Result<()> {
// See how to evaluate expressions
evaluate_demo()?;

// See how to evaluate expressions on structured types
evaluate_structured_demo()?;

// See how to simplify expressions
simplify_demo()?;

Expand Down Expand Up @@ -110,6 +117,48 @@ fn evaluate_demo() -> Result<()> {
Ok(())
}

/// DataFusion can also handle structured types such as '{"a": 5, "b": "foo"}'
fn evaluate_structured_demo() -> Result<()> {
// For example, let's say you have an array of structs such as
// [
// {"a": 4, "b": "foo"},
// {"a": 5, "b": "bar"}
// {"a": 6, "b": "baz"}
// }
let struct_array = StructArray::from(vec![
(
Arc::new(Field::new("a", DataType::Int32, false)),
Arc::new(Int32Array::from(vec![4, 5, 6])) as ArrayRef,
),
(
Arc::new(Field::new("b", DataType::Utf8, false)),
Arc::new(StringArray::from(vec!["foo", "bar", "baz"])) as ArrayRef,
),
]);
let batch =
RecordBatch::try_from_iter([("struct_col", Arc::new(struct_array) as _)])?;

// To evaluate `struct_col.a < 6`: extract the value of the field `a` and
// check if it is less than 6
let expr = col("struct_col").field("a").lt(lit(6));

// First, you make a "physical expression" from the logical `Expr`
let physical_expr = physical_expr(&batch.schema(), expr)?;

// Now, you can evaluate the expression against the RecordBatch
let result = physical_expr.evaluate(&batch)?;

// The result contain an array that is true only for where `a < 6`
let expected_result = Arc::new(BooleanArray::from(vec![true, true, false])) as _;
assert!(
matches!(&result, ColumnarValue::Array(r) if r == &expected_result),
"result: {:?}",
result
);

Ok(())
}

/// In addition to easy construction, DataFusion exposes APIs for simplifying
/// such expression so they are more efficient to evaluate. This code is also
/// used by the query engine to optimize queries.
Expand Down Expand Up @@ -248,18 +297,28 @@ fn make_ts_field(name: &str) -> Field {
make_field(name, DataType::Timestamp(TimeUnit::Nanosecond, tz))
}

/// Build a physical expression from a logical one, after applying simplification and type coercion
/// Build a physical expression from a logical one, after applying
/// simplification, type coercion, and applying function rewrites
pub fn physical_expr(schema: &Schema, expr: Expr) -> Result<Arc<dyn PhysicalExpr>> {
let df_schema = schema.clone().to_dfschema_ref()?;

// Simplify
// register the standard DataFusion function library
let props = ExecutionProps::new();
let simplifier =
ExprSimplifier::new(SimplifyContext::new(&props).with_schema(df_schema.clone()));

// apply type coercion here to ensure types match
let expr = simplifier.coerce(expr, df_schema.clone())?;

// Support array functions by rewriting expressions
let rewriter = ArrayFunctionRewriter::new();

let expr = expr
.transform_up(|expr| {
rewriter.rewrite(expr, df_schema.as_ref(), &ConfigOptions::default())
})?
.data;

create_physical_expr(&expr, df_schema.as_ref(), &props)
}

Expand Down
12 changes: 10 additions & 2 deletions datafusion/functions-array/src/rewrite.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,16 @@ use datafusion_expr::expr_rewriter::FunctionRewrite;
use datafusion_expr::{BinaryExpr, Expr, GetFieldAccess, GetIndexedField, Operator};
use datafusion_functions::expr_fn::get_field;

/// Rewrites expressions into function calls to array functions
pub(crate) struct ArrayFunctionRewriter {}
/// Rewrites expressions such as `expr[field]` into function dor array functions
#[derive(Debug, Default)]
pub struct ArrayFunctionRewriter {}

impl ArrayFunctionRewriter {
/// Create a new `ArrayFunctionRewriter`
pub fn new() -> Self {
Self::default()
}
}

impl FunctionRewrite for ArrayFunctionRewriter {
fn name(&self) -> &str {
Expand Down

0 comments on commit 4f5c52a

Please sign in to comment.