From bb020d1bf0fd6a34c0d6531e2dfb9911fed73786 Mon Sep 17 00:00:00 2001 From: Rakrish Dhakal Date: Thu, 28 Mar 2024 00:41:30 -0400 Subject: [PATCH 1/6] initial commit --- pipit/trace.py | 72 ++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 72 insertions(+) diff --git a/pipit/trace.py b/pipit/trace.py index 2b4f111a..4a8557f6 100644 --- a/pipit/trace.py +++ b/pipit/trace.py @@ -190,6 +190,78 @@ def _match_events(self): self.events = self.events.astype({"_matching_event": "Int32"}) + def _match_messages(self): + """ + Matches corresponding MpiSend/MpiRecv and MpiIsend/MpiIrecv instant events + """ + if "_matching_event" not in self.events.columns: + self.events["_matching_event"] = None + + if "_matching_timestamp" not in self.events.columns: + self.events["_matching_timestamp"] = np.nan + + matching_events = list(self.events["_matching_event"]) + matching_times = list(self.events["_matching_timestamp"]) + + mpi_events = self.events[ + self.events["Name"].isin(["MpiSend", "MpiRecv", "MpiIsend", "MpiIrecv"]) + ] + + queue = [[] for _ in range(len(self.events["Process"].unique()))] + + df_indices = list(mpi_events.index) + timestamps = list(mpi_events["Timestamp (ns)"]) + names = list(mpi_events["Name"]) + attrs = list(mpi_events["Attributes"]) + processes = list(mpi_events["Process"]) + + # Iterate through all events + for i in range(len(mpi_events)): + curr_df_index = df_indices[i] + curr_timestamp = timestamps[i] + curr_name = names[i] + curr_attrs = attrs[i] + curr_process = processes[i] + + if curr_name == "MpiSend" or curr_name == "MpiIsend": + # Add current dataframe index, timestmap, and process to stack + if "receiver" in curr_attrs: + queue[curr_attrs["receiver"]].append( + (curr_df_index, curr_timestamp, curr_name, curr_process) + ) + elif curr_name == "MpiRecv" or curr_name == "MpiIrecv": + if "sender" in curr_attrs: + send_process = None + i = 0 + + # we want to iterate through the queue in order + # until we find the corresponding "send" event + while send_process != curr_attrs["sender"] and i < len( + queue[curr_process] + ): + send_df_index, send_timestamp, send_name, send_process = queue[ + curr_process + ][i] + i += 1 + + if send_process == curr_attrs["sender"] and i <= len( + queue[curr_process] + ): + # remove matched event from queue + del queue[curr_process][i - 1] + + # Fill in the lists with the matching values if event found + matching_events[send_df_index] = curr_df_index + matching_events[curr_df_index] = send_df_index + + matching_times[send_df_index] = curr_timestamp + matching_times[curr_df_index] = send_timestamp + + self.events["_matching_event"] = matching_events + self.events["_matching_timestamp"] = matching_times + + self.events = self.events.astype({"_matching_event": "Int32"}) + def _match_caller_callee(self): """Matches callers (parents) to callees (children) and adds three columns to the dataframe: From 0840a8b10e151904f0dde753487fabafc5190332 Mon Sep 17 00:00:00 2001 From: zoreoo <122127038+zoreoo@users.noreply.github.com> Date: Sat, 28 Sep 2024 18:50:05 -0400 Subject: [PATCH 2/6] Make adjustments to _match_messages --- pipit/trace.py | 116 +++++++++++++++++++++++++++++++------------------ 1 file changed, 73 insertions(+), 43 deletions(-) diff --git a/pipit/trace.py b/pipit/trace.py index 4a8557f6..c105732b 100644 --- a/pipit/trace.py +++ b/pipit/trace.py @@ -203,59 +203,89 @@ def _match_messages(self): matching_events = list(self.events["_matching_event"]) matching_times = list(self.events["_matching_timestamp"]) - mpi_events = self.events[ - self.events["Name"].isin(["MpiSend", "MpiRecv", "MpiIsend", "MpiIrecv"]) + + # Filter by send/receive events + send_events_names = ["MpiSend", "MpiISend"] + + send_events = self.events[ + self.events["Name"].isin(send_events_names) + ] + + receive_events_names = ["MpiRecv", "MpiIrecv"] + + receive_events = self.events[ + self.events["Name".isin(receive_events_names)] ] - queue = [[] for _ in range(len(self.events["Process"].unique()))] + # Queue is a dictionary in which each receiving process (key) will have + # a list of tuples that each contain information about an associated + # send event + queue = {process: [] for process in self.events["Process"].unique()} - df_indices = list(mpi_events.index) - timestamps = list(mpi_events["Timestamp (ns)"]) - names = list(mpi_events["Name"]) - attrs = list(mpi_events["Attributes"]) - processes = list(mpi_events["Process"]) + df_indices = list(send_events.index) + timestamps = list(send_events["Timestamp (ns)"]) + names = list(send_events["Name"]) + attrs = list(send_events["Attributes"]) + processes = list(send_events["Process"]) - # Iterate through all events - for i in range(len(mpi_events)): + # First iterate over send events, adding each event's information + # to the receiver's list in the dictionary + for i in range(len(send_events)): curr_df_index = df_indices[i] curr_timestamp = timestamps[i] curr_name = names[i] curr_attrs = attrs[i] curr_process = processes[i] - if curr_name == "MpiSend" or curr_name == "MpiIsend": - # Add current dataframe index, timestmap, and process to stack - if "receiver" in curr_attrs: - queue[curr_attrs["receiver"]].append( - (curr_df_index, curr_timestamp, curr_name, curr_process) - ) - elif curr_name == "MpiRecv" or curr_name == "MpiIrecv": - if "sender" in curr_attrs: - send_process = None - i = 0 - - # we want to iterate through the queue in order - # until we find the corresponding "send" event - while send_process != curr_attrs["sender"] and i < len( - queue[curr_process] - ): - send_df_index, send_timestamp, send_name, send_process = queue[ - curr_process - ][i] - i += 1 - - if send_process == curr_attrs["sender"] and i <= len( - queue[curr_process] - ): - # remove matched event from queue - del queue[curr_process][i - 1] - - # Fill in the lists with the matching values if event found - matching_events[send_df_index] = curr_df_index - matching_events[curr_df_index] = send_df_index - - matching_times[send_df_index] = curr_timestamp - matching_times[curr_df_index] = send_timestamp + # Add current dataframe index, timestamp, and process to stack + if "receiver" in curr_attrs: + queue[curr_attrs["receiver"]].append( + (curr_df_index, curr_timestamp, curr_name, curr_process) + ) + + + df_indices = list(receive_events.index) + timestamps = list(receive_events["Timestamp (ns)"]) + names = list(receive_events["Name"]) + attrs = list(receive_events["Attributes"]) + processes = list(receive_events["Process"]) + + # Now iterate over receive events + for i in range(len(receive_events)): + curr_df_index = df_indices[i] + curr_timestamp = timestamps[i] + curr_name = names[i] + curr_attrs = attrs[i] + curr_process = processes[i] + + if "sender" in curr_attrs: + send_process = None + i = 0 + + # We want to iterate through the queue in order + # until we find the corresponding "send" event + while send_process != curr_attrs["sender"] and i < len( + queue[curr_process] + ): + send_df_index, send_timestamp, send_name, send_process = queue[ + curr_process + ][i] + i += 1 + + if send_process == curr_attrs["sender"] and i <= len( + queue[curr_process] + ): + # remove matched event from queue + del queue[curr_process][i - 1] + + # Fill in the lists with the matching values if event found + matching_events[send_df_index] = curr_df_index + matching_events[curr_df_index] = send_df_index + + matching_times[send_df_index] = curr_timestamp + matching_times[curr_df_index] = send_timestamp + + self.events["_matching_event"] = matching_events self.events["_matching_timestamp"] = matching_times From 3e65096590b7178c78e6ab491ecd61b77d104678 Mon Sep 17 00:00:00 2001 From: zoreoo <122127038+zoreoo@users.noreply.github.com> Date: Sat, 28 Sep 2024 19:15:39 -0400 Subject: [PATCH 3/6] Fix some formatting things --- pipit/trace.py | 12 ++---------- 1 file changed, 2 insertions(+), 10 deletions(-) diff --git a/pipit/trace.py b/pipit/trace.py index c105732b..d12a8a12 100644 --- a/pipit/trace.py +++ b/pipit/trace.py @@ -203,19 +203,14 @@ def _match_messages(self): matching_events = list(self.events["_matching_event"]) matching_times = list(self.events["_matching_timestamp"]) - # Filter by send/receive events send_events_names = ["MpiSend", "MpiISend"] - send_events = self.events[ - self.events["Name"].isin(send_events_names) - ] + send_events = self.events[self.events["Name"].isin(send_events_names)] receive_events_names = ["MpiRecv", "MpiIrecv"] - receive_events = self.events[ - self.events["Name".isin(receive_events_names)] - ] + receive_events = self.events[self.events["Name".isin(receive_events_names)]] # Queue is a dictionary in which each receiving process (key) will have # a list of tuples that each contain information about an associated @@ -243,7 +238,6 @@ def _match_messages(self): (curr_df_index, curr_timestamp, curr_name, curr_process) ) - df_indices = list(receive_events.index) timestamps = list(receive_events["Timestamp (ns)"]) names = list(receive_events["Name"]) @@ -285,8 +279,6 @@ def _match_messages(self): matching_times[send_df_index] = curr_timestamp matching_times[curr_df_index] = send_timestamp - - self.events["_matching_event"] = matching_events self.events["_matching_timestamp"] = matching_times From 2ee3f03ef8da966c2c438de628a762e66ee68cdf Mon Sep 17 00:00:00 2001 From: Alexander Movsesyan Date: Mon, 30 Sep 2024 14:25:28 -0400 Subject: [PATCH 4/6] updating how queue is used in match_messages --- pipit/trace.py | 58 +++++++++++++++++++++++--------------------------- 1 file changed, 27 insertions(+), 31 deletions(-) diff --git a/pipit/trace.py b/pipit/trace.py index d12a8a12..fbcd37b2 100644 --- a/pipit/trace.py +++ b/pipit/trace.py @@ -210,16 +210,14 @@ def _match_messages(self): receive_events_names = ["MpiRecv", "MpiIrecv"] - receive_events = self.events[self.events["Name".isin(receive_events_names)]] + receive_events = self.events[self.events["Name"].isin(receive_events_names)] # Queue is a dictionary in which each receiving process (key) will have - # a list of tuples that each contain information about an associated - # send event - queue = {process: [] for process in self.events["Process"].unique()} - + # another dictionary of sending processes (key) that will have a list of + # tuple that each contain information about an associated send event + queue: dict[int, dict[int, (int, int)]] = {} df_indices = list(send_events.index) timestamps = list(send_events["Timestamp (ns)"]) - names = list(send_events["Name"]) attrs = list(send_events["Attributes"]) processes = list(send_events["Process"]) @@ -228,19 +226,28 @@ def _match_messages(self): for i in range(len(send_events)): curr_df_index = df_indices[i] curr_timestamp = timestamps[i] - curr_name = names[i] curr_attrs = attrs[i] curr_process = processes[i] # Add current dataframe index, timestamp, and process to stack if "receiver" in curr_attrs: - queue[curr_attrs["receiver"]].append( - (curr_df_index, curr_timestamp, curr_name, curr_process) + receiving_process = curr_attrs["receiver"] + + # Add receiving process to queue if not already present + if receiving_process not in queue: + queue[receiving_process] = {} + # Add sending process to receiving process's queue + # if not already present + if curr_process not in queue[receiving_process]: + queue[receiving_process][curr_process] = [] + + # Add current dataframe index and timestamp to the correct queue + queue[receiving_process][curr_process].append( + (curr_df_index, curr_timestamp) ) df_indices = list(receive_events.index) timestamps = list(receive_events["Timestamp (ns)"]) - names = list(receive_events["Name"]) attrs = list(receive_events["Attributes"]) processes = list(receive_events["Process"]) @@ -248,31 +255,20 @@ def _match_messages(self): for i in range(len(receive_events)): curr_df_index = df_indices[i] curr_timestamp = timestamps[i] - curr_name = names[i] curr_attrs = attrs[i] curr_process = processes[i] if "sender" in curr_attrs: - send_process = None - i = 0 - - # We want to iterate through the queue in order - # until we find the corresponding "send" event - while send_process != curr_attrs["sender"] and i < len( - queue[curr_process] - ): - send_df_index, send_timestamp, send_name, send_process = queue[ - curr_process - ][i] - i += 1 - - if send_process == curr_attrs["sender"] and i <= len( - queue[curr_process] - ): - # remove matched event from queue - del queue[curr_process][i - 1] - - # Fill in the lists with the matching values if event found + # send_process = None + send_process = curr_attrs["sender"] + + if len(queue[curr_process][send_process]) > 0: + # Get the corresponding "send" event + send_df_index, send_timestamp = queue[curr_process][ + send_process + ].pop(0) + + # Fill in the lists with the matching values matching_events[send_df_index] = curr_df_index matching_events[curr_df_index] = send_df_index From 098318e8c40af27979c3a028df32002c82a4ffbe Mon Sep 17 00:00:00 2001 From: Alexander Movsesyan Date: Tue, 29 Oct 2024 21:26:21 -0400 Subject: [PATCH 5/6] updated to use new _matching_message_event column --- pipit/trace.py | 18 +++++------------- 1 file changed, 5 insertions(+), 13 deletions(-) diff --git a/pipit/trace.py b/pipit/trace.py index 671e30c2..f3f91363 100644 --- a/pipit/trace.py +++ b/pipit/trace.py @@ -194,14 +194,10 @@ def _match_messages(self): """ Matches corresponding MpiSend/MpiRecv and MpiIsend/MpiIrecv instant events """ - if "_matching_event" not in self.events.columns: - self.events["_matching_event"] = None - - if "_matching_timestamp" not in self.events.columns: - self.events["_matching_timestamp"] = np.nan + if "_matching_message_event" not in self.events.columns: + self.events["_matching_message_event"] = None - matching_events = list(self.events["_matching_event"]) - matching_times = list(self.events["_matching_timestamp"]) + matching_events = list(self.events["_matching_message_event"]) # Filter by send/receive events send_events_names = ["MpiSend", "MpiISend"] @@ -272,13 +268,9 @@ def _match_messages(self): matching_events[send_df_index] = curr_df_index matching_events[curr_df_index] = send_df_index - matching_times[send_df_index] = curr_timestamp - matching_times[curr_df_index] = send_timestamp - - self.events["_matching_event"] = matching_events - self.events["_matching_timestamp"] = matching_times - self.events = self.events.astype({"_matching_event": "Int32"}) + self.events["_matching_message_event"] = matching_events + self.events = self.events.astype({"_matching_message_event": "Int32"}) def _match_caller_callee(self): """Matches callers (parents) to callees (children) and adds three From 8c20fb93e9b56791abfc3c498cb3bb46c98a1510 Mon Sep 17 00:00:00 2001 From: Alexander Movsesyan Date: Tue, 29 Oct 2024 22:44:34 -0400 Subject: [PATCH 6/6] updated match messages to match the enter/leave events to the instant events --- pipit/trace.py | 34 +++++++++++++++++++++++++++++----- 1 file changed, 29 insertions(+), 5 deletions(-) diff --git a/pipit/trace.py b/pipit/trace.py index f3f91363..391b0c04 100644 --- a/pipit/trace.py +++ b/pipit/trace.py @@ -193,19 +193,22 @@ def _match_events(self): def _match_messages(self): """ Matches corresponding MpiSend/MpiRecv and MpiIsend/MpiIrecv instant events + in a new column _matching_message_event """ if "_matching_message_event" not in self.events.columns: self.events["_matching_message_event"] = None + else: + return matching_events = list(self.events["_matching_message_event"]) # Filter by send/receive events + send_event_wrapper_names = ["MPI_Send"] send_events_names = ["MpiSend", "MpiISend"] - send_events = self.events[self.events["Name"].isin(send_events_names)] + receive_event_wrapper_names = ["MPI_Recv"] receive_events_names = ["MpiRecv", "MpiIrecv"] - receive_events = self.events[self.events["Name"].isin(receive_events_names)] # Queue is a dictionary in which each receiving process (key) will have @@ -243,14 +246,12 @@ def _match_messages(self): ) df_indices = list(receive_events.index) - timestamps = list(receive_events["Timestamp (ns)"]) attrs = list(receive_events["Attributes"]) processes = list(receive_events["Process"]) # Now iterate over receive events for i in range(len(receive_events)): curr_df_index = df_indices[i] - curr_timestamp = timestamps[i] curr_attrs = attrs[i] curr_process = processes[i] @@ -267,9 +268,32 @@ def _match_messages(self): # Fill in the lists with the matching values matching_events[send_df_index] = curr_df_index matching_events[curr_df_index] = send_df_index + # update matched message event for instant events + self.events["_matching_message_event"] = matching_events + # match send enter/leave events with their corresponding instant events + self.events.loc[ + (self.events["Name"].isin(send_event_wrapper_names)) + & (self.events["Event Type"] == "Enter"), + "_matching_message_event", + ] = send_events.index + self.events.loc[ + (self.events["Name"].isin(send_event_wrapper_names)) + & (self.events["Event Type"] == "Leave"), + "_matching_message_event", + ] = send_events.index + # match receive enter/leave events with their corresponding instant events + self.events.loc[ + (self.events["Name"].isin(receive_event_wrapper_names)) + & (self.events["Event Type"] == "Enter"), + "_matching_message_event", + ] = receive_events.index + self.events.loc[ + (self.events["Name"].isin(receive_event_wrapper_names)) + & (self.events["Event Type"] == "Leave"), + "_matching_message_event", + ] = receive_events.index - self.events["_matching_message_event"] = matching_events self.events = self.events.astype({"_matching_message_event": "Int32"}) def _match_caller_callee(self):