diff --git a/processor/odigosresourcenameprocessor/factory.go b/processor/odigosresourcenameprocessor/factory.go index 324be5b2fc..860b8762e9 100644 --- a/processor/odigosresourcenameprocessor/factory.go +++ b/processor/odigosresourcenameprocessor/factory.go @@ -68,7 +68,8 @@ func createTracesProcessor( cfg, nextConsumer, proc.processTraces, - processorhelper.WithCapabilities(processorCapabilities)) + processorhelper.WithCapabilities(processorCapabilities), + processorhelper.WithShutdown(proc.Shutdown)) } func createMetricsProcessor( @@ -88,7 +89,8 @@ func createMetricsProcessor( cfg, nextConsumer, proc.processMetrics, - processorhelper.WithCapabilities(processorCapabilities)) + processorhelper.WithCapabilities(processorCapabilities), + processorhelper.WithShutdown(proc.Shutdown)) } func createLogsProcessor( @@ -108,7 +110,8 @@ func createLogsProcessor( cfg, nextConsumer, proc.processLogs, - processorhelper.WithCapabilities(processorCapabilities)) + processorhelper.WithCapabilities(processorCapabilities), + processorhelper.WithShutdown(proc.Shutdown)) } func initNameResolver(cfg component.Config, logger *zap.Logger) error { @@ -133,11 +136,12 @@ func initNameResolver(cfg component.Config, logger *zap.Logger) error { } nameResolver = &NameResolver{ - kc: kubeClient, logger: logger, devicesToPods: map[string]string{}, mu: sync.RWMutex{}, kubelet: kubelet, + shutdown: make(chan struct{}), } + return nameResolver.Start() } diff --git a/processor/odigosresourcenameprocessor/name_resolver.go b/processor/odigosresourcenameprocessor/name_resolver.go index f192893c8b..8909904e4c 100644 --- a/processor/odigosresourcenameprocessor/name_resolver.go +++ b/processor/odigosresourcenameprocessor/name_resolver.go @@ -1,16 +1,11 @@ package odigosresourcenameprocessor import ( - "context" "errors" - "fmt" "go.uber.org/zap" - corev1 "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/watch" "k8s.io/client-go/kubernetes" - "os" "sync" + "time" ) const ( @@ -27,15 +22,28 @@ type NameResolver struct { kubelet *kubeletClient mu sync.RWMutex devicesToPods map[string]string + shutdown chan struct{} } func (n *NameResolver) Resolve(deviceID string) (string, error) { n.mu.RLock() - defer n.mu.RUnlock() - name, ok := n.devicesToPods[deviceID] + n.mu.RUnlock() + if !ok { - return "", ErrNoDeviceFound + err := n.updateDevicesToPods() + if err != nil { + n.logger.Error("Error updating devices to pods", zap.Error(err)) + return "", err + } + + n.mu.RLock() + name, ok = n.devicesToPods[deviceID] + n.mu.RUnlock() + + if !ok { + return "", ErrNoDeviceFound + } } return name, nil @@ -56,29 +64,27 @@ func (n *NameResolver) updateDevicesToPods() error { func (n *NameResolver) Start() error { n.logger.Info("Starting NameResolver ...") - nn, ok := os.LookupEnv(nodeNameEnvVar) - if !ok { - return fmt.Errorf("env var %s is not set", nodeNameEnvVar) - } - - w, err := n.kc.CoreV1().Pods("").Watch(context.Background(), metav1.ListOptions{ - FieldSelector: fmt.Sprintf("spec.nodeName=%s", nn), - }) - if err != nil { - n.logger.Error("Error watching pods", zap.Error(err)) - return err - } - go func() { - for event := range w.ResultChan() { - pod := event.Object.(*corev1.Pod) - if event.Type == watch.Modified && pod.Status.Phase == corev1.PodRunning { - if err := n.updateDevicesToPods(); err != nil { + // Refresh devices to pods every 5 seconds + ticker := time.NewTicker(5 * time.Second) + for { + select { + case <-ticker.C: + err := n.updateDevicesToPods() + if err != nil { n.logger.Error("Error updating devices to pods", zap.Error(err)) } + case <-n.shutdown: + ticker.Stop() + return } } }() return nil } + +func (n *NameResolver) Shutdown() { + n.logger.Info("Shutting down NameResolver ...") + close(n.shutdown) +} diff --git a/processor/odigosresourcenameprocessor/naming.go b/processor/odigosresourcenameprocessor/naming.go index ad91d1ea0b..7c2153c6a4 100644 --- a/processor/odigosresourcenameprocessor/naming.go +++ b/processor/odigosresourcenameprocessor/naming.go @@ -28,20 +28,83 @@ func (n *NameFromOwner) GetName(containerDetails *ContainerDetails) string { return containerDetails.ContainerName } - name, err := n.getNameByOwner(containerDetails) + name, kind, err := n.getNameByOwner(containerDetails) if err != nil { n.logger.Error("Failed to get name by owner, using pod name", zap.Error(err)) return containerDetails.PodName } + overwrittenName, exists := n.getNameFromAnnotation(name, kind, containerDetails.PodNamespace) + if exists { + return overwrittenName + } + return name } -func (n *NameFromOwner) getNameByOwner(containerDetails *ContainerDetails) (string, error) { +func (n *NameFromOwner) getNameFromAnnotation(name string, kind string, namespace string) (string, bool) { + obj := n.getKubeObject(name, kind, namespace) + if obj == nil { + return "", false + } + + annotations := obj.GetAnnotations() + if annotations == nil { + return "", false + } + + overwrittenName, exists := annotations["odigos.io/reported-name"] + if !exists { + return "", false + } + + return overwrittenName, true +} + +func (n *NameFromOwner) getKubeObject(name string, kind string, namespace string) metav1.Object { + switch kind { + case "Deployment": + deployment, err := n.kc.AppsV1().Deployments(namespace).Get(context.Background(), name, metav1.GetOptions{}) + if err != nil { + n.logger.Error("Failed to get deployment", zap.Error(err)) + return nil + } + + return deployment + case "StatefulSet": + statefulSet, err := n.kc.AppsV1().StatefulSets(namespace).Get(context.Background(), name, metav1.GetOptions{}) + if err != nil { + n.logger.Error("Failed to get statefulset", zap.Error(err)) + return nil + } + + return statefulSet + case "DaemonSet": + daemonSet, err := n.kc.AppsV1().DaemonSets(namespace).Get(context.Background(), name, metav1.GetOptions{}) + if err != nil { + n.logger.Error("Failed to get daemonset", zap.Error(err)) + return nil + } + + return daemonSet + case "Pod": + pod, err := n.kc.CoreV1().Pods(namespace).Get(context.Background(), name, metav1.GetOptions{}) + if err != nil { + n.logger.Error("Failed to get pod", zap.Error(err)) + return nil + } + + return pod + } + + return nil +} + +func (n *NameFromOwner) getNameByOwner(containerDetails *ContainerDetails) (string, string, error) { pod, err := n.kc.CoreV1().Pods(containerDetails.PodNamespace). Get(context.Background(), containerDetails.PodName, metav1.GetOptions{}) if err != nil { - return "", err + return "", "", err } ownerRefs := pod.GetOwnerReferences() @@ -49,19 +112,19 @@ func (n *NameFromOwner) getNameByOwner(containerDetails *ContainerDetails) (stri if ownerRef.Kind == "ReplicaSet" { rs, err := n.kc.AppsV1().ReplicaSets(pod.Namespace).Get(context.Background(), ownerRef.Name, metav1.GetOptions{}) if err != nil { - return "", err + return "", "", err } ownerRefs = rs.GetOwnerReferences() for _, ownerRef := range ownerRefs { if ownerRef.Kind == "Deployment" { - return ownerRef.Name, nil + return ownerRef.Name, ownerRef.Kind, nil } } } else if ownerRef.Kind == "StatefulSet" || ownerRef.Kind == "DaemonSet" { - return ownerRef.Name, nil + return ownerRef.Name, ownerRef.Kind, nil } } - return containerDetails.PodName, nil + return containerDetails.PodName, "Pod", nil } diff --git a/processor/odigosresourcenameprocessor/resource_processor.go b/processor/odigosresourcenameprocessor/resource_processor.go index 74f647f128..214a67150c 100644 --- a/processor/odigosresourcenameprocessor/resource_processor.go +++ b/processor/odigosresourcenameprocessor/resource_processor.go @@ -78,3 +78,8 @@ func (rp *resourceProcessor) processLogs(ctx context.Context, ld plog.Logs) (plo } return ld, nil } + +func (rp *resourceProcessor) Shutdown(ctx context.Context) error { + rp.nameResolver.Shutdown() + return nil +}