Skip to content

Commit

Permalink
RHCEPHQE-17401: Refactor with context in parallel module.
Browse files Browse the repository at this point in the history
The initial design populated the results only when it had
completed successfully. This could be a impact when qe is
refering the values during the context.

In this PR, the futures.as_completed is not used instead the
timeout is controlled via datetime methods. Also, results are
continued to be appended at the end of the method.

The iteration is modified to use `_futures` instead of _results.
This way we can capture details on the cancelled threads also.

Signed-off-by: Pragadeeswaran Sathyanarayanan <[email protected]>
  • Loading branch information
psathyan committed Jan 8, 2025
1 parent 7ffa138 commit 4ba1b0d
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 29 deletions.
84 changes: 57 additions & 27 deletions ceph/parallel.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,9 @@
kills the rest and raises the exception.
"""
import logging
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor, as_completed
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
from datetime import datetime, timedelta
from time import sleep

logger = logging.getLogger(__name__)

Expand All @@ -56,24 +58,29 @@ def __init__(
self,
thread_pool=True,
timeout=None,
shutdown_wait=True,
shutdown_cancel_pending=False,
):
"""Object initialization method.
Args:
thread_pool (bool) Whether to use threads or processes.
timeout (int | float) Maximum allowed time.
shutdown_wait (bool) If disabled, it would not wait for executing
threads/process to complete.
shutdown_cancel_pending (bool) If enabled, it would cancel pending tasks.
"""
self._executor = ThreadPoolExecutor() if thread_pool else ProcessPoolExecutor()
self._timeout = timeout
self._shutdown_wait = shutdown_wait
self._cancel_pending = shutdown_cancel_pending
self._futures = list()
self._results = list()
self._iter_index = 0

@property
def count(self):
return len(self._futures)

@property
def results(self):
return self._results

def spawn(self, fun, *args, **kwargs):
"""Triggers the first class method.
Expand All @@ -93,30 +100,53 @@ def __enter__(self):
return self

def __exit__(self, exc_type, exc_value, trackback):
_exceptions = []
exception_count = 0

for _f in as_completed(self._futures, timeout=self._timeout):
try:
self._results.append(_f.result())
except Exception as e:
logger.exception(e)
_exceptions.append(e)
exception_count += 1

if exception_count > 0 and not self._shutdown_wait:
# At this point we are ignoring results
self._executor.shutdown(wait=False, cancel_futures=self._cancel_pending)
raise _exceptions[0]

if len(_exceptions) > 0:
raise _exceptions[0]

return False if exception_count == 0 else True
_not_done = self._futures[:]
_end_time = datetime.now() + timedelta(
seconds=self._timeout if self._timeout else 3600
)

# Wait for all futures to complete within the given time or 1 hour.
while datetime.now() < _end_time:
# if the list is empty break
if len(_not_done) == 0:
break

sleep(2.0)
for _f in _not_done:
if _f.done():
_not_done.remove(_f)

# Graceful shutdown of running threads
if _not_done:
self._executor.shutdown(wait=False, cancel_futures=self._cancel_pending)

if exc_value is not None:
logger.exception(trackback)
return False

# Check for any exceptions and raise
# At this point, they
try:
for _f in self._futures:
self._results.append(_f.result(timeout=1))
except Exception:
logger.exception("Encountered an exception during parallel execution.")
raise

return True

def __iter__(self):
return self

def __next__(self):
for r in self._results:
yield r
if self.count == 0 or self._iter_index == self.count:
self._iter_index = 0 # reset the counter
raise StopIteration()

try:
out = self._futures[self._iter_index].result(timeout=1)
except Exception as e:
out = e

self._iter_index += 1
return out
2 changes: 0 additions & 2 deletions tests/parallel/test_parallel.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,14 +48,12 @@ def run(**kwargs):
parallel_tests = kwargs["parallel"]
parallel_tcs = manager.list()
max_time = kwargs.get("config", {}).get("max_time", None)
wait_till_complete = kwargs.get("config", {}).get("wait_till_complete", True)
cancel_pending = kwargs.get("config", {}).get("cancel_pending", False)
parallel_log.info(kwargs)

with parallel(
thread_pool=False,
timeout=max_time,
shutdown_wait=wait_till_complete,
shutdown_cancel_pending=cancel_pending,
) as p:
for test in parallel_tests:
Expand Down

0 comments on commit 4ba1b0d

Please sign in to comment.