Skip to content

Commit

Permalink
(#312) Added a method to export a WRENCH workflow to a WfFormat JSON
Browse files Browse the repository at this point in the history
instance
  • Loading branch information
henricasanova committed Sep 21, 2024
1 parent 7b50eea commit 40cc887
Show file tree
Hide file tree
Showing 5 changed files with 150 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@

#include "SimpleWMS.h"
#include <wrench/tools/wfcommons/WfCommonsWorkflowParser.h>
#include <iostream>
#include <fstream>


/**
Expand Down Expand Up @@ -207,5 +209,14 @@ int main(int argc, char **argv) {
std::cerr << "Number of tasks that failed at least once: " << num_failed_tasks << "\n";
std::cerr << "Average computation time / communication+IO time ratio over all tasks: " << computation_communication_ratio_average << "\n";

/* Exporting the workflow to a WfFormat instance, just for kicks
*/
std::string json_string = wrench::WfCommonsWorkflowParser::createJSONStringFromWorkflow(workflow);
std::string output_file_path = "/tmp/output_workflow.json";
std::ofstream output_file(output_file_path);
output_file << json_string << std::endl;
output_file.close();
std::cerr << "Output WfFormat workflow instance written to " << output_file_path << std::endl;

return 0;
}
15 changes: 13 additions & 2 deletions include/wrench/tools/wfcommons/WfCommonsWorkflowParser.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ namespace wrench {

public:
/**
* @brief Create an abstract workflow based on a JSON file in the WfFormat (version 1.4) from WfCommons. This method
* @brief Create an abstract workflow based on a JSON file in the WfFormat (version 1.5) from WfCommons. This method
* makes executive decisions when information in the JSON file is incomplete and/or contradictory. Pass true
* as the last argument to see all warnings on stderr.
*
Expand Down Expand Up @@ -78,7 +78,7 @@ namespace wrench {


/**
* @brief Create an abstract workflow based on a JSON file in the WfFormat (version 1.4) from WfCommons. This method
* @brief Create an abstract workflow based on a JSON file in the WfFormat (version 1.5) from WfCommons. This method
* makes executive decisions when information in the JSON file is incomplete and/or contradictory. Pass true
* as the last argument to see all warnings on stderr.
*
Expand Down Expand Up @@ -128,8 +128,19 @@ namespace wrench {
bool enforce_num_cores = false,
bool ignore_avg_cpu = false,
bool show_warnings = false);

/**
* @brief Method to create a JSON string in the WfFormat (version 1.5) from WfCommons, from a workflow object.
*
* @param workflow: a workflow
* @return a JSON string
*/
static std::string createJSONStringFromWorkflow(std::shared_ptr<Workflow> workflow);

};



}// namespace wrench


