Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement latched topics on TCP #2

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions nimbro_topic_transport/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ find_package(catkin REQUIRED COMPONENTS
topic_tools
rostest
message_generation
std_msgs
std_srvs
)

add_message_files(FILES
Expand Down
3 changes: 3 additions & 0 deletions nimbro_topic_transport/doc/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -67,3 +67,6 @@ parameter is `name`.
- `resend`: If the sender does not get a message 1.0/`rate` after the last one,
it will re-send the last received one. (UDP only)
- `compress`: If true, compress the data on the wire with bz2.
- `latch` (bool): This topic should be latched. If you want it to work correctly
(even when the receiver is started later than the sender), you have to launch the
helper nodes (look in the example launch files).
11 changes: 11 additions & 0 deletions nimbro_topic_transport/launch/tcp_receiver.launch
Original file line number Diff line number Diff line change
@@ -1,4 +1,9 @@
<launch>
<arg name="support_latching" default="0" />
<arg name="latch_helper_server_port" default="17010" />
<arg name="latch_helper_server_hostname" default="localhost" />
<arg name="latch_helper_service_server_name" default="tcp_sender" />

<node name="tcp_receiver" pkg="nimbro_topic_transport" type="tcp_receiver" output="screen">
<param name="port" value="17001" />

Expand All @@ -8,4 +13,10 @@
<remap from="/my_first_topic" to="/recv/my_first_topic" />
<remap from="/my_second_topic" to="/recv/my_second_topic" />
</node>

<include file="$(find nimbro_topic_transport)/launch/tcp_receiver_latch_helper.launch" if="$(arg support_latching)">
<arg name="server_port" value="$(arg latch_helper_server_port)" />
<arg name="server_hostname" value="$(arg latch_helper_server_hostname)" />
<arg name="service_server_name" value="$(arg latch_helper_service_server_name)" />
</include>
</launch>
21 changes: 21 additions & 0 deletions nimbro_topic_transport/launch/tcp_receiver_latch_helper.launch
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
<launch>
<arg name="service_server_name" />
<arg name="server_hostname" />
<arg name="server_port" />

<node pkg="nimbro_topic_transport" type="tcp_receiver_latch_helper.py" name="$(anon tcp_receiver_latch_helper)">
<param name="service_server_name" value="$(arg service_server_name)" />
</node>

<node pkg="nimbro_service_transport" type="service_client" name="$(anon tcp_receiver_latch_helper_service_client)">
<param name="server" value="$(arg server_hostname)" />
<param name="port" value="$(arg server_port)" />

<rosparam subst_value="true">
services:
- name: "$(arg service_server_name)/publish_latched_topics"
type: "std_srvs/Empty"
</rosparam>

</node>
</launch>
6 changes: 6 additions & 0 deletions nimbro_topic_transport/launch/tcp_sender.launch
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
<launch>
<arg name="target" default="localhost" />
<arg name="support_latching" default="0" />
<arg name="latch_helper_server_port" default="17010" />

<!-- The UDP sender node -->
<node name="tcp_sender" pkg="nimbro_topic_transport" type="tcp_sender" output="screen">
Expand All @@ -11,4 +13,8 @@
<!-- Load the list of topics from a YAML file -->
<rosparam command="load" file="$(find nimbro_topic_transport)/launch/topics.yaml" />
</node>

<include file="$(find nimbro_topic_transport)/launch/tcp_sender_latch_helper.launch" if="$(arg support_latching)">
<arg name="server_port" value="$(arg latch_helper_server_port)" />
</include>
</launch>
7 changes: 7 additions & 0 deletions nimbro_topic_transport/launch/tcp_sender_latch_helper.launch
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
<launch>
<arg name="server_port" />

<node name="$(anon tcp_sender_latch_helper_service_server)" pkg="nimbro_service_transport" type="service_server">
<param name="port" value="$(arg server_port)" />
</node>
</launch>
1 change: 1 addition & 0 deletions nimbro_topic_transport/launch/topics.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,4 @@ topics:
compress: true # enable bz2 compression
rate: 1.0 # rate limit at 1Hz
- name: "/my_second_topic"
latch: True
4 changes: 4 additions & 0 deletions nimbro_topic_transport/package.xml
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,14 @@

<buildtool_depend>catkin</buildtool_depend>
<depend>roscpp</depend>
<depend>rospy</depend>
<depend>topic_tools</depend>

<build_depend>message_generation</build_depend>

<depend>std_msgs</depend>
<depend>std_srvs</depend>

<!-- This is not a hard dependency, the package compiles without plot_msgs. -->
<depend>plot_msgs</depend>

Expand Down
3 changes: 2 additions & 1 deletion nimbro_topic_transport/src/tcp/tcp_packet.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ namespace nimbro_topic_transport

enum TCPFlag
{
TCP_FLAG_COMPRESSED = (1 << 0)
TCP_FLAG_COMPRESSED = (1 << 0),
TCP_FLAG_LATCHED = (1 << 1)
};

struct TCPHeader
Expand Down
11 changes: 11 additions & 0 deletions nimbro_topic_transport/src/tcp/tcp_receiver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
#include "tcp_packet.h"
#include "../topic_info.h"
#include <topic_tools/shape_shifter.h>
#include <std_msgs/String.h>

#include <bzlib.h>

Expand Down Expand Up @@ -106,6 +107,8 @@ TCPReceiver::TCPReceiver()
);

m_nh.param("topic_prefix", m_topicPrefix, std::string());

m_pub_ready = m_nh.advertise<std_msgs::String>("/network/receiver_ready", 1, true);
}

