Skip to content

Commit

Permalink
feat: Unlatched spread for all the things (#637)
Browse files: browse the repository at this point in the history
  • Loading branch information
bjchambers authored Aug 9, 2023
1 parent bb7c5be commit a547f9c
Showing 1 changed file with 20 additions and 134 deletions.
154 changes: 20 additions & 134 deletions crates/sparrow-runtime/src/execute/operation/spread.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,15 @@ use std::sync::Arc;

use anyhow::Context;
use arrow::array::{
new_null_array, Array, ArrayData, ArrayRef, AsArray, BooleanArray, BooleanBufferBuilder,
GenericStringArray, GenericStringBuilder, Int32BufferBuilder, ListArray, MapArray,
OffsetSizeTrait, PrimitiveArray, PrimitiveBuilder, StructArray,
new_null_array, Array, ArrayData, ArrayRef, BooleanArray, BooleanBufferBuilder,
GenericStringArray, GenericStringBuilder, Int32Builder, OffsetSizeTrait, PrimitiveArray,
PrimitiveBuilder, StructArray,
};
use arrow::datatypes::{self, ArrowPrimitiveType, DataType, Fields};
use bitvec::vec::BitVec;
use itertools::{izip, Itertools};
use sparrow_arrow::downcast::{
downcast_boolean_array, downcast_map_array, downcast_primitive_array, downcast_string_array,
downcast_struct_array,
downcast_boolean_array, downcast_primitive_array, downcast_string_array, downcast_struct_array,
};
use sparrow_arrow::utils::make_null_array;
use sparrow_instructions::GroupingIndices;
Expand Down Expand Up @@ -43,7 +42,7 @@ impl<'de> serde::Deserialize<'de> for Spread {
let e = SerializedSpread::deserialize(deserializer)?;
let Some(spread_impl) = e.into_spread_impl() else {
use serde::de::Error;
return Err(D::Error::custom("expected owned"))
return Err(D::Error::custom("expected owned"));
};

Ok(Self { spread_impl })
Expand Down Expand Up @@ -167,10 +166,9 @@ enum SerializedSpread<'a> {
UnlatchedString(Boo<'a, UnlatchedStringSpread<i32>>),
LatchedLargeString(Boo<'a, LatchedStringSpread<i64>>),
UnlatchedLargeString(Boo<'a, UnlatchedStringSpread<i64>>),
UnlatchedUInt64List(Boo<'a, UnlatchedUInt64ListSpread>),
UnlatchedMap(Boo<'a, UnlatchedMapSpread>),
LatchedStruct(Boo<'a, StructSpread<LatchedStructSpreadState>>),
UnlatchedStruct(Boo<'a, StructSpread<UnlatchedStructSpreadState>>),
UnlatchedFallback(Boo<'a, UnlatchedFallbackSpread>),
}

fn into_spread_impl<T: SpreadImpl + 'static>(spread: Boo<'_, T>) -> Option<Box<dyn SpreadImpl>> {
Expand Down Expand Up @@ -246,10 +244,9 @@ impl<'a> SerializedSpread<'a> {
SerializedSpread::UnlatchedString(spread) => into_spread_impl(spread),
SerializedSpread::LatchedLargeString(spread) => into_spread_impl(spread),
SerializedSpread::UnlatchedLargeString(spread) => into_spread_impl(spread),
SerializedSpread::UnlatchedUInt64List(spread) => into_spread_impl(spread),
SerializedSpread::UnlatchedMap(spread) => into_spread_impl(spread),
SerializedSpread::LatchedStruct(spread) => into_spread_impl(spread),
SerializedSpread::UnlatchedStruct(spread) => into_spread_impl(spread),
SerializedSpread::UnlatchedFallback(spread) => into_spread_impl(spread),
}
}
}
Expand Down Expand Up @@ -347,19 +344,7 @@ impl Spread {
Box::new(StructSpread::try_new_unlatched(fields)?)
}
}
DataType::Map(_, _) => {
anyhow::ensure!(!latched, "Latched map spread not supported");
Box::new(UnlatchedMapSpread)
}
DataType::List(field) => {
anyhow::ensure!(!latched, "Latched list spread not supported");
anyhow::ensure!(
field.data_type() == &DataType::UInt64,
"Unsupported type {:?} for list spread",
field.data_type()
);
Box::new(UnlatchedUInt64ListSpread)
}
_ if !latched => Box::new(UnlatchedFallbackSpread),
unsupported => anyhow::bail!(
"Unsupported type for spread: {:?} (latched = {latched})",
unsupported
Expand Down Expand Up @@ -1676,132 +1661,33 @@ pub(super) fn bit_run_iterator(
}

#[derive(serde::Serialize, serde::Deserialize, Debug)]
struct UnlatchedUInt64ListSpread;
struct UnlatchedFallbackSpread;

impl ToSerializedSpread for UnlatchedUInt64ListSpread {
impl ToSerializedSpread for UnlatchedFallbackSpread {
fn to_serialized_spread(&self) -> SerializedSpread<'_> {
SerializedSpread::UnlatchedUInt64List(Boo::Borrowed(self))
SerializedSpread::UnlatchedFallback(Boo::Borrowed(self))
}
}

impl SpreadImpl for UnlatchedUInt64ListSpread {
impl SpreadImpl for UnlatchedFallbackSpread {
fn spread_signaled(
&mut self,
grouping: &GroupingIndices,
values: &ArrayRef,
signal: &BooleanArray,
) -> anyhow::Result<ArrayRef> {
// This is a little tricky. Since we're an unlatched spread, we don't
// need to spread the underlying values (each item in the list will be
// referenced exactly once). Instead, we need to spread out the offset
// array.

let values = values.as_ref().as_list();
let mut offset_builder = Int32BufferBuilder::new(grouping.len() + 1);

let mut null_builder = BooleanBufferBuilder::new(grouping.len());

// Ensure the buffers are aligned to the offset.
offset_builder.append_n_zeroed(values.offset());
null_builder.append_n(values.offset(), false);

let mut offset_iter = values.value_offsets().iter();
let mut offset = *offset_iter.next().context("missing offset")?;
offset_builder.append(offset);

let mut index = 0;
let mut indices = Int32Builder::with_capacity(grouping.len());
let mut next_index = 0;
for signal in signal.iter() {
if matches!(signal, Some(true)) {
offset = *offset_iter.next().context("missing offset")?;
null_builder.append(values.is_valid(index));
index += 1;
if signal.unwrap_or(false) {
indices.append_value(next_index);
next_index += 1;
} else {
null_builder.append(false);
indices.append_null();
}
offset_builder.append(offset);
}

let data_builder = values.to_data().into_builder();
let offset = offset_builder.finish();
let array_data = data_builder
.len(grouping.len())
.null_bit_buffer(Some(null_builder.finish().into_inner()))
.buffers(vec![offset])
.build()?;
let result = ListArray::from(array_data);

Ok(Arc::new(result))
}

fn spread_true(
&mut self,
grouping: &GroupingIndices,
values: &ArrayRef,
) -> anyhow::Result<ArrayRef> {
anyhow::ensure!(grouping.len() == values.len());
Ok(values.clone())
}

fn spread_false(
&mut self,
grouping: &GroupingIndices,
value_type: &DataType,
) -> anyhow::Result<ArrayRef> {
Ok(new_null_array(value_type, grouping.len()))
}
}

#[derive(serde::Serialize, serde::Deserialize, Debug)]
struct UnlatchedMapSpread;

impl ToSerializedSpread for UnlatchedMapSpread {
fn to_serialized_spread(&self) -> SerializedSpread<'_> {
SerializedSpread::UnlatchedMap(Boo::Borrowed(self))
}
}

impl SpreadImpl for UnlatchedMapSpread {
fn spread_signaled(
&mut self,
grouping: &GroupingIndices,
values: &ArrayRef,
signal: &BooleanArray,
) -> anyhow::Result<ArrayRef> {
let map_values = downcast_map_array(values.as_ref())?;

let mut offset_builder = Int32BufferBuilder::new(grouping.len() + 1);
let mut null_builder = BooleanBufferBuilder::new(grouping.len());

// Ensure the buffers are aligned to the offset.
offset_builder.append_n_zeroed(values.offset());
null_builder.append_n(values.offset(), false);

let mut offset_iter = map_values.value_offsets().iter();
let mut offset = *offset_iter.next().context("missing offset")?;
offset_builder.append(offset);

let mut index = 0;
for signal in signal.iter() {
if matches!(signal, Some(true)) {
offset = *offset_iter.next().context("missing offset")?;
null_builder.append(values.is_valid(index));
index += 1;
} else {
null_builder.append(false);
}
offset_builder.append(offset);
}

let data_builder = values.to_data().into_builder();
let offset = offset_builder.finish();
let array_data = data_builder
.len(grouping.len())
.null_bit_buffer(Some(null_builder.finish().into_inner()))
.buffers(vec![offset])
.build()?;
let result = MapArray::from(array_data);

Ok(Arc::new(result))
let indices = indices.finish();
arrow::compute::take(values.as_ref(), &indices, None).context("failed to take values")
}

fn spread_true(
Expand Down

0 comments on commit a547f9c

Please sign in to comment.