Skip to content

Commit

Permalink
Merge branch 'main' into xiyu/read_only
Browse files Browse the repository at this point in the history
  • Loading branch information
xiyuoh committed Oct 4, 2024
2 parents 14ec741 + e4e82e1 commit 4e9d7b6
Show file tree
Hide file tree
Showing 14 changed files with 537 additions and 190 deletions.
4 changes: 4 additions & 0 deletions rmf_fleet_adapter/include/rmf_fleet_adapter/StandardNames.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ const std::string DestinationRequestTopicName = "destination_requests";
const std::string ModeRequestTopicName = "robot_mode_requests";
const std::string PathRequestTopicName = "robot_path_requests";
const std::string PauseRequestTopicName = "robot_pause_requests";
const std::string FleetStateUpdateTopicName = "fleet_state_update";
const std::string FleetLogUpdateTopicName = "fleet_log_update";

const std::string FinalDoorRequestTopicName = "door_requests";
const std::string AdapterDoorRequestTopicName = "adapter_door_requests";
Expand Down Expand Up @@ -68,6 +70,8 @@ const std::string InterruptRequestTopicName = "robot_interrupt_request";

const std::string TaskApiRequests = "task_api_requests";
const std::string TaskApiResponses = "task_api_responses";
const std::string TaskStateUpdateTopicName = "task_state_update";
const std::string TaskLogUpdateTopicName = "task_log_update";

const std::string ChargingAssignmentsTopicName = "charging_assignments";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -316,6 +316,15 @@ class EasyFullControl::RobotConfiguration
/// Set the minimum lane length.
void set_min_lane_length(std::optional<double> distance);

/// Get the idle behavior.
///
/// If std::nullopt is used, then the fleet-wide default finishing request
/// will be used.
std::optional<rmf_task::ConstRequestFactoryPtr> finishing_request() const;

/// Set the finishing request.
void set_finishing_request(std::optional<rmf_task::ConstRequestFactoryPtr> request);

class Implementation;
private:
rmf_utils::impl_ptr<Implementation> _pimpl;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@

#include <rmf_traffic/schedule/Participant.hpp>

#include <rmf_task/RequestFactory.hpp>

#include <Eigen/Geometry>
#include <nlohmann/json.hpp>

Expand Down Expand Up @@ -102,6 +104,13 @@ class RobotUpdateHandle
/// property will be assumed as the charger for this robot.
RobotUpdateHandle& set_charger_waypoint(const std::size_t charger_wp);

/// Set a finishing request for this robot.
RobotUpdateHandle& set_finishing_request(rmf_task::ConstRequestFactoryPtr finishing_request);

/// Set a finishing request for this robot to use the fleet-wide finishing
/// request.
RobotUpdateHandle& use_default_finishing_request();

/// Update the current battery level of the robot by specifying its state of
/// charge as a fraction of its total charge capacity, i.e. a value from 0.0
/// to 1.0.
Expand Down
76 changes: 61 additions & 15 deletions rmf_fleet_adapter/src/rmf_fleet_adapter/TaskManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,15 @@ TaskManagerPtr TaskManager::make(
self->_handle_request(request->json_msg, request->request_id);
});

auto reliable_transient_qos =
rclcpp::ServicesQoS().keep_last(10).reliable().transient_local();
mgr->_task_state_update_pub =
mgr->_context->node()->create_publisher<TaskStateUpdateMsg>(
TaskStateUpdateTopicName, reliable_transient_qos);
mgr->_task_log_update_pub =
mgr->_context->node()->create_publisher<TaskLogUpdateMsg>(
TaskLogUpdateTopicName, reliable_transient_qos.keep_last(100));

const std::vector<nlohmann::json> schemas = {
rmf_api_msgs::schemas::task_state,
rmf_api_msgs::schemas::task_log,
Expand Down Expand Up @@ -564,15 +573,15 @@ void TaskManager::ActiveTask::publish_task_state(TaskManager& mgr)

static const auto task_update_validator =
mgr._make_validator(rmf_api_msgs::schemas::task_state_update);
mgr._validate_and_publish_websocket(task_state_update, task_update_validator);
mgr._validate_and_publish_json(task_state_update, task_update_validator);

auto task_log_update = nlohmann::json();
task_log_update["type"] = "task_log_update";
task_log_update["data"] = task_logs;

static const auto log_update_validator =
mgr._make_validator(rmf_api_msgs::schemas::task_log_update);
mgr._validate_and_publish_websocket(task_log_update, log_update_validator);
mgr._validate_and_publish_json(task_log_update, log_update_validator);
}

//==============================================================================
Expand Down Expand Up @@ -941,6 +950,24 @@ void TaskManager::set_idle_task(rmf_task::ConstRequestFactoryPtr task)
}
}

