Skip to content

Commit

Permalink
Merge pull request #87 from mesos/feature/jarMode
Browse files Browse the repository at this point in the history
Jar mode
  • Loading branch information
mwl committed Jan 29, 2016
2 parents cc2153f + ffcb95d commit cb68021
Show file tree
Hide file tree
Showing 20 changed files with 525 additions and 101 deletions.
3 changes: 3 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ ext {
elasticsearchVer = "1.7.3"
javaxInjectVer = "1";

// Scheduler
logstashVer = "1.5.6"

// Executor
slf4jVer = "1.7.13"
commonsCompressVer = "1.10"
Expand Down
5 changes: 5 additions & 0 deletions logstash-commons/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,13 @@ task taskCopyFilesForDocker(type: Copy) {
}

dependencies {
compile 'ch.qos.logback:logback-classic:1.1.3'
compile "com.google.protobuf:protobuf-java:$protobufVer"
compile "javax.inject:javax.inject:$javaxInjectVer"
compile "org.springframework:spring-context:4.1.7.RELEASE"
compile "org.apache.commons:commons-exec:1.3"
compile "org.apache.commons:commons-lang3:3.4"
compile "commons-validator:commons-validator:1.5.0"
}

idea {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
package org.apache.mesos.logstash.common.network;

import org.apache.commons.exec.CommandLine;
import org.apache.commons.exec.DefaultExecutor;
import org.apache.commons.exec.PumpStreamHandler;
import org.apache.commons.exec.environment.EnvironmentUtils;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.InterfaceAddress;
import java.net.NetworkInterface;
import java.net.SocketException;
import java.net.UnknownHostException;
import java.nio.charset.Charset;
import java.util.Collections;
import java.util.Enumeration;
import java.util.Map;

/**
* Utilities to help with networking
*/
@SuppressWarnings("PMD.AvoidUsingHardCodedIP")
@Service
public class NetworkUtils {
private static final Logger LOG = LoggerFactory.getLogger(NetworkUtils.class);
public static final String DOCKER_MACHINE_IP = "docker-machine ip";
public static final String LOCALHOST = "127.0.0.1";
public static final String DOCKER_MACHINE_NAME = "DOCKER_MACHINE_NAME";

public InetAddress hostAddress() {
try {
return InetAddress.getLocalHost();
} catch (UnknownHostException e) {
LOG.error("", e);
throw new RuntimeException("Unable to bind to local host.");
}
}

public InetSocketAddress hostSocket(int port) {
return new InetSocketAddress(hostAddress(), port);
}

public String addressToString(InetSocketAddress address, Boolean useIpAddress) {
if (useIpAddress) {
return "http://" + address.getAddress().getHostAddress() + ":" + address.getPort();
} else {
return "http://" + address.getAddress().getHostName() + ":" + address.getPort();
}
}

public String getDockerMachineName(Map<String, String> environment) {
String envVar = DOCKER_MACHINE_NAME;
String dockerMachineName = environment.getOrDefault(envVar, "");
if (dockerMachineName == null || dockerMachineName.isEmpty()) {
LOG.debug("The environmental variable DOCKER_MACHINE_NAME was not found. Using docker0 address.");
}
return dockerMachineName;
}

public String getDockerHostIpAddress(Map<String, String> environment) {
String ipAddress = LOCALHOST; // Default of localhost
String dockerMachineName = getDockerMachineName(environment);

if (!dockerMachineName.isEmpty()) {
LOG.debug("Docker machine name = " + dockerMachineName);
CommandLine commandline = CommandLine.parse(DOCKER_MACHINE_IP);
commandline.addArgument(dockerMachineName);
LOG.debug("Running exec: " + commandline.toString());
try {
ipAddress = StringUtils.strip(runCommand(commandline));
} catch (IOException e) {
LOG.error("Unable to run exec command to find ip address.", e);
}
} else {
ipAddress = getDocker0AdapterIPAddress();
}
LOG.debug("Returned IP address: " + ipAddress);
return ipAddress;
}

public Map<String, String> getEnvironment() {
Map<String, String> env = Collections.emptyMap();
try {
env = EnvironmentUtils.getProcEnvironment();
} catch (IOException e) {
LOG.error("Unable to get environmental variables", e);
}
return env;
}

public String runCommand(CommandLine commandline) throws IOException {
DefaultExecutor exec = new DefaultExecutor();
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
PumpStreamHandler streamHandler = new PumpStreamHandler(outputStream);
exec.setStreamHandler(streamHandler);
exec.execute(commandline);
return outputStream.toString(Charset.defaultCharset().name());
}

public String getDocker0AdapterIPAddress() {
InetAddress docker0 = getLocalAddress("docker0");
if (docker0 == null) {
LOG.error("Could not get address for docker0");
return LOCALHOST;
} else {
return docker0.getHostAddress();
}
}

private InetAddress getLocalAddress(String adaptorName){
try {
Enumeration<NetworkInterface> b = NetworkInterface.getNetworkInterfaces();
while (b.hasMoreElements()) {
NetworkInterface networkInterface = b.nextElement();
if (networkInterface.getName().equals(adaptorName)) {
for (InterfaceAddress f : networkInterface.getInterfaceAddresses()) {
if (f.getAddress().isSiteLocalAddress()) {
return f.getAddress();
}
}
}
}
} catch (SocketException e) {
e.printStackTrace();
}
return null;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package org.apache.mesos.logstash.common.zookeeper.network;

import org.apache.commons.validator.routines.InetAddressValidator;
import org.apache.mesos.logstash.common.network.NetworkUtils;
import org.junit.Test;

import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

/**
*/
public class NetworkUtilsTest {

public static boolean validate(final String ip) {
return InetAddressValidator.getInstance().isValid(ip);
}

private NetworkUtils networkUtils = new NetworkUtils();

@Test
// Note: On OSX, when not connected to a network, it will return a IPv6 address, which will not validate properly.
// Please connect to a network to obtain a IPv4 address.
public void shouldProvideIPAddress() {
int port = 1234;
String string = networkUtils.addressToString(networkUtils.hostSocket(port), true);
assertTrue(validate(string.replace("http://", "").replace(":" + port, "")));
}


@Test
public void shouldProvideHostname() {
int port = 1234;
String string = networkUtils.addressToString(networkUtils.hostSocket(port), false);
assertFalse(validate(string.replace("http://", "").replace(":" + port, "")));
}

@Test
public void shouldGetDockerIPAddress() throws IOException {
// Should always be either a valid IP or 127.0.0.1
assertTrue(validate( networkUtils.getDockerHostIpAddress(networkUtils.getEnvironment())));
}

@Test
public void shouldReturnLocahostOrDocker0AddressWhenNoEnvVar() {
if ( networkUtils.getDockerHostIpAddress(Collections.emptyMap()).equals(NetworkUtils.LOCALHOST)) {
assertEquals(NetworkUtils.LOCALHOST, networkUtils.getDockerHostIpAddress(Collections.emptyMap()));
} else {
assertEquals(networkUtils.getDocker0AdapterIPAddress(), networkUtils.getDockerHostIpAddress(Collections.emptyMap()));
}
}

@Test
public void shouldReturnDockerMachineNameWhenIncluded() {
HashMap<String, String> map = new HashMap<>();
String dev = "dev";
map.put(NetworkUtils.DOCKER_MACHINE_NAME, dev);
assertEquals(dev, networkUtils.getDockerMachineName(map));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,15 +38,19 @@ public void launchTask(final ExecutorDriver driver, final Protos.TaskInfo task)
LOGGER.info("Forked thread with LogstashService.run()");
try {
logstashService.run(logstashConfiguration);
LOGGER.error("LogstashService finished");
LOGGER.info("LogstashService finished");
driver.sendStatusUpdate(Protos.TaskStatus.newBuilder()
.setExecutorId(task.getExecutor().getExecutorId())
.setTaskId(task.getTaskId())
.setState(Protos.TaskState.TASK_FINISHED).build());
} catch (Exception e) {
LOGGER.error("Logstash service failed", e);
driver.sendStatusUpdate(Protos.TaskStatus.newBuilder()
.setExecutorId(task.getExecutor().getExecutorId())
.setTaskId(task.getTaskId())
.setState(Protos.TaskState.TASK_FAILED)
.setMessage(e.getCause().getMessage()).build());
}

driver.sendStatusUpdate(Protos.TaskStatus.newBuilder()
.setExecutorId(task.getExecutor().getExecutorId())
.setTaskId(task.getTaskId())
.setState(Protos.TaskState.TASK_FAILED).build());
driver.stop();
});
thread.setDaemon(true);
Expand All @@ -71,44 +75,17 @@ public void killTask(ExecutorDriver driver, Protos.TaskID taskId) {

@Override
public void frameworkMessage(ExecutorDriver driver, byte[] data) {
System.out.println("LogstashExecutor.frameworkMessage");
LOGGER.info("LogstashExecutor.frameworkMessage");
try {
SchedulerMessage message = SchedulerMessage.parseFrom(data);

LOGGER.info("SchedulerMessage. message={}", message);

/*
if (message.getType().equals(REQUEST_STATS)) {
sendStatsToScheduler(driver);
} else {
updateConfig(message);
}
*/
} catch (InvalidProtocolBufferException e) {
LOGGER.error("Error parsing framework message from scheduler.", e);
}
}

/*
private void updateConfig(SchedulerMessage message) {
LOGGER.info("New configuration received. Reconfiguring...");
// TODO extract config and update service:
// logstashService.update(514, "elasticsearch.service:9200");
}
*/

/*
private void sendStatsToScheduler(ExecutorDriver driver) {
driver.sendFrameworkMessage(liveState.getStateAsExecutorMessage().toByteArray());
}
*/
@Override
public void shutdown(ExecutorDriver driver) {
// The task i killed automatically, so we don't have to
// do anything.
LOGGER.info("Shutting down framework.");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ public class LogstashService {

public static final Logger LOGGER = LoggerFactory.getLogger(LogstashService.class);

public static final String LOGSTASH_PATH = System.getenv("LOGSTASH_PATH");

private static <T> Optional<T> ofConditional(T message, Predicate<T> predicate) {
if (message != null && predicate.test(message)) {
return Optional.of(message);
Expand All @@ -42,7 +44,7 @@ private static String serialize(LogstashProtos.LogstashConfiguration logstashCon
List<LS.Plugin> inputPlugins = optionalValuesToList(
ofConditional(logstashConfiguration.getLogstashPluginInputSyslog(), LogstashProtos.LogstashPluginInputSyslog::isInitialized).map(config -> LS.plugin("syslog", LS.map(LS.kv("port", LS.number(config.getPort()))))),
ofConditional(logstashConfiguration.getLogstashPluginInputCollectd(), LogstashProtos.LogstashPluginInputCollectd::isInitialized).map(config -> LS.plugin("udp", LS.map(LS.kv("port", LS.number(config.getPort())), LS.kv("buffer_size", LS.number(1452)), LS.kv("codec", LS.plugin("collectd", LS.map()))))),
ofConditional(logstashConfiguration.getLogstashPluginInputFile(), LogstashProtos.LogstashPluginInputFile::isInitialized).map(config -> LS.plugin("file", LS.map(LS.kv("path", LS.array(config.getPathList().stream().map(path -> "/logstashpaths" + path).map(LS::string).toArray(LS.Value[]::new))))))
ofConditional(logstashConfiguration.getLogstashPluginInputFile(), LogstashProtos.LogstashPluginInputFile::isInitialized).map(config -> LS.plugin("file", LS.map(LS.kv("path", LS.array(config.getPathList().stream().map(path -> (isRunningInDocker() ? "/logstashpaths" : "") + path).map(LS::string).toArray(LS.Value[]::new))))))
);

List<LS.Plugin> filterPlugins = Arrays.asList(
Expand Down Expand Up @@ -85,6 +87,10 @@ private static String serialize(LogstashProtos.LogstashConfiguration logstashCon
).serialize();
}

private static boolean isRunningInDocker() {
return System.getenv().containsKey("MESOS_CONTAINER_NAME");
}

private static <T> T[] filterEmpties(Class<T> type, Optional<T>... optionals) {
return Arrays.stream(optionals).filter(Optional::isPresent).map(Optional::get).toArray(size -> (T[]) Array.newInstance(type, size));
}
Expand All @@ -100,7 +106,7 @@ public void run(LogstashProtos.LogstashConfiguration logstashConfiguration) {
Process process;
try {
String[] command = {
"/opt/logstash/bin/logstash",
LOGSTASH_PATH,
"--log", "/var/log/logstash.log",
"-e", serialize(logstashConfiguration)
};
Expand Down
Loading

0 comments on commit cb68021

Please sign in to comment.