-
Notifications
You must be signed in to change notification settings - Fork 16
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(arcor2_runtime): provide on pause and on resume callbacks #862
Changes from 1 commit
04dcf57
12df69f
e344b34
4c7d14e
d52db1c
0ccca41
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -41,10 +41,79 @@ class Globals: | |
|
||
lock: threading.Lock = field(default_factory=threading.Lock) | ||
|
||
disable_action_wrapper: bool = False | ||
|
||
|
||
g = Globals() | ||
|
||
|
||
class PackageStateHandler(object): | ||
"""Singleton class that manages callbacks for PAUSE and RESUME events.""" | ||
|
||
_instance = None | ||
_on_pause_callbacks: list[Callable[..., None]] = [] | ||
_on_resume_callbacks: list[Callable[..., None]] = [] | ||
_instance_lock = threading.Lock() | ||
_execution_lock = threading.Lock() | ||
|
||
def __init__(self): | ||
"""Forbidden initializer.""" | ||
raise RuntimeError("Call get_instance() instead") | ||
|
||
@classmethod | ||
def get_instance(cls): | ||
"""Returns the singleton instance of the class.""" | ||
if cls._instance is None: | ||
with cls._instance_lock: | ||
if cls._instance is None: | ||
cls._instance = cls.__new__(cls) | ||
return cls._instance | ||
|
||
def add_on_pause_callback(self, on_pause_callback: Callable[..., None]): | ||
"""Adds a callback to be executed when the script is paused.""" | ||
self._on_pause_callbacks.append(on_pause_callback) | ||
|
||
def remove_on_pause_callback(self, on_pause_callback: Callable[..., None]): | ||
"""Removes a callback to be executed when the script is paused.""" | ||
self._on_pause_callbacks.remove(on_pause_callback) | ||
|
||
def add_on_resume_callback(self, on_resume_callback: Callable[..., None]): | ||
"""Adds a callback to be executed when the script is resumed.""" | ||
self._on_resume_callbacks.append(on_resume_callback) | ||
|
||
def remove_on_resume_callback(self, on_resume_callback: Callable[..., None]): | ||
"""Removes a callback to be executed when the script is resumed.""" | ||
self._on_resume_callbacks.remove(on_resume_callback) | ||
|
||
def execute_on_pause(self): | ||
"""Executes all pause callbacks.""" | ||
|
||
with self._execution_lock: | ||
# Disable action wrapper to prevent stack overflow when action is called from PAUSE callback. | ||
g.disable_action_wrapper = True | ||
|
||
try: | ||
for callback in self._on_pause_callbacks: | ||
callback() | ||
finally: | ||
# Enable action wrapper back. | ||
g.disable_action_wrapper = False | ||
|
||
def execute_on_resume(self): | ||
"""Executes all resume callbacks.""" | ||
|
||
with self._execution_lock: | ||
# Disable action wrapper to prevent stack overflow when action is called from RESUME callback. | ||
g.disable_action_wrapper = True | ||
|
||
try: | ||
for callback in self._on_resume_callbacks: | ||
callback() | ||
finally: | ||
# Enable action wrapper back. | ||
g.disable_action_wrapper = False | ||
|
||
|
||
def patch_aps(project: CachedProject) -> None: | ||
"""orientations / joints have to be monkey-patched with AP's ID in order to | ||
make breakpoints work in @action.""" | ||
|
@@ -162,11 +231,18 @@ def handle_stdin_commands(*, before: bool, breakpoint: bool = False) -> None: | |
g.pause_on_next_action.clear() | ||
g.resume.clear() | ||
g.pause.set() | ||
|
||
# Execute on pause callbacks, prevent transfer to PAUSED state if callback causes exception. | ||
PackageStateHandler.get_instance().execute_on_pause() | ||
|
||
print_event(PackageState(PackageState.Data(PackageState.Data.StateEnum.PAUSED))) | ||
|
||
if g.pause.is_set(): | ||
g.resume.wait() | ||
|
||
# Execute on resume callbacks, if callback causes exception, it is in RUNNING state. | ||
PackageStateHandler.get_instance().execute_on_resume() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Wouldn't it be better to call this before "resume" actually happens (before calling There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. As I understand the code, Current handling of commands is little confusing for me, because I adjusted the behavior of pause/resume handling, so There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Well, There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I reverted changes made in |
||
|
||
|
||
F = TypeVar("F", bound=Callable[..., Any]) | ||
|
||
|
@@ -215,6 +291,20 @@ def wrapper(obj: Generic, *action_args: Any, an: None | str = None, **kwargs: An | |
if thread_id not in g.depth: | ||
g.depth[thread_id] = 0 | ||
|
||
# Execute action without wrapping in case that action wrapper is disabled. | ||
if g.disable_action_wrapper: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I would use "g.pause.is_set()" instead - it is (probably) the same information. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If I understand the code correctly, I leaved Unfortunately I don't have infrastructure to test the code, so I am not sure if it is correct solution, please review. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ...I will try to go through it and provide feedback tomorrow. Regarding testing - if you manage to get past the linter and mypy, we will see if the tests will pass or not. In the final state, the new functionality should be at least partially covered by tests. The CI currently stops on some formatting issues (have you ran |
||
g.depth[thread_id] += 1 | ||
|
||
try: | ||
res = f(obj, *action_args, an=an, **kwargs) | ||
except Arcor2Exception: | ||
g.depth[thread_id] = 0 | ||
g.ea[thread_id] = None | ||
raise | ||
|
||
g.depth[thread_id] -= 1 | ||
return res | ||
|
||
try: | ||
action_id: None | str = None | ||
action_mapping_provided = hasattr(obj, ACTION_NAME_ID_MAPPING_ATTR) | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
(object)
is not necessary.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would implement this as a special OT or mixin (plugin for OTs) - that would probably be a cleaner solution and definitely more flexible for you (easier changes in that code). The code can react to states of already existing events.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
object
inheritance removed.As for the singleton vs OT (mixin): We have a requirement, that if resume conditions aren't met, the script cannot continue. As I understand suggested implementation, there would be daemon thread inside OT handling pause/resume. So it reacts independently on pause/resume.
What we want to achieve is to lock the doors to the workplace on script resume, so operator is safe. This might fail (doors remains open - cannot be locked) - we would like to notify user about this (via blocking dialog service), user will close the door, check passes, resume callback ends, script (thread) continues.
In OT (mixin) implementation this would require additional synchronization between OTs, leading to less readable code (maybe) for integrators. It is probably easy to implement but singleton solution seems easier for me.