Skip to content

Commit

Permalink
fix(ExecutorNotifyWaitable): Keep strong references to used guard con…
Browse files Browse the repository at this point in the history
…ditions

A waitable should make sure, that all entities it adds to the waitset
are alive during the wait. The ExecutorNotifyWaitable did not do this,
leading to crashes, if a callback group was dropped while waiting.
This commit changes this behavior, by holding shared pointers to the
used guard condition.
Also while at it, fixed a possible race, were a trigger could get lost.
Optimized the is_ready call by using the indices returned by rcl.

Signed-off-by: Janosch Machowinski <[email protected]>
  • Loading branch information
Janosch Machowinski authored and Janosch Machowinski committed Jan 17, 2025
1 parent 7e29c68 commit 46bb085
Show file tree
Hide file tree
Showing 3 changed files with 83 additions and 54 deletions.
18 changes: 15 additions & 3 deletions rclcpp/include/rclcpp/executors/executor_notify_waitable.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include <memory>
#include <mutex>
#include <set>
#include <vector>

#include "rclcpp/guard_condition.hpp"
#include "rclcpp/waitable.hpp"
Expand All @@ -41,7 +42,9 @@ class ExecutorNotifyWaitable : public rclcpp::Waitable
* of this waitable has signaled the wait_set.
*/
RCLCPP_PUBLIC
explicit ExecutorNotifyWaitable(std::function<void(void)> on_execute_callback = {});
explicit ExecutorNotifyWaitable(
std::function<void(void)> on_execute_callback = {}, const rclcpp::Context::SharedPtr & context =
rclcpp::contexts::get_global_default_context());

// Destructor
RCLCPP_PUBLIC
Expand Down Expand Up @@ -158,8 +161,17 @@ class ExecutorNotifyWaitable : public rclcpp::Waitable
std::function<void(size_t)> on_ready_callback_;

/// The collection of guard conditions to be waited on.
std::set<rclcpp::GuardCondition::WeakPtr,
std::owner_less<rclcpp::GuardCondition::WeakPtr>> notify_guard_conditions_;
std::set<rclcpp::GuardCondition::SharedPtr> notify_guard_conditions_;

/// The indixes were our guard conditions were stored in the
/// rcl waitset
std::vector<size_t> idxs_of_added_guard_condition_;

/// set to true, if we got a pending trigger
bool needs_processing = false;

/// A guard condition needed to generate wakeups
rclcpp::GuardCondition::SharedPtr guard_condition_;
};

} // namespace executors
Expand Down
19 changes: 1 addition & 18 deletions rclcpp/src/rclcpp/executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ Executor::Executor(const rclcpp::ExecutorOptions & options)
notify_waitable_(std::make_shared<rclcpp::executors::ExecutorNotifyWaitable>(
[this]() {
this->entities_need_rebuild_.store(true);
})),
}, options.context)),
entities_need_rebuild_(true),
collector_(notify_waitable_),
wait_set_({}, {}, {}, {}, {}, {}, options.context),
Expand Down Expand Up @@ -732,33 +732,16 @@ Executor::wait_for_work(std::chrono::nanoseconds timeout)
// Clear any previous wait result
this->wait_result_.reset();

// we need to make sure that callback groups don't get out of scope
// during the wait. As in jazzy, they are not covered by the DynamicStorage,
// we explicitly hold them here as a bugfix
std::vector<rclcpp::CallbackGroup::SharedPtr> cbgs;

{
std::lock_guard<std::mutex> guard(mutex_);

if (this->entities_need_rebuild_.exchange(false) || current_collection_.empty()) {
this->collect_entities();
}

auto callback_groups = this->collector_.get_all_callback_groups();
cbgs.resize(callback_groups.size());
for(const auto & w_ptr : callback_groups) {
auto shr_ptr = w_ptr.lock();
if(shr_ptr) {
cbgs.push_back(std::move(shr_ptr));
}
}
}

this->wait_result_.emplace(wait_set_.wait(timeout));

// drop references to the callback groups, before trying to execute anything
cbgs.clear();

