Skip to content

Commit

Permalink
Fix the loop variable scheduler issue (#4468)
Browse files Browse the repository at this point in the history
* Fix the loop variable scheduler issue

Signed-off-by: pmahindrakar-oss <[email protected]>

* lint

Signed-off-by: pmahindrakar-oss <[email protected]>

* renamed variable

Signed-off-by: pmahindrakar-oss <[email protected]>

* Removed the debuggger file

Signed-off-by: pmahindrakar-oss <[email protected]>

* Added comments

Signed-off-by: pmahindrakar-oss <[email protected]>

---------

Signed-off-by: pmahindrakar-oss <[email protected]>
  • Loading branch information
pmahindrakar-oss authored Nov 27, 2023
1 parent db61e22 commit 98dd505
Show file tree
Hide file tree
Showing 2 changed files with 103 additions and 4 deletions.
11 changes: 7 additions & 4 deletions flyteadmin/scheduler/core/gocron_scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,23 +54,26 @@ func (g *GoCronScheduler) GetTimedFuncWithSchedule() TimedFuncWithSchedule {
func (g *GoCronScheduler) BootStrapSchedulesFromSnapShot(ctx context.Context, schedules []models.SchedulableEntity,
snapshot snapshoter.Snapshot) {
for _, s := range schedules {
// Copy the object to save to a new pointer since the pointer is saved later
// Issue due to https://github.com/golang/go/discussions/56010
schedule := s
if *s.Active {
funcRef := g.GetTimedFuncWithSchedule()
nameOfSchedule := identifier.GetScheduleName(ctx, s)
// Initialize the lastExectime as the updatedAt time
// Assumption here that schedule was activated and that the 0th execution of the schedule
// which will be used as a reference
lastExecTime := &s.UpdatedAt
lastExecTime := &schedule.UpdatedAt

fromSnapshot := snapshot.GetLastExecutionTime(nameOfSchedule)
// Use the latest time if available in the snapshot
if fromSnapshot != nil && fromSnapshot.After(s.UpdatedAt) {
if fromSnapshot != nil && fromSnapshot.After(schedule.UpdatedAt) {
lastExecTime = fromSnapshot
}
err := g.ScheduleJob(ctx, s, funcRef, lastExecTime)
err := g.ScheduleJob(ctx, schedule, funcRef, lastExecTime)
if err != nil {
g.metrics.JobScheduledFailedCounter.Inc()
logger.Errorf(ctx, "unable to register the schedule %+v due to %v", s, err)
logger.Errorf(ctx, "unable to register the schedule %+v due to %v", schedule, err)
}
}
}
Expand Down
96 changes: 96 additions & 0 deletions flyteadmin/scheduler/core/gocron_scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -296,3 +296,99 @@ func TestCatchUpAllSchedule(t *testing.T) {
catchupSuccess := g.CatchupAll(ctx, toTime)
assert.True(t, catchupSuccess)
}

func TestGoCronScheduler_BootStrapSchedulesFromSnapShot(t *testing.T) {
g := setupWithSchedules(t, "testing", []models.SchedulableEntity{}, true)
True := true
False := false
scheduleActive1 := models.SchedulableEntity{
BaseModel: adminModels.BaseModel{
UpdatedAt: time.Date(1000, time.October, 19, 10, 0, 0, 0, time.UTC),
},
SchedulableEntityKey: models.SchedulableEntityKey{
Project: "project",
Domain: "domain",
Name: "schedule_active_1",
Version: "version1",
},
CronExpression: "0 19 * * *",
Active: &True,
}
scheduleActive2 := models.SchedulableEntity{
BaseModel: adminModels.BaseModel{
UpdatedAt: time.Date(2000, time.November, 19, 10, 0, 0, 0, time.UTC),
},
SchedulableEntityKey: models.SchedulableEntityKey{
Project: "project",
Domain: "domain",
Name: "schedule_active_2",
Version: "version1",
},
CronExpression: "0 19 * * *",
Active: &True,
}
scheduleInactive := models.SchedulableEntity{
BaseModel: adminModels.BaseModel{
UpdatedAt: time.Date(3000, time.December, 19, 10, 0, 0, 0, time.UTC),
},
SchedulableEntityKey: models.SchedulableEntityKey{
Project: "project",
Domain: "domain",
Name: "cron3",
Version: "version1",
},
CronExpression: "0 19 * * *",
Active: &False,
}

schedule1SnapshotTime := time.Date(5000, time.December, 19, 10, 0, 0, 0, time.UTC)
schedule2SnapshotTime := time.Date(6000, time.December, 19, 10, 0, 0, 0, time.UTC)
tests := []struct {
name string
schedules []models.SchedulableEntity
snapshoter snapshoter.Snapshot
expectedCatchUpTimes map[string]*time.Time
}{
{
name: "two active",
schedules: []models.SchedulableEntity{scheduleActive1, scheduleActive2},
snapshoter: &snapshoter.SnapshotV1{},
expectedCatchUpTimes: map[string]*time.Time{"11407394263542327059": &scheduleActive1.UpdatedAt, "1420107156943834850": &scheduleActive2.UpdatedAt},
},
{
name: "two active one inactive",
schedules: []models.SchedulableEntity{scheduleActive1, scheduleActive2, scheduleInactive},
snapshoter: &snapshoter.SnapshotV1{},
expectedCatchUpTimes: map[string]*time.Time{"11407394263542327059": &scheduleActive1.UpdatedAt, "1420107156943834850": &scheduleActive2.UpdatedAt},
},
{
name: "two active one inactive with snapshot populated",
schedules: []models.SchedulableEntity{scheduleActive1, scheduleActive2, scheduleInactive},
snapshoter: &snapshoter.SnapshotV1{
LastTimes: map[string]*time.Time{
"11407394263542327059": &schedule1SnapshotTime,
"1420107156943834850": &schedule2SnapshotTime,
},
},
expectedCatchUpTimes: map[string]*time.Time{"11407394263542327059": &schedule1SnapshotTime, "1420107156943834850": &schedule2SnapshotTime},
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
g.BootStrapSchedulesFromSnapShot(context.Background(), tt.schedules, tt.snapshoter)
g.jobStore.Range(func(key, value interface{}) bool {
jobID := key.(string)
job := value.(*GoCronJob)
if !*job.schedule.Active {
return true
}
assert.Equal(t, job.catchupFromTime, tt.expectedCatchUpTimes[jobID])
return true
})
for _, schedule := range tt.schedules {
g.DeScheduleJob(context.TODO(), schedule)
}
})
}
}

0 comments on commit 98dd505

Please sign in to comment.