feat: enable clusterStagedUpdateRun controller (#1004)
jwtty authored Jan 4, 2025
1 parent 81a63df commit 9799886
Showing 13 changed files with 123 additions and 24 deletions.
11 changes: 10 additions & 1 deletion apis/placement/v1alpha1/common.go
@@ -12,9 +12,18 @@ const (
// ResourceOverrideKind is the kind of the ResourceOverride.
ResourceOverrideKind = "ResourceOverride"

// ResourceOverrideSnapshotKind is the kind of the ResourceOverrideSnapshotKind.
// ResourceOverrideSnapshotKind is the kind of the ResourceOverrideSnapshot.
ResourceOverrideSnapshotKind = "ResourceOverrideSnapshot"

// ClusterStagedUpdateRunKind is the kind of the ClusterStagedUpdateRun.
ClusterStagedUpdateRunKind = "ClusterStagedUpdateRun"

// ClusterStagedUpdateStrategyKind is the kind of the ClusterStagedUpdateStrategy.
ClusterStagedUpdateStrategyKind = "ClusterStagedUpdateStrategy"

// ClusterApprovalRequestKind is the kind of the ClusterApprovalRequest.
ClusterApprovalRequestKind = "ClusterApprovalRequest"

// ClusterStagedUpdateRunFinalizer is used by the ClusterStagedUpdateRun controller to make sure that the ClusterStagedUpdateRun
// object is not deleted until all its dependent resources are deleted.
ClusterStagedUpdateRunFinalizer = fleetPrefix + "stagedupdaterun-finalizer"
1 change: 1 addition & 0 deletions charts/hub-agent/templates/deployment.yaml
@@ -31,6 +31,7 @@ spec:
- --enable-v1alpha1-apis={{ .Values.enableV1Alpha1APIs }}
- --enable-v1beta1-apis={{ .Values.enableV1Beta1APIs }}
- --enable-cluster-inventory-apis={{ .Values.enableClusterInventoryAPI }}
- --enable-staged-update-run-apis={{ .Values.enableStagedUpdateRunAPIs }}
- --max-concurrent-cluster-placement={{ .Values.MaxConcurrentClusterPlacement }}
- --concurrent-resource-change-syncs={{ .Values.ConcurrentResourceChangeSyncs }}
- --log_file_max_size={{ .Values.logFileMaxSize }}
1 change: 1 addition & 0 deletions charts/hub-agent/values.yaml
@@ -36,6 +36,7 @@ affinity: {}
enableV1Alpha1APIs: false
enableV1Beta1APIs: true
enableClusterInventoryAPI: true
enableStagedUpdateRunAPIs: true

hubAPIQPS: 250
hubAPIBurst: 1000
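The chart turns the new toggle on by default, while the binary's flag default (added in options.go below) stays false, so the Helm value is what actually enables the controller in a chart-based install. A hypothetical override file (the file and release names below are placeholders, not part of this commit) could opt back out:

# my-values.yaml (sketch)
enableStagedUpdateRunAPIs: false   # disable the clusterStagedUpdateRun controller

Passing it with, for example, helm upgrade hub-agent ./charts/hub-agent -f my-values.yaml would render the container argument --enable-staged-update-run-apis=false in the deployment template above.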
4 changes: 4 additions & 0 deletions cmd/hubagent/options/options.go
@@ -83,6 +83,8 @@ type Options struct {
EnableClusterInventoryAPIs bool
// ForceDeleteWaitTime is the duration the hub agent waits before force deleting a member cluster.
ForceDeleteWaitTime metav1.Duration
// EnableStagedUpdateRunAPIs enables the agents to watch the clusterStagedUpdateRun CRs.
EnableStagedUpdateRunAPIs bool
}

// NewOptions builds an empty options.
@@ -99,6 +101,7 @@ func NewOptions() *Options {
MaxFleetSizeSupported: 100,
EnableV1Alpha1APIs: false,
EnableClusterInventoryAPIs: false,
EnableStagedUpdateRunAPIs: false,
}
}

@@ -140,6 +143,7 @@ func (o *Options) AddFlags(flags *flag.FlagSet) {
flags.BoolVar(&o.EnableV1Beta1APIs, "enable-v1beta1-apis", true, "If set, the agents will watch for the v1beta1 APIs.")
flags.BoolVar(&o.EnableClusterInventoryAPIs, "enable-cluster-inventory-apis", false, "If set, the agents will watch for the ClusterInventory APIs.")
flags.DurationVar(&o.ForceDeleteWaitTime.Duration, "force-delete-wait-time", 15*time.Minute, "The duration the hub agent waits before force deleting a member cluster.")
flags.BoolVar(&o.EnableStagedUpdateRunAPIs, "enable-staged-update-run-apis", false, "If set, the agents will watch for the ClusterStagedUpdateRun APIs.")

o.RateLimiterOpts.AddFlags(flags)
}
32 changes: 29 additions & 3 deletions cmd/hubagent/workload/setup.go
@@ -35,6 +35,7 @@ import (
"go.goms.io/fleet/pkg/controllers/overrider"
"go.goms.io/fleet/pkg/controllers/resourcechange"
"go.goms.io/fleet/pkg/controllers/rollout"
"go.goms.io/fleet/pkg/controllers/updaterun"
"go.goms.io/fleet/pkg/controllers/workgenerator"
"go.goms.io/fleet/pkg/resourcewatcher"
"go.goms.io/fleet/pkg/scheduler"
@@ -85,13 +86,20 @@ var (
placementv1alpha1.GroupVersion.WithKind(placementv1alpha1.ResourceOverrideSnapshotKind),
}

clusterStagedUpdateRunGVKs = []schema.GroupVersionKind{
placementv1alpha1.GroupVersion.WithKind(placementv1alpha1.ClusterStagedUpdateRunKind),
placementv1alpha1.GroupVersion.WithKind(placementv1alpha1.ClusterStagedUpdateStrategyKind),
placementv1alpha1.GroupVersion.WithKind(placementv1alpha1.ClusterApprovalRequestKind),
}

clusterInventoryGVKs = []schema.GroupVersionKind{
clusterinventory.GroupVersion.WithKind("ClusterProfile"),
}
)

// SetupControllers set up the customized controllers we developed
func SetupControllers(ctx context.Context, wg *sync.WaitGroup, mgr ctrl.Manager, config *rest.Config, opts *options.Options) error {
func SetupControllers(ctx context.Context, wg *sync.WaitGroup, mgr ctrl.Manager, config *rest.Config, opts *options.Options) error { //nolint:gocyclo
// TODO: Try to reduce the complexity of this last measured at 33 (failing at > 30) and remove the // nolint:gocyclo
dynamicClient, err := dynamic.NewForConfig(config)
if err != nil {
klog.ErrorS(err, "unable to create the dynamic client")
@@ -195,7 +203,7 @@ func SetupControllers(ctx context.Context, wg *sync.WaitGroup, mgr ctrl.Manager,
return err
}

// Set up a new controller to do rollout resources according to CRP rollout strategy
// Set up a new controller to do rollout resources according to CRP rollout strategy
klog.Info("Setting up rollout controller")
if err := (&rollout.Reconciler{
Client: mgr.GetClient(),
@@ -215,6 +223,24 @@ func SetupControllers(ctx context.Context, wg *sync.WaitGroup, mgr ctrl.Manager,
return err
}

// Set up a controller to do staged update run, rolling out resources to clusters in a stage by stage manner.
if opts.EnableStagedUpdateRunAPIs {
for _, gvk := range clusterStagedUpdateRunGVKs {
if err = utils.CheckCRDInstalled(discoverClient, gvk); err != nil {
klog.ErrorS(err, "unable to find the required CRD", "GVK", gvk)
return err
}
}
klog.Info("Setting up clusterStagedUpdateRun controller")
if err = (&updaterun.Reconciler{
Client: mgr.GetClient(),
InformerManager: dynamicInformerManager,
}).SetupWithManager(mgr); err != nil {
klog.ErrorS(err, "unable to set up clusterStagedUpdateRun controller")
return err
}
}

// Set up the work generator
klog.Info("Setting up work generator")
if err := (&workgenerator.Reconciler{
@@ -327,7 +353,7 @@ func SetupControllers(ctx context.Context, wg *sync.WaitGroup, mgr ctrl.Manager,
}
}

// Set up a new controller to reconcile any resources in the cluster
// Set up a new controller to reconcile any resources in the cluster
klog.Info("Setting up resource change controller")
rcr := &resourcechange.Reconciler{
DynamicClient: dynamicClient,
4 changes: 2 additions & 2 deletions examples/stagedupdaterun/clusterStagedUpdateRun.yaml
@@ -4,10 +4,10 @@ metadata:
name: example-run
spec:
placementName: example-placement
resourceSnapshotIndex: "1"
resourceSnapshotIndex: example-placement-0-snapshot
stagedRolloutStrategyName: example-strategy
status:
policySnapshotIndexUsed: "1"
policySnapshotIndexUsed: example-placement-0
policyObservedClusterCount: 3
appliedStrategy:
type: Immediate
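The run above points at a ClusterStagedUpdateStrategy named example-strategy that is not part of this diff. Purely as an illustrative sketch (the kind comes from this commit; the API group, field names, stage names, label selectors, and wait durations are assumptions), such a strategy might look like:

apiVersion: placement.kubernetes-fleet.io/v1alpha1
kind: ClusterStagedUpdateStrategy
metadata:
  name: example-strategy
spec:
  stages:
    - name: staging
      labelSelector:
        matchLabels:
          group: staging
      afterStageTasks:
        - type: TimedWait
          waitTime: 1h
    - name: prod
      labelSelector:
        matchLabels:
          group: prod
      afterStageTasks:
        - type: Approval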
2 changes: 1 addition & 1 deletion pkg/controllers/updaterun/controller.go
@@ -133,7 +133,7 @@ func (r *Reconciler) Reconcile(ctx context.Context, req runtime.Request) (runtim
// Execute the updateRun.
klog.V(2).InfoS("Continue to execute the clusterStagedUpdateRun", "updatingStageIndex", updatingStageIndex, "clusterStagedUpdateRun", runObjRef)
finished, waitTime, execErr := r.execute(ctx, &updateRun, updatingStageIndex, toBeUpdatedBindings, toBeDeletedBindings)
if execErr != nil && errors.Is(execErr, errStagedUpdatedAborted) {
if errors.Is(execErr, errStagedUpdatedAborted) {
// errStagedUpdatedAborted cannot be retried.
return runtime.Result{}, r.recordUpdateRunFailed(ctx, &updateRun, execErr.Error())
}
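The only change here drops a redundant nil guard: errors.Is already reports false when the error is nil. A minimal, self-contained Go sketch of that behavior, using a stand-in sentinel rather than the package's unexported errStagedUpdatedAborted:

package main

import (
	"errors"
	"fmt"
)

// errAborted stands in for the errStagedUpdatedAborted sentinel used by the controller.
var errAborted = errors.New("staged update aborted")

func main() {
	var execErr error // nil: errors.Is returns false, so a separate execErr != nil check adds nothing
	fmt.Println(errors.Is(execErr, errAborted)) // false

	execErr = fmt.Errorf("executing stage: %w", errAborted) // wrapped sentinel
	fmt.Println(errors.Is(execErr, errAborted))             // true
}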
4 changes: 2 additions & 2 deletions pkg/controllers/updaterun/controller_integration_test.go
@@ -268,7 +268,7 @@ func generateTestClusterSchedulingPolicySnapshot(idx int) *placementv1beta1.Clus
}
}

func generateTestClusterResourceBinding(policySnapshotName, targetCluster string) *placementv1beta1.ClusterResourceBinding {
func generateTestClusterResourceBinding(policySnapshotName, targetCluster string, state placementv1beta1.BindingState) *placementv1beta1.ClusterResourceBinding {
binding := &placementv1beta1.ClusterResourceBinding{
ObjectMeta: metav1.ObjectMeta{
Name: "binding-" + testResourceSnapshotName + "-" + targetCluster,
@@ -277,7 +277,7 @@ func generateTestClusterResourceBinding(policySnapshotName, targetCluster string
},
},
Spec: placementv1beta1.ResourceBindingSpec{
State: placementv1beta1.BindingStateScheduled,
State: state,
TargetCluster: targetCluster,
SchedulingPolicySnapshotName: policySnapshotName,
},
41 changes: 37 additions & 4 deletions pkg/controllers/updaterun/execution.go
@@ -115,6 +115,9 @@ func (r *Reconciler) executeUpdatingStage(
return 0, controller.NewUpdateIgnoreConflictError(err)
}
klog.V(2).InfoS("Updated the status of a binding to bound", "binding", klog.KObj(binding), "cluster", clusterStatus.ClusterName, "stage", updatingStageStatus.StageName, "clusterStagedUpdateRun", updateRunRef)
if err := r.updateBindingRolloutStarted(ctx, binding, updateRun); err != nil {
return 0, err
}
} else {
klog.V(2).InfoS("Found the first binding that is updating but the cluster status has not been updated", "cluster", clusterStatus.ClusterName, "stage", updatingStageStatus.StageName, "clusterStagedUpdateRun", updateRunRef)
if binding.Spec.State != placementv1beta1.BindingStateBound {
@@ -124,6 +127,14 @@ func (r *Reconciler) executeUpdatingStage(
return 0, controller.NewUpdateIgnoreConflictError(err)
}
klog.V(2).InfoS("Updated the status of a binding to bound", "binding", klog.KObj(binding), "cluster", clusterStatus.ClusterName, "stage", updatingStageStatus.StageName, "clusterStagedUpdateRun", updateRunRef)
if err := r.updateBindingRolloutStarted(ctx, binding, updateRun); err != nil {
return 0, err
}
} else if !condition.IsConditionStatusTrue(meta.FindStatusCondition(binding.Status.Conditions, string(placementv1beta1.ResourceBindingRolloutStarted)), binding.Generation) {
klog.V(2).InfoS("The binding is bound and up-to-date but the generation is updated by the scheduler, update rolloutStarted status again", "binding", klog.KObj(binding), "cluster", clusterStatus.ClusterName, "stage", updatingStageStatus.StageName, "clusterStagedUpdateRun", updateRunRef)
if err := r.updateBindingRolloutStarted(ctx, binding, updateRun); err != nil {
return 0, err
}
} else {
if _, updateErr := checkClusterUpdateResult(binding, clusterStatus, updatingStageStatus, updateRun); updateErr != nil {
return clusterUpdatingWaitTime, updateErr
@@ -139,8 +150,10 @@ func (r *Reconciler) executeUpdatingStage(
}

// Now the cluster has to be updating, the binding should point to the right resource snapshot and the binding should be bound.
if !isBindingSyncedWithClusterStatus(updateRun, binding, clusterStatus) || binding.Spec.State != placementv1beta1.BindingStateBound {
unexpectedErr := controller.NewUnexpectedBehaviorError(fmt.Errorf("the updating cluster `%s` in the stage %s does not match the cluster status: %+v, binding: %+v", clusterStatus.ClusterName, updatingStageStatus.StageName, clusterStatus, binding.Spec))
if !isBindingSyncedWithClusterStatus(updateRun, binding, clusterStatus) || binding.Spec.State != placementv1beta1.BindingStateBound ||
!condition.IsConditionStatusTrue(meta.FindStatusCondition(binding.Status.Conditions, string(placementv1beta1.ResourceBindingRolloutStarted)), binding.Generation) {
unexpectedErr := controller.NewUnexpectedBehaviorError(fmt.Errorf("the updating cluster `%s` in the stage %s does not match the cluster status: %+v, binding: %+v, condition: %+v",
clusterStatus.ClusterName, updatingStageStatus.StageName, clusterStatus, binding.Spec, binding.GetCondition(string(placementv1beta1.ResourceBindingRolloutStarted))))
klog.ErrorS(unexpectedErr, "The binding has been changed during updating, please check if there's concurrent clusterStagedUpdateRun", "clusterStagedUpdateRun", updateRunRef)
markClusterUpdatingFailed(clusterStatus, updateRun.Generation, unexpectedErr.Error())
return 0, fmt.Errorf("%w: %s", errStagedUpdatedAborted, unexpectedErr.Error())
@@ -314,6 +327,26 @@ func (r *Reconciler) checkAfterStageTasksStatus(ctx context.Context, updatingSta
return true, nil
}

// updateBindingRolloutStarted updates the binding status to indicate the rollout has started.
func (r *Reconciler) updateBindingRolloutStarted(ctx context.Context, binding *placementv1beta1.ClusterResourceBinding, updateRun *placementv1alpha1.ClusterStagedUpdateRun) error {
// first reset the condition to reflect the latest lastTransitionTime
binding.RemoveCondition(string(placementv1beta1.ResourceBindingRolloutStarted))
cond := metav1.Condition{
Type: string(placementv1beta1.ResourceBindingRolloutStarted),
Status: metav1.ConditionTrue,
ObservedGeneration: binding.Generation,
Reason: condition.RolloutStartedReason,
Message: fmt.Sprintf("Detected the new changes on the resources and started the rollout process, resourceSnapshotName: %s, clusterStagedUpdateRun: %s", updateRun.Spec.ResourceSnapshotIndex, updateRun.Name),
}
binding.SetConditions(cond)
if err := r.Client.Status().Update(ctx, binding); err != nil {
klog.ErrorS(err, "Failed to update binding status", "clusterResourceBinding", klog.KObj(binding), "condition", cond)
return controller.NewUpdateIgnoreConflictError(err)
}
klog.V(2).InfoS("Updated binding as rolloutStarted", "clusterResourceBinding", klog.KObj(binding), "condition", cond)
return nil
}

// isBindingSyncedWithClusterStatus checks if the binding is up-to-date with the cluster status.
func isBindingSyncedWithClusterStatus(updateRun *placementv1alpha1.ClusterStagedUpdateRun, binding *placementv1beta1.ClusterResourceBinding, cluster *placementv1alpha1.ClusterUpdatingStatus) bool {
if binding.Spec.ResourceSnapshotName != updateRun.Spec.ResourceSnapshotIndex {
@@ -335,8 +368,8 @@ func isBindingSyncedWithClusterStatus(updateRun *placementv1alpha1.ClusterStaged
return true
}

// checkClusterUpdateResult checks if the cluster has been updated successfully.
// It returns if the cluster has been updated successfully and the error if the cluster update failed.
// checkClusterUpdateResult checks if the resources have been updated successfully on a given cluster.
// It returns true if the resources have been updated successfully or any error if the update failed.
func checkClusterUpdateResult(
binding *placementv1beta1.ClusterResourceBinding,
clusterStatus *placementv1alpha1.ClusterUpdatingStatus,
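The rolloutStarted handling above is generation-aware: a condition that was True at an older binding generation no longer counts once the scheduler bumps the generation, which is why the condition is removed and re-stamped with the current ObservedGeneration. A self-contained sketch of that check pattern (it mirrors the idea, it is not Fleet's own condition helper):

package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/api/meta"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// isTrueAtGeneration reports whether condType is True and was observed at the
// object's current generation; otherwise the status is stale and must be refreshed.
func isTrueAtGeneration(conds []metav1.Condition, condType string, generation int64) bool {
	c := meta.FindStatusCondition(conds, condType)
	return c != nil && c.Status == metav1.ConditionTrue && c.ObservedGeneration == generation
}

func main() {
	conds := []metav1.Condition{{
		Type:               "RolloutStarted",
		Status:             metav1.ConditionTrue,
		ObservedGeneration: 1,
	}}
	fmt.Println(isTrueAtGeneration(conds, "RolloutStarted", 1)) // true
	fmt.Println(isTrueAtGeneration(conds, "RolloutStarted", 2)) // false: the generation moved on, the condition is stale
}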
11 changes: 8 additions & 3 deletions pkg/controllers/updaterun/execution_integration_test.go
@@ -64,14 +64,14 @@ var _ = Describe("UpdateRun execution tests", func() {
}
// reverse the order of the clusters by index
targetClusters[i] = generateTestMemberCluster(numTargetClusters-1-i, "cluster-"+strconv.Itoa(i), map[string]string{"group": "prod", "region": region})
resourceBindings[i] = generateTestClusterResourceBinding(policySnapshot.Name, targetClusters[i].Name)
resourceBindings[i] = generateTestClusterResourceBinding(policySnapshot.Name, targetClusters[i].Name, placementv1beta1.BindingStateScheduled)
}

unscheduledCluster = make([]*clusterv1beta1.MemberCluster, numUnscheduledClusters)
for i := range unscheduledCluster {
unscheduledCluster[i] = generateTestMemberCluster(i, "unscheduled-cluster-"+strconv.Itoa(i), map[string]string{"group": "staging"})
// update the policySnapshot name so that these clusters are considered to-be-deleted
resourceBindings[numTargetClusters+i] = generateTestClusterResourceBinding(policySnapshot.Name+"a", unscheduledCluster[i].Name)
resourceBindings[numTargetClusters+i] = generateTestClusterResourceBinding(policySnapshot.Name+"a", unscheduledCluster[i].Name, placementv1beta1.BindingStateUnscheduled)
}

var err error
@@ -215,7 +215,7 @@ var _ = Describe("UpdateRun execution tests", func() {
validateClusterStagedUpdateRunStatus(ctx, updateRun, wantStatus, "")
})

It("Should mark the 3rd cluster in the 1st stage as succeeded", func() {
It("Should mark the 3rd cluster in the 1st stage as succeeded after marking the binding available", func() {
By("Validating the 3rd clusterResourceBinding is updated to Bound")
binding := resourceBindings[numTargetClusters-5] // cluster-5
validateBindingState(ctx, binding, resourceSnapshot.Name, updateRun, 0)
@@ -494,6 +494,11 @@ func validateBindingState(ctx context.Context, binding *placementv1beta1.Cluster
if diff := cmp.Diff(binding.Spec.ApplyStrategy, updateRun.Status.ApplyStrategy); diff != "" {
return fmt.Errorf("binding %s has different applyStrategy (-want +got):\n%s", binding.Name, diff)
}

rolloutStartedCond := binding.GetCondition(string(placementv1beta1.ResourceBindingRolloutStarted))
if !condition.IsConditionStatusTrue(rolloutStartedCond, binding.Generation) {
return fmt.Errorf("binding %s does not have RolloutStarted condition", binding.Name)
}
return nil
}, timeout, interval).Should(Succeed(), "failed to validate the binding state")
}
6 changes: 6 additions & 0 deletions pkg/controllers/updaterun/initialization.go
@@ -183,6 +183,12 @@ func (r *Reconciler) collectScheduledClusters(
klog.V(2).InfoS("Found a scheduled binding", "binding", binding.Name, "clusterResourcePlacement", placementName, "latestPolicySnapshot", latestPolicySnapshot.Name, "clusterStagedUpdateRun", updateRunRef)
selectedBindings = append(selectedBindings, &bindingList.Items[i])
} else {
if binding.Spec.State != placementv1beta1.BindingStateUnscheduled {
stateErr := controller.NewUnexpectedBehaviorError(fmt.Errorf("binding `%s` with old policy snapshot %s has state %s, not unscheduled", binding.Name, binding.Spec.SchedulingPolicySnapshotName, binding.Spec.State))
klog.ErrorS(stateErr, "Failed to collect clusterResourceBindings", "clusterResourcePlacement", placementName, "latestPolicySnapshot", latestPolicySnapshot.Name, "clusterStagedUpdateRun", updateRunRef)
// no more retries here.
return nil, nil, fmt.Errorf("%w: %s", errInitializedFailed, stateErr.Error())
}
klog.V(2).InfoS("Found a to-be-deleted binding", "binding", binding.Name, "clusterResourcePlacement", placementName, "latestPolicySnapshot", latestPolicySnapshot.Name, "clusterStagedUpdateRun", updateRunRef)
toBeDeletedBindings = append(toBeDeletedBindings, &bindingList.Items[i])
}