Skip to content

Commit

Permalink
Merge branch 'ros2' into asymingt/aggregate-discard-stale-gracefully
Browse files Browse the repository at this point in the history
Signed-off-by: Christian Henkel <[email protected]>
  • Loading branch information
ct2034 committed Dec 5, 2023
2 parents dd445d2 + 0cc6cc6 commit f79d570
Show file tree
Hide file tree
Showing 10 changed files with 398 additions and 2 deletions.
5 changes: 5 additions & 0 deletions diagnostic_aggregator/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,11 @@ if(BUILD_TESTING)
)
endforeach()

add_launch_test(
test/test_critical_pub.py
TIMEOUT 30
)

ament_add_pytest_test(test_discard_behavior
"${CMAKE_CURRENT_SOURCE_DIR}/test/test_discard_behavior.py"
TIMEOUT 60
Expand Down
10 changes: 10 additions & 0 deletions diagnostic_aggregator/include/diagnostic_aggregator/aggregator.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,16 @@ class Aggregator

std::string base_path_; /**< \brief Prepended to all status names of aggregator. */

/*!
*\brief If true, aggregator will publish an error immediately after receiving.
*/
bool critical_;

/*!
*\brief Store the last top level value to publish the critical error only once.
*/
std::uint8_t last_top_level_state_;

/// Records all ROS warnings. No warnings are repeated.
std::set<std::string> ros_warnings_;

Expand Down
28 changes: 27 additions & 1 deletion diagnostic_aggregator/src/aggregator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,9 @@ Aggregator::Aggregator()
pub_rate_(1.0),
history_depth_(1000),
clock_(n_->get_clock()),
base_path_("")
base_path_(""),
critical_(false),
last_top_level_state_(DiagnosticStatus::STALE)
{
RCLCPP_DEBUG(logger_, "constructor");
bool other_as_errors = false;
Expand All @@ -88,12 +90,16 @@ Aggregator::Aggregator()
other_as_errors = param.second.as_bool();
} else if (param.first.compare("history_depth") == 0) {
history_depth_ = param.second.as_int();
} else if (param.first.compare("critical") == 0) {
critical_ = param.second.as_bool();
}
}
RCLCPP_DEBUG(logger_, "Aggregator publication rate configured to: %f", pub_rate_);
RCLCPP_DEBUG(logger_, "Aggregator base path configured to: %s", base_path_.c_str());
RCLCPP_DEBUG(
logger_, "Aggregator other_as_errors configured to: %s", (other_as_errors ? "true" : "false"));
RCLCPP_DEBUG(
logger_, "Aggregator critical publisher configured to: %s", (critical_ ? "true" : "false"));

analyzer_group_ = std::make_unique<AnalyzerGroup>();
if (!analyzer_group_->init(base_path_, "", n_)) {
Expand Down Expand Up @@ -149,6 +155,24 @@ void Aggregator::diagCallback(const DiagnosticArray::SharedPtr diag_msg)
std::lock_guard<std::mutex> lock(mutex_);
for (auto j = 0u; j < diag_msg->status.size(); ++j) {
analyzed = false;

const bool top_level_state_transition_to_error =
(last_top_level_state_ != DiagnosticStatus::ERROR) &&
(diag_msg->status[j].level == DiagnosticStatus::ERROR);

if (critical_ && top_level_state_transition_to_error) {
RCLCPP_DEBUG(
logger_, "Received error message: %s, publishing error immediately",
diag_msg->status[j].name.c_str());
DiagnosticStatus diag_toplevel_state;
diag_toplevel_state.name = "toplevel_state_critical";
diag_toplevel_state.level = diag_msg->status[j].level;
toplevel_state_pub_->publish(diag_toplevel_state);

// store the last published state
last_top_level_state_ = diag_toplevel_state.level;
}

auto item = std::make_shared<StatusItem>(&diag_msg->status[j]);

if (analyzer_group_->match(item->getName())) {
Expand Down Expand Up @@ -217,6 +241,8 @@ void Aggregator::publishData()
// have stale items but not all are stale
diag_toplevel_state.level = DiagnosticStatus::ERROR;
}
last_top_level_state_ = diag_toplevel_state.level;

toplevel_state_pub_->publish(diag_toplevel_state);
}

Expand Down
103 changes: 103 additions & 0 deletions diagnostic_aggregator/test/test_critical_pub.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
import unittest

from diagnostic_msgs.msg import DiagnosticArray, DiagnosticStatus
from launch import LaunchDescription
from launch_ros.actions import Node
from launch_testing.actions import ReadyToTest
import pytest
import rclpy


@pytest.mark.launch_test
def generate_test_description():
# Launch the aggregator
parameters = [{'analyzers.test.type': 'diagnostic_aggregator/GenericAnalyzer'},
{'analyzers.test.path': 'Test'},
{'analyzers.test.contains': ['test']},
{'critical': True}]

aggregator_cmd = Node(
package='diagnostic_aggregator',
executable='aggregator_node',
name='diagnostic_aggregator',
parameters=parameters)

ld = LaunchDescription()

# Launch the node
ld.add_action(aggregator_cmd)
ld.add_action(ReadyToTest())
return ld


class TestProcessOutput(unittest.TestCase):

@ classmethod
def setUpClass(cls):
# Initialize the ROS context for the test node
rclpy.init()

@ classmethod
def tearDownClass(cls):
# Shutdown the ROS context
rclpy.shutdown()

def setUp(self):
# Create a ROS node for tests
self.node = rclpy.create_node('test_node')
self.publisher = self.node.create_publisher(DiagnosticArray, '/diagnostics', 1)
self.subscriber = self.node.create_subscription(
DiagnosticStatus,
'/diagnostics_toplevel_state',
lambda msg: self.received_state.append(msg.level),
1)
self.received_state = []

def tearDown(self):
self.node.destroy_node()

def publish_message(self, level):
msg = DiagnosticArray()
msg.status.append(DiagnosticStatus())
msg.status[0].level = level
msg.status[0].name = 'test status'
while len(self.received_state) == 0:
self.publisher.publish(msg)
rclpy.spin_once(self.node)

return self.node.get_clock().now()

def test_critical_publisher(self):
# Publish the ok message and wait till the toplevel state is received
state = DiagnosticStatus.OK
time_0 = self.publish_message(state)

assert (self.received_state[0] == state), \
'Received state is not the same as the sent state'
self.received_state.clear()

# Publish the ok message and expect the toplevel state after 1 second period
time_1 = self.publish_message(state)
assert (time_1 - time_0 > rclpy.duration.Duration(seconds=0.99)), \
'OK message received too early'
assert (self.received_state[0] == state), \
'Received state is not the same as the sent state'
self.received_state.clear()

# Publish the message and expect the critical error message immediately
state = DiagnosticStatus.ERROR
time_2 = self.publish_message(state)

assert (time_2 - time_1 < rclpy.duration.Duration(seconds=0.1)), \
'Critical error message not received within 0.1 second'
assert (self.received_state[0] == state), \
'Received state is not the same as the sent state'
self.received_state.clear()

# Next error message should be sent at standard 1 second rate
time_3 = self.publish_message(state)

assert (time_3 - time_1 > rclpy.duration.Duration(seconds=0.99)), \
'Periodic error message received too early'
assert (self.received_state[0] == state), \
'Received state is not the same as the sent state'
12 changes: 12 additions & 0 deletions diagnostic_updater/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -84,10 +84,22 @@ if(BUILD_TESTING)
"rclcpp_lifecycle"
"std_msgs"
)
ament_add_gtest(status_msg_test test/status_msg_test.cpp)
target_include_directories(status_msg_test
PUBLIC
$<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/include>
$<INSTALL_INTERFACE:include>
)
ament_target_dependencies(
status_msg_test
"diagnostic_msgs"
"rclcpp"
)

