From 5002ab94c99aad457e4a8798f632e906680aaae6 Mon Sep 17 00:00:00 2001 From: Ali Date: Wed, 2 Oct 2024 11:04:33 +0200 Subject: [PATCH] working implementation --- src/aiida/cmdline/commands/cmd_process.py | 16 +++++-- src/aiida/cmdline/params/options/main.py | 4 +- src/aiida/engine/processes/calcjobs/tasks.py | 49 ++++++++++++-------- src/aiida/engine/processes/control.py | 15 +++--- src/aiida/engine/utils.py | 14 +++--- 5 files changed, 56 insertions(+), 42 deletions(-) diff --git a/src/aiida/cmdline/commands/cmd_process.py b/src/aiida/cmdline/commands/cmd_process.py index 285fe00e4f..9673b3e95a 100644 --- a/src/aiida/cmdline/commands/cmd_process.py +++ b/src/aiida/cmdline/commands/cmd_process.py @@ -320,7 +320,10 @@ def process_status(call_link_label, most_recent_node, max_depth, processes): @options.ALL(help='Kill all processes if no specific processes are specified.') @options.TIMEOUT() @options.WAIT() -@options.FORCE_KILL() +@options.FORCE_KILL( + help='Force kill the process if it does not respond to the initial kill signal.\n' + ' Note: This may lead to orphaned jobs on your HPC and should be used with caution.' +) @decorators.with_dbenv() def process_kill(processes, all_entries, timeout, wait, force_kill): """Kill running processes. @@ -341,10 +344,13 @@ def process_kill(processes, all_entries, timeout, wait, force_kill): with capture_logging() as stream: try: - message = 'Killed through `verdi process kill`' - control.kill_processes( - processes, all_entries=all_entries, timeout=timeout, wait=wait, message=message, force_kill=force_kill - ) + if force_kill: + echo.echo_warning('Force kill is enabled. This may lead to orphaned jobs on your HPC.') + # note: It's important to include -F in the message, as this is used to identify force-killed processes. 
+ message = 'Force killed through `verdi process kill -F`' + else: + message = 'Killed through `verdi process kill`' + control.kill_processes(processes, all_entries=all_entries, timeout=timeout, wait=wait, message=message) except control.ProcessTimeoutException as exception: echo.echo_critical(f'{exception}\n{REPAIR_INSTRUCTIONS}') diff --git a/src/aiida/cmdline/params/options/main.py b/src/aiida/cmdline/params/options/main.py index 2d74d3a2a1..c8bd87f533 100644 --- a/src/aiida/cmdline/params/options/main.py +++ b/src/aiida/cmdline/params/options/main.py @@ -330,11 +330,11 @@ def set_log_level(ctx, _param, value): FORCE = OverridableOption('-f', '--force', is_flag=True, default=False, help='Do not ask for confirmation.') FORCE_KILL = OverridableOption( - '-fk', + '-F', '--force-kill', is_flag=True, default=False, - help='Kills the process without waiting for a response if the job is killed.', + help='Kills the process without waiting for a confirmation if the job has been killed from remote.', ) SILENT = OverridableOption('-s', '--silent', is_flag=True, default=False, help='Suppress any output printed to stdout.') diff --git a/src/aiida/engine/processes/calcjobs/tasks.py b/src/aiida/engine/processes/calcjobs/tasks.py index 699d3e594b..1af167a4d8 100644 --- a/src/aiida/engine/processes/calcjobs/tasks.py +++ b/src/aiida/engine/processes/calcjobs/tasks.py @@ -101,9 +101,9 @@ async def do_upload(): try: logger.info(f'scheduled request to upload CalcJob<{node.pk}>') - ignore_exceptions = (plumpy.futures.CancelledError, PreSubmitException, plumpy.process_states.Interruption) + breaking_exceptions = (plumpy.futures.CancelledError, PreSubmitException, plumpy.process_states.Interruption) skip_submit = await exponential_backoff_retry( - do_upload, initial_interval, max_attempts, logger=node.logger, ignore_exceptions=ignore_exceptions + do_upload, initial_interval, max_attempts, logger=node.logger, breaking_exceptions=breaking_exceptions ) except PreSubmitException: raise @@ 
-149,9 +149,9 @@ async def do_submit(): try: logger.info(f'scheduled request to submit CalcJob<{node.pk}>') - ignore_exceptions = (plumpy.futures.CancelledError, plumpy.process_states.Interruption) + breaking_exceptions = (plumpy.futures.CancelledError, plumpy.process_states.Interruption) result = await exponential_backoff_retry( - do_submit, initial_interval, max_attempts, logger=node.logger, ignore_exceptions=ignore_exceptions + do_submit, initial_interval, max_attempts, logger=node.logger, breaking_exceptions=breaking_exceptions ) except (plumpy.futures.CancelledError, plumpy.process_states.Interruption): raise @@ -207,9 +207,9 @@ async def do_update(): try: logger.info(f'scheduled request to update CalcJob<{node.pk}>') - ignore_exceptions = (plumpy.futures.CancelledError, plumpy.process_states.Interruption) + breaking_exceptions = (plumpy.futures.CancelledError, plumpy.process_states.Interruption) job_done = await exponential_backoff_retry( - do_update, initial_interval, max_attempts, logger=node.logger, ignore_exceptions=ignore_exceptions + do_update, initial_interval, max_attempts, logger=node.logger, breaking_exceptions=breaking_exceptions ) except (plumpy.futures.CancelledError, plumpy.process_states.Interruption): raise @@ -258,9 +258,9 @@ async def do_monitor(): try: logger.info(f'scheduled request to monitor CalcJob<{node.pk}>') - ignore_exceptions = (plumpy.futures.CancelledError, plumpy.process_states.Interruption) + breaking_exceptions = (plumpy.futures.CancelledError, plumpy.process_states.Interruption) monitor_result = await exponential_backoff_retry( - do_monitor, initial_interval, max_attempts, logger=node.logger, ignore_exceptions=ignore_exceptions + do_monitor, initial_interval, max_attempts, logger=node.logger, breaking_exceptions=breaking_exceptions ) except (plumpy.futures.CancelledError, plumpy.process_states.Interruption): raise @@ -334,9 +334,9 @@ async def do_retrieve(): try: logger.info(f'scheduled request to retrieve 
CalcJob<{node.pk}>') - ignore_exceptions = (plumpy.futures.CancelledError, plumpy.process_states.Interruption) + breaking_exceptions = (plumpy.futures.CancelledError, plumpy.process_states.Interruption) result = await exponential_backoff_retry( - do_retrieve, initial_interval, max_attempts, logger=node.logger, ignore_exceptions=ignore_exceptions + do_retrieve, initial_interval, max_attempts, logger=node.logger, breaking_exceptions=breaking_exceptions ) except (plumpy.futures.CancelledError, plumpy.process_states.Interruption): raise @@ -385,7 +385,7 @@ async def do_stash(): initial_interval, max_attempts, logger=node.logger, - ignore_exceptions=plumpy.process_states.Interruption, + breaking_exceptions=plumpy.process_states.Interruption, ) except plumpy.process_states.Interruption: raise @@ -398,7 +398,9 @@ async def do_stash(): return -async def task_kill_job(node: CalcJobNode, transport_queue: TransportQueue, cancellable: InterruptableFuture): +async def task_kill_job( + node: CalcJobNode, transport_queue: TransportQueue, cancellable: InterruptableFuture, force_kill: bool = False +): """Transport task that will attempt to kill a job calculation. The task will first request a transport from the queue. 
Once the transport is yielded, the relevant execmanager @@ -412,7 +414,6 @@ async def task_kill_job(node: CalcJobNode, transport_queue: TransportQueue, canc :raises: TransportTaskException if after the maximum number of retries the transport task still excepted """ - breakpoint() initial_interval = get_config_option(RETRY_INTERVAL_OPTION) max_attempts = get_config_option(MAX_ATTEMPTS_OPTION) @@ -423,19 +424,23 @@ async def task_kill_job(node: CalcJobNode, transport_queue: TransportQueue, canc authinfo = node.get_authinfo() async def do_kill(): - # this function fails when there is no transport - # then the exponential backof raises an exception with transport_queue.request_transport(authinfo) as request: transport = await cancellable.with_interrupt(request) return execmanager.kill_calculation(node, transport) + if force_kill: + logger.warning(f'Process<{node.pk}> has been force killed! this may result in orphaned jobs.') + raise plumpy.process_states.ForceKillInterruption('Force killing CalcJob') try: logger.info(f'scheduled request to kill CalcJob<{node.pk}>') result = await exponential_backoff_retry(do_kill, initial_interval, max_attempts, logger=node.logger) + # Note: any exception raised here will result in the process being excepted, not killed! + # Therefore, it can result in orphaned jobs! 
except plumpy.process_states.Interruption: + logger.warning(f'killing CalcJob<{node.pk}> excepted, the job might be orphaned.') raise except Exception as exception: - logger.warning(f'killing CalcJob<{node.pk}> failed') + logger.warning(f'killing CalcJob<{node.pk}> excepted, the job might be orphaned.') raise TransportTaskException(f'kill_calculation failed {max_attempts} times consecutively') from exception else: logger.info(f'killing CalcJob<{node.pk}> successful') @@ -531,7 +536,7 @@ async def execute(self) -> plumpy.process_states.State: # type: ignore[override monitor_result = await self._monitor_job(node, transport_queue, self.monitors) if monitor_result and monitor_result.action is CalcJobMonitorAction.KILL: - await self._kill_job(node, transport_queue) + await self._kill_job(node, transport_queue, force_kill=False) job_done = True if monitor_result and not monitor_result.retrieve: @@ -570,7 +575,11 @@ async def execute(self) -> plumpy.process_states.State: # type: ignore[override except TransportTaskException as exception: raise plumpy.process_states.PauseInterruption(f'Pausing after failed transport task: {exception}') except plumpy.process_states.KillInterruption as exception: - await self._kill_job(node, transport_queue) + await self._kill_job(node, transport_queue, force_kill=False) + node.set_process_status(str(exception)) + return self.retrieve(monitor_result=self._monitor_result) + except plumpy.process_states.ForceKillInterruption as exception: + await self._kill_job(node, transport_queue, force_kill=True) node.set_process_status(str(exception)) return self.retrieve(monitor_result=self._monitor_result) except (plumpy.futures.CancelledError, asyncio.CancelledError): @@ -610,9 +619,9 @@ async def _monitor_job(self, node, transport_queue, monitors) -> CalcJobMonitorR return monitor_result - async def _kill_job(self, node, transport_queue) -> None: + async def _kill_job(self, node, transport_queue, force_kill) -> None: """Kill the job.""" - await 
self._launch_task(task_kill_job, node, transport_queue) + await self._launch_task(task_kill_job, node, transport_queue, force_kill=force_kill) if self._killing is not None: self._killing.set_result(True) else: diff --git a/src/aiida/engine/processes/control.py b/src/aiida/engine/processes/control.py index a8e2d61a1e..6d12760b57 100644 --- a/src/aiida/engine/processes/control.py +++ b/src/aiida/engine/processes/control.py @@ -144,7 +144,7 @@ def pause_processes( .. note:: Requires the daemon to be running, or processes will be unresponsive. - :param processes: List of processes to play. + :param processes: List of processes to pause. :param all_entries: Pause all playing processes. :param timeout: Raise a ``ProcessTimeoutException`` if the process does not respond within this amount of seconds. :param wait: Set to ``True`` to wait for process response, for ``False`` the action is fire-and-forget. @@ -174,7 +174,6 @@ def kill_processes( all_entries: bool = False, timeout: float = 5.0, wait: bool = False, - force_kill: bool = False, ) -> None: """Kill running processes. @@ -184,7 +183,6 @@ def kill_processes( :param all_entries: Kill all active processes. :param timeout: Raise a ``ProcessTimeoutException`` if the process does not respond within this amount of seconds. :param wait: Set to ``True`` to wait for process response, for ``False`` the action is fire-and-forget. - :param force_kill: ... TODO :raises ``ProcessTimeoutException``: If the processes do not respond within the timeout. 
""" if not get_daemon_client().is_daemon_running: @@ -201,9 +199,7 @@ def kill_processes( return controller = get_manager().get_process_controller() - _perform_actions( - processes, controller.kill_process, 'kill', 'killing', timeout, wait, msg=message, force_kill=force_kill - ) + _perform_actions(processes, controller.kill_process, 'kill', 'killing', timeout, wait, msg=message) def _perform_actions( @@ -283,9 +279,12 @@ def handle_result(result): try: # unwrap is need here since LoopCommunicator will also wrap a future unwrapped = unwrap_kiwi_future(future) - result = unwrapped.result() + result = unwrapped.result(timeout=timeout) except communications.TimeoutError: - LOGGER.error(f'call to {infinitive} Process<{process.pk}> timed out') + if process.is_terminated: + LOGGER.report(f'request to {infinitive} Process<{process.pk}> sent') + else: + LOGGER.error(f'call to {infinitive} Process<{process.pk}> timed out') except Exception as exception: LOGGER.error(f'failed to {infinitive} Process<{process.pk}>: {exception}') else: diff --git a/src/aiida/engine/utils.py b/src/aiida/engine/utils.py index 4053156a97..cce9ad4d29 100644 --- a/src/aiida/engine/utils.py +++ b/src/aiida/engine/utils.py @@ -101,7 +101,7 @@ async def with_interrupt(self, coro: Awaitable[Any]) -> Any: import asyncio loop = asyncio.get_event_loop() - interruptable = InterutableFuture() + interruptable = InterruptableFuture() loop.call_soon(interruptable.interrupt, RuntimeError("STOP")) loop.run_until_complete(interruptable.with_interrupt(asyncio.sleep(2.))) >>> RuntimeError: STOP @@ -114,7 +114,7 @@ async def with_interrupt(self, coro: Awaitable[Any]) -> Any: wait_iter = asyncio.as_completed({self, task}) result = await next(wait_iter) if self.done(): - raise RuntimeError(f"This interruptible future had it's result set unexpectedly to '{result}'") + raise RuntimeError(f"This interruptable future had it's result set unexpectedly to '{result}'") return result @@ -124,7 +124,7 @@ def interruptable_task( 
) -> InterruptableFuture: """Turn the given coroutine into an interruptable task by turning it into an InterruptableFuture and returning it. - :param coro: the coroutine that should be made interruptable with object of InterutableFuture as last paramenter + :param coro: the coroutine that should be made interruptable with object of InterruptableFuture as last parameter :param loop: the event loop in which to run the coroutine, by default uses asyncio.get_event_loop() :return: an InterruptableFuture """ @@ -178,7 +178,7 @@ async def exponential_backoff_retry( initial_interval: Union[int, float] = 10.0, max_attempts: int = 5, logger: Optional[logging.Logger] = None, - ignore_exceptions: Union[None, Type[Exception], Tuple[Type[Exception], ...]] = None, + breaking_exceptions: Union[None, Type[Exception], Tuple[Type[Exception], ...]] = None, ) -> Any: """Coroutine to call a function, recalling it with an exponential backoff in the case of an exception @@ -190,7 +190,8 @@ async def exponential_backoff_retry( :param fct: the function to call, which will be turned into a coroutine first if it is not already :param initial_interval: the time to wait after the first caught exception before calling the coroutine again :param max_attempts: the maximum number of times to call the coroutine before re-raising the exception - :param ignore_exceptions: exceptions to ignore, i.e. when caught do nothing and simply re-raise + :param breaking_exceptions: exceptions that break the exponential backoff retry loop; these are re-raised immediately. + If None, all exceptions are retried and only re-raised after max_attempts is reached. 
:return: result if the ``coro`` call completes within ``max_attempts`` retries without raising """ if logger is None: @@ -205,8 +206,7 @@ async def exponential_backoff_retry( result = await coro() break # Finished successfully except Exception as exception: - # Re-raise exceptions that should be ignored - if ignore_exceptions is not None and isinstance(exception, ignore_exceptions): + if breaking_exceptions is not None and isinstance(exception, breaking_exceptions): raise count = iteration + 1