diff --git a/pipit/tests/trace.py b/pipit/tests/trace.py index 87c32f16..7731fe61 100644 --- a/pipit/tests/trace.py +++ b/pipit/tests/trace.py @@ -32,6 +32,42 @@ def test_comm_matrix(data_dir, ping_pong_otf2_trace): assert count_comm_matrix[0][1] == count_comm_matrix[1][0] == 8 +def test_comm_over_time(data_dir, ping_pong_otf2_trace): + ping_pong = Trace.from_otf2(str(ping_pong_otf2_trace)) + + hist, edges = ping_pong.comm_over_time(output="size", message_type="send", bins=5) + + assert len(edges) == 6 + assert all(hist[0:3] == 0) + assert hist[4] == 4177920 * 2 + + hist, edges = ping_pong.comm_over_time( + output="count", message_type="receive", bins=5 + ) + + assert len(edges) == 6 + assert all(hist[0:3] == 0) + assert hist[4] == 8 * 2 + + +def test_comm_by_process(data_dir, ping_pong_otf2_trace): + ping_pong = Trace.from_otf2(str(ping_pong_otf2_trace)) + + sizes = ping_pong.comm_by_process() + + assert sizes.loc[0]["Sent"] == 4177920 + assert sizes.loc[0]["Received"] == 4177920 + assert sizes.loc[1]["Sent"] == 4177920 + assert sizes.loc[1]["Received"] == 4177920 + + counts = ping_pong.comm_by_process(output="count") + + assert counts.loc[0]["Sent"] == 8 + assert counts.loc[0]["Received"] == 8 + assert counts.loc[1]["Sent"] == 8 + assert counts.loc[1]["Received"] == 8 + + def test_match_events(data_dir, ping_pong_otf2_trace): trace = Trace.from_otf2(str(ping_pong_otf2_trace)) trace._match_events() diff --git a/pipit/trace.py b/pipit/trace.py index 6db16ed6..c82c3040 100644 --- a/pipit/trace.py +++ b/pipit/trace.py @@ -454,6 +454,63 @@ def message_histogram(self, bins=20, **kwargs): return np.histogram(sizes, bins=bins, **kwargs) + def comm_over_time(self, output="size", message_type="send", bins=50, **kwargs): + """Returns histogram of communication volume over time. + + Args: + output (str, optional): Whether to calculate communication by "count" or + "size". Defaults to "size". + + message_type (str, optional): Whether to compute for sends or + receives.
Defaults to "send". + + bins (int, optional): Number of bins in the histogram. Defaults to + 50. + + Returns: + hist: Volume in size or number of messages in each time interval + edges: Edges of time intervals + """ + # Filter by send or receive events + events = self.events[ + self.events["Name"].isin( + ["MpiSend", "MpiIsend"] + if message_type == "send" + else ["MpiRecv", "MpiIrecv"] + ) + ] + + # Get timestamps and sizes + timestamps = events["Timestamp (ns)"] + sizes = events["Attributes"].apply(lambda x: x["msg_length"]) + + return np.histogram( + timestamps, + bins=bins, + weights=sizes.tolist() if output == "size" else None, + range=[ + self.events["Timestamp (ns)"].min(), + self.events["Timestamp (ns)"].max(), + ], + **kwargs + ) + + def comm_by_process(self, output="size"): + """Returns total communication volume in size or number of messages per + process. + + Returns: + pd.DataFrame: DataFrame containing total communication volume or + number of messages sent and received by each process. + """ + comm_matrix = self.comm_matrix(output=output) + + # Get total sent and received for each process + sent = comm_matrix.sum(axis=1) + received = comm_matrix.sum(axis=0) + + return pd.DataFrame({"Sent": sent, "Received": received}).rename_axis("Process") + def flat_profile( self, metrics="time.exc", groupby_column="Name", per_process=False ):