Skip to content
This repository has been archived by the owner on Oct 23, 2024. It is now read-only.

Commit

Permalink
Respond back with Mesos Source field exactly as we've received it (#6750
Browse files Browse the repository at this point in the history
)

Backport of fc5b115

Respond back with Mesos Source field exactly as we've received it (#6749)

Mesos can introduce new fields. By simply replying with the same data structure
that we were given, we don't have to add explicit support for the new fields.

Also, bump Mesos dependency to 1.7.1-rc1

JIRA Issues: MARATHON-8539
  • Loading branch information
Tim Harper authored Dec 21, 2018
1 parent b4c2299 commit 48bfd60
Show file tree
Hide file tree
Showing 4 changed files with 97 additions and 48 deletions.
2 changes: 1 addition & 1 deletion project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ object Dependency {
val Logstash = "4.9"
val MarathonApiConsole = "3.0.8-accept"
val MarathonUI = "1.3.1"
val Mesos = "1.5.0"
val Mesos = "1.7.1-rc1"
val Mustache = "0.9.0"
val PlayJson = "2.6.7"
val Raven = "8.0.3"
Expand Down
73 changes: 41 additions & 32 deletions src/main/scala/mesosphere/marathon/state/Volume.scala
Original file line number Diff line number Diff line change
Expand Up @@ -107,12 +107,21 @@ object HostVolume {
}

case class DiskSource(
diskType: DiskType, path: Option[String],
id: Option[String], metadata: Option[Map[String, String]], profileName: Option[String]) {
diskType: DiskType, source: Option[Source]) {

lazy val id: Option[String] = source.filter(_.hasId).map(_.getId)
lazy val path: Option[String] =
if (diskType == DiskType.Mount)
source.filter(_.hasMount).map(_.getMount.getRoot)
else
source.filter(_.hasPath).map(_.getPath.getRoot)
lazy val metadata: Option[Map[String, String]] = source.map { s => s.getMetadata.fromProto }
lazy val profileName: Option[String] = source.filter(_.hasProfile).map(_.getProfile)

if (diskType == DiskType.Root)
require(path.isEmpty, "Path is not allowed for diskType")
else
require(path.isDefined, "Path is required for non-root diskTypes")
require(path.nonEmpty, "Path is required for non-root diskTypes")

override def toString: String = {
val diskTypeStr = path match {
Expand All @@ -125,43 +134,43 @@ case class DiskSource(
components.mkString(";")
}

def asMesos: Option[Source] = (path, diskType) match {
case (None, DiskType.Root) =>
None
case (Some(p), DiskType.Path | DiskType.Mount) =>
val bld = Source.newBuilder
diskType.toMesos.foreach(bld.setType)
if (diskType == DiskType.Mount)
bld.setMount(Source.Mount.newBuilder().setRoot(p))
else
bld.setPath(Source.Path.newBuilder().setRoot(p))
id.foreach(bld.setId)
metadata.foreach(metadata => bld.setMetadata(metadata.toMesosLabels))
profileName.foreach(bld.setProfile)
Some(bld.build)
case (_, _) =>
throw new RuntimeException("invalid state")
}
def asMesos: Option[Source] = source
}

object DiskSource {
val root = DiskSource(DiskType.Root, None, None, None, None)
val root = DiskSource(DiskType.Root, None)

def fromMesos(source: Option[Source]): DiskSource = {
val diskType = DiskType.fromMesosType(source.map(_.getType))
val id = source.flatMap(s => if (s.hasId) Some(s.getId) else None)
val metadata = source.flatMap { source =>
if (source.hasMetadata) Some(source.getMetadata.fromProto) else None
}
val profileName = source.flatMap(s => if (s.hasProfile) Some(s.getProfile) else None)
diskType match {
DiskSource(diskType, source)
}
/**
* Create a Mesos protobuf for testing purposes; We should always prefer to send the same source protobuf that Mesos
* sends us in order to reply back with new fields Mesos introduces in the future
*/
def fromParams(diskType: DiskType, path: Option[String],
id: Option[String], metadata: Option[Map[String, String]], profileName: Option[String], vendor: Option[String]): DiskSource = {

val source = diskType match {
case DiskType.Root =>
DiskSource(DiskType.Root, None, id, metadata, profileName)
case DiskType.Mount =>
DiskSource(DiskType.Mount, Some(source.get.getMount.getRoot), id, metadata, profileName)
case DiskType.Path =>
DiskSource(DiskType.Path, Some(source.get.getPath.getRoot), id, metadata, profileName)
None
case DiskType.Path | DiskType.Mount =>
val bld = Source.newBuilder
diskType.toMesos.foreach(bld.setType)
val p = path.getOrElse(throw new IllegalArgumentException("Path is required for Mount or Path volumes"))
if (diskType == DiskType.Mount)
bld.setMount(Source.Mount.newBuilder().setRoot(p))
else
bld.setPath(Source.Path.newBuilder().setRoot(p))
id.foreach(bld.setId)
metadata.foreach(metadata => bld.setMetadata(metadata.toMesosLabels))
profileName.foreach(bld.setProfile)
vendor.foreach(bld.setVendor)
Some(bld.build)
case _ =>
throw new RuntimeException("invalid state")
}
DiskSource(diskType, source)
}
}

Expand Down
21 changes: 9 additions & 12 deletions src/test/scala/mesosphere/marathon/state/VolumeTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -148,23 +148,23 @@ class VolumeTest extends UnitTest {
}

"validating that DiskSource asMesos converts to an Option Mesos Protobuffer" in {
DiskSource(DiskType.Root, None, None, None, None).asMesos shouldBe None
val Some(pathDisk) = DiskSource(DiskType.Path, Some("/path/to/folder"), None, None, None).asMesos
DiskSource(DiskType.Root, None).asMesos shouldBe None
val Some(pathDisk) = DiskSource.fromParams(DiskType.Path, Some("/path/to/folder"), None, None, None, None).asMesos
pathDisk.getPath.getRoot shouldBe "/path/to/folder"
pathDisk.getType shouldBe Source.Type.PATH
pathDisk.hasId shouldBe false
pathDisk.hasMetadata shouldBe false
pathDisk.hasProfile shouldBe false

val Some(mountDisk) = DiskSource(DiskType.Mount, Some("/path/to/mount"), None, None, None).asMesos
val Some(mountDisk) = DiskSource.fromParams(DiskType.Mount, Some("/path/to/mount"), None, None, None, None).asMesos
mountDisk.getMount.getRoot shouldBe "/path/to/mount"
mountDisk.getType shouldBe Source.Type.MOUNT
pathDisk.hasId shouldBe false
pathDisk.hasMetadata shouldBe false
pathDisk.hasProfile shouldBe false

val Some(pathCsiDisk) = DiskSource(DiskType.Path, Some("/path/to/folder"),
Some("csiPathDisk"), Some(Map("pathKey" -> "pathValue")), Some("pathProfile")).asMesos
val Some(pathCsiDisk) = DiskSource.fromParams(DiskType.Path, Some("/path/to/folder"),
Some("csiPathDisk"), Some(Map("pathKey" -> "pathValue")), Some("pathProfile"), None).asMesos
pathCsiDisk.getPath.getRoot shouldBe "/path/to/folder"
pathCsiDisk.getType shouldBe Source.Type.PATH
pathCsiDisk.getId shouldBe "csiPathDisk"
Expand All @@ -173,8 +173,8 @@ class VolumeTest extends UnitTest {
pathCsiDisk.getMetadata.getLabels(0).getValue shouldBe "pathValue"
pathCsiDisk.getProfile shouldBe "pathProfile"

val Some(mountCsiDisk) = DiskSource(DiskType.Mount, Some("/path/to/mount"),
Some("csiMountDisk"), Some(Map("mountKey" -> "mountValue")), Some("mountProfile")).asMesos
val Some(mountCsiDisk) = DiskSource.fromParams(DiskType.Mount, Some("/path/to/mount"),
Some("csiMountDisk"), Some(Map("mountKey" -> "mountValue")), Some("mountProfile"), None).asMesos
mountCsiDisk.getMount.getRoot shouldBe "/path/to/mount"
mountCsiDisk.getType shouldBe Source.Type.MOUNT
mountCsiDisk.getId shouldBe "csiMountDisk"
Expand All @@ -184,13 +184,10 @@ class VolumeTest extends UnitTest {
mountCsiDisk.getProfile shouldBe "mountProfile"

a[IllegalArgumentException] shouldBe thrownBy {
DiskSource(DiskType.Root, Some("/path"), None, None, None).asMesos
DiskSource.fromParams(DiskType.Path, None, None, None, None, None).asMesos
}
a[IllegalArgumentException] shouldBe thrownBy {
DiskSource(DiskType.Path, None, None, None, None).asMesos
}
a[IllegalArgumentException] shouldBe thrownBy {
DiskSource(DiskType.Mount, None, None, None, None).asMesos
DiskSource.fromParams(DiskType.Mount, None, None, None, None, None).asMesos
}
}
}
Expand Down
49 changes: 46 additions & 3 deletions src/test/scala/mesosphere/mesos/ResourceMatcherTest.scala
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
package mesosphere.mesos

import java.util.UUID

import com.google.protobuf.UnknownFieldSet
import mesosphere.UnitTest
import mesosphere.marathon.Protos.Constraint
import mesosphere.marathon.Protos.Constraint.Operator
Expand All @@ -25,7 +28,6 @@ import mesosphere.util.state.FrameworkId
import org.apache.mesos.Protos.Attribute
import org.scalatest.Inside
import org.scalatest.prop.TableDrivenPropertyChecks
import java.util.UUID

import scala.collection.immutable.Seq

Expand Down Expand Up @@ -576,6 +578,47 @@ class ResourceMatcherTest extends UnitTest with Inside with TableDrivenPropertyC
noMatch.reasons should not contain NoOfferMatchReason.InsufficientPorts
}

"resource matcher preserves unknown fields on the Source protobuf object" in {
val disk = MarathonTestHelper.pathDisk("/path1")

val diskWithUnknownFields = disk.toBuilder.setSource(
disk.getSource.toBuilder.setUnknownFields(
UnknownFieldSet.newBuilder
.addField(254, UnknownFieldSet.Field.newBuilder().addFixed32(100).build)
.build()).build).build

val offerWithUnrecognizedSourceField = MarathonTestHelper.makeBasicOffer()
.addResources(MarathonTestHelper.scalarResource("disk", 1024.0,
disk = Some(diskWithUnknownFields)))
.build()

val volume = VolumeWithMount(
PersistentVolume(
name = None,
persistent = PersistentVolumeInfo(
size = 128,
`type` = DiskType.Path)),
VolumeMount(None, "/var/lib/data"))

val app = AppDefinition(
id = "/test".toRootPath,
resources = Resources(
cpus = 1.0,
mem = 128.0,
disk = 0.0),
container = Some(Container.Mesos(
volumes = List(volume))),
versionInfo = OnlyVersion(Timestamp(2)))

inside(ResourceMatcher.matchResources(
offerWithUnrecognizedSourceField, app,
knownInstances = Seq(),
ResourceSelector.reservable, config, Seq.empty)) {
case m: ResourceMatchResponse.Match =>
m.resourceMatch.localVolumes.head.source.asMesos.get.getUnknownFields.getField(254).getFixed32List.get(0) shouldBe 100
}
}

"match resources success with constraints and old tasks in previous version" in {
val offer = MarathonTestHelper.makeBasicOffer(beginPort = 0, endPort = 0)
.addAttributes(TextAttribute("region", "pl-east"))
Expand Down Expand Up @@ -710,9 +753,9 @@ class ResourceMatcherTest extends UnitTest with Inside with TableDrivenPropertyC

resourceMatchResponse shouldBe a[ResourceMatchResponse.Match]
resourceMatchResponse.asInstanceOf[ResourceMatchResponse.Match].resourceMatch.scalarMatch("disk").get.consumed.toSet shouldBe Set(
DiskResourceMatch.Consumption(1024.0, "*", None, None, DiskSource(DiskType.Path, Some("/path2"), None, None, None),
DiskResourceMatch.Consumption(1024.0, "*", None, None, DiskSource.fromParams(DiskType.Path, Some("/path2"), None, None, None, None),
Some(VolumeWithMount(persistentVolume, mount))),
DiskResourceMatch.Consumption(476.0, "*", None, None, DiskSource(DiskType.Path, Some("/path2"), None, None, None),
DiskResourceMatch.Consumption(476.0, "*", None, None, DiskSource.fromParams(DiskType.Path, Some("/path2"), None, None, None, None),
Some(VolumeWithMount(persistentVolume, mount))))
}

Expand Down

0 comments on commit 48bfd60

Please sign in to comment.