From 253256a75cdf26c946ac7858425fc4c6794fd6a7 Mon Sep 17 00:00:00 2001 From: Nicola Loi Date: Mon, 28 Oct 2024 23:00:06 +0000 Subject: [PATCH] Code review Signed-off-by: Nicola Loi --- ros2bag/ros2bag/verb/play.py | 10 +- rosbag2_py/rosbag2_py/_transport.pyi | 2 +- rosbag2_py/src/rosbag2_py/_transport.cpp | 3 +- .../rosbag2_transport/play_options.hpp | 4 +- .../src/rosbag2_transport/player.cpp | 155 +++++++++++------- 5 files changed, 104 insertions(+), 70 deletions(-) diff --git a/ros2bag/ros2bag/verb/play.py b/ros2bag/ros2bag/verb/play.py index e033654e8..1047f6eb3 100644 --- a/ros2bag/ros2bag/verb/play.py +++ b/ros2bag/ros2bag/verb/play.py @@ -169,8 +169,12 @@ def add_arguments(self, parser, cli_name): # noqa: D102 choices=['debug', 'info', 'warn', 'error', 'fatal'], help='Logging level.') parser.add_argument( - '--progress-bar', action='store_true', default=False, - help='Print a progress bar of the playback player.') + '--progress-bar', type=float, default=10.0, + help='Print a progress bar of the playback player at a specific frequency in Hz. ' + 'Default is %(default)d. ' + 'Negative values mark an update for every published message, while ' + ' a zero value disables the progress bar.', + metavar='PROGRESS_BAR_FREQUENCY') def get_playback_until_from_arg_group(self, playback_until_sec, playback_until_nsec) -> int: nano_scale = 1000 * 1000 * 1000 @@ -246,7 +250,7 @@ def main(self, *, args): # noqa: D102 play_options.service_requests_source = ServiceRequestsSource.SERVICE_INTROSPECTION else: play_options.service_requests_source = ServiceRequestsSource.CLIENT_INTROSPECTION - play_options.disable_progress_bar = not args.progress_bar + play_options.progress_bar_print_frequency = args.progress_bar player = Player(args.log_level) try: diff --git a/rosbag2_py/rosbag2_py/_transport.pyi b/rosbag2_py/rosbag2_py/_transport.pyi index b6b66f288..79462dde8 100644 --- a/rosbag2_py/rosbag2_py/_transport.pyi +++ b/rosbag2_py/rosbag2_py/_transport.pyi @@ -10,7 +10,6 @@ class PlayOptions: delay: float disable_keyboard_controls: bool disable_loan_message: bool - disable_progress_bar: bool exclude_regex_to_filter: str exclude_service_events_to_filter: List[str] exclude_topics_to_filter: List[str] @@ -18,6 +17,7 @@ class PlayOptions: node_prefix: str playback_duration: float playback_until_timestamp: int + progress_bar_print_frequency: float publish_service_requests: bool rate: float read_ahead_queue_size: int diff --git a/rosbag2_py/src/rosbag2_py/_transport.cpp b/rosbag2_py/src/rosbag2_py/_transport.cpp index 9b2883b06..0e1788c7c 100644 --- a/rosbag2_py/src/rosbag2_py/_transport.cpp +++ b/rosbag2_py/src/rosbag2_py/_transport.cpp @@ -544,8 +544,7 @@ PYBIND11_MODULE(_transport, m) { "playback_until_timestamp", &PlayOptions::getPlaybackUntilTimestamp, &PlayOptions::setPlaybackUntilTimestamp) - .def_readwrite( - "disable_progress_bar", &PlayOptions::disable_progress_bar) + .def_readwrite("progress_bar_print_frequency", &PlayOptions::progress_bar_print_frequency) .def_readwrite("wait_acked_timeout", &PlayOptions::wait_acked_timeout) .def_readwrite("disable_loan_message", &PlayOptions::disable_loan_message) .def_readwrite("publish_service_requests", &PlayOptions::publish_service_requests) diff --git a/rosbag2_transport/include/rosbag2_transport/play_options.hpp b/rosbag2_transport/include/rosbag2_transport/play_options.hpp index b20f6c909..cf2ad2212 100644 --- a/rosbag2_transport/include/rosbag2_transport/play_options.hpp +++ b/rosbag2_transport/include/rosbag2_transport/play_options.hpp @@ -124,8 +124,8 @@ struct PlayOptions // The source of the service request ServiceRequestsSource service_requests_source = ServiceRequestsSource::SERVICE_INTROSPECTION; - // Disable to print a progress bar of the playback player - bool disable_progress_bar = true; + // Rate in Hz at which to print progress bar. + double progress_bar_print_frequency = 10.0; }; } // namespace rosbag2_transport diff --git a/rosbag2_transport/src/rosbag2_transport/player.cpp b/rosbag2_transport/src/rosbag2_transport/player.cpp index e62b448f5..020d335f8 100644 --- a/rosbag2_transport/src/rosbag2_transport/player.cpp +++ b/rosbag2_transport/src/rosbag2_transport/player.cpp @@ -80,6 +80,14 @@ rclcpp::QoS publisher_qos_for_topic( namespace rosbag2_transport { + +enum class PlayerStatus : char +{ + RUNNING = 'R', + PAUSED = 'P', + DELAYED = 'D', +}; + constexpr Player::callback_handle_t Player::invalid_callback_handle; class PlayerImpl @@ -123,12 +131,13 @@ class PlayerImpl virtual bool set_rate(double); /// \brief Playing next message from queue when in pause. + /// \param in_burst_mode If true, it will not call the progress bar update to avoid delays. /// \details This is blocking call and it will wait until next available message will be /// published or rclcpp context shut down. /// \note If internal player queue is starving and storage has not been completely loaded, /// this method will wait until new element will be pushed to the queue. /// \return true if player in pause mode and successfully played next message, otherwise false. - virtual bool play_next(); + virtual bool play_next(bool in_burst_mode = false); /// \brief Burst the next \p num_messages messages from the queue when paused. /// \param num_messages The number of messages to burst from the queue. Specifying zero means no @@ -298,12 +307,8 @@ class PlayerImpl void configure_play_until_timestamp(); bool shall_stop_at_timestamp(const rcutils_time_point_value_t & msg_timestamp) const; - template - void print_status() const; - void check_and_print_status() const; - void print_running_status() const; - void print_paused_status() const; - void print_delayed_status() const; + void print_progress_bar_help_str(double progress_bar_print_frequency) const; + void print_progress_bar_status(const PlayerStatus & status, bool force_update = false); static constexpr double read_ahead_lower_bound_percentage_ = 0.9; static const std::chrono::milliseconds queue_read_wait_period_; @@ -360,7 +365,11 @@ class PlayerImpl std::shared_ptr player_service_client_manager_; // progress bar - const bool disable_progress_bar_; + const bool enable_progress_bar_; + const bool progress_bar_update_always_; + const rcutils_duration_value_t progress_bar_update_period_; + std::chrono::steady_clock::time_point progress_bar_last_time_printed_{}; + rcutils_time_point_value_t recv_timestamp_last_published_; }; PlayerImpl::PlayerImpl( @@ -374,7 +383,11 @@ PlayerImpl::PlayerImpl( play_options_(play_options), keyboard_handler_(std::move(keyboard_handler)), player_service_client_manager_(std::make_shared()), - disable_progress_bar_(play_options.disable_progress_bar) + enable_progress_bar_(play_options.progress_bar_print_frequency != 0), + progress_bar_update_always_(play_options.progress_bar_print_frequency < 0), + progress_bar_update_period_(play_options.progress_bar_print_frequency != 0 ? + RCUTILS_S_TO_NS(1.0 / play_options.progress_bar_print_frequency) : + std::numeric_limits::max()) { for (auto & topic : play_options_.topics_to_filter) { topic = rclcpp::expand_topic_or_service_name( @@ -423,6 +436,7 @@ PlayerImpl::PlayerImpl( } starting_time_secs_ = RCUTILS_NS_TO_S(static_cast(starting_time_)); duration_secs_ = RCUTILS_NS_TO_S(static_cast(bag_duration)); + recv_timestamp_last_published_ = starting_time_; clock_ = std::make_unique( starting_time_, std::chrono::steady_clock::now, std::chrono::milliseconds{100}, play_options_.start_paused); @@ -433,13 +447,7 @@ PlayerImpl::PlayerImpl( } create_control_services(); add_keyboard_callbacks(); - std::string progress_bar_help_str; - if (!disable_progress_bar_) { - progress_bar_help_str = "Bag Time and Duration [?]: ? = R running, P paused, D delayed"; - } else { - progress_bar_help_str = "Bag Time and Duration progress bar disabled."; - } - RCLCPP_INFO_STREAM(owner_->get_logger(), progress_bar_help_str); + print_progress_bar_help_str(play_options.progress_bar_print_frequency); } PlayerImpl::~PlayerImpl() @@ -460,10 +468,9 @@ PlayerImpl::~PlayerImpl() if (reader_) { reader_->close(); } - // print new line to keep on screen the latest progress bar - if (!disable_progress_bar_) { - printf("\n"); - fflush(stdout); + // print new line to keep on screen the latest progress bar (which had a carriage return) + if (enable_progress_bar_) { + std::cout << std::endl; } } @@ -515,9 +522,12 @@ bool PlayerImpl::play() do { if (delay > rclcpp::Duration(0, 0)) { RCLCPP_INFO_STREAM(owner_->get_logger(), "Sleep " << delay.nanoseconds() << " ns"); - print_delayed_status(); + print_progress_bar_status(PlayerStatus::DELAYED); std::chrono::nanoseconds delay_duration(delay.nanoseconds()); std::this_thread::sleep_for(delay_duration); + } else { + print_progress_bar_status(clock_->is_paused() ? + PlayerStatus::PAUSED : PlayerStatus::RUNNING, true); } { std::lock_guard lk(reader_mutex_); @@ -582,6 +592,8 @@ bool PlayerImpl::play() is_in_playback_ = false; playback_finished_cv_.notify_all(); } + print_progress_bar_status(clock_->is_paused() ? + PlayerStatus::PAUSED : PlayerStatus::RUNNING, true); }); return true; } @@ -632,21 +644,22 @@ void PlayerImpl::stop() playback_thread_.join(); } } - check_and_print_status(); + print_progress_bar_status(clock_->is_paused() ? + PlayerStatus::PAUSED : PlayerStatus::RUNNING, true); } void PlayerImpl::pause() { clock_->pause(); RCLCPP_INFO_STREAM(owner_->get_logger(), "Pausing play."); - print_paused_status(); + print_progress_bar_status(PlayerStatus::PAUSED); } void PlayerImpl::resume() { clock_->resume(); RCLCPP_INFO_STREAM(owner_->get_logger(), "Resuming play."); - print_running_status(); + print_progress_bar_status(PlayerStatus::RUNNING, true); } void PlayerImpl::toggle_paused() @@ -673,7 +686,8 @@ bool PlayerImpl::set_rate(double rate) } else { RCLCPP_WARN_STREAM(owner_->get_logger(), "Failed to set rate to invalid value " << rate); } - check_and_print_status(); + print_progress_bar_status(clock_->is_paused() ? + PlayerStatus::PAUSED : PlayerStatus::RUNNING, true); return ok; } @@ -706,10 +720,11 @@ rosbag2_storage::SerializedBagMessageSharedPtr PlayerImpl::peek_next_message_fro return nullptr; } -bool PlayerImpl::play_next() +bool PlayerImpl::play_next(bool is_burst_mode) { if (!clock_->is_paused()) { RCLCPP_WARN_STREAM(owner_->get_logger(), "Called play next, but not in paused state."); + print_progress_bar_status(PlayerStatus::RUNNING, true); return false; } @@ -722,6 +737,7 @@ bool PlayerImpl::play_next() // resume() or stop() from another thread while we were waiting on mutex. if (!clock_->is_paused()) { RCLCPP_WARN_STREAM(owner_->get_logger(), "Called play next, but not in paused state."); + print_progress_bar_status(PlayerStatus::RUNNING, true); return false; } skip_message_in_main_play_loop_ = true; @@ -741,7 +757,11 @@ bool PlayerImpl::play_next() next_message_published = publish_message(message_ptr); clock_->jump(message_ptr->recv_timestamp); - print_paused_status(); + // Update the progress bar if we were not called in burst mode to avoid delays. + if (!is_burst_mode) { + recv_timestamp_last_published_ = message_ptr->recv_timestamp; + print_progress_bar_status(PlayerStatus::PAUSED); + } message_queue_.pop(); message_ptr = peek_next_message_from_queue(); } @@ -752,14 +772,14 @@ size_t PlayerImpl::burst(const size_t num_messages) { if (!clock_->is_paused()) { RCLCPP_WARN_STREAM(owner_->get_logger(), "Burst can only be used when in the paused state."); - print_running_status(); + print_progress_bar_status(PlayerStatus::RUNNING, true); return 0; } uint64_t messages_played = 0; for (auto ii = 0u; ii < num_messages || num_messages == 0; ++ii) { - if (play_next()) { + if (play_next(true)) { ++messages_played; } else { break; @@ -767,7 +787,8 @@ size_t PlayerImpl::burst(const size_t num_messages) } RCLCPP_INFO_STREAM(owner_->get_logger(), "Burst " << messages_played << " messages."); - print_running_status(); + print_progress_bar_status(clock_->is_paused() ? + PlayerStatus::PAUSED : PlayerStatus::RUNNING, true); return messages_played; } @@ -993,7 +1014,8 @@ void PlayerImpl::play_messages_from_queue() continue; } publish_message(message_ptr); - print_running_status(); + recv_timestamp_last_published_ = message_ptr->recv_timestamp; + print_progress_bar_status(PlayerStatus::RUNNING); } message_queue_.pop(); message_ptr = peek_next_message_from_queue(); @@ -1565,48 +1587,57 @@ const rosbag2_transport::PlayOptions & PlayerImpl::get_play_options() return play_options_; } -template -void PlayerImpl::print_status() const -{ - static_assert(status == 'R' || status == 'P' || status == 'D', - "Invalid status character"); - double current_time_secs = RCUTILS_NS_TO_S(static_cast(clock_->now())); - double progress_secs = current_time_secs - starting_time_secs_; - printf(" Bag Time %13.6f Duration %.6f/%.6f [%c] \r", - current_time_secs, progress_secs, duration_secs_, status); - fflush(stdout); -} - -void PlayerImpl::check_and_print_status() const +void PlayerImpl::print_progress_bar_help_str(double progress_bar_print_frequency) const { - if (!disable_progress_bar_) { - if (clock_->is_paused()) { - print_status<'P'>(); + std::string progress_bar_help_str; + if (enable_progress_bar_) { + if (progress_bar_update_period_ < 0) { + progress_bar_help_str = "Progress bar enabled with updates for every message."; } else { - print_status<'R'>(); + std::ostringstream oss; + oss << "Progress bar enabled with updates at " + << std::fixed << std::setprecision(1) << progress_bar_print_frequency << " Hz."; + progress_bar_help_str = oss.str(); } + progress_bar_help_str += " [?]: [R]unning, [P]aused, [D]elayed."; + } else { + progress_bar_help_str = "Progress bar disabled."; } + RCLCPP_INFO_STREAM(owner_->get_logger(), progress_bar_help_str); } -void PlayerImpl::print_running_status() const +void PlayerImpl::print_progress_bar_status(const PlayerStatus & status, bool force_update) { - if (!disable_progress_bar_) { - print_status<'R'>(); + if (!enable_progress_bar_) { + return; } -} -void PlayerImpl::print_paused_status() const -{ - if (!disable_progress_bar_) { - print_status<'P'>(); - } -} + const std::chrono::steady_clock::time_point steady_time_now = std::chrono::steady_clock::now(); -void PlayerImpl::print_delayed_status() const -{ - if (!disable_progress_bar_) { - print_status<'D'>(); + // Check if we should force an update of the progress bar, otherwise check time since last update. + // Force an update if: + // - the user-defined update period is negative; + // - or this function was called with a status other than RUNNING (since we want to update the + // progress bar when we pause or delay the playback, otherwise it will be stuck + // to the last update if not enough time has passed since it); + // - or this function was called with force_update set to true. + if (!(progress_bar_update_always_ || status != PlayerStatus::RUNNING || force_update)) { + if (std::chrono::duration_cast( + steady_time_now - progress_bar_last_time_printed_).count() < progress_bar_update_period_) + { + return; + } } + + const double current_time_secs = RCUTILS_NS_TO_S( + static_cast(recv_timestamp_last_published_)); + const double progress_secs = current_time_secs - starting_time_secs_; + // Print progress bar ending witt carriage return '/r' so it will be overwritten by next print + std::cout << " Bag Time " << std::setw(13) << std::fixed << std::setprecision(6) << + current_time_secs << " Duration " << std::fixed << std::setprecision(6) << + progress_secs << "/" << duration_secs_ << " [" << static_cast(status) << + "]\r" << std::flush; + progress_bar_last_time_printed_ = steady_time_now; } ///////////////////////////////