From b6c77ce6905d3151050628758dfdc73e3792b385 Mon Sep 17 00:00:00 2001 From: Tyler Reddy Date: Sat, 6 May 2023 14:47:15 -0600 Subject: [PATCH 1/2] WIP, ENH: parquet demo * a quick demo with POSIX-only support for a single summary report plot for parquet input, and a converter that only supports POSIX and was only tested on a single log file * this does appear to allow the full test suite to pass while adding incredibly-crude summary report support for working with a parquet file that has POSIX counter/fcounter data * there are a few reasons to demo this: 1) It may help spark some discussion about how this should work because I already made some potentially-controversial decisions like concatenating along the columns to fuse counter and fcounters 2) The various `TODO` comments I added around try/except blocks should give a good indicator of the number of places in the code where changes would be needed to produce a more complete summary report from parquet input 3) Sometimes it is easier to develop from a (crude) prototype if a summer student picks this up (vs. 
from scratch) --- darshan-util/pydarshan/darshan/cli/summary.py | 204 +++++++++++------- .../plots/plot_common_access_table.py | 6 +- darshan-util/pydarshan/darshan/log_utils.py | 19 ++ .../pydarshan/darshan/tests/test_log_utils.py | 40 +++- 4 files changed, 190 insertions(+), 79 deletions(-) diff --git a/darshan-util/pydarshan/darshan/cli/summary.py b/darshan-util/pydarshan/darshan/cli/summary.py index 1a824b0fc..4ecae5503 100644 --- a/darshan-util/pydarshan/darshan/cli/summary.py +++ b/darshan-util/pydarshan/darshan/cli/summary.py @@ -133,7 +133,15 @@ def __init__(self, log_path: str): # store the log path and use it to generate the report self.log_path = log_path # store the report - self.report = darshan.DarshanReport(log_path, read_all=True) + try: + self.report = darshan.DarshanReport(log_path, read_all=True) + except RuntimeError: + # assume parquet format input if not None + if not os.path.exists(log_path): + raise RuntimeError + else: + self.report = pd.read_parquet(log_path) + # create the header/footer self.get_header() self.get_footer() @@ -162,7 +170,11 @@ def get_full_command(report: darshan.report.DarshanReport) -> str: """ # assign the executable from the report metadata - cmd = report.metadata["exe"] + try: + cmd = report.metadata["exe"] + except AttributeError: + # TODO: fix for parquet format + return "N/A" if not cmd: # if there is no executable # label as not available @@ -202,9 +214,14 @@ def get_header(self): else: app_name = os.path.basename(command.split()[0]) # collect the date from the time stamp - date = datetime.date.fromtimestamp(self.report.metadata["job"]["start_time_sec"]) - # the header is the application name and the log date - self.header = f"{app_name} ({date})" + try: + date = datetime.date.fromtimestamp(self.report.metadata["job"]["start_time_sec"]) + except AttributeError: + # TODO: fix for parquet input + self.header = f"N/A N/A" + else: + # the header is the application name and the log date + self.header = f"{app_name} 
({date})" def get_footer(self): """ @@ -218,21 +235,25 @@ def get_metadata_table(self): Builds the metadata table (in html form) for the summary report. """ # assign the metadata dictionary - job_data = self.report.metadata["job"] - # build a dictionary with the appropriate metadata - metadata_dict = { - "Job ID": job_data["jobid"], - "User ID": job_data["uid"], - "# Processes": job_data["nprocs"], - "Run time (s)": self.get_runtime(report=self.report), - "Start Time": datetime.datetime.fromtimestamp(job_data["start_time_sec"]), - "End Time": datetime.datetime.fromtimestamp(job_data["end_time_sec"]), - "Command Line": self.get_full_command(report=self.report), - } - # convert the dictionary into a dataframe - metadata_df = pd.DataFrame.from_dict(data=metadata_dict, orient="index") - # write out the table in html - self.metadata_table = metadata_df.to_html(header=False, border=0) + try: + job_data = self.report.metadata["job"] + # build a dictionary with the appropriate metadata + metadata_dict = { + "Job ID": job_data["jobid"], + "User ID": job_data["uid"], + "# Processes": job_data["nprocs"], + "Run time (s)": self.get_runtime(report=self.report), + "Start Time": datetime.datetime.fromtimestamp(job_data["start_time_sec"]), + "End Time": datetime.datetime.fromtimestamp(job_data["end_time_sec"]), + "Command Line": self.get_full_command(report=self.report), + } + # convert the dictionary into a dataframe + metadata_df = pd.DataFrame.from_dict(data=metadata_dict, orient="index") + # write out the table in html + self.metadata_table = metadata_df.to_html(header=False, border=0) + except AttributeError: + # TODO: fix for parquet format + self.metadata_table = "" def get_module_table(self): """ @@ -240,31 +261,35 @@ def get_module_table(self): """ # construct a dictionary containing the module names, # their respective data stored in KiB, and the log metadata - job_data = self.report.metadata["job"] - module_dict= { - "Log Filename": [os.path.basename(self.log_path), 
""], - "Runtime Library Version": [job_data["metadata"]["lib_ver"], ""], - "Log Format Version": [job_data["log_ver"], ""], - } + try: + job_data = self.report.metadata["job"] + module_dict= { + "Log Filename": [os.path.basename(self.log_path), ""], + "Runtime Library Version": [job_data["metadata"]["lib_ver"], ""], + "Log Format Version": [job_data["log_ver"], ""], + } - for mod in self.report.modules: - # retrieve the module version and buffer sizes - mod_version = self.report.modules[mod]["ver"] - # retrieve the buffer size converted to KiB - mod_buf_size = self.report.modules[mod]["len"] / 1024 - # create the key/value pairs for the dictionary - key = f"{mod} (ver={mod_version}) Module Data" - val = f"{mod_buf_size:.2f} KiB" - flag = "" - if self.report.modules[mod]["partial_flag"]: - msg = "Module data incomplete due to runtime memory or record count limits" - flag = f"

⚠ {msg}

" - module_dict[key] = [val, flag] - - # convert the module dictionary into a dataframe - module_df = pd.DataFrame.from_dict(data=module_dict, orient="index") - # write out the table in html - self.module_table = module_df.to_html(header=False, border=0, escape=False) + for mod in self.report.modules: + # retrieve the module version and buffer sizes + mod_version = self.report.modules[mod]["ver"] + # retrieve the buffer size converted to KiB + mod_buf_size = self.report.modules[mod]["len"] / 1024 + # create the key/value pairs for the dictionary + key = f"{mod} (ver={mod_version}) Module Data" + val = f"{mod_buf_size:.2f} KiB" + flag = "" + if self.report.modules[mod]["partial_flag"]: + msg = "Module data incomplete due to runtime memory or record count limits" + flag = f"

⚠ {msg}

" + module_dict[key] = [val, flag] + + # convert the module dictionary into a dataframe + module_df = pd.DataFrame.from_dict(data=module_dict, orient="index") + # write out the table in html + self.module_table = module_df.to_html(header=False, border=0, escape=False) + except AttributeError: + # TODO: fix for parquet format + self.module_table = "" def get_stylesheet(self): """ @@ -333,6 +358,11 @@ def register_figures(self): """ self.figures = [] + if not hasattr(self.report, "modules"): + # TODO: fix for parquet format; + # only supports POSIX for now + self.report.modules = ["POSIX"] + if not self.report.modules: # no data in report to summarize, print warning and that's it no_data_message = ( @@ -441,8 +471,12 @@ def register_figures(self): "fig_description": io_cost_description, "fig_width": 350, } - io_cost_fig = ReportFigure(**io_cost_params) - self.figures.append(io_cost_fig) + try: + io_cost_fig = ReportFigure(**io_cost_params) + self.figures.append(io_cost_fig) + except AttributeError: + # TODO: fix for parquet format + pass ################################ ## Per-Module Statistics @@ -477,15 +511,19 @@ def register_figures(self): "of the most frequently occurring access sizes can be found in " "the Common Access Sizes table." 
) - access_hist_fig = ReportFigure( - section_title=sect_title, - fig_title="Access Sizes", - fig_func=plot_access_histogram, - fig_args=dict(report=self.report, mod=mod), - fig_description=access_hist_description, - fig_width=350, - ) - self.figures.append(access_hist_fig) + try: + access_hist_fig = ReportFigure( + section_title=sect_title, + fig_title="Access Sizes", + fig_func=plot_access_histogram, + fig_args=dict(report=self.report, mod=mod), + fig_description=access_hist_description, + fig_width=350, + ) + self.figures.append(access_hist_fig) + except AttributeError: + # TODO: fix for parquet format + pass if mod == "MPI-IO": com_acc_tbl_description = ( "NOTE: MPI-IO accesses are given in " @@ -505,22 +543,30 @@ def register_figures(self): # add the operation counts figure if mod in opcounts_mods: - opcount_fig = ReportFigure( - section_title=sect_title, - fig_title="Operation Counts", - fig_func=plot_opcounts, - fig_args=dict(report=self.report, mod=mod), - fig_description="Histogram of I/O operation frequency.", - fig_width=350, - ) - self.figures.append(opcount_fig) + try: + opcount_fig = ReportFigure( + section_title=sect_title, + fig_title="Operation Counts", + fig_func=plot_opcounts, + fig_args=dict(report=self.report, mod=mod), + fig_description="Histogram of I/O operation frequency.", + fig_width=350, + ) + self.figures.append(opcount_fig) + except AttributeError: + # TODO: fix for parquet + pass try: if mod in ["POSIX", "MPI-IO", "STDIO"]: # get the module's record dataframe and then pass to # Darshan accumulator interface to generate a cumulative # record and derived metrics - rec_dict = self.report.records[mod].to_df() + try: + rec_dict = self.report.records[mod].to_df() + except AttributeError: + # TODO: fix for parquet format + break nprocs = self.report.metadata['job']['nprocs'] acc = accumulate_records(rec_dict, mod, nprocs) @@ -574,18 +620,22 @@ def register_figures(self): ######################### # Data Access by Category if not {"POSIX", 
"STDIO"}.isdisjoint(set(self.report.modules)): - data_access_by_cat_fig = ReportFigure( - section_title="Data Access by Category", - fig_title="", - fig_func=data_access_by_filesystem.plot_with_report, - fig_args=dict(report=self.report, num_cats=8), - fig_description="Summary of data access volume " - "categorized by storage " - "target (e.g., file system " - "mount point) and sorted by volume.", - fig_width=500, - ) - self.figures.append(data_access_by_cat_fig) + try: + data_access_by_cat_fig = ReportFigure( + section_title="Data Access by Category", + fig_title="", + fig_func=data_access_by_filesystem.plot_with_report, + fig_args=dict(report=self.report, num_cats=8), + fig_description="Summary of data access volume " + "categorized by storage " + "target (e.g., file system " + "mount point) and sorted by volume.", + fig_width=500, + ) + self.figures.append(data_access_by_cat_fig) + except AttributeError: + # TODO: fix for parquet format + pass diff --git a/darshan-util/pydarshan/darshan/experimental/plots/plot_common_access_table.py b/darshan-util/pydarshan/darshan/experimental/plots/plot_common_access_table.py index 6a1d50bb1..aa5f5f93d 100644 --- a/darshan-util/pydarshan/darshan/experimental/plots/plot_common_access_table.py +++ b/darshan-util/pydarshan/darshan/experimental/plots/plot_common_access_table.py @@ -145,7 +145,11 @@ def plot_common_access_table(report: darshan.DarshanReport, mod: str, n_rows: in the `df` or `html` attributes, respectively. 
""" - mod_df = report.records[mod].to_df(attach=None)["counters"] + try: + mod_df = report.records[mod].to_df(attach=None)["counters"] + except AttributeError: + # TODO: fix for parquet format + mod_df = report.iloc[..., :71] if mod == "MPI-IO": mod = "MPIIO" diff --git a/darshan-util/pydarshan/darshan/log_utils.py b/darshan-util/pydarshan/darshan/log_utils.py index 1519ee980..15baa5414 100644 --- a/darshan-util/pydarshan/darshan/log_utils.py +++ b/darshan-util/pydarshan/darshan/log_utils.py @@ -17,6 +17,8 @@ import os import glob from typing import Optional, Any +import darshan +import pandas as pd if "pytest" in sys.modules: # only import pytest if used in a testing context @@ -102,3 +104,20 @@ def get_log_path(filename: str) -> str: pytest.skip(err_msg) else: raise FileNotFoundError(err_msg) + + +def convert_to_parquet(darshan_log_filename: str, + out_filepath: str): + # NOTE: only supports POSIX, and only tested on + # 1 log file so far... + with darshan.DarshanReport(darshan_log_filename) as report: + recs = report.data["records"] + df_posix_counters = recs["POSIX"].to_df()["counters"] + df_posix_fcounters = recs["POSIX"].to_df()["fcounters"] + # NOTE: is it always true that counters and counters will + # have the same number of records for a given darshan log? 
+ assert df_posix_counters.shape[0] == df_posix_fcounters.shape[0] + df_fused = pd.concat([df_posix_counters, df_posix_fcounters.iloc[..., 2:]], + axis=1) + df_fused.to_parquet(out_filepath, + compression="gzip") diff --git a/darshan-util/pydarshan/darshan/tests/test_log_utils.py b/darshan-util/pydarshan/darshan/tests/test_log_utils.py index 17c7468e6..dc25e2cd2 100644 --- a/darshan-util/pydarshan/darshan/tests/test_log_utils.py +++ b/darshan-util/pydarshan/darshan/tests/test_log_utils.py @@ -1,8 +1,10 @@ import os import darshan -from darshan.log_utils import get_log_path +from darshan.log_utils import get_log_path, convert_to_parquet +import numpy as np +import pandas as pd import pytest @@ -52,3 +54,39 @@ def test_failure_bad_logname(): with pytest.raises(FileNotFoundError, match="could not be found"): get_log_path("garbage_$*(&**.darshan") + + +@pytest.mark.parametrize("log_filename", [ + "runtime_and_dxt_heatmaps_diagonal_write_only.darshan", + ]) +def test_parquet_convert_simple(tmp_path, log_filename): + # crude/early stage testing for conversion from + # in-house binary darshan log format to parquet + # file format + try: + import pyarrow + except ImportError: + pytest.skip("pyarrow required for this test") + log_path = get_log_path(log_filename) + outfile = tmp_path / f"{log_filename}.gzip" + convert_to_parquet(log_path, outfile) + # check for appropriate data shape + # (POSIX module only for now) + actual_df = pd.read_parquet(outfile) + # there are 32 active ranks, and 71 + # counter columns + 17 fcounter columns + # for POSIX (we don't repeat rank and id) + assert actual_df.shape == (32, 88) + actual_dtypes = actual_df.dtypes.values + # rank is first column and should be int64 + assert actual_dtypes[0] == np.int64 + # id is second column and should be uint64 + assert actual_dtypes[1] == np.uint64 + # next 30 columns should be int64 counters + counter_type = np.unique(actual_dtypes[2:71]) + assert counter_type.size == 1 + assert counter_type[0] == 
np.int64 + # next 17 columns should float64 fcounters + fcounter_type = np.unique(actual_dtypes[71:88]) + assert fcounter_type.size == 1 + assert fcounter_type[0] == np.float64 From 1b1ef7582234533ea6d5efd97530091d1841472c Mon Sep 17 00:00:00 2001 From: Tyler Reddy Date: Sat, 6 May 2023 15:09:02 -0600 Subject: [PATCH 2/2] MAINT: PR 929 revisions * minor mypy shims to get type checking passing again --- .../darshan/experimental/plots/plot_common_access_table.py | 2 +- darshan-util/pydarshan/mypy.ini | 3 +++ 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/darshan-util/pydarshan/darshan/experimental/plots/plot_common_access_table.py b/darshan-util/pydarshan/darshan/experimental/plots/plot_common_access_table.py index aa5f5f93d..b1b8db7f4 100644 --- a/darshan-util/pydarshan/darshan/experimental/plots/plot_common_access_table.py +++ b/darshan-util/pydarshan/darshan/experimental/plots/plot_common_access_table.py @@ -149,7 +149,7 @@ def plot_common_access_table(report: darshan.DarshanReport, mod: str, n_rows: in mod_df = report.records[mod].to_df(attach=None)["counters"] except AttributeError: # TODO: fix for parquet format - mod_df = report.iloc[..., :71] + mod_df = report.iloc[..., :71] # type: ignore if mod == "MPI-IO": mod = "MPIIO" diff --git a/darshan-util/pydarshan/mypy.ini b/darshan-util/pydarshan/mypy.ini index 60f7df846..ffddd8971 100644 --- a/darshan-util/pydarshan/mypy.ini +++ b/darshan-util/pydarshan/mypy.ini @@ -16,6 +16,9 @@ ignore_missing_imports = True [mypy-pandas.*] ignore_missing_imports = True +[mypy-pyarrow] +ignore_missing_imports = True + [mypy-seaborn] ignore_missing_imports = True