Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Changes job handling in gitops controller #2903

Merged
merged 3 commits into from
Oct 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion charts/fleet-crd/templates/crds.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6475,7 +6475,6 @@ spec:
commit:
description: Commit is the Git commit hash from the last git job
run.
nullable: true
type: string
conditions:
description: 'Conditions is a list of Wrangler conditions that describe
Expand Down Expand Up @@ -6890,6 +6889,10 @@ spec:
spec.forceSyncGeneration is set
format: int64
type: integer
webhookCommit:
description: WebhookCommit is the latest Git commit hash received
from a webhook
type: string
type: object
type: object
served: true
Expand Down
2 changes: 2 additions & 0 deletions e2e/single-cluster/gitrepo_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,7 @@ var _ = Describe("Monitoring Git repos via HTTP for change", Label("infra-setup"
By("updating the gitrepo's status")
expectedStatus := fleet.GitRepoStatus{
Commit: commit,
WebhookCommit: commit,
ReadyClusters: 1,
DesiredReadyClusters: 1,
GitJobStatus: "Current",
Expand Down Expand Up @@ -425,6 +426,7 @@ func (matcher *gitRepoStatusMatcher) Match(actual interface{}) (success bool, er
}

return got.Commit == want.Commit &&
got.WebhookCommit == want.WebhookCommit &&
got.ReadyClusters == want.ReadyClusters &&
got.DesiredReadyClusters == want.DesiredReadyClusters &&
got.GitJobStatus == want.GitJobStatus &&
Expand Down
86 changes: 76 additions & 10 deletions integrationtests/gitjob/controller/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,17 +173,18 @@ var _ = Describe("GitJob controller", func() {
g.Expect(checkCondition(&gitRepo, "Accepted", corev1.ConditionTrue, "")).To(BeTrue())
}).Should(Succeed())

// it should log 2 events
// it should log 3 events
// first one is to log the new commit from the poller
// second one is to inform that the job was created
// third one is to inform that the job was deleted because it succeeded
Eventually(func(g Gomega) {
events, _ := k8sClientSet.CoreV1().Events(gitRepo.Namespace).List(context.TODO(),
metav1.ListOptions{
FieldSelector: "involvedObject.name=success",
TypeMeta: metav1.TypeMeta{Kind: "GitRepo"},
})
g.Expect(events).ToNot(BeNil())
g.Expect(len(events.Items)).To(Equal(2))
g.Expect(len(events.Items)).To(Equal(3))
weyfonk marked this conversation as resolved.
Show resolved Hide resolved
g.Expect(events.Items[0].Reason).To(Equal("GotNewCommit"))
g.Expect(events.Items[0].Message).To(Equal("9ca3a0ad308ed8bffa6602572e2a1343af9c3d2e"))
g.Expect(events.Items[0].Type).To(Equal("Normal"))
Expand All @@ -192,7 +193,17 @@ var _ = Describe("GitJob controller", func() {
g.Expect(events.Items[1].Message).To(Equal("GitJob was created"))
g.Expect(events.Items[1].Type).To(Equal("Normal"))
g.Expect(events.Items[1].Source.Component).To(Equal("gitjob-controller"))
g.Expect(events.Items[2].Reason).To(Equal("JobDeleted"))
g.Expect(events.Items[2].Message).To(Equal("job deletion triggered because job succeeded"))
g.Expect(events.Items[2].Type).To(Equal("Normal"))
g.Expect(events.Items[2].Source.Component).To(Equal("gitjob-controller"))
}).Should(Succeed())

// job should not be present
Consistently(func() bool {
err := k8sClient.Get(ctx, types.NamespacedName{Name: jobName, Namespace: gitRepoNamespace}, &job)
return errors.IsNotFound(err)
}, 10*time.Second, 1*time.Second).Should(BeTrue())
})
})

Expand Down Expand Up @@ -312,13 +323,6 @@ var _ = Describe("GitJob controller", func() {
g.Expect(checkCondition(&gitRepo, "Ready", corev1.ConditionTrue, "")).To(BeTrue())
g.Expect(checkCondition(&gitRepo, "Accepted", corev1.ConditionTrue, "")).To(BeTrue())
}).Should(Succeed())

By("verifying that the job is deleted if Spec.Generation changed")
Expect(simulateIncreaseGitRepoGeneration(gitRepo)).ToNot(HaveOccurred())
Eventually(func() bool {
jobName = names.SafeConcatName(gitRepoName, names.Hex(repo+commit, 5))
return errors.IsNotFound(k8sClient.Get(ctx, types.NamespacedName{Name: jobName, Namespace: gitRepoNamespace}, &job))
}).Should(BeTrue())
})
})
})
Expand Down Expand Up @@ -373,9 +377,51 @@ var _ = Describe("GitJob controller", func() {
jobName = names.SafeConcatName(gitRepoName, names.Hex(repo+commit, 5))
return k8sClient.Get(ctx, types.NamespacedName{Name: jobName, Namespace: gitRepoNamespace}, &job)
}).Should(Not(HaveOccurred()))
// simulate job was successful
Eventually(func() error {
err := k8sClient.Get(ctx, types.NamespacedName{Name: jobName, Namespace: gitRepoNamespace}, &job)
// We could be checking this when the job is still not created
Expect(client.IgnoreNotFound(err)).ToNot(HaveOccurred())
job.Status.Succeeded = 1
job.Status.Conditions = []batchv1.JobCondition{
{
Type: "Complete",
Status: "True",
},
}
return k8sClient.Status().Update(ctx, &job)
}).Should(Not(HaveOccurred()))
// wait until the job has finished
Eventually(func() bool {
jobName = names.SafeConcatName(gitRepoName, names.Hex(repo+commit, 5))
err := k8sClient.Get(ctx, types.NamespacedName{Name: jobName, Namespace: gitRepoNamespace}, &job)
return errors.IsNotFound(err)
}).Should(BeTrue())

