Skip to content

Commit

Permalink
support reported-name annotation
Browse files Browse the repository at this point in the history
  • Loading branch information
edeNFed committed Jun 29, 2023
1 parent 5acb471 commit 97fe059
Show file tree
Hide file tree
Showing 4 changed files with 115 additions and 37 deletions.
12 changes: 8 additions & 4 deletions processor/odigosresourcenameprocessor/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,8 @@ func createTracesProcessor(
cfg,
nextConsumer,
proc.processTraces,
processorhelper.WithCapabilities(processorCapabilities))
processorhelper.WithCapabilities(processorCapabilities),
processorhelper.WithShutdown(proc.Shutdown))
}

func createMetricsProcessor(
Expand All @@ -88,7 +89,8 @@ func createMetricsProcessor(
cfg,
nextConsumer,
proc.processMetrics,
processorhelper.WithCapabilities(processorCapabilities))
processorhelper.WithCapabilities(processorCapabilities),
processorhelper.WithShutdown(proc.Shutdown))
}

func createLogsProcessor(
Expand All @@ -108,7 +110,8 @@ func createLogsProcessor(
cfg,
nextConsumer,
proc.processLogs,
processorhelper.WithCapabilities(processorCapabilities))
processorhelper.WithCapabilities(processorCapabilities),
processorhelper.WithShutdown(proc.Shutdown))
}

func initNameResolver(cfg component.Config, logger *zap.Logger) error {
Expand All @@ -133,11 +136,12 @@ func initNameResolver(cfg component.Config, logger *zap.Logger) error {
}

nameResolver = &NameResolver{
kc: kubeClient,
logger: logger,
devicesToPods: map[string]string{},
mu: sync.RWMutex{},
kubelet: kubelet,
shutdown: make(chan struct{}),
}

return nameResolver.Start()
}
58 changes: 32 additions & 26 deletions processor/odigosresourcenameprocessor/name_resolver.go
Original file line number Diff line number Diff line change
@@ -1,16 +1,11 @@
package odigosresourcenameprocessor

import (
"context"
"errors"
"fmt"
"go.uber.org/zap"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/kubernetes"
"os"
"sync"
"time"
)

