Skip to content

Commit

Permalink
Finish removing teuthology-worker
Browse files Browse the repository at this point in the history
The dispatcher and supervisor were added in #1546, but code was copied and
pasted into the new modules, leaving the worker untouched. Also untouched were
the unit tests, meaning that the dispatcher and supervisor were never unit
tested. As the copied code changed, the dispatcher and supervisor were not being
tested for regressions, while the worker - which wasn't being anymore - had
passing unit tests, giving some false sense of security.

This commit removes the old worker code, and adapts the old worker tests to
apply to the dispatcher and supervisor. It also splits out teuthology-supervisor
into its own command.

Signed-off-by: Zack Cerza <[email protected]>
  • Loading branch information
zmc committed Jul 25, 2024
1 parent 55b2d2b commit 8d2b939
Show file tree
Hide file tree
Showing 11 changed files with 483 additions and 107 deletions.
71 changes: 43 additions & 28 deletions scripts/dispatcher.py
Original file line number Diff line number Diff line change
@@ -1,35 +1,50 @@
"""
usage: teuthology-dispatcher --help
teuthology-dispatcher --supervisor [-v] --bin-path BIN_PATH --job-config COFNFIG --archive-dir DIR
teuthology-dispatcher [-v] [--archive-dir DIR] [--exit-on-empty-queue] --log-dir LOG_DIR --tube TUBE
import argparse
import sys

Start a dispatcher for the specified tube. Grab jobs from a beanstalk
queue and run the teuthology tests they describe as subprocesses. The
subprocess invoked is a teuthology-dispatcher command run in supervisor
mode.
import teuthology.dispatcher

Supervisor mode: Supervise the job run described by its config. Reimage
target machines and invoke teuthology command. Unlock the target machines
at the end of the run.

standard arguments:
-h, --help show this help message and exit
-v, --verbose be more verbose
-t, --tube TUBE which beanstalk tube to read jobs from
-l, --log-dir LOG_DIR path in which to store logs
-a DIR, --archive-dir DIR path to archive results in
--supervisor run dispatcher in job supervisor mode
--bin-path BIN_PATH teuthology bin path
--job-config CONFIG file descriptor of job's config file
--exit-on-empty-queue if the queue is empty, exit
"""
def parse_args(argv):
parser = argparse.ArgumentParser(
description="Start a dispatcher for the specified tube. Grab jobs from a beanstalk queue and run the teuthology tests they describe as subprocesses. The subprocess invoked is teuthology-supervisor."
)
parser.add_argument(
"-v",
"--verbose",
action="store_true",
help="be more verbose",
)
parser.add_argument(
"-a",
"--archive-dir",
type=str,
help="path to archive results in",
)
parser.add_argument(
"-t",
"--tube",
type=str,
help="which beanstalk tube to read jobs from",
required=True,
)
parser.add_argument(
"-l",
"--log-dir",
type=str,
help="path in which to store the dispatcher log",
required=True,
)
parser.add_argument(
"--exit-on-empty-queue",
action="store_true",
help="if the queue is empty, exit",
)
return parser.parse_args(argv)

import docopt
import sys

import teuthology.dispatcher
def main():
sys.exit(teuthology.dispatcher.main(parse_args(sys.argv[1:])))


def main():
args = docopt.docopt(__doc__)
sys.exit(teuthology.dispatcher.main(args))
if __name__ == "__main__":
main()
44 changes: 44 additions & 0 deletions scripts/supervisor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
import argparse
import sys

import teuthology.dispatcher.supervisor


def parse_args(argv):
parser = argparse.ArgumentParser(
description="Supervise and run a teuthology job; normally only run by the dispatcher",
)
parser.add_argument(
"-v",
"--verbose",
action="store_true",
help="be more verbose",
)
parser.add_argument(
"-a",
"--archive-dir",
type=str,
help="path in which to store the job's logfiles",
required=True,
)
parser.add_argument(
"--bin-path",
type=str,
help="teuthology bin path",
required=True,
)
parser.add_argument(
"--job-config",
type=str,
help="file descriptor of job's config file",
required=True,
)
return parser.parse_args(argv)


def main():
sys.exit(teuthology.dispatcher.supervisor.main(parse_args(sys.argv[1:])))


if __name__ == "__main__":
main()
5 changes: 5 additions & 0 deletions scripts/test/test_dispatcher_.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
from script import Script


class TestDispatcher(Script):
script_name = 'teuthology-dispatcher'
5 changes: 5 additions & 0 deletions scripts/test/test_supervisor_.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
from script import Script


class TestSupervisor(Script):
script_name = 'teuthology-supervisor'
5 changes: 0 additions & 5 deletions scripts/test/test_worker.py

This file was deleted.

37 changes: 0 additions & 37 deletions scripts/worker.py

This file was deleted.

1 change: 1 addition & 0 deletions setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ console_scripts =
teuthology-wait = scripts.wait:main
teuthology-exporter = scripts.exporter:main
teuthology-node-cleanup = scripts.node_cleanup:main
teuthology-supervisor = scripts.supervisor:main

[options.extras_require]
manhole =
Expand Down
107 changes: 85 additions & 22 deletions teuthology/dispatcher/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,10 @@
exporter,
report,
repo_utils,
worker,
)
from teuthology.config import config as teuth_config
from teuthology.dispatcher import supervisor
from teuthology.exceptions import SkipJob
from teuthology.exceptions import BranchNotFoundError, CommitNotFoundError, SkipJob, MaxWhileTries
from teuthology.lock import ops as lock_ops
from teuthology import safepath

Expand Down Expand Up @@ -66,21 +65,10 @@ def load_config(archive_dir=None):


def main(args):
# run dispatcher in job supervisor mode if --supervisor passed
if args["--supervisor"]:
return supervisor.main(args)

