From 81c8244125e1c26f009ec0b9d28713fb5ab131cd Mon Sep 17 00:00:00 2001
From: Jia-Xuan Liu
Date: Sat, 20 Jul 2024 17:03:10 +0800
Subject: [PATCH] add a draft implementation for new design and benchmark

---
 datafusion/core/Cargo.toml                   |   6 +
 datafusion/core/benches/map_query_sql.rs     |  91 ++++++++++++++
 .../tests/dataframe/dataframe_functions.rs   |  23 +++-
 datafusion/functions-array/src/map.rs        |  11 +-
 datafusion/functions/src/core/map.rs         | 117 +++++++++++++++++-
 5 files changed, 244 insertions(+), 4 deletions(-)
 create mode 100644 datafusion/core/benches/map_query_sql.rs

diff --git a/datafusion/core/Cargo.toml b/datafusion/core/Cargo.toml
index c937a6f6e59a..24df2442bd99 100644
--- a/datafusion/core/Cargo.toml
+++ b/datafusion/core/Cargo.toml
@@ -217,3 +217,9 @@ name = "topk_aggregate"
 [[bench]]
 harness = false
 name = "parquet_statistic"
+
+[[bench]]
+harness = false
+name = "map_query_sql"
+required-features = ["array_expressions"]
+
diff --git a/datafusion/core/benches/map_query_sql.rs b/datafusion/core/benches/map_query_sql.rs
new file mode 100644
index 000000000000..0885d3751ad5
--- /dev/null
+++ b/datafusion/core/benches/map_query_sql.rs
@@ -0,0 +1,91 @@
+use std::sync::Arc;
+
+use arrow_array::{ArrayRef, Int32Array, RecordBatch};
+use criterion::{black_box, criterion_group, criterion_main, Criterion};
+use parking_lot::Mutex;
+use rand::prelude::ThreadRng;
+use rand::Rng;
+use tokio::runtime::Runtime;
+
+use datafusion::prelude::SessionContext;
+use datafusion_common::ScalarValue;
+use datafusion_expr::Expr;
+use datafusion_functions_array::map::{map, map_from_array};
+
+mod data_utils;
+
+fn keys(rng: &mut ThreadRng) -> Vec<String> {
+    let mut keys = vec![];
+    for _ in 0..1000 {
+        keys.push(rng.gen_range(0..9999).to_string());
+    }
+    keys
+}
+
+fn values(rng: &mut ThreadRng) -> Vec<i32> {
+    let mut values = vec![];
+    for _ in 0..1000 {
+        values.push(rng.gen_range(0..9999));
+    }
+    values
+}
+
+fn t_batch() -> RecordBatch {
+    let c1: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3]));
+    RecordBatch::try_from_iter(vec![("c1", c1)]).unwrap()
+}
+
+fn create_context() -> datafusion_common::Result<Arc<Mutex<SessionContext>>> {
+    let ctx = SessionContext::new();
+    ctx.register_batch("t", t_batch())?;
+    Ok(Arc::new(Mutex::new(ctx)))
+}
+
+fn criterion_benchmark(c: &mut Criterion) {
+    let ctx = create_context().unwrap();
+    let rt = Runtime::new().unwrap();
+    let df = rt.block_on(ctx.lock().table("t")).unwrap();
+
+    let mut rng = rand::thread_rng();
+    let keys = keys(&mut rng);
+    let values = values(&mut rng);
+    let mut key_buffer = Vec::new();
+    let mut value_buffer = Vec::new();
+
+    for i in 0..1000 {
+        key_buffer.push(Expr::Literal(ScalarValue::Utf8(Some(keys[i].clone()))));
+        value_buffer.push(Expr::Literal(ScalarValue::Int32(Some(values[i]))));
+    }
+    c.bench_function("map_1000", |b| {
+        b.iter(|| {
+            black_box(
+                rt.block_on(
+                    df.clone()
+                        .select(vec![map(key_buffer.clone(), value_buffer.clone())])
+                        .unwrap()
+                        .collect(),
+                )
+                .unwrap(),
+            );
+        });
+    });
+    c.bench_function("map_one_1000", |b| {
+        b.iter(|| {
+            black_box(
+                rt.block_on(
+                    df.clone()
+                        .select(vec![map_from_array(
+                            key_buffer.clone(),
+                            value_buffer.clone(),
+                        )])
+                        .unwrap()
+                        .collect(),
+                )
+                .unwrap(),
+            );
+        });
+    });
+}
+
+criterion_group!(benches, criterion_benchmark);
+criterion_main!(benches);
diff --git a/datafusion/core/tests/dataframe/dataframe_functions.rs b/datafusion/core/tests/dataframe/dataframe_functions.rs
index f68ae507e35e..3a1f919bc51e 100644
--- a/datafusion/core/tests/dataframe/dataframe_functions.rs
+++ b/datafusion/core/tests/dataframe/dataframe_functions.rs
@@ -34,7 +34,7 @@ use datafusion_common::{DFSchema, ScalarValue};
 use datafusion_expr::expr::Alias;
 use datafusion_expr::ExprSchemable;
 use datafusion_functions_aggregate::expr_fn::{approx_median, approx_percentile_cont};
-use datafusion_functions_array::map::map;
+use datafusion_functions_array::map::{map, map_from_array};
 
 fn test_schema() -> SchemaRef {
     Arc::new(Schema::new(vec![
@@ -1091,7 +1091,10 @@ async fn test_fn_array_to_string() -> Result<()> {
 
 #[tokio::test]
 async fn test_fn_map() -> Result<()> {
-    let expr = map(vec![lit("a"), lit("b"), lit("c")], vec![lit(1), lit(2), lit(3)]);
+    let expr = map(
+        vec![lit("a"), lit("b"), lit("c")],
+        vec![lit(1), lit(2), lit(3)],
+    );
     let expected = [
         "+---------------------------------------------------------------------------------------+",
         "| map(make_array(Utf8(\"a\"),Utf8(\"b\"),Utf8(\"c\")),make_array(Int32(1),Int32(2),Int32(3))) |",
@@ -1103,5 +1106,21 @@
         "+---------------------------------------------------------------------------------------+",
         "| {a: 1, b: 2, c: 3} |",
         "| {a: 1, b: 2, c: 3} |",
         "| {a: 1, b: 2, c: 3} |",
         "| {a: 1, b: 2, c: 3} |",
         "+---------------------------------------------------------------------------------------+",
     ];
     assert_fn_batches!(expr, expected);
+
+    let expr = map_from_array(
+        vec![lit("a"), lit("b"), lit("c")],
+        vec![lit(1), lit(2), lit(3)],
+    );
+    let expected = [
+        "+-------------------------------------------------------------------------------------------+",
+        "| map_one(make_array(Utf8(\"a\"),Utf8(\"b\"),Utf8(\"c\")),make_array(Int32(1),Int32(2),Int32(3))) |",
+        "+-------------------------------------------------------------------------------------------+",
+        "| {a: 1, b: 2, c: 3} |",
+        "| {a: 1, b: 2, c: 3} |",
+        "| {a: 1, b: 2, c: 3} |",
+        "| {a: 1, b: 2, c: 3} |",
+        "+-------------------------------------------------------------------------------------------+",
+    ];
+    assert_fn_batches!(expr, expected);
     Ok(())
 }
diff --git a/datafusion/functions-array/src/map.rs b/datafusion/functions-array/src/map.rs
index 6beb853c9ff8..840dcb25e3f2 100644
--- a/datafusion/functions-array/src/map.rs
+++ b/datafusion/functions-array/src/map.rs
@@ -17,7 +17,7 @@ use datafusion_expr::Expr;
 use datafusion_expr::expr::ScalarFunction;
 
-use datafusion_functions::core::map::map_udf;
+use datafusion_functions::core::map::{map_one_udf, map_udf};
 
 use crate::make_array::make_array;
 
 pub fn map(keys: Vec<Expr>, values: Vec<Expr>) -> Expr {
@@ -28,3 +28,12 @@ pub fn map(keys: Vec<Expr>, values: Vec<Expr>) -> Expr {
         vec![keys, values],
     ))
 }
+
+pub fn map_from_array(keys: Vec<Expr>, values: Vec<Expr>) -> Expr {
+    let keys = make_array(keys);
+    let values = make_array(values);
+    Expr::ScalarFunction(ScalarFunction::new_udf(
+        map_one_udf(),
+        vec![keys, values],
+    ))
+}
diff --git a/datafusion/functions/src/core/map.rs b/datafusion/functions/src/core/map.rs
index 9bd021374863..834061372c85 100644
--- a/datafusion/functions/src/core/map.rs
+++ b/datafusion/functions/src/core/map.rs
@@ -19,8 +19,9 @@
 use std::any::Any;
 use std::collections::VecDeque;
 use std::sync::Arc;
 
-use arrow::array::{Array, ArrayData, ArrayRef, MapArray, StructArray};
+use arrow::array::{Array, ArrayData, ArrayRef, FixedSizeListArray, LargeListArray, ListArray, MapArray, StructArray};
 use arrow::datatypes::{DataType, Field, SchemaBuilder};
+use arrow::datatypes::DataType::{Int32, Utf8};
 use arrow_buffer::{Buffer, ToByteSlice};
 use datafusion_common::{exec_err, ScalarValue};
@@ -28,6 +29,7 @@ use datafusion_common::Result;
 use datafusion_expr::{ColumnarValue, ScalarUDFImpl, Signature, Volatility};
 
 make_udf_function!(MapFunc, MAP, map_udf);
+make_udf_function!(MapOneFunc, MAP_ONE, map_one_udf);
 
 /// Check if we can evaluate the expr to constant directly.
 ///
@@ -41,6 +43,52 @@ fn can_evaluate_to_const(args: &[ColumnarValue]) -> bool {
         .all(|arg| matches!(arg, ColumnarValue::Scalar(_)))
 }
 
+fn get_scalar_from_col(c: &ColumnarValue) -> ScalarValue {
+    match c {
+        ColumnarValue::Scalar(s) => s.clone(),
+        _ => todo!("array arguments are not supported by map_one yet"),
+    }
+}
+
+fn make_map_batch_one_args(args: &[ColumnarValue]) -> Result<ColumnarValue> {
+    if args.len() % 2 != 0 {
+        return exec_err!(
+            "map_one requires an even number of arguments, got {} instead",
+            args.len()
+        );
+    }
+
+    let len = args.len() / 2;
+    let key_iter = args[0..len].iter().map(get_scalar_from_col);
+    let key = ScalarValue::iter_to_array(key_iter)?;
+    let val_iter = args[len..].iter().map(get_scalar_from_col);
+    let value = ScalarValue::iter_to_array(val_iter)?;
+
+
+    let key = get_first_element(key);
+    let value = get_first_element(value);
+    let can_evaluate_to_const = can_evaluate_to_const(args);
+    make_map_batch_internal(key, value, can_evaluate_to_const)
+}
+
+fn get_first_element(value: ArrayRef) -> ArrayRef {
+    match value.data_type() {
+        DataType::List(_) => {
+            let list_array = value.as_any().downcast_ref::<ListArray>().unwrap();
+            list_array.value(0)
+        }
+        DataType::LargeList(_) => {
+            let list_array = value.as_any().downcast_ref::<LargeListArray>().unwrap();
+            list_array.value(0)
+        }
+        DataType::FixedSizeList(_, _) => {
+            let list_array = value.as_any().downcast_ref::<FixedSizeListArray>().unwrap();
+            list_array.value(0)
+        }
+        _ => value,
+    }
+}
+
 fn make_map_batch(args: &[ColumnarValue]) -> Result<ColumnarValue> {
     if args.len() != 2 {
         return exec_err!(
@@ -193,3 +241,70 @@ fn get_element_type(data_type: &DataType) -> Result<&DataType> {
         ),
     }
 }
+
+#[derive(Debug)]
+pub struct MapOneFunc {
+    signature: Signature,
+}
+
+impl Default for MapOneFunc {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+impl MapOneFunc {
+    pub fn new() -> Self {
+        Self {
+            signature: Signature::variadic_any(Volatility::Immutable),
+        }
+    }
+}
+
+impl ScalarUDFImpl for MapOneFunc {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn name(&self) -> &str {
+        "map_one"
+    }
+
+    fn signature(&self) -> &Signature {
+        &self.signature
+    }
+
+    fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
+        if arg_types.len() == 1 {
+            return exec_err!(
+                "map_one requires at least 2 arguments, got {} instead",
+                arg_types.len()
+            );
+        }
+
+        // let key_type = &arg_types[0];
+        // let val_type = arg_types.last().unwrap();
+        //
+        let mut builder = SchemaBuilder::new();
+        // TODO: get the correct type
+        builder.push(Field::new(
+            "key",
+            Utf8,
+            false,
+        ));
+        builder.push(Field::new(
+            "value",
+            Int32,
+            true,
+        ));
+        let fields = builder.finish().fields;
+        Ok(DataType::Map(
+            Arc::new(Field::new("entries", DataType::Struct(fields), false)),
+            false,
+        ))
+    }
+
+    fn invoke(&self, args: &[ColumnarValue]) -> Result<ColumnarValue> {
+        make_map_batch_one_args(args)
+    }
+}
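
Note for reviewers (not part of the patch): below is a minimal usage sketch of the two expression builders that the new benchmark compares, assuming this patch is applied and the `datafusion`, `datafusion-common`, and `datafusion-functions-array` crates are available with the `array_expressions` feature enabled. The table name `t` and the literal key/value pairs are illustrative only.

    use std::sync::Arc;

    use arrow_array::{ArrayRef, Int32Array, RecordBatch};
    use datafusion::prelude::*;
    use datafusion_common::Result;
    use datafusion_functions_array::map::{map, map_from_array};

    #[tokio::main]
    async fn main() -> Result<()> {
        // Register a small single-column table to project the map expressions against.
        let ctx = SessionContext::new();
        let c1: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3]));
        ctx.register_batch("t", RecordBatch::try_from_iter(vec![("c1", c1)])?)?;
        let df = ctx.table("t").await?;

        let keys = vec![lit("a"), lit("b"), lit("c")];
        let values = vec![lit(1), lit(2), lit(3)];

        // Existing path: `map` wraps keys/values in `make_array` and calls map_udf.
        df.clone()
            .select(vec![map(keys.clone(), values.clone())])?
            .show()
            .await?;

        // New path in this patch: `map_from_array` routes through map_one_udf ("map_one").
        df.select(vec![map_from_array(keys, values)])?
            .show()
            .await?;
        Ok(())
    }

Both projections should print a single map column containing {a: 1, b: 2, c: 3} on every row, matching the expected output added to test_fn_map; the benchmark above exercises the same two code paths with 1000 literal key/value pairs.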