Skip to content

Commit

Permalink
Merge pull request #247 from Perfexionists/import-fix
Browse files Browse the repository at this point in the history
Fix minor issues in import
  • Loading branch information
tfiedor authored Jul 30, 2024
2 parents b5a2fa3 + 49621f4 commit f71714f
Show file tree
Hide file tree
Showing 6 changed files with 79 additions and 91 deletions.
25 changes: 6 additions & 19 deletions perun/cli_groups/import_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,29 +25,25 @@
@click.option(
"--minor-version",
"-m",
"minor_version_list",
nargs=1,
multiple=True,
callback=cli_kit.minor_version_list_callback,
default=["HEAD"],
default=None,
is_eager=True,
help="Specifies the head minor version, for which the profiles will be imported.",
)
@click.option(
"--exitcode",
"-e",
nargs=1,
required=False,
multiple=True,
default=["?"],
default="?",
help=("Exit code of the command."),
)
@click.option(
"--cmd",
"-c",
nargs=1,
required=False,
multiple=True,
default=[""],
default="",
help=(
"Command that was being profiled. Either corresponds to some"
" script, binary or command, e.g. ``./mybin`` or ``perun``."
Expand All @@ -58,8 +54,7 @@
"-w",
nargs=1,
required=False,
multiple=True,
default=[""],
default="",
help="Inputs for <cmd>. E.g. ``./subdir`` is possible workload for ``ls`` command.",
)
@click.option(
Expand All @@ -80,17 +75,9 @@ def import_group(ctx: click.Context, **kwargs: Any) -> None:
@click.option(
"--warmup",
"-w",
multiple=True,
default=[0],
default=0,
help="Sets [INT] warm up iterations of ith profiled command.",
)
@click.option(
"--repeat",
"-r",
multiple=True,
default=[1],
help="Sets [INT] samplings of the ith profiled command.",
)
@click.pass_context
def perf_group(ctx: click.Context, **kwargs: Any) -> None:
"""Imports Perun profiles from perf results
Expand Down
133 changes: 65 additions & 68 deletions perun/profile/imports.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import subprocess

# Third-Party Imports
import gzip

# Perun Imports
from perun.collect.kperf import parser
Expand All @@ -19,6 +20,19 @@
from perun.utils.external import commands as external_commands, environment
from perun.utils.structs import MinorVersion
from perun.profile.factory import Profile
from perun.vcs import vcs_kit


def load_file(filename: str) -> str:
    """Loads the content of a file as text, decompressing gzip archives on the fly.

    Files whose name ends with ``.gz`` are checked for the gzip magic number and
    decompressed; any other file is read directly. In both cases the content is
    decoded as UTF-8.

    :param filename: path to the loaded file
    :return: decoded textual content of the file
    :raises gzip.BadGzipFile: if a ``.gz`` file does not start with the gzip magic number
    """
    if filename.endswith(".gz"):
        with open(filename, "rb") as raw_handle:
            # Explicit check rather than `assert`: assertions are stripped under
            # `python -O`, which would silently disable the validation.
            if raw_handle.read(2) != b"\x1f\x8b":
                raise gzip.BadGzipFile(f"Not a gzipped file: '{filename}'")
            # Rewind so that the decompressor sees the header again.
            raw_handle.seek(0)
            with gzip.GzipFile(fileobj=raw_handle) as gz_handle:
                return gz_handle.read().decode("utf-8")
    with open(filename, "r", encoding="utf-8") as imported_handle:
        return imported_handle.read()


def get_machine_info(machine_info: Optional[str] = None) -> dict[str, Any]:
Expand All @@ -34,32 +48,14 @@ def get_machine_info(machine_info: Optional[str] = None) -> dict[str, Any]:
return environment.get_machine_specification()


def get_param(cfg: dict[str, Any], param: str, index: int) -> Any:
    """Helper function for retrieving a parameter from the dictionary of lists.

    This assumes that the dictionary contains lists of values under its keys.
    It looks up the list stored under ``param`` and returns the item at ``index``.

    :param cfg: dictionary mapping parameter names to lists of values
    :param param: key under which the list of values is stored
    :param index: index of the retrieved value in the list
    :return: value of the param at the given index
    :raises AssertionError: if the list under ``param`` has no item at ``index``
    """
    values = cfg[param]
    # Raised explicitly (instead of using `assert`) so the check also holds
    # when Python runs with -O; the exception type and message are unchanged.
    if index >= len(values):
        raise AssertionError(f"Not enough values set up for the '{param}' command.")
    return values[index]


def import_from_string(
out: str,
resources: list[dict[str, Any]],
minor_version: MinorVersion,
prof_index: int,
machine_info: Optional[str] = None,
with_sudo: bool = False,
save_to_index: bool = False,
**kwargs: Any,
) -> None:
resources = parser.parse_events(out.split("\n"))
prof = Profile(
{
"global": {
Expand All @@ -74,9 +70,9 @@ def import_from_string(
{
"header": {
"type": "time",
"cmd": get_param(kwargs, "cmd", prof_index),
"exitcode": get_param(kwargs, "exitcode", prof_index),
"workload": get_param(kwargs, "workload", prof_index),
"cmd": kwargs.get("cmd", ""),
"exitcode": kwargs.get("exitcode", "?"),
"workload": kwargs.get("workload", ""),
"units": {"time": "sample"},
}
}
Expand All @@ -87,8 +83,8 @@ def import_from_string(
"name": "kperf",
"params": {
"with_sudo": with_sudo,
"warmup": get_param(kwargs, "warmup", prof_index),
"repeat": get_param(kwargs, "repeat", prof_index),
"warmup": kwargs.get("warmup", 0),
"repeat": kwargs.get("repeat", 1),
},
}
}
Expand All @@ -111,23 +107,24 @@ def import_from_string(
index.register_in_pending_index(full_profile_path, prof)


@vcs_kit.lookup_minor_version
def import_perf_from_record(
imported: list[str],
machine_info: Optional[str],
minor_version_list: list[MinorVersion],
minor_version: str,
with_sudo: bool = False,
save_to_index: bool = False,
**kwargs: Any,
) -> None:
"""Imports profile collected by `perf record`"""
assert (
len(minor_version_list) == 1
), f"One can import profile for single version only (got {len(minor_version_list)} instead)"
minor_version_info = pcs.vcs().get_minor_version_info(minor_version)
kwargs["repeat"] = len(imported)

parse_script = script_kit.get_script("stackcollapse-perf.pl")
out = b""

for i, imported_file in enumerate(imported):
resources = []
for imported_file in imported:
perf_script_command = (
f"{'sudo ' if with_sudo else ''}perf script -i {imported_file} | {parse_script}"
)
Expand All @@ -137,69 +134,69 @@ def import_perf_from_record(
except subprocess.CalledProcessError as err:
log.minor_fail(f"Raw data from {log.path_style(imported_file)}", "not collected")
log.error(f"Cannot load data due to: {err}")
import_from_string(
out.decode("utf-8"),
minor_version_list[0],
i,
machine_info,
with_sudo=with_sudo,
save_to_index=save_to_index,
**kwargs,
)
resources.extend(parser.parse_events(out.decode("utf-8").split("\n")))
log.minor_success(log.path_style(imported_file), "imported")
import_from_string(
resources,
minor_version_info,
machine_info,
with_sudo=with_sudo,
save_to_index=save_to_index,
**kwargs,
)


@vcs_kit.lookup_minor_version
def import_perf_from_script(
imported: list[str],
machine_info: Optional[str],
minor_version_list: list[MinorVersion],
minor_version: str,
save_to_index: bool = False,
**kwargs: Any,
) -> None:
"""Imports profile collected by `perf record; perf script`"""
assert (
len(minor_version_list) == 1
), f"One can import profile for single version only (got {len(minor_version_list)} instead)"

parse_script = script_kit.get_script("stackcollapse-perf.pl")
out = b""
minor_version_info = pcs.vcs().get_minor_version_info(minor_version)
kwargs["repeat"] = len(imported)

for i, imported_file in enumerate(imported):
resources = []
for imported_file in imported:
perf_script_command = f"cat {imported_file} | {parse_script}"
out, _ = external_commands.run_safely_external_command(perf_script_command)
log.minor_success(f"Raw data from {log.path_style(imported_file)}", "collected")
import_from_string(
out.decode("utf-8"),
minor_version_list[0],
i,
machine_info,
save_to_index=save_to_index,
**kwargs,
)
resources.extend(parser.parse_events(out.decode("utf-8").split("\n")))
log.minor_success(log.path_style(imported_file), "imported")
import_from_string(
resources,
minor_version_info,
machine_info,
save_to_index=save_to_index,
**kwargs,
)


@vcs_kit.lookup_minor_version
def import_perf_from_stack(
imported: list[str],
machine_info: Optional[str],
minor_version_list: list[MinorVersion],
minor_version: str,
save_to_index: bool = False,
**kwargs: Any,
) -> None:
"""Imports profile collected by `perf record; perf script | stackcollapse-perf.pl`"""
assert (
len(minor_version_list) == 1
), f"One can import profile for single version only (got {len(minor_version_list)} instead)"

for i, imported_file in enumerate(imported):
with open(imported_file, "r", encoding="utf-8") as imported_handle:
out = imported_handle.read()
import_from_string(
out,
minor_version_list[0],
i,
machine_info,
save_to_index=save_to_index,
**kwargs,
)
minor_version_info = pcs.vcs().get_minor_version_info(minor_version)
kwargs["repeat"] = len(imported)

resources = []
for imported_file in imported:
out = load_file(imported_file)
resources.extend(parser.parse_events(out.split("\n")))
log.minor_success(log.path_style(imported_file), "imported")
import_from_string(
resources,
minor_version_info,
machine_info,
save_to_index=save_to_index,
**kwargs,
)
6 changes: 5 additions & 1 deletion perun/vcs/vcs_kit.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,12 @@ def lookup_minor_version(func: Callable[..., Any]) -> Callable[..., Any]:

def wrapper(*args: Any, **kwargs: Any) -> Callable[..., Any]:
"""Inner wrapper of the function"""
if "minor_version" in kwargs:
if kwargs["minor_version"] is None:
kwargs["minor_version"] = pcs.vcs().get_minor_head()
pcs.vcs().check_minor_version_validity(kwargs["minor_version"])
# if the minor_version is None, then we obtain the minor head for the wrapped type
if minor_version_position < len(args) and args[minor_version_position] is None:
elif minor_version_position < len(args) and args[minor_version_position] is None:
# note: since tuples are immutable we have to do this workaround
arg_list = list(args)
arg_list[minor_version_position] = pcs.vcs().get_minor_head()
Expand Down
4 changes: 2 additions & 2 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from __future__ import annotations

# Standard Imports
from typing import Iterable, Callable
from typing import Iterable, Callable, Optional
import glob
import os
import shutil
Expand Down Expand Up @@ -180,7 +180,7 @@ def memory_profiles():


def load_all_profiles_in(
directory: str, prof_filter: Callable[[str], bool] = None
directory: str, prof_filter: Optional[Callable[[str], bool]] = None
) -> Iterable[tuple[str, "Profile"]]:
"""Generates stream of loaded (i.e. dictionaries) profiles in the specified directory.
Expand Down
Binary file added tests/sources/imports/import.stack.gz
Binary file not shown.
2 changes: 1 addition & 1 deletion tests/test_imports.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ def test_imports(pcs_with_svs):
["import", "-c", "ls", "-w", ".", "perf", "record", os.path.join(pool_path, "import.data")],
)
assert result.exit_code == 0
print(result.output)
assert len(os.listdir(os.path.join(".perun", "jobs"))) == 2

result = runner.invoke(
Expand Down Expand Up @@ -67,6 +66,7 @@ def test_imports(pcs_with_svs):
"perf",
"stack",
os.path.join(pool_path, "import.stack"),
os.path.join(pool_path, "import.stack.gz"),
],
)
assert result.exit_code == 0
Expand Down

0 comments on commit f71714f

Please sign in to comment.