Skip to content

Commit

Permalink
feat: use collect for lag in python builder (#635)
Browse files Browse the repository at this point in the history
  • Loading branch information
jordanrfrazier authored Aug 10, 2023
1 parent 15c0d14 commit 45deed2
Show file tree
Hide file tree
Showing 5 changed files with 40 additions and 9 deletions.
8 changes: 5 additions & 3 deletions sparrow-py/pysrc/sparrow_py/_timestream.py
Original file line number Diff line number Diff line change
Expand Up @@ -648,6 +648,7 @@ def filter(self, condition: Timestream) -> Timestream:

def collect(
self,
*,
max: Optional[int],
min: Optional[int] = 0,
window: Optional[kt.windows.Window] = None,
Expand Down Expand Up @@ -708,7 +709,8 @@ def lag(self, n: int) -> Timestream:
Timestream
Timestream containing the value `n` points before each point.
"""
return Timestream._call("lag", n, self)
# hack to support structs/lists (as collect supports lists)
return self.collect(max=n + 1, min=n + 1)[0]

def if_(self, condition: Union[Timestream, Literal]) -> Timestream:
"""
Expand Down Expand Up @@ -875,7 +877,7 @@ def shift_until(self, predicate: Timestream) -> Timestream:
"""
return Timestream._call("shift_until", predicate, self)

def sum(self, window: Optional[kt.windows.Window] = None) -> Timestream:
def sum(self, *, window: Optional[kt.windows.Window] = None) -> Timestream:
"""
Create a Timestream summing the values in the `window`.
Expand All @@ -894,7 +896,7 @@ def sum(self, window: Optional[kt.windows.Window] = None) -> Timestream:
"""
return _aggregation("sum", self, window)

def first(self, window: Optional[kt.windows.Window] = None) -> Timestream:
def first(self, *, window: Optional[kt.windows.Window] = None) -> Timestream:
"""
Create a Timestream containing the first value in the `window`.
Expand Down
6 changes: 6 additions & 0 deletions sparrow-py/pytests/golden/lag_test/test_lag_list.jsonl
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{"_time":"1996-12-19T16:39:57.000","_subsort":0,"_key_hash":12960666915911099378,"_key":"A","m":5.0,"list_m":[5],"lag_list_m":null}
{"_time":"1996-12-19T16:39:58.000","_subsort":1,"_key_hash":2867199309159137213,"_key":"B","m":24.0,"list_m":[24],"lag_list_m":null}
{"_time":"1996-12-19T16:39:59.000","_subsort":2,"_key_hash":12960666915911099378,"_key":"A","m":17.0,"list_m":[5,17],"lag_list_m":[5]}
{"_time":"1996-12-19T16:40:00.000","_subsort":3,"_key_hash":12960666915911099378,"_key":"A","m":null,"list_m":[5,17],"lag_list_m":[5,17]}
{"_time":"1996-12-19T16:40:01.000","_subsort":4,"_key_hash":12960666915911099378,"_key":"A","m":12.0,"list_m":[5,17,12],"lag_list_m":[5,17]}
{"_time":"1996-12-19T16:40:02.000","_subsort":5,"_key_hash":12960666915911099378,"_key":"A","m":null,"list_m":[5,17,12],"lag_list_m":[5,17,12]}
6 changes: 6 additions & 0 deletions sparrow-py/pytests/golden/lag_test/test_lag_struct.jsonl
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{"_time":"1996-12-19T16:39:57.000","_subsort":0,"_key_hash":12960666915911099378,"_key":"A","time":null,"key":null,"m":null,"n":null}
{"_time":"1996-12-19T16:39:58.000","_subsort":1,"_key_hash":2867199309159137213,"_key":"B","time":null,"key":null,"m":null,"n":null}
{"_time":"1996-12-19T16:39:59.000","_subsort":2,"_key_hash":12960666915911099378,"_key":"A","time":"1996-12-19T16:39:57.000","key":"A","m":5.0,"n":10.0}
{"_time":"1996-12-19T16:40:00.000","_subsort":3,"_key_hash":12960666915911099378,"_key":"A","time":"1996-12-19T16:39:59.000","key":"A","m":17.0,"n":6.0}
{"_time":"1996-12-19T16:40:01.000","_subsort":4,"_key_hash":12960666915911099378,"_key":"A","time":"1996-12-19T16:40:00.000","key":"A","m":null,"n":9.0}
{"_time":"1996-12-19T16:40:02.000","_subsort":5,"_key_hash":12960666915911099378,"_key":"A","time":"1996-12-19T16:40:01.000","key":"A","m":12.0,"n":null}
12 changes: 6 additions & 6 deletions sparrow-py/pytests/golden/with_key_test/test_with_key_last.jsonl
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{"_time":851042397000000000,"_subsort":0,"_key_hash":2521269998124177631,"_key":"C","time":"1996-12-19T16:39:57-08:00","key":"A","m":5.0,"new_key":"C"}
{"_time":851042398000000000,"_subsort":1,"_key_hash":1021973589662386405,"_key":"D","time":"1996-12-19T16:39:58-08:00","key":"B","m":24.0,"new_key":"D"}
{"_time":851042399000000000,"_subsort":2,"_key_hash":2521269998124177631,"_key":"C","time":"1996-12-19T16:39:59-08:00","key":"A","m":17.0,"new_key":"C"}
{"_time":851042400000000000,"_subsort":3,"_key_hash":2521269998124177631,"_key":"C","time":"1996-12-19T16:40:00-08:00","key":"A","m":9.0,"new_key":"C"}
{"_time":851042401000000000,"_subsort":4,"_key_hash":2521269998124177631,"_key":"C","time":"1996-12-19T16:40:01-08:00","key":"A","m":12.0,"new_key":"C"}
{"_time":851042402000000000,"_subsort":5,"_key_hash":2521269998124177631,"_key":"C","time":"1996-12-19T16:40:02-08:00","key":"A","m":null,"new_key":"C"}
{"_time":"1996-12-19T16:39:57.000","_subsort":0,"_key_hash":2521269998124177631,"_key":"C","time":"1996-12-19T16:39:57.000","key":"A","m":5.0,"new_key":"C"}
{"_time":"1996-12-19T16:39:58.000","_subsort":1,"_key_hash":1021973589662386405,"_key":"D","time":"1996-12-19T16:39:58.000","key":"B","m":24.0,"new_key":"D"}
{"_time":"1996-12-19T16:39:59.000","_subsort":2,"_key_hash":2521269998124177631,"_key":"C","time":"1996-12-19T16:39:59.000","key":"A","m":17.0,"new_key":"C"}
{"_time":"1996-12-19T16:40:00.000","_subsort":3,"_key_hash":2521269998124177631,"_key":"C","time":"1996-12-19T16:40:00.000","key":"A","m":9.0,"new_key":"C"}
{"_time":"1996-12-19T16:40:01.000","_subsort":4,"_key_hash":2521269998124177631,"_key":"C","time":"1996-12-19T16:40:01.000","key":"A","m":12.0,"new_key":"C"}
{"_time":"1996-12-19T16:40:02.000","_subsort":5,"_key_hash":2521269998124177631,"_key":"C","time":"1996-12-19T16:40:02.000","key":"A","m":null,"new_key":"C"}
17 changes: 17 additions & 0 deletions sparrow-py/pytests/lag_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,3 +33,20 @@ def test_lag(source, golden) -> None:
}
)
)


def test_lag_struct(source, golden) -> None:
golden.jsonl(source.lag(1))


def test_lag_list(source, golden) -> None:
m = source.col("m")
golden.jsonl(
kt.record(
{
"m": m,
"list_m": m.collect(max=None),
"lag_list_m": m.collect(max=None).lag(1),
}
)
)

0 comments on commit 45deed2

Please sign in to comment.