Skip to content

Commit

Permalink
Merge pull request ceph#1960 from ceph/worker-supervisor
Browse files Browse the repository at this point in the history
Finish removing teuthology-worker
  • Loading branch information
zmc authored Jul 25, 2024
2 parents 55b2d2b + 0e66771 commit 53ce146
Show file tree
Hide file tree
Showing 11 changed files with 497 additions and 109 deletions.
87 changes: 57 additions & 30 deletions scripts/dispatcher.py
Original file line number Diff line number Diff line change
@@ -1,35 +1,62 @@
"""
usage: teuthology-dispatcher --help
teuthology-dispatcher --supervisor [-v] --bin-path BIN_PATH --job-config CONFIG --archive-dir DIR
teuthology-dispatcher [-v] [--archive-dir DIR] [--exit-on-empty-queue] --log-dir LOG_DIR --tube TUBE
Start a dispatcher for the specified tube. Grab jobs from a beanstalk
queue and run the teuthology tests they describe as subprocesses. The
subprocess invoked is a teuthology-dispatcher command run in supervisor
mode.
Supervisor mode: Supervise the job run described by its config. Reimage
target machines and invoke teuthology command. Unlock the target machines
at the end of the run.
standard arguments:
-h, --help show this help message and exit
-v, --verbose be more verbose
-t, --tube TUBE which beanstalk tube to read jobs from
-l, --log-dir LOG_DIR path in which to store logs
-a DIR, --archive-dir DIR path to archive results in
--supervisor run dispatcher in job supervisor mode
--bin-path BIN_PATH teuthology bin path
--job-config CONFIG file descriptor of job's config file
--exit-on-empty-queue if the queue is empty, exit
"""

import docopt
import argparse
import sys

import teuthology.dispatcher
import teuthology.dispatcher.supervisor

from .supervisor import parse_args as parse_supervisor_args


def parse_args(argv):
    """Parse the command-line arguments of ``teuthology-dispatcher``.

    :param argv: list of argument strings, without the program name
    :returns: the ``argparse.Namespace`` with ``verbose``, ``archive_dir``,
              ``tube``, ``log_dir`` and ``exit_on_empty_queue`` attributes
    """
    # (flags, keyword settings) for every option, in help-output order.
    option_table = (
        (
            ("-v", "--verbose"),
            {"action": "store_true", "help": "be more verbose"},
        ),
        (
            ("-a", "--archive-dir"),
            {"type": str, "help": "path to archive results in"},
        ),
        (
            ("-t", "--tube"),
            {
                "type": str,
                "help": "which beanstalk tube to read jobs from",
                "required": True,
            },
        ),
        (
            ("-l", "--log-dir"),
            {
                "type": str,
                "help": "path in which to store the dispatcher log",
                "required": True,
            },
        ),
        (
            ("--exit-on-empty-queue",),
            {"action": "store_true", "help": "if the queue is empty, exit"},
        ),
    )
    summary = (
        "Start a dispatcher for the specified tube. Grab jobs from a "
        "beanstalk queue and run the teuthology tests they describe as "
        "subprocesses. The subprocess invoked is teuthology-supervisor."
    )
    cli = argparse.ArgumentParser(description=summary)
    for flags, settings in option_table:
        cli.add_argument(*flags, **settings)
    return cli.parse_args(argv)


def main():
    """Console entry point for ``teuthology-dispatcher``.

    Parses ``sys.argv`` with argparse and exits with the status returned by
    the dispatcher. When ``--supervisor`` is present, the invocation is
    redirected to the supervisor instead.

    The previous ``docopt.docopt(__doc__)`` call followed by an unconditional
    ``sys.exit`` has been dropped: it made everything below unreachable and
    passed a docopt dict to a dispatcher that reads attributes.
    """
    if "--supervisor" in sys.argv:
        # This is for transitional compatibility, so the old dispatcher can
        # invoke the new supervisor. Once old dispatchers are phased out,
        # this block can be as well.
        sys.argv.remove("--supervisor")
        sys.argv[0] = "teuthology-supervisor"
        sys.exit(teuthology.dispatcher.supervisor.main(
            parse_supervisor_args(sys.argv[1:])
        ))
    else:
        sys.exit(teuthology.dispatcher.main(parse_args(sys.argv[1:])))


# Run the dispatcher entry point when this module is executed as a script.
if __name__ == "__main__":
    main()
44 changes: 44 additions & 0 deletions scripts/supervisor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
import argparse
import sys

import teuthology.dispatcher.supervisor


