From 35260b0c4996c7e5271a02fadafd92fffb9cde30 Mon Sep 17 00:00:00 2001 From: Katie Lamb Date: Mon, 8 Jan 2024 12:58:59 -0800 Subject: [PATCH] move vectorizers into op --- .../record_linkage/classify_plants_ferc1.py | 111 +++--- .../eia_ferc1_record_linkage.py | 338 ++++++++---------- test/integration/record_linkage_test.py | 4 +- 3 files changed, 204 insertions(+), 249 deletions(-) diff --git a/src/pudl/analysis/record_linkage/classify_plants_ferc1.py b/src/pudl/analysis/record_linkage/classify_plants_ferc1.py index 25ce0f04d5..1842ba15b5 100644 --- a/src/pudl/analysis/record_linkage/classify_plants_ferc1.py +++ b/src/pudl/analysis/record_linkage/classify_plants_ferc1.py @@ -27,57 +27,60 @@ ] -dataframe_vectorizers = { - "plant_name": embed_dataframe.ColumnVectorizer( - transform_steps=[ - embed_dataframe.NameCleaner(), - embed_dataframe.TextVectorizer(), - ], - weight=2.0, - columns=["plant_name_ferc1"], - ), - "plant_type": embed_dataframe.ColumnVectorizer( - transform_steps=[ - embed_dataframe.ColumnCleaner(cleaning_function="null_to_empty_str"), - embed_dataframe.CategoricalVectorizer(), - ], - weight=2.0, - columns=["plant_type"], - ), - "construction_type": embed_dataframe.ColumnVectorizer( - transform_steps=[ - embed_dataframe.ColumnCleaner(cleaning_function="null_to_empty_str"), - embed_dataframe.CategoricalVectorizer(), - ], - columns=["construction_type"], - ), - "capacity_mw": embed_dataframe.ColumnVectorizer( - transform_steps=[ - embed_dataframe.ColumnCleaner(cleaning_function="null_to_zero"), - embed_dataframe.NumericalVectorizer(), - ], - columns=["capacity_mw"], - ), - "construction_year": embed_dataframe.ColumnVectorizer( - transform_steps=[ - embed_dataframe.ColumnCleaner(cleaning_function="fix_int_na"), - embed_dataframe.CategoricalVectorizer(), - ], - columns=["construction_year"], - ), - "utility_id_ferc1": embed_dataframe.ColumnVectorizer( - transform_steps=[embed_dataframe.CategoricalVectorizer()], - columns=["utility_id_ferc1"], - ), - "fuel_fractions": embed_dataframe.ColumnVectorizer( - transform_steps=[ - embed_dataframe.ColumnCleaner(cleaning_function="null_to_zero"), - embed_dataframe.NumericalVectorizer(), - embed_dataframe.NumericalNormalizer(), - ], - columns=_FUEL_COLS, - ), -} +@op +def get_vectorizers(): + """Get the dictionary of vectorizer transforms for each column.""" + return { + "plant_name": embed_dataframe.ColumnVectorizer( + transform_steps=[ + embed_dataframe.NameCleaner(), + embed_dataframe.TextVectorizer(), + ], + weight=2.0, + columns=["plant_name_ferc1"], + ), + "plant_type": embed_dataframe.ColumnVectorizer( + transform_steps=[ + embed_dataframe.ColumnCleaner(cleaning_function="null_to_empty_str"), + embed_dataframe.CategoricalVectorizer(), + ], + weight=2.0, + columns=["plant_type"], + ), + "construction_type": embed_dataframe.ColumnVectorizer( + transform_steps=[ + embed_dataframe.ColumnCleaner(cleaning_function="null_to_empty_str"), + embed_dataframe.CategoricalVectorizer(), + ], + columns=["construction_type"], + ), + "capacity_mw": embed_dataframe.ColumnVectorizer( + transform_steps=[ + embed_dataframe.ColumnCleaner(cleaning_function="null_to_zero"), + embed_dataframe.NumericalVectorizer(), + ], + columns=["capacity_mw"], + ), + "construction_year": embed_dataframe.ColumnVectorizer( + transform_steps=[ + embed_dataframe.ColumnCleaner(cleaning_function="fix_int_na"), + embed_dataframe.CategoricalVectorizer(), + ], + columns=["construction_year"], + ), + "utility_id_ferc1": embed_dataframe.ColumnVectorizer( + transform_steps=[embed_dataframe.CategoricalVectorizer()], + columns=["utility_id_ferc1"], + ), + "fuel_fractions": embed_dataframe.ColumnVectorizer( + transform_steps=[ + embed_dataframe.ColumnCleaner(cleaning_function="null_to_zero"), + embed_dataframe.NumericalVectorizer(), + embed_dataframe.NumericalNormalizer(), + ], + columns=_FUEL_COLS, + ), + } @op @@ -139,12 +142,6 @@ def merge_steam_fuel_dfs( ).astype({"plant_type": str, "construction_type": str}) -@op -def get_vectorizers(): - """Get the dictionary of vectorizer transforms for each column.""" - return dataframe_vectorizers - - @graph_asset def _out_ferc1__yearly_steam_plants_sched402_with_plant_ids( core_ferc1__yearly_steam_plants_sched402: pd.DataFrame, diff --git a/src/pudl/analysis/record_linkage/eia_ferc1_record_linkage.py b/src/pudl/analysis/record_linkage/eia_ferc1_record_linkage.py index 1a9ce0826f..b8d507e0e3 100644 --- a/src/pudl/analysis/record_linkage/eia_ferc1_record_linkage.py +++ b/src/pudl/analysis/record_linkage/eia_ferc1_record_linkage.py @@ -49,150 +49,154 @@ logger = pudl.logging_helpers.get_logger(__name__) # Silence the recordlinkage logger, which is out of control -pair_vectorizers = { - "plant_name": embed_dataframe.ColumnVectorizer( - transform_steps=[ - embed_dataframe.NameCleaner(), - embed_dataframe.StringSimilarityScorer( - metric="jaro_winkler", - col1="plant_name_ferc1", - col2="plant_name_eia", - output_name="plant_name", - ), - ], - columns=["plant_name_ferc1", "plant_name_eia"], - ), - "utility_name": embed_dataframe.ColumnVectorizer( - transform_steps=[ - embed_dataframe.NameCleaner(), - embed_dataframe.StringSimilarityScorer( - metric="jaro_winkler", - col1="utility_name_ferc1", - col2="utility_name_eia", - output_name="utility_name", - ), - ], - columns=["utility_name_ferc1", "utility_name_eia"], - ), - "net_generation_mwh": embed_dataframe.ColumnVectorizer( - transform_steps=[ - embed_dataframe.ColumnCleaner(cleaning_function="null_to_zero"), - embed_dataframe.NumericSimilarityScorer( - method="exponential", - col1="net_generation_mwh_ferc1", - col2="net_generation_mwh_eia", - output_name="net_generation_mwh", - scale=1000, - ), - ], - columns=["net_generation_mwh_ferc1", "net_generation_mwh_eia"], - ), - "capacity_mw": embed_dataframe.ColumnVectorizer( - transform_steps=[ - embed_dataframe.ColumnCleaner(cleaning_function="null_to_zero"), - embed_dataframe.NumericSimilarityScorer( - method="exponential", - col1="capacity_mw_ferc1", - col2="capacity_mw_eia", - output_name="capacity_mw", - scale=10, - ), - ], - columns=["capacity_mw_ferc1", "capacity_mw_eia"], - ), - "total_fuel_cost": embed_dataframe.ColumnVectorizer( - transform_steps=[ - embed_dataframe.ColumnCleaner(cleaning_function="null_to_zero"), - embed_dataframe.NumericSimilarityScorer( - method="exponential", - col1="total_fuel_cost_ferc1", - col2="total_fuel_cost_eia", - output_name="total_fuel_cost", - scale=10000, - offset=2500, - missing_value=0.5, - ), - ], - columns=["total_fuel_cost_ferc1", "total_fuel_cost_eia"], - ), - "total_mmbtu": embed_dataframe.ColumnVectorizer( - transform_steps=[ - embed_dataframe.ColumnCleaner(cleaning_function="null_to_zero"), - embed_dataframe.NumericSimilarityScorer( - method="exponential", - col1="total_mmbtu_ferc1", - col2="total_mmbtu_eia", - output_name="total_mmbtu", - scale=100, - offset=1, - missing_value=0.5, - ), - ], - columns=["total_mmbtu_ferc1", "total_mmbtu_eia"], - ), - "capacity_factor": embed_dataframe.ColumnVectorizer( - transform_steps=[ - embed_dataframe.ColumnCleaner(cleaning_function="null_to_zero"), - embed_dataframe.NumericSimilarityScorer( - method="linear", - col1="capacity_factor_ferc1", - col2="capacity_factor_eia", - output_name="capacity_factor", - ), - ], - columns=["capacity_factor_ferc1", "capacity_factor_eia"], - ), - "fuel_cost_per_mmbtu": embed_dataframe.ColumnVectorizer( - transform_steps=[ - embed_dataframe.ColumnCleaner(cleaning_function="null_to_zero"), - embed_dataframe.NumericSimilarityScorer( - method="linear", - col1="fuel_cost_per_mmbtu_ferc1", - col2="fuel_cost_per_mmbtu_eia", - output_name="fuel_cost_per_mmbtu", - ), - ], - columns=["fuel_cost_per_mmbtu_ferc1", "fuel_cost_per_mmbtu_eia"], - ), - "heat_rate_mmbtu_mwh": embed_dataframe.ColumnVectorizer( - transform_steps=[ - embed_dataframe.ColumnCleaner(cleaning_function="null_to_zero"), - embed_dataframe.NumericSimilarityScorer( - method="linear", - col1="unit_heat_rate_mmbtu_per_mwh_ferc1", - col2="unit_heat_rate_mmbtu_per_mwh_eia", - output_name="heat_rate_mmbtu_mwh", - ), - ], - columns=[ - "unit_heat_rate_mmbtu_per_mwh_ferc1", - "unit_heat_rate_mmbtu_per_mwh_eia", - ], - ), - "fuel_type_code_pudl": embed_dataframe.ColumnVectorizer( - transform_steps=[ - embed_dataframe.ColumnCleaner(cleaning_function="null_to_empty_str"), - embed_dataframe.NumericSimilarityScorer( - method="exact", - col1="fuel_type_code_pudl_ferc1", - col2="fuel_type_code_pudl_eia", - output_name="fuel_type_code_pudl", - ), - ], - columns=["fuel_type_code_pudl_ferc1", "fuel_type_code_pudl_eia"], - ), - "installation_year": embed_dataframe.ColumnVectorizer( - transform_steps=[ - embed_dataframe.NumericSimilarityScorer( - method="linear", - col1="installation_year_ferc1", - col2="installation_year_eia", - output_name="installation_year", - ) - ], - columns=["installation_year_ferc1", "installation_year_eia"], - ), -} + +@op +def get_pair_vectorizers(): + """Get dictionary of vectorizers for each column in input dataframe.""" + return { + "plant_name": embed_dataframe.ColumnVectorizer( + transform_steps=[ + embed_dataframe.NameCleaner(), + embed_dataframe.StringSimilarityScorer( + metric="jaro_winkler", + col1="plant_name_ferc1", + col2="plant_name_eia", + output_name="plant_name", + ), + ], + columns=["plant_name_ferc1", "plant_name_eia"], + ), + "utility_name": embed_dataframe.ColumnVectorizer( + transform_steps=[ + embed_dataframe.NameCleaner(), + embed_dataframe.StringSimilarityScorer( + metric="jaro_winkler", + col1="utility_name_ferc1", + col2="utility_name_eia", + output_name="utility_name", + ), + ], + columns=["utility_name_ferc1", "utility_name_eia"], + ), + "net_generation_mwh": embed_dataframe.ColumnVectorizer( + transform_steps=[ + embed_dataframe.ColumnCleaner(cleaning_function="null_to_zero"), + embed_dataframe.NumericSimilarityScorer( + method="exponential", + col1="net_generation_mwh_ferc1", + col2="net_generation_mwh_eia", + output_name="net_generation_mwh", + scale=1000, + ), + ], + columns=["net_generation_mwh_ferc1", "net_generation_mwh_eia"], + ), + "capacity_mw": embed_dataframe.ColumnVectorizer( + transform_steps=[ + embed_dataframe.ColumnCleaner(cleaning_function="null_to_zero"), + embed_dataframe.NumericSimilarityScorer( + method="exponential", + col1="capacity_mw_ferc1", + col2="capacity_mw_eia", + output_name="capacity_mw", + scale=10, + ), + ], + columns=["capacity_mw_ferc1", "capacity_mw_eia"], + ), + "total_fuel_cost": embed_dataframe.ColumnVectorizer( + transform_steps=[ + embed_dataframe.ColumnCleaner(cleaning_function="null_to_zero"), + embed_dataframe.NumericSimilarityScorer( + method="exponential", + col1="total_fuel_cost_ferc1", + col2="total_fuel_cost_eia", + output_name="total_fuel_cost", + scale=10000, + offset=2500, + missing_value=0.5, + ), + ], + columns=["total_fuel_cost_ferc1", "total_fuel_cost_eia"], + ), + "total_mmbtu": embed_dataframe.ColumnVectorizer( + transform_steps=[ + embed_dataframe.ColumnCleaner(cleaning_function="null_to_zero"), + embed_dataframe.NumericSimilarityScorer( + method="exponential", + col1="total_mmbtu_ferc1", + col2="total_mmbtu_eia", + output_name="total_mmbtu", + scale=100, + offset=1, + missing_value=0.5, + ), + ], + columns=["total_mmbtu_ferc1", "total_mmbtu_eia"], + ), + "capacity_factor": embed_dataframe.ColumnVectorizer( + transform_steps=[ + embed_dataframe.ColumnCleaner(cleaning_function="null_to_zero"), + embed_dataframe.NumericSimilarityScorer( + method="linear", + col1="capacity_factor_ferc1", + col2="capacity_factor_eia", + output_name="capacity_factor", + ), + ], + columns=["capacity_factor_ferc1", "capacity_factor_eia"], + ), + "fuel_cost_per_mmbtu": embed_dataframe.ColumnVectorizer( + transform_steps=[ + embed_dataframe.ColumnCleaner(cleaning_function="null_to_zero"), + embed_dataframe.NumericSimilarityScorer( + method="linear", + col1="fuel_cost_per_mmbtu_ferc1", + col2="fuel_cost_per_mmbtu_eia", + output_name="fuel_cost_per_mmbtu", + ), + ], + columns=["fuel_cost_per_mmbtu_ferc1", "fuel_cost_per_mmbtu_eia"], + ), + "heat_rate_mmbtu_mwh": embed_dataframe.ColumnVectorizer( + transform_steps=[ + embed_dataframe.ColumnCleaner(cleaning_function="null_to_zero"), + embed_dataframe.NumericSimilarityScorer( + method="linear", + col1="unit_heat_rate_mmbtu_per_mwh_ferc1", + col2="unit_heat_rate_mmbtu_per_mwh_eia", + output_name="heat_rate_mmbtu_mwh", + ), + ], + columns=[ + "unit_heat_rate_mmbtu_per_mwh_ferc1", + "unit_heat_rate_mmbtu_per_mwh_eia", + ], + ), + "fuel_type_code_pudl": embed_dataframe.ColumnVectorizer( + transform_steps=[ + embed_dataframe.ColumnCleaner(cleaning_function="null_to_empty_str"), + embed_dataframe.NumericSimilarityScorer( + method="exact", + col1="fuel_type_code_pudl_ferc1", + col2="fuel_type_code_pudl_eia", + output_name="fuel_type_code_pudl", + ), + ], + columns=["fuel_type_code_pudl_ferc1", "fuel_type_code_pudl_eia"], + ), + "installation_year": embed_dataframe.ColumnVectorizer( + transform_steps=[ + embed_dataframe.NumericSimilarityScorer( + method="linear", + col1="installation_year_ferc1", + col2="installation_year_eia", + output_name="installation_year", + ) + ], + columns=["installation_year_ferc1", "installation_year_eia"], + ), + } @op @@ -235,44 +239,6 @@ def get_pairs_dfs(inputs): return (all_pairs_df, train_pairs_df) -@op -def get_all_pairs_df(inputs): - """Get a dataframe with all possible FERC to EIA record pairs. - - Merge the FERC and EIA records on ``block_col`` to generate possible - record pairs for the matching model. - - Arguments: - inputs: :class:`InputManager` object. - """ - ferc1_df = inputs.get_plants_ferc1().reset_index() - eia_df = inputs.get_plant_parts_eia_true().reset_index() - block_col = "plant_id_report_year_util_id" - out = ferc1_df.merge( - eia_df, how="inner", on=block_col, suffixes=("_ferc1", "_eia") - ).set_index(["record_id_ferc1", "record_id_eia"]) - return out - - -@op -def get_train_pairs_df(inputs): - """Get a dataframe with possible FERC to EIA record pairs from training data. - - Merge the FERC and EIA records on ``block_col`` to generate possible - record pairs for the matching model. - - Arguments: - inputs: :class:`InputManager` object. - """ - ferc1_df = inputs.get_train_ferc1().reset_index() - eia_df = inputs.get_train_eia().reset_index() - block_col = "plant_id_report_year_util_id" - out = ferc1_df.merge( - eia_df, how="inner", on=block_col, suffixes=("_ferc1", "_eia") - ).set_index(["record_id_ferc1", "record_id_eia"]) - return out - - @op def get_y_label_df(train_pairs_df, inputs): """Get the dataframe of y labels. @@ -325,12 +291,6 @@ def get_match_full_records(best_match_df, inputs): ).enforce_schema(connected_df) -@op -def get_pair_vectorizers(): - """Get dictionary of vectorizers for each column in input dataframe.""" - return pair_vectorizers - - @graph_asset def _out_pudl__yearly_assn_eia_ferc1_plant_parts( out_ferc1__yearly_all_plants: pd.DataFrame, @@ -350,8 +310,6 @@ def _out_pudl__yearly_assn_eia_ferc1_plant_parts( out_ferc1__yearly_steam_plants_fuel_by_plant_sched402, out_eia__yearly_plant_parts, ) - # all_pairs_df = get_all_pairs_df(inputs) - # train_pairs_df = get_train_pairs_df(inputs) all_pairs_df, train_pairs_df = get_pairs_dfs(inputs) vectorizer = get_pair_vectorizers() features_all = embed_dataframe.embed_dataframe_graph(all_pairs_df, vectorizer) diff --git a/test/integration/record_linkage_test.py b/test/integration/record_linkage_test.py index 9a190177be..e4b6e7fc38 100644 --- a/test/integration/record_linkage_test.py +++ b/test/integration/record_linkage_test.py @@ -13,7 +13,7 @@ from pudl.analysis.record_linkage import embed_dataframe from pudl.analysis.record_linkage.classify_plants_ferc1 import ( _FUEL_COLS, - dataframe_vectorizers, + get_vectorizers, ) from pudl.analysis.record_linkage.link_cross_year import link_ids_cross_year from pudl.transform.params.ferc1 import ( @@ -229,7 +229,7 @@ def _link_ids(df: pd.DataFrame, vectorizers: dict): .execute_in_process( input_values={ "df": mock_ferc1_plants_df, - "vectorizers": dataframe_vectorizers, + "vectorizers": get_vectorizers(), } ) .output_value()["record_label"]