Skip to content

Commit

Permalink
remove locks from generator wrappers
Browse files Browse the repository at this point in the history
  • Loading branch information
sh-rp committed Jan 24, 2024
1 parent a7bf8e0 commit 3c047a2
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 22 deletions.
23 changes: 13 additions & 10 deletions dlt/extract/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,24 +136,27 @@ def wrap_async_generator(
) -> Generator[Awaitable[TDataItems], None, None]:
"""Wraps an async generator into a generator of awaitables"""
exhausted = False
lock = asyncio.Lock()
busy = False

# creates an awaitable that will return the next item from the async generator
async def run() -> TDataItems:
async with lock:
try:
return await gen.__anext__()
# on stop iteration mark as exhausted
except StopAsyncIteration:
nonlocal exhausted
exhausted = True
raise
try:
return await gen.__anext__()
# on stop iteration mark as exhausted
except StopAsyncIteration:
nonlocal exhausted
exhausted = True
raise
finally:
nonlocal busy
busy = False

# this generator yields None while the async generator is not exhausted
try:
while not exhausted:
while lock.locked():
while busy:
yield None
busy = True
yield run()
except GeneratorExit:
# clean up async generator
Expand Down
29 changes: 17 additions & 12 deletions tests/pipeline/test_resources_evaluation.py
Original file line number Diff line number Diff line change
Expand Up @@ -195,12 +195,13 @@ def test_async_decorator_experiment(parallelized) -> None:
threads = set()

def parallelize(f) -> Any:
exhausted = False
lock = threading.Lock()

"""converts regular iterable to generator of functions that can be run in parallel in the pipe"""

@wraps(f)
def _wrap(*args: Any, **kwargs: Any) -> Any:
exhausted = False
busy = False

gen = f(*args, **kwargs)
# unpack generator
if inspect.isfunction(gen):
Expand All @@ -212,18 +213,22 @@ def _wrap(*args: Any, **kwargs: Any) -> Any:
# get next item from generator
def _gen():
nonlocal exhausted
with lock:
# await asyncio.sleep(0.1)
try:
return next(gen)
# on stop iteration mark as exhausted
except StopIteration:
exhausted = True
return None
# await asyncio.sleep(0.1)
try:
return next(gen)
# on stop iteration mark as exhausted
except StopIteration:
exhausted = True
return None
finally:
nonlocal busy
busy = False

try:
while not exhausted:
while lock.locked():
while busy:
yield None
busy = True
yield _gen
except GeneratorExit:
# clean up inner generator
Expand Down

0 comments on commit 3c047a2

Please sign in to comment.