def parse_args(argv):
    """Parse the command-line arguments of ``teuthology-supervisor``.

    :param argv: list of argument strings, without the program name
    :returns: the ``argparse.Namespace`` with ``verbose``, ``archive_dir``,
              ``bin_path`` and ``job_config`` attributes
    """
    cli = argparse.ArgumentParser(
        description="Supervise and run a teuthology job; normally only run by the dispatcher",
    )
    cli.add_argument(
        "-v", "--verbose", action="store_true", help="be more verbose",
    )
    # The remaining options are all mandatory string values.
    mandatory_options = [
        (("-a", "--archive-dir"), "path in which to store the job's logfiles"),
        (("--bin-path",), "teuthology bin path"),
        (("--job-config",), "file descriptor of job's config file"),
    ]
    for flags, help_text in mandatory_options:
        cli.add_argument(*flags, type=str, help=help_text, required=True)
    return cli.parse_args(argv)


def main():
    """Console entry point for ``teuthology-supervisor``.

    Parses the arguments after the program name and exits the process with
    whatever the supervisor's ``main`` returns.
    """
    parsed = parse_args(sys.argv[1:])
    exit_status = teuthology.dispatcher.supervisor.main(parsed)
    sys.exit(exit_status)


# Run the supervisor entry point when this module is executed as a script.
if __name__ == "__main__":
    main()
5 changes: 5 additions & 0 deletions scripts/test/test_dispatcher_.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
from script import Script


class TestDispatcher(Script):
    # Console script this test case targets. Presumably consumed by the
    # Script base class (defined outside this view) -- confirm there.
    script_name = 'teuthology-dispatcher'
5 changes: 5 additions & 0 deletions scripts/test/test_supervisor_.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
from script import Script


class TestSupervisor(Script):
    # Console script this test case targets. Presumably consumed by the
    # Script base class (defined outside this view) -- confirm there.
    script_name = 'teuthology-supervisor'
5 changes: 0 additions & 5 deletions scripts/test/test_worker.py

This file was deleted.

37 changes: 0 additions & 37 deletions scripts/worker.py

This file was deleted.

1 change: 1 addition & 0 deletions setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ console_scripts =
teuthology-wait = scripts.wait:main
teuthology-exporter = scripts.exporter:main
teuthology-node-cleanup = scripts.node_cleanup:main
teuthology-supervisor = scripts.supervisor:main

[options.extras_require]
manhole =
Expand Down
107 changes: 85 additions & 22 deletions teuthology/dispatcher/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,10 @@
exporter,
report,
repo_utils,
worker,
)
from teuthology.config import config as teuth_config
from teuthology.dispatcher import supervisor
from teuthology.exceptions import SkipJob
from teuthology.exceptions import BranchNotFoundError, CommitNotFoundError, SkipJob, MaxWhileTries
from teuthology.lock import ops as lock_ops
from teuthology import safepath

Expand Down Expand Up @@ -66,21 +65,10 @@ def load_config(archive_dir=None):


def main(args):
# run dispatcher in job supervisor mode if --supervisor passed
if args["--supervisor"]:
return supervisor.main(args)

verbose = args["--verbose"]
tube = args["--tube"]
log_dir = args["--log-dir"]
archive_dir = args["--archive-dir"]
exit_on_empty_queue = args["--exit-on-empty-queue"]

if archive_dir is None:
archive_dir = teuth_config.archive_base
archive_dir = args.archive_dir or teuth_config.archive_base

