Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Parallelize decorator #965

Merged
merged 28 commits into from
Mar 1, 2024
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
4a5370c
Refactoring of pipe thread pool and reduce polling
steinitzu Feb 9, 2024
b59a59e
Decorator
steinitzu Feb 13, 2024
0d51fdc
Tests for parallel
steinitzu Feb 13, 2024
0c73473
Include items module
steinitzu Feb 13, 2024
4fe6507
Names
steinitzu Feb 13, 2024
fbe4d7f
Separate modules for pipe and pipe_iterator
steinitzu Feb 14, 2024
deea305
Overload arg order
steinitzu Feb 14, 2024
9820a54
Keep parenthesis
steinitzu Feb 14, 2024
2cc0ced
Use assert for sanity check
steinitzu Feb 14, 2024
fba59f6
Parallelize resource method, handle transformers
steinitzu Feb 14, 2024
ea882de
Comments, small refactor
steinitzu Feb 15, 2024
6bcd43d
Handle non-iterator transformers
steinitzu Feb 16, 2024
84f3618
Rename WorkerPool -> FuturesPool, small cleanups
steinitzu Feb 19, 2024
c9c1824
Fix transformer, test bare generator
steinitzu Feb 20, 2024
6fbd859
Source parallelize skip invalid resources, docstring
steinitzu Feb 20, 2024
549d5fa
Handle wrapped generator function
steinitzu Feb 22, 2024
7a0d8ae
Don't test exec order, only check that multiple threads are used
steinitzu Feb 22, 2024
ec5a965
Poll futures with timeout, no check sources_count, test gen.close()
steinitzu Feb 22, 2024
38f7069
Always block when submitting futures, remove redundant submit future
steinitzu Feb 22, 2024
96dcfe3
Revert limit async gen
steinitzu Feb 23, 2024
b7acf93
Always check done futures
steinitzu Feb 23, 2024
0882987
Fix type error
steinitzu Feb 23, 2024
acb3b45
Typing
steinitzu Feb 23, 2024
b4278f9
Merge branch 'devel' into sthor/parallelize-decorator
rudolfix Feb 27, 2024
6af5c1a
adds additional sleep when futures pool is empty
rudolfix Feb 27, 2024
19e09cb
Update docs and snippets
steinitzu Feb 29, 2024
85b7f62
small docs fixes
rudolfix Feb 29, 2024
d376cb5
logs daemon signals message instead of printing
rudolfix Feb 29, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions dlt/common/typing.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
Union,
runtime_checkable,
IO,
Iterator,
)

