Skip to content

Commit

Permalink
Fix execution and monitoring threads
Browse files Browse the repository at this point in the history
  • Loading branch information
hvarg committed Feb 21, 2024
1 parent 78e5788 commit 1895ac1
Show file tree
Hide file tree
Showing 12 changed files with 292 additions and 129 deletions.
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package org.diskproject.server.repository;
package org.diskproject.server.adapters.wings;

import java.io.ByteArrayOutputStream;
import java.io.File;
Expand Down Expand Up @@ -43,11 +43,12 @@
import org.diskproject.shared.classes.util.KBConstants;
import org.diskproject.shared.classes.workflow.WorkflowVariable;
import org.diskproject.shared.classes.workflow.VariableBinding;
import org.diskproject.shared.classes.workflow.WorkflowInstantiation;
import org.diskproject.shared.classes.workflow.WorkflowTemplate;
import org.diskproject.shared.classes.workflow.WorkflowRun;
import org.diskproject.shared.classes.workflow.WorkflowRun.RunBinding;
import org.diskproject.shared.classes.workflow.WorkflowRun.RuntimeInfo;
import org.diskproject.shared.classes.workflow.WorkflowRun.Status;
import org.diskproject.shared.classes.common.Status;
import org.diskproject.shared.classes.common.Value;

import com.google.gson.Gson;
Expand Down Expand Up @@ -507,13 +508,13 @@ public WorkflowRun getWorkflowRunStatus(String runid) {
}

public Status getStatusFromString (String statusStr) {
if (statusStr == null || statusStr.equals(""))
try {
return Status.valueOf(statusStr);
} catch (Exception e) {
if (statusStr != null && statusStr.equals("SUCCESS"))
return Status.SUCCESSFUL;
return Status.PENDING;
return statusStr.equals("SUCCESS") ? Status.SUCCESSFUL
: (statusStr.equals("FAILED") || statusStr.equals("FAILURE") ? Status.FAILED
: (statusStr.equals("RUNNING") ? Status.RUNNING
: (statusStr.equals("QUEUED") ? Status.QUEUED : Status.PENDING)));

}
}

// TODO: Hackish function. Fix it !!!! *IMPORTANT*
Expand Down Expand Up @@ -621,9 +622,16 @@ private boolean isPartOfCollection(String key) {
return true;
}

public List<String> runWorkflow (WorkflowInstantiation wf) {
String wfURI = WFLOWID(wf.getName());
System.out.println(wfURI + " == " + wf.getLink());
return null;
}

