Skip to content

Commit

Permalink
Merge pull request #22 from lumbrjx/release
Browse files Browse the repository at this point in the history
Release
  • Loading branch information
lumbrjx authored Aug 31, 2024
2 parents f7db86d + 2a9c027 commit 1320d56
Show file tree
Hide file tree
Showing 17 changed files with 348 additions and 232 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ build-daemon:
build-daemon-stage:
docker build -f daemon/api/grpc/Dockerfile -t lumbrjx/obzev0-grpc-daemon:staging .
push-daemon-stage:
docker push lumbrjx/obzev0-grpc-daemon:staging .
docker push lumbrjx/obzev0-grpc-daemon:staging


build-controller:
Expand Down
58 changes: 58 additions & 0 deletions bpf/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
TARGET = tc.o
INTERFACE = enp1s0
US_DIR = user_space
CFLAGS = -I/usr/include -I/usr/include/x86_64-linux-gnu -I/usr/include/x86_64-linux-gnu/bits -I/usr/include/x86_64-linux-gnu/sys -I/usr/include/bpf

.PHONY: install-deps
install-deps:
sudo apt update
sudo apt install -y clang llvm libelf-dev linux-headers-$$(uname -r) build-essential
cd $(US_DIR) && go mod tidy

# Compile the eBPF program
$(TARGET): main.c
clang $(CFLAGS) -O2 -g -target bpf -c tc.c -o $(TARGET)

# Load the eBPF program manually
.PHONY: load
load: $(TARGET)
sudo tc qdisc add dev $(INTERFACE) clsact
sudo tc filter add dev $(INTERFACE) ingress bpf da obj $(TARGET) sec tc
sudo tc filter add dev $(INTERFACE) egress bpf da obj $(TARGET) sec tc

# View bpf_printk output
.PHONY: view-manual
view:
sudo cat /sys/kernel/debug/tracing/trace_pipe

.PHONY: view-tcp-manual
view-tcp:
sudo cat /sys/kernel/debug/tracing/trace_pipe | grep TCP

.PHONY: view-udp-manual
view-udp:
sudo cat /sys/kernel/debug/tracing/trace_pipe | grep UDP


# build user space program
.PHONY: build-US
build-US:
cd user_space && go build -o tc_US tc.go

# start user space program
.PHONY: start-US
start-US:
sudo ./user_space/tc_US $(INTERFACE)

# Remove the filters and qdisc when done manually
.PHONY: clean
clean:
sudo tc filter del dev $(INTERFACE) ingress
sudo tc filter del dev $(INTERFACE) egress
sudo tc qdisc del dev $(INTERFACE) clsact
rm -f $(TARGET)
rm -f user_space/tc_US

# All
.PHONY: all
all: install-deps $(TARGET) build-US start-US
File renamed without changes.
16 changes: 16 additions & 0 deletions controller/api/v1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions controller/config/manager/kustomization.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,5 @@ apiVersion: kustomize.config.k8s.io/v1beta1
kind: Kustomization
images:
- name: controller
newName: lumbrjx/obzev0poc
newTag: 1.0.5-alpha
newName: lumbrjx/obzev0-k8s-controller
newTag: latest
2 changes: 1 addition & 1 deletion controller/deployController.sh
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ if ! make install; then
fi

echo "Deploying the controller image into the cluster..."
if ! make deploy IMG=lumbrjx/obzev0poc:${IMAGE_TAG}; then
if ! make deploy IMG=lumbrjx/obzev0-k8s-controller:${IMAGE_TAG}; then
echo "Failed to deploy the controller image."
exit 1
fi
Expand Down
60 changes: 60 additions & 0 deletions controller/internal/controller/grpcClient.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
package controller

import (
"context"
"fmt"
"obzev0/common/proto/latency"
"time"

pb "obzev0/common/proto/latency"
tca "obzev0/common/proto/tcAnalyser"
v "obzev0/controller/api/v1"

"google.golang.org/grpc"
)

