diff --git a/containers/teuthology-dev/teuthology.sh b/containers/teuthology-dev/teuthology.sh index b7c3922f4..373f6efb8 100755 --- a/containers/teuthology-dev/teuthology.sh +++ b/containers/teuthology-dev/teuthology.sh @@ -28,6 +28,7 @@ if [ "$TEUTHOLOGY_SUITE" != "none" ]; then --filter-out "libcephfs,kclient" \ --force-priority \ --seed 349 \ + ${TEUTHOLOGY_SUITE_EXTRA_ARGS} \ $TEUTHOLOGY_CONF DISPATCHER_EXIT_FLAG='--exit-on-empty-queue' teuthology-queue -m $TEUTHOLOGY_MACHINE_TYPE -s | \ diff --git a/docs/siteconfig.rst b/docs/siteconfig.rst index 96d2658c4..effb8219c 100644 --- a/docs/siteconfig.rst +++ b/docs/siteconfig.rst @@ -81,12 +81,16 @@ Here is a sample configuration with many of the options set and documented:: # itself from git. This is disabled by default. automated_scheduling: false - # How often, in seconds, teuthology-worker should poll its child job + # How often, in seconds, teuthology-supervisor should poll its child job # processes watchdog_interval: 120 + # How old a scheduled job can be, in seconds, before the dispatcher + # considers it 'expired', skipping it. + max_job_age: 1209600 + # How long a scheduled job should be allowed to run, in seconds, before - # it is killed by the worker process. + # it is killed by the supervisor process. max_job_time: 259200 # The template from which the URL of the repository containing packages diff --git a/scripts/suite.py b/scripts/suite.py index 77561b7e0..c98a5cb89 100644 --- a/scripts/suite.py +++ b/scripts/suite.py @@ -112,6 +112,10 @@ When tests finish or time out, send an email here. May also be specified in ~/.teuthology.yaml as 'results_email' + --expire Do not execute jobs in the run if they have not + completed by this time. Valid formats include + ISO 8601, and relative offsets like '90s', '30m', + '1h', '3d', or '1w' --rocketchat Comma separated list of Rocket.Chat channels where to send a message when tests finished or time out. To be used with --sleep-before-teardown option. diff --git a/teuthology/config.py b/teuthology/config.py index 3983d3d0f..30204aa46 100644 --- a/teuthology/config.py +++ b/teuthology/config.py @@ -33,12 +33,13 @@ def __init__(self, yaml_path=None): self._conf = dict() def load(self, conf=None): - if conf: + if conf is not None: if isinstance(conf, dict): self._conf = conf - else: + return + elif conf: self._conf = yaml.safe_load(conf) - return + return if os.path.exists(self.yaml_path): with open(self.yaml_path) as f: self._conf = yaml.safe_load(f) @@ -157,6 +158,7 @@ class TeuthologyConfig(YamlConfig): 'job_threshold': 500, 'lab_domain': 'front.sepia.ceph.com', 'lock_server': 'http://paddles.front.sepia.ceph.com/', + 'max_job_age': 1209600, # 2 weeks 'max_job_time': 259200, # 3 days 'nsupdate_url': 'http://nsupdate.front.sepia.ceph.com/update', 'results_server': 'http://paddles.front.sepia.ceph.com/', diff --git a/teuthology/dispatcher/__init__.py b/teuthology/dispatcher/__init__.py index 15c4003c9..59f8ae327 100644 --- a/teuthology/dispatcher/__init__.py +++ b/teuthology/dispatcher/__init__.py @@ -22,6 +22,7 @@ from teuthology.dispatcher import supervisor from teuthology.exceptions import BranchNotFoundError, CommitNotFoundError, SkipJob, MaxWhileTries from teuthology.lock import ops as lock_ops +from teuthology.util.time import parse_timestamp from teuthology import safepath log = logging.getLogger(__name__) @@ -234,6 +235,8 @@ def match(proc): def prep_job(job_config, log_file_path, archive_dir): job_id = job_config['job_id'] + check_job_expiration(job_config) + safe_archive = safepath.munge(job_config['name']) job_config['worker_log'] = log_file_path archive_path_full = os.path.join( @@ -308,6 +311,31 @@ def prep_job(job_config, log_file_path, archive_dir): return job_config, teuth_bin_path +def check_job_expiration(job_config): + job_id = job_config['job_id'] + expired = False + now = datetime.datetime.now(datetime.timezone.utc) + if expire_str := job_config.get('timestamp'): + expire = parse_timestamp(expire_str) + \ + datetime.timedelta(seconds=teuth_config.max_job_age) + expired = expire < now + if not expired and (expire_str := job_config.get('expire')): + try: + expire = parse_timestamp(expire_str) + expired = expired or expire < now + except ValueError: + log.warning(f"Failed to parse job expiration: {expire_str=}") + pass + if expired: + log.info(f"Skipping job {job_id} because it is expired: {expire_str} is in the past") + report.try_push_job_info( + job_config, + # TODO: Add a 'canceled' status to paddles, and use that. + dict(status='dead'), + ) + raise SkipJob() + + def lock_machines(job_config): report.try_push_job_info(job_config, dict(status='running')) fake_ctx = supervisor.create_fake_context(job_config, block=True) diff --git a/teuthology/dispatcher/supervisor.py b/teuthology/dispatcher/supervisor.py index d2e86de36..83e6d997c 100644 --- a/teuthology/dispatcher/supervisor.py +++ b/teuthology/dispatcher/supervisor.py @@ -8,7 +8,7 @@ from urllib.parse import urljoin -from teuthology import exporter, kill, report, safepath +from teuthology import exporter, dispatcher, kill, report, safepath from teuthology.config import config as teuth_config from teuthology.exceptions import SkipJob, MaxWhileTries from teuthology import setup_log_file, install_except_hook @@ -37,6 +37,10 @@ def main(args): f"supervisor.{job_config['job_id']}.log") setup_log_file(log_file_path) install_except_hook() + try: + dispatcher.check_job_expiration(job_config) + except SkipJob: + return 0 # reimage target machines before running the job if 'targets' in job_config: @@ -54,25 +58,22 @@ def main(args): with open(args.job_config, 'w') as f: yaml.safe_dump(job_config, f, default_flow_style=False) - try: - suite = job_config.get("suite") - if suite: - with exporter.JobTime().time(suite=suite): - return run_job( - job_config, - args.bin_path, - args.archive_dir, - args.verbose - ) - else: + suite = job_config.get("suite") + if suite: + with exporter.JobTime().time(suite=suite): return run_job( job_config, args.bin_path, args.archive_dir, args.verbose ) - except SkipJob: - return 0 + else: + return run_job( + job_config, + args.bin_path, + args.archive_dir, + args.verbose + ) def run_job(job_config, teuth_bin_path, archive_dir, verbose): diff --git a/teuthology/dispatcher/test/test_dispatcher.py b/teuthology/dispatcher/test/test_dispatcher.py index e7c59d8bd..58f58cf9c 100644 --- a/teuthology/dispatcher/test/test_dispatcher.py +++ b/teuthology/dispatcher/test/test_dispatcher.py @@ -7,6 +7,7 @@ from teuthology import dispatcher from teuthology.config import FakeNamespace from teuthology.contextutil import MaxWhileTries +from teuthology.util.time import TIMESTAMP_FMT class TestDispatcher(object): @@ -172,3 +173,31 @@ def test_main_loop_13925( for i in range(len(jobs)): push_call = m_try_push_job_info.call_args_list[i] assert push_call[0][1]['status'] == 'dead' + + @pytest.mark.parametrize( + ["timestamp", "expire", "skip"], + [ + [datetime.timedelta(days=-1), None, False], + [datetime.timedelta(days=-30), None, True], + [None, datetime.timedelta(days=1), False], + [None, datetime.timedelta(days=-1), True], + [datetime.timedelta(days=-1), datetime.timedelta(days=1), False], + [datetime.timedelta(days=1), datetime.timedelta(days=-1), True], + ] + ) + @patch("teuthology.dispatcher.report.try_push_job_info") + def test_check_job_expiration(self, _, timestamp, expire, skip): + now = datetime.datetime.now(datetime.timezone.utc) + job_config = dict( + job_id="1", + name="job_name", + ) + if timestamp: + job_config["timestamp"] = (now + timestamp).strftime(TIMESTAMP_FMT) + if expire: + job_config["expire"] = (now + expire).strftime(TIMESTAMP_FMT) + if skip: + with pytest.raises(dispatcher.SkipJob): + dispatcher.check_job_expiration(job_config) + else: + dispatcher.check_job_expiration(job_config) diff --git a/teuthology/suite/__init__.py b/teuthology/suite/__init__.py index 6fc167fab..8a17cf5f1 100644 --- a/teuthology/suite/__init__.py +++ b/teuthology/suite/__init__.py @@ -63,6 +63,9 @@ def process_args(args): elif key == 'subset' and value is not None: # take input string '2/3' and turn into (2, 3) value = tuple(map(int, value.split('/'))) + elif key == 'expire' and value is None: + # Skip empty 'expire' values + continue elif key in ('filter_all', 'filter_in', 'filter_out', 'rerun_statuses'): if not value: value = [] diff --git a/teuthology/suite/placeholder.py b/teuthology/suite/placeholder.py index 3a1775117..f812fccac 100644 --- a/teuthology/suite/placeholder.py +++ b/teuthology/suite/placeholder.py @@ -45,6 +45,7 @@ def _substitute(input_dict, values_dict): # Template for the config that becomes the base for each generated job config dict_templ = { 'branch': Placeholder('ceph_branch'), + 'expire': Placeholder('expire'), 'sha1': Placeholder('ceph_hash'), 'teuthology_branch': Placeholder('teuthology_branch'), 'teuthology_sha1': Placeholder('teuthology_sha1'), diff --git a/teuthology/suite/run.py b/teuthology/suite/run.py index 8fa45b5b4..4f425cada 100644 --- a/teuthology/suite/run.py +++ b/teuthology/suite/run.py @@ -1,4 +1,5 @@ import copy +import datetime import logging import os import pwd @@ -8,7 +9,6 @@ from humanfriendly import format_timespan -from datetime import datetime from tempfile import NamedTemporaryFile from teuthology import repo_utils @@ -24,6 +24,7 @@ from teuthology.suite.merge import config_merge from teuthology.suite.build_matrix import build_matrix from teuthology.suite.placeholder import substitute_placeholders, dict_templ +from teuthology.util.time import parse_offset, parse_timestamp, TIMESTAMP_FMT log = logging.getLogger(__name__) @@ -43,7 +44,7 @@ def __init__(self, args): self.args = args # We assume timestamp is a datetime.datetime object self.timestamp = self.args.timestamp or \ - datetime.now().strftime('%Y-%m-%d_%H:%M:%S') + datetime.datetime.now().strftime(TIMESTAMP_FMT) self.user = self.args.user or pwd.getpwuid(os.getuid()).pw_name self.name = self.make_run_name() @@ -86,6 +87,15 @@ def create_initial_config(self): :returns: A JobConfig object """ + now = datetime.datetime.now(datetime.timezone.utc) + expires = self.get_expiration() + if expires: + if now > expires: + util.schedule_fail( + f"Refusing to schedule because the expiration date is in the past: {self.args.expire}", + dry_run=self.args.dry_run, + ) + self.os = self.choose_os() self.kernel_dict = self.choose_kernel() ceph_hash = self.choose_ceph_hash() @@ -122,9 +132,29 @@ def create_initial_config(self): suite_repo=config.get_ceph_qa_suite_git_url(), suite_relpath=self.args.suite_relpath, flavor=self.args.flavor, + expire=expires.strftime(TIMESTAMP_FMT) if expires else None, ) return self.build_base_config() + def get_expiration(self, _base_time: datetime.datetime | None = None) -> datetime.datetime | None: + """ + _base_time: For testing, calculate relative offsets from this base time + + :returns: True if the job should run; False if it has expired + """ + log.info(f"Checking for expiration ({self.args.expire})") + expires_str = self.args.expire + if expires_str is None: + return None + now = datetime.datetime.now(datetime.timezone.utc) + if _base_time is None: + _base_time = now + try: + expires = parse_timestamp(expires_str) + except ValueError: + expires = _base_time + parse_offset(expires_str) + return expires + def choose_os(self): os_type = self.args.distro os_version = self.args.distro_version diff --git a/teuthology/suite/test/conftest.py b/teuthology/suite/test/conftest.py new file mode 100644 index 000000000..4285bdcfc --- /dev/null +++ b/teuthology/suite/test/conftest.py @@ -0,0 +1,4 @@ +from teuthology.config import config + +def pytest_runtest_setup(): + config.load({}) diff --git a/teuthology/suite/test/test_placeholder.py b/teuthology/suite/test/test_placeholder.py index acf1b6a44..31b51755d 100644 --- a/teuthology/suite/test/test_placeholder.py +++ b/teuthology/suite/test/test_placeholder.py @@ -22,7 +22,8 @@ def test_substitute_placeholders(self): suite_repo='https://example.com/ceph/suite.git', suite_relpath='', ceph_repo='https://example.com/ceph/ceph.git', - flavor='default' + flavor='default', + expire='expire', ) output_dict = substitute_placeholders(dict_templ, input_dict) assert output_dict['suite'] == 'suite' @@ -50,6 +51,7 @@ def test_null_placeholders_dropped(self): suite_relpath='', ceph_repo='https://example.com/ceph/ceph.git', flavor=None, + expire='expire', ) output_dict = substitute_placeholders(dict_templ, input_dict) assert 'os_type' not in output_dict diff --git a/teuthology/suite/test/test_run_.py b/teuthology/suite/test/test_run_.py index 1dc23e20d..51ce29a85 100644 --- a/teuthology/suite/test/test_run_.py +++ b/teuthology/suite/test/test_run_.py @@ -4,7 +4,7 @@ import contextlib import yaml -from datetime import datetime +from datetime import datetime, timedelta, timezone from mock import patch, call, ANY from io import StringIO from io import BytesIO @@ -12,6 +12,7 @@ from teuthology.config import config, YamlConfig from teuthology.exceptions import ScheduleFailError from teuthology.suite import run +from teuthology.util.time import TIMESTAMP_FMT class TestRun(object): @@ -52,7 +53,7 @@ def test_email_addr(self, m_git_validate_sha1, m_choose_ceph_version, @patch('teuthology.suite.run.util.fetch_repos') def test_name(self, m_fetch_repos): - stamp = datetime.now().strftime('%Y-%m-%d_%H:%M:%S') + stamp = datetime.now().strftime(TIMESTAMP_FMT) with patch.object(run.Run, 'create_initial_config', return_value=run.JobConfig()): name = run.Run(self.args).name @@ -89,6 +90,45 @@ def test_branch_nonexistent( with pytest.raises(ScheduleFailError): self.klass(self.args) + @pytest.mark.parametrize( + ["expire", "delta", "result"], + [ + [None, timedelta(), False], + ["1m", timedelta(), True], + ["1m", timedelta(minutes=-2), False], + ["1m", timedelta(minutes=2), True], + ["7d", timedelta(days=-14), False], + ] + ) + @patch('teuthology.repo_utils.fetch_repo') + @patch('teuthology.suite.run.util.git_branch_exists') + @patch('teuthology.suite.run.util.package_version_for_hash') + @patch('teuthology.suite.run.util.git_ls_remote') + def test_get_expiration( + self, + m_git_ls_remote, + m_package_version_for_hash, + m_git_branch_exists, + m_fetch_repo, + expire, + delta, + result, + ): + m_git_ls_remote.side_effect = 'hash' + m_package_version_for_hash.return_value = 'a_version' + m_git_branch_exists.return_value = True + self.args.expire = expire + obj = self.klass(self.args) + now = datetime.now(timezone.utc) + expires_result = obj.get_expiration(_base_time=now + delta) + if expire is None: + assert expires_result is None + assert obj.base_config['expire'] is None + else: + assert expires_result is not None + assert (now < expires_result) is result + assert obj.base_config['expire'] + @patch('teuthology.suite.run.util.fetch_repos') @patch('requests.head') @patch('teuthology.suite.run.util.git_branch_exists') diff --git a/teuthology/test/test_misc.py b/teuthology/test/test_misc.py index 22341239c..f2a6ebfb8 100644 --- a/teuthology/test/test_misc.py +++ b/teuthology/test/test_misc.py @@ -1,12 +1,13 @@ import argparse +import pytest +import subprocess from unittest.mock import Mock, patch -from teuthology.orchestra import cluster -from teuthology.config import config -from teuthology import misc -import subprocess -import pytest +from teuthology import misc +from teuthology.config import config +from teuthology.orchestra import cluster +from teuthology.orchestra.remote import Remote class FakeRemote(object): @@ -26,7 +27,7 @@ def test_sh_truncate(caplog): def test_sh_fail(caplog): with pytest.raises(subprocess.CalledProcessError) as excinfo: - misc.sh("/bin/echo -n AB ; /bin/echo C ; exit 111", 2) == "ABC\n" + misc.sh("/bin/echo -n AB ; /bin/echo C ; exit 111", 2) assert excinfo.value.returncode == 111 for record in caplog.records: if record.levelname == 'ERROR': @@ -34,29 +35,20 @@ def test_sh_fail(caplog): 'ABC\n' == record.message) def test_sh_progress(caplog): - misc.sh("echo AB ; sleep 5 ; /bin/echo C", 2) == "ABC\n" + assert misc.sh("echo AB ; sleep 0.1 ; /bin/echo C", 2) == "AB\nC\n" records = caplog.records assert ':sh: ' in records[0].message assert 'AB' == records[1].message assert 'C' == records[2].message - # - # With a sleep 5 between the first and the second message, - # there must be at least 2 seconds between the log record - # of the first message and the log record of the second one - # - assert (records[2].created - records[1].created) > 2 + assert records[2].created > records[1].created def test_wait_until_osds_up(): ctx = argparse.Namespace() ctx.daemons = Mock() ctx.daemons.iter_daemons_of_role.return_value = list() - remote = FakeRemote() - - def s(self, **kwargs): - return 'IGNORED\n{"osds":[{"state":["up"]}]}' - - remote.sh = s + remote = Mock(spec=Remote) + remote.sh.return_value = 'IGNORED\n{"osds":[{"state":["up"]}]}' ctx.cluster = cluster.Cluster( remotes=[ (remote, ['osd.0', 'client.1']) @@ -64,7 +56,7 @@ def s(self, **kwargs): ) with patch.multiple( misc, - get_testdir=lambda ctx: "TESTDIR", + get_testdir=lambda _: "TESTDIR", ): misc.wait_until_osds_up(ctx, ctx.cluster, remote) diff --git a/teuthology/util/test/test_time.py b/teuthology/util/test/test_time.py new file mode 100644 index 000000000..c37b5f2f5 --- /dev/null +++ b/teuthology/util/test/test_time.py @@ -0,0 +1,54 @@ +import pytest + +from datetime import datetime, timedelta, timezone +from typing import Type + +from teuthology.util import time + + +@pytest.mark.parametrize( + ["timestamp", "result"], + [ + ["1999-12-31_23:59:59", datetime(1999, 12, 31, 23, 59, 59, tzinfo=timezone.utc)], + ["1999-12-31_23:59", datetime(1999, 12, 31, 23, 59, 0, tzinfo=timezone.utc)], + ["1999-12-31T23:59:59", datetime(1999, 12, 31, 23, 59, 59, tzinfo=timezone.utc)], + ["1999-12-31T23:59:59+00:00", datetime(1999, 12, 31, 23, 59, 59, tzinfo=timezone.utc)], + ["1999-12-31T17:59:59-06:00", datetime(1999, 12, 31, 23, 59, 59, tzinfo=timezone.utc)], + ["2024-01-01", datetime(2024, 1, 1, 0, 0, tzinfo=timezone.utc)], + ["tomorrow", ValueError], + ["1d", ValueError], + ["", ValueError], + ["2024", ValueError], + + ] +) +def test_parse_timestamp(timestamp: str, result: datetime | Type[Exception]): + if isinstance(result, datetime): + assert time.parse_timestamp(timestamp) == result + else: + with pytest.raises(result): + time.parse_timestamp(timestamp) + + +@pytest.mark.parametrize( + ["offset", "result"], + [ + ["1s", timedelta(seconds=1)], + ["1m", timedelta(minutes=1)], + ["1h", timedelta(hours=1)], + ["1d", timedelta(days=1)], + ["1w", timedelta(weeks=1)], + ["365d", timedelta(days=365)], + ["1x", ValueError], + ["-1m", ValueError], + ["0xde", ValueError], + ["frog", ValueError], + ["7dwarfs", ValueError], + ] +) +def test_parse_offset(offset: str, result: timedelta | Type[Exception]): + if isinstance(result, timedelta): + assert time.parse_offset(offset) == result + else: + with pytest.raises(result): + time.parse_offset(offset) diff --git a/teuthology/util/time.py b/teuthology/util/time.py new file mode 100644 index 000000000..8e0525fcc --- /dev/null +++ b/teuthology/util/time.py @@ -0,0 +1,52 @@ +import re + +from datetime import datetime, timedelta, timezone + +# When we're not using ISO format, we're using this +TIMESTAMP_FMT = "%Y-%m-%d_%H:%M:%S" + +def parse_timestamp(timestamp: str) -> datetime: + """ + timestamp: A string either in ISO 8601 format or TIMESTAMP_FMT. + If no timezone is specified, UTC is assumed. + + :returns: a datetime object + """ + try: + dt = datetime.fromisoformat(timestamp) + except ValueError: + dt = datetime.strptime(timestamp, TIMESTAMP_FMT) + if dt.tzinfo is None: + dt = dt.replace(tzinfo=timezone.utc) + return dt + +def parse_offset(offset: str) -> timedelta: + """ + offset: A string consisting of digits followed by one of the following + characters: + s: seconds + m: minutes + h: hours + d: days + w: weeks + """ + err_msg = "Offsets must either be an ISO 8601-formatted timestamp or " \ + f"a relative value like '2w', '1d', '7h', '45m', '90s'. Got: {offset}" + match = re.match(r'(\d+)(s|m|h|d|w)$', offset) + if match is None: + raise ValueError(err_msg) + num = int(match.groups()[0]) + unit = match.groups()[1] + match unit: + case 's': + return timedelta(seconds=num) + case 'm': + return timedelta(minutes=num) + case 'h': + return timedelta(hours=num) + case 'd': + return timedelta(days=num) + case 'w': + return timedelta(weeks=num) + case _: + raise ValueError(err_msg)