Skip to content

Commit

Permalink
Support standard annotations for OTEL generic resource attributes (#1569)

Browse files Browse the repository at this point in the history

* support other resource metadata attributes

* integration tests

* fix integration tests beyla config

* fix configuration of tests
mariomac authored Jan 24, 2025
1 parent 1d45a10 commit cda3539
Showing 34 changed files with 209 additions and 70 deletions.
2 changes: 1 addition & 1 deletion pkg/internal/discover/watcher_kube.go
Original file line number Diff line number Diff line change
@@ -175,7 +175,7 @@ func (wk *watcherKubeEnricher) onNewProcess(procInfo processAttrs) (processAttrs
wk.processByContainer[containerInfo.ContainerID] = procInfo

if pod := wk.store.PodByContainerID(containerInfo.ContainerID); pod != nil {
procInfo = withMetadata(procInfo, pod)
procInfo = withMetadata(procInfo, pod.Meta)
}
return procInfo, true
}
103 changes: 83 additions & 20 deletions pkg/internal/kube/store.go
Original file line number Diff line number Diff line change
@@ -6,6 +6,7 @@ import (
"sync"

"github.com/grafana/beyla/pkg/export/attributes"
attr "github.com/grafana/beyla/pkg/export/attributes/names"
"github.com/grafana/beyla/pkg/internal/helpers/container"
"github.com/grafana/beyla/pkg/internal/helpers/maps"
"github.com/grafana/beyla/pkg/kubecache/informer"
@@ -47,8 +48,12 @@ type ResourceLabels map[string][]string

const (
ResourceAttributesPrefix = "resource.opentelemetry.io/"
ServiceNameAnnotation = ResourceAttributesPrefix + "service.name"
ServiceNamespaceAnnotation = ResourceAttributesPrefix + "service.namespace"
ServiceNameAnnotation = ResourceAttributesPrefix + serviceNameKey
ServiceNamespaceAnnotation = ResourceAttributesPrefix + serviceNamespaceKey

EnvResourceAttributes = "OTEL_RESOURCE_ATTRIBUTES"
EnvServiceName = "OTEL_SERVICE_NAME"
EnvServiceNamespace = "OTEL_SERVICE_NAMESPACE"
)

var DefaultResourceLabels = ResourceLabels{
@@ -79,15 +84,16 @@ type Store struct {
namespaces map[uint32]*container.Info

// container ID to pod matcher
podsByContainer map[string]*informer.ObjectMeta
podsByContainer map[string]*CachedObjMeta
// first key: pod owner ID, second key: container ID
containersByOwner maps.Map2[string, string, *informer.ContainerInfo]

// ip to generic IP info (Node, Service, *including* Pods)
objectMetaByIP map[string]*informer.ObjectMeta
objectMetaByIP map[string]*CachedObjMeta
// used to track the changed/removed IPs of a given object
// and remove them from objectMetaByIP on update or deletion
objectMetaByQName map[qualifiedName]*informer.ObjectMeta
objectMetaByQName map[qualifiedName]*CachedObjMeta
// todo: can be probably removed as objectMetaByIP already caches the service name/namespace
otelServiceInfoByIP map[string]OTelServiceNamePair

// Instead of subscribing to the informer directly, the rest of components
@@ -98,16 +104,23 @@ type Store struct {
resourceLabels ResourceLabels
}

// CachedObjMeta bundles the raw informer object metadata together with
// pre-resolved OTEL service identification data, so it does not need to be
// recomputed on each lookup.
type CachedObjMeta struct {
	// Meta is the original Kubernetes object metadata as received from the informer.
	Meta *informer.ObjectMeta
	// ServiceName and ServiceNamespace are precomputed via
	// serviceNameNamespaceForMetadata when the object is cached.
	ServiceName      string
	ServiceNamespace string
	// OTELResourceMeta holds the OTEL resource attributes resolved from the
	// object's labels, resource.opentelemetry.io/* annotations, and the
	// containers' OTEL_* environment variables (see cacheResourceMetadata).
	OTELResourceMeta map[attr.Name]string
}

func NewStore(kubeMetadata meta.Notifier, resourceLabels ResourceLabels) *Store {
log := dblog()
db := &Store{
log: log,
containerIDs: map[string]*container.Info{},
namespaces: map[uint32]*container.Info{},
podsByContainer: map[string]*informer.ObjectMeta{},
podsByContainer: map[string]*CachedObjMeta{},
containerByPID: map[uint32]*container.Info{},
objectMetaByIP: map[string]*informer.ObjectMeta{},
objectMetaByQName: map[qualifiedName]*informer.ObjectMeta{},
objectMetaByIP: map[string]*CachedObjMeta{},
objectMetaByQName: map[qualifiedName]*CachedObjMeta{},
containersByOwner: maps.Map2[string, string, *informer.ContainerInfo]{},
otelServiceInfoByIP: map[string]OTelServiceNamePair{},
metadataNotifier: kubeMetadata,
@@ -120,6 +133,53 @@ func NewStore(kubeMetadata meta.Notifier, resourceLabels ResourceLabels) *Store

func (s *Store) ID() string { return "unique-metadata-observer" }

// cacheResourceMetadata builds a CachedObjMeta for the given Kubernetes object,
// pre-resolving its OTEL resource attributes from the standard sources.
// Each source below overrides the previous one, so the effective order of
// preference is:
// 1. Resource attributes set via OTEL_RESOURCE_ATTRIBUTES and OTEL_SERVICE_NAME* environment variables
// 2. Resource attributes set via annotations (with the resource.opentelemetry.io/ prefix)
// 3. Resource attributes set via labels (e.g. app.kubernetes.io/name)
func (s *Store) cacheResourceMetadata(meta *informer.ObjectMeta) *CachedObjMeta {
	com := CachedObjMeta{
		Meta:             meta,
		OTELResourceMeta: map[attr.Name]string{},
	}
	// lowest precedence: metadata from the configured resource labels.
	// A nil/empty Labels map is safe to index and just yields "".
	for propertyName, labels := range s.resourceLabels {
		for _, label := range labels {
			if val := meta.Labels[label]; val != "" {
				com.OTELResourceMeta[attr.Name(propertyName)] = val
				break
			}
		}
	}
	// override with metadata from resource.opentelemetry.io/* annotations
	for annotationName, annotationValue := range meta.Annotations {
		if propertyName, ok := strings.CutPrefix(annotationName, ResourceAttributesPrefix); ok {
			com.OTELResourceMeta[attr.Name(propertyName)] = annotationValue
		}
	}
	// highest precedence: OTEL_RESOURCE_ATTRIBUTES, OTEL_SERVICE_NAME and
	// OTEL_SERVICE_NAMESPACE environment variables of any pod container
	for _, cnt := range meta.GetPod().GetContainers() {
		if len(cnt.Env) == 0 {
			continue
		}
		attributes.ParseOTELResourceVariable(cnt.Env[EnvResourceAttributes], func(k, v string) {
			com.OTELResourceMeta[attr.Name(k)] = v
		})
		if val := cnt.Env[EnvServiceName]; val != "" {
			com.OTELResourceMeta[serviceNameKey] = val
		}
		if val := cnt.Env[EnvServiceNamespace]; val != "" {
			com.OTELResourceMeta[serviceNamespaceKey] = val
		}
	}
	com.ServiceName, com.ServiceNamespace = s.serviceNameNamespaceForMetadata(meta)
	return &com
}

// On is invoked by the informer when a new Kube object is created, updated or deleted.
// It will forward the notification to all the Store subscribers
func (s *Store) On(event *informer.Event) error {
@@ -182,19 +242,20 @@ func (s *Store) updateObjectMeta(meta *informer.ObjectMeta) {
// this will avoid to leak some IPs and containers that exist in the
// stored snapshot but not in the updated snapshot
if previousObject, ok := s.objectMetaByQName[qName(meta)]; ok {
s.unlockedDeleteObjectMeta(previousObject)
s.unlockedDeleteObjectMeta(previousObject.Meta)
}
s.unlockedAddObjectMeta(meta)
}

// it's important to make sure that any element added here is removed when
// calling unlockedDeleteObjectMeta with the same ObjectMeta
func (s *Store) unlockedAddObjectMeta(meta *informer.ObjectMeta) {
cmeta := s.cacheResourceMetadata(meta)
qn := qName(meta)
s.objectMetaByQName[qn] = meta
s.objectMetaByQName[qn] = cmeta

for _, ip := range meta.Ips {
s.objectMetaByIP[ip] = meta
s.objectMetaByIP[ip] = cmeta
}

s.otelServiceInfoByIP = map[string]OTelServiceNamePair{}
@@ -204,7 +265,7 @@ func (s *Store) unlockedAddObjectMeta(meta *informer.ObjectMeta) {
s.log.Debug("adding pod to store",
"ips", meta.Ips, "pod", meta.Name, "namespace", meta.Namespace, "containers", meta.Pod.Containers)
for _, c := range meta.Pod.Containers {
s.podsByContainer[c.Id] = meta
s.podsByContainer[c.Id] = cmeta
// TODO: make sure we can handle when the containerIDs is set after this function is triggered
info, ok := s.containerIDs[c.Id]
if ok {
@@ -227,7 +288,7 @@ func (s *Store) deleteObjectMeta(meta *informer.ObjectMeta) {
// containers could have been removed in the last snapshot

if previousObject, ok := s.objectMetaByQName[qName(meta)]; ok {
s.unlockedDeleteObjectMeta(previousObject)
s.unlockedDeleteObjectMeta(previousObject.Meta)
}
s.unlockedDeleteObjectMeta(meta)
}
@@ -262,19 +323,19 @@ func fetchOwnerID(meta *informer.ObjectMeta) string {
return oID
}

func (s *Store) PodByContainerID(cid string) *informer.ObjectMeta {
// PodByContainerID returns the cached metadata of the Pod that owns the
// container with the given container ID, or nil if the container is not
// known to the store. Safe for concurrent use (read lock).
func (s *Store) PodByContainerID(cid string) *CachedObjMeta {
	s.access.RLock()
	defer s.access.RUnlock()
	return s.podsByContainer[cid]
}

// PodContainerByPIDNs second return value: container Name
func (s *Store) PodContainerByPIDNs(pidns uint32) (*informer.ObjectMeta, string) {
func (s *Store) PodContainerByPIDNs(pidns uint32) (*CachedObjMeta, string) {
s.access.RLock()
defer s.access.RUnlock()
if info, ok := s.namespaces[pidns]; ok {
if om, ok := s.podsByContainer[info.ContainerID]; ok {
oID := fetchOwnerID(om)
oID := fetchOwnerID(om.Meta)
containerName := ""
if containerInfo, ok := s.containersByOwner.Get(oID, info.ContainerID); ok {
containerName = containerInfo.Name
@@ -285,7 +346,7 @@ func (s *Store) PodContainerByPIDNs(pidns uint32) (*informer.ObjectMeta, string)
return nil, ""
}

func (s *Store) ObjectMetaByIP(ip string) *informer.ObjectMeta {
func (s *Store) ObjectMetaByIP(ip string) *CachedObjMeta {
s.access.RLock()
defer s.access.RUnlock()
return s.objectMetaByIP[ip]
@@ -297,6 +358,8 @@ func (s *Store) ServiceNameNamespaceForMetadata(om *informer.ObjectMeta) (string
return s.serviceNameNamespaceForMetadata(om)
}

// TODO: this function can probably be simplified, as it is used to build a CachedObjMeta
// that already contains the metadata
func (s *Store) serviceNameNamespaceForMetadata(om *informer.ObjectMeta) (string, string) {
var name string
var namespace string
@@ -342,7 +405,7 @@ func (s *Store) ServiceNameNamespaceForIP(ip string) (string, string) {

name, namespace := "", ""
if om, ok := s.objectMetaByIP[ip]; ok {
name, namespace = s.serviceNameNamespaceForMetadata(om)
name, namespace = s.serviceNameNamespaceForMetadata(om.Meta)
}

s.otelServiceInfoByIP[ip] = OTelServiceNamePair{Name: name, Namespace: namespace}
@@ -440,7 +503,7 @@ func (s *Store) Subscribe(observer meta.Observer) {
defer s.access.RUnlock()
s.BaseNotifier.Subscribe(observer)
for _, pod := range s.podsByContainer {
if err := observer.On(&informer.Event{Type: informer.EventType_CREATED, Resource: pod}); err != nil {
if err := observer.On(&informer.Event{Type: informer.EventType_CREATED, Resource: pod.Meta}); err != nil {
s.log.Debug("observer failed sending Pod info. Unsubscribing it", "observer", observer.ID(), "error", err)
s.BaseNotifier.Unsubscribe(observer)
return
@@ -450,7 +513,7 @@ func (s *Store) Subscribe(observer meta.Observer) {
// is the subscriber the one that should decide whether to ignore such duplicates or
// incomplete info
for _, ips := range s.objectMetaByIP {
if err := observer.On(&informer.Event{Type: informer.EventType_CREATED, Resource: ips}); err != nil {
if err := observer.On(&informer.Event{Type: informer.EventType_CREATED, Resource: ips.Meta}); err != nil {
s.log.Debug("observer failed sending Object Meta. Unsubscribing it", "observer", observer.ID(), "error", err)
s.BaseNotifier.Unsubscribe(observer)
return
16 changes: 8 additions & 8 deletions pkg/internal/kube/store_test.go
Original file line number Diff line number Diff line change
@@ -296,12 +296,12 @@ func TestMetaByIPEntryRemovedIfIPGroupChanges(t *testing.T) {
assert.Nil(t, store.ObjectMetaByIP("1.2.3.4"))
om := store.ObjectMetaByIP("3.1.1.1")
require.NotNil(t, om)
assert.Equal(t, "object_1", om.Name)
assert.Equal(t, []string{"3.1.1.1", "3.2.2.2"}, om.Ips)
assert.Equal(t, "object_1", om.Meta.Name)
assert.Equal(t, []string{"3.1.1.1", "3.2.2.2"}, om.Meta.Ips)
om = store.ObjectMetaByIP("3.2.2.2")
require.NotNil(t, om)
assert.Equal(t, "object_1", om.Name)
assert.Equal(t, []string{"3.1.1.1", "3.2.2.2"}, om.Ips)
assert.Equal(t, "object_1", om.Meta.Name)
assert.Equal(t, []string{"3.1.1.1", "3.2.2.2"}, om.Meta.Ips)

// AND WHEN an object is updated with a different set of IPs
_ = store.On(&informer.Event{
@@ -317,12 +317,12 @@ func TestMetaByIPEntryRemovedIfIPGroupChanges(t *testing.T) {
assert.Nil(t, store.ObjectMetaByIP("3.1.1.1"))
om = store.ObjectMetaByIP("3.3.3.3")
require.NotNil(t, om)
assert.Equal(t, "object_1", om.Name)
assert.Equal(t, []string{"3.2.2.2", "3.3.3.3"}, om.Ips)
assert.Equal(t, "object_1", om.Meta.Name)
assert.Equal(t, []string{"3.2.2.2", "3.3.3.3"}, om.Meta.Ips)
om = store.ObjectMetaByIP("3.2.2.2")
require.NotNil(t, om)
assert.Equal(t, "object_1", om.Name)
assert.Equal(t, []string{"3.2.2.2", "3.3.3.3"}, om.Ips)
assert.Equal(t, "object_1", om.Meta.Name)
assert.Equal(t, []string{"3.2.2.2", "3.3.3.3"}, om.Meta.Ips)
}

func TestNoLeakOnUpdateOrDeletion(t *testing.T) {
7 changes: 4 additions & 3 deletions pkg/internal/netolly/transform/k8s/kubernetes.go
Original file line number Diff line number Diff line change
@@ -119,8 +119,8 @@ func (n *decorator) transform(flow *ebpf.Record) bool {

// decorate the flow with Kube metadata. Returns false if there is no metadata found for such IP
func (n *decorator) decorate(flow *ebpf.Record, prefix, ip string) bool {
meta := n.kube.ObjectMetaByIP(ip)
if meta == nil {
cachedObj := n.kube.ObjectMetaByIP(ip)
if cachedObj == nil {
if n.log.Enabled(context.TODO(), slog.LevelDebug) {
// avoid spoofing the debug logs with the same message for each flow whose IP can't be decorated
if !n.alreadyLoggedIPs.Contains(ip) {
@@ -130,6 +130,7 @@ func (n *decorator) decorate(flow *ebpf.Record, prefix, ip string) bool {
}
return false
}
meta := cachedObj.Meta
ownerName, ownerKind := meta.Name, meta.Kind
if owner := kube.TopOwner(meta.Pod); owner != nil {
ownerName, ownerKind = owner.Name, owner.Kind
@@ -144,7 +145,7 @@ func (n *decorator) decorate(flow *ebpf.Record, prefix, ip string) bool {
if meta.Pod != nil && meta.Pod.HostIp != "" {
flow.Attrs.Metadata[attr.Name(prefix+attrSuffixHostIP)] = meta.Pod.HostIp
if host := n.kube.ObjectMetaByIP(meta.Pod.HostIp); host != nil {
flow.Attrs.Metadata[attr.Name(prefix+attrSuffixHostName)] = host.Name
flow.Attrs.Metadata[attr.Name(prefix+attrSuffixHostName)] = host.Meta.Name
}
}
// decorate other names from metadata, if required
29 changes: 16 additions & 13 deletions pkg/transform/k8s.go
Original file line number Diff line number Diff line change
@@ -4,6 +4,7 @@ import (
"context"
"fmt"
"log/slog"
"maps"
"time"

"github.com/mariomac/pipes/pipe"
@@ -12,7 +13,6 @@ import (
"github.com/grafana/beyla/pkg/internal/kube"
"github.com/grafana/beyla/pkg/internal/pipe/global"
"github.com/grafana/beyla/pkg/internal/request"
"github.com/grafana/beyla/pkg/kubecache/informer"
"github.com/grafana/beyla/pkg/kubeflags"
)

@@ -119,14 +119,14 @@ func (md *metadataDecorator) do(span *request.Span) {
}
}

func (md *metadataDecorator) appendMetadata(span *request.Span, meta *informer.ObjectMeta, containerName string) {
if meta.Pod == nil {
func (md *metadataDecorator) appendMetadata(span *request.Span, meta *kube.CachedObjMeta, containerName string) {
if meta.Meta.Pod == nil {
// if this message happen, there is a bug
klog().Debug("pod metadata for is nil. Ignoring decoration", "meta", meta)
return
}
topOwner := kube.TopOwner(meta.Pod)
name, namespace := md.db.ServiceNameNamespaceForMetadata(meta)
topOwner := kube.TopOwner(meta.Meta.Pod)
name, namespace := md.db.ServiceNameNamespaceForMetadata(meta.Meta)
// If the user has not defined criteria values for the reported
// service name and namespace, we will automatically set it from
// the kubernetes metadata
@@ -142,17 +142,17 @@ func (md *metadataDecorator) appendMetadata(span *request.Span, meta *informer.O
// (related issue: https://github.com/grafana/beyla/issues/1124)
// Service Instance ID is set according to OTEL collector conventions:
// (related issue: https://github.com/grafana/k8s-monitoring-helm/issues/942)
span.Service.UID.Instance = meta.Namespace + "." + meta.Name + "." + containerName
span.Service.UID.Instance = meta.Meta.Namespace + "." + meta.Meta.Name + "." + containerName

// if, in the future, other pipeline steps modify the service metadata, we should
// replace the map literal by individual entry insertions
span.Service.Metadata = map[attr.Name]string{
attr.K8sNamespaceName: meta.Namespace,
attr.K8sPodName: meta.Name,
attr.K8sNamespaceName: meta.Meta.Namespace,
attr.K8sPodName: meta.Meta.Name,
attr.K8sContainerName: containerName,
attr.K8sNodeName: meta.Pod.NodeName,
attr.K8sPodUID: meta.Pod.Uid,
attr.K8sPodStartTime: meta.Pod.StartTimeStr,
attr.K8sNodeName: meta.Meta.Pod.NodeName,
attr.K8sPodUID: meta.Meta.Pod.Uid,
attr.K8sPodStartTime: meta.Meta.Pod.StartTimeStr,
attr.K8sClusterName: md.clusterName,
}

@@ -162,14 +162,17 @@ func (md *metadataDecorator) appendMetadata(span *request.Span, meta *informer.O
span.Service.Metadata[attr.K8sOwnerName] = topOwner.Name
}

for _, owner := range meta.Pod.Owners {
for _, owner := range meta.Meta.Pod.Owners {
if kindLabel := OwnerLabelName(owner.Kind); kindLabel != "" {
span.Service.Metadata[kindLabel] = owner.Name
}
}

// append resource metadata from cached object
maps.Copy(span.Service.Metadata, meta.OTELResourceMeta)

// override hostname by the Pod name
span.Service.HostName = meta.Name
span.Service.HostName = meta.Meta.Name
}

func OwnerLabelName(kind string) attr.Name {
Loading

0 comments on commit cda3539

Please sign in to comment.