Skip to content

Commit

Permalink
PoC: agent spawner
Browse files Browse the repository at this point in the history
Relates to #123
  • Loading branch information
praiskup committed Oct 21, 2023
1 parent 7af565a commit b51821b
Show file tree
Hide file tree
Showing 9 changed files with 519 additions and 9 deletions.
25 changes: 25 additions & 0 deletions config/agent-spawner/config.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
# Configuration for resalloc-agent-spawner.service. YAML format.
# Specify groups of aggents within the `agent_groups` section that agent spawner
# should take care of.

#agent_groups:
# workers:
# # These are executed in the background async, may take some time.
# cmd_prepare: /bin/true
# cmd_terminate: /bin/true
#
# # keep these super fast to avoid overall system halt!
# cmd_converge_to: echo 1
# cmd_check_finished: /bin/true
# cmd_try_release: /bin/false
#
# # List of resalloc tags to request in tickets
# tags:
# - kobo_worker

# Note that we use request_survives_server_restart resalloc client option,
# so the resalloc server must be running to avoid system hang!
#resalloc_server: "http://localhost:49100"

# Where to log events.
#logfile: /tmp/agent-spawner.log
Empty file.
247 changes: 247 additions & 0 deletions resalloc_agent_spawner/dispatcher.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,247 @@
"""
Agent spawner daemon. Part of the resalloc project.
"""

import logging
from copr_common.dispatcher import Dispatcher
from copr_common.worker_manager import WorkerManager, QueueTask
from copr_common.log import setup_script_logger
from copr_common.redis_helpers import get_redis_connection

from resalloc.client import (
Connection as ResallocConnection,
)

from resalloc_agent_spawner.helpers import (
_redis_key_to_ticket_id,
_redis_key_prefixed,
get_config,
CmdCallerMixin,
)

class Task(QueueTask):
""" priority queue task handed ower to AgentWorkerManager """
def __init__(self, task_id):
self.task_id = task_id
@property
def id(self):

Check warning

Code scanning / vcs-diff-lint

Task.id: Missing function or method docstring Warning

Task.id: Missing function or method docstring

Check warning

Code scanning / vcs-diff-lint

Task.id: Attribute name "id" doesn't conform to snake_case naming style Warning

Task.id: Attribute name "id" doesn't conform to snake_case naming style
return "agent_ticket_" + str(self.task_id)

class AgentWorkerManager(WorkerManager):
"""
Start async worker in the background
"""

worker_prefix = 'agent_backround_handler'

def start_task(self, worker_id, task):

Check warning

Code scanning / vcs-diff-lint

AgentWorkerManager.start_task: Missing function or method docstring Warning

AgentWorkerManager.start_task: Missing function or method docstring
self.start_daemon_on_background([
"resalloc-agent-worker",
'--daemon',
'--ticket-id', str(task.task_id),
'--worker-id', worker_id,
])

def finish_task(self, worker_id, task_info):

Check warning

Code scanning / vcs-diff-lint

AgentWorkerManager.finish_task: Missing function or method docstring Warning

AgentWorkerManager.finish_task: Missing function or method docstring

Check warning

Code scanning / vcs-diff-lint

AgentWorkerManager.finish_task: Unused argument 'worker_id' Warning

AgentWorkerManager.finish_task: Unused argument 'worker_id'

Check warning

Code scanning / vcs-diff-lint

AgentWorkerManager.finish_task: Unused argument 'task_info' Warning

AgentWorkerManager.finish_task: Unused argument 'task_info'
return True


class AgentSpawnerDispatcher(Dispatcher, CmdCallerMixin):
"""
Track list of opened tickets from Resalloc (representing Agent-like
resources). Each Agent is in the following states:
1. NEW -> ticket is taken, it is being allocated in the background
2. PREPARING -> script is being run in the background to prepare the agent.
3. WORKING -> up and running agent
4. TERMINATING -> script is being run in the background to cleanup after the
agent.
5. ENDED -> ready for ticket close
PREPARING and TERMINATING states are never changed by Dispatcher, but by
BackgroundWorker. If one background worker fails to switch the state, new
one is started instead of it.
"""

task_type = 'agent-manager'
worker_manager_class = AgentWorkerManager

def __init__(self, opts):
super().__init__(opts)
self.sleeptime = 10
self.log = logging.getLogger()
setup_script_logger(self.log, self.opts["logfile"])
self.redis = get_redis_connection(opts)
self.resalloc = ResallocConnection(
opts["resalloc_server"],
request_survives_server_restart=True,
)

