Skip to content

Commit

Permalink
Merge pull request #2 from itsouvalas/slowdown
Browse files Browse the repository at this point in the history
Address performance issues
  • Loading branch information
wayneeseguin authored Jun 14, 2024
2 parents 621641e + e28e3db commit 064a24f
Show file tree
Hide file tree
Showing 6 changed files with 146 additions and 56 deletions.
40 changes: 37 additions & 3 deletions cmd/scheduler/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,38 @@ func main() {
}
}

// Schedule cleanup tasks
retentionPeriod := 5 * 30 * 24 * time.Hour // 30 days
ticker := time.NewTicker(24 * time.Hour) // run cleanup every day
quit := make(chan struct{})
signalChan := make(chan os.Signal, 1) // New channel for OS signals

go func() {
for {
select {
case <-ticker.C:
log.Info(tag, "Starting cleanup of old executions")

deletedExecutions, err := executions.CleanupOldExecutions(retentionPeriod)
if err != nil {
log.Error(tag, fmt.Sprintf("Error cleaning up old executions: %s", err.Error()))
} else {
if len(deletedExecutions) == 0 {
log.Info(tag, "No executions were deleted.")
} else {
for _, execution := range deletedExecutions {
log.Info(tag, fmt.Sprintf("Deleted execution: %s, Last Updated: %s", execution["guid"], execution["execution_end_time"]))
}
}
}

case <-quit:
ticker.Stop()
return
}
}
}()

server := http.Server(fmt.Sprintf("0.0.0.0:%d", port), services)