type GrpcServiceConfig struct {
LatencyConfig v.TcpConfig
TcAConfig v.TcAnalyserConfig

// Add more fields as needed
}

func callGrpcServices(
conn *grpc.ClientConn,
config GrpcServiceConfig,
) error {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
defer cancel()

// Handle LatencyService gRPC call
client := pb.NewLatencyServiceClient(conn)
response, err := client.StartTcpServer(
ctx,
&pb.RequestForTcp{Config: &latency.TcpConfig{
ReqDelay: config.LatencyConfig.ReqDelay,
ResDelay: config.LatencyConfig.ResDelay,
Server: config.LatencyConfig.Server,
Client: config.LatencyConfig.Client,
}},
)
if err != nil {
return fmt.Errorf("error calling StartTcpServer: %w", err)
}
fmt.Printf("Response from LatencyService gRPC server: %s\n", response.Message)

// Handle TcAnalyserService gRPC call
client2 := tca.NewTcAnalyserServiceClient(conn)
rsp, err := client2.StartUserSpace(
ctx,
&tca.RequestForUserSpace{Config: &tca.TcConfig{
Interface: config.TcAConfig.NetIFace,
}},
)
if err != nil {
return fmt.Errorf("error calling StartUserSpace: %w", err)
}
fmt.Printf("Response from TcAnalyserService gRPC server: %s\n", rsp.Message)

return nil
}
81 changes: 81 additions & 0 deletions controller/internal/controller/infx.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
package controller

import (
"context"
"log"
v1 "obzev0/controller/api/v1"

"google.golang.org/grpc"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/klog/v2"
)

func processCustomResource(obz *v1.Obzev0Resource, conn *grpc.ClientConn) {
name := obz.GetName()
namespace := obz.GetNamespace()
latencyConfig := obz.Spec.LatencyServiceConfig
tcAConfig := obz.Spec.TcAnalyserServiceConfig

klog.Infof("Custom Resource processed: %s/%s", namespace, name)
klog.Infof("TCP Server Configuration: %+v", latencyConfig)
klog.Infof("Tc Analyser Configuration: %+v", tcAConfig)

svcConfig := GrpcServiceConfig{
LatencyConfig: latencyConfig,
TcAConfig: tcAConfig,
}

err := callGrpcServices(conn, svcConfig)
if err != nil {
log.Printf("Error calling gRPC services: %v\n", err)
}

defer conn.Close()
}

func handleAdd(obj interface{}, conn *grpc.ClientConn) {
obz, ok := obj.(*v1.Obzev0Resource)
if !ok {
klog.Errorf("Error converting object to Obzev0Resource: %v", obj)
return
}

processCustomResource(obz, conn)
}

func handleUpdate(newObj interface{}, conn *grpc.ClientConn) {
obz, ok := newObj.(*v1.Obzev0Resource)
if !ok {
klog.Errorf("Error converting object to Obzev0Resource: %v", newObj)
return
}

processCustomResource(obz, conn)
}

func handleDelete(obj interface{}) {
obz, ok := obj.(*v1.Obzev0Resource)
if !ok {
klog.Errorf("Error converting object to Obzev0Resource: %v", obj)
return
}

name := obz.GetName()
namespace := obz.GetNamespace()

klog.Infof("Custom Resource deleted: %s/%s", namespace, name)
}
func listNodes(clientset *kubernetes.Clientset) {
nodes, err := clientset.CoreV1().
Nodes().
List(context.TODO(), metav1.ListOptions{})
if err != nil {
klog.Fatalf("Error listing nodes: %v", err)
}

klog.Info("Listing all nodes in the cluster:")
for _, node := range nodes.Items {
klog.Infof("Node Name: %s", node.Name)
}
}
104 changes: 1 addition & 103 deletions controller/internal/controller/obzev0resource_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,6 @@ import (
"fmt"
"log"
"os"
"time"

"obzev0/common/proto/latency"
pb "obzev0/common/proto/latency"
tca "obzev0/common/proto/tcAnalyser"

v1 "obzev0/controller/api/v1"

Expand All @@ -19,7 +14,6 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
"k8s.io/klog/v2"
ctrl "sigs.k8s.io/controller-runtime"
)

