Skip to content

Commit

Permalink
Support multiple topics via ros2 topic hz. (#929)
Browse files Browse the repository at this point in the history
Signed-off-by: Tomoya Fujita <[email protected]>
Signed-off-by: Chen Lihui <[email protected]>
  • Loading branch information
fujitatomoya authored Sep 6, 2024
1 parent 303f0b1 commit e2fddb2
Show file tree
Hide file tree
Showing 2 changed files with 129 additions and 35 deletions.
135 changes: 100 additions & 35 deletions ros2topic/ros2topic/verb/hz.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,13 @@
import functools
import math
import threading
import time

import rclpy

from rclpy.clock import Clock
from rclpy.clock import ClockType
from rclpy.executors import ExternalShutdownException
from rclpy.qos import qos_profile_sensor_data
from ros2cli.node.direct import add_arguments as add_direct_node_arguments
from ros2cli.node.direct import DirectNode
Expand All @@ -56,7 +58,8 @@ class HzVerb(VerbExtension):
def add_arguments(self, parser, cli_name):
arg = parser.add_argument(
'topic_name',
help="Name of the ROS topic to listen to (e.g. '/chatter')")
nargs='+',
help="Names of the ROS topic to listen to (e.g. '/chatter')")
arg.completer = TopicNameCompleter(
include_hidden_topics_key='include_hidden_topics')
parser.add_argument(
Expand All @@ -80,7 +83,7 @@ def main(self, *, args):


def main(args):
topic = args.topic_name
topics = args.topic_name
if args.filter_expr:
def expr_eval(expr):
def eval_fn(m):
Expand All @@ -91,7 +94,7 @@ def eval_fn(m):
filter_expr = None

with DirectNode(args) as node:
_rostopic_hz(node.node, topic, window_size=args.window_size, filter_expr=filter_expr,
_rostopic_hz(node.node, topics, window_size=args.window_size, filter_expr=filter_expr,
use_wtime=args.use_wtime)


Expand Down Expand Up @@ -202,13 +205,10 @@ def get_hz(self, topic=None):
"""
if not self.get_times(topic=topic):
return
elif self.get_last_printed_tn(topic=topic) == 0:
self.set_last_printed_tn(self.get_msg_tn(topic=topic), topic=topic)
return
elif self.get_msg_tn(topic=topic) < self.get_last_printed_tn(topic=topic) + 1e9:
elif self.get_msg_tn(topic=topic) == self.get_last_printed_tn(topic=topic):
return
with self.lock:
# Get frequency every one minute
# Get frequency every one second
times = self.get_times(topic=topic)
n = len(times)
mean = sum(times) / n
Expand All @@ -225,43 +225,108 @@ def get_hz(self, topic=None):

return rate, min_delta, max_delta, std_dev, n

def print_hz(self, topic=None):
def print_hz(self, topics=None):
"""Print the average publishing rate to screen."""
ret = self.get_hz(topic)
if ret is None:
return
rate, min_delta, max_delta, std_dev, window = ret
print('average rate: %.3f\n\tmin: %.3fs max: %.3fs std dev: %.5fs window: %s'
% (rate * 1e9, min_delta * 1e-9, max_delta * 1e-9, std_dev * 1e-9, window))
return

def get_format_hz(stat):
return stat[0] * 1e9, stat[1] * 1e-9, stat[2] * 1e-9, stat[3] * 1e-9, stat[4]

if len(topics) == 1:
ret = self.get_hz(topics[0])
if ret is None:
return
rate, min_delta, max_delta, std_dev, window = get_format_hz(ret)
print('average rate: %.3f\n\tmin: %.3fs max: %.3fs std dev: %.5fs window: %s'
% (rate, min_delta, max_delta, std_dev, window))
return

def _rostopic_hz(node, topic, window_size=DEFAULT_WINDOW_SIZE, filter_expr=None, use_wtime=False):
# monitoring multiple topics' hz
header = ['topic', 'rate', 'min_delta', 'max_delta', 'std_dev', 'window']
stats = {h: [] for h in header}
for topic in topics:
hz_stat = self.get_hz(topic)
if hz_stat is None:
continue
rate, min_delta, max_delta, std_dev, window = get_format_hz(hz_stat)
stats['topic'].append(topic)
stats['rate'].append('{:.3f}'.format(rate))
stats['min_delta'].append('{:.3f}'.format(min_delta))
stats['max_delta'].append('{:.3f}'.format(max_delta))
stats['std_dev'].append('{:.5f}'.format(std_dev))
stats['window'].append(str(window))
if not stats['topic']:
return
print(_get_ascii_table(header, stats))


def _get_ascii_table(header, cols):
# compose table with left alignment
header_aligned = []
col_widths = []
for h in header:
col_width = max(len(h), max(len(el) for el in cols[h]))
col_widths.append(col_width)
header_aligned.append(h.center(col_width))
for i, el in enumerate(cols[h]):
cols[h][i] = str(cols[h][i]).ljust(col_width)
# sum of col and each 3 spaces width
table_width = sum(col_widths) + 3 * (len(header) - 1)
n_rows = len(cols[header[0]])
body = '\n'.join(' '.join(cols[h][i] for h in header) for i in range(n_rows))
table = '{header}\n{hline}\n{body}\n'.format(
header=' '.join(header_aligned), hline='=' * table_width, body=body)
return table


def _rostopic_hz(node, topics, window_size=DEFAULT_WINDOW_SIZE, filter_expr=None, use_wtime=False):
"""
Periodically print the publishing rate of a topic to console until shutdown.
:param topic: topic name, ``list`` of ``str``
:param topics: list of topic names, ``list`` of ``str``
:param window_size: number of messages to average over, -1 for infinite, ``int``
:param filter_expr: Python filter expression that is called with m, the message instance
"""
# pause hz until topic is published
msg_class = get_msg_class(
node, topic, blocking=True, include_hidden_topics=True)

if msg_class is None:
rt = ROSTopicHz(node, window_size, filter_expr=filter_expr, use_wtime=use_wtime)
topics_len = len(topics)
topics_to_be_removed = []
for topic in topics:
msg_class = get_msg_class(
node, topic, blocking=True, include_hidden_topics=True)

if msg_class is None:
topics_to_be_removed.append(topic)
print('WARNING: failed to find message type for topic [%s]' % topic)
continue

node.create_subscription(
msg_class,
topic,
functools.partial(rt.callback_hz, topic=topic),
qos_profile_sensor_data)
if topics_len > 1:
print('Subscribed to [%s]' % topic)

# remove the topics from the list if failed to find message type
while (topic in topics_to_be_removed):
topics.remove(topic)
if len(topics) == 0:
node.destroy_node()
rclpy.try_shutdown()
return

rt = ROSTopicHz(node, window_size, filter_expr=filter_expr, use_wtime=use_wtime)
node.create_subscription(
msg_class,
topic,
functools.partial(rt.callback_hz, topic=topic),
qos_profile_sensor_data)

while rclpy.ok():
rclpy.spin_once(node)
rt.print_hz(topic)

node.destroy_node()
rclpy.shutdown()
try:
def thread_func():
while rclpy.ok():
rt.print_hz(topics)
time.sleep(1.0)

print_thread = threading.Thread(target=thread_func)
print_thread.start()
rclpy.spin(node)
except (KeyboardInterrupt, ExternalShutdownException):
pass
finally:
node.destroy_node()
rclpy.try_shutdown()
print_thread.join()
29 changes: 29 additions & 0 deletions ros2topic/test/test_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -814,6 +814,35 @@ def test_topic_hz(self):
average_rate = float(average_rate_line_pattern.match(head_line).group(1))
assert math.isclose(average_rate, 1., rel_tol=1e-2)

@launch_testing.markers.retry_on_failure(times=5, delay=1)
def test_multiple_topics_hz(self):
header_pattern = re.compile(r'\s+topic\s+rate\s+min_delta\s+max_delta\s+std_dev\s+window')
hline_pattern = re.compile(r'=+')
chatter_line_pattern = re.compile(
r'/chatter\s+(\d+.\d{3})\s+\d+.\d{3}\s+\d+.\d{3}\s+\d+.\d{5}\s+\d+\s+')
hidden_chatter_line_pattern = re.compile(
r'/_hidden_chatter\s+(\d+.\d{3})\s+\d+.\d{3}\s+\d+.\d{3}\s+\d+.\d{5}\s+\d+\s+')
with self.launch_topic_command(
arguments=['hz', '/chatter', '/_hidden_chatter']
) as topic_command:
assert topic_command.wait_for_output(functools.partial(
launch_testing.tools.expect_output, expected_lines=[
'Subscribed to [/chatter]',
'Subscribed to [/_hidden_chatter]',
header_pattern, hline_pattern,
chatter_line_pattern, hidden_chatter_line_pattern
], strict=True
), timeout=10)
assert topic_command.wait_for_shutdown(timeout=10)

chatter_line = topic_command.output.splitlines()[4]
chatter_average_rate = float(chatter_line_pattern.match(chatter_line).group(1))
assert math.isclose(chatter_average_rate, 1., rel_tol=1e-2)
hidden_chatter_line = topic_command.output.splitlines()[5]
hidden_chatter_average_rate = float(hidden_chatter_line_pattern.match(
hidden_chatter_line).group(1))
assert math.isclose(hidden_chatter_average_rate, 1., rel_tol=1e-2)

@launch_testing.markers.retry_on_failure(times=5, delay=1)
def test_filtered_topic_hz(self):
average_rate_line_pattern = re.compile(r'average rate: (\d+.\d{3})')
Expand Down

0 comments on commit e2fddb2

Please sign in to comment.