Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Release #22

Merged
merged 6 commits into from
Aug 31, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ build-daemon:
build-daemon-stage:
docker build -f daemon/api/grpc/Dockerfile -t lumbrjx/obzev0-grpc-daemon:staging .
push-daemon-stage:
docker push lumbrjx/obzev0-grpc-daemon:staging .
docker push lumbrjx/obzev0-grpc-daemon:staging


build-controller:
Expand Down
58 changes: 58 additions & 0 deletions bpf/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
TARGET = tc.o
INTERFACE = enp1s0
US_DIR = user_space
CFLAGS = -I/usr/include -I/usr/include/x86_64-linux-gnu -I/usr/include/x86_64-linux-gnu/bits -I/usr/include/x86_64-linux-gnu/sys -I/usr/include/bpf

.PHONY: install-deps
install-deps:
sudo apt update
sudo apt install -y clang llvm libelf-dev linux-headers-$$(uname -r) build-essential
cd $(US_DIR) && go mod tidy

# Compile the eBPF program
$(TARGET): main.c
clang $(CFLAGS) -O2 -g -target bpf -c tc.c -o $(TARGET)

# Load the eBPF program manually
.PHONY: load
load: $(TARGET)
sudo tc qdisc add dev $(INTERFACE) clsact
sudo tc filter add dev $(INTERFACE) ingress bpf da obj $(TARGET) sec tc
sudo tc filter add dev $(INTERFACE) egress bpf da obj $(TARGET) sec tc

# View bpf_printk output
.PHONY: view-manual
view:
sudo cat /sys/kernel/debug/tracing/trace_pipe

.PHONY: view-tcp-manual
view-tcp:
sudo cat /sys/kernel/debug/tracing/trace_pipe | grep TCP

.PHONY: view-udp-manual
view-udp:
sudo cat /sys/kernel/debug/tracing/trace_pipe | grep UDP


# build user space program
.PHONY: build-US
build-US:
cd user_space && go build -o tc_US tc.go

# start user space program
.PHONY: start-US
start-US:
sudo ./user_space/tc_US $(INTERFACE)

# Remove the filters and qdisc when done manually
.PHONY: clean
clean:
sudo tc filter del dev $(INTERFACE) ingress
sudo tc filter del dev $(INTERFACE) egress
sudo tc qdisc del dev $(INTERFACE) clsact
rm -f $(TARGET)
rm -f user_space/tc_US

# All
.PHONY: all
all: install-deps $(TARGET) build-US start-US
File renamed without changes.
16 changes: 16 additions & 0 deletions controller/api/v1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions controller/config/manager/kustomization.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,5 @@ apiVersion: kustomize.config.k8s.io/v1beta1
kind: Kustomization
images:
- name: controller
newName: lumbrjx/obzev0poc
newTag: 1.0.5-alpha
newName: lumbrjx/obzev0-k8s-controller
newTag: latest
2 changes: 1 addition & 1 deletion controller/deployController.sh
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ if ! make install; then
fi

echo "Deploying the controller image into the cluster..."
if ! make deploy IMG=lumbrjx/obzev0poc:${IMAGE_TAG}; then
if ! make deploy IMG=lumbrjx/obzev0-k8s-controller:${IMAGE_TAG}; then
echo "Failed to deploy the controller image."
exit 1
fi
Expand Down
60 changes: 60 additions & 0 deletions controller/internal/controller/grpcClient.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
package controller

import (
"context"
"fmt"
"obzev0/common/proto/latency"
"time"

pb "obzev0/common/proto/latency"
tca "obzev0/common/proto/tcAnalyser"
v "obzev0/controller/api/v1"

"google.golang.org/grpc"
)

type GrpcServiceConfig struct {
LatencyConfig v.TcpConfig
TcAConfig v.TcAnalyserConfig

// Add more fields as needed
}

func callGrpcServices(
conn *grpc.ClientConn,
config GrpcServiceConfig,
) error {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
defer cancel()

// Handle LatencyService gRPC call
client := pb.NewLatencyServiceClient(conn)
response, err := client.StartTcpServer(
ctx,
&pb.RequestForTcp{Config: &latency.TcpConfig{
ReqDelay: config.LatencyConfig.ReqDelay,
ResDelay: config.LatencyConfig.ResDelay,
Server: config.LatencyConfig.Server,
Client: config.LatencyConfig.Client,
}},
)
if err != nil {
return fmt.Errorf("error calling StartTcpServer: %w", err)
}
fmt.Printf("Response from LatencyService gRPC server: %s\n", response.Message)

// Handle TcAnalyserService gRPC call
client2 := tca.NewTcAnalyserServiceClient(conn)
rsp, err := client2.StartUserSpace(
ctx,
&tca.RequestForUserSpace{Config: &tca.TcConfig{
Interface: config.TcAConfig.NetIFace,
}},
)
if err != nil {
return fmt.Errorf("error calling StartUserSpace: %w", err)
}
fmt.Printf("Response from TcAnalyserService gRPC server: %s\n", rsp.Message)

return nil
}
81 changes: 81 additions & 0 deletions controller/internal/controller/infx.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
package controller

import (
"context"
"log"
v1 "obzev0/controller/api/v1"

"google.golang.org/grpc"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/klog/v2"
)

