Skip to content
This repository has been archived by the owner on Jun 20, 2024. It is now read-only.

Commit

Permalink
Adding a POST info route
Browse files Browse the repository at this point in the history
  • Loading branch information
nh13 committed Jun 27, 2017
1 parent be19367 commit a5237d1
Show file tree
Hide file tree
Showing 4 changed files with 156 additions and 21 deletions.
58 changes: 57 additions & 1 deletion webservice/src/main/scala/dagr/webservice/ApiDataModels.scala
Original file line number Diff line number Diff line change
Expand Up @@ -24,20 +24,76 @@

package dagr.webservice

import java.time.Instant

import dagr.core.DagrDef.TaskId
import dagr.core.exec.ResourceSet
import dagr.core.tasksystem.Task.TaskInfoLike
import dagr.tasks.DagrDef.FilePath

import scala.util.matching.Regex

/** Stores the data to be returned by an end-point. Make sure that there exists a protocol and any custom JSON
* handling specified in [[DagrApiJsonSupport]].
*/
sealed abstract class DagrResponse

case class DagrVersionResponse(id: String) extends DagrResponse

case class DagrStatusResponse(infos: Iterable[TaskInfoLike]) extends DagrResponse
case class DagrTaskInfosResponse(infos: Iterable[TaskInfoLike]) extends DagrResponse

case class DagrTaskScriptResponse(script: Option[FilePath]) extends DagrResponse

case class DagrTaskLogResponse(log: Option[FilePath]) extends DagrResponse

case class DagrTaskInfoResponse(info: TaskInfoLike) extends DagrResponse

case class TaskInfoQuery
(
name: Seq[String] = Seq.empty, // any name pattern
status: Seq[String] = Seq.empty, // name
id: Seq[TaskId] = Seq.empty, // any task ids
attempts: Seq[Int] = Seq.empty, // any attempt #
minResources: Option[ResourceSet] = None,
maxResources: Option[ResourceSet] = None,
since: Option[Instant] = None, // since using statusTime
until: Option[Instant] = None, // until using statusTime,
dependsOn: Seq[TaskId] = Seq.empty,
dependents: Seq[TaskId] = Seq.empty
) {
// TODO: filter out regexes that match the same stuff?
private val nameRegexes: Seq[Regex] = this.name.map(_.r)

def get(infos: Iterable[TaskInfoLike]): Iterable[TaskInfoLike] = infos
.filter { info => this.nameRegexes.isEmpty || this.nameRegexes.exists(_.findFirstIn(info.task.name).isDefined) }
.filter { info => this.status.isEmpty || this.status.contains(info.status.name)}
.filter { info => this.id.isEmpty || info.id.forall(this.id.contains(_)) }
.filter { info => this.attempts.isEmpty || this.attempts.contains(info.attempts) }
.filter { info => leq(this.minResources, info.resources) }
.filter { info => geq(this.maxResources, info.resources) }
.filter { info => this.since.forall(s => s.compareTo(info.statusTime) <= 0) }
.filter { info => this.until.forall(u => u.compareTo(info.statusTime) <= 0) }
.filter { info => this.dependsOn.isEmpty || intersects(this.dependsOn, info.task.tasksDependedOn.flatMap(_.taskInfo.id).toSeq) }
.filter { info => this.dependents.isEmpty || intersects(this.dependents, info.task.tasksDependingOnThisTask.flatMap(_.taskInfo.id).toSeq) }

private def intersects(left: Seq[TaskId], right: Seq[TaskId]): Boolean = {
left.exists { l => right.contains(l) }
}


/** Returns true left is not defined, or if left and right are defined and left is <= right in both cores and memory,
* false otherwise. */
private def leq(left: Option[ResourceSet], right: Option[ResourceSet]): Boolean = (left, right) match {
case (None, _) => true
case (Some(l), Some(r)) => l.cores <= r.cores && l.memory <= r.memory
case (Some(_), None) => false
}

/** Returns true left is not defined, or if left and right are defined and left is >= right in both cores and memory,
* false otherwise. */
private def geq(left: Option[ResourceSet], right: Option[ResourceSet]): Boolean = (left, right) match {
case (None, _) => true
case (Some(l), Some(r)) => l.cores >= r.cores && l.memory >= r.memory
case (Some(_), None) => false
}
}
30 changes: 20 additions & 10 deletions webservice/src/main/scala/dagr/webservice/DagrApiHandler.scala
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import akka.actor.{Actor, ActorSystem, Props}
import akka.util.Timeout
import com.fulcrumgenomics.commons.util.LazyLogging
import dagr.core.DagrDef._
import dagr.core.tasksystem.Task.TaskInfoLike
import dagr.core.tasksystem.Task.{TaskInfoLike, TaskStatus}
import dagr.webservice.PerRequest.RequestComplete
import spray.http.StatusCodes
import spray.httpx.SprayJsonSupport._
Expand All @@ -43,10 +43,11 @@ object DagrApiHandler {

sealed trait DagrRequest
final case class DagrVersionRequest() extends DagrRequest
final case class DagrStatusRequest() extends DagrRequest
final case class DagrTaskInfoRequest(id: TaskId) extends DagrRequest
final case class DagrTaskInfosRequest(status: Option[TaskStatus] = None) extends DagrRequest
final case class DagrTaskInfoQueryRequest(query: TaskInfoQuery) extends DagrRequest
final case class DagrTaskScriptRequest(id: TaskId) extends DagrRequest
final case class DagrTaskLogRequest(id: TaskId) extends DagrRequest
final case class DagrTaskInfoRequest(id: TaskId) extends DagrRequest
}

