Skip to content

Commit

Permalink
Add test package for single and multi thread
Browse files Browse the repository at this point in the history
Signed-off-by: ical-Jeon <[email protected]>
  • Loading branch information
icsl-Jeon committed May 20, 2023
1 parent 355adf0 commit 961bd37
Show file tree
Hide file tree
Showing 7 changed files with 415 additions and 0 deletions.
34 changes: 34 additions & 0 deletions rclcpp/executors/executor_comparison/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
cmake_minimum_required(VERSION 3.5)
project(examples_rclcpp_executor_comparison)

find_package(ament_cmake REQUIRED)
find_package(rclcpp REQUIRED)
find_package(std_msgs REQUIRED)

set(CMAKE_CXX_STANDARD 17)
set(CMAKE_BUILD_TYPE Debug)

add_executable(publisher_node publisher_node.cc)
ament_target_dependencies(publisher_node rclcpp std_msgs)
install(TARGETS publisher_node DESTINATION lib/simple_thread_tester)

add_executable(single_thread_subscriber_node single_thread_subscriber_node.cc)
ament_target_dependencies(single_thread_subscriber_node rclcpp std_msgs)
install(TARGETS single_thread_subscriber_node
DESTINATION lib/simple_thread_tester)

add_executable(multi_thread_reentrant_subscriber_node
multi_thread_reentrant_subscriber_node.cc)
ament_target_dependencies(multi_thread_reentrant_subscriber_node rclcpp
std_msgs)
install(TARGETS multi_thread_reentrant_subscriber_node
DESTINATION lib/simple_thread_tester)

add_executable(multi_thread_mutually_exclusive_subscriber_node
multi_thread_mutually_exclusive_subscriber_node.cc)
ament_target_dependencies(multi_thread_mutually_exclusive_subscriber_node
rclcpp std_msgs)
install(TARGETS multi_thread_mutually_exclusive_subscriber_node
DESTINATION lib/simple_thread_tester)

ament_package()
31 changes: 31 additions & 0 deletions rclcpp/executors/executor_comparison/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
# Simple ros2 package to understand threads

