Skip to content

Commit

Permalink
WIP: QoS Profile Overrides - Player
Browse files Browse the repository at this point in the history
Signed-off-by: Anas Abou Allaban <[email protected]>
  • Loading branch information
piraka9011 committed Apr 6, 2020
1 parent 17b3d8e commit a2888c3
Show file tree
Hide file tree
Showing 4 changed files with 69 additions and 1 deletion.
4 changes: 4 additions & 0 deletions rosbag2_transport/include/rosbag2_transport/play_options.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@

#include <cstddef>
#include <string>
#include <unordered_map>

#include "rclcpp/qos.hpp"

namespace rosbag2_transport
{
Expand All @@ -27,6 +30,7 @@ struct PlayOptions
size_t read_ahead_queue_size;
std::string node_prefix = "";
float rate = 1.0;
std::unordered_map<std::string, rclcpp::QoS> topic_qos_profile_overrides = {};
};

} // namespace rosbag2_transport
Expand Down
7 changes: 6 additions & 1 deletion rosbag2_transport/src/rosbag2_transport/player.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ bool Player::is_storage_completely_loaded() const

void Player::play(const PlayOptions & options)
{
topic_qos_profile_overrides_ = options.topic_qos_profile_overrides;
prepare_publishers();

storage_loading_future_ = std::async(
Expand Down Expand Up @@ -157,10 +158,14 @@ void Player::prepare_publishers()
{
auto topics = reader_->get_all_topics_and_types();
for (const auto & topic : topics) {
rclcpp::QoS topic_qos{10};
if (topic_qos_profile_overrides_.count(topic.name)) {
topic_qos = topic_qos_profile_overrides_.at(topic.name);
}
publishers_.insert(
std::make_pair(
topic.name, rosbag2_transport_->create_generic_publisher(
topic.name, topic.type, Rosbag2QoS{})));
topic.name, topic.type, topic_qos)));
}
}

Expand Down
3 changes: 3 additions & 0 deletions rosbag2_transport/src/rosbag2_transport/player.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@

#include "moodycamel/readerwriterqueue.h"

#include "rclcpp/qos.hpp"

#include "rosbag2_transport/play_options.hpp"

#include "replayable_message.hpp"
Expand Down Expand Up @@ -68,6 +70,7 @@ class Player
mutable std::future<void> storage_loading_future_;
std::shared_ptr<Rosbag2Node> rosbag2_transport_;
std::unordered_map<std::string, std::shared_ptr<GenericPublisher>> publishers_;
std::unordered_map<std::string, rclcpp::QoS> topic_qos_profile_overrides_;
};

} // namespace rosbag2_transport
Expand Down
56 changes: 56 additions & 0 deletions rosbag2_transport/test/rosbag2_transport/test_play.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include <memory>
#include <string>
#include <vector>
#include <unordered_map>
#include <utility>

#include "rclcpp/rclcpp.hpp"
Expand Down Expand Up @@ -119,3 +120,58 @@ TEST_F(RosBag2PlayTestFixture, recorded_messages_are_played_for_all_topics)
&test_msgs::msg::Arrays::float32_values,
ElementsAre(40.0f, 2.0f, 0.0f)))));
}

TEST_F(RosBag2PlayTestFixture, topic_qos_profiles_overriden)
{
const auto topic1 = "topic1";
const auto topic2 = "topic2";
const auto msg_type = "test_msgs/BasicTypes";
auto basic_msg = get_messages_basic_types()[0];
basic_msg->int32_value = 42;

auto topic_types = std::vector<rosbag2_storage::TopicMetadata>{
{topic1, msg_type, "", ""},
{topic2, msg_type, "", ""},
};

std::vector<std::shared_ptr<rosbag2_storage::SerializedBagMessage>> messages =
{serialize_test_message(topic1, 500, basic_msg),
serialize_test_message(topic1, 700, basic_msg),
serialize_test_message(topic1, 900, basic_msg),
serialize_test_message(topic2, 550, basic_msg),
serialize_test_message(topic2, 750, basic_msg),
serialize_test_message(topic2, 950, basic_msg)};

auto prepared_mock_reader = std::make_unique<MockSequentialReader>();
prepared_mock_reader->prepare(messages, topic_types);
reader_ = std::make_unique<rosbag2_cpp::Reader>(std::move(prepared_mock_reader));

// Due to a problem related to the subscriber, we play many (3) messages but make the subscriber
// node spin only until 2 have arrived. Hence the 2 as `launch_subscriber()` argument.
sub_->add_subscription<test_msgs::msg::BasicTypes>("/topic1", 2);
sub_->add_subscription<test_msgs::msg::BasicTypes>("/topic2", 2);

auto await_received_messages = sub_->spin_subscriptions();

const auto profile1 = rclcpp::QoS{0}.keep_all().durability_volatile();
const auto profile2 = rclcpp::QoS{0}.keep_last(10).reliable();
const auto topic_qos_profile_overrides = std::unordered_map<std::string, rclcpp::QoS>{
std::pair<std::string, rclcpp::QoS>{"/topic1", profile1},
std::pair<std::string, rclcpp::QoS>{"/topic2", profile2}
};
play_options_.topic_qos_profile_overrides = topic_qos_profile_overrides;

Rosbag2Transport rosbag2_transport(reader_, writer_, info_);
rosbag2_transport.play(storage_options_, play_options_);

await_received_messages.get();

auto received_messages_1 = sub_->get_received_messages<test_msgs::msg::BasicTypes>(
"/topic1");

auto received_messages_2 = sub_->get_received_messages<test_msgs::msg::BasicTypes>(
"/topic2");

EXPECT_GT(received_messages_1.size(), 0u);
EXPECT_GT(received_messages_2.size(), 0u);
}

0 comments on commit a2888c3

Please sign in to comment.