Skip to content

Commit

Permalink
Add beanstalk as a possible queue backend for Teuthology Jobs along w…
Browse files Browse the repository at this point in the history
…ith Paddles

With the use of the --queue-backend argument the user can specify which backend(paddles/beanstalk) they would like to use for maintaining the teuthology Jobs queue.
In order to avoid overlapping Job IDs, when a job is being scheduled in beanstalk it is also written to paddles which returns a unique ID.
This is the ID teuthology will treat as the Job ID throughout the run of the job.

To differentiate between the 2 queue backends, the teuthology-queue command has been split into teuthology-paddles-queue command and teuthology-beanstalk-queue command.

Signed-off-by: Aishwarya Mathuria <[email protected]>
  • Loading branch information
amathuria committed Oct 12, 2023
1 parent 58857a9 commit 6d7d317
Show file tree
Hide file tree
Showing 14 changed files with 496 additions and 116 deletions.
35 changes: 35 additions & 0 deletions scripts/beanstalk_queue.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
import docopt

import teuthology.config
import teuthology.beanstalk

doc = """
usage: teuthology-beanstalk-queue -h
teuthology-beanstalk-queue [-s|-d|-f] -m MACHINE_TYPE
teuthology-beanstalk-queue [-r] -m MACHINE_TYPE
teuthology-beanstalk-queue -m MACHINE_TYPE -D PATTERN
teuthology-beanstalk-queue -p SECONDS [-m MACHINE_TYPE]
List Jobs in queue.
If -D is passed, then jobs with PATTERN in the job name are deleted from the
queue.
Arguments:
-m, --machine_type MACHINE_TYPE [default: multi]
Which machine type queue to work on.
optional arguments:
-h, --help Show this help message and exit
-D, --delete PATTERN Delete Jobs with PATTERN in their name
-d, --description Show job descriptions
-r, --runs Only show run names
-f, --full Print the entire job config. Use with caution.
-s, --status Prints the status of the queue
-p, --pause SECONDS Pause queues for a number of seconds. A value of 0
will unpause. If -m is passed, pause that queue,
otherwise pause all queues.
"""


def main():

args = docopt.docopt(doc)
print(args)
teuthology.beanstalk.main(args)
5 changes: 3 additions & 2 deletions scripts/dispatcher.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
"""
usage: teuthology-dispatcher --help
teuthology-dispatcher --supervisor [-v] --bin-path BIN_PATH --job-config CONFIG --archive-dir DIR
teuthology-dispatcher [-v] [--archive-dir DIR] --log-dir LOG_DIR --machine-type MACHINE_TYPE
teuthology-dispatcher [-v] [--archive-dir DIR] --log-dir LOG_DIR --machine-type MACHINE_TYPE --queue-backend BACKEND
Start a dispatcher for the specified machine type. Grab jobs from a paddles
Start a dispatcher for the specified machine type. Grab jobs from a paddles/beanstalk
queue and run the teuthology tests they describe as subprocesses. The
subprocess invoked is a teuthology-dispatcher command run in supervisor
mode.
Expand All @@ -22,6 +22,7 @@
--bin-path BIN_PATH teuthology bin path
--job-config CONFIG file descriptor of job's config file
--exit-on-empty-queue if the queue is empty, exit
--queue-backend BACKEND choose between paddles and beanstalk
"""

import docopt
Expand Down
18 changes: 9 additions & 9 deletions scripts/queue.py → scripts/paddles_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,14 @@
import teuthology.paddles_queue

doc = """
usage: teuthology-queue -h
teuthology-queue -s -m MACHINE_TYPE
teuthology-queue [-d|-f] -m MACHINE_TYPE -U USER
teuthology-queue -m MACHINE_TYPE -P PRIORITY [-U USER|-R RUN_NAME]
teuthology-queue [-r] -m MACHINE_TYPE -U USER
teuthology-queue -m MACHINE_TYPE -D PATTERN -U USER
teuthology-queue -p [-t SECONDS] -m MACHINE_TYPE -U USER
teuthology-queue -u -m MACHINE_TYPE -U USER
usage: teuthology-paddles-queue -h
teuthology-paddles-queue -s -m MACHINE_TYPE
teuthology-paddles-queue [-d|-f] -m MACHINE_TYPE -U USER
teuthology-paddles-queue -m MACHINE_TYPE -P PRIORITY [-U USER|-R RUN_NAME]
teuthology-paddles-queue [-r] -m MACHINE_TYPE -U USER
teuthology-paddles-queue -m MACHINE_TYPE -D PATTERN -U USER
teuthology-paddles-queue -p [-t SECONDS] -m MACHINE_TYPE -U USER
teuthology-paddles-queue -u -m MACHINE_TYPE -U USER
List Jobs in queue.
If -D is passed, then jobs with PATTERN in the job name are deleted from the
Expand All @@ -36,7 +36,7 @@
-P, --priority PRIORITY
Change priority of queued jobs
-U, --user USER User who owns the jobs
-R, --run_name RUN_NAME
-R, --run-name RUN_NAME
Used to change priority of all jobs in the run.
"""

