Skip to content

Commit

Permalink
Merge pull request #37 from score-p/feature/performance-monitoring
Browse files Browse the repository at this point in the history
Feature/performance monitoring
  • Loading branch information
elwer authored Nov 21, 2024
2 parents 51c9356 + 174732b commit b5d78f5
Show file tree
Hide file tree
Showing 4 changed files with 140 additions and 102 deletions.
189 changes: 105 additions & 84 deletions src/jumper/kernel.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
from itables import show
from jumper.userpersistence import PersHelper, scorep_script_name
from jumper.userpersistence import magics_cleanup

import importlib
from jumper.perfdatahandler import PerformanceDataHandler
import jumper.visualization as perfvis

Expand Down Expand Up @@ -99,7 +99,7 @@ def __init__(self, **kwargs):
self.scorep_available_ = shutil.which("scorep")
self.scorep_python_available_ = True
try:
import scorep
importlib.import_module("scorep")
except ModuleNotFoundError:
self.scorep_python_available_ = False

Expand All @@ -121,13 +121,14 @@ def standard_reply(self):

def scorep_not_available(self):
if not self.scorep_available_:
self.cell_output("Score-P not available, cell ignored.",
"stderr")
self.cell_output("Score-P not available, cell ignored.", "stderr")
return self.standard_reply()
if not self.scorep_python_available_:
self.cell_output("Score-P Python not available, cell ignored. "
"Consider installing it via `pip install scorep`",
"stderr")
self.cell_output(
"Score-P Python not available, cell ignored. "
"Consider installing it via `pip install scorep`",
"stderr",
)
return self.standard_reply()
else:
return None
Expand Down Expand Up @@ -268,7 +269,7 @@ def append_multicellmode(self, code):
f"print('Executing cell {self.multicell_cellcount}')\n"
+ f"print('''{code}''')\n"
+ f"print('-' * {max_line_len})\n"
+ f"print('MCM_TS'+str(time.time()))\n"
+ "print('MCM_TS'+str(time.time()))\n"
+ f"{code}\n"
+ "print('''\n''')\n"
)
Expand Down Expand Up @@ -673,7 +674,6 @@ def report_perfdata(self, performance_data_nodes, duration):
async def scorep_execute(
self,
code,
code_for_history,
silent,
user_expressions=None,
allow_stdin=False,
Expand Down Expand Up @@ -701,7 +701,6 @@ async def scorep_execute(
os.open(scorep_script_name, os.O_WRONLY | os.O_CREAT), "w"
) as file:
file.write(self.pershelper.subprocess_wrapper(code))

# For disk mode use implicit synchronization between kernel and
# subprocess: await jupyter_dump, subprocess.wait(),
# await jupyter_update Ghost cell - dump current Jupyter session for
Expand Down Expand Up @@ -751,6 +750,7 @@ async def scorep_execute(
proc = subprocess.Popen(
cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, env=proc_env
)

self.perfdata_handler.start_perfmonitor(proc.pid)
# For memory mode jupyter_dump and jupyter_update must be awaited
# concurrently to the running subprocess
Expand Down Expand Up @@ -809,27 +809,38 @@ async def scorep_execute(
# explicit timestamps, but aligns the colorization of the plot based
# on the number of perf measurements we have, which is individual per
# node

time_indices = None
if len(multicellmode_timestamps):
# retrieve the index this cell will have in the global history
sub_idx = len(self.perfdata_handler.get_code_history())
# append to have end of last code fragment
multicellmode_timestamps.append("MCM_TS"+str(time.time()))
multicellmode_timestamps.append("MCM_TS" + str(time.time()))
time_indices = [[]]
nb_ms = 0.0
for idx, ts_string in enumerate(multicellmode_timestamps[:-1]):
secs = (float(multicellmode_timestamps[idx+1][6:]) -
float(ts_string[6:]))
nb_ms += (secs /
int(os.environ.get("JUMPER_REPORT_FREQUENCY", 2)))
secs = float(multicellmode_timestamps[idx + 1][6:]) - float(
ts_string[6:]
)
nb_ms += secs / int(
os.environ.get("JUMPER_REPORT_FREQUENCY", 2)
)
if nb_ms >= 1.0:
# only consider if we have measurements
time_indices[0].append((str(sub_idx)+"_"+str(idx), nb_ms))
time_indices[0].append(
(str(sub_idx) + "_" + str(idx), nb_ms)
)
nb_ms %= 1.0
# add time for last to last measurement

if nb_ms >= 0.0:
sub_idx, val = time_indices[0][-1]
time_indices[0][-1] = (sub_idx, val + nb_ms)
if len(time_indices[0]):
sub_idx, val = time_indices[0][-1]
time_indices[0][-1] = (sub_idx, val + nb_ms)
else:
time_indices[0].append(
(str(sub_idx) + "_" + str(0), nb_ms)
)

nb_ms = 0.0
for idx, val in enumerate(time_indices[0]):
Expand Down Expand Up @@ -898,7 +909,7 @@ async def scorep_execute(
f"Instrumentation results can be found in {scorep_folder}"
)
else:
# Find last creasted directory with scorep* name
# Find last created directory with scorep* name
# TODO: Directory isn't created local when running scorep-collector
max_iterations = 5
while max_iterations > 0:
Expand Down Expand Up @@ -932,13 +943,22 @@ async def scorep_execute(
self.pershelper.postprocess()
if performance_data_nodes:
self.report_perfdata(performance_data_nodes, duration)
self.perfdata_handler.append_code(datetime.datetime.now(),
code_for_history, time_indices)
self.perfdata_handler.append_code(
datetime.datetime.now(), code, time_indices
)
return self.standard_reply()

async def do_execute(self, code, silent, store_history=False,
user_expressions=None, allow_stdin=False, *,
cell_id=None, **kwargs):
async def do_execute(
self,
code,
silent,
store_history=False,
user_expressions=None,
allow_stdin=False,
*,
cell_id=None,
**kwargs,
):
"""
Override of do_execute() method of IPythonKernel. If no custom magic
commands specified, execute cell with super().do_execute(),
Expand Down Expand Up @@ -972,20 +992,20 @@ async def do_execute(self, code, silent, store_history=False,
"""
if code.startswith("%%display_graph_for_last"):
if not len(self.perfdata_handler.get_perfdata_history()):
self.cell_output(
"No performance data available."
)
self.cell_output("No performance data available.")
time_indices = self.perfdata_handler.get_time_indices()[-1]
if time_indices:
sub_idxs = [x[0] for x in time_indices[0]]
self.cell_output(f"Cell seemed to be tracked in multi cell"
" mode. Got performance data for the"
f" following sub cells: {sub_idxs}")
self.cell_output(
f"Cell seemed to be tracked in multi cell"
" mode. Got performance data for the"
f" following sub cells: {sub_idxs}"
)
perfvis.draw_performance_graph(
self.nodelist,
self.perfdata_handler.get_perfdata_history()[-1],
self.gpu_avail,
time_indices
time_indices,
)
return self.standard_reply()
elif code.startswith("%%display_graph_for_index"):
Expand All @@ -1005,14 +1025,16 @@ async def do_execute(self, code, silent, store_history=False,
time_indices = self.perfdata_handler.get_time_indices()[index]
if time_indices:
sub_idxs = [x[0] for x in time_indices[0]]
self.cell_output(f"Cell seemed to be tracked in multi cell"
" mode. Got performance data for the"
f" following sub cells: {sub_idxs}")
self.cell_output(
f"Cell seemed to be tracked in multi cell"
" mode. Got performance data for the"
f" following sub cells: {sub_idxs}"
)
perfvis.draw_performance_graph(
self.nodelist,
self.perfdata_handler.get_perfdata_history()[index],
self.gpu_avail,
time_indices
time_indices,
)
return self.standard_reply()
elif code.startswith("%%display_graph_for_all"):
Expand Down Expand Up @@ -1057,8 +1079,9 @@ async def do_execute(self, code, silent, store_history=False,
pd.DataFrame(
self.perfdata_handler.get_code_history(),
columns=["timestamp", "code"],
).reset_index(), layout={"topStart": "search", "topEnd": None},
columnDefs=[{"className": 'dt-left', "targets": 2}],
).reset_index(),
layout={"topStart": "search", "topEnd": None},
columnDefs=[{"className": "dt-left", "targets": 2}],
)
return self.standard_reply()
elif code.startswith("%%perfdata_to_variable"):
Expand All @@ -1078,15 +1101,18 @@ async def do_execute(self, code, silent, store_history=False,
# measurements, e.g. (2_0, 5), (2_1, 3), (2_2, 7)
mcm_time_indices = self.perfdata_handler.get_time_indices()
mcm_time_indices = list(
filter(lambda item: item is not None, mcm_time_indices))
filter(lambda item: item is not None, mcm_time_indices)
)

code = (f"{varname}="
f"{self.perfdata_handler.get_perfdata_history()}")
code = (
f"{varname}="
f"{self.perfdata_handler.get_perfdata_history()}"
)

if mcm_time_indices:
code += f"\n{varname}.append({mcm_time_indices})"

await super().do_execute(code,silent=True)
await super().do_execute(code, silent=True)
self.cell_output(
"Exported performance data to "
+ str(varname)
Expand All @@ -1097,10 +1123,10 @@ async def do_execute(self, code, silent, store_history=False,
self.cell_output(
"Detected that cells were executed in multi cell mode."
+ f"Last entry in {varname} is a list that contains "
f"the sub indices per cell that were executed in "
f"in multi cell mode and a counter for the number of"
f" performance measurements within this sub cell, "
f"e.g. f{mcm_time_indices[-1]}",
f"the sub indices per cell that were executed in "
f"in multi cell mode and a counter for the number of"
f" performance measurements within this sub cell, "
f"e.g. f{mcm_time_indices[-1]}",
"stdout",
)
return self.standard_reply()
Expand Down Expand Up @@ -1137,8 +1163,9 @@ async def do_execute(self, code, silent, store_history=False,
elif code.startswith("%%set_perfmonitor"):
return self.set_perfmonitor(code)
elif code.startswith("%%scorep_python_binding_arguments"):
return (self.scorep_not_available() or
self.set_scorep_pythonargs(code))
return self.scorep_not_available() or self.set_scorep_pythonargs(
code
)
elif code.startswith("%%serializer_settings"):
self.cell_output(
"Deprecated. Use: %%marshalling_settings"
Expand All @@ -1147,51 +1174,45 @@ async def do_execute(self, code, silent, store_history=False,
)
return self.standard_reply()
elif code.startswith("%%marshalling_settings"):
return (self.scorep_not_available() or
self.marshaller_settings(code))
return self.scorep_not_available() or self.marshaller_settings(
code
)
elif code.startswith("%%enable_multicellmode"):
return self.scorep_not_available() or self.enable_multicellmode()
elif code.startswith("%%abort_multicellmode"):
return self.scorep_not_available() or self.abort_multicellmode()
elif code.startswith("%%finalize_multicellmode"):
# Cannot be put into a separate function due to tight coupling
# between do_execute and scorep_execute
if not self.scorep_available_:
self.cell_output(
"Score-P not available, cell ignored.", "stderr"
)
return self.standard_reply()
else:
if self.mode == KernelMode.MULTICELL:
self.mode = KernelMode.DEFAULT
try:
reply_status = await self.scorep_execute(
self.multicell_code,
silent,
store_history,
user_expressions,
allow_stdin,
cell_id=cell_id,
)
except Exception:
self.cell_output(
"KernelError: Multicell execution failed.",
"stderr",
)
return self.standard_reply()
self.multicell_code = ""
self.multicell_cellcount = 0
return reply_status
elif self.mode == KernelMode.WRITEFILE:
self.writefile_multicell = False
return self.standard_reply()
else:
if self.mode == KernelMode.MULTICELL:
self.mode = KernelMode.DEFAULT
try:
reply_status = await self.scorep_execute(
self.multicell_code,
silent,
user_expressions,
allow_stdin,
cell_id=cell_id,
)
except Exception as e:
self.cell_output(
f"KernelWarning: Currently in {self.mode},"
f" ignore command",
"KernelError: Multicell execution failed.",
"stderr",
)
return self.standard_reply()
self.multicell_code = ""
self.multicell_cellcount = -1
return reply_status
elif self.mode == KernelMode.WRITEFILE:
self.writefile_multicell = False
return self.standard_reply()
else:
self.cell_output(
f"KernelWarning: Currently in {self.mode},"
f" ignore command",
"stderr",
)
return self.standard_reply()
elif code.startswith("%%start_writefile"):
return self.scorep_not_available() or self.start_writefile(code)
elif code.startswith("%%abort_writefile"):
Expand All @@ -1205,7 +1226,6 @@ async def do_execute(self, code, silent, store_history=False,
return await self.scorep_execute(
code.split("\n", 1)[1],
silent,
store_history,
user_expressions,
allow_stdin,
cell_id=cell_id,
Expand Down Expand Up @@ -1242,8 +1262,9 @@ async def do_execute(self, code, silent, store_history=False,
)
if performance_data_nodes:
self.report_perfdata(performance_data_nodes, duration)
self.perfdata_handler.append_code(datetime.datetime.now(),
code)
self.perfdata_handler.append_code(
datetime.datetime.now(), code
)
return parent_ret
elif self.mode == KernelMode.MULTICELL:
return self.append_multicellmode(magics_cleanup(code)[1])
Expand Down
Loading

0 comments on commit b5d78f5

Please sign in to comment.