Skip to content

Commit

Permalink
name resolver improvements
Browse files Browse the repository at this point in the history
  • Loading branch information
edeNFed committed Feb 27, 2023
1 parent 0e191fc commit 5bed9e9
Show file tree
Hide file tree
Showing 4 changed files with 84 additions and 21 deletions.
7 changes: 6 additions & 1 deletion processor/odigosresourcenameprocessor/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,12 @@ func initNameResolver(cfg component.Config, logger *zap.Logger) error {
return err
}

kubelet, err := NewKubeletClient()
ns := &NameFromOwner{
kc: kubeClient,
logger: logger,
}

kubelet, err := NewKubeletClient(ns)
if err != nil {
return err
}
Expand Down
30 changes: 11 additions & 19 deletions processor/odigosresourcenameprocessor/kubelet.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
podresourcesapi "k8s.io/kubelet/pkg/apis/podresources/v1alpha1"
"regexp"
"strings"
"time"
)
Expand All @@ -16,21 +15,22 @@ var (
socketPath = "unix://" + socketDir + "/kubelet.sock"

connectionTimeout = 10 * time.Second
ownerRegex = regexp.MustCompile(`(?P<deployment_name>[a-z0-9]+(?:-[a-z0-9]+)*?)-[a-f0-9]{10}-[a-z0-9]+`)
)

type kubeletClient struct {
conn *grpc.ClientConn
conn *grpc.ClientConn
nameStrategy NameStrategy
}

func NewKubeletClient() (*kubeletClient, error) {
func NewKubeletClient(ns NameStrategy) (*kubeletClient, error) {
conn, err := connectToKubelet(socketPath)
if err != nil {
return nil, err
}

return &kubeletClient{
conn: conn,
conn: conn,
nameStrategy: ns,
}, nil
}

Expand All @@ -52,7 +52,12 @@ func (c *kubeletClient) GetAllocations() (map[string]string, error) {
for _, device := range container.Devices {
for _, id := range device.DeviceIds {
if strings.Contains(device.GetResourceName(), "odigos.io") {
allocations[id] = calculateResourceName(pod.Name, len(pod.Containers), container.Name)
allocations[id] = c.nameStrategy.GetName(&ContainerDetails{
PodName: pod.Name,
PodNamespace: pod.Namespace,
ContainerName: container.Name,
ContainersInPod: len(pod.Containers),
})
}
}
}
Expand All @@ -62,19 +67,6 @@ func (c *kubeletClient) GetAllocations() (map[string]string, error) {
return allocations, nil
}

func calculateResourceName(podName string, containers int, containerName string) string {
if containers > 1 {
return containerName
}

match := ownerRegex.FindStringSubmatch(podName)
if len(match) > 1 {
return match[1]
}

return podName
}

func connectToKubelet(socket string) (*grpc.ClientConn, error) {
ctx, cancel := context.WithTimeout(context.Background(), connectionTimeout)
defer cancel()
Expand Down
67 changes: 67 additions & 0 deletions processor/odigosresourcenameprocessor/naming.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package odigosresourcenameprocessor

import (
"context"
"go.uber.org/zap"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
)

type ContainerDetails struct {
PodName string
PodNamespace string
ContainersInPod int
ContainerName string
}

type NameStrategy interface {
GetName(containerDetails *ContainerDetails) string
}

type NameFromOwner struct {
kc kubernetes.Interface
logger *zap.Logger
}

func (n *NameFromOwner) GetName(containerDetails *ContainerDetails) string {
if containerDetails.ContainersInPod > 1 {
return containerDetails.ContainerName
}

name, err := n.getNameByOwner(containerDetails)
if err != nil {
n.logger.Error("Failed to get name by owner, using pod name", zap.Error(err))
return containerDetails.PodName
}

return name
}

func (n *NameFromOwner) getNameByOwner(containerDetails *ContainerDetails) (string, error) {
pod, err := n.kc.CoreV1().Pods(containerDetails.PodNamespace).
Get(context.Background(), containerDetails.PodName, metav1.GetOptions{})
if err != nil {
return "", err
}

ownerRefs := pod.GetOwnerReferences()
for _, ownerRef := range ownerRefs {
if ownerRef.Kind == "ReplicaSet" {
rs, err := n.kc.AppsV1().ReplicaSets(pod.Namespace).Get(context.Background(), ownerRef.Name, metav1.GetOptions{})
if err != nil {
return "", err
}

ownerRefs = rs.GetOwnerReferences()
for _, ownerRef := range ownerRefs {
if ownerRef.Kind == "Deployment" {
return ownerRef.Name, nil
}
}
} else if ownerRef.Kind == "StatefulSet" || ownerRef.Kind == "DaemonSet" {
return ownerRef.Name, nil
}
}

return containerDetails.PodName, nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@ func (rp *resourceProcessor) processAttributes(ctx context.Context, logger *zap.
}

// Replace resource name
logger.Info("Replacing resource name", zap.String("name", string(name)))
resourceName.SetStr(string(name))
return
}
Expand Down

0 comments on commit 5bed9e9

Please sign in to comment.