From 4ba1b0d94b33d22ac53a6c068e4deb1b73fa3181 Mon Sep 17 00:00:00 2001 From: Pragadeeswaran Sathyanarayanan Date: Tue, 7 Jan 2025 13:59:03 +0530 Subject: [PATCH] RHCEPHQE-17401: Refactor with context in parallel module. The initial design populated the results only when it had completed successfully. This could have an impact when QE is referring to the values during the context. In this PR, futures.as_completed is not used; instead, the timeout is controlled via datetime methods. Also, results continue to be appended at the end of the method. The iteration is modified to use `_futures` instead of `_results`. This way we can also capture details on the cancelled threads. Signed-off-by: Pragadeeswaran Sathyanarayanan --- ceph/parallel.py | 84 ++++++++++++++++++++++----------- tests/parallel/test_parallel.py | 2 - 2 files changed, 57 insertions(+), 29 deletions(-) diff --git a/ceph/parallel.py b/ceph/parallel.py index 7e6cb10fe..5214da1a7 100644 --- a/ceph/parallel.py +++ b/ceph/parallel.py @@ -44,7 +44,9 @@ kills the rest and raises the exception. """ import logging -from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor, as_completed +from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor +from datetime import datetime, timedelta +from time import sleep logger = logging.getLogger(__name__) @@ -56,7 +58,6 @@ def __init__( self, thread_pool=True, timeout=None, - shutdown_wait=True, shutdown_cancel_pending=False, ): """Object initialization method. @@ -64,16 +65,22 @@ def __init__( Args: thread_pool (bool) Whether to use threads or processes. timeout (int | float) Maximum allowed time. - shutdown_wait (bool) If disabled, it would not wait for executing - threads/process to complete. shutdown_cancel_pending (bool) If enabled, it would cancel pending tasks. 
""" self._executor = ThreadPoolExecutor() if thread_pool else ProcessPoolExecutor() self._timeout = timeout - self._shutdown_wait = shutdown_wait self._cancel_pending = shutdown_cancel_pending self._futures = list() self._results = list() + self._iter_index = 0 + + @property + def count(self): + return len(self._futures) + + @property + def results(self): + return self._results def spawn(self, fun, *args, **kwargs): """Triggers the first class method. @@ -93,30 +100,53 @@ def __enter__(self): return self def __exit__(self, exc_type, exc_value, trackback): - _exceptions = [] - exception_count = 0 - - for _f in as_completed(self._futures, timeout=self._timeout): - try: - self._results.append(_f.result()) - except Exception as e: - logger.exception(e) - _exceptions.append(e) - exception_count += 1 - - if exception_count > 0 and not self._shutdown_wait: - # At this point we are ignoring results - self._executor.shutdown(wait=False, cancel_futures=self._cancel_pending) - raise _exceptions[0] - - if len(_exceptions) > 0: - raise _exceptions[0] - - return False if exception_count == 0 else True + _not_done = self._futures[:] + _end_time = datetime.now() + timedelta( + seconds=self._timeout if self._timeout else 3600 + ) + + # Wait for all futures to complete within the given time or 1 hour. 
+ while datetime.now() < _end_time: + # if the list is empty break + if len(_not_done) == 0: + break + + sleep(2.0) + for _f in _not_done: + if _f.done(): + _not_done.remove(_f) + + # Graceful shutdown of running threads + if _not_done: + self._executor.shutdown(wait=False, cancel_futures=self._cancel_pending) + + if exc_value is not None: + logger.exception(trackback) + return False + + # Check for any exceptions and raise + # At this point, they + try: + for _f in self._futures: + self._results.append(_f.result(timeout=1)) + except Exception: + logger.exception("Encountered an exception during parallel execution.") + raise + + return True def __iter__(self): return self def __next__(self): - for r in self._results: - yield r + if self.count == 0 or self._iter_index == self.count: + self._iter_index = 0 # reset the counter + raise StopIteration() + + try: + out = self._futures[self._iter_index].result(timeout=1) + except Exception as e: + out = e + + self._iter_index += 1 + return out diff --git a/tests/parallel/test_parallel.py b/tests/parallel/test_parallel.py index 2c6e7cd13..a31061392 100644 --- a/tests/parallel/test_parallel.py +++ b/tests/parallel/test_parallel.py @@ -48,14 +48,12 @@ def run(**kwargs): parallel_tests = kwargs["parallel"] parallel_tcs = manager.list() max_time = kwargs.get("config", {}).get("max_time", None) - wait_till_complete = kwargs.get("config", {}).get("wait_till_complete", True) cancel_pending = kwargs.get("config", {}).get("cancel_pending", False) parallel_log.info(kwargs) with parallel( thread_pool=False, timeout=max_time, - shutdown_wait=wait_till_complete, shutdown_cancel_pending=cancel_pending, ) as p: for test in parallel_tests: