diff --git a/pipit/trace.py b/pipit/trace.py index d12a8a1..fbcd37b 100644 --- a/pipit/trace.py +++ b/pipit/trace.py @@ -210,16 +210,14 @@ def _match_messages(self): receive_events_names = ["MpiRecv", "MpiIrecv"] - receive_events = self.events[self.events["Name".isin(receive_events_names)]] + receive_events = self.events[self.events["Name"].isin(receive_events_names)] # Queue is a dictionary in which each receiving process (key) will have - # a list of tuples that each contain information about an associated - # send event - queue = {process: [] for process in self.events["Process"].unique()} - + # another dictionary of sending processes (key) that will have a list of + # tuple that each contain information about an associated send event + queue: dict[int, dict[int, (int, int)]] = {} df_indices = list(send_events.index) timestamps = list(send_events["Timestamp (ns)"]) - names = list(send_events["Name"]) attrs = list(send_events["Attributes"]) processes = list(send_events["Process"]) @@ -228,19 +226,28 @@ def _match_messages(self): for i in range(len(send_events)): curr_df_index = df_indices[i] curr_timestamp = timestamps[i] - curr_name = names[i] curr_attrs = attrs[i] curr_process = processes[i] # Add current dataframe index, timestamp, and process to stack if "receiver" in curr_attrs: - queue[curr_attrs["receiver"]].append( - (curr_df_index, curr_timestamp, curr_name, curr_process) + receiving_process = curr_attrs["receiver"] + + # Add receiving process to queue if not already present + if receiving_process not in queue: + queue[receiving_process] = {} + # Add sending process to receiving process's queue + # if not already present + if curr_process not in queue[receiving_process]: + queue[receiving_process][curr_process] = [] + + # Add current dataframe index and timestamp to the correct queue + queue[receiving_process][curr_process].append( + (curr_df_index, curr_timestamp) ) df_indices = list(receive_events.index) timestamps = list(receive_events["Timestamp (ns)"]) - names = list(receive_events["Name"]) attrs = list(receive_events["Attributes"]) processes = list(receive_events["Process"]) @@ -248,31 +255,20 @@ def _match_messages(self): for i in range(len(receive_events)): curr_df_index = df_indices[i] curr_timestamp = timestamps[i] - curr_name = names[i] curr_attrs = attrs[i] curr_process = processes[i] if "sender" in curr_attrs: - send_process = None - i = 0 - - # We want to iterate through the queue in order - # until we find the corresponding "send" event - while send_process != curr_attrs["sender"] and i < len( - queue[curr_process] - ): - send_df_index, send_timestamp, send_name, send_process = queue[ - curr_process - ][i] - i += 1 - - if send_process == curr_attrs["sender"] and i <= len( - queue[curr_process] - ): - # remove matched event from queue - del queue[curr_process][i - 1] - - # Fill in the lists with the matching values if event found + # send_process = None + send_process = curr_attrs["sender"] + + if len(queue[curr_process][send_process]) > 0: + # Get the corresponding "send" event + send_df_index, send_timestamp = queue[curr_process][ + send_process + ].pop(0) + + # Fill in the lists with the matching values matching_events[send_df_index] = curr_df_index matching_events[curr_df_index] = send_df_index