Skip to content

Commit

Permalink
Demo to show the working of the incompatible_qos event callbacks. (#416)
Browse files Browse the repository at this point in the history
Signed-off-by: Jaison Titus <[email protected]>
Signed-off-by: Miaofei <[email protected]>
Co-authored-by: Miaofei <[email protected]>
  • Loading branch information
jaisontj and mm318 authored Apr 1, 2020
1 parent 7b736b0 commit df9a85e
Show file tree
Hide file tree
Showing 7 changed files with 343 additions and 16 deletions.
46 changes: 30 additions & 16 deletions quality_of_service_demo/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ No Durability QoS demo app exists yet in this package.
The application requires an argument a `deadline_duration` - this is the maximum acceptable time (in positive integer milliseconds) between messages published on the topic.
A `deadline_duration` of `0` means that there is no deadline, so the application will act as a standard Talker/Listener pair.

The Publisher in this demo will pause publishing periodically, which will print deadline expirations from the Subscriber.
The publisher in this demo will pause publishing periodically, which will print deadline expirations from the subscriber.

Run `quality_of_service_demo/deadline -h` for more usage information.

Expand All @@ -36,23 +36,23 @@ Examples:

## Lifespan

`quality_of_service_demo/lifespan` demonstrates messages being deleted from a Publisher's outgoing queue once their configured lifespan expires.
`quality_of_service_demo/lifespan` demonstrates messages being deleted from a publisher's outgoing queue once their configured lifespan expires.

The application requires an argument `lifespan_duration` - this is the maximum time (in positive integer milliseconds) that a message will sit in the Publisher's outgoing queue.
The application requires an argument `lifespan_duration` - this is the maximum time (in positive integer milliseconds) that a message will sit in the publisher's outgoing queue.

The Publisher in this demo will publish a preset number of messages, then stop publishing.
A Subscriber will start subscribing after a preset amount of time and may receive _fewer_ backfilled messages than were originally published, because some message can outlast their lifespan and be removed.
The publisher in this demo will publish a preset number of messages, then stop publishing.
A subscriber will start subscribing after a preset amount of time and may receive _fewer_ backfilled messages than were originally published, because some message can outlast their lifespan and be removed.

Run `lifespan -h` for more usage information.

Examples:
* `ros2 run quality_of_service_demo_cpp lifespan 1000 --publish-count 10 --subscribe-after 3000` _or_
* `ros2 run quality_of_service_demo_py lifespan 1000 --publish-count 10 --subscribe-after 3000`
* After a few seconds, you should see (approximately) messages 4-9 printed from the Subscriber.
* The first few messages, with 1 second lifespan, were gone by the time the Subscriber joined after 3 seconds.
* After a few seconds, you should see (approximately) messages 4-9 printed from the subscriber.
* The first few messages, with 1 second lifespan, were gone by the time the subscriber joined after 3 seconds.
* `ros2 run quality_of_service_demo_cpp lifespan 4000 --publish-count 10 --subscribe-after 3000` _or_
* `ros2 run quality_of_service_demo_py lifespan 4000 --publish-count 10 --subscribe-after 3000`
* After a few seconds, you should see all of the messages (0-9) printed from the Subscriber.
* After a few seconds, you should see all of the messages (0-9) printed from the subscriber.
* All messages, with their 4 second lifespan, survived until the subscriber joined.

## Liveliness
Expand All @@ -61,11 +61,11 @@ Examples:

The application requires an argument `lease_duration` that specifies how often (in milliseconds) an entity must "check in" to let the system know it's alive.

The Publisher in this demo will assert its liveliness based on passed in options, and be stopped after some amount of time.
When using `AUTOMATIC` liveliness policy, the Publisher is deleted at this time, in order to stop all automatic liveliness assertions in the rmw implementation - therefore only the Subscription will receive a Liveliness event for `AUTOMATIC`.
For `MANUAL` liveliness policies, the Publisher's assertions are stopped, as well as its message publishing.
The publisher in this demo will assert its liveliness based on passed in options, and be stopped after some amount of time.
When using `AUTOMATIC` liveliness policy, the publisher is deleted at this time, in order to stop all automatic liveliness assertions in the rmw implementation - therefore only the Subscription will receive a Liveliness event for `AUTOMATIC`.
For `MANUAL` liveliness policies, the publisher's assertions are stopped, as well as its message publishing.
Publishing a message implicitly counts as asserting liveliness, so publishing is stopped in order to allow the Liveliness lease to lapse.
Therefore in the `MANUAL` policy types, you will see Liveliness events from both Publisher and Subscription (in rmw implementations that implement this feature).
Therefore in the `MANUAL` policy types, you will see Liveliness events from both publisher and Subscription (in rmw implementations that implement this feature).

Run `quality_of_service_demo/liveliness -h` for more usage information.

Expand All @@ -77,6 +77,21 @@ Examples:
* `ros2 run quality_of_service_demo_py liveliness 250 --node-assert-period 0 --policy MANUAL_BY_NODE`
* With this configuration, the node never explicitly asserts its liveliness, but it publishes messages (implicitly asserting liveliness) less often (500ms) than the liveliness lease, so you will see alternating alive/not-alive messages as the publisher misses its lease and "becomes alive again" when it publishes, until the talker is stopped.

## Incompatible QoS Offered/Requested

`quality_of_service_demo/incompatible_qos` demonstrates notification from a publisher and a subscriber upon discovering each other that they are not compatible.

The application requires an argument `incompatible_qos_policy_name` that specifies which QoS policy should be made incompatible between the publisher and subscriber as an example.

The publisher in this demo will publish 5 messages, but also output a notification stating that its offered QoS is incompatible with the subscriber that is also created by this demo.
The subscriber in this demo will similarly output a notification stating that its requested QoS is incompatible with the QoS offered by the publisher.

Run `quality_of_service_demo/incompatible_qos -h` for more usage information.

Examples:
* `ros2 run quality_of_service_demo_cpp incompatible_qos durability` _or_
* `ros2 run quality_of_service_demo_py incompatible_qos durability`

## Interactive Quality of Service Demos

These demos allow the user to interactively generate events and see the publisher's and subscriber's responses to these user events.
Expand Down Expand Up @@ -112,11 +127,10 @@ While the demo is running, the following commands may be issued in the publisher

### Example Demo Run

Let's start the publisher sending a message every half second, and the publisher and subscriber with both offered and requested Deadline periods and Liveliness lease durations at 1 second
(it is recommended to run these demos using the RTI Connext RMW implementation):
Let's start the publisher sending a message every half second, and the publisher and subscriber with both offered and requested Deadline periods and Liveliness lease durations at 1 second:
```
RMW_IMPLEMENTATION=rmw_connext_cpp ros2 run quality_of_service_demo_cpp interactive_publisher --delay 0.5 --deadline 1 --liveliness MANUAL_BY_TOPIC --lease 1
RMW_IMPLEMENTATION=rmw_connext_cpp ros2 run quality_of_service_demo_cpp interactive_subscriber --deadline 1 --liveliness MANUAL_BY_TOPIC --lease 1
ros2 run quality_of_service_demo_cpp interactive_publisher --delay 0.5 --deadline 1 --liveliness MANUAL_BY_TOPIC --lease 1
ros2 run quality_of_service_demo_cpp interactive_subscriber --deadline 1 --liveliness MANUAL_BY_TOPIC --lease 1
```

You will see the publisher publishing messages and the subscriber receiving them.
Expand Down
1 change: 1 addition & 0 deletions quality_of_service_demo/rclcpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ qos_demo_executable(lifespan)
qos_demo_executable(deadline)
qos_demo_executable(interactive_publisher)
qos_demo_executable(interactive_subscriber)
qos_demo_executable(incompatible_qos)

if(BUILD_TESTING)
find_package(ament_lint_auto REQUIRED)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,9 @@ class Talker : public rclcpp::Node
*/
void publish();

/// Get the number of messages that have been published by this publisher.
size_t get_published_count() const;

/// Assert the liveliness of the node.
bool assert_node_liveliness() const;

Expand Down
6 changes: 6 additions & 0 deletions quality_of_service_demo/rclcpp/src/common_nodes.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,12 @@ Talker::publish()
}
}

