Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add quantile_tdigest #284

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 31 additions & 2 deletions flox/aggregations.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
import numpy as np
from numpy.typing import ArrayLike, DTypeLike

from . import aggregate_flox, aggregate_npg, xrutils
from . import aggregate_flox, aggregate_npg, sketches, xrutils
from . import xrdtypes as dtypes

if TYPE_CHECKING:
Expand Down Expand Up @@ -119,7 +119,10 @@ def _normalize_dtype(dtype: DTypeLike, array_dtype: np.dtype, fill_value=None) -
elif not isinstance(dtype, np.dtype):
dtype = np.dtype(dtype)
if fill_value not in [None, dtypes.INF, dtypes.NINF, dtypes.NA]:
dtype = np.result_type(dtype, fill_value)
try:
dtype = np.result_type(dtype, fill_value)
except TypeError:
pass
return dtype


Expand Down Expand Up @@ -567,6 +570,30 @@ def quantile_new_dims_func(q) -> tuple[Dim]:
mode = Aggregation(name="mode", fill_value=dtypes.NA, chunk=None, combine=None)
nanmode = Aggregation(name="nanmode", fill_value=dtypes.NA, chunk=None, combine=None)


from crick import TDigest

quantile_tdigest = Aggregation(
"quantile_tdigest",
numpy=(sketches.tdigest_aggregate,),
chunk=(sketches.tdigest_chunk,),
combine=(sketches.tdigest_combine,),
finalize=sketches.tdigest_aggregate,
fill_value=TDigest(),
final_dtype=np.float64,
)

nanquantile_tdigest = Aggregation(
"nanquantile_tdigest",
numpy=(sketches.tdigest_aggregate,),
chunk=(sketches.tdigest_chunk,),
combine=(sketches.tdigest_combine,),
finalize=sketches.tdigest_aggregate,
fill_value=TDigest(),
final_dtype=np.float64,
)


aggregations = {
"any": any_,
"all": all_,
Expand Down Expand Up @@ -599,6 +626,8 @@ def quantile_new_dims_func(q) -> tuple[Dim]:
"nanquantile": nanquantile,
"mode": mode,
"nanmode": nanmode,
"quantile_tdigest": quantile_tdigest,
"nanquantile_tdigest": nanquantile_tdigest,
}


Expand Down
42 changes: 42 additions & 0 deletions flox/sketches.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
import numpy as np
import numpy_groupies as npg


def tdigest_chunk(group_idx, array, *, axis=-1, size=None, fill_value=None, dtype=None, **kwargs):
from crick import TDigest

def _(arr):
digest = TDigest()
# we receive object arrays from numpy_groupies
digest.update(arr.astype(array.dtype, copy=False))
return digest

result = npg.aggregate_numpy.aggregate(
group_idx, array, func=_, size=size, fill_value=fill_value, axis=axis, dtype=object
)
return result


def tdigest_combine(digests, axis=-1, keepdims=True):
from crick import TDigest

def _(arr):
t = TDigest()
t.merge(*arr)
return np.array([t], dtype=object)

if not isinstance(axis, tuple):
axis = (axis,)

# If reducing along multiple axes, we can just keep combining ;)
result = digests
for ax in axis:
result = np.apply_along_axis(_, ax, result)

return result


def tdigest_aggregate(digests, q, axis=-1, keepdims=True):
for idx in np.ndindex(digests.shape):
digests[idx] = digests[idx].quantile(q)
return digests
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ exclude=["asv_bench/pkgs"]
module=[
"asv_runner.*",
"cachey",
"crick",
"cftime",
"cubed.*",
"dask.*",
Expand Down
Loading