diff --git a/cubed/runtime/backup.py b/cubed/runtime/backup.py
index b2a95708..2ad3562b 100644
--- a/cubed/runtime/backup.py
+++ b/cubed/runtime/backup.py
@@ -1,15 +1,18 @@
 import math
+from typing import Dict, TypeVar
+
+T = TypeVar("T")
 
 
 def should_launch_backup(
-    task,
-    now,
-    start_times,
-    end_times,
-    min_tasks=10,
-    min_completed_fraction=0.5,
-    slow_factor=3.0,
-):
+    task: T,
+    now: float,
+    start_times: Dict[T, float],
+    end_times: Dict[T, float],
+    min_tasks: int = 10,
+    min_completed_fraction: float = 0.5,
+    slow_factor: float = 3.0,
+) -> bool:
     """
     Determine whether to launch a backup task.
 
diff --git a/cubed/runtime/executors/lithops.py b/cubed/runtime/executors/lithops.py
index 645d09db..071607d3 100644
--- a/cubed/runtime/executors/lithops.py
+++ b/cubed/runtime/executors/lithops.py
@@ -1,13 +1,31 @@
 import copy
 import logging
 import time
+from typing import (
+    Any,
+    Callable,
+    Dict,
+    Iterable,
+    Iterator,
+    List,
+    Optional,
+    Sequence,
+    Tuple,
+    Union,
+)
 
 from lithops.executors import FunctionExecutor
 from lithops.wait import ALWAYS, ANY_COMPLETED
+from networkx import MultiDiGraph
 
+from cubed.core.array import Callback
 from cubed.core.plan import visit_nodes
 from cubed.runtime.backup import should_launch_backup
-from cubed.runtime.executors.lithops_retries import map_with_retries, wait_with_retries
+from cubed.runtime.executors.lithops_retries import (
+    RetryingFuture,
+    map_with_retries,
+    wait_with_retries,
+)
 from cubed.runtime.types import DagExecutor
 from cubed.runtime.utils import handle_callbacks
 
@@ -20,16 +38,16 @@ def run_func(input, func=None, config=None, name=None):
 
 
 def map_unordered(
-    lithops_function_executor,
-    map_function,
-    map_iterdata,
-    include_modules=[],
-    timeout=None,
-    retries=2,
-    use_backups=False,
-    return_stats=False,
+    lithops_function_executor: FunctionExecutor,
+    map_function: Callable[..., Any],
+    map_iterdata: Iterable[Union[List[Any], Tuple[Any, ...], Dict[str, Any]]],
+    include_modules: List[str] = [],
+    timeout: Optional[int] = None,
+    retries: int = 2,
+    use_backups: bool = False,
+    return_stats: bool = False,
     **kwargs,
-):
+) -> Iterator[Any]:
     """
     Apply a function to items of an input list, yielding results as they are completed
     (which may be different to the input order).
@@ -52,7 +70,7 @@ def map_unordered(
     tasks = {}
     start_times = {}
     end_times = {}
-    backups = {}
+    backups: Dict[RetryingFuture, RetryingFuture] = {}
     pending = []
 
     # can't use functools.partial here as we get an error in lithops
@@ -128,7 +146,13 @@ def map_unordered(
         time.sleep(1)
 
 
-def execute_dag(dag, callbacks=None, array_names=None, resume=None, **kwargs):
+def execute_dag(
+    dag: MultiDiGraph,
+    callbacks: Optional[Sequence[Callback]] = None,
+    array_names: Optional[Sequence[str]] = None,
+    resume: Optional[bool] = None,
+    **kwargs,
+) -> None:
     use_backups = kwargs.pop("use_backups", False)
     with FunctionExecutor(**kwargs) as executor:
         for name, node in visit_nodes(dag, resume=resume):
@@ -168,6 +192,19 @@ class LithopsDagExecutor(DagExecutor):
     def __init__(self, **kwargs):
         self.kwargs = kwargs
 
-    def execute_dag(self, dag, callbacks=None, array_names=None, **kwargs):
+    def execute_dag(
+        self,
+        dag: MultiDiGraph,
+        callbacks: Optional[Sequence[Callback]] = None,
+        array_names: Optional[Sequence[str]] = None,
+        resume: Optional[bool] = None,
+        **kwargs,
+    ) -> None:
         merged_kwargs = {**self.kwargs, **kwargs}
-        execute_dag(dag, callbacks=callbacks, array_names=array_names, **merged_kwargs)
+        execute_dag(
+            dag,
+            callbacks=callbacks,
+            array_names=array_names,
+            resume=resume,
+            **merged_kwargs,
+        )
diff --git a/cubed/runtime/executors/lithops_retries.py b/cubed/runtime/executors/lithops_retries.py
index 570610a1..9306363d 100644
--- a/cubed/runtime/executors/lithops_retries.py
+++ b/cubed/runtime/executors/lithops_retries.py
@@ -14,7 +14,7 @@ class RetryingFuture:
     def __init__(
         self,
        response_future: ResponseFuture,
-        map_function: Callable,
+        map_function: Callable[..., Any],
         input: Any,
         map_kwargs: Any = None,
         retries: Optional[int] = None,
@@ -86,7 +86,7 @@ def result(self, throw_except: bool = True, internal_storage: Any = None):
 
 def map_with_retries(
     function_executor: FunctionExecutor,
-    map_function: Callable,
+    map_function: Callable[..., Any],
     map_iterdata: List[Union[List[Any], Tuple[Any, ...], Dict[str, Any]]],
     timeout: Optional[int] = None,
     include_modules: Optional[List[str]] = [],
diff --git a/cubed/runtime/executors/modal.py b/cubed/runtime/executors/modal.py
index 8c489ec9..6d3541c6 100644
--- a/cubed/runtime/executors/modal.py
+++ b/cubed/runtime/executors/modal.py
@@ -1,11 +1,14 @@
 import os
 import time
 from asyncio.exceptions import TimeoutError
+from typing import Optional, Sequence
 
 import modal
 from modal.exception import ConnectionError
+from networkx import MultiDiGraph
 from tenacity import retry, retry_if_exception_type, stop_after_attempt
 
+from cubed.core.array import Callback
 from cubed.core.plan import visit_nodes
 from cubed.runtime.types import DagExecutor
 from cubed.runtime.utils import execute_with_stats, handle_callbacks
@@ -90,8 +93,13 @@ def run_remotely(self, input, func=None, config=None):
     stop=stop_after_attempt(3),
 )
 def execute_dag(
-    dag, callbacks=None, array_names=None, resume=None, cloud=None, **kwargs
-):
+    dag: MultiDiGraph,
+    callbacks: Optional[Sequence[Callback]] = None,
+    array_names: Optional[Sequence[str]] = None,
+    resume: Optional[bool] = None,
+    cloud: Optional[str] = None,
+    **kwargs,
+) -> None:
     with stub.run():
         cloud = cloud or "aws"
         if cloud == "aws":
@@ -124,6 +132,19 @@ class ModalDagExecutor(DagExecutor):
     def __init__(self, **kwargs):
         self.kwargs = kwargs
 
-    def execute_dag(self, dag, callbacks=None, array_names=None, **kwargs):
+    def execute_dag(
+        self,
+        dag: MultiDiGraph,
+        callbacks: Optional[Sequence[Callback]] = None,
+        array_names: Optional[Sequence[str]] = None,
+        resume: Optional[bool] = None,
+        **kwargs,
+    ) -> None:
         merged_kwargs = {**self.kwargs, **kwargs}
-        execute_dag(dag, callbacks=callbacks, array_names=array_names, **merged_kwargs)
+        execute_dag(
+            dag,
+            callbacks=callbacks,
+            array_names=array_names,
+            resume=resume,
+            **merged_kwargs,
+        )
diff --git a/cubed/runtime/executors/modal_async.py b/cubed/runtime/executors/modal_async.py
index e7fc74b3..da941f03 100644
--- a/cubed/runtime/executors/modal_async.py
+++ b/cubed/runtime/executors/modal_async.py
@@ -2,10 +2,14 @@
 import copy
 import time
 from asyncio.exceptions import TimeoutError
+from typing import Any, AsyncIterator, Dict, Iterable, Optional, Sequence
 
 from modal.exception import ConnectionError
+from modal.function import Function
+from networkx import MultiDiGraph
 from tenacity import retry, retry_if_exception_type, stop_after_attempt
 
+from cubed.core.array import Callback
 from cubed.core.plan import visit_nodes
 from cubed.runtime.backup import should_launch_backup
 from cubed.runtime.executors.modal import Container, run_remotely, stub
@@ -15,14 +19,14 @@
 
 # We need map_unordered for the use_backups implementation
 async def map_unordered(
-    app_function,
-    input,
-    use_backups=False,
-    backup_function=None,
-    return_stats=False,
-    name=None,
+    app_function: Function,
+    input: Iterable[Any],
+    use_backups: bool = False,
+    backup_function: Optional[Function] = None,
+    return_stats: bool = False,
+    name: Optional[str] = None,
     **kwargs,
-):
+) -> AsyncIterator[Any]:
     """
     Apply a function to items of an input list, yielding results as they are completed
     (which may be different to the input order).
@@ -57,7 +61,7 @@ async def map_unordered(
     t = time.monotonic()
     start_times = {f: t for f in pending}
     end_times = {}
-    backups = {}
+    backups: Dict[asyncio.Future, asyncio.Future] = {}
 
     while pending:
         finished, pending = await asyncio.wait(
@@ -118,8 +122,13 @@ async def map_unordered(
     stop=stop_after_attempt(3),
 )
 async def async_execute_dag(
-    dag, callbacks=None, array_names=None, resume=None, cloud=None, **kwargs
-):
+    dag: MultiDiGraph,
+    callbacks: Optional[Sequence[Callback]] = None,
+    array_names: Optional[Sequence[str]] = None,
+    resume: Optional[bool] = None,
+    cloud: Optional[str] = None,
+    **kwargs,
+) -> None:
     async with stub.run():
         cloud = cloud or "aws"
         if cloud == "aws":
@@ -153,7 +162,14 @@ class AsyncModalDagExecutor(DagExecutor):
     def __init__(self, **kwargs):
         self.kwargs = kwargs
 
-    def execute_dag(self, dag, callbacks=None, array_names=None, resume=None, **kwargs):
+    def execute_dag(
+        self,
+        dag: MultiDiGraph,
+        callbacks: Optional[Sequence[Callback]] = None,
+        array_names: Optional[Sequence[str]] = None,
+        resume: Optional[bool] = None,
+        **kwargs,
+    ) -> None:
         merged_kwargs = {**self.kwargs, **kwargs}
         asyncio.run(
             async_execute_dag(
diff --git a/cubed/runtime/executors/python.py b/cubed/runtime/executors/python.py
index 703493d8..26852f12 100644
--- a/cubed/runtime/executors/python.py
+++ b/cubed/runtime/executors/python.py
@@ -1,21 +1,32 @@
+from typing import Any, Callable, Optional, Sequence
+
+from networkx import MultiDiGraph
 from tenacity import retry, stop_after_attempt
 
-from cubed.core.array import TaskEndEvent
+from cubed.core.array import Callback, TaskEndEvent
 from cubed.core.plan import visit_nodes
+from cubed.primitive.types import CubedPipeline
 from cubed.runtime.types import DagExecutor
 
 
 @retry(reraise=True, stop=stop_after_attempt(3))
-def exec_stage_func(func, *args, **kwargs):
+def exec_stage_func(func: Callable[..., Any], *args, **kwargs):
     return func(*args, **kwargs)
 
 
 class PythonDagExecutor(DagExecutor):
     """The default execution engine that runs tasks sequentially uses Python loops."""
 
-    def execute_dag(self, dag, callbacks=None, array_names=None, resume=None, **kwargs):
+    def execute_dag(
+        self,
+        dag: MultiDiGraph,
+        callbacks: Optional[Sequence[Callback]] = None,
+        array_names: Optional[Sequence[str]] = None,
+        resume: Optional[bool] = None,
+        **kwargs,
+    ) -> None:
         for name, node in visit_nodes(dag, resume=resume):
-            pipeline = node["pipeline"]
+            pipeline: CubedPipeline = node["pipeline"]
             for stage in pipeline.stages:
                 if stage.mappable is not None:
                     for m in stage.mappable:
diff --git a/cubed/runtime/executors/python_async.py b/cubed/runtime/executors/python_async.py
index ce9a43e9..37faaff6 100644
--- a/cubed/runtime/executors/python_async.py
+++ b/cubed/runtime/executors/python_async.py
@@ -1,12 +1,17 @@
 import asyncio
 import time
-from concurrent.futures import ThreadPoolExecutor
+from concurrent.futures import Executor, ThreadPoolExecutor
 from functools import partial
+from typing import Any, AsyncIterator, Callable, Iterable, Optional, Sequence
 
 from aiostream import stream
+from aiostream.core import Stream
+from networkx import MultiDiGraph
 from tenacity import Retrying, stop_after_attempt
 
+from cubed.core.array import Callback
 from cubed.core.plan import visit_node_generations
+from cubed.primitive.types import CubedPipeline
 from cubed.runtime.types import DagExecutor
 from cubed.runtime.utils import execution_stats, handle_callbacks
 
@@ -19,15 +24,15 @@ def run_func(input, func=None, config=None, name=None):
 
 
 async def map_unordered(
-    concurrent_executor,
-    function,
-    input,
-    retries=2,
-    use_backups=False,
-    return_stats=False,
-    name=None,
+    concurrent_executor: Executor,
+    function: Callable[..., Any],
+    input: Iterable[Any],
+    retries: int = 2,
+    use_backups: bool = False,
+    return_stats: bool = False,
+    name: Optional[str] = None,
     **kwargs,
-):
+) -> AsyncIterator[Any]:
     if name is not None:
         print(f"{name}: running map_unordered")
     if retries == 0:
@@ -63,7 +68,9 @@ async def map_unordered(
             yield task.result()
 
 
-def pipeline_to_stream(concurrent_executor, name, pipeline, **kwargs):
+def pipeline_to_stream(
+    concurrent_executor: Executor, name: str, pipeline: CubedPipeline, **kwargs
+) -> Stream:
     if any([stage for stage in pipeline.stages if stage.mappable is None]):
         raise NotImplementedError("All stages must be mappable in pipelines")
     it = stream.iterate(
@@ -88,8 +95,12 @@ def pipeline_to_stream(concurrent_executor, name, pipeline, **kwargs):
 
 
 async def async_execute_dag(
-    dag, callbacks=None, array_names=None, resume=None, **kwargs
-):
+    dag: MultiDiGraph,
+    callbacks: Optional[Sequence[Callback]] = None,
+    array_names: Optional[Sequence[str]] = None,
+    resume: Optional[bool] = None,
+    **kwargs,
+) -> None:
     with ThreadPoolExecutor() as concurrent_executor:
         for gen in visit_node_generations(dag, resume=resume):
             # run pipelines in the same topological generation in parallel by merging their streams
@@ -108,7 +119,14 @@ async def async_execute_dag(
 class AsyncPythonDagExecutor(DagExecutor):
     """An execution engine that uses Python asyncio."""
 
-    def execute_dag(self, dag, callbacks=None, array_names=None, resume=None, **kwargs):
+    def execute_dag(
+        self,
+        dag: MultiDiGraph,
+        callbacks: Optional[Sequence[Callback]] = None,
+        array_names: Optional[Sequence[str]] = None,
+        resume: Optional[bool] = None,
+        **kwargs,
+    ) -> None:
         asyncio.run(
             async_execute_dag(
                 dag,
diff --git a/cubed/runtime/pipeline.py b/cubed/runtime/pipeline.py
index d21bc3a2..51467cb2 100644
--- a/cubed/runtime/pipeline.py
+++ b/cubed/runtime/pipeline.py
@@ -1,6 +1,6 @@
 import itertools
 import math
-from typing import Any, Iterable, Iterator, List, Tuple
+from typing import Any, Dict, Iterable, Iterator, List, Optional, Sequence, Tuple
 
 import numpy as np
 
@@ -39,7 +39,7 @@ def __iter__(self):
         return chunk_keys(self.shape, self.chunks)
 
 
-def copy_read_to_write(chunk_key, *, config: CubedCopySpec):
+def copy_read_to_write(chunk_key: Sequence[slice], *, config: CubedCopySpec) -> None:
     # workaround limitation of lithops.utils.verify_args
     if isinstance(chunk_key, list):
         chunk_key = tuple(chunk_key)
@@ -64,7 +64,7 @@ def spec_to_pipeline(
     )
 
 
-def already_computed(node_dict, resume=None):
+def already_computed(node_dict: Dict[str, Any], resume: Optional[bool] = None) -> bool:
     """
     Return True if the array for a node doesn't have a pipeline to compute it,
     or it has already been computed (all chunks are present).
@@ -76,7 +76,7 @@ def already_computed(node_dict, resume=None):
     target = node_dict.get("target", None)
     if resume and target is not None:
         target = open_if_lazy_zarr_array(target)
-        # this check can be expensive since it has to list the directory to find nchunks
+        # this check can be expensive since it has to list the directory to find nchunks_initialized
         if target.ndim > 0 and target.nchunks_initialized == target.nchunks:
             return True
 
diff --git a/cubed/runtime/types.py b/cubed/runtime/types.py
index fd00271e..3fbeae22 100644
--- a/cubed/runtime/types.py
+++ b/cubed/runtime/types.py
@@ -1,5 +1,8 @@
+from networkx import MultiDiGraph
+
+
 class DagExecutor:
-    def execute_dag(self, dag, **kwargs):
+    def execute_dag(self, dag: MultiDiGraph, **kwargs) -> None:
         raise NotImplementedError  # pragma: no cover
diff --git a/cubed/runtime/utils.py b/cubed/runtime/utils.py
index 00d4cb8a..a8d31856 100644
--- a/cubed/runtime/utils.py
+++ b/cubed/runtime/utils.py
@@ -6,7 +6,7 @@
 sym_counter = 0
 
 
-def gensym(name):
+def gensym(name: str) -> str:
     global sym_counter
     sym_counter += 1
     return f"{name}-{sym_counter:03}"
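
Reviewer note (not part of the patch): a minimal, illustrative sketch of how the newly typed should_launch_backup is driven from an executor's polling loop. Only the import path and signature come from the patch above; the check_for_backups function, the resubmit callable, and the generic task objects are hypothetical stand-ins for the futures that map_unordered tracks.

import time

from cubed.runtime.backup import should_launch_backup


def check_for_backups(pending, start_times, end_times, backups, resubmit) -> None:
    # start_times/end_times map each task to a timestamp; backups maps a task to
    # the backup launched for it, so at most one backup is created per task.
    now = time.monotonic()
    for task in list(pending):
        if task not in backups and should_launch_backup(
            task, now, start_times, end_times
        ):
            backup = resubmit(task)  # hypothetical helper that relaunches the same work
            start_times[backup] = now
            backups[task] = backup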