Skip to content

Commit

Permalink
Implementation of zombie cluster detection (#587)
Browse files Browse the repository at this point in the history
* Implementation of zombie cluster detection

* Minor tweaks

* Don't zombify creating clusters, and fix error message

* Also check the project's billing

* Update libs dependency for bugfix

* PR feedback

* Add unit test to make sure Google errors don't cause zombification

* Update wb-libs to non-SNAP
  • Loading branch information
rtitle authored Sep 24, 2018
1 parent 5464cf3 commit 0052cee
Show file tree
Hide file tree
Showing 10 changed files with 403 additions and 5 deletions.
9 changes: 9 additions & 0 deletions leonardo-example.conf
Original file line number Diff line number Diff line change
Expand Up @@ -164,3 +164,12 @@ autoFreeze {
#This is the polling period to poll for clusters that can be auto stopped/frozen
autoFreezeCheckScheduler = 1 minutes
}

# Configures the "zombie cluster" monitor, which periodically sweeps the database and checks for clusters
# and projects that no longer exist in Google.
zombieClusterMonitor {
# Whether the zombie cluster monitor is enabled
enableZombieClusterMonitor = true
# The period to check for zombie clusters and projects
pollPeriod = 30 minutes
}
2 changes: 1 addition & 1 deletion project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ object Dependencies {

val workbenchUtilV = "0.3-0e9d080"
val workbenchModelV = "0.11-2bddd5b"
val workbenchGoogleV = "0.16-7c48765"
val workbenchGoogleV = "0.16-4fe117d"
val workbenchMetricsV = "0.3-c5b80d2"

val samV = "1.0-5cdffb4"
Expand Down
5 changes: 5 additions & 0 deletions src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -101,3 +101,8 @@ jupyterConfig {
# https://*.npmjs.org and 'unsafe-eval' needed for jupyterlab
contentSecurityPolicy = "frame-ancestors 'self' http://localhost:3000 https://bvdp-saturn-prod.appspot.com https://bvdp-saturn-dev.appspot.com https://localhost:443; script-src 'self' 'unsafe-inline' 'unsafe-eval' https://apis.google.com ; style-src 'self' 'unsafe-inline'; connect-src 'self' wss://*.broadinstitute.org:* wss://notebooks.firecloud.org:* *.googleapis.com https://*.npmjs.org"
}

# Configures the "zombie cluster" monitor, which periodically sweeps the database
# and checks for clusters and projects that no longer exist in Google.
zombieClusterMonitor {
  # Whether the zombie cluster monitor is enabled
  enableZombieClusterMonitor = true
  # The period to check for zombie clusters and projects
  pollPeriod = 30 minutes
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,16 @@ import com.typesafe.config.ConfigFactory
import com.typesafe.scalalogging.LazyLogging
import net.ceedubs.ficus.Ficus._
import org.broadinstitute.dsde.workbench.google.GoogleCredentialModes.{Pem, Token}
import org.broadinstitute.dsde.workbench.google.{GoogleStorageDAO, HttpGoogleIamDAO, HttpGoogleStorageDAO}
import org.broadinstitute.dsde.workbench.google.{GoogleStorageDAO, HttpGoogleIamDAO, HttpGoogleStorageDAO, HttpGoogleProjectDAO}
import org.broadinstitute.dsde.workbench.leonardo.api.{LeoRoutes, StandardUserInfoDirectives}
import org.broadinstitute.dsde.workbench.leonardo.auth.{LeoAuthProviderHelper, ServiceAccountProviderHelper}
import org.broadinstitute.dsde.workbench.leonardo.config.{AutoFreezeConfig, ClusterDefaultsConfig, ClusterFilesConfig, ClusterResourcesConfig, DataprocConfig, MonitorConfig, ProxyConfig, SamConfig, SwaggerConfig}
import org.broadinstitute.dsde.workbench.leonardo.config.{AutoFreezeConfig, ClusterDefaultsConfig, ClusterFilesConfig, ClusterResourcesConfig, DataprocConfig, MonitorConfig, ProxyConfig, SamConfig, SwaggerConfig, ZombieClusterConfig}
import org.broadinstitute.dsde.workbench.leonardo.dao.{HttpJupyterDAO, HttpSamDAO}
import org.broadinstitute.dsde.workbench.leonardo.dao.google.{HttpGoogleComputeDAO, HttpGoogleDataprocDAO}
import org.broadinstitute.dsde.workbench.leonardo.db.DbReference
import org.broadinstitute.dsde.workbench.leonardo.dns.ClusterDnsCache
import org.broadinstitute.dsde.workbench.leonardo.model.google.{ClusterStatus, NetworkTag, VPCNetworkName, VPCSubnetName}
import org.broadinstitute.dsde.workbench.leonardo.monitor.{ClusterDateAccessedActor, ClusterMonitorSupervisor}
import org.broadinstitute.dsde.workbench.leonardo.monitor.{ClusterDateAccessedActor, ClusterMonitorSupervisor, ZombieClusterMonitor}
import org.broadinstitute.dsde.workbench.leonardo.monitor.ClusterMonitorSupervisor._
import org.broadinstitute.dsde.workbench.leonardo.service.{LeonardoService, ProxyService, StatusService}
import org.broadinstitute.dsde.workbench.leonardo.util.BucketHelper
Expand Down Expand Up @@ -52,6 +52,7 @@ object Boot extends App with LazyLogging {
val samConfig = config.as[SamConfig]("sam")
val autoFreezeConfig = config.as[AutoFreezeConfig]("autoFreeze")
val contentSecurityPolicy = config.as[Option[String]]("jupyterConfig.contentSecurityPolicy").getOrElse("default-src: 'self'")
val zombieClusterMonitorConfig = config.as[ZombieClusterConfig]("zombieClusterMonitor")

// we need an ActorSystem to host our application in
implicit val system = ActorSystem("leonardo")
Expand Down Expand Up @@ -80,13 +81,15 @@ object Boot extends App with LazyLogging {
val googleComputeDAO = new HttpGoogleComputeDAO(dataprocConfig.applicationName, Pem(leoServiceAccountEmail, leoServiceAccountPemFile), "google")
val googleIamDAO = new HttpGoogleIamDAO(dataprocConfig.applicationName, Pem(leoServiceAccountEmail, leoServiceAccountPemFile), "google")
val googleStorageDAO = new HttpGoogleStorageDAO(dataprocConfig.applicationName, Pem(leoServiceAccountEmail, leoServiceAccountPemFile), "google")
val googleProjectDAO = new HttpGoogleProjectDAO(dataprocConfig.applicationName, Pem(leoServiceAccountEmail, leoServiceAccountPemFile), "google")
val samDAO = new HttpSamDAO(samConfig.server)
val clusterDnsCache = system.actorOf(ClusterDnsCache.props(proxyConfig, dbRef))
val jupyterDAO = new HttpJupyterDAO(clusterDnsCache)
val bucketHelper = new BucketHelper(dataprocConfig, gdDAO, googleComputeDAO, googleStorageDAO, serviceAccountProvider)
val clusterMonitorSupervisor = system.actorOf(ClusterMonitorSupervisor.props(monitorConfig, dataprocConfig, gdDAO, googleComputeDAO, googleIamDAO, googleStorageDAO, dbRef, clusterDnsCache, authProvider, autoFreezeConfig, jupyterDAO))
val leonardoService = new LeonardoService(dataprocConfig, clusterFilesConfig, clusterResourcesConfig, clusterDefaultsConfig, proxyConfig, swaggerConfig, autoFreezeConfig, gdDAO, googleComputeDAO, googleIamDAO, googleStorageDAO, petGoogleStorageDAO, dbRef, clusterMonitorSupervisor, authProvider, serviceAccountProvider, whitelist, bucketHelper, contentSecurityPolicy)
val clusterDateAccessedActor = system.actorOf(ClusterDateAccessedActor.props(autoFreezeConfig, dbRef))
val zombieClusterMonitor = system.actorOf(ZombieClusterMonitor.props(zombieClusterMonitorConfig, gdDAO, googleProjectDAO, dbRef))
val proxyService = new ProxyService(proxyConfig, gdDAO, dbRef, clusterDnsCache, authProvider, clusterDateAccessedActor)
val statusService = new StatusService(gdDAO, samDAO, dbRef, dataprocConfig)
val leoRoutes = new LeoRoutes(leonardoService, proxyService, statusService, swaggerConfig) with StandardUserInfoDirectives
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package org.broadinstitute.dsde.workbench.leonardo.config

import scala.concurrent.duration.FiniteDuration

/**
 * Configuration for the zombie cluster monitor.
 *
 * NOTE(review): these field names differ from the HOCON keys that populate them
 * ("enableZombieClusterMonitor" and "pollPeriod" in the ValueReader). The binding is
 * positional so it works, but consider aligning the names to avoid confusion.
 *
 * @param enableZombieClusterDetection whether the zombie cluster monitor is enabled
 * @param zombieCheckPeriod            how often to sweep the database for zombie clusters
 */
case class ZombieClusterConfig(enableZombieClusterDetection: Boolean,
                               zombieCheckPeriod: FiniteDuration)
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,12 @@ package object config {
toScalaDuration(config.getDuration("autoFreezeAfter")),
toScalaDuration(config.getDuration("autoFreezeCheckScheduler"))
)
}

/** Reads a [[ZombieClusterConfig]] from the "zombieClusterMonitor" config section. */
implicit val zombieClusterConfig: ValueReader[ZombieClusterConfig] = ValueReader.relative { config =>
  val monitorEnabled = config.getBoolean("enableZombieClusterMonitor")
  val sweepInterval = toScalaDuration(config.getDuration("pollPeriod"))
  ZombieClusterConfig(monitorEnabled, sweepInterval)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
package org.broadinstitute.dsde.workbench.leonardo.monitor

import java.time.Instant

import akka.actor.{Actor, Props, Timers}
import cats.implicits._
import com.typesafe.scalalogging.LazyLogging
import org.broadinstitute.dsde.workbench.google.GoogleProjectDAO
import org.broadinstitute.dsde.workbench.leonardo.config.ZombieClusterConfig
import org.broadinstitute.dsde.workbench.leonardo.dao.google.GoogleDataprocDAO
import org.broadinstitute.dsde.workbench.leonardo.db.DbReference
import org.broadinstitute.dsde.workbench.leonardo.model.google.ClusterStatus
import org.broadinstitute.dsde.workbench.leonardo.model.{Cluster, ClusterError}
import org.broadinstitute.dsde.workbench.leonardo.monitor.ZombieClusterMonitor._
import org.broadinstitute.dsde.workbench.model.google.GoogleProject

import scala.concurrent.Future

object ZombieClusterMonitor {

  /** Akka Props for constructing a [[ZombieClusterMonitor]] actor. */
  def props(config: ZombieClusterConfig, gdDAO: GoogleDataprocDAO, googleProjectDAO: GoogleProjectDAO, dbRef: DbReference): Props =
    Props(new ZombieClusterMonitor(config, gdDAO, googleProjectDAO, dbRef))

  /** Messages understood by the zombie cluster monitor actor. */
  sealed trait ZombieClusterMonitorMessage
  /** Triggers a sweep of the database for zombie clusters. */
  case object DetectZombieClusters extends ZombieClusterMonitorMessage
  /** Key identifying the periodic sweep timer. */
  case object TimerKey extends ZombieClusterMonitorMessage
}

/**
 * This monitor periodically sweeps the Leo database and checks for clusters which no longer exist
 * in Google ("zombies"). A cluster is a zombie if its Google project is inactive (or its billing
 * is disabled), or if Dataproc no longer reports an active status for it. Zombies are moved to
 * Error status with an explanatory ClusterError so users know to delete and recreate them.
 */
class ZombieClusterMonitor(config: ZombieClusterConfig, gdDAO: GoogleDataprocDAO, googleProjectDAO: GoogleProjectDAO, dbRef: DbReference) extends Actor with Timers with LazyLogging {
  import context._

  override def preStart(): Unit = {
    super.preStart()
    // Honor the enable flag from config. Previously enableZombieClusterDetection existed
    // but was never consulted, so the monitor ran even when disabled.
    if (config.enableZombieClusterDetection) {
      timers.startPeriodicTimer(TimerKey, DetectZombieClusters, config.zombieCheckPeriod)
    } else {
      logger.info("Zombie cluster detection is disabled in config; not scheduling sweeps")
    }
  }

  override def receive: Receive = {
    case DetectZombieClusters =>
      // Get active clusters from the Leo DB, grouped by project
      val zombieClusters = getActiveClustersFromDatabase.flatMap { clusterMap =>
        clusterMap.toList.flatTraverse { case (project, clusters) =>
          // Check if the project is active
          isProjectActiveInGoogle(project).flatMap {
            case true =>
              // If the project is active, check each individual cluster
              logger.debug(s"Project ${project.value} containing ${clusters.size} clusters is active in Google")
              clusters.toList.traverseFilter { cluster =>
                isClusterActiveInGoogle(cluster).map {
                  case true =>
                    logger.debug(s"Cluster ${cluster.projectNameString} is active in Google")
                    None
                  case false =>
                    logger.debug(s"Cluster ${cluster.projectNameString} is a zombie!")
                    Some(cluster)
                }
              }
            case false =>
              // If the project is inactive, all clusters in the project are zombies
              logger.debug(s"Project ${project.value} containing ${clusters.size} clusters is inactive in Google")
              Future.successful(clusters.toList)
          }
        }
      }

      // Error out each detected zombie cluster
      zombieClusters.flatMap { cs =>
        logger.info(s"Detected ${cs.size} zombie clusters across ${cs.map(_.googleProject).toSet.size} projects.")
        cs.traverse { cluster =>
          handleZombieCluster(cluster)
        }
      }.failed.foreach { e =>
        // Previously a failure anywhere in this chain (e.g. a DB error, which the per-call
        // recovers below do not cover) was silently swallowed with the Future.
        logger.error("Unexpected error during zombie cluster detection", e)
      }
  }

  /** Loads all active clusters from the Leo database, grouped by Google project. */
  private def getActiveClustersFromDatabase: Future[Map[GoogleProject, Seq[Cluster]]] = {
    dbRef.inTransaction {
      _.clusterQuery.listActive
    } map { clusters =>
      clusters.groupBy(_.googleProject)
    }
  }

  /**
   * Returns true if the project exists AND its billing is active in Google.
   * Google errors are treated as "active" so transient API failures never zombify clusters.
   */
  private def isProjectActiveInGoogle(googleProject: GoogleProject): Future[Boolean] = {
    // Check the project and its billing info
    (googleProjectDAO.isProjectActive(googleProject.value) |@| googleProjectDAO.isBillingActive(googleProject.value))
      .map(_ && _)
      .recover { case e =>
        logger.warn(s"Unable to check status of project ${googleProject.value} for zombie cluster detection", e)
        true
      }
  }

  /**
   * Returns true if Dataproc reports an active status for the cluster.
   * Google errors are treated as "active" so transient API failures never zombify clusters.
   */
  private def isClusterActiveInGoogle(cluster: Cluster): Future[Boolean] = {
    // Clusters in Creating status may not yet exist in Google. Therefore treat all Creating clusters as active.
    if (cluster.status == ClusterStatus.Creating) {
      Future.successful(true)
    } else {
      // Check if status returned by GoogleDataprocDAO is an "active" status.
      gdDAO.getClusterStatus(cluster.googleProject, cluster.clusterName) map { clusterStatus =>
        ClusterStatus.activeStatuses contains clusterStatus
      } recover { case e =>
        logger.warn(s"Unable to check status of cluster ${cluster.projectNameString} for zombie cluster detection", e)
        true
      }
    }
  }

  /** Transitions a zombie cluster to Error status and records an explanatory ClusterError. */
  private def handleZombieCluster(cluster: Cluster): Future[Unit] = {
    logger.info(s"Erroring zombie cluster: ${cluster.projectNameString}")
    dbRef.inTransaction { dataAccess =>
      for {
        _ <- dataAccess.clusterQuery.updateClusterStatus(cluster.id, ClusterStatus.Error)
        error = ClusterError("An underlying resource was removed in Google. Please delete and recreate your cluster.", -1, Instant.now)
        _ <- dataAccess.clusterErrorQuery.save(cluster.id, error)
      } yield ()
    }.void
  }
}
5 changes: 5 additions & 0 deletions src/test/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -196,3 +196,8 @@ jupyterConfig {
# https://*.npmjs.org and 'unsafe-eval' needed for jupyterlab
contentSecurityPolicy = "frame-ancestors 'self' http://localhost:3000 https://bvdp-saturn-prod.appspot.com https://bvdp-saturn-dev.appspot.com https://localhost:443; script-src 'self' 'unsafe-inline' 'unsafe-eval' https://apis.google.com ; style-src 'self' 'unsafe-inline'; connect-src 'self' wss://*.broadinstitute.org:* *.googleapis.com https://*.npmjs.org"
}

# Test configuration for the zombie cluster monitor.
zombieClusterMonitor {
  # Whether the zombie cluster monitor is enabled
  enableZombieClusterMonitor = true
  # Short poll period so unit tests observe sweeps quickly
  pollPeriod = 1 second
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import net.ceedubs.ficus.Ficus._
import org.broadinstitute.dsde.workbench.google.mock.MockGoogleDataprocDAO
import org.broadinstitute.dsde.workbench.leonardo.auth.WhitelistAuthProvider
import org.broadinstitute.dsde.workbench.leonardo.auth.sam.MockPetClusterServiceAccountProvider
import org.broadinstitute.dsde.workbench.leonardo.config.{AutoFreezeConfig, ClusterDefaultsConfig, ClusterFilesConfig, ClusterResourcesConfig, DataprocConfig, MonitorConfig, ProxyConfig, SwaggerConfig}
import org.broadinstitute.dsde.workbench.leonardo.config.{AutoFreezeConfig, ClusterDefaultsConfig, ClusterFilesConfig, ClusterResourcesConfig, DataprocConfig, MonitorConfig, ProxyConfig, SwaggerConfig, ZombieClusterConfig}
import org.broadinstitute.dsde.workbench.leonardo.dao.google.MockGoogleComputeDAO
import org.broadinstitute.dsde.workbench.leonardo.dao.{MockJupyterDAO, MockSamDAO}
import org.broadinstitute.dsde.workbench.leonardo.db.TestComponent
Expand Down Expand Up @@ -52,6 +52,7 @@ trait CommonTestData{ this: ScalaFutures =>
val proxyConfig = config.as[ProxyConfig]("proxy")
val swaggerConfig = config.as[SwaggerConfig]("swagger")
val autoFreezeConfig = config.as[AutoFreezeConfig]("autoFreeze")
val zombieClusterConfig = config.as[ZombieClusterConfig]("zombieClusterMonitor")
val clusterUrlBase = dataprocConfig.clusterUrlBase
val serviceAccountsConfig = config.getConfig("serviceAccounts.config")
val monitorConfig = config.as[MonitorConfig]("monitor")
Expand Down
Loading

0 comments on commit 0052cee

Please sign in to comment.