From 0ccca41cb1f96231c9108cad8bbf6ee78c306676 Mon Sep 17 00:00:00 2001 From: Kinali CI Date: Fri, 9 Aug 2024 13:28:01 +0200 Subject: [PATCH] feat(arcor2_runtime): fix CI, simplify callbacks, revert pause, resume event changes --- src/python/arcor2_runtime/action.py | 122 ++++++++-------------------- 1 file changed, 32 insertions(+), 90 deletions(-) diff --git a/src/python/arcor2_runtime/action.py b/src/python/arcor2_runtime/action.py index c9f48c73..24d79a9b 100644 --- a/src/python/arcor2_runtime/action.py +++ b/src/python/arcor2_runtime/action.py @@ -3,7 +3,7 @@ import threading from dataclasses import dataclass, field from functools import wraps -from typing import Any, Callable, TypeVar, cast +from typing import Any, Callable, Optional, TypeVar, cast from arcor2.cached import CachedProject, CachedScene from arcor2.data.common import Pose, ProjectRobotJoints, StrEnum @@ -43,87 +43,37 @@ class Globals: disable_action_wrapper: dict[int, bool] = field(default_factory=dict) + paused_callback: Optional[Callable[..., None]] = None + resumed_callback: Optional[Callable[..., None]] = None -g = Globals() + def execute_paused_callback(self) -> None: + """Executes pause callback.""" + if self.paused_callback is not None: + # Disable action wrapper to prevent stack overflow when action is called from PAUSE callback. + thread_id = threading.get_ident() + g.disable_action_wrapper[thread_id] = True + try: + self.paused_callback() + finally: + # Enable action wrapper back. + g.disable_action_wrapper[thread_id] = False + + def execute_resumed_callback(self) -> None: + """Executes resume callback.""" + if self.resumed_callback is not None: + # Disable action wrapper to prevent stack overflow when action is called from RESUME callback. + thread_id = threading.get_ident() + g.disable_action_wrapper[thread_id] = True -class PackageStateHandler: - """Singleton class that manages callbacks for PAUSE and RESUME events.""" - - _instance = None - _on_pause_callbacks: list[Callable[..., None]] = [] - _on_resume_callbacks: list[Callable[..., None]] = [] - _instance_lock = threading.Lock() - _execution_lock = threading.Lock() - _pause_callbacks_executed = threading.Event() # regular pausing - _resume_callbacks_executed = threading.Event() - - def __init__(self): - """Forbidden initializer.""" - raise RuntimeError("Call get_instance() instead") - - @classmethod - def get_instance(cls): - """Returns the singleton instance of the class.""" - if cls._instance is None: - with cls._instance_lock: - if cls._instance is None: - cls._instance = cls.__new__(cls) - return cls._instance - - def add_on_pause_callback(self, on_pause_callback: Callable[..., None]) -> None: - """Adds a callback to be executed when the script is paused.""" - self._on_pause_callbacks.append(on_pause_callback) - - def remove_on_pause_callback(self, on_pause_callback: Callable[..., None]) -> None: - """Removes a callback to be executed when the script is paused.""" - self._on_pause_callbacks.remove(on_pause_callback) - - def add_on_resume_callback(self, on_resume_callback: Callable[..., None]) -> None: - """Adds a callback to be executed when the script is resumed.""" - self._on_resume_callbacks.append(on_resume_callback) - - def remove_on_resume_callback(self, on_resume_callback: Callable[..., None]) -> None: - """Removes a callback to be executed when the script is resumed.""" - self._on_resume_callbacks.remove(on_resume_callback) - - def execute_on_pause(self) -> None: - """Executes all pause callbacks.""" - if threading.current_thread() is threading.main_thread(): - with self._execution_lock: - # Disable action wrapper to prevent stack overflow when action is called from PAUSE callback. - thread_id = threading.get_ident() - g.disable_action_wrapper[thread_id] = True + try: + self.resumed_callback() + finally: + # Enable action wrapper back. + g.disable_action_wrapper[thread_id] = False - try: - for callback in self._on_pause_callbacks: - callback() - self._pause_callbacks_executed.set() - finally: - # Enable action wrapper back. - g.disable_action_wrapper[threading.get_ident()] = False - else: - self._pause_callbacks_executed.wait() - self._resume_callbacks_executed.clear() - - def execute_on_resume(self) -> None: - """Executes all resume callbacks.""" - if threading.current_thread() is threading.main_thread(): - with self._execution_lock: - # Disable action wrapper to prevent stack overflow when action is called from RESUME callback. - thread_id = threading.get_ident() - g.disable_action_wrapper[thread_id] = True - try: - for callback in self._on_resume_callbacks: - callback() - self._resume_callbacks_executed.set() - finally: - # Enable action wrapper back. - g.disable_action_wrapper[thread_id] = False - else: - self._resume_callbacks_executed.wait() - self._pause_callbacks_executed.clear() +g = Globals() def patch_aps(project: CachedProject) -> None: @@ -217,6 +167,7 @@ def _get_commands(): if g.pause.is_set(): if cmd in (Commands.STEP, Commands.RESUME): g.pause.clear() + print_event(PackageState(PackageState.Data(PackageState.Data.StateEnum.RUNNING))) if cmd == Commands.STEP: g.pause_on_next_action.set() @@ -226,6 +177,7 @@ def _get_commands(): if cmd == Commands.PAUSE: g.resume.clear() g.pause.set() + print_event(PackageState(PackageState.Data(PackageState.Data.StateEnum.PAUSED))) _cmd_thread = threading.Thread(target=_get_commands) @@ -241,22 +193,12 @@ def handle_stdin_commands(*, before: bool, breakpoint: bool = False) -> None: g.pause_on_next_action.clear() g.resume.clear() g.pause.set() + print_event(PackageState(PackageState.Data(PackageState.Data.StateEnum.PAUSED))) if g.pause.is_set(): - # Execute on pause callbacks, prevent transfer to PAUSED state if callback causes exception. - PackageStateHandler.get_instance().execute_on_pause() - - # Signal that thread is paused. - print_event(PackageState(PackageState.Data(PackageState.Data.StateEnum.PAUSED))) - - # Wait for resume. + g.execute_paused_callback() g.resume.wait() - - # Signal that thread is running. - print_event(PackageState(PackageState.Data(PackageState.Data.StateEnum.RUNNING))) - - # Execute on resume callbacks, if callback causes exception, it is in RUNNING state. - PackageStateHandler.get_instance().execute_on_resume() + g.execute_resumed_callback() F = TypeVar("F", bound=Callable[..., Any])