diff --git a/sr_robot_commander/src/sr_robot_commander/sr_robot_commander.py b/sr_robot_commander/src/sr_robot_commander/sr_robot_commander.py index 124623211..531088304 100644 --- a/sr_robot_commander/src/sr_robot_commander/sr_robot_commander.py +++ b/sr_robot_commander/src/sr_robot_commander/sr_robot_commander.py @@ -1,6 +1,6 @@ #!/usr/bin/env python3 -# Copyright 2015 Shadow Robot Company Ltd. +# Copyright 2015-2022 Shadow Robot Company Ltd. # # This program is free software: you can redistribute it and/or modify it # under the terms of the GNU General Public License as published by the Free @@ -15,27 +15,32 @@ # with this program. If not, see . from __future__ import absolute_import +from __future__ import division import threading +import re import rospy from actionlib import SimpleActionClient from control_msgs.msg import FollowJointTrajectoryAction, \ - FollowJointTrajectoryGoal + FollowJointTrajectoryGoal, JointControllerState from moveit_commander import MoveGroupCommander, RobotCommander, \ PlanningSceneInterface from moveit_msgs.msg import RobotTrajectory, PositionIKRequest from sensor_msgs.msg import JointState +from control_msgs.msg import JointTrajectoryControllerState import geometry_msgs.msg from sr_robot_msgs.srv import RobotTeachMode, RobotTeachModeRequest, \ RobotTeachModeResponse +from xml.dom import minidom + from moveit_msgs.srv import GetPositionIK from moveit_msgs.srv import ListRobotStatesInWarehouse as ListStates from moveit_msgs.srv import GetRobotStateFromWarehouse as GetState -from moveit_msgs.msg import OrientationConstraint, Constraints +from moveit_msgs.msg import OrientationConstraint, Constraints, RobotState from trajectory_msgs.msg import JointTrajectoryPoint, JointTrajectory -from math import radians +from math import radians, pi from moveit_msgs.srv import GetPositionFK from std_msgs.msg import Header @@ -95,22 +100,76 @@ def __init__(self, name): self._controllers = {} + self._underactuated_joint_finder = re.compile('[r,l]h_[F,M,R,L]FJ[1,2]') + rospy.wait_for_service('compute_ik') self._compute_ik = rospy.ServiceProxy('compute_ik', GetPositionIK) self._forward_k = rospy.ServiceProxy('compute_fk', GetPositionFK) controller_list_param = rospy.get_param("/move_group/controller_list") + self._allowed_start_tolerance = rospy.get_param("/move_group/trajectory_execution/allowed_start_tolerance", 0.1) + robot_description = rospy.get_param("/robot_description") + self._joint_limits = {} + self._initialize_joint_limits(robot_description) # create dictionary with name of controllers and corresponding joints self._controllers = {item["name"]: item["joints"] for item in controller_list_param} self._set_up_action_client(self._controllers) + self._set_points_lock = threading.Lock() + self._are_set_points_ready = False + self._set_points_cv = threading.Condition() + self._set_points = {} + + self._set_up_set_points_subscribers(self._controllers) + self.tf_buffer = tf2_ros.Buffer() self.listener = tf2_ros.TransformListener(self.tf_buffer) threading.Thread(None, rospy.spin) + self._wait_for_set_points() + + def _initialize_joint_limits(self, robot_description): + """ + It reads a robot descritpion and updates the joint limits dictionary of the class + @param robot_description - Robot description from which the joint limits are going to be read + """ + robot_dom = minidom.parseString(robot_description) + robot = robot_dom.getElementsByTagName('robot')[0] + + for child in robot.childNodes: + if child.nodeType is child.TEXT_NODE: + continue + + if child.localName == 'joint': + joint_type = child.getAttribute('type') + if joint_type in ['fixed', 'floating', 'planar']: + continue + name = child.getAttribute('name') + + if joint_type == 'continuous': + minval = -pi + maxval = pi + else: + try: + limit = child.getElementsByTagName('limit')[0] + minval = float(limit.getAttribute('lower')) + maxval = float(limit.getAttribute('upper')) + except Exception as exception: + rospy.logwarn(f"{exception}: {name} is not fixed, nor continuous, \ + but limits are not specified!") + continue + self._joint_limits.update({name: (minval, maxval)}) + + def get_joint_limits(self): + """ + @return - dictionary mapping joint names with a tuple containing the lower limit + and the upper limit of a joint + """ + return self._joint_limits + def _is_trajectory_valid(self, trajectory, required_keys): if type(trajectory) != list: rospy.logerr("Trajectory is not a list of waypoints") @@ -259,9 +318,20 @@ def move_to_joint_value_target(self, joint_states, wait=True, if angle_degrees: joint_states_cpy.update((joint, radians(i)) for joint, i in joint_states_cpy.items()) - self._move_group_commander.set_start_state_to_current_state() + # Create the set points dictionary and robot state + set_points, robot_state_set_points = self.get_current_set_points() + # In case that the set points are far from the state, use the state of the joints + set_points, robot_state_set_points = self.fix_set_points_if_far_from_state(set_points, robot_state_set_points) + # Setting the start state of the plan + self._move_group_commander.set_start_state(robot_state_set_points) + # Bound the set points to make sure they lay within the joint limits + set_points = self._bound_state(set_points) + # Set the target of the plan + self._move_group_commander.set_joint_value_target(set_points) self._move_group_commander.set_joint_value_target(joint_states_cpy) - self._move_group_commander.go(wait=wait) + + # Create and execute the plan + self._move_group_commander.execute(self._move_group_commander.plan()[CONST_TUPLE_TRAJECTORY_INDEX], wait=wait) def set_start_state_to_current_state(self): return self._move_group_commander.set_start_state_to_current_state() @@ -277,15 +347,26 @@ def plan_to_joint_value_target(self, joint_states, angle_degrees=False, custom_s @return - motion plan (RobotTrajectory msg) that contains the trajectory to the set goal state. """ joint_states_cpy = copy.deepcopy(joint_states) + # Create the set points dictionary and robot state + set_points, robot_state_set_points = self.get_current_set_points() + # In case that the set points are far from the state, use the state of the joints + set_points, robot_state_set_points = self.fix_set_points_if_far_from_state(set_points, robot_state_set_points) if angle_degrees: joint_states_cpy.update((joint, radians(i)) for joint, i in joint_states_cpy.items()) + # Set the first point of the plan if custom_start_state is None: - self._move_group_commander.set_start_state_to_current_state() + self._move_group_commander.set_start_state(robot_state_set_points) else: self._move_group_commander.set_start_state(custom_start_state) + + # Make sure that the set points lay within the joint limits + set_points_bounded = self._bound_state(set_points) + # Set the target of the plan + self._move_group_commander.set_joint_value_target(set_points_bounded) self._move_group_commander.set_joint_value_target(joint_states_cpy) + # Create the plan self.__plan = self._move_group_commander.plan()[CONST_TUPLE_TRAJECTORY_INDEX] return self.__plan @@ -464,6 +545,131 @@ def get_current_state_bounded(self): def get_robot_state_bounded(self): return self._move_group_commander._g.get_current_state_bounded() + @staticmethod + def _bound_joint(joint_value, joint_limits): + """ + Forces a joint value to be within the given joint limits + @joint_value - Value of the joint to be bounded + @joint_limits - Tuple with the lower limit and upper limit of the joint + @return - Joint value within the given joints + """ + if joint_value < joint_limits[0]: + joint_value = joint_limits[0] + elif joint_value > joint_limits[1]: + joint_value = joint_limits[1] + return joint_value + + def _bound_state(self, joint_states): + """ + Bounds the joint states within the limits of the joints + @param joint_states - It can be of type dict or RobotState. This param should contain the joints to be bounded + within their limits + @return - The joint states updated with the joint poisitions bounded + """ + if isinstance(joint_states, dict): + for joint in joint_states: + joint_states[joint] = self._bound_joint(joint_states[joint], + self._joint_limits[joint]) + elif isinstance(joint_states, RobotState): + for i in range(len(joint_states.joint_state.name)): + joint_states.joint_state.position[i] = \ + self._bound_joint(joint_states.joint_state.position[i], + self._joint_limits[joint_states.joint_state.name[i]]) + + return joint_states + + def _is_joint_underactuated(self, joint_name): + """ + @param joint_name - Name of the joint to check if it is underactuated or not, example format rh_FFJ1 + @return - boolean indicating if the joint is underactuated or not + """ + return bool(self._underactuated_joint_finder.findall(joint_name)) + + def get_current_set_points(self): + """ + Reads from the set points + @return - Tuple that contains a dictionary mapping joint names with their respective set point, and + a RobotState object which contains the same set points. + """ + raw_set_points = {} + with self._set_points_lock: + raw_set_points = copy.deepcopy(self._set_points) + + current_state = self.get_current_state() + set_points = {} + joint_names = list(raw_set_points.keys()) + + # Store the fingers ids from which the underactuated joints have been processed + underactuation_fingers_processed = [] + + for joint_name in joint_names: + if self._is_joint_underactuated(joint_name) and joint_name[3:5] not in underactuation_fingers_processed: + # Underactuated joint, split the set point of j0 given the state of j1 and j2 + joint_0_name = f"{joint_name[:-2]}J0" + joint_1_name = f"{joint_name[:-2]}J1" + joint_2_name = f"{joint_name[:-2]}J2" + + state_j1 = current_state[joint_1_name] + state_j2 = current_state[joint_2_name] + set_point_j0 = raw_set_points[joint_0_name] + + if (state_j1 + state_j2) == 0: + # Avoid division by 0, use directly the set point of J1 and J2 + # from the trajectory controller + set_point_j1 = raw_set_points[joint_1_name] + set_point_j2 = raw_set_points[joint_2_name] + else: + # Split j0 given state of j1 and j2 + set_point_j1 = state_j1 * set_point_j0 / (state_j1 + state_j2) + set_point_j2 = state_j2 * set_point_j0 / (state_j1 + state_j2) + + set_points.update({joint_1_name: set_point_j1}) + set_points.update({joint_2_name: set_point_j2}) + + # Avoid executing again this + underactuation_fingers_processed.append(joint_name[3:5]) + else: + if "J0" not in joint_name: + # Avoind adding set points of J0 to the output + set_points.update({joint_name: raw_set_points[joint_name]}) + + # Create the RobotState since some moveit functions require this object instead of + # a dictionary. The JointState is filled first. + joint_state = JointState() + joint_state.header.stamp = rospy.Time.now() + for joint_name in set_points: + joint_state.name.append(joint_name) + joint_state.position.append(set_points[joint_name]) + + move_group_robot_state = RobotState() + move_group_robot_state.joint_state = joint_state + + return set_points, move_group_robot_state + + def fix_set_points_if_far_from_state(self, set_points, robot_state): + """ + Updates the set points with the current state of the robot if the set point + is far from the state given the allowed_start_tolerance. This can happen when the joint is not able + to achieve a certain set point of the controller such as when grasping an object. + @param set_points - Dictionary that maps the joint names with their set points. + @param robot_state - RobotState object that contains the joint names and their set points + as the position of the joints. + @return - Tuple that contains the updated set_points and updated robot_state. + """ + current_state = self.get_current_state() + + # Update set points + for joint_name in list(set_points.keys()): + if abs(set_points[joint_name] - current_state[joint_name]) > self._allowed_start_tolerance: + set_points[joint_name] = current_state[joint_name] + + # Update robot state + for index, joint_name in enumerate(robot_state.joint_state.name): + if abs(robot_state.joint_state.position[index] - current_state[joint_name]) > self._allowed_start_tolerance: + robot_state.joint_state.position[index] = current_state[joint_name] + + return set_points, robot_state + def move_to_named_target(self, name, wait=True): """ Set target of the robot's links and moves to it @@ -770,6 +976,36 @@ def _joint_states_callback(self, joint_state): self._joints_effort = {n: v for n, v in zip(joint_state.name, joint_state.effort)} + def _set_up_set_points_subscribers(self, controllers_list): + """ + Sets up the required subscribers to read from the set points of each given controller + @param controller_list - Dictionary mapping a trajectory controller with the list of the joints it has + """ + # Get joint names of the group + joint_names_group = self._move_group_commander.get_active_joints() + trajectory_controllers_state_subscribed = [] + j0_position_controllers_state_subscribed = [] + + # Subscribe to all the trajectory controllers that contain joints of this group + for controller_name in controllers_list.keys(): + for joint_name in joint_names_group: + if joint_name in controllers_list[controller_name]: + topic_name = f"/{controller_name}/state" + if topic_name not in trajectory_controllers_state_subscribed: + rospy.Subscriber(topic_name, JointTrajectoryControllerState, self._set_point_cb, queue_size=1) + trajectory_controllers_state_subscribed.append(topic_name) + + # Subscribe to the j0 position controllers of the joints contained in this group + for joint_name in joint_names_group: + if self._is_joint_underactuated(joint_name): + topic_name = f"/sh_{joint_name.lower()[0:5]}j0_position_controller/state" + if topic_name not in j0_position_controllers_state_subscribed: + rospy.Subscriber(topic_name, + JointControllerState, + self._set_point_j0_cb, f"{joint_name[0:5]}J0", + queue_size=1) + j0_position_controllers_state_subscribed.append(topic_name) + def _set_up_action_client(self, controller_list): """ Sets up an action client to communicate with the trajectory controller. @@ -852,6 +1088,46 @@ def _call_action(self, goals): self._clients[client].send_goal( goals[client], lambda terminal_state, result: self._action_done_cb(client, terminal_state, result)) + def _set_point_cb(self, msg): + """ + Updates the dictionary mapping joint names with their desired position in the trajectory controllers + @param msg - ROS message of type JointTrajectoryControllerState + """ + joint_names_group = self._move_group_commander.get_active_joints() + with self._set_points_lock: + for index, joint in enumerate(msg.joint_names): + if joint not in joint_names_group: + continue + self._set_points.update({joint: msg.desired.positions[index]}) + if not self._are_set_points_ready: + with self._set_points_cv: + for joint_name in joint_names_group: + if self._is_joint_underactuated(joint_name) and f"{joint_name[0:5]}J0" not in joint_names_group: + joint_names_group.append(f"{joint_name[0:5]}J0") + for joint_name in joint_names_group: + if joint_name not in self._set_points.keys(): + break + else: + self._are_set_points_ready = True + self._set_points_cv.notifyAll() + + def _set_point_j0_cb(self, msg, joint_name): + """ + Updates the dictionary mapping joint names with their desired position in the trajectory controllers + This callback is associated to the position controller of j0 + @param msg - ROS message of type JointControllerState + """ + with self._set_points_lock: + self._set_points.update({joint_name: msg.set_point}) + + def _wait_for_set_points(self): + """ + Waits until the set points variable has been updated with all joints + """ + if not self._are_set_points_ready: + with self._set_points_cv: + self._set_points_cv.wait() + def run_joint_trajectory_unsafe(self, joint_trajectory, wait=True): """ Moves robot through all joint states with specified timeouts. diff --git a/sr_robot_commander/test/test_robot_commander.py b/sr_robot_commander/test/test_robot_commander.py index ac287c6a8..a079fd578 100755 --- a/sr_robot_commander/test/test_robot_commander.py +++ b/sr_robot_commander/test/test_robot_commander.py @@ -1,6 +1,6 @@ #!/usr/bin/env python3 -# Copyright 2021 Shadow Robot Company Ltd. +# Copyright 2021, 2022 Shadow Robot Company Ltd. # # This program is free software: you can redistribute it and/or modify it # under the terms of the GNU General Public License as published by the Free @@ -53,7 +53,15 @@ 'ra_shoulder_lift_joint': -1.37, 'ra_wrist_1_joint': -0.52, 'ra_wrist_2_joint': 1.57, 'ra_wrist_3_joint': 0.00} +CONST_PARTIAL_TARGET = {'ra_shoulder_pan_joint': 0.2, 'ra_wrist_1_joint': -0.52, + 'ra_wrist_2_joint': 1.57} + +CONST_FULL_TARGET = {'ra_shoulder_pan_joint': 0.2, 'ra_elbow_joint': 2.00, + 'ra_shoulder_lift_joint': -1.25, 'ra_wrist_1_joint': -0.52, + 'ra_wrist_2_joint': 1.57, 'ra_wrist_3_joint': 3.14} + TOLERANCE_UNSAFE = 0.04 +TOLERANCE_SAFE = 0.01 PLANNING_ATTEMPTS = 5 @@ -146,6 +154,12 @@ def compare_poses(self, pose1, pose2, position_threshold=0.02, orientation_thres return True return False + def compare_joint_positions(self, joint_configuration_1, joint_configuration_2, tolerance=0.01): + for key in joint_configuration_1.keys(): + if abs(joint_configuration_1[key] - joint_configuration_2[key]) > tolerance: + return False + return True + def test_get_and_set_planner_id(self): prev_planner_id = self.robot_commander._move_group_commander.get_planner_id() test_planner_id = "RRTstarkConfigDefault" @@ -210,6 +224,24 @@ def test_plan_to_joint_value_target(self): custom_start_state=None) self.assertIsInstance(plan, RobotTrajectory) + def test_plan_to_joint_value_target_and_execute_partial(self): + self.reset_to_home() + self.robot_commander._move_group_commander.set_joint_value_target([0.0, 0.0, 0.0, 0.0, 0.0, 0.0]) + plan = self.robot_commander.plan_to_joint_value_target(CONST_PARTIAL_TARGET, angle_degrees=False, + custom_start_state=None) + self.robot_commander.execute_plan(plan) + self.assertTrue(self.compare_joint_positions(self.robot_commander.get_current_state(), + CONST_FULL_TARGET, + tolerance=TOLERANCE_SAFE)) + + def test_move_to_joint_value_target_partial(self): + self.reset_to_home() + self.robot_commander._move_group_commander.set_joint_value_target([0.0, 0.0, 0.0, 0.0, 0.0, 0.0]) + self.robot_commander.move_to_joint_value_target(CONST_PARTIAL_TARGET, wait=True) + self.assertTrue(self.compare_joint_positions(self.robot_commander.get_current_state(), + CONST_FULL_TARGET, + tolerance=TOLERANCE_SAFE)) + def test_execute(self): self.reset_to_home() self.robot_commander.plan_to_joint_value_target(CONST_EXAMPLE_TARGET, angle_degrees=False,