Skip to content

Commit

Permalink
liveliness_declare_subscriber
Browse files Browse the repository at this point in the history
Signed-off-by: Alejandro Hernández Cordero <[email protected]>
  • Loading branch information
ahcorde committed Nov 19, 2024
1 parent 0635799 commit dc2476e
Showing 1 changed file with 44 additions and 65 deletions.
109 changes: 44 additions & 65 deletions rmw_zenoh_cpp/src/detail/rmw_context_impl_s.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,6 @@ static std::mutex data_to_data_shared_ptr_map_mutex;
static std::unordered_map<rmw_context_impl_s::Data *,
std::shared_ptr<rmw_context_impl_s::Data>> data_to_data_shared_ptr_map;

static void graph_sub_data_handler(z_loaned_sample_t * sample, void * data);

// Bundle all class members into a data struct which can be passed as a
// weak ptr to various threads for thread-safe access without capturing
// "this" ptr by reference.
Expand Down Expand Up @@ -213,26 +211,35 @@ class rmw_context_impl_s::Data final

// Setup the liveliness subscriber to receives updates from the ROS graph
// and update the graph cache.
zc_liveliness_subscriber_options_t sub_options;
zc_liveliness_subscriber_options_default(&sub_options);
z_owned_closure_sample_t callback;
z_closure(&callback, graph_sub_data_handler, nullptr, this);
z_view_keyexpr_t liveliness_ke;
z_view_keyexpr_from_str(&liveliness_ke, liveliness_str.c_str());
auto undeclare_z_sub = rcpputils::make_scope_exit(
[this]() {
z_undeclare_subscriber(z_move(this->graph_subscriber_));
});
if (zc_liveliness_declare_subscriber(
z_loan(session_->_0),
&graph_subscriber_, z_loan(liveliness_ke),
z_move(callback), &sub_options) != Z_OK)
{
RMW_SET_ERROR_MSG("unable to create zenoh subscription");
throw std::runtime_error("Unable to subscribe to ROS graph updates.");
}
zenoh::KeyExpr keyexpr_cpp(liveliness_str.c_str());
zenoh::Session::LivelinessSubscriberOptions sub_options =
zenoh::Session::LivelinessSubscriberOptions::create_default();
sub_options.history = true;
graph_subscriber_cpp_ = session_->liveliness_declare_subscriber(
keyexpr_cpp,
[&](const zenoh::Sample& sample) {

// // Look up the data shared_ptr in the global map. If it is in there, use it.
// If not, it is being shutdown so we can just ignore this update.
std::shared_ptr<rmw_context_impl_s::Data> data_shared_ptr{nullptr};
{
std::lock_guard<std::mutex> lk(data_to_data_shared_ptr_map_mutex);
if (data_to_data_shared_ptr_map.count(this) == 0) {
return;
}
data_shared_ptr = data_to_data_shared_ptr_map[this];
}

// Update the graph cache.
data_shared_ptr->update_graph_cache(sample, std::string(sample.get_keyexpr().as_string_view()));
},
zenoh::closures::none,
std::move(sub_options),
&err);

undeclare_z_sub.cancel();
if (err != Z_OK) {
throw std::runtime_error("unable to create zenoh subscription. ");
}
}

// Shutdown the Zenoh session.
Expand All @@ -258,7 +265,16 @@ class rmw_context_impl_s::Data final
}
}

z_undeclare_subscriber(z_move(graph_subscriber_));
zenoh::ZResult err;
std::move(graph_subscriber_cpp_).value().undeclare(&err);
if (err != Z_OK)
{
RMW_ZENOH_LOG_ERROR_NAMED(
"rmw_zenoh_cpp",
"Unable to undeclare liveliness token");
return RMW_RET_ERROR;
}

if (shm_provider_.has_value()) {
z_drop(z_move(shm_provider_.value()));
}
Expand All @@ -267,12 +283,6 @@ class rmw_context_impl_s::Data final
// We specifically do *not* hold the mutex_ while tearing down the session; this allows us
// to avoid an AB/BA deadlock if shutdown is racing with graph_sub_data_handler().
}

