WIP, ENH: parquet demo #929

Open
wants to merge 2 commits into main
204 changes: 127 additions & 77 deletions darshan-util/pydarshan/darshan/cli/summary.py
@@ -133,7 +133,15 @@ def __init__(self, log_path: str):
# store the log path and use it to generate the report
self.log_path = log_path
# store the report
self.report = darshan.DarshanReport(log_path, read_all=True)
try:
    self.report = darshan.DarshanReport(log_path, read_all=True)
except RuntimeError:
    # fall back to assuming parquet-format input
    if not os.path.exists(log_path):
        raise
    else:
        self.report = pd.read_parquet(log_path)

# create the header/footer
self.get_header()
self.get_footer()
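For orientation, a minimal sketch of how this fallback is intended to behave, assuming a parquet file previously written by `convert_to_parquet()` (paths are hypothetical). Note that the fallback yields a plain pandas DataFrame rather than a DarshanReport, which is why the methods below gain AttributeError guards:

```python
# Hedged sketch of the dual-format constructor behavior; the paths are
# hypothetical and assume a parquet file produced by convert_to_parquet().
import os
import darshan
import pandas as pd

def load_any(log_path: str):
    try:
        # native binary darshan log
        return darshan.DarshanReport(log_path, read_all=True)
    except RuntimeError:
        if not os.path.exists(log_path):
            raise
        # parquet fallback returns a DataFrame, so downstream
        # code must tolerate missing DarshanReport attributes
        return pd.read_parquet(log_path)

report = load_any("example.darshan")    # darshan.DarshanReport
flat_df = load_any("example.parquet")   # pandas.DataFrame
```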
@@ -162,7 +170,11 @@ def get_full_command(report: darshan.report.DarshanReport) -> str:

"""
# assign the executable from the report metadata
cmd = report.metadata["exe"]
try:
    cmd = report.metadata["exe"]
except AttributeError:
    # TODO: fix for parquet format
    return "N/A"
if not cmd:
    # if there is no executable
    # label as not available
@@ -202,9 +214,14 @@ def get_header(self):
else:
    app_name = os.path.basename(command.split()[0])
# collect the date from the time stamp
date = datetime.date.fromtimestamp(self.report.metadata["job"]["start_time_sec"])
# the header is the application name and the log date
self.header = f"{app_name} ({date})"
try:
    date = datetime.date.fromtimestamp(self.report.metadata["job"]["start_time_sec"])
except AttributeError:
    # TODO: fix for parquet input
    self.header = f"{app_name} (N/A)"
else:
    # the header is the application name and the log date
    self.header = f"{app_name} ({date})"

def get_footer(self):
"""
@@ -218,53 +235,61 @@ def get_metadata_table(self):
Builds the metadata table (in html form) for the summary report.
"""
# assign the metadata dictionary
job_data = self.report.metadata["job"]
# build a dictionary with the appropriate metadata
metadata_dict = {
    "Job ID": job_data["jobid"],
    "User ID": job_data["uid"],
    "# Processes": job_data["nprocs"],
    "Run time (s)": self.get_runtime(report=self.report),
    "Start Time": datetime.datetime.fromtimestamp(job_data["start_time_sec"]),
    "End Time": datetime.datetime.fromtimestamp(job_data["end_time_sec"]),
    "Command Line": self.get_full_command(report=self.report),
}
# convert the dictionary into a dataframe
metadata_df = pd.DataFrame.from_dict(data=metadata_dict, orient="index")
# write out the table in html
self.metadata_table = metadata_df.to_html(header=False, border=0)
try:
    job_data = self.report.metadata["job"]
    # build a dictionary with the appropriate metadata
    metadata_dict = {
        "Job ID": job_data["jobid"],
        "User ID": job_data["uid"],
        "# Processes": job_data["nprocs"],
        "Run time (s)": self.get_runtime(report=self.report),
        "Start Time": datetime.datetime.fromtimestamp(job_data["start_time_sec"]),
        "End Time": datetime.datetime.fromtimestamp(job_data["end_time_sec"]),
        "Command Line": self.get_full_command(report=self.report),
    }
    # convert the dictionary into a dataframe
    metadata_df = pd.DataFrame.from_dict(data=metadata_dict, orient="index")
    # write out the table in html
    self.metadata_table = metadata_df.to_html(header=False, border=0)
except AttributeError:
    # TODO: fix for parquet format
    self.metadata_table = ""

def get_module_table(self):
"""
Builds the module table (in html form) for the summary report.
"""
# construct a dictionary containing the module names,
# their respective data stored in KiB, and the log metadata
job_data = self.report.metadata["job"]
module_dict= {
"Log Filename": [os.path.basename(self.log_path), ""],
"Runtime Library Version": [job_data["metadata"]["lib_ver"], ""],
"Log Format Version": [job_data["log_ver"], ""],
}
try:
job_data = self.report.metadata["job"]
module_dict= {
"Log Filename": [os.path.basename(self.log_path), ""],
"Runtime Library Version": [job_data["metadata"]["lib_ver"], ""],
"Log Format Version": [job_data["log_ver"], ""],
}

for mod in self.report.modules:
# retrieve the module version and buffer sizes
mod_version = self.report.modules[mod]["ver"]
# retrieve the buffer size converted to KiB
mod_buf_size = self.report.modules[mod]["len"] / 1024
# create the key/value pairs for the dictionary
key = f"{mod} (ver={mod_version}) Module Data"
val = f"{mod_buf_size:.2f} KiB"
flag = ""
if self.report.modules[mod]["partial_flag"]:
msg = "Module data incomplete due to runtime memory or record count limits"
flag = f"<p style='color:red'>&#x26A0; {msg}</p>"
module_dict[key] = [val, flag]

# convert the module dictionary into a dataframe
module_df = pd.DataFrame.from_dict(data=module_dict, orient="index")
# write out the table in html
self.module_table = module_df.to_html(header=False, border=0, escape=False)
for mod in self.report.modules:
# retrieve the module version and buffer sizes
mod_version = self.report.modules[mod]["ver"]
# retrieve the buffer size converted to KiB
mod_buf_size = self.report.modules[mod]["len"] / 1024
# create the key/value pairs for the dictionary
key = f"{mod} (ver={mod_version}) Module Data"
val = f"{mod_buf_size:.2f} KiB"
flag = ""
if self.report.modules[mod]["partial_flag"]:
msg = "Module data incomplete due to runtime memory or record count limits"
flag = f"<p style='color:red'>&#x26A0; {msg}</p>"
module_dict[key] = [val, flag]

# convert the module dictionary into a dataframe
module_df = pd.DataFrame.from_dict(data=module_dict, orient="index")
# write out the table in html
self.module_table = module_df.to_html(header=False, border=0, escape=False)
except AttributeError:
# TODO: fix for parquet format
self.module_table = ""

def get_stylesheet(self):
"""
Expand Down Expand Up @@ -333,6 +358,11 @@ def register_figures(self):
"""
self.figures = []

if not hasattr(self.report, "modules"):
    # TODO: fix for parquet format;
    # only supports POSIX for now
    self.report.modules = ["POSIX"]

if not self.report.modules:
    # no data in report to summarize, print warning and that's it
    no_data_message = (
@@ -441,8 +471,12 @@ def register_figures(self):
"fig_description": io_cost_description,
"fig_width": 350,
}
io_cost_fig = ReportFigure(**io_cost_params)
self.figures.append(io_cost_fig)
try:
io_cost_fig = ReportFigure(**io_cost_params)
self.figures.append(io_cost_fig)
except AttributeError:
# TODO: fix for parquet format
pass
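Since the parquet path hard-codes the module list to POSIX for now (see the hasattr shim above), one possible generalization is to infer modules from the fused frame's column prefixes. A rough sketch only, assuming the `<MODULE>_<COUNTER>` naming that `to_df()` produces (MPI-IO counters use the `MPIIO` prefix, so a small mapping would still be needed):

```python
# Hedged sketch: infer module names from a fused parquet frame by
# splitting column names on the first underscore; assumes the
# "<MODULE>_<COUNTER>" convention of pydarshan's to_df() output.
import pandas as pd

def infer_modules(df: pd.DataFrame) -> list:
    mods = set()
    for col in df.columns:
        if col in ("rank", "id") or "_" not in col:
            continue  # skip the index-like columns
        mods.add(col.split("_", 1)[0])
    return sorted(mods)
```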

################################
## Per-Module Statistics
@@ -477,15 +511,19 @@ def register_figures(self):
"of the most frequently occurring access sizes can be found in "
"the <i>Common Access Sizes</i> table."
)
access_hist_fig = ReportFigure(
section_title=sect_title,
fig_title="Access Sizes",
fig_func=plot_access_histogram,
fig_args=dict(report=self.report, mod=mod),
fig_description=access_hist_description,
fig_width=350,
)
self.figures.append(access_hist_fig)
try:
access_hist_fig = ReportFigure(
section_title=sect_title,
fig_title="Access Sizes",
fig_func=plot_access_histogram,
fig_args=dict(report=self.report, mod=mod),
fig_description=access_hist_description,
fig_width=350,
)
self.figures.append(access_hist_fig)
except AttributeError:
# TODO: fix for parquet format
pass
if mod == "MPI-IO":
com_acc_tbl_description = (
"NOTE: MPI-IO accesses are given in "
@@ -505,22 +543,30 @@ def register_figures(self):

# add the operation counts figure
if mod in opcounts_mods:
    opcount_fig = ReportFigure(
        section_title=sect_title,
        fig_title="Operation Counts",
        fig_func=plot_opcounts,
        fig_args=dict(report=self.report, mod=mod),
        fig_description="Histogram of I/O operation frequency.",
        fig_width=350,
    )
    self.figures.append(opcount_fig)
    try:
        opcount_fig = ReportFigure(
            section_title=sect_title,
            fig_title="Operation Counts",
            fig_func=plot_opcounts,
            fig_args=dict(report=self.report, mod=mod),
            fig_description="Histogram of I/O operation frequency.",
            fig_width=350,
        )
        self.figures.append(opcount_fig)
    except AttributeError:
        # TODO: fix for parquet
        pass

try:
    if mod in ["POSIX", "MPI-IO", "STDIO"]:
        # get the module's record dataframe and then pass to
        # Darshan accumulator interface to generate a cumulative
        # record and derived metrics
        rec_dict = self.report.records[mod].to_df()
        try:
            rec_dict = self.report.records[mod].to_df()
        except AttributeError:
            # TODO: fix for parquet format
            break
        nprocs = self.report.metadata['job']['nprocs']
        acc = accumulate_records(rec_dict, mod, nprocs)

@@ -574,18 +620,22 @@ def register_figures(self):
#########################
# Data Access by Category
if not {"POSIX", "STDIO"}.isdisjoint(set(self.report.modules)):
data_access_by_cat_fig = ReportFigure(
section_title="Data Access by Category",
fig_title="",
fig_func=data_access_by_filesystem.plot_with_report,
fig_args=dict(report=self.report, num_cats=8),
fig_description="Summary of data access volume "
"categorized by storage "
"target (e.g., file system "
"mount point) and sorted by volume.",
fig_width=500,
)
self.figures.append(data_access_by_cat_fig)
try:
data_access_by_cat_fig = ReportFigure(
section_title="Data Access by Category",
fig_title="",
fig_func=data_access_by_filesystem.plot_with_report,
fig_args=dict(report=self.report, num_cats=8),
fig_description="Summary of data access volume "
"categorized by storage "
"target (e.g., file system "
"mount point) and sorted by volume.",
fig_width=500,
)
self.figures.append(data_access_by_cat_fig)
except AttributeError:
# TODO: fix for parquet format
pass



@@ -145,7 +145,11 @@ def plot_common_access_table(report: darshan.DarshanReport, mod: str, n_rows: in
the `df` or `html` attributes, respectively.

"""
mod_df = report.records[mod].to_df(attach=None)["counters"]
try:
    mod_df = report.records[mod].to_df(attach=None)["counters"]
except AttributeError:
    # TODO: fix for parquet format
    mod_df = report.iloc[..., :71]  # type: ignore

if mod == "MPI-IO":
mod = "MPIIO"
19 changes: 19 additions & 0 deletions darshan-util/pydarshan/darshan/log_utils.py
@@ -17,6 +17,8 @@
import os
import glob
from typing import Optional, Any
import darshan
import pandas as pd

if "pytest" in sys.modules:
    # only import pytest if used in a testing context
@@ -102,3 +104,20 @@ def get_log_path(filename: str) -> str:
    pytest.skip(err_msg)
else:
    raise FileNotFoundError(err_msg)


def convert_to_parquet(darshan_log_filename: str,
                       out_filepath: str):
    # NOTE: only supports POSIX, and only tested on
    # 1 log file so far...
    with darshan.DarshanReport(darshan_log_filename) as report:
        recs = report.data["records"]
        df_posix_counters = recs["POSIX"].to_df()["counters"]
        df_posix_fcounters = recs["POSIX"].to_df()["fcounters"]
        # NOTE: is it always true that counters and fcounters will
        # have the same number of records for a given darshan log?
        assert df_posix_counters.shape[0] == df_posix_fcounters.shape[0]
        df_fused = pd.concat([df_posix_counters, df_posix_fcounters.iloc[..., 2:]],
                             axis=1)
        df_fused.to_parquet(out_filepath,
                            compression="gzip")
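A quick round-trip usage example mirroring the test below; it assumes pyarrow (or another parquet engine) is installed, and the expected shape is specific to this log:

```python
# Round-trip example based on the log used in the test suite;
# requires pyarrow (or another parquet engine) to be installed.
import pandas as pd
from darshan.log_utils import get_log_path, convert_to_parquet

log_path = get_log_path("runtime_and_dxt_heatmaps_diagonal_write_only.darshan")
convert_to_parquet(log_path, "demo.gzip")
df = pd.read_parquet("demo.gzip")
print(df.shape)  # (32, 88) for this particular log
```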
40 changes: 39 additions & 1 deletion darshan-util/pydarshan/darshan/tests/test_log_utils.py
@@ -1,8 +1,10 @@
import os

import darshan
from darshan.log_utils import get_log_path
from darshan.log_utils import get_log_path, convert_to_parquet

import numpy as np
import pandas as pd
import pytest


@@ -52,3 +54,39 @@ def test_failure_bad_logname():
with pytest.raises(FileNotFoundError,
                   match="could not be found"):
    get_log_path("garbage_$*(&**.darshan")


@pytest.mark.parametrize("log_filename", [
    "runtime_and_dxt_heatmaps_diagonal_write_only.darshan",
])
def test_parquet_convert_simple(tmp_path, log_filename):
    # crude/early stage testing for conversion from
    # in-house binary darshan log format to parquet
    # file format
    try:
        import pyarrow
    except ImportError:
        pytest.skip("pyarrow required for this test")
    log_path = get_log_path(log_filename)
    outfile = tmp_path / f"{log_filename}.gzip"
    convert_to_parquet(log_path, outfile)
    # check for appropriate data shape
    # (POSIX module only for now)
    actual_df = pd.read_parquet(outfile)
    # there are 32 active ranks; rank and id plus
    # 69 counter columns and 17 fcounter columns
    # for POSIX (rank and id are not repeated)
    assert actual_df.shape == (32, 88)
    actual_dtypes = actual_df.dtypes.values
    # rank is first column and should be int64
    assert actual_dtypes[0] == np.int64
    # id is second column and should be uint64
    assert actual_dtypes[1] == np.uint64
    # next 69 columns should be int64 counters
    counter_type = np.unique(actual_dtypes[2:71])
    assert counter_type.size == 1
    assert counter_type[0] == np.int64
    # next 17 columns should be float64 fcounters
    fcounter_type = np.unique(actual_dtypes[71:88])
    assert fcounter_type.size == 1
    assert fcounter_type[0] == np.float64
3 changes: 3 additions & 0 deletions darshan-util/pydarshan/mypy.ini
@@ -16,6 +16,9 @@ ignore_missing_imports = True
[mypy-pandas.*]
ignore_missing_imports = True

[mypy-pyarrow]
ignore_missing_imports = True

[mypy-seaborn]
ignore_missing_imports = True
