Skip to content

Commit

Permalink
Filter instances that do not have a task (#34)
Browse files Browse the repository at this point in the history
Co-authored-by: Julien DOCHE <[email protected]>
Co-authored-by: Julien Pepy <[email protected]>
  • Loading branch information
3 people authored Sep 30, 2020
1 parent 96dbf34 commit 01edaad
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -76,10 +76,14 @@ private[health] class HealthCheckActor(
}
}

/**
 * Collects the ids of the active app tasks belonging to the given instances.
 *
 * Instances whose `tasksMap` is empty are dropped before `appTask` is read
 * (presumably because `appTask` cannot be resolved for an instance without a task — confirm against `Instance`).
 *
 * @param instances instances to inspect
 * @return the ids of every app task for which `isActive` holds
 */
def getActiveTaskForInstances(instances: Seq[Instance]): Set[Task.Id] = {
  val instancesWithTasks = instances.filter(_.tasksMap.nonEmpty)
  val activeTasks = instancesWithTasks.map(_.appTask).filter(_.isActive)
  activeTasks.map(_.taskId)(collection.breakOut)
}

def purgeStatusOfDoneInstances(instances: Seq[Instance]): Unit = {
logger.debug(s"Purging health status of inactive instances for app ${app.id} version ${app.version} and healthCheck ${healthCheck}")

val activeTaskIds: Set[Task.Id] = instances.map(_.appTask).filter(_.isActive).map(_.taskId)(collection.breakOut)
val activeTaskIds: Set[Task.Id] = getActiveTaskForInstances(instances)
healthByTaskId.retain((taskId, health) => activeTaskIds(taskId))
// FIXME: this is unsafe, since killingInFlight may be accessed concurrently from 2 threads (see preStart method above)
killingInFlight &= activeTaskIds
Expand Down Expand Up @@ -107,12 +111,12 @@ private[health] class HealthCheckActor(
if (instance.isUnreachable) {
logger.info(s"Instance $instanceId on host ${instance.hostname} is temporarily unreachable. Performing no kill.")
} else {
require(instance.tasksMap.size == 1, "Unexpected pod instance in HealthCheckActor")
if (antiSnowballEnabled && !(checkEnoughInstancesRunning(instance))) {
logger.info(s"[anti-snowball] app ${app.id} version ${app.version} Won't kill $instanceId because too few instances are running")
return
}
logger.info(s"Send kill request for $instanceId on host ${instance.hostname.getOrElse("unknown")} to driver")
require(instance.tasksMap.size == 1, "Unexpected pod instance in HealthCheckActor")
val taskId = instance.appTask.taskId
eventBus.publish(
UnhealthyInstanceKillEvent(
Expand Down Expand Up @@ -140,8 +144,7 @@ private[health] class HealthCheckActor(
/** Check if enough active and ready instances will remain if we kill 1 unhealthy instance */
def checkEnoughInstancesRunning(unhealthyInstance: Instance): Boolean = {
val instances: Seq[Instance] = instanceTracker.specInstancesSync(app.id)
// val activeInstanceIds: Set[Instance.Id] = instances.withFilter(_.isActive).map(_.instanceId)(collection.breakOut)
val activeTaskIds: Set[Task.Id] = instances.map(_.appTask).filter(_.isActive).map(_.taskId)(collection.breakOut)
val activeTaskIds: Set[Task.Id] = getActiveTaskForInstances(instances)
val healthyInstances = healthByTaskId.filterKeys(activeTaskIds)
.filterKeys(taskId => !killingInFlight(taskId))

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ class HealthCheckActorTest extends AkkaUnitTest {
val task: Task = instance.appTask

val unreachableInstance = TestInstanceBuilder.newBuilder(appId).addTaskUnreachable().getInstance()
val unscheduledInstance = TestInstanceBuilder.newBuilder(appId).getInstance()
val lostInstance = TestInstanceBuilder.newBuilder(appId).addTaskLost().getInstance()

val healthCheckWorkerHub: Sink[(AppDefinition, Instance, MarathonHealthCheck, ActorRef), NotUsed] =
Expand Down Expand Up @@ -134,6 +135,17 @@ class HealthCheckActorTest extends AkkaUnitTest {
verifyNoMoreInteractions(f.scheduler, f.killService)
}

// Purging health status must not throw when one of the instances was built without any task.
"unscheduled instance should not make actor crash" in {
  val f = new Fixture
  val healthyInstances = Seq.fill(9)(f.runningInstance())
  // Append the task-less instance to the nine running ones; the actor itself only knows the running ones.
  val instances = healthyInstances :+ f.unscheduledInstance
  val healthCheck = MarathonHttpHealthCheck(maxConsecutiveFailures = 3, portIndex = Some(PortReference(0)))
  val actor = f.actor(healthCheck, healthyInstances)

  noException shouldBe thrownBy {
    actor.underlyingActor.purgeStatusOfDoneInstances(instances)
  }
}

"task should always be killed if application doesn't set upgradeStrategy.minimumHealthCapacity" in {
val f = new Fixture
val healthyInstances = Seq.tabulate(9)(_ => f.runningInstance())
Expand Down

0 comments on commit 01edaad

Please sign in to comment.