Skip to content

Commit

Permalink
use make_scalar_array_function
Browse files Browse the repository at this point in the history
  • Loading branch information
goldmedal committed Jul 20, 2024
1 parent 51716ea commit eff3a16
Show file tree
Hide file tree
Showing 2 changed files with 164 additions and 27 deletions.
160 changes: 133 additions & 27 deletions datafusion/functions/src/core/map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,16 @@ use std::any::Any;
use std::collections::VecDeque;
use std::sync::Arc;

use arrow::array::{Array, ArrayData, ArrayRef, FixedSizeListArray, LargeListArray, ListArray, MapArray, StructArray};
use arrow::array::{Array, ArrayData, ArrayRef, Capacities, FixedSizeListArray, GenericListArray, LargeListArray, ListArray, make_array, MapArray, MutableArrayData, new_null_array, NullArray, OffsetSizeTrait, StructArray};
use arrow::datatypes::{DataType, Field, SchemaBuilder};
use arrow::datatypes::DataType::Int32;
use arrow_buffer::{Buffer, ToByteSlice};
use arrow::datatypes::DataType::{Int32, LargeList, Null};
use arrow_buffer::{Buffer, OffsetBuffer, ToByteSlice};

use datafusion_common::{exec_err, ScalarValue};
use datafusion_common::{exec_err, plan_err, ScalarValue};
use datafusion_common::Result;
use datafusion_common::utils::array_into_list_array_nullable;
use datafusion_expr::{ColumnarValue, ScalarUDFImpl, Signature, Volatility};
use crate::utils::make_scalar_array_function;

make_udf_function!(MapFunc, MAP, map_udf);
make_udf_function!(MapOneFunc, MAP_ONE, map_one_udf);
Expand All @@ -50,7 +52,7 @@ fn get_scalar_from_col(c: &ColumnarValue) -> ScalarValue {
}
}