@Override
public List<String> runWorkflow(String workflowName, List<VariableBinding> vBindings, Map<String, WorkflowVariable> inputVariables) {
workflowName = WFLOWID(workflowName);

String toPost = null, getData = null, getParams = null, getExpansions = null;
JsonObject response = null;
try {
Expand All @@ -645,7 +653,8 @@ public List<String> runWorkflow(String workflowName, List<VariableBinding> vBind
return null;
}
} catch (Exception e) {
System.err.println("Error planning data for workflow run. " + e.getMessage());
e.printStackTrace();
System.err.println("Error planning data for workflow run. ");
System.err.println("REQUEST: " + toPost);
System.err.println("RESPONSE: " + getData);
return null;
Expand Down Expand Up @@ -1262,33 +1271,22 @@ private String toPlanAcceptableFormat(String wfName, List<VariableBinding> vbl,
for (int i = 0; i < vbl.size(); i++) {
VariableBinding vb = vbl.get(i);
if (vb.getVariable().equals(v.getName())) {
String curBinding = "\"" + wfName + v.getName() + "\":[";
String bindingValue = vb.getSingleBinding();

if (v.getDimensionality() == 0) { // && !bindingValue.startsWith("[")) { FIXME!
curBinding += "\"" + (v.isParam() ? "" : dataID) + bindingValue + "\"";
String curBinding = "\"" + wfName + v.getName() + "\":";
String prefix = v.isParam() ? "" : dataID;
if (v.getDimensionality() == 0) {
//String escList = vb.toJsonList(prefix).replaceAll("\"", Matcher.quoteReplacement("\\\""));
String escList = vb.toJsonList(prefix).replaceAll("\"", "");
curBinding += "[\"" + (vb.getIsArray() ?
escList : prefix + vb.getSingleBinding()) + "\"]";
} else {
if (v.getDimensionality() == 0) {
System.err.println("WARNING: Variable " + v.getName() + " has dimensionality 0 but the binding is an array");
}
String[] dBs = bindingValue
.replaceFirst("^\\[", "")
.replaceFirst("\\]$", "")
.split("\\s*,\\s*");
for (int j = 0; j < dBs.length; j++) {
if (dBs[j].length() > 0) {
curBinding += "\"" + (v.isParam() ? "" : dataID) + dBs[j] + "\",";
}
}
curBinding = curBinding.substring(0, curBinding.length() - 1); //rm extra comma
curBinding += (vb.getIsArray() ? vb.toJsonList(prefix) : ("\"" + prefix + vb.getSingleBinding() + "\""));
}


if (v.isParam()) {
paramBindings += curBinding + "],";
paramBindings += curBinding + ",";
paramAdded = true;
} else {
dataBindings += curBinding + "],";
dataBindings += curBinding + ",";
dataAdded = true;
}
}
Expand All @@ -1302,15 +1300,15 @@ private String toPlanAcceptableFormat(String wfName, List<VariableBinding> vbl,
return output;
}

private String postWithSpecifiedMediaType(String pageId, String data, String type, String type2) {
private String postWithSpecifiedMediaType(String pageId, String data, String type, String contentType) {
this.login();
CloseableHttpClient client = HttpClientBuilder.create()
.setDefaultCookieStore(this.cookieStore).build();
try {
HttpPost securedResource = new HttpPost(server + "/" + pageId);
securedResource.setEntity(new StringEntity(data));
securedResource.addHeader("Accept", type);
securedResource.addHeader("Content-type", type2);
securedResource.addHeader("Content-type", contentType);
CloseableHttpResponse httpResponse = client.execute(securedResource);
try {
HttpEntity responseEntity = httpResponse.getEntity();
Expand Down Expand Up @@ -1443,13 +1441,6 @@ public FileAndMeta fetchData(String dataId) {
}



//data_id=http://datascience4all.org/wings-portal-new/export/users/dkhider/ClimateDisk/data/library.owl#Output-9xo020zkzzlut84luu7bp1z16
//newname=Output-9xo020zkzzlut84luu7bp1z16
//metadata_json={
// "type":["http://datascience4all.org/wings-portal-new/export/users/dkhider/ClimateDisk/data/ontology.owl#TimeSeries"]
//}'

@Override
public boolean registerData (String id, String type) {
String fullId = getDataUri(id);
Expand Down
4 changes: 2 additions & 2 deletions server/src/main/java/org/diskproject/server/db/DiskDB.java
Original file line number Diff line number Diff line change
Expand Up @@ -997,8 +997,8 @@ private void readLiteralAsBindingValue (String rawValue, VariableBinding vb) {
private KBObject writeVariableBinding (VariableBinding vBinding, String prefix) {
String id = null;
if (vBinding.getVariable() != null && prefix !=null && !prefix.equals("")) {
String[] fragments = vBinding.getVariable().split("/");
id = prefix + "/bindings/" + fragments[fragments.length-1];
String[] fragments = vBinding.getVariable().split("/"); //This generally is the variable name
id = prefix + "/bindings/" + GUID.randomId(fragments[fragments.length-1]);
}
KBObject vBindingObj = domainKB.createObjectOfClass(id, DISKOnt.getClass(DISK.VARIABLE_BINDING));
if (vBinding.getVariable() != null)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@
import java.util.Map;

import org.diskproject.server.adapters.AirFlowAdapter;
import org.diskproject.server.adapters.wings.WingsAdapter;
import org.diskproject.server.db.DiskDB;
import org.diskproject.server.repository.WingsAdapter;
import org.diskproject.server.util.Config;
import org.diskproject.server.util.Config.MethodAdapterConfig;
import org.diskproject.server.util.ConfigKeys;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Stream;

import org.apache.jena.query.QueryParseException;

Expand Down Expand Up @@ -43,7 +44,9 @@
import org.diskproject.shared.classes.util.QuestionOptionsRequest;
import org.diskproject.shared.classes.vocabulary.Vocabulary;
import org.diskproject.shared.classes.workflow.WorkflowVariable;
import org.diskproject.shared.classes.workflow.Execution;
import org.diskproject.shared.classes.workflow.VariableBinding;
import org.diskproject.shared.classes.workflow.WorkflowInstantiation;
import org.diskproject.shared.classes.workflow.WorkflowRun;
import org.diskproject.shared.classes.workflow.WorkflowSeed;
import org.diskproject.shared.ontologies.DISK;
Expand Down Expand Up @@ -529,7 +532,7 @@ public List<TriggeredLOI> queryGoal (String id) throws Exception, QueryParseExce
}

loiMatch.setQueryResults(solutions, query);
tlois.add(loiMatch.createTLOI());
tlois.add(uploadData(loiMatch.createTLOI()));
} else {
System.out.println("Warning: No data source or template. " + cur.getId());
}
Expand All @@ -539,13 +542,85 @@ public List<TriggeredLOI> queryGoal (String id) throws Exception, QueryParseExce
return checkExistingTLOIs(tlois);
}

private TriggeredLOI uploadData (TriggeredLOI tloi) {
//Check and upload files.
DataAdapter dataAdapter = dataAdapters.getMethodAdapterByEndpoint(tloi.getDataQueryTemplate().getEndpoint());
List<WorkflowInstantiation> wf = new ArrayList<WorkflowInstantiation>(),
mwf = new ArrayList<WorkflowInstantiation>();
for (WorkflowInstantiation i: tloi.getWorkflows()) {
wf.add(uploadData(i, dataAdapter));
}
for (WorkflowInstantiation i: tloi.getMetaWorkflows()) {
mwf.add(uploadData(i, dataAdapter));
}
tloi.setWorkflows(wf);
tloi.setMetaWorkflows(mwf);
return tloi;
}

private WorkflowInstantiation uploadData (WorkflowInstantiation inst, DataAdapter dataAdapter) {
Map<String,VariableBinding> dataBindings = new HashMap<String,VariableBinding>();
for (VariableBinding b: inst.getDataBindings()) {
dataBindings.put(b.getVariable(), b);
}
//Check and upload files.
Map<String,Map<String,String>> uploaded = new HashMap<String,Map<String,String>>();
for (VariableBinding b: inst.getInputs()) {
VariableBinding val = dataBindings.get(b.getVariable());
if (val != null) {
//All of these variables should be links
List<String> dsUrls = new ArrayList<String>();
for (String url: val.getBinding()) {
if (url.startsWith("http")) { //Better way to check this later
dsUrls.add(url);
} else {
System.out.println("Error: Input binding is not an URL. " + b.toString());
}
}

try {
Map<String, String> urlToName = addData(dsUrls,
methodAdapters.getMethodAdapterByEndpoint(inst.getSource()),
dataAdapter, b.getDatatype());
uploaded.put(b.getVariable(), urlToName);
} catch (Exception e) {
e.printStackTrace();
}
}
}
//Now, we have the IDs we need on WINGs. Replace the dataBindings.
List<VariableBinding> newDataBindings = new ArrayList<VariableBinding>();
for (VariableBinding b: inst.getDataBindings()) {
Map<String, String> urlToName = uploaded.get(b.getVariable());
if (urlToName != null) {
VariableBinding newBinding = new VariableBinding(b);
b.setVariable("_" + b.getVariable());
List<String> replacedUrls = new ArrayList<String>();
for (String val: newBinding.getBinding()) {
String newVal = urlToName.get(val);
if (newVal != null) {
replacedUrls.add(newVal);
} else {
//This should never happend
replacedUrls.add(val);
}
}
newBinding.setBinding(replacedUrls);
newDataBindings.add(newBinding);
}
newDataBindings.add(b);
}
inst.setDataBindings(newDataBindings);
return inst;
}

// This replaces all triggered lines of inquiry already executed.
private List<TriggeredLOI> checkExistingTLOIs(List<TriggeredLOI> tlois) {
List<TriggeredLOI> checked = new ArrayList<TriggeredLOI>();
Map<String, List<TriggeredLOI>> cache = new HashMap<String, List<TriggeredLOI>>();
for (TriggeredLOI tloi : tlois) {
String parentLoiId = tloi.getParentLoi().getId();
System.out.println("Checking " + tloi.getId() + " (" + parentLoiId + ")");
//System.out.println("Checking " + tloi.getId() + " (" + parentLoiId + ")");
if (!cache.containsKey(parentLoiId)) {
cache.put(parentLoiId,
getTLOIsForHypothesisAndLOI(tloi.getParentGoal().getId(), parentLoiId));
Expand Down Expand Up @@ -708,8 +783,9 @@ public FileAndMeta getOutputData(String source, String id) {
* Threads
*/

public void processWorkflowOutputs (TriggeredLOI tloi, LineOfInquiry loi, WorkflowSeed workflow, WorkflowRun run, MethodAdapter methodAdapter, boolean meta) {
Map<String, RunBinding> outputs = run.getOutputs();
public void processWorkflowOutputs (TriggeredLOI tloi, LineOfInquiry loi, WorkflowSeed workflow, Execution run, MethodAdapter methodAdapter, boolean meta) {
Map<String, RunBinding> outputs = null ; //run.getOutputs();
System.out.println("Should process results now!");
if (outputs == null) return;

Map<String,String> outputAssignations = new HashMap<String,String>();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,15 +1,16 @@
package org.diskproject.server.threads;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import org.diskproject.shared.classes.adapters.MethodAdapter;
import org.diskproject.shared.classes.common.Status;
import org.diskproject.shared.classes.loi.LineOfInquiry;
import org.diskproject.shared.classes.loi.TriggeredLOI;
import org.diskproject.shared.classes.workflow.Execution;
import org.diskproject.shared.classes.workflow.VariableBinding;
import org.diskproject.shared.classes.workflow.WorkflowInstantiation;
import org.diskproject.shared.classes.workflow.WorkflowRun;
import org.diskproject.shared.classes.workflow.WorkflowRun.Status;
import org.diskproject.shared.classes.workflow.WorkflowVariable;

public class ExecutionThread implements Runnable {
Expand Down Expand Up @@ -47,26 +48,27 @@ public void run() {
}

// Get workflow input details
Map<String, WorkflowVariable> inputVariables = methodAdapter.getWorkflowInputs(curWorkflow.getLink());
Map<String, WorkflowVariable> inputVariables = methodAdapter.getWorkflowInputs(curWorkflow.getName());
List<VariableBinding> inputBindings = curWorkflow.getDataBindings();
this.printWorkflowRun(curWorkflow, inputBindings);
List<String> runIds = methodAdapter.runWorkflow(curWorkflow.getId(), inputBindings, inputVariables);
List<String> runIds = methodAdapter.runWorkflow(curWorkflow.getName(), inputBindings, inputVariables);

if (runIds != null) {
System.out.println("[R] " + runIds.size() + " Workflows send: ");
List<Execution> runs = new ArrayList<Execution>();
for (String rid : runIds) {
WorkflowRun run = new WorkflowRun();
run.setId(rid);
run.setAsPending();
Execution run = new Execution(rid);
run.setStatus(Status.PENDING);
System.out.println("[R] ID: " + rid);
//curWorkflow.addRun(run); FIXME
runs.add(run);
}
curWorkflow.setExecutions(runs);
} else {
currentStatus = Status.FAILED;
System.out.println("[R] Error: Could not run workflow");
}
}
//tloi.setStatus(currentStatus); TODO!
tloi.setStatus(currentStatus);
manager.updateTLOI(tloi);

// Start monitoring
Expand All @@ -81,16 +83,17 @@ public void printWorkflowRun(WorkflowInstantiation wf, List<VariableBinding> inp
// Execute workflow
System.out.println("[R] Executing " + wf.getLink() + " with " + inputBindings.size() + " parameters:");
for (VariableBinding v : inputBindings) {
List<String> l = v.getIsArray() ? v.getBindings() : null;
int i = 0;
if (l != null) {
System.out.println("[R] - " + v.getVariable() + ": ");
for (String b : l) {
System.out.println("[R] " + String.valueOf(i) + ") " + b);
i++;
if (!v.getVariable().startsWith("_")) {
if (v.getIsArray()) {
int i = 0;
System.out.println("[R] - " + v.getVariable() + ": ");
for (String b : v.getBindings()) {
System.out.println("[R] " + String.valueOf(i) + ") " + b);
i++;
}
} else {
System.out.println("[R] - " + v.getVariable() + ": " + v.getSingleBinding());
}
} else {
System.out.println("[R] - " + v.getVariable() + ": " + v.getBinding());
}
}
}
Expand Down
Loading

0 comments on commit 1895ac1

Please sign in to comment.