Skip to content

Commit

Permalink
Bugfix for Queue
Browse files Browse the repository at this point in the history
  • Loading branch information
IKCAP committed Dec 13, 2022
1 parent 52310ef commit 181aecc
Show file tree
Hide file tree
Showing 5 changed files with 57 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,10 @@

public class LocalExecutionEngine implements PlanExecutionEngine, StepExecutionEngine {

protected final ExecutorService executor;
protected ExecutorService executor; // FIXME: Should be Static to work properly - but causes problem with shutdown

protected Properties props;
protected int maxParallel = 4;
protected int maxParallel = 10;

protected StepExecutionEngine stepEngine;
protected PlanExecutionEngine planEngine;
Expand All @@ -66,10 +66,10 @@ public LocalExecutionEngine(Properties props) {
this.maxParallel = Integer.parseInt(props.getProperty("parallel"));
this.stepEngine = this;
this.planEngine = this;
executor = Executors.newFixedThreadPool(maxParallel);
if (executor == null)
executor = Executors.newFixedThreadPool(maxParallel);
}


@Override
public void setExecutionLogger(ExecutionLoggerAPI logger) {
this.logger = logger;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ public class RuntimePlan extends URIEntity implements Cloneable {
String callbackUrl;
Cookie[] callbackCookies;

Runnable callbackThread;

boolean replanned = false;

public RuntimePlan() {
Expand All @@ -72,7 +74,7 @@ public RuntimePlan(ExecutionPlan plan) {
this.queue = new ExecutionQueue(plan);
this.runtimeInfo = new RuntimeInfo();
}

public ExecutionQueue getQueue() {
return queue;
}
Expand Down Expand Up @@ -113,6 +115,13 @@ public void onEnd(ExecutionLoggerAPI logger, RuntimeInfo.Status status, String l
}

private void postCallback() {
if(this.callbackThread != null) {
synchronized(this.callbackThread) {
//System.out.println("Notifying execution thread: " + this.callbackThread);
this.callbackThread.notify();
}
}

if(this.callbackUrl != null && this.runtimeInfo.status == Status.SUCCESS) {
try {
BasicCookieStore cookieStore = new BasicCookieStore();
Expand Down Expand Up @@ -211,6 +220,14 @@ public void setReplanned(boolean replanned) {
this.replanned = replanned;
}

public Runnable getCallbackThread() {
return callbackThread;
}

public void setCallbackThread(Runnable callbackThread) {
this.callbackThread = callbackThread;
}

public Object clone() throws CloneNotSupportedException{
return super.clone();
}
Expand Down
2 changes: 1 addition & 1 deletion portal/.settings/org.eclipse.wst.common.component
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@


<wb-resource deploy-path="/WEB-INF/classes" source-path="/src/test/java"/>
<dependent-module archiveName="wings-planner-5.2.5.jar" deploy-path="/WEB-INF/lib" handle="module:/resource/wings-planner/wings-planner">
<dependent-module archiveName="wings-planner-5.3.0.jar" deploy-path="/WEB-INF/lib" handle="module:/resource/wings-planner/wings-planner">
<dependency-type>uses</dependency-type>
</dependent-module>
<dependent-module archiveName="ontapi-1.3.0.jar" deploy-path="/WEB-INF/lib" handle="module:/resource/ontapi/ontapi">
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
import java.util.ArrayList;
import java.util.concurrent.ExecutorService;

import javax.servlet.ServletContext;

import edu.isi.wings.catalog.data.classes.VariableBindingsList;
import edu.isi.wings.catalog.data.classes.VariableBindingsListSet;
import edu.isi.wings.common.CollectionsHelper;
Expand All @@ -21,19 +23,38 @@
class ExecutionThread implements Runnable {
RuntimePlan rplan;
Config config;
ServletContext context;

public ExecutionThread(RuntimePlan rplan, Config config) {
public ExecutionThread(RuntimePlan rplan, Config config, ServletContext context) {
this.rplan = rplan;
this.config = config;
this.context = context;
}

@Override
public void run() {
PlanExecutionEngine engine = config.getDomainExecutionEngine();
engine.execute(rplan);

// Save the engine for an abort if needed
//this.context.setAttribute("plan_" + rplan.getID(), rplan);
//this.context.setAttribute("engine_" + rplan.getID(), engine);
this.context.setAttribute("plan_" + rplan.getID(), rplan);
this.context.setAttribute("engine_" + rplan.getID(), engine);

// Set callback thread to be this thread
synchronized(this) {
rplan.setCallbackThread(this);

// This is an asynchronous call.. So we make it synchronous by waiting until the execution is complete
engine.execute(rplan);

// Wait until notified of completion
try {
//System.out.println("Waiting for thread: " + this);
this.wait();
//System.out.println("Execution finished");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}

Expand All @@ -46,6 +67,7 @@ public class PlanningAndExecutingThread implements Runnable {
PlanningAPIBindings api_bindings;
ExecutorService executor;
int max_number_of_executions;
ServletContext context;

public PlanningAndExecutingThread(
String ex_prefix,
Expand All @@ -54,14 +76,16 @@ public PlanningAndExecutingThread(
int max_number_of_executions,
TemplateBindings template_bindings,
PlanningAPIBindings api_bindings,
ExecutorService executor) {
ExecutorService executor,
ServletContext context) {
this.config = config;
this.template_bindings = template_bindings;
this.api_bindings = api_bindings;
this.max_number_of_executions = max_number_of_executions;
this.executor = executor;
this.ex_prefix = ex_prefix;
this.template_id = template_id;
this.context = context;
}

private void addTemplateBindings(Template tpl, TemplateBindings tb) {
Expand Down Expand Up @@ -166,7 +190,8 @@ private ArrayList<Template> getExpandedTemplates(Template seedtpl) {
}

private void runExecutionPlan(RuntimePlan rplan) {
ExecutionThread exthread = new ExecutionThread(rplan, config);
ExecutionThread exthread = new ExecutionThread(rplan, this.config, this.context);
// This is an asynchronous call
executor.submit(exthread);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,8 @@ public RunController(Config config) {
this.templateUrl = config.getUserDomainUrl() + "/workflows";

if (executor == null) {
executor = Executors.newWorkStealingPool(config.getPlannerConfig().getParallelism());
//System.out.println("Parallel:" + config.getPlannerConfig().getParallelism());
executor = Executors.newFixedThreadPool(config.getPlannerConfig().getParallelism());
}
}

Expand Down Expand Up @@ -355,7 +356,7 @@ public String expandAndRunTemplate(TemplateBindings template_bindings, ServletCo

// Submit the planning and execution thread
executor.submit(new PlanningAndExecutingThread(ex_prefix, template_id,
this.config, config.getPlannerConfig().getMaxQueueSize(), template_bindings, apis, executor));
this.config, config.getPlannerConfig().getMaxQueueSize(), template_bindings, apis, executor, context));

// Return the runid
return runid;
Expand Down

0 comments on commit 181aecc

Please sign in to comment.