Commit
Change map_unordered to run groups (of functions/mappables) in parallel.
tomwhite committed Jul 14, 2023
1 parent 32c49d8 commit 44153af
Showing 3 changed files with 106 additions and 52 deletions.
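
The heart of the change: map_unordered previously took one function and one iterable; it now takes parallel sequences of functions, iterables, and group names, submits every group's tasks up front, and yields results as they complete, so tasks from independent arrays run concurrently. A simplified sketch of the call-site change (illustrative names only; the real signature is in the diff below):

    # Before: one function mapped over one iterable
    for result in map_unordered(executor, run_func, mappable):
        ...

    # After: several (function, iterable, name) groups submitted together
    for result in map_unordered(
        executor,
        [run_func_a, run_func_b],  # group_map_functions
        [mappable_a, mappable_b],  # group_map_iterdata
        ["array-a", "array-b"],    # group_names
    ):
        ...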
142 changes: 93 additions & 49 deletions cubed/runtime/executors/lithops.py
@@ -1,6 +1,8 @@
+import collections
 import copy
 import logging
 import time
+from functools import partial
 from typing import (
     Any,
     Callable,
@@ -19,7 +21,7 @@
 from networkx import MultiDiGraph
 
 from cubed.core.array import Callback
-from cubed.core.plan import visit_nodes
+from cubed.core.plan import visit_node_generations, visit_nodes
 from cubed.runtime.backup import should_launch_backup
 from cubed.runtime.executors.lithops_retries import (
     RetryingFuture,
@@ -39,8 +41,11 @@ def run_func(input, func=None, config=None, name=None):
 
 def map_unordered(
     lithops_function_executor: FunctionExecutor,
-    map_function: Callable[..., Any],
-    map_iterdata: Iterable[Union[List[Any], Tuple[Any, ...], Dict[str, Any]]],
+    group_map_functions: Sequence[Callable[..., Any]],
+    group_map_iterdata: Sequence[
+        Iterable[Union[List[Any], Tuple[Any, ...], Dict[str, Any]]]
+    ],
+    group_names: Sequence[str],
     include_modules: List[str] = [],
     timeout: Optional[int] = None,
     retries: int = 2,
@@ -55,8 +60,9 @@ def map_unordered(
     A generalisation of Lithops `map`, with retries, and relaxed return ordering.
 
     :param lithops_function_executor: The Lithops function executor to use.
-    :param map_function: The function to map over the data.
-    :param map_iterdata: An iterable of input data.
+    :param group_map_functions: A sequence of functions to map over the data.
+    :param group_map_iterdata: A sequence of iterables of input data.
+    :param group_names: The names of the function/iterable groups.
     :param include_modules: Modules to include.
     :param retries: The number of times to retry a failed task before raising an exception.
     :param use_backups: Whether to launch backup tasks to mitigate against slow-running tasks.
@@ -66,25 +72,33 @@
     """
     return_when = ALWAYS if use_backups else ANY_COMPLETED
 
-    start_times = {}
-    end_times = {}
+    group_name_to_function: Dict[str, Callable[..., Any]] = {}
+    # backups are launched based on task start and end times for the group
+    start_times: Dict[str, Dict[RetryingFuture, float]] = {}
+    end_times: Dict[str, Dict[RetryingFuture, float]] = collections.defaultdict(dict)
     backups: Dict[RetryingFuture, RetryingFuture] = {}
-    pending = []
-
-    # can't use functools.partial here as we get an error in lithops
-    # also, lithops extra_args doesn't work for this case
-    partial_map_function = lambda x: map_function(x, **kwargs)
-
-    futures = map_with_retries(
-        lithops_function_executor,
-        partial_map_function,
-        map_iterdata,
-        timeout=timeout,
-        include_modules=include_modules,
-        retries=retries,
-    )
-    start_times.update({k: time.monotonic() for k in futures})
-    pending.extend(futures)
+    pending: List[RetryingFuture] = []
+    group_name: str
+
+    for map_function, map_iterdata, group_name in zip(
+        group_map_functions, group_map_iterdata, group_names
+    ):
+        # can't use functools.partial here as we get an error in lithops
+        # also, lithops extra_args doesn't work for this case
+        partial_map_function = lambda x: map_function(x, **kwargs)
+        group_name_to_function[group_name] = partial_map_function
+
+        futures = map_with_retries(
+            lithops_function_executor,
+            partial_map_function,
+            map_iterdata,
+            timeout=timeout,
+            include_modules=include_modules,
+            retries=retries,
+            group_name=group_name,
+        )
+        start_times[group_name] = {k: time.monotonic() for k in futures}
+        pending.extend(futures)
 
     while pending:
         finished, pending = wait_with_retries(
@@ -102,9 +116,10 @@ def map_unordered(
                     if not backup.done or not backup.error:
                         continue
                 future.status(throw_except=True)
-            end_times[future] = time.monotonic()
+            group_name = future.group_name  # type: ignore[assignment]
+            end_times[group_name][future] = time.monotonic()
             if return_stats:
-                yield future.result(), standardise_lithops_stats(future.stats)
+                yield future.result(), standardise_lithops_stats(future)
             else:
                 yield future.result()
 
@@ -121,20 +136,24 @@ def map_unordered(
         if use_backups:
             now = time.monotonic()
             for future in copy.copy(pending):
+                group_name = future.group_name  # type: ignore[assignment]
                 if future not in backups and should_launch_backup(
-                    future, now, start_times, end_times
+                    future, now, start_times[group_name], end_times[group_name]
                 ):
                     input = future.input
                     logger.info("Running backup task for %s", input)
                     futures = map_with_retries(
                         lithops_function_executor,
-                        partial_map_function,
+                        group_name_to_function[group_name],
                         [input],
                         timeout=timeout,
                         include_modules=include_modules,
                         retries=0,  # don't retry backup tasks
+                        group_name=group_name,
                     )
-                    start_times.update({k: time.monotonic() for k in futures})
+                    start_times[group_name].update(
+                        {k: time.monotonic() for k in futures}
+                    )
                     pending.extend(futures)
                     backup = futures[0]
                     backups[future] = backup
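
Backup decisions are now scoped per group: should_launch_backup compares a pending future only against the start and end times of tasks in its own group, since durations for one array's tasks say little about another's. The function itself comes from cubed.runtime.backup and is not shown in this diff; a minimal sketch of the kind of straggler heuristic it could implement (an assumption for illustration, not cubed's actual logic):

    from typing import Dict, Hashable

    def should_launch_backup_sketch(
        task: Hashable,
        now: float,
        start_times: Dict[Hashable, float],
        end_times: Dict[Hashable, float],
        min_completed: int = 10,
        slowdown_factor: float = 3.0,
    ) -> bool:
        # Wait until enough tasks in the group have finished to estimate
        # a typical duration, then flag tasks running far longer than that.
        if len(end_times) < min_completed:
            return False
        durations = sorted(end_times[t] - start_times[t] for t in end_times)
        typical = durations[len(durations) // 2]  # median completed duration
        return (now - start_times[task]) > slowdown_factor * typical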
@@ -147,32 +166,57 @@ def execute_dag(
     callbacks: Optional[Sequence[Callback]] = None,
     array_names: Optional[Sequence[str]] = None,
     resume: Optional[bool] = None,
+    compute_arrays_in_parallel: Optional[bool] = None,
     **kwargs,
 ) -> None:
     use_backups = kwargs.pop("use_backups", False)
     with FunctionExecutor(**kwargs) as executor:
-        for name, node in visit_nodes(dag, resume=resume):
-            pipeline = node["pipeline"]
-            for stage in pipeline.stages:
-                if stage.mappable is not None:
-                    for _, stats in map_unordered(
-                        executor,
-                        run_func,
-                        stage.mappable,
-                        func=stage.function,
-                        config=pipeline.config,
-                        name=name,
-                        use_backups=use_backups,
-                        return_stats=True,
-                    ):
-                        stats["array_name"] = name
-                        handle_callbacks(callbacks, stats)
-                else:
-                    raise NotImplementedError()
-
-
-def standardise_lithops_stats(stats):
+        if not compute_arrays_in_parallel:
+            for name, node in visit_nodes(dag, resume=resume):
+                pipeline = node["pipeline"]
+                for stage in pipeline.stages:
+                    if stage.mappable is not None:
+                        for _, stats in map_unordered(
+                            executor,
+                            [run_func],
+                            [stage.mappable],
+                            [name],
+                            func=stage.function,
+                            config=pipeline.config,
+                            name=name,
+                            use_backups=use_backups,
+                            return_stats=True,
+                        ):
+                            handle_callbacks(callbacks, stats)
+                    else:
+                        raise NotImplementedError()
+        else:
+            for gen in visit_node_generations(dag, resume=resume):
+                group_map_functions = []
+                group_map_iterdata = []
+                group_names = []
+                for name, node in gen:
+                    pipeline = node["pipeline"]
+                    stage = pipeline.stages[0]  # assume one
+                    f = partial(run_func, func=stage.function, config=pipeline.config)
+                    group_map_functions.append(f)
+                    group_map_iterdata.append(stage.mappable)
+                    group_names.append(name)
+                for _, stats in map_unordered(
+                    executor,
+                    group_map_functions,
+                    group_map_iterdata,
+                    group_names,
+                    use_backups=use_backups,
+                    return_stats=True,
+                ):
+                    handle_callbacks(callbacks, stats)
+
+
+def standardise_lithops_stats(future: RetryingFuture) -> Dict[str, Any]:
+    stats = future.stats
     return dict(
+        array_name=future.group_name,
         task_create_tstamp=stats["host_job_create_tstamp"],
         function_start_tstamp=stats["worker_func_start_tstamp"],
         function_end_tstamp=stats["worker_func_end_tstamp"],
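
The parallel path in execute_dag above walks the DAG generation by generation rather than node by node, so nodes with no dependencies between them are submitted as a single batch of groups. visit_node_generations comes from cubed.core.plan and is not shown in this diff; a rough sketch of a generation-wise traversal (an illustration built on networkx's topological_generations, not necessarily cubed's implementation):

    import networkx as nx

    def visit_node_generations_sketch(dag: nx.MultiDiGraph):
        # Yield batches of (name, node_data) pairs; every node in a batch
        # depends only on nodes in earlier batches, so all arrays in a
        # batch can be computed concurrently.
        for generation in nx.topological_generations(dag):
            yield [(name, dag.nodes[name]) for name in generation]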
5 changes: 3 additions & 2 deletions cubed/tests/runtime/test_lithops.py
@@ -17,8 +17,9 @@ def run_test(function, input, retries, timeout=10, use_backups=False):
     with LocalhostExecutor() as executor:
         for output in map_unordered(
             executor,
-            function,
-            input,
+            [function],
+            [input],
+            ["group0"],
             timeout=timeout,
             retries=retries,
             use_backups=use_backups,
11 changes: 10 additions & 1 deletion cubed/tests/test_core.py
@@ -557,7 +557,16 @@ def test_plan_scaling(tmp_path, factor):
 def test_compute_arrays_in_parallel(spec, any_executor, compute_arrays_in_parallel):
     from cubed.runtime.executors.python_async import AsyncPythonDagExecutor
 
-    if not isinstance(any_executor, AsyncPythonDagExecutor):
+    supported_executors = [AsyncPythonDagExecutor]
+
+    try:
+        from cubed.runtime.executors.lithops import LithopsDagExecutor
+
+        supported_executors.append(LithopsDagExecutor)
+    except ImportError:
+        pass
+
+    if not isinstance(any_executor, tuple(supported_executors)):
         pytest.skip(f"{type(any_executor)} does not support compute_arrays_in_parallel")
 
     a = cubed.random.random((10, 10), chunks=(5, 5), spec=spec)
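
For context, the flag exercised by this test travels through compute's executor kwargs down to execute_dag. A hedged usage sketch (the kwarg plumbing is assumed from the test above; the default spec stands in for a real one):

    import cubed
    import cubed.array_api as xp
    import cubed.random
    from cubed.runtime.executors.lithops import LithopsDagExecutor

    # a and b do not depend on each other, so with
    # compute_arrays_in_parallel=True their tasks fall in the same
    # generation and are submitted as one batch of groups
    a = cubed.random.random((10, 10), chunks=(5, 5))
    b = cubed.random.random((10, 10), chunks=(5, 5))
    c = xp.add(a, b)
    c.compute(
        executor=LithopsDagExecutor(),
        compute_arrays_in_parallel=True,
    )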
