Skip to content

Commit

Permalink
Merge pull request #2249 from constantine2nd/develop
Browse files Browse the repository at this point in the history
Scheduler
  • Loading branch information
simonredfern committed Aug 23, 2023
2 parents b9d6194 + 02668d5 commit 5247b4e
Show file tree
Hide file tree
Showing 5 changed files with 81 additions and 6 deletions.
3 changes: 2 additions & 1 deletion obp-api/src/main/scala/bootstrap/liftweb/Boot.scala
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ import code.productfee.ProductFee
import code.products.MappedProduct
import code.ratelimiting.RateLimiting
import code.remotedata.RemotedataActors
import code.scheduler.{DatabaseDriverScheduler, MetricsArchiveScheduler}
import code.scheduler.{DatabaseDriverScheduler, JobScheduler, MetricsArchiveScheduler}
import code.scope.{MappedScope, MappedUserScope}
import code.snippet.{OAuthAuthorisation, OAuthWorkedThanks}
import code.socialmedia.MappedSocialMedia
Expand Down Expand Up @@ -1044,6 +1044,7 @@ object ToSchemify {
// The following tables are accessed directly via Mapper / JDBC
val models: List[MetaMapper[_]] = List(
AuthUser,
JobScheduler,
MappedETag,
AtmAttribute,
Admin,
Expand Down
9 changes: 9 additions & 0 deletions obp-api/src/main/scala/code/api/util/APIUtil.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3233,6 +3233,15 @@ object APIUtil extends MdcLoggable with CustomJsonFormats{
throw JsonResponseException(jsonResponse)
case _ => it
}
} map { result =>
result._1 match {
case Failure(msg, t, c) =>
(
fullBoxOrException(result._1 ~> APIFailureNewStyle(msg, 401, Some(cc.toLight))),
result._2
)
case _ => result
}
}
}

Expand Down
28 changes: 28 additions & 0 deletions obp-api/src/main/scala/code/scheduler/JobScheduler.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package code.scheduler

import code.util.MappedUUID
import net.liftweb.mapper._

class JobScheduler extends JobSchedulerTrait with LongKeyedMapper[JobScheduler] with IdPK with CreatedUpdated {

def getSingleton = JobScheduler

object JobId extends MappedUUID(this)
object Name extends MappedString(this, 100)
object ApiInstanceId extends MappedString(this, 100)

override def primaryKey: Long = id.get
override def jobId: String = JobId.get
override def name: String = Name.get
override def apiInstanceId: String = ApiInstanceId.get

}

object JobScheduler extends JobScheduler with LongKeyedMetaMapper[JobScheduler] {
override def dbIndexes: List[BaseIndex[JobScheduler]] = UniqueIndex(JobId) :: super.dbIndexes
}





8 changes: 8 additions & 0 deletions obp-api/src/main/scala/code/scheduler/JobSchedulerTrait.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package code.scheduler

trait JobSchedulerTrait {
def primaryKey: Long
def jobId: String
def name: String
def apiInstanceId: String
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,12 @@ import java.util.concurrent.TimeUnit
import java.util.{Calendar, Date}

import code.actorsystem.ObpLookupSystem
import code.api.util.APIUtil.generateUUID
import code.api.util.{APIUtil, OBPLimit, OBPToDate}
import code.metrics.{APIMetric, APIMetrics, MappedMetric, MetricArchive}
import code.util.Helper.MdcLoggable
import net.liftweb.common.Full
import net.liftweb.mapper.{By, By_<=}
import net.liftweb.mapper.{By, By_<=, By_>=}

import scala.concurrent.duration._

Expand All @@ -19,18 +20,46 @@ object MetricsArchiveScheduler extends MdcLoggable {
implicit lazy val executor = actorSystem.dispatcher
private lazy val scheduler = actorSystem.scheduler
private val oneDayInMillis: Long = 86400000
private val jobName = "MetricsArchiveScheduler"
private val apiInstanceId = APIUtil.getPropsValue("api_instance_id", "NOT_SET")

def start(intervalInSeconds: Long): Unit = {
logger.info("Hello from MetricsArchiveScheduler.start")

logger.info(s"--------- Clean up Jobs ---------")
logger.info(s"Delete all Jobs created by api_instance_id=$apiInstanceId")
JobScheduler.findAll(By(JobScheduler.Name, apiInstanceId)).map { i =>
println(s"Job name: ${i.name}, Date: ${i.createdAt}")
i
}.map(_.delete_!)
logger.info(s"Delete all Jobs older than 5 days")
val fiveDaysAgo: Date = new Date(new Date().getTime - (oneDayInMillis * 5))
JobScheduler.findAll(By_<=(JobScheduler.createdAt, fiveDaysAgo)).map { i =>
println(s"Job name: ${i.name}, Date: ${i.createdAt}, api_instance_id: ${apiInstanceId}")
i
}.map(_.delete_!)

scheduler.schedule(
initialDelay = Duration(intervalInSeconds, TimeUnit.SECONDS),
interval = Duration(intervalInSeconds, TimeUnit.SECONDS),
runnable = new Runnable {
def run(): Unit = {
logger.info("Starting MetricsArchiveScheduler.start.run")
conditionalDeleteMetricsRow()
deleteOutdatedRowsFromMetricsArchive()
logger.info("End of MetricsArchiveScheduler.start.run")
JobScheduler.find(By(JobScheduler.Name, jobName)) match {
case Full(job) => // There is an ongoing/hanging job
logger.info(s"Cannot start MetricsArchiveScheduler.start.run due to ongoing job. Job ID: ${job.JobId}")
case _ => // Start a new job
val uniqueId = generateUUID()
val job = JobScheduler.create
.JobId(uniqueId)
.Name(jobName)
.ApiInstanceId(apiInstanceId)
.saveMe()
logger.info(s"Starting Job ID: $uniqueId")
conditionalDeleteMetricsRow()
deleteOutdatedRowsFromMetricsArchive()
JobScheduler.delete_!(job) // Allow future jobs
logger.info(s"End of Job ID: $uniqueId")
}
}
}
)
Expand Down

0 comments on commit 5247b4e

Please sign in to comment.