Skip to content

Commit

Permalink
to support a feature of content filtered topic
Browse files Browse the repository at this point in the history
Signed-off-by: Chen Lihui <[email protected]>
  • Loading branch information
Chen Lihui authored and Chen Lihui committed Dec 2, 2020
1 parent f5e35bd commit 166e1fb
Show file tree
Hide file tree
Showing 3 changed files with 192 additions and 1 deletion.
35 changes: 35 additions & 0 deletions rclcpp/include/rclcpp/subscription_base.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,41 @@ class SubscriptionBase : public std::enable_shared_from_this<SubscriptionBase>
bool
exchange_in_use_by_wait_set_state(void * pointer_to_subscription_part, bool in_use_state);

/// Check if subscription instance can support content filter topic feature.
/**
* Depending on the middleware and the message type, this will return true if the middleware
* can support content filter topic feature.
*
* \return boolean flag indicating if middleware can support content filter topic feature.
*/
RCLCPP_PUBLIC
bool
is_cft_supported() const;

/// Set the filter expression and expression parameters for the subscription.
/**
* \param[in] filter_expression An filter expression to set.
* \param[in] expression_parameters Array of expression parameters to set.
*/
RCLCPP_PUBLIC
virtual
void
set_cft_expression_parameters(
const std::string & filter_expression,
const std::vector<std::string> & expression_parameters);

/// Get the filter expression and expression parameters for the subscription.
/**
* \param[out] filter_expression An filter expression to get.
* \param[out] expression_parameters Array of expression parameters to get.
*/
RCLCPP_PUBLIC
virtual
void
get_cft_expression_parameters(
std::string & filter_expression,
std::vector<std::string> & expression_parameters) const;

protected:
template<typename EventCallbackT>
void
Expand Down
69 changes: 68 additions & 1 deletion rclcpp/include/rclcpp/subscription_options.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@
#include <string>
#include <vector>

#include "rcutils/strdup.h"
#include "rcutils/types/string_array.h"

#include "rclcpp/callback_group.hpp"
#include "rclcpp/detail/rmw_implementation_specific_subscription_payload.hpp"
#include "rclcpp/intra_process_buffer_type.hpp"
Expand Down Expand Up @@ -75,6 +78,17 @@ struct SubscriptionOptionsBase
TopicStatisticsOptions topic_stats_options;

QosOverridingOptions qos_overriding_options;

// Options to configure content filtered topic in the subscription.
struct ContentFilterOptions
{
// Filter expression like SQL statement
std::string filter_expression;
// Expression parameters for filter expression
std::vector<std::string> expression_parameters;
};

ContentFilterOptions content_filter_options;
};

/// Structure containing optional configuration for Subscriptions.
Expand Down Expand Up @@ -105,7 +119,8 @@ struct SubscriptionOptionsWithAllocator : public SubscriptionOptionsBase
using AllocatorTraits = std::allocator_traits<Allocator>;
using MessageAllocatorT = typename AllocatorTraits::template rebind_alloc<MessageT>;
auto message_alloc = std::make_shared<MessageAllocatorT>(*allocator.get());
result.allocator = allocator::get_rcl_allocator<MessageT>(*message_alloc);
rcl_allocator_t allocator = allocator::get_rcl_allocator<MessageT>(*message_alloc);
result.allocator = allocator;
result.qos = qos.get_rmw_qos_profile();
result.rmw_subscription_options.ignore_local_publications = this->ignore_local_publications;

Expand All @@ -114,6 +129,58 @@ struct SubscriptionOptionsWithAllocator : public SubscriptionOptionsBase
rmw_implementation_payload->modify_rmw_subscription_options(result.rmw_subscription_options);
}

auto fail_clean = [&result]()
{
rcl_ret_t ret = rcl_subscription_options_fini(&result);
if (RCL_RET_OK != ret) {
RCLCPP_ERROR(
rclcpp::get_logger("rclcpp"),
"Failed to fini subscription option: %s",
rcl_get_error_string().str);
rcl_reset_error();
}
};

// Copy content_filter_options into rcl_subscription_options if necessary.
if (!content_filter_options.filter_expression.empty()) {
char * expression =
rcutils_strdup(content_filter_options.filter_expression.c_str(), allocator);
if (!expression) {
throw std::runtime_error("failed to allocate memory for filter expression");
}
result.rmw_subscription_options.filter_expression = expression;
}

