added support for sparse aggregations
Oufattole committed May 29, 2024
1 parent 97938a8 commit 6f3b1ec
@@ -1,8 +1,10 @@
from import Callable

import pandas as pd

# pd.set_option("compute.use_numba", True)
import polars as pl
import polars.selectors as cs
from scipy.sparse import coo_matrix

Expand All @@ -25,61 +27,136 @@ def time_aggd_col_alias_fntr(window_size: str, agg: str) -> Callable[[str], str]
raise ValueError("Aggregation type 'agg' must be provided")

def f(c: str) -> str:
return "/".join([window_size] + c.split("/") + [agg])
if c in ["patient_id", "timestamp"]:
return c
return "/".join([window_size] + c.split("/") + [agg])

return f

def get_agg_pl_expr(window_size: str, agg: str):
code_cols = cs.ends_with("code")
value_cols = cs.ends_with("value")
# def sparse_groupby_sum(df):
# id_cols = ["patient_id", "timestamp"]
# ohe = OneHotEncoder(sparse_output=True)
# # Get all other columns we are not grouping by
# other_columns = [col for col in df.columns if col not in id_cols]
# # Get a 607875 x nDistinctIDs matrix in sparse row format with exactly
# # 1 nonzero entry per row
# onehot = ohe.fit_transform(df[id_cols].values.reshape(-1, 1))
# # Transpose it. then convert from sparse column back to sparse row, as
# # dot products of two sparse row matrices are faster than sparse col with
# # sparse row
# onehot = onehot.T.tocsr()
# # Dot the transposed matrix with the other columns of the df, converted to sparse row
# # format, then convert the resulting matrix back into a sparse
# # dataframe with the same column names
# out = pd.DataFrame.sparse.from_spmatrix(
# columns=other_columns)
# # Add in the groupby column to this resulting dataframe with the proper class labels
# out[groupby] = ohe.categories_[0]
# # This final groupby sum simply ensures the result is in the format you would expect
# # for a regular pandas groupby and sum, but you can just return out if this is going to be
# # a performance penalty. Note in that case that the groupby column may have changed index
# return out.groupby(groupby).sum()

