Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Intelligently subscribe to topics according to their QoS profiles #355

Merged
merged 18 commits into from
Apr 8, 2020
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
126 changes: 86 additions & 40 deletions rosbag2_transport/src/rosbag2_transport/recorder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,22 +44,6 @@
# pragma warning(pop)
#endif

namespace
{
bool all_qos_same(const std::vector<rclcpp::TopicEndpointInfo> & values)
{
auto adjacent_different_elements_it = std::adjacent_find(
values.begin(),
values.end(),
[](const rclcpp::TopicEndpointInfo & left, const rclcpp::TopicEndpointInfo & right) -> bool {
return left.qos_profile() != right.qos_profile();
}
);
// No adjacent elements were different, so all are the same.
return adjacent_different_elements_it == values.end();
}
} // unnamed namespace

namespace rosbag2_transport
{
Recorder::Recorder(std::shared_ptr<rosbag2_cpp::Writer> writer, std::shared_ptr<Rosbag2Node> node)
Expand Down Expand Up @@ -102,11 +86,9 @@ void Recorder::topics_discovery(
while (rclcpp::ok()) {
auto topics_to_subscribe =
get_requested_or_available_topics(requested_topics, include_hidden_topics);
#ifdef ROSBAG2_ENABLE_ADAPTIVE_QOS_SUBSCRIPTION
for (const auto & topic_and_type : topics_to_subscribe) {
warn_if_new_qos_for_subscribed_topic(topic_and_type.first);
}
#endif // ROSBAG2_ENABLE_ADAPTIVE_QOS_SUBSCRIPTION
auto missing_topics = get_missing_topics(topics_to_subscribe);
subscribe_topics(missing_topics);

Expand Down Expand Up @@ -162,12 +144,7 @@ void Recorder::subscribe_topic(const rosbag2_storage::TopicMetadata & topic)
// that callback called before we reached out the line: writer_->create_topic(topic)
writer_->create_topic(topic);

// TODO(emersonknapp) re-enable common_qos_or_fallback once the cyclone situation is resolved
#ifdef ROSBAG2_ENABLE_ADAPTIVE_QOS_SUBSCRIPTION
Rosbag2QoS subscription_qos{common_qos_or_fallback(topic.name)};
#else
Rosbag2QoS subscription_qos{};
#endif // ROSBAG2_ENABLE_ADAPTIVE_QOS_SUBSCRIPTION
Rosbag2QoS subscription_qos{qos_for_topic(topic.name)};
auto subscription = create_subscription(topic.name, topic.type, subscription_qos);
if (subscription) {
subscriptions_.insert({topic.name, subscription});
Expand Down Expand Up @@ -218,18 +195,75 @@ std::string Recorder::serialized_offered_qos_profiles_for_topic(const std::strin
return YAML::Dump(offered_qos_profiles);
}

rclcpp::QoS Recorder::common_qos_or_fallback(const std::string & topic_name)
rclcpp::QoS Recorder::adapt_qos_to_publishers(const std::string & topic_name) const
{
auto endpoints = node_->get_publishers_info_by_topic(topic_name);
if (!endpoints.empty() && all_qos_same(endpoints)) {
return Rosbag2QoS(endpoints[0].qos_profile()).default_history();
size_t num_endpoints = endpoints.size();
size_t reliability_reliable_endpoints_count = 0;
size_t durability_transient_local_endpoints_count = 0;
for (const auto & endpoint : endpoints) {
const auto & profile = endpoint.qos_profile().get_rmw_qos_profile();
if (profile.reliability == RMW_QOS_POLICY_RELIABILITY_RELIABLE) {
reliability_reliable_endpoints_count++;
}
if (profile.durability == RMW_QOS_POLICY_DURABILITY_TRANSIENT_LOCAL) {
durability_transient_local_endpoints_count++;
}
}

// We set policies in order as defined in rmw_qos_profile_t
Rosbag2QoS request_qos;
// Policy: history, depth
// History does not affect compatibility
request_qos.default_history();

// Policy: reliability
if (reliability_reliable_endpoints_count == num_endpoints) {
request_qos.reliable();
} else {
if (reliability_reliable_endpoints_count > 0) {
ROSBAG2_TRANSPORT_LOG_WARN_STREAM(
"Some, but not all, publishers on topic \"" << topic_name << "\" "
"are offering Reliable reliability. "
emersonknapp marked this conversation as resolved.
Show resolved Hide resolved
"Falling back to Best Effort as it will connect to all publishers. "
"Some messages from Reliable publishers could be dropped.");
}
request_qos.best_effort();
}

// Policy: durability
// If all publishers offer transient_local, we can request it and receive latched messages
if (durability_transient_local_endpoints_count == num_endpoints) {
request_qos.transient_local();
} else {
if (durability_transient_local_endpoints_count > 0) {
ROSBAG2_TRANSPORT_LOG_WARN_STREAM(
"Some, but not all, publishers on topic \"" << topic_name << "\" "
"are offering Transient Local durability. "
"Falling back to Volatile as it will connect to all publishers. "
"Previously-published latched messages will not be retrieved.");
}
request_qos.durability_volatile();
}
ROSBAG2_TRANSPORT_LOG_WARN_STREAM(
"Topic " << topic_name << " has publishers offering different QoS settings. "
"Cannot determine what QoS to request, falling back to default QoS profile."
);
topics_warned_about_incompatibility_.insert(topic_name);
return Rosbag2QoS{};

// Policy: deadline
// Deadline does not affect delivery of messages,
// and we do not record Deadline"Missed events.
// We can always use unspecified deadline, which will be compatible with all publishers.

// Policy: lifespan
// Lifespan does not affect compatibiliy

// Policy: liveliness, liveliness_lease_duration
// Liveliness does not affect delivery of messages,
// and we do not record LivelinessChanged events.
// We can always use unspecified liveliness, which will be compatible with all publishers.
return request_qos;
}

rclcpp::QoS Recorder::qos_for_topic(const std::string & topic_name) const
{
return adapt_qos_to_publishers(topic_name);
}

void Recorder::warn_if_new_qos_for_subscribed_topic(const std::string & topic_name)
Expand All @@ -243,17 +277,29 @@ void Recorder::warn_if_new_qos_for_subscribed_topic(const std::string & topic_na
// Already warned about this topic
return;
}
const auto & used_qos = existing_subscription->second->qos_profile();
const auto & used_profile = existing_subscription->second->qos_profile().get_rmw_qos_profile();
auto publishers_info = node_->get_publishers_info_by_topic(topic_name);
for (const auto & info : publishers_info) {
if (info.qos_profile() != used_qos) {
auto new_profile = info.qos_profile().get_rmw_qos_profile();
bool incompatible_reliability =
new_profile.reliability == RMW_QOS_POLICY_RELIABILITY_BEST_EFFORT &&
used_profile.reliability != RMW_QOS_POLICY_RELIABILITY_BEST_EFFORT;
bool incompatible_durability =
new_profile.durability == RMW_QOS_POLICY_DURABILITY_VOLATILE &&
used_profile.durability != RMW_QOS_POLICY_DURABILITY_VOLATILE;

if (incompatible_reliability) {
ROSBAG2_TRANSPORT_LOG_WARN_STREAM(
"A new publisher for subscribed topic " << topic_name << " was found that is offering "
"a (possibly) incompatible QoS profile. Not changing subscription QoS. "
"Messages from this publisher may not be recorded."
);
"A new publisher for subscribed topic " << topic_name << " was found offering "
"Best Effort reliability, but rosbag already subscribed requesting Reliable. "
"Messages will not be recorded from this new publisher.");
topics_warned_about_incompatibility_.insert(topic_name);
} else if (incompatible_durability) {
ROSBAG2_TRANSPORT_LOG_WARN_STREAM(
"A new publisher for susbcribed topic " << topic_name << " was found offering "
"Volatile durability, but rosbag2 already subscribed requesting Transient Local. "
"Messages from this new publisher will not be recorded.");
topics_warned_about_incompatibility_.insert(topic_name);
return;
}
}
}
Expand Down
7 changes: 2 additions & 5 deletions rosbag2_transport/src/rosbag2_transport/recorder.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -85,11 +85,8 @@ class Recorder

void record_messages() const;

/** Find the QoS profile that should be used for subscribing.
* If all currently offered QoS Profiles for a topic are the same, return that profile.
* Otherwise, print a warning and return a fallback value.
*/
rclcpp::QoS common_qos_or_fallback(const std::string & topic_name);
rclcpp::QoS qos_for_topic(const std::string & topic_name) const;
rclcpp::QoS adapt_qos_to_publishers(const std::string & topic_name) const;

// Serialize all currently offered QoS profiles for a topic into a YAML list.
std::string serialized_offered_qos_profiles_for_topic(const std::string & topic_name);
Expand Down
15 changes: 7 additions & 8 deletions rosbag2_transport/test/rosbag2_transport/test_qos.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,22 +60,21 @@ TEST(TestQoS, supports_version_4)
YAML::Node loaded_node = YAML::Load(serialized_profiles);
auto deserialized_profiles = loaded_node.as<std::vector<rosbag2_transport::Rosbag2QoS>>();
ASSERT_EQ(deserialized_profiles.size(), 1u);
rosbag2_transport::Rosbag2QoS actual_qos = deserialized_profiles[0];
auto actual_qos = deserialized_profiles[0].get_rmw_qos_profile();

rmw_time_t zerotime{0, 0};
// Explicitly set up the same QoS profile in case defaults change
rclcpp::QoS expected_qos(10);
expected_qos
.keep_last(10)
auto expected_qos = rosbag2_transport::Rosbag2QoS{}
.default_history()
.reliable()
.durability_volatile()
.deadline(zerotime)
.lifespan(zerotime)
.liveliness(RMW_QOS_POLICY_LIVELINESS_SYSTEM_DEFAULT)
.liveliness_lease_duration(zerotime)
.avoid_ros_namespace_conventions(false);
// Any values not present in the YAML should take the default value in both profiles
EXPECT_EQ(actual_qos, expected_qos);
.liveliness_lease_duration(zerotime).get_rmw_qos_profile();

EXPECT_EQ(actual_qos.reliability, expected_qos.reliability);
EXPECT_EQ(actual_qos.durability, expected_qos.durability);
}

TEST(TestQoS, detect_new_qos_fields)
Expand Down
Loading