-
Notifications
You must be signed in to change notification settings - Fork 7
Commit
- Loading branch information
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,229 @@ | ||
""" | ||
Agent spawner daemon. Part of the resalloc project. | ||
""" | ||
|
||
import logging | ||
import subprocess | ||
from copr_common.dispatcher import Dispatcher | ||
from copr_common.worker_manager import WorkerManager | ||
from copr_common.log import setup_script_logger | ||
from copr_common.redis_helpers import get_redis_connection | ||
|
||
from resalloc.client import ( | ||
Check warning Code scanning / vcs-diff-lint Unused Ticket imported from resalloc.client Warning
Unused Ticket imported from resalloc.client
|
||
Connection as ResallocConnection, | ||
Ticket, | ||
) | ||
|
||
# URL of the Resalloc server; passed to ResallocConnection by the dispatcher.
RESALLOC_SERVER = "http://localhost:49100" | ||
|
||
def _redis_key_to_ticket_id(key): | ||
return int(key.split(":")[1]) | ||
|
||
def _ticket_id_to_redis_key(ticket): | ||
Check warning Code scanning / vcs-diff-lint _ticket_id_to_redis_key: Unused argument 'ticket' Warning
_ticket_id_to_redis_key: Unused argument 'ticket'
|
||
return "agent-spawner:{ticket}" | ||
|
||
|
||
class AgentWorkerManager(WorkerManager):
    """
    Worker manager that starts one background handler process per task.
    """

    worker_prefix = 'agent'

    def start_task(self, worker_id, task):
        """
        Start ./handle-ticket-on-background.py as a daemon for TASK, passing
        WORKER_ID along on the command line.
        """
        command = ['./handle-ticket-on-background.py', '--daemon']
        command += ['--ticket-id', repr(task)]
        command += ['--worker-id', worker_id]
        self.start_daemon_on_background(command)

    def finish_task(self, worker_id, task_info):
        """
        Nothing is done for a finished worker; both arguments are ignored and
        True is always returned.
        """
        return True
