Skip to content

Commit

Permalink
Merge branch 'main' into scheduler-integration-tests-3
Browse files Browse the repository at this point in the history
  • Loading branch information
michaelawyu authored Oct 7, 2023
2 parents c5252d4 + 48a0b8f commit c4292ca
Show file tree
Hide file tree
Showing 28 changed files with 2,906 additions and 945 deletions.
4 changes: 4 additions & 0 deletions apis/placement/v1beta1/commons.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,10 @@ const (
// which an object is derived or last updated.
CRPGenerationAnnotation = fleetPrefix + "CRPGeneration"

// PreviousBindingStateAnnotation is the annotation that records the previous state of a binding.
// This is used to remember if an "unscheduled" binding was moved from a "bound" state or a "scheduled" state.
PreviousBindingStateAnnotation = fleetPrefix + "PreviousBindingState"

// EnvelopeConfigMapAnnotation is the annotation that indicates the configmap is an envelope configmap that contains resources
// we need to apply to the member cluster instead of the configMap itself.
EnvelopeConfigMapAnnotation = fleetPrefix + "EnvelopeConfigMap"
Expand Down
2 changes: 1 addition & 1 deletion apis/placement/v1beta1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ require (
github.com/spf13/pflag v1.0.5
github.com/stretchr/testify v1.8.1
go.uber.org/atomic v1.11.0
go.uber.org/zap v1.24.0
golang.org/x/exp v0.0.0-20220722155223-a9213eeb770e
golang.org/x/sync v0.3.0
golang.org/x/time v0.3.0
Expand Down Expand Up @@ -71,7 +72,6 @@ require (
github.com/prometheus/common v0.44.0 // indirect
github.com/prometheus/procfs v0.10.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
go.uber.org/zap v1.24.0 // indirect
golang.org/x/crypto v0.11.0 // indirect
golang.org/x/net v0.12.0 // indirect
golang.org/x/oauth2 v0.8.0 // indirect
Expand Down
85 changes: 74 additions & 11 deletions pkg/controllers/rollout/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,17 @@ func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Resu
allBindings = append(allBindings, binding.DeepCopy())
}

// handle the case that a cluster was unselected by the scheduler and then selected again but the unselected binding is not completely deleted yet
wait, err := waitForResourcesToCleanUp(allBindings, &crp)
if err != nil {
return ctrl.Result{}, err
}
if wait {
// wait for the deletion to finish
klog.V(2).InfoS("Found multiple bindings pointing to the same cluster, wait for the deletion to finish", "clusterResourcePlacement", crpName)
return ctrl.Result{RequeueAfter: 5 * time.Second}, nil
}

// find the latest resource resourceBinding
latestResourceSnapshotName, err := r.fetchLatestResourceSnapshot(ctx, crpName)
if err != nil {
Expand All @@ -101,18 +112,16 @@ func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Resu
}
klog.V(2).InfoS("Found the latest resourceSnapshot for the clusterResourcePlacement", "clusterResourcePlacement", crpName, "latestResourceSnapshotName", latestResourceSnapshotName)

//TODO: handle the case that a cluster was unselected by the scheduler and then selected again but the unselected binding is not deleted yet

// fill out all the default values for CRP just in case the mutation webhook is not enabled.
crpCopy := crp.DeepCopy()
fleetv1beta1.SetDefaultsClusterResourcePlacement(crpCopy)
fleetv1beta1.SetDefaultsClusterResourcePlacement(&crp)
// validate the clusterResourcePlacement just in case the validation webhook is not enabled
if err = validator.ValidateClusterResourcePlacement(crpCopy); err != nil {
if err = validator.ValidateClusterResourcePlacement(&crp); err != nil {
klog.ErrorS(err, "Encountered an invalid clusterResourcePlacement", "clusterResourcePlacement", crpName)
return ctrl.Result{}, controller.NewUnexpectedBehaviorError(err)
}

// pick the bindings to be updated according to the rollout plan
toBeUpdatedBindings, needRoll := pickBindingsToRoll(allBindings, latestResourceSnapshotName, crpCopy)
toBeUpdatedBindings, needRoll := pickBindingsToRoll(allBindings, latestResourceSnapshotName, &crp)
if !needRoll {
klog.V(2).InfoS("No bindings are out of date, stop rolling", "clusterResourcePlacement", crpName)
return ctrl.Result{}, nil
Expand All @@ -124,7 +133,7 @@ func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Resu
// to avoid the case that the rollout process stalling because the time based binding readiness does not trigger any event.
// We wait for 1/5 of the UnavailablePeriodSeconds so we can catch the next ready one early.
// TODO: only wait the time we need to wait for the first applied but not ready binding to be ready
return ctrl.Result{RequeueAfter: time.Duration(*crpCopy.Spec.Strategy.RollingUpdate.UnavailablePeriodSeconds) * time.Second / 5},
return ctrl.Result{RequeueAfter: time.Duration(*crp.Spec.Strategy.RollingUpdate.UnavailablePeriodSeconds) * time.Second / 5},
r.updateBindings(ctx, latestResourceSnapshotName, toBeUpdatedBindings)
}

Expand Down Expand Up @@ -160,6 +169,54 @@ func (r *Reconciler) fetchLatestResourceSnapshot(ctx context.Context, crpName st
return latestResourceSnapshotName, nil
}

// waitForResourcesToCleanUp checks if there are any cluster that has a binding that is both being deleted and another one that needs rollout.
// We currently just wait for those cluster to be cleanup so that we can have a clean slate to start compute the rollout plan.
// TODO (rzhang): group all bindings pointing to the same cluster together when we calculate the rollout plan so that we can avoid this.
func waitForResourcesToCleanUp(allBindings []*fleetv1beta1.ClusterResourceBinding, crp *fleetv1beta1.ClusterResourcePlacement) (bool, error) {
crpObj := klog.KObj(crp)
deletingBinding := make(map[string]bool)
bindingMap := make(map[string]*fleetv1beta1.ClusterResourceBinding)
// separate deleting bindings from the rest of the bindings
for _, binding := range allBindings {
if !binding.DeletionTimestamp.IsZero() {
deletingBinding[binding.Spec.TargetCluster] = true
klog.V(2).InfoS("Found a binding that is being deleted", "clusterResourcePlacement", crpObj, "binding", klog.KObj(binding))
} else {
if _, exist := bindingMap[binding.Spec.TargetCluster]; !exist {
bindingMap[binding.Spec.TargetCluster] = binding
} else {
return false, controller.NewUnexpectedBehaviorError(fmt.Errorf("the same cluster `%s` has bindings `%s` and `%s` pointing to it",
binding.Spec.TargetCluster, bindingMap[binding.Spec.TargetCluster].Name, binding.Name))
}
}
}
// check if there are any cluster that has a binding that is both being deleted and scheduled
for cluster, binding := range bindingMap {
// check if there is a deleting binding on the same cluster
if deletingBinding[cluster] {
klog.V(2).InfoS("Find a binding assigned to a cluster with another deleting binding", "clusterResourcePlacement", crpObj, "binding", binding)
if binding.Spec.State == fleetv1beta1.BindingStateBound {
// the rollout controller won't move a binding from scheduled state to bound if there is a deleting binding on the same cluster.
return false, controller.NewUnexpectedBehaviorError(fmt.Errorf(
"find a cluster `%s` that has a bound binding `%s` and a deleting binding point to it", binding.Spec.TargetCluster, binding.Name))
}
if binding.Spec.State == fleetv1beta1.BindingStateUnscheduled {
// this is a very rare case that the resource was in the middle of being removed from a member cluster after it is unselected.
// then the cluster get selected and unselected in two scheduling before the member agent is able to clean up all the resources.
if binding.GetAnnotations()[fleetv1beta1.PreviousBindingStateAnnotation] == string(fleetv1beta1.BindingStateBound) {
// its previous state can not be bound as rollout won't roll a binding with a deleting binding pointing to the same cluster.
return false, controller.NewUnexpectedBehaviorError(fmt.Errorf(
"find a cluster `%s` that has a unscheduled binding `%+s` with previous state is `bound` and a deleting binding point to it", binding.Spec.TargetCluster, binding.Name))
}
return true, nil
}
// there is a scheduled binding on the same cluster, we need to wait for the deletion to finish
return true, nil
}
}
return false, nil
}

// pickBindingsToRoll go through all bindings associated with a CRP and returns the bindings that are ready to be updated.
// There could be cases that no bindings are ready to be updated because of the maxSurge/maxUnavailable constraints even if there are out of sync bindings.
// Thus, it also returns a bool indicating whether there are out of sync bindings to be rolled to differentiate those two cases.
Expand Down Expand Up @@ -203,11 +260,11 @@ func pickBindingsToRoll(allBindings []*fleetv1beta1.ClusterResourceBinding, late
klog.V(8).InfoS("Found a ready unscheduled binding", "clusterResourcePlacement", klog.KObj(crp), "binding", klog.KObj(binding))
readyBindings = append(readyBindings, binding)
}
if binding.DeletionTimestamp == nil {
// it's not deleted yet, so it is a removal candidate
if binding.DeletionTimestamp.IsZero() {
// it's not been deleted yet, so it is a removal candidate
removeCandidates = append(removeCandidates, binding)
} else if bindingReady {
// it can be deleted at any time, so it can be unavailable at any time
// it is being deleted, it can be removed from the cluster at any time, so it can be unavailable at any time
canBeUnavailableBindings = append(canBeUnavailableBindings, binding)
}

Expand Down Expand Up @@ -242,8 +299,14 @@ func pickBindingsToRoll(allBindings []*fleetv1beta1.ClusterResourceBinding, late
case crp.Spec.Policy.PlacementType == fleetv1beta1.PickFixedPlacementType:
// we use the length of the given cluster names are targets
targetNumber = len(crp.Spec.Policy.ClusterNames)
default:
case crp.Spec.Policy.PlacementType == fleetv1beta1.PickNPlacementType:
// we use the given number as the target
targetNumber = int(*crp.Spec.Policy.NumberOfClusters)
default:
// should never happen
klog.ErrorS(controller.NewUnexpectedBehaviorError(fmt.Errorf("unknown placement type")),
"Encountered an invalid placementType", "clusterResourcePlacement", klog.KObj(crp))
targetNumber = 0
}
klog.V(2).InfoS("Calculated the targetNumber", "clusterResourcePlacement", klog.KObj(crp),
"targetNumber", targetNumber, "readyBindingNumber", len(readyBindings), "canBeUnavailableBindingNumber", len(canBeUnavailableBindings),
Expand Down
112 changes: 110 additions & 2 deletions pkg/controllers/rollout/controller_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,11 @@ import (
)

const (
timeout = time.Second * 5
interval = time.Millisecond * 250
timeout = time.Second * 5
interval = time.Millisecond * 250
consistentTimeout = time.Second * 60
consistentInterval = time.Second * 5
customBindingFinalizer = "custom-binding-finalizer"
)

var testCRPName string
Expand All @@ -46,10 +49,12 @@ var _ = Describe("Test the rollout Controller", func() {
for _, binding := range bindings {
Expect(k8sClient.Delete(ctx, binding)).Should(SatisfyAny(Succeed(), utils.NotFoundMatcher{}))
}
bindings = nil
By("Deleting ClusterResourceSnapshots")
for _, resourceSnapshot := range resourceSnapshots {
Expect(k8sClient.Delete(ctx, resourceSnapshot)).Should(SatisfyAny(Succeed(), utils.NotFoundMatcher{}))
}
resourceSnapshots = nil
By("Deleting ClusterResourcePlacement")
Expect(k8sClient.Delete(ctx, rolloutCRP)).Should(SatisfyAny(Succeed(), utils.NotFoundMatcher{}))
})
Expand Down Expand Up @@ -346,13 +351,108 @@ var _ = Describe("Test the rollout Controller", func() {
}, timeout, interval).Should(BeTrue(), "rollout controller should roll all the bindings to use the latest resource snapshot")
})

It("Should wait for deleting binding delete before we rollout", func() {
// create CRP
var targetCluster int32 = 5
rolloutCRP = clusterResourcePlacementForTest(testCRPName, createPlacementPolicyForTest(fleetv1beta1.PickNPlacementType, targetCluster))
Expect(k8sClient.Create(ctx, rolloutCRP)).Should(Succeed())
// create master resource snapshot that is latest
latestSnapshot := generateResourceSnapshot(rolloutCRP.Name, 1, true)
Expect(k8sClient.Create(ctx, latestSnapshot)).Should(Succeed())
By(fmt.Sprintf("resource snapshot %s created", latestSnapshot.Name))
// generate scheduled bindings for master snapshot on target clusters
clusters := make([]string, targetCluster)
for i := 0; i < int(targetCluster); i++ {
clusters[i] = "cluster-" + utils.RandStr()
binding := generateClusterResourceBinding(fleetv1beta1.BindingStateScheduled, latestSnapshot.Name, clusters[i])
bindings = append(bindings, binding)
}
// create two unscheduled bindings and delete them
firstDeleteBinding := generateClusterResourceBinding(fleetv1beta1.BindingStateUnscheduled, latestSnapshot.Name, clusters[0])
firstDeleteBinding.Name = "delete-" + firstDeleteBinding.Name
firstDeleteBinding.SetFinalizers([]string{customBindingFinalizer})
Expect(k8sClient.Create(ctx, firstDeleteBinding)).Should(Succeed())
Expect(k8sClient.Delete(ctx, firstDeleteBinding)).Should(Succeed())
secondDeleteBinding := generateClusterResourceBinding(fleetv1beta1.BindingStateUnscheduled, latestSnapshot.Name, clusters[2])
secondDeleteBinding.Name = "delete-" + secondDeleteBinding.Name
secondDeleteBinding.SetFinalizers([]string{customBindingFinalizer})
Expect(k8sClient.Create(ctx, secondDeleteBinding)).Should(Succeed())
Expect(k8sClient.Delete(ctx, secondDeleteBinding)).Should(Succeed())
By("Created 2 deleting bindings")
// create the normal binding after the deleting one
for _, binding := range bindings {
Expect(k8sClient.Create(ctx, binding)).Should(Succeed())
By(fmt.Sprintf("resource binding %s created", binding.Name))
}
// check that no bindings are rolled out
Consistently(func() bool {
for _, binding := range bindings {
err := k8sClient.Get(ctx, types.NamespacedName{Name: binding.GetName()}, binding)
if err != nil {
return false
}
if binding.Spec.State == fleetv1beta1.BindingStateBound {
return false
}
}
return true
}, consistentTimeout, consistentInterval).Should(BeTrue(), "rollout controller should not roll the bindings")
By("Verified that the rollout is blocked")
// now we remove the finalizer of the first deleting binding
Expect(k8sClient.Get(ctx, types.NamespacedName{Name: firstDeleteBinding.GetName()}, firstDeleteBinding)).Should(Succeed())
firstDeleteBinding.SetFinalizers([]string{})
Expect(k8sClient.Update(ctx, firstDeleteBinding)).Should(Succeed())
Eventually(func() bool {
return apierrors.IsNotFound(k8sClient.Get(ctx, types.NamespacedName{Name: firstDeleteBinding.GetName()}, firstDeleteBinding))
}, timeout, interval).Should(BeTrue(), "the first deleting binding should now be deleted")
By("Verified that the first deleting binding is deleted")
// check that no bindings are rolled out
Consistently(func() bool {
for _, binding := range bindings {
err := k8sClient.Get(ctx, types.NamespacedName{Name: binding.GetName()}, binding)
if err != nil {
return false
}
if binding.Spec.State == fleetv1beta1.BindingStateBound {
return false
}
}
return true
}, consistentTimeout, consistentInterval).Should(BeTrue(), "rollout controller should not roll the bindings")
By("Verified that the rollout is still blocked")
// now we remove the finalizer of the second deleting binding
Expect(k8sClient.Get(ctx, types.NamespacedName{Name: secondDeleteBinding.GetName()}, secondDeleteBinding)).Should(Succeed())
secondDeleteBinding.SetFinalizers([]string{})
Expect(k8sClient.Update(ctx, secondDeleteBinding)).Should(Succeed())
Eventually(func() bool {
return apierrors.IsNotFound(k8sClient.Get(ctx, types.NamespacedName{Name: secondDeleteBinding.GetName()}, secondDeleteBinding))
}, timeout, interval).Should(BeTrue(), "the second deleting binding should now be deleted")
By("Verified that the second deleting binding is deleted")
// check that the bindings are rolledout
Eventually(func() bool {
for _, binding := range bindings {
err := k8sClient.Get(ctx, types.NamespacedName{Name: binding.GetName()}, binding)
if err != nil {
return false
}
if binding.Spec.State != fleetv1beta1.BindingStateBound {
return false
}
}
return true
}, consistentTimeout, consistentInterval).Should(BeTrue(), "rollout controller should roll all the bindings to Bound state")
By("Verified that the rollout is finally unblocked")
})

// TODO: should update scheduled bindings to the latest snapshot when it is updated to bound state.

// TODO: should count the deleting bindings as can be Unavailable.

})

func markBindingApplied(binding *fleetv1beta1.ClusterResourceBinding) {
// get the binding again to avoid conflict
Expect(k8sClient.Get(ctx, types.NamespacedName{Name: binding.GetName()}, binding)).Should(Succeed())
binding.SetConditions(metav1.Condition{
Status: metav1.ConditionTrue,
Type: string(fleetv1beta1.ResourceBindingApplied),
Expand Down Expand Up @@ -407,3 +507,11 @@ func generateResourceSnapshot(testCRPName string, resourceIndex int, isLatest bo
}
return clusterResourceSnapshot
}

func generateDeletingClusterResourceBinding(targetCluster string) *fleetv1beta1.ClusterResourceBinding {
binding := generateClusterResourceBinding(fleetv1beta1.BindingStateUnscheduled, "anything", targetCluster)
binding.DeletionTimestamp = &metav1.Time{
Time: now,
}
return binding
}
Loading

0 comments on commit c4292ca

Please sign in to comment.