Expand Down
214 changes: 214 additions & 0 deletions teuthology/beanstalk.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,214 @@
import beanstalkc
import yaml
import logging
import pprint
import sys
from collections import OrderedDict

from teuthology.config import config
from teuthology import report

log = logging.getLogger(__name__)


def connect():
host = config.queue_host
port = config.queue_port
if host is None or port is None:
raise RuntimeError(
'Beanstalk queue information not found in {conf_path}'.format(
conf_path=config.teuthology_yaml))
return beanstalkc.Connection(host=host, port=port)


def watch_tube(connection, tube_name):
"""
Watch a given tube, potentially correcting to 'multi' if necessary. Returns
the tube_name that was actually used.
"""
if ',' in tube_name:
log.debug("Correcting tube name to 'multi'")
tube_name = 'multi'
connection.watch(tube_name)
connection.ignore('default')
return tube_name


def walk_jobs(connection, tube_name, processor, pattern=None):
"""
def callback(jobs_dict)
"""
log.info("Checking Beanstalk Queue...")
job_count = connection.stats_tube(tube_name)['current-jobs-ready']
if job_count == 0:
log.info('No jobs in Beanstalk Queue')
return

# Try to figure out a sane timeout based on how many jobs are in the queue
timeout = job_count / 2000.0 * 60
for i in range(1, job_count + 1):
print_progress(i, job_count, "Loading")
job = connection.reserve(timeout=timeout)
if job is None or job.body is None:
continue
job_config = yaml.safe_load(job.body)
job_name = job_config['name']
job_id = job.stats()['id']
if pattern is not None and pattern not in job_name:
continue
processor.add_job(job_id, job_config, job)
end_progress()
processor.complete()


def print_progress(index, total, message=None):
msg = "{m} ".format(m=message) if message else ''
sys.stderr.write("{msg}{i}/{total}\r".format(
msg=msg, i=index, total=total))
sys.stderr.flush()


def end_progress():
sys.stderr.write('\n')
sys.stderr.flush()


class JobProcessor(object):
def __init__(self):
self.jobs = OrderedDict()

def add_job(self, job_id, job_config, job_obj=None):
job_id = str(job_id)

job_dict = dict(
index=(len(self.jobs) + 1),
job_config=job_config,
)
if job_obj:
job_dict['job_obj'] = job_obj
self.jobs[job_id] = job_dict

self.process_job(job_id)

def process_job(self, job_id):
pass

def complete(self):
pass


class JobPrinter(JobProcessor):
def __init__(self, show_desc=False, full=False):
super(JobPrinter, self).__init__()
self.show_desc = show_desc
self.full = full

def process_job(self, job_id):
job_config = self.jobs[job_id]['job_config']
job_index = self.jobs[job_id]['index']
job_priority = job_config['priority']
job_name = job_config['name']
job_desc = job_config['description']
print('Job: {i:>4} priority: {pri:>4} {job_name}/{job_id}'.format(
i=job_index,
pri=job_priority,
job_id=job_id,
job_name=job_name,
))
if self.full:
pprint.pprint(job_config)
elif job_desc and self.show_desc:
for desc in job_desc.split():
print('\t {}'.format(desc))


class RunPrinter(JobProcessor):
def __init__(self):
super(RunPrinter, self).__init__()
self.runs = list()

def process_job(self, job_id):
run = self.jobs[job_id]['job_config']['name']
if run not in self.runs:
self.runs.append(run)
print(run)


class JobDeleter(JobProcessor):
def __init__(self, pattern):
self.pattern = pattern
super(JobDeleter, self).__init__()

def add_job(self, job_id, job_config, job_obj=None):
job_name = job_config['name']
if self.pattern in job_name:
super(JobDeleter, self).add_job(job_id, job_config, job_obj)

def process_job(self, job_id):
job_config = self.jobs[job_id]['job_config']
job_name = job_config['name']
print('Deleting {job_name}/{job_id}'.format(
job_id=job_id,
job_name=job_name,
))
job_obj = self.jobs[job_id].get('job_obj')
if job_obj:
job_obj.delete()
report.try_delete_jobs(job_name, job_id)


