Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add QoS Profile override to CLI #347

Merged
merged 15 commits into from
Apr 7, 2020
25 changes: 25 additions & 0 deletions docs/qos_profile_overrides.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
# Overriding topic QoS profiles

## Recording

When starting a recording workflow, you can pass a yaml file that contains QoS profile settings for a specific topic.
The yaml file should have following structure:

```yaml
<topic1_name>:
<setting_1>: value
<time_setting>:
sec: 0
nsec: 0
...
<topic2_name>:
<setting_2>: value
...
```

See [`qos_profile.yaml`](../ros2bag/test/resources/qos_profile.yaml) for a complete example.

## Resources

* [About QoS settings](https://index.ros.org/doc/ros2/Concepts/About-Quality-of-Service-Settings/)
* [ROS2 QoS policies design doc](https://design.ros2.org/articles/qos.html)
2 changes: 2 additions & 0 deletions ros2bag/package.xml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
<test_depend>ament_flake8</test_depend>
<test_depend>ament_pep257</test_depend>
<test_depend>python3-pytest</test_depend>
<test_depend>launch_testing</test_depend>
<test_depend>launch_testing_ros</test_depend>
piraka9011 marked this conversation as resolved.
Show resolved Hide resolved

<export>
<build_type>ament_python</build_type>
Expand Down
58 changes: 54 additions & 4 deletions ros2bag/ros2bag/verb/record.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,15 @@

import datetime
import os
from typing import Dict
from typing import Optional

from ros2bag.verb import VerbExtension

from ros2cli.node import NODE_NAME_PREFIX
from rclpy.duration import Duration
from rclpy.qos import InvalidQoSProfileException
from rclpy.qos import QoSProfile
import yaml


class RecordVerb(VerbExtension):
Expand Down Expand Up @@ -71,7 +76,13 @@ def add_arguments(self, parser, cli_name): # noqa: D102
)
parser.add_argument(
'--include-hidden-topics', action='store_true',
help='record also hidden topics.')
help='record also hidden topics.'
)
parser.add_argument(
'--qos-profile-overrides', type=str, default='',
piraka9011 marked this conversation as resolved.
Show resolved Hide resolved
piraka9011 marked this conversation as resolved.
Show resolved Hide resolved
help='Path to a yaml file defining overrides of the QoS profile for specific topics.'
'See docs/qos_profiles_overrides.md for more info.'
piraka9011 marked this conversation as resolved.
Show resolved Hide resolved
)
self._subparser = parser

def create_bag_directory(self, uri):
Expand All @@ -80,6 +91,34 @@ def create_bag_directory(self, uri):
except OSError:
return "[ERROR] [ros2bag]: Could not create bag folder '{}'.".format(uri)

def _dict_to_duration(self, time_dict: Optional[Dict[str, int]]) -> Duration:
if time_dict:
return Duration(seconds=time_dict.get('sec', 0), nanoseconds=time_dict.get('nsec', 0))
piraka9011 marked this conversation as resolved.
Show resolved Hide resolved
else:
return Duration()

def validate_qos_profile_overrides(self, qos_profile_path: str) -> Dict[str, Dict]:
"""Validate the QoS profile yaml file path and its structure."""
if not os.path.isfile(qos_profile_path):
raise ValueError('[ERROR] [ros2bag]: {} does not exist.'.format(qos_profile_path))
piraka9011 marked this conversation as resolved.
Show resolved Hide resolved
with open(qos_profile_path, 'r') as file:
qos_profile_dict = yaml.safe_load(file)
topic_names = list(qos_profile_dict)
for name in topic_names:
piraka9011 marked this conversation as resolved.
Show resolved Hide resolved
try:
profile = qos_profile_dict.get(name)
piraka9011 marked this conversation as resolved.
Show resolved Hide resolved
# Convert dict to Duration. Required for construction
profile['deadline'] = self._dict_to_duration(profile.get('deadline'))
profile['lifespan'] = self._dict_to_duration(profile.get('lifespan'))
profile['liveliness_lease_duration'] = self._dict_to_duration(
profile.get('liveliness_lease_duration'))
QoSProfile(**qos_profile_dict.get(name))
piraka9011 marked this conversation as resolved.
Show resolved Hide resolved
except InvalidQoSProfileException as e:
raise ValueError(
piraka9011 marked this conversation as resolved.
Show resolved Hide resolved
'[ERROR] [ros2bag]: Invalid QoS profile configuration for topic {}.\n{}'
.format(name, str(e)))
return qos_profile_dict

def main(self, *, args): # noqa: D102
if args.all and args.topics:
return 'Invalid choice: Can not specify topics and -a at the same time.'
Expand All @@ -93,6 +132,15 @@ def main(self, *, args): # noqa: D102
return 'Invalid choice: Cannot specify compression format without a compression mode.'
args.compression_mode = args.compression_mode.upper()

qos_profile_overrides_path = args.qos_profile_overrides
qos_profile_overrides = {} # Specify a valid default
if qos_profile_overrides_path:
try:
qos_profile_overrides = self.validate_qos_profile_overrides(
qos_profile_overrides_path)
except ValueError as e:
return str(e)

