Skip to content

Commit

Permalink
Merge pull request #99 from mesos/informative-decline-logging
Browse files Browse the repository at this point in the history
More informative logging of offer decline reasons
  • Loading branch information
mwl committed Jan 28, 2016
2 parents 2477154 + eb3927d commit cc2153f
Show file tree
Hide file tree
Showing 3 changed files with 62 additions and 71 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,19 +9,17 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.context.embedded.EmbeddedServletContainerInitializedEvent;
import org.springframework.context.ApplicationEvent;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.inject.Inject;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.*;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;

import static java.util.Collections.singletonList;
import static java.util.Collections.synchronizedCollection;
Expand Down Expand Up @@ -152,10 +150,10 @@ public void resourceOffers(SchedulerDriver schedulerDriver, List<Offer> offers)
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Received offerId={}: {}", offerId, flattenProtobufString(offer.toString()));
}

final OfferStrategy.OfferResult result = offerStrategy.evaluate(clusterState, offer);

if (result.acceptable) {
if (result.acceptable()) {
LOGGER.info("Accepting offer offerId={}", offerId);

TaskInfo taskInfo = taskInfoBuilder.buildTask(offer);
Expand All @@ -166,7 +164,7 @@ public void resourceOffers(SchedulerDriver schedulerDriver, List<Offer> offers)

clusterState.addTask(taskInfo);
} else {
LOGGER.info("Declined offer with offerId={} with reason={}", offerId, result.reason.orElse("UNKNOWN"));
LOGGER.debug("Declined offer offerId={} because: " + result.complaints.stream().collect(Collectors.joining("; ")));
schedulerDriver.declineOffer(offer.getId());
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,11 @@

import javax.inject.Inject;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static java.util.Arrays.asList;

Expand All @@ -31,12 +34,7 @@ public class OfferStrategy {
@Inject
private Features features;

private List<OfferRule> acceptanceRules = asList(
new OfferRule("Host already running task", this::isHostAlreadyRunningTask),
new OfferRule("Offer did not have enough CPU resources", this::isNotEnoughCPU),
new OfferRule("Offer did not have enough RAM resources", this::isNotEnoughRAM),
new OfferRule("Offer did not have ports available", this::isNotWithNeededPorts)
);
private List<Rule> acceptanceRules = asList(this::hostRule, this::cpuRule, this::ramRule, this::portsRule);

private List<Integer> neededPorts() {
final ArrayList<Integer> ports = new ArrayList<>();
Expand All @@ -50,9 +48,13 @@ private List<Integer> neededPorts() {
}

public OfferResult evaluate(ClusterState clusterState, Protos.Offer offer) {
final Optional<OfferRule> decline = acceptanceRules.stream().filter(offerRule -> offerRule.rule.accepts(clusterState, offer)).limit(1).findFirst();
if (decline.isPresent()) {
return OfferResult.decline(decline.get().declineReason);
final List<String> complaints =
acceptanceRules
.stream()
.flatMap(rule -> rule.complaintsFor(clusterState, offer))
.collect(Collectors.toList());
if (!complaints.isEmpty()) {
return OfferResult.decline(complaints);
}

return OfferResult.accept();
Expand All @@ -62,76 +64,70 @@ public OfferResult evaluate(ClusterState clusterState, Protos.Offer offer) {
* Offer result
*/
public static class OfferResult {
final boolean acceptable;
final Optional<String> reason;
final List<String> complaints;

private OfferResult(boolean acceptable, Optional<String> reason) {
this.acceptable = acceptable;
this.reason = reason;
private OfferResult(List<String> complaints) {
this.complaints = complaints;
}

public boolean acceptable() {
return complaints.isEmpty();
}

public static OfferResult accept() {
return new OfferResult(true, Optional.<String>empty());
return new OfferResult(Collections.emptyList());
}

public static OfferResult decline(String reason) {
return new OfferResult(false, Optional.of(reason));
public static OfferResult decline(List<String> complaints) {
return new OfferResult(complaints);
}
}

private boolean isHostAlreadyRunningTask(ClusterState clusterState, Protos.Offer offer) {
return clusterState.getTaskList().stream().anyMatch(taskInfo -> taskInfo.getSlaveId().equals(offer.getSlaveId()));
private Stream<String> hostRule(ClusterState clusterState, Protos.Offer offer) {
return clusterState
.getTaskList()
.stream()
.filter(taskInfo -> taskInfo.getSlaveId().equals(offer.getSlaveId()))
.map(taskInfo -> "host " + taskInfo.getSlaveId().getValue() + " is already running task " + taskInfo.getTaskId().getValue());
}

private boolean hasEnoughOfResourceType(List<Protos.Resource> resources, String resourceName, double minSize) {
for (Protos.Resource resource : resources) {
if (resourceName.equals(resource.getName())) {
return resource.getScalar().getValue() >= minSize;
}
private Stream<String> complaintsForResourceType(List<Protos.Resource> resources, String resourceName, double minSize) {
double totalSize = resources.stream().filter(resource -> resource.getName().equals(resourceName)).collect(Collectors.summingDouble(resource -> resource.getScalar().getValue()));
if (totalSize < minSize) {
return Collections.singletonList("required minimum " + minSize + " " + resourceName + " but offer only has " + totalSize + " in total").stream();
} else {
return Arrays.asList(new String[]{}).stream();
}

return false;
}

private boolean isNotEnoughCPU(ClusterState clusterState, Protos.Offer offer) {
return !hasEnoughOfResourceType(offer.getResourcesList(), "cpus", executorConfig.getCpus());
private Stream<String> cpuRule(ClusterState clusterState, Protos.Offer offer) {
return complaintsForResourceType(offer.getResourcesList(), "cpus", executorConfig.getCpus());
}

private boolean isNotEnoughRAM(ClusterState clusterState, Protos.Offer offer) {
return !hasEnoughOfResourceType(offer.getResourcesList(), "mem", executorConfig.getHeapSize() + logstashConfig.getHeapSize() + executorConfig.getOverheadMem());
private Stream<String> ramRule(ClusterState clusterState, Protos.Offer offer) {
return complaintsForResourceType(offer.getResourcesList(), "mem", executorConfig.getHeapSize() + logstashConfig.getHeapSize() + executorConfig.getOverheadMem());
}

private boolean isNotWithNeededPorts(ClusterState clusterState, Protos.Offer offer) {
return !neededPorts().stream()
.allMatch(
private Stream<String> portsRule(ClusterState clusterState, Protos.Offer offer) {
return neededPorts()
.stream()
.filter(
port -> offer.getResourcesList().stream()
.filter(Protos.Resource::hasRanges) // TODO: 23/11/2015 Check wether this can be removed
.anyMatch(resource -> portIsInRanges(port, resource.getRanges()))
);

.noneMatch(resource -> portIsInRanges(port, resource.getRanges()))
)
.map(port -> "required port " + port + " but was not in offer");
}

private boolean portIsInRanges(int port, Protos.Value.Ranges ranges) {
return ranges.getRangeList().stream().anyMatch(range -> new LongRange(range.getBegin(), range.getEnd()).containsLong(port));
}
/**
* Rule and reason container object
*/
private static class OfferRule {
String declineReason;
Rule rule;

public OfferRule(String declineReason, Rule rule) {
this.declineReason = declineReason;
this.rule = rule;
}
}

/**
* Interface for checking offers
*/
@FunctionalInterface
private interface Rule {
boolean accepts(ClusterState clusterState, Protos.Offer offer);
Stream<String> complaintsFor(ClusterState clusterState, Protos.Offer offer);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
import org.mockito.Mock;
import org.mockito.runners.MockitoJUnitRunner;

import java.util.Optional;
import java.util.Arrays;

import static java.util.Collections.singletonList;
import static org.apache.mesos.logstash.scheduler.Resources.*;
Expand All @@ -38,8 +38,8 @@ public void willDeclineOfferIfHostIsAlreadyRunningTask() throws Exception {
when(clusterState.getTaskList()).thenReturn(singletonList(createTask("host1")));

final OfferStrategy.OfferResult result = offerStrategy.evaluate(clusterState, validOffer("host1"));
assertFalse(result.acceptable);
assertEquals("Host already running task", result.reason.get());
assertFalse(result.acceptable());
assertEquals(Arrays.asList("host host1 is already running task TestId"), result.complaints);
}

@Test
Expand All @@ -48,8 +48,8 @@ public void willDeclineOfferIfOfferDoesNotHaveEnoughCpu() throws Exception {
when(executorConfig.getCpus()).thenReturn(1.0);

final OfferStrategy.OfferResult result = offerStrategy.evaluate(clusterState, baseOfferBuilder("host2").addResources(cpus(0.9, FRAMEWORK_ROLE)).build());
assertFalse(result.acceptable);
assertEquals("Offer did not have enough CPU resources", result.reason.get());
assertFalse(result.acceptable());
assertEquals(Arrays.asList("required minimum 1.0 cpus but offer only has 0.9 in total"), result.complaints);
}

@Test
Expand All @@ -58,8 +58,8 @@ public void willDeclineOfferIfOfferDoesNotHaveEnoughMem() throws Exception {
when(executorConfig.getHeapSize()).thenReturn(2048);

final OfferStrategy.OfferResult result = offerStrategy.evaluate(clusterState, baseOfferBuilder("host2").addResources(cpus(1.0, FRAMEWORK_ROLE)).build());
assertFalse(result.acceptable);
assertEquals("Offer did not have enough RAM resources", result.reason.get());
assertFalse(result.acceptable());
assertEquals(Arrays.asList("required minimum 2048.0 mem but offer only has 0.0 in total"), result.complaints);
}

@Test
Expand All @@ -76,8 +76,8 @@ public void willDeclineOfferIfOfferDoesNotHaveNeededPorts() throws Exception {
.addResources(cpus(1.0, FRAMEWORK_ROLE))
.addResources(mem(512, FRAMEWORK_ROLE))
.build());
assertFalse(result.acceptable);
assertEquals("Offer did not have ports available", result.reason.get());
assertFalse(result.acceptable());
assertEquals(Arrays.asList("required port 514 but was not in offer", "required port 25826 but was not in offer"), result.complaints);
}

@Test
Expand All @@ -96,8 +96,7 @@ public void willAcceptValidOffer() throws Exception {
.addResources(mem(512, FRAMEWORK_ROLE))
.addResources(portRange(1, 25826, FRAMEWORK_ROLE))
.build());
assertTrue(result.acceptable);
assertFalse(result.reason.isPresent());
assertTrue(result.acceptable());
}

@Test
Expand All @@ -114,8 +113,7 @@ public void willAcceptValidOfferFromCommonPool() throws Exception {
.addResources(mem(512, "*"))
.addResources(portRange(514, 514, "*"))
.build());
assertEquals(Optional.empty(), result.reason);
assertTrue(result.acceptable);
assertTrue(result.acceptable());
}

@Test
Expand All @@ -130,8 +128,7 @@ public void willAcceptValidOfferWhenNoPortsAreNeeded() throws Exception {
.addResources(cpus(1.0, FRAMEWORK_ROLE))
.addResources(mem(512, FRAMEWORK_ROLE))
.build());
assertTrue(result.acceptable);
assertFalse(result.reason.isPresent());
assertTrue(result.acceptable());
}

private Protos.TaskInfo createTask(String hostname) throws InvalidProtocolBufferException {
Expand Down

0 comments on commit cc2153f

Please sign in to comment.