Skip to content

Commit

Permalink
Fix fullscan index issue on tasks health (#64)
Browse files Browse the repository at this point in the history
As it was very slow with several thousands of tasks/instances, this uses two indexes instead.

Co-authored-by: Jean-Baptiste Catté <[email protected]>
  • Loading branch information
jbkt and Jean-Baptiste Catté committed Mar 10, 2023
1 parent 05d2295 commit 02137d3
Show file tree
Hide file tree
Showing 4 changed files with 270 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import mesosphere.marathon.core.task.termination.{KillReason, KillService}
import mesosphere.marathon.core.task.tracker.InstanceTracker
import mesosphere.marathon.state.{AppDefinition, Timestamp}

import scala.collection.concurrent.TrieMap
import scala.concurrent.Future
import scala.concurrent.duration._
import scala.util.{Failure, Success}
Expand All @@ -37,7 +36,7 @@ private[health] class HealthCheckActor(
implicit val mat = ActorMaterializer()
import context.dispatcher

val healthByTaskId = TrieMap.empty[Task.Id, Health]
val healthIndex: HealthIndex = HealthDualTrieMap()
var killingInFlight = Set.empty[Task.Id]

private case class HealthCheckStreamStopped(thisInstance: this.type)
Expand Down Expand Up @@ -86,7 +85,7 @@ private[health] class HealthCheckActor(
logger.debug(s"Purging health status of inactive instances for app ${app.id} version ${app.version} and healthCheck ${healthCheck}")

val activeTaskIds: Set[Task.Id] = getActiveTaskForInstances(instances)
healthByTaskId.retain((taskId, health) => activeTaskIds(taskId))
healthIndex.retain(taskId => activeTaskIds(taskId))
// FIXME: I discovered this is unsafe since killingInFlight might be used in 2 concurrent threads (see preStart method above)
killingInFlight &= activeTaskIds
logger.info(s"[anti-snowball] app ${app.id} version ${app.version} currently ${killingInFlight.size} instances killingInFlight")
Expand Down Expand Up @@ -171,15 +170,15 @@ private[health] class HealthCheckActor(

/** Check if HealthCheckActor manages enough instances to decide whether it can kill one or if it needs info from AppHealthCheckActor */
def needInfoFromAppHealthCheckActor(activeTaskIds: Set[Task.Id]): Boolean = {
val managedInstances = healthByTaskId.filterKeys(activeTaskIds)
val managedInstances = healthIndex.filterKeys(activeTaskIds)
val enoughInstancesRunning = activeTaskIds.size >= app.instances * app.upgradeStrategy.minimumHealthCapacity
val enoughManagedInstances = managedInstances.size >= 1 + (activeTaskIds.size * app.upgradeStrategy.minimumHealthCapacity).toInt
enoughInstancesRunning && !enoughManagedInstances
}

/** Check if enough active and ready instances will remain if we kill 1 unhealthy instance */
def checkEnoughInstancesRunning(unhealthyInstance: Instance, activeTaskIds: Set[Task.Id]): Boolean = {
val healthyInstances = healthByTaskId.filterKeys(activeTaskIds)
val healthyInstances = healthIndex.filterKeys(activeTaskIds)
.filterKeys(taskId => !killingInFlight(taskId))

logger.info(s"[anti-snowball] app ${app.id} version ${app.version} currently ${killingInFlight.size} instances killingInFlight")
Expand All @@ -204,7 +203,7 @@ private[health] class HealthCheckActor(

def handleHealthResult(result: HealthResult): Unit = {
val instanceId = result.instanceId
val health = healthByTaskId.getOrElse(result.taskId, Health(instanceId))
val health = healthIndex.getOrElse(result.taskId, Health(instanceId))

val updatedHealth = result match {
case Healthy(_, _, _, _, _) =>
Expand Down Expand Up @@ -252,11 +251,9 @@ private[health] class HealthCheckActor(
// dead task (in addition to the living one), thus sometimes leading
// Marathon to report the task / instance / app as unhealthy while
// everything is running correctly.
val healthOfInstanceId = healthByTaskId.find(_._1.instanceId == instanceId)
if (healthOfInstanceId.isDefined)
healthByTaskId.remove(healthOfInstanceId.get._1)
healthIndex.removeLeftOverHealthIfAny(instanceId)

healthByTaskId += (result.taskId -> instanceHealth.newHealth)
healthIndex += (result.taskId -> instanceHealth.newHealth)
appHealthCheckActor ! HealthCheckStatusChanged(ApplicationKey(app.id, app.version), healthCheck, newHealth)

if (health.alive != newHealth.alive && result.publishEvent) {
Expand All @@ -277,11 +274,10 @@ private[health] class HealthCheckActor(

def receive: Receive = {
case GetInstanceHealth(instanceId) =>
sender() ! healthByTaskId.find(_._1.instanceId == instanceId)
.map(_._2).getOrElse(Health(instanceId))
sender() ! healthIndex.get(instanceId).getOrElse(Health(instanceId))

case GetAppHealth =>
sender() ! AppHealth(healthByTaskId.values.to[Seq])
sender() ! AppHealth(healthIndex.values.to[Seq])

case result: HealthResult if result.version == app.version =>
handleHealthResult(result)
Expand Down
103 changes: 103 additions & 0 deletions src/main/scala/mesosphere/marathon/core/health/impl/HealthIndex.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
package mesosphere.marathon
package core.health.impl

import mesosphere.marathon.core.health.Health
import mesosphere.marathon.core.instance.Instance
import mesosphere.marathon.core.task.Task

import scala.collection.concurrent.TrieMap

trait HealthIndex {
def removeLeftOverHealthIfAny(key: Instance.Id): Option[Health]

def +=(kv: (Task.Id, Health)): this.type

def values: Iterable[Health]

def filterKeys(p: Task.Id => Boolean): collection.Map[Task.Id, Health]

def get(key: Instance.Id): Option[Health]

def getOrElse(key: Task.Id, default: => Health): Health

def retain(p: Task.Id => Boolean): this.type
}

/**
* Old implementation, search by Instance.Id is slow, kept for reference and for tests.
*/
case class HealthSingleTrieMap() extends HealthIndex {
private val healthByTaskId: TrieMap[Task.Id, Health] = TrieMap.empty[Task.Id, Health]

def removeLeftOverHealthIfAny(key: Instance.Id): Option[Health] = {
val healthOfInstanceId = healthByTaskId.find(_._1.instanceId == key)
if (healthOfInstanceId.isDefined)
healthByTaskId.remove(healthOfInstanceId.get._1)
else
None
}

/** Slow, full scan algorithm O(n) */
def get(key: Instance.Id): Option[Health] = healthByTaskId.find(_._1.instanceId == key).map(_._2)

def getOrElse(key: Task.Id, default: => Health): Health = healthByTaskId.getOrElse(key, default)

def filterKeys(p: Task.Id => Boolean): collection.Map[Task.Id, Health] = {
healthByTaskId.filterKeys(p)
}

/** Slow */
def retain(p: Task.Id => Boolean): this.type = {
healthByTaskId.retain((taskId, _) => p(taskId))
this
}

def +=(kv: (Task.Id, Health)): this.type = {
healthByTaskId += kv
this
}

def values: Iterable[Health] = healthByTaskId.values
}

/**
* New implementation, search by Instance.Id is fast but we need two indexes.
*/
case class HealthDualTrieMap() extends HealthIndex {
private val healthByTaskId: TrieMap[Task.Id, Health] = TrieMap.empty[Task.Id, Health]
private val taskIdByInstanceId: TrieMap[Instance.Id, Task.Id] = TrieMap.empty[Instance.Id, Task.Id]

def removeLeftOverHealthIfAny(key: Instance.Id): Option[Health] =
taskIdByInstanceId.remove(key).flatMap(oldTaskId => healthByTaskId.remove(oldTaskId))

/** Fast O(log(n)) */
def get(key: Instance.Id): Option[Health] = taskIdByInstanceId.get(key).flatMap(t => healthByTaskId.get(t))

def getOrElse(key: Task.Id, default: => Health): Health = healthByTaskId.getOrElse(key, default)

def filterKeys(p: Task.Id => Boolean): collection.Map[Task.Id, Health] = {
healthByTaskId.filterKeys(p)
}

/** Slow */
def retain(p: Task.Id => Boolean): this.type = {
val oldSize = healthByTaskId.size
healthByTaskId.retain((taskId, _) => p(taskId))
if (healthByTaskId.size < oldSize) {
val retainedInstanceIds = healthByTaskId.keySet.map(_.instanceId)
taskIdByInstanceId.retain((instanceId, _) => retainedInstanceIds.contains(instanceId))
}
this
}

def +=(kv: (Task.Id, Health)): this.type = {
kv match {
case (taskId: Task.Id, _: Health) =>
healthByTaskId += kv
taskIdByInstanceId += taskId.instanceId -> taskId
}
this
}

def values: Iterable[Health] = healthByTaskId.values
}
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ class HealthCheckActorTest extends AkkaUnitTest {
new HealthCheckActor(app, appHealthCheckActor.ref, killService, healthCheck, instanceTracker, system.eventStream, healthCheckWorkerHub, healthCheckShieldApi, antiSnowballApi) {
instances.map(instance => {
val taskId = instance.appTask.taskId
healthByTaskId += (taskId -> Health(instance.instanceId)
healthIndex += (taskId -> Health(instance.instanceId)
.update(Healthy(instance.instanceId, taskId, instance.runSpecVersion)))
})
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
package mesosphere.marathon
package core.health.impl

import mesosphere.UnitTest
import mesosphere.marathon.core.health.Health
import mesosphere.marathon.core.health.impl.HealthIndexTest._
import mesosphere.marathon.core.instance.Instance
import mesosphere.marathon.core.task.Task
import mesosphere.marathon.state.{AbsolutePathId, Timestamp}

class HealthIndexTest extends UnitTest {
"HeathIndex" should {
"get items by Instance ID" in {
val indexSimple: HealthIndex = HealthSingleTrieMap()
val indexDual: HealthIndex = HealthDualTrieMap()

// Add
val entry1 = taskId1 -> health1
indexSimple += entry1
indexDual += entry1

// Get by Instance ID
indexSimple.get(instanceId1) should equal(Some(health1))
indexDual.get(instanceId1) should equal(Some(health1))

indexSimple.get(instanceId2) should equal(None)
indexDual.get(instanceId2) should equal(None)
}

"get items by Task ID" in {
val indexSimple: HealthIndex = HealthSingleTrieMap()
val indexDual: HealthIndex = HealthDualTrieMap()

// Add
val entry1 = taskId1 -> health1
indexSimple += entry1
indexDual += entry1

// Get by Task ID
indexSimple.getOrElse(taskId1, health2) should equal(health1)
indexDual.getOrElse(taskId1, health2) should equal(health1)

indexSimple.getOrElse(taskId3, health2) should equal(health2)
indexDual.getOrElse(taskId3, health2) should equal(health2)
}

"remove leftover items" in {
val indexSimple: HealthIndex = HealthSingleTrieMap()
val indexDual: HealthIndex = HealthDualTrieMap()

// Add
val entry1 = taskId1 -> health1
indexSimple += entry1
indexDual += entry1

// Remove
indexSimple.removeLeftOverHealthIfAny(instanceId2) should equal(None)
indexSimple.removeLeftOverHealthIfAny(instanceId1) should equal(Some(health1))
indexSimple.get(instanceId1) should equal(None)
indexSimple.removeLeftOverHealthIfAny(instanceId1) should equal(None)

indexDual.removeLeftOverHealthIfAny(instanceId2) should equal(None)
indexDual.removeLeftOverHealthIfAny(instanceId1) should equal(Some(health1))
indexDual.get(instanceId1) should equal(None)
indexDual.removeLeftOverHealthIfAny(instanceId1) should equal(None)
}

"return items" in {
val indexSimple: HealthIndex = HealthSingleTrieMap()
val indexDual: HealthIndex = HealthDualTrieMap()

// Add
val entry1 = taskId1 -> health1
indexSimple += entry1
indexDual += entry1

val entry2 = taskId2 -> health2
indexSimple += entry2
indexDual += entry2

// Get values
val expected = Set(health1, health2)
indexSimple.values.toSet should equal(expected)
indexDual.values.toSet should equal(expected)
}

"filter items" in {
val indexSimple: HealthIndex = HealthSingleTrieMap()
val indexDual: HealthIndex = HealthDualTrieMap()

// Add
val entry1 = taskId1 -> health1
indexSimple += entry1
indexDual += entry1

val entry2 = taskId2 -> health2
indexSimple += entry2
indexDual += entry2

val entry3 = taskId3 -> health3
indexSimple += entry3
indexDual += entry3

// Filter by task ID
indexSimple.filterKeys(taskId => taskId == taskId2) should equal(Map(entry2))

indexDual.filterKeys(taskId => taskId == taskId2) should equal(Map(entry2))
}

"retain items" in {
val indexSimple: HealthIndex = HealthSingleTrieMap()
val indexDual: HealthIndex = HealthDualTrieMap()

// Add
val entry1 = taskId1 -> health1
indexSimple += entry1
indexDual += entry1

val entry2 = taskId2 -> health2
indexSimple += entry2
indexDual += entry2

val entry3 = taskId3 -> health3
indexSimple += entry3
indexDual += entry3

// Retain
indexSimple.retain(taskId => taskId == taskId2)
indexSimple.get(instanceId1) should equal(None)
indexSimple.get(instanceId2) should equal(Some(health2))

indexDual.retain(taskId => taskId == taskId2)
indexDual.get(instanceId1) should equal(None)
indexDual.get(instanceId2) should equal(Some(health2))
}

}

}

object HealthIndexTest {
val appId: AbsolutePathId = AbsolutePathId("/test")
val version: Timestamp = Timestamp(1)
val now: Timestamp = Timestamp(2)

val instanceId1: Instance.Id = Instance.Id.forRunSpec(appId)
val taskId1: Task.Id = Task.Id(instanceId1)
val health1: Health = Health(instanceId1)

val instanceId2: Instance.Id = Instance.Id.forRunSpec(appId)
val taskId2: Task.Id = Task.Id(instanceId2)
val health2: Health = Health(instanceId2)

val instanceId3: Instance.Id = Instance.Id.forRunSpec(appId)
val taskId3: Task.Id = Task.Id(instanceId3)
val health3: Health = Health(instanceId3)
}

0 comments on commit 02137d3

Please sign in to comment.