Skip to content

Commit

Permalink
Parallelize resource method, handle transformers
Browse files Browse the repository at this point in the history
  • Loading branch information
steinitzu committed Feb 14, 2024
1 parent c482a6e commit b9f0ddf
Show file tree
Hide file tree
Showing 9 changed files with 188 additions and 69 deletions.
2 changes: 2 additions & 0 deletions dlt/common/typing.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
Union,
runtime_checkable,
IO,
Iterator,
)
from typing_extensions import TypeAlias, ParamSpec, Concatenate, Annotated, get_args, get_origin

Expand All @@ -50,6 +51,7 @@
AnyFun: TypeAlias = Callable[..., Any]
TFun = TypeVar("TFun", bound=AnyFun) # any function
TAny = TypeVar("TAny", bound=Any)
TAnyFunOrIterator = TypeVar("TAnyFunOrIterator", AnyFun, Iterator[Any])
TAnyClass = TypeVar("TAnyClass", bound=object)
TimedeltaSeconds = Union[int, float, timedelta]
# represent secret value ie. coming from Kubernetes/Docker secrets or other providers
Expand Down
87 changes: 24 additions & 63 deletions dlt/extract/decorators.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,7 @@ class SourceSchemaInjectableContext(ContainerInjectableContext):

if TYPE_CHECKING:

def __init__(self, schema: Schema = None) -> None:
...
def __init__(self, schema: Schema = None) -> None: ...


@configspec
Expand All @@ -91,8 +90,7 @@ class SourceInjectableContext(ContainerInjectableContext):

if TYPE_CHECKING:

def __init__(self, source: DltSource = None) -> None:
...
def __init__(self, source: DltSource = None) -> None: ...


TSourceFunParams = ParamSpec("TSourceFunParams")
Expand All @@ -112,8 +110,7 @@ def source(
schema_contract: TSchemaContract = None,
spec: Type[BaseConfiguration] = None,
_impl_cls: Type[TDltSourceImpl] = DltSource, # type: ignore[assignment]
) -> Callable[TSourceFunParams, DltSource]:
...
) -> Callable[TSourceFunParams, DltSource]: ...


@overload
Expand All @@ -128,8 +125,7 @@ def source(
schema_contract: TSchemaContract = None,
spec: Type[BaseConfiguration] = None,
_impl_cls: Type[TDltSourceImpl] = DltSource, # type: ignore[assignment]
) -> Callable[[Callable[TSourceFunParams, Any]], Callable[TSourceFunParams, TDltSourceImpl]]:
...
) -> Callable[[Callable[TSourceFunParams, Any]], Callable[TSourceFunParams, TDltSourceImpl]]: ...


def source(
Expand Down Expand Up @@ -278,8 +274,7 @@ def resource(
selected: bool = True,
spec: Type[BaseConfiguration] = None,
parallelized: bool = False,
) -> DltResource:
...
) -> DltResource: ...


@overload
Expand All @@ -297,8 +292,7 @@ def resource(
selected: bool = True,
spec: Type[BaseConfiguration] = None,
parallelized: bool = False,
) -> Callable[[Callable[TResourceFunParams, Any]], DltResource]:
...
) -> Callable[[Callable[TResourceFunParams, Any]], DltResource]: ...


@overload
Expand All @@ -317,8 +311,7 @@ def resource(
spec: Type[BaseConfiguration] = None,
parallelized: bool = False,
standalone: Literal[True] = True,
) -> Callable[[Callable[TResourceFunParams, Any]], Callable[TResourceFunParams, DltResource]]:
...
) -> Callable[[Callable[TResourceFunParams, Any]], Callable[TResourceFunParams, DltResource]]: ...


@overload
Expand All @@ -336,8 +329,7 @@ def resource(
selected: bool = True,
spec: Type[BaseConfiguration] = None,
parallelized: bool = False,
) -> DltResource:
...
) -> DltResource: ...


