Skip to content

Commit

Permalink
Fixes d2iq-archive#3059 - Reset task's delay only after viability thr…
Browse files Browse the repository at this point in the history
…eshold has been reached (d2iq-archive#4915)

* Reset task's delay only after viability threshold has been reached

* Rename config parameter to minimum_viable_task_execution_duration, added documentation and fixed the test setup.
  • Loading branch information
aquamatthias authored and pierrecdn committed May 4, 2017
1 parent dbe5d87 commit 65004a2
Show file tree
Hide file tree
Showing 8 changed files with 72 additions and 35 deletions.
2 changes: 2 additions & 0 deletions docs/docs/command-line-flags.md
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,8 @@ The core functionality flags can be also set by environment variable `MARATHON_O
* `--mesos_heartbeat_failure_threshold` (Optional. Default: 5):
after missing this number of expected communications from the mesos master, infer that marathon has become
disconnected from the master.
* <span class="label label-default">v1.5.0</span>`--minimum_viable_task_execution_duration` (Optional. Default: 60 seconds):
Delay (in ms) after which a task is considered viable. If the task starts up correctly, but fails during this timeout, the application is backed off.

## Tuning Flags for Offer Matching/Launching Tasks

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,15 @@ package mesosphere.marathon.core.launchqueue

import org.rogach.scallop.ScallopConf

import scala.concurrent.duration._

trait LaunchQueueConfig extends ScallopConf {

lazy val minimumViableTaskExecutionDurationMillis = opt[Long](
"minimum_viable_task_execution_duration",
descr = "Delay (in ms) after which a task is considered viable.",
default = Some(60000))

lazy val launchQueueRequestTimeout = opt[Int](
"launch_queue_request_timeout",
descr = "INTERNAL TUNING PARAMETER: Timeout (in ms) for requests to the launch queue actor.",
Expand All @@ -15,4 +22,6 @@ trait LaunchQueueConfig extends ScallopConf {
descr = "INTERNAL TUNING PARAMETER: Timeout (in ms) for matched task opereations to be accepted or rejected.",
hidden = true,
default = Some(10000))

lazy val minimumViableTaskExecutionDuration: FiniteDuration = minimumViableTaskExecutionDurationMillis().millis
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ class LaunchQueueModule(
leadershipModule.startWhenLeader(props, "launchQueue")
}

val rateLimiter: RateLimiter = new RateLimiter(clock)
val rateLimiter: RateLimiter = new RateLimiter(config, clock)
private[this] val rateLimiterActor: ActorRef = {
val props = RateLimiterActor.props(
rateLimiter, launchQueueActorRef)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@ package mesosphere.marathon.core.launchqueue.impl
import java.util.concurrent.TimeUnit

import mesosphere.marathon.core.base.Clock
import mesosphere.marathon.state.{ PathId, RunSpec, Timestamp }
import mesosphere.marathon.core.launchqueue.LaunchQueueConfig
import mesosphere.marathon.state.{ RunSpec, PathId, Timestamp }
import org.slf4j.LoggerFactory

import scala.concurrent.duration._
Expand All @@ -13,20 +14,25 @@ import scala.concurrent.duration._
*
* We do not keep the delays for every version because that would include scaling changes or manual restarts.
*/
private[launchqueue] class RateLimiter(clock: Clock) {
private[launchqueue] class RateLimiter(config: LaunchQueueConfig, clock: Clock) {
import RateLimiter._

/** The task launch delays per run spec and their last config change. */
private[this] var taskLaunchDelays = Map[(PathId, Timestamp), Delay]()

def cleanUpOverdueDelays(): Unit = {
/**
* Reset delay for tasks that have reached the viability
* threshold. The deadline indicates when the task has been
* launched for the last time.
*/
def resetDelaysOfViableTasks(): Unit = {
taskLaunchDelays = taskLaunchDelays.filter {
case (_, delay) => delay.deadline > clock.now()
case (_, delay) =>
clock.now() - config.minimumViableTaskExecutionDuration < delay.deadline
}
}

def getDelay(spec: RunSpec): Timestamp =
// TODO (pods): RunSpec has no versionInfo. Need this?
def getDeadline(spec: RunSpec): Timestamp =
taskLaunchDelays.get(spec.id -> spec.versionInfo.lastConfigChangeVersion).map(_.deadline) getOrElse clock.now()

def addDelay(spec: RunSpec): Timestamp = {
Expand Down Expand Up @@ -72,7 +78,7 @@ private object RateLimiter {
private val log = LoggerFactory.getLogger(getClass.getName)

private object Delay {
def apply(clock: Clock, runSpec: RunSpec): Delay = Delay(clock, runSpec.backoffStrategy.backoff)
def apply(clock: Clock, runSpec: RunSpec): Delay = Delay(clock.now() + runSpec.backoffStrategy.backoff, runSpec.backoffStrategy.backoff)
def apply(clock: Clock, delay: FiniteDuration): Delay = Delay(clock.now() + delay, delay)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import akka.actor.{ Actor, ActorLogging, ActorRef, Cancellable, Props }
import akka.event.LoggingReceive
import mesosphere.marathon.core.launchqueue.impl.RateLimiterActor.{
AddDelay,
CleanupOverdueDelays,
ResetViableTasksDelays,
DecreaseDelay,
DelayUpdate,
GetDelay,
Expand Down Expand Up @@ -32,7 +32,7 @@ private[launchqueue] object RateLimiterActor {
private[impl] case class AddDelay(runSpec: RunSpec)
private[impl] case class DecreaseDelay(runSpec: RunSpec)

private case object CleanupOverdueDelays
private case object ResetViableTasksDelays
}

private class RateLimiterActor private (
Expand All @@ -42,7 +42,7 @@ private class RateLimiterActor private (

override def preStart(): Unit = {
import context.dispatcher
cleanup = context.system.scheduler.schedule(10.seconds, 10.seconds, self, CleanupOverdueDelays)
cleanup = context.system.scheduler.schedule(10.seconds, 10.seconds, self, ResetViableTasksDelays)
log.info("started RateLimiterActor")
}

Expand All @@ -57,27 +57,29 @@ private class RateLimiterActor private (
).reduceLeft(_.orElse[Any, Unit](_))
}

/**
* If an app gets removed or updated, the delay should be reset. If
* an app is considered viable, the delay should be reset too. We
* check and reset viable tasks' delays periodically.
*/
private[this] def receiveCleanup: Receive = {
case CleanupOverdueDelays =>
// If a run spec gets removed or updated, the delay should be reset.
// Still, we can remove overdue delays before that and also make leaks less likely
// by calling this periodically.
rateLimiter.cleanUpOverdueDelays()
case ResetViableTasksDelays =>
rateLimiter.resetDelaysOfViableTasks()
}

private[this] def receiveDelayOps: Receive = {
case GetDelay(runSpec) =>
sender() ! DelayUpdate(runSpec, rateLimiter.getDelay(runSpec))
sender() ! DelayUpdate(runSpec, rateLimiter.getDeadline(runSpec))

case AddDelay(runSpec) =>
rateLimiter.addDelay(runSpec)
launchQueueRef ! DelayUpdate(runSpec, rateLimiter.getDelay(runSpec))
launchQueueRef ! DelayUpdate(runSpec, rateLimiter.getDeadline(runSpec))

case DecreaseDelay(runSpec) => // ignore for now

case ResetDelay(runSpec) =>
rateLimiter.resetDelay(runSpec)
launchQueueRef ! DelayUpdate(runSpec, rateLimiter.getDelay(runSpec))
launchQueueRef ! DelayUpdate(runSpec, rateLimiter.getDeadline(runSpec))
sender() ! ResetDelayResponse
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import akka.pattern.ask
import akka.testkit.TestProbe
import akka.util.Timeout
import mesosphere.marathon.core.base.ConstantClock
import mesosphere.marathon.test.MarathonSpec
import mesosphere.marathon.core.launchqueue.LaunchQueueConfig
import mesosphere.marathon.core.task.tracker.InstanceTracker
import mesosphere.marathon.state.{ AppDefinition, BackoffStrategy, PathId }
import mesosphere.marathon.test.MarathonSpec
Expand Down Expand Up @@ -48,6 +50,7 @@ class RateLimiterActorTest extends MarathonSpec {

private[this] implicit val timeout: Timeout = 3.seconds
private[this] implicit var actorSystem: ActorSystem = _
private[this] var launchQueueConfig: LaunchQueueConfig = _
private[this] var clock: ConstantClock = _
private[this] var rateLimiter: RateLimiter = _
private[this] var taskTracker: InstanceTracker = _
Expand All @@ -56,8 +59,11 @@ class RateLimiterActorTest extends MarathonSpec {

before {
actorSystem = ActorSystem()
launchQueueConfig = new LaunchQueueConfig {
verify()
}
clock = ConstantClock()
rateLimiter = Mockito.spy(new RateLimiter(clock))
rateLimiter = Mockito.spy(new RateLimiter(launchQueueConfig, clock))
taskTracker = mock[InstanceTracker]
updateReceiver = TestProbe()
val props = RateLimiterActor.props(rateLimiter, updateReceiver.ref)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package mesosphere.marathon.core.launchqueue.impl

import mesosphere.marathon.core.base.ConstantClock
import mesosphere.marathon.core.launchqueue.LaunchQueueConfig
import mesosphere.marathon.state.PathId._
import mesosphere.marathon.state.{ AppDefinition, BackoffStrategy, Timestamp }
import mesosphere.marathon.test.{ MarathonActorSupport, MarathonSpec }
Expand All @@ -9,51 +10,61 @@ import org.scalatest.Matchers
import scala.concurrent.duration._

class RateLimiterTest extends MarathonActorSupport with MarathonSpec with Matchers {

val clock = ConstantClock(Timestamp.now())

private[this] val launchQueueConfig: LaunchQueueConfig = new LaunchQueueConfig {
verify()
}

test("addDelay") {
val limiter = new RateLimiter(clock)
val limiter = new RateLimiter(launchQueueConfig, clock)
val app = AppDefinition(id = "test".toPath, backoffStrategy = BackoffStrategy(backoff = 10.seconds))

limiter.addDelay(app)

limiter.getDelay(app) should be(clock.now() + 10.seconds)
limiter.getDeadline(app) should be(clock.now() + 10.seconds)
}

test("addDelay for existing delay") {
val limiter = new RateLimiter(clock)
val limiter = new RateLimiter(launchQueueConfig, clock)
val app = AppDefinition(id = "test".toPath, backoffStrategy = BackoffStrategy(backoff = 10.seconds, factor = 2.0))

limiter.addDelay(app) // linter:ignore:IdenticalStatements
limiter.addDelay(app)

limiter.getDelay(app) should be(clock.now() + 20.seconds)
limiter.getDeadline(app) should be(clock.now() + 20.seconds)
}

test("cleanupOverdueDelays") {
val limiter = new RateLimiter(clock)
val overdue = AppDefinition(id = "overdue".toPath, backoffStrategy = BackoffStrategy(backoff = 10.seconds))
limiter.addDelay(overdue)
val stillWaiting = AppDefinition(id = "test".toPath, backoffStrategy = BackoffStrategy(backoff = 20.seconds))
test("resetDelaysOfViableTasks") {
val time_origin = clock.now()
val limiter = new RateLimiter(launchQueueConfig, clock)
val threshold = launchQueueConfig.minimumViableTaskExecutionDuration
val viable = AppDefinition(id = "viable".toPath, backoffStrategy = BackoffStrategy(backoff = 10.seconds))
limiter.addDelay(viable)
val notYetViable = AppDefinition(id = "notYetViable".toPath, backoffStrategy = BackoffStrategy(backoff = 20.seconds))
limiter.addDelay(notYetViable)
val stillWaiting = AppDefinition(id = "test".toPath, backoffStrategy = BackoffStrategy(backoff = threshold + 20.seconds))
limiter.addDelay(stillWaiting)

clock += 11.seconds
clock += threshold + 11.seconds

limiter.cleanUpOverdueDelays()
limiter.resetDelaysOfViableTasks()

limiter.getDelay(overdue) should be(clock.now())
limiter.getDelay(stillWaiting) should be(clock.now() + 9.seconds)
limiter.getDeadline(viable) should be(clock.now())
limiter.getDeadline(notYetViable) should be(time_origin + 20.seconds)
limiter.getDeadline(stillWaiting) should be(time_origin + threshold + 20.seconds)
}

test("resetDelay") {
val limiter = new RateLimiter(clock)
val limiter = new RateLimiter(launchQueueConfig, clock)
val app = AppDefinition(id = "test".toPath, backoffStrategy = BackoffStrategy(backoff = 10.seconds))

limiter.addDelay(app)

limiter.resetDelay(app)

limiter.getDelay(app) should be(clock.now())
limiter.getDeadline(app) should be(clock.now())
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ case class LocalMarathon(
"hostname" -> "localhost",
"logging_level" -> "debug",
"offer_matching_timeout" -> 10.seconds.toMillis.toString // see https://github.com/mesosphere/marathon/issues/4920
"minimum_viable_task_execution_duration" -> "0"
) ++ conf

val args = config.flatMap {
Expand Down

0 comments on commit 65004a2

Please sign in to comment.