-
Notifications
You must be signed in to change notification settings - Fork 43
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fts real robot #230
base: dev
Are you sure you want to change the base?
Fts real robot #230
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -23,4 +23,5 @@ deprecated | |
probabilistic_model>=6.0.2 | ||
random_events>=3.1.2 | ||
sympy | ||
pint>=0.21.1 | ||
pint>=0.21.1 | ||
scipy~=1.10.1 |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,29 @@ | ||
from abc import abstractmethod | ||
|
||
from scipy.signal import butter, lfilter | ||
|
||
|
||
class CustomFilter: | ||
""" | ||
Abstract class to ensure that every supported filter needs to implement the filter method | ||
""" | ||
|
||
@abstractmethod | ||
def filter(self, data): | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. docstring |
||
pass | ||
|
||
|
||
class Butterworth(CustomFilter): | ||
""" | ||
Implementation for a Butterworth filter. | ||
""" | ||
|
||
def __init__(self, order=4, cutoff=10, fs=60): | ||
self.order = order | ||
self.cutoff = cutoff | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. documentation of params |
||
self.fs = fs | ||
|
||
self.b, self.a = butter(self.order, cutoff / (0.5 * fs), btype='low') | ||
|
||
def filter(self, data: list): | ||
return lfilter(self.b, self.a, data) |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -2,19 +2,24 @@ | |
import time | ||
import threading | ||
|
||
import rospy | ||
from geometry_msgs.msg import WrenchStamped | ||
from std_msgs.msg import Header | ||
|
||
from ..datastructures.enums import FilterConfig | ||
from ..datastructures.world import World | ||
from ..ros.custom_filter import Butterworth | ||
from ..ros.data_types import Time | ||
from ..ros.publisher import create_publisher | ||
|
||
|
||
class ForceTorqueSensor: | ||
class ForceTorqueSensorSimulated: | ||
""" | ||
Simulated force-torque sensor for a joint with a given name. | ||
Reads simulated forces and torques at that joint from world and publishes geometry_msgs/Wrench messages | ||
to the given topic. | ||
""" | ||
|
||
def __init__(self, joint_name, fts_topic="/pycram/fts", interval=0.1): | ||
""" | ||
The given joint_name has to be part of :py:attr:`~pycram.world.World.robot` otherwise a | ||
|
@@ -77,3 +82,152 @@ def _stop_publishing(self) -> None: | |
""" | ||
self.kill_event.set() | ||
self.thread.join() | ||
|
||
|
||
class ForceTorqueSensor: | ||
""" | ||
Monitor a force-torque sensor of a supported robot and save relevant data. | ||
|
||
Apply a specified filter and save this data as well. | ||
Default filter is the low pass filter 'Butterworth' | ||
|
||
Can also calculate the derivative of (un-)filtered data | ||
""" | ||
filtered = 'filtered' | ||
unfiltered = 'unfiltered' | ||
|
||
def __init__(self, robot_name, filter_config=FilterConfig.butterworth, filter_order=4, custom_topic=None, | ||
debug=False): | ||
|
||
""" | ||
Create a subscriber for the force-torque-sensor topic of a specified robot. | ||
|
||
:param robot_name: Name of the robot | ||
:param filter_config: Desired filter (default: Butterworth) | ||
:param filter_order: Order of the filter. Declares the number of elements that delay the sampling | ||
:param custom_topic: Declare a custom topic if the default topics do not fit | ||
""" | ||
|
||
self.robot_name = robot_name | ||
self.filter_config = filter_config | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. declare the variables as class variables and document them there |
||
self.filter = self.__get_filter(order=filter_order) | ||
self.debug = debug | ||
|
||
self.wrench_topic_name = custom_topic | ||
self.force_torque_subscriber = None | ||
self.init_data = True | ||
|
||
self.whole_data = None | ||
self.prev_values = None | ||
|
||
self.order = filter_order | ||
|
||
self.__setup() | ||
|
||
def __setup(self): | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. double underscore methods usually have a meaning of "super private please do never access this method from any other file". Is this intended? |
||
self.__get_robot_parameters() | ||
self.subscribe() | ||
|
||
def __get_robot_parameters(self): | ||
if self.wrench_topic_name is not None: | ||
return | ||
|
||
if self.robot_name == 'hsrb': | ||
self.wrench_topic_name = '/hsrb/wrist_wrench/compensated' | ||
|
||
elif self.robot_name == 'iai_donbot': | ||
self.wrench_topic_name = '/kms40_driver/wrench' | ||
else: | ||
rospy.logerr(f'{self.robot_name} is not supported') | ||
|
||
def __get_rospy_data(self, | ||
data_compensated: WrenchStamped): | ||
if self.init_data: | ||
self.init_data = False | ||
self.prev_values = [data_compensated] * (self.order + 1) | ||
self.whole_data = {self.unfiltered: [data_compensated], | ||
self.filtered: [data_compensated]} | ||
|
||
filtered_data = self.__filter_data(data_compensated) | ||
|
||
self.whole_data[self.unfiltered].append(data_compensated) | ||
self.whole_data[self.filtered].append(filtered_data) | ||
|
||
self.prev_values.append(data_compensated) | ||
self.prev_values.pop(0) | ||
|
||
if self.debug: | ||
print( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. use the debug logging here instead of printing |
||
f'x: {data_compensated.wrench.force.x}, ' | ||
f'y: {data_compensated.wrench.force.y}, ' | ||
f'z: {data_compensated.wrench.force.z}') | ||
|
||
def __get_filter(self, order=4, cutoff=10, fs=60): | ||
if self.filter_config == FilterConfig.butterworth: | ||
return Butterworth(order=order, cutoff=cutoff, fs=fs) | ||
|
||
def __filter_data(self, current_wrench_data: WrenchStamped) -> WrenchStamped: | ||
filtered_data = WrenchStamped() | ||
for attr in ['x', 'y', 'z']: | ||
force_values = [getattr(val.wrench.force, attr) for val in self.prev_values] + [ | ||
getattr(current_wrench_data.wrench.force, attr)] | ||
torque_values = [getattr(val.wrench.torque, attr) for val in self.prev_values] + [ | ||
getattr(current_wrench_data.wrench.torque, attr)] | ||
|
||
filtered_force = self.filter.filter(force_values)[-1] | ||
filtered_torque = self.filter.filter(torque_values)[-1] | ||
|
||
setattr(filtered_data.wrench.force, attr, filtered_force) | ||
setattr(filtered_data.wrench.torque, attr, filtered_torque) | ||
|
||
return filtered_data | ||
|
||
def subscribe(self): | ||
""" | ||
Subscribe to the specified wrench topic. | ||
|
||
This will automatically be called on setup. | ||
Only use this if you already unsubscribed before. | ||
""" | ||
|
||
self.force_torque_subscriber = rospy.Subscriber(name=self.wrench_topic_name, | ||
data_class=WrenchStamped, | ||
callback=self.__get_rospy_data) | ||
|
||
def unsubscribe(self): | ||
""" | ||
Unsubscribe from the specified topic | ||
""" | ||
self.force_torque_subscriber.unregister() | ||
|
||
def get_last_value(self, is_filtered=True) -> WrenchStamped: | ||
""" | ||
Get the most current data values. | ||
|
||
:param is_filtered: Decides about using filtered or raw data | ||
|
||
:return: A list containing the most current values (newest are first) | ||
""" | ||
status = self.filtered if is_filtered else self.unfiltered | ||
return self.whole_data[status][-1] | ||
|
||
def get_derivative(self, is_filtered=True) -> WrenchStamped: | ||
""" | ||
Calculate the derivative of current data. | ||
|
||
:param is_filtered: Decides about using filtered or raw data | ||
""" | ||
status = self.filtered if is_filtered else self.unfiltered | ||
|
||
before: WrenchStamped = self.whole_data[status][-2] | ||
after: WrenchStamped = self.whole_data[status][-1] | ||
derivative = WrenchStamped() | ||
|
||
derivative.wrench.force.x = before.wrench.force.x - after.wrench.force.x | ||
derivative.wrench.force.y = before.wrench.force.y - after.wrench.force.y | ||
derivative.wrench.force.z = before.wrench.force.z - after.wrench.force.z | ||
derivative.wrench.torque.x = before.wrench.torque.x - after.wrench.torque.x | ||
derivative.wrench.torque.y = before.wrench.torque.y - after.wrench.torque.y | ||
derivative.wrench.torque.z = before.wrench.torque.z - after.wrench.torque.z | ||
|
||
return derivative |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can the class name also be filter?