diff --git a/datafusion/core/Cargo.toml b/datafusion/core/Cargo.toml index 149bf8beb96e9..79fd19f8ac06a 100644 --- a/datafusion/core/Cargo.toml +++ b/datafusion/core/Cargo.toml @@ -182,6 +182,10 @@ name = "math_query_sql" harness = false name = "filter_query_sql" +[[bench]] +harness = false +name = "struct_query_sql" + [[bench]] harness = false name = "window_query_sql" diff --git a/datafusion/core/benches/struct_query_sql.rs b/datafusion/core/benches/struct_query_sql.rs new file mode 100644 index 0000000000000..3ef7292c66271 --- /dev/null +++ b/datafusion/core/benches/struct_query_sql.rs @@ -0,0 +1,79 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use arrow::{ + array::{Float32Array, Float64Array}, + datatypes::{DataType, Field, Schema}, + record_batch::RecordBatch, +}; +use criterion::{criterion_group, criterion_main, Criterion}; +use datafusion::prelude::SessionContext; +use datafusion::{datasource::MemTable, error::Result}; +use futures::executor::block_on; +use std::sync::Arc; +use tokio::runtime::Runtime; + +async fn query(ctx: &SessionContext, sql: &str) { + let rt = Runtime::new().unwrap(); + + // execute the query + let df = rt.block_on(ctx.sql(sql)).unwrap(); + criterion::black_box(rt.block_on(df.collect()).unwrap()); +} + +fn create_context(array_len: usize, batch_size: usize) -> Result { + // define a schema. + let schema = Arc::new(Schema::new(vec![ + Field::new("f32", DataType::Float32, false), + Field::new("f64", DataType::Float64, false), + ])); + + // define data. + let batches = (0..array_len / batch_size) + .map(|i| { + RecordBatch::try_new( + schema.clone(), + vec![ + Arc::new(Float32Array::from(vec![i as f32; batch_size])), + Arc::new(Float64Array::from(vec![i as f64; batch_size])), + ], + ) + .unwrap() + }) + .collect::>(); + + let ctx = SessionContext::new(); + + // declare a table in memory. In spark API, this corresponds to createDataFrame(...). + let provider = MemTable::try_new(schema, vec![batches])?; + ctx.register_table("t", Arc::new(provider))?; + + Ok(ctx) +} + +fn criterion_benchmark(c: &mut Criterion) { + let array_len = 524_288; // 2^19 + let batch_size = 4096; // 2^12 + + c.bench_function("struct", |b| { + let ctx = create_context(array_len, batch_size).unwrap(); + b.iter(|| block_on(query(&ctx, "select struct(f32, f64) from t"))) + }); +} + +criterion_group!(benches, criterion_benchmark); +criterion_main!(benches); diff --git a/datafusion/functions/src/core/named_struct.rs b/datafusion/functions/src/core/named_struct.rs index 70c9a425790c2..d5f2054c2de8e 100644 --- a/datafusion/functions/src/core/named_struct.rs +++ b/datafusion/functions/src/core/named_struct.rs @@ -17,79 +17,13 @@ use arrow::array::StructArray; use arrow::datatypes::{DataType, Field, Fields}; -use datafusion_common::{exec_err, internal_err, HashSet, Result, ScalarValue}; -use datafusion_expr::{ColumnarValue, Documentation, ReturnInfo, ReturnTypeArgs}; +use datafusion_common::{exec_err, internal_err, Result}; +use datafusion_expr::{ColumnarValue, Documentation, ReturnInfo, ReturnTypeArgs, ScalarFunctionArgs}; use datafusion_expr::{ScalarUDFImpl, Signature, Volatility}; use datafusion_macros::user_doc; use std::any::Any; use std::sync::Arc; -/// Put values in a struct array. -fn named_struct_expr(args: &[ColumnarValue]) -> Result { - // Do not accept 0 arguments. - if args.is_empty() { - return exec_err!( - "named_struct requires at least one pair of arguments, got 0 instead" - ); - } - - if args.len() % 2 != 0 { - return exec_err!( - "named_struct requires an even number of arguments, got {} instead", - args.len() - ); - } - - let (names, values): (Vec<_>, Vec<_>) = args - .chunks_exact(2) - .enumerate() - .map(|(i, chunk)| { - let name_column = &chunk[0]; - let name = match name_column { - ColumnarValue::Scalar(ScalarValue::Utf8(Some(name_scalar))) => { - name_scalar - } - // TODO: Implement Display for ColumnarValue - _ => { - return exec_err!( - "named_struct even arguments must be string literals at position {}", - i * 2 - ) - } - }; - - Ok((name, chunk[1].clone())) - }) - .collect::>>()? - .into_iter() - .unzip(); - - { - // Check to enforce the uniqueness of struct field name - let mut unique_field_names = HashSet::new(); - for name in names.iter() { - if unique_field_names.contains(name) { - return exec_err!( - "named_struct requires unique field names. Field {name} is used more than once." - ); - } - unique_field_names.insert(name); - } - } - - let fields: Fields = names - .into_iter() - .zip(&values) - .map(|(name, value)| Arc::new(Field::new(name, value.data_type().clone(), true))) - .collect::>() - .into(); - - let arrays = ColumnarValue::values_to_arrays(&values)?; - - let struct_array = StructArray::new(fields, arrays, None); - Ok(ColumnarValue::Array(Arc::new(struct_array))) -} - #[user_doc( doc_section(label = "Struct Functions"), description = "Returns an Arrow struct using the specified name and input expressions pairs.", @@ -203,12 +137,12 @@ impl ScalarUDFImpl for NamedStructFunc { )))) } - fn invoke_batch( - &self, - args: &[ColumnarValue], - _number_rows: usize, - ) -> Result { - named_struct_expr(args) + fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result { + let DataType::Struct(fields) = args.return_type else { + return internal_err!("incorrect named_struct return type"); + }; + let arrays = ColumnarValue::values_to_arrays(&args.args)?; + Ok(ColumnarValue::Array(Arc::new(StructArray::new(fields.clone(), arrays, None)))) } fn documentation(&self) -> Option<&Documentation> { diff --git a/datafusion/functions/src/core/struct.rs b/datafusion/functions/src/core/struct.rs index f5bff2cc726b4..cd9949aaef4ee 100644 --- a/datafusion/functions/src/core/struct.rs +++ b/datafusion/functions/src/core/struct.rs @@ -15,46 +15,15 @@ // specific language governing permissions and limitations // under the License. -use arrow::array::{ArrayRef, StructArray}; -use arrow::datatypes::{DataType, Field, Fields}; -use datafusion_common::{exec_err, Result}; -use datafusion_expr::{ColumnarValue, Documentation}; +use arrow::array::StructArray; +use arrow::datatypes::{DataType, Field}; +use datafusion_common::{exec_err, internal_err, Result}; +use datafusion_expr::{ColumnarValue, Documentation, ScalarFunctionArgs}; use datafusion_expr::{ScalarUDFImpl, Signature, Volatility}; use datafusion_macros::user_doc; use std::any::Any; use std::sync::Arc; -fn array_struct(args: &[ArrayRef]) -> Result { - // do not accept 0 arguments. - if args.is_empty() { - return exec_err!("struct requires at least one argument"); - } - - let fields = args - .iter() - .enumerate() - .map(|(i, arg)| { - let field_name = format!("c{i}"); - Ok(Arc::new(Field::new( - field_name.as_str(), - arg.data_type().clone(), - true, - ))) - }) - .collect::>>()? - .into(); - - let arrays = args.to_vec(); - - Ok(Arc::new(StructArray::new(fields, arrays, None))) -} - -/// put values in a struct array. -fn struct_expr(args: &[ColumnarValue]) -> Result { - let arrays = ColumnarValue::values_to_arrays(args)?; - Ok(ColumnarValue::Array(array_struct(arrays.as_slice())?)) -} - #[user_doc( doc_section(label = "Struct Functions"), description = "Returns an Arrow struct using the specified input expressions optionally named. @@ -133,20 +102,26 @@ impl ScalarUDFImpl for StructFunc { } fn return_type(&self, arg_types: &[DataType]) -> Result { - let return_fields = arg_types + if arg_types.is_empty() { + return exec_err!("struct requires at least one argument, got 0 instead"); + } + + let fields = arg_types .iter() .enumerate() .map(|(pos, dt)| Field::new(format!("c{pos}"), dt.clone(), true)) - .collect::>(); - Ok(DataType::Struct(Fields::from(return_fields))) + .collect::>() + .into(); + + Ok(DataType::Struct(fields)) } - fn invoke_batch( - &self, - args: &[ColumnarValue], - _number_rows: usize, - ) -> Result { - struct_expr(args) + fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result { + let DataType::Struct(fields) = args.return_type else { + return internal_err!("incorrect struct return type"); + }; + let arrays = ColumnarValue::values_to_arrays(&args.args)?; + Ok(ColumnarValue::Array(Arc::new(StructArray::new(fields.clone(), arrays, None)))) } fn documentation(&self) -> Option<&Documentation> {