def get_ticket_data(self, ticket_id):
""" load resalloc ticket ID data """
ticket = self.resalloc.getTicket(ticket_id)
if not ticket.collect():
return None
return ticket.output

def try_to_stop(self, group_id, to_stop):
"""
Attempt to stop TO_STOP resources by closing Resalloc tickets. Not all
the resources may be closed at this time.
"""
stopped = 0
for ticket_id in self.get_tickets("WORKING"):
if stopped >= to_stop:
break

data = self.get_ticket_data(ticket_id)
if not self.cmd_try_release(group_id, data):
self.log.debug("Can't release %s", ticket_id)
continue
self.log.info("agent %s switches to TERMINATING (early)", ticket_id)
self.set_ticket_attribute(ticket_id, "state", "TERMINATING")
stopped += 1

def get_tickets(self, states=None):
"""
Get the list of ticket IDs currently stored in redis,
optionally filtered by list of states.
"""
keys = self.redis.keys(_redis_key_prefixed("*"))
output = []
for key in keys:
if states and self.redis.hget(key, 'state') not in states:
continue
output += [_redis_key_to_ticket_id(key)]
return output

def set_ticket_attribute(self, ticket_id, key, value):
"""
For the ticket_id set redis hash key to value.
"""
hash_id = _redis_key_prefixed(ticket_id)
self.redis.hset(hash_id, str(key), str(value))

def get_ticket_attributes(self, ticket_id, keys):
"""
Return list of redis hash values for the given ticket_id and
list of keys.
"""
hash_id = _redis_key_prefixed(ticket_id)
keys = [str(s) for s in keys]
return self.redis.hmget(hash_id, keys)

def agent_counts(self):
"""
Get 'group_id → count' mapping
"""
keys = self.redis.keys(_redis_key_prefixed("*"))
output = {}
for key in keys:
group_id = self.redis.hget(key, "group_id")
output.setdefault(group_id, 0)
output[group_id] += 1
return output

def agent_drop(self, ticket_id):
""" drop agent per ticket id, cleanup everything """
ticket = self.resalloc.getTicket(ticket_id)
ticket.close()
self.redis.delete(_redis_key_prefixed(ticket_id))

def clean_finished_workers(self):
"""
WORKING — TERMINATING; Check for finalized agents, plan their cleanup
"""
for ticket_id in self.get_tickets("WORKING"):
(group_id,) = self.get_ticket_attributes(ticket_id, ["group_id"])
data = self.get_ticket_data(ticket_id)
if not self.cmd_is_finished(group_id, data):
continue
self.log.info("Agent %s finished task, normal TERMINATING", ticket_id)
self.set_ticket_attribute(ticket_id, "state", "TERMINATING")

def start_preparing(self):
"""
When the ticket is resolved in Resalloc, we have a working
resource that we can prepare (in the background). Switch the state.
"""
for ticket_id in self.get_tickets("NEW"):
ticket = self.resalloc.getTicket(ticket_id)
if not ticket.collect():
continue
self.set_ticket_attribute(ticket_id, "data", ticket.output)
self.set_ticket_attribute(ticket_id, "state", "PREPARING")

def detect_failed_tickets(self):
"""
Check for failed tickets, close them. This rarely happens if everything
is working smoothly.
TODO: no need to do this in every single loop?
TODO: call some hook?
"""

def converge(self):
"""
Go through all agent groups; check the ideal number of memgers and start
new or try to terminate.
"""
for group_id, current in self.agent_counts():
ideal = self.cmd_converge_to(group_id)
if ideal is None:
# unable to call the hook
continue

if current < ideal:
todo = ideal - current
self.log.info("We want %s but have %s agents, starting %s new",
ideal, current, todo)
for _ in range(todo):
# spawn as "NEW"
tags = self.opts["agent_groups"][group_id]["tags"]
ticket = self.resalloc.newTicket(tags)
self.log.debug("Requesting new agent via ticket %s",
ticket.id)
self.set_ticket_attribute(ticket.id, "group_id", "NEW")
self.set_ticket_attribute(ticket.id, "state", "NEW")

elif current > ideal:
# WORKING — TERMINATING (even agents that did not start any task)
self.try_to_stop(group_id, current - ideal)

