diff --git a/.github/workflows/python-diff-lint.yml b/.github/workflows/python-diff-lint.yml
index 9ee7013..c89e00a 100644
--- a/.github/workflows/python-diff-lint.yml
+++ b/.github/workflows/python-diff-lint.yml
@@ -24,6 +24,7 @@ jobs:
         uses: fedora-copr/vcs-diff-lint-action@v1
         with:
           linter_tags: pylint
+          install_rpm_packages: python3-copr-common
 
       - name: Upload artifact with detected defects in SARIF format
         uses: actions/upload-artifact@v3
diff --git a/config/agent-spawner/config.yaml b/config/agent-spawner/config.yaml
new file mode 100644
index 0000000..139b183
--- /dev/null
+++ b/config/agent-spawner/config.yaml
@@ -0,0 +1,56 @@
+# Configuration for resalloc-agent-spawner.service. YAML format.
+# Specify groups of agents within the `agent_groups` section that the agent
+# spawner should take care of.
+
+#agent_groups:
+#    workers:
+#        # These commands are executed in the background async as they may take
+#        # quite some time to process. If `cmd_prepare` fails (exit non-zero), the
+#        # agent is immediately marked for removal. `cmd_terminate` exit status is
+#        # just ignored (we need to remove the agent no matter what).
+#
+#        # Prepare the agent. Variable $RESALLOC_RESOURCE_DATA (base64 encoded)
+#        # is provided in the script environment. Other variables like
+#        # RESOURCE_NAME, RESOURCE_POOL_ID, etc. are provided as well.
+#        cmd_prepare: /bin/true
+#        # Prepare the agent for termination. Upon finishing this command, the
+#        # resalloc resource ticket is closed and the resource deallocated.
+#        cmd_terminate: echo noop
+#
+#        # The following commands are executed synchronously by the agent spawner
+#        # daemon (polling). Please keep them super fast to avoid an overall system
+#        # halt!
+#
+#        # The `cmd_converge_to` hook needs to print an integer (the currently ideal
+#        # number of agents to converge to) to stdout.
+#        cmd_converge_to: echo 1
+#
+#        # Agents may decide to stop themselves. This hook is used to detect
+#        # such a case -> if exit status 0 is returned, the agent is going to be
+#        # terminated (cmd_terminate is called against it).
+#        cmd_check_finished: /bin/false
+#
+#        # Some agents might be expected to run long-term (or indefinitely). This
+#        # hook helps us to politely ask the agent whether it is OK to terminate.
+#        # Returning exit status 1 means the agent cannot be terminated.
+#        # Returning 0 means that the agent was prepared for termination, and
+#        # it has to be removed now. This is useful for gently downsizing
+#        # the agent count while converging to `cmd_converge_to`.
+#        cmd_try_release: /bin/false
+#
+#        # List of resalloc tags to use while requesting tickets
+#        tags:
+#            - kobo_worker
+
+# Note that we use the 'request_survives_server_restart' resalloc client option,
+# so the resalloc server must be running to avoid an overall system hang!
+#resalloc_server: "http://localhost:49100"
+
+# Where to log events.
+#logfile: /tmp/agent-spawner.log
+
+# How to connect to redis-db. By default we connect to 127.0.0.1:6379.
+#redis_db: null
+#redis_host: null
+#redis_port: null
+#redis_password: null
diff --git a/resalloc_agent_spawner/__init__.py b/resalloc_agent_spawner/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/resalloc_agent_spawner/dispatcher.py b/resalloc_agent_spawner/dispatcher.py
new file mode 100644
index 0000000..529032e
--- /dev/null
+++ b/resalloc_agent_spawner/dispatcher.py
@@ -0,0 +1,282 @@
+"""
+Agent spawner daemon. Part of the resalloc project.
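+
+The daemon polls Resalloc and keeps the number of agent-like resources in each
+configured group converged to the value reported by the group's
+`cmd_converge_to` hook. Every agent is tracked as a Redis hash keyed by its
+Resalloc ticket ID.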
+""" + +import sys +import logging +from copr_common.dispatcher import Dispatcher +from copr_common.worker_manager import WorkerManager, QueueTask +from copr_common.log import setup_script_logger +from copr_common.redis_helpers import get_redis_connection + +from resalloc.client import ( + Connection as ResallocConnection, +) + +from resalloc_agent_spawner.helpers import ( + get_config, + CmdCallerMixin, + rk_to_tid, + tid_to_rk, +) + +class Task(QueueTask): + """ priority queue task handed ower to AgentWorkerManager """ + def __init__(self, task_id): + self.task_id = task_id + @property + def id(self): + return "agent_ticket_" + str(self.task_id) + +class AgentWorkerManager(WorkerManager): + """ + Start async worker in the background + """ + + worker_prefix = 'agent_backround_handler' + + def start_task(self, worker_id, task): + self.start_daemon_on_background([ + "resalloc-agent-worker", + '--daemon', + '--ticket-id', str(task.task_id), + '--worker-id', worker_id, + ]) + + def finish_task(self, worker_id, task_info): + return True + + +class AgentSpawnerDispatcher(Dispatcher, CmdCallerMixin): + """ + Track list of opened tickets from Resalloc (representing Agent-like + resources). Each Agent is in the following states: + + 1. NEW -> ticket is taken, it is being allocated in the background + 2. PREPARING -> script is being run in the background to prepare the agent. + 3. WORKING -> up and running agent + 4. TERMINATING -> script is being run in the background to cleanup after the + agent. + 5. ENDED -> ready for ticket close + + PREPARING and TERMINATING states are never changed by Dispatcher, but by + BackgroundWorker. If one background worker fails to switch the state, new + one is started instead of it. + """ + + task_type = 'agent-manager' + worker_manager_class = AgentWorkerManager + + def __init__(self, opts): + super().__init__(opts) + self.sleeptime = 10 + self.log = logging.getLogger() + setup_script_logger(self.log, self.opts["logfile"]) + self.redis = get_redis_connection(opts) + self.resalloc = ResallocConnection( + opts["resalloc_server"], + request_survives_server_restart=True, + ) + + def get_ticket_data(self, ticket_id): + """ load resalloc ticket ID data """ + ticket = self.resalloc.getTicket(ticket_id) + if not ticket.collect(): + # not yet resolved or failed + return None + return ticket.output + + def try_to_stop(self, group_id, to_stop): + """ + Attempt to stop TO_STOP resources by closing Resalloc tickets. Not all + the resources may be closed at this time. + """ + stopped = 0 + for ticket_id in self.get_tickets("WORKING"): + if stopped >= to_stop: + break + + data = self.get_ticket_data(ticket_id) + if not data: + continue # failed resource, recovers in detect_failed_tickets() + if not self.cmd_try_release(group_id, data): + self.log.debug("Can't release %s", ticket_id) + continue + self.log.info("agent %s switches to TERMINATING (early)", ticket_id) + self.set_ticket_attribute(ticket_id, "state", "TERMINATING") + stopped += 1 + + def get_tickets(self, states=None): + """ + Get the list of ticket IDs currently stored in redis, + optionally filtered by list of states. + """ + keys = self.redis.keys(tid_to_rk("*")) + output = [] + for key in keys: + if states and self.redis.hget(key, 'state') not in states: + continue + output += [rk_to_tid(key)] + return output + + def set_ticket_attribute(self, ticket_id, key, value): + """ + For the ticket_id set redis hash key to value. 
+        """
+        hash_id = tid_to_rk(ticket_id)
+        self.redis.hset(hash_id, str(key), str(value))
+
+    def get_ticket_attributes(self, ticket_id, keys):
+        """
+        Return a list of redis hash values for the given ticket_id and
+        list of keys.
+        """
+        hash_id = tid_to_rk(ticket_id)
+        keys = [str(s) for s in keys]
+        return self.redis.hmget(hash_id, keys)
+
+    def agent_counts(self):
+        """
+        Get the 'group_id → count' mapping
+        """
+        keys = self.redis.keys(tid_to_rk("*"))
+        output = {}
+        for key in keys:
+            group_id = self.redis.hget(key, "group_id")
+            self.log.debug("found worker for %s group id", group_id)
+            output.setdefault(group_id, 0)
+            output[group_id] += 1
+        return output
+
+    def agent_drop(self, ticket_id):
+        """ drop agent per ticket id, cleanup everything """
+        ticket = self.resalloc.getTicket(ticket_id)
+        ticket.close()
+        self.redis.delete(tid_to_rk(ticket_id))
+
+    def clean_finished_workers(self):
+        """
+        WORKING → TERMINATING; check for finished agents and plan their cleanup
+        """
+        for ticket_id in self.get_tickets("WORKING"):
+            (group_id,) = self.get_ticket_attributes(ticket_id, ["group_id"])
+            data = self.get_ticket_data(ticket_id)
+            if data is None:
+                continue  # failed resource, recovers in detect_failed_tickets()
+            if not self.cmd_is_finished(group_id, data):
+                continue
+            self.log.info("Agent %s finished task, normal TERMINATING", ticket_id)
+            self.set_ticket_attribute(ticket_id, "state", "TERMINATING")
+
+    def start_preparing(self):
+        """
+        When the ticket is resolved in Resalloc, we have a working
+        resource that we can prepare (in the background). Switch the state.
+        """
+        for ticket_id in self.get_tickets("NEW"):
+            ticket = self.resalloc.getTicket(ticket_id)
+            if not ticket.collect():
+                continue
+            self.set_ticket_attribute(ticket_id, "data", ticket.output)
+            self.set_ticket_attribute(ticket_id, "state", "PREPARING")
+
+    def detect_failed_tickets(self):
+        """
+        Check for failed tickets and close them. This rarely happens if
+        everything is working smoothly.
+        """
+
+        for ticket_id in self.get_tickets():
+            ticket = self.resalloc.getTicket(ticket_id)
+            ticket.collect()
+            if ticket.ready is None:
+                # Non-existing ticket. This really seems like the ticket
+                # comes from some testing.
+                key = tid_to_rk(ticket_id)
+                self.log.fatal("Can't handle redis key %s, remove manually", key)
+                sys.exit(1)
+
+            if not ticket.failed:
+                continue
+
+            state, = self.get_ticket_attributes(ticket_id, ["state"])
+            if state in ["PREPARING", "TERMINATING"]:
+                # There's a background worker handling these agents. We
+                # need to let them finish.
+                continue
+
+            if state == "WORKING":
+                # we still want to run the cmd_terminate hook
+                self.set_ticket_attribute(ticket_id, "state", "TERMINATING")
+                continue
+
+            if state == "NEW":
+                # this has never been prepared, simply close the ticket
+                self.set_ticket_attribute(ticket_id, "state", "ENDED")
+
+
+    def converge(self):
+        """
+        Go through all agent groups; check the ideal number of members and
+        start new agents or try to terminate surplus ones.
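+        New agents are requested as Resalloc tickets carrying the group's
+        `tags`; surplus WORKING agents are only stopped when their
+        `cmd_try_release` hook agrees (see try_to_stop()).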
+        """
+        current_agents = self.agent_counts()
+        for group_id, group in self.opts["agent_groups"].items():
+            current = current_agents.get(group_id, 0)
+            ideal = self.cmd_converge_to(group_id)
+            if ideal is None:
+                self.log.error("can't call the converge-to hook for group %s",
+                               group_id)
+                continue
+
+            if current < ideal:
+                todo = ideal - current
+                self.log.info("We want %s but have %s agents, starting %s new",
+                              ideal, current, todo)
+                for _ in range(todo):
+                    # spawn as "NEW"
+                    tags = group["tags"]
+                    ticket = self.resalloc.newTicket(tags)
+                    self.log.debug("Requesting new agent via ticket %s",
+                                   ticket.id)
+                    self.set_ticket_attribute(ticket.id, "group_id", group_id)
+                    self.set_ticket_attribute(ticket.id, "state", "NEW")
+
+            elif current > ideal:
+                # WORKING → TERMINATING (even agents that did not start any task)
+                self.try_to_stop(group_id, current - ideal)
+
+    def get_frontend_tasks(self):
+        """
+        NEW → PREPARING → WORKING → TERMINATING → ENDED
+        """
+
+        # Cleanup after failed tickets
+        self.detect_failed_tickets()
+
+        # Drop successfully terminated agents
+        for ticket_id in self.get_tickets("ENDED"):
+            self.agent_drop(ticket_id)
+
+        # WORKING → TERMINATING (normal)
+        self.clean_finished_workers()
+
+        # NEW → PREPARING
+        self.start_preparing()
+
+        # spawn NEW or try to switch WORKING → TERMINATING
+        self.converge()
+
+        # Return the priority queue to be processed by background workers.
+        background_tasks = []
+        for ticket_id in self.get_tickets(["PREPARING", "TERMINATING"]):
+            background_tasks += [Task(ticket_id)]
+        return background_tasks
+
+
+def main():
+    """ resalloc-agent-spawner entrypoint """
+    AgentSpawnerDispatcher(get_config()).run()
+
+
+if __name__ == "__main__":
+    main()
diff --git a/resalloc_agent_spawner/helpers.py b/resalloc_agent_spawner/helpers.py
new file mode 100644
index 0000000..1e25cae
--- /dev/null
+++ b/resalloc_agent_spawner/helpers.py
@@ -0,0 +1,114 @@
+"""
+resalloc agent spawner helpers
+"""
+
+import os
+import subprocess
+
+from resalloc.helpers import load_config_file
+
+
+def rk_to_tid(key):
+    """ Redis agent key to Resalloc Ticket ID """
+    return int(key.split(":")[1])
+
+
+def tid_to_rk(ticket):
+    """ Resalloc Ticket ID to Redis Agent key """
+    return f"agent:{ticket}"
+
+
+def get_config():
+    """
+    Load the agent-spawner YAML configuration
+    """
+    conf_dir = os.environ.get("CONFIG_DIR", "/etc/resalloc-agent-spawner")
+    config_file = os.path.join(conf_dir, "config.yaml")
+    config = load_config_file(config_file)
+    config.setdefault("agent_groups", {})
+    config.setdefault("resalloc_server", "http://localhost:49100")
+    config.setdefault("logfile", "/tmp/agent-spawner.log")
+    groups = config["agent_groups"]
+    for group_id in groups.keys():
+        group = groups[group_id]
+        group.setdefault("cmd_converge_to", "/usr/bin/echo 1")
+        group.setdefault("cmd_check_finished", "/bin/false")
+        group.setdefault("cmd_prepare", "/bin/true")
+        group.setdefault("cmd_terminate", "/bin/true")
+        group.setdefault("cmd_try_release", "/bin/false")
+        group.setdefault("tags", ["please-specify-some-tags"])
+    return config
+
+
+class CmdCallerMixin:
+    """
+    Wrapper around calling command hooks.
+    """
+
+    def _cmd(self, group_id, cmd_id):
+        return self.opts["agent_groups"][group_id][cmd_id]
+
+    def cmd_converge_to(self, group_id):
+        """
+        Query the outside world for the ideal number of agents in the given
+        group.
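+        The hook must print a single integer to stdout; a non-zero exit
+        status or non-numeric output is treated as a failure and None is
+        returned.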
+        """
+        result = subprocess.run(
+            self._cmd(group_id, "cmd_converge_to"),
+            stdout=subprocess.PIPE, check=False, shell=True)
+        if result.returncode == 0:
+            try:
+                return int(result.stdout.decode("utf-8").strip())
+            except ValueError:
+                self.log.error("Converge-to hook failure, expected int, "
+                               "got: %s", result.stdout)
+                return None
+
+        self.log.debug("Failed to run the converge-to hook")
+        return None
+
+    def cmd_try_release(self, group_id, data):
+        """
+        Call the hook that tries to release the resource.
+        """
+        cmd = self._cmd(group_id, "cmd_try_release")
+        result = subprocess.run(cmd, check=False, **self.subproces_kwargs(data))
+        return not result.returncode
+
+    def cmd_is_finished(self, group_id, data):
+        """
+        Call the hook that checks whether the agent has finished its work.
+        """
+        result = subprocess.run(
+            self._cmd(group_id, "cmd_check_finished"),
+            check=False, **self.subproces_kwargs(data))
+        return not result.returncode
+
+    def cmd_take(self, group_id, data):
+        """
+        Initialize the agent.
+        """
+        return not subprocess.run(
+            self._cmd(group_id, "cmd_prepare"), check=False,
+            **self.subproces_kwargs(data),
+        ).returncode
+
+    def cmd_terminate(self, group_id, data):
+        """
+        Prepare the agent for removal.
+        """
+        subprocess.run(
+            self._cmd(group_id, "cmd_terminate"),
+            check=False,
+            **self.subproces_kwargs(data),
+        )
+
+    def subproces_kwargs(self, data):
+        """
+        Generate "generic" kwargs for subprocess.run()
+        """
+        return {
+            "env": {
+                "AGENT_SPAWNER_RESOURCE_DATA": str(data),
+            },
+            "shell": True,
+        }
diff --git a/resalloc_agent_spawner/worker.py b/resalloc_agent_spawner/worker.py
new file mode 100644
index 0000000..7909a21
--- /dev/null
+++ b/resalloc_agent_spawner/worker.py
@@ -0,0 +1,64 @@
+"""
+Handle certain tasks by a background daemon process.
+"""
+
+from copr_common.background_worker import BackgroundWorker
+
+from resalloc_agent_spawner.helpers import (
+    get_config,
+    CmdCallerMixin,
+    tid_to_rk,
+)
+
+class AgentHandler(BackgroundWorker, CmdCallerMixin):
+    """ Start a daemon process per task handed over by AgentWorkerManager """
+
+    def __init__(self):
+        super().__init__()
+        self.opts = get_config()
+
+    @classmethod
+    def adjust_arg_parser(cls, parser):
+        parser.add_argument(
+            "--ticket-id",
+            type=int,
+            required=True,
+            help="ticket ID to handle",
+        )
+
+    def handle_ticket(self, ticket_id):
+        """
+        Handle a single ticket (agent).
+        """
+        redis_key = tid_to_rk(ticket_id)
+
+        # We know self._redis is initialized by the parent class, so we don't
+        # create yet another connection.
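+        # The hash is created by the dispatcher and holds at least the
+        # 'state', 'group_id' and 'data' fields.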
+        redis_dict = self._redis.hgetall(redis_key)
+        if redis_dict["state"] == "PREPARING":
+            if self.cmd_take(redis_dict["group_id"], redis_dict["data"]):
+                self._redis.hset(redis_key, "state", "WORKING")
+            else:
+                # failed preparation -> prepare removal
+                self._redis.hset(redis_key, "state", "ENDED")
+            return
+
+        if redis_dict["state"] == "TERMINATING":
+            self.cmd_terminate(redis_dict["group_id"], redis_dict["data"])
+            self._redis.hset(redis_key, "state", "ENDED")
+
+    def handle_task(self):
+        try:
+            self.handle_ticket(self.args.ticket_id)
+        finally:
+            self.redis_set_worker_flag("status", "done")
+
+
+def main():
+    """ resalloc-agent-worker entrypoint """
+    worker = AgentHandler()
+    worker.process()
+
+
+if __name__ == "__main__":
+    main()
diff --git a/rpm/Makefile b/rpm/Makefile
index 21788b1..0d1804b 100644
--- a/rpm/Makefile
+++ b/rpm/Makefile
@@ -57,4 +57,4 @@ rpm: srpm
 
 clean:
-	rm -rf *.src.rpm *.tar.gz $(project)-*
+	rm -rf *.src.rpm *.tar.gz $(project)-*/
diff --git a/rpm/resalloc-agent-spawner.service b/rpm/resalloc-agent-spawner.service
new file mode 100644
index 0000000..b4bb2c6
--- /dev/null
+++ b/rpm/resalloc-agent-spawner.service
@@ -0,0 +1,15 @@
+[Unit]
+Description=Start self-standing agent-like resources using Resalloc
+After=syslog.target network.target auditd.service
+
+[Service]
+Type=simple
+User=resalloc
+Group=resalloc
+ExecStart=/usr/bin/resalloc-agent-spawner
+# we don't want to kill background action processors (daemoncontext)
+KillMode=process
+Restart=on-failure
+
+[Install]
+WantedBy=multi-user.target
diff --git a/rpm/resalloc.spec.tpl b/rpm/resalloc.spec.tpl
index c7cd7df..ab8efb8 100644
--- a/rpm/resalloc.spec.tpl
+++ b/rpm/resalloc.spec.tpl
@@ -2,9 +2,20 @@
 %global sysuser resalloc
 %global sysgroup %sysuser
 
-%global _logdir %_var/log/%{name}server
 %global _homedir %_sharedstatedir/%{name}server
 
+%global agent_user resalloc-agent-spawner
+%global agent_group %agent_user
+
+%global create_user_group() \
+getent group "%1" >/dev/null || groupadd -r "%1" \
+getent passwd "%1" >/dev/null || \\\
+useradd -r -g "%2" -G "%2" -s "%3" \\\
+    -c "%1 service user" "%1" \\\
+    -d "%4"
+
+%global _logdir %_var/log/%{name}server
+
 %global sum Resource allocator for expensive resources
 %global desc \
 The resalloc project aims to help with taking care of dynamically \
@@ -73,6 +84,7 @@
 Requires: %default_python-%srcname = %version-%release
 Source0: https://github.com/praiskup/%name/releases/download/v%version/%name-@TARBALL_VERSION@.tar.gz
 Source1: resalloc.service
+Source5: resalloc-agent-spawner.service
 Source2: logrotate
 Source3: merge-hook-logs
 Source4: cron.hourly
@@ -126,6 +138,24 @@
 The %name-webui package provides the resalloc webui, it shows page with
 information about resalloc resources.
 %endif
+%if %{with python3}
+%package agent-spawner
+Summary: %sum - daemon starting agent-like resources
+
+Requires(pre): /usr/sbin/useradd
+Requires: python3-copr-common
+
+%description agent-spawner
+%desc
+
+Agent Spawner maintains sets of resources (agents) of a certain kind and in a
+certain number, according to the given configuration. A typical Resalloc
+resource is completely dummy, fully controlled from the outside. Agent-like
+resources are different: they are self-standing, they take care of themselves,
+perhaps interacting/competing with each other. The only thing that
+agent-spawner needs to do is to control the ideal number of them.
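+
+The daemon is shipped as the resalloc-agent-spawner systemd service and reads
+its configuration from /etc/resalloc-agent-spawner/config.yaml.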
+%endif
+
 %if %{with python3}
 %package -n python3-%srcname
 Summary: %sum - Python 3 client library
@@ -167,6 +197,9 @@
 restorecon -R %_var/www/cgi-%{name} || :
 
 %prep
 %autosetup -p1 -n %name-@TARBALL_VERSION@
+%if %{without python3}
+rm -r resalloc_agent_spawner
+%endif
 
 %build
@@ -194,6 +227,9 @@ install -p -m 755 %{name}webui/cgi-resalloc %buildroot%_var/www/cgi-%{name}
 mkdir -p %buildroot%_unitdir
 mkdir -p %buildroot%_logdir
 install -p -m 644 %SOURCE1 %buildroot%_unitdir
+%if %{with python3}
+install -p -m 644 %SOURCE5 %buildroot%_unitdir
+%endif
 install -d -m 700 %buildroot%_homedir
 install -d -m 700 %buildroot%_sysconfdir/logrotate.d
 install -p -m 644 %SOURCE2 %buildroot%_sysconfdir/logrotate.d/resalloc-server
@@ -203,6 +239,11 @@
 install -p -m 755 %SOURCE3 %buildroot/%_libexecdir/%name-merge-hook-logs
 install -d %buildroot%_sysconfdir/cron.hourly
 install -p -m 755 %SOURCE4 %buildroot%_sysconfdir/cron.hourly/resalloc
+%if %{without python3}
+rm %buildroot%_bindir/%name-agent-*
+rm %buildroot%_sysconfdir/resalloc-agent-spawner/config.yaml
+%endif
+
 
 %if %{with check}
 %check
@@ -219,14 +260,7 @@
 ln -s "%{default_sitelib}/%{name}server" %buildroot%_homedir/project
 
 %pre server
-user=%sysuser
-group=%sysgroup
-getent group "$user" >/dev/null || groupadd -r "$group"
-getent passwd "$user" >/dev/null || \
-useradd -r -g "$group" -G "$group" -s /bin/bash \
-    -c "resalloc server's user" "$user" \
-    -d "%_homedir"
-
+%create_user_group %sysuser %sysgroup /bin/bash %_homedir
 
 %post server
 %systemd_post resalloc.service
@@ -235,6 +269,18 @@ useradd -r -g "$group" -G "$group" -s /bin/bash \
 %systemd_postun_with_restart resalloc.service
 
 
+%if %{with python3}
+%pre agent-spawner
+%create_user_group %agent_user %agent_group /bin/false /
+
+%post agent-spawner
+%systemd_post resalloc-agent-spawner.service
+
+%postun agent-spawner
+%systemd_postun_with_restart resalloc-agent-spawner.service
+%endif
+
+
 %global doc_files NEWS README.md
 
 %files
@@ -281,6 +327,12 @@ useradd -r -g "$group" -G "$group" -s /bin/bash \
 %config %attr(0755, root, root) %{_sysconfdir}/cron.hourly/resalloc
 
 %if %{with python3}
+%files agent-spawner
+%_bindir/resalloc-agent*
+%{default_sitelib}/%{name}_agent_spawner
+%_unitdir/resalloc-agent-spawner.service
+%config(noreplace) %_sysconfdir/resalloc-agent-spawner
+
 %files webui
 %doc %doc_files
 %license COPYING
diff --git a/setup.py b/setup.py
index f45fdbd..3a36e22 100644
--- a/setup.py
+++ b/setup.py
@@ -48,12 +48,19 @@ def get_requirements():
     packages=find_packages(exclude=('tests',)),
     data_files=[
         ('/etc/resallocserver', ['config/pools.yaml', 'config/server.yaml']),
+        ('/etc/resalloc-agent-spawner', ['config/agent-spawner/config.yaml']),
     ],
     package_data={
         'resallocserver': ['alembic.ini'],
     },
     scripts=['bin/resalloc', 'bin/resalloc-server', 'bin/resalloc-maint',
              'bin/resalloc-check-vm-ip'],
+    entry_points={
+        'console_scripts': [
+            'resalloc-agent-spawner = resalloc_agent_spawner.dispatcher:main',
+            'resalloc-agent-worker = resalloc_agent_spawner.worker:main',
+        ],
+    },
     install_requires=get_requirements(),
     cmdclass={
         'build_manpages': build_manpages,