From 314918d9cda513c95984a516417ac7bad470a698 Mon Sep 17 00:00:00 2001 From: hvarg Date: Wed, 27 Sep 2023 21:17:41 -0300 Subject: [PATCH] Register output files to use in metaworkflows --- .../server/adapters/AirFlowAdapter.java | 6 + .../server/adapters/MethodAdapterManager.java | 98 ++++++ .../server/api/impl/DiskResource.java | 4 +- .../server/repository/DiskRepository.java | 278 +++++++++++------- .../server/repository/KBRepository.java | 22 +- .../server/repository/WingsAdapter.java | 23 ++ .../server/repository/WriteKBRepository.java | 4 +- .../server/threads/DataThread.java | 11 + .../server/threads/ExecutionThread.java | 97 ++++++ .../server/threads/MonitorThread.java | 147 +++++++++ .../server/threads/ThreadManager.java | 147 +++++++++ .../classes/adapters/MethodAdapter.java | 2 + 12 files changed, 712 insertions(+), 127 deletions(-) create mode 100644 server/src/main/java/org/diskproject/server/adapters/MethodAdapterManager.java create mode 100644 server/src/main/java/org/diskproject/server/threads/DataThread.java create mode 100644 server/src/main/java/org/diskproject/server/threads/ExecutionThread.java create mode 100644 server/src/main/java/org/diskproject/server/threads/MonitorThread.java create mode 100644 server/src/main/java/org/diskproject/server/threads/ThreadManager.java diff --git a/server/src/main/java/org/diskproject/server/adapters/AirFlowAdapter.java b/server/src/main/java/org/diskproject/server/adapters/AirFlowAdapter.java index 673cadd..795b4de 100644 --- a/server/src/main/java/org/diskproject/server/adapters/AirFlowAdapter.java +++ b/server/src/main/java/org/diskproject/server/adapters/AirFlowAdapter.java @@ -133,4 +133,10 @@ public String getDataUri(String id) { // Auto-generated method stub return null; } + + @Override + public boolean registerData (String id, String type) { + // Auto-generated method stub + return false; + } } diff --git a/server/src/main/java/org/diskproject/server/adapters/MethodAdapterManager.java b/server/src/main/java/org/diskproject/server/adapters/MethodAdapterManager.java new file mode 100644 index 0000000..81d08aa --- /dev/null +++ b/server/src/main/java/org/diskproject/server/adapters/MethodAdapterManager.java @@ -0,0 +1,98 @@ +package org.diskproject.server.adapters; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.diskproject.server.repository.WingsAdapter; +import org.diskproject.server.util.Config; +import org.diskproject.server.util.Config.MethodAdapterConfig; +import org.diskproject.server.util.ConfigKeys; +import org.diskproject.shared.classes.adapters.MethodAdapter; +import org.diskproject.shared.classes.workflow.Workflow; +import org.diskproject.shared.classes.workflow.WorkflowVariable; + +public class MethodAdapterManager { + protected Map byUrl, byName; + + public MethodAdapterManager () { + this.byUrl = new HashMap(); + this.byName = new HashMap(); + // Read Config + for (MethodAdapterConfig ma: Config.get().methodAdapters) { + MethodAdapter curAdapter = null; + switch (ma.type) { + case ConfigKeys.METHOD_TYPE_WINGS: + curAdapter = new WingsAdapter(ma.name, ma.endpoint, ma.username, ma.password, ma.domain, ma.internalServer); + break; + case ConfigKeys.METHOD_TYPE_AIRFLOW: + curAdapter = new AirFlowAdapter(ma.name, ma.endpoint, ma.username, ma.password); + break; + default: + System.out.println("Error: Method adapter type not found: '" + ma.type + "'"); + break; + } + if (curAdapter != null) { + if (ma.version != null) + curAdapter.setVersion(ma.version); + this.byUrl.put(ma.endpoint, curAdapter); + } + } + + // Check method adapters: + if (this.byUrl.size() == 0) { + System.err.println("WARNING: No method adapters found on configuration file."); + } else + for (MethodAdapter curAdp : this.byUrl.values()) { + this.byName.put(curAdp.getName(), curAdp); + if (!curAdp.ping()) { + System.err.println("ERROR: Could not connect with " + curAdp.getEndpointUrl()); + } + } + } + + public MethodAdapter getMethodAdapterByUrl (String url) { + if (this.byUrl.containsKey(url)) + return this.byUrl.get(url); + return null; + } + + public MethodAdapter getMethodAdapterByName (String name) { + if (this.byName.containsKey(name)) + return this.byName.get(name); + return null; + } + + public List getWorkflowList () { + List list = new ArrayList(); + for (MethodAdapter adapter : this.byUrl.values()) { + for (Workflow wf : adapter.getWorkflowList()) { + list.add(wf); + } + } + return list; + } + + public List getWorkflowVariablesByName (String sourceName, String id) { + MethodAdapter cur = this.getMethodAdapterByName(sourceName); + if (cur != null) + return cur.getWorkflowVariables(id); + return null; + } + + public List getWorkflowVariablesByUrl (String sourceUrl, String id) { + MethodAdapter cur = this.getMethodAdapterByUrl(sourceUrl); + if (cur != null) + return cur.getWorkflowVariables(id); + return null; + } + + public String toString () { + String txt = ""; + for (String name: byName.keySet()) { + txt += name + " -> " + byName.get(name).getEndpointUrl() + "\n"; + } + return txt; + } +} \ No newline at end of file diff --git a/server/src/main/java/org/diskproject/server/api/impl/DiskResource.java b/server/src/main/java/org/diskproject/server/api/impl/DiskResource.java index 8253245..38bc161 100644 --- a/server/src/main/java/org/diskproject/server/api/impl/DiskResource.java +++ b/server/src/main/java/org/diskproject/server/api/impl/DiskResource.java @@ -353,7 +353,7 @@ public void deleteTriggeredLOI( public List listWorkflows() { Gson response_error = new Gson(); try { - return this.repo.getWorkflowList(); + return this.repo.methodAdapters.getWorkflowList(); } catch (Exception e) { try { // Create Json error response @@ -381,7 +381,7 @@ public List listWorkflows() { public List getWorkflowVariables( @PathParam("source") String source, @PathParam("id") String id) { - return this.repo.getWorkflowVariables(source, id); + return this.repo.methodAdapters.getWorkflowVariablesByName(source, id); } @GET diff --git a/server/src/main/java/org/diskproject/server/repository/DiskRepository.java b/server/src/main/java/org/diskproject/server/repository/DiskRepository.java index ff0aced..5b0a68f 100644 --- a/server/src/main/java/org/diskproject/server/repository/DiskRepository.java +++ b/server/src/main/java/org/diskproject/server/repository/DiskRepository.java @@ -27,8 +27,10 @@ import org.apache.jena.query.QueryParseException; import org.diskproject.server.adapters.AirFlowAdapter; import org.diskproject.server.adapters.GraphDBAdapter; +import org.diskproject.server.adapters.MethodAdapterManager; import org.diskproject.server.adapters.SparqlAdapter; import org.diskproject.server.adapters.StorageManager; +import org.diskproject.server.threads.ThreadManager; import org.diskproject.server.util.Config; import org.diskproject.server.util.ConfigKeys; import org.diskproject.server.util.KBCache; @@ -92,10 +94,11 @@ public class DiskRepository extends WriteKBRepository { protected KBCache SQOnt; Map vocabularies; - ScheduledExecutorService monitor, monitorData; - ExecutorService executor; - static DataMonitor dataThread; StorageManager externalStorage; + ThreadManager threadManager; + ////ScheduledExecutorService monitor, monitorData; + ////ExecutorService executor; + ////static DataMonitor dataThread; private Map> optionsCache; private Map externalVocabularies; @@ -121,12 +124,13 @@ public DiskRepository() { // Set domain for writing the KB setConfiguration(); this.setDomain(this.server); + this.methodAdapters = new MethodAdapterManager(); // Initialize this.dataAdapters = new HashMap(); - this.methodAdapters = new HashMap(); this.initializeDataAdapters(); - this.initializeMethodAdapters(); + //this.methodAdapters = new HashMap(); + //this.initializeMethodAdapters(); try { initializeKB(); } catch (Exception e) { @@ -134,20 +138,22 @@ public DiskRepository() { System.exit(1); } // Threads - monitor = Executors.newScheduledThreadPool(0); - executor = Executors.newFixedThreadPool(2); - dataThread = new DataMonitor(); + threadManager = new ThreadManager(methodAdapters, this); + ////monitor = Executors.newScheduledThreadPool(0); + ////executor = Executors.newFixedThreadPool(2); + ////dataThread = new DataMonitor(); } public void shutdownExecutors() { - if (monitor != null) - monitor.shutdownNow(); - if (executor != null) - executor.shutdownNow(); - if (dataThread != null) - dataThread.stop(); - if (monitorData != null) - monitorData.shutdownNow(); + threadManager.shutdownExecutors(); + ////if (monitor != null) + //// monitor.shutdownNow(); + ////if (executor != null) + //// executor.shutdownNow(); + ////if (dataThread != null) + //// dataThread.stop(); + ////if (monitorData != null) + //// monitorData.shutdownNow(); } /******************** @@ -297,62 +303,62 @@ public List getDataAdapters() { } // -- Method adapters - private void initializeMethodAdapters() { - for (MethodAdapterConfig ma: Config.get().methodAdapters) { - MethodAdapter curAdapter = null; - switch (ma.type) { - case ConfigKeys.METHOD_TYPE_WINGS: - curAdapter = new WingsAdapter(ma.name, ma.endpoint, ma.username, ma.password, ma.domain, ma.internalServer); - break; - case ConfigKeys.METHOD_TYPE_AIRFLOW: - curAdapter = new AirFlowAdapter(ma.name, ma.endpoint, ma.username, ma.password); - break; - default: - System.out.println("Error: Method adapter type not found: '" + ma.type + "'"); - break; - } - if (curAdapter != null) { - if (ma.version != null) - curAdapter.setVersion(ma.version); - this.methodAdapters.put(ma.endpoint, curAdapter); - } - } + //private void initializeMethodAdapters() { + // for (MethodAdapterConfig ma: Config.get().methodAdapters) { + // MethodAdapter curAdapter = null; + // switch (ma.type) { + // case ConfigKeys.METHOD_TYPE_WINGS: + // curAdapter = new WingsAdapter(ma.name, ma.endpoint, ma.username, ma.password, ma.domain, ma.internalServer); + // break; + // case ConfigKeys.METHOD_TYPE_AIRFLOW: + // curAdapter = new AirFlowAdapter(ma.name, ma.endpoint, ma.username, ma.password); + // break; + // default: + // System.out.println("Error: Method adapter type not found: '" + ma.type + "'"); + // break; + // } + // if (curAdapter != null) { + // if (ma.version != null) + // curAdapter.setVersion(ma.version); + // this.methodAdapters.put(ma.endpoint, curAdapter); + // } + // } - // Check method adapters: - if (this.methodAdapters.size() == 0) { - System.err.println("WARNING: No method adapters found on configuration file."); - } else - for (MethodAdapter curAdp : this.methodAdapters.values()) { - if (!curAdp.ping()) { - System.err.println("ERROR: Could not connect with " + curAdp.getEndpointUrl()); - } - } - } + // // Check method adapters: + // if (this.methodAdapters.size() == 0) { + // System.err.println("WARNING: No method adapters found on configuration file."); + // } else + // for (MethodAdapter curAdp : this.methodAdapters.values()) { + // if (!curAdp.ping()) { + // System.err.println("ERROR: Could not connect with " + curAdp.getEndpointUrl()); + // } + // } + //} - public MethodAdapter getMethodAdapter(String url) { - if (this.methodAdapters.containsKey(url)) - return this.methodAdapters.get(url); - return null; - } + //public MethodAdapter getMethodAdapter(String url) { + // if (this.methodAdapters.containsKey(url)) + // return this.methodAdapters.get(url); + // return null; + //} - public List getWorkflowList() { - List list = new ArrayList(); - for (MethodAdapter adapter : this.methodAdapters.values()) { - for (Workflow wf : adapter.getWorkflowList()) { - list.add(wf); - } - } - return list; - } + //public List getWorkflowList() { + // List list = new ArrayList(); + // for (MethodAdapter adapter : this.methodAdapters.values()) { + // for (Workflow wf : adapter.getWorkflowList()) { + // list.add(wf); + // } + // } + // return list; + //} - public List getWorkflowVariables(String source, String id) { - for (MethodAdapter adapter : this.methodAdapters.values()) { - if (adapter.getName().equals(source)) { - return adapter.getWorkflowVariables(id); - } - } - return null; - } + //public List getWorkflowVariables(String source, String id) { + // for (MethodAdapter adapter : this.methodAdapters.values()) { + // if (adapter.getName().equals(source)) { + // return adapter.getWorkflowVariables(id); + // } + // } + // return null; + //} // -- Vocabulary Initialization private void initializeVocabularies() { @@ -457,6 +463,7 @@ public LineOfInquiry addLOI(String username, LineOfInquiry loi) { String desc = loi.getDescription(); String question = loi.getQuestionId(); String dateCreated = loi.getDateCreated(); + System.out.println(">>> " + loi.getDataQueryExplanation()); // Set or update date if (dateCreated == null || dateCreated.equals("")) { loi.setDateCreated(dateformatter.format(new Date())); @@ -514,9 +521,9 @@ public TriggeredLOI addTriggeredLOI(String username, TriggeredLOI tloi) { tloi.setDateModified(dateformatter.format(new Date())); } writeTLOI(username, tloi); - LineOfInquiry loi = getLOI(username, tloi.getParentLoiId()); - TLOIExecutionThread wflowThread = new TLOIExecutionThread(username, tloi, loi, false); - executor.execute(wflowThread); + threadManager.executeTLOI(tloi); + //TLOIExecutionThread wflowThread = new TLOIExecutionThread(username, tloi, loi, false); + //executor.execute(wflowThread); return tloi; } @@ -528,7 +535,7 @@ public TriggeredLOI getTriggeredLOI(String username, String id) { return loadTLOI(username, id); } - private TriggeredLOI updateTriggeredLOI(String username, String id, TriggeredLOI tloi) { + public TriggeredLOI updateTriggeredLOI(String username, String id, TriggeredLOI tloi) { if (tloi.getId() != null && this.deleteTLOI(username, id) && this.writeTLOI(username, tloi)) return tloi; return null; @@ -1191,7 +1198,7 @@ public List queryHypothesis(String username, String id) throws Exc boolean allOk = true; for (WorkflowBindings wb: loi.getWorkflows()) { String source = wb.getSource(); - if (source == null || getMethodAdapterByName(source) == null) { + if (source == null || methodAdapters.getMethodAdapterByName(source) == null) { allOk = false; System.out.println("Warning: " + loi.getId() + " uses an unknown method adapter: " + source); break; @@ -1200,7 +1207,7 @@ public List queryHypothesis(String username, String id) throws Exc if (allOk) for (WorkflowBindings wb: loi.getMetaWorkflows()) { String source = wb.getSource(); - if (source == null || getMethodAdapterByName(source) == null) { + if (source == null || methodAdapters.getMethodAdapterByName(source) == null) { allOk = false; System.out.println("Warning: " + loi.getId() + " uses an unknown method adapter: " + source); break; @@ -1384,23 +1391,49 @@ private List checkExistingTLOIs(String username, List getTLOIBindings(String username, List wflowBindings, + private List getTLOIBindings(String username, List workflowList, Map> dataVarBindings, DataAdapter dataAdapter) throws Exception { - List tloiBindings = new ArrayList(); - for (WorkflowBindings bindings : wflowBindings) { // FOR EACH WORKFLOW + List tloiWorkflowList = new ArrayList(); + + for (WorkflowBindings workflowDef : workflowList) { // FOR EACH WORKFLOW // For each Workflow, create an empty copy to set the values - WorkflowBindings tloiBinding = new WorkflowBindings( - bindings.getWorkflow(), - bindings.getWorkflowLink()); - tloiBinding.setSource(bindings.getSource()); - tloiBinding.setMeta(bindings.getMeta()); - tloiBindings.add(tloiBinding); - MethodAdapter methodAdapter = getMethodAdapterByName(bindings.getSource()); + WorkflowBindings tloiWorkflowDef = new WorkflowBindings( + workflowDef.getWorkflow(), + workflowDef.getWorkflowLink()); + tloiWorkflowDef.setSource(workflowDef.getSource()); + tloiWorkflowDef.setMeta(workflowDef.getMeta()); + tloiWorkflowList.add(tloiWorkflowDef); + MethodAdapter methodAdapter = methodAdapters.getMethodAdapterByName(workflowDef.getSource()); + List allVars = methodAdapter.getWorkflowVariables(workflowDef.getWorkflow()); + Map binSize = new HashMap(); + + // We need to order bindings by the number of datasets. + for (VariableBinding vBinding : workflowDef.getBindings()) { // Normal variable bindings. + String binding = vBinding.getBinding(); + Matcher collmat = varCollPattern.matcher(binding); + Matcher mat = varPattern.matcher(binding); + // Get the sparql variable + String sparqlVar = null; + if (collmat.find() && dataVarBindings.containsKey(collmat.group(1))) { + sparqlVar = collmat.group(1); + } else if (mat.find() && dataVarBindings.containsKey(mat.group(1))) { + sparqlVar = mat.group(1); + } else if (binding.equals("_CSV_")) { + sparqlVar = "_CSV_"; + } - List allVars = methodAdapter.getWorkflowVariables(bindings.getWorkflow()); + if (sparqlVar == null) { + binSize.put(vBinding, 0); + } else { + //List dsUrls = dataVarBindings.get(sparqlVar); + binSize.put(vBinding, dataVarBindings.containsKey(sparqlVar) ? dataVarBindings.get(sparqlVar).size() : 0); + } + } + List LIST = workflowDef.getBindings(); + LIST.sort((VariableBinding b1, VariableBinding b2) -> binSize.get(b1) - binSize.get(b2)); - for (VariableBinding vBinding : bindings.getBindings()) { // Normal variable bindings. + for (VariableBinding vBinding : LIST) { // Normal variable bindings. + //for (VariableBinding vBinding : LIST) { // For each Variable binding, check : // - If this variable expects a collection or single values // - Check the binding values on the data store @@ -1421,7 +1454,7 @@ private List getTLOIBindings(String username, List getTLOIBindings(String username, List newTloiBindings = new ArrayList(); - for (WorkflowBindings tmpBinding : tloiBindings) { // For all already processed workflow - // bindings + for (WorkflowBindings tmpWorkflow : tloiWorkflowList) { // For all already processed workflows for (String dsName : dsNames) { - ArrayList newBindings = (ArrayList) SerializationUtils - .clone((Serializable) tmpBinding.getBindings()); + //List newBindings = + //(ArrayList) SerializationUtils.clone((Serializable) tmpBinding.getBindings()); + List newBindings = new ArrayList(); + for (VariableBinding cur: tmpWorkflow.getBindings()) { + VariableBinding newV = new VariableBinding(cur.getVariable(), cur.getBinding()); + newV.setType(cur.getType()); + newBindings.add(newV); + } - WorkflowBindings newWorkflowBindings = new WorkflowBindings( - bindings.getWorkflow(), - bindings.getWorkflowLink(), + WorkflowBindings newWorkflow = new WorkflowBindings( + workflowDef.getWorkflow(), + workflowDef.getWorkflowLink(), newBindings); - newWorkflowBindings.addBinding(new VariableBinding(vBinding.getVariable(), dsName)); - newWorkflowBindings.setMeta(bindings.getMeta()); - newWorkflowBindings.setSource(bindings.getSource()); - newTloiBindings.add(newWorkflowBindings); + + VariableBinding cur = new VariableBinding(vBinding.getVariable(), dsName); + cur.setType(vBinding.getType()); + newWorkflow.addBinding(cur); + + newWorkflow.setMeta(workflowDef.getMeta()); + newWorkflow.setSource(workflowDef.getSource()); + newTloiBindings.add(newWorkflow); } } - tloiBindings = newTloiBindings; + tloiWorkflowList = newTloiBindings; } } } } - return tloiBindings; + return tloiWorkflowList; } //This adds dsUrls to the data-repository, returns filename -> URL @@ -1816,14 +1866,14 @@ public List runHypothesisAndLOI(String username, String hypId, Str */ public WorkflowRun getWorkflowRunStatus(String source, String id) { - MethodAdapter methodAdapter = getMethodAdapterByName(source); + MethodAdapter methodAdapter = this.methodAdapters.getMethodAdapterByName(source); if (methodAdapter == null) return null; return methodAdapter.getRunStatus(id); } public FileAndMeta getOutputData(String source, String id) { - MethodAdapter methodAdapter = getMethodAdapterByName(source); + MethodAdapter methodAdapter = this.methodAdapters.getMethodAdapterByName(source); if (methodAdapter == null) return null; return methodAdapter.fetchData(methodAdapter.getDataUri(id)); @@ -1833,7 +1883,7 @@ public FileAndMeta getOutputData(String source, String id) { * Threads */ - class TLOIExecutionThread implements Runnable { + /*class TLOIExecutionThread implements Runnable { String username; boolean metamode; TriggeredLOI tloi; @@ -1921,9 +1971,9 @@ public void run() { e.printStackTrace(); } } - } + }*/ - private void processWorkflowOutputs (TriggeredLOI tloi, LineOfInquiry loi, WorkflowBindings workflow, WorkflowRun run, MethodAdapter methodAdapter, boolean meta) { + public void processWorkflowOutputs (TriggeredLOI tloi, LineOfInquiry loi, WorkflowBindings workflow, WorkflowRun run, MethodAdapter methodAdapter, boolean meta) { Map outputs = run.getOutputs(); if (outputs == null) return; @@ -2014,7 +2064,7 @@ private void processWorkflowOutputs (TriggeredLOI tloi, LineOfInquiry loi, Workf } }*/ - private Status getOverallRunStatus (TriggeredLOI tloi, boolean metamode) { + /*private Status getOverallRunStatus (TriggeredLOI tloi, boolean metamode) { List wfList = metamode ? tloi.getMetaWorkflows() : tloi.getWorkflows(); // Fist check if theres some pending, queued or running run. for (WorkflowBindings wf: wfList) { @@ -2022,6 +2072,8 @@ private Status getOverallRunStatus (TriggeredLOI tloi, boolean metamode) { RuntimeInfo exec = run.getExecutionInfo(); Status runStatus = exec.status; if (runStatus == Status.PENDING || runStatus == Status.RUNNING || runStatus == Status.QUEUED) { + System.out.println(wf); + System.out.println(runStatus); return Status.RUNNING; } } @@ -2130,10 +2182,12 @@ public void run() { updatedBindings.add(bindings); this.queuedRuns.get(wfName).put(runUri, newRun); } - tloi.setStatus(getOverallRunStatus(tloi, metamode)); if (metamode) tloi.setMetaWorkflows(updatedBindings); else tloi.setWorkflows(updatedBindings); + tloi.setStatus(getOverallRunStatus(tloi, metamode)); Status overallStatus = tloi.getStatus(); + System.out.println("OVERALL"); + System.out.println(overallStatus); if (overallStatus == Status.SUCCESSFUL) { if (metamode) { @@ -2203,5 +2257,5 @@ public void stop() { Thread.currentThread().interrupt(); } } - } + }*/ } diff --git a/server/src/main/java/org/diskproject/server/repository/KBRepository.java b/server/src/main/java/org/diskproject/server/repository/KBRepository.java index a461dcd..dc4357f 100644 --- a/server/src/main/java/org/diskproject/server/repository/KBRepository.java +++ b/server/src/main/java/org/diskproject/server/repository/KBRepository.java @@ -4,10 +4,10 @@ import java.util.Map; import java.util.concurrent.Semaphore; +import org.diskproject.server.adapters.MethodAdapterManager; import org.diskproject.server.util.Config; import org.diskproject.server.util.KBCache; import org.diskproject.shared.classes.adapters.DataAdapter; -import org.diskproject.shared.classes.adapters.MethodAdapter; import org.diskproject.shared.classes.util.KBConstants; import edu.isi.kcap.ontapi.KBAPI; @@ -26,7 +26,8 @@ public class KBRepository implements TransactionsAPI { protected KBCache DISKOnt; protected Map dataAdapters; - protected Map methodAdapters; + //protected Map methodAdapters; + public MethodAdapterManager methodAdapters; protected void setConfiguration() { Config currentConfig = Config.get(); @@ -65,17 +66,16 @@ protected void initializeKB() throws Exception { } else { return; } - } - public MethodAdapter getMethodAdapterByName(String source) { - for (MethodAdapter adapter : this.methodAdapters.values()) { - if (adapter.getName().equals(source)) - return adapter; - } - System.err.println("Error: Method adapter not found " + source); - return null; - } + //public MethodAdapter getMethodAdapterByName(String source) { + // for (MethodAdapter adapter : this.methodAdapters.values()) { + // if (adapter.getName().equals(source)) + // return adapter; + // } + // System.err.println("Error: Method adapter not found " + source); + // return null; + //} // TransactionsAPI functions private void acquire() { diff --git a/server/src/main/java/org/diskproject/server/repository/WingsAdapter.java b/server/src/main/java/org/diskproject/server/repository/WingsAdapter.java index 9116fd8..d4d7871 100644 --- a/server/src/main/java/org/diskproject/server/repository/WingsAdapter.java +++ b/server/src/main/java/org/diskproject/server/repository/WingsAdapter.java @@ -1441,4 +1441,27 @@ public WorkflowRun getRunStatus(String runId) { public FileAndMeta fetchData(String dataId) { return this.fetchDataFromWings(dataId); } + + + + //data_id=http://datascience4all.org/wings-portal-new/export/users/dkhider/ClimateDisk/data/library.owl#Output-9xo020zkzzlut84luu7bp1z16 + //newname=Output-9xo020zkzzlut84luu7bp1z16 + //metadata_json={ + // "type":["http://datascience4all.org/wings-portal-new/export/users/dkhider/ClimateDisk/data/ontology.owl#TimeSeries"] + //}' + + @Override + public boolean registerData (String id, String type) { + String fullId = getDataUri(id); + String registerFileUrl = "users/" + this.getUsername() + "/" + this.domain +"/data/registerData"; + List formdata = new ArrayList(); + formdata.add(new BasicNameValuePair("data_id", fullId)); + formdata.add(new BasicNameValuePair("newname", id)); + if (type != null) { + String jsonFile = "{\"type\":[\"" + type + "\"]}"; + formdata.add(new BasicNameValuePair("metadata_json", jsonFile)); + } + String resp = this.post(registerFileUrl, formdata); + return resp.equals("OK"); + } } \ No newline at end of file diff --git a/server/src/main/java/org/diskproject/server/repository/WriteKBRepository.java b/server/src/main/java/org/diskproject/server/repository/WriteKBRepository.java index 145bb1d..2c97bf1 100644 --- a/server/src/main/java/org/diskproject/server/repository/WriteKBRepository.java +++ b/server/src/main/java/org/diskproject/server/repository/WriteKBRepository.java @@ -907,7 +907,7 @@ private void writeBindings(String userDomain, String id, KBObject bindingprop, for (WorkflowBindings bindings : bindingsList) { String source = bindings.getSource(); String description = bindings.getDescription(); - MethodAdapter methodAdapter = this.getMethodAdapterByName(source); + MethodAdapter methodAdapter = this.methodAdapters.getMethodAdapterByName(source); if (methodAdapter == null) { System.out.println("Method adapter not found " + source); continue; @@ -1075,7 +1075,7 @@ private List loadBindings(String userDomain, String id, KBObje if (sourceObj != null) { String source = sourceObj.getValueAsString(); bindings.setSource(source); - methodAdapter = this.getMethodAdapterByName(source); + methodAdapter = this.methodAdapters.getMethodAdapterByName(source); } String description = kb.getComment(wbObj); diff --git a/server/src/main/java/org/diskproject/server/threads/DataThread.java b/server/src/main/java/org/diskproject/server/threads/DataThread.java new file mode 100644 index 0000000..9b77439 --- /dev/null +++ b/server/src/main/java/org/diskproject/server/threads/DataThread.java @@ -0,0 +1,11 @@ +package org.diskproject.server.threads; + +public class DataThread implements Runnable { + + @Override + public void run() { + // TODO Auto-generated method stub + throw new UnsupportedOperationException("Unimplemented method 'run'"); + } + +} diff --git a/server/src/main/java/org/diskproject/server/threads/ExecutionThread.java b/server/src/main/java/org/diskproject/server/threads/ExecutionThread.java new file mode 100644 index 0000000..e6b5fcb --- /dev/null +++ b/server/src/main/java/org/diskproject/server/threads/ExecutionThread.java @@ -0,0 +1,97 @@ +package org.diskproject.server.threads; + +import java.util.List; +import java.util.Map; + +import org.diskproject.shared.classes.adapters.MethodAdapter; +import org.diskproject.shared.classes.loi.LineOfInquiry; +import org.diskproject.shared.classes.loi.TriggeredLOI; +import org.diskproject.shared.classes.loi.WorkflowBindings; +import org.diskproject.shared.classes.workflow.VariableBinding; +import org.diskproject.shared.classes.workflow.WorkflowRun; +import org.diskproject.shared.classes.workflow.WorkflowRun.Status; +import org.diskproject.shared.classes.workflow.WorkflowVariable; + +public class ExecutionThread implements Runnable { + ThreadManager manager; + boolean metamode; + TriggeredLOI tloi; + LineOfInquiry loi; + + public ExecutionThread (ThreadManager manager, TriggeredLOI tloi, LineOfInquiry loi, boolean metamode) { + this.manager = manager; + this.tloi = tloi; + this.loi = loi; + this.metamode = metamode; + } + + @Override + public void run() { + if (this.metamode) + System.out.println("[R] Running execution thread on META mode"); + else + System.out.println("[R] Running execution thread"); + + List workflowList = this.metamode ? tloi.getMetaWorkflows() : tloi.getWorkflows(); + Status currentStatus = Status.RUNNING; + + // Start workflows from tloi + for (WorkflowBindings curWorkflow : workflowList) { + MethodAdapter methodAdapter = this.manager.getMethodAdapters().getMethodAdapterByName(curWorkflow.getSource()); + if (methodAdapter == null) { + currentStatus = Status.FAILED; + break; + } + if (this.metamode) { + // TODO: Here we should map workflow outputs to metaworkflow inputs. + } + + // Get workflow input details + Map inputVariables = methodAdapter.getWorkflowInputs(curWorkflow.getWorkflow()); + List inputBindings = curWorkflow.getBindings(); + this.printWorkflowRun(curWorkflow, inputBindings); + List runIds = methodAdapter.runWorkflow(curWorkflow.getWorkflow(), inputBindings, inputVariables); + + if (runIds != null) { + System.out.println("[R] " + runIds.size() + " Workflows send: "); + for (String rid : runIds) { + WorkflowRun run = new WorkflowRun(); + run.setId(rid); + run.setAsPending(); + System.out.println("[R] ID: " + rid); + curWorkflow.addRun(run); + } + } else { + currentStatus = Status.FAILED; + System.out.println("[R] Error: Could not run workflow"); + } + } + tloi.setStatus(currentStatus); + manager.updateTLOI(tloi); + + // Start monitoring + if (currentStatus == Status.RUNNING) { + manager.monitorTLOI(tloi, loi, metamode); + } else { + System.out.println("[E] Finished: Something when wrong."); + } + } + + public void printWorkflowRun(WorkflowBindings wf, List inputBindings) { + // Execute workflow + System.out.println("[R] Executing " + wf.getWorkflow() + " with " + inputBindings.size() + " parameters:"); + for (VariableBinding v : inputBindings) { + String[] l = v.isCollection() ? v.getBindingAsArray() : null; + int i = 0; + if (l != null) { + System.out.println("[R] - " + v.getVariable() + ": "); + for (String b : l) { + System.out.println("[R] " + String.valueOf(i) + ") " + b); + i++; + } + } else { + System.out.println("[R] - " + v.getVariable() + ": " + v.getBinding()); + } + } + } +} diff --git a/server/src/main/java/org/diskproject/server/threads/MonitorThread.java b/server/src/main/java/org/diskproject/server/threads/MonitorThread.java new file mode 100644 index 0000000..09ffaed --- /dev/null +++ b/server/src/main/java/org/diskproject/server/threads/MonitorThread.java @@ -0,0 +1,147 @@ +package org.diskproject.server.threads; + +import java.util.Map; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.concurrent.TimeUnit; + +import org.diskproject.shared.classes.adapters.MethodAdapter; +import org.diskproject.shared.classes.loi.TriggeredLOI; +import org.diskproject.shared.classes.loi.WorkflowBindings; +import org.diskproject.shared.classes.workflow.WorkflowRun; +import org.diskproject.shared.classes.workflow.WorkflowRun.RuntimeInfo; +import org.diskproject.shared.classes.workflow.WorkflowRun.Status; + +public class MonitorThread implements Runnable { + ThreadManager manager; + boolean metamode; + TriggeredLOI tloi; + List runList; // runList = [runId1, runId2, ...] + Map runInfo; // runInfo[runId1] = WorkflowRun + Map runToWf; // runToWf[runId1] = WorkflowBindings + Map runToIndex; // runToIndex[runId1] = 0,1... + + public MonitorThread (ThreadManager manager, TriggeredLOI tloi, boolean metamode) { + this.tloi = tloi; + this.manager = manager; + this.metamode = metamode; + + this.runInfo = new HashMap(); + this.runToWf = new HashMap(); + this.runToIndex = new HashMap(); + this.runList = new ArrayList(); + + Integer index = 0; + for (WorkflowBindings curWorkflow : (metamode ? this.tloi.getMetaWorkflows() : this.tloi.getWorkflows())) { + for (WorkflowRun run : curWorkflow.getRuns().values()) { + RuntimeInfo exec = run.getExecutionInfo(); + Status st = exec.status; + String runId = run.getId(); + if (st == Status.PENDING || st == Status.QUEUED || st == Status.RUNNING) { + runList.add(runId); + runInfo.put(runId, run); + runToWf.put(runId, curWorkflow); + runToIndex.put(runId, index); + } + } + index += 1; + } + } + + private boolean needsMonitoring (RuntimeInfo run) { + return !(run.status == Status.SUCCESSFUL || run.status == Status.FAILED); + } + + private WorkflowRun getNextPendingRun () { + for (String runId: this.runList) { + WorkflowRun run = this.runInfo.get(runId); + if (needsMonitoring(run.getExecutionInfo()) && run.getId() != null) + return run; + } + return null; + } + + private Status getOverallStatus () { + for (String runId: this.runList) { + WorkflowRun run = this.runInfo.get(runId); + Status status = run.getExecutionInfo().status; + if (status == Status.FAILED) + return Status.FAILED; + if (status != Status.SUCCESSFUL) //If all of them are pending then the tloi should be pending too. + return Status.RUNNING; + } + return Status.SUCCESSFUL; + } + + private void updateRun (WorkflowBindings wf, WorkflowRun run) { + this.runInfo.replace(run.getId(), run); // Updates status on the run list + wf.addRun(run); // This replaces old run. + Integer index = this.runToIndex.get(run.getId()); + List list = this.metamode ? this.tloi.getMetaWorkflows() : this.tloi.getWorkflows(); + list.set(index, wf); // Replaces run in the wf list + if (this.metamode) this.tloi.setMetaWorkflows(list); + else this.tloi.setWorkflows(list); + + // Process outputs + if (run.getExecutionInfo().status == Status.SUCCESSFUL) { + this.manager.processFinishedRun(this.tloi, wf, run, metamode); + } + } + + @Override + public void run() { + System.out.println("[M] Running monitoring thread"); + WorkflowRun pendingRun = this.getNextPendingRun(); + if (pendingRun == null || pendingRun.getId() == null) { + System.out.println("[M] No more pending runs."); + return; + } + WorkflowBindings wf = runToWf.get(pendingRun.getId()); + MethodAdapter methodAdapter = this.manager.getMethodAdapters().getMethodAdapterByName(wf.getSource()); + if (methodAdapter == null) { + System.out.println("[M] Error: Method adapter not found: " + wf.getSource()); + return; + } + + String runId = pendingRun.getId().replaceAll("^.*#", ""); + WorkflowRun updatedRun = methodAdapter.getRunStatus(runId); + + RuntimeInfo oldExec = pendingRun.getExecutionInfo(); + RuntimeInfo newExec = updatedRun != null ? updatedRun.getExecutionInfo() : null; + // If we cannot get the status but the run was pending, it means that the run is in the WINGS queue. + if (newExec == null || newExec.status == null) { + System.out.println("[E] Cannot get status for " + tloi.getId() + " - RUN " + runId); + if (oldExec.status == Status.PENDING) { // In queue + updatedRun = pendingRun; + } else { + System.out.println("[E] This should not happen"); + //??? + return; + } + } + updateRun(wf, updatedRun); + + Status status = getOverallStatus(); + this.tloi.setStatus(status); + manager.updateTLOI(tloi); + + if (status == Status.SUCCESSFUL) { + if (metamode) { + System.out.println("[M] " + this.tloi.getId() + " was successfully executed."); + } else { + System.out.println("[M] Starting metamode after " + this.runList.size() + " runs."); + this.manager.executeTLOI(tloi, true); + } + } else if (status == Status.FAILED) { + if (metamode) { + System.out.println("[M] " + this.tloi.getId() + " was executed with errors."); + } else { + System.out.println("[M] " + this.tloi.getId() + " will not run metamode. Some runs failed."); + } + } else { + System.out.println("[M] " + this.tloi.getId() + " still pending."); + manager.schedule(this, 10, TimeUnit.SECONDS); + } + } +} \ No newline at end of file diff --git a/server/src/main/java/org/diskproject/server/threads/ThreadManager.java b/server/src/main/java/org/diskproject/server/threads/ThreadManager.java new file mode 100644 index 0000000..81429ba --- /dev/null +++ b/server/src/main/java/org/diskproject/server/threads/ThreadManager.java @@ -0,0 +1,147 @@ +package org.diskproject.server.threads; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +import org.diskproject.server.adapters.MethodAdapterManager; +import org.diskproject.server.repository.DiskRepository; +import org.diskproject.shared.classes.adapters.MethodAdapter; +import org.diskproject.shared.classes.loi.LineOfInquiry; +import org.diskproject.shared.classes.loi.TriggeredLOI; +import org.diskproject.shared.classes.loi.WorkflowBindings; +import org.diskproject.shared.classes.workflow.VariableBinding; +import org.diskproject.shared.classes.workflow.WorkflowRun; +import org.diskproject.shared.classes.workflow.WorkflowVariable; +import org.diskproject.shared.classes.workflow.WorkflowRun.RunBinding; +import org.diskproject.shared.classes.workflow.WorkflowRun.Status; + +public class ThreadManager { + private static String USERNAME = "admin"; + protected MethodAdapterManager methodAdapters; + private DiskRepository disk; + protected ScheduledExecutorService monitor; + protected ExecutorService executor; + + public ThreadManager (MethodAdapterManager methodAdapters, DiskRepository disk) { + this.disk = disk; + this.methodAdapters = methodAdapters; + this.executor = Executors.newFixedThreadPool(2); + this.monitor = Executors.newScheduledThreadPool(0); + } + + public void shutdownExecutors () { + if (monitor != null) monitor.shutdownNow(); + if (executor != null) executor.shutdownNow(); + } + + public MethodAdapterManager getMethodAdapters () { + return methodAdapters; + } + + public void schedule (MonitorThread thread, int time, TimeUnit unit) { + this.monitor.schedule(thread, time, unit); + } + + public void executeTLOI (TriggeredLOI tloi) { + this.executeTLOI(tloi, false); + } + + public void executeTLOI (TriggeredLOI tloi, Boolean meta) { + LineOfInquiry loi = disk.getLOI(USERNAME, tloi.getParentLoiId()); + if (meta) { + tloi.setStatus(Status.RUNNING); + this.addMetaBindings(tloi); + this.updateTLOI(tloi); + } + ExecutionThread workflowThread = new ExecutionThread(this, tloi, loi, meta); + executor.execute(workflowThread); + } + + public void monitorTLOI (TriggeredLOI tloi, LineOfInquiry loi, Boolean metamode) { + MonitorThread monitorThread = new MonitorThread(this, tloi, metamode); + monitor.schedule(monitorThread, 15, TimeUnit.SECONDS); + } + + public void updateTLOI (TriggeredLOI tloi) { + disk.updateTriggeredLOI(USERNAME, tloi.getId(), tloi); + } + + public void processFinishedRun (TriggeredLOI tloi, WorkflowBindings wf, WorkflowRun run, boolean meta) { + LineOfInquiry loi = disk.getLOI(USERNAME, tloi.getParentLoiId()); + MethodAdapter methodAdapter = this.getMethodAdapters().getMethodAdapterByName(wf.getSource()); + disk.processWorkflowOutputs(tloi, loi, wf, run, methodAdapter, meta); + } + + private boolean addMetaBindings (TriggeredLOI tloi) { + List dates = new ArrayList(); + Map> files = new HashMap>(); + System.out.println("Adding data to metaworkflow"); + boolean allOk = true; + //Get all + for (TriggeredLOI cur: disk.listTriggeredLOIs(USERNAME)) { + if (cur.getParentHypothesisId().equals(tloi.getParentHypothesisId()) && + cur.getParentLoiId().equals(tloi.getParentLoiId())) { + //TLOIs that match both, LOI & Hypothesis + for (WorkflowBindings wf: cur.getWorkflows()) { + //MethodAdapter adapter = this.methodAdapters.getMethodAdapterByName(wf.getSource()); + //Map outputVariables = adapter.getWorkflowOutputs(wf.getWorkflow()); + //List outputVariables = adapter.getWorkflowVariables(wf.getWorkflow()); + //System.out.println(outputVariables); + //FIXME: continue here. + for (WorkflowRun run: wf.getRuns().values()) { + for (String outputName: run.getOutputs().keySet()) { + RunBinding out = run.getOutputs().get(outputName); + if (!files.containsKey(outputName)) { + files.put(outputName, new ArrayList()); + } + List list = files.get(outputName); + list.add(out.id.replaceAll("^.*#", "")); + } + dates.add(String.valueOf(run.getExecutionInfo().endTime)); + } + } + } + } + + for (WorkflowBindings wf: tloi.getMetaWorkflows()) { + MethodAdapter adapter = this.methodAdapters.getMethodAdapterByName(wf.getSource()); + List vars = adapter.getWorkflowVariables(wf.getWorkflow()); + for (VariableBinding vb: wf.getBindings()) { + String binding = vb.getBinding(); + if (binding.equals("_RUN_DATE_")) { + vb.setBinding("[" + String.join(",", dates) + "]"); + } else { + if (binding.startsWith("[") && binding.endsWith("]")) { + binding = binding.substring(1, binding.length() -1); + } + for (String outName: files.keySet()) { + if (binding.equals("!" + outName)) { + vb.setBinding("[" + String.join(",", files.get(outName)) + "]"); + String type = null; + for (WorkflowVariable wv: vars) { + if (vb.getVariable().equals(wv.getName()) && wv.getType().size() > 0) { + type = wv.getType().get(0); + } + } + + System.out.println("type: " + type); + // Upload files: + for (String dataid: files.get(outName)) { + if (!adapter.registerData(dataid, type)); + allOk = false; + } + } + } + } + } + } + + return allOk; + } +} \ No newline at end of file diff --git a/shared/src/main/java/org/diskproject/shared/classes/adapters/MethodAdapter.java b/shared/src/main/java/org/diskproject/shared/classes/adapters/MethodAdapter.java index 0761c0b..abefd75 100644 --- a/shared/src/main/java/org/diskproject/shared/classes/adapters/MethodAdapter.java +++ b/shared/src/main/java/org/diskproject/shared/classes/adapters/MethodAdapter.java @@ -100,6 +100,8 @@ public String toString () { public abstract Map getRunVariableBindings (String runId); + public abstract boolean registerData (String dataid, String type); + // Test connection with source public abstract boolean ping (); } \ No newline at end of file