Skip to content

Commit

Permalink
feat: sparrow py collect function (#595)
Browse files Browse the repository at this point in the history
support `collect` function in sparrow-py

---------

Co-authored-by: Ben Chambers <[email protected]>
  • Loading branch information
jordanrfrazier and bjchambers authored Aug 3, 2023
1 parent 5be52a0 commit ed026ae
Show file tree
Hide file tree
Showing 22 changed files with 191 additions and 57 deletions.
25 changes: 19 additions & 6 deletions crates/sparrow-compiler/src/ast_to_dfg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -557,14 +557,27 @@ pub fn add_to_dfg(
//
// TODO: Flattening the window arguments is hacky and confusing. We should instead
// incorporate the tick directly into the function containing the window.
dfg.enter_env();
dfg.bind("$condition_input", args[1].inner().clone());
let window_arg = original_ast.map(|e| &e.args()[2]);
let (condition, duration) = match window_arg {
Some(window) => {
dfg.enter_env();
dfg.bind("$condition_input", args[1].inner().clone());

let window = &original_ast.unwrap().args()[2];
let (condition, duration) =
flatten_window_args_if_needed(window, dfg, data_context, diagnostics)?;
let result =
flatten_window_args_if_needed(window, dfg, data_context, diagnostics)?;
dfg.exit_env();
result
}
None => {
// If `expr` is None, we're running the Python builder code,
// which already flattened things.
//
// Note that this won't define the `condition_input` for the
// purposes of ticks.
(args[2].clone(), args[3].clone())
}
};

dfg.exit_env();
// [max, input, condition, duration]
vec![args[0].clone(), args[1].clone(), condition, duration]
} else if function.name() == "when" || function.name() == "if" {
Expand Down
2 changes: 1 addition & 1 deletion crates/sparrow-compiler/src/dfg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,7 @@ impl Dfg {
);
}

// 2. The number of argflatten_uments should be correct.
// 2. The number of args should be correct.
match expr {
Expression::Literal(_) | Expression::LateBound(_) => {
anyhow::ensure!(
Expand Down
3 changes: 3 additions & 0 deletions crates/sparrow-compiler/src/functions/collection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@ pub(super) fn register(registry: &mut Registry) {

registry
.register("collect<T: any>(const max: i64, input: T, window: window = null) -> list<T>")
.with_dfg_signature(
"collect<T: any>(const max: i64, input: T, window: bool = null, duration: i64 = null) -> list<T>",
)
.with_implementation(Implementation::Instruction(InstOp::Collect))
.set_internal();
}
2 changes: 1 addition & 1 deletion sparrow-py/pysrc/sparrow_py/__init__.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
"""Kaskada query builder and local executon engine."""
from typing import Dict
from typing import Union
from typing import List
from typing import Union

from ._windows import SinceWindow
from ._windows import SlidingWindow
Expand Down
39 changes: 34 additions & 5 deletions sparrow-py/pysrc/sparrow_py/expr.py
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,11 @@ def is_not_null(self) -> "Expr":
"""Return a boolean expression indicating if the expression is not null."""
return Expr.call("is_valid", self)

def collect(
self, max: Optional[int] = None, window: Optional[Window] = None
) -> "Expr":
return _aggregation("collect", self, window, max)

def sum(self, window: Optional[Window] = None) -> "Expr":
"""Return the sum aggregation of the expression."""
return _aggregation("sum", self, window)
Expand All @@ -312,13 +317,37 @@ def run_to_csv_string(self) -> str:
return self.run().to_csv(index=False)


def _aggregation(op: str, input: Expr, window: Optional[Window]) -> Expr:
"""Create an aggregation with the given operation and input."""
def _aggregation(
op: str, input: Expr, window: Optional[Window], *args: Optional[Arg]
) -> Expr:
"""
Creates an aggregation.
Parameters
----------
op : str
The operation to create.
input : Expr
The input to the expression.
window : Optional[Window]
The window to use for the aggregation.
*args : Optional[Arg]
Additional arguments to provide before `input` and the flattened window.
Returns
-------
The resulting expression.
"""

# Note: things would be easier if we had a more normal order, which
# we could do as part of "aligning" Sparrow signatures to the new direction.
# However, `collect` currently has `collect(max, input, window)`, requiring
# us to add the *args like so.
if window is None:
return Expr.call(op, input, None, None)
return Expr.call(op, *args, input, None, None)
elif isinstance(window, SinceWindow):
return Expr.call(op, input, window._predicate, 1)
return Expr.call(op, *args, input, window._predicate, None)
elif isinstance(window, SlidingWindow):
return Expr.call(op, input, window._predicate, window._duration)
return Expr.call(op, *args, input, window._predicate, window._duration)
else:
raise ValueError(f"Unknown window type {window!r}")
61 changes: 61 additions & 0 deletions sparrow-py/pytests/collect_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
"""Tests for the collect function."""
import pytest
from sparrow_py import SinceWindow
from sparrow_py import SlidingWindow
from sparrow_py import record
from sparrow_py.sources import CsvSource


@pytest.fixture(scope="module")
def source() -> CsvSource:
"""Create an empty table for testing."""
content = "\n".join(
[
"time,key,m,n",
"1996-12-19T16:39:57-08:00,A,5,10",
"1996-12-19T16:39:58-08:00,B,24,3",
"1996-12-19T16:39:59-08:00,A,17,6",
"1996-12-19T16:40:00-08:00,A,,9",
"1996-12-19T16:40:01-08:00,A,12,",
"1996-12-19T16:40:02-08:00,A,,",
]
)
return CsvSource("time", "key", content)


def test_collect_basic(source, golden) -> None:
"""Test we can collect values to a list"""
m = source["m"]
n = source["n"]
golden(
record(
{
"m": m,
"collect_m": m.collect(max=None),
"n": n,
"collect_n": n.collect(max=None),
}
)
)


def test_collect_with_max(source, golden) -> None:
"""Test we can collect values to a list with a max"""
m = source["m"]
n = source["n"]
golden(
record(
{
"m": m,
"collect_m_max_2": m.collect(max=2),
"n": n,
"collect_n_max_2": n.collect(max=2),
}
)
)


def test_collect_since_window(source, golden) -> None:
"""Test we can collect values to a list in a since window"""
m = source["m"]
golden(record({"m": m, "since_m": m.sum(window=SinceWindow(m > 10))}))
20 changes: 18 additions & 2 deletions sparrow-py/pytests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,19 @@ def golden(request: pytest.FixtureRequest, pytestconfig: pytest.Config):

def handler(
query: sparrow_py.Expr,
format: Union[Literal["csv"], Literal["parquet"]] = "csv",
format: Union[Literal["csv"], Literal["parquet"], Literal["json"]] = "json",
):
"""Check query results against a golden file."""
"""
Check query results against a golden file.
Parameters
----------
query : sparrow_py.Expr
The query to run.
format : str, optional
The format to store the golden file in.
Defaults to "json".
"""
nonlocal output

df = query.run()
Expand All @@ -51,6 +61,8 @@ def handler(
df.to_csv(filename, index=False)
elif format == "parquet":
df.to_parquet(filename)
elif format == "json":
df.to_json(filename, orient="records", lines=True)
else:
raise ValueError(f"Unknown format {format}")
else:
Expand All @@ -69,6 +81,10 @@ def handler(
correct = pd.read_csv(filename, dtype=dtypes, parse_dates=parse_dates)
elif format == "parquet":
correct = pd.read_parquet(filename)
elif format == "json":
correct = pd.read_json(
filename, orient="records", lines=True, dtype=df.dtypes.to_dict()
)
else:
raise ValueError(f"Unknown format {format}")
pd.testing.assert_frame_equal(df, correct)
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{"_time":851042397000,"_subsort":0,"_key_hash":12960666915911099378,"_key":"A","m":5.0,"sum_m":5,"n":10.0,"sum_n":10}
{"_time":851042398000,"_subsort":1,"_key_hash":2867199309159137213,"_key":"B","m":24.0,"sum_m":24,"n":3.0,"sum_n":3}
{"_time":851042399000,"_subsort":2,"_key_hash":12960666915911099378,"_key":"A","m":17.0,"sum_m":22,"n":6.0,"sum_n":16}
{"_time":851042400000,"_subsort":3,"_key_hash":12960666915911099378,"_key":"A","m":null,"sum_m":22,"n":9.0,"sum_n":25}
{"_time":851042401000,"_subsort":4,"_key_hash":12960666915911099378,"_key":"A","m":12.0,"sum_m":34,"n":null,"sum_n":25}
{"_time":851042402000,"_subsort":5,"_key_hash":12960666915911099378,"_key":"A","m":null,"sum_m":34,"n":null,"sum_n":25}

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{"_time":851042397000,"_subsort":0,"_key_hash":12960666915911099378,"_key":"A","m":5.0,"sum_m":5,"n":10.0,"sum_n":10}
{"_time":851042398000,"_subsort":1,"_key_hash":2867199309159137213,"_key":"B","m":24.0,"sum_m":24,"n":3.0,"sum_n":3}
{"_time":851042399000,"_subsort":2,"_key_hash":12960666915911099378,"_key":"A","m":17.0,"sum_m":22,"n":6.0,"sum_n":16}
{"_time":851042400000,"_subsort":3,"_key_hash":12960666915911099378,"_key":"A","m":null,"sum_m":22,"n":9.0,"sum_n":25}
{"_time":851042401000,"_subsort":4,"_key_hash":12960666915911099378,"_key":"A","m":12.0,"sum_m":34,"n":null,"sum_n":25}
{"_time":851042402000,"_subsort":5,"_key_hash":12960666915911099378,"_key":"A","m":null,"sum_m":34,"n":null,"sum_n":9}
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{"_time":851042397000,"_subsort":0,"_key_hash":12960666915911099378,"_key":"A","m":5.0,"collect_m":[5.0],"n":10.0,"collect_n":[10.0]}
{"_time":851042398000,"_subsort":1,"_key_hash":2867199309159137213,"_key":"B","m":24.0,"collect_m":[24.0],"n":3.0,"collect_n":[3.0]}
{"_time":851042399000,"_subsort":2,"_key_hash":12960666915911099378,"_key":"A","m":17.0,"collect_m":[5.0,17.0],"n":6.0,"collect_n":[10.0,6.0]}
{"_time":851042400000,"_subsort":3,"_key_hash":12960666915911099378,"_key":"A","m":null,"collect_m":[5.0,17.0,null],"n":9.0,"collect_n":[10.0,6.0,9.0]}
{"_time":851042401000,"_subsort":4,"_key_hash":12960666915911099378,"_key":"A","m":12.0,"collect_m":[5.0,17.0,null,12.0],"n":null,"collect_n":[10.0,6.0,9.0,null]}
{"_time":851042402000,"_subsort":5,"_key_hash":12960666915911099378,"_key":"A","m":null,"collect_m":[5.0,17.0,null,12.0,null],"n":null,"collect_n":[10.0,6.0,9.0,null,null]}
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{"_time":851042397000,"_subsort":0,"_key_hash":12960666915911099378,"_key":"A","m":5.0,"since_m":5.0}
{"_time":851042398000,"_subsort":1,"_key_hash":2867199309159137213,"_key":"B","m":24.0,"since_m":24.0}
{"_time":851042399000,"_subsort":2,"_key_hash":12960666915911099378,"_key":"A","m":17.0,"since_m":22.0}
{"_time":851042400000,"_subsort":3,"_key_hash":12960666915911099378,"_key":"A","m":null,"since_m":null}
{"_time":851042401000,"_subsort":4,"_key_hash":12960666915911099378,"_key":"A","m":12.0,"since_m":12.0}
{"_time":851042402000,"_subsort":5,"_key_hash":12960666915911099378,"_key":"A","m":null,"since_m":null}
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{"_time":851042397000,"_subsort":0,"_key_hash":12960666915911099378,"_key":"A","m":5.0,"collect_m_max_2":[5.0],"n":10.0,"collect_n_max_2":[10.0]}
{"_time":851042398000,"_subsort":1,"_key_hash":2867199309159137213,"_key":"B","m":24.0,"collect_m_max_2":[24.0],"n":3.0,"collect_n_max_2":[3.0]}
{"_time":851042399000,"_subsort":2,"_key_hash":12960666915911099378,"_key":"A","m":17.0,"collect_m_max_2":[5.0,17.0],"n":6.0,"collect_n_max_2":[10.0,6.0]}
{"_time":851042400000,"_subsort":3,"_key_hash":12960666915911099378,"_key":"A","m":null,"collect_m_max_2":[17.0,null],"n":9.0,"collect_n_max_2":[6.0,9.0]}
{"_time":851042401000,"_subsort":4,"_key_hash":12960666915911099378,"_key":"A","m":12.0,"collect_m_max_2":[null,12.0],"n":null,"collect_n_max_2":[9.0,null]}
{"_time":851042402000,"_subsort":5,"_key_hash":12960666915911099378,"_key":"A","m":null,"collect_m_max_2":[12.0,null],"n":null,"collect_n_max_2":[null,null]}
7 changes: 0 additions & 7 deletions sparrow-py/pytests/golden/math_test/test_math_int64.csv

This file was deleted.

6 changes: 6 additions & 0 deletions sparrow-py/pytests/golden/math_test/test_math_int64.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{"_time":851042397000,"_subsort":0,"_key_hash":12960666915911099378,"_key":"A","m":5.0,"n":10.0,"add":15.0,"sub":-5.0}
{"_time":851042398000,"_subsort":1,"_key_hash":2867199309159137213,"_key":"B","m":24.0,"n":3.0,"add":27.0,"sub":21.0}
{"_time":851042399000,"_subsort":2,"_key_hash":12960666915911099378,"_key":"A","m":17.0,"n":6.0,"add":23.0,"sub":11.0}
{"_time":851042400000,"_subsort":3,"_key_hash":12960666915911099378,"_key":"A","m":null,"n":9.0,"add":null,"sub":null}
{"_time":851042401000,"_subsort":4,"_key_hash":12960666915911099378,"_key":"A","m":12.0,"n":null,"add":null,"sub":null}
{"_time":851042402000,"_subsort":5,"_key_hash":12960666915911099378,"_key":"A","m":null,"n":null,"add":null,"sub":null}
7 changes: 0 additions & 7 deletions sparrow-py/pytests/golden/record_test/test_extend_record.csv

This file was deleted.

6 changes: 6 additions & 0 deletions sparrow-py/pytests/golden/record_test/test_extend_record.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{"_time":851042397000,"_subsort":0,"_key_hash":12960666915911099378,"_key":"A","add":15.0,"time":"1996-12-19T16:39:57-08:00","key":"A","m":5.0,"n":10.0}
{"_time":851042398000,"_subsort":1,"_key_hash":2867199309159137213,"_key":"B","add":27.0,"time":"1996-12-19T16:39:58-08:00","key":"B","m":24.0,"n":3.0}
{"_time":851042399000,"_subsort":2,"_key_hash":12960666915911099378,"_key":"A","add":23.0,"time":"1996-12-19T16:39:59-08:00","key":"A","m":17.0,"n":6.0}
{"_time":851042400000,"_subsort":3,"_key_hash":12960666915911099378,"_key":"A","add":null,"time":"1996-12-19T16:40:00-08:00","key":"A","m":null,"n":9.0}
{"_time":851042401000,"_subsort":4,"_key_hash":12960666915911099378,"_key":"A","add":null,"time":"1996-12-19T16:40:01-08:00","key":"A","m":12.0,"n":null}
{"_time":851042402000,"_subsort":5,"_key_hash":12960666915911099378,"_key":"A","add":null,"time":"1996-12-19T16:40:02-08:00","key":"A","m":null,"n":null}
7 changes: 0 additions & 7 deletions sparrow-py/pytests/golden/record_test/test_record.csv

This file was deleted.

6 changes: 6 additions & 0 deletions sparrow-py/pytests/golden/record_test/test_record.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{"_time":851042397000,"_subsort":0,"_key_hash":12960666915911099378,"_key":"A","m":5.0,"n":10.0}
{"_time":851042398000,"_subsort":1,"_key_hash":2867199309159137213,"_key":"B","m":24.0,"n":3.0}
{"_time":851042399000,"_subsort":2,"_key_hash":12960666915911099378,"_key":"A","m":17.0,"n":6.0}
{"_time":851042400000,"_subsort":3,"_key_hash":12960666915911099378,"_key":"A","m":null,"n":9.0}
{"_time":851042401000,"_subsort":4,"_key_hash":12960666915911099378,"_key":"A","m":12.0,"n":null}
{"_time":851042402000,"_subsort":5,"_key_hash":12960666915911099378,"_key":"A","m":null,"n":null}
7 changes: 0 additions & 7 deletions sparrow-py/pytests/golden/record_test/test_select_record.csv

This file was deleted.

6 changes: 6 additions & 0 deletions sparrow-py/pytests/golden/record_test/test_select_record.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{"_time":851042397000,"_subsort":0,"_key_hash":12960666915911099378,"_key":"A","time":"1996-12-19T16:39:57-08:00","key":"A","m":5.0}
{"_time":851042398000,"_subsort":1,"_key_hash":2867199309159137213,"_key":"B","time":"1996-12-19T16:39:58-08:00","key":"B","m":24.0}
{"_time":851042399000,"_subsort":2,"_key_hash":12960666915911099378,"_key":"A","time":"1996-12-19T16:39:59-08:00","key":"A","m":17.0}
{"_time":851042400000,"_subsort":3,"_key_hash":12960666915911099378,"_key":"A","time":"1996-12-19T16:40:00-08:00","key":"A","m":null}
{"_time":851042401000,"_subsort":4,"_key_hash":12960666915911099378,"_key":"A","time":"1996-12-19T16:40:01-08:00","key":"A","m":12.0}
{"_time":851042402000,"_subsort":5,"_key_hash":12960666915911099378,"_key":"A","time":"1996-12-19T16:40:02-08:00","key":"A","m":null}

0 comments on commit ed026ae

Please sign in to comment.