Skip to content

Commit

Permalink
Added request resources directive (#55)
Browse files Browse the repository at this point in the history
Change-Id: Id148e3fcc689a922e993a8f9f9edbb780b08ca7d

Co-authored-by: Djamel Dahmane <[email protected]>
  • Loading branch information
djdhm and Djamel Dahmane authored Sep 23, 2021
1 parent 760d51f commit f54f7bc
Show file tree
Hide file tree
Showing 5 changed files with 145 additions and 59 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import akka.stream.scaladsl.{Flow, Sink, Source}
import akka.{Done, NotUsed}
import com.typesafe.scalalogging.StrictLogging
import mesosphere.marathon.core.instance.update.InstanceChangeOrSnapshot
import mesosphere.marathon.core.launchqueue.impl.ReviveOffersStreamLogic.{IssueRevive, RoleDirective, UpdateFramework}
import mesosphere.marathon.core.launchqueue.impl.ReviveOffersStreamLogic.{IssueRevive, RequestResources, RoleDirective, UpdateFramework}
import mesosphere.marathon.core.task.tracker.InstanceTracker
import mesosphere.marathon.metrics.{Counter, Metrics}
import org.apache.mesos.Protos.FrameworkInfo
Expand Down Expand Up @@ -75,7 +75,14 @@ class ReviveOffersActor(
Resource.newBuilder.setName("disk")
.setType(SCALAR)
.setScalar(Scalar.newBuilder().setValue(resources.disk)))

.addResources(
Resource.newBuilder.setName("gpus")
.setType(SCALAR)
.setScalar(Scalar.newBuilder().setValue(resources.gpus)))
.addResources(
Resource.newBuilder.setName("network_bandwidth")
.setType(SCALAR)
.setScalar(Scalar.newBuilder().setValue(resources.networkBandwidth)))
b.build
}

Expand Down Expand Up @@ -108,6 +115,7 @@ class ReviveOffersActor(
.collect { case (role, OffersNotWanted) => role }
.toSeq
d.updateFramework(newInfo, suppressedRoles.asJava)

}

case IssueRevive(roles, minimalResourcesPerRole) =>
Expand All @@ -120,6 +128,13 @@ class ReviveOffersActor(

}

case RequestResources(roles, minimalResourcesPerRole) =>
driverHolder.driver.foreach { d =>
roles.foreach(role => {
val requests: List[Request] = List(requestWithResources(minimalResourcesPerRole.getOrElse(role, Resources(0, 0, 0, 0, 0))))
d.requestResources(requests.asJava);
})
}
})
}

Expand All @@ -144,7 +159,12 @@ class ReviveOffersActor(
reviveCountMetric.increment()
logger.info(s"Role '${role}' explicitly revived")
}
directive

case directive @ RequestResources(roles, minimalResourcesPerRole) =>
roles.foreach { role =>
logger.info(s"Role '${role}' request resources")
}
directive
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,14 +155,37 @@ case class ReviveOffersState(
val iterator = instancesWantingOffers.getOrElse(role, Map.empty).values
.iterator
.filter(launchAllowedOrCleanUpRequired)

if (iterator.isEmpty)
val offersWantedList = iterator.toList;
val minimalResources = findMinimalResources(offersWantedList)
if (offersWantedList.isEmpty)
role -> VersionedRoleState(version, OffersNotWanted)
else
role -> VersionedRoleState(iterator.map(_.version).max, OffersWanted)
role -> VersionedRoleState(offersWantedList.map(_.version).max, OffersWanted, minimalResources)
}.toMap
}

private def findMinimalResources(offersWantedInfos: List[OffersWantedInfo]): Resources = {

var actualMinimal = Resources(0, 0, 0, 0, 0);
var minCpus: Double = Double.MaxValue;
var minMem: Double = Double.MaxValue;
var minDisk: Double = Double.MaxValue;
var minGpus: Int = Int.MaxValue;
var minNetworkBandwidth: Int = Int.MaxValue;
logger.debug(s"Scheduled Instances for launch are ${offersWantedInfos}")

offersWantedInfos.foreach(instance => {
val requiredResources: Resources = instance.resources;
minCpus = List(minCpus, requiredResources.cpus).min
minMem = List(minMem, requiredResources.mem).min
minDisk = List(minDisk, requiredResources.disk).min
minGpus = List(minGpus, requiredResources.gpus).min
minNetworkBandwidth = List(minNetworkBandwidth, requiredResources.networkBandwidth).min;
})

Resources(minCpus, minMem, minDisk, minGpus, minNetworkBandwidth)
}

