diff --git a/cmd/vela-server/main.go b/cmd/vela-server/main.go index 19463fe0b..788f6c26b 100644 --- a/cmd/vela-server/main.go +++ b/cmd/vela-server/main.go @@ -213,9 +213,15 @@ func main() { &cli.DurationFlag{ EnvVars: []string{"VELA_SCHEDULE_MINIMUM_FREQUENCY", "SCHEDULE_MINIMUM_FREQUENCY"}, Name: "schedule-minimum-frequency", - Usage: "minimum time between each schedule entry", + Usage: "minimum time allowed between each build triggered for a schedule", Value: 1 * time.Hour, }, + &cli.DurationFlag{ + EnvVars: []string{"VELA_SCHEDULE_INTERVAL", "SCHEDULE_INTERVAL"}, + Name: "schedule-interval", + Usage: "interval at which schedules will be processed by the server to trigger builds", + Value: 5 * time.Minute, + }, &cli.StringSliceFlag{ EnvVars: []string{"VELA_SCHEDULE_ALLOWLIST"}, Name: "vela-schedule-allowlist", diff --git a/cmd/vela-server/schedule.go b/cmd/vela-server/schedule.go index a9fd38234..08e7867cc 100644 --- a/cmd/vela-server/schedule.go +++ b/cmd/vela-server/schedule.go @@ -24,9 +24,13 @@ import ( "k8s.io/apimachinery/pkg/util/wait" ) -const baseErr = "unable to schedule build" +const ( + scheduleErr = "unable to trigger build for schedule" -func processSchedules(compiler compiler.Engine, database database.Interface, metadata *types.Metadata, queue queue.Service, scm scm.Service) error { + scheduleWait = "waiting to trigger build for schedule" +) + +func processSchedules(start time.Time, compiler compiler.Engine, database database.Interface, metadata *types.Metadata, queue queue.Service, scm scm.Service) error { logrus.Infof("processing active schedules to create builds") // send API call to capture the list of active schedules @@ -37,6 +41,13 @@ func processSchedules(compiler compiler.Engine, database database.Interface, met // iterate through the list of active schedules for _, s := range schedules { + // sleep for 1s - 2s before processing the active schedule + // + // This should prevent multiple servers from processing a schedule at the same time by + // leveraging a base duration along with a standard deviation of randomness a.k.a. + // "jitter". To create the jitter, we use a base duration of 1s with a scale factor of 1.0. + time.Sleep(wait.Jitter(time.Second, 1.0)) + // send API call to capture the schedule // // This is needed to ensure we are not dealing with a stale schedule since we fetch @@ -44,52 +55,79 @@ func processSchedules(compiler compiler.Engine, database database.Interface, met // amount of time to get to the end of the list. schedule, err := database.GetSchedule(s.GetID()) if err != nil { - logrus.WithError(err).Warnf("%s for %s", baseErr, schedule.GetName()) + logrus.WithError(err).Warnf("%s %s", scheduleErr, schedule.GetName()) continue } - // create a variable to track if a build should be triggered based off the schedule - trigger := false + // ignore triggering a build if the schedule is no longer active + if !schedule.GetActive() { + logrus.Tracef("skipping to trigger build for inactive schedule %s", schedule.GetName()) - // check if a build has already been triggered for the schedule - if schedule.GetScheduledAt() == 0 { - // trigger a build for the schedule since one has not already been scheduled - trigger = true - } else { - // parse the previous occurrence of the entry for the schedule - prevTime, err := gronx.PrevTick(schedule.GetEntry(), true) - if err != nil { - logrus.WithError(err).Warnf("%s for %s", baseErr, schedule.GetName()) + continue + } - continue - } + // capture the last time a build was triggered for the schedule in UTC + scheduled := time.Unix(schedule.GetScheduledAt(), 0).UTC() - // parse the next occurrence of the entry for the schedule - nextTime, err := gronx.NextTick(schedule.GetEntry(), true) - if err != nil { - logrus.WithError(err).Warnf("%s for %s", baseErr, schedule.GetName()) + // capture the previous occurrence of the entry rounded to the nearest whole interval + // + // i.e. if it's 4:02 on five minute intervals, this will be 4:00 + prevTime, err := gronx.PrevTick(schedule.GetEntry(), true) + if err != nil { + logrus.WithError(err).Warnf("%s %s", scheduleErr, schedule.GetName()) - continue - } + continue + } - // parse the UNIX timestamp from when the last build was triggered for the schedule - t := time.Unix(schedule.GetScheduledAt(), 0).UTC() + // capture the next occurrence of the entry after the last schedule rounded to the nearest whole interval + // + // i.e. if it's 4:02 on five minute intervals, this will be 4:05 + nextTime, err := gronx.NextTickAfter(schedule.GetEntry(), scheduled, true) + if err != nil { + logrus.WithError(err).Warnf("%s %s", scheduleErr, schedule.GetName()) - // check if the time since the last triggered build is greater than the entry duration for the schedule - if time.Since(t) > nextTime.Sub(prevTime) { - // trigger a build for the schedule since it has not previously ran - trigger = true - } + continue } - if trigger && schedule.GetActive() { - err = processSchedule(schedule, compiler, database, metadata, queue, scm) - if err != nil { - logrus.WithError(err).Warnf("%s for %s", baseErr, schedule.GetName()) + // check if we should wait to trigger a build for the schedule + // + // The current time must be after the next occurrence of the schedule. + if !time.Now().After(nextTime) { + logrus.Tracef("%s %s: current time not past next occurrence", scheduleWait, schedule.GetName()) - continue - } + continue + } + + // check if we should wait to trigger a build for the schedule + // + // The previous occurrence of the schedule must be after the starting time of processing schedules. + if !prevTime.After(start) { + logrus.Tracef("%s %s: previous occurence not after starting point", scheduleWait, schedule.GetName()) + + continue + } + + // update the scheduled_at field with the current timestamp + // + // This should help prevent multiple servers from processing a schedule at the same time + // by updating the schedule with a new timestamp to reflect the current state. + schedule.SetScheduledAt(time.Now().UTC().Unix()) + + // send API call to update schedule for ensuring scheduled_at field is set + err = database.UpdateSchedule(schedule, false) + if err != nil { + logrus.WithError(err).Warnf("%s %s", scheduleErr, schedule.GetName()) + + continue + } + + // process the schedule and trigger a new build + err = processSchedule(schedule, compiler, database, metadata, queue, scm) + if err != nil { + logrus.WithError(err).Warnf("%s %s", scheduleErr, schedule.GetName()) + + continue } } @@ -98,13 +136,6 @@ func processSchedules(compiler compiler.Engine, database database.Interface, met //nolint:funlen // ignore function length and number of statements func processSchedule(s *library.Schedule, compiler compiler.Engine, database database.Interface, metadata *types.Metadata, queue queue.Service, scm scm.Service) error { - // sleep for 1s - 3s before processing the schedule - // - // This should prevent multiple servers from processing a schedule at the same time by - // leveraging a base duration along with a standard deviation of randomness a.k.a. - // "jitter". To create the jitter, we use a base duration of 1s with a scale factor of 3.0. - time.Sleep(wait.Jitter(time.Second, 3.0)) - // send API call to capture the repo for the schedule r, err := database.GetRepo(s.GetRepoID()) if err != nil { @@ -337,8 +368,6 @@ func processSchedule(s *library.Schedule, compiler compiler.Engine, database dat return err } - s.SetScheduledAt(time.Now().UTC().Unix()) - // break the loop because everything was successful break } // end of retry loop @@ -349,12 +378,6 @@ func processSchedule(s *library.Schedule, compiler compiler.Engine, database dat return fmt.Errorf("unable to update repo %s: %w", r.GetFullName(), err) } - // send API call to update schedule for ensuring scheduled_at field is set - err = database.UpdateSchedule(s, false) - if err != nil { - return fmt.Errorf("unable to update schedule %s/%s: %w", r.GetFullName(), s.GetName(), err) - } - // send API call to capture the triggered build b, err = database.GetBuildForRepo(r, b.GetNumber()) if err != nil { diff --git a/cmd/vela-server/server.go b/cmd/vela-server/server.go index 0d95e88e5..689ba03c2 100644 --- a/cmd/vela-server/server.go +++ b/cmd/vela-server/server.go @@ -176,23 +176,30 @@ func server(c *cli.Context) error { g.Go(func() error { logrus.Info("starting scheduler") for { - // cut the configured minimum frequency duration for schedules in half + // track the starting time for when the server begins processing schedules // - // We need to sleep for some amount of time before we attempt to process schedules - // setup in the database. Since the minimum frequency is configurable, we cut it in - // half and use that as the base duration to determine how long to sleep for. - base := c.Duration("schedule-minimum-frequency") / 2 - logrus.Infof("sleeping for %v before scheduling builds", base) + // This will be used to control which schedules will have a build triggered based + // off the configured entry and last time a build was triggered for the schedule. + start := time.Now().UTC() - // sleep for a duration of time before processing schedules + // capture the interval of time to wait before processing schedules // + // We need to sleep for some amount of time before we attempt to process schedules + // setup in the database. Since the schedule interval is configurable, we use that + // as the base duration to determine how long to sleep for. + interval := c.Duration("schedule-interval") + // This should prevent multiple servers from processing schedules at the same time by // leveraging a base duration along with a standard deviation of randomness a.k.a. - // "jitter". To create the jitter, we use the configured minimum frequency duration - // along with a scale factor of 0.1. - time.Sleep(wait.Jitter(base, 0.1)) + // "jitter". To create the jitter, we use the configured schedule interval duration + // along with a scale factor of 0.5. + jitter := wait.Jitter(interval, 0.5) + + logrus.Infof("sleeping for %v before scheduling builds", jitter) + // sleep for a duration of time before processing schedules + time.Sleep(jitter) - err = processSchedules(compiler, database, metadata, queue, scm) + err = processSchedules(start, compiler, database, metadata, queue, scm) if err != nil { logrus.WithError(err).Warn("unable to process schedules") } else {