Skip to content

Commit

Permalink
otelconsumer: set document id attribute for elasticsearchexporter (#4…
Browse files Browse the repository at this point in the history
…2412)

* otelconsumer: set document id attribute for elasticsearchexporter

* add comment about field being removed from elasticsearch
  • Loading branch information
mauri870 authored Jan 31, 2025
1 parent 0ff52eb commit 04eac62
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 0 deletions.
26 changes: 26 additions & 0 deletions libbeat/outputs/otelconsumer/otelconsumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,11 @@ import (
"go.opentelemetry.io/collector/pdata/plog"
)

const (
// esDocumentIDAttribute is the attribute key used to store the document ID in the log record.
esDocumentIDAttribute = "elasticsearch.document_id"
)

func init() {
outputs.RegisterType("otelconsumer", makeOtelConsumer)
}
Expand Down Expand Up @@ -84,9 +89,30 @@ func (out *otelConsumer) logsPublish(ctx context.Context, batch publisher.Batch)
sourceLogs := resourceLogs.ScopeLogs().AppendEmpty()
logRecords := sourceLogs.LogRecords()

// Convert the batch of events to Otel plog.Logs. The encoding we
// choose here is to set all fields in a Map in the Body of the log
// record. Each log record encodes a single beats event.
// This way we have full control over the final structure of the log in the
// destination, as long as the exporter allows it.
// For example, the elasticsearchexporter has an encoding specifically for this.
// See https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/35444.
events := batch.Events()
for _, event := range events {
logRecord := logRecords.AppendEmpty()

if id, ok := event.Content.Meta["_id"]; ok {
// Specify the id as an attribute used by the elasticsearchexporter
// to set the final document ID in Elasticsearch.
// When using the bodymap encoding in the exporter all attributes
// are stripped out of the final Elasticsearch document.
//
// See https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/36882.
switch id := id.(type) {
case string:
logRecord.Attributes().PutStr(esDocumentIDAttribute, id)
}
}

beatEvent := event.Content.Fields.Clone()
beatEvent["@timestamp"] = event.Content.Timestamp
logRecord.SetTimestamp(pcommon.NewTimestampFromTime(event.Content.Timestamp))
Expand Down
21 changes: 21 additions & 0 deletions libbeat/outputs/otelconsumer/otelconsumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ func TestPublish(t *testing.T) {
event1 := beat.Event{Fields: mapstr.M{"field": 1}}
event2 := beat.Event{Fields: mapstr.M{"field": 2}}
event3 := beat.Event{Fields: mapstr.M{"field": 3}}
event4 := beat.Event{Meta: mapstr.M{"_id": "abc123"}}

makeOtelConsumer := func(t *testing.T, consumeFn func(ctx context.Context, ld plog.Logs) error) *otelConsumer {
t.Helper()
Expand Down Expand Up @@ -118,6 +119,26 @@ func TestPublish(t *testing.T) {
assert.Equal(t, outest.BatchRetry, batch.Signals[0].Tag)
})

t.Run("sets the elasticsearchexporter doc id attribute from metadata", func(t *testing.T) {
batch := outest.NewBatch(event4)

var docID string
otelConsumer := makeOtelConsumer(t, func(ctx context.Context, ld plog.Logs) error {
record := ld.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(0)
attr, ok := record.Attributes().Get(esDocumentIDAttribute)
assert.True(t, ok, "document ID attribute should be set")
docID = attr.AsString()

return nil
})

err := otelConsumer.Publish(ctx, batch)
assert.NoError(t, err)
assert.Len(t, batch.Signals, 1)
assert.Equal(t, outest.BatchACK, batch.Signals[0].Tag)
assert.Equal(t, event4.Meta["_id"], docID)
})

t.Run("sets the @timestamp field with the correct format", func(t *testing.T) {
batch := outest.NewBatch(event3)
batch.Events()[0].Content.Timestamp = time.Date(2025, time.January, 29, 9, 2, 39, 0, time.UTC)
Expand Down

0 comments on commit 04eac62

Please sign in to comment.