Skip to content

Commit

Permalink
Add --only-assets flag to hand-pick specified assets.
Browse files Browse the repository at this point in the history
  • Loading branch information
rousik committed Nov 27, 2023
1 parent be79e33 commit 9a97df3
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 3 deletions.
24 changes: 22 additions & 2 deletions src/pudl/cli/etl.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

import fsspec
from dagster import (
AssetKey,
AssetSelection,
DagsterInstance,
Definitions,
JobDefinition,
Expand Down Expand Up @@ -67,19 +69,27 @@ def parse_command_line(argv):
help="Set the max number of processes dagster can launch. Defaults to use the number of CPUs on the machine.",
default=0,
)
parser.add_argument(
"--only-assets",
action="append",
default=[],
)
arguments = parser.parse_args(argv[1:])
return arguments


def pudl_etl_job_factory(
logfile: str | None = None, loglevel: str = "INFO", process_epacems: bool = True
logfile: str | None = None, loglevel: str = "INFO", process_epacems: bool = True,
selected_assets: list[str] = [],
) -> Callable[[], JobDefinition]:
"""Factory for parameterizing a reconstructable pudl_etl job.
Args:
loglevel: The log level for the job's execution.
logfile: Path to a log file for the job's execution.
process_epacems: Include EPA CEMS assets in the job execution.
selected_assets: if not empty, only execute these assets and their upstream
dependencies.
Returns:
The job definition to be executed.
Expand All @@ -89,7 +99,16 @@ def get_pudl_etl_job():
"""Create an pudl_etl_job wrapped by to be wrapped by reconstructable."""
pudl.logging_helpers.configure_root_logger(logfile=logfile, loglevel=loglevel)
jobs = [define_asset_job("etl_job")]
if not process_epacems:
if selected_assets:
logger.info(f"Only executing selected assets: {selected_assets}")
targets = AssetSelection.keys(*[AssetKey(asset) for asset in selected_assets])
jobs = [
define_asset_job(
"etl_job",
selection=targets.upstream(),
),
]
elif not process_epacems:
jobs = [
define_asset_job(
"etl_job",
Expand Down Expand Up @@ -135,6 +154,7 @@ def main():
"loglevel": args.loglevel,
"logfile": args.logfile,
"process_epacems": process_epacems,
"selected_assets": args.only_assets,
},
)
run_config = {
Expand Down
1 change: 1 addition & 0 deletions src/pudl/ferc_to_sqlite/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,7 @@ def main(): # noqa: C901
},
}
run_config.update(get_dagster_execution_config(args.dagster_workers))
logger.info(f"Run config: {run_config}")

start_time = time.time()
result = execute_job(
Expand Down
2 changes: 1 addition & 1 deletion src/pudl/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -1564,7 +1564,7 @@ def get_eia_ferc_acct_map() -> pd.DataFrame:
description and prime mover code to FERC Uniform System of Accounts
(USOA) accouting names. Read more about USOA
`here
<https://www.ferc.gov/enforcement-legal/enforcement/accounting-matters>`__
<https://www.ferc.gov/enforerent-legal/enforcement/accounting-matters>`__
The output table has the following columns: `['technology_description',
'prime_mover_code', 'ferc_acct_name']`
"""
Expand Down

0 comments on commit 9a97df3

Please sign in to comment.