Skip to content

Commit

Permalink
Subclass validator from ItemTransform
Browse files (browse the repository at this point in the history)
  • Loading branch information
steinitzu committed Sep 19, 2023
1 parent b9089b5 commit 9cf59ee
Show file tree
Hide file tree
Showing 5 changed files with 14 additions and 17 deletions.
4 changes: 2 additions & 2 deletions dlt/extract/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from dlt.common.validation import validate_dict_ignoring_xkeys

from dlt.extract.incremental import Incremental
from dlt.extract.typing import TFunHintTemplate, TTableHintTemplate, ColumnValidator
from dlt.extract.typing import TFunHintTemplate, TTableHintTemplate, ValidateItem
from dlt.extract.exceptions import DataItemRequiredForDynamicTableHints, InconsistentTableTemplate, TableNameMissing
from dlt.extract.utils import ensure_table_schema_columns, ensure_table_schema_columns_hint
from dlt.extract.validation import get_column_validator
Expand All @@ -25,7 +25,7 @@ class TTableSchemaTemplate(TypedDict, total=False):
primary_key: TTableHintTemplate[TColumnNames]
merge_key: TTableHintTemplate[TColumnNames]
incremental: Incremental[Any]
validator: ColumnValidator
validator: ValidateItem


class DltResourceSchema:
Expand Down
12 changes: 6 additions & 6 deletions dlt/extract/source.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
from dlt.common.pipeline import PipelineContext, StateInjectableContext, SupportsPipelineRun, resource_state, source_state, pipeline_state
from dlt.common.utils import graph_find_scc_nodes, flatten_list_or_items, get_callable_name, graph_edges_to_nodes, multi_context_manager, uniq_id

from dlt.extract.typing import DataItemWithMeta, ItemTransformFunc, ItemTransformFunctionWithMeta, TDecompositionStrategy, TableNameMeta, FilterItem, MapItem, YieldMapItem, ColumnValidator
from dlt.extract.typing import DataItemWithMeta, ItemTransformFunc, ItemTransformFunctionWithMeta, TDecompositionStrategy, TableNameMeta, FilterItem, MapItem, YieldMapItem, ValidateItem
from dlt.extract.pipe import Pipe, ManagedPipeIterator, TPipeStep
from dlt.extract.schema import DltResourceSchema, TTableSchemaTemplate
from dlt.extract.incremental import Incremental, IncrementalResourceWrapper
Expand Down Expand Up @@ -136,18 +136,18 @@ def incremental(self) -> IncrementalResourceWrapper:
return incremental

@property
def validator(self) -> Optional[ColumnValidator]:
def validator(self) -> Optional[ValidateItem]:
"""Gets validator transform if it is in the pipe"""
validator: ColumnValidator = None
step_no = self._pipe.find(ColumnValidator)
validator: ValidateItem = None
step_no = self._pipe.find(ValidateItem)
if step_no >= 0:
validator = self._pipe.steps[step_no] # type: ignore[assignment]
return validator

@validator.setter
def validator(self, validator: Optional[ColumnValidator]) -> None:
def validator(self, validator: Optional[ValidateItem]) -> None:
"""Add/remove or replace the validator in pipe"""
step_no = self._pipe.find(ColumnValidator)
step_no = self._pipe.find(ValidateItem)
if step_no >= 0:
self._pipe.remove_step(step_no)
if validator:
Expand Down
5 changes: 1 addition & 4 deletions dlt/extract/typing.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,12 +126,9 @@ def __call__(self, item: TDataItems, meta: Any = None) -> Optional[TDataItems]:
yield from self._f(item)


class ColumnValidator(ABC):
class ValidateItem(ItemTransform[TDataItem]):
"""Base class for validators of data items.
Subclass should implement the `__call__` method to either return the data item(s) or raise `extract.exceptions.ValidationError`.
See `PydanticValidator` for possible implementation.
"""
@abstractmethod
def __call__(self, item: TDataItem, meta: Any = None) -> TDataItem:
...
6 changes: 3 additions & 3 deletions dlt/extract/validation.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,13 @@
from dlt.extract.exceptions import ValidationError
from dlt.common.typing import TDataItems
from dlt.common.schema.typing import TAnySchemaColumns
from dlt.extract.typing import TTableHintTemplate, ColumnValidator
from dlt.extract.typing import TTableHintTemplate, ValidateItem


_TPydanticModel = TypeVar("_TPydanticModel", bound=PydanticBaseModel)


class PydanticValidator(ColumnValidator, Generic[_TPydanticModel]):
class PydanticValidator(ValidateItem, Generic[_TPydanticModel]):
model: Type[_TPydanticModel]
def __init__(self, model: Type[_TPydanticModel]) -> None:
self.model = model
Expand All @@ -37,7 +37,7 @@ def __call__(self, item: TDataItems, meta: Any = None) -> Union[_TPydanticModel,
raise ValidationError(e) from e


def get_column_validator(columns: TTableHintTemplate[TAnySchemaColumns]) -> Optional[ColumnValidator]:
def get_column_validator(columns: TTableHintTemplate[TAnySchemaColumns]) -> Optional[ValidateItem]:
if PydanticBaseModel is not None and isinstance(columns, type) and issubclass(columns, PydanticBaseModel):
return PydanticValidator(columns)
return None
4 changes: 2 additions & 2 deletions tests/extract/test_validation.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

import pytest
import dlt
from dlt.extract.typing import ColumnValidator
from dlt.extract.typing import ValidateItem
from dlt.common.typing import TDataItems
from dlt.extract.validation import PydanticValidator
from dlt.extract.exceptions import ValidationError, ResourceExtractionError
Expand Down Expand Up @@ -100,7 +100,7 @@ class AnotherModel(BaseModel):
steps = resource._pipe.steps
assert len(steps) == 2

assert isinstance(steps[-1], ColumnValidator)
assert isinstance(steps[-1], ValidateItem)
assert steps[-1].model is AnotherModel


Expand Down

0 comments on commit 9cf59ee

Please sign in to comment.