From bc12b74f0aeb8a847b44e56f858aa94dad70c2a9 Mon Sep 17 00:00:00 2001
From: Aleksia Kolo
Date: Wed, 12 Jun 2024 13:08:45 -0400
Subject: [PATCH 01/48] Updated docstring of first function of
 generate_static_features.py

---
 src/MEDS_tabular_automl/generate_static_features.py | 13 +++++++++++--
 1 file changed, 11 insertions(+), 2 deletions(-)

diff --git a/src/MEDS_tabular_automl/generate_static_features.py b/src/MEDS_tabular_automl/generate_static_features.py
index c2164c4..d34bc86 100644
--- a/src/MEDS_tabular_automl/generate_static_features.py
+++ b/src/MEDS_tabular_automl/generate_static_features.py
@@ -24,8 +24,17 @@
 )


-def convert_to_matrix(df, num_events, num_features):
-    """Converts a Polars DataFrame to a sparse matrix."""
+def convert_to_matrix(df: pl.DataFrame, num_events: int, num_features: int) -> csr_array:
+    """Converts a Polars DataFrame to a sparse matrix.
+
+    Args:
+        df: The DataFrame to convert.
+        num_events: Number of events to set matrix dimension.
+        num_features: Number of features to set matrix dimension.
+
+    Returns:
+        A sparse matrix representation of the DataFrame.
+    """
     dense_matrix = df.drop("patient_id").collect().to_numpy()
     data_list = []
     rows = []

From bbc1b96a38319388b861e8fab48b3164944d7798 Mon Sep 17 00:00:00 2001
From: Aleksia Kolo
Date: Wed, 12 Jun 2024 13:16:30 -0400
Subject: [PATCH 02/48] Updated docstring of second function

---
 .../generate_static_features.py | 17 +++++++++--------
 1 file changed, 9 insertions(+), 8 deletions(-)

diff --git a/src/MEDS_tabular_automl/generate_static_features.py b/src/MEDS_tabular_automl/generate_static_features.py
index d34bc86..b577633 100644
--- a/src/MEDS_tabular_automl/generate_static_features.py
+++ b/src/MEDS_tabular_automl/generate_static_features.py
@@ -50,18 +50,19 @@ def convert_to_matrix(df: pl.DataFrame, num_events: int, num_features: int) -> c
     return matrix


-def get_sparse_static_rep(static_features, static_df, meds_df, feature_columns) -> coo_array:
-    """Merges static and time-series dataframes.
-
-    This function merges the static and time-series dataframes based on the patient_id column.
+def get_sparse_static_rep(
+    static_features: list[str], static_df: pl.DataFrame, meds_df: pl.DataFrame, feature_columns: list[str]
+) -> coo_array:
+    """Merges static and time-series dataframes into a sparse representation based on the patient_id column.

     Args:
-    - feature_columns (List[str]): A list of feature columns to include in the merged dataframe.
-    - static_df (pd.DataFrame): A dataframe containing static features.
-    - ts_df (pd.DataFrame): A dataframe containing time-series features.
+        static_features: A list of static feature names.
+        static_df: A DataFrame containing static features.
+        meds_df: A DataFrame containing time-series features.
+        feature_columns: A list of feature columns to include in the merged DataFrame.

     Returns:
-    - pd.DataFrame: A merged dataframe containing static and time-series features.
+        A sparse array representation of the merged static and time-series features.
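+
+    Example (an illustrative sketch of the underlying idea only, assuming static rows
+    are replicated per event to align with the time-series rows; toy stand-in values,
+    not this function's real MEDS schema):
+        >>> import numpy as np
+        >>> static = np.array([[1, 0], [0, 2]])  # one row per patient
+        >>> events_per_patient = [3, 1]  # events observed per patient
+        >>> np.repeat(static, events_per_patient, axis=0).tolist()
+        [[1, 0], [1, 0], [1, 0], [0, 2]]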
""" # Make static data sparse and merge it with the time-series data logger.info("Make static data sparse and merge it with the time-series data") From 8a8a61830defb7f453682f2c52a32fe4dbd57b67 Mon Sep 17 00:00:00 2001 From: Aleksia Kolo Date: Wed, 12 Jun 2024 13:24:09 -0400 Subject: [PATCH 03/48] Updated docstring of third function --- .../generate_static_features.py | 17 ++++++++--------- 1 file changed, 8 insertions(+), 9 deletions(-) diff --git a/src/MEDS_tabular_automl/generate_static_features.py b/src/MEDS_tabular_automl/generate_static_features.py index b577633..bfd2459 100644 --- a/src/MEDS_tabular_automl/generate_static_features.py +++ b/src/MEDS_tabular_automl/generate_static_features.py @@ -99,18 +99,17 @@ def summarize_static_measurements( ) -> pl.LazyFrame: """Aggregates static measurements for feature columns that are marked as 'present' or 'first'. - Parameters: - - feature_columns (list[str]): List of feature column identifiers that are specifically marked - for staticanalysis. - - df (DF_T): Data frame from which features will be extracted and summarized. - - Returns: - - pl.LazyFrame: A LazyFrame containing the summarized data pivoted by 'patient_id' - for each static feature. - This function first filters for features that need to be recorded as the first occurrence or simply as present, then performs a pivot to reshape the data for each patient, providing a tabular format where each row represents a patient and each column represents a static feature. + + Args: + agg: The type of aggregation ('present' or 'first'). + feature_columns: A list of feature column identifiers marked for static analysis. + df: DataFrame from which features will be extracted and summarized. + + Returns: + A LazyFrame containing summarized data pivoted by 'patient_id' for each static feature. """ if agg == STATIC_VALUE_AGGREGATION: static_features = get_feature_names(agg=agg, feature_columns=feature_columns) From 6b7264ee88b6fb213d555d7c8aa8619621262009 Mon Sep 17 00:00:00 2001 From: Aleksia Kolo Date: Wed, 12 Jun 2024 13:34:07 -0400 Subject: [PATCH 04/48] Updated docstring of fourth function --- .../generate_static_features.py | 21 +++++++++++-------- 1 file changed, 12 insertions(+), 9 deletions(-) diff --git a/src/MEDS_tabular_automl/generate_static_features.py b/src/MEDS_tabular_automl/generate_static_features.py index bfd2459..c48798a 100644 --- a/src/MEDS_tabular_automl/generate_static_features.py +++ b/src/MEDS_tabular_automl/generate_static_features.py @@ -4,7 +4,9 @@ efficient data manipulation. Functions: -- _summarize_static_measurements: Summarizes static measurements from a given DataFrame. +- convert_to_matrix: Converts a Polars DataFrame to a sparse matrix. +- get_sparse_static_rep: Merges static and time-series dataframes into a sparse representation. +- summarize_static_measurements: Summarizes static measurements from a given DataFrame. - get_flat_static_rep: Produces a tabular representation of static data features. """ @@ -170,16 +172,17 @@ def get_flat_static_rep( ) -> coo_array: """Produces a raw representation for static data from a specified shard DataFrame. - Parameters: - - feature_columns (list[str]): List of feature columns to include in the static representation. - - shard_df (DF_T): The shard DataFrame containing patient data. + This function selects the appropriate static features, summarizes them using + summarize_static_measurements, and then normalizes the resulting data to ensure + it is suitable for further analysis or machine learning tasks. 
-    Returns:
-    - pl.LazyFrame: A LazyFrame that includes all static features for the data provided.
+    Args:
+        agg: The aggregation method for static data.
+        feature_columns: A list of feature columns to include.
+        shard_df: The shard DataFrame containing the patient data.

-    This function selects the appropriate static features, summarizes them using
-    _summarize_static_measurements, and then normalizes the resulting data to ensure it is
-    suitable for further analysis or machine learning tasks.
+    Returns:
+        A LazyFrame sparse array representing the static features for the provided shard of data.
     """
     static_features = get_feature_names(agg=agg, feature_columns=feature_columns)
     static_measurements = summarize_static_measurements(agg, static_features, df=shard_df)

From 48f173571a6b5f0973efec87456c8d4dc4c1ed61 Mon Sep 17 00:00:00 2001
From: Aleksia Kolo
Date: Wed, 12 Jun 2024 13:42:58 -0400
Subject: [PATCH 05/48] Added docstring of first function of
 generate_summarized_reps.py

---
 .../generate_summarized_reps.py | 14 +++++++++++++-
 1 file changed, 13 insertions(+), 1 deletion(-)

diff --git a/src/MEDS_tabular_automl/generate_summarized_reps.py b/src/MEDS_tabular_automl/generate_summarized_reps.py
index 55f8ede..b145937 100644
--- a/src/MEDS_tabular_automl/generate_summarized_reps.py
+++ b/src/MEDS_tabular_automl/generate_summarized_reps.py
@@ -16,7 +16,19 @@
 )


-def sparse_aggregate(sparse_matrix, agg):
+def sparse_aggregate(sparse_matrix: sparray, agg: str) -> sparray:
+    """Aggregates values in a sparse matrix according to the specified method.
+
+    Args:
+        sparse_matrix: The sparse matrix to aggregate.
+        agg: The aggregation method to apply, such as 'sum', 'min', 'max', 'sum_sqd', or 'count'.
+
+    Returns:
+        sparray: The aggregated sparse matrix.
+
+    Raises:
+        ValueError: If the aggregation method is not implemented.
+    """
     if agg == "sum":
         merged_matrix = sparse_matrix.sum(axis=0, dtype=sparse_matrix.dtype)
     elif agg == "min":

From 86c529fbcc296ecf9dbbde0f353a4520acce027d Mon Sep 17 00:00:00 2001
From: Aleksia Kolo
Date: Wed, 12 Jun 2024 13:48:42 -0400
Subject: [PATCH 06/48] Updated docstring of second function

---
 src/MEDS_tabular_automl/generate_summarized_reps.py | 12 ++++++++++--
 1 file changed, 10 insertions(+), 2 deletions(-)

diff --git a/src/MEDS_tabular_automl/generate_summarized_reps.py b/src/MEDS_tabular_automl/generate_summarized_reps.py
index b145937..b4e8885 100644
--- a/src/MEDS_tabular_automl/generate_summarized_reps.py
+++ b/src/MEDS_tabular_automl/generate_summarized_reps.py
@@ -44,8 +44,16 @@ def sparse_aggregate(sparse_matrix: sparray, agg: str) -> sparray:
     return merged_matrix


-def get_rolling_window_indicies(index_df, window_size):
-    """Get the indices for the rolling windows."""
+def get_rolling_window_indicies(index_df: pl.DataFrame, window_size: str) -> pl.DataFrame:
+    """Computes the start and end indices for rolling window operations on a DataFrame.
+
+    Args:
+        index_df: The DataFrame containing the indices.
+        window_size: The size of the window as a string denoting time, e.g., '7d' for 7 days.
+
+    Returns:
+        DataFrame with columns 'min_index' and 'max_index' representing the range of each window.
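+
+    Example (illustrative only: the window string is interpreted with pandas
+    timedelta semantics, per the pd.Timedelta usage in the body below):
+        >>> import pandas as pd
+        >>> pd.Timedelta("7d")
+        Timedelta('7 days 00:00:00')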
+ """ if window_size == "full": timedelta = pd.Timedelta(150 * 52, unit="W") # just use 150 years as time delta else: From bfd04fc06b1ed92a82bbc539f04562242572f56d Mon Sep 17 00:00:00 2001 From: Aleksia Kolo Date: Wed, 12 Jun 2024 13:52:55 -0400 Subject: [PATCH 07/48] Updated docstring of third function --- .../generate_summarized_reps.py | 20 +++++++++++++++++-- 1 file changed, 18 insertions(+), 2 deletions(-) diff --git a/src/MEDS_tabular_automl/generate_summarized_reps.py b/src/MEDS_tabular_automl/generate_summarized_reps.py index b4e8885..88956ec 100644 --- a/src/MEDS_tabular_automl/generate_summarized_reps.py +++ b/src/MEDS_tabular_automl/generate_summarized_reps.py @@ -67,8 +67,24 @@ def get_rolling_window_indicies(index_df: pl.DataFrame, window_size: str) -> pl. ) -def aggregate_matrix(windows, matrix, agg, num_features, use_tqdm=False): - """Aggregate the matrix based on the windows.""" +def aggregate_matrix( + windows: pl.DataFrame, matrix: sparray, agg: str, num_features: int, use_tqdm: bool = False +) -> csr_array: + """Aggregates a matrix according to defined windows and specified aggregation method. + + Args: + windows: DataFrame containing 'min_index' and 'max_index' for each window. + matrix: The matrix to aggregate. + agg: The aggregation method to apply. + num_features: Number of features in the matrix. + use_tqdm: Flag to enable progress display. + + Returns: + Aggregated sparse matrix. + + Raises: + TypeError: If the type of the aggregated matrix is not compatible for further operations. + """ tqdm = load_tqdm(use_tqdm) agg = agg.split("/")[-1] matrix = csr_array(matrix) From 8509f4f22fb8ec8673175e641dd5bbbd017e484f Mon Sep 17 00:00:00 2001 From: Aleksia Kolo Date: Wed, 12 Jun 2024 13:58:34 -0400 Subject: [PATCH 08/48] Updated docstring of fourth function --- .../generate_summarized_reps.py | 24 ++++++++++++++++--- 1 file changed, 21 insertions(+), 3 deletions(-) diff --git a/src/MEDS_tabular_automl/generate_summarized_reps.py b/src/MEDS_tabular_automl/generate_summarized_reps.py index 88956ec..e506c78 100644 --- a/src/MEDS_tabular_automl/generate_summarized_reps.py +++ b/src/MEDS_tabular_automl/generate_summarized_reps.py @@ -122,12 +122,30 @@ def aggregate_matrix( return out_matrix -def compute_agg(index_df, matrix: sparray, window_size: str, agg: str, num_features: int, use_tqdm=False): - """Applies aggreagtion to dataframe. +def compute_agg( + index_df: pl.DataFrame, + matrix: sparray, + window_size: str, + agg: str, + num_features: int, + use_tqdm: bool = False, +) -> sparray: + """Applies aggregation to a sparse matrix using rolling window indices derived from a DataFrame. - Dataframe is expected to only have the relevant columns for aggregating It should have the patient_id and + Dataframe is expected to only have the relevant columns for aggregating. It should have the patient_id and timestamp columns, and then only code columns if agg is a code aggregation or only value columns if it is a value aggreagation. + + Args: + index_df: DataFrame with 'patient_id' and 'timestamp' columns used for grouping. + matrix: Sparse matrix to be aggregated. + window_size: String defining the rolling window size. + agg: String specifying the aggregation method. + num_features: Number of features in the matrix. + use_tqdm: Flag to enable or disable tqdm progress bar. + + Returns: + The aggregated sparse matrix. 
""" group_df = ( index_df.with_row_index("index") From a198954bee5ec8ef18b05cf96e644bf762434d13 Mon Sep 17 00:00:00 2001 From: Aleksia Kolo Date: Wed, 12 Jun 2024 14:12:21 -0400 Subject: [PATCH 09/48] Updated docstring of fifth function --- .../generate_summarized_reps.py | 43 +++++++++++-------- 1 file changed, 24 insertions(+), 19 deletions(-) diff --git a/src/MEDS_tabular_automl/generate_summarized_reps.py b/src/MEDS_tabular_automl/generate_summarized_reps.py index e506c78..1336c2e 100644 --- a/src/MEDS_tabular_automl/generate_summarized_reps.py +++ b/src/MEDS_tabular_automl/generate_summarized_reps.py @@ -24,7 +24,7 @@ def sparse_aggregate(sparse_matrix: sparray, agg: str) -> sparray: agg: The aggregation method to apply, such as 'sum', 'min', 'max', 'sum_sqd', or 'count'. Returns: - sparray: The aggregated sparse matrix. + The aggregated sparse matrix. Raises: ValueError: If the aggregation method is not implemented. @@ -73,11 +73,11 @@ def aggregate_matrix( """Aggregates a matrix according to defined windows and specified aggregation method. Args: - windows: DataFrame containing 'min_index' and 'max_index' for each window. + windows: The DataFrame containing 'min_index' and 'max_index' for each window. matrix: The matrix to aggregate. agg: The aggregation method to apply. - num_features: Number of features in the matrix. - use_tqdm: Flag to enable progress display. + num_features: The number of features in the matrix. + use_tqdm: The flag to enable progress display. Returns: Aggregated sparse matrix. @@ -129,7 +129,7 @@ def compute_agg( agg: str, num_features: int, use_tqdm: bool = False, -) -> sparray: +) -> csr_array: """Applies aggregation to a sparse matrix using rolling window indices derived from a DataFrame. Dataframe is expected to only have the relevant columns for aggregating. It should have the patient_id and @@ -137,12 +137,12 @@ def compute_agg( a value aggreagation. Args: - index_df: DataFrame with 'patient_id' and 'timestamp' columns used for grouping. - matrix: Sparse matrix to be aggregated. - window_size: String defining the rolling window size. - agg: String specifying the aggregation method. - num_features: Number of features in the matrix. - use_tqdm: Flag to enable or disable tqdm progress bar. + index_df: The DataFrame with 'patient_id' and 'timestamp' columns used for grouping. + matrix: The sparse matrix to be aggregated. + window_size: The string defining the rolling window size. + agg: The string specifying the aggregation method. + num_features: The number of features in the matrix. + use_tqdm: The flag to enable or disable tqdm progress bar. Returns: The aggregated sparse matrix. @@ -165,23 +165,28 @@ def compute_agg( def _generate_summary( - ts_columns: list[str], index_df: pd.DataFrame, matrix: sparray, window_size: str, agg: str, - num_features, - use_tqdm=False, -) -> pl.LazyFrame: + num_features: int, + use_tqdm: bool = False, +) -> csr_array: """Generate a summary of the data frame for a given window size and aggregation. Args: - - df (DF_T): The data frame to summarize. - - window_size (str): The window size to use for the summary. - - agg (str): The aggregation to apply to the data frame. + index_df: The DataFrame with index and grouping information. + matrix: The sparse matrix containing the data to aggregate. + window_size: The size of the rolling window used for summary. + agg: The aggregation function to apply. + num_features: The total number of features to handle. + use_tqdm: The flag to enable or disable progress display. 
     Returns:
-    - pl.LazyFrame: The summarized data frame.
+        The summary of data as a sparse matrix.
+
+    Raises:
+        ValueError: If the aggregation type is not supported.
     """
     if agg not in CODE_AGGREGATIONS + VALUE_AGGREGATIONS:
         raise ValueError(
From 27f596f99157155610dd14a700d175413a5aef14 Mon Sep 17 00:00:00 2001
From: Aleksia Kolo
Date: Wed, 12 Jun 2024 15:12:01 -0400
Subject: [PATCH 14/48] removed pl.LazyFrame alias DF_T inconsistency

---
 src/MEDS_tabular_automl/generate_static_features.py | 11 +++++------
 1 file changed, 5 insertions(+), 6 deletions(-)

diff --git a/src/MEDS_tabular_automl/generate_static_features.py b/src/MEDS_tabular_automl/generate_static_features.py
index c48798a..5ad4b30 100644
--- a/src/MEDS_tabular_automl/generate_static_features.py
+++ b/src/MEDS_tabular_automl/generate_static_features.py
@@ -16,7 +16,6 @@
 from scipy.sparse import coo_array, csr_array

 from MEDS_tabular_automl.utils import (
-    DF_T,
     STATIC_CODE_AGGREGATION,
     STATIC_VALUE_AGGREGATION,
     get_events_df,
@@ -97,7 +96,7 @@ def get_sparse_static_rep(
 def summarize_static_measurements(
     agg: str,
     feature_columns: list[str],
-    df: DF_T,
+    df: pl.LazyFrame,
 ) -> pl.LazyFrame:
     """Aggregates static measurements for feature columns that are marked as 'present' or 'first'.
@@ -108,7 +107,7 @@ def summarize_static_measurements(
     Args:
         agg: The type of aggregation ('present' or 'first').
         feature_columns: A list of feature column identifiers marked for static analysis.
-        df: DataFrame from which features will be extracted and summarized.
+        df: The DataFrame from which features will be extracted and summarized.

     Returns:
         A LazyFrame containing summarized data pivoted by 'patient_id' for each static feature.
@@ -168,9 +167,9 @@ def summarize_static_measurements(
 def get_flat_static_rep(
     agg: str,
     feature_columns: list[str],
-    shard_df: DF_T,
+    shard_df: pl.LazyFrame,
 ) -> coo_array:
-    """Produces a raw representation for static data from a specified shard DataFrame.
+    """Produces a sparse representation for static data from a specified shard DataFrame.

     This function selects the appropriate static features, summarizes them using
     summarize_static_measurements, and then normalizes the resulting data to ensure
     it is suitable for further analysis or machine learning tasks.
@@ -182,7 +181,7 @@ def get_flat_static_rep(
         shard_df: The shard DataFrame containing the patient data.

     Returns:
-        A LazyFrame sparse array representing the static features for the provided shard of data.
+        A sparse array representing the static features for the provided shard of data.
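+
+    Example (a toy illustration of the returned COO layout, where rows index events
+    and columns index static features; the triplet values here are hypothetical):
+        >>> from scipy.sparse import coo_array
+        >>> coo_array(([1, 1], ([0, 2], [0, 1])), shape=(3, 2)).toarray()
+        array([[1, 0],
+               [0, 0],
+               [0, 1]])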
""" static_features = get_feature_names(agg=agg, feature_columns=feature_columns) static_measurements = summarize_static_measurements(agg, static_features, df=shard_df) From 042557c6858738764e83ec33dd3d739fa2b931c8 Mon Sep 17 00:00:00 2001 From: Aleksia Kolo Date: Wed, 12 Jun 2024 15:52:26 -0400 Subject: [PATCH 15/48] updated docstrings and merged two generate_summary functions --- .../generate_summarized_reps.py | 32 ++----------------- 1 file changed, 3 insertions(+), 29 deletions(-) diff --git a/src/MEDS_tabular_automl/generate_summarized_reps.py b/src/MEDS_tabular_automl/generate_summarized_reps.py index 1336c2e..c86a2d0 100644 --- a/src/MEDS_tabular_automl/generate_summarized_reps.py +++ b/src/MEDS_tabular_automl/generate_summarized_reps.py @@ -164,17 +164,17 @@ def compute_agg( return matrix -def _generate_summary( +def generate_summary( index_df: pd.DataFrame, matrix: sparray, window_size: str, agg: str, - num_features: int, use_tqdm: bool = False, ) -> csr_array: """Generate a summary of the data frame for a given window size and aggregation. Args: + feature_columns: A list of all feature columns that must exist in the final output. index_df: The DataFrame with index and grouping information. matrix: The sparse matrix containing the data to aggregate. window_size: The size of the rolling window used for summary. @@ -192,30 +192,6 @@ def _generate_summary( raise ValueError( f"Invalid aggregation: {agg}. Valid options are: {CODE_AGGREGATIONS + VALUE_AGGREGATIONS}" ) - out_matrix = compute_agg(index_df, matrix, window_size, agg, num_features, use_tqdm=use_tqdm) - return out_matrix - - -def generate_summary( - feature_columns: list[str], index_df: pl.LazyFrame, matrix: sparray, window_size, agg: str, use_tqdm=False -) -> pl.LazyFrame: - """Generate a summary of the data frame for given window sizes and aggregations. - - This function processes a dataframe to apply specified aggregations over defined window sizes. - It then joins the resulting frames on 'patient_id' and 'timestamp', and ensures all specified - feature columns exist in the final output, adding missing ones with default values. - - Args: - feature_columns (list[str]): List of all feature columns that must exist in the final output. - df (list[pl.LazyFrame]): The input dataframes to process, expected to be length 2 list with code_df - (pivoted shard with binary presence of codes) and value_df (pivoted shard with numerical values - for each code). - window_sizes (list[str]): List of window sizes to apply for summarization. - aggregations (list[str]): List of aggregations to perform within each window size. - - Returns: - pl.LazyFrame: A LazyFrame containing the summarized data with all required features present. - """ assert len(feature_columns), "feature_columns must be a non-empty list" ts_columns = get_feature_names(agg, feature_columns) # Generate summaries for each window size and aggregation @@ -225,9 +201,7 @@ def generate_summary( logger.info( f"Generating aggregation {agg} for window_size {window_size}, with {len(ts_columns)} columns." 
) - out_matrix = _generate_summary( - ts_columns, index_df, matrix, window_size, agg, len(ts_columns), use_tqdm=use_tqdm - ) + out_matrix = compute_agg(index_df, matrix, window_size, agg, len(ts_columns), use_tqdm=use_tqdm) return out_matrix From c5b4b20ab7c9bc2f490b97074a8685b46d156140 Mon Sep 17 00:00:00 2001 From: Aleksia Kolo Date: Wed, 12 Jun 2024 15:55:28 -0400 Subject: [PATCH 16/48] readability spacing --- src/MEDS_tabular_automl/generate_summarized_reps.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/MEDS_tabular_automl/generate_summarized_reps.py b/src/MEDS_tabular_automl/generate_summarized_reps.py index c86a2d0..f92471b 100644 --- a/src/MEDS_tabular_automl/generate_summarized_reps.py +++ b/src/MEDS_tabular_automl/generate_summarized_reps.py @@ -193,6 +193,7 @@ def generate_summary( f"Invalid aggregation: {agg}. Valid options are: {CODE_AGGREGATIONS + VALUE_AGGREGATIONS}" ) assert len(feature_columns), "feature_columns must be a non-empty list" + ts_columns = get_feature_names(agg, feature_columns) # Generate summaries for each window size and aggregation code_type, _ = agg.split("/") @@ -201,6 +202,7 @@ def generate_summary( logger.info( f"Generating aggregation {agg} for window_size {window_size}, with {len(ts_columns)} columns." ) + out_matrix = compute_agg(index_df, matrix, window_size, agg, len(ts_columns), use_tqdm=use_tqdm) return out_matrix From 6118a8a31ee4ef5d953881ba263c4adb21ff3013 Mon Sep 17 00:00:00 2001 From: Aleksia Kolo Date: Wed, 12 Jun 2024 18:31:04 -0400 Subject: [PATCH 17/48] addressed PR comments --- .../generate_summarized_reps.py | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/src/MEDS_tabular_automl/generate_summarized_reps.py b/src/MEDS_tabular_automl/generate_summarized_reps.py index f92471b..c2cc624 100644 --- a/src/MEDS_tabular_automl/generate_summarized_reps.py +++ b/src/MEDS_tabular_automl/generate_summarized_reps.py @@ -16,7 +16,7 @@ ) -def sparse_aggregate(sparse_matrix: sparray, agg: str) -> sparray: +def sparse_aggregate(sparse_matrix: sparray, agg: str) -> csr_array: """Aggregates values in a sparse matrix according to the specified method. Args: @@ -44,15 +44,15 @@ def sparse_aggregate(sparse_matrix: sparray, agg: str) -> sparray: return merged_matrix -def get_rolling_window_indicies(index_df: pl.DataFrame, window_size: str) -> pl.DataFrame: - """Computes the start and end indices for rolling window operations on a DataFrame. +def get_rolling_window_indicies(index_df: pl.LazyFrame, window_size: str) -> pl.LazyFrame: + """Computes the start and end indices for rolling window operations on a LazyFrame. Args: - index_df: The DataFrame containing the indices. + index_df: The LazyFrame containing the indices. window_size: The size of the window as a string denoting time, e.g., '7d' for 7 days. Returns: - DataFrame with columns 'min_index' and 'max_index' representing the range of each window. + A LazyFrame with columns 'min_index' and 'max_index' representing the range of each window. """ if window_size == "full": timedelta = pd.Timedelta(150 * 52, unit="W") # just use 150 years as time delta @@ -68,12 +68,12 @@ def get_rolling_window_indicies(index_df: pl.DataFrame, window_size: str) -> pl. def aggregate_matrix( - windows: pl.DataFrame, matrix: sparray, agg: str, num_features: int, use_tqdm: bool = False + windows: pl.LazyFrame, matrix: sparray, agg: str, num_features: int, use_tqdm: bool = False ) -> csr_array: """Aggregates a matrix according to defined windows and specified aggregation method. 
Args: - windows: The DataFrame containing 'min_index' and 'max_index' for each window. + windows: The LazyFrame containing 'min_index' and 'max_index' for each window. matrix: The matrix to aggregate. agg: The aggregation method to apply. num_features: The number of features in the matrix. @@ -123,7 +123,7 @@ def aggregate_matrix( def compute_agg( - index_df: pl.DataFrame, + index_df: pl.LazyFrame, matrix: sparray, window_size: str, agg: str, @@ -165,7 +165,7 @@ def compute_agg( def generate_summary( - index_df: pd.DataFrame, + index_df: pl.LazyFrame, matrix: sparray, window_size: str, agg: str, From 7ac3079ff13d3ff019291d7052a266c4ee443aea Mon Sep 17 00:00:00 2001 From: Matthew McDermott Date: Wed, 12 Jun 2024 19:57:56 -0400 Subject: [PATCH 18/48] Added some doctests --- src/MEDS_tabular_automl/describe_codes.py | 75 +++++++++++++++-------- 1 file changed, 48 insertions(+), 27 deletions(-) diff --git a/src/MEDS_tabular_automl/describe_codes.py b/src/MEDS_tabular_automl/describe_codes.py index 40ff0ac..968ecd1 100644 --- a/src/MEDS_tabular_automl/describe_codes.py +++ b/src/MEDS_tabular_automl/describe_codes.py @@ -14,10 +14,58 @@ def convert_to_df(freq_dict: dict[str, int]) -> pl.DataFrame: Returns: A DataFrame with two columns, "code" and "count". + + TODOs: + - Eliminate this function and just use a DataFrame throughout. See #14 + - Use categorical types for `code` instead of strings. + + Examples: + >>> convert_to_df({"A": 1, "B": 2, "C": 3}) + shape: (3, 2) + ┌──────┬───────┐ + │ code ┆ count │ + │ --- ┆ --- │ + │ str ┆ i64 │ + ╞══════╪═══════╡ + │ A ┆ 1 │ + │ B ┆ 2 │ + │ C ┆ 3 │ + └──────┴───────┘ """ return pl.DataFrame([[col, freq] for col, freq in freq_dict.items()], schema=["code", "count"]) +def convert_to_freq_dict(df: pl.LazyFrame) -> dict[str, dict[int, int]]: + """Converts a DataFrame to a dictionary of frequencies. + + Args: + df: The DataFrame to be converted. + + Returns: + A dictionary where keys are column names and values are + dictionaries of code frequencies. + + Raises: + ValueError: If the DataFrame does not have the expected columns "code" and "count". + + TODOs: + - Eliminate this function and just use a DataFrame throughout. See #14 + + Example: + >>> import polars as pl + >>> data = pl.DataFrame({"code": [1, 2, 3, 4, 5], "count": [10, 20, 30, 40, 50]}).lazy() + >>> convert_to_freq_dict(data) + {1: 10, 2: 20, 3: 30, 4: 40, 5: 50} + >>> convert_to_freq_dict(pl.DataFrame({"code": ["A", "B", "C"], "value": [1, 2, 3]}).lazy()) + Traceback (most recent call last): + ... + ValueError: DataFrame must have columns 'code' and 'count', but has columns ['code', 'value']! + """ + if not df.columns == ["code", "count"]: + raise ValueError(f"DataFrame must have columns 'code' and 'count', but has columns {df.columns}!") + return dict(df.collect().iter_rows()) + + def compute_feature_frequencies(cfg: DictConfig, shard_df: DF_T) -> pl.DataFrame: """Generates a DataFrame containing the frequencies of codes and numerical values under different aggregations by computing frequency counts for certain attributes and organizing the results into specific @@ -71,33 +119,6 @@ def compute_feature_frequencies(cfg: DictConfig, shard_df: DF_T) -> pl.DataFrame return convert_to_df(combined_freqs) -def convert_to_freq_dict(df: pl.LazyFrame) -> dict[str, dict[int, int]]: - """Converts a DataFrame to a dictionary of frequencies. - - Args: - df: The DataFrame to be converted. - - Returns: - A dictionary where keys are column names and values are - dictionaries of code frequencies. 
- - Raises: - ValueError: If the DataFrame does not have the expected columns "code" and "count". - - Example: - # >>> import polars as pl - # >>> df = pl.DataFrame({ - # ... "code": [1, 2, 3, 4, 5], - # ... "value": [10, 20, 30, 40, 50] - # ... }) - # >>> convert_to_freq_dict(df) - # {'code': {1: 1, 2: 1, 3: 1, 4: 1, 5: 1}, 'value': {10: 1, 20: 1, 30: 1, 40: 1, 50: 1}} - """ - if not df.columns == ["code", "count"]: - raise ValueError(f"DataFrame must have columns 'code' and 'count', but has columns {df.columns}!") - return dict(df.collect().iter_rows()) - - def get_feature_columns(fp: Path) -> list[str]: """Retrieves feature column names from a parquet file. From 5836cc9fa571902960c25de4e25ceb1be80a0485 Mon Sep 17 00:00:00 2001 From: Aleksia Kolo Date: Wed, 12 Jun 2024 20:44:04 -0400 Subject: [PATCH 19/48] Added first three docstrings for generate_ts_features --- .../generate_ts_features.py | 39 ++++++++++++++++--- 1 file changed, 34 insertions(+), 5 deletions(-) diff --git a/src/MEDS_tabular_automl/generate_ts_features.py b/src/MEDS_tabular_automl/generate_ts_features.py index 4a8d9dc..0d999aa 100644 --- a/src/MEDS_tabular_automl/generate_ts_features.py +++ b/src/MEDS_tabular_automl/generate_ts_features.py @@ -18,12 +18,30 @@ def feature_name_to_code(feature_name: str) -> str: - """Converts a feature name to a code name.""" + """Converts a feature name to a code name by removing the aggregation part. + + Args: + feature_name (str): The full feature name, including aggregation. + + Returns: + The code name without the aggregation part. + """ return "/".join(feature_name.split("/")[:-1]) -def get_long_code_df(df, ts_columns): - """Pivots the codes data frame to a long format one-hot rep for time series data.""" +def get_long_code_df( + df: pl.LazyFrame, ts_columns: list[str] +) -> tuple[np.ndarray, tuple[np.ndarray, np.ndarray]]: + """Pivots the codes data frame to a long format one-hot representation for time-series data. + + Args: + df: The LazyFrame containing the code data. + ts_columns: The list of time-series columns to include in the output. + + Returns: + A tuple containing the data (1s for presence), and a tuple of row and column indices for + the CSR sparse matrix. + """ column_to_int = {feature_name_to_code(col): i for i, col in enumerate(ts_columns)} rows = range(df.select(pl.len()).collect().item()) cols = ( @@ -38,8 +56,19 @@ def get_long_code_df(df, ts_columns): return data, (rows, cols) -def get_long_value_df(df, ts_columns): - """Pivots the numerical value data frame to a long format for time series data.""" +def get_long_value_df( + df: pl.LazyFrame, ts_columns: list[str] +) -> tuple[np.ndarray, tuple[np.ndarray, np.ndarray]]: + """Pivots the numerical value data frame to a long format for time-series data. + + Args: + df: The LazyFrame containing the numerical value data. + ts_columns: The list of time-series columns that have numerical values. + + Returns: + A tuple containing the data (numerical values), and a tuple of row and column indices for + the CSR sparse matrix. 
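+
+    Example (a small sketch of how such (data, (rows, cols)) triplets assemble into
+    a one-hot CSR matrix; the triplets below are hypothetical):
+        >>> import numpy as np
+        >>> from scipy.sparse import csr_array
+        >>> data, rows, cols = np.ones(3), [0, 1, 2], [1, 0, 1]
+        >>> csr_array((data, (rows, cols)), shape=(3, 2)).toarray()
+        array([[0., 1.],
+               [1., 0.],
+               [0., 1.]])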
+ """ column_to_int = {feature_name_to_code(col): i for i, col in enumerate(ts_columns)} value_df = ( df.with_row_index("index").drop_nulls("numerical_value").filter(pl.col("code").is_in(ts_columns)) From 33089a5155a826165da956f7ac02165f1a6d0ca6 Mon Sep 17 00:00:00 2001 From: Aleksia Kolo Date: Wed, 12 Jun 2024 20:52:45 -0400 Subject: [PATCH 20/48] docstrings for other two functions --- .../generate_ts_features.py | 39 ++++++++----------- 1 file changed, 16 insertions(+), 23 deletions(-) diff --git a/src/MEDS_tabular_automl/generate_ts_features.py b/src/MEDS_tabular_automl/generate_ts_features.py index 0d999aa..d6057d2 100644 --- a/src/MEDS_tabular_automl/generate_ts_features.py +++ b/src/MEDS_tabular_automl/generate_ts_features.py @@ -1,14 +1,12 @@ import warnings import numpy as np -import pandas as pd import polars as pl from loguru import logger from scipy.sparse import csr_array from MEDS_tabular_automl.utils import ( CODE_AGGREGATIONS, - DF_T, VALUE_AGGREGATIONS, get_events_df, get_feature_names, @@ -89,17 +87,18 @@ def get_long_value_df( def summarize_dynamic_measurements( agg: str, ts_columns: list[str], - df: pd.DataFrame, -) -> pd.DataFrame: - """Summarize dynamic measurements for feature columns that are marked as 'dynamic'. + df: pl.LazyFrame, +) -> tuple[pl.DataFrame, csr_array]: + """Summarizes dynamic measurements for feature columns that are marked as 'dynamic'. Args: - - ts_columns (list[str]): List of feature column identifiers that are specifically marked for dynamic - analysis. - - shard_df (DF_T): Data frame from which features will be extracted and summarized. + agg: The aggregation method, either from CODE_AGGREGATIONS or VALUE_AGGREGATIONS. + ts_columns: The list of time-series feature columns. + df: The LazyFrame from which features will be extracted and summarized. Returns: - - pl.LazyFrame: A summarized data frame containing the dynamic features. + A tuple containing a DataFrame with dynamic feature identifiers and a sparse matrix + of aggregated values. """ logger.info("Generating Sparse matrix for Time Series Features") id_cols = ["patient_id", "timestamp"] @@ -126,24 +125,18 @@ def summarize_dynamic_measurements( def get_flat_ts_rep( agg: str, feature_columns: list[str], - shard_df: DF_T, -) -> pl.LazyFrame: - """Produce a flat time series representation from a given data frame, focusing on non-static feature - columns. - - This function filters the given data frame for non-static features based on the 'feature_columns' - provided and generates a flat time series representation using these dynamic features. The resulting - data frame includes both codes and values transformed and aggregated appropriately. + shard_df: pl.LazyFrame, +) -> tuple[pl.DataFrame, csr_array]: + """Produces a flat time-series representation from a given data frame, focusing on non-static features. Args: - feature_columns (list[str]): A list of column identifiers that determine which features are considered - for dynamic analysis. - shard_df (DF_T): The data frame containing time-stamped data from which features will be extracted - and summarized. + agg: The aggregation method to use for summarizing the data. + feature_columns: The list of column identifiers for features involved in dynamic analysis. + shard_df: The LazyFrame containing time-stamped data from which features will be extracted. Returns: - pl.LazyFrame: A LazyFrame consisting of the processed time series data, combining both code and value - representations. 
+        A tuple containing a DataFrame consisting of the processed time series data, combining
+        both code and value representations, and a sparse matrix of the flat time series data.
     """
     # Remove codes not in training set
     shard_df = get_events_df(shard_df, feature_columns)

From 8de852ffeee48dc42e6640eddd460d84fc2330b0 Mon Sep 17 00:00:00 2001
From: Nassim Oufattole
Date: Thu, 13 Jun 2024 00:54:08 +0000
Subject: [PATCH 21/48] added unit tests

---
 src/MEDS_tabular_automl/describe_codes.py | 11 ++---------
 src/MEDS_tabular_automl/generate_summarized_reps.py | 8 +++++---
 tests/test_tabularize.py | 1 -
 3 files changed, 7 insertions(+), 13 deletions(-)

diff --git a/src/MEDS_tabular_automl/describe_codes.py b/src/MEDS_tabular_automl/describe_codes.py
index 40ff0ac..ae28c5a 100644
--- a/src/MEDS_tabular_automl/describe_codes.py
+++ b/src/MEDS_tabular_automl/describe_codes.py
@@ -1,7 +1,7 @@
 from pathlib import Path

 import polars as pl
-from omegaconf import DictConfig
+from omegaconf import DictConfig, OmegaConf

 from MEDS_tabular_automl.utils import DF_T, get_feature_names

@@ -150,15 +150,8 @@ def filter_to_codes(
     ]
     return sorted(filtered_codes)

-    # code_freqs = {
-    # code: freq
-    # for code, freq in feature_freqs.items()
-    # if (freq >= min_code_inclusion_frequency and code in set(allowed_codes))
-    # }
-    # return sorted([code for code, freq in code_freqs.items() if freq >= min_code_inclusion_frequency])

-# OmegaConf.register_new_resolver("filter_to_codes", filter_to_codes)
+OmegaConf.register_new_resolver("filter_to_codes", filter_to_codes)


 def clear_code_aggregation_suffix(code: str) -> str:
diff --git a/src/MEDS_tabular_automl/generate_summarized_reps.py b/src/MEDS_tabular_automl/generate_summarized_reps.py
index c2cc624..0745440 100644
--- a/src/MEDS_tabular_automl/generate_summarized_reps.py
+++ b/src/MEDS_tabular_automl/generate_summarized_reps.py
@@ -112,9 +112,10 @@ def aggregate_matrix(
     row = np.concatenate(row)
     data = np.concatenate(data)
     col = np.concatenate(col)
-    row = row.astype(get_min_dtype(row), copy=False)
-    col = col.astype(get_min_dtype(col), copy=False)
-    data = data.astype(get_min_dtype(data), copy=False)
+    if len(data):
+        row = row.astype(get_min_dtype(row), copy=False)
+        col = col.astype(get_min_dtype(col), copy=False)
+        data = data.astype(get_min_dtype(data), copy=False)
     out_matrix = csr_array(
         (data, (row, col)),
         shape=(windows.shape[0], num_features),
@@ -165,6 +166,7 @@ def compute_agg(


 def generate_summary(
+    feature_columns: list[str],
     index_df: pl.LazyFrame,
     matrix: sparray,
     window_size: str,
diff --git a/tests/test_tabularize.py b/tests/test_tabularize.py
index 953f7a6..e67e2e1 100644
--- a/tests/test_tabularize.py
+++ b/tests/test_tabularize.py
@@ -198,7 +198,6 @@ def test_tabularize():
-    return  # Skip this test for now
     with tempfile.TemporaryDirectory() as d:
         MEDS_cohort_dir = Path(d) / "processed"

From 0aa2e766a7b19784beca1379378506ba1fe7b4aa Mon Sep 17 00:00:00 2001
From: Nassim Oufattole
Date: Thu, 13 Jun 2024 00:58:55 +0000
Subject: [PATCH 22/48] integration tests added

---
 tests/test_integration.py | 1 -
 1 file changed, 1 deletion(-)

diff --git a/tests/test_integration.py b/tests/test_integration.py
index eeb1764..07851f5 100644
--- a/tests/test_integration.py
+++ b/tests/test_integration.py
@@ -44,7 +44,6 @@ def run_command(script: str, args: list[str], hydra_kwargs: dict[str, str], test
 def test_integration():
-    return  # Skip this test for now
     # Step 0: Setup Environment
     with tempfile.TemporaryDirectory() as d:
         MEDS_cohort_dir = Path(d) / "processed"

From
4e2242863c2f137f30942f84c9cf6dbe4872d1ac Mon Sep 17 00:00:00 2001 From: Aleksia Kolo Date: Wed, 12 Jun 2024 21:18:38 -0400 Subject: [PATCH 23/48] Added docstrings for scripts/cache_task.py --- src/MEDS_tabular_automl/scripts/cache_task.py | 28 +++++++++++++++---- 1 file changed, 22 insertions(+), 6 deletions(-) diff --git a/src/MEDS_tabular_automl/scripts/cache_task.py b/src/MEDS_tabular_automl/scripts/cache_task.py index 581d905..5550058 100644 --- a/src/MEDS_tabular_automl/scripts/cache_task.py +++ b/src/MEDS_tabular_automl/scripts/cache_task.py @@ -37,8 +37,19 @@ ] -def generate_row_cached_matrix(matrix, label_df): - """Generates row-cached matrix for a given matrix and label_df.""" +def generate_row_cached_matrix(matrix: sp.coo_array, label_df: pl.LazyFrame) -> sp.coo_array: + """Generates row-cached matrix for a given matrix and label DataFrame. + + Args: + matrix: The input sparse matrix. + label_df: A LazyFrame with an 'event_id' column indicating valid row indices in the matrix. + + Returns: + A COOrdinate formatted sparse matrix containing only the rows specified by label_df's event_ids. + + Raises: + ValueError: If the maximum event_id in label_df exceeds the number of rows in the matrix. + """ label_len = label_df.select(pl.col("event_id").max()).collect().item() if matrix.shape[0] <= label_len: raise ValueError( @@ -51,10 +62,15 @@ def generate_row_cached_matrix(matrix, label_df): @hydra.main(version_base=None, config_path=str(config_yaml.parent.resolve()), config_name=config_yaml.stem) -def main( - cfg: DictConfig, -): - """Performs row splicing of tabularized data for a specific task.""" +def main(cfg: DictConfig) -> None: + """Performs row splicing of tabularized data for a specific task based on configuration. + + Uses Hydra to manage configurations and logging. The function processes data files based on specified + task configurations, loading matrices, applying transformations, and writing results. + + Args: + cfg: The configuration for processing, loaded from a YAML file. + """ iter_wrapper = load_tqdm(cfg.tqdm) if not cfg.loguru_init: hydra_loguru_init() From 65fc0c6341889bac4edef6441664c061ec5bf523 Mon Sep 17 00:00:00 2001 From: Matthew McDermott Date: Wed, 12 Jun 2024 21:29:45 -0400 Subject: [PATCH 24/48] Removed unused function parameter and added doctest --- src/MEDS_tabular_automl/describe_codes.py | 33 ++++++++++++------- .../scripts/describe_codes.py | 5 +-- 2 files changed, 23 insertions(+), 15 deletions(-) diff --git a/src/MEDS_tabular_automl/describe_codes.py b/src/MEDS_tabular_automl/describe_codes.py index 968ecd1..b9a33ab 100644 --- a/src/MEDS_tabular_automl/describe_codes.py +++ b/src/MEDS_tabular_automl/describe_codes.py @@ -1,7 +1,6 @@ from pathlib import Path import polars as pl -from omegaconf import DictConfig from MEDS_tabular_automl.utils import DF_T, get_feature_names @@ -66,13 +65,12 @@ def convert_to_freq_dict(df: pl.LazyFrame) -> dict[str, dict[int, int]]: return dict(df.collect().iter_rows()) -def compute_feature_frequencies(cfg: DictConfig, shard_df: DF_T) -> pl.DataFrame: +def compute_feature_frequencies(shard_df: DF_T) -> pl.DataFrame: """Generates a DataFrame containing the frequencies of codes and numerical values under different aggregations by computing frequency counts for certain attributes and organizing the results into specific categories based on the dataset's features. Args: - cfg: Configuration dictionary specifying how features should be evaluated and aggregated. 
shard_df: A DataFrame containing the data to be analyzed and split (e.g., 'train', 'test'). Returns: @@ -80,14 +78,27 @@ def compute_feature_frequencies(cfg: DictConfig, shard_df: DF_T) -> pl.DataFrame during the evaluation. Examples: - # >>> import polars as pl - # >>> data = {'code': ['A', 'A', 'B', 'B', 'C', 'C', 'C'], - # ... 'timestamp': [None, '2021-01-01', None, None, '2021-01-03', '2021-01-04', None], - # ... 'numerical_value': [1, None, 2, 2, None, None, 3]} - # >>> df = pl.DataFrame(data).lazy() - # >>> aggs = ['value/sum', 'code/count'] - # >>> compute_feature_frequencies(aggs, df) - # ['A/code', 'A/value', 'C/code', 'C/value'] + >>> from datetime import datetime + >>> data = pl.DataFrame({ + ... 'patient_id': [1, 1, 2, 2, 3, 3, 3], + ... 'code': ['A', 'A', 'B', 'B', 'C', 'C', 'C'], + ... 'timestamp': [ + ... None, + ... datetime(2021, 1, 1), + ... None, + ... None, + ... datetime(2021, 1, 3), + ... datetime(2021, 1, 4), + ... None + ... ], + ... 'numerical_value': [1, None, 2, 2, None, None, 3] + ... }).lazy() + >>> assert ( + ... convert_to_freq_dict(compute_feature_frequencies(data).lazy()) == { + ... 'B/static/present': 2, 'C/static/present': 1, 'A/static/present': 1, 'B/static/first': 2, + ... 'C/static/first': 1, 'A/static/first': 1, 'A/code': 1, 'C/code': 2 + ... } + ... ) """ static_df = shard_df.filter( pl.col("patient_id").is_not_null() & pl.col("code").is_not_null() & pl.col("timestamp").is_null() diff --git a/src/MEDS_tabular_automl/scripts/describe_codes.py b/src/MEDS_tabular_automl/scripts/describe_codes.py index 034244a..7d53f6c 100644 --- a/src/MEDS_tabular_automl/scripts/describe_codes.py +++ b/src/MEDS_tabular_automl/scripts/describe_codes.py @@ -55,9 +55,6 @@ def main( # 0. Identify Output Columns and Frequencies logger.info("Iterating through shards and caching feature frequencies.") - def compute_fn(shard_df): - return compute_feature_frequencies(cfg, shard_df) - def write_fn(df, out_fp): write_df(df, out_fp) @@ -76,7 +73,7 @@ def read_fn(in_fp): out_fp, read_fn, write_fn, - compute_fn, + compute_feature_frequencies, do_overwrite=cfg.do_overwrite, do_return=False, ) From 9226125fa9dfc8bfa9d1818e322f5da5152a27ab Mon Sep 17 00:00:00 2001 From: Matthew McDermott Date: Wed, 12 Jun 2024 21:37:54 -0400 Subject: [PATCH 25/48] A test for get_feature_columns --- src/MEDS_tabular_automl/describe_codes.py | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/src/MEDS_tabular_automl/describe_codes.py b/src/MEDS_tabular_automl/describe_codes.py index 3abcbf6..40cd8a4 100644 --- a/src/MEDS_tabular_automl/describe_codes.py +++ b/src/MEDS_tabular_automl/describe_codes.py @@ -139,6 +139,17 @@ def get_feature_columns(fp: Path) -> list[str]: Returns: Sorted list of column names. + + Examples: + >>> from tempfile import NamedTemporaryFile + >>> fp = NamedTemporaryFile() + >>> with NamedTemporaryFile() as fp: + ... pl.DataFrame({ + ... "code": ["E", "D", "A", "A"], + ... "count": [1, 1, 1, 1], + ... }).write_parquet(fp.name) + ... 
get_feature_columns(fp.name) + ['A', 'D', 'E'] """ return sorted(list(convert_to_freq_dict(pl.scan_parquet(fp)).keys())) From cbc60287de9f3a0e9388a02d7d97bcbb0e06867d Mon Sep 17 00:00:00 2001 From: Matthew McDermott Date: Wed, 12 Jun 2024 21:41:12 -0400 Subject: [PATCH 26/48] Added one for get_feature_freqs --- src/MEDS_tabular_automl/describe_codes.py | 19 +++++++++++-------- 1 file changed, 11 insertions(+), 8 deletions(-) diff --git a/src/MEDS_tabular_automl/describe_codes.py b/src/MEDS_tabular_automl/describe_codes.py index 40cd8a4..2d81554 100644 --- a/src/MEDS_tabular_automl/describe_codes.py +++ b/src/MEDS_tabular_automl/describe_codes.py @@ -142,16 +142,12 @@ def get_feature_columns(fp: Path) -> list[str]: Examples: >>> from tempfile import NamedTemporaryFile - >>> fp = NamedTemporaryFile() - >>> with NamedTemporaryFile() as fp: - ... pl.DataFrame({ - ... "code": ["E", "D", "A", "A"], - ... "count": [1, 1, 1, 1], - ... }).write_parquet(fp.name) - ... get_feature_columns(fp.name) + >>> with NamedTemporaryFile() as f: + ... pl.DataFrame({"code": ["E", "D", "A"], "count": [1, 3, 2]}).write_parquet(f.name) + ... get_feature_columns(f.name) ['A', 'D', 'E'] """ - return sorted(list(convert_to_freq_dict(pl.scan_parquet(fp)).keys())) + return sorted(list(get_feature_freqs(fp).keys())) def get_feature_freqs(fp: Path) -> dict[str, int]: @@ -162,6 +158,13 @@ def get_feature_freqs(fp: Path) -> dict[str, int]: Returns: Dictionary of feature frequencies. + + Examples: + >>> from tempfile import NamedTemporaryFile + >>> with NamedTemporaryFile() as f: + ... pl.DataFrame({"code": ["E", "D", "A"], "count": [1, 3, 2]}).write_parquet(f.name) + ... get_feature_freqs(f.name) + {'E': 1, 'D': 3, 'A': 2} """ return convert_to_freq_dict(pl.scan_parquet(fp)) From 0e0407a196125875a8e8789672cdd0a818dfef7f Mon Sep 17 00:00:00 2001 From: Matthew McDermott Date: Wed, 12 Jun 2024 21:44:49 -0400 Subject: [PATCH 27/48] Finished doctests for describe_codes --- src/MEDS_tabular_automl/describe_codes.py | 26 +++++++++++++++++++++++ 1 file changed, 26 insertions(+) diff --git a/src/MEDS_tabular_automl/describe_codes.py b/src/MEDS_tabular_automl/describe_codes.py index 2d81554..3c5395c 100644 --- a/src/MEDS_tabular_automl/describe_codes.py +++ b/src/MEDS_tabular_automl/describe_codes.py @@ -184,6 +184,13 @@ def filter_to_codes( Returns: Sorted list of the intersection of allowed codes (if they are specified) and filters based on inclusion frequency. + + Examples: + >>> from tempfile import NamedTemporaryFile + >>> with NamedTemporaryFile() as f: + ... pl.DataFrame({"code": ["E", "D", "A"], "count": [4, 3, 2]}).write_parquet(f.name) + ... filter_to_codes(["A", "D"], 3, f.name) + ['D'] """ if allowed_codes is None: allowed_codes = get_feature_columns(code_metadata_fp) @@ -209,6 +216,23 @@ def clear_code_aggregation_suffix(code: str) -> str: Returns: Code string without aggregation suffixes. + + Raises: + ValueError: If the code does not have a recognized aggregation suffix. + + Examples: + >>> clear_code_aggregation_suffix("A/code") + 'A' + >>> clear_code_aggregation_suffix("A/value") + 'A' + >>> clear_code_aggregation_suffix("A/static/present") + 'A' + >>> clear_code_aggregation_suffix("A/static/first") + 'A' + >>> clear_code_aggregation_suffix("A") + Traceback (most recent call last): + ... + ValueError: Code A does not have a recognized aggregation suffix! 
""" if code.endswith("/code"): return code[:-5] @@ -218,6 +242,8 @@ def clear_code_aggregation_suffix(code: str) -> str: return code[:-15] elif code.endswith("/static/first"): return code[:-13] + else: + raise ValueError(f"Code {code} does not have a recognized aggregation suffix!") def filter_parquet(fp: Path, allowed_codes: list[str]) -> pl.LazyFrame: From 259ec6c8820f613df4ef6900c5ccbed25eb12dfd Mon Sep 17 00:00:00 2001 From: Aleksia Kolo Date: Wed, 12 Jun 2024 21:54:18 -0400 Subject: [PATCH 28/48] added docstring for scripts/describe_codes --- .../scripts/describe_codes.py | 16 +++++++++++----- 1 file changed, 11 insertions(+), 5 deletions(-) diff --git a/src/MEDS_tabular_automl/scripts/describe_codes.py b/src/MEDS_tabular_automl/scripts/describe_codes.py index 034244a..eccfbcd 100644 --- a/src/MEDS_tabular_automl/scripts/describe_codes.py +++ b/src/MEDS_tabular_automl/scripts/describe_codes.py @@ -31,13 +31,13 @@ @hydra.main(version_base=None, config_path=str(config_yaml.parent.resolve()), config_name=config_yaml.stem) -def main( - cfg: DictConfig, -): - """Computes the feature frequencies so we can filter out infrequent events. +def main(cfg: DictConfig) -> None: + """Main function that orchestrates the feature frequency computation and storage process and enables + filtering out infrequent events. Args: - cfg: The configuration object for the tabularization process. + cfg: The configuration object for the tabularization process, loaded from a Hydra + YAML configuration file. """ iter_wrapper = load_tqdm(cfg.tqdm) if not cfg.loguru_init: @@ -56,12 +56,15 @@ def main( logger.info("Iterating through shards and caching feature frequencies.") def compute_fn(shard_df): + """Function to compute feature frequencies for each shard.""" return compute_feature_frequencies(cfg, shard_df) def write_fn(df, out_fp): + """Function to write computed data frame to disk.""" write_df(df, out_fp) def read_fn(in_fp): + """Function to read a data frame from disk.""" return pl.scan_parquet(in_fp) # Map: Iterates through shards and caches feature frequencies @@ -85,6 +88,7 @@ def read_fn(in_fp): # Reduce: sum the frequency computations def compute_fn(freq_df_list): + """Function to aggregate frequency data from multiple data frames.""" feature_freqs = defaultdict(int) for shard_freq_df in freq_df_list: shard_freq_dict = convert_to_freq_dict(shard_freq_df) @@ -94,9 +98,11 @@ def compute_fn(freq_df_list): return feature_df def write_fn(df, out_fp): + """Function to write computed data frame to disk.""" write_df(df, out_fp) def read_fn(feature_dir): + """Function to read multiple data frames from a directory.""" files = list_subdir_files(feature_dir, "parquet") return [pl.scan_parquet(fp) for fp in files] From 6d12091eef950a3cdb84f33bc11ab9f951ae6937 Mon Sep 17 00:00:00 2001 From: Aleksia Kolo Date: Wed, 12 Jun 2024 22:27:06 -0400 Subject: [PATCH 29/48] Added docstrings for scripts/generate_permutations.py --- .../scripts/generate_permutations.py | 32 +++++++++---------- 1 file changed, 15 insertions(+), 17 deletions(-) diff --git a/src/MEDS_tabular_automl/scripts/generate_permutations.py b/src/MEDS_tabular_automl/scripts/generate_permutations.py index 6fcd5a1..16e7b0c 100644 --- a/src/MEDS_tabular_automl/scripts/generate_permutations.py +++ b/src/MEDS_tabular_automl/scripts/generate_permutations.py @@ -2,14 +2,15 @@ from itertools import combinations -def format_print(permutations): - """ +def format_print(permutations: list[tuple[str, ...]]) -> None: + """Prints all permutations in a visually formatted 
string. + Args: - permutations: List of all possible permutations of length > 1 + permutations: The list of all possible permutations of length > 1. - Example: - >>> format_print([('2',), ('2', '3'), ('2', '3', '4'), ('2', '4'), ('3',), ('3', '4'), ('4',)]) - [2],[2,3],[2,3,4],[2,4],[3],[3,4],[4] + Examples: + >>> format_print([('2',), ('2', '3'), ('2', '3', '4'), ('2', '4'), ('3',), ('3', '4'), ('4',)]) + [2],[2,3],[2,3,4],[2,4],[3],[3,4],[4] """ out_str = "" for item in permutations: @@ -18,18 +19,15 @@ def format_print(permutations): print(out_str) -def get_permutations(list_of_options): - """Generate all possible permutations of a list of options passed as an arg. +def get_permutations(list_of_options: list[str]) -> None: + """Generates and prints all possible permutations from a list of options. Args: - - list_of_options (list): List of options. - - Returns: - - list: List of all possible permutations of length > 1 + list_of_options: The list of options. - Example: - >>> get_permutations(['2', '3', '4']) - [2],[2,3],[2,3,4],[2,4],[3],[3,4],[4] + Examples: + >>> get_permutations(['2', '3', '4']) + [2],[2,3],[2,3,4],[2,4],[3],[3,4],[4] """ permutations = [] for i in range(1, len(list_of_options) + 1): @@ -37,8 +35,8 @@ def get_permutations(list_of_options): format_print(sorted(permutations)) -def main(): - """Generate all possible permutations of a list of options.""" +def main() -> None: + """Generates and prints all possible permutations from given list of options.""" list_of_options = list(sys.argv[1].strip("[]").split(",")) get_permutations(list_of_options) From 6db34192c80530f06235ff6990f6e7928a0678ce Mon Sep 17 00:00:00 2001 From: Matthew McDermott Date: Wed, 12 Jun 2024 22:39:43 -0400 Subject: [PATCH 30/48] Added some doctests for utils --- src/MEDS_tabular_automl/utils.py | 37 +++++++++++++++++++++++++++----- 1 file changed, 32 insertions(+), 5 deletions(-) diff --git a/src/MEDS_tabular_automl/utils.py b/src/MEDS_tabular_automl/utils.py index 6298c1a..bb04dfc 100644 --- a/src/MEDS_tabular_automl/utils.py +++ b/src/MEDS_tabular_automl/utils.py @@ -79,6 +79,16 @@ def parse_static_feature_column(c: str) -> tuple[str, str, str, str]: Raises: ValueError: If the column string format is incorrect. + + Examples: + >>> parse_static_feature_column("A/static/present") + ('A', 'static', 'present') + >>> parse_static_feature_column("A/B/static/first") + ('A/B', 'static', 'first') + >>> parse_static_feature_column("static/first") + Traceback (most recent call last): + ... + ValueError: Column static/first is not a valid flat feature column! """ parts = c.split("/") if len(parts) < 3: @@ -192,21 +202,38 @@ def load_matrix(fp_path: Path) -> coo_array: return array_to_sparse_matrix(array, shape) -def write_df(df: coo_array, fp: Path, **kwargs) -> None: +def write_df(df: pl.LazyFrame | pl.DataFrame | coo_array, fp: Path, do_overwrite: bool = False) -> None: """Writes a sparse matrix to disk. Args: df: The sparse matrix to write. fp: The file path where to write the data. - **kwargs: Additional keyword arguments, such as 'do_overwrite' to control file overwriting. + do_overwrite: A flag indicating whether to overwrite the file if it already exists. Raises: FileExistsError: If the file exists and 'do_overwrite' is not set to True. TypeError: If the type of 'df' is not supported for writing. 
- """ - do_overwrite = kwargs.get("do_overwrite", False) - if not do_overwrite and fp.is_file(): + Examples: + >>> import tempfile + >>> from polars.testing import assert_frame_equal + >>> df_polars = pl.DataFrame({"a": [1, 2, 3]}) + >>> df_coo_array = coo_array(([1, 2, 3], ([0, 1, 2], [0, 0, 0])), shape=(3, 1)) + >>> with tempfile.TemporaryDirectory() as tmpdir: + ... fp = Path(tmpdir) / "test.parquet" + ... write_df(df_polars, fp) + ... assert fp.is_file() + ... assert_frame_equal(pl.read_parquet(fp), df_polars) + ... write_df(df_polars.lazy(), fp, do_overwrite=True) + ... assert_frame_equal(pl.read_parquet(fp), df_polars) + ... write_df(df_coo_array, fp, do_overwrite=True) + ... assert load_matrix(fp).toarray().tolist() == [[1], [2], [3]] + ... write_df(df_coo_array, fp, do_overwrite=False) + Traceback (most recent call last): + ... + FileExistsError: test.parquet exists and do_overwrite is False! + """ + if fp.is_file() and not do_overwrite: raise FileExistsError(f"{fp} exists and do_overwrite is {do_overwrite}!") fp.parent.mkdir(exist_ok=True, parents=True) From 110543ca6dfc159fe74616f75b0f755739a6ce9a Mon Sep 17 00:00:00 2001 From: Matthew McDermott Date: Wed, 12 Jun 2024 22:45:00 -0400 Subject: [PATCH 31/48] Removed unnecessary function --- .../scripts/describe_codes.py | 5 ++--- src/MEDS_tabular_automl/utils.py | 17 +---------------- 2 files changed, 3 insertions(+), 19 deletions(-) diff --git a/src/MEDS_tabular_automl/scripts/describe_codes.py b/src/MEDS_tabular_automl/scripts/describe_codes.py index 7d53f6c..d5a4d75 100644 --- a/src/MEDS_tabular_automl/scripts/describe_codes.py +++ b/src/MEDS_tabular_automl/scripts/describe_codes.py @@ -8,7 +8,7 @@ import numpy as np import polars as pl from loguru import logger -from omegaconf import DictConfig +from omegaconf import DictConfig, OmegaConf from MEDS_tabular_automl.describe_codes import ( compute_feature_frequencies, @@ -21,7 +21,6 @@ get_shard_prefix, hydra_loguru_init, load_tqdm, - store_config_yaml, write_df, ) @@ -46,7 +45,7 @@ def main( # Store Config output_dir = Path(cfg.output_dir) output_dir.mkdir(exist_ok=True, parents=True) - store_config_yaml(output_dir / "config.yaml", cfg) + OmegaConf.save(cfg, output_dir / "config.yaml") # Create output dir input_dir = Path(cfg.input_dir) diff --git a/src/MEDS_tabular_automl/utils.py b/src/MEDS_tabular_automl/utils.py index bb04dfc..3ed1e44 100644 --- a/src/MEDS_tabular_automl/utils.py +++ b/src/MEDS_tabular_automl/utils.py @@ -14,7 +14,7 @@ import polars as pl import polars.selectors as cs from loguru import logger -from omegaconf import DictConfig, OmegaConf +from omegaconf import DictConfig from scipy.sparse import coo_array DF_T = pl.LazyFrame @@ -521,21 +521,6 @@ def get_feature_indices(agg: str, feature_columns: list[str]) -> list[int]: return [feature_to_index[c] for c in agg_features] -def store_config_yaml(config_fp: Path, cfg: DictConfig) -> None: - """Stores configuration parameters into a YAML file. - - This function writes a dictionary of parameters, which includes patient partitioning - information and configuration details, to a specified YAML file. - - Args: - config_fp: The file path for the YAML file where config should be stored. - cfg: A configuration object containing settings like the number of patients - per sub-shard, minimum code inclusion frequency, and flags for updating - or overwriting existing files. 
- """ - OmegaConf.save(cfg, config_fp) - - def get_shard_prefix(base_path: Path, fp: Path) -> str: """Extracts the shard prefix from a file path by removing the raw_cohort_dir. From 3e9a6130f9ab0f40a3124645a124c4400726d6c3 Mon Sep 17 00:00:00 2001 From: Matthew McDermott Date: Wed, 12 Jun 2024 22:47:14 -0400 Subject: [PATCH 32/48] Removing unused functions. --- src/MEDS_tabular_automl/utils.py | 69 -------------------------------- 1 file changed, 69 deletions(-) diff --git a/src/MEDS_tabular_automl/utils.py b/src/MEDS_tabular_automl/utils.py index 3ed1e44..61198d5 100644 --- a/src/MEDS_tabular_automl/utils.py +++ b/src/MEDS_tabular_automl/utils.py @@ -12,7 +12,6 @@ import hydra import numpy as np import polars as pl -import polars.selectors as cs from loguru import logger from omegaconf import DictConfig from scipy.sparse import coo_array @@ -248,74 +247,6 @@ def write_df(df: pl.LazyFrame | pl.DataFrame | coo_array, fp: Path, do_overwrite raise TypeError(f"Unsupported type for df: {type(df)}") -def get_static_col_dtype(col: str) -> pl.DataType: - """Determines the appropriate minimal data type for given flat representation column string based on its - aggregation type. - - Args: - col (str): The column name in the format 'category/type/aggregation'. - - Returns: - pl.DataType: The appropriate Polars data type for the column. - - Raises: - ValueError: If the column name format or aggregation type is not recognized. - """ - code, code_type, agg = parse_static_feature_column(col) - - match agg: - case "sum" | "sum_sqd" | "min" | "max" | "value" | "first": - return pl.Float32 - case "present": - return pl.Boolean - case "count" | "has_values_count": - return pl.UInt32 - case _: - raise ValueError(f"Column name {col} malformed!") - - -def add_static_missing_cols( - flat_df: pl.LazyFrame, feature_columns: list[str], set_count_0_to_null: bool = False -) -> pl.LazyFrame: - """Normalizes columns in a LazyFrame so all expected columns are present and appropriately typed and - potentially modifies zero counts to nulls based on the configuration. - - Args: - flat_df: The LazyFrame to normalize. - feature_columns: A list of expected column names. - set_count_0_to_null: A flag of whether to convert zero counts to nulls. - - Returns: - The normalized LazyFrame with all specified columns present and correctly typed and with - zero-counts handled if specified. - """ - cols_to_add = set(feature_columns) - set(flat_df.columns) - cols_to_retype = set(feature_columns).intersection(set(flat_df.columns)) - - cols_to_add = [(c, get_static_col_dtype(c)) for c in cols_to_add] - cols_to_retype = [(c, get_static_col_dtype(c)) for c in cols_to_retype] - - if "timestamp" in flat_df.columns: - key_cols = ["patient_id", "timestamp"] - else: - key_cols = ["patient_id"] - - flat_df = flat_df.with_columns( - *[pl.lit(None, dtype=dt).alias(c) for c, dt in cols_to_add], - *[pl.col(c).cast(dt).alias(c) for c, dt in cols_to_retype], - ).select(*key_cols, *feature_columns) - - if not set_count_0_to_null: - return flat_df - - flat_df = flat_df.collect() - - flat_df = flat_df.with_columns( - pl.when(cs.ends_with("count") != 0).then(cs.ends_with("count")).keep_name() - ).lazy() - return flat_df - - def get_static_feature_cols(shard_df: pl.LazyFrame) -> list[str]: """Generates a list of static feature column names based on data within a shard. 
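A note for readers following the `write_df` doctests a few patches above: they round-trip a `coo_array` through an `.npz` file, and the matching loader checks that the stored array has exactly three rows. That strongly suggests the on-disk format stacks the COO `(data, row, col)` triplets into one three-row array saved alongside the matrix shape. A minimal sketch of that round-trip under this assumption; `save_coo` and `load_coo` are illustrative names, not functions from this package:

    import numpy as np
    from scipy.sparse import coo_array

    def save_coo(matrix: coo_array, fp: str) -> None:
        # Stack the COO triplets into one 3-row array so a single npz key round-trips them.
        array = np.vstack([matrix.data, matrix.row, matrix.col])
        np.savez(fp, array=array, shape=matrix.shape)

    def load_coo(fp: str) -> coo_array:
        npzfile = np.load(fp)
        array, shape = npzfile["array"], npzfile["shape"]
        if array.shape[0] != 3:
            raise ValueError(f"Expected a 3-row array, got {array.shape[0]} rows")
        data, row, col = array
        # vstack upcast the integer indices to float, so cast back before rebuilding.
        return coo_array((data, (row.astype(np.int64), col.astype(np.int64))), shape=tuple(shape))

    m = coo_array(([1.0, 2.0], ([0, 1], [0, 2])), shape=(3, 4))
    save_coo(m, "demo.npz")
    assert (load_coo("demo.npz").toarray() == m.toarray()).all()

Because `np.savez` output can only be read back through `np.load`, the sparse case needs its own `.npz` path, which is what the doctest fix in the next patch addresses.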
From 5dc394da6f7bed08088f7fff505520320530dacc Mon Sep 17 00:00:00 2001 From: Nassim Oufattole Date: Thu, 13 Jun 2024 02:47:31 +0000 Subject: [PATCH 33/48] fixed doctest for write_df, must use npz filename for npz files --- src/MEDS_tabular_automl/scripts/hf_subtype.code-workspace | 7 +++++++ src/MEDS_tabular_automl/utils.py | 1 + 2 files changed, 8 insertions(+) create mode 100644 src/MEDS_tabular_automl/scripts/hf_subtype.code-workspace diff --git a/src/MEDS_tabular_automl/scripts/hf_subtype.code-workspace b/src/MEDS_tabular_automl/scripts/hf_subtype.code-workspace new file mode 100644 index 0000000..fddd0bd --- /dev/null +++ b/src/MEDS_tabular_automl/scripts/hf_subtype.code-workspace @@ -0,0 +1,7 @@ +{ + "folders": [ + { + "path": "../../../../TabBench" + } + ] +} diff --git a/src/MEDS_tabular_automl/utils.py b/src/MEDS_tabular_automl/utils.py index bb04dfc..ca8690a 100644 --- a/src/MEDS_tabular_automl/utils.py +++ b/src/MEDS_tabular_automl/utils.py @@ -226,6 +226,7 @@ def write_df(df: pl.LazyFrame | pl.DataFrame | coo_array, fp: Path, do_overwrite ... assert_frame_equal(pl.read_parquet(fp), df_polars) ... write_df(df_polars.lazy(), fp, do_overwrite=True) ... assert_frame_equal(pl.read_parquet(fp), df_polars) + ... fp = Path(tmpdir) / "test.npz" ... write_df(df_coo_array, fp, do_overwrite=True) ... assert load_matrix(fp).toarray().tolist() == [[1], [2], [3]] ... write_df(df_coo_array, fp, do_overwrite=False) From 59d5a013173a4a1b772e0ab59f859f48d5c0e2ca Mon Sep 17 00:00:00 2001 From: Aleksia Kolo Date: Wed, 12 Jun 2024 22:48:32 -0400 Subject: [PATCH 34/48] temporarily removed inner function comments --- src/MEDS_tabular_automl/scripts/describe_codes.py | 6 ------ 1 file changed, 6 deletions(-) diff --git a/src/MEDS_tabular_automl/scripts/describe_codes.py b/src/MEDS_tabular_automl/scripts/describe_codes.py index eccfbcd..999a8ab 100644 --- a/src/MEDS_tabular_automl/scripts/describe_codes.py +++ b/src/MEDS_tabular_automl/scripts/describe_codes.py @@ -56,15 +56,12 @@ def main(cfg: DictConfig) -> None: logger.info("Iterating through shards and caching feature frequencies.") def compute_fn(shard_df): - """Function to compute feature frequencies for each shard.""" return compute_feature_frequencies(cfg, shard_df) def write_fn(df, out_fp): - """Function to write computed data frame to disk.""" write_df(df, out_fp) def read_fn(in_fp): - """Function to read a data frame from disk.""" return pl.scan_parquet(in_fp) # Map: Iterates through shards and caches feature frequencies @@ -88,7 +85,6 @@ def read_fn(in_fp): # Reduce: sum the frequency computations def compute_fn(freq_df_list): - """Function to aggregate frequency data from multiple data frames.""" feature_freqs = defaultdict(int) for shard_freq_df in freq_df_list: shard_freq_dict = convert_to_freq_dict(shard_freq_df) @@ -98,11 +94,9 @@ def compute_fn(freq_df_list): return feature_df def write_fn(df, out_fp): - """Function to write computed data frame to disk.""" write_df(df, out_fp) def read_fn(feature_dir): - """Function to read multiple data frames from a directory.""" files = list_subdir_files(feature_dir, "parquet") return [pl.scan_parquet(fp) for fp in files] From 88c4cec1a90ee21ccdb05cedaea8b9d34d87989f Mon Sep 17 00:00:00 2001 From: Matthew McDermott Date: Wed, 12 Jun 2024 22:49:32 -0400 Subject: [PATCH 35/48] Removing unused functions. 
--- src/MEDS_tabular_automl/utils.py | 71 -------------------------------- 1 file changed, 71 deletions(-) diff --git a/src/MEDS_tabular_automl/utils.py b/src/MEDS_tabular_automl/utils.py index 61198d5..2c183d4 100644 --- a/src/MEDS_tabular_automl/utils.py +++ b/src/MEDS_tabular_automl/utils.py @@ -13,7 +13,6 @@ import numpy as np import polars as pl from loguru import logger -from omegaconf import DictConfig from scipy.sparse import coo_array DF_T = pl.LazyFrame @@ -247,58 +246,6 @@ def write_df(df: pl.LazyFrame | pl.DataFrame | coo_array, fp: Path, do_overwrite raise TypeError(f"Unsupported type for df: {type(df)}") -def get_static_feature_cols(shard_df: pl.LazyFrame) -> list[str]: - """Generates a list of static feature column names based on data within a shard. - - This function evaluates the properties of codes within training data and applies configured - aggregations to generate a comprehensive list of the static feature columns for modeling purposes. - - Args: - shard_df: The LazyFrame shard to analyze. - - Returns: - A list of column names representing static features. - - Examples: - >>> import polars as pl - >>> data = {'code': ['A', 'A', 'B', 'B', 'C', 'C', 'C'], - ... 'timestamp': [ - ... None, '2021-01-01', '2021-01-01', '2021-01-02', '2021-01-03', '2021-01-04', None - ... ], - ... 'numerical_value': [1, None, 2, 2, None, None, 3]} - >>> df = pl.DataFrame(data).lazy() - >>> get_static_feature_cols(df) - ['A/static/first', 'A/static/present', 'C/static/first', 'C/static/present'] - """ - feature_columns = [] - static_df = shard_df.filter(pl.col("timestamp").is_null()) - for code in static_df.select(pl.col("code").unique()).collect().to_series(): - static_aggregations = [f"{code}/static/present", f"{code}/static/first"] - feature_columns.extend(static_aggregations) - return sorted(feature_columns) - - -def get_ts_feature_cols(shard_df: pl.LazyFrame) -> list[str]: - """Generates a list of time-series feature column names based on data within a shard. - - This function evaluates the properties of codes within training data and applies configured - aggregations to generate a comprehensive list of the time-series feature columns for modeling - purposes. - - Args: - shard_df: The LazyFrame shard to analyze. - - Returns: - A list of column names representing time-series features. - """ - ts_df = shard_df.filter(pl.col("timestamp").is_not_null()) - feature_columns = list(ts_df.select(pl.col("code").unique()).collect().to_series()) - feature_columns = [f"{code}/code" for code in feature_columns] + [ - f"{code}/value" for code in feature_columns - ] - return sorted(feature_columns) - - def get_prediction_ts_cols( aggregations: list[str], ts_feature_cols: pl.LazyFrame, window_sizes: list[str] | None = None ) -> list[str]: @@ -321,24 +268,6 @@ def get_prediction_ts_cols( return sorted(ts_aggregations) -def get_flat_rep_feature_cols(cfg: DictConfig, shard_df: pl.LazyFrame) -> list[str]: - """Combines static and time-series feature columns from a shard based on specified configurations. - - This function evaluates the properties of codes within training data and applies configured - aggregations to generate a comprehensive list of all feature columns for modeling purposes. - - Args: - cfg: The configuration dictionary specifying aggregation settings. - shard_df: The LazyFrame shard in MEDS format to process. - - Returns: - A combined list of all feature columns from both static and time-series data. 
- """ - static_feature_columns = get_static_feature_cols(shard_df) - ts_feature_columns = get_ts_feature_cols(cfg.aggs, shard_df) - return static_feature_columns + ts_feature_columns - - def load_meds_data(MEDS_cohort_dir: str, load_data: bool = True) -> Mapping[str, pl.LazyFrame]: """Loads the MEDS dataset from disk, structured by data splits. From 4f04fda6e61ee107eef44248a4ce7e4a1d61d9a8 Mon Sep 17 00:00:00 2001 From: Matthew McDermott Date: Wed, 12 Jun 2024 22:51:13 -0400 Subject: [PATCH 36/48] Removing unused functions. --- src/MEDS_tabular_automl/utils.py | 60 -------------------------------- 1 file changed, 60 deletions(-) diff --git a/src/MEDS_tabular_automl/utils.py b/src/MEDS_tabular_automl/utils.py index b04e82b..5319569 100644 --- a/src/MEDS_tabular_automl/utils.py +++ b/src/MEDS_tabular_automl/utils.py @@ -6,7 +6,6 @@ DF_T: This defines the type of internal dataframes -- e.g. polars DataFrames. """ import os -from collections.abc import Mapping from pathlib import Path import hydra @@ -247,65 +246,6 @@ def write_df(df: pl.LazyFrame | pl.DataFrame | coo_array, fp: Path, do_overwrite raise TypeError(f"Unsupported type for df: {type(df)}") -def get_prediction_ts_cols( - aggregations: list[str], ts_feature_cols: pl.LazyFrame, window_sizes: list[str] | None = None -) -> list[str]: - """Generates a list of feature column names for prediction tasks based on aggregations and window sizes. - - Args: - aggregations: The list of aggregation methods to apply. - ts_feature_cols: The list of existing time-series feature columns. - window_sizes: The optional list of window sizes to consider. - - Returns: - A list of feature column names formatted with aggregation and window size. - """ - agg_feature_columns = [] - for code in ts_feature_cols: - ts_aggregations = [f"{code}/{agg}" for agg in aggregations] - agg_feature_columns.extend(ts_aggregations) - if window_sizes: - ts_aggregations = [f"{window_size}/{code}" for window_size in window_sizes] - return sorted(ts_aggregations) - - -def load_meds_data(MEDS_cohort_dir: str, load_data: bool = True) -> Mapping[str, pl.LazyFrame]: - """Loads the MEDS dataset from disk, structured by data splits. - - Args: - MEDS_cohort_dir: The directory containing the MEDS datasets split by subfolders. - We expect `train` to be a split so `MEDS_cohort_dir/train` should exist. - load_data: If True, returns LazyFrames for each data split, otherwise returns file paths. - - Returns: - A dictionary mapping from split name to a LazyFrame, containing the MEDS dataset for each split. - - Example: - >>> import tempfile - >>> from pathlib import Path - >>> MEDS_cohort_dir = Path(tempfile.mkdtemp()) - >>> for split in ["train", "val", "test"]: - ... split_dir = MEDS_cohort_dir / split - ... split_dir.mkdir() - ... 
pl.DataFrame({"patient_id": [1, 2, 3]}).write_parquet(split_dir / "data.parquet") - >>> split_to_df = load_meds_data(MEDS_cohort_dir) - >>> assert "train" in split_to_df - >>> assert len(split_to_df) == 3 - >>> assert len(split_to_df["train"]) == 1 - >>> assert isinstance(split_to_df["train"][0], pl.LazyFrame) - """ - MEDS_cohort_dir = Path(MEDS_cohort_dir) - meds_fps = list(MEDS_cohort_dir.glob("*/*.parquet")) - splits = {fp.parent.stem for fp in meds_fps} - split_to_fps = {split: [fp for fp in meds_fps if fp.parent.stem == split] for split in splits} - if not load_data: - return split_to_fps - split_to_df = { - split: [pl.scan_parquet(fp) for fp in split_fps] for split, split_fps in split_to_fps.items() - } - return split_to_df - - def get_events_df(shard_df: pl.LazyFrame, feature_columns) -> pl.LazyFrame: """Extracts and filters an Events LazyFrame with one row per observation (timestamps can be duplicated). From 9cfadb6b49da117db22aaf7875e598301f891b5e Mon Sep 17 00:00:00 2001 From: Matthew McDermott Date: Wed, 12 Jun 2024 22:53:35 -0400 Subject: [PATCH 37/48] removing file --- src/MEDS_tabular_automl/scripts/hf_subtype.code-workspace | 7 ------- 1 file changed, 7 deletions(-) delete mode 100644 src/MEDS_tabular_automl/scripts/hf_subtype.code-workspace diff --git a/src/MEDS_tabular_automl/scripts/hf_subtype.code-workspace b/src/MEDS_tabular_automl/scripts/hf_subtype.code-workspace deleted file mode 100644 index fddd0bd..0000000 --- a/src/MEDS_tabular_automl/scripts/hf_subtype.code-workspace +++ /dev/null @@ -1,7 +0,0 @@ -{ - "folders": [ - { - "path": "../../../../TabBench" - } - ] -} From afdc9b194e6ba92432119cb7257ac46be193e6d5 Mon Sep 17 00:00:00 2001 From: Matthew McDermott Date: Wed, 12 Jun 2024 22:57:24 -0400 Subject: [PATCH 38/48] Correct tiny test typo --- src/MEDS_tabular_automl/utils.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/MEDS_tabular_automl/utils.py b/src/MEDS_tabular_automl/utils.py index 5319569..ed9c3a2 100644 --- a/src/MEDS_tabular_automl/utils.py +++ b/src/MEDS_tabular_automl/utils.py @@ -229,7 +229,7 @@ def write_df(df: pl.LazyFrame | pl.DataFrame | coo_array, fp: Path, do_overwrite ... write_df(df_coo_array, fp, do_overwrite=False) Traceback (most recent call last): ... - FileExistsError: test.parquet exists and do_overwrite is False! + FileExistsError: ...test.npz exists and do_overwrite is False! """ if fp.is_file() and not do_overwrite: raise FileExistsError(f"{fp} exists and do_overwrite is {do_overwrite}!") From a8a7b3c1b2706ebc1e21130abd5b89e56e3e24bd Mon Sep 17 00:00:00 2001 From: Matthew McDermott Date: Wed, 12 Jun 2024 23:04:14 -0400 Subject: [PATCH 39/48] Update function signature --- src/MEDS_tabular_automl/generate_summarized_reps.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/MEDS_tabular_automl/generate_summarized_reps.py b/src/MEDS_tabular_automl/generate_summarized_reps.py index 0745440..b0d66ac 100644 --- a/src/MEDS_tabular_automl/generate_summarized_reps.py +++ b/src/MEDS_tabular_automl/generate_summarized_reps.py @@ -16,7 +16,7 @@ ) -def sparse_aggregate(sparse_matrix: sparray, agg: str) -> csr_array: +def sparse_aggregate(sparse_matrix: sparray, agg: str) -> np.ndarray | coo_array: """Aggregates values in a sparse matrix according to the specified method. 
Args: From abc0f322023928d59ee2815bffc8682f1dfc9ae6 Mon Sep 17 00:00:00 2001 From: Matthew McDermott Date: Wed, 12 Jun 2024 23:07:05 -0400 Subject: [PATCH 40/48] update the github criteria to the failing scenario --- .github/workflows/tests.yaml | 2 +- .../generate_summarized_reps.py | 25 +------------------ 2 files changed, 2 insertions(+), 25 deletions(-) diff --git a/.github/workflows/tests.yaml b/.github/workflows/tests.yaml index b982ebd..c96be0e 100644 --- a/.github/workflows/tests.yaml +++ b/.github/workflows/tests.yaml @@ -26,7 +26,7 @@ jobs: - name: Install packages run: | - pip install -e .[tests] + pip install .[tests] #---------------------------------------------- # run test suite diff --git a/src/MEDS_tabular_automl/generate_summarized_reps.py b/src/MEDS_tabular_automl/generate_summarized_reps.py index b0d66ac..f9afb70 100644 --- a/src/MEDS_tabular_automl/generate_summarized_reps.py +++ b/src/MEDS_tabular_automl/generate_summarized_reps.py @@ -6,8 +6,7 @@ from loguru import logger from scipy.sparse import coo_array, csr_array, sparray -from MEDS_tabular_automl.describe_codes import get_feature_columns -from MEDS_tabular_automl.generate_ts_features import get_feature_names, get_flat_ts_rep +from MEDS_tabular_automl.generate_ts_features import get_feature_names from MEDS_tabular_automl.utils import ( CODE_AGGREGATIONS, VALUE_AGGREGATIONS, @@ -207,25 +206,3 @@ def generate_summary( out_matrix = compute_agg(index_df, matrix, window_size, agg, len(ts_columns), use_tqdm=use_tqdm) return out_matrix - - -if __name__ == "__main__": - from pathlib import Path - - feature_columns_fp = ( - Path("/storage/shared/meds_tabular_ml/ebcl_dataset/processed") / "tabularized_code_metadata.parquet" - ) - shard_fp = Path("/storage/shared/meds_tabular_ml/ebcl_dataset/processed/final_cohort/train/0.parquet") - - feature_columns = get_feature_columns(feature_columns_fp) - df = pl.scan_parquet(shard_fp) - agg = "code/count" - index_df, sparse_matrix = get_flat_ts_rep(agg, feature_columns, df) - generate_summary( - feature_columns=feature_columns, - index_df=index_df, - matrix=sparse_matrix, - window_size="full", - agg=agg, - use_tqdm=True, - ) From d28ee3bcdf1fbf817c2d9014381c34917f9bb154 Mon Sep 17 00:00:00 2001 From: Matthew McDermott Date: Wed, 12 Jun 2024 23:14:31 -0400 Subject: [PATCH 41/48] Fixed filter_to_codes resolver issue by moving it to utils and simplifying internal logic to not rely on describe_codes functions. --- src/MEDS_tabular_automl/describe_codes.py | 40 ------------------- .../scripts/tabularize_static.py | 2 +- src/MEDS_tabular_automl/utils.py | 37 +++++++++++++++++ 3 files changed, 38 insertions(+), 41 deletions(-) diff --git a/src/MEDS_tabular_automl/describe_codes.py b/src/MEDS_tabular_automl/describe_codes.py index 3c5395c..2ded0e8 100644 --- a/src/MEDS_tabular_automl/describe_codes.py +++ b/src/MEDS_tabular_automl/describe_codes.py @@ -1,7 +1,6 @@ from pathlib import Path import polars as pl -from omegaconf import OmegaConf from MEDS_tabular_automl.utils import DF_T, get_feature_names @@ -169,45 +168,6 @@ def get_feature_freqs(fp: Path) -> dict[str, int]: return convert_to_freq_dict(pl.scan_parquet(fp)) -def filter_to_codes( - allowed_codes: list[str] | None, - min_code_inclusion_frequency: int, - code_metadata_fp: Path, -) -> list[str]: - """Filters and returns codes based on allowed list and minimum frequency. - - Args: - allowed_codes: List of allowed codes, None means all codes are allowed. 
- min_code_inclusion_frequency: Minimum frequency a code must have to be included. - code_metadata_fp: Path to the metadata file containing code information. - - Returns: - Sorted list of the intersection of allowed codes (if they are specified) and filters based on - inclusion frequency. - - Examples: - >>> from tempfile import NamedTemporaryFile - >>> with NamedTemporaryFile() as f: - ... pl.DataFrame({"code": ["E", "D", "A"], "count": [4, 3, 2]}).write_parquet(f.name) - ... filter_to_codes(["A", "D"], 3, f.name) - ['D'] - """ - if allowed_codes is None: - allowed_codes = get_feature_columns(code_metadata_fp) - feature_freqs = get_feature_freqs(code_metadata_fp) - allowed_codes_set = set(allowed_codes) - - filtered_codes = [ - code - for code, freq in feature_freqs.items() - if freq >= min_code_inclusion_frequency and code in allowed_codes_set - ] - return sorted(filtered_codes) - - -OmegaConf.register_new_resolver("filter_to_codes", filter_to_codes) - - def clear_code_aggregation_suffix(code: str) -> str: """Removes aggregation suffixes from code strings. diff --git a/src/MEDS_tabular_automl/scripts/tabularize_static.py b/src/MEDS_tabular_automl/scripts/tabularize_static.py index 4e03ff4..d7434f0 100644 --- a/src/MEDS_tabular_automl/scripts/tabularize_static.py +++ b/src/MEDS_tabular_automl/scripts/tabularize_static.py @@ -17,7 +17,6 @@ from MEDS_tabular_automl.describe_codes import ( convert_to_df, filter_parquet, - filter_to_codes, get_feature_columns, get_feature_freqs, ) @@ -27,6 +26,7 @@ from MEDS_tabular_automl.utils import ( STATIC_CODE_AGGREGATION, STATIC_VALUE_AGGREGATION, + filter_to_codes, get_shard_prefix, hydra_loguru_init, load_tqdm, diff --git a/src/MEDS_tabular_automl/utils.py b/src/MEDS_tabular_automl/utils.py index ed9c3a2..9527aea 100644 --- a/src/MEDS_tabular_automl/utils.py +++ b/src/MEDS_tabular_automl/utils.py @@ -12,6 +12,7 @@ import numpy as np import polars as pl from loguru import logger +from omegaconf import OmegaConf from scipy.sparse import coo_array DF_T = pl.LazyFrame @@ -44,6 +45,42 @@ def hydra_loguru_init() -> None: logger.add(os.path.join(hydra_path, "main.log")) +def filter_to_codes( + allowed_codes: list[str] | None, + min_code_inclusion_frequency: int, + code_metadata_fp: Path, +) -> list[str]: + """Filters and returns codes based on allowed list and minimum frequency. + + Args: + allowed_codes: List of allowed codes, None means all codes are allowed. + min_code_inclusion_frequency: Minimum frequency a code must have to be included. + code_metadata_fp: Path to the metadata file containing code information. + + Returns: + Sorted list of the intersection of allowed codes (if they are specified) and filters based on + inclusion frequency. + + Examples: + >>> from tempfile import NamedTemporaryFile + >>> with NamedTemporaryFile() as f: + ... pl.DataFrame({"code": ["E", "D", "A"], "count": [4, 3, 2]}).write_parquet(f.name) + ... filter_to_codes(["A", "D"], 3, f.name) + ['D'] + """ + + feature_freqs = pl.read_parquet(code_metadata_fp) + + if allowed_codes is not None: + feature_freqs = feature_freqs.filter(pl.col("code").is_in(allowed_codes)) + + feature_freqs = feature_freqs.filter(pl.col("count") >= min_code_inclusion_frequency) + return sorted(feature_freqs["code"].to_list()) + + +OmegaConf.register_new_resolver("filter_to_codes", filter_to_codes) + + def load_tqdm(use_tqdm: bool): """Conditionally loads and returns tqdm progress bar handler or a no-operation function. 
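With `filter_to_codes` registered as an OmegaConf resolver at import time, configs can invoke it through string interpolation, which is presumably what backs the `tabularization._resolved_codes` value used elsewhere in the package. A self-contained sketch of the resolver mechanics; `keep_frequent` here is a made-up stand-in for the real function so the example needs no parquet file:

    from omegaconf import OmegaConf

    def keep_frequent(codes: list[str], counts: list[int], threshold: int) -> list[str]:
        # Keep codes whose count meets the threshold, mirroring the frequency filter.
        return sorted(c for c, n in zip(codes, counts) if n >= threshold)

    OmegaConf.register_new_resolver("keep_frequent", keep_frequent)

    cfg = OmegaConf.create(
        {
            "codes": ["A", "D", "E"],
            "counts": [2, 3, 4],
            "resolved_codes": "${keep_frequent:${codes},${counts},3}",
        }
    )
    print(cfg.resolved_codes)  # ['D', 'E']

Resolution is lazy: the resolver only runs when `resolved_codes` is first accessed, so the parquet scan inside the real `filter_to_codes` is deferred until the filtered code list is actually needed.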
From c76e114b1a0c553d05a65d5b7296d70636809f2f Mon Sep 17 00:00:00 2001 From: Nassim Oufattole Date: Thu, 13 Jun 2024 03:28:12 +0000 Subject: [PATCH 42/48] updated tests to go over all aggregations --- tests/test_integration.py | 6 +--- tests/test_tabularize.py | 62 +++------------------------------------ 2 files changed, 5 insertions(+), 63 deletions(-) diff --git a/tests/test_integration.py b/tests/test_integration.py index 07851f5..3c0bee8 100644 --- a/tests/test_integration.py +++ b/tests/test_integration.py @@ -17,7 +17,6 @@ SPLITS_JSON, STATIC_FIRST_COLS, STATIC_PRESENT_COLS, - SUMMARIZE_EXPECTED_FILES, VALUE_COLS, ) @@ -112,7 +111,6 @@ def test_integration(): "tqdm": False, "loguru_init": True, "tabularization.min_code_inclusion_frequency": 1, - "tabularization.aggs": "[static/present,static/first,code/count,value/sum]", "tabularization.window_sizes": "[30d,365d,full]", } stderr, stdout = run_command( @@ -167,7 +165,6 @@ def test_integration(): "tqdm": False, "loguru_init": True, "tabularization.min_code_inclusion_frequency": 1, - "tabularization.aggs": "[static/present,static/first,code/count,value/sum]", "tabularization.window_sizes": "[30d,365d,full]", } @@ -185,7 +182,7 @@ def test_integration(): for each in output_files if "none/static" not in str(each) ] - assert set(actual_files) == set(SUMMARIZE_EXPECTED_FILES) + assert len(actual_files) > 0 for f in output_files: ts_matrix = load_matrix(f) assert ts_matrix.shape[0] > 0, "Time-Series Tabular Dataframe Should not be Empty!" @@ -215,7 +212,6 @@ def test_integration(): "tqdm": False, "loguru_init": True, "tabularization.min_code_inclusion_frequency": 1, - # "tabularization.aggs": "[static/present,static/first,code/count,value/sum]", "tabularization.window_sizes": "[30d,365d,full]", } with initialize( diff --git a/tests/test_tabularize.py b/tests/test_tabularize.py index e67e2e1..0a33409 100644 --- a/tests/test_tabularize.py +++ b/tests/test_tabularize.py @@ -10,6 +10,7 @@ import polars as pl from hydra import compose, initialize +from loguru import logger from MEDS_tabular_automl.describe_codes import get_feature_columns from MEDS_tabular_automl.file_name import list_subdir_files @@ -29,6 +30,8 @@ load_matrix, ) +logger.disable("MEDS_tabular_automl") + SPLITS_JSON = """{"train/0": [239684, 1195293], "train/1": [68729, 814703], "tuning/0": [754281], "held_out/0": [1500733]}""" # noqa: E501 MEDS_TRAIN_0 = """ @@ -142,60 +145,6 @@ "tuning/0/none/static/present.npz", ] -SUMMARIZE_EXPECTED_FILES = [ - "train/1/365d/value/sum.npz", - "train/1/365d/code/count.npz", - "train/1/full/value/sum.npz", - "train/1/full/code/count.npz", - "train/1/30d/value/sum.npz", - "train/1/30d/code/count.npz", - "train/0/365d/value/sum.npz", - "train/0/365d/code/count.npz", - "train/0/full/value/sum.npz", - "train/0/full/code/count.npz", - "train/0/30d/value/sum.npz", - "train/0/30d/code/count.npz", - "held_out/0/365d/value/sum.npz", - "held_out/0/365d/code/count.npz", - "held_out/0/full/value/sum.npz", - "held_out/0/full/code/count.npz", - "held_out/0/30d/value/sum.npz", - "held_out/0/30d/code/count.npz", - "tuning/0/365d/value/sum.npz", - "tuning/0/365d/code/count.npz", - "tuning/0/full/value/sum.npz", - "tuning/0/full/code/count.npz", - "tuning/0/30d/value/sum.npz", - "tuning/0/30d/code/count.npz", -] - -MERGE_EXPECTED_FILES = [ - "train/365d/value/sum/0.npz", - "train/365d/value/sum/1.npz", - "train/365d/code/count/0.npz", - "train/365d/code/count/1.npz", - "train/full/value/sum/0.npz", - "train/full/value/sum/1.npz", - "train/full/code/count/0.npz", 
- "train/full/code/count/1.npz", - "train/30d/value/sum/0.npz", - "train/30d/value/sum/1.npz", - "train/30d/code/count/0.npz", - "train/30d/code/count/1.npz", - "held_out/365d/value/sum/0.npz", - "held_out/365d/code/count/0.npz", - "held_out/full/value/sum/0.npz", - "held_out/full/code/count/0.npz", - "held_out/30d/value/sum/0.npz", - "held_out/30d/code/count/0.npz", - "tuning/365d/value/sum/0.npz", - "tuning/365d/code/count/0.npz", - "tuning/full/value/sum/0.npz", - "tuning/full/code/count/0.npz", - "tuning/30d/value/sum/0.npz", - "tuning/30d/code/count/0.npz", -] - def test_tabularize(): with tempfile.TemporaryDirectory() as d: @@ -260,7 +209,6 @@ def test_tabularize(): "tqdm": False, "loguru_init": True, "tabularization.min_code_inclusion_frequency": 1, - "tabularization.aggs": "[static/present,static/first,code/count,value/sum]", "tabularization.window_sizes": "[30d,365d,full]", } @@ -310,7 +258,7 @@ def test_tabularize(): for each in output_files if "none/static" not in str(each) ] - assert set(actual_files) == set(SUMMARIZE_EXPECTED_FILES) + assert len(actual_files) > 0 for f in output_files: ts_matrix = load_matrix(f) assert ts_matrix.shape[0] > 0, "Time-Series Tabular Dataframe Should not be Empty!" @@ -341,7 +289,6 @@ def test_tabularize(): "tqdm": False, "loguru_init": True, "tabularization.min_code_inclusion_frequency": 1, - "tabularization.aggs": "[static/present,static/first,code/count,value/sum]", "tabularization.window_sizes": "[30d,365d,full]", } @@ -378,7 +325,6 @@ def test_tabularize(): "tqdm": False, "loguru_init": True, "tabularization.min_code_inclusion_frequency": 1, - "tabularization.aggs": "[static/present,static/first,code/count,value/sum]", "tabularization.window_sizes": "[30d,365d,full]", } From 0b34538d76bdfe9d2f6b980434b65d96e90135ea Mon Sep 17 00:00:00 2001 From: Aleksia Kolo Date: Wed, 12 Jun 2024 23:44:48 -0400 Subject: [PATCH 43/48] removed -> None return --- src/MEDS_tabular_automl/scripts/cache_task.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/MEDS_tabular_automl/scripts/cache_task.py b/src/MEDS_tabular_automl/scripts/cache_task.py index 5550058..b42e765 100644 --- a/src/MEDS_tabular_automl/scripts/cache_task.py +++ b/src/MEDS_tabular_automl/scripts/cache_task.py @@ -62,7 +62,7 @@ def generate_row_cached_matrix(matrix: sp.coo_array, label_df: pl.LazyFrame) -> @hydra.main(version_base=None, config_path=str(config_yaml.parent.resolve()), config_name=config_yaml.stem) -def main(cfg: DictConfig) -> None: +def main(cfg: DictConfig): """Performs row splicing of tabularized data for a specific task based on configuration. Uses Hydra to manage configurations and logging. 
The function processes data files based on specified From 8c882a3bff4958b15b4ba9dd6697bfc0b2786a58 Mon Sep 17 00:00:00 2001 From: Aleksia Kolo Date: Wed, 12 Jun 2024 23:47:22 -0400 Subject: [PATCH 44/48] removed -> None return --- src/MEDS_tabular_automl/scripts/generate_permutations.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/MEDS_tabular_automl/scripts/generate_permutations.py b/src/MEDS_tabular_automl/scripts/generate_permutations.py index 16e7b0c..749acc2 100644 --- a/src/MEDS_tabular_automl/scripts/generate_permutations.py +++ b/src/MEDS_tabular_automl/scripts/generate_permutations.py @@ -35,7 +35,7 @@ def get_permutations(list_of_options: list[str]) -> None: format_print(sorted(permutations)) -def main() -> None: +def main(): """Generates and prints all possible permutations from given list of options.""" list_of_options = list(sys.argv[1].strip("[]").split(",")) get_permutations(list_of_options) From 6eda7b5edf19744d2d27633ec52c20f59393df0e Mon Sep 17 00:00:00 2001 From: Aleksia Kolo Date: Wed, 12 Jun 2024 23:57:14 -0400 Subject: [PATCH 45/48] removed -> None return and shortened first line --- src/MEDS_tabular_automl/scripts/describe_codes.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/src/MEDS_tabular_automl/scripts/describe_codes.py b/src/MEDS_tabular_automl/scripts/describe_codes.py index 999a8ab..603766c 100644 --- a/src/MEDS_tabular_automl/scripts/describe_codes.py +++ b/src/MEDS_tabular_automl/scripts/describe_codes.py @@ -31,9 +31,8 @@ @hydra.main(version_base=None, config_path=str(config_yaml.parent.resolve()), config_name=config_yaml.stem) -def main(cfg: DictConfig) -> None: - """Main function that orchestrates the feature frequency computation and storage process and enables - filtering out infrequent events. +def main(cfg: DictConfig): + """Computes feature frequencies and stores them to disk. Args: cfg: The configuration object for the tabularization process, loaded from a Hydra From 3040b7975e89a17ae30022d7e802ee45404ce864 Mon Sep 17 00:00:00 2001 From: Aleksia Kolo Date: Thu, 13 Jun 2024 00:15:08 -0400 Subject: [PATCH 46/48] removed unnecessary annotation + doctest --- src/MEDS_tabular_automl/generate_ts_features.py | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/src/MEDS_tabular_automl/generate_ts_features.py b/src/MEDS_tabular_automl/generate_ts_features.py index d6057d2..65f95ab 100644 --- a/src/MEDS_tabular_automl/generate_ts_features.py +++ b/src/MEDS_tabular_automl/generate_ts_features.py @@ -19,10 +19,18 @@ def feature_name_to_code(feature_name: str) -> str: """Converts a feature name to a code name by removing the aggregation part. Args: - feature_name (str): The full feature name, including aggregation. + feature_name: The full feature name, including aggregation. Returns: The code name without the aggregation part. 
+ + Examples: + >>> feature_name_to_code("A/code/count") + 'A/code' + >>> feature_name_to_code("A/B/code/count") + 'A/B/code' + >>> feature_name_to_code("invalid_name") + '' """ return "/".join(feature_name.split("/")[:-1]) From 0dbd79150b93ff73f182085f725b7d49052810f2 Mon Sep 17 00:00:00 2001 From: Aleksia Kolo Date: Thu, 13 Jun 2024 00:47:51 -0400 Subject: [PATCH 47/48] edited docstrings for class XGBoost --- .../scripts/launch_xgboost.py | 71 +++++++++---------- 1 file changed, 35 insertions(+), 36 deletions(-) diff --git a/src/MEDS_tabular_automl/scripts/launch_xgboost.py b/src/MEDS_tabular_automl/scripts/launch_xgboost.py index 19da357..be5102a 100644 --- a/src/MEDS_tabular_automl/scripts/launch_xgboost.py +++ b/src/MEDS_tabular_automl/scripts/launch_xgboost.py @@ -96,10 +96,10 @@ def _load_matrix(self, path: Path) -> sp.csc_matrix: """Load a sparse matrix from disk. Args: - - path (Path): Path to the sparse matrix. + path: Path to the sparse matrix. Returns: - - sp.csc_matrix: Sparse matrix. + The sparse matrix. """ npzfile = np.load(path) array, shape = npzfile["array"], npzfile["shape"] @@ -113,10 +113,10 @@ def load_labels(self) -> tuple[Mapping[int, list], Mapping[int, list]]: """Loads valid event ids and labels for each shard. Returns: - - Tuple[Mapping[int, list], Mapping[int, list]]: Tuple containing: - dictionary from shard number to list of valid event ids -- used for indexing rows - in the sparse matrix - dictionary from shard number to list of labels for these valid event ids + A tuple containing: + dictionary from shard number to list of valid event ids -- used for indexing + rows in the sparse matrix + dictionary from shard number to list of labels for these valid event ids """ label_fps = { shard: (Path(self.cfg.input_label_dir) / self.split / shard).with_suffix(".parquet") @@ -214,10 +214,10 @@ def _filter_shard_on_codes_and_freqs(self, agg: str, df: sp.csc_matrix) -> sp.cs frame to only include columns that are True in the mask. Args: - - df (scipy.sparse.csc_matrix): Data frame to filter. + df: Data frame to filter. Returns: - - df (scipy.sparse.sp.csc_matrix): Filtered data frame. + The filtered data frame. """ if self.codes_set is None: return df @@ -233,14 +233,14 @@ def _filter_shard_on_codes_and_freqs(self, agg: str, df: sp.csc_matrix) -> sp.cs @TimeableMixin.TimeAs def next(self, input_data: Callable): - """Advance the iterator by 1 step and pass the data to XGBoost. This function is called by XGBoost - during the construction of ``DMatrix`` + """Advances iterator by 1 step and passes the data to XGBoost (called furing construction of + ``DMatrix``). Args: - - input_data (Callable): A function passed by XGBoost with the same signature as `DMatrix`. + input_data: A function passed by XGBoost with the same signature as `DMatrix`. Returns: - - int: 0 if end of iteration, 1 otherwise. + 0 if end of iteration, 1 otherwise. """ if self._it == len(self._data_shards): # return 0 to let XGBoost know this is the end of iteration @@ -256,7 +256,7 @@ def next(self, input_data: Callable): @TimeableMixin.TimeAs def reset(self): - """Reset the iterator to its beginning.""" + """Resets the iterator to its beginning.""" self._it = 0 @TimeableMixin.TimeAs @@ -287,12 +287,11 @@ def collect_in_memory(self) -> tuple[sp.csc_matrix, np.ndarray]: class XGBoostModel(TimeableMixin): def __init__(self, cfg: DictConfig): - """Initialize the XGBoostClassifier with the provided configuration. + """Initializes the XGBoostClassifier with the provided configuration. 
Args: - - cfg (DictConfig): Configuration dictionary. + cfg: The configuration dictionary. """ - self.cfg = cfg self.keep_data_in_memory = cfg.model_params.iterator.keep_data_in_memory @@ -306,9 +305,19 @@ def __init__(self, cfg: DictConfig): self.model = None + @TimeableMixin.TimeAs + def _build(self): + """Builds necessary data structures for training.""" + if self.keep_data_in_memory: + self._build_iterators() + self._build_dmatrix_in_memory() + else: + self._build_iterators() + self._build_dmatrix_from_iterators() + @TimeableMixin.TimeAs def _train(self): - """Train the model.""" + """Trains the model.""" self.model = xgb.train( OmegaConf.to_container(self.cfg.model_params.model), self.dtrain, @@ -321,23 +330,13 @@ def _train(self): @TimeableMixin.TimeAs def train(self): - """Train the model.""" + """Trains the model.""" self._build() self._train() - @TimeableMixin.TimeAs - def _build(self): - """Build necessary data structures for training.""" - if self.keep_data_in_memory: - self._build_iterators() - self._build_dmatrix_in_memory() - else: - self._build_iterators() - self._build_dmatrix_from_iterators() - @TimeableMixin.TimeAs def _build_dmatrix_in_memory(self): - """Build the DMatrix from the data in memory.""" + """Builds the DMatrix from the data in memory.""" X_train, y_train = self.itrain.collect_in_memory() X_tuning, y_tuning = self.ituning.collect_in_memory() X_held_out, y_held_out = self.iheld_out.collect_in_memory() @@ -347,24 +346,24 @@ def _build_dmatrix_in_memory(self): @TimeableMixin.TimeAs def _build_dmatrix_from_iterators(self): - """Build the DMatrix from the iterators.""" + """Builds the DMatrix from the iterators.""" self.dtrain = xgb.DMatrix(self.itrain) self.dtuning = xgb.DMatrix(self.ituning) self.dheld_out = xgb.DMatrix(self.iheld_out) @TimeableMixin.TimeAs def _build_iterators(self): - """Build the iterators for training, validation, and testing.""" + """Builds the iterators for training, validation, and testing.""" self.itrain = Iterator(self.cfg, split="train") self.ituning = Iterator(self.cfg, split="tuning") self.iheld_out = Iterator(self.cfg, split="held_out") @TimeableMixin.TimeAs def evaluate(self) -> float: - """Evaluate the model on the test set. + """Evaluates the model on the test set. Returns: - - float: Evaluation metric (mae). + The evaluation metric as the ROC AUC score. """ y_pred = self.model.predict(self.dheld_out) y_true = self.dheld_out.get_label() @@ -373,13 +372,13 @@ def evaluate(self) -> float: @hydra.main(version_base=None, config_path=str(config_yaml.parent.resolve()), config_name=config_yaml.stem) def main(cfg: DictConfig) -> float: - """Optimize the model based on the provided configuration. + """Optimizes the model based on the provided configuration. Args: - - cfg (DictConfig): Configuration dictionary. + cfg: The configuration dictionary specifying model and training parameters. Returns: - - float: Evaluation result. + The evaluation result as the AUC score on the held-out test set. 
""" # print(OmegaConf.to_yaml(cfg)) From 35b71d833837ac6a972f1db795d757be38abd87a Mon Sep 17 00:00:00 2001 From: Aleksia Kolo Date: Thu, 13 Jun 2024 01:39:25 -0400 Subject: [PATCH 48/48] edited docstrings for class Iterator --- .../scripts/launch_xgboost.py | 101 ++++++++++++------ 1 file changed, 70 insertions(+), 31 deletions(-) diff --git a/src/MEDS_tabular_automl/scripts/launch_xgboost.py b/src/MEDS_tabular_automl/scripts/launch_xgboost.py index be5102a..9089bd5 100644 --- a/src/MEDS_tabular_automl/scripts/launch_xgboost.py +++ b/src/MEDS_tabular_automl/scripts/launch_xgboost.py @@ -23,7 +23,7 @@ class Iterator(xgb.DataIter, TimeableMixin): - """Iterator class for loading and processing data shards. + """Iterator class for loading and processing data shards for use in XGBoost models. This class provides functionality for iterating through data shards, loading feature data and labels, and processing them based on the provided configuration. @@ -72,7 +72,7 @@ def __init__(self, cfg: DictConfig, split: str = "train"): @TimeableMixin.TimeAs def _get_code_masks(self, feature_columns: list, codes_set: set) -> Mapping[str, list[bool]]: - """Create boolean masks for filtering features. + """Creates boolean masks for filtering features. Creates a dictionary of boolean masks for each aggregation type. The masks are used to filter the feature columns based on the specified included codes and minimum code inclusion frequency. @@ -93,13 +93,16 @@ def _get_code_masks(self, feature_columns: list, codes_set: set) -> Mapping[str, @TimeableMixin.TimeAs def _load_matrix(self, path: Path) -> sp.csc_matrix: - """Load a sparse matrix from disk. + """Loads a sparse matrix from disk. Args: path: Path to the sparse matrix. Returns: The sparse matrix. + + Raises: + ValueError: If the loaded array does not have exactly 3 rows, indicating an unexpected format. """ npzfile = np.load(path) array, shape = npzfile["array"], npzfile["shape"] @@ -113,10 +116,9 @@ def load_labels(self) -> tuple[Mapping[int, list], Mapping[int, list]]: """Loads valid event ids and labels for each shard. Returns: - A tuple containing: - dictionary from shard number to list of valid event ids -- used for indexing - rows in the sparse matrix - dictionary from shard number to list of labels for these valid event ids + A tuple containing two mappings: one from shard indices to lists of valid event IDs + which is used for indexing rows in the sparse matrix, and another from shard indices + to lists of corresponding labels. """ label_fps = { shard: (Path(self.cfg.input_label_dir) / self.split / shard).with_suffix(".parquet") @@ -138,8 +140,15 @@ def load_labels(self) -> tuple[Mapping[int, list], Mapping[int, list]]: return cached_event_ids, cached_labels @TimeableMixin.TimeAs - def _get_code_set(self) -> tuple[set, Mapping[int, list], int]: - """Get the set of codes to include in the data based on the configuration.""" + def _get_code_set(self) -> tuple[set[int], Mapping[str, list[bool]], int]: + """Determines the set of feature codes to include based on the configuration settings. + + Returns: + A tuple containing: + - A set of feature indices to be included. + - A mapping from aggregation types to boolean masks indicating whether each feature is included. + - The total number of features. 
+ """ feature_columns = get_feature_columns(self.cfg.tabularization.filtered_code_metadata_fp) feature_dict = {col: i for i, col in enumerate(feature_columns)} allowed_codes = set(self.cfg.tabularization._resolved_codes) @@ -153,13 +162,14 @@ def _get_code_set(self) -> tuple[set, Mapping[int, list], int]: @TimeableMixin.TimeAs def _load_dynamic_shard_from_file(self, path: Path, idx: int) -> sp.csc_matrix: - """Load a sparse shard into memory. + """Loads a specific data shard into memory as a sparse matrix. Args: - - path (Path): Path to the sparse shard. + path: Path to the sparse shard. + idx: Index of the shard. Returns: - - sp.csc_matrix: Data frame with the sparse shard. + The sparse matrix loaded from the file. """ # column_shard is of form event_idx, feature_idx, value matrix = self._load_matrix(path) @@ -172,14 +182,16 @@ def _load_dynamic_shard_from_file(self, path: Path, idx: int) -> sp.csc_matrix: @TimeableMixin.TimeAs def _get_dynamic_shard_by_index(self, idx: int) -> sp.csc_matrix: - """Load a specific shard of dynamic data from disk and return it as a sparse matrix after filtering - column inclusion. + """Loads a shard and returns it as a sparse matrix after applying feature inclusion filtering. Args: - - idx (int): Index of the shard to load. + idx: Index of the shard to load from disk. Returns: - - sp.csc_matrix: Filtered sparse matrix. + The filtered sparse matrix. + + Raises: + ValueError: If any of the required files for the shard do not exist. """ # get all window_size x aggreagation files using the file resolver files = get_model_files(self.cfg, self.split, self._data_shards[idx]) @@ -195,14 +207,14 @@ def _get_dynamic_shard_by_index(self, idx: int) -> sp.csc_matrix: @TimeableMixin.TimeAs def _get_shard_by_index(self, idx: int) -> tuple[sp.csc_matrix, np.ndarray]: - """Load a specific shard of data from disk and concatenate with static data. + """Loads a specific shard of data from disk and concatenate with static data. Args: - - idx (int): Index of the shard to load. + idx: Index of the shard to load. Returns: - - X (scipy.sparse.csc_matrix): Feature data frame.ß - - y (numpy.ndarray): Labels. + A tuple containing the combined feature data and the corresponding labels + for the given shard. """ dynamic_df = self._get_dynamic_shard_by_index(idx) label_df = self.labels[self._data_shards[idx]] @@ -210,11 +222,14 @@ def _get_shard_by_index(self, idx: int) -> tuple[sp.csc_matrix, np.ndarray]: @TimeableMixin.TimeAs def _filter_shard_on_codes_and_freqs(self, agg: str, df: sp.csc_matrix) -> sp.csc_matrix: - """Filter the dynamic data frame based on the inclusion sets. Given the codes_mask, filter the data - frame to only include columns that are True in the mask. + """Filters the given data frame based on the inclusion sets and aggregation type. + + Given the codes_mask, the method filters the dynamic data frame to only include + columns that are True in the mask. Args: - df: Data frame to filter. + agg: The aggregation type used to determine the filtering logic. + df: The data frame to be filtered. Returns: The filtered data frame. @@ -232,9 +247,8 @@ def _filter_shard_on_codes_and_freqs(self, agg: str, df: sp.csc_matrix) -> sp.cs return df @TimeableMixin.TimeAs - def next(self, input_data: Callable): - """Advances iterator by 1 step and passes the data to XGBoost (called furing construction of - ``DMatrix``). + def next(self, input_data: Callable) -> int: + """Advances the iterator by one step and provides data to XGBoost for DMatrix construction. 
Args: input_data: A function passed by XGBoost with the same signature as `DMatrix`. @@ -263,15 +277,17 @@ def reset(self): def collect_in_memory(self) -> tuple[sp.csc_matrix, np.ndarray]: """Collects data from all shards into memory and returns it. - This method iterates through all data shards, retrieves the feature data - and labels from each shard, and then concatenates them into a single - sparse matrix and a single array, respectively. + This method iterates through all data shards, retrieves the feature data and labels + from each shard, and then concatenates them into a single sparse matrix and a single + array, respectively. Returns: A tuple where the first element is a sparse matrix containing the feature data, and the second element is a numpy array containing the labels. - """ + Raises: + ValueError: If no data is found in the shards or labels, indicating an issue with input files. + """ X = [] y = [] for i in range(len(self._data_shards)): @@ -286,6 +302,29 @@ def collect_in_memory(self) -> tuple[sp.csc_matrix, np.ndarray]: class XGBoostModel(TimeableMixin): + """Class for configuring, training, and evaluating an XGBoost model. + + This class utilizes the configuration settings provided to manage the training and evaluation + process of an XGBoost model, ensuring the model is trained and validated using specified parameters + and data splits. It supports training with in-memory data handling as well as direct streaming from + disk using iterators. + + Args: + cfg: The configuration settings for the model, including data paths, model parameters, + and flags for data handling. + + Attributes: + cfg: Configuration object containing all settings required for model operation. + model: The XGBoost model after being trained. + dtrain: The training dataset in DMatrix format. + dtuning: The tuning (validation) dataset in DMatrix format. + dheld_out: The held-out (test) dataset in DMatrix format. + itrain: Iterator for the training dataset. + ituning: Iterator for the tuning dataset. + iheld_out: Iterator for the held-out dataset. + keep_data_in_memory: Flag indicating whether to keep all data in memory or stream from disk. + """ + def __init__(self, cfg: DictConfig): """Initializes the XGBoostClassifier with the provided configuration. @@ -378,7 +417,7 @@ def main(cfg: DictConfig) -> float: cfg: The configuration dictionary specifying model and training parameters. Returns: - The evaluation result as the AUC score on the held-out test set. + The evaluation result as the ROC AUC score on the held-out test set. """ # print(OmegaConf.to_yaml(cfg))
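A closing note on the `xgb.DataIter` protocol that `Iterator` implements: XGBoost drives the iteration itself, repeatedly calling `next` with a callback that accepts the same keyword arguments as `DMatrix`, and calling `reset` between passes over the data. A minimal, self-contained sketch of the same external-memory pattern, with toy in-memory chunks standing in for the on-disk shards (all names and parameters here are illustrative):

    import numpy as np
    import xgboost as xgb

    class ArrayIter(xgb.DataIter):
        def __init__(self, chunks: list[tuple[np.ndarray, np.ndarray]]):
            self._chunks = chunks  # one (X, y) pair per "shard"
            self._it = 0
            super().__init__(cache_prefix="./demo_cache")

        def next(self, input_data) -> int:
            if self._it == len(self._chunks):
                return 0  # tell XGBoost this pass is finished
            X, y = self._chunks[self._it]
            input_data(data=X, label=y)  # same keyword signature as DMatrix
            self._it += 1
            return 1

        def reset(self) -> None:
            self._it = 0  # rewind for the next pass

    rng = np.random.default_rng(0)
    chunks = [(rng.random((8, 3)), rng.integers(0, 2, 8)) for _ in range(2)]
    dtrain = xgb.DMatrix(ArrayIter(chunks))
    booster = xgb.train({"objective": "binary:logistic"}, dtrain, num_boost_round=2)

As the surrounding patches show, `collect_in_memory` is the escape hatch: when `keep_data_in_memory` is set (see `XGBoostModel._build`), the shards are concatenated once and fed to a plain in-memory `DMatrix`, trading peak memory for the avoided per-pass shard reloads.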