const (
Expand All @@ -27,15 +22,28 @@ type NameResolver struct {
kubelet *kubeletClient
mu sync.RWMutex
devicesToPods map[string]string
shutdown chan struct{}
}

func (n *NameResolver) Resolve(deviceID string) (string, error) {
n.mu.RLock()
defer n.mu.RUnlock()

name, ok := n.devicesToPods[deviceID]
n.mu.RUnlock()

if !ok {
return "", ErrNoDeviceFound
err := n.updateDevicesToPods()
if err != nil {
n.logger.Error("Error updating devices to pods", zap.Error(err))
return "", err
}

n.mu.RLock()
name, ok = n.devicesToPods[deviceID]
n.mu.RUnlock()

if !ok {
return "", ErrNoDeviceFound
}
}

return name, nil
Expand All @@ -56,29 +64,27 @@ func (n *NameResolver) updateDevicesToPods() error {

func (n *NameResolver) Start() error {
n.logger.Info("Starting NameResolver ...")
nn, ok := os.LookupEnv(nodeNameEnvVar)
if !ok {
return fmt.Errorf("env var %s is not set", nodeNameEnvVar)
}

w, err := n.kc.CoreV1().Pods("").Watch(context.Background(), metav1.ListOptions{
FieldSelector: fmt.Sprintf("spec.nodeName=%s", nn),
})
if err != nil {
n.logger.Error("Error watching pods", zap.Error(err))
return err
}

go func() {
for event := range w.ResultChan() {
pod := event.Object.(*corev1.Pod)
if event.Type == watch.Modified && pod.Status.Phase == corev1.PodRunning {
if err := n.updateDevicesToPods(); err != nil {
// Refresh devices to pods every 5 seconds
ticker := time.NewTicker(5 * time.Second)
for {
select {
case <-ticker.C:
err := n.updateDevicesToPods()
if err != nil {
n.logger.Error("Error updating devices to pods", zap.Error(err))
}
case <-n.shutdown:
ticker.Stop()
return
}
}
}()

return nil
}

func (n *NameResolver) Shutdown() {
n.logger.Info("Shutting down NameResolver ...")
close(n.shutdown)
}
77 changes: 70 additions & 7 deletions processor/odigosresourcenameprocessor/naming.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,40 +28,103 @@ func (n *NameFromOwner) GetName(containerDetails *ContainerDetails) string {
return containerDetails.ContainerName
}

name, err := n.getNameByOwner(containerDetails)
name, kind, err := n.getNameByOwner(containerDetails)
if err != nil {
n.logger.Error("Failed to get name by owner, using pod name", zap.Error(err))
return containerDetails.PodName
}

overwrittenName, exists := n.getNameFromAnnotation(name, kind, containerDetails.PodNamespace)
if exists {
return overwrittenName
}

return name
}

func (n *NameFromOwner) getNameByOwner(containerDetails *ContainerDetails) (string, error) {
func (n *NameFromOwner) getNameFromAnnotation(name string, kind string, namespace string) (string, bool) {
obj := n.getKubeObject(name, kind, namespace)
if obj == nil {
return "", false
}

annotations := obj.GetAnnotations()
if annotations == nil {
return "", false
}

overwrittenName, exists := annotations["odigos.io/reported-name"]
if !exists {
return "", false
}

return overwrittenName, true
}

func (n *NameFromOwner) getKubeObject(name string, kind string, namespace string) metav1.Object {
switch kind {
case "Deployment":
deployment, err := n.kc.AppsV1().Deployments(namespace).Get(context.Background(), name, metav1.GetOptions{})
if err != nil {
n.logger.Error("Failed to get deployment", zap.Error(err))
return nil
}

return deployment
case "StatefulSet":
statefulSet, err := n.kc.AppsV1().StatefulSets(namespace).Get(context.Background(), name, metav1.GetOptions{})
if err != nil {
n.logger.Error("Failed to get statefulset", zap.Error(err))
return nil
}

return statefulSet
case "DaemonSet":
daemonSet, err := n.kc.AppsV1().DaemonSets(namespace).Get(context.Background(), name, metav1.GetOptions{})
if err != nil {
n.logger.Error("Failed to get daemonset", zap.Error(err))
return nil
}

return daemonSet
case "Pod":
pod, err := n.kc.CoreV1().Pods(namespace).Get(context.Background(), name, metav1.GetOptions{})
if err != nil {
n.logger.Error("Failed to get pod", zap.Error(err))
return nil
}

return pod
}

return nil
}

func (n *NameFromOwner) getNameByOwner(containerDetails *ContainerDetails) (string, string, error) {
pod, err := n.kc.CoreV1().Pods(containerDetails.PodNamespace).
Get(context.Background(), containerDetails.PodName, metav1.GetOptions{})
if err != nil {
return "", err
return "", "", err
}

ownerRefs := pod.GetOwnerReferences()
for _, ownerRef := range ownerRefs {
if ownerRef.Kind == "ReplicaSet" {
rs, err := n.kc.AppsV1().ReplicaSets(pod.Namespace).Get(context.Background(), ownerRef.Name, metav1.GetOptions{})
if err != nil {
return "", err
return "", "", err
}

ownerRefs = rs.GetOwnerReferences()
for _, ownerRef := range ownerRefs {
if ownerRef.Kind == "Deployment" {
return ownerRef.Name, nil
return ownerRef.Name, ownerRef.Kind, nil
}
}
} else if ownerRef.Kind == "StatefulSet" || ownerRef.Kind == "DaemonSet" {
return ownerRef.Name, nil
return ownerRef.Name, ownerRef.Kind, nil
}
}

return containerDetails.PodName, nil
return containerDetails.PodName, "Pod", nil
}
5 changes: 5 additions & 0 deletions processor/odigosresourcenameprocessor/resource_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,3 +78,8 @@ func (rp *resourceProcessor) processLogs(ctx context.Context, ld plog.Logs) (plo
}
return ld, nil
}

func (rp *resourceProcessor) Shutdown(ctx context.Context) error {
rp.nameResolver.Shutdown()
return nil
}

0 comments on commit 97fe059

Please sign in to comment.