go func() {
Expand All @@ -178,10 +210,12 @@ func main() {

log.Info(tag, fmt.Sprintf("listening for connections on %s", server.Addr))

quit := make(chan os.Signal)
signal.Notify(quit, os.Interrupt)
signal.Notify(signalChan, os.Interrupt) // Use signalChan for signal notification

<-signalChan // Wait for signal

<-quit
// Stop the cleanup ticker
close(quit)

ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
Expand Down
2 changes: 1 addition & 1 deletion core/jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ type JobService interface {
Delete(*Job) error
Named(string) (*Job, error)
Persist(*Job) (*Job, error)
InSpace(string) []*Job
InSpace(string) ([]*Job, error)
Success(*Job) (*Job, error)
Fail(*Job) (*Job, error)
}
19 changes: 16 additions & 3 deletions cron/cron_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,10 @@ type CronService struct {

func NewCronService(log core.LogService) *CronService {
return &CronService{
cron.New(),
log,
make(map[string]cron.EntryID)}
Cron: cron.New(),
log: log,
mapping: make(map[string]cron.EntryID),
}
}

func (service *CronService) Add(runnable core.Runnable) error {
Expand Down Expand Up @@ -60,6 +61,7 @@ func (service *CronService) Add(runnable core.Runnable) error {
}

service.mapping[schedule.GUID] = id
service.logMappingSize("Added job to cron service")

return nil
}
Expand All @@ -71,6 +73,8 @@ func (service *CronService) Delete(runnable core.Runnable) error {
}

service.Remove(id)
delete(service.mapping, runnable.Schedule().GUID)
service.logMappingSize("Deleted job from cron service")

return nil
}
Expand All @@ -84,3 +88,12 @@ func (service *CronService) Validate(expression string) error {

return err
}

func (service *CronService) MappingSize() int {
return len(service.mapping)
}

func (service *CronService) logMappingSize(action string) {
size := service.MappingSize()
service.log.Info("cron-service", fmt.Sprintf("%s: current mapping size is %d", action, size))
}
7 changes: 6 additions & 1 deletion http/routes/all_jobs.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package routes

import (
"fmt"
"net/http"

"github.com/labstack/echo/v4"
Expand Down Expand Up @@ -42,7 +43,11 @@ func AllJobs(e *echo.Echo, services *core.Services) {

spaceGUID := c.QueryParam("space_guid")

jobs := services.Jobs.InSpace(spaceGUID)
jobs, err := services.Jobs.InSpace(spaceGUID)
if err != nil {
services.Logger.Error(tag, fmt.Sprintf("error retrieving jobs: %v", err))
return c.JSON(http.StatusInternalServerError, "error retrieving jobs")
}

output := &jobCollection{
Resources: jobs,
Expand Down
39 changes: 39 additions & 0 deletions postgres/execution_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,3 +207,42 @@ func (service *ExecutionService) finish(execution *core.Execution, state string)

return service.update(execution)
}

func (service *ExecutionService) CleanupOldExecutions(retentionPeriod time.Duration) ([]map[string]string, error) {
cutoffTime := time.Now().Add(-retentionPeriod).UTC()
var deletedExecutions []map[string]string

err := WithTransaction(service.db, func(tx Transaction) error {
rows, err := tx.Query(
"SELECT guid, execution_end_time FROM executions WHERE execution_end_time < $1 AND state IN ('SUCCEEDED', 'FAILED')",
cutoffTime,
)
if err != nil {
return err
}
defer rows.Close()

for rows.Next() {
var guid string
var execution_end_time time.Time
if err := rows.Scan(&guid, &execution_end_time); err != nil {
return err
}
deletedExecutions = append(deletedExecutions, map[string]string{
"guid": guid,
"execution_end_time": execution_end_time.Format(time.RFC3339),
})
}

_, err = tx.Exec(
"DELETE FROM executions WHERE execution_end_time < $1 AND state IN ('SUCCEEDED', 'FAILED')",
cutoffTime,
)
return err
})
if err != nil {
return nil, err
}

return deletedExecutions, nil
}
95 changes: 47 additions & 48 deletions postgres/job_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,24 @@ type JobService struct {
db *sql.DB
}

const (
JobStatePending = "PENDING"
JobStateSucceeded = "SUCCEEDED"
JobStateFailed = "FAILED"
)

func NewJobService(db *sql.DB) *JobService {
return &JobService{db}
}

func (service *JobService) Get(guid string) (*core.Job, error) {
candidates := service.getCollection(
"select * from jobs where guid = $1",
candidates, err := service.getCollection(
"SELECT * FROM jobs WHERE guid = $1",
guid,
)
if err != nil {
return nil, err
}

if err := expectingOne(len(candidates)); err != nil {
return nil, err
Expand All @@ -32,7 +41,7 @@ func (service *JobService) Get(guid string) (*core.Job, error) {
func (service *JobService) Delete(job *core.Job) error {
// Let's not try to delete something that isn't in the db
if _, err := service.Get(job.GUID); err != nil {
return nil
return fmt.Errorf("job with GUID %s not found: %v", job.GUID, err)
}

err := WithTransaction(service.db, func(tx Transaction) error {
Expand All @@ -48,10 +57,13 @@ func (service *JobService) Delete(job *core.Job) error {
}

func (service *JobService) Named(name string) (*core.Job, error) {
candidates := service.getCollection(
"select * from jobs where name = $1",
candidates, err := service.getCollection(
"SELECT * FROM jobs WHERE name = $1",
name,
)
if err != nil {
return nil, err
}

if err := expectingOne(len(candidates)); err != nil {
return nil, err
Expand All @@ -65,13 +77,13 @@ func (service *JobService) Persist(candidate *core.Job) (*core.Job, error) {

guid, err := core.GenGUID()
if err != nil {
return nil, fmt.Errorf("coult not generate a job id")
return nil, fmt.Errorf("could not generate a job id: %v", err)
}

candidate.GUID = guid
candidate.CreatedAt = now
candidate.UpdatedAt = now
candidate.State = "PENDING"
candidate.State = JobStatePending

if candidate.DiskInMb == 0 {
candidate.DiskInMb = 1024
Expand All @@ -83,7 +95,7 @@ func (service *JobService) Persist(candidate *core.Job) (*core.Job, error) {

err = WithTransaction(service.db, func(tx Transaction) error {
_, aErr := tx.Exec(
"INSERT INTO jobs VALUES($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)",
"INSERT INTO jobs (guid, name, command, disk_in_mb, memory_in_mb, state, app_guid, space_guid, created_at, updated_at) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)",
candidate.GUID,
candidate.Name,
candidate.Command,
Expand All @@ -107,14 +119,12 @@ func (service *JobService) Persist(candidate *core.Job) (*core.Job, error) {
}

func (service *JobService) Success(candidate *core.Job) (*core.Job, error) {
candidate.State = "SUCCEEDED"

candidate.State = JobStateSucceeded
return service.update(candidate)
}

func (service *JobService) Fail(candidate *core.Job) (*core.Job, error) {
candidate.State = "FAILED"

candidate.State = JobStateFailed
return service.update(candidate)
}

Expand All @@ -125,7 +135,7 @@ func (service *JobService) update(candidate *core.Job) (*core.Job, error) {

err := WithTransaction(service.db, func(tx Transaction) error {
_, aErr := tx.Exec(
"update jobs set updated_at = $3, state = $2 where guid = $1",
"UPDATE jobs SET updated_at = $3, state = $2 WHERE guid = $1",
candidate.GUID,
candidate.State,
candidate.UpdatedAt,
Expand All @@ -141,53 +151,42 @@ func (service *JobService) update(candidate *core.Job) (*core.Job, error) {
return candidate, nil
}

func (service *JobService) InSpace(guid string) []*core.Job {
return service.getCollection(
"select * from jobs where space_guid = $1 ORDER BY name ASC",
func (service *JobService) InSpace(guid string) ([]*core.Job, error) {
candidates, err := service.getCollection(
"SELECT * FROM jobs WHERE space_guid = $1 ORDER BY name ASC",
guid,
)
if err != nil {
return nil, err
}
return candidates, nil
}

func (service *JobService) getCollection(query string, args ...interface{}) []*core.Job {
collection := make([]*core.Job, 0)
func (service *JobService) scanJob(rows *sql.Rows) (*core.Job, error) {
var job core.Job
err := rows.Scan(&job.GUID, &job.Name, &job.Command, &job.DiskInMb, &job.MemoryInMb, &job.State, &job.AppGUID, &job.SpaceGUID, &job.CreatedAt, &job.UpdatedAt)
if err != nil {
return nil, err
}
return &job, nil
}

func (service *JobService) getCollection(query string, args ...interface{}) ([]*core.Job, error) {
var collection []*core.Job

rows, err := service.db.Query(query, args...)
if err != nil {
return collection
return nil, err
}
defer rows.Close()

for rows.Next() {
var guid string
var name string
var command string
var diskInMb int
var memoryInMb int
var state string
var spaceGUID string
var appGUID string
var createdAt time.Time
var updatedAt time.Time

err := rows.Scan(&guid, &name, &command, &diskInMb, &memoryInMb, &state, &appGUID, &spaceGUID, &createdAt, &updatedAt)
job, err := service.scanJob(rows)
if err != nil {
continue
return nil, err
}

candidate := &core.Job{
GUID: guid,
Name: name,
Command: command,
DiskInMb: diskInMb,
MemoryInMb: memoryInMb,
State: state,
SpaceGUID: spaceGUID,
AppGUID: appGUID,
CreatedAt: createdAt,
UpdatedAt: updatedAt,
}

collection = append(collection, candidate)
collection = append(collection, job)
}

return collection
return collection, rows.Err()
}

0 comments on commit 064a24f

Please sign in to comment.