Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
praiskup committed Oct 11, 2023
1 parent e1a3a42 commit 167d907
Show file tree
Hide file tree
Showing 9 changed files with 130 additions and 53 deletions.
Empty file added agentspawner/__init__.py
Empty file.
5 changes: 5 additions & 0 deletions agentspawner/config.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
cmd_converge_to: hook-converge-to
cmd_check_finished: hook-probability "agent already finished the task" 5
cmd_prepare: hook-take
cmd_try_release: hook-probability "agent can be removed early" 25
88 changes: 48 additions & 40 deletions agentspawner/dispatcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import logging
import subprocess
from copr_common.dispatcher import Dispatcher
from copr_common.worker_manager import WorkerManager
from copr_common.worker_manager import WorkerManager, QueueTask
from copr_common.log import setup_script_logger
from copr_common.redis_helpers import get_redis_connection

Expand All @@ -14,32 +14,40 @@
Ticket,
)

RESALLOC_SERVER = "http://localhost:49100"

def _redis_key_to_ticket_id(key):
return int(key.split(":")[1])
from agentspawner.helpers import (
_redis_key_to_ticket_id,
_redis_key_prefixed,
_rel_path,
get_config,
CmdCallerMixin,
)

def _ticket_id_to_redis_key(ticket):
return "agent-spawner:{ticket}"
RESALLOC_SERVER = "http://localhost:49100"

class Task(QueueTask):
    """
    Queue task identified by a single ticket ID.
    """

    def __init__(self, task_id):
        self.task_id = task_id

    @property
    def id(self):
        """
        Return the task identifier, i.e. "agent_ticket_" followed by the
        stringified task_id.
        """
        return f"agent_ticket_{self.task_id!s}"

class AgentWorkerManager(WorkerManager):

Check warning

Code scanning / vcs-diff-lint

AgentWorkerManager: Missing class docstring Warning

AgentWorkerManager: Missing class docstring

worker_prefix = 'agent'
worker_prefix = 'agent_backround_handler'

def start_task(self, worker_id, task):

Check warning

Code scanning / vcs-diff-lint

AgentWorkerManager.start_task: Missing function or method docstring Warning

AgentWorkerManager.start_task: Missing function or method docstring
self.start_daemon_on_background([
'./handle-ticket-on-background.py',
_rel_path('./handle-ticket-on-background.py'),
'--daemon',
'--ticket-id', repr(task),
'--ticket-id', str(task.task_id),
'--worker-id', worker_id,
])

def finish_task(self, worker_id, task_info):

Check warning

Code scanning / vcs-diff-lint

AgentWorkerManager.finish_task: Missing function or method docstring Warning

AgentWorkerManager.finish_task: Missing function or method docstring

Check warning

Code scanning / vcs-diff-lint

AgentWorkerManager.finish_task: Unused argument 'worker_id' Warning

AgentWorkerManager.finish_task: Unused argument 'worker_id'

Check warning

Code scanning / vcs-diff-lint

AgentWorkerManager.finish_task: Unused argument 'task_info' Warning

AgentWorkerManager.finish_task: Unused argument 'task_info'
return True


