diff --git a/rcl/CMakeLists.txt b/rcl/CMakeLists.txt index c5092071e..962706b77 100644 --- a/rcl/CMakeLists.txt +++ b/rcl/CMakeLists.txt @@ -5,6 +5,7 @@ project(rcl) find_package(ament_cmake_ros REQUIRED) find_package(libyaml_vendor REQUIRED) +find_package(rcl_content_filter_fallback REQUIRED) find_package(rcl_interfaces REQUIRED) find_package(rcl_logging_interface REQUIRED) find_package(rcl_yaml_param_parser REQUIRED) @@ -85,6 +86,7 @@ target_include_directories(${PROJECT_NAME} PUBLIC target_link_libraries(${PROJECT_NAME} PUBLIC # TODO(clalancette): rcl_interfaces should be PRIVATE, but downstream depends on it for now ${rcl_interfaces_TARGETS} + rcl_content_filter_fallback::rcl_content_filter_fallback # TODO(clalancette): rcl_logging_interface should be PRIVATE, but downstream depends on it for now rcl_logging_interface::rcl_logging_interface rcl_yaml_param_parser::rcl_yaml_param_parser @@ -136,6 +138,7 @@ ament_export_libraries(${PROJECT_NAME}) ament_export_targets(${PROJECT_NAME}) ament_export_dependencies(ament_cmake) +ament_export_dependencies(rcl_content_filter_fallback) # TODO(clalancette): rcl_interfaces shouldn't be exported, but downstream depends on it for now ament_export_dependencies(rcl_interfaces) # TODO(clalancette): rcl_logging_interface shouldn't be exported, but downstream depends on it for now diff --git a/rcl/package.xml b/rcl/package.xml index 52ad9bd83..d3b715051 100644 --- a/rcl/package.xml +++ b/rcl/package.xml @@ -20,6 +20,7 @@ rmw libyaml_vendor + rcl_content_filter_fallback rcl_interfaces rcl_logging_interface rcl_logging_spdlog diff --git a/rcl/src/rcl/subscription.c b/rcl/src/rcl/subscription.c index 1ca6edd8c..d1fa20e6c 100644 --- a/rcl/src/rcl/subscription.c +++ b/rcl/src/rcl/subscription.c @@ -21,6 +21,8 @@ extern "C" #include +#include "rcl_content_filter_fallback/rcl_content_filter_fallback.h" + #include "rcl/error_handling.h" #include "rcl/node.h" #include "rcl/node_type_cache.h" @@ -45,6 +47,54 @@ rcl_get_zero_initialized_subscription() return null_subscription; } +static +bool +rcl_subscription_rcl_content_filter_fallback_set( + const rcl_subscription_t * subscription, + const rmw_subscription_content_filter_options_t * options) +{ + if (!subscription->impl->rcl_content_filter_fallback) { + subscription->impl->rcl_content_filter_fallback = + rcl_content_filter_fallback_create(subscription->impl->type_support); + if (!subscription->impl->rcl_content_filter_fallback) { + RCL_SET_ERROR_MSG("Failed to create rcl content filter fallback"); + return false; + } + RCUTILS_LOG_DEBUG_NAMED( + ROS_PACKAGE_NAME, "rcl content filter fallback is created for topic '%s'", + rcl_subscription_get_topic_name(subscription)); + } + + if (!rcl_content_filter_fallback_set( + subscription->impl->rcl_content_filter_fallback, + options)) + { + RCL_SET_ERROR_MSG("Failed to set options for rcl content filter fallback"); + return false; + } + + return true; +} + +static +bool +rcl_subscription_rcl_content_filter_fallback_is_relevant( + const rcl_subscription_t * subscription, + void * data, + bool serialized) +{ + if (subscription->impl->rcl_content_filter_fallback && + rcl_content_filter_fallback_is_enabled(subscription->impl->rcl_content_filter_fallback)) + { + return rcl_content_filter_fallback_evaluate( + subscription->impl->rcl_content_filter_fallback, + data, + serialized); + } + + return true; +} + rcl_ret_t rcl_subscription_init( rcl_subscription_t * subscription, @@ -124,6 +174,23 @@ rcl_subscription_init( // options subscription->impl->options = *options; + subscription->impl->type_support = type_support; + + if (options->rmw_subscription_options.content_filter_options) { + // Content filter topic not supported (or not enabled as some failed cases) on rmw + // implementation. + // TODO(iuhilnehc-ynos): enable rcl content filter fallback with an environment variable + // (e.g. FORCE_RCL_CONTENT_FILTER) regardless of whether cft is enabled on RMW implementation. + if (!subscription->impl->rmw_handle->is_cft_enabled) { + if (!rcl_subscription_rcl_content_filter_fallback_set( + subscription, + options->rmw_subscription_options.content_filter_options)) + { + goto fail; + } + } + } + if (RCL_RET_OK != rcl_node_type_cache_register_type( node, type_support->get_type_hash_func(type_support), type_support->get_type_description_func(type_support), @@ -163,6 +230,10 @@ rcl_subscription_init( RCUTILS_SAFE_FWRITE_TO_STDERR("\n"); } + if (subscription->impl->rcl_content_filter_fallback) { + rcl_content_filter_fallback_destroy(subscription->impl->rcl_content_filter_fallback); + } + allocator->deallocate(subscription->impl, allocator->state); subscription->impl = NULL; } @@ -206,6 +277,10 @@ rcl_subscription_fini(rcl_subscription_t * subscription, rcl_node_t * node) result = RCL_RET_ERROR; } + if (subscription->impl->rcl_content_filter_fallback) { + rcl_content_filter_fallback_destroy(subscription->impl->rcl_content_filter_fallback); + } + if ( ROSIDL_TYPE_HASH_VERSION_UNSET != subscription->impl->type_hash.version && RCL_RET_OK != rcl_node_type_cache_unregister_type(node, &subscription->impl->type_hash)) @@ -470,7 +545,9 @@ rcl_subscription_is_cft_enabled(const rcl_subscription_t * subscription) if (!rcl_subscription_is_valid(subscription)) { return false; } - return subscription->impl->rmw_handle->is_cft_enabled; + return subscription->impl->rmw_handle->is_cft_enabled || + (subscription->impl->rcl_content_filter_fallback && + rcl_content_filter_fallback_is_enabled(subscription->impl->rcl_content_filter_fallback)); } rcl_ret_t @@ -492,8 +569,13 @@ rcl_subscription_set_content_filter( &options->rmw_subscription_content_filter_options); if (ret != RMW_RET_OK) { - RCL_SET_ERROR_MSG(rmw_get_error_string().str); - return rcl_convert_rmw_ret_to_rcl_ret(ret); + rcl_reset_error(); + if (!rcl_subscription_rcl_content_filter_fallback_set( + subscription, + &options->rmw_subscription_content_filter_options)) + { + return RMW_RET_ERROR; + } } // copy options into subscription_options @@ -527,8 +609,20 @@ rcl_subscription_get_content_filter( subscription->impl->rmw_handle, allocator, &options->rmw_subscription_content_filter_options); - - return rcl_convert_rmw_ret_to_rcl_ret(rmw_ret); + // If options can be get from rmw implementation, it's unnecessary to get them from rcl + // content filter fallback. + if (rmw_ret != RMW_RET_OK) { + rcl_reset_error(); + if (!rcl_content_filter_fallback_get( + subscription->impl->rcl_content_filter_fallback, + allocator, + &options->rmw_subscription_content_filter_options)) + { + RCL_SET_ERROR_MSG("Failed to get options from rcl content filter fallback"); + return RMW_RET_ERROR; + } + } + return RMW_RET_OK; } rcl_ret_t @@ -563,6 +657,16 @@ rcl_take( if (!taken) { return RCL_RET_SUBSCRIPTION_TAKE_FAILED; } + + // filter ros message with rcl content filter fallback + if (!rcl_subscription_rcl_content_filter_fallback_is_relevant( + subscription, + ros_message, + false)) + { + return RCL_RET_SUBSCRIPTION_TAKE_FAILED; + } + return RCL_RET_OK; } @@ -642,6 +746,16 @@ rcl_take_serialized_message( if (!taken) { return RCL_RET_SUBSCRIPTION_TAKE_FAILED; } + + // filter ros message with rcl content filter fallback + if (!rcl_subscription_rcl_content_filter_fallback_is_relevant( + subscription, + serialized_message, + true)) + { + return RCL_RET_SUBSCRIPTION_TAKE_FAILED; + } + return RCL_RET_OK; } @@ -710,6 +824,16 @@ rcl_take_loaned_message( if (!taken) { return RCL_RET_SUBSCRIPTION_TAKE_FAILED; } + + // filter ros message with rcl content filter fallback + if (!rcl_subscription_rcl_content_filter_fallback_is_relevant( + subscription, + *loaned_message, + false)) + { + return RCL_RET_SUBSCRIPTION_TAKE_FAILED; + } + return RCL_RET_OK; } diff --git a/rcl/src/rcl/subscription_impl.h b/rcl/src/rcl/subscription_impl.h index 8193421cf..8c9032df7 100644 --- a/rcl/src/rcl/subscription_impl.h +++ b/rcl/src/rcl/subscription_impl.h @@ -18,12 +18,15 @@ #include "rmw/rmw.h" #include "rcl/subscription.h" +#include "rosidl_runtime_c/message_type_support_struct.h" struct rcl_subscription_impl_s { rcl_subscription_options_t options; rmw_qos_profile_t actual_qos; rmw_subscription_t * rmw_handle; + void * rcl_content_filter_fallback; + const rosidl_message_type_support_t * type_support; rosidl_type_hash_t type_hash; }; diff --git a/rcl/test/rcl/test_subscription.cpp b/rcl/test/rcl/test_subscription.cpp index 2cf0ddbc9..b9509dcca 100644 --- a/rcl/test/rcl/test_subscription.cpp +++ b/rcl/test/rcl/test_subscription.cpp @@ -18,6 +18,8 @@ #include #include +#include "rcl_content_filter_fallback/rcl_content_filter_fallback.h" + #include "rcl/subscription.h" #include "rcl/rcl.h" #include "rmw/rmw.h" @@ -969,7 +971,9 @@ TEST_F( rcl_ret_t ret = rcl_subscription_fini(&subscription, this->node_ptr); EXPECT_EQ(RCL_RET_OK, ret) << rcl_get_error_string().str; }); - bool is_cft_support = rcl_subscription_is_cft_enabled(&subscription); + + // CFT must be enabled because rcl CFT will be effective even if rmw CFT is not supported. + ASSERT_TRUE(rcl_subscription_is_cft_enabled(&subscription)); ASSERT_TRUE(wait_for_established_subscription(&publisher, 10, 1000)); // publish with a non-filtered data @@ -983,22 +987,20 @@ TEST_F( ASSERT_EQ(RCL_RET_OK, ret) << rcl_get_error_string().str; } - if (is_cft_support) { - ASSERT_FALSE(wait_for_subscription_to_be_ready(&subscription, context_ptr, 10, 100)); - } else { - ASSERT_TRUE(wait_for_subscription_to_be_ready(&subscription, context_ptr, 100, 100)); - - test_msgs__msg__Strings msg; - test_msgs__msg__Strings__init(&msg); - OSRF_TESTING_TOOLS_CPP_SCOPE_EXIT( - { - test_msgs__msg__Strings__fini(&msg); - }); - ret = rcl_take(&subscription, &msg, nullptr, nullptr); - ASSERT_EQ(RCL_RET_OK, ret) << rcl_get_error_string().str; - ASSERT_EQ( - std::string(test_string), - std::string(msg.string_value.data, msg.string_value.size)); + { + // this event can be triggered if using the rcl content filter fallback + bool ready = wait_for_subscription_to_be_ready(&subscription, context_ptr, 10, 100); + if (ready) { + test_msgs__msg__Strings msg; + test_msgs__msg__Strings__init(&msg); + OSRF_TESTING_TOOLS_CPP_SCOPE_EXIT( + { + test_msgs__msg__Strings__fini(&msg); + }); + // data filtered inside rcl_take, expect the result with RCL_RET_SUBSCRIPTION_TAKE_FAILED + ret = rcl_take(&subscription, &msg, nullptr, nullptr); + ASSERT_EQ(RCL_RET_SUBSCRIPTION_TAKE_FAILED, ret) << rcl_get_error_string().str; + } } constexpr char test_filtered_string[] = "FilteredData"; @@ -1045,14 +1047,9 @@ TEST_F( ret = rcl_subscription_set_content_filter( &subscription, &options); - if (is_cft_support) { - ASSERT_EQ(RCL_RET_OK, ret) << rcl_get_error_string().str; - // waiting to allow for filter propagation - std::this_thread::sleep_for(std::chrono::seconds(10)); - } else { - ASSERT_EQ(RCL_RET_UNSUPPORTED, ret); - rcl_reset_error(); - } + ASSERT_EQ(RCL_RET_OK, ret) << rcl_get_error_string().str; + // waiting to allow for filter propagation + std::this_thread::sleep_for(std::chrono::seconds(10)); EXPECT_EQ( RCL_RET_OK, @@ -1071,22 +1068,18 @@ TEST_F( ASSERT_EQ(RCL_RET_OK, ret) << rcl_get_error_string().str; } - if (is_cft_support) { - ASSERT_FALSE(wait_for_subscription_to_be_ready(&subscription, context_ptr, 10, 100)); - } else { - ASSERT_TRUE(wait_for_subscription_to_be_ready(&subscription, context_ptr, 100, 100)); - - test_msgs__msg__Strings msg; - test_msgs__msg__Strings__init(&msg); - OSRF_TESTING_TOOLS_CPP_SCOPE_EXIT( - { - test_msgs__msg__Strings__fini(&msg); - }); - ret = rcl_take(&subscription, &msg, nullptr, nullptr); - ASSERT_EQ(RCL_RET_OK, ret) << rcl_get_error_string().str; - ASSERT_EQ( - std::string(test_filtered_string), - std::string(msg.string_value.data, msg.string_value.size)); + { + bool ready = wait_for_subscription_to_be_ready(&subscription, context_ptr, 100, 100); + if (ready) { + test_msgs__msg__Strings msg; + test_msgs__msg__Strings__init(&msg); + OSRF_TESTING_TOOLS_CPP_SCOPE_EXIT( + { + test_msgs__msg__Strings__fini(&msg); + }); + ret = rcl_take(&subscription, &msg, nullptr, nullptr); + ASSERT_EQ(RCL_RET_SUBSCRIPTION_TAKE_FAILED, ret) << rcl_get_error_string().str; + } } constexpr char test_filtered_other_string[] = "FilteredOtherData"; @@ -1122,7 +1115,7 @@ TEST_F( ret = rcl_subscription_get_content_filter( &subscription, &content_filter_options); - if (is_cft_support) { + { ASSERT_EQ(RCL_RET_OK, ret) << rcl_get_error_string().str; rmw_subscription_content_filter_options_t * options = @@ -1140,9 +1133,6 @@ TEST_F( &subscription, &content_filter_options) ); - } else { - ASSERT_EQ(RCL_RET_UNSUPPORTED, ret); - rcl_reset_error(); } } @@ -1161,15 +1151,12 @@ TEST_F( ret = rcl_subscription_set_content_filter( &subscription, &options); - if (is_cft_support) { + { ASSERT_EQ(RCL_RET_OK, ret) << rcl_get_error_string().str; // waiting to allow for filter propagation std::this_thread::sleep_for(std::chrono::seconds(10)); ASSERT_TRUE(wait_for_established_subscription(&publisher, 10, 1000)); ASSERT_FALSE(rcl_subscription_is_cft_enabled(&subscription)); - } else { - ASSERT_EQ(RCL_RET_UNSUPPORTED, ret); - rcl_reset_error(); } EXPECT_EQ( @@ -1279,9 +1266,7 @@ TEST_F( const char * filter_expression2 = "int32_value = %0"; const char * expression_parameters2[] = {"4"}; size_t expression_parameters2_count = sizeof(expression_parameters2) / sizeof(char *); - bool is_cft_support = - (std::string(rmw_get_implementation_identifier()).find("rmw_connextdds") == 0 || - std::string(rmw_get_implementation_identifier()).find("rmw_fastrtps_cpp") == 0); + // rcl CFT will be the fallback if rmw CFT is unsupported on implementation { rcl_subscription_content_filter_options_t options = rcl_get_zero_initialized_subscription_content_filter_options(); @@ -1296,10 +1281,7 @@ TEST_F( ret = rcl_subscription_set_content_filter( &subscription, &options); - if (!is_cft_support) { - ASSERT_EQ(RCL_RET_UNSUPPORTED, ret); - rcl_reset_error(); - } else { + { ASSERT_EQ(RCL_RET_OK, ret) << rcl_get_error_string().str; // waiting to allow for filter propagation std::this_thread::sleep_for(std::chrono::seconds(10)); @@ -1322,20 +1304,19 @@ TEST_F( ASSERT_EQ(RCL_RET_OK, ret) << rcl_get_error_string().str; } - if (is_cft_support) { - ASSERT_FALSE(wait_for_subscription_to_be_ready(&subscription, context_ptr, 10, 100)); - } else { - ASSERT_TRUE(wait_for_subscription_to_be_ready(&subscription, context_ptr, 100, 100)); - - test_msgs__msg__BasicTypes msg; - test_msgs__msg__BasicTypes__init(&msg); - OSRF_TESTING_TOOLS_CPP_SCOPE_EXIT( - { - test_msgs__msg__BasicTypes__fini(&msg); - }); - ret = rcl_take(&subscription, &msg, nullptr, nullptr); - ASSERT_EQ(RCL_RET_OK, ret) << rcl_get_error_string().str; - ASSERT_TRUE(test_value == msg.int32_value); + { + // It will be triggered if using the rcl CFT. + bool ready = wait_for_subscription_to_be_ready(&subscription, context_ptr, 10, 1000); + if (ready) { + test_msgs__msg__BasicTypes msg; + test_msgs__msg__BasicTypes__init(&msg); + OSRF_TESTING_TOOLS_CPP_SCOPE_EXIT( + { + test_msgs__msg__BasicTypes__fini(&msg); + }); + ret = rcl_take(&subscription, &msg, nullptr, nullptr); + ASSERT_EQ(RCL_RET_SUBSCRIPTION_TAKE_FAILED, ret) << rcl_get_error_string().str; + } } // publish filtered data @@ -1650,7 +1631,7 @@ TEST_F( RCL_RET_OK, rcl_subscription_content_filter_options_init( &subscription, - "data = '0'", + "int32_value = 0", 0, nullptr, &options @@ -1668,7 +1649,7 @@ TEST_F( auto mock = mocking_utils::patch_and_return( "lib:rcl", rmw_subscription_set_content_filter, RMW_RET_UNSUPPORTED); EXPECT_EQ( - RMW_RET_UNSUPPORTED, + RMW_RET_OK, rcl_subscription_set_content_filter( &subscription, &options)); rcl_reset_error(); @@ -1677,6 +1658,8 @@ TEST_F( { auto mock = mocking_utils::patch_and_return( "lib:rcl", rmw_subscription_set_content_filter, RMW_RET_ERROR); + auto mock2 = mocking_utils::patch_and_return( + "lib:rcl", rcl_content_filter_fallback_set, false); EXPECT_EQ( RMW_RET_ERROR, rcl_subscription_set_content_filter( @@ -1708,20 +1691,67 @@ TEST_F( rcl_subscription_content_filter_options_t options = rcl_get_zero_initialized_subscription_content_filter_options(); + OSRF_TESTING_TOOLS_CPP_SCOPE_EXIT( + { + EXPECT_EQ( + RCL_RET_OK, + rcl_subscription_content_filter_options_fini(&subscription, &options) + ); + }); { auto mock = mocking_utils::patch_and_return( "lib:rcl", rmw_subscription_get_content_filter, RMW_RET_UNSUPPORTED); EXPECT_EQ( - RMW_RET_UNSUPPORTED, + RMW_RET_ERROR, rcl_subscription_get_content_filter( &subscription, &options)); rcl_reset_error(); } + { + // set content filter options + rcl_subscription_content_filter_options_t options2 = + rcl_get_zero_initialized_subscription_content_filter_options(); + EXPECT_EQ( + RCL_RET_OK, + rcl_subscription_content_filter_options_init( + &subscription, + "int32_value = 0", + 0, + nullptr, + &options2 + ) + ); + OSRF_TESTING_TOOLS_CPP_SCOPE_EXIT( + { + EXPECT_EQ( + RCL_RET_OK, + rcl_subscription_content_filter_options_fini(&subscription, &options2) + ); + }); + + auto mock = mocking_utils::patch_and_return( + "lib:rcl", rmw_subscription_set_content_filter, RMW_RET_UNSUPPORTED); + EXPECT_EQ( + RMW_RET_OK, + rcl_subscription_set_content_filter( + &subscription, &options2)); + + auto mock2 = mocking_utils::patch_and_return( + "lib:rcl", rmw_subscription_get_content_filter, RMW_RET_UNSUPPORTED); + + EXPECT_EQ( + RMW_RET_OK, + rcl_subscription_get_content_filter( + &subscription, &options)); + } + { auto mock = mocking_utils::patch_and_return( "lib:rcl", rmw_subscription_get_content_filter, RMW_RET_ERROR); + auto mock2 = mocking_utils::patch_and_return( + "lib:rcl", rcl_content_filter_fallback_get, false); EXPECT_EQ( RMW_RET_ERROR, rcl_subscription_get_content_filter(