diff --git a/dlt/common/schema/typing.py b/dlt/common/schema/typing.py
index bd87ceda7f..b2f6702f1d 100644
--- a/dlt/common/schema/typing.py
+++ b/dlt/common/schema/typing.py
@@ -114,5 +114,5 @@ class TStoredSchema(TypedDict, total=False):
 
 class ColumnValidator(ABC):
     @abstractmethod
-    def __call__(self, item: TDataItem) -> TDataItem:
+    def __call__(self, item: TDataItem, meta: Any = None) -> TDataItem:
         ...
diff --git a/dlt/extract/exceptions.py b/dlt/extract/exceptions.py
index 79329f2107..71aaffdeb2 100644
--- a/dlt/extract/exceptions.py
+++ b/dlt/extract/exceptions.py
@@ -259,3 +259,9 @@ def __init__(self, source_name: str, schema_name: str) -> None:
 class IncrementalUnboundError(DltResourceException):
     def __init__(self, cursor_path: str) -> None:
         super().__init__("", f"The incremental definition with cursor path {cursor_path} is used without being bound to the resource. This most often happens when you create dynamic resource from a generator function that uses incremental. See https://dlthub.com/docs/general-usage/incremental-loading#incremental-loading-with-last-value for an example.")
+
+
+class ValidationError(ValueError, DltException):
+    def __init__(self, original_exception: Exception) ->None:
+        self.original_exception = original_exception
+        super().__init__(f"Schema validation failed: {original_exception}")
diff --git a/dlt/extract/source.py b/dlt/extract/source.py
index 703c683be2..f817aeb13e 100644
--- a/dlt/extract/source.py
+++ b/dlt/extract/source.py
@@ -151,7 +151,7 @@ def validator(self, validator: Optional[ColumnValidator]) -> None:
         if step_no >= 0:
             self._pipe.remove_step(step_no)
         if validator:
-            self.add_map(validator, insert_at=step_no if step_no >= 0 else None)
+            self.add_step(validator, insert_at=step_no if step_no >= 0 else None)
 
     def pipe_data_from(self, data_from: Union["DltResource", Pipe]) -> None:
         """Replaces the parent in the transformer resource pipe from which the data is piped."""
diff --git a/dlt/extract/validation.py b/dlt/extract/validation.py
index a7f7feb98c..962e6784f0 100644
--- a/dlt/extract/validation.py
+++ b/dlt/extract/validation.py
@@ -1,11 +1,11 @@
-from typing import Optional, Protocol, TypeVar, Generic, Type, Union
+from typing import Optional, Protocol, TypeVar, Generic, Type, Union, Any
 
 try:
-    from pydantic import BaseModel as PydanticBaseModel
+    from pydantic import BaseModel as PydanticBaseModel, ValidationError as PydanticValidationError
 except ModuleNotFoundError:
     PydanticBaseModel = None  # type: ignore[misc]
 
-from dlt.common.exceptions import MissingDependencyException
+from dlt.extract.exceptions import ValidationError
 from dlt.common.typing import TDataItem
 from dlt.common.schema.typing import TAnySchemaColumns, ColumnValidator
 from dlt.extract.typing import TTableHintTemplate
@@ -19,11 +19,14 @@ class PydanticValidator(ColumnValidator, Generic[_TPydanticModel]):
     def __init__(self, model: Type[_TPydanticModel]) -> None:
         self.model = model
 
-    def __call__(self, item: TDataItem) -> _TPydanticModel:
+    def __call__(self, item: TDataItem, meta: Any = None) -> _TPydanticModel:
         """Validate a data item agains the pydantic model"""
         if item is None:
             return None
-        return self.model.parse_obj(item)
+        try:
+            return self.model.parse_obj(item)
+        except PydanticValidationError as e:
+            raise ValidationError(e) from e
 
 
 def get_column_validator(columns: TTableHintTemplate[TAnySchemaColumns]) -> Optional[ColumnValidator]: