Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use taskflow subflow for graph execution to allow timing of execution #417

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ TESSERACT_COMMON_IGNORE_WARNINGS_PUSH
#include <memory>
#include <shared_mutex>
#include <map>
#include <chrono>
#include <boost/uuid/uuid.hpp>
TESSERACT_COMMON_IGNORE_WARNINGS_POP

Expand Down Expand Up @@ -87,6 +88,9 @@ class TaskComposerNodeInfo
/** @brief Status message */
std::string message;

/** @brief The start time */
std::chrono::system_clock::time_point start_time{ std::chrono::system_clock::now() };

/**
* @brief Time spent in this task in seconds
* @details This is managed by core components so implementation do not need to calculate this
Expand Down
4 changes: 2 additions & 2 deletions tesseract_task_composer/core/src/task_composer_graph.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -337,7 +337,7 @@ std::string TaskComposerGraph::dump(std::ostream& os,
}
os << "]";
os << "\\n Conditional: " << ((conditional_) ? "True" : "False");
if (getType() == TaskComposerNodeType::PIPELINE)
if (getType() == TaskComposerNodeType::PIPELINE || getType() == TaskComposerNodeType::GRAPH)
{
auto it = results_map.find(getUUID());
if (it != results_map.end())
Expand Down Expand Up @@ -378,7 +378,7 @@ std::string TaskComposerGraph::dump(std::ostream& os,
os << "]";

os << "\\n Conditional: " << ((node->isConditional()) ? "True" : "False");
if (node->getType() == TaskComposerNodeType::PIPELINE && (it != results_map.end()))
if (it != results_map.end())
os << "\\nTime: " << std::fixed << std::setprecision(3) << it->second->elapsed_time << "s";

os << "\", margin=\"0.1\", color=" << color << "];\n"; // NOLINT
Expand Down
4 changes: 4 additions & 0 deletions tesseract_task_composer/core/src/task_composer_node_info.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ TESSERACT_COMMON_IGNORE_WARNINGS_PUSH
#include <boost/serialization/unique_ptr.hpp>
#include <boost/serialization/shared_ptr.hpp>
#include <boost/serialization/string.hpp>
#include <boost/serialization/binary_object.hpp>
#include <boost/uuid/uuid_io.hpp>
#include <boost/uuid/uuid_serialize.hpp>
#include <mutex>
Expand Down Expand Up @@ -60,6 +61,7 @@ bool TaskComposerNodeInfo::operator==(const TaskComposerNodeInfo& rhs) const
equal &= parent_uuid == rhs.parent_uuid;
equal &= return_value == rhs.return_value;
equal &= message == rhs.message;
equal &= start_time == rhs.start_time;
equal &= tesseract_common::almostEqualRelativeAndAbs(elapsed_time, rhs.elapsed_time, max_diff);
equal &= tesseract_common::isIdentical(inbound_edges, rhs.inbound_edges, false);
equal &= tesseract_common::isIdentical(outbound_edges, rhs.outbound_edges, true);
Expand All @@ -85,6 +87,8 @@ void TaskComposerNodeInfo::serialize(Archive& ar, const unsigned int /*version*/
ar& boost::serialization::make_nvp("parent_uuid", parent_uuid);
ar& boost::serialization::make_nvp("return_value", return_value);
ar& boost::serialization::make_nvp("message", message);
ar& boost::serialization::make_nvp("start_time",
boost::serialization::make_binary_object(&start_time, sizeof(start_time)));
ar& boost::serialization::make_nvp("elapsed_time", elapsed_time);
ar& boost::serialization::make_nvp("inbound_edges", inbound_edges);
ar& boost::serialization::make_nvp("outbound_edges", outbound_edges);
Expand Down
3 changes: 3 additions & 0 deletions tesseract_task_composer/core/src/task_composer_pipeline.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,11 @@ TaskComposerPipeline::TaskComposerPipeline(std::string name,

int TaskComposerPipeline::run(TaskComposerContext& context, OptionalTaskComposerExecutor executor) const
{
auto start_time = std::chrono::system_clock::now();
if (context.isAborted())
{
auto info = std::make_unique<TaskComposerNodeInfo>(*this);
info->start_time = start_time;
info->input_keys = input_keys_;
info->output_keys = output_keys_;
info->return_value = 0;
Expand Down Expand Up @@ -80,6 +82,7 @@ int TaskComposerPipeline::run(TaskComposerContext& context, OptionalTaskComposer
timer.stop();
results->input_keys = input_keys_;
results->output_keys = output_keys_;
results->start_time = start_time;
results->elapsed_time = timer.elapsedSeconds();

int value = results->return_value;
Expand Down
3 changes: 3 additions & 0 deletions tesseract_task_composer/core/src/task_composer_task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,11 @@ void TaskComposerTask::setTriggerAbort(bool enable) { trigger_abort_ = enable; }

int TaskComposerTask::run(TaskComposerContext& context, OptionalTaskComposerExecutor executor) const
{
auto start_time = std::chrono::system_clock::now();
if (context.isAborted())
{
auto info = std::make_unique<TaskComposerNodeInfo>(*this);
info->start_time = start_time;
info->input_keys = input_keys_;
info->output_keys = output_keys_;
info->return_value = 0;
Expand Down Expand Up @@ -90,6 +92,7 @@ int TaskComposerTask::run(TaskComposerContext& context, OptionalTaskComposerExec
timer.stop();
results->input_keys = input_keys_;
results->output_keys = output_keys_;
results->start_time = start_time;
results->elapsed_time = timer.elapsedSeconds();

int value = results->return_value;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ namespace tf
{
class Executor;
class Taskflow;
class Subflow;
class Task;
} // namespace tf

namespace tesseract_planning
Expand Down Expand Up @@ -84,20 +86,21 @@ class TaskflowTaskComposerExecutor : public TaskComposerExecutor

TaskComposerFuture::UPtr run(const TaskComposerNode& node, TaskComposerContext::Ptr context) override final;

static std::shared_ptr<std::vector<std::unique_ptr<tf::Taskflow>>>
convertToTaskflow(const TaskComposerGraph& task_graph,
TaskComposerContext& task_context,
TaskComposerExecutor& task_executor);

static std::shared_ptr<std::vector<std::unique_ptr<tf::Taskflow>>>
convertToTaskflow(const TaskComposerPipeline& task_pipeline,
TaskComposerContext& task_context,
TaskComposerExecutor& task_executor);

static std::shared_ptr<std::vector<std::unique_ptr<tf::Taskflow>>>
convertToTaskflow(const TaskComposerTask& task,
TaskComposerContext& task_context,
TaskComposerExecutor& task_executor);
static tf::Task convertToTaskflow(const TaskComposerGraph& task_graph,
TaskComposerContext& task_context,
TaskComposerExecutor& task_executor,
tf::Taskflow* taskflow,
tf::Subflow* parent_sbf);

static void convertToTaskflow(const TaskComposerPipeline& task_pipeline,
TaskComposerContext& task_context,
TaskComposerExecutor& task_executor,
tf::Taskflow* taskflow);

static void convertToTaskflow(const TaskComposerTask& task,
TaskComposerContext& task_context,
TaskComposerExecutor& task_executor,
tf::Taskflow* taskflow);
};
} // namespace tesseract_planning

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ class TaskflowTaskComposerFuture : public TaskComposerFuture
public:
TaskflowTaskComposerFuture() = default;
TaskflowTaskComposerFuture(std::shared_future<void> future,
std::shared_ptr<const std::vector<std::unique_ptr<tf::Taskflow>>> container,
std::unique_ptr<tf::Taskflow> taskflow,
TaskComposerContext::Ptr context);
~TaskflowTaskComposerFuture() override;
TaskflowTaskComposerFuture(const TaskflowTaskComposerFuture&) = default;
Expand All @@ -70,12 +70,15 @@ class TaskflowTaskComposerFuture : public TaskComposerFuture

TaskComposerFuture::UPtr copy() const override final;

/** @brief Crate DOT Graph using taskflow dump */
void dump(std::ostream& os) const;

private:
/** @brief This is the future return from taskflow executor.run */
std::shared_future<void> future_;

/** @brief Hold objects that must not go out of scope during execution */
std::shared_ptr<const std::vector<std::unique_ptr<tf::Taskflow>>> container_;
/** @brief Hold object that must not go out of scope during execution */
std::shared_ptr<tf::Taskflow> taskflow_;
};
} // namespace tesseract_planning

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,9 @@

#include <tesseract_task_composer/taskflow/taskflow_task_composer_executor.h>
#include <tesseract_task_composer/taskflow/taskflow_task_composer_future.h>
#include <tesseract_task_composer/core/task_composer_node_info.h>
#include <tesseract_common/utils.h>
#include <tesseract_common/timer.h>
#include <taskflow/taskflow.hpp>

namespace tesseract_planning
Expand Down Expand Up @@ -72,25 +74,18 @@ TaskflowTaskComposerExecutor::~TaskflowTaskComposerExecutor() = default;
TaskComposerFuture::UPtr TaskflowTaskComposerExecutor::run(const TaskComposerNode& node,
TaskComposerContext::Ptr context)
{
std::shared_ptr<std::vector<std::unique_ptr<tf::Taskflow>>> taskflow;
auto taskflow = std::make_unique<tf::Taskflow>(node.getName());
if (node.getType() == TaskComposerNodeType::TASK)
taskflow = convertToTaskflow(static_cast<const TaskComposerTask&>(node), *context, *this);
convertToTaskflow(static_cast<const TaskComposerTask&>(node), *context, *this, taskflow.get());
else if (node.getType() == TaskComposerNodeType::PIPELINE)
taskflow = convertToTaskflow(static_cast<const TaskComposerPipeline&>(node), *context, *this);
convertToTaskflow(static_cast<const TaskComposerPipeline&>(node), *context, *this, taskflow.get());
else if (node.getType() == TaskComposerNodeType::GRAPH)
taskflow = convertToTaskflow(static_cast<const TaskComposerGraph&>(node), *context, *this);
convertToTaskflow(static_cast<const TaskComposerGraph&>(node), *context, *this, taskflow.get(), nullptr);
else
throw std::runtime_error("TaskComposerExecutor, unsupported node type!");

#ifndef NDEBUG
std::ofstream out_data;
out_data.open(tesseract_common::getTempPath() + "taskflow_task_executor_" + tesseract_common::getTimestampString() +
".dot");
taskflow->front()->dump(out_data); // dump the graph including dynamic tasks
out_data.close();
#endif
std::shared_future<void> f = executor_->run(*taskflow);

std::shared_future<void> f = executor_->run(*(taskflow->front()));
return std::make_unique<TaskflowTaskComposerFuture>(f, std::move(taskflow), std::move(context));
}

Expand Down Expand Up @@ -133,105 +128,101 @@ void TaskflowTaskComposerExecutor::serialize(Archive& ar, const unsigned int ver
boost::serialization::split_member(ar, *this, version);
}

std::shared_ptr<std::vector<std::unique_ptr<tf::Taskflow>>>
TaskflowTaskComposerExecutor::convertToTaskflow(const TaskComposerGraph& task_graph,
TaskComposerContext& task_context,
TaskComposerExecutor& task_executor)
tf::Task TaskflowTaskComposerExecutor::convertToTaskflow(const TaskComposerGraph& task_graph,
TaskComposerContext& task_context,
TaskComposerExecutor& task_executor,
tf::Taskflow* taskflow,
tf::Subflow* parent_sbf)
{
auto tf_container = std::make_shared<std::vector<std::unique_ptr<tf::Taskflow>>>();
tf_container->emplace_back(std::make_unique<tf::Taskflow>(task_graph.getName()));

// Must add a Node Info object for the graph
auto info = std::make_unique<TaskComposerNodeInfo>(task_graph);
info->color = "green";
info->input_keys = task_graph.getInputKeys();
info->output_keys = task_graph.getOutputKeys();
task_context.task_infos.addInfo(std::move(info));

// Generate process tasks for each node
std::map<boost::uuids::uuid, tf::Task> tasks;
const auto& nodes = task_graph.getNodes();
for (const auto& pair : nodes)
{
auto edges = pair.second->getOutboundEdges();
if (pair.second->getType() == TaskComposerNodeType::TASK)
{
auto task = std::static_pointer_cast<const TaskComposerTask>(pair.second);
if (edges.size() > 1 && task->isConditional())
tasks[pair.first] =
tf_container->front()
->emplace([task, &task_context, &task_executor] { return task->run(task_context, task_executor); })
.name(pair.second->getName());
else
tasks[pair.first] =
tf_container->front()
->emplace([task, &task_context, &task_executor] { task->run(task_context, task_executor); })
.name(pair.second->getName());
}
else if (pair.second->getType() == TaskComposerNodeType::PIPELINE)
auto fn = [&task_graph, &task_context, &task_executor](tf::Subflow& subflow) {
tesseract_common::Timer timer;
timer.start();

// Node Info
auto info = std::make_unique<TaskComposerNodeInfo>(task_graph);
info->color = "green";
info->input_keys = task_graph.getInputKeys();
info->output_keys = task_graph.getOutputKeys();
info->start_time = std::chrono::system_clock::now();

// Generate process tasks for each node
std::map<boost::uuids::uuid, tf::Task> tasks;
const auto& nodes = task_graph.getNodes();
for (const auto& pair : nodes)
{
auto pipeline = std::static_pointer_cast<const TaskComposerPipeline>(pair.second);
if (edges.size() > 1 && pipeline->isConditional())
tasks[pair.first] = tf_container->front()
->emplace([pipeline, &task_context, &task_executor] {
return pipeline->run(task_context, task_executor);
})
.name(pair.second->getName());
auto edges = pair.second->getOutboundEdges();
if (pair.second->getType() == TaskComposerNodeType::TASK)
{
auto task = std::static_pointer_cast<const TaskComposerTask>(pair.second);
if (edges.size() > 1 && task->isConditional())
tasks[pair.first] =
subflow.emplace([task, &task_context, &task_executor] { return task->run(task_context, task_executor); })
.name(pair.second->getName());
else
tasks[pair.first] =
subflow.emplace([task, &task_context, &task_executor] { task->run(task_context, task_executor); })
.name(pair.second->getName());
}
else if (pair.second->getType() == TaskComposerNodeType::PIPELINE)
{
auto pipeline = std::static_pointer_cast<const TaskComposerPipeline>(pair.second);
if (edges.size() > 1 && pipeline->isConditional())
tasks[pair.first] = subflow
.emplace([pipeline, &task_context, &task_executor] {
return pipeline->run(task_context, task_executor);
})
.name(pair.second->getName());
else
tasks[pair.first] =
subflow.emplace([pipeline, &task_context, &task_executor] { pipeline->run(task_context, task_executor); })
.name(pair.second->getName());
}
else if (pair.second->getType() == TaskComposerNodeType::GRAPH)
{
const auto& graph = static_cast<const TaskComposerGraph&>(*pair.second);
tasks[pair.first] = convertToTaskflow(graph, task_context, task_executor, nullptr, &subflow);
}
else
tasks[pair.first] =
tf_container->front()
->emplace([pipeline, &task_context, &task_executor] { pipeline->run(task_context, task_executor); })
.name(pair.second->getName());
throw std::runtime_error("convertToTaskflow, unsupported node type!");
}
else if (pair.second->getType() == TaskComposerNodeType::GRAPH)

for (const auto& pair : nodes)
{
const auto& graph = static_cast<const TaskComposerGraph&>(*pair.second);
auto sub_tf_container = convertToTaskflow(graph, task_context, task_executor);
tasks[pair.first] = tf_container->front()->composed_of(*sub_tf_container->front());
tf_container->insert(tf_container->end(),
std::make_move_iterator(sub_tf_container->begin()),
std::make_move_iterator(sub_tf_container->end()));
// Ensure the current task precedes the tasks that it is connected to
auto edges = pair.second->getOutboundEdges();
for (const auto& e : edges)
tasks[pair.first].precede(tasks[e]);
}
else
throw std::runtime_error("convertToTaskflow, unsupported node type!");
}
subflow.join();
timer.stop();
info->elapsed_time = timer.elapsedSeconds();
task_context.task_infos.addInfo(std::move(info));
};

for (const auto& pair : nodes)
{
// Ensure the current task precedes the tasks that it is connected to
auto edges = pair.second->getOutboundEdges();
for (const auto& e : edges)
tasks[pair.first].precede(tasks[e]);
}
if (parent_sbf != nullptr)
return parent_sbf->emplace(fn).name(task_graph.getName());

return tf_container;
return taskflow->emplace(fn).name(task_graph.getName());
}

std::shared_ptr<std::vector<std::unique_ptr<tf::Taskflow>>>
TaskflowTaskComposerExecutor::convertToTaskflow(const TaskComposerPipeline& task_pipeline,
TaskComposerContext& task_context,
TaskComposerExecutor& task_executor)
void TaskflowTaskComposerExecutor::convertToTaskflow(const TaskComposerPipeline& task_pipeline,
TaskComposerContext& task_context,
TaskComposerExecutor& task_executor,
tf::Taskflow* taskflow)
{
auto tf_container = std::make_shared<std::vector<std::unique_ptr<tf::Taskflow>>>();
tf_container->emplace_back(std::make_unique<tf::Taskflow>(task_pipeline.getName()));
tf_container->front()
taskflow
->emplace(
[&task_pipeline, &task_context, &task_executor] { return task_pipeline.run(task_context, task_executor); })
.name(task_pipeline.getName());
return tf_container;
}

std::shared_ptr<std::vector<std::unique_ptr<tf::Taskflow>>>
TaskflowTaskComposerExecutor::convertToTaskflow(const TaskComposerTask& task,
TaskComposerContext& task_context,
TaskComposerExecutor& task_executor)
void TaskflowTaskComposerExecutor::convertToTaskflow(const TaskComposerTask& task,
TaskComposerContext& task_context,
TaskComposerExecutor& task_executor,
tf::Taskflow* taskflow)
{
auto tf_container = std::make_shared<std::vector<std::unique_ptr<tf::Taskflow>>>();
tf_container->emplace_back(std::make_unique<tf::Taskflow>(task.getName()));
tf_container->front()
->emplace([&task, &task_context, &task_executor] { return task.run(task_context, task_executor); })
taskflow->emplace([&task, &task_context, &task_executor] { return task.run(task_context, task_executor); })
.name(task.getName());
return tf_container;
}

} // namespace tesseract_planning
Expand Down
Loading
Loading