|
||
|
||
class AgentSpawnerDispatcher(Dispatcher):
    """
    Track list of opened tickets from Resalloc (representing Agent-like
    resources).  Each Agent is in the following states:

    1. NEW -> ticket is taken, it is being allocated in the background
    2. PREPARING -> script is being run in the background to prepare the agent.
    3. WORKING -> up and running agent
    4. TERMINATING -> script is being run in the background to cleanup after
       the agent.
    5. ENDED -> ready for ticket close

    PREPARING and TERMINATING states are never changed by Dispatcher, but by
    BackgroundWorker.  If one background worker fails to switch the state, new
    one is started instead of it.
    """

    task_type = 'agent-manager'
    worker_manager_class = AgentWorkerManager

    # Tags passed to Resalloc whenever a new ticket is requested.
    tags = ["A"]

    # Seconds to wait before re-running a failed converge-to hook.
    _hook_retry_delay = 1

    def __init__(self, opts):
        super().__init__(opts)
        self.sleeptime = 10
        self.log = logging.getLogger()
        setup_script_logger(self.log, "/tmp/agent-spawner.log")
        self.redis = get_redis_connection(opts)
        self.resalloc = ResallocConnection(RESALLOC_SERVER,
                                           request_survives_server_restart=True)

    def call_converge_to(self):
        """
        Execute the ./hook-converge-to script and return the integer it
        prints on standard output.

        The hook is re-run until it exits with zero and prints a valid
        integer; a short pause between the attempts avoids a busy loop.
        """
        import time  # local import, only the retry path needs it

        while True:
            result = subprocess.run(["./hook-converge-to"], capture_output=True,
                                    check=False)
            if result.returncode == 0:
                try:
                    return int(result.stdout.decode("utf-8").strip())
                except ValueError:
                    # Non-numeric output is treated as a failed run below.
                    pass

            self.log.debug("Failing to run converge-to hook")
            time.sleep(self._hook_retry_delay)

    def call_try_release(self, data):
        """
        Call hook that releases the resource described by DATA.  Return True
        if ./hook-release exits with zero.
        """
        result = subprocess.run(["./hook-release", f"{data}"], check=False)
        return not result.returncode

    def call_is_finished(self, data):
        """
        Call hook that tells whether the agent described by DATA is finished.
        Return True if ./hook-is-finished exits with zero.
        """
        # DATA is handed over the same way as call_try_release() does it, so
        # the hook knows which agent is being asked about.
        result = subprocess.run(["./hook-is-finished", f"{data}"],
                                capture_output=True, check=False)
        return not result.returncode

    def get_ticket_data(self, ticket_id):
        """
        Return the output collected for Resalloc ticket TICKET_ID, or None if
        the ticket is not ready yet.
        """
        ticket = self.resalloc.getTicket(ticket_id)
        data = ticket.collect()
        if not data["ready"]:
            return None
        return data["output"]

    def try_to_stop(self, to_stop):
        """
        Attempt to stop TO_STOP resources by closing Resalloc tickets.  Not all
        the resources may be closed at this time.
        """
        self.log.info("Trying to stop %s resources", to_stop)
        stopped = 0
        for ticket_id in self.get_tickets("WORKING"):
            if stopped >= to_stop:
                break

            data = self.get_ticket_data(ticket_id)
            if not self.call_try_release(data):
                self.log.debug("Can't release %s", ticket_id)
                continue

            self.set_ticket_attribute(ticket_id, "state", "TERMINATING")
            stopped += 1

    def get_tickets(self, states=None):
        """
        Return IDs of the tickets tracked in Redis.

        STATES may be None (all tickets), a single state name, or a
        collection of state names the ticket's "state" field must be in.
        """
        if isinstance(states, str):
            # A bare string would be matched by substring (and fail with
            # TypeError for a ticket that has no state yet).
            states = [states]

        output = []
        for key in self.redis.keys('agent-spawner:*'):
            if states and self.redis.hget(key, 'state') not in states:
                continue
            output.append(_redis_key_to_ticket_id(key))
        return output

    def set_ticket_attribute(self, ticket_id, key, value):
        """
        Store VALUE under the field KEY in the Redis hash of TICKET_ID.
        """
        hash_id = _ticket_id_to_redis_key(ticket_id)
        self.redis.hset(hash_id, key, value)

    def agent_count(self):
        """
        Return the number of tickets tracked in Redis, in any state.
        """
        return len(self.get_tickets())

    def agent_drop(self, ticket_id):
        """
        Close the Resalloc ticket TICKET_ID and delete its Redis record.
        """
        ticket = self.resalloc.getTicket(ticket_id)
        ticket.close()
        self.redis.delete(_ticket_id_to_redis_key(ticket_id))

    def clean_finished_workers(self):
        """
        Switch WORKING agents that the is-finished hook reports as finished
        to the TERMINATING state.
        """
        for ticket_id in self.get_tickets("WORKING"):
            data = self.get_ticket_data(ticket_id)
            if not self.call_is_finished(data):  # TODO: move async
                continue
            self.set_ticket_attribute(ticket_id, "state", "TERMINATING")

    def start_preparing(self):
        """
        For every NEW ticket that Resalloc reports as ready, store the ticket
        output in Redis and switch the ticket to the PREPARING state.
        """
        for ticket_id in self.get_tickets("NEW"):
            ticket = self.resalloc.getTicket(ticket_id)
            data = ticket.collect()
            if not data["ready"]:
                continue

            output = data["output"]
            self.set_ticket_attribute(ticket_id, "data", output)
            self.set_ticket_attribute(ticket_id, "state", "PREPARING")

    def get_frontend_tasks(self):
        """
        1. Throw away ENDED agents (close their tickets).
        2. Check finalized agents, plan their cleanup.
        3. Check resolved tickets, plan preparation.
        4. Find out how many Agents we should have started right now, and
           a. either request an Agent (take a new ticket),
           b. or ask if some workers can be terminated early, perhaps
              request cleanup,
           c. or do nothing.
        """

        # TODO: check for failed tickets and close them; this rarely happens
        # if everything is working smoothly, so there is no need to do this
        # in every single loop.  Call some hook?

        # NEW → PREPARING → WORKING — TERMINATING — ENDED

        # Throw away ENDED machines.
        for ticket_id in self.get_tickets("ENDED"):
            self.agent_drop(ticket_id)

        # WORKING — TERMINATING (normal)
        self.clean_finished_workers()

        # NEW → PREPARING
        self.start_preparing()

        current = self.agent_count()
        ideal = self.call_converge_to()
        if current < ideal:
            # spawn as "NEW"
            ticket = self.resalloc.newTicket(self.tags)
            self.log.debug("Requesting new agent via ticket %s",
                           ticket.id)
            self.set_ticket_attribute(ticket.id, "state", "NEW")

        elif current > ideal:
            # WORKING — TERMINATING (even agents that did not start any task)
            # TODO: move async
            self.try_to_stop(current - ideal)

        # NOTE(review): the original code collected the PREPARING and
        # TERMINATING ticket IDs here but then returned an empty list, so no
        # background worker is ever scheduled for them.  The return value is
        # kept; confirm what task objects the worker manager expects before
        # returning those tickets.
        return []