find_package(ament_cmake_pytest REQUIRED)
ament_add_pytest_test(diagnostic_updater_test.py "test/diagnostic_updater_test.py")
ament_add_pytest_test(test_DiagnosticStatusWrapper.py "test/test_diagnostic_status_wrapper.py")
ament_add_pytest_test(status_msg_test.py "test/status_msg_test.py")
endif()

ament_python_install_package(${PROJECT_NAME})
Expand Down
15 changes: 14 additions & 1 deletion diagnostic_updater/diagnostic_updater/_diagnostic_updater.py
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,19 @@ def __init__(self, node, period=1.0):
self.hwid = ''
self.warn_nohwid_done = False

self.use_fqn_parameter = 'diagnostic_updater.use_fqn'
if self.node.has_parameter(self.use_fqn_parameter):
self.__use_fqn = self.node.get_parameter(
self.use_fqn_parameter).value
else:
self.__use_fqn = self.node.declare_parameter(
self.use_fqn_parameter, False).value

if self.__use_fqn:
self.node_name = '/'.join([self.node.get_namespace(), self.node.get_name()])
else:
self.node_name = self.node.get_name()

def update(self):
"""
Update the diagnostics.
Expand Down Expand Up @@ -352,7 +365,7 @@ def publish(self, msg):
da = DiagnosticArray()
da.header.stamp = now.to_msg() # Add timestamp for ROS 0.10
for stat in msg:
stat.name = self.node.get_name() + ': ' + stat.name
stat.name = self.node_name + ': ' + stat.name
db = DiagnosticStatus()
db.name = stat.name
db.message = stat.message
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -406,6 +406,17 @@ class Updater : public DiagnosticTaskVector
period_ = rclcpp::Duration::from_seconds(period);

reset_timer();

constexpr const char * use_fqn_param_name = "diagnostic_updater.use_fqn";
rclcpp::ParameterValue use_fqn_param;
if (parameters_interface->has_parameter(use_fqn_param_name)) {
use_fqn_param = parameters_interface->get_parameter(use_fqn_param_name).get_parameter_value();
} else {
use_fqn_param = parameters_interface->declare_parameter(
use_fqn_param_name, rclcpp::ParameterValue(false));
}
node_name_ = use_fqn_param.get<bool>() ?
base_interface->get_fully_qualified_name() : base_interface->get_name();
}

/**
Expand Down
1 change: 1 addition & 0 deletions diagnostic_updater/package.xml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
<test_depend>ament_lint_common</test_depend>
<test_depend>launch</test_depend>
<test_depend>launch_testing</test_depend>
<!-- <test_depend>launch_testing_ros</test_depend> -->
<test_depend>python3-pytest</test_depend>
<test_depend>rclcpp_lifecycle</test_depend>

Expand Down
Loading

0 comments on commit f79d570

Please sign in to comment.