Skip to content

Commit

Permalink
Update ✨ Controller node reconnectivity
Browse files Browse the repository at this point in the history
  • Loading branch information
lumbrjx committed Sep 5, 2024
1 parent e5359f6 commit bed9e90
Show file tree
Hide file tree
Showing 3 changed files with 187 additions and 74 deletions.
2 changes: 1 addition & 1 deletion controller/config/manager/kustomization.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,4 @@ kind: Kustomization
images:
- name: controller
newName: lumbrjx/obzev0-k8s-controller
newTag: temp10
newTag: temp11
6 changes: 3 additions & 3 deletions controller/config/samples/batch_v1_obzev0resource.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,14 @@ spec:
reqDelay: 1
resDelay: 1
server: "7070"
client: "10.244.2.4:8080"
client: "my-express-app-service.default.svc.cluster.local:8080"
tcAnalyserSvcConfig:
enabled: false
netIFace: "eth0"
packetManipulationSvcConfig:
enabled: true
enabled: false
server: "9091"
client: "10.244.2.4:8080"
client: "my-express-app-service.default.svc.cluster.local:8080"
dropRate: "0.8"
corruptRate: "0.4"
durationSeconds: 8
Expand Down
253 changes: 183 additions & 70 deletions controller/internal/controller/obzev0resource_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,17 @@ package controller
import (
"context"
"fmt"
"log"
"os"

v1 "obzev0/controller/api/v1"
"os"
"strings"
"time"

"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
ctrl "sigs.k8s.io/controller-runtime"
Expand All @@ -22,100 +24,211 @@ var setupLog = ctrl.Log.WithName("setup")
func SetupInformers(mgr ctrl.Manager) {
clientset, err := kubernetes.NewForConfig(mgr.GetConfig())
if err != nil {
log.Fatal(err, "unable to create clientset")
setupLog.Error(err, "unable to create clientset")
os.Exit(1)
}

setupLog.Info("Setting up informers")
ctx := context.Background()

// Set up CR informer
crInformer, err := mgr.GetCache().GetInformer(ctx, &v1.Obzev0Resource{})
if err != nil {
setupLog.Error(err, "unable to create CR informer")
os.Exit(1)
}
setupLog.Info("CR informer created")

labelSelector := "app=grpc-server"
daemonSetName := "grpc-server-daemonset"
// Set up DaemonSet informer
daemonSetInformer, err := mgr.GetCache().GetInformer(ctx, &appsv1.DaemonSet{})
if err != nil {
setupLog.Error(err, "unable to create DaemonSet informer")
os.Exit(1)
}
setupLog.Info("DaemonSet informer created")

listOptions := metav1.ListOptions{
LabelSelector: labelSelector,
// Set up Pod informer
podInformer, err := mgr.GetCache().GetInformer(ctx, &corev1.Pod{})
if err != nil {
setupLog.Error(err, "unable to create Pod informer")
os.Exit(1)
}
setupLog.Info("Pod informer created")

// Create a map to store gRPC connections
gRPCConnections := make(map[string]*grpc.ClientConn)

if daemonSetName != "" {
ds, err := clientset.AppsV1().
DaemonSets("default").
Get(context.TODO(), daemonSetName, metav1.GetOptions{})
// Set up DaemonSet event handler
daemonSetInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
ds := obj.(*appsv1.DaemonSet)
if isTargetDaemonSet(ds) {
setupLog.Info("New target DaemonSet added", "name", ds.Name)
go retryConnectToPods(clientset, ds, gRPCConnections)
}
},
UpdateFunc: func(oldObj, newObj interface{}) {
newDS := newObj.(*appsv1.DaemonSet)
if isTargetDaemonSet(newDS) {
setupLog.Info("Target DaemonSet updated", "name", newDS.Name)
go retryConnectToPods(clientset, newDS, gRPCConnections)
}
},
DeleteFunc: func(obj interface{}) {
ds := obj.(*appsv1.DaemonSet)
if isTargetDaemonSet(ds) {
setupLog.Info("Target DaemonSet deleted", "name", ds.Name)
// Clean up connections for deleted DaemonSet
cleanupConnections(ds, gRPCConnections)
}
},
})

// Set up Pod event handler
podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
pod := obj.(*corev1.Pod)
if isTargetPod(pod) {
setupLog.Info("New target Pod added", "name", pod.Name)
go retryConnectToPod(pod, gRPCConnections)
}
},
UpdateFunc: func(oldObj, newObj interface{}) {
newPod := newObj.(*corev1.Pod)
if isTargetPod(newPod) && newPod.Status.Phase == corev1.PodRunning {
setupLog.Info("Target Pod became ready", "name", newPod.Name)
go retryConnectToPod(newPod, gRPCConnections)
}
},
DeleteFunc: func(obj interface{}) {
pod := obj.(*corev1.Pod)
if isTargetPod(pod) {
setupLog.Info("Target Pod deleted", "name", pod.Name)
// Clean up connection for deleted Pod
deleteConnection(pod.Status.PodIP, gRPCConnections)
}
},
})

