Skip to content

Commit

Permalink
Trace: construct a CCT (#29)
Browse files Browse the repository at this point in the history
  • Loading branch information
adityaranjan authored Nov 14, 2023
1 parent 09142d1 commit ae4080e
Show file tree
Hide file tree
Showing 7 changed files with 164 additions and 18 deletions.
4 changes: 3 additions & 1 deletion pipit/readers/hpctoolkit_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -1356,5 +1356,7 @@ def read(self) -> pipit.trace.Trace:
}
)

# cct is needed to create trace in hpctoolkit,
# so always return it as part of the trace
self.trace_df = trace_df
return pipit.trace.Trace(None, trace_df)
return pipit.trace.Trace(None, trace_df, self.meta_reader.cct)
9 changes: 7 additions & 2 deletions pipit/readers/nsight_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,10 @@
class NsightReader:
"""Reader for Nsight trace files"""

def __init__(self, file_name) -> None:
def __init__(self, file_name, create_cct=False) -> None:
self.file_name = file_name
self.df = None
self.create_cct = create_cct

def read(self):
"""
Expand Down Expand Up @@ -103,4 +104,8 @@ def read(self):
# Applying the column list to the dataframe to rearrange
self.df = self.df.loc[:, cols]

return pipit.trace.Trace(None, self.df)
trace = pipit.trace.Trace(None, self.df)
if self.create_cct:
trace.create_cct()

return trace
9 changes: 7 additions & 2 deletions pipit/readers/otf2_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,10 @@
class OTF2Reader:
"""Reader for OTF2 trace files"""

def __init__(self, dir_name, num_processes=None):
def __init__(self, dir_name, num_processes=None, create_cct=False):
self.dir_name = dir_name # directory of otf2 file being read
self.file_name = self.dir_name + "/traces.otf2"
self.create_cct = create_cct

num_cpus = mp.cpu_count()
if num_processes is None or num_processes < 1 or num_processes > num_cpus:
Expand Down Expand Up @@ -516,4 +517,8 @@ def read(self):

self.events = self.read_events() # events

return pipit.trace.Trace(self.definitions, self.events)
trace = pipit.trace.Trace(self.definitions, self.events)
if self.create_cct:
trace.create_cct()

return trace
13 changes: 10 additions & 3 deletions pipit/readers/projections_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
#
# SPDX-License-Identifier: MIT


import os
import gzip
import pipit.trace
Expand Down Expand Up @@ -225,7 +224,9 @@ def read_sts_file(self):


class ProjectionsReader:
def __init__(self, projections_directory: str, num_processes=None) -> None:
def __init__(
self, projections_directory: str, num_processes=None, create_cct=False
) -> None:
if not os.path.isdir(projections_directory):
raise ValueError("Not a valid directory.")

Expand Down Expand Up @@ -270,6 +271,8 @@ def __init__(self, projections_directory: str, num_processes=None) -> None:
else:
self.num_processes = num_processes

self.create_cct = create_cct

# Returns an empty dict, used for reading log file into dataframe
@staticmethod
def _create_empty_dict() -> dict:
Expand Down Expand Up @@ -317,7 +320,11 @@ def read(self):
["Timestamp (ns)", "Event Type", "Name", "Process", "Attributes"]
]

return pipit.trace.Trace(None, trace_df)
trace = pipit.trace.Trace(None, trace_df)
if self.create_cct:
trace.create_cct()

return trace

def _read_log_file(self, rank_size) -> pd.DataFrame:
# has information needed in sts file
Expand Down
27 changes: 17 additions & 10 deletions pipit/trace.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,20 @@

import numpy as np
import pandas as pd
from pipit.util.cct import create_cct


class Trace:
"""A trace dataset is read into an object of this type, which includes one
or more dataframes.
"""
A trace dataset is read into an object of this type, which
includes one or more dataframes and a calling context tree.
"""

def __init__(self, definitions, events):
def __init__(self, definitions, events, cct=None):
"""Create a new Trace object."""
self.definitions = definitions
self.events = events
self.cct = cct

