Skip to content

Commit

Permalink
Merge branch 'ros2' into fix/small_updater_fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
ct2034 committed Jun 27, 2024
2 parents 311750e + dbaec04 commit 005ccd6
Show file tree
Hide file tree
Showing 8 changed files with 199 additions and 38 deletions.
28 changes: 10 additions & 18 deletions diagnostic_aggregator/src/aggregator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -151,28 +151,11 @@ void Aggregator::diagCallback(const DiagnosticArray::SharedPtr diag_msg)
checkTimestamp(diag_msg);

bool analyzed = false;
bool immediate_report = false;
{ // lock the whole loop to ensure nothing in the analyzer group changes during it.
std::lock_guard<std::mutex> lock(mutex_);
for (auto j = 0u; j < diag_msg->status.size(); ++j) {
analyzed = false;

const bool top_level_state_transition_to_error =
(last_top_level_state_ != DiagnosticStatus::ERROR) &&
(diag_msg->status[j].level == DiagnosticStatus::ERROR);

if (critical_ && top_level_state_transition_to_error) {
RCLCPP_DEBUG(
logger_, "Received error message: %s, publishing error immediately",
diag_msg->status[j].name.c_str());
DiagnosticStatus diag_toplevel_state;
diag_toplevel_state.name = "toplevel_state_critical";
diag_toplevel_state.level = diag_msg->status[j].level;
toplevel_state_pub_->publish(diag_toplevel_state);

// store the last published state
last_top_level_state_ = diag_toplevel_state.level;
}

auto item = std::make_shared<StatusItem>(&diag_msg->status[j]);

if (analyzer_group_->match(item->getName())) {
Expand All @@ -182,8 +165,17 @@ void Aggregator::diagCallback(const DiagnosticArray::SharedPtr diag_msg)
if (!analyzed) {
other_analyzer_->analyze(item);
}

// In case there is a degraded state, publish immediately
if (critical_ && item->getLevel() > last_top_level_state_) {
immediate_report = true;
}
}
}

if (immediate_report) {
publishData();
}
}

Aggregator::~Aggregator()
Expand Down
45 changes: 30 additions & 15 deletions diagnostic_aggregator/test/test_critical_pub.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,41 +67,56 @@ def publish_message(self, level):
rclpy.spin_once(self.node)
return self.node.get_clock().now()

def test_critical_publisher(self):
def critical_publisher_test(
self, initial_state=DiagnosticStatus.OK, new_state=DiagnosticStatus.ERROR
):
# Publish the ok message and wait till the toplevel state is received
state = DiagnosticStatus.OK
time_0 = self.publish_message(state)
time_0 = self.publish_message(initial_state)

assert (self.received_state[0] == state), \
assert (self.received_state[0] == initial_state), \
('Received state is not the same as the sent state:'
+ f"'{self.received_state[0]}' != '{state}'")
+ f"'{self.received_state[0]}' != '{initial_state}'")
self.received_state.clear()

# Publish the ok message and expect the toplevel state after 1 second period
time_1 = self.publish_message(state)
time_1 = self.publish_message(initial_state)
assert (time_1 - time_0 > rclpy.duration.Duration(seconds=0.99)), \
'OK message received too early'
assert (self.received_state[0] == state), \
assert (self.received_state[0] == initial_state), \
('Received state is not the same as the sent state:'
+ f"'{self.received_state[0]}' != '{state}'")
+ f"'{self.received_state[0]}' != '{initial_state}'")
self.received_state.clear()

# Publish the message and expect the critical error message immediately
state = DiagnosticStatus.ERROR
time_2 = self.publish_message(state)
time_2 = self.publish_message(new_state)

assert (time_2 - time_1 < rclpy.duration.Duration(seconds=0.1)), \
'Critical error message not received within 0.1 second'
assert (self.received_state[0] == state), \
assert (self.received_state[0] == new_state), \
('Received state is not the same as the sent state:'
+ f"'{self.received_state[0]}' != '{state}'")
+ f"'{self.received_state[0]}' != '{new_state}'")
self.received_state.clear()

# Next error message should be sent at standard 1 second rate
time_3 = self.publish_message(state)
time_3 = self.publish_message(new_state)

assert (time_3 - time_1 > rclpy.duration.Duration(seconds=0.99)), \
'Periodic error message received too early'
assert (self.received_state[0] == state), \
assert (self.received_state[0] == new_state), \
('Received state is not the same as the sent state:'
+ f"'{self.received_state[0]}' != '{state}'")
+ f"'{self.received_state[0]}' != '{new_state}'")

def test_critical_publisher_ok_error(self):
self.critical_publisher_test(
initial_state=DiagnosticStatus.OK, new_state=DiagnosticStatus.ERROR
)

def test_critical_publisher_ok_warn(self):
self.critical_publisher_test(
initial_state=DiagnosticStatus.OK, new_state=DiagnosticStatus.WARN
)

def test_critical_publisher_warn_error(self):
self.critical_publisher_test(
initial_state=DiagnosticStatus.WARN, new_state=DiagnosticStatus.ERROR
)
1 change: 1 addition & 0 deletions diagnostic_common_diagnostics/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ ament_python_install_package(${PROJECT_NAME})
install(PROGRAMS
${PROJECT_NAME}/cpu_monitor.py
${PROJECT_NAME}/ntp_monitor.py
${PROJECT_NAME}/ram_monitor.py
DESTINATION lib/${PROJECT_NAME}
)

Expand Down
22 changes: 21 additions & 1 deletion diagnostic_common_diagnostics/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,27 @@ Disable self test.
**To be ported**

## ram_monitor.py
**To be ported**
The `ram_monitor` module allows users to monitor the RAM usage of their system in real-time.
It publishes the usage percentage in a diagnostic message.

