From 5a65905d2710718b320b5e49803fbcbfb2dc7eba Mon Sep 17 00:00:00 2001 From: Phil Winder Date: Mon, 18 Jan 2016 17:32:30 +0000 Subject: [PATCH 01/17] Added gradle code to download logstash.zip and place in correct location for spring boot. --- build.gradle | 1 + logstash-scheduler/build.gradle | 10 ++++++++++ 2 files changed, 11 insertions(+) diff --git a/build.gradle b/build.gradle index e709d4b..7cb13a7 100644 --- a/build.gradle +++ b/build.gradle @@ -28,6 +28,7 @@ ext { stompWebsocketVer = "2.3.4" sockjsClientVer = "1.0.0" highchartsVer = "4.1.6" + logstashVer = "2.1.1" // Executor commonsCompressVer = "1.10" diff --git a/logstash-scheduler/build.gradle b/logstash-scheduler/build.gradle index eaa009c..0650786 100644 --- a/logstash-scheduler/build.gradle +++ b/logstash-scheduler/build.gradle @@ -1,5 +1,6 @@ apply plugin: 'com.github.johnrengelman.shadow' apply plugin: 'spring-boot' +apply plugin: 'de.undercouch.download' mainClassName = "org.apache.mesos.logstash.scheduler.Application" ext { @@ -12,6 +13,7 @@ buildscript { } dependencies { classpath("org.springframework.boot:spring-boot-gradle-plugin:${springBootVersion}") + classpath 'de.undercouch:gradle-download-task:2.1.0' } } @@ -45,6 +47,7 @@ dependencies { } jar { + dependsOn 'getLogstashZip' baseName = "logstash-mesos-scheduler" from { configurations.compile.collect { it.isDirectory() ? it : zipTree(it) } } // Include dependencies from { project(":logstash-executor").getTasksByName("copyJar", false)[0].outputs.files[0] } // Include executor @@ -61,6 +64,13 @@ jar { } } +import de.undercouch.gradle.tasks.download.Download +task getLogstashZip(type: Download) { + src "https://download.elastic.co/logstash/logstash/logstash-" + "$logstashVer" + ".zip" + dest new File(buildDir, './resources/main/public/logstash.zip') + onlyIfNewer true +} + publishing { publications { mavenJava(MavenPublication) { From 1f88f4a1087b00057e1a44e7c16fb3ca8b7889a6 Mon Sep 17 00:00:00 2001 From: Frank Scholten Date: Tue, 19 Jan 2016 14:59:13 +0100 Subject: [PATCH 02/17] #40 - Code style change. --- .../org/apache/mesos/logstash/scheduler/LogstashScheduler.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/logstash-scheduler/src/main/java/org/apache/mesos/logstash/scheduler/LogstashScheduler.java b/logstash-scheduler/src/main/java/org/apache/mesos/logstash/scheduler/LogstashScheduler.java index 85c9346..c89e3ea 100644 --- a/logstash-scheduler/src/main/java/org/apache/mesos/logstash/scheduler/LogstashScheduler.java +++ b/logstash-scheduler/src/main/java/org/apache/mesos/logstash/scheduler/LogstashScheduler.java @@ -79,8 +79,7 @@ public void start() { driver = mesosSchedulerDriverFactory.createMesosDriver(this, frameworkBuilder.build(), credentialBuilder.build(), frameworkConfig.getZkUrl()); } - else - { + else { LOGGER.info("Starting Logstash Framework: \n{}", frameworkBuilder); driver = mesosSchedulerDriverFactory.createMesosDriver(this, frameworkBuilder.build(), From 7990de3b30285743e8a0699c42ce76ab35daf13e Mon Sep 17 00:00:00 2001 From: Frank Scholten Date: Tue, 19 Jan 2016 15:29:33 +0100 Subject: [PATCH 03/17] #40 - Added enable.docker flag --- .../mesos/logstash/scheduler/Features.java | 9 ++ .../logstash/scheduler/TaskInfoBuilder.java | 85 +++++++++++++------ 2 files changed, 67 insertions(+), 27 deletions(-) diff --git a/logstash-scheduler/src/main/java/org/apache/mesos/logstash/scheduler/Features.java b/logstash-scheduler/src/main/java/org/apache/mesos/logstash/scheduler/Features.java index 094a53d..24f0ce8 100644 --- a/logstash-scheduler/src/main/java/org/apache/mesos/logstash/scheduler/Features.java +++ b/logstash-scheduler/src/main/java/org/apache/mesos/logstash/scheduler/Features.java @@ -11,6 +11,15 @@ public class Features { private boolean collectd; private boolean syslog; private boolean file; + private boolean docker; + + public boolean isDocker() { + return docker; + } + + public void setDocker(boolean docker) { + this.docker = docker; + } public boolean isFailover() { return failover; diff --git a/logstash-scheduler/src/main/java/org/apache/mesos/logstash/scheduler/TaskInfoBuilder.java b/logstash-scheduler/src/main/java/org/apache/mesos/logstash/scheduler/TaskInfoBuilder.java index e8ab3a1..6a1966f 100644 --- a/logstash-scheduler/src/main/java/org/apache/mesos/logstash/scheduler/TaskInfoBuilder.java +++ b/logstash-scheduler/src/main/java/org/apache/mesos/logstash/scheduler/TaskInfoBuilder.java @@ -1,4 +1,6 @@ package org.apache.mesos.logstash.scheduler; + +import org.apache.log4j.Logger; import org.apache.mesos.Protos; import org.apache.mesos.logstash.common.LogstashConstants; import org.apache.mesos.logstash.common.LogstashProtos; @@ -18,6 +20,8 @@ @Component public class TaskInfoBuilder { + public static final Logger LOGGER = Logger.getLogger(TaskInfoBuilder.class); + @Inject private Clock clock; @Inject @@ -28,12 +32,21 @@ public class TaskInfoBuilder { private LogstashConfig logstashConfig; public Protos.TaskInfo buildTask(Protos.Offer offer) { + if (features.isDocker()) { + LOGGER.debug("Building Docker task"); + return buildDockerTask(offer); + } else { + LOGGER.debug("Building native task"); + return buildNativeTask(offer); + } + } + private Protos.TaskInfo buildDockerTask(Protos.Offer offer) { Protos.ContainerInfo.DockerInfo.Builder dockerExecutor = Protos.ContainerInfo.DockerInfo - .newBuilder() - .setForcePullImage(false) - .setNetwork(Protos.ContainerInfo.DockerInfo.Network.BRIDGE) - .setImage(LogstashConstants.EXECUTOR_IMAGE_NAME_WITH_TAG); + .newBuilder() + .setForcePullImage(false) + .setNetwork(Protos.ContainerInfo.DockerInfo.Network.BRIDGE) + .setImage(LogstashConstants.EXECUTOR_IMAGE_NAME_WITH_TAG); if (features.isSyslog()) { dockerExecutor.addPortMappings(Protos.ContainerInfo.DockerInfo.PortMapping.newBuilder().setHostPort(514).setContainerPort(514).setProtocol("udp")); @@ -43,8 +56,8 @@ public Protos.TaskInfo buildTask(Protos.Offer offer) { } Protos.ContainerInfo.Builder container = Protos.ContainerInfo.newBuilder() - .setType(Protos.ContainerInfo.Type.DOCKER) - .setDocker(dockerExecutor.build()); + .setType(Protos.ContainerInfo.Type.DOCKER) + .setDocker(dockerExecutor.build()); if (features.isFile()) { container.addVolumes(Protos.Volume.newBuilder().setHostPath("/").setContainerPath("/logstashpaths").setMode(Protos.Volume.Mode.RO).build()); } @@ -53,19 +66,40 @@ public Protos.TaskInfo buildTask(Protos.Offer offer) { executorConfig, logstashConfig); Protos.ExecutorInfo executorInfo = Protos.ExecutorInfo.newBuilder() - .setName(LogstashConstants.NODE_NAME + " executor") - .setExecutorId(Protos.ExecutorID.newBuilder().setValue("executor." + UUID.randomUUID())) - .setContainer(container) - .setCommand(Protos.CommandInfo.newBuilder() - .addArguments("dummyArgument") - .setContainer(Protos.CommandInfo.ContainerInfo.newBuilder() - .setImage(LogstashConstants.EXECUTOR_IMAGE_NAME_WITH_TAG).build()) - .setEnvironment(Protos.Environment.newBuilder() - .addAllVariables(executorEnvVars.getList())) - .setShell(false)) - .build(); + .setName(LogstashConstants.NODE_NAME + " executor") + .setExecutorId(Protos.ExecutorID.newBuilder().setValue("executor." + UUID.randomUUID())) + .setContainer(container) + .setCommand(Protos.CommandInfo.newBuilder() + .addArguments("dummyArgument") + .setContainer(Protos.CommandInfo.ContainerInfo.newBuilder() + .setImage(LogstashConstants.EXECUTOR_IMAGE_NAME_WITH_TAG).build()) + .setEnvironment(Protos.Environment.newBuilder() + .addAllVariables(executorEnvVars.getList())) + .setShell(false)) + .build(); + + return createTask(offer, executorInfo); + } + private Protos.TaskInfo buildNativeTask(Protos.Offer offer) { + ExecutorEnvironmentalVariables executorEnvVars = new ExecutorEnvironmentalVariables( + executorConfig, logstashConfig); + Protos.ExecutorInfo executorInfo = Protos.ExecutorInfo.newBuilder() + .setName(LogstashConstants.NODE_NAME + " executor") + .setExecutorId(Protos.ExecutorID.newBuilder().setValue("executor." + UUID.randomUUID())) + .setCommand(Protos.CommandInfo.newBuilder() + .addArguments("dummyArgument") + .setEnvironment(Protos.Environment.newBuilder() + .addAllVariables(executorEnvVars.getList())) + .setShell(false)) + .build(); + + + return createTask(offer, executorInfo); + } + + private Protos.TaskInfo createTask(Protos.Offer offer, Protos.ExecutorInfo executorInfo) { final LogstashProtos.LogstashConfiguration.Builder logstashConfigBuilder = LogstashProtos.LogstashConfiguration.newBuilder(); if (features.isSyslog()) { logstashConfigBuilder.setLogstashPluginInputSyslog( @@ -83,15 +117,14 @@ public Protos.TaskInfo buildTask(Protos.Offer offer) { ); } - return Protos.TaskInfo.newBuilder() - .setExecutor(executorInfo) - .addAllResources(getResourcesList()) - .setName(LogstashConstants.TASK_NAME) - .setTaskId(Protos.TaskID.newBuilder().setValue(formatTaskId(offer))) - .setSlaveId(offer.getSlaveId()) - .setData(logstashConfigBuilder.build().toByteString()) - .build(); + .setExecutor(executorInfo) + .addAllResources(getResourcesList()) + .setName(LogstashConstants.TASK_NAME) + .setTaskId(Protos.TaskID.newBuilder().setValue(formatTaskId(offer))) + .setSlaveId(offer.getSlaveId()) + .setData(logstashConfigBuilder.build().toByteString()) + .build(); } public List getResourcesList() { @@ -132,6 +165,4 @@ private String formatTaskId(Protos.Offer offer) { return LogstashConstants.FRAMEWORK_NAME + "_" + offer.getHostname() + "_" + date; } - - } From 717983793b2876bd245fc1e0484d47d96bba6bc8 Mon Sep 17 00:00:00 2001 From: Frank Scholten Date: Wed, 20 Jan 2016 10:08:38 +0100 Subject: [PATCH 04/17] #40 - Added web server config --- .../logstash/common/network/NetworkUtils.java | 131 ++++++++++++++++++ .../zookeeper/network/NetworkUtilsTest.java | 62 +++++++++ .../logstash/config/FrameworkConfig.java | 41 ++++++ .../mesos/logstash/scheduler/Features.java | 6 + .../logstash/scheduler/TaskInfoBuilder.java | 29 +++- 5 files changed, 263 insertions(+), 6 deletions(-) create mode 100644 logstash-commons/src/main/java/org/apache/mesos/logstash/common/network/NetworkUtils.java create mode 100644 logstash-commons/src/test/java/org/apache/mesos/logstash/common/zookeeper/network/NetworkUtilsTest.java diff --git a/logstash-commons/src/main/java/org/apache/mesos/logstash/common/network/NetworkUtils.java b/logstash-commons/src/main/java/org/apache/mesos/logstash/common/network/NetworkUtils.java new file mode 100644 index 0000000..5a21035 --- /dev/null +++ b/logstash-commons/src/main/java/org/apache/mesos/logstash/common/network/NetworkUtils.java @@ -0,0 +1,131 @@ +package org.apache.mesos.logstash.common.network; + +import org.apache.commons.exec.CommandLine; +import org.apache.commons.exec.DefaultExecutor; +import org.apache.commons.exec.PumpStreamHandler; +import org.apache.commons.exec.environment.EnvironmentUtils; +import org.apache.commons.lang3.StringUtils; +import org.apache.log4j.Logger; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.InterfaceAddress; +import java.net.NetworkInterface; +import java.net.SocketException; +import java.net.UnknownHostException; +import java.nio.charset.Charset; +import java.util.Collections; +import java.util.Enumeration; +import java.util.Map; + +/** + * Utilities to help with networking + */ +@SuppressWarnings("PMD.AvoidUsingHardCodedIP") +public class NetworkUtils { + private static final Logger LOG = Logger.getLogger(NetworkUtils.class); + public static final String DOCKER_MACHINE_IP = "docker-machine ip"; + public static final String LOCALHOST = "127.0.0.1"; + public static final String DOCKER_MACHINE_NAME = "DOCKER_MACHINE_NAME"; + + public static InetAddress hostAddress() { + try { + return InetAddress.getLocalHost(); + } catch (UnknownHostException e) { + LOG.error("", e); + throw new RuntimeException("Unable to bind to local host."); + } + } + + public static InetSocketAddress hostSocket(int port) { + return new InetSocketAddress(hostAddress(), port); + } + + public static String addressToString(InetSocketAddress address, Boolean useIpAddress) { + if (useIpAddress) { + return "http://" + address.getAddress().getHostAddress() + ":" + address.getPort(); + } else { + return "http://" + address.getAddress().getHostName() + ":" + address.getPort(); + } + } + + public static String getDockerMachineName(Map environment) { + String envVar = DOCKER_MACHINE_NAME; + String dockerMachineName = environment.getOrDefault(envVar, ""); + if (dockerMachineName == null || dockerMachineName.isEmpty()) { + LOG.debug("The environmental variable DOCKER_MACHINE_NAME was not found. Using docker0 address."); + } + return dockerMachineName; + } + + public static String getDockerHostIpAddress(Map environment) { + String ipAddress = LOCALHOST; // Default of localhost + String dockerMachineName = getDockerMachineName(environment); + + if (!dockerMachineName.isEmpty()) { + LOG.debug("Docker machine name = " + dockerMachineName); + CommandLine commandline = CommandLine.parse(DOCKER_MACHINE_IP); + commandline.addArgument(dockerMachineName); + LOG.debug("Running exec: " + commandline.toString()); + try { + ipAddress = StringUtils.strip(runCommand(commandline)); + } catch (IOException e) { + LOG.error("Unable to run exec command to find ip address.", e); + } + } else { + ipAddress = getDocker0AdapterIPAddress(); + } + LOG.debug("Returned IP address: " + ipAddress); + return ipAddress; + } + + public static Map getEnvironment() { + Map env = Collections.emptyMap(); + try { + env = EnvironmentUtils.getProcEnvironment(); + } catch (IOException e) { + LOG.error("Unable to get environmental variables", e); + } + return env; + } + + public static String runCommand(CommandLine commandline) throws IOException { + DefaultExecutor exec = new DefaultExecutor(); + ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); + PumpStreamHandler streamHandler = new PumpStreamHandler(outputStream); + exec.setStreamHandler(streamHandler); + exec.execute(commandline); + return outputStream.toString(Charset.defaultCharset().name()); + } + + public static String getDocker0AdapterIPAddress() { + InetAddress docker0 = getLocalAddress("docker0"); + if (docker0 == null) { + LOG.error("Could not get address for docker0"); + return LOCALHOST; + } else { + return docker0.getHostAddress(); + } + } + + private static InetAddress getLocalAddress(String adaptorName){ + try { + Enumeration b = NetworkInterface.getNetworkInterfaces(); + while (b.hasMoreElements()) { + NetworkInterface networkInterface = b.nextElement(); + if (networkInterface.getName().equals(adaptorName)) { + for (InterfaceAddress f : networkInterface.getInterfaceAddresses()) { + if (f.getAddress().isSiteLocalAddress()) { + return f.getAddress(); + } + } + } + } + } catch (SocketException e) { + e.printStackTrace(); + } + return null; + } +} \ No newline at end of file diff --git a/logstash-commons/src/test/java/org/apache/mesos/logstash/common/zookeeper/network/NetworkUtilsTest.java b/logstash-commons/src/test/java/org/apache/mesos/logstash/common/zookeeper/network/NetworkUtilsTest.java new file mode 100644 index 0000000..78c3ad0 --- /dev/null +++ b/logstash-commons/src/test/java/org/apache/mesos/logstash/common/zookeeper/network/NetworkUtilsTest.java @@ -0,0 +1,62 @@ +package org.apache.mesos.logstash.common.zookeeper.network; + +import org.apache.commons.validator.routines.InetAddressValidator; +import org.apache.mesos.logstash.common.network.NetworkUtils; +import org.junit.Test; + +import java.io.IOException; +import java.util.Collections; +import java.util.HashMap; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +/** + */ +public class NetworkUtilsTest { + + public static boolean validate(final String ip) { + return InetAddressValidator.getInstance().isValid(ip); + } + + @Test + // Note: On OSX, when not connected to a network, it will return a IPv6 address, which will not validate properly. + // Please connect to a network to obtain a IPv4 address. + public void shouldProvideIPAddress() { + int port = 1234; + String string = NetworkUtils.addressToString(NetworkUtils.hostSocket(port), true); + assertTrue(validate(string.replace("http://", "").replace(":" + port, ""))); + } + + + @Test + public void shouldProvideHostname() { + int port = 1234; + String string = NetworkUtils.addressToString(NetworkUtils.hostSocket(port), false); + assertFalse(validate(string.replace("http://", "").replace(":" + port, ""))); + } + + @Test + public void shouldGetDockerIPAddress() throws IOException { + // Should always be either a valid IP or 127.0.0.1 + assertTrue(validate(NetworkUtils.getDockerHostIpAddress(NetworkUtils.getEnvironment()))); + } + + @Test + public void shouldReturnLocahostOrDocker0AddressWhenNoEnvVar() { + if (NetworkUtils.getDockerHostIpAddress(Collections.emptyMap()).equals(NetworkUtils.LOCALHOST)) { + assertEquals(NetworkUtils.LOCALHOST, NetworkUtils.getDockerHostIpAddress(Collections.emptyMap())); + } else { + assertEquals(NetworkUtils.getDocker0AdapterIPAddress(), NetworkUtils.getDockerHostIpAddress(Collections.emptyMap())); + } + } + + @Test + public void shouldReturnDockerMachineNameWhenIncluded() { + HashMap map = new HashMap<>(); + String dev = "dev"; + map.put(NetworkUtils.DOCKER_MACHINE_NAME, dev); + assertEquals(dev, NetworkUtils.getDockerMachineName(map)); + } +} \ No newline at end of file diff --git a/logstash-scheduler/src/main/java/org/apache/mesos/logstash/config/FrameworkConfig.java b/logstash-scheduler/src/main/java/org/apache/mesos/logstash/config/FrameworkConfig.java index 0ce669e..17473a6 100644 --- a/logstash-scheduler/src/main/java/org/apache/mesos/logstash/config/FrameworkConfig.java +++ b/logstash-scheduler/src/main/java/org/apache/mesos/logstash/config/FrameworkConfig.java @@ -1,14 +1,24 @@ package org.apache.mesos.logstash.config; +import org.apache.log4j.Logger; +import org.apache.mesos.logstash.common.network.NetworkUtils; +import org.apache.mesos.logstash.scheduler.Features; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.stereotype.Component; +import javax.inject.Inject; import javax.validation.constraints.NotNull; import javax.validation.constraints.Pattern; +import java.net.InetSocketAddress; @Component @ConfigurationProperties public class FrameworkConfig { + + public static final Logger LOGGER = Logger.getLogger(FrameworkConfig.class); + + public static final String LOGSTASH_EXECUTOR_JAR = "logstash-mesos-executor.jar"; + @NotNull @Pattern(regexp = "^zk://.+$") private String zkUrl; @@ -26,6 +36,21 @@ public class FrameworkConfig { private String mesosPrincipal = null; private String mesosSecret = null; + private String javaHome; + + private InetSocketAddress frameworkFileServerAddress; + + @Inject + private Features features; + + public String getJavaHome() { + return javaHome; + } + + public void setJavaHome(String javaHome) { + this.javaHome = javaHome; + } + public String getZkUrl() { return zkUrl; } @@ -97,4 +122,20 @@ public void setMesosSecret(String mesosSecret) { public String getMesosSecret() { return mesosSecret; } + + public void setFrameworkFileServerAddress(InetSocketAddress addr) { + if (addr != null) { + frameworkFileServerAddress = addr; + } else { + LOGGER.error("Could not set webserver address. Was null."); + } + } + + public String getFrameworkFileServerAddress() { + String result = ""; + if (frameworkFileServerAddress != null) { + return NetworkUtils.addressToString(frameworkFileServerAddress, features.getUseIpAddress()); + } + return result; + } } diff --git a/logstash-scheduler/src/main/java/org/apache/mesos/logstash/scheduler/Features.java b/logstash-scheduler/src/main/java/org/apache/mesos/logstash/scheduler/Features.java index 24f0ce8..4d98be1 100644 --- a/logstash-scheduler/src/main/java/org/apache/mesos/logstash/scheduler/Features.java +++ b/logstash-scheduler/src/main/java/org/apache/mesos/logstash/scheduler/Features.java @@ -12,6 +12,7 @@ public class Features { private boolean syslog; private boolean file; private boolean docker; + private Boolean useIpAddress = false; public boolean isDocker() { return docker; @@ -52,4 +53,9 @@ public boolean isFile() { public void setFile(boolean file) { this.file = file; } + + public Boolean getUseIpAddress() { + return useIpAddress; + } + } diff --git a/logstash-scheduler/src/main/java/org/apache/mesos/logstash/scheduler/TaskInfoBuilder.java b/logstash-scheduler/src/main/java/org/apache/mesos/logstash/scheduler/TaskInfoBuilder.java index 6a1966f..f401512 100644 --- a/logstash-scheduler/src/main/java/org/apache/mesos/logstash/scheduler/TaskInfoBuilder.java +++ b/logstash-scheduler/src/main/java/org/apache/mesos/logstash/scheduler/TaskInfoBuilder.java @@ -6,6 +6,7 @@ import org.apache.mesos.logstash.common.LogstashProtos; import org.apache.mesos.logstash.config.ExecutorConfig; import org.apache.mesos.logstash.config.ExecutorEnvironmentalVariables; +import org.apache.mesos.logstash.config.FrameworkConfig; import org.apache.mesos.logstash.config.LogstashConfig; import org.apache.mesos.logstash.util.Clock; import org.springframework.stereotype.Component; @@ -30,6 +31,8 @@ public class TaskInfoBuilder { private ExecutorConfig executorConfig; @Inject private LogstashConfig logstashConfig; + @Inject + private FrameworkConfig frameworkConfig; public Protos.TaskInfo buildTask(Protos.Offer offer) { if (features.isDocker()) { @@ -85,17 +88,25 @@ private Protos.TaskInfo buildNativeTask(Protos.Offer offer) { ExecutorEnvironmentalVariables executorEnvVars = new ExecutorEnvironmentalVariables( executorConfig, logstashConfig); + Protos.CommandInfo.Builder commandInfoBuilder = Protos.CommandInfo.newBuilder() + .setEnvironment(Protos.Environment.newBuilder().addAllVariables(executorEnvVars.getList())); + + String address = frameworkConfig.getFrameworkFileServerAddress(); + if (address == null) { + throw new NullPointerException("Webserver address is null"); + } + String httpPath = address + "/get/" + FrameworkConfig.LOGSTASH_EXECUTOR_JAR; + + commandInfoBuilder + .setValue(frameworkConfig.getJavaHome() + "java $JAVA_OPTS -jar ./" + FrameworkConfig.LOGSTASH_EXECUTOR_JAR) + .addUris(Protos.CommandInfo.URI.newBuilder().setValue(httpPath)); + Protos.ExecutorInfo executorInfo = Protos.ExecutorInfo.newBuilder() .setName(LogstashConstants.NODE_NAME + " executor") .setExecutorId(Protos.ExecutorID.newBuilder().setValue("executor." + UUID.randomUUID())) - .setCommand(Protos.CommandInfo.newBuilder() - .addArguments("dummyArgument") - .setEnvironment(Protos.Environment.newBuilder() - .addAllVariables(executorEnvVars.getList())) - .setShell(false)) + .setCommand(commandInfoBuilder) .build(); - return createTask(offer, executorInfo); } @@ -127,6 +138,12 @@ private Protos.TaskInfo createTask(Protos.Offer offer, Protos.ExecutorInfo execu .build(); } + private void addIfNotEmpty(List args, String key, String value) { + if (!value.isEmpty()) { + args.addAll(asList(key, value)); + } + } + public List getResourcesList() { int memNeeded = executorConfig.getHeapSize() + logstashConfig.getHeapSize() + executorConfig.getOverheadMem(); From bc84c7de16b64bccc8076209964b076fae71ce9f Mon Sep 17 00:00:00 2001 From: Frank Scholten Date: Wed, 20 Jan 2016 12:34:07 +0100 Subject: [PATCH 05/17] #40 - Fixed DeploymentSystem test. * Increased heap size * Offerstrategy now counts executor tasks not scheduler tasks. --- logstash-commons/build.gradle | 4 ++++ .../logstash/common/network/NetworkUtils.java | 20 ++++++++++--------- .../zookeeper/network/NetworkUtilsTest.java | 16 ++++++++------- .../logstash/config/FrameworkConfig.java | 5 ++++- .../logstash/scheduler/OfferStrategy.java | 2 +- .../logstash/scheduler/OfferStrategyTest.java | 3 ++- .../LogstashSchedulerContainer.java | 3 ++- .../systemtest/DeploymentSystemTest.java | 9 +++++++-- 8 files changed, 40 insertions(+), 22 deletions(-) diff --git a/logstash-commons/build.gradle b/logstash-commons/build.gradle index 4225502..aa1074b 100644 --- a/logstash-commons/build.gradle +++ b/logstash-commons/build.gradle @@ -12,6 +12,10 @@ task taskCopyFilesForDocker(type: Copy) { dependencies { compile "com.google.protobuf:protobuf-java:$protobufVer" compile "javax.inject:javax.inject:$javaxInjectVer" + compile "org.springframework:spring-context:4.1.7.RELEASE" + compile "org.apache.commons:commons-exec:1.3" + compile "org.apache.commons:commons-lang3:3.4" + compile "commons-validator:commons-validator:1.5.0" } idea { diff --git a/logstash-commons/src/main/java/org/apache/mesos/logstash/common/network/NetworkUtils.java b/logstash-commons/src/main/java/org/apache/mesos/logstash/common/network/NetworkUtils.java index 5a21035..237eba5 100644 --- a/logstash-commons/src/main/java/org/apache/mesos/logstash/common/network/NetworkUtils.java +++ b/logstash-commons/src/main/java/org/apache/mesos/logstash/common/network/NetworkUtils.java @@ -6,6 +6,7 @@ import org.apache.commons.exec.environment.EnvironmentUtils; import org.apache.commons.lang3.StringUtils; import org.apache.log4j.Logger; +import org.springframework.stereotype.Service; import java.io.ByteArrayOutputStream; import java.io.IOException; @@ -24,13 +25,14 @@ * Utilities to help with networking */ @SuppressWarnings("PMD.AvoidUsingHardCodedIP") +@Service public class NetworkUtils { private static final Logger LOG = Logger.getLogger(NetworkUtils.class); public static final String DOCKER_MACHINE_IP = "docker-machine ip"; public static final String LOCALHOST = "127.0.0.1"; public static final String DOCKER_MACHINE_NAME = "DOCKER_MACHINE_NAME"; - public static InetAddress hostAddress() { + public InetAddress hostAddress() { try { return InetAddress.getLocalHost(); } catch (UnknownHostException e) { @@ -39,11 +41,11 @@ public static InetAddress hostAddress() { } } - public static InetSocketAddress hostSocket(int port) { + public InetSocketAddress hostSocket(int port) { return new InetSocketAddress(hostAddress(), port); } - public static String addressToString(InetSocketAddress address, Boolean useIpAddress) { + public String addressToString(InetSocketAddress address, Boolean useIpAddress) { if (useIpAddress) { return "http://" + address.getAddress().getHostAddress() + ":" + address.getPort(); } else { @@ -51,7 +53,7 @@ public static String addressToString(InetSocketAddress address, Boolean useIpAdd } } - public static String getDockerMachineName(Map environment) { + public String getDockerMachineName(Map environment) { String envVar = DOCKER_MACHINE_NAME; String dockerMachineName = environment.getOrDefault(envVar, ""); if (dockerMachineName == null || dockerMachineName.isEmpty()) { @@ -60,7 +62,7 @@ public static String getDockerMachineName(Map environment) { return dockerMachineName; } - public static String getDockerHostIpAddress(Map environment) { + public String getDockerHostIpAddress(Map environment) { String ipAddress = LOCALHOST; // Default of localhost String dockerMachineName = getDockerMachineName(environment); @@ -81,7 +83,7 @@ public static String getDockerHostIpAddress(Map environment) { return ipAddress; } - public static Map getEnvironment() { + public Map getEnvironment() { Map env = Collections.emptyMap(); try { env = EnvironmentUtils.getProcEnvironment(); @@ -91,7 +93,7 @@ public static Map getEnvironment() { return env; } - public static String runCommand(CommandLine commandline) throws IOException { + public String runCommand(CommandLine commandline) throws IOException { DefaultExecutor exec = new DefaultExecutor(); ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); PumpStreamHandler streamHandler = new PumpStreamHandler(outputStream); @@ -100,7 +102,7 @@ public static String runCommand(CommandLine commandline) throws IOException { return outputStream.toString(Charset.defaultCharset().name()); } - public static String getDocker0AdapterIPAddress() { + public String getDocker0AdapterIPAddress() { InetAddress docker0 = getLocalAddress("docker0"); if (docker0 == null) { LOG.error("Could not get address for docker0"); @@ -110,7 +112,7 @@ public static String getDocker0AdapterIPAddress() { } } - private static InetAddress getLocalAddress(String adaptorName){ + private InetAddress getLocalAddress(String adaptorName){ try { Enumeration b = NetworkInterface.getNetworkInterfaces(); while (b.hasMoreElements()) { diff --git a/logstash-commons/src/test/java/org/apache/mesos/logstash/common/zookeeper/network/NetworkUtilsTest.java b/logstash-commons/src/test/java/org/apache/mesos/logstash/common/zookeeper/network/NetworkUtilsTest.java index 78c3ad0..6c3d207 100644 --- a/logstash-commons/src/test/java/org/apache/mesos/logstash/common/zookeeper/network/NetworkUtilsTest.java +++ b/logstash-commons/src/test/java/org/apache/mesos/logstash/common/zookeeper/network/NetworkUtilsTest.java @@ -20,12 +20,14 @@ public static boolean validate(final String ip) { return InetAddressValidator.getInstance().isValid(ip); } + private NetworkUtils networkUtils = new NetworkUtils(); + @Test // Note: On OSX, when not connected to a network, it will return a IPv6 address, which will not validate properly. // Please connect to a network to obtain a IPv4 address. public void shouldProvideIPAddress() { int port = 1234; - String string = NetworkUtils.addressToString(NetworkUtils.hostSocket(port), true); + String string = networkUtils.addressToString(networkUtils.hostSocket(port), true); assertTrue(validate(string.replace("http://", "").replace(":" + port, ""))); } @@ -33,22 +35,22 @@ public void shouldProvideIPAddress() { @Test public void shouldProvideHostname() { int port = 1234; - String string = NetworkUtils.addressToString(NetworkUtils.hostSocket(port), false); + String string = networkUtils.addressToString(networkUtils.hostSocket(port), false); assertFalse(validate(string.replace("http://", "").replace(":" + port, ""))); } @Test public void shouldGetDockerIPAddress() throws IOException { // Should always be either a valid IP or 127.0.0.1 - assertTrue(validate(NetworkUtils.getDockerHostIpAddress(NetworkUtils.getEnvironment()))); + assertTrue(validate( networkUtils.getDockerHostIpAddress(networkUtils.getEnvironment()))); } @Test public void shouldReturnLocahostOrDocker0AddressWhenNoEnvVar() { - if (NetworkUtils.getDockerHostIpAddress(Collections.emptyMap()).equals(NetworkUtils.LOCALHOST)) { - assertEquals(NetworkUtils.LOCALHOST, NetworkUtils.getDockerHostIpAddress(Collections.emptyMap())); + if ( networkUtils.getDockerHostIpAddress(Collections.emptyMap()).equals(NetworkUtils.LOCALHOST)) { + assertEquals(NetworkUtils.LOCALHOST, networkUtils.getDockerHostIpAddress(Collections.emptyMap())); } else { - assertEquals(NetworkUtils.getDocker0AdapterIPAddress(), NetworkUtils.getDockerHostIpAddress(Collections.emptyMap())); + assertEquals(networkUtils.getDocker0AdapterIPAddress(), networkUtils.getDockerHostIpAddress(Collections.emptyMap())); } } @@ -57,6 +59,6 @@ public void shouldReturnDockerMachineNameWhenIncluded() { HashMap map = new HashMap<>(); String dev = "dev"; map.put(NetworkUtils.DOCKER_MACHINE_NAME, dev); - assertEquals(dev, NetworkUtils.getDockerMachineName(map)); + assertEquals(dev, networkUtils.getDockerMachineName(map)); } } \ No newline at end of file diff --git a/logstash-scheduler/src/main/java/org/apache/mesos/logstash/config/FrameworkConfig.java b/logstash-scheduler/src/main/java/org/apache/mesos/logstash/config/FrameworkConfig.java index 17473a6..1bab25c 100644 --- a/logstash-scheduler/src/main/java/org/apache/mesos/logstash/config/FrameworkConfig.java +++ b/logstash-scheduler/src/main/java/org/apache/mesos/logstash/config/FrameworkConfig.java @@ -43,6 +43,9 @@ public class FrameworkConfig { @Inject private Features features; + @Inject + private NetworkUtils networkUtils; + public String getJavaHome() { return javaHome; } @@ -134,7 +137,7 @@ public void setFrameworkFileServerAddress(InetSocketAddress addr) { public String getFrameworkFileServerAddress() { String result = ""; if (frameworkFileServerAddress != null) { - return NetworkUtils.addressToString(frameworkFileServerAddress, features.getUseIpAddress()); + return networkUtils.addressToString(frameworkFileServerAddress, features.getUseIpAddress()); } return result; } diff --git a/logstash-scheduler/src/main/java/org/apache/mesos/logstash/scheduler/OfferStrategy.java b/logstash-scheduler/src/main/java/org/apache/mesos/logstash/scheduler/OfferStrategy.java index 75c8a5b..ec68060 100644 --- a/logstash-scheduler/src/main/java/org/apache/mesos/logstash/scheduler/OfferStrategy.java +++ b/logstash-scheduler/src/main/java/org/apache/mesos/logstash/scheduler/OfferStrategy.java @@ -68,7 +68,7 @@ public static OfferResult decline(String reason) { } private boolean isHostAlreadyRunningTask(ClusterState clusterState, Protos.Offer offer) { - return clusterState.getTaskList().stream().anyMatch(taskInfo -> taskInfo.getSlaveId().equals(offer.getSlaveId())); + return clusterState.getTaskList().stream().anyMatch(taskInfo -> taskInfo.getSlaveId().equals(offer.getSlaveId()) && taskInfo.getName().equals("logstash.task")); } private boolean hasEnoughOfResourceType(List resources, String resourceName, double minSize) { diff --git a/logstash-scheduler/src/test/java/org/apache/mesos/logstash/scheduler/OfferStrategyTest.java b/logstash-scheduler/src/test/java/org/apache/mesos/logstash/scheduler/OfferStrategyTest.java index f5e5a53..84d82ec 100644 --- a/logstash-scheduler/src/test/java/org/apache/mesos/logstash/scheduler/OfferStrategyTest.java +++ b/logstash-scheduler/src/test/java/org/apache/mesos/logstash/scheduler/OfferStrategyTest.java @@ -2,6 +2,7 @@ import com.google.protobuf.InvalidProtocolBufferException; import org.apache.mesos.Protos; +import org.apache.mesos.logstash.common.LogstashConstants; import org.apache.mesos.logstash.config.ExecutorConfig; import org.apache.mesos.logstash.config.LogstashConfig; import org.apache.mesos.logstash.state.ClusterState; @@ -91,7 +92,7 @@ public void willAcceptValidOffer() throws Exception { private Protos.TaskInfo createTask(String hostname) throws InvalidProtocolBufferException { return Protos.TaskInfo.newBuilder() - .setName("Test") + .setName(LogstashConstants.TASK_NAME) .setTaskId(Protos.TaskID.newBuilder().setValue("TestId").build()) .setSlaveId(Protos.SlaveID.newBuilder().setValue(hostname).build()) .build(); diff --git a/system-test/src/main/java/org/apache/mesos/logstash/systemtest/LogstashSchedulerContainer.java b/system-test/src/main/java/org/apache/mesos/logstash/systemtest/LogstashSchedulerContainer.java index a27956d..33ed2cc 100644 --- a/system-test/src/main/java/org/apache/mesos/logstash/systemtest/LogstashSchedulerContainer.java +++ b/system-test/src/main/java/org/apache/mesos/logstash/systemtest/LogstashSchedulerContainer.java @@ -70,7 +70,8 @@ protected CreateContainerCmd dockerCommand() { "--failover-enabled=false", elasticsearchUrl.map(url -> "--logstash.elasticsearch-url=" + url).orElse(null), "--executor.heap-size=64", - "--logstash.heap-size=128", + "--logstash.heap-size=256", + "--enable.docker=true", withSyslog ? "--enable.syslog=true" : null ).stream().filter(StringUtils::isNotEmpty).collect(Collectors.joining(" ")); diff --git a/system-test/src/test/java/org/apache/mesos/logstash/systemtest/DeploymentSystemTest.java b/system-test/src/test/java/org/apache/mesos/logstash/systemtest/DeploymentSystemTest.java index 3080b61..51547f7 100644 --- a/system-test/src/test/java/org/apache/mesos/logstash/systemtest/DeploymentSystemTest.java +++ b/system-test/src/test/java/org/apache/mesos/logstash/systemtest/DeploymentSystemTest.java @@ -9,6 +9,7 @@ import com.fasterxml.jackson.core.JsonParseException; import com.fasterxml.jackson.databind.JsonMappingException; import com.github.dockerjava.api.DockerClient; +import com.github.dockerjava.api.NotModifiedException; import com.github.dockerjava.api.command.CreateContainerCmd; import com.github.dockerjava.api.command.CreateContainerResponse; import com.github.dockerjava.api.model.Container; @@ -50,7 +51,7 @@ public class DeploymentSystemTest { private static DockerClient dockerClient = DockerClientFactory.build(); - private static MesosCluster cluster = new MesosCluster(new ClusterArchitecture.Builder() + private MesosCluster cluster = new MesosCluster(new ClusterArchitecture.Builder() .withZooKeeper() .withMaster() .withSlave(zooKeeper -> new LogstashMesosSlave(dockerClient, zooKeeper)) @@ -65,11 +66,15 @@ public void before() { @After public void after() { - scheduler.ifPresent(scheduler -> dockerClient.listContainersCmd().withSince(scheduler.getContainerId()).exec().stream() + try { + scheduler.ifPresent(scheduler -> dockerClient.listContainersCmd().withSince(scheduler.getContainerId()).exec().stream() .filter(container -> Arrays.stream(container.getNames()).anyMatch(name -> name.startsWith("/mesos-"))) .map(Container::getId) .peek(s -> System.out.println("Stopping mesos- container: " + s)) .forEach(containerId -> dockerClient.stopContainerCmd(containerId).exec())); + } catch (NotModifiedException e) { + // Container is already stopped + } cluster.stop(); } From 35cbc7c38ac1835c829c4029cde4e377ba83ab9d Mon Sep 17 00:00:00 2001 From: Frank Scholten Date: Wed, 20 Jan 2016 14:04:35 +0100 Subject: [PATCH 06/17] #40 - Work in progress. Adding Jar mode system test. * Setting up TaskInfoBuilder command --- .../logstash/scheduler/TaskInfoBuilder.java | 8 ++++++-- .../systemtest/LogstashSchedulerContainer.java | 10 +++++++--- .../systemtest/DeploymentSystemTest.java | 16 ++++++++++++++-- 3 files changed, 27 insertions(+), 7 deletions(-) diff --git a/logstash-scheduler/src/main/java/org/apache/mesos/logstash/scheduler/TaskInfoBuilder.java b/logstash-scheduler/src/main/java/org/apache/mesos/logstash/scheduler/TaskInfoBuilder.java index f401512..b37846c 100644 --- a/logstash-scheduler/src/main/java/org/apache/mesos/logstash/scheduler/TaskInfoBuilder.java +++ b/logstash-scheduler/src/main/java/org/apache/mesos/logstash/scheduler/TaskInfoBuilder.java @@ -37,10 +37,14 @@ public class TaskInfoBuilder { public Protos.TaskInfo buildTask(Protos.Offer offer) { if (features.isDocker()) { LOGGER.debug("Building Docker task"); - return buildDockerTask(offer); + Protos.TaskInfo taskInfo = buildDockerTask(offer); + LOGGER.debug(taskInfo.toString()); + return taskInfo; } else { LOGGER.debug("Building native task"); - return buildNativeTask(offer); + Protos.TaskInfo taskInfo = buildNativeTask(offer); + LOGGER.debug(taskInfo.toString()); + return taskInfo; } } diff --git a/system-test/src/main/java/org/apache/mesos/logstash/systemtest/LogstashSchedulerContainer.java b/system-test/src/main/java/org/apache/mesos/logstash/systemtest/LogstashSchedulerContainer.java index 33ed2cc..d00c4cc 100644 --- a/system-test/src/main/java/org/apache/mesos/logstash/systemtest/LogstashSchedulerContainer.java +++ b/system-test/src/main/java/org/apache/mesos/logstash/systemtest/LogstashSchedulerContainer.java @@ -25,9 +25,9 @@ public class LogstashSchedulerContainer extends AbstractContainer { public static final String SCHEDULER_NAME = "logstash-scheduler"; private String zookeeperIpAddress; - private final int apiPort = 9092; private Optional elasticsearchUrl = Optional.empty(); private boolean withSyslog = false; + private boolean useDocker = false; public LogstashSchedulerContainer(DockerClient dockerClient, String zookeeperIpAddress) { super(dockerClient); @@ -71,7 +71,7 @@ protected CreateContainerCmd dockerCommand() { elasticsearchUrl.map(url -> "--logstash.elasticsearch-url=" + url).orElse(null), "--executor.heap-size=64", "--logstash.heap-size=256", - "--enable.docker=true", + "--enable.docker=" + useDocker, withSyslog ? "--enable.syslog=true" : null ).stream().filter(StringUtils::isNotEmpty).collect(Collectors.joining(" ")); @@ -79,11 +79,15 @@ protected CreateContainerCmd dockerCommand() { .createContainerCmd(SCHEDULER_IMAGE) .withName(SCHEDULER_NAME + "_" + new SecureRandom().nextInt()) .withEnv("JAVA_OPTS=" + getJavaOpts().stream().collect(Collectors.joining(" "))) - .withExposedPorts(ExposedPort.tcp(apiPort)) + .withExposedPorts(ExposedPort.tcp(9092)) .withCmd(cmd); } public void enableSyslog() { withSyslog = true; } + + public void setDocker(boolean useDocker) { + this.useDocker = useDocker; + } } diff --git a/system-test/src/test/java/org/apache/mesos/logstash/systemtest/DeploymentSystemTest.java b/system-test/src/test/java/org/apache/mesos/logstash/systemtest/DeploymentSystemTest.java index 51547f7..2b87b6a 100644 --- a/system-test/src/test/java/org/apache/mesos/logstash/systemtest/DeploymentSystemTest.java +++ b/system-test/src/test/java/org/apache/mesos/logstash/systemtest/DeploymentSystemTest.java @@ -88,9 +88,21 @@ protected State getClusterStateInfo() { } @Test - public void testDeployment() throws JsonParseException, UnirestException, JsonMappingException { + public void testDeploymentDocker() throws JsonParseException, UnirestException, JsonMappingException { String zookeeperIpAddress = cluster.getZkContainer().getIpAddress(); - scheduler = Optional.of(new LogstashSchedulerContainer(dockerClient, zookeeperIpAddress)); + LogstashSchedulerContainer schedulerContainer = new LogstashSchedulerContainer(dockerClient, zookeeperIpAddress); + schedulerContainer.setDocker(true); + scheduler = Optional.of(schedulerContainer); + cluster.addAndStartContainer(scheduler.get()); + + waitForFramework(); + } + + @Test + public void testDeploymentJar() throws JsonParseException, UnirestException, JsonMappingException { + String zookeeperIpAddress = cluster.getZkContainer().getIpAddress(); + LogstashSchedulerContainer logstashSchedulerContainer = new LogstashSchedulerContainer(dockerClient, zookeeperIpAddress); + scheduler = Optional.of(logstashSchedulerContainer); cluster.addAndStartContainer(scheduler.get()); waitForFramework(); From bc384a6285ec10e68a7577ef73eab5cfad84e0d3 Mon Sep 17 00:00:00 2001 From: Frank Scholten Date: Wed, 20 Jan 2016 16:51:13 +0100 Subject: [PATCH 07/17] #40 - Work in progress. Added config to host executor jar on scheduler. NOTE: Currently the DiscoverySystemTest fails because the Mesos fetcher cannot seem to download the executor from the scheduler. When I add a breakpoint in cluster.stop() I am able to manually fetch the executor jar with curl. See below. I don't understand why the Mesos fetches cannot download it. root@0bcc5c586790:/tmp/mesos/slaves/e9554734-98e1-434f-9e30-f455aff93f32-S0/frameworks/e9554734-98e1-434f-9e30-f455aff93f32-0000/executors/executor.a050196d-97e8-43f8-bdc5-778646cb06dd/runs/latest# cat stderr I0120 15:42:15.261723 800 logging.cpp:172] INFO level logging started! I0120 15:42:15.261976 800 logging.cpp:177] Logging to /var/log I0120 15:42:15.261999 800 fetcher.cpp:414] Fetcher Info: {"cache_directory":"\/tmp\/mesos\/fetch\/slaves\/e9554734-98e1-434f-9e30-f455aff93f32-S0","items":[{"action":"BYPASS_CACHE","uri":{"extract":true,"value":"http:\/\/172.17.0.6:8080\/logstash-mesos-executor.jar"}}],"sandbox_directory":"\/tmp\/mesos\/slaves\/e9554734-98e1-434f-9e30-f455aff93f32-S0\/frameworks\/e9554734-98e1-434f-9e30-f455aff93f32-0000\/executors\/executor.a050196d-97e8-43f8-bdc5-778646cb06dd\/runs\/a2f3b2e6-ecbc-4d24-9fb5-70e6faf6b22b"} I0120 15:42:15.263743 800 fetcher.cpp:369] Fetching URI 'http://172.17.0.6:8080/logstash-mesos-executor.jar' I0120 15:42:15.264490 800 fetcher.cpp:243] Fetching directly into the sandbox directory I0120 15:42:15.264530 800 fetcher.cpp:180] Fetching URI 'http://172.17.0.6:8080/logstash-mesos-executor.jar' I0120 15:42:15.264554 800 fetcher.cpp:127] Downloading resource from 'http://172.17.0.6:8080/logstash-mesos-executor.jar' to '/tmp/mesos/slaves/e9554734-98e1-434f-9e30-f455aff93f32-S0/frameworks/e9554734-98e1-434f-9e30-f455aff93f32-0000/executors/executor.a050196d-97e8-43f8-bdc5-778646cb06dd/runs/a2f3b2e6-ecbc-4d24-9fb5-70e6faf6b22b/logstash-mesos-executor.jar' Failed to fetch 'http://172.17.0.6:8080/logstash-mesos-executor.jar': Error downloading resource: Couldn't connect to server Failed to synchronize with slave (it's probably exited) root@0bcc5c586790:/tmp/mesos/slaves/e9554734-98e1-434f-9e30-f455aff93f32-S0/frameworks/e9554734-98e1-434f-9e30-f455aff93f32-0000/executors/executor.a050196d-97e8-43f8-bdc5-778646cb06dd/runs/latest# curl -XHEAD -v http://172.17.0.> * Hostname was NOT found in DNS cache * Trying 172.17.0.6... * Connected to 172.17.0.6 (172.17.0.6) port 8080 (#0) > HEAD /logstash-mesos-executor.jar HTTP/1.1 > User-Agent: curl/7.35.0 > Host: 172.17.0.6:8080 > Accept: */* > < HTTP/1.1 200 OK < Date: Wed, 20 Jan 2016 15:45:38 GMT < Last-Modified: Wed, 20 Jan 2016 15:01:22 GMT < Content-Type: application/java-archive; charset=UTF-8 < Content-Length: 12559468 * Server Jetty(9.2.z-SNAPSHOT) is not blacklisted < Server: Jetty(9.2.z-SNAPSHOT) --- logstash-scheduler/build.gradle | 14 ++++++++-- .../logstash/config/FrameworkConfig.java | 28 ++++++------------- .../logstash/scheduler/TaskInfoBuilder.java | 5 ++-- 3 files changed, 23 insertions(+), 24 deletions(-) diff --git a/logstash-scheduler/build.gradle b/logstash-scheduler/build.gradle index 0650786..e1a74b3 100644 --- a/logstash-scheduler/build.gradle +++ b/logstash-scheduler/build.gradle @@ -29,6 +29,7 @@ configurations { dependencies { compile project(':logstash-commons') + compile project(':logstash-executor') compile "log4j:log4j:$log4jVer" // TODO (pnw) I saw logback used somewhere else. Check @@ -46,11 +47,18 @@ dependencies { testCompile("org.springframework:spring-test:4.1.7.RELEASE") } +task copyExecutor(type: Copy) { + from { project(":logstash-executor").getTasksByName("copyJar", false)[0].outputs.files[0] } // Include executor + into "$buildDir/resources/main/static" + rename { String fileName -> + fileName.replace("-${project.version}", "") + } +} + jar { - dependsOn 'getLogstashZip' + dependsOn 'getLogstashZip', 'copyExecutor' baseName = "logstash-mesos-scheduler" from { configurations.compile.collect { it.isDirectory() ? it : zipTree(it) } } // Include dependencies - from { project(":logstash-executor").getTasksByName("copyJar", false)[0].outputs.files[0] } // Include executor exclude 'META-INF/*.SF' exclude 'META-INF/*.DSA' @@ -90,7 +98,7 @@ task taskCopyFilesForDocker(type: Copy) { } task copyJar(type: Copy) { - dependsOn 'bootRepackage' + dependsOn 'copyExecutor' from "build/libs/logstash-mesos-scheduler-${project.version}.jar" from "start-scheduler.sh" into 'build/docker' diff --git a/logstash-scheduler/src/main/java/org/apache/mesos/logstash/config/FrameworkConfig.java b/logstash-scheduler/src/main/java/org/apache/mesos/logstash/config/FrameworkConfig.java index 1bab25c..e0bc60c 100644 --- a/logstash-scheduler/src/main/java/org/apache/mesos/logstash/config/FrameworkConfig.java +++ b/logstash-scheduler/src/main/java/org/apache/mesos/logstash/config/FrameworkConfig.java @@ -9,7 +9,6 @@ import javax.inject.Inject; import javax.validation.constraints.NotNull; import javax.validation.constraints.Pattern; -import java.net.InetSocketAddress; @Component @ConfigurationProperties @@ -26,8 +25,9 @@ public class FrameworkConfig { private int zkTimeout = 20000; private String frameworkName = "logstash"; - private int webserverPort = 9092; + private double failoverTimeout = 31449600; + private long reconcilationTimeoutMillis; private String role = "*"; @@ -36,9 +36,7 @@ public class FrameworkConfig { private String mesosPrincipal = null; private String mesosSecret = null; - private String javaHome; - - private InetSocketAddress frameworkFileServerAddress; + private String javaHome = ""; @Inject private Features features; @@ -47,7 +45,11 @@ public class FrameworkConfig { private NetworkUtils networkUtils; public String getJavaHome() { - return javaHome; + if (!javaHome.isEmpty()) { + return javaHome.replaceAll("java$", "").replaceAll("/$", "") + "/"; + } else { + return ""; + } } public void setJavaHome(String javaHome) { @@ -126,19 +128,7 @@ public String getMesosSecret() { return mesosSecret; } - public void setFrameworkFileServerAddress(InetSocketAddress addr) { - if (addr != null) { - frameworkFileServerAddress = addr; - } else { - LOGGER.error("Could not set webserver address. Was null."); - } - } - public String getFrameworkFileServerAddress() { - String result = ""; - if (frameworkFileServerAddress != null) { - return networkUtils.addressToString(frameworkFileServerAddress, features.getUseIpAddress()); - } - return result; + return networkUtils.addressToString(networkUtils.hostSocket(8080), true); } } diff --git a/logstash-scheduler/src/main/java/org/apache/mesos/logstash/scheduler/TaskInfoBuilder.java b/logstash-scheduler/src/main/java/org/apache/mesos/logstash/scheduler/TaskInfoBuilder.java index b37846c..d2d4e81 100644 --- a/logstash-scheduler/src/main/java/org/apache/mesos/logstash/scheduler/TaskInfoBuilder.java +++ b/logstash-scheduler/src/main/java/org/apache/mesos/logstash/scheduler/TaskInfoBuilder.java @@ -13,6 +13,7 @@ import javax.inject.Inject; import java.text.SimpleDateFormat; +import java.util.Collections; import java.util.List; import java.util.UUID; @@ -99,11 +100,11 @@ private Protos.TaskInfo buildNativeTask(Protos.Offer offer) { if (address == null) { throw new NullPointerException("Webserver address is null"); } - String httpPath = address + "/get/" + FrameworkConfig.LOGSTASH_EXECUTOR_JAR; + String executorUri = address + "/" + FrameworkConfig.LOGSTASH_EXECUTOR_JAR; commandInfoBuilder .setValue(frameworkConfig.getJavaHome() + "java $JAVA_OPTS -jar ./" + FrameworkConfig.LOGSTASH_EXECUTOR_JAR) - .addUris(Protos.CommandInfo.URI.newBuilder().setValue(httpPath)); + .addAllUris(Collections.singletonList(Protos.CommandInfo.URI.newBuilder().setValue(executorUri).build())); Protos.ExecutorInfo executorInfo = Protos.ExecutorInfo.newBuilder() .setName(LogstashConstants.NODE_NAME + " executor") From 48fab48db43094d7dba86e7ff2203e5798b2b270 Mon Sep 17 00:00:00 2001 From: Frank Scholten Date: Thu, 21 Jan 2016 10:33:30 +0100 Subject: [PATCH 08/17] #40 Added listener which starts LogstashScheduler after Spring app is started and Jetty server is up and running. --- .../logstash/scheduler/LogstashScheduler.java | 4 -- .../scheduler/WebserverStartedListener.java | 43 +++++++++++++++++++ .../main/resources/META-INF/spring.factories | 1 + .../systemtest/DeploymentSystemTest.java | 5 ++- 4 files changed, 47 insertions(+), 6 deletions(-) create mode 100644 logstash-scheduler/src/main/java/org/apache/mesos/logstash/scheduler/WebserverStartedListener.java create mode 100644 logstash-scheduler/src/main/resources/META-INF/spring.factories diff --git a/logstash-scheduler/src/main/java/org/apache/mesos/logstash/scheduler/LogstashScheduler.java b/logstash-scheduler/src/main/java/org/apache/mesos/logstash/scheduler/LogstashScheduler.java index c89e3ea..1552020 100644 --- a/logstash-scheduler/src/main/java/org/apache/mesos/logstash/scheduler/LogstashScheduler.java +++ b/logstash-scheduler/src/main/java/org/apache/mesos/logstash/scheduler/LogstashScheduler.java @@ -57,7 +57,6 @@ public class LogstashScheduler implements org.apache.mesos.Scheduler { @Inject private FrameworkState frameworkState; - @PostConstruct public void start() { Protos.FrameworkInfo.Builder frameworkBuilder = Protos.FrameworkInfo.newBuilder() .setName(frameworkConfig.getFrameworkName()) @@ -88,8 +87,6 @@ public void start() { driver.start(); } - - @PreDestroy public void stop() throws ExecutionException, InterruptedException { configManager.setOnConfigUpdate(null); @@ -159,7 +156,6 @@ public void resourceOffers(SchedulerDriver schedulerDriver, List offers) @Override public void statusUpdate(SchedulerDriver schedulerDriver, TaskStatus status) { - LOGGER.info("Received Status Update. taskId={}, state={}, message={}", status.getTaskId().getValue(), status.getState(), diff --git a/logstash-scheduler/src/main/java/org/apache/mesos/logstash/scheduler/WebserverStartedListener.java b/logstash-scheduler/src/main/java/org/apache/mesos/logstash/scheduler/WebserverStartedListener.java new file mode 100644 index 0000000..2274342 --- /dev/null +++ b/logstash-scheduler/src/main/java/org/apache/mesos/logstash/scheduler/WebserverStartedListener.java @@ -0,0 +1,43 @@ +package org.apache.mesos.logstash.scheduler; + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.SpringApplicationRunListener; +import org.springframework.context.ConfigurableApplicationContext; +import org.springframework.core.env.ConfigurableEnvironment; + +/** + * Listener for when Spring's web server for static content has started + */ +public class WebserverStartedListener implements SpringApplicationRunListener { + + public WebserverStartedListener(SpringApplication application, String[] args) { + + } + + @Override + public void started() { + + } + + @Override + public void environmentPrepared(ConfigurableEnvironment environment) { + + } + + @Override + public void contextPrepared(ConfigurableApplicationContext context) { + + } + + @Override + public void contextLoaded(ConfigurableApplicationContext context) { + + } + + @Override + public void finished(ConfigurableApplicationContext context, Throwable exception) { + LogstashScheduler scheduler = context.getBean(LogstashScheduler.class); + scheduler.start(); + } + +} diff --git a/logstash-scheduler/src/main/resources/META-INF/spring.factories b/logstash-scheduler/src/main/resources/META-INF/spring.factories new file mode 100644 index 0000000..de29692 --- /dev/null +++ b/logstash-scheduler/src/main/resources/META-INF/spring.factories @@ -0,0 +1 @@ +org.springframework.boot.SpringApplicationRunListener=org.apache.mesos.logstash.scheduler.WebserverStartedListener \ No newline at end of file diff --git a/system-test/src/test/java/org/apache/mesos/logstash/systemtest/DeploymentSystemTest.java b/system-test/src/test/java/org/apache/mesos/logstash/systemtest/DeploymentSystemTest.java index 2b87b6a..c58502c 100644 --- a/system-test/src/test/java/org/apache/mesos/logstash/systemtest/DeploymentSystemTest.java +++ b/system-test/src/test/java/org/apache/mesos/logstash/systemtest/DeploymentSystemTest.java @@ -29,6 +29,7 @@ import org.json.JSONArray; import org.junit.After; import org.junit.Before; +import org.junit.Ignore; import org.junit.Test; import java.util.Arrays; @@ -98,8 +99,8 @@ public void testDeploymentDocker() throws JsonParseException, UnirestException, waitForFramework(); } - @Test - public void testDeploymentJar() throws JsonParseException, UnirestException, JsonMappingException { + @Ignore + public void testDeploymentJar() throws JsonParseException, UnirestException, JsonMappingException { String zookeeperIpAddress = cluster.getZkContainer().getIpAddress(); LogstashSchedulerContainer logstashSchedulerContainer = new LogstashSchedulerContainer(dockerClient, zookeeperIpAddress); scheduler = Optional.of(logstashSchedulerContainer); From 2be067ff09677078ce328d785a9573469b05be74 Mon Sep 17 00:00:00 2001 From: Frank Scholten Date: Mon, 25 Jan 2016 16:35:28 +0100 Subject: [PATCH 09/17] #40 DeploymentSystemTest.testJarDeployment works * Upgraded to latest minimesos * Logstash executor and zip now get downloaded to Mesos agents * Docker test now fails because logstash path is now different --- build.gradle | 2 +- .../logstash/executor/LogstashExecutor.java | 45 +++++-------------- .../logstash/executor/LogstashService.java | 6 ++- .../logstash/config/FrameworkConfig.java | 24 +++++++--- .../logstash/scheduler/TaskInfoBuilder.java | 31 ++++--------- .../mesos/logstash/systemtest/Main.java | 4 +- .../AbstractLogstashFrameworkTest.java | 32 ++++++++----- .../systemtest/DeploymentSystemTest.java | 19 ++++---- .../logstash/systemtest/LocalCluster.java | 2 +- 9 files changed, 78 insertions(+), 87 deletions(-) diff --git a/build.gradle b/build.gradle index 7cb13a7..1dfea44 100644 --- a/build.gradle +++ b/build.gradle @@ -17,7 +17,7 @@ ext { // System test dockerJavaVer = "4f094c112"; unirestVer = "1.4.7" - minimesosVer = "0.5.0" + minimesosVer = "db0abe511995bd87d1c76747f9e7417d1f875050" elasticsearchVer = "1.7.3" javaxInjectVer = "1"; diff --git a/logstash-executor/src/main/java/org/apache/mesos/logstash/executor/LogstashExecutor.java b/logstash-executor/src/main/java/org/apache/mesos/logstash/executor/LogstashExecutor.java index 999d123..8444545 100644 --- a/logstash-executor/src/main/java/org/apache/mesos/logstash/executor/LogstashExecutor.java +++ b/logstash-executor/src/main/java/org/apache/mesos/logstash/executor/LogstashExecutor.java @@ -38,15 +38,19 @@ public void launchTask(final ExecutorDriver driver, final Protos.TaskInfo task) LOGGER.info("Forked thread with LogstashService.run()"); try { logstashService.run(logstashConfiguration); - LOGGER.error("LogstashService finished"); + LOGGER.info("LogstashService finished"); + driver.sendStatusUpdate(Protos.TaskStatus.newBuilder() + .setExecutorId(task.getExecutor().getExecutorId()) + .setTaskId(task.getTaskId()) + .setState(Protos.TaskState.TASK_FINISHED).build()); } catch (Exception e) { LOGGER.error("Logstash service failed", e); + driver.sendStatusUpdate(Protos.TaskStatus.newBuilder() + .setExecutorId(task.getExecutor().getExecutorId()) + .setTaskId(task.getTaskId()) + .setState(Protos.TaskState.TASK_FAILED) + .setMessage(e.getCause().getMessage()).build()); } - - driver.sendStatusUpdate(Protos.TaskStatus.newBuilder() - .setExecutorId(task.getExecutor().getExecutorId()) - .setTaskId(task.getTaskId()) - .setState(Protos.TaskState.TASK_FAILED).build()); driver.stop(); }); thread.setDaemon(true); @@ -71,44 +75,17 @@ public void killTask(ExecutorDriver driver, Protos.TaskID taskId) { @Override public void frameworkMessage(ExecutorDriver driver, byte[] data) { - System.out.println("LogstashExecutor.frameworkMessage"); + LOGGER.info("LogstashExecutor.frameworkMessage"); try { SchedulerMessage message = SchedulerMessage.parseFrom(data); - LOGGER.info("SchedulerMessage. message={}", message); - -/* - if (message.getType().equals(REQUEST_STATS)) { - sendStatsToScheduler(driver); - } else { - updateConfig(message); - } -*/ } catch (InvalidProtocolBufferException e) { LOGGER.error("Error parsing framework message from scheduler.", e); } } -/* - private void updateConfig(SchedulerMessage message) { - LOGGER.info("New configuration received. Reconfiguring..."); - - // TODO extract config and update service: - // logstashService.update(514, "elasticsearch.service:9200"); - } -*/ - -/* - private void sendStatsToScheduler(ExecutorDriver driver) { - driver.sendFrameworkMessage(liveState.getStateAsExecutorMessage().toByteArray()); - - } - -*/ @Override public void shutdown(ExecutorDriver driver) { - // The task i killed automatically, so we don't have to - // do anything. LOGGER.info("Shutting down framework."); } diff --git a/logstash-executor/src/main/java/org/apache/mesos/logstash/executor/LogstashService.java b/logstash-executor/src/main/java/org/apache/mesos/logstash/executor/LogstashService.java index d405298..e23f8d5 100644 --- a/logstash-executor/src/main/java/org/apache/mesos/logstash/executor/LogstashService.java +++ b/logstash-executor/src/main/java/org/apache/mesos/logstash/executor/LogstashService.java @@ -27,6 +27,10 @@ public class LogstashService { public static final Logger LOGGER = LoggerFactory.getLogger(LogstashService.class); + private static final String LOGSTASH_VERSION = "2.1.1"; + + public static final String LOGSTASH_PATH = "logstash-" + LOGSTASH_VERSION + ""; + private static Optional ofConditional(T message, Predicate predicate) { if (message != null && predicate.test(message)) { return Optional.of(message); @@ -100,7 +104,7 @@ public void run(LogstashProtos.LogstashConfiguration logstashConfiguration) { Process process; try { String[] command = { - "/opt/logstash/bin/logstash", + LOGSTASH_PATH + "/bin/logstash", "--log", "/var/log/logstash.log", "-e", serialize(logstashConfiguration) }; diff --git a/logstash-scheduler/src/main/java/org/apache/mesos/logstash/config/FrameworkConfig.java b/logstash-scheduler/src/main/java/org/apache/mesos/logstash/config/FrameworkConfig.java index e0bc60c..03face8 100644 --- a/logstash-scheduler/src/main/java/org/apache/mesos/logstash/config/FrameworkConfig.java +++ b/logstash-scheduler/src/main/java/org/apache/mesos/logstash/config/FrameworkConfig.java @@ -1,8 +1,6 @@ package org.apache.mesos.logstash.config; -import org.apache.log4j.Logger; import org.apache.mesos.logstash.common.network.NetworkUtils; -import org.apache.mesos.logstash.scheduler.Features; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.stereotype.Component; @@ -10,13 +8,15 @@ import javax.validation.constraints.NotNull; import javax.validation.constraints.Pattern; +// TODO: Add unit test to assert default settings, URIs and commands + @Component @ConfigurationProperties public class FrameworkConfig { - public static final Logger LOGGER = Logger.getLogger(FrameworkConfig.class); + private static final String LOGSTASH_EXECUTOR_JAR = "logstash-mesos-executor.jar"; - public static final String LOGSTASH_EXECUTOR_JAR = "logstash-mesos-executor.jar"; + private static final String LOGSTASH_ZIP = "logstash.zip"; @NotNull @Pattern(regexp = "^zk://.+$") @@ -38,9 +38,6 @@ public class FrameworkConfig { private String javaHome = ""; - @Inject - private Features features; - @Inject private NetworkUtils networkUtils; @@ -131,4 +128,17 @@ public String getMesosSecret() { public String getFrameworkFileServerAddress() { return networkUtils.addressToString(networkUtils.hostSocket(8080), true); } + + public String getLogstashZipUri() { + return getFrameworkFileServerAddress() + "/" + FrameworkConfig.LOGSTASH_ZIP; + } + + public String getLogstashExecutorUri() { + return getFrameworkFileServerAddress() + "/" + FrameworkConfig.LOGSTASH_EXECUTOR_JAR; + } + + public String getExecutorCommand() { + return getJavaHome() + "java $JAVA_OPTS -jar ./" + FrameworkConfig.LOGSTASH_EXECUTOR_JAR; + } + } diff --git a/logstash-scheduler/src/main/java/org/apache/mesos/logstash/scheduler/TaskInfoBuilder.java b/logstash-scheduler/src/main/java/org/apache/mesos/logstash/scheduler/TaskInfoBuilder.java index d2d4e81..bc36c35 100644 --- a/logstash-scheduler/src/main/java/org/apache/mesos/logstash/scheduler/TaskInfoBuilder.java +++ b/logstash-scheduler/src/main/java/org/apache/mesos/logstash/scheduler/TaskInfoBuilder.java @@ -13,7 +13,7 @@ import javax.inject.Inject; import java.text.SimpleDateFormat; -import java.util.Collections; +import java.util.Arrays; import java.util.List; import java.util.UUID; @@ -90,21 +90,14 @@ private Protos.TaskInfo buildDockerTask(Protos.Offer offer) { } private Protos.TaskInfo buildNativeTask(Protos.Offer offer) { - ExecutorEnvironmentalVariables executorEnvVars = new ExecutorEnvironmentalVariables( - executorConfig, logstashConfig); - Protos.CommandInfo.Builder commandInfoBuilder = Protos.CommandInfo.newBuilder() - .setEnvironment(Protos.Environment.newBuilder().addAllVariables(executorEnvVars.getList())); - - String address = frameworkConfig.getFrameworkFileServerAddress(); - if (address == null) { - throw new NullPointerException("Webserver address is null"); - } - String executorUri = address + "/" + FrameworkConfig.LOGSTASH_EXECUTOR_JAR; - - commandInfoBuilder - .setValue(frameworkConfig.getJavaHome() + "java $JAVA_OPTS -jar ./" + FrameworkConfig.LOGSTASH_EXECUTOR_JAR) - .addAllUris(Collections.singletonList(Protos.CommandInfo.URI.newBuilder().setValue(executorUri).build())); + .setEnvironment(Protos.Environment.newBuilder().addAllVariables( + new ExecutorEnvironmentalVariables(executorConfig, logstashConfig).getList())) + .setValue(frameworkConfig.getExecutorCommand()) + .addAllUris(Arrays.asList( + Protos.CommandInfo.URI.newBuilder().setValue(frameworkConfig.getLogstashZipUri()).build(), + Protos.CommandInfo.URI.newBuilder().setValue(frameworkConfig.getLogstashExecutorUri()).build() + )); Protos.ExecutorInfo executorInfo = Protos.ExecutorInfo.newBuilder() .setName(LogstashConstants.NODE_NAME + " executor") @@ -119,7 +112,7 @@ private Protos.TaskInfo createTask(Protos.Offer offer, Protos.ExecutorInfo execu final LogstashProtos.LogstashConfiguration.Builder logstashConfigBuilder = LogstashProtos.LogstashConfiguration.newBuilder(); if (features.isSyslog()) { logstashConfigBuilder.setLogstashPluginInputSyslog( - LogstashProtos.LogstashPluginInputSyslog.newBuilder().setPort(514) // FIXME take from config + LogstashProtos.LogstashPluginInputSyslog.newBuilder().setPort(514) // TODO take from config ); } //TODO: repeat for collectd @@ -143,12 +136,6 @@ private Protos.TaskInfo createTask(Protos.Offer offer, Protos.ExecutorInfo execu .build(); } - private void addIfNotEmpty(List args, String key, String value) { - if (!value.isEmpty()) { - args.addAll(asList(key, value)); - } - } - public List getResourcesList() { int memNeeded = executorConfig.getHeapSize() + logstashConfig.getHeapSize() + executorConfig.getOverheadMem(); diff --git a/system-test/src/main/java/org/apache/mesos/logstash/systemtest/Main.java b/system-test/src/main/java/org/apache/mesos/logstash/systemtest/Main.java index 2f65a96..92e6d8f 100644 --- a/system-test/src/main/java/org/apache/mesos/logstash/systemtest/Main.java +++ b/system-test/src/main/java/org/apache/mesos/logstash/systemtest/Main.java @@ -7,6 +7,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.concurrent.TimeUnit; + /** * Main app to run Mesos Logstash with Mini Mesos. */ @@ -20,7 +22,7 @@ public static void main(String[] args) throws InterruptedException { MesosCluster cluster = new MesosCluster(ClusterUtil.withSlaves(1, zooKeeper -> new LogstashMesosSlave(dockerClient, zooKeeper)).withMaster().build()); - cluster.start(); + cluster.start(5); /* LOGGER.info("Starting scheduler"); diff --git a/system-test/src/test/java/org/apache/mesos/logstash/systemtest/AbstractLogstashFrameworkTest.java b/system-test/src/test/java/org/apache/mesos/logstash/systemtest/AbstractLogstashFrameworkTest.java index 3d28a7c..dce299c 100644 --- a/system-test/src/test/java/org/apache/mesos/logstash/systemtest/AbstractLogstashFrameworkTest.java +++ b/system-test/src/test/java/org/apache/mesos/logstash/systemtest/AbstractLogstashFrameworkTest.java @@ -3,11 +3,16 @@ import com.containersol.minimesos.MesosCluster; import com.containersol.minimesos.mesos.ClusterUtil; import com.containersol.minimesos.mesos.MesosSlave; +import com.containersol.minimesos.state.State; +import com.fasterxml.jackson.core.JsonParseException; +import com.fasterxml.jackson.databind.JsonMappingException; import com.github.dockerjava.api.DockerClient; import com.github.dockerjava.api.InternalServerErrorException; import com.github.dockerjava.api.model.Container; import com.github.dockerjava.core.DockerClientBuilder; import com.github.dockerjava.core.DockerClientConfig; +import com.jayway.awaitility.Awaitility; +import com.mashape.unirest.http.exceptions.UnirestException; import org.apache.mesos.logstash.common.LogstashProtos.ExecutorMessage; import org.apache.mesos.logstash.config.ConfigManager; import org.junit.Before; @@ -20,6 +25,7 @@ import java.util.List; import java.util.TreeMap; import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; import java.util.function.Predicate; import static com.jayway.awaitility.Awaitility.await; @@ -55,11 +61,11 @@ public static void publishExecutorInMesosCluster() throws IOException { clusterDockerClient = DockerClientBuilder.getInstance(dockerConfigBuilder.build()).build(); LogstashSchedulerContainer schedulerContainer = new LogstashSchedulerContainer(clusterDockerClient, cluster.getMesosMasterContainer().getIpAddress()); - schedulerContainer.start(); + schedulerContainer.start(60); } @Before - public void startLogstashFramework() throws IOException, ExecutionException, InterruptedException { + public void startLogstashFramework() throws IOException, ExecutionException, InterruptedException, UnirestException { TemporaryFolder folder = new TemporaryFolder(); folder.create(); @@ -86,16 +92,22 @@ private void printRunningContainers(DockerClient dockerClient) { } } - private static void waitForLogstashFramework() { - // wait for our framework - cluster.waitForState(state -> state.getFramework("logstash") != null); + private static void waitForLogstashFramework() throws UnirestException, JsonParseException, JsonMappingException { + State state = State.fromJSON(cluster.getStateInfoJSON().toString()); + int timeout = 60; + Awaitility.await("Logstash framework did not start within " + timeout + " seconds") + .atMost(timeout, TimeUnit.SECONDS) + .until(() -> state.getFramework("logstash") != null); } - private static void waitForExcutorTaskIsRunning() { - // wait for our executor - cluster.waitForState(state -> state.getFramework("logstash") != null - && state.getFramework("logstash").getTasks().size() > 0 - && "TASK_RUNNING".equals(state.getFramework("logstash").getTasks().get(0).getState())); + private static void waitForExcutorTaskIsRunning() throws UnirestException, JsonParseException, JsonMappingException { + State state = State.fromJSON(cluster.getStateInfoJSON().toString()); + int timeout = 60; + Awaitility.await("Logstash executor did not start within " + timeout + " seconds") + .atMost(timeout, TimeUnit.SECONDS) + .until(() -> state.getFramework("logstash") != null + && state.getFramework("logstash").getTasks().size() > 0 + && "TASK_RUNNING".equals(state.getFramework("logstash").getTasks().get(0).getState())); } /** diff --git a/system-test/src/test/java/org/apache/mesos/logstash/systemtest/DeploymentSystemTest.java b/system-test/src/test/java/org/apache/mesos/logstash/systemtest/DeploymentSystemTest.java index c58502c..d798b87 100644 --- a/system-test/src/test/java/org/apache/mesos/logstash/systemtest/DeploymentSystemTest.java +++ b/system-test/src/test/java/org/apache/mesos/logstash/systemtest/DeploymentSystemTest.java @@ -29,7 +29,6 @@ import org.json.JSONArray; import org.junit.After; import org.junit.Before; -import org.junit.Ignore; import org.junit.Test; import java.util.Arrays; @@ -81,7 +80,7 @@ public void after() { protected State getClusterStateInfo() { try { - return cluster.getStateInfo(); + return State.fromJSON(cluster.getStateInfoJSON().toString()); } catch (Exception e) { fail(e.getMessage()); return null; @@ -94,17 +93,17 @@ public void testDeploymentDocker() throws JsonParseException, UnirestException, LogstashSchedulerContainer schedulerContainer = new LogstashSchedulerContainer(dockerClient, zookeeperIpAddress); schedulerContainer.setDocker(true); scheduler = Optional.of(schedulerContainer); - cluster.addAndStartContainer(scheduler.get()); + cluster.addAndStartContainer(scheduler.get(), 60); waitForFramework(); } - @Ignore + @Test public void testDeploymentJar() throws JsonParseException, UnirestException, JsonMappingException { String zookeeperIpAddress = cluster.getZkContainer().getIpAddress(); LogstashSchedulerContainer logstashSchedulerContainer = new LogstashSchedulerContainer(dockerClient, zookeeperIpAddress); scheduler = Optional.of(logstashSchedulerContainer); - cluster.addAndStartContainer(scheduler.get()); + cluster.addAndStartContainer(scheduler.get(), 60); waitForFramework(); } @@ -136,7 +135,7 @@ protected CreateContainerCmd dockerCommand() { return dockerClient.createContainerCmd("elasticsearch:" + version).withCmd("elasticsearch", "-Des.cluster.name=\"" + elasticsearchClusterName + "\"", "-Des.discovery.zen.ping.multicast.enabled=false"); } }; - cluster.addAndStartContainer(elasticsearchInstance); + cluster.addAndStartContainer(elasticsearchInstance, 60); final int elasticsearchPort = 9300; @@ -156,7 +155,7 @@ protected CreateContainerCmd dockerCommand() { scheduler = Optional.of(new LogstashSchedulerContainer(dockerClient, zookeeperIpAddress, "http://" + elasticsearchInstance.getIpAddress() + ":" + 9200)); scheduler.get().enableSyslog(); - cluster.addAndStartContainer(scheduler.get()); + cluster.addAndStartContainer(scheduler.get(), 60); waitForFramework(); @@ -208,14 +207,14 @@ protected CreateContainerCmd dockerCommand() { public void willAddExecutorOnNewNodes() throws JsonParseException, UnirestException, JsonMappingException { String zookeeperIpAddress = cluster.getZkContainer().getIpAddress(); scheduler = Optional.of(new LogstashSchedulerContainer(dockerClient, zookeeperIpAddress)); - cluster.addAndStartContainer(scheduler.get()); + cluster.addAndStartContainer(scheduler.get(), 60); waitForFramework(); - IntStream.range(0, 2).forEach(value -> cluster.addAndStartContainer(new LogstashMesosSlave(dockerClient, cluster.getZkContainer()))); + IntStream.range(0, 2).forEach(value -> cluster.addAndStartContainer(new LogstashMesosSlave(dockerClient, cluster.getZkContainer()), 60)); await().atMost(1, TimeUnit.MINUTES).pollInterval(1, TimeUnit.SECONDS).until( - () -> cluster.getStateInfo().getFramework("logstash").getTasks().stream().filter(task -> task.getState().equals("TASK_RUNNING")).count() == 3 + () -> State.fromJSON(cluster.getStateInfoJSON().toString()).getFramework("logstash").getTasks().stream().filter(task -> task.getState().equals("TASK_RUNNING")).count() == 3 ); // TODO use com.containersol.minimesos.state.Task when it exposes the slave_id property https://github.com/ContainerSolutions/minimesos/issues/168 diff --git a/system-test/src/test/java/org/apache/mesos/logstash/systemtest/LocalCluster.java b/system-test/src/test/java/org/apache/mesos/logstash/systemtest/LocalCluster.java index cb3881c..0b02743 100644 --- a/system-test/src/test/java/org/apache/mesos/logstash/systemtest/LocalCluster.java +++ b/system-test/src/test/java/org/apache/mesos/logstash/systemtest/LocalCluster.java @@ -40,7 +40,7 @@ private void run() throws Exception { DummyFrameworkContainer dummyFrameworkContainer = new DummyFrameworkContainer( clusterDockerClient, "dummy-framework"); - dummyFrameworkContainer.start(); + dummyFrameworkContainer.start(5); System.setProperty("mesos.zk", cluster.getZkUrl()); System.setProperty("mesos.logstash.logstash.heap.size", "128"); From 9f098d590f50b81a78ff716775c7bc9f60641b80 Mon Sep 17 00:00:00 2001 From: Frank Scholten Date: Tue, 26 Jan 2016 09:51:47 +0100 Subject: [PATCH 10/17] #40 Fixed all tests * Added LOGSTASH_PATH environment variable for the executor * Set LOGSTASH_PATH based on run mode: Docker or Jar --- .../apache/mesos/logstash/executor/LogstashService.java | 6 ++---- .../logstash/config/ExecutorEnvironmentalVariables.java | 3 ++- .../apache/mesos/logstash/scheduler/TaskInfoBuilder.java | 8 +++++++- .../mesos/logstash/systemtest/DeploymentSystemTest.java | 8 ++++++-- 4 files changed, 17 insertions(+), 8 deletions(-) diff --git a/logstash-executor/src/main/java/org/apache/mesos/logstash/executor/LogstashService.java b/logstash-executor/src/main/java/org/apache/mesos/logstash/executor/LogstashService.java index e23f8d5..96f24f4 100644 --- a/logstash-executor/src/main/java/org/apache/mesos/logstash/executor/LogstashService.java +++ b/logstash-executor/src/main/java/org/apache/mesos/logstash/executor/LogstashService.java @@ -27,9 +27,7 @@ public class LogstashService { public static final Logger LOGGER = LoggerFactory.getLogger(LogstashService.class); - private static final String LOGSTASH_VERSION = "2.1.1"; - - public static final String LOGSTASH_PATH = "logstash-" + LOGSTASH_VERSION + ""; + public static final String LOGSTASH_PATH = System.getenv("LOGSTASH_PATH"); private static Optional ofConditional(T message, Predicate predicate) { if (message != null && predicate.test(message)) { @@ -104,7 +102,7 @@ public void run(LogstashProtos.LogstashConfiguration logstashConfiguration) { Process process; try { String[] command = { - LOGSTASH_PATH + "/bin/logstash", + LOGSTASH_PATH, "--log", "/var/log/logstash.log", "-e", serialize(logstashConfiguration) }; diff --git a/logstash-scheduler/src/main/java/org/apache/mesos/logstash/config/ExecutorEnvironmentalVariables.java b/logstash-scheduler/src/main/java/org/apache/mesos/logstash/config/ExecutorEnvironmentalVariables.java index 23eedb6..a0642ce 100644 --- a/logstash-scheduler/src/main/java/org/apache/mesos/logstash/config/ExecutorEnvironmentalVariables.java +++ b/logstash-scheduler/src/main/java/org/apache/mesos/logstash/config/ExecutorEnvironmentalVariables.java @@ -11,6 +11,7 @@ public class ExecutorEnvironmentalVariables { private static final String native_mesos_library_key = "MESOS_NATIVE_JAVA_LIBRARY"; private static final String native_mesos_library_path = "/usr/lib/libmesos.so"; // libmesos.so is usually symlinked to the version. + public static final String LOGSTASH_PATH = "LOGSTASH_PATH"; public static final String JAVA_OPTS = "JAVA_OPTS"; private final List envList = new ArrayList<>(); @@ -38,7 +39,7 @@ private void populateEnvMap(ExecutorConfig executorConfig, LogstashConfig logsta addToList(JAVA_OPTS, getExecutorHeap(executorConfig) + " " + getLogstashHeap(logstashConfig)); } - private void addToList(String key, String value) { + public void addToList(String key, String value) { envList.add(getEnvProto(key, value)); } diff --git a/logstash-scheduler/src/main/java/org/apache/mesos/logstash/scheduler/TaskInfoBuilder.java b/logstash-scheduler/src/main/java/org/apache/mesos/logstash/scheduler/TaskInfoBuilder.java index bc36c35..494284f 100644 --- a/logstash-scheduler/src/main/java/org/apache/mesos/logstash/scheduler/TaskInfoBuilder.java +++ b/logstash-scheduler/src/main/java/org/apache/mesos/logstash/scheduler/TaskInfoBuilder.java @@ -24,6 +24,8 @@ public class TaskInfoBuilder { public static final Logger LOGGER = Logger.getLogger(TaskInfoBuilder.class); + private static final String LOGSTASH_VERSION = "2.1.1"; + @Inject private Clock clock; @Inject @@ -72,6 +74,7 @@ private Protos.TaskInfo buildDockerTask(Protos.Offer offer) { ExecutorEnvironmentalVariables executorEnvVars = new ExecutorEnvironmentalVariables( executorConfig, logstashConfig); + executorEnvVars.addToList(ExecutorEnvironmentalVariables.LOGSTASH_PATH, "/opt/logstash/bin/logstash"); Protos.ExecutorInfo executorInfo = Protos.ExecutorInfo.newBuilder() .setName(LogstashConstants.NODE_NAME + " executor") @@ -90,9 +93,12 @@ private Protos.TaskInfo buildDockerTask(Protos.Offer offer) { } private Protos.TaskInfo buildNativeTask(Protos.Offer offer) { + ExecutorEnvironmentalVariables executorEnvVars = new ExecutorEnvironmentalVariables(executorConfig, logstashConfig); + executorEnvVars.addToList(ExecutorEnvironmentalVariables.LOGSTASH_PATH, "./logstash-" + LOGSTASH_VERSION + "/bin/logstash"); + Protos.CommandInfo.Builder commandInfoBuilder = Protos.CommandInfo.newBuilder() .setEnvironment(Protos.Environment.newBuilder().addAllVariables( - new ExecutorEnvironmentalVariables(executorConfig, logstashConfig).getList())) + executorEnvVars.getList())) .setValue(frameworkConfig.getExecutorCommand()) .addAllUris(Arrays.asList( Protos.CommandInfo.URI.newBuilder().setValue(frameworkConfig.getLogstashZipUri()).build(), diff --git a/system-test/src/test/java/org/apache/mesos/logstash/systemtest/DeploymentSystemTest.java b/system-test/src/test/java/org/apache/mesos/logstash/systemtest/DeploymentSystemTest.java index d798b87..a909238 100644 --- a/system-test/src/test/java/org/apache/mesos/logstash/systemtest/DeploymentSystemTest.java +++ b/system-test/src/test/java/org/apache/mesos/logstash/systemtest/DeploymentSystemTest.java @@ -153,7 +153,9 @@ protected CreateContainerCmd dockerCommand() { }); assertEquals(elasticsearchClusterName, elasticsearchClient.get().admin().cluster().health(Requests.clusterHealthRequest("_all")).actionGet().getClusterName()); - scheduler = Optional.of(new LogstashSchedulerContainer(dockerClient, zookeeperIpAddress, "http://" + elasticsearchInstance.getIpAddress() + ":" + 9200)); + LogstashSchedulerContainer logstashSchedulerContainer = new LogstashSchedulerContainer(dockerClient, zookeeperIpAddress, "http://" + elasticsearchInstance.getIpAddress() + ":" + 9200); + logstashSchedulerContainer.setDocker(true); + scheduler = Optional.of(logstashSchedulerContainer); scheduler.get().enableSyslog(); cluster.addAndStartContainer(scheduler.get(), 60); @@ -206,7 +208,9 @@ protected CreateContainerCmd dockerCommand() { @Test public void willAddExecutorOnNewNodes() throws JsonParseException, UnirestException, JsonMappingException { String zookeeperIpAddress = cluster.getZkContainer().getIpAddress(); - scheduler = Optional.of(new LogstashSchedulerContainer(dockerClient, zookeeperIpAddress)); + LogstashSchedulerContainer logstashSchedulerContainer = new LogstashSchedulerContainer(dockerClient, zookeeperIpAddress); + logstashSchedulerContainer.setDocker(true); + scheduler = Optional.of(logstashSchedulerContainer); cluster.addAndStartContainer(scheduler.get(), 60); waitForFramework(); From 76a3761ae75e7a000e1bdb0d53d364197a5452bd Mon Sep 17 00:00:00 2001 From: Frank Scholten Date: Wed, 27 Jan 2016 09:41:37 +0100 Subject: [PATCH 11/17] #40 Now uses Logstash version 1.5.6 * NOTE Test willForwardDataToElasticsearch currently faiils: the log message sent to syslog cannot be found in Elasticsearch, however when I put a breakpoint in the code and send the syslog message via UDP from the cli or from a separate application it can be found in ES. So it seems like a race condition. Try changing the await() block * Extracted ElasticsearchContainer * Refactored willForwardDataToElasticsearch. --- build.gradle | 2 +- .../systemtest/DeploymentSystemTest.java | 158 ++++++++---------- .../systemtest/ElasticsearchContainer.java | 72 ++++++++ 3 files changed, 143 insertions(+), 89 deletions(-) create mode 100644 system-test/src/test/java/org/apache/mesos/logstash/systemtest/ElasticsearchContainer.java diff --git a/build.gradle b/build.gradle index 1dfea44..a379b87 100644 --- a/build.gradle +++ b/build.gradle @@ -28,7 +28,7 @@ ext { stompWebsocketVer = "2.3.4" sockjsClientVer = "1.0.0" highchartsVer = "4.1.6" - logstashVer = "2.1.1" + logstashVer = "1.5.6" // Executor commonsCompressVer = "1.10" diff --git a/system-test/src/test/java/org/apache/mesos/logstash/systemtest/DeploymentSystemTest.java b/system-test/src/test/java/org/apache/mesos/logstash/systemtest/DeploymentSystemTest.java index a909238..b6bed81 100644 --- a/system-test/src/test/java/org/apache/mesos/logstash/systemtest/DeploymentSystemTest.java +++ b/system-test/src/test/java/org/apache/mesos/logstash/systemtest/DeploymentSystemTest.java @@ -1,7 +1,6 @@ package org.apache.mesos.logstash.systemtest; import com.containersol.minimesos.MesosCluster; -import com.containersol.minimesos.container.AbstractContainer; import com.containersol.minimesos.mesos.ClusterArchitecture; import com.containersol.minimesos.mesos.DockerClientFactory; import com.containersol.minimesos.state.Framework; @@ -10,27 +9,30 @@ import com.fasterxml.jackson.databind.JsonMappingException; import com.github.dockerjava.api.DockerClient; import com.github.dockerjava.api.NotModifiedException; -import com.github.dockerjava.api.command.CreateContainerCmd; -import com.github.dockerjava.api.command.CreateContainerResponse; import com.github.dockerjava.api.model.Container; -import com.github.dockerjava.api.model.Link; -import com.github.dockerjava.core.command.PullImageResultCallback; import com.mashape.unirest.http.exceptions.UnirestException; import org.apache.commons.lang.RandomStringUtils; -import org.apache.commons.lang.StringUtils; -import org.elasticsearch.ElasticsearchException; -import org.elasticsearch.client.Client; -import org.elasticsearch.client.Requests; -import org.elasticsearch.client.transport.TransportClient; -import org.elasticsearch.common.settings.ImmutableSettings; -import org.elasticsearch.common.transport.InetSocketTransportAddress; import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.search.SearchHitField; +import org.elasticsearch.search.SearchHits; import org.json.JSONArray; +import org.json.JSONObject; import org.junit.After; import org.junit.Before; import org.junit.Test; - +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.net.ConnectException; +import java.net.DatagramPacket; +import java.net.DatagramSocket; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.Socket; +import java.net.SocketException; +import java.nio.ByteBuffer; +import java.nio.channels.DatagramChannel; import java.util.Arrays; import java.util.Map; import java.util.Optional; @@ -38,7 +40,6 @@ import java.util.TreeSet; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicReference; import java.util.stream.IntStream; import static com.jayway.awaitility.Awaitility.await; @@ -51,6 +52,8 @@ public class DeploymentSystemTest { private static DockerClient dockerClient = DockerClientFactory.build(); + private static final Logger LOGGER = LoggerFactory.getLogger(DeploymentSystemTest.class); + private MesosCluster cluster = new MesosCluster(new ClusterArchitecture.Builder() .withZooKeeper() .withMaster() @@ -64,6 +67,7 @@ public void before() { cluster.start(); } + @SuppressWarnings({"PMD.EmptyCatchBlock"}) @After public void after() { try { @@ -109,51 +113,36 @@ public void testDeploymentJar() throws JsonParseException, UnirestException, Js } private void waitForFramework() { - await().atMost(1, TimeUnit.MINUTES).pollInterval(1, TimeUnit.SECONDS).until(() -> { - Framework framework = getClusterStateInfo().getFramework("logstash"); - assertNotNull(framework); - assertTrue(framework.getTasks().size() > 0); - assertEquals("TASK_RUNNING", framework.getTasks().get(0).getState()); + await().atMost(2, TimeUnit.MINUTES).pollInterval(1, TimeUnit.SECONDS).until(() -> { + JSONArray frameworks = cluster.getStateInfoJSON().getJSONArray("frameworks"); + if (frameworks.length() == 0) { + LOGGER.info("Logstash framework is not yet running"); + return false; + } + + JSONArray tasks = frameworks.getJSONObject(0).getJSONArray("tasks"); + if (tasks.length() != 0) { + if (tasks.getJSONObject(0).getString("name").equals("logstash.task")) { + LOGGER.info("Logstash executor running"); + return true; + } + } + + LOGGER.info("Logstash executor not yet running"); + return false; }); } @Test - public void willForwardDataToElasticsearch() throws JsonParseException, UnirestException, JsonMappingException, ExecutionException { + public void willForwardDataToElasticsearch() throws IOException, UnirestException, ExecutionException, InterruptedException { String zookeeperIpAddress = cluster.getZkContainer().getIpAddress(); - final String elasticsearchClusterName = "test-" + System.currentTimeMillis(); - final AbstractContainer elasticsearchInstance = new AbstractContainer(dockerClient) { - private final String version = "1.7"; - - @Override - protected void pullImage() { - pullImage("elasticsearch", version); - } - - @Override - protected CreateContainerCmd dockerCommand() { - return dockerClient.createContainerCmd("elasticsearch:" + version).withCmd("elasticsearch", "-Des.cluster.name=\"" + elasticsearchClusterName + "\"", "-Des.discovery.zen.ping.multicast.enabled=false"); - } - }; - cluster.addAndStartContainer(elasticsearchInstance, 60); - - final int elasticsearchPort = 9300; + final ElasticsearchContainer elasticsearchContainer = new ElasticsearchContainer(dockerClient); + cluster.addAndStartContainer(elasticsearchContainer, 60); - final AtomicReference elasticsearchClient = new AtomicReference<>(); - await().atMost(30, TimeUnit.SECONDS).pollDelay(1, TimeUnit.SECONDS).until(() -> { - Client c = new TransportClient(ImmutableSettings.settingsBuilder().put("cluster.name", elasticsearchClusterName).build()).addTransportAddress(new InetSocketTransportAddress(elasticsearchInstance.getIpAddress(), elasticsearchPort)); - try { - c.admin().cluster().health(Requests.clusterHealthRequest("_all")).actionGet(); - } catch (ElasticsearchException e) { - c.close(); - return false; - } - elasticsearchClient.set(c); - return true; - }); - assertEquals(elasticsearchClusterName, elasticsearchClient.get().admin().cluster().health(Requests.clusterHealthRequest("_all")).actionGet().getClusterName()); + elasticsearchContainer.waitUntilHealthy(); - LogstashSchedulerContainer logstashSchedulerContainer = new LogstashSchedulerContainer(dockerClient, zookeeperIpAddress, "http://" + elasticsearchInstance.getIpAddress() + ":" + 9200); + LogstashSchedulerContainer logstashSchedulerContainer = new LogstashSchedulerContainer(dockerClient, zookeeperIpAddress, elasticsearchContainer.getClientUrl()); logstashSchedulerContainer.setDocker(true); scheduler = Optional.of(logstashSchedulerContainer); scheduler.get().enableSyslog(); @@ -161,48 +150,41 @@ protected CreateContainerCmd dockerCommand() { waitForFramework(); - final String sysLogPort = "514"; - final String randomLogLine = "Hello " + RandomStringUtils.randomAlphanumeric(32); - - dockerClient.pullImageCmd("ubuntu:15.10").exec(new PullImageResultCallback()).awaitSuccess(); - final String logstashSlave = dockerClient.listContainersCmd().withSince(cluster.getSlaves()[0].getContainerId()).exec().stream().filter(container -> container.getImage().endsWith("/logstash-executor:latest")).findFirst().map(Container::getId).orElseThrow(() -> new RuntimeException("Unable to find logstash container")); - await().atMost(1, TimeUnit.MINUTES).pollDelay(1, TimeUnit.SECONDS).ignoreExceptions().until(() -> { - assertTrue(dockerClient.inspectContainerCmd(logstashSlave).exec().getState().isRunning()); - - final CreateContainerResponse loggerContainer = dockerClient.createContainerCmd("ubuntu:15.10").withLinks(new Link(logstashSlave, "logstash")).withCmd("logger", "--server=logstash", "--port=" + sysLogPort, "--udp", "--rfc3164", randomLogLine).exec(); - dockerClient.startContainerCmd(loggerContainer.getId()).exec(); - await().atMost(10, TimeUnit.SECONDS).until(() -> { - // TODO: this is a hack to determine whether the container has stopped. - // We should use ...exec().getState().getRunning() but docker-java doesn't provide that - // (even though it's available in the JSON provided by Docker). - final String finishedAt = dockerClient.inspectContainerCmd(loggerContainer.getId()).exec().getState().getFinishedAt(); - return StringUtils.isNotBlank(finishedAt) && !finishedAt.equals("0001-01-01T00:00:00Z"); - }); - final int exitCode = dockerClient.inspectContainerCmd(loggerContainer.getId()).exec().getState().getExitCode(); - dockerClient.removeContainerCmd(loggerContainer.getId()).exec(); - assertEquals(0, exitCode); - elasticsearchClient.get().prepareSearch("logstash-*").setQuery(QueryBuilders.simpleQueryStringQuery("hello")).addField("message").addField("mesos_slave_id").execute().actionGet().getHits().getAt(0).fields(); - }); - await().atMost(1, TimeUnit.MINUTES).pollDelay(1, TimeUnit.SECONDS).until(() -> { - Map fields = elasticsearchClient.get().prepareSearch("logstash-*").setQuery(QueryBuilders.simpleQueryStringQuery("hello")).addField("message").addField("mesos_slave_id").execute().actionGet().getHits().getAt(0).fields(); + String logline = "Hello " + RandomStringUtils.randomAlphanumeric(32); - String esMessage = fields.get("message").getValue(); - assertEquals(randomLogLine, esMessage.trim()); - - String esMesosSlaveId = fields.get("mesos_slave_id").getValue(); + Thread thread = new Thread() { + @Override + public void run() { + try { + DatagramSocket socket = null; + socket = new DatagramSocket(); + byte[] buf = logline.getBytes(); + InetAddress address = InetAddress.getByName("localhost"); + DatagramPacket packet = new DatagramPacket(buf, buf.length, address, 514); + socket.send(packet); + } catch (IOException e) { + // Ignore + } + } + }; + thread.run(); - String trueSlaveId; - try { - trueSlaveId = cluster.getStateInfoJSON().getJSONArray("slaves").getJSONObject(0).getString("id"); - } catch (Exception e) { - throw new RuntimeException(e); + await().atMost(2, TimeUnit.MINUTES).pollInterval(2, TimeUnit.SECONDS).until(() -> { + SearchHits hits = elasticsearchContainer.getClient().prepareSearch("logstash-*").setQuery(QueryBuilders.simpleQueryStringQuery("hello")).addField("message").addField("mesos_slave_id").execute().actionGet().getHits(); + if (hits.totalHits() == 0) { + LOGGER.info("Log message not found in Elasticsearch on " + elasticsearchContainer.getClientUrl()); + return false; } - assertEquals( - trueSlaveId, - esMesosSlaveId.trim() - ); + LOGGER.info("Log message found in Elasticsearch"); return true; }); + + SearchHits hits = elasticsearchContainer.getClient().prepareSearch("logstash-*").setQuery(QueryBuilders.simpleQueryStringQuery("hello")).addField("message").addField("mesos_slave_id").execute().actionGet().getHits(); + String esMessage = hits.getAt(0).getFields().get("message").getValue(); + assertEquals(logline, esMessage.trim()); + String esMesosSlaveId = hits.getAt(0).getFields().get("mesos_slave_id").getValue(); + String trueSlaveId = cluster.getStateInfoJSON().getJSONArray("slaves").getJSONObject(0).getString("id"); + assertEquals(trueSlaveId, esMesosSlaveId.trim()); } @Test diff --git a/system-test/src/test/java/org/apache/mesos/logstash/systemtest/ElasticsearchContainer.java b/system-test/src/test/java/org/apache/mesos/logstash/systemtest/ElasticsearchContainer.java new file mode 100644 index 0000000..9769753 --- /dev/null +++ b/system-test/src/test/java/org/apache/mesos/logstash/systemtest/ElasticsearchContainer.java @@ -0,0 +1,72 @@ +package org.apache.mesos.logstash.systemtest; + +import com.containersol.minimesos.container.AbstractContainer; +import com.github.dockerjava.api.DockerClient; +import com.github.dockerjava.api.command.CreateContainerCmd; +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.client.Client; +import org.elasticsearch.client.Requests; +import org.elasticsearch.client.transport.TransportClient; +import org.elasticsearch.common.settings.ImmutableSettings; +import org.elasticsearch.common.transport.InetSocketTransportAddress; + +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +import static com.jayway.awaitility.Awaitility.await; + +/** + * Container running Elasticsearch + */ +public class ElasticsearchContainer extends AbstractContainer { + + public static final int CLIENT_PORT = 9200; + public static final int TRANSPORT_PORT = 9300; + public static final String VERSION = "1.7"; + public static final String CLUSTER_NAME = "test-" + System.currentTimeMillis(); + + private AtomicReference client; + + public ElasticsearchContainer(DockerClient dockerClient) { + super(dockerClient); + } + + @Override + protected void pullImage() { + pullImage("elasticsearch", VERSION); + } + + @Override + public String getName() { + return "elasticsearch-" + CLUSTER_NAME; + } + + @Override + protected CreateContainerCmd dockerCommand() { + return dockerClient.createContainerCmd("elasticsearch:" + VERSION).withCmd("elasticsearch", "-Des.cluster.name=\"" + CLUSTER_NAME + "\"", "-Des.discovery.zen.ping.multicast.enabled=false") + .withName(getName()); + } + + public void waitUntilHealthy() { + client = new AtomicReference<>(); + await().atMost(30, TimeUnit.SECONDS).pollDelay(1, TimeUnit.SECONDS).until(() -> { + Client c = new TransportClient(ImmutableSettings.settingsBuilder().put("cluster.name", CLUSTER_NAME).build()).addTransportAddress(new InetSocketTransportAddress(getIpAddress(), TRANSPORT_PORT)); + try { + c.admin().cluster().health(Requests.clusterHealthRequest("_all")).actionGet(); + } catch (ElasticsearchException e) { + c.close(); + return false; + } + client.set(c); + return true; + }); + } + + public Client getClient() { + return client.get(); + } + + public String getClientUrl() { + return "http://" + getIpAddress() + ":" + CLIENT_PORT; + } +} From 4d1c2d38069fcf0791f537cbc5069e176ec9273f Mon Sep 17 00:00:00 2001 From: Martin Westergaard Lassen Date: Thu, 28 Jan 2016 11:01:06 +0000 Subject: [PATCH 12/17] Fixed forward to elasticsearch test by reverting it --- .../mesos/logstash/scheduler/Features.java | 2 +- .../LogstashSchedulerContainer.java | 4 +- .../systemtest/DeploymentSystemTest.java | 149 +++++++++++------- 3 files changed, 94 insertions(+), 61 deletions(-) diff --git a/logstash-scheduler/src/main/java/org/apache/mesos/logstash/scheduler/Features.java b/logstash-scheduler/src/main/java/org/apache/mesos/logstash/scheduler/Features.java index 4d98be1..b00f5e9 100644 --- a/logstash-scheduler/src/main/java/org/apache/mesos/logstash/scheduler/Features.java +++ b/logstash-scheduler/src/main/java/org/apache/mesos/logstash/scheduler/Features.java @@ -11,7 +11,7 @@ public class Features { private boolean collectd; private boolean syslog; private boolean file; - private boolean docker; + private boolean docker = true; private Boolean useIpAddress = false; public boolean isDocker() { diff --git a/system-test/src/main/java/org/apache/mesos/logstash/systemtest/LogstashSchedulerContainer.java b/system-test/src/main/java/org/apache/mesos/logstash/systemtest/LogstashSchedulerContainer.java index d00c4cc..fd58b77 100644 --- a/system-test/src/main/java/org/apache/mesos/logstash/systemtest/LogstashSchedulerContainer.java +++ b/system-test/src/main/java/org/apache/mesos/logstash/systemtest/LogstashSchedulerContainer.java @@ -27,7 +27,7 @@ public class LogstashSchedulerContainer extends AbstractContainer { private String zookeeperIpAddress; private Optional elasticsearchUrl = Optional.empty(); private boolean withSyslog = false; - private boolean useDocker = false; + private boolean useDocker = true; public LogstashSchedulerContainer(DockerClient dockerClient, String zookeeperIpAddress) { super(dockerClient); @@ -74,7 +74,7 @@ protected CreateContainerCmd dockerCommand() { "--enable.docker=" + useDocker, withSyslog ? "--enable.syslog=true" : null ).stream().filter(StringUtils::isNotEmpty).collect(Collectors.joining(" ")); - + System.out.println("cmd = " + cmd); return dockerClient .createContainerCmd(SCHEDULER_IMAGE) .withName(SCHEDULER_NAME + "_" + new SecureRandom().nextInt()) diff --git a/system-test/src/test/java/org/apache/mesos/logstash/systemtest/DeploymentSystemTest.java b/system-test/src/test/java/org/apache/mesos/logstash/systemtest/DeploymentSystemTest.java index b6bed81..0596a08 100644 --- a/system-test/src/test/java/org/apache/mesos/logstash/systemtest/DeploymentSystemTest.java +++ b/system-test/src/test/java/org/apache/mesos/logstash/systemtest/DeploymentSystemTest.java @@ -1,49 +1,53 @@ package org.apache.mesos.logstash.systemtest; import com.containersol.minimesos.MesosCluster; +import com.containersol.minimesos.container.AbstractContainer; import com.containersol.minimesos.mesos.ClusterArchitecture; import com.containersol.minimesos.mesos.DockerClientFactory; -import com.containersol.minimesos.state.Framework; import com.containersol.minimesos.state.State; import com.fasterxml.jackson.core.JsonParseException; import com.fasterxml.jackson.databind.JsonMappingException; import com.github.dockerjava.api.DockerClient; import com.github.dockerjava.api.NotModifiedException; +import com.github.dockerjava.api.command.CreateContainerCmd; +import com.github.dockerjava.api.command.CreateContainerResponse; import com.github.dockerjava.api.model.Container; +import com.github.dockerjava.api.model.Link; +import com.github.dockerjava.core.command.PullImageResultCallback; import com.mashape.unirest.http.exceptions.UnirestException; import org.apache.commons.lang.RandomStringUtils; +import org.apache.commons.lang3.StringUtils; +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.client.Client; +import org.elasticsearch.client.Requests; +import org.elasticsearch.client.transport.TransportClient; +import org.elasticsearch.common.settings.ImmutableSettings; +import org.elasticsearch.common.transport.InetSocketTransportAddress; import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.search.SearchHitField; import org.elasticsearch.search.SearchHits; import org.json.JSONArray; -import org.json.JSONObject; import org.junit.After; +import org.junit.Assert; import org.junit.Before; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; -import java.net.ConnectException; import java.net.DatagramPacket; import java.net.DatagramSocket; import java.net.InetAddress; -import java.net.InetSocketAddress; -import java.net.Socket; -import java.net.SocketException; -import java.nio.ByteBuffer; -import java.nio.channels.DatagramChannel; -import java.util.Arrays; -import java.util.Map; -import java.util.Optional; -import java.util.Set; -import java.util.TreeSet; +import java.util.*; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; import java.util.stream.IntStream; import static com.jayway.awaitility.Awaitility.await; -import static org.junit.Assert.*; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; /** * Tests whether the framework is deployed correctly @@ -121,11 +125,9 @@ private void waitForFramework() { } JSONArray tasks = frameworks.getJSONObject(0).getJSONArray("tasks"); - if (tasks.length() != 0) { - if (tasks.getJSONObject(0).getString("name").equals("logstash.task")) { - LOGGER.info("Logstash executor running"); - return true; - } + if (tasks.length() != 0 && tasks.getJSONObject(0).getString("name").equals("logstash.task")) { + LOGGER.info("Logstash executor running"); + return true; } LOGGER.info("Logstash executor not yet running"); @@ -134,59 +136,90 @@ private void waitForFramework() { } @Test - public void willForwardDataToElasticsearch() throws IOException, UnirestException, ExecutionException, InterruptedException { + public void willForwardDataToElasticsearch() throws JsonParseException, UnirestException, JsonMappingException, ExecutionException { String zookeeperIpAddress = cluster.getZkContainer().getIpAddress(); - final ElasticsearchContainer elasticsearchContainer = new ElasticsearchContainer(dockerClient); - cluster.addAndStartContainer(elasticsearchContainer, 60); + final String elasticsearchClusterName = "test-" + System.currentTimeMillis(); + final AbstractContainer elasticsearchInstance = new AbstractContainer(dockerClient) { + private final String version = "1.7"; - elasticsearchContainer.waitUntilHealthy(); - - LogstashSchedulerContainer logstashSchedulerContainer = new LogstashSchedulerContainer(dockerClient, zookeeperIpAddress, elasticsearchContainer.getClientUrl()); - logstashSchedulerContainer.setDocker(true); - scheduler = Optional.of(logstashSchedulerContainer); - scheduler.get().enableSyslog(); - cluster.addAndStartContainer(scheduler.get(), 60); - - waitForFramework(); - - String logline = "Hello " + RandomStringUtils.randomAlphanumeric(32); + @Override + protected void pullImage() { + pullImage("elasticsearch", version); + } - Thread thread = new Thread() { @Override - public void run() { - try { - DatagramSocket socket = null; - socket = new DatagramSocket(); - byte[] buf = logline.getBytes(); - InetAddress address = InetAddress.getByName("localhost"); - DatagramPacket packet = new DatagramPacket(buf, buf.length, address, 514); - socket.send(packet); - } catch (IOException e) { - // Ignore - } + protected CreateContainerCmd dockerCommand() { + return dockerClient.createContainerCmd("elasticsearch:" + version).withCmd("elasticsearch", "-Des.cluster.name=\"" + elasticsearchClusterName + "\"", "-Des.discovery.zen.ping.multicast.enabled=false"); } }; - thread.run(); + cluster.addAndStartContainer(elasticsearchInstance, 9999); + + final int elasticsearchPort = 9300; - await().atMost(2, TimeUnit.MINUTES).pollInterval(2, TimeUnit.SECONDS).until(() -> { - SearchHits hits = elasticsearchContainer.getClient().prepareSearch("logstash-*").setQuery(QueryBuilders.simpleQueryStringQuery("hello")).addField("message").addField("mesos_slave_id").execute().actionGet().getHits(); - if (hits.totalHits() == 0) { - LOGGER.info("Log message not found in Elasticsearch on " + elasticsearchContainer.getClientUrl()); + final AtomicReference elasticsearchClient = new AtomicReference<>(); + await().atMost(30, TimeUnit.SECONDS).pollDelay(1, TimeUnit.SECONDS).until(() -> { + Client c = new TransportClient(ImmutableSettings.settingsBuilder().put("cluster.name", elasticsearchClusterName).build()).addTransportAddress(new InetSocketTransportAddress(elasticsearchInstance.getIpAddress(), elasticsearchPort)); + try { + c.admin().cluster().health(Requests.clusterHealthRequest("_all")).actionGet(); + } catch (ElasticsearchException e) { + c.close(); return false; } - LOGGER.info("Log message found in Elasticsearch"); + elasticsearchClient.set(c); return true; }); + assertEquals(elasticsearchClusterName, elasticsearchClient.get().admin().cluster().health(Requests.clusterHealthRequest("_all")).actionGet().getClusterName()); - SearchHits hits = elasticsearchContainer.getClient().prepareSearch("logstash-*").setQuery(QueryBuilders.simpleQueryStringQuery("hello")).addField("message").addField("mesos_slave_id").execute().actionGet().getHits(); - String esMessage = hits.getAt(0).getFields().get("message").getValue(); - assertEquals(logline, esMessage.trim()); - String esMesosSlaveId = hits.getAt(0).getFields().get("mesos_slave_id").getValue(); - String trueSlaveId = cluster.getStateInfoJSON().getJSONArray("slaves").getJSONObject(0).getString("id"); - assertEquals(trueSlaveId, esMesosSlaveId.trim()); - } + scheduler = Optional.of(new LogstashSchedulerContainer(dockerClient, zookeeperIpAddress, "http://" + elasticsearchInstance.getIpAddress() + ":" + 9200)); + scheduler.get().enableSyslog(); + cluster.addAndStartContainer(scheduler.get(), 9999); + + waitForFramework(); + + final String sysLogPort = "514"; + final String randomLogLine = "Hello " + RandomStringUtils.randomAlphanumeric(32); + + dockerClient.pullImageCmd("ubuntu:15.10").exec(new PullImageResultCallback()).awaitSuccess(); + final String logstashSlave = dockerClient.listContainersCmd().withSince(cluster.getSlaves()[0].getContainerId()).exec().stream().filter(container -> container.getImage().endsWith("/logstash-executor:latest")).findFirst().map(Container::getId).orElseThrow(() -> new AssertionError("Unable to find logstash container")); + await().atMost(1, TimeUnit.MINUTES).pollDelay(1, TimeUnit.SECONDS).ignoreExceptions().until(() -> { + assertTrue(dockerClient.inspectContainerCmd(logstashSlave).exec().getState().isRunning()); + + final CreateContainerResponse loggerContainer = dockerClient.createContainerCmd("ubuntu:15.10").withLinks(new Link(logstashSlave, "logstash")).withCmd("logger", "--server=logstash", "--port=" + sysLogPort, "--udp", "--rfc3164", randomLogLine).exec(); + dockerClient.startContainerCmd(loggerContainer.getId()).exec(); + await().atMost(10, TimeUnit.SECONDS).until(() -> { + // TODO: this is a hack to determine whether the container has stopped. + // We should use ...exec().getState().getRunning() but docker-java doesn't provide that + // (even though it's available in the JSON provided by Docker). + final String finishedAt = dockerClient.inspectContainerCmd(loggerContainer.getId()).exec().getState().getFinishedAt(); + return StringUtils.isNotBlank(finishedAt) && !finishedAt.equals("0001-01-01T00:00:00Z"); + }); + final int exitCode = dockerClient.inspectContainerCmd(loggerContainer.getId()).exec().getState().getExitCode(); + dockerClient.removeContainerCmd(loggerContainer.getId()).exec(); + assertEquals(0, exitCode); + elasticsearchClient.get().prepareSearch("logstash-*").setQuery(QueryBuilders.simpleQueryStringQuery("hello")).addField("message").addField("mesos_slave_id").execute().actionGet().getHits().getAt(0).fields(); + }); + await().atMost(1, TimeUnit.MINUTES).pollDelay(1, TimeUnit.SECONDS).until(() -> { + Map fields = elasticsearchClient.get().prepareSearch("logstash-*").setQuery(QueryBuilders.simpleQueryStringQuery("hello")).addField("message").addField("mesos_slave_id").execute().actionGet().getHits().getAt(0).fields(); + + String esMessage = fields.get("message").getValue(); + assertEquals(randomLogLine, esMessage.trim()); + String esMesosSlaveId = fields.get("mesos_slave_id").getValue(); + + String trueSlaveId; + try { + trueSlaveId = cluster.getStateInfoJSON().getJSONArray("slaves").getJSONObject(0).getString("id"); + } catch (Exception e) { + throw new RuntimeException(e); + } + assertEquals( + trueSlaveId, + esMesosSlaveId.trim() + ); + return true; + }); + } @Test public void willAddExecutorOnNewNodes() throws JsonParseException, UnirestException, JsonMappingException { String zookeeperIpAddress = cluster.getZkContainer().getIpAddress(); From bb66b723d019fb609a58c097547471fff0e4065c Mon Sep 17 00:00:00 2001 From: Martin Westergaard Lassen Date: Thu, 28 Jan 2016 17:36:14 +0000 Subject: [PATCH 13/17] Corrected logstash version --- .../org/apache/mesos/logstash/scheduler/TaskInfoBuilder.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/logstash-scheduler/src/main/java/org/apache/mesos/logstash/scheduler/TaskInfoBuilder.java b/logstash-scheduler/src/main/java/org/apache/mesos/logstash/scheduler/TaskInfoBuilder.java index 08c1c01..a50728b 100644 --- a/logstash-scheduler/src/main/java/org/apache/mesos/logstash/scheduler/TaskInfoBuilder.java +++ b/logstash-scheduler/src/main/java/org/apache/mesos/logstash/scheduler/TaskInfoBuilder.java @@ -24,7 +24,7 @@ public class TaskInfoBuilder { public static final Logger LOGGER = Logger.getLogger(TaskInfoBuilder.class); - private static final String LOGSTASH_VERSION = "2.1.1"; //TODO: Right version? + private static final String LOGSTASH_VERSION = "1.5.6"; @Inject private Clock clock; From ae7fd361b878c0417806e9a6db8d6ad540e7a886 Mon Sep 17 00:00:00 2001 From: Martin Westergaard Lassen Date: Thu, 28 Jan 2016 17:51:36 +0000 Subject: [PATCH 14/17] Tarballs --- logstash-scheduler/build.gradle | 8 ++++---- .../org/apache/mesos/logstash/config/FrameworkConfig.java | 6 +++--- .../apache/mesos/logstash/scheduler/TaskInfoBuilder.java | 2 +- 3 files changed, 8 insertions(+), 8 deletions(-) diff --git a/logstash-scheduler/build.gradle b/logstash-scheduler/build.gradle index df8d27a..8b90e81 100644 --- a/logstash-scheduler/build.gradle +++ b/logstash-scheduler/build.gradle @@ -44,7 +44,7 @@ task copyExecutor(type: Copy) { } jar { - dependsOn 'getLogstashZip', 'copyExecutor' + dependsOn 'getLogstashTarball', 'copyExecutor' baseName = "logstash-mesos-scheduler" from { configurations.compile.collect { it.isDirectory() ? it : zipTree(it) } } // Include dependencies @@ -61,9 +61,9 @@ jar { } import de.undercouch.gradle.tasks.download.Download -task getLogstashZip(type: Download) { - src "https://download.elastic.co/logstash/logstash/logstash-" + "$logstashVer" + ".zip" - dest new File(buildDir, './resources/main/public/logstash.zip') +task getLogstashTarball(type: Download) { + src "https://download.elastic.co/logstash/logstash/logstash-" + "$logstashVer" + ".tar.gz" + dest new File(buildDir, './resources/main/public/logstash.tar.gz') onlyIfNewer true } diff --git a/logstash-scheduler/src/main/java/org/apache/mesos/logstash/config/FrameworkConfig.java b/logstash-scheduler/src/main/java/org/apache/mesos/logstash/config/FrameworkConfig.java index 1760fb1..499a600 100644 --- a/logstash-scheduler/src/main/java/org/apache/mesos/logstash/config/FrameworkConfig.java +++ b/logstash-scheduler/src/main/java/org/apache/mesos/logstash/config/FrameworkConfig.java @@ -16,7 +16,7 @@ public class FrameworkConfig { private static final String LOGSTASH_EXECUTOR_JAR = "logstash-mesos-executor.jar"; - private static final String LOGSTASH_ZIP = "logstash.zip"; + private static final String LOGSTASH_TARBALL = "logstash.tar.gz"; @NotNull @Pattern(regexp = "^zk://.+$") @@ -129,8 +129,8 @@ public String getFrameworkFileServerAddress() { return networkUtils.addressToString(networkUtils.hostSocket(8080), true); } - public String getLogstashZipUri() { - return getFrameworkFileServerAddress() + "/" + FrameworkConfig.LOGSTASH_ZIP; + public String getLogstashTarballUri() { + return getFrameworkFileServerAddress() + "/" + FrameworkConfig.LOGSTASH_TARBALL; } public String getLogstashExecutorUri() { diff --git a/logstash-scheduler/src/main/java/org/apache/mesos/logstash/scheduler/TaskInfoBuilder.java b/logstash-scheduler/src/main/java/org/apache/mesos/logstash/scheduler/TaskInfoBuilder.java index a50728b..7ff578a 100644 --- a/logstash-scheduler/src/main/java/org/apache/mesos/logstash/scheduler/TaskInfoBuilder.java +++ b/logstash-scheduler/src/main/java/org/apache/mesos/logstash/scheduler/TaskInfoBuilder.java @@ -103,7 +103,7 @@ private Protos.TaskInfo buildNativeTask(Protos.Offer offer) { executorEnvVars.getList())) .setValue(frameworkConfig.getExecutorCommand()) .addAllUris(Arrays.asList( - Protos.CommandInfo.URI.newBuilder().setValue(frameworkConfig.getLogstashZipUri()).build(), + Protos.CommandInfo.URI.newBuilder().setValue(frameworkConfig.getLogstashTarballUri()).build(), Protos.CommandInfo.URI.newBuilder().setValue(frameworkConfig.getLogstashExecutorUri()).build() )); From fe9f1239d21599dc9f08aed95e184d3ee0fec6e4 Mon Sep 17 00:00:00 2001 From: Martin Westergaard Lassen Date: Fri, 29 Jan 2016 12:07:05 +0000 Subject: [PATCH 15/17] Fixing file input on jarmode --- .../org/apache/mesos/logstash/executor/LogstashService.java | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/logstash-executor/src/main/java/org/apache/mesos/logstash/executor/LogstashService.java b/logstash-executor/src/main/java/org/apache/mesos/logstash/executor/LogstashService.java index 62dd24e..76dd1ac 100644 --- a/logstash-executor/src/main/java/org/apache/mesos/logstash/executor/LogstashService.java +++ b/logstash-executor/src/main/java/org/apache/mesos/logstash/executor/LogstashService.java @@ -44,7 +44,7 @@ private static String serialize(LogstashProtos.LogstashConfiguration logstashCon List inputPlugins = optionalValuesToList( ofConditional(logstashConfiguration.getLogstashPluginInputSyslog(), LogstashProtos.LogstashPluginInputSyslog::isInitialized).map(config -> LS.plugin("syslog", LS.map(LS.kv("port", LS.number(config.getPort()))))), ofConditional(logstashConfiguration.getLogstashPluginInputCollectd(), LogstashProtos.LogstashPluginInputCollectd::isInitialized).map(config -> LS.plugin("udp", LS.map(LS.kv("port", LS.number(config.getPort())), LS.kv("buffer_size", LS.number(1452)), LS.kv("codec", LS.plugin("collectd", LS.map()))))), - ofConditional(logstashConfiguration.getLogstashPluginInputFile(), LogstashProtos.LogstashPluginInputFile::isInitialized).map(config -> LS.plugin("file", LS.map(LS.kv("path", LS.array(config.getPathList().stream().map(path -> "/logstashpaths" + path).map(LS::string).toArray(LS.Value[]::new)))))) + ofConditional(logstashConfiguration.getLogstashPluginInputFile(), LogstashProtos.LogstashPluginInputFile::isInitialized).map(config -> LS.plugin("file", LS.map(LS.kv("path", LS.array(config.getPathList().stream().map(path -> (isRunningInDocker() ? "/logstashpaths" : "") + path).map(LS::string).toArray(LS.Value[]::new)))))) ); List filterPlugins = Arrays.asList( @@ -87,6 +87,10 @@ private static String serialize(LogstashProtos.LogstashConfiguration logstashCon ).serialize(); } + private static boolean isRunningInDocker() { + return System.getenv().containsKey("MESOS_CONTAINER_NAME"); + } + private static T[] filterEmpties(Class type, Optional... optionals) { return Arrays.stream(optionals).filter(Optional::isPresent).map(Optional::get).toArray(size -> (T[]) Array.newInstance(type, size)); } From c1b23fa1d2c032ef4581d85164b05d31102fdf58 Mon Sep 17 00:00:00 2001 From: Martin Westergaard Lassen Date: Fri, 29 Jan 2016 13:57:37 +0000 Subject: [PATCH 16/17] No need over debugging that anymore --- .../logstash/systemtest/LogstashSchedulerContainer.java | 6 ------ 1 file changed, 6 deletions(-) diff --git a/system-test/src/main/java/org/apache/mesos/logstash/systemtest/LogstashSchedulerContainer.java b/system-test/src/main/java/org/apache/mesos/logstash/systemtest/LogstashSchedulerContainer.java index 7c7a9a7..2a1ff42 100644 --- a/system-test/src/main/java/org/apache/mesos/logstash/systemtest/LogstashSchedulerContainer.java +++ b/system-test/src/main/java/org/apache/mesos/logstash/systemtest/LogstashSchedulerContainer.java @@ -30,12 +30,6 @@ public class LogstashSchedulerContainer extends AbstractContainer { private final Optional mesosRole; private boolean useDocker = true; - @Override - public void start(int timeout) { - super.start(timeout); - System.out.println("LogstashSchedulerContainer.start"); - } - public LogstashSchedulerContainer(DockerClient dockerClient, String zookeeperIpAddress, String mesosRole, String elasticsearchUrl) { super(dockerClient); this.zookeeperIpAddress = zookeeperIpAddress; From ffcb95d130211173a98f617e999ba68dfdfc9a15 Mon Sep 17 00:00:00 2001 From: Martin Westergaard Lassen Date: Fri, 29 Jan 2016 14:12:17 +0000 Subject: [PATCH 17/17] Reverted minimesos to 0.5.0 --- build.gradle | 2 +- .../org/apache/mesos/logstash/systemtest/Main.java | 2 +- .../systemtest/AbstractLogstashFrameworkTest.java | 2 +- .../logstash/systemtest/DeploymentSystemTest.java | 12 ++++++------ .../mesos/logstash/systemtest/LocalCluster.java | 2 +- 5 files changed, 10 insertions(+), 10 deletions(-) diff --git a/build.gradle b/build.gradle index c662cae..c85278e 100644 --- a/build.gradle +++ b/build.gradle @@ -16,7 +16,7 @@ ext { // System test dockerJavaVer = "4f094c112"; unirestVer = "1.4.7" - minimesosVer = "db0abe511995bd87d1c76747f9e7417d1f875050" + minimesosVer = "0.5.0" elasticsearchVer = "1.7.3" javaxInjectVer = "1"; diff --git a/system-test/src/main/java/org/apache/mesos/logstash/systemtest/Main.java b/system-test/src/main/java/org/apache/mesos/logstash/systemtest/Main.java index 92e6d8f..980841e 100644 --- a/system-test/src/main/java/org/apache/mesos/logstash/systemtest/Main.java +++ b/system-test/src/main/java/org/apache/mesos/logstash/systemtest/Main.java @@ -22,7 +22,7 @@ public static void main(String[] args) throws InterruptedException { MesosCluster cluster = new MesosCluster(ClusterUtil.withSlaves(1, zooKeeper -> new LogstashMesosSlave(dockerClient, zooKeeper)).withMaster().build()); - cluster.start(5); + cluster.start(); /* LOGGER.info("Starting scheduler"); diff --git a/system-test/src/test/java/org/apache/mesos/logstash/systemtest/AbstractLogstashFrameworkTest.java b/system-test/src/test/java/org/apache/mesos/logstash/systemtest/AbstractLogstashFrameworkTest.java index 3a17bdf..655218c 100644 --- a/system-test/src/test/java/org/apache/mesos/logstash/systemtest/AbstractLogstashFrameworkTest.java +++ b/system-test/src/test/java/org/apache/mesos/logstash/systemtest/AbstractLogstashFrameworkTest.java @@ -61,7 +61,7 @@ public static void publishExecutorInMesosCluster() throws IOException { clusterDockerClient = DockerClientBuilder.getInstance(dockerConfigBuilder.build()).build(); LogstashSchedulerContainer schedulerContainer = new LogstashSchedulerContainer(clusterDockerClient, cluster.getMesosMasterContainer().getIpAddress(), null, null); - schedulerContainer.start(60); + schedulerContainer.start(); } @Before diff --git a/system-test/src/test/java/org/apache/mesos/logstash/systemtest/DeploymentSystemTest.java b/system-test/src/test/java/org/apache/mesos/logstash/systemtest/DeploymentSystemTest.java index 010482f..58a4687 100644 --- a/system-test/src/test/java/org/apache/mesos/logstash/systemtest/DeploymentSystemTest.java +++ b/system-test/src/test/java/org/apache/mesos/logstash/systemtest/DeploymentSystemTest.java @@ -91,7 +91,7 @@ public void testDeploymentDocker() throws JsonParseException, UnirestException, String zookeeperIpAddress = cluster.getZkContainer().getIpAddress(); scheduler = Optional.of(new LogstashSchedulerContainer(dockerClient, zookeeperIpAddress, null, null)); scheduler.get().setDocker(true); - cluster.addAndStartContainer(scheduler.get(), 60); + cluster.addAndStartContainer(scheduler.get()); waitForFramework(); } @@ -100,7 +100,7 @@ public void testDeploymentDocker() throws JsonParseException, UnirestException, public void testDeploymentJar() throws JsonParseException, UnirestException, JsonMappingException { String zookeeperIpAddress = cluster.getZkContainer().getIpAddress(); scheduler = Optional.of(new LogstashSchedulerContainer(dockerClient, zookeeperIpAddress, null, null)); - cluster.addAndStartContainer(scheduler.get(), 60); + cluster.addAndStartContainer(scheduler.get()); waitForFramework(); } @@ -142,7 +142,7 @@ protected CreateContainerCmd dockerCommand() { return dockerClient.createContainerCmd("elasticsearch:" + version).withCmd("elasticsearch", "-Des.cluster.name=\"" + elasticsearchClusterName + "\"", "-Des.discovery.zen.ping.multicast.enabled=false"); } }; - cluster.addAndStartContainer(elasticsearchInstance, 9999); + cluster.addAndStartContainer(elasticsearchInstance); final int elasticsearchPort = 9300; @@ -162,7 +162,7 @@ protected CreateContainerCmd dockerCommand() { scheduler = Optional.of(new LogstashSchedulerContainer(dockerClient, zookeeperIpAddress, "logstash", "http://" + elasticsearchInstance.getIpAddress() + ":" + 9200)); scheduler.get().enableSyslog(); - cluster.addAndStartContainer(scheduler.get(), 9999); + cluster.addAndStartContainer(scheduler.get()); waitForFramework(); @@ -214,11 +214,11 @@ public void willAddExecutorOnNewNodes() throws JsonParseException, UnirestExcept String zookeeperIpAddress = cluster.getZkContainer().getIpAddress(); scheduler = Optional.of(new LogstashSchedulerContainer(dockerClient, zookeeperIpAddress, null, null)); scheduler.get().setDocker(true); - cluster.addAndStartContainer(scheduler.get(), 60); + cluster.addAndStartContainer(scheduler.get()); waitForFramework(); - IntStream.range(0, 2).forEach(value -> cluster.addAndStartContainer(new LogstashMesosSlave(dockerClient, cluster.getZkContainer()), 60)); + IntStream.range(0, 2).forEach(value -> cluster.addAndStartContainer(new LogstashMesosSlave(dockerClient, cluster.getZkContainer()))); await().atMost(1, TimeUnit.MINUTES).pollInterval(1, TimeUnit.SECONDS).until( () -> State.fromJSON(cluster.getStateInfoJSON().toString()).getFramework("logstash").getTasks().stream().filter(task -> task.getState().equals("TASK_RUNNING")).count() == 3 diff --git a/system-test/src/test/java/org/apache/mesos/logstash/systemtest/LocalCluster.java b/system-test/src/test/java/org/apache/mesos/logstash/systemtest/LocalCluster.java index 0b02743..cb3881c 100644 --- a/system-test/src/test/java/org/apache/mesos/logstash/systemtest/LocalCluster.java +++ b/system-test/src/test/java/org/apache/mesos/logstash/systemtest/LocalCluster.java @@ -40,7 +40,7 @@ private void run() throws Exception { DummyFrameworkContainer dummyFrameworkContainer = new DummyFrameworkContainer( clusterDockerClient, "dummy-framework"); - dummyFrameworkContainer.start(5); + dummyFrameworkContainer.start(); System.setProperty("mesos.zk", cluster.getZkUrl()); System.setProperty("mesos.logstash.logstash.heap.size", "128");