Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Aggregator: publish diagnostics_toplevel_state immediately on every degradation #324

Merged
merged 4 commits into from
Jun 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions diagnostic_aggregator/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ The robot has two of each, one on each side.
The robot also 4 camera sensors, one left and one right and one in the front and one in the back.
These are all the available diagnostic sources:

```
```
/arms/left/motor
/arms/right/motor
/legs/left/motor
Expand Down Expand Up @@ -119,6 +119,9 @@ Additional parameters depend on the type of the analyzer.
Any diagnostic item that is not matched by any analyzer will be published by an "Other" analyzer.
Items created by the "Other" analyzer will go stale after 5 seconds.

The `critical` parameter makes the aggregator react immediately to a degradation in diagnostic state.
This is useful if the toplevel state is parsed by a watchdog for example.

## Launching
You can launch the `aggregator_node` like this (see [example.launch.py.in](example/example.launch.py.in)):
``` python
Expand Down Expand Up @@ -186,4 +189,4 @@ This means that things that are ignored by the `IgnoreAnalyzer` will still be pu
- `analyzers` (map, default: {}) - The analyzers that will be used to aggregate the diagnostics

# Tutorials
TODO: Port tutorials #contributions-welcome
TODO: Port tutorials #contributions-welcome
9 changes: 5 additions & 4 deletions diagnostic_aggregator/mainpage.dox
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ See Analyzer for more information on the base class.

\subsubsection generic_analyzer GenericAnalyzer

\b generic_analyzer holds the GenericAnalyzer class, which is the most basic of the Analyzer's. It is used by the diagnostic_aggregator/Aggregator to store, process and republish diagnostics data. The GenericAnalyzer is loaded by the pluginlib as a Analyzer plugin. It is the most basic of all Analyzer's.
\b generic_analyzer holds the GenericAnalyzer class, which is the most basic of the Analyzer's. It is used by the diagnostic_aggregator/Aggregator to store, process and republish diagnostics data. The GenericAnalyzer is loaded by the pluginlib as a Analyzer plugin. It is the most basic of all Analyzer's.

\subsubsection analyzer_group AnalyzerGroup

Expand All @@ -37,10 +37,10 @@ aggregator_node subscribes to "/diagnostics" and publishes an aggregated set of
\subsubsection topics ROS topics

Subscribes to:
- \b "/diagnostics": [diagnostics_msgs/DiagnosticArray]
- \b "/diagnostics": [diagnostics_msgs/DiagnosticArray]

Publishes to:
- \b "/diagnostics_agg": [diagnostics_msgs/DiagnosticArray]
- \b "/diagnostics_agg": [diagnostics_msgs/DiagnosticArray]

\subsubsection parameters ROS parameters

Expand All @@ -49,6 +49,7 @@ Reads the following parameters from the parameter server
- \b "~pub_rate" : \b double [optional] Rate that output diagnostics published
- \b "~base_path" : \b double [optional] Prepended to all analyzed output
- \b "~analyzers" : \b {} Configuration for loading analyzers
- \b "~critical" : \b bool [optional] React immediately to a degradation in diagnostic state

\subsection analyzer_loader analyzer_loader

Expand All @@ -61,4 +62,4 @@ Reads the following parameters from the parameter server
- \b "~analyzers" : \b {} Configuration for loading and testing analyzers


*/
*/
28 changes: 10 additions & 18 deletions diagnostic_aggregator/src/aggregator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -151,28 +151,11 @@ void Aggregator::diagCallback(const DiagnosticArray::SharedPtr diag_msg)
checkTimestamp(diag_msg);

bool analyzed = false;
bool immediate_report = false;
{ // lock the whole loop to ensure nothing in the analyzer group changes during it.
std::lock_guard<std::mutex> lock(mutex_);
for (auto j = 0u; j < diag_msg->status.size(); ++j) {
analyzed = false;

const bool top_level_state_transition_to_error =
(last_top_level_state_ != DiagnosticStatus::ERROR) &&
(diag_msg->status[j].level == DiagnosticStatus::ERROR);

if (critical_ && top_level_state_transition_to_error) {
RCLCPP_DEBUG(
logger_, "Received error message: %s, publishing error immediately",
diag_msg->status[j].name.c_str());
DiagnosticStatus diag_toplevel_state;
diag_toplevel_state.name = "toplevel_state_critical";
diag_toplevel_state.level = diag_msg->status[j].level;
toplevel_state_pub_->publish(diag_toplevel_state);

// store the last published state
last_top_level_state_ = diag_toplevel_state.level;
}

auto item = std::make_shared<StatusItem>(&diag_msg->status[j]);

if (analyzer_group_->match(item->getName())) {
Expand All @@ -182,8 +165,17 @@ void Aggregator::diagCallback(const DiagnosticArray::SharedPtr diag_msg)
if (!analyzed) {
other_analyzer_->analyze(item);
}

// In case there is a degraded state, publish immediately
if (critical_ && item->getLevel() > last_top_level_state_) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This could lead to problems, because stale is larger than everything else.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes? Would it be bad to publish immediately if items become stale?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ct2034 is this a problem if we publish immediately on stale?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, I guess you are right

immediate_report = true;
}
}
}

if (immediate_report) {
publishData();
}
}

Aggregator::~Aggregator()
Expand Down
45 changes: 30 additions & 15 deletions diagnostic_aggregator/test/test_critical_pub.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,41 +67,56 @@ def publish_message(self, level):
rclpy.spin_once(self.node)
return self.node.get_clock().now()

def test_critical_publisher(self):
def critical_publisher_test(
self, initial_state=DiagnosticStatus.OK, new_state=DiagnosticStatus.ERROR
):
# Publish the ok message and wait till the toplevel state is received
state = DiagnosticStatus.OK
time_0 = self.publish_message(state)
time_0 = self.publish_message(initial_state)

assert (self.received_state[0] == state), \
assert (self.received_state[0] == initial_state), \
('Received state is not the same as the sent state:'
+ f"'{self.received_state[0]}' != '{state}'")
+ f"'{self.received_state[0]}' != '{initial_state}'")
self.received_state.clear()

# Publish the ok message and expect the toplevel state after 1 second period
time_1 = self.publish_message(state)
time_1 = self.publish_message(initial_state)
assert (time_1 - time_0 > rclpy.duration.Duration(seconds=0.99)), \
'OK message received too early'
assert (self.received_state[0] == state), \
assert (self.received_state[0] == initial_state), \
('Received state is not the same as the sent state:'
+ f"'{self.received_state[0]}' != '{state}'")
+ f"'{self.received_state[0]}' != '{initial_state}'")
self.received_state.clear()

# Publish the message and expect the critical error message immediately
state = DiagnosticStatus.ERROR
time_2 = self.publish_message(state)
time_2 = self.publish_message(new_state)

assert (time_2 - time_1 < rclpy.duration.Duration(seconds=0.1)), \
'Critical error message not received within 0.1 second'
assert (self.received_state[0] == state), \
assert (self.received_state[0] == new_state), \
('Received state is not the same as the sent state:'
+ f"'{self.received_state[0]}' != '{state}'")
+ f"'{self.received_state[0]}' != '{new_state}'")
self.received_state.clear()

# Next error message should be sent at standard 1 second rate
time_3 = self.publish_message(state)
time_3 = self.publish_message(new_state)

assert (time_3 - time_1 > rclpy.duration.Duration(seconds=0.99)), \
'Periodic error message received too early'
assert (self.received_state[0] == state), \
assert (self.received_state[0] == new_state), \
('Received state is not the same as the sent state:'
+ f"'{self.received_state[0]}' != '{state}'")
+ f"'{self.received_state[0]}' != '{new_state}'")

def test_critical_publisher_ok_error(self):
self.critical_publisher_test(
initial_state=DiagnosticStatus.OK, new_state=DiagnosticStatus.ERROR
)

def test_critical_publisher_ok_warn(self):
self.critical_publisher_test(
initial_state=DiagnosticStatus.OK, new_state=DiagnosticStatus.WARN
)

def test_critical_publisher_warn_error(self):
self.critical_publisher_test(
initial_state=DiagnosticStatus.WARN, new_state=DiagnosticStatus.ERROR
)
Loading