Skip to content

Commit

Permalink
feat: publish top level msg when error is received
Browse files Browse the repository at this point in the history
  • Loading branch information
outrider-jhulas committed Nov 29, 2023
1 parent 24fb79a commit f7638ae
Show file tree
Hide file tree
Showing 4 changed files with 145 additions and 1 deletion.
5 changes: 5 additions & 0 deletions diagnostic_aggregator/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,11 @@ if(BUILD_TESTING)
ENV
)
endforeach()

add_launch_test(
test/test_critical_pub.py
TIMEOUT 30
)
endif()

install(
Expand Down
10 changes: 10 additions & 0 deletions diagnostic_aggregator/include/diagnostic_aggregator/aggregator.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,16 @@ class Aggregator

std::string base_path_; /**< \brief Prepended to all status names of aggregator. */

/*!
*\brief If true, aggregator will publish an error immediately after receiving.
*/
bool critical_;

/*!
*\brief Store the last top level value to publish the critical error only once.
*/
std::uint8_t last_top_level_state_;

/// Records all ROS warnings. No warnings are repeated.
std::set<std::string> ros_warnings_;

Expand Down
28 changes: 27 additions & 1 deletion diagnostic_aggregator/src/aggregator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,9 @@ Aggregator::Aggregator()
pub_rate_(1.0),
history_depth_(1000),
clock_(n_->get_clock()),
base_path_("")
base_path_(""),
critical_(false),
last_top_level_state_(DiagnosticStatus::STALE)
{
RCLCPP_DEBUG(logger_, "constructor");
bool other_as_errors = false;
Expand All @@ -88,12 +90,16 @@ Aggregator::Aggregator()
other_as_errors = param.second.as_bool();
} else if (param.first.compare("history_depth") == 0) {
history_depth_ = param.second.as_int();
} else if (param.first.compare("critical") == 0) {
critical_ = param.second.as_bool();
}
}
RCLCPP_DEBUG(logger_, "Aggregator publication rate configured to: %f", pub_rate_);
RCLCPP_DEBUG(logger_, "Aggregator base path configured to: %s", base_path_.c_str());
RCLCPP_DEBUG(
logger_, "Aggregator other_as_errors configured to: %s", (other_as_errors ? "true" : "false"));
RCLCPP_DEBUG(
logger_, "Aggregator critical publisher configured to: %s", (critical_ ? "true" : "false"));

analyzer_group_ = std::make_unique<AnalyzerGroup>();
if (!analyzer_group_->init(base_path_, "", n_)) {
Expand Down Expand Up @@ -149,6 +155,24 @@ void Aggregator::diagCallback(const DiagnosticArray::SharedPtr diag_msg)
std::lock_guard<std::mutex> lock(mutex_);
for (auto j = 0u; j < diag_msg->status.size(); ++j) {
analyzed = false;

const bool top_level_state_transition_to_error =
(last_top_level_state_ != DiagnosticStatus::ERROR) &&
(diag_msg->status[j].level == DiagnosticStatus::ERROR);

if (critical_ && top_level_state_transition_to_error) {
RCLCPP_DEBUG(
logger_, "Received error message: %s, publishing error immediately",
diag_msg->status[j].name.c_str());
DiagnosticStatus diag_toplevel_state;
diag_toplevel_state.name = "toplevel_state_critical";
diag_toplevel_state.level = diag_msg->status[j].level;
toplevel_state_pub_->publish(diag_toplevel_state);

// store the last published state
last_top_level_state_ = diag_toplevel_state.level;
}

auto item = std::make_shared<StatusItem>(&diag_msg->status[j]);

if (analyzer_group_->match(item->getName())) {
Expand Down Expand Up @@ -217,6 +241,8 @@ void Aggregator::publishData()
// have stale items but not all are stale
diag_toplevel_state.level = DiagnosticStatus::ERROR;
}
last_top_level_state_ = diag_toplevel_state.level;

toplevel_state_pub_->publish(diag_toplevel_state);
}

Expand Down
103 changes: 103 additions & 0 deletions diagnostic_aggregator/test/test_critical_pub.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
import unittest

from diagnostic_msgs.msg import DiagnosticArray, DiagnosticStatus
from launch import LaunchDescription
from launch_ros.actions import Node
from launch_testing.actions import ReadyToTest
import pytest
import rclpy


@pytest.mark.launch_test
def generate_test_description():
# Launch the aggregator
parameters = [{'analyzers.test.type': 'diagnostic_aggregator/GenericAnalyzer'},
{'analyzers.test.path': 'Test'},
{'analyzers.test.contains': ['test']},
{'critical': True}]

aggregator_cmd = Node(
package='diagnostic_aggregator',
executable='aggregator_node',
name='diagnostic_aggregator',
parameters=parameters)

ld = LaunchDescription()

# Launch the node
ld.add_action(aggregator_cmd)
ld.add_action(ReadyToTest())
return ld


class TestProcessOutput(unittest.TestCase):

@ classmethod
def setUpClass(cls):
# Initialize the ROS context for the test node
rclpy.init()

@ classmethod
def tearDownClass(cls):
# Shutdown the ROS context
rclpy.shutdown()

def setUp(self):
# Create a ROS node for tests
self.node = rclpy.create_node('test_node')
self.publisher = self.node.create_publisher(DiagnosticArray, '/diagnostics', 1)
self.subscriber = self.node.create_subscription(
DiagnosticStatus,
'/diagnostics_toplevel_state',
lambda msg: self.received_state.append(msg.level),
1)
self.received_state = []

def tearDown(self):
self.node.destroy_node()

def publish_message(self, level):
msg = DiagnosticArray()
msg.status.append(DiagnosticStatus())
msg.status[0].level = level
msg.status[0].name = 'test status'
while len(self.received_state) == 0:
self.publisher.publish(msg)
rclpy.spin_once(self.node)

return self.node.get_clock().now()

def test_critical_publisher(self):
# Publish the ok message and wait till the toplevel state is received
state = DiagnosticStatus.OK
time_0 = self.publish_message(state)

assert (self.received_state[0] == state), \
'Received state is not the same as the sent state'
self.received_state.clear()

# Publish the ok message and expect the toplevel state after 1 second period
time_1 = self.publish_message(state)
assert (time_1 - time_0 > rclpy.duration.Duration(seconds=0.99)), \
'OK message received too early'
assert (self.received_state[0] == state), \
'Received state is not the same as the sent state'
self.received_state.clear()

# Publish the message and expect the critical error message immediately
state = DiagnosticStatus.ERROR
time_2 = self.publish_message(state)

assert (time_2 - time_1 < rclpy.duration.Duration(seconds=0.1)), \
'Critical error message not received within 0.1 second'
assert (self.received_state[0] == state), \
'Received state is not the same as the sent state'
self.received_state.clear()

# Next error message should be sent at standard 1 second rate
time_3 = self.publish_message(state)

assert (time_3 - time_1 > rclpy.duration.Duration(seconds=0.99)), \
'Periodic error message received too early'
assert (self.received_state[0] == state), \
'Received state is not the same as the sent state'

0 comments on commit f7638ae

Please sign in to comment.