Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Override Subscriber QoS - Record #346

Merged
merged 13 commits into from
Apr 7, 2020
Original file line number Diff line number Diff line change
Expand Up @@ -75,13 +75,16 @@ class PublisherManager

template<class T>
void add_publisher(
const std::string & topic_name, std::shared_ptr<T> message, size_t expected_messages = 0)
const std::string & topic_name,
std::shared_ptr<T> message,
size_t expected_messages = 0,
const rclcpp::QoS & qos = rclcpp::QoS{rclcpp::KeepAll()})
{
auto node_name = std::string("publisher") + std::to_string(counter_++);
auto publisher_node = std::make_shared<rclcpp::Node>(
node_name,
rclcpp::NodeOptions().start_parameter_event_publisher(false).enable_rosout(false));
auto publisher = publisher_node->create_publisher<T>(topic_name, 10);
auto publisher = publisher_node->create_publisher<T>(topic_name, qos);

publisher_nodes_.push_back(publisher_node);
publishers_.push_back(
Expand Down
5 changes: 3 additions & 2 deletions rosbag2_transport/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -150,9 +150,10 @@ function(create_tests_for_rmw_implementation)

rosbag2_transport_add_gmock(test_record
test/rosbag2_transport/test_record.cpp
${SKIP_TEST}
AMENT_DEPS test_msgs rosbag2_test_common
piraka9011 marked this conversation as resolved.
Show resolved Hide resolved
INCLUDE_DIRS src
LINK_LIBS rosbag2_transport
AMENT_DEPS test_msgs rosbag2_test_common)
${SKIP_TEST})

rosbag2_transport_add_gmock(test_play
test/rosbag2_transport/test_play.cpp
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,11 @@

#include <chrono>
#include <string>
#include <unordered_map>
#include <vector>

#include "rclcpp/rclcpp.hpp"

namespace rosbag2_transport
{
struct RecordOptions
Expand All @@ -32,6 +35,7 @@ struct RecordOptions
std::string node_prefix = "";
std::string compression_mode = "";
std::string compression_format = "";
std::unordered_map<std::string, rclcpp::QoS> topic_qos_profile_overrides{};
bool include_hidden_topics = false;
};

Expand Down
23 changes: 15 additions & 8 deletions rosbag2_transport/src/rosbag2_transport/recorder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@

namespace
{
// TODO(emersonknapp) re-enable subscription_qos_for_topic once the cyclone situation is resolved
#ifdef ROSBAG2_ENABLE_ADAPTIVE_QOS_SUBSCRIPTION
emersonknapp marked this conversation as resolved.
Show resolved Hide resolved
bool all_qos_same(const std::vector<rclcpp::TopicEndpointInfo> & values)
{
auto adjacent_different_elements_it = std::adjacent_find(
Expand All @@ -58,6 +60,7 @@ bool all_qos_same(const std::vector<rclcpp::TopicEndpointInfo> & values)
// No adjacent elements were different, so all are the same.
return adjacent_different_elements_it == values.end();
}
#endif // ROSBAG2_ENABLE_ADAPTIVE_QOS_SUBSCRIPTION
} // unnamed namespace

namespace rosbag2_transport
Expand All @@ -67,6 +70,7 @@ Recorder::Recorder(std::shared_ptr<rosbag2_cpp::Writer> writer, std::shared_ptr<

void Recorder::record(const RecordOptions & record_options)
{
topic_qos_profile_overrides_ = record_options.topic_qos_profile_overrides;
if (record_options.rmw_serialization_format.empty()) {
throw std::runtime_error("No serialization format specified!");
}
Expand Down Expand Up @@ -162,12 +166,7 @@ void Recorder::subscribe_topic(const rosbag2_storage::TopicMetadata & topic)
// that callback called before we reached out the line: writer_->create_topic(topic)
writer_->create_topic(topic);

// TODO(emersonknapp) re-enable common_qos_or_fallback once the cyclone situation is resolved
#ifdef ROSBAG2_ENABLE_ADAPTIVE_QOS_SUBSCRIPTION
Rosbag2QoS subscription_qos{common_qos_or_fallback(topic.name)};
#else
Rosbag2QoS subscription_qos{};
#endif // ROSBAG2_ENABLE_ADAPTIVE_QOS_SUBSCRIPTION
Rosbag2QoS subscription_qos{subscription_qos_for_topic(topic.name)};
auto subscription = create_subscription(topic.name, topic.type, subscription_qos);
if (subscription) {
subscriptions_.insert({topic.name, subscription});
Expand Down Expand Up @@ -218,8 +217,15 @@ std::string Recorder::serialized_offered_qos_profiles_for_topic(const std::strin
return YAML::Dump(offered_qos_profiles);
}

rclcpp::QoS Recorder::common_qos_or_fallback(const std::string & topic_name)
rclcpp::QoS Recorder::subscription_qos_for_topic(const std::string & topic_name)
{
rclcpp::QoS subscription_qos{rclcpp::KeepAll()};
if (topic_qos_profile_overrides_.count(topic_name)) {
subscription_qos = topic_qos_profile_overrides_.at(topic_name);
ROSBAG2_TRANSPORT_LOG_INFO_STREAM("Overriding subscription profile for " << topic_name);
}
// TODO(emersonknapp) re-enable subscription_qos_for_topic once the cyclone situation is resolved
#ifdef ROSBAG2_ENABLE_ADAPTIVE_QOS_SUBSCRIPTION
auto endpoints = node_->get_publishers_info_by_topic(topic_name);
piraka9011 marked this conversation as resolved.
Show resolved Hide resolved
if (!endpoints.empty() && all_qos_same(endpoints)) {
return Rosbag2QoS(endpoints[0].qos_profile()).default_history();
Expand All @@ -229,7 +235,8 @@ rclcpp::QoS Recorder::common_qos_or_fallback(const std::string & topic_name)
"Cannot determine what QoS to request, falling back to default QoS profile."
);
topics_warned_about_incompatibility_.insert(topic_name);
return Rosbag2QoS{};
#endif // ROSBAG2_ENABLE_ADAPTIVE_QOS_SUBSCRIPTION
return subscription_qos;
}

void Recorder::warn_if_new_qos_for_subscribed_topic(const std::string & topic_name)
Expand Down
19 changes: 14 additions & 5 deletions rosbag2_transport/src/rosbag2_transport/recorder.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -85,11 +85,19 @@ class Recorder

void record_messages() const;

/** Find the QoS profile that should be used for subscribing.
* If all currently offered QoS Profiles for a topic are the same, return that profile.
* Otherwise, print a warning and return a fallback value.
*/
rclcpp::QoS common_qos_or_fallback(const std::string & topic_name);
/**
* Find the QoS profile that should be used for subscribing.
*
* Profiles are prioritized by:
* 1. The override specified in the record_options, if one exists for the topic.
* 2. The publisher's offered QoS profile if all current publishers are offering the exact same
* compatibility profile.
* 3. The default Rosbag2QoS profile, if the above conditions are not met.
*
* \param topic_name The full name of the topic, with namespace (ex. /arm/joint_status).
* \return The QoS profile to be used for subscribing.
*/
rclcpp::QoS subscription_qos_for_topic(const std::string & topic_name);

// Serialize all currently offered QoS profiles for a topic into a YAML list.
std::string serialized_offered_qos_profiles_for_topic(const std::string & topic_name);
Expand All @@ -101,6 +109,7 @@ class Recorder
std::unordered_map<std::string, std::shared_ptr<GenericSubscription>> subscriptions_;
std::unordered_set<std::string> topics_warned_about_incompatibility_;
std::string serialization_format_;
std::unordered_map<std::string, rclcpp::QoS> topic_qos_profile_overrides_;
};

} // namespace rosbag2_transport
Expand Down
42 changes: 39 additions & 3 deletions rosbag2_transport/test/rosbag2_transport/test_record.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,15 @@

#include <memory>
#include <string>
#include <unordered_map>
#include <vector>

#include "rclcpp/rclcpp.hpp"

#include "rosbag2_transport/qos.hpp"
#include "rosbag2_transport/rosbag2_transport.hpp"

#include "test_msgs/msg/arrays.hpp"
#include "test_msgs/msg/basic_types.hpp"
#include "test_msgs/message_fixtures.hpp"

#include "record_integration_fixture.hpp"
Expand Down Expand Up @@ -100,8 +101,7 @@ TEST_F(RecordIntegrationTestFixture, qos_is_stored_in_metadata)
));
}

TEST_F(RecordIntegrationTestFixture, records_sensor_data)
{
TEST_F(RecordIntegrationTestFixture, records_sensor_data) {
using clock = std::chrono::system_clock;
using namespace std::chrono_literals;

Expand Down Expand Up @@ -141,3 +141,39 @@ TEST_F(RecordIntegrationTestFixture, records_sensor_data)
EXPECT_FALSE(recorded_messages.empty());
}
#endif // ROSBAG2_ENABLE_ADAPTIVE_QOS_SUBSCRIPTION

TEST_F(RecordIntegrationTestFixture, topic_qos_overrides)
piraka9011 marked this conversation as resolved.
Show resolved Hide resolved
{
const auto num_msgs = 3;
auto strict_msg = std::make_shared<test_msgs::msg::Strings>();
strict_msg->string_value = "strict";
const auto strict_topic = "/strict_topic";

rosbag2_transport::RecordOptions record_options =
{false, false, {strict_topic}, "rmw_format", 100ms};
piraka9011 marked this conversation as resolved.
Show resolved Hide resolved
const auto profile_override = rclcpp::QoS{rclcpp::KeepAll()}
.best_effort().durability_volatile().avoid_ros_namespace_conventions(false);
std::unordered_map<std::string, rclcpp::QoS> topic_qos_profile_overrides = {
{strict_topic, profile_override}
};
record_options.topic_qos_profile_overrides = topic_qos_profile_overrides;

// Create two BEST_EFFORT publishers on the same topic with different Durability policies.
// If no override is specified, then the recorder cannot see any published messages.
auto profile1 = rosbag2_transport::Rosbag2QoS{}.best_effort().durability_volatile();
auto profile2 = rosbag2_transport::Rosbag2QoS{}.best_effort().transient_local();
pub_man_.add_publisher<test_msgs::msg::Strings>(
strict_topic, strict_msg, num_msgs, profile1);
pub_man_.add_publisher<test_msgs::msg::Strings>(
strict_topic, strict_msg, num_msgs, profile2);

start_recording(record_options);
run_publishers();
stop_recording();

auto & writer =
static_cast<MockSequentialWriter &>(writer_->get_implementation_handle());
auto recorded_messages = writer.get_messages();

ASSERT_GE(recorded_messages.size(), 0u);
}