Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

_match_messages #131

Open
wants to merge 7 commits into
base: develop
Choose a base branch
from
Open
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
90 changes: 90 additions & 0 deletions pipit/trace.py
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,96 @@ def _match_events(self):

self.events = self.events.astype({"_matching_event": "Int32"})

def _match_messages(self):
movsesyanae marked this conversation as resolved.
Show resolved Hide resolved
"""
Matches corresponding MpiSend/MpiRecv and MpiIsend/MpiIrecv instant events
"""
if "_matching_event" not in self.events.columns:
self.events["_matching_event"] = None

if "_matching_timestamp" not in self.events.columns:
self.events["_matching_timestamp"] = np.nan

matching_events = list(self.events["_matching_event"])
matching_times = list(self.events["_matching_timestamp"])

movsesyanae marked this conversation as resolved.
Show resolved Hide resolved
# Filter by send/receive events
send_events_names = ["MpiSend", "MpiISend"]

send_events = self.events[self.events["Name"].isin(send_events_names)]

receive_events_names = ["MpiRecv", "MpiIrecv"]

receive_events = self.events[self.events["Name"].isin(receive_events_names)]

# Queue is a dictionary in which each receiving process (key) will have
# another dictionary of sending processes (key) that will have a list of
# tuple that each contain information about an associated send event
queue: dict[int, dict[int, (int, int)]] = {}
df_indices = list(send_events.index)
timestamps = list(send_events["Timestamp (ns)"])
attrs = list(send_events["Attributes"])
processes = list(send_events["Process"])

# First iterate over send events, adding each event's information
# to the receiver's list in the dictionary
for i in range(len(send_events)):
curr_df_index = df_indices[i]
curr_timestamp = timestamps[i]
curr_attrs = attrs[i]
curr_process = processes[i]

# Add current dataframe index, timestamp, and process to stack
if "receiver" in curr_attrs:
receiving_process = curr_attrs["receiver"]

# Add receiving process to queue if not already present
if receiving_process not in queue:
queue[receiving_process] = {}
# Add sending process to receiving process's queue
# if not already present
if curr_process not in queue[receiving_process]:
queue[receiving_process][curr_process] = []

# Add current dataframe index and timestamp to the correct queue
queue[receiving_process][curr_process].append(
(curr_df_index, curr_timestamp)
)

df_indices = list(receive_events.index)
timestamps = list(receive_events["Timestamp (ns)"])
attrs = list(receive_events["Attributes"])
processes = list(receive_events["Process"])

# Now iterate over receive events
for i in range(len(receive_events)):
curr_df_index = df_indices[i]
curr_timestamp = timestamps[i]
curr_attrs = attrs[i]
curr_process = processes[i]

if "sender" in curr_attrs:
# send_process = None
send_process = curr_attrs["sender"]

if len(queue[curr_process][send_process]) > 0:
# Get the corresponding "send" event
send_df_index, send_timestamp = queue[curr_process][
send_process
].pop(0)

# Fill in the lists with the matching values
matching_events[send_df_index] = curr_df_index
matching_events[curr_df_index] = send_df_index

matching_times[send_df_index] = curr_timestamp
matching_times[curr_df_index] = send_timestamp

self.events["_matching_event"] = matching_events
self.events["_matching_timestamp"] = matching_times

self.events = self.events.astype({"_matching_event": "Int32"})

def _match_caller_callee(self):
"""Matches callers (parents) to callees (children) and adds three
columns to the dataframe:
Expand Down
Loading