# Refuse to start more than one dispatcher per machine type
procs = find_dispatcher_processes().get(tube)
procs = find_dispatcher_processes().get(args.tube)
if procs:
raise RuntimeError(
"There is already a teuthology-dispatcher process running:"
Expand All @@ -89,18 +77,18 @@ def main(args):

# setup logging for dispatcher in {log_dir}
loglevel = logging.INFO
if verbose:
if args.verbose:
loglevel = logging.DEBUG
logging.getLogger().setLevel(loglevel)
log.setLevel(loglevel)
log_file_path = os.path.join(log_dir, f"dispatcher.{tube}.{os.getpid()}")
log_file_path = os.path.join(args.log_dir, f"dispatcher.{args.tube}.{os.getpid()}")
setup_log_file(log_file_path)
install_except_hook()

load_config(archive_dir=archive_dir)

connection = beanstalk.connect()
beanstalk.watch_tube(connection, tube)
beanstalk.watch_tube(connection, args.tube)
result_proc = None

if teuth_config.teuthology_path is None:
Expand Down Expand Up @@ -131,7 +119,7 @@ def main(args):
job_procs.remove(proc)
job = connection.reserve(timeout=60)
if job is None:
if exit_on_empty_queue and not job_procs:
if args.exit_on_empty_queue and not job_procs:
log.info("Queue is empty and no supervisor processes running; exiting!")
break
continue
Expand All @@ -148,7 +136,7 @@ def main(args):
keep_running = False

try:
job_config, teuth_bin_path = worker.prep_job(
job_config, teuth_bin_path = prep_job(
job_config,
log_file_path,
archive_dir,
Expand All @@ -161,8 +149,7 @@ def main(args):
job_config = lock_machines(job_config)

run_args = [
os.path.join(teuth_bin_path, 'teuthology-dispatcher'),
'--supervisor',
os.path.join(teuth_bin_path, 'teuthology-supervisor'),
'-v',
'--bin-path', teuth_bin_path,
'--archive-dir', archive_dir,
Expand Down Expand Up @@ -243,6 +230,82 @@ def match(proc):
return procs


def prep_job(job_config, log_file_path, archive_dir):
    """Complete a job's config and find the teuthology install that runs it.

    Stores the worker log path, archive path and teuthology branch in
    ``job_config``, makes sure a teuthology sha1 is available, fetches the
    QA suite and records its location under ``suite_path``.

    :param job_config: mapping describing the job; must contain ``job_id``
                       and ``name``. It is modified in place.
    :param log_file_path: value stored under ``worker_log``
    :param archive_dir: base directory; the job's archive path becomes
                        ``<archive_dir>/<munged job name>/<job_id>``
    :returns: tuple of ``(job_config, teuth_bin_path)``
    :raises SkipJob: after reporting the job as dead, when the teuthology
                     sha1 cannot be determined or a fetch fails
    :raises RuntimeError: when ``<teuthology path>/virtualenv/bin`` is not
                          a directory
    """
    def mark_dead(reason):
        # Push a 'dead' status with the given failure reason for this job.
        report.try_push_job_info(
            job_config,
            {'status': 'dead', 'failure_reason': reason},
        )

    job_id = job_config['job_id']
    munged_name = safepath.munge(job_config['name'])
    job_config['worker_log'] = log_file_path
    job_config['archive_path'] = os.path.join(
        archive_dir, munged_name, str(job_id))

    # An unspecified teuthology branch defaults to main; either way the
    # value that will be used is written back into the config.
    branch = job_config.get('teuthology_branch', 'main')
    job_config['teuthology_branch'] = branch

    sha1 = job_config.get('teuthology_sha1')
    if not sha1:
        # No sha1 supplied: ask the remote what the branch points at.
        git_url = repo_utils.build_git_url('teuthology', 'ceph')
        try:
            sha1 = repo_utils.ls_remote(git_url, branch)
        except Exception as exc:
            log.exception(f"Could not get teuthology sha1 for branch {branch}")
            mark_dead(str(exc))
            raise SkipJob()
        if not sha1:
            reason = f"Teuthology branch {branch} not found; marking job as dead"
            log.error(reason)
            mark_dead(reason)
            raise SkipJob()
        if teuth_config.teuthology_path is None:
            log.info('Using teuthology sha1 %s', sha1)

    try:
        # A configured teuthology_path wins; otherwise fetch a checkout.
        teuth_path = teuth_config.teuthology_path
        if teuth_path is None:
            teuth_path = repo_utils.fetch_teuthology(
                branch=branch, commit=sha1)
        # For the teuthology tasks, we look for suite_branch, and if we
        # don't get that, we look for branch, and fall back to 'main'.
        # last-in-suite jobs don't have suite_branch or branch set.
        ceph_branch = job_config.get('branch', 'main')
        suite_branch = job_config.get('suite_branch', ceph_branch)
        suite_sha1 = job_config.get('suite_sha1')
        suite_repo = job_config.get('suite_repo')
        if suite_repo:
            teuth_config.ceph_qa_suite_git_url = suite_repo
        suite_checkout = repo_utils.fetch_qa_suite(suite_branch, suite_sha1)
        job_config['suite_path'] = os.path.normpath(
            os.path.join(suite_checkout, job_config.get('suite_relpath', ''))
        )
    except (BranchNotFoundError, CommitNotFoundError, MaxWhileTries) as exc:
        # Same handling for every failure; only the log message differs.
        if isinstance(exc, (BranchNotFoundError, CommitNotFoundError)):
            log.exception("Requested version not found; marking job as dead")
        else:
            log.exception("Failed to fetch or bootstrap; marking job as dead")
        mark_dead(str(exc))
        raise SkipJob()

    teuth_bin_path = os.path.join(teuth_path, 'virtualenv', 'bin')
    if os.path.isdir(teuth_bin_path):
        return job_config, teuth_bin_path
    raise RuntimeError("teuthology branch %s at %s not bootstrapped!" %
                       (branch, teuth_bin_path))


def lock_machines(job_config):
report.try_push_job_info(job_config, dict(status='running'))
fake_ctx = supervisor.create_fake_context(job_config, block=True)
Expand Down
Loading

0 comments on commit 53ce146

Please sign in to comment.