def sparse_rolling(df, timedelta, agg):
"""Iterates through rolling windows while maintaining sparsity.
>>> df = pd.DataFrame({'patient_id': {0: 1, 1: 1, 2: 1},
... 'timestamp': {0: pd.Timestamp('2021-01-01 00:00:00'),
... 1: pd.Timestamp('2021-01-01 00:00:00'), 2: pd.Timestamp('2020-01-01 00:00:00')},
... 'A/code': {0: 1, 1: 1, 2: 0}, 'B/code': {0: 0, 1: 0, 2: 1}, 'C/code': {0: 0, 1: 0, 2: 0}})
>>> for col in ["A/code", "B/code", "C/code"]: df[col] = pd.arrays.SparseArray(df[col])
>>> sparse_rolling(df, pd.Timedelta("1d"), "sum").dtypes
A/code Sparse[int64, 0]
B/code Sparse[int64, 0]
C/code Sparse[int64, 0]
timestamp datetime64[ns]
dtype: object
df = df.drop(columns="patient_id")
out_dfs = []
timestamps = []
for each in df.rolling(on="timestamp", window=timedelta):
df = pd.concat(out_dfs, axis=1).T
df["timestamp"] = timestamps
return df

def compute_agg(df, window_size: str, agg: str):
"""Applies aggreagtion to dataframe.
Dataframe is expected to only have the relevant columns for aggregating
It should have the patient_id and timestamp columns, and then only code columns
if agg is a code aggregation or only value columns if it is a value aggreagation.
>>> from MEDS_tabular_automl.generate_ts_features import get_flat_ts_rep
>>> feature_columns = ['A/value/sum', 'A/code/count', 'B/value/sum', 'B/code/count',
... "C/value/sum", "C/code/count", "A/static/present"]
>>> data = {'patient_id': [1, 1, 1, 2, 2, 2],
... 'code': ['A', 'A', 'B', 'B', 'C', 'C'],
... 'timestamp': ['2021-01-01', '2021-01-01', '2020-01-01', '2021-01-04', None, None],
... 'numerical_value': [1, 2, 2, 2, 3, 4]}
>>> df = pl.DataFrame(data).lazy()
>>> df = get_flat_ts_rep(feature_columns, df)
>>> df
patient_id timestamp A/value B/value C/value A/code B/code C/code
0 1 2021-01-01 1 0 0 1 0 0
1 1 2021-01-01 2 0 0 1 0 0
2 1 2020-01-01 0 2 0 0 1 0
3 2 2021-01-04 0 2 0 0 1 0
>>> df['timestamp'] = pd.to_datetime(df['timestamp'])
>>> df.dtypes
patient_id int64
timestamp datetime64[ns]
A/value Sparse[int64, 0]
B/value Sparse[int64, 0]
C/value Sparse[int64, 0]
A/code Sparse[int64, 0]
B/code Sparse[int64, 0]
C/code Sparse[int64, 0]
dtype: object
>>> output = compute_agg(df[['patient_id', 'timestamp', 'A/code', 'B/code', 'C/code']],
... "1d", "code/count")
>>> output
1d/A/code/count 1d/B/code/count 1d/C/code/count timestamp patient_id
0 1 0 0 2021-01-01 1
1 2 0 0 2021-01-01 1
2 0 1 0 2020-01-01 1
0 0 1 0 2021-01-04 2
>>> output.dtypes
1d/A/code/count Sparse[int64, 0]
1d/B/code/count Sparse[int64, 0]
1d/C/code/count Sparse[int64, 0]
timestamp datetime64[ns]
patient_id int64
dtype: object
if window_size == "full":
match agg:
case "code/count":
return code_cols.cumsum().map_alias(time_aggd_col_alias_fntr(window_size, "count"))
case "value/count":
return (
.map_alias(time_aggd_col_alias_fntr(window_size, "count"))
case "value/sum":
return value_cols.cumsum().map_alias(time_aggd_col_alias_fntr(window_size, "sum"))
case "value/sum_sqd":
return (value_cols**2).cumsum().map_alias(time_aggd_col_alias_fntr(window_size, "sum_sqd"))
case "value/min":
value_cols.cummin().map_alias(time_aggd_col_alias_fntr(window_size, "min"))
case "value/max":
value_cols.cummax().map_alias(time_aggd_col_alias_fntr(window_size, "max"))
case _:
raise ValueError(
f"Invalid aggregation '{agg}' provided for window_size '{window_size}'."
f" Please choose from the valid options: {VALID_AGGREGATIONS}"
timedelta = df["timestamp"].max() - df["timestamp"].min() + pd.Timedelta(days=1)
match agg:
case "code/count":
return code_cols.sum().map_alias(time_aggd_col_alias_fntr(window_size, "count"))
case "value/count":
return (
value_cols.is_not_null().sum().map_alias(time_aggd_col_alias_fntr(window_size, "count"))
case "value/has_values_count":
return (
(value_cols.is_not_null() & value_cols.is_not_nan())
.map_alias(time_aggd_col_alias_fntr(window_size, "has_values_count"))
case "value/sum":
return value_cols.sum().map_alias(time_aggd_col_alias_fntr(window_size, "sum"))
case "value/sum_sqd":
return (value_cols**2).sum().map_alias(time_aggd_col_alias_fntr(window_size, "sum_sqd"))
case "value/min":
value_cols.min().map_alias(time_aggd_col_alias_fntr(window_size, "min"))
case "value/max":
value_cols.max().map_alias(time_aggd_col_alias_fntr(window_size, "max"))
case _:
raise ValueError(f"Invalid aggregation `{agg}` for window_size `{window_size}`")
timedelta = pd.Timedelta(window_size)
group = df.groupby("patient_id")
match agg:
case "code/count" | "value/sum":
agg = "sum"
out_dfs = []
for patient_id, subset_df in group:
df = sparse_rolling(subset_df, timedelta, agg)
df["patient_id"] = patient_id
out_df = pd.concat(out_dfs, axis=0)
return out_df.rename(columns=time_aggd_col_alias_fntr(window_size, "count"))

case _:
raise ValueError(f"Invalid aggregation `{agg}` for window_size `{window_size}`")

def _generate_summary(df: pd.DataFrame, window_size: str, agg: str) -> pl.LazyFrame:
Expand All @@ -103,31 +180,31 @@ def _generate_summary(df: pd.DataFrame, window_size: str, agg: str) -> pl.LazyFr
... 'numerical_value': [1, 2, 2, 2, 3, 4]}
>>> df = pl.DataFrame(data).lazy()
>>> pivot_df = get_flat_ts_rep(feature_columns, df)
>>> pivot_df['timestamp'] = pd.to_datetime(pivot_df['timestamp'])
>>> pivot_df
patient_id timestamp A/value B/value C/value A/code B/code C/code
0 1 2021-01-01 1 0 0 1 0 0
1 1 2021-01-01 2 0 0 1 0 0
2 1 2020-01-01 0 2 0 0 1 0
3 2 2021-01-04 0 2 0 0 1 0
patient_id timestamp A/value B/value C/value A/code B/code C/code
0 1 2021-01-01 1 0 0 1 0 0
1 1 2021-01-01 2 0 0 1 0 0
2 1 2020-01-01 0 2 0 0 1 0
3 2 2021-01-04 0 2 0 0 1 0
>>> _generate_summary(pivot_df, "full", "value/sum")
patient_id timestamp A/value/sum B/value/sum C/value/sum
full/A/value/count full/B/value/count full/C/value/count timestamp patient_id
0 1 0 0 2021-01-01 1
1 3 0 0 2021-01-01 1
2 3 2 0 2021-01-01 1
0 0 2 0 2021-01-04 2
raise ValueError(f"Invalid aggregation: {agg}. Valid options are: {VALID_AGGREGATIONS}")
if window_size == "full":
out_df = df.groupby("patient_id").agg(
get_agg_pl_expr(window_size, agg),
out_df = out_df.explode(*[c for c in out_df.columns if c != "patient_id"])
code_cols = [c for c in df.columns if c.endswith("code")]
value_cols = [c for c in df.columns if c.endswith("value")]
cols = code_cols
out_df = df.rolling(
get_agg_pl_expr(window_size, agg),
cols = value_cols
id_cols = ["patient_id", "timestamp"]
df = df.loc[:, id_cols + cols]
out_df = compute_agg(df, window_size, agg)
return out_df

Expand All @@ -152,20 +229,41 @@ def generate_summary(
pl.LazyFrame: A LazyFrame containing the summarized data with all required features present.
# >>> from datetime import date
# >>> wide_df = pd.DataFrame({"patient_id": [1, 1, 1, 2],
# ... "A/code": [1, 1, 0, 0],
# ... "B/code": [0, 0, 1, 1],
# ... "A/value": [1, 2, 3, None],
# ... "B/value": [None, None, None, 4.0],
# ... "timestamp": [date(2021, 1, 1), date(2021, 1, 1),date(2020, 1, 3), date(2021, 1, 4)],
# ... }).lazy()
# >>> feature_columns = ["A/code", "B/code", "A/value", "B/value"]
# >>> aggregations = ["code/count", "value/sum"]
# >>> window_sizes = ["full", "1d"]
# >>> generate_summary(feature_columns, wide_df.lazy(), window_sizes, aggregations).collect()
>>> from datetime import date
>>> wide_df = pd.DataFrame({"patient_id": [1, 1, 1, 2],
... "A/code": [1, 1, 0, 0],
... "B/code": [0, 0, 1, 1],
... "A/value": [1, 2, 3, None],
... "B/value": [None, None, None, 4.0],
... "timestamp": [date(2021, 1, 1), date(2021, 1, 1),date(2020, 1, 3), date(2021, 1, 4)],
... })
>>> wide_df['timestamp'] = pd.to_datetime(wide_df['timestamp'])
>>> for col in ["A/code", "B/code", "A/value", "B/value"]:
... wide_df[col] = pd.arrays.SparseArray(wide_df[col])
>>> feature_columns = ["A/code", "B/code", "A/value", "B/value"]
>>> aggregations = ["code/count", "value/sum"]
>>> window_sizes = ["full", "1d"]
>>> generate_summary(feature_columns, wide_df, window_sizes, aggregations)[
... ["1d/A/code/count", "full/B/code/count", "full/B/value/sum"]]
1d/A/code/count full/B/code/count full/B/value/sum
0 NaN 1.0 0
1 NaN 1.0 0
2 NaN 1.0 0
0 NaN 1.0 0
0 NaN NaN 0
1 NaN NaN 0
2 NaN NaN 0
0 NaN NaN 0
0 0 NaN 0
1 1.0 NaN 0
2 2.0 NaN 0
0 0 NaN 0
0 NaN NaN 0
1 NaN NaN 0
2 NaN NaN 0
0 NaN NaN 0
df = df.sort(["patient_id", "timestamp"])
df = df.sort_values(["patient_id", "timestamp"])
final_columns = []
out_dfs = []
# Generate summaries for each window size and aggregation
Expand All @@ -177,19 +275,22 @@ def generate_summary(
# only iterate through code_types that exist in the dataframe columns
if any([c.endswith(code_type) for c in df.columns]):
timestamp_dtype = df.dtypes[df.columns.index("timestamp")]
assert timestamp_dtype in [
], f"timestamp must be of type Date, but is {timestamp_dtype}"
# timestamp_dtype = df.dtypes[df.columns.index("timestamp")]
# assert timestamp_dtype in [
# pl.Datetime,
# pl.Date,
# ], f"timestamp must be of type Date, but is {timestamp_dtype}"
out_df = _generate_summary(df, window_size, agg)

final_columns = sorted(final_columns)
# Combine all dataframes using successive joins
result_df = pl.concat(out_dfs, how="align")
result_df = pd.concat(out_dfs)
# Add in missing feature columns with default values
missing_columns = [col for col in final_columns if col not in result_df.columns]
result_df = result_df.with_columns([pl.lit(None).alias(col) for col in missing_columns])
result_df =*["patient_id", "timestamp"], *final_columns))

result_df[missing_columns] = pd.DataFrame.sparse.from_spmatrix(
coo_matrix((result_df.shape[0], len(missing_columns)))
result_df = result_df[["patient_id", "timestamp"] + final_columns]
return result_df

