From 9131012c649ddb7f6d8bfabbfbc08cf9f1bf98c6 Mon Sep 17 00:00:00 2001 From: Masaaki Hijikata Date: Thu, 31 Oct 2024 16:58:29 +0900 Subject: [PATCH] [add] add velocity and effort test --- test/test_topic_based_robot.py | 20 +++- test/topic_based_robot.py | 182 +++++++++++++++++++++++++++++---- 2 files changed, 180 insertions(+), 22 deletions(-) diff --git a/test/test_topic_based_robot.py b/test/test_topic_based_robot.py index 7f2b600..e5e7339 100755 --- a/test/test_topic_based_robot.py +++ b/test/test_topic_based_robot.py @@ -35,16 +35,28 @@ rclpy.init() -robot = TopicBasedRobot(["joint_1", "joint_2", "joint_3"]) +robot = TopicBasedRobot(["joint_1", "joint_2", "joint_3"], ["left_wheel_joint", "right_wheel_joint"], ["rrr_base_joint"]) # By default the joint_states should have the values from initial_value from rrr.urdf.xacro -current_joint_state = robot.get_current_joint_state() +current_joint_state = robot.get_current_position_joint_state() urdf_initial_values = [0.2, 0.3, 0.1] assert ( current_joint_state == urdf_initial_values ), f"{current_joint_state=} != {urdf_initial_values=}" -# Test setting the robot joint states +# Test setting the robot joint position states joint_state = [0.1, 0.2, 0.3] robot.set_joint_positions(joint_state) -current_joint_state = robot.get_current_joint_state() +current_joint_state = robot.get_current_position_joint_state() +assert current_joint_state == joint_state, f"{current_joint_state=} != {joint_state=}" + +# Test setting the robot joint velocity states +joint_state = [0.4, 0.5] +robot.set_joint_velocities(joint_state) +current_joint_state = robot.get_current_velocity_joint_state() +assert current_joint_state == joint_state, f"{current_joint_state=} != {joint_state=}" + +# Test setting the robot joint effort states +joint_state = [0.6] +robot.set_joint_efforts(joint_state) +current_joint_state = robot.get_current_effort_joint_state() assert current_joint_state == joint_state, f"{current_joint_state=} != {joint_state=}" diff --git a/test/topic_based_robot.py b/test/topic_based_robot.py index fc33a83..cefa66b 100644 --- a/test/topic_based_robot.py +++ b/test/topic_based_robot.py @@ -42,9 +42,11 @@ class TopicBasedRobot(Node): """Topic Based Robot to test topic_based_ros2_control interface""" - def __init__(self, joint_names: list[str]) -> None: + def __init__(self, position_joint_names: list[str], velocity_joint_names = [], effort_joint_names = []) -> None: super().__init__("topic_based_robot") - self.joint_names = joint_names.copy() + self.position_joint_names = position_joint_names.copy() + self.velocity_joint_names = velocity_joint_names.copy() + self.effort_joint_names = effort_joint_names.copy() self.last_joint_command = [] self.current_joint_state = [] self.callback_group = ReentrantCallbackGroup() @@ -72,46 +74,154 @@ def __init__(self, joint_names: list[str]) -> None: callback_group=self.callback_group, ) - def filter_joint_state_msg(self, msg: JointState): + def filter_joint_position_state_msg(self, msg: JointState): joint_states = [] - for joint_name in self.joint_names: + self.get_logger().warning( + f"Caught msg name list: '{msg.name}'", + ) + for joint_name in self.position_joint_names: try: index = msg.name.index(joint_name) except ValueError: - msg = f"Joint name '{joint_name}' not in input keys {msg.name}" - raise ValueError(msg) from None + continue + # + # Commented out because the joints that the message outputs were sometimes missing + # + #msg = f"Joint name '{joint_name}' not in input keys {msg.name}" + #raise ValueError(msg) from None joint_states.append(msg.position[index]) return joint_states + def filter_joint_velocity_state_msg(self, msg: JointState): + joint_states = [] + self.get_logger().warning( + f"Caught msg name list: '{msg.name}'", + ) + for joint_name in self.velocity_joint_names: + try: + index = msg.name.index(joint_name) + except ValueError: + continue + # + # Commented out because the joints that the message outputs were sometimes missing + # + #msg = f"Joint name '{joint_name}' not in input keys {msg.name}" + #raise ValueError(msg) from None + joint_states.append(msg.velocity[index]) + return joint_states + + def filter_joint_effort_state_msg(self, msg: JointState): + joint_states = [] + self.get_logger().warning( + f"Caught msg name list: '{msg.name}'", + ) + for joint_name in self.effort_joint_names: + try: + index = msg.name.index(joint_name) + except ValueError: + continue + # + # Commented out because the joints that the message outputs were sometimes missing + # + #msg = f"Joint name '{joint_name}' not in input keys {msg.name}" + #raise ValueError(msg) from None + joint_states.append(msg.effort[index]) + return joint_states + def command_callback(self, msg: JointState): - self.last_joint_command = self.filter_joint_state_msg(msg) + # Output by position, velocity, and force, so the number of joint names of each type is matched + if self.position_joint_names[0] in msg.name: + assert len(self.position_joint_names) == len(msg.name), f"{self.position_joint_names=} != {msg.name=}" + self.last_position_joint_command = self.filter_joint_position_state_msg(msg) + + if not len(self.velocity_joint_names) == 0: + if self.velocity_joint_names[0] in msg.name: + assert len(self.velocity_joint_names) == len(msg.name), f"{self.velocity_joint_names=} != {msg.name=}" + self.last_velocity_joint_command = self.filter_joint_velocity_state_msg(msg) + + if not len(self.effort_joint_names) == 0: + if self.effort_joint_names[0] in msg.name: + assert len(self.effort_joint_names) == len(msg.name), f"{self.effort_joint_names=} != {msg.name=}" + self.last_effort_joint_command = self.filter_joint_effort_state_msg(msg) def joint_states_callback(self, msg: JointState): - self.current_joint_state = self.filter_joint_state_msg(msg) + self.current_position_joint_state = self.filter_joint_position_state_msg(msg) + if not len(self.velocity_joint_names) == 0: + self.current_velocity_joint_state = self.filter_joint_velocity_state_msg(msg) + if not len(self.effort_joint_names) == 0: + self.current_effort_joint_state = self.filter_joint_effort_state_msg(msg) - def get_current_joint_command(self) -> OrderedDict[str, float]: + def get_current_position_joint_command(self) -> OrderedDict[str, float]: """Get the last joint command sent to the robot.""" - self.last_joint_command = [] - while len(self.last_joint_command) == 0: + self.last_position_joint_command = [] + while len(self.last_position_joint_command) == 0: self.get_logger().warning( f"Waiting for joint command '{self.desired_joint_state_subscriber.topic_name}'...", throttle_duration_sec=2.0, skip_first=True, ) rclpy.spin_once(self, timeout_sec=0.0) - return self.last_joint_command + return self.last_position_joint_command - def get_current_joint_state(self) -> OrderedDict[str, float]: + def get_current_velocity_joint_command(self) -> OrderedDict[str, float]: + """Get the last joint command sent to the robot.""" + self.last_velocity_joint_command = [] + while len(self.last_velocity_joint_command) == 0: + self.get_logger().warning( + f"Waiting for joint command '{self.desired_joint_state_subscriber.topic_name}'...", + throttle_duration_sec=2.0, + skip_first=True, + ) + rclpy.spin_once(self, timeout_sec=0.0) + return self.last_velocity_joint_command + + def get_current_effort_joint_command(self) -> OrderedDict[str, float]: + """Get the last joint command sent to the robot.""" + self.last_effort_joint_command = [] + while len(self.last_effort_joint_command) == 0: + self.get_logger().warning( + f"Waiting for joint command '{self.desired_joint_state_subscriber.topic_name}'...", + throttle_duration_sec=2.0, + skip_first=True, + ) + rclpy.spin_once(self, timeout_sec=0.0) + return self.last_effort_joint_command + + def get_current_position_joint_state(self) -> OrderedDict[str, float]: """Get the current joint state reported by ros2_control on joint_states topic.""" - self.current_joint_state = [] - while len(self.current_joint_state) == 0: + self.current_position_joint_state = [] + while len(self.current_position_joint_state) == 0: self.get_logger().warning( f"Waiting for current joint states from topic '{self.current_joint_state_subscriber.topic_name}'...", throttle_duration_sec=2.0, skip_first=True, ) rclpy.spin_once(self, timeout_sec=0.0) - return self.current_joint_state + return self.current_position_joint_state + + def get_current_velocity_joint_state(self) -> OrderedDict[str, float]: + """Get the current joint state reported by ros2_control on joint_states topic.""" + self.current_velocity_joint_state = [] + while len(self.current_velocity_joint_state) == 0: + self.get_logger().warning( + f"Waiting for current joint states from topic '{self.current_joint_state_subscriber.topic_name}'...", + throttle_duration_sec=2.0, + skip_first=True, + ) + rclpy.spin_once(self, timeout_sec=0.0) + return self.current_velocity_joint_state + + def get_current_effort_joint_state(self) -> OrderedDict[str, float]: + """Get the current joint state reported by ros2_control on joint_states topic.""" + self.current_effort_joint_state = [] + while len(self.current_effort_joint_state) == 0: + self.get_logger().warning( + f"Waiting for current joint states from topic '{self.current_joint_state_subscriber.topic_name}'...", + throttle_duration_sec=2.0, + skip_first=True, + ) + rclpy.spin_once(self, timeout_sec=0.0) + return self.current_effort_joint_state def set_joint_positions( self, @@ -120,13 +230,49 @@ def set_joint_positions( """Set the joint positions of the robot.""" self.actual_joint_state_publisher.publish( JointState( - name=list(self.joint_names), + name=list(self.position_joint_names), position=joint_positions, ), ) while not np.allclose( - self.get_current_joint_state(), + self.get_current_position_joint_state(), joint_positions, atol=1e-3, ): rclpy.spin_once(self, timeout_sec=0.0) + + def set_joint_velocities( + self, + joint_velocities: list[float] | np.ndarray, + ) -> None: + """Set the joint velocities of the robot.""" + self.actual_joint_state_publisher.publish( + JointState( + name=list(self.velocity_joint_names), + velocity=joint_velocities, + ), + ) + while not np.allclose( + self.get_current_velocity_joint_state(), + joint_velocities, + atol=1e-3, + ): + rclpy.spin_once(self, timeout_sec=0.0) + + def set_joint_efforts( + self, + joint_efforts: list[float] | np.ndarray, + ) -> None: + """Set the joint efforts of the robot.""" + self.actual_joint_state_publisher.publish( + JointState( + name=list(self.effort_joint_names), + effort=joint_efforts, + ), + ) + while not np.allclose( + self.get_current_effort_joint_state(), + joint_efforts, + atol=1e-3, + ): + rclpy.spin_once(self, timeout_sec=0.0)