Skip to content
This repository has been archived by the owner on Jun 20, 2024. It is now read-only.

Commit

Permalink
Replay
Browse files Browse the repository at this point in the history
  • Loading branch information
nh13 committed Jun 9, 2017
1 parent e4e7933 commit 5fbb3b2
Show file tree
Hide file tree
Showing 10 changed files with 580 additions and 31 deletions.
25 changes: 21 additions & 4 deletions core/src/main/scala/dagr/core/cmdline/DagrCoreMain.scala
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import java.net.InetAddress
import java.nio.file.{Files, Path}
import java.text.DecimalFormat

import com.fulcrumgenomics.commons.CommonsDef.unreachable
import com.fulcrumgenomics.commons.CommonsDef.{FilePath, unreachable}
import com.fulcrumgenomics.commons.io.{Io, PathUtil}
import com.fulcrumgenomics.commons.util.{LazyLogging, LogLevel, Logger}
import com.fulcrumgenomics.sopt.cmdline.{CommandLineProgramParserStrings, ValidationException}
Expand All @@ -41,7 +41,8 @@ import dagr.core.execsystem._
import dagr.core.execsystem2.GraphExecutor
import dagr.core.execsystem2.{TopLikeStatusReporter => TopLikeStatusReporter2}
import dagr.core.execsystem2.local.LocalTaskExecutor
import dagr.core.reporting.{FinalStatusReporter, PeriodicRefreshingReporter, Terminal}
import dagr.core.execsystem2.replay.TaskCache
import dagr.core.reporting.{ExecutionLogger, FinalStatusReporter, PeriodicRefreshingReporter, Terminal}
import dagr.core.tasksystem.Pipeline

