Add support for importing data from ELK #256

Merged
merged 8 commits on Aug 19, 2024
27 changes: 27 additions & 0 deletions perun/cli_groups/import_cli.py
@@ -130,3 +130,30 @@ def from_stacks(ctx: click.Context, imported: list[str], **kwargs: Any) -> None:
"""
kwargs.update(ctx.obj)
imports.import_perf_from_stack(imported, **kwargs)


@import_group.group("elk")
@click.pass_context
def elk_group(ctx: click.Context, **kwargs: Any) -> None:
"""Imports Perun profiles from elk results

By ELK we mean Elasticsearch Stack (Elasticsearch, Logstash, Kibana)

We assume the data are already flattened and are in form of:

[{key: value, ...}, ...]

The command supports profiles collected in:

1. JSON format: files, that are extracted from ELK or are stored using format compatible with ELK.
"""
ctx.obj.update(kwargs)


@elk_group.command("json")
@click.argument("imported", nargs=-1, required=True)
@click.pass_context
def from_json(ctx: click.Context, imported: list[str], **kwargs: Any) -> None:
"""Imports Perun profiles from json compatible with elk infrastructure"""
kwargs.update(ctx.obj)
imports.import_elk_from_json(imported, **kwargs)
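
For illustration, a minimal input for the new `elk json` command could look like the snippet below; the file name and concrete values are hypothetical, but the top-level `queries` wrapper and the `metric.*`/`benchmarking.*`/`machine.*` key prefixes are the ones the importer works with. Assuming the parent group is exposed as `perun import`, such a file would be imported with something like `perun import elk json elk-export.json`.

{
  "queries": [
    {"metric.name": "lookup", "metric.value": 4.2, "metric.iteration": 1,
     "benchmarking.start-ts": 1000, "benchmarking.end-ts": 1005,
     "machine.arch": "x86_64", "machine.os": "linux", "machine.hostname": "worker-01",
     "machine.cpu-cores": 8, "machine.ram": "32GiB"},
    {"metric.name": "lookup", "metric.value": 4.6, "metric.iteration": 2,
     "benchmarking.start-ts": 1005, "benchmarking.end-ts": 1011,
     "machine.arch": "x86_64", "machine.os": "linux", "machine.hostname": "worker-01",
     "machine.cpu-cores": 8, "machine.ram": "32GiB"}
  ]
}

Keys outside the metric.*/benchmarking.* namespaces that keep a single value across all records (here the machine.* ones) end up as profile metadata; everything else becomes part of the resources.
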
4 changes: 2 additions & 2 deletions perun/profile/convert.py
@@ -142,9 +142,9 @@ def to_flame_graph_format(
for alloc in snapshot:
if "subtype" not in alloc.keys() or alloc["subtype"] != "free":
# Workaround for total time used in some collectors, so it is not outputted
if alloc["uid"] != "%TOTAL_TIME%":
if alloc["uid"] != "%TOTAL_TIME%" and profile_key in alloc:
stack_str = to_uid(alloc["uid"]) + ";"
for frame in alloc["trace"][::-1]:
for frame in alloc.get("trace", [])[::-1]:
line = to_uid(frame, minimize)
stack_str += line + ";"
if stack_str and stack_str.endswith(";"):
4 changes: 4 additions & 0 deletions perun/profile/factory.py
@@ -54,6 +54,10 @@ class Profile(MutableMapping[str, Any]):
"address",
"timestamp",
"exclusive",
"metric.iteration",
"metric.value",
"metric.score-value",
"metric.percentile",
}
persistent = {"trace", "type", "subtype", "uid", "location"}

241 changes: 229 additions & 12 deletions perun/profile/imports.py
@@ -3,14 +3,15 @@
from __future__ import annotations

# Standard Imports
from typing import Any, Optional, Iterator, Callable
from collections import defaultdict
from dataclasses import dataclass, field, asdict
from pathlib import Path
import json
from typing import Any, Optional, Iterator, Callable
import csv
import json
import os
import subprocess
import statistics
from dataclasses import dataclass, field, asdict
import subprocess

# Third-Party Imports
import gzip
@@ -20,7 +21,7 @@
from perun.profile import helpers as p_helpers
from perun.logic import commands, index, pcs
from perun.utils import log, streams
from perun.utils.common import script_kit
from perun.utils.common import script_kit, common_kit
from perun.utils.external import commands as external_commands, environment
from perun.utils.structs import MinorVersion
from perun.profile.factory import Profile
@@ -40,6 +41,15 @@ class ImportProfileSpec:


class ImportedProfiles:
"""
Note: I would reconsider this class or refactor it, removing the logic from it; it obfuscates the flow a little
and makes the functions less readable (there are no streams/pipes as in most of Perun's logic). I, for one, am
rather a fan of generic functions that take structures and return structures than of classes with methods/logic.
TODO: the import-dir could be removed by extracting this functionality into a command-line callback and massaging
the paths during CLI parsing, hence assuming that the paths are correct when importing. I think the parameter
only complicates the code.
Comment on lines +48 to +50
Collaborator


Just a note: this wouldn't really work, as the import-dir path prefix is also applied to paths that can be found in CSV file(s). Although we could technically parse the files in the callback, it doesn't seem like the proper place to parse files with a bit more complicated format and error checking.

Collaborator Author


Would extracting this into the config hierarchy work? E.g., have a lookup.path that would be gathered recursively through the runtime, local, and shared configs, plus a helper function that would resolve the path with respect to this set of paths? Then --import-dir would simply add to runtime(), and one could add the path to the local or shared config to avoid having to use --import-dir at all.

"""

__slots__ = "import_dir", "stats", "profiles"

def __init__(self, targets: list[str], import_dir: str | None, stats_info: str | None) -> None:
@@ -122,6 +132,11 @@ def _add_imported_profile(self, target: list[str]) -> None:


def load_file(filepath: Path) -> str:
"""Tests if the file is packed by gzip and unpacks it, otherwise reads it as a text file

:param filepath: path with source file
:return: the content of the file
"""
if filepath.suffix.lower() == ".gz":
with open(filepath, "rb") as f:
header = f.read(2)
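
A minimal sketch of how this kind of detection typically works (assuming the standard two-byte gzip magic number; the helper name below is illustrative only and not part of the diff):

import gzip
from pathlib import Path


def load_file_sketch(filepath: Path) -> str:
    # Files ending in .gz are checked for the gzip magic number before decompressing;
    # everything else is read as plain text.
    if filepath.suffix.lower() == ".gz":
        with open(filepath, "rb") as f:
            if f.read(2) == b"\x1f\x8b":
                with gzip.open(filepath, "rt", encoding="utf-8") as gz_handle:
                    return gz_handle.read()
    return filepath.read_text(encoding="utf-8")
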
@@ -146,7 +161,7 @@ def get_machine_info(machine_info: Optional[str] = None) -> dict[str, Any]:
return environment.get_machine_specification()


def import_profile(
def import_perf_profile(
profiles: ImportedProfiles,
resources: list[dict[str, Any]],
minor_version: MinorVersion,
@@ -155,6 +170,16 @@ def import_profile(
save_to_index: bool = False,
**kwargs: Any,
) -> None:
"""Constructs the profile for perf-collected data and saves them to jobs or index

:param profiles: list of to-be-imported profiles
:param resources: list of parsed resources
:param minor_version: minor version corresponding to the imported profiles
:param machine_info: additional dictionary with machine specification
:param with_sudo: indication whether the data were collected with sudo
:param save_to_index: indication whether we should save the imported profiles to index
:param kwargs: rest of the paramters
"""
prof = Profile(
{
"global": {
@@ -191,6 +216,16 @@ def import_profile(
)
prof.update({"postprocessors": []})

save_imported_profile(prof, save_to_index, minor_version)


def save_imported_profile(prof: Profile, save_to_index: bool, minor_version: MinorVersion) -> None:
"""Saves the imported profile either to index or to pending jobs

:param prof: imported profile
:param minor_version: minor version corresponding to the imported profiles
:param save_to_index: indication whether we should save the imported profiles to index
"""
full_profile_name = p_helpers.generate_profile_name(prof)
profile_directory = pcs.get_job_directory()
full_profile_path = os.path.join(profile_directory, full_profile_name)
@@ -216,7 +251,18 @@ def import_perf_from_record(
with_sudo: bool = False,
**kwargs: Any,
) -> None:
"""Imports profile collected by `perf record`"""
"""Imports profile collected by `perf record`

It gathers the to-be-imported files (and their optional stats) via ImportedProfiles, then for each file it runs
`perf script` together with the parser script to generate the profile.

:param imported: list of files with imported data
:param import_dir: different directory for importing the profiles
:param stats_info: additional statistics collected for the profile (i.e. non-resource types)
:param minor_version: minor version corresponding to the imported profiles
:param with_sudo: indication whether the data were collected with sudo
:param kwargs: rest of the parameters
"""
parse_script = script_kit.get_script("stackcollapse-perf.pl")
minor_version_info = pcs.vcs().get_minor_version_info(minor_version)

@@ -239,7 +285,7 @@
log.error(f"Cannot load data due to: {err}")
resources.extend(parser.parse_events(out.decode("utf-8").split("\n")))
log.minor_success(log.path_style(str(imported_file.path)), "imported")
import_profile(profiles, resources, minor_version_info, with_sudo=with_sudo, **kwargs)
import_perf_profile(profiles, resources, minor_version_info, with_sudo=with_sudo, **kwargs)


@vcs_kit.lookup_minor_version
@@ -250,7 +296,17 @@ def import_perf_from_script(
minor_version: str,
**kwargs: Any,
) -> None:
"""Imports profile collected by `perf record; perf script`"""
"""Imports profile collected by `perf record | perf script`

It gathers the to-be-imported files via ImportedProfiles, then for each file it runs the
parser script to generate the profile.

:param imported: list of files with imported data
:param import_dir: different directory for importing the profiles
:param stats_info: additional statistics collected for the profile (i.e. non-resource types)
:param minor_version: minor version corresponding to the imported profiles
:param kwargs: rest of the parameters
"""
parse_script = script_kit.get_script("stackcollapse-perf.pl")
minor_version_info = pcs.vcs().get_minor_version_info(minor_version)

@@ -263,7 +319,7 @@
log.minor_success(f"Raw data from {log.path_style(str(imported_file.path))}", "collected")
resources.extend(parser.parse_events(out.decode("utf-8").split("\n")))
log.minor_success(log.path_style(str(imported_file.path)), "imported")
import_profile(profiles, resources, minor_version_info, **kwargs)
import_perf_profile(profiles, resources, minor_version_info, **kwargs)


@vcs_kit.lookup_minor_version
@@ -274,7 +330,16 @@ def import_perf_from_stack(
minor_version: str,
**kwargs: Any,
) -> None:
"""Imports profile collected by `perf record; perf script | stackcollapse-perf.pl`"""
"""Imports profile collected by `perf record | perf script`

It gathers the to-be-imported files via ImportedProfiles, then parses each file directly, since the stacks are already collapsed.

:param imported: list of files with imported data
:param import_dir: different directory for importing the profiles
:param stats_info: additional statistics collected for the profile (i.e. non-resource types)
:param minor_version: minor version corresponding to the imported profiles
:param kwargs: rest of the parameters
"""
minor_version_info = pcs.vcs().get_minor_version_info(minor_version)
profiles = ImportedProfiles(imported, import_dir, stats_info)

@@ -284,4 +349,156 @@
out = load_file(imported_profile.path)
resources.extend(parser.parse_events(out.split("\n")))
log.minor_success(log.path_style(str(imported_profile.path)), "imported")
import_profile(profiles, resources, minor_version_info, **kwargs)
import_perf_profile(profiles, resources, minor_version_info, **kwargs)


def extract_machine_info_from_metadata(metadata: dict[str, Any]) -> dict[str, Any]:
"""Extracts the parts of the profile, that corresponds to machine info

Note that not many is collected from the ELK formats and it can vary greatly,
hence, most of the machine specification and environment should be in metadata instead.

:param metadata: metadata extracted from the ELK profiles
:return: machine info extracted from the profiles
"""
machine_info = {
"architecture": metadata.get("machine.arch", "?"),
"system": metadata.get("machine.os", "?").capitalize(),
"release": metadata.get("extra.machine.platform", "?"),
"host": metadata.get("machine.hostname", "?"),
"cpu": {
"physical": "?",
"total": metadata.get("machine.cpu-cores", "?"),
"frequency": "?",
},
"memory": {
"total_ram": metadata.get("machine.ram", "?"),
"swap": "?",
},
}

machine_info["boot_info"] = "?"
machine_info["mem_details"] = {}
machine_info["cpu_details"] = []
return machine_info
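
As a quick illustration (hypothetical metadata values), only the keys referenced above are consulted and everything else defaults to "?":

from perun.profile.imports import extract_machine_info_from_metadata

info = extract_machine_info_from_metadata(
    {"machine.arch": "x86_64", "machine.os": "linux", "machine.hostname": "worker-01"}
)
# info["architecture"] == "x86_64", info["system"] == "Linux",
# info["host"] == "worker-01"; anything not present, e.g. the CPU frequency, stays "?"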


def import_elk_profile(
resources: list[dict[str, Any]],
metadata: dict[str, Any],
minor_version: MinorVersion,
save_to_index: bool = False,
**kwargs: Any,
) -> None:
"""Constructs the profile for elk-stored data and saves them to jobs or index

:param resources: list of parsed resources
:param metadata: parts of the profiles that will be stored as metadata in the profile
:param minor_version: minor version corresponding to the imported profiles
:param save_to_index: indication whether we should save the imported profiles to index
:param kwargs: rest of the paramters
"""
prof = Profile(
{
"global": {
"time": "???",
"resources": resources,
}
}
)
prof.update({"origin": minor_version.checksum})
prof.update({"metadata": metadata})
prof.update({"machine": extract_machine_info_from_metadata(metadata)})
prof.update(
{
"header": {
"type": "time",
"cmd": kwargs.get("cmd", ""),
"exitcode": "?",
"workload": kwargs.get("workload", ""),
"units": {"time": "sample"},
}
}
)
prof.update(
{
"collector_info": {
"name": "???",
"params": {},
}
}
)
prof.update({"postprocessors": []})

save_imported_profile(prof, save_to_index, minor_version)


def extract_from_elk(
elk_query: list[dict[str, Any]]
) -> tuple[list[dict[str, Any]], dict[str, Any]]:
"""For the given elk query, extracts resources and metadata.

For metadata we consider any key that has only single value through the profile,
and is not linked to keywords `metric` or `benchmarking`.
For resources we consider anything that is not identified as metadata

:param elk_query: query from the elk in form of list of resource
:return: list of resources and metadata
"""
res_counter = defaultdict(set)
for res in elk_query:
for key, val in res.items():
res_counter[key].add(val)
metadata_keys = {
k
for (k, v) in res_counter.items()
if not k.startswith("metric") and not k.startswith("benchmarking") and len(v) == 1
}

metadata = {k: res_counter[k].pop() for k in metadata_keys}
resources = [
{
k: common_kit.try_convert(v, [int, float, str])
for k, v in res.items()
if k not in metadata_keys
}
for res in elk_query
]
# We register the uid and derive the total benchmarking time
for res in resources:
res["uid"] = res["metric.name"]
res["benchmarking.time"] = res["benchmarking.end-ts"] - res["benchmarking.start-ts"]
res.pop("benchmarking.end-ts")
res.pop("benchmarking.start-ts")
return resources, metadata
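
A small worked example of the split on two hypothetical records (the key names reuse the prefixes handled above):

from perun.profile.imports import extract_from_elk

query = [
    {"metric.name": "lookup", "metric.value": "4", "machine.arch": "x86_64",
     "benchmarking.start-ts": 100, "benchmarking.end-ts": 105},
    {"metric.name": "lookup", "metric.value": "6", "machine.arch": "x86_64",
     "benchmarking.start-ts": 105, "benchmarking.end-ts": 112},
]
resources, metadata = extract_from_elk(query)
# "machine.arch" has a single value and is outside the metric.*/benchmarking.* namespaces,
# so it becomes metadata: {"machine.arch": "x86_64"}.
# Each resource keeps its metric.* keys (values converted to int/float where possible),
# gains "uid" (taken from "metric.name") and "benchmarking.time" (end-ts minus start-ts),
# and the raw timestamps are popped.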


@vcs_kit.lookup_minor_version
def import_elk_from_json(
imported: list[str],
minor_version: str,
**kwargs: Any,
) -> None:
"""Imports the ELK stored data from the json data.

The loading expects the json files to be in form of `{'queries': []}`.

:param imported: list of filenames with elk data.
:param minor_version: minor version corresponding to the imported profiles
:param kwargs: rest of the paramters
"""
minor_version_info = pcs.vcs().get_minor_version_info(minor_version)

resources: list[dict[str, Any]] = []
metadata: dict[str, Any] = {}
for imported_file in imported:
with open(imported_file, "r") as imported_handle:
imported_json = json.load(imported_handle)
assert (
"queries" in imported_json.keys()
), "expected the JSON to contain list of dictionaries in 'queries' key"
r, m = extract_from_elk(imported_json["queries"])
resources.extend(r)
metadata.update(m)
log.minor_success(log.path_style(str(imported_file)), "imported")
import_elk_profile(resources, metadata, minor_version_info, **kwargs)