Skip to content

Commit

Permalink
Allow multiple health check shields per task id
Browse files Browse the repository at this point in the history
* Allow multiple health check shields per task id
  Shield is now identified by /v2/shield/<task_id>/<shield_name>

* Renames after live review

* Fixed tests

* Better error handling in the actor
  • Loading branch information
ezsilmar authored and Lqp1 committed Feb 4, 2021
1 parent ecf0a1c commit 05e80c6
Show file tree
Hide file tree
Showing 19 changed files with 492 additions and 235 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
{
"taskId": "incubator_marathon-test-suite_ezhirov_health-check-shield-app.instance-4d0bc714-0fa5-11eb-9138-6ecaa32f70ed._app.1",
"shieldName": "RD-remdbg",
"until": "2020-10-16T12:17:10.856Z"
}
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
[
{
"taskId": "incubator_marathon-test-suite_ezhirov_health-check-shield-app.instance-4d0bc714-0fa5-11eb-9138-6ecaa32f70ed._app.1",
"shieldName": "RD-remdbg",
"until": "2020-10-16T12:17:10.856Z"
},
{
"taskId": "incubator_marathon-test-suite_ezhirov_health-check-shield-app.instance-63450465-0fa5-11eb-9138-6ecaa32f70ed._app.1",
"shieldName": "TA-memdump",
"until": "2020-10-16T12:17:48.184Z"
}
]
49 changes: 45 additions & 4 deletions docs/docs/rest-api/public/api/v2/shield.raml
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,15 @@ description: |
/:
description: |
List current health check shields
List all health check shields
get:
description: |
Lists all health check shields currently saved in the persistence storage.
Lists all health check shields.
Note that some shields could already be expired because the purge is done periodically.
responses:
200:
description: |
Json array containing the shield id and the timestamp when it will expire.
Json array containing the task id, shield name, and the timestamp when it will expire.
body:
application/json:
type: healthCheck.HealthCheckShield[]
Expand All @@ -21,14 +21,54 @@ description: |
Health check shield API is disabled.
/{taskId}:
description: |
List all health check shields of the task
get:
description: |
Lists all health check shields of the task.
Note that some shields could already be expired because the purge is done periodically.
responses:
200:
description: |
Json array containing the task id, shield name, and the timestamp when it will expire.
body:
application/json:
type: healthCheck.HealthCheckShield[]
example: !include examples/health-check-shield-get.json
403:
description: |
Health check shield API is disabled.
/{taskId}/{shieldName}:
description: |
Create, update, or delete a health check shield
uriParameters:
taskId:
type: strings.TaskId
description: |
Id of the task, as displayed in the Marathon UI
shieldName:
type: string
description: |
The name of the shield.
Allows to define multiple shields for a single task as well as to distinguish who put the shield
get:
description: |
If a shield with this name is defined for this task id, returns the info about this shield.
responses:
200:
description: |
Json object containing the task id, shield name, and the timestamp when it will expire.
body:
application/json:
type: healthCheck.HealthCheckShield
example: !include examples/health-check-shield-get-single.json
403:
description: |
Health check shield API is disabled.
404:
description: |
Health check with this name and task id does not exist
put:
description: |
Create or update the shield for the specified task.
Expand All @@ -37,6 +77,7 @@ description: |
Note that the shield operates on task level. After the corresponding Mesos container is gone the shield is useless.
Marathon could still kill the instance for another reason like deployment.
Mesos health checks are not supported as Mesos would kill the app without asking Marathon.
Multiple shields for the same application work in parallel. The app is protected until all of the shields are deleted or expired.
is: [ secured ]
queryParameters:
duration:
Expand Down
1 change: 1 addition & 0 deletions docs/docs/rest-api/public/api/v2/types/healthCheck.raml
Original file line number Diff line number Diff line change
Expand Up @@ -144,4 +144,5 @@ types:
type: object
properties:
taskId: string
shieldName: string
until: string
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,15 @@ import javax.ws.rs.core.Response.Status
import javax.ws.rs.core.Response
import javax.ws.rs.core.{Context, MediaType}
import mesosphere.marathon.api._
import mesosphere.marathon.core.health.HealthCheckManager
import mesosphere.marathon.core.health.{HealthCheckShieldApi, HealthCheckShield}
import mesosphere.marathon.core.task.Task
import mesosphere.marathon.plugin.auth.{Authenticator, Authorizer}
import mesosphere.marathon.raml.Raml
import mesosphere.marathon.raml.HealthConversion._
import mesosphere.marathon.api.RestResource.RestStreamingBody
import mesosphere.marathon.core.group.GroupManager
import mesosphere.marathon.plugin.auth._
import mesosphere.marathon.state.Timestamp

