Skip to content

Commit

Permalink
feat: allow periodic filtering of aggregations using a tumbling window (
Browse files Browse the repository at this point in the history
#758)

tumbling windows are hacked using `filter`. This adds performance
concerns regarding additional merges and filter, but it's okay for now.

Closes #759 
Closes #766
  • Loading branch information
jordanrfrazier authored Sep 25, 2023
1 parent dcc00ea commit 4b8fd2f
Show file tree
Hide file tree
Showing 50 changed files with 330 additions and 160 deletions.
6 changes: 2 additions & 4 deletions crates/sparrow-compiler/src/ast_to_dfg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -570,8 +570,7 @@ pub fn add_to_dfg(
// If `expr` is None, we're running the Python builder code,
// which already flattened things.
//
// Note that this won't define the `condition_input` for the
// purposes of ticks.
// For ticks, the `input` has already been bound as a python rewrite.
(args[1].clone(), args[2].clone())
}
};
Expand Down Expand Up @@ -599,8 +598,7 @@ pub fn add_to_dfg(
// If `expr` is None, we're running the Python builder code,
// which already flattened things.
//
// Note that this won't define the `condition_input` for the
// purposes of ticks.
// For ticks, the `input` has already been bound as a python rewrite.
(args[3].clone(), args[4].clone())
}
};
Expand Down
16 changes: 13 additions & 3 deletions crates/sparrow-compiler/src/plan/interpolations.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,18 @@ impl Interpolations {
// TimeOf should always produce a discrete value
Interpolation::Null
}
StepKind::Expression(_) => infer_interpolation(&node_interpolations, children),
StepKind::Transform => infer_interpolation(&node_interpolations, children),
StepKind::Expression(_) => {
// Expressions skip the operations (last child) when inferring interpolation
// as they occur after the operation is applied.
infer_interpolation(&node_interpolations, &children[0..children.len() - 1])
}
StepKind::Transform => {
// Transforms include the operation since they apply the operation to the input.
// e.g. (transform value select_op) - if `value` is `as-of`, the
// interpolation of the result is still `null` due to `select_op` being
// `null`.
infer_interpolation(&node_interpolations, children)
}
StepKind::Window(_) => {
anyhow::bail!("Window arguments should be flattened in the DFG")
}
Expand All @@ -87,7 +97,7 @@ fn infer_interpolation(
node_interpolations: &[Interpolation],
children: &[egg::Id],
) -> Interpolation {
let mut interpolations = children[0..children.len() - 1]
let mut interpolations = children
.iter()
.map(|child| node_interpolations[usize::from(*child)]);
if interpolations.all(|i| i == Interpolation::AsOf) {
Expand Down
2 changes: 1 addition & 1 deletion crates/sparrow-compiler/src/plan/transform_to_plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ impl<'a> TransformToPlan<'a> {
.get_expression_type(producing_operation_index, producing_expression_index)?
.clone();

let interpolation = plan_builder.interpolations.interpolation(value);
let interpolation = plan_builder.interpolations.interpolation(transform_id);

let direct_input = self.get_direct_input(
plan_builder,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ operations:
Input:
producing_operation: 0
input_column: 3
interpolation: 2
interpolation: 1
column:
ProducerExpression: 8
operator:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ operations:
Input:
producing_operation: 0
input_column: 4
interpolation: 2
interpolation: 1
column:
ProducerExpression: 8
- arguments:
Expand Down Expand Up @@ -214,7 +214,7 @@ operations:
Input:
producing_operation: 1
input_column: 3
interpolation: 2
interpolation: 1
column:
ProducerExpression: 1
operator:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ operations:
Input:
producing_operation: 0
input_column: 4
interpolation: 2
interpolation: 1
column:
ProducerExpression: 8
- arguments:
Expand Down Expand Up @@ -214,7 +214,7 @@ operations:
Input:
producing_operation: 1
input_column: 3
interpolation: 2
interpolation: 1
column:
ProducerExpression: 1
operator:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ operations:
Input:
producing_operation: 0
input_column: 3
interpolation: 2
interpolation: 1
column:
ProducerExpression: 10
operator:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -419,7 +419,7 @@ operations:
Input:
producing_operation: 5
input_column: 3
interpolation: 2
interpolation: 1
column:
ProducerExpression: 5
operator:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -409,7 +409,7 @@ operations:
Input:
producing_operation: 5
input_column: 3
interpolation: 2
interpolation: 1
column:
ProducerExpression: 5
operator:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -411,7 +411,7 @@ operations:
Input:
producing_operation: 5
input_column: 3
interpolation: 2
interpolation: 1
column:
ProducerExpression: 5
operator:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ operations:
Input:
producing_operation: 0
input_column: 3
interpolation: 2
interpolation: 1
column:
ProducerExpression: 6
operator:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ operations:
Input:
producing_operation: 2
input_column: 3
interpolation: 2
interpolation: 1
column:
ProducerExpression: 6
operator:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ operations:
Input:
producing_operation: 2
input_column: 4
interpolation: 2
interpolation: 1
column:
ProducerExpression: 4
- arguments: []
Expand Down Expand Up @@ -388,7 +388,7 @@ operations:
Input:
producing_operation: 5
input_column: 3
interpolation: 2
interpolation: 1
column:
ProducerExpression: 6
operator:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ operations:
Input:
producing_operation: 0
input_column: 3
interpolation: 2
interpolation: 1
column:
ProducerExpression: 7
operator:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ operations:
Input:
producing_operation: 0
input_column: 3
interpolation: 2
interpolation: 1
column:
ProducerExpression: 9
operator:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -394,7 +394,7 @@ operations:
Input:
producing_operation: 3
input_column: 3
interpolation: 2
interpolation: 1
column:
ProducerExpression: 5
operator:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,7 @@ operations:
Input:
producing_operation: 1
input_column: 3
interpolation: 2
interpolation: 1
column:
ProducerExpression: 9
operator:
Expand Down
4 changes: 2 additions & 2 deletions crates/sparrow-main/tests/e2e/collect_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -499,8 +499,8 @@ async fn test_collect_struct_since_hourly() {
1996-12-21T00:43:57.000000000,9223372036854775808,2867199309159137213,B,false,false,,true,false,
1996-12-21T00:44:57.000000000,9223372036854775808,2867199309159137213,B,true,false,,true,false,true
1996-12-21T01:00:00.000000000,18446744073709551615,2867199309159137213,B,,false,,true,false,true
1996-12-21T01:44:57.000000000,9223372036854775808,2867199309159137213,B,true,true,,true,false,true
1996-12-21T02:00:00.000000000,18446744073709551615,2867199309159137213,B,,true,,true,false,true
1996-12-21T01:44:57.000000000,9223372036854775808,2867199309159137213,B,true,true,,,,
1996-12-21T02:00:00.000000000,18446744073709551615,2867199309159137213,B,,true,,,,
1996-12-22T00:44:57.000000000,9223372036854775808,2521269998124177631,C,true,true,,,,
1996-12-22T00:45:57.000000000,9223372036854775808,2521269998124177631,C,true,true,true,,,
1996-12-22T00:46:57.000000000,9223372036854775808,2521269998124177631,C,true,true,true,true,,
Expand Down
16 changes: 15 additions & 1 deletion python/pysrc/kaskada/_timestream.py
Original file line number Diff line number Diff line change
Expand Up @@ -1260,7 +1260,10 @@ def _aggregation(
predicate = window.predicate
if callable(predicate):
predicate = predicate(input)
return Timestream._call(op, input, *args, predicate, window.duration)
# Sliding windows produce non-cumulative values, hence the filter at the end.
return Timestream._call(op, input, *args, predicate, window.duration).filter(
predicate
)
elif isinstance(window, kd.windows.Trailing):
if op != "collect":
raise NotImplementedError(
Expand All @@ -1283,6 +1286,17 @@ def _aggregation(
# `duration` has passed with no "real" inputs.
merged_input = record({"input": input, "shift": input_shift}).col("input")
return Timestream._call("collect", merged_input, *args, None, trailing_ns)
elif isinstance(window, kd.windows.Tumbling):
# Tumbling windows are analogous to Since windows, aside from output behavior.
# Tumbling windows only emit once per window. However, this behavior is not implemented
# in Sparrow yet, so we hack this by using a Since window with a filter applied afterwards
# with the same predicate. Note this hack is brittle, adds additional work with merging and filters,
# and generally is not how we want to handle new window behaviors.
predicate = window.predicate
if callable(predicate):
predicate = predicate(input)

return Timestream._call(op, input, *args, predicate, None).filter(predicate)
else:
raise NotImplementedError(f"Unknown window type {window!r}")

Expand Down
Loading

0 comments on commit 4b8fd2f

Please sign in to comment.