diff --git a/cmd/scheduler/main.go b/cmd/scheduler/main.go index 08def63..276dd59 100644 --- a/cmd/scheduler/main.go +++ b/cmd/scheduler/main.go @@ -168,6 +168,38 @@ func main() { } } + // Schedule cleanup tasks + retentionPeriod := 5 * 30 * 24 * time.Hour // 150 days (5 * 30 days); NOTE(review): confirm intended retention + ticker := time.NewTicker(24 * time.Hour) // run cleanup every day + quit := make(chan struct{}) + signalChan := make(chan os.Signal, 1) // New channel for OS signals + + go func() { + for { + select { + case <-ticker.C: + log.Info(tag, "Starting cleanup of old executions") + + deletedExecutions, err := executions.CleanupOldExecutions(retentionPeriod) + if err != nil { + log.Error(tag, fmt.Sprintf("Error cleaning up old executions: %s", err.Error())) + } else { + if len(deletedExecutions) == 0 { + log.Info(tag, "No executions were deleted.") + } else { + for _, execution := range deletedExecutions { + log.Info(tag, fmt.Sprintf("Deleted execution: %s, Last Updated: %s", execution["guid"], execution["execution_end_time"])) + } + } + } + + case <-quit: + ticker.Stop() + return + } + } + }() + server := http.Server(fmt.Sprintf("0.0.0.0:%d", port), services) go func() { @@ -178,10 +210,12 @@ func main() { log.Info(tag, fmt.Sprintf("listening for connections on %s", server.Addr)) - quit := make(chan os.Signal) - signal.Notify(quit, os.Interrupt) + signal.Notify(signalChan, os.Interrupt) // Use signalChan for signal notification + + <-signalChan // Wait for signal - <-quit + // Stop the cleanup ticker + close(quit) ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() diff --git a/core/jobs.go b/core/jobs.go index 1a83e25..ed9858a 100644 --- a/core/jobs.go +++ b/core/jobs.go @@ -37,7 +37,7 @@ type JobService interface { Delete(*Job) error Named(string) (*Job, error) Persist(*Job) (*Job, error) - InSpace(string) []*Job + InSpace(string) ([]*Job, error) Success(*Job) (*Job, error) Fail(*Job) (*Job, error) } diff --git a/cron/cron_service.go b/cron/cron_service.go index 
9bfbbb5..3ef7dd6 100644 --- a/cron/cron_service.go +++ b/cron/cron_service.go @@ -16,9 +16,10 @@ type CronService struct { func NewCronService(log core.LogService) *CronService { return &CronService{ - cron.New(), - log, - make(map[string]cron.EntryID)} + Cron: cron.New(), + log: log, + mapping: make(map[string]cron.EntryID), + } } func (service *CronService) Add(runnable core.Runnable) error { @@ -60,6 +61,7 @@ func (service *CronService) Add(runnable core.Runnable) error { } service.mapping[schedule.GUID] = id + service.logMappingSize("Added job to cron service") return nil } @@ -71,6 +73,8 @@ func (service *CronService) Delete(runnable core.Runnable) error { } service.Remove(id) + delete(service.mapping, runnable.Schedule().GUID) + service.logMappingSize("Deleted job from cron service") return nil } @@ -84,3 +88,12 @@ func (service *CronService) Validate(expression string) error { return err } + +func (service *CronService) MappingSize() int { + return len(service.mapping) +} + +func (service *CronService) logMappingSize(action string) { + size := service.MappingSize() + service.log.Info("cron-service", fmt.Sprintf("%s: current mapping size is %d", action, size)) +} diff --git a/http/routes/all_jobs.go b/http/routes/all_jobs.go index de03d02..11ba8da 100644 --- a/http/routes/all_jobs.go +++ b/http/routes/all_jobs.go @@ -1,6 +1,7 @@ package routes import ( + "fmt" "net/http" "github.com/labstack/echo/v4" @@ -42,7 +43,11 @@ func AllJobs(e *echo.Echo, services *core.Services) { spaceGUID := c.QueryParam("space_guid") - jobs := services.Jobs.InSpace(spaceGUID) + jobs, err := services.Jobs.InSpace(spaceGUID) + if err != nil { + services.Logger.Error(tag, fmt.Sprintf("error retrieving jobs: %v", err)) + return c.JSON(http.StatusInternalServerError, "error retrieving jobs") + } output := &jobCollection{ Resources: jobs, diff --git a/postgres/execution_service.go b/postgres/execution_service.go index db9c27c..10c4cd0 100644 --- a/postgres/execution_service.go +++ 
b/postgres/execution_service.go @@ -207,3 +207,42 @@ func (service *ExecutionService) finish(execution *core.Execution, state string) return service.update(execution) } + +func (service *ExecutionService) CleanupOldExecutions(retentionPeriod time.Duration) ([]map[string]string, error) { + cutoffTime := time.Now().Add(-retentionPeriod).UTC() + var deletedExecutions []map[string]string + + err := WithTransaction(service.db, func(tx Transaction) error { + rows, err := tx.Query( + "SELECT guid, execution_end_time FROM executions WHERE execution_end_time < $1 AND state IN ('SUCCEEDED', 'FAILED')", + cutoffTime, + ) + if err != nil { + return err + } + defer rows.Close() + + for rows.Next() { + var guid string + var execution_end_time time.Time + if err := rows.Scan(&guid, &execution_end_time); err != nil { + return err + } + deletedExecutions = append(deletedExecutions, map[string]string{ + "guid": guid, + "execution_end_time": execution_end_time.Format(time.RFC3339), + }) + } + + _, err = tx.Exec( + "DELETE FROM executions WHERE execution_end_time < $1 AND state IN ('SUCCEEDED', 'FAILED')", + cutoffTime, + ) + return err + }) + if err != nil { + return nil, err + } + + return deletedExecutions, nil +} diff --git a/postgres/job_service.go b/postgres/job_service.go index e0c0c3c..4cd98bc 100644 --- a/postgres/job_service.go +++ b/postgres/job_service.go @@ -12,15 +12,24 @@ type JobService struct { db *sql.DB } +const ( + JobStatePending = "PENDING" + JobStateSucceeded = "SUCCEEDED" + JobStateFailed = "FAILED" +) + func NewJobService(db *sql.DB) *JobService { return &JobService{db} } func (service *JobService) Get(guid string) (*core.Job, error) { - candidates := service.getCollection( - "select * from jobs where guid = $1", + candidates, err := service.getCollection( + "SELECT * FROM jobs WHERE guid = $1", guid, ) + if err != nil { + return nil, err + } if err := expectingOne(len(candidates)); err != nil { return nil, err @@ -32,7 +41,7 @@ func (service *JobService) 
Get(guid string) (*core.Job, error) { func (service *JobService) Delete(job *core.Job) error { // Let's not try to delete something that isn't in the db if _, err := service.Get(job.GUID); err != nil { - return nil + return fmt.Errorf("job with GUID %s not found: %v", job.GUID, err) } err := WithTransaction(service.db, func(tx Transaction) error { @@ -48,10 +57,13 @@ func (service *JobService) Delete(job *core.Job) error { } func (service *JobService) Named(name string) (*core.Job, error) { - candidates := service.getCollection( - "select * from jobs where name = $1", + candidates, err := service.getCollection( + "SELECT * FROM jobs WHERE name = $1", name, ) + if err != nil { + return nil, err + } if err := expectingOne(len(candidates)); err != nil { return nil, err @@ -65,13 +77,13 @@ func (service *JobService) Persist(candidate *core.Job) (*core.Job, error) { guid, err := core.GenGUID() if err != nil { - return nil, fmt.Errorf("coult not generate a job id") + return nil, fmt.Errorf("could not generate a job id: %v", err) } candidate.GUID = guid candidate.CreatedAt = now candidate.UpdatedAt = now - candidate.State = "PENDING" + candidate.State = JobStatePending if candidate.DiskInMb == 0 { candidate.DiskInMb = 1024 @@ -83,7 +95,7 @@ func (service *JobService) Persist(candidate *core.Job) (*core.Job, error) { err = WithTransaction(service.db, func(tx Transaction) error { _, aErr := tx.Exec( - "INSERT INTO jobs VALUES($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)", + "INSERT INTO jobs (guid, name, command, disk_in_mb, memory_in_mb, state, app_guid, space_guid, created_at, updated_at) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)", candidate.GUID, candidate.Name, candidate.Command, @@ -107,14 +119,12 @@ func (service *JobService) Persist(candidate *core.Job) (*core.Job, error) { } func (service *JobService) Success(candidate *core.Job) (*core.Job, error) { - candidate.State = "SUCCEEDED" - + candidate.State = JobStateSucceeded return service.update(candidate) } func 
(service *JobService) Fail(candidate *core.Job) (*core.Job, error) { - candidate.State = "FAILED" - + candidate.State = JobStateFailed return service.update(candidate) } @@ -125,7 +135,7 @@ func (service *JobService) update(candidate *core.Job) (*core.Job, error) { err := WithTransaction(service.db, func(tx Transaction) error { _, aErr := tx.Exec( - "update jobs set updated_at = $3, state = $2 where guid = $1", + "UPDATE jobs SET updated_at = $3, state = $2 WHERE guid = $1", candidate.GUID, candidate.State, candidate.UpdatedAt, @@ -141,53 +151,42 @@ func (service *JobService) update(candidate *core.Job) (*core.Job, error) { return candidate, nil } -func (service *JobService) InSpace(guid string) []*core.Job { - return service.getCollection( - "select * from jobs where space_guid = $1 ORDER BY name ASC", +func (service *JobService) InSpace(guid string) ([]*core.Job, error) { + candidates, err := service.getCollection( + "SELECT * FROM jobs WHERE space_guid = $1 ORDER BY name ASC", guid, ) + if err != nil { + return nil, err + } + return candidates, nil } -func (service *JobService) getCollection(query string, args ...interface{}) []*core.Job { - collection := make([]*core.Job, 0) +func (service *JobService) scanJob(rows *sql.Rows) (*core.Job, error) { + var job core.Job + err := rows.Scan(&job.GUID, &job.Name, &job.Command, &job.DiskInMb, &job.MemoryInMb, &job.State, &job.AppGUID, &job.SpaceGUID, &job.CreatedAt, &job.UpdatedAt) + if err != nil { + return nil, err + } + return &job, nil +} + +func (service *JobService) getCollection(query string, args ...interface{}) ([]*core.Job, error) { + var collection []*core.Job rows, err := service.db.Query(query, args...) 
if err != nil { - return collection + return nil, err } + defer rows.Close() for rows.Next() { - var guid string - var name string - var command string - var diskInMb int - var memoryInMb int - var state string - var spaceGUID string - var appGUID string - var createdAt time.Time - var updatedAt time.Time - - err := rows.Scan(&guid, &name, &command, &diskInMb, &memoryInMb, &state, &appGUID, &spaceGUID, &createdAt, &updatedAt) + job, err := service.scanJob(rows) if err != nil { - continue + return nil, err } - - candidate := &core.Job{ - GUID: guid, - Name: name, - Command: command, - DiskInMb: diskInMb, - MemoryInMb: memoryInMb, - State: state, - SpaceGUID: spaceGUID, - AppGUID: appGUID, - CreatedAt: createdAt, - UpdatedAt: updatedAt, - } - - collection = append(collection, candidate) + collection = append(collection, job) } - return collection + return collection, rows.Err() }