From 4e4cd905e782f179295bc438f73c235e7383524e Mon Sep 17 00:00:00 2001 From: Tom White Date: Thu, 13 Jul 2023 11:32:37 +0100 Subject: [PATCH] Add group (array) name to RetryingFuture --- cubed/runtime/executors/lithops.py | 9 ++++++--- cubed/runtime/executors/lithops_retries.py | 4 ++++ 2 files changed, 10 insertions(+), 3 deletions(-) diff --git a/cubed/runtime/executors/lithops.py b/cubed/runtime/executors/lithops.py index 8856813d..eb29558f 100644 --- a/cubed/runtime/executors/lithops.py +++ b/cubed/runtime/executors/lithops.py @@ -66,6 +66,7 @@ def map_unordered( """ return_when = ALWAYS if use_backups else ANY_COMPLETED + group_name = kwargs.get("name", None) start_times = {} end_times = {} backups: Dict[RetryingFuture, RetryingFuture] = {} @@ -82,6 +83,7 @@ def map_unordered( timeout=timeout, include_modules=include_modules, retries=retries, + group_name=group_name, ) start_times.update({k: time.monotonic() for k in futures}) pending.extend(futures) @@ -104,7 +106,7 @@ def map_unordered( future.status(throw_except=True) end_times[future] = time.monotonic() if return_stats: - yield future.result(), standardise_lithops_stats(future.stats) + yield future.result(), standardise_lithops_stats(future) else: yield future.result() @@ -165,14 +167,15 @@ def execute_dag( use_backups=use_backups, return_stats=True, ): - stats["array_name"] = name handle_callbacks(callbacks, stats) else: raise NotImplementedError() -def standardise_lithops_stats(stats): +def standardise_lithops_stats(future: RetryingFuture) -> Dict[str, Any]: + stats = future.stats return dict( + array_name=future.group_name, task_create_tstamp=stats["host_job_create_tstamp"], function_start_tstamp=stats["worker_func_start_tstamp"], function_end_tstamp=stats["worker_func_end_tstamp"], diff --git a/cubed/runtime/executors/lithops_retries.py b/cubed/runtime/executors/lithops_retries.py index f9553797..e4cb4903 100644 --- a/cubed/runtime/executors/lithops_retries.py +++ b/cubed/runtime/executors/lithops_retries.py @@ -18,12 +18,14 @@ def __init__( input: Any, map_kwargs: Any = None, retries: Optional[int] = None, + group_name: Optional[str] = None, ): self.response_future = response_future self.map_function = map_function self.input = input self.map_kwargs = map_kwargs or {} self.retries = retries or 0 + self.group_name = group_name self.failure_count = 0 self.cancelled = False @@ -91,6 +93,7 @@ def map_with_retries( timeout: Optional[int] = None, include_modules: Optional[List[str]] = [], retries: Optional[int] = None, + group_name: Optional[str] = None, ) -> List[RetryingFuture]: """ A generalisation of Lithops `map`, with retries. @@ -107,6 +110,7 @@ def map_with_retries( input=i, map_kwargs=dict(timeout=timeout, include_modules=include_modules), retries=retries, + group_name=group_name, ) for i, f in zip(inputs, futures_list) ]