Skip to content

Commit

Permalink
Fix pipe persistence communication, minor refactoring.
Browse files Browse the repository at this point in the history
  • Loading branch information
user committed Apr 17, 2024
1 parent 246a16a commit 1a1c406
Show file tree
Hide file tree
Showing 2 changed files with 91 additions and 130 deletions.
56 changes: 16 additions & 40 deletions src/scorep_jupyter/kernel.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,16 +66,15 @@ def switch_serializer(self, code):
"""
Switch serializer backend used for persistence in kernel.
"""
# clean pipes before switching
# Clean files/pipes before switching
self.pershelper.postprocess()

serializer = code.split('\n')[1]
if serializer == 'dill':
self.pershelper = PersHelper('dill')
elif serializer == 'cloudpickle':
self.pershelper = PersHelper('cloudpickle')

self.cell_output(f'Serializer backend switched to {serializer}, persistence was reset.')
if serializer in ['dill', 'cloudpickle']:
self.pershelper = PersHelper(serializer)
self.cell_output(f'Serializer backend switched to {serializer}, persistence was reset.')
else:
self.cell_output(f'KernelError: {serializer} serializer backend is not supported.', 'stderr')
return self.standard_reply()

def set_scorep_env(self, code):
Expand Down Expand Up @@ -223,14 +222,15 @@ async def scorep_execute(self, code, silent, store_history=True, user_expression
"""
Execute given code with Score-P Python bindings instrumentation.
"""
# set up pipes for the communication
# Set up files/pipes for persistence communication
if not self.pershelper.preprocess():
self.cell_output("Error setting up the communication. Please try again. Aborting.", "stderr")
self.pershelper.postprocess()
self.cell_output("KernelError: Failed to set up the persistence communication files/pipes.", "stderr")
return self.standard_reply()

# Prepare code for the Score-P instrumented execution as subprocess
# Transmit user persistence and updated sys.path from Jupyter notebook to subprocess
# After running code, transmit subprocess persistence back to Jupyter notebook
# After running the code, transmit subprocess persistence back to Jupyter notebook
with open(scorep_script_name, 'w') as file:
file.write(self.pershelper.subprocess_wrapper(code))

Expand All @@ -239,45 +239,29 @@ async def scorep_execute(self, code, silent, store_history=True, user_expression
self.scorep_binding_args + [scorep_script_name]
proc_env = self.scorep_env.copy()
proc_env.update({'PATH': os.environ['PATH'], 'PYTHONUNBUFFERED': 'x'}) # scorep path, subprocess observation

proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, env=proc_env)

# Ghost cell - dump current Jupyter session for subprocess
# Run in a "silent" way to not increase cells counter
#self.cell_output("Subprocess launched")
#self.cell_output(self.pershelper.jupyter_dump())


reply_status_dump = await super().do_execute(self.pershelper.jupyter_dump(), silent, store_history=False,
user_expressions=user_expressions, allow_stdin=allow_stdin, cell_id=cell_id)
if reply_status_dump['status'] != 'ok':
self.shell.execution_count += 1
reply_status_dump['execution_count'] = self.shell.execution_count - 1
self.pershelper.postprocess()
self.cell_output("KernelError: Failed to pickle previous notebook's persistence and variables.",
'stderr')
self.cell_output("KernelError: Failed to pickle notebook's persistence.", 'stderr')
return reply_status_dump

# Redirect process stderr to stdout and observe the latter
# Observing two stream with two threads causes interference in cell_output in Jupyter notebook
# stdout is read in chunks, which are split into lines using \r or \n as delimiter
# Last element in the list might be "incomplete line", not ending with \n or \r, it is saved
# and merged with the first line in the next chunk

if os.environ['SCOREP_KERNEL_PERSISTENCE_MODE'] == 'MEMORY':
# connect to the communication pipe in case of in-memory communication for persistence
flags = os.O_RDONLY | os.O_NONBLOCK
comm_pipe =os.open(self.pershelper.paths['subprocess']['comm'], flags=flags)

incomplete_line = ''
endline_pattern = re.compile(r'(.*?[\r\n]|.+$)')
# Empty cell output, required for interactive output e.g. tqdm for-loop progress bar
self.cell_output('\0')
while True:
if os.environ['SCOREP_KERNEL_PERSISTENCE_MODE'] == 'MEMORY':
comm_chunk = os.read(comm_pipe, 1)
if comm_chunk != b'':
# for pipe communication, we break as soon as os we get notified by subprocess via comm pipe
break
chunk = b'' + proc.stdout.read(READ_CHUNK_SIZE)
if chunk == b'':
break
Expand All @@ -293,31 +277,24 @@ async def scorep_execute(self, code, silent, store_history=True, user_expression
self.cell_output(line)

# os_environ_.clear()

# sys_path_.clear()

# Ghost cell - load subprocess definitions and persistence back to Jupyter notebook
# Ghost cell - load subprocess persistence back to Jupyter notebook
# Run in a "silent" way to not increase cells counter
reply_status_update = await super().do_execute(self.pershelper.jupyter_update(code), silent, store_history=False,
user_expressions=user_expressions, allow_stdin=allow_stdin, cell_id=cell_id)

if reply_status_update['status'] != 'ok':
# tidy up
self.pershelper.postprocess()
self.shell.execution_count += 1
reply_status_update['execution_count'] = self.shell.execution_count - 1
self.cell_output("KernelError: Failed to load cell's persistence and variables to the notebook.",
'stderr')
self.cell_output("KernelError: Failed to load cell's persistence to the notebook.", 'stderr')
return reply_status_update

if proc.returncode:
# tidy up
self.pershelper.postprocess()
self.cell_output(
'KernelError: Cell execution failed, cell persistence and variables are not recorded.',
'stderr')
self.cell_output('KernelError: Cell execution failed, cell persistence was not recorded.', 'stderr')
return self.standard_reply()

# Determine directory to which trace files were saved by Score-P
if 'SCOREP_EXPERIMENT_DIRECTORY' in self.scorep_env:
scorep_folder = self.scorep_env['SCOREP_EXPERIMENT_DIRECTORY']
self.cell_output(
Expand All @@ -333,7 +310,6 @@ async def scorep_execute(self, code, silent, store_history=True, user_expression
else:
self.cell_output("KernelWarning: Instrumentation results were not saved locally.", 'stderr')

# tidy up
self.pershelper.postprocess()
return self.standard_reply()

Expand Down
Loading

0 comments on commit 1a1c406

Please sign in to comment.