diff --git a/controller/config/manager/kustomization.yaml b/controller/config/manager/kustomization.yaml index 04e5d73..c378b23 100644 --- a/controller/config/manager/kustomization.yaml +++ b/controller/config/manager/kustomization.yaml @@ -5,4 +5,4 @@ kind: Kustomization images: - name: controller newName: lumbrjx/obzev0-k8s-controller - newTag: temp10 + newTag: temp11 diff --git a/controller/config/samples/batch_v1_obzev0resource.yaml b/controller/config/samples/batch_v1_obzev0resource.yaml index 5cd91d3..2489f2a 100644 --- a/controller/config/samples/batch_v1_obzev0resource.yaml +++ b/controller/config/samples/batch_v1_obzev0resource.yaml @@ -11,14 +11,14 @@ spec: reqDelay: 1 resDelay: 1 server: "7070" - client: "10.244.2.4:8080" + client: "my-express-app-service.default.svc.cluster.local:8080" tcAnalyserSvcConfig: enabled: false netIFace: "eth0" packetManipulationSvcConfig: - enabled: true + enabled: false server: "9091" - client: "10.244.2.4:8080" + client: "my-express-app-service.default.svc.cluster.local:8080" dropRate: "0.8" corruptRate: "0.4" durationSeconds: 8 diff --git a/controller/internal/controller/obzev0resource_controller.go b/controller/internal/controller/obzev0resource_controller.go index c6b11e6..5da2784 100644 --- a/controller/internal/controller/obzev0resource_controller.go +++ b/controller/internal/controller/obzev0resource_controller.go @@ -3,15 +3,17 @@ package controller import ( "context" "fmt" - "log" - "os" - v1 "obzev0/controller/api/v1" + "os" + "strings" + "time" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" + appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" "k8s.io/client-go/kubernetes" "k8s.io/client-go/tools/cache" ctrl "sigs.k8s.io/controller-runtime" @@ -22,11 +24,14 @@ var setupLog = ctrl.Log.WithName("setup") func SetupInformers(mgr ctrl.Manager) { clientset, err := kubernetes.NewForConfig(mgr.GetConfig()) if err != nil { - log.Fatal(err, "unable to create clientset") + setupLog.Error(err, "unable to create clientset") os.Exit(1) } + setupLog.Info("Setting up informers") ctx := context.Background() + + // Set up CR informer crInformer, err := mgr.GetCache().GetInformer(ctx, &v1.Obzev0Resource{}) if err != nil { setupLog.Error(err, "unable to create CR informer") @@ -34,88 +39,196 @@ func SetupInformers(mgr ctrl.Manager) { } setupLog.Info("CR informer created") - labelSelector := "app=grpc-server" - daemonSetName := "grpc-server-daemonset" + // Set up DaemonSet informer + daemonSetInformer, err := mgr.GetCache().GetInformer(ctx, &appsv1.DaemonSet{}) + if err != nil { + setupLog.Error(err, "unable to create DaemonSet informer") + os.Exit(1) + } + setupLog.Info("DaemonSet informer created") - listOptions := metav1.ListOptions{ - LabelSelector: labelSelector, + // Set up Pod informer + podInformer, err := mgr.GetCache().GetInformer(ctx, &corev1.Pod{}) + if err != nil { + setupLog.Error(err, "unable to create Pod informer") + os.Exit(1) } + setupLog.Info("Pod informer created") + + // Create a map to store gRPC connections + gRPCConnections := make(map[string]*grpc.ClientConn) - if daemonSetName != "" { - ds, err := clientset.AppsV1(). - DaemonSets("default"). - Get(context.TODO(), daemonSetName, metav1.GetOptions{}) + // Set up DaemonSet event handler + daemonSetInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + ds := obj.(*appsv1.DaemonSet) + if isTargetDaemonSet(ds) { + setupLog.Info("New target DaemonSet added", "name", ds.Name) + go retryConnectToPods(clientset, ds, gRPCConnections) + } + }, + UpdateFunc: func(oldObj, newObj interface{}) { + newDS := newObj.(*appsv1.DaemonSet) + if isTargetDaemonSet(newDS) { + setupLog.Info("Target DaemonSet updated", "name", newDS.Name) + go retryConnectToPods(clientset, newDS, gRPCConnections) + } + }, + DeleteFunc: func(obj interface{}) { + ds := obj.(*appsv1.DaemonSet) + if isTargetDaemonSet(ds) { + setupLog.Info("Target DaemonSet deleted", "name", ds.Name) + // Clean up connections for deleted DaemonSet + cleanupConnections(ds, gRPCConnections) + } + }, + }) + + // Set up Pod event handler + podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + pod := obj.(*corev1.Pod) + if isTargetPod(pod) { + setupLog.Info("New target Pod added", "name", pod.Name) + go retryConnectToPod(pod, gRPCConnections) + } + }, + UpdateFunc: func(oldObj, newObj interface{}) { + newPod := newObj.(*corev1.Pod) + if isTargetPod(newPod) && newPod.Status.Phase == corev1.PodRunning { + setupLog.Info("Target Pod became ready", "name", newPod.Name) + go retryConnectToPod(newPod, gRPCConnections) + } + }, + DeleteFunc: func(obj interface{}) { + pod := obj.(*corev1.Pod) + if isTargetPod(pod) { + setupLog.Info("Target Pod deleted", "name", pod.Name) + // Clean up connection for deleted Pod + deleteConnection(pod.Status.PodIP, gRPCConnections) + } + }, + }) + + // Set up CR event handler + crInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { handleCREvent(obj, gRPCConnections) }, + UpdateFunc: func(oldObj, newObj interface{}) { handleCREvent(newObj, gRPCConnections) }, + DeleteFunc: func(obj interface{}) { handleCREvent(obj, gRPCConnections) }, + }) +} + +func isTargetDaemonSet(ds *appsv1.DaemonSet) bool { + return ds.Labels["app"] == "grpc-server" +} + +func isTargetPod(pod *corev1.Pod) bool { + return pod.Labels["app"] == "grpc-server" +} + +func retryConnectToPods( + clientset *kubernetes.Clientset, + ds *appsv1.DaemonSet, + connections map[string]*grpc.ClientConn, +) { + for { + pods, err := clientset.CoreV1(). + Pods(ds.Namespace). + List(context.TODO(), metav1.ListOptions{ + LabelSelector: labels.SelectorFromSet(ds.Spec.Selector.MatchLabels). + String(), + }) if err != nil { - setupLog.Error(err, "unable to get specific DaemonSet") - os.Exit(1) + setupLog.Error( + err, + "Failed to list pods for DaemonSet", + "name", + ds.Name, + ) + time.Sleep(5 * time.Second) + continue } - fmt.Printf("DaemonSet Name: %s, Namespace: %s\n", ds.Name, ds.Namespace) - listOptions.LabelSelector = fmt.Sprintf( - "app=%s", - ds.Spec.Template.Labels["app"], - ) - } else { - // If no specific DaemonSet name, list DaemonSets based on label selector - fmt.Println("Listing DaemonSets based on label selector:", labelSelector) - daemonSets, err := clientset.AppsV1(). - DaemonSets(""). - List(context.TODO(), listOptions) - if err != nil { - panic(err.Error()) + + for _, pod := range pods.Items { + if pod.Status.Phase == corev1.PodRunning { + go retryConnectToPod(&pod, connections) + } } - fmt.Printf( - "There are %d daemonsets in the cluster\n", - len(daemonSets.Items), - ) + time.Sleep(30 * time.Second) // Wait before checking again + } +} - for _, ds := range daemonSets.Items { - fmt.Printf("DaemonSet Name: %s, Namespace: %s\n", ds.Name, ds.Namespace) +func retryConnectToPod(pod *corev1.Pod, connections map[string]*grpc.ClientConn) { + ip := pod.Status.PodIP + port := "50051" + address := fmt.Sprintf("%s:%s", ip, port) + + for { + if _, exists := connections[address]; exists { + return // Connection already exists } - } - pods, err := clientset.CoreV1(). - Pods("default"). - List(context.TODO(), metav1.ListOptions{ - LabelSelector: labelSelector, - }) - if err != nil { - panic(err.Error()) - } - for _, pod := range pods.Items { - fmt.Println(pod.Status.PodIP) - if pod.Status.Phase == corev1.PodRunning { - ip := pod.Status.PodIP - port := "50051" - address := fmt.Sprintf("%s:%s", ip, port) - fmt.Printf("Connecting to gRPC server at %s\n", address) - - conn, err := grpc.NewClient( + setupLog.Info("Attempting to connect to gRPC server", "address", address) + conn, err := grpc.NewClient( + address, + grpc.WithTransportCredentials(insecure.NewCredentials()), + grpc.WithChainUnaryInterceptor(LoggingInterceptor), + ) + if err != nil { + setupLog.Error( + err, + "Failed to connect to gRPC server", + "address", address, - grpc.WithTransportCredentials(insecure.NewCredentials()), - grpc.WithChainUnaryInterceptor(LoggingInterceptor), ) - if err != nil { - log.Printf("Failed to connect to %s: %v\n", address, err) - continue - } + time.Sleep(5 * time.Second) + continue + } - crInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ - AddFunc: func(obj interface{}) { - handleAdd(obj, conn) - }, - UpdateFunc: func(oldObj, newObj interface{}) { - handleUpdate(newObj, conn) - }, - DeleteFunc: func(obj interface{}) { - handleDelete(obj) - }, - }) + connections[address] = conn + setupLog.Info("Successfully connected to gRPC server", "address", address) + return + } +} - fmt.Printf( - "Successfully connected to gRPC server at %s\n", +func cleanupConnections( + ds *appsv1.DaemonSet, + connections map[string]*grpc.ClientConn, +) { + for address, conn := range connections { + // Check if this connection belongs to the deleted DaemonSet + // This is a simplification; you might need a more robust way to associate connections with DaemonSets + if strings.HasPrefix(address, ds.Name) { + conn.Close() + delete(connections, address) + setupLog.Info( + "Cleaned up connection for deleted DaemonSet", + "address", address, ) } } } + +func deleteConnection(podIP string, connections map[string]*grpc.ClientConn) { + address := fmt.Sprintf("%s:50051", podIP) + if conn, exists := connections[address]; exists { + conn.Close() + delete(connections, address) + setupLog.Info("Cleaned up connection for deleted Pod", "address", address) + } +} + +func handleCREvent(obj interface{}, connections map[string]*grpc.ClientConn) { + cr, ok := obj.(*v1.Obzev0Resource) + if !ok { + setupLog.Error(nil, "Failed to cast object to Obzev0Resource") + return + } + + for _, conn := range connections { + CheckConnection(conn) + processCustomResource(cr, conn) + } +}