From ce441cc8cd22805e047455e58f460e1c8be3e52e Mon Sep 17 00:00:00 2001
From: David Maxson <937812+scnerd@users.noreply.github.com>
Date: Mon, 20 Nov 2023 17:02:43 -0800
Subject: [PATCH] Pydantic 2 (#3)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

* [WIP] Started converting to pydantic 2

* [WIP] Started converting to pydantic 2

* Bump version: 0.3.0 → 1.0.0

* [WIP] Started converting to pydantic 2

---------

Co-authored-by: David Maxson
---
 .bumpversion.cfg                       |   2 +-
 .github/workflows/docs.yml             |  44 +++------
 .github/workflows/pre-commit.yml       |   2 +-
 .github/workflows/pypi.yml             |   2 +-
 .github/workflows/pytest.yml           |   8 +-
 .pre-commit-config.yaml                |   6 +-
 .ruff.toml                             |   2 +-
 README.md                              |  15 +--
 data_dag/__init__.py                   |   2 +-
 data_dag/dag_factory.py                |  14 ++-
 data_dag/operator_factory/__init__.py  |   2 +
 data_dag/operator_factory/base.py      |  10 +-
 data_dag/operator_factory/dynamic.py   | 120 +++++++++--------------
 data_dag/operator_factory/simple.py    |  44 +++++----
 docs/conf.py                           |   2 +-
 docs/getting_started.md                |  12 +--
 docs/overview.md                       |  12 +--
 docs/reference/operator_factory.rst    |   2 +
 docs/requirements.txt                  |   2 +-
 pyproject.toml                         |  10 +-
 requirements.txt                       |   2 +-
 tests/test_dag_factory.py              |  26 ++---
 tests/test_dynamic_operator_factory.py | 130 ++++++++++---------------
 tests/test_operator_factory.py         |  44 ++++++---
 24 files changed, 236 insertions(+), 279 deletions(-)

diff --git a/.bumpversion.cfg b/.bumpversion.cfg
index 9a8823c..b41b24a 100644
--- a/.bumpversion.cfg
+++ b/.bumpversion.cfg
@@ -1,5 +1,5 @@
 [bumpversion]
-current_version = 0.3.0
+current_version = 1.0.0
 commit = True
 tag = True
 
diff --git a/.github/workflows/docs.yml b/.github/workflows/docs.yml
index b09b21b..f2c8522 100644
--- a/.github/workflows/docs.yml
+++ b/.github/workflows/docs.yml
@@ -7,36 +7,16 @@ jobs:
   docs_build:
     runs-on: ubuntu-latest
     steps:
-      - uses: actions/checkout@v3
-      - uses: ammaraskar/sphinx-action@master
-        with:
-          docs-folder: "docs/"
+      - uses: actions/checkout@v4
+      - uses: actions/setup-python@v4
+        with:
+          python-version: '3.10'
+          cache: 'pip' # caching pip dependencies
+      - run: pip install -r docs/requirements.txt
+      - run: cd docs && make html
 
-      - uses: actions/upload-artifact@v1
-        with:
-          name: DocumentationHTML
-          path: docs/_build/html/
-
-# Use RTFD instead
-# publish_gh_pages:
-#   runs-on: ubuntu-latest
-#   needs: docs_build
-#   if: github.ref == 'refs/heads/master'
-#   steps:
-#     - name: Commit documentation changes
-#       run: |
-#         git clone https://github.com/your_git/repository.git --branch gh-pages --single-branch gh-pages
-#         cp -r docs/_build/html/* gh-pages/
-#         cd gh-pages
-#         git config --local user.email "action@github.com"
-#         git config --local user.name "GitHub Action"
-#         git add .
-#         git commit -m "Update documentation" -a || true
-#         # The above command will fail if no changes were present, so we ignore
-#         # the return code.
-#     - name: Push changes
-#       uses: ad-m/github-push-action@master
-#       with:
-#         branch: gh-pages
-#         directory: gh-pages
-#         github_token: ${{ secrets.GITHUB_TOKEN }}
+      - uses: actions/upload-artifact@v3
+        with:
+          name: DocumentationHTML
+          path: docs/_build/html/
+          retention-days: 30
diff --git a/.github/workflows/pre-commit.yml b/.github/workflows/pre-commit.yml
index 138af6f..a23cd3e 100644
--- a/.github/workflows/pre-commit.yml
+++ b/.github/workflows/pre-commit.yml
@@ -11,7 +11,7 @@ jobs:
   pre-commit:
     runs-on: ubuntu-latest
     steps:
-      - uses: actions/checkout@v3
+      - uses: actions/checkout@v4
         with:
           fetch-depth: 0
       - uses: actions/setup-python@v4
diff --git a/.github/workflows/pypi.yml b/.github/workflows/pypi.yml
index 1c15831..ab20008 100644
--- a/.github/workflows/pypi.yml
+++ b/.github/workflows/pypi.yml
@@ -8,7 +8,7 @@ jobs:
   deploy:
     runs-on: ubuntu-latest
     steps:
-      - uses: actions/checkout@v3
+      - uses: actions/checkout@v4
       - name: Set up Python
         uses: actions/setup-python@v4
         with:
diff --git a/.github/workflows/pytest.yml b/.github/workflows/pytest.yml
index c4c12ed..e91e73b 100644
--- a/.github/workflows/pytest.yml
+++ b/.github/workflows/pytest.yml
@@ -8,17 +8,17 @@ jobs:
     runs-on: ubuntu-latest
     strategy:
       matrix:
-        python-version: [ '3.7', '3.8', '3.9', '3.10' ]
+        python-version: [ '3.8', '3.9', '3.10' ]
     name: Python ${{ matrix.python-version }}
     steps:
-      - uses: actions/checkout@v3
+      - uses: actions/checkout@v4
       - uses: actions/setup-python@v4
         with:
-          python-version: '3.x'
+          python-version: ${{ matrix.python-version }}
           cache: 'pip'
       - name: Install dependencies
         run: |
-          python -m pip install --upgrade pip
+          pip install pendulum
          pip install -r requirements.txt
       - name: Test with pytest
         run: |
diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml
index 0a73350..4658ba8 100644
--- a/.pre-commit-config.yaml
+++ b/.pre-commit-config.yaml
@@ -2,7 +2,7 @@
 # See https://pre-commit.com/hooks.html for more hooks
 repos:
   - repo: https://github.com/pre-commit/pre-commit-hooks
-    rev: v4.4.0
+    rev: v4.5.0
    hooks:
       - id: trailing-whitespace
       - id: end-of-file-fixer
@@ -11,14 +11,14 @@ repos:
       - id: check-added-large-files
 
   - repo: https://github.com/psf/black
-    rev: 23.7.0
+    rev: 23.11.0
     hooks:
       - id: black
         additional_dependencies: ['click!=8.1.0']
 
   - repo: https://github.com/charliermarsh/ruff-pre-commit
     # Ruff version.
-    rev: 'v0.0.280'
+    rev: 'v0.1.6'
     hooks:
       - id: ruff
         args: ["--fix"]
diff --git a/.ruff.toml b/.ruff.toml
index f881d18..31b3657 100644
--- a/.ruff.toml
+++ b/.ruff.toml
@@ -42,7 +42,7 @@ line-length = 120
 dummy-variable-rgx = "^(_+|(_+[a-zA-Z0-9_]*[a-zA-Z0-9]+?))$"
 
 # Assume Python 3.7.
-target-version = "py37"
+target-version = "py38"
 
 [mccabe]
 # Unlike Flake8, default to a complexity level of 10.
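The source changes below all apply one consistent pydantic 1 → 2 mapping. As a rough reference sketch (the model here is illustrative, not code from this repo):

```python
from pydantic import BaseModel, ConfigDict

class Example(BaseModel):
    # pydantic 1: `class Config: extra = Extra.forbid`
    model_config = ConfigDict(extra="forbid")
    name: str

obj = Example.model_validate({"name": "x"})  # pydantic 1: Example.parse_obj(...)
fields = Example.model_fields                # pydantic 1: Example.__fields__
```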
diff --git a/README.md b/README.md
index 31f25d6..29e845e 100644
--- a/README.md
+++ b/README.md
@@ -20,7 +20,10 @@ from data_dag.dag_factory import DagFactory
 from urllib.request import urlretrieve
 from typing import List
 
-from airflow.operators.dummy import DummyOperator
+try:
+    from airflow.operators.empty import EmptyOperator
+except ImportError:
+    from airflow.operators.dummy import DummyOperator as EmptyOperator
 from airflow.providers.http.sensors.http import HttpSensor
 from airflow.operators.python import PythonOperator
 from airflow.utils.task_group import TaskGroup
@@ -54,8 +57,8 @@ class DownloaderDag(DagFactory):
     downloads: List[DownloadOperator]
 
     def _make_dag(self):
-        start = DummyOperator(task_id='start')
-        end = DummyOperator(task_id='end')
+        start = EmptyOperator(task_id='start')
+        end = EmptyOperator(task_id='end')
 
         for download in self.downloads:
             start >> download.make_operator() >> end
@@ -68,7 +71,7 @@ Then a definition for a particular DAG can live in a data file:
 
 dag_id: sample_dag
 description: An example of how to write a data-driven DAG
-schedule_interval: '@daily'
+schedule: '@daily'
 start_date: '2020-01-01T00:00:00'
 downloads:
   - name: data
@@ -90,7 +93,7 @@ from my_factories.download import DownloaderDag
 with open('yaml/sample_dag.yaml', 'r') as f:
     dag_data = safe_load(f)
 
-dag = DownloaderDag.parse_obj(dag_data).make_dag()
+dag = DownloaderDag.model_validate(dag_data).make_dag()
 ```
 
 ![img.png](docs/_images/img.png)
@@ -115,7 +118,7 @@ for yaml_file_path in dag_dir.glob('typical_dags/**.yml'):
         dag_metadata = yaml.safe_load(f)
 
     # ... generate a DAG from that metadata
-    dag_metadata_obj = BaseDag.parse_obj(dag_metadata)
+    dag_metadata_obj = BaseDag.model_validate(dag_metadata)
     dag = dag_metadata_obj.make_dag()
 
     # See https://www.astronomer.io/guides/dynamically-generating-dags/
diff --git a/data_dag/__init__.py b/data_dag/__init__.py
index ce711c9..fc55655 100644
--- a/data_dag/__init__.py
+++ b/data_dag/__init__.py
@@ -1,3 +1,3 @@
 """Tooling to help build data-driven DAGs"""
 
-__version__ = "0.3.0"
+__version__ = "1.0.0"
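Since `DagFactory` below keeps `extra="forbid"` while renaming `schedule_interval` to `schedule`, v1-era DAG files now fail validation rather than being silently misread. A sketch of that behavior (`MyDag` is illustrative, not part of this repo):

```python
from datetime import datetime
from pydantic import ValidationError
from data_dag.dag_factory import DagFactory

class MyDag(DagFactory):
    def _make_dag(self):
        pass

MyDag.model_validate({"dag_id": "ok", "schedule": "@daily", "start_date": datetime(2020, 1, 1)})

try:
    MyDag.model_validate({"dag_id": "stale", "schedule_interval": "@daily"})
except ValidationError as err:
    print(err)  # `schedule_interval` is no longer a field, and extra keys are forbidden
```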
diff --git a/data_dag/dag_factory.py b/data_dag/dag_factory.py
index 072105a..278e03d 100644
--- a/data_dag/dag_factory.py
+++ b/data_dag/dag_factory.py
@@ -3,7 +3,7 @@
 from typing import Dict, Iterable, List, Optional, Union
 
 from airflow.models.dag import DAG
-from pydantic import BaseModel, Extra
+from pydantic import BaseModel, ConfigDict
 
 
 class DagFactory(BaseModel, abc.ABC):
@@ -16,9 +16,9 @@ class DagFactory(BaseModel, abc.ABC):
 
         class MyKindOfDag(DagFactory):
             def _make_dag(self):
-                start = DummyOperator(...)
+                start = EmptyOperator(...)
                 do_something = PythonOperator(...)
-                end = DummyOperator(...)
+                end = EmptyOperator(...)
 
                 start >> do_something >> end
 
@@ -38,7 +38,7 @@ def _make_dag(self):
 
     dag_id: str
     description: Optional[str] = None
-    schedule_interval: Union[timedelta, str, None] = None
+    schedule: Union[timedelta, str, None] = None
     start_date: Optional[datetime] = None
     end_date: Optional[datetime] = None
     full_filepath: Optional[str] = None
@@ -60,9 +60,7 @@ def _make_dag(self):
     jinja_environment_kwargs: Optional[Dict] = None
     render_template_as_native_obj: bool = False
     tags: Optional[List[str]] = None
-
-    class Config:
-        extra = Extra.forbid
+    model_config = ConfigDict(extra="forbid")
 
     @property
     def default_dag_kwargs(self) -> Dict:
@@ -79,7 +77,7 @@ def make_dag_object(self, **overrides) -> DAG:
         kwargs.update(
             {
                 field: getattr(self, field, None)
-                for field in DagFactory.__fields__
+                for field in DagFactory.model_fields
                 if getattr(self, field, None) is not None
             }
         )
diff --git a/data_dag/operator_factory/__init__.py b/data_dag/operator_factory/__init__.py
index 2e6dfd2..28c7139 100644
--- a/data_dag/operator_factory/__init__.py
+++ b/data_dag/operator_factory/__init__.py
@@ -3,6 +3,7 @@
     OperatorFactory,
 )
 from .dynamic import (
+    DynamicOperatorComponent,
     DynamicOperatorFactory,
 )
 from .simple import (
@@ -16,4 +17,5 @@
     "SimpleOperatorFactory",
     "SimpleOperatorComponent",
     "DynamicOperatorFactory",
+    "DynamicOperatorComponent",
 )
diff --git a/data_dag/operator_factory/base.py b/data_dag/operator_factory/base.py
index fc106c7..8fa2c72 100644
--- a/data_dag/operator_factory/base.py
+++ b/data_dag/operator_factory/base.py
@@ -4,12 +4,11 @@
 
 from airflow.models.taskmixin import TaskMixin
 from airflow.utils.task_group import TaskGroup
-from pydantic import BaseModel, Extra
+from pydantic import BaseModel, ConfigDict
 
 
 class BaseOperatorFactory(BaseModel, abc.ABC):
-    class Config:
-        extra = Extra.forbid
+    model_config = ConfigDict(extra="forbid")
 
     def make_operator(
         self, *args, **kwargs
@@ -25,7 +24,7 @@ def make_operator(
 class OperatorFactory(BaseOperatorFactory, abc.ABC):
     """An interface for writing operator factories."""
 
-    task_id: Optional[str]
+    task_id: Optional[str] = None
 
     @property
     def default_task_id(self) -> str:
@@ -68,5 +67,4 @@ def _make_operators(self, *args, **kwargs) -> None:
 class OperatorComponent(BaseModel, abc.ABC):
     """A non-operator component for use in other operator factories.
 
     Just a proxy for :py:class:`pydantic.BaseModel`."""
 
-    class Config:
-        extra = Extra.forbid
+    model_config = ConfigDict(extra="forbid")
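The `task_id: Optional[str] = None` change above is required, not cosmetic: pydantic 2 no longer treats `Optional` as implying a default. A minimal standalone illustration:

```python
from typing import Optional
from pydantic import BaseModel, ValidationError

class WithDefault(BaseModel):
    task_id: Optional[str] = None  # optional, as base.py now declares it

class WithoutDefault(BaseModel):
    task_id: Optional[str]  # required in pydantic 2, despite being Optional

WithDefault()  # fine
try:
    WithoutDefault()
except ValidationError:
    print("task_id must be supplied explicitly")
```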
diff --git a/data_dag/operator_factory/dynamic.py b/data_dag/operator_factory/dynamic.py
index f5e0841..d052993 100644
--- a/data_dag/operator_factory/dynamic.py
+++ b/data_dag/operator_factory/dynamic.py
@@ -1,76 +1,47 @@
 import abc
 import inspect
-import warnings
+from typing import Any, Union
 
-from pydantic.main import ModelMetaclass
+from pydantic import BaseModel, Field
+from typing_extensions import Self
 
 from .base import OperatorComponent, OperatorFactory
 
 
-class _DynamicModelMetaclass(ModelMetaclass):
-    def __new__(mcs, *args, **kwargs):
-        cls = super().__new__(mcs, *args, **kwargs)
-        cls.__known_subclasses__ = dict()
-        return cls
-
-    def __call__(cls, *args, **kwargs):
-        known_subtype = cls.__type_name__
-        specified_subtype = kwargs.pop(
-            cls.__type_kwarg_name__, cls.__default_type_name__
-        )
-
-        if known_subtype is None and specified_subtype is None:
-            raise TypeError(
-                f"Failed to find type kwarg `{cls.__type_kwarg_name__}` while instantiating {cls}"
-            )
-        elif known_subtype is not None and specified_subtype is not None:
-            raise TypeError(
-                f"Cannot specify explicit `{cls.__type_kwarg_name__}` to specific type {cls}"
-            )
-
-        # At this point, we know that exactly one of known_subtype and specified_subtype is given
-        assert bool(known_subtype) ^ bool(specified_subtype), (
-            known_subtype,
-            specified_subtype,
-        )
-
-        if known_subtype:
-            specified_cls = cls
-        elif specified_subtype:
-            try:
-                specified_cls = cls.__known_subclasses__[specified_subtype]
-            except KeyError as ex:
-                raise TypeError(
-                    f"Subtype `{specified_subtype}` not found for {cls}. Options are {list(cls.__known_subclasses__)}"
-                ) from ex
-        else:  # pragma: no cover
-            assert False, ("How did we get here?", known_subtype, specified_subtype)
-
-        return super(_DynamicModelMetaclass, specified_cls).__call__(*args, **kwargs)
-
-
 # With much help from
 # https://stackoverflow.com/questions/23374715/changing-the-bases-of-an-object-based-on-arguments-to-init
 class _DynamicOperatorBase:
-    __type_name__ = None
-    __default_type_name__ = None
     __type_kwarg_name__ = "type"
+    __known_subclasses__ = None
 
-    def __init_subclass__(cls, **kwargs):
+    @classmethod
+    def __pydantic_init_subclass__(cls, **kwargs):
         if not inspect.isabstract(cls) and abc.ABC not in cls.__bases__:
-            subtype_name = cls.__type_name__
-
-            if not subtype_name:
-                warnings.warn(
-                    f"Type {cls} does not specify a subtype name (as __type_name__) and cannot be dynamically instantiated; if this is intentional, make the clas abstract, like `class {cls.__name__}(..., abc.ABC):`"
+            if cls.__type_kwarg_name__ not in cls.model_fields:
+                raise TypeError(
+                    f"Dynamic subclass {cls} defined with no discriminator field, expected a literal field named `{cls.__type_kwarg_name__}`"
                 )
 
-            cls.__known_subclasses__[subtype_name] = cls
+            cls.__known_subclasses__.append(cls)
+
+    @classmethod
+    def discriminated_annotation(cls) -> Any:
+        return Union[tuple(cls.__known_subclasses__)]
+
+    @classmethod
+    def discriminated_field(cls, **kwargs) -> Field:
+        return Field(discriminator=cls.__type_kwarg_name__, **kwargs)
 
+    @classmethod
+    def model_validate(cls, data: dict) -> Self:
+        class PolymorphicWrapper(BaseModel):
+            obj: cls.discriminated_annotation() = cls.discriminated_field()
+
+        obj = PolymorphicWrapper.model_validate(dict(obj=data))
+        return obj.obj
+
+
-class DynamicOperatorFactory(
-    OperatorFactory, _DynamicOperatorBase, abc.ABC, metaclass=_DynamicModelMetaclass
-):
+class DynamicOperatorFactory(_DynamicOperatorBase, OperatorFactory, abc.ABC):
     """An OperatorFactory that can automatically instantiate sub-classes based on the input data.
 
     Consider the following example::
@@ -79,47 +50,46 @@
         class InputFile(DynamicOperatorFactory, abc.ABC):
             pass
 
         class LocalFile(InputFile):
-            __type_name__ = 'local'
+            type: Literal['local']
             path: str
 
         class S3File(InputFile):
-            __type_name__ = 's3'
+            type: Literal['s3']
             bucket: str
             key: str
 
-        InputFile.parse_obj({'type': 's3', 'bucket': 'my-bucket', 'key': 'my-key'})
+        InputFile.model_validate({'type': 's3', 'bucket': 'my-bucket', 'key': 'my-key'})
         # S3File(bucket='my-bucket', key='my-key')
 
     Note how the type of object that gets instantiated is dynamically chosen from the data, rather than specified by the code. This allows a supertype to be used in code, and for the subtype to be chosen at runtime based on data.
 
-    To use a dynamic factory, define your base supertype to inherit directly from :py:class:`DynamicOperatorFactory` and :py:class:`abc.ABC`. The class can be totally empty, as in the example above. This top-level class will be populated with a dictionary that will automatically track subclasses as they get define.
+    To use a dynamic factory, define your base supertype to inherit directly from :py:class:`DynamicOperatorFactory` and :py:class:`abc.ABC`. The class can be totally empty, as in the example above. This top-level class will be populated with a list that automatically tracks subclasses as they get defined.
 
     .. warning::
 
         It's important to remember that, while subtypes are automatically tracked upon definition, they must still be imported somewhere. Make sure that when the supertype is imported, the subtypes also eventually get imported, or else they will be unavailable at DAG resolution time.
 
-    Subclasses must either define ``__type_name__ = "some_name"`` or else inherit from :py:class:`abc.ABC` to indicate that they are abstract. Classes that are not abstract and not named will generate a warning.
-
-    A default subtype can be specified using ``__default_type_name__`` in the top-level type. Note that this is the ``__type_name__`` of the default subclass, not the class name itself.
+    Subclasses must define a ``Literal``-annotated field whose name matches the value of ``__type_kwarg_name__`` (``type`` by default) in the top-level type.
 
-    By default, the subclass is chosen by the ``"type"`` key in the input data. This can be changed by setting ``__type_kwarg_name__`` in the top-level type to some other string. This key will be stripped from the input data and all other keys will be passed along to the subtype's constructor without further modification.
+    By default, the subclass is chosen by the ``"type"`` key in the input data. This can be changed by setting ``__type_kwarg_name__`` in the top-level type to some other string.
 
-    Attempting to construct a top-level object, either directly (with its constructor) or using ``parse_obj``, without specifying a "type" (or whatever you renamed the key to be) will result in a :py:exc:`TypeError`.
+    This functionality is based on Pydantic's "discriminated union" feature. See that for more details.
+    """
 
-    .. note::
+    def __init_subclass__(cls, **kwargs):
+        if cls.__known_subclasses__ is None and cls is not DynamicOperatorFactory:
+            cls.__known_subclasses__ = []
 
-        Pydantic already supports Union types, so why would we use a custom DynamicOperatorFactory instead?
+        super().__init_subclass__(**kwargs)
 
-        Dynamic factories provide two key advantages:
-        - The subtype selected is explicit rather than implicit. The subtypes don't need to be distinguishable in any other way besides their ``__type_name__``, nor is there any kind of ordering of the subtypes.
-        - The list of options is automatically maintained, as long as the modules containing the subtypes are sure to be imported. That is, another component or factory can use the top-level type to annotate one of its fields, and the subtypes will automatically be implied.
-    """
 
+class DynamicOperatorComponent(_DynamicOperatorBase, OperatorComponent, abc.ABC):
+    """Identical to :py:class:`DynamicOperatorFactory` but based on :py:class:`OperatorComponent` instead."""
 
-class DynamicOperatorComponent(
-    OperatorComponent, _DynamicOperatorBase, abc.ABC, metaclass=_DynamicModelMetaclass
-):
-    """Identical to :py:class:`DynamicOperatorFactory` but based on :py:class:`OperatorComponent` instead."""
+    def __init_subclass__(cls, **kwargs):
+        if cls.__known_subclasses__ is None and cls is not DynamicOperatorComponent:
+            cls.__known_subclasses__ = []
+
+        super().__init_subclass__(**kwargs)
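The rewritten `dynamic.py` above is a thin layer over pydantic 2's discriminated unions; hand-written, the equivalent wiring looks roughly like this (classes are illustrative, mirroring the docstring example):

```python
from typing import Literal, Union
from typing_extensions import Annotated
from pydantic import BaseModel, Field, TypeAdapter

class LocalFile(BaseModel):
    type: Literal["local"] = "local"
    path: str

class S3File(BaseModel):
    type: Literal["s3"] = "s3"
    bucket: str
    key: str

# What discriminated_annotation()/discriminated_field() assemble from the registry:
InputFile = TypeAdapter(Annotated[Union[LocalFile, S3File], Field(discriminator="type")])
print(InputFile.validate_python({"type": "s3", "bucket": "my-bucket", "key": "my-key"}))
# -> an S3File instance, selected by the `type` key
```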
diff --git a/data_dag/operator_factory/simple.py b/data_dag/operator_factory/simple.py
index fa3e8c5..f90ad11 100644
--- a/data_dag/operator_factory/simple.py
+++ b/data_dag/operator_factory/simple.py
@@ -2,20 +2,23 @@
 import inspect
 from typing import Any, Type
 
-from pydantic import BaseModel
+from pydantic import BaseModel, model_validator
+from pydantic.fields import FieldInfo
 from typing_extensions import get_origin
 
 from .base import OperatorComponent, OperatorFactory
 
 
 def _dict_from_primitive(cls: "Type[_SimpleModelMixin]", obj):
-    assert cls.__simple_field__ is not None
+    assert cls.__simple_field__ is not None and isinstance(
+        cls.__simple_field__, FieldInfo
+    )
+
     if not isinstance(obj, dict):
-        return {cls.__simple_field__.name: obj}
+        return {cls.__simple_field__name__: obj}
     else:
         # Check if we're expecting a dictionary...
-        tp = cls.__simple_field__.outer_type_
-        tp = get_origin(tp) or tp
+        tp = get_origin(cls.__simple_field__.annotation)
         if isinstance(tp, type) and issubclass(tp, dict):
             raise NotImplementedError(
                 "Not yet sure how to handle sanitizing a dictionary when the class is just a proxy for a dictionary field"
@@ -27,27 +30,34 @@ def _dict_from_primitive(cls: "Type[_SimpleModelMixin]", obj):
 class _SimpleModelMixin:
     """A mixin to support single-field pydantic models being parsed directly from primitives rather than requiring dictionaries"""
 
-    def __init_subclass__(cls, **kwargs):
+    @classmethod
+    def __pydantic_init_subclass__(cls, **_kwargs):
         assert issubclass(cls, BaseModel)
 
         if not inspect.isabstract(cls) and abc.ABC not in cls.__bases__:
             required_fields = [
-                field for field in cls.__fields__.values() if field.required
+                (field_name, field)
+                for field_name, field in cls.model_fields.items()
+                if field.is_required()
             ]
             if len(required_fields) != 1:
                 raise TypeError(
-                    f"A non-abstract inheritor of {cls} must have exactly one non-default field (Found {[f.name for f in required_fields]})"
+                    f"The non-abstract model {cls} must have exactly one non-default field (Found {required_fields} out of {cls.model_fields})"
                 )
-            field = required_fields[0]
+            field_name, field = required_fields[0]
 
+            cls.__simple_field__name__ = field_name
             cls.__simple_field__ = field
-            cls.__pre_root_validators__ = [_dict_from_primitive]
+            # cls.__pre_root_validators__ = [_dict_from_primitive]
 
+    @model_validator(mode="before")
     @classmethod
-    def _enforce_dict_if_root(cls, obj: Any) -> Any:
-        obj = super()._enforce_dict_if_root(obj)
-        obj = _dict_from_primitive(cls, obj)
-        return obj
+    def _convert_to_dict_if_scalar(cls, data: Any) -> Any:
+        return _dict_from_primitive(cls, data)
+
+    @classmethod
+    def model_validate(cls, data: Any):
+        return super().model_validate(_dict_from_primitive(cls, data))
 
 
 class SimpleOperatorFactory(_SimpleModelMixin, OperatorFactory, abc.ABC):
@@ -63,14 +73,14 @@ class SimpleOperatorFactory(_SimpleModelMixin, OperatorFactory, abc.ABC):
 
         class FilePath(SimpleOperatorFactory):
             path: str  # <-- single required field
             is_file: bool = True  # <-- optional field (because of default)
-            mime_type: Optional[str]  # <-- optional field (because of Optional type)
+            mime_type: str | None = None  # <-- optional field (because of default)
 
             def make_operator(self):
                 ...
     Normally, this object could only be instantiated using a dictionary::
 
-        FilePath.parse_obj({'path': 'path/to/file.txt'})
+        FilePath.model_validate({'path': 'path/to/file.txt'})
 
         # Or, in YAML:
         # outer_object:
@@ -81,7 +91,7 @@ def make_operator(self):
 
     However, because we inherit from :py:class:`SimpleOperatorFactory`, we can instantiate a ``FilePath`` by specifying just the ``path`` literal::
 
-        FilePath.parse_obj('path/to/file.txt')
+        FilePath.model_validate('path/to/file.txt')
 
         # Or, in YAML:
         # outer_object:
diff --git a/docs/conf.py b/docs/conf.py
index 120ecbb..6417589 100644
--- a/docs/conf.py
+++ b/docs/conf.py
@@ -24,7 +24,7 @@
 author = "David Maxson (david.maxson@rearc.io)"
 
 # The full version, including alpha/beta/rc tags
-release = "0.3.0"
+release = "1.0.0"
 
 
 # -- General configuration ---------------------------------------------------
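To see the scalar-promotion path in `simple.py` end to end, here is a small sketch mirroring the `FilePath` docstring example (with `make_operator` stubbed out), assuming data_dag 1.0 as released by this patch:

```python
from data_dag.operator_factory import SimpleOperatorFactory

class FilePath(SimpleOperatorFactory):
    path: str             # the single required field that bare scalars map onto
    is_file: bool = True

    def make_operator(self):
        raise NotImplementedError

# A bare string and the equivalent dict validate to the same model
assert FilePath.model_validate("a/b.txt") == FilePath.model_validate({"path": "a/b.txt"})
```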
diff --git a/docs/getting_started.md b/docs/getting_started.md
index e4b0344..892e1b8 100644
--- a/docs/getting_started.md
+++ b/docs/getting_started.md
@@ -18,7 +18,7 @@ All three are interfaces over a `pydantic` model, and thus provide much of the s
 
 Common workflows that all abstractions share are:
 
-- Can use `parse_obj` to instantiate the class from a dictionary of data (e.g., from a YAML or JSON file)
+- Can use `model_validate` to instantiate the class from a dictionary of data (e.g., from a YAML or JSON file)
 - Can also be directly constructed in-code, just like any other `pydantic`-based class
 - Can have custom field validation that goes well beyond mere type verification or coercion (e.g. checking that a string matches a regex, or that a number is within a given range, or converting a string to an enum)
 - `*args, **kwargs` can often be passed to the primary methods of the various factories; however, it is recommended to make each class's attributes self-contained such that the factory can be executed without any additional arguments.
@@ -214,7 +214,7 @@ file_path:
 The same is true of {py:class}`.SimpleOperatorFactory` relative to {py:class}`.OperatorFactory`. In fact, since the above sample code uses both a {py:class}`.SimpleOperatorComponent` and a {py:class}`.SimpleOperatorFactory`, the entire operator can be defined using a string rather than a full dictionary:
 
 ```python
-CheckFileExists.parse_obj('path/to/file.txt')
+CheckFileExists.model_validate('path/to/file.txt')
 # CheckFileExists(file_path=Path(path='path/to/file.txt', is_file=True))
 ```
 
@@ -257,7 +257,7 @@ Several common data languages, such as YAML and JSON, serialize collections of s
 When these files are loaded into memory, e.g. using `json` or `pyyaml`, the result is an in-memory object comprised of simple data types. Beyond the most trivial use cases, these objects are typically nested and represent a variety of application-specific details. Working directly with these raw dictionaries and lists is likely to involve a lot of boilerplate code; however, if we can map this data into classes, creating useful object instances, we can tie data directly into the functionality related to it.
 
-To do this mapping from in-memory data (no matter where it was loaded from or how it was constructed), `pydantic` allows us to use `MyType.parse_obj(data)` where `MyType` is the factory or component that is represented by the data.
+To do this mapping from in-memory data (no matter where it was loaded from or how it was constructed), `pydantic` allows us to use `MyType.model_validate(data)` where `MyType` is the factory or component that is represented by the data.
 
 ```{note}
 This approach requires that your full top-level data object maps directly onto a Python type. In general, this either isn't hard or is a good idea to add; however, you can also parse out pieces of the data, or restructure it, to match the data types you've defined, especially if you're working with a legacy or shared data schema that isn't easy to modify. Another option is to add [a custom `__init__`](custom_constructor) to a top-level type that knows how to interpret a slightly different input data schema.
@@ -289,7 +289,7 @@ class EmailAllUsers(DagFactory):
 You could load an email list from the following YAML:
 ```yaml
 dag_id: email_users
-schedule_interval: '@weekly'
+schedule: '@weekly'
 
 emails:
   - email_address: sample@example.com
@@ -305,7 +305,7 @@ This would produce the following in-memory object:
 ```python
 data = {
     'dag_id': 'email_users',
-    'schedule_interval': '@weekly',
+    'schedule': '@weekly',
     'email_upon_completion': {
         'email_address': 'admin@my.site.com',
         'message_html': 'All messages sent'
@@ -319,7 +319,7 @@ data = {
 Which we can then parse as:
 
 ```python
-dag_metadata = EmailAllUsers.parse_obj(data)
+dag_metadata = EmailAllUsers.model_validate(data)
 # EmailAllUsers(
 #     dag_id='email_users',
 #     ...
diff --git a/docs/overview.md b/docs/overview.md
index c7e8afd..3828279 100644
--- a/docs/overview.md
+++ b/docs/overview.md
@@ -30,7 +30,7 @@ from data_dag.dag_factory import DagFactory
 from urllib.request import urlretrieve
 from typing import List
 
-from airflow.operators.dummy import DummyOperator
+from airflow.operators.empty import EmptyOperator
 from airflow.providers.http.sensors.http import HttpSensor
 from airflow.operators.python import PythonOperator
 from airflow.utils.task_group import TaskGroup
@@ -64,8 +64,8 @@ class DownloaderDag(DagFactory):
     downloads: List[DownloadOperator]
 
     def _make_dag(self):
-        start = DummyOperator(task_id='start')
-        end = DummyOperator(task_id='end')
+        start = EmptyOperator(task_id='start')
+        end = EmptyOperator(task_id='end')
 
         for download in self.downloads:
             start >> download.make_operator() >> end
@@ -78,7 +78,7 @@ Then a definition for a particular DAG can live in a data file:
 
 dag_id: sample_dag
 description: An example of how to write a data-driven DAG
-schedule_interval: '@daily'
+schedule: '@daily'
 start_date: '2020-01-01T00:00:00'
 downloads:
   - name: data
@@ -100,7 +100,7 @@ from my_factories.download import DownloaderDag
 with open('yaml/sample_dag.yaml', 'r') as f:
     dag_data = safe_load(f)
 
-dag = DownloaderDag.parse_obj(dag_data).make_dag()
+dag = DownloaderDag.model_validate(dag_data).make_dag()
 ```
 
 ![img.png](_images/img.png)
@@ -126,7 +126,7 @@ for yaml_file_path in dag_dir.glob('typical_dags/**.yml'):
         dag_metadata = yaml.safe_load(f)
 
     # ... generate a DAG from that metadata
-    dag_metadata_obj = BaseDag.parse_obj(dag_metadata)
+    dag_metadata_obj = BaseDag.model_validate(dag_metadata)
     dag = dag_metadata_obj.make_dag()
 
     # See https://www.astronomer.io/guides/dynamically-generating-dags/
diff --git a/docs/reference/operator_factory.rst b/docs/reference/operator_factory.rst
index d2d3b01..e729dd4 100644
--- a/docs/reference/operator_factory.rst
+++ b/docs/reference/operator_factory.rst
@@ -13,3 +13,5 @@ data\_dag.operator\_factory
 .. autoclass:: SimpleOperatorComponent
 
 .. autoclass:: DynamicOperatorFactory
+
+.. autoclass:: DynamicOperatorComponent
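The docs changes above all reduce to the same loading pattern; as a sketch (the module path and YAML file are hypothetical, and `EmailAllUsers` is the walk-through's example class):

```python
import yaml
from my_factories.email import EmailAllUsers  # hypothetical import path

with open("yaml/email_users.yaml") as f:
    dag_metadata = EmailAllUsers.model_validate(yaml.safe_load(f))

dag = dag_metadata.make_dag()  # nested dicts were validated into nested factories
```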
diff --git a/docs/requirements.txt b/docs/requirements.txt
index ae22691..14f1d62 100644
--- a/docs/requirements.txt
+++ b/docs/requirements.txt
@@ -1,4 +1,4 @@
-pydantic
+pydantic==2.*
 sphinx
 myst-parser
 sphinxcontrib-mermaid
diff --git a/pyproject.toml b/pyproject.toml
index 5325550..418bec6 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -13,11 +13,11 @@ classifiers = [
     "License :: OSI Approved :: MIT License",
     "Operating System :: OS Independent",
 ]
-requires-python = ">=3.7"
+requires-python = ">=3.8"
 dependencies = [
-    "apache-airflow",
+    "apache-airflow>=2.6.3",
     "typing_extensions>=4",
-    "pydantic>=1",
+    "pydantic==2.*",
 ]
 
 [project.readme]
@@ -25,8 +25,8 @@ file = "README.md"
 content-type = "text/markdown"
 
 [project.urls]
-Homepage = "https://github.com/rearc-data/data_dag"
-"Bug Tracker" = "https://github.com/rearc-data/data_dag/issues"
+Homepage = "https://github.com/rearc-data/data-dag"
+"Bug Tracker" = "https://github.com/rearc-data/data-dag/issues"
 
 [tool.setuptools]
 include-package-data = false
diff --git a/requirements.txt b/requirements.txt
index 7c1c6bb..25d1d77 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,5 +1,5 @@
 # Core
-pydantic
+pydantic==2.*
 apache-airflow
 
 # Testing
diff --git a/tests/test_dag_factory.py b/tests/test_dag_factory.py
index b2e230a..97029a0 100644
--- a/tests/test_dag_factory.py
+++ b/tests/test_dag_factory.py
@@ -3,46 +3,50 @@
 
 import pytest
 from airflow import DAG
-from airflow.operators.dummy import DummyOperator
+
+try:
+    from airflow.operators.empty import EmptyOperator
+except ImportError:
+    from airflow.operators.dummy import DummyOperator as EmptyOperator
 
 from data_dag.dag_factory import DagFactory
 from data_dag.operator_factory import SimpleOperatorFactory
 
 
 def test_simple():
-    class DummyOp(SimpleOperatorFactory):
+    class EmptyOp(SimpleOperatorFactory):
         name: str
 
         def make_operator(self):
-            return DummyOperator(task_id=self.name)
+            return EmptyOperator(task_id=self.name)
 
     class SampleDag(DagFactory):
-        dummy_names: List[DummyOp]
+        empty_names: List[EmptyOp]
 
         def _make_dag(self):
             prev_op = None
-            for dummy in self.dummy_names:
-                next_op = dummy.make_operator()
+            for empty in self.empty_names:
+                next_op = empty.make_operator()
                 if prev_op is not None:
                     prev_op >> next_op
                 prev_op = next_op
 
     data = {
         "dag_id": "my_dag",
-        "dummy_names": ["start", "end"],
+        "empty_names": ["start", "end"],
         "start_date": datetime.now(),
     }
-    dag_factory = SampleDag.parse_obj(data)
+    dag_factory = SampleDag.model_validate(data)
     dag = dag_factory.make_dag()
     assert isinstance(dag, DAG)
     assert dag.dag_id == "my_dag"
 
     op1, op2 = dag.topological_sort()
-    assert isinstance(op1, DummyOperator)
+    assert isinstance(op1, EmptyOperator)
     assert op1.task_id == "start"
-    assert isinstance(op2, DummyOperator)
+    assert isinstance(op2, EmptyOperator)
     assert op2.task_id == "end"
 
 
 def test_failure_direct_instantiation():
     data = {"dag_id": "my_dag", "start_date": datetime.now()}
-    pytest.raises(NotImplementedError, DagFactory.parse_obj(data).make_dag)
+    pytest.raises(NotImplementedError, DagFactory.model_validate(data).make_dag)
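The dynamic-factory tests that follow all build on one pattern: subclasses register themselves, and a parent model consumes the registry through the two new helpers. In isolation it looks roughly like this (names mirror the test fixtures):

```python
import abc
from typing import Literal

from data_dag.operator_factory import OperatorFactory
from data_dag.operator_factory.dynamic import DynamicOperatorFactory

class ABase(DynamicOperatorFactory, abc.ABC):
    pass

class A1(ABase):
    type: Literal["type1"] = "type1"
    x: str

class A2(ABase):
    type: Literal["type2"] = "type2"
    y: str

class Root(OperatorFactory):
    # A discriminated union over every registered ABase subclass
    a: ABase.discriminated_annotation() = ABase.discriminated_field()

print(Root.model_validate({"a": {"type": "type1", "x": "hi"}}).a)  # -> A1 instance
```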
diff --git a/tests/test_dynamic_operator_factory.py b/tests/test_dynamic_operator_factory.py
index e1c767e..1da4c9d 100644
--- a/tests/test_dynamic_operator_factory.py
+++ b/tests/test_dynamic_operator_factory.py
@@ -1,10 +1,14 @@
 import abc
-from typing import List
+from typing import List, Literal
 
 import pytest
+from pydantic import ValidationError
 
 from data_dag.operator_factory import OperatorFactory
-from data_dag.operator_factory.dynamic import DynamicOperatorFactory
+from data_dag.operator_factory.dynamic import (
+    DynamicOperatorComponent,
+    DynamicOperatorFactory,
+)
 
 
 def test_direct_dynamic_class():
@@ -12,28 +16,24 @@ class ABase(DynamicOperatorFactory, abc.ABC):
         pass
 
     class A1(ABase):
-        __type_name__ = "type1"
+        type: Literal["type1"] = "type1"
         x: str
 
     class A2(ABase):
-        __type_name__ = "type2"
+        type: Literal["type2"] = "type2"
         x: str
 
     class A3(ABase):
-        __type_name__ = "type3"
+        type: Literal["type3"] = "type3"
         x: str
 
     class Root(OperatorFactory):
-        a: ABase
+        a: ABase.discriminated_annotation() = ABase.discriminated_field()
 
-    assert ABase.__known_subclasses__ == {
-        "type1": A1,
-        "type2": A2,
-        "type3": A3,
-    }
+    assert ABase.__known_subclasses__ == [A1, A2, A3]
 
     obj = {"a": {"type": "type2", "x": "yolo"}}
-    obj = Root.parse_obj(obj)
+    obj = Root.model_validate(obj)
     assert isinstance(obj, Root)
     obj1 = obj.a
     assert isinstance(obj1, A2)
@@ -45,25 +45,21 @@ class ABase(DynamicOperatorFactory, abc.ABC):
         pass
 
     class A1(ABase):
-        __type_name__ = "type1"
+        type: Literal["type1"] = "type1"
         x: str
 
     class A2(ABase):
-        __type_name__ = "type2"
+        type: Literal["type2"] = "type2"
         x: str
 
     class A3(ABase):
-        __type_name__ = "type3"
+        type: Literal["type3"] = "type3"
         x: str
 
     class Root(OperatorFactory):
-        a: List[ABase]
+        a: List[ABase.discriminated_annotation()] = ABase.discriminated_field()
 
-    assert ABase.__known_subclasses__ == {
-        "type1": A1,
-        "type2": A2,
-        "type3": A3,
-    }
+    assert set(ABase.__known_subclasses__) == {A1, A2, A3}
 
     obj = {
         "a": [
@@ -71,7 +67,7 @@ class Root(OperatorFactory):
             {"type": "type1", "x": "wassup"},
         ]
     }
-    obj = Root.parse_obj(obj)
+    obj = Root.model_validate(obj)
     assert isinstance(obj, Root)
     assert len(obj.a) == 2
     obj1, obj2 = obj.a
@@ -80,34 +76,7 @@ class Root(OperatorFactory):
     assert isinstance(obj2, A1)
     assert obj2.x == "wassup"
 
-    assert ABase.parse_obj({"type": "type1", "x": "yolo"}) == A1.parse_obj(
-        {"x": "yolo"}
-    )
-
-
-def test_default_dynamic_class():
-    class ABase(DynamicOperatorFactory, abc.ABC):
-        __default_type_name__ = "type2"
-
-    class A1(ABase):
-        __type_name__ = "type1"
-        x: str
-
-    class A2(ABase):
-        __type_name__ = "type2"
-        x: str
-
-    a1 = ABase.parse_obj({"type": "type1", "x": "wassup"})
-    assert isinstance(a1, A1)
-    assert a1.x == "wassup"
-
-    a2 = ABase.parse_obj({"type": "type2", "x": "lol"})
-    assert isinstance(a2, A2)
-    assert a2.x == "lol"
-
-    ad = ABase.parse_obj({"x": "default thing"})
-    assert isinstance(ad, A2)
-    assert ad.x == "default thing"
+    assert ABase.model_validate(dict(type="type1", x="yolo")) == A1(x="yolo")
 
 
 def test_failure_erroneous_dynamic_class():
@@ -115,18 +84,20 @@ class ABase(DynamicOperatorFactory, abc.ABC):
         pass
 
     class A1(ABase):
-        __type_name__ = "type1"
+        type: Literal["type1"] = "type1"
         x: str
 
     class A2(ABase):
-        __type_name__ = "type2"
+        type: Literal["type2"] = "type2"
         x: str
 
-    pytest.raises(TypeError, ABase.parse_obj, {"type": "typeNone", "x": "wassup"})
+    pytest.raises(
+        ValidationError, ABase.model_validate, {"type": "typeNone", "x": "wassup"}
+    )
 
 
 def test_failure_no_type_name():
-    with pytest.warns(UserWarning):
+    with pytest.raises(TypeError):
 
         class ABase(DynamicOperatorFactory, abc.ABC):
             pass
@@ -140,11 +111,11 @@ class ABase(DynamicOperatorFactory, abc.ABC):
         pass
 
     class A1(ABase):
-        __type_name__ = "type1"
+        type: Literal["type1"] = "type1"
         x: str
 
-    with pytest.raises(TypeError):
-        ABase.parse_obj({"x": "junk"})
+    with pytest.raises(ValidationError):
+        ABase.model_validate({"x": "junk"})
 
 
 def test_failure_both_specified_and_explicit_type():
@@ -152,15 +123,11 @@ class ABase(DynamicOperatorFactory, abc.ABC):
         pass
 
     class A1(ABase):
-        __type_name__ = "type1"
+        type: Literal["type1"] = "type1"
         x: str
 
-    with pytest.raises(TypeError):
-        A1.parse_obj({"type": "type2", "x": "junk"})
-
-    with pytest.raises(TypeError):
-        # This is more up for debate: should this fail when "type" is given, even when it's correct?
-        A1.parse_obj({"type": "type1", "x": "junk"})
+    with pytest.raises(ValidationError):
+        A1.model_validate({"type": "type2", "x": "junk"})
 
 
 def test_customize_type_name():
@@ -168,37 +135,44 @@ class ABase(DynamicOperatorFactory, abc.ABC):
         __type_kwarg_name__ = "different_type"
 
     class A1(ABase):
-        __type_name__ = "type1"
+        different_type: Literal["type1"] = "type1"
         x: str
 
     class A2(ABase):
-        __type_name__ = "type2"
+        different_type: Literal["type2"] = "type2"
         y: str
 
     class BBase(DynamicOperatorFactory, abc.ABC):
         pass
 
     class B1(BBase):
-        __type_name__ = "type1"
+        type: Literal["type1"] = "type1"
         a: str
 
     class B2(BBase):
-        __type_name__ = "type2"
+        type: Literal["type2"] = "type2"
         b: str
 
-    assert ABase.__known_subclasses__ == {
-        "type1": A1,
-        "type2": A2,
-    }
-    assert BBase.__known_subclasses__ == {
-        "type1": B1,
-        "type2": B2,
-    }
+    assert set(ABase.__known_subclasses__) == {A1, A2}
+    assert set(BBase.__known_subclasses__) == {B1, B2}
 
-    a1 = ABase.parse_obj({"different_type": "type1", "x": "lol"})
+    a1 = ABase.model_validate({"different_type": "type1", "x": "lol"})
     assert isinstance(a1, A1)
     assert a1.x == "lol"
 
-    b2 = BBase.parse_obj({"type": "type2", "b": "yup"})
+    b2 = BBase.model_validate({"type": "type2", "b": "yup"})
     assert isinstance(b2, B2)
     assert b2.b == "yup"
+
+
+def test_basic_dynamic_component():
+    class Uri(DynamicOperatorComponent, abc.ABC):
+        pass
+
+    class S3Uri(Uri):
+        type: Literal["s3"] = "s3"
+        bucket: str
+        key: str
+
+    uri = Uri.model_validate(dict(type="s3", bucket="my-bucket", key="my-key"))
+    assert uri == S3Uri(bucket="my-bucket", key="my-key")
diff --git a/tests/test_operator_factory.py b/tests/test_operator_factory.py
index ff70c25..64a9a9b 100644
--- a/tests/test_operator_factory.py
+++ b/tests/test_operator_factory.py
@@ -2,7 +2,12 @@
 from typing import Any, Dict, Optional
 
 import pytest
-from airflow.operators.dummy import DummyOperator
+
+try:
+    from airflow.operators.empty import EmptyOperator
+except ImportError:
+    from airflow.operators.dummy import DummyOperator as EmptyOperator
+from pydantic import ValidationError
 
 from data_dag.operator_factory import (
     OperatorFactory,
@@ -13,7 +18,7 @@
 
 _junk_dag_kwargs = dict(
     dag_id="junk",
-    schedule_interval="@daily",
+    schedule="@daily",
     start_date=datetime.today(),
 )
 
@@ -23,12 +28,12 @@ class SampleOp(OperatorFactory):
         to_add: float
 
         def make_operator(self, *args, **kwargs):
-            return DummyOperator(task_id=f"Add_{self.to_add}")
+            return EmptyOperator(task_id=f"Add_{self.to_add}")
 
     data = {"to_add": 5}
-    op = SampleOp.parse_obj(data)
+    op = SampleOp.model_validate(data)
     airflow_op = op.make_operator()
-    assert isinstance(airflow_op, DummyOperator)
+    assert isinstance(airflow_op, EmptyOperator)
     assert airflow_op.task_id == "Add_5.0"
 
 
@@ -65,7 +70,7 @@ class SampleOp(BaseOperatorFactory):
 
 
 def test_failure_direct_instantiation():
-    pytest.raises(NotImplementedError, OperatorFactory.parse_obj({}).make_operator)
+    pytest.raises(NotImplementedError, OperatorFactory.model_validate({}).make_operator)
 
 
 def test_simple():
@@ -78,17 +83,17 @@ class SampleOp(SimpleOperatorFactory):
 
         def make_operator(self):
             raise NotImplementedError()
 
-    op = SampleOp.parse_obj(5)
+    op = SampleOp.model_validate(5)
     assert isinstance(op, SampleOp)
     assert op.i == 5
     assert op.flag is True
 
-    op = SampleOp.parse_obj({"i": 5})
+    op = SampleOp.model_validate({"i": 5})
     assert isinstance(op, SampleOp)
     assert op.i == 5
     assert op.flag is True
 
-    op = SampleOp.parse_obj({"i": 5, "flag": False})
+    op = SampleOp.model_validate({"i": 5, "flag": False})
     assert isinstance(op, SampleOp)
     assert op.i == 5
     assert op.flag is False
@@ -107,13 +112,13 @@ class SampleOp(SimpleOperatorComponent):
         d: Dict[str, Any]
 
     with pytest.raises(NotImplementedError):
-        SampleOp.parse_obj(dict())
+        SampleOp.model_validate(dict())
 
 
 def test_none_not_the_same_as_undefined():
     class SampleOp(SimpleOperatorComponent):
         a: int
-        b: Optional[int]  # Defaults to None, so there's only one undefined field
+        b: Optional[int] = None
 
 
 def test_task_id_and_task_group():
@@ -125,9 +130,9 @@ def default_task_id(self):
         return f"add_{self.i}"
 
     def _make_operators(self, *args, **kwargs) -> None:
-        load_value = DummyOperator(task_id="load")
-        compute_result = DummyOperator(task_id="compute")
-        finish = DummyOperator(task_id="finish")
+        load_value = EmptyOperator(task_id="load")
+        compute_result = EmptyOperator(task_id="compute")
+        finish = EmptyOperator(task_id="finish")
 
         load_value >> compute_result >> finish
 
@@ -142,3 +147,14 @@ def _make_dag(self):
     assert load.task_id == "add_3.load"
     assert compute.task_id == "add_3.compute"
     assert finish.task_id == "add_3.finish"
+
+
+def test_strict():
+    class SampleOp(OperatorFactory):
+        to_add: float
+
+        def make_operator(self, *args, **kwargs):
+            return EmptyOperator(task_id=f"Add_{self.to_add}")
+
+    with pytest.raises(ValidationError):
+        SampleOp.model_validate(dict(to_subtract=3))