class AgentSpawnerDispatcher(Dispatcher):
class AgentSpawnerDispatcher(Dispatcher, CmdCallerMixin):
"""
Track list of opened tickets from Resalloc (representing Agent-like
resources). Each Agent is in the following states:
Expand All @@ -61,7 +69,6 @@ class AgentSpawnerDispatcher(Dispatcher):

tags = ["A"]


def __init__(self, opts):
super().__init__(opts)
self.sleeptime = 10
Expand All @@ -74,8 +81,9 @@ def __init__(self, opts):
def call_converge_to(self):
""" Execute the configured hook script """
while True:
result = subprocess.run(["./hook-converge-to"], capture_output=True,
check=False)
result = subprocess.run(self.command("cmd_converge_to"),
capture_output=True,
check=False, shell=True)
if result.returncode == 0:
try:
return int(result.stdout.decode("utf-8").strip())
Expand All @@ -88,30 +96,29 @@ def call_try_release(self, data):
"""
Call hook that releases the resource
"""
result = subprocess.run(["./hook-release", f"{data}"], check=False)
cmd = self.command("cmd_try_release")
result = subprocess.run(cmd, check=False, **self.subproces_kwargs(data))
return not result.returncode

def call_is_finished(self, data):
"""
Call hook that checks whether the agent has finished its task
"""
result = subprocess.run(["./hook-is-finished"], capture_output=True,
check=False)
result = subprocess.run(self.command("cmd_check_finished"), check=False,
**self.subproces_kwargs(data))
return not result.returncode

def get_ticket_data(self, ticket_id):

Check warning

Code scanning / vcs-diff-lint

AgentSpawnerDispatcher.get_ticket_data: Missing function or method docstring Warning

AgentSpawnerDispatcher.get_ticket_data: Missing function or method docstring
ticket = self.resalloc.getTicket(ticket_id)
data = ticket.collect()
if not data["ready"]:
if not ticket.collect():
return None
return not data["output"]
return ticket.output

def try_to_stop(self, to_stop):
"""
Attempt to stop TO_STOP resources by closing Resalloc tickets. Not all
the resources may be closed at this time.
"""
self.log.info("Trying to stop %s resources", to_stop)
stopped = 0
for ticket_id in self.get_tickets("WORKING"):
if stopped >= to_stop:
Expand All @@ -121,12 +128,12 @@ def try_to_stop(self, to_stop):
if not self.call_try_release(data):
self.log.debug("Can't release %s", ticket_id)
continue

self.log.info("agent %s switches to TERMINATING (early)", ticket_id)
self.set_ticket_attribute(ticket_id, "state", "TERMINATING")
stopped += 1

def get_tickets(self, states=None):

Check warning

Code scanning / vcs-diff-lint

AgentSpawnerDispatcher.get_tickets: Missing function or method docstring Warning

AgentSpawnerDispatcher.get_tickets: Missing function or method docstring
keys = self.redis.keys('agent-spawner:*')
keys = self.redis.keys(_redis_key_prefixed("*"))
output = []
for key in keys:
if states and self.redis.hget(key, 'state') not in states:
Expand All @@ -135,33 +142,31 @@ def get_tickets(self, states=None):
return output

def set_ticket_attribute(self, ticket_id, key, value):

Check warning

Code scanning / vcs-diff-lint

AgentSpawnerDispatcher.set_ticket_attribute: Missing function or method docstring Warning

AgentSpawnerDispatcher.set_ticket_attribute: Missing function or method docstring
hash_id = _ticket_id_to_redis_key(ticket_id)
self.redis.hset(hash_id, key, value)
hash_id = _redis_key_prefixed(ticket_id)
self.redis.hset(hash_id, str(key), str(value))

def agent_count(self):
    """
    Return the number of tickets that get_tickets() reports when no state
    filter is given.
    """
    tickets = self.get_tickets()
    return len(tickets)

def agent_drop(self, ticket_id):

Check warning

Code scanning / vcs-diff-lint

AgentSpawnerDispatcher.agent_drop: Missing function or method docstring Warning

AgentSpawnerDispatcher.agent_drop: Missing function or method docstring
ticket = self.resalloc.getTicket(ticket_id)
ticket.close()
self.redis.delete(_ticket_id_to_redis_key(ticket_id))
self.redis.delete(_redis_key_prefixed(ticket_id))

def clean_finished_workers(self):
    """
    Switch agents that finished their task from WORKING to TERMINATING.

    For every ticket in the WORKING state the ticket data is fetched and
    handed over to the call_is_finished() hook.  When the hook succeeds,
    the ticket's "state" attribute is set to "TERMINATING".
    """
    # The state filter is passed as a list.  get_tickets() tests the stored
    # state with the "in" operator, which on a bare string would be a
    # substring match (and a TypeError if the stored state is None).
    for ticket_id in self.get_tickets(["WORKING"]):
        data = self.get_ticket_data(ticket_id)
        if not self.call_is_finished(data):  # TODO: move async
            continue
        self.log.info("Agent %s finished task, normal TERMINATING", ticket_id)
        self.set_ticket_attribute(ticket_id, "state", "TERMINATING")

def start_preparing(self):

Check warning

Code scanning / vcs-diff-lint

AgentSpawnerDispatcher.start_preparing: Missing function or method docstring Warning

AgentSpawnerDispatcher.start_preparing: Missing function or method docstring
for ticket_id in self.get_tickets("NEW"):
ticket = self.resalloc.getTicket(ticket_id)
data = ticket.collect()
if not data["ready"]:
if not ticket.collect():
continue

output = data["output"]
self.set_ticket_attribute(ticket_id, "data", output)
self.set_ticket_attribute(ticket_id, "data", ticket.output)
self.set_ticket_attribute(ticket_id, "state", "PREPARING")

def get_frontend_tasks(self):
Expand Down Expand Up @@ -204,11 +209,15 @@ def get_frontend_tasks(self):
current = self.agent_count()
ideal = self.call_converge_to()
if current < ideal:
# spawn as "NEW"
ticket = self.resalloc.newTicket(self.tags)
self.log.debug("Requesting new agent via ticket %s",
ticket.id)
self.set_ticket_attribute(ticket.id, "state", "NEW")
todo = ideal - current
self.log.info("We want %s but have %s agents, starting %s new",
ideal, current, todo)
for _ in range(todo):
# spawn as "NEW"
ticket = self.resalloc.newTicket(self.tags)
self.log.debug("Requesting new agent via ticket %s",
ticket.id)
self.set_ticket_attribute(ticket.id, "state", "NEW")

elif current > ideal:
# WORKING — TERMINATING (even agents that did not start any task)
Expand All @@ -217,12 +226,11 @@ def get_frontend_tasks(self):

background_tasks = []
for ticket_id in self.get_tickets(["PREPARING", "TERMINATING"]):
background_tasks += [ticket_id]

return []
background_tasks += [Task(ticket_id)]
return background_tasks

def _main():
AgentSpawnerDispatcher({}).run()
AgentSpawnerDispatcher(get_config()).run()


if __name__ == "__main__":
Expand Down
29 changes: 21 additions & 8 deletions agentspawner/handle-ticket-on-background.py
100644 → 100755
Original file line number Diff line number Diff line change
@@ -1,14 +1,26 @@
#! /bin/env python3

Check warning

Code scanning / vcs-diff-lint

Missing module docstring Warning

Missing module docstring

Check warning

Code scanning / vcs-diff-lint

Module name "handle-ticket-on-background" doesn't conform to snake_case naming style Warning

Module name "handle-ticket-on-background" doesn't conform to snake_case naming style

import subprocess

from copr_common.background_worker import BackgroundWorker
from copr_common.redis_helpers import get_redis_connection

Check warning

Code scanning / vcs-diff-lint

Unused get_redis_connection imported from copr_common.redis_helpers Warning

Unused get_redis_connection imported from copr_common.redis_helpers

def _ticket_id_to_redis_key(ticket):
return "agent-spawner:{ticket}"
from agentspawner.helpers import _redis_key_prefixed, _rel_path, get_config, CmdCallerMixin

Check warning

Code scanning / vcs-diff-lint

Unused _rel_path imported from agentspawner.helpers Warning

Unused _rel_path imported from agentspawner.helpers

class AgentHandler(BackgroundWorker):
class AgentHandler(BackgroundWorker, CmdCallerMixin):

Check warning

Code scanning / vcs-diff-lint

AgentHandler: Missing class docstring Warning

AgentHandler: Missing class docstring

def __init__(self):
super().__init__()
self.opts = {}
self.opts = get_config()

def call_take(self, data):
    """
    Call hook that prepares the resource

    DATA is handed to subproces_kwargs(), which builds the keyword
    arguments (environment, shell) for the "cmd_prepare" command.

    Returns True when the hook exits with zero status.  Because the
    command is run with check=True, a non-zero exit status raises
    subprocess.CalledProcessError instead of returning False.
    """
    # subproces_kwargs() already provides "shell"; passing shell=True here
    # as well fails with "got multiple values for keyword argument 'shell'".
    result = subprocess.run(
        self.command("cmd_prepare"), check=True,
        **self.subproces_kwargs(data),
    )
    # A CompletedProcess instance is always truthy, so negating the object
    # itself would always give False; look at the exit status instead.
    return result.returncode == 0

@classmethod
def adjust_arg_parser(cls, parser):

Check warning

Code scanning / vcs-diff-lint

AgentHandler.adjust_arg_parser: Missing function or method docstring Warning

AgentHandler.adjust_arg_parser: Missing function or method docstring
Expand All @@ -23,13 +35,14 @@ def handle_ticket(self, ticket_id):
"""
Import a single task
"""
redis_key = _ticket_id_to_redis_key(ticket_id)
data = self._redis.hgetall(redis_key)
if data["state"] == "PREPARING":
redis_key = _redis_key_prefixed(ticket_id)
redis_dict = self._redis.hgetall(redis_key)
if redis_dict["state"] == "PREPARING":
self._redis.hset(redis_key, "state", "WORKING")
self.call_take(redis_dict["data"])
return

if data["state"] == "TERMINATING":
if redis_dict["state"] == "TERMINATING":
self._redis.hset(redis_key, "state", "ENDED")

def handle_task(self):

Check warning

Code scanning / vcs-diff-lint

AgentHandler.handle_task: Missing function or method docstring Warning

AgentHandler.handle_task: Missing function or method docstring
Expand Down
32 changes: 32 additions & 0 deletions agentspawner/helpers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
import os

Check warning

Code scanning / vcs-diff-lint

Missing module docstring Warning

Missing module docstring

from resalloc.helpers import load_config_file

def _redis_key_to_ticket_id(key):
    """
    Return the ticket ID, as an integer, taken from the second
    colon-separated field of KEY.
    """
    fields = key.split(":")
    return int(fields[1])

def _redis_key_prefixed(ticket):
    """
    Return the Redis key for TICKET, i.e. TICKET prefixed with "agent:".
    """
    return "agent:{}".format(ticket)

def _rel_path(file):
    """
    Return FILE joined onto the directory that contains this module.
    """
    module_dir = os.path.dirname(__file__)
    return os.path.join(module_dir, file)

def get_config():
    """
    Load the "config.yaml" file located next to this module and return
    whatever load_config_file() produces for it.
    """
    return load_config_file(_rel_path("config.yaml"))

class CmdCallerMixin:
    """
    Mixin with helpers for running the configured hook commands.

    The host class must provide ``self.opts``, a mapping from configuration
    keys (e.g. "cmd_prepare") to the command strings to execute.
    """

    def command(self, config_name):
        """
        Return the command configured under CONFIG_NAME in ``self.opts``.

        Raises KeyError when the option is not configured.
        """
        return self.opts[config_name]

    def subproces_kwargs(self, data):
        """
        Return keyword arguments for subprocess.run() when calling a hook.

        The hook gets a minimal environment with only two variables:
        AGENT_SPAWNER_RESOURCE_DATA (DATA converted to string) and PATH (the
        current PATH followed by the directory of this module).  The "shell"
        argument is enabled here, so callers must not pass it again.
        """
        # Tolerate an unset PATH (no KeyError), and skip an empty one so the
        # result never contains an empty entry, which shells interpret as
        # the current working directory.
        search_path = [
            entry
            for entry in (os.environ.get("PATH", ""), _rel_path("."))
            if entry
        ]
        return {
            "env": {
                "AGENT_SPAWNER_RESOURCE_DATA": str(data),
                "PATH": os.pathsep.join(search_path),
            },
            "shell": True,
        }
13 changes: 13 additions & 0 deletions agentspawner/hook-probability
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
#! /bin/bash

# Hook script that succeeds at random with the given probability.
#
# Usage: hook-probability MESSAGE PERCENT
#   $1  text shown in the "Checking if ..." message
#   $2  integer; the hook succeeds when RANDOM % 100 is lower than this
# Reads AGENT_SPAWNER_RESOURCE_DATA from the environment.
# Exit status: 0 after printing "Success!", 1 after printing "No.".

# Only used for the message: the resource data line(s) containing RESALLOC_ID=
id=$(echo "$AGENT_SPAWNER_RESOURCE_DATA" | grep RESALLOC_ID=)
echo -n "Checking if '$1' ($id) (probability of success $2%) => "

# The arithmetic comparison expands to 1 when true and to 0 when false
if test 1 -eq $(( RANDOM % 100 < $2 )); then
echo "Success!"
exit 0
fi
echo No.
exit 1


9 changes: 7 additions & 2 deletions agentspawner/hook-release
Original file line number Diff line number Diff line change
@@ -1,8 +1,13 @@
#! /bin/bash

eval 'set -- $1' # strip
echo "Releasing with resalloc ticket data: $1"
echo -n "Checking if agent (resource) $1 can be removed: "

# ~33% chance of closing this one
test $(( RANDOM % 3 )) -eq 0 && exit 0
test $(( RANDOM % 3 )) -eq 0 && {
echo "YES!"
exit 0
}

echo "no, not removing."
exit 1
File renamed without changes.
7 changes: 4 additions & 3 deletions agentspawner/resalloc-testing-server
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#! /bin/sh
#! /bin/sh -x

cd ..
rm /tmp/server-sql
cd "$(dirname "$(readlink -f "$0")")/.." || exit 1
redis-cli flushall || exit 1
rm -f /tmp/server-sql
mkdir -p /tmp/logdir
./test-tooling/resalloc-server

0 comments on commit 167d907

Please sign in to comment.