diff --git a/client/pom.xml b/client/pom.xml index 46cf18d1..efe39ebc 100644 --- a/client/pom.xml +++ b/client/pom.xml @@ -16,7 +16,7 @@ org.eclipse iofog-agent - 1.0.6 + 1.0.7 ../pom.xml diff --git a/client/src/org/eclipse/iofog/Client.java b/client/src/org/eclipse/iofog/Client.java index 18a9b8e2..8679901c 100644 --- a/client/src/org/eclipse/iofog/Client.java +++ b/client/src/org/eclipse/iofog/Client.java @@ -159,7 +159,7 @@ private static String showHelp() { " -n Set the name of the network adapter\n" + " that holds the correct IP address of \n" + " this machine\n" + - " -l <#MB Limit> Set the limit, in MiB, of disk space\n" + + " -l <#GB Limit> Set the limit, in GiB, of disk space\n" + " that the log files can consume\n" + " -ld Set the directory to use for log file\n" + " storage\n" + diff --git a/daemon/pom.xml b/daemon/pom.xml index 6a0c0808..ea97eda1 100644 --- a/daemon/pom.xml +++ b/daemon/pom.xml @@ -16,7 +16,7 @@ org.eclipse iofog-agent - 1.0.6 + 1.0.7 ../pom.xml diff --git a/daemon/src/org/eclipse/iofog/command_line/CommandLineAction.java b/daemon/src/org/eclipse/iofog/command_line/CommandLineAction.java index 9519e4d0..e4ba8d37 100644 --- a/daemon/src/org/eclipse/iofog/command_line/CommandLineAction.java +++ b/daemon/src/org/eclipse/iofog/command_line/CommandLineAction.java @@ -316,7 +316,7 @@ private static String showHelp() { " -n Set the name of the network adapter\\n" + " that holds the correct IP address of \\n" + " this machine\\n" + - " -l <#MB Limit> Set the limit, in MiB, of disk space\\n" + + " -l <#GB Limit> Set the limit, in GiB, of disk space\\n" + " that the log files can consume\\n" + " -ld Set the directory to use for log file\\n" + " storage\\n" + diff --git a/daemon/src/org/eclipse/iofog/diagnostics/strace/MicroserviceStraceData.java b/daemon/src/org/eclipse/iofog/diagnostics/strace/MicroserviceStraceData.java index e4e3aa85..caa2fff0 100644 --- a/daemon/src/org/eclipse/iofog/diagnostics/strace/MicroserviceStraceData.java +++ b/daemon/src/org/eclipse/iofog/diagnostics/strace/MicroserviceStraceData.java @@ -55,6 +55,10 @@ public AtomicBoolean getStraceRun() { return straceRun; } + public void setStraceRun(boolean straceRun) { + this.straceRun.set(straceRun); + } + @Override public String toString() { return "MicroserviceStraceData{" + diff --git a/daemon/src/org/eclipse/iofog/diagnostics/strace/StraceDiagnosticManger.java b/daemon/src/org/eclipse/iofog/diagnostics/strace/StraceDiagnosticManger.java index 12a58152..5693e312 100644 --- a/daemon/src/org/eclipse/iofog/diagnostics/strace/StraceDiagnosticManger.java +++ b/daemon/src/org/eclipse/iofog/diagnostics/strace/StraceDiagnosticManger.java @@ -56,9 +56,9 @@ public void updateMonitoringMicroservices(JsonObject diagnosticData) { JsonArray straceMicroserviceChanges = diagnosticData.getJsonArray("straceValues"); for (JsonValue microserviceValue : straceMicroserviceChanges) { JsonObject microservice = (JsonObject) microserviceValue; - if (microservice.containsKey("microserviceId")) { - String microserviceUuid = microservice.getString("microserviceId"); - boolean strace = microservice.getInt("straceRun", 0) != 0; + if (microservice.containsKey("microserviceUuid")) { + String microserviceUuid = microservice.getString("microserviceUuid"); + boolean strace = microservice.getBoolean("straceRun"); manageMicroservice(microserviceUuid, strace); } } @@ -66,36 +66,39 @@ public void updateMonitoringMicroservices(JsonObject diagnosticData) { } private void manageMicroservice(String microserviceUuid, boolean strace) { - Optional microserviceOptional = getDataByMicroserviceUuid(microserviceUuid); - if (microserviceOptional.isPresent() && !strace) { - offDiagnosticMicroservice(microserviceOptional.get()); - } else if (!microserviceOptional.isPresent() && strace) { - createAndRunDiagnosticMicroservice(microserviceUuid); - } + if (strace) { + enableMicroserviceStraceDiagnostics(microserviceUuid); + } else { + disableMicroserviceStraceDiagnostics(microserviceUuid); + } + } + + private Optional getStraceDataByMicroserviceUuid(String microserviceUuid) { + return this.monitoringMicroservices.stream() + .filter(microservice -> microservice.getMicroserviceUuid().equals(microserviceUuid)) + .findFirst(); } - private void createAndRunDiagnosticMicroservice(String microserviceUuid) { + public void enableMicroserviceStraceDiagnostics(String microserviceUuid) { try { int pid = getPidByMicroserviceUuid(microserviceUuid); - MicroserviceStraceData microserviceStraceData = new MicroserviceStraceData(microserviceUuid, pid, true); - this.monitoringMicroservices.add(microserviceStraceData); - - runStrace(microserviceStraceData); + MicroserviceStraceData newMicroserviceStraceData = new MicroserviceStraceData(microserviceUuid, pid, true); + this.monitoringMicroservices.removeIf( + oldMicroserviceStraceData -> oldMicroserviceStraceData.getMicroserviceUuid().equals(microserviceUuid) + ); + this.monitoringMicroservices.add(newMicroserviceStraceData); + runStrace(newMicroserviceStraceData); } catch (IllegalArgumentException e) { logWarning(MODULE_NAME, "Can't get pid of process"); } - } - - private void offDiagnosticMicroservice(MicroserviceStraceData microserviceStraceData) { - microserviceStraceData.getStraceRun().set(false); - this.monitoringMicroservices.remove(microserviceStraceData); - } - - public Optional getDataByMicroserviceUuid(String microserviceUuid) { - return this.monitoringMicroservices.stream() - .filter(microservice -> microservice.getMicroserviceUuid().equals(microserviceUuid)) - .findFirst(); - } + } + + public void disableMicroserviceStraceDiagnostics(String microserviceUuid) { + getStraceDataByMicroserviceUuid(microserviceUuid).ifPresent(microserviceStraceData -> { + microserviceStraceData.setStraceRun(false); + this.monitoringMicroservices.remove(microserviceStraceData); + }); + } private int getPidByMicroserviceUuid(String microserviceUuid) throws IllegalArgumentException { CommandShellResultSet, List> resultSet = CommandShellExecutor.executeCommand("docker top " + microserviceUuid); diff --git a/daemon/src/org/eclipse/iofog/field_agent/FieldAgent.java b/daemon/src/org/eclipse/iofog/field_agent/FieldAgent.java index 875f62ea..0195d849 100644 --- a/daemon/src/org/eclipse/iofog/field_agent/FieldAgent.java +++ b/daemon/src/org/eclipse/iofog/field_agent/FieldAgent.java @@ -139,6 +139,13 @@ private JsonObject getFogStatus() { return json; } + /** + * executes actions after successful status post request + */ + private void onPostStatusSuccess() { + StatusReporter.getProcessManagerStatus().removeNotRunningMicroserviceStatus(); + } + /** * checks if IOFog is not provisioned * @@ -173,7 +180,7 @@ private boolean notProvisioned() { logInfo("sending IOFog status..."); orchestrator.request("status", RequestType.PUT, null, status); - + onPostStatusSuccess(); } catch (CertificateException | SSLHandshakeException e) { verificationFailed(); } catch (ForbiddenException e) { @@ -187,26 +194,30 @@ private boolean notProvisioned() { private final Runnable postDiagnostics = () -> { while (true) { if (StraceDiagnosticManger.getInstance().getMonitoringMicroservices().size() > 0) { - JsonObjectBuilder builder = Json.createObjectBuilder(); + JsonBuilderFactory factory = Json.createBuilderFactory(null); + JsonArrayBuilder arrayBuilder = factory.createArrayBuilder(); for (MicroserviceStraceData microservice : StraceDiagnosticManger.getInstance().getMonitoringMicroservices()) { - builder.add(microservice.getMicroserviceUuid(), microservice.getResultBufferAsString()); + arrayBuilder.add(factory.createObjectBuilder() + .add("microserviceUuid", microservice.getMicroserviceUuid()) + .add("buffer", microservice.getResultBufferAsString()) + ); microservice.getResultBuffer().clear(); } - builder.add("timestamp", new Date().getTime()); - JsonObject json = builder.build(); + JsonObject json = factory.createObjectBuilder() + .add("straceData", arrayBuilder).build(); try { orchestrator.request("strace", RequestType.PUT, null, json); } catch (Exception e) { logWarning("unable send strace logs : " + e.getMessage()); } - } else { + try { Thread.sleep(Configuration.getPostDiagnosticsFreq() * 1000); } catch (InterruptedException e) { - e.printStackTrace(); + logWarning(e.getMessage()); } } } @@ -356,7 +367,7 @@ private void changeVersion() { } private void updateDiagnostics() { - LoggingService.logInfo(MODULE_NAME, "get changes is diagnostic list"); + LoggingService.logInfo(MODULE_NAME, "getting changes for diagnostics"); if (notProvisioned() || !isControllerConnected(false)) { return; } @@ -449,14 +460,14 @@ private void processRoutes(List microservices) { continue; } - String microserviceId = microservice.getMicroserviceUuid(); + String microserviceUuid = microservice.getMicroserviceUuid(); Route microserviceRoute = new Route(); for (String jsonRoute : jsonRoutes) { microserviceRoute.getReceivers().add(jsonRoute); } - routes.put(microserviceId, microserviceRoute); + routes.put(microserviceUuid, microserviceRoute); } microserviceManager.setRoutes(routes); @@ -762,7 +773,7 @@ private void getFogConfig() { instanceConfig.put(DEVICE_SCAN_FREQUENCY.getCommandName(), deviceScanFrequency); if (Configuration.isWatchdogEnabled() != watchdogEnabled) - instanceConfig.put(WATCHDOG_ENABLED.getCommandName(), watchdogEnabled); + instanceConfig.put(WATCHDOG_ENABLED.getCommandName(), watchdogEnabled ? "on" : "off"); if (!Configuration.getGpsCoordinates().equals(gpsCoordinates)) { instanceConfig.put(GPS_COORDINATES.getCommandName(), gpsCoordinates); @@ -922,7 +933,7 @@ public String deProvision() { if (notProvisioned()) { return "\nFailure - not provisioned"; } - + //// TODO: 20.12.18 make deprovision request to controller in order to mark related microservices as not running StatusReporter.setFieldAgentStatus().setControllerStatus(NOT_PROVISIONED); try { Configuration.setIofogUuid(""); diff --git a/daemon/src/org/eclipse/iofog/message_bus/CommandLineHandler.java b/daemon/src/org/eclipse/iofog/message_bus/CommandLineHandler.java index fda2a540..7c1db118 100644 --- a/daemon/src/org/eclipse/iofog/message_bus/CommandLineHandler.java +++ b/daemon/src/org/eclipse/iofog/message_bus/CommandLineHandler.java @@ -17,6 +17,7 @@ import org.hornetq.api.core.client.ClientMessage; import org.hornetq.api.core.client.MessageHandler; +import static org.eclipse.iofog.message_bus.MessageBusServer.messageBusSessionLock; import static org.eclipse.iofog.utils.logging.LoggingService.logWarning; /** @@ -42,7 +43,9 @@ public void onMessage(ClientMessage message) { response.putObjectProperty("receiver", "iofog.commandline.response"); try { - MessageBusServer.getCommandlineProducer().send(response); + synchronized (messageBusSessionLock) { + MessageBusServer.getCommandlineProducer().send(response); + } } catch (Exception exp) { logWarning(MODULE_NAME, exp.getMessage()); } diff --git a/daemon/src/org/eclipse/iofog/message_bus/MessageBusServer.java b/daemon/src/org/eclipse/iofog/message_bus/MessageBusServer.java index fe33cf8c..2d04f8bc 100644 --- a/daemon/src/org/eclipse/iofog/message_bus/MessageBusServer.java +++ b/daemon/src/org/eclipse/iofog/message_bus/MessageBusServer.java @@ -12,11 +12,6 @@ *******************************************************************************/ package org.eclipse.iofog.message_bus; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; - import org.eclipse.iofog.microservice.Microservice; import org.eclipse.iofog.utils.Constants; import org.eclipse.iofog.utils.configuration.Configuration; @@ -24,14 +19,8 @@ import org.hornetq.api.core.HornetQException; import org.hornetq.api.core.SimpleString; import org.hornetq.api.core.TransportConfiguration; -import org.hornetq.api.core.client.ClientConsumer; -import org.hornetq.api.core.client.ClientMessage; -import org.hornetq.api.core.client.ClientProducer; -import org.hornetq.api.core.client.ClientSession; +import org.hornetq.api.core.client.*; import org.hornetq.api.core.client.ClientSession.QueueQuery; -import org.hornetq.api.core.client.ClientSessionFactory; -import org.hornetq.api.core.client.HornetQClient; -import org.hornetq.api.core.client.ServerLocator; import org.hornetq.core.config.impl.ConfigurationImpl; import org.hornetq.core.remoting.impl.invm.InVMAcceptorFactory; import org.hornetq.core.remoting.impl.invm.InVMConnectorFactory; @@ -42,6 +31,11 @@ import org.hornetq.core.settings.impl.AddressFullMessagePolicy; import org.hornetq.core.settings.impl.AddressSettings; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + /** * HornetQ server * @@ -49,7 +43,8 @@ * */ public class MessageBusServer { - + + public static final Object messageBusSessionLock = new Object(); private static final String MODULE_NAME = "Message Bus Server"; private ClientSessionFactory sf; private HornetQServer server; @@ -126,21 +121,23 @@ void startServer() throws Exception { * @throws Exception */ void initialize() throws Exception { - messageBusSession = sf.createSession(true, true, 0); - QueueQuery queueQuery = messageBusSession.queueQuery(new SimpleString(Constants.ADDRESS)); - if (queueQuery.isExists()) - messageBusSession.deleteQueue(Constants.ADDRESS); - queueQuery = messageBusSession.queueQuery(new SimpleString(Constants.COMMAND_LINE_ADDRESS)); - if (queueQuery.isExists()) - messageBusSession.deleteQueue(Constants.COMMAND_LINE_ADDRESS); - messageBusSession.createQueue(Constants.ADDRESS, Constants.ADDRESS, false); - messageBusSession.createQueue(Constants.COMMAND_LINE_ADDRESS, Constants.COMMAND_LINE_ADDRESS, false); + synchronized (messageBusSessionLock) { + messageBusSession = sf.createSession(true, true, 0); + QueueQuery queueQuery = messageBusSession.queueQuery(new SimpleString(Constants.ADDRESS)); + if (queueQuery.isExists()) + messageBusSession.deleteQueue(Constants.ADDRESS); + queueQuery = messageBusSession.queueQuery(new SimpleString(Constants.COMMAND_LINE_ADDRESS)); + if (queueQuery.isExists()) + messageBusSession.deleteQueue(Constants.COMMAND_LINE_ADDRESS); + messageBusSession.createQueue(Constants.ADDRESS, Constants.ADDRESS, false); + messageBusSession.createQueue(Constants.COMMAND_LINE_ADDRESS, Constants.COMMAND_LINE_ADDRESS, false); - commandlineProducer = messageBusSession.createProducer(Constants.COMMAND_LINE_ADDRESS); - - commandlineConsumer = messageBusSession.createConsumer(Constants.COMMAND_LINE_ADDRESS, String.format("receiver = '%s'", "iofog.commandline.command")); - commandlineConsumer.setMessageHandler(new CommandLineHandler()); - messageBusSession.start(); + commandlineProducer = messageBusSession.createProducer(Constants.COMMAND_LINE_ADDRESS); + + commandlineConsumer = messageBusSession.createConsumer(Constants.COMMAND_LINE_ADDRESS, String.format("receiver = '%s'", "iofog.commandline.command")); + commandlineConsumer.setMessageHandler(new CommandLineHandler()); + messageBusSession.start(); + } } /** @@ -150,10 +147,14 @@ void initialize() throws Exception { * @throws Exception */ void createCosumer(String name) throws Exception { - if (consumers == null) + if (consumers == null) { consumers = new ConcurrentHashMap<>(); + } - ClientConsumer consumer = messageBusSession.createConsumer(Constants.ADDRESS, String.format("receiver = '%s'", name)); + ClientConsumer consumer; + synchronized (messageBusSessionLock) { + consumer = messageBusSession.createConsumer(Constants.ADDRESS, String.format("receiver = '%s'", name)); + } consumers.put(name, consumer); } @@ -191,9 +192,13 @@ void removeConsumer(String name) { * @throws Exception */ void createProducer(String name) throws Exception { - if (producers == null) + if (producers == null) { producers = new ConcurrentHashMap<>(); - ClientProducer producer = messageBusSession.createProducer(Constants.ADDRESS); + } + ClientProducer producer; + synchronized (messageBusSessionLock) { + producer = messageBusSession.createProducer(Constants.ADDRESS); + } producers.put(name, producer); } diff --git a/daemon/src/org/eclipse/iofog/message_bus/MessageListener.java b/daemon/src/org/eclipse/iofog/message_bus/MessageListener.java index 7235c73d..48cbb9c5 100644 --- a/daemon/src/org/eclipse/iofog/message_bus/MessageListener.java +++ b/daemon/src/org/eclipse/iofog/message_bus/MessageListener.java @@ -16,6 +16,7 @@ import org.hornetq.api.core.client.ClientMessage; import org.hornetq.api.core.client.MessageHandler; +import static org.eclipse.iofog.message_bus.MessageBusServer.messageBusSessionLock; import static org.eclipse.iofog.utils.logging.LoggingService.logWarning; /** @@ -36,10 +37,12 @@ public MessageListener(MessageCallback callback) { @Override public void onMessage(ClientMessage msg) { try { - msg.acknowledge(); + synchronized (messageBusSessionLock) { + msg.acknowledge(); + } } catch (Exception exp) { logWarning(MODULE_NAME, exp.getMessage());} - + Message message = new Message(msg.getBytesProperty("message")); callback.sendRealtimeMessage(message); } diff --git a/daemon/src/org/eclipse/iofog/message_bus/MessagePublisher.java b/daemon/src/org/eclipse/iofog/message_bus/MessagePublisher.java index 05753496..ff0334ca 100644 --- a/daemon/src/org/eclipse/iofog/message_bus/MessagePublisher.java +++ b/daemon/src/org/eclipse/iofog/message_bus/MessagePublisher.java @@ -12,8 +12,6 @@ *******************************************************************************/ package org.eclipse.iofog.message_bus; -import java.util.List; - import org.eclipse.iofog.microservice.Microservice; import org.eclipse.iofog.microservice.Route; import org.eclipse.iofog.utils.logging.LoggingService; @@ -21,7 +19,10 @@ import org.hornetq.api.core.client.ClientProducer; import org.hornetq.api.core.client.ClientSession; +import java.util.List; + import static org.eclipse.iofog.message_bus.MessageBus.MODULE_NAME; +import static org.eclipse.iofog.message_bus.MessageBusServer.messageBusSessionLock; import static org.eclipse.iofog.utils.logging.LoggingService.logWarning; /** @@ -67,7 +68,9 @@ synchronized void publish(Message message) throws Exception { ClientMessage msg = session.createMessage(false); msg.putObjectProperty("receiver", receiver); msg.putBytesProperty("message", bytes); - producer.send(msg); + synchronized (messageBusSessionLock) { + producer.send(msg); + } } } diff --git a/daemon/src/org/eclipse/iofog/message_bus/MessageReceiver.java b/daemon/src/org/eclipse/iofog/message_bus/MessageReceiver.java index b71ca99f..db126c91 100644 --- a/daemon/src/org/eclipse/iofog/message_bus/MessageReceiver.java +++ b/daemon/src/org/eclipse/iofog/message_bus/MessageReceiver.java @@ -20,6 +20,7 @@ import java.util.ArrayList; import java.util.List; +import static org.eclipse.iofog.message_bus.MessageBusServer.messageBusSessionLock; import static org.eclipse.iofog.utils.logging.LoggingService.logWarning; /** @@ -71,8 +72,11 @@ private Message getMessage() throws Exception { if (consumer == null || listener != null) return null; - Message result = null; - ClientMessage msg = consumer.receiveImmediate(); + Message result = null; + ClientMessage msg; + synchronized (messageBusSessionLock) { + msg = consumer.receiveImmediate(); + } if (msg != null) { msg.acknowledge(); result = new Message(msg.getBytesProperty("message")); diff --git a/daemon/src/org/eclipse/iofog/microservice/MicroserviceManager.java b/daemon/src/org/eclipse/iofog/microservice/MicroserviceManager.java index 8f0fa074..9bb2d0a3 100644 --- a/daemon/src/org/eclipse/iofog/microservice/MicroserviceManager.java +++ b/daemon/src/org/eclipse/iofog/microservice/MicroserviceManager.java @@ -118,16 +118,16 @@ public Optional findLatestMicroserviceByUuid(String microserviceUu } } - public boolean microserviceExists(List microservices, String microserviceId) { - return findMicroserviceByUuid(microservices, microserviceId).isPresent(); + public boolean microserviceExists(List microservices, String microserviceUuid) { + return findMicroserviceByUuid(microservices, microserviceUuid).isPresent(); } /*** * not thread safe for Microservice obj properties */ - private Optional findMicroserviceByUuid(List microservices, String microserviceId) { + private Optional findMicroserviceByUuid(List microservices, String microserviceUuid) { return microservices.stream() - .filter(microservice -> microservice.getMicroserviceUuid().equals(microserviceId)) + .filter(microservice -> microservice.getMicroserviceUuid().equals(microserviceUuid)) .findAny(); } diff --git a/daemon/src/org/eclipse/iofog/process_manager/DockerUtil.java b/daemon/src/org/eclipse/iofog/process_manager/DockerUtil.java index a20dd31d..df9ef9a1 100755 --- a/daemon/src/org/eclipse/iofog/process_manager/DockerUtil.java +++ b/daemon/src/org/eclipse/iofog/process_manager/DockerUtil.java @@ -27,6 +27,7 @@ import org.apache.commons.lang.SystemUtils; import org.eclipse.iofog.microservice.*; import org.eclipse.iofog.status_reporter.StatusReporter; +import org.eclipse.iofog.utils.Constants; import org.eclipse.iofog.utils.configuration.Configuration; import org.eclipse.iofog.utils.logging.LoggingService; @@ -199,6 +200,10 @@ public String getContainerName(Container container) { return container.getNames()[0].substring(1); } + public String getContainerMicroserviceUuid(Container container) { + return getContainerName(container).substring(Constants.IOFOG_DOCKER_CONTAINER_NAME_PREFIX.length()); + } + /** * returns a {@link Container} if exists * @@ -208,7 +213,7 @@ public String getContainerName(Container container) { public Optional getContainer(String microserviceUuid) { List containers = getContainers(); return containers.stream() - .filter(c -> getContainerName(c).equals(microserviceUuid)) + .filter(c -> getContainerMicroserviceUuid(c).equals(microserviceUuid)) .findAny(); } @@ -257,6 +262,30 @@ public MicroserviceStatus getMicroserviceStatus(String containerId) { return result; } + public List getRunningContainers() { + return getContainers().stream() + .filter(container -> { + InspectContainerResponse inspectInfo = dockerClient.inspectContainerCmd(container.getId()).exec(); + ContainerState containerState = inspectInfo.getState(); + return containerState != null + && containerState.getStatus() != null + && MicroserviceState.fromText(containerState.getStatus()) == MicroserviceState.RUNNING; + }) + .collect(Collectors.toList()); + } + + public List getRunningIofogContainers() { + return getRunningContainers().stream() + .filter(container -> getContainerName(container).startsWith(Constants.IOFOG_DOCKER_CONTAINER_NAME_PREFIX)) + .collect(Collectors.toList()); + } + + public List getRunningNonIofogContainers() { + return getRunningContainers().stream() + .filter(container -> !getContainerName(container).startsWith(Constants.IOFOG_DOCKER_CONTAINER_NAME_PREFIX)) + .collect(Collectors.toList()); + } + public Optional getContainerStats(String containerId) { StatsCmd statsCmd = dockerClient.statsCmd(containerId); CountDownLatch countDownLatch = new CountDownLatch(1); @@ -451,7 +480,7 @@ public String createContainer(Microservice microservice, String host) throws Not .withExposedPorts(exposedPorts.toArray(new ExposedPort[0])) .withPortBindings(portBindings) .withEnv("SELFNAME=" + microservice.getMicroserviceUuid()) - .withName(microservice.getMicroserviceUuid()) + .withName(Constants.IOFOG_DOCKER_CONTAINER_NAME_PREFIX + microservice.getMicroserviceUuid()) .withRestartPolicy(restartPolicy); if (microservice.getVolumeMappings() != null) { cmd = cmd diff --git a/daemon/src/org/eclipse/iofog/process_manager/ProcessManager.java b/daemon/src/org/eclipse/iofog/process_manager/ProcessManager.java index d58f5ebf..9c5d72b1 100644 --- a/daemon/src/org/eclipse/iofog/process_manager/ProcessManager.java +++ b/daemon/src/org/eclipse/iofog/process_manager/ProcessManager.java @@ -14,6 +14,7 @@ import com.github.dockerjava.api.model.Container; import org.eclipse.iofog.IOFogModule; +import org.eclipse.iofog.diagnostics.strace.StraceDiagnosticManger; import org.eclipse.iofog.microservice.Microservice; import org.eclipse.iofog.microservice.MicroserviceManager; import org.eclipse.iofog.microservice.MicroserviceState; @@ -29,6 +30,7 @@ import static java.lang.String.format; import static org.eclipse.iofog.process_manager.ContainerTask.Tasks.*; import static org.eclipse.iofog.utils.Constants.ControllerStatus.OK; +import static org.eclipse.iofog.utils.Constants.IOFOG_DOCKER_CONTAINER_NAME_PREFIX; import static org.eclipse.iofog.utils.Constants.PROCESS_MANAGER; /** @@ -101,41 +103,13 @@ public void update() { logInfo("monitoring containers"); try { - for (Microservice microservice : microserviceManager.getLatestMicroservices()) { - - if (!microservice.isUpdating()) { - Optional containerOptional = docker.getContainer(microservice.getMicroserviceUuid()); - - MicroserviceStatus status = containerOptional.isPresent() - ? docker.getMicroserviceStatus(containerOptional.get().getId()) - : new MicroserviceStatus(MicroserviceState.NOT_RUNNING); - StatusReporter.setProcessManagerStatus().setMicroservicesStatus(microservice.getMicroserviceUuid(), status); - - if (microservice.isDelete()) { - if (containerOptional.isPresent()) { - deleteMicroservice(microservice); - } - } else if (!containerOptional.isPresent()) { - addMicroservice(microservice); - } else { - Container container = containerOptional.get(); - updateMicroservice(container, microservice); - } - } - } - - deleteOldMicroservices(); - deleteNonAgentMicroservices(); - StatusReporter.setProcessManagerStatus().setRunningMicroservicesCount(docker.getContainers().size()); - + handleLatestMicroservices(); + deleteRemainingMicroservices(); + updateRunningMicroservicesCount(); } catch (Exception ex) { logWarning(ex.getMessage()); } - - List currentMicroservices = microserviceManager.getLatestMicroservices().stream() - .filter(microservice -> !microservice.isDelete()) - .collect(Collectors.toList()); - microserviceManager.setCurrentMicroservices(currentMicroservices); + updateCurrentMicroservices(); } }; @@ -148,6 +122,7 @@ private void addMicroservice(Microservice microservice) { * @param microservice Microservice object */ private void deleteMicroservice(Microservice microservice) { + disableMicroserviceFeaturesBeforeRemoval(microservice.getMicroserviceUuid()); if (microservice.isDeleteWithCleanup()) { addTask(new ContainerTask(REMOVE_WITH_CLEAN_UP, microservice.getMicroserviceUuid())); } else { @@ -155,6 +130,10 @@ private void deleteMicroservice(Microservice microservice) { } } + private void disableMicroserviceFeaturesBeforeRemoval(String microserviceUuid) { + StraceDiagnosticManger.getInstance().disableMicroserviceStraceDiagnostics(microserviceUuid); + } + private void updateMicroservice(Container container, Microservice microservice) { microservice.setContainerId(container.getId()); try { @@ -168,29 +147,86 @@ private void updateMicroservice(Container container, Microservice microservice) } } + private void handleLatestMicroservices() { + microserviceManager.getLatestMicroservices().stream() + .filter(microservice -> !microservice.isUpdating()) + .forEach(microservice -> { + Optional containerOptional = docker.getContainer(microservice.getMicroserviceUuid()); + MicroserviceStatus status = containerOptional.isPresent() + ? docker.getMicroserviceStatus(containerOptional.get().getId()) + : new MicroserviceStatus(MicroserviceState.NOT_RUNNING); + StatusReporter.setProcessManagerStatus().setMicroservicesStatus(microservice.getMicroserviceUuid(), status); + + if (!containerOptional.isPresent() && !microservice.isDelete()) { + addMicroservice(microservice); + } else if (containerOptional.isPresent() && microservice.isDelete()) { + deleteMicroservice(microservice); + } else if (containerOptional.isPresent() && !microservice.isDelete()) { + updateMicroservice(containerOptional.get(), microservice); + } + }); + } + + private void deleteRemainingMicroservices() { + Set allAgentMicroservices = Stream.concat( + microserviceManager.getLatestMicroservices().stream(), + microserviceManager.getCurrentMicroservices().stream() + ) + .collect(Collectors.toSet()); + deleteOldAgentMicroservices(); + deleteObsoleteAgentMicroservices(allAgentMicroservices); + deleteNonAgentMicroservices(allAgentMicroservices); + } + + private void updateRunningMicroservicesCount() { + StatusReporter.setProcessManagerStatus().setRunningMicroservicesCount(docker.getRunningIofogContainers().size()); + } + + private void updateCurrentMicroservices() { + List currentMicroservices = microserviceManager.getLatestMicroservices().stream() + .filter(microservice -> !microservice.isDelete()) + .collect(Collectors.toList()); + microserviceManager.setCurrentMicroservices(currentMicroservices); + } + /** * Deletes microservices which don't present in latest microservices list but do present in current microservices list */ - private void deleteOldMicroservices() { + private void deleteOldAgentMicroservices() { microserviceManager.getCurrentMicroservices().stream() .filter(microservice -> !microserviceManager.getLatestMicroservices().contains(microservice)) - .forEach(microservice -> addTask(new ContainerTask(REMOVE, microservice.getMicroserviceUuid()))); + .forEach(microservice -> { + MicroserviceStatus status = new MicroserviceStatus(MicroserviceState.NOT_RUNNING); + StatusReporter.setProcessManagerStatus().setMicroservicesStatus(microservice.getMicroserviceUuid(), status); + disableMicroserviceFeaturesBeforeRemoval(microservice.getMicroserviceUuid()); + addTask(new ContainerTask(REMOVE, microservice.getMicroserviceUuid())); + }); + } + + /** + * Deletes obsolete agent microservices that aren't present in current or latest microservices list + * @param allAgentMicroservices all microservices run by iofog agent + */ + private void deleteObsoleteAgentMicroservices(Set allAgentMicroservices) { + docker.getRunningIofogContainers().stream() + .filter(container -> docker.getContainerName(container).startsWith(IOFOG_DOCKER_CONTAINER_NAME_PREFIX)) + .map(container -> docker.getContainerMicroserviceUuid(container)) + .filter(microserviceUuid -> allAgentMicroservices.stream() + .noneMatch(microservice -> microservice.getMicroserviceUuid().equals(microserviceUuid))) + .forEach(microserviceUuid -> addTask(new ContainerTask(REMOVE, microserviceUuid))); } /** * Deletes any microservices which don't belong to iofog agent + * @param allAgentMicroservices all microservices run by iofog agent */ - private void deleteNonAgentMicroservices() { + private void deleteNonAgentMicroservices(Set allAgentMicroservices) { if (Configuration.isWatchdogEnabled()) { - Set allAgentMicroservices = Stream.concat( - microserviceManager.getLatestMicroservices().stream(), microserviceManager.getCurrentMicroservices().stream()) - .collect(Collectors.toSet() - ); - docker.getContainers().stream() - .map(container -> docker.getContainerName(container)) - .filter(microserviceUuid -> allAgentMicroservices.stream() - .noneMatch(microservice -> microservice.getMicroserviceUuid().equals(microserviceUuid))) - .forEach(microserviceUuid -> addTask(new ContainerTask(REMOVE, microserviceUuid))); + docker.getRunningNonIofogContainers().stream() + .map(container -> docker.getContainerMicroserviceUuid(container)) + .filter(microserviceUuid -> allAgentMicroservices.stream() + .noneMatch(microservice -> microservice.getMicroserviceUuid().equals(microserviceUuid))) + .forEach(microserviceUuid -> addTask(new ContainerTask(REMOVE, microserviceUuid))); } } diff --git a/daemon/src/org/eclipse/iofog/process_manager/ProcessManagerStatus.java b/daemon/src/org/eclipse/iofog/process_manager/ProcessManagerStatus.java index 02f18949..d00ac132 100644 --- a/daemon/src/org/eclipse/iofog/process_manager/ProcessManagerStatus.java +++ b/daemon/src/org/eclipse/iofog/process_manager/ProcessManagerStatus.java @@ -12,10 +12,7 @@ *******************************************************************************/ package org.eclipse.iofog.process_manager; -import org.eclipse.iofog.microservice.Microservice; -import org.eclipse.iofog.microservice.MicroserviceManager; -import org.eclipse.iofog.microservice.MicroserviceStatus; -import org.eclipse.iofog.microservice.Registry; +import org.eclipse.iofog.microservice.*; import org.eclipse.iofog.utils.Constants.LinkStatus; import javax.json.Json; @@ -28,95 +25,100 @@ /** * represents Process Manager status - * - * @author saeid * + * @author saeid */ public class ProcessManagerStatus { - private int runningMicroservicesCount; - private final Map microservicesStatus; - private final Map registriesStatus; - - public ProcessManagerStatus() { - microservicesStatus = new HashMap<>(); - registriesStatus = new HashMap<>(); - runningMicroservicesCount = 0; - } - - /** - * returns {@link Microservice} status in json format - * - * @return string in json format - */ - public String getJsonMicroservicesStatus() { - JsonArrayBuilder arrayBuilder = Json.createArrayBuilder(); - - NumberFormat nf = NumberFormat.getInstance(Locale.US); - nf.setMaximumFractionDigits(2); - - microservicesStatus.forEach((key, status) -> { - if (status.getContainerId() != null) { - JsonObjectBuilder objectBuilder = Json.createObjectBuilder() - .add("id", key) - .add("containerId", status.getContainerId()) - .add("status", status.getStatus().toString()) - .add("startTime", status.getStartTime()) - .add("operatingDuration", status.getOperatingDuration()) - .add("cpuUsage", nf.format(status.getCpuUsage())) - .add("memoryUsage", String.format("%d", status.getMemoryUsage())); - arrayBuilder.add(objectBuilder); - } - }); - return arrayBuilder.build().toString(); - } - - /** - * returns {@link Registry} status in json format - * - * @return string in json format - */ - public String getJsonRegistriesStatus() { - JsonArrayBuilder arrayBuilder = Json.createArrayBuilder(); - registriesStatus.forEach((key, value) -> { - JsonObjectBuilder objectBuilder = Json.createObjectBuilder() - .add("url", key) - .add("linkStatus", value.toString()); - arrayBuilder.add(objectBuilder); - - }); - return arrayBuilder.build().toString(); - } - - public int getRunningMicroservicesCount() { - return runningMicroservicesCount; - } - - public ProcessManagerStatus setRunningMicroservicesCount(int count) { - this.runningMicroservicesCount = count; - return this; - } - - public ProcessManagerStatus setMicroservicesStatus(String microserviceUuid, MicroserviceStatus status) { - synchronized (microservicesStatus) { - this.microservicesStatus.put(microserviceUuid, status); - } - return this; - } - - public MicroserviceStatus getMicroserviceStatus(String microserviceUuid) { - synchronized (microservicesStatus) { - if (!this.microservicesStatus.containsKey(microserviceUuid)) - this.microservicesStatus.put(microserviceUuid, new MicroserviceStatus()); - } - return microservicesStatus.get(microserviceUuid); - } - - public int getRegistriesCount() { - return MicroserviceManager.getInstance().getRegistries().size(); - } - - public Map getRegistriesStatus() { - return registriesStatus; - } + private int runningMicroservicesCount; + private final Map microservicesStatus; + private final Map registriesStatus; + + public ProcessManagerStatus() { + microservicesStatus = new HashMap<>(); + registriesStatus = new HashMap<>(); + runningMicroservicesCount = 0; + } + + /** + * returns {@link Microservice} status in json format + * + * @return string in json format + */ + public String getJsonMicroservicesStatus() { + JsonArrayBuilder arrayBuilder = Json.createArrayBuilder(); + + NumberFormat nf = NumberFormat.getInstance(Locale.US); + nf.setMaximumFractionDigits(2); + + microservicesStatus.forEach((key, status) -> { + if (status.getContainerId() != null) { + JsonObjectBuilder objectBuilder = Json.createObjectBuilder() + .add("id", key) + .add("containerId", status.getContainerId()) + .add("status", status.getStatus().toString()) + .add("startTime", status.getStartTime()) + .add("operatingDuration", status.getOperatingDuration()) + .add("cpuUsage", nf.format(status.getCpuUsage())) + .add("memoryUsage", String.format("%d", status.getMemoryUsage())); + arrayBuilder.add(objectBuilder); + } + }); + return arrayBuilder.build().toString(); + } + + /** + * returns {@link Registry} status in json format + * + * @return string in json format + */ + public String getJsonRegistriesStatus() { + JsonArrayBuilder arrayBuilder = Json.createArrayBuilder(); + registriesStatus.forEach((key, value) -> { + JsonObjectBuilder objectBuilder = Json.createObjectBuilder() + .add("url", key) + .add("linkStatus", value.toString()); + arrayBuilder.add(objectBuilder); + + }); + return arrayBuilder.build().toString(); + } + + public int getRunningMicroservicesCount() { + return runningMicroservicesCount; + } + + public ProcessManagerStatus setRunningMicroservicesCount(int count) { + this.runningMicroservicesCount = count; + return this; + } + + public ProcessManagerStatus setMicroservicesStatus(String microserviceUuid, MicroserviceStatus status) { + synchronized (microservicesStatus) { + this.microservicesStatus.put(microserviceUuid, status); + } + return this; + } + + public MicroserviceStatus getMicroserviceStatus(String microserviceUuid) { + synchronized (microservicesStatus) { + if (!this.microservicesStatus.containsKey(microserviceUuid)) + this.microservicesStatus.put(microserviceUuid, new MicroserviceStatus()); + } + return microservicesStatus.get(microserviceUuid); + } + + public void removeNotRunningMicroserviceStatus() { + synchronized (microservicesStatus) { + microservicesStatus.entrySet().removeIf(entry -> entry.getValue().getStatus() == MicroserviceState.NOT_RUNNING); + } + } + + public int getRegistriesCount() { + return MicroserviceManager.getInstance().getRegistries().size(); + } + + public Map getRegistriesStatus() { + return registriesStatus; + } } diff --git a/daemon/src/org/eclipse/iofog/supervisor/Supervisor.java b/daemon/src/org/eclipse/iofog/supervisor/Supervisor.java index 08c4ea6b..975dac52 100644 --- a/daemon/src/org/eclipse/iofog/supervisor/Supervisor.java +++ b/daemon/src/org/eclipse/iofog/supervisor/Supervisor.java @@ -109,14 +109,14 @@ private void startModule(IOFogModule ioFogModule) throws Exception { private void operationDuration(){ while (true) { + StatusReporter.setSupervisorStatus() + .setOperationDuration(currentTimeMillis()); try { Thread.sleep(Configuration.getStatusReportFreqSeconds() * 1000); } catch (InterruptedException e) { logWarning(e.getMessage()); System.exit(1); } - StatusReporter.setSupervisorStatus() - .setOperationDuration(currentTimeMillis()); } } diff --git a/daemon/src/org/eclipse/iofog/supervisor/SupervisorStatus.java b/daemon/src/org/eclipse/iofog/supervisor/SupervisorStatus.java index 5e3d9e99..e9cdcff0 100644 --- a/daemon/src/org/eclipse/iofog/supervisor/SupervisorStatus.java +++ b/daemon/src/org/eclipse/iofog/supervisor/SupervisorStatus.java @@ -62,7 +62,8 @@ public SupervisorStatus setDaemonLastStart(long daemonLastStart) { } public long getOperationDuration() { - return operationDuration - daemonLastStart; + long opDuration = operationDuration - daemonLastStart; + return opDuration >= 0 ? opDuration : 0; } public SupervisorStatus setOperationDuration(long operationDuration) { diff --git a/daemon/src/org/eclipse/iofog/utils/Constants.java b/daemon/src/org/eclipse/iofog/utils/Constants.java index 4c4de099..a5deb850 100755 --- a/daemon/src/org/eclipse/iofog/utils/Constants.java +++ b/daemon/src/org/eclipse/iofog/utils/Constants.java @@ -104,4 +104,5 @@ public String fullValue() { public static final String SWITCHER_ELEMENT = "switcher"; public static final String SWITCHER_NODE = "current_config"; public static final String OS_GROUP = "iofog-agent"; + public static final String IOFOG_DOCKER_CONTAINER_NAME_PREFIX = "iofog_"; } \ No newline at end of file diff --git a/daemon/src/org/eclipse/iofog/utils/Orchestrator.java b/daemon/src/org/eclipse/iofog/utils/Orchestrator.java index 5c414b14..07c60dd0 100644 --- a/daemon/src/org/eclipse/iofog/utils/Orchestrator.java +++ b/daemon/src/org/eclipse/iofog/utils/Orchestrator.java @@ -27,7 +27,6 @@ import org.eclipse.iofog.field_agent.FieldAgent; import org.eclipse.iofog.field_agent.enums.RequestType; import org.eclipse.iofog.network.IOFogNetworkInterface; -import org.eclipse.iofog.status_reporter.StatusReporter; import org.eclipse.iofog.utils.configuration.Configuration; import org.eclipse.iofog.utils.trustmanager.X509TrustManagerImpl; @@ -79,7 +78,7 @@ public Orchestrator() { public boolean ping() throws Exception { try { JsonObject result = getJSON(controllerUrl + "status"); - return result.getString("status").equals("online"); + return !result.isNull("status"); } catch (Exception exp) { logWarning(MODULE_NAME, exp.getMessage()); throw exp; diff --git a/iofog_version_controller/pom.xml b/iofog_version_controller/pom.xml index ab38c7ae..38306865 100644 --- a/iofog_version_controller/pom.xml +++ b/iofog_version_controller/pom.xml @@ -16,7 +16,7 @@ iofog-agent org.eclipse - 1.0.6 + 1.0.7 4.0.0 diff --git a/pom.xml b/pom.xml index 40162476..eaeebf9b 100644 --- a/pom.xml +++ b/pom.xml @@ -15,7 +15,7 @@ 4.0.0 org.eclipse iofog-agent - 1.0.6 + 1.0.7 iofog-agent pom