diff --git a/rmf_fleet_adapter/include/rmf_fleet_adapter/StandardNames.hpp b/rmf_fleet_adapter/include/rmf_fleet_adapter/StandardNames.hpp index 52f951850..5915541b4 100644 --- a/rmf_fleet_adapter/include/rmf_fleet_adapter/StandardNames.hpp +++ b/rmf_fleet_adapter/include/rmf_fleet_adapter/StandardNames.hpp @@ -28,6 +28,8 @@ const std::string DestinationRequestTopicName = "destination_requests"; const std::string ModeRequestTopicName = "robot_mode_requests"; const std::string PathRequestTopicName = "robot_path_requests"; const std::string PauseRequestTopicName = "robot_pause_requests"; +const std::string FleetStateUpdateTopicName = "fleet_state_update"; +const std::string FleetLogUpdateTopicName = "fleet_log_update"; const std::string FinalDoorRequestTopicName = "door_requests"; const std::string AdapterDoorRequestTopicName = "adapter_door_requests"; @@ -68,6 +70,8 @@ const std::string InterruptRequestTopicName = "robot_interrupt_request"; const std::string TaskApiRequests = "task_api_requests"; const std::string TaskApiResponses = "task_api_responses"; +const std::string TaskStateUpdateTopicName = "task_state_update"; +const std::string TaskLogUpdateTopicName = "task_log_update"; const std::string ChargingAssignmentsTopicName = "charging_assignments"; diff --git a/rmf_fleet_adapter/include/rmf_fleet_adapter/agv/EasyFullControl.hpp b/rmf_fleet_adapter/include/rmf_fleet_adapter/agv/EasyFullControl.hpp index 3a3b02fe1..5068094a3 100644 --- a/rmf_fleet_adapter/include/rmf_fleet_adapter/agv/EasyFullControl.hpp +++ b/rmf_fleet_adapter/include/rmf_fleet_adapter/agv/EasyFullControl.hpp @@ -316,6 +316,15 @@ class EasyFullControl::RobotConfiguration /// Set the minimum lane length. void set_min_lane_length(std::optional distance); + /// Get the idle behavior. + /// + /// If std::nullopt is used, then the fleet-wide default finishing request + /// will be used. + std::optional finishing_request() const; + + /// Set the finishing request. + void set_finishing_request(std::optional request); + class Implementation; private: rmf_utils::impl_ptr _pimpl; diff --git a/rmf_fleet_adapter/include/rmf_fleet_adapter/agv/RobotUpdateHandle.hpp b/rmf_fleet_adapter/include/rmf_fleet_adapter/agv/RobotUpdateHandle.hpp index 132c749c6..f74a35017 100644 --- a/rmf_fleet_adapter/include/rmf_fleet_adapter/agv/RobotUpdateHandle.hpp +++ b/rmf_fleet_adapter/include/rmf_fleet_adapter/agv/RobotUpdateHandle.hpp @@ -25,6 +25,8 @@ #include +#include + #include #include @@ -102,6 +104,13 @@ class RobotUpdateHandle /// property will be assumed as the charger for this robot. RobotUpdateHandle& set_charger_waypoint(const std::size_t charger_wp); + /// Set a finishing request for this robot. + RobotUpdateHandle& set_finishing_request(rmf_task::ConstRequestFactoryPtr finishing_request); + + /// Set a finishing request for this robot to use the fleet-wide finishing + /// request. + RobotUpdateHandle& use_default_finishing_request(); + /// Update the current battery level of the robot by specifying its state of /// charge as a fraction of its total charge capacity, i.e. a value from 0.0 /// to 1.0. diff --git a/rmf_fleet_adapter/src/rmf_fleet_adapter/TaskManager.cpp b/rmf_fleet_adapter/src/rmf_fleet_adapter/TaskManager.cpp index 6f50f62f9..d26717c3a 100644 --- a/rmf_fleet_adapter/src/rmf_fleet_adapter/TaskManager.cpp +++ b/rmf_fleet_adapter/src/rmf_fleet_adapter/TaskManager.cpp @@ -192,6 +192,15 @@ TaskManagerPtr TaskManager::make( self->_handle_request(request->json_msg, request->request_id); }); + auto reliable_transient_qos = + rclcpp::ServicesQoS().keep_last(10).reliable().transient_local(); + mgr->_task_state_update_pub = + mgr->_context->node()->create_publisher( + TaskStateUpdateTopicName, reliable_transient_qos); + mgr->_task_log_update_pub = + mgr->_context->node()->create_publisher( + TaskLogUpdateTopicName, reliable_transient_qos.keep_last(100)); + const std::vector schemas = { rmf_api_msgs::schemas::task_state, rmf_api_msgs::schemas::task_log, @@ -564,7 +573,7 @@ void TaskManager::ActiveTask::publish_task_state(TaskManager& mgr) static const auto task_update_validator = mgr._make_validator(rmf_api_msgs::schemas::task_state_update); - mgr._validate_and_publish_websocket(task_state_update, task_update_validator); + mgr._validate_and_publish_json(task_state_update, task_update_validator); auto task_log_update = nlohmann::json(); task_log_update["type"] = "task_log_update"; @@ -572,7 +581,7 @@ void TaskManager::ActiveTask::publish_task_state(TaskManager& mgr) static const auto log_update_validator = mgr._make_validator(rmf_api_msgs::schemas::task_log_update); - mgr._validate_and_publish_websocket(task_log_update, log_update_validator); + mgr._validate_and_publish_json(task_log_update, log_update_validator); } //============================================================================== @@ -941,6 +950,24 @@ void TaskManager::set_idle_task(rmf_task::ConstRequestFactoryPtr task) } } +//============================================================================== +void TaskManager::use_default_idle_task() +{ + auto fleet_handle = _fleet_handle.lock(); + if (!fleet_handle) + { + RCLCPP_ERROR( + _context->node()->get_logger(), + "Attempting to use default idle task for [%s] but its fleet is shutting down", + _context->requester_id().c_str()); + return; + } + const auto& impl = + agv::FleetUpdateHandle::Implementation::get(*fleet_handle); + + set_idle_task(impl.idle_task); +} + //============================================================================== void TaskManager::set_queue( const std::vector& assignments) @@ -1691,7 +1718,13 @@ void TaskManager::_begin_waiting() } if (!_responsive_wait_enabled) + { + if (_waiting) + { + _waiting.cancel({"Idle behavior updated"}, _context->now()); + } return; + } if (_context->location().empty()) { @@ -2029,7 +2062,7 @@ void TaskManager::_schema_loader( } //============================================================================== -void TaskManager::_validate_and_publish_websocket( +void TaskManager::_validate_and_publish_json( const nlohmann::json& msg, const nlohmann::json_schema::json_validator& validator) const { @@ -2044,19 +2077,32 @@ void TaskManager::_validate_and_publish_websocket( return; } - if (!_broadcast_client.has_value()) - return; + if (_broadcast_client.has_value()) + { + const auto client = _broadcast_client->lock(); + if (!client) + { + RCLCPP_ERROR( + _context->node()->get_logger(), + "Unable to lock BroadcastClient within TaskManager of robot [%s]", + _context->name().c_str()); + return; + } + client->publish(msg); + } - const auto client = _broadcast_client->lock(); - if (!client) + if (msg["type"] == "task_state_update") { - RCLCPP_ERROR( - _context->node()->get_logger(), - "Unable to lock BroadcastClient within TaskManager of robot [%s]", - _context->name().c_str()); - return; + TaskStateUpdateMsg update_msg; + update_msg.data = msg.dump(); + _task_state_update_pub->publish(update_msg); + } + else if (msg["type"] == "task_log_update") + { + TaskLogUpdateMsg update_msg; + update_msg.data = msg.dump(); + _task_log_update_pub->publish(update_msg); } - client->publish(msg); } //============================================================================== @@ -2158,7 +2204,7 @@ rmf_task::State TaskManager::_publish_pending_task( static const auto validator = _make_validator(rmf_api_msgs::schemas::task_state_update); - _validate_and_publish_websocket(task_state_update, validator); + _validate_and_publish_json(task_state_update, validator); _pending_task_info[pending.request()] = cache; return pending.finish_state(); @@ -2225,7 +2271,7 @@ void TaskManager::_publish_canceled_pending_task( static const auto validator = _make_validator(rmf_api_msgs::schemas::task_state_update); - _validate_and_publish_websocket(task_state_update, validator); + _validate_and_publish_json(task_state_update, validator); } //============================================================================== diff --git a/rmf_fleet_adapter/src/rmf_fleet_adapter/TaskManager.hpp b/rmf_fleet_adapter/src/rmf_fleet_adapter/TaskManager.hpp index 429b20808..630128331 100644 --- a/rmf_fleet_adapter/src/rmf_fleet_adapter/TaskManager.hpp +++ b/rmf_fleet_adapter/src/rmf_fleet_adapter/TaskManager.hpp @@ -32,6 +32,8 @@ #include #include +#include + #include #include @@ -125,6 +127,8 @@ class TaskManager : public std::enable_shared_from_this void set_idle_task(rmf_task::ConstRequestFactoryPtr task); + void use_default_idle_task(); + /// Set the queue for this task manager with assignments generated from the /// task planner void set_queue(const std::vector& assignments); @@ -395,6 +399,13 @@ class TaskManager : public std::enable_shared_from_this bool _task_state_update_available = true; std::chrono::steady_clock::time_point _last_update_time; + using TaskStateUpdateMsg = std_msgs::msg::String; + rclcpp::Publisher::SharedPtr _task_state_update_pub = + nullptr; + + using TaskLogUpdateMsg = std_msgs::msg::String; + rclcpp::Publisher::SharedPtr _task_log_update_pub = nullptr; + // Container to keep track of tasks that have been started by this TaskManager // Use the _register_executed_task() to populate this container. std::vector _executed_task_registry; @@ -552,7 +563,7 @@ class TaskManager : public std::enable_shared_from_this /// Validate and publish a json. This can be used for task /// state and log updates - void _validate_and_publish_websocket( + void _validate_and_publish_json( const nlohmann::json& msg, const nlohmann::json_schema::json_validator& validator) const; diff --git a/rmf_fleet_adapter/src/rmf_fleet_adapter/agv/EasyFullControl.cpp b/rmf_fleet_adapter/src/rmf_fleet_adapter/agv/EasyFullControl.cpp index dfb267e1a..b1d8edd8d 100644 --- a/rmf_fleet_adapter/src/rmf_fleet_adapter/agv/EasyFullControl.cpp +++ b/rmf_fleet_adapter/src/rmf_fleet_adapter/agv/EasyFullControl.cpp @@ -122,6 +122,7 @@ class EasyFullControl::RobotConfiguration::Implementation std::optional max_merge_waypoint_distance; std::optional max_merge_lane_distance; std::optional min_lane_length; + std::optional finishing_request; }; //============================================================================== @@ -136,7 +137,8 @@ EasyFullControl::RobotConfiguration::RobotConfiguration( responsive_wait, max_merge_waypoint_distance, max_merge_lane_distance, - min_lane_length + min_lane_length, + std::nullopt })) { // Do nothing @@ -211,6 +213,20 @@ void EasyFullControl::RobotConfiguration::set_min_lane_length( _pimpl->min_lane_length = distance; } +//============================================================================== +std::optional EasyFullControl:: +RobotConfiguration::finishing_request() const +{ + return _pimpl->finishing_request; +} + +//============================================================================== +void EasyFullControl::RobotConfiguration::set_finishing_request( + std::optional request) +{ + _pimpl->finishing_request = std::move(request); +} + //============================================================================== class EasyFullControl::RobotCallbacks::Implementation { @@ -1619,6 +1635,152 @@ ConsiderRequest consider_all() }; } +std::optional parse_finishing_request( + const YAML::Node& finishing_request_yaml, + const rmf_traffic::agv::Graph& graph, + std::optional robot_name, + bool& error) +{ + rmf_task::ConstRequestFactoryPtr finishing_request; + std::string finishing_request_string; + + if (!finishing_request_yaml) + { + if (robot_name.has_value()) + { + // No finishing request is provided for this specific robot, default to + // fleet-wide finishing request by returning nullopt. + return std::nullopt; + } + // No finishing request specified for fleet, default to nothing. + std::cout + << "A default finishing request was not provided for the fleet. The " + "valid finishing requests are [charge, park, nothing]. The task planner " + "will default to [nothing]." + << std::endl; + return finishing_request; + } + + if (finishing_request_yaml.IsMap()) + { + // Configure the robot-specific finishing request + const YAML::Node& request_type_yaml = finishing_request_yaml["type"]; + std::string request_type_string; + + if (request_type_yaml) + { + request_type_string = request_type_yaml.as(); + if (request_type_string == "park") + { + // Check if a specific parking spot waypoint was chosen + const YAML::Node& parking_spot_yaml = + finishing_request_yaml["waypoint_name"]; + std::string parking_spot_string; + if (parking_spot_yaml) + { + if (!robot_name.has_value()) + { + // We don't accept this specification for fleet-wide finishing + // requests because we can't send all the same robots to the same + // parking spot. + const auto mark = parking_spot_yaml.Mark(); + std::cerr + << "Cannot assign a specific parking spot waypoint to the " + "fleet-wide default finishing request (line " << (mark.line + 1) + << ", column " << mark.column << ") because then all robots " + "would attempt to park at the same location." << std::endl; + error = true; + return nullptr; + } + + parking_spot_string = parking_spot_yaml.as(); + const auto* parking_wp = graph.find_waypoint(parking_spot_string); + if (!parking_wp) + { + const auto mark = parking_spot_yaml.Mark(); + std::cerr + << "Provided parking spot [" << parking_spot_string + << "] (line " << (mark.line + 1) << ", column " << mark.column + << ") is not found in the fleet navigation graph. Unable to " + "configure the fleet." << std::endl; + error = true; + return nullptr; + } + + std::size_t parking_wp_index = parking_wp->index(); + finishing_request = + std::make_shared( + "idle", nullptr, parking_wp_index); + std::cout + << "Robot [" << robot_name.value() + << "] is configured to perform ParkRobot at [" + << parking_spot_string << "] as finishing request." << std::endl; + return finishing_request; + } + } + finishing_request_string = request_type_string; + } + else + { + const auto mark = finishing_request_yaml.Mark(); + std::cerr + << "Missing [type] for finishing_request object (line " + << (mark.line + 1) << ", column " << mark.column << ")" << std::endl; + error = true; + return nullptr; + } + } + else + { + finishing_request_string = finishing_request_yaml.as(); + } + + if (finishing_request_string == "charge") + { + auto charge_factory = + std::make_shared(); + charge_factory->set_indefinite(true); + finishing_request = charge_factory; + } + else if (finishing_request_string == "park") + { + finishing_request = + std::make_shared( + "idle", nullptr); + } + else if (finishing_request_string == "nothing") + { + // Do nothing + } + else + { + const auto mark = finishing_request_yaml.Mark(); + std::cerr + << "The finishing request [" << finishing_request_string << "] (line " + << (mark.line + 1) << ", column " << mark.column + << ") is unsupported. The valid finishing requests are " + "[charge, park, nothing]."; + + error = true; + return nullptr; + } + + if (robot_name.has_value()) + { + std::cout + << "Robot-specific finishing task for [" << robot_name.value() + << "] set to [" << finishing_request_string << "]" << std::endl; + } + else + { + std::cout + << "Default fleet finishing task set to [" << finishing_request_string + << "]" << std::endl; + } + + return finishing_request; +} + //============================================================================== class EasyFullControl::EasyRobotUpdateHandle::Implementation { @@ -2152,7 +2314,7 @@ EasyFullControl::FleetConfiguration::from_config_files( { const auto mark = retreat_to_charger_yaml.Mark(); std::cout << "[retreat_to_charger_interval] Unsupported value type " - << "provided: line " << mark.line << ", column " << mark.column + << "provided: line " << (mark.line + 1) << ", column " << mark.column << std::endl; return std::nullopt; } @@ -2204,52 +2366,18 @@ EasyFullControl::FleetConfiguration::from_config_files( } // Finishing tasks - std::string finishing_request_string; - const auto& finishing_request_yaml = rmf_fleet["finishing_request"]; - if (!finishing_request_yaml) - { - std::cout - << "Finishing request is not provided. The valid finishing requests " - "are [charge, park, nothing]. The task planner will default to [nothing]." - << std::endl; - } - else - { - finishing_request_string = finishing_request_yaml.as(); - } - std::cout << "Finishing request: " << finishing_request_string << std::endl; - rmf_task::ConstRequestFactoryPtr finishing_request; - if (finishing_request_string == "charge") - { - auto charge_factory = - std::make_shared(); - charge_factory->set_indefinite(true); - finishing_request = charge_factory; - std::cout - << "Fleet is configured to perform ChargeBattery as finishing request" - << std::endl; - } - else if (finishing_request_string == "park") - { - finishing_request = - std::make_shared( - "idle", nullptr); - std::cout - << "Fleet is configured to perform ParkRobot as finishing request" - << std::endl; - } - else if (finishing_request_string == "nothing") - { - std::cout << "Fleet is not configured to perform any finishing request" - << std::endl; - } - else - { - std::cout - << "Provided finishing request " << finishing_request_string - << "is unsupported. The valid finishing requests are" - "[charge, park, nothing]. The task planner will default to [nothing]."; + rmf_task::ConstRequestFactoryPtr default_finishing_request; + const auto& default_finishing_request_yaml = rmf_fleet["finishing_request"]; + bool fleet_request_error = false; + auto configured_default_finishing_request = parse_finishing_request( + default_finishing_request_yaml, graph, std::nullopt, fleet_request_error); + if (fleet_request_error) + { + // Invalid FleetConfiguration + return std::nullopt; } + if (configured_default_finishing_request.has_value()) + default_finishing_request = configured_default_finishing_request.value(); // Ignore rotations within path commands bool skip_rotation_commands = true; @@ -2441,12 +2569,26 @@ EasyFullControl::FleetConfiguration::from_config_files( min_lane_length = min_lane_length_yaml.as(); } + std::optional finishing_request = + std::nullopt; + const YAML::Node& finishing_request_yaml = + robot_config_yaml["finishing_request"]; + bool robot_request_error = false; + finishing_request = parse_finishing_request( + finishing_request_yaml, graph, robot_name, robot_request_error); + if (robot_request_error) + { + // Invalid FleetConfiguration + return std::nullopt; + } + auto config = RobotConfiguration( std::move(chargers), responsive_wait, max_merge_waypoint_distance, max_merge_lane_distance, min_lane_length); + config.set_finishing_request(finishing_request); known_robot_configurations.insert_or_assign(robot_name, config); } else @@ -2503,7 +2645,7 @@ EasyFullControl::FleetConfiguration::from_config_files( { const auto mark = lane_yaml.Mark(); std::cerr << "[strict_lanes] Unrecognized lane format at line " - << mark.line << ", column " << mark.column << std::endl; + << (mark.line + 1) << ", column " << mark.column << std::endl; return std::nullopt; } @@ -2514,7 +2656,7 @@ EasyFullControl::FleetConfiguration::from_config_files( { const auto mark = lane_yaml[0].Mark(); std::cerr << "[strict_lanes] Unrecognized waypoint name [" << wp0_name - << "] at line " << mark.line << ", column " << mark.column << std::endl; + << "] at line " << (mark.line + 1) << ", column " << mark.column << std::endl; return std::nullopt; } @@ -2523,7 +2665,7 @@ EasyFullControl::FleetConfiguration::from_config_files( { const auto mark = lane_yaml[1].Mark(); std::cerr << "[strict_lanes] Unrecognized waypoint name [" << wp1_name - << "] at line " << mark.line << ", column " << mark.column << std::endl; + << "] at line " << (mark.line + 1) << ", column " << mark.column << std::endl; return std::nullopt; } @@ -2569,7 +2711,7 @@ EasyFullControl::FleetConfiguration::from_config_files( { const auto mark = lane_yaml.Mark(); std::cerr << "[strict_lanes] Unable to find a lane from [" << wp0_name - << "] to [" << wp1_name << "] at line " << mark.line << ", column " + << "] to [" << wp1_name << "] at line " << (mark.line + 1) << ", column " << mark.column << std::endl; } } @@ -2590,7 +2732,7 @@ EasyFullControl::FleetConfiguration::from_config_files( account_for_battery_drain, task_consideration, action_consideration, - finishing_request, + default_finishing_request, skip_rotation_commands, server_uri, rmf_traffic::time::from_seconds(max_delay), @@ -3202,6 +3344,8 @@ auto EasyFullControl::add_robot( enable_responsive_wait = *configuration.responsive_wait(); } + auto finishing_request = configuration.finishing_request(); + _pimpl->fleet_handle->add_robot( insertion.first->second, robot_name, @@ -3217,7 +3361,8 @@ auto EasyFullControl::add_robot( action_executor = callbacks.action_executor(), localization = std::move(localization), nav_params = robot_nav_params, - enable_responsive_wait + enable_responsive_wait, + finishing_request ](const RobotUpdateHandlePtr& updater) { auto context = RobotUpdateHandle::Implementation::get(*updater) @@ -3236,7 +3381,8 @@ auto EasyFullControl::add_robot( localization, context, nav_params, - enable_responsive_wait + enable_responsive_wait, + finishing_request ](const auto&) { cmd_handle->w_context = context; @@ -3251,6 +3397,11 @@ auto EasyFullControl::add_robot( handle->set_charger_waypoint(*charger_index); } handle->enable_responsive_wait(enable_responsive_wait); + if (finishing_request.has_value()) + { + handle->set_finishing_request(finishing_request.value()); + context->robot_finishing_request(true); + } RCLCPP_INFO( node->get_logger(), diff --git a/rmf_fleet_adapter/src/rmf_fleet_adapter/agv/FleetUpdateHandle.cpp b/rmf_fleet_adapter/src/rmf_fleet_adapter/agv/FleetUpdateHandle.cpp index 9067ff51d..ef805da3e 100644 --- a/rmf_fleet_adapter/src/rmf_fleet_adapter/agv/FleetUpdateHandle.cpp +++ b/rmf_fleet_adapter/src/rmf_fleet_adapter/agv/FleetUpdateHandle.cpp @@ -1226,151 +1226,160 @@ void FleetUpdateHandle::Implementation::update_fleet() const //============================================================================== void FleetUpdateHandle::Implementation::update_fleet_state() const { - // Publish to API server - if (broadcast_client) + nlohmann::json fleet_state_update_msg; + fleet_state_update_msg["type"] = "fleet_state_update"; + auto& fleet_state_msg = fleet_state_update_msg["data"]; + fleet_state_msg["name"] = name; + auto& robots = fleet_state_msg["robots"]; + robots = std::unordered_map(); + for (const auto& [context, mgr] : task_managers) { - nlohmann::json fleet_state_update_msg; - fleet_state_update_msg["type"] = "fleet_state_update"; - auto& fleet_state_msg = fleet_state_update_msg["data"]; - fleet_state_msg["name"] = name; - auto& robots = fleet_state_msg["robots"]; - robots = std::unordered_map(); - for (const auto& [context, mgr] : task_managers) + const auto& name = context->name(); + nlohmann::json& json = robots[name]; + json["name"] = name; + json["status"] = mgr->robot_status(); + json["task_id"] = mgr->current_task_id().value_or(""); + json["unix_millis_time"] = + std::chrono::duration_cast( + context->now().time_since_epoch()).count(); + json["battery"] = context->current_battery_soc(); + + const auto location_msg = convert_location(*context); + if (location_msg.has_value()) { - const auto& name = context->name(); - nlohmann::json& json = robots[name]; - json["name"] = name; - json["status"] = mgr->robot_status(); - json["task_id"] = mgr->current_task_id().value_or(""); - json["unix_millis_time"] = - std::chrono::duration_cast( - context->now().time_since_epoch()).count(); - json["battery"] = context->current_battery_soc(); - - const auto location_msg = convert_location(*context); - if (location_msg.has_value()) - { - nlohmann::json& location = json["location"]; - location["map"] = location_msg->level_name; - location["x"] = location_msg->x; - location["y"] = location_msg->y; - location["yaw"] = location_msg->yaw; - } + nlohmann::json& location = json["location"]; + location["map"] = location_msg->level_name; + location["x"] = location_msg->x; + location["y"] = location_msg->y; + location["yaw"] = location_msg->yaw; + } - std::lock_guard lock(context->reporting().mutex()); - const auto& issues = context->reporting().open_issues(); - auto& issues_msg = json["issues"]; - issues_msg = std::vector(); - for (const auto& issue : issues) - { - nlohmann::json issue_msg; - issue_msg["category"] = issue->category; - issue_msg["detail"] = issue->detail; - issues_msg.push_back(std::move(issue_msg)); - } + std::lock_guard lock(context->reporting().mutex()); + const auto& issues = context->reporting().open_issues(); + auto& issues_msg = json["issues"]; + issues_msg = std::vector(); + for (const auto& issue : issues) + { + nlohmann::json issue_msg; + issue_msg["category"] = issue->category; + issue_msg["detail"] = issue->detail; + issues_msg.push_back(std::move(issue_msg)); + } - const auto& commission = context->commission(); - nlohmann::json commission_json; - commission_json["dispatch_tasks"] = - commission.is_accepting_dispatched_tasks(); - commission_json["direct_tasks"] = commission.is_accepting_direct_tasks(); - commission_json["idle_behavior"] = - commission.is_performing_idle_behavior(); + const auto& commission = context->commission(); + nlohmann::json commission_json; + commission_json["dispatch_tasks"] = + commission.is_accepting_dispatched_tasks(); + commission_json["direct_tasks"] = commission.is_accepting_direct_tasks(); + commission_json["idle_behavior"] = + commission.is_performing_idle_behavior(); - json["commission"] = commission_json; + json["commission"] = commission_json; - nlohmann::json mutex_groups_json; - std::vector locked_mutex_groups; - for (const auto& g : context->locked_mutex_groups()) - { - locked_mutex_groups.push_back(g.first); - } - mutex_groups_json["locked"] = std::move(locked_mutex_groups); - - std::vector requesting_mutex_groups; - for (const auto& g : context->requesting_mutex_groups()) - { - requesting_mutex_groups.push_back(g.first); - } - mutex_groups_json["requesting"] = std::move(requesting_mutex_groups); + nlohmann::json mutex_groups_json; + std::vector locked_mutex_groups; + for (const auto& g : context->locked_mutex_groups()) + { + locked_mutex_groups.push_back(g.first); + } + mutex_groups_json["locked"] = std::move(locked_mutex_groups); - json["mutex_groups"] = std::move(mutex_groups_json); + std::vector requesting_mutex_groups; + for (const auto& g : context->requesting_mutex_groups()) + { + requesting_mutex_groups.push_back(g.first); } + mutex_groups_json["requesting"] = std::move(requesting_mutex_groups); - try + json["mutex_groups"] = std::move(mutex_groups_json); + } + + try + { + std::unique_lock lock(*update_callback_mutex); + if (update_callback) + update_callback(fleet_state_update_msg); + + // Publish to API server + if (broadcast_client) { static const auto validator = make_validator(rmf_api_msgs::schemas::fleet_state_update); - validator.validate(fleet_state_update_msg); - std::unique_lock lock(*update_callback_mutex); - if (update_callback) - update_callback(fleet_state_update_msg); broadcast_client->publish(fleet_state_update_msg); } - catch (const std::exception& e) - { - RCLCPP_ERROR( - node->get_logger(), - "Malformed outgoing fleet state json message: %s\nMessage:\n%s", - e.what(), - fleet_state_update_msg.dump(2).c_str()); - } + + FleetStateUpdateMsg update_msg; + update_msg.data = fleet_state_update_msg.dump(); + fleet_state_update_pub->publish(update_msg); + } + catch (const std::exception& e) + { + RCLCPP_ERROR( + node->get_logger(), + "Malformed outgoing fleet state json message: %s\nMessage:\n%s", + e.what(), + fleet_state_update_msg.dump(2).c_str()); } } //============================================================================== void FleetUpdateHandle::Implementation::update_fleet_logs() const { - if (broadcast_client) + nlohmann::json fleet_log_update_msg; + fleet_log_update_msg["type"] = "fleet_log_update"; + auto& fleet_log_msg = fleet_log_update_msg["data"]; + fleet_log_msg["name"] = name; + // TODO(MXG): fleet_log_msg["log"] + auto& robots_msg = fleet_log_msg["robots"]; + robots_msg = std::unordered_map(); + for (const auto& [context, _] : task_managers) { - nlohmann::json fleet_log_update_msg; - fleet_log_update_msg["type"] = "fleet_log_update"; - auto& fleet_log_msg = fleet_log_update_msg["data"]; - fleet_log_msg["name"] = name; - // TODO(MXG): fleet_log_msg["log"] - auto& robots_msg = fleet_log_msg["robots"]; - robots_msg = std::unordered_map(); - for (const auto& [context, _] : task_managers) - { - auto robot_log_msg_array = std::vector(); + auto robot_log_msg_array = std::vector(); - std::lock_guard lock(context->reporting().mutex()); - const auto& log = context->reporting().log(); - for (const auto& entry : log_reader.read(log.view())) - robot_log_msg_array.push_back(log_to_json(entry)); + std::lock_guard lock(context->reporting().mutex()); + const auto& log = context->reporting().log(); + for (const auto& entry : log_reader.read(log.view())) + robot_log_msg_array.push_back(log_to_json(entry)); - if (!robot_log_msg_array.empty()) - robots_msg[context->name()] = std::move(robot_log_msg_array); - } + if (!robot_log_msg_array.empty()) + robots_msg[context->name()] = std::move(robot_log_msg_array); + } - if (robots_msg.empty()) - { - // No new logs to report - return; - } + if (robots_msg.empty()) + { + // No new logs to report + return; + } - try - { - static const auto validator = - make_validator(rmf_api_msgs::schemas::fleet_log_update); + try + { + static const auto validator = + make_validator(rmf_api_msgs::schemas::fleet_log_update); - validator.validate(fleet_log_update_msg); + validator.validate(fleet_log_update_msg); - std::unique_lock lock(*update_callback_mutex); - if (update_callback) - update_callback(fleet_log_update_msg); - broadcast_client->publish(fleet_log_update_msg); - } - catch (const std::exception& e) + std::unique_lock lock(*update_callback_mutex); + if (update_callback) + update_callback(fleet_log_update_msg); + + if (broadcast_client) { - RCLCPP_ERROR( - node->get_logger(), - "Malformed outgoing fleet log json message: %s\nMessage:\n%s", - e.what(), - fleet_log_update_msg.dump(2).c_str()); + broadcast_client->publish(fleet_log_update_msg); } + + FleetLogUpdateMsg update_msg; + update_msg.data = fleet_log_update_msg.dump(); + fleet_log_update_pub->publish(update_msg); + } + catch (const std::exception& e) + { + RCLCPP_ERROR( + node->get_logger(), + "Malformed outgoing fleet log json message: %s\nMessage:\n%s", + e.what(), + fleet_log_update_msg.dump(2).c_str()); } } @@ -2615,7 +2624,9 @@ bool FleetUpdateHandle::set_task_planner_params( for (const auto& t : self->_pimpl->task_managers) { t.first->task_planner(self->_pimpl->task_planner); - t.second->set_idle_task(idle_task); + // Skip setting idle task if there exists a robot-specific behavior + if (!t.first->robot_finishing_request()) + t.second->set_idle_task(idle_task); } }); diff --git a/rmf_fleet_adapter/src/rmf_fleet_adapter/agv/RobotContext.cpp b/rmf_fleet_adapter/src/rmf_fleet_adapter/agv/RobotContext.cpp index 761276bd0..caf007a13 100644 --- a/rmf_fleet_adapter/src/rmf_fleet_adapter/agv/RobotContext.cpp +++ b/rmf_fleet_adapter/src/rmf_fleet_adapter/agv/RobotContext.cpp @@ -696,6 +696,18 @@ void RobotContext::set_nav_params(std::shared_ptr value) _nav_params = std::move(value); } +//============================================================================== +bool RobotContext::robot_finishing_request() const +{ + return _robot_finishing_request; +} + +//============================================================================== +void RobotContext::robot_finishing_request(bool robot_specific) +{ + _robot_finishing_request = robot_specific; +} + //============================================================================== class RobotContext::NegotiatorLicense { diff --git a/rmf_fleet_adapter/src/rmf_fleet_adapter/agv/RobotContext.hpp b/rmf_fleet_adapter/src/rmf_fleet_adapter/agv/RobotContext.hpp index 76e2e2910..155b86f16 100644 --- a/rmf_fleet_adapter/src/rmf_fleet_adapter/agv/RobotContext.hpp +++ b/rmf_fleet_adapter/src/rmf_fleet_adapter/agv/RobotContext.hpp @@ -500,6 +500,14 @@ class RobotContext /// robots. void set_nav_params(std::shared_ptr value); + /// Check whether this robot is using a robot-specific or fleet-wide finishing + /// request. + bool robot_finishing_request() const; + + /// Toggle the robot_finishing_request flag to indicate whether this robot is + /// using a robot-specific or fleet-wide finishing request. + void robot_finishing_request(bool robot_specific); + class NegotiatorLicense; /// Set the schedule negotiator that will take responsibility for this robot. @@ -907,6 +915,7 @@ class RobotContext std::make_unique(); std::shared_ptr _task_planner; std::weak_ptr _task_manager; + bool _robot_finishing_request = false; RobotUpdateHandle::Unstable::Watchdog _lift_watchdog; rmf_traffic::Duration _lift_rewait_duration = std::chrono::seconds(0); diff --git a/rmf_fleet_adapter/src/rmf_fleet_adapter/agv/RobotUpdateHandle.cpp b/rmf_fleet_adapter/src/rmf_fleet_adapter/agv/RobotUpdateHandle.cpp index 761d11216..60a492444 100644 --- a/rmf_fleet_adapter/src/rmf_fleet_adapter/agv/RobotUpdateHandle.cpp +++ b/rmf_fleet_adapter/src/rmf_fleet_adapter/agv/RobotUpdateHandle.cpp @@ -266,6 +266,50 @@ RobotUpdateHandle& RobotUpdateHandle::set_charger_waypoint( return *this; } +//============================================================================== +RobotUpdateHandle& RobotUpdateHandle::set_finishing_request( + rmf_task::ConstRequestFactoryPtr finishing_request) +{ + if (const auto context = _pimpl->get_context()) + { + context->worker().schedule( + [finishing_request, w = context->weak_from_this()]( + const auto&) + { + const auto context = w.lock(); + if (!context) + return; + + const auto mgr = context->task_manager(); + mgr->set_idle_task(finishing_request); + }); + } + + return *this; +} + +//============================================================================== +RobotUpdateHandle& RobotUpdateHandle::use_default_finishing_request() +{ + if (const auto context = _pimpl->get_context()) + { + context->worker().schedule([w = context->weak_from_this()](const auto&) + { + const auto context = w.lock(); + if (!context) + return; + + const auto mgr = context->task_manager(); + mgr->use_default_idle_task(); + // Disable robot_finishing_request flag in RobotContext + context->robot_finishing_request(false); + } + ); + } + + return *this; +} + //============================================================================== void RobotUpdateHandle::update_battery_soc(const double battery_soc) { diff --git a/rmf_fleet_adapter/src/rmf_fleet_adapter/agv/internal_FleetUpdateHandle.hpp b/rmf_fleet_adapter/src/rmf_fleet_adapter/agv/internal_FleetUpdateHandle.hpp index 7b5800445..e8ef93b91 100644 --- a/rmf_fleet_adapter/src/rmf_fleet_adapter/agv/internal_FleetUpdateHandle.hpp +++ b/rmf_fleet_adapter/src/rmf_fleet_adapter/agv/internal_FleetUpdateHandle.hpp @@ -18,6 +18,8 @@ #ifndef SRC__RMF_FLEET_ADAPTER__AGV__INTERNAL_FLEETUPDATEHANDLE_HPP #define SRC__RMF_FLEET_ADAPTER__AGV__INTERNAL_FLEETUPDATEHANDLE_HPP +#include + #include #include @@ -296,6 +298,15 @@ class FleetUpdateHandle::Implementation std::shared_ptr> task_managers = {}; std::shared_ptr broadcast_client = nullptr; + + using FleetStateUpdateMsg = std_msgs::msg::String; + rclcpp::Publisher::SharedPtr fleet_state_update_pub = + nullptr; + + using FleetLogUpdateMsg = std_msgs::msg::String; + rclcpp::Publisher::SharedPtr fleet_log_update_pub = + nullptr; + // Map uri to schema for validator loader function std::unordered_map schema_dictionary = {}; @@ -531,6 +542,13 @@ class FleetUpdateHandle::Implementation }); } + handle->_pimpl->fleet_state_update_pub = + handle->_pimpl->node->create_publisher( + FleetStateUpdateTopicName, reliable_transient_qos.keep_last(10)); + handle->_pimpl->fleet_log_update_pub = + handle->_pimpl->node->create_publisher( + FleetLogUpdateTopicName, reliable_transient_qos.keep_last(100)); + // Add PerformAction event to deserialization auto validator = handle->_pimpl->deserialization.make_validator_shared( schemas::event_description__perform_action); diff --git a/rmf_fleet_adapter_python/src/adapter.cpp b/rmf_fleet_adapter_python/src/adapter.cpp index 2c5bc5684..44c87139d 100644 --- a/rmf_fleet_adapter_python/src/adapter.cpp +++ b/rmf_fleet_adapter_python/src/adapter.cpp @@ -147,6 +147,8 @@ PYBIND11_MODULE(rmf_adapter, m) { py::arg("start_set")) .def("set_charger_waypoint", &agv::RobotUpdateHandle::set_charger_waypoint, py::arg("charger_wp")) + .def("set_finishing_request", &agv::RobotUpdateHandle::set_finishing_request, + py::arg("finishing_request")) .def("update_battery_soc", &agv::RobotUpdateHandle::update_battery_soc, py::arg("battery_soc")) .def("override_status", &agv::RobotUpdateHandle::override_status, @@ -878,7 +880,11 @@ PYBIND11_MODULE(rmf_adapter, m) { .def_property( "compatible_chargers", &agv::EasyFullControl::RobotConfiguration::compatible_chargers, - &agv::EasyFullControl::RobotConfiguration::set_compatible_chargers); + &agv::EasyFullControl::RobotConfiguration::set_compatible_chargers) + .def_property( + "finishing_request", + &agv::EasyFullControl::RobotConfiguration::finishing_request, + &agv::EasyFullControl::RobotConfiguration::set_finishing_request); py::class_(m_easy_full_control, "RobotCallbacks") .def(py::init< diff --git a/rmf_task_ros2/include/rmf_task_ros2/StandardNames.hpp b/rmf_task_ros2/include/rmf_task_ros2/StandardNames.hpp index fea8c49eb..7765c6469 100644 --- a/rmf_task_ros2/include/rmf_task_ros2/StandardNames.hpp +++ b/rmf_task_ros2/include/rmf_task_ros2/StandardNames.hpp @@ -34,6 +34,7 @@ const std::string DispatchCommandTopicName = Prefix + "dispatch_request"; const std::string DispatchAckTopicName = Prefix + "dispatch_ack"; const std::string DispatchStatesTopicName = "dispatch_states"; const std::string TaskStatusTopicName = "task_summaries"; +const std::string TaskStateUpdateTopicName = "task_state_update"; } // namespace rmf_task_ros2 diff --git a/rmf_task_ros2/src/rmf_task_ros2/Dispatcher.cpp b/rmf_task_ros2/src/rmf_task_ros2/Dispatcher.cpp index c8d92f0d0..0db2ab6a0 100644 --- a/rmf_task_ros2/src/rmf_task_ros2/Dispatcher.cpp +++ b/rmf_task_ros2/src/rmf_task_ros2/Dispatcher.cpp @@ -46,6 +46,8 @@ #include #include +#include + #include #include @@ -141,6 +143,10 @@ class Dispatcher::Implementation rclcpp::Subscription::SharedPtr api_request; rclcpp::Publisher::SharedPtr api_response; + using TaskStateUpdateMsg = std_msgs::msg::String; + rclcpp::Publisher::SharedPtr task_state_update_pub = + nullptr; + class ApiMemory { public: @@ -294,9 +300,15 @@ class Dispatcher::Implementation this->handle_dispatch_ack(*msg); }); + task_state_update_pub = node->create_publisher( + rmf_task_ros2::TaskStateUpdateTopicName, + rclcpp::ServicesQoS().keep_last(10).reliable().transient_local()); + if (server_uri) + { broadcast_client = rmf_websocket::BroadcastClient::make( *server_uri, node); + } auctioneer = bidding::Auctioneer::make( node, @@ -794,15 +806,19 @@ class Dispatcher::Implementation } /// Publish failed bid + const auto task_state = + create_task_state_json(dispatch_state, "failed"); + auto task_state_update = _task_state_update_json; + task_state_update["data"] = task_state; if (broadcast_client) { - const auto task_state = - create_task_state_json(dispatch_state, "failed"); - auto task_state_update = _task_state_update_json; - task_state_update["data"] = task_state; broadcast_client->publish(task_state_update); } + TaskStateUpdateMsg update_msg; + update_msg.data = task_state_update.dump(); + task_state_update_pub->publish(update_msg); + auctioneer->ready_for_next_bid(); return; }