Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DRAFT add queue and thread for processing mRunnables asynchronously #618

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
199 changes: 116 additions & 83 deletions src/org/minima/system/mds/MDSManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,10 @@
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.Hashtable;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

import javax.net.ssl.SSLSocket;

Expand Down Expand Up @@ -106,7 +109,20 @@ public class MDSManager extends MessageProcessor {
/**
* All the current Contexts
*/
ArrayList<MDSJS> mRunnables = new ArrayList();
HashMap<String, MDSJS> mRunnables = new HashMap();

/**
* Queue of pending messages to be delivered to mRunnables and deferred initialisations thereof
*/
BlockingQueue<Runnable> mRunnableQueue = new ArrayBlockingQueue(1000);

Thread mRunnablesThread = new Thread(() -> {
while(true) try {
mRunnableQueue.take().run();
} catch (InterruptedException e) {
MinimaLogger.log("MDS runnable queue processing interrupted: " + e.getMessage());
}
});

/**
* All the Pending Commands
Expand Down Expand Up @@ -147,6 +163,8 @@ public MDSManager() {
}else {
MinimaLogger.log("MDS enabled");
}

mRunnablesThread.start();

PostMessage(MDS_INIT);
}
Expand Down Expand Up @@ -572,13 +590,19 @@ public Runnable getSocketHandler(SSLSocket zSocket) {

//Shutdown the Runnables
MinimaLogger.log("Shutdown MDS runnables..");
for(MDSJS mds : mRunnables) {
try {
mds.shutdown();
}catch(Exception exc) {
MinimaLogger.log(exc);

mRunnableQueue.clear();
mRunnablesThread.interrupt();

mRunnableQueue.add(() -> {
for (MDSJS mds : mRunnables.values()) {
try {
if (mds != null) mds.shutdown();
} catch (Exception exc) {
MinimaLogger.log(exc);
}
}
}
});

//Shut down the servers
MinimaLogger.log("Shutdown MDS File and Command servers..");
Expand Down Expand Up @@ -676,28 +700,32 @@ public Runnable getSocketHandler(SSLSocket zSocket) {
}
}
}

//Send message to the runnables first..
if(sendtoall) {
for(MDSJS mds : mRunnables) {
try {

if(to.equals("*")) {
//Send to the runnable
mds.callMainCallback(poll);
}else {

//Check the MiniDAPPID
if(mds.getMiniDAPPID().equals(to)) {
if (sendtoall) {
mRunnableQueue.add(() -> {
for (String mdsUuid : mRunnables.keySet()) {
try {

if (to.equals("*")) {
//Send to the runnable
mds.callMainCallback(poll);
MDSJS mds = mRunnables.get(mdsUuid);
if (mds != null) mds.callMainCallback(poll);
} else {

//Check the MiniDAPPID
if (mdsUuid.equals(to)) {
//Send to the runnable
MDSJS mds = mRunnables.get(mdsUuid);
if (mds != null) mds.callMainCallback(poll);
}
}

} catch (Exception exc) {
MinimaLogger.log(exc, false);
}

}catch(Exception exc) {
MinimaLogger.log(exc, false);
}
}
});
}

//Add then to the Poll Stack - web minidapps
Expand All @@ -720,16 +748,21 @@ public Runnable getSocketHandler(SSLSocket zSocket) {
PostMiniDAPPChange();

}else if(zMessage.getMessageType().equals(MDS_MINIDAPPS_RESETALL)) {

//Shut down all the Context Objkects..
for(MDSJS mds : mRunnables) {
mds.shutdown();
}

mRunnableQueue.clear();
mRunnablesThread.interrupt();

mRunnableQueue.add(() -> {
//Shut down all the Context Objkects..
for (MDSJS mds : mRunnables.values()) {
if (mds != null) mds.shutdown();
}
});

//Now clear
mRunnables.clear();
mSessionID.clear();

//Scan through and see what we have..
ArrayList<MiniDAPP> dapps = MinimaDB.getDB().getMDSDB().getAllMiniDAPPs();
for(MiniDAPP dapp : dapps) {
Expand Down Expand Up @@ -758,19 +791,12 @@ public Runnable getSocketHandler(SSLSocket zSocket) {

//Remove a MiniDAPP
String uid = zMessage.getString("uid");

//First remove the Runnable
ArrayList<MDSJS> runnables = new ArrayList();
for(MDSJS mds : mRunnables) {
if(mds.getMiniDAPPID().equals(uid)) {
mds.shutdown();
}else {
runnables.add(mds);
}
}

//And switch the list over..
mRunnables = runnables;

mRunnableQueue.add(() -> {
MDSJS mds = mRunnables.get(uid);
if (mds != null) mds.shutdown();
});
mRunnables.remove(uid);

//And now remove the sessionid
mSessionID.remove(convertMiniDAPPID(uid));
Expand Down Expand Up @@ -800,50 +826,57 @@ private void setupMiniDAPP(MiniDAPP zDAPP) {
//Load the file..
byte[] serv = MiniFile.readCompleteFile(service);
String code = new String(serv,MiniString.MINIMA_CHARSET);


//Load it into the service runner..
Context ctx = Context.enter();
ctx.setOptimizationLevel(-1);
ctx.setLanguageVersion(Context.VERSION_1_8);

//Stop JAVA classes from being run..
try {
ctx.setClassShutter(new ClassShutter() {
public boolean visibleToScripts(String className) {

//ONLY MDSJS can be called form JS
if(className.startsWith("org.minima.system.mds.runnable")) {
return true;
//Add the main code to the Runnable
mRunnableQueue.add(
() -> {
//Load it into the service runner..
Context ctx = Context.enter();
ctx.setOptimizationLevel(-1);
ctx.setLanguageVersion(Context.VERSION_1_8);

//Stop JAVA classes from being run..
try {
ctx.setClassShutter(new ClassShutter() {
public boolean visibleToScripts(String className) {

//ONLY MDSJS can be called form JS
if(className.startsWith("org.minima.system.mds.runnable")) {
return true;
}

//MinimaLogger.log("RHINOJS JAVA CLASS DENIED ACCESS : "+className);

return false;
}
});
}catch(SecurityException sec) {
if(sec.getMessage().equals("Cannot overwrite existing ClassShutter object")) {
//we already set it..
}else {
MinimaLogger.log(sec);
}

//MinimaLogger.log("RHINOJS JAVA CLASS DENIED ACCESS : "+className);

return false;
}
});
}catch(SecurityException sec) {
if(sec.getMessage().equals("Cannot overwrite existing ClassShutter object")) {
//we already set it..
}else {
MinimaLogger.log(sec);

//Create the Scope
Scriptable scope = ctx.initStandardObjects();

//Create an MDSJS object
MDSJS mdsjs = new MDSJS(this, zDAPP.getUID(), zDAPP.getName(), ctx, scope);
ScriptableObject.putProperty(scope, "MDS", Context.javaToJS(mdsjs, scope));

//Add the DECIMAL.js code..
//ctx.evaluateString(scope, DECIMALJS, "<decimaljs_"+zDAPP.getUID()+">", 1, null);

ctx.evaluateString(scope, code, "<mds_"+zDAPP.getUID()+">", 1, null);
//Add to our map
mRunnables.put(zDAPP.getUID(), mdsjs);
}
}

//Create the Scope
Scriptable scope = ctx.initStandardObjects();

//Create an MDSJS object
MDSJS mdsjs = new MDSJS(this, zDAPP.getUID(), zDAPP.getName(), ctx, scope);
ScriptableObject.putProperty(scope, "MDS", Context.javaToJS(mdsjs, scope));

//Add the DECIMAL.js code..
//ctx.evaluateString(scope, DECIMALJS, "<decimaljs_"+zDAPP.getUID()+">", 1, null);

//Add the main code to the Runnable
ctx.evaluateString(scope, code, "<mds_"+zDAPP.getUID()+">", 1, null);

//Add to our list
mRunnables.add(mdsjs);
);

//Add placeholder to our map
mRunnables.put(zDAPP.getUID(), null);

}catch(Exception exc) {
MinimaLogger.log("ERROR starting service "+zDAPP.getName()+" "+exc);
Expand Down