import scala.collection.mutable.ListBuffer
Expand Down Expand Up @@ -112,7 +113,9 @@ class DagrCoreArgs(
@arg(doc = "Provide an top-like interface for tasks with the give delay in seconds. This suppress info logging.")
var interactive: Boolean = false,
@arg(doc = "Use the experimental execution system.")
val experimentalExecution: Boolean = false
val experimentalExecution: Boolean = false,
@arg(doc = "Attempt to replay using the provided replay log")
val replayLog: Option[FilePath] = None
) extends LazyLogging {

// These are not optional, but are only populated during configure()
Expand Down Expand Up @@ -184,7 +187,12 @@ class DagrCoreArgs(
* an execution report.
*/
protected[cmdline] def execute(pipeline : Pipeline)(implicit ex: ExecutionContext): Int = {
val report = this.reportPath.getOrElse(throw new IllegalStateException("execute() called before configure()"))
val report = this.reportPath.getOrElse(throw new IllegalStateException("execute() called before configure()"))
// FIXME: should this path be exposed on the command line?
val executionLog = {
if (report == Io.StdOut.toAbsolutePath) None // FIXME
else Some(report.getParent.resolve("execution_log.txt"))
}

if (interactive && !Terminal.supportsAnsi) {
logger.warning("ANSI codes are not supported in your terminal. Interactive mode will not be used.")
Expand All @@ -210,6 +218,15 @@ class DagrCoreArgs(
)
graphExecutor.withLogger(reporter)
}
executionLog.foreach { log =>
val executionLogger = new ExecutionLogger(log)
graphExecutor.withLogger(executionLogger)
graphExecutor.withTaskRegister(executionLogger)
}
this.replayLog.foreach { log =>
val taskCache = TaskCache(log)
graphExecutor.withTaskCache(taskCache)
}
(graphExecutor, graphExecutor execute pipeline)
}
else {
Expand Down
2 changes: 1 addition & 1 deletion core/src/main/scala/dagr/core/execsystem/TaskStatus.scala
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ sealed trait TaskStatus extends RootTaskStatus {
override def toString: String = description
}

object TaskStatus extends {
object TaskStatus {
/** Checks if a task with a given status is done.
*
* @param taskStatus the status of the task
Expand Down
43 changes: 39 additions & 4 deletions core/src/main/scala/dagr/core/execsystem2/GraphExecutor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,17 @@
package dagr.core.execsystem2

import com.fulcrumgenomics.commons.util.LazyLogging
import dagr.core.execsystem2.replay.TaskCache
import dagr.core.reporting.FinalStatusReporter
import dagr.core.tasksystem.{Retry, Task}
import dagr.core.reporting.ReportingDef.{TaskLogger, TaskRegister}
import dagr.core.tasksystem.Task.{TaskInfo => RootTaskInfo}
import dagr.core.tasksystem.{Retry, Task}

import scala.collection.mutable
import scala.collection.mutable.ListBuffer
import scala.concurrent.duration.Duration
import scala.concurrent.{Await, ExecutionContext, Future, blocking}
import scala.util.{Failure, Success, Try}
import scala.util.{Failure, Success}

/** Coordinates between the dependency graph ([[DependencyGraph]]) and task executor ([[TaskExecutor]]) given a (root)
* task to execute.
Expand All @@ -51,6 +53,14 @@ trait GraphExecutor[T<:Task] extends FinalStatusReporter {
/** Adds the [[TaskLogger]] to the list of loggers to be notified when a task's status is updated. */
def withLogger(logger: TaskLogger): this.type

/** Adds the [[TaskRegister]] to the list of registers to be notified when a list of tasks is returned by [[Task.getTasks]] */
def withTaskRegister(register: TaskRegister): this.type

/** Adds the [[TaskCache]] to the list of caches to use to determine if a task should be manually succeeded. */
def withTaskCache(taskCache: TaskCache): this.type = {
withTaskRegister(taskCache)
}

/** Returns the executor that execute tasks. */
protected def taskExecutor: TaskExecutor[T]

Expand Down Expand Up @@ -88,9 +98,25 @@ private class GraphExecutorImpl[T<:Task](protected val taskExecutor: TaskExecuto
/** A list of [[TaskLogger]]s that will be notified when a task's status is updated. */
private val loggers: ListBuffer[TaskLogger] = ListBuffer[TaskLogger](new TaskStatusLogger)

/** A list of [[TaskRegister]]s that will be notified when a list of tasks is returned by [[Task.getTasks]]. */
private val registers: ListBuffer[TaskRegister] = ListBuffer[TaskRegister]()

/** A list of [[TaskCache]] to use to determine if a task should be manually succeeded. */
private val taskCaches: ListBuffer[TaskCache] = ListBuffer[TaskCache]()

/** Adds the [[TaskLogger]] to the list of loggers to be notified when a task's status is updated. */
def withLogger(logger: TaskLogger): this.type = { this.loggers.append(logger); this }

/** Adds the [[TaskRegister]] to the list of registers to be notified when a list of tasks is returned by [[Task.getTasks]] */
def withTaskRegister(register: TaskRegister): this.type = { this.registers.append(register); this }

/** Adds the [[TaskCache]] to the list of caches to use to determine if a task should be manually succeeded. */
override def withTaskCache(taskCache: TaskCache): this.type = {
super.withTaskCache(taskCache)
this.taskCaches.append(taskCache)
this
}

/** The tasks currently known by the executor. */
def tasks: Traversable[Task] = this._tasks

Expand All @@ -105,6 +131,7 @@ private class GraphExecutorImpl[T<:Task](protected val taskExecutor: TaskExecuto
val rootFuture: Future[Task] = failFutureWithTaskStatus(rootTask) {
Future {
lockIt {
this.registers.foreach(_.register(rootTask, rootTask))
registerTask(rootTask) match {
case None => throw new IllegalArgumentException(s"Task '${rootTask.name}' already attempted.")
case Some(false) => throw new IllegalArgumentException(s"Task '${rootTask.name}' depends on ${rootTask.tasksDependedOn.size} tasks.")
Expand Down Expand Up @@ -168,9 +195,17 @@ private class GraphExecutorImpl[T<:Task](protected val taskExecutor: TaskExecuto
buildTask(parent) flatMap {
case x :: Nil if x == parent => // one task and it returned itself, so execute it
requireNoDependencies(parent)
executeWithTaskExecutor(parent)
registers.foreach(_.register(parent, parent))
if (this.taskCaches.isEmpty || this.taskCaches.exists(_.execute(parent))) {
executeWithTaskExecutor(parent)
}
else {
updateMetadata(parent, ManuallySucceeded)
Future.successful(parent)
}
case childTasks => // a different task, or more than one task, so build those tasks and execute them
requireNoDependencies(parent)
registers.foreach(_.register(parent, childTasks:_*))
executeMultiTask(parent, childTasks)
}
}
Expand Down Expand Up @@ -360,7 +395,7 @@ private class GraphExecutorImpl[T<:Task](protected val taskExecutor: TaskExecuto
}

/** Provides synchronization and signals that this may block. */
private def lockIt[T](body: =>T): T = blocking { this.lock.synchronized(body) }
private def lockIt[A](body: =>A): A = blocking { this.lock.synchronized(body) }

/** Set the status to the failed and add the throwable to the failures map for this task */
private def fail(task: Task, thr: Throwable, status: TaskStatus): Unit = lockIt {
Expand Down
40 changes: 40 additions & 0 deletions core/src/main/scala/dagr/core/execsystem2/TaskStatusLogger.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* The MIT License
*
* Copyright (c) 2017 Fulcrum Genomics LLC
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
* THE SOFTWARE.
*
*/

package dagr.core.execsystem2

import com.fulcrumgenomics.commons.util.{LazyLogging, Logger}
import dagr.core.reporting.ReportingDef.TaskLogger
import dagr.core.tasksystem.Task.{TaskInfo => RootTaskInfo}

/** A simple logger that delegates to [[dagr.core.tasksystem.Task.TaskInfo#logTaskMessage]]. */
class TaskStatusLogger extends TaskLogger {
// NB: rename this class to set the name on the command line
private class Dagr extends LazyLogging {
override lazy val logger: Logger = new Logger(getClass)
}
private val logger = new Dagr().logger
def record(info: RootTaskInfo): Unit = info.logTaskMessage(this.logger)
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,31 +27,15 @@ package dagr.core.execsystem2

import java.io.ByteArrayOutputStream

import com.fulcrumgenomics.commons.util.{LazyLogging, Logger}
import dagr.core.execsystem.SystemResources
import dagr.core.execsystem2.ExecDef.concurrentSet
import dagr.core.reporting.ReportingDef.TaskLogger
import dagr.core.reporting.{TopLikeStatusReporter => BaseTopLikeStatusReporter}
import dagr.core.tasksystem.Task
import dagr.core.tasksystem.Task.{TaskInfo => RootTaskInfo}
import dagr.core.execsystem2.ExecDef.concurrentSet

import scala.collection.mutable

/** Base trait for all classes interested in when the task status changes for any task. */
trait TaskLogger {
/** The method that will be called with updated task information. */
def record(info: RootTaskInfo): Unit
}

/** A simple logger that delegates to [[dagr.core.tasksystem.Task.TaskInfo#logTaskMessage]]. */
class TaskStatusLogger extends TaskLogger {
// NB: rename this class to set the name on the command line
private class Dagr extends LazyLogging {
override lazy val logger: Logger = new Logger(getClass)
}
private val logger = new Dagr().logger
def record(info: RootTaskInfo): Unit = info.logTaskMessage(this.logger)
}

/** A simple top-like status reporter for [[dagr.core.execsystem2.GraphExecutor]].
* @param systemResources the system resources used while executing
* @param loggerOut the stream to which log messages are written, or none if no stream is available.
Expand Down
Loading

0 comments on commit 5fbb3b2

Please sign in to comment.