TCPReceiver::~TCPReceiver()
Expand All @@ -116,6 +119,10 @@ void TCPReceiver::run()
{
fd_set fds;

std_msgs::String readyMessage;
readyMessage.data = "";
m_pub_ready.publish(readyMessage);

while(ros::ok())
{
ros::spinOnce();
Expand Down Expand Up @@ -345,6 +352,10 @@ void TCPReceiver::ClientHandler::run()
topic_info::getMsgDef(type)
);

if (header.flags() & TCP_FLAG_LATCHED) {
options.latch = true;
}

// It will take subscribers some time to connect to our publisher.
// Therefore, latch messages so they will not be lost.
// No, this is often unexpected. Instead, wait before publishing.
Expand Down
1 change: 1 addition & 0 deletions nimbro_topic_transport/src/tcp/tcp_receiver.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ class TCPReceiver
ros::WallTimer m_statsTimer;

std::string m_topicPrefix;
ros::Publisher m_pub_ready;
};

}
Expand Down
35 changes: 35 additions & 0 deletions nimbro_topic_transport/src/tcp/tcp_receiver_latch_helper.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
#!/usr/bin/env python

__author__ = "Martin Pecka <[email protected]>"

import rospy
from std_msgs.msg import String
from std_srvs.srv import Empty, EmptyRequest


class LatchHelper(object):
"""Helper class that calls the publish_latched_messages service of a remote sender whenever a receiver gets ready."""

def __init__(self):

# name of the remote service to be called
self._service_server_name = rospy.get_param("~service_server_name")

self._ready_subscriber = rospy.Subscriber("/network/receiver_ready",
String, self.receiver_ready_cb, queue_size=1)

self._service_server = rospy.ServiceProxy(self._service_server_name + "/publish_latched_messages", Empty)

def receiver_ready_cb(self, receiver_name_msg):
try:
self._service_server.wait_for_service(timeout=0.1)
self._service_server(EmptyRequest())
except rospy.ROSException: # It means the waiting timed out
# So we have nobody to contact and we can omit calling the service
pass


if __name__ == '__main__':
rospy.init_node("tcp_receiver_latch_helper")
helper = LatchHelper()
rospy.spin()
25 changes: 24 additions & 1 deletion nimbro_topic_transport/src/tcp/tcp_sender.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,9 @@ TCPSender::TCPSender()
if(entry.hasMember("compress") && ((bool)entry["compress"]) == true)
flags |= TCP_FLAG_COMPRESSED;

if(entry.hasMember("latch") && ((bool)entry["latch"]) == true)
flags |= TCP_FLAG_LATCHED;

boost::function<void(const topic_tools::ShapeShifter::ConstPtr&)> func;
func = boost::bind(&TCPSender::send, this, topic, flags, _1);

Expand Down Expand Up @@ -138,7 +141,10 @@ TCPSender::TCPSender()
boost::bind(&TCPSender::updateStats, this)
);
m_statsTimer.start();
}

m_latchedMessageRequestServer = m_nh.advertiseService(
"publish_latched_messages", &TCPSender::sendLatched, this);
}

TCPSender::~TCPSender()
{
Expand Down Expand Up @@ -250,6 +256,9 @@ void TCPSender::send(const std::string& topic, int flags, const topic_tools::Sha
if(flags & TCP_FLAG_COMPRESSED)
maxDataSize = size + size / 100 + 1200; // taken from bzip2 docs

if (flags & TCP_FLAG_LATCHED)
this->m_latchedMessages[topic] = std::make_pair(shifter, flags);

m_packet.resize(
sizeof(TCPHeader) + topic.length() + type.length() + maxDataSize
);
Expand Down Expand Up @@ -363,6 +372,20 @@ void TCPSender::updateStats()
m_sentBytesInStatsInterval = 0;
}

bool TCPSender::sendLatched(std_srvs::Empty::Request& request, std_srvs::Empty::Response& response) {

std::map<std::string, std::pair<topic_tools::ShapeShifter::ConstPtr, int>>::iterator it =
this->m_latchedMessages.begin();

// send all latched messages
while (it != this->m_latchedMessages.end()) {
this->send(it->first, it->second.second, it->second.first);
it++;
}

return true;
}

}


Expand Down
8 changes: 8 additions & 0 deletions nimbro_topic_transport/src/tcp/tcp_sender.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,10 @@
#include <config_server/parameter.h>
#endif

#include <map>

#include <std_srvs/Empty.h>

#include <nimbro_topic_transport/SenderStats.h>

namespace nimbro_topic_transport
Expand All @@ -28,6 +32,7 @@ class TCPSender
bool connect();

void send(const std::string& topic, int flags, const topic_tools::ShapeShifter::ConstPtr& shifter);
bool sendLatched(std_srvs::Empty::Request& request, std_srvs::Empty::Response& response);
private:
void updateStats();

Expand All @@ -43,6 +48,8 @@ class TCPSender
std::vector<uint8_t> m_packet;
std::vector<uint8_t> m_compressionBuf;

std::map<std::string, std::pair<topic_tools::ShapeShifter::ConstPtr, int> > m_latchedMessages;

#if WITH_CONFIG_SERVER
std::map<std::string, boost::shared_ptr<config_server::Parameter<bool>>> m_enableTopic;
#endif
Expand All @@ -53,6 +60,7 @@ class TCPSender
ros::WallTimer m_statsTimer;
uint64_t m_sentBytesInStatsInterval;
std::map<std::string, uint64_t> m_topicSendBytesInStatsInteral;
ros::ServiceServer m_latchedMessageRequestServer;
};

}
Expand Down