verbose = args["--verbose"]
tube = args["--tube"]
log_dir = args["--log-dir"]
archive_dir = args["--archive-dir"]
exit_on_empty_queue = args["--exit-on-empty-queue"]

if archive_dir is None:
archive_dir = teuth_config.archive_base
archive_dir = args.archive_dir or teuth_config.archive_base

# Refuse to start more than one dispatcher per machine type
procs = find_dispatcher_processes().get(tube)
procs = find_dispatcher_processes().get(args.tube)
if procs:
raise RuntimeError(
"There is already a teuthology-dispatcher process running:"
Expand All @@ -89,18 +77,18 @@ def main(args):

# setup logging for disoatcher in {log_dir}
loglevel = logging.INFO
if verbose:
if args.verbose:
loglevel = logging.DEBUG
logging.getLogger().setLevel(loglevel)
log.setLevel(loglevel)
log_file_path = os.path.join(log_dir, f"dispatcher.{tube}.{os.getpid()}")
log_file_path = os.path.join(args.log_dir, f"dispatcher.{args.tube}.{os.getpid()}")
setup_log_file(log_file_path)
install_except_hook()

load_config(archive_dir=archive_dir)

connection = beanstalk.connect()
beanstalk.watch_tube(connection, tube)
beanstalk.watch_tube(connection, args.tube)
result_proc = None

if teuth_config.teuthology_path is None:
Expand Down Expand Up @@ -131,7 +119,7 @@ def main(args):
job_procs.remove(proc)
job = connection.reserve(timeout=60)
if job is None:
if exit_on_empty_queue and not job_procs:
if args.exit_on_empty_queue and not job_procs:
log.info("Queue is empty and no supervisor processes running; exiting!")
break
continue
Expand All @@ -148,7 +136,7 @@ def main(args):
keep_running = False

try:
job_config, teuth_bin_path = worker.prep_job(
job_config, teuth_bin_path = prep_job(
job_config,
log_file_path,
archive_dir,
Expand All @@ -161,8 +149,7 @@ def main(args):
job_config = lock_machines(job_config)

run_args = [
os.path.join(teuth_bin_path, 'teuthology-dispatcher'),
'--supervisor',
os.path.join(teuth_bin_path, 'teuthology-supervisor'),
'-v',
'--bin-path', teuth_bin_path,
'--archive-dir', archive_dir,
Expand Down Expand Up @@ -243,6 +230,82 @@ def match(proc):
return procs


def prep_job(job_config, log_file_path, archive_dir):
job_id = job_config['job_id']
safe_archive = safepath.munge(job_config['name'])
job_config['worker_log'] = log_file_path
archive_path_full = os.path.join(
archive_dir, safe_archive, str(job_id))
job_config['archive_path'] = archive_path_full

# If the teuthology branch was not specified, default to main and
# store that value.
teuthology_branch = job_config.get('teuthology_branch', 'main')
job_config['teuthology_branch'] = teuthology_branch
teuthology_sha1 = job_config.get('teuthology_sha1')
if not teuthology_sha1:
repo_url = repo_utils.build_git_url('teuthology', 'ceph')
try:
teuthology_sha1 = repo_utils.ls_remote(repo_url, teuthology_branch)
except Exception as exc:
log.exception(f"Could not get teuthology sha1 for branch {teuthology_branch}")
report.try_push_job_info(
job_config,
dict(status='dead', failure_reason=str(exc))
)
raise SkipJob()
if not teuthology_sha1:
reason = "Teuthology branch {} not found; marking job as dead".format(teuthology_branch)
log.error(reason)
report.try_push_job_info(
job_config,
dict(status='dead', failure_reason=reason)
)
raise SkipJob()
if teuth_config.teuthology_path is None:
log.info('Using teuthology sha1 %s', teuthology_sha1)

try:
if teuth_config.teuthology_path is not None:
teuth_path = teuth_config.teuthology_path
else:
teuth_path = repo_utils.fetch_teuthology(branch=teuthology_branch,
commit=teuthology_sha1)
# For the teuthology tasks, we look for suite_branch, and if we
# don't get that, we look for branch, and fall back to 'main'.
# last-in-suite jobs don't have suite_branch or branch set.
ceph_branch = job_config.get('branch', 'main')
suite_branch = job_config.get('suite_branch', ceph_branch)
suite_sha1 = job_config.get('suite_sha1')
suite_repo = job_config.get('suite_repo')
if suite_repo:
teuth_config.ceph_qa_suite_git_url = suite_repo
job_config['suite_path'] = os.path.normpath(os.path.join(
repo_utils.fetch_qa_suite(suite_branch, suite_sha1),
job_config.get('suite_relpath', ''),
))
except (BranchNotFoundError, CommitNotFoundError) as exc:
log.exception("Requested version not found; marking job as dead")
report.try_push_job_info(
job_config,
dict(status='dead', failure_reason=str(exc))
)
raise SkipJob()
except MaxWhileTries as exc:
log.exception("Failed to fetch or bootstrap; marking job as dead")
report.try_push_job_info(
job_config,
dict(status='dead', failure_reason=str(exc))
)
raise SkipJob()

teuth_bin_path = os.path.join(teuth_path, 'virtualenv', 'bin')
if not os.path.isdir(teuth_bin_path):
raise RuntimeError("teuthology branch %s at %s not bootstrapped!" %
(teuthology_branch, teuth_bin_path))
return job_config, teuth_bin_path


def lock_machines(job_config):
report.try_push_job_info(job_config, dict(status='running'))
fake_ctx = supervisor.create_fake_context(job_config, block=True)
Expand Down
Loading

0 comments on commit 8d2b939

Please sign in to comment.