From 87607e51a97423347080f8b5e4d89600f8106161 Mon Sep 17 00:00:00 2001 From: Elias Werner Date: Wed, 20 Nov 2024 18:04:05 +0100 Subject: [PATCH 1/3] fix linter, improve scorep python check --- src/jumper/kernel.py | 115 +++++++++++++++++++++--------------- src/jumper/visualization.py | 41 +++++++++---- 2 files changed, 97 insertions(+), 59 deletions(-) diff --git a/src/jumper/kernel.py b/src/jumper/kernel.py index d7ef4b8..898b804 100644 --- a/src/jumper/kernel.py +++ b/src/jumper/kernel.py @@ -15,7 +15,7 @@ from itables import show from jumper.userpersistence import PersHelper, scorep_script_name from jumper.userpersistence import magics_cleanup - +import importlib from jumper.perfdatahandler import PerformanceDataHandler import jumper.visualization as perfvis @@ -99,7 +99,7 @@ def __init__(self, **kwargs): self.scorep_available_ = shutil.which("scorep") self.scorep_python_available_ = True try: - import scorep + importlib.import_module("scorep") except ModuleNotFoundError: self.scorep_python_available_ = False @@ -121,13 +121,14 @@ def standard_reply(self): def scorep_not_available(self): if not self.scorep_available_: - self.cell_output("Score-P not available, cell ignored.", - "stderr") + self.cell_output("Score-P not available, cell ignored.", "stderr") return self.standard_reply() if not self.scorep_python_available_: - self.cell_output("Score-P Python not available, cell ignored. " - "Consider installing it via `pip install scorep`", - "stderr") + self.cell_output( + "Score-P Python not available, cell ignored. " + "Consider installing it via `pip install scorep`", + "stderr", + ) return self.standard_reply() else: return None @@ -268,7 +269,7 @@ def append_multicellmode(self, code): f"print('Executing cell {self.multicell_cellcount}')\n" + f"print('''{code}''')\n" + f"print('-' * {max_line_len})\n" - + f"print('MCM_TS'+str(time.time()))\n" + + "print('MCM_TS'+str(time.time()))\n" + f"{code}\n" + "print('''\n''')\n" ) @@ -814,17 +815,21 @@ async def scorep_execute( # retrieve the index this cell will have in the global history sub_idx = len(self.perfdata_handler.get_code_history()) # append to have end of last code fragment - multicellmode_timestamps.append("MCM_TS"+str(time.time())) + multicellmode_timestamps.append("MCM_TS" + str(time.time())) time_indices = [[]] nb_ms = 0.0 for idx, ts_string in enumerate(multicellmode_timestamps[:-1]): - secs = (float(multicellmode_timestamps[idx+1][6:]) - - float(ts_string[6:])) - nb_ms += (secs / - int(os.environ.get("JUMPER_REPORT_FREQUENCY", 2))) + secs = float(multicellmode_timestamps[idx + 1][6:]) - float( + ts_string[6:] + ) + nb_ms += secs / int( + os.environ.get("JUMPER_REPORT_FREQUENCY", 2) + ) if nb_ms >= 1.0: # only consider if we have measurements - time_indices[0].append((str(sub_idx)+"_"+str(idx), nb_ms)) + time_indices[0].append( + (str(sub_idx) + "_" + str(idx), nb_ms) + ) nb_ms %= 1.0 # add time for last to last measurement if nb_ms >= 0.0: @@ -932,13 +937,22 @@ async def scorep_execute( self.pershelper.postprocess() if performance_data_nodes: self.report_perfdata(performance_data_nodes, duration) - self.perfdata_handler.append_code(datetime.datetime.now(), - code_for_history, time_indices) + self.perfdata_handler.append_code( + datetime.datetime.now(), code_for_history, time_indices + ) return self.standard_reply() - async def do_execute(self, code, silent, store_history=False, - user_expressions=None, allow_stdin=False, *, - cell_id=None, **kwargs): + async def do_execute( + self, + code, + silent, + store_history=False, + user_expressions=None, + allow_stdin=False, + *, + cell_id=None, + **kwargs, + ): """ Override of do_execute() method of IPythonKernel. If no custom magic commands specified, execute cell with super().do_execute(), @@ -972,20 +986,20 @@ async def do_execute(self, code, silent, store_history=False, """ if code.startswith("%%display_graph_for_last"): if not len(self.perfdata_handler.get_perfdata_history()): - self.cell_output( - "No performance data available." - ) + self.cell_output("No performance data available.") time_indices = self.perfdata_handler.get_time_indices()[-1] if time_indices: sub_idxs = [x[0] for x in time_indices[0]] - self.cell_output(f"Cell seemed to be tracked in multi cell" - " mode. Got performance data for the" - f" following sub cells: {sub_idxs}") + self.cell_output( + f"Cell seemed to be tracked in multi cell" + " mode. Got performance data for the" + f" following sub cells: {sub_idxs}" + ) perfvis.draw_performance_graph( self.nodelist, self.perfdata_handler.get_perfdata_history()[-1], self.gpu_avail, - time_indices + time_indices, ) return self.standard_reply() elif code.startswith("%%display_graph_for_index"): @@ -1005,14 +1019,16 @@ async def do_execute(self, code, silent, store_history=False, time_indices = self.perfdata_handler.get_time_indices()[index] if time_indices: sub_idxs = [x[0] for x in time_indices[0]] - self.cell_output(f"Cell seemed to be tracked in multi cell" - " mode. Got performance data for the" - f" following sub cells: {sub_idxs}") + self.cell_output( + f"Cell seemed to be tracked in multi cell" + " mode. Got performance data for the" + f" following sub cells: {sub_idxs}" + ) perfvis.draw_performance_graph( self.nodelist, self.perfdata_handler.get_perfdata_history()[index], self.gpu_avail, - time_indices + time_indices, ) return self.standard_reply() elif code.startswith("%%display_graph_for_all"): @@ -1057,8 +1073,9 @@ async def do_execute(self, code, silent, store_history=False, pd.DataFrame( self.perfdata_handler.get_code_history(), columns=["timestamp", "code"], - ).reset_index(), layout={"topStart": "search", "topEnd": None}, - columnDefs=[{"className": 'dt-left', "targets": 2}], + ).reset_index(), + layout={"topStart": "search", "topEnd": None}, + columnDefs=[{"className": "dt-left", "targets": 2}], ) return self.standard_reply() elif code.startswith("%%perfdata_to_variable"): @@ -1078,15 +1095,18 @@ async def do_execute(self, code, silent, store_history=False, # measurements, e.g. (2_0, 5), (2_1, 3), (2_2, 7) mcm_time_indices = self.perfdata_handler.get_time_indices() mcm_time_indices = list( - filter(lambda item: item is not None, mcm_time_indices)) + filter(lambda item: item is not None, mcm_time_indices) + ) - code = (f"{varname}=" - f"{self.perfdata_handler.get_perfdata_history()}") + code = ( + f"{varname}=" + f"{self.perfdata_handler.get_perfdata_history()}" + ) if mcm_time_indices: code += f"\n{varname}.append({mcm_time_indices})" - await super().do_execute(code,silent=True) + await super().do_execute(code, silent=True) self.cell_output( "Exported performance data to " + str(varname) @@ -1097,10 +1117,10 @@ async def do_execute(self, code, silent, store_history=False, self.cell_output( "Detected that cells were executed in multi cell mode." + f"Last entry in {varname} is a list that contains " - f"the sub indices per cell that were executed in " - f"in multi cell mode and a counter for the number of" - f" performance measurements within this sub cell, " - f"e.g. f{mcm_time_indices[-1]}", + f"the sub indices per cell that were executed in " + f"in multi cell mode and a counter for the number of" + f" performance measurements within this sub cell, " + f"e.g. f{mcm_time_indices[-1]}", "stdout", ) return self.standard_reply() @@ -1137,8 +1157,9 @@ async def do_execute(self, code, silent, store_history=False, elif code.startswith("%%set_perfmonitor"): return self.set_perfmonitor(code) elif code.startswith("%%scorep_python_binding_arguments"): - return (self.scorep_not_available() or - self.set_scorep_pythonargs(code)) + return self.scorep_not_available() or self.set_scorep_pythonargs( + code + ) elif code.startswith("%%serializer_settings"): self.cell_output( "Deprecated. Use: %%marshalling_settings" @@ -1147,8 +1168,9 @@ async def do_execute(self, code, silent, store_history=False, ) return self.standard_reply() elif code.startswith("%%marshalling_settings"): - return (self.scorep_not_available() or - self.marshaller_settings(code)) + return self.scorep_not_available() or self.marshaller_settings( + code + ) elif code.startswith("%%enable_multicellmode"): return self.scorep_not_available() or self.enable_multicellmode() elif code.startswith("%%abort_multicellmode"): @@ -1242,8 +1264,9 @@ async def do_execute(self, code, silent, store_history=False, ) if performance_data_nodes: self.report_perfdata(performance_data_nodes, duration) - self.perfdata_handler.append_code(datetime.datetime.now(), - code) + self.perfdata_handler.append_code( + datetime.datetime.now(), code + ) return parent_ret elif self.mode == KernelMode.MULTICELL: return self.append_multicellmode(magics_cleanup(code)[1]) diff --git a/src/jumper/visualization.py b/src/jumper/visualization.py index dcff8b0..c0ef635 100644 --- a/src/jumper/visualization.py +++ b/src/jumper/visualization.py @@ -150,8 +150,8 @@ def plot_graph(ax, metric, perfdata, time_indices=None, color=None): # colorization of the plot in case of multiple cells if time_indices: - # in multi node case, we have to iterate over the indices (time_indices) - # and not only 0 here + # in multi node case, we have to iterate over the indices ( + # time_indices) and not only 0 here current_index = 0 target_index = -1 transition_offset = (x_scale[1] - x_scale[0]) / 2 @@ -165,21 +165,36 @@ def plot_graph(ax, metric, perfdata, time_indices=None, color=None): # don't use offset for last cell if sub_idx == last_idx: transition_offset = 0 - ax.axvspan(x_scale[current_index] + start_offset, - x_scale[target_index] + - transition_offset, - facecolor=color[cell_idx], alpha=0.3) + ax.axvspan( + x_scale[current_index] + start_offset, + x_scale[target_index] + transition_offset, + facecolor=color[cell_idx], + alpha=0.3, + ) - text_x_pos = x_scale[current_index] + start_offset + ( - (x_scale[target_index] + transition_offset - - x_scale[current_index] + start_offset) / 2) + text_x_pos = ( + x_scale[current_index] + + start_offset + + ( + ( + x_scale[target_index] + + transition_offset + - x_scale[current_index] + + start_offset + ) + / 2 + ) + ) text_y_pos = ax.get_ylim()[0] + (ax.get_ylim()[1] * 0.05) # add cell index to plot - ax.text(text_x_pos, text_y_pos, "#" + str(sub_idx), style='italic', - bbox={ - 'facecolor': 'lightgrey', 'alpha': 0.5, 'pad': 2} - ) + ax.text( + text_x_pos, + text_y_pos, + "#" + str(sub_idx), + style="italic", + bbox={"facecolor": "lightgrey", "alpha": 0.5, "pad": 2}, + ) current_index = target_index start_offset = transition_offset From 78ad091dc389530c8cbfc79308b6f297db73605f Mon Sep 17 00:00:00 2001 From: Elias Werner Date: Wed, 20 Nov 2024 19:15:12 +0100 Subject: [PATCH 2/3] not sure if tests fixed --- tests/kernel/multicell.yaml | 10 +++++----- tests/test_kernel.py | 2 ++ 2 files changed, 7 insertions(+), 5 deletions(-) diff --git a/tests/kernel/multicell.yaml b/tests/kernel/multicell.yaml index a0e4720..bda0392 100644 --- a/tests/kernel/multicell.yaml +++ b/tests/kernel/multicell.yaml @@ -3,7 +3,7 @@ - - "Multicell mode enabled. The following cells will be marked for instrumented execution." - - "c = np.sum(c_mtx)" - - - "Cell marked for multicell mode. It will be executed at position 1" + - - "Cell marked for multicell mode. It will be executed at position 0" - - "%%abort_multicellmode" - - "Multicell mode aborted." @@ -15,23 +15,23 @@ with scorep.instrumenter.enable(): c = np.sum(c_mtx) c_vec = np.arange(b, c) - - - "Cell marked for multicell mode. It will be executed at position 1" + - - "Cell marked for multicell mode. It will be executed at position 0" - - |- print('c =', c) print('Sum(c_vec) =', c_vec.sum()) - - - "Cell marked for multicell mode. It will be executed at position 2" + - - "Cell marked for multicell mode. It will be executed at position 1" - - "%%finalize_multicellmode" - - "\0" - - "Executing cell 1\n" + - "Executing cell 0\n" - "with scorep.instrumenter.enable():\n" - " c = np.sum(c_mtx)\n" - "c_vec = np.arange(b, c)\n" - "----------------------------------\n" - "\n" - "\n" - - "Executing cell 2\n" + - "Executing cell 1\n" - "print('c =', c)\n" - "print('Sum(c_vec) =', c_vec.sum())\n" - "----------------------------------\n" diff --git a/tests/test_kernel.py b/tests/test_kernel.py index 7bd0584..abec581 100644 --- a/tests/test_kernel.py +++ b/tests/test_kernel.py @@ -34,6 +34,7 @@ def check_stream_output(self, code, expected_output, stream="stdout"): # some messages can be of type 'execute_result' # type instead of stdout # self.assertEqual(msg["content"]["name"], stream) + if msg["header"]["msg_type"] == "stream": self.assertEqual(msg["content"]["name"], stream) self.assertEqual(msg["content"]["text"], expected_msg) @@ -42,6 +43,7 @@ def check_stream_output(self, code, expected_output, stream="stdout"): msg["content"]["data"]["text/plain"], expected_msg ) + def check_from_file(self, filename): with open(filename, "r") as file: From 174732b58b3607c1b090b06a96c77fe3de5af7e0 Mon Sep 17 00:00:00 2001 From: Elias Werner Date: Thu, 21 Nov 2024 05:49:03 +0100 Subject: [PATCH 3/3] fix multicellmode for very short cells --- src/jumper/kernel.py | 76 +++++++++++++++++++++----------------------- 1 file changed, 37 insertions(+), 39 deletions(-) diff --git a/src/jumper/kernel.py b/src/jumper/kernel.py index 898b804..fe3bfb6 100644 --- a/src/jumper/kernel.py +++ b/src/jumper/kernel.py @@ -674,7 +674,6 @@ def report_perfdata(self, performance_data_nodes, duration): async def scorep_execute( self, code, - code_for_history, silent, user_expressions=None, allow_stdin=False, @@ -702,7 +701,6 @@ async def scorep_execute( os.open(scorep_script_name, os.O_WRONLY | os.O_CREAT), "w" ) as file: file.write(self.pershelper.subprocess_wrapper(code)) - # For disk mode use implicit synchronization between kernel and # subprocess: await jupyter_dump, subprocess.wait(), # await jupyter_update Ghost cell - dump current Jupyter session for @@ -752,6 +750,7 @@ async def scorep_execute( proc = subprocess.Popen( cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, env=proc_env ) + self.perfdata_handler.start_perfmonitor(proc.pid) # For memory mode jupyter_dump and jupyter_update must be awaited # concurrently to the running subprocess @@ -810,6 +809,7 @@ async def scorep_execute( # explicit timestamps, but aligns the colorization of the plot based # on the number of perf measurements we have, which is individual per # node + time_indices = None if len(multicellmode_timestamps): # retrieve the index this cell will have in the global history @@ -832,9 +832,15 @@ async def scorep_execute( ) nb_ms %= 1.0 # add time for last to last measurement + if nb_ms >= 0.0: - sub_idx, val = time_indices[0][-1] - time_indices[0][-1] = (sub_idx, val + nb_ms) + if len(time_indices[0]): + sub_idx, val = time_indices[0][-1] + time_indices[0][-1] = (sub_idx, val + nb_ms) + else: + time_indices[0].append( + (str(sub_idx) + "_" + str(0), nb_ms) + ) nb_ms = 0.0 for idx, val in enumerate(time_indices[0]): @@ -903,7 +909,7 @@ async def scorep_execute( f"Instrumentation results can be found in {scorep_folder}" ) else: - # Find last creasted directory with scorep* name + # Find last created directory with scorep* name # TODO: Directory isn't created local when running scorep-collector max_iterations = 5 while max_iterations > 0: @@ -938,7 +944,7 @@ async def scorep_execute( if performance_data_nodes: self.report_perfdata(performance_data_nodes, duration) self.perfdata_handler.append_code( - datetime.datetime.now(), code_for_history, time_indices + datetime.datetime.now(), code, time_indices ) return self.standard_reply() @@ -1178,42 +1184,35 @@ async def do_execute( elif code.startswith("%%finalize_multicellmode"): # Cannot be put into a separate function due to tight coupling # between do_execute and scorep_execute - if not self.scorep_available_: - self.cell_output( - "Score-P not available, cell ignored.", "stderr" - ) - return self.standard_reply() - else: - if self.mode == KernelMode.MULTICELL: - self.mode = KernelMode.DEFAULT - try: - reply_status = await self.scorep_execute( - self.multicell_code, - silent, - store_history, - user_expressions, - allow_stdin, - cell_id=cell_id, - ) - except Exception: - self.cell_output( - "KernelError: Multicell execution failed.", - "stderr", - ) - return self.standard_reply() - self.multicell_code = "" - self.multicell_cellcount = 0 - return reply_status - elif self.mode == KernelMode.WRITEFILE: - self.writefile_multicell = False - return self.standard_reply() - else: + if self.mode == KernelMode.MULTICELL: + self.mode = KernelMode.DEFAULT + try: + reply_status = await self.scorep_execute( + self.multicell_code, + silent, + user_expressions, + allow_stdin, + cell_id=cell_id, + ) + except Exception as e: self.cell_output( - f"KernelWarning: Currently in {self.mode}," - f" ignore command", + "KernelError: Multicell execution failed.", "stderr", ) return self.standard_reply() + self.multicell_code = "" + self.multicell_cellcount = -1 + return reply_status + elif self.mode == KernelMode.WRITEFILE: + self.writefile_multicell = False + return self.standard_reply() + else: + self.cell_output( + f"KernelWarning: Currently in {self.mode}," + f" ignore command", + "stderr", + ) + return self.standard_reply() elif code.startswith("%%start_writefile"): return self.scorep_not_available() or self.start_writefile(code) elif code.startswith("%%abort_writefile"): @@ -1227,7 +1226,6 @@ async def do_execute( return await self.scorep_execute( code.split("\n", 1)[1], silent, - store_history, user_expressions, allow_stdin, cell_id=cell_id,