Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: allow periodic filtering of aggregations using a tumbling window #758

Merged
merged 13 commits into from
Sep 25, 2023
6 changes: 2 additions & 4 deletions crates/sparrow-compiler/src/ast_to_dfg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -570,8 +570,7 @@ pub fn add_to_dfg(
// If `expr` is None, we're running the Python builder code,
// which already flattened things.
//
// Note that this won't define the `condition_input` for the
// purposes of ticks.
// For ticks, the `input` has already been bound by a Python-side rewrite.
(args[1].clone(), args[2].clone())
}
};
Expand Down Expand Up @@ -599,8 +598,7 @@ pub fn add_to_dfg(
// If `expr` is None, we're running the Python builder code,
// which already flattened things.
//
// Note that this won't define the `condition_input` for the
// purposes of ticks.
// For ticks, the `input` has already been bound by a Python-side rewrite.
(args[3].clone(), args[4].clone())
}
};
Expand Down
16 changes: 13 additions & 3 deletions crates/sparrow-compiler/src/plan/interpolations.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,18 @@ impl Interpolations {
// TimeOf should always produce a discrete value
Interpolation::Null
}
StepKind::Expression(_) => infer_interpolation(&node_interpolations, children),
StepKind::Transform => infer_interpolation(&node_interpolations, children),
StepKind::Expression(_) => {
// Expressions skip the operation (their last child) when inferring interpolation,
// because they are evaluated after the operation has been applied.
infer_interpolation(&node_interpolations, &children[0..children.len() - 1])
}
StepKind::Transform => {
// Transforms include the operation since they apply the operation to the input.
// e.g. (transform value select_op) - if `value` is `as-of`, the
// interpolation of the result is still `null` due to `select_op` being
// `null`.
infer_interpolation(&node_interpolations, children)
}
StepKind::Window(_) => {
anyhow::bail!("Window arguments should be flattened in the DFG")
}
Expand All @@ -87,7 +97,7 @@ fn infer_interpolation(
node_interpolations: &[Interpolation],
children: &[egg::Id],
) -> Interpolation {
let mut interpolations = children[0..children.len() - 1]
let mut interpolations = children
.iter()
.map(|child| node_interpolations[usize::from(*child)]);
if interpolations.all(|i| i == Interpolation::AsOf) {
Expand Down
2 changes: 1 addition & 1 deletion crates/sparrow-compiler/src/plan/transform_to_plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ impl<'a> TransformToPlan<'a> {
.get_expression_type(producing_operation_index, producing_expression_index)?
.clone();

let interpolation = plan_builder.interpolations.interpolation(value);
let interpolation = plan_builder.interpolations.interpolation(transform_id);

let direct_input = self.get_direct_input(
plan_builder,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ operations:
Input:
producing_operation: 0
input_column: 3
interpolation: 2
interpolation: 1
column:
ProducerExpression: 8
operator:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ operations:
Input:
producing_operation: 0
input_column: 4
interpolation: 2
interpolation: 1
column:
ProducerExpression: 8
- arguments:
Expand Down Expand Up @@ -214,7 +214,7 @@ operations:
Input:
producing_operation: 1
input_column: 3
interpolation: 2
interpolation: 1
column:
ProducerExpression: 1
operator:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ operations:
Input:
producing_operation: 0
input_column: 4
interpolation: 2
interpolation: 1
column:
ProducerExpression: 8
- arguments:
Expand Down Expand Up @@ -214,7 +214,7 @@ operations:
Input:
producing_operation: 1
input_column: 3
interpolation: 2
interpolation: 1
column:
ProducerExpression: 1
operator:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ operations:
Input:
producing_operation: 0
input_column: 3
interpolation: 2
interpolation: 1
column:
ProducerExpression: 10
operator:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -419,7 +419,7 @@ operations:
Input:
producing_operation: 5
input_column: 3
interpolation: 2
interpolation: 1
column:
ProducerExpression: 5
operator:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -409,7 +409,7 @@ operations:
Input:
producing_operation: 5
input_column: 3
interpolation: 2
interpolation: 1
column:
ProducerExpression: 5
operator:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -411,7 +411,7 @@ operations:
Input:
producing_operation: 5
input_column: 3
interpolation: 2
interpolation: 1
column:
ProducerExpression: 5
operator:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ operations:
Input:
producing_operation: 0
input_column: 3
interpolation: 2
interpolation: 1
column:
ProducerExpression: 6
operator:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ operations:
Input:
producing_operation: 2
input_column: 3
interpolation: 2
interpolation: 1
column:
ProducerExpression: 6
operator:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ operations:
Input:
producing_operation: 2
input_column: 4
interpolation: 2
interpolation: 1
column:
ProducerExpression: 4
- arguments: []
Expand Down Expand Up @@ -388,7 +388,7 @@ operations:
Input:
producing_operation: 5
input_column: 3
interpolation: 2
interpolation: 1
column:
ProducerExpression: 6
operator:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ operations:
Input:
producing_operation: 0
input_column: 3
interpolation: 2
interpolation: 1
column:
ProducerExpression: 7
operator:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ operations:
Input:
producing_operation: 0
input_column: 3
interpolation: 2
interpolation: 1
column:
ProducerExpression: 9
operator:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -394,7 +394,7 @@ operations:
Input:
producing_operation: 3
input_column: 3
interpolation: 2
interpolation: 1
column:
ProducerExpression: 5
operator:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,7 @@ operations:
Input:
producing_operation: 1
input_column: 3
interpolation: 2
interpolation: 1
column:
ProducerExpression: 9
operator:
Expand Down
4 changes: 2 additions & 2 deletions crates/sparrow-main/tests/e2e/collect_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -499,8 +499,8 @@ async fn test_collect_struct_since_hourly() {
1996-12-21T00:43:57.000000000,9223372036854775808,2867199309159137213,B,false,false,,true,false,
1996-12-21T00:44:57.000000000,9223372036854775808,2867199309159137213,B,true,false,,true,false,true
1996-12-21T01:00:00.000000000,18446744073709551615,2867199309159137213,B,,false,,true,false,true
1996-12-21T01:44:57.000000000,9223372036854775808,2867199309159137213,B,true,true,,true,false,true
1996-12-21T02:00:00.000000000,18446744073709551615,2867199309159137213,B,,true,,true,false,true
1996-12-21T01:44:57.000000000,9223372036854775808,2867199309159137213,B,true,true,,,,
1996-12-21T02:00:00.000000000,18446744073709551615,2867199309159137213,B,,true,,,,
1996-12-22T00:44:57.000000000,9223372036854775808,2521269998124177631,C,true,true,,,,
1996-12-22T00:45:57.000000000,9223372036854775808,2521269998124177631,C,true,true,true,,,
1996-12-22T00:46:57.000000000,9223372036854775808,2521269998124177631,C,true,true,true,true,,
Expand Down
16 changes: 15 additions & 1 deletion python/pysrc/kaskada/_timestream.py
Original file line number Diff line number Diff line change
Expand Up @@ -1232,7 +1232,10 @@ def _aggregation(
predicate = window.predicate
if callable(predicate):
predicate = predicate(input)
return Timestream._call(op, input, *args, predicate, window.duration)
# Sliding windows produce non-cumulative values, hence the filter at the end.
return Timestream._call(op, input, *args, predicate, window.duration).filter(
predicate
)
elif isinstance(window, kd.windows.Trailing):
if op != "collect":
raise NotImplementedError(
Expand All @@ -1255,6 +1258,17 @@ def _aggregation(
# `duration` has passed with no "real" inputs.
merged_input = record({"input": input, "shift": input_shift}).col("input")
return Timestream._call("collect", merged_input, *args, None, trailing_ns)
elif isinstance(window, kd.windows.Tumbling):
# Tumbling windows are analogous to Since windows, except for their output behavior:
# a Tumbling window emits only once per window. That behavior is not implemented
# in Sparrow yet, so we emulate it with a Since window and then filter the result
# using the same predicate. Note that this hack is brittle, adds extra work for merging and
# filtering, and is generally not how we want to handle new window behaviors.
predicate = window.predicate
if callable(predicate):
predicate = predicate(input)

return Timestream._call(op, input, *args, predicate, None).filter(predicate)
else:
raise NotImplementedError(f"Unknown window type {window!r}")

Expand Down
Loading
Loading