Skip to content

Commit

Permalink
#306 Report a job as New when it never ran, but a rerun is requested.
Browse files Browse the repository at this point in the history
  • Loading branch information
yruslan committed Dec 1, 2023
1 parent 6cffc45 commit 13eae55
Show file tree
Hide file tree
Showing 5 changed files with 33 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ class ScheduleStrategySourcing extends ScheduleStrategy {
(trackedDays ++ lateDays ++ newDays).groupBy(_.infoDate).map(d => d._2.head).toList.sortBy(a => a.infoDate.toEpochDay)
case ScheduleParams.Rerun(runDate) =>
log.info(s"Rerun strategy for a single day: $runDate")
getRerun(outputTable, runDate, schedule, infoDateExpression)
getRerun(outputTable, runDate, schedule, infoDateExpression, bookkeeper)
case ScheduleParams.Historical(dateFrom, dateTo, inverseDateOrder, mode) =>
log.info(s"Ranged strategy: from $dateFrom to $dateTo, mode = '${mode.toString}', minimumDate = $minimumDate")
getHistorical(outputTable, dateFrom, dateTo, schedule, mode, infoDateExpression, minimumDate, inverseDateOrder, bookkeeper)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ class ScheduleStrategyTransformation extends ScheduleStrategy {
(retrospective ++ lateDays ++ newDays).groupBy(_.infoDate).map(d => d._2.head).toList.sortBy(a => a.infoDate.toEpochDay)
case ScheduleParams.Rerun(runDate) =>
log.info(s"Rerun strategy for a single day: $runDate")
getRerun(outputTable, runDate, schedule, infoDateExpression)
getRerun(outputTable, runDate, schedule, infoDateExpression, bookkeeper)
case ScheduleParams.Historical(dateFrom, dateTo, inverseDateOrder, mode) =>
log.info(s"Ranged strategy: from $dateFrom to $dateTo, mode = '${mode.toString}', minimumDate = $minimumDate")
getHistorical(outputTable, dateFrom, dateTo, schedule, mode, infoDateExpression, minimumDate, inverseDateOrder, bookkeeper)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,14 +47,20 @@ object ScheduleStrategyUtils {
private[core] def getRerun(outputTable: String,
runDate: LocalDate,
schedule: Schedule,
infoDateExpression: String
infoDateExpression: String,
bookkeeper: Bookkeeper
): List[TaskPreDef] = {
if (schedule.isEnabled(runDate)) {
val infoDate = evaluateRunDate(runDate, infoDateExpression)

log.info(s"Rerunning '$outputTable' for date $runDate. Info date = '$infoDateExpression' = $infoDate.")

List(pipeline.TaskPreDef(infoDate, TaskRunReason.Rerun))
bookkeeper.getLatestDataChunk(outputTable, infoDate, infoDate) match {
case Some(_) =>
log.info(s"Rerunning '$outputTable' for date $runDate. Info date = '$infoDateExpression' = $infoDate.")
List(pipeline.TaskPreDef(infoDate, TaskRunReason.Rerun))
case None =>
log.info(s"Running '$outputTable' for date $runDate. Info date = '$infoDateExpression' = $infoDate.")
List(pipeline.TaskPreDef(infoDate, TaskRunReason.New))
}
} else {
log.info(s"The job for '$outputTable' is out of schedule $schedule for $runDate. Skipping...")
Nil
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ class ScheduleStrategySuite extends AnyWordSpec {
val bk = mock(classOf[Bookkeeper])
val infoDateExpression = "@runDate - 2"

when(bk.getLatestDataChunk(outputTable, runDate.minusDays(7), runDate.minusDays(7))).thenReturn(Some(null))
when(bk.getLatestProcessedDate(outputTable, Some(runDate))).thenReturn(Some(runDate.minusDays(2)))

val params = ScheduleParams.Rerun(runDate.minusDays(5))
Expand All @@ -140,6 +141,7 @@ class ScheduleStrategySuite extends AnyWordSpec {
"earlier than the minimum date" in {
val bk = mock(classOf[Bookkeeper])

when(bk.getLatestDataChunk(outputTable, runDate.minusDays(365), runDate.minusDays(365))).thenReturn(None)
when(bk.getLatestProcessedDate(outputTable, Some(runDate))).thenReturn(Some(runDate.minusDays(2)))

val params = ScheduleParams.Rerun(runDate.minusDays(365))
Expand Down Expand Up @@ -299,6 +301,7 @@ class ScheduleStrategySuite extends AnyWordSpec {
val bk = mock(classOf[Bookkeeper])
val infoDateExpression = "@runDate - 2"

when(bk.getLatestDataChunk(outputTable, runDate.minusDays(7), runDate.minusDays(7))).thenReturn(Some(null))
when(bk.getLatestProcessedDate(outputTable, Some(runDate))).thenReturn(Some(runDate.minusDays(9)))

"normal rerun" in {
Expand Down Expand Up @@ -511,6 +514,7 @@ class ScheduleStrategySuite extends AnyWordSpec {
val bk = mock(classOf[Bookkeeper])
val infoDateExpression = "@runDate - 2"

when(bk.getLatestDataChunk(outputTable, runDate.minusDays(7), runDate.minusDays(7))).thenReturn(Some(null))
when(bk.getLatestProcessedDate(outputTable, Some(runDate))).thenReturn(Some(runDate.minusDays(2)))

val params = ScheduleParams.Rerun(runDate.minusDays(5))
Expand All @@ -526,6 +530,7 @@ class ScheduleStrategySuite extends AnyWordSpec {
val bk = mock(classOf[Bookkeeper])

when(bk.getLatestProcessedDate(outputTable, Some(runDate))).thenReturn(Some(runDate.minusDays(2)))
when(bk.getLatestDataChunk(outputTable, runDate.minusDays(365), runDate.minusDays(365))).thenReturn(None)

val params = ScheduleParams.Rerun(runDate.minusDays(365))

Expand Down Expand Up @@ -680,6 +685,7 @@ class ScheduleStrategySuite extends AnyWordSpec {
val bk = mock(classOf[Bookkeeper])
val infoDateExpression = "@runDate - 2"

when(bk.getLatestDataChunk(outputTable, runDate.minusDays(7), runDate.minusDays(7))).thenReturn(Some(null))
when(bk.getLatestProcessedDate(outputTable, Some(runDate))).thenReturn(Some(runDate.minusDays(9)))

"normal rerun" in {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@ import org.scalatest.wordspec.AnyWordSpec
import za.co.absa.pramen.core.bookkeeper.Bookkeeper
import za.co.absa.pramen.core.expr.exceptions.SyntaxErrorException
import za.co.absa.pramen.core.metastore.model.MetastoreDependency
import za.co.absa.pramen.core.pipeline
import za.co.absa.pramen.core.pipeline.{TaskPreDef, TaskRunReason}
import za.co.absa.pramen.core.model.DataChunk
import za.co.absa.pramen.core.pipeline
import za.co.absa.pramen.core.pipeline.TaskRunReason
import za.co.absa.pramen.core.runner.splitter.RunMode
import za.co.absa.pramen.core.runner.splitter.ScheduleStrategyUtils._
import za.co.absa.pramen.core.schedule.Schedule
Expand All @@ -35,22 +35,30 @@ class ScheduleStrategyUtilsSuite extends AnyWordSpec {
val date = LocalDate.of(2022, 2, 18)

"getRerun" should {
val bk = mock(classOf[Bookkeeper])
when(bk.getLatestDataChunk("table", date.minusDays(1), date.minusDays(1))).thenReturn(Some(null))
when(bk.getLatestDataChunk("table", date, date)).thenReturn(None)

"return information date of the rerun" in {
val expected = pipeline.TaskPreDef(date.minusDays(1), TaskRunReason.Rerun)

assert(getRerun("table", date, Schedule.EveryDay(), "@runDate - 1") == expected :: Nil)
assert(getRerun("table", date, Schedule.EveryDay(), "@runDate - 1", bk) == expected :: Nil)
}

"return information date of the rerun with New status if the job hasn't ran yet" in {
val expected = pipeline.TaskPreDef(date, TaskRunReason.New)

assert(getRerun("table", date, Schedule.EveryDay(), "@runDate", bk) == expected :: Nil)
}

"return information date of a non-daily run rerun" in {
val expected = pipeline.TaskPreDef(date.minusDays(1), TaskRunReason.Rerun)

assert(getRerun("table", date, Schedule.Monthly(18 :: Nil), "@runDate - 1") == expected :: Nil)
assert(getRerun("table", date, Schedule.Monthly(18 :: Nil), "@runDate - 1", bk) == expected :: Nil)
}

"return nothing is out of schedule rerun" in {
val expected = pipeline.TaskPreDef(date.minusDays(1), TaskRunReason.Rerun)

assert(getRerun("table", date, Schedule.Monthly(1 :: Nil), "@runDate - 1").isEmpty)
assert(getRerun("table", date, Schedule.Monthly(1 :: Nil), "@runDate - 1", bk).isEmpty)
}
}

Expand Down

0 comments on commit 13eae55

Please sign in to comment.