//==============================================================================
void TaskManager::use_default_idle_task()
{
auto fleet_handle = _fleet_handle.lock();
if (!fleet_handle)
{
RCLCPP_ERROR(
_context->node()->get_logger(),
"Attempting to use default idle task for [%s] but its fleet is shutting down",
_context->requester_id().c_str());
return;
}
const auto& impl =
agv::FleetUpdateHandle::Implementation::get(*fleet_handle);

set_idle_task(impl.idle_task);
}

//==============================================================================
void TaskManager::set_queue(
const std::vector<TaskManager::Assignment>& assignments)
Expand Down Expand Up @@ -1691,7 +1718,13 @@ void TaskManager::_begin_waiting()
}

if (!_responsive_wait_enabled)
{
if (_waiting)
{
_waiting.cancel({"Idle behavior updated"}, _context->now());
}
return;
}

if (_context->location().empty())
{
Expand Down Expand Up @@ -2029,7 +2062,7 @@ void TaskManager::_schema_loader(
}

//==============================================================================
void TaskManager::_validate_and_publish_websocket(
void TaskManager::_validate_and_publish_json(
const nlohmann::json& msg,
const nlohmann::json_schema::json_validator& validator) const
{
Expand All @@ -2044,19 +2077,32 @@ void TaskManager::_validate_and_publish_websocket(
return;
}

if (!_broadcast_client.has_value())
return;
if (_broadcast_client.has_value())
{
const auto client = _broadcast_client->lock();
if (!client)
{
RCLCPP_ERROR(
_context->node()->get_logger(),
"Unable to lock BroadcastClient within TaskManager of robot [%s]",
_context->name().c_str());
return;
}
client->publish(msg);
}

const auto client = _broadcast_client->lock();
if (!client)
if (msg["type"] == "task_state_update")
{
RCLCPP_ERROR(
_context->node()->get_logger(),
"Unable to lock BroadcastClient within TaskManager of robot [%s]",
_context->name().c_str());
return;
TaskStateUpdateMsg update_msg;
update_msg.data = msg.dump();
_task_state_update_pub->publish(update_msg);
}
else if (msg["type"] == "task_log_update")
{
TaskLogUpdateMsg update_msg;
update_msg.data = msg.dump();
_task_log_update_pub->publish(update_msg);
}
client->publish(msg);
}

//==============================================================================
Expand Down Expand Up @@ -2158,7 +2204,7 @@ rmf_task::State TaskManager::_publish_pending_task(
static const auto validator =
_make_validator(rmf_api_msgs::schemas::task_state_update);

_validate_and_publish_websocket(task_state_update, validator);
_validate_and_publish_json(task_state_update, validator);

_pending_task_info[pending.request()] = cache;
return pending.finish_state();
Expand Down Expand Up @@ -2225,7 +2271,7 @@ void TaskManager::_publish_canceled_pending_task(
static const auto validator =
_make_validator(rmf_api_msgs::schemas::task_state_update);

_validate_and_publish_websocket(task_state_update, validator);
_validate_and_publish_json(task_state_update, validator);
}

//==============================================================================
Expand Down
13 changes: 12 additions & 1 deletion rmf_fleet_adapter/src/rmf_fleet_adapter/TaskManager.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@
#include <rmf_fleet_msgs/msg/robot_mode.hpp>
#include <rmf_task_msgs/msg/task_summary.hpp>

#include <std_msgs/msg/string.hpp>

#include <nlohmann/json.hpp>
#include <nlohmann/json-schema.hpp>

Expand Down Expand Up @@ -125,6 +127,8 @@ class TaskManager : public std::enable_shared_from_this<TaskManager>

void set_idle_task(rmf_task::ConstRequestFactoryPtr task);

void use_default_idle_task();

/// Set the queue for this task manager with assignments generated from the
/// task planner
void set_queue(const std::vector<Assignment>& assignments);
Expand Down Expand Up @@ -395,6 +399,13 @@ class TaskManager : public std::enable_shared_from_this<TaskManager>
bool _task_state_update_available = true;
std::chrono::steady_clock::time_point _last_update_time;

using TaskStateUpdateMsg = std_msgs::msg::String;
rclcpp::Publisher<TaskStateUpdateMsg>::SharedPtr _task_state_update_pub =
nullptr;

using TaskLogUpdateMsg = std_msgs::msg::String;
rclcpp::Publisher<TaskLogUpdateMsg>::SharedPtr _task_log_update_pub = nullptr;

// Container to keep track of tasks that have been started by this TaskManager
// Use the _register_executed_task() to populate this container.
std::vector<std::string> _executed_task_registry;
Expand Down Expand Up @@ -552,7 +563,7 @@ class TaskManager : public std::enable_shared_from_this<TaskManager>

/// Validate and publish a json. This can be used for task
/// state and log updates
void _validate_and_publish_websocket(
void _validate_and_publish_json(
const nlohmann::json& msg,
const nlohmann::json_schema::json_validator& validator) const;

Expand Down
Loading

0 comments on commit 4e9d7b6

Please sign in to comment.