From 8e5a25d2426090fc2da98a883b1385d04e529fc8 Mon Sep 17 00:00:00 2001 From: lumbrjx Date: Fri, 30 Aug 2024 23:38:21 +0100 Subject: [PATCH 1/3] Update :sparkles: update configs --- Makefile | 2 +- controller/config/manager/kustomization.yaml | 4 ++-- controller/deployController.sh | 2 +- daemonset.yaml | 2 +- 4 files changed, 5 insertions(+), 5 deletions(-) diff --git a/Makefile b/Makefile index 80f9529..4330eaa 100644 --- a/Makefile +++ b/Makefile @@ -42,7 +42,7 @@ build-daemon: build-daemon-stage: docker build -f daemon/api/grpc/Dockerfile -t lumbrjx/obzev0-grpc-daemon:staging . push-daemon-stage: - docker push lumbrjx/obzev0-grpc-daemon:staging . + docker push lumbrjx/obzev0-grpc-daemon:staging build-controller: diff --git a/controller/config/manager/kustomization.yaml b/controller/config/manager/kustomization.yaml index 5d52359..502ac1d 100644 --- a/controller/config/manager/kustomization.yaml +++ b/controller/config/manager/kustomization.yaml @@ -4,5 +4,5 @@ apiVersion: kustomize.config.k8s.io/v1beta1 kind: Kustomization images: - name: controller - newName: lumbrjx/obzev0poc - newTag: 1.0.5-alpha + newName: lumbrjx/obzev0-k8s-controller + newTag: latest diff --git a/controller/deployController.sh b/controller/deployController.sh index 74ea401..1a052a8 100755 --- a/controller/deployController.sh +++ b/controller/deployController.sh @@ -14,7 +14,7 @@ if ! make install; then fi echo "Deploying the controller image into the cluster..." -if ! make deploy IMG=lumbrjx/obzev0poc:${IMAGE_TAG}; then +if ! make deploy IMG=lumbrjx/obzev0-k8s-controller:${IMAGE_TAG}; then echo "Failed to deploy the controller image." exit 1 fi diff --git a/daemonset.yaml b/daemonset.yaml index b7c8ece..e861e73 100644 --- a/daemonset.yaml +++ b/daemonset.yaml @@ -13,7 +13,7 @@ spec: spec: containers: - name: grpc-server - image: lumbrjx/obzev0-grpc-daemon:1.0.8-pre + image: lumbrjx/obzev0-grpc-daemon:latest ports: - containerPort: 50051 livenessProbe: From 674b01ced81930f0531b806ba76864cffd43c2db Mon Sep 17 00:00:00 2001 From: lumbrjx Date: Sat, 31 Aug 2024 22:11:24 +0100 Subject: [PATCH 2/3] Struct :art: imporve the grpc server file structure --- daemon/api/grpc/Dockerfile | 43 ++------------ daemon/api/grpc/agent.go | 38 +++++++++++++ daemon/api/grpc/logger.go | 21 +++++++ daemon/api/grpc/metrics.go | 55 ++++++++++++++++++ daemon/api/grpc/server.go | 81 +------------------------- daemon/api/grpc/tcAnalyser/bpf/tc.c | 88 ----------------------------- 6 files changed, 120 insertions(+), 206 deletions(-) create mode 100644 daemon/api/grpc/agent.go create mode 100644 daemon/api/grpc/logger.go create mode 100644 daemon/api/grpc/metrics.go delete mode 100644 daemon/api/grpc/tcAnalyser/bpf/tc.c diff --git a/daemon/api/grpc/Dockerfile b/daemon/api/grpc/Dockerfile index 416dc58..c8a4a9c 100644 --- a/daemon/api/grpc/Dockerfile +++ b/daemon/api/grpc/Dockerfile @@ -1,33 +1,3 @@ -# FROM golang:1.22.5-alpine AS builder -# -# WORKDIR /app -# -# COPY daemon/go.mod daemon/go.sum ./ -# COPY common ./common -# COPY daemon/ . -# -# RUN sed -i 's|replace obzev0/common => ../common|replace obzev0/common => ./common|' go.mod -# RUN go mod download -# RUN CGO_ENABLED=0 GOOS=linux go build -o grpc-server ./api/grpc/server.go -# -# FROM alpine:latest AS grpc_health_probe_downloader -# RUN wget -qO /bin/grpc_health_probe https://github.com/grpc-ecosystem/grpc-health-probe/releases/download/v0.4.11/grpc_health_probe-linux-amd64 && \ -# chmod +x /bin/grpc_health_probe -# -# FROM alpine:latest -# -# WORKDIR /root/ -# -# COPY --from=builder /app/grpc-server . -# COPY --from=builder /app/common ./common -# COPY --from=grpc_health_probe_downloader /bin/grpc_health_probe /bin/grpc_health_probe -# -# EXPOSE 50051 -# EXPOSE 2112 -# -# CMD ["./grpc-server"] -# -# Stage 1: Build eBPF program and Go application FROM golang:1.23.0-bookworm AS builder # Install dependencies for building eBPF programs @@ -43,10 +13,8 @@ RUN apt-get update && \ && apt-get clean && \ rm -rf /var/lib/apt/lists/* -# Set working directory WORKDIR /app -# Copy Go modules files and download dependencies COPY daemon/go.mod daemon/go.sum ./ COPY common ./common COPY daemon/ . @@ -54,14 +22,14 @@ COPY daemon/ . # Modify go.mod and build Go application RUN sed -i 's|replace obzev0/common => ../common|replace obzev0/common => ./common|' go.mod RUN go mod download -RUN CGO_ENABLED=0 GOOS=linux go build -o grpc-server ./api/grpc/server.go +RUN CGO_ENABLED=0 GOOS=linux go build -o grpc-server ./api/grpc/*.go # Copy and compile eBPF program -COPY daemon/api/grpc/tcAnalyser/bpf/ /app/bpf/ +COPY bpf/ /app/bpf/ WORKDIR /app/bpf RUN clang -I/usr/include -I/usr/include/x86_64-linux-gnu -I/usr/include/x86_64-linux-gnu/bits -I/usr/include/x86_64-linux-gnu/sys -I/usr/include/bpf -O2 -g -target bpf -c tc.c -o tc.o -# Stage 2: Download grpc-health-probe +# Download grpc-health-probe FROM debian:latest AS grpc_health_probe_downloader RUN apt-get update && \ apt-get install -y wget && \ @@ -70,7 +38,7 @@ RUN apt-get update && \ apt-get clean && \ rm -rf /var/lib/apt/lists/* -# Stage 3: Create final image +# Create final image FROM debian:latest # Install iproute2 to handle network configuration @@ -79,7 +47,6 @@ RUN apt-get update && \ apt-get clean && \ rm -rf /var/lib/apt/lists/* -# Set working directory WORKDIR /root/ # Copy compiled Go application and eBPF program @@ -87,10 +54,8 @@ COPY --from=builder /app/grpc-server . COPY --from=builder /app/bpf/tc.o /root/ COPY --from=grpc_health_probe_downloader /bin/grpc_health_probe /bin/grpc_health_probe -# Expose ports EXPOSE 50051 EXPOSE 2112 -# Default command to run your application CMD ["./grpc-server"] diff --git a/daemon/api/grpc/agent.go b/daemon/api/grpc/agent.go new file mode 100644 index 0000000..556c48d --- /dev/null +++ b/daemon/api/grpc/agent.go @@ -0,0 +1,38 @@ +package main + +import ( + ltc "obzev0/common/proto/latency" + tcanl "obzev0/common/proto/tcAnalyser" + "obzev0/daemon/api/grpc/latency" + tcanalyser "obzev0/daemon/api/grpc/tcAnalyser" + + "github.com/sirupsen/logrus" + "google.golang.org/grpc" + "google.golang.org/grpc/health" + "google.golang.org/grpc/health/grpc_health_v1" +) + +func serviceAgent(grpcServer *grpc.Server, rpcLogger *logrus.Entry) { + + // Latency Service + s := latency.LatencyService{} + ltc.RegisterLatencyServiceServer(grpcServer, &s) + + // Traffic Controller Service + tc := tcanalyser.TcAnalyserService{} + tcanl.RegisterTcAnalyserServiceServer(grpcServer, &tc) + + // Health Checking Serivce + healthSrv := health.NewServer() + grpc_health_v1.RegisterHealthServer(grpcServer, healthSrv) + + healthSrv.SetServingStatus( + "grpc.health.v1.Health", + grpc_health_v1.HealthCheckResponse_SERVING, + ) + healthSrv.SetServingStatus( + "obzev0.common.proto.latency.LatencyService", + grpc_health_v1.HealthCheckResponse_SERVING, + ) + rpcLogger.Log(logrus.DebugLevel, "gRpc services have been established") +} diff --git a/daemon/api/grpc/logger.go b/daemon/api/grpc/logger.go new file mode 100644 index 0000000..4c00022 --- /dev/null +++ b/daemon/api/grpc/logger.go @@ -0,0 +1,21 @@ +package main + +import ( + "os" + + "github.com/sirupsen/logrus" +) + +func Logger() *logrus.Entry { + logger := logrus.New() + logger.SetFormatter(&logrus.JSONFormatter{ + PrettyPrint: true, + }) + logger.SetOutput(os.Stdout) + + rpcLogger = logger.WithFields(logrus.Fields{ + "service": "gRPC/server", + }) + return rpcLogger + +} diff --git a/daemon/api/grpc/metrics.go b/daemon/api/grpc/metrics.go new file mode 100644 index 0000000..022ebc9 --- /dev/null +++ b/daemon/api/grpc/metrics.go @@ -0,0 +1,55 @@ +package main + +import ( + "log" + "obzev0/daemon/api/grpc/latency" + "time" + + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" +) + +var ( + bytesHistogram = promauto.NewHistogramVec( + prometheus.HistogramOpts{ + Name: "grpc_server_bytes", + Help: "Bytes processed by the gRPC server", + Buckets: prometheus.DefBuckets, + }, + []string{"method"}, + ) + responseTimeHistogram = promauto.NewHistogramVec( + prometheus.HistogramOpts{ + Name: "grpc_server_response_time_seconds", + Help: "Response time of gRPC server methods", + Buckets: prometheus.DefBuckets, + }, + []string{"method"}, + ) +) + +func recordMetrics(method string, bytes int64, responseTime time.Duration) { + log.Printf( + "Recording metrics: method=%s, bytes=%d, responseTime=%s", + method, + bytes, + responseTime, + ) + bytesHistogram.WithLabelValues(method).Observe(float64(bytes)) + responseTimeHistogram.WithLabelValues(method).Observe(responseTime.Seconds()) +} + +func waitForMetrics() { + for { + data := <-latency.Mtrx + log.Printf("Received data: %+v", data) + + for _, bytes := range data.BytesNumber { + recordMetrics( + "LatencyService", + bytes, + time.Duration(data.ResponseTime), + ) + } + } +} diff --git a/daemon/api/grpc/server.go b/daemon/api/grpc/server.go index 8a38542..c4034b8 100644 --- a/daemon/api/grpc/server.go +++ b/daemon/api/grpc/server.go @@ -4,69 +4,14 @@ import ( "log" "net" "net/http" - ltc "obzev0/common/proto/latency" - tcanl "obzev0/common/proto/tcAnalyser" "obzev0/daemon/api/grpc/interceptors" - "obzev0/daemon/api/grpc/latency" - tcanalyser "obzev0/daemon/api/grpc/tcAnalyser" - "os" - "time" "github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/recovery" - "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/client_golang/prometheus/promauto" "github.com/prometheus/client_golang/prometheus/promhttp" "github.com/sirupsen/logrus" "google.golang.org/grpc" - "google.golang.org/grpc/health" - "google.golang.org/grpc/health/grpc_health_v1" ) -var ( - bytesHistogram = promauto.NewHistogramVec( - prometheus.HistogramOpts{ - Name: "grpc_server_bytes", - Help: "Bytes processed by the gRPC server", - Buckets: prometheus.DefBuckets, - }, - []string{"method"}, - ) - responseTimeHistogram = promauto.NewHistogramVec( - prometheus.HistogramOpts{ - Name: "grpc_server_response_time_seconds", - Help: "Response time of gRPC server methods", - Buckets: prometheus.DefBuckets, - }, - []string{"method"}, - ) -) - -func recordMetrics(method string, bytes int64, responseTime time.Duration) { - log.Printf( - "Recording metrics: method=%s, bytes=%d, responseTime=%s", - method, - bytes, - responseTime, - ) - bytesHistogram.WithLabelValues(method).Observe(float64(bytes)) - responseTimeHistogram.WithLabelValues(method).Observe(responseTime.Seconds()) -} - -func waitForMetrics() { - for { - data := <-latency.Mtrx - log.Printf("Received data: %+v", data) - - for _, bytes := range data.BytesNumber { - recordMetrics( - "LatencyService", - bytes, - time.Duration(data.ResponseTime), - ) - } - } -} - var ( rpcLogger *logrus.Entry ) @@ -77,15 +22,8 @@ func main() { log.Fatal("Failed to start on port 50051: ", err) } - logger := logrus.New() - logger.SetFormatter(&logrus.JSONFormatter{ - PrettyPrint: true, - }) - logger.SetOutput(os.Stdout) + rpcLogger := Logger() - rpcLogger = logger.WithFields(logrus.Fields{ - "service": "gRPC/server", - }) grpcServer := grpc.NewServer( grpc.UnaryInterceptor( recovery.UnaryServerInterceptor( @@ -95,23 +33,8 @@ func main() { ), ), ) - s := latency.LatencyService{} - ltc.RegisterLatencyServiceServer(grpcServer, &s) - tc := tcanalyser.TcAnalyserService{} - tcanl.RegisterTcAnalyserServiceServer(grpcServer, &tc) - - healthSrv := health.NewServer() - grpc_health_v1.RegisterHealthServer(grpcServer, healthSrv) - - healthSrv.SetServingStatus( - "grpc.health.v1.Health", - grpc_health_v1.HealthCheckResponse_SERVING, - ) - healthSrv.SetServingStatus( - "obzev0.common.proto.latency.LatencyService", - grpc_health_v1.HealthCheckResponse_SERVING, - ) + serviceAgent(grpcServer, rpcLogger) go waitForMetrics() diff --git a/daemon/api/grpc/tcAnalyser/bpf/tc.c b/daemon/api/grpc/tcAnalyser/bpf/tc.c deleted file mode 100644 index 71ca6be..0000000 --- a/daemon/api/grpc/tcAnalyser/bpf/tc.c +++ /dev/null @@ -1,88 +0,0 @@ -#include -#include -#include -#include -#include -#include -#include - -#include -#include - -struct { - __uint(type, BPF_MAP_TYPE_PERF_EVENT_ARRAY); - __uint(key_size, sizeof(int)); - __uint(value_size, sizeof(int)); - __uint(max_entries, 1024); -} events SEC(".maps"); - -struct event { - __u32 src_ip; - __u32 dst_ip; - __u16 src_port; - __u16 dst_port; - __u8 protocol; - __u8 direction; - __u8 tcp_flags; -}; - -// to avoid duplication :> -static __always_inline int process_packet(struct __sk_buff *skb, - unsigned char direction) { - void *data = (void *)(unsigned long)skb->data; - void *data_end = (void *)(unsigned long)skb->data_end; - - // eht header - struct ethhdr *eth = data; - if ((void *)(eth + 1) > data_end) - return TC_ACT_SHOT; - if (eth->h_proto != bpf_htons(ETH_P_IP)) - return TC_ACT_OK; - - // ip header - struct iphdr *ip = (struct iphdr *)(eth + 1); - if ((void *)(ip + 1) > data_end) - return TC_ACT_SHOT; - - // event creation - struct event e = {0}; - e.src_ip = ip->saddr; - e.dst_ip = ip->daddr; - e.protocol = ip->protocol; - e.direction = direction; - - if (ip->protocol == IPPROTO_TCP) { - struct tcphdr *tcp = (struct tcphdr *)(ip + 1); - if ((void *)(tcp + 1) > data_end) - return TC_ACT_SHOT; - - e.src_port = bpf_ntohs(tcp->source); - e.dst_port = bpf_ntohs(tcp->dest); - e.tcp_flags = tcp->fin | (tcp->syn << 1) | (tcp->rst << 2) | - (tcp->psh << 3) | (tcp->ack << 4) | (tcp->urg << 5) | - (tcp->ece << 6) | (tcp->cwr << 7); - } else if (ip->protocol == IPPROTO_UDP) { - struct udphdr *udp = (struct udphdr *)(ip + 1); - if ((void *)(udp + 1) > data_end) - return TC_ACT_SHOT; - - e.src_port = bpf_ntohs(udp->source); - e.dst_port = bpf_ntohs(udp->dest); - } else { - e.src_port = 0; - e.dst_port = 0; - e.tcp_flags = 0; - } - // outputing the data via a perf event array map - bpf_perf_event_output(skb, &events, BPF_F_CURRENT_CPU, &e, sizeof(e)); - - return TC_ACT_OK; -} - -SEC("tc") -int tc_ingress(struct __sk_buff *skb) { return process_packet(skb, 0); } - -SEC("tc") -int tc_egress(struct __sk_buff *skb) { return process_packet(skb, 1); } - -char _license[] SEC("license") = "GPL"; From 8cb32c355a96ffeac2b0482925dd88ff4acbae0e Mon Sep 17 00:00:00 2001 From: lumbrjx Date: Sat, 31 Aug 2024 23:55:44 +0100 Subject: [PATCH 3/3] Struct :art: restucture controller files --- bpf/Makefile | 58 ++++++++++ bpf/tc.c | 88 +++++++++++++++ controller/api/v1/zz_generated.deepcopy.go | 16 +++ controller/internal/controller/grpcClient.go | 60 ++++++++++ controller/internal/controller/infx.go | 81 ++++++++++++++ .../controller/obzev0resource_controller.go | 104 +----------------- daemon/go.mod | 10 +- daemon/go.sum | 3 +- 8 files changed, 311 insertions(+), 109 deletions(-) create mode 100644 bpf/Makefile create mode 100644 bpf/tc.c create mode 100644 controller/internal/controller/grpcClient.go create mode 100644 controller/internal/controller/infx.go diff --git a/bpf/Makefile b/bpf/Makefile new file mode 100644 index 0000000..47700f5 --- /dev/null +++ b/bpf/Makefile @@ -0,0 +1,58 @@ +TARGET = tc.o +INTERFACE = enp1s0 +US_DIR = user_space +CFLAGS = -I/usr/include -I/usr/include/x86_64-linux-gnu -I/usr/include/x86_64-linux-gnu/bits -I/usr/include/x86_64-linux-gnu/sys -I/usr/include/bpf + +.PHONY: install-deps +install-deps: + sudo apt update + sudo apt install -y clang llvm libelf-dev linux-headers-$$(uname -r) build-essential + cd $(US_DIR) && go mod tidy + +# Compile the eBPF program +$(TARGET): main.c + clang $(CFLAGS) -O2 -g -target bpf -c tc.c -o $(TARGET) + +# Load the eBPF program manually +.PHONY: load +load: $(TARGET) + sudo tc qdisc add dev $(INTERFACE) clsact + sudo tc filter add dev $(INTERFACE) ingress bpf da obj $(TARGET) sec tc + sudo tc filter add dev $(INTERFACE) egress bpf da obj $(TARGET) sec tc + +# View bpf_printk output +.PHONY: view-manual +view: + sudo cat /sys/kernel/debug/tracing/trace_pipe + +.PHONY: view-tcp-manual +view-tcp: + sudo cat /sys/kernel/debug/tracing/trace_pipe | grep TCP + +.PHONY: view-udp-manual +view-udp: + sudo cat /sys/kernel/debug/tracing/trace_pipe | grep UDP + + +# build user space program +.PHONY: build-US +build-US: + cd user_space && go build -o tc_US tc.go + +# start user space program +.PHONY: start-US +start-US: + sudo ./user_space/tc_US $(INTERFACE) + +# Remove the filters and qdisc when done manually +.PHONY: clean +clean: + sudo tc filter del dev $(INTERFACE) ingress + sudo tc filter del dev $(INTERFACE) egress + sudo tc qdisc del dev $(INTERFACE) clsact + rm -f $(TARGET) + rm -f user_space/tc_US + +# All +.PHONY: all +all: install-deps $(TARGET) build-US start-US diff --git a/bpf/tc.c b/bpf/tc.c new file mode 100644 index 0000000..71ca6be --- /dev/null +++ b/bpf/tc.c @@ -0,0 +1,88 @@ +#include +#include +#include +#include +#include +#include +#include + +#include +#include + +struct { + __uint(type, BPF_MAP_TYPE_PERF_EVENT_ARRAY); + __uint(key_size, sizeof(int)); + __uint(value_size, sizeof(int)); + __uint(max_entries, 1024); +} events SEC(".maps"); + +struct event { + __u32 src_ip; + __u32 dst_ip; + __u16 src_port; + __u16 dst_port; + __u8 protocol; + __u8 direction; + __u8 tcp_flags; +}; + +// to avoid duplication :> +static __always_inline int process_packet(struct __sk_buff *skb, + unsigned char direction) { + void *data = (void *)(unsigned long)skb->data; + void *data_end = (void *)(unsigned long)skb->data_end; + + // eht header + struct ethhdr *eth = data; + if ((void *)(eth + 1) > data_end) + return TC_ACT_SHOT; + if (eth->h_proto != bpf_htons(ETH_P_IP)) + return TC_ACT_OK; + + // ip header + struct iphdr *ip = (struct iphdr *)(eth + 1); + if ((void *)(ip + 1) > data_end) + return TC_ACT_SHOT; + + // event creation + struct event e = {0}; + e.src_ip = ip->saddr; + e.dst_ip = ip->daddr; + e.protocol = ip->protocol; + e.direction = direction; + + if (ip->protocol == IPPROTO_TCP) { + struct tcphdr *tcp = (struct tcphdr *)(ip + 1); + if ((void *)(tcp + 1) > data_end) + return TC_ACT_SHOT; + + e.src_port = bpf_ntohs(tcp->source); + e.dst_port = bpf_ntohs(tcp->dest); + e.tcp_flags = tcp->fin | (tcp->syn << 1) | (tcp->rst << 2) | + (tcp->psh << 3) | (tcp->ack << 4) | (tcp->urg << 5) | + (tcp->ece << 6) | (tcp->cwr << 7); + } else if (ip->protocol == IPPROTO_UDP) { + struct udphdr *udp = (struct udphdr *)(ip + 1); + if ((void *)(udp + 1) > data_end) + return TC_ACT_SHOT; + + e.src_port = bpf_ntohs(udp->source); + e.dst_port = bpf_ntohs(udp->dest); + } else { + e.src_port = 0; + e.dst_port = 0; + e.tcp_flags = 0; + } + // outputing the data via a perf event array map + bpf_perf_event_output(skb, &events, BPF_F_CURRENT_CPU, &e, sizeof(e)); + + return TC_ACT_OK; +} + +SEC("tc") +int tc_ingress(struct __sk_buff *skb) { return process_packet(skb, 0); } + +SEC("tc") +int tc_egress(struct __sk_buff *skb) { return process_packet(skb, 1); } + +char _license[] SEC("license") = "GPL"; diff --git a/controller/api/v1/zz_generated.deepcopy.go b/controller/api/v1/zz_generated.deepcopy.go index dcab3d3..2c81ab4 100644 --- a/controller/api/v1/zz_generated.deepcopy.go +++ b/controller/api/v1/zz_generated.deepcopy.go @@ -87,6 +87,7 @@ func (in *Obzev0ResourceList) DeepCopyObject() runtime.Object { func (in *Obzev0ResourceSpec) DeepCopyInto(out *Obzev0ResourceSpec) { *out = *in out.LatencyServiceConfig = in.LatencyServiceConfig + out.TcAnalyserServiceConfig = in.TcAnalyserServiceConfig } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new Obzev0ResourceSpec. @@ -114,6 +115,21 @@ func (in *Obzev0ResourceStatus) DeepCopy() *Obzev0ResourceStatus { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *TcAnalyserConfig) DeepCopyInto(out *TcAnalyserConfig) { + *out = *in +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new TcAnalyserConfig. +func (in *TcAnalyserConfig) DeepCopy() *TcAnalyserConfig { + if in == nil { + return nil + } + out := new(TcAnalyserConfig) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *TcpConfig) DeepCopyInto(out *TcpConfig) { *out = *in diff --git a/controller/internal/controller/grpcClient.go b/controller/internal/controller/grpcClient.go new file mode 100644 index 0000000..a763d9f --- /dev/null +++ b/controller/internal/controller/grpcClient.go @@ -0,0 +1,60 @@ +package controller + +import ( + "context" + "fmt" + "obzev0/common/proto/latency" + "time" + + pb "obzev0/common/proto/latency" + tca "obzev0/common/proto/tcAnalyser" + v "obzev0/controller/api/v1" + + "google.golang.org/grpc" +) + +type GrpcServiceConfig struct { + LatencyConfig v.TcpConfig + TcAConfig v.TcAnalyserConfig + + // Add more fields as needed +} + +func callGrpcServices( + conn *grpc.ClientConn, + config GrpcServiceConfig, +) error { + ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) + defer cancel() + + // Handle LatencyService gRPC call + client := pb.NewLatencyServiceClient(conn) + response, err := client.StartTcpServer( + ctx, + &pb.RequestForTcp{Config: &latency.TcpConfig{ + ReqDelay: config.LatencyConfig.ReqDelay, + ResDelay: config.LatencyConfig.ResDelay, + Server: config.LatencyConfig.Server, + Client: config.LatencyConfig.Client, + }}, + ) + if err != nil { + return fmt.Errorf("error calling StartTcpServer: %w", err) + } + fmt.Printf("Response from LatencyService gRPC server: %s\n", response.Message) + + // Handle TcAnalyserService gRPC call + client2 := tca.NewTcAnalyserServiceClient(conn) + rsp, err := client2.StartUserSpace( + ctx, + &tca.RequestForUserSpace{Config: &tca.TcConfig{ + Interface: config.TcAConfig.NetIFace, + }}, + ) + if err != nil { + return fmt.Errorf("error calling StartUserSpace: %w", err) + } + fmt.Printf("Response from TcAnalyserService gRPC server: %s\n", rsp.Message) + + return nil +} diff --git a/controller/internal/controller/infx.go b/controller/internal/controller/infx.go new file mode 100644 index 0000000..b3e61e6 --- /dev/null +++ b/controller/internal/controller/infx.go @@ -0,0 +1,81 @@ +package controller + +import ( + "context" + "log" + v1 "obzev0/controller/api/v1" + + "google.golang.org/grpc" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/kubernetes" + "k8s.io/klog/v2" +) + +func processCustomResource(obz *v1.Obzev0Resource, conn *grpc.ClientConn) { + name := obz.GetName() + namespace := obz.GetNamespace() + latencyConfig := obz.Spec.LatencyServiceConfig + tcAConfig := obz.Spec.TcAnalyserServiceConfig + + klog.Infof("Custom Resource processed: %s/%s", namespace, name) + klog.Infof("TCP Server Configuration: %+v", latencyConfig) + klog.Infof("Tc Analyser Configuration: %+v", tcAConfig) + + svcConfig := GrpcServiceConfig{ + LatencyConfig: latencyConfig, + TcAConfig: tcAConfig, + } + + err := callGrpcServices(conn, svcConfig) + if err != nil { + log.Printf("Error calling gRPC services: %v\n", err) + } + + defer conn.Close() +} + +func handleAdd(obj interface{}, conn *grpc.ClientConn) { + obz, ok := obj.(*v1.Obzev0Resource) + if !ok { + klog.Errorf("Error converting object to Obzev0Resource: %v", obj) + return + } + + processCustomResource(obz, conn) +} + +func handleUpdate(newObj interface{}, conn *grpc.ClientConn) { + obz, ok := newObj.(*v1.Obzev0Resource) + if !ok { + klog.Errorf("Error converting object to Obzev0Resource: %v", newObj) + return + } + + processCustomResource(obz, conn) +} + +func handleDelete(obj interface{}) { + obz, ok := obj.(*v1.Obzev0Resource) + if !ok { + klog.Errorf("Error converting object to Obzev0Resource: %v", obj) + return + } + + name := obz.GetName() + namespace := obz.GetNamespace() + + klog.Infof("Custom Resource deleted: %s/%s", namespace, name) +} +func listNodes(clientset *kubernetes.Clientset) { + nodes, err := clientset.CoreV1(). + Nodes(). + List(context.TODO(), metav1.ListOptions{}) + if err != nil { + klog.Fatalf("Error listing nodes: %v", err) + } + + klog.Info("Listing all nodes in the cluster:") + for _, node := range nodes.Items { + klog.Infof("Node Name: %s", node.Name) + } +} diff --git a/controller/internal/controller/obzev0resource_controller.go b/controller/internal/controller/obzev0resource_controller.go index 7cc209d..06ac149 100644 --- a/controller/internal/controller/obzev0resource_controller.go +++ b/controller/internal/controller/obzev0resource_controller.go @@ -5,11 +5,6 @@ import ( "fmt" "log" "os" - "time" - - "obzev0/common/proto/latency" - pb "obzev0/common/proto/latency" - tca "obzev0/common/proto/tcAnalyser" v1 "obzev0/controller/api/v1" @@ -19,7 +14,6 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/kubernetes" "k8s.io/client-go/tools/cache" - "k8s.io/klog/v2" ctrl "sigs.k8s.io/controller-runtime" ) @@ -104,28 +98,13 @@ func SetupInformers(mgr ctrl.Manager) { log.Printf("Failed to connect to %s: %v\n", address, err) continue } - client := pb.NewLatencyServiceClient(conn) - response, err := client.StartTcpServer( - context.Background(), - &pb.RequestForTcp{Config: &latency.TcpConfig{ - ReqDelay: 1, - ResDelay: 1, - Server: "7090", - Client: "8080", - }}, - ) - if err != nil { - log.Printf("Error calling gRPC method: %v\n", err) - } else { - fmt.Printf("Response from gRPC server: %s\n", response.Message) - } crInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { handleAdd(obj, conn) }, UpdateFunc: func(oldObj, newObj interface{}) { - handleUpdate(oldObj, newObj) + handleUpdate(newObj, conn) }, DeleteFunc: func(obj interface{}) { handleDelete(obj) @@ -139,84 +118,3 @@ func SetupInformers(mgr ctrl.Manager) { } } } - -func handleAdd(obj interface{}, conn *grpc.ClientConn) { - obz, ok := obj.(*v1.Obzev0Resource) - if !ok { - klog.Errorf("Error converting object to Obzev0Resource: %v", obj) - return - } - - name := obz.GetName() - namespace := obz.GetNamespace() - latencyConfig := obz.Spec.LatencyServiceConfig - tcAConfig := obz.Spec.TcAnalyserServiceConfig - - klog.Infof("Custom Resource added: %s/%s", namespace, name) - klog.Infof("TCP Server Configuration: %+v", latencyConfig) - client := pb.NewLatencyServiceClient(conn) - ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) - defer cancel() - response, err := client.StartTcpServer( - ctx, - &pb.RequestForTcp{Config: &latency.TcpConfig{ - ReqDelay: latencyConfig.ReqDelay, - ResDelay: latencyConfig.ResDelay, - Server: latencyConfig.Server, - Client: latencyConfig.Client, - }}, - ) - - if err != nil { - log.Printf("Error calling gRPC method: %v\n", err) - } else { - fmt.Printf("Response from gRPC server: %s\n", response.Message) - } - client2 := tca.NewTcAnalyserServiceClient(conn) - rsp, err := client2.StartUserSpace( - ctx, - &tca.RequestForUserSpace{Config: &tca.TcConfig{ - Interface: tcAConfig.NetIFace, - }}, - ) - - if err != nil { - log.Printf("Error calling gRPC method: %v\n", err) - } else { - fmt.Printf("Response from gRPC server: %s\n", rsp.Message) - } - - defer conn.Close() -} - -func handleUpdate(oldObj, newObj interface{}) { - key, err := cache.MetaNamespaceKeyFunc(newObj) - if err != nil { - klog.Errorf("Error getting key for object: %v", err) - return - } - klog.Infof("Custom Resource updated: %s", key) -} - -func handleDelete(obj interface{}) { - key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj) - if err != nil { - klog.Errorf("Error getting key for object: %v", err) - return - } - klog.Infof("Custom Resource deleted: %s", key) -} - -func listNodes(clientset *kubernetes.Clientset) { - nodes, err := clientset.CoreV1(). - Nodes(). - List(context.TODO(), metav1.ListOptions{}) - if err != nil { - klog.Fatalf("Error listing nodes: %v", err) - } - - klog.Info("Listing all nodes in the cluster:") - for _, node := range nodes.Items { - klog.Infof("Node Name: %s", node.Name) - } -} diff --git a/daemon/go.mod b/daemon/go.mod index 515548b..b904d00 100644 --- a/daemon/go.mod +++ b/daemon/go.mod @@ -13,20 +13,20 @@ require ( replace obzev0/common => ../common -require github.com/sirupsen/logrus v1.9.3 +require ( + github.com/cilium/ebpf v0.16.0 + github.com/sirupsen/logrus v1.9.3 + github.com/vishvananda/netlink v1.3.0 +) require ( github.com/beorn7/perks v1.0.1 // indirect github.com/cespare/xxhash/v2 v2.3.0 // indirect - github.com/cilium/ebpf v0.16.0 // indirect github.com/envoyproxy/protoc-gen-validate v1.1.0 // indirect - github.com/kr/text v0.2.0 // indirect github.com/prometheus/client_model v0.5.0 // indirect github.com/prometheus/common v0.48.0 // indirect github.com/prometheus/procfs v0.12.0 // indirect - github.com/rogpeppe/go-internal v1.11.0 // indirect github.com/stretchr/testify v1.9.0 // indirect - github.com/vishvananda/netlink v1.3.0 // indirect github.com/vishvananda/netns v0.0.4 // indirect golang.org/x/exp v0.0.0-20230522175609-2e198f4a06a1 // indirect golang.org/x/sys v0.22.0 // indirect diff --git a/daemon/go.sum b/daemon/go.sum index cfcd099..82045d2 100644 --- a/daemon/go.sum +++ b/daemon/go.sum @@ -4,12 +4,13 @@ github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UF github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/cilium/ebpf v0.16.0 h1:+BiEnHL6Z7lXnlGUsXQPPAE7+kenAd4ES8MQ5min0Ok= github.com/cilium/ebpf v0.16.0/go.mod h1:L7u2Blt2jMM/vLAVgjxluxtBKlz3/GWjB0dMOEngfwE= -github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/envoyproxy/protoc-gen-validate v1.1.0 h1:tntQDh69XqOCOZsDz0lVJQez/2L6Uu2PdjCQwWCJ3bM= github.com/envoyproxy/protoc-gen-validate v1.1.0/go.mod h1:sXRDRVmzEbkM7CVcM06s9shE/m23dg3wzjl0UWqJ2q4= +github.com/go-quicktest/qt v1.101.0 h1:O1K29Txy5P2OK0dGo59b7b0LR6wKfIhttaAhHUyn7eI= +github.com/go-quicktest/qt v1.101.0/go.mod h1:14Bz/f7NwaXPtdYEgzsx46kqSxVwTbzVZsDC26tQJow= github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/grpc-ecosystem/go-grpc-middleware/v2 v2.1.0 h1:pRhl55Yx1eC7BZ1N+BBWwnKaMyD8uC+34TLdndZMAKk=