diff --git a/perun/cli_groups/import_cli.py b/perun/cli_groups/import_cli.py
index de282661..eb54f35d 100755
--- a/perun/cli_groups/import_cli.py
+++ b/perun/cli_groups/import_cli.py
@@ -9,15 +9,17 @@
 import click

 # Perun Imports
-from perun.logic import commands
+from perun.logic import commands, config
 from perun.profile import imports
+from perun.utils.common import cli_kit


 @click.group("import")
 @click.option(
     "--machine-info",
     "-i",
-    type=click.Path(resolve_path=True, readable=True),
+    type=click.Path(),
+    default="",
     help="Imports machine info from file in JSON format (by default, machine info is loaded from "
     "the current host). You can use `utils/generate_machine_info.sh` script to generate the "
     "machine info file.",
@@ -26,7 +28,9 @@
     "--import-dir",
     "-d",
     type=click.Path(resolve_path=True, readable=True),
-    help="Specifies the directory to import profiles from.",
+    callback=cli_kit.set_config_option_from_flag(config.runtime, "import.dir"),
+    help="Specifies the directory from which to import profiles and other files (e.g., stats, "
+    "machine info, ...) that are provided as relative paths (default = ./).",
 )
 @click.option(
     "--minor-version",
@@ -37,23 +41,30 @@
     help="Specifies the head minor version, for which the profiles will be imported.",
 )
 @click.option(
-    "--stats-info",
+    "--stats-headers",
     "-t",
     nargs=1,
-    default=None,
-    metavar="",
-    help="Describes the stats associated with the imported profiles. Please see the import "
-    "documentation for details regarding the stat description format.",
+    default="",
+    metavar="[STAT_HEADER+]",
+    help="Describes the stats headers associated with imported profiles specified directly in CLI. "
+    "A stats header has the form 'NAME[|COMPARISON_TYPE[|UNIT[|AGGREGATE_BY[|DESCRIPTION]]]]'.",
+)
+@click.option(
+    "--metadata",
+    "-md",
+    multiple=True,
+    metavar="['KEY|VALUE|[DESCRIPTION]'] or [FILE.json]",
+    help="Describes a single metadata entry associated with the imported profiles as a "
+    "'key|value[|description]' string, or a JSON file that may contain multiple metadata entries "
+    "whose keys will be flattened. The --metadata option may be specified multiple times.",
 )
 @click.option(
     "--cmd",
     "-c",
     nargs=1,
     default="",
-    help=(
-        "Command that was being profiled. Either corresponds to some"
-        " script, binary or command, e.g. ``./mybin`` or ``perun``."
-    ),
+    help="Command that was being profiled. Either corresponds to some script, binary or command, "
+    "e.g. ``./mybin`` or ``perun``.",
 )
 @click.option(
     "--workload",
@@ -66,12 +77,17 @@
     "--save-to-index",
     "-s",
     is_flag=True,
-    help="Saves the imported profile to index.",
     default=False,
+    help="Saves the imported profile to index.",
 )
 @click.pass_context
 def import_group(ctx: click.Context, **kwargs: Any) -> None:
-    """Imports Perun profiles from different formats"""
+    """Imports Perun profiles from different formats.
+
+    Relative file paths are resolved against the import directory given by --import-dir,
+    which defaults to the current working directory. Absolute file paths ignore the
+    import directory.
+    """
     commands.try_init()
     ctx.obj = kwargs

@@ -89,15 +105,16 @@ def perf_group(ctx: click.Context, **kwargs: Any) -> None:
     This supports either profiles collected in:

-    1. Binary format: e.g., `collected.data` files, that are results of `perf record`
-    2. Text format: result of `perf script` that parses the binary into user-friendly and
-       parsing-friendly text format
+    1. Binary format: e.g., `collected.data` files, that are results of `perf record`
+
+    2. Text format: result of `perf script` that parses the binary into user-friendly and
+       parsing-friendly text format
     """
     ctx.obj.update(kwargs)


 @perf_group.command("record")
-@click.argument("imported", nargs=-1, required=True)
+@click.argument("import_entries", nargs=-1, required=True)
 @click.pass_context
 @click.option(
     "--with-sudo",
     "-s",
     is_flag=True,
     help="Runs the conversion of the data in sudo mode.",
     default=False,
 )
-def from_binary(ctx: click.Context, imported: list[str], **kwargs: Any) -> None:
-    """Imports Perun profiles from binary generated by `perf record` command"""
+def from_binary(ctx: click.Context, import_entries: list[str], **kwargs: Any) -> None:
+    """Imports Perun profiles from a binary generated by the `perf record` command.
+
+    Multiple import entries may be specified; an import entry is either a profile entry
+
+        'profile_path[,<exit_code>[,<stat_value>]+]'
+
+    where each stat value corresponds to a stats header specified in the --stats-headers option,
+    or a CSV file entry
+
+        'file_path.csv'
+
+    where the CSV file is in the format
+
+        #Profile,Exit_code[,stat-header1]+
+        profile_path[,<exit_code>[,<stat_value>]+]
+        ...
+
+    that combines the --stats-headers option and profile entries.
+    """
     kwargs.update(ctx.obj)
-    imports.import_perf_from_record(imported, **kwargs)
+    imports.import_perf_from_record(import_entries, **kwargs)


 @perf_group.command("script")
-@click.argument("imported", type=str, nargs=-1, required=True)
+@click.argument("import_entries", type=str, nargs=-1, required=True)
 @click.pass_context
-def from_text(ctx: click.Context, imported: list[str], **kwargs: Any) -> None:
-    """Import Perun profiles from output generated by `perf script` command"""
+def from_text(ctx: click.Context, import_entries: list[str], **kwargs: Any) -> None:
+    """Import Perun profiles from output generated by the `perf script` command.
+
+    Multiple import entries may be specified; an import entry is either a profile entry
+
+        'profile_path[,<exit_code>[,<stat_value>]+]'
+
+    where each stat value corresponds to a stats header specified in the --stats-headers option,
+    or a CSV file entry
+
+        'file_path.csv'
+
+    where the CSV file is in the format
+
+        #Profile,Exit_code[,stat-header1]+
+        profile_path[,<exit_code>[,<stat_value>]+]
+        ...
+
+    that combines the --stats-headers option and profile entries.
+    """
     kwargs.update(ctx.obj)
-    imports.import_perf_from_script(imported, **kwargs)
+    imports.import_perf_from_script(import_entries, **kwargs)


 @perf_group.command("stack")
-@click.argument("imported", type=str, nargs=-1, required=True)
+@click.argument("import_entries", type=str, nargs=-1, required=True)
 @click.pass_context
-def from_stacks(ctx: click.Context, imported: list[str], **kwargs: Any) -> None:
+def from_stacks(ctx: click.Context, import_entries: list[str], **kwargs: Any) -> None:
     """Import Perun profiles from output generated by `perf script | stackcollapse-perf.pl`
-    command
+    command.
+
+    Multiple import entries may be specified; an import entry is either a profile entry
+
+        'profile_path[,<exit_code>[,<stat_value>]+]'
+
+    where each stat value corresponds to a stats header specified in the --stats-headers option,
+    or a CSV file entry
+
+        'file_path.csv'
+
+    where the CSV file is in the format
+
+        #Profile,Exit_code[,stat-header1]+
+        profile_path[,<exit_code>[,<stat_value>]+]
+        ...
+
+    that combines the --stats-headers option and profile entries.
""" kwargs.update(ctx.obj) - imports.import_perf_from_stack(imported, **kwargs) + imports.import_perf_from_stack(import_entries, **kwargs) @import_group.group("elk") @@ -145,15 +215,18 @@ def elk_group(ctx: click.Context, **kwargs: Any) -> None: The command supports profiles collected in: - 1. JSON format: files, that are extracted from ELK or are stored using format compatible with ELK. + 1. JSON format: files extracted from ELK or stored using format compatible with ELK. """ ctx.obj.update(kwargs) @elk_group.command("json") -@click.argument("imported", nargs=-1, required=True) +@click.argument("import_entries", nargs=-1, required=True) @click.pass_context -def from_json(ctx: click.Context, imported: list[str], **kwargs: Any) -> None: - """Imports Perun profiles from json compatible with elk infrastructure""" +def from_json(ctx: click.Context, import_entries: list[str], **kwargs: Any) -> None: + """Imports Perun profiles from JSON compatible with elk infrastructure. + + Each import entry may specify a JSON path 'file_path.json'. + """ kwargs.update(ctx.obj) - imports.import_elk_from_json(imported, **kwargs) + imports.import_elk_from_json(import_entries, **kwargs) diff --git a/perun/profile/factory.py b/perun/profile/factory.py index d16f125a..52d2dcee 100644 --- a/perun/profile/factory.py +++ b/perun/profile/factory.py @@ -21,7 +21,7 @@ # Perun Imports from perun.logic import config from perun.postprocess.regression_analysis import regression_models -from perun.profile import convert, query +from perun.profile import convert, query, stats, helpers from perun.utils import log from perun.utils.common import common_kit import perun.check.detection_kit as detection @@ -455,6 +455,22 @@ def all_snapshots(self) -> Iterable[tuple[int, list[dict[str, Any]]]]: for i in range(0, maximal_snapshot + 1): yield i, snapshot_map[i] + def all_stats(self) -> Iterable[stats.ProfileStat]: + """Iterates through all the stats records in the profile. + + :return: iterable of all stats records + """ + for stat in self._storage.get("stats", {}): + yield stats.ProfileStat.from_profile(stat) + + def all_metadata(self) -> Iterable[helpers.ProfileMetadata]: + """Iterates through all the metadata records in the profile. + + :return: iterable of all metadata records + """ + for entry in self._storage.get("metadata", {}): + yield helpers.ProfileMetadata.from_profile(entry) + # TODO: discuss the intent of __len__ and possibly merge? def resources_size(self) -> int: """Returns the number of resources stored in the internal storage. diff --git a/perun/profile/helpers.py b/perun/profile/helpers.py index 062d605c..d2dc67b4 100644 --- a/perun/profile/helpers.py +++ b/perun/profile/helpers.py @@ -17,13 +17,13 @@ from __future__ import annotations # Standard Imports -from typing import Any, TYPE_CHECKING, ClassVar +import dataclasses import json import operator import os import re import time -from dataclasses import dataclass +from typing import Any, TYPE_CHECKING # Third-Party Imports @@ -612,49 +612,42 @@ def is_compatible_with_profile(self, profile: profiles.Profile) -> bool: ] -@dataclass -class ProfileStat: - ALLOWED_ORDERING: ClassVar[dict[str, bool]] = { - "higher_is_better": True, - "lower_is_better": False, - } +@dataclasses.dataclass +class ProfileMetadata: + """A representation of a single profile metadata entry. 
+
+    :ivar name: the name (key) of the metadata entry
+    :ivar value: the value of the metadata entry
+    :ivar description: detailed description of the metadata entry
+    """

     name: str
-    unit: str = "#"
-    ordering: bool = True
-    tooltip: str = ""
-    value: int | float = 0.0
+    value: str | float
+    description: str = ""

     @classmethod
-    def from_string(
-        cls,
-        name: str = "empty",
-        unit: str = "#",
-        ordering: str = "higher_is_better",
-        tooltip: str = "",
-        *_: Any,
-    ) -> ProfileStat:
-        if name == "empty":
-            # Invalid stat specification, warn
-            perun_log.warn("Empty profile stat specification. Creating a dummy 'empty' stat.")
-        if ordering not in cls.ALLOWED_ORDERING:
-            # Invalid stat ordering, warn
-            perun_log.warn(
-                f"Unknown stat ordering: {ordering}. Please choose one of "
-                f"({', '.join(cls.ALLOWED_ORDERING.keys())}). "
-                f"Using the default stat ordering value."
-            )
-            ordering_bool = ProfileStat.ordering
-        else:
-            ordering_bool = cls.ALLOWED_ORDERING[ordering]
-        return cls(name, unit, ordering_bool, tooltip)
-
-    def get_normalized_tooltip(self) -> str:
-        # Find the string representation of the ordering to use in the tooltip
-        ordering: str = ""
-        for str_desc, bool_repr in self.ALLOWED_ORDERING.items():
-            if bool_repr == self.ordering:
-                ordering = str_desc.replace("_", " ")
-        if self.tooltip:
-            return f"{self.tooltip} ({ordering})"
-        return ordering
+    def from_string(cls, metadata: str) -> ProfileMetadata:
+        """Constructs a ProfileMetadata object from a string representation.
+
+        :param metadata: the string representation of a metadata entry
+
+        :return: the constructed ProfileMetadata object
+        """
+        return cls(*metadata.split("|"))
+
+    @classmethod
+    def from_profile(cls, metadata: dict[str, Any]) -> ProfileMetadata:
+        """Constructs a ProfileMetadata object from a dictionary representation used in Profile.
+
+        :param metadata: the dictionary representation of a metadata entry
+
+        :return: the constructed ProfileMetadata object
+        """
+        return cls(**metadata)
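For illustration, a minimal sketch of how a 'key|value[|description]' string from the --metadata option maps onto this dataclass (the entry itself is made up):

    entry = ProfileMetadata.from_string("os|Ubuntu 22.04|Host operating system")
    # ProfileMetadata(name='os', value='Ubuntu 22.04', description='Host operating system')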
+
+    def as_tuple(self) -> tuple[str, str | float, str]:
+        """Converts the metadata object into a tuple.
+
+        :return: the tuple representation of a metadata entry
+        """
+        return self.name, self.value, self.description
diff --git a/perun/profile/imports.py b/perun/profile/imports.py
index 6e07f56e..e7fff622 100755
--- a/perun/profile/imports.py
+++ b/perun/profile/imports.py
@@ -4,271 +4,64 @@

 # Standard Imports
 from collections import defaultdict
-from dataclasses import dataclass, field, asdict
-from pathlib import Path
-from typing import Any, Optional, Iterator, Callable
 import csv
+from dataclasses import asdict, dataclass
+import gzip
 import json
 import os
-import statistics
+from pathlib import Path
 import subprocess
+from typing import Any

 # Third-Party Imports
-import gzip

 # Perun Imports
 from perun.collect.kperf import parser
-from perun.profile import helpers as p_helpers
-from perun.logic import commands, index, pcs
+from perun.logic import commands, config, index, pcs
+from perun.profile import query, helpers as profile_helpers, stats as profile_stats
+from perun.profile.factory import Profile
 from perun.utils import log, streams
 from perun.utils.common import script_kit, common_kit
 from perun.utils.external import commands as external_commands, environment
 from perun.utils.structs import MinorVersion
-from perun.profile.factory import Profile
 from perun.vcs import vcs_kit


-# TODO: add documentation
-# TODO: fix stats in other types of diffviews
-# TODO: refactor the perf import type commands: there is a lot of code duplication
-
-
 @dataclass
-class ImportProfileSpec:
-    path: Path
-    exit_code: int = 0
-    values: list[float] = field(default_factory=list)
-
-
-class ImportedProfiles:
-    """
-    Note: I would reconsider this class or refactor it, removing the logical elements, it obfuscates the logic a little
-    and makes the functions less readable (there are not streams/pipes as is most of the logic/perun); I for one am
-    rather "fan" of generic functions that takes structures and returns structure than classes with methods/logic.
-    TODO: the import-dir could be removed by extracting this functionality to command-line callback and massage
-    the paths during the CLI parsing; hence assuming that the paths are correct when importing. I think the parameter
-    only complicates the code.
-    """
+class _PerfProfileSpec:
+    """A representation of a perf profile record to import.
-    __slots__ = "import_dir", "stats", "profiles"
-
-    def __init__(self, targets: list[str], import_dir: str | None, stats_info: str | None) -> None:
-        self.import_dir: Path = Path(import_dir) if import_dir is not None else Path.cwd()
-        # Parse the CLI stats if available
-        self.stats: list[p_helpers.ProfileStat] = []
-        self.profiles: list[ImportProfileSpec] = []
-
-        if stats_info is not None:
-            self.stats = [
-                p_helpers.ProfileStat.from_string(*stat.split("|"))
-                for stat in stats_info.split(",")
-            ]
-
-        for target in targets:
-            if target.lower().endswith(".csv"):
-                # The input is a csv file
-                self._parse_import_csv(target)
-            else:
-                # The input is a file path
-                self._add_imported_profile(target.split(","))
-
-    def __iter__(self) -> Iterator[ImportProfileSpec]:
-        return iter(self.profiles)
-
-    def __len__(self) -> int:
-        return len(self.profiles)
-
-    def get_exit_codes(self) -> str:
-        return ", ".join(str(p.exit_code) for p in self.profiles)
-
-    def aggregate_stats(
-        self, agg: Callable[[list[float | int]], float]
-    ) -> Iterator[p_helpers.ProfileStat]:
-        stat_value_lists: list[list[float | int]] = [[] for _ in range(len(self.stats))]
-        for profile in self.profiles:
-            value_list: list[float | int]
-            stat_value: float | int
-            for value_list, stat_value in zip(stat_value_lists, profile.values):
-                value_list.append(stat_value)
-        for value_list, stat_obj in zip(stat_value_lists, self.stats):
-            stat_obj.value = agg(value_list)
-            yield stat_obj
-
-    def _parse_import_csv(self, target: str) -> None:
-        with open(self.import_dir / target, "r") as csvfile:
-            csv_reader = csv.reader(csvfile, delimiter=",")
-            header: list[str] = next(csv_reader)
-            stats: list[p_helpers.ProfileStat] = [
-                p_helpers.ProfileStat.from_string(*stat_definition.split("|"))
-                for stat_definition in header[2:]
-            ]
-            # Parse the CSV stat definition and check that they are not in conflict with the CLI
-            # stat definitions, if any
-            for idx, stat in enumerate(stats):
-                if idx >= len(self.stats):
-                    self.stats.append(stat)
-                elif stat != self.stats[idx]:
-                    log.warn(
-                        f"Mismatching profile stat definition from CLI and CSV: "
-                        f"cli.{self.stats[idx].name} != csv.{stat.name}. "
-                        f"Using the CLI stat definition."
-                    )
-            # Parse the remaining rows that should represent profile specifications
-            for row in csv_reader:
-                self._add_imported_profile(row)
-
-    def _add_imported_profile(self, target: list[str]) -> None:
-        if len(target) == 0:
-            # Empty profile specification, warn
-            log.warn("Empty import profile specification. Skipping.")
-        else:
-            self.profiles.append(
-                ImportProfileSpec(
-                    self.import_dir / target[0],
-                    int(target[1]) if len(target) >= 2 else ImportProfileSpec.exit_code,
-                    list(map(float, target[2:])),
-                )
-            )
-
-
-def load_file(filepath: Path) -> str:
-    """Tests if the file is packed by gzip and unpacks it, otherwise reads it as a text file
-
-    :param filepath: path with source file
-    :return: the content of the file
-    """
-    if filepath.suffix.lower() == ".gz":
-        with open(filepath, "rb") as f:
-            header = f.read(2)
-            f.seek(0)
-            assert header == b"\x1f\x8b"
-            with gzip.GzipFile(fileobj=f) as gz:
-                return gz.read().decode("utf-8")
-    with open(filepath, "r", encoding="utf-8") as imported_handle:
-        return imported_handle.read()
-
-
-def get_machine_info(machine_info: Optional[str] = None) -> dict[str, Any]:
-    """Returns machine info either from input file or constructs it from environment
-
-    :param machine_info: file in json format, which contains machine specification
-    :return: parsed dictionary format of machine specification
-    """
-    if machine_info is not None:
-        with open(machine_info, "r") as machine_handle:
-            return json.load(machine_handle)
-    else:
-        return environment.get_machine_specification()
-
-
-def import_perf_profile(
-    profiles: ImportedProfiles,
-    resources: list[dict[str, Any]],
-    minor_version: MinorVersion,
-    machine_info: Optional[str] = None,
-    with_sudo: bool = False,
-    save_to_index: bool = False,
-    **kwargs: Any,
-) -> None:
-    """Constructs the profile for perf-collected data and saves them to jobs or index
-
-    :param profiles: list of to-be-imported profiles
-    :param resources: list of parsed resources
-    :param minor_version: minor version corresponding to the imported profiles
-    :param machine_info: additional dictionary with machine specification
-    :param with_sudo: indication whether the data were collected with sudo
-    :param save_to_index: indication whether we should save the imported profiles to index
-    :param kwargs: rest of the paramters
+    :ivar path: the absolute path to the perf profile.
+    :ivar exit_code: the exit code of the profile collection process.
""" - prof = Profile( - { - "global": { - "time": "???", - "resources": resources, - } - } - ) - prof.update({"origin": minor_version.checksum}) - prof.update({"machine": get_machine_info(machine_info)}) - prof.update({"stats": [asdict(stat) for stat in profiles.aggregate_stats(statistics.median)]}), - prof.update( - { - "header": { - "type": "time", - "cmd": kwargs.get("cmd", ""), - "exitcode": profiles.get_exit_codes(), - "workload": kwargs.get("workload", ""), - "units": {"time": "sample"}, - } - } - ) - prof.update( - { - "collector_info": { - "name": "kperf", - "params": { - "with_sudo": with_sudo, - "warmup": kwargs.get("warmup", 0), - "repeat": len(profiles), - }, - } - } - ) - prof.update({"postprocessors": []}) - save_imported_profile(prof, save_to_index, minor_version) - - -def save_imported_profile(prof: Profile, save_to_index: bool, minor_version: MinorVersion) -> None: - """Saves the imported profile either to index or to pending jobs - - :param prof: imported profile - :param minor_version: minor version corresponding to the imported profiles - :param save_to_index: indication whether we should save the imported profiles to index - """ - full_profile_name = p_helpers.generate_profile_name(prof) - profile_directory = pcs.get_job_directory() - full_profile_path = os.path.join(profile_directory, full_profile_name) - - streams.store_json(prof.serialize(), full_profile_path) - log.minor_status( - "stored generated profile ", - status=f"{log.path_style(os.path.relpath(full_profile_path))}", - ) - if save_to_index: - commands.add([full_profile_path], minor_version.checksum, keep_profile=False) - else: - # Else we register the profile in pending index - index.register_in_pending_index(full_profile_path, prof) + path: Path + exit_code: int = 0 @vcs_kit.lookup_minor_version def import_perf_from_record( - imported: list[str], - import_dir: str | None, - stats_info: str | None, + import_entries: list[str], + stats_headers: str, minor_version: str, with_sudo: bool = False, **kwargs: Any, ) -> None: - """Imports profile collected by `perf record` + """Imports profiles collected by `perf record` command. - It does some black magic in ImportedProfiles probably, then for each filename it runs the - perf script + parser script to generate the profile. + First, the function parses all the perf import entries and stats headers, and then it runs + the perf script + parser script for each entry to generate the profile. - :param imported: list of files with imported data - :param import_dir: different directory for importing the profiles - :param stats_info: additional statistics collected for the profile (i.e. non-resource types) - :param minor_version: minor version corresponding to the imported profiles - :param with_sudo: indication whether the data were collected with sudo - :param kwargs: rest of the paramters + :param import_entries: a collection of import entries (profiles or CSV files). + :param stats_headers: CLI-specified stats headers. + :param minor_version: minor version corresponding to the imported profiles. + :param with_sudo: indication whether the data were collected with sudo. + :param kwargs: rest of the parameters. 
""" parse_script = script_kit.get_script("stackcollapse-perf.pl") - minor_version_info = pcs.vcs().get_minor_version_info(minor_version) - - profiles = ImportedProfiles(imported, import_dir, stats_info) - + profiles, stats = _parse_perf_import_entries(import_entries, stats_headers) resources = [] + for imported_file in profiles: perf_script_command = ( f"{'sudo ' if with_sudo else ''}perf script -i {imported_file.path} | {parse_script}" @@ -285,165 +78,256 @@ def import_perf_from_record( log.error(f"Cannot load data due to: {err}") resources.extend(parser.parse_events(out.decode("utf-8").split("\n"))) log.minor_success(log.path_style(str(imported_file.path)), "imported") - import_perf_profile(profiles, resources, minor_version_info, with_sudo=with_sudo, **kwargs) + minor_version_info = pcs.vcs().get_minor_version_info(minor_version) + import_perf_profile( + profiles, stats, resources, minor_version_info, with_sudo=with_sudo, **kwargs + ) @vcs_kit.lookup_minor_version def import_perf_from_script( - imported: list[str], - import_dir: str | None, - stats_info: str | None, + import_entries: list[str], + stats_headers: str, minor_version: str, **kwargs: Any, ) -> None: - """Imports profile collected by `perf record | perf script` + """Imports profiles collected by `perf record | perf script` command. - It does some black magic in ImportedProfiles probably, then for each filename it runs the - parser script to generate the profile. + First, the function parses all the perf import entries and stats headers, and then it runs + the parser script for each entry to generate the profile. - :param imported: list of files with imported data - :param import_dir: different directory for importing the profiles - :param stats_info: additional statistics collected for the profile (i.e. non-resource types) - :param minor_version: minor version corresponding to the imported profiles - :param kwargs: rest of the paramters + :param import_entries: a collection of import entries (profiles or CSV files). + :param stats_headers: CLI-specified stats headers. + :param minor_version: minor version corresponding to the imported profiles. + :param kwargs: rest of the parameters. """ parse_script = script_kit.get_script("stackcollapse-perf.pl") - minor_version_info = pcs.vcs().get_minor_version_info(minor_version) - - profiles = ImportedProfiles(imported, import_dir, stats_info) - + profiles, stats = _parse_perf_import_entries(import_entries, stats_headers) resources = [] + for imported_file in profiles: perf_script_command = f"cat {imported_file.path} | {parse_script}" out, _ = external_commands.run_safely_external_command(perf_script_command) log.minor_success(f"Raw data from {log.path_style(str(imported_file.path))}", "collected") resources.extend(parser.parse_events(out.decode("utf-8").split("\n"))) log.minor_success(log.path_style(str(imported_file.path)), "imported") - import_perf_profile(profiles, resources, minor_version_info, **kwargs) + minor_version_info = pcs.vcs().get_minor_version_info(minor_version) + import_perf_profile(profiles, stats, resources, minor_version_info, **kwargs) @vcs_kit.lookup_minor_version def import_perf_from_stack( - imported: list[str], - import_dir: str | None, - stats_info: str | None, + import_entries: list[str], + stats_headers: str, minor_version: str, **kwargs: Any, ) -> None: - """Imports profile collected by `perf record | perf script` + """Imports profiles collected by `perf record | perf script | stackcollapse-perf.pl` command. 
-    It does some black magic in ImportedProfiles probably, then for each filename parses the files.
+    First, the function parses all the perf import entries and stats headers, and then it parses
+    each entry to generate the profile.

-    :param imported: list of files with imported data
-    :param import_dir: different directory for importing the profiles
-    :param stats_info: additional statistics collected for the profile (i.e. non-resource types)
-    :param minor_version: minor version corresponding to the imported profiles
-    :param kwargs: rest of the paramters
+    :param import_entries: a collection of import entries (profiles or CSV files).
+    :param stats_headers: CLI-specified stats headers.
+    :param minor_version: minor version corresponding to the imported profiles.
+    :param kwargs: rest of the parameters.
     """
-    minor_version_info = pcs.vcs().get_minor_version_info(minor_version)
-    profiles = ImportedProfiles(imported, import_dir, stats_info)
-
+    profiles, stats = _parse_perf_import_entries(import_entries, stats_headers)
     resources = []
     for imported_profile in profiles:
-        out = load_file(imported_profile.path)
+        out = load_perf_file(imported_profile.path)
         resources.extend(parser.parse_events(out.split("\n")))
         log.minor_success(log.path_style(str(imported_profile.path)), "imported")
-    import_perf_profile(profiles, resources, minor_version_info, **kwargs)
+    minor_version_info = pcs.vcs().get_minor_version_info(minor_version)
+    import_perf_profile(profiles, stats, resources, minor_version_info, **kwargs)


-def extract_machine_info_from_metadata(metadata: dict[str, Any]) -> dict[str, Any]:
-    """Extracts the parts of the profile, that corresponds to machine info
+@vcs_kit.lookup_minor_version
+def import_elk_from_json(
+    import_entries: list[str],
+    metadata: tuple[str, ...],
+    minor_version: str,
+    **kwargs: Any,
+) -> None:
+    """Imports ELK-stored data from JSON files.

-    Note that not many is collected from the ELK formats and it can vary greatly,
-    hence, most of the machine specification and environment should be in metadata instead.
+    The loading expects the JSON files to be in the form of `{'queries': []}`.

-    :param metadata: metadata extracted from the ELK profiles
-    :return: machine info extracted from the profiles
+    :param import_entries: list of filenames with ELK data.
+    :param metadata: CLI-supplied additional metadata. Metadata specified in JSON takes precedence.
+    :param minor_version: minor version corresponding to the imported profiles.
+    :param kwargs: rest of the parameters.
     """
-    machine_info = {
-        "architecture": metadata.get("machine.arch", "?"),
-        "system": metadata.get("machine.os", "?").capitalize(),
-        "release": metadata.get("extra.machine.platform", "?"),
-        "host": metadata.get("machine.hostname", "?"),
-        "cpu": {
-            "physical": "?",
-            "total": metadata.get("machine.cpu-cores", "?"),
-            "frequency": "?",
-        },
-        "memory": {
-            "total_ram": metadata.get("machine.ram", "?"),
-            "swap": "?",
-        },
+    import_dir = Path(config.lookup_key_recursively("import.dir", os.getcwd()))
+    resources: list[dict[str, Any]] = []
+    # Load the CLI-supplied metadata, if any
+    elk_metadata: dict[str, profile_helpers.ProfileMetadata] = {
+        data.name: data for data in _import_metadata(metadata, import_dir)
     }
-    machine_info["boot_info"] = "?"
- machine_info["mem_details"] = {} - machine_info["cpu_details"] = [] - return machine_info + for elk_file in import_entries: + elk_file_path = _massage_import_path(elk_file, import_dir) + with streams.safely_open_and_log(elk_file_path, "r", fatal_fail=True) as elk_handle: + imported_json = json.load(elk_handle) + assert ( + "queries" in imported_json.keys() + ), "expected the JSON to contain list of dictionaries in 'queries' key" + r, m = extract_from_elk(imported_json["queries"]) + resources.extend(r) + # Possibly overwrite CLI-supplied metadata when identical keys are found + elk_metadata.update(m) + log.minor_success(log.path_style(str(elk_file_path)), "imported") + minor_version_info = pcs.vcs().get_minor_version_info(minor_version) + import_elk_profile(resources, elk_metadata, minor_version_info, **kwargs) -def import_elk_profile( +def import_perf_profile( + profiles: list[_PerfProfileSpec], + stats: list[profile_stats.ProfileStat], resources: list[dict[str, Any]], - metadata: dict[str, Any], minor_version: MinorVersion, - save_to_index: bool = False, **kwargs: Any, ) -> None: - """Constructs the profile for elk-stored data and saves them to jobs or index + """Constructs the profile for perf-collected data and saves them to jobs or index. - :param resources: list of parsed resources - :param metadata: parts of the profiles that will be stored as metadata in the profile - :param minor_version: minor version corresponding to the imported profiles - :param save_to_index: indication whether we should save the imported profiles to index - :param kwargs: rest of the paramters + :param profiles: a collection of specifications of the profiles that are being imported. + :param stats: a collection of profiles statistics that should be associated with the profile. + :param resources: a collection of parsed resources. + :param minor_version: minor version corresponding to the imported profiles. + :param kwargs: rest of the parameters. """ + import_dir = Path(config.lookup_key_recursively("import.dir", os.getcwd())) prof = Profile( { "global": { "time": "???", "resources": resources, - } + }, + "origin": minor_version.checksum, + "machine": get_machine_info(kwargs.get("machine_info", ""), import_dir), + "metadata": [ + asdict(data) + for data in _import_metadata(kwargs.get("metadata", tuple()), import_dir) + ], + "stats": [asdict(stat) for stat in stats], + "header": { + "type": "time", + "cmd": kwargs.get("cmd", ""), + "exitcode": [profile.exit_code for profile in profiles], + "workload": kwargs.get("workload", ""), + "units": {"time": "sample"}, + }, + "collector_info": { + "name": "kperf", + "params": { + "with_sudo": kwargs.get("with_sudo", False), + "warmup": kwargs.get("warmup", 0), + "repeat": len(profiles), + }, + }, + "postprocessors": [], } ) - prof.update({"origin": minor_version.checksum}) - prof.update({"metadata": metadata}) - prof.update({"machine": extract_machine_info_from_metadata(metadata)}) - prof.update( + save_imported_profile(prof, kwargs.get("save_to_index", False), minor_version) + + +def import_elk_profile( + resources: list[dict[str, Any]], + metadata: dict[str, profile_helpers.ProfileMetadata], + minor_version: MinorVersion, + save_to_index: bool = False, + **kwargs: Any, +) -> None: + """Constructs the profile for elk-stored data and saves them to jobs or index. + + :param resources: list of parsed resources. + :param metadata: parts of the profiles that will be stored as metadata in the profile. 


-def import_elk_profile(
+def import_perf_profile(
+    profiles: list[_PerfProfileSpec],
+    stats: list[profile_stats.ProfileStat],
     resources: list[dict[str, Any]],
-    metadata: dict[str, Any],
     minor_version: MinorVersion,
-    save_to_index: bool = False,
     **kwargs: Any,
 ) -> None:
-    """Constructs the profile for elk-stored data and saves them to jobs or index
+    """Constructs the profile for perf-collected data and saves them to jobs or index.

-    :param resources: list of parsed resources
-    :param metadata: parts of the profiles that will be stored as metadata in the profile
-    :param minor_version: minor version corresponding to the imported profiles
-    :param save_to_index: indication whether we should save the imported profiles to index
-    :param kwargs: rest of the paramters
+    :param profiles: a collection of specifications of the profiles that are being imported.
+    :param stats: a collection of profile statistics that should be associated with the profiles.
+    :param resources: a collection of parsed resources.
+    :param minor_version: minor version corresponding to the imported profiles.
+    :param kwargs: rest of the parameters.
     """
+    import_dir = Path(config.lookup_key_recursively("import.dir", os.getcwd()))
     prof = Profile(
         {
             "global": {
                 "time": "???",
                 "resources": resources,
-            }
+            },
+            "origin": minor_version.checksum,
+            "machine": get_machine_info(kwargs.get("machine_info", ""), import_dir),
+            "metadata": [
+                asdict(data)
+                for data in _import_metadata(kwargs.get("metadata", tuple()), import_dir)
+            ],
+            "stats": [asdict(stat) for stat in stats],
+            "header": {
+                "type": "time",
+                "cmd": kwargs.get("cmd", ""),
+                "exitcode": [profile.exit_code for profile in profiles],
+                "workload": kwargs.get("workload", ""),
+                "units": {"time": "sample"},
+            },
+            "collector_info": {
+                "name": "kperf",
+                "params": {
+                    "with_sudo": kwargs.get("with_sudo", False),
+                    "warmup": kwargs.get("warmup", 0),
+                    "repeat": len(profiles),
+                },
+            },
+            "postprocessors": [],
         }
     )
-    prof.update({"origin": minor_version.checksum})
-    prof.update({"metadata": metadata})
-    prof.update({"machine": extract_machine_info_from_metadata(metadata)})
-    prof.update(
-        {
-            "header": {
-                "type": "time",
-                "cmd": kwargs.get("cmd", ""),
-                "exitcode": "?",
-                "workload": kwargs.get("workload", ""),
-                "units": {"time": "sample"},
-            }
-        }
-    )
-    prof.update(
-        {
-            "collector_info": {
-                "name": "???",
-                "params": {},
-            }
-        }
-    )
-    prof.update({"postprocessors": []})
+    save_imported_profile(prof, kwargs.get("save_to_index", False), minor_version)
+
+
+def import_elk_profile(
+    resources: list[dict[str, Any]],
+    metadata: dict[str, profile_helpers.ProfileMetadata],
+    minor_version: MinorVersion,
+    save_to_index: bool = False,
+    **kwargs: Any,
+) -> None:
+    """Constructs the profile for elk-stored data and saves them to jobs or index.
+
+    :param resources: list of parsed resources.
+    :param metadata: parts of the profiles that will be stored as metadata in the profile.
+    :param minor_version: minor version corresponding to the imported profiles.
+    :param save_to_index: indication whether we should save the imported profiles to index.
+    :param kwargs: rest of the parameters.
+    """
+    prof = Profile(
+        {
+            "global": {
+                "time": "???",
+                "resources": resources,
+            },
+            "origin": minor_version.checksum,
+            "metadata": [asdict(data) for data in metadata.values()],
+            "machine": extract_machine_info_from_elk_metadata(metadata),
+            "header": {
+                "type": "time",
+                "cmd": kwargs.get("cmd", ""),
+                "exitcode": "?",
+                "workload": kwargs.get("workload", ""),
+                "units": {"time": "sample"},
+            },
+            "collector_info": {
+                "name": "???",
+                "params": {},
+            },
+            "postprocessors": [],
+        }
+    )

     save_imported_profile(prof, save_to_index, minor_version)


+def save_imported_profile(prof: Profile, save_to_index: bool, minor_version: MinorVersion) -> None:
+    """Saves the imported profile either to index or to pending jobs.
+
+    :param prof: imported profile
+    :param minor_version: minor version corresponding to the imported profiles.
+    :param save_to_index: indication whether we should save the imported profiles to index.
+    """
+    full_profile_name = profile_helpers.generate_profile_name(prof)
+    profile_directory = pcs.get_job_directory()
+    full_profile_path = os.path.join(profile_directory, full_profile_name)
+
+    streams.store_json(prof.serialize(), full_profile_path)
+    log.minor_status(
+        "stored generated profile ",
+        status=f"{log.path_style(os.path.relpath(full_profile_path))}",
+    )
+    if save_to_index:
+        commands.add([full_profile_path], minor_version.checksum, keep_profile=False)
+    else:
+        # Else we register the profile in pending index
+        index.register_in_pending_index(full_profile_path, prof)
+
+
+def load_perf_file(filepath: Path) -> str:
+    """Tests if the file is packed by gzip and unpacks it, otherwise reads it as a text file.
+
+    :param filepath: path to the perf file.
+
+    :return: the content of the file.
+    """
+    if filepath.suffix.lower() == ".gz":
+        with streams.safely_open_and_log(filepath, "rb", fatal_fail=True) as gz_handle:
+            header = gz_handle.read(2)
+            gz_handle.seek(0)
+            assert header == b"\x1f\x8b"
+            with gzip.GzipFile(fileobj=gz_handle) as gz:
+                return gz.read().decode("utf-8")
+    with streams.safely_open_and_log(
+        filepath, "r", fatal_fail=True, encoding="utf-8"
+    ) as txt_handle:
+        return txt_handle.read()
+
+
 def extract_from_elk(
     elk_query: list[dict[str, Any]]
-) -> tuple[list[dict[str, Any]], dict[str, Any]]:
+) -> tuple[list[dict[str, Any]], dict[str, profile_helpers.ProfileMetadata]]:
     """For the given elk query, extracts resources and metadata.

-    For metadata we consider any key that has only single value through the profile,
+    For metadata, we consider any key that has only a single value throughout the profile,
     and is not linked to keywords `metric` or `benchmarking`.

-    For resources we consider anything that is not identified as metadata
+    For resources, we consider anything that is not identified as metadata.

-    :param elk_query: query from the elk in form of list of resource
-    :return: list of resources and metadata
+    :param elk_query: query from the ELK in the form of a list of resources.
+
+    :return: list of resources and metadata.
""" res_counter = defaultdict(set) for res in elk_query: @@ -455,7 +339,7 @@ def extract_from_elk( if not k.startswith("metric") and not k.startswith("benchmarking") and len(v) == 1 } - metadata = {k: res_counter[k].pop() for k in metadata_keys} + metadata = {k: profile_helpers.ProfileMetadata(k, res_counter[k].pop()) for k in metadata_keys} resources = [ { k: common_kit.try_convert(v, [int, float, str]) @@ -473,32 +357,292 @@ def extract_from_elk( return resources, metadata -@vcs_kit.lookup_minor_version -def import_elk_from_json( - imported: list[str], - minor_version: str, - **kwargs: Any, +def get_machine_info(machine_info: str, import_dir: Path) -> dict[str, Any]: + """Returns machine info either from an input file or constructs it from the environment. + + :param machine_info: relative or absolute path to machine specification JSON file. In case of + an empty string, the machine info will be constructed from the environment. + :param import_dir: import directory where to look for the machine info file if the provided + path is relative. + + :return: parsed or constructed machine specification. + """ + if machine_info: + # Some machine info path has been provided. + info_path = _massage_import_path(machine_info, import_dir) + with streams.safely_open_and_log( + info_path, "r", fail_msg="not found, generating info from environment instead" + ) as info_handle: + if info_handle is not None: + json_data = json.load(info_handle) + log.minor_success(log.path_style(str(info_path)), "parsed") + return json_data + # No machine info file might have been provided, or an invalid path was specified. + # Construct the machine info from the current machine. + return environment.get_machine_specification() + + +def extract_machine_info_from_elk_metadata( + metadata: dict[str, profile_helpers.ProfileMetadata] +) -> dict[str, Any]: + """Extracts the parts of the profile that correspond to machine info. + + Note that not many is collected from the ELK formats, and it can vary greatly, + hence, most of the machine specification and environment should be in metadata instead. + + :param metadata: metadata extracted from the ELK profiles. + + :return: machine info extracted from the profiles. + """ + machine_info: dict[str, Any] = { + "architecture": metadata.get( + "machine.arch", profile_helpers.ProfileMetadata("", "?") + ).value, + "system": str( + metadata.get("machine.os", profile_helpers.ProfileMetadata("", "?")).value + ).capitalize(), + "release": metadata.get( + "extra.machine.platform", profile_helpers.ProfileMetadata("", "?") + ).value, + "host": metadata.get("machine.hostname", profile_helpers.ProfileMetadata("", "?")).value, + "cpu": { + "physical": "?", + "total": metadata.get( + "machine.cpu-cores", profile_helpers.ProfileMetadata("", "?") + ).value, + "frequency": "?", + }, + "memory": { + "total_ram": metadata.get( + "machine.ram", profile_helpers.ProfileMetadata("", "?") + ).value, + "swap": "?", + }, + "boot_info": "?", + "mem_details": {}, + "cpu_details": [], + } + + return machine_info + + +def _import_metadata( + metadata: tuple[str, ...], import_dir: Path +) -> list[profile_helpers.ProfileMetadata]: + """Parse the metadata entries from CLI and convert them to our internal representation. + + :param import_dir: the import directory to use for relative metadata file paths. + :param metadata: a collection of metadata entries or JSON files. 
+
+    :return: a collection of parsed and converted metadata objects
+    """
+    p_metadata: list[profile_helpers.ProfileMetadata] = []
+    # Normalize the metadata string for parsing and/or opening the file
+    for metadata_str in map(str.strip, metadata):
+        if metadata_str.lower().endswith(".json"):
+            # Update the metadata collection with entries from the json file
+            p_metadata.extend(_parse_metadata_json(_massage_import_path(metadata_str, import_dir)))
+        else:
+            # Add a single metadata entry parsed from its string representation
+            try:
+                p_metadata.append(profile_helpers.ProfileMetadata.from_string(metadata_str))
+            except TypeError:
+                log.warn(f"Ignoring invalid profile metadata string '{metadata_str}'.")
+    return p_metadata
+
+
+def _parse_metadata_json(metadata_path: Path) -> list[profile_helpers.ProfileMetadata]:
+    """Parse a metadata JSON file into the metadata objects.
+
+    If the JSON file contains nested dictionaries, the hierarchical keys will be flattened.
+
+    :param metadata_path: the path to the metadata JSON.
+
+    :return: a collection of parsed metadata objects.
+    """
+    with streams.safely_open_and_log(
+        metadata_path, "r", fail_msg="not found, skipping"
+    ) as metadata_handle:
+        if metadata_handle is None:
+            return []
+        # Make sure we flatten the input
+        metadata_list = [
+            profile_helpers.ProfileMetadata(k, v)
+            for k, v in query.all_items_of(json.load(metadata_handle))
+        ]
+    log.minor_success(log.path_style(str(metadata_path)), "parsed")
+    return metadata_list
+
+
+def _parse_perf_import_entries(
+    import_entries: list[str], cli_stats_headers: str
+) -> tuple[list[_PerfProfileSpec], list[profile_stats.ProfileStat]]:
+    """Parses perf import entries and stats.
+
+    An import entry is either a profile entry
+
+        'profile_path[,<exit_code>[,<stat_value>]+]'
+
+    where each stat value corresponds to a stats header specified in the cli_stats_headers, or
+    a CSV file entry
+
+        'file_path.csv'
+
+    where the CSV file is in the format
+
+        #Profile,Exit_code[,stat-header1]+
+        profile_path[,<exit_code>[,<stat_value>]+]
+        ...
+
+    that combines the --stats-headers option and profile entries. Stats specified in a CSV file
+    apply only to profile entries in that CSV file. Similarly, CLI-specified stats apply only to
+    profile entries specified directly in CLI.
+
+    :param import_entries: the perf import entries to parse.
+    :param cli_stats_headers: the stats headers specified in CLI.
+
+    :return: parsed profiles and stats.
+    """
+    stats = [
+        profile_stats.ProfileStat.from_string(*stat.split("|"))
+        for stat in cli_stats_headers.split(",")
+    ]
+    cli_stats_len = len(stats)
+
+    import_dir = Path(config.lookup_key_recursively("import.dir", os.getcwd()))
+    profiles: list[_PerfProfileSpec] = []
+
+    for record in import_entries:
+        if record.strip().lower().endswith(".csv"):
+            # The input is a csv file
+            _parse_perf_import_csv(record, import_dir, profiles, stats)
+        elif (
+            profile_spec := _parse_perf_entry(record.split(","), import_dir, stats[:cli_stats_len])
+        ) is not None:
+            # The input is a string profile spec
+            profiles.append(profile_spec)
+    return profiles, stats
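As a concrete illustration (the file names and the stat header are made up), a CSV file such as

    #Profile,Exit_code,runtime|lower_is_better|ms|median|Total runtime
    prof1.data,0,153.2
    prof2.data,0,149.8

would yield two _PerfProfileSpec entries and a 'runtime' stat holding the values [153.2, 149.8], which is then merged into the CLI-specified stats.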
+
+
+def _parse_perf_import_csv(
+    csv_file: str,
+    import_dir: Path,
+    profiles: list[_PerfProfileSpec],
+    stats: list[profile_stats.ProfileStat],
 ) -> None:
-    """Imports the ELK stored data from the json data.
+    """Parse stats headers and perf import entries in a CSV file.

-    The loading expects the json files to be in form of `{'queries': []}`.
+    :param csv_file: the CSV file to parse.
+    :param import_dir: the import directory to use for relative profile file paths.
+    :param profiles: profile specifications that will be extended with the parsed profiles.
+    :param stats: profile stats that will be merged with the CSV stats.
+    """
+    csv_path = _massage_import_path(csv_file, import_dir)
+    with streams.safely_open_and_log(csv_path, "r", fatal_fail=True) as csvfile:
+        csv_reader = csv.reader(csvfile, delimiter=",")
+        try:
+            header: list[str] = next(csv_reader)
+        except StopIteration:
+            # Empty CSV file, skip
+            log.warn(f"Empty import file {csv_path}. Skipping.")
+            return
+        # Parse the stats headers
+        csv_stats: list[profile_stats.ProfileStat] = [
+            profile_stats.ProfileStat.from_string(*stat_definition.split("|"))
+            for stat_definition in header[2:]
+        ]
+        # Parse the remaining rows that represent profile specifications and filter invalid ones
+        profiles.extend(
+            record
+            for row in csv_reader
+            if (record := _parse_perf_entry(row, import_dir, csv_stats)) is not None
+        )
+    # Merge CSV stats with the other stats
+    for csv_stat in csv_stats:
+        _merge_stats(csv_stat, stats)
+    log.minor_success(log.path_style(str(csv_path)), "parsed")

-    :param imported: list of filenames with elk data.
-    :param minor_version: minor version corresponding to the imported profiles
-    :param kwargs: rest of the paramters
+
+def _parse_perf_entry(
+    entry: list[str], import_dir: Path, stats: list[profile_stats.ProfileStat]
+) -> _PerfProfileSpec | None:
+    """Parse a single perf profile import entry.
+
+    :param entry: the perf import entry to parse.
+    :param import_dir: the import directory to use for relative profile file paths.
+    :param stats: the profile stats associated with this profile.
+
+    :return: the parsed profile, or None if the import entry is invalid.
     """
-    minor_version_info = pcs.vcs().get_minor_version_info(minor_version)
+    if len(entry) == 0 or not entry[0]:
+        # Empty profile specification, warn
+        log.warn("Empty import profile specification. Skipping.")
+        return None
+    # Parse the profile specification
+    profile_info = _PerfProfileSpec(
+        _massage_import_path(entry[0], import_dir),
+        int(entry[1].strip()) if len(entry) >= 2 else _PerfProfileSpec.exit_code,
+    )
+    # Parse the stat values and add them to respective stats
+    for stat_value, stat_obj in zip(map(_massage_stat_value, entry[2:]), stats):
+        stat_obj.value.append(stat_value)
+    if len(entry[2:]) > len(stats):
+        log.warn(
+            f"Imported profile {profile_info.path} specifies more stat values than stats headers."
+            " Ignoring additional stats."
+        )
+    if profile_info.exit_code != 0:
+        log.warn("Importing a profile with non-zero exit code.")
+    return profile_info

-    resources: list[dict[str, Any]] = []
-    metadata: dict[str, Any] = {}
-    for imported_file in imported:
-        with open(imported_file, "r") as imported_handle:
-            imported_json = json.load(imported_handle)
-            assert (
-                "queries" in imported_json.keys()
-            ), "expected the JSON to contain list of dictionaries in 'queries' key"
-            r, m = extract_from_elk(imported_json["queries"])
-        resources.extend(r)
-        metadata.update(m)
-        log.minor_success(log.path_style(str(imported_file)), "imported")
-    import_elk_profile(resources, metadata, minor_version_info, **kwargs)
+
+def _merge_stats(
+    new_stat: profile_stats.ProfileStat, into_stats: list[profile_stats.ProfileStat]
+) -> None:
+    """Merges the values of a new profile stat into the current profile stats.
+
+    If an existing stat with the same name exists, the values of both stats are merged. If no such
+    stat is found, the new stat is added to the collection of current stats.
+
+    :param new_stat: the new profile stat to merge.
+    :param into_stats: the current collection of profile stats.
+    """
+    for stat in into_stats:
+        if new_stat.name == stat.name:
+            # We found a stat with a matching name, merge
+            stat.merge_with(new_stat)
+            return
+    # There is no stat to merge with, extend the current collection of stats
+    into_stats.append(new_stat)
+
+
+def _massage_stat_value(stat_value: str) -> str | float:
+    """Massages a stat value read from a string, converting it to float if it is numerical.
+
+    :param stat_value: the stat value to massage.
+
+    :return: the massaged stat value, as a float if possible and a stripped string otherwise.
+    """
+    stat_value = stat_value.strip()
+    try:
+        return float(stat_value)
+    except ValueError:
+        return stat_value
+
+
+def _massage_import_path(path_str: str, import_dir: Path) -> Path:
+    """Massages path strings into a unified path format.
+
+    First, the path string is stripped of leading and trailing whitespaces.
+    Next, absolute paths are kept as is, while relative paths are prepended with the
+    provided import directory.
+
+    :param path_str: the path string to massage.
+    :param import_dir: the import directory to use for relative paths.
+
+    :return: the massaged path.
+    """
+    path: Path = Path(path_str.strip())
+    if path.is_absolute():
+        return path
+    return import_dir / path
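A quick sketch of the intended path behavior (the paths are made up):

    from pathlib import Path

    # Relative paths are resolved against the import directory...
    _massage_import_path(" stats.csv ", Path("/bench"))     # -> Path('/bench/stats.csv')
    # ...while absolute paths ignore it.
    _massage_import_path("/tmp/prof.data", Path("/bench"))  # -> Path('/tmp/prof.data')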
+ """ + + AUTO = "auto" + HIGHER = "higher_is_better" + LOWER = "lower_is_better" + EQUALITY = "equality" + + @staticmethod + def supported() -> set[str]: + """Provides the set of supported comparison tupes. + + :return: The set of supported comparison types. + """ + return {comparison.value for comparison in ProfileStatComparison} + + @staticmethod + def default() -> ProfileStatComparison: + """Provides the default comparison type. + + :return: The default comparison type. + """ + return ProfileStatComparison.AUTO + + @classmethod + def str_to_comparison(cls, comparison: str) -> ProfileStatComparison: + """Convert a comparison type string into a ProfileStatComparison enum value. + + If an invalid comparison type is provided, the default type will be used. + + :param comparison: The comparison type as a string. + + :return: The comparison type as an enum value. + """ + if not comparison: + return cls.default() + try: + return cls(comparison.strip()) + except ValueError: + # Invalid stat comparison, warn + perun_log.warn( + f"Unknown stat comparison: {comparison}. Using the default stat comparison value " + f"instead. Please choose one of ({', '.join(cls.supported())})." + ) + return cls.default() + + +class StatComparisonResult(enum.Enum): + """The result of stat representative value comparison. + + Since the comparison is determined by the comparison operator and the type of the representative + key, there is a number of valid comparison results that need to be represented. + """ + + EQUAL = 1 + UNEQUAL = 2 + BASELINE_BETTER = 3 + TARGET_BETTER = 4 + INVALID = 5 + + +@dataclasses.dataclass +class ProfileStat: + """An internal representation of a profile stat. + + :ivar name: The name of the stat. + :ivar cmp: The comparison type of the stat values. + :ivar unit: The unit of the stat value(s). + :ivar aggregate_by: The aggregation (representative value) key. + :ivar description: A detailed description of the stat. + :ivar value: The value(s) of the stat. + """ + + name: str + cmp: ProfileStatComparison = ProfileStatComparison.default() + unit: str = "#" + aggregate_by: str = "" + description: str = "" + value: list[str | float] = dataclasses.field(default_factory=list) + + @classmethod + def from_string( + cls, + name: str = "", + cmp: str = "", + unit: str = "#", + aggregate_by: str = "", + description: str = "", + *_: Any, + ) -> ProfileStat: + """Constructs a ProfileStat object from a string describing a stat header. + + The value of the stat is ignored when parsing from a string, as string representation is + used solely for specifying the stat header. + + :param name: The name of the stat. + :param cmp: The comparison type of the stat values. + :param unit: The unit of the stat value(s). + :param aggregate_by: The aggregation (representative value) key. + :param description: A detailed description of the stat. + + :return: A constructed ProfileStat object. + """ + if not name: + # Invalid stat specification, warn + perun_log.warn("Empty profile stat specification. Creating a dummy '[empty]' stat.") + name = "[empty]" + comparison_enum = ProfileStatComparison.str_to_comparison(cmp) + return cls(name, comparison_enum, unit, aggregate_by, description) + + @classmethod + def from_profile(cls, stat: dict[str, Any]) -> ProfileStat: + """Constructs a ProfileStat object from a Perun profile. + + :param stat: The stat dictionary from a Perun profile. + + :return: A constructed ProfileStat object. 
+ """ + stat["cmp"] = ProfileStatComparison.str_to_comparison(stat.get("cmp", "")) + return cls(**stat) + + def merge_with(self, other: ProfileStat) -> ProfileStat: + """Merges value(s) from another ProfileStat object to this one. + + In case of mismatching headers, this ProfileStat header is used over the other one. + + :param other: The other ProfileStat object to merge with. + + :return: This ProfileStat object with merged values. + """ + if self.get_header() != other.get_header(): + perun_log.warn( + f"Merged ProfileStats '{self.name}' have mismatching headers, using the current " + f"header {self.get_header()}" + ) + self.value += other.value + return self + + def get_header(self) -> tuple[str, str, str, str, str]: + """Obtains the ProfileStat header, i.e., all attributes except the values. + + :return: the ProfileStat header. + """ + return self.name, self.cmp, self.unit, self.aggregate_by, self.description + + +class ProfileStatAggregation(Protocol): + """A protocol for profile stat aggregation objects. + + Since individual aggregation types may differ in a lot of ways (e.g., the supported + representative/aggregation keys, table representation, auto comparison type, ...), we provide + an abstract protocol for all aggregation objects. + """ + + _SUPPORTED_KEYS: ClassVar[set[str]] = set() + _DEFAULT_KEY: ClassVar[str] = "" + + def normalize_aggregate_key(self, key: str = _DEFAULT_KEY) -> str: + """Check and normalize the aggregation/representative key. + + If no key is provided, or the key is invalid or unsupported by the aggregation type, the + default key is used instead. + + :param key: The key to check. + + :return: The checked (and possibly normalized) key. + """ + if key not in self._SUPPORTED_KEYS: + if key: + # A key was provided, but it is an invalid one + perun_log.warn( + f"{self.__class__.__name__}: Invalid aggregate key '{key}'. " + f"Using the default key '{self._DEFAULT_KEY}' instead." + ) + key = self._DEFAULT_KEY + return key + + def get_value(self, key: str = _DEFAULT_KEY) -> Any: + """Obtain a value associated with the key from the aggregation / statistic description. + + If no key is provided, or the key is invalid, the value associated with the default key is + returned. + + :param key: The key of the value to obtain. + + :return: The value associated with the key. + """ + return getattr(self, self.normalize_aggregate_key(key)) + + def infer_auto_comparison(self, comparison: ProfileStatComparison) -> ProfileStatComparison: + """Selects the correct auto comparison type for the aggregation type. + + :param comparison: May be auto or any other valid comparison type. For the auto comparison + type, another non-auto comparison type is returned. For the other comparison types, + the method works as an identity function. + + :return: A non-auto comparison type. + """ + + def as_table( + self, key: str = _DEFAULT_KEY + ) -> tuple[str | float | tuple[str, int], dict[str, Any]]: + """Transforms the aggregation object into the representative value and a table of the + aggregation / statistic description values. + + :param key: The key of the aggregation / statistic description. + + :return: The representative value and a table representation of the aggregation. + """ + + +@dataclasses.dataclass +class SingleValue(ProfileStatAggregation): + """A single value "aggregation". + + Used for single value profile stats that need to adhere to the same interface as the "proper" + aggregations. + + :ivar value: The value of the stat. 
+ """ + + _SUPPORTED_KEYS: ClassVar[set[str]] = {"value"} + _DEFAULT_KEY = "value" + + value: str | float = "[missing]" + + def infer_auto_comparison(self, comparison: ProfileStatComparison) -> ProfileStatComparison: + if comparison != ProfileStatComparison.AUTO: + return comparison + if isinstance(self.value, str): + return ProfileStatComparison.EQUALITY + return ProfileStatComparison.HIGHER + + def as_table(self, _: str = "") -> tuple[str | float, dict[str, str | float]]: + # There are no details of a single value to generate into a table + return self.value, {} + + +@dataclasses.dataclass +class StatisticalSummary(ProfileStatAggregation): + """A statistical description / summary aggregation type. + + Used for collections of floats. + + :ivar min: The minimum value in the collection. + :ivar p10: The first decile value. + :ivar p25: The first quartile value. + :ivar median: The median value of the entire collection. + :ivar p75: The third quartile value. + :ivar p90: The last decile value. + :ivar max: The maximum value in the collection. + :ivar mean: The mean value of the entire collection. + """ + + _SUPPORTED_KEYS: ClassVar[set[str]] = { + "min", + "p10", + "p25", + "median", + "p75", + "p90", + "max", + "mean", + } + _DEFAULT_KEY: ClassVar[str] = "median" + + min: float = 0.0 + p10: float = 0.0 + p25: float = 0.0 + median: float = 0.0 + p75: float = 0.0 + p90: float = 0.0 + max: float = 0.0 + mean: float = 0.0 + + @classmethod + def from_values(cls, values: Iterable[float]) -> StatisticalSummary: + """Constructs a StatisticalSummary object from a collection of values. + + :param values: The collection of values to construct from. + + :return: The constructed StatisticalSummary object. + """ + # We assume there aren't too many values so that multiple passes of the list don't matter + # too much. If this becomes a bottleneck, we can use pandas describe() instead. + values = list(values) + quantiles = statistics.quantiles(values, n=20, method="inclusive") + return cls( + float(min(values)), + quantiles[2], # p10 + quantiles[5], # p25 + quantiles[10], # p50 + quantiles[15], # p75 + quantiles[18], # p90 + float(max(values)), + statistics.mean(values), + ) + + def infer_auto_comparison(self, comparison: ProfileStatComparison) -> ProfileStatComparison: + if comparison != ProfileStatComparison.AUTO: + return comparison + return ProfileStatComparison.HIGHER + + def as_table(self, key: str = _DEFAULT_KEY) -> tuple[float, dict[str, float]]: + return self.get_value(key), dataclasses.asdict(self) + + +@dataclasses.dataclass +class StringCollection(ProfileStatAggregation): + """An aggregation type for a collection of strings. + + Supports numerous keys that attempt to aggregate and describe the string values. Also allows + to compare the entire sequence of values for equality if needed. + + :ivar sequence: The sequence of strings. + :ivar counts: A histogram where each string has a separate bin. + """ + + _SUPPORTED_KEYS: ClassVar[set[str]] = { + "total", + "unique", + "min_count", + "max_count", + "counts", + "sequence", + } + _DEFAULT_KEY: ClassVar[str] = "unique" + + sequence: list[str] = dataclasses.field(default_factory=list) + counts: Counter[str] = dataclasses.field(init=False) + + def __post_init__(self) -> None: + """Computes the histogram from the sequence.""" + self.counts = Counter(self.sequence) + + @property + def unique(self) -> int: + """Get the number of unique strings in the collection. + + :return: The number of unique strings. 
+ """ + return len(self.counts) + + @property + def total(self) -> int: + """Get the total number of strings in the collection. + + :return: The total number of strings. + """ + return len(self.sequence) + + @property + def min_count(self) -> tuple[str, int]: + """Get the string with the least number of occurrences in the collection. + + :return: The string with the least number of occurrences. + """ + return self.counts.most_common()[-1] + + @property + def max_count(self) -> tuple[str, int]: + """Get the string with the most number of occurrences in the collection. + + :return: The string with the most number of occurrences. + """ + return self.counts.most_common()[0] + + def infer_auto_comparison(self, comparison: ProfileStatComparison) -> ProfileStatComparison: + if comparison != ProfileStatComparison.AUTO: + return comparison + return ProfileStatComparison.EQUALITY + + def as_table( + self, key: str = _DEFAULT_KEY + ) -> tuple[int | str | tuple[str, int], dict[str, int] | dict[str, str]]: + representative_val: str | int | tuple[str, int] + if key in ("counts", "sequence"): + # The Counter and list objects are not suitable for direct printing in a table. + representative_val = f"[{key}]" + else: + # A little type hinting help. The list and Counter types have already been covered. + representative_val = cast(Union[str, int, tuple[str, int]], self.get_value(key)) + if key == "sequence": + # The 'sequence' key table format is a bit different from the rest. + return representative_val, {f"{idx}.": value for idx, value in enumerate(self.sequence)} + return representative_val, self.counts + + +def aggregate_stats(stat: ProfileStat) -> ProfileStatAggregation: + """A factory that constructs the proper aggregation object based on the stat value(s) type. + + :param stat: The stat to create the aggregate object from. + + :return: The constructed aggregation object. + """ + if len(stat.value) == 0: + perun_log.warn(f"ProfileStat aggregation: Missing value of stat '{stat.name}'") + return SingleValue() + elif len(stat.value) == 1: + return SingleValue(stat.value[0]) + elif all(isinstance(value, (int, float)) for value in stat.value): + # All values are integers or floats + return StatisticalSummary.from_values(map(float, stat.value)) + else: + # Even heterogeneous lists will be aggregated as lists of strings + return StringCollection(list(map(str, stat.value))) + + +def compare_stats( + stat: ProfileStatAggregation, + other_stat: ProfileStatAggregation, + key: str, + comparison: ProfileStatComparison, +) -> StatComparisonResult: + """Compares two aggregated stats using the representative key and comparison type. + + :param stat: The first aggregate stat to compare. + :param other_stat: The second aggregate stat to compare. + :param key: The representative key from the aggregates to compare. + :param comparison: The comparison type. + + :return: The comparison result. + """ + value, other_value = stat.get_value(key), other_stat.get_value(key) + # Handle auto comparison according to the aggregation type + comparison = stat.infer_auto_comparison(comparison) + if type(stat) is not type(other_stat): + # Invalid comparison attempt + perun_log.warn( + f"Invalid comparison of {stat.__class__.__name__} and {other_stat.__class__.__name__}." 
diff --git a/perun/templates/diff_view_datatables.html.jinja2 b/perun/templates/diff_view_datatables.html.jinja2
index efec46c5..f60c2992 100755
--- a/perun/templates/diff_view_datatables.html.jinja2
+++ b/perun/templates/diff_view_datatables.html.jinja2
@@ -24,6 +24,9 @@
     .right {
         float: right;
     }
+    .header-center {
+        text-align: center;
+    }
     .column-head {
         border-bottom: 1px solid #ddd;
         border-top: 1px solid #ddd;
@@ -83,16 +86,19 @@
+

Diff View Generated by Perun v{{ perun_version }}

+

{{ timestamp }}

+

{{ lhs_tag }}

- {{ profile_overview.overview_table('toggleLeftCollapse', 'left-info', lhs_header, rhs_header, "Profile Specification") }}
+ {{ profile_overview.overview_table('toggleSpecificationCollapse', 'left-specification-info', lhs_header, "Profile Specification") }}
 

{{ rhs_tag }}

- {{ profile_overview.overview_table('toggleRightCollapse', 'right-info', rhs_header, lhs_header, "Profile Specification") }}
+ {{ profile_overview.overview_table('toggleSpecificationCollapse', 'right-specification-info', rhs_header, "Profile Specification") }}
 
@@ -191,8 +197,7 @@
 {% include 'jquery-3.6.0.min.js' %}
 {% include 'dataTables.min.js' %}
 {%- endif %}
-{{ profile_overview.toggle_script('toggleLeftCollapse', 'left-info') }}
-{{ profile_overview.toggle_script('toggleRightCollapse', 'right-info') }}
+{{ profile_overview.toggle_script('toggleSpecificationCollapse', 'left-specification-info', 'right-specification-info') }}
 $(document).ready( function () {
     var lhs = $("#table1").DataTable({
         data: lhs_data.data,
diff --git a/perun/templates/diff_view_flamegraph.html.jinja2 b/perun/templates/diff_view_flamegraph.html.jinja2
index a3e3540f..c3818948 100755
--- a/perun/templates/diff_view_flamegraph.html.jinja2
+++ b/perun/templates/diff_view_flamegraph.html.jinja2
@@ -31,6 +31,9 @@
     .right {
         float: right;
     }
+    .header-center {
+        text-align: center;
+    }
     .column-head {
         border-bottom: 1px solid #ddd;
         text-align: center;
@@ -97,26 +100,29 @@
+

Diff View Generated by Perun v{{ perun_version }}

+

{{ timestamp }}

+

{{ lhs_tag }}

- {{ profile_overview.overview_table('toggleLeftCollapse', 'left-info', lhs_header, rhs_header, "Profile Specification") }}
+ {{ profile_overview.overview_table('toggleSpecificationCollapse', 'left-specification-info', lhs_header, "Profile Specification") }}
 
- {{ profile_overview.overview_table('toggleLeftProfileCollapse', 'left-profile-info', lhs_stats, rhs_stats, "Profile Stats") }}
+ {{ profile_overview.nested_overview_table('toggleStatsCollapse', 'left-stats-info', lhs_stats, "Profile Stats") }}
 
{%- if rhs_metadata%}
- {{ profile_overview.overview_table('toggleLeftMetadataCollapse', 'left-metadata-info', lhs_metadata, rhs_metadata, "Profile Metadata") }}
+ {{ profile_overview.overview_table('toggleMetadataCollapse', 'left-metadata-info', lhs_metadata, "Profile Metadata") }}
 
{%- endif %}

{{ rhs_tag }}

- {{ profile_overview.overview_table('toggleRightCollapse', 'right-info', rhs_header, lhs_header, "Profile Specification") }}
+ {{ profile_overview.overview_table('toggleSpecificationCollapse', 'right-specification-info', rhs_header, "Profile Specification") }}
 
- {{ profile_overview.overview_table('toggleRightProfileCollapse', 'right-profile-info', rhs_stats, lhs_stats, "Profile Stats") }}
+ {{ profile_overview.nested_overview_table('toggleStatsCollapse', 'right-stats-info', rhs_stats, "Profile Stats") }}
 
{%- if rhs_metadata%}
- {{ profile_overview.overview_table('toggleRightMetadata', 'right-metadata-info', rhs_metadata, lhs_metadata, "Profile Metadata") }}
+ {{ profile_overview.overview_table('toggleMetadataCollapse', 'right-metadata-info', rhs_metadata, "Profile Metadata") }}
 
{%- endif %}
@@ -173,12 +179,14 @@