From cf67b0dc5af68355cbbedad719b590b16cf7ba8c Mon Sep 17 00:00:00 2001 From: David Conner Date: Tue, 30 Apr 2024 20:08:19 -0400 Subject: [PATCH] Add throttle option for logging (PR #14) --- flexbe_core/flexbe_core/logger.py | 99 ++++++++-- flexbe_core/test/flexbe_logger_test.py | 67 +++++++ flexbe_core/test/test_logger.py | 262 +++++++++++++++++++++++++ 3 files changed, 413 insertions(+), 15 deletions(-) create mode 100644 flexbe_core/test/flexbe_logger_test.py create mode 100644 flexbe_core/test/test_logger.py diff --git a/flexbe_core/flexbe_core/logger.py b/flexbe_core/flexbe_core/logger.py index 54afc90..a2ada01 100644 --- a/flexbe_core/flexbe_core/logger.py +++ b/flexbe_core/flexbe_core/logger.py @@ -1,6 +1,6 @@ #!/usr/bin/env python -# Copyright 2023 Philipp Schillinger, Team ViGIR, Christopher Newport University +# Copyright 2024 Philipp Schillinger, Team ViGIR, Christopher Newport University # # Redistribution and use in source and binary forms, with or without # modification, are permitted provided that the following conditions are met: @@ -31,7 +31,9 @@ """Realize behavior-specific logging.""" +from rclpy.exceptions import ParameterNotDeclaredException from rclpy.node import Node +from rclpy.duration import Duration from flexbe_msgs.msg import BehaviorLog @@ -47,6 +49,10 @@ class Logger: LOGGING_TOPIC = 'flexbe/log' + # max number of items in last logged dict (used for log_throttle) + MAX_LAST_LOGGED_SIZE = 1024 + LAST_LOGGED_CLEARING_RATIO = 0.2 + _pub = None _node = None @@ -54,6 +60,26 @@ class Logger: def initialize(node: Node): Logger._node = node Logger._pub = node.create_publisher(BehaviorLog, Logger.LOGGING_TOPIC, 100) + Logger._last_logged = {} + + # Optional parameters that can be defined + try: + size_param = node.get_parameter("max_throttle_logging_size") + if size_param.type_ == size_param.Type.INTEGER: + Logger.MAX_LAST_LOGGED_SIZE = size_param.value + except ParameterNotDeclaredException as exc: + pass + + try: + clear_param = node.get_parameter("throttle_logging_clear_ratio") + if clear_param.type_ in (clear_param.Type.INTEGER, clear_param.Type.DOUBLE): + Logger.LAST_LOGGED_CLEARING_RATIO = clear_param.value + except ParameterNotDeclaredException as exc: + pass + + Logger._node.get_logger().debug(f"Enable throttle logging option with " + f"max size={Logger.MAX_LAST_LOGGED_SIZE} " + f"clear ratio={Logger.LAST_LOGGED_CLEARING_RATIO}") @staticmethod def log(text: str, severity: int): @@ -67,6 +93,25 @@ def log(text: str, severity: int): # also log locally Logger.local(text, severity) + @staticmethod + def log_throttle(period : float, text: str, severity : int): + # create unique identifier for each logging message + log_id = f"{severity}_{text}" + time_now = Logger._node.get_clock().now() + # only log when it's the first time or period time has passed for the logging message + if not log_id in Logger._last_logged.keys() or \ + time_now - Logger._last_logged[log_id] > Duration(seconds=period): + Logger.log(text, severity) + Logger._last_logged.update({log_id: time_now}) + + if len(Logger._last_logged) > Logger.MAX_LAST_LOGGED_SIZE: + # iterate through last logged items, sorted by the timestamp (oldest last) + clear_size = Logger.MAX_LAST_LOGGED_SIZE * (1 - Logger.LAST_LOGGED_CLEARING_RATIO) + for i, log in enumerate(sorted(Logger._last_logged.items(), key=lambda item: item[1], reverse=True)): + # remove defined percentage of oldest items + if i > clear_size: + Logger._last_logged.pop(log[0]) + @staticmethod def local(text: str, severity: int): if Logger._node is None: @@ -87,57 +132,81 @@ def local(text: str, severity: int): # NOTE: Below text strings can only have single % symbols if they are being treated # as format strings with appropriate arguments (otherwise replace with %% for simple string without args) @staticmethod - def logdebug(text, *args): + def logdebug(text: str, *args): Logger.log(text % args, Logger.REPORT_DEBUG) @staticmethod - def loginfo(text, *args): + def loginfo(text: str, *args): Logger.log(text % args, Logger.REPORT_INFO) @staticmethod - def logwarn(text, *args): + def logwarn(text: str, *args): Logger.log(text % args, Logger.REPORT_WARN) @staticmethod - def loghint(text, *args): + def loghint(text: str, *args): Logger.log(text % args, Logger.REPORT_HINT) @staticmethod - def logerr(text, *args): + def logerr(text: str, *args): Logger.log(text % args, Logger.REPORT_ERROR) @staticmethod - def localdebug(text, *args): + def logdebug_throttle(period: float, text: str, *args): + Logger.log_throttle(period, text % args, Logger.REPORT_DEBUG) + + @staticmethod + def loginfo_throttle(period: float, text: str, *args): + Logger.log_throttle(period, text % args, Logger.REPORT_INFO) + + @staticmethod + def logwarn_throttle(period: float, text: str, *args): + Logger.log_throttle(period, text % args, Logger.REPORT_WARN) + + @staticmethod + def loghint_throttle(period: float, text: str, *args): + Logger.log_throttle(period, text % args, Logger.REPORT_HINT) + + @staticmethod + def logerr_throttle(period: float, text: str, *args): + Logger.log_throttle(period, text % args, Logger.REPORT_ERROR) + + @staticmethod + def localdebug(text: str, *args): Logger.local(text % args, Logger.REPORT_DEBUG) @staticmethod - def localinfo(text, *args): + def localinfo(text: str, *args): Logger.local(text % args, Logger.REPORT_INFO) @staticmethod - def localwarn(text, *args): + def localwarn(text: str, *args): Logger.local(text % args, Logger.REPORT_WARN) @staticmethod - def localerr(text, *args): + def localhint(text: str, *args): + Logger.local(text % args, Logger.REPORT_HINT) + + @staticmethod + def localerr(text: str, *args): Logger.local(text % args, Logger.REPORT_ERROR) @staticmethod - def debug(text, *args): + def debug(text: str, *args): Logger.logdebug(text, *args) @staticmethod - def info(text, *args): + def info(text: str, *args): Logger.loginfo(text, *args) @staticmethod - def warning(text, *args): + def warning(text: str, *args): Logger.logwarn(text, *args) @staticmethod - def hint(text, *args): + def hint(text: str, *args): Logger.loghint(text, *args) @staticmethod - def error(text, *args): + def error(text: str, *args): Logger.logerr(text, *args) diff --git a/flexbe_core/test/flexbe_logger_test.py b/flexbe_core/test/flexbe_logger_test.py new file mode 100644 index 0000000..e182d2f --- /dev/null +++ b/flexbe_core/test/flexbe_logger_test.py @@ -0,0 +1,67 @@ +#!/usr/bin/env python3 + +# Copyright 2024 Christopher Newport University +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# +# * Redistributions in binary form must reproduce the above copyright +# notice, this list of conditions and the following disclaimer in the +# documentation and/or other materials provided with the distribution. +# +# * Neither the name of the Philipp Schillinger, Team ViGIR, Christopher Newport University nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE +# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE +# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR +# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF +# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN +# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) +# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE +# POSSIBILITY OF SUCH DAMAGE. + + +"""Test description for test proxies.""" +import os +import sys +import launch +import launch_testing.actions +import pytest + + +@pytest.mark.rostest +def generate_test_description(): + + path_to_test = os.path.dirname(__file__) + + TEST_PROC_PATH = os.path.join(path_to_test, 'test_logger.py') + + # This is necessary to get unbuffered output from the process under test + proc_env = os.environ.copy() + proc_env['PYTHONUNBUFFERED'] = '1' + + test_logger = launch.actions.ExecuteProcess( + cmd=[sys.executable, TEST_PROC_PATH], + env=proc_env, + output='screen', + sigterm_timeout=launch.substitutions.LaunchConfiguration('sigterm_timeout', default=90), + sigkill_timeout=launch.substitutions.LaunchConfiguration('sigkill_timeout', default=90) + ) + + return ( + launch.LaunchDescription([ + test_logger, + launch_testing.actions.ReadyToTest() + ]), + { + 'test_logger': test_logger, + } + ) diff --git a/flexbe_core/test/test_logger.py b/flexbe_core/test/test_logger.py new file mode 100644 index 0000000..8f2aed8 --- /dev/null +++ b/flexbe_core/test/test_logger.py @@ -0,0 +1,262 @@ +#!/usr/bin/env python3 + +# Copyright 2024 Christopher Newport University +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# +# * Redistributions in binary form must reproduce the above copyright +# notice, this list of conditions and the following disclaimer in the +# documentation and/or other materials provided with the distribution. +# +# * Neither the name of the Philipp Schillinger, Team ViGIR, Christopher Newport University nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE +# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE +# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR +# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF +# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN +# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) +# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE +# POSSIBILITY OF SUCH DAMAGE. + + +"""Test FlexBE Exception handling.""" +import time +import unittest + +import rclpy + +from rclpy.executors import MultiThreadedExecutor +from flexbe_core import set_node, EventState, OperatableStateMachine +from flexbe_core.core.exceptions import StateError, StateMachineError, UserDataError +from flexbe_core.proxy import initialize_proxies, shutdown_proxies +from flexbe_core.logger import Logger + +class TestLogger(unittest.TestCase): + """Test FlexBE Logger handling.""" + + test = 0 + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + + def setUp(self): + TestLogger.test += 1 + self.context = rclpy.context.Context() + rclpy.init(context=self.context) + + self.executor = MultiThreadedExecutor(context=self.context) + self.node = rclpy.create_node("logger_test_" + str(self.test), context=self.context) + self.node.get_logger().info(" set up logger test %d (%d) ... " % (self.test, self.context.ok())) + self.executor.add_node(self.node) + initialize_proxies(self.node) + + def tearDown(self): + self.node.get_logger().info(" shutting down logger test %d (%d) ... " % (self.test, self.context.ok())) + rclpy.spin_once(self.node, executor=self.executor, timeout_sec=0.1) + + self.node.get_logger().info(" shutting down proxies in logger test %d ... " % (self.test)) + shutdown_proxies() + time.sleep(0.1) + + self.node.get_logger().info(" destroy node in core test %d ... " % (self.test)) + self.node.destroy_node() + + time.sleep(0.1) + self.executor.shutdown() + time.sleep(0.1) + + # Kill it with fire to make sure not stray published topics are available + rclpy.shutdown(context=self.context) + time.sleep(0.2) + + def test_throttle_logger_one(self): + self.node.get_logger().info("test_throttle_logger_one ...") + self.node.declare_parameter("max_throttle_logging_size", 100) + self.node.declare_parameter("throttle_logging_clear_ratio", 0.25) + set_node(self.node) # Update the logger node + + rclpy.spin_once(self.node, executor=self.executor, timeout_sec=1) + OperatableStateMachine.initialize_ros(self.node) + node = self.node + + class ThrottleSingleLog(EventState): + """Local Test state definition.""" + + def __init__(self): + self.initialize_ros(node) + super().__init__(outcomes=['done']) + self._trials = Logger.MAX_LAST_LOGGED_SIZE*2 + Logger.logerr_throttle(0.0, "test") + + def execute(self, userdata): + Logger.logerr_throttle(0.0, "test") + self._trials -= 1 + if self._trials == 0: + return 'done' + + state_instance = ThrottleSingleLog() + sm = OperatableStateMachine(outcomes=['done']) + with sm: + OperatableStateMachine.add('state', state_instance, transitions={'done': 'done'}) + outcome = None + self.assertEqual(Logger.MAX_LAST_LOGGED_SIZE, 100) # default size + self.assertAlmostEqual(Logger.LAST_LOGGED_CLEARING_RATIO, 0.25) # default ratio + self.assertEqual(state_instance._trials, Logger.MAX_LAST_LOGGED_SIZE*2 ) + while outcome is None: + outcome = sm.execute(None) + self.assertEqual(len(Logger._last_logged), 1) + self.assertEqual(outcome, "done") + self.assertEqual(state_instance._trials, 0 ) + + self.assertIsNone(sm._last_exception) + self.node.get_logger().info("test_throttle_logger_one - OK! ") + + def test_throttle_logger_err_multi(self): + self.node.get_logger().info("test_throttle_logger_err_multi ...") + self.node.declare_parameter("max_throttle_logging_size", 200) + self.node.declare_parameter("throttle_logging_clear_ratio", 0.35) + set_node(self.node) # Update the logger node + + rclpy.spin_once(self.node, executor=self.executor, timeout_sec=1) + OperatableStateMachine.initialize_ros(self.node) + node = self.node + + class ThrottleMultiLog(EventState): + """Local Test state definition.""" + + def __init__(self): + self.initialize_ros(node) + super().__init__(outcomes=['done']) + self._trials = Logger.MAX_LAST_LOGGED_SIZE*2 + Logger.logerr_throttle(0.01, f"0_test") + + def execute(self, userdata): + Logger.logerr_throttle(0.01, f"{self._trials}_test") + self._trials -= 1 + if self._trials == 0: + return 'done' + + state_instance = ThrottleMultiLog() + sm = OperatableStateMachine(outcomes=['done']) + with sm: + OperatableStateMachine.add('state', state_instance, transitions={'done': 'done'}) + outcome = None + self.assertEqual(Logger.MAX_LAST_LOGGED_SIZE, 200) # default size + self.assertAlmostEqual(Logger.LAST_LOGGED_CLEARING_RATIO, 0.35) # default ratio + self.assertEqual(state_instance._trials, Logger.MAX_LAST_LOGGED_SIZE*2 ) + while outcome is None: + outcome = sm.execute(None) + self.assertTrue(1 < len(Logger._last_logged) <= Logger.MAX_LAST_LOGGED_SIZE) + self.assertEqual(outcome, "done") + self.assertEqual(state_instance._trials, 0) + + self.assertIsNone(sm._last_exception) + self.node.get_logger().info("test_throttle_logger_err_multi - OK! ") + + def test_throttle_logger_multiple_params(self): + self.node.get_logger().info("test_throttle_logger_multiple_params ...") + self.node.declare_parameter("max_throttle_logging_size", 100) + self.node.declare_parameter("throttle_logging_clear_ratio", 0.7) + + set_node(self.node) # Update the logger node + + rclpy.spin_once(self.node, executor=self.executor, timeout_sec=1) + OperatableStateMachine.initialize_ros(self.node) + node = self.node + + class ThrottleMultiLog(EventState): + """Local Test state definition.""" + + def __init__(self): + self.initialize_ros(node) + super().__init__(outcomes=['done']) + self._trials = Logger.MAX_LAST_LOGGED_SIZE*2 + Logger.logerr_throttle(0.01, f"0_test") + + def execute(self, userdata): + Logger.logerr(f"{self._trials}_test") + Logger.logerr_throttle(0.0, f"{self._trials}_test") + Logger.logwarn_throttle(0.0, f"{self._trials}_test") + Logger.loginfo_throttle(0.0, f"{self._trials}_test") + Logger.loghint_throttle(0.0, f"{self._trials}_test") + self._trials -= 1 + if self._trials == 0: + return 'done' + + state_instance = ThrottleMultiLog() + sm = OperatableStateMachine(outcomes=['done']) + with sm: + OperatableStateMachine.add('state', state_instance, transitions={'done': 'done'}) + outcome = None + self.assertEqual(state_instance._trials, Logger.MAX_LAST_LOGGED_SIZE*2 ) + self.assertEqual(Logger.MAX_LAST_LOGGED_SIZE, 100) # parameterized size + self.assertAlmostEqual(Logger.LAST_LOGGED_CLEARING_RATIO, 0.7) # parameterized + while outcome is None: + outcome = sm.execute(None) + self.assertTrue(1 < len(Logger._last_logged) <= Logger.MAX_LAST_LOGGED_SIZE) + rclpy.spin_once(self.node, executor=self.executor, timeout_sec=0.001) + self.assertEqual(outcome, "done") + self.assertEqual(state_instance._trials, 0) + + self.assertIsNone(sm._last_exception) + self.node.get_logger().info("test_throttle_logger_multiple - OK! ") + + def test_throttle_logger_multiple(self): + self.node.get_logger().info("test_throttle_logger_multiple_params ...") + self.node.declare_parameter("max_throttle_logging_size", 120) + self.node.declare_parameter("throttle_logging_clear_ratio", 0.22) + set_node(self.node) # Update the logger node + + rclpy.spin_once(self.node, executor=self.executor, timeout_sec=1) + OperatableStateMachine.initialize_ros(self.node) + node = self.node + + class ThrottleMultiLog(EventState): + """Local Test state definition.""" + + def __init__(self): + self.initialize_ros(node) + super().__init__(outcomes=['done']) + self._trials = Logger.MAX_LAST_LOGGED_SIZE*2 + Logger.logerr_throttle(0.01, f"0_test") + + def execute(self, userdata): + Logger.logerr(f"{self._trials}_test") + Logger.logerr_throttle(0.0, f"{self._trials}_test") + Logger.logwarn_throttle(0.0, f"{self._trials}_test") + Logger.loginfo_throttle(0.0, f"{self._trials}_test") + Logger.loghint_throttle(0.0, f"{self._trials}_test") + self._trials -= 1 + if self._trials == 0: + return 'done' + + state_instance = ThrottleMultiLog() + sm = OperatableStateMachine(outcomes=['done']) + with sm: + OperatableStateMachine.add('state', state_instance, transitions={'done': 'done'}) + outcome = None + self.assertEqual(state_instance._trials, Logger.MAX_LAST_LOGGED_SIZE*2 ) + self.assertEqual(Logger.MAX_LAST_LOGGED_SIZE, 120) # default size + self.assertAlmostEqual(Logger.LAST_LOGGED_CLEARING_RATIO, 0.22) # default ratio + while outcome is None: + outcome = sm.execute(None) + self.assertTrue(1 < len(Logger._last_logged) <= Logger.MAX_LAST_LOGGED_SIZE) + rclpy.spin_once(self.node, executor=self.executor, timeout_sec=0.001) + self.assertEqual(outcome, "done") + self.assertEqual(state_instance._trials, 0) + + self.assertIsNone(sm._last_exception) + self.node.get_logger().info("test_throttle_logger_multiple_params - OK! ") + +if __name__ == '__main__': + unittest.main()