diff --git a/app/controllers.go b/app/controllers.go index 5bf8c60..8729242 100644 --- a/app/controllers.go +++ b/app/controllers.go @@ -5,6 +5,7 @@ import ( controllerscontext "github.com/karmada-io/multicluster-cloud-provider/pkg/controllers/context" "github.com/karmada-io/multicluster-cloud-provider/pkg/controllers/crdinstallation" + "github.com/karmada-io/multicluster-cloud-provider/pkg/controllers/mciservicelocations" "github.com/karmada-io/multicluster-cloud-provider/pkg/controllers/multiclusteringress" "github.com/karmada-io/multicluster-cloud-provider/pkg/controllers/multiclusterservice" "github.com/karmada-io/multicluster-cloud-provider/pkg/controllers/serviceexportpropagation" @@ -79,3 +80,14 @@ func startServiceExportPropagationController(ctx controllerscontext.Context) (en } return true, nil } + +func startMCIServiceLocationsController(ctx controllerscontext.Context) (enabled bool, err error) { + c := &mciservicelocations.Controller{ + Client: ctx.Mgr.GetClient(), + RateLimiterOptions: ctx.Opts.RateLimiterOptions, + } + if err = c.SetupWithManager(ctx.Mgr); err != nil { + return false, err + } + return true, nil +} diff --git a/app/manager.go b/app/manager.go index 5fba9b1..8248e84 100644 --- a/app/manager.go +++ b/app/manager.go @@ -25,6 +25,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/cache" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/healthz" + metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server" multiclusterprovider "github.com/karmada-io/multicluster-cloud-provider" "github.com/karmada-io/multicluster-cloud-provider/options" @@ -39,6 +40,7 @@ func init() { controllers["multiclusterservice"] = startMCSController controllers["crd-installation"] = startCRDInstallationController controllers["serviceexport-propagation"] = startServiceExportPropagationController + controllers["mci-service-locations"] = startMCIServiceLocationsController } // InitProviderFunc is used to initialize multicluster provider @@ -105,7 +107,7 @@ func Run(ctx context.Context, opts *options.MultiClusterControllerManagerOptions controllerManager, err := controllerruntime.NewManager(config, controllerruntime.Options{ Logger: klog.Background(), Scheme: gclient.NewSchema(), - SyncPeriod: &opts.ResyncPeriod.Duration, + Cache: cache.Options{SyncPeriod: &opts.ResyncPeriod.Duration}, LeaderElection: opts.LeaderElection.LeaderElect, LeaderElectionID: opts.LeaderElection.ResourceName, LeaderElectionNamespace: opts.LeaderElection.ResourceNamespace, @@ -115,7 +117,7 @@ func Run(ctx context.Context, opts *options.MultiClusterControllerManagerOptions LeaderElectionResourceLock: opts.LeaderElection.ResourceLock, HealthProbeBindAddress: net.JoinHostPort(opts.BindAddress, strconv.Itoa(opts.SecurePort)), LivenessEndpointName: "/healthz", - MetricsBindAddress: opts.MetricsBindAddress, + Metrics: metricsserver.Options{BindAddress: opts.MetricsBindAddress}, MapperProvider: restmapper.MapperProvider, BaseContext: func() context.Context { return ctx diff --git a/pkg/controllers/mciservicelocations/mci_service_locations.go b/pkg/controllers/mciservicelocations/mci_service_locations.go new file mode 100644 index 0000000..8519414 --- /dev/null +++ b/pkg/controllers/mciservicelocations/mci_service_locations.go @@ -0,0 +1,217 @@ +package mciservicelocations + +import ( + "context" + "reflect" + "sort" + + networkingv1alpha1 "github.com/karmada-io/karmada/pkg/apis/networking/v1alpha1" + workv1alpha2 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha2" + "github.com/karmada-io/karmada/pkg/sharedcli/ratelimiterflag" + "github.com/karmada-io/karmada/pkg/util/names" + corev1 "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/klog/v2" + "k8s.io/kubernetes/staging/src/k8s.io/client-go/util/retry" + controllerruntime "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/builder" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller" + "sigs.k8s.io/controller-runtime/pkg/event" + "sigs.k8s.io/controller-runtime/pkg/handler" + "sigs.k8s.io/controller-runtime/pkg/predicate" + "sigs.k8s.io/controller-runtime/pkg/reconcile" + + "github.com/karmada-io/multicluster-cloud-provider/pkg/controllers/indexes" +) + +// ControllerName is the controller name that will be used when reporting events. +const ControllerName = "mci-service-locations-controller" + +// Controller is used to maintain information about the clusters in which +// the Service backend of the MultiClusterIngress resource resides. +type Controller struct { + client.Client + RateLimiterOptions ratelimiterflag.Options +} + +// Reconcile performs a full reconciliation for the object referred to by the Request. +// The Controller will requeue the Request to be processed again if an error is non-nil or +// Result.Requeue is true, otherwise upon completion it will remove the work from the queue. +func (c *Controller) Reconcile(ctx context.Context, req controllerruntime.Request) (controllerruntime.Result, error) { + klog.V(4).Infof("Reconciling MultiClusterIngress %s", req.NamespacedName.String()) + + mci := &networkingv1alpha1.MultiClusterIngress{} + if err := c.Client.Get(ctx, req.NamespacedName, mci); err != nil { + if apierrors.IsNotFound(err) { + return controllerruntime.Result{}, nil + } + klog.InfoS("failed to get multiClusterIngress object", "NamespacedName", req.NamespacedName.String()) + return controllerruntime.Result{}, err + } + + svcLocations, err := c.calculateServiceLocations(ctx, mci) + if err != nil { + return controllerruntime.Result{}, err + } + + err = retry.RetryOnConflict(retry.DefaultRetry, func() error { + if reflect.DeepEqual(svcLocations, mci.Status.ServiceLocations) { + return nil + } + mci.Status.ServiceLocations = svcLocations + updateErr := c.Client.Status().Update(ctx, mci) + if updateErr == nil { + return nil + } + + updatedMCI := &networkingv1alpha1.MultiClusterIngress{} + err = c.Client.Get(ctx, req.NamespacedName, updatedMCI) + if err == nil { + mci = updatedMCI + } else { + klog.Errorf("Failed to get updated multiClusterIngress(%s): %v", req.NamespacedName.String(), err) + } + return updateErr + }) + if err != nil { + klog.Errorf("Failed to sync multiClusterIngress(%s) service locations: %v", req.NamespacedName.String(), err) + return controllerruntime.Result{}, err + } + klog.V(4).Infof("Success to sync multiClusterIngress(%s) service locations", req.NamespacedName.String()) + return controllerruntime.Result{}, nil +} + +func (c *Controller) calculateServiceLocations(ctx context.Context, mci *networkingv1alpha1.MultiClusterIngress) ([]networkingv1alpha1.ServiceLocation, error) { + backendSvcNames := indexes.BuildServiceRefIndexes(mci) + sort.Strings(backendSvcNames) + + var svcLocations []networkingv1alpha1.ServiceLocation + for _, svcName := range backendSvcNames { + svcBinding := &workv1alpha2.ResourceBinding{} + svcRBNamespacedName := types.NamespacedName{ + Namespace: mci.Namespace, + Name: names.GenerateBindingName("Service", svcName), + } + err := c.Client.Get(ctx, svcRBNamespacedName, svcBinding) + if err != nil { + if apierrors.IsNotFound(err) { + continue + } + klog.ErrorS(err, "failed to get service's related resourceBinding", + "ResourceBinding", svcRBNamespacedName.String()) + return nil, err + } + + svcLocations = append(svcLocations, networkingv1alpha1.ServiceLocation{ + Name: svcName, + Clusters: obtainBindingClusters(svcBinding), + }) + } + return svcLocations, nil +} + +func obtainBindingClusters(rb *workv1alpha2.ResourceBinding) []string { + clusters := sets.NewString() + for _, cluster := range rb.Spec.Clusters { + clusters.Insert(cluster.Name) + } + for _, requiredBy := range rb.Spec.RequiredBy { + for _, cluster := range requiredBy.Clusters { + clusters.Insert(cluster.Name) + } + } + return clusters.List() +} + +// SetupWithManager creates a controller and register to controller manager. +func (c *Controller) SetupWithManager(mgr controllerruntime.Manager) error { + mciPredicateFuncs := predicate.Funcs{ + CreateFunc: func(event event.CreateEvent) bool { return true }, + DeleteFunc: func(event event.DeleteEvent) bool { return false }, + UpdateFunc: func(event event.UpdateEvent) bool { + oldMCI := event.ObjectOld.(*networkingv1alpha1.MultiClusterIngress) + newMCI := event.ObjectNew.(*networkingv1alpha1.MultiClusterIngress) + return !reflect.DeepEqual(*oldMCI.Spec.DefaultBackend, *newMCI.Spec.DefaultBackend) || + !reflect.DeepEqual(oldMCI.Spec.Rules, newMCI.Spec.Rules) + }, + } + + serviceMapFunc := handler.MapFunc( + func(ctx context.Context, object client.Object) []reconcile.Request { + var requests []reconcile.Request + + mciList := &networkingv1alpha1.MultiClusterIngressList{} + if err := c.Client.List(context.Background(), mciList, + client.InNamespace(object.GetNamespace()), + client.MatchingFields{indexes.IndexKeyServiceRefName: object.GetName()}); err != nil { + klog.Errorf("failed to fetch multiclusteringresses") + return nil + } + + for index := range mciList.Items { + mci := &mciList.Items[index] + requests = append(requests, reconcile.Request{ + NamespacedName: types.NamespacedName{Namespace: mci.Namespace, Name: mci.Name}}) + } + return requests + }) + + servicePredicateFuncs := predicate.Funcs{ + CreateFunc: func(event event.CreateEvent) bool { return true }, + UpdateFunc: func(event event.UpdateEvent) bool { return false }, + DeleteFunc: func(event event.DeleteEvent) bool { return true }, + } + + rbMapFunc := handler.MapFunc( + func(ctx context.Context, object client.Object) []reconcile.Request { + var requests []reconcile.Request + + rb := object.(*workv1alpha2.ResourceBinding) + if rb.Spec.Resource.APIVersion != "v1" || rb.Spec.Resource.Kind != "Service" { + return nil + } + + mciList := &networkingv1alpha1.MultiClusterIngressList{} + if err := c.Client.List(context.Background(), mciList, + client.InNamespace(rb.GetNamespace()), + client.MatchingFields{indexes.IndexKeyServiceRefName: rb.Spec.Resource.Name}); err != nil { + klog.Errorf("failed to fetch multiclusteringresses") + return nil + } + + for index := range mciList.Items { + mci := &mciList.Items[index] + requests = append(requests, reconcile.Request{ + NamespacedName: types.NamespacedName{Namespace: mci.Namespace, Name: mci.Name}}) + } + return requests + }) + + rbPredicateFuncs := predicate.Funcs{ + CreateFunc: func(event event.CreateEvent) bool { + rb := event.Object.(*workv1alpha2.ResourceBinding) + return rb.Spec.Resource.APIVersion == "v1" && rb.Spec.Resource.Kind == "Service" + }, + UpdateFunc: func(event event.UpdateEvent) bool { + oldRB := event.ObjectOld.(*workv1alpha2.ResourceBinding) + newRB := event.ObjectNew.(*workv1alpha2.ResourceBinding) + if newRB.Spec.Resource.APIVersion != "v1" || newRB.Spec.Resource.Kind != "Service" { + return false + } + return !reflect.DeepEqual(oldRB.Spec.Clusters, newRB.Spec.Clusters) || + !reflect.DeepEqual(oldRB.Spec.RequiredBy, newRB.Spec.RequiredBy) + }, + DeleteFunc: func(event event.DeleteEvent) bool { return false }, + } + + return controllerruntime.NewControllerManagedBy(mgr). + For(&networkingv1alpha1.MultiClusterIngress{}). + WithEventFilter(mciPredicateFuncs). + Watches(&corev1.Service{}, handler.EnqueueRequestsFromMapFunc(serviceMapFunc), builder.WithPredicates(servicePredicateFuncs)). + Watches(&workv1alpha2.ResourceBinding{}, handler.EnqueueRequestsFromMapFunc(rbMapFunc), builder.WithPredicates(rbPredicateFuncs)). + WithOptions(controller.Options{RateLimiter: ratelimiterflag.DefaultControllerRateLimiter(c.RateLimiterOptions)}). + Complete(c) +}