diff --git a/rcl/test/rcl/test_subscription.cpp b/rcl/test/rcl/test_subscription.cpp index ba74e73fe..ea098a15e 100644 --- a/rcl/test/rcl/test_subscription.cpp +++ b/rcl/test/rcl/test_subscription.cpp @@ -23,6 +23,7 @@ #include "rmw/rmw.h" #include "rmw/validate_full_topic_name.h" +#include "rcutils/strdup.h" #include "rcutils/testing/fault_injection.h" #include "test_msgs/msg/basic_types.h" #include "test_msgs/msg/strings.h" @@ -847,6 +848,388 @@ TEST_F(CLASSNAME(TestSubscriptionFixture, RMW_IMPLEMENTATION), test_bad_return_l rcl_subscription_fini(&subscription, this->node_ptr)) << rcl_get_error_string().str; } + +/* A subscription with a content filtered topic setting. + */ +TEST_F( + CLASSNAME( + TestSubscriptionFixture, + RMW_IMPLEMENTATION), test_subscription_content_filtered) { + bool is_vendor_support_cft + = (std::string(rmw_get_implementation_identifier()).find("rmw_connextdds") == 0); + + rcl_ret_t ret; + rcl_publisher_t publisher = rcl_get_zero_initialized_publisher(); + const rosidl_message_type_support_t * ts = + ROSIDL_GET_MSG_TYPE_SUPPORT(test_msgs, msg, Strings); + constexpr char topic[] = "rcl_test_subscription_content_filtered_chatter"; + rcl_publisher_options_t publisher_options = rcl_publisher_get_default_options(); + ret = rcl_publisher_init(&publisher, this->node_ptr, ts, topic, &publisher_options); + ASSERT_EQ(RCL_RET_OK, ret) << rcl_get_error_string().str; + OSRF_TESTING_TOOLS_CPP_SCOPE_EXIT( + { + rcl_ret_t ret = rcl_publisher_fini(&publisher, this->node_ptr); + EXPECT_EQ(RCL_RET_OK, ret) << rcl_get_error_string().str; + }); + rcl_subscription_t subscription = rcl_get_zero_initialized_subscription(); + rcl_subscription_options_t subscription_options = rcl_subscription_get_default_options(); + subscription_options.rmw_subscription_options.filter_expression + = rcutils_strdup("string_value MATCH 'FilteredData'", subscription_options.allocator); + ret = rcl_subscription_init(&subscription, this->node_ptr, ts, topic, &subscription_options); + ASSERT_EQ(RCL_RET_OK, ret) << rcl_get_error_string().str; + OSRF_TESTING_TOOLS_CPP_SCOPE_EXIT( + { + rcl_ret_t ret = rcl_subscription_fini(&subscription, this->node_ptr); + EXPECT_EQ(RCL_RET_OK, ret) << rcl_get_error_string().str; + }); + if (is_vendor_support_cft) { + ASSERT_TRUE(rcl_subscription_is_cft_supported(&subscription)); + } else { + ASSERT_FALSE(rcl_subscription_is_cft_supported(&subscription)); + } + ASSERT_TRUE(wait_for_established_subscription(&publisher, 10, 100)); + constexpr char test_string[] = "NotFilteredData"; + { + test_msgs__msg__Strings msg; + test_msgs__msg__Strings__init(&msg); + ASSERT_TRUE(rosidl_runtime_c__String__assign(&msg.string_value, test_string)); + ret = rcl_publish(&publisher, &msg, nullptr); + test_msgs__msg__Strings__fini(&msg); + ASSERT_EQ(RCL_RET_OK, ret) << rcl_get_error_string().str; + } + + if (is_vendor_support_cft) { + ASSERT_FALSE(wait_for_subscription_to_be_ready(&subscription, context_ptr, 10, 100)); + } else { + ASSERT_TRUE(wait_for_subscription_to_be_ready(&subscription, context_ptr, 10, 100)); + + test_msgs__msg__Strings msg; + test_msgs__msg__Strings__init(&msg); + OSRF_TESTING_TOOLS_CPP_SCOPE_EXIT( + { + test_msgs__msg__Strings__fini(&msg); + }); + ret = rcl_take(&subscription, &msg, nullptr, nullptr); + ASSERT_EQ(RCL_RET_OK, ret) << rcl_get_error_string().str; + ASSERT_EQ( + std::string(test_string), + std::string(msg.string_value.data, msg.string_value.size)); + } + + constexpr char test_filtered_string[] = "FilteredData"; + { + test_msgs__msg__Strings msg; + test_msgs__msg__Strings__init(&msg); + ASSERT_TRUE(rosidl_runtime_c__String__assign(&msg.string_value, test_filtered_string)); + ret = rcl_publish(&publisher, &msg, nullptr); + test_msgs__msg__Strings__fini(&msg); + ASSERT_EQ(RCL_RET_OK, ret) << rcl_get_error_string().str; + } + + ASSERT_TRUE(wait_for_subscription_to_be_ready(&subscription, context_ptr, 10, 100)); + + { + test_msgs__msg__Strings msg; + test_msgs__msg__Strings__init(&msg); + OSRF_TESTING_TOOLS_CPP_SCOPE_EXIT( + { + test_msgs__msg__Strings__fini(&msg); + }); + ret = rcl_take(&subscription, &msg, nullptr, nullptr); + ASSERT_EQ(RCL_RET_OK, ret) << rcl_get_error_string().str; + ASSERT_EQ( + std::string(test_filtered_string), + std::string(msg.string_value.data, msg.string_value.size)); + } + + // set filter + { + rcutils_string_array_t expression_parameters; + expression_parameters = rcutils_get_zero_initialized_string_array(); + rcl_allocator_t allocator = rcl_get_default_allocator(); + rcutils_ret_t rcutils_ret = rcutils_string_array_init( + &expression_parameters, 1, &allocator); + ASSERT_EQ(RCUTILS_RET_OK, rcutils_ret) << rcl_get_error_string().str; + expression_parameters.data[0] = rcutils_strdup("'FilteredOtherData'", allocator); + + ret = rcl_subscription_set_cft_expression_parameters( + &subscription, "string_value MATCH %0", &expression_parameters); + if (is_vendor_support_cft) { + ASSERT_EQ(RCL_RET_OK, ret) << rcl_get_error_string().str; + } else { + ASSERT_EQ(RCL_RET_UNSUPPORTED, ret); + } + + rcutils_ret = rcutils_string_array_fini(&expression_parameters); + ASSERT_EQ(RCUTILS_RET_OK, rcutils_ret) << rcl_get_error_string().str; + } + + // publish FilteredData again + { + test_msgs__msg__Strings msg; + test_msgs__msg__Strings__init(&msg); + ASSERT_TRUE(rosidl_runtime_c__String__assign(&msg.string_value, test_filtered_string)); + ret = rcl_publish(&publisher, &msg, nullptr); + test_msgs__msg__Strings__fini(&msg); + ASSERT_EQ(RCL_RET_OK, ret) << rcl_get_error_string().str; + } + + if (is_vendor_support_cft) { + ASSERT_FALSE(wait_for_subscription_to_be_ready(&subscription, context_ptr, 10, 100)); + } else { + ASSERT_TRUE(wait_for_subscription_to_be_ready(&subscription, context_ptr, 10, 100)); + + test_msgs__msg__Strings msg; + test_msgs__msg__Strings__init(&msg); + OSRF_TESTING_TOOLS_CPP_SCOPE_EXIT( + { + test_msgs__msg__Strings__fini(&msg); + }); + ret = rcl_take(&subscription, &msg, nullptr, nullptr); + ASSERT_EQ(RCL_RET_OK, ret) << rcl_get_error_string().str; + ASSERT_EQ( + std::string(test_filtered_string), + std::string(msg.string_value.data, msg.string_value.size)); + } + + constexpr char test_filtered_other_string[] = "FilteredOtherData"; + { + test_msgs__msg__Strings msg; + test_msgs__msg__Strings__init(&msg); + ASSERT_TRUE(rosidl_runtime_c__String__assign(&msg.string_value, test_filtered_other_string)); + ret = rcl_publish(&publisher, &msg, nullptr); + test_msgs__msg__Strings__fini(&msg); + ASSERT_EQ(RCL_RET_OK, ret) << rcl_get_error_string().str; + } + + ASSERT_TRUE(wait_for_subscription_to_be_ready(&subscription, context_ptr, 10, 100)); + + { + test_msgs__msg__Strings msg; + test_msgs__msg__Strings__init(&msg); + OSRF_TESTING_TOOLS_CPP_SCOPE_EXIT( + { + test_msgs__msg__Strings__fini(&msg); + }); + ret = rcl_take(&subscription, &msg, nullptr, nullptr); + ASSERT_EQ(RCL_RET_OK, ret) << rcl_get_error_string().str; + ASSERT_EQ( + std::string(test_filtered_other_string), + std::string(msg.string_value.data, msg.string_value.size)); + } + + // get filter + char * filter_expression = nullptr; + rcutils_string_array_t expression_parameters; + expression_parameters = rcutils_get_zero_initialized_string_array(); + ret = rcl_subscription_get_cft_expression_parameters( + &subscription, &filter_expression, &expression_parameters); + if (is_vendor_support_cft) { + ASSERT_EQ(RCL_RET_OK, ret) << rcl_get_error_string().str; + ASSERT_EQ( + std::string("string_value MATCH %0"), + std::string(filter_expression)); + rcutils_allocator_t allocator = rcutils_get_default_allocator(); + allocator.deallocate(filter_expression, allocator.state); + ASSERT_EQ(static_cast(1), expression_parameters.size); + ASSERT_EQ( + std::string("'FilteredOtherData'"), + std::string(expression_parameters.data[0])); + ASSERT_EQ(RCUTILS_RET_OK, rcutils_string_array_fini(&expression_parameters)); + } else { + ASSERT_EQ(RCL_RET_UNSUPPORTED, ret); + } + + // reset filter + { + ret = rcl_subscription_set_cft_expression_parameters( + &subscription, "", NULL); + if (is_vendor_support_cft) { + ASSERT_EQ(RCL_RET_OK, ret) << rcl_get_error_string().str; + ASSERT_TRUE(wait_for_established_subscription(&publisher, 50, 100)); + ASSERT_FALSE(rcl_subscription_is_cft_supported(&subscription)); + } else { + ASSERT_EQ(RCL_RET_UNSUPPORTED, ret); + } + } + + // publish no filtered data again + { + test_msgs__msg__Strings msg; + test_msgs__msg__Strings__init(&msg); + ASSERT_TRUE(rosidl_runtime_c__String__assign(&msg.string_value, test_string)); + ret = rcl_publish(&publisher, &msg, nullptr); + test_msgs__msg__Strings__fini(&msg); + ASSERT_EQ(RCL_RET_OK, ret) << rcl_get_error_string().str; + } + + ASSERT_TRUE(wait_for_subscription_to_be_ready(&subscription, context_ptr, 10, 100)); + + { + test_msgs__msg__Strings msg; + test_msgs__msg__Strings__init(&msg); + OSRF_TESTING_TOOLS_CPP_SCOPE_EXIT( + { + test_msgs__msg__Strings__fini(&msg); + }); + ret = rcl_take(&subscription, &msg, nullptr, nullptr); + ASSERT_EQ(RCL_RET_OK, ret) << rcl_get_error_string().str; + + // it might take 'FilteredData' again, if so, contine to take data + std::string data = std::string(msg.string_value.data, msg.string_value.size); + if (is_vendor_support_cft) { + const int try_total_num = 3; + int i = 0; + while (data != test_string && i++ < try_total_num) { + ASSERT_TRUE(wait_for_subscription_to_be_ready(&subscription, context_ptr, 10, 100)); + test_msgs__msg__Strings msg; + test_msgs__msg__Strings__init(&msg); + OSRF_TESTING_TOOLS_CPP_SCOPE_EXIT( + { + test_msgs__msg__Strings__fini(&msg); + }); + ret = rcl_take(&subscription, &msg, nullptr, nullptr); + ASSERT_EQ(RCL_RET_OK, ret) << rcl_get_error_string().str; + data = std::string(msg.string_value.data, msg.string_value.size); + } + } + ASSERT_EQ(std::string(test_string), data); + } +} + +/* A subscription without a content filtered topic setting at beginning. + */ +TEST_F( + CLASSNAME( + TestSubscriptionFixture, RMW_IMPLEMENTATION), test_subscription_not_begin_content_filtered) { + bool is_vendor_support_cft + = (std::string(rmw_get_implementation_identifier()).find("rmw_connextdds") == 0); + + rcl_ret_t ret; + rcl_publisher_t publisher = rcl_get_zero_initialized_publisher(); + const rosidl_message_type_support_t * ts = + ROSIDL_GET_MSG_TYPE_SUPPORT(test_msgs, msg, Strings); + constexpr char topic[] = "rcl_test_subscription_not_begin_content_filtered_chatter"; + rcl_publisher_options_t publisher_options = rcl_publisher_get_default_options(); + ret = rcl_publisher_init(&publisher, this->node_ptr, ts, topic, &publisher_options); + ASSERT_EQ(RCL_RET_OK, ret) << rcl_get_error_string().str; + OSRF_TESTING_TOOLS_CPP_SCOPE_EXIT( + { + rcl_ret_t ret = rcl_publisher_fini(&publisher, this->node_ptr); + EXPECT_EQ(RCL_RET_OK, ret) << rcl_get_error_string().str; + }); + rcl_subscription_t subscription = rcl_get_zero_initialized_subscription(); + // not to set filter expression + rcl_subscription_options_t subscription_options = rcl_subscription_get_default_options(); + ret = rcl_subscription_init(&subscription, this->node_ptr, ts, topic, &subscription_options); + ASSERT_EQ(RCL_RET_OK, ret) << rcl_get_error_string().str; + OSRF_TESTING_TOOLS_CPP_SCOPE_EXIT( + { + rcl_ret_t ret = rcl_subscription_fini(&subscription, this->node_ptr); + EXPECT_EQ(RCL_RET_OK, ret) << rcl_get_error_string().str; + }); + ASSERT_FALSE(rcl_subscription_is_cft_supported(&subscription)); + ASSERT_TRUE(wait_for_established_subscription(&publisher, 10, 100)); + + // publish no filtered data + constexpr char test_string[] = "NotFilteredData"; + { + test_msgs__msg__Strings msg; + test_msgs__msg__Strings__init(&msg); + ASSERT_TRUE(rosidl_runtime_c__String__assign(&msg.string_value, test_string)); + ret = rcl_publish(&publisher, &msg, nullptr); + test_msgs__msg__Strings__fini(&msg); + ASSERT_EQ(RCL_RET_OK, ret) << rcl_get_error_string().str; + } + + ASSERT_TRUE(wait_for_subscription_to_be_ready(&subscription, context_ptr, 10, 100)); + + { + test_msgs__msg__Strings msg; + test_msgs__msg__Strings__init(&msg); + OSRF_TESTING_TOOLS_CPP_SCOPE_EXIT( + { + test_msgs__msg__Strings__fini(&msg); + }); + ret = rcl_take(&subscription, &msg, nullptr, nullptr); + ASSERT_EQ(RCL_RET_OK, ret) << rcl_get_error_string().str; + ASSERT_EQ( + std::string(test_string), + std::string(msg.string_value.data, msg.string_value.size)); + } + + // set filter + { + ret = rcl_subscription_set_cft_expression_parameters( + &subscription, "string_value MATCH 'FilteredData'", NULL); + if (is_vendor_support_cft) { + ASSERT_EQ(RCL_RET_OK, ret) << rcl_get_error_string().str; + } else { + ASSERT_EQ(RCL_RET_UNSUPPORTED, ret); + } + } + + if (is_vendor_support_cft) { + ASSERT_TRUE(rcl_subscription_is_cft_supported(&subscription)); + } else { + ASSERT_FALSE(rcl_subscription_is_cft_supported(&subscription)); + } + // publish no filtered data again + { + test_msgs__msg__Strings msg; + test_msgs__msg__Strings__init(&msg); + ASSERT_TRUE(rosidl_runtime_c__String__assign(&msg.string_value, test_string)); + ret = rcl_publish(&publisher, &msg, nullptr); + test_msgs__msg__Strings__fini(&msg); + ASSERT_EQ(RCL_RET_OK, ret) << rcl_get_error_string().str; + } + + if (is_vendor_support_cft) { + ASSERT_FALSE(wait_for_subscription_to_be_ready(&subscription, context_ptr, 10, 100)); + } else { + ASSERT_TRUE(wait_for_subscription_to_be_ready(&subscription, context_ptr, 10, 100)); + + test_msgs__msg__Strings msg; + test_msgs__msg__Strings__init(&msg); + OSRF_TESTING_TOOLS_CPP_SCOPE_EXIT( + { + test_msgs__msg__Strings__fini(&msg); + }); + ret = rcl_take(&subscription, &msg, nullptr, nullptr); + ASSERT_EQ(RCL_RET_OK, ret) << rcl_get_error_string().str; + ASSERT_EQ( + std::string(test_string), + std::string(msg.string_value.data, msg.string_value.size)); + } + + // publish filtered data + constexpr char test_filtered_string[] = "FilteredData"; + { + test_msgs__msg__Strings msg; + test_msgs__msg__Strings__init(&msg); + ASSERT_TRUE(rosidl_runtime_c__String__assign(&msg.string_value, test_filtered_string)); + ret = rcl_publish(&publisher, &msg, nullptr); + test_msgs__msg__Strings__fini(&msg); + ASSERT_EQ(RCL_RET_OK, ret) << rcl_get_error_string().str; + } + + ASSERT_TRUE(wait_for_subscription_to_be_ready(&subscription, context_ptr, 10, 100)); + + { + test_msgs__msg__Strings msg; + test_msgs__msg__Strings__init(&msg); + OSRF_TESTING_TOOLS_CPP_SCOPE_EXIT( + { + test_msgs__msg__Strings__fini(&msg); + }); + ret = rcl_take(&subscription, &msg, nullptr, nullptr); + ASSERT_EQ(RCL_RET_OK, ret) << rcl_get_error_string().str; + ASSERT_EQ( + std::string(test_filtered_string), + std::string(msg.string_value.data, msg.string_value.size)); + } +} + TEST_F(CLASSNAME(TestSubscriptionFixture, RMW_IMPLEMENTATION), test_get_options) { rcl_ret_t ret; const rosidl_message_type_support_t * ts =