From 2915329ea99c1500b738156b676a069aa74f666a Mon Sep 17 00:00:00 2001 From: Tommaso Comparin <3862206+tcompa@users.noreply.github.com> Date: Fri, 26 Jul 2024 11:15:31 +0200 Subject: [PATCH 1/6] Add checks on executor shutdown and logs --- .../slurm/ssh/_executor_wait_thread.py | 6 +++++ .../runner/executors/slurm/ssh/executor.py | 23 +++++++++++++++++++ 2 files changed, 29 insertions(+) diff --git a/fractal_server/app/runner/executors/slurm/ssh/_executor_wait_thread.py b/fractal_server/app/runner/executors/slurm/ssh/_executor_wait_thread.py index 086474e1a5..5db08ce65e 100644 --- a/fractal_server/app/runner/executors/slurm/ssh/_executor_wait_thread.py +++ b/fractal_server/app/runner/executors/slurm/ssh/_executor_wait_thread.py @@ -7,6 +7,7 @@ from cfut import FileWaitThread from ......logger import set_logger +from fractal_server.app.runner.exceptions import JobExecutionError logger = set_logger(__name__) @@ -48,6 +49,10 @@ def wait(self, *, job_id: str): This method is executed on the main thread. """ + if self.shutdown: + error_msg = "Cannot call `wait` method after executor shutdown." + logger.warning(error_msg) + raise JobExecutionError(info=error_msg) with self.lock: self.active_job_ids.append(job_id) @@ -102,6 +107,7 @@ def run(self): self.shutdown_callback() except Exception: # nosec pass + self.raise_job_execution_error_callback() return if ind % skip == 0: with self.lock: diff --git a/fractal_server/app/runner/executors/slurm/ssh/executor.py b/fractal_server/app/runner/executors/slurm/ssh/executor.py index 6771607758..c8b8ec99ae 100644 --- a/fractal_server/app/runner/executors/slurm/ssh/executor.py +++ b/fractal_server/app/runner/executors/slurm/ssh/executor.py @@ -353,6 +353,12 @@ def submit( Future representing the execution of the current SLURM job. """ + # Do not continue if auxiliary thread was shut down + if self.wait_thread.shutdown: + error_msg = "Cannot call `submit` method after executor shutdown" + logger.warning(error_msg) + raise JobExecutionError(info=error_msg) + # Set defaults, if needed if slurm_config is None: slurm_config = get_default_slurm_config() @@ -436,6 +442,12 @@ def map( """ + # Do not continue if auxiliary thread was shut down + if self.wait_thread.shutdown: + error_msg = "Cannot call `map` method after executor shutdown" + logger.warning(error_msg) + raise JobExecutionError(info=error_msg) + def _result_or_cancel(fut): """ This function is based on the Python Standard Library 3.11. @@ -867,6 +879,14 @@ def _submit_job(self, job: SlurmJob) -> tuple[Future, str]: job: The `SlurmJob` object to submit. """ + # Prevent calling sbatch if auxiliary thread was shut down + if self.wait_thread.shutdown: + error_msg = ( + "Cannot call `_submit_job` method after executor shutdown" + ) + logger.warning(error_msg) + raise JobExecutionError(info=error_msg) + # Submit job to SLURM, and get jobid sbatch_command = f"sbatch --parsable {job.slurm_script_remote}" pre_submission_cmds = job.slurm_config.pre_submission_commands @@ -1336,6 +1356,9 @@ def shutdown(self, wait=True, *, cancel_futures=False): the self.wait_thread thread, see _completion. """ + # Redudantly set thread shutdown attribute to True + self.wait_thread.shutdown = True + logger.debug("Executor shutdown: start") # Handle all job futures From 75e250c6ac51946d56a54aab2e76a1074a2e7e9d Mon Sep 17 00:00:00 2001 From: Tommaso Comparin <3862206+tcompa@users.noreply.github.com> Date: Fri, 26 Jul 2024 11:15:43 +0200 Subject: [PATCH 2/6] Add test_slurm_ssh_executor_shutdown_before_job_submission --- tests/v2/00_ssh/test_executor.py | 30 ++++++++++++++++++++++++++++++ 1 file changed, 30 insertions(+) diff --git a/tests/v2/00_ssh/test_executor.py b/tests/v2/00_ssh/test_executor.py index 50380cd038..ac6d1dd01e 100644 --- a/tests/v2/00_ssh/test_executor.py +++ b/tests/v2/00_ssh/test_executor.py @@ -1,8 +1,10 @@ import logging from pathlib import Path +import pytest from devtools import debug +from fractal_server.app.runner.exceptions import JobExecutionError from fractal_server.app.runner.executors.slurm.ssh.executor import ( FractalSlurmSSHExecutor, ) # noqa @@ -111,3 +113,31 @@ def test_slurm_ssh_executor_submit_with_pre_sbatch( debug(fut.result()) assert auxfile.exists() + + +def test_slurm_ssh_executor_shutdown_before_job_submission( + fractal_ssh, + tmp_path: Path, + tmp777_path: Path, + override_settings_factory, + current_py_version: str, +): + """ + Verify the behavior when shutdown is called before any job has started. + """ + + override_settings_factory( + FRACTAL_SLURM_WORKER_PYTHON=f"/usr/bin/python{current_py_version}" + ) + + with MockFractalSSHSlurmExecutor( + workflow_dir_local=tmp_path / "job_dir", + workflow_dir_remote=(tmp777_path / "remote_job_dir"), + slurm_poll_interval=1, + fractal_ssh=fractal_ssh, + ) as executor: + executor.shutdown() + with pytest.raises(JobExecutionError) as exc_info: + fut = executor.submit(lambda: 1) + fut.result() + debug(exc_info.value) From 6d48ee4a2bf26816efb5dc2066a2148f3a3e6236 Mon Sep 17 00:00:00 2001 From: Tommaso Comparin <3862206+tcompa@users.noreply.github.com> Date: Fri, 26 Jul 2024 11:19:05 +0200 Subject: [PATCH 3/6] Update CHANGELOG [skip ci] --- CHANGELOG.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 2e5990a8b4..18bc08af95 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,8 @@ # 2.3.7 +* SSH SLURM executor: + * Handle early shutdown in SSH executor (\#1696). * Task collection: * Introduce a new configuration variable `FRACTAL_MAX_PIP_VERSION` to pin task-collection pip (\#1675). From 68529b4f90baa6367d94d45f1ca4da0ea15d3250 Mon Sep 17 00:00:00 2001 From: Tommaso Comparin <3862206+tcompa@users.noreply.github.com> Date: Fri, 26 Jul 2024 11:20:57 +0200 Subject: [PATCH 4/6] Remove missing-method call --- .../app/runner/executors/slurm/ssh/_executor_wait_thread.py | 1 - 1 file changed, 1 deletion(-) diff --git a/fractal_server/app/runner/executors/slurm/ssh/_executor_wait_thread.py b/fractal_server/app/runner/executors/slurm/ssh/_executor_wait_thread.py index 5db08ce65e..6845ca1152 100644 --- a/fractal_server/app/runner/executors/slurm/ssh/_executor_wait_thread.py +++ b/fractal_server/app/runner/executors/slurm/ssh/_executor_wait_thread.py @@ -107,7 +107,6 @@ def run(self): self.shutdown_callback() except Exception: # nosec pass - self.raise_job_execution_error_callback() return if ind % skip == 0: with self.lock: From e63b5540e6da78e1af846b4f31a8264fb2f6d815 Mon Sep 17 00:00:00 2001 From: Tommaso Comparin <3862206+tcompa@users.noreply.github.com> Date: Fri, 26 Jul 2024 11:33:16 +0200 Subject: [PATCH 5/6] Extend test_slurm_ssh_executor_shutdown_before_job_submission --- tests/v2/00_ssh/test_executor.py | 38 ++++++++++++++++++++++++++++++-- 1 file changed, 36 insertions(+), 2 deletions(-) diff --git a/tests/v2/00_ssh/test_executor.py b/tests/v2/00_ssh/test_executor.py index ac6d1dd01e..c954b18499 100644 --- a/tests/v2/00_ssh/test_executor.py +++ b/tests/v2/00_ssh/test_executor.py @@ -131,8 +131,8 @@ def test_slurm_ssh_executor_shutdown_before_job_submission( ) with MockFractalSSHSlurmExecutor( - workflow_dir_local=tmp_path / "job_dir", - workflow_dir_remote=(tmp777_path / "remote_job_dir"), + workflow_dir_local=tmp_path / "job_dir1", + workflow_dir_remote=(tmp777_path / "remote_job_dir1"), slurm_poll_interval=1, fractal_ssh=fractal_ssh, ) as executor: @@ -141,3 +141,37 @@ def test_slurm_ssh_executor_shutdown_before_job_submission( fut = executor.submit(lambda: 1) fut.result() debug(exc_info.value) + + with MockFractalSSHSlurmExecutor( + workflow_dir_local=tmp_path / "job_dir2", + workflow_dir_remote=(tmp777_path / "remote_job_dir2"), + slurm_poll_interval=1, + fractal_ssh=fractal_ssh, + ) as executor: + executor.shutdown() + with pytest.raises(JobExecutionError) as exc_info: + fut = executor.map(lambda x: 1, [1, 2, 3]) + fut.result() + debug(exc_info.value) + + with MockFractalSSHSlurmExecutor( + workflow_dir_local=tmp_path / "job_dir3", + workflow_dir_remote=(tmp777_path / "remote_job_dir3"), + slurm_poll_interval=1, + fractal_ssh=fractal_ssh, + ) as executor: + executor.shutdown() + with pytest.raises(JobExecutionError) as exc_info: + executor.wait_thread.wait(None) + debug(exc_info.value) + + with MockFractalSSHSlurmExecutor( + workflow_dir_local=tmp_path / "job_dir4", + workflow_dir_remote=(tmp777_path / "remote_job_dir4"), + slurm_poll_interval=1, + fractal_ssh=fractal_ssh, + ) as executor: + executor.shutdown() + with pytest.raises(JobExecutionError) as exc_info: + executor._submit_job(None) + debug(exc_info.value) From 0c753ee203938590accfbfdc636a148dd83bec7a Mon Sep 17 00:00:00 2001 From: Tommaso Comparin <3862206+tcompa@users.noreply.github.com> Date: Fri, 26 Jul 2024 11:44:53 +0200 Subject: [PATCH 6/6] Fix test --- tests/v2/00_ssh/test_executor.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/v2/00_ssh/test_executor.py b/tests/v2/00_ssh/test_executor.py index c954b18499..a1df792017 100644 --- a/tests/v2/00_ssh/test_executor.py +++ b/tests/v2/00_ssh/test_executor.py @@ -162,7 +162,7 @@ def test_slurm_ssh_executor_shutdown_before_job_submission( ) as executor: executor.shutdown() with pytest.raises(JobExecutionError) as exc_info: - executor.wait_thread.wait(None) + executor.wait_thread.wait(job_id=1) debug(exc_info.value) with MockFractalSSHSlurmExecutor(