size_t
Talker::get_published_count() const
{
return publish_count_;
}

bool
Talker::assert_node_liveliness() const
{
Expand Down
150 changes: 150 additions & 0 deletions quality_of_service_demo/rclcpp/src/incompatible_qos.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
// Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include <chrono>
#include <memory>
#include <string>

#include "rcutils/cmdline_parser.h"

#include "rclcpp/rclcpp.hpp"
#include "rclcpp/executors/single_threaded_executor.hpp"

#include "std_msgs/msg/string.hpp"
#include "std_msgs/msg/bool.hpp"

#include "quality_of_service_demo/common_nodes.hpp"

void print_usage()
{
std::cout << "Usage:\n";
std::cout << "incompatible_qos [-h] <incompatible_qos_policy_name>\n\n";
std::cout << "required arguments:\n";
std::cout << "incompatible_qos_policy_name: The QoS Policy that should be incompatible between\n"
" the publisher and subscription (durability, deadline,\n"
" liveliness_policy, liveliness_lease_duration,\n"
" or reliability).\n\n";
std::cout << "optional arguments:\n";
std::cout << "-h: Print this help message.\n";
}

int main(int argc, char * argv[])
{
// Force flush of the stdout buffer.
setvbuf(stdout, NULL, _IONBF, BUFSIZ);

// Argument count and usage
if (argc < 2 || rcutils_cli_option_exist(argv, argv + argc, "-h")) {
print_usage();
return 0;
}

// Configuration variables
std::string qos_policy_name(argv[1]);
rclcpp::QoS qos_profile_publisher(10);
rclcpp::QoS qos_profile_subscription(10);

if (qos_policy_name == "durability") {
std::cout << "Durability incompatibility selected.\n"
"Incompatibility condition: publisher durability kind < "
"subscripition durability kind.\n"
"Setting publisher durability to: VOLATILE\n"
"Setting subscription durability to: TRANSIENT_LOCAL\n";
qos_profile_publisher.durability(RMW_QOS_POLICY_DURABILITY_VOLATILE);
qos_profile_subscription.durability(RMW_QOS_POLICY_DURABILITY_TRANSIENT_LOCAL);
} else if (qos_policy_name == "deadline") {
std::cout << "Deadline incompatibility selected.\n"
"Incompatibility condition: publisher deadline > subscription deadline\n"
"Setting publisher deadline to: 2 seconds\n"
"Setting subscription deadline to: 1 second\n";
qos_profile_publisher.deadline(std::chrono::milliseconds(2000));
qos_profile_subscription.deadline(std::chrono::milliseconds(1000));
} else if (qos_policy_name == "liveliness_policy") {
std::cout << "Liveliness Policy incompatibility selected.\n"
"Incompatibility condition: publisher liveliness policy < "
"subscripition liveliness policy.\n"
"Setting publisher liveliness policy to: MANUAL_BY_NODE\n"
"Setting subscription liveliness policy to: MANUAL_BY_TOPIC\n";
qos_profile_publisher.liveliness(RMW_QOS_POLICY_LIVELINESS_MANUAL_BY_NODE);
qos_profile_subscription.liveliness(RMW_QOS_POLICY_LIVELINESS_MANUAL_BY_TOPIC);
} else if (qos_policy_name == "liveliness_lease_duration") {
std::cout << "Liveliness lease duration incompatibility selected.\n"
"Incompatibility condition: publisher liveliness lease duration > "
"subscription liveliness lease duration\n"
"Setting publisher liveliness lease duration to: 2 seconds\n"
"Setting subscription liveliness lease duration to: 1 seconds\n";
qos_profile_publisher.liveliness_lease_duration(std::chrono::milliseconds(2000));
qos_profile_subscription.liveliness_lease_duration(std::chrono::milliseconds(1000));
} else if (qos_policy_name == "reliability") {
std::cout << "Reliability incompatibility selected.\n"
"Incompatibility condition: publisher reliability < subscripition reliability.\n"
"Setting publisher reliability to: BEST_EFFORT\n"
"Setting subscription reliability to: RELIABLE\n";
qos_profile_publisher.reliability(RMW_QOS_POLICY_RELIABILITY_BEST_EFFORT);
qos_profile_subscription.reliability(RMW_QOS_POLICY_RELIABILITY_RELIABLE);
} else {
std::cout << qos_policy_name << " is not a valid qos policy name\n";
print_usage();
return 0;
}
std::cout << "\n";

// Initialization and configuration
rclcpp::init(argc, argv);
const std::string topic("qos_incompatible_qos_chatter");
constexpr size_t num_msgs = 5;

auto talker = std::make_shared<Talker>(qos_profile_publisher, topic, num_msgs);
talker->get_options().event_callbacks.incompatible_qos_callback =
[node = talker.get()](rclcpp::QOSOfferedIncompatibleQoSInfo & event) -> void
{
RCLCPP_INFO(
node->get_logger(),
"Offered incompatible qos - total %d delta %d last_policy_kind: %d",
event.total_count, event.total_count_change, event.last_policy_kind);
};

auto listener = std::make_shared<Listener>(qos_profile_subscription, topic);
listener->get_options().event_callbacks.incompatible_qos_callback =
[node = listener.get()](rclcpp::QOSRequestedIncompatibleQoSInfo & event) -> void
{
RCLCPP_INFO(
node->get_logger(),
"Requested incompatible qos - total %d delta %d last_policy_kind: %d",
event.total_count, event.total_count_change, event.last_policy_kind);
};

try {
talker->initialize();
listener->initialize();
} catch (const rclcpp::UnsupportedEventTypeException & exc) {
std::cout << '\n' << exc.what() << "\n\n"
"Please try this demo using a different RMW implementation\n";
return -1;
}

// Execution
rclcpp::executors::SingleThreadedExecutor executor;
executor.add_node(talker);
executor.add_node(listener);

while (talker->get_published_count() < num_msgs) {
executor.spin_once();
}

// Cleanup
rclcpp::shutdown();

return 0;
}
Loading

0 comments on commit df9a85e

Please sign in to comment.