diff --git a/rmw_fastrtps_cpp/src/init_rmw_context_impl.cpp b/rmw_fastrtps_cpp/src/init_rmw_context_impl.cpp index 3ca2ab6da..8e141ffde 100644 --- a/rmw_fastrtps_cpp/src/init_rmw_context_impl.cpp +++ b/rmw_fastrtps_cpp/src/init_rmw_context_impl.cpp @@ -159,6 +159,13 @@ init_context_impl( common_context->gid = rmw_fastrtps_shared_cpp::create_rmw_gid( eprosima_fastrtps_identifier, participant_info->participant_->guid()); common_context->pub = publisher.get(); + common_context->publish_callback = [](const rmw_publisher_t * pub, const void * msg) { + return rmw_fastrtps_shared_cpp::__rmw_publish( + eprosima_fastrtps_identifier, + pub, + msg, + nullptr); + }; common_context->sub = subscription.get(); common_context->graph_guard_condition = graph_guard_condition.get(); diff --git a/rmw_fastrtps_cpp/src/rmw_client.cpp b/rmw_fastrtps_cpp/src/rmw_client.cpp index 0262554eb..d94ae4859 100644 --- a/rmw_fastrtps_cpp/src/rmw_client.cpp +++ b/rmw_fastrtps_cpp/src/rmw_client.cpp @@ -442,43 +442,16 @@ rmw_create_client( } memcpy(const_cast(rmw_client->service_name), service_name, strlen(service_name) + 1); + // Update graph + rmw_gid_t request_publisher_gid = rmw_fastrtps_shared_cpp::create_rmw_gid( + eprosima_fastrtps_identifier, info->request_writer_->guid()); + rmw_gid_t response_subscriber_gid = rmw_fastrtps_shared_cpp::create_rmw_gid( + eprosima_fastrtps_identifier, info->response_reader_->guid()); + if (RMW_RET_OK != common_context->add_client_graph( + request_publisher_gid, response_subscriber_gid, + node->name, node->namespace_)) { - // Update graph - std::lock_guard guard(common_context->node_update_mutex); - rmw_gid_t request_publisher_gid = rmw_fastrtps_shared_cpp::create_rmw_gid( - eprosima_fastrtps_identifier, info->request_writer_->guid()); - common_context->graph_cache.associate_writer( - request_publisher_gid, - common_context->gid, - node->name, - node->namespace_); - - rmw_gid_t response_subscriber_gid = rmw_fastrtps_shared_cpp::create_rmw_gid( - eprosima_fastrtps_identifier, info->response_reader_->guid()); - rmw_dds_common::msg::ParticipantEntitiesInfo msg = - common_context->graph_cache.associate_reader( - response_subscriber_gid, - common_context->gid, - node->name, - node->namespace_); - rmw_ret_t ret = rmw_fastrtps_shared_cpp::__rmw_publish( - eprosima_fastrtps_identifier, - common_context->pub, - static_cast(&msg), - nullptr); - if (RMW_RET_OK != ret) { - common_context->graph_cache.dissociate_reader( - response_subscriber_gid, - common_context->gid, - node->name, - node->namespace_); - common_context->graph_cache.dissociate_writer( - request_publisher_gid, - common_context->gid, - node->name, - node->namespace_); - return nullptr; - } + return nullptr; } cleanup_rmw_client.cancel(); diff --git a/rmw_fastrtps_cpp/src/rmw_publisher.cpp b/rmw_fastrtps_cpp/src/rmw_publisher.cpp index 3d1cfe8f5..6bf4bd231 100644 --- a/rmw_fastrtps_cpp/src/rmw_publisher.cpp +++ b/rmw_fastrtps_cpp/src/rmw_publisher.cpp @@ -20,6 +20,8 @@ #include "rmw/get_topic_endpoint_info.h" #include "rmw/rmw.h" +#include "rcpputils/scope_exit.hpp" + #include "rmw/impl/cpp/macros.hpp" #include "rmw_dds_common/qos.hpp" @@ -99,37 +101,32 @@ rmw_create_publisher( if (!publisher) { return nullptr; } - - auto common_context = static_cast(node->context->impl->common); - - auto info = static_cast(publisher->data); - { - // Update graph - std::lock_guard guard(common_context->node_update_mutex); - rmw_dds_common::msg::ParticipantEntitiesInfo msg = - common_context->graph_cache.associate_writer( - info->publisher_gid, common_context->gid, node->name, node->namespace_); - rmw_ret_t rmw_ret = rmw_fastrtps_shared_cpp::__rmw_publish( - eprosima_fastrtps_identifier, - common_context->pub, - static_cast(&msg), - nullptr); - if (RMW_RET_OK != rmw_ret) { + auto cleanup_publisher = rcpputils::make_scope_exit( + [participant_info, publisher]() { rmw_error_state_t error_state = *rmw_get_error_state(); rmw_reset_error(); - static_cast(common_context->graph_cache.dissociate_writer( - info->publisher_gid, common_context->gid, node->name, node->namespace_)); - rmw_ret = rmw_fastrtps_shared_cpp::destroy_publisher( - eprosima_fastrtps_identifier, participant_info, publisher); - if (RMW_RET_OK != rmw_ret) { + if (RMW_RET_OK != rmw_fastrtps_shared_cpp::destroy_publisher( + eprosima_fastrtps_identifier, participant_info, publisher)) + { RMW_SAFE_FWRITE_TO_STDERR(rmw_get_error_string().str); RMW_SAFE_FWRITE_TO_STDERR(" during '" RCUTILS_STRINGIFY(__function__) "' cleanup\n"); rmw_reset_error(); } rmw_set_error_state(error_state.message, error_state.file, error_state.line_number); - return nullptr; - } + }); + + auto common_context = static_cast(node->context->impl->common); + auto info = static_cast(publisher->data); + + // Update graph + if (RMW_RET_OK != common_context->add_publisher_graph( + info->publisher_gid, + node->name, node->namespace_)) + { + return nullptr; } + + cleanup_publisher.cancel(); return publisher; } diff --git a/rmw_fastrtps_cpp/src/rmw_service.cpp b/rmw_fastrtps_cpp/src/rmw_service.cpp index eed5b939f..118b4e2f9 100644 --- a/rmw_fastrtps_cpp/src/rmw_service.cpp +++ b/rmw_fastrtps_cpp/src/rmw_service.cpp @@ -442,42 +442,16 @@ rmw_create_service( } memcpy(const_cast(rmw_service->service_name), service_name, strlen(service_name) + 1); + // Update graph + rmw_gid_t request_subscriber_gid = rmw_fastrtps_shared_cpp::create_rmw_gid( + eprosima_fastrtps_identifier, info->request_reader_->guid()); + rmw_gid_t response_publisher_gid = rmw_fastrtps_shared_cpp::create_rmw_gid( + eprosima_fastrtps_identifier, info->response_writer_->guid()); + if (RMW_RET_OK != common_context->add_service_graph( + request_subscriber_gid, response_publisher_gid, + node->name, node->namespace_)) { - // Update graph - std::lock_guard guard(common_context->node_update_mutex); - rmw_gid_t request_subscriber_gid = rmw_fastrtps_shared_cpp::create_rmw_gid( - eprosima_fastrtps_identifier, info->request_reader_->guid()); - common_context->graph_cache.associate_reader( - request_subscriber_gid, - common_context->gid, - node->name, - node->namespace_); - rmw_gid_t response_publisher_gid = rmw_fastrtps_shared_cpp::create_rmw_gid( - eprosima_fastrtps_identifier, info->response_writer_->guid()); - rmw_dds_common::msg::ParticipantEntitiesInfo msg = - common_context->graph_cache.associate_writer( - response_publisher_gid, - common_context->gid, - node->name, - node->namespace_); - rmw_ret_t ret = rmw_fastrtps_shared_cpp::__rmw_publish( - eprosima_fastrtps_identifier, - common_context->pub, - static_cast(&msg), - nullptr); - if (RMW_RET_OK != ret) { - common_context->graph_cache.dissociate_writer( - response_publisher_gid, - common_context->gid, - node->name, - node->namespace_); - common_context->graph_cache.dissociate_reader( - request_subscriber_gid, - common_context->gid, - node->name, - node->namespace_); - return nullptr; - } + return nullptr; } cleanup_rmw_service.cancel(); diff --git a/rmw_fastrtps_cpp/src/rmw_subscription.cpp b/rmw_fastrtps_cpp/src/rmw_subscription.cpp index 1110beb1b..f0e0396cd 100644 --- a/rmw_fastrtps_cpp/src/rmw_subscription.cpp +++ b/rmw_fastrtps_cpp/src/rmw_subscription.cpp @@ -21,6 +21,8 @@ #include "rmw/get_topic_endpoint_info.h" #include "rmw/rmw.h" +#include "rcpputils/scope_exit.hpp" + #include "rmw_dds_common/qos.hpp" #include "rmw_fastrtps_shared_cpp/custom_participant_info.hpp" @@ -93,40 +95,35 @@ rmw_create_subscription( if (!subscription) { return nullptr; } - - auto common_context = static_cast(node->context->impl->common); - auto info = static_cast(subscription->data); - - { - // Update graph - std::lock_guard guard(common_context->node_update_mutex); - rmw_dds_common::msg::ParticipantEntitiesInfo msg = - common_context->graph_cache.associate_reader( - info->subscription_gid_, common_context->gid, node->name, node->namespace_); - rmw_ret_t rmw_ret = rmw_fastrtps_shared_cpp::__rmw_publish( - eprosima_fastrtps_identifier, - common_context->pub, - static_cast(&msg), - nullptr); - if (RMW_RET_OK != rmw_ret) { + auto cleanup_subscription = rcpputils::make_scope_exit( + [participant_info, subscription]() { rmw_error_state_t error_state = *rmw_get_error_state(); rmw_reset_error(); - static_cast(common_context->graph_cache.dissociate_reader( - info->subscription_gid_, common_context->gid, node->name, node->namespace_)); - rmw_ret = rmw_fastrtps_shared_cpp::destroy_subscription( - eprosima_fastrtps_identifier, participant_info, subscription); - if (RMW_RET_OK != rmw_ret) { + if (RMW_RET_OK != rmw_fastrtps_shared_cpp::destroy_subscription( + eprosima_fastrtps_identifier, participant_info, subscription)) + { RMW_SAFE_FWRITE_TO_STDERR(rmw_get_error_string().str); RMW_SAFE_FWRITE_TO_STDERR(" during '" RCUTILS_STRINGIFY(__function__) "' cleanup\n"); rmw_reset_error(); } rmw_set_error_state(error_state.message, error_state.file, error_state.line_number); - return nullptr; - } + }); + + auto common_context = static_cast(node->context->impl->common); + auto info = static_cast(subscription->data); + + // Update graph + if (RMW_RET_OK != common_context->add_subscriber_graph( + info->subscription_gid_, + node->name, node->namespace_)) + { + return nullptr; } + info->node_ = node; info->common_context_ = common_context; + cleanup_subscription.cancel(); return subscription; } diff --git a/rmw_fastrtps_dynamic_cpp/src/init_rmw_context_impl.cpp b/rmw_fastrtps_dynamic_cpp/src/init_rmw_context_impl.cpp index 4bfa597c9..d8e12d127 100644 --- a/rmw_fastrtps_dynamic_cpp/src/init_rmw_context_impl.cpp +++ b/rmw_fastrtps_dynamic_cpp/src/init_rmw_context_impl.cpp @@ -159,6 +159,13 @@ init_context_impl( common_context->gid = rmw_fastrtps_shared_cpp::create_rmw_gid( eprosima_fastrtps_identifier, participant_info->participant_->guid()); common_context->pub = publisher.get(); + common_context->publish_callback = [](const rmw_publisher_t * pub, const void * msg) { + return rmw_fastrtps_shared_cpp::__rmw_publish( + eprosima_fastrtps_identifier, + pub, + msg, + nullptr); + }; common_context->sub = subscription.get(); common_context->graph_guard_condition = graph_guard_condition.get(); diff --git a/rmw_fastrtps_dynamic_cpp/src/rmw_client.cpp b/rmw_fastrtps_dynamic_cpp/src/rmw_client.cpp index 77fae42a3..d8626b54c 100644 --- a/rmw_fastrtps_dynamic_cpp/src/rmw_client.cpp +++ b/rmw_fastrtps_dynamic_cpp/src/rmw_client.cpp @@ -471,40 +471,16 @@ rmw_create_client( } memcpy(const_cast(rmw_client->service_name), service_name, strlen(service_name) + 1); + // Update graph + rmw_gid_t request_publisher_gid = rmw_fastrtps_shared_cpp::create_rmw_gid( + eprosima_fastrtps_identifier, info->request_writer_->guid()); + rmw_gid_t response_subscriber_gid = rmw_fastrtps_shared_cpp::create_rmw_gid( + eprosima_fastrtps_identifier, info->response_reader_->guid()); + if (RMW_RET_OK != common_context->add_client_graph( + request_publisher_gid, response_subscriber_gid, + node->name, node->namespace_)) { - // Update graph - std::lock_guard guard(common_context->node_update_mutex); - rmw_gid_t request_publisher_gid = rmw_fastrtps_shared_cpp::create_rmw_gid( - eprosima_fastrtps_identifier, info->request_writer_->guid()); - common_context->graph_cache.associate_writer( - request_publisher_gid, - common_context->gid, - node->name, - node->namespace_); - - rmw_gid_t response_subscriber_gid = rmw_fastrtps_shared_cpp::create_rmw_gid( - eprosima_fastrtps_identifier, info->response_reader_->guid()); - rmw_dds_common::msg::ParticipantEntitiesInfo msg = - common_context->graph_cache.associate_reader( - response_subscriber_gid, common_context->gid, node->name, node->namespace_); - rmw_ret_t rmw_ret = rmw_fastrtps_shared_cpp::__rmw_publish( - eprosima_fastrtps_identifier, - common_context->pub, - static_cast(&msg), - nullptr); - if (RMW_RET_OK != rmw_ret) { - common_context->graph_cache.dissociate_reader( - response_subscriber_gid, - common_context->gid, - node->name, - node->namespace_); - common_context->graph_cache.dissociate_writer( - request_publisher_gid, - common_context->gid, - node->name, - node->namespace_); - return nullptr; - } + return nullptr; } cleanup_rmw_client.cancel(); diff --git a/rmw_fastrtps_dynamic_cpp/src/rmw_publisher.cpp b/rmw_fastrtps_dynamic_cpp/src/rmw_publisher.cpp index c5b1ca459..c1d5dd38d 100644 --- a/rmw_fastrtps_dynamic_cpp/src/rmw_publisher.cpp +++ b/rmw_fastrtps_dynamic_cpp/src/rmw_publisher.cpp @@ -19,6 +19,8 @@ #include "rmw/get_topic_endpoint_info.h" #include "rmw/rmw.h" +#include "rcpputils/scope_exit.hpp" + #include "rmw/impl/cpp/macros.hpp" #include "rmw_dds_common/qos.hpp" @@ -99,37 +101,32 @@ rmw_create_publisher( if (!publisher) { return nullptr; } - - auto common_context = static_cast(node->context->impl->common); - - auto info = static_cast(publisher->data); - { - // Update graph - std::lock_guard guard(common_context->node_update_mutex); - rmw_dds_common::msg::ParticipantEntitiesInfo msg = - common_context->graph_cache.associate_writer( - info->publisher_gid, common_context->gid, node->name, node->namespace_); - rmw_ret_t rmw_ret = rmw_fastrtps_shared_cpp::__rmw_publish( - eprosima_fastrtps_identifier, - common_context->pub, - static_cast(&msg), - nullptr); - if (RMW_RET_OK != rmw_ret) { + auto cleanup_publisher = rcpputils::make_scope_exit( + [participant_info, publisher]() { rmw_error_state_t error_state = *rmw_get_error_state(); rmw_reset_error(); - static_cast(common_context->graph_cache.dissociate_writer( - info->publisher_gid, common_context->gid, node->name, node->namespace_)); - rmw_ret = rmw_fastrtps_shared_cpp::destroy_publisher( - eprosima_fastrtps_identifier, participant_info, publisher); - if (RMW_RET_OK != rmw_ret) { + if (RMW_RET_OK != rmw_fastrtps_shared_cpp::destroy_publisher( + eprosima_fastrtps_identifier, participant_info, publisher)) + { RMW_SAFE_FWRITE_TO_STDERR(rmw_get_error_string().str); RMW_SAFE_FWRITE_TO_STDERR(" during '" RCUTILS_STRINGIFY(__function__) "' cleanup\n"); rmw_reset_error(); } rmw_set_error_state(error_state.message, error_state.file, error_state.line_number); - return nullptr; - } + }); + + auto common_context = static_cast(node->context->impl->common); + auto info = static_cast(publisher->data); + + // Update graph + if (RMW_RET_OK != common_context->add_publisher_graph( + info->publisher_gid, + node->name, node->namespace_)) + { + return nullptr; } + + cleanup_publisher.cancel(); return publisher; } diff --git a/rmw_fastrtps_dynamic_cpp/src/rmw_service.cpp b/rmw_fastrtps_dynamic_cpp/src/rmw_service.cpp index dacdf01f2..4e1fe8341 100644 --- a/rmw_fastrtps_dynamic_cpp/src/rmw_service.cpp +++ b/rmw_fastrtps_dynamic_cpp/src/rmw_service.cpp @@ -471,40 +471,16 @@ rmw_create_service( } memcpy(const_cast(rmw_service->service_name), service_name, strlen(service_name) + 1); + // Update graph + rmw_gid_t request_subscriber_gid = rmw_fastrtps_shared_cpp::create_rmw_gid( + eprosima_fastrtps_identifier, info->request_reader_->guid()); + rmw_gid_t response_publisher_gid = rmw_fastrtps_shared_cpp::create_rmw_gid( + eprosima_fastrtps_identifier, info->response_writer_->guid()); + if (RMW_RET_OK != common_context->add_service_graph( + request_subscriber_gid, response_publisher_gid, + node->name, node->namespace_)) { - // Update graph - std::lock_guard guard(common_context->node_update_mutex); - rmw_gid_t request_subscriber_gid = rmw_fastrtps_shared_cpp::create_rmw_gid( - eprosima_fastrtps_identifier, info->request_reader_->guid()); - common_context->graph_cache.associate_reader( - request_subscriber_gid, - common_context->gid, - node->name, - node->namespace_); - - rmw_gid_t response_publisher_gid = rmw_fastrtps_shared_cpp::create_rmw_gid( - eprosima_fastrtps_identifier, info->response_writer_->guid()); - rmw_dds_common::msg::ParticipantEntitiesInfo msg = - common_context->graph_cache.associate_writer( - response_publisher_gid, common_context->gid, node->name, node->namespace_); - rmw_ret_t rmw_ret = rmw_fastrtps_shared_cpp::__rmw_publish( - eprosima_fastrtps_identifier, - common_context->pub, - static_cast(&msg), - nullptr); - if (RMW_RET_OK != rmw_ret) { - common_context->graph_cache.dissociate_writer( - response_publisher_gid, - common_context->gid, - node->name, - node->namespace_); - common_context->graph_cache.dissociate_reader( - request_subscriber_gid, - common_context->gid, - node->name, - node->namespace_); - return nullptr; - } + return nullptr; } cleanup_rmw_service.cancel(); diff --git a/rmw_fastrtps_dynamic_cpp/src/rmw_subscription.cpp b/rmw_fastrtps_dynamic_cpp/src/rmw_subscription.cpp index 235a77bd4..e2885ffe6 100644 --- a/rmw_fastrtps_dynamic_cpp/src/rmw_subscription.cpp +++ b/rmw_fastrtps_dynamic_cpp/src/rmw_subscription.cpp @@ -20,6 +20,8 @@ #include "rmw/get_topic_endpoint_info.h" #include "rmw/rmw.h" +#include "rcpputils/scope_exit.hpp" + #include "rmw_dds_common/qos.hpp" #include "rmw_fastrtps_shared_cpp/custom_participant_info.hpp" @@ -96,39 +98,35 @@ rmw_create_subscription( if (!subscription) { return nullptr; } - - auto common_context = static_cast(node->context->impl->common); - auto info = static_cast(subscription->data); - { - // Update graph - std::lock_guard guard(common_context->node_update_mutex); - rmw_dds_common::msg::ParticipantEntitiesInfo msg = - common_context->graph_cache.associate_reader( - info->subscription_gid_, common_context->gid, node->name, node->namespace_); - rmw_ret_t rmw_ret = rmw_fastrtps_shared_cpp::__rmw_publish( - eprosima_fastrtps_identifier, - common_context->pub, - static_cast(&msg), - nullptr); - if (RMW_RET_OK != rmw_ret) { + auto cleanup_subscription = rcpputils::make_scope_exit( + [participant_info, subscription]() { rmw_error_state_t error_state = *rmw_get_error_state(); rmw_reset_error(); - static_cast(common_context->graph_cache.dissociate_reader( - info->subscription_gid_, common_context->gid, node->name, node->namespace_)); - rmw_ret = rmw_fastrtps_shared_cpp::destroy_subscription( - eprosima_fastrtps_identifier, participant_info, subscription); - if (RMW_RET_OK != rmw_ret) { + if (RMW_RET_OK != rmw_fastrtps_shared_cpp::destroy_subscription( + eprosima_fastrtps_identifier, participant_info, subscription)) + { RMW_SAFE_FWRITE_TO_STDERR(rmw_get_error_string().str); RMW_SAFE_FWRITE_TO_STDERR(" during '" RCUTILS_STRINGIFY(__function__) "' cleanup\n"); rmw_reset_error(); } rmw_set_error_state(error_state.message, error_state.file, error_state.line_number); - return nullptr; - } + }); + + auto common_context = static_cast(node->context->impl->common); + auto info = static_cast(subscription->data); + + // Update graph + if (RMW_RET_OK != common_context->add_subscriber_graph( + info->subscription_gid_, + node->name, node->namespace_)) + { + return nullptr; } + info->node_ = node; info->common_context_ = common_context; + cleanup_subscription.cancel(); return subscription; } diff --git a/rmw_fastrtps_shared_cpp/src/rmw_client.cpp b/rmw_fastrtps_shared_cpp/src/rmw_client.cpp index 77dbeccba..1e583abcd 100644 --- a/rmw_fastrtps_shared_cpp/src/rmw_client.cpp +++ b/rmw_fastrtps_shared_cpp/src/rmw_client.cpp @@ -44,27 +44,15 @@ __rmw_destroy_client( static_cast(node->context->impl->participant_info); auto info = static_cast(client->data); - { - // Update graph - std::lock_guard guard(common_context->node_update_mutex); - rmw_gid_t gid = rmw_fastrtps_shared_cpp::create_rmw_gid( - identifier, info->request_writer_->guid()); - common_context->graph_cache.dissociate_writer( - gid, - common_context->gid, - node->name, - node->namespace_); - gid = rmw_fastrtps_shared_cpp::create_rmw_gid( - identifier, info->response_reader_->guid()); - rmw_dds_common::msg::ParticipantEntitiesInfo msg = - common_context->graph_cache.dissociate_reader( - gid, common_context->gid, node->name, node->namespace_); - final_ret = rmw_fastrtps_shared_cpp::__rmw_publish( - identifier, - common_context->pub, - static_cast(&msg), - nullptr); - } + // Update graph + rmw_gid_t request_publisher_gid = rmw_fastrtps_shared_cpp::create_rmw_gid( + identifier, info->request_writer_->guid()); + rmw_gid_t response_subscriber_gid = rmw_fastrtps_shared_cpp::create_rmw_gid( + identifier, info->response_reader_->guid()); + final_ret = common_context->remove_client_graph( + request_publisher_gid, response_subscriber_gid, + node->name, node->namespace_ + ); auto show_previous_error = [&final_ret]() diff --git a/rmw_fastrtps_shared_cpp/src/rmw_node.cpp b/rmw_fastrtps_shared_cpp/src/rmw_node.cpp index e6d56efa3..07579dc87 100644 --- a/rmw_fastrtps_shared_cpp/src/rmw_node.cpp +++ b/rmw_fastrtps_shared_cpp/src/rmw_node.cpp @@ -70,7 +70,6 @@ __rmw_create_node( } auto common_context = static_cast(context->impl->common); - rmw_dds_common::GraphCache & graph_cache = common_context->graph_cache; rmw_node_t * node_handle = rmw_node_allocate(); if (nullptr == node_handle) { RMW_SET_ERROR_MSG("failed to allocate node"); @@ -103,24 +102,13 @@ __rmw_create_node( node_handle->context = context; - { - // Though graph_cache methods are thread safe, both cache update and publishing have to also - // be atomic. - // If not, the following race condition is possible: - // node1-update-get-message / node2-update-get-message / node2-publish / node1-publish - // In that case, the last message published is not accurate. - std::lock_guard guard(common_context->node_update_mutex); - rmw_dds_common::msg::ParticipantEntitiesInfo participant_msg = - graph_cache.add_node(common_context->gid, name, namespace_); - if (RMW_RET_OK != __rmw_publish( - node_handle->implementation_identifier, - common_context->pub, - static_cast(&participant_msg), - nullptr)) - { - return nullptr; - } + rmw_ret_t rmw_ret = common_context->add_node_graph( + name, namespace_ + ); + if (RMW_RET_OK != rmw_ret) { + return nullptr; } + cleanup_node.cancel(); return node_handle; } @@ -133,17 +121,9 @@ __rmw_destroy_node( assert(node->implementation_identifier == identifier); rmw_ret_t ret = RMW_RET_OK; auto common_context = static_cast(node->context->impl->common); - rmw_dds_common::GraphCache & graph_cache = common_context->graph_cache; - { - std::lock_guard guard(common_context->node_update_mutex); - rmw_dds_common::msg::ParticipantEntitiesInfo participant_msg = - graph_cache.remove_node(common_context->gid, node->name, node->namespace_); - ret = __rmw_publish( - identifier, - common_context->pub, - static_cast(&participant_msg), - nullptr); - } + ret = common_context->remove_node_graph( + node->name, node->namespace_ + ); rmw_free(const_cast(node->name)); rmw_free(const_cast(node->namespace_)); rmw_node_free(node); diff --git a/rmw_fastrtps_shared_cpp/src/rmw_publisher.cpp b/rmw_fastrtps_shared_cpp/src/rmw_publisher.cpp index 8ad697aba..f499fcd0d 100644 --- a/rmw_fastrtps_shared_cpp/src/rmw_publisher.cpp +++ b/rmw_fastrtps_shared_cpp/src/rmw_publisher.cpp @@ -49,22 +49,16 @@ __rmw_destroy_publisher( rmw_error_state_t error_state; auto common_context = static_cast(node->context->impl->common); auto info = static_cast(publisher->data); - { - // Update graph - std::lock_guard guard(common_context->node_update_mutex); - rmw_dds_common::msg::ParticipantEntitiesInfo msg = - common_context->graph_cache.dissociate_writer( - info->publisher_gid, common_context->gid, node->name, node->namespace_); - rmw_ret_t publish_ret = rmw_fastrtps_shared_cpp::__rmw_publish( - identifier, - common_context->pub, - &msg, - nullptr); - if (RMW_RET_OK != publish_ret) { - error_state = *rmw_get_error_state(); - ret = publish_ret; - rmw_reset_error(); - } + + // Update graph + rmw_ret_t rmw_ret = common_context->remove_publisher_graph( + info->publisher_gid, + node->name, node->namespace_ + ); + if (RMW_RET_OK != rmw_ret) { + error_state = *rmw_get_error_state(); + ret = rmw_ret; + rmw_reset_error(); } auto participant_info = diff --git a/rmw_fastrtps_shared_cpp/src/rmw_service.cpp b/rmw_fastrtps_shared_cpp/src/rmw_service.cpp index fda506467..d071b5220 100644 --- a/rmw_fastrtps_shared_cpp/src/rmw_service.cpp +++ b/rmw_fastrtps_shared_cpp/src/rmw_service.cpp @@ -56,27 +56,16 @@ __rmw_destroy_service( auto participant_info = static_cast(node->context->impl->participant_info); auto info = static_cast(service->data); - { - // Update graph - std::lock_guard guard(common_context->node_update_mutex); - rmw_gid_t gid = rmw_fastrtps_shared_cpp::create_rmw_gid( - identifier, info->request_reader_->guid()); - common_context->graph_cache.dissociate_reader( - gid, - common_context->gid, - node->name, - node->namespace_); - gid = rmw_fastrtps_shared_cpp::create_rmw_gid( - identifier, info->response_writer_->guid()); - rmw_dds_common::msg::ParticipantEntitiesInfo msg = - common_context->graph_cache.dissociate_writer( - gid, common_context->gid, node->name, node->namespace_); - final_ret = rmw_fastrtps_shared_cpp::__rmw_publish( - identifier, - common_context->pub, - static_cast(&msg), - nullptr); - } + + // Update graph + rmw_gid_t request_subscriber_gid = rmw_fastrtps_shared_cpp::create_rmw_gid( + identifier, info->request_reader_->guid()); + rmw_gid_t response_publisher_gid = rmw_fastrtps_shared_cpp::create_rmw_gid( + identifier, info->response_writer_->guid()); + final_ret = common_context->remove_service_graph( + request_subscriber_gid, response_publisher_gid, + node->name, node->namespace_ + ); auto show_previous_error = [&final_ret]() diff --git a/rmw_fastrtps_shared_cpp/src/rmw_subscription.cpp b/rmw_fastrtps_shared_cpp/src/rmw_subscription.cpp index 63328f073..218b0ff38 100644 --- a/rmw_fastrtps_shared_cpp/src/rmw_subscription.cpp +++ b/rmw_fastrtps_shared_cpp/src/rmw_subscription.cpp @@ -52,22 +52,15 @@ __rmw_destroy_subscription( rmw_error_string_t error_string; auto common_context = static_cast(node->context->impl->common); auto info = static_cast(subscription->data); - { - // Update graph - std::lock_guard guard(common_context->node_update_mutex); - rmw_dds_common::msg::ParticipantEntitiesInfo msg = - common_context->graph_cache.dissociate_reader( - info->subscription_gid_, common_context->gid, node->name, node->namespace_); - ret = rmw_fastrtps_shared_cpp::__rmw_publish( - identifier, - common_context->pub, - static_cast(&msg), - nullptr); - if (RMW_RET_OK != ret) { - error_state = *rmw_get_error_state(); - error_string = rmw_get_error_string(); - rmw_reset_error(); - } + // Update graph + ret = common_context->remove_subscriber_graph( + info->subscription_gid_, + node->name, node->namespace_ + ); + if (RMW_RET_OK != ret) { + error_state = *rmw_get_error_state(); + error_string = rmw_get_error_string(); + rmw_reset_error(); } auto participant_info = @@ -145,10 +138,10 @@ __rmw_subscription_set_content_filter( eprosima::fastdds::dds::DomainParticipant * dds_participant = info->dds_participant_; eprosima::fastdds::dds::TopicDescription * des_topic = nullptr; - const char * eprosima_fastrtps_identifier = subscription->implementation_identifier; + const char * identifier = subscription->implementation_identifier; rmw_ret_t ret = rmw_fastrtps_shared_cpp::__rmw_destroy_subscription( - eprosima_fastrtps_identifier, + identifier, info->node_, subscription, true /* reset_cft */); @@ -204,28 +197,20 @@ __rmw_subscription_set_content_filter( ///// // Update RMW GID info->subscription_gid_ = rmw_fastrtps_shared_cpp::create_rmw_gid( - eprosima_fastrtps_identifier, info->data_reader_->guid()); + identifier, info->data_reader_->guid()); - { - rmw_dds_common::Context * common_context = info->common_context_; - const rmw_node_t * node = info->node_; - - // Update graph - std::lock_guard guard(common_context->node_update_mutex); - rmw_dds_common::msg::ParticipantEntitiesInfo msg = - common_context->graph_cache.associate_reader( - info->subscription_gid_, common_context->gid, node->name, node->namespace_); - rmw_ret_t rmw_ret = rmw_fastrtps_shared_cpp::__rmw_publish( - eprosima_fastrtps_identifier, - common_context->pub, - static_cast(&msg), - nullptr); - if (RMW_RET_OK != rmw_ret) { - static_cast(common_context->graph_cache.dissociate_reader( - info->subscription_gid_, common_context->gid, node->name, node->namespace_)); - return RMW_RET_ERROR; - } + rmw_dds_common::Context * common_context = info->common_context_; + const rmw_node_t * node = info->node_; + + // Update graph + ret = common_context->add_subscriber_graph( + info->subscription_gid_, + node->name, node->namespace_ + ); + if (RMW_RET_OK != ret) { + return RMW_RET_ERROR; } + cleanup_datareader.cancel(); return RMW_RET_OK; }