diff --git a/datafusion/common/src/scalar.rs b/datafusion/common/src/scalar.rs index cdcc9aa4fbc5d..3bc2f063886b0 100644 --- a/datafusion/common/src/scalar.rs +++ b/datafusion/common/src/scalar.rs @@ -1725,7 +1725,11 @@ impl ScalarValue { } else { Self::iter_to_array(values.iter().cloned()).unwrap() }; - Arc::new(array_into_list_array(values)) + if values.len() <= i32::MAX as usize { + Arc::new(array_into_list_array::(values)) + } else { + Arc::new(array_into_list_array::(values)) + } } /// Converts a scalar value into an array of `size` rows. @@ -2127,7 +2131,7 @@ impl ScalarValue { let list_array = as_list_array(array); let nested_array = list_array.value(index); // Produces a single element `ListArray` with the value at `index`. - let arr = Arc::new(array_into_list_array(nested_array)); + let arr = Arc::new(array_into_list_array::(nested_array)); ScalarValue::List(arr) } @@ -2136,7 +2140,7 @@ impl ScalarValue { let list_array = as_fixed_size_list_array(array)?; let nested_array = list_array.value(index); // Produces a single element `ListArray` with the value at `index`. - let arr = Arc::new(array_into_list_array(nested_array)); + let arr = Arc::new(array_into_list_array::(nested_array)); ScalarValue::List(arr) } @@ -3176,10 +3180,12 @@ mod tests { #[test] fn iter_to_array_string_test() { - let arr1 = - array_into_list_array(Arc::new(StringArray::from(vec!["foo", "bar", "baz"]))); - let arr2 = - array_into_list_array(Arc::new(StringArray::from(vec!["rust", "world"]))); + let arr1 = array_into_list_array::(Arc::new(StringArray::from(vec![ + "foo", "bar", "baz", + ]))); + let arr2 = array_into_list_array::(Arc::new(StringArray::from(vec![ + "rust", "world", + ]))); let scalars = vec![ ScalarValue::List(Arc::new(arr1)), @@ -4436,13 +4442,13 @@ mod tests { // Define list-of-structs scalars let nl0_array = ScalarValue::iter_to_array(vec![s0.clone(), s1.clone()]).unwrap(); - let nl0 = ScalarValue::List(Arc::new(array_into_list_array(nl0_array))); + let nl0 = ScalarValue::List(Arc::new(array_into_list_array::(nl0_array))); let nl1_array = ScalarValue::iter_to_array(vec![s2.clone()]).unwrap(); - let nl1 = ScalarValue::List(Arc::new(array_into_list_array(nl1_array))); + let nl1 = ScalarValue::List(Arc::new(array_into_list_array::(nl1_array))); let nl2_array = ScalarValue::iter_to_array(vec![s1.clone()]).unwrap(); - let nl2 = ScalarValue::List(Arc::new(array_into_list_array(nl2_array))); + let nl2 = ScalarValue::List(Arc::new(array_into_list_array::(nl2_array))); // iter_to_array for list-of-struct let array = ScalarValue::iter_to_array(vec![nl0, nl1, nl2]).unwrap(); diff --git a/datafusion/common/src/utils.rs b/datafusion/common/src/utils.rs index f031f7880436b..108c919a671f8 100644 --- a/datafusion/common/src/utils.rs +++ b/datafusion/common/src/utils.rs @@ -25,7 +25,7 @@ use arrow::compute; use arrow::compute::{partition, SortColumn, SortOptions}; use arrow::datatypes::{Field, SchemaRef, UInt32Type}; use arrow::record_batch::RecordBatch; -use arrow_array::{Array, ListArray}; +use arrow_array::{Array, GenericListArray, ListArray, OffsetSizeTrait}; use sqlparser::ast::Ident; use sqlparser::dialect::GenericDialect; use sqlparser::parser::Parser; @@ -339,9 +339,9 @@ pub fn longest_consecutive_prefix>( /// Wrap an array into a single element `ListArray`. /// For example `[1, 2, 3]` would be converted into `[[1, 2, 3]]` -pub fn array_into_list_array(arr: ArrayRef) -> ListArray { +pub fn array_into_list_array(arr: ArrayRef) -> GenericListArray { let offsets = OffsetBuffer::from_lengths([arr.len()]); - ListArray::new( + GenericListArray::::new( Arc::new(Field::new("item", arr.data_type().to_owned(), true)), offsets, arr, diff --git a/datafusion/physical-expr/src/aggregate/array_agg.rs b/datafusion/physical-expr/src/aggregate/array_agg.rs index 91d5c867d3125..953472c6b9cc1 100644 --- a/datafusion/physical-expr/src/aggregate/array_agg.rs +++ b/datafusion/physical-expr/src/aggregate/array_agg.rs @@ -169,7 +169,7 @@ impl Accumulator for ArrayAggAccumulator { } let concated_array = arrow::compute::concat(&element_arrays)?; - let list_array = array_into_list_array(concated_array); + let list_array = array_into_list_array::(concated_array); Ok(ScalarValue::List(Arc::new(list_array))) } diff --git a/datafusion/physical-expr/src/aggregate/array_agg_distinct.rs b/datafusion/physical-expr/src/aggregate/array_agg_distinct.rs index 1efae424cc699..b65ea1bb25697 100644 --- a/datafusion/physical-expr/src/aggregate/array_agg_distinct.rs +++ b/datafusion/physical-expr/src/aggregate/array_agg_distinct.rs @@ -206,7 +206,7 @@ mod tests { }; let arr = arrow::compute::sort(&arr, None).unwrap(); - let list_arr = array_into_list_array(arr); + let list_arr = array_into_list_array::(arr); ScalarValue::List(Arc::new(list_arr)) } diff --git a/datafusion/physical-expr/src/array_expressions.rs b/datafusion/physical-expr/src/array_expressions.rs index 01d495ee7f6ba..71c811d6d080d 100644 --- a/datafusion/physical-expr/src/array_expressions.rs +++ b/datafusion/physical-expr/src/array_expressions.rs @@ -29,7 +29,8 @@ use arrow_buffer::NullBuffer; use arrow_schema::FieldRef; use datafusion_common::cast::{ - as_generic_string_array, as_int64_array, as_list_array, as_string_array, + as_generic_list_array, as_generic_string_array, as_int64_array, as_list_array, + as_string_array, }; use datafusion_common::utils::array_into_list_array; use datafusion_common::{ @@ -82,19 +83,22 @@ macro_rules! new_builder { }}; } -/// Combines multiple arrays into a single ListArray +/// Combines multiple arrays into a single ListArray or LargeListArray /// /// $ARGS: slice of arrays, each with $ARRAY_TYPE /// $ARRAY_TYPE: the type of the list elements /// $BUILDER_TYPE: the type of ArrayBuilder for the list elements +/// $OFFSIZE: the type of OffsetSizeTrait for the list elements /// /// Returns: a ListArray where the elements each have the same type as /// $ARRAY_TYPE and each element have a length of $ARGS.len() macro_rules! array { - ($ARGS:expr, $ARRAY_TYPE:ident, $BUILDER_TYPE:ident) => {{ + ($ARGS:expr, $ARRAY_TYPE:ident, $BUILDER_TYPE:ident, $OFFSIZE: ident) => {{ let builder = new_builder!($BUILDER_TYPE, $ARGS[0].len()); - let mut builder = - ListBuilder::<$BUILDER_TYPE>::with_capacity(builder, $ARGS.len()); + let mut builder = GenericListBuilder::<$OFFSIZE, $BUILDER_TYPE>::with_capacity( + builder, + $ARGS.len(), + ); let num_rows = $ARGS[0].len(); assert!( @@ -311,13 +315,16 @@ macro_rules! call_array_function { /// └──────────────┘ └──────────────┘ └─────────────────────────────┘ /// col1 col2 output /// ``` -fn array_array(args: &[ArrayRef], data_type: DataType) -> Result { +fn array_array( + args: &[ArrayRef], + data_type: DataType, +) -> Result { // do not accept 0 arguments. if args.is_empty() { return plan_err!("Array requires at least one argument"); } - let res = match data_type { + let res: Arc> = match data_type { DataType::List(..) => { let row_count = args[0].len(); let column_count = args.len(); @@ -372,7 +379,7 @@ fn array_array(args: &[ArrayRef], data_type: DataType) -> Result { .map(|x| x as &dyn Array) .collect::>(); let values = compute::concat(elements.as_slice())?; - let list_arr = ListArray::new( + let list_arr = GenericListArray::::new( field, OffsetBuffer::from_lengths(list_array_lengths), values, @@ -380,19 +387,19 @@ fn array_array(args: &[ArrayRef], data_type: DataType) -> Result { ); Arc::new(list_arr) } - DataType::Utf8 => array!(args, StringArray, StringBuilder), - DataType::LargeUtf8 => array!(args, LargeStringArray, LargeStringBuilder), - DataType::Boolean => array!(args, BooleanArray, BooleanBuilder), - DataType::Float32 => array!(args, Float32Array, Float32Builder), - DataType::Float64 => array!(args, Float64Array, Float64Builder), - DataType::Int8 => array!(args, Int8Array, Int8Builder), - DataType::Int16 => array!(args, Int16Array, Int16Builder), - DataType::Int32 => array!(args, Int32Array, Int32Builder), - DataType::Int64 => array!(args, Int64Array, Int64Builder), - DataType::UInt8 => array!(args, UInt8Array, UInt8Builder), - DataType::UInt16 => array!(args, UInt16Array, UInt16Builder), - DataType::UInt32 => array!(args, UInt32Array, UInt32Builder), - DataType::UInt64 => array!(args, UInt64Array, UInt64Builder), + DataType::Utf8 => array!(args, StringArray, StringBuilder, O), + DataType::LargeUtf8 => array!(args, LargeStringArray, LargeStringBuilder, O), + DataType::Boolean => array!(args, BooleanArray, BooleanBuilder, O), + DataType::Float32 => array!(args, Float32Array, Float32Builder, O), + DataType::Float64 => array!(args, Float64Array, Float64Builder, O), + DataType::Int8 => array!(args, Int8Array, Int8Builder, O), + DataType::Int16 => array!(args, Int16Array, Int16Builder, O), + DataType::Int32 => array!(args, Int32Array, Int32Builder, O), + DataType::Int64 => array!(args, Int64Array, Int64Builder, O), + DataType::UInt8 => array!(args, UInt8Array, UInt8Builder, O), + DataType::UInt16 => array!(args, UInt16Array, UInt16Builder, O), + DataType::UInt32 => array!(args, UInt32Array, UInt32Builder, O), + DataType::UInt64 => array!(args, UInt64Array, UInt64Builder, O), data_type => { return not_impl_err!("Array is not implemented for type '{data_type:?}'.") } @@ -412,13 +419,28 @@ pub fn make_array(arrays: &[ArrayRef]) -> Result { } } + let len = arrays.len(); match data_type { // Either an empty array or all nulls: DataType::Null => { let array = new_null_array(&DataType::Null, arrays.len()); - Ok(Arc::new(array_into_list_array(array))) + if len <= i32::MAX as usize { + Ok(Arc::new(array_into_list_array::(array))) + } else if len <= i64::MAX as usize { + Ok(Arc::new(array_into_list_array::(array))) + } else { + exec_err!("The number of elements {} in the array exceed the maximum number of elements supported by DataFusion",len) + } + } + data_type => { + if len <= i32::MAX as usize { + array_array::(arrays, data_type) + } else if len <= i64::MAX as usize { + array_array::(arrays, data_type) + } else { + exec_err!("The number of elements {} in the array exceed the maximum number of elements supported by DataFusion",len) + } } - data_type => array_array(arrays, data_type), } } @@ -1688,7 +1710,20 @@ pub fn flatten(args: &[ArrayRef]) -> Result { /// Array_length SQL function pub fn array_length(args: &[ArrayRef]) -> Result { - let list_array = as_list_array(&args[0])?; + match &args[0].data_type() { + DataType::List(_) => _array_length_list::(args), + DataType::LargeList(_) => _array_length_list::(args), + _ => Err(DataFusionError::Internal(format!( + "array_length does not support type '{:?}'", + args[0].data_type() + ))), + } +} + +/// array_length for List and LargeList +fn _array_length_list(args: &[ArrayRef]) -> Result { + let list_array = as_generic_list_array::(&args[0])?; + let dimension = if args.len() == 2 { as_int64_array(&args[1])?.clone() } else { @@ -2048,8 +2083,10 @@ mod tests { Some(vec![Some(6), Some(7), Some(8)]), ])); - let array2d_1 = Arc::new(array_into_list_array(array1d_1.clone())) as ArrayRef; - let array2d_2 = Arc::new(array_into_list_array(array1d_2.clone())) as ArrayRef; + let array2d_1 = + Arc::new(array_into_list_array::(array1d_1.clone())) as ArrayRef; + let array2d_2 = + Arc::new(array_into_list_array::(array1d_2.clone())) as ArrayRef; let res = align_array_dimensions(vec![array1d_1.to_owned(), array2d_2.to_owned()]) @@ -2063,8 +2100,8 @@ mod tests { expected_dim ); - let array3d_1 = Arc::new(array_into_list_array(array2d_1)) as ArrayRef; - let array3d_2 = array_into_list_array(array2d_2.to_owned()); + let array3d_1 = Arc::new(array_into_list_array::(array2d_1)) as ArrayRef; + let array3d_2 = array_into_list_array::(array2d_2.to_owned()); let res = align_array_dimensions(vec![array1d_1, Arc::new(array3d_2.clone())]).unwrap(); @@ -2121,6 +2158,12 @@ mod tests { ); } + #[test] + fn test_return_large_array() { + let list_array = return_large_array(); + assert_eq!(list_array.data_type().to_string(), "failed to cast to"); + } + #[test] fn test_array_element() { // array_element([1, 2, 3, 4], 1) = 1 @@ -2867,6 +2910,8 @@ mod tests { fn test_array_length() { // array_length([1, 2, 3, 4]) = 4 let list_array = return_array(); + // cast thr list_array to LargeList + println!("{:?}", list_array.data_type()); let arr = array_length(&[list_array.clone()]) .expect("failed to initialize function array_ndims"); let result = @@ -2875,6 +2920,16 @@ mod tests { assert_eq!(result, &UInt64Array::from_value(4, 1)); // array_length([1, 2, 3, 4], 1) = 4 + let array = + array_length(&[list_array.clone(), Arc::new(Int64Array::from_value(1, 1))]) + .expect("failed to initialize function array_ndims"); + let result = + as_uint64_array(&array).expect("failed to initialize function array_ndims"); + + assert_eq!(result, &UInt64Array::from_value(4, 1)); + + // for LargeList + // array_length([1, 2, 3, 4], 2) = 4 let array = array_length(&[list_array, Arc::new(Int64Array::from_value(1, 1))]) .expect("failed to initialize function array_ndims"); let result = @@ -3015,6 +3070,27 @@ mod tests { make_array(&args).expect("failed to initialize function array") } + fn return_large_array() -> ArrayRef { + // Returns: [1, 2, 3, 4] + let capacity = i32::MAX as usize + 10; + let args = vec![Arc::new(Int64Array::from(vec![Some(1)])) as ArrayRef; capacity]; + + println!("args.len() = {}", args.len()); + + make_array(&args).expect("failed to initialize function array") + } + + fn return_extra_array() -> ArrayRef { + // Returns: [11, 12, 13, 14] + let args = [ + Arc::new(Int64Array::from(vec![Some(11)])) as ArrayRef, + Arc::new(Int64Array::from(vec![Some(12)])) as ArrayRef, + Arc::new(Int64Array::from(vec![Some(13)])) as ArrayRef, + Arc::new(Int64Array::from(vec![Some(14)])) as ArrayRef, + ]; + make_array(&args).expect("failed to initialize function array") + } + fn return_nested_array() -> ArrayRef { // Returns: [[1, 2, 3, 4], [5, 6, 7, 8]] let args = [