Skip to content

Commit

Permalink
feat: move informer to dynamic informer
Browse files Browse the repository at this point in the history
  • Loading branch information
ramantehlan committed Jun 10, 2024
1 parent b9d5271 commit 3eabf14
Show file tree
Hide file tree
Showing 6 changed files with 202 additions and 108 deletions.
6 changes: 2 additions & 4 deletions operator/internal/controller/elastiservice_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,11 +112,9 @@ func (r *ElastiServiceReconciler) Reconcile(ctx context.Context, req ctrl.Reques
// We reset mutex if crd is deleted, so it can be used again if the same CRD is reapplied
r.getMutexForInformerStart(req.NamespacedName.String()).Do(func() {
// Watch for changes in target deployment
go r.Informer.AddDeploymentWatch(es.Name, es.Spec.DeploymentName,
req.Namespace, r.getTargetDeploymentChangeHandler(ctx, es, req))
go r.Informer.AddDeploymentWatch(req, es.Spec.DeploymentName, req.Namespace, r.getTargetDeploymentChangeHandler(ctx, es, req))
// Watch for changes in activator deployment
go r.Informer.AddDeploymentWatch(es.Name, resolverDeploymentName,
resolverNamespace, r.getResolverChangeHandler(ctx, es, req))
go r.Informer.AddDeploymentWatch(req, resolverDeploymentName, resolverNamespace, r.getResolverChangeHandler(ctx, es, req))
})

deploymentNamespacedName := types.NamespacedName{
Expand Down
8 changes: 4 additions & 4 deletions operator/internal/controller/opsCRD.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,10 +54,10 @@ func (r *ElastiServiceReconciler) finalizeCRD(ctx context.Context, es *v1alpha1.
r.Logger.Info("ElastiService is being deleted", zap.String("name", es.Name), zap.Any("deletionTimestamp", es.ObjectMeta.DeletionTimestamp))
// Reset the informer start mutex, so if the ElastiService is recreated, we will need to reset the informer
r.resetMutexForInformer(req.NamespacedName.String())
// Stop target service informer
go r.Informer.StopInformer(es.Name, es.Spec.DeploymentName, es.Namespace)
// Stop resolver informer
go r.Informer.StopInformer(es.Name, resolverDeploymentName, resolverNamespace)
// Stop all active informers
// NOTE: If the informerManager is shared across multiple controllers, this will stop all informers
// In that case, we must call the
go r.Informer.StopForCRD(req.Name)
// Remove CRD details from service directory
crdDirectory.CRDDirectory.RemoveCRD(es.Spec.Service)

Expand Down
63 changes: 44 additions & 19 deletions operator/internal/controller/opsInformer.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (
"go.uber.org/zap"
appsv1 "k8s.io/api/apps/v1"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/cache"
ctrl "sigs.k8s.io/controller-runtime"
Expand All @@ -23,26 +25,10 @@ func (r *ElastiServiceReconciler) resetMutexForInformer(key string) {
r.WatcherStartLock.Delete(key)
}

func (r *ElastiServiceReconciler) getTargetDeploymentChangeHandler(_ context.Context, es *v1alpha1.ElastiService, req ctrl.Request) cache.ResourceEventHandlerFuncs {
func (r *ElastiServiceReconciler) getTargetDeploymentChangeHandler(ctx context.Context, es *v1alpha1.ElastiService, req ctrl.Request) cache.ResourceEventHandlerFuncs {
return cache.ResourceEventHandlerFuncs{
UpdateFunc: func(old, new interface{}) {
newDeployment := new.(*appsv1.Deployment)
condition := newDeployment.Status.Conditions
if newDeployment.Status.Replicas == 0 {
r.Logger.Debug("Deployment has 0 replicas", zap.String("deployment_name", es.Spec.DeploymentName), zap.String("es", req.String()))
_, err := r.runReconcile(context.Background(), req, ProxyMode)
if err != nil {
r.Logger.Error("Reconciliation failed", zap.String("es", req.String()), zap.Error(err))
return
}
} else if newDeployment.Status.Replicas > 0 && condition[1].Status == "True" {
r.Logger.Debug("Deployment has replicas", zap.String("deployment_name", es.Spec.DeploymentName), zap.String("es", req.String()))
_, err := r.runReconcile(context.Background(), req, ServeMode)
if err != nil {
r.Logger.Error("Reconciliation failed", zap.String("es", req.String()), zap.Error(err))
return
}
}
r.handleTargetChanges(ctx, new, es, req)
},
}
}
Expand Down Expand Up @@ -71,7 +57,11 @@ func (r *ElastiServiceReconciler) getResolverChangeHandler(ctx context.Context,
}

func (r *ElastiServiceReconciler) handleResolverChanges(ctx context.Context, obj interface{}, serviceName, namespace string) {
resolverDeployment := obj.(*appsv1.Deployment)
resolverDeployment, err := r.unstructuredToDeployment(obj)
if err != nil {
r.Logger.Error("Failed to convert unstructured to deployment", zap.Error(err))
return
}
if resolverDeployment.Name == resolverDeploymentName {
targetNamespacedName := types.NamespacedName{
Name: serviceName,
Expand All @@ -88,3 +78,38 @@ func (r *ElastiServiceReconciler) handleResolverChanges(ctx context.Context, obj
}
}
}

func (r *ElastiServiceReconciler) handleTargetChanges(ctx context.Context, obj interface{}, es *v1alpha1.ElastiService, req ctrl.Request) {
newDeployment, err := r.unstructuredToDeployment(obj)
if err != nil {
r.Logger.Error("Failed to convert unstructured to deployment", zap.Error(err))
return
}
condition := newDeployment.Status.Conditions
if newDeployment.Status.Replicas == 0 {
r.Logger.Debug("Deployment has 0 replicas", zap.String("deployment_name", es.Spec.DeploymentName), zap.String("es", req.String()))
_, err := r.runReconcile(ctx, req, ProxyMode)
if err != nil {
r.Logger.Error("Reconciliation failed", zap.String("es", req.String()), zap.Error(err))
return
}
} else if newDeployment.Status.Replicas > 0 && condition[1].Status == "True" {
r.Logger.Debug("Deployment has replicas", zap.String("deployment_name", es.Spec.DeploymentName), zap.String("es", req.String()))
_, err := r.runReconcile(ctx, req, ServeMode)
if err != nil {
r.Logger.Error("Reconciliation failed", zap.String("es", req.String()), zap.Error(err))
return
}
}
}

func (r *ElastiServiceReconciler) unstructuredToDeployment(obj interface{}) (*appsv1.Deployment, error) {
unstructuredObj := obj.(*unstructured.Unstructured)
newDeployment := &appsv1.Deployment{}
err := runtime.DefaultUnstructuredConverter.FromUnstructured(unstructuredObj.UnstructuredContent(), newDeployment)
if err != nil {
r.Logger.Error("Failed to convert unstructured to deployment", zap.Error(err))
return nil, err
}
return newDeployment, nil
}
Loading

0 comments on commit 3eabf14

Please sign in to comment.