Skip to content

Commit

Permalink
move items transform steps into extra file
Browse files Browse the repository at this point in the history
  • Loading branch information
sh-rp committed Dec 16, 2024
1 parent 9d38f56 commit fc013f5
Show file tree
Hide file tree
Showing 13 changed files with 197 additions and 178 deletions.
1 change: 0 additions & 1 deletion dlt/extract/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@

from dlt.common.exceptions import DltException
from dlt.common.utils import get_callable_name
from dlt.extract.items import ValidateItem, TDataItems


class ExtractorException(DltException):
Expand Down
3 changes: 2 additions & 1 deletion dlt/extract/hints.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@
InconsistentTableTemplate,
)
from dlt.extract.incremental import Incremental, TIncrementalConfig
from dlt.extract.items import TFunHintTemplate, TTableHintTemplate, TableNameMeta, ValidateItem
from dlt.extract.items import TFunHintTemplate, TTableHintTemplate, TableNameMeta
from dlt.extract.items_transform import ValidateItem
from dlt.extract.utils import ensure_table_schema_columns, ensure_table_schema_columns_hint
from dlt.extract.validation import create_item_validator

Expand Down
3 changes: 2 additions & 1 deletion dlt/extract/incremental/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,8 @@
IncrementalArgs,
TIncrementalRange,
)
from dlt.extract.items import SupportsPipe, TTableHintTemplate, ItemTransform
from dlt.extract.items import SupportsPipe, TTableHintTemplate
from dlt.extract.items_transform import ItemTransform
from dlt.extract.incremental.transform import (
JsonIncremental,
ArrowIncremental,
Expand Down
165 changes: 0 additions & 165 deletions dlt/extract/items.py
Original file line number Diff line number Diff line change
@@ -1,23 +1,16 @@
import inspect
import time

from abc import ABC, abstractmethod
from typing import (
Any,
Callable,
ClassVar,
Generic,
Iterator,
Iterable,
Literal,
Optional,
Protocol,
TypeVar,
Union,
Awaitable,
TYPE_CHECKING,
NamedTuple,
Generator,
)
from concurrent.futures import Future

Expand All @@ -30,7 +23,6 @@
TDynHintType,
)


TDecompositionStrategy = Literal["none", "scc"]
TDeferredDataItems = Callable[[], TDataItems]
TAwaitableDataItems = Awaitable[TDataItems]
Expand Down Expand Up @@ -135,160 +127,3 @@ def has_parent(self) -> bool:
def close(self) -> None:
"""Closes pipe generator"""
...


ItemTransformFunctionWithMeta = Callable[[TDataItem, str], TAny]
ItemTransformFunctionNoMeta = Callable[[TDataItem], TAny]
ItemTransformFunc = Union[ItemTransformFunctionWithMeta[TAny], ItemTransformFunctionNoMeta[TAny]]


class ItemTransform(ABC, Generic[TAny]):
_f_meta: ItemTransformFunctionWithMeta[TAny] = None
_f: ItemTransformFunctionNoMeta[TAny] = None

placement_affinity: ClassVar[float] = 0
"""Tell how strongly an item sticks to start (-1) or end (+1) of pipe."""

def __init__(self, transform_f: ItemTransformFunc[TAny]) -> None:
# inspect the signature
sig = inspect.signature(transform_f)
# TODO: use TypeGuard here to get rid of type ignore
if len(sig.parameters) == 1:
self._f = transform_f # type: ignore
else: # TODO: do better check
self._f_meta = transform_f # type: ignore

def bind(self: "ItemTransform[TAny]", pipe: SupportsPipe) -> "ItemTransform[TAny]":
return self

@abstractmethod
def __call__(self, item: TDataItems, meta: Any = None) -> Optional[TDataItems]:
"""Transforms `item` (a list of TDataItem or a single TDataItem) and returns or yields TDataItems. Returns None to consume item (filter out)"""
pass


class FilterItem(ItemTransform[bool]):
# mypy needs those to type correctly
_f_meta: ItemTransformFunctionWithMeta[bool]
_f: ItemTransformFunctionNoMeta[bool]

def __call__(self, item: TDataItems, meta: Any = None) -> Optional[TDataItems]:
if isinstance(item, list):
# preserve empty lists
if len(item) == 0:
return item

if self._f_meta:
item = [i for i in item if self._f_meta(i, meta)]
else:
item = [i for i in item if self._f(i)]
if not item:
# item was fully consumed by the filter
return None
return item
else:
if self._f_meta:
return item if self._f_meta(item, meta) else None
else:
return item if self._f(item) else None


class MapItem(ItemTransform[TDataItem]):
# mypy needs those to type correctly
_f_meta: ItemTransformFunctionWithMeta[TDataItem]
_f: ItemTransformFunctionNoMeta[TDataItem]

def __call__(self, item: TDataItems, meta: Any = None) -> Optional[TDataItems]:
if isinstance(item, list):
if self._f_meta:
return [self._f_meta(i, meta) for i in item]
else:
return [self._f(i) for i in item]
else:
if self._f_meta:
return self._f_meta(item, meta)
else:
return self._f(item)


class YieldMapItem(ItemTransform[Iterator[TDataItem]]):
# mypy needs those to type correctly
_f_meta: ItemTransformFunctionWithMeta[TDataItem]
_f: ItemTransformFunctionNoMeta[TDataItem]

def __call__(self, item: TDataItems, meta: Any = None) -> Optional[TDataItems]:
if isinstance(item, list):
for i in item:
if self._f_meta:
yield from self._f_meta(i, meta)
else:
yield from self._f(i)
else:
if self._f_meta:
yield from self._f_meta(item, meta)
else:
yield from self._f(item)


class ValidateItem(ItemTransform[TDataItem]):
"""Base class for validators of data items.
Subclass should implement the `__call__` method to either return the data item(s) or raise `extract.exceptions.ValidationError`.
See `PydanticValidator` for possible implementation.
"""

placement_affinity: ClassVar[float] = 0.9 # stick to end but less than incremental

table_name: str

def bind(self, pipe: SupportsPipe) -> ItemTransform[TDataItem]:
self.table_name = pipe.name
return self


class LimitItem(ItemTransform[TDataItem]):
placement_affinity: ClassVar[float] = 1.1 # stick to end right behind incremental

def __init__(self, max_items: Optional[int], max_time: Optional[float]) -> None:
self.max_items = max_items if max_items is not None else -1
self.max_time = max_time

def bind(self, pipe: SupportsPipe) -> "LimitItem":
# we also wrap iterators to make them stoppable
from dlt.extract.utils import (
wrap_iterator,
)

if isinstance(pipe.gen, Iterator):
pipe.replace_gen(wrap_iterator(pipe.gen))

self.gen = pipe.gen
self.count = 0
self.exhausted = False
self.start_time = time.time()

return self

def __call__(self, item: TDataItems, meta: Any = None) -> Optional[TDataItems]:
self.count += 1

# detect when the limit is reached, max time or yield count
if (
(self.count == self.max_items)
or (self.max_time and time.time() - self.start_time > self.max_time)
or self.max_items == 0
):
self.exhausted = True
if inspect.isgenerator(self.gen):
self.gen.close()

# if max items is not 0, we return the last item
# otherwise never return anything
if self.max_items != 0:
return item

# do not return any late arriving items
if self.exhausted:
return None

return item
Loading

0 comments on commit fc013f5

Please sign in to comment.