diff --git a/diagnostic_aggregator/CMakeLists.txt b/diagnostic_aggregator/CMakeLists.txt index f0559ea2b..fd6c2a070 100644 --- a/diagnostic_aggregator/CMakeLists.txt +++ b/diagnostic_aggregator/CMakeLists.txt @@ -123,6 +123,11 @@ if(BUILD_TESTING) ENV ) endforeach() + + add_launch_test( + test/test_critical_pub.py + TIMEOUT 30 + ) endif() install( diff --git a/diagnostic_aggregator/include/diagnostic_aggregator/aggregator.hpp b/diagnostic_aggregator/include/diagnostic_aggregator/aggregator.hpp index 24b898bc6..b901acdc0 100644 --- a/diagnostic_aggregator/include/diagnostic_aggregator/aggregator.hpp +++ b/diagnostic_aggregator/include/diagnostic_aggregator/aggregator.hpp @@ -152,6 +152,16 @@ class Aggregator std::string base_path_; /**< \brief Prepended to all status names of aggregator. */ + /*! + *\brief If true, aggregator will publish an error immediately after receiving. + */ + bool critical_; + + /*! + *\brief Store the last top level value to publish the critical error only once. + */ + std::uint8_t last_top_level_state_; + /// Records all ROS warnings. No warnings are repeated. std::set ros_warnings_; diff --git a/diagnostic_aggregator/src/aggregator.cpp b/diagnostic_aggregator/src/aggregator.cpp index 78d7aaadb..d9576c737 100644 --- a/diagnostic_aggregator/src/aggregator.cpp +++ b/diagnostic_aggregator/src/aggregator.cpp @@ -64,7 +64,9 @@ Aggregator::Aggregator() pub_rate_(1.0), history_depth_(1000), clock_(n_->get_clock()), - base_path_("") + base_path_(""), + critical_(false), + last_top_level_state_(DiagnosticStatus::STALE) { RCLCPP_DEBUG(logger_, "constructor"); bool other_as_errors = false; @@ -88,12 +90,16 @@ Aggregator::Aggregator() other_as_errors = param.second.as_bool(); } else if (param.first.compare("history_depth") == 0) { history_depth_ = param.second.as_int(); + } else if (param.first.compare("critical") == 0) { + critical_ = param.second.as_bool(); } } RCLCPP_DEBUG(logger_, "Aggregator publication rate configured to: %f", pub_rate_); RCLCPP_DEBUG(logger_, "Aggregator base path configured to: %s", base_path_.c_str()); RCLCPP_DEBUG( logger_, "Aggregator other_as_errors configured to: %s", (other_as_errors ? "true" : "false")); + RCLCPP_DEBUG( + logger_, "Aggregator critical publisher configured to: %s", (critical_ ? "true" : "false")); analyzer_group_ = std::make_unique(); if (!analyzer_group_->init(base_path_, "", n_)) { @@ -149,6 +155,24 @@ void Aggregator::diagCallback(const DiagnosticArray::SharedPtr diag_msg) std::lock_guard lock(mutex_); for (auto j = 0u; j < diag_msg->status.size(); ++j) { analyzed = false; + + const bool top_level_state_transition_to_error = + (last_top_level_state_ != DiagnosticStatus::ERROR) && + (diag_msg->status[j].level == DiagnosticStatus::ERROR); + + if (critical_ && top_level_state_transition_to_error) { + RCLCPP_DEBUG( + logger_, "Received error message: %s, publishing error immediately", + diag_msg->status[j].name.c_str()); + DiagnosticStatus diag_toplevel_state; + diag_toplevel_state.name = "toplevel_state_critical"; + diag_toplevel_state.level = diag_msg->status[j].level; + toplevel_state_pub_->publish(diag_toplevel_state); + + // store the last published state + last_top_level_state_ = diag_toplevel_state.level; + } + auto item = std::make_shared(&diag_msg->status[j]); if (analyzer_group_->match(item->getName())) { @@ -217,6 +241,8 @@ void Aggregator::publishData() // have stale items but not all are stale diag_toplevel_state.level = DiagnosticStatus::ERROR; } + last_top_level_state_ = diag_toplevel_state.level; + toplevel_state_pub_->publish(diag_toplevel_state); } diff --git a/diagnostic_aggregator/test/test_critical_pub.py b/diagnostic_aggregator/test/test_critical_pub.py new file mode 100644 index 000000000..824a06e3d --- /dev/null +++ b/diagnostic_aggregator/test/test_critical_pub.py @@ -0,0 +1,103 @@ +import unittest + +from diagnostic_msgs.msg import DiagnosticArray, DiagnosticStatus +from launch import LaunchDescription +from launch_ros.actions import Node +from launch_testing.actions import ReadyToTest +import pytest +import rclpy + + +@pytest.mark.launch_test +def generate_test_description(): + # Launch the aggregator + parameters = [{'analyzers.test.type': 'diagnostic_aggregator/GenericAnalyzer'}, + {'analyzers.test.path': 'Test'}, + {'analyzers.test.contains': ['test']}, + {'critical': True}] + + aggregator_cmd = Node( + package='diagnostic_aggregator', + executable='aggregator_node', + name='diagnostic_aggregator', + parameters=parameters) + + ld = LaunchDescription() + + # Launch the node + ld.add_action(aggregator_cmd) + ld.add_action(ReadyToTest()) + return ld + + +class TestProcessOutput(unittest.TestCase): + + @ classmethod + def setUpClass(cls): + # Initialize the ROS context for the test node + rclpy.init() + + @ classmethod + def tearDownClass(cls): + # Shutdown the ROS context + rclpy.shutdown() + + def setUp(self): + # Create a ROS node for tests + self.node = rclpy.create_node('test_node') + self.publisher = self.node.create_publisher(DiagnosticArray, '/diagnostics', 1) + self.subscriber = self.node.create_subscription( + DiagnosticStatus, + '/diagnostics_toplevel_state', + lambda msg: self.received_state.append(msg.level), + 1) + self.received_state = [] + + def tearDown(self): + self.node.destroy_node() + + def publish_message(self, level): + msg = DiagnosticArray() + msg.status.append(DiagnosticStatus()) + msg.status[0].level = level + msg.status[0].name = 'test status' + while len(self.received_state) == 0: + self.publisher.publish(msg) + rclpy.spin_once(self.node) + + return self.node.get_clock().now() + + def test_critical_publisher(self): + # Publish the ok message and wait till the toplevel state is received + state = DiagnosticStatus.OK + time_0 = self.publish_message(state) + + assert (self.received_state[0] == state), \ + 'Received state is not the same as the sent state' + self.received_state.clear() + + # Publish the ok message and expect the toplevel state after 1 second period + time_1 = self.publish_message(state) + assert (time_1 - time_0 > rclpy.duration.Duration(seconds=0.99)), \ + 'OK message received too early' + assert (self.received_state[0] == state), \ + 'Received state is not the same as the sent state' + self.received_state.clear() + + # Publish the message and expect the critical error message immediately + state = DiagnosticStatus.ERROR + time_2 = self.publish_message(state) + + assert (time_2 - time_1 < rclpy.duration.Duration(seconds=0.1)), \ + 'Critical error message not received within 0.1 second' + assert (self.received_state[0] == state), \ + 'Received state is not the same as the sent state' + self.received_state.clear() + + # Next error message should be sent at standard 1 second rate + time_3 = self.publish_message(state) + + assert (time_3 - time_1 > rclpy.duration.Duration(seconds=0.99)), \ + 'Periodic error message received too early' + assert (self.received_state[0] == state), \ + 'Received state is not the same as the sent state'