if (!content_filter_options.expression_parameters.empty()) {
rcutils_string_array_t * parameters =
static_cast<rcutils_string_array_t *>(allocator.allocate(
sizeof(rcutils_string_array_t),
allocator.state));
if (!parameters) {
fail_clean();
throw std::runtime_error("failed to allocate memory for expression parameters");
}
rcutils_ret_t ret = rcutils_string_array_init(
parameters, content_filter_options.expression_parameters.size(), &allocator);
if (RCUTILS_RET_OK != ret) {
rclcpp::exceptions::throw_from_rcl_error(
ret,
"failed to initialize string array for expression parameters");
}

for (size_t i = 0; i < content_filter_options.expression_parameters.size(); ++i) {
char * parameter =
rcutils_strdup(content_filter_options.expression_parameters[i].c_str(), allocator);
if (!parameter) {
fail_clean();
throw std::runtime_error("failed to allocate memory for expression parameters");
}
parameters->data[i] = parameter;
}

result.rmw_subscription_options.expression_parameters = parameters;
}

return result;
}

Expand Down
89 changes: 89 additions & 0 deletions rclcpp/src/rclcpp/subscription_base.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,16 @@
#include <string>
#include <vector>

#include "rcutils/strdup.h"
#include "rcutils/types/string_array.h"

#include "rclcpp/exceptions.hpp"
#include "rclcpp/expand_topic_or_service_name.hpp"
#include "rclcpp/experimental/intra_process_manager.hpp"
#include "rclcpp/logging.hpp"
#include "rclcpp/node_interfaces/node_base_interface.hpp"
#include "rclcpp/qos_event.hpp"
#include "rclcpp/scope_exit.hpp"

#include "rmw/error_handling.h"
#include "rmw/rmw.h"
Expand All @@ -44,6 +48,20 @@ SubscriptionBase::SubscriptionBase(
type_support_(type_support_handle),
is_serialized_(is_serialized)
{
// finalize subscription_options
RCLCPP_SCOPE_EXIT(
{
rcl_ret_t ret = rcl_subscription_options_fini(
const_cast<rcl_subscription_options_t *>(&subscription_options));
if (RCL_RET_OK != ret) {
RCLCPP_ERROR(
rclcpp::get_node_logger(node_handle_.get()).get_child("rclcpp"),
"Failed to fini subscription option: %s",
rcl_get_error_string().str);
rcl_reset_error();
}
});

auto custom_deletor = [node_handle = this->node_handle_](rcl_subscription_t * rcl_subs)
{
if (rcl_subscription_fini(rcl_subs, node_handle.get()) != RCL_RET_OK) {
Expand Down Expand Up @@ -288,3 +306,74 @@ SubscriptionBase::exchange_in_use_by_wait_set_state(
}
throw std::runtime_error("given pointer_to_subscription_part does not match any part");
}

bool
SubscriptionBase::is_cft_supported() const
{
return rcl_subscription_is_cft_supported(subscription_handle_.get());
}

void
SubscriptionBase::set_cft_expression_parameters(
const std::string & filter_expression,
const std::vector<std::string> & expression_parameters)
{
rcl_allocator_t allocator = rcl_get_default_allocator();
rcutils_string_array_t parameters;
parameters = rcutils_get_zero_initialized_string_array();
RCLCPP_SCOPE_EXIT(
{
rcutils_ret_t rcutils_ret = rcutils_string_array_fini(&parameters);
if (RCUTILS_RET_OK != rcutils_ret) {
RCLCPP_ERROR(
rclcpp::get_node_logger(node_handle_.get()).get_child("rclcpp"),
"failed to finalize parameters: %s",
rcl_get_error_string().str);
rcl_reset_error();
}
});

for (size_t i = 0; i < expression_parameters.size(); ++i) {
parameters.data[i] = rcutils_strdup(expression_parameters[i].c_str(), allocator);
}
rcl_ret_t ret = rcl_subscription_set_cft_expression_parameters(
subscription_handle_.get(),
filter_expression.c_str(),
&parameters);

if (RCL_RET_OK != ret) {
rclcpp::exceptions::throw_from_rcl_error(ret, "failed to set cft expression parameters");
}
}

void
SubscriptionBase::get_cft_expression_parameters(
std::string & filter_expression,
std::vector<std::string> & expression_parameters) const
{
char * expression = NULL;
rcutils_string_array_t parameters;
parameters = rcutils_get_zero_initialized_string_array();
rcl_ret_t ret = rcl_subscription_get_cft_expression_parameters(
subscription_handle_.get(),
&expression,
&parameters);

if (RCL_RET_OK != ret) {
rclcpp::exceptions::throw_from_rcl_error(ret, "failed to get cft expression parameters");
}

filter_expression = expression;
for (size_t i = 0; i < parameters.size; ++i) {
expression_parameters.push_back(parameters.data[i]);
}

rcutils_ret_t rcutils_ret = rcutils_string_array_fini(&parameters);
if (RCUTILS_RET_OK != rcutils_ret) {
RCLCPP_ERROR(
rclcpp::get_node_logger(node_handle_.get()).get_child("rclcpp"),
"failed to finalize parameters: %s",
rcl_get_error_string().str);
rcl_reset_error();
}
}

0 comments on commit 166e1fb

Please sign in to comment.