func processCustomResource(obz *v1.Obzev0Resource, conn *grpc.ClientConn) {
name := obz.GetName()
namespace := obz.GetNamespace()
latencyConfig := obz.Spec.LatencyServiceConfig
tcAConfig := obz.Spec.TcAnalyserServiceConfig

klog.Infof("Custom Resource processed: %s/%s", namespace, name)
klog.Infof("TCP Server Configuration: %+v", latencyConfig)
klog.Infof("Tc Analyser Configuration: %+v", tcAConfig)

svcConfig := GrpcServiceConfig{
LatencyConfig: latencyConfig,
TcAConfig: tcAConfig,
}

err := callGrpcServices(conn, svcConfig)
if err != nil {
log.Printf("Error calling gRPC services: %v\n", err)
}

defer conn.Close()
}

func handleAdd(obj interface{}, conn *grpc.ClientConn) {
obz, ok := obj.(*v1.Obzev0Resource)
if !ok {
klog.Errorf("Error converting object to Obzev0Resource: %v", obj)
return
}

processCustomResource(obz, conn)
}

func handleUpdate(newObj interface{}, conn *grpc.ClientConn) {
obz, ok := newObj.(*v1.Obzev0Resource)
if !ok {
klog.Errorf("Error converting object to Obzev0Resource: %v", newObj)
return
}

processCustomResource(obz, conn)
}

func handleDelete(obj interface{}) {
obz, ok := obj.(*v1.Obzev0Resource)
if !ok {
klog.Errorf("Error converting object to Obzev0Resource: %v", obj)
return
}

name := obz.GetName()
namespace := obz.GetNamespace()

klog.Infof("Custom Resource deleted: %s/%s", namespace, name)
}
func listNodes(clientset *kubernetes.Clientset) {
nodes, err := clientset.CoreV1().
Nodes().
List(context.TODO(), metav1.ListOptions{})
if err != nil {
klog.Fatalf("Error listing nodes: %v", err)
}

klog.Info("Listing all nodes in the cluster:")
for _, node := range nodes.Items {
klog.Infof("Node Name: %s", node.Name)
}
}
104 changes: 1 addition & 103 deletions controller/internal/controller/obzev0resource_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,6 @@ import (
"fmt"
"log"
"os"
"time"

"obzev0/common/proto/latency"
pb "obzev0/common/proto/latency"
tca "obzev0/common/proto/tcAnalyser"

v1 "obzev0/controller/api/v1"

Expand All @@ -19,7 +14,6 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
"k8s.io/klog/v2"
ctrl "sigs.k8s.io/controller-runtime"
)

Expand Down Expand Up @@ -104,28 +98,13 @@ func SetupInformers(mgr ctrl.Manager) {
log.Printf("Failed to connect to %s: %v\n", address, err)
continue
}
client := pb.NewLatencyServiceClient(conn)
response, err := client.StartTcpServer(
context.Background(),
&pb.RequestForTcp{Config: &latency.TcpConfig{
ReqDelay: 1,
ResDelay: 1,
Server: "7090",
Client: "8080",
}},
)
if err != nil {
log.Printf("Error calling gRPC method: %v\n", err)
} else {
fmt.Printf("Response from gRPC server: %s\n", response.Message)
}

crInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
handleAdd(obj, conn)
},
UpdateFunc: func(oldObj, newObj interface{}) {
handleUpdate(oldObj, newObj)
handleUpdate(newObj, conn)
},
DeleteFunc: func(obj interface{}) {
handleDelete(obj)
Expand All @@ -139,84 +118,3 @@ func SetupInformers(mgr ctrl.Manager) {
}
}
}

func handleAdd(obj interface{}, conn *grpc.ClientConn) {
obz, ok := obj.(*v1.Obzev0Resource)
if !ok {
klog.Errorf("Error converting object to Obzev0Resource: %v", obj)
return
}

name := obz.GetName()
namespace := obz.GetNamespace()
latencyConfig := obz.Spec.LatencyServiceConfig
tcAConfig := obz.Spec.TcAnalyserServiceConfig

klog.Infof("Custom Resource added: %s/%s", namespace, name)
klog.Infof("TCP Server Configuration: %+v", latencyConfig)
client := pb.NewLatencyServiceClient(conn)
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
defer cancel()
response, err := client.StartTcpServer(
ctx,
&pb.RequestForTcp{Config: &latency.TcpConfig{
ReqDelay: latencyConfig.ReqDelay,
ResDelay: latencyConfig.ResDelay,
Server: latencyConfig.Server,
Client: latencyConfig.Client,
}},
)

if err != nil {
log.Printf("Error calling gRPC method: %v\n", err)
} else {
fmt.Printf("Response from gRPC server: %s\n", response.Message)
}
client2 := tca.NewTcAnalyserServiceClient(conn)
rsp, err := client2.StartUserSpace(
ctx,
&tca.RequestForUserSpace{Config: &tca.TcConfig{
Interface: tcAConfig.NetIFace,
}},
)

if err != nil {
log.Printf("Error calling gRPC method: %v\n", err)
} else {
fmt.Printf("Response from gRPC server: %s\n", rsp.Message)
}

defer conn.Close()
}

func handleUpdate(oldObj, newObj interface{}) {
key, err := cache.MetaNamespaceKeyFunc(newObj)
if err != nil {
klog.Errorf("Error getting key for object: %v", err)
return
}
klog.Infof("Custom Resource updated: %s", key)
}

func handleDelete(obj interface{}) {
key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
if err != nil {
klog.Errorf("Error getting key for object: %v", err)
return
}
klog.Infof("Custom Resource deleted: %s", key)
}

func listNodes(clientset *kubernetes.Clientset) {
nodes, err := clientset.CoreV1().
Nodes().
List(context.TODO(), metav1.ListOptions{})
if err != nil {
klog.Fatalf("Error listing nodes: %v", err)
}

klog.Info("Listing all nodes in the cluster:")
for _, node := range nodes.Items {
klog.Infof("Node Name: %s", node.Name)
}
}
Loading
Loading