|
||
def _main():
    """Create the dispatcher with empty options and call its run() method."""
    dispatcher = AgentSpawnerDispatcher({})
    dispatcher.run()


# Run the dispatcher only when this file is executed as a script.
if __name__ == "__main__":
    _main()
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,44 @@ | ||
from copr_common.background_worker import BackgroundWorker | ||
Check warning Code scanning / vcs-diff-lint Missing module docstring Warning
Missing module docstring
Check warning Code scanning / vcs-diff-lint Module name "handle-ticket-on-background" doesn't conform to snake_case naming style Warning
Module name "handle-ticket-on-background" doesn't conform to snake_case naming style
Check warning Code scanning / vcs-diff-lint Similar lines in 2 files ==daemon:[33:48] ==dispatcher:[75:90] while True: result = subprocess.run(["./hook-converge-to"], capture_output=True, check=False) if result.returncode == 0: try: return int(result.stdout.decode("utf-8").strip()) except ValueError: pass Warning
Similar lines in 2 files
==daemon:[33:48] ==dispatcher:[75:90] while True: result = subprocess.run(["./hook-converge-to"], capture_output=True, check=False) if result.returncode == 0: try: return int(result.stdout.decode("utf-8").strip()) except ValueError: pass |
||
from copr_common.redis_helpers import get_redis_connection | ||
Check warning Code scanning / vcs-diff-lint Unused get_redis_connection imported from copr_common.redis_helpers Warning
Unused get_redis_connection imported from copr_common.redis_helpers
|
||
|
||
def _ticket_id_to_redis_key(ticket): | ||
Check warning Code scanning / vcs-diff-lint _ticket_id_to_redis_key: Unused argument 'ticket' Warning
_ticket_id_to_redis_key: Unused argument 'ticket'
|
||
return "agent-spawner:{ticket}" | ||
|
||
class AgentHandler(BackgroundWorker):
    """
    Background worker that moves a single ticket to its next state in Redis:
    PREPARING -> WORKING, or TERMINATING -> ENDED.
    """

    # State the ticket is switched to, keyed by its current state.
    _next_state = {
        "PREPARING": "WORKING",
        "TERMINATING": "ENDED",
    }

    def __init__(self):
        super().__init__()
        self.opts = {}

    @classmethod
    def adjust_arg_parser(cls, parser):
        """
        Add the mandatory --ticket-id option to PARSER.
        """
        parser.add_argument(
            "--ticket-id",
            type=int,
            required=True,
            help="ticket ID to handle",
        )

    def handle_ticket(self, ticket_id):
        """
        Switch the ticket TICKET_ID to the state following its current one.
        A ticket in any other state, or without a Redis record, is left
        untouched.
        """
        redis_key = _ticket_id_to_redis_key(ticket_id)
        data = self._redis.hgetall(redis_key)
        # hgetall() yields an empty mapping for a missing key, hence .get()
        # instead of indexing.
        next_state = self._next_state.get(data.get("state"))
        if next_state is not None:
            self._redis.hset(redis_key, "state", next_state)

    def handle_task(self):
        """
        Handle the ticket given by --ticket-id; the worker "status" flag is
        set to "done" afterwards even if the handling fails.
        """
        try:
            self.handle_ticket(self.args.ticket_id)
        finally:
            self.redis_set_worker_flag("status", "done")
|
||
|
||
# Process the ticket only when this file is executed as a script.
if __name__ == "__main__":
    AgentHandler().process()