diff --git a/docs/docs/command-line-flags.md b/docs/docs/command-line-flags.md index 7ac086b8758..30688b7b80a 100644 --- a/docs/docs/command-line-flags.md +++ b/docs/docs/command-line-flags.md @@ -170,6 +170,8 @@ The core functionality flags can be also set by environment variable `MARATHON_O * `--mesos_heartbeat_failure_threshold` (Optional. Default: 5): after missing this number of expected communications from the mesos master, infer that marathon has become disconnected from the master. +* v1.5.0 `--minimum_viable_task_execution_duration` (Optional. Default: 60000, i.e. 60 seconds): + Delay (in ms) after which a task is considered viable. If the task starts up correctly but fails within this period, the application is backed off. ## Tuning Flags for Offer Matching/Launching Tasks diff --git a/src/main/scala/mesosphere/marathon/core/launchqueue/LaunchQueueConfig.scala b/src/main/scala/mesosphere/marathon/core/launchqueue/LaunchQueueConfig.scala index fc8ff31d5c6..8d9126bda13 100644 --- a/src/main/scala/mesosphere/marathon/core/launchqueue/LaunchQueueConfig.scala +++ b/src/main/scala/mesosphere/marathon/core/launchqueue/LaunchQueueConfig.scala @@ -2,8 +2,15 @@ package mesosphere.marathon.core.launchqueue import org.rogach.scallop.ScallopConf +import scala.concurrent.duration._ + trait LaunchQueueConfig extends ScallopConf { + lazy val minimumViableTaskExecutionDurationMillis = opt[Long]( + "minimum_viable_task_execution_duration", + descr = "Delay (in ms) after which a task is considered viable.", + default = Some(60000)) + lazy val launchQueueRequestTimeout = opt[Int]( "launch_queue_request_timeout", descr = "INTERNAL TUNING PARAMETER: Timeout (in ms) for requests to the launch queue actor.", @@ -15,4 +22,6 @@ trait LaunchQueueConfig extends ScallopConf { descr = "INTERNAL TUNING PARAMETER: Timeout (in ms) for matched task opereations to be accepted or rejected.", hidden = true, default = Some(10000)) + + lazy val minimumViableTaskExecutionDuration: FiniteDuration = 
minimumViableTaskExecutionDurationMillis().millis } diff --git a/src/main/scala/mesosphere/marathon/core/launchqueue/LaunchQueueModule.scala b/src/main/scala/mesosphere/marathon/core/launchqueue/LaunchQueueModule.scala index 644d030ed2d..1ce881dc3a4 100644 --- a/src/main/scala/mesosphere/marathon/core/launchqueue/LaunchQueueModule.scala +++ b/src/main/scala/mesosphere/marathon/core/launchqueue/LaunchQueueModule.scala @@ -41,7 +41,7 @@ class LaunchQueueModule( leadershipModule.startWhenLeader(props, "launchQueue") } - val rateLimiter: RateLimiter = new RateLimiter(clock) + val rateLimiter: RateLimiter = new RateLimiter(config, clock) private[this] val rateLimiterActor: ActorRef = { val props = RateLimiterActor.props( rateLimiter, launchQueueActorRef) diff --git a/src/main/scala/mesosphere/marathon/core/launchqueue/impl/RateLimiter.scala b/src/main/scala/mesosphere/marathon/core/launchqueue/impl/RateLimiter.scala index 944260cd4a0..3d35b6f9137 100644 --- a/src/main/scala/mesosphere/marathon/core/launchqueue/impl/RateLimiter.scala +++ b/src/main/scala/mesosphere/marathon/core/launchqueue/impl/RateLimiter.scala @@ -3,7 +3,8 @@ package mesosphere.marathon.core.launchqueue.impl import java.util.concurrent.TimeUnit import mesosphere.marathon.core.base.Clock -import mesosphere.marathon.state.{ PathId, RunSpec, Timestamp } +import mesosphere.marathon.core.launchqueue.LaunchQueueConfig +import mesosphere.marathon.state.{ RunSpec, PathId, Timestamp } import org.slf4j.LoggerFactory import scala.concurrent.duration._ @@ -13,20 +14,25 @@ import scala.concurrent.duration._ * * We do not keep the delays for every version because that would include scaling changes or manual restarts. */ -private[launchqueue] class RateLimiter(clock: Clock) { +private[launchqueue] class RateLimiter(config: LaunchQueueConfig, clock: Clock) { import RateLimiter._ /** The task launch delays per run spec and their last config change. 
*/ private[this] var taskLaunchDelays = Map[(PathId, Timestamp), Delay]() - def cleanUpOverdueDelays(): Unit = { + /** + * Reset delay for tasks that have reached the viability + * threshold. The deadline indicates when the task has been + * launched for the last time. + */ + def resetDelaysOfViableTasks(): Unit = { taskLaunchDelays = taskLaunchDelays.filter { - case (_, delay) => delay.deadline > clock.now() + case (_, delay) => + clock.now() - config.minimumViableTaskExecutionDuration < delay.deadline } } - def getDelay(spec: RunSpec): Timestamp = - // TODO (pods): RunSpec has no versionInfo. Need this? + def getDeadline(spec: RunSpec): Timestamp = taskLaunchDelays.get(spec.id -> spec.versionInfo.lastConfigChangeVersion).map(_.deadline) getOrElse clock.now() def addDelay(spec: RunSpec): Timestamp = { @@ -72,7 +78,7 @@ private object RateLimiter { private val log = LoggerFactory.getLogger(getClass.getName) private object Delay { - def apply(clock: Clock, runSpec: RunSpec): Delay = Delay(clock, runSpec.backoffStrategy.backoff) + def apply(clock: Clock, runSpec: RunSpec): Delay = Delay(clock.now() + runSpec.backoffStrategy.backoff, runSpec.backoffStrategy.backoff) def apply(clock: Clock, delay: FiniteDuration): Delay = Delay(clock.now() + delay, delay) } diff --git a/src/main/scala/mesosphere/marathon/core/launchqueue/impl/RateLimiterActor.scala b/src/main/scala/mesosphere/marathon/core/launchqueue/impl/RateLimiterActor.scala index 5f1c016d7b9..714156d6643 100644 --- a/src/main/scala/mesosphere/marathon/core/launchqueue/impl/RateLimiterActor.scala +++ b/src/main/scala/mesosphere/marathon/core/launchqueue/impl/RateLimiterActor.scala @@ -4,7 +4,7 @@ import akka.actor.{ Actor, ActorLogging, ActorRef, Cancellable, Props } import akka.event.LoggingReceive import mesosphere.marathon.core.launchqueue.impl.RateLimiterActor.{ AddDelay, - CleanupOverdueDelays, + ResetViableTasksDelays, DecreaseDelay, DelayUpdate, GetDelay, @@ -32,7 +32,7 @@ private[launchqueue] object 
RateLimiterActor { private[impl] case class AddDelay(runSpec: RunSpec) private[impl] case class DecreaseDelay(runSpec: RunSpec) - private case object CleanupOverdueDelays + private case object ResetViableTasksDelays } private class RateLimiterActor private ( @@ -42,7 +42,7 @@ private class RateLimiterActor private ( override def preStart(): Unit = { import context.dispatcher - cleanup = context.system.scheduler.schedule(10.seconds, 10.seconds, self, CleanupOverdueDelays) + cleanup = context.system.scheduler.schedule(10.seconds, 10.seconds, self, ResetViableTasksDelays) log.info("started RateLimiterActor") } @@ -57,27 +57,29 @@ private class RateLimiterActor private ( ).reduceLeft(_.orElse[Any, Unit](_)) } + /** + * If an app gets removed or updated, the delay should be reset. If + * an app is considered viable, the delay should be reset too. We + * check and reset viable tasks' delays periodically. + */ private[this] def receiveCleanup: Receive = { - case CleanupOverdueDelays => - // If a run spec gets removed or updated, the delay should be reset. - // Still, we can remove overdue delays before that and also make leaks less likely - // by calling this periodically. - rateLimiter.cleanUpOverdueDelays() + case ResetViableTasksDelays => + rateLimiter.resetDelaysOfViableTasks() } private[this] def receiveDelayOps: Receive = { case GetDelay(runSpec) => - sender() ! DelayUpdate(runSpec, rateLimiter.getDelay(runSpec)) + sender() ! DelayUpdate(runSpec, rateLimiter.getDeadline(runSpec)) case AddDelay(runSpec) => rateLimiter.addDelay(runSpec) - launchQueueRef ! DelayUpdate(runSpec, rateLimiter.getDelay(runSpec)) + launchQueueRef ! DelayUpdate(runSpec, rateLimiter.getDeadline(runSpec)) case DecreaseDelay(runSpec) => // ignore for now case ResetDelay(runSpec) => rateLimiter.resetDelay(runSpec) - launchQueueRef ! DelayUpdate(runSpec, rateLimiter.getDelay(runSpec)) + launchQueueRef ! DelayUpdate(runSpec, rateLimiter.getDeadline(runSpec)) sender() ! 
ResetDelayResponse } } diff --git a/src/test/scala/mesosphere/marathon/core/launchqueue/impl/RateLimiterActorTest.scala b/src/test/scala/mesosphere/marathon/core/launchqueue/impl/RateLimiterActorTest.scala index 11d2853f733..a616affafbd 100644 --- a/src/test/scala/mesosphere/marathon/core/launchqueue/impl/RateLimiterActorTest.scala +++ b/src/test/scala/mesosphere/marathon/core/launchqueue/impl/RateLimiterActorTest.scala @@ -5,6 +5,8 @@ import akka.pattern.ask import akka.testkit.TestProbe import akka.util.Timeout import mesosphere.marathon.core.base.ConstantClock +import mesosphere.marathon.test.MarathonSpec +import mesosphere.marathon.core.launchqueue.LaunchQueueConfig import mesosphere.marathon.core.task.tracker.InstanceTracker import mesosphere.marathon.state.{ AppDefinition, BackoffStrategy, PathId } import mesosphere.marathon.test.MarathonSpec @@ -48,6 +50,7 @@ class RateLimiterActorTest extends MarathonSpec { private[this] implicit val timeout: Timeout = 3.seconds private[this] implicit var actorSystem: ActorSystem = _ + private[this] var launchQueueConfig: LaunchQueueConfig = _ private[this] var clock: ConstantClock = _ private[this] var rateLimiter: RateLimiter = _ private[this] var taskTracker: InstanceTracker = _ @@ -56,8 +59,11 @@ class RateLimiterActorTest extends MarathonSpec { before { actorSystem = ActorSystem() + launchQueueConfig = new LaunchQueueConfig { + verify() + } clock = ConstantClock() - rateLimiter = Mockito.spy(new RateLimiter(clock)) + rateLimiter = Mockito.spy(new RateLimiter(launchQueueConfig, clock)) taskTracker = mock[InstanceTracker] updateReceiver = TestProbe() val props = RateLimiterActor.props(rateLimiter, updateReceiver.ref) diff --git a/src/test/scala/mesosphere/marathon/core/launchqueue/impl/RateLimiterTest.scala b/src/test/scala/mesosphere/marathon/core/launchqueue/impl/RateLimiterTest.scala index 2fcf1690573..4ec76ee11fe 100644 --- a/src/test/scala/mesosphere/marathon/core/launchqueue/impl/RateLimiterTest.scala +++ 
b/src/test/scala/mesosphere/marathon/core/launchqueue/impl/RateLimiterTest.scala @@ -1,6 +1,7 @@ package mesosphere.marathon.core.launchqueue.impl import mesosphere.marathon.core.base.ConstantClock +import mesosphere.marathon.core.launchqueue.LaunchQueueConfig import mesosphere.marathon.state.PathId._ import mesosphere.marathon.state.{ AppDefinition, BackoffStrategy, Timestamp } import mesosphere.marathon.test.{ MarathonActorSupport, MarathonSpec } @@ -9,51 +10,61 @@ import org.scalatest.Matchers import scala.concurrent.duration._ class RateLimiterTest extends MarathonActorSupport with MarathonSpec with Matchers { + val clock = ConstantClock(Timestamp.now()) + private[this] val launchQueueConfig: LaunchQueueConfig = new LaunchQueueConfig { + verify() + } + test("addDelay") { - val limiter = new RateLimiter(clock) + val limiter = new RateLimiter(launchQueueConfig, clock) val app = AppDefinition(id = "test".toPath, backoffStrategy = BackoffStrategy(backoff = 10.seconds)) limiter.addDelay(app) - limiter.getDelay(app) should be(clock.now() + 10.seconds) + limiter.getDeadline(app) should be(clock.now() + 10.seconds) } test("addDelay for existing delay") { - val limiter = new RateLimiter(clock) + val limiter = new RateLimiter(launchQueueConfig, clock) val app = AppDefinition(id = "test".toPath, backoffStrategy = BackoffStrategy(backoff = 10.seconds, factor = 2.0)) limiter.addDelay(app) // linter:ignore:IdenticalStatements limiter.addDelay(app) - limiter.getDelay(app) should be(clock.now() + 20.seconds) + limiter.getDeadline(app) should be(clock.now() + 20.seconds) } - test("cleanupOverdueDelays") { - val limiter = new RateLimiter(clock) - val overdue = AppDefinition(id = "overdue".toPath, backoffStrategy = BackoffStrategy(backoff = 10.seconds)) - limiter.addDelay(overdue) - val stillWaiting = AppDefinition(id = "test".toPath, backoffStrategy = BackoffStrategy(backoff = 20.seconds)) + test("resetDelaysOfViableTasks") { + val time_origin = clock.now() + val limiter = new 
RateLimiter(launchQueueConfig, clock) + val threshold = launchQueueConfig.minimumViableTaskExecutionDuration + val viable = AppDefinition(id = "viable".toPath, backoffStrategy = BackoffStrategy(backoff = 10.seconds)) + limiter.addDelay(viable) + val notYetViable = AppDefinition(id = "notYetViable".toPath, backoffStrategy = BackoffStrategy(backoff = 20.seconds)) + limiter.addDelay(notYetViable) + val stillWaiting = AppDefinition(id = "test".toPath, backoffStrategy = BackoffStrategy(backoff = threshold + 20.seconds)) limiter.addDelay(stillWaiting) - clock += 11.seconds + clock += threshold + 11.seconds - limiter.cleanUpOverdueDelays() + limiter.resetDelaysOfViableTasks() - limiter.getDelay(overdue) should be(clock.now()) - limiter.getDelay(stillWaiting) should be(clock.now() + 9.seconds) + limiter.getDeadline(viable) should be(clock.now()) + limiter.getDeadline(notYetViable) should be(time_origin + 20.seconds) + limiter.getDeadline(stillWaiting) should be(time_origin + threshold + 20.seconds) } test("resetDelay") { - val limiter = new RateLimiter(clock) + val limiter = new RateLimiter(launchQueueConfig, clock) val app = AppDefinition(id = "test".toPath, backoffStrategy = BackoffStrategy(backoff = 10.seconds)) limiter.addDelay(app) limiter.resetDelay(app) - limiter.getDelay(app) should be(clock.now()) + limiter.getDeadline(app) should be(clock.now()) } } diff --git a/src/test/scala/mesosphere/marathon/integration/setup/MarathonTest.scala b/src/test/scala/mesosphere/marathon/integration/setup/MarathonTest.scala index 604ebafaa45..bebef1f594b 100644 --- a/src/test/scala/mesosphere/marathon/integration/setup/MarathonTest.scala +++ b/src/test/scala/mesosphere/marathon/integration/setup/MarathonTest.scala @@ -102,6 +102,7 @@ case class LocalMarathon( "hostname" -> "localhost", "logging_level" -> "debug", - "offer_matching_timeout" -> 10.seconds.toMillis.toString // see https://github.com/mesosphere/marathon/issues/4920 + "offer_matching_timeout" -> 10.seconds.toMillis.toString, // see https://github.com/mesosphere/marathon/issues/4920 + "minimum_viable_task_execution_duration" -> "0" ) ++ 
conf val args = config.flatMap {