Skip to content

Commit

Permalink
Add cgroupv2 support on Linux (#11)
Browse files Browse the repository at this point in the history
* Add cgroupv2 support for linux

* Use python instead of python.exe in examples

* Add sentinel to linux cgroup to kill cgroup if launcher crashes

* Add ubuntu 22.04 ci tests

* ubuntu pip in ci

* Ubuntu ci setuptools

* Ubuntu ci upgrade pip

* Change ownership of ubuntu ci cgroup for testing

* Fix cgroup sentinel program group

* Fix asyncio loop warning
  • Loading branch information
johnwason authored Jun 17, 2023
1 parent efeb8e0 commit 8703aa0
Show file tree
Hide file tree
Showing 4 changed files with 243 additions and 20 deletions.
13 changes: 10 additions & 3 deletions .github/workflows/main.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,23 @@ on:

jobs:
ubuntu_test:
runs-on: ubuntu-20.04
strategy:
matrix:
runs-on: [ubuntu-20.04, ubuntu-22.04]
runs-on: ${{ matrix.runs-on }}
steps:
- uses: actions/checkout@v3
- name: apt
run: |
sudo apt-get update
sudo apt-get install -y python3 python3-pip curl
sudo apt-get install -y python3 python3-pip curl python3-setuptools python3-wheel python3-pytest
- name: pip
run: |
pip3 install .[test]
python3 -m pip install --upgrade pip
python3 -m pip install .[test]
- name: allow cgroup
run : |
sudo chown -R $(id -un):$(id -gn) /sys/fs/cgroup/system.slice/runner-provisioner.service || true
- name: test
run: |
python3 -m pytest -s
Expand Down
242 changes: 229 additions & 13 deletions drekar_launch.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import time
import signal
import subprocess
import uuid
import jinja2
import drekar_launch_process

Expand Down Expand Up @@ -72,7 +73,7 @@ async def run(self):
self.parent.process_state_changed(s.name,ProcessState.START_PENDING)
stderr_log.write(f"Starting process {s.name}...\n")
python_exe = sys.executable
self._process = await create_subprocess_exec(s.program, s.args, s.environment, s.cwd)
self._process = await create_subprocess_exec(s.program, s.args, s.environment, s.cwd, self.parent.cgroup)
# print(f"process pid: {self._process.pid}")
stderr_log.write(f"Process {s.name} started\n\n")
self.parent.process_state_changed(s.name,ProcessState.RUNNING)
Expand Down Expand Up @@ -164,6 +165,12 @@ def __init__(self, name, task_launches, exit_event, log_dir, screen, loop):
self._subprocesses = dict()
self._lock = threading.RLock()
self.exit_event = exit_event
self.cgroup = None

if sys.platform == "linux":
self.cgroup = _linux_cgroupv2_launch_scope()
self.cgroup.create_launcher_cgroup()


def _do_start(self,s):
p = DrekarProcess(self, s, self.log_dir, self.loop)
Expand Down Expand Up @@ -198,7 +205,7 @@ def process_state_changed(self, process_name, state):
def check_deps_status(self, deps):
return True

def close(self):
def stop_all(self):
with self._lock:
if self._closed:
return
Expand All @@ -211,7 +218,7 @@ def close(self):
traceback.print_exc()
pass

async def wait_all_closed(self):
async def wait_all_stopped(self):
try:
t1 = time.time()
t_last_sent_close = 0
Expand Down Expand Up @@ -263,9 +270,13 @@ def get_exit_status(self):
if p.exit_status != 0:
exit_status = p.exit_status
return exit_status

def close(self):
if sys.platform == "linux":
self.cgroup.close()


async def create_subprocess_exec(process, args, env, cwd):
async def create_subprocess_exec(process, args, env, cwd, launcher_cgroup=None):
if sys.platform == "win32":
job_handle = subprocess_impl_win32.win32_create_job_object()

Expand All @@ -284,13 +295,17 @@ async def create_subprocess_exec(process, args, env, cwd):
process = await asyncio.create_subprocess_exec(process,*args, \
stdout=asyncio.subprocess.PIPE,stderr=asyncio.subprocess.PIPE,\
env=env, cwd=cwd, close_fds=True, start_new_session=True )
if sys.platform == "linux" and launcher_cgroup is not None:
task_cgroup = launcher_cgroup.create_task_cgroup(process.pid)
return DrekarSubprocessImpl(process, task_cgroup = task_cgroup)
return DrekarSubprocessImpl(process)


class DrekarSubprocessImpl:
def __init__(self, asyncio_subprocess, job_handle = None):
def __init__(self, asyncio_subprocess, job_handle = None, task_cgroup = None):
self._process = asyncio_subprocess
self._job_handle = job_handle
self._task_cgroup = task_cgroup
# TODO: Linux

@property
Expand Down Expand Up @@ -330,11 +345,11 @@ def send_term(self, attempt_count):
def close(self):
if sys.platform == "win32":
subprocess_impl_win32.win32_close_job_object(self._job_handle)
elif sys.platform == "linux" and self._task_cgroup:
self._task_cgroup.close()
else:
try:
with suppress(Exception):
self._process.kill()
except Exception:
pass

def get_exit_status(self):
return self._process.returncode
Expand Down Expand Up @@ -540,6 +555,196 @@ def _win32_send_ctrl_c_event(pid):
return
ctypes.windll.kernel32.GenerateConsoleCtrlEvent(0,pid)

if sys.platform == "linux":

class _linux_cgroupv2_launch_scope:

@staticmethod
def cgroupv2_supported():
if Path("/sys/fs/cgroup/cgroup.controllers").exists():
return True
return False

def __init__(self):
self._pid = os.getpid()
self.cgroup_parent_path = None
self.cgroup_path = None

if not self.cgroupv2_supported():
return

self.cgroup_parent_path = _linux_cgroupv2_launch_scope.read_proc_cgroup(self._pid)

self.sentinel_process = None

@staticmethod
def read_proc_cgroup(pid):
cgroup_path = None
try:
# read cgroup path from /proc/{pid}/cgroup
with open(f"/proc/{pid}/cgroup","r") as f:
for line in f:
if line.startswith("0::/"):
path1 = line.split(":")[2]
if path1.strip() == "/":
# Not currently assigned to a cgroup
break
cgroup_path = Path("/sys/fs/cgroup") / Path(path1.strip().strip("/"))
break
except:
# TODO: log error
traceback.print_exc()
pass

return cgroup_path

def create_launcher_cgroup(self):
if not self.cgroupv2_supported():
return

# create a new cgroup scope for this launcher with the name drekar-launch-{random_uuid}.scope
try:
cgroup_name = f"drekar-launch-{uuid.uuid4().hex}.scope"
cgroup_path = self.cgroup_parent_path / cgroup_name
# TODO: log
# print(f"Creating cgroup {cgroup_path}")
cgroup_path.mkdir()
self.cgroup_path = cgroup_path

enable_sentinel_environ = os.environ.get("DREKAR_LAUNCH_ENABLE_SENTINEL", "1").strip().lower()
if enable_sentinel_environ == "1" or enable_sentinel_environ == "true":
self.start_sentinel()
except:
traceback.print_exc()
pass

def create_task_cgroup(self, task_pid):
task_name = f"task-{task_pid}"
if self.cgroup_path is None:
return _linux_cgroupv2_task_scope(None, task_name, task_pid)

task_cgroup = _linux_cgroupv2_task_scope(self.cgroup_path, task_name, task_pid)
task_cgroup.create_task_cgroup()
return task_cgroup

@staticmethod
def close_cgroup_path(cgroup_path):
try:
#Iterate through all subdirectories and recursively close them
for subpath in cgroup_path.iterdir():
if subpath.is_dir():
_linux_cgroupv2_launch_scope.close_cgroup_path(subpath)
cgroup_kill_path = cgroup_path / "cgroup.kill"
if cgroup_kill_path.exists():
with open(cgroup_kill_path,"w") as f:
f.write("1")
pass
time.sleep(0.1)
cgroup_path.rmdir()
except:
traceback.print_exc()
pass

def close(self):
if self.cgroup_path is not None:
# Iterate through all subdirectories depth first

self.close_cgroup_path(self.cgroup_path)

self.cgroup_path = None

if self.sentinel_process is not None:
self.stop_sentinel()

def start_sentinel(self):
if self.sentinel_process is not None:
return

sentinel_environ = os.environ.copy()
sentinel_environ["DREKAR_LAUNCH_ENABLE_SENTINEL"] = "0"
self.sentinel_process=subprocess.Popen([sys.executable, "-m", "drekar_launch", "--sentinel", str(os.getpid()), str(self.cgroup_path)],
env=sentinel_environ, close_fds=True, start_new_session=True)

def stop_sentinel(self):
if self.sentinel_process is None:
return
try:
os.killpg(self.sentinel_process.pid, signal.SIGTERM)
except:
pass

def __enter__(self):
self.create_launcher_cgroup()
return self

def __exit__(self, exc_type, exc_value, traceback):
self.close()


class _linux_cgroupv2_task_scope:
def __init__(self, cgroup_path, task_name, task_pid):
self.cgroup_path = cgroup_path
self.task_name = task_name
self.task_pid = task_pid
self.task_cgroup_path = None

def create_task_cgroup(self):
if self.cgroup_path is None:
return

self.task_cgroup_path = self.cgroup_path / f"{self.task_name}.scope"
self.task_cgroup_path.mkdir()
# Move task to new cgroup
with open(self.task_cgroup_path / "cgroup.procs","w") as f:
f.write(str(self.task_pid))

def close(self):
if self.task_cgroup_path is not None:
task_cgroup_kill_path = self.task_cgroup_path / "cgroup.kill"
if task_cgroup_kill_path.exists():
with open(task_cgroup_kill_path,"w") as f:
f.write("1")
pass
self.task_cgroup_path.rmdir()
self.task_cgroup_path = None

def __enter__(self):
self.create_task_cgroup()
return self

def __exit__(self, exc_type, exc_value, traceback):
self.close()

def _sentinel_main():
if not _linux_cgroupv2_launch_scope.cgroupv2_supported():
return
assert len(sys.argv) >= 4
assert sys.argv[1] == "--sentinel"
parent_pid = int(sys.argv[2])
parent_cgroup_path = Path(sys.argv[3])
parent_pid_proc_path = Path(f"/proc/{parent_pid}")
evt = threading.Event()
drekar_launch_process.wait_exit_callback(lambda: evt.set())
while True:
evt.wait(15)
if evt.is_set():
break
if not parent_cgroup_path.exists():
return

# Check if parent process is still alive
if not parent_pid_proc_path.exists():
break

# Parent process is dead, close cgroup
if not parent_cgroup_path.exists():
return
time.sleep(10)
if not parent_cgroup_path.exists():
return
_linux_cgroupv2_launch_scope.close_cgroup_path(Path(parent_cgroup_path))


def parse_task_launch_from_yaml(yaml_dict, cwd):
# parse yaml_dict into DrekarTask tuple
name = yaml_dict["name"]
Expand Down Expand Up @@ -712,6 +917,13 @@ def parse_task_launches_from_jinja2_config(config, config_fname, cwd, extra_proc
return name, task_launches

def main():

# Run the sentinel if requsted on linux
if sys.platform == "linux" and "--sentinel" in sys.argv:
_sentinel_main()
return

core = None
try:
parser = argparse.ArgumentParser("PyRI Core Launcher")
parser.add_argument("--config", type=str, default=None, help="Configuration file")
Expand Down Expand Up @@ -746,8 +958,10 @@ def main():

timestamp = datetime.now().strftime("-%Y-%m-%d--%H-%M-%S")
log_dir = Path(appdirs.user_log_dir(appname="drekar-launch")).joinpath(name).joinpath(name + timestamp)
log_dir.mkdir(parents=True, exist_ok=True)
loop = asyncio.get_event_loop()
log_dir.mkdir(parents=True, exist_ok=True)

loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)

exit_event = asyncio.Event()
core = DrekarCore(parser_results.name, task_launch, exit_event, log_dir, not parser_results.quiet, loop)
Expand All @@ -758,13 +972,12 @@ def main():
loop.call_soon(lambda: core.start_all())
def ctrl_c_pressed():
loop.call_soon_threadsafe(lambda: exit_event.set())
loop.call_soon_threadsafe(lambda: core.close())
drekar_launch_process.wait_exit_callback(ctrl_c_pressed)
print("Press Ctrl-C to exit")
loop.run_until_complete(exit_event.wait())
print("Exit received, closing")
core.close()
loop.run_until_complete(core.wait_all_closed())
core.stop_all()
loop.run_until_complete(core.wait_all_stopped())
#pending = asyncio.all_tasks(loop)
# pending = asyncio.all_tasks()
#loop.run_until_complete(asyncio.gather(*pending))
Expand All @@ -778,6 +991,9 @@ def ctrl_c_pressed():
except Exception:
traceback.print_exc()
raise
finally:
if core is not None:
core.close()



Expand Down
4 changes: 2 additions & 2 deletions examples/http_client_server/drekar-launch.yaml
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
name: example_http_servers
tasks:
- name: http_server
program: python.exe
program: python
args:
- "example_http_server.py"
- 8100
- "Hello from server 1"
cwd: .
- name: http_client
program: curl.exe
program: curl
args: http://localhost:8100
start-delay: 2
quit-on-terminate: true
Loading

0 comments on commit 8703aa0

Please sign in to comment.