Merge branch 'esgpt_caching' into xgboost

teyaberg committed Jun 2, 2024
2 parents 91a2056 + c8f4144 commit f7605b5
Showing 9 changed files with 83 additions and 80 deletions.
6 changes: 2 additions & 4 deletions .github/workflows/tests.yaml
@@ -26,16 +26,14 @@ jobs:

- name: Install packages
run: |
pip install -e .
pip install pytest
pip install pytest-cov[toml]
pip install -e .[tests]
#----------------------------------------------
# run test suite
#----------------------------------------------
- name: Run tests
run: |
pytest -v --doctest-modules --cov
pytest -v --doctest-modules --cov --ignore=hf_cohort/
- name: Upload coverage to Codecov
uses: codecov/[email protected]
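The consolidated `pip install -e .[tests]` assumes the test dependencies are declared as an optional extra in the project metadata. A minimal sketch of what that stanza could look like in pyproject.toml; the extra's exact name and contents in this repo are an assumption here:

    # Hypothetical pyproject.toml fragment: a "tests" extra so that
    # `pip install -e .[tests]` installs pytest and coverage tooling in one step.
    [project.optional-dependencies]
    tests = ["pytest", "pytest-cov[toml]"]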
2 changes: 1 addition & 1 deletion hf_cohort/aces_task_extraction.py
@@ -61,7 +61,7 @@ def main(cfg):
.rename({"trigger": "timestamp", "subject_id": "patient_id"})
.sort(by=["patient_id", "timestamp"])
)
feature_columns = json.read(Path(cfg.tabularized_data_dir) / "feature_columns.json")
feature_columns = json.load(open(Path(cfg.tabularized_data_dir) / "feature_columns.json"))
data_df = pl.scan_parquet(in_fp)
data_df = get_unique_time_events_df(get_events_df(data_df, feature_columns))
data_df = data_df.drop(["code", "numerical_value"])
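The replacement fixes a real bug: the `json` module has no `read` function. `json.load(open(...))` works but leaves the file handle to the garbage collector; a minimal equivalent sketch with an explicit context manager (the directory value is illustrative):

    import json
    from pathlib import Path

    # Illustrative value; in the script this comes from cfg.tabularized_data_dir.
    tabularized_data_dir = Path("/storage/shared/meds_tabular_ml/ebcl_dataset/processed/tabularize")
    with open(tabularized_data_dir / "feature_columns.json") as f:
        feature_columns = json.load(f)  # closes the file deterministically on exit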
44 changes: 32 additions & 12 deletions hf_cohort/hf_cohort_e2e.sh
@@ -1,10 +1,12 @@
#!/usr/bin/env bash

METHOD=meds

MEDS_DIR=/storage/shared/meds_tabular_ml/ebcl_dataset/processed
OUTPUT_DIR=/storage/shared/meds_tabular_ml/ebcl_dataset/processed/tabularize
# N_PARALLEL_WORKERS="$1"
WINDOW_SIZES="window_sizes=[1d]"
AGGS="aggs=[code/count,value/sum]"
N_PARALLEL_WORKERS="$1"
WINDOW_SIZES="window_sizes=[1d,7d,30d,365d,full]"
AGGS="aggs=[static/present,code/count,value/count,value/sum,value/sum_sqd,value/min,value/max]"
# WINDOW_SIZES="window_sizes=[1d,7d,30d,365d,full]"
# AGGS="aggs=[static/present,static/first,code/count,value/count,value/sum,value/sum_sqd,value/min,value/max]"

@@ -21,12 +23,30 @@ POLARS_MAX_THREADS=32 python scripts/tabularize_static.py \
tabularized_data_dir=$OUTPUT_DIR \
min_code_inclusion_frequency=1 "$WINDOW_SIZES" do_overwrite=False "$AGGS"

echo "Running summarize_over_windows.py with $N_PARALLEL_WORKERS workers in parallel"
POLARS_MAX_THREADS=1 python scripts/summarize_over_windows.py \
--multirun \
worker="range(0,$N_PARALLEL_WORKERS)" \
hydra/launcher=joblib \
MEDS_cohort_dir=$MEDS_DIR \
tabularized_data_dir=$OUTPUT_DIR \
min_code_inclusion_frequency=1 do_overwrite=False \
"$WINDOW_SIZES" "$AGGS"

ID=$RANDOM
LOG_DIR="logs/$METHOD/$ID-logs"
mkdir -p $LOG_DIR
{ time \
mprof run --include-children --exit-code --output "$LOG_DIR/mprofile.dat" \
POLARS_MAX_THREADS=1 python scripts/summarize_over_windows.py \
--multirun \
worker="range(0,$N_PARALLEL_WORKERS)" \
hydra/launcher=joblib \
MEDS_cohort_dir=$MEDS_DIR \
tabularized_data_dir=$OUTPUT_DIR \
min_code_inclusion_frequency=1 do_overwrite=False \
"$WINDOW_SIZES" "$AGGS" \
2> $LOG_DIR/cmd.stderr
} 2> $LOG_DIR/timings.txt

cmd_exit_status=${PIPESTATUS[0]}
# Check the exit status of the mprof run command executed inside the timed block
if [ -n "$cmd_exit_status" ] && [ "$cmd_exit_status" -ne 0 ]; then
echo "build_dataset.sh failed with status $cmd_exit_status."
echo "Stderr from build_dataset.sh (see $LOG_DIR/cmd.stderr):"
tail $LOG_DIR/cmd.stderr
exit "$cmd_exit_status"
fi
mprof plot -o $LOG_DIR/mprofile.png $LOG_DIR/mprofile.dat
mprof peak $LOG_DIR/mprofile.dat > $LOG_DIR/peak_memory_usage.txt
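The new wrapper times the multirun and records its memory profile with `mprof` from the `memory_profiler` package, writing samples to mprofile.dat and then rendering a plot and a peak-usage summary. The same peak-memory question can be answered in-process; a minimal Python sketch using `memory_profiler.memory_usage`, where `run_summarization` is a hypothetical stand-in for the script's entry point:

    from memory_profiler import memory_usage

    def run_summarization():
        ...  # hypothetical stand-in for scripts/summarize_over_windows.py

    # Sample resident memory (MiB) every 0.1 s while the function runs; counting
    # child processes mirrors `mprof run --include-children` above.
    samples = memory_usage((run_summarization, (), {}), interval=0.1, include_children=True)
    print(f"peak memory: {max(samples):.1f} MiB")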
10 changes: 8 additions & 2 deletions src/MEDS_tabular_automl/file_name.py
@@ -7,8 +7,14 @@
class FileNameResolver:
def __init__(self, cfg: DictConfig):
self.cfg = cfg
self.meds_dir = Path(cfg.MEDS_cohort_dir)
self.tabularize_dir = Path(cfg.tabularized_data_dir)

@property
def meds_dir(self):
return Path(self.cfg.MEDS_cohort_dir)

@property
def tabularize_dir(self):
return Path(self.cfg.tabularized_data_dir)

def get_meds_dir(self):
return self.meds_dir / "final_cohort"
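Replacing the eager `Path` attributes with properties means both directories are re-derived from `self.cfg` on every access, so a config mutated after the resolver is constructed (e.g. by a Hydra override) is still honored. A minimal self-contained sketch of the difference, using hypothetical paths:

    from pathlib import Path
    from omegaconf import OmegaConf

    class FileNameResolver:
        """Minimal restatement of the property-based version above."""

        def __init__(self, cfg):
            self.cfg = cfg

        @property
        def meds_dir(self):
            return Path(self.cfg.MEDS_cohort_dir)

        def get_meds_dir(self):
            return self.meds_dir / "final_cohort"

    cfg = OmegaConf.create({"MEDS_cohort_dir": "/data/meds"})
    resolver = FileNameResolver(cfg)
    cfg.MEDS_cohort_dir = "/data/meds_v2"  # mutated after construction
    # Prints /data/meds_v2/final_cohort; an eager Path attribute would still show /data/meds.
    print(resolver.get_meds_dir())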
2 changes: 1 addition & 1 deletion src/MEDS_tabular_automl/generate_static_features.py
@@ -26,7 +26,7 @@

def convert_to_matrix(df, num_events, num_features):
"""Converts a Polars DataFrame to a sparse matrix."""
dense_matrix = df.drop(columns="patient_id").collect().to_numpy()
dense_matrix = df.drop("patient_id").collect().to_numpy()
data_list = []
rows = []
cols = []
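Polars' `drop` takes column names positionally; the `columns=` keyword is a pandas idiom that newer Polars versions reject. For context, the rest of the function turns the collected dense block into a sparse matrix; a minimal sketch of that conversion with made-up values, using scipy's COO format:

    import numpy as np
    from scipy.sparse import coo_array

    # Toy events-by-features block, as if patient_id had just been dropped.
    dense_matrix = np.array([[1, 0, 0], [0, 2, 0]])
    rows, cols = dense_matrix.nonzero()  # positions of the non-zero entries
    data = dense_matrix[rows, cols]      # their values
    sparse = coo_array((data, (rows, cols)), shape=dense_matrix.shape)
    print(sparse.toarray())              # round-trips back to the dense block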
72 changes: 26 additions & 46 deletions src/MEDS_tabular_automl/generate_summarized_reps.py
@@ -112,7 +112,7 @@ def sparse_rolling(df, sparse_matrix, timedelta, agg):
def get_rolling_window_indicies(index_df, window_size):
"""Get the indices for the rolling windows."""
if window_size == "full":
timedelta = pd.Timedelta(150*52, unit="W") # just use 150 years as time delta
timedelta = pd.Timedelta(150 * 52, unit="W") # just use 150 years as time delta
else:
timedelta = pd.Timedelta(window_size)
return (
@@ -168,34 +168,17 @@ def compute_agg(index_df, matrix: sparray, window_size: str, agg: str, num_featu
if agg is a code aggregation or only value columns if it is a value aggregation.
Example:
>>> from MEDS_tabular_automl.generate_ts_features import get_flat_ts_rep
>>> feature_columns = ['A/value/sum', 'A/code/count', 'B/value/sum', 'B/code/count',
... "C/value/sum", "C/code/count", "A/static/present"]
>>> data = {'patient_id': [1, 1, 1, 2, 2, 2],
... 'code': ['A', 'A', 'B', 'B', 'C', 'C'],
... 'timestamp': ['2021-01-01', '2021-01-01', '2020-01-01', '2021-01-04', None, None],
... 'numerical_value': [1, 2, 2, 2, 3, 4]}
>>> df = pl.DataFrame(data).lazy()
>>> df = get_flat_ts_rep(feature_columns, df)
>>> df
patient_id timestamp A/value B/value C/value A/code B/code C/code
0 1 2021-01-01 1 0 0 1 0 0
1 1 2021-01-01 2 0 0 1 0 0
2 1 2020-01-01 0 2 0 0 1 0
3 2 2021-01-04 0 2 0 0 1 0
>>> df['timestamp'] = pd.to_datetime(df['timestamp'])
>>> df.dtypes
patient_id int64
timestamp datetime64[ns]
A/value Sparse[int64, 0]
B/value Sparse[int64, 0]
C/value Sparse[int64, 0]
A/code Sparse[int64, 0]
B/code Sparse[int64, 0]
C/code Sparse[int64, 0]
dtype: object
>>> output = compute_agg(df[['patient_id', 'timestamp', 'A/code', 'B/code', 'C/code']],
... "1d", "code/count")
>>> from datetime import datetime
>>> df = pd.DataFrame({
... "patient_id": [1, 1, 1, 2],
... "timestamp": [
... datetime(2021, 1, 1), datetime(2021, 1, 1), datetime(2020, 1, 3), datetime(2021, 1, 4)
... ],
... "A/code": [1, 1, 0, 0],
... "B/code": [0, 0, 1, 1],
... "C/code": [0, 0, 0, 0],
... })
>>> output = compute_agg(df, "1d", "code/count")
>>> output
1d/A/code/count 1d/B/code/count 1d/C/code/count timestamp patient_id
0 1 0 0 2021-01-01 1
@@ -247,23 +230,20 @@ def _generate_summary(
- pl.LazyFrame: The summarized data frame.
Expect:
>>> from MEDS_tabular_automl.generate_ts_features import get_flat_ts_rep
>>> feature_columns = ['A/value/sum', 'A/code/count', 'B/value/sum', 'B/code/count',
... "C/value/sum", "C/code/count", "A/static/present"]
>>> data = {'patient_id': [1, 1, 1, 2, 2, 2],
... 'code': ['A', 'A', 'B', 'B', 'C', 'C'],
... 'timestamp': ['2021-01-01', '2021-01-01', '2020-01-01', '2021-01-04', None, None],
... 'numerical_value': [1, 2, 2, 2, 3, 4]}
>>> df = pl.DataFrame(data).lazy()
>>> pivot_df = get_flat_ts_rep(feature_columns, df)
>>> pivot_df['timestamp'] = pd.to_datetime(pivot_df['timestamp'])
>>> pivot_df
patient_id timestamp A/value B/value C/value A/code B/code C/code
0 1 2021-01-01 1 0 0 1 0 0
1 1 2021-01-01 2 0 0 1 0 0
2 1 2020-01-01 0 2 0 0 1 0
3 2 2021-01-04 0 2 0 0 1 0
>>> _generate_summary(pivot_df, "full", "value/sum")
>>> from datetime import datetime
>>> wide_df = pd.DataFrame({
... "patient_id": [1, 1, 1, 2],
... "timestamp": [
... datetime(2021, 1, 1), datetime(2021, 1, 1), datetime(2020, 1, 3), datetime(2021, 1, 4)
... ],
... "A/code": [1, 1, 0, 0],
... "B/code": [0, 0, 1, 1],
... "C/code": [0, 0, 0, 0],
... "A/value": [1, 2, 0, 0],
... "B/value": [0, 0, 2, 2],
... "C/value": [0, 0, 0, 0],
... })
>>> _generate_summary(wide_df, "full", "value/sum")
full/A/value/count full/B/value/count full/C/value/count timestamp patient_id
0 1 0 0 2021-01-01 1
1 3 0 0 2021-01-01 1
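Both rewritten doctests exercise the same pattern: a per-patient rolling-window aggregation keyed on `timestamp`, with the window size ("1d", "full", ...) and aggregation baked into the output column names. A minimal dense pandas sketch of the 1-day case; the module itself does this over sparse matrices (see `sparse_rolling`) for scale:

    from datetime import datetime

    import pandas as pd

    df = pd.DataFrame({
        "patient_id": [1, 1, 1, 2],
        "timestamp": [datetime(2021, 1, 1), datetime(2021, 1, 1),
                      datetime(2020, 1, 3), datetime(2021, 1, 4)],
        "A/code": [1, 1, 0, 0],
    })
    # For each patient, sum each feature over a trailing 1-day window ending at each row.
    rolled = (
        df.sort_values(["patient_id", "timestamp"])
        .groupby("patient_id")
        .rolling("1d", on="timestamp")
        .sum()
    )
    print(rolled)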
17 changes: 8 additions & 9 deletions src/MEDS_tabular_automl/generate_ts_features.py
@@ -21,14 +21,13 @@ def feature_name_to_code(feature_name: str) -> str:
"""Converts a feature name to a code name."""
return "/".join(feature_name.split("/")[:-1])


def get_long_code_df(df, ts_columns):
"""Pivots the codes data frame to a long format one-hot rep for time series data."""
column_to_int = {feature_name_to_code(col): i for i, col in enumerate(ts_columns)}
rows = range(df.select(pl.len()).collect().item())
cols = (
df.with_columns(
pl.col("code").cast(str).replace(column_to_int).cast(int).alias("code_index")
)
df.with_columns(pl.col("code").cast(str).replace(column_to_int).cast(int).alias("code_index"))
.select("code_index")
.collect()
.to_series()
@@ -42,12 +41,12 @@ def get_long_value_df(df, ts_columns):
def get_long_value_df(df, ts_columns):
"""Pivots the numerical value data frame to a long format for time series data."""
column_to_int = {feature_name_to_code(col): i for i, col in enumerate(ts_columns)}
value_df = df.with_row_index("index").drop_nulls("numerical_value").filter(pl.col("code").is_in(ts_columns))
value_df = (
df.with_row_index("index").drop_nulls("numerical_value").filter(pl.col("code").is_in(ts_columns))
)
rows = value_df.select(pl.col("index")).collect().to_series().to_numpy()
cols = (
value_df.with_columns(
pl.col("code").cast(str).replace(column_to_int).cast(int).alias("value_index")
)
value_df.with_columns(pl.col("code").cast(str).replace(column_to_int).cast(int).alias("value_index"))
.select("value_index")
.collect()
.to_series()
@@ -104,10 +103,10 @@ def summarize_dynamic_measurements(

# Generate sparse matrix
if agg in CODE_AGGREGATIONS:
code_df = df.drop(columns=id_cols + ["numerical_value"])
code_df = df.drop(*(id_cols + ["numerical_value"]))
data, (rows, cols) = get_long_code_df(code_df, ts_columns)
elif agg in VALUE_AGGREGATIONS:
value_df = df.drop(columns=id_cols)
value_df = df.drop(*id_cols)
data, (rows, cols) = get_long_value_df(value_df, ts_columns)

sp_matrix = csr_array(
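The `drop(*...)` changes are the same positional-argument fix as in generate_static_features.py. Downstream, `get_long_code_df` / `get_long_value_df` return a `(data, (rows, cols))` triplet that feeds straight into `csr_array`; a minimal sketch with made-up indices:

    from scipy.sparse import csr_array

    # Three observations as (value, row, col) triplets: row = event index in the
    # shard, col = feature index from column_to_int.
    data = [1.0, 2.0, 2.0]
    rows = [0, 1, 2]
    cols = [0, 0, 1]
    sp_matrix = csr_array((data, (rows, cols)), shape=(3, 2))
    print(sp_matrix.toarray())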
8 changes: 4 additions & 4 deletions src/MEDS_tabular_automl/utils.py
@@ -259,14 +259,14 @@ def compute_feature_frequencies(cfg: DictConfig, shard_df: DF_T) -> list[str]:
static_df = shard_df.filter(
pl.col("patient_id").is_not_null() & pl.col("code").is_not_null() & pl.col("timestamp").is_null()
)
static_code_freqs_df = static_df.groupby("code").agg(pl.count("code").alias("count")).collect()
static_code_freqs_df = static_df.group_by("code").agg(pl.count("code").alias("count")).collect()
static_code_freqs = {
row["code"] + "/static/present": row["count"] for row in static_code_freqs_df.iter_rows(named=True)
}

static_value_df = static_df.filter(pl.col("numerical_value").is_not_null())
static_value_freqs_df = (
static_value_df.groupby("code").agg(pl.count("numerical_value").alias("count")).collect()
static_value_df.group_by("code").agg(pl.count("numerical_value").alias("count")).collect()
)
static_value_freqs = {
row["code"] + "/static/first": row["count"] for row in static_value_freqs_df.iter_rows(named=True)
@@ -275,11 +275,11 @@ def compute_feature_frequencies(cfg: DictConfig, shard_df: DF_T) -> list[str]:
ts_df = shard_df.filter(
pl.col("patient_id").is_not_null() & pl.col("code").is_not_null() & pl.col("timestamp").is_not_null()
)
code_freqs_df = ts_df.groupby("code").agg(pl.count("code").alias("count")).collect()
code_freqs_df = ts_df.group_by("code").agg(pl.count("code").alias("count")).collect()
code_freqs = {row["code"] + "/code": row["count"] for row in code_freqs_df.iter_rows(named=True)}

value_df = ts_df.filter(pl.col("numerical_value").is_not_null())
value_freqs_df = value_df.groupby("code").agg(pl.count("numerical_value").alias("count")).collect()
value_freqs_df = value_df.group_by("code").agg(pl.count("numerical_value").alias("count")).collect()
value_freqs = {row["code"] + "/value": row["count"] for row in value_freqs_df.iter_rows(named=True)}

combined_freqs = {**static_code_freqs, **static_value_freqs, **code_freqs, **value_freqs}
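The `groupby` → `group_by` renames track Polars' API: the snake_case spelling replaced `groupby`, which was deprecated and later removed. A minimal sketch of the counting pattern this function applies to each slice of the shard:

    import polars as pl

    df = pl.LazyFrame({"code": ["A", "A", "B"], "numerical_value": [1.0, None, 2.0]})
    # Count rows per code, mirroring the static/code frequency computations above.
    freqs = df.group_by("code").agg(pl.count("code").alias("count")).collect()
    print(dict(zip(freqs["code"], freqs["count"])))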
2 changes: 1 addition & 1 deletion tests/test_tabularize.py
@@ -226,7 +226,7 @@ def test_tabularize():
file_path = MEDS_cohort_dir / "final_cohort" / f"{split}.parquet"
file_path.parent.mkdir(exist_ok=True)
df = pl.read_csv(StringIO(data))
df.with_columns(pl.col("timestamp").str.to_datetime("%Y-%m-%dT%H:%M:%S.%f")).write_parquet(
df.with_columns(pl.col("timestamp").str.to_datetime("%Y-%m-%dT%H:%M:%S%.f")).write_parquet(
file_path
)

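The one-character change matters because Polars parses datetimes with Rust's chrono format tokens, where fractional seconds are the dot-prefixed token `%.f`, not Python's strftime-style `.%f`. A minimal sketch of the corrected parse:

    import polars as pl

    df = pl.DataFrame({"timestamp": ["2021-01-01T00:00:00.0"]})
    # chrono's %.f consumes the dot and the fractional digits together.
    df = df.with_columns(pl.col("timestamp").str.to_datetime("%Y-%m-%dT%H:%M:%S%.f"))
    print(df)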
