-
Notifications
You must be signed in to change notification settings - Fork 252
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add record, stop, start_discovery, stop_discovery and is_discovery_stopped services for rosbag2_transport::Recorder #1840
base: rolling
Are you sure you want to change the base?
Changes from all commits
4072468
6df3340
56cd53d
9da1899
bf5f3c0
2060659
afdefb5
df51ea7
2c1ed44
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,2 @@ | ||
--- | ||
bool stopped |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
--- |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
--- |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
--- |
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -86,6 +86,9 @@ class RecorderImpl | |||||
/// Stop discovery | ||||||
void stop_discovery(); | ||||||
|
||||||
/// Return the current discovery state. | ||||||
bool is_discovery_stopped(); | ||||||
|
||||||
std::unordered_map<std::string, std::string> get_requested_or_available_topics(); | ||||||
|
||||||
/// Public members for access by wrapper | ||||||
|
@@ -135,15 +138,20 @@ class RecorderImpl | |||||
std::string serialization_format_; | ||||||
std::unordered_map<std::string, rclcpp::QoS> topic_qos_profile_overrides_; | ||||||
std::unordered_set<std::string> topic_unknown_types_; | ||||||
rclcpp::Service<rosbag2_interfaces::srv::IsDiscoveryStopped>::SharedPtr srv_is_discovery_stopped_; | ||||||
rclcpp::Service<rosbag2_interfaces::srv::IsPaused>::SharedPtr srv_is_paused_; | ||||||
rclcpp::Service<rosbag2_interfaces::srv::Pause>::SharedPtr srv_pause_; | ||||||
rclcpp::Service<rosbag2_interfaces::srv::Record>::SharedPtr srv_record_; | ||||||
rclcpp::Service<rosbag2_interfaces::srv::Resume>::SharedPtr srv_resume_; | ||||||
rclcpp::Service<rosbag2_interfaces::srv::Snapshot>::SharedPtr srv_snapshot_; | ||||||
rclcpp::Service<rosbag2_interfaces::srv::SplitBagfile>::SharedPtr srv_split_bagfile_; | ||||||
rclcpp::Service<rosbag2_interfaces::srv::StartDiscovery>::SharedPtr srv_start_discovery_; | ||||||
rclcpp::Service<rosbag2_interfaces::srv::Stop>::SharedPtr srv_stop_; | ||||||
rclcpp::Service<rosbag2_interfaces::srv::StopDiscovery>::SharedPtr srv_stop_discovery_; | ||||||
|
||||||
std::mutex start_stop_transition_mutex_; | ||||||
std::mutex discovery_mutex_; | ||||||
std::atomic<bool> stop_discovery_ = false; | ||||||
std::atomic<bool> stop_discovery_ = true; | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. stop_discovery_ shall be false by default. Otherwise, we will immediately exit from
Suggested change
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. stop_discovery_ will be exchanged for false in start_discovery(), and if it was previously true, then topics_discovery will be started. it should not exit because of the exchange. subsequently stop_discovery() will exchange stop_discovery_ to true and when start_discovery() is called again it can again exchange stop_discovery_ for false and start topics_discovery() |
||||||
std::atomic_uchar paused_ = 0; | ||||||
std::atomic<bool> in_recording_ = false; | ||||||
std::shared_ptr<KeyboardHandler> keyboard_handler_; | ||||||
|
@@ -294,6 +302,56 @@ void RecorderImpl::record() | |||||
writer_->split_bagfile(); | ||||||
}); | ||||||
|
||||||
srv_start_discovery_ = node->create_service<rosbag2_interfaces::srv::StartDiscovery>( | ||||||
"~/start_discovery", | ||||||
[this]( | ||||||
const std::shared_ptr<rmw_request_id_t>/* request_header */, | ||||||
const std::shared_ptr<rosbag2_interfaces::srv::StartDiscovery::Request>/* request */, | ||||||
const std::shared_ptr<rosbag2_interfaces::srv::StartDiscovery::Response>/* response */) | ||||||
{ | ||||||
start_discovery(); | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Need to check |
||||||
}); | ||||||
|
||||||
srv_stop_discovery_ = node->create_service<rosbag2_interfaces::srv::StopDiscovery>( | ||||||
"~/stop_discovery", | ||||||
[this]( | ||||||
const std::shared_ptr<rmw_request_id_t>/* request_header */, | ||||||
const std::shared_ptr<rosbag2_interfaces::srv::StopDiscovery::Request>/* request */, | ||||||
const std::shared_ptr<rosbag2_interfaces::srv::StopDiscovery::Response>/* response */) | ||||||
{ | ||||||
stop_discovery(); | ||||||
}); | ||||||
|
||||||
srv_is_discovery_stopped_ = node->create_service<rosbag2_interfaces::srv::IsDiscoveryStopped>( | ||||||
"~/is_discovery_stopped", | ||||||
[this]( | ||||||
const std::shared_ptr<rmw_request_id_t>/* request_header */, | ||||||
const std::shared_ptr<rosbag2_interfaces::srv::IsDiscoveryStopped::Request>/* request */, | ||||||
const std::shared_ptr<rosbag2_interfaces::srv::IsDiscoveryStopped::Response> response) | ||||||
{ | ||||||
response->stopped = is_discovery_stopped(); | ||||||
}); | ||||||
|
||||||
srv_record_ = node->create_service<rosbag2_interfaces::srv::Record>( | ||||||
"~/record", | ||||||
[this]( | ||||||
const std::shared_ptr<rmw_request_id_t>/* request_header */, | ||||||
const std::shared_ptr<rosbag2_interfaces::srv::Record::Request>/* request */, | ||||||
const std::shared_ptr<rosbag2_interfaces::srv::Record::Response>/* response */) | ||||||
{ | ||||||
record(); | ||||||
}); | ||||||
|
||||||
srv_stop_ = node->create_service<rosbag2_interfaces::srv::Stop>( | ||||||
"~/stop", | ||||||
[this]( | ||||||
const std::shared_ptr<rmw_request_id_t>/* request_header */, | ||||||
const std::shared_ptr<rosbag2_interfaces::srv::Stop::Request>/* request */, | ||||||
const std::shared_ptr<rosbag2_interfaces::srv::Stop::Response>/* response */) | ||||||
{ | ||||||
stop(); | ||||||
}); | ||||||
|
||||||
srv_pause_ = node->create_service<rosbag2_interfaces::srv::Pause>( | ||||||
"~/pause", | ||||||
[this]( | ||||||
|
@@ -328,6 +386,10 @@ void RecorderImpl::record() | |||||
node->create_publisher<rosbag2_interfaces::msg::WriteSplitEvent>("events/write_split", 1); | ||||||
|
||||||
// Start the thread that will publish events | ||||||
{ | ||||||
std::lock_guard<std::mutex> lock(event_publisher_thread_mutex_); | ||||||
event_publisher_thread_should_exit_ = false; | ||||||
} | ||||||
event_publisher_thread_ = std::thread(&RecorderImpl::event_publisher_thread_main, this); | ||||||
|
||||||
rosbag2_cpp::bag_events::WriterEventCallbacks callbacks; | ||||||
|
@@ -436,7 +498,7 @@ bool RecorderImpl::is_paused() | |||||
void RecorderImpl::start_discovery() | ||||||
{ | ||||||
std::lock_guard<std::mutex> state_lock(discovery_mutex_); | ||||||
if (stop_discovery_.exchange(false)) { | ||||||
if (!stop_discovery_.exchange(false)) { | ||||||
RCLCPP_DEBUG(node->get_logger(), "Recorder topic discovery is already running."); | ||||||
} else { | ||||||
discovery_future_ = | ||||||
|
@@ -464,6 +526,11 @@ void RecorderImpl::stop_discovery() | |||||
} | ||||||
} | ||||||
|
||||||
bool RecorderImpl::is_discovery_stopped() | ||||||
{ | ||||||
return stop_discovery_.load(); | ||||||
} | ||||||
|
||||||
void RecorderImpl::topics_discovery() | ||||||
{ | ||||||
// If using sim time - wait until /clock topic received before even creating subscriptions | ||||||
|
@@ -849,6 +916,12 @@ Recorder::is_paused() | |||||
return pimpl_->is_paused(); | ||||||
} | ||||||
|
||||||
bool | ||||||
Recorder::is_discovery_stopped() | ||||||
{ | ||||||
return pimpl_->is_discovery_stopped(); | ||||||
} | ||||||
|
||||||
std::unordered_map<std::string, std::string> | ||||||
Recorder::get_requested_or_available_topics() | ||||||
{ | ||||||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -21,11 +21,16 @@ | |
|
||
#include "rclcpp/rclcpp.hpp" | ||
|
||
#include "rosbag2_interfaces/srv/is_discovery_stopped.hpp" | ||
#include "rosbag2_interfaces/srv/is_paused.hpp" | ||
#include "rosbag2_interfaces/srv/pause.hpp" | ||
#include "rosbag2_interfaces/srv/record.hpp" | ||
#include "rosbag2_interfaces/srv/resume.hpp" | ||
#include "rosbag2_interfaces/srv/snapshot.hpp" | ||
#include "rosbag2_interfaces/srv/split_bagfile.hpp" | ||
#include "rosbag2_interfaces/srv/start_discovery.hpp" | ||
#include "rosbag2_interfaces/srv/stop.hpp" | ||
#include "rosbag2_interfaces/srv/stop_discovery.hpp" | ||
#include "rosbag2_transport/recorder.hpp" | ||
|
||
#include "rosbag2_test_common/publication_manager.hpp" | ||
|
@@ -39,11 +44,16 @@ using namespace ::testing; // NOLINT | |
class RecordSrvsTest : public RecordIntegrationTestFixture | ||
{ | ||
public: | ||
using IsDiscoveryStopped = rosbag2_interfaces::srv::IsDiscoveryStopped; | ||
using IsPaused = rosbag2_interfaces::srv::IsPaused; | ||
using Pause = rosbag2_interfaces::srv::Pause; | ||
using Record = rosbag2_interfaces::srv::Record; | ||
using Resume = rosbag2_interfaces::srv::Resume; | ||
using Snapshot = rosbag2_interfaces::srv::Snapshot; | ||
using SplitBagfile = rosbag2_interfaces::srv::SplitBagfile; | ||
using StartDiscovery = rosbag2_interfaces::srv::StartDiscovery; | ||
using Stop = rosbag2_interfaces::srv::Stop; | ||
using StopDiscovery = rosbag2_interfaces::srv::StopDiscovery; | ||
|
||
explicit RecordSrvsTest(bool snapshot_mode = false) | ||
: RecordIntegrationTestFixture(), | ||
|
@@ -80,11 +90,17 @@ class RecordSrvsTest : public RecordIntegrationTestFixture | |
pub_manager.setup_publisher(test_topic_, string_message, 10); | ||
|
||
const std::string ns = "/" + recorder_name_; | ||
cli_is_discovery_stopped_ = client_node_->create_client<IsDiscoveryStopped>(ns + | ||
"/is_discovery_stopped"); | ||
cli_is_paused_ = client_node_->create_client<IsPaused>(ns + "/is_paused"); | ||
cli_pause_ = client_node_->create_client<Pause>(ns + "/pause"); | ||
cli_record_ = client_node_->create_client<Record>(ns + "/record"); | ||
cli_resume_ = client_node_->create_client<Resume>(ns + "/resume"); | ||
cli_snapshot_ = client_node_->create_client<Snapshot>(ns + "/snapshot"); | ||
cli_split_bagfile_ = client_node_->create_client<SplitBagfile>(ns + "/split_bagfile"); | ||
cli_start_discovery_ = client_node_->create_client<StartDiscovery>(ns + "/start_discovery"); | ||
cli_stop_ = client_node_->create_client<Stop>(ns + "/stop"); | ||
cli_stop_discovery_ = client_node_->create_client<StopDiscovery>(ns + "/stop_discovery"); | ||
exec_ = std::make_shared<rclcpp::executors::SingleThreadedExecutor>(); | ||
|
||
exec_->add_node(recorder_); | ||
|
@@ -141,11 +157,16 @@ class RecordSrvsTest : public RecordIntegrationTestFixture | |
|
||
// Service clients | ||
rclcpp::Node::SharedPtr client_node_; | ||
rclcpp::Client<IsDiscoveryStopped>::SharedPtr cli_is_discovery_stopped_; | ||
rclcpp::Client<IsPaused>::SharedPtr cli_is_paused_; | ||
rclcpp::Client<Pause>::SharedPtr cli_pause_; | ||
rclcpp::Client<Record>::SharedPtr cli_record_; | ||
rclcpp::Client<Resume>::SharedPtr cli_resume_; | ||
rclcpp::Client<Snapshot>::SharedPtr cli_snapshot_; | ||
rclcpp::Client<SplitBagfile>::SharedPtr cli_split_bagfile_; | ||
rclcpp::Client<StartDiscovery>::SharedPtr cli_start_discovery_; | ||
rclcpp::Client<Stop>::SharedPtr cli_stop_; | ||
rclcpp::Client<StopDiscovery>::SharedPtr cli_stop_discovery_; | ||
|
||
bool snapshot_mode_; | ||
}; | ||
|
@@ -226,3 +247,37 @@ TEST_F(RecordSrvsTest, pause_resume) | |
is_paused_response = successful_service_request<IsPaused>(cli_is_paused_); | ||
EXPECT_FALSE(is_paused_response->paused); | ||
} | ||
|
||
TEST_F(RecordSrvsTest, stop_start_discovery) | ||
{ | ||
EXPECT_FALSE(recorder_->is_discovery_stopped()); | ||
auto is_discovery_stopped_response = | ||
successful_service_request<IsDiscoveryStopped>(cli_is_discovery_stopped_); | ||
EXPECT_FALSE(is_discovery_stopped_response->stopped); | ||
|
||
successful_service_request<StopDiscovery>(cli_stop_discovery_); | ||
EXPECT_TRUE(recorder_->is_discovery_stopped()); | ||
is_discovery_stopped_response = | ||
successful_service_request<IsDiscoveryStopped>(cli_is_discovery_stopped_); | ||
EXPECT_TRUE(is_discovery_stopped_response->stopped); | ||
|
||
successful_service_request<StartDiscovery>(cli_start_discovery_); | ||
EXPECT_FALSE(recorder_->is_discovery_stopped()); | ||
is_discovery_stopped_response = | ||
successful_service_request<IsDiscoveryStopped>(cli_is_discovery_stopped_); | ||
EXPECT_FALSE(is_discovery_stopped_response->stopped); | ||
} | ||
|
||
TEST_F(RecordSrvsTest, record_stop) | ||
{ | ||
auto & writer = recorder_->get_writer_handle(); | ||
auto & mock_writer = dynamic_cast<MockSequentialWriter &>(writer.get_implementation_handle()); | ||
|
||
EXPECT_FALSE(mock_writer.closed_was_called()); | ||
|
||
successful_service_request<Stop>(cli_stop_); | ||
EXPECT_TRUE(mock_writer.closed_was_called()); | ||
|
||
successful_service_request<Record>(cli_record_); | ||
EXPECT_FALSE(mock_writer.closed_was_called()); | ||
} | ||
Comment on lines
+271
to
+283
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Please add recorder start after the stop and then again stop to make sure that we can start again after the stop |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please rename to the
is_discovery_running()