// // Close the zenoh session
// if (z_close(z_loan_mut(session_), NULL) != Z_OK) {
// RMW_SET_ERROR_MSG("Error while closing zenoh session");
// return RMW_RET_ERROR;
// }
session_.reset();
return RMW_RET_OK;
}
Expand Down Expand Up @@ -387,17 +397,17 @@ class rmw_context_impl_s::Data final
nodes_.erase(node);
}

void update_graph_cache(z_sample_kind_t sample_kind, const std::string & keystr)
void update_graph_cache(const zenoh::Sample& sample_kind, const std::string & keystr)
{
std::lock_guard<std::recursive_mutex> lock(mutex_);
if (is_shutdown_) {
return;
}
switch (sample_kind) {
case z_sample_kind_t::Z_SAMPLE_KIND_PUT:
switch (sample_kind.get_kind()) {
case zenoh::SampleKind::Z_SAMPLE_KIND_PUT:
graph_cache_->parse_put(keystr);
break;
case z_sample_kind_t::Z_SAMPLE_KIND_DELETE:
case zenoh::SampleKind::Z_SAMPLE_KIND_DELETE:
graph_cache_->parse_del(keystr);
break;
default:
Expand All @@ -409,7 +419,7 @@ class rmw_context_impl_s::Data final
if (RMW_RET_OK != rmw_ret) {
RMW_ZENOH_LOG_WARN_NAMED(
"rmw_zenoh_cpp",
"[graph_sub_data_handler] Unable to trigger graph guard condition."
"[update_graph_cache] Unable to trigger graph guard condition."
);
}
}
Expand Down Expand Up @@ -437,7 +447,7 @@ class rmw_context_impl_s::Data final
// Graph cache.
std::shared_ptr<rmw_zenoh_cpp::GraphCache> graph_cache_;
// ROS graph liveliness subscriber.
z_owned_subscriber_t graph_subscriber_;
std::optional<zenoh::Subscriber<void>> graph_subscriber_cpp_;
// Equivalent to rmw_dds_common::Context's guard condition.
// Guard condition that should be triggered when the graph changes.
std::unique_ptr<rmw_guard_condition_t> graph_guard_condition_;
Expand All @@ -451,37 +461,6 @@ class rmw_context_impl_s::Data final
std::unordered_map<const rmw_node_t *, std::shared_ptr<rmw_zenoh_cpp::NodeData>> nodes_;
};

///=============================================================================
static void graph_sub_data_handler(z_loaned_sample_t * sample, void * data)
{
z_view_string_t keystr;
z_keyexpr_as_view_string(z_sample_keyexpr(sample), &keystr);

auto data_ptr = static_cast<rmw_context_impl_s::Data *>(data);
if (data_ptr == nullptr) {
RMW_ZENOH_LOG_ERROR_NAMED(
"rmw_zenoh_cpp",
"[graph_sub_data_handler] Invalid data_ptr."
);
return;
}

// Look up the data shared_ptr in the global map. If it is in there, use it.
// If not, it is being shutdown so we can just ignore this update.
std::shared_ptr<rmw_context_impl_s::Data> data_shared_ptr{nullptr};
{
std::lock_guard<std::mutex> lk(data_to_data_shared_ptr_map_mutex);
if (data_to_data_shared_ptr_map.count(data_ptr) == 0) {
return;
}
data_shared_ptr = data_to_data_shared_ptr_map[data_ptr];
}

// Update the graph cache.
std::string str(z_string_data(z_loan(keystr)), z_string_len(z_loan(keystr)));
data_shared_ptr->update_graph_cache(z_sample_kind(sample), str);
}

///=============================================================================
rmw_context_impl_s::rmw_context_impl_s(
const std::size_t domain_id,
Expand Down

0 comments on commit dc2476e

Please sign in to comment.