Break ferc_to_sqlite op monoliths.
Refactor the monolithic dbf2sqlite and xbrl2sqlite ops into smaller
per-dataset ops that are invoked within the graphs. This should let us
make better use of Dagster parallelism and speed up ferc_to_sqlite
processing.

The current unit/integration tests appear to use only FERC Form 1 raw data,
so I've modified the fixtures to run only the relevant pieces of processing.
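
For illustration, a minimal, self-contained sketch of the pattern this commit applies: a factory builds one small op per dataset, and the graph fans them out so independent ops can run in parallel. The names below are simplified placeholders, not the actual PUDL code.

```python
from dagster import OpExecutionContext, graph, op


def extract_op_factory(dataset: str):
    """Build a uniquely named op for a single dataset."""

    @op(name=f"extract_{dataset}")
    def extract(context: OpExecutionContext) -> None:
        # Stand-in for the real per-dataset extraction work.
        context.log.info(f"extracting {dataset}")

    return extract


@graph
def extract_all():
    # One independent op per dataset, with no data dependencies between them,
    # so a multiprocess executor is free to schedule them concurrently.
    for dataset in ["ferc1", "ferc2", "ferc6", "ferc60"]:
        extract_op_factory(dataset)()
```

Turning such a graph into a job (e.g. `extract_all.to_job()`) yields one op per dataset that a multiprocess executor can run concurrently, which is where the expected speedup comes from.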
rousik committed Nov 30, 2023
1 parent c4f3d98 commit 428d04f
Showing 8 changed files with 165 additions and 174 deletions.
28 changes: 27 additions & 1 deletion src/pudl/extract/dbf.py
@@ -4,20 +4,22 @@
import importlib.resources
import zipfile
from collections import defaultdict
from collections.abc import Iterator
from collections.abc import Callable, Iterator
from functools import lru_cache
from pathlib import Path
from typing import IO, Any, Protocol, Self

import pandas as pd
import sqlalchemy as sa
from dagster import op
from dbfread import DBF, FieldParser

import pudl
import pudl.logging_helpers
from pudl.metadata.classes import DataSource
from pudl.settings import FercToSqliteSettings, GenericDatasetSettings
from pudl.workspace.datastore import Datastore
from pudl.workspace.setup import PudlPaths

logger = pudl.logging_helpers.get_logger(__name__)

@@ -464,6 +466,30 @@ def get_db_path(self) -> str:
db_path = str(Path(self.output_path) / self.DATABASE_NAME)
return f"sqlite:///{db_path}"

@classmethod
def get_dagster_op(cls) -> Callable:
"""Returns dagstger op that runs this extractor."""

@op(
name=f"dbf_{cls.DATASET}",
required_resource_keys={
"ferc_to_sqlite_settings",
"datastore",
"runtime_settings",
},
)
def inner_method(context) -> None:
"""Instantiates dbf extractor and runs it."""
dbf_extractor = cls(
datastore=context.resources.datastore,
settings=context.resources.ferc_to_sqlite_settings,
clobber=context.resources.runtime_settings.clobber,
output_path=PudlPaths().output_dir,
)
dbf_extractor.execute()

return inner_method

def execute(self):
"""Runs the extraction of the data from dbf to sqlite."""
logger.info(
37 changes: 6 additions & 31 deletions src/pudl/extract/ferc.py
@@ -1,42 +1,17 @@
"""Hooks to integrate ferc to sqlite functionality into dagster graph."""


from dagster import Field, op

import pudl
from pudl.extract.ferc1 import Ferc1DbfExtractor
from pudl.extract.ferc2 import Ferc2DbfExtractor
from pudl.extract.ferc6 import Ferc6DbfExtractor
from pudl.extract.ferc60 import Ferc60DbfExtractor
from pudl.workspace.setup import PudlPaths

logger = pudl.logging_helpers.get_logger(__name__)


@op(
config_schema={
"clobber": Field(
bool, description="Clobber existing ferc1 database.", default_value=False
),
},
required_resource_keys={"ferc_to_sqlite_settings", "datastore"},
)
def dbf2sqlite(context) -> None:
"""Clone the FERC Form 1 Visual FoxPro databases into SQLite."""
# TODO(rousik): this thin wrapper seems to be somewhat quirky. Maybe there's a way
# to make the integration between the class and dagster better? Investigate.
logger.info(f"dbf2sqlite settings: {context.resources.ferc_to_sqlite_settings}")

extractors = [
Ferc1DbfExtractor,
Ferc2DbfExtractor,
Ferc6DbfExtractor,
Ferc60DbfExtractor,
]
for xclass in extractors:
xclass(
datastore=context.resources.datastore,
settings=context.resources.ferc_to_sqlite_settings,
clobber=context.op_config["clobber"],
output_path=PudlPaths().output_dir,
).execute()
ALL_DBF_EXTRACTORS = [
Ferc1DbfExtractor,
Ferc2DbfExtractor,
Ferc6DbfExtractor,
Ferc60DbfExtractor,
]
42 changes: 24 additions & 18 deletions src/pudl/extract/xbrl.py
@@ -4,25 +4,18 @@
from datetime import date
from pathlib import Path

from dagster import ConfigurableResource, op
from dagster import op
from ferc_xbrl_extractor.cli import run_main

import pudl
from pudl.resources import RuntimeSettings
from pudl.settings import FercGenericXbrlToSqliteSettings, XbrlFormNumber
from pudl.workspace.datastore import Datastore
from pudl.workspace.setup import PudlPaths

logger = pudl.logging_helpers.get_logger(__name__)


class XbrlRuntimeSettings(ConfigurableResource):
"""Encodes runtime setting for the XBRL extraction."""
# TODO(rousik): Using BaseSettings here might allow configuring this via environment variables.
clobber: bool = False
num_workers: None | int = None
batch_size: int = 50


class FercXbrlDatastore:
"""Simple datastore wrapper for accessing ferc1 xbrl resources."""

@@ -52,22 +45,33 @@ def get_filings(self, year: int, form: XbrlFormNumber) -> io.BytesIO:
)
)


def xbrl2sqlite_op_factory(form: XbrlFormNumber) -> Callable:
"""Generates xbrl2sqlite op for a given FERC form."""

@op(
name=f"ferc{form.value}_xbrl",
required_resource_keys={"ferc_to_sqlite_settings", "datastore", "xbrl_runtime_settings"}
required_resource_keys={
"ferc_to_sqlite_settings",
"datastore",
"runtime_settings",
},
)
def inner_xbrl2sqlite(context) -> None:
def inner_op(context) -> None:
output_path = PudlPaths().output_dir
runtime_settings: XbrlRuntimeSettings = context.resources.xbrl_runtime_settings
settings = context.resources.ferc_to_sqlite_settings.get_xbrl_dataset_settings(form)
runtime_settings: RuntimeSettings = context.resources.runtime_settings
settings = context.resources.ferc_to_sqlite_settings.get_xbrl_dataset_settings(
form
)
datastore = FercXbrlDatastore(context.resources.datastore)

if settings is None or settings.disabled:
logger.info(f"Skipping dataset ferc{form}_xbrl: no config or is disabled.")
sql_path = PudlPaths().sqlite_db_path(f"ferc{form.value}_xbrl")
logger.info(
f"Skipping dataset ferc{form.value}_xbrl: no config or is disabled."
)
return

sql_path = PudlPaths().sqlite_db_path(f"ferc{form.value}_xbrl")
if sql_path.exists():
if runtime_settings.clobber:
sql_path.unlink()
@@ -82,10 +86,12 @@ def inner_xbrl2sqlite(context) -> None:
datastore,
output_path=output_path,
sql_path=sql_path,
batch_size=runtime_settings.batch_size,
workers=runtime_settings.num_workers,
batch_size=runtime_settings.xbrl_batch_size,
workers=runtime_settings.xbrl_num_workers,
)
return inner_xbrl2sqlite

return inner_op


def convert_form(
form_settings: FercGenericXbrlToSqliteSettings,
24 changes: 15 additions & 9 deletions src/pudl/ferc_to_sqlite/__init__.py
@@ -4,9 +4,13 @@
from dagster import Definitions, graph

import pudl
from pudl.extract.ferc import dbf2sqlite
from pudl.extract.xbrl import XbrlRuntimeSettings, xbrl2sqlite_op_factory
from pudl.resources import datastore, ferc_to_sqlite_settings
from pudl.extract.ferc import ALL_DBF_EXTRACTORS
from pudl.extract.ferc1 import Ferc1DbfExtractor
from pudl.extract.ferc2 import Ferc2DbfExtractor
from pudl.extract.ferc6 import Ferc6DbfExtractor
from pudl.extract.ferc60 import Ferc60DbfExtractor
from pudl.extract.xbrl import xbrl2sqlite_op_factory
from pudl.resources import RuntimeSettings, datastore, ferc_to_sqlite_settings
from pudl.settings import EtlSettings, XbrlFormNumber

logger = pudl.logging_helpers.get_logger(__name__)
@@ -15,15 +19,17 @@
@graph
def ferc_to_sqlite():
"""Clone the FERC FoxPro databases and XBRL filings into SQLite."""
dbf2sqlite()
for extractor in ALL_DBF_EXTRACTORS:
extractor.get_dagster_op()()
for form in XbrlFormNumber:
xbrl2sqlite_op_factory(form)()


@graph
def ferc_to_sqlite_dbf_only():
"""Clone the FERC FoxPro databases into SQLite."""
dbf2sqlite()
for extractor in ALL_DBF_EXTRACTORS:
extractor.get_dagster_op()()


@graph
@@ -32,9 +38,10 @@ def ferc_to_sqlite_xbrl_only():
for form in XbrlFormNumber:
xbrl2sqlite_op_factory(form)()


default_resources_defs = {
"ferc_to_sqlite_settings": ferc_to_sqlite_settings,
"xbrl_runtime_settings": XbrlRuntimeSettings(),
"runtime_settings": RuntimeSettings(),
"datastore": datastore,
}

@@ -55,10 +62,9 @@ def ferc_to_sqlite_xbrl_only():
"ferc_to_sqlite_settings": {
"config": ferc_to_sqlite_fast_settings.model_dump(),
},
"xbrl_runtime_settings": {
# TODO(rousik): do we need to set some defaults here?
"runtime_settings": {
"config": {},
}
},
},
},
)
11 changes: 3 additions & 8 deletions src/pudl/ferc_to_sqlite/cli.py
@@ -154,18 +154,13 @@ def main(): # noqa: C901
else "",
},
},
},
"ops": {
"xbrl2sqlite": {
"runtime_settings": {
"config": {
"workers": args.workers,
"batch_size": args.batch_size,
"clobber": args.clobber,
"xbrl_num_workers": args.workers,
"xbrl_batch_size": args.batch_size,
},
},
"dbf2sqlite": {
"config": {"clobber": args.clobber},
},
},
},
raise_on_error=True,
11 changes: 10 additions & 1 deletion src/pudl/resources.py
@@ -1,12 +1,21 @@
"""Collection of Dagster resources for PUDL."""

from dagster import Field, resource
from dagster import ConfigurableResource, Field, resource

from pudl.settings import DatasetsSettings, FercToSqliteSettings, create_dagster_config
from pudl.workspace.datastore import Datastore
from pudl.workspace.setup import PudlPaths


class RuntimeSettings(ConfigurableResource):
"""Encodes runtime settings for the ferc_to_sqlite graphs."""

# TODO(rousik): Using BaseSettings here might allow configuring this via environment variables.
clobber: bool = False
xbrl_num_workers: None | int = None
xbrl_batch_size: int = 50


@resource(config_schema=create_dagster_config(DatasetsSettings()))
def dataset_settings(init_context) -> DatasetsSettings:
"""Dagster resource for parameterizing PUDL ETL assets.
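
For reference, a hedged usage sketch of the new RuntimeSettings resource: the field names and defaults come from the diff above, and the run_config fragment mirrors the shape cli.py now passes (presumably under the resources section of the Dagster run config); the specific values are illustrative only.

```python
from pudl.resources import RuntimeSettings

# Programmatic override of the defaults shown above
# (clobber=False, xbrl_num_workers=None, xbrl_batch_size=50).
runtime_settings = RuntimeSettings(
    clobber=True,
    xbrl_num_workers=4,
    xbrl_batch_size=20,
)

# Equivalent run_config fragment for the ferc_to_sqlite jobs.
run_config = {
    "resources": {
        "runtime_settings": {
            "config": {
                "clobber": True,
                "xbrl_num_workers": 4,
                "xbrl_batch_size": 20,
            },
        },
    },
}
```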
(Diffs for the 2 remaining changed files are not shown.)
