Added some starter code for these changes; more will be needed.
mmcdermott committed Aug 10, 2024
1 parent c9595f2 commit 3314937
Showing 5 changed files with 35 additions and 40 deletions.
3 changes: 2 additions & 1 deletion pyproject.toml
@@ -16,7 +16,8 @@ classifiers = [
]
dependencies = [
"polars", "pyarrow", "loguru", "hydra-core", "numpy", "scipy<1.14.0", "pandas", "tqdm", "xgboost",
"scikit-learn", "hydra-optuna-sweeper", "hydra-joblib-launcher", "ml-mixins", "meds==0.3"
"scikit-learn", "hydra-optuna-sweeper", "hydra-joblib-launcher", "ml-mixins", "meds==0.3",
#"MEDS-transforms==0.0.4",
]

[project.scripts]
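The dependency list keeps meds pinned at 0.3 and leaves the MEDS-transforms pin commented out for now. As a purely illustrative check (not part of the commit), the version actually resolved for a pinned dependency can be inspected at runtime:

    from importlib.metadata import PackageNotFoundError, version

    # Illustrative only: confirm the environment resolved the meds==0.3 pin.
    try:
        print("meds version:", version("meds"))  # expected to report 0.3
    except PackageNotFoundError:
        print("meds is not installed in this environment")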
2 changes: 1 addition & 1 deletion src/MEDS_tabular_automl/configs/describe_codes.yaml
@@ -2,7 +2,7 @@ defaults:
- default
- _self_

-input_dir: ${MEDS_cohort_dir}/data
+input_dir: ${output_cohort_dir}/data
# Where to store output code frequency data
output_filepath: ${output_cohort_dir}/metadata/codes.parquet

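Both config changes point input_dir at the output cohort directory rather than the raw MEDS cohort directory. For readers unfamiliar with Hydra's ${...} interpolation, here is a minimal sketch using OmegaConf (the config library underlying Hydra) with made-up paths:

    from omegaconf import OmegaConf

    # Hypothetical values; in practice these come from the composed Hydra config.
    cfg = OmegaConf.create(
        {"output_cohort_dir": "/tmp/cohort_out", "input_dir": "${output_cohort_dir}/data"}
    )
    print(cfg.input_dir)  # -> /tmp/cohort_out/data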
2 changes: 1 addition & 1 deletion src/MEDS_tabular_automl/configs/tabularization.yaml
@@ -6,7 +6,7 @@ defaults:
# Raw data
# Where the code metadata is stored
input_code_metadata_fp: ${output_cohort_dir}/metadata/codes.parquet
-input_dir: ${MEDS_cohort_dir}/data
+input_dir: ${output_cohort_dir}/data
output_dir: ${output_cohort_dir}/tabularize

name: tabularization
35 changes: 16 additions & 19 deletions tests/test_integration.py
@@ -67,14 +67,18 @@ def test_integration():
cfg = compose(config_name="describe_codes", overrides=overrides) # config.yaml

# Create the directories
-(MEDS_cohort_dir / "data").mkdir(parents=True, exist_ok=True)
+(output_cohort_dir / "data").mkdir(parents=True, exist_ok=True)

# Store MEDS outputs
+all_data = []
for split, data in MEDS_OUTPUTS.items():
-file_path = MEDS_cohort_dir / "data" / f"{split}.parquet"
+file_path = output_cohort_dir / "data" / f"{split}.parquet"
file_path.parent.mkdir(exist_ok=True)
df = pl.read_csv(StringIO(data))
df.with_columns(pl.col("time").str.to_datetime("%Y-%m-%dT%H:%M:%S%.f")).write_parquet(file_path)
+all_data.append(df)

+all_data = pl.concat(all_data, how="diagonal_relaxed")

# Check the files are not empty
meds_files = list_subdir_files(Path(cfg.input_dir), "parquet")
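The newly added concat call combines the per-split frames even when their schemas do not line up exactly. A small standalone sketch of polars' diagonal_relaxed strategy, using toy data rather than the test fixtures:

    import polars as pl

    # Toy shards whose columns do not match exactly.
    a = pl.DataFrame({"patient_id": [1, 2], "code": ["A", "B"]})
    b = pl.DataFrame({"patient_id": [3], "code": ["C"], "numeric_value": [1.5]})

    # "diagonal_relaxed" unions the column sets (filling gaps with nulls)
    # and relaxes mismatched dtypes to a common supertype.
    combined = pl.concat([a, b], how="diagonal_relaxed")
    print(combined.shape)  # (3, 3)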
@@ -84,7 +88,7 @@ def test_integration():
for f in meds_files:
assert pl.read_parquet(f).shape[0] > 0, "MEDS Data Tabular Dataframe Should not be Empty!"
split_json = json.load(StringIO(SPLITS_JSON))
-splits_fp = MEDS_cohort_dir / "splits.json"
+splits_fp = output_cohort_dir / ".shards.json"
json.dump(split_json, splits_fp.open("w"))

# Step 1: Run the describe_codes script
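The split assignments are now written to .shards.json under the output cohort directory instead of splits.json under the MEDS cohort directory. The exact contents come from SPLITS_JSON in the test fixtures; a hypothetical sketch of the general shape such a file can take:

    import json
    from pathlib import Path

    # Hypothetical shard-to-patient mapping; the real data comes from SPLITS_JSON.
    shards = {"train/0": [68729, 814703], "tuning/0": [754281], "held_out/0": [1500733]}

    out_dir = Path("/tmp/output_cohort_dir")  # placeholder path
    out_dir.mkdir(parents=True, exist_ok=True)
    json.dump(shards, (out_dir / ".shards.json").open("w"))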
@@ -205,22 +209,15 @@ def test_integration():
): # path to config.yaml
overrides = [f"{k}={v}" for k, v in cache_config.items()]
cfg = compose(config_name="task_specific_caching", overrides=overrides) # config.yaml
-# Create fake labels
-for f in list_subdir_files(Path(cfg.MEDS_cohort_dir) / "data", "parquet"):
-    df = pl.scan_parquet(f)
-    df = get_unique_time_events_df(get_events_df(df, feature_columns)).collect()
-    pseudo_labels = pl.Series(([0, 1] * df.shape[0])[: df.shape[0]])
-    df = df.with_columns(pl.Series(name="label", values=pseudo_labels))
-    df = df.select(pl.col(["patient_id", "time", "label"]))
-    df = df.with_row_index("event_id")
-
-    split = f.parent.stem
-    shard_num = f.stem
-    out_f = Path(cfg.input_label_dir) / Path(
-        get_shard_prefix(Path(cfg.MEDS_cohort_dir) / "data", f)
-    ).with_suffix(".parquet")
-    out_f.parent.mkdir(parents=True, exist_ok=True)
-    df.write_parquet(out_f)

+df = get_unique_time_events_df(get_events_df(all_data, feature_columns)).collect()
+pseudo_labels = pl.Series(([0, 1] * df.shape[0])[: df.shape[0]])
+df = df.with_columns(pl.Series(name="boolean_value", values=pseudo_labels))
+df = df.select("patient_id", pl.col("time").alias("prediction_time"), "boolean_value")
+
+out_fp = Path(cfg.input_label_dir) / "0.parquet"
+out_fp.parent.mkdir(parents=True, exist_ok=True)
+df.write_parquet(out_fp)

stderr, stdout_ws = run_command("generate-subsets", ["[30d]"], {}, "generate-subsets window_sizes")
stderr, stdout_agg = run_command(
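Both test files build pseudo-labels with the expression ([0, 1] * df.shape[0])[: df.shape[0]]. A quick worked example of what that expression produces:

    # [0, 1] * n repeats the pair n times (2 * n elements);
    # slicing with [:n] truncates it to exactly n alternating labels.
    n = 5
    pseudo = ([0, 1] * n)[:n]
    assert pseudo == [0, 1, 0, 1, 0]
    assert len(pseudo) == n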
33 changes: 15 additions & 18 deletions tests/test_tabularize.py
@@ -170,14 +170,18 @@ def test_tabularize():
cfg = compose(config_name="describe_codes", overrides=overrides) # config.yaml

# Create the directories
-(MEDS_cohort_dir / "data").mkdir(parents=True, exist_ok=True)
+(output_cohort_dir / "data").mkdir(parents=True, exist_ok=True)

# Store MEDS outputs
+all_data = []
for split, data in MEDS_OUTPUTS.items():
-file_path = MEDS_cohort_dir / "data" / f"{split}.parquet"
+file_path = output_cohort_dir / "data" / f"{split}.parquet"
file_path.parent.mkdir(exist_ok=True)
df = pl.read_csv(StringIO(data))
df.with_columns(pl.col("time").str.to_datetime("%Y-%m-%dT%H:%M:%S%.f")).write_parquet(file_path)
+all_data.append(df)

+all_data = pl.concat(all_data, how="diagonal_relaxed")

# Check the files are not empty
meds_files = list_subdir_files(Path(cfg.input_dir), "parquet")
@@ -187,7 +191,7 @@ def test_tabularize():
for f in meds_files:
assert pl.read_parquet(f).shape[0] > 0, "MEDS Data Tabular Dataframe Should not be Empty!"
split_json = json.load(StringIO(SPLITS_JSON))
-splits_fp = MEDS_cohort_dir / "splits.json"
+splits_fp = output_cohort_dir / ".shards.json"
json.dump(split_json, splits_fp.open("w"))
# Step 1: Describe Codes - compute code frequencies
describe_codes.main(cfg)
@@ -293,21 +297,14 @@ def test_tabularize():
cfg = compose(config_name="task_specific_caching", overrides=overrides) # config.yaml

# Create fake labels
-for f in list_subdir_files(Path(cfg.MEDS_cohort_dir) / "data", "parquet"):
-    df = pl.scan_parquet(f)
-    df = get_unique_time_events_df(get_events_df(df, feature_columns)).collect()
-    pseudo_labels = pl.Series(([0, 1] * df.shape[0])[: df.shape[0]])
-    df = df.with_columns(pl.Series(name="label", values=pseudo_labels))
-    df = df.select(pl.col(["patient_id", "time", "label"]))
-    df = df.with_row_index("event_id")
-
-    split = f.parent.stem
-    shard_num = f.stem
-    out_f = Path(cfg.input_label_dir) / Path(
-        get_shard_prefix(Path(cfg.MEDS_cohort_dir) / "data", f)
-    ).with_suffix(".parquet")
-    out_f.parent.mkdir(parents=True, exist_ok=True)
-    df.write_parquet(out_f)
+df = get_unique_time_events_df(get_events_df(all_data, feature_columns)).collect()
+pseudo_labels = pl.Series(([0, 1] * df.shape[0])[: df.shape[0]])
+df = df.with_columns(pl.Series(name="boolean_value", values=pseudo_labels))
+df = df.select("patient_id", pl.col("time").alias("prediction_time"), "boolean_value")
+
+out_fp = Path(cfg.input_label_dir) / "0.parquet"
+out_fp.parent.mkdir(parents=True, exist_ok=True)
+df.write_parquet(out_fp)

cache_task.main(cfg)
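The refactored caching step writes a single labels file using the column names patient_id, prediction_time, and boolean_value, matching the select above. A self-contained sketch of that shape with toy events (not the test's fixtures):

    import polars as pl

    # Toy events frame standing in for the get_unique_time_events_df(...) output.
    events = pl.DataFrame(
        {
            "patient_id": [1, 1, 2],
            "time": ["2020-01-01T00:00:00", "2020-01-02T00:00:00", "2020-01-01T12:00:00"],
        }
    ).with_columns(pl.col("time").str.to_datetime("%Y-%m-%dT%H:%M:%S"))

    # Alternating pseudo-labels, renamed to the label-schema column names.
    labels = events.with_columns(
        pl.Series(name="boolean_value", values=([0, 1] * events.height)[: events.height])
    ).select("patient_id", pl.col("time").alias("prediction_time"), "boolean_value")

    print(labels)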

