Standardized file storage using file_name.py, and switched sparse matrix storage from .npy files to .npz files that contain the data, row, and col arrays along with the shape of the sparse matrix.
Oufattole committed Jun 2, 2024
1 parent 85f38b5 commit 7ea3230
Showing 8 changed files with 316 additions and 292 deletions.
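The core storage change described in the commit message is that sparse matrices are now written as .npz archives holding the COO data, row, and col arrays plus the matrix shape. A minimal sketch of that scheme, assuming scipy COO matrices; the repo's actual write_df/load helpers are not shown in this commit, and these function names are illustrative:

```python
import numpy as np
from scipy.sparse import coo_matrix


def save_sparse_npz(matrix, path):
    # Persist the COO components plus the shape, per the new .npz scheme.
    coo = matrix.tocoo()
    np.savez(path, data=coo.data, row=coo.row, col=coo.col, shape=coo.shape)


def load_sparse_npz(path):
    # Rebuild the sparse matrix from its stored components.
    npz = np.load(path)
    return coo_matrix((npz["data"], (npz["row"], npz["col"])), shape=tuple(npz["shape"]))
```

Storing the shape explicitly is the point of the switch: the data/row/col triple alone cannot recover trailing all-zero rows or columns.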
38 changes: 17 additions & 21 deletions scripts/identify_columns.py
@@ -5,16 +5,14 @@
 from pathlib import Path
 
 import hydra
+import numpy as np
 import polars as pl
 from loguru import logger
 from omegaconf import DictConfig, OmegaConf
 
+from MEDS_tabular_automl.file_name import FileNameResolver
 from MEDS_tabular_automl.mapper import wrap as rwlock_wrap
-from MEDS_tabular_automl.utils import (
-    compute_feature_frequencies,
-    load_meds_data,
-    load_tqdm,
-)
+from MEDS_tabular_automl.utils import compute_feature_frequencies, load_tqdm
 
 
 def store_config_yaml(config_fp: Path, cfg: DictConfig):
@@ -73,14 +71,12 @@ def store_columns(
     """
     iter_wrapper = load_tqdm(cfg.tqdm)
     # create output dir
-    flat_dir = Path(cfg.tabularized_data_dir)
+    f_name_resolver = FileNameResolver(cfg)
+    flat_dir = f_name_resolver.tabularize_dir
     flat_dir.mkdir(exist_ok=True, parents=True)
 
-    # load MEDS data
-    split_to_fps = load_meds_data(cfg.MEDS_cohort_dir, load_data=False)
-
     # store params in json file
-    config_fp = flat_dir / "config.yaml"
+    config_fp = f_name_resolver.get_config_path()
     store_config_yaml(config_fp, cfg)
 
     # 0. Identify Output Columns and Frequencies
@@ -96,11 +92,11 @@ def read_fn(in_fp):
         return pl.scan_parquet(in_fp)
 
     # Map: Iterates through shards and caches feature frequencies
-    feature_freq_fp = flat_dir / "feature_freqs"
-    feature_freq_fp.mkdir(exist_ok=True)
-    for shard_fp in iter_wrapper(split_to_fps["train"]):
-        name = shard_fp.stem
-        out_fp = feature_freq_fp / f"{name}.json"
+    train_shards = f_name_resolver.list_meds_files(split="train")
+    np.random.shuffle(train_shards)
+    feature_dir = f_name_resolver.tabularize_dir
+    for shard_fp in iter_wrapper(train_shards):
+        out_fp = feature_dir / "identify_train_columns" / f"{shard_fp.stem}.json"
         rwlock_wrap(
             shard_fp,
             out_fp,
@@ -123,16 +119,16 @@ def compute_fn(feature_freq_list):
 
     def write_fn(data, out_fp):
         feature_freqs, feature_columns = data
-        json.dump(feature_columns, open(out_fp / "feature_columns.json", "w"))
-        json.dump(feature_freqs, open(flat_dir / "feature_freqs.json", "w"))
+        json.dump(feature_columns, open(f_name_resolver.get_feature_columns_fp(), "w"))
+        json.dump(feature_freqs, open(f_name_resolver.get_feature_freqs_fp(), "w"))
 
-    def read_fn(in_fp):
-        files = list(in_fp.glob("*.json"))
+    def read_fn(feature_dir):
+        files = list(feature_dir.glob("*.json"))
         return [json.load(open(fp)) for fp in files]
 
     rwlock_wrap(
-        feature_freq_fp,
-        flat_dir,
+        feature_dir / "identify_train_columns",
+        feature_dir,
         read_fn,
         write_fn,
         compute_fn,
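In the reduce step above, read_fn globs the per-shard JSONs and compute_fn merges them; compute_fn's body falls outside this diff. A hypothetical sketch of that merge, assuming each shard file maps feature names to counts:

```python
from collections import Counter


def merge_feature_freqs(feature_freq_list):
    # Sum the per-shard {feature: count} dicts, then derive the column list.
    total = Counter()
    for shard_freqs in feature_freq_list:
        total.update(shard_freqs)
    feature_columns = sorted(total)
    # Tuple order matches `feature_freqs, feature_columns = data` in write_fn.
    return dict(total), feature_columns
```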
116 changes: 56 additions & 60 deletions scripts/summarize_over_windows.py
@@ -1,26 +1,20 @@
 #!/usr/bin/env python
 
 """Aggregates time-series data for feature columns across different window sizes."""
-import os
+import json
+from itertools import product
 
 import hydra
+import numpy as np
 import polars as pl
 from loguru import logger
 from omegaconf import DictConfig
 
+from MEDS_tabular_automl.file_name import FileNameResolver
 from MEDS_tabular_automl.generate_summarized_reps import generate_summary
 from MEDS_tabular_automl.generate_ts_features import get_flat_ts_rep
 from MEDS_tabular_automl.mapper import wrap as rwlock_wrap
-from MEDS_tabular_automl.utils import setup_environment, write_df
-
-
-def hydra_loguru_init() -> None:
-    """Adds loguru output to the logs that hydra scrapes.
-
-    Must be called from a hydra main!
-    """
-    hydra_path = hydra.core.hydra_config.HydraConfig.get().runtime.output_dir
-    logger.add(os.path.join(hydra_path, "main.log"))
+from MEDS_tabular_automl.utils import hydra_loguru_init, load_tqdm, write_df
 
 
 @hydra.main(version_base=None, config_path="../configs", config_name="tabularize")
@@ -53,58 +47,60 @@ def summarize_ts_data_over_windows(
         FileNotFoundError: If specified directories or files in the configuration are not found.
         ValueError: If required columns like 'code' or 'value' are missing in the data files.
     """
+    iter_wrapper = load_tqdm(cfg.tqdm)
     if not cfg.test:
         hydra_loguru_init()
-    flat_dir, split_to_fps, feature_columns = setup_environment(cfg, load_data=False)
+    f_name_resolver = FileNameResolver(cfg)
     # Produce ts representation
-    ts_subdir = flat_dir / "ts"
-
-    for sp, shard_fps in split_to_fps.items():
-        sp_dir = ts_subdir / sp
-
-        for i, shard_fp in enumerate(shard_fps):
-            for window_size in cfg.window_sizes:
-                for agg in cfg.aggs:
-                    pivot_fp = sp_dir / window_size / agg / f"{i}.pkl"
-                    if pivot_fp.exists() and not cfg.do_overwrite:
-                        raise FileExistsError(
-                            f"do_overwrite is {cfg.do_overwrite} and {pivot_fp.exists()} exists!"
-                        )
-
-                    def read_fn(fp):
-                        return pl.scan_parquet(fp)
-
-                    def compute_fn(shard_df):
-                        # Load Sparse DataFrame
-                        pivot_df = get_flat_ts_rep(
-                            feature_columns=feature_columns,
-                            shard_df=shard_df,
-                        )
-
-                        # Summarize data -- applying aggregations on various window sizes
-                        summary_df = generate_summary(
-                            feature_columns,
-                            pivot_df,
-                            window_size,
-                            agg,
-                        )
-                        assert summary_df.shape[1] > 2, "No data found in the summarized dataframe"
-
-                        logger.info("Writing pivot file")
-                        return summary_df
-
-                    def write_fn(out_df, out_fp):
-                        write_df(out_df, out_fp, do_overwrite=cfg.do_overwrite)
-
-                    rwlock_wrap(
-                        shard_fp,
-                        pivot_fp,
-                        read_fn,
-                        write_fn,
-                        compute_fn,
-                        do_overwrite=cfg.do_overwrite,
-                        do_return=False,
-                    )
+    meds_shard_fps = f_name_resolver.list_meds_files()
+    feature_columns = json.load(open(f_name_resolver.get_feature_columns_fp()))
+
+    # shuffle tasks
+    tabularization_tasks = list(product(meds_shard_fps, cfg.window_sizes, cfg.aggs))
+    np.random.shuffle(tabularization_tasks)
+
+    # iterate through them
+    for shard_fp, window_size, agg in iter_wrapper(tabularization_tasks):
+        shard_num = shard_fp.stem
+        split = shard_fp.parent.stem
+        assert split in ["train", "held_out", "tuning"], f"Invalid split {split}"
+        ts_fp = f_name_resolver.get_flat_ts_rep(split, shard_num, window_size, agg)
+        if ts_fp.exists() and not cfg.do_overwrite:
+            raise FileExistsError(f"do_overwrite is {cfg.do_overwrite} and {ts_fp.exists()} exists!")
+
+        def read_fn(fp):
+            return pl.scan_parquet(fp)
+
+        def compute_fn(shard_df):
+            # Load Sparse DataFrame
+            index_df, sparse_matrix = get_flat_ts_rep(feature_columns, shard_df)
+
+            # Summarize data -- applying aggregations on a specific window size + aggregation combination
+            summary_df = generate_summary(
+                feature_columns,
+                index_df,
+                sparse_matrix,
+                window_size,
+                agg,
+            )
+            assert summary_df.shape[1] > 2, "No data found in the summarized dataframe"
+
+            logger.info("Writing pivot file")
+            return summary_df
+
+        def write_fn(out_matrix, out_fp):
+            coo_matrix = out_matrix.tocoo()
+            write_df(coo_matrix, out_fp, do_overwrite=cfg.do_overwrite)
+
+        rwlock_wrap(
+            shard_fp,
+            ts_fp,
+            read_fn,
+            write_fn,
+            compute_fn,
+            do_overwrite=cfg.do_overwrite,
+            do_return=False,
+        )
 
 
 if __name__ == "__main__":
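Both scripts shuffle their task lists and funnel every shard through rwlock_wrap from MEDS_tabular_automl.mapper, whose implementation is not part of this diff. A simplified sketch of the read/compute/write contract the call sites imply, with the locking details as assumptions:

```python
from pathlib import Path


def rwlock_wrap_sketch(in_fp, out_fp, read_fn, write_fn, compute_fn, do_overwrite=False, do_return=False):
    # Skip shards whose output is already cached, unless overwriting is requested.
    out_fp = Path(out_fp)
    if out_fp.exists() and not do_overwrite:
        return None
    lock_fp = Path(str(out_fp) + ".lock")
    if lock_fp.exists():
        return None  # another worker has claimed this task
    out_fp.parent.mkdir(parents=True, exist_ok=True)
    lock_fp.touch()
    try:
        result = compute_fn(read_fn(in_fp))
        write_fn(result, out_fp)
        return result if do_return else None
    finally:
        lock_fp.unlink()
```

Shuffling the (shard, window_size, agg) tasks means concurrently launched workers start on different outputs, and the cache/lock check lets each worker skip tasks another has already claimed; this is why both scripts call np.random.shuffle before their loops.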
80 changes: 43 additions & 37 deletions scripts/tabularize_static.py
@@ -1,15 +1,19 @@
 #!/usr/bin/env python
 """Tabularizes static data in MEDS format into tabular representations."""
 
+import json
+from itertools import product
 from pathlib import Path
 
 import hydra
+import numpy as np
 import polars as pl
 from omegaconf import DictConfig, OmegaConf
 
+from MEDS_tabular_automl.file_name import FileNameResolver
 from MEDS_tabular_automl.generate_static_features import get_flat_static_rep
 from MEDS_tabular_automl.mapper import wrap as rwlock_wrap
-from MEDS_tabular_automl.utils import setup_environment, write_df
+from MEDS_tabular_automl.utils import hydra_loguru_init, load_tqdm, write_df
 
 pl.enable_string_cache()
 
@@ -96,44 +100,46 @@ def tabularize_static_data(
     .. _link: https://pola-rs.github.io/polars/py-polars/html/reference/dataframe/api/polars.DataFrame.groupby_rolling.html # noqa: E501
     """
-    flat_dir, split_to_fp, feature_columns = setup_environment(cfg, load_data=False)
-
-    # Produce static representation
-    static_subdir = flat_dir / "static"
-
-    static_dfs = {}
-    for sp, shard_fps in split_to_fp.items():
-        static_dfs[sp] = []
-        sp_dir = static_subdir / sp
-
-        for i, shard_fp in enumerate(shard_fps):
-            fp = sp_dir / f"{i}.parquet"
-            static_dfs[sp].append(fp)
-            if fp.exists() and not cfg.do_overwrite:
-                raise FileExistsError(f"do_overwrite is {cfg.do_overwrite} and {fp} exists!")
-
-            def read_fn(in_fp):
-                return pl.scan_parquet(in_fp)
-
-            def compute_fn(shard_df):
-                return get_flat_static_rep(
-                    feature_columns=feature_columns,
-                    shard_df=shard_df,
-                )
-
-            def write_fn(data, out_df):
-                write_df(data, out_df, do_overwrite=cfg.do_overwrite)
-
-            rwlock_wrap(
-                shard_fp,
-                fp,
-                read_fn,
-                write_fn,
-                compute_fn,
-                do_overwrite=cfg.do_overwrite,
-                do_return=False,
+    iter_wrapper = load_tqdm(cfg.tqdm)
+    if not cfg.test:
+        hydra_loguru_init()
+    f_name_resolver = FileNameResolver(cfg)
+    # Produce ts representation
+    meds_shard_fps = f_name_resolver.list_meds_files()
+    # f_name_resolver.get_meds_dir()
+    feature_columns = json.load(open(f_name_resolver.get_feature_columns_fp()))
+
+    # shuffle tasks
+    tabularization_tasks = list(product(meds_shard_fps, cfg.window_sizes, cfg.aggs))
+    np.random.shuffle(tabularization_tasks)
+
+    for shard_fp in iter_wrapper(meds_shard_fps):
+        static_fp = f_name_resolver.get_flat_static_rep(shard_fp.parent.stem, shard_fp.stem)
+        if static_fp.exists() and not cfg.do_overwrite:
+            raise FileExistsError(f"do_overwrite is {cfg.do_overwrite} and {static_fp} exists!")
+
+        def read_fn(in_fp):
+            return pl.scan_parquet(in_fp)
+
+        def compute_fn(shard_df):
+            return get_flat_static_rep(
+                feature_columns=feature_columns,
+                shard_df=shard_df,
+            )
+
+        def write_fn(data, out_df):
+            write_df(data, out_df, do_overwrite=cfg.do_overwrite)
+
+        rwlock_wrap(
+            shard_fp,
+            static_fp,
+            read_fn,
+            write_fn,
+            compute_fn,
+            do_overwrite=cfg.do_overwrite,
+            do_return=False,
         )
 
 
 if __name__ == "__main__":
     tabularize_static_data()
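All three scripts now route path construction through the new FileNameResolver. A hypothetical reconstruction of its interface, inferred purely from the call sites in this diff; the real methods live in MEDS_tabular_automl/file_name.py, and the directory layout and .npz extensions here are assumptions:

```python
from pathlib import Path


class FileNameResolver:
    """Hypothetical sketch; only methods used in this commit are included."""

    def __init__(self, cfg):
        self.meds_dir = Path(cfg.MEDS_cohort_dir)
        self.tabularize_dir = Path(cfg.tabularized_data_dir)

    def get_config_path(self) -> Path:
        return self.tabularize_dir / "config.yaml"

    def get_feature_columns_fp(self) -> Path:
        return self.tabularize_dir / "feature_columns.json"

    def get_feature_freqs_fp(self) -> Path:
        return self.tabularize_dir / "feature_freqs.json"

    def list_meds_files(self, split: str | None = None) -> list[Path]:
        # MEDS shards are assumed to sit under <cohort_dir>/<split>/<shard>.parquet.
        pattern = f"{split}/*.parquet" if split else "*/*.parquet"
        return sorted(self.meds_dir.glob(pattern))

    def get_flat_static_rep(self, split: str, shard_num: str) -> Path:
        return self.tabularize_dir / "static" / split / f"{shard_num}.npz"

    def get_flat_ts_rep(self, split: str, shard_num: str, window_size: str, agg: str) -> Path:
        return self.tabularize_dir / "ts" / split / window_size / agg / f"{shard_num}.npz"
```

Centralizing paths this way is what lets the scripts drop setup_environment and load_meds_data: each script asks the resolver for exactly the files it needs.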
65 changes: 0 additions & 65 deletions scripts/tabularize_ts.py

This file was deleted.

[Diffs for the remaining changed files were not loaded in this view.]
