Skip to content
This repository has been archived by the owner on Oct 23, 2024. It is now read-only.

Commit

Permalink
Merge pull request #5094 from mesosphere/ju/dependencies_1.1
Browse files Browse the repository at this point in the history
Initial stab at making deployment plans cheaper. Back port of https://phabricator.mesosphere.com/D476
  • Loading branch information
timcharper authored Feb 3, 2017
2 parents a47e598 + 788dde5 commit e53912f
Show file tree
Hide file tree
Showing 7 changed files with 128 additions and 84 deletions.
11 changes: 5 additions & 6 deletions src/main/scala/mesosphere/marathon/state/Group.scala
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,8 @@ case class Group(
result
}

def dependencyGraph: DirectedGraph[AppDefinition, DefaultEdge] = {
lazy val dependencyGraph: DirectedGraph[AppDefinition, DefaultEdge] = {
require(id.isRoot)
val graph = new DefaultDirectedGraph[AppDefinition, DefaultEdge](classOf[DefaultEdge])
for (app <- transitiveApps)
graph.addVertex(app)
Expand Down Expand Up @@ -226,7 +227,7 @@ object Group {
group.id is validPathWithBase(base)
group.apps is every(AppDefinition.validNestedAppDefinition(group.id.canonicalPath(base)))
group is noAppsAndGroupsWithSameName
(group.id.isRoot is false) or (group.dependencies is noCyclicDependencies(group))
group is conditional[Group](_.id.isRoot)(noCyclicDependencies)
group is validPorts
group.groups is every(valid(validNestedGroup(group.id.canonicalPath(base))))
}
Expand All @@ -245,10 +246,8 @@ object Group {
clashingIds.isEmpty
}

private def noCyclicDependencies(group: Group): Validator[Set[PathId]] =
isTrue("Dependency graph has cyclic dependencies.") { _ =>
group.hasNonCyclicDependencies
}
private def noCyclicDependencies: Validator[Group] =
isTrue("Dependency graph has cyclic dependencies.") { _.hasNonCyclicDependencies }

private def validPorts: Validator[Group] = {
new Validator[Group] {
Expand Down
12 changes: 11 additions & 1 deletion src/main/scala/mesosphere/marathon/state/PathId.scala
Original file line number Diff line number Diff line change
Expand Up @@ -72,9 +72,19 @@ case class PathId(path: List[String], absolute: Boolean = true) extends Ordered[
path.zip(definition.path).forall { case (left, right) => left == right }
}

override def toString: String = toString("/")
override val toString: String = toString("/")

private def toString(delimiter: String): String = path.mkString(if (absolute) delimiter else "", delimiter, "")

override def equals(obj: Any): Boolean = {
obj match {
case that: PathId => (that eq this) || (that.toString == toString)
case _ => false
}
}

override def hashCode(): Int = toString.hashCode()

override def compare(that: PathId): Int = {
import Ordering.Implicits._
val seqOrder = implicitly(Ordering[List[String]])
Expand Down
31 changes: 25 additions & 6 deletions src/main/scala/mesosphere/marathon/upgrade/DeploymentPlan.scala
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,7 @@ object DeploymentPlan {
* similar logic.
*/
private[upgrade] def appsGroupedByLongestPath(
affectedAppIds: Set[PathId],
group: Group): SortedMap[Int, Set[AppDefinition]] = {

import org.jgrapht.DirectedGraph
Expand All @@ -193,9 +194,11 @@ object DeploymentPlan {

}

val unsortedEquivalenceClasses = group.transitiveApps.groupBy { app =>
longestPathFromVertex(group.dependencyGraph, app).length
}
val unsortedEquivalenceClasses = group.transitiveApps
.filter(app => affectedAppIds.contains(app.id))
.groupBy { app =>
longestPathFromVertex(group.dependencyGraph, app).length
}

SortedMap(unsortedEquivalenceClasses.toSeq: _*)
}
Expand All @@ -204,12 +207,12 @@ object DeploymentPlan {
* Returns a sequence of deployment steps, the order of which is derived
* from the topology of the target group's dependency graph.
*/
def dependencyOrderedSteps(original: Group, target: Group,
def dependencyOrderedSteps(original: Group, target: Group, affectedIds: Set[PathId],
toKill: Map[PathId, Iterable[Task]]): Seq[DeploymentStep] = {
val originalApps: Map[PathId, AppDefinition] =
original.transitiveApps.map(app => app.id -> app).toMap

val appsByLongestPath: SortedMap[Int, Set[AppDefinition]] = appsGroupedByLongestPath(target)
val appsByLongestPath: SortedMap[Int, Set[AppDefinition]] = appsGroupedByLongestPath(affectedIds, target)

appsByLongestPath.valuesIterator.map { (equivalenceClass: Set[AppDefinition]) =>
val actions: Set[DeploymentAction] = equivalenceClass.flatMap { (newApp: AppDefinition) =>
Expand Down Expand Up @@ -281,6 +284,22 @@ object DeploymentPlan {
}.to[Seq]
)

// applications that are either new or the specs are different should be considered for the dependency graph
val addedOrChanged: Set[PathId] = targetApps.flatMap {
case (appId, spec) =>
if (!originalApps.contains(appId) ||
(originalApps.contains(appId) && originalApps(appId) != spec)) {
// the above could be optimized/refined further by checking the version info. The tests are actually
// really bad about structuring this correctly though, so for now, we just make sure that
// the specs are different (or brand new)
Some(appId)
}
else {
None
}
}(collection.breakOut)
val affectedApplications = addedOrChanged ++ (originalApps.keySet -- targetApps.keySet)

// 3. For each app in each dependency class,
//
// A. If this app is new, scale to the target number of instances.
Expand All @@ -293,7 +312,7 @@ object DeploymentPlan {
// the old app or the new app, whichever is less.
// ii. Restart the app, up to the new target number of instances.
//
steps ++= dependencyOrderedSteps(original, target, toKill)
steps ++= dependencyOrderedSteps(original, target, affectedApplications, toKill)

// Build the result.
val result = DeploymentPlan(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -241,14 +241,14 @@ class MarathonSchedulerActorTest extends MarathonActorSupport
upgradeStrategy = UpgradeStrategy(0.5),
versionInfo = AppDefinition.VersionInfo.forNewConfig(Timestamp(0))
)
val origGroup = Group(PathId("/foo/bar"), Set(app))
val origGroup = Group(PathId("/"), groups = Set(Group(PathId("/foo/bar"), Set(app))))

val appNew = app.copy(
cmd = Some("cmd new"),
versionInfo = AppDefinition.VersionInfo.forNewConfig(Timestamp(1000))
)

val targetGroup = Group(PathId("/foo/bar"), Set(appNew))
val targetGroup = Group(PathId("/"), groups = Set(Group(PathId("/foo/bar"), Set(appNew))))

val plan = DeploymentPlan("foo", origGroup, targetGroup, Nil, Timestamp.now())

Expand Down Expand Up @@ -328,7 +328,7 @@ class MarathonSchedulerActorTest extends MarathonActorSupport
upgradeStrategy = UpgradeStrategy(0.5),
versionInfo = AppDefinition.VersionInfo.forNewConfig(Timestamp(0))
)
val group = Group(PathId("/foo/bar"), Set(app))
val group = Group(PathId("/"), groups = Set(Group(PathId("/foo/bar"), Set(app))))

val plan = DeploymentPlan(Group.empty, group)

Expand Down Expand Up @@ -365,7 +365,7 @@ class MarathonSchedulerActorTest extends MarathonActorSupport
upgradeStrategy = UpgradeStrategy(0.5),
versionInfo = AppDefinition.VersionInfo.forNewConfig(Timestamp(0))
)
val group = Group(PathId("/foo/bar"), Set(app))
val group = Group(PathId("/"), groups = Set(Group(PathId("/foo/bar"), Set(app))))

val plan = DeploymentPlan(Group.empty, group)

Expand Down Expand Up @@ -409,7 +409,7 @@ class MarathonSchedulerActorTest extends MarathonActorSupport

test("Forced deployment") {
val app = AppDefinition(id = PathId("app1"), cmd = Some("cmd"), instances = 2, upgradeStrategy = UpgradeStrategy(0.5))
val group = Group(PathId("/foo/bar"), Set(app))
val group = Group(PathId("/"), groups = Set(Group(PathId("/foo/bar"), Set(app))))

val plan = DeploymentPlan(Group.empty, group)

Expand Down Expand Up @@ -437,7 +437,7 @@ class MarathonSchedulerActorTest extends MarathonActorSupport

test("Cancellation timeout") {
val app = AppDefinition(id = PathId("app1"), cmd = Some("cmd"), instances = 2, upgradeStrategy = UpgradeStrategy(0.5))
val group = Group(PathId("/foo/bar"), Set(app))
val group = Group(PathId("/"), groups = Set(Group(PathId("/foo/bar"), Set(app))))

val plan = DeploymentPlan(Group.empty, group)

Expand Down
17 changes: 9 additions & 8 deletions src/test/scala/mesosphere/marathon/state/GroupTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -362,14 +362,15 @@ class GroupTest extends FunSpec with GivenWhenThen with Matchers {

it("detects a cyclic dependency graph") {
Given("a group with cyclic dependencies")
val current: Group = Group("/test".toPath, groups = Set(
Group("/test/database".toPath, groups = Set(
Group("/test/database/mongo".toPath, Set(AppDefinition("/test/database/mongo/m1".toPath, dependencies = Set("/test/service".toPath))))
)),
Group("/test/service".toPath, groups = Set(
Group("/test/service/service1".toPath, Set(AppDefinition("/test/service/service1/srv1".toPath, dependencies = Set("/test/database".toPath))))
))
))
val current: Group = Group.empty.copy(groups = Set(
Group("/test".toPath, groups = Set(
Group("/test/database".toPath, groups = Set(
Group("/test/database/mongo".toPath, Set(AppDefinition("/test/database/mongo/m1".toPath, dependencies = Set("/test/service".toPath))))
)),
Group("/test/service".toPath, groups = Set(
Group("/test/service/service1".toPath, Set(AppDefinition("/test/service/service1/srv1".toPath, dependencies = Set("/test/database".toPath))))
))
))))

Then("the cycle is detected")
current.hasNonCyclicDependencies should equal(false)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,13 +41,13 @@ class DeploymentActorTest
val app2 = AppDefinition(id = PathId("/app2"), cmd = Some("cmd"), instances = 1)
val app3 = AppDefinition(id = PathId("/app3"), cmd = Some("cmd"), instances = 1)
val app4 = AppDefinition(id = PathId("/app4"), cmd = Some("cmd"))
val origGroup = Group(PathId("/foo/bar"), Set(app1, app2, app4))
val origGroup = Group(PathId("/"), groups = Set(Group(PathId("/foo/bar"), Set(app1, app2, app4))))

val version2 = AppDefinition.VersionInfo.forNewConfig(Timestamp(1000))
val app1New = app1.copy(instances = 1, versionInfo = version2)
val app2New = app2.copy(instances = 2, cmd = Some("otherCmd"), versionInfo = version2)

val targetGroup = Group(PathId("/foo/bar"), Set(app1New, app2New, app3))
val targetGroup = Group(PathId("/"), groups = Set(Group(PathId("/foo/bar"), Set(app1New, app2New, app3))))

// setting started at to 0 to make sure this survives
val task1_1 = MarathonTestHelper.runningTask("task1_1", appVersion = app1.version, startedAt = 0)
Expand Down Expand Up @@ -118,12 +118,12 @@ class DeploymentActorTest
val managerProbe = TestProbe()
val receiverProbe = TestProbe()
val app = AppDefinition(id = PathId("/app1"), cmd = Some("cmd"), instances = 2)
val origGroup = Group(PathId("/foo/bar"), Set(app))
val origGroup = Group(PathId("/"), groups = Set(Group(PathId("/foo/bar"), Set(app))))

val version2 = AppDefinition.VersionInfo.forNewConfig(Timestamp(1000))
val appNew = app.copy(cmd = Some("cmd new"), versionInfo = version2)

val targetGroup = Group(PathId("/foo/bar"), Set(appNew))
val targetGroup = Group(PathId("/"), groups = Set(Group(PathId("/foo/bar"), Set(appNew))))

val task1_1 = MarathonTestHelper.runningTask("task1_1", appVersion = app.version, startedAt = 0)
val task1_2 = MarathonTestHelper.runningTask("task1_2", appVersion = app.version, startedAt = 1000)
Expand Down Expand Up @@ -171,11 +171,11 @@ class DeploymentActorTest
val receiverProbe = TestProbe()

val app = AppDefinition(id = PathId("/app1"), cmd = Some("cmd"), instances = 0)
val origGroup = Group(PathId("/foo/bar"), Set(app))
val origGroup = Group(PathId("/"), groups = Set(Group(PathId("/foo/bar"), Set(app))))

val version2 = AppDefinition.VersionInfo.forNewConfig(Timestamp(1000))
val appNew = app.copy(cmd = Some("cmd new"), versionInfo = version2)
val targetGroup = Group(PathId("/foo/bar"), Set(appNew))
val targetGroup = Group(PathId("/"), groups = Set(Group(PathId("/foo/bar"), Set(appNew))))

val plan = DeploymentPlan("foo", origGroup, targetGroup, List(DeploymentStep(List(RestartApplication(appNew)))), Timestamp.now())

Expand All @@ -196,12 +196,12 @@ class DeploymentActorTest
val managerProbe = TestProbe()
val receiverProbe = TestProbe()
val app1 = AppDefinition(id = PathId("/app1"), cmd = Some("cmd"), instances = 3)
val origGroup = Group(PathId("/foo/bar"), Set(app1))
val origGroup = Group(PathId("/"), groups = Set(Group(PathId("/foo/bar"), Set(app1))))

val version2 = AppDefinition.VersionInfo.forNewConfig(Timestamp(1000))
val app1New = app1.copy(instances = 2, versionInfo = version2)

val targetGroup = Group(PathId("/foo/bar"), Set(app1New))
val targetGroup = Group(PathId("/"), groups = Set(Group(PathId("/foo/bar"), Set(app1New))))

val task1_1 = MarathonTestHelper.runningTask("task1_1", appVersion = app1.version, startedAt = 0)
val task1_2 = MarathonTestHelper.runningTask("task1_2", appVersion = app1.version, startedAt = 500)
Expand Down
Loading

0 comments on commit e53912f

Please sign in to comment.