From fed4977b3c404591207af9c19e567432bbce4419 Mon Sep 17 00:00:00 2001 From: Jay Zhan Date: Sat, 2 Dec 2023 17:19:17 +0800 Subject: [PATCH] Avoid concat for `array_replace` (#8337) * add benchmark Signed-off-by: jayzhan211 * fmt Signed-off-by: jayzhan211 * address clippy Signed-off-by: jayzhan211 * cleanup Signed-off-by: jayzhan211 * fix comment Signed-off-by: jayzhan211 --------- Signed-off-by: jayzhan211 Co-authored-by: Andrew Lamb --- datafusion/core/Cargo.toml | 4 + datafusion/core/benches/array_expression.rs | 73 ++++++++++ .../physical-expr/src/array_expressions.rs | 125 ++++++++---------- 3 files changed, 135 insertions(+), 67 deletions(-) create mode 100644 datafusion/core/benches/array_expression.rs diff --git a/datafusion/core/Cargo.toml b/datafusion/core/Cargo.toml index 0b7aa1509820..7caf91e24f2f 100644 --- a/datafusion/core/Cargo.toml +++ b/datafusion/core/Cargo.toml @@ -167,3 +167,7 @@ name = "sort" [[bench]] harness = false name = "topk_aggregate" + +[[bench]] +harness = false +name = "array_expression" diff --git a/datafusion/core/benches/array_expression.rs b/datafusion/core/benches/array_expression.rs new file mode 100644 index 000000000000..95bc93e0e353 --- /dev/null +++ b/datafusion/core/benches/array_expression.rs @@ -0,0 +1,73 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#[macro_use] +extern crate criterion; +extern crate arrow; +extern crate datafusion; + +mod data_utils; +use crate::criterion::Criterion; +use arrow_array::cast::AsArray; +use arrow_array::types::Int64Type; +use arrow_array::{ArrayRef, Int64Array, ListArray}; +use datafusion_physical_expr::array_expressions; +use std::sync::Arc; + +fn criterion_benchmark(c: &mut Criterion) { + // Construct large arrays for benchmarking + + let array_len = 100000000; + + let array = (0..array_len).map(|_| Some(2_i64)).collect::>(); + let list_array = ListArray::from_iter_primitive::(vec![ + Some(array.clone()), + Some(array.clone()), + Some(array), + ]); + let from_array = Int64Array::from_value(2, 3); + let to_array = Int64Array::from_value(-2, 3); + + let args = vec![ + Arc::new(list_array) as ArrayRef, + Arc::new(from_array) as ArrayRef, + Arc::new(to_array) as ArrayRef, + ]; + + let array = (0..array_len).map(|_| Some(-2_i64)).collect::>(); + let expected_array = ListArray::from_iter_primitive::(vec![ + Some(array.clone()), + Some(array.clone()), + Some(array), + ]); + + // Benchmark array functions + + c.bench_function("array_replace", |b| { + b.iter(|| { + assert_eq!( + array_expressions::array_replace_all(args.as_slice()) + .unwrap() + .as_list::(), + criterion::black_box(&expected_array) + ) + }) + }); +} + +criterion_group!(benches, criterion_benchmark); +criterion_main!(benches); diff --git a/datafusion/physical-expr/src/array_expressions.rs b/datafusion/physical-expr/src/array_expressions.rs index 7059c6a9f37f..84dfe3b9ff75 100644 --- a/datafusion/physical-expr/src/array_expressions.rs +++ b/datafusion/physical-expr/src/array_expressions.rs @@ -35,8 +35,7 @@ use datafusion_common::cast::{ }; use datafusion_common::utils::{array_into_list_array, list_ndims}; use datafusion_common::{ - exec_err, internal_datafusion_err, internal_err, not_impl_err, plan_err, - DataFusionError, Result, + exec_err, internal_err, not_impl_err, plan_err, DataFusionError, Result, }; use itertools::Itertools; @@ -1320,84 +1319,76 @@ fn general_replace( ) -> Result { // Build up the offsets for the final output array let mut offsets: Vec = vec![0]; - let data_type = list_array.value_type(); - let mut new_values = vec![]; + let values = list_array.values(); + let original_data = values.to_data(); + let to_data = to_array.to_data(); + let capacity = Capacities::Array(original_data.len()); - // n is the number of elements to replace in this row - for (row_index, (list_array_row, n)) in - list_array.iter().zip(arr_n.iter()).enumerate() - { - let last_offset: i32 = offsets - .last() - .copied() - .ok_or_else(|| internal_datafusion_err!("offsets should not be empty"))?; + // First array is the original array, second array is the element to replace with. + let mut mutable = MutableArrayData::with_capacities( + vec![&original_data, &to_data], + false, + capacity, + ); - match list_array_row { - Some(list_array_row) => { - // Compute all positions in list_row_array (that is itself an - // array) that are equal to `from_array_row` - let eq_array = compare_element_to_list( - &list_array_row, - &from_array, - row_index, - true, - )?; + let mut valid = BooleanBufferBuilder::new(list_array.len()); - // Use MutableArrayData to build the replaced array - let original_data = list_array_row.to_data(); - let to_data = to_array.to_data(); - let capacity = Capacities::Array(original_data.len() + to_data.len()); + for (row_index, offset_window) in list_array.offsets().windows(2).enumerate() { + if list_array.is_null(row_index) { + offsets.push(offsets[row_index]); + valid.append(false); + continue; + } - // First array is the original array, second array is the element to replace with. - let mut mutable = MutableArrayData::with_capacities( - vec![&original_data, &to_data], - false, - capacity, - ); - let original_idx = 0; - let replace_idx = 1; - - let mut counter = 0; - for (i, to_replace) in eq_array.iter().enumerate() { - if let Some(true) = to_replace { - mutable.extend(replace_idx, row_index, row_index + 1); - counter += 1; - if counter == *n { - // copy original data for any matches past n - mutable.extend(original_idx, i + 1, eq_array.len()); - break; - } - } else { - // copy original data for false / null matches - mutable.extend(original_idx, i, i + 1); - } - } + let start = offset_window[0] as usize; + let end = offset_window[1] as usize; - let data = mutable.freeze(); - let replaced_array = arrow_array::make_array(data); + let list_array_row = list_array.value(row_index); - offsets.push(last_offset + replaced_array.len() as i32); - new_values.push(replaced_array); - } - None => { - // Null element results in a null row (no new offsets) - offsets.push(last_offset); + // Compute all positions in list_row_array (that is itself an + // array) that are equal to `from_array_row` + let eq_array = + compare_element_to_list(&list_array_row, &from_array, row_index, true)?; + + let original_idx = 0; + let replace_idx = 1; + let n = arr_n[row_index]; + let mut counter = 0; + + // All elements are false, no need to replace, just copy original data + if eq_array.false_count() == eq_array.len() { + mutable.extend(original_idx, start, end); + offsets.push(offsets[row_index] + (end - start) as i32); + valid.append(true); + continue; + } + + for (i, to_replace) in eq_array.iter().enumerate() { + if let Some(true) = to_replace { + mutable.extend(replace_idx, row_index, row_index + 1); + counter += 1; + if counter == n { + // copy original data for any matches past n + mutable.extend(original_idx, start + i + 1, end); + break; + } + } else { + // copy original data for false / null matches + mutable.extend(original_idx, start + i, start + i + 1); } } + + offsets.push(offsets[row_index] + (end - start) as i32); + valid.append(true); } - let values = if new_values.is_empty() { - new_empty_array(&data_type) - } else { - let new_values: Vec<_> = new_values.iter().map(|a| a.as_ref()).collect(); - arrow::compute::concat(&new_values)? - }; + let data = mutable.freeze(); Ok(Arc::new(ListArray::try_new( - Arc::new(Field::new("item", data_type, true)), + Arc::new(Field::new("item", list_array.value_type(), true)), OffsetBuffer::new(offsets.into()), - values, - list_array.nulls().cloned(), + arrow_array::make_array(data), + Some(NullBuffer::new(valid.finish())), )?)) }