From 4e7302eceddbe16bafed6424fbd1816c01283f9d Mon Sep 17 00:00:00 2001 From: Al Rigazzi Date: Tue, 14 May 2024 20:55:31 +0200 Subject: [PATCH] Dragon server enhancement (#582) The Dragon server could fail, dumping a core file, if it was shut down before all spawned Process Groups completed. This PR fixes such behavior: the immediate flag on the `DragonShutdownRequest` now requests every non-terminated job to be stopped. [ committed by @al-rigazzi ] [ reviewed by @ashao ] --- doc/changelog.md | 4 ++ .../_core/launcher/dragon/dragonBackend.py | 18 ++++- .../_core/launcher/dragon/dragonLauncher.py | 2 +- tests/{on_wlm => full_wlm}/test_symlinking.py | 0 tests/test_dragon_backend.py | 67 +++++++++++++++++-- 5 files changed, 81 insertions(+), 10 deletions(-) rename tests/{on_wlm => full_wlm}/test_symlinking.py (100%) diff --git a/doc/changelog.md b/doc/changelog.md index be971bce7..f3908fcc0 100644 --- a/doc/changelog.md +++ b/doc/changelog.md @@ -15,6 +15,7 @@ To be released at some future point in time Description +- Improve Dragon server shutdown - Add dragon runtime installer - Add launcher based on Dragon - Reuse Orchestrators within the testing suite to improve performance. @@ -62,6 +63,9 @@ Description - Fix publishing of development docs Detailed Notes + +- The Dragon server will now terminate any process which is still running + when a request of an immediate shutdown is sent. ([SmartSim-PR582](https://github.com/CrayLabs/SmartSim/pull/582)) - Add `--dragon` option to `smart build`. Install appropriate Dragon runtime from Dragon GitHub release assets. ([SmartSim-PR580](https://github.com/CrayLabs/SmartSim/pull/580)) diff --git a/smartsim/_core/launcher/dragon/dragonBackend.py b/smartsim/_core/launcher/dragon/dragonBackend.py index 71012886a..245660662 100644 --- a/smartsim/_core/launcher/dragon/dragonBackend.py +++ b/smartsim/_core/launcher/dragon/dragonBackend.py @@ -130,7 +130,10 @@ def redir_worker(io_conn: dragon_connection.Connection, file_path: str) -> None: except Exception as e: print(e) finally: - io_conn.close() + try: + io_conn.close() + except Exception as e: + print(e) class DragonBackend: @@ -293,6 +296,9 @@ def _can_honor(self, request: DragonRunRequest) -> t.Tuple[bool, t.Optional[str] message = f"Cannot satisfy request. Requested {request.nodes} nodes, " message += f"but only {len(self._hosts)} nodes are available." return False, message + if self._shutdown_requested: + message = "Cannot satisfy request, server is shutting down." + return False, message return True, None def _allocate_step( @@ -565,6 +571,12 @@ def _update(self) -> None: self._refresh_statuses() self._update_shutdown_status() + def _kill_all_running_jobs(self) -> None: + with self._queue_lock: + for step_id, group_info in self._group_infos.items(): + if group_info.status not in TERMINAL_STATUSES: + self._stop_requests.append(DragonStopRequest(step_id=step_id)) + def update(self) -> None: """Update internal data structures, queues, and job statuses""" logger.debug("Dragon Backend update thread started") @@ -579,6 +591,7 @@ def update(self) -> None: logger.debug(str(self)) except ValueError as e: logger.error(e) + logger.debug("Dragon Backend update thread stopping") @functools.singledispatchmethod @@ -633,7 +646,8 @@ def _(self, request: DragonHandshakeRequest) -> DragonHandshakeResponse: def _(self, request: DragonShutdownRequest) -> DragonShutdownResponse: self._shutdown_requested = True self._update_shutdown_status() - self._can_shutdown |= request.immediate + if request.immediate: + self._kill_all_running_jobs() self._frontend_shutdown = request.frontend_shutdown return DragonShutdownResponse() diff --git a/smartsim/_core/launcher/dragon/dragonLauncher.py b/smartsim/_core/launcher/dragon/dragonLauncher.py index c13eefedd..17b47e309 100644 --- a/smartsim/_core/launcher/dragon/dragonLauncher.py +++ b/smartsim/_core/launcher/dragon/dragonLauncher.py @@ -110,7 +110,7 @@ def add_step_to_mapping_table(self, name: str, step_map: StepMap) -> None: elif step_map.step_id.startswith("PBS-"): sublauncher = self._pbs_launcher else: - raise ValueError(f"Step id {step_map.step_id} is not valid.") + return sublauncher_step_map = StepMap( step_id=DragonLauncher._unprefix_step_id(step_map.step_id), diff --git a/tests/on_wlm/test_symlinking.py b/tests/full_wlm/test_symlinking.py similarity index 100% rename from tests/on_wlm/test_symlinking.py rename to tests/full_wlm/test_symlinking.py diff --git a/tests/test_dragon_backend.py b/tests/test_dragon_backend.py index be4babf0d..a510f660a 100644 --- a/tests/test_dragon_backend.py +++ b/tests/test_dragon_backend.py @@ -49,6 +49,7 @@ from smartsim._core.schemas.dragonRequests import * from smartsim._core.schemas.dragonResponses import * from smartsim._core.utils.helpers import create_short_id_str +from smartsim.status import TERMINAL_STATUSES, SmartSimStatus if t.TYPE_CHECKING: from smartsim._core.launcher.dragon.dragonBackend import ( @@ -248,6 +249,31 @@ def test_run_request(monkeypatch: pytest.MonkeyPatch) -> None: assert not dragon_backend._running_steps +def test_deny_run_request(monkeypatch: pytest.MonkeyPatch) -> None: + dragon_backend = get_mock_backend(monkeypatch) + + dragon_backend._shutdown_requested = True + + run_req = DragonRunRequest( + exe="sleep", + exe_args=["5"], + path="/a/fake/path", + nodes=2, + tasks=1, + tasks_per_node=1, + env={}, + current_env={}, + pmi_enabled=False, + ) + + run_resp = dragon_backend.process_request(run_req) + assert isinstance(run_resp, DragonRunResponse) + assert run_resp.error_message == "Cannot satisfy request, server is shutting down." + step_id = run_resp.step_id + + assert dragon_backend.group_infos[step_id].status == SmartSimStatus.STATUS_FAILED + + def test_udpate_status_request(monkeypatch: pytest.MonkeyPatch) -> None: dragon_backend = get_mock_backend(monkeypatch) @@ -296,25 +322,51 @@ def test_stop_request(monkeypatch: pytest.MonkeyPatch) -> None: @pytest.mark.parametrize( - "immediate, frontend_shutdown", - [[True, True], [True, False], [False, True], [False, False]], + "immediate, kill_jobs, frontend_shutdown", + [ + [True, True, True], + [True, True, False], + [True, False, True], + [True, False, False], + [False, True, True], + [False, True, False], + ], ) def test_shutdown_request( - monkeypatch: pytest.MonkeyPatch, immediate: bool, frontend_shutdown: bool + monkeypatch: pytest.MonkeyPatch, + immediate: bool, + kill_jobs: bool, + frontend_shutdown: bool, ) -> None: monkeypatch.setenv("SMARTSIM_FLAG_TELEMETRY", "0") dragon_backend = get_mock_backend(monkeypatch) monkeypatch.setattr(dragon_backend, "_cooldown_period", 1) set_mock_group_infos(monkeypatch, dragon_backend) + if kill_jobs: + for group_info in dragon_backend.group_infos.values(): + if not group_info.status in TERMINAL_STATUSES: + group_info.status = SmartSimStatus.STATUS_FAILED + group_info.return_codes = [-9] + group_info.process_group = None + group_info.redir_workers = None + dragon_backend._running_steps.clear() + shutdown_req = DragonShutdownRequest( immediate=immediate, frontend_shutdown=frontend_shutdown ) shutdown_resp = dragon_backend.process_request(shutdown_req) - assert dragon_backend._shutdown_requested + if not kill_jobs: + stop_request_ids = ( + stop_request.step_id for stop_request in dragon_backend._stop_requests + ) + for step_id, group_info in dragon_backend.group_infos.items(): + if not group_info.status in TERMINAL_STATUSES: + assert step_id in stop_request_ids + assert isinstance(shutdown_resp, DragonShutdownResponse) - assert dragon_backend._can_shutdown == immediate + assert dragon_backend._shutdown_requested assert dragon_backend.frontend_shutdown == frontend_shutdown dragon_backend._update() @@ -322,8 +374,9 @@ def test_shutdown_request( time.sleep(dragon_backend._cooldown_period + 0.1) dragon_backend._update() - assert dragon_backend.should_shutdown == immediate - assert dragon_backend._has_cooled_down == immediate + assert dragon_backend._can_shutdown == kill_jobs + assert dragon_backend.should_shutdown == kill_jobs + assert dragon_backend._has_cooled_down == kill_jobs @pytest.mark.parametrize("telemetry_flag", ["0", "1"])