From 5bafebb5285fede6c6c7105fef1bf48bf1d4f0da Mon Sep 17 00:00:00 2001 From: Christophe Bedard Date: Wed, 16 Oct 2024 11:23:41 -0700 Subject: [PATCH] Support replaying multiple bags Signed-off-by: Christophe Bedard --- ros2bag/ros2bag/api/__init__.py | 87 ++++++- ros2bag/ros2bag/verb/convert.py | 22 +- ros2bag/ros2bag/verb/play.py | 39 +++- ros2bag/test/test_api.py | 27 +++ rosbag2_py/src/rosbag2_py/_transport.cpp | 31 ++- rosbag2_transport/CMakeLists.txt | 12 + .../include/rosbag2_transport/player.hpp | 55 ++++- rosbag2_transport/package.xml | 3 + .../rosbag2_transport/meta_priority_queue.hpp | 220 ++++++++++++++++++ .../src/rosbag2_transport/player.cpp | 207 +++++++++++----- .../test/rosbag2_transport/mock_player.hpp | 7 + .../test_composable_player.cpp | 19 +- .../test_meta_priority_queue.cpp | 124 ++++++++++ .../test/rosbag2_transport/test_play.cpp | 62 +++++ 14 files changed, 820 insertions(+), 95 deletions(-) create mode 100644 rosbag2_transport/src/rosbag2_transport/meta_priority_queue.hpp create mode 100644 rosbag2_transport/test/rosbag2_transport/test_meta_priority_queue.cpp diff --git a/ros2bag/ros2bag/api/__init__.py b/ros2bag/ros2bag/api/__init__.py index 565cae52aa..467daf1fa9 100644 --- a/ros2bag/ros2bag/api/__init__.py +++ b/ros2bag/ros2bag/api/__init__.py @@ -21,6 +21,7 @@ import os from typing import Any from typing import Dict +from typing import List from typing import Optional from rclpy.duration import Duration @@ -157,14 +158,94 @@ def check_not_negative_int(arg: str) -> int: raise ArgumentTypeError('{} is not the valid type (int)'.format(value)) -def add_standard_reader_args(parser: ArgumentParser) -> None: +def add_standard_reader_args(parser: ArgumentParser, multi: bool = False) -> None: + """ + Add arguments for one of multiple input bags. + + :param parser: the parser + :param multi: whether this is part of a verb that takes in multiple input bags + """ + # If multi, let user provide an input bag path using an optional positional arg, but require + # them to use --input to provide an input bag with a specific storage ID + reader_choices = rosbag2_py.get_registered_readers() + bag_help_multi_str = '' + storage_help_multi_str = '' + if multi: + bag_help_multi_str = ' Use --input to provide an input bag with a specific storage ID.' + storage_help_multi_str = \ + ' (deprecated: use --input to provide an input bag with a specific storage ID)' parser.add_argument( - 'bag_path', type=check_path_exists, help='Bag to open') + 'bag_path', + nargs='?' if multi else None, + type=check_path_exists, + help='Bag to open.' + bag_help_multi_str) parser.add_argument( '-s', '--storage', default='', choices=reader_choices, help='Storage implementation of bag. ' - 'By default attempts to detect automatically - use this argument to override.') + 'By default attempts to detect automatically - use this argument to override.' + + storage_help_multi_str) + + if multi: + add_multi_bag_input_arg(parser, required=False) + + +def add_multi_bag_input_arg(parser: ArgumentParser, required: bool = False) -> None: + """ + Add option for list of input bags. + + :param parser: the parser + :param required: whether this option should be required + """ + reader_choices = ', '.join(rosbag2_py.get_registered_readers()) + parser.add_argument( + '-i', '--input', + required=required, + action='append', nargs='+', + metavar=('uri', 'storage_id'), + help='URI (and optional storage ID) of an input bag. ' + 'May be provided more than once for multiple input bags. ' + f'Storage ID options are: {reader_choices}.') + + +def input_bag_arg_to_storage_options( + input_arg: List[List[str]], + storage_config_file: Optional[str] = None, +) -> List[rosbag2_py.StorageOptions]: + """ + Convert input bag argument value(s) to list of StorageOptions. + + Raises ValueError if validation fails, including: + 1. Bag path existence + 2. Storage ID + 3. Storage config file existence + + :param input_arg: the values of the input argument + :param storage_config_file: the storage config file, if any + """ + if storage_config_file and not os.path.exists(storage_config_file): + raise ValueError(f"File '{storage_config_file}' does not exist!") + storage_id_options = rosbag2_py.get_registered_readers() + storage_options = [] + for input_bag_info in input_arg: + if len(input_bag_info) > 2: + raise ValueError( + f'--input expects 1 or 2 arguments, {len(input_bag_info)} provided') + bag_path = input_bag_info[0] + if not os.path.exists(bag_path): + raise ValueError(f"Bag path '{bag_path}' does not exist!") + storage_id = input_bag_info[1] if len(input_bag_info) > 1 else '' + if storage_id and storage_id not in storage_id_options: + raise ValueError( + f"Unknown storage ID '{storage_id}', options are: {', '.join(storage_id_options)}") + options = rosbag2_py.StorageOptions( + uri=bag_path, + storage_id=storage_id, + ) + if storage_config_file: + options.storage_config_uri = storage_config_file + storage_options.append(options) + return storage_options def _parse_cli_storage_plugin(): diff --git a/ros2bag/ros2bag/verb/convert.py b/ros2bag/ros2bag/verb/convert.py index 080bb80c9d..78e87502c0 100644 --- a/ros2bag/ros2bag/verb/convert.py +++ b/ros2bag/ros2bag/verb/convert.py @@ -12,23 +12,17 @@ # See the License for the specific language governing permissions and # limitations under the License. -import argparse - +from ros2bag.api import add_multi_bag_input_arg +from ros2bag.api import input_bag_arg_to_storage_options from ros2bag.verb import VerbExtension from rosbag2_py import bag_rewrite -from rosbag2_py import StorageOptions class ConvertVerb(VerbExtension): """Given an input bag, write out a new bag with different settings.""" def add_arguments(self, parser, cli_name): - parser.add_argument( - '-i', '--input', - required=True, - action='append', nargs='+', - metavar=('uri', 'storage_id'), - help='URI (and optional storage ID) of an input bag. May be provided more than once') + add_multi_bag_input_arg(parser, required=True) parser.add_argument( '-o', '--output-options', type=str, required=True, @@ -37,14 +31,6 @@ def add_arguments(self, parser, cli_name): 'objects. See README.md for some examples.') def main(self, *, args): - input_options = [] - for input_bag in args.input: - if len(input_bag) > 2: - raise argparse.ArgumentTypeError( - f'--input expects 1 or 2 arguments, {len(input_bag)} provided') - storage_options = StorageOptions(uri=input_bag[0]) - if len(input_bag) > 1: - storage_options.storage_id = input_bag[1] - input_options.append(storage_options) + input_options = input_bag_arg_to_storage_options(args.input) bag_rewrite(input_options, args.output_options) diff --git a/ros2bag/ros2bag/verb/play.py b/ros2bag/ros2bag/verb/play.py index 363d43d59f..ae8d70e3c6 100644 --- a/ros2bag/ros2bag/verb/play.py +++ b/ros2bag/ros2bag/verb/play.py @@ -21,7 +21,9 @@ from ros2bag.api import check_positive_float from ros2bag.api import convert_service_to_service_event_topic from ros2bag.api import convert_yaml_to_qos_profile +from ros2bag.api import input_bag_arg_to_storage_options from ros2bag.api import print_error +from ros2bag.api import print_warn from ros2bag.verb import VerbExtension from ros2cli.node import NODE_NAME_PREFIX from rosbag2_py import Player @@ -42,7 +44,7 @@ class PlayVerb(VerbExtension): """Play back ROS data from a bag.""" def add_arguments(self, parser, cli_name): # noqa: D102 - add_standard_reader_args(parser) + add_standard_reader_args(parser, multi=True) parser.add_argument( '--read-ahead-queue-size', type=int, default=1000, help='size of message queue rosbag tries to hold in memory to help deterministic ' @@ -196,11 +198,36 @@ def main(self, *, args): # noqa: D102 topic_remapping.append('--remap') topic_remapping.append(remap_rule) - storage_options = StorageOptions( - uri=args.bag_path, - storage_id=args.storage, - storage_config_uri=storage_config_file, - ) + # Add bag from optional positional arg, then bag(s) from optional flag + storage_options = [] + if args.bag_path: + storage_options.append(StorageOptions( + uri=args.bag_path, + storage_id=args.storage, + storage_config_uri=storage_config_file, + )) + if args.storage: + print(print_warn('--storage option is deprecated, use --input to ' + 'provide an input bag with a specific storage ID')) + try: + storage_options.extend( + input_bag_arg_to_storage_options(args.input or [], storage_config_file)) + except ValueError as e: + return print_error(str(e)) + if not storage_options: + return print_error('no input bags were provided') + + # Users can currently only provide one storage config file, which is storage + # implementation-specific. Since we can replay bags from different storage + # implementations, this may lead to errors. For now, just warn if input bags have + # different explicit storage IDs and a storage config file is provided. + storage_ids = {options.storage_id for options in storage_options if options.storage_id} + if storage_config_file and len(storage_ids) > 1: + print( + print_warn('a global --storage-config-file was provided, but --input bags are ' + 'using different explicit storage IDs, which may lead to errors with ' + f'replay: {storage_ids}')) + play_options = PlayOptions() play_options.read_ahead_queue_size = args.read_ahead_queue_size play_options.node_prefix = NODE_NAME_PREFIX diff --git a/ros2bag/test/test_api.py b/ros2bag/test/test_api.py index a711a48275..f8f8083056 100644 --- a/ros2bag/test/test_api.py +++ b/ros2bag/test/test_api.py @@ -12,6 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. +from pathlib import Path import unittest from rclpy.qos import DurabilityPolicy @@ -19,9 +20,13 @@ from rclpy.qos import ReliabilityPolicy from ros2bag.api import convert_yaml_to_qos_profile from ros2bag.api import dict_to_duration +from ros2bag.api import input_bag_arg_to_storage_options from ros2bag.api import interpret_dict_as_qos_profile +RESOURCES_PATH = Path(__file__).parent / 'resources' + + class TestRos2BagRecord(unittest.TestCase): def test_dict_to_duration_valid(self): @@ -84,3 +89,25 @@ def test_interpret_dict_as_qos_profile_negative(self): qos_dict = {'history': 'keep_all', 'liveliness_lease_duration': {'sec': -1, 'nsec': -1}} with self.assertRaises(ValueError): interpret_dict_as_qos_profile(qos_dict) + + def test_input_bag_arg_to_storage_options(self): + bag_path = (RESOURCES_PATH / 'empty_bag').as_posix() + # Just use a file that exists; the content does not matter + storage_config_file = (RESOURCES_PATH / 'qos_profile.yaml').as_posix() + + with self.assertRaises(ValueError): + input_bag_arg_to_storage_options([['path1', 'id1'], ['path2', 'id2', 'extra']]) + with self.assertRaises(ValueError): + input_bag_arg_to_storage_options([['path-does-not-exist']]) + with self.assertRaises(ValueError): + input_bag_arg_to_storage_options([[bag_path, 'id-does-not-exist']]) + with self.assertRaises(ValueError): + input_bag_arg_to_storage_options([[bag_path, 'mcap']], 'config-file-doesnt-exist') + + self.assertEqual([], input_bag_arg_to_storage_options([], None)) + storage_options = input_bag_arg_to_storage_options( + [[bag_path, 'mcap']], storage_config_file) + self.assertEqual(1, len(storage_options)) + self.assertEqual(bag_path, storage_options[0].uri) + self.assertEqual('mcap', storage_options[0].storage_id) + self.assertEqual(storage_config_file, storage_options[0].storage_config_uri) diff --git a/rosbag2_py/src/rosbag2_py/_transport.cpp b/rosbag2_py/src/rosbag2_py/_transport.cpp index 9d15e82b15..648b493185 100644 --- a/rosbag2_py/src/rosbag2_py/_transport.cpp +++ b/rosbag2_py/src/rosbag2_py/_transport.cpp @@ -183,6 +183,13 @@ class Player void play( const rosbag2_storage::StorageOptions & storage_options, PlayOptions & play_options) + { + play_impl(std::vector{storage_options}, play_options, false); + } + + void play( + const std::vector & storage_options, + PlayOptions & play_options) { play_impl(storage_options, play_options, false); } @@ -192,7 +199,7 @@ class Player PlayOptions & play_options, size_t num_messages) { - play_impl(storage_options, play_options, true, num_messages); + play_impl(std::vector{storage_options}, play_options, true, num_messages); } protected: @@ -240,14 +247,17 @@ class Player } void play_impl( - const rosbag2_storage::StorageOptions & storage_options, + const std::vector & storage_options, PlayOptions & play_options, bool burst = false, size_t burst_num_messages = 0) { install_signal_handlers(); try { - auto reader = rosbag2_transport::ReaderWriterFactory::make_reader(storage_options); + std::vector bags{}; + for (const auto & options : storage_options) { + bags.emplace_back(rosbag2_transport::ReaderWriterFactory::make_reader(options), options); + } std::shared_ptr keyboard_handler; if (!play_options.disable_keyboard_controls) { #ifndef _WIN32 @@ -260,7 +270,7 @@ class Player #endif } auto player = std::make_shared( - std::move(reader), std::move(keyboard_handler), storage_options, play_options); + std::move(bags), std::move(keyboard_handler), play_options); rclcpp::executors::SingleThreadedExecutor exec; exec.add_node(player); @@ -591,7 +601,18 @@ PYBIND11_MODULE(_transport, m) { py::class_(m, "Player") .def(py::init<>()) .def(py::init()) - .def("play", &rosbag2_py::Player::play, py::arg("storage_options"), py::arg("play_options")) + .def( + "play", + py::overload_cast( + &rosbag2_py::Player::play), + py::arg("storage_options"), + py::arg("play_options")) + .def( + "play", + py::overload_cast &, PlayOptions &>( + &rosbag2_py::Player::play), + py::arg("storage_options"), + py::arg("play_options")) .def( "burst", &rosbag2_py::Player::burst, py::arg("storage_options"), py::arg("play_options"), py::arg("num_messages")) diff --git a/rosbag2_transport/CMakeLists.txt b/rosbag2_transport/CMakeLists.txt index 2d120b3db5..07d6749a56 100644 --- a/rosbag2_transport/CMakeLists.txt +++ b/rosbag2_transport/CMakeLists.txt @@ -31,6 +31,7 @@ find_package(keyboard_handler REQUIRED) find_package(rcl REQUIRED) find_package(rclcpp REQUIRED) find_package(rclcpp_components REQUIRED) +find_package(rcpputils REQUIRED) find_package(rcutils REQUIRED) find_package(rmw REQUIRED) find_package(rosbag2_compression REQUIRED) @@ -65,6 +66,7 @@ target_link_libraries(${PROJECT_NAME} PUBLIC target_link_libraries(${PROJECT_NAME} PRIVATE rcl::rcl rclcpp_components::component + rcpputils::rcpputils rcutils::rcutils rmw::rmw rosbag2_compression::rosbag2_compression @@ -113,6 +115,7 @@ if(BUILD_TESTING) add_definitions(-D_SRC_RESOURCES_DIR_PATH="${CMAKE_CURRENT_SOURCE_DIR}/test/resources") find_package(ament_cmake_gmock REQUIRED) + find_package(ament_cmake_gtest REQUIRED) find_package(ament_index_cpp REQUIRED) find_package(ament_lint_auto REQUIRED) find_package(composition_interfaces REQUIRED) @@ -534,6 +537,15 @@ if(BUILD_TESTING) rcpputils::rcpputils rosbag2_test_common::rosbag2_test_common ) + + ament_add_gtest(test_meta_priority_queue + test/rosbag2_transport/test_meta_priority_queue.cpp) + target_include_directories(test_meta_priority_queue PRIVATE + $) + target_link_libraries(test_meta_priority_queue + rcpputils::rcpputils + shared_queues_vendor::singleproducerconsumer + ) endif() ament_package() diff --git a/rosbag2_transport/include/rosbag2_transport/player.hpp b/rosbag2_transport/include/rosbag2_transport/player.hpp index d83bb69d23..6b321571dd 100644 --- a/rosbag2_transport/include/rosbag2_transport/player.hpp +++ b/rosbag2_transport/include/rosbag2_transport/player.hpp @@ -74,6 +74,10 @@ class Player : public rclcpp::Node /// delete_on_play_message_callback. using callback_handle_t = uint64_t; + /// \brief Utility type for a pair of rosbag2_cpp::Reader and rosbag2_storage::StorageOptions. + using ReaderStorageOptionsPair = \ + std::pair, rosbag2_storage::StorageOptions>; + /// \brief Const describing invalid value for callback_handle. ROSBAG2_TRANSPORT_PUBLIC static constexpr callback_handle_t invalid_callback_handle = 0; @@ -151,6 +155,55 @@ class Player : public rclcpp::Node const std::string & node_name = "rosbag2_player", const rclcpp::NodeOptions & node_options = rclcpp::NodeOptions()); + /// \brief Constructor which will construct Player class with provided parameters and default + /// rosbag2_cpp::reader and KeyboardHandler classes. + /// \note The KeyboardHandler class will be initialized with parameter which is disabling + /// signal handlers in it. + /// \param storage_options Storage options which will each be applied to a rosbag2_cpp::reader + /// after construction. + /// \param play_options Playback settings for Player class. + /// \param node_name Name for the underlying node. + /// \param node_options Node options which will be used during construction of the underlying + /// node. + ROSBAG2_TRANSPORT_PUBLIC + Player( + const std::vector & storage_options, + const rosbag2_transport::PlayOptions & play_options, + const std::string & node_name = "rosbag2_player", + const rclcpp::NodeOptions & node_options = rclcpp::NodeOptions()); + + /// \brief Constructor which will construct Player class with provided parameters. + /// \param input_bags Vector of pairs of unique pointer to the rosbag2_cpp::Reader class (which + /// will be moved to the internal instance of the Player class during construction) and storage + /// options (which will be applied to the rosbag2_cpp::reader when opening it). + /// \param play_options Playback settings for Player class. + /// \param node_name Name for the underlying node. + /// \param node_options Node options which will be used during construction of the underlying + /// node. + ROSBAG2_TRANSPORT_PUBLIC + Player( + std::vector && input_bags, + const rosbag2_transport::PlayOptions & play_options, + const std::string & node_name = "rosbag2_player", + const rclcpp::NodeOptions & node_options = rclcpp::NodeOptions()); + + /// \brief Constructor which will construct Player class with provided parameters. + /// \param input_bags Vector of pairs of unique pointer to the rosbag2_cpp::Reader class (which + /// will be moved to the internal instance of the Player class during construction) and storage + /// options (which will be applied to the rosbag2_cpp::reader when opening it). + /// \param keyboard_handler Keyboard handler class uses to handle user input from keyboard. + /// \param play_options Playback settings for Player class. + /// \param node_name Name for the underlying node. + /// \param node_options Node options which will be used during construction of the underlying + /// node. + ROSBAG2_TRANSPORT_PUBLIC + Player( + std::vector && input_bags, + std::shared_ptr keyboard_handler, + const rosbag2_transport::PlayOptions & play_options, + const std::string & node_name = "rosbag2_player", + const rclcpp::NodeOptions & node_options = rclcpp::NodeOptions()); + /// \brief Default destructor. ROSBAG2_TRANSPORT_PUBLIC virtual ~Player(); @@ -301,7 +354,7 @@ class Player : public rclcpp::Node ROSBAG2_TRANSPORT_PUBLIC /// \brief Getter for the currently stored storage options /// \return Copy of the currently stored storage options - const rosbag2_storage::StorageOptions & get_storage_options(); + std::vector get_storage_options(); ROSBAG2_TRANSPORT_PUBLIC /// \brief Getter for the currently stored play options diff --git a/rosbag2_transport/package.xml b/rosbag2_transport/package.xml index 706d0c76f2..08264d209e 100644 --- a/rosbag2_transport/package.xml +++ b/rosbag2_transport/package.xml @@ -15,6 +15,8 @@ rclcpp rclcpp_components + rcpputils + rcutils rmw rosbag2_compression rosbag2_cpp @@ -25,6 +27,7 @@ keyboard_handler ament_cmake_gmock + ament_cmake_gtest ament_index_cpp ament_lint_auto ament_lint_common diff --git a/rosbag2_transport/src/rosbag2_transport/meta_priority_queue.hpp b/rosbag2_transport/src/rosbag2_transport/meta_priority_queue.hpp new file mode 100644 index 0000000000..4155d8a43a --- /dev/null +++ b/rosbag2_transport/src/rosbag2_transport/meta_priority_queue.hpp @@ -0,0 +1,220 @@ +// Copyright 2024 Apex.AI, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef ROSBAG2_TRANSPORT__META_PRIORITY_QUEUE_HPP_ +#define ROSBAG2_TRANSPORT__META_PRIORITY_QUEUE_HPP_ + +#include +#include +#include +#include +#include + +#include "moodycamel/readerwriterqueue.h" + +#include "rcpputils/thread_safety_annotations.hpp" + +/// Priority queue of queues. +/** + * Contains N queues. + * Elements can be enqueued into a specific queue. + * Assumes that items are enqueued into the same queue in order of priority. + * The priority value of an element is computed from a provided function. + * Highest priority means a lower priority value. + * Returns the next element from the highest-priority queue, which is determined by the queue with + * the highest-priority first element, since the first element of each queue is the highest-priority + * element of that queue. + * + * This does impose some more restrictions on concurrency compared to the underlying queue + * implementation. However, if there is only 1 underlying queue, then there is no locking. + * + * \tparam T the element type + * \tparam P the priority value type + * \see moodycamel::ReaderWriterQueue for the underlying queue + */ +template +class MetaPriorityQueue +{ +public: + /** + * Constructor. + * + * \param num_queues the number of priority queues, greather than zero + * \param element_priority_value a function to extract the priority value from an element + */ + MetaPriorityQueue( + const std::size_t num_queues, + const std::function && element_priority_value) + : queues_(num_queues), + element_priority_value_(std::move(element_priority_value)), + queue_priorities_(num_queues), + // When all queues are empty, the index value doesn't matter + priority_queue_index_(0u) + { + assert(num_queues > 0u); + // Set all queue priority values to the lowest priority, i.e., highest value + for (std::size_t i = 0; i < queues_.size(); i++) { + queue_priorities_[i] = lowest_priority_value; + } + } + + /// Enqueue element into a given queue. + /** + * Should only be called by 1 producer thread. + * Elements must be enqueued into the same queue in order of priority for elements to be + * peeked/popped in order of priority. + * No bounds checking is performed for the queue index. + * + * \param element the element to enqueue + * \param queue_index the queue to enqueue it into + * \return `true` if the element was enqueued, `false` otherwise + * \see moodycamel::ReaderWriterQueue::enqueue + */ + bool enqueue(const T & element, const std::size_t queue_index) + { + // Trivial case: 1 underlying queue + if (1u == queues_.size()) { + return queues_[queue_index].enqueue(element); + } + std::lock_guard lk(mutex_priority_); + const bool enqueued = queues_[queue_index].enqueue(element); + // If this queue was empty, update the queue's priority value with the new element + // Otherwise, it's not necessary, since the new element is supposed to be lower-priority + if (enqueued && 1u == size_approx(queue_index)) { + update_priorities(&element, queue_index); + } + return enqueued; + } + + /// Peek at the next item from the highest-priority queue. + /** + * Should only be called by 1 consumer thread. + * + * \return the next item from the highest-priority queue, or `nullptr` if there is none + * \see moodycamel::ReaderWriterQueue::peek + */ + T * peek() + { + return queues_[priority_queue_index_.load()].peek(); + } + + /// Pop the next item from the highest-priority queue. + /** + * Should only be called by 1 consumer thread. + * + * \return `true` if there was an item, `false` otherwise + * \see moodycamel::ReaderWriterQueue::pop + */ + bool pop() + { + // Trivial case: 1 underlying queue + if (1u == queues_.size()) { + return queues_[0u].pop(); + } + std::lock_guard lk(mutex_priority_); + const std::size_t priority_index = priority_queue_index_.load(); + const bool popped = queues_[priority_index].pop(); + // If we pop()ed, update priority + if (popped) { + // peek() and pop() should only be called by the 1 consumer thread, so this is OK + const T * element = queues_[priority_index].peek(); + update_priorities(element, priority_index); + } + return popped; + } + + /// Get the approximate size of the given queue. + /** + * Can be called by both the 1 producer thread and the 1 consumer thread. + * No bounds checking is performed for the queue index. + * + * \param queue_index the index of the queue + * \return the approximate size + * \see moodycamel::ReaderWriterQueue::size_approx + */ + inline std::size_t size_approx(const std::size_t queue_index) const + { + return queues_[queue_index].size_approx(); + } + + /// Get the approximate total size of all queues. + /** + * Can be called by both the 1 producer thread and the 1 consumer thread. + * The size of the underlying queues may change during this call. + * + * \return the approximate total size + * \see ::size_approx + */ + inline std::size_t size_approx() const + { + std::size_t size = 0; + for (std::size_t i = 0; i < queues_.size(); i++) { + size += size_approx(i); + } + return size; + } + +private: + /// Update the queue priority values and the priority queue index. + /** + * Update when: + * 1. Enqueuing an element into a queue that was empty + * 2. Popping an element from a queue + * + * \param element the new element (newly-enqueued or newly-front of queue), or `nullptr` if the + * queue is now empty + * \param queue_index the index of the corresponding queue + */ + void update_priorities(const T * element, const std::size_t queue_index) + RCPPUTILS_TSA_REQUIRES(priority_queue_index_) + { + // Update priority value for the given queue + if (nullptr != element) { + const P priority_value = element_priority_value_(*element); + queue_priorities_[queue_index].store(priority_value); + } else { + // Reset priority value if the queue is now empty + queue_priorities_[queue_index].store(lowest_priority_value); + } + + // Find the index of the highest-priority queue, i.e., to be used next + // Notes: + // 1. If multiple queues have the same priority value, this does favor the first queue + // 2. If all queues are empty, this will return the index of the first queue, and + // popping/peeking will just return nothing + std::size_t priority_queue_index = 0; + for (std::size_t i = 1; i < queues_.size(); i++) { + if (queue_priorities_[i] < queue_priorities_[priority_queue_index].load()) { + priority_queue_index = i; + } + } + priority_queue_index_.store(priority_queue_index); + } + + // Underlying queues + std::vector> queues_; + // Element priority value extraction function + const std::function element_priority_value_; + // Priority values of each queue + std::vector> queue_priorities_; + // Index of the current highest-priority queue + std::atomic_size_t priority_queue_index_; + // Mutex for priority queue tracking + std::mutex mutex_priority_; + + /// Priority value corresponding to the lowest priority. + static constexpr P lowest_priority_value = std::numeric_limits

