-
Notifications
You must be signed in to change notification settings - Fork 7
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Relates to #123
- Loading branch information
Showing
10 changed files
with
539 additions
and
10 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,31 @@ | ||
# Configuration for resalloc-agent-spawner.service. YAML format. | ||
# Specify groups of agents within the `agent_groups` section that agent spawner | ||
# should take care of. | ||
|
||
#agent_groups: | ||
# workers: | ||
# # These are executed in the background async, may take some time. | ||
# cmd_prepare: /bin/true | ||
# cmd_terminate: /bin/true | ||
# | ||
# # keep these super fast to avoid overall system halt! | ||
# cmd_converge_to: echo 1 | ||
# cmd_check_finished: /bin/true | ||
# cmd_try_release: /bin/false | ||
# | ||
# # List of resalloc tags to request in tickets | ||
# tags: | ||
# - kobo_worker | ||
|
||
# Note that we use request_survives_server_restart resalloc client option, | ||
# so the resalloc server must be running to avoid system hang! | ||
#resalloc_server: "http://localhost:49100" | ||
|
||
# Where to log events. | ||
#logfile: /tmp/agent-spawner.log | ||
|
||
# How to connect to redis-db. By default connects to 127.0.0.1:6379. | ||
#redis_db: null | ||
#redis_host: null | ||
#redis_port: null | ||
#redis_password: null |
Empty file.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,247 @@ | ||
""" | ||
Agent spawner daemon. Part of the resalloc project. | ||
""" | ||
|
||
import logging | ||
from copr_common.dispatcher import Dispatcher | ||
from copr_common.worker_manager import WorkerManager, QueueTask | ||
from copr_common.log import setup_script_logger | ||
from copr_common.redis_helpers import get_redis_connection | ||
|
||
from resalloc.client import ( | ||
Connection as ResallocConnection, | ||
) | ||
|
||
from resalloc_agent_spawner.helpers import ( | ||
get_config, | ||
CmdCallerMixin, | ||
rk2tid, | ||
tid2rk, | ||
) | ||
|
||
class Task(QueueTask):
    """ Priority queue task handed over to AgentWorkerManager """

    def __init__(self, task_id):
        # The ticket ID this task was created for (see get_frontend_tasks()).
        self.task_id = task_id

    @property
    def id(self):
        """ Task identifier: the "agent_ticket_" prefix plus the task_id """
        return f"agent_ticket_{self.task_id}"
|
||
class AgentWorkerManager(WorkerManager):
    """
    Start an asynchronous `resalloc-agent-worker` process in the background
    for every task taken from the queue.
    """

    # (sic) runtime string, the spelling is intentionally left untouched
    worker_prefix = 'agent_backround_handler'

    def start_task(self, worker_id, task):
        """
        Hand the TASK (its ticket ID) over to a new background worker
        identified by WORKER_ID.
        """
        command = ["resalloc-agent-worker", "--daemon"]
        command += ["--ticket-id", str(task.task_id)]
        command += ["--worker-id", worker_id]
        self.start_daemon_on_background(command)

    def finish_task(self, worker_id, task_info):
        """ Nothing to do on task finish here; unconditionally True. """
        return True
