diff --git a/CHANGELOG.md b/CHANGELOG.md index b86b11d..acc7ef3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,7 +2,16 @@ ## [Unreleased][] -[Unreleased]: https://github.com/chaostoolkit/chaostoolkit-addons/compare/0.7.0...HEAD +[Unreleased]: https://github.com/chaostoolkit/chaostoolkit-addons/compare/0.8.0...HEAD + +## [0.8.0][] + +[0.8.0]: https://github.com/chaostoolkit/chaostoolkit-addons/compare/0.7.0...0.8.0 + +### Changed + +- Reworked how we trigger the actual exit call so that we never block the + threads playing the safeguards. Now only one thread can trigger the exit. ## [0.7.0][] diff --git a/chaosaddons/controls/safeguards.py b/chaosaddons/controls/safeguards.py index 907eae3..ce37f19 100644 --- a/chaosaddons/controls/safeguards.py +++ b/chaosaddons/controls/safeguards.py @@ -111,6 +111,7 @@ def __init__(self) -> None: self._lock = threading.Lock() self._interrupted = False self._setup = False + self.triggered_by = None @property def interrupted(self) -> bool: @@ -145,10 +146,12 @@ def prepare(self, probes: List[Probe]) -> None: now_count += 1 self.repeating_until = threading.Event() + self.wait_for_interruption = threading.Event() self.now_all_done = threading.Barrier(parties=now_count + 1) self.now = ThreadPoolExecutor(max_workers=now_count or 1) self.once = ThreadPoolExecutor(max_workers=once_count or 1) self.repeating = ThreadPoolExecutor(max_workers=repeating_count or 1) + self.interrupter = ThreadPoolExecutor(max_workers=1) self._setup = True def run(self, experiment: Experiment, probes: List[Probe], @@ -161,6 +164,7 @@ def run(self, experiment: Experiment, probes: List[Probe], or not), then this call blocks until all these pre-check safeguards are completed. """ + self.interrupter.submit(self._wait_interruption) for p in probes: f = None if p.get("frequency"): @@ -186,6 +190,26 @@ def run(self, experiment: Experiment, probes: List[Probe], # this allows the experiment to block until these are passed self.now_all_done.wait() + def interrupt_now(self, triggered_by: str) -> None: + with self._lock: + self.triggered_by = triggered_by + + self.wait_for_interruption.set() + + def _wait_interruption(self) -> None: + self.wait_for_interruption.wait() + + if not self.triggered_by: + return None + + if not self.interrupted: + self.interrupted = True + if not experiment_finished.is_set(): + logger.critical( + "Safeguard '{}' triggered the end of the experiment".format( + self.triggered_by)) + exit_gracefully() + def _log_finished(self, f: Future, probe: Probe) -> None: """ Logs each safeguard when they terminated. @@ -206,10 +230,12 @@ def terminate(self) -> None: if not self._setup: return None + self.wait_for_interruption.set() self.repeating_until.set() - self.now.shutdown(wait=True) - self.repeating.shutdown(wait=True) - self.once.shutdown(wait=True) + self.now.shutdown(wait=False, cancel_futures=False) + self.repeating.shutdown(wait=False, cancel_futures=False) + self.once.shutdown(wait=False, cancel_futures=False) + logger.debug("Guardian is now terminated") guardian = Guardian() @@ -292,13 +318,8 @@ def interrupt_experiment_on_unhealthy_probe(guard: Guardian, probe: Probe, checked = within_tolerance( tolerance, run["output"], configuration=configuration, secrets=secrets) - if not checked and not guard.interrupted: - guard.interrupted = True - if not experiment_finished.is_set(): - logger.critical( - "Safeguard '{}' triggered the end of the experiment".format( - probe["name"])) - exit_gracefully() + if not checked: + guard.interrupt_now(probe["name"]) def execute_activity(experiment: Experiment, probe: Probe, diff --git a/setup.py b/setup.py index 3846320..e5bcbb0 100644 --- a/setup.py +++ b/setup.py @@ -5,5 +5,5 @@ setup( use_scm_version=True, name="chaostoolkit-addons", - version="0.7.0", + version="0.8.0", )