Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(schedules): criteria for triggering a build #893

Merged
merged 43 commits into from
Jul 10, 2023
Merged
Show file tree
Hide file tree
Changes from 39 commits
Commits
Show all changes
43 commits
Select commit Hold shift + click to select a range
74626c9
Merge branch 'main' of github.com:go-vela/server
jbrockopp Feb 14, 2023
974c8e6
Merge branch 'main' of github.com:go-vela/server
jbrockopp Mar 3, 2023
2f2c425
Merge branch 'main' of github.com:go-vela/server
jbrockopp Mar 20, 2023
528291a
Merge branch 'main' of github.com:go-vela/server
jbrockopp Apr 8, 2023
b53c687
Merge branch 'main' of github.com:go-vela/server
jbrockopp Apr 8, 2023
4734dcb
Merge branch 'main' of github.com:go-vela/server
jbrockopp Apr 16, 2023
1fb52df
Merge branch 'main' of github.com:go-vela/server
jbrockopp Apr 21, 2023
e996aa6
Merge branch 'main' of github.com:go-vela/server
jbrockopp Apr 27, 2023
c299ee4
Merge branch 'main' of github.com:go-vela/server
jbrockopp May 11, 2023
c8da9e3
Merge branch 'main' of github.com:go-vela/server
jbrockopp May 16, 2023
1ee254f
Merge branch 'main' of github.com:go-vela/server
jbrockopp May 22, 2023
8dd6033
Merge branch 'main' of github.com:go-vela/server
jbrockopp May 22, 2023
0eb92b1
Merge branch 'main' of github.com:go-vela/server
jbrockopp May 26, 2023
d5dcb6d
Merge branch 'main' of github.com:go-vela/server
jbrockopp Jun 1, 2023
be8dd9a
Merge branch 'main' of github.com:go-vela/server
jbrockopp Jun 5, 2023
73893b5
Merge branch 'main' of github.com:go-vela/server
jbrockopp Jun 7, 2023
355017b
Merge branch 'main' of github.com:go-vela/server
jbrockopp Jun 8, 2023
3e17278
Merge branch 'main' of github.com:go-vela/server
jbrockopp Jun 8, 2023
d3c1e06
Merge branch 'main' of github.com:go-vela/server
jbrockopp Jun 12, 2023
d63abfe
Merge branch 'main' of github.com:go-vela/server
jbrockopp Jun 17, 2023
a353552
Merge branch 'main' of github.com:go-vela/server
jbrockopp Jun 21, 2023
f1a8ab6
Merge branch 'main' of github.com:go-vela/server
jbrockopp Jun 26, 2023
ce0385e
fix(schedules): ignore trigger for first time schedule
jbrockopp Jun 26, 2023
6c7dac6
fix(schedules): determine trigger off current UTC time
jbrockopp Jun 26, 2023
ef3010c
chore: save work
jbrockopp Jun 26, 2023
05f8ca3
cleanup: ignore inactive schedules
jbrockopp Jun 26, 2023
d002463
feat: add interval for schedules
jbrockopp Jun 26, 2023
589a3ce
Merge branch 'main' into fix/schedules
ecrupper Jun 26, 2023
cc3b18c
chore: address slack feedback
jbrockopp Jun 27, 2023
15d8699
Merge branch 'fix/schedules' of github.com:go-vela/server into fix/sc…
jbrockopp Jun 27, 2023
e0840d5
chore: fix typos
jbrockopp Jun 27, 2023
6d54f53
fix: processing timed schedules
jbrockopp Jun 27, 2023
afd763e
fix: processing schedules
jbrockopp Jul 3, 2023
510e7f0
Merge branch 'main' into fix/schedules
jbrockopp Jul 3, 2023
691aec2
fix: typo in comment
jbrockopp Jul 3, 2023
225b54f
Merge branch 'fix/schedules' of github.com:go-vela/server into fix/sc…
jbrockopp Jul 3, 2023
c7faed2
chore: address review feedback
jbrockopp Jul 6, 2023
9d16213
temp: add test docker compose
jbrockopp Jul 6, 2023
74bb4ef
Merge branch 'main' into fix/schedules
jbrockopp Jul 7, 2023
c3648d2
fix: finalize
jbrockopp Jul 7, 2023
0b2ef32
revert: add test docker compose
jbrockopp Jul 7, 2023
5a0cde0
Merge branch 'fix/schedules' of github.com:go-vela/server into fix/sc…
jbrockopp Jul 7, 2023
b6ee9d7
Merge branch 'main' into fix/schedules
wass3rw3rk Jul 7, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion cmd/vela-server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,9 +213,15 @@ func main() {
&cli.DurationFlag{
EnvVars: []string{"VELA_SCHEDULE_MINIMUM_FREQUENCY", "SCHEDULE_MINIMUM_FREQUENCY"},
Name: "schedule-minimum-frequency",
Usage: "minimum time between each schedule entry",
Usage: "minimum time allowed between each build triggered for a schedule",
Value: 1 * time.Hour,
},
&cli.DurationFlag{
EnvVars: []string{"VELA_SCHEDULE_INTERVAL", "SCHEDULE_INTERVAL"},
Name: "schedule-interval",
Usage: "interval at which schedules will be processed by the server to trigger builds",
Value: 5 * time.Minute,
},
&cli.StringSliceFlag{
EnvVars: []string{"VELA_SCHEDULE_ALLOWLIST"},
Name: "vela-schedule-allowlist",
Expand Down
105 changes: 61 additions & 44 deletions cmd/vela-server/schedule.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import (
"strings"
"time"

"k8s.io/apimachinery/pkg/util/wait"

"github.com/adhocore/gronx"
"github.com/go-vela/server/api/build"
"github.com/go-vela/server/compiler"
Expand All @@ -20,13 +22,15 @@ import (
"github.com/go-vela/types/library"
"github.com/go-vela/types/pipeline"
"github.com/sirupsen/logrus"

"k8s.io/apimachinery/pkg/util/wait"
)

const baseErr = "unable to schedule build"
const (
scheduleErr = "unable to trigger build for schedule"

scheduleWait = "waiting to trigger build for schedule"
)

func processSchedules(compiler compiler.Engine, database database.Interface, metadata *types.Metadata, queue queue.Service, scm scm.Service) error {
func processSchedules(start time.Time, compiler compiler.Engine, database database.Interface, metadata *types.Metadata, queue queue.Service, scm scm.Service) error {
logrus.Infof("processing active schedules to create builds")

// send API call to capture the list of active schedules
Expand All @@ -37,59 +41,79 @@ func processSchedules(compiler compiler.Engine, database database.Interface, met

// iterate through the list of active schedules
for _, s := range schedules {
// sleep for 1s - 2s before processing the active schedule
//
// This should prevent multiple servers from processing a schedule at the same time by
// leveraging a base duration along with a standard deviation of randomness a.k.a.
// "jitter". To create the jitter, we use a base duration of 1s with a scale factor of 1.0.
time.Sleep(wait.Jitter(time.Second, 1.0))

// send API call to capture the schedule
//
// This is needed to ensure we are not dealing with a stale schedule since we fetch
// all schedules once and iterate through that list which can take a significant
// amount of time to get to the end of the list.
schedule, err := database.GetSchedule(s.GetID())
if err != nil {
logrus.WithError(err).Warnf("%s for %s", baseErr, schedule.GetName())
logrus.WithError(err).Warnf("%s %s", scheduleErr, schedule.GetName())

continue
}

// create a variable to track if a build should be triggered based off the schedule
trigger := false
// ignore triggering a build if the schedule is no longer active
if !schedule.GetActive() {
logrus.Tracef("skipping to trigger build for inactive schedule %s", schedule.GetName())

// check if a build has already been triggered for the schedule
if schedule.GetScheduledAt() == 0 {
// trigger a build for the schedule since one has not already been scheduled
trigger = true
} else {
// parse the previous occurrence of the entry for the schedule
prevTime, err := gronx.PrevTick(schedule.GetEntry(), true)
if err != nil {
logrus.WithError(err).Warnf("%s for %s", baseErr, schedule.GetName())
continue
}

continue
}
// capture the last time a build was triggered for the schedule in UTC
scheduled := time.Unix(schedule.GetScheduledAt(), 0).UTC()

// parse the next occurrence of the entry for the schedule
nextTime, err := gronx.NextTick(schedule.GetEntry(), true)
if err != nil {
logrus.WithError(err).Warnf("%s for %s", baseErr, schedule.GetName())
// capture the previous occurrence of the entry rounded to the nearest whole interval
//
// i.e. if it's 4:02 on five minute intervals, this will be 4:00
prevTime, err := gronx.PrevTick(schedule.GetEntry(), true)
if err != nil {
logrus.WithError(err).Warnf("%s %s", scheduleErr, schedule.GetName())

continue
}
continue
}

// parse the UNIX timestamp from when the last build was triggered for the schedule
t := time.Unix(schedule.GetScheduledAt(), 0).UTC()
// capture the next occurrence of the entry after the last schedule rounded to the nearest whole interval
//
// i.e. if it's 4:02 on five minute intervals, this will be 4:05
nextTime, err := gronx.NextTickAfter(schedule.GetEntry(), scheduled, true)
if err != nil {
logrus.WithError(err).Warnf("%s %s", scheduleErr, schedule.GetName())

// check if the time since the last triggered build is greater than the entry duration for the schedule
if time.Since(t) > nextTime.Sub(prevTime) {
// trigger a build for the schedule since it has not previously ran
trigger = true
}
continue
}

if trigger && schedule.GetActive() {
err = processSchedule(schedule, compiler, database, metadata, queue, scm)
if err != nil {
logrus.WithError(err).Warnf("%s for %s", baseErr, schedule.GetName())
// check if we should wait to trigger a build for the schedule
//
// The current time must be after the next occurrence of the schedule.
if !time.Now().After(nextTime) {
logrus.Tracef("%s %s: current time not past next occurrence", scheduleWait, schedule.GetName())

continue
}
continue
}

// check if we should wait to trigger a build for the schedule
//
// The previous occurrence of the schedule must be after the starting time of processing schedules.
if !prevTime.After(start) {
logrus.Tracef("%s %s: previous occurence not after starting point", scheduleWait, schedule.GetName())

continue
}

// process the schedule and trigger a new build
err = processSchedule(schedule, compiler, database, metadata, queue, scm)
if err != nil {
logrus.WithError(err).Warnf("%s %s", scheduleErr, schedule.GetName())
ecrupper marked this conversation as resolved.
Show resolved Hide resolved

continue
}
}

Expand All @@ -98,13 +122,6 @@ func processSchedules(compiler compiler.Engine, database database.Interface, met

//nolint:funlen // ignore function length and number of statements
func processSchedule(s *library.Schedule, compiler compiler.Engine, database database.Interface, metadata *types.Metadata, queue queue.Service, scm scm.Service) error {
// sleep for 1s - 3s before processing the schedule
//
// This should prevent multiple servers from processing a schedule at the same time by
// leveraging a base duration along with a standard deviation of randomness a.k.a.
// "jitter". To create the jitter, we use a base duration of 1s with a scale factor of 3.0.
time.Sleep(wait.Jitter(time.Second, 3.0))

// send API call to capture the repo for the schedule
r, err := database.GetRepo(s.GetRepoID())
if err != nil {
Expand Down
29 changes: 18 additions & 11 deletions cmd/vela-server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,23 +176,30 @@ func server(c *cli.Context) error {
g.Go(func() error {
logrus.Info("starting scheduler")
for {
// cut the configured minimum frequency duration for schedules in half
// track the starting time for when the server begins processing schedules
//
// We need to sleep for some amount of time before we attempt to process schedules
// setup in the database. Since the minimum frequency is configurable, we cut it in
// half and use that as the base duration to determine how long to sleep for.
base := c.Duration("schedule-minimum-frequency") / 2
logrus.Infof("sleeping for %v before scheduling builds", base)
// This will be used to control which schedules will have a build triggered based
// off the configured entry and last time a build was triggered for the schedule.
start := time.Now().UTC()

// sleep for a duration of time before processing schedules
// capture the interval of time to wait before processing schedules
//
// We need to sleep for some amount of time before we attempt to process schedules
// setup in the database. Since the schedule interval is configurable, we use that
// as the base duration to determine how long to sleep for.
interval := c.Duration("schedule-interval")
jbrockopp marked this conversation as resolved.
Show resolved Hide resolved

// This should prevent multiple servers from processing schedules at the same time by
// leveraging a base duration along with a standard deviation of randomness a.k.a.
// "jitter". To create the jitter, we use the configured minimum frequency duration
// along with a scale factor of 0.1.
time.Sleep(wait.Jitter(base, 0.1))
// "jitter". To create the jitter, we use the configured schedule interval duration
// along with a scale factor of 0.5.
jitter := wait.Jitter(interval, 0.5)

logrus.Infof("sleeping for %v before scheduling builds", jitter)
// sleep for a duration of time before processing schedules
time.Sleep(jitter)

err = processSchedules(compiler, database, metadata, queue, scm)
err = processSchedules(start, compiler, database, metadata, queue, scm)
if err != nil {
logrus.WithError(err).Warn("unable to process schedules")
} else {
Expand Down
Loading