* Name of the node is "ram_monitor_" + hostname.
* Uses the following args:
* warning_percentage: If the RAM usage is > warning_percentage, a WARN status will be published.
* window: the maximum length of the used collections.deque for queuing RAM readings.

### Published Topics
#### /diagnostics
diagnostic_msgs/DiagnosticArray
The diagnostics information.

### Parameters
#### warning_percentage
(default: 90)
warning percentage threshold.

#### window
(default: 1)
Length of RAM readings queue.

## sensors_monitor.py
**To be ported**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ def __init__(self, ntp_hostname, ntp_port, offset=500, self_offset=500,
do_self_test=True):
"""Initialize the NTPMonitor."""
super().__init__(__class__.__name__)
self.declare_parameter('frequency', 10.0)
frequency = self.get_parameter('frequency').get_parameter_value().double_value

self.ntp_hostname = ntp_hostname
self.ntp_port = ntp_port
Expand Down Expand Up @@ -85,8 +87,8 @@ def __init__(self, ntp_hostname, ntp_port, offset=500, self_offset=500,

# we need to periodically republish this
self.current_msg = None
self.pubtimer = self.create_timer(0.1, self.pubCB)
self.checktimer = self.create_timer(0.1, self.checkCB)
self.pubtimer = self.create_timer(1/frequency, self.pubCB)
self.checktimer = self.create_timer(1/frequency, self.checkCB)

def pubCB(self):
with self.mutex:
Expand All @@ -95,6 +97,7 @@ def pubCB(self):

def checkCB(self):
new_msg = DIAG.DiagnosticArray()
new_msg.header.stamp = self.get_clock().now().to_msg()

st = self.ntp_diag(self.stat)
if st is not None:
Expand Down Expand Up @@ -159,7 +162,7 @@ def add_kv(stat_values, key, value):

def ntp_monitor_main(argv=sys.argv[1:]):
# filter out ROS args
argv = [a for a in argv if not a.startswith('__') and not a == '--ros-args' and not a == '-r']
argv = argv[:argv.index('--ros-args')] if '--ros-args' in argv else argv

import argparse
parser = argparse.ArgumentParser()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
#!/usr/bin/env python3
#
# Software License Agreement (BSD License)
#
# Copyright (c) 2017, TNO IVS, Helmond, Netherlands
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
# are met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following
# disclaimer in the documentation and/or other materials provided
# with the distribution.
# * Neither the name of the TNO IVS nor the names of its
# contributors may be used to endorse or promote products derived
# from this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
# FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
# COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
# INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
# BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.

# \author Rein Appeldoorn

import collections
import socket

from diagnostic_msgs.msg import DiagnosticStatus

from diagnostic_updater import DiagnosticTask, Updater

import psutil

import rclpy


class RamTask(DiagnosticTask):

def __init__(self, warning_percentage, window):
DiagnosticTask.__init__(self, 'RAM Information')
self._warning_percentage = int(warning_percentage)
self._readings = collections.deque(maxlen=window)

def run(self, stat):
self._readings.append(psutil.virtual_memory().percent)
ram_average = sum(self._readings) / len(self._readings)

stat.add('RAM Load Average', f'{ram_average:.2f}')

if ram_average > self._warning_percentage:
stat.summary(
DiagnosticStatus.WARN,
f'RAM Average exceeds {self._warning_percentage:d} percent',
)
else:
stat.summary(DiagnosticStatus.OK, f'RAM Average {ram_average:.2f} percent')

return stat


def main():
hostname = socket.gethostname()
rclpy.init()
node = rclpy.create_node(f'ram_monitor_{hostname.replace("-", "_")}')

updater = Updater(node)
updater.setHardwareID(hostname)
updater.add(
RamTask(
node.declare_parameter('warning_percentage', 90).value,
node.declare_parameter('window', 1).value,
)
)

rclpy.spin(node)


if __name__ == '__main__':
try:
main()
except KeyboardInterrupt:
pass
Original file line number Diff line number Diff line change
Expand Up @@ -511,7 +511,7 @@ class Updater : public DiagnosticTaskVector
*/
void update()
{
if (rclcpp::ok()) {
if (rclcpp::ok(base_interface_->get_context())) {
bool warn_nohwid = hwid_.empty();

std::vector<diagnostic_msgs::msg::DiagnosticStatus> status_vec;
Expand Down
36 changes: 36 additions & 0 deletions diagnostic_updater/test/diagnostic_updater_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,42 @@ TEST(DiagnosticUpdater, testUpdaterAsNodeClassMember) {
SUCCEED();
}

class SaveIfCalled : public diagnostic_updater::DiagnosticTask
{
public:
SaveIfCalled()
: DiagnosticTask("SaveIfCalled") {}

void run(diagnostic_updater::DiagnosticStatusWrapper & s)
{
s.summary(0, "Have been called!");
has_been_called_ = true;
}

bool has_been_called() const
{
return has_been_called_;
}

private:
bool has_been_called_ = false;
};


TEST(DiagnosticUpdater, testCustomContext) {
auto context = std::make_shared<rclcpp::Context>();
context->init(0, nullptr, rclcpp::InitOptions());

auto node =
std::make_shared<rclcpp::Node>("CustomContextNode", rclcpp::NodeOptions().context(context));
diagnostic_updater::Updater updater(node);
SaveIfCalled save_if_called;
updater.add(save_if_called);
updater.force_update();
ASSERT_TRUE(save_if_called.has_been_called());
context->shutdown("End test");
}

TEST(DiagnosticUpdater, testDiagnosticStatusWrapperKeyValuePairs) {
diagnostic_updater::DiagnosticStatusWrapper stat;

Expand Down

0 comments on commit 005ccd6

Please sign in to comment.