Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

#306 Report a job as New when it never ran, but a rerun is requested. #308

Merged
merged 1 commit into from
Dec 1, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ class ScheduleStrategySourcing extends ScheduleStrategy {
(trackedDays ++ lateDays ++ newDays).groupBy(_.infoDate).map(d => d._2.head).toList.sortBy(a => a.infoDate.toEpochDay)
case ScheduleParams.Rerun(runDate) =>
log.info(s"Rerun strategy for a single day: $runDate")
getRerun(outputTable, runDate, schedule, infoDateExpression)
getRerun(outputTable, runDate, schedule, infoDateExpression, bookkeeper)
case ScheduleParams.Historical(dateFrom, dateTo, inverseDateOrder, mode) =>
log.info(s"Ranged strategy: from $dateFrom to $dateTo, mode = '${mode.toString}', minimumDate = $minimumDate")
getHistorical(outputTable, dateFrom, dateTo, schedule, mode, infoDateExpression, minimumDate, inverseDateOrder, bookkeeper)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ class ScheduleStrategyTransformation extends ScheduleStrategy {
(retrospective ++ lateDays ++ newDays).groupBy(_.infoDate).map(d => d._2.head).toList.sortBy(a => a.infoDate.toEpochDay)
case ScheduleParams.Rerun(runDate) =>
log.info(s"Rerun strategy for a single day: $runDate")
getRerun(outputTable, runDate, schedule, infoDateExpression)
getRerun(outputTable, runDate, schedule, infoDateExpression, bookkeeper)
case ScheduleParams.Historical(dateFrom, dateTo, inverseDateOrder, mode) =>
log.info(s"Ranged strategy: from $dateFrom to $dateTo, mode = '${mode.toString}', minimumDate = $minimumDate")
getHistorical(outputTable, dateFrom, dateTo, schedule, mode, infoDateExpression, minimumDate, inverseDateOrder, bookkeeper)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,14 +47,20 @@ object ScheduleStrategyUtils {
private[core] def getRerun(outputTable: String,
runDate: LocalDate,
schedule: Schedule,
infoDateExpression: String
infoDateExpression: String,
bookkeeper: Bookkeeper
): List[TaskPreDef] = {
if (schedule.isEnabled(runDate)) {
val infoDate = evaluateRunDate(runDate, infoDateExpression)

log.info(s"Rerunning '$outputTable' for date $runDate. Info date = '$infoDateExpression' = $infoDate.")

List(pipeline.TaskPreDef(infoDate, TaskRunReason.Rerun))
bookkeeper.getLatestDataChunk(outputTable, infoDate, infoDate) match {
case Some(_) =>
log.info(s"Rerunning '$outputTable' for date $runDate. Info date = '$infoDateExpression' = $infoDate.")
List(pipeline.TaskPreDef(infoDate, TaskRunReason.Rerun))
case None =>
log.info(s"Running '$outputTable' for date $runDate. Info date = '$infoDateExpression' = $infoDate.")
List(pipeline.TaskPreDef(infoDate, TaskRunReason.New))
}
} else {
log.info(s"The job for '$outputTable' is out of schedule $schedule for $runDate. Skipping...")
Nil
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ class ScheduleStrategySuite extends AnyWordSpec {
val bk = mock(classOf[Bookkeeper])
val infoDateExpression = "@runDate - 2"

when(bk.getLatestDataChunk(outputTable, runDate.minusDays(7), runDate.minusDays(7))).thenReturn(Some(null))
when(bk.getLatestProcessedDate(outputTable, Some(runDate))).thenReturn(Some(runDate.minusDays(2)))

val params = ScheduleParams.Rerun(runDate.minusDays(5))
Expand All @@ -140,6 +141,7 @@ class ScheduleStrategySuite extends AnyWordSpec {
"earlier than the minimum date" in {
val bk = mock(classOf[Bookkeeper])

when(bk.getLatestDataChunk(outputTable, runDate.minusDays(365), runDate.minusDays(365))).thenReturn(None)
when(bk.getLatestProcessedDate(outputTable, Some(runDate))).thenReturn(Some(runDate.minusDays(2)))

val params = ScheduleParams.Rerun(runDate.minusDays(365))
Expand Down Expand Up @@ -299,6 +301,7 @@ class ScheduleStrategySuite extends AnyWordSpec {
val bk = mock(classOf[Bookkeeper])
val infoDateExpression = "@runDate - 2"

when(bk.getLatestDataChunk(outputTable, runDate.minusDays(7), runDate.minusDays(7))).thenReturn(Some(null))
when(bk.getLatestProcessedDate(outputTable, Some(runDate))).thenReturn(Some(runDate.minusDays(9)))

"normal rerun" in {
Expand Down Expand Up @@ -511,6 +514,7 @@ class ScheduleStrategySuite extends AnyWordSpec {
val bk = mock(classOf[Bookkeeper])
val infoDateExpression = "@runDate - 2"

when(bk.getLatestDataChunk(outputTable, runDate.minusDays(7), runDate.minusDays(7))).thenReturn(Some(null))
when(bk.getLatestProcessedDate(outputTable, Some(runDate))).thenReturn(Some(runDate.minusDays(2)))

val params = ScheduleParams.Rerun(runDate.minusDays(5))
Expand All @@ -526,6 +530,7 @@ class ScheduleStrategySuite extends AnyWordSpec {
val bk = mock(classOf[Bookkeeper])

when(bk.getLatestProcessedDate(outputTable, Some(runDate))).thenReturn(Some(runDate.minusDays(2)))
when(bk.getLatestDataChunk(outputTable, runDate.minusDays(365), runDate.minusDays(365))).thenReturn(None)

val params = ScheduleParams.Rerun(runDate.minusDays(365))

Expand Down Expand Up @@ -680,6 +685,7 @@ class ScheduleStrategySuite extends AnyWordSpec {
val bk = mock(classOf[Bookkeeper])
val infoDateExpression = "@runDate - 2"

when(bk.getLatestDataChunk(outputTable, runDate.minusDays(7), runDate.minusDays(7))).thenReturn(Some(null))
when(bk.getLatestProcessedDate(outputTable, Some(runDate))).thenReturn(Some(runDate.minusDays(9)))

"normal rerun" in {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@ import org.scalatest.wordspec.AnyWordSpec
import za.co.absa.pramen.core.bookkeeper.Bookkeeper
import za.co.absa.pramen.core.expr.exceptions.SyntaxErrorException
import za.co.absa.pramen.core.metastore.model.MetastoreDependency
import za.co.absa.pramen.core.pipeline
import za.co.absa.pramen.core.pipeline.{TaskPreDef, TaskRunReason}
import za.co.absa.pramen.core.model.DataChunk
import za.co.absa.pramen.core.pipeline
import za.co.absa.pramen.core.pipeline.TaskRunReason
import za.co.absa.pramen.core.runner.splitter.RunMode
import za.co.absa.pramen.core.runner.splitter.ScheduleStrategyUtils._
import za.co.absa.pramen.core.schedule.Schedule
Expand All @@ -35,22 +35,30 @@ class ScheduleStrategyUtilsSuite extends AnyWordSpec {
// Reference run date shared by the test cases below.
val date = LocalDate.of(2022, 2, 18)

// Test cases for getRerun: the reported task reason depends on what the mocked bookkeeper returns.
"getRerun" should {
// Mocked bookkeeper: a data chunk is reported for the day before `date`, and none for `date` itself.
val bk = mock(classOf[Bookkeeper])
when(bk.getLatestDataChunk("table", date.minusDays(1), date.minusDays(1))).thenReturn(Some(null))
when(bk.getLatestDataChunk("table", date, date)).thenReturn(None)

"return information date of the rerun" in {
// The expected info date is the day before `date`, for which the mock reports a chunk, so the reason is Rerun.
val expected = pipeline.TaskPreDef(date.minusDays(1), TaskRunReason.Rerun)

// NOTE(review): two consecutive asserts appear here; the 4-argument call looks like the pre-change
// line of a diff and the 5-argument call (with `bk`) like its replacement - confirm which one is current.
assert(getRerun("table", date, Schedule.EveryDay(), "@runDate - 1") == expected :: Nil)
assert(getRerun("table", date, Schedule.EveryDay(), "@runDate - 1", bk) == expected :: Nil)
}

"return information date of the rerun with New status if the job hasn't ran yet" in {
// The mock reports no chunk for `date`, so the expected reason is New rather than Rerun.
val expected = pipeline.TaskPreDef(date, TaskRunReason.New)

assert(getRerun("table", date, Schedule.EveryDay(), "@runDate", bk) == expected :: Nil)
}

"return information date of a non-daily run rerun" in {
// Same expectation as the daily case, but with a monthly schedule on day 18 (the day of month of `date`).
val expected = pipeline.TaskPreDef(date.minusDays(1), TaskRunReason.Rerun)

// NOTE(review): same old/new pair as above - the 4-argument call is presumably the superseded line.
assert(getRerun("table", date, Schedule.Monthly(18 :: Nil), "@runDate - 1") == expected :: Nil)
assert(getRerun("table", date, Schedule.Monthly(18 :: Nil), "@runDate - 1", bk) == expected :: Nil)
}

"return nothing is out of schedule rerun" in {
// NOTE(review): `expected` is not referenced by the assertions in this test case.
val expected = pipeline.TaskPreDef(date.minusDays(1), TaskRunReason.Rerun)

// The schedule is monthly on day 1 while `date` falls on day 18; the result is asserted to be empty.
// NOTE(review): the 4-argument call is presumably the superseded line of the diff - confirm.
assert(getRerun("table", date, Schedule.Monthly(1 :: Nil), "@runDate - 1").isEmpty)
assert(getRerun("table", date, Schedule.Monthly(1 :: Nil), "@runDate - 1", bk).isEmpty)
}
}

Expand Down