From 1080465bb79f2682a282154b7de762e94120510c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Milo=C5=A1=20Prchl=C3=ADk?= Date: Sun, 5 May 2024 23:22:29 +0200 Subject: [PATCH] Split plan to multiple plans by --max tests --- tmt/base.py | 129 +++++++++++++++++++++----- tmt/cli.py | 5 + tmt/plugins/__init__.py | 1 + tmt/plugins/plan_shapers/__init__.py | 73 +++++++++++++++ tmt/plugins/plan_shapers/max_tests.py | 74 +++++++++++++++ tmt/steps/discover/__init__.py | 30 +++--- tmt/steps/execute/__init__.py | 2 +- tmt/steps/execute/upgrade.py | 6 +- tmt/steps/prepare/__init__.py | 2 +- tmt/steps/prepare/distgit.py | 2 +- tmt/steps/report/reportportal.py | 2 +- 11 files changed, 282 insertions(+), 44 deletions(-) create mode 100644 tmt/plugins/plan_shapers/__init__.py create mode 100644 tmt/plugins/plan_shapers/max_tests.py diff --git a/tmt/base.py b/tmt/base.py index 08b1948d1c..49c76d8adf 100644 --- a/tmt/base.py +++ b/tmt/base.py @@ -43,6 +43,7 @@ import tmt.lint import tmt.log import tmt.plugins +import tmt.plugins.plan_shapers import tmt.result import tmt.steps import tmt.steps.discover @@ -712,9 +713,10 @@ def __init__(self, tree: Optional['Tree'] = None, parent: Optional[tmt.utils.Common] = None, logger: tmt.log.Logger, + name: Optional[str] = None, **kwargs: Any) -> None: """ Initialize the node """ - super().__init__(node=node, logger=logger, parent=parent, name=node.name, **kwargs) + super().__init__(node=node, logger=logger, parent=parent, name=name or node.name, **kwargs) self.node = node self.tree = tree @@ -1650,11 +1652,17 @@ class Plan( # Optional Login instance attached to the plan for easy login in tmt try login: Optional[tmt.steps.Login] = None - # When fetching remote plans we store links between the original - # plan with the fmf id and the imported plan with the content. - _imported_plan: Optional['Plan'] = field(default=None, internal=True) + # When fetching remote plans or splitting plans, we store links + # between the original plan with the fmf id and the imported or + # derived plans with the content. _original_plan: Optional['Plan'] = field(default=None, internal=True) - _remote_plan_fmf_id: Optional[FmfId] = field(default=None, internal=True) + _original_plan_fmf_id: Optional[FmfId] = field(default=None, internal=True) + + _imported_plan_fmf_id: Optional[FmfId] = field(default=None, internal=True) + _imported_plan: Optional['Plan'] = field(default=None, internal=True) + + _derived_plans: list['Plan'] = field(default_factory=list, internal=True) + derived_id: Optional[int] = field(default=None, internal=True) #: Used by steps to mark invocations that have been already applied to #: this plan's phases. Needed to avoid the second evaluation in @@ -1696,11 +1704,12 @@ def __init__( # set, incorrect default value is generated, and the field ends up being # set to `None`. See https://github.com/teemtee/tmt/issues/2630. self._applied_cli_invocations = [] + self._derived_plans = [] # Check for possible remote plan reference first reference = self.node.get(['plan', 'import']) if reference is not None: - self._remote_plan_fmf_id = FmfId.from_spec(reference) + self._imported_plan_fmf_id = FmfId.from_spec(reference) # Save the run, prepare worktree and plan data directory self.my_run = run @@ -2102,13 +2111,13 @@ def show(self) -> None: self._show_additional_keys() # Show fmf id of the remote plan in verbose mode - if (self._original_plan or self._remote_plan_fmf_id) and self.verbosity_level: + if (self._original_plan or self._original_plan_fmf_id) and self.verbosity_level: # Pick fmf id from the original plan by default, use the # current plan in shallow mode when no plans are fetched. if self._original_plan is not None: - fmf_id = self._original_plan._remote_plan_fmf_id + fmf_id = self._original_plan._original_plan_fmf_id else: - fmf_id = self._remote_plan_fmf_id + fmf_id = self._original_plan_fmf_id echo(tmt.utils.format('import', '', key_color='blue')) assert fmf_id is not None # narrow type @@ -2378,14 +2387,21 @@ def go(self) -> None: try: for step in self.steps(skip=['finish']): step.go() - # Finish plan if no tests found (except dry mode) - if (isinstance(step, tmt.steps.discover.Discover) and not step.tests() - and not self.is_dry_run and not step.extract_tests_later): - step.info( - 'warning', 'No tests found, finishing plan.', - color='yellow', shift=1) - abort = True - return + + if isinstance(step, tmt.steps.discover.Discover): + tests = step.tests() + + # Finish plan if no tests found (except dry mode) + if not tests and not self.is_dry_run and not step.extract_tests_later: + step.info( + 'warning', 'No tests found, finishing plan.', + color='yellow', shift=1) + abort = True + return + + if self.my_run and self.reshape(tests): + return + # Source the plan environment file after prepare and execute step if isinstance(step, (tmt.steps.prepare.Prepare, tmt.steps.execute.Execute)): self._source_plan_environment_file() @@ -2421,7 +2437,7 @@ def _export( @property def is_remote_plan_reference(self) -> bool: """ Check whether the plan is a remote plan reference """ - return self._remote_plan_fmf_id is not None + return self._imported_plan_fmf_id is not None def import_plan(self) -> Optional['Plan']: """ Import plan from a remote repository, return a Plan instance """ @@ -2431,8 +2447,8 @@ def import_plan(self) -> Optional['Plan']: if self._imported_plan: return self._imported_plan - assert self._remote_plan_fmf_id is not None # narrow type - plan_id = self._remote_plan_fmf_id + assert self._imported_plan_fmf_id is not None # narrow type + plan_id = self._imported_plan_fmf_id self.debug(f"Import remote plan '{plan_id.name}' from '{plan_id.url}'.", level=3) # Clone the whole git repository if executing tests (run is attached) @@ -2530,6 +2546,30 @@ def import_plan(self) -> Optional['Plan']: return self._imported_plan + def derive_plan(self, derived_id: int, tests: dict[str, list[Test]]) -> 'Plan': + derived_plan = Plan( + node=self.node, + run=self.my_run, + logger=self._logger, + name=f'{self.name}.{derived_id}') + + derived_plan._original_plan = self + derived_plan._original_plan_fmf_id = self.fmf_id + self._derived_plans.append(derived_plan) + + derived_plan.discover._tests = tests + derived_plan.discover.status('done') + + assert self.discover.workdir is not None + assert derived_plan.discover.workdir is not None + + shutil.copytree(self.discover.workdir, derived_plan.discover.workdir, dirs_exist_ok=True) + + for step_name in tmt.steps.STEPS: + getattr(derived_plan, step_name).save() + + return derived_plan + def prune(self) -> None: """ Remove all uninteresting files from the plan workdir """ @@ -2547,6 +2587,23 @@ def prune(self) -> None: for step in self.steps(enabled_only=False): step.prune(logger=step._logger) + def reshape(self, tests: list[tuple[str, 'tmt.Test']]) -> bool: + for shaper_id in tmt.plugins.plan_shapers._PLAN_SHAPER_PLUGIN_REGISTRY.iter_plugin_ids(): + shaper = tmt.plugins.plan_shapers._PLAN_SHAPER_PLUGIN_REGISTRY.get_plugin(shaper_id) + + assert shaper is not None # narrow type + + if not shaper.check(self, tests): + self.debug(f"Plan shaper '{shaper_id}' not applicable.") + continue + + if self.my_run: + self.my_run.swap_plans(self, *shaper.apply(self, tests)) + + return True + + return False + class StoryPriority(enum.Enum): MUST_HAVE = 'must have' @@ -3476,14 +3533,38 @@ def load(self) -> None: self.remove = self.remove or data.remove self.debug(f"Remove workdir when finished: {self.remove}", level=3) - @property - def plans(self) -> list[Plan]: + @functools.cached_property + def plans(self) -> Sequence[Plan]: """ Test plans for execution """ if self._plans is None: assert self.tree is not None # narrow type self._plans = self.tree.plans(run=self, filters=['enabled:true']) return self._plans + @functools.cached_property + def plan_queue(self) -> Sequence[Plan]: + """ + A list of plans remaining to be executed. + + It is being populated via :py:attr:`plans`, but eventually, + :py:meth:`go` will remove plans from it as they get processed. + :py:attr:`plans` will remain untouched and will represent all + plans collected. + """ + + return self.plans[:] + + def swap_plans(self, plan: Plan, *others: Plan) -> None: + plans = cast(list[Plan], self.plans) + plan_queue = cast(list[Plan], self.plan_queue) + + if plan in plan_queue: + plan_queue.remove(plan) + plans.remove(plan) + + plan_queue.extend(others) + plans.extend(others) + def finish(self) -> None: """ Check overall results, return appropriate exit code """ # We get interesting results only if execute or prepare step is enabled @@ -3647,7 +3728,9 @@ def go(self) -> None: # Iterate over plans crashed_plans: list[tuple[Plan, Exception]] = [] - for plan in self.plans: + while self.plan_queue: + plan = cast(list[Plan], self.plan_queue).pop(0) + try: plan.go() diff --git a/tmt/cli.py b/tmt/cli.py index 6dcee5d5b8..5dcb2aab62 100644 --- a/tmt/cli.py +++ b/tmt/cli.py @@ -23,6 +23,7 @@ import tmt.log import tmt.options import tmt.plugins +import tmt.plugins.plan_shapers import tmt.steps import tmt.templates import tmt.trying @@ -459,6 +460,10 @@ def run(context: Context, id_: Optional[str], **kwargs: Any) -> None: context.obj.run = run +for plugin_class in tmt.plugins.plan_shapers._PLAN_SHAPER_PLUGIN_REGISTRY.iter_plugins(): + run = create_options_decorator(plugin_class.run_options())(run) + + # Steps options run.add_command(tmt.steps.discover.DiscoverPlugin.command()) run.add_command(tmt.steps.provision.ProvisionPlugin.command()) diff --git a/tmt/plugins/__init__.py b/tmt/plugins/__init__.py index 7c987e1718..25a5fb322e 100644 --- a/tmt/plugins/__init__.py +++ b/tmt/plugins/__init__.py @@ -73,6 +73,7 @@ def _discover_packages() -> list[tuple[str, Path]]: ('tmt.frameworks', Path('frameworks')), ('tmt.checks', Path('checks')), ('tmt.package_managers', Path('package_managers')), + ('tmt.plugins.plan_shapers', Path('plugins/plan_shapers')), ] diff --git a/tmt/plugins/plan_shapers/__init__.py b/tmt/plugins/plan_shapers/__init__.py new file mode 100644 index 0000000000..9714c65737 --- /dev/null +++ b/tmt/plugins/plan_shapers/__init__.py @@ -0,0 +1,73 @@ +from collections.abc import Iterator +from typing import TYPE_CHECKING, Any, Callable + +import tmt.log +import tmt.utils +from tmt.plugins import PluginRegistry + +if TYPE_CHECKING: + from tmt.base import Plan, Test + from tmt.options import ClickOptionDecoratorType + + +PlanShaperClass = type['PlanShaper'] + + +_PLAN_SHAPER_PLUGIN_REGISTRY: PluginRegistry[PlanShaperClass] = PluginRegistry() + + +def provides_plan_shaper( + package_manager: str) -> Callable[[PlanShaperClass], PlanShaperClass]: + """ + A decorator for registering package managers. + + Decorate a package manager plugin class to register a package manager. + """ + + def _provides_plan_shaper(package_manager_cls: PlanShaperClass) -> PlanShaperClass: + _PLAN_SHAPER_PLUGIN_REGISTRY.register_plugin( + plugin_id=package_manager, + plugin=package_manager_cls, + logger=tmt.log.Logger.get_bootstrap_logger()) + + return package_manager_cls + + return _provides_plan_shaper + + +def find_package_manager(name: str) -> 'PlanShaperClass': + """ + Find a plan shaper by its name. + + :raises GeneralError: when the plugin does not exist. + """ + + plugin = _PLAN_SHAPER_PLUGIN_REGISTRY.get_plugin(name) + + if plugin is None: + raise tmt.utils.GeneralError( + f"Package manager '{name}' was not found in package manager registry.") + + return plugin + + +class PlanShaper(tmt.utils.Common): + """ A base class for package manager plugins """ + + def __init__(self, *args: Any, **kwargs: Any) -> None: + super().__init__(*args, **kwargs) + + @classmethod + def run_options(cls) -> list['ClickOptionDecoratorType']: + raise NotImplementedError + + @classmethod + def check(cls, plan: 'Plan', tests: list[tuple[str, 'Test']]) -> bool: + raise NotImplementedError + + @classmethod + def apply( + cls, + plan: 'Plan', + tests: list[tuple[str, 'Test']]) -> Iterator['Plan']: + raise NotImplementedError diff --git a/tmt/plugins/plan_shapers/max_tests.py b/tmt/plugins/plan_shapers/max_tests.py new file mode 100644 index 0000000000..6e52b875ad --- /dev/null +++ b/tmt/plugins/plan_shapers/max_tests.py @@ -0,0 +1,74 @@ +import itertools +from collections.abc import Iterator +from typing import TYPE_CHECKING, Any + +if TYPE_CHECKING: + from tmt.base import Plan, Test + from tmt.options import ClickOptionDecoratorType + +from tmt.plugins.plan_shapers import PlanShaper, provides_plan_shaper + + +@provides_plan_shaper('max-tests') +class MaxTestsPlanShaper(PlanShaper): + def __init__(self, *args: Any, **kwargs: Any) -> None: + super().__init__(*args, **kwargs) + + @classmethod + def run_options(cls) -> list['ClickOptionDecoratorType']: + from tmt.cli import option + + return [ + option( + '--max', + metavar='N', + help='Split plans to include N tests at max.', + type=int, + default=-1) + ] + + @classmethod + def check(cls, plan: 'Plan', tests: list[tuple[str, 'Test']]) -> bool: + if not plan.my_run: + return False + + max_test_count = plan.my_run.opt('max') + + if max_test_count <= 0: + return False + + if len(tests) <= max_test_count: + return False + + return True + + @classmethod + def apply( + cls, + plan: 'Plan', + tests: list[tuple[str, 'Test']]) -> Iterator['Plan']: + assert plan.my_run is not None + + max_test_per_batch = plan.my_run.opt('max') + + plan.info(f'Splitting plan to batches of {max_test_per_batch} tests.') + + for batch_id in itertools.count(1): + if not tests: + break + + batch: dict[str, list[Test]] = {} + + for _ in range(max_test_per_batch): + if not tests: + break + + phase_name, test = tests.pop(0) + + if phase_name not in batch: + batch[phase_name] = [test] + + else: + batch[phase_name].append(test) + + yield plan.derive_plan(batch_id, batch) diff --git a/tmt/steps/discover/__init__.py b/tmt/steps/discover/__init__.py index ccc416fbc6..31821a44b8 100644 --- a/tmt/steps/discover/__init__.py +++ b/tmt/steps/discover/__init__.py @@ -149,8 +149,8 @@ def log_import_plan_details(self) -> None: """ Log details about the imported plan """ parent = cast(Optional[tmt.steps.discover.Discover], self.parent) if parent and parent.plan._original_plan and \ - parent.plan._original_plan._remote_plan_fmf_id: - remote_plan_id = parent.plan._original_plan._remote_plan_fmf_id + parent.plan._original_plan._imported_plan_fmf_id: + remote_plan_id = parent.plan._original_plan._imported_plan_fmf_id # FIXME: cast() - https://github.com/python/mypy/issues/7981 # Note the missing Optional for values - to_minimal_dict() would # not include unset keys, therefore all values should be valid. @@ -326,7 +326,7 @@ def summary(self) -> None: text = listed(len(self.tests(enabled=True)), 'test') + ' selected' self.info('summary', text, 'green', shift=1) # Test list in verbose mode - for test in self.tests(enabled=True): + for _, test in self.tests(enabled=True): self.verbose(test.name, color='red', shift=2) def go(self, force: bool = False) -> None: @@ -364,7 +364,7 @@ def go(self, force: bool = False) -> None: else: raise GeneralError(f'Unexpected phase in discover step: {phase}') - for test in self.tests(): + for _, test in self.tests(): test.serial_number = self.plan.draw_test_serial_number(test) # Show fmf identifiers for tests discovered in plan @@ -373,7 +373,7 @@ def go(self, force: bool = False) -> None: if self.tests(enabled=True): export_fmf_ids: list[str] = [] - for test in self.tests(enabled=True): + for _, test in self.tests(enabled=True): fmf_id = test.fmf_id if not fmf_id.url: @@ -420,21 +420,23 @@ def tests( self, *, phase_name: Optional[str] = None, - enabled: Optional[bool] = None) -> list['tmt.Test']: - def _iter_all_tests() -> Iterator['tmt.Test']: - tests = self._failed_tests if self._failed_tests else self._tests - for phase_tests in tests.values(): - yield from phase_tests + enabled: Optional[bool] = None) -> list[tuple[str, 'tmt.Test']]: + tests = self._failed_tests if self._failed_tests else self._tests + + def _iter_all_tests() -> Iterator[tuple[str, 'tmt.Test']]: + for phase_name, phase_tests in tests.items(): + for test in phase_tests: + yield phase_name, test - def _iter_phase_tests() -> Iterator['tmt.Test']: + def _iter_phase_tests() -> Iterator[tuple[str, 'tmt.Test']]: assert phase_name is not None - tests = self._failed_tests if self._failed_tests else self._tests - yield from tests[phase_name] + for test in self._tests[phase_name]: + yield phase_name, test iterator = _iter_all_tests if phase_name is None else _iter_phase_tests if enabled is None: return list(iterator()) - return [test for test in iterator() if test.enabled is enabled] + return [(phase_name, test) for phase_name, test in iterator() if test.enabled is enabled] diff --git a/tmt/steps/execute/__init__.py b/tmt/steps/execute/__init__.py index 1fd9468cea..da43cf5229 100644 --- a/tmt/steps/execute/__init__.py +++ b/tmt/steps/execute/__init__.py @@ -536,7 +536,7 @@ def prepare_tests(self, guest: Guest, logger: tmt.log.Logger) -> list[TestInvoca """ invocations: list[TestInvocation] = [] - for test in self.discover.tests(phase_name=self.discover_phase, enabled=True): + for _, test in self.discover.tests(phase_name=self.discover_phase, enabled=True): invocation = TestInvocation(phase=self, test=test, guest=guest, logger=logger) invocations.append(invocation) diff --git a/tmt/steps/execute/upgrade.py b/tmt/steps/execute/upgrade.py index 801ad11957..6da19d6b5a 100644 --- a/tmt/steps/execute/upgrade.py +++ b/tmt/steps/execute/upgrade.py @@ -386,7 +386,7 @@ def _run_test_phase( The prefix is also set as IN_PLACE_UPGRADE environment variable. """ names_backup = [] - for test in self.discover.tests(enabled=True): + for _, test in cast(list[tuple[str, tmt.base.Test]], self.discover.tests(enabled=True)): names_backup.append(test.name) test.name = f'/{prefix}/{test.name.lstrip("/")}' @@ -395,6 +395,6 @@ def _run_test_phase( extra_environment=Environment({STATUS_VARIABLE: EnvVarValue(prefix)}), logger=logger) - tests = self.discover.tests(enabled=True) - for i, test in enumerate(tests): + tests = cast(list[tuple[str, tmt.base.Test]], self.discover.tests(enabled=True)) + for i, (_, test) in enumerate(tests): test.name = names_backup[i] diff --git a/tmt/steps/prepare/__init__.py b/tmt/steps/prepare/__init__.py index 3ec2080249..bdbb9b9cb3 100644 --- a/tmt/steps/prepare/__init__.py +++ b/tmt/steps/prepare/__init__.py @@ -233,7 +233,7 @@ def as_key(self) -> frozenset['tmt.base.DependencySimple']: # use what the step loads from its storage, `tests.yaml`. Which # means, there probably would be no phases to inspect from time to # time, therefore going after the step itself. - for test in self.plan.discover.tests(enabled=True): + for _, test in self.plan.discover.tests(enabled=True): if not test.enabled_on_guest(guest): continue diff --git a/tmt/steps/prepare/distgit.py b/tmt/steps/prepare/distgit.py index b0d471aeab..1ed00943f6 100644 --- a/tmt/steps/prepare/distgit.py +++ b/tmt/steps/prepare/distgit.py @@ -290,7 +290,7 @@ def go( for guest in self.step.plan.provision.guests(): collected_requires: list[tmt.base.DependencySimple] = [] collected_recommends: list[tmt.base.DependencySimple] = [] - for test in self.step.plan.discover.tests(enabled=True): + for _, test in self.step.plan.discover.tests(enabled=True): if not test.enabled_on_guest(guest): continue diff --git a/tmt/steps/report/reportportal.py b/tmt/steps/report/reportportal.py index bdd3f0d790..0b8e5d1c33 100644 --- a/tmt/steps/report/reportportal.py +++ b/tmt/steps/report/reportportal.py @@ -492,7 +492,7 @@ def go(self, *, logger: Optional[tmt.log.Logger] = None) -> None: self.data.suite_uuid = suite_uuid # For each test - for test in self.step.plan.discover.tests(): + for _, test in self.step.plan.discover.tests(): test_time = self.time() if executed: result = next((result for result in self.step.plan.execute.results()