From 3eabf14b036fec4bff0ffbd4a505edbf0b7e983f Mon Sep 17 00:00:00 2001 From: Raman Tehlan Date: Mon, 10 Jun 2024 08:21:05 +0530 Subject: [PATCH] feat: move informer to dynamic informer --- .../controller/elastiservice_controller.go | 6 +- operator/internal/controller/opsCRD.go | 8 +- operator/internal/controller/opsInformer.go | 63 +++-- operator/internal/informer/informer.go | 222 ++++++++++++------ pkg/utils/utils.go | 4 +- playground/config/watch-crd.yaml | 7 +- 6 files changed, 202 insertions(+), 108 deletions(-) diff --git a/operator/internal/controller/elastiservice_controller.go b/operator/internal/controller/elastiservice_controller.go index d5982511..8c3c666d 100644 --- a/operator/internal/controller/elastiservice_controller.go +++ b/operator/internal/controller/elastiservice_controller.go @@ -112,11 +112,9 @@ func (r *ElastiServiceReconciler) Reconcile(ctx context.Context, req ctrl.Reques // We reset mutex if crd is deleted, so it can be used again if the same CRD is reapplied r.getMutexForInformerStart(req.NamespacedName.String()).Do(func() { // Watch for changes in target deployment - go r.Informer.AddDeploymentWatch(es.Name, es.Spec.DeploymentName, - req.Namespace, r.getTargetDeploymentChangeHandler(ctx, es, req)) + go r.Informer.AddDeploymentWatch(req, es.Spec.DeploymentName, req.Namespace, r.getTargetDeploymentChangeHandler(ctx, es, req)) // Watch for changes in activator deployment - go r.Informer.AddDeploymentWatch(es.Name, resolverDeploymentName, - resolverNamespace, r.getResolverChangeHandler(ctx, es, req)) + go r.Informer.AddDeploymentWatch(req, resolverDeploymentName, resolverNamespace, r.getResolverChangeHandler(ctx, es, req)) }) deploymentNamespacedName := types.NamespacedName{ diff --git a/operator/internal/controller/opsCRD.go b/operator/internal/controller/opsCRD.go index 8234a0a8..110d1d1a 100644 --- a/operator/internal/controller/opsCRD.go +++ b/operator/internal/controller/opsCRD.go @@ -54,10 +54,10 @@ func (r *ElastiServiceReconciler) finalizeCRD(ctx context.Context, es *v1alpha1. 
r.Logger.Info("ElastiService is being deleted", zap.String("name", es.Name), zap.Any("deletionTimestamp", es.ObjectMeta.DeletionTimestamp)) // Reset the informer start mutex, so if the ElastiService is recreated, we will need to reset the informer r.resetMutexForInformer(req.NamespacedName.String()) - // Stop target service informer - go r.Informer.StopInformer(es.Name, es.Spec.DeploymentName, es.Namespace) - // Stop resolver informer - go r.Informer.StopInformer(es.Name, resolverDeploymentName, resolverNamespace) + // Stop all active informers + // NOTE: If the informerManager is shared across multiple controllers, this will stop all informers + // In that case, we must call the + go r.Informer.StopForCRD(req.Name) // Remove CRD details from service directory crdDirectory.CRDDirectory.RemoveCRD(es.Spec.Service) diff --git a/operator/internal/controller/opsInformer.go b/operator/internal/controller/opsInformer.go index 2db7da3a..e15e4626 100644 --- a/operator/internal/controller/opsInformer.go +++ b/operator/internal/controller/opsInformer.go @@ -7,6 +7,8 @@ import ( "go.uber.org/zap" appsv1 "k8s.io/api/apps/v1" v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/tools/cache" ctrl "sigs.k8s.io/controller-runtime" @@ -23,26 +25,10 @@ func (r *ElastiServiceReconciler) resetMutexForInformer(key string) { r.WatcherStartLock.Delete(key) } -func (r *ElastiServiceReconciler) getTargetDeploymentChangeHandler(_ context.Context, es *v1alpha1.ElastiService, req ctrl.Request) cache.ResourceEventHandlerFuncs { +func (r *ElastiServiceReconciler) getTargetDeploymentChangeHandler(ctx context.Context, es *v1alpha1.ElastiService, req ctrl.Request) cache.ResourceEventHandlerFuncs { return cache.ResourceEventHandlerFuncs{ UpdateFunc: func(old, new interface{}) { - newDeployment := new.(*appsv1.Deployment) - condition := newDeployment.Status.Conditions - if newDeployment.Status.Replicas == 0 { - r.Logger.Debug("Deployment has 0 replicas", zap.String("deployment_name", es.Spec.DeploymentName), zap.String("es", req.String())) - _, err := r.runReconcile(context.Background(), req, ProxyMode) - if err != nil { - r.Logger.Error("Reconciliation failed", zap.String("es", req.String()), zap.Error(err)) - return - } - } else if newDeployment.Status.Replicas > 0 && condition[1].Status == "True" { - r.Logger.Debug("Deployment has replicas", zap.String("deployment_name", es.Spec.DeploymentName), zap.String("es", req.String())) - _, err := r.runReconcile(context.Background(), req, ServeMode) - if err != nil { - r.Logger.Error("Reconciliation failed", zap.String("es", req.String()), zap.Error(err)) - return - } - } + r.handleTargetChanges(ctx, new, es, req) }, } } @@ -71,7 +57,11 @@ func (r *ElastiServiceReconciler) getResolverChangeHandler(ctx context.Context, } func (r *ElastiServiceReconciler) handleResolverChanges(ctx context.Context, obj interface{}, serviceName, namespace string) { - resolverDeployment := obj.(*appsv1.Deployment) + resolverDeployment, err := r.unstructuredToDeployment(obj) + if err != nil { + r.Logger.Error("Failed to convert unstructured to deployment", zap.Error(err)) + return + } if resolverDeployment.Name == resolverDeploymentName { targetNamespacedName := types.NamespacedName{ Name: serviceName, @@ -88,3 +78,38 @@ func (r *ElastiServiceReconciler) handleResolverChanges(ctx context.Context, obj } } } + +func (r *ElastiServiceReconciler) handleTargetChanges(ctx context.Context, obj 
interface{}, es *v1alpha1.ElastiService, req ctrl.Request) { + newDeployment, err := r.unstructuredToDeployment(obj) + if err != nil { + r.Logger.Error("Failed to convert unstructured to deployment", zap.Error(err)) + return + } + condition := newDeployment.Status.Conditions + if newDeployment.Status.Replicas == 0 { + r.Logger.Debug("Deployment has 0 replicas", zap.String("deployment_name", es.Spec.DeploymentName), zap.String("es", req.String())) + _, err := r.runReconcile(ctx, req, ProxyMode) + if err != nil { + r.Logger.Error("Reconciliation failed", zap.String("es", req.String()), zap.Error(err)) + return + } + } else if newDeployment.Status.Replicas > 0 && condition[1].Status == "True" { + r.Logger.Debug("Deployment has replicas", zap.String("deployment_name", es.Spec.DeploymentName), zap.String("es", req.String())) + _, err := r.runReconcile(ctx, req, ServeMode) + if err != nil { + r.Logger.Error("Reconciliation failed", zap.String("es", req.String()), zap.Error(err)) + return + } + } +} + +func (r *ElastiServiceReconciler) unstructuredToDeployment(obj interface{}) (*appsv1.Deployment, error) { + unstructuredObj := obj.(*unstructured.Unstructured) + newDeployment := &appsv1.Deployment{} + err := runtime.DefaultUnstructuredConverter.FromUnstructured(unstructuredObj.UnstructuredContent(), newDeployment) + if err != nil { + r.Logger.Error("Failed to convert unstructured to deployment", zap.Error(err)) + return nil, err + } + return newDeployment, nil +} diff --git a/operator/internal/informer/informer.go b/operator/internal/informer/informer.go index f8799abf..4064de78 100644 --- a/operator/internal/informer/informer.go +++ b/operator/internal/informer/informer.go @@ -5,36 +5,52 @@ package informer import ( "context" "fmt" - "strings" + "runtime" "sync" "time" "go.uber.org/zap" - appsv1 "k8s.io/api/apps/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + kRuntime "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/watch" "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" "k8s.io/client-go/tools/cache" + + "k8s.io/client-go/dynamic" + + ctrl "sigs.k8s.io/controller-runtime" ) -// Manager helps manage lifecycle of informer -type Manager struct { - client *kubernetes.Clientset - logger *zap.Logger - informers sync.Map - resyncPeriod time.Duration - healthCheckDuration time.Duration - healthCheckStopChan chan struct{} -} +type ( + // Manager helps manage lifecycle of informer + Manager struct { + client *kubernetes.Clientset + dynamicClient *dynamic.DynamicClient + logger *zap.Logger + informers sync.Map + resyncPeriod time.Duration + healthCheckDuration time.Duration + healthCheckStopChan chan struct{} + } -type info struct { - Informer cache.SharedIndexInformer - StopCh chan struct{} - Handlers cache.ResourceEventHandlerFuncs -} + info struct { + Informer cache.SharedInformer + StopCh chan struct{} + Req *RequestWatch + } + // RequestWatch is the request body sent to the informer + RequestWatch struct { + Req ctrl.Request + ResourceName string + ResourceNamespace string + GroupVersionResource *schema.GroupVersionResource + Handlers cache.ResourceEventHandlerFuncs + } +) // NewInformerManager creates a new instance of the Informer Manager func NewInformerManager(logger *zap.Logger, kConfig *rest.Config) *Manager { @@ -42,9 +58,15 @@ func NewInformerManager(logger *zap.Logger, kConfig *rest.Config) *Manager { if err 
!= nil { logger.Fatal("Error connecting with kubernetes", zap.Error(err)) } + dynamicClient, err := dynamic.NewForConfig(kConfig) + if err != nil { + logger.Fatal("Error connecting with kubernetes", zap.Error(err)) + } + return &Manager{ - client: clientSet, - logger: logger.Named("InformerManager"), + client: clientSet, + dynamicClient: dynamicClient, + logger: logger.Named("InformerManager"), // ResyncPeriod is the proactive resync we do, even when no events are received by the informer. resyncPeriod: 0, healthCheckDuration: 5 * time.Second, @@ -64,9 +86,9 @@ func (m *Manager) Stop() { m.logger.Info("Stopping InformerManager") // Loop through all the informers and stop them m.informers.Range(func(key, value interface{}) bool { - _, ok := value.(info) + info, ok := value.(info) if ok { - m.StopInformer(parseInformerKey(key.(string))) + m.stopInformer(m.getInformerKeyFromReq(info.Req)) } return true }) @@ -75,15 +97,54 @@ func (m *Manager) Stop() { m.logger.Info("InformerManager stopped") } +// StopForCRD is to close all the active informers for a perticular CRD +func (m *Manager) StopForCRD(crdName string) { + m.logger.Info("Stopping Informer for CRD", zap.String("crd", crdName)) + // Loop through all the informers and stop them + m.informers.Range(func(key, value interface{}) bool { + // Check if key starts with the crdName + if key.(string)[:len(crdName)] == crdName { + info, ok := value.(info) + if ok { + m.stopInformer(m.getInformerKeyFromReq(info.Req)) + } + } + return true + }) + m.logger.Info("Informer stopped for CRD", zap.String("crd", crdName)) +} + +// stopInformer is to stop a informer for a resource +// It closes the shared informer for it and deletes it from the map +func (m *Manager) stopInformer(key string) { + m.logger.Info("Stopping informer", zap.String("key", key)) + value, ok := m.informers.Load(key) + if !ok { + m.logger.Info("Informer not found", zap.String("key", key)) + return + } + + // We need to verify if the informer exists in the map + informerInfo, ok := value.(info) + if !ok { + m.logger.Error("Failed to cast WatchInfo", zap.String("key", key)) + return + } + + // Close the informer, delete it from the map + close(informerInfo.StopCh) + m.informers.Delete(key) + m.logger.Info("Informer stopped", zap.String("key", key)) +} + func (m *Manager) monitorInformers() { m.informers.Range(func(key, value interface{}) bool { - informerInfo, ok := value.(info) + info, ok := value.(info) if ok { - if !informerInfo.Informer.HasSynced() { - m.logger.Info("Informer not synced", zap.String("deployment_name", key.(string))) - crdName, resourceName, namespace := parseInformerKey(key.(string)) - m.StopInformer(crdName, resourceName, namespace) - m.enableInformer(crdName, resourceName, namespace, informerInfo.Handlers) + if !info.Informer.HasSynced() { + m.logger.Info("Informer not synced", zap.String("key", key.(string))) + m.stopInformer(m.getInformerKeyFromReq(info.Req)) + m.enableInformer(info.Req) } } return true @@ -91,39 +152,69 @@ func (m *Manager) monitorInformers() { } // AddDeploymentWatch is to add a watch on a deployment -func (m *Manager) AddDeploymentWatch(crdName, deploymentName, namespace string, handlers cache.ResourceEventHandlerFuncs) { - m.logger.Info("Adding Deployment informer", zap.String("deployment_name", deploymentName)) - m.enableInformer(crdName, deploymentName, namespace, handlers) +func (m *Manager) AddDeploymentWatch(req ctrl.Request, deploymentName, namespace string, handlers cache.ResourceEventHandlerFuncs) { + request := &RequestWatch{ + Req: 
req, + ResourceName: deploymentName, + ResourceNamespace: namespace, + GroupVersionResource: &schema.GroupVersionResource{ + Group: "apps", + Version: "v1", + Resource: "deployments", + }, + Handlers: handlers, + } + m.Add(request) +} + +// Add is to add a watch on a resource +func (m *Manager) Add(req *RequestWatch) { + m.logger.Info("Adding informer", + zap.String("gvr", req.GroupVersionResource.String()), + zap.String("resourceName", req.ResourceName), + zap.String("resourceNamespace", req.ResourceNamespace), + zap.String("crd", req.Req.String()), + ) + m.enableInformer(req) } -// enableInformer is to enable the informer for a deployment -func (m *Manager) enableInformer(crdName, deploymentName, namespace string, handlers cache.ResourceEventHandlerFuncs) { - key := getInformerKey(crdName, deploymentName, namespace) +// enableInformer is to enable the informer for a resource +func (m *Manager) enableInformer(req *RequestWatch) { + defer func() { + if rErr := recover(); rErr != nil { + m.logger.Error("Recovered from panic", zap.Any("recovered", rErr)) + buf := make([]byte, 4096) + n := runtime.Stack(buf, false) + m.logger.Error("Panic stack trace", zap.ByteString("stacktrace", buf[:n])) + } + }() + key := m.getInformerKeyFromReq(req) // Proceed only if the informer is not already running, we verify by checking the map if _, ok := m.informers.Load(key); ok { m.logger.Info("Informer already running", zap.String("key", key)) return } - // Create a new shared informer - informer := cache.NewSharedIndexInformer( + + ctx := context.Background() + // Create an informer for the resource + informer := cache.NewSharedInformer( &cache.ListWatch{ - ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { - return m.client.AppsV1().Deployments(namespace).List(context.TODO(), metav1.ListOptions{ - FieldSelector: "metadata.name=" + deploymentName, + ListFunc: func(options metav1.ListOptions) (kRuntime.Object, error) { + return m.dynamicClient.Resource(*req.GroupVersionResource).Namespace(req.ResourceNamespace).List(ctx, metav1.ListOptions{ + FieldSelector: "metadata.name=" + req.ResourceName, }) }, WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { - return m.client.AppsV1().Deployments(namespace).Watch(context.TODO(), metav1.ListOptions{ - FieldSelector: "metadata.name=" + deploymentName, + return m.dynamicClient.Resource(*req.GroupVersionResource).Namespace(req.ResourceNamespace).Watch(ctx, metav1.ListOptions{ + FieldSelector: "metadata.name=" + req.ResourceName, }) }, }, - &appsv1.Deployment{}, - m.resyncPeriod, - cache.Indexers{}, + &unstructured.Unstructured{}, + 0, ) // We pass the handlers we received as a parameter - _, err := informer.AddEventHandler(handlers) + _, err := informer.AddEventHandler(req.Handlers) if err != nil { m.logger.Error("Error creating informer handler", zap.Error(err)) return @@ -137,45 +228,20 @@ func (m *Manager) enableInformer(crdName, deploymentName, namespace string, hand // Recover it in case it's not syncing, this is why we also store the handlers // Stop it when the CRD or the operator is deleted m.informers.Store(key, info{ - StopCh: informerStop, - Handlers: handlers, Informer: informer, + StopCh: informerStop, + Req: req, }) - m.logger.Info("Informer started", zap.String("key", key)) -} - -// StopInformer is to stop a informer for a resource -// It closes the shared informer for it and deletes it from the map -func (m *Manager) StopInformer(crdName, deploymentName, namespace string) { - key := getInformerKey(crdName, 
deploymentName, namespace) - m.logger.Info("Stopping Deployment watch", zap.String("key", key)) - value, ok := m.informers.Load(key) - if !ok { - m.logger.Info("Informer not found", zap.String("key", key)) - return - } - // We need to verify if the informer exists in the map - informerInfo, ok := value.(info) - if !ok { - m.logger.Error("Failed to cast WatchInfo", zap.String("key", key)) + // Wait for the cache to sync + if !cache.WaitForCacheSync(ctx.Done(), informer.HasSynced) { + m.logger.Error("Failed to sync informer", zap.String("key", key)) return } - - // Close the informer, delete it from the map - close(informerInfo.StopCh) - m.informers.Delete(key) - m.logger.Info("Informer stopped", zap.String("key", key)) -} - -// parseInformerKey is to parse the key to get the namespace and resource name -// The format of the key is crdName/resourceName/namespace -func parseInformerKey(key string) (crdName, resourceName, namespace string) { - parts := strings.Split(key, "/") - return parts[0], parts[1], parts[2] + m.logger.Info("Informer started", zap.String("key", key)) } -// getInformerKey is to get the key for the informer map using namespace and resource name -func getInformerKey(crdName, resourceName, namespace string) string { - return fmt.Sprintf("%s/%s/%s", crdName, resourceName, namespace) +// getInformerKeyFromReq is to get the key for the informer map using namespace and resource name from the request +func (m *Manager) getInformerKeyFromReq(req *RequestWatch) string { + return fmt.Sprintf("%s/%s/%s", req.Req.Name, req.ResourceName, req.ResourceNamespace) } diff --git a/pkg/utils/utils.go b/pkg/utils/utils.go index a5ab126b..f4b2f226 100644 --- a/pkg/utils/utils.go +++ b/pkg/utils/utils.go @@ -19,7 +19,7 @@ func GetPrivateSerivceName(publicSVCName string) string { hash := sha256.New() hash.Write([]byte(publicSVCName)) hashed := hex.EncodeToString(hash.Sum(nil)) - pvtName := prefix + publicSVCName + privateServicePostfix + "-" + string(hashed)[:10] + "-" + string(hashed)[11:16] + pvtName := prefix + publicSVCName + privateServicePostfix + "-" + string(hashed)[:10] return pvtName } @@ -28,5 +28,5 @@ func GetEndpointSliceToResolverName(serviceName string) string { hash := sha256.New() hash.Write([]byte(serviceName)) hashed := hex.EncodeToString(hash.Sum(nil)) - return prefix + serviceName + endpointSlicePostfix + "-" + string(hashed)[:10] + "-" + string(hashed)[11:16] + return prefix + serviceName + endpointSlicePostfix + "-" + string(hashed)[:10] } diff --git a/playground/config/watch-crd.yaml b/playground/config/watch-crd.yaml index bf9d3134..a44bc9b1 100644 --- a/playground/config/watch-crd.yaml +++ b/playground/config/watch-crd.yaml @@ -9,4 +9,9 @@ spec: queueTimeout: 4 idlePeriod: 20 service: target-service - deploymentName: target \ No newline at end of file + deploymentName: target + # scaleTargetRef: + # apiVersion: apps/v1 + # kind: Deployment + # name: target +
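
For reference, below is a minimal, self-contained sketch (not part of the patch) of the dynamic-informer pattern this change moves to: a shared informer built over *unstructured.Unstructured via the dynamic client, scoped to a single Deployment with a metadata.name field selector, whose event handler converts events back to a typed appsv1.Deployment the same way unstructuredToDeployment does. The package name, the watchDeployment helper, and its parameters are illustrative assumptions, not code from this repository.

// Sketch only: package and helper names are assumptions, not repository code.
package example

import (
	"context"
	"fmt"

	appsv1 "k8s.io/api/apps/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
	kRuntime "k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/runtime/schema"
	"k8s.io/apimachinery/pkg/watch"
	"k8s.io/client-go/dynamic"
	"k8s.io/client-go/rest"
	"k8s.io/client-go/tools/cache"
)

// watchDeployment wires a shared informer over unstructured objects, scoped to
// one Deployment by a metadata.name field selector, mirroring the ListWatch
// built in enableInformer in this patch.
func watchDeployment(cfg *rest.Config, namespace, name string, stopCh <-chan struct{}) error {
	dynClient, err := dynamic.NewForConfig(cfg)
	if err != nil {
		return fmt.Errorf("creating dynamic client: %w", err)
	}

	gvr := schema.GroupVersionResource{Group: "apps", Version: "v1", Resource: "deployments"}
	ctx := context.Background()

	informer := cache.NewSharedInformer(
		&cache.ListWatch{
			ListFunc: func(options metav1.ListOptions) (kRuntime.Object, error) {
				return dynClient.Resource(gvr).Namespace(namespace).List(ctx, metav1.ListOptions{
					FieldSelector: "metadata.name=" + name,
				})
			},
			WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
				return dynClient.Resource(gvr).Namespace(namespace).Watch(ctx, metav1.ListOptions{
					FieldSelector: "metadata.name=" + name,
				})
			},
		},
		&unstructured.Unstructured{},
		0, // no proactive resync, matching the patch
	)

	_, err = informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
		UpdateFunc: func(_, newObj interface{}) {
			u, ok := newObj.(*unstructured.Unstructured)
			if !ok {
				return
			}
			// Dynamic informers deliver *unstructured.Unstructured, so convert
			// back to the typed object before reading status fields.
			deployment := &appsv1.Deployment{}
			if err := kRuntime.DefaultUnstructuredConverter.FromUnstructured(u.UnstructuredContent(), deployment); err != nil {
				fmt.Println("conversion failed:", err)
				return
			}
			fmt.Printf("deployment %s has %d replicas\n", deployment.Name, deployment.Status.Replicas)
		},
	})
	if err != nil {
		return fmt.Errorf("adding event handler: %w", err)
	}

	go informer.Run(stopCh)
	if !cache.WaitForCacheSync(stopCh, informer.HasSynced) {
		return fmt.Errorf("informer for %s/%s did not sync", namespace, name)
	}
	return nil
}

The sketch reflects the design choice the patch makes: the typed clientset is still created in NewInformerManager, but the watches themselves go through the dynamic client over unstructured objects, which is what allows AddDeploymentWatch to become a thin wrapper around the generic Add for any GroupVersionResource.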