Skip to content

Commit

Permalink
[add] add velocity and effort test
Browse files Browse the repository at this point in the history
  • Loading branch information
hijimasa committed Oct 31, 2024
1 parent c7f9431 commit 9131012
Show file tree
Hide file tree
Showing 2 changed files with 180 additions and 22 deletions.
20 changes: 16 additions & 4 deletions test/test_topic_based_robot.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,16 +35,28 @@

rclpy.init()

robot = TopicBasedRobot(["joint_1", "joint_2", "joint_3"])
robot = TopicBasedRobot(["joint_1", "joint_2", "joint_3"], ["left_wheel_joint", "right_wheel_joint"], ["rrr_base_joint"])
# By default the joint_states should have the values from initial_value from rrr.urdf.xacro
current_joint_state = robot.get_current_joint_state()
current_joint_state = robot.get_current_position_joint_state()
urdf_initial_values = [0.2, 0.3, 0.1]
assert (
current_joint_state == urdf_initial_values
), f"{current_joint_state=} != {urdf_initial_values=}"

# Test setting the robot joint states
# Test setting the robot joint position states
joint_state = [0.1, 0.2, 0.3]
robot.set_joint_positions(joint_state)
current_joint_state = robot.get_current_joint_state()
current_joint_state = robot.get_current_position_joint_state()
assert current_joint_state == joint_state, f"{current_joint_state=} != {joint_state=}"

# Test setting the robot joint velocity states
joint_state = [0.4, 0.5]
robot.set_joint_velocities(joint_state)
current_joint_state = robot.get_current_velocity_joint_state()
assert current_joint_state == joint_state, f"{current_joint_state=} != {joint_state=}"

# Test setting the robot joint effort states
joint_state = [0.6]
robot.set_joint_efforts(joint_state)
current_joint_state = robot.get_current_effort_joint_state()
assert current_joint_state == joint_state, f"{current_joint_state=} != {joint_state=}"
182 changes: 164 additions & 18 deletions test/topic_based_robot.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,11 @@
class TopicBasedRobot(Node):
"""Topic Based Robot to test topic_based_ros2_control interface"""

def __init__(self, joint_names: list[str]) -> None:
def __init__(self, position_joint_names: list[str], velocity_joint_names = [], effort_joint_names = []) -> None:
super().__init__("topic_based_robot")
self.joint_names = joint_names.copy()
self.position_joint_names = position_joint_names.copy()
self.velocity_joint_names = velocity_joint_names.copy()
self.effort_joint_names = effort_joint_names.copy()
self.last_joint_command = []
self.current_joint_state = []
self.callback_group = ReentrantCallbackGroup()
Expand Down Expand Up @@ -72,46 +74,154 @@ def __init__(self, joint_names: list[str]) -> None:
callback_group=self.callback_group,
)

def filter_joint_state_msg(self, msg: JointState):
def filter_joint_position_state_msg(self, msg: JointState):
joint_states = []
for joint_name in self.joint_names:
self.get_logger().warning(
f"Caught msg name list: '{msg.name}'",
)
for joint_name in self.position_joint_names:
try:
index = msg.name.index(joint_name)
except ValueError:
msg = f"Joint name '{joint_name}' not in input keys {msg.name}"
raise ValueError(msg) from None
continue
#
# Commented out because the joints that the message outputs were sometimes missing
#
#msg = f"Joint name '{joint_name}' not in input keys {msg.name}"
#raise ValueError(msg) from None
joint_states.append(msg.position[index])
return joint_states

def filter_joint_velocity_state_msg(self, msg: JointState):
joint_states = []
self.get_logger().warning(
f"Caught msg name list: '{msg.name}'",
)
for joint_name in self.velocity_joint_names:
try:
index = msg.name.index(joint_name)
except ValueError:
continue
#
# Commented out because the joints that the message outputs were sometimes missing
#
#msg = f"Joint name '{joint_name}' not in input keys {msg.name}"
#raise ValueError(msg) from None
joint_states.append(msg.velocity[index])
return joint_states

def filter_joint_effort_state_msg(self, msg: JointState):
joint_states = []
self.get_logger().warning(
f"Caught msg name list: '{msg.name}'",
)
for joint_name in self.effort_joint_names:
try:
index = msg.name.index(joint_name)
except ValueError:
continue
#
# Commented out because the joints that the message outputs were sometimes missing
#
#msg = f"Joint name '{joint_name}' not in input keys {msg.name}"
#raise ValueError(msg) from None
joint_states.append(msg.effort[index])
return joint_states

def command_callback(self, msg: JointState):
self.last_joint_command = self.filter_joint_state_msg(msg)
# Output by position, velocity, and force, so the number of joint names of each type is matched
if self.position_joint_names[0] in msg.name:
assert len(self.position_joint_names) == len(msg.name), f"{self.position_joint_names=} != {msg.name=}"
self.last_position_joint_command = self.filter_joint_position_state_msg(msg)

if not len(self.velocity_joint_names) == 0:
if self.velocity_joint_names[0] in msg.name:
assert len(self.velocity_joint_names) == len(msg.name), f"{self.velocity_joint_names=} != {msg.name=}"
self.last_velocity_joint_command = self.filter_joint_velocity_state_msg(msg)

