From 02137d3b4fd9a56fc56966312a300982fb690300 Mon Sep 17 00:00:00 2001 From: jbkt Date: Fri, 10 Mar 2023 16:39:00 +0100 Subject: [PATCH] Fix fullscan index issue on tasks health (#64) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit As it was very slow with several thousands of tasks/instances, this uses two indexes instead. Co-authored-by: Jean-Baptiste Catté --- .../core/health/impl/HealthCheckActor.scala | 22 +-- .../core/health/impl/HealthIndex.scala | 103 ++++++++++++ .../health/impl/HealthCheckActorTest.scala | 2 +- .../core/health/impl/HealthIndexTest.scala | 157 ++++++++++++++++++ 4 files changed, 270 insertions(+), 14 deletions(-) create mode 100644 src/main/scala/mesosphere/marathon/core/health/impl/HealthIndex.scala create mode 100644 src/test/scala/mesosphere/marathon/core/health/impl/HealthIndexTest.scala diff --git a/src/main/scala/mesosphere/marathon/core/health/impl/HealthCheckActor.scala b/src/main/scala/mesosphere/marathon/core/health/impl/HealthCheckActor.scala index 693b5a6d0d..9547a94491 100644 --- a/src/main/scala/mesosphere/marathon/core/health/impl/HealthCheckActor.scala +++ b/src/main/scala/mesosphere/marathon/core/health/impl/HealthCheckActor.scala @@ -17,7 +17,6 @@ import mesosphere.marathon.core.task.termination.{KillReason, KillService} import mesosphere.marathon.core.task.tracker.InstanceTracker import mesosphere.marathon.state.{AppDefinition, Timestamp} -import scala.collection.concurrent.TrieMap import scala.concurrent.Future import scala.concurrent.duration._ import scala.util.{Failure, Success} @@ -37,7 +36,7 @@ private[health] class HealthCheckActor( implicit val mat = ActorMaterializer() import context.dispatcher - val healthByTaskId = TrieMap.empty[Task.Id, Health] + val healthIndex: HealthIndex = HealthDualTrieMap() var killingInFlight = Set.empty[Task.Id] private case class HealthCheckStreamStopped(thisInstance: this.type) @@ -86,7 +85,7 @@ private[health] class HealthCheckActor( 
logger.debug(s"Purging health status of inactive instances for app ${app.id} version ${app.version} and healthCheck ${healthCheck}") val activeTaskIds: Set[Task.Id] = getActiveTaskForInstances(instances) - healthByTaskId.retain((taskId, health) => activeTaskIds(taskId)) + healthIndex.retain(taskId => activeTaskIds(taskId)) // FIXME: I discovered this is unsafe since killingInFlight might be used in 2 concurrent threads (see preStart method above) killingInFlight &= activeTaskIds logger.info(s"[anti-snowball] app ${app.id} version ${app.version} currently ${killingInFlight.size} instances killingInFlight") @@ -171,7 +170,7 @@ private[health] class HealthCheckActor( /** Check if HealthCheckActor manages enough instances to decide whether it can kill one or if it needs info from AppHealthCheckActor */ def needInfoFromAppHealthCheckActor(activeTaskIds: Set[Task.Id]): Boolean = { - val managedInstances = healthByTaskId.filterKeys(activeTaskIds) + val managedInstances = healthIndex.filterKeys(activeTaskIds) val enoughInstancesRunning = activeTaskIds.size >= app.instances * app.upgradeStrategy.minimumHealthCapacity val enoughManagedInstances = managedInstances.size >= 1 + (activeTaskIds.size * app.upgradeStrategy.minimumHealthCapacity).toInt enoughInstancesRunning && !enoughManagedInstances @@ -179,7 +178,7 @@ private[health] class HealthCheckActor( /** Check if enough active and ready instances will remain if we kill 1 unhealthy instance */ def checkEnoughInstancesRunning(unhealthyInstance: Instance, activeTaskIds: Set[Task.Id]): Boolean = { - val healthyInstances = healthByTaskId.filterKeys(activeTaskIds) + val healthyInstances = healthIndex.filterKeys(activeTaskIds) .filterKeys(taskId => !killingInFlight(taskId)) logger.info(s"[anti-snowball] app ${app.id} version ${app.version} currently ${killingInFlight.size} instances killingInFlight") @@ -204,7 +203,7 @@ private[health] class HealthCheckActor( def handleHealthResult(result: HealthResult): Unit = { val instanceId = 
result.instanceId - val health = healthByTaskId.getOrElse(result.taskId, Health(instanceId)) + val health = healthIndex.getOrElse(result.taskId, Health(instanceId)) val updatedHealth = result match { case Healthy(_, _, _, _, _) => @@ -252,11 +251,9 @@ private[health] class HealthCheckActor( // dead task (in addition to the living one), thus sometimes leading // Marathon to report the task / instance / app as unhealthy while // everything is running correctly. - val healthOfInstanceId = healthByTaskId.find(_._1.instanceId == instanceId) - if (healthOfInstanceId.isDefined) - healthByTaskId.remove(healthOfInstanceId.get._1) + healthIndex.removeLeftOverHealthIfAny(instanceId) - healthByTaskId += (result.taskId -> instanceHealth.newHealth) + healthIndex += (result.taskId -> instanceHealth.newHealth) appHealthCheckActor ! HealthCheckStatusChanged(ApplicationKey(app.id, app.version), healthCheck, newHealth) if (health.alive != newHealth.alive && result.publishEvent) { @@ -277,11 +274,10 @@ private[health] class HealthCheckActor( def receive: Receive = { case GetInstanceHealth(instanceId) => - sender() ! healthByTaskId.find(_._1.instanceId == instanceId) - .map(_._2).getOrElse(Health(instanceId)) + sender() ! healthIndex.get(instanceId).getOrElse(Health(instanceId)) case GetAppHealth => - sender() ! AppHealth(healthByTaskId.values.to[Seq]) + sender() ! 
AppHealth(healthIndex.values.to[Seq]) case result: HealthResult if result.version == app.version => handleHealthResult(result) diff --git a/src/main/scala/mesosphere/marathon/core/health/impl/HealthIndex.scala b/src/main/scala/mesosphere/marathon/core/health/impl/HealthIndex.scala new file mode 100644 index 0000000000..4babc45d74 --- /dev/null +++ b/src/main/scala/mesosphere/marathon/core/health/impl/HealthIndex.scala @@ -0,0 +1,103 @@
+package mesosphere.marathon
+package core.health.impl
+
+import mesosphere.marathon.core.health.Health
+import mesosphere.marathon.core.instance.Instance
+import mesosphere.marathon.core.task.Task
+
+import scala.collection.concurrent.TrieMap
+
+trait HealthIndex {
+  /** Removes the health stored for the task of instance `key` and returns it, if there was one. */
+  def removeLeftOverHealthIfAny(key: Instance.Id): Option[Health]
+  /** Stores `kv._2` as the health of task `kv._1`. */
+  def +=(kv: (Task.Id, Health)): this.type
+  def values: Iterable[Health] // every health currently stored
+  /** Entries whose task id satisfies `p`. */
+  def filterKeys(p: Task.Id => Boolean): collection.Map[Task.Id, Health]
+  /** Health stored for the task of instance `key`, if any. */
+  def get(key: Instance.Id): Option[Health]
+  /** Health stored for task `key`, or `default` when there is none. */
+  def getOrElse(key: Task.Id, default: => Health): Health
+  /** Keeps only the entries whose task id satisfies `p`. */
+  def retain(p: Task.Id => Boolean): this.type
+}
+
+/**
+ * Old implementation, search by Instance.Id is slow, kept for reference and for tests.
+ */
+case class HealthSingleTrieMap() extends HealthIndex {
+  private val healthByTaskId: TrieMap[Task.Id, Health] = TrieMap.empty[Task.Id, Health]
+
+  /**
+   * Removes and returns the health of the first task found for instance `key`; O(n) key scan.
+   */
+  def removeLeftOverHealthIfAny(key: Instance.Id): Option[Health] =
+    healthByTaskId.keysIterator
+      .find(_.instanceId == key)
+      .flatMap(taskId => healthByTaskId.remove(taskId))
+
+  /** Slow, full scan algorithm O(n) */
+  def get(key: Instance.Id): Option[Health] = healthByTaskId.find(_._1.instanceId == key).map(_._2)
+
+  def getOrElse(key: Task.Id, default: => Health): Health = healthByTaskId.getOrElse(key, default)
+
+  /** Entries whose task id satisfies `p`. */
+  def filterKeys(p: Task.Id => Boolean): collection.Map[Task.Id, Health] =
+    healthByTaskId.filterKeys(p)
+
+  /** Slow: visits every entry and keeps only those whose task id satisfies `p`. */
+  def retain(p: Task.Id => Boolean): this.type = {
+    healthByTaskId.retain((taskId, _) => p(taskId))
+    this
+  }
+
+  def +=(kv: (Task.Id, Health)): this.type = {
+    healthByTaskId += kv
+    this
+  }
+
+  def values: Iterable[Health] = healthByTaskId.values
+}
+
+/**
+ * New implementation, search by Instance.Id is fast but we need two indexes.
+ */
+case class HealthDualTrieMap() extends HealthIndex {
+  private val healthByTaskId: TrieMap[Task.Id, Health] = TrieMap.empty[Task.Id, Health]
+  private val taskIdByInstanceId: TrieMap[Instance.Id, Task.Id] = TrieMap.empty[Instance.Id, Task.Id]
+
+  def removeLeftOverHealthIfAny(key: Instance.Id): Option[Health] =
+    taskIdByInstanceId.remove(key).flatMap(oldTaskId => healthByTaskId.remove(oldTaskId))
+
+  /** Fast O(log(n)) */
+  def get(key: Instance.Id): Option[Health] = taskIdByInstanceId.get(key).flatMap(t => healthByTaskId.get(t))
+
+  def getOrElse(key: Task.Id, default: => Health): Health = healthByTaskId.getOrElse(key, default)
+
+  /** Entries whose task id satisfies `p`. */
+  def filterKeys(p: Task.Id => Boolean): collection.Map[Task.Id, Health] =
+    healthByTaskId.filterKeys(p)
+
+  /** Slow: prunes the task index, then the instance index only if something was dropped. */
+  def retain(p: Task.Id => Boolean): this.type = {
+    val oldSize = healthByTaskId.size
+    healthByTaskId.retain((taskId, _) => p(taskId))
+    if (healthByTaskId.size < oldSize) {
+      val retainedInstanceIds = healthByTaskId.keySet.map(_.instanceId)
+      taskIdByInstanceId.retain((instanceId, _) => retainedInstanceIds.contains(instanceId))
+    }
+    this
+  }
+
+  /** Adds `kv`; evicts the health of a different task this instance was indexed under before. */
+  def +=(kv: (Task.Id, Health)): this.type = {
+    healthByTaskId += kv
+    val taskId = kv._1
+    taskIdByInstanceId.put(taskId.instanceId, taskId)
+      .filter(_ != taskId).foreach(stale => healthByTaskId.remove(stale))
+    this
+  }
+
+  def values: Iterable[Health] = healthByTaskId.values
+}
diff --git a/src/test/scala/mesosphere/marathon/core/health/impl/HealthCheckActorTest.scala b/src/test/scala/mesosphere/marathon/core/health/impl/HealthCheckActorTest.scala index 5ac290a603..f3ea264ed6 100644 --- a/src/test/scala/mesosphere/marathon/core/health/impl/HealthCheckActorTest.scala +++ b/src/test/scala/mesosphere/marathon/core/health/impl/HealthCheckActorTest.scala @@ -63,7 +63,7 @@ class HealthCheckActorTest extends AkkaUnitTest { new HealthCheckActor(app, appHealthCheckActor.ref, killService, healthCheck, instanceTracker, system.eventStream, healthCheckWorkerHub, healthCheckShieldApi, antiSnowballApi) {
instances.map(instance => { val taskId = instance.appTask.taskId - healthByTaskId += (taskId -> Health(instance.instanceId) + healthIndex += (taskId -> Health(instance.instanceId) .update(Healthy(instance.instanceId, taskId, instance.runSpecVersion))) }) } diff --git a/src/test/scala/mesosphere/marathon/core/health/impl/HealthIndexTest.scala b/src/test/scala/mesosphere/marathon/core/health/impl/HealthIndexTest.scala new file mode 100644 index 0000000000..4193c9c7c6 --- /dev/null +++ b/src/test/scala/mesosphere/marathon/core/health/impl/HealthIndexTest.scala @@ -0,0 +1,157 @@
+package mesosphere.marathon
+package core.health.impl
+
+import mesosphere.UnitTest
+import mesosphere.marathon.core.health.Health
+import mesosphere.marathon.core.health.impl.HealthIndexTest._
+import mesosphere.marathon.core.instance.Instance
+import mesosphere.marathon.core.task.Task
+import mesosphere.marathon.state.{AbsolutePathId, Timestamp}
+
+class HealthIndexTest extends UnitTest {
+  "HealthIndex" should {
+    "get items by Instance ID" in {
+      val indexSimple: HealthIndex = HealthSingleTrieMap()
+      val indexDual: HealthIndex = HealthDualTrieMap()
+
+      // Add the same entry to both implementations
+      val entry1 = taskId1 -> health1
+      indexSimple += entry1
+      indexDual += entry1
+
+      // Get by Instance ID: known instance first, then an unknown one
+      indexSimple.get(instanceId1) should equal(Some(health1))
+      indexDual.get(instanceId1) should equal(Some(health1))
+
+      indexSimple.get(instanceId2) should equal(None)
+      indexDual.get(instanceId2) should equal(None)
+    }
+
+    "get items by Task ID" in {
+      val indexSimple: HealthIndex = HealthSingleTrieMap()
+      val indexDual: HealthIndex = HealthDualTrieMap()
+
+      // Add the same entry to both implementations
+      val entry1 = taskId1 -> health1
+      indexSimple += entry1
+      indexDual += entry1
+
+      // Get by Task ID: stored task first, then a task that falls back to the default
+      indexSimple.getOrElse(taskId1, health2) should equal(health1)
+      indexDual.getOrElse(taskId1, health2) should equal(health1)
+
+      indexSimple.getOrElse(taskId3, health2) should equal(health2)
+      indexDual.getOrElse(taskId3, health2) should equal(health2)
+    }
+
+    "remove leftover items" in {
+      val indexSimple: HealthIndex = HealthSingleTrieMap()
+      val indexDual: HealthIndex = HealthDualTrieMap()
+
+      // Add the same entry to both implementations
+      val entry1 = taskId1 -> health1
+      indexSimple += entry1
+      indexDual += entry1
+
+      // Remove: unknown instance, known instance, then the same instance a second time
+      indexSimple.removeLeftOverHealthIfAny(instanceId2) should equal(None)
+      indexSimple.removeLeftOverHealthIfAny(instanceId1) should equal(Some(health1))
+      indexSimple.get(instanceId1) should equal(None)
+      indexSimple.removeLeftOverHealthIfAny(instanceId1) should equal(None)
+
+      indexDual.removeLeftOverHealthIfAny(instanceId2) should equal(None)
+      indexDual.removeLeftOverHealthIfAny(instanceId1) should equal(Some(health1))
+      indexDual.get(instanceId1) should equal(None)
+      indexDual.removeLeftOverHealthIfAny(instanceId1) should equal(None)
+    }
+
+    "return items" in {
+      val indexSimple: HealthIndex = HealthSingleTrieMap()
+      val indexDual: HealthIndex = HealthDualTrieMap()
+
+      // Add two entries to both implementations
+      val entry1 = taskId1 -> health1
+      indexSimple += entry1
+      indexDual += entry1
+
+      val entry2 = taskId2 -> health2
+      indexSimple += entry2
+      indexDual += entry2
+
+      // Get values
+      val expected = Set(health1, health2)
+      indexSimple.values.toSet should equal(expected)
+      indexDual.values.toSet should equal(expected)
+    }
+
+    "filter items" in {
+      val indexSimple: HealthIndex = HealthSingleTrieMap()
+      val indexDual: HealthIndex = HealthDualTrieMap()
+
+      // Add three entries to both implementations
+      val entry1 = taskId1 -> health1
+      indexSimple += entry1
+      indexDual += entry1
+
+      val entry2 = taskId2 -> health2
+      indexSimple += entry2
+      indexDual += entry2
+
+      val entry3 = taskId3 -> health3
+      indexSimple += entry3
+      indexDual += entry3
+
+      // Filter by task ID
+      indexSimple.filterKeys(taskId => taskId == taskId2) should equal(Map(entry2))
+
+      indexDual.filterKeys(taskId => taskId == taskId2) should equal(Map(entry2))
+    }
+
+    "retain items" in {
+      val indexSimple: HealthIndex = HealthSingleTrieMap()
+      val indexDual: HealthIndex = HealthDualTrieMap()
+
+      // Add three entries to both implementations
+      val entry1 = taskId1 -> health1
+      indexSimple += entry1
+      indexDual += entry1
+
+      val entry2 = taskId2 -> health2
+      indexSimple += entry2
+      indexDual += entry2
+
+      val entry3 = taskId3 -> health3
+      indexSimple += entry3
+      indexDual += entry3
+
+      // Retain only the second task, then look both instances up again
+      indexSimple.retain(taskId => taskId == taskId2)
+      indexSimple.get(instanceId1) should equal(None)
+      indexSimple.get(instanceId2) should equal(Some(health2))
+
+      indexDual.retain(taskId => taskId == taskId2)
+      indexDual.get(instanceId1) should equal(None)
+      indexDual.get(instanceId2) should equal(Some(health2))
+    }
+
+  }
+
+}
+
+object HealthIndexTest {
+  val appId: AbsolutePathId = AbsolutePathId("/test")
+  val version: Timestamp = Timestamp(1)
+  val now: Timestamp = Timestamp(2)
+
+  val instanceId1: Instance.Id = Instance.Id.forRunSpec(appId)
+  val taskId1: Task.Id = Task.Id(instanceId1)
+  val health1: Health = Health(instanceId1)
+
+  val instanceId2: Instance.Id = Instance.Id.forRunSpec(appId)
+  val taskId2: Task.Id = Task.Id(instanceId2)
+  val health2: Health = Health(instanceId2)
+
+  val instanceId3: Instance.Id = Instance.Id.forRunSpec(appId)
+  val taskId3: Task.Id = Task.Id(instanceId3)
+  val health3: Health = Health(instanceId3)
+}