Skip to content

Commit

Permalink
PoC: agent spawner
Browse files Browse the repository at this point in the history
Relates to #123
  • Loading branch information
praiskup committed Oct 24, 2023
1 parent 7af565a commit a264e58
Show file tree
Hide file tree
Showing 10 changed files with 527 additions and 9 deletions.
1 change: 1 addition & 0 deletions .github/workflows/python-diff-lint.yml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ jobs:
uses: fedora-copr/vcs-diff-lint-action@v1
with:
linter_tags: pylint
install_rpm_packages: python3-copr-common

- name: Upload artifact with detected defects in SARIF format
uses: actions/upload-artifact@v3
Expand Down
31 changes: 31 additions & 0 deletions config/agent-spawner/config.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
# Configuration for resalloc-agent-spawner.service. YAML format.
# Specify groups of aggents within the `agent_groups` section that agent spawner
# should take care of.

#agent_groups:
# workers:
# # These are executed in the background async, may take some time.
# cmd_prepare: /bin/true
# cmd_terminate: /bin/true
#
# # keep these super fast to avoid overall system halt!
# cmd_converge_to: echo 1
# cmd_check_finished: /bin/true
# cmd_try_release: /bin/false
#
# # List of resalloc tags to request in tickets
# tags:
# - kobo_worker

# Note that we use request_survives_server_restart resalloc client option,
# so the resalloc server must be running to avoid system hang!
#resalloc_server: "http://localhost:49100"

# Where to log events.
#logfile: /tmp/agent-spawner.log

# How to connect to redis-db. By default connects to 127.0.0.1:6379.
#redis_db: null
#redis_host: null
#redis_port": null
#redis_password": null
Empty file.
247 changes: 247 additions & 0 deletions resalloc_agent_spawner/dispatcher.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,247 @@
"""
Agent spawner daemon. Part of the resalloc project.
"""

import logging
from copr_common.dispatcher import Dispatcher
from copr_common.worker_manager import WorkerManager, QueueTask
from copr_common.log import setup_script_logger
from copr_common.redis_helpers import get_redis_connection

from resalloc.client import (
Connection as ResallocConnection,
)

from resalloc_agent_spawner.helpers import (
get_config,
CmdCallerMixin,
rk2tid,
tid2rk,
)

class Task(QueueTask):
""" priority queue task handed ower to AgentWorkerManager """
def __init__(self, task_id):
self.task_id = task_id
@property
def id(self):

Check warning

Code scanning / vcs-diff-lint

Task.id: Missing function or method docstring

Task.id: Missing function or method docstring

Check warning

Code scanning / vcs-diff-lint

Task.id: Attribute name "id" doesn't conform to snake_case naming style

Task.id: Attribute name "id" doesn't conform to snake_case naming style
return "agent_ticket_" + str(self.task_id)

class AgentWorkerManager(WorkerManager):
"""
Start async worker in the background
"""

worker_prefix = 'agent_backround_handler'

def start_task(self, worker_id, task):

Check warning

Code scanning / vcs-diff-lint

AgentWorkerManager.start_task: Missing function or method docstring

AgentWorkerManager.start_task: Missing function or method docstring
self.start_daemon_on_background([
"resalloc-agent-worker",
'--daemon',
'--ticket-id', str(task.task_id),
'--worker-id', worker_id,
])

def finish_task(self, worker_id, task_info):

Check warning

Code scanning / vcs-diff-lint

AgentWorkerManager.finish_task: Missing function or method docstring

AgentWorkerManager.finish_task: Missing function or method docstring

Check warning

Code scanning / vcs-diff-lint

AgentWorkerManager.finish_task: Unused argument 'worker_id'

AgentWorkerManager.finish_task: Unused argument 'worker_id'

Check warning

Code scanning / vcs-diff-lint

AgentWorkerManager.finish_task: Unused argument 'task_info'

AgentWorkerManager.finish_task: Unused argument 'task_info'
return True


class AgentSpawnerDispatcher(Dispatcher, CmdCallerMixin):
"""
Track list of opened tickets from Resalloc (representing Agent-like
resources). Each Agent is in the following states:
1. NEW -> ticket is taken, it is being allocated in the background
2. PREPARING -> script is being run in the background to prepare the agent.
3. WORKING -> up and running agent
4. TERMINATING -> script is being run in the background to cleanup after the
agent.
5. ENDED -> ready for ticket close
PREPARING and TERMINATING states are never changed by Dispatcher, but by
BackgroundWorker. If one background worker fails to switch the state, new
one is started instead of it.
"""

task_type = 'agent-manager'
worker_manager_class = AgentWorkerManager

def __init__(self, opts):
super().__init__(opts)
self.sleeptime = 10
self.log = logging.getLogger()
setup_script_logger(self.log, self.opts["logfile"])
self.redis = get_redis_connection(opts)
self.resalloc = ResallocConnection(
opts["resalloc_server"],
request_survives_server_restart=True,
)

