Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve Loaned messages memory handling #2624

Draft
wants to merge 3 commits into
base: rolling
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion rclcpp/include/rclcpp/generic_subscription.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ class GenericSubscription : public rclcpp::SubscriptionBase
/// This function is currently not implemented.
RCLCPP_PUBLIC
void handle_loaned_message(
void * loaned_message, const rclcpp::MessageInfo & message_info) override;
std::shared_ptr<void> loaned_message, const rclcpp::MessageInfo & message_info) override;

// Same as return_serialized_message() as the subscription is to serialized_messages only
RCLCPP_PUBLIC
Expand Down
7 changes: 2 additions & 5 deletions rclcpp/include/rclcpp/subscription.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -330,7 +330,7 @@ class Subscription : public SubscriptionBase

void
handle_loaned_message(
void * loaned_message,
std::shared_ptr<void> loaned_message,
const rclcpp::MessageInfo & message_info) override
{
if (matches_any_intra_process_publishers(&message_info.get_rmw_message_info().publisher_gid)) {
Expand All @@ -339,10 +339,7 @@ class Subscription : public SubscriptionBase
return;
}

auto typed_message = static_cast<ROSMessageType *>(loaned_message);
// message is loaned, so we have to make sure that the deleter does not deallocate the message
auto sptr = std::shared_ptr<ROSMessageType>(
typed_message, [](ROSMessageType * msg) {(void) msg;});
auto sptr = std::static_pointer_cast<ROSMessageType>(loaned_message);

std::chrono::time_point<std::chrono::system_clock> now;
if (subscription_topic_statistics_) {
Expand Down
4 changes: 3 additions & 1 deletion rclcpp/include/rclcpp/subscription_base.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,9 @@ class SubscriptionBase : public std::enable_shared_from_this<SubscriptionBase>
RCLCPP_PUBLIC
virtual
void
handle_loaned_message(void * loaned_message, const rclcpp::MessageInfo & message_info) = 0;
handle_loaned_message(
std::shared_ptr<void> loaned_message,
const rclcpp::MessageInfo & message_info) = 0;

/// Return the message borrowed in create_message.
/** \param[in] message Shared pointer to the returned message. */
Expand Down
36 changes: 23 additions & 13 deletions rclcpp/src/rclcpp/executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -558,19 +558,29 @@ Executor::execute_subscription(rclcpp::SubscriptionBase::SharedPtr subscription)
}
return true;
},
[&]() {subscription->handle_loaned_message(loaned_msg, message_info);});
if (nullptr != loaned_msg) {
rcl_ret_t ret = rcl_return_loaned_message_from_subscription(
subscription->get_subscription_handle().get(), loaned_msg);
if (RCL_RET_OK != ret) {
RCLCPP_ERROR(
rclcpp::get_logger("rclcpp"),
"rcl_return_loaned_message_from_subscription() failed for subscription on topic "
"'%s': %s",
subscription->get_topic_name(), rcl_get_error_string().str);
}
loaned_msg = nullptr;
}
[&]()
{
// Create a shared_ptr with a custom deleter to return the loaned
// to the RMW when last reference goes out of scope. This allows the user
// to store copies of the message outside the subscription callback.
auto loaned_message_ptr = std::shared_ptr<void>(
loaned_msg,
[subscription](void* loaned_msg_ptr)
{
rcl_ret_t ret = rcl_return_loaned_message_from_subscription(
subscription->get_subscription_handle().get(),
loaned_msg_ptr);
if (RCL_RET_OK != ret) {
RCLCPP_ERROR(
rclcpp::get_logger("rclcpp"),
"rcl_return_loaned_message_from_subscription() failed for subscription on topic '%s': %s",
subscription->get_topic_name(), rcl_get_error_string().str);
}
});

// Pass the shared_ptr to the user's callback
subscription->handle_loaned_message(loaned_message_ptr, message_info);
});
} else {
// This case is taking a copy of the message data from the middleware via
// inter-process communication.
Expand Down
5 changes: 2 additions & 3 deletions rclcpp/src/rclcpp/generic_subscription.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,8 @@ GenericSubscription::handle_serialized_message(
any_callback_.dispatch(message, message_info);
}

void
GenericSubscription::handle_loaned_message(
void * message, const rclcpp::MessageInfo & message_info)
void GenericSubscription::handle_loaned_message(
std::shared_ptr<void> message, const rclcpp::MessageInfo & message_info)
{
(void) message;
(void) message_info;
Expand Down
4 changes: 4 additions & 0 deletions rclcpp/test/rclcpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,10 @@ ament_add_gtest(test_loaned_message test_loaned_message.cpp)
ament_add_test_label(test_loaned_message mimick)
target_link_libraries(test_loaned_message ${PROJECT_NAME} mimick ${test_msgs_TARGETS})

ament_add_gtest(test_shared_memory test_shared_memory.cpp)
ament_add_test_label(test_shared_memory mimick)
target_link_libraries(test_shared_memory ${PROJECT_NAME} mimick ${test_msgs_TARGETS})

ament_add_gtest(test_memory_strategy test_memory_strategy.cpp)
target_link_libraries(test_memory_strategy ${PROJECT_NAME} ${test_msgs_TARGETS})

Expand Down
2 changes: 1 addition & 1 deletion rclcpp/test/rclcpp/node_interfaces/test_node_topics.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ class TestSubscription : public rclcpp::SubscriptionBase
create_serialized_message() override {return nullptr;}

void handle_message(std::shared_ptr<void> &, const rclcpp::MessageInfo &) override {}
void handle_loaned_message(void *, const rclcpp::MessageInfo &) override {}
void handle_loaned_message(std::shared_ptr<void>, const rclcpp::MessageInfo &) override {}
void handle_serialized_message(
const std::shared_ptr<rclcpp::SerializedMessage> &, const rclcpp::MessageInfo &) override {}
void return_message(std::shared_ptr<void> &) override {}
Expand Down
Loading