// store the generation value to compare against later
generationValue = gitRepo.Spec.ForceSyncGeneration
Expect(simulateIncreaseForceSyncGeneration(gitRepo)).ToNot(HaveOccurred())
// simulate job was successful
Eventually(func() error {
jobName = names.SafeConcatName(gitRepoName, names.Hex(repo+commit, 5))
err := k8sClient.Get(ctx, types.NamespacedName{Name: jobName, Namespace: gitRepoNamespace}, &job)
// We could be checking this when the job is still not created
Expect(client.IgnoreNotFound(err)).ToNot(HaveOccurred())
job.Status.Succeeded = 1
job.Status.Conditions = []batchv1.JobCondition{
{
Type: "Complete",
Status: "True",
},
}
return k8sClient.Status().Update(ctx, &job)
}).Should(Not(HaveOccurred()))
// wait until the job has finished
Eventually(func() bool {
jobName = names.SafeConcatName(gitRepoName, names.Hex(repo+commit, 5))
err := k8sClient.Get(ctx, types.NamespacedName{Name: jobName, Namespace: gitRepoNamespace}, &job)
return errors.IsNotFound(err)
}).Should(BeTrue())
})
BeforeEach(func() {
expectedCommit = commit
Expand Down Expand Up @@ -414,7 +460,7 @@ var _ = Describe("GitJob controller", func() {
g.Expect(events.Items[1].Type).To(Equal("Normal"))
g.Expect(events.Items[1].Source.Component).To(Equal("gitjob-controller"))
g.Expect(events.Items[2].Reason).To(Equal("JobDeleted"))
g.Expect(events.Items[2].Message).To(Equal("job deletion triggered because of ForceUpdateGeneration"))
g.Expect(events.Items[2].Message).To(Equal("job deletion triggered because job succeeded"))
g.Expect(events.Items[2].Type).To(Equal("Normal"))
g.Expect(events.Items[2].Source.Component).To(Equal("gitjob-controller"))
}).Should(Succeed())
Expand Down Expand Up @@ -446,6 +492,26 @@ var _ = Describe("GitJob controller", func() {
jobName = names.SafeConcatName(gitRepoName, names.Hex(repo+commit, 5))
return k8sClient.Get(ctx, types.NamespacedName{Name: jobName, Namespace: gitRepoNamespace}, &job)
}).Should(Not(HaveOccurred()))
// simulate job was successful
Eventually(func() error {
err := k8sClient.Get(ctx, types.NamespacedName{Name: jobName, Namespace: gitRepoNamespace}, &job)
// We could be checking this when the job is still not created
Expect(client.IgnoreNotFound(err)).ToNot(HaveOccurred())
job.Status.Succeeded = 1
job.Status.Conditions = []batchv1.JobCondition{
{
Type: "Complete",
Status: "True",
},
}
return k8sClient.Status().Update(ctx, &job)
}).Should(Not(HaveOccurred()))
// wait until the job has finished
Eventually(func() bool {
jobName = names.SafeConcatName(gitRepoName, names.Hex(repo+commit, 5))
err := k8sClient.Get(ctx, types.NamespacedName{Name: jobName, Namespace: gitRepoNamespace}, &job)
return errors.IsNotFound(err)
}).Should(BeTrue())

// change a gitrepo field, this will change the Generation field. This simulates changing fleet apply parameters.
Expect(retry.RetryOnConflict(retry.DefaultRetry, func() error {
Expand Down
119 changes: 86 additions & 33 deletions internal/cmd/controller/gitops/reconciler/gitjob_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,6 @@ func (r *GitJobReconciler) SetupWithManager(mgr ctrl.Manager) error {
),
),
).
Owns(&batchv1.Job{}).
Watches(
// Fan out from bundle to gitrepo
&v1alpha1.Bundle{},
Expand Down Expand Up @@ -190,12 +189,17 @@ func (r *GitJobReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctr
} else if repoPolled && oldCommit != gitrepo.Status.Commit {
r.Recorder.Event(gitrepo, fleetevent.Normal, "GotNewCommit", gitrepo.Status.Commit)
}

// check for webhook commit
if gitrepo.Status.WebhookCommit != "" && gitrepo.Status.WebhookCommit != gitrepo.Status.Commit {
gitrepo.Status.Commit = gitrepo.Status.WebhookCommit
}
// From this point onwards we have to take into account if the poller
// task was executed.
// If so, we need to return a Result with EnqueueAfter set.

res, err := r.manageGitJob(ctx, logger, gitrepo, oldCommit, repoPolled)
if err != nil {
if err != nil || res.Requeue {
weyfonk marked this conversation as resolved.
Show resolved Hide resolved
return res, err
}

Expand Down Expand Up @@ -261,30 +265,26 @@ func (r *GitJobReconciler) manageGitJob(ctx context.Context, logger logr.Logger,
}
}

if gitrepo.Status.Commit != "" {
if r.shouldCreateJob(gitrepo, oldCommit) {
r.updateGenerationValuesIfNeeded(gitrepo)
if err := r.validateExternalSecretExist(ctx, gitrepo); err != nil {
r.Recorder.Event(gitrepo, fleetevent.Warning, "FailedValidatingSecret", err.Error())
return result(repoPolled, gitrepo), grutil.UpdateErrorStatus(ctx, r.Client, name, gitrepo.Status, err)
}
logger.V(1).Info("Creating Git job resources")
if err := r.createJobRBAC(ctx, gitrepo); err != nil {
return result(repoPolled, gitrepo), fmt.Errorf("failed to create RBAC resources for git job: %w", err)
}
if err := r.createTargetsConfigMap(ctx, gitrepo); err != nil {
return result(repoPolled, gitrepo), fmt.Errorf("failed to create targets config map for git job: %w", err)
}
if err := r.createCABundleSecret(ctx, gitrepo); err != nil {
return result(repoPolled, gitrepo), fmt.Errorf("failed to create cabundle secret for git job: %w", err)
}
if err := r.createJob(ctx, gitrepo); err != nil {
return result(repoPolled, gitrepo), fmt.Errorf("error creating git job: %w", err)
if err := r.createJobAndResources(ctx, gitrepo, logger); err != nil {
return result(repoPolled, gitrepo), err
}
r.Recorder.Event(gitrepo, fleetevent.Normal, "Created", "GitJob was created")
}
} else if gitrepo.Status.Commit != "" {
if err = r.deleteJobIfNeeded(ctx, gitrepo, &job); err != nil {
} else if gitrepo.Status.Commit != "" && gitrepo.Status.Commit == oldCommit {
err, recreateGitJob := r.deleteJobIfNeeded(ctx, gitrepo, &job)
if err != nil {
return result(repoPolled, gitrepo), fmt.Errorf("error deleting git job: %w", err)
}
// job was deleted and we need to recreate it
// Requeue so the reconciler creates the job again
if recreateGitJob {
return reconcile.Result{Requeue: true}, nil
}
}

gitrepo.Status.ObservedGeneration = gitrepo.Generation
Expand Down Expand Up @@ -330,6 +330,37 @@ func (r *GitJobReconciler) cleanupGitRepo(ctx context.Context, logger logr.Logge
return nil
}

// shouldCreateJob checks if the conditions to create a new job are met.
// It checks for all the conditions so, in case more than one is met, it sets all the
// values related in one single reconciler loop
func (r *GitJobReconciler) shouldCreateJob(gitrepo *v1alpha1.GitRepo, oldCommit string) bool {
if gitrepo.Status.Commit != "" && gitrepo.Status.Commit != oldCommit {
return true
}

if gitrepo.Spec.ForceSyncGeneration != gitrepo.Status.UpdateGeneration {
return true
}

// k8s Jobs are immutable. Recreate the job if the GitRepo Spec has changed.
// Avoid deleting the job twice
if generationChanged(gitrepo) {
return true
}

return false
}

func (r *GitJobReconciler) updateGenerationValuesIfNeeded(gitrepo *v1alpha1.GitRepo) {
if gitrepo.Spec.ForceSyncGeneration != gitrepo.Status.UpdateGeneration {
gitrepo.Status.UpdateGeneration = gitrepo.Spec.ForceSyncGeneration
}

if generationChanged(gitrepo) {
gitrepo.Status.ObservedGeneration = gitrepo.Generation
}
}

func (r *GitJobReconciler) addGitRepoFinalizer(ctx context.Context, nsName types.NamespacedName) error {
err := retry.RetryOnConflict(retry.DefaultRetry, func() error {
gitrepo := &v1alpha1.GitRepo{}
Expand Down Expand Up @@ -442,38 +473,60 @@ func (r *GitJobReconciler) createJob(ctx context.Context, gitRepo *v1alpha1.GitR
return r.Create(ctx, job)
}

func (r *GitJobReconciler) deleteJobIfNeeded(ctx context.Context, gitRepo *v1alpha1.GitRepo, job *batchv1.Job) error {
func (r *GitJobReconciler) createJobAndResources(ctx context.Context, gitrepo *v1alpha1.GitRepo, logger logr.Logger) error {
logger.V(1).Info("Creating Git job resources")
if err := r.createJobRBAC(ctx, gitrepo); err != nil {
return fmt.Errorf("failed to create RBAC resources for git job: %w", err)
}
if err := r.createTargetsConfigMap(ctx, gitrepo); err != nil {
return fmt.Errorf("failed to create targets config map for git job: %w", err)
}
if err := r.createCABundleSecret(ctx, gitrepo); err != nil {
return fmt.Errorf("failed to create cabundle secret for git job: %w", err)
}
if err := r.createJob(ctx, gitrepo); err != nil {
return fmt.Errorf("error creating git job: %w", err)
}
r.Recorder.Event(gitrepo, fleetevent.Normal, "Created", "GitJob was created")
return nil
}

func (r *GitJobReconciler) deleteJobIfNeeded(ctx context.Context, gitRepo *v1alpha1.GitRepo, job *batchv1.Job) (error, bool) {
logger := log.FromContext(ctx)
jobDeleted := false
jobDeletedMessage := ""
// if force delete is set, delete the job to make sure a new job is created

// the following cases imply that the job is still running but we need to stop it and
// create a new one
if gitRepo.Spec.ForceSyncGeneration != gitRepo.Status.UpdateGeneration {
gitRepo.Status.UpdateGeneration = gitRepo.Spec.ForceSyncGeneration
jobDeletedMessage = "job deletion triggered because of ForceUpdateGeneration"
jobDeletedMessage := "job deletion triggered because of ForceUpdateGeneration"
logger.Info(jobDeletedMessage)
if err := r.Delete(ctx, job, client.PropagationPolicy(metav1.DeletePropagationBackground)); err != nil && !errors.IsNotFound(err) {
return err
return err, true
}
jobDeleted = true
return nil, true
}

// k8s Jobs are immutable. Recreate the job if the GitRepo Spec has changed.
// Avoid deleting the job twice
if !jobDeleted && generationChanged(gitRepo) {
jobDeletedMessage = "job deletion triggered because of generation change"
gitRepo.Status.ObservedGeneration = gitRepo.Generation
if generationChanged(gitRepo) {
jobDeletedMessage := "job deletion triggered because of generation change"
logger.Info(jobDeletedMessage)
if err := r.Delete(ctx, job, client.PropagationPolicy(metav1.DeletePropagationBackground)); err != nil && !errors.IsNotFound(err) {
return err
return err, true
}
jobDeleted = true
return nil, true
}

if jobDeleted {
// check if the job finished and was successful
if job.Status.Succeeded == 1 {
jobDeletedMessage := "job deletion triggered because job succeeded"
logger.Info(jobDeletedMessage)
if err := r.Delete(ctx, job, client.PropagationPolicy(metav1.DeletePropagationBackground)); err != nil && !errors.IsNotFound(err) {
return err, false
}
r.Recorder.Event(gitRepo, fleetevent.Normal, "JobDeleted", jobDeletedMessage)
}

return nil
return nil, false
}

func generationChanged(r *v1alpha1.GitRepo) bool {
Expand Down
5 changes: 4 additions & 1 deletion pkg/apis/fleet.cattle.io/v1alpha1/gitrepo_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,8 +168,11 @@ type GitRepoStatus struct {
// Update generation is the force update generation if spec.forceSyncGeneration is set
UpdateGeneration int64 `json:"updateGeneration,omitempty"`
// Commit is the Git commit hash from the last git job run.
// +nullable
// +optional
Commit string `json:"commit,omitempty"`
// WebhookCommit is the latest Git commit hash received from a webhook
// +optional
WebhookCommit string `json:"webhookCommit,omitempty"`
// ReadyClusters is the lowest number of clusters that are ready over
// all the bundles of this GitRepo.
// +optional
Expand Down
4 changes: 2 additions & 2 deletions pkg/webhook/webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ func (w *Webhook) ServeHTTP(rw http.ResponseWriter, r *http.Request) {
}
}

if gitrepo.Status.Commit != revision && revision != "" {
if gitrepo.Status.WebhookCommit != revision && revision != "" {
if err := retry.RetryOnConflict(retry.DefaultRetry, func() error {
var gitRepoFromCluster v1alpha1.GitRepo
err := w.client.Get(
Expand All @@ -235,7 +235,7 @@ func (w *Webhook) ServeHTTP(rw http.ResponseWriter, r *http.Request) {
if err != nil {
return err
}
gitRepoFromCluster.Status.Commit = revision
gitRepoFromCluster.Status.WebhookCommit = revision
// if PollingInterval is not set and webhook is configured, set it to 1 hour
if gitrepo.Spec.PollingInterval == nil {
gitRepoFromCluster.Spec.PollingInterval = &metav1.Duration{
Expand Down
Loading
Loading