From 43ccd9c4ccdb9aeaff6afb3cbeace682e4a86b78 Mon Sep 17 00:00:00 2001 From: Vanessa Hassouna Date: Mon, 19 Aug 2024 11:59:05 +0200 Subject: [PATCH] [execution enums] changed the modules to use execution enums instead of string --- src/pycram/designators/motion_designator.py | 10 +++-- src/pycram/external_interfaces/tmc.py | 7 +++- .../process_modules/boxy_process_modules.py | 14 +++---- .../default_process_modules.py | 20 ++++----- .../process_modules/donbot_process_modules.py | 16 ++++---- .../process_modules/hsrb_process_modules.py | 8 +--- .../process_modules/pr2_process_modules.py | 41 ++++++++++--------- .../stretch_process_modules.py | 38 ++++++++--------- src/pycram/ros/tf_broadcaster.py | 2 +- 9 files changed, 79 insertions(+), 77 deletions(-) diff --git a/src/pycram/designators/motion_designator.py b/src/pycram/designators/motion_designator.py index 574a57c16..5e1174edd 100644 --- a/src/pycram/designators/motion_designator.py +++ b/src/pycram/designators/motion_designator.py @@ -12,13 +12,14 @@ MoveGripperMotion as ORMMoveGripperMotion, DetectingMotion as ORMDetectingMotion, OpeningMotion as ORMOpeningMotion, ClosingMotion as ORMClosingMotion, Motion as ORMMotionDesignator) -from ..datastructures.enums import ObjectType, Arms, GripperState +from ..datastructures.enums import ObjectType, Arms, GripperState, ExecutionType from typing_extensions import Dict, Optional, get_type_hints from ..datastructures.pose import Pose from ..tasktree import with_tree from ..designator import BaseMotion + @dataclass class MoveMotion(BaseMotion): """ @@ -160,9 +161,9 @@ def perform(self): if not world_object: raise PerceptionObjectNotFound( f"Could not find an object with the type {self.object_type} in the FOV of the robot") - if ProcessModuleManager.execution_type == "real": + if ProcessModuleManager.execution_type == ExecutionType.REAL: return RealObject.Object(world_object.name, world_object.obj_type, - world_object, world_object.get_pose()) + world_object, world_object.get_pose()) return ObjectDesignatorDescription.Object(world_object.name, world_object.obj_type, world_object) @@ -314,6 +315,7 @@ def insert(self, session: Session, *args, **kwargs) -> ORMClosingMotion: return motion + @dataclass class TalkingMotion(BaseMotion): """ @@ -334,4 +336,4 @@ def to_sql(self) -> ORMMotionDesignator: pass def insert(self, session: Session, *args, **kwargs) -> ORMMotionDesignator: - pass \ No newline at end of file + pass diff --git a/src/pycram/external_interfaces/tmc.py b/src/pycram/external_interfaces/tmc.py index e0fa584e6..038dad2bf 100644 --- a/src/pycram/external_interfaces/tmc.py +++ b/src/pycram/external_interfaces/tmc.py @@ -1,5 +1,8 @@ import rospy -from .designators.motion_designator import MoveGripperMotion, TalkingMotion +from typing_extensions import Optional + +from ..datastructures.enums import GripperState +from ..designators.motion_designator import MoveGripperMotion, TalkingMotion is_init = False @@ -22,6 +25,7 @@ def tmc_gripper_control(designator: MoveGripperMotion, topic_name: Optional[str] Publishes a message to the gripper controller to open or close the gripper for the HSR. :param designator: The designator containing the motion to be executed + :param topic_name: The topic name to publish the message to """ if (designator.motion == GripperState.OPEN): pub_gripper = rospy.Publisher(topic_name, GripperApplyEffortActionGoal, @@ -45,6 +49,7 @@ def tmc_talk(designator: TalkingMotion, topic_name: Optional[str] = '/talk_reque Publishes a sentence to the talk_request topic of the HSRB robot :param designator: The designator containing the sentence to be spoken + :param topic_name: The topic name to publish the sentence to """ pub = rospy.Publisher(topic_name, Voice, queue_size=10) texttospeech = Voice() diff --git a/src/pycram/process_modules/boxy_process_modules.py b/src/pycram/process_modules/boxy_process_modules.py index 5dc25d8f9..b8abbb0b2 100644 --- a/src/pycram/process_modules/boxy_process_modules.py +++ b/src/pycram/process_modules/boxy_process_modules.py @@ -200,29 +200,29 @@ def __init__(self): self._close_lock = Lock() def navigate(self): - if ProcessModuleManager.execution_type == "simulated": + if ProcessModuleManager.execution_type == ExecutionType.SIMULATED: return BoxyNavigation(self._navigate_lock) def looking(self): - if ProcessModuleManager.execution_type == "simulated": + if ProcessModuleManager.execution_type == ExecutionType.SIMULATED: return BoxyMoveHead(self._looking_lock) def detecting(self): - if ProcessModuleManager.execution_type == "simulated": + if ProcessModuleManager.execution_type == ExecutionType.SIMULATED: return BoxyDetecting(self._detecting_lock) def move_tcp(self): - if ProcessModuleManager.execution_type == "simulated": + if ProcessModuleManager.execution_type == ExecutionType.SIMULATED: return BoxyMoveTCP(self._move_tcp_lock) def move_arm_joints(self): - if ProcessModuleManager.execution_type == "simulated": + if ProcessModuleManager.execution_type == ExecutionType.SIMULATED: return BoxyMoveArmJoints(self._move_arm_joints_lock) def world_state_detecting(self): - if ProcessModuleManager.execution_type == "simulated": + if ProcessModuleManager.execution_type == ExecutionType.SIMULATED: return BoxyWorldStateDetecting(self._world_state_detecting_lock) def move_gripper(self): - if ProcessModuleManager.execution_type == "simulated": + if ProcessModuleManager.execution_type == ExecutionType.SIMULATED: return BoxyMoveGripper(self._move_gripper_lock) diff --git a/src/pycram/process_modules/default_process_modules.py b/src/pycram/process_modules/default_process_modules.py index 7c9a76dd5..40a3dbdc7 100644 --- a/src/pycram/process_modules/default_process_modules.py +++ b/src/pycram/process_modules/default_process_modules.py @@ -196,41 +196,41 @@ def __init__(self): self._close_lock = Lock() def navigate(self): - if ProcessModuleManager.execution_type == "simulated": + if ProcessModuleManager.execution_type == ExecutionType.SIMULATED: return DefaultNavigation(self._navigate_lock) def looking(self): - if ProcessModuleManager.execution_type == "simulated": + if ProcessModuleManager.execution_type == ExecutionType.SIMULATED: return DefaultMoveHead(self._looking_lock) def detecting(self): - if ProcessModuleManager.execution_type == "simulated": + if ProcessModuleManager.execution_type == ExecutionType.SIMULATED: return DefaultDetecting(self._detecting_lock) def move_tcp(self): - if ProcessModuleManager.execution_type == "simulated": + if ProcessModuleManager.execution_type == ExecutionType.SIMULATED: return DefaultMoveTCP(self._move_tcp_lock) def move_arm_joints(self): - if ProcessModuleManager.execution_type == "simulated": + if ProcessModuleManager.execution_type == ExecutionType.SIMULATED: return DefaultMoveArmJoints(self._move_arm_joints_lock) def world_state_detecting(self): - if ProcessModuleManager.execution_type == "simulated": + if ProcessModuleManager.execution_type == ExecutionType.SIMULATED: return DefaultWorldStateDetecting(self._world_state_detecting_lock) def move_joints(self): - if ProcessModuleManager.execution_type == "simulated": + if ProcessModuleManager.execution_type == ExecutionType.SIMULATED: return DefaultMoveJoints(self._move_joints_lock) def move_gripper(self): - if ProcessModuleManager.execution_type == "simulated": + if ProcessModuleManager.execution_type == ExecutionType.SIMULATED: return DefaultMoveGripper(self._move_gripper_lock) def open(self): - if ProcessModuleManager.execution_type == "simulated": + if ProcessModuleManager.execution_type == ExecutionType.SIMULATED: return DefaultOpen(self._open_lock) def close(self): - if ProcessModuleManager.execution_type == "simulated": + if ProcessModuleManager.execution_type == ExecutionType.SIMULATED: return DefaultClose(self._close_lock) diff --git a/src/pycram/process_modules/donbot_process_modules.py b/src/pycram/process_modules/donbot_process_modules.py index e9ff45edc..e3fcec033 100644 --- a/src/pycram/process_modules/donbot_process_modules.py +++ b/src/pycram/process_modules/donbot_process_modules.py @@ -149,33 +149,33 @@ def __init__(self): self._close_lock = Lock() def navigate(self): - if ProcessModuleManager.execution_type == "simulated": + if ProcessModuleManager.execution_type == ExecutionType.SIMULATED: return DonbotNavigation(self._navigate_lock) def place(self): - if ProcessModuleManager.execution_type == "simulated": + if ProcessModuleManager.execution_type == ExecutionType.SIMULATED: return DonbotPlace(self._place_lock) def looking(self): - if ProcessModuleManager.execution_type == "simulated": + if ProcessModuleManager.execution_type == ExecutionType.SIMULATED: return DonbotMoveHead(self._looking_lock) def detecting(self): - if ProcessModuleManager.execution_type == "simulated": + if ProcessModuleManager.execution_type == ExecutionType.SIMULATED: return DonbotDetecting(self._detecting_lock) def move_tcp(self): - if ProcessModuleManager.execution_type == "simulated": + if ProcessModuleManager.execution_type == ExecutionType.SIMULATED: return DonbotMoveTCP(self._move_tcp_lock) def move_arm_joints(self): - if ProcessModuleManager.execution_type == "simulated": + if ProcessModuleManager.execution_type == ExecutionType.SIMULATED: return DonbotMoveJoints(self._move_arm_joints_lock) def world_state_detecting(self): - if ProcessModuleManager.execution_type == "simulated": + if ProcessModuleManager.execution_type == ExecutionType.SIMULATED: return DonbotWorldStateDetecting(self._world_state_detecting_lock) def move_gripper(self): - if ProcessModuleManager.execution_type == "simulated": + if ProcessModuleManager.execution_type == ExecutionType.SIMULATED: return DonbotMoveGripper(self._move_gripper_lock) diff --git a/src/pycram/process_modules/hsrb_process_modules.py b/src/pycram/process_modules/hsrb_process_modules.py index e6abc7992..bda99aef8 100644 --- a/src/pycram/process_modules/hsrb_process_modules.py +++ b/src/pycram/process_modules/hsrb_process_modules.py @@ -3,19 +3,13 @@ from threading import Lock from typing_extensions import Any -from ..datastructures.enums import JointType, PerceptionTechniques, ExecutionType +from ..datastructures.enums import ExecutionType from ..external_interfaces.tmc import tmc_gripper_control, tmc_talk from ..robot_description import RobotDescription from ..process_module import ProcessModule -from ..datastructures.pose import Point -from ..robot_descriptions import robot_description -from ..utils import _apply_ik -from ..external_interfaces.ik import request_ik -from .. import world_reasoning as btr from ..local_transformer import LocalTransformer from ..designators.motion_designator import * from ..external_interfaces import giskard -from ..world_concepts.world_object import Object from ..datastructures.world import World from pydub import AudioSegment from pydub.playback import play diff --git a/src/pycram/process_modules/pr2_process_modules.py b/src/pycram/process_modules/pr2_process_modules.py index 0ac65ba8d..261094ccc 100644 --- a/src/pycram/process_modules/pr2_process_modules.py +++ b/src/pycram/process_modules/pr2_process_modules.py @@ -19,7 +19,7 @@ from ..datastructures.world import World from ..world_concepts.world_object import Object from ..datastructures.pose import Pose -from ..datastructures.enums import JointType, ObjectType, Arms +from ..datastructures.enums import JointType, ObjectType, Arms, ExecutionType from ..external_interfaces import giskard from ..external_interfaces.robokudo import query @@ -375,59 +375,60 @@ def __init__(self): self._close_lock = Lock() def navigate(self): - if ProcessModuleManager.execution_type == "simulated": + if ProcessModuleManager.execution_type == ExecutionType.SIMULATED: return Pr2Navigation(self._navigate_lock) - elif ProcessModuleManager.execution_type == "real": + elif ProcessModuleManager.execution_type == ExecutionType.REAL: return Pr2NavigationReal(self._navigate_lock) def looking(self): - if ProcessModuleManager.execution_type == "simulated": + if ProcessModuleManager.execution_type == ExecutionType.SIMULATED: return Pr2MoveHead(self._looking_lock) - elif ProcessModuleManager.execution_type == "real": + elif ProcessModuleManager.execution_type == ExecutionType.REAL: return Pr2MoveHeadReal(self._looking_lock) def detecting(self): - if ProcessModuleManager.execution_type == "simulated": + if ProcessModuleManager.execution_type == ExecutionType.SIMULATED: return Pr2Detecting(self._detecting_lock) - elif ProcessModuleManager.execution_type == "real": + elif ProcessModuleManager.execution_type == ExecutionType.REAL: return Pr2DetectingReal(self._detecting_lock) def move_tcp(self): - if ProcessModuleManager.execution_type == "simulated": + if ProcessModuleManager.execution_type == ExecutionType.SIMULATED: return Pr2MoveTCP(self._move_tcp_lock) - elif ProcessModuleManager.execution_type == "real": + elif ProcessModuleManager.execution_type == ExecutionType.REAL: return Pr2MoveTCPReal(self._move_tcp_lock) def move_arm_joints(self): - if ProcessModuleManager.execution_type == "simulated": + if ProcessModuleManager.execution_type == ExecutionType.SIMULATED: return Pr2MoveArmJoints(self._move_arm_joints_lock) - elif ProcessModuleManager.execution_type == "real": + elif ProcessModuleManager.execution_type == ExecutionType.REAL: return Pr2MoveArmJointsReal(self._move_arm_joints_lock) def world_state_detecting(self): - if ProcessModuleManager.execution_type == "simulated" or ProcessModuleManager.execution_type == "real": + if (ProcessModuleManager.execution_type == ExecutionType.SIMULATED or + ProcessModuleManager.execution_type == ExecutionType.REAL): return Pr2WorldStateDetecting(self._world_state_detecting_lock) def move_joints(self): - if ProcessModuleManager.execution_type == "simulated": + if ProcessModuleManager.execution_type == ExecutionType.SIMULATED: return PR2MoveJoints(self._move_joints_lock) - elif ProcessModuleManager.execution_type == "real": + elif ProcessModuleManager.execution_type == ExecutionType.REAL: return Pr2MoveJointsReal(self._move_joints_lock) def move_gripper(self): - if ProcessModuleManager.execution_type == "simulated": + if ProcessModuleManager.execution_type == ExecutionType.SIMULATED: return Pr2MoveGripper(self._move_gripper_lock) - elif ProcessModuleManager.execution_type == "real": + elif ProcessModuleManager.execution_type == ExecutionType.REAL: return Pr2MoveGripperReal(self._move_gripper_lock) def open(self): - if ProcessModuleManager.execution_type == "simulated": + if ProcessModuleManager.execution_type == ExecutionType.SIMULATED: return Pr2Open(self._open_lock) - elif ProcessModuleManager.execution_type == "real": + elif ProcessModuleManager.execution_type == ExecutionType.REAL: return Pr2OpenReal(self._open_lock) def close(self): - if ProcessModuleManager.execution_type == "simulated": + if ProcessModuleManager.execution_type == ExecutionType.SIMULATED: return Pr2Close(self._close_lock) - elif ProcessModuleManager.execution_type == "real": + elif ProcessModuleManager.execution_type == ExecutionType.REAL: return Pr2CloseReal(self._close_lock) diff --git a/src/pycram/process_modules/stretch_process_modules.py b/src/pycram/process_modules/stretch_process_modules.py index 77b33596a..3dd46d33b 100644 --- a/src/pycram/process_modules/stretch_process_modules.py +++ b/src/pycram/process_modules/stretch_process_modules.py @@ -295,59 +295,59 @@ def __init__(self): self._close_lock = Lock() def navigate(self): - if ProcessModuleManager.execution_type == "simulated": + if ProcessModuleManager.execution_type == ExecutionType.SIMULATED: return StretchNavigate(self._navigate_lock) - elif ProcessModuleManager.execution_type == "real": + elif ProcessModuleManager.execution_type == ExecutionType.REAL: return StretchNavigationReal(self._navigate_lock) def looking(self): - if ProcessModuleManager.execution_type == "simulated": + if ProcessModuleManager.execution_type == ExecutionType.SIMULATED: return StretchMoveHead(self._looking_lock) - elif ProcessModuleManager.execution_type == "real": + elif ProcessModuleManager.execution_type == ExecutionType.REAL: return StretchMoveHeadReal(self._looking_lock) def detecting(self): - if ProcessModuleManager.execution_type == "simulated": + if ProcessModuleManager.execution_type == ExecutionType.SIMULATED: return StretchDetecting(self._detecting_lock) - elif ProcessModuleManager.execution_type == "real": + elif ProcessModuleManager.execution_type == ExecutionType.REAL: return StretchDetectingReal(self._detecting_lock) def move_tcp(self): - if ProcessModuleManager.execution_type == "simulated": + if ProcessModuleManager.execution_type == ExecutionType.SIMULATED: return StretchMoveTCP(self._move_tcp_lock) - elif ProcessModuleManager.execution_type == "real": + elif ProcessModuleManager.execution_type == ExecutionType.REAL: return StretchMoveTCPReal(self._move_tcp_lock) def move_arm_joints(self): - if ProcessModuleManager.execution_type == "simulated": + if ProcessModuleManager.execution_type == ExecutionType.SIMULATED: return StretchMoveArmJoints(self._move_arm_joints_lock) - elif ProcessModuleManager.execution_type == "real": + elif ProcessModuleManager.execution_type == ExecutionType.REAL: return StretchMoveArmJointsReal(self._move_arm_joints_lock) def world_state_detecting(self): - if ProcessModuleManager.execution_type == "simulated" or ProcessModuleManager.execution_type == "real": + if ProcessModuleManager.execution_type == ExecutionType.SIMULATED or ProcessModuleManager.execution_type == ExecutionType.REAL: return StretchWorldStateDetecting(self._world_state_detecting_lock) def move_joints(self): - if ProcessModuleManager.execution_type == "simulated": + if ProcessModuleManager.execution_type == ExecutionType.SIMULATED: return StretchMoveJoints(self._move_joints_lock) - elif ProcessModuleManager.execution_type == "real": + elif ProcessModuleManager.execution_type == ExecutionType.REAL: return StretchMoveJointsReal(self._move_joints_lock) def move_gripper(self): - if ProcessModuleManager.execution_type == "simulated": + if ProcessModuleManager.execution_type == ExecutionType.SIMULATED: return StretchMoveGripper(self._move_gripper_lock) - elif ProcessModuleManager.execution_type == "real": + elif ProcessModuleManager.execution_type == ExecutionType.REAL: return StretchMoveGripperReal(self._move_gripper_lock) def open(self): - if ProcessModuleManager.execution_type == "simulated": + if ProcessModuleManager.execution_type == ExecutionType.SIMULATED: return StretchOpen(self._open_lock) - elif ProcessModuleManager.execution_type == "real": + elif ProcessModuleManager.execution_type == ExecutionType.REAL: return StretchOpenReal(self._open_lock) def close(self): - if ProcessModuleManager.execution_type == "simulated": + if ProcessModuleManager.execution_type == ExecutionType.SIMULATED: return StretchClose(self._close_lock) - elif ProcessModuleManager.execution_type == "real": + elif ProcessModuleManager.execution_type == ExecutionType.REAL: return StretchCloseReal(self._close_lock) diff --git a/src/pycram/ros/tf_broadcaster.py b/src/pycram/ros/tf_broadcaster.py index 9f4661a43..8cf46578b 100644 --- a/src/pycram/ros/tf_broadcaster.py +++ b/src/pycram/ros/tf_broadcaster.py @@ -12,7 +12,7 @@ class TFBroadcaster: """ Broadcaster that publishes TF frames for every object in the World. """ - def __init__(self, projection_namespace="simulated", odom_frame="odom", interval=0.1): + def __init__(self, projection_namespace= ExecutionType.SIMULATED, odom_frame="odom", interval=0.1): """ The broadcaster prefixes all published TF messages with a projection namespace to distinguish between the TF frames from the simulation and the one from the real robot.