def resource(
Expand Down Expand Up @@ -434,7 +426,7 @@ def make_resource(
schema_contract=schema_contract,
table_format=table_format,
)
return DltResource.from_data(
resource = DltResource.from_data(
_data,
_name,
_section,
Expand All @@ -443,6 +435,9 @@ def make_resource(
cast(DltResource, data_from),
incremental=incremental,
)
if parallelized:
return resource.parallelize()
return resource

def decorator(
f: Callable[TResourceFunParams, Any]
Expand All @@ -457,8 +452,8 @@ def decorator(
if not standalone and callable(name):
raise DynamicNameNotStandaloneResource(get_callable_name(f))

if parallelized:
f = parallelize(f)
# if parallelized:
# f = parallelize(f)

# resource_section = name if name and not callable(name) else get_callable_name(f)
resource_name = name if name and not callable(name) else get_callable_name(f)
Expand Down Expand Up @@ -555,8 +550,8 @@ def transformer(
merge_key: TTableHintTemplate[TColumnNames] = None,
selected: bool = True,
spec: Type[BaseConfiguration] = None,
) -> Callable[[Callable[Concatenate[TDataItem, TResourceFunParams], Any]], DltResource]:
...
parallelized: bool = False,
) -> Callable[[Callable[Concatenate[TDataItem, TResourceFunParams], Any]], DltResource]: ...


@overload
Expand All @@ -573,11 +568,11 @@ def transformer(
selected: bool = True,
spec: Type[BaseConfiguration] = None,
standalone: Literal[True] = True,
parallelized: bool = False,
) -> Callable[
[Callable[Concatenate[TDataItem, TResourceFunParams], Any]],
Callable[TResourceFunParams, DltResource],
]:
...
]: ...


@overload
Expand All @@ -593,8 +588,8 @@ def transformer(
merge_key: TTableHintTemplate[TColumnNames] = None,
selected: bool = True,
spec: Type[BaseConfiguration] = None,
) -> DltResource:
...
parallelized: bool = False,
) -> DltResource: ...


@overload
Expand All @@ -611,8 +606,8 @@ def transformer(
selected: bool = True,
spec: Type[BaseConfiguration] = None,
standalone: Literal[True] = True,
) -> Callable[TResourceFunParams, DltResource]:
...
parallelized: bool = False,
) -> Callable[TResourceFunParams, DltResource]: ...


def transformer(
Expand All @@ -628,6 +623,7 @@ def transformer(
selected: bool = True,
spec: Type[BaseConfiguration] = None,
standalone: bool = False,
parallelized: bool = False,
) -> Any:
"""A form of `dlt resource` that takes input from other resources via `data_from` argument in order to enrich or transform the data.
Expand Down Expand Up @@ -701,6 +697,7 @@ def transformer(
spec=spec,
standalone=standalone,
data_from=data_from,
parallelized=parallelized,
)


Expand Down Expand Up @@ -758,39 +755,3 @@ def _curry() -> TBoundItems:
return _curry

return _wrap


def parallelize(f: Callable[TResourceFunParams, Any]) -> Callable[TResourceFunParams, Any]:
    """Wraps the generator function `f` so its items are pulled one step at a time via callables.

    The returned function is a generator function. Instead of yielding the items of
    the generator created by `f`, it yields:

    * a callable that performs a single `next()` on that generator and returns the
      item (or `None` once the generator is exhausted), or
    * `None` while a previously yielded callable has not finished yet.

    Args:
        f: A function returning a generator, or returning a function that returns one.

    Returns:
        A function with the signature and metadata of `f` (copied with `wraps`).

    Raises:
        ValueError: Raised when iteration starts if `f` produced an async generator.
    """

    @wraps(f)
    def _wrap(*args: Any, **kwargs: Any) -> Any:  # TODO: Type correctly
        gen = f(*args, **kwargs)
        if inspect.isfunction(gen):
            # `f` returned a function instead of a generator: call it to get the generator
            gen = gen()
        if inspect.isasyncgen(gen):
            raise ValueError("Async generators are not supported with parallelize")

        exhausted = False
        busy = False

        def _parallel_gen() -> Any:  # TODO: Type correctly
            # Both flags live in `_wrap`; declare them once, before any assignment
            nonlocal busy, exhausted
            try:
                return next(gen)
            except StopIteration:
                exhausted = True
                return None
            finally:
                # always release the flag so the outer loop can hand out the next step
                busy = False

        while not exhausted:
            try:
                # only one step may be outstanding: `gen` cannot be advanced concurrently
                while busy:
                    yield None
                busy = True
                yield _parallel_gen
            except GeneratorExit:
                # propagate an early close to the wrapped generator
                gen.close()
                raise

    return _wrap
7 changes: 3 additions & 4 deletions dlt/extract/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -144,15 +144,14 @@ def __init__(self, resource_name: str, item: Any, _typ: Type[Any], msg: str) ->
)


class InvalidResourceDataTypeAsync(InvalidResourceDataType):
class InvalidParallelResourceDataType(InvalidResourceDataType):
def __init__(self, resource_name: str, item: Any, _typ: Type[Any]) -> None:
super().__init__(
resource_name,
item,
_typ,
"Async iterators and generators are not valid resources. Please use standard iterators"
" and generators that yield Awaitables instead (for example by yielding from async"
" function without await",
"Parallel resource data must be a generator or a generator function. The provided"
f" data type for resource '{resource_name}' was {_typ.__name__}.",
)


Expand Down
16 changes: 15 additions & 1 deletion dlt/extract/resource.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
pipeline_state,
)
from dlt.common.utils import flatten_list_or_items, get_callable_name, uniq_id
from dlt.extract.utils import wrap_async_iterator
from dlt.extract.utils import wrap_async_iterator, wrap_parallel_iterator

from dlt.extract.typing import (
DataItemWithMeta,
Expand All @@ -49,6 +49,7 @@
InvalidTransformerGeneratorFunction,
InvalidResourceDataTypeBasic,
InvalidResourceDataTypeMultiplePipes,
InvalidParallelResourceDataType,
ParametrizedResourceUnbound,
ResourceNameMissing,
ResourceNotATransformer,
Expand Down Expand Up @@ -342,6 +343,19 @@ def _gen_wrap(gen: TPipeStep) -> TPipeStep:
self._pipe.replace_gen(_gen_wrap(self._pipe.gen))
return self

def parallelize(self) -> "DltResource":
    """Parallelizes the resource pipe by wrapping its generator with `wrap_parallel_iterator`.

    The head of the pipe must be a generator or a generator function, unless the
    resource is a transformer. Generator functions hidden behind decorators that
    set `__wrapped__` (for example via `functools.wraps`) are accepted as well.

    Returns:
        This resource, to allow call chaining.

    Raises:
        InvalidParallelResourceDataType: If the resource is not a transformer and its
            data is neither a generator nor a generator function.
    """
    gen = self._pipe.gen
    supported = (
        inspect.isgenerator(gen)
        # `isgeneratorfunction` inspects only the outermost callable, so look through
        # decorator wrappers to find the actual generator function
        or inspect.isgeneratorfunction(inspect.unwrap(gen))
        or self.is_transformer
    )
    if not supported:
        raise InvalidParallelResourceDataType(self.name, gen, type(gen))

    self._pipe.replace_gen(wrap_parallel_iterator(gen))
    return self

def add_step(
self, item_transform: ItemTransformFunctionWithMeta[TDataItems], insert_at: int = None
) -> "DltResource": # noqa: A003
Expand Down
5 changes: 5 additions & 0 deletions dlt/extract/source.py
Original file line number Diff line number Diff line change
Expand Up @@ -334,6 +334,11 @@ def add_limit(self, max_items: int) -> "DltSource": # noqa: A003
resource.add_limit(max_items)
return self

def parallelize(self) -> "DltSource":
    """Calls `parallelize` on each currently selected resource and returns this source."""
    selected_resources = self.resources.selected
    for selected in selected_resources.values():
        selected.parallelize()
    return self

@property
def run(self) -> SupportsPipelineRun:
"""A convenience method that will call `run` run on the currently active `dlt` pipeline. If pipeline instance is not found, one with default settings will be created."""
Expand Down
7 changes: 7 additions & 0 deletions dlt/extract/typing.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
Union,
Awaitable,
TYPE_CHECKING,
Generator,
)
from concurrent.futures import Future

Expand All @@ -28,6 +29,12 @@
TTableHintTemplate = Union[TDynHintType, TFunHintTemplate[TDynHintType]]


# Either a generator of data items, or a callable (with any arguments) returning such a generator
TGenOrGenFunction = Union[
    Generator[TDataItems, Optional[Any], Optional[Any]],
    Callable[..., Generator[TDataItems, Optional[Any], Optional[Any]]],
]


class DataItemWithMeta:
__slots__ = "meta", "data"

Expand Down
45 changes: 44 additions & 1 deletion dlt/extract/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,10 @@
AsyncGenerator,
Awaitable,
Generator,
Iterator,
)
from collections.abc import Mapping as C_Mapping
from functools import wraps

from dlt.common.exceptions import MissingDependencyException
from dlt.common.pipeline import reset_resource_state
Expand All @@ -26,7 +28,13 @@
InvalidResourceDataTypeFunctionNotAGenerator,
InvalidStepFunctionArguments,
)
from dlt.extract.typing import TTableHintTemplate, TDataItem, TFunHintTemplate, SupportsPipe
from dlt.extract.typing import (
TTableHintTemplate,
TDataItem,
TFunHintTemplate,
SupportsPipe,
TGenOrGenFunction,
)

try:
from dlt.common.libs import pydantic
Expand Down Expand Up @@ -171,6 +179,41 @@ async def run() -> TDataItems:
exhausted = True


def wrap_parallel_iterator(f: TGenOrGenFunction) -> TGenOrGenFunction:
    """Wraps a generator or generator function for parallel extraction.

    The wrapping generator does not yield the data items directly. It yields:

    * a callable that advances the wrapped generator by a single `next()` and
      returns the item (or `None` once the wrapped generator is exhausted), or
    * `None` while a previously yielded callable has not finished yet.

    Closing the wrapping generator also closes the wrapped one, so that its
    `finally` blocks and context managers are run.

    Args:
        f: A generator, or a callable returning a generator.

    Returns:
        If `f` is callable: a function carrying the metadata of `f` (copied with
        `wraps`) that returns the wrapping generator when called. Otherwise the
        wrapping generator itself.
    """

    def _wrapper(*args: Any, **kwargs: Any) -> Generator[TDataItems, None, None]:
        gen = f(*args, **kwargs) if callable(f) else f

        exhausted = False
        busy = False

        def _parallel_gen() -> TDataItems:
            nonlocal busy, exhausted
            try:
                return next(gen)
            except StopIteration:
                exhausted = True
                return None
            finally:
                # always release the flag so the outer loop can hand out the next step
                busy = False

        try:
            while not exhausted:
                # only one step may be outstanding: `gen` cannot be advanced concurrently
                while busy:
                    yield None
                busy = True
                yield _parallel_gen
        except GeneratorExit:
            # Propagate the close to the wrapped generator so its cleanup code runs.
            # Plain iterators have no `close`, hence the lookup.
            # NOTE(review): closing a generator that is executing at this very moment
            # raises ValueError - confirm callers let pending steps finish first.
            close = getattr(gen, "close", None)
            if callable(close):
                close()
            raise

    if callable(f):
        return wraps(f)(_wrapper)
    return _wrapper()


def wrap_compat_transformer(
name: str, f: AnyFun, sig: inspect.Signature, *args: Any, **kwargs: Any
) -> AnyFun:
Expand Down
Loading

0 comments on commit b9f0ddf

Please sign in to comment.