-
Notifications
You must be signed in to change notification settings - Fork 18
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fixing joints moving to default position when their target is not set #773
base: noetic-devel
Are you sure you want to change the base?
Changes from all commits
69aced1
5cc2342
80d1c0a
8fd40b3
dea4b58
9675e55
4dc9121
1fcf0ce
2d6c9b2
d2b9708
0e50fa5
a83b770
97ed92c
b353441
0af6674
9a02700
6254394
480fc1e
f298774
b17bb1a
c0e9d31
fc05a18
ee270c0
2303648
90dc1aa
99e816e
df2c826
f09ce7b
d65af25
f8d1de5
f23062c
d83c462
6a09a29
0952bd7
5fb64ed
22177ad
12e6217
543eb52
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,6 +1,6 @@ | ||
#!/usr/bin/env python3 | ||
|
||
# Copyright 2015 Shadow Robot Company Ltd. | ||
# Copyright 2015-2022 Shadow Robot Company Ltd. | ||
# | ||
# This program is free software: you can redistribute it and/or modify it | ||
# under the terms of the GNU General Public License as published by the Free | ||
|
@@ -15,27 +15,32 @@ | |
# with this program. If not, see <http://www.gnu.org/licenses/>. | ||
|
||
from __future__ import absolute_import | ||
from __future__ import division | ||
import threading | ||
import re | ||
|
||
import rospy | ||
from actionlib import SimpleActionClient | ||
from control_msgs.msg import FollowJointTrajectoryAction, \ | ||
FollowJointTrajectoryGoal | ||
FollowJointTrajectoryGoal, JointControllerState | ||
from moveit_commander import MoveGroupCommander, RobotCommander, \ | ||
PlanningSceneInterface | ||
from moveit_msgs.msg import RobotTrajectory, PositionIKRequest | ||
from sensor_msgs.msg import JointState | ||
from control_msgs.msg import JointTrajectoryControllerState | ||
import geometry_msgs.msg | ||
from sr_robot_msgs.srv import RobotTeachMode, RobotTeachModeRequest, \ | ||
RobotTeachModeResponse | ||
|
||
from xml.dom import minidom | ||
|
||
from moveit_msgs.srv import GetPositionIK | ||
from moveit_msgs.srv import ListRobotStatesInWarehouse as ListStates | ||
from moveit_msgs.srv import GetRobotStateFromWarehouse as GetState | ||
from moveit_msgs.msg import OrientationConstraint, Constraints | ||
from moveit_msgs.msg import OrientationConstraint, Constraints, RobotState | ||
|
||
from trajectory_msgs.msg import JointTrajectoryPoint, JointTrajectory | ||
from math import radians | ||
from math import radians, pi | ||
|
||
from moveit_msgs.srv import GetPositionFK | ||
from std_msgs.msg import Header | ||
|
@@ -95,22 +100,76 @@ def __init__(self, name): | |
|
||
self._controllers = {} | ||
|
||
self._underactuated_joint_finder = re.compile('[r,l]h_[F,M,R,L]FJ[1,2]') | ||
|
||
rospy.wait_for_service('compute_ik') | ||
self._compute_ik = rospy.ServiceProxy('compute_ik', GetPositionIK) | ||
self._forward_k = rospy.ServiceProxy('compute_fk', GetPositionFK) | ||
|
||
controller_list_param = rospy.get_param("/move_group/controller_list") | ||
self._allowed_start_tolerance = rospy.get_param("/move_group/trajectory_execution/allowed_start_tolerance", 0.1) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This module is common for arm and hand (it's the parent of HandCommander and ArmCommander). There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. In there I am only reading that parameter. When launching |
||
robot_description = rospy.get_param("/robot_description") | ||
self._joint_limits = {} | ||
self._initialize_joint_limits(robot_description) | ||
|
||
# create dictionary with name of controllers and corresponding joints | ||
self._controllers = {item["name"]: item["joints"] for item in controller_list_param} | ||
|
||
self._set_up_action_client(self._controllers) | ||
|
||
self._set_points_lock = threading.Lock() | ||
self._are_set_points_ready = False | ||
self._set_points_cv = threading.Condition() | ||
self._set_points = {} | ||
|
||
self._set_up_set_points_subscribers(self._controllers) | ||
|
||
self.tf_buffer = tf2_ros.Buffer() | ||
self.listener = tf2_ros.TransformListener(self.tf_buffer) | ||
|
||
threading.Thread(None, rospy.spin) | ||
|
||
self._wait_for_set_points() | ||
|
||
def _initialize_joint_limits(self, robot_description): | ||
""" | ||
It reads a robot descritpion and updates the joint limits dictionary of the class | ||
@param robot_description - Robot description from which the joint limits are going to be read | ||
""" | ||
robot_dom = minidom.parseString(robot_description) | ||
robot = robot_dom.getElementsByTagName('robot')[0] | ||
|
||
for child in robot.childNodes: | ||
if child.nodeType is child.TEXT_NODE: | ||
continue | ||
|
||
if child.localName == 'joint': | ||
joint_type = child.getAttribute('type') | ||
if joint_type in ['fixed', 'floating', 'planar']: | ||
continue | ||
name = child.getAttribute('name') | ||
|
||
if joint_type == 'continuous': | ||
minval = -pi | ||
maxval = pi | ||
else: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. what is the joint_type passed in this else? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ok, thanks can you then just move the name = child.getAttribute('name') before the first 'if' to slightly increase readability? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. the attribute name seems to be particular for nodes of type joint |
||
try: | ||
limit = child.getElementsByTagName('limit')[0] | ||
minval = float(limit.getAttribute('lower')) | ||
maxval = float(limit.getAttribute('upper')) | ||
except Exception as exception: | ||
rospy.logwarn(f"{exception}: {name} is not fixed, nor continuous, \ | ||
but limits are not specified!") | ||
continue | ||
self._joint_limits.update({name: (minval, maxval)}) | ||
|
||
def get_joint_limits(self): | ||
""" | ||
@return - dictionary mapping joint names with a tuple containing the lower limit | ||
jeferal marked this conversation as resolved.
Show resolved
Hide resolved
|
||
and the upper limit of a joint | ||
""" | ||
return self._joint_limits | ||
|
||
def _is_trajectory_valid(self, trajectory, required_keys): | ||
if type(trajectory) != list: | ||
rospy.logerr("Trajectory is not a list of waypoints") | ||
|
@@ -259,9 +318,20 @@ def move_to_joint_value_target(self, joint_states, wait=True, | |
if angle_degrees: | ||
joint_states_cpy.update((joint, radians(i)) | ||
for joint, i in joint_states_cpy.items()) | ||
self._move_group_commander.set_start_state_to_current_state() | ||
# Create the set points dictionary and robot state | ||
set_points, robot_state_set_points = self.get_current_set_points() | ||
# In case that the set points are far from the state, use the state of the joints | ||
set_points, robot_state_set_points = self.fix_set_points_if_far_from_state(set_points, robot_state_set_points) | ||
# Setting the start state of the plan | ||
self._move_group_commander.set_start_state(robot_state_set_points) | ||
# Bound the set points to make sure they lay within the joint limits | ||
set_points = self._bound_state(set_points) | ||
# Set the target of the plan | ||
self._move_group_commander.set_joint_value_target(set_points) | ||
self._move_group_commander.set_joint_value_target(joint_states_cpy) | ||
beatrizleon marked this conversation as resolved.
Show resolved
Hide resolved
|
||
self._move_group_commander.go(wait=wait) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It looks like go function uses the current state internally, I tried to find the line of code that does that, but I could not. By performing tests, I could not reproduce the problem on planning when an object is grasped and that makes me think that is using the current state instead of the state set when using .set_start_satate |
||
|
||
# Create and execute the plan | ||
self._move_group_commander.execute(self._move_group_commander.plan()[CONST_TUPLE_TRAJECTORY_INDEX], wait=wait) | ||
|
||
def set_start_state_to_current_state(self): | ||
return self._move_group_commander.set_start_state_to_current_state() | ||
|
@@ -277,15 +347,26 @@ def plan_to_joint_value_target(self, joint_states, angle_degrees=False, custom_s | |
@return - motion plan (RobotTrajectory msg) that contains the trajectory to the set goal state. | ||
""" | ||
joint_states_cpy = copy.deepcopy(joint_states) | ||
# Create the set points dictionary and robot state | ||
set_points, robot_state_set_points = self.get_current_set_points() | ||
# In case that the set points are far from the state, use the state of the joints | ||
set_points, robot_state_set_points = self.fix_set_points_if_far_from_state(set_points, robot_state_set_points) | ||
|
||
if angle_degrees: | ||
joint_states_cpy.update((joint, radians(i)) | ||
for joint, i in joint_states_cpy.items()) | ||
# Set the first point of the plan | ||
if custom_start_state is None: | ||
self._move_group_commander.set_start_state_to_current_state() | ||
self._move_group_commander.set_start_state(robot_state_set_points) | ||
else: | ||
self._move_group_commander.set_start_state(custom_start_state) | ||
|
||
# Make sure that the set points lay within the joint limits | ||
set_points_bounded = self._bound_state(set_points) | ||
# Set the target of the plan | ||
self._move_group_commander.set_joint_value_target(set_points_bounded) | ||
self._move_group_commander.set_joint_value_target(joint_states_cpy) | ||
# Create the plan | ||
self.__plan = self._move_group_commander.plan()[CONST_TUPLE_TRAJECTORY_INDEX] | ||
return self.__plan | ||
|
||
|
@@ -464,6 +545,131 @@ def get_current_state_bounded(self): | |
def get_robot_state_bounded(self): | ||
return self._move_group_commander._g.get_current_state_bounded() | ||
|
||
@staticmethod | ||
def _bound_joint(joint_value, joint_limits): | ||
""" | ||
Forces a joint value to be within the given joint limits | ||
@joint_value - Value of the joint to be bounded | ||
@joint_limits - Tuple with the lower limit and upper limit of the joint | ||
jeferal marked this conversation as resolved.
Show resolved
Hide resolved
|
||
@return - Joint value within the given joints | ||
""" | ||
if joint_value < joint_limits[0]: | ||
joint_value = joint_limits[0] | ||
elif joint_value > joint_limits[1]: | ||
joint_value = joint_limits[1] | ||
return joint_value | ||
|
||
def _bound_state(self, joint_states): | ||
""" | ||
Bounds the joint states within the limits of the joints | ||
@param joint_states - It can be of type dict or RobotState. This param should contain the joints to be bounded | ||
within their limits | ||
@return - The joint states updated with the joint poisitions bounded | ||
""" | ||
if isinstance(joint_states, dict): | ||
for joint in joint_states: | ||
joint_states[joint] = self._bound_joint(joint_states[joint], | ||
self._joint_limits[joint]) | ||
elif isinstance(joint_states, RobotState): | ||
for i in range(len(joint_states.joint_state.name)): | ||
joint_states.joint_state.position[i] = \ | ||
self._bound_joint(joint_states.joint_state.position[i], | ||
self._joint_limits[joint_states.joint_state.name[i]]) | ||
|
||
return joint_states | ||
|
||
def _is_joint_underactuated(self, joint_name): | ||
""" | ||
@param joint_name - Name of the joint to check if it is underactuated or not, example format rh_FFJ1 | ||
@return - boolean indicating if the joint is underactuated or not | ||
""" | ||
return bool(self._underactuated_joint_finder.findall(joint_name)) | ||
|
||
def get_current_set_points(self): | ||
""" | ||
Reads from the set points | ||
@return - Tuple that contains a dictionary mapping joint names with their respective set point, and | ||
a RobotState object which contains the same set points. | ||
""" | ||
raw_set_points = {} | ||
with self._set_points_lock: | ||
raw_set_points = copy.deepcopy(self._set_points) | ||
|
||
current_state = self.get_current_state() | ||
set_points = {} | ||
joint_names = list(raw_set_points.keys()) | ||
|
||
# Store the fingers ids from which the underactuated joints have been processed | ||
underactuation_fingers_processed = [] | ||
|
||
for joint_name in joint_names: | ||
if self._is_joint_underactuated(joint_name) and joint_name[3:5] not in underactuation_fingers_processed: | ||
# Underactuated joint, split the set point of j0 given the state of j1 and j2 | ||
joint_0_name = f"{joint_name[:-2]}J0" | ||
joint_1_name = f"{joint_name[:-2]}J1" | ||
joint_2_name = f"{joint_name[:-2]}J2" | ||
|
||
state_j1 = current_state[joint_1_name] | ||
state_j2 = current_state[joint_2_name] | ||
set_point_j0 = raw_set_points[joint_0_name] | ||
|
||
if (state_j1 + state_j2) == 0: | ||
# Avoid division by 0, use directly the set point of J1 and J2 | ||
# from the trajectory controller | ||
set_point_j1 = raw_set_points[joint_1_name] | ||
set_point_j2 = raw_set_points[joint_2_name] | ||
else: | ||
# Split j0 given state of j1 and j2 | ||
set_point_j1 = state_j1 * set_point_j0 / (state_j1 + state_j2) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Not sure that this is necessary. That is the actual one that get set when one uses trajectory messages. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We should proably rethink if we need to listen to the J0 position controllers at all. |
||
set_point_j2 = state_j2 * set_point_j0 / (state_j1 + state_j2) | ||
|
||
set_points.update({joint_1_name: set_point_j1}) | ||
set_points.update({joint_2_name: set_point_j2}) | ||
|
||
# Avoid executing again this | ||
underactuation_fingers_processed.append(joint_name[3:5]) | ||
else: | ||
if "J0" not in joint_name: | ||
# Avoind adding set points of J0 to the output | ||
set_points.update({joint_name: raw_set_points[joint_name]}) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Maybe this method only needs to do this bit? |
||
|
||
# Create the RobotState since some moveit functions require this object instead of | ||
# a dictionary. The JointState is filled first. | ||
joint_state = JointState() | ||
joint_state.header.stamp = rospy.Time.now() | ||
for joint_name in set_points: | ||
joint_state.name.append(joint_name) | ||
joint_state.position.append(set_points[joint_name]) | ||
|
||
move_group_robot_state = RobotState() | ||
move_group_robot_state.joint_state = joint_state | ||
|
||
return set_points, move_group_robot_state | ||
|
||
def fix_set_points_if_far_from_state(self, set_points, robot_state): | ||
""" | ||
Updates the set points with the current state of the robot if the set point | ||
is far from the state given the allowed_start_tolerance. This can happen when the joint is not able | ||
to achieve a certain set point of the controller such as when grasping an object. | ||
@param set_points - Dictionary that maps the joint names with their set points. | ||
@param robot_state - RobotState object that contains the joint names and their set points | ||
as the position of the joints. | ||
@return - Tuple that contains the updated set_points and updated robot_state. | ||
""" | ||
current_state = self.get_current_state() | ||
|
||
# Update set points | ||
for joint_name in list(set_points.keys()): | ||
if abs(set_points[joint_name] - current_state[joint_name]) > self._allowed_start_tolerance: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We might want the user to be able to specify the |
||
set_points[joint_name] = current_state[joint_name] | ||
|
||
# Update robot state | ||
for index, joint_name in enumerate(robot_state.joint_state.name): | ||
if abs(robot_state.joint_state.position[index] - current_state[joint_name]) > self._allowed_start_tolerance: | ||
robot_state.joint_state.position[index] = current_state[joint_name] | ||
|
||
return set_points, robot_state | ||
|
||
def move_to_named_target(self, name, wait=True): | ||
""" | ||
Set target of the robot's links and moves to it | ||
|
@@ -770,6 +976,36 @@ def _joint_states_callback(self, joint_state): | |
self._joints_effort = {n: v for n, v in | ||
zip(joint_state.name, joint_state.effort)} | ||
Comment on lines
976
to
977
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. side note -> this can be just self._joints_effort = {zip(joint_state.name, joint_state.effort)} |
||
|
||
def _set_up_set_points_subscribers(self, controllers_list): | ||
""" | ||
Sets up the required subscribers to read from the set points of each given controller | ||
@param controller_list - Dictionary mapping a trajectory controller with the list of the joints it has | ||
""" | ||
# Get joint names of the group | ||
joint_names_group = self._move_group_commander.get_active_joints() | ||
trajectory_controllers_state_subscribed = [] | ||
j0_position_controllers_state_subscribed = [] | ||
|
||
# Subscribe to all the trajectory controllers that contain joints of this group | ||
for controller_name in controllers_list.keys(): | ||
for joint_name in joint_names_group: | ||
if joint_name in controllers_list[controller_name]: | ||
topic_name = f"/{controller_name}/state" | ||
if topic_name not in trajectory_controllers_state_subscribed: | ||
rospy.Subscriber(topic_name, JointTrajectoryControllerState, self._set_point_cb, queue_size=1) | ||
trajectory_controllers_state_subscribed.append(topic_name) | ||
|
||
# Subscribe to the j0 position controllers of the joints contained in this group | ||
for joint_name in joint_names_group: | ||
if self._is_joint_underactuated(joint_name): | ||
topic_name = f"/sh_{joint_name.lower()[0:5]}j0_position_controller/state" | ||
if topic_name not in j0_position_controllers_state_subscribed: | ||
rospy.Subscriber(topic_name, | ||
JointControllerState, | ||
self._set_point_j0_cb, f"{joint_name[0:5]}J0", | ||
queue_size=1) | ||
j0_position_controllers_state_subscribed.append(topic_name) | ||
|
||
def _set_up_action_client(self, controller_list): | ||
""" | ||
Sets up an action client to communicate with the trajectory controller. | ||
|
@@ -852,6 +1088,46 @@ def _call_action(self, goals): | |
self._clients[client].send_goal( | ||
goals[client], lambda terminal_state, result: self._action_done_cb(client, terminal_state, result)) | ||
|
||
def _set_point_cb(self, msg): | ||
""" | ||
Updates the dictionary mapping joint names with their desired position in the trajectory controllers | ||
@param msg - ROS message of type JointTrajectoryControllerState | ||
""" | ||
joint_names_group = self._move_group_commander.get_active_joints() | ||
with self._set_points_lock: | ||
for index, joint in enumerate(msg.joint_names): | ||
if joint not in joint_names_group: | ||
continue | ||
self._set_points.update({joint: msg.desired.positions[index]}) | ||
if not self._are_set_points_ready: | ||
with self._set_points_cv: | ||
for joint_name in joint_names_group: | ||
if self._is_joint_underactuated(joint_name) and f"{joint_name[0:5]}J0" not in joint_names_group: | ||
joint_names_group.append(f"{joint_name[0:5]}J0") | ||
for joint_name in joint_names_group: | ||
if joint_name not in self._set_points.keys(): | ||
break | ||
else: | ||
self._are_set_points_ready = True | ||
self._set_points_cv.notifyAll() | ||
|
||
def _set_point_j0_cb(self, msg, joint_name): | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. As mentioned before, not really sure if we need to know the J0 goals at all. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think we took the decision of listening to J0's set points because the J1 and J2 set points because of the underactuation might be far from the real state and also outside of the joint limits. For instance, when the set point of FFJ2 is 90deg and the set point of FFJ1 is ~60deg I find this: |
||
""" | ||
Updates the dictionary mapping joint names with their desired position in the trajectory controllers | ||
This callback is associated to the position controller of j0 | ||
@param msg - ROS message of type JointControllerState | ||
""" | ||
with self._set_points_lock: | ||
self._set_points.update({joint_name: msg.set_point}) | ||
|
||
def _wait_for_set_points(self): | ||
""" | ||
Waits until the set points variable has been updated with all joints | ||
""" | ||
if not self._are_set_points_ready: | ||
with self._set_points_cv: | ||
self._set_points_cv.wait() | ||
|
||
def run_joint_trajectory_unsafe(self, joint_trajectory, wait=True): | ||
""" | ||
Moves robot through all joint states with specified timeouts. | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Don't need this anymore
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If I'm not wrong, this repo still has the master branch of the linter , so if I remove this line, the tests might fail