Skip to content

Commit

Permalink
Parallelize decorator (#965)
Browse files Browse the repository at this point in the history
* Refactoring of pipe thread pool and reduce polling

* Decorator

* Tests for parallel

* Include items module

* Separate modules for pipe and pipe_iterator

* Overload arg order

* Keep parenthesis

* Use assert for sanity check

* Parallelize resource method, handle transformers

* Handle non-iterator transformers

* Rename WorkerPool -> FuturesPool, small cleanups

* Fix transformer, test bare generator

* Source parallelize skip invalid resources, docstring

* Handle wrapped generator function

* Don't test exec order, only check that multiple threads are used

* Poll futures with timeout, no check sources_count, test gen.close()

* Always block when submitting futures, remove redundant submit future

* adds additional sleep when futures pool is empty

* Update docs and snippets

* log the daemon-thread signals message instead of printing it

---------

Co-authored-by: Marcin Rudolf <[email protected]>
  • Loading branch information
steinitzu and rudolfix authored Mar 1, 2024
1 parent b622e17 commit 4a3d6ab
Show file tree
Hide file tree
Showing 26 changed files with 1,330 additions and 736 deletions.
6 changes: 5 additions & 1 deletion dlt/common/runtime/signals.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,5 +64,9 @@ def delayed_signals() -> Iterator[None]:
signal.signal(signal.SIGINT, original_sigint_handler)
signal.signal(signal.SIGTERM, original_sigterm_handler)
else:
print("Running in daemon thread, signals not enabled")
if not TYPE_CHECKING:
from dlt.common.runtime import logger
else:
logger: Any = None
logger.info("Running in daemon thread, signals not enabled")
yield
5 changes: 5 additions & 0 deletions dlt/common/typing.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
Union,
runtime_checkable,
IO,
Iterator,
Generator,
)

