Skip to content

Commit

Permalink
feat: trailing windows in collect for DataType::Struct (#641)
Browse files Browse the repository at this point in the history
Adds trailing windows to the `collect()` function, for `DataType::Struct`
inputs only.
  • Loading branch information
jordanrfrazier authored Aug 10, 2023
1 parent 98c3be3 commit 15c0d14
Show file tree
Hide file tree
Showing 29 changed files with 593 additions and 109 deletions.
4 changes: 2 additions & 2 deletions crates/sparrow-arrow/src/concat_take.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,6 @@ pub fn concat_take(
array2: &ArrayRef,
indices: &UInt32Array,
) -> anyhow::Result<ArrayRef> {
let combined = arrow::compute::concat(&[array1, array2])?;
Ok(arrow::compute::take(&combined, indices, None)?)
let combined = arrow_select::concat::concat(&[array1, array2])?;
Ok(arrow_select::take::take(&combined, indices, None)?)
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
use arrow::array::{new_null_array, Array, ArrayRef, AsArray};
use std::sync::Arc;

use arrow::array::{new_empty_array, new_null_array, Array, ArrayRef, AsArray};
use arrow_schema::{DataType, Field, TimeUnit};

use crate::{ComputeStore, StateToken, StoreKey};

Expand All @@ -10,6 +13,14 @@ pub struct CollectStructToken {
/// A [ListArray] comprised of lists of structs for each entity.
#[serde(with = "sparrow_arrow::serde::array_ref")]
pub state: ArrayRef,
/// Stores the times of the state values.
///
/// A [ListArray] comprised of lists of timestamps for each entity.
///
/// This array is only used when we have a `trailing` window.
/// Likely this should be separated into a different implementation.
#[serde(with = "sparrow_arrow::serde::array_ref")]
pub times: ArrayRef,
}

impl StateToken for CollectStructToken {
Expand All @@ -18,7 +29,9 @@ impl StateToken for CollectStructToken {
let state: CollectStructToken = state;
self.state = state.state;
};
Ok(())

// TODO: restore times
panic!("time restoration not implemented")
}

fn store(&self, key: &StoreKey, store: &ComputeStore) -> anyhow::Result<()> {
Expand All @@ -28,19 +41,45 @@ impl StateToken for CollectStructToken {

impl CollectStructToken {
/// Creates a token from `state` with an empty `times` array.
///
/// `times` is initialised as a zero-length list array whose nullable
/// `"item"` field has type `Timestamp(Nanosecond, None)`. Use
/// [`CollectStructToken::new_with_time`] to supply the times explicitly.
pub fn new(state: ArrayRef) -> Self {
    // Element field of the `times` list: nullable nanosecond timestamps
    // without a time zone.
    let field_ref = Arc::new(Field::new(
        "item",
        DataType::Timestamp(TimeUnit::Nanosecond, None),
        true,
    ));
    let times_type = DataType::List(field_ref);
    Self {
        state,
        times: new_empty_array(&times_type),
    }
}

pub fn new_with_time(state: ArrayRef, times: ArrayRef) -> Self {
Self { state, times }
}

/// Grows the token so that `state` holds `len` entries.
///
/// The missing slots are appended to `state` as nulls, and `times` is
/// extended by the same number of null slots.
///
/// A `len` that does not exceed the current `state` length is a no-op:
/// the token is never shrunk, and the unsigned subtraction can no longer
/// underflow.
///
/// # Errors
///
/// Returns an error if concatenating the existing arrays with the null
/// padding fails.
pub fn resize(&mut self, len: usize) -> anyhow::Result<()> {
    // `checked_sub` yields `None` when `len` is smaller than the current
    // length; a plain `-` would panic in debug builds and wrap to an
    // enormous allocation size in release builds.
    let diff = match len.checked_sub(self.state.len()) {
        Some(0) | None => return Ok(()),
        Some(diff) => diff,
    };

    // Resize the state
    let state_padding = new_null_array(self.state.data_type(), diff);
    let state_padding = state_padding.as_ref().as_list::<i32>();
    self.state = arrow::compute::concat(&[&self.state, state_padding])?;

    // Resize the times
    // NOTE(review): `times` grows by the same `diff` as `state`, so the two
    // only end up the same length if they started that way -- confirm
    // against callers that build the token via `new`.
    let times_padding = new_null_array(self.times.data_type(), diff);
    let times_padding = times_padding.as_ref().as_list::<i32>();
    self.times = arrow::compute::concat(&[&self.times, times_padding])?;

    Ok(())
}

/// Replaces both the stored `state` array and the stored `times` array.
///
/// The previous arrays are dropped; no length or type checks are made.
pub fn set_state_and_time(&mut self, new_state: ArrayRef, new_times: ArrayRef) {
    (self.state, self.times) = (new_state, new_times);
}

pub fn set_state(&mut self, new_state: ArrayRef) {
self.state = new_state;
}
Expand Down
Loading

0 comments on commit 15c0d14

Please sign in to comment.