Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 56 additions & 20 deletions datafusion/functions-nested/benches/array_reverse.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use std::{hint::black_box, sync::Arc};
use crate::criterion::Criterion;
use arrow::{
array::{ArrayRef, FixedSizeListArray, Int32Array, ListArray, ListViewArray},
buffer::{OffsetBuffer, ScalarBuffer},
buffer::{NullBuffer, OffsetBuffer, ScalarBuffer},
datatypes::{DataType, Field},
};
use datafusion_functions_nested::reverse::array_reverse_inner;
Expand All @@ -34,44 +34,80 @@ fn array_reverse(array: &ArrayRef) -> ArrayRef {
}

fn criterion_benchmark(c: &mut Criterion) {
// Construct large arrays for benchmarking
let array_len = 100000;
let step_size: usize = 1000;
let offsets: Vec<i32> = (0..array_len as i32).step_by(step_size).collect();
// Create array sizes with step size of 100, starting from 100.
let number_of_arrays = 1000;
let sizes = (0..number_of_arrays)
.map(|i| 100 + i * 100)
.collect::<Vec<i32>>();

// Calculate the total number of values
let total_values = sizes.iter().sum::<i32>();

// Calculate sizes and offsets from array lengths
let offsets = sizes
.iter()
.scan(0, |acc, &x| {
let offset = *acc;
*acc += x;
Some(offset)
})
.collect::<Vec<i32>>();
let offsets = ScalarBuffer::from(offsets);
let sizes: Vec<i32> = vec![step_size as i32; array_len / step_size];
let values = (0..array_len as i32).collect::<Vec<i32>>();
// Set every 10th array to null
let nulls = (0..number_of_arrays)
.map(|i| i % 10 != 0)
.collect::<Vec<bool>>();

let values = (0..total_values).collect::<Vec<i32>>();
let values = Arc::new(Int32Array::from(values));

// Create ListArray and ListViewArray
let nulls_list_array = Some(NullBuffer::from(
nulls[..((number_of_arrays as usize) - 1)].to_vec(),
));
let list_array: ArrayRef = Arc::new(ListArray::new(
Arc::new(Field::new("a", DataType::Int32, false)),
OffsetBuffer::new(offsets.clone()),
Arc::new(Int32Array::from(values.clone())),
None,
values.clone(),
nulls_list_array,
));
let fixed_size_list_array: ArrayRef = Arc::new(FixedSizeListArray::new(
Arc::new(Field::new("a", DataType::Int32, false)),
step_size as i32,
Arc::new(Int32Array::from(values.clone())),
None,
let nulls_list_view_array = Some(NullBuffer::from(
nulls[..(number_of_arrays as usize)].to_vec(),
));
let list_view_array: ArrayRef = Arc::new(ListViewArray::new(
Arc::new(Field::new("a", DataType::Int32, false)),
offsets,
ScalarBuffer::from(sizes),
Arc::new(Int32Array::from(values)),
None,
values.clone(),
nulls_list_view_array,
));

c.bench_function("array_reverse_list", |b| {
b.iter(|| array_reverse(&list_array))
});

c.bench_function("array_reverse_fixed_size_list", |b| {
b.iter(|| array_reverse(&fixed_size_list_array))
});

c.bench_function("array_reverse_list_view", |b| {
b.iter(|| array_reverse(&list_view_array))
});

// Create FixedSizeListArray
let array_len = 1000;
let num_arrays = 5000;
let total_values = num_arrays * array_len;
let values = (0..total_values).collect::<Vec<i32>>();
let values = Arc::new(Int32Array::from(values));
// Set every 10th array to null
let nulls = (0..num_arrays).map(|i| i % 10 != 0).collect::<Vec<bool>>();
let nulls = Some(NullBuffer::from(nulls));
let fixed_size_list_array: ArrayRef = Arc::new(FixedSizeListArray::new(
Arc::new(Field::new("a", DataType::Int32, false)),
array_len,
values.clone(),
nulls.clone(),
));
c.bench_function("array_reverse_fixed_size_list", |b| {
b.iter(|| array_reverse(&fixed_size_list_array))
});
}

criterion_group!(benches, criterion_benchmark);
Expand Down
117 changes: 82 additions & 35 deletions datafusion/functions-nested/src/reverse.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@

use crate::utils::make_scalar_function;
use arrow::array::{
Array, ArrayRef, Capacities, FixedSizeListArray, GenericListArray,
GenericListViewArray, MutableArrayData, OffsetSizeTrait, UInt32Array,
Array, ArrayRef, FixedSizeListArray, GenericListArray, GenericListViewArray,
OffsetSizeTrait, UInt32Array, UInt64Array,
};
use arrow::buffer::{OffsetBuffer, ScalarBuffer};
use arrow::compute::take;
Expand Down Expand Up @@ -155,11 +155,8 @@ fn general_array_reverse<O: OffsetSizeTrait>(
field: &FieldRef,
) -> Result<ArrayRef> {
let values = array.values();
let original_data = values.to_data();
let capacity = Capacities::Array(original_data.len());
let mut offsets = vec![O::usize_as(0)];
let mut mutable =
MutableArrayData::with_capacities(vec![&original_data], false, capacity);
let mut indices: Vec<O> = Vec::with_capacity(values.len());

for (row_index, (&start, &end)) in array.offsets().iter().tuple_windows().enumerate()
{
Expand All @@ -171,18 +168,34 @@ fn general_array_reverse<O: OffsetSizeTrait>(

let mut index = end - O::one();
while index >= start {
mutable.extend(0, index.to_usize().unwrap(), index.to_usize().unwrap() + 1);
indices.push(index);
index = index - O::one();
}
let size = end - start;
offsets.push(offsets[row_index] + size);
}

let data = mutable.freeze();
// Materialize values from underlying array with take
let indices_array: ArrayRef = if O::IS_LARGE {
Arc::new(UInt64Array::from(
indices
.iter()
.map(|i| i.as_usize() as u64)
.collect::<Vec<_>>(),
))
} else {
Arc::new(UInt32Array::from(
indices
.iter()
.map(|i| i.as_usize() as u32)
.collect::<Vec<_>>(),
))
};
Comment on lines +178 to +193
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is duplicated for ListView. I figured twice was not enough to extract to a function, but if we find a nicer way to do it, we can improve both.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be nice if we figured out a nicer way of doing this but I still can't figure it out 😅

I went a bit crazy with generics when attempting this once, but it didn't pan out. The current approach works and the `if` branch isn't a big deal, though I wonder whether it would be more efficient to use Int32/Int64 arrays instead of their unsigned variants, which would avoid the map -> collect 🤔

let values = take(&values, &indices_array, None)?;
Ok(Arc::new(GenericListArray::<O>::try_new(
Arc::clone(field),
OffsetBuffer::<O>::new(offsets.into()),
arrow::array::make_array(data),
values,
array.nulls().cloned(),
)?))
}
Expand Down Expand Up @@ -231,7 +244,7 @@ fn list_view_reverse<O: OffsetSizeTrait>(

// Materialize values from underlying array with take
let indices_array: ArrayRef = if O::IS_LARGE {
Arc::new(arrow::array::UInt64Array::from(
Arc::new(UInt64Array::from(
indices
.iter()
.map(|i| i.as_usize() as u64)
Expand All @@ -245,13 +258,12 @@ fn list_view_reverse<O: OffsetSizeTrait>(
.collect::<Vec<_>>(),
))
};
let values_reversed = take(&values, &indices_array, None)?;

let values = take(&values, &indices_array, None)?;
Ok(Arc::new(GenericListViewArray::<O>::try_new(
Arc::clone(field),
ScalarBuffer::from(new_offsets),
ScalarBuffer::from(new_sizes),
values_reversed,
values,
array.nulls().cloned(),
)?))
}
Expand All @@ -260,42 +272,34 @@ fn fixed_size_array_reverse(
array: &FixedSizeListArray,
field: &FieldRef,
) -> Result<ArrayRef> {
let values = array.values();
let original_data = values.to_data();
let capacity = Capacities::Array(original_data.len());
let mut mutable =
MutableArrayData::with_capacities(vec![&original_data], false, capacity);
let value_length = array.value_length() as usize;
let values: &Arc<dyn Array> = array.values();

for row_index in 0..array.len() {
// skip the null value
if array.is_null(row_index) {
mutable.extend(0, 0, value_length);
continue;
}
let start = row_index * value_length;
let end = start + value_length;
for idx in (start..end).rev() {
mutable.extend(0, idx, idx + 1);
}
// Since each fixed size list in the physical array is the same size and we keep the order
// of the fixed size lists, we can reverse the indices for each fixed size list.
let mut indices: Vec<u64> = (0..values.len() as u64).collect();
for chunk in indices.chunks_mut(array.value_length() as usize) {
chunk.reverse();
}

let data = mutable.freeze();
// Materialize values from underlying array with take
let indices_array: ArrayRef = Arc::new(UInt64Array::from(indices));
let values = take(&values, &indices_array, None)?;

Ok(Arc::new(FixedSizeListArray::try_new(
Arc::clone(field),
array.value_length(),
arrow::array::make_array(data),
values,
array.nulls().cloned(),
)?))
}

#[cfg(test)]
mod tests {
use crate::reverse::list_view_reverse;
use crate::reverse::{fixed_size_array_reverse, list_view_reverse};
use arrow::{
array::{
AsArray, GenericListViewArray, Int32Array, LargeListViewArray, ListViewArray,
OffsetSizeTrait,
AsArray, FixedSizeListArray, GenericListViewArray, Int32Array,
LargeListViewArray, ListViewArray, OffsetSizeTrait,
},
buffer::{NullBuffer, ScalarBuffer},
datatypes::{DataType, Field, Int32Type},
Expand All @@ -312,6 +316,13 @@ mod tests {
.collect()
}

/// Test helper: copies every row of `array` into an owned `Vec<i32>`,
/// keeping null rows as `None` so they can be compared with `assert_eq!`.
fn fixed_size_list_values(array: &FixedSizeListArray) -> Vec<Option<Vec<i32>>> {
    let mut rows = Vec::new();
    for row in array.iter() {
        // A valid row is downcast to its Int32 child values and copied out.
        rows.push(row.map(|items| items.as_primitive::<Int32Type>().values().to_vec()));
    }
    rows
}

#[test]
fn test_reverse_list_view() -> Result<()> {
let field = Arc::new(Field::new("a", DataType::Int32, false));
Expand Down Expand Up @@ -450,4 +461,40 @@ mod tests {
assert_eq!(expected, reversed);
Ok(())
}

#[test]
fn test_reverse_fixed_size_list() -> Result<()> {
    // Three rows of width 3; the middle row is marked null.
    let input = FixedSizeListArray::new(
        Arc::new(Field::new("a", DataType::Int32, false)),
        3,
        Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9])),
        Some(NullBuffer::from(vec![true, false, true])),
    );
    let output_field = Arc::new(Field::new("test", DataType::Int32, true));

    let result = fixed_size_array_reverse(&input, &output_field)?;

    // Each valid row is reversed in place; the null row stays null.
    let reversed = fixed_size_list_values(result.as_fixed_size_list());
    let expected = vec![Some(vec![3, 2, 1]), None, Some(vec![9, 8, 7])];
    assert_eq!(expected, reversed);
    Ok(())
}

#[test]
fn test_reverse_fixed_size_list_empty() -> Result<()> {
    // A fixed-size list with no child values, and therefore no rows.
    let values = Arc::new(Int32Array::from(Vec::<i32>::new()));
    let input = FixedSizeListArray::new(
        Arc::new(Field::new("a", DataType::Int32, false)),
        3,
        values,
        None,
    );
    let output_field = Arc::new(Field::new("test", DataType::Int32, true));

    let result = fixed_size_array_reverse(&input, &output_field)?;

    // Reversing an empty array must yield an empty array.
    let reversed = fixed_size_list_values(result.as_fixed_size_list());
    assert_eq!(Vec::<Option<Vec<i32>>>::new(), reversed);
    Ok(())
}
}