From 46bb08580a3f11fb590fff62c2ebeb48175df3c7 Mon Sep 17 00:00:00 2001 From: Janosch Machowinski Date: Wed, 18 Dec 2024 17:43:15 +0100 Subject: [PATCH] fix(ExecutorNotifyWaitable): Keep strong references to used guard conditions A waitable should make sure, that all entities it adds to the waitset are alive during the wait. The ExecutorNotifyWaitable did not do this, leading to crashes, if a callback group was dropped while waiting. This commit changes this behavior, by holding shared pointers to the used guard condition. Also while at it, fixed a possible race, were a trigger could get lost. Optimized the is_ready call by using the indices returned by rcl. Signed-off-by: Janosch Machowinski --- .../executors/executor_notify_waitable.hpp | 18 +++- rclcpp/src/rclcpp/executor.cpp | 19 +--- .../executors/executor_notify_waitable.cpp | 100 ++++++++++++------ 3 files changed, 83 insertions(+), 54 deletions(-) diff --git a/rclcpp/include/rclcpp/executors/executor_notify_waitable.hpp b/rclcpp/include/rclcpp/executors/executor_notify_waitable.hpp index 7ae1c3b875..57f366f649 100644 --- a/rclcpp/include/rclcpp/executors/executor_notify_waitable.hpp +++ b/rclcpp/include/rclcpp/executors/executor_notify_waitable.hpp @@ -19,6 +19,7 @@ #include #include #include +#include #include "rclcpp/guard_condition.hpp" #include "rclcpp/waitable.hpp" @@ -41,7 +42,9 @@ class ExecutorNotifyWaitable : public rclcpp::Waitable * of this waitable has signaled the wait_set. */ RCLCPP_PUBLIC - explicit ExecutorNotifyWaitable(std::function on_execute_callback = {}); + explicit ExecutorNotifyWaitable( + std::function on_execute_callback = {}, const rclcpp::Context::SharedPtr & context = + rclcpp::contexts::get_global_default_context()); // Destructor RCLCPP_PUBLIC @@ -158,8 +161,17 @@ class ExecutorNotifyWaitable : public rclcpp::Waitable std::function on_ready_callback_; /// The collection of guard conditions to be waited on. - std::set> notify_guard_conditions_; + std::set notify_guard_conditions_; + + /// The indixes were our guard conditions were stored in the + /// rcl waitset + std::vector idxs_of_added_guard_condition_; + + /// set to true, if we got a pending trigger + bool needs_processing = false; + + /// A guard condition needed to generate wakeups + rclcpp::GuardCondition::SharedPtr guard_condition_; }; } // namespace executors diff --git a/rclcpp/src/rclcpp/executor.cpp b/rclcpp/src/rclcpp/executor.cpp index 160709fb98..260b921044 100644 --- a/rclcpp/src/rclcpp/executor.cpp +++ b/rclcpp/src/rclcpp/executor.cpp @@ -66,7 +66,7 @@ Executor::Executor(const rclcpp::ExecutorOptions & options) notify_waitable_(std::make_shared( [this]() { this->entities_need_rebuild_.store(true); - })), + }, options.context)), entities_need_rebuild_(true), collector_(notify_waitable_), wait_set_({}, {}, {}, {}, {}, {}, options.context), @@ -732,33 +732,16 @@ Executor::wait_for_work(std::chrono::nanoseconds timeout) // Clear any previous wait result this->wait_result_.reset(); - // we need to make sure that callback groups don't get out of scope - // during the wait. As in jazzy, they are not covered by the DynamicStorage, - // we explicitly hold them here as a bugfix - std::vector cbgs; - { std::lock_guard guard(mutex_); if (this->entities_need_rebuild_.exchange(false) || current_collection_.empty()) { this->collect_entities(); } - - auto callback_groups = this->collector_.get_all_callback_groups(); - cbgs.resize(callback_groups.size()); - for(const auto & w_ptr : callback_groups) { - auto shr_ptr = w_ptr.lock(); - if(shr_ptr) { - cbgs.push_back(std::move(shr_ptr)); - } - } } this->wait_result_.emplace(wait_set_.wait(timeout)); - // drop references to the callback groups, before trying to execute anything - cbgs.clear(); - if (!this->wait_result_ || this->wait_result_->kind() == WaitResultKind::Empty) { RCUTILS_LOG_WARN_NAMED( "rclcpp", diff --git a/rclcpp/src/rclcpp/executors/executor_notify_waitable.cpp b/rclcpp/src/rclcpp/executors/executor_notify_waitable.cpp index 2e62f9dd1a..9a8475112c 100644 --- a/rclcpp/src/rclcpp/executors/executor_notify_waitable.cpp +++ b/rclcpp/src/rclcpp/executors/executor_notify_waitable.cpp @@ -20,8 +20,11 @@ namespace rclcpp namespace executors { -ExecutorNotifyWaitable::ExecutorNotifyWaitable(std::function on_execute_callback) -: execute_callback_(on_execute_callback) +ExecutorNotifyWaitable::ExecutorNotifyWaitable( + std::function on_execute_callback, + const rclcpp::Context::SharedPtr & context) +: execute_callback_(on_execute_callback), + guard_condition_(std::make_shared(context)) { } @@ -30,6 +33,9 @@ ExecutorNotifyWaitable::ExecutorNotifyWaitable(ExecutorNotifyWaitable & other) std::lock_guard lock(other.guard_condition_mutex_); this->execute_callback_ = other.execute_callback_; this->notify_guard_conditions_ = other.notify_guard_conditions_; + this->guard_condition_ = other.guard_condition_; + this->idxs_of_added_guard_condition_ = other.idxs_of_added_guard_condition_; + this->needs_processing = other.needs_processing; } ExecutorNotifyWaitable & ExecutorNotifyWaitable::operator=(ExecutorNotifyWaitable & other) @@ -38,6 +44,9 @@ ExecutorNotifyWaitable & ExecutorNotifyWaitable::operator=(ExecutorNotifyWaitabl std::lock_guard lock(other.guard_condition_mutex_); this->execute_callback_ = other.execute_callback_; this->notify_guard_conditions_ = other.notify_guard_conditions_; + this->guard_condition_ = other.guard_condition_; + this->idxs_of_added_guard_condition_ = other.idxs_of_added_guard_condition_; + this->needs_processing = other.needs_processing; } return *this; } @@ -47,21 +56,42 @@ ExecutorNotifyWaitable::add_to_wait_set(rcl_wait_set_t & wait_set) { std::lock_guard lock(guard_condition_mutex_); + idxs_of_added_guard_condition_.clear(); + idxs_of_added_guard_condition_.reserve(notify_guard_conditions_.size()); + + if(needs_processing) { + rcl_guard_condition_t * cond = &guard_condition_->get_rcl_guard_condition(); + size_t rcl_index; + rcl_ret_t ret = rcl_wait_set_add_guard_condition(&wait_set, cond, &rcl_index); + + if (RCL_RET_OK != ret) { + rclcpp::exceptions::throw_from_rcl_error( + ret, "failed to add guard condition to wait set"); + } + + idxs_of_added_guard_condition_.push_back(rcl_index); + + // we want to directly wake up any way, not need to add the other guard conditions + guard_condition_->trigger(); + + return; + } + // Note: no guard conditions need to be re-triggered, since the guard // conditions in this class are not tracking a stateful condition, but instead // only serve to interrupt the wait set when new information is available to // consider. - for (auto weak_guard_condition : this->notify_guard_conditions_) { - auto guard_condition = weak_guard_condition.lock(); - if (!guard_condition) {continue;} - + for (const auto & guard_condition : this->notify_guard_conditions_) { rcl_guard_condition_t * cond = &guard_condition->get_rcl_guard_condition(); - rcl_ret_t ret = rcl_wait_set_add_guard_condition(&wait_set, cond, NULL); + size_t rcl_index; + rcl_ret_t ret = rcl_wait_set_add_guard_condition(&wait_set, cond, &rcl_index); if (RCL_RET_OK != ret) { rclcpp::exceptions::throw_from_rcl_error( ret, "failed to add guard condition to wait set"); } + + idxs_of_added_guard_condition_.push_back(rcl_index); } } @@ -71,20 +101,23 @@ ExecutorNotifyWaitable::is_ready(const rcl_wait_set_t & wait_set) std::lock_guard lock(guard_condition_mutex_); bool any_ready = false; - for (size_t ii = 0; ii < wait_set.size_of_guard_conditions; ++ii) { - const auto * rcl_guard_condition = wait_set.guard_conditions[ii]; + for (size_t rcl_index : idxs_of_added_guard_condition_) { + if(rcl_index >= wait_set.size_of_guard_conditions) { + throw std::runtime_error( + "ExecutorNotifyWaitable::is_ready: Internal error, got index out of range"); + } + + const auto * rcl_guard_condition = wait_set.guard_conditions[rcl_index]; if (nullptr == rcl_guard_condition) { continue; } - for (const auto & weak_guard_condition : this->notify_guard_conditions_) { - auto guard_condition = weak_guard_condition.lock(); - if (guard_condition && &guard_condition->get_rcl_guard_condition() == rcl_guard_condition) { - any_ready = true; - break; - } - } + + any_ready = true; + needs_processing = true; + break; } + return any_ready; } @@ -92,6 +125,9 @@ void ExecutorNotifyWaitable::execute(const std::shared_ptr & /*data*/) { std::lock_guard lock(execute_mutex_); + + needs_processing = false; + this->execute_callback_(); } @@ -122,11 +158,7 @@ ExecutorNotifyWaitable::set_on_ready_callback(std::function c std::lock_guard lock(guard_condition_mutex_); on_ready_callback_ = gc_callback; - for (auto weak_gc : notify_guard_conditions_) { - auto gc = weak_gc.lock(); - if (!gc) { - continue; - } + for (const auto & gc : notify_guard_conditions_) { gc->set_on_trigger_callback(on_ready_callback_); } } @@ -138,11 +170,7 @@ ExecutorNotifyWaitable::clear_on_ready_callback() std::lock_guard lock(guard_condition_mutex_); on_ready_callback_ = nullptr; - for (auto weak_gc : notify_guard_conditions_) { - auto gc = weak_gc.lock(); - if (!gc) { - continue; - } + for (const auto & gc : notify_guard_conditions_) { gc->set_on_trigger_callback(nullptr); } } @@ -159,9 +187,9 @@ void ExecutorNotifyWaitable::add_guard_condition(rclcpp::GuardCondition::WeakPtr weak_guard_condition) { std::lock_guard lock(guard_condition_mutex_); - auto guard_condition = weak_guard_condition.lock(); - if (guard_condition && notify_guard_conditions_.count(weak_guard_condition) == 0) { - notify_guard_conditions_.insert(weak_guard_condition); + const auto & guard_condition = weak_guard_condition.lock(); + if (guard_condition && notify_guard_conditions_.count(guard_condition) == 0) { + notify_guard_conditions_.insert(guard_condition); if (on_ready_callback_) { guard_condition->set_on_trigger_callback(on_ready_callback_); } @@ -172,11 +200,17 @@ void ExecutorNotifyWaitable::remove_guard_condition(rclcpp::GuardCondition::WeakPtr weak_guard_condition) { std::lock_guard lock(guard_condition_mutex_); - if (notify_guard_conditions_.count(weak_guard_condition) != 0) { - notify_guard_conditions_.erase(weak_guard_condition); - auto guard_condition = weak_guard_condition.lock(); + const auto & guard_condition = weak_guard_condition.lock(); + if (!guard_condition) { + // If the lock did not work, the guard condition can't be + // saved in the sets, therefore we don't need to remove it + return; + } + auto it = notify_guard_conditions_.find(guard_condition); + if (it != notify_guard_conditions_.end()) { + notify_guard_conditions_.erase(it); // If this notify waitable doesn't have an on_ready_callback, then there's nothing to unset - if (guard_condition && on_ready_callback_) { + if (on_ready_callback_) { guard_condition->set_on_trigger_callback(nullptr); } }