Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

call rcl content filter fallback #996

Open
wants to merge 10 commits into
base: rolling
Choose a base branch
from
3 changes: 3 additions & 0 deletions rcl/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ project(rcl)

find_package(ament_cmake_ros REQUIRED)

find_package(rcl_content_filter_fallback REQUIRED)
find_package(rcl_interfaces REQUIRED)
find_package(rcl_logging_interface REQUIRED)
find_package(rcl_yaml_param_parser REQUIRED)
Expand Down Expand Up @@ -76,6 +77,7 @@ target_include_directories(${PROJECT_NAME} PUBLIC
"$<INSTALL_INTERFACE:include/${PROJECT_NAME}>")
# specific order: dependents before dependencies
ament_target_dependencies(${PROJECT_NAME}
"rcl_content_filter_fallback"
"rcl_interfaces"
"rcl_logging_interface"
"rcl_yaml_param_parser"
Expand Down Expand Up @@ -117,6 +119,7 @@ ament_export_targets(${PROJECT_NAME})

# specific order: dependents before dependencies
ament_export_dependencies(ament_cmake)
ament_export_dependencies(rcl_content_filter_fallback)
ament_export_dependencies(rcl_interfaces)
ament_export_dependencies(rcl_logging_interface)
ament_export_dependencies(rcl_yaml_param_parser)
Expand Down
1 change: 1 addition & 0 deletions rcl/package.xml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

<build_export_depend>rmw</build_export_depend>

<depend>rcl_content_filter_fallback</depend>
<depend>rcl_interfaces</depend>
<depend>rcl_logging_interface</depend>
<depend>rcl_logging_spdlog</depend> <!-- the default logging impl -->
Expand Down
134 changes: 129 additions & 5 deletions rcl/src/rcl/subscription.c
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ extern "C"

#include <stdio.h>

#include "rcl_content_filter_fallback/rcl_content_filter_fallback.h"

#include "rcl/error_handling.h"
#include "rcl/node.h"
#include "rcutils/logging_macros.h"
Expand All @@ -42,6 +44,54 @@ rcl_get_zero_initialized_subscription()
return null_subscription;
}

static
bool
rcl_subscription_rcl_content_filter_fallback_set(
const rcl_subscription_t * subscription,
const rmw_subscription_content_filter_options_t * options)
{
if (!subscription->impl->rcl_content_filter_fallback) {
subscription->impl->rcl_content_filter_fallback =
rcl_content_filter_fallback_create(subscription->impl->type_support);
if (!subscription->impl->rcl_content_filter_fallback) {
RCL_SET_ERROR_MSG("Failed to create rcl content filter fallback");
return false;
}
RCUTILS_LOG_DEBUG_NAMED(
ROS_PACKAGE_NAME, "rcl content filter fallback is created for topic '%s'",
rcl_subscription_get_topic_name(subscription));
}

if (!rcl_content_filter_fallback_set(
subscription->impl->rcl_content_filter_fallback,
options))
{
RCL_SET_ERROR_MSG("Failed to set options for rcl content filter fallback");
return false;
}

return true;
}

static
bool
rcl_subscription_rcl_content_filter_fallback_is_relevant(
const rcl_subscription_t * subscription,
void * data,
bool serialized)
{
if (subscription->impl->rcl_content_filter_fallback &&
rcl_content_filter_fallback_is_enabled(subscription->impl->rcl_content_filter_fallback))
{
return rcl_content_filter_fallback_evaluate(
subscription->impl->rcl_content_filter_fallback,
data,
serialized);
}

return true;
}

