diff --git a/ros2bag/ros2bag/verb/play.py b/ros2bag/ros2bag/verb/play.py index 363d43d59..1047f6eb3 100644 --- a/ros2bag/ros2bag/verb/play.py +++ b/ros2bag/ros2bag/verb/play.py @@ -168,6 +168,13 @@ def add_arguments(self, parser, cli_name): # noqa: D102 '--log-level', type=str, default='info', choices=['debug', 'info', 'warn', 'error', 'fatal'], help='Logging level.') + parser.add_argument( + '--progress-bar', type=float, default=10.0, + help='Print a progress bar of the playback player at a specific frequency in Hz. ' + 'Default is %(default)d. ' + 'Negative values mark an update for every published message, while ' + ' a zero value disables the progress bar.', + metavar='PROGRESS_BAR_FREQUENCY') def get_playback_until_from_arg_group(self, playback_until_sec, playback_until_nsec) -> int: nano_scale = 1000 * 1000 * 1000 @@ -243,6 +250,7 @@ def main(self, *, args): # noqa: D102 play_options.service_requests_source = ServiceRequestsSource.SERVICE_INTROSPECTION else: play_options.service_requests_source = ServiceRequestsSource.CLIENT_INTROSPECTION + play_options.progress_bar_print_frequency = args.progress_bar player = Player(args.log_level) try: diff --git a/rosbag2_py/rosbag2_py/_transport.pyi b/rosbag2_py/rosbag2_py/_transport.pyi index fdd8f0780..79462dde8 100644 --- a/rosbag2_py/rosbag2_py/_transport.pyi +++ b/rosbag2_py/rosbag2_py/_transport.pyi @@ -17,6 +17,7 @@ class PlayOptions: node_prefix: str playback_duration: float playback_until_timestamp: int + progress_bar_print_frequency: float publish_service_requests: bool rate: float read_ahead_queue_size: int diff --git a/rosbag2_py/src/rosbag2_py/_transport.cpp b/rosbag2_py/src/rosbag2_py/_transport.cpp index 9d15e82b1..0e1788c7c 100644 --- a/rosbag2_py/src/rosbag2_py/_transport.cpp +++ b/rosbag2_py/src/rosbag2_py/_transport.cpp @@ -544,6 +544,7 @@ PYBIND11_MODULE(_transport, m) { "playback_until_timestamp", &PlayOptions::getPlaybackUntilTimestamp, &PlayOptions::setPlaybackUntilTimestamp) + .def_readwrite("progress_bar_print_frequency", &PlayOptions::progress_bar_print_frequency) .def_readwrite("wait_acked_timeout", &PlayOptions::wait_acked_timeout) .def_readwrite("disable_loan_message", &PlayOptions::disable_loan_message) .def_readwrite("publish_service_requests", &PlayOptions::publish_service_requests) diff --git a/rosbag2_transport/include/rosbag2_transport/play_options.hpp b/rosbag2_transport/include/rosbag2_transport/play_options.hpp index e5029c923..cf2ad2212 100644 --- a/rosbag2_transport/include/rosbag2_transport/play_options.hpp +++ b/rosbag2_transport/include/rosbag2_transport/play_options.hpp @@ -123,6 +123,9 @@ struct PlayOptions // The source of the service request ServiceRequestsSource service_requests_source = ServiceRequestsSource::SERVICE_INTROSPECTION; + + // Rate in Hz at which to print progress bar. + double progress_bar_print_frequency = 10.0; }; } // namespace rosbag2_transport diff --git a/rosbag2_transport/src/rosbag2_transport/player.cpp b/rosbag2_transport/src/rosbag2_transport/player.cpp index 74a911f1d..e76475adb 100644 --- a/rosbag2_transport/src/rosbag2_transport/player.cpp +++ b/rosbag2_transport/src/rosbag2_transport/player.cpp @@ -80,6 +80,14 @@ rclcpp::QoS publisher_qos_for_topic( namespace rosbag2_transport { + +enum class PlayerStatus : char +{ + RUNNING = 'R', + PAUSED = 'P', + DELAYED = 'D', +}; + constexpr Player::callback_handle_t Player::invalid_callback_handle; class PlayerImpl @@ -123,12 +131,13 @@ class PlayerImpl virtual bool set_rate(double); /// \brief Playing next message from queue when in pause. + /// \param in_burst_mode If true, it will not call the progress bar update to avoid delays. /// \details This is blocking call and it will wait until next available message will be /// published or rclcpp context shut down. /// \note If internal player queue is starving and storage has not been completely loaded, /// this method will wait until new element will be pushed to the queue. /// \return true if player in pause mode and successfully played next message, otherwise false. - virtual bool play_next(); + virtual bool play_next(bool in_burst_mode = false); /// \brief Burst the next \p num_messages messages from the queue when paused. /// \param num_messages The number of messages to burst from the queue. Specifying zero means no @@ -298,6 +307,9 @@ class PlayerImpl void configure_play_until_timestamp(); bool shall_stop_at_timestamp(const rcutils_time_point_value_t & msg_timestamp) const; + void print_progress_bar_help_str(double progress_bar_print_frequency) const; + void print_progress_bar_status(const PlayerStatus & status, bool force_update = false); + static constexpr double read_ahead_lower_bound_percentage_ = 0.9; static const std::chrono::milliseconds queue_read_wait_period_; std::atomic_bool cancel_wait_for_next_message_{false}; @@ -328,6 +340,8 @@ class PlayerImpl std::condition_variable playback_finished_cv_; rcutils_time_point_value_t starting_time_; + double starting_time_secs_; + double duration_secs_; // control services rclcpp::Service::SharedPtr srv_pause_; @@ -349,6 +363,13 @@ class PlayerImpl std::vector keyboard_callbacks_; std::shared_ptr player_service_client_manager_; + + // progress bar + const bool enable_progress_bar_; + const bool progress_bar_update_always_; + const rcutils_duration_value_t progress_bar_update_period_; + std::chrono::steady_clock::time_point progress_bar_last_time_printed_{}; + rcutils_time_point_value_t recv_timestamp_last_published_; }; PlayerImpl::PlayerImpl( @@ -361,7 +382,12 @@ PlayerImpl::PlayerImpl( storage_options_(storage_options), play_options_(play_options), keyboard_handler_(std::move(keyboard_handler)), - player_service_client_manager_(std::make_shared()) + player_service_client_manager_(std::make_shared()), + enable_progress_bar_(play_options.progress_bar_print_frequency != 0), + progress_bar_update_always_(play_options.progress_bar_print_frequency < 0), + progress_bar_update_period_(play_options.progress_bar_print_frequency != 0 ? + RCUTILS_S_TO_NS(1.0 / play_options.progress_bar_print_frequency) : + std::numeric_limits::max()) { for (auto & topic : play_options_.topics_to_filter) { topic = rclcpp::expand_topic_or_service_name( @@ -393,6 +419,7 @@ PlayerImpl::PlayerImpl( // keep reader open until player is destroyed reader_->open(storage_options_, {"", rmw_get_serialization_format()}); auto metadata = reader_->get_metadata(); + rcutils_duration_value_t bag_duration = metadata.duration.count(); starting_time_ = std::chrono::duration_cast( metadata.starting_time.time_since_epoch()).count(); // If a non-default (positive) starting time offset is provided in PlayOptions, @@ -405,7 +432,11 @@ PlayerImpl::PlayerImpl( ". Negative start offset ignored."); } else { starting_time_ += play_options_.start_offset; + bag_duration -= play_options_.start_offset; } + starting_time_secs_ = RCUTILS_NS_TO_S(static_cast(starting_time_)); + duration_secs_ = RCUTILS_NS_TO_S(static_cast(bag_duration)); + recv_timestamp_last_published_ = starting_time_; clock_ = std::make_unique( starting_time_, std::chrono::steady_clock::now, std::chrono::milliseconds{100}, play_options_.start_paused); @@ -416,6 +447,7 @@ PlayerImpl::PlayerImpl( } create_control_services(); add_keyboard_callbacks(); + print_progress_bar_help_str(play_options.progress_bar_print_frequency); } PlayerImpl::~PlayerImpl() @@ -436,6 +468,10 @@ PlayerImpl::~PlayerImpl() if (reader_) { reader_->close(); } + // print new line to keep on screen the latest progress bar (which had a carriage return) + if (enable_progress_bar_) { + std::cout << std::endl; + } } const std::chrono::milliseconds @@ -486,8 +522,12 @@ bool PlayerImpl::play() do { if (delay > rclcpp::Duration(0, 0)) { RCLCPP_INFO_STREAM(owner_->get_logger(), "Sleep " << delay.nanoseconds() << " ns"); + print_progress_bar_status(PlayerStatus::DELAYED); std::chrono::nanoseconds delay_duration(delay.nanoseconds()); std::this_thread::sleep_for(delay_duration); + } else { + print_progress_bar_status(clock_->is_paused() ? + PlayerStatus::PAUSED : PlayerStatus::RUNNING, true); } { std::lock_guard lk(reader_mutex_); @@ -552,6 +592,8 @@ bool PlayerImpl::play() is_in_playback_ = false; playback_finished_cv_.notify_all(); } + print_progress_bar_status(clock_->is_paused() ? + PlayerStatus::PAUSED : PlayerStatus::RUNNING, true); }); return true; } @@ -602,18 +644,22 @@ void PlayerImpl::stop() playback_thread_.join(); } } + print_progress_bar_status(clock_->is_paused() ? + PlayerStatus::PAUSED : PlayerStatus::RUNNING, true); } void PlayerImpl::pause() { clock_->pause(); RCLCPP_INFO_STREAM(owner_->get_logger(), "Pausing play."); + print_progress_bar_status(PlayerStatus::PAUSED); } void PlayerImpl::resume() { clock_->resume(); RCLCPP_INFO_STREAM(owner_->get_logger(), "Resuming play."); + print_progress_bar_status(PlayerStatus::RUNNING, true); } void PlayerImpl::toggle_paused() @@ -640,6 +686,8 @@ bool PlayerImpl::set_rate(double rate) } else { RCLCPP_WARN_STREAM(owner_->get_logger(), "Failed to set rate to invalid value " << rate); } + print_progress_bar_status(clock_->is_paused() ? + PlayerStatus::PAUSED : PlayerStatus::RUNNING, true); return ok; } @@ -672,10 +720,11 @@ rosbag2_storage::SerializedBagMessageSharedPtr PlayerImpl::peek_next_message_fro return nullptr; } -bool PlayerImpl::play_next() +bool PlayerImpl::play_next(bool is_burst_mode) { if (!clock_->is_paused()) { RCLCPP_WARN_STREAM(owner_->get_logger(), "Called play next, but not in paused state."); + print_progress_bar_status(PlayerStatus::RUNNING, true); return false; } @@ -688,6 +737,7 @@ bool PlayerImpl::play_next() // resume() or stop() from another thread while we were waiting on mutex. if (!clock_->is_paused()) { RCLCPP_WARN_STREAM(owner_->get_logger(), "Called play next, but not in paused state."); + print_progress_bar_status(PlayerStatus::RUNNING, true); return false; } skip_message_in_main_play_loop_ = true; @@ -707,6 +757,11 @@ bool PlayerImpl::play_next() next_message_published = publish_message(message_ptr); clock_->jump(message_ptr->recv_timestamp); + // Update the progress bar if we were not called in burst mode to avoid delays. + if (!is_burst_mode) { + recv_timestamp_last_published_ = message_ptr->recv_timestamp; + print_progress_bar_status(PlayerStatus::PAUSED); + } message_queue_.pop(); message_ptr = peek_next_message_from_queue(); } @@ -717,13 +772,14 @@ size_t PlayerImpl::burst(const size_t num_messages) { if (!clock_->is_paused()) { RCLCPP_WARN_STREAM(owner_->get_logger(), "Burst can only be used when in the paused state."); + print_progress_bar_status(PlayerStatus::RUNNING, true); return 0; } uint64_t messages_played = 0; for (auto ii = 0u; ii < num_messages || num_messages == 0; ++ii) { - if (play_next()) { + if (play_next(true)) { ++messages_played; } else { break; @@ -731,6 +787,8 @@ size_t PlayerImpl::burst(const size_t num_messages) } RCLCPP_INFO_STREAM(owner_->get_logger(), "Burst " << messages_played << " messages."); + print_progress_bar_status(clock_->is_paused() ? + PlayerStatus::PAUSED : PlayerStatus::RUNNING, true); return messages_played; } @@ -956,6 +1014,8 @@ void PlayerImpl::play_messages_from_queue() continue; } publish_message(message_ptr); + recv_timestamp_last_published_ = message_ptr->recv_timestamp; + print_progress_bar_status(PlayerStatus::RUNNING); } message_queue_.pop(); message_ptr = peek_next_message_from_queue(); @@ -1527,6 +1587,59 @@ const rosbag2_transport::PlayOptions & PlayerImpl::get_play_options() return play_options_; } +void PlayerImpl::print_progress_bar_help_str(double progress_bar_print_frequency) const +{ + std::string progress_bar_help_str; + if (enable_progress_bar_) { + if (progress_bar_update_period_ < 0) { + progress_bar_help_str = "Progress bar enabled with updates for every message."; + } else { + std::ostringstream oss; + oss << "Progress bar enabled with updates at " + << std::fixed << std::setprecision(1) << progress_bar_print_frequency << " Hz."; + progress_bar_help_str = oss.str(); + } + progress_bar_help_str += " [?]: [R]unning, [P]aused, [D]elayed."; + } else { + progress_bar_help_str = "Progress bar disabled."; + } + RCLCPP_INFO_STREAM(owner_->get_logger(), progress_bar_help_str); +} + +void PlayerImpl::print_progress_bar_status(const PlayerStatus & status, bool force_update) +{ + if (!enable_progress_bar_) { + return; + } + + const std::chrono::steady_clock::time_point steady_time_now = std::chrono::steady_clock::now(); + + // Check if we should force an update of the progress bar, otherwise check time since last update. + // Force an update if: + // - the user-defined update period is negative; + // - or this function was called with a status other than RUNNING (since we want to update the + // progress bar when we pause or delay the playback, otherwise it will be stuck + // to the last update if not enough time has passed since it); + // - or this function was called with force_update set to true. + if (!(progress_bar_update_always_ || status != PlayerStatus::RUNNING || force_update)) { + if (std::chrono::duration_cast( + steady_time_now - progress_bar_last_time_printed_).count() < progress_bar_update_period_) + { + return; + } + } + + const double current_time_secs = RCUTILS_NS_TO_S( + static_cast(recv_timestamp_last_published_)); + const double progress_secs = current_time_secs - starting_time_secs_; + // Print progress bar ending with carriage return '/r' so it will be overwritten by next print + std::cout << " Bag Time " << std::setw(13) << std::fixed << std::setprecision(6) << + current_time_secs << " Duration " << std::fixed << std::setprecision(6) << + progress_secs << "/" << duration_secs_ << " [" << static_cast(status) << + "]\r" << std::flush; + progress_bar_last_time_printed_ = steady_time_now; +} + /////////////////////////////// // Player public interface