diff --git a/rclcpp/include/rclcpp/executor.hpp b/rclcpp/include/rclcpp/executor.hpp index 7bdc53d251..d8aab5829c 100644 --- a/rclcpp/include/rclcpp/executor.hpp +++ b/rclcpp/include/rclcpp/executor.hpp @@ -82,6 +82,19 @@ class Executor virtual void spin() = 0; + /// Version of spin() that takes an exception handler to be called when a callback throws. + /** + * \sa rclcpp::Executor:spin() for more details + * \throws std::runtime_error when spin() called while already spinning + * \param[in] exception_handler will be called for every exception in the processing threads + * + * The exception_handler can be called from multiple threads at the same time. + * The exception_handler shall rethrow the exception it if wants to terminate the program. + */ + RCLCPP_PUBLIC + virtual void + spin(const std::function & exception_handler) = 0; + /// Add a callback group to an executor. /** * An executor can have zero or more callback groups which provide work during `spin` functions. @@ -462,7 +475,7 @@ class Executor /// Find the next available executable and do the work associated with it. /** - * \param[in] any_exec Union structure that can hold any executable type (timer, subscription, + * \param[in] any_exec Structure that can hold any executable type (timer, subscription, * service, client). * \throws std::runtime_error if there is an issue triggering the guard condition */ @@ -470,6 +483,19 @@ class Executor void execute_any_executable(AnyExecutable & any_exec); + /// Find the next available executable and do the work associated with it. + /** + * \param[in] any_exec Structure that can hold any executable type (timer, subscription, + * service, client). + * \param[in] exception_handler will be called for every exception in the processing threads + * \throws std::runtime_error if there is an issue triggering the guard condition + */ + RCLCPP_PUBLIC + void + execute_any_executable( + AnyExecutable & any_exec, + const std::function & exception_handler); + /// Run subscription executable. /** * Do necessary setup and tear-down as well as executing the subscription. diff --git a/rclcpp/include/rclcpp/executors/multi_threaded_executor.hpp b/rclcpp/include/rclcpp/executors/multi_threaded_executor.hpp index 119013ebfb..01b4bd11e2 100644 --- a/rclcpp/include/rclcpp/executors/multi_threaded_executor.hpp +++ b/rclcpp/include/rclcpp/executors/multi_threaded_executor.hpp @@ -69,6 +69,18 @@ class MultiThreadedExecutor : public rclcpp::Executor void spin() override; + /** + * \sa rclcpp::Executor:spin() for more details + * \throws std::runtime_error when spin() called while already spinning + * \param[in] exception_handler will be called for every exception in the processing threads + * + * The exception_handler can be called from multiple threads at the same time. + * The exception_handler shall rethrow the exception it if wants to terminate the program. + */ + RCLCPP_PUBLIC + void + spin(const std::function & exception_handler) override; + RCLCPP_PUBLIC size_t get_number_of_threads(); @@ -76,7 +88,9 @@ class MultiThreadedExecutor : public rclcpp::Executor protected: RCLCPP_PUBLIC void - run(size_t this_thread_number); + run( + size_t this_thread_number, + const std::function & exception_handler); private: RCLCPP_DISABLE_COPY(MultiThreadedExecutor) diff --git a/rclcpp/include/rclcpp/executors/single_threaded_executor.hpp b/rclcpp/include/rclcpp/executors/single_threaded_executor.hpp index 9dc6dec57b..986ce16a54 100644 --- a/rclcpp/include/rclcpp/executors/single_threaded_executor.hpp +++ b/rclcpp/include/rclcpp/executors/single_threaded_executor.hpp @@ -65,6 +65,18 @@ class SingleThreadedExecutor : public rclcpp::Executor void spin() override; + /// Single-threaded implementation of spin. + /** + * \sa rclcpp::SingleThreadedExecutor:spin() for more details + * \throws std::runtime_error when spin() called while already spinning + * \param[in] exception_handler will be called for every exception in the processing threads + * + * The exception_handler shall rethrow the exception it if wants to terminate the program. + */ + RCLCPP_PUBLIC + void + spin(const std::function & exception_handler) override; + private: RCLCPP_DISABLE_COPY(SingleThreadedExecutor) }; diff --git a/rclcpp/include/rclcpp/executors/static_single_threaded_executor.hpp b/rclcpp/include/rclcpp/executors/static_single_threaded_executor.hpp index 6f22909caf..ebed7797d2 100644 --- a/rclcpp/include/rclcpp/executors/static_single_threaded_executor.hpp +++ b/rclcpp/include/rclcpp/executors/static_single_threaded_executor.hpp @@ -68,6 +68,16 @@ class StaticSingleThreadedExecutor : public rclcpp::Executor void spin() override; + /** + * \sa rclcpp::SingleThreadedExecutor:spin() for more details + * \throws std::runtime_error when spin() called while already spinning + * \param[in] exception_handler will be called for every exception in the processing threads + * + * The exception_handler shall rethrow the exception it if wants to terminate the program. + */RCLCPP_PUBLIC + virtual void + spin(const std::function & exception_handler) override; + /// Static executor implementation of spin some /** * This non-blocking function will execute entities that @@ -125,6 +135,11 @@ class StaticSingleThreadedExecutor : public rclcpp::Executor void spin_once_impl(std::chrono::nanoseconds timeout) override; + void + spin_once_impl( + std::chrono::nanoseconds timeout, + const std::function & exception_handler); + std::optional> collect_and_wait(std::chrono::nanoseconds timeout); diff --git a/rclcpp/include/rclcpp/experimental/executors/events_executor/events_executor.hpp b/rclcpp/include/rclcpp/experimental/executors/events_executor/events_executor.hpp index dd5b1ebe63..d86724e557 100644 --- a/rclcpp/include/rclcpp/experimental/executors/events_executor/events_executor.hpp +++ b/rclcpp/include/rclcpp/experimental/executors/events_executor/events_executor.hpp @@ -92,6 +92,18 @@ class EventsExecutor : public rclcpp::Executor void spin() override; + /** + * \sa rclcpp::Executor:spin() for more details + * \throws std::runtime_error when spin() called while already spinning + * \param exception_handler will be called for every exception in the processing threads + * + * The exception_handler can be called from multiple threads at the same time. + * The exception_handler shall rethrow the exception it if wants to terminate the program. + */ + RCLCPP_PUBLIC + void + spin(const std::function & exception_handler) override; + /// Events executor implementation of spin some /** * This non-blocking function will execute the timers and events diff --git a/rclcpp/include/rclcpp/experimental/timers_manager.hpp b/rclcpp/include/rclcpp/experimental/timers_manager.hpp index af3337bfd6..23580f505b 100644 --- a/rclcpp/include/rclcpp/experimental/timers_manager.hpp +++ b/rclcpp/include/rclcpp/experimental/timers_manager.hpp @@ -125,9 +125,14 @@ class TimersManager /** * @brief Starts a thread that takes care of executing the timers stored in this object. * Function will throw an error if the timers thread was already running. + * + * @param exception_handler if valid, the execution of the timer will be done in a try catch block, + * and any occurring exception will be passed to the given handler */ RCLCPP_PUBLIC - void start(); + void start( + const std::function & exception_handler = std::function()); /** * @brief Stops the timers thread. @@ -511,6 +516,11 @@ class TimersManager */ void run_timers(); + /** + * @brief calls run_timers with a try catch block. + */ + void run_timers(const std::function & exception_handler); + /** * @brief Get the amount of time before the next timer triggers. * This function is not thread safe, acquire a mutex before calling it. @@ -528,7 +538,7 @@ class TimersManager * while keeping the heap correctly sorted. * This function is not thread safe, acquire the timers_mutex_ before calling it. */ - void execute_ready_timers_unsafe(); + void execute_ready_timers_unsafe(std::function exception_handler); // Callback to be called when timer is ready std::functioncan_be_taken_from().store(true); } +template +void +execute_guarded( + const Function & function, + const std::function & exception_handler) +{ + try { + function(); + } catch (const std::exception & e) { + RCLCPP_ERROR_STREAM( + rclcpp::get_logger("rclcpp"), + "Exception while spinning: " << e.what()); + + exception_handler(e); + } +} + +void +Executor::execute_any_executable( + AnyExecutable & any_exec, + const std::function & exception_handler) +{ + if (!spinning.load()) { + return; + } + + assert( + (void("cannot execute an AnyExecutable without a valid callback group"), + any_exec.callback_group)); + + if (any_exec.timer) { + TRACETOOLS_TRACEPOINT( + rclcpp_executor_execute, + static_cast(any_exec.timer->get_timer_handle().get())); + execute_guarded([&any_exec]() { + execute_timer(any_exec.timer, any_exec.data); + }, exception_handler); + } + if (any_exec.subscription) { + TRACETOOLS_TRACEPOINT( + rclcpp_executor_execute, + static_cast(any_exec.subscription->get_subscription_handle().get())); + execute_guarded( + [&any_exec]() { + execute_subscription(any_exec.subscription); + }, exception_handler); + } + if (any_exec.service) { + execute_guarded([&any_exec]() {execute_service(any_exec.service);}, exception_handler); + } + if (any_exec.client) { + execute_guarded([&any_exec]() {execute_client(any_exec.client);}, exception_handler); + } + if (any_exec.waitable) { + execute_guarded([&any_exec]() { + const std::shared_ptr & const_data = any_exec.data; + any_exec.waitable->execute(const_data); + }, exception_handler); + } + // Reset the callback_group, regardless of type + any_exec.callback_group->can_be_taken_from().store(true); +} + template static void diff --git a/rclcpp/src/rclcpp/executors/multi_threaded_executor.cpp b/rclcpp/src/rclcpp/executors/multi_threaded_executor.cpp index 2d5c76e817..ad6b5cc541 100644 --- a/rclcpp/src/rclcpp/executors/multi_threaded_executor.cpp +++ b/rclcpp/src/rclcpp/executors/multi_threaded_executor.cpp @@ -51,6 +51,12 @@ MultiThreadedExecutor::~MultiThreadedExecutor() {} void MultiThreadedExecutor::spin() +{ + spin([](const std::exception & e) {throw e;}); +} + +void +MultiThreadedExecutor::spin(const std::function & exception_handler) { if (spinning.exchange(true)) { throw std::runtime_error("spin() called while already spinning"); @@ -61,12 +67,12 @@ MultiThreadedExecutor::spin() { std::lock_guard wait_lock{wait_mutex_}; for (; thread_id < number_of_threads_ - 1; ++thread_id) { - auto func = std::bind(&MultiThreadedExecutor::run, this, thread_id); + auto func = std::bind(&MultiThreadedExecutor::run, this, thread_id, exception_handler); threads.emplace_back(func); } } - run(thread_id); + run(thread_id, exception_handler); for (auto & thread : threads) { thread.join(); } @@ -79,7 +85,9 @@ MultiThreadedExecutor::get_number_of_threads() } void -MultiThreadedExecutor::run(size_t this_thread_number) +MultiThreadedExecutor::run( + size_t this_thread_number, + const std::function & exception_handler) { (void)this_thread_number; while (rclcpp::ok(this->context_) && spinning.load()) { @@ -97,7 +105,7 @@ MultiThreadedExecutor::run(size_t this_thread_number) std::this_thread::yield(); } - execute_any_executable(any_exec); + execute_any_executable(any_exec, exception_handler); if (any_exec.callback_group && any_exec.callback_group->type() == CallbackGroupType::MutuallyExclusive) diff --git a/rclcpp/src/rclcpp/executors/single_threaded_executor.cpp b/rclcpp/src/rclcpp/executors/single_threaded_executor.cpp index 975733b497..e627b81a0d 100644 --- a/rclcpp/src/rclcpp/executors/single_threaded_executor.cpp +++ b/rclcpp/src/rclcpp/executors/single_threaded_executor.cpp @@ -26,6 +26,13 @@ SingleThreadedExecutor::~SingleThreadedExecutor() {} void SingleThreadedExecutor::spin() +{ + spin([](const std::exception & e) {throw e;}); +} + +void +SingleThreadedExecutor::spin( + const std::function & exception_handler) { if (spinning.exchange(true)) { throw std::runtime_error("spin() called while already spinning"); @@ -39,7 +46,7 @@ SingleThreadedExecutor::spin() while (rclcpp::ok(this->context_) && spinning.load()) { rclcpp::AnyExecutable any_executable; if (get_next_executable(any_executable)) { - execute_any_executable(any_executable); + execute_any_executable(any_executable, exception_handler); } } } diff --git a/rclcpp/src/rclcpp/executors/static_single_threaded_executor.cpp b/rclcpp/src/rclcpp/executors/static_single_threaded_executor.cpp index d517ccafd0..160f1dd895 100644 --- a/rclcpp/src/rclcpp/executors/static_single_threaded_executor.cpp +++ b/rclcpp/src/rclcpp/executors/static_single_threaded_executor.cpp @@ -29,6 +29,14 @@ StaticSingleThreadedExecutor::~StaticSingleThreadedExecutor() {} void StaticSingleThreadedExecutor::spin() +{ + spin([](const std::exception & e) {throw e;}); +} + + +void +StaticSingleThreadedExecutor::spin( + const std::function & exception_handler) { if (spinning.exchange(true)) { throw std::runtime_error("spin() called while already spinning"); @@ -39,7 +47,7 @@ StaticSingleThreadedExecutor::spin() // except we need to keep the wait result to reproduce the StaticSingleThreadedExecutor // behavior. while (rclcpp::ok(this->context_) && spinning.load()) { - this->spin_once_impl(std::chrono::nanoseconds(-1)); + this->spin_once_impl(std::chrono::nanoseconds(-1), exception_handler); } } @@ -97,12 +105,32 @@ StaticSingleThreadedExecutor::spin_some_impl(std::chrono::nanoseconds max_durati void StaticSingleThreadedExecutor::spin_once_impl(std::chrono::nanoseconds timeout) +{ + spin_once_impl(timeout, std::function()); +} + +void +StaticSingleThreadedExecutor::spin_once_impl( + std::chrono::nanoseconds timeout, + const std::function & exception_handler) { if (rclcpp::ok(context_) && spinning.load()) { std::lock_guard guard(mutex_); auto wait_result = this->collect_and_wait(timeout); if (wait_result.has_value()) { - this->execute_ready_executables(current_collection_, wait_result.value(), true); + if(exception_handler) { + try { + this->execute_ready_executables(current_collection_, wait_result.value(), true); + } catch (const std::exception & e) { + RCLCPP_ERROR_STREAM( + rclcpp::get_logger("rclcpp"), + "Exception while spinning : " << e.what()); + + exception_handler(e); + } + } else { + this->execute_ready_executables(current_collection_, wait_result.value(), true); + } } } } diff --git a/rclcpp/src/rclcpp/experimental/executors/events_executor/events_executor.cpp b/rclcpp/src/rclcpp/experimental/executors/events_executor/events_executor.cpp index ce6a103ab2..9f91ca0827 100644 --- a/rclcpp/src/rclcpp/experimental/executors/events_executor/events_executor.cpp +++ b/rclcpp/src/rclcpp/experimental/executors/events_executor/events_executor.cpp @@ -126,6 +126,35 @@ EventsExecutor::spin() } } +void +EventsExecutor::spin(const std::function & exception_handler) +{ + if (spinning.exchange(true)) { + throw std::runtime_error("spin() called while already spinning"); + } + RCPPUTILS_SCOPE_EXIT(this->spinning.store(false); ); + + timers_manager_->start(exception_handler); + RCPPUTILS_SCOPE_EXIT(timers_manager_->stop(); ); + + while (rclcpp::ok(context_) && spinning.load()) { + // Wait until we get an event + ExecutorEvent event; + bool has_event = events_queue_->dequeue(event); + if (has_event) { + try { + this->execute_event(event); + } catch (const std::exception & e) { + RCLCPP_ERROR_STREAM( + rclcpp::get_logger("rclcpp"), + "Exception while spinning : " << e.what()); + + exception_handler(e); + } + } + } +} + void EventsExecutor::spin_some(std::chrono::nanoseconds max_duration) { diff --git a/rclcpp/src/rclcpp/experimental/timers_manager.cpp b/rclcpp/src/rclcpp/experimental/timers_manager.cpp index 2caa0a6b15..4c62f7464e 100644 --- a/rclcpp/src/rclcpp/experimental/timers_manager.cpp +++ b/rclcpp/src/rclcpp/experimental/timers_manager.cpp @@ -20,6 +20,7 @@ #include #include #include +#include #include "rcpputils/scope_exit.hpp" @@ -71,14 +72,17 @@ void TimersManager::add_timer(rclcpp::TimerBase::SharedPtr timer) } } -void TimersManager::start() +void TimersManager::start(const std::function & exception_handler) { // Make sure that the thread is not already running if (running_.exchange(true)) { throw std::runtime_error("TimersManager::start() can't start timers thread as already running"); } - timers_thread_ = std::thread(&TimersManager::run_timers, this); + timers_thread_ = std::thread( + [this, exception_handler]() { + run_timers(exception_handler); + }); } void TimersManager::stop() @@ -203,7 +207,8 @@ std::optional TimersManager::get_head_timeout_unsafe() return head_timer->time_until_trigger(); } -void TimersManager::execute_ready_timers_unsafe() +void TimersManager::execute_ready_timers_unsafe( + std::function exception_handler) { // We start by locking the timers TimersHeap locked_heap = weak_timers_heap_.validate_and_lock(); @@ -226,7 +231,19 @@ void TimersManager::execute_ready_timers_unsafe() if (on_ready_callback_) { on_ready_callback_(head_timer.get(), data); } else { - head_timer->execute_callback(data); + if (exception_handler) { + try { + head_timer->execute_callback(data); + } catch (const std::exception & e) { + RCLCPP_ERROR_STREAM( + rclcpp::get_logger("rclcpp"), + "Exception while spinning : " << e.what()); + + exception_handler(e); + } + } else { + head_timer->execute_callback(data); + } } } else { // someone canceled the timer between is_ready and call @@ -245,7 +262,8 @@ void TimersManager::execute_ready_timers_unsafe() weak_timers_heap_.store(locked_heap); } -void TimersManager::run_timers() +void TimersManager::run_timers( + const std::function & exception_handler) { // Make sure the running flag is set to false when we exit from this function // to allow restarting the timers thread. @@ -289,7 +307,7 @@ void TimersManager::run_timers() timers_updated_ = false; // Execute timers - this->execute_ready_timers_unsafe(); + this->execute_ready_timers_unsafe(exception_handler); } } diff --git a/rclcpp/test/benchmark/benchmark_parameter_client.cpp b/rclcpp/test/benchmark/benchmark_parameter_client.cpp index 0d9a6fd9c9..b59c029dd0 100644 --- a/rclcpp/test/benchmark/benchmark_parameter_client.cpp +++ b/rclcpp/test/benchmark/benchmark_parameter_client.cpp @@ -46,7 +46,10 @@ class RemoteNodeTest : public benchmark::Fixture remote_node_name, rclcpp::NodeOptions().context(remote_context)); remote_executor->add_node(remote_node); - remote_thread = std::thread(&rclcpp::executors::SingleThreadedExecutor::spin, remote_executor); + remote_thread = + std::thread( + static_cast(&rclcpp:: + executors::SingleThreadedExecutor::spin), remote_executor); } void TearDown(benchmark::State &) diff --git a/rclcpp/test/rclcpp/executors/test_executors.cpp b/rclcpp/test/rclcpp/executors/test_executors.cpp index 1a538eaa30..cf654575e7 100644 --- a/rclcpp/test/rclcpp/executors/test_executors.cpp +++ b/rclcpp/test/rclcpp/executors/test_executors.cpp @@ -133,6 +133,52 @@ TYPED_TEST(TestExecutors, emptyExecutor) spinner.join(); } +TYPED_TEST(TestExecutors, catch_exception) { + using ExecutorType = TypeParam; + ExecutorType executor; + + std::shared_ptr node = + std::make_shared("test_executor_catch_exception"); + + const std::string test_reason = "test exception"; + + std::atomic_bool timer_executed_after_exception = false; + + size_t cnt = 0; + + auto timer = node->create_wall_timer( + std::chrono::milliseconds(1), [test_reason, &timer_executed_after_exception, &executor, &cnt]() + { + if (cnt == 0) { + cnt++; + throw std::runtime_error(test_reason); + } + + timer_executed_after_exception = true; + + executor.cancel(); + }); + + std::atomic_bool caught_exception = false; + + executor.add_node(node); + executor.spin( + [&caught_exception, &test_reason](const std::exception & e) + { + const std::runtime_error * runtime_error = dynamic_cast(&e); + ASSERT_NE(runtime_error, nullptr); + + ASSERT_EQ(runtime_error->what(), test_reason); + + caught_exception = true; + } + ); + + ASSERT_TRUE(caught_exception); + ASSERT_TRUE(timer_executed_after_exception); +} + + // Check executor throws properly if the same node is added a second time TYPED_TEST(TestExecutors, addNodeTwoExecutors) { diff --git a/rclcpp/test/rclcpp/executors/test_executors_callback_group_behavior.cpp b/rclcpp/test/rclcpp/executors/test_executors_callback_group_behavior.cpp index 49391cd838..3d9a8bf32b 100644 --- a/rclcpp/test/rclcpp/executors/test_executors_callback_group_behavior.cpp +++ b/rclcpp/test/rclcpp/executors/test_executors_callback_group_behavior.cpp @@ -42,6 +42,8 @@ class CustomExecutor : public rclcpp::Executor void spin() override {} + void spin(const std::function &) override {} + void collect() { this->collect_entities(); diff --git a/rclcpp/test/rclcpp/test_executor.cpp b/rclcpp/test/rclcpp/test_executor.cpp index 668ab96797..4302f51107 100644 --- a/rclcpp/test/rclcpp/test_executor.cpp +++ b/rclcpp/test/rclcpp/test_executor.cpp @@ -42,6 +42,10 @@ class DummyExecutor : public rclcpp::Executor { } + void spin(const std::function &) override + { + } + void spin_nanoseconds(rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node) { spin_node_once_nanoseconds(node, std::chrono::milliseconds(100)); diff --git a/rclcpp_lifecycle/test/benchmark/benchmark_lifecycle_client.cpp b/rclcpp_lifecycle/test/benchmark/benchmark_lifecycle_client.cpp index 59ed24489e..d6e672e6f6 100644 --- a/rclcpp_lifecycle/test/benchmark/benchmark_lifecycle_client.cpp +++ b/rclcpp_lifecycle/test/benchmark/benchmark_lifecycle_client.cpp @@ -203,7 +203,7 @@ class BenchmarkLifecycleClient : public performance_test_fixture::PerformanceTes executor = std::make_shared(); executor->add_node(lifecycle_node->get_node_base_interface()); executor->add_node(lifecycle_client->get_node_base_interface()); - spinner_ = std::thread(&rclcpp::executors::SingleThreadedExecutor::spin, executor); + spinner_ = std::thread([this]() {executor->spin();}); performance_test_fixture::PerformanceTest::SetUp(state); }