Skip to content

Commit

Permalink
Trace: comm_by_process and comm_over_time (#91)
Browse files Browse the repository at this point in the history
---------

Co-authored-by: Abhinav Bhatele <[email protected]>
  • Loading branch information
hsirkar and bhatele authored Nov 14, 2023
1 parent 9c95a75 commit cdada37
Show file tree
Hide file tree
Showing 2 changed files with 93 additions and 0 deletions.
36 changes: 36 additions & 0 deletions pipit/tests/trace.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,42 @@ def test_comm_matrix(data_dir, ping_pong_otf2_trace):
assert count_comm_matrix[0][1] == count_comm_matrix[1][0] == 8


def test_comm_over_time(data_dir, ping_pong_otf2_trace):
ping_pong = Trace.from_otf2(str(ping_pong_otf2_trace))

hist, edges = ping_pong.comm_over_time(output="size", message_type="send", bins=5)

assert len(edges) == 6
assert all(hist[0:3] == 0)
assert hist[4] == 4177920 * 2

hist, edges = ping_pong.comm_over_time(
output="count", message_type="receive", bins=5
)

assert len(edges) == 6
assert all(hist[0:3] == 0)
assert hist[4] == 8 * 2


def test_comm_by_process(data_dir, ping_pong_otf2_trace):
ping_pong = Trace.from_otf2(str(ping_pong_otf2_trace))

sizes = ping_pong.comm_by_process()

assert sizes.loc[0]["Sent"] == 4177920
assert sizes.loc[0]["Received"] == 4177920
assert sizes.loc[1]["Sent"] == 4177920
assert sizes.loc[1]["Received"] == 4177920

counts = ping_pong.comm_by_process(output="count")

assert counts.loc[0]["Sent"] == 8
assert counts.loc[0]["Received"] == 8
assert counts.loc[1]["Sent"] == 8
assert counts.loc[1]["Received"] == 8


def test_match_events(data_dir, ping_pong_otf2_trace):
trace = Trace.from_otf2(str(ping_pong_otf2_trace))
trace._match_events()
Expand Down
57 changes: 57 additions & 0 deletions pipit/trace.py
Original file line number Diff line number Diff line change
Expand Up @@ -454,6 +454,63 @@ def message_histogram(self, bins=20, **kwargs):

return np.histogram(sizes, bins=bins, **kwargs)

def comm_over_time(self, output="size", message_type="send", bins=50, **kwargs):
"""Returns histogram of communication volume over time.
Args:
output (str, optional). Whether to calculate communication by "count" or
"size". Defaults to "size".
message_type (str, optional): Whether to compute for sends or
receives. Defaults to "send".
bins (int, optional): Number of bins in the histogram. Defaults to
50.
Returns:
hist: Volume in size or number of messages in each time interval
edges: Edges of time intervals
"""
# Filter by send or receive events
events = self.events[
self.events["Name"].isin(
["MpiSend", "MpiIsend"]
if message_type == "send"
else ["MpiRecv", "MpiIrecv"]
)
]

# Get timestamps and sizes
timestamps = events["Timestamp (ns)"]
sizes = events["Attributes"].apply(lambda x: x["msg_length"])

return np.histogram(
timestamps,
bins=bins,
weights=sizes.tolist() if output == "size" else None,
range=[
self.events["Timestamp (ns)"].min(),
self.events["Timestamp (ns)"].max(),
],
**kwargs
)

def comm_by_process(self, output="size"):
"""Returns total communication volume in size or number of messages per
process.
Returns:
pd.DataFrame: DataFrame containing total communication volume or
number of messags sent and received by each process.
"""
comm_matrix = self.comm_matrix(output=output)

# Get total sent and received for each process
sent = comm_matrix.sum(axis=1)
received = comm_matrix.sum(axis=0)

return pd.DataFrame({"Sent": sent, "Received": received}).rename_axis("Process")

def flat_profile(
self, metrics="time.exc", groupby_column="Name", per_process=False
):
Expand Down

0 comments on commit cdada37

Please sign in to comment.