def pause_tube(connection, tube, duration):
duration = int(duration)
if not tube:
tubes = sorted(connection.tubes())
else:
tubes = [tube]

prefix = 'Unpausing' if duration == 0 else "Pausing for {dur}s"
templ = prefix + ": {tubes}"
log.info(templ.format(dur=duration, tubes=tubes))
for tube in tubes:
connection.pause_tube(tube, duration)


def stats_tube(connection, tube):
stats = connection.stats_tube(tube)
result = dict(
name=tube,
count=stats['current-jobs-ready'],
paused=(stats['pause'] != 0),
)
return result


def main(args):
machine_type = args['--machine_type']
status = args['--status']
delete = args['--delete']
runs = args['--runs']
show_desc = args['--description']
full = args['--full']
pause_duration = args['--pause']
try:
connection = connect()
if machine_type and not pause_duration:
# watch_tube needs to be run before we inspect individual jobs;
# it is not needed for pausing tubes
watch_tube(connection, machine_type)
if status:
print(stats_tube(connection, machine_type))
elif pause_duration:
pause_tube(connection, machine_type, pause_duration)
elif delete:
walk_jobs(connection, machine_type,
JobDeleter(delete))
elif runs:
walk_jobs(connection, machine_type,
RunPrinter())
else:
walk_jobs(connection, machine_type,
JobPrinter(show_desc=show_desc, full=full))
except KeyboardInterrupt:
log.info("Interrupted.")
finally:
connection.close()
1 change: 1 addition & 0 deletions teuthology/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ class TeuthologyConfig(YamlConfig):
'archive_upload_key': None,
'archive_upload_url': None,
'automated_scheduling': False,
'backend': 'paddles',
'reserve_machines': 5,
'ceph_git_base_url': 'https://github.com/ceph/',
'ceph_git_url': None,
Expand Down
18 changes: 16 additions & 2 deletions teuthology/dispatcher/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@ def load_config(archive_dir=None):
def clean_config(config):
result = {}
for key in config:
if key == 'status':
continue
if config[key] is not None:
result[key] = config[key]
return result
Expand All @@ -80,6 +82,7 @@ def main(args):
log_dir = args["--log-dir"]
archive_dir = args["--archive-dir"]
exit_on_empty_queue = args["--exit-on-empty-queue"]
backend = args['--queue-backend']

if archive_dir is None:
archive_dir = teuth_config.archive_base
Expand All @@ -106,6 +109,10 @@ def main(args):

load_config(archive_dir=archive_dir)

if backend == 'beanstalk':
connection = beanstalk.connect()
beanstalk.watch_tube(connection, machine_type)

result_proc = None

if teuth_config.teuthology_path is None:
Expand Down Expand Up @@ -200,6 +207,13 @@ def main(args):
status='fail',
failure_reason=error_message))

# This try/except block is to keep the worker from dying when
# beanstalkc throws a SocketError
if backend == 'beanstalk':
try:
job.delete()
except Exception:
log.exception("Saw exception while trying to delete job")

return worst_returncode

Expand Down Expand Up @@ -257,7 +271,7 @@ def create_job_archive(job_name, job_archive_path, archive_dir):


def pause_queue(machine_type, paused, paused_by, pause_duration=None):
if paused == True:
if paused:
report.pause_queue(machine_type, paused, paused_by, pause_duration)
'''
If there is a pause duration specified
Expand All @@ -267,5 +281,5 @@ def pause_queue(machine_type, paused, paused_by, pause_duration=None):
sleep(int(pause_duration))
paused = False
report.pause_queue(machine_type, paused, paused_by)
elif paused == False:
elif not paused:
report.pause_queue(machine_type, paused, paused_by)
6 changes: 6 additions & 0 deletions teuthology/dispatcher/supervisor.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,12 @@ def main(args):
def run_job(job_config, teuth_bin_path, archive_dir, verbose):
safe_archive = safepath.munge(job_config['name'])
if job_config.get('first_in_suite') or job_config.get('last_in_suite'):
if teuth_config.results_server:
try:
report.try_delete_jobs(job_config['name'], job_config['job_id'])
except Exception as e:
log.warning("Unable to delete job %s, exception occurred: %s",
job_config['job_id'], e)
job_archive = os.path.join(archive_dir, safe_archive)
args = [
os.path.join(teuth_bin_path, 'teuthology-results'),
Expand Down
Loading

0 comments on commit 6d7d317

Please sign in to comment.