Skip to content

Commit

Permalink
Squash of replaceactortesting branch without WorkQueue disabled
Browse files Browse the repository at this point in the history
* Simplify isHealthy() and remove unused methods
* Fix restart strategy computation at the end of deployment
* Possibly fix also other corner cases
* Fix TaskReplaceActor fetching instances state
* Add tools to make tests and fix 3 tests in the last testsuite
* Fix some more tests, but break 1 restart strategy test
* RestartStrategy needs to be reworked and seems fragile for now
* Fix 2 tests in ReplaceActorTest suite
* One of the test is slightly different; we don't have to wait for one
  instance to be dead when in overcapacity.
* Improve readability and fix (?) first readiness check
* Fix RestartStrategy
* Attempt #1 to fix CI
* Make some cleanup of unused methods
* Attempt to fix CI #2: Fix fourth test
* we can schedule 2 instances as there is no maximum over capacity.
* Fix behavior of restartStrategy: wait for kill before start a new instance
* Rename ignition to restart Strategy (more adapted to new behavior)
  • Loading branch information
Lqp1 authored and komuta committed Aug 31, 2020
1 parent 7eb13dc commit cf6362e
Show file tree
Hide file tree
Showing 5 changed files with 222 additions and 268 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,7 @@ class DeploymentManagerActor(
val s = sender()
healthCheckManager.statuses(appId) onComplete {
case Success(r) =>
//TODO(t.lange): Add ReadinessCheck
s ! HealthStatusResponse(r)
case Failure(e) => logger.error(s"Failed to send health status to TaskReplaceActor!", e)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,24 +54,13 @@ class TaskReplaceActor(
}

def isHealthy(instance: Instance, healths: Map[Instance.Id, Seq[Health]]): Boolean = {
// FIXME(t.lange): Or use state.healthy.getOrElse
val i_health = healths.find(_._1 == instance.instanceId)
if (instance.hasConfiguredHealthChecks && i_health.isDefined) { // Has healthchecks
if (_isHealthy(i_health.get._2) == true) {
logger.debug(s"Checking ${instance.instanceId}: has healcheck and is healthy")
return true
} else {
logger.debug(s"Checking ${instance.instanceId}: has healcheck and is NOT healthy")
return false
}
} else if (instance.hasConfiguredHealthChecks && !i_health.isDefined) {
logger.debug(s"Checking ${instance.instanceId}: has healhcheck and is UNKNOWN")
if (instance.hasConfiguredHealthChecks && i_health.isDefined)
return _isHealthy(i_health.get._2)
else if (instance.hasConfiguredHealthChecks && !i_health.isDefined)
return false
} else { // Don't have healthchecks
logger.debug(s"Checking ${instance.instanceId}: has no healthchecks")
if (instance.state.goal == Goal.Running) return true
else return false
}
else
return instance.isRunning
}

def _isHealthy(healths: Seq[Health]): Boolean = {
Expand Down Expand Up @@ -99,15 +88,15 @@ class TaskReplaceActor(

logger.info(s"Found health status for ${pathId}: new_running=>${state.newInstancesRunning} old_running=>${state.oldInstancesRunning} new_failing=>${state.newInstancesFailing} old_failing=>${state.oldInstancesFailing}")

val ignitionStrategy = computeRestartStrategy(runSpec, state)
val restartStrategy = computeRestartStrategy(runSpec, state)

logger.info(s"restartStrategy gives : to_kill=>${ignitionStrategy.nrToKillImmediately} to_start=>${ignitionStrategy.nrToStartImmediately}")
logger.info(s"restartStrategy gives : to_kill=>${restartStrategy.nrToKillImmediately} to_start=>${restartStrategy.nrToStartImmediately}")

// kill old instances to free some capacity
for (_ <- 0 until ignitionStrategy.nrToKillImmediately) killNextOldInstance(toKill)
for (_ <- 0 until restartStrategy.nrToKillImmediately) killNextOldInstance(toKill)

// start new instances, if possible
launchInstances(ignitionStrategy.nrToStartImmediately).pipeTo(self)
launchInstances(restartStrategy.nrToStartImmediately).pipeTo(self)

// Check if we reached the end of the deployment, meaning
// that old instances (failing or running) are removed,
Expand Down Expand Up @@ -215,12 +204,12 @@ object TaskReplaceActor extends StrictLogging {
require(runSpec.instances > 0, s"target instance number must be > 0 but is ${runSpec.instances}")
require(totalInstancesRunning >= 0, "current instances count must be >=0")

// Old and new instances that have the Goal.Running & are considered healthy
// Old and new instances that are running & are considered healthy
require(consideredHealthyInstancesCount >= 0, s"running instances count must be >=0 but is $consideredHealthyInstancesCount")

val minHealthy = (runSpec.instances * runSpec.upgradeStrategy.minimumHealthCapacity).ceil.toInt
var maxCapacity = (runSpec.instances * (1 + runSpec.upgradeStrategy.maximumOverCapacity)).toInt
var nrToKillImmediately = math.max(0, consideredHealthyInstancesCount - minHealthy)
var nrToKillImmediately = math.min(math.max(0, consideredHealthyInstancesCount - minHealthy), state.oldInstances)

if (minHealthy == maxCapacity && maxCapacity <= consideredHealthyInstancesCount) {
if (runSpec.isResident) {
Expand Down Expand Up @@ -252,12 +241,11 @@ object TaskReplaceActor extends StrictLogging {

assume(nrToKillImmediately >= 0, s"nrToKillImmediately must be >=0 but is $nrToKillImmediately")
assume(maxCapacity > 0, s"maxCapacity must be >0 but is $maxCapacity")
def canStartNewInstances: Boolean = minHealthy < maxCapacity || consideredHealthyInstancesCount - nrToKillImmediately < maxCapacity
assume(canStartNewInstances, "must be able to start new instances")

val leftCapacity = math.max(0, maxCapacity - totalInstancesRunning)
val instancesNotStartedYet = math.max(0, runSpec.instances - state.newInstances)
val nrToStartImmediately = math.min(instancesNotStartedYet, leftCapacity)
logger.info(s"For maxCapacity ${maxCapacity}, leftCapacity ${leftCapacity} and still not started ${instancesNotStartedYet}, will start ${nrToStartImmediately} now!")
RestartStrategy(nrToKillImmediately = nrToKillImmediately, nrToStartImmediately = nrToStartImmediately, maxCapacity = maxCapacity)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,8 +120,6 @@ case class Instance(
case pod: PodDefinition => pod.containers.exists(!_.healthCheck.isEmpty)
case _ => false // non-app/pod RunSpecs don't have health checks
}

def consideredHealthy: Boolean = !hasConfiguredHealthChecks || state.healthy.getOrElse(false)
}

object Instance {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,21 @@ class RestartStrategyTest extends UnitTest {
strategy.nrToKillImmediately shouldBe 0
}

"strategy for normal app with 5 instances, already deployed" in {
Given("A normal app")
val app = AppDefinition(
id = AbsolutePathId("/app"),
role = "*",
instances = 5,
upgradeStrategy = UpgradeStrategy(minimumHealthCapacity = 0.5, maximumOverCapacity = 0))

When("the ignition strategy is computed")
val strategy = computeRestartStrategy(app, new TransitionState(4, 1, 0, 0))

Then("we kill no tasks immediately")
strategy.nrToKillImmediately shouldBe 0
}

"maxCapacity strategy for normal app is not exceeded when a task can be killed immediately" in {
Given("A normal app")
val app = AppDefinition(
Expand Down
Loading

0 comments on commit cf6362e

Please sign in to comment.