Lithops compute arrays in parallel #259

Merged · 4 commits · Jul 19, 2023

Changes from all commits
148 changes: 94 additions & 54 deletions cubed/runtime/executors/lithops.py
@@ -1,6 +1,8 @@
import collections
import copy
import logging
import time
from functools import partial
from typing import (
Any,
Callable,
@@ -19,7 +21,7 @@
from networkx import MultiDiGraph

from cubed.core.array import Callback
from cubed.core.plan import visit_nodes
from cubed.core.plan import visit_node_generations, visit_nodes
from cubed.runtime.backup import should_launch_backup
from cubed.runtime.executors.lithops_retries import (
RetryingFuture,
@@ -39,8 +41,11 @@ def run_func(input, func=None, config=None, name=None):

def map_unordered(
lithops_function_executor: FunctionExecutor,
map_function: Callable[..., Any],
map_iterdata: Iterable[Union[List[Any], Tuple[Any, ...], Dict[str, Any]]],
group_map_functions: Sequence[Callable[..., Any]],
group_map_iterdata: Sequence[
Iterable[Union[List[Any], Tuple[Any, ...], Dict[str, Any]]]
],
group_names: Sequence[str],
include_modules: List[str] = [],
timeout: Optional[int] = None,
retries: int = 2,
@@ -55,8 +60,9 @@ def map_unordered(
A generalisation of Lithops `map`, with retries, and relaxed return ordering.

:param lithops_function_executor: The Lithops function executor to use.
:param map_function: The function to map over the data.
:param map_iterdata: An iterable of input data.
:param group_map_functions: A sequence of functions to map over the data.
:param group_map_iterdata: A sequence of iterables of input data.
:param group_names: The names of the function/iterable groups.
:param include_modules: Modules to include.
:param retries: The number of times to retry a failed task before raising an exception.
:param use_backups: Whether to launch backup tasks to mitigate against slow-running tasks.
@@ -66,28 +72,33 @@
"""
return_when = ALWAYS if use_backups else ANY_COMPLETED

inputs = list(map_iterdata)
tasks = {}
start_times = {}
end_times = {}
group_name_to_function: Dict[str, Callable[..., Any]] = {}
# backups are launched based on task start and end times for the group
start_times: Dict[str, Dict[RetryingFuture, float]] = {}
end_times: Dict[str, Dict[RetryingFuture, float]] = collections.defaultdict(dict)
backups: Dict[RetryingFuture, RetryingFuture] = {}
pending = []

# can't use functools.partial here as we get an error in lithops
# also, lithops extra_args doesn't work for this case
partial_map_function = lambda x: map_function(x, **kwargs)

futures = map_with_retries(
lithops_function_executor,
partial_map_function,
inputs,
timeout=timeout,
include_modules=include_modules,
retries=retries,
)
tasks.update({k: v for (k, v) in zip(futures, inputs)})
start_times.update({k: time.monotonic() for k in futures})
pending.extend(futures)
pending: List[RetryingFuture] = []
group_name: str

for map_function, map_iterdata, group_name in zip(
group_map_functions, group_map_iterdata, group_names
):
# can't use functools.partial here as we get an error in lithops
# also, lithops extra_args doesn't work for this case
# bind map_function eagerly via a default argument: a plain closure would
# late-bind, so every stored entry would call the last group's function
# when backups invoke it after this loop has finished
partial_map_function = lambda x, _f=map_function: _f(x, **kwargs)
group_name_to_function[group_name] = partial_map_function

futures = map_with_retries(
lithops_function_executor,
partial_map_function,
map_iterdata,
timeout=timeout,
include_modules=include_modules,
retries=retries,
group_name=group_name,
)
start_times[group_name] = {k: time.monotonic() for k in futures}
pending.extend(futures)

while pending:
finished, pending = wait_with_retries(
@@ -105,9 +116,10 @@ def map_unordered(
if not backup.done or not backup.error:
continue
future.status(throw_except=True)
end_times[future] = time.monotonic()
group_name = future.group_name # type: ignore[assignment]
end_times[group_name][future] = time.monotonic()
if return_stats:
yield future.result(), standardise_lithops_stats(future.stats)
yield future.result(), standardise_lithops_stats(future)
else:
yield future.result()

@@ -124,21 +136,24 @@ def map_unordered(
if use_backups:
now = time.monotonic()
for future in copy.copy(pending):
group_name = future.group_name # type: ignore[assignment]
if future not in backups and should_launch_backup(
future, now, start_times, end_times
future, now, start_times[group_name], end_times[group_name]
):
input = tasks[future]
input = future.input
logger.info("Running backup task for %s", input)
futures = map_with_retries(
lithops_function_executor,
partial_map_function,
group_name_to_function[group_name],
[input],
timeout=timeout,
include_modules=include_modules,
retries=0, # don't retry backup tasks
group_name=group_name,
)
start_times[group_name].update(
{k: time.monotonic() for k in futures}
)
tasks.update({k: v for (k, v) in zip(futures, [input])})
start_times.update({k: time.monotonic() for k in futures})
pending.extend(futures)
backup = futures[0]
backups[future] = backup
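Backups are now timed per group rather than globally, so a slow group no longer triggers backups in a fast one. Below is a minimal sketch of the kind of slow-task heuristic `should_launch_backup` applies over these per-group times; the real implementation lives in `cubed/runtime/backup.py`, and the median baseline and 2x factor here are illustrative assumptions:

```python
import statistics

def looks_slow(task_start: float, now: float,
               completed_durations: list, factor: float = 2.0) -> bool:
    # Compare a pending task's elapsed time against the typical duration
    # of tasks that have already completed in the same group.
    if not completed_durations:
        return False  # no baseline for this group yet, so never back up
    typical = statistics.median(completed_durations)
    return (now - task_start) > factor * typical
```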
@@ -151,32 +166,57 @@ def execute_dag(
callbacks: Optional[Sequence[Callback]] = None,
array_names: Optional[Sequence[str]] = None,
resume: Optional[bool] = None,
compute_arrays_in_parallel: Optional[bool] = None,
**kwargs,
) -> None:
use_backups = kwargs.pop("use_backups", False)
with FunctionExecutor(**kwargs) as executor:
for name, node in visit_nodes(dag, resume=resume):
pipeline = node["pipeline"]
for stage in pipeline.stages:
if stage.mappable is not None:
for _, stats in map_unordered(
executor,
run_func,
stage.mappable,
func=stage.function,
config=pipeline.config,
name=name,
use_backups=use_backups,
return_stats=True,
):
stats["array_name"] = name
handle_callbacks(callbacks, stats)
else:
raise NotImplementedError()


def standardise_lithops_stats(stats):
if not compute_arrays_in_parallel:
for name, node in visit_nodes(dag, resume=resume):
pipeline = node["pipeline"]
for stage in pipeline.stages:
if stage.mappable is not None:
for _, stats in map_unordered(
executor,
[run_func],
[stage.mappable],
[name],
func=stage.function,
config=pipeline.config,
name=name,
use_backups=use_backups,
return_stats=True,
):
handle_callbacks(callbacks, stats)
else:
raise NotImplementedError()
else:
for gen in visit_node_generations(dag, resume=resume):
group_map_functions = []
group_map_iterdata = []
group_names = []
for name, node in gen:
pipeline = node["pipeline"]
stage = pipeline.stages[0] # assume one
f = partial(run_func, func=stage.function, config=pipeline.config)
group_map_functions.append(f)
group_map_iterdata.append(stage.mappable)
group_names.append(name)
for _, stats in map_unordered(
executor,
group_map_functions,
group_map_iterdata,
group_names,
use_backups=use_backups,
return_stats=True,
):
handle_callbacks(callbacks, stats)
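The parallel branch above leans on `visit_node_generations` to hand back independent arrays together. Here is a standalone sketch of that traversal, assuming it behaves like networkx's `topological_generations` over the plan DAG (cubed's actual implementation may differ):

```python
import networkx as nx

dag = nx.MultiDiGraph()
dag.add_edges_from([("a", "c"), ("b", "c"), ("c", "d")])

for gen in nx.topological_generations(dag):
    # arrays in the same generation have no edges between them, so their
    # pipelines can be fused into a single grouped map_unordered call
    print(sorted(gen))
# ['a', 'b'], then ['c'], then ['d']
```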


def standardise_lithops_stats(future: RetryingFuture) -> Dict[str, Any]:
stats = future.stats
return dict(
array_name=future.group_name,
task_create_tstamp=stats["host_job_create_tstamp"],
function_start_tstamp=stats["worker_func_start_tstamp"],
function_end_tstamp=stats["worker_func_end_tstamp"],
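Taken together, the signature change means callers now pass parallel sequences of functions, iterables, and group names. A usage sketch modelled on the updated test below; `LocalhostExecutor` and the toy functions are illustrative, not part of this diff:

```python
from lithops import LocalhostExecutor

from cubed.runtime.executors.lithops import map_unordered

def square(x):
    return x * x

def cube(x):
    return x * x * x

with LocalhostExecutor() as executor:
    # one function, one iterable of inputs, and one name per group;
    # results from both groups are yielded unordered, as tasks finish
    for result in map_unordered(
        executor,
        [square, cube],
        [[1, 2, 3], [1, 2, 3]],
        ["squares", "cubes"],
        timeout=10,
        retries=2,
    ):
        print(result)
```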
8 changes: 6 additions & 2 deletions cubed/runtime/executors/lithops_retries.py
@@ -1,4 +1,4 @@
from typing import Any, Callable, Dict, List, Optional, Tuple, Union
from typing import Any, Callable, Dict, Iterable, List, Optional, Tuple, Union

from lithops import FunctionExecutor
from lithops.future import ResponseFuture
@@ -18,12 +18,14 @@ def __init__(
input: Any,
map_kwargs: Any = None,
retries: Optional[int] = None,
group_name: Optional[str] = None,
):
self.response_future = response_future
self.map_function = map_function
self.input = input
self.map_kwargs = map_kwargs or {}
self.retries = retries or 0
self.group_name = group_name
self.failure_count = 0
self.cancelled = False

@@ -87,10 +89,11 @@ def result(self, throw_except: bool = True, internal_storage: Any = None):
def map_with_retries(
function_executor: FunctionExecutor,
map_function: Callable[..., Any],
map_iterdata: List[Union[List[Any], Tuple[Any, ...], Dict[str, Any]]],
map_iterdata: Iterable[Union[List[Any], Tuple[Any, ...], Dict[str, Any]]],
timeout: Optional[int] = None,
include_modules: Optional[List[str]] = [],
retries: Optional[int] = None,
group_name: Optional[str] = None,
) -> List[RetryingFuture]:
"""
A generalisation of Lithops `map`, with retries.
@@ -107,6 +110,7 @@ def map_with_retries(
input=i,
map_kwargs=dict(timeout=timeout, include_modules=include_modules),
retries=retries,
group_name=group_name,
)
for i, f in zip(inputs, futures_list)
]
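For intuition, the retry contract that `RetryingFuture` implements (resubmit through the FunctionExecutor on failure, up to `retries` times, then surface the error) reduces to the following inline model; this is a conceptual sketch, not the class's actual mechanics:

```python
def call_with_retries(fn, x, retries=2):
    # Inline model of RetryingFuture's failure_count/retries bookkeeping.
    failure_count = 0
    while True:
        try:
            return fn(x)
        except Exception:
            failure_count += 1
            if failure_count > retries:
                raise  # out of retries: surface the original error
```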
5 changes: 3 additions & 2 deletions cubed/tests/runtime/test_lithops.py
@@ -17,8 +17,9 @@ def run_test(function, input, retries, timeout=10, use_backups=False):
with LocalhostExecutor() as executor:
for output in map_unordered(
executor,
function,
input,
[function],
[input],
["group0"],
timeout=timeout,
retries=retries,
use_backups=use_backups,
11 changes: 10 additions & 1 deletion cubed/tests/test_core.py
@@ -557,7 +557,16 @@ def test_plan_scaling(tmp_path, factor):
def test_compute_arrays_in_parallel(spec, any_executor, compute_arrays_in_parallel):
from cubed.runtime.executors.python_async import AsyncPythonDagExecutor

if not isinstance(any_executor, AsyncPythonDagExecutor):
supported_executors = [AsyncPythonDagExecutor]

try:
from cubed.runtime.executors.lithops import LithopsDagExecutor

supported_executors.append(LithopsDagExecutor)
except ImportError:
pass

if not isinstance(any_executor, tuple(supported_executors)):
pytest.skip(f"{type(any_executor)} does not support compute_arrays_in_parallel")

a = cubed.random.random((10, 10), chunks=(5, 5), spec=spec)
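End to end, the new flag would be exercised from user code roughly as below. The `Spec` arguments and the `compute()` keyword plumbing are assumptions inferred from the test above, not shown in this diff:

```python
import cubed
import cubed.array_api as xp
from cubed.runtime.executors.lithops import LithopsDagExecutor

spec = cubed.Spec(work_dir="tmp", allowed_mem="100MB")  # illustrative values

a = cubed.random.random((10, 10), chunks=(5, 5), spec=spec)
b = cubed.random.random((10, 10), chunks=(5, 5), spec=spec)
c = xp.add(a, b)

# a and b are independent, so with compute_arrays_in_parallel=True their
# tasks can be submitted to Lithops in one generation rather than one
# array at a time
c.compute(executor=LithopsDagExecutor(), compute_arrays_in_parallel=True)
```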