Vce memory fix #3959
base: main
Conversation
One major issue -- this isn't producing a concatenated parquet file that corresponds to the output table we want to distribute, and it isn't using the PyArrow schema that is defined by the resource metadata for this output table.
Non-blocking, but if there's an easy way to not fast-fail the asset checks and provide more comprehensive feedback to users, that would be nicer for debugging. See specific notes.
"limit": 4, | ||
"limit": 2, |
Are we really limited to a concurrency of 2 now? Given the number of high memory assets we have, I would expect this to pretty significantly extend the overall runtime.
@@ -4,12 +4,18 @@
in this module, as they have exactly the same structure.
"""

import duckdb
I think this is the first direct import of duckdb anywhere in PUDL, so we should add it as a direct rather than transitive dependency in pyproject.toml.
(I went ahead and made this change and merged in the changes from main.)
if (error := _check_rows(vce)) is not None:
    return error

if (error := _check_nulls(vce)) is not None:
    return error

if (error := _check_pv_capacity_factor_upper_bound(vce)) is not None:
    return error

if (error := _check_wind_capacity_factor_upper_bound(vce)) is not None:
    return error

if (error := _check_capacity_factor_lower_bound(vce)) is not None:
    return error

if (error := _check_max_hour_of_year(vce)) is not None:
    return error

if (error := _check_unexpected_dates(vce)) is not None:
    return error

if (error := _check_hour_from_date(vce)) is not None:
    return error

if (error := _check_unexpected_counties(vce)) is not None:
    return error

if (error := _check_duplicate_county_id_fips(vce)) is not None:
    return error
Is there any easy way to refactor the logic here so that, rather than bailing as soon as any error is encountered and returning just that error, some kind of composite report is accumulated that includes the outputs of each of these checks, and the whole thing is returned at the end? Is there a built-in way to return multiple AssetCheckResult objects to Dagster?
Or maybe now that we're using DuckDB, and the Parquet data is being queried independently for each of these checks rather than read into memory once as a dataframe that's re-used for each check, we could just split each of these out into their own asset checks that run separately, which is more normal? Are all of them actually high memory, or could some of them be run as non-high-memory checks in parallel?
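One rough sketch of the composite-report option, assuming each _check_* helper keeps returning an AssetCheckResult on failure and None on success, as in the current code (Dagster's @multi_asset_check decorator might be another route if the checks get split apart entirely):

checks = [
    _check_rows,
    _check_nulls,
    _check_pv_capacity_factor_upper_bound,
    _check_wind_capacity_factor_upper_bound,
    _check_capacity_factor_lower_bound,
    _check_max_hour_of_year,
    _check_unexpected_dates,
    _check_hour_from_date,
    _check_unexpected_counties,
    _check_duplicate_county_id_fips,
]
# Run every check and collect the failures instead of returning the first one.
failures = [result for check in checks if (result := check(vce)) is not None]
if failures:
    return AssetCheckResult(
        passed=False,
        description=" | ".join(f.description for f in failures),
        metadata={"n_failed_checks": len(failures)},
    )
return AssetCheckResult(passed=True)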
df.to_parquet(
    parquet_path / f"vcerare_hourly_available_capacity_factor-{year}",
    index=False,
)
Is it okay that we're not providing any schema information when writing out these parquet files? In the end, when it's written out to the final unified parquet file, the specified schema will be enforced, but I could see this round trip having impacts. We might be able to use dtype_backend="pyarrow" to avoid data type changes resulting from the write and read.
Right now I am getting a Pandera schema validation failure on the new asset. I think that's because it's referring to a directory rather than a parquet file, but I would imagine that without enforcing the defined schema on the outputs, that would also cause a failure.
I think the individual parquet files inside the directory should use the .parquet suffix, and the directory should just be the table name, like we did with EPA CEMS.
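A small sketch of both suggestions, using a hypothetical directory path and assuming pandas >= 2.0 for dtype_backend="pyarrow" (df and year come from the surrounding per-year loop):

from pathlib import Path

import pandas as pd

# Hypothetical layout: a directory named for the table, one .parquet file per year.
table_dir = Path("/path/to/pudl_output/parquet") / "out_vcerare__hourly_available_capacity_factor"
table_dir.mkdir(parents=True, exist_ok=True)
df.to_parquet(table_dir / f"{year}.parquet", index=False)

# dtype_backend="pyarrow" keeps ArrowDtype columns on read, limiting dtype
# drift from the write/read round trip.
df = pd.read_parquet(table_dir / f"{year}.parquet", dtype_backend="pyarrow")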
-)
-def out_vcerare__hourly_available_capacity_factor(
+def _get_parquet_path():
+    return PudlPaths().parquet_path("out_vcerare__hourly_available_capacity_factor")
parquet_path() is meant to create a file path, not a directory path. So here it's creating out_vcerare__hourly_available_capacity_factor.parquet -- but this is later created as a directory.
For the EPA CEMS we used a directory name that was just the table name, and then had a number of individual Parquet files inside that directory, and it seems like we should follow the same naming convention here too.
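One possible tweak along those lines, assuming parquet_path() returns a pathlib.Path ending in .parquet as described above:

def _get_parquet_dir():
    """Directory named for the table, holding the per-year .parquet files."""
    # with_suffix("") drops the ".parquet" suffix so the directory is just the
    # table name, matching the EPA CEMS convention.
    return PudlPaths().parquet_path(
        "out_vcerare__hourly_available_capacity_factor"
    ).with_suffix("")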
@asset
def out_vcerare__hourly_available_capacity_factor(
    raw_vcerare__lat_lon_fips: pd.DataFrame,
    raw_vcerare__fixed_solar_pv_lat_upv: pd.DataFrame,
    raw_vcerare__offshore_wind_power_140m: pd.DataFrame,
    raw_vcerare__onshore_wind_power_100m: pd.DataFrame,
):
This only creates a set of single-year parquet outputs, rather than creating a single concatenated parquet output that conforms to the PyArrow schema defined for the table, so it seems like there's a missing concatenation step that's required to get the outputs into the same shape as all of our other outputs -- a single table_name.parquet file in the PUDL_OUTPUT/parquet directory, similar to the use of pq.ParquetWriter in pudl.etl.epacems_assets.
But instead of having the transform embedded within that function as an op graph, it seems like we can just iterate over the annual files, create pyarrow Tables out of them, and write them serially to the single output file with pq.ParquetWriter, using the PyArrow schema defined for the table in the PUDL resource metadata. Using the resource-metadata-defined PyArrow schema also stamps the table and column descriptions into the Parquet metadata directly.
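A hedged sketch of that concatenation step; the paths are illustrative, and it assumes the table's PyArrow schema can be pulled from the resource metadata via something like Resource.from_id(...).to_pyarrow():

from pathlib import Path

import pyarrow.parquet as pq

from pudl.metadata.classes import Resource

table_name = "out_vcerare__hourly_available_capacity_factor"
# Assumed accessor for the table's PyArrow schema from the resource metadata.
schema = Resource.from_id(table_name).to_pyarrow()

# Hypothetical locations: per-year files in a table-name directory, single
# concatenated file alongside it.
annual_dir = Path("/path/to/pudl_output/parquet") / table_name
out_path = annual_dir.with_suffix(".parquet")

with pq.ParquetWriter(out_path, schema) as writer:
    for annual_file in sorted(annual_dir.glob("*.parquet")):
        # Each annual file becomes one or more row groups in the output file,
        # so only a year of data is ever held in memory at once.
        writer.write_table(pq.read_table(annual_file, schema=schema))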
Does the lower memory footprint of running the asset checks with DuckDB depend on the inputs being split into separate parquet files? Or will it still be memory efficient if it operates on a single concatenated parquet file that has annual row-groups? Ideally we would want to validate the thing that we're actually going to distribute.
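For what it's worth, a quick sketch of pointing one of these checks at a single concatenated file (filename hypothetical); DuckDB streams over the Parquet file and only materializes the columns a query touches, so it should stay memory-light either way:

import duckdb

single_file = "/path/to/pudl_output/parquet/out_vcerare__hourly_available_capacity_factor.parquet"
max_hour = duckdb.query(
    f"SELECT MAX(hour_of_year) FROM read_parquet('{single_file}')"
).fetchall()[0][0]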
for c in columns:
    nulls = duckdb.query(f"SELECT {c} FROM vce WHERE {c} IS NULL").fetchall()  # noqa: S608
    if len(nulls) > 0:
        return AssetCheckResult(
            passed=False,
            description=f"Found NA values in column {c}",
        )
Rather than bailing as soon as we get any unexpected nulls, can we check all the columns, and tell the user what columns had unexpected nulls? Will make debugging less frustrating.
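A sketch of that non-fast-fail variant, reusing the columns list and duckdb-queryable vce from the surrounding code:

null_columns = []
for c in columns:
    n_nulls = duckdb.query(
        f"SELECT COUNT(*) FROM vce WHERE {c} IS NULL"  # noqa: S608
    ).fetchall()[0][0]
    if n_nulls > 0:
        null_columns.append(f"{c} ({n_nulls} nulls)")
if null_columns:
    return AssetCheckResult(
        passed=False,
        description=f"Found NA values in columns: {', '.join(null_columns)}",
    )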
columns = [c for c in vce.columns if c.endswith("wind")]
for c in columns:
    cap_oob = duckdb.query(f"SELECT {c} FROM vce WHERE {c} > 1.0").fetchall()  # noqa: S608
    if len(cap_oob) > 0:
        return AssetCheckResult(
            passed=False,
            description=f"Found wind capacity factor values greater than 1.0 in column {c}",
        )
As above, rather than fast failing, it would be nice to know all the columns that have problems after a single run.
columns = [c for c in vce.columns if c.startswith("capacity_factor")]
for c in columns:
    cap_oob = duckdb.query(f"SELECT {c} FROM vce WHERE {c} < 0.0").fetchall()  # noqa: S608
    if len(cap_oob) > 0:
        return AssetCheckResult(
            passed=False,
            description=f"Found capacity factor values less than 0 from column {c}",
        )
As above, rather than fast failing, it would be nice to know all the columns that have problems after a single run.
max_hours = duckdb.query(
    "SELECT hour_of_year FROM vce WHERE hour_of_year > 8760"
).fetchall()
if len(max_hours) > 0:
This is a slightly less specific test than the one it replaces. The max hour should always be exactly 8760 -- neither more nor less.
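A sketch of the stricter version, again assuming the duckdb-queryable vce from the surrounding code:

max_hour = duckdb.query("SELECT MAX(hour_of_year) FROM vce").fetchall()[0][0]
if max_hour != 8760:
    return AssetCheckResult(
        passed=False,
        description=f"Expected hour_of_year to top out at exactly 8760, found {max_hour}",
    )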
Overview
This PR refactors VCE RARE transforms and the asset_check to reduce peak memory usage. Closes #3925 #3926.
Approach
To reduce memory usage, I refactored the transforms to work on a single year of data at a time and write outputs to parquet files. This approach writes directly to parquet without an IO manager, which doesn't feel ideal, but it does work well to reduce memory usage.
To deal with the asset checks, I refactored them from loading the entire table into pandas to using duckdb to query the parquet outputs. This approach is also a bit messy, but it is fast and efficient, and we could probably build tooling around it and standardize it as part of the validation framework design.
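As a rough illustration of that pattern (the directory path is hypothetical, and the actual wiring may differ), the checks point duckdb at the per-year Parquet outputs and let it stream the queries instead of loading the whole table into pandas:

import duckdb

# Hypothetical path to the per-year outputs written by the transform.
vce = duckdb.read_parquet(
    "/path/to/pudl_output/parquet/out_vcerare__hourly_available_capacity_factor/*.parquet"
)
# Referencing the relation by its Python variable name works via DuckDB's
# replacement scans; queries stream over the files rather than loading them.
n_rows = duckdb.query("SELECT COUNT(*) FROM vce").fetchall()[0][0]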
Alternative approaches
Partitioned assets
I think using partitioned assets would be a good approach for this asset, as well as for other resource-intensive assets, but I believe there are a couple of reasons why that's not easy or feasible at this point. Mainly, it seems like best practice is to maintain one consistent partitioning scheme per job. I think splitting the ETL into multiple jobs with partitioned assets could be a good pattern to adopt, but this felt like too big of a can of worms to tackle right now.
Dynamic op graph
The other approach I investigated was using dynamic op graphs to process years in parallel, but I found this could still lead to significant memory spikes depending on what ends up running together, and the unparallelized version doesn't take too long to run.
Tasks