Skip to content

Commit

Permalink
Add group (array) name to RetryingFuture
Browse files Browse the repository at this point in the history
  • Loading branch information
tomwhite committed Jul 13, 2023
1 parent 5f7d8d7 commit 4e4cd90
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 3 deletions.
9 changes: 6 additions & 3 deletions cubed/runtime/executors/lithops.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ def map_unordered(
"""
return_when = ALWAYS if use_backups else ANY_COMPLETED

group_name = kwargs.get("name", None)
start_times = {}
end_times = {}
backups: Dict[RetryingFuture, RetryingFuture] = {}
Expand All @@ -82,6 +83,7 @@ def map_unordered(
timeout=timeout,
include_modules=include_modules,
retries=retries,
group_name=group_name,
)
start_times.update({k: time.monotonic() for k in futures})
pending.extend(futures)
Expand All @@ -104,7 +106,7 @@ def map_unordered(
future.status(throw_except=True)
end_times[future] = time.monotonic()
if return_stats:
yield future.result(), standardise_lithops_stats(future.stats)
yield future.result(), standardise_lithops_stats(future)
else:
yield future.result()

Expand Down Expand Up @@ -165,14 +167,15 @@ def execute_dag(
use_backups=use_backups,
return_stats=True,
):
stats["array_name"] = name
handle_callbacks(callbacks, stats)
else:
raise NotImplementedError()


def standardise_lithops_stats(stats):
def standardise_lithops_stats(future: RetryingFuture) -> Dict[str, Any]:
stats = future.stats
return dict(
array_name=future.group_name,
task_create_tstamp=stats["host_job_create_tstamp"],
function_start_tstamp=stats["worker_func_start_tstamp"],
function_end_tstamp=stats["worker_func_end_tstamp"],
Expand Down
4 changes: 4 additions & 0 deletions cubed/runtime/executors/lithops_retries.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,14 @@ def __init__(
input: Any,
map_kwargs: Any = None,
retries: Optional[int] = None,
group_name: Optional[str] = None,
):
self.response_future = response_future
self.map_function = map_function
self.input = input
self.map_kwargs = map_kwargs or {}
self.retries = retries or 0
self.group_name = group_name
self.failure_count = 0
self.cancelled = False

Expand Down Expand Up @@ -91,6 +93,7 @@ def map_with_retries(
timeout: Optional[int] = None,
include_modules: Optional[List[str]] = [],
retries: Optional[int] = None,
group_name: Optional[str] = None,
) -> List[RetryingFuture]:
"""
A generalisation of Lithops `map`, with retries.
Expand All @@ -107,6 +110,7 @@ def map_with_retries(
input=i,
map_kwargs=dict(timeout=timeout, include_modules=include_modules),
retries=retries,
group_name=group_name,
)
for i, f in zip(inputs, futures_list)
]
Expand Down

0 comments on commit 4e4cd90

Please sign in to comment.