Skip to content

Commit

Permalink
[v0.10] Changes job handling in gitops controller (#2932)
Browse files Browse the repository at this point in the history
* Changes job handling in gitops controller (#2903)

* Changes job handling in gitops controller

Changes the job handling so that jobs are deleted after they succeed.
Jobs are created only when they are needed, which avoids always keeping at least one job per
`gitrepo` resource.

It also removes the Owns(job) statement from the reconciler setup: the controller reference is
already set when the job is created, and the Owns statement triggers extra reconciler
calls that are not needed.

Signed-off-by: Xavi Garcia <[email protected]>
  • Loading branch information
0xavi0 authored Oct 7, 2024
1 parent 82e111d commit e75a058
Show file tree
Hide file tree
Showing 7 changed files with 180 additions and 53 deletions.
5 changes: 4 additions & 1 deletion charts/fleet-crd/templates/crds.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6439,7 +6439,6 @@ spec:
commit:
description: Commit is the Git commit hash from the last git job
run.
nullable: true
type: string
conditions:
description: 'Conditions is a list of Wrangler conditions that describe
Expand Down Expand Up @@ -6847,6 +6846,10 @@ spec:
spec.forceSyncGeneration is set
format: int64
type: integer
webhookCommit:
description: WebhookCommit is the latest Git commit hash received
from a webhook
type: string
type: object
type: object
served: true
Expand Down
2 changes: 2 additions & 0 deletions e2e/single-cluster/gitrepo_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,7 @@ var _ = Describe("Monitoring Git repos via HTTP for change", Label("infra-setup"
By("updating the gitrepo's status")
expectedStatus := fleet.GitRepoStatus{
Commit: commit,
WebhookCommit: commit,
ReadyClusters: 1,
DesiredReadyClusters: 1,
GitJobStatus: "Current",
Expand Down Expand Up @@ -425,6 +426,7 @@ func (matcher *gitRepoStatusMatcher) Match(actual interface{}) (success bool, er
}

return got.Commit == want.Commit &&
got.WebhookCommit == want.WebhookCommit &&
got.ReadyClusters == want.ReadyClusters &&
got.DesiredReadyClusters == want.DesiredReadyClusters &&
got.GitJobStatus == want.GitJobStatus &&
Expand Down
86 changes: 76 additions & 10 deletions integrationtests/gitjob/controller/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,17 +208,18 @@ var _ = Describe("GitJob controller", func() {
g.Expect(checkCondition(&gitRepo, "Accepted", corev1.ConditionTrue, "")).To(BeTrue())
}).Should(Succeed())

// it should log 2 events
// it should log 3 events
// first one is to log the new commit from the poller
// second one is to inform that the job was created
// third one is to inform that the job was deleted because it succeeded
Eventually(func(g Gomega) {
events, _ := k8sClientSet.CoreV1().Events(gitRepo.Namespace).List(context.TODO(),
metav1.ListOptions{
FieldSelector: "involvedObject.name=success",
TypeMeta: metav1.TypeMeta{Kind: "GitRepo"},
})
g.Expect(events).ToNot(BeNil())
g.Expect(len(events.Items)).To(Equal(2))
g.Expect(len(events.Items)).To(Equal(3))
g.Expect(events.Items[0].Reason).To(Equal("GotNewCommit"))
g.Expect(events.Items[0].Message).To(Equal("9ca3a0ad308ed8bffa6602572e2a1343af9c3d2e"))
g.Expect(events.Items[0].Type).To(Equal("Normal"))
Expand All @@ -227,7 +228,17 @@ var _ = Describe("GitJob controller", func() {
g.Expect(events.Items[1].Message).To(Equal("GitJob was created"))
g.Expect(events.Items[1].Type).To(Equal("Normal"))
g.Expect(events.Items[1].Source.Component).To(Equal("gitjob-controller"))
g.Expect(events.Items[2].Reason).To(Equal("JobDeleted"))
g.Expect(events.Items[2].Message).To(Equal("job deletion triggered because job succeeded"))
g.Expect(events.Items[2].Type).To(Equal("Normal"))
g.Expect(events.Items[2].Source.Component).To(Equal("gitjob-controller"))
}).Should(Succeed())

// job should not be present
Consistently(func() bool {
err := k8sClient.Get(ctx, types.NamespacedName{Name: jobName, Namespace: gitRepoNamespace}, &job)
return errors.IsNotFound(err)
}, 10*time.Second, 1*time.Second).Should(BeTrue())
})
})

Expand Down Expand Up @@ -347,13 +358,6 @@ var _ = Describe("GitJob controller", func() {
g.Expect(checkCondition(&gitRepo, "Ready", corev1.ConditionTrue, "")).To(BeTrue())
g.Expect(checkCondition(&gitRepo, "Accepted", corev1.ConditionTrue, "")).To(BeTrue())
}).Should(Succeed())

By("verifying that the job is deleted if Spec.Generation changed")
Expect(simulateIncreaseGitRepoGeneration(gitRepo)).ToNot(HaveOccurred())
Eventually(func() bool {
jobName = name.SafeConcatName(gitRepoName, name.Hex(repo+commit, 5))
return errors.IsNotFound(k8sClient.Get(ctx, types.NamespacedName{Name: jobName, Namespace: gitRepoNamespace}, &job))
}).Should(BeTrue())
})
})
})
Expand Down Expand Up @@ -408,9 +412,51 @@ var _ = Describe("GitJob controller", func() {
jobName = name.SafeConcatName(gitRepoName, name.Hex(repo+commit, 5))
return k8sClient.Get(ctx, types.NamespacedName{Name: jobName, Namespace: gitRepoNamespace}, &job)
}).Should(Not(HaveOccurred()))
// simulate job was successful
Eventually(func() error {
err := k8sClient.Get(ctx, types.NamespacedName{Name: jobName, Namespace: gitRepoNamespace}, &job)
// We could be checking this when the job is still not created
Expect(client.IgnoreNotFound(err)).ToNot(HaveOccurred())
job.Status.Succeeded = 1
job.Status.Conditions = []batchv1.JobCondition{
{
Type: "Complete",
Status: "True",
},
}
return k8sClient.Status().Update(ctx, &job)
}).Should(Not(HaveOccurred()))
// wait until the job has finished
Eventually(func() bool {
jobName = name.SafeConcatName(gitRepoName, name.Hex(repo+commit, 5))
err := k8sClient.Get(ctx, types.NamespacedName{Name: jobName, Namespace: gitRepoNamespace}, &job)
return errors.IsNotFound(err)
}).Should(BeTrue())

