Skip to content

Commit

Permalink
Support multiple QoS Profiles
Browse files Browse the repository at this point in the history
Signed-off-by: Anas Abou Allaban <[email protected]>
  • Loading branch information
piraka9011 committed Apr 1, 2020
1 parent 0950c53 commit 7f01769
Show file tree
Hide file tree
Showing 6 changed files with 93 additions and 57 deletions.
42 changes: 31 additions & 11 deletions ros2bag/ros2bag/verb/record.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@
import os

from ros2bag.verb import VerbExtension

from ros2cli.node import NODE_NAME_PREFIX
import yaml


class RecordVerb(VerbExtension):
Expand Down Expand Up @@ -68,7 +68,7 @@ def add_arguments(self, parser, cli_name): # noqa: D102
'--include-hidden-topics', action='store_true',
help='record also hidden topics.')
parser.add_argument(
'--qos-profile', type=str, default='',
'--qos-profiles', type=str, default='',
help='Path to a yaml file with a QoS policy to override the default profile.'
)
self._subparser = parser
Expand All @@ -79,6 +79,27 @@ def create_bag_directory(self, uri):
except OSError:
return "[ERROR] [ros2bag]: Could not create bag folder '{}'.".format(uri)

def validate_qos_profiles(self, qos_profile_path):
"""Validate the QoS profile yaml file path and its structure."""
if os.path.isfile(qos_profile_path):
with open(qos_profile_path, 'r') as file:
# Load as a dict first to verify contents
qos_profile = yaml.safe_load(file)
if not qos_profile.get('fallback'):
raise ValueError(
"[ERROR] [ros2bag]: QoS override must include a 'fallback' profile")
topic_names = list(qos_profile)
for name in topic_names:
if type(qos_profile.get(name)) != dict:
raise ValueError(
'[ERROR] [ros2bag]: QoS profile configuration for topic {} is invalid.'
.format(name))
# Read file as a string
file.seek(0)
return file.read()
else:
raise ValueError('[ERROR] [ros2bag]: {} does not exist.'.format(qos_profile_path))

def main(self, *, args): # noqa: D102
if args.all and args.topics:
return 'Invalid choice: Can not specify topics and -a at the same time.'
Expand All @@ -92,13 +113,12 @@ def main(self, *, args): # noqa: D102
return 'Invalid choice: Cannot specify compression format without a compression mode.'
args.compression_mode = args.compression_mode.upper()

qos_profile = args.qos_profile
if qos_profile:
if os.path.isfile(qos_profile):
with open(qos_profile, 'r') as file:
qos_profile = file.read()
else:
return "[ERROR] [ros2bag]: {} does not exist.".format(args.qos_profile)
qos_profiles = args.qos_profiles
if qos_profiles:
try:
qos_profiles = self.validate_qos_profiles(qos_profiles)
except ValueError as e:
return str(e)

self.create_bag_directory(uri)

Expand All @@ -122,7 +142,7 @@ def main(self, *, args): # noqa: D102
polling_interval=args.polling_interval,
max_bagfile_size=args.max_bag_size,
include_hidden_topics=args.include_hidden_topics,
qos_profile=qos_profile)
qos_profiles=qos_profiles)
elif args.topics and len(args.topics) > 0:
# NOTE(hidmic): in merged install workspaces on Windows, Python entrypoint lookups
# combined with constrained environments (as imposed by colcon test)
Expand All @@ -143,7 +163,7 @@ def main(self, *, args): # noqa: D102
max_bagfile_size=args.max_bag_size,
topics=args.topics,
include_hidden_topics=args.include_hidden_topics,
qos_profile=qos_profile)
qos_profiles=qos_profiles)
else:
self._subparser.print_help()

Expand Down
33 changes: 18 additions & 15 deletions rosbag2_tests/resources/qos_test/qos_profile.yaml
Original file line number Diff line number Diff line change
@@ -1,15 +1,18 @@
history: 3
depth: 0
reliability: 1
durability: 2
deadline:
sec: 2147483647
nsec: 4294967295
lifespan:
sec: 2147483647
nsec: 4294967295
liveliness: 2147483647
liveliness_lease_duration:
sec: 2147483647
nsec: 4294967295
avoid_ros_namespace_conventions: false
fallback:
history: 0
test_topic:
history: 3
depth: 0
reliability: 1
durability: 2
deadline:
sec: 2147483647
nsec: 4294967295
lifespan:
sec: 2147483647
nsec: 4294967295
liveliness: 2147483647
liveliness_lease_duration:
sec: 2147483647
nsec: 4294967295
avoid_ros_namespace_conventions: false
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ struct RecordOptions
std::string node_prefix = "";
std::string compression_mode = "";
std::string compression_format = "";
std::string qos_profile = "";
std::string qos_profiles = "";
bool include_hidden_topics = false;
};