Expand Down Expand Up @@ -104,28 +98,13 @@ func SetupInformers(mgr ctrl.Manager) {
log.Printf("Failed to connect to %s: %v\n", address, err)
continue
}
client := pb.NewLatencyServiceClient(conn)
response, err := client.StartTcpServer(
context.Background(),
&pb.RequestForTcp{Config: &latency.TcpConfig{
ReqDelay: 1,
ResDelay: 1,
Server: "7090",
Client: "8080",
}},
)
if err != nil {
log.Printf("Error calling gRPC method: %v\n", err)
} else {
fmt.Printf("Response from gRPC server: %s\n", response.Message)
}

crInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
handleAdd(obj, conn)
},
UpdateFunc: func(oldObj, newObj interface{}) {
handleUpdate(oldObj, newObj)
handleUpdate(newObj, conn)
},
DeleteFunc: func(obj interface{}) {
handleDelete(obj)
Expand All @@ -139,84 +118,3 @@ func SetupInformers(mgr ctrl.Manager) {
}
}
}

func handleAdd(obj interface{}, conn *grpc.ClientConn) {
obz, ok := obj.(*v1.Obzev0Resource)
if !ok {
klog.Errorf("Error converting object to Obzev0Resource: %v", obj)
return
}

name := obz.GetName()
namespace := obz.GetNamespace()
latencyConfig := obz.Spec.LatencyServiceConfig
tcAConfig := obz.Spec.TcAnalyserServiceConfig

klog.Infof("Custom Resource added: %s/%s", namespace, name)
klog.Infof("TCP Server Configuration: %+v", latencyConfig)
client := pb.NewLatencyServiceClient(conn)
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
defer cancel()
response, err := client.StartTcpServer(
ctx,
&pb.RequestForTcp{Config: &latency.TcpConfig{
ReqDelay: latencyConfig.ReqDelay,
ResDelay: latencyConfig.ResDelay,
Server: latencyConfig.Server,
Client: latencyConfig.Client,
}},
)

if err != nil {
log.Printf("Error calling gRPC method: %v\n", err)
} else {
fmt.Printf("Response from gRPC server: %s\n", response.Message)
}
client2 := tca.NewTcAnalyserServiceClient(conn)
rsp, err := client2.StartUserSpace(
ctx,
&tca.RequestForUserSpace{Config: &tca.TcConfig{
Interface: tcAConfig.NetIFace,
}},
)

if err != nil {
log.Printf("Error calling gRPC method: %v\n", err)
} else {
fmt.Printf("Response from gRPC server: %s\n", rsp.Message)
}

defer conn.Close()
}

func handleUpdate(oldObj, newObj interface{}) {
key, err := cache.MetaNamespaceKeyFunc(newObj)
if err != nil {
klog.Errorf("Error getting key for object: %v", err)
return
}
klog.Infof("Custom Resource updated: %s", key)
}

func handleDelete(obj interface{}) {
key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
if err != nil {
klog.Errorf("Error getting key for object: %v", err)
return
}
klog.Infof("Custom Resource deleted: %s", key)
}

func listNodes(clientset *kubernetes.Clientset) {
nodes, err := clientset.CoreV1().
Nodes().
List(context.TODO(), metav1.ListOptions{})
if err != nil {
klog.Fatalf("Error listing nodes: %v", err)
}

klog.Info("Listing all nodes in the cluster:")
for _, node := range nodes.Items {
klog.Infof("Node Name: %s", node.Name)
}
}
Loading

0 comments on commit 1320d56

Please sign in to comment.