self.create_bag_directory(uri)

if args.all:
Expand All @@ -115,7 +163,8 @@ def main(self, *, args): # noqa: D102
polling_interval=args.polling_interval,
max_bagfile_size=args.max_bag_size,
max_cache_size=args.max_cache_size,
include_hidden_topics=args.include_hidden_topics)
include_hidden_topics=args.include_hidden_topics,
qos_profile_overrides=qos_profile_overrides)
elif args.topics and len(args.topics) > 0:
# NOTE(hidmic): in merged install workspaces on Windows, Python entrypoint lookups
# combined with constrained environments (as imposed by colcon test)
Expand All @@ -136,7 +185,8 @@ def main(self, *, args): # noqa: D102
max_bagfile_size=args.max_bag_size,
max_cache_size=args.max_cache_size,
topics=args.topics,
include_hidden_topics=args.include_hidden_topics)
include_hidden_topics=args.include_hidden_topics,
qos_profile_overrides=qos_profile_overrides)
else:
self._subparser.print_help()

Expand Down
5 changes: 5 additions & 0 deletions ros2bag/test/resources/incomplete_qos_profile.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
/test_topic:
history: 3
piraka9011 marked this conversation as resolved.
Show resolved Hide resolved
depth: 0
reliability: 1
durability: 2
16 changes: 16 additions & 0 deletions ros2bag/test/resources/qos_profile.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
/test_topic:
history: 3
depth: 0
reliability: 1
durability: 2
deadline:
sec: 2147483647
nsec: 4294967295
lifespan:
sec: 2147483647
nsec: 4294967295
liveliness: 0
liveliness_lease_duration:
sec: 2147483647
nsec: 4294967295
avoid_ros_namespace_conventions: false
91 changes: 91 additions & 0 deletions ros2bag/test/test_qos_profiles.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
# Copyright 2020 Amazon.com, Inc. or its affiliates. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import contextlib
from pathlib import Path
import tempfile

import unittest

from launch import LaunchDescription
from launch.actions import ExecuteProcess

import launch_testing
import launch_testing.actions
import launch_testing.asserts
import launch_testing.markers
import launch_testing.tools

import pytest


PROFILE_PATH = Path(__file__).parent / 'resources'
TEST_NODE = 'cli_qos_profile_test_node'
TEST_NAMESPACE = 'cli_qos_profile'


@pytest.mark.rostest
@launch_testing.markers.keep_alive
def generate_test_description():
return LaunchDescription([launch_testing.actions.ReadyToTest()])


class TestROS2PkgCLI(unittest.TestCase):

@classmethod
def setUpClass(cls, launch_service, proc_info, proc_output):
@contextlib.contextmanager
def launch_bag_command(self, arguments, **kwargs):
pkg_command_action = ExecuteProcess(
cmd=['ros2', 'bag', *arguments],
additional_env={'PYTHONUNBUFFERED': '1'},
name='ros2bag-cli',
output='screen',
**kwargs
)
with launch_testing.tools.launch_process(
launch_service, pkg_command_action, proc_info, proc_output
) as pkg_command:
yield pkg_command
cls.launch_bag_command = launch_bag_command

def test_qos_simple(self):
profile_path = PROFILE_PATH / 'qos_profile.yaml'
with tempfile.TemporaryDirectory() as tmpdirname:
output_path = Path(tmpdirname) / 'ros2bag_test_basic'
arguments = ['record', '-a', '--qos-profile-overrides', profile_path.as_posix(),
'--output', output_path.as_posix()]
with self.launch_bag_command(arguments=arguments) as bag_command:
assert bag_command.wait_for_shutdown(timeout=5), \
print('ros2bag CLI failed on shutdown.')
output = bag_command.output.splitlines()
error_string = '[ERROR] [ros2bag]:'
for line in output:
assert error_string not in line, \
print('ros2bag CLI failed: {}'.format(line))

def test_incomplete_qos_profile(self):
profile_path = PROFILE_PATH / 'incomplete_qos_profile.yaml'
with tempfile.TemporaryDirectory() as tmpdirname:
output_path = Path(tmpdirname) / 'ros2bag_test_incomplete'
arguments = ['record', '-a', '--qos-profile-overrides', profile_path.as_posix(),
'--output', output_path.as_posix()]
with self.launch_bag_command(arguments=arguments) as bag_command:
assert bag_command.wait_for_shutdown(timeout=5), \
print('ros2bag CLI failed on shutdown.')
output = bag_command.output.splitlines()
error_string = '[ERROR] [ros2bag]:'
for line in output:
assert error_string not in line, \
print('ros2bag CLI failed: {}'.format(line))
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,10 @@
#include <chrono>
#include <memory>
#include <string>
#include <vector>
#include <unordered_map>
#include <utility>

#include "rclcpp/qos.hpp"

#include "rosbag2_compression/compression_options.hpp"
#include "rosbag2_compression/sequential_compression_reader.hpp"
Expand All @@ -32,6 +35,55 @@
#include "rosbag2_transport/storage_options.hpp"
#include "rmw/rmw.h"