if not len(self.effort_joint_names) == 0:
if self.effort_joint_names[0] in msg.name:
assert len(self.effort_joint_names) == len(msg.name), f"{self.effort_joint_names=} != {msg.name=}"
self.last_effort_joint_command = self.filter_joint_effort_state_msg(msg)

def joint_states_callback(self, msg: JointState):
self.current_joint_state = self.filter_joint_state_msg(msg)
self.current_position_joint_state = self.filter_joint_position_state_msg(msg)
if not len(self.velocity_joint_names) == 0:
self.current_velocity_joint_state = self.filter_joint_velocity_state_msg(msg)
if not len(self.effort_joint_names) == 0:
self.current_effort_joint_state = self.filter_joint_effort_state_msg(msg)

def get_current_joint_command(self) -> OrderedDict[str, float]:
def get_current_position_joint_command(self) -> OrderedDict[str, float]:
"""Get the last joint command sent to the robot."""
self.last_joint_command = []
while len(self.last_joint_command) == 0:
self.last_position_joint_command = []
while len(self.last_position_joint_command) == 0:
self.get_logger().warning(
f"Waiting for joint command '{self.desired_joint_state_subscriber.topic_name}'...",
throttle_duration_sec=2.0,
skip_first=True,
)
rclpy.spin_once(self, timeout_sec=0.0)
return self.last_joint_command
return self.last_position_joint_command

def get_current_joint_state(self) -> OrderedDict[str, float]:
def get_current_velocity_joint_command(self) -> OrderedDict[str, float]:
"""Get the last joint command sent to the robot."""
self.last_velocity_joint_command = []
while len(self.last_velocity_joint_command) == 0:
self.get_logger().warning(
f"Waiting for joint command '{self.desired_joint_state_subscriber.topic_name}'...",
throttle_duration_sec=2.0,
skip_first=True,
)
rclpy.spin_once(self, timeout_sec=0.0)
return self.last_velocity_joint_command

def get_current_effort_joint_command(self) -> OrderedDict[str, float]:
"""Get the last joint command sent to the robot."""
self.last_effort_joint_command = []
while len(self.last_effort_joint_command) == 0:
self.get_logger().warning(
f"Waiting for joint command '{self.desired_joint_state_subscriber.topic_name}'...",
throttle_duration_sec=2.0,
skip_first=True,
)
rclpy.spin_once(self, timeout_sec=0.0)
return self.last_effort_joint_command

def get_current_position_joint_state(self) -> OrderedDict[str, float]:
"""Get the current joint state reported by ros2_control on joint_states topic."""
self.current_joint_state = []
while len(self.current_joint_state) == 0:
self.current_position_joint_state = []
while len(self.current_position_joint_state) == 0:
self.get_logger().warning(
f"Waiting for current joint states from topic '{self.current_joint_state_subscriber.topic_name}'...",
throttle_duration_sec=2.0,
skip_first=True,
)
rclpy.spin_once(self, timeout_sec=0.0)
return self.current_joint_state
return self.current_position_joint_state

def get_current_velocity_joint_state(self) -> OrderedDict[str, float]:
"""Get the current joint state reported by ros2_control on joint_states topic."""
self.current_velocity_joint_state = []
while len(self.current_velocity_joint_state) == 0:
self.get_logger().warning(
f"Waiting for current joint states from topic '{self.current_joint_state_subscriber.topic_name}'...",
throttle_duration_sec=2.0,
skip_first=True,
)
rclpy.spin_once(self, timeout_sec=0.0)
return self.current_velocity_joint_state

def get_current_effort_joint_state(self) -> OrderedDict[str, float]:
"""Get the current joint state reported by ros2_control on joint_states topic."""
self.current_effort_joint_state = []
while len(self.current_effort_joint_state) == 0:
self.get_logger().warning(
f"Waiting for current joint states from topic '{self.current_joint_state_subscriber.topic_name}'...",
throttle_duration_sec=2.0,
skip_first=True,
)
rclpy.spin_once(self, timeout_sec=0.0)
return self.current_effort_joint_state

def set_joint_positions(
self,
Expand All @@ -120,13 +230,49 @@ def set_joint_positions(
"""Set the joint positions of the robot."""
self.actual_joint_state_publisher.publish(
JointState(
name=list(self.joint_names),
name=list(self.position_joint_names),
position=joint_positions,
),
)
while not np.allclose(
self.get_current_joint_state(),
self.get_current_position_joint_state(),
joint_positions,
atol=1e-3,
):
rclpy.spin_once(self, timeout_sec=0.0)

def set_joint_velocities(
self,
joint_velocities: list[float] | np.ndarray,
) -> None:
"""Set the joint velocities of the robot."""
self.actual_joint_state_publisher.publish(
JointState(
name=list(self.velocity_joint_names),
velocity=joint_velocities,
),
)
while not np.allclose(
self.get_current_velocity_joint_state(),
joint_velocities,
atol=1e-3,
):
rclpy.spin_once(self, timeout_sec=0.0)

def set_joint_efforts(
self,
joint_efforts: list[float] | np.ndarray,
) -> None:
"""Set the joint efforts of the robot."""
self.actual_joint_state_publisher.publish(
JointState(
name=list(self.effort_joint_names),
effort=joint_efforts,
),
)
while not np.allclose(
self.get_current_effort_joint_state(),
joint_efforts,
atol=1e-3,
):
rclpy.spin_once(self, timeout_sec=0.0)

0 comments on commit 9131012

Please sign in to comment.