diff --git a/datafusion/functions-array/src/except.rs b/datafusion/functions-array/src/except.rs deleted file mode 100644 index 1faaf80e69f6..000000000000 --- a/datafusion/functions-array/src/except.rs +++ /dev/null @@ -1,163 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -//! implementation kernel for array_except function - -use crate::utils::check_datatypes; -use arrow::row::{RowConverter, SortField}; -use arrow_array::cast::AsArray; -use arrow_array::{Array, ArrayRef, GenericListArray, OffsetSizeTrait}; -use arrow_buffer::OffsetBuffer; -use arrow_schema::{DataType, FieldRef}; -use datafusion_common::{exec_err, internal_err}; -use datafusion_expr::expr::ScalarFunction; -use datafusion_expr::Expr; -use datafusion_expr::{ColumnarValue, ScalarUDFImpl, Signature, Volatility}; -use std::any::Any; -use std::collections::HashSet; -use std::sync::Arc; - -make_udf_function!( - ArrayExcept, - array_except, - first_array second_array, - "returns an array of the elements that appear in the first array but not in the second.", - array_except_udf -); - -#[derive(Debug)] -pub(super) struct ArrayExcept { - signature: Signature, - aliases: Vec, -} - -impl ArrayExcept { - pub fn new() -> Self { - Self { - signature: Signature::any(2, Volatility::Immutable), - aliases: vec!["array_except".to_string(), "list_except".to_string()], - } - } -} - -impl ScalarUDFImpl for ArrayExcept { - fn as_any(&self) -> &dyn Any { - self - } - fn name(&self) -> &str { - "array_except" - } - - fn signature(&self) -> &Signature { - &self.signature - } - - fn return_type(&self, arg_types: &[DataType]) -> datafusion_common::Result { - match (&arg_types[0].clone(), &arg_types[1].clone()) { - (DataType::Null, _) | (_, DataType::Null) => Ok(arg_types[0].clone()), - (dt, _) => Ok(dt.clone()), - } - } - - fn invoke(&self, args: &[ColumnarValue]) -> datafusion_common::Result { - let args = ColumnarValue::values_to_arrays(args)?; - array_except_inner(&args).map(ColumnarValue::Array) - } - - fn aliases(&self) -> &[String] { - &self.aliases - } -} - -/// Array_except SQL function -pub fn array_except_inner(args: &[ArrayRef]) -> datafusion_common::Result { - if args.len() != 2 { - return exec_err!("array_except needs two arguments"); - } - - let array1 = &args[0]; - let array2 = &args[1]; - - match (array1.data_type(), array2.data_type()) { - (DataType::Null, _) | (_, DataType::Null) => Ok(array1.to_owned()), - (DataType::List(field), DataType::List(_)) => { - check_datatypes("array_except", &[array1, array2])?; - let list1 = array1.as_list::(); - let list2 = array2.as_list::(); - let result = general_except::(list1, list2, field)?; - Ok(Arc::new(result)) - } - (DataType::LargeList(field), DataType::LargeList(_)) => { - check_datatypes("array_except", &[array1, array2])?; - let list1 = array1.as_list::(); - let list2 = array2.as_list::(); - let result = general_except::(list1, list2, field)?; - Ok(Arc::new(result)) - } - (dt1, dt2) => { - internal_err!("array_except got unexpected types: {dt1:?} and {dt2:?}") - } - } -} - -fn general_except( - l: &GenericListArray, - r: &GenericListArray, - field: &FieldRef, -) -> datafusion_common::Result> { - let converter = RowConverter::new(vec![SortField::new(l.value_type())])?; - - let l_values = l.values().to_owned(); - let r_values = r.values().to_owned(); - let l_values = converter.convert_columns(&[l_values])?; - let r_values = converter.convert_columns(&[r_values])?; - - let mut offsets = Vec::::with_capacity(l.len() + 1); - offsets.push(OffsetSize::usize_as(0)); - - let mut rows = Vec::with_capacity(l_values.num_rows()); - let mut dedup = HashSet::new(); - - for (l_w, r_w) in l.offsets().windows(2).zip(r.offsets().windows(2)) { - let l_slice = l_w[0].as_usize()..l_w[1].as_usize(); - let r_slice = r_w[0].as_usize()..r_w[1].as_usize(); - for i in r_slice { - let right_row = r_values.row(i); - dedup.insert(right_row); - } - for i in l_slice { - let left_row = l_values.row(i); - if dedup.insert(left_row) { - rows.push(left_row); - } - } - - offsets.push(OffsetSize::usize_as(rows.len())); - dedup.clear(); - } - - if let Some(values) = converter.convert_rows(rows)?.first() { - Ok(GenericListArray::::new( - field.to_owned(), - OffsetBuffer::new(offsets.into()), - values.to_owned(), - l.nulls().cloned(), - )) - } else { - internal_err!("array_except failed to convert rows") - } -} diff --git a/datafusion/functions-array/src/lib.rs b/datafusion/functions-array/src/lib.rs index fb16acdef2bd..428296c88f15 100644 --- a/datafusion/functions-array/src/lib.rs +++ b/datafusion/functions-array/src/lib.rs @@ -31,7 +31,6 @@ pub mod macros; mod array_has; mod concat; mod core; -mod except; mod extract; mod kernels; mod position; @@ -57,7 +56,6 @@ pub mod expr_fn { pub use super::concat::array_concat; pub use super::concat::array_prepend; pub use super::core::make_array; - pub use super::except::array_except; pub use super::extract::array_element; pub use super::extract::array_pop_back; pub use super::extract::array_pop_front; @@ -71,6 +69,7 @@ pub mod expr_fn { pub use super::replace::array_replace_all; pub use super::replace::array_replace_n; pub use super::set_ops::array_distinct; + pub use super::set_ops::array_except; pub use super::set_ops::array_intersect; pub use super::set_ops::array_union; pub use super::udf::array_dims; @@ -102,7 +101,6 @@ pub fn register_all(registry: &mut dyn FunctionRegistry) -> Result<()> { concat::array_append_udf(), concat::array_prepend_udf(), concat::array_concat_udf(), - except::array_except_udf(), extract::array_element_udf(), extract::array_pop_back_udf(), extract::array_pop_front_udf(), @@ -121,6 +119,7 @@ pub fn register_all(registry: &mut dyn FunctionRegistry) -> Result<()> { set_ops::array_distinct_udf(), set_ops::array_intersect_udf(), set_ops::array_union_udf(), + set_ops::array_except_udf(), position::array_position_udf(), position::array_positions_udf(), remove::array_remove_udf(), diff --git a/datafusion/functions-array/src/set_ops.rs b/datafusion/functions-array/src/set_ops.rs index df5bc91a2689..ef4856f51f0e 100644 --- a/datafusion/functions-array/src/set_ops.rs +++ b/datafusion/functions-array/src/set_ops.rs @@ -18,6 +18,7 @@ //! Array Intersection, Union, and Distinct functions use crate::core::make_array_inner; + use crate::utils::make_scalar_function; use arrow::array::{new_empty_array, Array, ArrayRef, GenericListArray, OffsetSizeTrait}; use arrow::buffer::OffsetBuffer; @@ -60,6 +61,14 @@ make_udf_function!( array_distinct_udf ); +make_udf_function!( + ArrayExcept, + array_except, + first_array second_array, + "returns an array of the elements that appear in the first array but not in the second.", + array_except_udf +); + #[derive(Debug)] pub(super) struct ArrayUnion { signature: Signature, @@ -240,6 +249,7 @@ fn array_distinct_inner(args: &[ArrayRef]) -> Result { enum SetOp { Union, Intersect, + Except, } impl Display for SetOp { @@ -247,6 +257,7 @@ impl Display for SetOp { match self { SetOp::Union => write!(f, "array_union"), SetOp::Intersect => write!(f, "array_intersect"), + SetOp::Except => write!(f, "array_except"), } } } @@ -273,55 +284,88 @@ fn generic_set_lists( let mut offsets = vec![OffsetSize::usize_as(0)]; let mut new_arrays = vec![]; + let mut nulls = vec![]; let converter = RowConverter::new(vec![SortField::new(dt)])?; for (first_arr, second_arr) in l.iter().zip(r.iter()) { - if let (Some(first_arr), Some(second_arr)) = (first_arr, second_arr) { - let l_values = converter.convert_columns(&[first_arr])?; - let r_values = converter.convert_columns(&[second_arr])?; - - let l_iter = l_values.iter().sorted().dedup(); - let values_set: HashSet<_> = l_iter.clone().collect(); - let mut rows = if set_op == SetOp::Union { - l_iter.collect::>() - } else { - vec![] - }; - for r_val in r_values.iter().sorted().dedup() { - match set_op { - SetOp::Union => { - if !values_set.contains(&r_val) { - rows.push(r_val); - } - } - SetOp::Intersect => { - if values_set.contains(&r_val) { - rows.push(r_val); - } - } + let last_offset = match offsets.last().copied() { + Some(offset) => offset, + None => return internal_err!("offsets should not be empty"), + }; + let (l_values, r_values) = match (first_arr, second_arr) { + (Some(_), None) if set_op == SetOp::Intersect => { + offsets.push(last_offset); + nulls.push(false); + continue; + } + (None, Some(_)) if matches!(set_op, SetOp::Intersect | SetOp::Except) => { + offsets.push(last_offset); + nulls.push(false); + continue; + } + (None, None) => { + offsets.push(last_offset); + nulls.push(false); + continue; + } + (first_arr, second_arr) => { + let first_arr = first_arr.unwrap_or(new_empty_array(&l.value_type())); + let second_arr = second_arr.unwrap_or(new_empty_array(&r.value_type())); + let l_values = converter.convert_columns(&[first_arr])?; + let r_values = converter.convert_columns(&[second_arr])?; + // swap l_values and r_values if set_op is Except, because the values + // in the second array should be removed from the first array + if set_op == SetOp::Except { + (r_values, l_values) + } else { + (l_values, r_values) } } + }; - let last_offset = match offsets.last().copied() { - Some(offset) => offset, - None => return internal_err!("offsets should not be empty"), - }; - offsets.push(last_offset + OffsetSize::usize_as(rows.len())); - let arrays = converter.convert_rows(rows)?; - let array = match arrays.first() { - Some(array) => array.clone(), - None => { - return internal_err!("{set_op}: failed to get array from rows"); + let l_iter = l_values.iter().sorted().dedup(); + let values_set: HashSet<_> = l_iter.clone().collect(); + let mut rows = if set_op == SetOp::Union { + l_iter.collect::>() + } else { + vec![] + }; + for r_val in r_values.iter().sorted().dedup() { + match set_op { + SetOp::Union | SetOp::Except => { + if !values_set.contains(&r_val) { + rows.push(r_val); + } } - }; - new_arrays.push(array); + SetOp::Intersect => { + if values_set.contains(&r_val) { + rows.push(r_val); + } + } + } } + + offsets.push(last_offset + OffsetSize::usize_as(rows.len())); + let arrays = converter.convert_rows(rows)?; + let array = match arrays.first() { + Some(array) => array.clone(), + None => { + return internal_err!("{set_op}: failed to get array from rows"); + } + }; + new_arrays.push(array); + nulls.push(true); } let offsets = OffsetBuffer::new(offsets.into()); let new_arrays_ref = new_arrays.iter().map(|v| v.as_ref()).collect::>(); let values = compute::concat(&new_arrays_ref)?; - let arr = GenericListArray::::try_new(field, offsets, values, None)?; + let arr = GenericListArray::::try_new( + field, + offsets, + values, + Some(nulls.into()), + )?; Ok(Arc::new(arr)) } @@ -332,7 +376,7 @@ fn general_set_op( ) -> Result { match (array1.data_type(), array2.data_type()) { (DataType::Null, DataType::List(field)) => { - if set_op == SetOp::Intersect { + if matches!(set_op, SetOp::Intersect | SetOp::Except) { return Ok(new_empty_array(&DataType::Null)); } let array = as_list_array(&array2)?; @@ -347,7 +391,7 @@ fn general_set_op( general_array_distinct::(array, field) } (DataType::Null, DataType::LargeList(field)) => { - if set_op == SetOp::Intersect { + if matches!(set_op, SetOp::Intersect | SetOp::Except) { return Ok(new_empty_array(&DataType::Null)); } let array = as_large_list_array(&array2)?; @@ -438,3 +482,62 @@ fn general_array_distinct( None, )?)) } + +#[derive(Debug)] +pub(super) struct ArrayExcept { + signature: Signature, + aliases: Vec, +} + +impl ArrayExcept { + pub fn new() -> Self { + Self { + signature: Signature::any(2, Volatility::Immutable), + aliases: vec!["array_except".to_string(), "list_except".to_string()], + } + } +} + +impl ScalarUDFImpl for ArrayExcept { + fn as_any(&self) -> &dyn Any { + self + } + fn name(&self) -> &str { + "array_except" + } + + fn signature(&self) -> &Signature { + &self.signature + } + + fn return_type(&self, arg_types: &[DataType]) -> datafusion_common::Result { + match (&arg_types[0].clone(), &arg_types[1].clone()) { + (DataType::Null, DataType::Null) | (DataType::Null, _) => Ok(DataType::Null), + (DataType::List(field), _) => Ok(DataType::List(field.clone())), + (DataType::LargeList(field), _) => Ok(DataType::LargeList(field.clone())), + _ => exec_err!( + "Not reachable, data_type should be List, LargeList or FixedSizeList" + ), + } + } + + fn invoke(&self, args: &[ColumnarValue]) -> datafusion_common::Result { + make_scalar_function(array_except_inner)(args) + } + + fn aliases(&self) -> &[String] { + &self.aliases + } +} + +/// Array_except SQL function +pub fn array_except_inner(args: &[ArrayRef]) -> datafusion_common::Result { + if args.len() != 2 { + return exec_err!("array_except needs two arguments"); + } + + let array1 = &args[0]; + let array2 = &args[1]; + + general_set_op(array1, array2, SetOp::Except) +} diff --git a/datafusion/sqllogictest/test_files/array.slt b/datafusion/sqllogictest/test_files/array.slt index ad979a316709..cf2dfa1ef17a 100644 --- a/datafusion/sqllogictest/test_files/array.slt +++ b/datafusion/sqllogictest/test_files/array.slt @@ -405,6 +405,18 @@ AS VALUES (arrow_cast(make_array([5,6], [5,6], NULL, NULL, NULL), 'FixedSizeList(5, List(Int64))')) ; +statement ok +CREATE TABLE array_setop_table +AS VALUES + ([1, 2, 2, 3], [2, 3, 4]), + ([2, 3, 3], [3]), + ([3], [3, 3, 4]), + (null, [3, 4]), + ([1, 2], null), + (null, null) +; + + statement ok CREATE TABLE array_intersect_table_1D AS VALUES @@ -3891,6 +3903,17 @@ select array_union(arrow_cast(['hello'], 'LargeList(Utf8)'), arrow_cast(['hello' ---- [hello, datafusion] +query ? +select array_union(column1, column2) +from array_setop_table; +---- +[1, 2, 3, 4] +[2, 3] +[3, 4] +[3, 4] +[1, 2] +NULL + # list_to_string scalar function #4 (function alias `array_to_string`) query TTT @@ -5329,6 +5352,17 @@ from array_distinct_table_2D_fixed; [[1, 2], [3, 4], [5, 6], [7, 8], [9, 10]] [, [5, 6]] +query ? +select array_intersect(column1, column2) +from array_setop_table; +---- +[2, 3] +[3] +[3] +NULL +NULL +NULL + query ??? select array_intersect(column1, column2), array_intersect(column3, column4), @@ -5747,19 +5781,8 @@ NULL ## array_except -statement ok -CREATE TABLE array_except_table -AS VALUES - ([1, 2, 2, 3], [2, 3, 4]), - ([2, 3, 3], [3]), - ([3], [3, 3, 4]), - (null, [3, 4]), - ([1, 2], null), - (null, null) -; - query ? -select array_except(column1, column2) from array_except_table; +select array_except(column1, column2) from array_setop_table; ---- [1] [2] @@ -5768,8 +5791,6 @@ NULL [1, 2] NULL -statement ok -drop table array_except_table; statement ok CREATE TABLE array_except_nested_list_table @@ -5847,7 +5868,7 @@ select array_except(column1, column2) from array_except_table_bool; [true] [true] [false] -[true, false] +[false, true] NULL statement ok @@ -6583,3 +6604,6 @@ drop table fixed_size_arrays_values_without_nulls; statement ok drop table test_create_array_table; + +statement ok +drop table array_setop_table; \ No newline at end of file