diff --git a/src/pudl/etl/__init__.py b/src/pudl/etl/__init__.py
index b941e45256..6e188c93d6 100644
--- a/src/pudl/etl/__init__.py
+++ b/src/pudl/etl/__init__.py
@@ -62,6 +62,7 @@ core_module_groups = {
+    "_core_eia176": [pudl.transform.eia176],
     "core_assn": [glue_assets],
     "core_censusdp1tract": [
         pudl.convert.censusdp1tract_to_sqlite,
diff --git a/src/pudl/extract/eia176.py b/src/pudl/extract/eia176.py
index 484fedaf83..4ad9e59ef2 100644
--- a/src/pudl/extract/eia176.py
+++ b/src/pudl/extract/eia176.py
@@ -35,6 +35,15 @@ def process_raw(
         selection = self._metadata._get_partition_selection(partition)
         return df.assign(report_year=selection)
 
+    def process_renamed(
+        self, df: pd.DataFrame, page: str, **partition: PartitionSelection
+    ) -> pd.DataFrame:
+        """Strip and lowercase raw text fields (except the company ID)."""
+        text_fields = ["area", "atype", "company", "item"]
+        for tf in text_fields:
+            df[tf] = df[tf].str.strip().str.lower()
+        return df
+
 
 raw_eia176__all_dfs = raw_df_factory(Extractor, name="eia176")
diff --git a/src/pudl/transform/__init__.py b/src/pudl/transform/__init__.py
index 6470064d29..7db61f4f67 100644
--- a/src/pudl/transform/__init__.py
+++ b/src/pudl/transform/__init__.py
@@ -63,6 +63,7 @@ from . import (
     classes,
     eia,
+    eia176,
     eia860,
     eia860m,
     eia861,
diff --git a/src/pudl/transform/eia176.py b/src/pudl/transform/eia176.py
new file mode 100644
index 0000000000..7a18dff6e3
--- /dev/null
+++ b/src/pudl/transform/eia176.py
@@ -0,0 +1,116 @@
+"""Module to perform data cleaning functions on EIA-176 data tables."""
+
+import pandas as pd
+from dagster import AssetCheckResult, AssetIn, AssetOut, asset_check, multi_asset
+
+from pudl.logging_helpers import get_logger
+
+logger = get_logger(__name__)
+
+
+@multi_asset(
+    outs={
+        "core_eia176__yearly_company_data": AssetOut(),
+        "core_eia176__yearly_aggregate_data": AssetOut(),
+    },
+)
+def _core_eia176__data(
+    raw_eia176__data: pd.DataFrame,
+) -> tuple[pd.DataFrame, pd.DataFrame]:
+    """Take the raw long-format table and return two wide tables with primary keys and one column per variable.
+
+    One table holds data for each year and company; the other holds state- and
+    US-level aggregates per year.
+    """
+    raw_eia176__data["report_year"] = raw_eia176__data["report_year"].astype(int)
+    raw_eia176__data["value"] = raw_eia176__data["value"].astype(float)
+    raw_eia176__data["variable_name"] = (
+        raw_eia176__data["line"] + "_" + raw_eia176__data["atype"]
+    )
+
+    long_company = raw_eia176__data.loc[
+        raw_eia176__data.company != "total of all companies"
+    ]
+    aggregate_primary_key = ["report_year", "area"]
+    company_drop_columns = ["itemsort", "item", "atype", "line", "company"]
+
+    wide_company = get_wide_table(
+        long_table=long_company.drop(columns=company_drop_columns),
+        primary_key=aggregate_primary_key + ["id"],
+    )
+
+    long_aggregate = raw_eia176__data.loc[
+        raw_eia176__data.company == "total of all companies"
+    ]
+    wide_aggregate = get_wide_table(
+        # We must also drop 'id' here and cannot use it in the primary key,
+        # because it is arbitrary/duplicated in aggregate records; 'id' is a
+        # reliable ID only in the context of granular company data.
+        long_table=long_aggregate.drop(columns=company_drop_columns + ["id"]),
+        primary_key=aggregate_primary_key,
+    )
+
+    return wide_company, wide_aggregate
+
+
+def get_wide_table(long_table: pd.DataFrame, primary_key: list[str]) -> pd.DataFrame:
+    """Take a 'long' or entity-attribute-value table and return a wide table with one column per attribute/variable."""
+    unstacked = long_table.set_index(primary_key + ["variable_name"]).unstack(
+        level="variable_name"
+    )
+    unstacked.columns = unstacked.columns.droplevel(0).fillna(0)
+    unstacked.columns.name = None  # gets rid of "variable_name" name of columns index
+
+    # TODO: Rather than blindly taking the first non-null value seen in each
+    # column, first check whether there are any conflicting non-null values,
+    # e.g. using .count().
+    return unstacked.groupby(level=primary_key).first().reset_index()
+
+
+@asset_check(
+    asset="core_eia176__yearly_company_data",
+    additional_ins={"core_eia176__yearly_aggregate_data": AssetIn()},
+    blocking=True,
+)
+def validate_totals(
+    core_eia176__yearly_company_data: pd.DataFrame,
+    core_eia176__yearly_aggregate_data: pd.DataFrame,
+) -> AssetCheckResult:
+    """Compare reported and calculated totals for different geographical aggregates, and report any differences."""
+    # First make it so we can directly compare reported aggregates to
+    # groupings of granular data
+    comparable_aggregates = core_eia176__yearly_aggregate_data.sort_values(
+        ["report_year", "area"]
+    ).fillna(0)
+
+    # Group company data into state-level data and compare to reported totals
+    state_data = (
+        core_eia176__yearly_company_data.drop(columns="id")
+        .groupby(["report_year", "area"])
+        .sum()
+        .reset_index()
+    )
+    aggregate_state = comparable_aggregates[
+        comparable_aggregates.area != "u.s. total"
+    ].reset_index(drop=True)
+    # Compare using the same columns
+    state_diff = aggregate_state[state_data.columns].compare(state_data)
+
+    # Group calculated state-level data into US-level data and compare to
+    # reported totals
+    us_data = (
+        state_data.drop(columns="area")
+        .groupby("report_year")
+        .sum()
+        .sort_values("report_year")
+        .reset_index()
+    )
+    aggregate_us = (
+        comparable_aggregates[comparable_aggregates.area == "u.s. total"]
+        .drop(columns="area")
+        .sort_values("report_year")
+        .reset_index(drop=True)
+    )
+    # Compare using the same columns
+    us_diff = aggregate_us[us_data.columns].compare(us_data)
+
+    if not state_diff.empty:
+        logger.warning(f"State-level totals do not reconcile:\n{state_diff}")
+    if not us_diff.empty:
+        logger.warning(f"US-level totals do not reconcile:\n{us_diff}")
+    return AssetCheckResult(passed=bool(us_diff.empty and state_diff.empty))
+
+
+# TODO: Add reasonable value boundaries, e.g. in a script/notebook in the
+# 'validate' directory. How are those executed?
diff --git a/test/unit/transform/eia176_test.py b/test/unit/transform/eia176_test.py
new file mode 100644
index 0000000000..75a05a79a3
--- /dev/null
+++ b/test/unit/transform/eia176_test.py
@@ -0,0 +1,157 @@
+import pandas as pd
+
+from pudl.transform.eia176 import _core_eia176__data, get_wide_table, validate_totals
+
+COLUMN_NAMES = [
+    "area",
+    "atype",
+    "company",
+    "id",
+    "line",
+    "report_year",
+    "value",
+    "itemsort",
+    "item",
+]
+
+ID_1 = "17673850NM"
+VOLUME_1 = 30980426.0
+COMPANY_1 = [
+    "new mexico",
+    "vl",
+    "new mexico gas company",
+    ID_1,
+    "1010",
+    "2022",
+    VOLUME_1,
+    "[10.1]",
+    "residential sales volume",
+]
+
+ID_2 = "17635017NM"
+VOLUME_2 = 532842.0
+COMPANY_2 = [
+    "new mexico",
+    "vl",
+    "west texas gas inc",
+    ID_2,
+    "1010",
+    "2022",
+    VOLUME_2,
+    "[10.1]",
+    "residential sales volume",
+]
+
+NM_VOLUME = VOLUME_1 + VOLUME_2
+NM_AGGREGATE = [
+    "new mexico",
+    "vl",
+    "total of all companies",
+    # Aggregates appear to reuse an arbitrary company ID
+    ID_1,
+    "1010",
+    "2022",
+    NM_VOLUME,
+    "[10.1]",
+    "residential sales volume",
+]
+
+ID_3 = "17635017TX"
+VOLUME_3 = 1.0
+COMPANY_3 = [
+    "texas",
+    "vl",
+    "west texas gas inc",
+    ID_3,
+    "1010",
+    "2022",
+    VOLUME_3,
+    "[10.1]",
+    "residential sales volume",
+]
+
+TX_VOLUME = VOLUME_3
+TX_AGGREGATE = [
+    "texas",
+    "vl",
+    "total of all companies",
+    # Aggregates appear to reuse an arbitrary company ID
+    ID_3,
+    "1010",
+    "2022",
+    TX_VOLUME,
+    "[10.1]",
+    "residential sales volume",
+]
+
+US_VOLUME = NM_VOLUME + TX_VOLUME
+US_AGGREGATE = [
+    "u.s. total",
+    "vl",
+    "total of all companies",
+    # Aggregates appear to reuse an arbitrary company ID
+    ID_1,
+    "1010",
+    "2022",
+    US_VOLUME,
+    "[10.1]",
+    "residential sales volume",
+]
+
+DROP_COLS = ["itemsort", "item", "atype", "line", "company"]
+
+
+def test_core_eia176__data():
+    eav_model = pd.DataFrame(columns=COLUMN_NAMES)
+    eav_model.loc[0] = COMPANY_1
+    eav_model.loc[1] = NM_AGGREGATE
+
+    wide_company, wide_aggregate = _core_eia176__data(eav_model)
+    assert wide_company.shape == (1, 4)
+
+    company_row = wide_company.loc[0]
+    assert list(company_row.index) == ["report_year", "area", "id", "1010_vl"]
+    # report_year is cast to int by the transform
+    assert list(company_row.values) == [2022, "new mexico", ID_1, VOLUME_1]
+
+    assert wide_aggregate.shape == (1, 3)
+    aggregate_row = wide_aggregate.loc[0]
+    assert list(aggregate_row.index) == ["report_year", "area", "1010_vl"]
+    assert list(aggregate_row.values) == [2022, "new mexico", NM_VOLUME]
+
+
+def test_get_wide_table():
+    long_table = pd.DataFrame(columns=COLUMN_NAMES)
+    long_table.loc[0] = COMPANY_1
+    long_table.loc[1] = COMPANY_2
+    long_table["variable_name"] = long_table["line"] + "_" + long_table["atype"]
+    long_table = long_table.drop(columns=DROP_COLS)
+
+    primary_key = ["report_year", "area", "id"]
+    wide_table = get_wide_table(long_table, primary_key)
+    wide_table = wide_table.fillna(0)
+
+    assert wide_table.shape == (2, 4)
+    assert list(wide_table.loc[0].index) == ["report_year", "area", "id", "1010_vl"]
+    # Rows sort by primary key, so ID_2 ("17635017NM") precedes ID_1 ("17673850NM")
+    assert list(wide_table.loc[0].values) == ["2022", "new mexico", ID_2, VOLUME_2]
+    assert list(wide_table.loc[1].values) == ["2022", "new mexico", ID_1, VOLUME_1]
+
+
+def test_validate_totals():
+    # Our test data will have only measurements for this 1010_vl variable
+    validation_cols = COLUMN_NAMES + ["1010_vl"]
+
+    company_data = pd.DataFrame(columns=validation_cols)
+    # Add the value for the 1010_vl variable; use floats so the groupby in
+    # validate_totals sums numbers rather than concatenating strings
+    company_data.loc[0] = COMPANY_1 + [VOLUME_1]
+    company_data.loc[1] = COMPANY_2 + [VOLUME_2]
+    company_data.loc[2] = COMPANY_3 + [VOLUME_3]
+    company_data = company_data.drop(columns=DROP_COLS)
+
+    aggregate_data = pd.DataFrame(columns=validation_cols)
+    # Add the value for the 1010_vl variable
+    aggregate_data.loc[0] = NM_AGGREGATE + [NM_VOLUME]
+    aggregate_data.loc[1] = TX_AGGREGATE + [TX_VOLUME]
+    aggregate_data.loc[2] = US_AGGREGATE + [US_VOLUME]
+    aggregate_data = aggregate_data.drop(columns=DROP_COLS + ["id"])
+
+    assert validate_totals(company_data, aggregate_data).passed