-
Notifications
You must be signed in to change notification settings - Fork 0
/
processor.go
78 lines (68 loc) · 2.26 KB
/
processor.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
package oteljsonlogflattenerprocessor
import (
"context"
"encoding/json"
"reflect"
"github.com/jupiterone/oteljsonlogflattenerprocessor/internal/errwrap"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/processor"
"go.uber.org/zap"
)
// Compile-time assertion that *jsonlogflattenerProcessor satisfies the
// processor.Logs interface.
var _ processor.Logs = (*jsonlogflattenerProcessor)(nil)
// jsonlogflattenerProcessor is a logs processor that rewrites map- and
// slice-valued log record attributes as JSON strings (see flatten) and then
// forwards the logs to the next consumer.
type jsonlogflattenerProcessor struct {
// config is the processor configuration passed to newProcessor.
config *Config
// next is the downstream consumer that ConsumeLogs forwards logs to.
next consumer.Logs
// logger receives flatten errors and recovered panics.
logger *zap.Logger
}
// newProcessor returns a jsonlogflattenerProcessor that keeps the given
// config and logger and forwards processed logs to next. The returned error
// is always nil.
func newProcessor(config *Config, next consumer.Logs, logger *zap.Logger) (*jsonlogflattenerProcessor, error) {
	jlp := new(jsonlogflattenerProcessor)
	jlp.config = config
	jlp.next = next
	jlp.logger = logger
	return jlp, nil
}
// Capabilities reports that this processor mutates the log data it receives.
func (jlp *jsonlogflattenerProcessor) Capabilities() consumer.Capabilities {
	var caps consumer.Capabilities
	caps.MutatesData = true
	return caps
}
// ConsumeLogs flattens the attributes of ld in place and then passes ld to
// the next consumer. A flatten error is only logged; it never prevents the
// logs from being forwarded, and the returned error is whatever the next
// consumer returns.
func (jlp *jsonlogflattenerProcessor) ConsumeLogs(ctx context.Context, ld plog.Logs) error {
	flattenErr := jlp.flatten(&ld)
	if flattenErr != nil {
		jlp.logger.Error(flattenErr.Error())
	}
	return jlp.next.ConsumeLogs(ctx, ld)
}
// Start is a no-op: it ignores both the context and host and always returns
// nil.
func (jlp *jsonlogflattenerProcessor) Start(_ context.Context, host component.Host) error {
return nil
}
// Shutdown is a no-op: it ignores the context and always returns nil.
func (jlp *jsonlogflattenerProcessor) Shutdown(context.Context) error {
return nil
}
// flatten walks every log record in ld and, for each attribute whose raw
// value is a map or a slice, replaces that attribute with a string attribute
// holding the JSON encoding of the value. Attributes of any other kind, and
// nil values, are left untouched.
//
// Marshal failures do not stop the walk: each one is logged, accumulated via
// errwrap.NewErrWrap, and the offending attribute is left unchanged. The
// accumulated error (nil when nothing failed) is returned.
//
// A panic raised while flattening is recovered and logged so that it cannot
// break the collector pipeline; in that case flatten returns nil.
func (jlp *jsonlogflattenerProcessor) flatten(ld *plog.Logs) error {
	defer func() {
		// If we panicked for some reason, recover and log the panic value so
		// we don't break the collector workflow.
		r := recover()
		if r == nil {
			return
		}
		// A panic value is not guaranteed to be an error (e.g. panic("msg")).
		// An unchecked r.(error) would itself panic inside this deferred
		// function and let the panic escape, so dispatch on the dynamic type.
		switch v := r.(type) {
		case error:
			jlp.logger.Error(v.Error())
		case string:
			jlp.logger.Error(v)
		default:
			jlp.logger.Error("panic while flattening log attributes", zap.Any("panic", r))
		}
	}()

	var rootErr error
	resourceLogs := ld.ResourceLogs()
	for i := 0; i < resourceLogs.Len(); i++ {
		scopeLogs := resourceLogs.At(i).ScopeLogs()
		for j := 0; j < scopeLogs.Len(); j++ {
			records := scopeLogs.At(j).LogRecords()
			for k := 0; k < records.Len(); k++ {
				attrs := records.At(k).Attributes()
				// Range over the raw snapshot returned by AsRaw while writing
				// replacements back into attrs itself.
				for key, raw := range attrs.AsRaw() {
					if raw == nil {
						continue
					}
					// NOTE(review): a []byte raw value would also report
					// reflect.Slice and be JSON-encoded here — confirm that
					// bytes attributes are meant to be rewritten too.
					kind := reflect.TypeOf(raw).Kind()
					if kind != reflect.Map && kind != reflect.Slice {
						continue
					}
					payload, err := json.Marshal(raw)
					if err != nil {
						jlp.logger.Error(err.Error())
						rootErr = errwrap.NewErrWrap(rootErr, err)
						continue
					}
					attrs.PutStr(key, string(payload))
				}
			}
		}
	}
	return rootErr
}