Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add, remove finalizer for CRB in Scheduler and scheduler watcher for CRB #924

Merged
merged 22 commits into from
Nov 5, 2024
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions apis/placement/v1beta1/binding_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,12 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

const (
// SchedulerCRBCleanupFinalizer is a finalizer added to ClusterResourceBindings to ensure we can look up the
// corresponding CRP for deleting ClusterResourceBindings in a scheduling cycle.
//
// The scheduler sets this finalizer on the bindings it creates. It is removed either within a scheduling
// cycle, once a binding is found to carry a deletion timestamp, or when the scheduler cleans up all the
// bindings that belong to a CRP being deleted.
SchedulerCRBCleanupFinalizer = fleetPrefix + "scheduler-crb-cleanup"
)

// +kubebuilder:object:root=true
// +kubebuilder:resource:scope=Cluster,categories={fleet,fleet-placement},shortName=rb
// +kubebuilder:subresource:status
Expand Down
10 changes: 10 additions & 0 deletions cmd/hubagent/workload/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import (
"go.goms.io/fleet/pkg/scheduler/framework"
"go.goms.io/fleet/pkg/scheduler/profile"
"go.goms.io/fleet/pkg/scheduler/queue"
schedulercrbwatcher "go.goms.io/fleet/pkg/scheduler/watchers/clusterresourcebinding"
schedulercrpwatcher "go.goms.io/fleet/pkg/scheduler/watchers/clusterresourceplacement"
schedulercspswatcher "go.goms.io/fleet/pkg/scheduler/watchers/clusterschedulingpolicysnapshot"
"go.goms.io/fleet/pkg/scheduler/watchers/membercluster"
Expand Down Expand Up @@ -278,6 +279,15 @@ func SetupControllers(ctx context.Context, wg *sync.WaitGroup, mgr ctrl.Manager,
return err
}

klog.Info("Setting up the clusterResourceBinding watcher for scheduler")
if err := (&schedulercrbwatcher.Reconciler{
Client: mgr.GetClient(),
SchedulerWorkQueue: defaultSchedulingQueue,
}).SetupWithManager(mgr); err != nil {
klog.ErrorS(err, "Unable to set up clusterResourceBinding watcher for scheduler")
return err
}

klog.Info("Setting up the memberCluster watcher for scheduler")
if err := (&membercluster.Reconciler{
Client: mgr.GetClient(),
Expand Down
45 changes: 42 additions & 3 deletions pkg/scheduler/framework/framework.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"k8s.io/klog/v2"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"

clusterv1beta1 "go.goms.io/fleet/apis/cluster/v1beta1"
placementv1beta1 "go.goms.io/fleet/apis/placement/v1beta1"
Expand Down Expand Up @@ -287,12 +288,18 @@ func (f *framework) RunSchedulingCycleFor(ctx context.Context, crpName string, p
// * dangling bindings, i.e., bindings that are associated with a cluster that is no longer
// in a normally operating state (the cluster has left the fleet, or is in the state of leaving),
// yet has not been marked as unscheduled by the scheduler; and
//
// * deleting bindings, i.e., bindings that have a deletionTimeStamp on them.
// Any deleted binding is also ignored.
// Note that bindings marked as unscheduled are ignored by the scheduler, as they
// are irrelevant to the scheduling cycle. However, we will reconcile them with the latest scheduling
// result so that we won't have an ever-increasing chain of flip-flop bindings.
bound, scheduled, obsolete, unscheduled, dangling := classifyBindings(policy, bindings, clusters)
bound, scheduled, obsolete, unscheduled, dangling, deleting := classifyBindings(policy, bindings, clusters)

// Remove finalizers on all deleting bindings.
if err := f.removeFinalizer(ctx, deleting); err != nil {
klog.ErrorS(err, "Failed to remove finalizers from deleting bindings", "clusterSchedulingPolicySnapshot", policyRef)
return ctrl.Result{}, err
}

// Mark all dangling bindings as unscheduled.
if err := f.markAsUnscheduledFor(ctx, dangling); err != nil {
Expand Down Expand Up @@ -373,7 +380,39 @@ func (f *framework) markAsUnscheduledFor(ctx context.Context, bindings []*placem
// We will just retry for conflict errors since the scheduler holds the truth here.
if apierrors.IsConflict(err) {
// get the binding again to make sure we have the latest version to update again.
return f.client.Get(cctx, client.ObjectKeyFromObject(unscheduledBinding), unscheduledBinding)
Arvindthiru marked this conversation as resolved.
Show resolved Hide resolved
getErr := f.client.Get(cctx, client.ObjectKeyFromObject(unscheduledBinding), unscheduledBinding)
if getErr != nil {
return getErr
}
}
return err
})
})
}
return errs.Wait()
}