::max(); +}; + +#endif // ROSBAG2_TRANSPORT__META_PRIORITY_QUEUE_HPP_ diff --git a/rosbag2_transport/src/rosbag2_transport/player.cpp b/rosbag2_transport/src/rosbag2_transport/player.cpp index 74a911f1d8..1243d78211 100644 --- a/rosbag2_transport/src/rosbag2_transport/player.cpp +++ b/rosbag2_transport/src/rosbag2_transport/player.cpp @@ -16,6 +16,7 @@ #include #include +#include #include #include #include @@ -24,8 +25,6 @@ #include #include -#include "moodycamel/readerwriterqueue.h" - #include "rcl/graph.h" #include "rclcpp/rclcpp.hpp" @@ -43,6 +42,7 @@ #include "rosbag2_transport/reader_writer_factory.hpp" #include "logging.hpp" +#include "meta_priority_queue.hpp" namespace { @@ -87,12 +87,12 @@ class PlayerImpl public: using callback_handle_t = Player::callback_handle_t; using play_msg_callback_t = Player::play_msg_callback_t; + using ReaderStorageOptionsPair = Player::ReaderStorageOptionsPair; PlayerImpl( Player * owner, - std::unique_ptr reader, + std::vector && input_bags, std::shared_ptr keyboard_handler, - const rosbag2_storage::StorageOptions & storage_options, const rosbag2_transport::PlayOptions & play_options); virtual ~PlayerImpl(); @@ -217,7 +217,7 @@ class PlayerImpl /// \brief Getter for the currently stored storage options /// \return Copy of the currently stored storage options - const rosbag2_storage::StorageOptions & get_storage_options(); + std::vector get_storage_options(); /// \brief Getter for the currently stored play options /// \return Copy of the currently stored play options @@ -283,7 +283,9 @@ class PlayerImpl rosbag2_storage::SerializedBagMessageSharedPtr peek_next_message_from_queue(); void load_storage_content(); bool is_storage_completely_loaded() const; - void enqueue_up_to_boundary(size_t boundary) RCPPUTILS_TSA_REQUIRES(reader_mutex_); + void enqueue_up_to_boundary( + const size_t boundary, + const size_t bag_index) RCPPUTILS_TSA_REQUIRES(reader_mutex_); void wait_for_filled_queue() const; void play_messages_from_queue(); void prepare_publishers(); @@ -304,16 +306,17 @@ class PlayerImpl std::atomic_bool stop_playback_{false}; std::mutex reader_mutex_; - std::unique_ptr reader_ RCPPUTILS_TSA_GUARDED_BY(reader_mutex_); + std::vector input_bags_ RCPPUTILS_TSA_GUARDED_BY(reader_mutex_); void publish_clock_update(); void publish_clock_update(const rclcpp::Time & time); Player * owner_; - rosbag2_storage::StorageOptions storage_options_; rosbag2_transport::PlayOptions play_options_; rcutils_time_point_value_t play_until_timestamp_ = -1; - moodycamel::ReaderWriterQueue message_queue_; + MetaPriorityQueue< + rosbag2_storage::SerializedBagMessageSharedPtr, + rcutils_time_point_value_t> message_queue_; mutable std::future storage_loading_future_; std::atomic_bool load_storage_content_{true}; std::unordered_map topic_qos_profile_overrides_; @@ -353,48 +356,58 @@ class PlayerImpl PlayerImpl::PlayerImpl( Player * owner, - std::unique_ptr reader, + std::vector && input_bags, std::shared_ptr keyboard_handler, - const rosbag2_storage::StorageOptions & storage_options, const rosbag2_transport::PlayOptions & play_options) -: owner_(owner), - storage_options_(storage_options), +: input_bags_(std::move(input_bags)), + owner_(owner), play_options_(play_options), + message_queue_( + input_bags_.size(), + [](const rosbag2_storage::SerializedBagMessageSharedPtr & msg) { + return msg->recv_timestamp; + }), keyboard_handler_(std::move(keyboard_handler)), player_service_client_manager_(std::make_shared()) { for (auto & topic : play_options_.topics_to_filter) { topic = rclcpp::expand_topic_or_service_name( - topic, owner->get_name(), - owner->get_namespace(), false); + topic, owner_->get_name(), + owner_->get_namespace(), false); } for (auto & exclude_topic : play_options_.exclude_topics_to_filter) { exclude_topic = rclcpp::expand_topic_or_service_name( - exclude_topic, owner->get_name(), - owner->get_namespace(), false); + exclude_topic, owner_->get_name(), + owner_->get_namespace(), false); } for (auto & service_event_topic : play_options_.services_to_filter) { service_event_topic = rclcpp::expand_topic_or_service_name( - service_event_topic, owner->get_name(), - owner->get_namespace(), false); + service_event_topic, owner_->get_name(), + owner_->get_namespace(), false); } for (auto & exclude_service_event_topic : play_options_.exclude_services_to_filter) { exclude_service_event_topic = rclcpp::expand_topic_or_service_name( - exclude_service_event_topic, owner->get_name(), - owner->get_namespace(), false); + exclude_service_event_topic, owner_->get_name(), + owner_->get_namespace(), false); } { std::lock_guard lk(reader_mutex_); - reader_ = std::move(reader); - // keep reader open until player is destroyed - reader_->open(storage_options_, {"", rmw_get_serialization_format()}); - auto metadata = reader_->get_metadata(); - starting_time_ = std::chrono::duration_cast( - metadata.starting_time.time_since_epoch()).count(); + starting_time_ = std::numeric_limits::max(); + for (const auto & [reader, storage_options] : input_bags_) { + // keep readers open until player is destroyed + reader->open(storage_options, {"", rmw_get_serialization_format()}); + // Find earliest starting time + const auto metadata = reader->get_metadata(); + const auto metadata_starting_time = std::chrono::duration_cast( + metadata.starting_time.time_since_epoch()).count(); + if (metadata_starting_time < starting_time_) { + starting_time_ = metadata_starting_time; + } + } // If a non-default (positive) starting time offset is provided in PlayOptions, // then add the offset to the starting time obtained from reader metadata if (play_options_.start_offset < 0) { @@ -431,10 +444,12 @@ PlayerImpl::~PlayerImpl() keyboard_handler_->delete_key_press_callback(cb_handle); } } - // closes reader + // closes readers std::lock_guard lk(reader_mutex_); - if (reader_) { - reader_->close(); + for (const auto & [reader, _] : input_bags_) { + if (reader) { + reader->close(); + } } } @@ -491,7 +506,9 @@ bool PlayerImpl::play() } { std::lock_guard lk(reader_mutex_); - reader_->seek(starting_time_); + for (const auto & [reader, _] : input_bags_) { + reader->seek(starting_time_); + } clock_->jump(starting_time_); } load_storage_content_ = true; @@ -755,7 +772,9 @@ void PlayerImpl::seek(rcutils_time_point_value_t time_point) std::lock_guard lk(reader_mutex_); // Purge current messages in queue. while (message_queue_.pop()) {} - reader_->seek(time_point); + for (const auto & [reader, _] : input_bags_) { + reader->seek(time_point); + } clock_->jump(time_point); // Restart queuing thread if it has finished running (previously reached end of bag), // otherwise, queueing should continue automatically after releasing mutex @@ -885,8 +904,15 @@ Player::callback_handle_t PlayerImpl::get_new_on_play_msg_callback_handle() void PlayerImpl::wait_for_filled_queue() const { + const auto read_ahead_queues_filled = [this]() { + bool filled = true; + for (std::size_t i = 0; i < input_bags_.size(); i++) { + filled &= message_queue_.size_approx(i) >= play_options_.read_ahead_queue_size; + } + return filled; + }; while ( - message_queue_.size_approx() < play_options_.read_ahead_queue_size && + !read_ahead_queues_filled() && !is_storage_completely_loaded() && rclcpp::ok() && !stop_playback_) { std::this_thread::sleep_for(queue_read_wait_period_); @@ -901,26 +927,37 @@ void PlayerImpl::load_storage_content() while (rclcpp::ok() && load_storage_content_ && !stop_playback_) { rcpputils::unique_lock lk(reader_mutex_); - if (!reader_->has_next()) {break;} + const bool no_messages = std::all_of( + input_bags_.cbegin(), + input_bags_.cend(), + [](const auto & reader_options) {return !reader_options.first->has_next();}); + if (no_messages) { + break; + } + for (size_t bag_index = 0; bag_index < input_bags_.size(); bag_index++) { + if (!input_bags_[bag_index].first->has_next()) { + continue; + } - if (message_queue_.size_approx() < queue_lower_boundary) { - enqueue_up_to_boundary(queue_upper_boundary); - } else { - lk.unlock(); - std::this_thread::sleep_for(std::chrono::milliseconds(1)); + if (message_queue_.size_approx(bag_index) < queue_lower_boundary) { + enqueue_up_to_boundary(queue_upper_boundary, bag_index); + } else { + lk.unlock(); + std::this_thread::sleep_for(std::chrono::milliseconds(1)); + } } } } -void PlayerImpl::enqueue_up_to_boundary(size_t boundary) +void PlayerImpl::enqueue_up_to_boundary(const size_t boundary, const size_t bag_index) { rosbag2_storage::SerializedBagMessageSharedPtr message; - for (size_t i = message_queue_.size_approx(); i < boundary; i++) { - if (!reader_->has_next()) { + for (size_t i = message_queue_.size_approx(bag_index); i < boundary; i++) { + if (!input_bags_[bag_index].first->has_next()) { break; } - message = reader_->read_next(); - message_queue_.enqueue(message); + message = input_bags_[bag_index].first->read_next(); + message_queue_.enqueue(message, bag_index); } } @@ -1055,7 +1092,9 @@ void PlayerImpl::prepare_publishers() storage_filter.regex_to_exclude = play_options_.exclude_regex_to_filter; storage_filter.exclude_topics = play_options_.exclude_topics_to_filter; storage_filter.exclude_service_events = play_options_.exclude_services_to_filter; - reader_->set_filter(storage_filter); + for (const auto & [reader, _] : input_bags_) { + reader->set_filter(storage_filter); + } // Create /clock publisher if (play_options_.clock_publish_frequency > 0.f || play_options_.clock_publish_on_topic_publish) { @@ -1092,7 +1131,11 @@ void PlayerImpl::prepare_publishers() } // Create topic publishers - auto topics = reader_->get_all_topics_and_types(); + std::vector topics{}; + for (const auto & [reader, _] : input_bags_) { + auto bag_topics = reader->get_all_topics_and_types(); + topics.insert(topics.begin(), bag_topics.begin(), bag_topics.end()); + } std::string topic_without_support_acked; for (const auto & topic : topics) { const bool is_service_event_topic = rosbag2_cpp::is_service_event_topic(topic.name, topic.type); @@ -1180,7 +1223,9 @@ void PlayerImpl::prepare_publishers() message.node_name = owner_->get_fully_qualified_name(); split_event_pub_->publish(message); }; - reader_->add_event_callbacks(callbacks); + for (const auto & [reader, _] : input_bags_) { + reader->add_event_callbacks(callbacks); + } } void PlayerImpl::run_play_msg_pre_callbacks( @@ -1517,9 +1562,13 @@ void PlayerImpl::publish_clock_update(const rclcpp::Time & time) } } -const rosbag2_storage::StorageOptions & PlayerImpl::get_storage_options() +std::vector PlayerImpl::get_storage_options() { - return storage_options_; + std::vector storage_options{}; + for (const auto & [_, options] : input_bags_) { + storage_options.push_back(options); + } + return storage_options; } const rosbag2_transport::PlayOptions & PlayerImpl::get_play_options() @@ -1546,8 +1595,10 @@ Player::Player(const std::string & node_name, const rclcpp::NodeOptions & node_o auto reader = ReaderWriterFactory::make_reader(storage_options); + std::vector input_bags{}; + input_bags.emplace_back(std::move(reader), storage_options); pimpl_ = std::make_unique( - this, std::move(reader), keyboard_handler, storage_options, play_options); + this, std::move(input_bags), keyboard_handler, play_options); pimpl_->play(); } @@ -1560,6 +1611,28 @@ Player::Player( storage_options, play_options, node_name, node_options) {} +Player::Player( + const std::vector & storage_options, + const rosbag2_transport::PlayOptions & play_options, + const std::string & node_name, + const rclcpp::NodeOptions & node_options) +: rclcpp::Node( + node_name, + rclcpp::NodeOptions(node_options).arguments(play_options.topic_remapping_options)) +{ + std::shared_ptr keyboard_handler; + if (!play_options.disable_keyboard_controls) { + keyboard_handler = std::make_shared(); + } + + std::vector input_bags{}; + for (const auto & options : storage_options) { + input_bags.emplace_back(std::make_unique(), options); + } + pimpl_ = std::make_unique( + this, std::move(input_bags), keyboard_handler, play_options); +} + Player::Player( std::unique_ptr reader, const rosbag2_storage::StorageOptions & storage_options, @@ -1578,12 +1651,40 @@ Player::Player( const rosbag2_transport::PlayOptions & play_options, const std::string & node_name, const rclcpp::NodeOptions & node_options) +: rclcpp::Node( + node_name, + rclcpp::NodeOptions(node_options).arguments(play_options.topic_remapping_options)) +{ + std::vector input_bags{}; + input_bags.emplace_back(std::move(reader), storage_options); + pimpl_ = std::make_unique( + this, std::move(input_bags), keyboard_handler, play_options); +} + +Player::Player( + std::vector && input_bags, + const rosbag2_transport::PlayOptions & play_options, + const std::string & node_name, + const rclcpp::NodeOptions & node_options) +: Player( + std::move(input_bags), + play_options.disable_keyboard_controls ? nullptr : std::make_shared(), + play_options, + node_name, + node_options) +{} + +Player::Player( + std::vector && input_bags, + std::shared_ptr keyboard_handler, + const rosbag2_transport::PlayOptions & play_options, + const std::string & node_name, + const rclcpp::NodeOptions & node_options) : rclcpp::Node( node_name, rclcpp::NodeOptions(node_options).arguments(play_options.topic_remapping_options)), pimpl_(std::make_unique( - this, std::move(reader), std::move(keyboard_handler), - storage_options, play_options)) + this, std::move(input_bags), std::move(keyboard_handler), play_options)) {} Player::~Player() = default; @@ -1703,7 +1804,7 @@ size_t Player::get_number_of_registered_on_play_msg_post_callbacks() return pimpl_->get_number_of_registered_on_play_msg_post_callbacks(); } -const rosbag2_storage::StorageOptions & Player::get_storage_options() +std::vector Player::get_storage_options() { return pimpl_->get_storage_options(); } diff --git a/rosbag2_transport/test/rosbag2_transport/mock_player.hpp b/rosbag2_transport/test/rosbag2_transport/mock_player.hpp index 9220679b0b..0283145e71 100644 --- a/rosbag2_transport/test/rosbag2_transport/mock_player.hpp +++ b/rosbag2_transport/test/rosbag2_transport/mock_player.hpp @@ -35,6 +35,13 @@ class MockPlayer : public rosbag2_transport::Player : Player(std::move(reader), storage_options, play_options, node_name) {} + MockPlayer( + std::vector && input_bags, + const rosbag2_transport::PlayOptions & play_options, + const std::string & node_name = "rosbag2_mock_player") + : Player(std::move(input_bags), play_options, node_name) + {} + explicit MockPlayer( const std::string & node_name = "rosbag2_mock_composable_player", const rclcpp::NodeOptions & node_options = rclcpp::NodeOptions()) diff --git a/rosbag2_transport/test/rosbag2_transport/test_composable_player.cpp b/rosbag2_transport/test/rosbag2_transport/test_composable_player.cpp index 3875a0b1b0..4fc71f2003 100644 --- a/rosbag2_transport/test/rosbag2_transport/test_composable_player.cpp +++ b/rosbag2_transport/test/rosbag2_transport/test_composable_player.cpp @@ -178,19 +178,20 @@ TEST_P(ComposablePlayerTests, player_can_parse_parameters_from_file) { play_options.service_requests_source, rosbag2_transport::ServiceRequestsSource::CLIENT_INTROSPECTION); - EXPECT_EQ(storage_options.uri, uri_str); - EXPECT_EQ(storage_options.storage_id, GetParam()); - EXPECT_EQ(storage_options.storage_config_uri, ""); - EXPECT_EQ(storage_options.max_bagfile_size, 12345); - EXPECT_EQ(storage_options.max_bagfile_duration, 54321); - EXPECT_EQ(storage_options.max_cache_size, 9898); - EXPECT_EQ(storage_options.storage_preset_profile, "resilient"); - EXPECT_EQ(storage_options.snapshot_mode, false); + ASSERT_EQ(1, storage_options.size()); + EXPECT_EQ(storage_options[0].uri, uri_str); + EXPECT_EQ(storage_options[0].storage_id, GetParam()); + EXPECT_EQ(storage_options[0].storage_config_uri, ""); + EXPECT_EQ(storage_options[0].max_bagfile_size, 12345); + EXPECT_EQ(storage_options[0].max_bagfile_duration, 54321); + EXPECT_EQ(storage_options[0].max_cache_size, 9898); + EXPECT_EQ(storage_options[0].storage_preset_profile, "resilient"); + EXPECT_EQ(storage_options[0].snapshot_mode, false); std::unordered_map custom_data{ std::pair{"key1", "value1"}, std::pair{"key2", "value2"} }; - EXPECT_EQ(storage_options.custom_data, custom_data); + EXPECT_EQ(storage_options[0].custom_data, custom_data); } TEST_P(ComposablePlayerIntegrationTests, player_can_automatically_play_file_after_composition) { diff --git a/rosbag2_transport/test/rosbag2_transport/test_meta_priority_queue.cpp b/rosbag2_transport/test/rosbag2_transport/test_meta_priority_queue.cpp new file mode 100644 index 0000000000..b6070c9b7a --- /dev/null +++ b/rosbag2_transport/test/rosbag2_transport/test_meta_priority_queue.cpp @@ -0,0 +1,124 @@ +// Copyright 2024 Apex.AI, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include + +#include "meta_priority_queue.hpp" + +TEST(meta_priority_queue, test_queue_trivial) +{ + MetaPriorityQueue queue(1u, [](const int & v){return v;}); + + EXPECT_TRUE(queue.enqueue(1, 0u)); + EXPECT_TRUE(queue.enqueue(2, 0u)); + EXPECT_TRUE(queue.enqueue(3, 0u)); + EXPECT_TRUE(queue.enqueue(4, 0u)); + + EXPECT_EQ(1, *queue.peek()); + EXPECT_TRUE(queue.pop()); + EXPECT_EQ(2, *queue.peek()); + EXPECT_TRUE(queue.pop()); + EXPECT_EQ(3, *queue.peek()); + EXPECT_TRUE(queue.pop()); + EXPECT_EQ(4, *queue.peek()); + EXPECT_TRUE(queue.pop()); +} + +TEST(meta_priority_queue, test_queue_multi) +{ + MetaPriorityQueue queue(3u, [](const int & v){return v;}); + + // Empty + EXPECT_EQ(nullptr, queue.peek()); + EXPECT_FALSE(queue.pop()); + EXPECT_EQ(nullptr, queue.peek()); + EXPECT_EQ(0u, queue.size_approx()); + EXPECT_EQ(0u, queue.size_approx(0u)); + EXPECT_EQ(0u, queue.size_approx(1u)); + EXPECT_EQ(0u, queue.size_approx(2u)); + + // First new element + EXPECT_TRUE(queue.enqueue(2, 1u)); + EXPECT_EQ(2, *queue.peek()); + EXPECT_EQ(1u, queue.size_approx()); + EXPECT_EQ(0u, queue.size_approx(0u)); + EXPECT_EQ(1u, queue.size_approx(1u)); + EXPECT_EQ(0u, queue.size_approx(2u)); + EXPECT_EQ(2, *queue.peek()); + + // New element in different queue + EXPECT_TRUE(queue.enqueue(1, 2u)); + EXPECT_EQ(2u, queue.size_approx()); + EXPECT_EQ(0u, queue.size_approx(0u)); + EXPECT_EQ(1u, queue.size_approx(1u)); + EXPECT_EQ(1u, queue.size_approx(2u)); + EXPECT_EQ(1, *queue.peek()); + + // New element in same queue + EXPECT_TRUE(queue.enqueue(3, 2u)); + + // Pop + EXPECT_EQ(1, *queue.peek()); + EXPECT_TRUE(queue.pop()); + EXPECT_EQ(2, *queue.peek()); + EXPECT_TRUE(queue.pop()); + std::cout << "line 57" << std::endl; + EXPECT_EQ(3, *queue.peek()); + EXPECT_TRUE(queue.pop()); + EXPECT_EQ(nullptr, queue.peek()); + EXPECT_FALSE(queue.pop()); + EXPECT_EQ(0u, queue.size_approx()); + + // Enqueue then dequeue a bunch + EXPECT_TRUE(queue.enqueue(1, 0u)); + EXPECT_EQ(1, *queue.peek()); + EXPECT_TRUE(queue.enqueue(4, 2u)); + EXPECT_EQ(1, *queue.peek()); + EXPECT_TRUE(queue.enqueue(6, 2u)); + EXPECT_EQ(1, *queue.peek()); + EXPECT_TRUE(queue.enqueue(2, 1u)); + EXPECT_EQ(1, *queue.peek()); + EXPECT_TRUE(queue.enqueue(3, 0u)); + EXPECT_EQ(1, *queue.peek()); + EXPECT_TRUE(queue.enqueue(5, 0u)); + EXPECT_EQ(1, *queue.peek()); + EXPECT_TRUE(queue.enqueue(3, 1u)); + EXPECT_EQ(1, *queue.peek()); + EXPECT_TRUE(queue.enqueue(7, 1u)); + EXPECT_EQ(1, *queue.peek()); + EXPECT_EQ(8u, queue.size_approx()); + EXPECT_EQ(3u, queue.size_approx(0u)); + EXPECT_EQ(3u, queue.size_approx(1u)); + EXPECT_EQ(2u, queue.size_approx(2u)); + + EXPECT_EQ(1, *queue.peek()); + EXPECT_TRUE(queue.pop()); + EXPECT_EQ(2, *queue.peek()); + EXPECT_TRUE(queue.pop()); + EXPECT_EQ(3, *queue.peek()); + EXPECT_TRUE(queue.pop()); + EXPECT_EQ(3, *queue.peek()); + EXPECT_TRUE(queue.pop()); + EXPECT_EQ(4, *queue.peek()); + EXPECT_TRUE(queue.pop()); + EXPECT_EQ(5, *queue.peek()); + EXPECT_TRUE(queue.pop()); + EXPECT_EQ(6, *queue.peek()); + EXPECT_TRUE(queue.pop()); + EXPECT_EQ(7, *queue.peek()); + EXPECT_TRUE(queue.pop()); + EXPECT_EQ(nullptr, queue.peek()); + EXPECT_FALSE(queue.pop()); + EXPECT_EQ(0u, queue.size_approx()); +} diff --git a/rosbag2_transport/test/rosbag2_transport/test_play.cpp b/rosbag2_transport/test/rosbag2_transport/test_play.cpp index 8b7b40f509..42fcaf379f 100644 --- a/rosbag2_transport/test/rosbag2_transport/test_play.cpp +++ b/rosbag2_transport/test/rosbag2_transport/test_play.cpp @@ -174,6 +174,68 @@ TEST_F(RosBag2PlayTestFixture, recorded_messages_are_played_for_all_topics) ElementsAre(40.0f, 2.0f, 0.0f))))); } +TEST_F(RosBag2PlayTestFixture, recorded_messages_are_played_for_all_topics_multi) +{ + auto msg = get_messages_basic_types()[0]; + msg->int32_value = 42; + + auto topic_types = std::vector{ + {1u, "topic1", "test_msgs/BasicTypes", "", {}, ""}, + {2u, "topic2", "test_msgs/BasicTypes", "", {}, ""}, + }; + + // Make sure each reader's/bag's messages are ordered by time + // However, do interlace messages across bags + constexpr std::size_t num_bags = 3u; + std::vector>> messages_list{}; + messages_list.emplace_back(std::vector>{ + serialize_test_message("topic1", 1, msg), + serialize_test_message("topic2", 5, msg), + serialize_test_message("topic1", 8, msg), + serialize_test_message("topic2", 10, msg), + serialize_test_message("topic1", 13, msg), + serialize_test_message("topic2", 14, msg)}); + messages_list.emplace_back(std::vector>{ + serialize_test_message("topic1", 2, msg), + serialize_test_message("topic2", 3, msg), + serialize_test_message("topic1", 6, msg), + serialize_test_message("topic2", 10, msg), + serialize_test_message("topic1", 12, msg), + serialize_test_message("topic2", 16, msg)}); + messages_list.emplace_back(std::vector>{ + serialize_test_message("topic1", 1, msg), + serialize_test_message("topic2", 4, msg), + serialize_test_message("topic1", 7, msg), + serialize_test_message("topic2", 9, msg), + serialize_test_message("topic1", 11, msg), + serialize_test_message("topic2", 15, msg)}); + std::vector bags{}; + std::size_t total_messages = 0u; + for (std::size_t i = 0u; i < num_bags; i++) { + auto prepared_mock_reader = std::make_unique(); + total_messages += messages_list[i].size(); + prepared_mock_reader->prepare(messages_list[i], topic_types); + bags.emplace_back( + std::make_unique(std::move(prepared_mock_reader)), storage_options_); + } + ASSERT_GT(total_messages, 0u); + + auto player = std::make_shared(std::move(bags), play_options_); + std::size_t num_played_messages = 0u; + rcutils_time_point_value_t last_timetamp = \ + std::numeric_limits::lowest(); + const auto callback = [&](std::shared_ptr msg) { + // Make sure messages are played in order + EXPECT_LE(last_timetamp, msg->recv_timestamp); + last_timetamp = msg->recv_timestamp; + num_played_messages++; + }; + player->add_on_play_message_pre_callback(callback); + player->play(); + player->wait_for_playback_to_finish(); + EXPECT_EQ(total_messages, num_played_messages); +} + TEST_F(RosBag2PlayTestFixture, recorded_messages_are_played_for_all_services) { const std::string service_name1 = "/test_service1";