Skip to content

Commit

Permalink
refactor(rust): Trim sliced-out memory from ListArrays in list arithm…
Browse files Browse the repository at this point in the history
…etic (#19276)
  • Loading branch information
nameexhaustion authored Oct 17, 2024
1 parent 26e4a53 commit 8cb6539
Show file tree
Hide file tree
Showing 4 changed files with 68 additions and 51 deletions.
43 changes: 43 additions & 0 deletions crates/polars-arrow/src/array/list/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,49 @@ impl<O: Offset> ListArray<O> {
impl_sliced!();
impl_mut_validity!();
impl_into_array!();

pub fn trim_to_normalized_offsets_recursive(&self) -> Self {
let offsets = self.offsets();
let values = self.values();

let first_idx = *offsets.first();
let len = offsets.range().to_usize();

if first_idx.to_usize() == 0 && values.len() == len {
return self.clone();
}

let offsets = if first_idx.to_usize() == 0 {
offsets.clone()
} else {
let v = offsets.iter().map(|x| *x - first_idx).collect::<Vec<_>>();
unsafe { OffsetsBuffer::<O>::new_unchecked(v.into()) }
};

let values = values.sliced(first_idx.to_usize(), len);

let values = match values.dtype() {
ArrowDataType::List(_) => {
let inner: &ListArray<i32> = values.as_ref().as_any().downcast_ref().unwrap();
Box::new(inner.trim_to_normalized_offsets_recursive()) as Box<dyn Array>
},
ArrowDataType::LargeList(_) => {
let inner: &ListArray<i64> = values.as_ref().as_any().downcast_ref().unwrap();
Box::new(inner.trim_to_normalized_offsets_recursive()) as Box<dyn Array>
},
_ => values,
};

assert_eq!(offsets.first().to_usize(), 0);
assert_eq!(values.len(), offsets.range().to_usize());

Self::new(
self.dtype().clone(),
offsets,
values,
self.validity().cloned(),
)
}
}

// Accessors
Expand Down
54 changes: 8 additions & 46 deletions crates/polars-core/src/chunked_array/list/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,25 +46,6 @@ impl ListChunked {
}
}

/// Returns an iterator over the offsets of this chunked array.
///
/// The offsets are returned as though the array consisted of a single chunk.
pub fn iter_offsets(&self) -> impl Iterator<Item = i64> + '_ {
let mut offsets = self.downcast_iter().map(|arr| arr.offsets().iter());
let first_iter = offsets.next().unwrap();

// The first offset doesn't have to be 0, it can be sliced to `n` in the array.
// So we must correct for this.
let correction = first_iter.clone().next().unwrap();

OffsetsIterator {
current_offsets_iter: first_iter,
current_adjusted_offset: 0,
offset_adjustment: -correction,
offsets_iters: offsets,
}
}

/// Ignore the list indices and apply `func` to the inner type as [`Series`].
pub fn apply_to_inner(
&self,
Expand Down Expand Up @@ -110,33 +91,14 @@ impl ListChunked {
)
})
}
}

pub struct OffsetsIterator<'a, N>
where
N: Iterator<Item = std::slice::Iter<'a, i64>>,
{
offsets_iters: N,
current_offsets_iter: std::slice::Iter<'a, i64>,
current_adjusted_offset: i64,
offset_adjustment: i64,
}

impl<'a, N> Iterator for OffsetsIterator<'a, N>
where
N: Iterator<Item = std::slice::Iter<'a, i64>>,
{
type Item = i64;

fn next(&mut self) -> Option<Self::Item> {
if let Some(offset) = self.current_offsets_iter.next() {
self.current_adjusted_offset = offset + self.offset_adjustment;
Some(self.current_adjusted_offset)
} else {
self.current_offsets_iter = self.offsets_iters.next()?;
let first = self.current_offsets_iter.next().unwrap();
self.offset_adjustment = self.current_adjusted_offset - first;
self.next()
}
pub fn rechunk_and_trim_to_normalized_offsets(&self) -> Self {
Self::with_chunk(
self.name().clone(),
self.rechunk()
.downcast_get(0)
.unwrap()
.trim_to_normalized_offsets_recursive(),
)
}
}
12 changes: 7 additions & 5 deletions crates/polars-core/src/series/arithmetic/list_borrowed.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,11 @@ impl NumericListOp {
feature_gated!("list_arithmetic", {
use either::Either;

// Ideally we only need to rechunk the leaf array, but getting the
// list offsets of a ListChunked triggers a rechunk anyway, so we just
// do it here.
let lhs = lhs.rechunk();
let rhs = rhs.rechunk();
// `trim_to_normalized_offsets` ensures we don't perform excessive
// memory allocation / compute on memory regions that have been
// sliced out.
let lhs = lhs.list_rechunk_and_trim_to_normalized_offsets();
let rhs = rhs.list_rechunk_and_trim_to_normalized_offsets();

let binary_op_exec = match BinaryListNumericOpHelper::try_new(
self.clone(),
Expand All @@ -58,10 +58,12 @@ impl NumericListOp {
rhs.len(),
{
let (a, b) = lhs.list_offsets_and_validities_recursive();
debug_assert!(a.iter().all(|x| *x.first() as usize == 0));
(a, b, lhs.clone())
},
{
let (a, b) = rhs.list_offsets_and_validities_recursive();
debug_assert!(a.iter().all(|x| *x.first() as usize == 0));
(a, b, rhs.clone())
},
lhs.rechunk_validity(),
Expand Down
10 changes: 10 additions & 0 deletions crates/polars-core/src/series/ops/reshape.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,16 @@ impl Series {
(offsets, validities)
}

/// For ListArrays, recursively normalizes the offsets to begin from 0, and
/// slices excess length from the values array.
pub fn list_rechunk_and_trim_to_normalized_offsets(&self) -> Self {
if let Some(ca) = self.try_list() {
ca.rechunk_and_trim_to_normalized_offsets().into_series()
} else {
self.rechunk()
}
}

/// Convert the values of this Series to a ListChunked with a length of 1,
/// so a Series of `[1, 2, 3]` becomes `[[1, 2, 3]]`.
pub fn implode(&self) -> PolarsResult<ListChunked> {
Expand Down

0 comments on commit 8cb6539

Please sign in to comment.