Skip to content

Commit

Permalink
Implement data processing for MEDS format to pivot tables into two indexed file types:
Browse files Browse the repository at this point in the history

- Code data: Files with a patient_id and timestamp index, containing columns for each code with binary presence indicators (1 and 0).
- Value data: Files similar in structure but containing the numerical values observed for each code.
  • Loading branch information
Oufattole committed May 27, 2024
1 parent 63b9ba6 commit cd067f8
Show file tree
Hide file tree
Showing 5 changed files with 183 additions and 236 deletions.
2 changes: 1 addition & 1 deletion scripts/identify_columns.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ def store_columns(
.. _link: https://pola-rs.github.io/polars/py-polars/html/reference/dataframe/api/polars.DataFrame.groupby_rolling.html # noqa: E501
"""
# create output dir
flat_dir = Path(cfg.tabularized_data_dir) / "flat_reps"
flat_dir = Path(cfg.tabularized_data_dir)
flat_dir.mkdir(exist_ok=True, parents=True)

# load MEDS data
Expand Down
52 changes: 35 additions & 17 deletions scripts/tabularize_ts.py
Original file line number Diff line number Diff line change
@@ -1,25 +1,21 @@
"""WIP."""
import hydra
from omegaconf import DictConfig
from tqdm import tqdm

from MEDS_tabular_automl.utils import setup_environment
from MEDS_tabular_automl.generate_ts_features import get_flat_ts_rep
from MEDS_tabular_automl.utils import setup_environment, write_df


@hydra.main(version_base=None, config_path="../configs", config_name="tabularize")
def tabularize_ts_data(
    cfg: DictConfig,
):
    """Process a MEDS dataset and store flat (pivoted) representations of its time-series data.

    This function handles MEDS format data and pivots tables to create two types of data files
    with patient_id and timestamp indexes:

    * code data: one column per code, with 1 and 0 values indicating presence of the code.
    * value data: one column per code, containing the numerical value observed for that code.

    Output layout: ``<tabularized_data_dir>/ts/<split>/<shard_index>_code.parquet`` and
    ``.../<shard_index>_value.parquet``, one pair per subject shard per split.

    Args:
        cfg: Hydra config. Fields read directly here are ``do_update`` (skip shards whose
            output files already exist) and ``do_overwrite`` (allow replacing existing files;
            if both are false and an output exists, a ``FileExistsError`` is raised).
            Remaining fields (e.g. ``window_sizes``, ``codes``, ``aggs``,
            ``n_patients_per_sub_shard``, ``seed``) are consumed by ``setup_environment``.

    Raises:
        FileExistsError: If an output file already exists and neither ``do_update`` nor
            ``do_overwrite`` permits proceeding.

    .. _link: https://pola-rs.github.io/polars/py-polars/html/reference/dataframe/api/polars.DataFrame.groupby_rolling.html # noqa: E501
    """
    # setup_environment resolves the output directory, the per-split shard dataframes,
    # and the full list of feature columns to pivot on.
    flat_dir, split_to_df, feature_columns = setup_environment(cfg)

    # Produce the time-series representation under <output>/ts/<split>/.
    ts_subdir = flat_dir / "ts"

    for sp, subjects_dfs in tqdm(list(split_to_df.items()), desc="Flattening Splits"):
        sp_dir = ts_subdir / sp
        # Create the per-split directory before writing shards into it, matching the
        # explicit mkdir convention used elsewhere in this pipeline (identify_columns.py).
        # NOTE(review): drop this if write_df is known to create parent dirs itself.
        sp_dir.mkdir(exist_ok=True, parents=True)

        for i, shard_df in enumerate(tqdm(subjects_dfs, desc="Subject chunks", leave=False)):
            code_fp = sp_dir / f"{i}_code.parquet"
            value_fp = sp_dir / f"{i}_value.parquet"
            if code_fp.exists() or value_fp.exists():
                if cfg.do_update:
                    # Incremental update: keep previously produced shards untouched.
                    continue
                elif not cfg.do_overwrite:
                    # Name the offending paths (the original message interpolated
                    # bare booleans, which made the error unreadable).
                    raise FileExistsError(
                        f"do_overwrite is {cfg.do_overwrite} and at least one of "
                        f"{code_fp} or {value_fp} already exists!"
                    )

            code_df, value_df = get_flat_ts_rep(
                feature_columns=feature_columns,
                shard_df=shard_df,
            )
            write_df(code_df, code_fp, do_overwrite=cfg.do_overwrite)
            write_df(value_df, value_fp, do_overwrite=cfg.do_overwrite)
Loading

0 comments on commit cd067f8

Please sign in to comment.