Skip to content

Commit

Permalink
[execution enums] changed the modules to use execution enums instead …
Browse files Browse the repository at this point in the history
…of string
  • Loading branch information
sunava committed Aug 19, 2024
1 parent 3dc1f8e commit 43ccd9c
Show file tree
Hide file tree
Showing 9 changed files with 79 additions and 77 deletions.
10 changes: 6 additions & 4 deletions src/pycram/designators/motion_designator.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,14 @@
MoveGripperMotion as ORMMoveGripperMotion, DetectingMotion as ORMDetectingMotion,
OpeningMotion as ORMOpeningMotion, ClosingMotion as ORMClosingMotion,
Motion as ORMMotionDesignator)
from ..datastructures.enums import ObjectType, Arms, GripperState
from ..datastructures.enums import ObjectType, Arms, GripperState, ExecutionType

from typing_extensions import Dict, Optional, get_type_hints
from ..datastructures.pose import Pose
from ..tasktree import with_tree
from ..designator import BaseMotion


@dataclass
class MoveMotion(BaseMotion):
"""
Expand Down Expand Up @@ -160,9 +161,9 @@ def perform(self):
if not world_object:
raise PerceptionObjectNotFound(
f"Could not find an object with the type {self.object_type} in the FOV of the robot")
if ProcessModuleManager.execution_type == "real":
if ProcessModuleManager.execution_type == ExecutionType.REAL:
return RealObject.Object(world_object.name, world_object.obj_type,
world_object, world_object.get_pose())
world_object, world_object.get_pose())

return ObjectDesignatorDescription.Object(world_object.name, world_object.obj_type,
world_object)
Expand Down Expand Up @@ -314,6 +315,7 @@ def insert(self, session: Session, *args, **kwargs) -> ORMClosingMotion:

return motion


@dataclass
class TalkingMotion(BaseMotion):
"""
Expand All @@ -334,4 +336,4 @@ def to_sql(self) -> ORMMotionDesignator:
pass

def insert(self, session: Session, *args, **kwargs) -> ORMMotionDesignator:
pass
pass
7 changes: 6 additions & 1 deletion src/pycram/external_interfaces/tmc.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
import rospy
from .designators.motion_designator import MoveGripperMotion, TalkingMotion
from typing_extensions import Optional

from ..datastructures.enums import GripperState
from ..designators.motion_designator import MoveGripperMotion, TalkingMotion

is_init = False

Expand All @@ -22,6 +25,7 @@ def tmc_gripper_control(designator: MoveGripperMotion, topic_name: Optional[str]
Publishes a message to the gripper controller to open or close the gripper for the HSR.
:param designator: The designator containing the motion to be executed
:param topic_name: The topic name to publish the message to
"""
if (designator.motion == GripperState.OPEN):
pub_gripper = rospy.Publisher(topic_name, GripperApplyEffortActionGoal,
Expand All @@ -45,6 +49,7 @@ def tmc_talk(designator: TalkingMotion, topic_name: Optional[str] = '/talk_reque
Publishes a sentence to the talk_request topic of the HSRB robot
:param designator: The designator containing the sentence to be spoken
:param topic_name: The topic name to publish the sentence to
"""
pub = rospy.Publisher(topic_name, Voice, queue_size=10)
texttospeech = Voice()
Expand Down
14 changes: 7 additions & 7 deletions src/pycram/process_modules/boxy_process_modules.py
Original file line number Diff line number Diff line change
Expand Up @@ -200,29 +200,29 @@ def __init__(self):
self._close_lock = Lock()

def navigate(self):
if ProcessModuleManager.execution_type == "simulated":
if ProcessModuleManager.execution_type == ExecutionType.SIMULATED:
return BoxyNavigation(self._navigate_lock)

def looking(self):
if ProcessModuleManager.execution_type == "simulated":
if ProcessModuleManager.execution_type == ExecutionType.SIMULATED:
return BoxyMoveHead(self._looking_lock)

def detecting(self):
if ProcessModuleManager.execution_type == "simulated":
if ProcessModuleManager.execution_type == ExecutionType.SIMULATED:
return BoxyDetecting(self._detecting_lock)

def move_tcp(self):
if ProcessModuleManager.execution_type == "simulated":
if ProcessModuleManager.execution_type == ExecutionType.SIMULATED:
return BoxyMoveTCP(self._move_tcp_lock)

def move_arm_joints(self):
if ProcessModuleManager.execution_type == "simulated":
if ProcessModuleManager.execution_type == ExecutionType.SIMULATED:
return BoxyMoveArmJoints(self._move_arm_joints_lock)

def world_state_detecting(self):
if ProcessModuleManager.execution_type == "simulated":
if ProcessModuleManager.execution_type == ExecutionType.SIMULATED:
return BoxyWorldStateDetecting(self._world_state_detecting_lock)

def move_gripper(self):
if ProcessModuleManager.execution_type == "simulated":
if ProcessModuleManager.execution_type == ExecutionType.SIMULATED:
return BoxyMoveGripper(self._move_gripper_lock)
20 changes: 10 additions & 10 deletions src/pycram/process_modules/default_process_modules.py
Original file line number Diff line number Diff line change
Expand Up @@ -196,41 +196,41 @@ def __init__(self):
self._close_lock = Lock()

def navigate(self):
if ProcessModuleManager.execution_type == "simulated":
if ProcessModuleManager.execution_type == ExecutionType.SIMULATED:
return DefaultNavigation(self._navigate_lock)

def looking(self):
if ProcessModuleManager.execution_type == "simulated":
if ProcessModuleManager.execution_type == ExecutionType.SIMULATED:
return DefaultMoveHead(self._looking_lock)

def detecting(self):
if ProcessModuleManager.execution_type == "simulated":
if ProcessModuleManager.execution_type == ExecutionType.SIMULATED:
return DefaultDetecting(self._detecting_lock)

def move_tcp(self):
if ProcessModuleManager.execution_type == "simulated":
if ProcessModuleManager.execution_type == ExecutionType.SIMULATED:
return DefaultMoveTCP(self._move_tcp_lock)

def move_arm_joints(self):
if ProcessModuleManager.execution_type == "simulated":
if ProcessModuleManager.execution_type == ExecutionType.SIMULATED:
return DefaultMoveArmJoints(self._move_arm_joints_lock)

def world_state_detecting(self):
if ProcessModuleManager.execution_type == "simulated":
if ProcessModuleManager.execution_type == ExecutionType.SIMULATED:
return DefaultWorldStateDetecting(self._world_state_detecting_lock)

def move_joints(self):
if ProcessModuleManager.execution_type == "simulated":
if ProcessModuleManager.execution_type == ExecutionType.SIMULATED:
return DefaultMoveJoints(self._move_joints_lock)

def move_gripper(self):
if ProcessModuleManager.execution_type == "simulated":
if ProcessModuleManager.execution_type == ExecutionType.SIMULATED:
return DefaultMoveGripper(self._move_gripper_lock)

def open(self):
if ProcessModuleManager.execution_type == "simulated":
if ProcessModuleManager.execution_type == ExecutionType.SIMULATED:
return DefaultOpen(self._open_lock)

def close(self):
if ProcessModuleManager.execution_type == "simulated":
if ProcessModuleManager.execution_type == ExecutionType.SIMULATED:
return DefaultClose(self._close_lock)
16 changes: 8 additions & 8 deletions src/pycram/process_modules/donbot_process_modules.py
Original file line number Diff line number Diff line change
Expand Up @@ -149,33 +149,33 @@ def __init__(self):
self._close_lock = Lock()

def navigate(self):
if ProcessModuleManager.execution_type == "simulated":
if ProcessModuleManager.execution_type == ExecutionType.SIMULATED:
return DonbotNavigation(self._navigate_lock)

def place(self):
if ProcessModuleManager.execution_type == "simulated":
if ProcessModuleManager.execution_type == ExecutionType.SIMULATED:
return DonbotPlace(self._place_lock)

def looking(self):
if ProcessModuleManager.execution_type == "simulated":
if ProcessModuleManager.execution_type == ExecutionType.SIMULATED:
return DonbotMoveHead(self._looking_lock)

def detecting(self):
if ProcessModuleManager.execution_type == "simulated":
if ProcessModuleManager.execution_type == ExecutionType.SIMULATED:
return DonbotDetecting(self._detecting_lock)

def move_tcp(self):
if ProcessModuleManager.execution_type == "simulated":
if ProcessModuleManager.execution_type == ExecutionType.SIMULATED:
return DonbotMoveTCP(self._move_tcp_lock)

def move_arm_joints(self):
if ProcessModuleManager.execution_type == "simulated":
if ProcessModuleManager.execution_type == ExecutionType.SIMULATED:
return DonbotMoveJoints(self._move_arm_joints_lock)

def world_state_detecting(self):
if ProcessModuleManager.execution_type == "simulated":
if ProcessModuleManager.execution_type == ExecutionType.SIMULATED:
return DonbotWorldStateDetecting(self._world_state_detecting_lock)

def move_gripper(self):
if ProcessModuleManager.execution_type == "simulated":
if ProcessModuleManager.execution_type == ExecutionType.SIMULATED:
return DonbotMoveGripper(self._move_gripper_lock)
8 changes: 1 addition & 7 deletions src/pycram/process_modules/hsrb_process_modules.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,13 @@
from threading import Lock
from typing_extensions import Any

from ..datastructures.enums import JointType, PerceptionTechniques, ExecutionType
from ..datastructures.enums import ExecutionType
from ..external_interfaces.tmc import tmc_gripper_control, tmc_talk
from ..robot_description import RobotDescription
from ..process_module import ProcessModule
from ..datastructures.pose import Point
from ..robot_descriptions import robot_description
from ..utils import _apply_ik
from ..external_interfaces.ik import request_ik
from .. import world_reasoning as btr
from ..local_transformer import LocalTransformer
from ..designators.motion_designator import *
from ..external_interfaces import giskard
from ..world_concepts.world_object import Object
from ..datastructures.world import World
from pydub import AudioSegment
from pydub.playback import play
Expand Down
41 changes: 21 additions & 20 deletions src/pycram/process_modules/pr2_process_modules.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
from ..datastructures.world import World
from ..world_concepts.world_object import Object
from ..datastructures.pose import Pose
from ..datastructures.enums import JointType, ObjectType, Arms
from ..datastructures.enums import JointType, ObjectType, Arms, ExecutionType
from ..external_interfaces import giskard
from ..external_interfaces.robokudo import query

Expand Down Expand Up @@ -375,59 +375,60 @@ def __init__(self):
self._close_lock = Lock()

def navigate(self):
if ProcessModuleManager.execution_type == "simulated":
if ProcessModuleManager.execution_type == ExecutionType.SIMULATED:
return Pr2Navigation(self._navigate_lock)
elif ProcessModuleManager.execution_type == "real":
elif ProcessModuleManager.execution_type == ExecutionType.REAL:
return Pr2NavigationReal(self._navigate_lock)

def looking(self):
if ProcessModuleManager.execution_type == "simulated":
if ProcessModuleManager.execution_type == ExecutionType.SIMULATED:
return Pr2MoveHead(self._looking_lock)
elif ProcessModuleManager.execution_type == "real":
elif ProcessModuleManager.execution_type == ExecutionType.REAL:
return Pr2MoveHeadReal(self._looking_lock)

def detecting(self):
if ProcessModuleManager.execution_type == "simulated":
if ProcessModuleManager.execution_type == ExecutionType.SIMULATED:
return Pr2Detecting(self._detecting_lock)
elif ProcessModuleManager.execution_type == "real":
elif ProcessModuleManager.execution_type == ExecutionType.REAL:
return Pr2DetectingReal(self._detecting_lock)

def move_tcp(self):
if ProcessModuleManager.execution_type == "simulated":
if ProcessModuleManager.execution_type == ExecutionType.SIMULATED:
return Pr2MoveTCP(self._move_tcp_lock)
elif ProcessModuleManager.execution_type == "real":
elif ProcessModuleManager.execution_type == ExecutionType.REAL:
return Pr2MoveTCPReal(self._move_tcp_lock)

def move_arm_joints(self):
if ProcessModuleManager.execution_type == "simulated":
if ProcessModuleManager.execution_type == ExecutionType.SIMULATED:
return Pr2MoveArmJoints(self._move_arm_joints_lock)
elif ProcessModuleManager.execution_type == "real":
elif ProcessModuleManager.execution_type == ExecutionType.REAL:
return Pr2MoveArmJointsReal(self._move_arm_joints_lock)

def world_state_detecting(self):
if ProcessModuleManager.execution_type == "simulated" or ProcessModuleManager.execution_type == "real":
if (ProcessModuleManager.execution_type == ExecutionType.SIMULATED or
ProcessModuleManager.execution_type == ExecutionType.REAL):
return Pr2WorldStateDetecting(self._world_state_detecting_lock)

def move_joints(self):
if ProcessModuleManager.execution_type == "simulated":
if ProcessModuleManager.execution_type == ExecutionType.SIMULATED:
return PR2MoveJoints(self._move_joints_lock)
elif ProcessModuleManager.execution_type == "real":
elif ProcessModuleManager.execution_type == ExecutionType.REAL:
return Pr2MoveJointsReal(self._move_joints_lock)

def move_gripper(self):
if ProcessModuleManager.execution_type == "simulated":
if ProcessModuleManager.execution_type == ExecutionType.SIMULATED:
return Pr2MoveGripper(self._move_gripper_lock)
elif ProcessModuleManager.execution_type == "real":
elif ProcessModuleManager.execution_type == ExecutionType.REAL:
return Pr2MoveGripperReal(self._move_gripper_lock)

def open(self):
if ProcessModuleManager.execution_type == "simulated":
if ProcessModuleManager.execution_type == ExecutionType.SIMULATED:
return Pr2Open(self._open_lock)
elif ProcessModuleManager.execution_type == "real":
elif ProcessModuleManager.execution_type == ExecutionType.REAL:
return Pr2OpenReal(self._open_lock)

def close(self):
if ProcessModuleManager.execution_type == "simulated":
if ProcessModuleManager.execution_type == ExecutionType.SIMULATED:
return Pr2Close(self._close_lock)
elif ProcessModuleManager.execution_type == "real":
elif ProcessModuleManager.execution_type == ExecutionType.REAL:
return Pr2CloseReal(self._close_lock)
38 changes: 19 additions & 19 deletions src/pycram/process_modules/stretch_process_modules.py
Original file line number Diff line number Diff line change
Expand Up @@ -295,59 +295,59 @@ def __init__(self):
self._close_lock = Lock()

def navigate(self):
if ProcessModuleManager.execution_type == "simulated":
if ProcessModuleManager.execution_type == ExecutionType.SIMULATED:
return StretchNavigate(self._navigate_lock)
elif ProcessModuleManager.execution_type == "real":
elif ProcessModuleManager.execution_type == ExecutionType.REAL:
return StretchNavigationReal(self._navigate_lock)

def looking(self):
if ProcessModuleManager.execution_type == "simulated":
if ProcessModuleManager.execution_type == ExecutionType.SIMULATED:
return StretchMoveHead(self._looking_lock)
elif ProcessModuleManager.execution_type == "real":
elif ProcessModuleManager.execution_type == ExecutionType.REAL:
return StretchMoveHeadReal(self._looking_lock)

def detecting(self):
if ProcessModuleManager.execution_type == "simulated":
if ProcessModuleManager.execution_type == ExecutionType.SIMULATED:
return StretchDetecting(self._detecting_lock)
elif ProcessModuleManager.execution_type == "real":
elif ProcessModuleManager.execution_type == ExecutionType.REAL:
return StretchDetectingReal(self._detecting_lock)

def move_tcp(self):
if ProcessModuleManager.execution_type == "simulated":
if ProcessModuleManager.execution_type == ExecutionType.SIMULATED:
return StretchMoveTCP(self._move_tcp_lock)
elif ProcessModuleManager.execution_type == "real":
elif ProcessModuleManager.execution_type == ExecutionType.REAL:
return StretchMoveTCPReal(self._move_tcp_lock)

def move_arm_joints(self):
if ProcessModuleManager.execution_type == "simulated":
if ProcessModuleManager.execution_type == ExecutionType.SIMULATED:
return StretchMoveArmJoints(self._move_arm_joints_lock)
elif ProcessModuleManager.execution_type == "real":
elif ProcessModuleManager.execution_type == ExecutionType.REAL:
return StretchMoveArmJointsReal(self._move_arm_joints_lock)

def world_state_detecting(self):
if ProcessModuleManager.execution_type == "simulated" or ProcessModuleManager.execution_type == "real":
if ProcessModuleManager.execution_type == ExecutionType.SIMULATED or ProcessModuleManager.execution_type == ExecutionType.REAL:
return StretchWorldStateDetecting(self._world_state_detecting_lock)

def move_joints(self):
if ProcessModuleManager.execution_type == "simulated":
if ProcessModuleManager.execution_type == ExecutionType.SIMULATED:
return StretchMoveJoints(self._move_joints_lock)
elif ProcessModuleManager.execution_type == "real":
elif ProcessModuleManager.execution_type == ExecutionType.REAL:
return StretchMoveJointsReal(self._move_joints_lock)

def move_gripper(self):
if ProcessModuleManager.execution_type == "simulated":
if ProcessModuleManager.execution_type == ExecutionType.SIMULATED:
return StretchMoveGripper(self._move_gripper_lock)
elif ProcessModuleManager.execution_type == "real":
elif ProcessModuleManager.execution_type == ExecutionType.REAL:
return StretchMoveGripperReal(self._move_gripper_lock)

def open(self):
if ProcessModuleManager.execution_type == "simulated":
if ProcessModuleManager.execution_type == ExecutionType.SIMULATED:
return StretchOpen(self._open_lock)
elif ProcessModuleManager.execution_type == "real":
elif ProcessModuleManager.execution_type == ExecutionType.REAL:
return StretchOpenReal(self._open_lock)

def close(self):
if ProcessModuleManager.execution_type == "simulated":
if ProcessModuleManager.execution_type == ExecutionType.SIMULATED:
return StretchClose(self._close_lock)
elif ProcessModuleManager.execution_type == "real":
elif ProcessModuleManager.execution_type == ExecutionType.REAL:
return StretchCloseReal(self._close_lock)
2 changes: 1 addition & 1 deletion src/pycram/ros/tf_broadcaster.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ class TFBroadcaster:
"""
Broadcaster that publishes TF frames for every object in the World.
"""
def __init__(self, projection_namespace="simulated", odom_frame="odom", interval=0.1):
def __init__(self, projection_namespace= ExecutionType.SIMULATED, odom_frame="odom", interval=0.1):
"""
The broadcaster prefixes all published TF messages with a projection namespace to distinguish between the TF
frames from the simulation and the one from the real robot.
Expand Down

0 comments on commit 43ccd9c

Please sign in to comment.