Expand Down
48 changes: 24 additions & 24 deletions rosbag2_transport/src/rosbag2_transport/recorder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,14 +51,14 @@ Recorder::Recorder(std::shared_ptr<rosbag2_cpp::Writer> writer, std::shared_ptr<

void Recorder::record(const RecordOptions & record_options)
{
qos_profile_overrides_ = YAML::Load(record_options.qos_profiles);
if (record_options.rmw_serialization_format.empty()) {
throw std::runtime_error("No serialization format specified!");
}
serialization_format_ = record_options.rmw_serialization_format;
ROSBAG2_TRANSPORT_LOG_INFO("Listening for topics...");
subscribe_topics(
get_requested_or_available_topics(record_options.topics, record_options.include_hidden_topics),
record_options.qos_profile);
get_requested_or_available_topics(record_options.topics, record_options.include_hidden_topics));

std::future<void> discovery_future;
if (!record_options.is_discovery_disabled) {
Expand Down Expand Up @@ -122,6 +122,9 @@ Recorder::get_missing_topics(const std::unordered_map<std::string, std::string>

namespace
{
// YAML key for global QoS profile fallback
constexpr const char kFallbackKey[] = "fallback";

std::string serialized_offered_qos_profiles_for_topic(
std::shared_ptr<rosbag2_transport::Rosbag2Node> node,
const std::string & topic_name)
Expand All @@ -136,29 +139,16 @@ std::string serialized_offered_qos_profiles_for_topic(
} // unnamed namespace

void Recorder::subscribe_topics(
const std::unordered_map<std::string, std::string> & topics_and_types,
std::string offered_qos_profile)
const std::unordered_map<std::string, std::string> & topics_and_types)
{
if (offered_qos_profile.empty()) {
for (const auto & topic_with_type : topics_and_types) {
subscribe_topic(
{
topic_with_type.first,
topic_with_type.second,
serialization_format_,
serialized_offered_qos_profiles_for_topic(node_, topic_with_type.first)
});
}
} else {
for (const auto & topic_with_type : topics_and_types) {
subscribe_topic(
{
topic_with_type.first,
topic_with_type.second,
serialization_format_,
offered_qos_profile
});
}
for (const auto & topic_with_type : topics_and_types) {
subscribe_topic(
{
topic_with_type.first,
topic_with_type.second,
serialization_format_,
serialized_offered_qos_profiles_for_topic(node_, topic_with_type.first)
});
}
}

Expand All @@ -169,6 +159,16 @@ void Recorder::subscribe_topic(const rosbag2_storage::TopicMetadata & topic)
// that callback called before we reached out the line: writer_->create_topic(topic)
writer_->create_topic(topic);
auto subscription = create_subscription(topic.name, topic.type);
// Get QoS profile based on the following order:
// Topic Override -> Fallback -> Topic Default
Rosbag2QoS subscription_qos;
if (qos_profile_overrides_[topic.name]) {
subscription_qos = qos_profile_overrides_[topic.name].as<rosbag2_transport::Rosbag2QoS>();
ROSBAG2_TRANSPORT_LOG_INFO_STREAM("override topic " << topic.name << " qos " << subscription_qos)
}
if (qos_profile_overrides_[kFallbackKey]) {
subscription_qos = qos_profile_overrides_[std::string(kFallbackKey)].as<rosbag2_transport::Rosbag2QoS>();
}

if (subscription) {
subscribed_topics_.insert(topic.name);
Expand Down
17 changes: 15 additions & 2 deletions rosbag2_transport/src/rosbag2_transport/recorder.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,19 @@
#include <utility>
#include <vector>

#ifdef _WIN32
// This is necessary because of a bug in yaml-cpp's cmake
#define YAML_CPP_DLL
// This is necessary because yaml-cpp does not always use dllimport/dllexport consistently
# pragma warning(push)
# pragma warning(disable:4251)
# pragma warning(disable:4275)
#endif
#include "yaml-cpp/yaml.h"
#ifdef _WIN32
# pragma warning(pop)
#endif

#include "rosbag2_cpp/writer.hpp"

#include "rosbag2_storage/topic_metadata.hpp"
Expand Down Expand Up @@ -62,8 +75,7 @@ class Recorder
get_missing_topics(const std::unordered_map<std::string, std::string> & all_topics);

void subscribe_topics(
const std::unordered_map<std::string, std::string> & topics_and_types,
std::string offered_qos_profile = "");
const std::unordered_map<std::string, std::string> & topics_and_types);

void subscribe_topic(const rosbag2_storage::TopicMetadata & topic);

Expand All @@ -77,6 +89,7 @@ class Recorder
std::vector<std::shared_ptr<GenericSubscription>> subscriptions_;
std::unordered_set<std::string> subscribed_topics_;
std::string serialization_format_;
YAML::Node qos_profile_overrides_;
};

} // namespace rosbag2_transport
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ rosbag2_transport_record(PyObject * Py_UNUSED(self), PyObject * args, PyObject *
"max_bagfile_size",
"topics",
"include_hidden_topics",
"qos_profile",
"qos_profiles",
nullptr};

char * uri = nullptr;
Expand All @@ -60,7 +60,7 @@ rosbag2_transport_record(PyObject * Py_UNUSED(self), PyObject * args, PyObject *
char * node_prefix = nullptr;
char * compression_mode = nullptr;
char * compression_format = nullptr;
char * qos_profile = nullptr;
char * qos_profiles = nullptr;
bool all = false;
bool no_discovery = false;
uint64_t polling_interval_ms = 100;
Expand All @@ -82,7 +82,7 @@ rosbag2_transport_record(PyObject * Py_UNUSED(self), PyObject * args, PyObject *
&max_bagfile_size,
&topics,
&include_hidden_topics,
&qos_profile))
&qos_profiles))
{
return nullptr;
}
Expand All @@ -97,7 +97,7 @@ rosbag2_transport_record(PyObject * Py_UNUSED(self), PyObject * args, PyObject *
record_options.compression_mode = std::string(compression_mode);
record_options.compression_format = compression_format;
record_options.include_hidden_topics = include_hidden_topics;
record_options.qos_profile = qos_profile;
record_options.qos_profiles = qos_profiles;

rosbag2_compression::CompressionOptions compression_options{
record_options.compression_format,
Expand Down

0 comments on commit 7f01769

Please sign in to comment.