Skip to content

Commit

Permalink
Add validation and tests for pods with residency and persistent volum…
Browse files Browse the repository at this point in the history
…es defined (#6008)

Add validation and tests for pods with residency and persistent volumes defined

JIRA issues:
https://jira.mesosphere.com/browse/MARATHON-8069
  • Loading branch information
Ivan Chernetsky authored and meln1k committed Feb 14, 2018
1 parent 5a789e7 commit 2bf46b3
Show file tree
Hide file tree
Showing 12 changed files with 486 additions and 17 deletions.
32 changes: 30 additions & 2 deletions src/main/scala/mesosphere/marathon/api/v2/PodNormalization.scala
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
package mesosphere.marathon
package api.v2

import mesosphere.marathon.raml.{ Endpoint, Network, NetworkMode, Pod, PodContainer }
import mesosphere.marathon.raml.{ AnyToRaml, Endpoint, Network, NetworkMode, Pod, PodContainer, PodPersistentVolume, PodSchedulingPolicy, PodUpgradeStrategy }
import mesosphere.marathon.stream.Implicits._

object PodNormalization {

Expand Down Expand Up @@ -42,10 +43,37 @@ object PodNormalization {
}
}

/**
* If a pod has one or more persistent volumes, this method ensure that
* the pod's upgrade and unreachable strategies have values which make
* sense for resident pods: the unreachable strategy should be disabled
* and the upgrade strategy has to have maximumOverCapacity set to 0
* for such pods.
*
* @param pod a pod which scheduling policy should be normalized
* @return a normalized scheduling policy
*/
def normalizeScheduling(pod: Pod): Option[PodSchedulingPolicy] = {
val hasPersistentVolumes = pod.volumes.existsAn[PodPersistentVolume]
if (hasPersistentVolumes) {
val defaultUpgradeStrategy = state.UpgradeStrategy.forResidentTasks.toRaml
val defaultUnreachableStrategy = state.UnreachableStrategy.default(hasPersistentVolumes).toRaml
val scheduling = pod.scheduling.getOrElse(PodSchedulingPolicy())
val upgradeStrategy = scheduling.upgrade.getOrElse(PodUpgradeStrategy(
minimumHealthCapacity = defaultUpgradeStrategy.minimumHealthCapacity,
maximumOverCapacity = defaultUpgradeStrategy.maximumOverCapacity))
val unreachableStrategy = scheduling.unreachableStrategy.getOrElse(defaultUnreachableStrategy)
Some(scheduling.copy(
upgrade = Some(upgradeStrategy),
unreachableStrategy = Some(unreachableStrategy)))
} else pod.scheduling
}

def apply(config: Config): Normalization[Pod] = Normalization { pod =>
val networks = Networks(config, Some(pod.networks)).normalize.networks.filter(_.nonEmpty).getOrElse(DefaultNetworks)
NetworkNormalization.requireContainerNetworkNameResolution(networks)
val containers = Containers(networks, pod.containers).normalize.containers
pod.copy(containers = containers, networks = networks)
val scheduling = normalizeScheduling(pod)
pod.copy(containers = containers, networks = networks, scheduling = scheduling)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -319,7 +319,7 @@ trait AppValidation {
update.fetch is optional(every(valid))
update.portDefinitions is optional(portDefinitionsValidator)
update.container is optional(validContainer(enabledFeatures, update.networks.getOrElse(Nil), update.secrets.getOrElse(Map.empty)))
update.acceptedResourceRoles is optional(ResourceRole.validAcceptedResourceRoles(update.residency.isDefined) and notEmpty)
update.acceptedResourceRoles is optional(ResourceRole.validAcceptedResourceRoles("app", update.residency.isDefined) and notEmpty)
update.networks is optional(NetworkValidation.defaultNetworkNameValidator(defaultNetworkName))
},
isTrue("must not be root")(!_.id.fold(false)(PathId(_).isRoot)),
Expand Down Expand Up @@ -415,7 +415,7 @@ trait AppValidation {
} -> (featureEnabled(enabledFeatures, Features.SECRETS))
app.secrets is featureEnabledImplies(enabledFeatures, Features.SECRETS)(secretValidator)
app.env is envValidator(strictNameValidation = false, app.secrets, enabledFeatures)
app.acceptedResourceRoles is optional(ResourceRole.validAcceptedResourceRoles(app.residency.isDefined) and notEmpty)
app.acceptedResourceRoles is optional(ResourceRole.validAcceptedResourceRoles("app", app.residency.isDefined) and notEmpty)
app must complyWithGpuRules(enabledFeatures)
app must complyWithMigrationAPI
app must complyWithReadinessCheckRules
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import mesosphere.marathon.core.plugin.PluginManager
import mesosphere.marathon.core.pod.PodDefinition
import mesosphere.marathon.plugin.validation.RunSpecValidator
import mesosphere.marathon.raml._
import mesosphere.marathon.state.PathId
import mesosphere.marathon.state.{ PathId, ResourceRole, RootGroup }
import mesosphere.marathon.util.SemanticVersion
// scalastyle:on

Expand Down Expand Up @@ -200,6 +200,25 @@ trait PodsValidation extends GeneralPurposeCombinators {
// When https://github.com/wix/accord/issues/120 is resolved, we can inline this expression again
private def podSecretVolumes(pod: Pod) =
pod.volumes.collect { case sv: PodSecretVolume => sv }
private def podPersistentVolumes(pod: Pod) =
pod.volumes.collect { case pv: PodPersistentVolume => pv }
private def podAcceptedResourceRoles(pod: Pod) =
pod.scheduling.flatMap(_.placement.map(_.acceptedResourceRoles)).getOrElse(Seq.empty).toSet

val haveUnreachableDisabledForResidentPods: Validator[Pod] =
isTrue[Pod]("unreachableStrategy must be disabled for pods with persistent volumes") { pod =>
if (podPersistentVolumes(pod).isEmpty)
true
else {
val unreachableStrategy = pod.scheduling.flatMap(_.unreachableStrategy)
unreachableStrategy.isEmpty || unreachableStrategy == Some(UnreachableDisabled())
}
}

val haveValidAcceptedResourceRoles: Validator[Pod] = validator[Pod] { pod =>
(podAcceptedResourceRoles(pod) as "acceptedResourceRoles"
is empty or valid(ResourceRole.validAcceptedResourceRoles("pod", podPersistentVolumes(pod).nonEmpty)))
}

def podValidator(enabledFeatures: Set[String], mesosMasterVersion: SemanticVersion, defaultNetworkName: Option[String]): Validator[Pod] = validator[Pod] { pod =>
PathId(pod.id) as "id" is valid and PathId.absolutePathValidator and PathId.nonEmptyPath
Expand All @@ -222,9 +241,12 @@ trait PodsValidation extends GeneralPurposeCombinators {
pod.scheduling is optional(schedulingValidator)
pod.scaling is optional(scalingValidator)
pod is endpointNamesUnique and endpointContainerPortsUnique and endpointHostPortsUnique
pod should complyWithPodUpgradeStrategyRules
pod should haveUnreachableDisabledForResidentPods
pod should haveValidAcceptedResourceRoles
}

def volumeNames(volumes: Seq[PodVolume]) = volumes.map(volumeName)
def volumeNames(volumes: Seq[PodVolume]): Seq[String] = volumes.map(volumeName)
def volumeName(volume: PodVolume): String = volume match {
case PodEphemeralVolume(name) => name
case PodHostVolume(name, _) => name
Expand All @@ -239,6 +261,65 @@ trait PodsValidation extends GeneralPurposeCombinators {
new And(plugins: _*).apply(pod)
}
}

def residentUpdateIsValid(from: PodDefinition): Validator[PodDefinition] = {
val changeNoVolumes =
isTrue[PodDefinition]("persistent volumes cannot be updated") { to =>
val fromVolumes = from.persistentVolumes
val toVolumes = to.persistentVolumes
def sameSize = fromVolumes.size == toVolumes.size
def noVolumeChange = fromVolumes.forall { fromVolume =>
toVolumes.find(_.name == fromVolume.name).contains(fromVolume)
}
sameSize && noVolumeChange
}

val changeNoCpuResource =
isTrue[PodDefinition](CpusPersistentVolumes) { to =>
from.resources.cpus == to.resources.cpus
}

val changeNoMemResource =
isTrue[PodDefinition](MemPersistentVolumes) { to =>
from.resources.mem == to.resources.mem
}

val changeNoDiskResource =
isTrue[PodDefinition](DiskPersistentVolumes) { to =>
from.resources.disk == to.resources.disk
}

val changeNoGpuResource =
isTrue[PodDefinition](GpusPersistentVolumes) { to =>
from.resources.gpus == to.resources.gpus
}

val changeNoHostPort =
isTrue[PodDefinition](HostPortsPersistentVolumes) { to =>
val fromHostPorts = from.containers.flatMap(_.endpoints.flatMap(_.hostPort)).toSet
val toHostPorts = to.containers.flatMap(_.endpoints.flatMap(_.hostPort)).toSet
fromHostPorts == toHostPorts
}

validator[PodDefinition] { pod =>
pod should changeNoVolumes
pod should changeNoCpuResource
pod should changeNoMemResource
pod should changeNoDiskResource
pod should changeNoGpuResource
pod should changeNoHostPort
pod.upgradeStrategy is state.UpgradeStrategy.validForResidentTasks
}
}

def updateIsValid(from: RootGroup): Validator[PodDefinition] = new Validator[PodDefinition] {
override def apply(pod: PodDefinition): Result = {
from.pod(pod.id) match {
case (Some(last)) if last.isResident || pod.isResident => residentUpdateIsValid(last)(pod)
case _ => Success
}
}
}
}

object PodsValidation extends PodsValidation {
Expand All @@ -252,6 +333,11 @@ object PodsValidationMessages {
val VolumeNamesMustBeUnique = "volume names must be unique"
val ContainerNamesMustBeUnique = "container names must be unique"
val SecretVolumeMustReferenceSecret = "volume.secret must refer to an existing secret"
val CpusPersistentVolumes = "cpus cannot be updated if a pod has persistent volumes"
val MemPersistentVolumes = "mem cannot be updated if a pod has persistent volumes"
val DiskPersistentVolumes = "disk cannot be updated if a pod has persistent volumes"
val GpusPersistentVolumes = "gpus cannot be updated if a pod has persistent volumes"
val HostPortsPersistentVolumes = "host ports cannot be updated if a pod has persistent volumes"
// Note: we should keep this in sync with AppValidationMessages
val NetworkNameRequiredForMultipleContainerNetworks =
"networkNames must be a single item list when hostPort is specified and more than 1 container network is defined"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import java.util.regex.Pattern
import com.wix.accord._
import com.wix.accord.dsl._
import mesosphere.marathon.api.v2.Validation
import mesosphere.marathon.raml.{ App, Apps, Constraint, ConstraintOperator, PodPlacementPolicy, PodSchedulingBackoffStrategy, PodSchedulingPolicy, PodUpgradeStrategy, UpgradeStrategy }
import mesosphere.marathon.raml.{ App, Apps, Constraint, ConstraintOperator, Pod, PodPersistentVolume, PodPlacementPolicy, PodSchedulingBackoffStrategy, PodSchedulingPolicy, PodUpgradeStrategy, UpgradeStrategy }
import mesosphere.marathon.state.ResourceRole

import scala.util.Try
Expand Down Expand Up @@ -40,11 +40,28 @@ trait SchedulingValidation {
app.upgradeStrategy is optional(implied(app.residency.nonEmpty)(validForResidentTasks))
}

def podUpgradeStrategy(pod: Pod): Option[PodUpgradeStrategy] = {
pod.scheduling.flatMap(_.upgrade)
}

def podHasPersistentVolumes(pod: Pod): Boolean = {
pod.volumes.exists(_.isInstanceOf[PodPersistentVolume])
}
val complyWithPodUpgradeStrategyRules: Validator[Pod] = validator[Pod] { pod =>
(podUpgradeStrategy(pod) as "upgrade"
is optional(implied(podHasPersistentVolumes(pod))(validForResidentPods)))
}

lazy val validForResidentTasks: Validator[UpgradeStrategy] = validator[UpgradeStrategy] { strategy =>
strategy.minimumHealthCapacity is between(0.0, 1.0)
strategy.maximumOverCapacity should be == 0.0
}

lazy val validForResidentPods: Validator[PodUpgradeStrategy] = validator[PodUpgradeStrategy] { strategy =>
strategy.minimumHealthCapacity is between(0.0, 1.0)
strategy.maximumOverCapacity should be == 0.0
}

lazy val validForSingleInstanceApps: Validator[UpgradeStrategy] = validator[UpgradeStrategy] { strategy =>
strategy.minimumHealthCapacity should be == 0.0
strategy.maximumOverCapacity should be == 0.0
Expand Down Expand Up @@ -98,7 +115,7 @@ trait SchedulingValidation {
}

val placementStrategyValidator = validator[PodPlacementPolicy] { ppp =>
ppp.acceptedResourceRoles.toSet is empty or ResourceRole.validAcceptedResourceRoles(false) // TODO(jdef) assumes pods!! change this to properly support apps
ppp.acceptedResourceRoles.toSet is empty or ResourceRole.validAcceptedResourceRoles("pod", false) // TODO(jdef) assumes pods!! change this to properly support apps
ppp.constraints is empty or every(complyWithConstraintRules)
}

Expand Down Expand Up @@ -151,4 +168,5 @@ object SchedulingValidationMessages {
val ConstraintUniqueDoesNotAcceptValue = "UNIQUE does not accept a value"
val IllegalConstraintSpecification = "illegal constraint specification"
val ConstraintOperatorInvalid = "operator must be one of the following UNIQUE, CLUSTER, GROUP_BY, LIKE, MAX_PER, UNLIKE or IS"
val UpgradeStrategyMustBeDefinedWithPersistentVolumes = "upgrade strategy must be defined"
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import java.util.UUID
import com.wix.accord._
import com.wix.accord.dsl._
import mesosphere.marathon.api.v2.Validation._
import mesosphere.marathon.api.v2.validation.PodsValidation
import mesosphere.marathon.core.deployment.impl.DeploymentPlanReverter
import mesosphere.marathon.core.instance.Instance
import mesosphere.marathon.core.pod.{ MesosContainer, PodDefinition }
Expand Down Expand Up @@ -340,6 +341,7 @@ object DeploymentPlan {
def deploymentPlanValidator(): Validator[DeploymentPlan] = {
validator[DeploymentPlan] { plan =>
plan.createdOrUpdatedApps as "app" is every(AppDefinition.updateIsValid(plan.original))
plan.createdOrUpdatedPods as "pod" is every(PodsValidation.updateIsValid(plan.original))
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -550,7 +550,7 @@ object AppDefinition extends GeneralPurposeCombinators {
appDef.secrets is valid(Secret.secretsValidator)
appDef.secrets is empty or featureEnabled(enabledFeatures, Features.SECRETS)
appDef.env is valid(EnvVarValue.envValidator)
appDef.acceptedResourceRoles is empty or valid(ResourceRole.validAcceptedResourceRoles(appDef.isResident))
appDef.acceptedResourceRoles is empty or valid(ResourceRole.validAcceptedResourceRoles("app", appDef.isResident))
appDef must complyWithGpuRules(enabledFeatures)
appDef must complyWithMigrationAPI
appDef must complyWithReadinessCheckRules
Expand Down
4 changes: 2 additions & 2 deletions src/main/scala/mesosphere/marathon/state/ResourceRole.scala
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,11 @@ object ResourceRole {
// "A role name must not include a space (\x20) character".

@SuppressWarnings(Array("ComparisonToEmptySet"))
def validAcceptedResourceRoles(isResident: Boolean): Validator[Set[String]] =
def validAcceptedResourceRoles(runSpecType: String, isResident: Boolean): Validator[Set[String]] =
validator[Set[String]] { acceptedResourceRoles =>
acceptedResourceRoles is notEmpty
acceptedResourceRoles.each is ResourceRole.validResourceRole
} and isTrue("""A resident app must have `acceptedResourceRoles = ["*"]`.""") { acceptedResourceRoles =>
} and isTrue(s"""A resident $runSpecType must have `acceptedResourceRoles = ["*"]`.""") { acceptedResourceRoles =>
!isResident || acceptedResourceRoles == Set(ResourceRole.Unreserved)
}

Expand Down
Loading

0 comments on commit 2bf46b3

Please sign in to comment.