feat: add other aggregations (#669)
Adds the following aggregations: count_if, max, mean, min, stddev, and
variance
kevinjnguyen authored Aug 17, 2023
1 parent bf2ef0d commit c755d60
Showing 29 changed files with 599 additions and 16 deletions.
6 changes: 6 additions & 0 deletions python/docs/source/reference/timestream/aggregation.md
@@ -17,7 +17,13 @@ Windowed:
Timestream.collect
Timestream.count
Timestream.count_if
Timestream.first
Timestream.last
Timestream.max
Timestream.mean
Timestream.min
Timestream.stddev
Timestream.sum
Timestream.variance
```
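For orientation, here is a rough sketch of how the newly documented aggregations compose on a Timestream. This is illustrative only: it assumes a small `CsvString` source shaped like the test fixtures below, and omits session setup and result materialization.

```python
import kaskada as kd

# Hypothetical input mirroring the CSV fixtures used in the new tests.
content = "\n".join(
    [
        "time,key,m",
        "1996-12-19T16:39:57,A,5",
        "1996-12-19T16:39:58,B,24",
        "1996-12-19T16:39:59,A,17",
    ]
)
source = kd.sources.CsvString(content, time_column_name="time", key_column_name="key")

m = source.col("m")
# Each aggregation is computed per key, over the entire Timestream by default.
stats = kd.record(
    {
        "max_m": m.max(),
        "mean_m": m.mean(),
        "min_m": m.min(),
        "stddev_m": m.stddev(),
        "variance_m": m.variance(),
    }
)
```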
120 changes: 120 additions & 0 deletions python/pysrc/kaskada/_timestream.py
@@ -1030,6 +1030,126 @@ def count(self, window: Optional[kd.windows.Window] = None) -> Timestream:
"""
return _aggregation("count", self, window)

def count_if(self, window: Optional[kd.windows.Window] = None) -> Timestream:
"""
Create a Timestream containing the count of `true` values in `window`.
Computed for each key separately.
Parameters
----------
window : Optional[Window]
The window to use for the aggregation.
If not specified, the entire Timestream is used.
Returns
-------
Timestream
Timestream containing the count value if true for the key in the window for
each point.
"""
return _aggregation("count_if", self, window)

def max(self, window: Optional[kd.windows.Window] = None) -> Timestream:
"""
Create a Timestream containing the max value in the `window`.
Computed for each key separately.
Parameters
----------
window : Optional[Window]
The window to use for the aggregation.
If not specified, the entire Timestream is used.
Returns
-------
Timestream
Timestream containing the max value for the key in the window for
each point.
"""
return _aggregation("max", self, window)

def min(self, window: Optional[kd.windows.Window] = None) -> Timestream:
"""
Create a Timestream containing the min value in the `window`.
Computed for each key separately.
Parameters
----------
window : Optional[Window]
The window to use for the aggregation.
If not specified, the entire Timestream is used.
Returns
-------
Timestream
Timestream containing the min value for the key in the window for
each point.
"""
return _aggregation("min", self, window)

def mean(self, window: Optional[kd.windows.Window] = None) -> Timestream:
"""
Create a Timestream containing the mean value in the `window`.
Computed for each key separately.
Parameters
----------
window : Optional[Window]
The window to use for the aggregation.
If not specified, the entire Timestream is used.
Returns
-------
Timestream
Timestream containing the mean value for the key in the window for
each point.
"""
return _aggregation("mean", self, window)

def stddev(self, window: Optional[kd.windows.Window] = None) -> Timestream:
"""
Create a Timestream containing the standard deviation in the `window`.
Computed for each key separately.
Parameters
----------
window : Optional[Window]
The window to use for the aggregation.
If not specified, the entire Timestream is used.
Returns
-------
Timestream
Timestream containing the standard deviation for the key in the window for
each point.
"""
return _aggregation("stddev", self, window)

def variance(self, window: Optional[kd.windows.Window] = None) -> Timestream:
"""
Create a Timestream containing the variance in the `window`.
Computed for each key separately.
Parameters
----------
window : Optional[Window]
The window to use for the aggregation.
If not specified, the entire Timestream is used.
Returns
-------
Timestream
Timestream containing the variance for the key in the window for
each point.
"""
return _aggregation("variance", self, window)

def cast(self, data_type: pa.DataType) -> Timestream:
"""
Cast the type of this Timestream to the given data type.
Expand Down
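All six new methods defer to the same `_aggregation` helper; the only behavioral difference is the optional `window`. A hedged sketch of the three usage shapes exercised by the tests below (unwindowed, `Since`, and `Sliding`); the data and variable names are illustrative, not part of this change:

```python
import kaskada as kd

# Illustrative source with a numeric column "m" keyed by "key",
# mirroring the CSV fixtures used in the new tests.
content = "\n".join(
    [
        "time,key,m",
        "1996-12-19T16:39:57,A,5",
        "1996-12-19T16:39:58,B,24",
        "1996-12-19T16:39:59,A,17",
    ]
)
source = kd.sources.CsvString(content, time_column_name="time", key_column_name="key")
m = source.col("m")

# Unwindowed: aggregate over the entire Timestream, per key.
mean_all = m.mean()
# Since: the window resets whenever the predicate is true.
mean_since = m.mean(window=kd.windows.Since(m > 20))
# Sliding: aggregate over the last 2 windows bounded by the predicate.
mean_sliding = m.mean(window=kd.windows.Sliding(2, m > 10))
```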
62 changes: 62 additions & 0 deletions python/pytests/aggregation/count_if_test.py
@@ -0,0 +1,62 @@
import pytest
import kaskada as kd


@pytest.fixture(scope="module")
def count_if_source() -> kd.sources.CsvString:
content = "\n".join(
[
"time,key,m,n,is_valid",
"1996-12-19T16:39:57,A,5,10,true",
"1996-12-19T16:39:58,B,24,3,true",
"1996-12-19T16:39:59,A,17,6,false",
"1996-12-19T16:40:00,A,,9,false",
"1996-12-19T16:40:01,A,12,,true",
"1996-12-19T16:40:02,A,,,",
"1996-12-19T16:40:03,B,26,12,true",
"1996-12-19T16:40:04,B,30,1,true",
]
)
return kd.sources.CsvString(content, time_column_name="time", key_column_name="key")


def test_count_if_unwindowed(count_if_source, golden) -> None:
is_valid = count_if_source.col("is_valid")
m = count_if_source.col("m")
golden.jsonl(
kd.record(
{
"is_valid": is_valid,
"count_if": is_valid.count_if(),
"m": m,
}
)
)


def test_count_if_windowed(count_if_source, golden) -> None:
is_valid = count_if_source.col("is_valid")
m = count_if_source.col("m")
golden.jsonl(
kd.record(
{
"is_valid": is_valid,
"count_if": is_valid.count_if(window=kd.windows.Since(m > 25)),
"m": m,
}
)
)


def test_count_if_since_true(count_if_source, golden) -> None:
is_valid = count_if_source.col("is_valid")
m = count_if_source.col("m")
golden.jsonl(
kd.record(
{
"is_valid": is_valid,
"count_if": is_valid.count_if(window=kd.windows.Since(True)),
"m": m,
}
)
)
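The tests above exercise the distinction between `count` and `count_if`: `count` tallies non-null values, while `count_if` tallies only values that are `true`. A minimal sketch of that contrast, assuming the same fixture shape (no expected output shown or implied):

```python
import kaskada as kd

# Boolean column with a true, a false, and a null value (illustrative).
content = "\n".join(
    [
        "time,key,is_valid",
        "1996-12-19T16:39:57,A,true",
        "1996-12-19T16:39:59,A,false",
        "1996-12-19T16:40:02,A,",
    ]
)
source = kd.sources.CsvString(content, time_column_name="time", key_column_name="key")
is_valid = source.col("is_valid")

query = kd.record(
    {
        "count": is_valid.count(),        # non-null values, true or false
        "count_if": is_valid.count_if(),  # only values that are true
    }
)
```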
18 changes: 18 additions & 0 deletions python/pytests/aggregation/count_test.py
@@ -1,5 +1,23 @@
import kaskada as kd

import pytest


@pytest.fixture(scope="module")
def source() -> kd.sources.CsvString:
content = "\n".join(
[
"time,key,m,n",
"1996-12-19T16:39:57,A,5,10",
"1996-12-19T16:39:58,B,24,3",
"1996-12-19T16:39:59,A,17,6",
"1996-12-19T16:40:00,A,,9",
"1996-12-19T16:40:01,A,12,",
"1996-12-19T16:40:02,A,,",
]
)
return kd.sources.CsvString(content, time_column_name="time", key_column_name="key")


def test_count_unwindowed(source, golden) -> None:
m = source.col("m")
Expand Down
51 changes: 51 additions & 0 deletions python/pytests/aggregation/max_test.py
@@ -0,0 +1,51 @@
import kaskada as kd

import pytest


@pytest.fixture(scope="module")
def source() -> kd.sources.CsvString:
content = "\n".join(
[
"time,key,m,n",
"1996-12-19T16:39:57,A,5,10",
"1996-12-19T16:39:58,B,24,3",
"1996-12-19T16:39:59,A,17,6",
"1996-12-19T16:40:00,A,,9",
"1996-12-19T16:40:01,A,12,",
"1996-12-19T16:40:02,A,,",
]
)
return kd.sources.CsvString(content, time_column_name="time", key_column_name="key")


def test_max_unwindowed(source, golden) -> None:
m = source.col("m")
n = source.col("n")
golden.jsonl(kd.record({"m": m, "max_m": m.max(), "n": n, "max_n": n.max()}))


def test_max_windowed(source, golden) -> None:
m = source.col("m")
n = source.col("n")
golden.jsonl(
kd.record(
{
"m": m,
"max_m": m.max(window=kd.windows.Since(m > 20)),
"n": n,
"max_n": n.max(window=kd.windows.Sliding(2, m > 10)),
}
)
)


def test_max_since_true(source, golden) -> None:
# `since(True)` should be the same as unwindowed, so it equals the original value.
m_max_since_true = kd.record(
{
"m": source.col("m"),
"m_max": source.col("m").max(window=kd.windows.Since(True)),
}
)
golden.jsonl(m_max_since_true)
51 changes: 51 additions & 0 deletions python/pytests/aggregation/mean_test.py
@@ -0,0 +1,51 @@
import kaskada as kd

import pytest


@pytest.fixture(scope="module")
def source() -> kd.sources.CsvString:
content = "\n".join(
[
"time,key,m,n",
"1996-12-19T16:39:57,A,5,10",
"1996-12-19T16:39:58,B,24,3",
"1996-12-19T16:39:59,A,17,6",
"1996-12-19T16:40:00,A,,9",
"1996-12-19T16:40:01,A,12,",
"1996-12-19T16:40:02,A,,",
]
)
return kd.sources.CsvString(content, time_column_name="time", key_column_name="key")


def test_mean_unwindowed(source, golden) -> None:
m = source.col("m")
n = source.col("n")
golden.jsonl(kd.record({"m": m, "mean_m": m.mean(), "n": n, "mean_n": n.mean()}))


def test_mean_windowed(source, golden) -> None:
m = source.col("m")
n = source.col("n")
golden.jsonl(
kd.record(
{
"m": m,
"mean_m": m.mean(window=kd.windows.Since(m > 20)),
"n": n,
"mean_n": n.mean(window=kd.windows.Sliding(2, m > 10)),
}
)
)


def test_mean_since_true(source, golden) -> None:
# `since(True)` should be the same as unwindowed, so it equals the original value.
m_mean_since_true = kd.record(
{
"m": source.col("m"),
"m_mean": source.col("m").mean(window=kd.windows.Since(True)),
}
)
golden.jsonl(m_mean_since_true)
51 changes: 51 additions & 0 deletions python/pytests/aggregation/min_test.py
@@ -0,0 +1,51 @@
import kaskada as kd

import pytest


@pytest.fixture(scope="module")
def source() -> kd.sources.CsvString:
content = "\n".join(
[
"time,key,m,n",
"1996-12-19T16:39:57,A,5,10",
"1996-12-19T16:39:58,B,24,3",
"1996-12-19T16:39:59,A,17,6",
"1996-12-19T16:40:00,A,,9",
"1996-12-19T16:40:01,A,12,",
"1996-12-19T16:40:02,A,,",
]
)
return kd.sources.CsvString(content, time_column_name="time", key_column_name="key")


def test_min_unwindowed(source, golden) -> None:
m = source.col("m")
n = source.col("n")
golden.jsonl(kd.record({"m": m, "min_m": m.min(), "n": n, "min_n": n.min()}))


def test_min_windowed(source, golden) -> None:
m = source.col("m")
n = source.col("n")
golden.jsonl(
kd.record(
{
"m": m,
"min_m": m.min(window=kd.windows.Since(m > 20)),
"n": n,
"min_n": n.min(window=kd.windows.Sliding(2, m > 10)),
}
)
)


def test_min_since_true(source, golden) -> None:
# `since(True)` should be the same as unwindowed, so it equals the original value.
m_min_since_true = kd.record(
{
"m": source.col("m"),
"m_min": source.col("m").min(window=kd.windows.Since(True)),
}
)
golden.jsonl(m_min_since_true)
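The remaining changed files, including the stddev and variance tests, are not shown in this excerpt. Assuming they follow the same pattern as the tests above, a hedged sketch of what calling those two aggregations looks like (illustrative only, not the contents of the omitted files):

```python
import kaskada as kd

# Illustrative numeric column; values chosen arbitrarily.
content = "\n".join(
    [
        "time,key,m",
        "1996-12-19T16:39:57,A,5",
        "1996-12-19T16:39:59,A,17",
        "1996-12-19T16:40:01,A,12",
    ]
)
source = kd.sources.CsvString(content, time_column_name="time", key_column_name="key")
m = source.col("m")

spread = kd.record(
    {
        "m": m,
        "stddev_m": m.stddev(),
        "variance_m": m.variance(),
        "stddev_since": m.stddev(window=kd.windows.Since(m > 15)),
    }
)
```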