From 167d9071a0f8b0925e42b859d3e668e78011dfdc Mon Sep 17 00:00:00 2001 From: Pavel Raiskup Date: Thu, 12 Oct 2023 01:39:28 +0200 Subject: [PATCH] WIP --- agentspawner/__init__.py | 0 agentspawner/config.yaml | 5 ++ agentspawner/dispatcher.py | 88 +++++++++++---------- agentspawner/handle-ticket-on-background.py | 29 +++++-- agentspawner/helpers.py | 32 ++++++++ agentspawner/hook-probability | 13 +++ agentspawner/hook-release | 9 ++- agentspawner/{daemon.py => old_daemon.py} | 0 agentspawner/resalloc-testing-server | 7 +- 9 files changed, 130 insertions(+), 53 deletions(-) create mode 100644 agentspawner/__init__.py create mode 100644 agentspawner/config.yaml mode change 100644 => 100755 agentspawner/handle-ticket-on-background.py create mode 100644 agentspawner/helpers.py create mode 100755 agentspawner/hook-probability rename agentspawner/{daemon.py => old_daemon.py} (100%) diff --git a/agentspawner/__init__.py b/agentspawner/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/agentspawner/config.yaml b/agentspawner/config.yaml new file mode 100644 index 0000000..e7aff5c --- /dev/null +++ b/agentspawner/config.yaml @@ -0,0 +1,5 @@ +--- +cmd_converge_to: hook-converge-to +cmd_check_finished: hook-probability "agent already finished the task" 5 +cmd_prepare: hook-take +cmd_try_release: hook-probability "agent can be removed early" 25 diff --git a/agentspawner/dispatcher.py b/agentspawner/dispatcher.py index 0e670e6..1e2f278 100644 --- a/agentspawner/dispatcher.py +++ b/agentspawner/dispatcher.py @@ -5,7 +5,7 @@ import logging import subprocess from copr_common.dispatcher import Dispatcher -from copr_common.worker_manager import WorkerManager +from copr_common.worker_manager import WorkerManager, QueueTask from copr_common.log import setup_script_logger from copr_common.redis_helpers import get_redis_connection @@ -14,24 +14,32 @@ Ticket, ) -RESALLOC_SERVER = "http://localhost:49100" - -def _redis_key_to_ticket_id(key): - return int(key.split(":")[1]) +from agentspawner.helpers import ( + _redis_key_to_ticket_id, + _redis_key_prefixed, + _rel_path, + get_config, + CmdCallerMixin, +) -def _ticket_id_to_redis_key(ticket): - return "agent-spawner:{ticket}" +RESALLOC_SERVER = "http://localhost:49100" +class Task(QueueTask): + def __init__(self, task_id): + self.task_id = task_id + @property + def id(self): + return "agent_ticket_" + str(self.task_id) class AgentWorkerManager(WorkerManager): - worker_prefix = 'agent' + worker_prefix = 'agent_backround_handler' def start_task(self, worker_id, task): self.start_daemon_on_background([ - './handle-ticket-on-background.py', + _rel_path('./handle-ticket-on-background.py'), '--daemon', - '--ticket-id', repr(task), + '--ticket-id', str(task.task_id), '--worker-id', worker_id, ]) @@ -39,7 +47,7 @@ def finish_task(self, worker_id, task_info): return True -class AgentSpawnerDispatcher(Dispatcher): +class AgentSpawnerDispatcher(Dispatcher, CmdCallerMixin): """ Track list of opened tickets from Resalloc (representing Agent-like resources). Each Agent is in the following states: @@ -61,7 +69,6 @@ class AgentSpawnerDispatcher(Dispatcher): tags = ["A"] - def __init__(self, opts): super().__init__(opts) self.sleeptime = 10 @@ -74,8 +81,9 @@ def __init__(self, opts): def call_converge_to(self): """ Execute the configured hook script """ while True: - result = subprocess.run(["./hook-converge-to"], capture_output=True, - check=False) + result = subprocess.run(self.command("cmd_converge_to"), + capture_output=True, + check=False, shell=True) if result.returncode == 0: try: return int(result.stdout.decode("utf-8").strip()) @@ -88,30 +96,29 @@ def call_try_release(self, data): """ Call hook that releases the resource """ - result = subprocess.run(["./hook-release", f"{data}"], check=False) + cmd = self.command("cmd_try_release") + result = subprocess.run(cmd, check=False, **self.subproces_kwargs(data)) return not result.returncode def call_is_finished(self, data): """ Call hook that releases the resource """ - result = subprocess.run(["./hook-is-finished"], capture_output=True, - check=False) + result = subprocess.run(self.command("cmd_check_finished"), check=False, + **self.subproces_kwargs(data)) return not result.returncode def get_ticket_data(self, ticket_id): ticket = self.resalloc.getTicket(ticket_id) - data = ticket.collect() - if not data["ready"]: + if not ticket.collect(): return None - return not data["output"] + return ticket.output def try_to_stop(self, to_stop): """ Attempt to stop TO_STOP resources by closing Resalloc tickets. Not all the resources may be closed at this time. """ - self.log.info("Trying to stop %s resources", to_stop) stopped = 0 for ticket_id in self.get_tickets("WORKING"): if stopped >= to_stop: @@ -121,12 +128,12 @@ def try_to_stop(self, to_stop): if not self.call_try_release(data): self.log.debug("Can't release %s", ticket_id) continue - + self.log.info("agent %s switches to TERMINATING (early)", ticket_id) self.set_ticket_attribute(ticket_id, "state", "TERMINATING") stopped += 1 def get_tickets(self, states=None): - keys = self.redis.keys('agent-spawner:*') + keys = self.redis.keys(_redis_key_prefixed("*")) output = [] for key in keys: if states and self.redis.hget(key, 'state') not in states: @@ -135,8 +142,8 @@ def get_tickets(self, states=None): return output def set_ticket_attribute(self, ticket_id, key, value): - hash_id = _ticket_id_to_redis_key(ticket_id) - self.redis.hset(hash_id, key, value) + hash_id = _redis_key_prefixed(ticket_id) + self.redis.hset(hash_id, str(key), str(value)) def agent_count(self): return len(self.get_tickets()) @@ -144,24 +151,22 @@ def agent_count(self): def agent_drop(self, ticket_id): ticket = self.resalloc.getTicket(ticket_id) ticket.close() - self.redis.delete(_ticket_id_to_redis_key(ticket_id)) + self.redis.delete(_redis_key_prefixed(ticket_id)) def clean_finished_workers(self): for ticket_id in self.get_tickets("WORKING"): data = self.get_ticket_data(ticket_id) if not self.call_is_finished(data): # TODO: move async continue + self.log.info("Agent %s finished task, normal TERMINATING", ticket_id) self.set_ticket_attribute(ticket_id, "state", "TERMINATING") def start_preparing(self): for ticket_id in self.get_tickets("NEW"): ticket = self.resalloc.getTicket(ticket_id) - data = ticket.collect() - if not data["ready"]: + if not ticket.collect(): continue - - output = data["output"] - self.set_ticket_attribute(ticket_id, "data", output) + self.set_ticket_attribute(ticket_id, "data", ticket.output) self.set_ticket_attribute(ticket_id, "state", "PREPARING") def get_frontend_tasks(self): @@ -204,11 +209,15 @@ def get_frontend_tasks(self): current = self.agent_count() ideal = self.call_converge_to() if current < ideal: - # spawn as "NEW" - ticket = self.resalloc.newTicket(self.tags) - self.log.debug("Requesting new agent via ticket %s", - ticket.id) - self.set_ticket_attribute(ticket.id, "state", "NEW") + todo = ideal - current + self.log.info("We want %s but have %s agents, starting %s new", + ideal, current, todo) + for _ in range(todo): + # spawn as "NEW" + ticket = self.resalloc.newTicket(self.tags) + self.log.debug("Requesting new agent via ticket %s", + ticket.id) + self.set_ticket_attribute(ticket.id, "state", "NEW") elif current > ideal: # WORKING — TERMINATING (even agents that did not start any task) @@ -217,12 +226,11 @@ def get_frontend_tasks(self): background_tasks = [] for ticket_id in self.get_tickets(["PREPARING", "TERMINATING"]): - background_tasks += [ticket_id] - - return [] + background_tasks += [Task(ticket_id)] + return background_tasks def _main(): - AgentSpawnerDispatcher({}).run() + AgentSpawnerDispatcher(get_config()).run() if __name__ == "__main__": diff --git a/agentspawner/handle-ticket-on-background.py b/agentspawner/handle-ticket-on-background.py old mode 100644 new mode 100755 index 7623990..6dc9c84 --- a/agentspawner/handle-ticket-on-background.py +++ b/agentspawner/handle-ticket-on-background.py @@ -1,14 +1,26 @@ +#! /bin/env python3 + +import subprocess + from copr_common.background_worker import BackgroundWorker from copr_common.redis_helpers import get_redis_connection -def _ticket_id_to_redis_key(ticket): - return "agent-spawner:{ticket}" +from agentspawner.helpers import _redis_key_prefixed, _rel_path, get_config, CmdCallerMixin -class AgentHandler(BackgroundWorker): +class AgentHandler(BackgroundWorker, CmdCallerMixin): def __init__(self): super().__init__() - self.opts = {} + self.opts = get_config() + + def call_take(self, data): + """ + Call hook that prepares the resource + """ + return not subprocess.run( + self.command("cmd_prepare"), check=True, shell=True, + **self.subproces_kwargs(data), + ) @classmethod def adjust_arg_parser(cls, parser): @@ -23,13 +35,14 @@ def handle_ticket(self, ticket_id): """ Import a single task """ - redis_key = _ticket_id_to_redis_key(ticket_id) - data = self._redis.hgetall(redis_key) - if data["state"] == "PREPARING": + redis_key = _redis_key_prefixed(ticket_id) + redis_dict = self._redis.hgetall(redis_key) + if redis_dict["state"] == "PREPARING": self._redis.hset(redis_key, "state", "WORKING") + self.call_take(redis_dict["data"]) return - if data["state"] == "TERMINATING": + if redis_dict["state"] == "TERMINATING": self._redis.hset(redis_key, "state", "ENDED") def handle_task(self): diff --git a/agentspawner/helpers.py b/agentspawner/helpers.py new file mode 100644 index 0000000..c355bc7 --- /dev/null +++ b/agentspawner/helpers.py @@ -0,0 +1,32 @@ +import os + +from resalloc.helpers import load_config_file + +def _redis_key_to_ticket_id(key): + return int(key.split(":")[1]) + +def _redis_key_prefixed(ticket): + return f"agent:{ticket}" + +def _rel_path(file): + return os.path.join(os.path.dirname(__file__), file) + +def get_config(): + config_file = _rel_path("config.yaml") + return load_config_file(config_file) + +class CmdCallerMixin: + # def hook_file(self, config_name): + # return _rel_path(self.opts[config_name]) + + def command(self, config_name): + return self.opts[config_name] + + def subproces_kwargs(self, data): + return { + "env": { + "AGENT_SPAWNER_RESOURCE_DATA": str(data), + "PATH": os.environ["PATH"] + ":" + _rel_path("."), + }, + "shell": True, + } diff --git a/agentspawner/hook-probability b/agentspawner/hook-probability new file mode 100755 index 0000000..4f15733 --- /dev/null +++ b/agentspawner/hook-probability @@ -0,0 +1,13 @@ +#! /bin/bash + +id=$(echo "$AGENT_SPAWNER_RESOURCE_DATA" | grep RESALLOC_ID=) +echo -n "Checking if '$1' ($id) (probability of success $2%) => " + +if test 1 -eq $(( RANDOM % 100 < $2 )); then + echo "Success!" + exit 0 +fi +echo No. +exit 1 + + diff --git a/agentspawner/hook-release b/agentspawner/hook-release index 7327aeb..c4a3d22 100755 --- a/agentspawner/hook-release +++ b/agentspawner/hook-release @@ -1,8 +1,13 @@ #! /bin/bash eval 'set -- $1' # strip -echo "Releasing with resalloc ticket data: $1" +echo -n "Checking if agent (resource) $1 can be removed: " # ~33% chance of closing this one -test $(( RANDOM % 3 )) -eq 0 && exit 0 +test $(( RANDOM % 3 )) -eq 0 && { + echo "YES!" + exit 0 +} + +echo "no, not removing." exit 1 diff --git a/agentspawner/daemon.py b/agentspawner/old_daemon.py similarity index 100% rename from agentspawner/daemon.py rename to agentspawner/old_daemon.py diff --git a/agentspawner/resalloc-testing-server b/agentspawner/resalloc-testing-server index 7e04cc8..b933139 100755 --- a/agentspawner/resalloc-testing-server +++ b/agentspawner/resalloc-testing-server @@ -1,6 +1,7 @@ -#! /bin/sh +#! /bin/sh -x -cd .. -rm /tmp/server-sql +cd "$(dirname "$(readlink -f "$0")")/.." || exit 1 +redis-cli flushall || exit 1 +rm -f /tmp/server-sql mkdir -p /tmp/logdir ./test-tooling/resalloc-server