// removeFinalizer removes the scheduler cleanup finalizer from each of the given ClusterResourceBindings.
func (f *framework) removeFinalizer(ctx context.Context, bindings []*placementv1beta1.ClusterResourceBinding) error {
// issue all the update requests in parallel
errs, cctx := errgroup.WithContext(ctx)
for _, binding := range bindings {
deletingBinding := binding
errs.Go(func() error {
return retry.OnError(retry.DefaultBackoff,
func(err error) bool {
return apierrors.IsServiceUnavailable(err) || apierrors.IsServerTimeout(err) || apierrors.IsConflict(err)
},
func() error {
controllerutil.RemoveFinalizer(deletingBinding, placementv1beta1.SchedulerCRBCleanupFinalizer)
err := f.client.Update(cctx, deletingBinding, &client.UpdateOptions{})
// We will retry on conflicts.
if apierrors.IsConflict(err) {
// get the binding again to make sure we have the latest version to update again.
getErr := f.client.Get(cctx, client.ObjectKeyFromObject(deletingBinding), deletingBinding)
if getErr != nil {
return getErr
}
}
return err
})
Expand Down
17 changes: 16 additions & 1 deletion pkg/scheduler/framework/framework_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -352,8 +352,9 @@ func TestClassifyBindings(t *testing.T) {
wantObsolete := []*placementv1beta1.ClusterResourceBinding{&obsoleteBinding}
wantUnscheduled := []*placementv1beta1.ClusterResourceBinding{&unscheduledBinding}
wantDangling := []*placementv1beta1.ClusterResourceBinding{&associatedWithLeavingClusterBinding, &assocaitedWithDisappearedClusterBinding}
wantDeleting := []*placementv1beta1.ClusterResourceBinding{&deletingBinding}

bound, scheduled, obsolete, unscheduled, dangling := classifyBindings(policy, bindings, clusters)
bound, scheduled, obsolete, unscheduled, dangling, deleting := classifyBindings(policy, bindings, clusters)
if diff := cmp.Diff(bound, wantBound); diff != "" {
t.Errorf("classifyBindings() bound diff (-got, +want): %s", diff)
}
Expand All @@ -373,6 +374,10 @@ func TestClassifyBindings(t *testing.T) {
if diff := cmp.Diff(dangling, wantDangling); diff != "" {
t.Errorf("classifyBindings() dangling diff (-got, +want) = %s", diff)
}

if diff := cmp.Diff(deleting, wantDeleting); diff != "" {
t.Errorf("classifyBIndings() deleting diff (-got, +want) = %s", diff)
}
}

// TestMarkAsUnscheduledFor tests the markAsUnscheduledFor method.
Expand Down Expand Up @@ -1274,6 +1279,7 @@ func TestCrossReferencePickedClustersAndDeDupBindings(t *testing.T) {
Labels: map[string]string{
placementv1beta1.CRPTrackingLabel: crpName,
},
Finalizers: []string{placementv1beta1.SchedulerCRBCleanupFinalizer},
},
Spec: placementv1beta1.ResourceBindingSpec{
State: placementv1beta1.BindingStateScheduled,
Expand All @@ -1296,6 +1302,7 @@ func TestCrossReferencePickedClustersAndDeDupBindings(t *testing.T) {
Labels: map[string]string{
placementv1beta1.CRPTrackingLabel: crpName,
},
Finalizers: []string{placementv1beta1.SchedulerCRBCleanupFinalizer},
},
Spec: placementv1beta1.ResourceBindingSpec{
State: placementv1beta1.BindingStateScheduled,
Expand All @@ -1318,6 +1325,7 @@ func TestCrossReferencePickedClustersAndDeDupBindings(t *testing.T) {
Labels: map[string]string{
placementv1beta1.CRPTrackingLabel: crpName,
},
Finalizers: []string{placementv1beta1.SchedulerCRBCleanupFinalizer},
},
Spec: placementv1beta1.ResourceBindingSpec{
State: placementv1beta1.BindingStateScheduled,
Expand Down Expand Up @@ -1510,6 +1518,7 @@ func TestCrossReferencePickedClustersAndDeDupBindings(t *testing.T) {
Labels: map[string]string{
placementv1beta1.CRPTrackingLabel: crpName,
},
Finalizers: []string{placementv1beta1.SchedulerCRBCleanupFinalizer},
},
Spec: placementv1beta1.ResourceBindingSpec{
State: placementv1beta1.BindingStateScheduled,
Expand Down Expand Up @@ -1619,6 +1628,7 @@ func TestCrossReferencePickedClustersAndDeDupBindings(t *testing.T) {
Labels: map[string]string{
placementv1beta1.CRPTrackingLabel: crpName,
},
Finalizers: []string{placementv1beta1.SchedulerCRBCleanupFinalizer},
},
Spec: placementv1beta1.ResourceBindingSpec{
State: placementv1beta1.BindingStateScheduled,
Expand All @@ -1641,6 +1651,7 @@ func TestCrossReferencePickedClustersAndDeDupBindings(t *testing.T) {
Labels: map[string]string{
placementv1beta1.CRPTrackingLabel: crpName,
},
Finalizers: []string{placementv1beta1.SchedulerCRBCleanupFinalizer},
},
Spec: placementv1beta1.ResourceBindingSpec{
State: placementv1beta1.BindingStateScheduled,
Expand All @@ -1663,6 +1674,7 @@ func TestCrossReferencePickedClustersAndDeDupBindings(t *testing.T) {
Labels: map[string]string{
placementv1beta1.CRPTrackingLabel: crpName,
},
Finalizers: []string{placementv1beta1.SchedulerCRBCleanupFinalizer},
},
Spec: placementv1beta1.ResourceBindingSpec{
State: placementv1beta1.BindingStateScheduled,
Expand Down Expand Up @@ -1706,6 +1718,7 @@ func TestCrossReferencePickedClustersAndDeDupBindings(t *testing.T) {
Labels: map[string]string{
placementv1beta1.CRPTrackingLabel: crpName,
},
Finalizers: []string{placementv1beta1.SchedulerCRBCleanupFinalizer},
},
Spec: placementv1beta1.ResourceBindingSpec{
State: placementv1beta1.BindingStateScheduled,
Expand All @@ -1728,6 +1741,7 @@ func TestCrossReferencePickedClustersAndDeDupBindings(t *testing.T) {
Labels: map[string]string{
placementv1beta1.CRPTrackingLabel: crpName,
},
Finalizers: []string{placementv1beta1.SchedulerCRBCleanupFinalizer},
},
Spec: placementv1beta1.ResourceBindingSpec{
State: placementv1beta1.BindingStateScheduled,
Expand Down Expand Up @@ -1813,6 +1827,7 @@ func TestCrossReferencePickedClustersAndDeDupBindings(t *testing.T) {
Labels: map[string]string{
placementv1beta1.CRPTrackingLabel: crpName,
},
Finalizers: []string{placementv1beta1.SchedulerCRBCleanupFinalizer},
},
Spec: placementv1beta1.ResourceBindingSpec{
State: placementv1beta1.BindingStateScheduled,
Expand Down
19 changes: 10 additions & 9 deletions pkg/scheduler/framework/frameworkutils.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,16 +29,18 @@ import (
// - dangling bindings, i.e., bindings that are associated with a cluster that is no longer in
// a normally operating state (the cluster has left the fleet, or is in the state of leaving),
// yet has not been marked as unscheduled by the scheduler; and
// - unscheduled bindings, i.e., bindings that are marked to be removed by the scheduler.
// - unscheduled bindings, i.e., bindings that are marked to be removed by the scheduler; and
// - obsolete bindings, i.e., bindings that are no longer associated with the latest scheduling
// policy.
func classifyBindings(policy *placementv1beta1.ClusterSchedulingPolicySnapshot, bindings []placementv1beta1.ClusterResourceBinding, clusters []clusterv1beta1.MemberCluster) (bound, scheduled, obsolete, unscheduled, dangling []*placementv1beta1.ClusterResourceBinding) {
// policy; and
// - deleting bindings, i.e., bindings that have a deletionTimeStamp on them.
func classifyBindings(policy *placementv1beta1.ClusterSchedulingPolicySnapshot, bindings []placementv1beta1.ClusterResourceBinding, clusters []clusterv1beta1.MemberCluster) (bound, scheduled, obsolete, unscheduled, dangling, deleting []*placementv1beta1.ClusterResourceBinding) {
// Pre-allocate arrays.
bound = make([]*placementv1beta1.ClusterResourceBinding, 0, len(bindings))
scheduled = make([]*placementv1beta1.ClusterResourceBinding, 0, len(bindings))
obsolete = make([]*placementv1beta1.ClusterResourceBinding, 0, len(bindings))
unscheduled = make([]*placementv1beta1.ClusterResourceBinding, 0, len(bindings))
dangling = make([]*placementv1beta1.ClusterResourceBinding, 0, len(bindings))
deleting = make([]*placementv1beta1.ClusterResourceBinding, 0, len(bindings))

// Build a map for clusters for quick lookup.
clusterMap := make(map[string]clusterv1beta1.MemberCluster)
Expand All @@ -52,11 +54,8 @@ func classifyBindings(policy *placementv1beta1.ClusterSchedulingPolicySnapshot,

switch {
case !binding.DeletionTimestamp.IsZero():
// Ignore any binding that has been deleted.
//
// Note that the scheduler will not add any cleanup scheduler to a binding, as
// in normal operations bound and scheduled bindings will not be deleted, and
// unscheduled bindings are disregarded by the scheduler.
// We need to remove the scheduler cleanup finalizer from deleting ClusterResourceBindings.
deleting = append(deleting, &binding)
case binding.Spec.State == placementv1beta1.BindingStateUnscheduled:
// we need to remember those bindings so that we will not create another one.
unscheduled = append(unscheduled, &binding)
Expand All @@ -83,7 +82,7 @@ func classifyBindings(policy *placementv1beta1.ClusterSchedulingPolicySnapshot,
}
}

return bound, scheduled, obsolete, unscheduled, dangling
return bound, scheduled, obsolete, unscheduled, dangling, deleting
}

// bindingWithPatch is a helper struct that includes a binding that needs to be patched and the
Expand Down Expand Up @@ -186,6 +185,7 @@ func crossReferencePickedClustersAndDeDupBindings(
Labels: map[string]string{
placementv1beta1.CRPTrackingLabel: crpName,
},
Finalizers: []string{placementv1beta1.SchedulerCRBCleanupFinalizer},
},
Spec: placementv1beta1.ResourceBindingSpec{
State: placementv1beta1.BindingStateScheduled,
Expand Down Expand Up @@ -684,6 +684,7 @@ func crossReferenceValidTargetsWithBindings(
Labels: map[string]string{
placementv1beta1.CRPTrackingLabel: crpName,
},
Finalizers: []string{placementv1beta1.SchedulerCRBCleanupFinalizer},
},
Spec: placementv1beta1.ResourceBindingSpec{
State: placementv1beta1.BindingStateScheduled,
Expand Down
20 changes: 13 additions & 7 deletions pkg/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,25 +282,31 @@ func (s *Scheduler) cleanUpAllBindingsFor(ctx context.Context, crp *fleetv1beta1
return controller.NewAPIServerError(false, err)
}

// Remove the scheduler cleanup finalizer from all the bindings, and delete them.
//
// Remove scheduler reconcile finalizer from deleting bindings.
for idx := range bindingList.Items {
binding := &bindingList.Items[idx]
controllerutil.RemoveFinalizer(binding, fleetv1beta1.SchedulerCRBCleanupFinalizer)
if err := s.client.Update(ctx, binding); err != nil {
klog.ErrorS(err, "Failed to remove scheduler reconcile finalizer from cluster resource binding", "clusterResourceBinding", klog.KObj(binding))
return controller.NewUpdateIgnoreConflictError(err)
}
}
Arvindthiru marked this conversation as resolved.
Show resolved Hide resolved

// Note that once a CRP has been marked for deletion, it will no longer enter the scheduling cycle,
// so any cleanup finalizer has to be removed here.
//
// Also note that for deleted CRPs, derived bindings are deleted right away by the scheduler;
// the scheduler no longer marks them as deleting and waits for another controller to actually
// run the deletion.
for idx := range bindingList.Items {
binding := bindingList.Items[idx]
binding := &bindingList.Items[idx]
Arvindthiru marked this conversation as resolved.
Show resolved Hide resolved
// Delete the binding if it has not been marked for deletion yet.
if binding.DeletionTimestamp == nil {
if err := s.client.Delete(ctx, &binding); err != nil && !errors.IsNotFound(err) {
klog.ErrorS(err, "Failed to delete binding", "clusterResourceBinding", klog.KObj(&binding))
if err := s.client.Delete(ctx, binding); err != nil && !errors.IsNotFound(err) {
klog.ErrorS(err, "Failed to delete binding", "clusterResourceBinding", klog.KObj(binding))
return controller.NewAPIServerError(false, err)
}
}

// Note that the scheduler will not add any cleanup finalizer to a binding.
}

// All bindings have been deleted; remove the scheduler cleanup finalizer from the CRP.
Expand Down
2 changes: 2 additions & 0 deletions pkg/scheduler/scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ func TestCleanUpAllBindingsFor(t *testing.T) {
Labels: map[string]string{
fleetv1beta1.CRPTrackingLabel: crpName,
},
Finalizers: []string{fleetv1beta1.SchedulerCRBCleanupFinalizer},
},
},
{
Expand All @@ -78,6 +79,7 @@ func TestCleanUpAllBindingsFor(t *testing.T) {
Labels: map[string]string{
fleetv1beta1.CRPTrackingLabel: crpName,
},
Finalizers: []string{fleetv1beta1.SchedulerCRBCleanupFinalizer},
},
},
}
Expand Down
Loading
Loading