rcl_ret_t
rcl_subscription_init(
rcl_subscription_t * subscription,
Expand Down Expand Up @@ -120,6 +170,23 @@ rcl_subscription_init(
options->qos.avoid_ros_namespace_conventions;
// options
subscription->impl->options = *options;
subscription->impl->type_support = type_support;

if (options->rmw_subscription_options.content_filter_options) {
// Content filter topic not supported (or not enabled as some failed cases) on rmw
// implementation.
// TODO(iuhilnehc-ynos): enable rcl content filter fallback with an environment variable
// (e.g. FORCE_COMMON_CONTENT_FILTER) regardless of whether cft is enabled on DDS.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// (e.g. FORCE_COMMON_CONTENT_FILTER) regardless of whether cft is enabled on DDS.
// (e.g. FORCE_COMMON_CONTENT_FILTER) regardless of whether cft is enabled on RMW implementation.

if (!subscription->impl->rmw_handle->is_cft_enabled) {
fujitatomoya marked this conversation as resolved.
Show resolved Hide resolved
if (!rcl_subscription_rcl_content_filter_fallback_set(
subscription,
options->rmw_subscription_options.content_filter_options))
{
goto fail;
}
}
}

RCUTILS_LOG_DEBUG_NAMED(ROS_PACKAGE_NAME, "Subscription initialized");
ret = RCL_RET_OK;
TRACEPOINT(
Expand Down Expand Up @@ -147,6 +214,10 @@ rcl_subscription_init(
RCUTILS_SAFE_FWRITE_TO_STDERR("\n");
}

if (subscription->impl->rcl_content_filter_fallback) {
rcl_content_filter_fallback_destroy(subscription->impl->rcl_content_filter_fallback);
}

allocator->deallocate(subscription->impl, allocator->state);
subscription->impl = NULL;
}
Expand Down Expand Up @@ -190,6 +261,10 @@ rcl_subscription_fini(rcl_subscription_t * subscription, rcl_node_t * node)
result = RCL_RET_ERROR;
}

if (subscription->impl->rcl_content_filter_fallback) {
rcl_content_filter_fallback_destroy(subscription->impl->rcl_content_filter_fallback);
}

allocator.deallocate(subscription->impl, allocator.state);
subscription->impl = NULL;
}
Expand Down Expand Up @@ -445,7 +520,9 @@ rcl_subscription_is_cft_enabled(const rcl_subscription_t * subscription)
if (!rcl_subscription_is_valid(subscription)) {
return false;
}
return subscription->impl->rmw_handle->is_cft_enabled;
return subscription->impl->rmw_handle->is_cft_enabled ||
(subscription->impl->rcl_content_filter_fallback &&
rcl_content_filter_fallback_is_enabled(subscription->impl->rcl_content_filter_fallback));
}

rcl_ret_t
Expand All @@ -467,8 +544,13 @@ rcl_subscription_set_content_filter(
&options->rmw_subscription_content_filter_options);

if (ret != RMW_RET_OK) {
RCL_SET_ERROR_MSG(rmw_get_error_string().str);
return rcl_convert_rmw_ret_to_rcl_ret(ret);
rcl_reset_error();
if (!rcl_subscription_rcl_content_filter_fallback_set(
subscription,
&options->rmw_subscription_content_filter_options))
{
return RMW_RET_ERROR;
}
}

// copy options into subscription_options
Expand Down Expand Up @@ -502,8 +584,20 @@ rcl_subscription_get_content_filter(
subscription->impl->rmw_handle,
allocator,
&options->rmw_subscription_content_filter_options);

return rcl_convert_rmw_ret_to_rcl_ret(rmw_ret);
// If options can be get from rmw implementation, it's unnecessary to get them from rcl
// content filter fallback.
if (rmw_ret != RMW_RET_OK) {
rcl_reset_error();
if (!rcl_content_filter_fallback_get(
subscription->impl->rcl_content_filter_fallback,
allocator,
&options->rmw_subscription_content_filter_options))
{
RCL_SET_ERROR_MSG("Failed to get options from rcl content filter fallback");
return RMW_RET_ERROR;
}
}
return RMW_RET_OK;
}

rcl_ret_t
Expand Down Expand Up @@ -538,6 +632,16 @@ rcl_take(
if (!taken) {
return RCL_RET_SUBSCRIPTION_TAKE_FAILED;
}

// filter ros message with rcl content filter fallback
if (!rcl_subscription_rcl_content_filter_fallback_is_relevant(
subscription,
ros_message,
false))
{
return RCL_RET_SUBSCRIPTION_TAKE_FAILED;
}

return RCL_RET_OK;
}

Expand Down Expand Up @@ -617,6 +721,16 @@ rcl_take_serialized_message(
if (!taken) {
return RCL_RET_SUBSCRIPTION_TAKE_FAILED;
}

// filter ros message with rcl content filter fallback
if (!rcl_subscription_rcl_content_filter_fallback_is_relevant(
subscription,
serialized_message,
true))
{
return RCL_RET_SUBSCRIPTION_TAKE_FAILED;
}

return RCL_RET_OK;
}

Expand Down Expand Up @@ -653,6 +767,16 @@ rcl_take_loaned_message(
if (!taken) {
return RCL_RET_SUBSCRIPTION_TAKE_FAILED;
}

// filter ros message with rcl content filter fallback
if (!rcl_subscription_rcl_content_filter_fallback_is_relevant(
subscription,
*loaned_message,
false))
{
return RCL_RET_SUBSCRIPTION_TAKE_FAILED;
}

return RCL_RET_OK;
}

Expand Down
3 changes: 3 additions & 0 deletions rcl/src/rcl/subscription_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,15 @@
#include "rmw/rmw.h"

#include "rcl/subscription.h"
#include "rosidl_runtime_c/message_type_support_struct.h"

struct rcl_subscription_impl_s
{
rcl_subscription_options_t options;
rmw_qos_profile_t actual_qos;
rmw_subscription_t * rmw_handle;
void * rcl_content_filter_fallback;
const rosidl_message_type_support_t * type_support;
};

#endif // RCL__SUBSCRIPTION_IMPL_H_
Loading