/** Receives a request, performs the appropriate logic, and sends back a response */
Expand All @@ -58,16 +59,18 @@ class DagrApiHandler(val taskInfoTracker: TaskInfoTracker) extends Actor with Da
implicit val system: ActorSystem = context.system

override def receive: PartialFunction[Any, Unit] = {
case DagrVersionRequest() =>
case DagrVersionRequest() =>
context.parent ! RequestComplete(StatusCodes.OK, DagrVersionResponse(DagrApiService.version))
case DagrStatusRequest() =>
context.parent ! RequestComplete(StatusCodes.OK, DagrStatusResponse(taskInfoTracker.infos))
case DagrTaskScriptRequest(id) =>
case DagrTaskInfoRequest(id) =>
applyTaskInfo(id, info => context.parent ! RequestComplete(StatusCodes.OK, DagrTaskInfoResponse(info)))
case DagrTaskInfosRequest(status) =>
context.parent ! RequestComplete(StatusCodes.OK, DagrTaskInfosResponse(getInfos(status)))
case DagrTaskInfoQueryRequest(query) =>
context.parent ! RequestComplete(StatusCodes.OK, DagrTaskInfosResponse(query.get(taskInfoTracker.infos)))
case DagrTaskScriptRequest(id) =>
applyTaskInfo(id, info => context.parent ! RequestComplete(StatusCodes.OK, DagrTaskScriptResponse(info.script)))
case DagrTaskLogRequest(id) =>
case DagrTaskLogRequest(id) =>
applyTaskInfo(id, info => context.parent ! RequestComplete(StatusCodes.OK, DagrTaskLogResponse(info.log)))
case DagrTaskInfoRequest(id) =>
applyTaskInfo(id, info => context.parent ! RequestComplete(StatusCodes.OK, DagrTaskInfoResponse(info)))
}