Visit [here](https://motion-boseong.vercel.app/threading-ros2) for explanations.

## Running node

### 1. Single-thread test
In terminal A (also for the two belows)

```
ros2 run simple_thread_tester publisher_node
```

In terminal B

```
ros2 run simple_thread_tester single_thread_subscriber_node
```
### 2. Multi-thread reentrant callback group test

```
ros2 run simple_thread_tester multi_thread_reentrant_subscriber_node
```

### 3. Multi-thread mutually exclusive callback group test

```
ros2 run simple_thread_tester multi_thread_reentrant_subscriber_node
# If simulate mutex
ros2 param set /subscriber_node use_mutex true
```
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
#include <chrono>
#include <random>
#include <thread>

#include <rclcpp/rclcpp.hpp>
#include <std_msgs/msg/string.hpp>

using namespace std::chrono_literals;

class MultiThreadMutuallyExclusiveSubscriber : public rclcpp::Node {

public:
MultiThreadMutuallyExclusiveSubscriber(rclcpp::NodeOptions node_options)
: Node("subscriber_node",
node_options.allow_undeclared_parameters(true)) {
rclcpp::SubscriptionOptions options;
options.callback_group =
create_callback_group(rclcpp::CallbackGroupType::MutuallyExclusive);

short_subscriber_ = create_subscription<std_msgs::msg::String>(
"/short_topic", rclcpp::QoS(10),
std::bind(&MultiThreadMutuallyExclusiveSubscriber::ShortTopicCallback,
this, std::placeholders::_1),
options);

options.callback_group =
create_callback_group(rclcpp::CallbackGroupType::MutuallyExclusive);
long_subscriber_ = create_subscription<std_msgs::msg::String>(
"/long_topic", rclcpp::QoS(10),
std::bind(&MultiThreadMutuallyExclusiveSubscriber::LongTopicCallback,
this, std::placeholders::_1),
options);

timer_ = create_wall_timer(
1s, std::bind(&MultiThreadMutuallyExclusiveSubscriber::TimerCallback,
this));

auto param_change_callback =
[this](std::vector<rclcpp::Parameter> parameters) {
for (auto parameter : parameters) {
if (parameter.get_name() == "use_mutex") {
use_mutex = parameter.as_bool();
if (use_mutex)
RCLCPP_INFO(get_logger(), "will use mutex");
else
RCLCPP_INFO(get_logger(), "will not mutex");
}
}

auto result = rcl_interfaces::msg::SetParametersResult();
result.successful = true;
return result;
};

callback_handler =
this->add_on_set_parameters_callback(param_change_callback);
}

private:
bool use_mutex{false};
struct {
std::mutex short_topic_mutex;
std::mutex long_topic_mutex;
} mutex_list;
std::string processed_short_string_;
std::string processed_long_string_;
void ProcessString(const std::string &raw_string, std::string &output);

rclcpp::Subscription<std_msgs::msg::String>::SharedPtr short_subscriber_;
rclcpp::Subscription<std_msgs::msg::String>::SharedPtr long_subscriber_;
rclcpp::TimerBase::SharedPtr timer_;
OnSetParametersCallbackHandle::SharedPtr callback_handler;

void ShortTopicCallback(const std_msgs::msg::String::SharedPtr msg);
void LongTopicCallback(const std_msgs::msg::String::SharedPtr msg);
void TimerCallback();
};

void MultiThreadMutuallyExclusiveSubscriber::ProcessString(
const std::string &raw_string, std::string &output) {
output = "** ";

for (char c : raw_string) {
output.push_back(c);
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
output += " **";
};

void MultiThreadMutuallyExclusiveSubscriber::ShortTopicCallback(
const std_msgs::msg::String::SharedPtr msg) {
std::shared_ptr<std::unique_lock<std::mutex>> lock;
if (use_mutex)
mutex_list.short_topic_mutex.lock();
ProcessString(msg->data, processed_short_string_);
RCLCPP_INFO(get_logger(), "Setting processed: %s",
processed_short_string_.c_str());
if (use_mutex)
mutex_list.short_topic_mutex.unlock();
}

void MultiThreadMutuallyExclusiveSubscriber::LongTopicCallback(
const std_msgs::msg::String::SharedPtr msg) {

if (use_mutex)
mutex_list.long_topic_mutex.lock();
ProcessString(msg->data, processed_long_string_);
RCLCPP_INFO(get_logger(), "Setting processed: %s",
processed_long_string_.c_str());
if (use_mutex)
mutex_list.long_topic_mutex.unlock();
}

void MultiThreadMutuallyExclusiveSubscriber::TimerCallback() {
if (use_mutex) {
mutex_list.short_topic_mutex.lock();
mutex_list.long_topic_mutex.lock();
}
RCLCPP_INFO(get_logger(),
"Getting processed strings: \n [long] %s \n [short] %s",
processed_long_string_.c_str(), processed_short_string_.c_str());

if (use_mutex) {
mutex_list.short_topic_mutex.unlock();
mutex_list.long_topic_mutex.unlock();
}
}

int main(int argc, char **argv) {
rclcpp::init(argc, argv);
rclcpp::NodeOptions option;
auto node = std::make_shared<MultiThreadMutuallyExclusiveSubscriber>(option);
rclcpp::executors::MultiThreadedExecutor executor;
executor.add_node(node);
executor.spin();
rclcpp::shutdown();
return 0;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
#include <chrono>
#include <random>
#include <thread>

#include <rclcpp/rclcpp.hpp>
#include <std_msgs/msg/string.hpp>

using namespace std::chrono_literals;

class MultiThreadReentrantSubscriber : public rclcpp::Node {

public:
MultiThreadReentrantSubscriber() : Node("subscriber_node") {
rclcpp::SubscriptionOptions options;
options.callback_group =
create_callback_group(rclcpp::CallbackGroupType::Reentrant);

short_subscriber_ = create_subscription<std_msgs::msg::String>(
"/short_topic", rclcpp::QoS(10),
std::bind(&MultiThreadReentrantSubscriber::ShortTopicCallback, this,
std::placeholders::_1),
options);

long_subscriber_ = create_subscription<std_msgs::msg::String>(
"/long_topic", rclcpp::QoS(10),
std::bind(&MultiThreadReentrantSubscriber::LongTopicCallback, this,
std::placeholders::_1),
options);
}

private:
std::string processed_short_string_;
std::string processed_long_string_;
std::string ProcessString(const std::string &raw_string);

rclcpp::Subscription<std_msgs::msg::String>::SharedPtr short_subscriber_;
rclcpp::Subscription<std_msgs::msg::String>::SharedPtr long_subscriber_;
void ShortTopicCallback(const std_msgs::msg::String::SharedPtr msg);
void LongTopicCallback(const std_msgs::msg::String::SharedPtr msg);
};

std::string
MultiThreadReentrantSubscriber::ProcessString(const std::string &raw_string) {
std::thread::id this_id = std::this_thread::get_id();
std::ostringstream oss;
oss << std::this_thread::get_id();

std::this_thread::sleep_for(std::chrono::seconds(2));
return "** " + raw_string + " **" + " ( by " + oss.str() + ") ";
};

void MultiThreadReentrantSubscriber::ShortTopicCallback(
const std_msgs::msg::String::SharedPtr msg) {
auto processed_string = ProcessString(msg->data);

RCLCPP_INFO(get_logger(), "Setting processed: %s", processed_string.c_str());
processed_short_string_ = processed_string;
}

void MultiThreadReentrantSubscriber::LongTopicCallback(
const std_msgs::msg::String::SharedPtr msg) {
auto processed_string = ProcessString(msg->data);

RCLCPP_INFO(get_logger(), "Setting processed: %s", processed_string.c_str());
processed_long_string_ = processed_string;
}

int main(int argc, char **argv) {
rclcpp::init(argc, argv);
auto node = std::make_shared<MultiThreadReentrantSubscriber>();
rclcpp::executors::MultiThreadedExecutor executor;
executor.add_node(node);
executor.spin();
rclcpp::shutdown();
return 0;
}
19 changes: 19 additions & 0 deletions rclcpp/executors/executor_comparison/package.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
<?xml version="1.0"?>
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
<package format="3">
<name>examples_rclcpp_executor_comparison</name>
<version>0.0.0</version>
<description>TODO: Package description</description>
<maintainer email="[email protected]">jbs</maintainer>
<license>TODO: License declaration</license>

<buildtool_depend>ament_cmake</buildtool_depend>


<test_depend>ament_lint_auto</test_depend>
<test_depend>ament_lint_common</test_depend>

<export>
<build_type>ament_cmake</build_type>
</export>
</package>
54 changes: 54 additions & 0 deletions rclcpp/executors/executor_comparison/publisher_node.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
#include <random>

#include <rclcpp/rclcpp.hpp>
#include <std_msgs/msg/string.hpp>

using namespace std::chrono_literals;

class PublisherNode : public rclcpp::Node {
public:
PublisherNode() : Node("publisher_node") {
short_publisher_ = create_publisher<std_msgs::msg::String>(
"/short_topic", rclcpp::SystemDefaultsQoS());
long_publisher_ = create_publisher<std_msgs::msg::String>(
"/long_topic", rclcpp::SystemDefaultsQoS());

short_timer_ =
create_wall_timer(100ms, [this]() { PublishShortMessage(); });

long_timer_ = create_wall_timer(1s, [this]() { PublishLongMessage(); });
}

private:
size_t short_seq_{0};
size_t long_seq_{0};

rclcpp::Publisher<std_msgs::msg::String>::SharedPtr short_publisher_;
rclcpp::Publisher<std_msgs::msg::String>::SharedPtr long_publisher_;

rclcpp::TimerBase::SharedPtr short_timer_;
rclcpp::TimerBase::SharedPtr long_timer_;

void PublishShortMessage() {
std_msgs::msg::String message;
message.data = "Short message (seq=" + std::to_string(short_seq_++) + ")";
RCLCPP_INFO(get_logger(), message.data, " published.");
short_publisher_->publish(message);
}

void PublishLongMessage() {
std_msgs::msg::String message;
message.data = "Long message with more characters (seq=" +
std::to_string(long_seq_++) + ")";
RCLCPP_INFO(get_logger(), message.data, " published.");
long_publisher_->publish(message);
}
};

int main(int argc, char **argv) {
rclcpp::init(argc, argv);
auto node = std::make_shared<PublisherNode>();
rclcpp::spin(node);
rclcpp::shutdown();
return 0;
}
Loading

0 comments on commit 961bd37

Please sign in to comment.