diff --git a/libbeat/outputs/otelconsumer/otelconsumer.go b/libbeat/outputs/otelconsumer/otelconsumer.go index 5f1d4951ec1..2e34d1f60c3 100644 --- a/libbeat/outputs/otelconsumer/otelconsumer.go +++ b/libbeat/outputs/otelconsumer/otelconsumer.go @@ -35,6 +35,11 @@ import ( "go.opentelemetry.io/collector/pdata/plog" ) +const ( + // esDocumentIDAttribute is the attribute key used to store the document ID in the log record. + esDocumentIDAttribute = "elasticsearch.document_id" +) + func init() { outputs.RegisterType("otelconsumer", makeOtelConsumer) } @@ -84,9 +89,30 @@ func (out *otelConsumer) logsPublish(ctx context.Context, batch publisher.Batch) sourceLogs := resourceLogs.ScopeLogs().AppendEmpty() logRecords := sourceLogs.LogRecords() + // Convert the batch of events to Otel plog.Logs. The encoding we + // choose here is to set all fields in a Map in the Body of the log + // record. Each log record encodes a single beats event. + // This way we have full control over the final structure of the log in the + // destination, as long as the exporter allows it. + // For example, the elasticsearchexporter has an encoding specifically for this. + // See https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/35444. events := batch.Events() for _, event := range events { logRecord := logRecords.AppendEmpty() + + if id, ok := event.Content.Meta["_id"]; ok { + // Specify the id as an attribute used by the elasticsearchexporter + // to set the final document ID in Elasticsearch. + // When using the bodymap encoding in the exporter all attributes + // are stripped out of the final Elasticsearch document. + // + // See https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/36882. + switch id := id.(type) { + case string: + logRecord.Attributes().PutStr(esDocumentIDAttribute, id) + } + } + beatEvent := event.Content.Fields.Clone() beatEvent["@timestamp"] = event.Content.Timestamp logRecord.SetTimestamp(pcommon.NewTimestampFromTime(event.Content.Timestamp)) diff --git a/libbeat/outputs/otelconsumer/otelconsumer_test.go b/libbeat/outputs/otelconsumer/otelconsumer_test.go index 094058a2923..bcb7253023c 100644 --- a/libbeat/outputs/otelconsumer/otelconsumer_test.go +++ b/libbeat/outputs/otelconsumer/otelconsumer_test.go @@ -43,6 +43,7 @@ func TestPublish(t *testing.T) { event1 := beat.Event{Fields: mapstr.M{"field": 1}} event2 := beat.Event{Fields: mapstr.M{"field": 2}} event3 := beat.Event{Fields: mapstr.M{"field": 3}} + event4 := beat.Event{Meta: mapstr.M{"_id": "abc123"}} makeOtelConsumer := func(t *testing.T, consumeFn func(ctx context.Context, ld plog.Logs) error) *otelConsumer { t.Helper() @@ -118,6 +119,26 @@ func TestPublish(t *testing.T) { assert.Equal(t, outest.BatchRetry, batch.Signals[0].Tag) }) + t.Run("sets the elasticsearchexporter doc id attribute from metadata", func(t *testing.T) { + batch := outest.NewBatch(event4) + + var docID string + otelConsumer := makeOtelConsumer(t, func(ctx context.Context, ld plog.Logs) error { + record := ld.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(0) + attr, ok := record.Attributes().Get(esDocumentIDAttribute) + assert.True(t, ok, "document ID attribute should be set") + docID = attr.AsString() + + return nil + }) + + err := otelConsumer.Publish(ctx, batch) + assert.NoError(t, err) + assert.Len(t, batch.Signals, 1) + assert.Equal(t, outest.BatchACK, batch.Signals[0].Tag) + assert.Equal(t, event4.Meta["_id"], docID) + }) + t.Run("sets the @timestamp field with the correct format", func(t *testing.T) { batch := outest.NewBatch(event3) batch.Events()[0].Content.Timestamp = time.Date(2025, time.January, 29, 9, 2, 39, 0, time.UTC)