Expand Down
1 change: 1 addition & 0 deletions include/wrench/workflow/Workflow.h
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ namespace wrench {

unsigned long getNumLevels();

double getStartDate();
double getCompletionDate();

std::vector<std::shared_ptr<DataFile>> getInputFiles() const;
Expand Down
24 changes: 20 additions & 4 deletions src/wrench/workflow/Workflow.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -666,18 +666,34 @@ namespace wrench {
* If the workflow has not completed)
*/
double Workflow::getCompletionDate() {
double makespan = -1.0;
double completion_date = -1.0;
for (auto const &task: this->tasks) {
if (task.second->getState() != WorkflowTask::State::COMPLETED) {
makespan = -1.0;
completion_date = -1.0;
break;
} else {
makespan = std::max<double>(makespan, task.second->getEndDate());
completion_date = std::max<double>(completion_date, task.second->getEndDate());
}
}
return makespan;
return completion_date;
}

/**
* @brief Returns the workflow's start date
* @return a date in seconds (or a negative value
* if no workflow task has successfully completed)
*/
double Workflow::getStartDate() {
double start_date = -1.0;
for (auto const &task: this->tasks) {
if (task.second->getState() == WorkflowTask::State::COMPLETED) {
start_date = std::min<double>(start_date, task.second->getStartDate());
}
}
return start_date;
}


/**
* @brief Get the workflow task for which a file is an output
* @param file: a file
Expand Down
105 changes: 105 additions & 0 deletions tools/wfcommons/src/WfCommonsWorkflowParser.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#include <vector>
#include <fstream>
#include <nlohmann/json.hpp>
#include <boost/date_time/posix_time/posix_time.hpp>

WRENCH_LOG_CATEGORY(wfcommons_workflow_parser, "Log category for WfCommonsWorkflowParser");

Expand Down Expand Up @@ -345,5 +346,109 @@ namespace wrench {
return workflow;
}

/**
* @brief Helper method to build the workflow's JSON specification
* @param workflow
* @return
*/
nlohmann::json build_workflow_specification_json(std::shared_ptr<Workflow> workflow) {
nlohmann::json json_specification;

// Tasks
json_specification["tasks"] = nlohmann::json::array();
for (const auto &task : workflow->getTasks()) {
nlohmann::json json_task;
json_task["name"] = task->getID();
json_task["id"] = task->getID();
nlohmann::json parent_tasks = nlohmann::json::array();
for (const auto &parent : task->getParents()) {
parent_tasks.push_back(parent->getID());
}
nlohmann::json child_tasks = nlohmann::json::array();
for (const auto &child : task->getChildren()) {
child_tasks.push_back(child->getID());
}
nlohmann::json input_files = nlohmann::json::array();
for (const auto &file : task->getInputFiles()) {
input_files.push_back(file->getID());
}
nlohmann::json output_files = nlohmann::json::array();
for (const auto &file : task->getOutputFiles()) {
output_files.push_back(file->getID());
}
json_task["parents"] = parent_tasks;
json_task["children"] = child_tasks;
json_task["inputFiles"] = input_files;
json_task["outputFiles"] = output_files;

json_specification["tasks"].push_back(json_task);
}

// Files
json_specification["files"] = nlohmann::json::array();
for (const auto &item : workflow->getFileMap()) {
auto file = item.second;
nlohmann::json json_file;
json_file["id"] = file->getID();
json_file["sizeInBytes"] = (long long)(file->getSize());
json_specification["files"].push_back(json_file);
}

return json_specification;
}

/**
* @brief Helper method to build the workflow's JSON execution
* @param workflow
* @return
*/
nlohmann::json build_workflow_execution_json(std::shared_ptr<Workflow> workflow) {
nlohmann::json json_execution;

json_execution["makespanInSeconds"] = workflow->getCompletionDate() - workflow->getStartDate();
boost::posix_time::ptime t = boost::posix_time::microsec_clock::universal_time();
json_execution["executedAt"] = to_iso_extended_string(t);

std::set<std::string> used_machines;

// Tasks
json_execution["tasks"] = nlohmann::json::array();
for (const auto &task : workflow->getTasks()) {
nlohmann::json json_task;
json_task["id"] = task->getID();
json_task["runtimeInSeconds"] = task->getEndDate() - task->getStartDate();
json_task["coreCount"] = task->getNumCoresAllocated();
nlohmann::json json_machines;
auto execution_host = task->getPhysicalExecutionHost();
json_execution["tasks"].push_back(json_task);
}

return json_execution;
}


/**
* Documentation in .h file
*/
std::string WfCommonsWorkflowParser::createJSONStringFromWorkflow(std::shared_ptr<Workflow> workflow) {
nlohmann::json json_doc;
json_doc["name"] = workflow->getName();
json_doc["description"] = "Generated from a WRENCH simulator";
boost::posix_time::ptime t = boost::posix_time::microsec_clock::universal_time();
json_doc["createAt"] = to_iso_extended_string(t);
json_doc["schemaVersion"] = "1.5";

nlohmann::json json_workflow;

json_workflow["specification"] = build_workflow_specification_json(workflow);

if (workflow->isDone()) {
json_workflow["execution"] = build_workflow_execution_json(workflow);
}

json_doc["workflow"] = json_workflow;
return json_doc.dump();
}


};// namespace wrench

0 comments on commit 40cc887

Please sign in to comment.