From 8a92920b0ea77e5c161df980fdbb775b4cd911fc Mon Sep 17 00:00:00 2001
From: Karsten Jeschkies
Date: Mon, 8 Apr 2019 12:56:09 +0200
Subject: [PATCH] Fix restarting unhealthy instances. (#6882)

---
 .../core/health/impl/HealthCheckActor.scala   | 13 ++--
 .../HealthCheckIntegrationTest.scala          | 85 +++++++++++++++++++
 2 files changed, 91 insertions(+), 7 deletions(-)
 create mode 100644 src/test/scala/mesosphere/marathon/integration/HealthCheckIntegrationTest.scala

diff --git a/src/main/scala/mesosphere/marathon/core/health/impl/HealthCheckActor.scala b/src/main/scala/mesosphere/marathon/core/health/impl/HealthCheckActor.scala
index 8be9ba4de89..eaea1ca1ee5 100644
--- a/src/main/scala/mesosphere/marathon/core/health/impl/HealthCheckActor.scala
+++ b/src/main/scala/mesosphere/marathon/core/health/impl/HealthCheckActor.scala
@@ -28,9 +28,10 @@ private[health] class HealthCheckActor(
     healthCheck: HealthCheck,
     instanceTracker: InstanceTracker,
     eventBus: EventStream,
-    healthCheckHub: Sink[(AppDefinition, Instance, MarathonHealthCheck, ActorRef), NotUsed])(implicit mat: ActorMaterializer)
+    healthCheckHub: Sink[(AppDefinition, Instance, MarathonHealthCheck, ActorRef), NotUsed])
   extends Actor with StrictLogging {
 
+  implicit val mat: ActorMaterializer = ActorMaterializer()
   import context.dispatcher
 
   var healthByInstanceId = TrieMap.empty[Instance.Id, Health]
@@ -74,11 +75,9 @@ private[health] class HealthCheckActor(
 
   def purgeStatusOfDoneInstances(instances: Seq[Instance]): Unit = {
     logger.debug(s"Purging health status of inactive instances for app ${app.id} version ${app.version} and healthCheck ${healthCheck}")
-    val activeInstanceIds: Set[Instance.Id] = instances.withFilter(_.isLaunched).map(_.instanceId)(collection.breakOut)
-    // The Map built with filterKeys wraps the original map and contains a reference to activeInstanceIds.
-    // Therefore we materialize it into a new map.
-    activeInstanceIds.foreach { activeId =>
-      healthByInstanceId.remove(activeId)
+    val inactiveInstanceIds: Set[Instance.Id] = instances.filterNot(_.isActive).map(_.instanceId)(collection.breakOut)
+    inactiveInstanceIds.foreach { inactiveId =>
+      healthByInstanceId.remove(inactiveId)
     }
 
     val checksToPurge = instances.withFilter(!_.isActive).map(instance => {
@@ -207,7 +206,7 @@ object HealthCheckActor {
     healthCheck: HealthCheck,
     instanceTracker: InstanceTracker,
     eventBus: EventStream,
-    healthCheckHub: Sink[(AppDefinition, Instance, MarathonHealthCheck, ActorRef), NotUsed])(implicit mat: ActorMaterializer): Props = {
+    healthCheckHub: Sink[(AppDefinition, Instance, MarathonHealthCheck, ActorRef), NotUsed]): Props = {
 
     Props(new HealthCheckActor(
       app,
diff --git a/src/test/scala/mesosphere/marathon/integration/HealthCheckIntegrationTest.scala b/src/test/scala/mesosphere/marathon/integration/HealthCheckIntegrationTest.scala
new file mode 100644
index 00000000000..197144eb531
--- /dev/null
+++ b/src/test/scala/mesosphere/marathon/integration/HealthCheckIntegrationTest.scala
@@ -0,0 +1,85 @@
+package mesosphere.marathon
+package integration
+
+import java.util.UUID
+
+import mesosphere.AkkaIntegrationTest
+import mesosphere.marathon.core.task.Task
+import mesosphere.marathon.integration.setup.EmbeddedMarathonTest
+import mesosphere.marathon.raml.{AppHealthCheck, AppHealthCheckProtocol}
+import mesosphere.marathon.state.PathId
+
+import scala.concurrent.duration._
+
+/**
+  * Integration tests checking that an instance whose health check starts failing is killed
+  * and, in the first scenario, replaced by a new task.
+  */
+class HealthCheckIntegrationTest extends AkkaIntegrationTest with EmbeddedMarathonTest {
+
+  /** Derives an app id from `testBasePath` by appending `app-<suffix>`; a random UUID stands in when no suffix is given. */
+  def appId(suffix: Option[String] = None): PathId = testBasePath / s"app-${suffix.getOrElse(UUID.randomUUID)}"
+
+  "Health checks" should {
+    "kill instance with failing Marathon health checks" in {
+      Given("a deployed app with health checks")
+      val id = appId(Some("replace-marathon-http-health-check"))
+      val app = appProxy(id, "v1", instances = 1, healthCheck = None).
+        copy(healthChecks = Set(ramlHealthCheck(AppHealthCheckProtocol.Http)))
+      val check = registerAppProxyHealthCheck(id, "v1", state = true)
+      val result = marathon.createAppV2(app)
+      result should be(Created)
+      waitForDeployment(result)
+
+      When("the app becomes unhealthy")
+      val oldTaskId = marathon.tasks(id).value.head.id
+      check.afterDelay(1.seconds, false)
+
+      Then("the unhealthy instance is killed")
+      waitForEventWith("unhealthy_instance_kill_event", { event => event.info("taskId") == oldTaskId })
+
+      And("a replacement is started")
+      check.afterDelay(1.seconds, true)
+      eventually {
+        val currentTasks = marathon.tasks(id).value
+        currentTasks should have size (1)
+        currentTasks.map(_.id) should not contain (oldTaskId)
+      }
+    }
+
+    "kill instance with failing Mesos health checks" in {
+      Given("a deployed app with health checks")
+      val id = appId(Some("replace-mesos-http-health-check"))
+      // NOTE(review): this passes the same AppHealthCheckProtocol.Http as the Marathon scenario above;
+      // confirm whether a Mesos-executed protocol (e.g. MesosHttp) was intended for this case.
+      val app = appProxy(id, "v1", instances = 1, healthCheck = None).
+        copy(healthChecks = Set(ramlHealthCheck(AppHealthCheckProtocol.Http)))
+      val check = registerAppProxyHealthCheck(id, "v1", state = true)
+      val result = marathon.createAppV2(app)
+      result should be(Created)
+      waitForDeployment(result)
+
+      When("the app becomes unhealthy")
+      val oldTaskId = marathon.tasks(id).value.head.id
+      val oldInstanceId = Task.Id(oldTaskId).instanceId.idString
+      check.afterDelay(1.seconds, false)
+
+      Then("the unhealthy instance is killed")
+      waitForEventWith("instance_changed_event", { event => event.info("condition") == "Killed" && event.info("instanceId") == oldInstanceId })
+
+      And("an unhealthy instance kill event is published")
+      waitForEvent("unhealthy_instance_kill_event")
+    }
+  }
+
+  /** Health check against `/health` on port index 0 with short timings, using the given protocol. */
+  private def ramlHealthCheck(protocol: AppHealthCheckProtocol) = AppHealthCheck(
+    path = Some("/health"),
+    protocol = protocol,
+    gracePeriodSeconds = 3,
+    intervalSeconds = 1,
+    maxConsecutiveFailures = 3,
+    portIndex = Some(0),
+    delaySeconds = 3
+  )
+}