Skip to content

Commit

Permalink
[receiver/k8sobjects] ensure the k8s.namespace.name attribute is se…
Browse files Browse the repository at this point in the history
…t for objects retrieved using the `watch` mode (open-telemetry#36432)

<!--Ex. Fixing a bug - Describe the bug and how this fixes the issue.
Ex. Adding a feature - Explain what this achieves.-->
#### Description
This PR ensures that the structure of log records generated by the
k8sobjects receiver is the same, regardless of the mode (`watch` or
`pull`) being used. This also solves the issue of the
`k8s.namespace.name` attribute not being set for objects retrieved with
`watch` mode.

<!-- Issue number (e.g. open-telemetry#1234) or full URL to issue, if applicable. -->
#### Link to tracking issue
Fixes open-telemetry#36352 

<!--Describe what testing was performed and which tests were added.-->
#### Testing

Added unit tests and adapted e2e tests

---------

Signed-off-by: Florian Bacher <[email protected]>
Co-authored-by: Christos Markou <[email protected]>
  • Loading branch information
bacherfl and ChrsMark authored Dec 20, 2024
1 parent 96368fa commit e1bd751
Show file tree
Hide file tree
Showing 6 changed files with 136 additions and 29 deletions.
27 changes: 27 additions & 0 deletions .chloggen/k8sobjects-data-structure.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: bug_fix

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: k8sobjectsreceiver

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: ensure the `k8s.namespace.name` attribute is set for objects retrieved using the `watch` mode

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [36352]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
2 changes: 2 additions & 0 deletions receiver/k8sobjectsreceiver/e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,8 @@ func TestE2E(t *testing.T) {
}, time.Duration(tc.timeoutMinutes)*time.Minute, 1*time.Second,
"Timeout: failed to receive logs in %d minutes", tc.timeoutMinutes)

// golden.WriteLogs(t, expectedFile, logsConsumer.AllLogs()[0])

require.NoErrorf(t, plogtest.CompareLogs(expected, logsConsumer.AllLogs()[0],
plogtest.IgnoreObservedTimestamp(),
plogtest.IgnoreResourceLogsOrder(),
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
resourceLogs:
- resource: {}
- resource:
attributes:
- key: k8s.namespace.name
value:
stringValue: default
scopeLogs:
- scope: {}
logRecords:
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
resourceLogs:
- resource: {}
- resource:
attributes:
- key: k8s.namespace.name
value:
stringValue: default
scopeLogs:
- scope: {}
logRecords:
Expand Down
18 changes: 15 additions & 3 deletions receiver/k8sobjectsreceiver/unstructured_to_logdata.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,16 +51,16 @@ func unstructuredListToLogData(event *unstructured.UnstructuredList, observedAt
namespaceResourceMap := make(map[string]plog.LogRecordSlice)

for _, e := range event.Items {
logSlice, ok := namespaceResourceMap[e.GetNamespace()]
logSlice, ok := namespaceResourceMap[getNamespace(e)]
if !ok {
rl := resourceLogs.AppendEmpty()
resourceAttrs := rl.Resource().Attributes()
if namespace := e.GetNamespace(); namespace != "" {
if namespace := getNamespace(e); namespace != "" {
resourceAttrs.PutStr(semconv.AttributeK8SNamespaceName, namespace)
}
sl := rl.ScopeLogs().AppendEmpty()
logSlice = sl.LogRecords()
namespaceResourceMap[e.GetNamespace()] = logSlice
namespaceResourceMap[getNamespace(e)] = logSlice
}
record := logSlice.AppendEmpty()
record.SetObservedTimestamp(pcommon.NewTimestampFromTime(observedAt))
Expand All @@ -79,3 +79,15 @@ func unstructuredListToLogData(event *unstructured.UnstructuredList, observedAt
}
return out
}

func getNamespace(e unstructured.Unstructured) string {
// first, try to use the GetNamespace() method, which checks for the metadata.namespace property
if namespace := e.GetNamespace(); namespace != "" {
return namespace
}
// try to look up namespace in object.metadata.namespace (for objects reported via watch mode)
if namespace, ok, _ := unstructured.NestedString(e.Object, "object", "metadata", "namespace"); ok {
return namespace
}
return ""
}
106 changes: 82 additions & 24 deletions receiver/k8sobjectsreceiver/unstructured_to_logdata_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
semconv "go.opentelemetry.io/collector/semconv/v1.27.0"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
Expand Down Expand Up @@ -91,7 +90,7 @@ func TestUnstructuredListToLogData(t *testing.T) {
assert.Equal(t, 3, logRecords.Len())
})

t.Run("Test event.name in watch events", func(t *testing.T) {
t.Run("Test event observed timestamp is present", func(t *testing.T) {
config := &K8sObjectsConfig{
gvr: &schema.GroupVersionResource{
Group: "",
Expand All @@ -112,7 +111,8 @@ func TestUnstructuredListToLogData(t *testing.T) {
},
}

logs, err := watchObjectsToLogData(event, time.Now(), config)
observedAt := time.Now()
logs, err := watchObjectsToLogData(event, observedAt, config)
assert.NoError(t, err)

assert.Equal(t, 1, logs.LogRecordCount())
Expand All @@ -123,47 +123,105 @@ func TestUnstructuredListToLogData(t *testing.T) {
logRecords := rl.ScopeLogs().At(0).LogRecords()
assert.Equal(t, 1, rl.ScopeLogs().Len())
assert.Equal(t, 1, logRecords.Len())

attrs := logRecords.At(0).Attributes()
eventName, ok := attrs.Get("event.name")
require.True(t, ok)
assert.EqualValues(t, "generic-name", eventName.AsRaw())
assert.Positive(t, logRecords.At(0).ObservedTimestamp().AsTime().Unix())
assert.Equal(t, logRecords.At(0).ObservedTimestamp().AsTime().Unix(), observedAt.Unix())
})

t.Run("Test event observed timestamp is present", func(t *testing.T) {
t.Run("Test pull and watch objects both contain k8s.namespace.name", func(t *testing.T) {
observedTimestamp := time.Now()
config := &K8sObjectsConfig{
gvr: &schema.GroupVersionResource{
Group: "",
Version: "v1",
Resource: "events",
},
}
event := &watch.Event{
watchedEvent := &watch.Event{
Type: watch.Added,
Object: &unstructured.Unstructured{
Object: map[string]any{
"kind": "Event",
"apiVersion": "v1",
"metadata": map[string]any{
"name": "generic-name",
"name": "generic-name",
"namespace": "my-namespace",
},
},
},
}

observedAt := time.Now()
logs, err := watchObjectsToLogData(event, observedAt, config)
assert.NoError(t, err)

assert.Equal(t, 1, logs.LogRecordCount())
pulledEvent := &unstructured.UnstructuredList{
Items: []unstructured.Unstructured{{
Object: map[string]any{
"kind": "Event",
"apiVersion": "v1",
"metadata": map[string]any{
"name": "generic-name",
"namespace": "my-namespace",
},
},
}},
}

resourceLogs := logs.ResourceLogs()
assert.Equal(t, 1, resourceLogs.Len())
rl := resourceLogs.At(0)
logRecords := rl.ScopeLogs().At(0).LogRecords()
assert.Equal(t, 1, rl.ScopeLogs().Len())
assert.Equal(t, 1, logRecords.Len())
assert.Positive(t, logRecords.At(0).ObservedTimestamp().AsTime().Unix())
assert.Equal(t, logRecords.At(0).ObservedTimestamp().AsTime().Unix(), observedAt.Unix())
logEntryFromWatchEvent, err := watchObjectsToLogData(watchedEvent, observedTimestamp, config)
assert.NoError(t, err)
assert.NotNil(t, logEntryFromWatchEvent)

// verify the event.type, event.domain and k8s.resource.name attributes have been added

watchEventResourceAttrs := logEntryFromWatchEvent.ResourceLogs().At(0).Resource().Attributes()
k8sNamespace, ok := watchEventResourceAttrs.Get(semconv.AttributeK8SNamespaceName)
assert.True(t, ok)
assert.Equal(t,
"my-namespace",
k8sNamespace.Str(),
)

watchEvenLogRecordtAttrs := logEntryFromWatchEvent.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(0).Attributes()
eventType, ok := watchEvenLogRecordtAttrs.Get("event.name")
assert.True(t, ok)
assert.Equal(
t,
"generic-name",
eventType.AsString(),
)

eventDomain, ok := watchEvenLogRecordtAttrs.Get("event.domain")
assert.True(t, ok)
assert.Equal(
t,
"k8s",
eventDomain.AsString(),
)

k8sResourceName, ok := watchEvenLogRecordtAttrs.Get("k8s.resource.name")
assert.True(t, ok)
assert.Equal(
t,
"events",
k8sResourceName.AsString(),
)

logEntryFromPulledEvent := unstructuredListToLogData(pulledEvent, observedTimestamp, config)
assert.NotNil(t, logEntryFromPulledEvent)

pullEventResourceAttrs := logEntryFromPulledEvent.ResourceLogs().At(0).Resource().Attributes()
k8sNamespace, ok = pullEventResourceAttrs.Get(semconv.AttributeK8SNamespaceName)
assert.True(t, ok)
assert.Equal(
t,
"my-namespace",
k8sNamespace.Str(),
)

pullEventLogRecordAttrs := logEntryFromPulledEvent.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(0).Attributes()

k8sResourceName, ok = pullEventLogRecordAttrs.Get("k8s.resource.name")
assert.True(t, ok)
assert.Equal(
t,
"events",
k8sResourceName.AsString(),
)
})
}

0 comments on commit e1bd751

Please sign in to comment.