diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index b9495a2..f36e8fd 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -10,16 +10,23 @@ on: jobs: ubuntu_test: - runs-on: ubuntu-20.04 + strategy: + matrix: + runs-on: [ubuntu-20.04, ubuntu-22.04] + runs-on: ${{ matrix.runs-on }} steps: - uses: actions/checkout@v3 - name: apt run: | sudo apt-get update - sudo apt-get install -y python3 python3-pip curl + sudo apt-get install -y python3 python3-pip curl python3-setuptools python3-wheel python3-pytest - name: pip run: | - pip3 install .[test] + python3 -m pip install --upgrade pip + python3 -m pip install .[test] + - name: allow cgroup + run : | + sudo chown -R $(id -un):$(id -gn) /sys/fs/cgroup/system.slice/runner-provisioner.service || true - name: test run: | python3 -m pytest -s diff --git a/drekar_launch.py b/drekar_launch.py index 8b45633..88abaaf 100644 --- a/drekar_launch.py +++ b/drekar_launch.py @@ -18,6 +18,7 @@ import time import signal import subprocess +import uuid import jinja2 import drekar_launch_process @@ -72,7 +73,7 @@ async def run(self): self.parent.process_state_changed(s.name,ProcessState.START_PENDING) stderr_log.write(f"Starting process {s.name}...\n") python_exe = sys.executable - self._process = await create_subprocess_exec(s.program, s.args, s.environment, s.cwd) + self._process = await create_subprocess_exec(s.program, s.args, s.environment, s.cwd, self.parent.cgroup) # print(f"process pid: {self._process.pid}") stderr_log.write(f"Process {s.name} started\n\n") self.parent.process_state_changed(s.name,ProcessState.RUNNING) @@ -164,6 +165,12 @@ def __init__(self, name, task_launches, exit_event, log_dir, screen, loop): self._subprocesses = dict() self._lock = threading.RLock() self.exit_event = exit_event + self.cgroup = None + + if sys.platform == "linux": + self.cgroup = _linux_cgroupv2_launch_scope() + self.cgroup.create_launcher_cgroup() + def _do_start(self,s): p = DrekarProcess(self, s, self.log_dir, self.loop) @@ -198,7 +205,7 @@ def process_state_changed(self, process_name, state): def check_deps_status(self, deps): return True - def close(self): + def stop_all(self): with self._lock: if self._closed: return @@ -211,7 +218,7 @@ def close(self): traceback.print_exc() pass - async def wait_all_closed(self): + async def wait_all_stopped(self): try: t1 = time.time() t_last_sent_close = 0 @@ -263,9 +270,13 @@ def get_exit_status(self): if p.exit_status != 0: exit_status = p.exit_status return exit_status + + def close(self): + if sys.platform == "linux": + self.cgroup.close() -async def create_subprocess_exec(process, args, env, cwd): +async def create_subprocess_exec(process, args, env, cwd, launcher_cgroup=None): if sys.platform == "win32": job_handle = subprocess_impl_win32.win32_create_job_object() @@ -284,13 +295,17 @@ async def create_subprocess_exec(process, args, env, cwd): process = await asyncio.create_subprocess_exec(process,*args, \ stdout=asyncio.subprocess.PIPE,stderr=asyncio.subprocess.PIPE,\ env=env, cwd=cwd, close_fds=True, start_new_session=True ) + if sys.platform == "linux" and launcher_cgroup is not None: + task_cgroup = launcher_cgroup.create_task_cgroup(process.pid) + return DrekarSubprocessImpl(process, task_cgroup = task_cgroup) return DrekarSubprocessImpl(process) class DrekarSubprocessImpl: - def __init__(self, asyncio_subprocess, job_handle = None): + def __init__(self, asyncio_subprocess, job_handle = None, task_cgroup = None): self._process = asyncio_subprocess self._job_handle = job_handle + self._task_cgroup = task_cgroup # TODO: Linux @property @@ -330,11 +345,11 @@ def send_term(self, attempt_count): def close(self): if sys.platform == "win32": subprocess_impl_win32.win32_close_job_object(self._job_handle) + elif sys.platform == "linux" and self._task_cgroup: + self._task_cgroup.close() else: - try: + with suppress(Exception): self._process.kill() - except Exception: - pass def get_exit_status(self): return self._process.returncode @@ -540,6 +555,196 @@ def _win32_send_ctrl_c_event(pid): return ctypes.windll.kernel32.GenerateConsoleCtrlEvent(0,pid) +if sys.platform == "linux": + + class _linux_cgroupv2_launch_scope: + + @staticmethod + def cgroupv2_supported(): + if Path("/sys/fs/cgroup/cgroup.controllers").exists(): + return True + return False + + def __init__(self): + self._pid = os.getpid() + self.cgroup_parent_path = None + self.cgroup_path = None + + if not self.cgroupv2_supported(): + return + + self.cgroup_parent_path = _linux_cgroupv2_launch_scope.read_proc_cgroup(self._pid) + + self.sentinel_process = None + + @staticmethod + def read_proc_cgroup(pid): + cgroup_path = None + try: + # read cgroup path from /proc/{pid}/cgroup + with open(f"/proc/{pid}/cgroup","r") as f: + for line in f: + if line.startswith("0::/"): + path1 = line.split(":")[2] + if path1.strip() == "/": + # Not currently assigned to a cgroup + break + cgroup_path = Path("/sys/fs/cgroup") / Path(path1.strip().strip("/")) + break + except: + # TODO: log error + traceback.print_exc() + pass + + return cgroup_path + + def create_launcher_cgroup(self): + if not self.cgroupv2_supported(): + return + + # create a new cgroup scope for this launcher with the name drekar-launch-{random_uuid}.scope + try: + cgroup_name = f"drekar-launch-{uuid.uuid4().hex}.scope" + cgroup_path = self.cgroup_parent_path / cgroup_name + # TODO: log + # print(f"Creating cgroup {cgroup_path}") + cgroup_path.mkdir() + self.cgroup_path = cgroup_path + + enable_sentinel_environ = os.environ.get("DREKAR_LAUNCH_ENABLE_SENTINEL", "1").strip().lower() + if enable_sentinel_environ == "1" or enable_sentinel_environ == "true": + self.start_sentinel() + except: + traceback.print_exc() + pass + + def create_task_cgroup(self, task_pid): + task_name = f"task-{task_pid}" + if self.cgroup_path is None: + return _linux_cgroupv2_task_scope(None, task_name, task_pid) + + task_cgroup = _linux_cgroupv2_task_scope(self.cgroup_path, task_name, task_pid) + task_cgroup.create_task_cgroup() + return task_cgroup + + @staticmethod + def close_cgroup_path(cgroup_path): + try: + #Iterate through all subdirectories and recursively close them + for subpath in cgroup_path.iterdir(): + if subpath.is_dir(): + _linux_cgroupv2_launch_scope.close_cgroup_path(subpath) + cgroup_kill_path = cgroup_path / "cgroup.kill" + if cgroup_kill_path.exists(): + with open(cgroup_kill_path,"w") as f: + f.write("1") + pass + time.sleep(0.1) + cgroup_path.rmdir() + except: + traceback.print_exc() + pass + + def close(self): + if self.cgroup_path is not None: + # Iterate through all subdirectories depth first + + self.close_cgroup_path(self.cgroup_path) + + self.cgroup_path = None + + if self.sentinel_process is not None: + self.stop_sentinel() + + def start_sentinel(self): + if self.sentinel_process is not None: + return + + sentinel_environ = os.environ.copy() + sentinel_environ["DREKAR_LAUNCH_ENABLE_SENTINEL"] = "0" + self.sentinel_process=subprocess.Popen([sys.executable, "-m", "drekar_launch", "--sentinel", str(os.getpid()), str(self.cgroup_path)], + env=sentinel_environ, close_fds=True, start_new_session=True) + + def stop_sentinel(self): + if self.sentinel_process is None: + return + try: + os.killpg(self.sentinel_process.pid, signal.SIGTERM) + except: + pass + + def __enter__(self): + self.create_launcher_cgroup() + return self + + def __exit__(self, exc_type, exc_value, traceback): + self.close() + + + class _linux_cgroupv2_task_scope: + def __init__(self, cgroup_path, task_name, task_pid): + self.cgroup_path = cgroup_path + self.task_name = task_name + self.task_pid = task_pid + self.task_cgroup_path = None + + def create_task_cgroup(self): + if self.cgroup_path is None: + return + + self.task_cgroup_path = self.cgroup_path / f"{self.task_name}.scope" + self.task_cgroup_path.mkdir() + # Move task to new cgroup + with open(self.task_cgroup_path / "cgroup.procs","w") as f: + f.write(str(self.task_pid)) + + def close(self): + if self.task_cgroup_path is not None: + task_cgroup_kill_path = self.task_cgroup_path / "cgroup.kill" + if task_cgroup_kill_path.exists(): + with open(task_cgroup_kill_path,"w") as f: + f.write("1") + pass + self.task_cgroup_path.rmdir() + self.task_cgroup_path = None + + def __enter__(self): + self.create_task_cgroup() + return self + + def __exit__(self, exc_type, exc_value, traceback): + self.close() + + def _sentinel_main(): + if not _linux_cgroupv2_launch_scope.cgroupv2_supported(): + return + assert len(sys.argv) >= 4 + assert sys.argv[1] == "--sentinel" + parent_pid = int(sys.argv[2]) + parent_cgroup_path = Path(sys.argv[3]) + parent_pid_proc_path = Path(f"/proc/{parent_pid}") + evt = threading.Event() + drekar_launch_process.wait_exit_callback(lambda: evt.set()) + while True: + evt.wait(15) + if evt.is_set(): + break + if not parent_cgroup_path.exists(): + return + + # Check if parent process is still alive + if not parent_pid_proc_path.exists(): + break + + # Parent process is dead, close cgroup + if not parent_cgroup_path.exists(): + return + time.sleep(10) + if not parent_cgroup_path.exists(): + return + _linux_cgroupv2_launch_scope.close_cgroup_path(Path(parent_cgroup_path)) + + def parse_task_launch_from_yaml(yaml_dict, cwd): # parse yaml_dict into DrekarTask tuple name = yaml_dict["name"] @@ -712,6 +917,13 @@ def parse_task_launches_from_jinja2_config(config, config_fname, cwd, extra_proc return name, task_launches def main(): + + # Run the sentinel if requsted on linux + if sys.platform == "linux" and "--sentinel" in sys.argv: + _sentinel_main() + return + + core = None try: parser = argparse.ArgumentParser("PyRI Core Launcher") parser.add_argument("--config", type=str, default=None, help="Configuration file") @@ -746,8 +958,10 @@ def main(): timestamp = datetime.now().strftime("-%Y-%m-%d--%H-%M-%S") log_dir = Path(appdirs.user_log_dir(appname="drekar-launch")).joinpath(name).joinpath(name + timestamp) - log_dir.mkdir(parents=True, exist_ok=True) - loop = asyncio.get_event_loop() + log_dir.mkdir(parents=True, exist_ok=True) + + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) exit_event = asyncio.Event() core = DrekarCore(parser_results.name, task_launch, exit_event, log_dir, not parser_results.quiet, loop) @@ -758,13 +972,12 @@ def main(): loop.call_soon(lambda: core.start_all()) def ctrl_c_pressed(): loop.call_soon_threadsafe(lambda: exit_event.set()) - loop.call_soon_threadsafe(lambda: core.close()) drekar_launch_process.wait_exit_callback(ctrl_c_pressed) print("Press Ctrl-C to exit") loop.run_until_complete(exit_event.wait()) print("Exit received, closing") - core.close() - loop.run_until_complete(core.wait_all_closed()) + core.stop_all() + loop.run_until_complete(core.wait_all_stopped()) #pending = asyncio.all_tasks(loop) # pending = asyncio.all_tasks() #loop.run_until_complete(asyncio.gather(*pending)) @@ -778,6 +991,9 @@ def ctrl_c_pressed(): except Exception: traceback.print_exc() raise + finally: + if core is not None: + core.close() diff --git a/examples/http_client_server/drekar-launch.yaml b/examples/http_client_server/drekar-launch.yaml index 6a36269..922fe5e 100644 --- a/examples/http_client_server/drekar-launch.yaml +++ b/examples/http_client_server/drekar-launch.yaml @@ -1,14 +1,14 @@ name: example_http_servers tasks: - name: http_server - program: python.exe + program: python args: - "example_http_server.py" - 8100 - "Hello from server 1" cwd: . - name: http_client - program: curl.exe + program: curl args: http://localhost:8100 start-delay: 2 quit-on-terminate: true \ No newline at end of file diff --git a/examples/http_servers/drekar-launch.yaml b/examples/http_servers/drekar-launch.yaml index 182312c..848c64e 100644 --- a/examples/http_servers/drekar-launch.yaml +++ b/examples/http_servers/drekar-launch.yaml @@ -1,11 +1,11 @@ name: example_http_servers tasks: - name: http_server_1 - program: python.exe + program: python args: example_http_server.py 8100 Server1 cwd: . - name: http_server_2 - program: python.exe + program: python args: - example_http_server.py - 8101