from typing_extensions import (
Expand Down Expand Up @@ -69,6 +71,9 @@
AnyFun: TypeAlias = Callable[..., Any]
TFun = TypeVar("TFun", bound=AnyFun) # any function
TAny = TypeVar("TAny", bound=Any)
TAnyFunOrGenerator = TypeVar(
"TAnyFunOrGenerator", AnyFun, Generator[Any, Optional[Any], Optional[Any]]
)
TAnyClass = TypeVar("TAnyClass", bound=object)
TimedeltaSeconds = Union[int, float, timedelta]
# represent secret value ie. coming from Kubernetes/Docker secrets or other providers
Expand Down
238 changes: 238 additions & 0 deletions dlt/extract/concurrency.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,238 @@
import asyncio
from concurrent.futures import (
ThreadPoolExecutor,
as_completed,
wait as wait_for_futures,
)
from threading import Thread
from typing import Awaitable, Dict, Optional

from dlt.common.exceptions import PipelineException
from dlt.common.configuration.container import Container
from dlt.common.runtime.signals import sleep
from dlt.extract.typing import DataItemWithMeta, TItemFuture
from dlt.extract.items import ResolvablePipeItem, FuturePipeItem

from dlt.extract.exceptions import (
DltSourceException,
ExtractorException,
PipeException,
ResourceExtractionError,
)


class FuturesPool:
    """Worker pool for pipe items that can be resolved asynchronously.

    Items can be either asyncio coroutines or regular callables which will be executed in a
    thread pool. Coroutines are scheduled on a dedicated asyncio event loop running in a
    daemon thread; callables go to a `ThreadPoolExecutor`. Both pools are created lazily on
    first use. The pool caps concurrent work via `max_parallel_items` slots: a slot is
    vacated as soon as a future completes (via a done callback), even though the future
    itself stays in `self.futures` until its result is consumed.

    NOTE(review): the slot bookkeeping in `submit` is not guarded by a lock — the inline
    comment below assumes a single submitting thread; confirm callers respect that.
    """

    def __init__(
        self, workers: int = 5, poll_interval: float = 0.01, max_parallel_items: int = 20
    ) -> None:
        """Create the pool.

        Args:
            workers: Max threads in the lazily-created `ThreadPoolExecutor` for callables.
            poll_interval: Sleep/timeout granularity (seconds) used by `sleep()` and by
                `resolve_next_future(use_configured_timeout=True)`.
            max_parallel_items: Max number of simultaneously running (not-yet-done) items.
        """
        # Maps each submitted future to its (future, step, pipe, meta) context so the
        # result can be turned back into a ResolvablePipeItem later.
        self.futures: Dict[TItemFuture, FuturePipeItem] = {}
        # Both executors are created lazily by the _ensure_* methods below.
        self._thread_pool: ThreadPoolExecutor = None
        self._async_pool: asyncio.AbstractEventLoop = None
        self._async_pool_thread: Thread = None
        self.workers = workers
        self.poll_interval = poll_interval
        self.max_parallel_items = max_parallel_items
        # Count of occupied slots; decremented by _vacate_slot when a future completes.
        self.used_slots: int = 0

    def __len__(self) -> int:
        # Number of futures still tracked (including done ones not yet consumed).
        return len(self.futures)

    @property
    def free_slots(self) -> int:
        # Done futures don't count as slots, so we can still add futures
        return self.max_parallel_items - self.used_slots

    @property
    def empty(self) -> bool:
        # True when no futures are tracked at all (done or pending).
        return len(self.futures) == 0

    def _ensure_thread_pool(self) -> ThreadPoolExecutor:
        # lazily start or return thread pool
        if self._thread_pool:
            return self._thread_pool

        self._thread_pool = ThreadPoolExecutor(
            self.workers, thread_name_prefix=Container.thread_pool_prefix() + "threads"
        )
        return self._thread_pool

    def _ensure_async_pool(self) -> asyncio.AbstractEventLoop:
        # lazily create async pool in a separate (daemon) thread
        if self._async_pool:
            return self._async_pool

        def start_background_loop(loop: asyncio.AbstractEventLoop) -> None:
            # Runs in the daemon thread: pin the loop to this thread and run it until
            # close() schedules loop.stop().
            asyncio.set_event_loop(loop)
            loop.run_forever()

        self._async_pool = asyncio.new_event_loop()
        self._async_pool_thread = Thread(
            target=start_background_loop,
            args=(self._async_pool,),
            daemon=True,  # must not block interpreter shutdown if close() is never called
            name=Container.thread_pool_prefix() + "futures",
        )
        self._async_pool_thread.start()

        # start or return async pool
        return self._async_pool

    def _vacate_slot(self, _: TItemFuture) -> None:
        # Used as callback to free up slot when future is done
        self.used_slots -= 1

    def submit(self, pipe_item: ResolvablePipeItem) -> TItemFuture:
        """Submit an item to the pool.

        Blocks (via `_wait_for_free_slot`) when all `max_parallel_items` slots are taken.

        Args:
            pipe_item: The pipe item to submit. `pipe_item.item` must be either an asyncio
                coroutine (scheduled on the background event loop) or a callable (run in
                the thread pool).

        Returns:
            The resulting future object

        Raises:
            ValueError: If `pipe_item.item` is neither awaitable nor callable.
        """

        # Sanity check, negative free slots means there's a bug somewhere
        # NOTE(review): `assert` is stripped under `python -O`; acceptable here only as an
        # internal invariant check, not input validation.
        assert self.free_slots >= 0, "Worker pool has negative free slots, this should never happen"

        if self.free_slots == 0:
            # Wait until some future is completed to ensure there's a free slot
            # Note: This is probably not thread safe. If ever multiple threads will be submitting
            # jobs to the pool, we need to change this whole method to be inside a `threading.Lock`
            self._wait_for_free_slot()

        future: Optional[TItemFuture] = None

        # submit to thread pool or async pool
        item = pipe_item.item
        if isinstance(item, Awaitable):
            # Scheduled thread-safely onto the background loop; returns a
            # concurrent.futures.Future usable alongside thread-pool futures.
            future = asyncio.run_coroutine_threadsafe(item, self._ensure_async_pool())
        elif callable(item):
            future = self._ensure_thread_pool().submit(item)
        else:
            raise ValueError(f"Unsupported item type: {type(item)}")

        # Future is not removed from self.futures until it's been consumed by the
        # pipe iterator. But we always want to vacate a slot so new jobs can be submitted
        future.add_done_callback(self._vacate_slot)
        self.used_slots += 1

        self.futures[future] = FuturePipeItem(
            future, pipe_item.step, pipe_item.pipe, pipe_item.meta
        )
        return future

    def sleep(self) -> None:
        # Signal-aware sleep for one poll interval (dlt.common.runtime.signals.sleep).
        sleep(self.poll_interval)

    def _resolve_future(self, future: TItemFuture) -> Optional[ResolvablePipeItem]:
        """Pop a (done) future from tracking and convert its result to a pipe item.

        Returns None when the future raised `StopAsyncIteration` (async generator
        exhausted) or produced a None result. Re-raises known dlt exceptions as-is;
        wraps anything else in `ResourceExtractionError`.
        """
        future, step, pipe, meta = self.futures.pop(future)

        if ex := future.exception():
            if isinstance(ex, StopAsyncIteration):
                return None
            # Raise if any future fails
            if isinstance(
                ex, (PipelineException, ExtractorException, DltSourceException, PipeException)
            ):
                raise ex
            raise ResourceExtractionError(pipe.name, future, str(ex), "future") from ex

        item = future.result()

        if item is None:
            return None
        elif isinstance(item, DataItemWithMeta):
            # Item carries its own meta which overrides the meta stored at submit time.
            return ResolvablePipeItem(item.data, step, pipe, item.meta)
        else:
            return ResolvablePipeItem(item, step, pipe, meta)

    def _next_done_future(self) -> Optional[TItemFuture]:
        """Get the done future in the pool (if any). This does not block."""
        # Relies on dict insertion order so already-done futures resolve FIFO.
        return next((fut for fut in self.futures if fut.done() and not fut.cancelled()), None)

    def resolve_next_future(
        self, use_configured_timeout: bool = False
    ) -> Optional[ResolvablePipeItem]:
        """Block until the next future is done and return the result. Returns None if no futures done.

        Args:
            use_configured_timeout: If True, use the value of `self.poll_interval` as the max wait time,
                raises `concurrent.futures.TimeoutError` if no future is done within that time.

        Returns:
            The resolved future item or None if no future is done.
        """
        if not self.futures:
            return None

        if (future := self._next_done_future()) is not None:
            # When there are multiple already done futures from the same pipe we return results in insertion order
            return self._resolve_future(future)
        for future in as_completed(
            self.futures, timeout=self.poll_interval if use_configured_timeout else None
        ):
            if future.cancelled():
                # Get the next not-cancelled future
                continue

            # Resolve only the first completed, non-cancelled future per call.
            return self._resolve_future(future)

        return None

    def resolve_next_future_no_wait(self) -> Optional[ResolvablePipeItem]:
        """Resolve the first done future in the pool.

        This does not block and returns None if no future is done.
        """
        # Get next done future
        future = self._next_done_future()
        if not future:
            return None

        return self._resolve_future(future)

    def _wait_for_free_slot(self) -> None:
        """Wait until any future in the pool is completed to ensure there's a free slot."""
        if self.free_slots >= 1:
            return

        for future in as_completed(self.futures):
            if future.cancelled():
                # Get the next not-cancelled future
                continue
            if self.free_slots == 0:
                # Future was already completed so slot was not freed
                # (its _vacate_slot callback ran before this wait started)
                continue
            return  # Return when first future completes

    def close(self) -> None:
        """Cancel outstanding work and tear down both pools.

        Cancels pending futures, shuts down async generators and stops the background
        event loop (joining its thread), shuts down the thread pool, and clears tracking.
        Already-running futures cannot be cancelled and are awaited by the pool shutdowns.
        """
        # Cancel all futures
        for f in self.futures:
            if not f.done():
                f.cancel()

        def stop_background_loop(loop: asyncio.AbstractEventLoop) -> None:
            loop.stop()

        if self._async_pool:
            # wait for all async generators to be closed
            future = asyncio.run_coroutine_threadsafe(
                self._async_pool.shutdown_asyncgens(), self._ensure_async_pool()
            )

            wait_for_futures([future])
            # Stop the loop from within its own thread, then join the daemon thread.
            self._async_pool.call_soon_threadsafe(stop_background_loop, self._async_pool)

            self._async_pool_thread.join()
            self._async_pool = None
            self._async_pool_thread = None

        if self._thread_pool:
            self._thread_pool.shutdown(wait=True)
            self._thread_pool = None

        self.futures.clear()
18 changes: 17 additions & 1 deletion dlt/extract/decorators.py
Original file line number Diff line number Diff line change
Expand Up @@ -301,6 +301,7 @@ def resource(
table_format: TTableHintTemplate[TTableFormat] = None,
selected: bool = True,
spec: Type[BaseConfiguration] = None,
parallelized: bool = False,
) -> DltResource: ...


Expand All @@ -318,6 +319,7 @@ def resource(
table_format: TTableHintTemplate[TTableFormat] = None,
selected: bool = True,
spec: Type[BaseConfiguration] = None,
parallelized: bool = False,
) -> Callable[[Callable[TResourceFunParams, Any]], DltResource]: ...


Expand All @@ -335,6 +337,7 @@ def resource(
table_format: TTableHintTemplate[TTableFormat] = None,
selected: bool = True,
spec: Type[BaseConfiguration] = None,
parallelized: bool = False,
standalone: Literal[True] = True,
) -> Callable[[Callable[TResourceFunParams, Any]], Callable[TResourceFunParams, DltResource]]: ...

Expand All @@ -353,6 +356,7 @@ def resource(
table_format: TTableHintTemplate[TTableFormat] = None,
selected: bool = True,
spec: Type[BaseConfiguration] = None,
parallelized: bool = False,
) -> DltResource: ...


Expand All @@ -369,6 +373,7 @@ def resource(
table_format: TTableHintTemplate[TTableFormat] = None,
selected: bool = True,
spec: Type[BaseConfiguration] = None,
parallelized: bool = False,
standalone: bool = False,
data_from: TUnboundDltResource = None,
) -> Any:
Expand Down Expand Up @@ -427,6 +432,8 @@ def resource(
data_from (TUnboundDltResource, optional): Allows to pipe data from one resource to another to build multi-step pipelines.
parallelized (bool, optional): If `True`, the resource generator will be extracted in parallel with other resources. Defaults to `False`.
Raises:
ResourceNameMissing: indicates that name of the resource cannot be inferred from the `data` being passed.
InvalidResourceDataType: indicates that the `data` argument cannot be converted into `dlt resource`
Expand All @@ -447,7 +454,7 @@ def make_resource(
schema_contract=schema_contract,
table_format=table_format,
)
return DltResource.from_data(
resource = DltResource.from_data(
_data,
_name,
_section,
Expand All @@ -456,6 +463,9 @@ def make_resource(
cast(DltResource, data_from),
incremental=incremental,
)
if parallelized:
return resource.parallelize()
return resource

def decorator(
f: Callable[TResourceFunParams, Any]
Expand Down Expand Up @@ -565,6 +575,7 @@ def transformer(
merge_key: TTableHintTemplate[TColumnNames] = None,
selected: bool = True,
spec: Type[BaseConfiguration] = None,
parallelized: bool = False,
) -> Callable[[Callable[Concatenate[TDataItem, TResourceFunParams], Any]], DltResource]: ...


Expand All @@ -581,6 +592,7 @@ def transformer(
merge_key: TTableHintTemplate[TColumnNames] = None,
selected: bool = True,
spec: Type[BaseConfiguration] = None,
parallelized: bool = False,
standalone: Literal[True] = True,
) -> Callable[
[Callable[Concatenate[TDataItem, TResourceFunParams], Any]],
Expand All @@ -601,6 +613,7 @@ def transformer(
merge_key: TTableHintTemplate[TColumnNames] = None,
selected: bool = True,
spec: Type[BaseConfiguration] = None,
parallelized: bool = False,
) -> DltResource: ...


Expand All @@ -617,6 +630,7 @@ def transformer(
merge_key: TTableHintTemplate[TColumnNames] = None,
selected: bool = True,
spec: Type[BaseConfiguration] = None,
parallelized: bool = False,
standalone: Literal[True] = True,
) -> Callable[TResourceFunParams, DltResource]: ...

Expand All @@ -633,6 +647,7 @@ def transformer(
merge_key: TTableHintTemplate[TColumnNames] = None,
selected: bool = True,
spec: Type[BaseConfiguration] = None,
parallelized: bool = False,
standalone: bool = False,
) -> Any:
"""A form of `dlt resource` that takes input from other resources via `data_from` argument in order to enrich or transform the data.
Expand Down Expand Up @@ -707,6 +722,7 @@ def transformer(
spec=spec,
standalone=standalone,
data_from=data_from,
parallelized=parallelized,
)


Expand Down
Loading

0 comments on commit 4a3d6ab

Please sign in to comment.