Eia176 wide table #3590

Draft · wants to merge 14 commits into main
1 change: 1 addition & 0 deletions src/pudl/etl/__init__.py
@@ -62,6 +62,7 @@


core_module_groups = {
"_core_eia176": [pudl.transform.eia176],
"core_assn": [glue_assets],
"core_censusdp1tract": [
pudl.convert.censusdp1tract_to_sqlite,
9 changes: 9 additions & 0 deletions src/pudl/extract/eia176.py
@@ -35,6 +35,15 @@ def process_raw(
selection = self._metadata._get_partition_selection(partition)
return df.assign(report_year=selection)

def process_renamed(
self, df: pd.DataFrame, page: str, **partition: PartitionSelection
) -> pd.DataFrame:
"""Strip and lowercase raw text fields (except ID)."""
text_fields = ["area", "atype", "company", "item"]
for tf in text_fields:
df[tf] = df[tf].str.strip().str.lower()
return df


raw_eia176__all_dfs = raw_df_factory(Extractor, name="eia176")

1 change: 1 addition & 0 deletions src/pudl/transform/__init__.py
@@ -63,6 +63,7 @@
from . import (
classes,
eia,
eia176,
eia860,
eia860m,
eia861,
116 changes: 116 additions & 0 deletions src/pudl/transform/eia176.py
@@ -0,0 +1,116 @@
"""Module to perform data cleaning functions on EIA176 data tables."""

import pandas as pd
from dagster import AssetCheckResult, AssetIn, AssetOut, asset_check, multi_asset

from pudl.logging_helpers import get_logger

logger = get_logger(__name__)


@multi_asset(
outs={
"core_eia176__yearly_company_data": AssetOut(),
"core_eia861__yearly_aggregate_data": AssetOut(),
},
)
def _core_eia176__data(
raw_eia176__data: pd.DataFrame,
) -> tuple[pd.DataFrame, pd.DataFrame]:
"""Take raw list and return two wide tables with primary keys and one column per variable.

One table with data for each year and company, one with state- and US-level aggregates per year.
"""
raw_eia176__data["report_year"].astype(int)
Member:

These astype calls don't mutate the original object:

>>> import pandas as pd
>>> df = pd.DataFrame({"a": [1]})
>>> df.dtypes
a    int64
dtype: object
>>> df["a"].astype("float")
0    1.0
Name: a, dtype: float64
>>> df.dtypes
a    int64
dtype: object

So I think you'll have to do some reassignment.

I also think it would be nice to do all that reassignment in one call - which you can do by calling astype on the DataFrame instead of its constituent Series:

>>> df = pd.DataFrame({"a": [1], "b": ["1"], "c": 1.0})
>>> df
   a  b    c
0  1  1  1.0
>>> df.dtypes
a      int64
b     object
c    float64
dtype: object
>>> df = df.astype({"a": "float", "b": "float"})
>>> df
     a    b    c
0  1.0  1.0  1.0
>>> df.dtypes
a    float64
b    float64
c    float64
dtype: object
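
Applied to the columns touched in this diff, that single-call reassignment might look roughly like the sketch below (illustrative, not the committed change):

raw_eia176__data = raw_eia176__data.astype({"report_year": int, "value": float})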

raw_eia176__data["value"].astype(float)
raw_eia176__data["variable_name"] = (
raw_eia176__data["line"] + "_" + raw_eia176__data["atype"]
)

long_company = raw_eia176__data.loc[
raw_eia176__data.company != "total of all companies"
]
aggregate_primary_key = ["report_year", "area"]
Member:

super-nit: combined with the comment below about why the primary keys should differ, I think it would be slightly more readable if you assigned the two different primary keys names - instead of only giving the aggregate one a name and the company one just randomly has ["id"] tacked on.

# your comment about how the primary keys should differ
aggregate_primary_key = [...]
company_primary_key = aggregate_primary_key + ["id"]

company_drop_columns = ["itemsort", "item", "atype", "line", "company"]

wide_company = get_wide_table(
long_table=long_company.drop(columns=company_drop_columns),
primary_key=aggregate_primary_key + ["id"],
)

long_aggregate = raw_eia176__data.loc[
raw_eia176__data.company == "total of all companies"
]
wide_aggregate = get_wide_table(
long_table=long_aggregate.drop(columns=company_drop_columns + ["id"]),
primary_key=aggregate_primary_key,
)

return wide_company, wide_aggregate


def get_wide_table(long_table: pd.DataFrame, primary_key: list[str]) -> pd.DataFrame:
"""Take a 'long' or entity-attribute-value table and return a wide table with one column per attribute/variable."""
unstacked = (
# we must drop 'id' here and cannot use it as a primary key because it's arbitrary/duplicated in aggregate records
Member:

What a fun quirk you've found! Hooray!

These comments seem like they belong before line 44, where you're actually doing the ID dropping.

# 'id' is a reliable ID only in the context of granular company data
long_table.set_index(primary_key + ["variable_name"]).unstack(
level="variable_name"
)
)
unstacked.columns = unstacked.columns.droplevel(0).fillna(0)
unstacked.columns.name = None # gets rid of "variable_name" name of columns index

# TODO instead of "first NA value we see in each column" applied willy-nilly, we could check to see if there are any conflicting non-null values using .count() first.
Collaborator Author:

I didn't exactly discern what you meant here @jdangerx, if you want to expand. I can probably figure out the intent with a closer look.

Member:

Ah - yes, I meant first non-NA value here, which must have been confusing. Check out this example:

>>> df
   a     b
0  1  10.0
1  1   NaN
2  2  20.0
3  2  21.0
>>> df.groupby("a").first()
      b
a      
1  10.0
2  20.0

.first() grabs the first non-null value regardless of whether there are other non-null values. So for group a=1 we get the unambiguously right outcome of b=10, but for group a=2... is b=20 or b=21 the right outcome?

If we can tell what the right outcome should be, maybe by sorting by some field, then we should sort by that field before calling .first():

>>> df.sort_values("b", ascending=False)
   a     b
3  2  21.0
2  2  20.0
0  1  10.0
1  1   NaN
>>> df.sort_values("b", ascending=False).groupby("a").first()
      b
a      
1  10.0
2  21.0

If there's no simple rule, we should check to see that we're not running into this ambiguous behavior by checking that there are no groups with multiple non-NA values before going forward with the .first() call:

>>> df.groupby("a").count()
   b
a   
1  1
2  2
>>> (df.groupby("a").count() == 1).all()
b    False
dtype: bool

Hopefully that clears things up!

Collaborator Author:

We're guaranteed to have one value per group by the time we call first, because we're using unstack, which doesn't tolerate competing values:

>>> df
   company_id variable  value
0           1     var1     11
1           1     var2     12
2           2     var2     22
3           2     var2      0
>>> df.set_index(["company_id", "variable"]).unstack(level="variable")
...
ValueError: Index contains duplicate entries, cannot reshape

In fact I don't think we need first at all. Seems like a holdover from when we were working with the idea of a sparse matrix.

>>> df
   company_id variable  value
0           1     var1     11
1           1     var2     12
2           2     var2     22
>>> df.set_index(["company_id", "variable"]).unstack(level="variable")
           value
variable    var1  var2
company_id
1           11.0  12.0
2            NaN  22.0

I'll get the adjustment in my next commit.

return unstacked.groupby(level=primary_key).first().reset_index()
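
Per the thread above, a sketch of what the reshaping could reduce to once .first() is dropped, since unstack() already raises on duplicate keys (illustrative only; the committed code is the version shown above):

def get_wide_table(long_table: pd.DataFrame, primary_key: list[str]) -> pd.DataFrame:
    """Pivot a long (entity-attribute-value) table into one column per variable."""
    wide = long_table.set_index(primary_key + ["variable_name"]).unstack(
        level="variable_name"
    )
    # Drop the "value" level and the "variable_name" name from the column index.
    wide.columns = wide.columns.droplevel(0)
    wide.columns.name = None
    return wide.reset_index()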


@asset_check(
asset="core_eia176__yearly_company_data",
additional_ins={"core_eia861__yearly_aggregate_data": AssetIn()},
blocking=True,
)
def validate_totals(
core_eia176__yearly_company_data: pd.DataFrame,
core_eia861__yearly_aggregate_data: pd.DataFrame,
) -> AssetCheckResult:
"""Compare reported and calculated totals for different geographical aggregates, report any differences."""
# First make it so we can directly compare reported aggregates to groupings of granular data
comparable_aggregates = core_eia861__yearly_aggregate_data.sort_values(
["report_year", "area"]
).fillna(0)

# Group company data into state-level data and compare to reported totals
state_data = (
core_eia176__yearly_company_data.drop(columns="id")
.groupby(["report_year", "area"])
.sum()
.reset_index()
)
aggregate_state = comparable_aggregates[
comparable_aggregates.area != "u.s. total"
].reset_index(drop=True)
# Compare using the same columns
state_diff = aggregate_state[state_data.columns].compare(state_data)

# Group calculated state-level data into US-level data and compare to reported totals
us_data = (
state_data.drop(columns="area")
.groupby("report_year")
.sum()
.sort_values("report_year")
.reset_index()
)
aggregate_us = (
comparable_aggregates[comparable_aggregates.area == "u.s. total"]
.drop(columns="area")
.sort_values("report_year")
.reset_index(drop=True)
)
# Compare using the same columns
us_diff = aggregate_us[us_data.columns].compare(us_data)

return AssetCheckResult(passed=bool(us_diff.empty and state_diff.empty))


# TODO: Reasonable boundaries -- in a script/notebook in the 'validate' directory? How are those executed?
Collaborator Author:

Other ideas on validations to cover here as an asset check? I also see reasonable boundaries in validate/gens_eia860_test.py and could pursue something similar for eia176.

Member:

I don't really know much about the actual semantics of the gas data - I think a reasonable thing to do is graph some of the different variables over time and see if anything jumps out as "suspicious", then bring that up and we can try to research that together.

Collaborator Author:

Happy to just get min, max, and enumerated values based on data so far to validate against, maybe in a follow-up.

Member:

This sounds like a perfect follow-up PR!
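
For that follow-up, a minimal sketch of what a bounds-style check could look like, reusing the asset_check pattern already in this module (the "1010_vl" column name is taken from the tests below; the bounds themselves are placeholders, not derived from the data):

import pandas as pd
from dagster import AssetCheckResult, asset_check


@asset_check(asset="core_eia176__yearly_company_data")
def validate_volume_bounds(
    core_eia176__yearly_company_data: pd.DataFrame,
) -> AssetCheckResult:
    """Flag residential sales volumes outside a plausible (placeholder) range."""
    volumes = core_eia176__yearly_company_data["1010_vl"].dropna()
    within_bounds = volumes.between(0, 1e12)  # placeholder bounds
    return AssetCheckResult(passed=bool(within_bounds.all()))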

157 changes: 157 additions & 0 deletions test/unit/transform/eia176_test.py
@@ -0,0 +1,157 @@
import pandas as pd

from pudl.transform.eia176 import _core_eia176__data, get_wide_table, validate_totals

COLUMN_NAMES = [
Member:

Food for thought: it sort of seems like you'd have a nicer time making a single dataframe that looks like the raw EIA 176 data, registering it as a @pytest.fixture, and then selecting the subsets of it you want to test with in the individual tests. What made you go this direction instead?

Collaborator Author:

I'll give it a shot. I think a fixture could save a number of lines and data-building, particularly in checking internal consistency of aggregations. I think I'll still declare a good number of constants to use more semantically across the test file, instead of expecting maintainers to glean the nature of an arbitrary number (e.g., 532842.0) in limited contexts. I'm also inclined to keep the fixture minimal and somewhat contrived, if informed by actual raw data, rather than a fuller sample of raw data; I don't want more data points than needed to illustrate the tested behaviors, since that could make tests and code harder to comprehend.

"area",
"atype",
"company",
"id",
"line",
"report_year",
"value",
"itemsort",
"item",
]

ID_1 = "17673850NM"
VOLUME_1 = 30980426.0
COMPANY_1 = [
"new mexico",
"vl",
"new mexico gas company",
ID_1,
"1010",
"2022",
VOLUME_1,
"[10.1]",
"residential sales volume",
]

ID_2 = "17635017NM"
VOLUME_2 = 532842.0
COMPANY_2 = [
"new mexico",
"vl",
"west texas gas inc",
ID_2,
"1010",
"2022",
VOLUME_2,
"[10.1]",
"residential sales volume",
]

NM_VOLUME = VOLUME_1 + VOLUME_2
NM_AGGREGATE = [
"new mexico",
"vl",
"total of all companies",
# Aggregates appear to reuse an arbitrary company ID
ID_1,
"1010",
"2022",
NM_VOLUME,
"[10.1]",
"residential sales volume",
]

ID_3 = "17635017TX"
VOLUME_3 = 1.0
COMPANY_3 = [
"texas",
"vl",
"west texas gas inc",
ID_3,
"1010",
"2022",
VOLUME_3,
"[10.1]",
"residential sales volume",
]

TX_VOLUME = VOLUME_3
TX_AGGREGATE = [
"texas",
"vl",
"total of all companies",
# Aggregates appear to reuse an arbitrary company ID
ID_3,
"1010",
"2022",
VOLUME_3,
"[10.1]",
"residential sales volume",
]

US_VOLUME = NM_VOLUME + TX_VOLUME
US_AGGREGATE = [
"u.s. total",
"vl",
"total of all companies",
# Aggregates appear to reuse an arbitrary company ID
ID_1,
"1010",
"2022",
US_VOLUME,
"[10.1]",
"residential sales volume",
]

DROP_COLS = ["itemsort", "item", "atype", "line", "company"]


def test_core_eia176__data():
eav_model = pd.DataFrame(columns=COLUMN_NAMES)
eav_model.loc[0] = COMPANY_1
eav_model.loc[1] = NM_AGGREGATE

wide_company, wide_aggregate = _core_eia176__data(eav_model)
assert wide_company.shape == (1, 4)

company_row = wide_company.loc[0]
assert list(company_row.index) == ["report_year", "area", "id", "1010_vl"]
assert list(company_row.values) == ["2022", "new mexico", ID_1, VOLUME_1]

assert wide_aggregate.shape == (1, 3)
aggregate_row = wide_aggregate.loc[0]
assert list(aggregate_row.index) == ["report_year", "area", "1010_vl"]
assert list(aggregate_row.values) == ["2022", "new mexico", NM_VOLUME]


def test_get_wide_table():
long_table = pd.DataFrame(columns=COLUMN_NAMES)
long_table.loc[0] = COMPANY_1
long_table.loc[1] = COMPANY_2
long_table["variable_name"] = long_table["line"] + "_" + long_table["atype"]
long_table = long_table.drop(columns=DROP_COLS)

primary_key = ["report_year", "area", "id"]
wide_table = get_wide_table(long_table, primary_key)
wide_table = wide_table.fillna(0)

assert wide_table.shape == (2, 4)
assert list(wide_table.loc[0].index) == ["report_year", "area", "id", "1010_vl"]
assert list(wide_table.loc[0].values) == ["2022", "new mexico", ID_2, VOLUME_2]
assert list(wide_table.loc[1].values) == ["2022", "new mexico", ID_1, VOLUME_1]


def test_validate__totals():
# Our test data will have only measurements for this 1010_vl variable
validation_cols = COLUMN_NAMES + ["1010_vl"]

company_data = pd.DataFrame(columns=validation_cols)
# Add the value for the 1010_vl variable
company_data.loc[0] = COMPANY_1 + [f"{VOLUME_1}"]
company_data.loc[1] = COMPANY_2 + [f"{VOLUME_2}"]
company_data.loc[2] = COMPANY_3 + [f"{VOLUME_3}"]
company_data = company_data.drop(columns=DROP_COLS)

aggregate_data = pd.DataFrame(columns=validation_cols)
# Add the value for the 1010_vl variable
aggregate_data.loc[0] = NM_AGGREGATE + [f"{NM_VOLUME}"]
aggregate_data.loc[1] = TX_AGGREGATE + [f"{TX_VOLUME}"]
aggregate_data.loc[2] = US_AGGREGATE + [f"{US_VOLUME}"]
aggregate_data = aggregate_data.drop(columns=DROP_COLS + ["id"])

validate_totals(company_data, aggregate_data)
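
Following up on the fixture discussion above, a rough sketch of how a shared raw-data fixture could look, reusing the constants already defined in this file (the fixture name is illustrative):

import pytest


@pytest.fixture
def eia176_raw_data() -> pd.DataFrame:
    """A minimal frame shaped like the raw EIA-176 extract, built from the constants above."""
    df = pd.DataFrame(columns=COLUMN_NAMES)
    rows = [COMPANY_1, COMPANY_2, COMPANY_3, NM_AGGREGATE, TX_AGGREGATE, US_AGGREGATE]
    for i, row in enumerate(rows):
        df.loc[i] = row
    return df

Individual tests could then request eia176_raw_data and select the subset of rows they need.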