namespace
{
/// Convert a Python3 unicode string to a native C++ std::string
std::string python_object_to_string(PyObject * object)
{
PyObject * python_string;
piraka9011 marked this conversation as resolved.
Show resolved Hide resolved
if (PyUnicode_Check(object)) {
python_string = PyUnicode_AsUTF8String(object);
} else {
throw std::runtime_error("Unable to decode Python string to std::string.");
}
return std::string(PyBytes_AsString(python_string));
}

/// Convert a Python dictionary to rmw_time_t
rmw_time_t python_dict_to_rmw_time(PyObject * object)
{
rmw_time_t new_time{};
new_time.sec = PyLong_AsSize_t(PyDict_GetItemString(object, "sec"));
new_time.nsec = PyLong_AsSize_t(PyDict_GetItemString(object, "nsec"));
return new_time;
}

/// Map a Python dictionary object to a QoS profile
rclcpp::QoS python_dict_to_qos(PyObject * object)
{
rclcpp::QoS profile{10};
piraka9011 marked this conversation as resolved.
Show resolved Hide resolved
profile.history(
static_cast<rmw_qos_history_policy_t>(
PyLong_AsSize_t(PyDict_GetItemString(object, "history"))));
profile.reliability(
static_cast<rmw_qos_reliability_policy_t>(
PyLong_AsSize_t(PyDict_GetItemString(object, "reliability"))));
profile.durability(
static_cast<rmw_qos_durability_policy_t>(
PyLong_AsSize_t(PyDict_GetItemString(object, "durability"))));
profile.deadline(python_dict_to_rmw_time(PyDict_GetItemString(object, "deadline")));
profile.lifespan(python_dict_to_rmw_time(PyDict_GetItemString(object, "lifespan")));
profile.liveliness(
static_cast<rmw_qos_liveliness_policy_t>(
PyLong_AsSize_t(PyDict_GetItemString(object, "liveliness"))));
profile.liveliness_lease_duration(
python_dict_to_rmw_time(PyDict_GetItemString(object, "liveliness_lease_duration")));
profile.avoid_ros_namespace_conventions(
PyObject_IsTrue(PyDict_GetItemString(object, "avoid_ros_namespace_conventions")));
return profile;
}
} // namespace

static PyObject *
rosbag2_transport_record(PyObject * Py_UNUSED(self), PyObject * args, PyObject * kwargs)
{
Expand All @@ -52,6 +104,7 @@ rosbag2_transport_record(PyObject * Py_UNUSED(self), PyObject * args, PyObject *
"max_cache_size",
"topics",
"include_hidden_topics",
"qos_profile_overrides",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

alphasort?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Alphasorting these argument will introduce irrelevant changes that I'd prefer not to have as a part of this PR.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, sgtm. Please link the PR back here once it's done.

nullptr};

char * uri = nullptr;
Expand All @@ -60,6 +113,7 @@ rosbag2_transport_record(PyObject * Py_UNUSED(self), PyObject * args, PyObject *
char * node_prefix = nullptr;
char * compression_mode = nullptr;
char * compression_format = nullptr;
PyObject * qos_profile_overrides = nullptr;
bool all = false;
bool no_discovery = false;
uint64_t polling_interval_ms = 100;
Expand All @@ -69,7 +123,7 @@ rosbag2_transport_record(PyObject * Py_UNUSED(self), PyObject * args, PyObject *
bool include_hidden_topics = false;
if (
!PyArg_ParseTupleAndKeywords(
args, kwargs, "ssssss|bbKKKOb", const_cast<char **>(kwlist),
args, kwargs, "ssssss|bbKKKObO", const_cast<char **>(kwlist),
&uri,
&storage_id,
&serilization_format,
Expand All @@ -82,7 +136,8 @@ rosbag2_transport_record(PyObject * Py_UNUSED(self), PyObject * args, PyObject *
&max_bagfile_size,
&max_cache_size,
&topics,
&include_hidden_topics
&include_hidden_topics,
&qos_profile_overrides
))
{
return nullptr;
Expand All @@ -105,6 +160,17 @@ rosbag2_transport_record(PyObject * Py_UNUSED(self), PyObject * args, PyObject *
rosbag2_compression::compression_mode_from_string(record_options.compression_mode)
};

if (qos_profile_overrides) {
PyObject * key, * value;
piraka9011 marked this conversation as resolved.
Show resolved Hide resolved
Py_ssize_t pos = 0;
std::unordered_map<std::string, rclcpp::QoS> topic_qos_overrides{};
while (PyDict_Next(qos_profile_overrides, &pos, &key, &value)) {
piraka9011 marked this conversation as resolved.
Show resolved Hide resolved
auto topic_name = python_object_to_string(key);
auto profile = python_dict_to_qos(value);
topic_qos_overrides.insert(std::pair<std::string, rclcpp::QoS>(topic_name, profile));
}
}

if (topics) {
PyObject * topic_iterator = PyObject_GetIter(topics);
if (topic_iterator != nullptr) {
Expand Down