/** Handles the case that the specific task identifier does not exist and sends back a bad request message, otherwise,
Expand All @@ -79,5 +82,12 @@ class DagrApiHandler(val taskInfoTracker: TaskInfoTracker) extends Actor with Da
case None => context.parent ! RequestComplete(StatusCodes.NotFound, s"Task with id '$id' not found")
}
}

private def getInfos(status: Option[TaskStatus] = None): Iterable[TaskInfoLike] = {
status match {
case None => taskInfoTracker.infos
case Some(st) => taskInfoTracker.infos.filter { info => info.status == st }
}
}
}

49 changes: 43 additions & 6 deletions webservice/src/main/scala/dagr/webservice/DagrApiJsonSupport.scala
Original file line number Diff line number Diff line change
Expand Up @@ -31,17 +31,18 @@ import com.sun.xml.internal.ws.encoding.soap.DeserializationException
import dagr.core.DagrDef._
import dagr.core.exec.{Cores, Memory, ResourceSet}
import dagr.core.tasksystem.Task.{TaskInfoLike, TaskStatus, TimePoint}
import spray.httpx.SprayJsonSupport
import spray.json._

import scala.language.implicitConversions

/** Methods for formatting custom types in JSON. */
trait DagrApiJsonSupport extends DefaultJsonProtocol {
implicit val dagrVersionResponseProtocol: RootJsonFormat[DagrVersionResponse] = jsonFormat1(DagrVersionResponse)
implicit val dagrStatusResponseProtocol: RootJsonFormat[DagrStatusResponse] = jsonFormat1(DagrStatusResponse)
implicit val dagrTaskScriptResponseProtocol: RootJsonFormat[DagrTaskScriptResponse] = jsonFormat1(DagrTaskScriptResponse)
implicit val dagrTaskLogResponseProtocol: RootJsonFormat[DagrTaskLogResponse] = jsonFormat1(DagrTaskLogResponse)
implicit val dagrTaskInfoResponseProtocol: RootJsonFormat[DagrTaskInfoResponse] = jsonFormat1(DagrTaskInfoResponse)
trait DagrApiJsonSupport extends DefaultJsonProtocol with SprayJsonSupport {
implicit val dagrVersionResponseProtocol: RootJsonFormat[DagrVersionResponse] = jsonFormat1(DagrVersionResponse)
implicit val dagrTaskInfoResponseProtocol: RootJsonFormat[DagrTaskInfoResponse] = jsonFormat1(DagrTaskInfoResponse)
implicit val dagrInfosResponseProtocol: RootJsonFormat[DagrTaskInfosResponse] = jsonFormat1(DagrTaskInfosResponse)
implicit val dagrTaskScriptResponseProtocol: RootJsonFormat[DagrTaskScriptResponse] = jsonFormat1(DagrTaskScriptResponse)
implicit val dagrTaskLogResponseProtocol: RootJsonFormat[DagrTaskLogResponse] = jsonFormat1(DagrTaskLogResponse)

def taskInfoTracker: TaskInfoTracker

Expand Down Expand Up @@ -150,6 +151,42 @@ trait DagrApiJsonSupport extends DefaultJsonProtocol {
}
}

implicit object TaskInfoQueryFormat extends RootJsonFormat[TaskInfoQuery] {
override def write(query: TaskInfoQuery): JsObject = {
var map = Map.empty[String, JsValue]

map += ("name" -> query.name.toList.toJson)
map += ("status" -> query.status.toList.toJson)
map += ("id" -> query.id.toList.toJson)
map += ("attempts" -> query.attempts.toList.toJson)
query.minResources.foreach { r => map += ("min_resources" -> r.toJson) }
query.maxResources.foreach { r => map += ("max_resources" -> r.toJson) }
query.since.foreach { s => map += "since" -> s.toJson }
query.until.foreach { u => map += "until" -> u.toJson}
map += ("depends_on" -> query.dependsOn.toList.toJson)
map += ("dependents" -> query.dependents.toList.toJson)

JsObject(map)
}

override def read(json: JsValue): TaskInfoQuery = {
val jsObject = json.asJsObject

TaskInfoQuery(
name = if (jsObject.getFields("name").isEmpty) Seq.empty else jsObject.fields("name").convertTo[List[String]],
status = if (jsObject.getFields("status").isEmpty) Seq.empty else jsObject.fields("status").convertTo[List[String]],
id = if (jsObject.getFields("id").isEmpty) Seq.empty else jsObject.fields("id").convertTo[List[Int]].map(BigInt(_)),
attempts = if (jsObject.getFields("attempts").isEmpty) Seq.empty else jsObject.fields("attempts").convertTo[List[Int]],
minResources = if (jsObject.getFields("min_resources").isEmpty) None else jsObject.fields("min_resources").convertTo[Option[ResourceSet]],
maxResources = if (jsObject.getFields("max_resources").isEmpty) None else jsObject.fields("max_resources").convertTo[Option[ResourceSet]],
since = if (jsObject.getFields("since").isEmpty) None else jsObject.fields("since").convertTo[Option[Instant]],
until = if (jsObject.getFields("until").isEmpty) None else jsObject.fields("until").convertTo[Option[Instant]],
dependsOn = if (jsObject.getFields("depends_on").isEmpty) Seq.empty else jsObject.fields("depends_on").convertTo[List[Int]].map(BigInt(_)),
dependents = if (jsObject.getFields("dependents").isEmpty) Seq.empty else jsObject.fields("dependents").convertTo[List[Int]].map(BigInt(_))
)
}
}

/*
private def pathOrNone(json: JsObject, key: String): Option[FilePath] = {
if (json.getFields(key).nonEmpty) json.fields(key).convertTo[Option[FilePath]] else None
Expand Down
40 changes: 36 additions & 4 deletions webservice/src/main/scala/dagr/webservice/DagrApiService.scala
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,11 @@ package dagr.webservice
import akka.actor.{Actor, ActorContext, Props}
import com.fulcrumgenomics.commons.util.LazyLogging
import dagr.core.DagrDef.TaskId
import spray.http.StatusCodes
import spray.http._
import spray.http.StatusCodes._
import spray.routing._
import spray.util.LoggingContext

import spray.json._
import scala.concurrent.ExecutionContextExecutor
import scala.util.{Failure, Success, Try}

Expand Down Expand Up @@ -67,13 +67,35 @@ class DagrApiServiceActor(taskInfoTracker: TaskInfoTracker) extends HttpServiceA
}
}

/** A simple trait to set CORS headers. Mix this in and wrap your routes with [[CORSDirectives.respondWithCORSHeaders()]] */
trait CORSDirectives {
this: HttpService =>

def respondWithCORSHeaders(origin: AllowedOrigins): Directive0 = respondWithHeaders(HttpHeaders.`Access-Control-Allow-Origin`(origin))

def corsFilter(origin: String)(route: Route): Route = {
if (origin == "*") {
respondWithCORSHeaders(AllOrigins)(route)
}
else {
optionalHeaderValueByName("Origin") {
case None => route
case Some(clientOrigin) if origin == clientOrigin =>
respondWithCORSHeaders(SomeOrigins(Seq(HttpOrigin(origin))))(route)
case Some(_) =>
complete(Forbidden, Nil, "Invalid origin") // Maybe, a Rejection will fit better
}
}
}
}

object DagrApiService {
val version: String = "v1" // TODO: match versions
val root: String = "service"
}

/** Defines the possible routes for the Dagr service */
abstract class DagrApiService(taskInfoTracker: TaskInfoTracker) extends HttpService with PerRequestCreator {
abstract class DagrApiService(val taskInfoTracker: TaskInfoTracker) extends HttpService with PerRequestCreator with DagrApiJsonSupport {
import DagrApiService._

def routes: Route = versionRoute ~ taskScriptRoute ~ taskLogRoute ~ infoRoute
Expand Down Expand Up @@ -116,7 +138,14 @@ abstract class DagrApiService(taskInfoTracker: TaskInfoTracker) extends HttpServ
pathPrefix(root / version / "info") {
pathEnd {
get {
requestContext => perRequest(requestContext, DagrApiHandler.props(taskInfoTracker), DagrApiHandler.DagrStatusRequest())
requestContext => perRequest(requestContext, DagrApiHandler.props(taskInfoTracker), DagrApiHandler.DagrTaskInfosRequest())
}
} ~
pathEnd {
post {
entity(as[TaskInfoQuery]) { query =>
requestContext => perRequest(requestContext, DagrApiHandler.props(taskInfoTracker), DagrApiHandler.DagrTaskInfoQueryRequest(query))
}
}
} ~
pathPrefix(IntNumber) { (id) =>
Expand All @@ -128,6 +157,9 @@ abstract class DagrApiService(taskInfoTracker: TaskInfoTracker) extends HttpServ
}
}
}

}
}

def rawJson = extract { _.request.entity.asString}
}

0 comments on commit a5237d1

Please sign in to comment.