if (!this->wait_result_ || this->wait_result_->kind() == WaitResultKind::Empty) {
RCUTILS_LOG_WARN_NAMED(
"rclcpp",
Expand Down
100 changes: 67 additions & 33 deletions rclcpp/src/rclcpp/executors/executor_notify_waitable.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,11 @@ namespace rclcpp
namespace executors
{

ExecutorNotifyWaitable::ExecutorNotifyWaitable(std::function<void(void)> on_execute_callback)
: execute_callback_(on_execute_callback)
ExecutorNotifyWaitable::ExecutorNotifyWaitable(
std::function<void(void)> on_execute_callback,
const rclcpp::Context::SharedPtr & context)
: execute_callback_(on_execute_callback),
guard_condition_(std::make_shared<rclcpp::GuardCondition>(context))
{
}

Expand All @@ -30,6 +33,9 @@ ExecutorNotifyWaitable::ExecutorNotifyWaitable(ExecutorNotifyWaitable & other)
std::lock_guard<std::mutex> lock(other.guard_condition_mutex_);
this->execute_callback_ = other.execute_callback_;
this->notify_guard_conditions_ = other.notify_guard_conditions_;
this->guard_condition_ = other.guard_condition_;
this->idxs_of_added_guard_condition_ = other.idxs_of_added_guard_condition_;
this->needs_processing = other.needs_processing;
}

ExecutorNotifyWaitable & ExecutorNotifyWaitable::operator=(ExecutorNotifyWaitable & other)
Expand All @@ -38,6 +44,9 @@ ExecutorNotifyWaitable & ExecutorNotifyWaitable::operator=(ExecutorNotifyWaitabl
std::lock_guard<std::mutex> lock(other.guard_condition_mutex_);
this->execute_callback_ = other.execute_callback_;
this->notify_guard_conditions_ = other.notify_guard_conditions_;
this->guard_condition_ = other.guard_condition_;
this->idxs_of_added_guard_condition_ = other.idxs_of_added_guard_condition_;
this->needs_processing = other.needs_processing;
}
return *this;
}
Expand All @@ -47,21 +56,42 @@ ExecutorNotifyWaitable::add_to_wait_set(rcl_wait_set_t & wait_set)
{
std::lock_guard<std::mutex> lock(guard_condition_mutex_);

idxs_of_added_guard_condition_.clear();
idxs_of_added_guard_condition_.reserve(notify_guard_conditions_.size());

if(needs_processing) {
rcl_guard_condition_t * cond = &guard_condition_->get_rcl_guard_condition();
size_t rcl_index;
rcl_ret_t ret = rcl_wait_set_add_guard_condition(&wait_set, cond, &rcl_index);

if (RCL_RET_OK != ret) {
rclcpp::exceptions::throw_from_rcl_error(
ret, "failed to add guard condition to wait set");
}

idxs_of_added_guard_condition_.push_back(rcl_index);

// we want to directly wake up any way, not need to add the other guard conditions
guard_condition_->trigger();

return;
}

// Note: no guard conditions need to be re-triggered, since the guard
// conditions in this class are not tracking a stateful condition, but instead
// only serve to interrupt the wait set when new information is available to
// consider.
for (auto weak_guard_condition : this->notify_guard_conditions_) {
auto guard_condition = weak_guard_condition.lock();
if (!guard_condition) {continue;}

for (const auto & guard_condition : this->notify_guard_conditions_) {
rcl_guard_condition_t * cond = &guard_condition->get_rcl_guard_condition();
rcl_ret_t ret = rcl_wait_set_add_guard_condition(&wait_set, cond, NULL);
size_t rcl_index;
rcl_ret_t ret = rcl_wait_set_add_guard_condition(&wait_set, cond, &rcl_index);

if (RCL_RET_OK != ret) {
rclcpp::exceptions::throw_from_rcl_error(
ret, "failed to add guard condition to wait set");
}

idxs_of_added_guard_condition_.push_back(rcl_index);
}
}

Expand All @@ -71,27 +101,33 @@ ExecutorNotifyWaitable::is_ready(const rcl_wait_set_t & wait_set)
std::lock_guard<std::mutex> lock(guard_condition_mutex_);

bool any_ready = false;
for (size_t ii = 0; ii < wait_set.size_of_guard_conditions; ++ii) {
const auto * rcl_guard_condition = wait_set.guard_conditions[ii];
for (size_t rcl_index : idxs_of_added_guard_condition_) {
if(rcl_index >= wait_set.size_of_guard_conditions) {
throw std::runtime_error(
"ExecutorNotifyWaitable::is_ready: Internal error, got index out of range");
}

const auto * rcl_guard_condition = wait_set.guard_conditions[rcl_index];

if (nullptr == rcl_guard_condition) {
continue;
}
for (const auto & weak_guard_condition : this->notify_guard_conditions_) {
auto guard_condition = weak_guard_condition.lock();
if (guard_condition && &guard_condition->get_rcl_guard_condition() == rcl_guard_condition) {
any_ready = true;
break;
}
}

any_ready = true;
needs_processing = true;
break;
}

return any_ready;
}

void
ExecutorNotifyWaitable::execute(const std::shared_ptr<void> & /*data*/)
{
std::lock_guard<std::mutex> lock(execute_mutex_);

needs_processing = false;

this->execute_callback_();
}

Expand Down Expand Up @@ -122,11 +158,7 @@ ExecutorNotifyWaitable::set_on_ready_callback(std::function<void(size_t, int)> c
std::lock_guard<std::mutex> lock(guard_condition_mutex_);

on_ready_callback_ = gc_callback;
for (auto weak_gc : notify_guard_conditions_) {
auto gc = weak_gc.lock();
if (!gc) {
continue;
}
for (const auto & gc : notify_guard_conditions_) {
gc->set_on_trigger_callback(on_ready_callback_);
}
}
Expand All @@ -138,11 +170,7 @@ ExecutorNotifyWaitable::clear_on_ready_callback()
std::lock_guard<std::mutex> lock(guard_condition_mutex_);

on_ready_callback_ = nullptr;
for (auto weak_gc : notify_guard_conditions_) {
auto gc = weak_gc.lock();
if (!gc) {
continue;
}
for (const auto & gc : notify_guard_conditions_) {
gc->set_on_trigger_callback(nullptr);
}
}
Expand All @@ -159,9 +187,9 @@ void
ExecutorNotifyWaitable::add_guard_condition(rclcpp::GuardCondition::WeakPtr weak_guard_condition)
{
std::lock_guard<std::mutex> lock(guard_condition_mutex_);
auto guard_condition = weak_guard_condition.lock();
if (guard_condition && notify_guard_conditions_.count(weak_guard_condition) == 0) {
notify_guard_conditions_.insert(weak_guard_condition);
const auto & guard_condition = weak_guard_condition.lock();
if (guard_condition && notify_guard_conditions_.count(guard_condition) == 0) {
notify_guard_conditions_.insert(guard_condition);
if (on_ready_callback_) {
guard_condition->set_on_trigger_callback(on_ready_callback_);
}
Expand All @@ -172,11 +200,17 @@ void
ExecutorNotifyWaitable::remove_guard_condition(rclcpp::GuardCondition::WeakPtr weak_guard_condition)
{
std::lock_guard<std::mutex> lock(guard_condition_mutex_);
if (notify_guard_conditions_.count(weak_guard_condition) != 0) {
notify_guard_conditions_.erase(weak_guard_condition);
auto guard_condition = weak_guard_condition.lock();
const auto & guard_condition = weak_guard_condition.lock();
if (!guard_condition) {
// If the lock did not work, the guard condition can't be
// saved in the sets, therefore we don't need to remove it
return;
}
auto it = notify_guard_conditions_.find(guard_condition);
if (it != notify_guard_conditions_.end()) {
notify_guard_conditions_.erase(it);
// If this notify waitable doesn't have an on_ready_callback, then there's nothing to unset
if (guard_condition && on_ready_callback_) {
if (on_ready_callback_) {
guard_condition->set_on_trigger_callback(nullptr);
}
}
Expand Down

0 comments on commit 46bb085

Please sign in to comment.