Skip to content

Commit

Permalink
feat(arcor2_runtime): fix CI, execute pause and resume callbacks only…
Browse files Browse the repository at this point in the history
… on main thread
  • Loading branch information
Kinali CI committed Aug 9, 2024
1 parent 4c7d14e commit d52db1c
Showing 1 changed file with 33 additions and 23 deletions.
56 changes: 33 additions & 23 deletions src/python/arcor2_runtime/action.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ class PackageStateHandler:
_on_resume_callbacks: list[Callable[..., None]] = []
_instance_lock = threading.Lock()
_execution_lock = threading.Lock()
_pause_callbacks_executed = threading.Event() # regular pausing
_resume_callbacks_executed = threading.Event()

def __init__(self):
"""Forbidden initializer."""
Expand Down Expand Up @@ -87,33 +89,41 @@ def remove_on_resume_callback(self, on_resume_callback: Callable[..., None]) ->

def execute_on_pause(self) -> None:
"""Executes all pause callbacks."""
thread_id = threading.get_ident() if threading.current_thread() is not threading.main_thread() else None

with self._execution_lock:
# Disable action wrapper to prevent stack overflow when action is called from PAUSE callback.
g.disable_action_wrapper[thread_id] = True
if threading.current_thread() is threading.main_thread():
with self._execution_lock:
# Disable action wrapper to prevent stack overflow when action is called from PAUSE callback.
thread_id = threading.get_ident()
g.disable_action_wrapper[thread_id] = True

try:
for callback in self._on_pause_callbacks:
callback()
finally:
# Enable action wrapper back.
g.disable_action_wrapper[thread_id] = False
try:
for callback in self._on_pause_callbacks:
callback()
self._pause_callbacks_executed.set()
finally:
# Enable action wrapper back.
g.disable_action_wrapper[threading.get_ident()] = False
else:
self._pause_callbacks_executed.wait()
self._resume_callbacks_executed.clear()

def execute_on_resume(self) -> None:
"""Executes all resume callbacks."""
thread_id = threading.get_ident() if threading.current_thread() is not threading.main_thread() else None

with self._execution_lock:
# Disable action wrapper to prevent stack overflow when action is called from RESUME callback.
g.disable_action_wrapper[thread_id] = True
if threading.current_thread() is threading.main_thread():
with self._execution_lock:
# Disable action wrapper to prevent stack overflow when action is called from RESUME callback.
thread_id = threading.get_ident()
g.disable_action_wrapper[thread_id] = True

try:
for callback in self._on_resume_callbacks:
callback()
finally:
# Enable action wrapper back.
g.disable_action_wrapper[thread_id] = False
try:
for callback in self._on_resume_callbacks:
callback()
self._resume_callbacks_executed.set()
finally:
# Enable action wrapper back.
g.disable_action_wrapper[thread_id] = False
else:
self._resume_callbacks_executed.wait()
self._pause_callbacks_executed.clear()


def patch_aps(project: CachedProject) -> None:
Expand Down Expand Up @@ -297,7 +307,7 @@ def wrapper(obj: Generic, *action_args: Any, an: None | str = None, **kwargs: An
g.depth[thread_id] = 0

# Execute action without wrapping in case that action wrapper is disabled for this thread.
if g.disable_action_wrapper.get(thread_id, False):
if g.disable_action_wrapper.get(threading.get_ident(), False):
g.depth[thread_id] += 1

try:
Expand Down

0 comments on commit d52db1c

Please sign in to comment.