From 8b3b5b110c7d74a7099e7861bec2322093a4812d Mon Sep 17 00:00:00 2001 From: ZdenekM Date: Fri, 9 Aug 2024 14:28:53 +0200 Subject: [PATCH] feat(arcor_runtime): add callbacks for pause and resume events --- README.md | 4 +- compose-files/fit-demo/docker-compose.yml | 2 +- src/docker/arcor2_execution/BUILD | 2 +- src/python/arcor2_execution/CHANGELOG.md | 6 + src/python/arcor2_execution/VERSION | 2 +- src/python/arcor2_runtime/CHANGELOG.md | 6 + src/python/arcor2_runtime/VERSION | 2 +- src/python/arcor2_runtime/action.py | 38 +++- .../arcor2_runtime/tests/test_action.py | 173 ++++++++++++------ 9 files changed, 167 insertions(+), 68 deletions(-) diff --git a/README.md b/README.md index 6cfbd11a..7693a7d7 100644 --- a/README.md +++ b/README.md @@ -84,7 +84,7 @@ The following video by [Kinali](https://www.kinali.cz/en/) shows the use case (o [README](src/python/arcor2_execution/README.md) | [CHANGELOG](src/python/arcor2_execution/CHANGELOG.md) - - 2024-06-26: [1.4.2](https://github.com/robofit/arcor2/releases/tag/arcor2_execution%2F1.4.2) ([docker](https://hub.docker.com/r/arcor2/arcor2_execution/tags?page=1&ordering=last_updated&name=1.4.2), [pypi](https://pypi.org/project/arcor2-execution/1.4.2/)). + - 2024-08-14: [1.5.0](https://github.com/robofit/arcor2/releases/tag/arcor2_execution%2F1.5.0) ([docker](https://hub.docker.com/r/arcor2/arcor2_execution/tags?page=1&ordering=last_updated&name=1.5.0), [pypi](https://pypi.org/project/arcor2-execution/1.5.0/)). ### arcor2_execution_data @@ -126,7 +126,7 @@ The following video by [Kinali](https://www.kinali.cz/en/) shows the use case (o [README](src/python/arcor2_runtime/README.md) | [CHANGELOG](src/python/arcor2_runtime/CHANGELOG.md) - - 2024-06-14: [1.3.0](https://github.com/robofit/arcor2/releases/tag/arcor2_runtime%2F1.3.0) ([pypi](https://pypi.org/project/arcor2-runtime/1.3.0/)). + - 2024-08-14: [1.4.0](https://github.com/robofit/arcor2/releases/tag/arcor2_runtime%2F1.4.0) ([pypi](https://pypi.org/project/arcor2-runtime/1.4.0/)). ### arcor2_scene diff --git a/compose-files/fit-demo/docker-compose.yml b/compose-files/fit-demo/docker-compose.yml index 0a3eb222..6166cc39 100644 --- a/compose-files/fit-demo/docker-compose.yml +++ b/compose-files/fit-demo/docker-compose.yml @@ -55,7 +55,7 @@ services: - fit-demo-project-network fit-demo-execution: - image: arcor2/arcor2_execution:1.4.2 + image: arcor2/arcor2_execution:1.5.0 container_name: fit-demo-execution networks: - fit-demo-execution-network diff --git a/src/docker/arcor2_execution/BUILD b/src/docker/arcor2_execution/BUILD index 54d2df5c..8af5523c 100644 --- a/src/docker/arcor2_execution/BUILD +++ b/src/docker/arcor2_execution/BUILD @@ -1 +1 @@ -docker_image(name="arcor2_execution", repository="arcor2/arcor2_execution", image_tags=["1.4.2"]) +docker_image(name="arcor2_execution", repository="arcor2/arcor2_execution", image_tags=["1.5.0"]) diff --git a/src/python/arcor2_execution/CHANGELOG.md b/src/python/arcor2_execution/CHANGELOG.md index 71f7298c..7d65a4dd 100644 --- a/src/python/arcor2_execution/CHANGELOG.md +++ b/src/python/arcor2_execution/CHANGELOG.md @@ -2,6 +2,12 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), +## [1.5.0] - 2024-08-14 + +### Changed + +- Dependency on `arcor2_runtime~=1.4.0`. + ## [1.4.2] - 2024-06-26 ### Fixed diff --git a/src/python/arcor2_execution/VERSION b/src/python/arcor2_execution/VERSION index c9929e36..3e1ad720 100644 --- a/src/python/arcor2_execution/VERSION +++ b/src/python/arcor2_execution/VERSION @@ -1 +1 @@ -1.4.2 \ No newline at end of file +1.5.0 \ No newline at end of file diff --git a/src/python/arcor2_runtime/CHANGELOG.md b/src/python/arcor2_runtime/CHANGELOG.md index fff549e5..72ff5f8c 100644 --- a/src/python/arcor2_runtime/CHANGELOG.md +++ b/src/python/arcor2_runtime/CHANGELOG.md @@ -2,6 +2,12 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), +## [1.4.0] - 2024-08-14 + +### Added + +- There are optional callbacks for pause and resume events in the `action.Globals`. + ## [1.3.0] - 2024-06-14 ### Changed diff --git a/src/python/arcor2_runtime/VERSION b/src/python/arcor2_runtime/VERSION index 589268e6..e21e727f 100644 --- a/src/python/arcor2_runtime/VERSION +++ b/src/python/arcor2_runtime/VERSION @@ -1 +1 @@ -1.3.0 \ No newline at end of file +1.4.0 \ No newline at end of file diff --git a/src/python/arcor2_runtime/action.py b/src/python/arcor2_runtime/action.py index 64cba145..6a601cb4 100644 --- a/src/python/arcor2_runtime/action.py +++ b/src/python/arcor2_runtime/action.py @@ -12,6 +12,7 @@ from arcor2.object_types.abstract import Generic from arcor2.object_types.utils import iterate_over_actions from arcor2.parameter_plugins.utils import plugin_from_instance +from arcor2_runtime.exceptions import print_exception class Commands(StrEnum): @@ -23,6 +24,8 @@ class Commands(StrEnum): ACTION_NAME_ID_MAPPING_ATTR = "_action_name_id_mapping" AP_ID_ATTR = "_ap_id" +CB_TYPE = Callable[[], None] | None + @dataclass class Globals: @@ -41,6 +44,13 @@ class Globals: lock: threading.Lock = field(default_factory=threading.Lock) + # can be used to react on pause/resume events + # callbacks are called before the actual event happens + pause_callback: CB_TYPE = None + resume_callback: CB_TYPE = None + # id of the thread within the one of the callbacks (only one can be called at the time) + callback_thread_id: int = 0 + g = Globals() @@ -114,7 +124,7 @@ def print_event(event: Event) -> None: sys.stdout.flush() -def _get_commands(): +def _get_stdin_commands(): """Reads stdin and checks for commands from parent script (e.g. Execution unit). Prints events to stdout. State is signalled using events. @@ -124,6 +134,16 @@ def _get_commands(): :return: """ + def callback(cb: CB_TYPE) -> None: + if cb: + g.callback_thread_id = threading.get_ident() + try: + cb() + except Exception as e: # otherwise, the thread will stop silently + print_exception(e) + sys.exit(1) + g.callback_thread_id = 0 + while True: raw_cmd = read_stdin(0.1) @@ -135,6 +155,7 @@ def _get_commands(): with g.lock: if g.pause.is_set(): if cmd in (Commands.STEP, Commands.RESUME): + callback(g.resume_callback) g.pause.clear() print_event(PackageState(PackageState.Data(PackageState.Data.StateEnum.RUNNING))) @@ -144,26 +165,29 @@ def _get_commands(): g.resume.set() else: if cmd == Commands.PAUSE: + callback(g.pause_callback) g.resume.clear() g.pause.set() print_event(PackageState(PackageState.Data(PackageState.Data.StateEnum.PAUSED))) -_cmd_thread = threading.Thread(target=_get_commands) +_cmd_thread = threading.Thread(target=_get_stdin_commands) _cmd_thread.daemon = True _cmd_thread.start() -def handle_stdin_commands(*, before: bool, breakpoint: bool = False) -> None: +def handle_commands(*, before: bool, breakpoint: bool = False) -> None: """Actual handling of commands in (potentially parallel) actions.""" with g.lock: + # this is to handle pause caused by breakpoint if (breakpoint or (before and g.pause_on_next_action.is_set())) and not g.pause.is_set(): g.pause_on_next_action.clear() g.resume.clear() g.pause.set() print_event(PackageState(PackageState.Data(PackageState.Data.StateEnum.PAUSED))) + # waiting - the same for both "external" and "internal" pause (command from the Execution service, or breakpoint) if g.pause.is_set(): g.resume.wait() @@ -215,6 +239,10 @@ def wrapper(obj: Generic, *action_args: Any, an: None | str = None, **kwargs: An if thread_id not in g.depth: g.depth[thread_id] = 0 + # this allows calling actions in pause/resume callbacks by treating them as ordinary methods (no events, etc.) + if thread_id == g.callback_thread_id: + return f(obj, *action_args, an=an, **kwargs) + try: action_id: None | str = None action_mapping_provided = hasattr(obj, ACTION_NAME_ID_MAPPING_ATTR) @@ -273,7 +301,7 @@ def wrapper(obj: Generic, *action_args: Any, an: None | str = None, **kwargs: An print_event(state_before) - handle_stdin_commands(before=True, breakpoint=make_a_break) + handle_commands(before=True, breakpoint=make_a_break) else: # handle outermost actions # if not set (e.g. when writing actions manually), do not attempt to get action IDs from names @@ -311,7 +339,7 @@ def wrapper(obj: Generic, *action_args: Any, an: None | str = None, **kwargs: An g.ea[thread_id] = None - handle_stdin_commands(before=False) + handle_commands(before=False) return res diff --git a/src/python/arcor2_runtime/tests/test_action.py b/src/python/arcor2_runtime/tests/test_action.py index ee11a1f8..cd3fe715 100644 --- a/src/python/arcor2_runtime/tests/test_action.py +++ b/src/python/arcor2_runtime/tests/test_action.py @@ -2,6 +2,7 @@ import io import threading import time +from queue import Empty, Queue import pytest @@ -9,12 +10,37 @@ from arcor2.data.events import ActionStateAfter, ActionStateBefore, PackageState from arcor2.exceptions import Arcor2Exception from arcor2.object_types.abstract import Generic -from arcor2_runtime import action as glob_action from arcor2_runtime.action import ACTION_NAME_ID_MAPPING_ATTR, AP_ID_ATTR, patch_object_actions -def test_patch_object_actions(monkeypatch, capsys) -> None: - action = importlib.reload(glob_action) # to get local "copy" of the module +@pytest.fixture +def mock_stdin(monkeypatch): + sio = io.StringIO() + sio.fileno = lambda: 0 # type: ignore # fake whatever fileno + monkeypatch.setattr("sys.stdin", sio) + return sio + + +@pytest.fixture +def setup_action(): + cmd_q: Queue[str] = Queue() + + def read_stdin(timeout: float = 0.0) -> str | None: + try: + return cmd_q.get(timeout=timeout) + except Empty: + return None + + from arcor2_runtime import action + + action = importlib.reload(action) + action.read_stdin = read_stdin + + return action, cmd_q + + +def test_patch_object_actions(mock_stdin, capsys, setup_action) -> None: + action, _ = setup_action class MyObject(Generic): def action(self, pose: Pose, *, an: None | str = None) -> None: @@ -22,11 +48,6 @@ def action(self, pose: Pose, *, an: None | str = None) -> None: action.__action__ = ActionMetadata() # type: ignore - # @action tries to read from stdin - sio = io.StringIO() - sio.fileno = lambda: 0 # type: ignore # fake whatever fileno - monkeypatch.setattr("sys.stdin", sio) - obj_id = "123" pose = Pose(Position(0, 0, 0), Orientation(1, 0, 0, 0)) setattr(pose.position, AP_ID_ATTR, "pose") # set pose id (simulate pose declaration in scene json) @@ -70,8 +91,8 @@ def action(self, pose: Pose, *, an: None | str = None) -> None: assert action.g.ea[thread_id] is None -def test_patch_object_actions_without_mapping(monkeypatch, capsys) -> None: - action = importlib.reload(glob_action) # to get local "copy" of the module +def test_patch_object_actions_without_mapping(mock_stdin, capsys, setup_action) -> None: + action, _ = setup_action class MyObject(Generic): def action(self, pose: Pose, *, an: None | str = None) -> None: @@ -79,11 +100,6 @@ def action(self, pose: Pose, *, an: None | str = None) -> None: action.__action__ = ActionMetadata() # type: ignore - # @action tries to read from stdin - sio = io.StringIO() - sio.fileno = lambda: 0 # type: ignore # fake whatever fileno - monkeypatch.setattr("sys.stdin", sio) - obj_id = "123" pose = Pose(Position(0, 0, 0), Orientation(1, 0, 0, 0)) setattr(pose.position, AP_ID_ATTR, "pose") # set pose id (simulate pose declaration in scene json) @@ -118,8 +134,8 @@ def action(self, pose: Pose, *, an: None | str = None) -> None: assert out_after2 == out_after -def test_composite_action(monkeypatch, capsys) -> None: - action = importlib.reload(glob_action) # to get local "copy" of the module +def test_composite_action(mock_stdin, capsys, setup_action) -> None: + action, _ = setup_action class MyObject(Generic): def inner_inner_action(self, *, an: None | str = None) -> None: @@ -149,11 +165,6 @@ def action_with_inner_name_spec(self, *, an: None | str = None) -> None: action_wo_flag.__action__ = ActionMetadata() # type: ignore action_with_inner_name_spec.__action__ = ActionMetadata() # type: ignore - # @action tries to read from stdin - sio = io.StringIO() - sio.fileno = lambda: 0 # type: ignore # fake whatever fileno - monkeypatch.setattr("sys.stdin", sio) - obj_id = "123" my_obj = MyObject(obj_id, "") @@ -205,8 +216,8 @@ def action_with_inner_name_spec(self, *, an: None | str = None) -> None: assert action.g.ea[thread_id] is None -def test_unknown_parameter_type(monkeypatch, capsys) -> None: - action = importlib.reload(glob_action) # to get local "copy" of the module +def test_unknown_parameter_type(mock_stdin, capsys, setup_action) -> None: + action, _ = setup_action class UnknownParameterType: pass @@ -217,11 +228,6 @@ def action(self, param: UnknownParameterType, *, an: None | str = None) -> None: action.__action__ = ActionMetadata() # type: ignore - # @action tries to read from stdin - sio = io.StringIO() - sio.fileno = lambda: 0 # type: ignore # fake whatever fileno - monkeypatch.setattr("sys.stdin", sio) - obj_id = "123" my_obj = MyObject(obj_id, "") @@ -251,8 +257,8 @@ def action(self, param: UnknownParameterType, *, an: None | str = None) -> None: assert action.g.ea[thread_id] is None -def test_parallel_actions(monkeypatch, capsys) -> None: - action = importlib.reload(glob_action) # to get local "copy" of the module +def test_parallel_actions(mock_stdin, capsys, setup_action) -> None: + action, _ = setup_action class ParallelActionsObject(Generic): def action1(self, *, an: None | str = None) -> None: @@ -264,11 +270,6 @@ def action2(self, *, an: None | str = None) -> None: action1.__action__ = ActionMetadata() # type: ignore action2.__action__ = ActionMetadata() # type: ignore - # @action tries to read from stdin - sio = io.StringIO() - sio.fileno = lambda: 0 # type: ignore # fake whatever fileno - monkeypatch.setattr("sys.stdin", sio) - obj_id = "123" my_obj = ParallelActionsObject(obj_id, "") @@ -293,8 +294,8 @@ def thread1(): assert None in action.g.ea -def test_breakpoints_pose(monkeypatch, capsys) -> None: - action = importlib.reload(glob_action) # to get local "copy" of the module +def test_breakpoints_pose(mock_stdin, capsys, setup_action) -> None: + action, _ = setup_action class ActionObject(Generic): def action1(self, pose: Pose, *, an: None | str = None) -> None: @@ -306,11 +307,6 @@ def action1(self, pose: Pose, *, an: None | str = None) -> None: pose1 = Pose() setattr(pose1.position, AP_ID_ATTR, ap1_id) # simulates what patch_aps does - # @action tries to read from stdin - sio = io.StringIO() - sio.fileno = lambda: 0 # type: ignore # fake whatever fileno - monkeypatch.setattr("sys.stdin", sio) - obj_id = "123" my_obj = ActionObject(obj_id, "") patch_object_actions(ActionObject) @@ -348,8 +344,8 @@ def thread1(): assert ps_evt.data.state == PackageState.Data.StateEnum.PAUSED -def test_breakpoints_joints(monkeypatch, capsys) -> None: - action = importlib.reload(glob_action) # to get local "copy" of the module +def test_breakpoints_joints(mock_stdin, capsys, setup_action) -> None: + action, _ = setup_action class ActionObject(Generic): def action1(self, joints: ProjectRobotJoints, *, an: None | str = None) -> None: @@ -361,11 +357,6 @@ def action1(self, joints: ProjectRobotJoints, *, an: None | str = None) -> None: joints = ProjectRobotJoints("test_name", "test_robot", []) setattr(joints, AP_ID_ATTR, ap1_id) # simulates what patch_aps does - # @action tries to read from stdin - sio = io.StringIO() - sio.fileno = lambda: 0 # type: ignore # fake whatever fileno - monkeypatch.setattr("sys.stdin", sio) - obj_id = "123" my_obj = ActionObject(obj_id, "") patch_object_actions(ActionObject) @@ -403,8 +394,8 @@ def thread1(): assert ps_evt.data.state == PackageState.Data.StateEnum.PAUSED -def test_breakpoints_pose_parallel(monkeypatch, capsys) -> None: - action = importlib.reload(glob_action) # to get local "copy" of the module +def test_breakpoints_pose_parallel(mock_stdin, capsys, setup_action) -> None: + action, _ = setup_action class ActionObject(Generic): def action1(self, pose: Pose, *, an: None | str = None) -> None: @@ -424,11 +415,6 @@ def action2(self, pose: Pose, *, an: None | str = None) -> None: pose2 = Pose() setattr(pose2.position, AP_ID_ATTR, ap2_id) # simulates what patch_aps does - # @action tries to read from stdin - sio = io.StringIO() - sio.fileno = lambda: 0 # type: ignore # fake whatever fileno - monkeypatch.setattr("sys.stdin", sio) - obj_id = "123" my_obj = ActionObject(obj_id, "") patch_object_actions(ActionObject) @@ -476,3 +462,76 @@ def thread2(): assert not th1.is_alive() assert not th2.is_alive() + + +def test_pause_resume_callbacks(mock_stdin, capsys, setup_action) -> None: + # TODO use the same approach in the rest of the tests + action, cmd_q = setup_action + + class ActionObject(Generic): + action_to_be_called_in_cb_called = 0 + + def action1(self, *, an: None | str = None) -> None: + pass + + def action_to_be_called_in_cb(self, *, an: None | str = None) -> None: + self.action_to_be_called_in_cb_called += 1 + + action1.__action__ = ActionMetadata() # type: ignore + action_to_be_called_in_cb.__action__ = ActionMetadata() # type: ignore + + obj_id = "123" + my_obj = ActionObject(obj_id, "") + patch_object_actions(ActionObject) + + pause_callback_called = threading.Event() + resume_callback_called = threading.Event() + + def pause_callback() -> None: + assert not pause_callback_called.is_set() + assert not resume_callback_called.is_set() + pause_callback_called.set() + my_obj.action_to_be_called_in_cb() + + def resume_callback() -> None: + assert pause_callback_called.is_set() + assert not resume_callback_called.is_set() + resume_callback_called.set() + my_obj.action_to_be_called_in_cb() + + def thread1(): + my_obj.action1() + + action.g.pause_callback = pause_callback + action.g.resume_callback = resume_callback + + # the actual test... + cmd_q.put("p") + + th1 = threading.Thread(target=thread1, daemon=True) + th1.start() + + assert action.g.pause.wait(0.2) + + cmd_q.put("r") + + th1.join(0.2) + assert not th1.is_alive() + assert action._cmd_thread.is_alive() + + assert pause_callback_called.is_set() + assert resume_callback_called.is_set() + assert my_obj.action_to_be_called_in_cb_called == 2 + + output, _ = capsys.readouterr() + arr = output.strip().split("\n") + assert len(arr) == 3 + + ps_evt = PackageState.from_json(arr[0]) + assert ps_evt.data.state == PackageState.Data.StateEnum.PAUSED + + before_evt = ActionStateBefore.from_json(arr[1]) + assert before_evt.data.thread_id is not None + + ps_evt = PackageState.from_json(arr[2]) + assert ps_evt.data.state == PackageState.Data.StateEnum.RUNNING