// Set up CR event handler
crInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) { handleCREvent(obj, gRPCConnections) },
UpdateFunc: func(oldObj, newObj interface{}) { handleCREvent(newObj, gRPCConnections) },
DeleteFunc: func(obj interface{}) { handleCREvent(obj, gRPCConnections) },
})
}

// isTargetDaemonSet reports whether ds carries the label app=grpc-server.
func isTargetDaemonSet(ds *appsv1.DaemonSet) bool {
	appLabel, found := ds.Labels["app"]
	return found && appLabel == "grpc-server"
}

// isTargetPod reports whether pod carries the label app=grpc-server.
func isTargetPod(pod *corev1.Pod) bool {
	const wantApp = "grpc-server"

	podLabels := pod.Labels
	return podLabels["app"] == wantApp
}

func retryConnectToPods(
clientset *kubernetes.Clientset,
ds *appsv1.DaemonSet,
connections map[string]*grpc.ClientConn,
) {
for {
pods, err := clientset.CoreV1().
Pods(ds.Namespace).
List(context.TODO(), metav1.ListOptions{
LabelSelector: labels.SelectorFromSet(ds.Spec.Selector.MatchLabels).
String(),
})
if err != nil {
setupLog.Error(err, "unable to get specific DaemonSet")
os.Exit(1)
setupLog.Error(
err,
"Failed to list pods for DaemonSet",
"name",
ds.Name,
)
time.Sleep(5 * time.Second)
continue
}
fmt.Printf("DaemonSet Name: %s, Namespace: %s\n", ds.Name, ds.Namespace)
listOptions.LabelSelector = fmt.Sprintf(
"app=%s",
ds.Spec.Template.Labels["app"],
)
} else {
// If no specific DaemonSet name, list DaemonSets based on label selector
fmt.Println("Listing DaemonSets based on label selector:", labelSelector)
daemonSets, err := clientset.AppsV1().
DaemonSets("").
List(context.TODO(), listOptions)
if err != nil {
panic(err.Error())

for _, pod := range pods.Items {
if pod.Status.Phase == corev1.PodRunning {
go retryConnectToPod(&pod, connections)
}
}

fmt.Printf(
"There are %d daemonsets in the cluster\n",
len(daemonSets.Items),
)
time.Sleep(30 * time.Second) // Wait before checking again
}
}

for _, ds := range daemonSets.Items {
fmt.Printf("DaemonSet Name: %s, Namespace: %s\n", ds.Name, ds.Namespace)
func retryConnectToPod(pod *corev1.Pod, connections map[string]*grpc.ClientConn) {
ip := pod.Status.PodIP
port := "50051"
address := fmt.Sprintf("%s:%s", ip, port)

for {
if _, exists := connections[address]; exists {
return // Connection already exists
}
}

pods, err := clientset.CoreV1().
Pods("default").
List(context.TODO(), metav1.ListOptions{
LabelSelector: labelSelector,
})
if err != nil {
panic(err.Error())
}
for _, pod := range pods.Items {
fmt.Println(pod.Status.PodIP)
if pod.Status.Phase == corev1.PodRunning {
ip := pod.Status.PodIP
port := "50051"
address := fmt.Sprintf("%s:%s", ip, port)
fmt.Printf("Connecting to gRPC server at %s\n", address)

conn, err := grpc.NewClient(
setupLog.Info("Attempting to connect to gRPC server", "address", address)
conn, err := grpc.NewClient(
address,
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithChainUnaryInterceptor(LoggingInterceptor),
)
if err != nil {
setupLog.Error(
err,
"Failed to connect to gRPC server",
"address",
address,
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithChainUnaryInterceptor(LoggingInterceptor),
)
if err != nil {
log.Printf("Failed to connect to %s: %v\n", address, err)
continue
}
time.Sleep(5 * time.Second)
continue
}

crInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
handleAdd(obj, conn)
},
UpdateFunc: func(oldObj, newObj interface{}) {
handleUpdate(newObj, conn)
},
DeleteFunc: func(obj interface{}) {
handleDelete(obj)
},
})
connections[address] = conn
setupLog.Info("Successfully connected to gRPC server", "address", address)
return
}
}

fmt.Printf(
"Successfully connected to gRPC server at %s\n",
// cleanupConnections closes and removes every entry in connections whose
// address key starts with the name of the deleted DaemonSet ds.
//
// A failure to close a connection is logged; the entry is removed from the
// map regardless so that a broken connection is not reused.
//
// NOTE(review): the keys written by retryConnectToPod have the form
// "<podIP>:<port>", which would not start with a DaemonSet name, so this
// predicate may never match — confirm how connections should be associated
// with a DaemonSet.
func cleanupConnections(
	ds *appsv1.DaemonSet,
	connections map[string]*grpc.ClientConn,
) {
	for address, conn := range connections {
		// Check if this connection belongs to the deleted DaemonSet.
		if !strings.HasPrefix(address, ds.Name) {
			continue
		}

		if err := conn.Close(); err != nil {
			setupLog.Error(
				err,
				"Failed to close connection for deleted DaemonSet",
				"address",
				address,
			)
		}
		// Deleting the current key while ranging over a map is permitted in Go.
		delete(connections, address)
		setupLog.Info(
			"Cleaned up connection for deleted DaemonSet",
			"address",
			address,
		)
	}
}

// deleteConnection closes and removes the connection stored under
// "<podIP>:50051", if one exists. It does nothing when no such entry is
// present.
//
// A failure to close the connection is logged; the entry is removed from the
// map regardless.
func deleteConnection(podIP string, connections map[string]*grpc.ClientConn) {
	address := fmt.Sprintf("%s:50051", podIP)

	conn, exists := connections[address]
	if !exists {
		return
	}

	if err := conn.Close(); err != nil {
		setupLog.Error(err, "Failed to close connection for deleted Pod", "address", address)
	}
	delete(connections, address)
	setupLog.Info("Cleaned up connection for deleted Pod", "address", address)
}

// handleCREvent dispatches an Obzev0Resource informer event to every known
// gRPC connection: for each entry in connections it calls CheckConnection and
// then processCustomResource with the resource. Payloads of any other type
// are logged and ignored.
//
// CheckConnection and processCustomResource are defined outside this view.
func handleCREvent(obj interface{}, connections map[string]*grpc.ClientConn) {
	switch resource := obj.(type) {
	case *v1.Obzev0Resource:
		for _, grpcConn := range connections {
			CheckConnection(grpcConn)
			processCustomResource(resource, grpcConn)
		}
	default:
		setupLog.Error(nil, "Failed to cast object to Obzev0Resource")
	}
}

0 comments on commit bed9e90

Please sign in to comment.