From a264e581fc57fe5d822d8d1b4b21ce9d0357ff94 Mon Sep 17 00:00:00 2001 From: Pavel Raiskup Date: Wed, 6 Sep 2023 13:12:20 +0200 Subject: [PATCH] PoC: agent spawner Relates to #123 --- .github/workflows/python-diff-lint.yml | 1 + config/agent-spawner/config.yaml | 31 ++++ resalloc_agent_spawner/__init__.py | 0 resalloc_agent_spawner/dispatcher.py | 247 +++++++++++++++++++++++++ resalloc_agent_spawner/helpers.py | 113 +++++++++++ resalloc_agent_spawner/worker.py | 62 +++++++ rpm/Makefile | 2 +- rpm/resalloc-agent-spawner.service | 15 ++ rpm/resalloc.spec.tpl | 58 +++++- setup.py | 7 + 10 files changed, 527 insertions(+), 9 deletions(-) create mode 100644 config/agent-spawner/config.yaml create mode 100644 resalloc_agent_spawner/__init__.py create mode 100644 resalloc_agent_spawner/dispatcher.py create mode 100644 resalloc_agent_spawner/helpers.py create mode 100644 resalloc_agent_spawner/worker.py create mode 100644 rpm/resalloc-agent-spawner.service diff --git a/.github/workflows/python-diff-lint.yml b/.github/workflows/python-diff-lint.yml index 9ee7013..c89e00a 100644 --- a/.github/workflows/python-diff-lint.yml +++ b/.github/workflows/python-diff-lint.yml @@ -24,6 +24,7 @@ jobs: uses: fedora-copr/vcs-diff-lint-action@v1 with: linter_tags: pylint + install_rpm_packages: python3-copr-common - name: Upload artifact with detected defects in SARIF format uses: actions/upload-artifact@v3 diff --git a/config/agent-spawner/config.yaml b/config/agent-spawner/config.yaml new file mode 100644 index 0000000..667cb1e --- /dev/null +++ b/config/agent-spawner/config.yaml @@ -0,0 +1,31 @@ +# Configuration for resalloc-agent-spawner.service. YAML format. +# Specify groups of aggents within the `agent_groups` section that agent spawner +# should take care of. + +#agent_groups: +# workers: +# # These are executed in the background async, may take some time. +# cmd_prepare: /bin/true +# cmd_terminate: /bin/true +# +# # keep these super fast to avoid overall system halt! +# cmd_converge_to: echo 1 +# cmd_check_finished: /bin/true +# cmd_try_release: /bin/false +# +# # List of resalloc tags to request in tickets +# tags: +# - kobo_worker + +# Note that we use request_survives_server_restart resalloc client option, +# so the resalloc server must be running to avoid system hang! +#resalloc_server: "http://localhost:49100" + +# Where to log events. +#logfile: /tmp/agent-spawner.log + +# How to connect to redis-db. By default connects to 127.0.0.1:6379. +#redis_db: null +#redis_host: null +#redis_port": null +#redis_password": null diff --git a/resalloc_agent_spawner/__init__.py b/resalloc_agent_spawner/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/resalloc_agent_spawner/dispatcher.py b/resalloc_agent_spawner/dispatcher.py new file mode 100644 index 0000000..c10864d --- /dev/null +++ b/resalloc_agent_spawner/dispatcher.py @@ -0,0 +1,247 @@ +""" +Agent spawner daemon. Part of the resalloc project. +""" + +import logging +from copr_common.dispatcher import Dispatcher +from copr_common.worker_manager import WorkerManager, QueueTask +from copr_common.log import setup_script_logger +from copr_common.redis_helpers import get_redis_connection + +from resalloc.client import ( + Connection as ResallocConnection, +) + +from resalloc_agent_spawner.helpers import ( + get_config, + CmdCallerMixin, + rk2tid, + tid2rk, +) + +class Task(QueueTask): + """ priority queue task handed ower to AgentWorkerManager """ + def __init__(self, task_id): + self.task_id = task_id + @property + def id(self): + return "agent_ticket_" + str(self.task_id) + +class AgentWorkerManager(WorkerManager): + """ + Start async worker in the background + """ + + worker_prefix = 'agent_backround_handler' + + def start_task(self, worker_id, task): + self.start_daemon_on_background([ + "resalloc-agent-worker", + '--daemon', + '--ticket-id', str(task.task_id), + '--worker-id', worker_id, + ]) + + def finish_task(self, worker_id, task_info): + return True + + +class AgentSpawnerDispatcher(Dispatcher, CmdCallerMixin): + """ + Track list of opened tickets from Resalloc (representing Agent-like + resources). Each Agent is in the following states: + + 1. NEW -> ticket is taken, it is being allocated in the background + 2. PREPARING -> script is being run in the background to prepare the agent. + 3. WORKING -> up and running agent + 4. TERMINATING -> script is being run in the background to cleanup after the + agent. + 5. ENDED -> ready for ticket close + + PREPARING and TERMINATING states are never changed by Dispatcher, but by + BackgroundWorker. If one background worker fails to switch the state, new + one is started instead of it. + """ + + task_type = 'agent-manager' + worker_manager_class = AgentWorkerManager + + def __init__(self, opts): + super().__init__(opts) + self.sleeptime = 10 + self.log = logging.getLogger() + setup_script_logger(self.log, self.opts["logfile"]) + self.redis = get_redis_connection(opts) + self.resalloc = ResallocConnection( + opts["resalloc_server"], + request_survives_server_restart=True, + ) + + def get_ticket_data(self, ticket_id): + """ load resalloc ticket ID data """ + ticket = self.resalloc.getTicket(ticket_id) + if not ticket.collect(): + return None + return ticket.output + + def try_to_stop(self, group_id, to_stop): + """ + Attempt to stop TO_STOP resources by closing Resalloc tickets. Not all + the resources may be closed at this time. + """ + stopped = 0 + for ticket_id in self.get_tickets("WORKING"): + if stopped >= to_stop: + break + + data = self.get_ticket_data(ticket_id) + if not self.cmd_try_release(group_id, data): + self.log.debug("Can't release %s", ticket_id) + continue + self.log.info("agent %s switches to TERMINATING (early)", ticket_id) + self.set_ticket_attribute(ticket_id, "state", "TERMINATING") + stopped += 1 + + def get_tickets(self, states=None): + """ + Get the list of ticket IDs currently stored in redis, + optionally filtered by list of states. + """ + keys = self.redis.keys(tid2rk("*")) + output = [] + for key in keys: + if states and self.redis.hget(key, 'state') not in states: + continue + output += [rk2tid(key)] + return output + + def set_ticket_attribute(self, ticket_id, key, value): + """ + For the ticket_id set redis hash key to value. + """ + hash_id = tid2rk(ticket_id) + self.redis.hset(hash_id, str(key), str(value)) + + def get_ticket_attributes(self, ticket_id, keys): + """ + Return list of redis hash values for the given ticket_id and + list of keys. + """ + hash_id = tid2rk(ticket_id) + keys = [str(s) for s in keys] + return self.redis.hmget(hash_id, keys) + + def agent_counts(self): + """ + Get 'group_id → count' mapping + """ + keys = self.redis.keys(tid2rk("*")) + output = {} + for key in keys: + group_id = self.redis.hget(key, "group_id") + output.setdefault(group_id, 0) + output[group_id] += 1 + return output + + def agent_drop(self, ticket_id): + """ drop agent per ticket id, cleanup everything """ + ticket = self.resalloc.getTicket(ticket_id) + ticket.close() + self.redis.delete(tid2rk(ticket_id)) + + def clean_finished_workers(self): + """ + WORKING — TERMINATING; Check for finalized agents, plan their cleanup + """ + for ticket_id in self.get_tickets("WORKING"): + (group_id,) = self.get_ticket_attributes(ticket_id, ["group_id"]) + data = self.get_ticket_data(ticket_id) + if not self.cmd_is_finished(group_id, data): + continue + self.log.info("Agent %s finished task, normal TERMINATING", ticket_id) + self.set_ticket_attribute(ticket_id, "state", "TERMINATING") + + def start_preparing(self): + """ + When the ticket is resolved in Resalloc, we have a working + resource that we can prepare (in the background). Switch the state. + """ + for ticket_id in self.get_tickets("NEW"): + ticket = self.resalloc.getTicket(ticket_id) + if not ticket.collect(): + continue + self.set_ticket_attribute(ticket_id, "data", ticket.output) + self.set_ticket_attribute(ticket_id, "state", "PREPARING") + + def detect_failed_tickets(self): + """ + Check for failed tickets, close them. This rarely happens if everything + is working smoothly. + + TODO: no need to do this in every single loop? + TODO: call some hook? + """ + + def converge(self): + """ + Go through all agent groups; check the ideal number of memgers and start + new or try to terminate. + """ + for group_id, current in self.agent_counts(): + ideal = self.cmd_converge_to(group_id) + if ideal is None: + # unable to call the hook + continue + + if current < ideal: + todo = ideal - current + self.log.info("We want %s but have %s agents, starting %s new", + ideal, current, todo) + for _ in range(todo): + # spawn as "NEW" + tags = self.opts["agent_groups"][group_id]["tags"] + ticket = self.resalloc.newTicket(tags) + self.log.debug("Requesting new agent via ticket %s", + ticket.id) + self.set_ticket_attribute(ticket.id, "group_id", "NEW") + self.set_ticket_attribute(ticket.id, "state", "NEW") + + elif current > ideal: + # WORKING — TERMINATING (even agents that did not start any task) + self.try_to_stop(group_id, current - ideal) + + def get_frontend_tasks(self): + """ + NEW → PREPARING → WORKING — TERMINATING — ENDED + """ + + # Cleanup after failed tickets + self.detect_failed_tickets() + + # Drop successfully terminated agents + for ticket_id in self.get_tickets("ENDED"): + self.agent_drop(ticket_id) + + # WORKING — TERMINATING (normal) + self.clean_finished_workers() + + # NEW → PREPARING + self.start_preparing() + + # spawn NEW or try to switch WORKING → TERMINATING + self.converge() + + # Return the priority queue to process by background workers. + background_tasks = [] + for ticket_id in self.get_tickets(["PREPARING", "TERMINATING"]): + background_tasks += [Task(ticket_id)] + return background_tasks + + +def main(): + """ realloc-agent-spawner entrypoint """ + AgentSpawnerDispatcher(get_config()).run() + + +if __name__ == "__main__": + main() diff --git a/resalloc_agent_spawner/helpers.py b/resalloc_agent_spawner/helpers.py new file mode 100644 index 0000000..5c88e02 --- /dev/null +++ b/resalloc_agent_spawner/helpers.py @@ -0,0 +1,113 @@ +""" +resalloc agent spawner helpers +""" + +import os +import subprocess + +from resalloc.helpers import load_config_file + + +def rk2tid(key): + """ Redis agent key to Resalloc Ticket ID """ + return int(key.split(":")[1]) + + +def tid2rk(ticket): + """ Resalloc Ticket ID to Redis Agent key """ + return f"agent:{ticket}" + + +def get_config(): + """ + Load the agent-spawner YAML configuration + """ + conf_dir = os.environ.get("CONFIG_DIR", "/etc/resalloc-agent-spawner") + config_file = os.path.join(conf_dir, "config.yaml") + config = load_config_file(config_file) + config.setdefault("agent_groups", {}) + config.setdefault("resalloc_server", "http://localhost:49100") + config.setdefault("logfile", "/tmp/agent-spawner.log") + ag = config["agent_groups"] + for group_id in ag.keys(): + ag[group_id].setdefault("cmd_converge_to", "/usr/bin/echo 1") + ag[group_id].setdefault("cmd_check_finished", "/bin/false") + ag[group_id].setdefault("cmd_prepare", "/bin/true") + ag[group_id].setdefault("cmd_terminate", "/bin/true") + ag[group_id].setdefault("cmd_try_release", "/bin/false") + ag[group_id].setdefault("tags", ["please-specify-some-tags"]) + return config + + +class CmdCallerMixin: + """ + Wrapper around calling command hooks. + """ + + def _cmd(self, group_id, cmd_id): + return self.opts["agent_groups"][group_id][cmd_id] + + def cmd_converge_to(self, group_id): + """ + Query the outside world for the ideal number of agents in given group. + """ + result = subprocess.run( + self._cmd(group_id, "cmd_converge_to"), + stdout=subprocess.PIPE, check=False, shell=True) + if result.returncode == 0: + try: + return int(result.stdout.decode("utf-8").strip()) + except ValueError: + self.log.error("Converge-to hook failure, expected int, " + "got: %s", result.stdout) + return None + + self.log.debug("Failing to run converge-to hook") + return None + + def cmd_try_release(self, group_id, data): + """ + Call hook that releases the resource + """ + cmd = self._cmd(group_id, "cmd_try_release") + result = subprocess.run(cmd, check=False, **self.subproces_kwargs(data)) + return not result.returncode + + def cmd_is_finished(self, group_id, data): + """ + Call hook that releases the resource + """ + result = subprocess.run( + self._cmd(group_id, "cmd_check_finished"), + check=False, **self.subproces_kwargs(data)) + return not result.returncode + + def cmd_take(self, group_id, data): + """ + Initialize the agent + """ + return not subprocess.run( + self._cmd(group_id, "cmd_prepare"), check=True, + **self.subproces_kwargs(data), + ) + + def cmd_terminate(self, group_id, data): + """ + Prepare the agent for removal. + """ + return not subprocess.run( + self._cmd(group_id, "cmd_terminate"), + check=False, + **self.subproces_kwargs(data), + ) + + def subproces_kwargs(self, data): + """ + generate "generic" subprocess.Popen kwargs + """ + return { + "env": { + "AGENT_SPAWNER_RESOURCE_DATA": str(data), + }, + "shell": True, + } diff --git a/resalloc_agent_spawner/worker.py b/resalloc_agent_spawner/worker.py new file mode 100644 index 0000000..ad93d1d --- /dev/null +++ b/resalloc_agent_spawner/worker.py @@ -0,0 +1,62 @@ +""" +Handle certain tasks by a background daemon process. +""" + +from copr_common.background_worker import BackgroundWorker + +from resalloc_agent_spawner.helpers import ( + get_config, + CmdCallerMixin, + tid2rk, +) + +class AgentHandler(BackgroundWorker, CmdCallerMixin): + """ Start daemon process per given task from AgentWorkerManager """ + + def __init__(self): + super().__init__() + self.opts = get_config() + + @classmethod + def adjust_arg_parser(cls, parser): + parser.add_argument( + "--ticket-id", + type=int, + required=True, + help="ticket ID to handle", + ) + + def handle_ticket(self, ticket_id): + """ + Import a single task + """ + redis_key = tid2rk(ticket_id) + + # We know there's self._redis initialized by parent class so we don't + # create yet another connection. + redis_dict = self._redis.hgetall(redis_key) + + if redis_dict["state"] == "PREPARING": + self._redis.hset(redis_key, "state", "WORKING") + self.cmd_take(redis_dict["group_id"], redis_dict["data"]) + return + + if redis_dict["state"] == "TERMINATING": + self.cmd_terminate(redis_dict["group_id"], redis_dict["data"]) + self._redis.hset(redis_key, "state", "ENDED") + + def handle_task(self): + try: + self.handle_ticket(self.args.ticket_id) + finally: + self.redis_set_worker_flag("status", "done") + + +def main(): + """ realloc-agent-worker entrypoint """ + worker = AgentHandler() + worker.process() + + +if __name__ == "__main__": + main() diff --git a/rpm/Makefile b/rpm/Makefile index 21788b1..0d1804b 100644 --- a/rpm/Makefile +++ b/rpm/Makefile @@ -57,4 +57,4 @@ rpm: srpm clean: - rm -rf *.src.rpm *.tar.gz $(project)-* + rm -rf *.src.rpm *.tar.gz $(project)-*/ diff --git a/rpm/resalloc-agent-spawner.service b/rpm/resalloc-agent-spawner.service new file mode 100644 index 0000000..b4bb2c6 --- /dev/null +++ b/rpm/resalloc-agent-spawner.service @@ -0,0 +1,15 @@ +[Unit] +Description=Start self-stending agent-like resources using Resalloc +After=syslog.target network.target auditd.service + +[Service] +Type=simple +User=resalloc +Group=resalloc +ExecStart=/usr/bin/resalloc-agent-spawner +# we don't want to kill background action processors (daemoncontext) +KillMode=process +Restart=on-failure + +[Install] +WantedBy=multi-user.target diff --git a/rpm/resalloc.spec.tpl b/rpm/resalloc.spec.tpl index 8a29e93..ecf7254 100644 --- a/rpm/resalloc.spec.tpl +++ b/rpm/resalloc.spec.tpl @@ -2,9 +2,20 @@ %global sysuser resalloc %global sysgroup %sysuser -%global _logdir %_var/log/%{name}server %global _homedir %_sharedstatedir/%{name}server +%global agent_user resalloc-agent-spawner +%global agent_group %agent_user + +%global create_user_group() \ +getent group "%1" >/dev/null || groupadd -r "%1" \ +getent passwd "%1" >/dev/null || \\\ +useradd -r -g "%2" -G "%2" -s "%3" \\\ + -c "%1 service user" "%1" \\\ + -d "%4" + +%global _logdir %_var/log/%{name}server + %global sum Resource allocator for expensive resources %global desc \ The resalloc project aims to help with taking care of dynamically \ @@ -73,6 +84,7 @@ Requires: %default_python-%srcname = %version-%release Source0: https://github.com/praiskup/%name/releases/download/v%version/%name-@TARBALL_VERSION@.tar.gz Source1: resalloc.service +Source5: resalloc-agent-spawner.service Source2: logrotate Source3: merge-hook-logs Source4: cron.hourly @@ -126,6 +138,24 @@ The %name-webui package provides the resalloc webui, it shows page with information about resalloc resources. %endif +%if %{with python3} +%package agent-spawner +Summary: %sum - daemon starting agent-like resources + +Requires(pre): /usr/sbin/useradd +Requires: python3-copr-common + +%description agent-spawner +%desc + +Agent Spawner maintains sets resources (agents) of certain kind and in certain +number, according to given configuration. Typical Resalloc resource is +completely dummy, fully controlled from the outside. With agent-like resources +this is different — such resources are self-standing, they take care of +themselves, perhaps interacting/competing with each other. The only thing that +agent-spawner needs to do is to control the ideal number of them. +%endif + %if %{with python3} %package -n python3-%srcname Summary: %sum - Python 3 client library @@ -168,6 +198,9 @@ restorecon -R %_var/www/cgi-%{name} || : %prep %autosetup -p1 -n %name-@TARBALL_VERSION@ +%if %{without python3} +rm -rf resalloc_agent_spawner +%endif %build @@ -195,6 +228,7 @@ install -p -m 755 %{name}webui/cgi-resalloc %buildroot%_var/www/cgi-%{name} mkdir -p %buildroot%_unitdir mkdir -p %buildroot%_logdir install -p -m 644 %SOURCE1 %buildroot%_unitdir +install -p -m 644 %SOURCE5 %buildroot%_unitdir install -d -m 700 %buildroot%_homedir install -d -m 700 %buildroot%_sysconfdir/logrotate.d install -p -m 644 %SOURCE2 %buildroot%_sysconfdir/logrotate.d/resalloc-server @@ -204,6 +238,10 @@ install -p -m 755 %SOURCE3 %buildroot/%_libexecdir/%name-merge-hook-logs install -d %buildroot%_sysconfdir/cron.hourly install -p -m 755 %SOURCE4 %buildroot%_sysconfdir/cron.hourly/resalloc +%if %{without python3} +rm %buildroot/%_bindir/%name-agent-* +%endif + %if %{with check} %check @@ -220,13 +258,11 @@ ln -s "%{default_sitelib}/%{name}server" %buildroot%_homedir/project %pre server -user=%sysuser -group=%sysgroup -getent group "$user" >/dev/null || groupadd -r "$group" -getent passwd "$user" >/dev/null || \ -useradd -r -g "$group" -G "$group" -s /bin/bash \ - -c "resalloc server's user" "$user" \ - -d "%_homedir" +%create_user_group %sysuser %sysgroup /bin/bash %_homedir + + +%pre agent-spawner +%create_user_group %agent_user %agent_group /bin/false / %post server @@ -282,6 +318,12 @@ useradd -r -g "$group" -G "$group" -s /bin/bash \ %config %attr(0755, root, root) %{_sysconfdir}/cron.hourly/resalloc %if %{with python3} +%files agent-spawner +%_bindir/resalloc-agent* +%{default_sitelib}/%{name}_agent_spawner +%_unitdir/resalloc-agent-spawner.service +%_sysconfdir/resalloc-agent-spawner + %files webui %doc %doc_files %license COPYING diff --git a/setup.py b/setup.py index f45fdbd..3a36e22 100644 --- a/setup.py +++ b/setup.py @@ -48,12 +48,19 @@ def get_requirements(): packages=find_packages(exclude=('tests',)), data_files=[ ('/etc/resallocserver', ['config/pools.yaml', 'config/server.yaml']), + ('/etc/resalloc-agent-spawner', ['config/agent-spawner/config.yaml']), ], package_data={ 'resallocserver': ['alembic.ini'], }, scripts=['bin/resalloc', 'bin/resalloc-server', 'bin/resalloc-maint', 'bin/resalloc-check-vm-ip'], + entry_points={ + 'console_scripts': [ + 'resalloc-agent-spawner = resalloc_agent_spawner.dispatcher:main', + 'resalloc-agent-worker = resalloc_agent_spawner.worker:main', + ], + }, install_requires=get_requirements(), cmdclass={ 'build_manpages': build_manpages,