Skip to content

Commit

Permalink
fix(schedules): criteria for triggering a build (#893)
Browse files Browse the repository at this point in the history
* fix(schedules): ignore trigger for first time schedule

* fix(schedules): determine trigger off current UTC time

* chore: save work

* cleanup: ignore inactive schedules

* feat: add interval for schedules

* chore: address slack feedback

* chore: fix typos

* fix: processing timed schedules

* fix: processing schedules

* fix: typo in comment

* chore: address review feedback

* temp: add test docker compose

* fix: finalize

* revert: add test docker compose

---------

Co-authored-by: Easton Crupper <[email protected]>
Co-authored-by: David May <[email protected]>
  • Loading branch information
3 people authored Jul 10, 2023
1 parent 71a484d commit 563f226
Show file tree
Hide file tree
Showing 3 changed files with 98 additions and 62 deletions.
8 changes: 7 additions & 1 deletion cmd/vela-server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,9 +213,15 @@ func main() {
&cli.DurationFlag{
EnvVars: []string{"VELA_SCHEDULE_MINIMUM_FREQUENCY", "SCHEDULE_MINIMUM_FREQUENCY"},
Name: "schedule-minimum-frequency",
Usage: "minimum time between each schedule entry",
Usage: "minimum time allowed between each build triggered for a schedule",
Value: 1 * time.Hour,
},
&cli.DurationFlag{
EnvVars: []string{"VELA_SCHEDULE_INTERVAL", "SCHEDULE_INTERVAL"},
Name: "schedule-interval",
Usage: "interval at which schedules will be processed by the server to trigger builds",
Value: 5 * time.Minute,
},
&cli.StringSliceFlag{
EnvVars: []string{"VELA_SCHEDULE_ALLOWLIST"},
Name: "vela-schedule-allowlist",
Expand Down
123 changes: 73 additions & 50 deletions cmd/vela-server/schedule.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,13 @@ import (
"k8s.io/apimachinery/pkg/util/wait"
)

const baseErr = "unable to schedule build"
const (
scheduleErr = "unable to trigger build for schedule"

func processSchedules(compiler compiler.Engine, database database.Interface, metadata *types.Metadata, queue queue.Service, scm scm.Service) error {
scheduleWait = "waiting to trigger build for schedule"
)

func processSchedules(start time.Time, compiler compiler.Engine, database database.Interface, metadata *types.Metadata, queue queue.Service, scm scm.Service) error {
logrus.Infof("processing active schedules to create builds")

// send API call to capture the list of active schedules
Expand All @@ -37,59 +41,93 @@ func processSchedules(compiler compiler.Engine, database database.Interface, met

// iterate through the list of active schedules
for _, s := range schedules {
// sleep for 1s - 2s before processing the active schedule
//
// This should prevent multiple servers from processing a schedule at the same time by
// leveraging a base duration along with a standard deviation of randomness a.k.a.
// "jitter". To create the jitter, we use a base duration of 1s with a scale factor of 1.0.
time.Sleep(wait.Jitter(time.Second, 1.0))

// send API call to capture the schedule
//
// This is needed to ensure we are not dealing with a stale schedule since we fetch
// all schedules once and iterate through that list which can take a significant
// amount of time to get to the end of the list.
schedule, err := database.GetSchedule(s.GetID())
if err != nil {
logrus.WithError(err).Warnf("%s for %s", baseErr, schedule.GetName())
logrus.WithError(err).Warnf("%s %s", scheduleErr, schedule.GetName())

continue
}

// create a variable to track if a build should be triggered based off the schedule
trigger := false
// ignore triggering a build if the schedule is no longer active
if !schedule.GetActive() {
logrus.Tracef("skipping to trigger build for inactive schedule %s", schedule.GetName())

// check if a build has already been triggered for the schedule
if schedule.GetScheduledAt() == 0 {
// trigger a build for the schedule since one has not already been scheduled
trigger = true
} else {
// parse the previous occurrence of the entry for the schedule
prevTime, err := gronx.PrevTick(schedule.GetEntry(), true)
if err != nil {
logrus.WithError(err).Warnf("%s for %s", baseErr, schedule.GetName())
continue
}

continue
}
// capture the last time a build was triggered for the schedule in UTC
scheduled := time.Unix(schedule.GetScheduledAt(), 0).UTC()

// parse the next occurrence of the entry for the schedule
nextTime, err := gronx.NextTick(schedule.GetEntry(), true)
if err != nil {
logrus.WithError(err).Warnf("%s for %s", baseErr, schedule.GetName())
// capture the previous occurrence of the entry rounded to the nearest whole interval
//
// i.e. if it's 4:02 on five minute intervals, this will be 4:00
prevTime, err := gronx.PrevTick(schedule.GetEntry(), true)
if err != nil {
logrus.WithError(err).Warnf("%s %s", scheduleErr, schedule.GetName())

continue
}
continue
}

// parse the UNIX timestamp from when the last build was triggered for the schedule
t := time.Unix(schedule.GetScheduledAt(), 0).UTC()
// capture the next occurrence of the entry after the last schedule rounded to the nearest whole interval
//
// i.e. if it's 4:02 on five minute intervals, this will be 4:05
nextTime, err := gronx.NextTickAfter(schedule.GetEntry(), scheduled, true)
if err != nil {
logrus.WithError(err).Warnf("%s %s", scheduleErr, schedule.GetName())

// check if the time since the last triggered build is greater than the entry duration for the schedule
if time.Since(t) > nextTime.Sub(prevTime) {
// trigger a build for the schedule since it has not previously run
trigger = true
}
continue
}

if trigger && schedule.GetActive() {
err = processSchedule(schedule, compiler, database, metadata, queue, scm)
if err != nil {
logrus.WithError(err).Warnf("%s for %s", baseErr, schedule.GetName())
// check if we should wait to trigger a build for the schedule
//
// The current time must be after the next occurrence of the schedule.
if !time.Now().After(nextTime) {
logrus.Tracef("%s %s: current time not past next occurrence", scheduleWait, schedule.GetName())

continue
}
continue
}

// check if we should wait to trigger a build for the schedule
//
// The previous occurrence of the schedule must be after the starting time of processing schedules.
if !prevTime.After(start) {
logrus.Tracef("%s %s: previous occurence not after starting point", scheduleWait, schedule.GetName())

continue
}

// update the scheduled_at field with the current timestamp
//
// This should help prevent multiple servers from processing a schedule at the same time
// by updating the schedule with a new timestamp to reflect the current state.
schedule.SetScheduledAt(time.Now().UTC().Unix())

// send API call to update schedule for ensuring scheduled_at field is set
err = database.UpdateSchedule(schedule, false)
if err != nil {
logrus.WithError(err).Warnf("%s %s", scheduleErr, schedule.GetName())

continue
}

// process the schedule and trigger a new build
err = processSchedule(schedule, compiler, database, metadata, queue, scm)
if err != nil {
logrus.WithError(err).Warnf("%s %s", scheduleErr, schedule.GetName())

continue
}
}

Expand All @@ -98,13 +136,6 @@ func processSchedules(compiler compiler.Engine, database database.Interface, met

//nolint:funlen // ignore function length and number of statements
func processSchedule(s *library.Schedule, compiler compiler.Engine, database database.Interface, metadata *types.Metadata, queue queue.Service, scm scm.Service) error {
// sleep for 1s - 4s before processing the schedule
//
// This should prevent multiple servers from processing a schedule at the same time by
// leveraging a base duration along with a standard deviation of randomness a.k.a.
// "jitter". To create the jitter, we use a base duration of 1s with a scale factor of 3.0.
time.Sleep(wait.Jitter(time.Second, 3.0))

// send API call to capture the repo for the schedule
r, err := database.GetRepo(s.GetRepoID())
if err != nil {
Expand Down Expand Up @@ -337,8 +368,6 @@ func processSchedule(s *library.Schedule, compiler compiler.Engine, database dat
return err
}

s.SetScheduledAt(time.Now().UTC().Unix())

// break the loop because everything was successful
break
} // end of retry loop
Expand All @@ -349,12 +378,6 @@ func processSchedule(s *library.Schedule, compiler compiler.Engine, database dat
return fmt.Errorf("unable to update repo %s: %w", r.GetFullName(), err)
}

// send API call to update schedule for ensuring scheduled_at field is set
err = database.UpdateSchedule(s, false)
if err != nil {
return fmt.Errorf("unable to update schedule %s/%s: %w", r.GetFullName(), s.GetName(), err)
}

// send API call to capture the triggered build
b, err = database.GetBuildForRepo(r, b.GetNumber())
if err != nil {
Expand Down
29 changes: 18 additions & 11 deletions cmd/vela-server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,23 +176,30 @@ func server(c *cli.Context) error {
g.Go(func() error {
logrus.Info("starting scheduler")
for {
// cut the configured minimum frequency duration for schedules in half
// track the starting time for when the server begins processing schedules
//
// We need to sleep for some amount of time before we attempt to process schedules
// set up in the database. Since the minimum frequency is configurable, we cut it in
// half and use that as the base duration to determine how long to sleep for.
base := c.Duration("schedule-minimum-frequency") / 2
logrus.Infof("sleeping for %v before scheduling builds", base)
// This will be used to control which schedules will have a build triggered based
// off the configured entry and last time a build was triggered for the schedule.
start := time.Now().UTC()

// sleep for a duration of time before processing schedules
// capture the interval of time to wait before processing schedules
//
// We need to sleep for some amount of time before we attempt to process schedules
// set up in the database. Since the schedule interval is configurable, we use that
// as the base duration to determine how long to sleep for.
interval := c.Duration("schedule-interval")

// This should prevent multiple servers from processing schedules at the same time by
// leveraging a base duration along with a standard deviation of randomness a.k.a.
// "jitter". To create the jitter, we use the configured minimum frequency duration
// along with a scale factor of 0.1.
time.Sleep(wait.Jitter(base, 0.1))
// "jitter". To create the jitter, we use the configured schedule interval duration
// along with a scale factor of 0.5.
jitter := wait.Jitter(interval, 0.5)

logrus.Infof("sleeping for %v before scheduling builds", jitter)
// sleep for a duration of time before processing schedules
time.Sleep(jitter)

err = processSchedules(compiler, database, metadata, queue, scm)
err = processSchedules(start, compiler, database, metadata, queue, scm)
if err != nil {
logrus.WithError(err).Warn("unable to process schedules")
} else {
Expand Down

0 comments on commit 563f226

Please sign in to comment.