Commit
[Core] Update subprocess_daemon.py to terminate all child processes from ray job when 'sky cancel' is run (#3919)

* update subprocess_daemon

* nit

* remove skip mark from cancel smoke test

* remove start_new_session=True when running subprocess_daemon

* update docstring and comments for daemonize()

* update comment

* nit comment update and format

* move appending process

* add comment

* format
landscapepainter authored Sep 11, 2024
1 parent 2d4059a commit 0de763c
Showing 3 changed files with 51 additions and 22 deletions.
5 changes: 4 additions & 1 deletion sky/skylet/log_lib.py
@@ -208,9 +208,12 @@ def run_with_log(
                 str(proc.pid),
             ]

+            # We do not need to set `start_new_session=True` here, as the
+            # daemon script will detach itself from the parent process with
+            # fork to avoid being killed by ray job. See the reason we
+            # daemonize the process in `sky/skylet/subprocess_daemon.py`.
             subprocess.Popen(
                 daemon_cmd,
-                start_new_session=True,
                 # Suppress output
                 stdout=subprocess.DEVNULL,
                 stderr=subprocess.DEVNULL,
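For context, the snippet below is a minimal, self-contained sketch (not code from the repository; the script path and pids are hypothetical) of launching such a daemon without start_new_session=True. The launcher can rely on the daemon script to detach itself via fork and os.setsid(), as the comment added above describes.

import subprocess
import sys

# Hypothetical values for illustration only.
daemon_script = '/path/to/sky/skylet/subprocess_daemon.py'
parent_pid = 1234  # hypothetical pid of the parent to watch
proc_pid = 5678    # hypothetical pid of the process whose children need cleanup

daemon_cmd = [
    sys.executable,
    daemon_script,
    '--parent-pid',
    str(parent_pid),
    '--proc-pid',
    str(proc_pid),
]

# No start_new_session=True: the daemon script double-forks and calls
# os.setsid() itself, so it detaches from this launcher on its own.
subprocess.Popen(
    daemon_cmd,
    stdout=subprocess.DEVNULL,
    stderr=subprocess.DEVNULL,
)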
62 changes: 47 additions & 15 deletions sky/skylet/subprocess_daemon.py
@@ -1,17 +1,44 @@
 """Sky subprocess daemon.
 Wait for parent_pid to exit, then SIGTERM (or SIGKILL if needed) the child
 processes of proc_pid.
 """

 import argparse
+import os
 import sys
 import time

 import psutil

-if __name__ == '__main__':
+
+def daemonize():
+    """Detaches the process from its parent process with double-forking.
+    This detachment is crucial in the context of SkyPilot and Ray job. When
+    'sky cancel' is executed, it uses Ray's stop job API to terminate the job.
+    Without daemonization, this subprocess_daemon process would be terminated
+    along with its parent process, ray::task, which is launched with Ray job.
+    Daemonization ensures this process survives the 'sky cancel' command,
+    allowing it to prevent orphaned processes of Ray job.
+    """
+    # First fork: Creates a child process identical to the parent
+    if os.fork() > 0:
+        # Parent process exits, allowing the child to run independently
+        sys.exit()
+
+    # Continues to run from first forked child process.
+    # Detach from parent environment.
+    os.setsid()
+
+    # Second fork: Creates a grandchild process
+    if os.fork() > 0:
+        # First child exits, orphaning the grandchild
+        sys.exit()
+    # Continues execution in the grandchild process
+    # This process is now fully detached from the original parent and terminal
+
+
+if __name__ == '__main__':
+    daemonize()
     parser = argparse.ArgumentParser()
     parser.add_argument('--parent-pid', type=int, required=True)
     parser.add_argument('--proc-pid', type=int, required=True)
@@ -28,29 +55,34 @@
     if process is None:
         sys.exit()

+    children = []
     if parent_process is not None:
         # Wait for either parent or target process to exit.
         while process.is_running() and parent_process.is_running():
+            try:
+                # process.children() must be called while the target process
+                # is alive, as it will return an empty list if the target
+                # process has already terminated.
+                tmp_children = process.children(recursive=True)
+                if tmp_children:
+                    children = tmp_children
+            except psutil.NoSuchProcess:
+                pass
             time.sleep(1)
+    children.append(process)

-    try:
-        children = process.children(recursive=True)
-        children.append(process)
-    except psutil.NoSuchProcess:
-        sys.exit()
-
-    for pid in children:
+    for child in children:
         try:
-            pid.terminate()
+            child.terminate()
         except psutil.NoSuchProcess:
-            pass
+            continue

     # Wait 30s for the processes to exit gracefully.
     time.sleep(30)

     # SIGKILL if they're still running.
-    for pid in children:
+    for child in children:
         try:
-            pid.kill()
+            child.kill()
         except psutil.NoSuchProcess:
-            pass
+            continue
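As a side note, psutil also provides wait_procs, which a cleanup loop like the one above could use to return as soon as every process has exited rather than always sleeping the full 30 seconds. The helper below is only an illustrative sketch of that alternative, not what this commit implements.

import psutil


def terminate_then_kill(procs, grace_seconds=30):
    """SIGTERM every process, wait up to grace_seconds, then SIGKILL survivors.

    procs is a list of psutil.Process objects, e.g. the children list
    collected by the daemon above.
    """
    for proc in procs:
        try:
            proc.terminate()
        except psutil.NoSuchProcess:
            continue
    # wait_procs returns (gone, alive) as soon as all processes have exited,
    # or after the timeout, whichever comes first.
    _, alive = psutil.wait_procs(procs, timeout=grace_seconds)
    for proc in alive:
        try:
            proc.kill()
        except psutil.NoSuchProcess:
            continue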
6 changes: 0 additions & 6 deletions tests/test_smoke.py
@@ -2389,26 +2389,20 @@ def _get_cancel_task_with_cloud(name, cloud, timeout=15 * 60):

 # ---------- Testing `sky cancel` ----------
 @pytest.mark.aws
-@pytest.mark.skip(
-    reason='The resnet_app is flaky, due to TF failing to detect GPUs.')
 def test_cancel_aws():
     name = _get_cluster_name()
     test = _get_cancel_task_with_cloud(name, 'aws')
     run_one_test(test)


 @pytest.mark.gcp
-@pytest.mark.skip(
-    reason='The resnet_app is flaky, due to TF failing to detect GPUs.')
 def test_cancel_gcp():
     name = _get_cluster_name()
     test = _get_cancel_task_with_cloud(name, 'gcp')
     run_one_test(test)


 @pytest.mark.azure
-@pytest.mark.skip(
-    reason='The resnet_app is flaky, due to TF failing to detect GPUs.')
 def test_cancel_azure():
     name = _get_cluster_name()
     test = _get_cancel_task_with_cloud(name, 'azure', timeout=30 * 60)
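These smoke tests exercise sky cancel end to end on real clouds. As a purely local illustration of the failure mode the daemon guards against (not part of the SkyPilot test suite; the bash/sleep process tree is a stand-in for a ray task launching user processes), one can spawn a small process tree, terminate only the top-level process, and check with psutil whether its descendants were left behind:

import subprocess
import time

import psutil

# A parent shell that spawns a long-running child in the background.
parent = subprocess.Popen(['bash', '-c', 'sleep 300 & wait'])
time.sleep(1)

descendants = psutil.Process(parent.pid).children(recursive=True)
parent.terminate()  # Kill only the parent, the way a plain job stop would.
parent.wait()

time.sleep(1)
orphans = [p for p in descendants if p.is_running()]
print(f'{len(orphans)} orphaned process(es) left behind')
# Without a cleanup daemon terminating the descendants, the sleep survives
# as an orphan; preventing exactly this is the subprocess_daemon's job.
for p in orphans:
    p.terminate()  # Clean up after the demonstration.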
