diff --git a/python/docs/source/reference/timestream/aggregation.md b/python/docs/source/reference/timestream/aggregation.md index 3511f21c3..c92b7de79 100644 --- a/python/docs/source/reference/timestream/aggregation.md +++ b/python/docs/source/reference/timestream/aggregation.md @@ -16,6 +16,7 @@ Windowed: :toctree: ../apidocs/ Timestream.collect + Timestream.count Timestream.first Timestream.last Timestream.sum diff --git a/python/pysrc/kaskada/_timestream.py b/python/pysrc/kaskada/_timestream.py index b24d4db38..3ea997769 100644 --- a/python/pysrc/kaskada/_timestream.py +++ b/python/pysrc/kaskada/_timestream.py @@ -993,6 +993,26 @@ def last(self, window: Optional[kd.windows.Window] = None) -> Timestream: """ return _aggregation("last", self, window) + def count(self, window: Optional[kd.windows.Window] = None) -> Timestream: + """ + Create a Timestream containing the count value in the `window`. + + Computed for each key separately. + + Parameters + ---------- + window : Optional[Window] + The window to use for the aggregation. + If not specified, the entire Timestream is used. + + Returns + ------- + Timestream + Timestream containing the count value for the key in the window for + each point. + """ + return _aggregation("count", self, window) + def cast(self, data_type: pa.DataType) -> Timestream: """ Cast the type of this Timestream to the given data type. diff --git a/python/pytests/aggregation/count_test.py b/python/pytests/aggregation/count_test.py new file mode 100644 index 000000000..05020462d --- /dev/null +++ b/python/pytests/aggregation/count_test.py @@ -0,0 +1,35 @@ +import kaskada as kd + + +def test_count_unwindowed(source, golden) -> None: + m = source.col("m") + n = source.col("n") + golden.jsonl( + kd.record({"m": m, "count_m": m.count(), "n": n, "count_n": n.count()}) + ) + + +def test_count_windowed(source, golden) -> None: + m = source.col("m") + n = source.col("n") + golden.jsonl( + kd.record( + { + "m": m, + "count_m": m.count(window=kd.windows.Since(m > 20)), + "n": n, + "count_n": n.count(window=kd.windows.Sliding(2, m > 10)), + } + ) + ) + + +def test_count_since_true(source, golden) -> None: + # `since(True)` should be the same as unwindowed, so equals to one whenever the value is non-null + m_sum_since_true = kd.record( + { + "m": source.col("m"), + "m_count": source.col("m").count(window=kd.windows.Since(True)), + } + ) + golden.jsonl(m_sum_since_true) diff --git a/python/pytests/aggregation_test.py b/python/pytests/aggregation/sum_test.py similarity index 64% rename from python/pytests/aggregation_test.py rename to python/pytests/aggregation/sum_test.py index 8cac84194..4a548d52d 100644 --- a/python/pytests/aggregation_test.py +++ b/python/pytests/aggregation/sum_test.py @@ -1,21 +1,4 @@ import kaskada as kd -import pytest - - -@pytest.fixture(scope="module") -def source() -> kd.sources.CsvString: - content = "\n".join( - [ - "time,key,m,n", - "1996-12-19T16:39:57,A,5,10", - "1996-12-19T16:39:58,B,24,3", - "1996-12-19T16:39:59,A,17,6", - "1996-12-19T16:40:00,A,,9", - "1996-12-19T16:40:01,A,12,", - "1996-12-19T16:40:02,A,,", - ] - ) - return kd.sources.CsvString(content, time_column_name="time", key_column_name="key") def test_sum_unwindowed(source, golden) -> None: diff --git a/python/pytests/conftest.py b/python/pytests/conftest.py index 943eb3a36..be7c04eeb 100644 --- a/python/pytests/conftest.py +++ b/python/pytests/conftest.py @@ -110,3 +110,19 @@ def golden( ), f"golden directory {dirname} does not exist. run with `--save-golden` to create it." return GoldenFixture(dirname, test_name, save) + + +@pytest.fixture +def source() -> kd.sources.CsvString: + content = "\n".join( + [ + "time,key,m,n", + "1996-12-19T16:39:57,A,5,10", + "1996-12-19T16:39:58,B,24,3", + "1996-12-19T16:39:59,A,17,6", + "1996-12-19T16:40:00,A,,9", + "1996-12-19T16:40:01,A,12,", + "1996-12-19T16:40:02,A,,", + ] + ) + return kd.sources.CsvString(content, time_column_name="time", key_column_name="key") diff --git a/python/pytests/golden/count_test/test_count_since_true.jsonl b/python/pytests/golden/count_test/test_count_since_true.jsonl new file mode 100644 index 000000000..5b532ded7 --- /dev/null +++ b/python/pytests/golden/count_test/test_count_since_true.jsonl @@ -0,0 +1,6 @@ +{"_time":"1996-12-19T16:39:57.000","_subsort":0,"_key_hash":12960666915911099378,"_key":"A","m":5.0,"m_count":1} +{"_time":"1996-12-19T16:39:58.000","_subsort":1,"_key_hash":2867199309159137213,"_key":"B","m":24.0,"m_count":1} +{"_time":"1996-12-19T16:39:59.000","_subsort":2,"_key_hash":12960666915911099378,"_key":"A","m":17.0,"m_count":1} +{"_time":"1996-12-19T16:40:00.000","_subsort":3,"_key_hash":12960666915911099378,"_key":"A","m":null,"m_count":0} +{"_time":"1996-12-19T16:40:01.000","_subsort":4,"_key_hash":12960666915911099378,"_key":"A","m":12.0,"m_count":1} +{"_time":"1996-12-19T16:40:02.000","_subsort":5,"_key_hash":12960666915911099378,"_key":"A","m":null,"m_count":0} diff --git a/python/pytests/golden/count_test/test_count_unwindowed.jsonl b/python/pytests/golden/count_test/test_count_unwindowed.jsonl new file mode 100644 index 000000000..ca8bb5bbb --- /dev/null +++ b/python/pytests/golden/count_test/test_count_unwindowed.jsonl @@ -0,0 +1,6 @@ +{"_time":"1996-12-19T16:39:57.000","_subsort":0,"_key_hash":12960666915911099378,"_key":"A","m":5.0,"count_m":1,"n":10.0,"count_n":1} +{"_time":"1996-12-19T16:39:58.000","_subsort":1,"_key_hash":2867199309159137213,"_key":"B","m":24.0,"count_m":1,"n":3.0,"count_n":1} +{"_time":"1996-12-19T16:39:59.000","_subsort":2,"_key_hash":12960666915911099378,"_key":"A","m":17.0,"count_m":2,"n":6.0,"count_n":2} +{"_time":"1996-12-19T16:40:00.000","_subsort":3,"_key_hash":12960666915911099378,"_key":"A","m":null,"count_m":2,"n":9.0,"count_n":3} +{"_time":"1996-12-19T16:40:01.000","_subsort":4,"_key_hash":12960666915911099378,"_key":"A","m":12.0,"count_m":3,"n":null,"count_n":3} +{"_time":"1996-12-19T16:40:02.000","_subsort":5,"_key_hash":12960666915911099378,"_key":"A","m":null,"count_m":3,"n":null,"count_n":3} diff --git a/python/pytests/golden/count_test/test_count_windowed.jsonl b/python/pytests/golden/count_test/test_count_windowed.jsonl new file mode 100644 index 000000000..fc7537616 --- /dev/null +++ b/python/pytests/golden/count_test/test_count_windowed.jsonl @@ -0,0 +1,6 @@ +{"_time":"1996-12-19T16:39:57.000","_subsort":0,"_key_hash":12960666915911099378,"_key":"A","m":5.0,"count_m":1,"n":10.0,"count_n":1} +{"_time":"1996-12-19T16:39:58.000","_subsort":1,"_key_hash":2867199309159137213,"_key":"B","m":24.0,"count_m":1,"n":3.0,"count_n":1} +{"_time":"1996-12-19T16:39:59.000","_subsort":2,"_key_hash":12960666915911099378,"_key":"A","m":17.0,"count_m":2,"n":6.0,"count_n":2} +{"_time":"1996-12-19T16:40:00.000","_subsort":3,"_key_hash":12960666915911099378,"_key":"A","m":null,"count_m":2,"n":9.0,"count_n":3} +{"_time":"1996-12-19T16:40:01.000","_subsort":4,"_key_hash":12960666915911099378,"_key":"A","m":12.0,"count_m":3,"n":null,"count_n":3} +{"_time":"1996-12-19T16:40:02.000","_subsort":5,"_key_hash":12960666915911099378,"_key":"A","m":null,"count_m":3,"n":null,"count_n":1} diff --git a/python/pytests/golden/aggregation_test/test_sum_since_true.jsonl b/python/pytests/golden/sum_test/test_sum_since_true.jsonl similarity index 100% rename from python/pytests/golden/aggregation_test/test_sum_since_true.jsonl rename to python/pytests/golden/sum_test/test_sum_since_true.jsonl diff --git a/python/pytests/golden/aggregation_test/test_sum_unwindowed.jsonl b/python/pytests/golden/sum_test/test_sum_unwindowed.jsonl similarity index 100% rename from python/pytests/golden/aggregation_test/test_sum_unwindowed.jsonl rename to python/pytests/golden/sum_test/test_sum_unwindowed.jsonl diff --git a/python/pytests/golden/aggregation_test/test_sum_windowed.jsonl b/python/pytests/golden/sum_test/test_sum_windowed.jsonl similarity index 100% rename from python/pytests/golden/aggregation_test/test_sum_windowed.jsonl rename to python/pytests/golden/sum_test/test_sum_windowed.jsonl