import scala.async.Async._
import scala.concurrent.ExecutionContext
Expand All @@ -27,29 +28,73 @@ import scala.concurrent.duration._
@Produces(Array(MediaType.APPLICATION_JSON))
class HealthCheckShieldResource @Inject() (
val config: MarathonConf,
val healthCheckManager: HealthCheckManager,
val healthCheckShieldApi: HealthCheckShieldApi,
val groupManager: GroupManager,
val authenticator: Authenticator,
val authorizer: Authorizer)(implicit val executionContext: ExecutionContext) extends AuthResource with StrictLogging {

@GET
def index(
def getShields(
@Context req: HttpServletRequest, @Suspended asyncResponse: AsyncResponse): Unit = sendResponse(asyncResponse) {
async {
if (config.healthCheckShieldFeatureEnabled) {
implicit val identity = await(authenticatedAsync(req))
val shields = await (healthCheckManager.listShields())
val shields = await (healthCheckShieldApi.getShields())
ok(Raml.toRaml(shields))
} else {
featureDisabledError()
}
}
}

@PUT
@GET
@Path("{taskId}")
def getShields(
@PathParam("taskId") taskId: String,
@Context req: HttpServletRequest, @Suspended asyncResponse: AsyncResponse): Unit = sendResponse(asyncResponse) {
async {
if (config.healthCheckShieldFeatureEnabled) {
implicit val identity = await(authenticatedAsync(req))
val parsedTaskId = Task.Id.parse(taskId)
val shields = await (healthCheckShieldApi.getShields(parsedTaskId))
ok(Raml.toRaml(shields))
} else {
featureDisabledError()
}
}
}

@GET
@Path("{taskId}/{shieldName}")
def getShield(
@PathParam("taskId") taskId: String,
@PathParam("shieldName") shieldName: String,
@Context req: HttpServletRequest, @Suspended asyncResponse: AsyncResponse): Unit = sendResponse(asyncResponse) {
async {
if (config.healthCheckShieldFeatureEnabled) {
implicit val identity = await(authenticatedAsync(req))
val parsedTaskId = Task.Id.parse(taskId)
val shieldId = HealthCheckShield.Id(parsedTaskId, shieldName)
val maybeShield = await (healthCheckShieldApi.getShield(shieldId))
maybeShield match {
case Some(shield) => {
ok(Raml.toRaml(shield))
}
case None => {
Response.status(Status.NOT_FOUND).build()
}
}
} else {
featureDisabledError()
}
}
}

@PUT
@Path("{taskId}/{shieldName}")
def putHealthShield(
@PathParam("taskId") taskId: String,
@PathParam("shieldName") shieldName: String,
@QueryParam("duration")@DefaultValue("30 minutes") duration: String,
@Context req: HttpServletRequest, @Suspended asyncResponse: AsyncResponse): Unit = sendResponse(asyncResponse) {
async {
Expand All @@ -66,7 +111,8 @@ class HealthCheckShieldResource @Inject() (
val parsedDuration = Duration(duration)
if (parsedDuration.isFinite() && parsedDuration < config.healthCheckShieldMaxDuration) {
val ttl = parsedDuration.asInstanceOf[FiniteDuration]
await (healthCheckManager.enableShield(parsedTaskId, ttl))
val shield = HealthCheckShield(HealthCheckShield.Id(parsedTaskId, shieldName), Timestamp.now() + ttl)
await (healthCheckShieldApi.addOrUpdate(shield))
ok()
} else {
Response.status(Status.BAD_REQUEST).entity(new RestStreamingBody(raml.Error(s"The duration should be finite and less than ${config.healthCheckShieldMaxDuration.toMinutes} minutes"))).build()
Expand All @@ -81,9 +127,10 @@ class HealthCheckShieldResource @Inject() (
}

@DELETE
@Path("{taskId}")
@Path("{taskId}/{shieldName}")
def removeHealthShield(
@PathParam("taskId") taskId: String,
@PathParam("shieldName") shieldName: String,
@Context req: HttpServletRequest, @Suspended asyncResponse: AsyncResponse): Unit = sendResponse(asyncResponse) {
async {
if (config.healthCheckShieldFeatureEnabled) {
Expand All @@ -96,8 +143,8 @@ class HealthCheckShieldResource @Inject() (
if (maybeRunSpec.isDefined && config.healthCheckShieldAuthorizationEnabled) {
checkAuthorization(UpdateRunSpec, maybeRunSpec.get)
}

await (healthCheckManager.disableShield(parsedTaskId))
val shieldId = HealthCheckShield.Id(parsedTaskId, shieldName)
await (healthCheckShieldApi.remove(shieldId))
ok()
} else {
featureDisabledError()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import mesosphere.marathon.core.appinfo.{AppInfoModule, AppInfoService, GroupInf
import mesosphere.marathon.core.deployment.DeploymentManager
import mesosphere.marathon.core.election.ElectionService
import mesosphere.marathon.core.group.GroupManager
import mesosphere.marathon.core.health.HealthCheckManager
import mesosphere.marathon.core.health.{HealthCheckManager, HealthCheckShieldApi}
import mesosphere.marathon.core.heartbeat.MesosHeartbeatMonitor
import mesosphere.marathon.core.instance.update.InstanceChangeHandler
import mesosphere.marathon.core.launcher.OfferProcessor
Expand Down Expand Up @@ -245,6 +245,9 @@ class CoreGuiceModule(cliConf: MarathonConf) extends AbstractModule {
@Provides @Singleton
def healthCheckManager(coreModule: CoreModule): HealthCheckManager = coreModule.healthModule.healthCheckManager

@Provides @Singleton
def healthCheckShieldApi(coreModule: CoreModule): HealthCheckShieldApi = coreModule.healthModule.healthCheckShieldApi

@Provides
@Singleton
def deploymentManager(coreModule: CoreModule): DeploymentManager = coreModule.deploymentModule.deploymentManager
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,10 +63,4 @@ trait HealthCheckManager {
* Returns the health status of all instances of the supplied app.
*/
def statuses(appId: AbsolutePathId): Future[Map[Instance.Id, Seq[Health]]]

def enableShield(taskId: Task.Id, duration: FiniteDuration): Future[Done]

def disableShield(taskId: Task.Id): Future[Done]

def listShields(): Future[Seq[HealthCheckShield]]
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,33 @@ package core.health
import mesosphere.marathon.state.Timestamp
import mesosphere.marathon.core.task.Task

case class HealthCheckShield(taskId: Task.Id, until: Timestamp) {
case class HealthCheckShield(id: HealthCheckShield.Id, until: Timestamp) {
}

object HealthCheckShield {
case class Id(taskId: Task.Id, shieldName: String) {
lazy val idString = s"${taskId.idString}.${shieldName}"

override def toString: String = s"shield [${idString}]"
}

object Id {
private val IdRegex = """^(.+)[\.]([^\.]+)$""".r

def parse(idString: String): HealthCheckShield.Id = {
idString match {
case IdRegex(taskIdString, shieldName) => {
try {
val taskId = Task.Id.parse(taskIdString)
Id(taskId, shieldName)
} catch {
case e: MatchError => {
throw new MatchError(s"Error while parsing shieldId '$idString': '$e'")
}
}
}
case _ => throw new MatchError(s"shieldId '$idString' unrecognized format")
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import mesosphere.marathon.core.health.impl.MarathonHealthCheckManager
import mesosphere.marathon.core.task.termination.KillService
import mesosphere.marathon.core.task.tracker.InstanceTracker
import mesosphere.marathon.storage.repository.HealthCheckShieldRepository
import mesosphere.marathon.core.health.impl.{HealthCheckShieldManager, HealthCheckShieldActor}
import mesosphere.marathon.core.health.impl.{HealthCheckShieldApiImpl, HealthCheckShieldActor}
import mesosphere.marathon.core.leadership.LeadershipModule

import scala.concurrent.ExecutionContext
Expand All @@ -26,17 +26,18 @@ class HealthModule(
conf: MarathonConf,
healthCheckShieldRepository: HealthCheckShieldRepository,
leadershipModule: LeadershipModule)(implicit mat: ActorMaterializer, ec: ExecutionContext) {
private val healthCheckShieldManager = new HealthCheckShieldManager(healthCheckShieldRepository, conf)
private val healthCheckShieldApiImpl = new HealthCheckShieldApiImpl(healthCheckShieldRepository, conf)
private val healthCheckShieldActor = leadershipModule.startWhenLeader(
HealthCheckShieldActor.props(healthCheckShieldManager, conf),
HealthCheckShieldActor.props(healthCheckShieldApiImpl, conf),
"HealthCheckShieldActor")

lazy val healthCheckShieldApi: HealthCheckShieldApi = healthCheckShieldApiImpl
lazy val healthCheckManager = new MarathonHealthCheckManager(
actorSystem,
killService,
eventBus,
instanceTracker,
groupManager,
conf,
healthCheckShieldManager)
healthCheckShieldApi)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package mesosphere.marathon
package core.health

import mesosphere.marathon.core.task.Task
import mesosphere.marathon.state.Timestamp

import scala.concurrent.duration._

import scala.concurrent.Future
import akka.{Done}

trait HealthCheckShieldApi {
// Thread-safe and scalable read path used during health check execution
def isShielded(taskId: Task.Id): Boolean

// User API surface
def getShields(): Future[Seq[HealthCheckShield]]
def getShields(taskId: Task.Id): Future[Seq[HealthCheckShield]]
def getShield(shieldId: HealthCheckShield.Id): Future[Option[HealthCheckShield]]

def addOrUpdate(shield: HealthCheckShield): Future[Done]
def remove(shieldId: HealthCheckShield.Id): Future[Done]
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ private[health] class HealthCheckActor(
instanceTracker: InstanceTracker,
eventBus: EventStream,
healthCheckHub: Sink[(AppDefinition, Instance, MarathonHealthCheck, ActorRef), NotUsed],
healthCheckShieldManager: HealthCheckShieldManager)
healthCheckShieldApi: HealthCheckShieldApi)
extends Actor with StrictLogging {

implicit val mat = ActorMaterializer()
Expand Down Expand Up @@ -118,7 +118,7 @@ private[health] class HealthCheckActor(
return
}

if (healthCheckShieldManager.isShielded(instance.appTask.taskId)) {
if (healthCheckShieldApi.isShielded(instance.appTask.taskId)) {
logger.info(s"[health-check-shield] app ${app.id} version ${app.version}. Won't kill $instanceId because the shield is enabled")
return
}
Expand Down Expand Up @@ -271,7 +271,7 @@ object HealthCheckActor {
instanceTracker: InstanceTracker,
eventBus: EventStream,
healthCheckHub: Sink[(AppDefinition, Instance, MarathonHealthCheck, ActorRef), NotUsed],
healthCheckShieldManager: HealthCheckShieldManager): Props = {
healthCheckShieldApi: HealthCheckShieldApi): Props = {

Props(new HealthCheckActor(
app,
Expand All @@ -281,7 +281,7 @@ object HealthCheckActor {
instanceTracker,
eventBus,
healthCheckHub,
healthCheckShieldManager))
healthCheckShieldApi))
}

// self-sent every healthCheck.intervalSeconds
Expand Down
Loading

0 comments on commit 05e80c6

Please sign in to comment.