// store the generation value to compare against later
generationValue = gitRepo.Spec.ForceSyncGeneration
Expect(simulateIncreaseForceSyncGeneration(gitRepo)).ToNot(HaveOccurred())
// simulate job was successful
Eventually(func() error {
jobName = name.SafeConcatName(gitRepoName, name.Hex(repo+commit, 5))
err := k8sClient.Get(ctx, types.NamespacedName{Name: jobName, Namespace: gitRepoNamespace}, &job)
// We could be checking this when the job is still not created
Expect(client.IgnoreNotFound(err)).ToNot(HaveOccurred())
job.Status.Succeeded = 1
job.Status.Conditions = []batchv1.JobCondition{
{
Type: "Complete",
Status: "True",
},
}
return k8sClient.Status().Update(ctx, &job)
}).Should(Not(HaveOccurred()))
// wait until the job has finished
Eventually(func() bool {
jobName = name.SafeConcatName(gitRepoName, name.Hex(repo+commit, 5))
err := k8sClient.Get(ctx, types.NamespacedName{Name: jobName, Namespace: gitRepoNamespace}, &job)
return errors.IsNotFound(err)
}).Should(BeTrue())
})
BeforeEach(func() {
expectedCommit = commit
Expand Down Expand Up @@ -449,7 +495,7 @@ var _ = Describe("GitJob controller", func() {
g.Expect(events.Items[1].Type).To(Equal("Normal"))
g.Expect(events.Items[1].Source.Component).To(Equal("gitjob-controller"))
g.Expect(events.Items[2].Reason).To(Equal("JobDeleted"))
g.Expect(events.Items[2].Message).To(Equal("job deletion triggered because of ForceUpdateGeneration"))
g.Expect(events.Items[2].Message).To(Equal("job deletion triggered because job succeeded"))
g.Expect(events.Items[2].Type).To(Equal("Normal"))
g.Expect(events.Items[2].Source.Component).To(Equal("gitjob-controller"))
}).Should(Succeed())
Expand Down Expand Up @@ -481,6 +527,26 @@ var _ = Describe("GitJob controller", func() {
jobName = name.SafeConcatName(gitRepoName, name.Hex(repo+commit, 5))
return k8sClient.Get(ctx, types.NamespacedName{Name: jobName, Namespace: gitRepoNamespace}, &job)
}).Should(Not(HaveOccurred()))
// simulate job was successful
Eventually(func() error {
err := k8sClient.Get(ctx, types.NamespacedName{Name: jobName, Namespace: gitRepoNamespace}, &job)
// We could be checking this when the job is still not created
Expect(client.IgnoreNotFound(err)).ToNot(HaveOccurred())
job.Status.Succeeded = 1
job.Status.Conditions = []batchv1.JobCondition{
{
Type: "Complete",
Status: "True",
},
}
return k8sClient.Status().Update(ctx, &job)
}).Should(Not(HaveOccurred()))
// wait until the job has finished
Eventually(func() bool {
jobName = name.SafeConcatName(gitRepoName, name.Hex(repo+commit, 5))
err := k8sClient.Get(ctx, types.NamespacedName{Name: jobName, Namespace: gitRepoNamespace}, &job)
return errors.IsNotFound(err)
}).Should(BeTrue())

// change a gitrepo field, this will change the Generation field. This simulates changing fleet apply parameters.
Expect(retry.RetryOnConflict(retry.DefaultRetry, func() error {
Expand Down
119 changes: 86 additions & 33 deletions internal/cmd/controller/gitops/reconciler/gitjob_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,6 @@ func (r *GitJobReconciler) SetupWithManager(mgr ctrl.Manager) error {
),
),
).
Owns(&batchv1.Job{}).
Watches(
// Fan out from bundle to gitrepo
&v1alpha1.Bundle{},
Expand Down Expand Up @@ -190,12 +189,17 @@ func (r *GitJobReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctr
} else if repoPolled && oldCommit != gitrepo.Status.Commit {
r.Recorder.Event(gitrepo, fleetevent.Normal, "GotNewCommit", gitrepo.Status.Commit)
}

// check for webhook commit
if gitrepo.Status.WebhookCommit != "" && gitrepo.Status.WebhookCommit != gitrepo.Status.Commit {
gitrepo.Status.Commit = gitrepo.Status.WebhookCommit
}
// From this point onwards we have to take into account if the poller
// task was executed.
// If so, we need to return a Result with EnqueueAfter set.

res, err := r.manageGitJob(ctx, logger, gitrepo, oldCommit, repoPolled)
if err != nil {
if err != nil || res.Requeue {
return res, err
}

Expand Down Expand Up @@ -261,30 +265,26 @@ func (r *GitJobReconciler) manageGitJob(ctx context.Context, logger logr.Logger,
}
}

if gitrepo.Status.Commit != "" {
if r.shouldCreateJob(gitrepo, oldCommit) {
r.updateGenerationValuesIfNeeded(gitrepo)
if err := r.validateExternalSecretExist(ctx, gitrepo); err != nil {
r.Recorder.Event(gitrepo, fleetevent.Warning, "FailedValidatingSecret", err.Error())
return result(repoPolled, gitrepo), grutil.UpdateErrorStatus(ctx, r.Client, name, gitrepo.Status, err)
}
logger.V(1).Info("Creating Git job resources")
if err := r.createJobRBAC(ctx, gitrepo); err != nil {
return result(repoPolled, gitrepo), fmt.Errorf("failed to create RBAC resources for git job: %w", err)
}
if err := r.createTargetsConfigMap(ctx, gitrepo); err != nil {
return result(repoPolled, gitrepo), fmt.Errorf("failed to create targets config map for git job: %w", err)
}
if err := r.createCABundleSecret(ctx, gitrepo); err != nil {
return result(repoPolled, gitrepo), fmt.Errorf("failed to create cabundle secret for git job: %w", err)
}
if err := r.createJob(ctx, gitrepo); err != nil {
return result(repoPolled, gitrepo), fmt.Errorf("error creating git job: %w", err)
if err := r.createJobAndResources(ctx, gitrepo, logger); err != nil {
return result(repoPolled, gitrepo), err
}
r.Recorder.Event(gitrepo, fleetevent.Normal, "Created", "GitJob was created")
}
} else if gitrepo.Status.Commit != "" {
if err = r.deleteJobIfNeeded(ctx, gitrepo, &job); err != nil {
} else if gitrepo.Status.Commit != "" && gitrepo.Status.Commit == oldCommit {
err, recreateGitJob := r.deleteJobIfNeeded(ctx, gitrepo, &job)
if err != nil {
return result(repoPolled, gitrepo), fmt.Errorf("error deleting git job: %w", err)
}
// job was deleted and we need to recreate it
// Requeue so the reconciler creates the job again
if recreateGitJob {
return reconcile.Result{Requeue: true}, nil
}
}

gitrepo.Status.ObservedGeneration = gitrepo.Generation
Expand Down Expand Up @@ -330,6 +330,37 @@ func (r *GitJobReconciler) cleanupGitRepo(ctx context.Context, logger logr.Logge
return nil
}

// shouldCreateJob reports whether a new git job has to be created for the
// given GitRepo. It returns true when any of the following holds:
//   - the status commit is non-empty and differs from oldCommit,
//   - Spec.ForceSyncGeneration differs from Status.UpdateGeneration,
//   - generationChanged reports true for the GitRepo.
func (r *GitJobReconciler) shouldCreateJob(gitrepo *v1alpha1.GitRepo, oldCommit string) bool {
	currentCommit := gitrepo.Status.Commit
	gotNewCommit := currentCommit != "" && currentCommit != oldCommit
	forceSyncRequested := gitrepo.Spec.ForceSyncGeneration != gitrepo.Status.UpdateGeneration

	// k8s Jobs are immutable, so a changed GitRepo generation also requires a
	// new job. The || chain short-circuits, so generationChanged is only
	// consulted when neither of the cheaper checks already matched.
	return gotNewCommit || forceSyncRequested || generationChanged(gitrepo)
}

// updateGenerationValuesIfNeeded brings the generation bookkeeping in the
// GitRepo status up to date: Status.UpdateGeneration is made equal to
// Spec.ForceSyncGeneration, and Status.ObservedGeneration is set to the
// object's Generation when generationChanged reports true.
// Only the in-memory object is modified.
func (r *GitJobReconciler) updateGenerationValuesIfNeeded(gitrepo *v1alpha1.GitRepo) {
	status := &gitrepo.Status

	// Assigning unconditionally is equivalent to assigning only when the two
	// values differ.
	status.UpdateGeneration = gitrepo.Spec.ForceSyncGeneration

	if !generationChanged(gitrepo) {
		return
	}
	status.ObservedGeneration = gitrepo.Generation
}

func (r *GitJobReconciler) addGitRepoFinalizer(ctx context.Context, nsName types.NamespacedName) error {
err := retry.RetryOnConflict(retry.DefaultRetry, func() error {
gitrepo := &v1alpha1.GitRepo{}
Expand Down Expand Up @@ -446,38 +477,60 @@ func (r *GitJobReconciler) createJob(ctx context.Context, gitRepo *v1alpha1.GitR
return r.Create(ctx, job)
}

func (r *GitJobReconciler) deleteJobIfNeeded(ctx context.Context, gitRepo *v1alpha1.GitRepo, job *batchv1.Job) error {
// createJobAndResources creates, in order, the RBAC resources, the targets
// config map, the CA bundle secret and finally the git job for the given
// GitRepo. It stops at the first failing step and returns that error wrapped
// with a step-specific message. When every step succeeds it records a
// "Created" event on the GitRepo and returns nil.
func (r *GitJobReconciler) createJobAndResources(ctx context.Context, gitrepo *v1alpha1.GitRepo, logger logr.Logger) error {
	logger.V(1).Info("Creating Git job resources")

	// Ordered list of creation steps; the job itself goes last.
	steps := []struct {
		errFormat string
		create    func() error
	}{
		{
			errFormat: "failed to create RBAC resources for git job: %w",
			create:    func() error { return r.createJobRBAC(ctx, gitrepo) },
		},
		{
			errFormat: "failed to create targets config map for git job: %w",
			create:    func() error { return r.createTargetsConfigMap(ctx, gitrepo) },
		},
		{
			errFormat: "failed to create cabundle secret for git job: %w",
			create:    func() error { return r.createCABundleSecret(ctx, gitrepo) },
		},
		{
			errFormat: "error creating git job: %w",
			create:    func() error { return r.createJob(ctx, gitrepo) },
		},
	}

	for _, step := range steps {
		if err := step.create(); err != nil {
			return fmt.Errorf(step.errFormat, err)
		}
	}

	r.Recorder.Event(gitrepo, fleetevent.Normal, "Created", "GitJob was created")
	return nil
}

func (r *GitJobReconciler) deleteJobIfNeeded(ctx context.Context, gitRepo *v1alpha1.GitRepo, job *batchv1.Job) (error, bool) {
logger := log.FromContext(ctx)
jobDeleted := false
jobDeletedMessage := ""
// if force delete is set, delete the job to make sure a new job is created

// the following cases imply that the job is still running but we need to stop it and
// create a new one
if gitRepo.Spec.ForceSyncGeneration != gitRepo.Status.UpdateGeneration {
gitRepo.Status.UpdateGeneration = gitRepo.Spec.ForceSyncGeneration
jobDeletedMessage = "job deletion triggered because of ForceUpdateGeneration"
jobDeletedMessage := "job deletion triggered because of ForceUpdateGeneration"
logger.Info(jobDeletedMessage)
if err := r.Delete(ctx, job, client.PropagationPolicy(metav1.DeletePropagationBackground)); err != nil && !errors.IsNotFound(err) {
return err
return err, true
}
jobDeleted = true
return nil, true
}

// k8s Jobs are immutable. Recreate the job if the GitRepo Spec has changed.
// Avoid deleting the job twice
if !jobDeleted && generationChanged(gitRepo) {
jobDeletedMessage = "job deletion triggered because of generation change"
gitRepo.Status.ObservedGeneration = gitRepo.Generation
if generationChanged(gitRepo) {
jobDeletedMessage := "job deletion triggered because of generation change"
logger.Info(jobDeletedMessage)
if err := r.Delete(ctx, job, client.PropagationPolicy(metav1.DeletePropagationBackground)); err != nil && !errors.IsNotFound(err) {
return err
return err, true
}
jobDeleted = true
return nil, true
}

if jobDeleted {
// check if the job finished and was successful
if job.Status.Succeeded == 1 {
jobDeletedMessage := "job deletion triggered because job succeeded"
logger.Info(jobDeletedMessage)
if err := r.Delete(ctx, job, client.PropagationPolicy(metav1.DeletePropagationBackground)); err != nil && !errors.IsNotFound(err) {
return err, false
}
r.Recorder.Event(gitRepo, fleetevent.Normal, "JobDeleted", jobDeletedMessage)
}

return nil
return nil, false
}

func generationChanged(r *v1alpha1.GitRepo) bool {
Expand Down
5 changes: 4 additions & 1 deletion pkg/apis/fleet.cattle.io/v1alpha1/gitrepo_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,8 +168,11 @@ type GitRepoStatus struct {
// Update generation is the force update generation if spec.forceSyncGeneration is set
UpdateGeneration int64 `json:"updateGeneration,omitempty"`
// Commit is the Git commit hash from the last git job run.
// +nullable
// +optional
Commit string `json:"commit,omitempty"`
// WebhookCommit is the latest Git commit hash received from a webhook
// +optional
WebhookCommit string `json:"webhookCommit,omitempty"`
// ReadyClusters is the lowest number of clusters that are ready over
// all the bundles of this GitRepo.
// +optional
Expand Down
4 changes: 2 additions & 2 deletions pkg/webhook/webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,7 @@ func (w *Webhook) ServeHTTP(rw http.ResponseWriter, r *http.Request) {
}
}

if gitrepo.Status.Commit != revision && revision != "" {
if gitrepo.Status.WebhookCommit != revision && revision != "" {
if err := retry.RetryOnConflict(retry.DefaultRetry, func() error {
var gitRepoFromCluster v1alpha1.GitRepo
err := w.client.Get(
Expand All @@ -236,7 +236,7 @@ func (w *Webhook) ServeHTTP(rw http.ResponseWriter, r *http.Request) {
if err != nil {
return err
}
gitRepoFromCluster.Status.Commit = revision
gitRepoFromCluster.Status.WebhookCommit = revision
// if PollingInterval is not set and webhook is configured, set it to 1 hour
if gitrepo.Spec.PollingInterval == nil {
gitRepoFromCluster.Spec.PollingInterval = &metav1.Duration{
Expand Down
Loading

0 comments on commit e75a058

Please sign in to comment.