diff --git a/perun/cli_groups/import_cli.py b/perun/cli_groups/import_cli.py
index bb5aa6bf..de282661 100755
--- a/perun/cli_groups/import_cli.py
+++ b/perun/cli_groups/import_cli.py
@@ -130,3 +130,30 @@ def from_stacks(ctx: click.Context, imported: list[str], **kwargs: Any) -> None:
     """
     kwargs.update(ctx.obj)
     imports.import_perf_from_stack(imported, **kwargs)
+
+
+@import_group.group("elk")
+@click.pass_context
+def elk_group(ctx: click.Context, **kwargs: Any) -> None:
+    """Imports Perun profiles from ELK results.
+
+    By ELK we mean the Elastic Stack (Elasticsearch, Logstash, Kibana).
+
+    We assume the data are already flattened and are in the form of:
+
+    [{key: value, ...}, ...]
+
+    The command supports profiles collected in:
+
+    1. JSON format: files that are either extracted from ELK or stored in an ELK-compatible format.
+    """
+    ctx.obj.update(kwargs)
+
+
+@elk_group.command("json")
+@click.argument("imported", nargs=-1, required=True)
+@click.pass_context
+def from_json(ctx: click.Context, imported: list[str], **kwargs: Any) -> None:
+    """Imports Perun profiles from JSON compatible with the ELK infrastructure."""
+    kwargs.update(ctx.obj)
+    imports.import_elk_from_json(imported, **kwargs)
diff --git a/perun/profile/convert.py b/perun/profile/convert.py
index 1ccc1e6c..0b8a6ef4 100755
--- a/perun/profile/convert.py
+++ b/perun/profile/convert.py
@@ -142,9 +142,9 @@ def to_flame_graph_format(
     for alloc in snapshot:
         if "subtype" not in alloc.keys() or alloc["subtype"] != "free":
             # Workaround for total time used in some collectors, so it is not outputted
-            if alloc["uid"] != "%TOTAL_TIME%":
+            if alloc["uid"] != "%TOTAL_TIME%" and profile_key in alloc:
                 stack_str = to_uid(alloc["uid"]) + ";"
-                for frame in alloc["trace"][::-1]:
+                for frame in alloc.get("trace", [])[::-1]:
                     line = to_uid(frame, minimize)
                     stack_str += line + ";"
                 if stack_str and stack_str.endswith(";"):
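To make the guarded conversion above concrete, here is a minimal, self-contained sketch of the folded flame-graph format it emits; fold_resource is a hypothetical stand-in for the loop in to_flame_graph_format, with to_uid simplified to str and profile_key defaulting to "amount" purely for illustration:

# Sketch only: resources lacking the profiled key are now skipped, and a
# missing "trace" degrades to an empty call stack instead of a KeyError.
def fold_resource(alloc: dict, profile_key: str = "amount") -> str | None:
    if alloc.get("subtype") == "free" or alloc["uid"] == "%TOTAL_TIME%":
        return None
    if profile_key not in alloc:
        return None
    frames = [str(frame) for frame in alloc.get("trace", [])[::-1]]
    return ";".join([str(alloc["uid"])] + frames) + f" {alloc[profile_key]}"

fold_resource({"uid": "main", "trace": ["foo", "bar"], "amount": 42})
# -> "main;bar;foo 42"
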
diff --git a/perun/profile/factory.py b/perun/profile/factory.py
index 5e06db89..d16f125a 100755
--- a/perun/profile/factory.py
+++ b/perun/profile/factory.py
@@ -54,6 +54,10 @@ class Profile(MutableMapping[str, Any]):
         "address",
         "timestamp",
         "exclusive",
+        "metric.iteration",
+        "metric.value",
+        "metric.score-value",
+        "metric.percentile",
     }
     persistent = {"trace", "type", "subtype", "uid", "location"}
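The four metric.* keys registered above correspond to the flattened ELK records consumed by the new import below; for context, one such record might look like this (all concrete values, and any fields beyond the four registered keys, are illustrative):

elk_resource = {
    "metric.name": "lookup_latency",
    "metric.iteration": 3,
    "metric.value": 0.25,
    "metric.score-value": 0.97,
    "metric.percentile": 99,
    "benchmarking.start-ts": 1700000000.0,
    "benchmarking.end-ts": 1700000012.5,
    "machine.hostname": "runner-01",
}
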
diff --git a/perun/profile/imports.py b/perun/profile/imports.py
index 6e8784cd..6e07f56e 100755
--- a/perun/profile/imports.py
+++ b/perun/profile/imports.py
@@ -3,14 +3,15 @@
 from __future__ import annotations

 # Standard Imports
-from typing import Any, Optional, Iterator, Callable
+from collections import defaultdict
+from dataclasses import dataclass, field, asdict
 from pathlib import Path
-import json
+from typing import Any, Optional, Iterator, Callable
 import csv
+import json
 import os
-import subprocess
 import statistics
-from dataclasses import dataclass, field, asdict
+import subprocess

 # Third-Party Imports
 import gzip
@@ -20,7 +21,7 @@
 from perun.profile import helpers as p_helpers
 from perun.logic import commands, index, pcs
 from perun.utils import log, streams
-from perun.utils.common import script_kit
+from perun.utils.common import script_kit, common_kit
 from perun.utils.external import commands as external_commands, environment
 from perun.utils.structs import MinorVersion
 from perun.profile.factory import Profile
@@ -40,6 +41,15 @@


 class ImportedProfiles:
+    """
+    Note: consider refactoring or removing this class; the logic baked into it obfuscates the flow a little and
+    makes the functions less readable (there are no streams/pipes here, unlike most of Perun's logic). I, for one,
+    am rather a fan of generic functions that take structures and return structures than of classes with methods.
+    TODO: the import-dir parameter could be removed by extracting this functionality into a command-line callback
+    that massages the paths during CLI parsing; the import code could then assume that the paths are correct.
+    I think the parameter only complicates the code.
+    """
+
     __slots__ = "import_dir", "stats", "profiles"

     def __init__(self, targets: list[str], import_dir: str | None, stats_info: str | None) -> None:
@@ -122,6 +132,11 @@ def _add_imported_profile(self, target: list[str]) -> None:


 def load_file(filepath: Path) -> str:
+    """Tests whether the file is gzip-packed; if so, unpacks it, otherwise reads it as a text file.
+
+    :param filepath: path to the source file
+    :return: the content of the file
+    """
     if filepath.suffix.lower() == ".gz":
         with open(filepath, "rb") as f:
             header = f.read(2)
@@ -146,7 +161,7 @@ def get_machine_info(machine_info: Optional[str] = None) -> dict[str, Any]:
     return environment.get_machine_specification()


-def import_profile(
+def import_perf_profile(
     profiles: ImportedProfiles,
     resources: list[dict[str, Any]],
     minor_version: MinorVersion,
     machine_info: Optional[str] = None,
     with_sudo: bool = False,
     save_to_index: bool = False,
     **kwargs: Any,
 ) -> None:
+    """Constructs the profile out of perf-collected data and saves it either to the pending jobs or to the index.
+
+    :param profiles: list of to-be-imported profiles
+    :param resources: list of parsed resources
+    :param minor_version: minor version corresponding to the imported profiles
+    :param machine_info: additional dictionary with the machine specification
+    :param with_sudo: indication whether the data were collected with sudo
+    :param save_to_index: indication whether we should save the imported profiles to the index
+    :param kwargs: rest of the parameters
+    """
     prof = Profile(
         {
             "global": {
@@ -191,6 +216,16 @@
     )
     prof.update({"postprocessors": []})
+    save_imported_profile(prof, save_to_index, minor_version)
+
+
+def save_imported_profile(prof: Profile, save_to_index: bool, minor_version: MinorVersion) -> None:
+    """Saves the imported profile either to the index or to the pending jobs.
+
+    :param prof: imported profile
+    :param save_to_index: indication whether we should save the imported profiles to the index
+    :param minor_version: minor version corresponding to the imported profiles
+    """
     full_profile_name = p_helpers.generate_profile_name(prof)
     profile_directory = pcs.get_job_directory()
     full_profile_path = os.path.join(profile_directory, full_profile_name)
@@ -216,7 +251,18 @@
     with_sudo: bool = False,
     **kwargs: Any,
 ) -> None:
-    """Imports profile collected by `perf record`"""
+    """Imports profile collected by `perf record`.
+
+    It first preprocesses the input files in ImportedProfiles; then, for each file, it runs
+    `perf script` together with the parser script to generate the profile.
+
+    :param imported: list of files with imported data
+    :param import_dir: directory from which the profiles are imported
+    :param stats_info: additional statistics collected for the profile (i.e. non-resource types)
+    :param minor_version: minor version corresponding to the imported profiles
+    :param with_sudo: indication whether the data were collected with sudo
+    :param kwargs: rest of the parameters
+    """
     parse_script = script_kit.get_script("stackcollapse-perf.pl")
     minor_version_info = pcs.vcs().get_minor_version_info(minor_version)
@@ -239,7 +285,7 @@
             log.error(f"Cannot load data due to: {err}")
         resources.extend(parser.parse_events(out.decode("utf-8").split("\n")))
         log.minor_success(log.path_style(str(imported_file.path)), "imported")
-    import_profile(profiles, resources, minor_version_info, with_sudo=with_sudo, **kwargs)
+    import_perf_profile(profiles, resources, minor_version_info, with_sudo=with_sudo, **kwargs)
@@ -250,7 +296,17 @@
     minor_version: str,
     **kwargs: Any,
 ) -> None:
-    """Imports profile collected by `perf record; perf script`"""
+    """Imports profile collected by `perf record | perf script`.
+
+    It first preprocesses the input files in ImportedProfiles; then, for each file, it runs
+    the parser script to generate the profile.
+
+    :param imported: list of files with imported data
+    :param import_dir: directory from which the profiles are imported
+    :param stats_info: additional statistics collected for the profile (i.e. non-resource types)
+    :param minor_version: minor version corresponding to the imported profiles
+    :param kwargs: rest of the parameters
+    """
     parse_script = script_kit.get_script("stackcollapse-perf.pl")
     minor_version_info = pcs.vcs().get_minor_version_info(minor_version)
@@ -263,7 +319,7 @@
         log.minor_success(f"Raw data from {log.path_style(str(imported_file.path))}", "collected")
         resources.extend(parser.parse_events(out.decode("utf-8").split("\n")))
         log.minor_success(log.path_style(str(imported_file.path)), "imported")
-    import_profile(profiles, resources, minor_version_info, **kwargs)
+    import_perf_profile(profiles, resources, minor_version_info, **kwargs)
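For reference, the `stack` variant below consumes folded stacks, i.e. the output of stackcollapse-perf.pl: one semicolon-separated call stack per line, followed by a sample count. A minimal sketch of that format and of parsing one line (parse_folded_line is a hypothetical helper; the real parsing is done by the parser used in this file):

# Folded-stack lines, as produced by, e.g.:
#   perf record -- <cmd> && perf script | stackcollapse-perf.pl > out.folded
# (the example stack is made up)
def parse_folded_line(line: str) -> tuple[list[str], int]:
    stack, _, samples = line.rpartition(" ")
    return stack.split(";"), int(samples)

assert parse_folded_line("main;run;compute 15") == (["main", "run", "compute"], 15)
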
@@ -274,7 +330,16 @@
     minor_version: str,
     **kwargs: Any,
 ) -> None:
-    """Imports profile collected by `perf record; perf script | stackcollapse-perf.pl`"""
+    """Imports profile collected by `perf record | perf script | stackcollapse-perf.pl`.
+
+    It first preprocesses the input files in ImportedProfiles; then, it parses each file.
+
+    :param imported: list of files with imported data
+    :param import_dir: directory from which the profiles are imported
+    :param stats_info: additional statistics collected for the profile (i.e. non-resource types)
+    :param minor_version: minor version corresponding to the imported profiles
+    :param kwargs: rest of the parameters
+    """
     minor_version_info = pcs.vcs().get_minor_version_info(minor_version)

     profiles = ImportedProfiles(imported, import_dir, stats_info)
@@ -284,4 +349,156 @@
         out = load_file(imported_profile.path)
         resources.extend(parser.parse_events(out.split("\n")))
         log.minor_success(log.path_style(str(imported_profile.path)), "imported")
-    import_profile(profiles, resources, minor_version_info, **kwargs)
+    import_perf_profile(profiles, resources, minor_version_info, **kwargs)
+
+
+def extract_machine_info_from_metadata(metadata: dict[str, Any]) -> dict[str, Any]:
+    """Extracts the parts of the profile that correspond to machine info.
+
+    Note that not much is collected from the ELK formats, and what is collected can vary greatly;
+    hence, most of the machine specification and environment should be described in the metadata instead.
+
+    :param metadata: metadata extracted from the ELK profiles
+    :return: machine info extracted from the profiles
+    """
+    machine_info = {
+        "architecture": metadata.get("machine.arch", "?"),
+        "system": metadata.get("machine.os", "?").capitalize(),
+        "release": metadata.get("extra.machine.platform", "?"),
+        "host": metadata.get("machine.hostname", "?"),
+        "cpu": {
+            "physical": "?",
+            "total": metadata.get("machine.cpu-cores", "?"),
+            "frequency": "?",
+        },
+        "memory": {
+            "total_ram": metadata.get("machine.ram", "?"),
+            "swap": "?",
+        },
+    }
+
+    machine_info["boot_info"] = "?"
+    machine_info["mem_details"] = {}
+    machine_info["cpu_details"] = []
+    return machine_info
+
+
+def import_elk_profile(
+    resources: list[dict[str, Any]],
+    metadata: dict[str, Any],
+    minor_version: MinorVersion,
+    save_to_index: bool = False,
+    **kwargs: Any,
+) -> None:
+    """Constructs the profile out of ELK-stored data and saves it either to the pending jobs or to the index.
+
+    :param resources: list of parsed resources
+    :param metadata: parts of the profiles that will be stored as metadata in the profile
+    :param minor_version: minor version corresponding to the imported profiles
+    :param save_to_index: indication whether we should save the imported profiles to the index
+    :param kwargs: rest of the parameters
+    """
+    prof = Profile(
+        {
+            "global": {
+                "time": "???",
+                "resources": resources,
+            }
+        }
+    )
+    prof.update({"origin": minor_version.checksum})
+    prof.update({"metadata": metadata})
+    prof.update({"machine": extract_machine_info_from_metadata(metadata)})
+    prof.update(
+        {
+            "header": {
+                "type": "time",
+                "cmd": kwargs.get("cmd", ""),
+                "exitcode": "?",
+                "workload": kwargs.get("workload", ""),
+                "units": {"time": "sample"},
+            }
+        }
+    )
+    prof.update(
+        {
+            "collector_info": {
+                "name": "???",
+                "params": {},
+            }
+        }
+    )
+    prof.update({"postprocessors": []})
+
+    save_imported_profile(prof, save_to_index, minor_version)
+
+
+def extract_from_elk(
+    elk_query: list[dict[str, Any]]
+) -> tuple[list[dict[str, Any]], dict[str, Any]]:
+    """For the given ELK query, extracts resources and metadata.
+
+    As metadata, we consider any key that has a single value throughout the whole profile
+    and is not linked to the keywords `metric` or `benchmarking`.
+    As resources, we consider anything that is not identified as metadata.
+
+    :param elk_query: query from ELK in the form of a list of resources
+    :return: list of resources and metadata
+    """
+    res_counter = defaultdict(set)
+    for res in elk_query:
+        for key, val in res.items():
+            res_counter[key].add(val)
+    metadata_keys = {
+        k
+        for (k, v) in res_counter.items()
+        if not k.startswith("metric") and not k.startswith("benchmarking") and len(v) == 1
+    }
+
+    metadata = {k: res_counter[k].pop() for k in metadata_keys}
+    resources = [
+        {
+            k: common_kit.try_convert(v, [int, float, str])
+            for k, v in res.items()
+            if k not in metadata_keys
+        }
+        for res in elk_query
+    ]
+    # We register the uid and collapse the two timestamps into the elapsed benchmarking time
+    for res in resources:
+        res["uid"] = res["metric.name"]
+        res["benchmarking.time"] = res["benchmarking.end-ts"] - res["benchmarking.start-ts"]
+        res.pop("benchmarking.end-ts")
+        res.pop("benchmarking.start-ts")
+    return resources, metadata
+
+
+@vcs_kit.lookup_minor_version
+def import_elk_from_json(
+    imported: list[str],
+    minor_version: str,
+    **kwargs: Any,
+) -> None:
+    """Imports the ELK-stored data from the JSON data.
+
+    The loading expects the JSON files to be in the form of `{'queries': []}`.
+
+    :param imported: list of filenames with ELK data
+    :param minor_version: minor version corresponding to the imported profiles
+    :param kwargs: rest of the parameters
+    """
+    minor_version_info = pcs.vcs().get_minor_version_info(minor_version)
+
+    resources: list[dict[str, Any]] = []
+    metadata: dict[str, Any] = {}
+    for imported_file in imported:
+        with open(imported_file, "r") as imported_handle:
+            imported_json = json.load(imported_handle)
+            assert (
+                "queries" in imported_json.keys()
+            ), "expected the JSON to contain list of dictionaries in 'queries' key"
+            r, m = extract_from_elk(imported_json["queries"])
+        resources.extend(r)
+        metadata.update(m)
+        log.minor_success(log.path_style(str(imported_file)), "imported")
+    import_elk_profile(resources, metadata, minor_version_info, **kwargs)
diff --git a/perun/templates/diff_view_flamegraph.html.jinja2 b/perun/templates/diff_view_flamegraph.html.jinja2
index fc5c85bb..a3e3540f 100755
--- a/perun/templates/diff_view_flamegraph.html.jinja2
+++ b/perun/templates/diff_view_flamegraph.html.jinja2
@@ -103,6 +103,10 @@