def get_ticket_data(self, ticket_id):
""" load resalloc ticket ID data """
ticket = self.resalloc.getTicket(ticket_id)
if not ticket.collect():
return None
return ticket.output

def try_to_stop(self, group_id, to_stop):
"""
Attempt to stop TO_STOP resources by closing Resalloc tickets. Not all
the resources may be closed at this time.
"""
stopped = 0
for ticket_id in self.get_tickets("WORKING"):
if stopped >= to_stop:
break

data = self.get_ticket_data(ticket_id)
if not self.cmd_try_release(group_id, data):
self.log.debug("Can't release %s", ticket_id)
continue
self.log.info("agent %s switches to TERMINATING (early)", ticket_id)
self.set_ticket_attribute(ticket_id, "state", "TERMINATING")
stopped += 1

def get_tickets(self, states=None):
"""
Get the list of ticket IDs currently stored in redis,
optionally filtered by list of states.
"""
keys = self.redis.keys(tid2rk("*"))
output = []
for key in keys:
if states and self.redis.hget(key, 'state') not in states:
continue
output += [rk2tid(key)]
return output

def set_ticket_attribute(self, ticket_id, key, value):
"""
For the ticket_id set redis hash key to value.
"""
hash_id = tid2rk(ticket_id)
self.redis.hset(hash_id, str(key), str(value))

def get_ticket_attributes(self, ticket_id, keys):
"""
Return list of redis hash values for the given ticket_id and
list of keys.
"""
hash_id = tid2rk(ticket_id)
keys = [str(s) for s in keys]
return self.redis.hmget(hash_id, keys)

def agent_counts(self):
"""
Get 'group_id → count' mapping
"""
keys = self.redis.keys(tid2rk("*"))
output = {}
for key in keys:
group_id = self.redis.hget(key, "group_id")
output.setdefault(group_id, 0)
output[group_id] += 1
return output

def agent_drop(self, ticket_id):
""" drop agent per ticket id, cleanup everything """
ticket = self.resalloc.getTicket(ticket_id)
ticket.close()
self.redis.delete(tid2rk(ticket_id))

def clean_finished_workers(self):
"""
WORKING — TERMINATING; Check for finalized agents, plan their cleanup
"""
for ticket_id in self.get_tickets("WORKING"):
(group_id,) = self.get_ticket_attributes(ticket_id, ["group_id"])
data = self.get_ticket_data(ticket_id)
if not self.cmd_is_finished(group_id, data):
continue
self.log.info("Agent %s finished task, normal TERMINATING", ticket_id)
self.set_ticket_attribute(ticket_id, "state", "TERMINATING")

def start_preparing(self):
"""
When the ticket is resolved in Resalloc, we have a working
resource that we can prepare (in the background). Switch the state.
"""
for ticket_id in self.get_tickets("NEW"):
ticket = self.resalloc.getTicket(ticket_id)
if not ticket.collect():
continue
self.set_ticket_attribute(ticket_id, "data", ticket.output)
self.set_ticket_attribute(ticket_id, "state", "PREPARING")

def detect_failed_tickets(self):
"""
Check for failed tickets, close them. This rarely happens if everything
is working smoothly.
TODO: no need to do this in every single loop?
TODO: call some hook?
"""

def converge(self):
"""
Go through all agent groups; check the ideal number of memgers and start
new or try to terminate.
"""
for group_id, current in self.agent_counts():
ideal = self.cmd_converge_to(group_id)
if ideal is None:
# unable to call the hook
continue

if current < ideal:
todo = ideal - current
self.log.info("We want %s but have %s agents, starting %s new",
ideal, current, todo)
for _ in range(todo):
# spawn as "NEW"
tags = self.opts["agent_groups"][group_id]["tags"]
ticket = self.resalloc.newTicket(tags)
self.log.debug("Requesting new agent via ticket %s",
ticket.id)
self.set_ticket_attribute(ticket.id, "group_id", "NEW")
self.set_ticket_attribute(ticket.id, "state", "NEW")

elif current > ideal:
# WORKING — TERMINATING (even agents that did not start any task)
self.try_to_stop(group_id, current - ideal)

def get_frontend_tasks(self):
"""
NEW → PREPARING → WORKING — TERMINATING — ENDED
"""

# Cleanup after failed tickets
self.detect_failed_tickets()

# Drop successfully terminated agents
for ticket_id in self.get_tickets("ENDED"):
self.agent_drop(ticket_id)

# WORKING — TERMINATING (normal)
self.clean_finished_workers()

# NEW → PREPARING
self.start_preparing()

# spawn NEW or try to switch WORKING → TERMINATING
self.converge()

# Return the priority queue to process by background workers.
background_tasks = []
for ticket_id in self.get_tickets(["PREPARING", "TERMINATING"]):
background_tasks += [Task(ticket_id)]
return background_tasks


def main():
""" realloc-agent-spawner entrypoint """
AgentSpawnerDispatcher(get_config()).run()


if __name__ == "__main__":
main()
Loading

0 comments on commit a264e58

Please sign in to comment.