Skip to content

Commit

Permalink
[POC-4] Tasks cache init upon startup
Browse files Browse the repository at this point in the history
  • Loading branch information
apfadler committed Jun 24, 2016
1 parent b1d1d45 commit f821a02
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 64 deletions.
49 changes: 16 additions & 33 deletions interpreter/src/main/java/org/quil/server/Tasks/Task.java
Original file line number Diff line number Diff line change
Expand Up @@ -61,24 +61,7 @@ static class Status {

public static Task fromString(String taskDescription) {
String taskName = UUID.randomUUID().toString();

Ignite ignite = Ignition.ignite();
CacheConfiguration<String, Task> cfg = new CacheConfiguration<String, Task>();

cfg.setCacheMode(CacheMode.REPLICATED);
cfg.setName("Tasks");

try {
cfg.setIndexedTypes(String.class, Task.class,
String.class, PriceTrade.class,
String.class, PricePortfolio.class,
String.class, ScriptedTask.class,
String.class, Class.forName("org.quil.server.Tasks.RunQLObjectsApplication"));
}catch (Exception e) {
logger.info("Failed to set indexed types");
}

IgniteCache<String,Task> tasks = ignite.getOrCreateCache(cfg);
IgniteCache<String,Task> tasks = cache();

try {
JSONObject taskObj = (JSONObject) (new JSONParser()).parse(taskDescription);
Expand All @@ -100,37 +83,31 @@ static Task fromJSONObject(JSONObject taskDescription) {
}

public static Task get(String taskName) {

Ignite ignite = Ignition.ignite();
IgniteCache<String,Task> tasks = ignite.getOrCreateCache("Tasks");
IgniteCache<String,Task> tasks = cache();

return tasks.get(taskName);
}

public static void updateStatus(String taskName, int status) {

Ignite ignite = Ignition.ignite();
IgniteCache<String,Task> tasks = ignite.getOrCreateCache("Tasks");
IgniteCache<String,Task> tasks = cache();
Task task = tasks.get(taskName);
task.setStatus(status);
tasks.put(taskName, task);
}

public static void updateResult(String taskName, String result) {
Ignite ignite = Ignition.ignite();
IgniteCache<String,Task> tasks = ignite.getOrCreateCache("Tasks");
IgniteCache<String,Task> tasks = cache();
Task task = tasks.get(taskName);
task.setResult(result);

tasks.put(taskName, task);
}
public static HashMap<String, Task> allTasks() {

public static IgniteCache<String, Task> cache() {
Ignite ignite = Ignition.ignite();
CacheConfiguration<String, Task> cfg = new CacheConfiguration<String, Task>();
cfg.setCacheMode(CacheMode.REPLICATED);
cfg.setName("Tasks");

cfg.setCacheMode(CacheMode.REPLICATED);
cfg.setName("Tasks");
try {
cfg.setIndexedTypes(String.class, Task.class,
String.class, PriceTrade.class,
Expand All @@ -140,7 +117,13 @@ public static HashMap<String, Task> allTasks() {
}catch (Exception e) {
logger.info("Failed to set indexed types");
}
IgniteCache<String,Task> tasks = ignite.getOrCreateCache(cfg);
IgniteCache<String,Task> tasks = ignite.getOrCreateCache(cfg);

return tasks;
}

public static HashMap<String, Task> allTasks() {
IgniteCache<String,Task> tasks = cache();

HashMap<String, Task> all = new HashMap<String, Task>();

Expand Down
45 changes: 14 additions & 31 deletions server/src/main/java/org/quil/server/QuilServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import org.eclipse.jetty.util.thread.ThreadPool;
import org.eclipse.jetty.webapp.WebAppContext;
import org.quil.repository.CachedFileSystemRepository;
import org.quil.server.Tasks.Task;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.ignite.Ignition;
Expand All @@ -34,8 +35,7 @@ public class QuilServer {
static final Logger logger = LoggerFactory.getLogger(QuilServer.class);

public static void main(String[] args) throws Exception {



boolean workerNode = false;
try {
String env = System.getenv("QUIL_WORKER");
Expand All @@ -61,33 +61,16 @@ public static void main(String[] args) throws Exception {
}





QueuedThreadPool tp = new QueuedThreadPool();
tp.setMaxThreads(1000);tp.setMinThreads(10);
Server jettyServer = new Server(tp);
//ServerConnector c = new ServerConnector(jettyServer);
org.eclipse.jetty.server.nio.NetworkTrafficSelectChannelConnector c= new org.eclipse.jetty.server.nio.NetworkTrafficSelectChannelConnector(jettyServer);
org.eclipse.jetty.server.nio.NetworkTrafficSelectChannelConnector c =
new org.eclipse.jetty.server.nio.NetworkTrafficSelectChannelConnector(jettyServer);
c.setPort(port);
jettyServer.addConnector(c);
jettyServer.setHandler(contexts);

try {

// TODO Make configurable
/*TcpDiscoverySpi spi = new TcpDiscoverySpi();
TcpDiscoveryVmIpFinder ipFinder = new TcpDiscoveryVmIpFinder();
ipFinder.setAddresses(Arrays.asList("127.0.0.1", "127.0.0.1:47500..47509"));
spi.setIpFinder(ipFinder);
IgniteConfiguration cfg = new IgniteConfiguration();
cfg.setDiscoverySpi(spi);
FifoQueueCollisionSpi spiCollision = new FifoQueueCollisionSpi();
spiCollision.setParallelJobsNumber(4);
cfg.setCollisionSpi(spiCollision);
*/
if (!workerNode) {
boolean clientmode = true;
boolean standalone = false;
Expand All @@ -96,22 +79,25 @@ public static void main(String[] args) throws Exception {
if (env.compareToIgnoreCase("yes") == 0 || env.compareToIgnoreCase("true") == 0 )
{
clientmode = false;
standalone=true;
standalone = true;
}

} catch (Exception e) {
}

logger.info("Client mode = " + clientmode);

if (clientmode)
if (clientmode) {
Ignition.start("config/quil-client.xml");
}
else
{
if (standalone)
if (standalone) {
Ignition.start("config/quil-worker.xml");
else
}
else {
Ignition.start("config/quil-server.xml");
}
}
//cfg.setClientMode(clientmode);
}
Expand All @@ -120,15 +106,11 @@ public static void main(String[] args) throws Exception {


}

//cfg.setPeerClassLoadingEnabled(true);
//cfg.setIncludeEventTypes(EVTS_TASK_EXECUTION);*/




if (!workerNode) {

Task.cache();

runQuilStartupScript();

CachedFileSystemRepository.instance();
Expand All @@ -139,6 +121,7 @@ public static void main(String[] args) throws Exception {

jettyServer.join();
}

} finally {
jettyServer.destroy();
}
Expand Down

0 comments on commit f821a02

Please sign in to comment.