Skip to content

Commit

Permalink
feat: as_completed static type checking (#98)
Browse files Browse the repository at this point in the history
* feat: proper static type checking for as_completed

* chore: add gather docstrings

* chore: add as_completed docstrings
  • Loading branch information
BobTheBuidler authored Oct 29, 2023
1 parent dfe0922 commit 1a37bd2
Show file tree
Hide file tree
Showing 2 changed files with 139 additions and 9 deletions.
93 changes: 84 additions & 9 deletions a_sync/utils/as_completed.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@

import asyncio
from typing import (Any, AsyncIterator, Awaitable, Iterable, Iterator, Mapping,
Optional, Tuple, TypeVar, Union, overload)
from typing import (Any, AsyncIterator, Awaitable, Coroutine, Iterable,
Iterator, Literal, Mapping, Optional, Tuple, TypeVar,
Union, overload)

try:
from tqdm.asyncio import tqdm_asyncio
Expand All @@ -17,18 +18,64 @@ def as_completed(*args, **kwargs):
VT = TypeVar('VT')

@overload
def as_completed(fs: Mapping[KT, Awaitable[VT]], *, timeout: Optional[float], return_exceptions: bool, aiter = False, tqdm: bool, **tqdm_kwargs: Any) -> Iterator[Awaitable[Tuple[KT, VT]]]:
def as_completed(fs: Iterable[Awaitable[T]], *, timeout: Optional[float] = None, return_exceptions: bool = False, aiter: Literal[False] = False, tqdm: bool = False, **tqdm_kwargs: Any) -> Iterator[Coroutine[None, None, T]]:
...
@overload
def as_completed(fs: Iterable[Awaitable[T]], *, timeout: Optional[float], return_exceptions: bool, aiter = False, tqdm: bool, **tqdm_kwargs: Any) -> Iterator[Awaitable[T]]:
def as_completed(fs: Iterable[Awaitable[T]], *, timeout: Optional[float] = None, return_exceptions: bool = False, aiter: Literal[True] = True, tqdm: bool = False, **tqdm_kwargs: Any) -> ASyncIterator[T]:
...
@overload
def as_completed(fs: Mapping[KT, Awaitable[VT]], *, timeout: Optional[float], return_exceptions: bool, aiter = True, tqdm: bool, **tqdm_kwargs: Any) -> ASyncIterator[Tuple[KT, VT]]:
def as_completed(fs: Mapping[KT, Awaitable[VT]], *, timeout: Optional[float] = None, return_exceptions: bool = False, aiter: Literal[False] = False, tqdm: bool = False, **tqdm_kwargs: Any) -> Iterator[Coroutine[None, None, Tuple[KT, VT]]]:
...
@overload
def as_completed(fs: Iterable[Awaitable[T]], *, timeout: Optional[float], return_exceptions: bool, aiter = True, tqdm: bool, **tqdm_kwargs: Any) -> ASyncIterator[T]:
def as_completed(fs: Mapping[KT, Awaitable[VT]], *, timeout: Optional[float] = None, return_exceptions: bool = False, aiter: Literal[True] = True, tqdm: bool = False, **tqdm_kwargs: Any) -> ASyncIterator[Tuple[KT, VT]]:
...
def as_completed(fs, *, timeout: Optional[float] = None, return_exceptions: bool = False, aiter: bool = False, tqdm: bool = False, **tqdm_kwargs: Any):
"""
Concurrently awaits a list of awaitable objects or mappings of awaitables and returns an iterator of results.
This function extends Python's asyncio.as_completed, providing additional features for mixed use cases of individual awaitable objects and mappings of awaitables.
Differences from asyncio.as_completed:
- Uses type hints for use with static type checkers.
- Supports either individual awaitables or a k:v mapping of awaitables.
- Can be used as an async iterator which yields the result values. Example below.
- Provides progress reporting using tqdm if 'tqdm' is set to True.
Args:
fs (Iterable[Awaitable[T] or Mapping[KT, Awaitable[VT]]]): The awaitables to await concurrently. It can be a list of individual awaitables or a mapping of awaitables.
timeout (float, optional): The maximum time, in seconds, to wait for the completion of awaitables. Defaults to None (no timeout).
return_exceptions (bool, optional): If True, exceptions are returned as results instead of raising them. Defaults to False.
aiter (bool, optional): If True, returns an async iterator of results. Defaults to False.
tqdm (bool, optional): If True, enables progress reporting using tqdm. Defaults to False.
**tqdm_kwargs: Additional keyword arguments for tqdm if progress reporting is enabled.
Returns:
Iterator[Coroutine[None, None, T] or ASyncIterator[Tuple[KT, VT]]]: An iterator of results when awaiting individual awaitables or an async iterator when awaiting mappings.
Examples:
Awaiting individual awaitables:
```
awaitables = [async_function1(), async_function2()]
for coro in as_completed(awaitables):
val = await coro
...
async for val in as_completed(awaitables, aiter=True):
...
```
Awaiting mappings of awaitables:
```
mapping = {'key1': async_function1(), 'key2': async_function2()}
for coro in as_completed(mapping):
k, v = await coro
...
async for k, v in as_completed(mapping, aiter=True):
...
```
"""
if return_exceptions:
raise NotImplementedError
return (
Expand All @@ -39,12 +86,40 @@ def as_completed(fs, *, timeout: Optional[float] = None, return_exceptions: bool
)

@overload
def as_completed_mapping(mapping: Mapping[KT, Awaitable[VT]], *, timeout: Optional[float] = None, return_exceptions: bool = False, aiter = True, tqdm: bool, **tqdm_kwargs: Any) -> ASyncIterator[Tuple[KT, VT]]:
def as_completed_mapping(mapping: Mapping[KT, Awaitable[VT]], *, timeout: Optional[float] = None, return_exceptions: bool = False, aiter: Literal[True] = True, tqdm: bool = False, **tqdm_kwargs: Any) -> ASyncIterator[Tuple[KT, VT]]:
...
@overload
def as_completed_mapping(mapping: Mapping[KT, Awaitable[VT]], *, timeout: Optional[float] = None, return_exceptions: bool = False, aiter = False, tqdm: bool, **tqdm_kwargs: Any) -> Iterator[Awaitable[Tuple[KT, VT]]]:
def as_completed_mapping(mapping: Mapping[KT, Awaitable[VT]], *, timeout: Optional[float] = None, return_exceptions: bool = False, aiter: Literal[False] = False, tqdm: bool = False, **tqdm_kwargs: Any) -> Iterator[Coroutine[None, None, Tuple[KT, VT]]]:
...
def as_completed_mapping(mapping: Mapping[KT, Awaitable[VT]], *, timeout: Optional[float] = None, return_exceptions: bool = False, aiter: bool = False, tqdm: bool = False, **tqdm_kwargs: Any) -> Union[Iterator[Awaitable[Tuple[KT, VT]]], ASyncIterator[Tuple[KT, VT]]]:
def as_completed_mapping(mapping: Mapping[KT, Awaitable[VT]], *, timeout: Optional[float] = None, return_exceptions: bool = False, aiter: bool = False, tqdm: bool = False, **tqdm_kwargs: Any) -> Union[Iterator[Coroutine[None, None, Tuple[KT, VT]]], ASyncIterator[Tuple[KT, VT]]]:
"""
Concurrently awaits a mapping of awaitable objects and returns an iterator or async iterator of results.
This function is designed to await a mapping of awaitable objects, where each key-value pair represents a unique awaitable. It enables concurrent execution and gathers results into an iterator or an async iterator.
Args:
mapping (Mapping[KT, Awaitable[VT]]): A dictionary-like object where keys are of type KT and values are awaitable objects of type VT.
timeout (float, optional): The maximum time, in seconds, to wait for the completion of awaitables. Defaults to None (no timeout).
return_exceptions (bool, optional): If True, exceptions are returned as results instead of raising them. Defaults to False.
aiter (bool, optional): If True, returns an async iterator of results. Defaults to False.
tqdm (bool, optional): If True, enables progress reporting using tqdm. Defaults to False.
**tqdm_kwargs: Additional keyword arguments for tqdm if progress reporting is enabled.
Returns:
Union[Iterator[Coroutine[None, None, Tuple[KT, VT]]] or ASyncIterator[Tuple[KT, VT]]]: An iterator of results or an async iterator when awaiting mappings.
Example:
```
mapping = {'key1': async_function1(), 'key2': async_function2()}
for coro in as_completed_mapping(mapping):
k, v = await coro
...
async for k, v in as_completed_mapping(mapping, aiter=True):
...
```
"""
return as_completed([__mapping_wrap(k, v) for k, v in mapping.items()], timeout=timeout, return_exceptions=return_exceptions, aiter=aiter, tqdm=tqdm, **tqdm_kwargs)

async def __yield_as_completed(futs: Iterable[Awaitable[T]], *, timeout: Optional[float] = None, return_exceptions: bool = False, tqdm: bool = False, **tqdm_kwargs: Any) -> AsyncIterator[T]:
Expand Down
55 changes: 55 additions & 0 deletions a_sync/utils/gather.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,68 @@ async def gather(*awaitables: Mapping[KT, Awaitable[VT]], return_exceptions: boo
async def gather(*awaitables: Awaitable[T], return_exceptions: bool = False, tqdm: bool = False, **tqdm_kwargs: Any) -> List[T]:
...
async def gather(*awaitables: Union[Awaitable[T], Mapping[KT, Awaitable[VT]]], return_exceptions: bool = False, tqdm: bool = False, **tqdm_kwargs: Any) -> Union[List[T], Dict[KT, VT]]:
"""
Concurrently awaits a list of awaitable objects or mappings of awaitables and returns the results.
This function extends Python's asyncio.gather, providing additional features for mixed use cases of individual awaitable objects and mappings of awaitables.
Differences from asyncio.gather:
- Uses type hints for use with static type checkers.
- Supports gathering either individual awaitables or a k:v mapping of awaitables.
- Provides progress reporting using tqdm if 'tqdm' is set to True.
Args:
*awaitables (Union[Awaitable[T], Mapping[KT, Awaitable[VT]]]): The awaitables to await concurrently. It can be a single awaitable or a mapping of awaitables.
return_exceptions (bool, optional): If True, exceptions are returned as results instead of raising them. Defaults to False.
tqdm (bool, optional): If True, enables progress reporting using tqdm. Defaults to False.
**tqdm_kwargs: Additional keyword arguments for tqdm if progress reporting is enabled.
Returns:
Union[List[T], Dict[KT, VT]]: A list of results when awaiting individual awaitables or a dictionary of results when awaiting mappings.
Examples:
Awaiting individual awaitables:
results will be a list containing the result of each awaitable in sequential order
```
results = await gather(thing1(), thing2())
```
Awaiting mappings of awaitables
results will be a dictionary with 'key1' mapped to the result of thing1() and 'key2' mapped to the result of thing2.
```
mapping = {'key1': thing1(), 'key2': thing2()}
results = await gather(mapping)
```
"""
return await (
gather_mapping(awaitables[0], return_exceptions=return_exceptions, tqdm=tqdm, **tqdm_kwargs) if _is_mapping(awaitables)
else tqdm_asyncio.gather(*awaitables, return_exceptions=return_exceptions, **tqdm_kwargs) if tqdm
else asyncio.gather(*awaitables, return_exceptions=return_exceptions)
)

async def gather_mapping(mapping: Mapping[KT, Awaitable[VT]], return_exceptions: bool = False, tqdm: bool = False, **tqdm_kwargs: Any) -> Dict[KT, VT]:
"""
Concurrently awaits a mapping of awaitable objects and returns a dictionary of results.
This function is designed to await a mapping of awaitable objects, where each key-value pair represents a unique awaitable. It enables concurrent execution and gathers results into a dictionary.
Args:
mapping (Mapping[KT, Awaitable[VT]]): A dictionary-like object where keys are of type KT and values are awaitable objects of type VT.
return_exceptions (bool, optional): If True, exceptions are returned as results instead of raising them. Defaults to False.
tqdm (bool, optional): If True, enables progress reporting using tqdm. Defaults to False.
**tqdm_kwargs: Additional keyword arguments for tqdm if progress reporting is enabled.
Returns:
Dict[KT, VT]: A dictionary with keys corresponding to the keys of the input mapping and values containing the results of the corresponding awaitables.
Example:
The 'results' dictionary will contain the awaited results, where keys match the keys in the 'mapping' and values contain the results of the corresponding awaitables.
```
mapping = {'task1': async_function1(), 'task2': async_function2(), 'task3': async_function3()}
results = await gather_mapping(mapping)
```
"""
results = {k: None for k in mapping.keys()} # return data in same order
async for k, v in as_completed_mapping(mapping, return_exceptions=return_exceptions, aiter=True, tqdm=tqdm, **tqdm_kwargs):
results[k] = v
Expand Down

0 comments on commit 1a37bd2

Please sign in to comment.