Skip to content

Commit

Permalink
feat: support LargeList in make_array and
Browse files Browse the repository at this point in the history
array_length
  • Loading branch information
Weijun-H committed Nov 15, 2023
1 parent 841a9a6 commit 974a9e7
Show file tree
Hide file tree
Showing 5 changed files with 125 additions and 43 deletions.
26 changes: 16 additions & 10 deletions datafusion/common/src/scalar.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1725,7 +1725,11 @@ impl ScalarValue {
} else {
Self::iter_to_array(values.iter().cloned()).unwrap()
};
Arc::new(array_into_list_array(values))
if values.len() <= i32::MAX as usize {
Arc::new(array_into_list_array::<i32>(values))
} else {
Arc::new(array_into_list_array::<i64>(values))
}
}

/// Converts a scalar value into an array of `size` rows.
Expand Down Expand Up @@ -2127,7 +2131,7 @@ impl ScalarValue {
let list_array = as_list_array(array);
let nested_array = list_array.value(index);
// Produces a single element `ListArray` with the value at `index`.
let arr = Arc::new(array_into_list_array(nested_array));
let arr = Arc::new(array_into_list_array::<i32>(nested_array));

ScalarValue::List(arr)
}
Expand All @@ -2136,7 +2140,7 @@ impl ScalarValue {
let list_array = as_fixed_size_list_array(array)?;
let nested_array = list_array.value(index);
// Produces a single element `ListArray` with the value at `index`.
let arr = Arc::new(array_into_list_array(nested_array));
let arr = Arc::new(array_into_list_array::<i32>(nested_array));

ScalarValue::List(arr)
}
Expand Down Expand Up @@ -3176,10 +3180,12 @@ mod tests {

#[test]
fn iter_to_array_string_test() {
let arr1 =
array_into_list_array(Arc::new(StringArray::from(vec!["foo", "bar", "baz"])));
let arr2 =
array_into_list_array(Arc::new(StringArray::from(vec!["rust", "world"])));
let arr1 = array_into_list_array::<i32>(Arc::new(StringArray::from(vec![
"foo", "bar", "baz",
])));
let arr2 = array_into_list_array::<i32>(Arc::new(StringArray::from(vec![
"rust", "world",
])));

let scalars = vec![
ScalarValue::List(Arc::new(arr1)),
Expand Down Expand Up @@ -4436,13 +4442,13 @@ mod tests {
// Define list-of-structs scalars

let nl0_array = ScalarValue::iter_to_array(vec![s0.clone(), s1.clone()]).unwrap();
let nl0 = ScalarValue::List(Arc::new(array_into_list_array(nl0_array)));
let nl0 = ScalarValue::List(Arc::new(array_into_list_array::<i32>(nl0_array)));

let nl1_array = ScalarValue::iter_to_array(vec![s2.clone()]).unwrap();
let nl1 = ScalarValue::List(Arc::new(array_into_list_array(nl1_array)));
let nl1 = ScalarValue::List(Arc::new(array_into_list_array::<i32>(nl1_array)));

let nl2_array = ScalarValue::iter_to_array(vec![s1.clone()]).unwrap();
let nl2 = ScalarValue::List(Arc::new(array_into_list_array(nl2_array)));
let nl2 = ScalarValue::List(Arc::new(array_into_list_array::<i32>(nl2_array)));

// iter_to_array for list-of-struct
let array = ScalarValue::iter_to_array(vec![nl0, nl1, nl2]).unwrap();
Expand Down
6 changes: 3 additions & 3 deletions datafusion/common/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use arrow::compute;
use arrow::compute::{partition, SortColumn, SortOptions};
use arrow::datatypes::{Field, SchemaRef, UInt32Type};
use arrow::record_batch::RecordBatch;
use arrow_array::{Array, ListArray};
use arrow_array::{Array, GenericListArray, ListArray, OffsetSizeTrait};
use sqlparser::ast::Ident;
use sqlparser::dialect::GenericDialect;
use sqlparser::parser::Parser;
Expand Down Expand Up @@ -339,9 +339,9 @@ pub fn longest_consecutive_prefix<T: Borrow<usize>>(

/// Wrap an array into a single element `ListArray`.
/// For example `[1, 2, 3]` would be converted into `[[1, 2, 3]]`
pub fn array_into_list_array(arr: ArrayRef) -> ListArray {
pub fn array_into_list_array<O: OffsetSizeTrait>(arr: ArrayRef) -> GenericListArray<O> {
let offsets = OffsetBuffer::from_lengths([arr.len()]);
ListArray::new(
GenericListArray::<O>::new(
Arc::new(Field::new("item", arr.data_type().to_owned(), true)),
offsets,
arr,
Expand Down
2 changes: 1 addition & 1 deletion datafusion/physical-expr/src/aggregate/array_agg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ impl Accumulator for ArrayAggAccumulator {
}

let concated_array = arrow::compute::concat(&element_arrays)?;
let list_array = array_into_list_array(concated_array);
let list_array = array_into_list_array::<i32>(concated_array);

Ok(ScalarValue::List(Arc::new(list_array)))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ mod tests {
};

let arr = arrow::compute::sort(&arr, None).unwrap();
let list_arr = array_into_list_array(arr);
let list_arr = array_into_list_array::<i32>(arr);
ScalarValue::List(Arc::new(list_arr))
}

Expand Down
132 changes: 104 additions & 28 deletions datafusion/physical-expr/src/array_expressions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@ use arrow_buffer::NullBuffer;

use arrow_schema::FieldRef;
use datafusion_common::cast::{
as_generic_string_array, as_int64_array, as_list_array, as_string_array,
as_generic_list_array, as_generic_string_array, as_int64_array, as_list_array,
as_string_array,
};
use datafusion_common::utils::array_into_list_array;
use datafusion_common::{
Expand Down Expand Up @@ -82,19 +83,22 @@ macro_rules! new_builder {
}};
}

/// Combines multiple arrays into a single ListArray
/// Combines multiple arrays into a single ListArray or LargeListArray
///
/// $ARGS: slice of arrays, each with $ARRAY_TYPE
/// $ARRAY_TYPE: the type of the list elements
/// $BUILDER_TYPE: the type of ArrayBuilder for the list elements
/// $OFFSIZE: the offset type of the resulting list (an OffsetSizeTrait implementor such as i32 or i64)
///
/// Returns: a ListArray or LargeListArray (depending on $OFFSIZE) where the elements
/// each have the same type as $ARRAY_TYPE and each element has a length of $ARGS.len()
macro_rules! array {
($ARGS:expr, $ARRAY_TYPE:ident, $BUILDER_TYPE:ident) => {{
($ARGS:expr, $ARRAY_TYPE:ident, $BUILDER_TYPE:ident, $OFFSIZE: ident) => {{
let builder = new_builder!($BUILDER_TYPE, $ARGS[0].len());
let mut builder =
ListBuilder::<$BUILDER_TYPE>::with_capacity(builder, $ARGS.len());
let mut builder = GenericListBuilder::<$OFFSIZE, $BUILDER_TYPE>::with_capacity(
builder,
$ARGS.len(),
);

let num_rows = $ARGS[0].len();
assert!(
Expand Down Expand Up @@ -311,13 +315,16 @@ macro_rules! call_array_function {
/// └──────────────┘ └──────────────┘ └─────────────────────────────┘
/// col1 col2 output
/// ```
fn array_array(args: &[ArrayRef], data_type: DataType) -> Result<ArrayRef> {
fn array_array<O: OffsetSizeTrait>(
args: &[ArrayRef],
data_type: DataType,
) -> Result<ArrayRef> {
// do not accept 0 arguments.
if args.is_empty() {
return plan_err!("Array requires at least one argument");
}

let res = match data_type {
let res: Arc<GenericListArray<O>> = match data_type {
DataType::List(..) => {
let row_count = args[0].len();
let column_count = args.len();
Expand Down Expand Up @@ -372,27 +379,27 @@ fn array_array(args: &[ArrayRef], data_type: DataType) -> Result<ArrayRef> {
.map(|x| x as &dyn Array)
.collect::<Vec<_>>();
let values = compute::concat(elements.as_slice())?;
let list_arr = ListArray::new(
let list_arr = GenericListArray::<O>::new(
field,
OffsetBuffer::from_lengths(list_array_lengths),
values,
Some(NullBuffer::new(buffer)),
);
Arc::new(list_arr)
}
DataType::Utf8 => array!(args, StringArray, StringBuilder),
DataType::LargeUtf8 => array!(args, LargeStringArray, LargeStringBuilder),
DataType::Boolean => array!(args, BooleanArray, BooleanBuilder),
DataType::Float32 => array!(args, Float32Array, Float32Builder),
DataType::Float64 => array!(args, Float64Array, Float64Builder),
DataType::Int8 => array!(args, Int8Array, Int8Builder),
DataType::Int16 => array!(args, Int16Array, Int16Builder),
DataType::Int32 => array!(args, Int32Array, Int32Builder),
DataType::Int64 => array!(args, Int64Array, Int64Builder),
DataType::UInt8 => array!(args, UInt8Array, UInt8Builder),
DataType::UInt16 => array!(args, UInt16Array, UInt16Builder),
DataType::UInt32 => array!(args, UInt32Array, UInt32Builder),
DataType::UInt64 => array!(args, UInt64Array, UInt64Builder),
DataType::Utf8 => array!(args, StringArray, StringBuilder, O),
DataType::LargeUtf8 => array!(args, LargeStringArray, LargeStringBuilder, O),
DataType::Boolean => array!(args, BooleanArray, BooleanBuilder, O),
DataType::Float32 => array!(args, Float32Array, Float32Builder, O),
DataType::Float64 => array!(args, Float64Array, Float64Builder, O),
DataType::Int8 => array!(args, Int8Array, Int8Builder, O),
DataType::Int16 => array!(args, Int16Array, Int16Builder, O),
DataType::Int32 => array!(args, Int32Array, Int32Builder, O),
DataType::Int64 => array!(args, Int64Array, Int64Builder, O),
DataType::UInt8 => array!(args, UInt8Array, UInt8Builder, O),
DataType::UInt16 => array!(args, UInt16Array, UInt16Builder, O),
DataType::UInt32 => array!(args, UInt32Array, UInt32Builder, O),
DataType::UInt64 => array!(args, UInt64Array, UInt64Builder, O),
data_type => {
return not_impl_err!("Array is not implemented for type '{data_type:?}'.")
}
Expand All @@ -412,13 +419,28 @@ pub fn make_array(arrays: &[ArrayRef]) -> Result<ArrayRef> {
}
}

let len = arrays.len();
match data_type {
// Either an empty array or all nulls:
DataType::Null => {
let array = new_null_array(&DataType::Null, arrays.len());
Ok(Arc::new(array_into_list_array(array)))
if len <= i32::MAX as usize {
Ok(Arc::new(array_into_list_array::<i32>(array)))
} else if len <= i64::MAX as usize {
Ok(Arc::new(array_into_list_array::<i64>(array)))
} else {
exec_err!("The number of elements {} in the array exceed the maximum number of elements supported by DataFusion",len)
}
}
data_type => {
if len <= i32::MAX as usize {
array_array::<i32>(arrays, data_type)
} else if len <= i64::MAX as usize {
array_array::<i64>(arrays, data_type)
} else {
exec_err!("The number of elements {} in the array exceed the maximum number of elements supported by DataFusion",len)
}
}
data_type => array_array(arrays, data_type),
}
}

Expand Down Expand Up @@ -1688,7 +1710,20 @@ pub fn flatten(args: &[ArrayRef]) -> Result<ArrayRef> {

/// Array_length SQL function
pub fn array_length(args: &[ArrayRef]) -> Result<ArrayRef> {
let list_array = as_list_array(&args[0])?;
match &args[0].data_type() {
DataType::List(_) => _array_length_list::<i32>(args),
DataType::LargeList(_) => _array_length_list::<i64>(args),
_ => Err(DataFusionError::Internal(format!(
"array_length does not support type '{:?}'",
args[0].data_type()
))),
}
}

/// array_length for List and LargeList
fn _array_length_list<O: OffsetSizeTrait>(args: &[ArrayRef]) -> Result<ArrayRef> {
let list_array = as_generic_list_array::<O>(&args[0])?;

let dimension = if args.len() == 2 {
as_int64_array(&args[1])?.clone()
} else {
Expand Down Expand Up @@ -2048,8 +2083,10 @@ mod tests {
Some(vec![Some(6), Some(7), Some(8)]),
]));

let array2d_1 = Arc::new(array_into_list_array(array1d_1.clone())) as ArrayRef;
let array2d_2 = Arc::new(array_into_list_array(array1d_2.clone())) as ArrayRef;
let array2d_1 =
Arc::new(array_into_list_array::<i32>(array1d_1.clone())) as ArrayRef;
let array2d_2 =
Arc::new(array_into_list_array::<i32>(array1d_2.clone())) as ArrayRef;

let res =
align_array_dimensions(vec![array1d_1.to_owned(), array2d_2.to_owned()])
Expand All @@ -2063,8 +2100,8 @@ mod tests {
expected_dim
);

let array3d_1 = Arc::new(array_into_list_array(array2d_1)) as ArrayRef;
let array3d_2 = array_into_list_array(array2d_2.to_owned());
let array3d_1 = Arc::new(array_into_list_array::<i32>(array2d_1)) as ArrayRef;
let array3d_2 = array_into_list_array::<i32>(array2d_2.to_owned());
let res =
align_array_dimensions(vec![array1d_1, Arc::new(array3d_2.clone())]).unwrap();

Expand Down Expand Up @@ -2121,6 +2158,12 @@ mod tests {
);
}

/// Checks that `make_array` produces a `LargeList` (64-bit offsets) when it is
/// given more arguments than fit in an `i32`.
///
/// Ignored by default: the fixture builds more than `i32::MAX` argument arrays,
/// which is far too expensive for a routine test run.
#[test]
#[ignore = "builds more than i32::MAX argument arrays"]
fn test_return_large_array() {
    let list_array = return_large_array();
    // The previous assertion compared the type name with an unrelated error
    // string and could never pass; check the actual list flavour instead.
    assert!(
        matches!(list_array.data_type(), DataType::LargeList(_)),
        "expected LargeList, got {:?}",
        list_array.data_type()
    );
}

#[test]
fn test_array_element() {
// array_element([1, 2, 3, 4], 1) = 1
Expand Down Expand Up @@ -2867,6 +2910,8 @@ mod tests {
fn test_array_length() {
// array_length([1, 2, 3, 4]) = 4
let list_array = return_array();
// cast the list_array to LargeList
println!("{:?}", list_array.data_type());
let arr = array_length(&[list_array.clone()])
.expect("failed to initialize function array_ndims");
let result =
Expand All @@ -2875,6 +2920,16 @@ mod tests {
assert_eq!(result, &UInt64Array::from_value(4, 1));

// array_length([1, 2, 3, 4], 1) = 4
let array =
array_length(&[list_array.clone(), Arc::new(Int64Array::from_value(1, 1))])
.expect("failed to initialize function array_ndims");
let result =
as_uint64_array(&array).expect("failed to initialize function array_ndims");

assert_eq!(result, &UInt64Array::from_value(4, 1));

// for LargeList
// array_length([1, 2, 3, 4], 2) = 4
let array = array_length(&[list_array, Arc::new(Int64Array::from_value(1, 1))])
.expect("failed to initialize function array_ndims");
let result =
Expand Down Expand Up @@ -3015,6 +3070,27 @@ mod tests {
make_array(&args).expect("failed to initialize function array")
}

/// Test fixture: calls `make_array` with more than `i32::MAX` arguments, each a
/// single-value `Int64Array` holding `1`.
///
/// Panics if `make_array` returns an error.
fn return_large_array() -> ArrayRef {
    // Returns: [1, 1, ..., 1] with i32::MAX + 10 elements
    // The argument count is deliberately just past the i32 range.
    let capacity = i32::MAX as usize + 10;
    let args = vec![Arc::new(Int64Array::from(vec![Some(1)])) as ArrayRef; capacity];

    make_array(&args).expect("failed to initialize function array")
}

/// Test fixture: passes four single-value `Int64` arrays (11, 12, 13, 14) to
/// `make_array` and returns the result.
///
/// Panics if `make_array` returns an error.
fn return_extra_array() -> ArrayRef {
    // Returns: [11, 12, 13, 14]
    let columns = [11_i64, 12, 13, 14]
        .map(|value| Arc::new(Int64Array::from(vec![Some(value)])) as ArrayRef);
    make_array(&columns).expect("failed to initialize function array")
}

fn return_nested_array() -> ArrayRef {
// Returns: [[1, 2, 3, 4], [5, 6, 7, 8]]
let args = [
Expand Down

0 comments on commit 974a9e7

Please sign in to comment.