diff --git a/launch/launch/actions/execute_local.py b/launch/launch/actions/execute_local.py index 553d31120..a6f0bfd35 100644 --- a/launch/launch/actions/execute_local.py +++ b/launch/launch/actions/execute_local.py @@ -36,6 +36,8 @@ from osrf_pycommon.process_utils import async_execute_process from osrf_pycommon.process_utils import AsyncSubprocessProtocol +import psutil + from .emit_event import EmitEvent from .opaque_function import OpaqueFunction from .timer_action import TimerAction @@ -64,9 +66,7 @@ from ..launch_description_entity import LaunchDescriptionEntity from ..some_actions_type import SomeActionsType from ..some_substitutions_type import SomeSubstitutionsType -from ..substitution import Substitution # noqa: F401 from ..substitutions import LaunchConfiguration -from ..substitutions import PythonExpression from ..utilities import create_future from ..utilities import is_a_subclass from ..utilities import normalize_to_list_of_substitutions @@ -87,6 +87,8 @@ def __init__( 'sigterm_timeout', default=5), sigkill_timeout: SomeSubstitutionsType = LaunchConfiguration( 'sigkill_timeout', default=5), + signal_lingering_subprocesses: SomeSubstitutionsType = LaunchConfiguration( + 'signal_lingering_subprocesses', default=True), emulate_tty: bool = False, output: SomeSubstitutionsType = 'log', output_format: Text = '[{this.process_description.final_name}] {line}', @@ -158,6 +160,11 @@ def __init__( as a string or a list of strings and Substitutions to be resolved at runtime, defaults to the LaunchConfiguration called 'sigkill_timeout' + :param: signal_lingering_subprocesses if `True`, all subprocesses spawned by the process + will be signaled to make sure they finish. + The sequence of signals used is the same SIGINT/SIGTERM/SIGKILL sequence + used to kill the main process. + Subprocesses start being signaled when the main process completes. :param: emulate_tty emulate a tty (terminal), defaults to False, but can be overridden with the LaunchConfiguration called 'emulate_tty', the value of which is evaluated as true or false according to @@ -188,6 +195,8 @@ def __init__( self.__shell = shell self.__sigterm_timeout = normalize_to_list_of_substitutions(sigterm_timeout) self.__sigkill_timeout = normalize_to_list_of_substitutions(sigkill_timeout) + self.__signal_lingering_subprocesses = normalize_to_list_of_substitutions( + signal_lingering_subprocesses) self.__emulate_tty = emulate_tty self.__output = os.environ.get('OVERRIDE_LAUNCH_PROCESS_OUTPUT', output) if not isinstance(self.__output, dict): @@ -207,6 +216,7 @@ def __init__( self.__shutdown_future = None # type: Optional[asyncio.Future] self.__sigterm_timer = None # type: Optional[TimerAction] self.__sigkill_timer = None # type: Optional[TimerAction] + self.__children: List[psutil.Process] = [] self.__stdout_buffer = io.StringIO() self.__stderr_buffer = io.StringIO() @@ -279,7 +289,11 @@ def _shutdown_process(self, context, *, send_sigint): self.__shutdown_future.set_result(None) # Otherwise process is still running, start the shutdown procedures. - context.extend_locals({'process_name': self.process_details['name']}) + context.extend_locals( + { + 'process_name': self.process_details['name'], + 'process_pid': self.process_details['pid'], + }) actions_to_return = self.__get_shutdown_timer_actions() if send_sigint: actions_to_return.append(self.__get_sigint_event()) @@ -356,7 +370,7 @@ def __on_process_output( if buffer.closed: # buffer was probably closed by __flush_buffers on shutdown. Output without # buffering. - buffer.info( + logger.info( self.__output_format.format(line=to_write, this=self) ) else: @@ -440,23 +454,17 @@ def __get_shutdown_timer_actions(self) -> List[Action]: base_msg = \ "process[{}] failed to terminate '{}' seconds after receiving '{}', escalating to '{}'" - def printer(context, msg, timeout_substitutions): - self.__logger.error(msg.format( - context.locals.process_name, - perform_substitutions(context, timeout_substitutions), - )) + def printer(context, msg): + self.__logger.error(msg.format(context.locals.process_name)) - sigterm_timeout = self.__sigterm_timeout - sigkill_timeout = [PythonExpression( - ('float(', *self.__sigterm_timeout, ') + float(', *self.__sigkill_timeout, ')') - )] # Setup a timer to send us a SIGTERM if we don't shutdown quickly. + sigterm_timeout = self.__sigterm_timeout_value self.__sigterm_timer = TimerAction( period=sigterm_timeout, actions=[ OpaqueFunction( function=printer, - args=(base_msg.format('{}', '{}', 'SIGINT', 'SIGTERM'), sigterm_timeout) + args=(base_msg.format('{}', sigterm_timeout, 'SIGINT', 'SIGTERM'), ) ), EmitEvent(event=SignalProcess( signal_number=signal.SIGTERM, @@ -465,13 +473,14 @@ def printer(context, msg, timeout_substitutions): ], cancel_on_shutdown=False, ) + sigkill_timeout = self.__sigterm_timeout_value + self.__sigkill_timeout_value # Setup a timer to send us a SIGKILL if we don't shutdown after SIGTERM. self.__sigkill_timer = TimerAction( period=sigkill_timeout, actions=[ OpaqueFunction( function=printer, - args=(base_msg.format('{}', '{}', 'SIGTERM', 'SIGKILL'), sigkill_timeout) + args=(base_msg.format('{}', sigkill_timeout, 'SIGTERM', 'SIGKILL'), ) ), EmitEvent(event=SignalProcess( signal_number='SIGKILL', @@ -480,6 +489,13 @@ def printer(context, msg, timeout_substitutions): ], cancel_on_shutdown=False, ) + self.__children = [] + pid = self._subprocess_transport.get_pid() + if pid is not None: + try: + self.__children = psutil.Process(pid).children(recursive=True) + except psutil.NoSuchProcess: + pass return [ cast(Action, self.__sigterm_timer), cast(Action, self.__sigkill_timer), @@ -491,12 +507,15 @@ def __get_sigint_event(self): process_matcher=matches_action(self), )) - def __cleanup(self): - # Cancel any pending timers we started. + def __cleanup_timers(self): if self.__sigterm_timer is not None: self.__sigterm_timer.cancel() if self.__sigkill_timer is not None: self.__sigkill_timer.cancel() + + def __cleanup(self): + # Cancel any pending timers we started. + self.__cleanup_timers() # Close subprocess transport if any. if self._subprocess_transport is not None: self._subprocess_transport.close() @@ -529,6 +548,48 @@ def on_stdout_received(self, data: bytes) -> None: def on_stderr_received(self, data: bytes) -> None: self.__context.emit_event_sync(ProcessStderr(text=data, **self.__process_event_args)) + async def _signal_subprocesses(self, context): + to_signal = self.__children + signaled = [] + sig = signal.SIGINT + start_time = context.asyncio_loop.time() + sigterm_timeout = self.__sigterm_timeout_value + sigkill_timeout = self.__sigterm_timeout_value + self.__sigkill_timeout_value + process_pid = self.process_details['pid'] + process_name = self.process_details['name'] + log_prefix_format = ( + 'subprocess[pid={}] of process[' + f'{process_name}, pid={process_pid}]: ') + next_signals = iter(((signal.SIGTERM, sigterm_timeout), (signal.SIGKILL, sigkill_timeout))) + while True: + for p in to_signal: + try: + p.send_signal(sig) + except psutil.NoSuchProcess: + continue + log_prefix = log_prefix_format.format(p.pid) + self.__logger.info( + f'{log_prefix}sending {sig.name} to subprocess directly.' + ) + signaled.append(p) + try: + sig, timeout = next(next_signals) + except StopIteration: + return + current_time = context.asyncio_loop.time() + while current_time < start_time + timeout: + await asyncio.sleep(min(0.5, start_time + timeout - current_time)) + for p in list(signaled): + if not p.is_running(): + log_prefix = log_prefix_format.format(p.pid) + self.__logger.info(f'{log_prefix}exited') + signaled.remove(p) + if not signaled: + return + current_time = context.asyncio_loop.time() + to_signal = signaled + signaled = [] + async def __execute_process(self, context: LaunchContext) -> None: process_event_args = self.__process_event_args if process_event_args is None: @@ -596,8 +657,13 @@ async def __execute_process(self, context: LaunchContext) -> None: timeout=self.__respawn_delay ) if not self.__shutdown_future.done(): + if self.__signal_lingering_subprocesses_value: + await self._signal_subprocesses(context) context.asyncio_loop.create_task(self.__execute_process(context)) return + self.__cleanup_timers() + if self.__signal_lingering_subprocesses_value: + await self._signal_subprocesses(context) self.__cleanup() def prepare(self, context: LaunchContext): @@ -678,6 +744,12 @@ def execute(self, context: LaunchContext) -> Optional[List[LaunchDescriptionEnti ] for event_handler in event_handlers: context.register_event_handler(event_handler) + self.__sigterm_timeout_value = perform_typed_substitution( + context, self.__sigterm_timeout, float) + self.__sigkill_timeout_value = perform_typed_substitution( + context, self.__sigkill_timeout, float) + self.__signal_lingering_subprocesses_value = perform_typed_substitution( + context, self.__signal_lingering_subprocesses, bool) try: self.__completed_future = create_future(context.asyncio_loop) diff --git a/launch/package.xml b/launch/package.xml index b46221a99..1182cb5fe 100644 --- a/launch/package.xml +++ b/launch/package.xml @@ -20,6 +20,7 @@ ament_index_python python3-importlib-metadata python3-lark-parser + python3-psutil python3-yaml ament_copyright diff --git a/launch/test/launch/test_execute_local.py b/launch/test/launch/test_execute_local.py index 24b5f26b6..14dc1cd00 100644 --- a/launch/test/launch/test_execute_local.py +++ b/launch/test/launch/test_execute_local.py @@ -17,8 +17,11 @@ """Tests for the ExecuteLocal Action.""" +import asyncio import os +import signal import sys +import time from launch import LaunchDescription from launch import LaunchService @@ -28,6 +31,8 @@ from launch.actions import TimerAction from launch.descriptions import Executable +import psutil + import pytest @@ -138,3 +143,39 @@ def test_execute_process_with_output_dictionary(): ls = LaunchService() ls.include_launch_description(ld) assert 0 == ls.run() + + +PYTHON_SCRIPT = """\ +import time + +while 1: + time.sleep(0.5) +""" + + +def test_kill_subprocesses(): + """Test launching a process with an environment variable.""" + executable = ExecuteLocal( + process_description=Executable( + cmd=['python3', '-c', f'"{PYTHON_SCRIPT}"'], + ), + shell=True, + output='screen', + ) + ld = LaunchDescription([executable]) + ls = LaunchService() + ls.include_launch_description(ld) + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + run_async_task = loop.create_task(ls.run_async()) + + async def wait_for_subprocesses(): + start = time.time() + while len(psutil.Process().children(recursive=True)) != 2: + await asyncio.sleep(0.5) + assert time.time() < start + 5., 'timed out waiting for processes to setup' + wait_for_subprocesses_task = loop.create_task(wait_for_subprocesses()) + loop.run_until_complete(wait_for_subprocesses_task) + os.kill(executable.process_details['pid'], signal.SIGTERM) + loop.run_until_complete(run_async_task) + assert len(psutil.Process().children(recursive=True)) == 0