Skip to content

Commit

Permalink
feat: replace instruction annotation with work suspendDispatching field
Browse files Browse the repository at this point in the history
fix: get false when suspendDispatching is nil

fix: ignore work events when suspendDispatching is true

feat: rename workIndex and mv GetWorkSuspendDispatching to helper

feat: modify the style of Deprecated
Signed-off-by: vie-serendipity <[email protected]>
  • Loading branch information
vie-serendipity committed Feb 7, 2025
1 parent 446dbe9 commit 85f3dca
Show file tree
Hide file tree
Showing 10 changed files with 85 additions and 17 deletions.
3 changes: 3 additions & 0 deletions cmd/agent/app/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -381,6 +381,9 @@ func startServiceExportController(ctx controllerscontext.Context) (bool, error)
ClusterDynamicClientSetFunc: util.NewClusterDynamicClientSetForAgent,
ClusterCacheSyncTimeout: ctx.Opts.ClusterCacheSyncTimeout,
}
if err := mcs.IndexField(ctx.Mgr); err != nil {
return false, err
}
serviceExportController.RunWorkQueue()
if err := serviceExportController.SetupWithManager(ctx.Mgr); err != nil {
return false, err
Expand Down
3 changes: 3 additions & 0 deletions cmd/controller-manager/app/controllermanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -467,6 +467,9 @@ func startServiceExportController(ctx controllerscontext.Context) (enabled bool,
ClusterDynamicClientSetFunc: util.NewClusterDynamicClientSet,
ClusterCacheSyncTimeout: opts.ClusterCacheSyncTimeout,
}
if err = mcs.IndexField(ctx.Mgr); err != nil {
return false, err
}
serviceExportController.RunWorkQueue()
if err := serviceExportController.SetupWithManager(ctx.Mgr); err != nil {
return false, err
Expand Down
47 changes: 47 additions & 0 deletions pkg/controllers/mcs/common.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
Copyright 2025 The Karmada Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package mcs

import (
"context"

utilerrors "k8s.io/apimachinery/pkg/util/errors"
controllerruntime "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"

workv1alpha1 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha1"
"github.com/karmada-io/karmada/pkg/util/helper"
)

const (
workSuspendDispatchdingIndex = "workSpec.suspendDispatching"
)

// IndexField registers Indexer functions to controller manager.
func IndexField(mgr controllerruntime.Manager) error {
workIndexerFunc := func(obj client.Object) []string {
work, ok := obj.(*workv1alpha1.Work)
if !ok {
return nil
}
return helper.GetWorkSuspendDispatching(&work.Spec)
}

return utilerrors.NewAggregate([]error{
mgr.GetFieldIndexer().IndexField(context.TODO(), &workv1alpha1.Work{}, workSuspendDispatchdingIndex, workIndexerFunc),
})
}
20 changes: 11 additions & 9 deletions pkg/controllers/mcs/service_export_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
Expand Down Expand Up @@ -151,7 +152,9 @@ func (c *ServiceExportController) RunWorkQueue() {
func (c *ServiceExportController) enqueueReportedEpsServiceExport() {
workList := &workv1alpha1.WorkList{}
err := wait.PollUntilContextCancel(context.TODO(), 1*time.Second, true, func(ctx context.Context) (done bool, err error) {
err = c.List(ctx, workList, client.MatchingLabels{util.PropagationInstruction: util.PropagationInstructionSuppressed})
err = c.List(ctx, workList, client.MatchingFields{
workSuspendDispatchdingIndex: "true",
})
if err != nil {
klog.Errorf("Failed to list collected EndpointSlices Work from member clusters: %v", err)
return false, nil
Expand Down Expand Up @@ -428,10 +431,10 @@ func (c *ServiceExportController) removeOrphanWork(ctx context.Context, endpoint
if err := c.List(ctx, collectedEpsWorkList, &client.ListOptions{
Namespace: names.GenerateExecutionSpaceName(serviceExportKey.Cluster),
LabelSelector: labels.SelectorFromSet(labels.Set{
util.PropagationInstruction: util.PropagationInstructionSuppressed,
util.ServiceNamespaceLabel: serviceExportKey.Namespace,
util.ServiceNameLabel: serviceExportKey.Name,
util.ServiceNamespaceLabel: serviceExportKey.Namespace,
util.ServiceNameLabel: serviceExportKey.Name,
}),
FieldSelector: fields.OneTermEqualSelector(workSuspendDispatchdingIndex, "true"),
}); err != nil {
klog.Errorf("Failed to list endpointslice work with serviceExport(%s/%s) under namespace %s: %v",
serviceExportKey.Namespace, serviceExportKey.Name, names.GenerateExecutionSpaceName(serviceExportKey.Cluster), err)
Expand Down Expand Up @@ -495,7 +498,8 @@ func reportEndpointSlice(ctx context.Context, c client.Client, endpointSlice *un
return err
}

if err := ctrlutil.CreateOrUpdateWork(ctx, c, workMeta, endpointSlice); err != nil {
// indicate the Work should be not propagated since it's collected resource.
if err := ctrlutil.CreateOrUpdateWork(ctx, c, workMeta, endpointSlice, ctrlutil.WithSuspendDispatching(true)); err != nil {
return err
}

Expand All @@ -518,10 +522,8 @@ func getEndpointSliceWorkMeta(ctx context.Context, c client.Client, ns string, w
Namespace: ns,
Finalizers: []string{util.EndpointSliceControllerFinalizer},
Labels: map[string]string{
util.ServiceNamespaceLabel: endpointSlice.GetNamespace(),
util.ServiceNameLabel: endpointSlice.GetLabels()[discoveryv1.LabelServiceName],
// indicate the Work should be not propagated since it's collected resource.
util.PropagationInstruction: util.PropagationInstructionSuppressed,
util.ServiceNamespaceLabel: endpointSlice.GetNamespace(),
util.ServiceNameLabel: endpointSlice.GetLabels()[discoveryv1.LabelServiceName],
util.EndpointSliceWorkManagedByLabel: util.ServiceExportKind,
},
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -386,7 +386,8 @@ func reportEndpointSlice(ctx context.Context, c client.Client, endpointSlice *un
return err
}

if err := ctrlutil.CreateOrUpdateWork(ctx, c, workMeta, endpointSlice); err != nil {
// indicate the Work should be not propagated since it's collected resource.
if err := ctrlutil.CreateOrUpdateWork(ctx, c, workMeta, endpointSlice, ctrlutil.WithSuspendDispatching(true)); err != nil {
klog.Errorf("Failed to create or update work(%s/%s), Error: %v", workMeta.Namespace, workMeta.Name, err)
return err
}
Expand All @@ -408,9 +409,7 @@ func getEndpointSliceWorkMeta(ctx context.Context, c client.Client, ns string, w
ls := map[string]string{
util.MultiClusterServiceNamespaceLabel: endpointSlice.GetNamespace(),
util.MultiClusterServiceNameLabel: endpointSlice.GetLabels()[discoveryv1.LabelServiceName],
// indicate the Work should be not propagated since it's collected resource.
util.PropagationInstruction: util.PropagationInstructionSuppressed,
util.EndpointSliceWorkManagedByLabel: util.MultiClusterServiceKind,
util.EndpointSliceWorkManagedByLabel: util.MultiClusterServiceKind,
}
if existWork.Labels == nil || (err != nil && apierrors.IsNotFound(err)) {
workMeta := metav1.ObjectMeta{Name: workName, Namespace: ns, Labels: ls}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,6 @@ func TestGetEndpointSliceWorkMeta(t *testing.T) {
Labels: map[string]string{
util.MultiClusterServiceNamespaceLabel: "default",
util.MultiClusterServiceNameLabel: "test-service",
util.PropagationInstruction: util.PropagationInstructionSuppressed,
util.EndpointSliceWorkManagedByLabel: util.MultiClusterServiceKind,
},
},
Expand All @@ -159,7 +158,6 @@ func TestGetEndpointSliceWorkMeta(t *testing.T) {
Labels: map[string]string{
util.MultiClusterServiceNamespaceLabel: "default",
util.MultiClusterServiceNameLabel: "test-service",
util.PropagationInstruction: util.PropagationInstructionSuppressed,
util.EndpointSliceWorkManagedByLabel: "ExistingController.MultiClusterService",
},
Finalizers: []string{util.MCSEndpointSliceDispatchControllerFinalizer},
Expand Down
3 changes: 1 addition & 2 deletions pkg/controllers/multiclusterservice/mcs_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -299,7 +299,6 @@ func (c *MCSController) propagateMultiClusterService(ctx context.Context, mcs *n
Labels: map[string]string{
// We add this id in mutating webhook, let's just use it
networkingv1alpha1.MultiClusterServicePermanentIDLabel: util.GetLabelValue(mcs.Labels, networkingv1alpha1.MultiClusterServicePermanentIDLabel),
util.PropagationInstruction: util.PropagationInstructionSuppressed,
util.MultiClusterServiceNamespaceLabel: mcs.Namespace,
util.MultiClusterServiceNameLabel: mcs.Name,
},
Expand All @@ -310,7 +309,7 @@ func (c *MCSController) propagateMultiClusterService(ctx context.Context, mcs *n
klog.Errorf("Failed to convert MultiClusterService(%s/%s) to unstructured object, err is %v", mcs.Namespace, mcs.Name, err)
return err
}
if err = ctrlutil.CreateOrUpdateWork(ctx, c, workMeta, mcsObj); err != nil {
if err = ctrlutil.CreateOrUpdateWork(ctx, c, workMeta, mcsObj, ctrlutil.WithSuspendDispatching(true)); err != nil {
klog.Errorf("Failed to create or update MultiClusterService(%s/%s) work in the given member cluster %s, err is %v",
mcs.Namespace, mcs.Name, clusterName, err)
return err
Expand Down
1 change: 1 addition & 0 deletions pkg/util/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ const (
//
// Note: This instruction is intended to set on Work objects to indicate the Work should be ignored by
// execution controller. The instruction maybe deprecated once we extend the Work API and no other scenario want this.
// Deprecated
PropagationInstruction = "propagation.karmada.io/instruction"

// FederatedResourceQuotaNamespaceLabel is added to Work to specify associated FederatedResourceQuota's namespace.
Expand Down
10 changes: 10 additions & 0 deletions pkg/util/helper/predicate.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,11 @@ func NewPredicateForServiceExportController(mgr controllerruntime.Manager) predi
return false
}

if IsWorkSuspendDispatching(obj) {
klog.V(5).Infof("Ignored Work(%s/%s) %s event as dispatching is suspended.", obj.Namespace, obj.Name, eventType)
return false
}

clusterName, err := names.GetClusterName(obj.GetNamespace())
if err != nil {
klog.Errorf("Failed to get member cluster name for work %s/%s", obj.GetNamespace(), obj.GetName())
Expand Down Expand Up @@ -176,6 +181,11 @@ func NewPredicateForServiceExportControllerOnAgent(curClusterName string) predic
return false
}

if IsWorkSuspendDispatching(obj) {
klog.V(5).Infof("Ignored Work(%s/%s) %s event as dispatching is suspended.", obj.Namespace, obj.Name, eventType)
return false
}

clusterName, err := names.GetClusterName(obj.GetNamespace())
if err != nil {
klog.Errorf("Failed to get member cluster name for work %s/%s", obj.GetNamespace(), obj.GetName())
Expand Down
6 changes: 6 additions & 0 deletions pkg/util/helper/work.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package helper
import (
"context"
"fmt"
"strconv"

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
Expand Down Expand Up @@ -111,3 +112,8 @@ func IsWorkContains(manifests []workv1alpha1.Manifest, targetResource schema.Gro
func IsWorkSuspendDispatching(work *workv1alpha1.Work) bool {
return ptr.Deref(work.Spec.SuspendDispatching, false)
}

// GetWorkSuspendDispatching will get suspendDispatching field from work spec
func GetWorkSuspendDispatching(spec *workv1alpha1.WorkSpec) []string {
return []string{strconv.FormatBool(ptr.Deref(spec.SuspendDispatching, false))}
}

0 comments on commit 85f3dca

Please sign in to comment.