Skip to content

Commit

Permalink
Merge commit '6a48919727226ebad853373e8a1eec0ee22c3be4'
Browse files Browse the repository at this point in the history
Conflicts:
	rmw_zenoh_cpp/src/detail/rmw_data_types.hpp
	rmw_zenoh_cpp/src/rmw_init.cpp
	rmw_zenoh_cpp/src/rmw_zenoh.cpp
  • Loading branch information
yellowhatter committed Sep 24, 2024
2 parents 2f8b788 + 6a48919 commit 7861708
Show file tree
Hide file tree
Showing 12 changed files with 1,225 additions and 1,302 deletions.
29 changes: 23 additions & 6 deletions rmw_zenoh_cpp/src/detail/attachment_helpers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include <cstdlib>
#include <cstring>
#include <string>
#include <utility>

#include "rmw/types.h"

Expand All @@ -27,6 +28,9 @@
namespace rmw_zenoh_cpp
{

attachement_context_t::attachement_context_t(std::unique_ptr<attachement_data_t> && _data)
: data(std::move(_data)) {}

bool create_attachment_iter(z_owned_bytes_t * kv_pair, void * context)
{
attachement_context_t * ctx = reinterpret_cast<attachement_context_t *>(context);
Expand All @@ -52,19 +56,31 @@ bool create_attachment_iter(z_owned_bytes_t * kv_pair, void * context)
return true;
}

attachement_data_t::attachement_data_t(
const int64_t _sequence_number,
const int64_t _source_timestamp,
const uint8_t _source_gid[RMW_GID_STORAGE_SIZE])
{
sequence_number = _sequence_number;
source_timestamp = _source_timestamp;
memcpy(source_gid, _source_gid, RMW_GID_STORAGE_SIZE);
}

z_result_t attachement_data_t::serialize_to_zbytes(z_owned_bytes_t * attachment)
{
attachement_context_t context = attachement_context_t(this);
attachement_context_t context =
attachement_context_t(std::make_unique<attachement_data_t>(*this));
return z_bytes_from_iter(
attachment, create_attachment_iter,
reinterpret_cast<void *>(&context));
}


bool get_attachment(
const z_loaned_bytes_t * const attachment,
const std::string & key, z_owned_bytes_t * val)
{
if (z_bytes_is_empty(attachment)) {
if (attachment == NULL || z_bytes_is_empty(attachment)) {
return false;
}

Expand All @@ -89,6 +105,8 @@ bool get_attachment(

if (found) {
break;
} else {
z_drop(z_move(*val));
}
}

Expand All @@ -107,7 +125,7 @@ bool get_gid_from_attachment(
const z_loaned_bytes_t * const attachment,
uint8_t gid[RMW_GID_STORAGE_SIZE])
{
if (z_bytes_is_empty(attachment)) {
if (attachment == NULL || z_bytes_is_empty(attachment)) {
return false;
}

Expand Down Expand Up @@ -135,7 +153,7 @@ int64_t get_int64_from_attachment(
const std::string & name)
{
// A valid request must have had an attachment
if (z_bytes_is_empty(attachment)) {
if (attachment == NULL || z_bytes_is_empty(attachment)) {
return -1;
}

Expand All @@ -148,7 +166,7 @@ int64_t get_int64_from_attachment(
}

int64_t num;
if (z_bytes_deserialize_into_int64(z_loan(val), &num)) {
if (z_bytes_deserialize_into_int64(z_loan(val), &num) != Z_OK) {
return -1;
}

Expand All @@ -161,5 +179,4 @@ int64_t get_int64_from_attachment(

return num;
}

} // namespace rmw_zenoh_cpp
21 changes: 7 additions & 14 deletions rmw_zenoh_cpp/src/detail/attachment_helpers.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include <zenoh.h>

#include <string>
#include <memory>

#include "rmw/types.h"

Expand All @@ -30,40 +31,32 @@ class attachement_data_t final
int64_t sequence_number;
int64_t source_timestamp;
uint8_t source_gid[RMW_GID_STORAGE_SIZE];
attachement_data_t(
explicit attachement_data_t(
const int64_t _sequence_number,
const int64_t _source_timestamp,
const uint8_t _source_gid[RMW_GID_STORAGE_SIZE])
{
sequence_number = _sequence_number;
source_timestamp = _source_timestamp;
memcpy(source_gid, _source_gid, RMW_GID_STORAGE_SIZE);
}
const uint8_t _source_gid[RMW_GID_STORAGE_SIZE]);
z_result_t serialize_to_zbytes(z_owned_bytes_t *);
};

class attachement_context_t final
{
public:
const attachement_data_t * data;
std::unique_ptr<attachement_data_t> data;
int length = 3;
int idx = 0;

attachement_context_t(const attachement_data_t * _data)
: data(_data) {}
attachement_context_t(std::unique_ptr<attachement_data_t> && _data);
};

bool get_attachment(
const z_loaned_bytes_t * const attachment,
const std::string & key, z_owned_bytes_t * val);

bool get_gid_from_attachment(
const z_loaned_bytes_t * const attachment,
uint8_t gid[RMW_GID_STORAGE_SIZE]);
const z_loaned_bytes_t * const attachment, uint8_t gid[RMW_GID_STORAGE_SIZE]);

int64_t get_int64_from_attachment(
const z_loaned_bytes_t * const attachment,
const std::string & name);
const z_loaned_bytes_t * const attachment, const std::string & name);
} // namespace rmw_zenoh_cpp

#endif // DETAIL__ATTACHMENT_HELPERS_HPP_
39 changes: 28 additions & 11 deletions rmw_zenoh_cpp/src/detail/graph_cache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -411,8 +411,8 @@ void GraphCache::parse_put(
{
auto sub_cbs_it = querying_subs_cbs_.find(entity->topic_info()->topic_keyexpr_);
if (sub_cbs_it != querying_subs_cbs_.end()) {
for (const auto & cb : sub_cbs_it->second) {
cb(entity->zid());
for (auto sub_it = sub_cbs_it->second.begin(); sub_it != sub_cbs_it->second.end(); ++sub_it) {
sub_it->second(entity->zid());
}
}
}
Expand Down Expand Up @@ -595,8 +595,10 @@ void GraphCache::parse_del(
return entity->zid() == node_it.second->zid_ && entity->nid() == node_it.second->nid_;
});
if (node_it == range.second) {
// Node does not exist.
RMW_ZENOH_LOG_WARN_NAMED(
// Node does not exist or its liveliness token has been unregistered before one of its
// pubs/subs/service liveliness token. This could happen since Zenoh doesn't guarantee
// any order for unregistration events if the remote Node closed abruptly or was disconnected.
RMW_ZENOH_LOG_DEBUG_NAMED(
"rmw_zenoh_cpp",
"Received liveliness token to remove unknown node /%s from the graph. Ignoring...",
entity->node_name().c_str()
Expand All @@ -606,16 +608,17 @@ void GraphCache::parse_del(

if (entity->type() == EntityType::Node) {
// Node
// The liveliness tokens to remove pub/subs should be received before the one to remove a node
// given the reliability QoS for liveliness subs. However, if we find any pubs/subs present in
// the node below, we should update the count in graph_topics_.
// In case the remote Node closed abruptly or was disconnected, Zenoh could deliver the
// liveliness tokens unregistration events in any order.
// If the event for Node unregistration comes before the unregistration of its
// pubs/subs/services, we should update the count in graph_topics_ and graph_services_.
const GraphNodePtr graph_node = node_it->second;
if (!graph_node->pubs_.empty() ||
!graph_node->subs_.empty() ||
!graph_node->clients_.empty() ||
!graph_node->services_.empty())
{
RMW_ZENOH_LOG_WARN_NAMED(
RMW_ZENOH_LOG_DEBUG_NAMED(
"rmw_zenoh_cpp",
"Received liveliness token to remove node /%s from the graph before all pub/subs/"
"clients/services for this node have been removed. Removing all entities first...",
Expand Down Expand Up @@ -1331,15 +1334,29 @@ std::unique_ptr<rmw_zenoh_event_status_t> GraphCache::take_event_status(

///=============================================================================
void GraphCache::set_querying_subscriber_callback(
const std::string & keyexpr,
const rmw_subscription_data_t * sub_data,
QueryingSubscriberCallback cb)
{
const std::string keyexpr = sub_data->entity->topic_info()->topic_keyexpr_;
auto cb_it = querying_subs_cbs_.find(keyexpr);
if (cb_it == querying_subs_cbs_.end()) {
querying_subs_cbs_[keyexpr] = std::move(std::vector<QueryingSubscriberCallback>{});
querying_subs_cbs_[keyexpr] = std::move(
std::unordered_map<const rmw_subscription_data_t *,
QueryingSubscriberCallback>{});
cb_it = querying_subs_cbs_.find(keyexpr);
}
cb_it->second.push_back(std::move(cb));
cb_it->second.insert(std::make_pair(sub_data, std::move(cb)));
}

///=============================================================================
void GraphCache::remove_querying_subscriber_callback(
const rmw_subscription_data_t * sub_data)
{
auto cb_map_it = querying_subs_cbs_.find(sub_data->entity->topic_info()->topic_keyexpr_);
if (cb_map_it == querying_subs_cbs_.end()) {
return;
}
cb_map_it->second.erase(sub_data);
}

} // namespace rmw_zenoh_cpp
13 changes: 11 additions & 2 deletions rmw_zenoh_cpp/src/detail/graph_cache.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,11 @@

namespace rmw_zenoh_cpp
{
// Forward declare to prevent circular dependency.
// TODO(Yadunund): Remove this once we move rmw_subscription_data_t out of
// rmw_data_types.hpp.
class rmw_subscription_data_t;

///=============================================================================
// TODO(Yadunund): Consider changing this to an array of unordered_set where the index of the
// array corresponds to the EntityType enum. This way we don't need to mix
Expand Down Expand Up @@ -184,9 +189,12 @@ class GraphCache final
static bool is_entity_pub(const liveliness::Entity & entity);

void set_querying_subscriber_callback(
const std::string & keyexpr,
const rmw_subscription_data_t * sub_data,
QueryingSubscriberCallback cb);

void remove_querying_subscriber_callback(
const rmw_subscription_data_t * sub_data);

private:
// Helper function to convert an Entity into a GraphNode.
// Note: this will update bookkeeping variables in GraphCache.
Expand Down Expand Up @@ -286,7 +294,8 @@ class GraphCache final
// EventCallbackMap for each type of event we support in rmw_zenoh_cpp.
GraphEventCallbackMap event_callbacks_;
// Map keyexpressions to QueryingSubscriberCallback.
std::unordered_map<std::string, std::vector<QueryingSubscriberCallback>> querying_subs_cbs_;
std::unordered_map<std::string, std::unordered_map<const rmw_subscription_data_t *,
QueryingSubscriberCallback>> querying_subs_cbs_;
// Counters to track changes to event statues for each topic.
std::unordered_map<std::string,
std::array<rmw_zenoh_event_status_t, ZENOH_EVENT_ID_MAX + 1>> event_statuses_;
Expand Down
1 change: 1 addition & 0 deletions rmw_zenoh_cpp/src/detail/liveliness_utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
// limitations under the License.

#include "liveliness_utils.hpp"

#include <zenoh.h>

#include <functional>
Expand Down
Loading

0 comments on commit 7861708

Please sign in to comment.