diff --git a/interpreter/src/main/java/org/quil/server/Tasks/Task.java b/interpreter/src/main/java/org/quil/server/Tasks/Task.java index ffbf91c..71df397 100644 --- a/interpreter/src/main/java/org/quil/server/Tasks/Task.java +++ b/interpreter/src/main/java/org/quil/server/Tasks/Task.java @@ -61,24 +61,7 @@ static class Status { public static Task fromString(String taskDescription) { String taskName = UUID.randomUUID().toString(); - - Ignite ignite = Ignition.ignite(); - CacheConfiguration cfg = new CacheConfiguration(); - - cfg.setCacheMode(CacheMode.REPLICATED); - cfg.setName("Tasks"); - - try { - cfg.setIndexedTypes(String.class, Task.class, - String.class, PriceTrade.class, - String.class, PricePortfolio.class, - String.class, ScriptedTask.class, - String.class, Class.forName("org.quil.server.Tasks.RunQLObjectsApplication")); - }catch (Exception e) { - logger.info("Failed to set indexed types"); - } - - IgniteCache tasks = ignite.getOrCreateCache(cfg); + IgniteCache tasks = cache(); try { JSONObject taskObj = (JSONObject) (new JSONParser()).parse(taskDescription); @@ -100,37 +83,31 @@ static Task fromJSONObject(JSONObject taskDescription) { } public static Task get(String taskName) { - - Ignite ignite = Ignition.ignite(); - IgniteCache tasks = ignite.getOrCreateCache("Tasks"); + IgniteCache tasks = cache(); return tasks.get(taskName); } public static void updateStatus(String taskName, int status) { - - Ignite ignite = Ignition.ignite(); - IgniteCache tasks = ignite.getOrCreateCache("Tasks"); + IgniteCache tasks = cache(); Task task = tasks.get(taskName); task.setStatus(status); tasks.put(taskName, task); } public static void updateResult(String taskName, String result) { - Ignite ignite = Ignition.ignite(); - IgniteCache tasks = ignite.getOrCreateCache("Tasks"); + IgniteCache tasks = cache(); Task task = tasks.get(taskName); task.setResult(result); - tasks.put(taskName, task); } - - public static HashMap allTasks() { + + public static IgniteCache cache() { Ignite ignite = Ignition.ignite(); CacheConfiguration cfg = new CacheConfiguration(); - - cfg.setCacheMode(CacheMode.REPLICATED); - cfg.setName("Tasks"); + + cfg.setCacheMode(CacheMode.REPLICATED); + cfg.setName("Tasks"); try { cfg.setIndexedTypes(String.class, Task.class, String.class, PriceTrade.class, @@ -140,7 +117,13 @@ public static HashMap allTasks() { }catch (Exception e) { logger.info("Failed to set indexed types"); } - IgniteCache tasks = ignite.getOrCreateCache(cfg); + IgniteCache tasks = ignite.getOrCreateCache(cfg); + + return tasks; + } + + public static HashMap allTasks() { + IgniteCache tasks = cache(); HashMap all = new HashMap(); diff --git a/server/src/main/java/org/quil/server/QuilServer.java b/server/src/main/java/org/quil/server/QuilServer.java index 371e6d0..145a0e0 100644 --- a/server/src/main/java/org/quil/server/QuilServer.java +++ b/server/src/main/java/org/quil/server/QuilServer.java @@ -18,6 +18,7 @@ import org.eclipse.jetty.util.thread.ThreadPool; import org.eclipse.jetty.webapp.WebAppContext; import org.quil.repository.CachedFileSystemRepository; +import org.quil.server.Tasks.Task; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.ignite.Ignition; @@ -34,8 +35,7 @@ public class QuilServer { static final Logger logger = LoggerFactory.getLogger(QuilServer.class); public static void main(String[] args) throws Exception { - - + boolean workerNode = false; try { String env = System.getenv("QUIL_WORKER"); @@ -61,33 +61,16 @@ public static void main(String[] args) throws Exception { } - - - QueuedThreadPool tp = new QueuedThreadPool(); tp.setMaxThreads(1000);tp.setMinThreads(10); Server jettyServer = new Server(tp); - //ServerConnector c = new ServerConnector(jettyServer); - org.eclipse.jetty.server.nio.NetworkTrafficSelectChannelConnector c= new org.eclipse.jetty.server.nio.NetworkTrafficSelectChannelConnector(jettyServer); + org.eclipse.jetty.server.nio.NetworkTrafficSelectChannelConnector c = + new org.eclipse.jetty.server.nio.NetworkTrafficSelectChannelConnector(jettyServer); c.setPort(port); jettyServer.addConnector(c); jettyServer.setHandler(contexts); try { - - // TODO Make configurable - /*TcpDiscoverySpi spi = new TcpDiscoverySpi(); - TcpDiscoveryVmIpFinder ipFinder = new TcpDiscoveryVmIpFinder(); - ipFinder.setAddresses(Arrays.asList("127.0.0.1", "127.0.0.1:47500..47509")); - spi.setIpFinder(ipFinder); - IgniteConfiguration cfg = new IgniteConfiguration(); - cfg.setDiscoverySpi(spi); - - FifoQueueCollisionSpi spiCollision = new FifoQueueCollisionSpi(); - spiCollision.setParallelJobsNumber(4); - cfg.setCollisionSpi(spiCollision); - - */ if (!workerNode) { boolean clientmode = true; boolean standalone = false; @@ -96,7 +79,7 @@ public static void main(String[] args) throws Exception { if (env.compareToIgnoreCase("yes") == 0 || env.compareToIgnoreCase("true") == 0 ) { clientmode = false; - standalone=true; + standalone = true; } } catch (Exception e) { @@ -104,14 +87,17 @@ public static void main(String[] args) throws Exception { logger.info("Client mode = " + clientmode); - if (clientmode) + if (clientmode) { Ignition.start("config/quil-client.xml"); + } else { - if (standalone) + if (standalone) { Ignition.start("config/quil-worker.xml"); - else + } + else { Ignition.start("config/quil-server.xml"); + } } //cfg.setClientMode(clientmode); } @@ -120,15 +106,11 @@ public static void main(String[] args) throws Exception { } - - //cfg.setPeerClassLoadingEnabled(true); - //cfg.setIncludeEventTypes(EVTS_TASK_EXECUTION);*/ - - - if (!workerNode) { + Task.cache(); + runQuilStartupScript(); CachedFileSystemRepository.instance(); @@ -139,6 +121,7 @@ public static void main(String[] args) throws Exception { jettyServer.join(); } + } finally { jettyServer.destroy(); }