From fa6037fc0d3e44d823995b364d4632c8a04fefaf Mon Sep 17 00:00:00 2001 From: Andy Lee Date: Wed, 11 Dec 2024 12:04:20 -0800 Subject: [PATCH] revert: run_in_parallel `continue_on_errors` --- sky/cli.py | 4 +--- sky/utils/subprocess_utils.py | 29 ++++++----------------------- 2 files changed, 7 insertions(+), 26 deletions(-) diff --git a/sky/cli.py b/sky/cli.py index 79a77130424..afbcec5e13e 100644 --- a/sky/cli.py +++ b/sky/cli.py @@ -3553,9 +3553,7 @@ def delete_storage(name: str) -> None: with ux_utils.print_exception_no_traceback(): raise e - subprocess_utils.run_in_parallel(delete_storage, - names, - continue_on_error=True) + subprocess_utils.run_in_parallel(delete_storage, names) @cli.group(cls=_NaturalOrderGroup) diff --git a/sky/utils/subprocess_utils.py b/sky/utils/subprocess_utils.py index 7da98132209..a83d9fa8262 100644 --- a/sky/utils/subprocess_utils.py +++ b/sky/utils/subprocess_utils.py @@ -99,8 +99,7 @@ def get_parallel_threads(cloud_str: Optional[str] = None) -> int: # TODO(andyl): Why this function returns a list of results? Why not yielding? def run_in_parallel(func: Callable, args: Iterable[Any], - num_threads: Optional[int] = None, - continue_on_error: bool = False) -> List[Any]: + num_threads: Optional[int] = None) -> List[Any]: """Run a function in parallel on a list of arguments. Args: @@ -108,35 +107,19 @@ def run_in_parallel(func: Callable, args: Iterable of arguments to pass to func num_threads: Number of threads to use. If None, uses get_parallel_threads() - continue_on_error: If True, continues execution when errors occur - If False (default), raises the first error immediately Returns: A list of the return values of the function func, in the same order as the - arguments. If continue_on_error=True, failed operations will have - their exceptions in the result list. + arguments. + + Raises: + Exception: The first exception encountered. """ processes = (num_threads if num_threads is not None else get_parallel_threads()) with pool.ThreadPool(processes=processes) as p: ordered_iterators = p.imap(func, args) - - # TODO(andyl): Is this list(ordered_iterators) clear? Maybe we should - # merge two cases, and move this logic deeper to - # `except e: results.append(e) if continue_on_error else raise e` - if not continue_on_error: - return list(ordered_iterators) - else: - results: List[Union[Any, Exception]] = [] - while True: - try: - result = next(ordered_iterators) - results.append(result) - except StopIteration: - break - except Exception as e: # pylint: disable=broad-except - results.append(e) - return results + return list(ordered_iterators) def handle_returncode(returncode: int,