|
||
|
||
class AgentSpawnerDispatcher(Dispatcher, CmdCallerMixin):
    """
    Track list of opened tickets from Resalloc (representing Agent-like
    resources).  Each Agent is in one of the following states:

    1. NEW -> ticket is taken, it is being allocated in the background
    2. PREPARING -> script is being run in the background to prepare the agent
    3. WORKING -> up and running agent
    4. TERMINATING -> script is being run in the background to cleanup after
       the agent
    5. ENDED -> ready for ticket close

    PREPARING and TERMINATING states are never changed by Dispatcher, but by
    BackgroundWorker.  If one background worker fails to switch the state, new
    one is started instead of it.
    """

    task_type = 'agent-manager'
    worker_manager_class = AgentWorkerManager

    def __init__(self, opts):
        """
        Set up logging, the redis connection and the Resalloc connection
        according to OPTS (reads the "logfile" and "resalloc_server" keys).
        """
        super().__init__(opts)
        self.sleeptime = 10
        self.log = logging.getLogger()
        setup_script_logger(self.log, self.opts["logfile"])
        self.redis = get_redis_connection(opts)
        self.resalloc = ResallocConnection(
            opts["resalloc_server"],
            request_survives_server_restart=True,
        )

    def get_ticket_data(self, ticket_id):
        """
        Load resalloc ticket ID data.  Returns the ticket output, or None
        when ticket.collect() reports there is nothing to collect (yet).
        """
        ticket = self.resalloc.getTicket(ticket_id)
        if not ticket.collect():
            return None
        return ticket.output

    def try_to_stop(self, group_id, to_stop):
        """
        Attempt to stop TO_STOP resources of the GROUP_ID group by switching
        them to TERMINATING.  Not all the resources may be stopped at this
        time (the cmd_try_release hook may refuse).
        """
        stopped = 0
        for ticket_id in self.get_tickets("WORKING"):
            if stopped >= to_stop:
                break

            # Only agents from the requested group may be released; the
            # release hook is group specific.
            (ticket_group,) = self.get_ticket_attributes(ticket_id,
                                                         ["group_id"])
            if ticket_group != group_id:
                continue

            data = self.get_ticket_data(ticket_id)
            if not self.cmd_try_release(group_id, data):
                self.log.debug("Can't release %s", ticket_id)
                continue
            self.log.info("agent %s switches to TERMINATING (early)", ticket_id)
            self.set_ticket_attribute(ticket_id, "state", "TERMINATING")
            stopped += 1

    def get_tickets(self, states=None):
        """
        Get the list of ticket IDs currently stored in redis, optionally
        filtered by STATES (a single state name, or a list of state names).
        """
        if isinstance(states, str):
            # A bare string would turn the 'in' check below into a substring
            # match (and a TypeError for tickets without a state).
            states = [states]

        output = []
        for key in self.redis.keys(tid2rk("*")):
            if states and self.redis.hget(key, 'state') not in states:
                continue
            output.append(rk2tid(key))
        return output

    def set_ticket_attribute(self, ticket_id, key, value):
        """
        For the ticket_id set redis hash key to value (both stringified).
        """
        hash_id = tid2rk(ticket_id)
        self.redis.hset(hash_id, str(key), str(value))

    def get_ticket_attributes(self, ticket_id, keys):
        """
        Return list of redis hash values for the given ticket_id and
        list of keys.
        """
        hash_id = tid2rk(ticket_id)
        return self.redis.hmget(hash_id, [str(key) for key in keys])

    def agent_counts(self):
        """
        Get 'group_id → count' mapping for the tickets stored in redis.
        Groups without any ticket are not part of the output.
        """
        output = {}
        for key in self.redis.keys(tid2rk("*")):
            group_id = self.redis.hget(key, "group_id")
            output[group_id] = output.get(group_id, 0) + 1
        return output

    def agent_drop(self, ticket_id):
        """ drop agent per ticket id, cleanup everything """
        ticket = self.resalloc.getTicket(ticket_id)
        ticket.close()
        self.redis.delete(tid2rk(ticket_id))

    def clean_finished_workers(self):
        """
        WORKING — TERMINATING; Check for finalized agents, plan their cleanup
        """
        for ticket_id in self.get_tickets("WORKING"):
            (group_id,) = self.get_ticket_attributes(ticket_id, ["group_id"])
            data = self.get_ticket_data(ticket_id)
            if not self.cmd_is_finished(group_id, data):
                continue
            self.log.info("Agent %s finished task, normal TERMINATING", ticket_id)
            self.set_ticket_attribute(ticket_id, "state", "TERMINATING")

    def start_preparing(self):
        """
        When the ticket is resolved in Resalloc, we have a working
        resource that we can prepare (in the background).  Switch the state.
        """
        for ticket_id in self.get_tickets("NEW"):
            ticket = self.resalloc.getTicket(ticket_id)
            if not ticket.collect():
                continue
            self.set_ticket_attribute(ticket_id, "data", ticket.output)
            self.set_ticket_attribute(ticket_id, "state", "PREPARING")

    def detect_failed_tickets(self):
        """
        Check for failed tickets, close them.  This rarely happens if everything
        is working smoothly.  Not implemented yet.
        TODO: no need to do this in every single loop?
        TODO: call some hook?
        """

    def converge(self):
        """
        Go through all configured agent groups; check the ideal number of
        members and start new agents or try to terminate the redundant ones.
        """
        counts = self.agent_counts()

        # Iterate the configured groups (not just those already tracked in
        # redis), otherwise a group with no agents would never get any.
        groups = self.opts["agent_groups"] or {}
        for group_id, group in groups.items():
            current = counts.get(group_id, 0)
            ideal = self.cmd_converge_to(group_id)
            if ideal is None:
                # unable to call the hook
                continue

            if current < ideal:
                todo = ideal - current
                self.log.info("We want %s but have %s agents, starting %s new",
                              ideal, current, todo)
                tags = group["tags"]
                for _ in range(todo):
                    # spawn as "NEW"
                    ticket = self.resalloc.newTicket(tags)
                    self.log.debug("Requesting new agent via ticket %s",
                                   ticket.id)
                    self.set_ticket_attribute(ticket.id, "group_id", group_id)
                    self.set_ticket_attribute(ticket.id, "state", "NEW")

            elif current > ideal:
                # WORKING — TERMINATING (even agents that did not start any task)
                self.try_to_stop(group_id, current - ideal)

    def get_frontend_tasks(self):
        """
        NEW → PREPARING → WORKING — TERMINATING — ENDED

        Perform one round of state transitions and return the list of Task
        objects (PREPARING and TERMINATING tickets) for background workers.
        """

        # Cleanup after failed tickets
        self.detect_failed_tickets()

        # Drop successfully terminated agents
        for ticket_id in self.get_tickets("ENDED"):
            self.agent_drop(ticket_id)

        # WORKING — TERMINATING (normal)
        self.clean_finished_workers()

        # NEW → PREPARING
        self.start_preparing()

        # spawn NEW or try to switch WORKING → TERMINATING
        self.converge()

        # Return the priority queue to process by background workers.
        return [
            Task(ticket_id)
            for ticket_id in self.get_tickets(["PREPARING", "TERMINATING"])
        ]
|
||
|
||
def main():
    """ resalloc-agent-spawner entrypoint """
    dispatcher = AgentSpawnerDispatcher(get_config())
    dispatcher.run()


if __name__ == "__main__":
    main()
Oops, something went wrong.