# list of numeric columns which we can calculate inc/exc metrics with
self.numeric_cols = list(
Expand All @@ -26,13 +29,18 @@ def __init__(self, definitions, events):
self.inc_metrics = []
self.exc_metrics = []

    def create_cct(self):
        """Construct a calling context tree (CCT) from this trace's events.

        Delegates to pipit.util.cct.create_cct, which adds a "Graph_Node"
        column of CCT node references to the events DataFrame and returns
        the graph, which is stored in self.cct.
        """
        # adds a column of cct nodes to the events dataframe
        # and stores the graph object in self.cct
        self.cct = create_cct(self.events)

@staticmethod
def from_otf2(dirname, num_processes=None):
def from_otf2(dirname, num_processes=None, create_cct=False):
"""Read an OTF2 trace into a new Trace object."""
# import this lazily to avoid circular dependencies
from .readers.otf2_reader import OTF2Reader

return OTF2Reader(dirname, num_processes).read()
return OTF2Reader(dirname, num_processes, create_cct).read()

@staticmethod
def from_hpctoolkit(dirname):
Expand All @@ -43,20 +51,20 @@ def from_hpctoolkit(dirname):
return HPCToolkitReader(dirname).read()

@staticmethod
def from_projections(dirname, num_processes=None):
def from_projections(dirname, num_processes=None, create_cct=False):
"""Read a Projections trace into a new Trace object."""
# import this lazily to avoid circular dependencies
from .readers.projections_reader import ProjectionsReader

return ProjectionsReader(dirname, num_processes).read()
return ProjectionsReader(dirname, num_processes, create_cct).read()

@staticmethod
def from_nsight(filename):
def from_nsight(filename, create_cct=False):
"""Read an Nsight trace into a new Trace object."""
# import this lazily to avoid circular dependencies
from .readers.nsight_reader import NsightReader

return NsightReader(filename).read()
return NsightReader(filename, create_cct).read()

@staticmethod
def from_csv(filename):
Expand Down Expand Up @@ -354,7 +362,6 @@ def comm_matrix(self, output="size"):
Communication Matrix for Peer-to-Peer (P2P) MPI messages
Arguments:
1) output -
string to choose whether the communication volume should be measured
by bytes transferred between two processes or the number of messages
Expand Down
4 changes: 4 additions & 0 deletions pipit/util/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
# Copyright 2023 Parallel Software and Systems Group, University of Maryland.
# See the top-level LICENSE file for details.
#
# SPDX-License-Identifier: MIT
116 changes: 116 additions & 0 deletions pipit/util/cct.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
# Copyright 2023 Parallel Software and Systems Group, University of Maryland.
# See the top-level LICENSE file for details.
#
# SPDX-License-Identifier: MIT

from pipit.graph import Graph, Node


def create_cct(events):
    """
    Generic function to iterate through the events dataframe and create a CCT.

    Uses pipit's graph data structure for this. Returns the CCT (a Graph)
    and creates a new "Graph_Node" column in the events DataFrame that
    stores a reference to each Enter row's corresponding node in the CCT.

    Nodes are shared across processes/threads: two Enter events with the
    same call path map to the same CCT node.
    """

    # CCT and list of nodes in DataFrame
    graph = Graph()
    # one slot per row of `events`; assumes the DataFrame has a default
    # RangeIndex so a row's index doubles as a position here -- TODO confirm
    graph_nodes = [None] * len(events)

    # maps a "->"-joined call path to its node, so a call path seen on
    # several processes/threads reuses a single node
    callpath_to_node = {}

    node_id = 0  # each node has a unique id

    # only Enter/Leave events participate in the call tree
    enter_leave_df = events.loc[events["Event Type"].isin(["Enter", "Leave"])]

    # list of processes and/or threads to iterate over
    if "Thread" in events.columns:
        exec_locations = set(zip(events["Process"], events["Thread"]))
        has_thread = True
    else:
        exec_locations = set(events["Process"])
        has_thread = False

    for curr_loc in exec_locations:
        # only filter by thread if the trace has a thread column
        if has_thread:
            curr_process, curr_thread = curr_loc
            filtered_df = enter_leave_df.loc[
                (enter_leave_df["Process"] == curr_process)
                & (enter_leave_df["Thread"] == curr_thread)
            ]
        else:
            filtered_df = enter_leave_df.loc[enter_leave_df["Process"] == curr_loc]

        curr_depth = 0

        # iterating over plain lists is more efficient than iterating
        # DataFrame columns directly
        df_indices = filtered_df.index.to_list()
        function_names = filtered_df["Name"].to_list()
        event_types = filtered_df["Event Type"].to_list()

        # stacks used to iterate through the trace and add nodes to the cct
        functions_stack, nodes_stack = [], []

        # iterating over the events of the current thread's trace
        for curr_df_index, evt_type, function_name in zip(
            df_indices, event_types, function_names
        ):
            # encounter a new function through its entry point.
            if evt_type == "Enter":
                # add the function to the stack and get the call path
                functions_stack.append(function_name)
                callpath = "->".join(functions_stack)

                # get the parent node of the function if it exists
                parent_node = None if curr_depth == 0 else nodes_stack[-1]

                if callpath in callpath_to_node:
                    # don't create new node if callpath is in map
                    curr_node = callpath_to_node[callpath]
                else:
                    # create new node if callpath isn't in map
                    curr_node = Node(node_id, parent_node, curr_depth)
                    callpath_to_node[callpath] = curr_node
                    node_id += 1

                    # add node as root or child of its
                    # parent depending on current depth
                    if curr_depth == 0:
                        graph.add_root(curr_node)
                    else:
                        parent_node.add_child(curr_node)

                # Update nodes stack, column, and current depth
                nodes_stack.append(curr_node)
                graph_nodes[curr_df_index] = curr_node
                curr_depth += 1
            else:
                # "Leave": scan the stack from the top for the matching
                # "Enter" (tolerates unmatched or out-of-order leave events)
                j = len(functions_stack) - 1
                while j >= 0 and functions_stack[j] != function_name:
                    j -= 1

                if j >= 0:
                    # matching enter found: pop it and update current depth
                    del functions_stack[j]
                    del nodes_stack[j]
                    curr_depth -= 1
                # otherwise the leave has no matching enter; ignore it

    # Update the Trace with the generated cct
    events["Graph_Node"] = graph_nodes

    return graph

0 comments on commit ae4080e

Please sign in to comment.