Skip to content

Commit

Permalink
Aggregator: publish diagnostics_toplevel_state immediately on every d…
Browse files Browse the repository at this point in the history
…egradation (ros#324)

* Aggregator: publish diagnostics_toplevel_state immediately on every degradation
* Update docs
* Re-use the publishData function such that also the actual diagnostics are available
* Expand test with more degradation cases

(cherry picked from commit dbaec04)
  • Loading branch information
Timple committed Jun 27, 2024
1 parent cee513a commit d80ba59
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 33 deletions.
28 changes: 10 additions & 18 deletions diagnostic_aggregator/src/aggregator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -151,28 +151,11 @@ void Aggregator::diagCallback(const DiagnosticArray::SharedPtr diag_msg)
checkTimestamp(diag_msg);

bool analyzed = false;
bool immediate_report = false;
{ // lock the whole loop to ensure nothing in the analyzer group changes during it.
std::lock_guard<std::mutex> lock(mutex_);
for (auto j = 0u; j < diag_msg->status.size(); ++j) {
analyzed = false;

const bool top_level_state_transition_to_error =
(last_top_level_state_ != DiagnosticStatus::ERROR) &&
(diag_msg->status[j].level == DiagnosticStatus::ERROR);

if (critical_ && top_level_state_transition_to_error) {
RCLCPP_DEBUG(
logger_, "Received error message: %s, publishing error immediately",
diag_msg->status[j].name.c_str());
DiagnosticStatus diag_toplevel_state;
diag_toplevel_state.name = "toplevel_state_critical";
diag_toplevel_state.level = diag_msg->status[j].level;
toplevel_state_pub_->publish(diag_toplevel_state);

// store the last published state
last_top_level_state_ = diag_toplevel_state.level;
}

auto item = std::make_shared<StatusItem>(&diag_msg->status[j]);

if (analyzer_group_->match(item->getName())) {
Expand All @@ -182,8 +165,17 @@ void Aggregator::diagCallback(const DiagnosticArray::SharedPtr diag_msg)
if (!analyzed) {
other_analyzer_->analyze(item);
}

// In case there is a degraded state, publish immediately
if (critical_ && item->getLevel() > last_top_level_state_) {
immediate_report = true;
}
}
}

if (immediate_report) {
publishData();
}
}

Aggregator::~Aggregator()
Expand Down
45 changes: 30 additions & 15 deletions diagnostic_aggregator/test/test_critical_pub.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,41 +67,56 @@ def publish_message(self, level):
rclpy.spin_once(self.node)
return self.node.get_clock().now()

def test_critical_publisher(self):
def critical_publisher_test(
self, initial_state=DiagnosticStatus.OK, new_state=DiagnosticStatus.ERROR
):
# Publish the ok message and wait till the toplevel state is received
state = DiagnosticStatus.OK
time_0 = self.publish_message(state)
time_0 = self.publish_message(initial_state)

assert (self.received_state[0] == state), \
assert (self.received_state[0] == initial_state), \
('Received state is not the same as the sent state:'
+ f"'{self.received_state[0]}' != '{state}'")
+ f"'{self.received_state[0]}' != '{initial_state}'")
self.received_state.clear()

# Publish the ok message and expect the toplevel state after 1 second period
time_1 = self.publish_message(state)
time_1 = self.publish_message(initial_state)
assert (time_1 - time_0 > rclpy.duration.Duration(seconds=0.99)), \
'OK message received too early'
assert (self.received_state[0] == state), \
assert (self.received_state[0] == initial_state), \
('Received state is not the same as the sent state:'
+ f"'{self.received_state[0]}' != '{state}'")
+ f"'{self.received_state[0]}' != '{initial_state}'")
self.received_state.clear()

# Publish the message and expect the critical error message immediately
state = DiagnosticStatus.ERROR
time_2 = self.publish_message(state)
time_2 = self.publish_message(new_state)

assert (time_2 - time_1 < rclpy.duration.Duration(seconds=0.1)), \
'Critical error message not received within 0.1 second'
assert (self.received_state[0] == state), \
assert (self.received_state[0] == new_state), \
('Received state is not the same as the sent state:'
+ f"'{self.received_state[0]}' != '{state}'")
+ f"'{self.received_state[0]}' != '{new_state}'")
self.received_state.clear()

# Next error message should be sent at standard 1 second rate
time_3 = self.publish_message(state)
time_3 = self.publish_message(new_state)

assert (time_3 - time_1 > rclpy.duration.Duration(seconds=0.99)), \
'Periodic error message received too early'
assert (self.received_state[0] == state), \
assert (self.received_state[0] == new_state), \
('Received state is not the same as the sent state:'
+ f"'{self.received_state[0]}' != '{state}'")
+ f"'{self.received_state[0]}' != '{new_state}'")

def test_critical_publisher_ok_error(self):
self.critical_publisher_test(
initial_state=DiagnosticStatus.OK, new_state=DiagnosticStatus.ERROR
)

def test_critical_publisher_ok_warn(self):
self.critical_publisher_test(
initial_state=DiagnosticStatus.OK, new_state=DiagnosticStatus.WARN
)

def test_critical_publisher_warn_error(self):
self.critical_publisher_test(
initial_state=DiagnosticStatus.WARN, new_state=DiagnosticStatus.ERROR
)

0 comments on commit d80ba59

Please sign in to comment.