from typing_extensions import (
Expand Down Expand Up @@ -69,6 +70,7 @@
AnyFun: TypeAlias = Callable[..., Any]
TFun = TypeVar("TFun", bound=AnyFun) # any function
TAny = TypeVar("TAny", bound=Any)
TAnyFunOrIterator = TypeVar("TAnyFunOrIterator", AnyFun, Iterator[Any])
TAnyClass = TypeVar("TAnyClass", bound=object)
TimedeltaSeconds = Union[int, float, timedelta]
# represent secret value ie. coming from Kubernetes/Docker secrets or other providers
Expand Down
242 changes: 242 additions & 0 deletions dlt/extract/concurrency.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,242 @@
import asyncio
from uuid import uuid4
from asyncio import Future
from contextlib import contextmanager
from concurrent.futures import (
ThreadPoolExecutor,
as_completed,
wait as wait_for_futures,
FIRST_COMPLETED,
)
from threading import Thread
from typing import List, Awaitable, Callable, Any, Dict, Set, Optional, overload, Literal

from dlt.common.exceptions import PipelineException
from dlt.common.configuration.container import Container
from dlt.common.runtime.signals import sleep
from dlt.extract.typing import DataItemWithMeta, TItemFuture
from dlt.extract.items import ResolvablePipeItem, FuturePipeItem

from dlt.extract.exceptions import (
DltSourceException,
ExtractorException,
PipeException,
ResourceExtractionError,
)


class FuturesPool:
    """Worker pool for pipe items that can be resolved asynchronously.

    Items can be either asyncio coroutines or regular callables which will be executed in a thread pool.
    """

    def __init__(
        self, workers: int = 5, poll_interval: float = 0.01, max_parallel_items: int = 20
    ) -> None:
        """Create the pool.

        Args:
            workers: Number of threads in the lazily-created thread pool for callable items.
            poll_interval: Seconds slept per `poll()` call.
            max_parallel_items: Maximum number of unfinished futures held at once.
        """
        # Futures stay in this mapping until consumed by the pipe iterator,
        # even after they complete (see `submit`).
        self.futures: Dict[TItemFuture, FuturePipeItem] = {}
        # Both pools are created lazily on first use
        self._thread_pool: ThreadPoolExecutor = None
        self._async_pool: asyncio.AbstractEventLoop = None
        self._async_pool_thread: Thread = None
        self.workers = workers
        self.poll_interval = poll_interval
        self.max_parallel_items = max_parallel_items
        # Count of submitted futures that have not yet completed
        self.used_slots: int = 0

    def __len__(self) -> int:
        return len(self.futures)

    @property
    def free_slots(self) -> int:
        # Done futures don't count as slots, so we can still add futures
        return self.max_parallel_items - self.used_slots

    @property
    def empty(self) -> bool:
        return len(self.futures) == 0

    def _ensure_thread_pool(self) -> ThreadPoolExecutor:
        # lazily start or return thread pool
        if self._thread_pool:
            return self._thread_pool

        self._thread_pool = ThreadPoolExecutor(
            self.workers, thread_name_prefix=Container.thread_pool_prefix() + "threads"
        )
        return self._thread_pool

    def _ensure_async_pool(self) -> asyncio.AbstractEventLoop:
        # lazily create async event loop running in a separate daemon thread
        if self._async_pool:
            return self._async_pool

        def start_background_loop(loop: asyncio.AbstractEventLoop) -> None:
            asyncio.set_event_loop(loop)
            loop.run_forever()

        self._async_pool = asyncio.new_event_loop()
        self._async_pool_thread = Thread(
            target=start_background_loop,
            args=(self._async_pool,),
            daemon=True,
            name=Container.thread_pool_prefix() + "futures",
        )
        self._async_pool_thread.start()

        # start or return async pool
        return self._async_pool

    def _vacate_slot(self, _: TItemFuture) -> None:
        # Used as callback to free up slot when future is done
        self.used_slots -= 1

    @overload
    def submit(
        self, pipe_item: ResolvablePipeItem, block: Literal[False]
    ) -> Optional[TItemFuture]: ...

    @overload
    def submit(self, pipe_item: ResolvablePipeItem, block: Literal[True]) -> TItemFuture: ...

    def submit(self, pipe_item: ResolvablePipeItem, block: bool = False) -> Optional[TItemFuture]:
        """Submit an item to the pool.

        Args:
            pipe_item: The item to submit.
            block: If True, block until there's a free slot in the pool.

        Returns:
            The future if the item was successfully submitted, otherwise None.

        Raises:
            ValueError: If the item is neither awaitable nor callable.
        """

        # Sanity check, negative free slots means there's a bug somewhere
        assert self.free_slots >= 0, "Worker pool has negative free slots, this should never happen"

        if self.free_slots == 0:
            if block:
                # Wait until some future is completed to ensure there's a free slot
                # Note: This is probably not thread safe. If ever multiple threads will be submitting
                # jobs to the pool, we need to change this whole method to be inside a `threading.Lock`
                self._wait_for_free_slot()
            else:
                return None

        future: Optional[TItemFuture] = None

        # Coroutines go to the background event loop, plain callables to the thread pool
        item = pipe_item.item
        if isinstance(item, Awaitable):
            future = asyncio.run_coroutine_threadsafe(item, self._ensure_async_pool())
        elif callable(item):
            future = self._ensure_thread_pool().submit(item)
        else:
            raise ValueError(f"Unsupported item type: {type(item)}")

        # Future is not removed from self.futures until it's been consumed by the
        # pipe iterator. But we always want to vacate a slot so new jobs can be submitted
        future.add_done_callback(self._vacate_slot)
        self.used_slots += 1

        self.futures[future] = FuturePipeItem(
            future, pipe_item.step, pipe_item.pipe, pipe_item.meta
        )
        return future

    def poll(self) -> None:
        """Sleep for the configured poll interval (signal-aware sleep)."""
        sleep(self.poll_interval)

    def _resolve_future(self, future: TItemFuture) -> Optional[ResolvablePipeItem]:
        """Remove `future` from the pool and convert its result into a pipe item.

        Returns None when the future produced no item (StopAsyncIteration or a
        None result). Known dlt exceptions are re-raised as-is; anything else
        is wrapped in ResourceExtractionError.
        """
        future, step, pipe, meta = self.futures.pop(future)

        if ex := future.exception():
            if isinstance(ex, StopAsyncIteration):
                return None
            # Raise if any future fails
            if isinstance(
                ex, (PipelineException, ExtractorException, DltSourceException, PipeException)
            ):
                raise ex
            raise ResourceExtractionError(pipe.name, future, str(ex), "future") from ex

        item = future.result()

        if item is None:
            return None
        elif isinstance(item, DataItemWithMeta):
            return ResolvablePipeItem(item.data, step, pipe, item.meta)
        else:
            return ResolvablePipeItem(item, step, pipe, meta)

    def _next_done_future(self) -> Optional[TItemFuture]:
        """Get the done future in the pool (if any). This does not block."""
        return next((fut for fut in self.futures if fut.done() and not fut.cancelled()), None)

    def resolve_next_future(self) -> Optional[ResolvablePipeItem]:
        """Block until the next future is done and return the result. Returns None if no futures done."""
        if not self.futures:
            return None

        if (future := self._next_done_future()) is not None:
            # When there are multiple already done futures from the same pipe we return results in insertion order
            return self._resolve_future(future)
        for future in as_completed(self.futures):
            if future.cancelled():
                # Get the next not-cancelled future
                continue

            return self._resolve_future(future)

        return None

    def resolve_next_future_no_wait(self) -> Optional[ResolvablePipeItem]:
        """Resolve the first done future in the pool.
        This does not block and returns None if no future is done.
        """
        # Reuse the non-blocking done-future lookup instead of duplicating it inline
        if (future := self._next_done_future()) is None:
            return None

        return self._resolve_future(future)

    def _wait_for_free_slot(self) -> None:
        """Wait until any future in the pool is completed to ensure there's a free slot."""
        if self.free_slots >= 1:
            return

        for future in as_completed(self.futures):
            if future.cancelled():
                # Get the next not-cancelled future
                continue
            if self.free_slots == 0:
                # Future was already completed so slot was not freed
                # NOTE(review): the `_vacate_slot` done-callback may run slightly after
                # `as_completed` yields the future; confirm a slot is always freed
                # before this loop exhausts all futures
                continue
            return  # Return when first future completes

    def close(self) -> None:
        """Cancel pending futures and shut down both pools, clearing the future map."""
        # Cancel all futures
        for f in self.futures:
            if not f.done():
                f.cancel()

        def stop_background_loop(loop: asyncio.AbstractEventLoop) -> None:
            loop.stop()

        if self._async_pool:
            # wait for all async generators to be closed
            future = asyncio.run_coroutine_threadsafe(
                self._async_pool.shutdown_asyncgens(), self._ensure_async_pool()
            )

            wait_for_futures([future])
            self._async_pool.call_soon_threadsafe(stop_background_loop, self._async_pool)

            self._async_pool_thread.join()
            self._async_pool = None
            self._async_pool_thread = None

        if self._thread_pool:
            self._thread_pool.shutdown(wait=True)
            self._thread_pool = None

        self.futures.clear()
18 changes: 17 additions & 1 deletion dlt/extract/decorators.py
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,7 @@ def resource(
table_format: TTableHintTemplate[TTableFormat] = None,
selected: bool = True,
spec: Type[BaseConfiguration] = None,
parallelized: bool = False,
) -> DltResource: ...


Expand All @@ -290,6 +291,7 @@ def resource(
table_format: TTableHintTemplate[TTableFormat] = None,
selected: bool = True,
spec: Type[BaseConfiguration] = None,
parallelized: bool = False,
) -> Callable[[Callable[TResourceFunParams, Any]], DltResource]: ...


Expand All @@ -307,6 +309,7 @@ def resource(
table_format: TTableHintTemplate[TTableFormat] = None,
selected: bool = True,
spec: Type[BaseConfiguration] = None,
parallelized: bool = False,
standalone: Literal[True] = True,
) -> Callable[[Callable[TResourceFunParams, Any]], Callable[TResourceFunParams, DltResource]]: ...

Expand All @@ -325,6 +328,7 @@ def resource(
table_format: TTableHintTemplate[TTableFormat] = None,
selected: bool = True,
spec: Type[BaseConfiguration] = None,
parallelized: bool = False,
) -> DltResource: ...


Expand All @@ -341,6 +345,7 @@ def resource(
table_format: TTableHintTemplate[TTableFormat] = None,
selected: bool = True,
spec: Type[BaseConfiguration] = None,
parallelized: bool = False,
standalone: bool = False,
data_from: TUnboundDltResource = None,
) -> Any:
Expand Down Expand Up @@ -399,6 +404,8 @@ def resource(

data_from (TUnboundDltResource, optional): Allows to pipe data from one resource to another to build multi-step pipelines.

parallelized (bool, optional): If `True`, the resource generator will be extracted in parallel with other resources. Defaults to `False`.

Raises:
ResourceNameMissing: indicates that name of the resource cannot be inferred from the `data` being passed.
InvalidResourceDataType: indicates that the `data` argument cannot be converted into `dlt resource`
Expand All @@ -419,7 +426,7 @@ def make_resource(
schema_contract=schema_contract,
table_format=table_format,
)
return DltResource.from_data(
resource = DltResource.from_data(
_data,
_name,
_section,
Expand All @@ -428,6 +435,9 @@ def make_resource(
cast(DltResource, data_from),
incremental=incremental,
)
if parallelized:
return resource.parallelize()
return resource

def decorator(
f: Callable[TResourceFunParams, Any]
Expand Down Expand Up @@ -537,6 +547,7 @@ def transformer(
merge_key: TTableHintTemplate[TColumnNames] = None,
selected: bool = True,
spec: Type[BaseConfiguration] = None,
parallelized: bool = False,
) -> Callable[[Callable[Concatenate[TDataItem, TResourceFunParams], Any]], DltResource]: ...


Expand All @@ -553,6 +564,7 @@ def transformer(
merge_key: TTableHintTemplate[TColumnNames] = None,
selected: bool = True,
spec: Type[BaseConfiguration] = None,
parallelized: bool = False,
standalone: Literal[True] = True,
) -> Callable[
[Callable[Concatenate[TDataItem, TResourceFunParams], Any]],
Expand All @@ -573,6 +585,7 @@ def transformer(
merge_key: TTableHintTemplate[TColumnNames] = None,
selected: bool = True,
spec: Type[BaseConfiguration] = None,
parallelized: bool = False,
) -> DltResource: ...


Expand All @@ -589,6 +602,7 @@ def transformer(
merge_key: TTableHintTemplate[TColumnNames] = None,
selected: bool = True,
spec: Type[BaseConfiguration] = None,
parallelized: bool = False,
standalone: Literal[True] = True,
) -> Callable[TResourceFunParams, DltResource]: ...

Expand All @@ -605,6 +619,7 @@ def transformer(
merge_key: TTableHintTemplate[TColumnNames] = None,
selected: bool = True,
spec: Type[BaseConfiguration] = None,
parallelized: bool = False,
standalone: bool = False,
) -> Any:
"""A form of `dlt resource` that takes input from other resources via `data_from` argument in order to enrich or transform the data.
Expand Down Expand Up @@ -679,6 +694,7 @@ def transformer(
spec=spec,
standalone=standalone,
data_from=data_from,
parallelized=parallelized,
)


Expand Down
7 changes: 3 additions & 4 deletions dlt/extract/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -144,15 +144,14 @@ def __init__(self, resource_name: str, item: Any, _typ: Type[Any], msg: str) ->
)


class InvalidResourceDataTypeAsync(InvalidResourceDataType):
class InvalidParallelResourceDataType(InvalidResourceDataType):
    """Raised when parallelization is requested for resource data that is not a generator or generator function."""

    def __init__(self, resource_name: str, item: Any, _typ: Type[Any]) -> None:
        # Pass a single message argument to the base class; the view above
        # contained both the old and new message strings (a diff artifact)
        # which would have produced an extra positional argument.
        super().__init__(
            resource_name,
            item,
            _typ,
            "Parallel resource data must be a generator or a generator function. The provided"
            f" data type for resource '{resource_name}' was {_typ.__name__}.",
        )


Expand Down
Loading
Loading