Skip to content

Commit

Permalink
feat(arcor_runtime): add callbacks for pause and resume events
Browse files Browse the repository at this point in the history
  • Loading branch information
ZdenekM authored and ZdenekM committed Aug 14, 2024
1 parent ff788f6 commit 8b3b5b1
Show file tree
Hide file tree
Showing 9 changed files with 167 additions and 68 deletions.
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ The following video by [Kinali](https://www.kinali.cz/en/) shows the use case (o

[README](src/python/arcor2_execution/README.md) | [CHANGELOG](src/python/arcor2_execution/CHANGELOG.md)

- 2024-06-26: [1.4.2](https://github.com/robofit/arcor2/releases/tag/arcor2_execution%2F1.4.2) ([docker](https://hub.docker.com/r/arcor2/arcor2_execution/tags?page=1&ordering=last_updated&name=1.4.2), [pypi](https://pypi.org/project/arcor2-execution/1.4.2/)).
- 2024-08-14: [1.5.0](https://github.com/robofit/arcor2/releases/tag/arcor2_execution%2F1.5.0) ([docker](https://hub.docker.com/r/arcor2/arcor2_execution/tags?page=1&ordering=last_updated&name=1.5.0), [pypi](https://pypi.org/project/arcor2-execution/1.5.0/)).

### arcor2_execution_data

Expand Down Expand Up @@ -126,7 +126,7 @@ The following video by [Kinali](https://www.kinali.cz/en/) shows the use case (o

[README](src/python/arcor2_runtime/README.md) | [CHANGELOG](src/python/arcor2_runtime/CHANGELOG.md)

- 2024-06-14: [1.3.0](https://github.com/robofit/arcor2/releases/tag/arcor2_runtime%2F1.3.0) ([pypi](https://pypi.org/project/arcor2-runtime/1.3.0/)).
- 2024-08-14: [1.4.0](https://github.com/robofit/arcor2/releases/tag/arcor2_runtime%2F1.4.0) ([pypi](https://pypi.org/project/arcor2-runtime/1.4.0/)).

### arcor2_scene

Expand Down
2 changes: 1 addition & 1 deletion compose-files/fit-demo/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ services:
- fit-demo-project-network

fit-demo-execution:
image: arcor2/arcor2_execution:1.4.2
image: arcor2/arcor2_execution:1.5.0
container_name: fit-demo-execution
networks:
- fit-demo-execution-network
Expand Down
2 changes: 1 addition & 1 deletion src/docker/arcor2_execution/BUILD
Original file line number Diff line number Diff line change
@@ -1 +1 @@
docker_image(name="arcor2_execution", repository="arcor2/arcor2_execution", image_tags=["1.4.2"])
docker_image(name="arcor2_execution", repository="arcor2/arcor2_execution", image_tags=["1.5.0"])
6 changes: 6 additions & 0 deletions src/python/arcor2_execution/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,12 @@

The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),

## [1.5.0] - 2024-08-14

### Changed

- Dependency on `arcor2_runtime~=1.4.0`.

## [1.4.2] - 2024-06-26

### Fixed
Expand Down
2 changes: 1 addition & 1 deletion src/python/arcor2_execution/VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
1.4.2
1.5.0
6 changes: 6 additions & 0 deletions src/python/arcor2_runtime/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,12 @@

The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),

## [1.4.0] - 2024-08-14

### Added

- There are optional callbacks for pause and resume events in the `action.Globals`.

## [1.3.0] - 2024-06-14

### Changed
Expand Down
2 changes: 1 addition & 1 deletion src/python/arcor2_runtime/VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
1.3.0
1.4.0
38 changes: 33 additions & 5 deletions src/python/arcor2_runtime/action.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from arcor2.object_types.abstract import Generic
from arcor2.object_types.utils import iterate_over_actions
from arcor2.parameter_plugins.utils import plugin_from_instance
from arcor2_runtime.exceptions import print_exception


class Commands(StrEnum):
Expand All @@ -23,6 +24,8 @@ class Commands(StrEnum):
ACTION_NAME_ID_MAPPING_ATTR = "_action_name_id_mapping"
AP_ID_ATTR = "_ap_id"

CB_TYPE = Callable[[], None] | None


@dataclass
class Globals:
Expand All @@ -41,6 +44,13 @@ class Globals:

lock: threading.Lock = field(default_factory=threading.Lock)

# can be used to react on pause/resume events
# callbacks are called before the actual event happens
pause_callback: CB_TYPE = None
resume_callback: CB_TYPE = None
# id of the thread within the one of the callbacks (only one can be called at the time)
callback_thread_id: int = 0


g = Globals()

Expand Down Expand Up @@ -114,7 +124,7 @@ def print_event(event: Event) -> None:
sys.stdout.flush()


def _get_commands():
def _get_stdin_commands():
"""Reads stdin and checks for commands from parent script (e.g. Execution
unit). Prints events to stdout. State is signalled using events.
Expand All @@ -124,6 +134,16 @@ def _get_commands():
:return:
"""

def callback(cb: CB_TYPE) -> None:
if cb:
g.callback_thread_id = threading.get_ident()
try:
cb()
except Exception as e: # otherwise, the thread will stop silently
print_exception(e)
sys.exit(1)
g.callback_thread_id = 0

while True:
raw_cmd = read_stdin(0.1)

Expand All @@ -135,6 +155,7 @@ def _get_commands():
with g.lock:
if g.pause.is_set():
if cmd in (Commands.STEP, Commands.RESUME):
callback(g.resume_callback)
g.pause.clear()
print_event(PackageState(PackageState.Data(PackageState.Data.StateEnum.RUNNING)))

Expand All @@ -144,26 +165,29 @@ def _get_commands():
g.resume.set()
else:
if cmd == Commands.PAUSE:
callback(g.pause_callback)
g.resume.clear()
g.pause.set()
print_event(PackageState(PackageState.Data(PackageState.Data.StateEnum.PAUSED)))


_cmd_thread = threading.Thread(target=_get_commands)
_cmd_thread = threading.Thread(target=_get_stdin_commands)
_cmd_thread.daemon = True
_cmd_thread.start()


def handle_stdin_commands(*, before: bool, breakpoint: bool = False) -> None:
def handle_commands(*, before: bool, breakpoint: bool = False) -> None:
"""Actual handling of commands in (potentially parallel) actions."""

with g.lock:
# this is to handle pause caused by breakpoint
if (breakpoint or (before and g.pause_on_next_action.is_set())) and not g.pause.is_set():
g.pause_on_next_action.clear()
g.resume.clear()
g.pause.set()
print_event(PackageState(PackageState.Data(PackageState.Data.StateEnum.PAUSED)))

# waiting - the same for both "external" and "internal" pause (command from the Execution service, or breakpoint)
if g.pause.is_set():
g.resume.wait()

Expand Down Expand Up @@ -215,6 +239,10 @@ def wrapper(obj: Generic, *action_args: Any, an: None | str = None, **kwargs: An
if thread_id not in g.depth:
g.depth[thread_id] = 0

# this allows calling actions in pause/resume callbacks by treating them as ordinary methods (no events, etc.)
if thread_id == g.callback_thread_id:
return f(obj, *action_args, an=an, **kwargs)

try:
action_id: None | str = None
action_mapping_provided = hasattr(obj, ACTION_NAME_ID_MAPPING_ATTR)
Expand Down Expand Up @@ -273,7 +301,7 @@ def wrapper(obj: Generic, *action_args: Any, an: None | str = None, **kwargs: An

print_event(state_before)

handle_stdin_commands(before=True, breakpoint=make_a_break)
handle_commands(before=True, breakpoint=make_a_break)

else: # handle outermost actions
# if not set (e.g. when writing actions manually), do not attempt to get action IDs from names
Expand Down Expand Up @@ -311,7 +339,7 @@ def wrapper(obj: Generic, *action_args: Any, an: None | str = None, **kwargs: An

g.ea[thread_id] = None

handle_stdin_commands(before=False)
handle_commands(before=False)

return res

Expand Down
Loading

0 comments on commit 8b3b5b1

Please sign in to comment.