Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(arcor2_runtime): provide on pause and on resume callbacks #862

Closed
wants to merge 6 commits into from

Conversation

jirzajic
Copy link

@jirzajic jirzajic commented Aug 2, 2024

The intent is to allow registering/unregistering callbacks to pause and resume events and execute registered callbacks on these events.

Singleton class was implemented to provide this functionality and integrated into the action wrapper. It provides callback registration methods that can be used from the package main script (or object types if needed). Example usage will be:

from typing import cast
from arcor2_runtime.action import PackageStateHandler
from arcor2_runtime.resources import Resources
from object_types.some_object_type import SomeObjectType

global_resources: Resources

def on_pause() -> None:
    global global_resources
    some_object_type: SomeObjectType = cast(
        SomeObjectType, global_resources.objects["some_object_id"]
    )
    some_object_type.foo()

PackageStateHandler.get_instance().add_on_pause_callback(on_pause)

def main():
    global global_resources
    with Resources() as resources:
        global_resources = resources

        # script code ...

if __name__ == "__main__":
    main()

Since actions can be used inside callbacks and the callback itself is handled in the action wrapper, it was necessary to disable the action wrapper in case when action is called from inside callback.

@jirzajic jirzajic force-pushed the on-pause-resume-callbacks branch from 3c1e5e4 to 8f2857b Compare August 2, 2024 09:09
@jirzajic jirzajic force-pushed the on-pause-resume-callbacks branch from 8f2857b to 04dcf57 Compare August 2, 2024 09:42
print_event(PackageState(PackageState.Data(PackageState.Data.StateEnum.PAUSED)))

if g.pause.is_set():
g.resume.wait()

# Execute on resume callbacks, if callback causes exception, it is in RUNNING state.
PackageStateHandler.get_instance().execute_on_resume()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wouldn't it be better to call this before "resume" actually happens (before calling g.resume.set())? Like this, the action will start being executed, and at the same time, the action wrapper will be disabled.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As I understand the code, handle_stdin_commands is called on thread(s) executing action, so in the current implementation it would execute pause callbacks and equally resume callbacks. In case that resume callbacks execution would be fired before g.resume.set() - from daemon thread handling _get_commands - it would lead to situation where pause callbacks are fired for each thread, but resume callbacks would be fired only once.

Current handling of commands is little confusing for me, because print_event PAUSE is fired from handle_stdin_commands thread and _get_commands thread, but print_event RUNNING is fired only from _get_commands thread.

I adjusted the behavior of pause/resume handling, so print_event for PAUSE and RUNNING is fired only from handle_stdin_commands thread. Is it better, or I overlooked something?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well, handle_stdin_commands should be renamed... It is not processing stdin commands (_get_commands does that) but does the actual waiting if the script is paused and additionally it can set pause event when there is a breakpoint (that the case when the "command" to pause does not come from the Execution service). It was not renamed properly when doing changes in the code, although the docstring was (somehow) updated. Sorry for possible confusion.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I reverted changes made in _get_commands and handle_stdin_commands and left a simple pause and resume callback execution in handle_stdin_commands. This way will callbacks be executed on thread handling action I suppose.


g = Globals()


class PackageStateHandler(object):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(object) is not necessary.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would implement this as a special OT or mixin (plugin for OTs) - that would probably be a cleaner solution and definitely more flexible for you (easier changes in that code). The code can react to states of already existing events.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

object inheritance removed.

As for the singleton vs OT (mixin): We have a requirement, that if resume conditions aren't met, the script cannot continue. As I understand suggested implementation, there would be daemon thread inside OT handling pause/resume. So it reacts independently on pause/resume.

What we want to achieve is to lock the doors to the workplace on script resume, so operator is safe. This might fail (doors remains open - cannot be locked) - we would like to notify user about this (via blocking dialog service), user will close the door, check passes, resume callback ends, script (thread) continues.

In OT (mixin) implementation this would require additional synchronization between OTs, leading to less readable code (maybe) for integrators. It is probably easy to implement but singleton solution seems easier for me.

@@ -215,6 +291,20 @@ def wrapper(obj: Generic, *action_args: Any, an: None | str = None, **kwargs: An
if thread_id not in g.depth:
g.depth[thread_id] = 0

# Execute action without wrapping in case that action wrapper is disabled.
if g.disable_action_wrapper:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would use "g.pause.is_set()" instead - it is (probably) the same information.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If I understand the code correctly, g.pause.is_set() is global flag for all threads. So if one thread sets it, other threads would read the flag here, executing action directly and continue execution - without reaching handle_stdin_commands function and effectively pause.

I leaved disable_action_wrapper field and switched it from bool to dict[int, bool] (thread_id as a key) so this if clause is executed only if actions are disabled for current thread.

Unfortunately I don't have infrastructure to test the code, so I am not sure if it is correct solution, please review.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

...I will try to go through it and provide feedback tomorrow. Regarding testing - if you manage to get past the linter and mypy, we will see if the tests will pass or not. In the final state, the new functionality should be at least partially covered by tests. The CI currently stops on some formatting issues (have you ran pants fmt ::?) Also, there should be more type annotations in action.py, see https://github.com/robofit/arcor2/actions/runs/10303220550/job/28518715396?pr=862#step:9:33.

g.resume.wait()

# Signal that thread is running.
print_event(PackageState(PackageState.Data(PackageState.Data.StateEnum.RUNNING)))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This change will cause multiple printing of the event when there are more threads. It should be only printed once - it indicates that the package is running, not the individual thread.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, I am sorry, I didn't understand the code well enough - I reverted the changes.

@ZdenekM ZdenekM closed this Aug 14, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants