diff --git a/requirements.txt b/requirements.txt index ba876360f..92147cb58 100644 --- a/requirements.txt +++ b/requirements.txt @@ -18,4 +18,9 @@ psutil==5.9.7 lxml==4.9.1 typing_extensions==4.9.0 owlready2>=0.45 -catkin_pkg \ No newline at end of file +catkin_pkg +Pillow~=10.1.0 +pynput~=1.7.7 +playsound~=1.3.0 +pydub~=0.25.1 +gTTS~=2.5.3 diff --git a/src/pycram/datastructures/enums.py b/src/pycram/datastructures/enums.py index 301263473..89e625249 100644 --- a/src/pycram/datastructures/enums.py +++ b/src/pycram/datastructures/enums.py @@ -2,6 +2,11 @@ from enum import Enum, auto +class ExecutionType(Enum): + """Enum for Execution Process Module types.""" + REAL = auto() + SIMULATED = auto() + SEMI_REAL = auto() class Arms(Enum): """Enum for Arms.""" @@ -120,3 +125,36 @@ class GripperType(Enum): CUSTOM = auto() +class PerceptionTechniques(Enum): + """ + Enum for techniques for perception tasks. + """ + ALL = auto() + HUMAN = auto() + TYPES = auto() + + +class ImageEnum(Enum): + """ + Enum for image switch view on hsrb display. + """ + HI = 0 + TALK = 1 + DISH = 2 + DONE = 3 + DROP = 4 + HANDOVER = 5 + ORDER = 6 + PICKING = 7 + PLACING = 8 + REPEAT = 9 + SEARCH = 10 + WAVING = 11 + FOLLOWING = 12 + DRIVINGBACK = 13 + PUSHBUTTONS = 14 + FOLLOWSTOP = 15 + JREPEAT = 16 + SOFA = 17 + INSPECT = 18 + CHAIR = 37 diff --git a/src/pycram/designators/motion_designator.py b/src/pycram/designators/motion_designator.py index b008c8561..5e1174edd 100644 --- a/src/pycram/designators/motion_designator.py +++ b/src/pycram/designators/motion_designator.py @@ -12,13 +12,14 @@ MoveGripperMotion as ORMMoveGripperMotion, DetectingMotion as ORMDetectingMotion, OpeningMotion as ORMOpeningMotion, ClosingMotion as ORMClosingMotion, Motion as ORMMotionDesignator) -from ..datastructures.enums import ObjectType, Arms, GripperState +from ..datastructures.enums import ObjectType, Arms, GripperState, ExecutionType from typing_extensions import Dict, Optional, get_type_hints from ..datastructures.pose import Pose from ..tasktree import with_tree from ..designator import BaseMotion + @dataclass class MoveMotion(BaseMotion): """ @@ -160,9 +161,9 @@ def perform(self): if not world_object: raise PerceptionObjectNotFound( f"Could not find an object with the type {self.object_type} in the FOV of the robot") - if ProcessModuleManager.execution_type == "real": + if ProcessModuleManager.execution_type == ExecutionType.REAL: return RealObject.Object(world_object.name, world_object.obj_type, - world_object, world_object.get_pose()) + world_object, world_object.get_pose()) return ObjectDesignatorDescription.Object(world_object.name, world_object.obj_type, world_object) @@ -313,3 +314,26 @@ def insert(self, session: Session, *args, **kwargs) -> ORMClosingMotion: session.add(motion) return motion + + +@dataclass +class TalkingMotion(BaseMotion): + """ + Talking Motion, lets the robot say a sentence. + """ + + cmd: str + """ + Talking Motion, let the robot say a sentence. + """ + + @with_tree + def perform(self): + pm_manager = ProcessModuleManager.get_manager() + return pm_manager.talk().execute(self) + + def to_sql(self) -> ORMMotionDesignator: + pass + + def insert(self, session: Session, *args, **kwargs) -> ORMMotionDesignator: + pass diff --git a/src/pycram/external_interfaces/giskard.py b/src/pycram/external_interfaces/giskard.py index 925584fa5..958a32751 100644 --- a/src/pycram/external_interfaces/giskard.py +++ b/src/pycram/external_interfaces/giskard.py @@ -22,7 +22,7 @@ try: from giskardpy.python_interface.old_python_interface import OldGiskardWrapper as GiskardWrapper from giskard_msgs.msg import WorldBody, MoveResult, CollisionEntry - from giskard_msgs.srv import UpdateWorldRequest, UpdateWorld, UpdateWorldResponse, RegisterGroupResponse + #from giskard_msgs.srv import UpdateWorldRequest, UpdateWorld, UpdateWorldResponse, RegisterGroupResponse except ModuleNotFoundError as e: rospy.logwarn("Failed to import Giskard messages, the real robot will not be available") diff --git a/src/pycram/external_interfaces/tmc.py b/src/pycram/external_interfaces/tmc.py new file mode 100644 index 000000000..038dad2bf --- /dev/null +++ b/src/pycram/external_interfaces/tmc.py @@ -0,0 +1,60 @@ +import rospy +from typing_extensions import Optional + +from ..datastructures.enums import GripperState +from ..designators.motion_designator import MoveGripperMotion, TalkingMotion + +is_init = False + + +def init_tmc_interface(): + global is_init + if is_init: + return + try: + from tmc_control_msgs.msg import GripperApplyEffortActionGoal + from tmc_msgs.msg import Voice + is_init = True + rospy.loginfo("Successfully initialized tmc interface") + except ModuleNotFoundError as e: + rospy.logwarn(f"Could not import TMC messages, tmc interface could not be initialized") + + +def tmc_gripper_control(designator: MoveGripperMotion, topic_name: Optional[str] = '/hsrb/gripper_controller/grasp/goal'): + """ + Publishes a message to the gripper controller to open or close the gripper for the HSR. + + :param designator: The designator containing the motion to be executed + :param topic_name: The topic name to publish the message to + """ + if (designator.motion == GripperState.OPEN): + pub_gripper = rospy.Publisher(topic_name, GripperApplyEffortActionGoal, + queue_size=10) + rate = rospy.Rate(10) + msg = GripperApplyEffortActionGoal() + msg.goal.effort = 0.8 + pub_gripper.publish(msg) + + elif (designator.motion == GripperState.CLOSE): + pub_gripper = rospy.Publisher(topic_name, GripperApplyEffortActionGoal, + queue_size=10) + rate = rospy.Rate(10) + msg = GripperApplyEffortActionGoal() + msg.goal.effort = -0.8 + pub_gripper.publish(msg) + + +def tmc_talk(designator: TalkingMotion, topic_name: Optional[str] = '/talk_request'): + """ + Publishes a sentence to the talk_request topic of the HSRB robot + + :param designator: The designator containing the sentence to be spoken + :param topic_name: The topic name to publish the sentence to + """ + pub = rospy.Publisher(topic_name, Voice, queue_size=10) + texttospeech = Voice() + # language 1 = english (0 = japanese) + texttospeech.language = 1 + texttospeech.sentence = designator.cmd + + pub.publish(texttospeech) diff --git a/src/pycram/process_module.py b/src/pycram/process_module.py index 50ce0ee71..a1ee38f19 100644 --- a/src/pycram/process_module.py +++ b/src/pycram/process_module.py @@ -16,6 +16,7 @@ from .language import Language from .robot_description import RobotDescription from typing_extensions import TYPE_CHECKING +from .datastructures.enums import ExecutionType if TYPE_CHECKING: from .designators.motion_designator import BaseMotion @@ -26,7 +27,7 @@ class ProcessModule: Implementation of process modules. Process modules are the part that communicate with the outer world to execute designators. """ - execution_delay = True + execution_delay = False """ Adds a delay of 0.5 seconds after executing a process module, to make the execution in simulation more realistic """ @@ -89,7 +90,7 @@ def __enter__(self): sets it to 'real' """ self.pre = ProcessModuleManager.execution_type - ProcessModuleManager.execution_type = "real" + ProcessModuleManager.execution_type = ExecutionType.REAL self.pre_delay = ProcessModule.execution_delay ProcessModule.execution_delay = False @@ -127,7 +128,7 @@ def __enter__(self): sets it to 'simulated' """ self.pre = ProcessModuleManager.execution_type - ProcessModuleManager.execution_type = "simulated" + ProcessModuleManager.execution_type = ExecutionType.SIMULATED def __exit__(self, _type, value, traceback): """ @@ -140,6 +141,41 @@ def __call__(self): return self +class SemiRealRobot: + """ + Management class for executing designators on the semi-real robot. This is intended to be used in a with environment. + When importing this class an instance is imported instead. + + Example: + + .. code-block:: python + + with semi_real_robot: + some designators + """ + + def __init__(self): + self.pre: str = "" + + def __enter__(self): + """ + Entering function for 'with' scope, saves the previously set :py:attr:`~ProcessModuleManager.execution_type` and + sets it to 'semi_real' + """ + self.pre = ProcessModuleManager.execution_type + ProcessModuleManager.execution_type = ExecutionType.SEMI_REAL + + def __exit__(self, type, value, traceback): + """ + Exit method for the 'with' scope, sets the :py:attr:`~ProcessModuleManager.execution_type` to the previously + used one. + """ + ProcessModuleManager.execution_type = self.pre + + def __call__(self): + return self + + def with_real_robot(func: Callable) -> Callable: """ Decorator to execute designators in the decorated class on the real robot. @@ -158,7 +194,7 @@ def plan(): def wrapper(*args, **kwargs): pre = ProcessModuleManager.execution_type - ProcessModuleManager.execution_type = "real" + ProcessModuleManager.execution_type = ExecutionType.REAL ret = func(*args, **kwargs) ProcessModuleManager.execution_type = pre return ret @@ -184,7 +220,7 @@ def plan(): def wrapper(*args, **kwargs): pre = ProcessModuleManager.execution_type - ProcessModuleManager.execution_type = "simulated" + ProcessModuleManager.execution_type = ExecutionType.Simulated ret = func(*args, **kwargs) ProcessModuleManager.execution_type = pre return ret @@ -195,6 +231,7 @@ def wrapper(*args, **kwargs): # These are imported, so they don't have to be initialized when executing with simulated_robot = SimulatedRobot() real_robot = RealRobot() +semi_real_robot = SemiRealRobot() class ProcessModuleManager(ABC): diff --git a/src/pycram/process_modules/boxy_process_modules.py b/src/pycram/process_modules/boxy_process_modules.py index 5dc25d8f9..b8abbb0b2 100644 --- a/src/pycram/process_modules/boxy_process_modules.py +++ b/src/pycram/process_modules/boxy_process_modules.py @@ -200,29 +200,29 @@ def __init__(self): self._close_lock = Lock() def navigate(self): - if ProcessModuleManager.execution_type == "simulated": + if ProcessModuleManager.execution_type == ExecutionType.SIMULATED: return BoxyNavigation(self._navigate_lock) def looking(self): - if ProcessModuleManager.execution_type == "simulated": + if ProcessModuleManager.execution_type == ExecutionType.SIMULATED: return BoxyMoveHead(self._looking_lock) def detecting(self): - if ProcessModuleManager.execution_type == "simulated": + if ProcessModuleManager.execution_type == ExecutionType.SIMULATED: return BoxyDetecting(self._detecting_lock) def move_tcp(self): - if ProcessModuleManager.execution_type == "simulated": + if ProcessModuleManager.execution_type == ExecutionType.SIMULATED: return BoxyMoveTCP(self._move_tcp_lock) def move_arm_joints(self): - if ProcessModuleManager.execution_type == "simulated": + if ProcessModuleManager.execution_type == ExecutionType.SIMULATED: return BoxyMoveArmJoints(self._move_arm_joints_lock) def world_state_detecting(self): - if ProcessModuleManager.execution_type == "simulated": + if ProcessModuleManager.execution_type == ExecutionType.SIMULATED: return BoxyWorldStateDetecting(self._world_state_detecting_lock) def move_gripper(self): - if ProcessModuleManager.execution_type == "simulated": + if ProcessModuleManager.execution_type == ExecutionType.SIMULATED: return BoxyMoveGripper(self._move_gripper_lock) diff --git a/src/pycram/process_modules/default_process_modules.py b/src/pycram/process_modules/default_process_modules.py index 7c9a76dd5..40a3dbdc7 100644 --- a/src/pycram/process_modules/default_process_modules.py +++ b/src/pycram/process_modules/default_process_modules.py @@ -196,41 +196,41 @@ def __init__(self): self._close_lock = Lock() def navigate(self): - if ProcessModuleManager.execution_type == "simulated": + if ProcessModuleManager.execution_type == ExecutionType.SIMULATED: return DefaultNavigation(self._navigate_lock) def looking(self): - if ProcessModuleManager.execution_type == "simulated": + if ProcessModuleManager.execution_type == ExecutionType.SIMULATED: return DefaultMoveHead(self._looking_lock) def detecting(self): - if ProcessModuleManager.execution_type == "simulated": + if ProcessModuleManager.execution_type == ExecutionType.SIMULATED: return DefaultDetecting(self._detecting_lock) def move_tcp(self): - if ProcessModuleManager.execution_type == "simulated": + if ProcessModuleManager.execution_type == ExecutionType.SIMULATED: return DefaultMoveTCP(self._move_tcp_lock) def move_arm_joints(self): - if ProcessModuleManager.execution_type == "simulated": + if ProcessModuleManager.execution_type == ExecutionType.SIMULATED: return DefaultMoveArmJoints(self._move_arm_joints_lock) def world_state_detecting(self): - if ProcessModuleManager.execution_type == "simulated": + if ProcessModuleManager.execution_type == ExecutionType.SIMULATED: return DefaultWorldStateDetecting(self._world_state_detecting_lock) def move_joints(self): - if ProcessModuleManager.execution_type == "simulated": + if ProcessModuleManager.execution_type == ExecutionType.SIMULATED: return DefaultMoveJoints(self._move_joints_lock) def move_gripper(self): - if ProcessModuleManager.execution_type == "simulated": + if ProcessModuleManager.execution_type == ExecutionType.SIMULATED: return DefaultMoveGripper(self._move_gripper_lock) def open(self): - if ProcessModuleManager.execution_type == "simulated": + if ProcessModuleManager.execution_type == ExecutionType.SIMULATED: return DefaultOpen(self._open_lock) def close(self): - if ProcessModuleManager.execution_type == "simulated": + if ProcessModuleManager.execution_type == ExecutionType.SIMULATED: return DefaultClose(self._close_lock) diff --git a/src/pycram/process_modules/donbot_process_modules.py b/src/pycram/process_modules/donbot_process_modules.py index e9ff45edc..e3fcec033 100644 --- a/src/pycram/process_modules/donbot_process_modules.py +++ b/src/pycram/process_modules/donbot_process_modules.py @@ -149,33 +149,33 @@ def __init__(self): self._close_lock = Lock() def navigate(self): - if ProcessModuleManager.execution_type == "simulated": + if ProcessModuleManager.execution_type == ExecutionType.SIMULATED: return DonbotNavigation(self._navigate_lock) def place(self): - if ProcessModuleManager.execution_type == "simulated": + if ProcessModuleManager.execution_type == ExecutionType.SIMULATED: return DonbotPlace(self._place_lock) def looking(self): - if ProcessModuleManager.execution_type == "simulated": + if ProcessModuleManager.execution_type == ExecutionType.SIMULATED: return DonbotMoveHead(self._looking_lock) def detecting(self): - if ProcessModuleManager.execution_type == "simulated": + if ProcessModuleManager.execution_type == ExecutionType.SIMULATED: return DonbotDetecting(self._detecting_lock) def move_tcp(self): - if ProcessModuleManager.execution_type == "simulated": + if ProcessModuleManager.execution_type == ExecutionType.SIMULATED: return DonbotMoveTCP(self._move_tcp_lock) def move_arm_joints(self): - if ProcessModuleManager.execution_type == "simulated": + if ProcessModuleManager.execution_type == ExecutionType.SIMULATED: return DonbotMoveJoints(self._move_arm_joints_lock) def world_state_detecting(self): - if ProcessModuleManager.execution_type == "simulated": + if ProcessModuleManager.execution_type == ExecutionType.SIMULATED: return DonbotWorldStateDetecting(self._world_state_detecting_lock) def move_gripper(self): - if ProcessModuleManager.execution_type == "simulated": + if ProcessModuleManager.execution_type == ExecutionType.SIMULATED: return DonbotMoveGripper(self._move_gripper_lock) diff --git a/src/pycram/process_modules/hsrb_process_modules.py b/src/pycram/process_modules/hsrb_process_modules.py index 5b66d6ec2..bda99aef8 100644 --- a/src/pycram/process_modules/hsrb_process_modules.py +++ b/src/pycram/process_modules/hsrb_process_modules.py @@ -1,45 +1,21 @@ import numpy as np import rospy from threading import Lock -from typing import Any +from typing_extensions import Any -from ..datastructures.enums import JointType +from ..datastructures.enums import ExecutionType +from ..external_interfaces.tmc import tmc_gripper_control, tmc_talk from ..robot_description import RobotDescription from ..process_module import ProcessModule -from ..datastructures.pose import Point -from ..utils import _apply_ik -from ..external_interfaces.ik import request_ik -from .. import world_reasoning as btr from ..local_transformer import LocalTransformer from ..designators.motion_designator import * from ..external_interfaces import giskard -from ..world_concepts.world_object import Object from ..datastructures.world import World +from pydub import AudioSegment +from pydub.playback import play +from gtts import gTTS - -def calculate_and_apply_ik(robot, gripper: str, target_position: Point, max_iterations: Optional[int] = None): - """ - Calculates the inverse kinematics for the given target pose and applies it to the robot. - """ - target_position_l = [target_position.x, target_position.y, target_position.z] - # TODO: Check if this is correct (getting the arm and using its joints), previously joints was not provided. - arm = "right" if gripper == RobotDescription.current_robot_description.kinematic_chains["right"].get_tool_frame() else "left" - inv = request_ik(Pose(target_position_l, [0, 0, 0, 1]), - robot, RobotDescription.current_robot_description.kinematic_chains[arm].joints, gripper) - _apply_ik(robot, inv) - - -def _park_arms(arm): - """ - Defines the joint poses for the parking positions of the arms of HSRB and applies them to the - in the World defined robot. - :return: None - """ - - robot = World.robot - if arm == "left": - for joint, pose in RobotDescription.current_robot_description.get_static_joint_chain("left", "park").items(): - robot.set_joint_position(joint, pose) +import io class HSRBNavigation(ProcessModule): @@ -52,176 +28,14 @@ def _execute(self, desig: MoveMotion): robot.set_pose(desig.target) -class HSRBMoveHead(ProcessModule): - """ - This process module moves the head to look at a specific point in the world coordinate frame. - This point can either be a position or an object. - """ - - def _execute(self, desig: LookingMotion): - target = desig.target - robot = World.robot - - local_transformer = LocalTransformer() - pose_in_pan = local_transformer.transform_pose(target, robot.get_link_tf_frame("head_pan_link")) - pose_in_tilt = local_transformer.transform_pose(target, robot.get_link_tf_frame("head_tilt_link")) - - new_pan = np.arctan2(pose_in_pan.position.y, pose_in_pan.position.x) - new_tilt = np.arctan2(pose_in_tilt.position.z, pose_in_tilt.position.x ** 2 + pose_in_tilt.position.y ** 2) * -1 - - current_pan = robot.get_joint_position("head_pan_joint") - current_tilt = robot.get_joint_position("head_tilt_joint") - - robot.set_joint_position("head_pan_joint", new_pan + current_pan) - robot.set_joint_position("head_tilt_joint", new_tilt + current_tilt) - - -class HSRBMoveGripper(ProcessModule): - """ - This process module controls the gripper of the robot. They can either be opened or closed. - Furthermore, it can only moved one gripper at a time. - """ - - def _execute(self, desig: MoveGripperMotion): - robot = World.robot - gripper = desig.gripper - motion = desig.motion - for joint, state in RobotDescription.current_robot_description.get_arm_chain(gripper).get_static_gripper_state(motion).items(): - robot.set_joint_position(joint, state) - - class HSRBDetecting(ProcessModule): """ This process module tries to detect an object with the given type. To be detected the object has to be in the field of view of the robot. """ - - def _execute(self, desig: DetectingMotion): - rospy.loginfo("Detecting technique: {}".format(desig.technique)) - robot = World.robot - object_type = desig.object_type - # Should be "wide_stereo_optical_frame" - cam_frame_name = RobotDescription.current_robot_description.get_camera_frame() - # should be [0, 0, 1] - front_facing_axis = RobotDescription.current_robot_description.get_default_camera().front_facing_axis - # if desig.technique == 'all': - # rospy.loginfo("Fake detecting all generic objects") - # objects = BulletWorld.current_bullet_world.get_all_objets_not_robot() - # elif desig.technique == 'human': - # rospy.loginfo("Fake detecting human -> spawn 0,0,0") - # human = [] - # human.append(Object("human", ObjectType.HUMAN, "human_male.stl", pose=Pose([0, 0, 0]))) - # object_dict = {} - # - # # Iterate over the list of objects and store each one in the dictionary - # for i, obj in enumerate(human): - # object_dict[obj.name] = obj - # return object_dict - # - # else: - # rospy.loginfo("Fake -> Detecting specific object type") - objects = World.current_world.get_object_by_type(object_type) - - object_dict = {} - - perceived_objects = [] - for obj in objects: - if btr.visible(obj, robot.get_link_pose(cam_frame_name), front_facing_axis): - return obj - - -class HSRBMoveTCP(ProcessModule): - """ - This process moves the tool center point of either the right or the left arm. - """ - - def _execute(self, desig: MoveTCPMotion): - target = desig.target - robot = World.robot - - _move_arm_tcp(target, robot, desig.arm) - - -class HSRBMoveArmJoints(ProcessModule): - """ - This process modules moves the joints of either the right or the left arm. The joint states can be given as - list that should be applied or a pre-defined position can be used, such as "parking" - """ - - def _execute(self, desig: MoveArmJointsMotion): - - robot = World.robot - if desig.right_arm_poses: - robot.set_joint_positions(desig.right_arm_poses) - if desig.left_arm_poses: - robot.set_joint_positions(desig.left_arm_poses) - - -class HSRBMoveJoints(ProcessModule): - """ - Process Module for generic joint movements, is not confined to the arms but can move any joint of the robot - """ - - def _execute(self, desig: MoveJointsMotion): - robot = World.robot - robot.set_joint_positions(dict(zip(desig.names, desig.positions))) - - -class HSRBWorldStateDetecting(ProcessModule): - """ - This process module detectes an object even if it is not in the field of view of the robot. - """ - - def _execute(self, desig: WorldStateDetectingMotion): - obj_type = desig.object_type - return list(filter(lambda obj: obj.obj_type == obj_type, World.current_world.objects))[0] - - -class HSRBOpen(ProcessModule): - """ - Low-level implementation of opening a container in the simulation. Assumes the handle is already grasped. - """ - - def _execute(self, desig: OpeningMotion): - part_of_object = desig.object_part.world_object - - container_joint = part_of_object.find_joint_above_link(desig.object_part.name, JointType.PRISMATIC) - - goal_pose = btr.link_pose_for_joint_config(part_of_object, { - container_joint: part_of_object.get_joint_limits(container_joint)[1] - 0.05}, desig.object_part.name) - - _move_arm_tcp(goal_pose, World.robot, desig.arm) - - desig.object_part.world_object.set_joint_position(container_joint, - part_of_object.get_joint_limits(container_joint)[1]) - - -class HSRBClose(ProcessModule): - """ - Low-level implementation that lets the robot close a grasped container, in simulation - """ - - def _execute(self, desig: ClosingMotion): - part_of_object = desig.object_part.world_object - - container_joint = part_of_object.find_joint_above_link(desig.object_part.name, JointType.PRISMATIC) - - goal_pose = btr.link_pose_for_joint_config(part_of_object, { - container_joint: part_of_object.get_joint_limits(container_joint)[0]}, desig.object_part.name) - - _move_arm_tcp(goal_pose, World.robot, desig.arm) - - desig.object_part.world_object.set_joint_position(container_joint, - part_of_object.get_joint_limits(container_joint)[0]) - - -def _move_arm_tcp(target: Pose, robot: Object, arm: Arms) -> None: - gripper = RobotDescription.current_robot_description.get_arm_chain(arm).get_tool_frame() - - joints = RobotDescription.current_robot_description.get_arm_chain(arm).joints - - inv = request_ik(target, robot, joints, gripper) - _apply_ik(robot, inv) + # pass + def _execute(self, desig: DetectingMotion) -> Any: + pass ########################################################### @@ -237,45 +51,18 @@ class HSRBNavigationReal(ProcessModule): def _execute(self, designator: MoveMotion) -> Any: rospy.logdebug(f"Sending goal to giskard to Move the robot") # giskard.achieve_cartesian_goal(designator.target, robot_description.base_link, "map") - queryPoseNav(designator.target) - - -class HSRBNavigationSemiReal(ProcessModule): - """ - Process module for the real HSRB that sends a cartesian goal to giskard to move the robot base - """ - - def _execute(self, designator: MoveMotion) -> Any: - rospy.logdebug(f"Sending goal to giskard to Move the robot") - giskard.achieve_cartesian_goal(designator.target, RobotDescription.current_robot_description.base_link, "map") + # todome fix this # queryPoseNav(designator.target) class HSRBMoveHeadReal(ProcessModule): """ - Process module for the real robot to move that such that it looks at the given position. Uses the same calculation - as the simulated one + Process module for the real HSRB that sends a pose goal to giskard to move the robot head """ def _execute(self, desig: LookingMotion): target = desig.target - robot = World.robot - - local_transformer = LocalTransformer() - pose_in_pan = local_transformer.transform_pose(target, robot.get_link_tf_frame("head_pan_link")) - pose_in_tilt = local_transformer.transform_pose(target, robot.get_link_tf_frame("head_tilt_link")) - - new_pan = np.arctan2(pose_in_pan.position.y, pose_in_pan.position.x) - new_tilt = np.arctan2(pose_in_tilt.position.z, pose_in_tilt.position.x + pose_in_tilt.position.y) - - current_pan = robot.get_joint_position("head_pan_joint") - current_tilt = robot.get_joint_position("head_tilt_joint") - - giskard.avoid_all_collisions() - giskard.achieve_joint_goal( - {"head_pan_joint": new_pan + current_pan, "head_tilt_joint": new_tilt + current_tilt}) - giskard.achieve_joint_goal( - {"head_pan_joint": new_pan + current_pan, "head_tilt_joint": new_tilt + current_tilt}) + giskard.move_head_to_pose(target) class HSRBDetectingReal(ProcessModule): @@ -285,98 +72,12 @@ class HSRBDetectingReal(ProcessModule): """ def _execute(self, desig: DetectingMotion) -> Any: - # todo at the moment perception ignores searching for a specific object type so we do as well on real - if desig.technique == 'human' and (desig.state == "start" or desig.state == None): - human_pose = queryHuman() - pose = Pose.from_pose_stamped(human_pose) - pose.position.z = 0 - human = [] - human.append(Object("human", ObjectType.HUMAN, "human_male.stl", pose=pose)) - object_dict = {} - - # Iterate over the list of objects and store each one in the dictionary - for i, obj in enumerate(human): - object_dict[obj.name] = obj - return object_dict - - return human_pose - elif desig.technique == 'human' and desig.state == "stop": - stop_queryHuman() - return "stopped" - - query_result = queryEmpty(ObjectDesignatorDescription(types=[desig.object_type])) - perceived_objects = [] - for i in range(0, len(query_result.res)): - # this has to be pose from pose stamped since we spawn the object with given header - obj_pose = Pose.from_pose_stamped(query_result.res[i].pose[0]) - # obj_pose.orientation = [0, 0, 0, 1] - # obj_pose_tmp = query_result.res[i].pose[0] - obj_type = query_result.res[i].type - obj_size = query_result.res[i].shape_size - obj_color = query_result.res[i].color[0] - color_switch = { - "red": [1, 0, 0, 1], - "green": [0, 1, 0, 1], - "blue": [0, 0, 1, 1], - "black": [0, 0, 0, 1], - "white": [1, 1, 1, 1], - # add more colors if needed - } - color = color_switch.get(obj_color) - if color is None: - color = [0, 0, 0, 1] - - # atm this is the string size that describes the object but it is not the shape size thats why string - def extract_xyz_values(input_string): - # Split the input string by commas and colon to separate key-value pairs - # key_value_pairs = input_string.split(', ') - - # Initialize variables to store the X, Y, and Z values - x_value = None - y_value = None - z_value = None - - for key in input_string: - x_value = key.dimensions.x - y_value = key.dimensions.y - z_value = key.dimensions.z - - # - # # Iterate through the key-value pairs to extract the values - # for pair in key_value_pairs: - # key, value = pair.split(': ') - # if key == 'x': - # x_value = float(value) - # elif key == 'y': - # y_value = float(value) - # elif key == 'z': - # z_value = float(value) - - return x_value, y_value, z_value - - x, y, z = extract_xyz_values(obj_size) - size = (x, z / 2, y) - size_box = (x / 2, z / 2, y / 2) - hard_size = (0.02, 0.02, 0.03) - id = World.current_world.add_rigid_box(obj_pose, hard_size, color) - box_object = Object(obj_type + "_" + str(rospy.get_time()), obj_type, pose=obj_pose, color=color, id=id, - customGeom={"size": [hard_size[0], hard_size[1], hard_size[2]]}) - box_object.set_pose(obj_pose) - box_desig = ObjectDesignatorDescription.Object(box_object.name, box_object.type, box_object) - - perceived_objects.append(box_desig) - - object_dict = {} - - # Iterate over the list of objects and store each one in the dictionary - for i, obj in enumerate(perceived_objects): - object_dict[obj.name] = obj - return object_dict + pass class HSRBMoveTCPReal(ProcessModule): """ - Moves the tool center point of the real HSRB while avoiding all collisions + Moves the tool center point of the real HSRB while avoiding all collisions via giskard """ def _execute(self, designator: MoveTCPMotion) -> Any: @@ -385,13 +86,13 @@ def _execute(self, designator: MoveTCPMotion) -> Any: giskard.avoid_all_collisions() if designator.allow_gripper_collision: giskard.allow_gripper_collision(designator.arm) - giskard.achieve_cartesian_goal(pose_in_map, RobotDescription.current_robot_description.get_arm_chain(designator.arm).get_tool_frame(), - "map") + giskard.achieve_cartesian_goal(pose_in_map, RobotDescription.current_robot_description.get_arm_chain( + designator.arm).get_tool_frame(), "map") class HSRBMoveArmJointsReal(ProcessModule): """ - Moves the arm joints of the real HSRB to the given configuration while avoiding all collisions + Moves the arm joints of the real HSRB to the given configuration while avoiding all collisions via giskard """ def _execute(self, designator: MoveArmJointsMotion) -> Any: @@ -419,67 +120,84 @@ class HSRBMoveGripperReal(ProcessModule): """ def _execute(self, designator: MoveGripperMotion) -> Any: - if (designator.motion == "open"): - pub_gripper = rospy.Publisher('/hsrb/gripper_controller/grasp/goal', GripperApplyEffortActionGoal, - queue_size=10) - rate = rospy.Rate(10) - rospy.sleep(2) - msg = GripperApplyEffortActionGoal() # sprechen joint gripper_controll_manager an, indem wir goal publishen type den giskard fürs greifen erwartet - msg.goal.effort = 0.8 - pub_gripper.publish(msg) - - elif (designator.motion == "close"): - pub_gripper = rospy.Publisher('/hsrb/gripper_controller/grasp/goal', GripperApplyEffortActionGoal, - queue_size=10) - rate = rospy.Rate(10) - rospy.sleep(2) - msg = GripperApplyEffortActionGoal() - msg.goal.effort = -0.8 - pub_gripper.publish(msg) - - # if designator.allow_gripper_collision: - # giskard.allow_gripper_collision("left") - # giskard.achieve_gripper_motion_goal(designator.motion) + tmc_gripper_control(designator) class HSRBOpenReal(ProcessModule): """ - Tries to open an already grasped container + This process Modules tries to open an already grasped container via giskard """ def _execute(self, designator: OpeningMotion) -> Any: - giskard.achieve_open_container_goal(RobotDescription.current_robot_description.get_arm_chain(designator.arm).get_tool_frame(), - designator.object_part.name) + giskard.achieve_open_container_goal( + RobotDescription.current_robot_description.get_arm_chain(designator.arm).get_tool_frame(), + designator.object_part.name) class HSRBCloseReal(ProcessModule): """ - Tries to close an already grasped container + This process module executes close a an already grasped container via giskard """ def _execute(self, designator: ClosingMotion) -> Any: - giskard.achieve_close_container_goal(RobotDescription.current_robot_description.get_arm_chain(designator.arm).get_tool_frame(), - designator.object_part.name) - - -# class HSRBTalkReal(ProcessModule): -# """ -# Tries to close an already grasped container -# """ -# -# def _execute(self, designator: TalkingMotion.Motion) -> Any: -# pub = rospy.Publisher('/talk_request', Voice, queue_size=10) -# -# # fill message of type Voice with required data: -# texttospeech = Voice() -# # language 1 = english (0 = japanese) -# texttospeech.language = 1 -# texttospeech.sentence = designator.cmd -# -# rospy.sleep(1) -# pub.publish(texttospeech) + giskard.achieve_close_container_goal( + RobotDescription.current_robot_description.get_arm_chain(designator.arm).get_tool_frame(), + designator.object_part.name) + + +class HSRBTalkReal(ProcessModule): + """ + Let the robot speak over tmc interface. + """ + + def _execute(self, designator: TalkingMotion) -> Any: + tmc_talk(designator) +########################################################### +########## Process Modules for the Semi Real HSRB ############### +########################################################### +class HSRBNavigationSemiReal(ProcessModule): + """ + Process module for the real HSRB that sends a cartesian goal to giskard to move the robot base + """ + + def _execute(self, designator: MoveMotion) -> Any: + rospy.logdebug(f"Sending goal to giskard to Move the robot") + giskard.teleport_robot(designator.target) + + +class HSRBTalkSemiReal(ProcessModule): + """ + Low Level implementation to let the robot talk using gTTS and pydub. + """ + + def _execute(self, designator: TalkingMotion) -> Any: + """ + Convert text to speech using gTTS, modify the pitch and play it without saving to disk. + """ + sentence = designator.cmd + # Create a gTTS object + tts = gTTS(text=sentence, lang='en', slow=False) + + # Save the speech to an in-memory file + mp3_fp = io.BytesIO() + tts.write_to_fp(mp3_fp) + mp3_fp.seek(0) + + # Load the audio into pydub from the in-memory file + audio = AudioSegment.from_file(mp3_fp, format="mp3") + + # Speed up the audio slightly + faster_audio = audio.speedup(playback_speed=1.2) + + # Play the modified audio + play(faster_audio) + + +########################################################### +########## HSRB MANAGER ############### +########################################################### class HSRBManager(ProcessModuleManager): def __init__(self): @@ -499,85 +217,65 @@ def __init__(self): self._talk_lock = Lock() def navigate(self): - if ProcessModuleManager.execution_type == "simulated": + if ProcessModuleManager.execution_type == ExecutionType.SIMULATED: return HSRBNavigation(self._navigate_lock) - elif ProcessModuleManager.execution_type == "real": + elif ProcessModuleManager.execution_type == ExecutionType.REAL: return HSRBNavigationReal(self._navigate_lock) - elif ProcessModuleManager.execution_type == "semi_real": + elif ProcessModuleManager.execution_type == ExecutionType.SEMI_REAL: return HSRBNavigationSemiReal(self._navigate_lock) def looking(self): - if ProcessModuleManager.execution_type == "simulated": - return HSRBMoveHead(self._looking_lock) - elif ProcessModuleManager.execution_type == "real": + if ProcessModuleManager.execution_type == ExecutionType.REAL: return HSRBMoveHeadReal(self._looking_lock) - elif ProcessModuleManager.execution_type == "semi_real": + elif ProcessModuleManager.execution_type == ExecutionType.SEMI_REAL: return HSRBMoveHeadReal(self._looking_lock) def detecting(self): - if ProcessModuleManager.execution_type == "simulated": + if ProcessModuleManager.execution_type == ExecutionType.SIMULATED: return HSRBDetecting(self._detecting_lock) - elif ProcessModuleManager.execution_type == "real": + elif ProcessModuleManager.execution_type == ExecutionType.REAL: return HSRBDetectingReal(self._detecting_lock) - elif ProcessModuleManager.execution_type == "semi_real": + elif ProcessModuleManager.execution_type == ExecutionType.SEMI_REAL: return HSRBDetecting(self._detecting_lock) def move_tcp(self): - if ProcessModuleManager.execution_type == "simulated": - return HSRBMoveTCP(self._move_tcp_lock) - elif ProcessModuleManager.execution_type == "real": + if ProcessModuleManager.execution_type == ExecutionType.REAL: return HSRBMoveTCPReal(self._move_tcp_lock) - elif ProcessModuleManager.execution_type == "semi_real": + elif ProcessModuleManager.execution_type == ExecutionType.SEMI_REAL: return HSRBMoveTCPReal(self._move_tcp_lock) def move_arm_joints(self): - if ProcessModuleManager.execution_type == "simulated": - return HSRBMoveArmJoints(self._move_arm_joints_lock) - elif ProcessModuleManager.execution_type == "real": + if ProcessModuleManager.execution_type == ExecutionType.REAL: return HSRBMoveArmJointsReal(self._move_arm_joints_lock) - elif ProcessModuleManager.execution_type == "semi_real": + elif ProcessModuleManager.execution_type == ExecutionType.SEMI_REAL: return HSRBMoveArmJointsReal(self._move_arm_joints_lock) - def world_state_detecting(self): - if ProcessModuleManager.execution_type == "simulated" or ProcessModuleManager.execution_type == "real": - return HSRBWorldStateDetecting(self._world_state_detecting_lock) - elif ProcessModuleManager.execution_type == "semi_real": - return HSRBWorldStateDetecting(self._world_state_detecting_lock) - def move_joints(self): - if ProcessModuleManager.execution_type == "simulated": - return HSRBMoveJoints(self._move_joints_lock) - elif ProcessModuleManager.execution_type == "real": + if ProcessModuleManager.execution_type == ExecutionType.REAL: return HSRBMoveJointsReal(self._move_joints_lock) - elif ProcessModuleManager.execution_type == "semi_real": + elif ProcessModuleManager.execution_type == ExecutionType.SEMI_REAL: return HSRBMoveJointsReal(self._move_joints_lock) def move_gripper(self): - if ProcessModuleManager.execution_type == "simulated": - return HSRBMoveGripper(self._move_gripper_lock) - elif ProcessModuleManager.execution_type == "real": + if ProcessModuleManager.execution_type == ExecutionType.REAL: return HSRBMoveGripperReal(self._move_gripper_lock) - elif ProcessModuleManager.execution_type == "semi_real": + elif ProcessModuleManager.execution_type == ExecutionType.SEMI_REAL: return HSRBMoveGripperReal(self._move_gripper_lock) def open(self): - if ProcessModuleManager.execution_type == "simulated": - return HSRBOpen(self._open_lock) - elif ProcessModuleManager.execution_type == "real": + if ProcessModuleManager.execution_type == ExecutionType.REAL: return HSRBOpenReal(self._open_lock) - elif ProcessModuleManager.execution_type == "semi_real": + elif ProcessModuleManager.execution_type == ExecutionType.SEMI_REAL: return HSRBOpenReal(self._open_lock) def close(self): - if ProcessModuleManager.execution_type == "simulated": - return HSRBClose(self._close_lock) - elif ProcessModuleManager.execution_type == "real": + if ProcessModuleManager.execution_type == ExecutionType.REAL: return HSRBCloseReal(self._close_lock) - elif ProcessModuleManager.execution_type == "semi_real": + elif ProcessModuleManager.execution_type == ExecutionType.SEMI_REAL: return HSRBCloseReal(self._close_lock) - # def talk(self): - # if ProcessModuleManager.execution_type == "real": - # return HSRBTalkReal(self._talk_lock) - # elif ProcessModuleManager.execution_type == "semi_real": - # return HSRBTalkReal(self._talk_lock) + def talk(self): + if ProcessModuleManager.execution_type == ExecutionType.REAL: + return HSRBTalkReal(self._talk_lock) + elif ProcessModuleManager.execution_type == ExecutionType.SEMI_REAL: + return HSRBTalkSemiReal(self._talk_lock) diff --git a/src/pycram/process_modules/pr2_process_modules.py b/src/pycram/process_modules/pr2_process_modules.py index 0ac65ba8d..261094ccc 100644 --- a/src/pycram/process_modules/pr2_process_modules.py +++ b/src/pycram/process_modules/pr2_process_modules.py @@ -19,7 +19,7 @@ from ..datastructures.world import World from ..world_concepts.world_object import Object from ..datastructures.pose import Pose -from ..datastructures.enums import JointType, ObjectType, Arms +from ..datastructures.enums import JointType, ObjectType, Arms, ExecutionType from ..external_interfaces import giskard from ..external_interfaces.robokudo import query @@ -375,59 +375,60 @@ def __init__(self): self._close_lock = Lock() def navigate(self): - if ProcessModuleManager.execution_type == "simulated": + if ProcessModuleManager.execution_type == ExecutionType.SIMULATED: return Pr2Navigation(self._navigate_lock) - elif ProcessModuleManager.execution_type == "real": + elif ProcessModuleManager.execution_type == ExecutionType.REAL: return Pr2NavigationReal(self._navigate_lock) def looking(self): - if ProcessModuleManager.execution_type == "simulated": + if ProcessModuleManager.execution_type == ExecutionType.SIMULATED: return Pr2MoveHead(self._looking_lock) - elif ProcessModuleManager.execution_type == "real": + elif ProcessModuleManager.execution_type == ExecutionType.REAL: return Pr2MoveHeadReal(self._looking_lock) def detecting(self): - if ProcessModuleManager.execution_type == "simulated": + if ProcessModuleManager.execution_type == ExecutionType.SIMULATED: return Pr2Detecting(self._detecting_lock) - elif ProcessModuleManager.execution_type == "real": + elif ProcessModuleManager.execution_type == ExecutionType.REAL: return Pr2DetectingReal(self._detecting_lock) def move_tcp(self): - if ProcessModuleManager.execution_type == "simulated": + if ProcessModuleManager.execution_type == ExecutionType.SIMULATED: return Pr2MoveTCP(self._move_tcp_lock) - elif ProcessModuleManager.execution_type == "real": + elif ProcessModuleManager.execution_type == ExecutionType.REAL: return Pr2MoveTCPReal(self._move_tcp_lock) def move_arm_joints(self): - if ProcessModuleManager.execution_type == "simulated": + if ProcessModuleManager.execution_type == ExecutionType.SIMULATED: return Pr2MoveArmJoints(self._move_arm_joints_lock) - elif ProcessModuleManager.execution_type == "real": + elif ProcessModuleManager.execution_type == ExecutionType.REAL: return Pr2MoveArmJointsReal(self._move_arm_joints_lock) def world_state_detecting(self): - if ProcessModuleManager.execution_type == "simulated" or ProcessModuleManager.execution_type == "real": + if (ProcessModuleManager.execution_type == ExecutionType.SIMULATED or + ProcessModuleManager.execution_type == ExecutionType.REAL): return Pr2WorldStateDetecting(self._world_state_detecting_lock) def move_joints(self): - if ProcessModuleManager.execution_type == "simulated": + if ProcessModuleManager.execution_type == ExecutionType.SIMULATED: return PR2MoveJoints(self._move_joints_lock) - elif ProcessModuleManager.execution_type == "real": + elif ProcessModuleManager.execution_type == ExecutionType.REAL: return Pr2MoveJointsReal(self._move_joints_lock) def move_gripper(self): - if ProcessModuleManager.execution_type == "simulated": + if ProcessModuleManager.execution_type == ExecutionType.SIMULATED: return Pr2MoveGripper(self._move_gripper_lock) - elif ProcessModuleManager.execution_type == "real": + elif ProcessModuleManager.execution_type == ExecutionType.REAL: return Pr2MoveGripperReal(self._move_gripper_lock) def open(self): - if ProcessModuleManager.execution_type == "simulated": + if ProcessModuleManager.execution_type == ExecutionType.SIMULATED: return Pr2Open(self._open_lock) - elif ProcessModuleManager.execution_type == "real": + elif ProcessModuleManager.execution_type == ExecutionType.REAL: return Pr2OpenReal(self._open_lock) def close(self): - if ProcessModuleManager.execution_type == "simulated": + if ProcessModuleManager.execution_type == ExecutionType.SIMULATED: return Pr2Close(self._close_lock) - elif ProcessModuleManager.execution_type == "real": + elif ProcessModuleManager.execution_type == ExecutionType.REAL: return Pr2CloseReal(self._close_lock) diff --git a/src/pycram/process_modules/stretch_process_modules.py b/src/pycram/process_modules/stretch_process_modules.py index 77b33596a..3dd46d33b 100644 --- a/src/pycram/process_modules/stretch_process_modules.py +++ b/src/pycram/process_modules/stretch_process_modules.py @@ -295,59 +295,59 @@ def __init__(self): self._close_lock = Lock() def navigate(self): - if ProcessModuleManager.execution_type == "simulated": + if ProcessModuleManager.execution_type == ExecutionType.SIMULATED: return StretchNavigate(self._navigate_lock) - elif ProcessModuleManager.execution_type == "real": + elif ProcessModuleManager.execution_type == ExecutionType.REAL: return StretchNavigationReal(self._navigate_lock) def looking(self): - if ProcessModuleManager.execution_type == "simulated": + if ProcessModuleManager.execution_type == ExecutionType.SIMULATED: return StretchMoveHead(self._looking_lock) - elif ProcessModuleManager.execution_type == "real": + elif ProcessModuleManager.execution_type == ExecutionType.REAL: return StretchMoveHeadReal(self._looking_lock) def detecting(self): - if ProcessModuleManager.execution_type == "simulated": + if ProcessModuleManager.execution_type == ExecutionType.SIMULATED: return StretchDetecting(self._detecting_lock) - elif ProcessModuleManager.execution_type == "real": + elif ProcessModuleManager.execution_type == ExecutionType.REAL: return StretchDetectingReal(self._detecting_lock) def move_tcp(self): - if ProcessModuleManager.execution_type == "simulated": + if ProcessModuleManager.execution_type == ExecutionType.SIMULATED: return StretchMoveTCP(self._move_tcp_lock) - elif ProcessModuleManager.execution_type == "real": + elif ProcessModuleManager.execution_type == ExecutionType.REAL: return StretchMoveTCPReal(self._move_tcp_lock) def move_arm_joints(self): - if ProcessModuleManager.execution_type == "simulated": + if ProcessModuleManager.execution_type == ExecutionType.SIMULATED: return StretchMoveArmJoints(self._move_arm_joints_lock) - elif ProcessModuleManager.execution_type == "real": + elif ProcessModuleManager.execution_type == ExecutionType.REAL: return StretchMoveArmJointsReal(self._move_arm_joints_lock) def world_state_detecting(self): - if ProcessModuleManager.execution_type == "simulated" or ProcessModuleManager.execution_type == "real": + if ProcessModuleManager.execution_type == ExecutionType.SIMULATED or ProcessModuleManager.execution_type == ExecutionType.REAL: return StretchWorldStateDetecting(self._world_state_detecting_lock) def move_joints(self): - if ProcessModuleManager.execution_type == "simulated": + if ProcessModuleManager.execution_type == ExecutionType.SIMULATED: return StretchMoveJoints(self._move_joints_lock) - elif ProcessModuleManager.execution_type == "real": + elif ProcessModuleManager.execution_type == ExecutionType.REAL: return StretchMoveJointsReal(self._move_joints_lock) def move_gripper(self): - if ProcessModuleManager.execution_type == "simulated": + if ProcessModuleManager.execution_type == ExecutionType.SIMULATED: return StretchMoveGripper(self._move_gripper_lock) - elif ProcessModuleManager.execution_type == "real": + elif ProcessModuleManager.execution_type == ExecutionType.REAL: return StretchMoveGripperReal(self._move_gripper_lock) def open(self): - if ProcessModuleManager.execution_type == "simulated": + if ProcessModuleManager.execution_type == ExecutionType.SIMULATED: return StretchOpen(self._open_lock) - elif ProcessModuleManager.execution_type == "real": + elif ProcessModuleManager.execution_type == ExecutionType.REAL: return StretchOpenReal(self._open_lock) def close(self): - if ProcessModuleManager.execution_type == "simulated": + if ProcessModuleManager.execution_type == ExecutionType.SIMULATED: return StretchClose(self._close_lock) - elif ProcessModuleManager.execution_type == "real": + elif ProcessModuleManager.execution_type == ExecutionType.REAL: return StretchCloseReal(self._close_lock) diff --git a/src/pycram/ros/tf_broadcaster.py b/src/pycram/ros/tf_broadcaster.py index 9f4661a43..8cf46578b 100644 --- a/src/pycram/ros/tf_broadcaster.py +++ b/src/pycram/ros/tf_broadcaster.py @@ -12,7 +12,7 @@ class TFBroadcaster: """ Broadcaster that publishes TF frames for every object in the World. """ - def __init__(self, projection_namespace="simulated", odom_frame="odom", interval=0.1): + def __init__(self, projection_namespace= ExecutionType.SIMULATED, odom_frame="odom", interval=0.1): """ The broadcaster prefixes all published TF messages with a projection namespace to distinguish between the TF frames from the simulation and the one from the real robot.