def get_frontend_tasks(self):
"""
NEW → PREPARING → WORKING — TERMINATING — ENDED
"""

# Cleanup after failed tickets
self.detect_failed_tickets()

# Drop successfully terminated agents
for ticket_id in self.get_tickets("ENDED"):
self.agent_drop(ticket_id)

# WORKING — TERMINATING (normal)
self.clean_finished_workers()

# NEW → PREPARING
self.start_preparing()

# spawn NEW or try to switch WORKING → TERMINATING
self.converge()

# Return the priority queue to process by background workers.
background_tasks = []
for ticket_id in self.get_tickets(["PREPARING", "TERMINATING"]):
background_tasks += [Task(ticket_id)]
return background_tasks


def main():
""" realloc-agent-spawner entrypoint """
AgentSpawnerDispatcher(get_config()).run()


if __name__ == "__main__":
main()
116 changes: 116 additions & 0 deletions resalloc_agent_spawner/helpers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
"""
resalloc agent spawner helpers
"""

import os
import subprocess

from resalloc.helpers import load_config_file


def _redis_key_to_ticket_id(key):
return int(key.split(":")[1])


def _redis_key_prefixed(ticket):
return f"agent:{ticket}"


def _rel_path(file):
return os.path.join(os.path.dirname(__file__), file)


def get_config():
"""
Load the agent-spawner YAML configuration
"""
conf_dir = os.environ.get("CONFIG_DIR", "/etc/resalloc-agent-spawner")
config_file = os.path.join(conf_dir, "config.yaml")
config = load_config_file(config_file)
config.setdefault("agent_groups", {})
config.setdefault("resalloc_server", "http://localhost:49100")
config.setdefault("logfile", "/tmp/agent-spawner.log")
ag = config["agent_groups"]

Check warning

Code scanning / vcs-diff-lint

get_config: Variable name "ag" doesn't conform to snake_case naming style Warning

get_config: Variable name "ag" doesn't conform to snake_case naming style
for group_id in ag.keys():
ag[group_id].setdefault("cmd_converge_to", "/usr/bin/echo 1")
ag[group_id].setdefault("cmd_check_finished", "/bin/false")
ag[group_id].setdefault("cmd_prepare", "/bin/true")
ag[group_id].setdefault("cmd_terminate", "/bin/true")
ag[group_id].setdefault("cmd_try_release", "/bin/false")
ag[group_id].setdefault("tags", ["please-specify-some-tags"])
return config


class CmdCallerMixin:
"""
Wrapper around calling command hooks.
"""

def _cmd(self, group_id, cmd_id):
return self.opts["agent_groups"][group_id][cmd_id]

def cmd_converge_to(self, group_id):
"""
Query the outside world for the ideal number of agents in given group.
"""
result = subprocess.run(
self._cmd(group_id, "cmd_converge_to"),
stdout=subprocess.PIPE, check=False, shell=True)
if result.returncode == 0:
try:
return int(result.stdout.decode("utf-8").strip())
except ValueError:
self.log.error("Converge-to hook failure, expected int, "
"got: %s", result.stdout)
return None

self.log.debug("Failing to run converge-to hook")
return None

def cmd_try_release(self, group_id, data):
"""
Call hook that releases the resource
"""
cmd = self._cmd(group_id, "cmd_try_release")
result = subprocess.run(cmd, check=False, **self.subproces_kwargs(data))
return not result.returncode

def cmd_is_finished(self, group_id, data):
"""
Call hook that releases the resource
"""
result = subprocess.run(
self._cmd(group_id, "cmd_check_finished"),
check=False, **self.subproces_kwargs(data))
return not result.returncode

def cmd_take(self, group_id, data):
"""
Initialize the agent
"""
return not subprocess.run(
self._cmd(group_id, "cmd_prepare"), check=True,
**self.subproces_kwargs(data),
)

def cmd_terminate(self, group_id, data):
"""
Prepare the agent for removal.
"""
return not subprocess.run(
self._cmd(group_id, "cmd_terminate"),
check=False,
**self.subproces_kwargs(data),
)

def subproces_kwargs(self, data):
"""
generate "generic" subprocess.Popen kwargs
"""
return {
"env": {
"AGENT_SPAWNER_RESOURCE_DATA": str(data),
"PATH": os.environ["PATH"] + ":" + _rel_path("."),
},
"shell": True,
}
Loading

0 comments on commit b51821b

Please sign in to comment.