/** @return true if a instance has no active delay, or the instance requires clean up. */
private def launchAllowedOrCleanUpRequired(wantedInfo: OffersWantedInfo): Boolean = {
wantedInfo.reason == OffersWantedReason.CleaningUpReservations || !activeDelays.contains(wantedInfo.ref)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ object ReviveOffersStreamLogic extends StrictLogging {

case class NotDelayed(element: RunSpecConfigRef) extends DelayedStatus

var minimalResourcesPerRole: Map[Role, Resources] = Map();
/**
* Watches a stream of rate limiter updates and emits Active(configRef) when a configRef has an active backoff delay,
* and Inactive(configRef) when it doesn't any longer.
Expand Down Expand Up @@ -76,37 +75,12 @@ object ReviveOffersStreamLogic extends StrictLogging {
reviveStateFromInstancesAndDelays(defaultRole)
.buffer(1, OverflowStrategy.dropHead) // While we are back-pressured, we drop older interim frames
.via(RateLimiterFlow.apply(minReviveOffersInterval))
.map(l => findminimalResourcesPerRole(l))
.map(_.roleReviveVersions)
.via(reviveDirectiveFlow(enableSuppress))
.map(l => { logger.info(s"Issuing following suppress/revive directives: = ${l} and offer wanted == ${OffersWanted}"); l })
.via(reviveRepeaterWithTicks)
}

def findminimalResourcesPerRole(offersWanted: ReviveOffersState): ReviveOffersState = {
minimalResourcesPerRole = Map();
offersWanted.instancesWantingOffers.foreach(roleInfo => {
val role: String = roleInfo._1;
var actualMinimal = Resources(0.0, 0, 0, 0, 0);
var minCpus: Double = Double.MaxValue;
var minMem: Double = Double.MaxValue;
var minDisk: Double = Double.MaxValue;
val instances: Map[Instance.Id, OffersWantedInfo] = roleInfo._2;

instances.foreach(instance => {
val requiredResources: Resources = instance._2.resources;
minCpus = if (minCpus > requiredResources.cpus) requiredResources.cpus else minCpus;
minMem = if (minMem > requiredResources.mem) requiredResources.mem else minMem;
minDisk = if (minDisk > requiredResources.disk) requiredResources.disk else minDisk;
actualMinimal = Resources(minCpus, minMem, minDisk, 0, 0)
})

minimalResourcesPerRole = minimalResourcesPerRole + (role -> actualMinimal);
})

return offersWanted
}

def reviveDirectiveFlow(enableSuppress: Boolean): Flow[Map[Role, VersionedRoleState], RoleDirective, NotUsed] = {
val logic = if (enableSuppress) new ReviveDirectiveFlowLogicWithSuppression else new ReviveDirectiveFlowLogicWithoutSuppression
Flow[Map[Role, VersionedRoleState]]
Expand All @@ -128,7 +102,7 @@ object ReviveOffersStreamLogic extends StrictLogging {
*/
private[impl] trait ReviveDirectiveFlowLogic {
def lastOffersWantedVersion(lastState: Map[Role, VersionedRoleState], role: Role): Option[Long] =
lastState.get(role).collect { case VersionedRoleState(version, OffersWanted) => version }
lastState.get(role).collect { case VersionedRoleState(version, OffersWanted, _) => version }

def directivesForDiff(lastState: Map[Role, VersionedRoleState], newState: Map[Role, VersionedRoleState]): List[RoleDirective]
}
Expand All @@ -139,6 +113,13 @@ object ReviveOffersStreamLogic extends StrictLogging {
val rolesChanged = lastState.keySet != newState.keySet
val directives = List.newBuilder[RoleDirective]

val minimalResourcesPerRole = newState.iterator
.collect {
case (role, VersionedRoleState(version, OffersWanted, minimalResources)) => role -> minimalResources
}
.toMap
logger.info(s"Request resources for roles ${minimalResourcesPerRole}")

if (rolesChanged) {
val newRoleState = newState.keysIterator.map { role =>
role -> OffersWanted
Expand All @@ -153,13 +134,20 @@ object ReviveOffersStreamLogic extends StrictLogging {
}
val needsExplicitRevive = newState.iterator
.collect {
case (role, VersionedRoleState(_, OffersWanted)) if !lastState.get(role).exists(_.roleState.isWanted) => role
case (role, VersionedRoleState(version, OffersWanted)) if lastOffersWantedVersion(lastState, role).exists(_ < version) => role
case (role, VersionedRoleState(_, OffersWanted, _)) if !lastState.get(role).exists(_.roleState.isWanted) => role
case (role, VersionedRoleState(version, OffersWanted, _)) if lastOffersWantedVersion(lastState, role).exists(_ < version) => role
}
.toSet

val requestResources = RequestResources(
minimalResourcesPerRole.keySet,
minimalResourcesPerRole
)
logger.info(s"minimal resources are ${minimalResourcesPerRole}")
if (needsExplicitRevive.nonEmpty)
directives += IssueRevive(needsExplicitRevive, minimalResourcesPerRole)
else if (minimalResourcesPerRole.keySet.nonEmpty)
directives += requestResources;

directives.result()
}
Expand All @@ -168,7 +156,7 @@ object ReviveOffersStreamLogic extends StrictLogging {
private[impl] class ReviveDirectiveFlowLogicWithSuppression extends ReviveDirectiveFlowLogic {

private def offersNotWantedRoles(state: Map[Role, VersionedRoleState]): Set[Role] =
state.collect { case (role, VersionedRoleState(_, OffersNotWanted)) => role }.toSet
state.collect { case (role, VersionedRoleState(_, OffersNotWanted, _)) => role }.toSet

def updateFrameworkNeeded(lastState: Map[Role, VersionedRoleState], newState: Map[Role, VersionedRoleState]) = {
val rolesChanged = lastState.keySet != newState.keySet
Expand All @@ -178,11 +166,17 @@ object ReviveOffersStreamLogic extends StrictLogging {

def directivesForDiff(lastState: Map[Role, VersionedRoleState], newState: Map[Role, VersionedRoleState]): List[RoleDirective] = {
val directives = List.newBuilder[RoleDirective]
val minimalResourcesPerRole = newState.iterator
.collect {
case (role, VersionedRoleState(version, OffersWanted, minimalResources)) => role -> minimalResources
}
.toMap

if (updateFrameworkNeeded(lastState, newState)) {
val roleState = newState.map {
case (role, VersionedRoleState(_, state)) => role -> state
case (role, VersionedRoleState(_, state, _)) => role -> state
}

val newlyWanted = newState
.iterator
.collect { case (role, v) if v.roleState.isWanted && !lastState.get(role).exists(_.roleState.isWanted) => role }
Expand All @@ -192,14 +186,20 @@ object ReviveOffersStreamLogic extends StrictLogging {
.iterator
.collect { case (role, v) if !v.roleState.isWanted && lastState.get(role).exists(_.roleState.isWanted) => role }
.to[Set]
directives += UpdateFramework(roleState, newlyRevived = newlyWanted, newlySuppressed = newlyNotWanted)

directives += UpdateFramework(roleState, newlyRevived = newlyWanted, newlySuppressed = newlyNotWanted, minimalResourcesPerRole)
}

val rolesNeedingRevive = newState.view
.collect { case (role, VersionedRoleState(version, OffersWanted)) if lastOffersWantedVersion(lastState, role).exists(_ < version) => role }.toSet
.collect { case (role, VersionedRoleState(version, OffersWanted, _)) if lastOffersWantedVersion(lastState, role).exists(_ < version) => role }.toSet

logger.debug(s"minimal requested resources are ${minimalResourcesPerRole}")

// If no roles need revive, send a Request resources to update the minimal set on the Allocator
if (rolesNeedingRevive.nonEmpty)
directives += IssueRevive(rolesNeedingRevive, minimalResourcesPerRole)
else if (minimalResourcesPerRole.keySet.nonEmpty)
directives += RequestResources(minimalResourcesPerRole.keySet, minimalResourcesPerRole)

directives.result()

Expand Down Expand Up @@ -229,7 +229,9 @@ object ReviveOffersStreamLogic extends StrictLogging {
*/
private[impl] class ReviveRepeaterLogic extends StrictLogging {
var currentRoleState: Map[Role, RoleOfferState] = Map.empty
var currentMinimalResouces: Map[Role, Resources] = Map.empty
var repeatIn: Map[Role, Int] = Map.empty
var requestRepeatIn: Map[Role, Int] = Map.empty

def markRolesForRepeat(roles: Iterable[Role]): Unit =
roles.foreach {
Expand All @@ -238,36 +240,66 @@ object ReviveOffersStreamLogic extends StrictLogging {
repeatIn += role -> 2
}

def markRolesForRequestRepeat(roles: Iterable[Role]): Unit =
roles.foreach {
role =>
// Override any old state.
requestRepeatIn += role -> 2
}

def processRoleDirective(directive: RoleDirective): Unit = directive match {
case updateFramework: UpdateFramework =>
logger.info(s"Issuing update framework for $updateFramework")
currentRoleState = updateFramework.roleState
currentMinimalResouces = updateFramework.minimalResourcesPerRole
markRolesForRepeat(updateFramework.newlyRevived)

case IssueRevive(roles, minimalResourcesPerRole) =>
logger.info(s"Issuing revive for roles $roles")
currentMinimalResouces = minimalResourcesPerRole
markRolesForRepeat(roles) // set / reset the repeat delay

case RequestResources(roles, minimalResourcesPerRole) =>
if (currentMinimalResouces != minimalResourcesPerRole) {
logger.info(s"Issuing Request for roles ${roles}")
currentMinimalResouces = minimalResourcesPerRole
markRolesForRequestRepeat(roles)
}

}

def handleTick(): List[RoleDirective] = {
// Decrease tick counts and filter out those that are zero.
val newRepeatIn = repeatIn.collect {
case (k, v) if v >= 1 => k -> (v - 1)
}
val newRequestRepeatIn = requestRepeatIn.collect {
case (k, v) if v >= 1 => k -> (v - 1)
}

// Repeat revives for those roles that waited for a tick.
val rolesForReviveRepetition = newRepeatIn.iterator.collect {
case (role, counter) if counter == 0 && currentRoleState.get(role).contains(OffersWanted) => role
}.toSet
val rolesForRequestRepetition = newRequestRepeatIn.iterator.collect {
case (role, counter) if counter == 0 && currentRoleState.get(role).contains(OffersWanted) => role
}.toSet

repeatIn = newRepeatIn
requestRepeatIn = newRequestRepeatIn

if (rolesForReviveRepetition.isEmpty) {
logger.info(s"Found no roles suitable for revive repetition.")
Nil
if (requestRepeatIn.isEmpty) {
logger.info(s"Found no roles suitable for request repetition.")
Nil
} else {
logger.info(s"Repeat request for roles ${requestRepeatIn}.")
List(RequestResources(rolesForRequestRepetition, currentMinimalResouces))
}
} else {
logger.info(s"Repeat revive for roles $rolesForReviveRepetition.")
List(IssueRevive(rolesForReviveRepetition, minimalResourcesPerRole))
List(IssueRevive(rolesForReviveRepetition, currentMinimalResouces))
}
}
}
Expand All @@ -289,7 +321,7 @@ object ReviveOffersStreamLogic extends StrictLogging {
minimalResourcesPerRole: Map[Role, Resources] = Map.empty) extends RoleDirective

case class IssueRevive(roles: Set[String], minimalResourcesPerRole: Map[Role, Resources] = Map.empty) extends RoleDirective

case class VersionedRoleState(version: Long, roleState: RoleOfferState)
case class RequestResources(roles: Set[String], minimalResourcesPerRole: Map[Role, Resources] = Map.empty) extends RoleDirective
case class VersionedRoleState(version: Long, roleState: RoleOfferState, minimalResources: Resources = Resources(0, 0, 0, 0, 0))

}
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ class ReviveOffersStateTest extends UnitTest with Inside {
.withSnapshot(InstancesSnapshot(List(monitoringScheduledInstance)), defaultRole = "*")

val priorVersion = inside(state.roleReviveVersions("monitoring")) {
case VersionedRoleState(version, roleState) =>
case VersionedRoleState(version, roleState, _) =>
roleState shouldBe OffersWanted
version
}
Expand All @@ -48,7 +48,7 @@ class ReviveOffersStateTest extends UnitTest with Inside {
state = state.withoutDelay(monitoringApp.configRef)

inside(state.roleReviveVersions("monitoring")) {
case VersionedRoleState(version, roleState) =>
case VersionedRoleState(version, roleState, _) =>
roleState shouldBe OffersWanted
version should be > priorVersion
}
Expand Down
Loading

0 comments on commit f54f7bc

Please sign in to comment.