Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix duplicate logs across resources #5803

Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
f8f69dc
5782-fix-duplicate-logs-across-all-resources: 1. Create scope map wit…
pree-dew Sep 10, 2024
9f95898
5782-fix-duplicate-logs-across-all-resources: fix changelog.md format
pree-dew Sep 10, 2024
fa58521
5782-fix-duplicate-logs-across-all-resources: move changelog entry to…
pree-dew Sep 10, 2024
15becc9
5782-fix-duplicate-logs-across-all-resources: move changelog entry to…
pree-dew Sep 10, 2024
908cd01
5782-fix-duplicate-logs-across-all-resources: fix some lint issues
pree-dew Sep 10, 2024
5226a50
5782-fix-duplicate-logs-across-all-resources: fix some lint issues
pree-dew Sep 10, 2024
c44a80a
5782-fix-duplicate-logs-across-all-resources: fix link issue created …
pree-dew Sep 11, 2024
300463f
5782-fix-duplicate-logs-across-all-resources: fix spelling
pree-dew Sep 11, 2024
5fb27e6
5782-fix-duplicate-logs-across-all-resources: fix lint issue in template
pree-dew Sep 11, 2024
09b63d5
5782-fix-duplicate-logs-across-all-resources: fix markdown issue
pree-dew Sep 11, 2024
ea175dd
Merge branch 'main' into 5782-fix-duplicate-logs-across-all-resources
pellared Sep 12, 2024
1c960c7
Merge branch 'main' into 5782-fix-duplicate-logs-across-all-resources
XSAM Sep 13, 2024
27c3cb3
5782-fix-duplicate-logs-across-all-resources: make first test case be…
pree-dew Sep 13, 2024
918f5f1
Merge branch '5782-fix-duplicate-logs-across-all-resources' of github…
pree-dew Sep 13, 2024
0cbfdd0
5782-fix-duplicate-logs-across-all-resources: resource log order is n…
pree-dew Sep 13, 2024
d607cca
5782-fix-duplicate-logs-across-all-resources: change scope definition
pree-dew Sep 15, 2024
3535414
5782-fix-duplicate-logs-across-all-resources: rule out any caching issue
pree-dew Sep 15, 2024
ecda511
5782-fix-duplicate-logs-across-all-resources: fix lint issue
pree-dew Sep 15, 2024
b7dda3a
5782-fix-duplicate-logs-across-all-resources: resolve merge conflicts
pree-dew Sep 16, 2024
cc2038d
Merge branch 'main' into 5782-fix-duplicate-logs-across-all-resources
pellared Sep 16, 2024
88a3eaf
5782-fix-duplicate-logs-across-all-resources: resolve merge conflicts
pree-dew Sep 17, 2024
ab0555c
5782-fix-duplicate-logs-across-all-resources: resolve merge conflict
pree-dew Sep 17, 2024
95356a1
5782-fix-duplicate-logs-across-all-resources: correct pr id in changelog
pree-dew Sep 17, 2024
f24d39f
5782-fix-duplicate-logs-across-all-resources: use elements match for …
pree-dew Sep 17, 2024
5994633
Update CHANGELOG.md
pellared Sep 17, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm

- Enable exemplars by default in `go.opentelemetry.io/otel/sdk/metric`. Exemplars can be disabled by setting `OTEL_METRICS_EXEMPLAR_FILTER=always_off` (#5778)

### Fixed

- Fix log records duplication in case of heterogeneous resource attributes by correctly mapping each log record to it's resource and scope. (#5782)
pellared marked this conversation as resolved.
Show resolved Hide resolved

<!-- Released section -->
<!-- Don't change this section unless doing release -->

Expand Down
99 changes: 38 additions & 61 deletions exporters/otlp/otlplog/otlploggrpc/internal/transform/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
package transform // import "go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc/internal/transform"

import (
"sync"
"time"

cpb "go.opentelemetry.io/proto/otlp/common/v1"
Expand All @@ -28,71 +27,25 @@ func ResourceLogs(records []log.Record) []*lpb.ResourceLogs {
return nil
}

resMap := resourceLogsMapPool.Get().(map[attribute.Distinct]*lpb.ResourceLogs)
defer func() {
clear(resMap)
resourceLogsMapPool.Put(resMap)
}()
resourceLogsMap(&resMap, records)
resMap := make(map[attribute.Distinct]*lpb.ResourceLogs)

out := make([]*lpb.ResourceLogs, 0, len(resMap))
for _, rl := range resMap {
out = append(out, rl)
type key struct {
r attribute.Distinct
pellared marked this conversation as resolved.
Show resolved Hide resolved
is instrumentation.Scope
}
return out
}

var resourceLogsMapPool = sync.Pool{
New: func() any {
return make(map[attribute.Distinct]*lpb.ResourceLogs)
},
}
scopeMap := make(map[key]*lpb.ScopeLogs)

func resourceLogsMap(dst *map[attribute.Distinct]*lpb.ResourceLogs, records []log.Record) {
var resources int
for _, r := range records {
res := r.Resource()
rl, ok := (*dst)[res.Equivalent()]
if !ok {
rl = new(lpb.ResourceLogs)
if res.Len() > 0 {
rl.Resource = &rpb.Resource{
Attributes: AttrIter(res.Iter()),
}
}
rl.SchemaUrl = res.SchemaURL()
(*dst)[res.Equivalent()] = rl
}
rl.ScopeLogs = ScopeLogs(records)
}
}

// ScopeLogs returns a slice of OTLP ScopeLogs generated from records.
func ScopeLogs(records []log.Record) []*lpb.ScopeLogs {
scopeMap := scopeLogsMapPool.Get().(map[instrumentation.Scope]*lpb.ScopeLogs)
defer func() {
clear(scopeMap)
scopeLogsMapPool.Put(scopeMap)
}()
scopeLogsMap(&scopeMap, records)

out := make([]*lpb.ScopeLogs, 0, len(scopeMap))
for _, sl := range scopeMap {
out = append(out, sl)
}
return out
}

var scopeLogsMapPool = sync.Pool{
New: func() any {
return make(map[instrumentation.Scope]*lpb.ScopeLogs)
},
}

func scopeLogsMap(dst *map[instrumentation.Scope]*lpb.ScopeLogs, records []log.Record) {
for _, r := range records {
rKey := res.Equivalent()
scope := r.InstrumentationScope()
sl, ok := (*dst)[scope]
if !ok {
k := key{
r: rKey,
is: scope,
}
sl, iOk := scopeMap[k]
if !iOk {
sl = new(lpb.ScopeLogs)
var emptyScope instrumentation.Scope
if scope != emptyScope {
Expand All @@ -102,10 +55,34 @@ func scopeLogsMap(dst *map[instrumentation.Scope]*lpb.ScopeLogs, records []log.R
}
sl.SchemaUrl = scope.SchemaURL
}
(*dst)[scope] = sl
scopeMap[k] = sl
}

sl.LogRecords = append(sl.LogRecords, LogRecord(r))
rl, rOk := resMap[rKey]
if !rOk {
resources++
rl = new(lpb.ResourceLogs)
if res.Len() > 0 {
rl.Resource = &rpb.Resource{
Attributes: AttrIter(res.Iter()),
}
}
rl.SchemaUrl = res.SchemaURL()
resMap[rKey] = rl
}
if !iOk {
rl.ScopeLogs = append(rl.ScopeLogs, sl)
}
}

// Transform the categorized map into a slice
resLogs := make([]*lpb.ResourceLogs, 0, resources)
for _, rl := range resMap {
resLogs = append(resLogs, rl)
}

return resLogs
}

// LogRecord returns an OTLP LogRecord generated from record.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
lpb "go.opentelemetry.io/proto/otlp/logs/v1"
rpb "go.opentelemetry.io/proto/otlp/resource/v1"

"go.opentelemetry.io/otel/attribute"
api "go.opentelemetry.io/otel/log"
"go.opentelemetry.io/otel/sdk/instrumentation"
"go.opentelemetry.io/otel/sdk/log"
Expand Down Expand Up @@ -226,6 +227,81 @@ func TestResourceLogs(t *testing.T) {
assert.Equal(t, want, ResourceLogs(records))
}

func TestResourceLogsPerResource(t *testing.T) {
XSAM marked this conversation as resolved.
Show resolved Hide resolved
res1 := resource.NewWithAttributes(
semconv.SchemaURL,
attribute.KeyValue{
Key: "service.name",
Value: attribute.StringValue("service1"),
},
)
res2 := resource.NewWithAttributes(
semconv.SchemaURL,
attribute.KeyValue{
Key: "service.name",
Value: attribute.StringValue("service2"),
},
)

scope := instrumentation.Scope{
Name: "test/code/path1",
Version: "v0.2.0",
SchemaURL: semconv.SchemaURL,
}
records = func() []log.Record {
var out []log.Record

out = append(out, logtest.RecordFactory{
Timestamp: ts,
ObservedTimestamp: obs,
Body: bodyA,
Attributes: []api.KeyValue{alice},
Resource: res1,
}.NewRecord())

out = append(out, logtest.RecordFactory{
Timestamp: ts,
ObservedTimestamp: obs,
Body: bodyB,
Attributes: []api.KeyValue{bob},
Resource: res1,
}.NewRecord())

out = append(out, logtest.RecordFactory{
Timestamp: ts,
ObservedTimestamp: obs,
Body: bodyB,
Attributes: []api.KeyValue{bob},
Resource: res2,
}.NewRecord())

out = append(out, logtest.RecordFactory{
Timestamp: ts,
ObservedTimestamp: obs,
Body: bodyB,
Attributes: []api.KeyValue{alice},
Resource: res2,
InstrumentationScope: &scope,
}.NewRecord())
pellared marked this conversation as resolved.
Show resolved Hide resolved

return out
}()

rLogs := ResourceLogs(records)
assert.Equal(t, 2, len(rLogs))

for _, r := range rLogs {
if r.Resource.Attributes[0].Value.GetStringValue() == "service1" {
assert.Equal(t, 1, len(r.ScopeLogs))
assert.Equal(t, 2, len(r.ScopeLogs[0].LogRecords))
} else {
assert.Equal(t, 2, len(r.ScopeLogs))
assert.Equal(t, 1, len(r.ScopeLogs[0].LogRecords))
assert.Equal(t, 1, len(r.ScopeLogs[1].LogRecords))
}
}
}

func TestSeverityNumber(t *testing.T) {
for i := 0; i <= int(api.SeverityFatal4); i++ {
want := lpb.SeverityNumber(i)
Expand Down
99 changes: 38 additions & 61 deletions exporters/otlp/otlplog/otlploghttp/internal/transform/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
package transform // import "go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploghttp/internal/transform"

import (
"sync"
"time"

cpb "go.opentelemetry.io/proto/otlp/common/v1"
Expand All @@ -28,71 +27,25 @@ func ResourceLogs(records []log.Record) []*lpb.ResourceLogs {
return nil
}

resMap := resourceLogsMapPool.Get().(map[attribute.Distinct]*lpb.ResourceLogs)
defer func() {
clear(resMap)
resourceLogsMapPool.Put(resMap)
}()
resourceLogsMap(&resMap, records)
resMap := make(map[attribute.Distinct]*lpb.ResourceLogs)

out := make([]*lpb.ResourceLogs, 0, len(resMap))
for _, rl := range resMap {
out = append(out, rl)
type key struct {
r attribute.Distinct
is instrumentation.Scope
}
return out
}

var resourceLogsMapPool = sync.Pool{
New: func() any {
return make(map[attribute.Distinct]*lpb.ResourceLogs)
},
}
scopeMap := make(map[key]*lpb.ScopeLogs)

func resourceLogsMap(dst *map[attribute.Distinct]*lpb.ResourceLogs, records []log.Record) {
var resources int
for _, r := range records {
res := r.Resource()
rl, ok := (*dst)[res.Equivalent()]
if !ok {
rl = new(lpb.ResourceLogs)
if res.Len() > 0 {
rl.Resource = &rpb.Resource{
Attributes: AttrIter(res.Iter()),
}
}
rl.SchemaUrl = res.SchemaURL()
(*dst)[res.Equivalent()] = rl
}
rl.ScopeLogs = ScopeLogs(records)
}
}

// ScopeLogs returns a slice of OTLP ScopeLogs generated from records.
func ScopeLogs(records []log.Record) []*lpb.ScopeLogs {
scopeMap := scopeLogsMapPool.Get().(map[instrumentation.Scope]*lpb.ScopeLogs)
defer func() {
clear(scopeMap)
scopeLogsMapPool.Put(scopeMap)
}()
scopeLogsMap(&scopeMap, records)

out := make([]*lpb.ScopeLogs, 0, len(scopeMap))
for _, sl := range scopeMap {
out = append(out, sl)
}
return out
}

var scopeLogsMapPool = sync.Pool{
New: func() any {
return make(map[instrumentation.Scope]*lpb.ScopeLogs)
},
}

func scopeLogsMap(dst *map[instrumentation.Scope]*lpb.ScopeLogs, records []log.Record) {
for _, r := range records {
rKey := res.Equivalent()
scope := r.InstrumentationScope()
sl, ok := (*dst)[scope]
if !ok {
k := key{
r: rKey,
is: scope,
}
sl, iOk := scopeMap[k]
if !iOk {
sl = new(lpb.ScopeLogs)
var emptyScope instrumentation.Scope
if scope != emptyScope {
Expand All @@ -102,10 +55,34 @@ func scopeLogsMap(dst *map[instrumentation.Scope]*lpb.ScopeLogs, records []log.R
}
sl.SchemaUrl = scope.SchemaURL
}
(*dst)[scope] = sl
scopeMap[k] = sl
}

sl.LogRecords = append(sl.LogRecords, LogRecord(r))
rl, rOk := resMap[rKey]
if !rOk {
resources++
rl = new(lpb.ResourceLogs)
if res.Len() > 0 {
rl.Resource = &rpb.Resource{
Attributes: AttrIter(res.Iter()),
}
}
rl.SchemaUrl = res.SchemaURL()
resMap[rKey] = rl
}
if !iOk {
rl.ScopeLogs = append(rl.ScopeLogs, sl)
}
}

// Transform the categorized map into a slice
resLogs := make([]*lpb.ResourceLogs, 0, resources)
for _, rl := range resMap {
resLogs = append(resLogs, rl)
}

return resLogs
}

// LogRecord returns an OTLP LogRecord generated from record.
Expand Down
Loading
Loading