Skip to content

Commit

Permalink
feat: add count aggregation (#660)
Browse files Browse the repository at this point in the history
Adds the count aggregation to the new python implementation. Adds
`count` in #662 .
  • Loading branch information
kevinjnguyen authored Aug 16, 2023
1 parent d3dba5d commit fffc1ac
Show file tree
Hide file tree
Showing 11 changed files with 90 additions and 17 deletions.
1 change: 1 addition & 0 deletions python/docs/source/reference/timestream/aggregation.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ Windowed:
:toctree: ../apidocs/
Timestream.collect
Timestream.count
Timestream.first
Timestream.last
Timestream.sum
Expand Down
20 changes: 20 additions & 0 deletions python/pysrc/kaskada/_timestream.py
Original file line number Diff line number Diff line change
Expand Up @@ -993,6 +993,26 @@ def last(self, window: Optional[kd.windows.Window] = None) -> Timestream:
"""
return _aggregation("last", self, window)

def count(self, window: Optional[kd.windows.Window] = None) -> Timestream:
"""
Create a Timestream containing the count value in the `window`.
Computed for each key separately.
Parameters
----------
window : Optional[Window]
The window to use for the aggregation.
If not specified, the entire Timestream is used.
Returns
-------
Timestream
Timestream containing the count value for the key in the window for
each point.
"""
return _aggregation("count", self, window)

def cast(self, data_type: pa.DataType) -> Timestream:
"""
Cast the type of this Timestream to the given data type.
Expand Down
35 changes: 35 additions & 0 deletions python/pytests/aggregation/count_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
import kaskada as kd


def test_count_unwindowed(source, golden) -> None:
m = source.col("m")
n = source.col("n")
golden.jsonl(
kd.record({"m": m, "count_m": m.count(), "n": n, "count_n": n.count()})
)


def test_count_windowed(source, golden) -> None:
m = source.col("m")
n = source.col("n")
golden.jsonl(
kd.record(
{
"m": m,
"count_m": m.count(window=kd.windows.Since(m > 20)),
"n": n,
"count_n": n.count(window=kd.windows.Sliding(2, m > 10)),
}
)
)


def test_count_since_true(source, golden) -> None:
# `since(True)` should be the same as unwindowed, so equals to one whenever the value is non-null
m_sum_since_true = kd.record(
{
"m": source.col("m"),
"m_count": source.col("m").count(window=kd.windows.Since(True)),
}
)
golden.jsonl(m_sum_since_true)
Original file line number Diff line number Diff line change
@@ -1,21 +1,4 @@
import kaskada as kd
import pytest


@pytest.fixture(scope="module")
def source() -> kd.sources.CsvString:
content = "\n".join(
[
"time,key,m,n",
"1996-12-19T16:39:57,A,5,10",
"1996-12-19T16:39:58,B,24,3",
"1996-12-19T16:39:59,A,17,6",
"1996-12-19T16:40:00,A,,9",
"1996-12-19T16:40:01,A,12,",
"1996-12-19T16:40:02,A,,",
]
)
return kd.sources.CsvString(content, time_column_name="time", key_column_name="key")


def test_sum_unwindowed(source, golden) -> None:
Expand Down
16 changes: 16 additions & 0 deletions python/pytests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,3 +110,19 @@ def golden(
), f"golden directory {dirname} does not exist. run with `--save-golden` to create it."

return GoldenFixture(dirname, test_name, save)


@pytest.fixture
def source() -> kd.sources.CsvString:
content = "\n".join(
[
"time,key,m,n",
"1996-12-19T16:39:57,A,5,10",
"1996-12-19T16:39:58,B,24,3",
"1996-12-19T16:39:59,A,17,6",
"1996-12-19T16:40:00,A,,9",
"1996-12-19T16:40:01,A,12,",
"1996-12-19T16:40:02,A,,",
]
)
return kd.sources.CsvString(content, time_column_name="time", key_column_name="key")
6 changes: 6 additions & 0 deletions python/pytests/golden/count_test/test_count_since_true.jsonl
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{"_time":"1996-12-19T16:39:57.000","_subsort":0,"_key_hash":12960666915911099378,"_key":"A","m":5.0,"m_count":1}
{"_time":"1996-12-19T16:39:58.000","_subsort":1,"_key_hash":2867199309159137213,"_key":"B","m":24.0,"m_count":1}
{"_time":"1996-12-19T16:39:59.000","_subsort":2,"_key_hash":12960666915911099378,"_key":"A","m":17.0,"m_count":1}
{"_time":"1996-12-19T16:40:00.000","_subsort":3,"_key_hash":12960666915911099378,"_key":"A","m":null,"m_count":0}
{"_time":"1996-12-19T16:40:01.000","_subsort":4,"_key_hash":12960666915911099378,"_key":"A","m":12.0,"m_count":1}
{"_time":"1996-12-19T16:40:02.000","_subsort":5,"_key_hash":12960666915911099378,"_key":"A","m":null,"m_count":0}
6 changes: 6 additions & 0 deletions python/pytests/golden/count_test/test_count_unwindowed.jsonl
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{"_time":"1996-12-19T16:39:57.000","_subsort":0,"_key_hash":12960666915911099378,"_key":"A","m":5.0,"count_m":1,"n":10.0,"count_n":1}
{"_time":"1996-12-19T16:39:58.000","_subsort":1,"_key_hash":2867199309159137213,"_key":"B","m":24.0,"count_m":1,"n":3.0,"count_n":1}
{"_time":"1996-12-19T16:39:59.000","_subsort":2,"_key_hash":12960666915911099378,"_key":"A","m":17.0,"count_m":2,"n":6.0,"count_n":2}
{"_time":"1996-12-19T16:40:00.000","_subsort":3,"_key_hash":12960666915911099378,"_key":"A","m":null,"count_m":2,"n":9.0,"count_n":3}
{"_time":"1996-12-19T16:40:01.000","_subsort":4,"_key_hash":12960666915911099378,"_key":"A","m":12.0,"count_m":3,"n":null,"count_n":3}
{"_time":"1996-12-19T16:40:02.000","_subsort":5,"_key_hash":12960666915911099378,"_key":"A","m":null,"count_m":3,"n":null,"count_n":3}
6 changes: 6 additions & 0 deletions python/pytests/golden/count_test/test_count_windowed.jsonl
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{"_time":"1996-12-19T16:39:57.000","_subsort":0,"_key_hash":12960666915911099378,"_key":"A","m":5.0,"count_m":1,"n":10.0,"count_n":1}
{"_time":"1996-12-19T16:39:58.000","_subsort":1,"_key_hash":2867199309159137213,"_key":"B","m":24.0,"count_m":1,"n":3.0,"count_n":1}
{"_time":"1996-12-19T16:39:59.000","_subsort":2,"_key_hash":12960666915911099378,"_key":"A","m":17.0,"count_m":2,"n":6.0,"count_n":2}
{"_time":"1996-12-19T16:40:00.000","_subsort":3,"_key_hash":12960666915911099378,"_key":"A","m":null,"count_m":2,"n":9.0,"count_n":3}
{"_time":"1996-12-19T16:40:01.000","_subsort":4,"_key_hash":12960666915911099378,"_key":"A","m":12.0,"count_m":3,"n":null,"count_n":3}
{"_time":"1996-12-19T16:40:02.000","_subsort":5,"_key_hash":12960666915911099378,"_key":"A","m":null,"count_m":3,"n":null,"count_n":1}

0 comments on commit fffc1ac

Please sign in to comment.