Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(arcor_runtime): add callbacks for pause and resume events #864

Merged
merged 1 commit into from
Aug 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ The following video by [Kinali](https://www.kinali.cz/en/) shows the use case (o

[README](src/python/arcor2_execution/README.md) | [CHANGELOG](src/python/arcor2_execution/CHANGELOG.md)

- 2024-06-26: [1.4.2](https://github.com/robofit/arcor2/releases/tag/arcor2_execution%2F1.4.2) ([docker](https://hub.docker.com/r/arcor2/arcor2_execution/tags?page=1&ordering=last_updated&name=1.4.2), [pypi](https://pypi.org/project/arcor2-execution/1.4.2/)).
- 2024-08-14: [1.5.0](https://github.com/robofit/arcor2/releases/tag/arcor2_execution%2F1.5.0) ([docker](https://hub.docker.com/r/arcor2/arcor2_execution/tags?page=1&ordering=last_updated&name=1.5.0), [pypi](https://pypi.org/project/arcor2-execution/1.5.0/)).

### arcor2_execution_data

Expand Down Expand Up @@ -126,7 +126,7 @@ The following video by [Kinali](https://www.kinali.cz/en/) shows the use case (o

[README](src/python/arcor2_runtime/README.md) | [CHANGELOG](src/python/arcor2_runtime/CHANGELOG.md)

- 2024-06-14: [1.3.0](https://github.com/robofit/arcor2/releases/tag/arcor2_runtime%2F1.3.0) ([pypi](https://pypi.org/project/arcor2-runtime/1.3.0/)).
- 2024-08-14: [1.4.0](https://github.com/robofit/arcor2/releases/tag/arcor2_runtime%2F1.4.0) ([pypi](https://pypi.org/project/arcor2-runtime/1.4.0/)).

### arcor2_scene

Expand Down
2 changes: 1 addition & 1 deletion compose-files/fit-demo/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ services:
- fit-demo-project-network

fit-demo-execution:
image: arcor2/arcor2_execution:1.4.2
image: arcor2/arcor2_execution:1.5.0
container_name: fit-demo-execution
networks:
- fit-demo-execution-network
Expand Down
2 changes: 1 addition & 1 deletion src/docker/arcor2_execution/BUILD
Original file line number Diff line number Diff line change
@@ -1 +1 @@
docker_image(name="arcor2_execution", repository="arcor2/arcor2_execution", image_tags=["1.4.2"])
docker_image(name="arcor2_execution", repository="arcor2/arcor2_execution", image_tags=["1.5.0"])
6 changes: 6 additions & 0 deletions src/python/arcor2_execution/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,12 @@

The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),

## [1.5.0] - 2024-08-14

### Changed

- Dependency on `arcor2_runtime~=1.4.0`.

## [1.4.2] - 2024-06-26

### Fixed
Expand Down
2 changes: 1 addition & 1 deletion src/python/arcor2_execution/VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
1.4.2
1.5.0
6 changes: 6 additions & 0 deletions src/python/arcor2_runtime/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,12 @@

The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),

## [1.4.0] - 2024-08-14

### Added

- There are optional callbacks for pause and resume events in the `action.Globals`.

## [1.3.0] - 2024-06-14

### Changed
Expand Down
2 changes: 1 addition & 1 deletion src/python/arcor2_runtime/VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
1.3.0
1.4.0
38 changes: 33 additions & 5 deletions src/python/arcor2_runtime/action.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from arcor2.object_types.abstract import Generic
from arcor2.object_types.utils import iterate_over_actions
from arcor2.parameter_plugins.utils import plugin_from_instance
from arcor2_runtime.exceptions import print_exception


class Commands(StrEnum):
Expand All @@ -23,6 +24,8 @@ class Commands(StrEnum):
ACTION_NAME_ID_MAPPING_ATTR = "_action_name_id_mapping"
AP_ID_ATTR = "_ap_id"

CB_TYPE = Callable[[], None] | None


@dataclass
class Globals:
Expand All @@ -41,6 +44,13 @@ class Globals:

lock: threading.Lock = field(default_factory=threading.Lock)

# can be used to react on pause/resume events
# callbacks are called before the actual event happens
pause_callback: CB_TYPE = None
resume_callback: CB_TYPE = None
# id of the thread within the one of the callbacks (only one can be called at the time)
callback_thread_id: int = 0


g = Globals()

Expand Down Expand Up @@ -114,7 +124,7 @@ def print_event(event: Event) -> None:
sys.stdout.flush()


def _get_commands():
def _get_stdin_commands():
"""Reads stdin and checks for commands from parent script (e.g. Execution
unit). Prints events to stdout. State is signalled using events.

Expand All @@ -124,6 +134,16 @@ def _get_commands():
:return:
"""

def callback(cb: CB_TYPE) -> None:
if cb:
g.callback_thread_id = threading.get_ident()
try:
cb()
except Exception as e: # otherwise, the thread will stop silently
print_exception(e)
sys.exit(1)
g.callback_thread_id = 0

while True:
raw_cmd = read_stdin(0.1)

Expand All @@ -135,6 +155,7 @@ def _get_commands():
with g.lock:
if g.pause.is_set():
if cmd in (Commands.STEP, Commands.RESUME):
callback(g.resume_callback)
g.pause.clear()
print_event(PackageState(PackageState.Data(PackageState.Data.StateEnum.RUNNING)))

Expand All @@ -144,26 +165,29 @@ def _get_commands():
g.resume.set()
else:
if cmd == Commands.PAUSE:
callback(g.pause_callback)
g.resume.clear()
g.pause.set()
print_event(PackageState(PackageState.Data(PackageState.Data.StateEnum.PAUSED)))


_cmd_thread = threading.Thread(target=_get_commands)
_cmd_thread = threading.Thread(target=_get_stdin_commands)
_cmd_thread.daemon = True
_cmd_thread.start()


def handle_stdin_commands(*, before: bool, breakpoint: bool = False) -> None:
def handle_commands(*, before: bool, breakpoint: bool = False) -> None:
"""Actual handling of commands in (potentially parallel) actions."""

with g.lock:
# this is to handle pause caused by breakpoint
if (breakpoint or (before and g.pause_on_next_action.is_set())) and not g.pause.is_set():
g.pause_on_next_action.clear()
g.resume.clear()
g.pause.set()
print_event(PackageState(PackageState.Data(PackageState.Data.StateEnum.PAUSED)))

# waiting - the same for both "external" and "internal" pause (command from the Execution service, or breakpoint)
if g.pause.is_set():
g.resume.wait()

Expand Down Expand Up @@ -215,6 +239,10 @@ def wrapper(obj: Generic, *action_args: Any, an: None | str = None, **kwargs: An
if thread_id not in g.depth:
g.depth[thread_id] = 0

# this allows calling actions in pause/resume callbacks by treating them as ordinary methods (no events, etc.)
if thread_id == g.callback_thread_id:
return f(obj, *action_args, an=an, **kwargs)

try:
action_id: None | str = None
action_mapping_provided = hasattr(obj, ACTION_NAME_ID_MAPPING_ATTR)
Expand Down Expand Up @@ -273,7 +301,7 @@ def wrapper(obj: Generic, *action_args: Any, an: None | str = None, **kwargs: An

print_event(state_before)

handle_stdin_commands(before=True, breakpoint=make_a_break)
handle_commands(before=True, breakpoint=make_a_break)

else: # handle outermost actions
# if not set (e.g. when writing actions manually), do not attempt to get action IDs from names
Expand Down Expand Up @@ -311,7 +339,7 @@ def wrapper(obj: Generic, *action_args: Any, an: None | str = None, **kwargs: An

g.ea[thread_id] = None

handle_stdin_commands(before=False)
handle_commands(before=False)

return res

Expand Down
Loading
Loading