Skip to content

Commit

Permalink
feat: hack collect trailing with shift by (#658)
Browse files Browse the repository at this point in the history
Attempts to fix the behavior of trailing windows in `collect` by merging
the input with a copy of itself shifted forward by `duration`. The shifted
rows reach `collect` as null inputs, which clears out of the window any
values from the previous event that have since expired.
  • Loading branch information
jordanrfrazier authored Aug 15, 2023
1 parent 67e5a58 commit 0de6eab
Show file tree
Hide file tree
Showing 31 changed files with 272 additions and 78 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,9 @@ pub struct CollectToken<T>
where
T: Clone,
T: Serialize + DeserializeOwned,
Vec<VecDeque<Option<T>>>: Serialize + DeserializeOwned,
Vec<VecDeque<T>>: Serialize + DeserializeOwned,
{
state: Vec<VecDeque<Option<T>>>,
state: Vec<VecDeque<T>>,
/// Stores the times of the state values.
///
/// Comprised of lists of timestamps for each entity.
Expand All @@ -35,7 +35,7 @@ where
}
}

pub fn add_value(&mut self, max: usize, index: usize, input: Option<T>) {
pub fn add_value(&mut self, max: usize, index: usize, input: T) {
self.state[index].push_back(input);
if self.state[index].len() > max {
self.state[index].pop_front();
Expand All @@ -47,7 +47,7 @@ where
&mut self,
max: usize,
index: usize,
input: Option<T>,
input: T,
time: i64,
window_duration: i64,
) {
Expand All @@ -59,19 +59,29 @@ where
}
debug_assert_eq!(self.times[index].len(), self.state[index].len());

self.check_time(index, time, window_duration)
}

/// Pops all values and times that are outside of the window
pub fn check_time(&mut self, index: usize, time: i64, window_duration: i64) {
debug_assert_eq!(self.times[index].len(), self.state[index].len());
let min_time = time - window_duration;

// safety: we just added a time that can't be less than the min time
// and max is always greater than 0.
let mut front = self.times[index].front().unwrap();
while *front < min_time {
self.state[index].pop_front();
self.times[index].pop_front();
front = self.times[index].front().unwrap();
if let Some(mut front) = self.times[index].front() {
while *front <= min_time {
self.state[index].pop_front();
self.times[index].pop_front();

if let Some(f) = self.times[index].front() {
front = f
} else {
break;
}
}
}
}

pub fn state(&self, index: usize) -> &VecDeque<Option<T>> {
pub fn state(&self, index: usize) -> &VecDeque<T> {
&self.state[index]
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,13 +116,13 @@ impl CollectBooleanEvaluator {
let entity_index = *entity_index as usize;

// Do not collect null values
if input.is_some() {
if let Some(input) = input {
self.token.add_value(self.max, entity_index, input);
}

let cur_list = self.token.state(entity_index);
if cur_list.len() >= self.min {
list_builder.append_value(cur_list.iter().copied());
list_builder.append_value(cur_list.iter().map(|i| Some(*i)));
} else {
list_builder.append_null();
}
Expand Down Expand Up @@ -158,14 +158,14 @@ impl CollectBooleanEvaluator {

// Update state
// Do not collect null values
if input.is_some() {
if let Some(input) = input {
self.token.add_value(self.max, entity_index, input);
}

// Emit state
let cur_list = self.token.state(entity_index);
if cur_list.len() >= self.min {
list_builder.append_value(cur_list.iter().copied());
list_builder.append_value(cur_list.iter().map(|i| Some(*i)));
} else {
list_builder.append_null();
}
Expand Down Expand Up @@ -208,20 +208,22 @@ impl CollectBooleanEvaluator {

// Update state
// Do not collect null values
if input.is_some() {
if let Some(input) = input {
self.token.add_value_with_time(
self.max,
entity_index,
input,
*input_time,
duration,
);
} else {
self.token.check_time(entity_index, *input_time, duration);
}

// Emit state
let cur_list = self.token.state(entity_index);
if cur_list.len() >= self.min {
list_builder.append_value(cur_list.iter().copied());
list_builder.append_value(cur_list.iter().map(|i| Some(*i)));
} else {
list_builder.append_null();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,13 +138,13 @@ where
let entity_index = *entity_index as usize;

// Do not collect null values
if input.is_some() {
if let Some(input) = input {
self.token.add_value(self.max, entity_index, input);
}

let cur_list = self.token.state(entity_index);
if cur_list.len() >= self.min {
list_builder.append_value(cur_list.iter().copied());
list_builder.append_value(cur_list.iter().map(|i| Some(*i)));
} else {
list_builder.append_null();
}
Expand Down Expand Up @@ -180,14 +180,14 @@ where

// Update state
// Do not collect null values
if input.is_some() {
if let Some(input) = input {
self.token.add_value(self.max, entity_index, input);
}

// Emit state
let cur_list = self.token.state(entity_index);
if cur_list.len() >= self.min {
list_builder.append_value(cur_list.iter().copied());
list_builder.append_value(cur_list.iter().map(|i| Some(*i)));
} else {
list_builder.append_null();
}
Expand Down Expand Up @@ -230,20 +230,22 @@ where

// Update state
// Do not collect null values
if input.is_some() {
if let Some(input) = input {
self.token.add_value_with_time(
self.max,
entity_index,
input,
*input_time,
duration,
);
} else {
self.token.check_time(entity_index, *input_time, duration);
}

// Emit state
let cur_list = self.token.state(entity_index);
if cur_list.len() >= self.min {
list_builder.append_value(cur_list.iter().copied());
list_builder.append_value(cur_list.iter().map(|i| Some(*i)));
} else {
list_builder.append_null();
}
Expand Down
20 changes: 11 additions & 9 deletions crates/sparrow-instructions/src/evaluators/list/collect_string.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,14 +116,14 @@ impl CollectStringEvaluator {
let entity_index = *entity_index as usize;

// Do not collect null values
if input.is_some() {
if let Some(input) = input {
self.token
.add_value(self.max, entity_index, input.map(|s| s.to_owned()));
.add_value(self.max, entity_index, input.to_owned());
}

let cur_list = self.token.state(entity_index);
if cur_list.len() >= self.min {
list_builder.append_value(cur_list.clone());
list_builder.append_value(cur_list.iter().map(Some));
} else {
list_builder.append_null();
}
Expand Down Expand Up @@ -159,15 +159,15 @@ impl CollectStringEvaluator {

// Update state
// Do not collect null values
if input.is_some() {
if let Some(input) = input {
self.token
.add_value(self.max, entity_index, input.map(|s| s.to_owned()));
.add_value(self.max, entity_index, input.to_owned());
}

// Emit state
let cur_list = self.token.state(entity_index);
if cur_list.len() >= self.min {
list_builder.append_value(cur_list.clone());
list_builder.append_value(cur_list.iter().map(Some));
} else {
list_builder.append_null();
}
Expand Down Expand Up @@ -210,20 +210,22 @@ impl CollectStringEvaluator {

// Update state
// Do not collect null values
if input.is_some() {
if let Some(input) = input {
self.token.add_value_with_time(
self.max,
entity_index,
input.map(|s| s.to_owned()),
input.to_owned(),
*input_time,
duration,
);
} else {
self.token.check_time(entity_index, *input_time, duration);
}

// Emit state
let cur_list = self.token.state(entity_index);
if cur_list.len() >= self.min {
list_builder.append_value(cur_list.clone());
list_builder.append_value(cur_list.iter().map(Some));
} else {
list_builder.append_null();
}
Expand Down
65 changes: 46 additions & 19 deletions crates/sparrow-instructions/src/evaluators/list/collect_struct.rs
Original file line number Diff line number Diff line change
Expand Up @@ -475,35 +475,33 @@ impl CollectStructEvaluator {
// entity take indices
for (index, entity_index) in entity_indices.values().iter().enumerate() {
// Update state
let take_index = (old_state_flat.len() + index) as u32;
if input.is_valid(index) {
let take_index = (old_state_flat.len() + index) as u32;
entity_take_indices
.entry(*entity_index)
.and_modify(|v| {
v.push_back(take_index);
if v.len() > max {
v.pop_front();
}

// Iterates over the front of the vec and pops off any values that
// exist prior to the start of the current window
if let Some(front_i) = v.front() {
let mut oldest_time = combined_times.value(*front_i as usize);
// Note this uses the `combined_times` and `take_index`
// because it's possible we need to pop off new input
let min_window_start =
combined_times.value(take_index as usize) - duration;
while oldest_time < min_window_start {
v.pop_front();
// Safety: we know there's elements in the vec, and we can't
// have popped the last element because we just added one at a time
// greater than the `min_window_start`.
let front_i = v.front().unwrap();
oldest_time = combined_times.value(*front_i as usize);
}
}
})
.or_insert(vec![take_index].into());

pop_trailing_window_if_needed(
take_index as usize,
*entity_index,
&mut entity_take_indices,
combined_times,
duration,
);
} else {
pop_trailing_window_if_needed(
take_index as usize,
*entity_index,
&mut entity_take_indices,
combined_times,
duration,
);
}

// safety: map was resized to handle entity_index size
Expand Down Expand Up @@ -553,6 +551,35 @@ impl CollectStructEvaluator {
}
}

/// Evicts expired entries from the front of an entity's trailing window.
///
/// The window for `entity_index` is a queue of indices into `combined_times`.
/// An entry is expired when its time is less than or equal to
/// `combined_times.value(take_index) - duration`; expired entries are popped
/// from the front until the oldest remaining entry is inside the window or
/// the queue is empty.
///
/// If the entity has no window yet, an empty one is inserted into
/// `entity_take_indices` and nothing else happens.
fn pop_trailing_window_if_needed(
    take_index: usize,
    entity_index: u32,
    entity_take_indices: &mut BTreeMap<u32, VecDeque<u32>>,
    combined_times: &arrow::array::PrimitiveArray<arrow::datatypes::TimestampNanosecondType>,
    duration: i64,
) {
    let window = entity_take_indices.entry(entity_index).or_default();

    // Nothing to evict; avoid reading `combined_times` at all in this case.
    if window.is_empty() {
        return;
    }

    // The cutoff is derived from `combined_times` at `take_index` (rather
    // than from the stored state alone) because the entries that need to be
    // popped may themselves be new input.
    let min_window_start = combined_times.value(take_index) - duration;

    while let Some(&oldest) = window.front() {
        // The bound is inclusive: an entry exactly `duration` old is evicted.
        if combined_times.value(oldest as usize) > min_window_start {
            break;
        }
        window.pop_front();
    }
}

/// Uses the final entity take indices to get the new state
fn update_token_state(
entity_take_indices: &BTreeMap<u32, VecDeque<u32>>,
Expand Down
18 changes: 16 additions & 2 deletions python/pysrc/kaskada/_timestream.py
Original file line number Diff line number Diff line change
Expand Up @@ -1149,9 +1149,23 @@ def _aggregation(
raise NotImplementedError(
f"Aggregation '{op} does not support trailing windows"
)

trailing_ns = int(window.duration.total_seconds() * 1e9)
# HACK: Use null predicate and number of nanoseconds to encode trailing windows.
return Timestream._call(op, input, *args, None, trailing_ns)

# Create the shifted-forward input
input_shift = input.shift_by(window.duration)

# Merge, then extract just the input.
#
# Note: Assumes the "input" is discrete. Can probably
# use a transform to make it discrete (e.g., `input.filter(True)`)
# or a special function to do that.
#
# HACK: This places an extra null row in the input to `collect`
# which allows us to "clear" the window when the appropriate
# `duration` has passed with no "real" inputs.
merged_input = record({ "input": input, "shift": input_shift }).col("input")
return Timestream._call("collect", merged_input, *args, None, trailing_ns)
else:
raise NotImplementedError(f"Unknown window type {window!r}")

Expand Down
1 change: 1 addition & 0 deletions python/pytests/collect_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ def source() -> kd.sources.CsvString:
"1996-12-19T16:40:00,A,,9,,false",
"1996-12-19T16:40:01,A,12,,e,false",
"1996-12-19T16:40:02,A,,,f,true",
"1996-12-19T16:40:04,A,,,f,",
]
)
return kd.sources.CsvString(content, time_column_name="time", key_column_name="key")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,4 @@
{"_time":"1996-12-19T16:40:00.000","_subsort":3,"_key_hash":12960666915911099378,"_key":"A","m":null,"collect_m":[5,17],"n":9.0,"collect_n":[10,6,9]}
{"_time":"1996-12-19T16:40:01.000","_subsort":4,"_key_hash":12960666915911099378,"_key":"A","m":12.0,"collect_m":[5,17,12],"n":null,"collect_n":[10,6,9]}
{"_time":"1996-12-19T16:40:02.000","_subsort":5,"_key_hash":12960666915911099378,"_key":"A","m":null,"collect_m":[5,17,12],"n":null,"collect_n":[10,6,9]}
{"_time":"1996-12-19T16:40:04.000","_subsort":6,"_key_hash":12960666915911099378,"_key":"A","m":null,"collect_m":[5,17,12],"n":null,"collect_n":[10,6,9]}
Original file line number Diff line number Diff line change
@@ -1,6 +1,14 @@
{"_time":"1996-12-19T16:39:57.000","_subsort":0,"_key_hash":12960666915911099378,"_key":"A","b":true,"collect_b":[true]}
{"_time":"1996-12-19T16:39:58.000","_subsort":0,"_key_hash":12960666915911099378,"_key":"A","b":null,"collect_b":[]}
{"_time":"1996-12-19T16:39:58.000","_subsort":1,"_key_hash":2867199309159137213,"_key":"B","b":true,"collect_b":[true]}
{"_time":"1996-12-19T16:39:59.000","_subsort":2,"_key_hash":12960666915911099378,"_key":"A","b":null,"collect_b":[true]}
{"_time":"1996-12-19T16:39:59.000","_subsort":1,"_key_hash":2867199309159137213,"_key":"B","b":null,"collect_b":[]}
{"_time":"1996-12-19T16:39:59.000","_subsort":2,"_key_hash":12960666915911099378,"_key":"A","b":null,"collect_b":[]}
{"_time":"1996-12-19T16:40:00.000","_subsort":2,"_key_hash":12960666915911099378,"_key":"A","b":null,"collect_b":[]}
{"_time":"1996-12-19T16:40:00.000","_subsort":3,"_key_hash":12960666915911099378,"_key":"A","b":false,"collect_b":[false]}
{"_time":"1996-12-19T16:40:01.000","_subsort":4,"_key_hash":12960666915911099378,"_key":"A","b":false,"collect_b":[false,false]}
{"_time":"1996-12-19T16:40:02.000","_subsort":5,"_key_hash":12960666915911099378,"_key":"A","b":true,"collect_b":[false,true]}
{"_time":"1996-12-19T16:40:01.000","_subsort":3,"_key_hash":12960666915911099378,"_key":"A","b":null,"collect_b":[]}
{"_time":"1996-12-19T16:40:01.000","_subsort":4,"_key_hash":12960666915911099378,"_key":"A","b":false,"collect_b":[false]}
{"_time":"1996-12-19T16:40:02.000","_subsort":4,"_key_hash":12960666915911099378,"_key":"A","b":null,"collect_b":[]}
{"_time":"1996-12-19T16:40:02.000","_subsort":5,"_key_hash":12960666915911099378,"_key":"A","b":true,"collect_b":[true]}
{"_time":"1996-12-19T16:40:03.000","_subsort":5,"_key_hash":12960666915911099378,"_key":"A","b":null,"collect_b":[]}
{"_time":"1996-12-19T16:40:04.000","_subsort":6,"_key_hash":12960666915911099378,"_key":"A","b":null,"collect_b":[]}
{"_time":"1996-12-19T16:40:05.000","_subsort":0,"_key_hash":12960666915911099378,"_key":"A","b":null,"collect_b":[]}
Loading

0 comments on commit 0de6eab

Please sign in to comment.