fn make_map_batch_one_args(args: &[ColumnarValue]) -> Result<ColumnarValue> {
fn make_map_batch_one_args(args: &[ArrayRef]) -> Result<ArrayRef> {
if args.len() % 2 != 0 {
return exec_err!(
"make_map requires exactly 2 arguments, got {} instead",
Expand All @@ -59,15 +61,12 @@ fn make_map_batch_one_args(args: &[ColumnarValue]) -> Result<ColumnarValue> {
}

let len = args.len() / 2;
let key_iter = args[0..len].iter().map(get_scalar_from_col);
let key = ScalarValue::iter_to_array(key_iter)?;
let val_iter = args[len..].iter().map(get_scalar_from_col);
let value = ScalarValue::iter_to_array(val_iter)?;
let key = make_array_inner(&args[0..len])?;
let value = make_array_inner(&args[len..])?;

let key = get_first_element(key);
let value = get_first_element(value);
let can_evaluate_to_const = can_evaluate_to_const(args);
make_map_batch_internal(key, value, can_evaluate_to_const)
make_map_batch_internal(key, value, true)
}

fn get_first_element(value: ArrayRef) -> ArrayRef {
Expand All @@ -88,19 +87,17 @@ fn get_first_element(value: ArrayRef) -> ArrayRef {
}
}

fn make_map_batch(args: &[ColumnarValue]) -> Result<ColumnarValue> {
fn make_map_batch(args: &[ArrayRef]) -> Result<ArrayRef> {
if args.len() != 2 {
return exec_err!(
"make_map requires exactly 2 arguments, got {} instead",
args.len()
);
}

let can_evaluate_to_const = can_evaluate_to_const(args);

let key = get_first_array_ref(&args[0])?;
let value = get_first_array_ref(&args[1])?;
make_map_batch_internal(key, value, can_evaluate_to_const)
let key = get_first_element(Arc::clone(&args[0]));
let value = get_first_element(Arc::clone(&args[1]));
make_map_batch_internal(key, value, true)
}

fn get_first_array_ref(columnar_value: &ColumnarValue) -> Result<ArrayRef> {
Expand All @@ -119,7 +116,7 @@ fn make_map_batch_internal(
keys: ArrayRef,
values: ArrayRef,
can_evaluate_to_const: bool,
) -> Result<ColumnarValue> {
) -> Result<ArrayRef> {
if keys.null_count() > 0 {
return exec_err!("map key cannot be null");
}
Expand Down Expand Up @@ -158,13 +155,7 @@ fn make_map_batch_internal(
.add_buffer(entry_offsets_buffer)
.add_child_data(entry_struct.to_data())
.build()?;
let map_array = Arc::new(MapArray::from(map_data));

Ok(if can_evaluate_to_const {
ColumnarValue::Scalar(ScalarValue::try_from_array(map_array.as_ref(), 0)?)
} else {
ColumnarValue::Array(map_array)
})
Ok(Arc::new(MapArray::from(map_data)))
}

#[derive(Debug)]
Expand Down Expand Up @@ -225,7 +216,7 @@ impl ScalarUDFImpl for MapFunc {
}

fn invoke(&self, args: &[ColumnarValue]) -> Result<ColumnarValue> {
make_map_batch(args)
make_scalar_array_function(make_map_batch)(args)
}
}

Expand Down Expand Up @@ -304,6 +295,121 @@ impl ScalarUDFImpl for MapOneFunc {
}

fn invoke(&self, args: &[ColumnarValue]) -> Result<ColumnarValue> {
make_map_batch_one_args(args)
make_scalar_array_function(make_map_batch_one_args)(args)
}
}

pub(crate) fn make_array_inner(arrays: &[ArrayRef]) -> Result<ArrayRef> {
let mut data_type = Null;
for arg in arrays {
let arg_data_type = arg.data_type();
if !arg_data_type.equals_datatype(&Null) {
data_type = arg_data_type.clone();
break;
}
}

match data_type {
// Either an empty array or all nulls:
Null => {
let length = arrays.iter().map(|a| a.len()).sum();
// By default Int64
let array = new_null_array(&DataType::Int64, length);
Ok(Arc::new(array_into_list_array_nullable(array)))
}
LargeList(..) => array_array::<i64>(arrays, data_type),
_ => array_array::<i32>(arrays, data_type),
}
}

/// Convert one or more [`ArrayRef`] of the same type into a
/// `ListArray` or 'LargeListArray' depending on the offset size.
///
/// # Example (non nested)
///
/// Calling `array(col1, col2)` where col1 and col2 are non nested
/// would return a single new `ListArray`, where each row was a list
/// of 2 elements:
///
/// ```text
/// ┌─────────┐ ┌─────────┐ ┌──────────────┐
/// │ ┌─────┐ │ │ ┌─────┐ │ │ ┌──────────┐ │
/// │ │ A │ │ │ │ X │ │ │ │ [A, X] │ │
/// │ ├─────┤ │ │ ├─────┤ │ │ ├──────────┤ │
/// │ │NULL │ │ │ │ Y │ │──────────▶│ │[NULL, Y] │ │
/// │ ├─────┤ │ │ ├─────┤ │ │ ├──────────┤ │
/// │ │ C │ │ │ │ Z │ │ │ │ [C, Z] │ │
/// │ └─────┘ │ │ └─────┘ │ │ └──────────┘ │
/// └─────────┘ └─────────┘ └──────────────┘
/// col1 col2 output
/// ```
///
/// # Example (nested)
///
/// Calling `array(col1, col2)` where col1 and col2 are lists
/// would return a single new `ListArray`, where each row was a list
/// of the corresponding elements of col1 and col2.
///
/// ``` text
/// ┌──────────────┐ ┌──────────────┐ ┌─────────────────────────────┐
/// │ ┌──────────┐ │ │ ┌──────────┐ │ │ ┌────────────────────────┐ │
/// │ │ [A, X] │ │ │ │ [] │ │ │ │ [[A, X], []] │ │
/// │ ├──────────┤ │ │ ├──────────┤ │ │ ├────────────────────────┤ │
/// │ │[NULL, Y] │ │ │ │[Q, R, S] │ │───────▶│ │ [[NULL, Y], [Q, R, S]] │ │
/// │ ├──────────┤ │ │ ├──────────┤ │ │ ├────────────────────────│ │
/// │ │ [C, Z] │ │ │ │ NULL │ │ │ │ [[C, Z], NULL] │ │
/// │ └──────────┘ │ │ └──────────┘ │ │ └────────────────────────┘ │
/// └──────────────┘ └──────────────┘ └─────────────────────────────┘
/// col1 col2 output
/// ```
fn array_array<O: OffsetSizeTrait>(
args: &[ArrayRef],
data_type: DataType,
) -> Result<ArrayRef> {
// do not accept 0 arguments.
if args.is_empty() {
return plan_err!("Array requires at least one argument");
}

let mut data = vec![];
let mut total_len = 0;
for arg in args {
let arg_data = if arg.as_any().is::<NullArray>() {
ArrayData::new_empty(&data_type)
} else {
arg.to_data()
};
total_len += arg_data.len();
data.push(arg_data);
}

let mut offsets: Vec<O> = Vec::with_capacity(total_len);
offsets.push(O::usize_as(0));

let capacity = Capacities::Array(total_len);
let data_ref = data.iter().collect::<Vec<_>>();
let mut mutable = MutableArrayData::with_capacities(data_ref, true, capacity);

let num_rows = args[0].len();
for row_idx in 0..num_rows {
for (arr_idx, arg) in args.iter().enumerate() {
if !arg.as_any().is::<NullArray>()
&& !arg.is_null(row_idx)
&& arg.is_valid(row_idx)
{
mutable.extend(arr_idx, row_idx, row_idx + 1);
} else {
mutable.extend_nulls(1);
}
}
offsets.push(O::usize_as(mutable.len()));
}
let data = mutable.freeze();

Ok(Arc::new(GenericListArray::<O>::try_new(
Arc::new(Field::new("item", data_type, true)),
OffsetBuffer::new(offsets.into()),
make_array(data),
None,
)?))
}
31 changes: 31 additions & 0 deletions datafusion/functions/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,37 @@ where
})
}

pub(crate) fn make_scalar_array_function<F>(inner: F) -> ScalarFunctionImplementation
where
F: Fn(&[ArrayRef]) -> Result<ArrayRef> + Sync + Send + 'static,
{
Arc::new(move |args: &[ColumnarValue]| {
// first, identify if any of the arguments is an Array. If yes, store its `len`,
// as any scalar will need to be converted to an array of len `len`.
let len = args
.iter()
.fold(Option::<usize>::None, |acc, arg| match arg {
ColumnarValue::Scalar(_) => acc,
ColumnarValue::Array(a) => Some(a.len()),
});

let is_scalar = len.is_none();

let args = ColumnarValue::values_to_arrays(args)?;

let result = (inner)(&args);

if is_scalar {
// If all inputs are scalar, keeps output as scalar
let result = result.and_then(|arr| ScalarValue::try_from_array(&arr, 0));
result.map(ColumnarValue::Scalar)
} else {
result.map(ColumnarValue::Array)
}
})
}


#[cfg(test)]
pub mod test {
/// $FUNC ScalarUDFImpl to test
Expand Down

0 comments on commit eff3a16

Please sign in to comment.