From 3d6fa4379bb4183c3d2ffc6c0e3183ba76cd1f8d Mon Sep 17 00:00:00 2001 From: Matthew McDermott Date: Wed, 12 Jun 2024 09:27:07 -0400 Subject: [PATCH 01/16] Added docstring for first function --- src/MEDS_tabular_automl/file_name.py | 15 +++++++++++++-- 1 file changed, 13 insertions(+), 2 deletions(-) diff --git a/src/MEDS_tabular_automl/file_name.py b/src/MEDS_tabular_automl/file_name.py index 898d11e..92e1995 100644 --- a/src/MEDS_tabular_automl/file_name.py +++ b/src/MEDS_tabular_automl/file_name.py @@ -1,8 +1,19 @@ -"""Help functions for getting file names and paths for MEDS tabular automl tasks.""" +"""Helper functions for getting file names and paths for MEDS tabular automl tasks.""" from pathlib import Path -def list_subdir_files(dir: [Path | str], fmt: str): +def list_subdir_files(dir: Path | str, fmt: str) -> list[Path]: + """List files in subdirectories of a directory with a given extension. + + Args: + dir: Path to the directory. + fmt: File extension to filter files. + + Returns: + An alphabetically sorted list of Path objects to files matching the extension in any level of + subdirectories of the given directory. + """ + return sorted(list(Path(dir).glob(f"**/*.{fmt}"))) From ad7ced347445b856d109efbe097d0e1c9e427090 Mon Sep 17 00:00:00 2001 From: Matthew McDermott Date: Wed, 12 Jun 2024 09:36:54 -0400 Subject: [PATCH 02/16] Added doctests for list_subdir_files --- src/MEDS_tabular_automl/file_name.py | 41 +++++++++++++++++++++++++--- 1 file changed, 37 insertions(+), 4 deletions(-) diff --git a/src/MEDS_tabular_automl/file_name.py b/src/MEDS_tabular_automl/file_name.py index 92e1995..35c4cfb 100644 --- a/src/MEDS_tabular_automl/file_name.py +++ b/src/MEDS_tabular_automl/file_name.py @@ -2,19 +2,52 @@ from pathlib import Path -def list_subdir_files(dir: Path | str, fmt: str) -> list[Path]: +def list_subdir_files(root: Path | str, ext: str) -> list[Path]: """List files in subdirectories of a directory with a given extension. Args: - dir: Path to the directory. - fmt: File extension to filter files. + root: Path to the directory. + ext: File extension to filter files. Returns: An alphabetically sorted list of Path objects to files matching the extension in any level of subdirectories of the given directory. 
+ + Examples: + >>> import tempfile + >>> tmpdir = tempfile.TemporaryDirectory() + >>> root = Path(tmpdir.name) + >>> subdir_1 = root / "subdir_1" + >>> subdir_1.mkdir() + >>> subdir_2 = root / "subdir_2" + >>> subdir_2.mkdir() + >>> subdir_1_A = subdir_1 / "A" + >>> subdir_1_A.mkdir() + >>> (root / "1.csv").touch() + >>> (root / "foo.parquet").touch() + >>> (root / "2.csv").touch() + >>> (root / "subdir_1" / "3.csv").touch() + >>> (root / "subdir_2" / "4.csv").touch() + >>> (root / "subdir_1" / "A" / "5.csv").touch() + >>> (root / "subdir_1" / "A" / "15.csv.gz").touch() + >>> [fp.relative_to(root) for fp in list_subdir_files(root, "csv")] # doctest: +NORMALIZE_WHITESPACE + [PosixPath('1.csv'), + PosixPath('2.csv'), + PosixPath('subdir_1/3.csv'), + PosixPath('subdir_1/A/5.csv'), + PosixPath('subdir_2/4.csv')] + >>> [fp.relative_to(root) for fp in list_subdir_files(root, "parquet")] + [PosixPath('foo.parquet')] + >>> [fp.relative_to(root) for fp in list_subdir_files(root, "csv.gz")] + [PosixPath('subdir_1/A/15.csv.gz')] + >>> [fp.relative_to(root) for fp in list_subdir_files(root, "json")] + [] + >>> list_subdir_files(root / "nonexistent", "csv") + [] + >>> tmpdir.cleanup() """ - return sorted(list(Path(dir).glob(f"**/*.{fmt}"))) + return sorted(list(Path(root).glob(f"**/*.{ext}"))) def get_task_specific_path(cfg, split, shard_num, window_size, agg): From 92d2a2c335676906126a02f023c9e509ad4e1122 Mon Sep 17 00:00:00 2001 From: Matthew McDermott Date: Wed, 12 Jun 2024 09:40:44 -0400 Subject: [PATCH 03/16] Removed rarely used and unnecessary function --- src/MEDS_tabular_automl/file_name.py | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/src/MEDS_tabular_automl/file_name.py b/src/MEDS_tabular_automl/file_name.py index 35c4cfb..c8f5b0d 100644 --- a/src/MEDS_tabular_automl/file_name.py +++ b/src/MEDS_tabular_automl/file_name.py @@ -50,13 +50,10 @@ def list_subdir_files(root: Path | str, ext: str) -> list[Path]: return sorted(list(Path(root).glob(f"**/*.{ext}"))) -def get_task_specific_path(cfg, split, shard_num, window_size, agg): - return Path(cfg.input_dir) / split / f"{shard_num}" / f"{window_size}" / f"{agg}.npz" - - def get_model_files(cfg, split: str, shard_num: int): window_sizes = cfg.tabularization.window_sizes aggs = cfg.tabularization.aggs + shard_dir = Path(cfg.input_dir) / split / f"{shard_num}" # Given a shard number, returns the model files model_files = [] for window_size in window_sizes: @@ -64,9 +61,9 @@ def get_model_files(cfg, split: str, shard_num: int): if agg.startswith("static"): continue else: - model_files.append(get_task_specific_path(cfg, split, shard_num, window_size, agg)) + model_files.append(shard_dir / window_size / f"{agg}.npz") for agg in aggs: if agg.startswith("static"): window_size = "none" - model_files.append(get_task_specific_path(cfg, split, shard_num, window_size, agg)) + model_files.append(shard_dir / window_size / f"{agg}.npz") return sorted(model_files) From fe8971df504ab7d0256059c7d4133e35e57c9a8a Mon Sep 17 00:00:00 2001 From: Matthew McDermott Date: Wed, 12 Jun 2024 09:52:02 -0400 Subject: [PATCH 04/16] Cleaned up and added doctests for get_model_files --- src/MEDS_tabular_automl/file_name.py | 45 ++++++++++++++++++++++++++-- 1 file changed, 43 insertions(+), 2 deletions(-) diff --git a/src/MEDS_tabular_automl/file_name.py b/src/MEDS_tabular_automl/file_name.py index c8f5b0d..4ca144a 100644 --- a/src/MEDS_tabular_automl/file_name.py +++ b/src/MEDS_tabular_automl/file_name.py @@ -1,6 +1,8 @@ """Helper functions for 
getting file names and paths for MEDS tabular automl tasks.""" from pathlib import Path +from omegaconf import DictConfig + def list_subdir_files(root: Path | str, ext: str) -> list[Path]: """List files in subdirectories of a directory with a given extension. @@ -50,10 +52,49 @@ def list_subdir_files(root: Path | str, ext: str) -> list[Path]: return sorted(list(Path(root).glob(f"**/*.{ext}"))) -def get_model_files(cfg, split: str, shard_num: int): +def get_model_files(cfg: DictConfig, split: str, shard: str) -> list[Path]: + """Get the tabularized npz files for a given split and shard number. + + TODO: Rename function to get_tabularized_input_files or something. + + Args: + cfg: `OmegaConf.DictConfig` object with the configuration. It must have the following keys: + - input_dir: Path to the directory with the tabularized npz files. + - tabularization: Tabularization configuration, as a nested `DictConfig` object with keys: + - window_sizes: List of window sizes. + - aggs: List of aggregation functions. + split: Split name to reference the files stored on disk. + shard: The shard within the split to reference the files stored on disk. + + Returns: + An alphabetically sorted list of Path objects to the tabularized npz files for the given split and + shard. These files will take the form ``{cfg.input_dir}/{split}/{shard}/{window_size}/{agg}.npz``. For + static aggregations, the window size will be "none" as these features are not time-varying. + + Examples: + >>> cfg = DictConfig({ + ... "input_dir": "data", + ... "tabularization": { + ... "window_sizes": ["1d", "7d"], + ... "aggs": ["code/count", "value/sum", "static/present"], + ... } + ... }) + >>> get_model_files(cfg, "train", "0") # doctest: +NORMALIZE_WHITESPACE + [PosixPath('data/train/0/1d/code/count.npz'), + PosixPath('data/train/0/1d/value/sum.npz'), + PosixPath('data/train/0/7d/code/count.npz'), + PosixPath('data/train/0/7d/value/sum.npz'), + PosixPath('data/train/0/none/static/present.npz')] + >>> get_model_files(cfg, "test/IID", "3/0") # doctest: +NORMALIZE_WHITESPACE + [PosixPath('data/test/IID/3/0/1d/code/count.npz'), + PosixPath('data/test/IID/3/0/1d/value/sum.npz'), + PosixPath('data/test/IID/3/0/7d/code/count.npz'), + PosixPath('data/test/IID/3/0/7d/value/sum.npz'), + PosixPath('data/test/IID/3/0/none/static/present.npz')] + """ window_sizes = cfg.tabularization.window_sizes aggs = cfg.tabularization.aggs - shard_dir = Path(cfg.input_dir) / split / f"{shard_num}" + shard_dir = Path(cfg.input_dir) / split / shard # Given a shard number, returns the model files model_files = [] for window_size in window_sizes: From 48e5fb7d9bbb56d22a0f8999486cf2ed08e6e28d Mon Sep 17 00:00:00 2001 From: Aleksia Kolo Date: Wed, 12 Jun 2024 10:47:09 -0400 Subject: [PATCH 05/16] docstring for first function of describe_codes --- src/MEDS_tabular_automl/describe_codes.py | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/src/MEDS_tabular_automl/describe_codes.py b/src/MEDS_tabular_automl/describe_codes.py index b1264f8..b5bbc84 100644 --- a/src/MEDS_tabular_automl/describe_codes.py +++ b/src/MEDS_tabular_automl/describe_codes.py @@ -6,7 +6,15 @@ from MEDS_tabular_automl.utils import DF_T, get_feature_names -def convert_to_df(freq_dict): +def convert_to_df(freq_dict: dict[str, int]) -> pl.DataFrame: + """Converts a dictionary of code frequencies to a Polars DataFrame. + + Args: + freq_dict: A dictionary with code features and their respective frequencies. + + Returns: + A DataFrame with two columns, "code" and "count". 
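+
+    Examples:
+        (Illustrative sketch; output is shown via to_dicts() to stay stable across Polars versions.)
+        >>> convert_to_df({"A/code": 3, "B/value": 2}).to_dicts()
+        [{'code': 'A/code', 'count': 3}, {'code': 'B/value', 'count': 2}]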
+    """
     return pl.DataFrame([[col, freq] for col, freq in freq_dict.items()], schema=["code", "count"])


From f2f7a1f6773c775f6ddebc528db7454168ca205e Mon Sep 17 00:00:00 2001
From: Aleksia Kolo
Date: Wed, 12 Jun 2024 11:16:32 -0400
Subject: [PATCH 06/16] Updated docstring of second function

---
 src/MEDS_tabular_automl/describe_codes.py | 35 +++++++++++------------
 1 file changed, 17 insertions(+), 18 deletions(-)

diff --git a/src/MEDS_tabular_automl/describe_codes.py b/src/MEDS_tabular_automl/describe_codes.py
index b5bbc84..475173d 100644
--- a/src/MEDS_tabular_automl/describe_codes.py
+++ b/src/MEDS_tabular_automl/describe_codes.py
@@ -18,29 +18,28 @@ def convert_to_df(freq_dict: dict[str, int]) -> pl.DataFrame:
     return pl.DataFrame([[col, freq] for col, freq in freq_dict.items()], schema=["code", "count"])
 
 
-def compute_feature_frequencies(cfg: DictConfig, shard_df: DF_T) -> list[str]:
-    """Generates a list of feature column names from the data within each shard based on specified
-    configurations.
+def compute_feature_frequencies(cfg: DictConfig, shard_df: DF_T) -> pl.DataFrame:
+    """Generates a DataFrame of the frequencies of codes and numerical values under different
+    aggregations. Frequency counts are computed for the relevant attributes and organized into
+    categories based on the dataset's features.
 
-    Parameters:
-    - cfg (DictConfig): Configuration dictionary specifying how features should be evaluated and aggregated.
-    - split_to_shard_df (dict): A dictionary of DataFrames, divided by data split (e.g., 'train', 'test').
+    Args:
+        cfg: Configuration dictionary specifying how features should be evaluated and aggregated.
+        shard_df: A DataFrame shard of the MEDS data from a given split (e.g., 'train', 'test').
 
     Returns:
-    - tuple[list[str], dict]: A tuple containing a list of feature columns and a dictionary of code properties
-    identified during the evaluation.
+        A DataFrame with "code" and "count" columns giving the frequency of each code and
+        numerical-value feature observed in the shard.
 
-    This function evaluates the properties of codes within training data and applies configured
-    aggregations to generate a comprehensive list of feature columns for modeling purposes.
     Examples:
-    # >>> import polars as pl
-    # >>> data = {'code': ['A', 'A', 'B', 'B', 'C', 'C', 'C'],
-    # ...         'timestamp': [None, '2021-01-01', None, None, '2021-01-03', '2021-01-04', None],
-    # ...         'numerical_value': [1, None, 2, 2, None, None, 3]}
-    # >>> df = pl.DataFrame(data).lazy()
-    # >>> aggs = ['value/sum', 'code/count']
-    # >>> compute_feature_frequencies(aggs, df)
-    # ['A/code', 'A/value', 'C/code', 'C/value']
+    # >>> import polars as pl
+    # >>> data = {'code': ['A', 'A', 'B', 'B', 'C', 'C', 'C'],
+    # ...         'timestamp': [None, '2021-01-01', None, None, '2021-01-03', '2021-01-04', None],
+    # ...         'numerical_value': [1, None, 2, 2, None, None, 3]}
+    # >>> df = pl.DataFrame(data).lazy()
+    # >>> cfg = DictConfig({'aggs': ['value/sum', 'code/count']})
+    # >>> compute_feature_frequencies(cfg, df)  # returns a "code"/"count" frequency DataFrame
     """
     static_df = shard_df.filter(
         pl.col("patient_id").is_not_null() & pl.col("code").is_not_null() & pl.col("timestamp").is_null()

From 361a066263a3d12c0946bc5e9cd80318ad4bec6b Mon Sep 17 00:00:00 2001
From: Aleksia Kolo
Date: Wed, 12 Jun 2024 11:30:59 -0400
Subject: [PATCH 07/16] Updated docstring of third function

---
 src/MEDS_tabular_automl/describe_codes.py | 28 +++++++++++------------
 1 file changed, 14 insertions(+), 14 deletions(-)

diff --git a/src/MEDS_tabular_automl/describe_codes.py b/src/MEDS_tabular_automl/describe_codes.py
index 475173d..3b81648 100644
--- a/src/MEDS_tabular_automl/describe_codes.py
+++ b/src/MEDS_tabular_automl/describe_codes.py
@@ -71,27 +71,27 @@ def compute_feature_frequencies(cfg: DictConfig, shard_df: DF_T) -> pl.DataFrame
     return convert_to_df(combined_freqs)
 
 
-def convert_to_freq_dict(df: pl.LazyFrame) -> dict:
+def convert_to_freq_dict(df: pl.LazyFrame) -> dict[str, int]:
     """Converts a DataFrame to a dictionary of frequencies.
 
-    This function converts a DataFrame to a dictionary of frequencies, where the keys are the
-    column names and the values are dictionaries of code frequencies.
-
     Args:
-    - df (pl.DataFrame): The DataFrame to be converted.
+        df: The DataFrame to be converted.
 
     Returns:
-    - dict: A dictionary of frequencies, where the keys are the column names and the values are
-    dictionaries of code frequencies.
+        A dictionary mapping each code to its frequency count.
+
+    Raises:
+        ValueError: If the DataFrame does not have the expected columns "code" and "count".
 
     Example:
-    # >>> import polars as pl
-    # >>> df = pl.DataFrame({
-    # ...     "code": [1, 2, 3, 4, 5],
-    # ...     "value": [10, 20, 30, 40, 50]
-    # ... })
-    # >>> convert_to_freq_dict(df)
-    # {'code': {1: 1, 2: 1, 3: 1, 4: 1, 5: 1}, 'value': {10: 1, 20: 1, 30: 1, 40: 1, 50: 1}}
+    # >>> import polars as pl
+    # >>> df = pl.DataFrame({
+    # ...     "code": ["A", "B", "C"],
+    # ...     "count": [3, 2, 1]
+    # ... }).lazy()
+    # >>> convert_to_freq_dict(df)
+    # {'A': 3, 'B': 2, 'C': 1}
     """
     if not df.columns == ["code", "count"]:
         raise ValueError(f"DataFrame must have columns 'code' and 'count', but has columns {df.columns}!")

From 4d443050de1c6c696b4e445a2b6ad68013bac8a8 Mon Sep 17 00:00:00 2001
From: Aleksia Kolo
Date: Wed, 12 Jun 2024 11:42:29 -0400
Subject: [PATCH 08/16] Updated docstring of fourth function

---
 src/MEDS_tabular_automl/describe_codes.py | 35 +++++++++++++++++++----
 1 file changed, 30 insertions(+), 5 deletions(-)

diff --git a/src/MEDS_tabular_automl/describe_codes.py b/src/MEDS_tabular_automl/describe_codes.py
index 3b81648..58e20b9 100644
--- a/src/MEDS_tabular_automl/describe_codes.py
+++ b/src/MEDS_tabular_automl/describe_codes.py
@@ -98,11 +98,27 @@ def convert_to_freq_dict(df: pl.LazyFrame) -> dict[str, int]:
     return dict(df.collect().iter_rows())
 
 
-def get_feature_columns(fp):
+def get_feature_columns(fp: Path) -> list[str]:
+    """Retrieves feature column names from a parquet file.
+
+    Args:
+        fp: File path to the Parquet data.
+
+    Returns:
+        Sorted list of column names.
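+
+    Examples:
+        (Illustrative; assumes a metadata Parquet file whose only columns are "code" and "count",
+        as required by convert_to_freq_dict.)
+        >>> from tempfile import NamedTemporaryFile
+        >>> fp = NamedTemporaryFile()
+        >>> pl.DataFrame({"code": ["B/code", "A/code"], "count": [2, 3]}).write_parquet(fp.name)
+        >>> get_feature_columns(fp.name)
+        ['A/code', 'B/code']
+        >>> fp.close()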
+    """
     return sorted(list(convert_to_freq_dict(pl.scan_parquet(fp)).keys()))
 
 
-def get_feature_freqs(fp):
+def get_feature_freqs(fp: Path) -> dict[str, int]:
+    """Retrieves feature frequencies from a parquet file.
+
+    Args:
+        fp: File path to the Parquet data.
+
+    Returns:
+        Dictionary of feature frequencies, mapping each code to its count.
+    """
     return convert_to_freq_dict(pl.scan_parquet(fp))
 
 
@@ -110,9 +126,18 @@ def filter_to_codes(
     allowed_codes: list[str] | None,
     min_code_inclusion_frequency: int,
     code_metadata_fp: Path,
-):
-    """Returns intersection of allowed codes if they are specified, and filters to codes based on inclusion
-    frequency."""
+) -> list[str]:
+    """Filters and returns codes based on an allowed list and a minimum frequency.
+
+    Args:
+        allowed_codes: List of allowed codes; None means all codes are allowed.
+        min_code_inclusion_frequency: Minimum frequency a code must have to be included.
+        code_metadata_fp: Path to the metadata file containing code information.
+
+    Returns:
+        A sorted list of codes, restricted to the allowed codes (when specified) and filtered to
+        those meeting the minimum inclusion frequency.
+    """
     if allowed_codes is None:
         allowed_codes = get_feature_columns(code_metadata_fp)
     feature_freqs = get_feature_freqs(code_metadata_fp)

From 47713cd4cbff7e84a527d8a514893826cfbdbe0d Mon Sep 17 00:00:00 2001
From: Aleksia Kolo
Date: Wed, 12 Jun 2024 11:50:25 -0400
Subject: [PATCH 09/16] Updated docstring of last function

---
 src/MEDS_tabular_automl/describe_codes.py | 70 +++++++++++++----------
 1 file changed, 41 insertions(+), 29 deletions(-)

diff --git a/src/MEDS_tabular_automl/describe_codes.py b/src/MEDS_tabular_automl/describe_codes.py
index 58e20b9..40ff0ac 100644
--- a/src/MEDS_tabular_automl/describe_codes.py
+++ b/src/MEDS_tabular_automl/describe_codes.py
@@ -161,7 +161,15 @@ def filter_to_codes(
 # OmegaConf.register_new_resolver("filter_to_codes", filter_to_codes)
 
 
-def clear_code_aggregation_suffix(code):
+def clear_code_aggregation_suffix(code: str) -> str:
+    """Removes aggregation suffixes from code strings.
+
+    Args:
+        code: Code string to be cleared.
+
+    Returns:
+        Code string without aggregation suffixes.
+    """
     if code.endswith("/code"):
         return code[:-5]
     elif code.endswith("/value"):
@@ -172,36 +180,40 @@ def clear_code_aggregation_suffix(code):
         return code[:-13]
 
 
-def filter_parquet(fp, allowed_codes: list[str]):
-    """Loads Parquet with Polars and filters to allowed codes.
+def filter_parquet(fp: Path, allowed_codes: list[str]) -> pl.LazyFrame:
+    """Loads and filters a Parquet file with Polars to include only specified codes, removing rare
+    codes/values.
 
     Args:
-        fp: Path to the Meds cohort shard
-        allowed_codes: List of codes to filter to.
-
-    Expect:
-    >>> from tempfile import NamedTemporaryFile
-    >>> fp = NamedTemporaryFile()
-    >>> pl.DataFrame({
-    ...     "code": ["A", "A", "A", "A", "D", "D", "E", "E"],
-    ...     "timestamp": [None, None, "2021-01-01", "2021-01-01", None, None, "2021-01-03", "2021-01-04"],
-    ...     "numerical_value": [1, None, 2, 2, None, 5, None, 3]
-    ...
}).write_parquet(fp.name) - >>> filter_parquet(fp.name, ["A/code", "D/static/present", "E/code", "E/value"]).collect() - shape: (6, 3) - ┌──────┬────────────┬─────────────────┐ - │ code ┆ timestamp ┆ numerical_value │ - │ --- ┆ --- ┆ --- │ - │ str ┆ str ┆ i64 │ - ╞══════╪════════════╪═════════════════╡ - │ A ┆ 2021-01-01 ┆ null │ - │ A ┆ 2021-01-01 ┆ null │ - │ D ┆ null ┆ null │ - │ D ┆ null ┆ null │ - │ E ┆ 2021-01-03 ┆ null │ - │ E ┆ 2021-01-04 ┆ 3 │ - └──────┴────────────┴─────────────────┘ - >>> fp.close() + fp: Path to the Parquet file of a Meds cohort shard. + allowed_codes: List of codes to filter by. + + Returns: + pl.LazyFrame: A filtered LazyFrame containing only the allowed and not rare codes/values. + + Examples: + >>> from tempfile import NamedTemporaryFile + >>> fp = NamedTemporaryFile() + >>> pl.DataFrame({ + ... "code": ["A", "A", "A", "A", "D", "D", "E", "E"], + ... "timestamp": [None, None, "2021-01-01", "2021-01-01", None, None, "2021-01-03", "2021-01-04"], + ... "numerical_value": [1, None, 2, 2, None, 5, None, 3] + ... }).write_parquet(fp.name) + >>> filter_parquet(fp.name, ["A/code", "D/static/present", "E/code", "E/value"]).collect() + shape: (6, 3) + ┌──────┬────────────┬─────────────────┐ + │ code ┆ timestamp ┆ numerical_value │ + │ --- ┆ --- ┆ --- │ + │ str ┆ str ┆ i64 │ + ╞══════╪════════════╪═════════════════╡ + │ A ┆ 2021-01-01 ┆ null │ + │ A ┆ 2021-01-01 ┆ null │ + │ D ┆ null ┆ null │ + │ D ┆ null ┆ null │ + │ E ┆ 2021-01-03 ┆ null │ + │ E ┆ 2021-01-04 ┆ 3 │ + └──────┴────────────┴─────────────────┘ + >>> fp.close() """ df = pl.scan_parquet(fp) # Drop values that are rare From 97b61604a3fefe9d5b6c2318c5c30996bd9b2609 Mon Sep 17 00:00:00 2001 From: Aleksia Kolo Date: Wed, 12 Jun 2024 18:17:20 -0400 Subject: [PATCH 10/16] Added docstrings for src/MEDS_tabular_automl/utils.py --- src/MEDS_tabular_automl/utils.py | 45 +++++++++++++++++++++++++++++--- 1 file changed, 42 insertions(+), 3 deletions(-) diff --git a/src/MEDS_tabular_automl/utils.py b/src/MEDS_tabular_automl/utils.py index 061485d..a6ce0c1 100644 --- a/src/MEDS_tabular_automl/utils.py +++ b/src/MEDS_tabular_automl/utils.py @@ -47,7 +47,15 @@ def hydra_loguru_init() -> None: logger.add(os.path.join(hydra_path, "main.log")) -def load_tqdm(use_tqdm): +def load_tqdm(use_tqdm: bool): + """Conditionally loads and returns tqdm progress bar handler or a no-operation function. + + Args: + use_tqdm: Flag indicating whether to use tqdm progress bar. + + Returns: + A function that either encapsulates tqdm or simply returns the input it is given. + """ if use_tqdm: from tqdm import tqdm @@ -61,13 +69,36 @@ def noop(x, **kwargs): def parse_static_feature_column(c: str) -> tuple[str, str, str, str]: + """Parses a flat feature column format into component parts. + + Args: + c: The column string in 'category/subcategory/feature' format. + + Returns: + A tuple containing separate strings of the feature column format. + + Raises: + ValueError: If the column string format is incorrect. + """ parts = c.split("/") if len(parts) < 3: raise ValueError(f"Column {c} is not a valid flat feature column!") return ("/".join(parts[:-2]), parts[-2], parts[-1]) -def array_to_sparse_matrix(array: np.ndarray, shape: tuple[int, int]): +def array_to_sparse_matrix(array: np.ndarray, shape: tuple[int, int]) -> coo_array: + """Converts a numpy array representation into a sparse matrix. + + Args: + array: The array containing data, rows, and columns. + shape: The shape of the resulting sparse matrix. + + Returns: + The formatted sparse matrix. 
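+
+    Examples:
+        (Illustrative sketch; assumes an integer-typed stacked [data, row, col] array as input.)
+        >>> import numpy as np
+        >>> arr = np.array([[1, 2], [0, 1], [1, 0]])
+        >>> array_to_sparse_matrix(arr, (2, 2)).toarray()
+        array([[0, 1],
+               [2, 0]])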
+ + Raises: + AssertionError: If the input array's first dimension is not 3. + """ assert array.shape[0] == 3 data, row, col = array return coo_array((data, (row, col)), shape=shape) @@ -112,7 +143,15 @@ def get_min_dtype(array: np.ndarray) -> np.dtype: return array.dtype -def sparse_matrix_to_array(coo_matrix: coo_array): +def sparse_matrix_to_array(coo_matrix: coo_array) -> tuple[np.ndarray, tuple[int, int]]: + """Converts a sparse matrix to a numpy array format with shape information. + + Args: + coo_matrix: The sparse matrix to convert. + + Returns: + A tuple of a numpy array ([data, row, col]) and the shape of the original matrix. + """ data, row, col = coo_matrix.data, coo_matrix.row, coo_matrix.col # Remove invalid indices valid_indices = (data == 0) | np.isnan(data) From 169efa9826f6f91620a457d9a5c1014781c18929 Mon Sep 17 00:00:00 2001 From: Aleksia Kolo Date: Wed, 12 Jun 2024 18:45:16 -0400 Subject: [PATCH 11/16] first eight docstrings --- src/MEDS_tabular_automl/utils.py | 40 +++++++++++++++++++++----------- 1 file changed, 26 insertions(+), 14 deletions(-) diff --git a/src/MEDS_tabular_automl/utils.py b/src/MEDS_tabular_automl/utils.py index a6ce0c1..d5c8605 100644 --- a/src/MEDS_tabular_automl/utils.py +++ b/src/MEDS_tabular_automl/utils.py @@ -167,12 +167,26 @@ def sparse_matrix_to_array(coo_matrix: coo_array) -> tuple[np.ndarray, tuple[int return np.array([data, row, col]), coo_matrix.shape -def store_matrix(coo_matrix: coo_array, fp_path: Path): +def store_matrix(coo_matrix: coo_array, fp_path: Path) -> None: + """Stores a sparse matrix to disk as a .npz file. + + Args: + coo_matrix: The sparse matrix to store. + fp_path: The file path where the matrix will be stored. + """ array, shape = sparse_matrix_to_array(coo_matrix) np.savez(fp_path, array=array, shape=shape) -def load_matrix(fp_path: Path): +def load_matrix(fp_path: Path) -> coo_array: + """Loads a sparse matrix from a .npz file. + + Args: + fp_path: The path to the .npz file containing the sparse matrix data. + + Returns: + The loaded sparse matrix. + """ npzfile = np.load(fp_path) array, shape = npzfile["array"], npzfile["shape"] return array_to_sparse_matrix(array, shape) @@ -216,19 +230,17 @@ def get_static_col_dtype(col: str) -> pl.DataType: def add_static_missing_cols( flat_df: DF_T, feature_columns: list[str], set_count_0_to_null: bool = False ) -> DF_T: - """Normalizes columns in a DataFrame so all expected columns are present and appropriately typed. + """Normalizes columns in a DataFrame so all expected columns are present and appropriately typed and + potentially modifies zero counts to nulls based on the configuration. - Parameters: - - flat_df (DF_T): The DataFrame to be normalized. - - feature_columns (list[str]): A list of feature column names that should exist in the DataFrame. - - set_count_0_to_null (bool): A flag indicating whether counts of zero should be converted to nulls. + Args: + flat_df: The dataframe to normalize. + feature_columns: A list of expected column names. + set_count_0_to_null: A flag of whether to convert zero counts to nulls. Returns: - - DF_T: The normalized DataFrame with all columns set to the correct type and zero-counts handled - if specified. - - This function ensures that all necessary columns are added and typed correctly within - a DataFrame, potentially modifying zero counts to nulls based on the configuration. + The normalized dataframe with all specified columns present and correctly typed and with + zero-counts handled if specified. 
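+
+    Note:
+        Columns listed in feature_columns but absent from flat_df are newly added, while columns
+        present in both are only re-typed (see the cols_to_add and cols_to_retype sets computed in
+        the implementation below).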
""" cols_to_add = set(feature_columns) - set(flat_df.columns) cols_to_retype = set(feature_columns).intersection(set(flat_df.columns)) @@ -430,8 +442,8 @@ def store_config_yaml(config_fp: Path, cfg: DictConfig): information and configuration details, to a specified JSON file. Args: - - config_fp (Path): The file path for the JSON file where config should be stored. - - cfg (DictConfig): A configuration object containing settings like the number of patients + config_fp: The file path for the JSON file where config should be stored. + cfg: A configuration object containing settings like the number of patients per sub-shard, minimum code inclusion frequency, and flags for updating or overwriting existing files. Behavior: From 5854556f8d9501870b1942d383bee7b1d126e53a Mon Sep 17 00:00:00 2001 From: Aleksia Kolo Date: Wed, 12 Jun 2024 19:11:05 -0400 Subject: [PATCH 12/16] docstrings for getting feature columns functions --- src/MEDS_tabular_automl/utils.py | 103 ++++++++++++++++++------------- 1 file changed, 59 insertions(+), 44 deletions(-) diff --git a/src/MEDS_tabular_automl/utils.py b/src/MEDS_tabular_automl/utils.py index d5c8605..687e1e7 100644 --- a/src/MEDS_tabular_automl/utils.py +++ b/src/MEDS_tabular_automl/utils.py @@ -1,9 +1,9 @@ """The base class for core dataset processing logic. Attributes: - INPUT_DF_T: This defines the type of the allowable input dataframes -- e.g., databases, filepaths, + INPUT_pl.LazyFrame: This defines the type of the allowable input dataframes -- e.g., databases, filepaths, dataframes, etc. - DF_T: This defines the type of internal dataframes -- e.g. polars DataFrames. + pl.LazyFrame: This defines the type of internal dataframes -- e.g. polars DataFrames. """ import os from collections.abc import Mapping @@ -17,7 +17,6 @@ from omegaconf import DictConfig, OmegaConf from scipy.sparse import coo_array -DF_T = pl.LazyFrame WRITE_USE_PYARROW = True ROW_IDX_NAME = "__row_idx" @@ -192,8 +191,18 @@ def load_matrix(fp_path: Path) -> coo_array: return array_to_sparse_matrix(array, shape) -def write_df(df: coo_array, fp: Path, **kwargs): - """Write shard to disk.""" +def write_df(df: coo_array, fp: Path, **kwargs) -> None: + """Writes a sparse matrix to disk. + + Args: + df: The sparse matrix to write. + fp: The file path where to write the data. + **kwargs: Additional keyword arguments, such as 'do_overwrite' to control file overwriting. + + Raises: + FileExistsError: If the file exists and 'do_overwrite' is not set to True. + TypeError: If the type of 'df' is not supported for writing. + """ do_overwrite = kwargs.get("do_overwrite", False) if not do_overwrite and fp.is_file(): @@ -212,8 +221,18 @@ def write_df(df: coo_array, fp: Path, **kwargs): def get_static_col_dtype(col: str) -> pl.DataType: - """Gets the appropriate minimal dtype for the given flat representation column string.""" + """Determines the appropriate minimal data type for given flat representation column string based on its + aggregation type. + + Args: + col (str): The column name in the format 'category/type/aggregation'. + Returns: + pl.DataType: The appropriate Polars data type for the column. + + Raises: + ValueError: If the column name format or aggregation type is not recognized. 
+ """ code, code_type, agg = parse_static_feature_column(col) match agg: @@ -228,18 +247,18 @@ def get_static_col_dtype(col: str) -> pl.DataType: def add_static_missing_cols( - flat_df: DF_T, feature_columns: list[str], set_count_0_to_null: bool = False -) -> DF_T: - """Normalizes columns in a DataFrame so all expected columns are present and appropriately typed and + flat_df: pl.LazyFrame, feature_columns: list[str], set_count_0_to_null: bool = False +) -> pl.LazyFrame: + """Normalizes columns in a LazyFrame so all expected columns are present and appropriately typed and potentially modifies zero counts to nulls based on the configuration. Args: - flat_df: The dataframe to normalize. + flat_df: The LazyFrame to normalize. feature_columns: A list of expected column names. set_count_0_to_null: A flag of whether to convert zero counts to nulls. Returns: - The normalized dataframe with all specified columns present and correctly typed and with + The normalized LazyFrame with all specified columns present and correctly typed and with zero-counts handled if specified. """ cols_to_add = set(feature_columns) - set(flat_df.columns) @@ -269,30 +288,28 @@ def add_static_missing_cols( return flat_df -def get_static_feature_cols(shard_df) -> list[str]: - """Generates a list of feature column names from the data within each shard based on specified - configurations. +def get_static_feature_cols(shard_df: pl.LazyFrame) -> list[str]: + """Generates a list of static feature column names based on data within a shard. - Parameters: - - cfg (dict): Configuration dictionary specifying how features should be evaluated and aggregated. - - split_to_shard_df (dict): A dictionary of DataFrames, divided by data split (e.g., 'train', 'test'). + This function evaluates the properties of codes within training data and applies configured + aggregations to generate a comprehensive list of the static feature columns for modeling purposes. + + Args: + shard_df: The LazyFrame shard to analyze. Returns: - - tuple[list[str], dict]: A tuple containing a list of feature columns and a dictionary of code properties - identified during the evaluation. + A list of column names representing static features. - This function evaluates the properties of codes within training data and applies configured - aggregations to generate a comprehensive list of feature columns for modeling purposes. Examples: - >>> import polars as pl - >>> data = {'code': ['A', 'A', 'B', 'B', 'C', 'C', 'C'], - ... 'timestamp': [ - ... None, '2021-01-01', '2021-01-01', '2021-01-02', '2021-01-03', '2021-01-04', None - ... ], - ... 'numerical_value': [1, None, 2, 2, None, None, 3]} - >>> df = pl.DataFrame(data).lazy() - >>> get_static_feature_cols(df) - ['A/static/first', 'A/static/present', 'C/static/first', 'C/static/present'] + >>> import polars as pl + >>> data = {'code': ['A', 'A', 'B', 'B', 'C', 'C', 'C'], + ... 'timestamp': [ + ... None, '2021-01-01', '2021-01-01', '2021-01-02', '2021-01-03', '2021-01-04', None + ... ], + ... 
'numerical_value': [1, None, 2, 2, None, None, 3]} + >>> df = pl.DataFrame(data).lazy() + >>> get_static_feature_cols(df) + ['A/static/first', 'A/static/present', 'C/static/first', 'C/static/present'] """ feature_columns = [] static_df = shard_df.filter(pl.col("timestamp").is_null()) @@ -302,20 +319,18 @@ def get_static_feature_cols(shard_df) -> list[str]: return sorted(feature_columns) -def get_ts_feature_cols(shard_df: DF_T) -> list[str]: - """Generates a list of feature column names from the data within each shard based on specified - configurations. +def get_ts_feature_cols(shard_df: pl.LazyFrame) -> list[str]: + """Generates a list of time-series feature column names based on data within a shard. - Parameters: - - cfg (dict): Configuration dictionary specifying how features should be evaluated and aggregated. - - split_to_shard_df (dict): A dictionary of DataFrames, divided by data split (e.g., 'train', 'test'). + This function evaluates the properties of codes within training data and applies configured + aggregations to generate a comprehensive list of the time-series feature columns for modeling + purposes. - Returns: - - tuple[list[str], dict]: A tuple containing a list of feature columns and a dictionary of code properties - identified during the evaluation. + Args: + shard_df: The LazyFrame shard to analyze. - This function evaluates the properties of codes within training data and applies configured - aggregations to generate a comprehensive list of feature columns for modeling purposes. + Returns: + A list of column names representing time-series features. """ ts_df = shard_df.filter(pl.col("timestamp").is_not_null()) feature_columns = list(ts_df.select(pl.col("code").unique()).collect().to_series()) @@ -326,7 +341,7 @@ def get_ts_feature_cols(shard_df: DF_T) -> list[str]: def get_prediction_ts_cols( - aggregations: list[str], ts_feature_cols: DF_T, window_sizes: list[str] | None = None + aggregations: list[str], ts_feature_cols: pl.LazyFrame, window_sizes: list[str] | None = None ) -> list[str]: """Generates a list of feature column names that will be used for downstream task.""" agg_feature_columns = [] @@ -338,13 +353,13 @@ def get_prediction_ts_cols( return sorted(ts_aggregations) -def get_flat_rep_feature_cols(cfg: DictConfig, shard_df: DF_T) -> list[str]: +def get_flat_rep_feature_cols(cfg: DictConfig, shard_df: pl.LazyFrame) -> list[str]: """Generates a list of feature column names from the data within each shard based on specified configurations. Parameters: - cfg (dict): Configuration dictionary specifying how features should be evaluated and aggregated. - - shard_df (DF_T): MEDS format dataframe shard. + - shard_df (pl.LazyFrame): MEDS format dataframe shard. Returns: - list[str]: list of all feature columns. 
From 846a6048d0367d36086125d98ac03a788064a28d Mon Sep 17 00:00:00 2001
From: Aleksia Kolo
Date: Wed, 12 Jun 2024 19:24:15 -0400
Subject: [PATCH 13/16] docstrings up to load_meds_data

---
 src/MEDS_tabular_automl/utils.py | 59 ++++++++++++++++++--------------
 1 file changed, 34 insertions(+), 25 deletions(-)

diff --git a/src/MEDS_tabular_automl/utils.py b/src/MEDS_tabular_automl/utils.py
index 687e1e7..404333d 100644
--- a/src/MEDS_tabular_automl/utils.py
+++ b/src/MEDS_tabular_automl/utils.py
@@ -343,7 +343,16 @@ def get_ts_feature_cols(shard_df: pl.LazyFrame) -> list[str]:
 def get_prediction_ts_cols(
     aggregations: list[str], ts_feature_cols: pl.LazyFrame, window_sizes: list[str] | None = None
 ) -> list[str]:
-    """Generates a list of feature column names that will be used for downstream task."""
+    """Generates a list of feature column names for prediction tasks based on aggregations and window sizes.
+
+    Args:
+        aggregations: The list of aggregation methods to apply.
+        ts_feature_cols: The list of existing time-series feature columns.
+        window_sizes: The optional list of window sizes to consider.
+
+    Returns:
+        A list of feature column names formatted with aggregation and window size.
+    """
     agg_feature_columns = []
     for code in ts_feature_cols:
         ts_aggregations = [f"{code}/{agg}" for agg in aggregations]
@@ -354,47 +363,47 @@ def get_prediction_ts_cols(
 
 
 def get_flat_rep_feature_cols(cfg: DictConfig, shard_df: pl.LazyFrame) -> list[str]:
-    """Generates a list of feature column names from the data within each shard based on specified
-    configurations.
+    """Combines static and time-series feature columns from a shard based on specified configurations.
 
-    Parameters:
-    - cfg (dict): Configuration dictionary specifying how features should be evaluated and aggregated.
-    - shard_df (pl.LazyFrame): MEDS format dataframe shard.
+    This function evaluates the properties of codes within training data and applies configured
+    aggregations to generate a comprehensive list of all feature columns for modeling purposes.
 
-    Returns:
-    - list[str]: list of all feature columns.
+    Args:
+        cfg: The configuration dictionary specifying aggregation settings.
+        shard_df: The LazyFrame shard in MEDS format to process.
 
-    This function evaluates the properties of codes within training data and applies configured
-    aggregations to generate a comprehensive list of feature columns for modeling purposes.
+    Returns:
+        A combined list of all feature columns from both static and time-series data.
     """
     static_feature_columns = get_static_feature_cols(shard_df)
     ts_feature_columns = get_ts_feature_cols(cfg.aggs, shard_df)
     return static_feature_columns + ts_feature_columns
 
 
-def load_meds_data(MEDS_cohort_dir: str, load_data: bool = True) -> Mapping[str, pl.DataFrame]:
-    """Loads the MEDS dataset from disk.
+def load_meds_data(MEDS_cohort_dir: str, load_data: bool = True) -> Mapping[str, pl.LazyFrame]:
+    """Loads the MEDS dataset from disk, structured by data splits.
 
     Args:
         MEDS_cohort_dir: The directory containing the MEDS datasets split by subfolders.
             We expect `train` to be a split so `MEDS_cohort_dir/train` should exist.
+        load_data: If True, returns LazyFrames for each data split, otherwise returns file paths.
 
     Returns:
-        Mapping[str, pl.DataFrame]: Mapping from split name to a polars DataFrame containing the MEDS dataset.
+        A dictionary mapping each split name to a list of LazyFrames for that split (or to a list of
+        file paths when load_data is False).
 
Example: - >>> import tempfile - >>> from pathlib import Path - >>> MEDS_cohort_dir = Path(tempfile.mkdtemp()) - >>> for split in ["train", "val", "test"]: - ... split_dir = MEDS_cohort_dir / split - ... split_dir.mkdir() - ... pl.DataFrame({"patient_id": [1, 2, 3]}).write_parquet(split_dir / "data.parquet") - >>> split_to_df = load_meds_data(MEDS_cohort_dir) - >>> assert "train" in split_to_df - >>> assert len(split_to_df) == 3 - >>> assert len(split_to_df["train"]) == 1 - >>> assert isinstance(split_to_df["train"][0], pl.LazyFrame) + >>> import tempfile + >>> from pathlib import Path + >>> MEDS_cohort_dir = Path(tempfile.mkdtemp()) + >>> for split in ["train", "val", "test"]: + ... split_dir = MEDS_cohort_dir / split + ... split_dir.mkdir() + ... pl.DataFrame({"patient_id": [1, 2, 3]}).write_parquet(split_dir / "data.parquet") + >>> split_to_df = load_meds_data(MEDS_cohort_dir) + >>> assert "train" in split_to_df + >>> assert len(split_to_df) == 3 + >>> assert len(split_to_df["train"]) == 1 + >>> assert isinstance(split_to_df["train"][0], pl.LazyFrame) """ MEDS_cohort_dir = Path(MEDS_cohort_dir) meds_fps = list(MEDS_cohort_dir.glob("*/*.parquet")) From d7d72b1714917dc49961a12c34266bfe2f8f56d0 Mon Sep 17 00:00:00 2001 From: Aleksia Kolo Date: Wed, 12 Jun 2024 19:45:39 -0400 Subject: [PATCH 14/16] finished utils docstrings --- src/MEDS_tabular_automl/utils.py | 73 ++++++++++++++++++++++---------- 1 file changed, 50 insertions(+), 23 deletions(-) diff --git a/src/MEDS_tabular_automl/utils.py b/src/MEDS_tabular_automl/utils.py index 404333d..99e3896 100644 --- a/src/MEDS_tabular_automl/utils.py +++ b/src/MEDS_tabular_automl/utils.py @@ -417,8 +417,16 @@ def load_meds_data(MEDS_cohort_dir: str, load_data: bool = True) -> Mapping[str, return split_to_df -def get_events_df(shard_df: pl.DataFrame, feature_columns) -> pl.DataFrame: - """Extracts Events DataFrame with one row per observation (timestamps can be duplicated)""" +def get_events_df(shard_df: pl.LazyFrame, feature_columns) -> pl.LazyFrame: + """Extracts and filters an Events LazyFrame with one row per observation (timestamps can be duplicated). + + Args: + shard_df: The LazyFrame shard from which to extract events. + feature_columns: The columns that define features used to filter the LazyFrame. + + Returns: + A LazyFrame where each row corresponds to an event, filtered by feature columns. + """ # Filter out feature_columns that were not present in the training set raw_feature_columns = ["/".join(c.split("/")[:-1]) for c in feature_columns] shard_df = shard_df.filter(pl.col("code").is_in(raw_feature_columns)) @@ -427,8 +435,15 @@ def get_events_df(shard_df: pl.DataFrame, feature_columns) -> pl.DataFrame: return ts_shard_df -def get_unique_time_events_df(events_df: pl.DataFrame): - """Updates Events DataFrame to have unique timestamps and sorted by patient_id and timestamp.""" +def get_unique_time_events_df(events_df: pl.LazyFrame) -> pl.LazyFrame: + """Ensures all timestamps in the events LazyFrame are unique and sorted by patient_id and timestamp. + + Args: + events_df: Events LazyFrame to process. + + Returns: + A LazyFrame with unique timestamps, sorted by patient_id and timestamp. 
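+
+    Note:
+        The input is expected to contain no null timestamps (this is asserted at runtime), and the
+        resulting sort order is what aligns these events with the time-series matrices generated
+        later in the pipeline.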
+ """ assert events_df.select(pl.col("timestamp")).null_count().collect().item() == 0 # Check events_df is sorted - so it aligns with the ts_matrix we generate later in the pipeline events_df = ( @@ -440,8 +455,19 @@ def get_unique_time_events_df(events_df: pl.DataFrame): return events_df -def get_feature_names(agg, feature_columns) -> str: - """Indices of columns in feature_columns list.""" +def get_feature_names(agg: str, feature_columns: list[str]) -> str: + """Extracts feature column names based on aggregation type from a list of column names. + + Args: + agg: The aggregation type to filter by. + feature_columns: The list of feature column names. + + Returns: + The filtered list of feature column names based on the aggregation type. + + Raises: + ValueError: If the aggregation type is unknown or unsupported. + """ if agg in [STATIC_CODE_AGGREGATION, STATIC_VALUE_AGGREGATION]: return [c for c in feature_columns if c.endswith(agg)] elif agg in CODE_AGGREGATIONS: @@ -452,31 +478,32 @@ def get_feature_names(agg, feature_columns) -> str: raise ValueError(f"Unknown aggregation type {agg}") -def get_feature_indices(agg, feature_columns) -> str: - """Indices of columns in feature_columns list.""" +def get_feature_indices(agg: str, feature_columns: list[str]) -> list[int]: + """Generates a list of feature name indices based on the aggregation type. + + Args: + agg: The aggregation type used to filter feature names. + feature_columns: The list of all feature column names. + + Returns: + Indices of the columns that match the aggregation type. + """ feature_to_index = {c: i for i, c in enumerate(feature_columns)} agg_features = get_feature_names(agg, feature_columns) return [feature_to_index[c] for c in agg_features] -def store_config_yaml(config_fp: Path, cfg: DictConfig): - """Stores configuration parameters into a JSON file. +def store_config_yaml(config_fp: Path, cfg: DictConfig) -> None: + """Stores configuration parameters into a YAML file. This function writes a dictionary of parameters, which includes patient partitioning - information and configuration details, to a specified JSON file. + information and configuration details, to a specified YAML file. Args: - config_fp: The file path for the JSON file where config should be stored. + config_fp: The file path for the YAML file where config should be stored. cfg: A configuration object containing settings like the number of patients - per sub-shard, minimum code inclusion frequency, and flags for updating or overwriting existing files. - - Behavior: - - If config_fp exists and cfg.do_overwrite is False (without do_update being True), a - FileExistsError is raised to prevent unintentional data loss. - - Raises: - - ValueError: If there are discrepancies between old and new parameters during an update. - - FileExistsError: If the file exists and overwriting is not allowed. + per sub-shard, minimum code inclusion frequency, and flags for updating + or overwriting existing files. """ OmegaConf.save(cfg, config_fp) @@ -485,8 +512,8 @@ def get_shard_prefix(base_path: Path, fp: Path) -> str: """Extracts the shard prefix from a file path by removing the raw_cohort_dir. Args: - base_path: The base path to remove. - fp: The file path to extract the shard prefix from. + base_path: The base path to remove from the file path. + fp: The full file path from which to extract the shard prefix. Returns: The shard prefix (the file path relative to the base path with the suffix removed). 
From 4afccdfec10b036fe027d006bde44716b56f35de Mon Sep 17 00:00:00 2001 From: Aleksia Kolo Date: Wed, 12 Jun 2024 20:01:39 -0400 Subject: [PATCH 15/16] added DF_T --- src/MEDS_tabular_automl/utils.py | 1 + 1 file changed, 1 insertion(+) diff --git a/src/MEDS_tabular_automl/utils.py b/src/MEDS_tabular_automl/utils.py index 99e3896..0be57b4 100644 --- a/src/MEDS_tabular_automl/utils.py +++ b/src/MEDS_tabular_automl/utils.py @@ -17,6 +17,7 @@ from omegaconf import DictConfig, OmegaConf from scipy.sparse import coo_array +DF_T = pl.LazyFrame WRITE_USE_PYARROW = True ROW_IDX_NAME = "__row_idx" From 3f4b87c23a1f86ef74c6f4be4a1fc81727982c93 Mon Sep 17 00:00:00 2001 From: Aleksia Kolo Date: Wed, 12 Jun 2024 20:17:40 -0400 Subject: [PATCH 16/16] added previous DF_T top file comments --- src/MEDS_tabular_automl/utils.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/MEDS_tabular_automl/utils.py b/src/MEDS_tabular_automl/utils.py index 0be57b4..6298c1a 100644 --- a/src/MEDS_tabular_automl/utils.py +++ b/src/MEDS_tabular_automl/utils.py @@ -1,9 +1,9 @@ """The base class for core dataset processing logic. Attributes: - INPUT_pl.LazyFrame: This defines the type of the allowable input dataframes -- e.g., databases, filepaths, + INPUT_DF_T: This defines the type of the allowable input dataframes -- e.g., databases, filepaths, dataframes, etc. - pl.LazyFrame: This defines the type of internal dataframes -- e.g. polars DataFrames. + DF_T: This defines the type of internal dataframes -- e.g. polars DataFrames. """ import os from collections.abc import Mapping
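
Taken together, the helpers documented in this series compose roughly as follows (an illustrative
sketch only; the cohort path and frequency threshold below are hypothetical and not part of any
patch above):

    >>> from pathlib import Path
    >>> from MEDS_tabular_automl.file_name import list_subdir_files
    >>> from MEDS_tabular_automl.describe_codes import filter_to_codes, filter_parquet
    >>> shards = list_subdir_files(Path("MEDS_cohort") / "train", "parquet")  # doctest: +SKIP
    >>> codes = filter_to_codes(None, 10, Path("code_metadata.parquet"))  # doctest: +SKIP
    >>> filtered = filter_parquet(shards[0], codes)  # doctest: +SKIP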