diff --git a/planner/src/main/java/edu/isi/wings/execution/engine/api/impl/local/LocalExecutionEngine.java b/planner/src/main/java/edu/isi/wings/execution/engine/api/impl/local/LocalExecutionEngine.java
index bab99415..39aa39d4 100644
--- a/planner/src/main/java/edu/isi/wings/execution/engine/api/impl/local/LocalExecutionEngine.java
+++ b/planner/src/main/java/edu/isi/wings/execution/engine/api/impl/local/LocalExecutionEngine.java
@@ -48,10 +48,10 @@
public class LocalExecutionEngine implements PlanExecutionEngine, StepExecutionEngine {
- protected final ExecutorService executor;
+ protected ExecutorService executor; // FIXME: Should be Static to work properly - but causes problem with shutdown
protected Properties props;
- protected int maxParallel = 4;
+ protected int maxParallel = 10;
protected StepExecutionEngine stepEngine;
protected PlanExecutionEngine planEngine;
@@ -66,10 +66,10 @@ public LocalExecutionEngine(Properties props) {
this.maxParallel = Integer.parseInt(props.getProperty("parallel"));
this.stepEngine = this;
this.planEngine = this;
- executor = Executors.newFixedThreadPool(maxParallel);
+ if (executor == null)
+ executor = Executors.newFixedThreadPool(maxParallel);
}
-
@Override
public void setExecutionLogger(ExecutionLoggerAPI logger) {
this.logger = logger;
diff --git a/planner/src/main/java/edu/isi/wings/execution/engine/classes/RuntimePlan.java b/planner/src/main/java/edu/isi/wings/execution/engine/classes/RuntimePlan.java
index 43ba1ed3..16ae41f1 100644
--- a/planner/src/main/java/edu/isi/wings/execution/engine/classes/RuntimePlan.java
+++ b/planner/src/main/java/edu/isi/wings/execution/engine/classes/RuntimePlan.java
@@ -55,6 +55,8 @@ public class RuntimePlan extends URIEntity implements Cloneable {
String callbackUrl;
Cookie[] callbackCookies;
+ Runnable callbackThread;
+
boolean replanned = false;
public RuntimePlan() {
@@ -72,7 +74,7 @@ public RuntimePlan(ExecutionPlan plan) {
this.queue = new ExecutionQueue(plan);
this.runtimeInfo = new RuntimeInfo();
}
-
+
public ExecutionQueue getQueue() {
return queue;
}
@@ -113,6 +115,13 @@ public void onEnd(ExecutionLoggerAPI logger, RuntimeInfo.Status status, String l
}
private void postCallback() {
+ if(this.callbackThread != null) {
+ synchronized(this.callbackThread) {
+ //System.out.println("Notifying execution thread: " + this.callbackThread);
+ this.callbackThread.notify();
+ }
+ }
+
if(this.callbackUrl != null && this.runtimeInfo.status == Status.SUCCESS) {
try {
BasicCookieStore cookieStore = new BasicCookieStore();
@@ -211,6 +220,14 @@ public void setReplanned(boolean replanned) {
this.replanned = replanned;
}
+ public Runnable getCallbackThread() {
+ return callbackThread;
+ }
+
+ public void setCallbackThread(Runnable callbackThread) {
+ this.callbackThread = callbackThread;
+ }
+
public Object clone() throws CloneNotSupportedException{
return super.clone();
}
diff --git a/portal/.settings/org.eclipse.wst.common.component b/portal/.settings/org.eclipse.wst.common.component
index 3ec90d1c..f0b3d6b5 100644
--- a/portal/.settings/org.eclipse.wst.common.component
+++ b/portal/.settings/org.eclipse.wst.common.component
@@ -72,7 +72,7 @@
-
+
uses
diff --git a/portal/src/main/java/edu/isi/wings/portal/classes/util/PlanningAndExecutingThread.java b/portal/src/main/java/edu/isi/wings/portal/classes/util/PlanningAndExecutingThread.java
index c9151e69..92dbaa9f 100644
--- a/portal/src/main/java/edu/isi/wings/portal/classes/util/PlanningAndExecutingThread.java
+++ b/portal/src/main/java/edu/isi/wings/portal/classes/util/PlanningAndExecutingThread.java
@@ -3,6 +3,8 @@
import java.util.ArrayList;
import java.util.concurrent.ExecutorService;
+import javax.servlet.ServletContext;
+
import edu.isi.wings.catalog.data.classes.VariableBindingsList;
import edu.isi.wings.catalog.data.classes.VariableBindingsListSet;
import edu.isi.wings.common.CollectionsHelper;
@@ -21,19 +23,38 @@
class ExecutionThread implements Runnable {
RuntimePlan rplan;
Config config;
+ ServletContext context;
- public ExecutionThread(RuntimePlan rplan, Config config) {
+ public ExecutionThread(RuntimePlan rplan, Config config, ServletContext context) {
this.rplan = rplan;
this.config = config;
+ this.context = context;
}
@Override
public void run() {
PlanExecutionEngine engine = config.getDomainExecutionEngine();
- engine.execute(rplan);
+
// Save the engine for an abort if needed
- //this.context.setAttribute("plan_" + rplan.getID(), rplan);
- //this.context.setAttribute("engine_" + rplan.getID(), engine);
+ this.context.setAttribute("plan_" + rplan.getID(), rplan);
+ this.context.setAttribute("engine_" + rplan.getID(), engine);
+
+ // Set callback thread to be this thread
+ synchronized(this) {
+ rplan.setCallbackThread(this);
+
+ // This is an asynchronous call.. So we make it synchronous by waiting until the execution is complete
+ engine.execute(rplan);
+
+ // Wait until notified of completion
+ try {
+ //System.out.println("Waiting for thread: " + this);
+ this.wait();
+ //System.out.println("Execution finished");
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
}
}
@@ -46,6 +67,7 @@ public class PlanningAndExecutingThread implements Runnable {
PlanningAPIBindings api_bindings;
ExecutorService executor;
int max_number_of_executions;
+ ServletContext context;
public PlanningAndExecutingThread(
String ex_prefix,
@@ -54,7 +76,8 @@ public PlanningAndExecutingThread(
int max_number_of_executions,
TemplateBindings template_bindings,
PlanningAPIBindings api_bindings,
- ExecutorService executor) {
+ ExecutorService executor,
+ ServletContext context) {
this.config = config;
this.template_bindings = template_bindings;
this.api_bindings = api_bindings;
@@ -62,6 +85,7 @@ public PlanningAndExecutingThread(
this.executor = executor;
this.ex_prefix = ex_prefix;
this.template_id = template_id;
+ this.context = context;
}
private void addTemplateBindings(Template tpl, TemplateBindings tb) {
@@ -166,7 +190,8 @@ private ArrayList getExpandedTemplates(Template seedtpl) {
}
private void runExecutionPlan(RuntimePlan rplan) {
- ExecutionThread exthread = new ExecutionThread(rplan, config);
+ ExecutionThread exthread = new ExecutionThread(rplan, this.config, this.context);
+ // This is an asynchronous call
executor.submit(exthread);
}
diff --git a/portal/src/main/java/edu/isi/wings/portal/controllers/RunController.java b/portal/src/main/java/edu/isi/wings/portal/controllers/RunController.java
index fc19280a..1f114d08 100644
--- a/portal/src/main/java/edu/isi/wings/portal/controllers/RunController.java
+++ b/portal/src/main/java/edu/isi/wings/portal/controllers/RunController.java
@@ -99,7 +99,8 @@ public RunController(Config config) {
this.templateUrl = config.getUserDomainUrl() + "/workflows";
if (executor == null) {
- executor = Executors.newWorkStealingPool(config.getPlannerConfig().getParallelism());
+ //System.out.println("Parallel:" + config.getPlannerConfig().getParallelism());
+ executor = Executors.newFixedThreadPool(config.getPlannerConfig().getParallelism());
}
}
@@ -355,7 +356,7 @@ public String expandAndRunTemplate(TemplateBindings template_bindings, ServletCo
// Submit the planning and execution thread
executor.submit(new PlanningAndExecutingThread(ex_prefix, template_id,
- this.config, config.getPlannerConfig().getMaxQueueSize(), template_bindings, apis, executor));
+ this.config, config.getPlannerConfig().getMaxQueueSize(), template_bindings, apis, executor, context));
// Return the runid
return runid;