From fc013f591698fba41a6c3eda2e1c32997f68385b Mon Sep 17 00:00:00 2001
From: dave
Date: Mon, 16 Dec 2024 17:35:54 +0100
Subject: [PATCH] move items transform steps into extra file

---
 dlt/extract/exceptions.py           |   1 -
 dlt/extract/hints.py                |   3 +-
 dlt/extract/incremental/__init__.py |   3 +-
 dlt/extract/items.py                | 165 ------------------------
 dlt/extract/items_transform.py      | 179 ++++++++++++++++++++++++++++
 dlt/extract/pipe.py                 |   2 +-
 dlt/extract/resource.py             |   6 +-
 dlt/extract/validation.py           |   3 +-
 dlt/sources/helpers/transform.py    |   2 +-
 tests/extract/test_extract_pipe.py  |   3 +-
 tests/extract/test_incremental.py   |   4 +-
 tests/extract/test_validation.py    |   2 +-
 tests/extract/utils.py              |   2 +-
 13 files changed, 197 insertions(+), 178 deletions(-)
 create mode 100644 dlt/extract/items_transform.py

diff --git a/dlt/extract/exceptions.py b/dlt/extract/exceptions.py
index f4d2b1f302..e832833428 100644
--- a/dlt/extract/exceptions.py
+++ b/dlt/extract/exceptions.py
@@ -3,7 +3,6 @@
 
 from dlt.common.exceptions import DltException
 from dlt.common.utils import get_callable_name
-from dlt.extract.items import ValidateItem, TDataItems
 
 
 class ExtractorException(DltException):
diff --git a/dlt/extract/hints.py b/dlt/extract/hints.py
index 000e5c4cdb..22a0062acf 100644
--- a/dlt/extract/hints.py
+++ b/dlt/extract/hints.py
@@ -37,7 +37,8 @@
     InconsistentTableTemplate,
 )
 from dlt.extract.incremental import Incremental, TIncrementalConfig
-from dlt.extract.items import TFunHintTemplate, TTableHintTemplate, TableNameMeta, ValidateItem
+from dlt.extract.items import TFunHintTemplate, TTableHintTemplate, TableNameMeta
+from dlt.extract.items_transform import ValidateItem
 from dlt.extract.utils import ensure_table_schema_columns, ensure_table_schema_columns_hint
 from dlt.extract.validation import create_item_validator
 
diff --git a/dlt/extract/incremental/__init__.py b/dlt/extract/incremental/__init__.py
index 5e7bae49c6..ce06292864 100644
--- a/dlt/extract/incremental/__init__.py
+++ b/dlt/extract/incremental/__init__.py
@@ -44,7 +44,8 @@
     IncrementalArgs,
     TIncrementalRange,
 )
-from dlt.extract.items import SupportsPipe, TTableHintTemplate, ItemTransform
+from dlt.extract.items import SupportsPipe, TTableHintTemplate
+from dlt.extract.items_transform import ItemTransform
 from dlt.extract.incremental.transform import (
     JsonIncremental,
     ArrowIncremental,
diff --git a/dlt/extract/items.py b/dlt/extract/items.py
index 399f67a947..ad7447c163 100644
--- a/dlt/extract/items.py
+++ b/dlt/extract/items.py
@@ -1,23 +1,16 @@
-import inspect
-import time
-
 from abc import ABC, abstractmethod
 from typing import (
     Any,
     Callable,
-    ClassVar,
-    Generic,
     Iterator,
     Iterable,
     Literal,
     Optional,
     Protocol,
-    TypeVar,
     Union,
     Awaitable,
     TYPE_CHECKING,
     NamedTuple,
-    Generator,
 )
 from concurrent.futures import Future
 
@@ -30,7 +23,6 @@
     TDynHintType,
 )
 
-
 TDecompositionStrategy = Literal["none", "scc"]
 TDeferredDataItems = Callable[[], TDataItems]
 TAwaitableDataItems = Awaitable[TDataItems]
@@ -135,160 +127,3 @@ def has_parent(self) -> bool:
     def close(self) -> None:
         """Closes pipe generator"""
         ...
-
-
-ItemTransformFunctionWithMeta = Callable[[TDataItem, str], TAny]
-ItemTransformFunctionNoMeta = Callable[[TDataItem], TAny]
-ItemTransformFunc = Union[ItemTransformFunctionWithMeta[TAny], ItemTransformFunctionNoMeta[TAny]]
-
-
-class ItemTransform(ABC, Generic[TAny]):
-    _f_meta: ItemTransformFunctionWithMeta[TAny] = None
-    _f: ItemTransformFunctionNoMeta[TAny] = None
-
-    placement_affinity: ClassVar[float] = 0
-    """Tell how strongly an item sticks to start (-1) or end (+1) of pipe."""
-
-    def __init__(self, transform_f: ItemTransformFunc[TAny]) -> None:
-        # inspect the signature
-        sig = inspect.signature(transform_f)
-        # TODO: use TypeGuard here to get rid of type ignore
-        if len(sig.parameters) == 1:
-            self._f = transform_f  # type: ignore
-        else:  # TODO: do better check
-            self._f_meta = transform_f  # type: ignore
-
-    def bind(self: "ItemTransform[TAny]", pipe: SupportsPipe) -> "ItemTransform[TAny]":
-        return self
-
-    @abstractmethod
-    def __call__(self, item: TDataItems, meta: Any = None) -> Optional[TDataItems]:
-        """Transforms `item` (a list of TDataItem or a single TDataItem) and returns or yields TDataItems. Returns None to consume item (filter out)"""
-        pass
-
-
-class FilterItem(ItemTransform[bool]):
-    # mypy needs those to type correctly
-    _f_meta: ItemTransformFunctionWithMeta[bool]
-    _f: ItemTransformFunctionNoMeta[bool]
-
-    def __call__(self, item: TDataItems, meta: Any = None) -> Optional[TDataItems]:
-        if isinstance(item, list):
-            # preserve empty lists
-            if len(item) == 0:
-                return item
-
-            if self._f_meta:
-                item = [i for i in item if self._f_meta(i, meta)]
-            else:
-                item = [i for i in item if self._f(i)]
-            if not item:
-                # item was fully consumed by the filter
-                return None
-            return item
-        else:
-            if self._f_meta:
-                return item if self._f_meta(item, meta) else None
-            else:
-                return item if self._f(item) else None
-
-
-class MapItem(ItemTransform[TDataItem]):
-    # mypy needs those to type correctly
-    _f_meta: ItemTransformFunctionWithMeta[TDataItem]
-    _f: ItemTransformFunctionNoMeta[TDataItem]
-
-    def __call__(self, item: TDataItems, meta: Any = None) -> Optional[TDataItems]:
-        if isinstance(item, list):
-            if self._f_meta:
-                return [self._f_meta(i, meta) for i in item]
-            else:
-                return [self._f(i) for i in item]
-        else:
-            if self._f_meta:
-                return self._f_meta(item, meta)
-            else:
-                return self._f(item)
-
-
-class YieldMapItem(ItemTransform[Iterator[TDataItem]]):
-    # mypy needs those to type correctly
-    _f_meta: ItemTransformFunctionWithMeta[TDataItem]
-    _f: ItemTransformFunctionNoMeta[TDataItem]
-
-    def __call__(self, item: TDataItems, meta: Any = None) -> Optional[TDataItems]:
-        if isinstance(item, list):
-            for i in item:
-                if self._f_meta:
-                    yield from self._f_meta(i, meta)
-                else:
-                    yield from self._f(i)
-        else:
-            if self._f_meta:
-                yield from self._f_meta(item, meta)
-            else:
-                yield from self._f(item)
-
-
-class ValidateItem(ItemTransform[TDataItem]):
-    """Base class for validators of data items.
-
-    Subclass should implement the `__call__` method to either return the data item(s) or raise `extract.exceptions.ValidationError`.
-    See `PydanticValidator` for possible implementation.
- """ - - placement_affinity: ClassVar[float] = 0.9 # stick to end but less than incremental - - table_name: str - - def bind(self, pipe: SupportsPipe) -> ItemTransform[TDataItem]: - self.table_name = pipe.name - return self - - -class LimitItem(ItemTransform[TDataItem]): - placement_affinity: ClassVar[float] = 1.1 # stick to end right behind incremental - - def __init__(self, max_items: Optional[int], max_time: Optional[float]) -> None: - self.max_items = max_items if max_items is not None else -1 - self.max_time = max_time - - def bind(self, pipe: SupportsPipe) -> "LimitItem": - # we also wrap iterators to make them stoppable - from dlt.extract.utils import ( - wrap_iterator, - ) - - if isinstance(pipe.gen, Iterator): - pipe.replace_gen(wrap_iterator(pipe.gen)) - - self.gen = pipe.gen - self.count = 0 - self.exhausted = False - self.start_time = time.time() - - return self - - def __call__(self, item: TDataItems, meta: Any = None) -> Optional[TDataItems]: - self.count += 1 - - # detect when the limit is reached, max time or yield count - if ( - (self.count == self.max_items) - or (self.max_time and time.time() - self.start_time > self.max_time) - or self.max_items == 0 - ): - self.exhausted = True - if inspect.isgenerator(self.gen): - self.gen.close() - - # if max items is not 0, we return the last item - # otherwise never return anything - if self.max_items != 0: - return item - - # do not return any late arriving items - if self.exhausted: - return None - - return item diff --git a/dlt/extract/items_transform.py b/dlt/extract/items_transform.py new file mode 100644 index 0000000000..12375640bc --- /dev/null +++ b/dlt/extract/items_transform.py @@ -0,0 +1,179 @@ +import inspect +import time + +from abc import ABC, abstractmethod +from typing import ( + Any, + Callable, + ClassVar, + Generic, + Iterator, + Optional, + Union, +) +from concurrent.futures import Future + +from dlt.common.typing import ( + TAny, + TDataItem, + TDataItems, +) + +from dlt.extract.utils import ( + wrap_iterator, +) + +from dlt.extract.items import SupportsPipe + + +ItemTransformFunctionWithMeta = Callable[[TDataItem, str], TAny] +ItemTransformFunctionNoMeta = Callable[[TDataItem], TAny] +ItemTransformFunc = Union[ItemTransformFunctionWithMeta[TAny], ItemTransformFunctionNoMeta[TAny]] + + +class ItemTransform(ABC, Generic[TAny]): + _f_meta: ItemTransformFunctionWithMeta[TAny] = None + _f: ItemTransformFunctionNoMeta[TAny] = None + + placement_affinity: ClassVar[float] = 0 + """Tell how strongly an item sticks to start (-1) or end (+1) of pipe.""" + + def __init__(self, transform_f: ItemTransformFunc[TAny]) -> None: + # inspect the signature + sig = inspect.signature(transform_f) + # TODO: use TypeGuard here to get rid of type ignore + if len(sig.parameters) == 1: + self._f = transform_f # type: ignore + else: # TODO: do better check + self._f_meta = transform_f # type: ignore + + def bind(self: "ItemTransform[TAny]", pipe: SupportsPipe) -> "ItemTransform[TAny]": + return self + + @abstractmethod + def __call__(self, item: TDataItems, meta: Any = None) -> Optional[TDataItems]: + """Transforms `item` (a list of TDataItem or a single TDataItem) and returns or yields TDataItems. 
Returns None to consume item (filter out)""" + pass + + +class FilterItem(ItemTransform[bool]): + # mypy needs those to type correctly + _f_meta: ItemTransformFunctionWithMeta[bool] + _f: ItemTransformFunctionNoMeta[bool] + + def __call__(self, item: TDataItems, meta: Any = None) -> Optional[TDataItems]: + if isinstance(item, list): + # preserve empty lists + if len(item) == 0: + return item + + if self._f_meta: + item = [i for i in item if self._f_meta(i, meta)] + else: + item = [i for i in item if self._f(i)] + if not item: + # item was fully consumed by the filter + return None + return item + else: + if self._f_meta: + return item if self._f_meta(item, meta) else None + else: + return item if self._f(item) else None + + +class MapItem(ItemTransform[TDataItem]): + # mypy needs those to type correctly + _f_meta: ItemTransformFunctionWithMeta[TDataItem] + _f: ItemTransformFunctionNoMeta[TDataItem] + + def __call__(self, item: TDataItems, meta: Any = None) -> Optional[TDataItems]: + if isinstance(item, list): + if self._f_meta: + return [self._f_meta(i, meta) for i in item] + else: + return [self._f(i) for i in item] + else: + if self._f_meta: + return self._f_meta(item, meta) + else: + return self._f(item) + + +class YieldMapItem(ItemTransform[Iterator[TDataItem]]): + # mypy needs those to type correctly + _f_meta: ItemTransformFunctionWithMeta[TDataItem] + _f: ItemTransformFunctionNoMeta[TDataItem] + + def __call__(self, item: TDataItems, meta: Any = None) -> Optional[TDataItems]: + if isinstance(item, list): + for i in item: + if self._f_meta: + yield from self._f_meta(i, meta) + else: + yield from self._f(i) + else: + if self._f_meta: + yield from self._f_meta(item, meta) + else: + yield from self._f(item) + + +class ValidateItem(ItemTransform[TDataItem]): + """Base class for validators of data items. + + Subclass should implement the `__call__` method to either return the data item(s) or raise `extract.exceptions.ValidationError`. + See `PydanticValidator` for possible implementation. 
+ """ + + placement_affinity: ClassVar[float] = 0.9 # stick to end but less than incremental + + table_name: str + + def bind(self, pipe: SupportsPipe) -> ItemTransform[TDataItem]: + self.table_name = pipe.name + return self + + +class LimitItem(ItemTransform[TDataItem]): + placement_affinity: ClassVar[float] = 1.1 # stick to end right behind incremental + + def __init__(self, max_items: Optional[int], max_time: Optional[float]) -> None: + self.max_items = max_items if max_items is not None else -1 + self.max_time = max_time + + def bind(self, pipe: SupportsPipe) -> "LimitItem": + # we also wrap iterators to make them stoppable + if isinstance(pipe.gen, Iterator): + pipe.replace_gen(wrap_iterator(pipe.gen)) + + self.gen = pipe.gen + self.count = 0 + self.exhausted = False + self.start_time = time.time() + + return self + + def __call__(self, item: TDataItems, meta: Any = None) -> Optional[TDataItems]: + self.count += 1 + + # detect when the limit is reached, max time or yield count + if ( + (self.count == self.max_items) + or (self.max_time and time.time() - self.start_time > self.max_time) + or self.max_items == 0 + ): + self.exhausted = True + if inspect.isgenerator(self.gen): + self.gen.close() + + # if max items is not 0, we return the last item + # otherwise never return anything + if self.max_items != 0: + return item + + # do not return any late arriving items + if self.exhausted: + return None + + return item diff --git a/dlt/extract/pipe.py b/dlt/extract/pipe.py index b95c9a6c40..e70365b4f4 100644 --- a/dlt/extract/pipe.py +++ b/dlt/extract/pipe.py @@ -27,12 +27,12 @@ UnclosablePipe, ) from dlt.extract.items import ( - ItemTransform, ResolvablePipeItem, SupportsPipe, TPipeStep, TPipedDataItems, ) +from dlt.extract.items_transform import ItemTransform from dlt.extract.utils import ( check_compat_transformer, simulate_func_call, diff --git a/dlt/extract/resource.py b/dlt/extract/resource.py index 2f4f66a96e..366e6e1a88 100644 --- a/dlt/extract/resource.py +++ b/dlt/extract/resource.py @@ -34,14 +34,16 @@ from dlt.extract.items import ( DataItemWithMeta, - ItemTransformFunc, - ItemTransformFunctionWithMeta, TableNameMeta, +) +from dlt.extract.items_transform import ( FilterItem, MapItem, YieldMapItem, ValidateItem, LimitItem, + ItemTransformFunc, + ItemTransformFunctionWithMeta, ) from dlt.extract.pipe_iterator import ManagedPipeIterator from dlt.extract.pipe import Pipe, TPipeStep diff --git a/dlt/extract/validation.py b/dlt/extract/validation.py index 4cd321b88c..d9fe70a90b 100644 --- a/dlt/extract/validation.py +++ b/dlt/extract/validation.py @@ -8,7 +8,8 @@ from dlt.common.typing import TDataItems from dlt.common.schema.typing import TAnySchemaColumns, TSchemaContract, TSchemaEvolutionMode -from dlt.extract.items import TTableHintTemplate, ValidateItem +from dlt.extract.items import TTableHintTemplate +from dlt.extract.items_transform import ValidateItem _TPydanticModel = TypeVar("_TPydanticModel", bound=PydanticBaseModel) diff --git a/dlt/sources/helpers/transform.py b/dlt/sources/helpers/transform.py index 32843e2aa2..45738fe4fb 100644 --- a/dlt/sources/helpers/transform.py +++ b/dlt/sources/helpers/transform.py @@ -2,7 +2,7 @@ from typing import Any, Dict, Sequence, Union from dlt.common.typing import TDataItem -from dlt.extract.items import ItemTransformFunctionNoMeta +from dlt.extract.items_transform import ItemTransformFunctionNoMeta import jsonpath_ng diff --git a/tests/extract/test_extract_pipe.py b/tests/extract/test_extract_pipe.py index d40639a594..659888269a 100644 
--- a/tests/extract/test_extract_pipe.py
+++ b/tests/extract/test_extract_pipe.py
@@ -10,7 +10,8 @@
 from dlt.common import sleep
 from dlt.common.typing import TDataItems
 from dlt.extract.exceptions import CreatePipeException, ResourceExtractionError, UnclosablePipe
-from dlt.extract.items import DataItemWithMeta, FilterItem, MapItem, YieldMapItem
+from dlt.extract.items import DataItemWithMeta
+from dlt.extract.items_transform import FilterItem, MapItem, YieldMapItem
 from dlt.extract.pipe import Pipe
 from dlt.extract.pipe_iterator import PipeIterator, ManagedPipeIterator, PipeItem
 
diff --git a/tests/extract/test_incremental.py b/tests/extract/test_incremental.py
index 205df41a81..9ad7d28e88 100644
--- a/tests/extract/test_incremental.py
+++ b/tests/extract/test_incremental.py
@@ -36,7 +36,7 @@
     IncrementalPrimaryKeyMissing,
 )
 from dlt.extract.incremental.lag import apply_lag
-from dlt.extract.items import ValidateItem
+from dlt.extract.items_transform import ValidateItem
 from dlt.extract.resource import DltResource
 from dlt.pipeline.exceptions import PipelineStepFailed
 from dlt.sources.helpers.transform import take_first
@@ -3989,7 +3989,7 @@ def resource(
 
     resource.add_limit(10)
 
-    p = dlt.pipeline(pipeline_name="incremtal_limit", destination="duckdb", dev_mode=True)
+    p = dlt.pipeline(pipeline_name="incremental_limit", destination="duckdb", dev_mode=True)
 
     p.run(resource())
 
diff --git a/tests/extract/test_validation.py b/tests/extract/test_validation.py
index 138589bb06..3800f333f6 100644
--- a/tests/extract/test_validation.py
+++ b/tests/extract/test_validation.py
@@ -10,7 +10,7 @@
 from dlt.common.libs.pydantic import BaseModel
 
 from dlt.extract import DltResource
-from dlt.extract.items import ValidateItem
+from dlt.extract.items_transform import ValidateItem
 from dlt.extract.validation import PydanticValidator
 from dlt.extract.exceptions import ResourceExtractionError
 from dlt.pipeline.exceptions import PipelineStepFailed
diff --git a/tests/extract/utils.py b/tests/extract/utils.py
index 7364ef7243..f1de3de093 100644
--- a/tests/extract/utils.py
+++ b/tests/extract/utils.py
@@ -6,7 +6,7 @@
 from dlt.common.typing import TDataItem, TDataItems
 
 from dlt.extract.extract import ExtractStorage
-from dlt.extract.items import ItemTransform
+from dlt.extract.items_transform import ItemTransform
 
 from tests.utils import TestDataItemFormat
 
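Reviewer note (not part of the patch): a minimal usage sketch of the relocated transform steps under their new import path. The classes and their call behavior are taken verbatim from the moved code above; the sample records and lambdas are illustrative only and assume a dlt checkout that includes this change.

# Minimal sketch: FilterItem / MapItem now live in dlt.extract.items_transform.
from dlt.extract.items_transform import FilterItem, MapItem

# One-argument callables are bound as the no-meta variant (see ItemTransform.__init__).
double = MapItem(lambda item: {**item, "value": item["value"] * 2})
keep_positive = FilterItem(lambda item: item["value"] > 0)

batch = [{"value": -1}, {"value": 2}]
# MapItem maps lists element-wise; FilterItem returns None when everything is filtered out.
print(keep_positive(double(batch)))  # [{'value': 4}]

Existing call sites only needed the import updates shown in the hunks above; the behavior of the transform steps is unchanged by this patch.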