Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fts real robot #230

Open
wants to merge 3 commits into
base: dev
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -23,4 +23,5 @@ deprecated
probabilistic_model>=6.0.2
random_events>=3.1.2
sympy
pint>=0.21.1
pint>=0.21.1
scipy~=1.10.1
8 changes: 8 additions & 0 deletions src/pycram/datastructures/enums.py
Original file line number Diff line number Diff line change
Expand Up @@ -282,3 +282,11 @@ def from_pycram_joint_type(cls, joint_type: JointType) -> 'MultiverseJointCMD':
return MultiverseJointCMD.PRISMATIC_JOINT_CMD
else:
raise UnsupportedJointType(joint_type)


class FilterConfig(Enum):
"""
Declare existing filter methods.
Currently supported: Butterworth
"""
butterworth = 1
29 changes: 29 additions & 0 deletions src/pycram/ros/custom_filter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
from abc import abstractmethod

from scipy.signal import butter, lfilter


class CustomFilter:
"""
Abstract class to ensure that every supported filter needs to implement the filter method
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can the class name also be filter?

"""

@abstractmethod
def filter(self, data):
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

docstring

pass


class Butterworth(CustomFilter):
"""
Implementation for a Butterworth filter.
"""

def __init__(self, order=4, cutoff=10, fs=60):
self.order = order
self.cutoff = cutoff
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

documentation of params

self.fs = fs

self.b, self.a = butter(self.order, cutoff / (0.5 * fs), btype='low')

def filter(self, data: list):
return lfilter(self.b, self.a, data)
156 changes: 155 additions & 1 deletion src/pycram/ros_utils/force_torque_sensor.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,24 @@
import time
import threading

import rospy
from geometry_msgs.msg import WrenchStamped
from std_msgs.msg import Header

from ..datastructures.enums import FilterConfig
from ..datastructures.world import World
from ..ros.custom_filter import Butterworth
from ..ros.data_types import Time
from ..ros.publisher import create_publisher


class ForceTorqueSensor:
class ForceTorqueSensorSimulated:
"""
Simulated force-torque sensor for a joint with a given name.
Reads simulated forces and torques at that joint from world and publishes geometry_msgs/Wrench messages
to the given topic.
"""

def __init__(self, joint_name, fts_topic="/pycram/fts", interval=0.1):
"""
The given joint_name has to be part of :py:attr:`~pycram.world.World.robot` otherwise a
Expand Down Expand Up @@ -77,3 +82,152 @@ def _stop_publishing(self) -> None:
"""
self.kill_event.set()
self.thread.join()


class ForceTorqueSensor:
"""
Monitor a force-torque sensor of a supported robot and save relevant data.

Apply a specified filter and save this data as well.
Default filter is the low pass filter 'Butterworth'

Can also calculate the derivative of (un-)filtered data
"""
filtered = 'filtered'
unfiltered = 'unfiltered'

def __init__(self, robot_name, filter_config=FilterConfig.butterworth, filter_order=4, custom_topic=None,
debug=False):

"""
Create a subscriber for the force-torque-sensor topic of a specified robot.

:param robot_name: Name of the robot
:param filter_config: Desired filter (default: Butterworth)
:param filter_order: Order of the filter. Declares the number of elements that delay the sampling
:param custom_topic: Declare a custom topic if the default topics do not fit
"""

self.robot_name = robot_name
self.filter_config = filter_config
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

declare the variables as class variables and document them there

self.filter = self.__get_filter(order=filter_order)
self.debug = debug

self.wrench_topic_name = custom_topic
self.force_torque_subscriber = None
self.init_data = True

self.whole_data = None
self.prev_values = None

self.order = filter_order

self.__setup()

def __setup(self):
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

double underscore methods usually have a meaning of "super private please do never access this method from any other file". Is this intended?

self.__get_robot_parameters()
self.subscribe()

def __get_robot_parameters(self):
if self.wrench_topic_name is not None:
return

if self.robot_name == 'hsrb':
self.wrench_topic_name = '/hsrb/wrist_wrench/compensated'

elif self.robot_name == 'iai_donbot':
self.wrench_topic_name = '/kms40_driver/wrench'
else:
rospy.logerr(f'{self.robot_name} is not supported')

def __get_rospy_data(self,
data_compensated: WrenchStamped):
if self.init_data:
self.init_data = False
self.prev_values = [data_compensated] * (self.order + 1)
self.whole_data = {self.unfiltered: [data_compensated],
self.filtered: [data_compensated]}

filtered_data = self.__filter_data(data_compensated)

self.whole_data[self.unfiltered].append(data_compensated)
self.whole_data[self.filtered].append(filtered_data)

self.prev_values.append(data_compensated)
self.prev_values.pop(0)

if self.debug:
print(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

use the debug logging here instead of printing

f'x: {data_compensated.wrench.force.x}, '
f'y: {data_compensated.wrench.force.y}, '
f'z: {data_compensated.wrench.force.z}')

def __get_filter(self, order=4, cutoff=10, fs=60):
if self.filter_config == FilterConfig.butterworth:
return Butterworth(order=order, cutoff=cutoff, fs=fs)

def __filter_data(self, current_wrench_data: WrenchStamped) -> WrenchStamped:
filtered_data = WrenchStamped()
for attr in ['x', 'y', 'z']:
force_values = [getattr(val.wrench.force, attr) for val in self.prev_values] + [
getattr(current_wrench_data.wrench.force, attr)]
torque_values = [getattr(val.wrench.torque, attr) for val in self.prev_values] + [
getattr(current_wrench_data.wrench.torque, attr)]

filtered_force = self.filter.filter(force_values)[-1]
filtered_torque = self.filter.filter(torque_values)[-1]

setattr(filtered_data.wrench.force, attr, filtered_force)
setattr(filtered_data.wrench.torque, attr, filtered_torque)

return filtered_data

def subscribe(self):
"""
Subscribe to the specified wrench topic.

This will automatically be called on setup.
Only use this if you already unsubscribed before.
"""

self.force_torque_subscriber = rospy.Subscriber(name=self.wrench_topic_name,
data_class=WrenchStamped,
callback=self.__get_rospy_data)

def unsubscribe(self):
"""
Unsubscribe from the specified topic
"""
self.force_torque_subscriber.unregister()

def get_last_value(self, is_filtered=True) -> WrenchStamped:
"""
Get the most current data values.

:param is_filtered: Decides about using filtered or raw data

:return: A list containing the most current values (newest are first)
"""
status = self.filtered if is_filtered else self.unfiltered
return self.whole_data[status][-1]

def get_derivative(self, is_filtered=True) -> WrenchStamped:
"""
Calculate the derivative of current data.

:param is_filtered: Decides about using filtered or raw data
"""
status = self.filtered if is_filtered else self.unfiltered

before: WrenchStamped = self.whole_data[status][-2]
after: WrenchStamped = self.whole_data[status][-1]
derivative = WrenchStamped()

derivative.wrench.force.x = before.wrench.force.x - after.wrench.force.x
derivative.wrench.force.y = before.wrench.force.y - after.wrench.force.y
derivative.wrench.force.z = before.wrench.force.z - after.wrench.force.z
derivative.wrench.torque.x = before.wrench.torque.x - after.wrench.torque.x
derivative.wrench.torque.y = before.wrench.torque.y - after.wrench.torque.y
derivative.wrench.torque.z = before.wrench.torque.z - after.wrench.torque.z

return derivative