Skip to content

Commit

Permalink
Resolve merge conflicts
Browse files Browse the repository at this point in the history
Signed-off-by: Yadunund <[email protected]>
  • Loading branch information
Yadunund committed Sep 27, 2024
2 parents 5be2532 + 8e883bc commit 344b797
Show file tree
Hide file tree
Showing 6 changed files with 94 additions and 34 deletions.
4 changes: 2 additions & 2 deletions rmw_zenoh_cpp/src/detail/event.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ struct rmw_zenoh_event_status_t
size_t total_count;
size_t total_count_change;
size_t current_count;
size_t current_count_change;
int32_t current_count_change;
// The data field can be used to store serialized information for more complex statuses.
std::string data;

Expand Down Expand Up @@ -97,7 +97,7 @@ class DataCallbackManager
size_t unread_count_ {0};
};

/// Base class to be inherited by entities that support events.
/// A class to manage QoS related events.
class EventsManager
{
public:
Expand Down
79 changes: 53 additions & 26 deletions rmw_zenoh_cpp/src/detail/graph_cache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -214,17 +214,20 @@ void GraphCache::handle_matched_events_for_put(
for (const auto & [_, topic_data_ptr] : topic_qos_map) {
if (is_pub) {
// Count the number of matching subs for each set of qos settings.
if (!topic_data_ptr->subs_.empty()) {
match_count_for_entity += topic_data_ptr->subs_.size();
}
match_count_for_entity += topic_data_ptr->subs_.size();
// Also iterate through the subs to check if any are local and if update event counters.
for (liveliness::ConstEntityPtr sub_entity : topic_data_ptr->subs_) {
update_event_counters(
topic_info.name_,
ZENOH_EVENT_SUBSCRIPTION_MATCHED,
static_cast<int32_t>(1));
if (is_entity_local(*sub_entity)) {
local_entities_with_events[sub_entity].insert(ZENOH_EVENT_SUBSCRIPTION_MATCHED);
// Update counters only if key expressions match.
if (entity->topic_info()->topic_keyexpr_ ==
sub_entity->topic_info().value().topic_keyexpr_)
{
update_event_counters(
topic_info.name_,
ZENOH_EVENT_SUBSCRIPTION_MATCHED,
static_cast<int32_t>(1));
if (is_entity_local(*sub_entity)) {
local_entities_with_events[sub_entity].insert(ZENOH_EVENT_SUBSCRIPTION_MATCHED);
}
}
}
// Update event counters for the new entity->
Expand All @@ -238,17 +241,20 @@ void GraphCache::handle_matched_events_for_put(
} else {
// Entity is a sub.
// Count the number of matching pubs for each set of qos settings.
if (!topic_data_ptr->pubs_.empty()) {
match_count_for_entity += topic_data_ptr->pubs_.size();
}
match_count_for_entity += topic_data_ptr->pubs_.size();
// Also iterate through the pubs to check if any are local and if update event counters.
for (liveliness::ConstEntityPtr pub_entity : topic_data_ptr->pubs_) {
update_event_counters(
topic_info.name_,
ZENOH_EVENT_PUBLICATION_MATCHED,
static_cast<int32_t>(1));
if (is_entity_local(*pub_entity)) {
local_entities_with_events[pub_entity].insert(ZENOH_EVENT_PUBLICATION_MATCHED);
// Update counters only if key expressions match.
if (entity->topic_info()->topic_keyexpr_ ==
pub_entity->topic_info().value().topic_keyexpr_)
{
update_event_counters(
topic_info.name_,
ZENOH_EVENT_PUBLICATION_MATCHED,
static_cast<int32_t>(1));
if (is_entity_local(*pub_entity)) {
local_entities_with_events[pub_entity].insert(ZENOH_EVENT_PUBLICATION_MATCHED);
}
}
}
// Update event counters for the new entity->
Expand Down Expand Up @@ -308,7 +314,7 @@ void GraphCache::handle_matched_events_for_del(
}

///=============================================================================
void GraphCache::take_entities_with_events(EntityEventMap & entities_with_events)
void GraphCache::take_entities_with_events(const EntityEventMap & entities_with_events)
{
for (const auto & [local_entity, event_set] : entities_with_events) {
// Trigger callback set for this entity for the event type.
Expand Down Expand Up @@ -412,8 +418,8 @@ void GraphCache::parse_put(
{
auto sub_cbs_it = querying_subs_cbs_.find(entity->topic_info()->topic_keyexpr_);
if (sub_cbs_it != querying_subs_cbs_.end()) {
for (const auto & cb : sub_cbs_it->second) {
cb(entity->zid());
for (auto sub_it = sub_cbs_it->second.begin(); sub_it != sub_cbs_it->second.end(); ++sub_it) {
sub_it->second(entity->zid());
}
}
}
Expand Down Expand Up @@ -1260,6 +1266,13 @@ void GraphCache::set_qos_event_callback(
event_cb_it->second[event_type] = std::move(callback);
}

///=============================================================================
void GraphCache::remove_qos_event_callbacks(liveliness::ConstEntityPtr entity)
{
std::lock_guard<std::mutex> lock(graph_mutex_);
event_callbacks_.erase(entity);
}

///=============================================================================
bool GraphCache::is_entity_local(const liveliness::Entity & entity) const
{
Expand Down Expand Up @@ -1300,8 +1313,8 @@ void GraphCache::update_event_counters(
}

rmw_zenoh_event_status_t & status_to_update = event_statuses_[topic_name][event_id];
status_to_update.total_count += std::abs(change);
status_to_update.total_count_change += std::abs(change);
status_to_update.total_count += std::max(0, change);
status_to_update.total_count_change += std::max(0, change);
status_to_update.current_count += change;
status_to_update.current_count_change = change;
}
Expand Down Expand Up @@ -1332,15 +1345,29 @@ std::unique_ptr<rmw_zenoh_event_status_t> GraphCache::take_event_status(

///=============================================================================
void GraphCache::set_querying_subscriber_callback(
const std::string & keyexpr,
const rmw_subscription_data_t * sub_data,
QueryingSubscriberCallback cb)
{
const std::string keyexpr = sub_data->entity->topic_info()->topic_keyexpr_;
auto cb_it = querying_subs_cbs_.find(keyexpr);
if (cb_it == querying_subs_cbs_.end()) {
querying_subs_cbs_[keyexpr] = std::move(std::vector<QueryingSubscriberCallback>{});
querying_subs_cbs_[keyexpr] = std::move(
std::unordered_map<const rmw_subscription_data_t *,
QueryingSubscriberCallback>{});
cb_it = querying_subs_cbs_.find(keyexpr);
}
cb_it->second.push_back(std::move(cb));
cb_it->second.insert(std::make_pair(sub_data, std::move(cb)));
}

///=============================================================================
void GraphCache::remove_querying_subscriber_callback(
const rmw_subscription_data_t * sub_data)
{
auto cb_map_it = querying_subs_cbs_.find(sub_data->entity->topic_info()->topic_keyexpr_);
if (cb_map_it == querying_subs_cbs_.end()) {
return;
}
cb_map_it->second.erase(sub_data);
}

} // namespace rmw_zenoh_cpp
18 changes: 15 additions & 3 deletions rmw_zenoh_cpp/src/detail/graph_cache.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,11 @@

namespace rmw_zenoh_cpp
{
// Forward declare to prevent circular dependency.
// TODO(Yadunund): Remove this once we move rmw_subscription_data_t out of
// rmw_data_types.hpp.
class rmw_subscription_data_t;

///=============================================================================
// TODO(Yadunund): Consider changing this to an array of unordered_set where the index of the
// array corresponds to the EntityType enum. This way we don't need to mix
Expand Down Expand Up @@ -182,13 +187,19 @@ class GraphCache final
const rmw_zenoh_event_type_t & event_type,
GraphCacheEventCallback callback);

/// Remove all qos event callbacks for an entity.
void remove_qos_event_callbacks(liveliness::ConstEntityPtr entity);

/// Returns true if the entity is a publisher or client. False otherwise.
static bool is_entity_pub(const liveliness::Entity & entity);

void set_querying_subscriber_callback(
const std::string & keyexpr,
const rmw_subscription_data_t * sub_data,
QueryingSubscriberCallback cb);

void remove_querying_subscriber_callback(
const rmw_subscription_data_t * sub_data);

private:
// Helper function to convert an Entity into a GraphNode.
// Note: this will update bookkeeping variables in GraphCache.
Expand Down Expand Up @@ -240,7 +251,7 @@ class GraphCache final

using EntityEventMap =
std::unordered_map<liveliness::ConstEntityPtr, std::unordered_set<rmw_zenoh_event_type_t>>;
void take_entities_with_events(EntityEventMap & entities_with_events);
void take_entities_with_events(const EntityEventMap & entities_with_events);

std::string zid_str_;
/*
Expand Down Expand Up @@ -288,7 +299,8 @@ class GraphCache final
// EventCallbackMap for each type of event we support in rmw_zenoh_cpp.
GraphEventCallbackMap event_callbacks_;
// Map keyexpressions to QueryingSubscriberCallback.
std::unordered_map<std::string, std::vector<QueryingSubscriberCallback>> querying_subs_cbs_;
std::unordered_map<std::string, std::unordered_map<const rmw_subscription_data_t *,
QueryingSubscriberCallback>> querying_subs_cbs_;
// Counters to track changes to event statues for each topic.
std::unordered_map<std::string,
std::array<rmw_zenoh_event_status_t, ZENOH_EVENT_ID_MAX + 1>> event_statuses_;
Expand Down
2 changes: 1 addition & 1 deletion rmw_zenoh_cpp/src/detail/rmw_context_impl_s.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,7 @@ rmw_context_impl_s::rmw_context_impl_s(
if ((ret = rmw_zenoh_cpp::zenoh_router_check(z_loan(session))) != RMW_RET_OK) {
++connection_attempts;
}
std::this_thread::sleep_for(std::chrono::seconds(1));
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
if (ret != RMW_RET_OK) {
throw std::runtime_error(
Expand Down
2 changes: 1 addition & 1 deletion rmw_zenoh_cpp/src/rmw_event.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ rmw_event_set_callback(
return RMW_RET_ERROR;
}

// Both rmw_subscription_data_t and rmw_publisher_data_t inherit EventsBase.
// Both rmw_subscription_data_t and rmw_publisher_data_t store an EventsManager object.
rmw_zenoh_cpp::EventsManager * event_data =
static_cast<rmw_zenoh_cpp::EventsManager *>(rmw_event->data);
RMW_CHECK_ARGUMENT_FOR_NULL(event_data, RMW_RET_INVALID_ARGUMENT);
Expand Down
23 changes: 22 additions & 1 deletion rmw_zenoh_cpp/src/rmw_zenoh.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -695,6 +695,10 @@ rmw_ret_t
rmw_destroy_publisher(rmw_node_t * node, rmw_publisher_t * publisher)
{
RMW_CHECK_ARGUMENT_FOR_NULL(node, RMW_RET_INVALID_ARGUMENT);
RMW_CHECK_ARGUMENT_FOR_NULL(node->context, RMW_RET_INVALID_ARGUMENT);
RMW_CHECK_ARGUMENT_FOR_NULL(node->context->impl, RMW_RET_INVALID_ARGUMENT);
rmw_context_impl_s * context_impl = static_cast<rmw_context_impl_s *>(node->context->impl);
RMW_CHECK_ARGUMENT_FOR_NULL(context_impl, RMW_RET_INVALID_ARGUMENT);
RMW_CHECK_ARGUMENT_FOR_NULL(publisher, RMW_RET_INVALID_ARGUMENT);
RMW_CHECK_ARGUMENT_FOR_NULL(publisher->data, RMW_RET_INVALID_ARGUMENT);
RMW_CHECK_TYPE_IDENTIFIERS_MATCH(
Expand Down Expand Up @@ -724,6 +728,10 @@ rmw_destroy_publisher(rmw_node_t * node, rmw_publisher_t * publisher)
RMW_SET_ERROR_MSG("failed to undeclare pub");
ret = RMW_RET_ERROR;
}

// Remove any event callbacks registered to this publisher.
context_impl->graph_cache()->remove_qos_event_callbacks(publisher_data->entity);

RMW_TRY_DESTRUCTOR(publisher_data->~rmw_publisher_data_t(), rmw_publisher_data_t, );
allocator->deallocate(publisher_data, allocator->state);
}
Expand Down Expand Up @@ -1486,7 +1494,7 @@ rmw_create_subscription(
// Register the querying subscriber with the graph cache to get latest
// messages from publishers that were discovered after their first publication.
context_impl->graph_cache()->set_querying_subscriber_callback(
sub_data->entity->topic_info()->topic_keyexpr_,
sub_data,
[sub_data](const std::string & queryable_prefix) -> void
{
if (sub_data == nullptr) {
Expand Down Expand Up @@ -1582,6 +1590,10 @@ rmw_ret_t
rmw_destroy_subscription(rmw_node_t * node, rmw_subscription_t * subscription)
{
RMW_CHECK_ARGUMENT_FOR_NULL(node, RMW_RET_INVALID_ARGUMENT);
RMW_CHECK_ARGUMENT_FOR_NULL(node->context, RMW_RET_INVALID_ARGUMENT);
RMW_CHECK_ARGUMENT_FOR_NULL(node->context->impl, RMW_RET_INVALID_ARGUMENT);
rmw_context_impl_s * context_impl = static_cast<rmw_context_impl_s *>(node->context->impl);
RMW_CHECK_ARGUMENT_FOR_NULL(context_impl, RMW_RET_INVALID_ARGUMENT);
RMW_CHECK_ARGUMENT_FOR_NULL(subscription, RMW_RET_INVALID_ARGUMENT);
RMW_CHECK_TYPE_IDENTIFIERS_MATCH(
node,
Expand Down Expand Up @@ -1619,8 +1631,13 @@ rmw_destroy_subscription(rmw_node_t * node, rmw_subscription_t * subscription)
RMW_SET_ERROR_MSG("failed to undeclare sub");
ret = RMW_RET_ERROR;
}
// Also remove the registered callback from the GraphCache.
context_impl->graph_cache()->remove_querying_subscriber_callback(sub_data);
}

// Remove any event callbacks registered to this subscription.
context_impl->graph_cache()->remove_qos_event_callbacks(sub_data->entity);

RMW_TRY_DESTRUCTOR(sub_data->~rmw_subscription_data_t(), rmw_subscription_data_t, );
allocator->deallocate(sub_data, allocator->state);
}
Expand Down Expand Up @@ -1686,6 +1703,8 @@ rmw_subscription_set_content_filter(
rmw_subscription_t * subscription,
const rmw_subscription_content_filter_options_t * options)
{
// Re-enable test in rclcpp when this feature is implemented
// https://github.com/ros2/rclcpp/pull/2627
static_cast<void>(subscription);
static_cast<void>(options);
return RMW_RET_UNSUPPORTED;
Expand All @@ -1699,6 +1718,8 @@ rmw_subscription_get_content_filter(
rcutils_allocator_t * allocator,
rmw_subscription_content_filter_options_t * options)
{
// Re-enable test in rclcpp when this feature is implemented
// https://github.com/ros2/rclcpp/pull/2627
static_cast<void>(subscription);
static_cast<void>(allocator);
static_cast<void>(options);
Expand Down

0 comments on commit 344b797

Please sign in to comment.