diff --git a/pkg/api/encode_otlp.go b/pkg/api/encode_otlp.go index 0a9fa8ba8..a2acc36b7 100644 --- a/pkg/api/encode_otlp.go +++ b/pkg/api/encode_otlp.go @@ -23,6 +23,7 @@ type EncodeOtlpLogs struct { type EncodeOtlpTraces struct { *OtlpConnectionInfo + SpanSplitter []string `yaml:"spanSplitter,omitempty" json:"spanSplitter,omitempty" doc:"separate span for each prefix listed"` } type EncodeOtlpMetrics struct { diff --git a/pkg/pipeline/encode/opentelemetry/encode_otlptrace.go b/pkg/pipeline/encode/opentelemetry/encode_otlptrace.go index 54efdfd57..f74f4e770 100644 --- a/pkg/pipeline/encode/opentelemetry/encode_otlptrace.go +++ b/pkg/pipeline/encode/opentelemetry/encode_otlptrace.go @@ -19,16 +19,15 @@ package opentelemetry import ( "context" + "strings" "github.com/netobserv/flowlogs-pipeline/pkg/api" "github.com/netobserv/flowlogs-pipeline/pkg/config" "github.com/netobserv/flowlogs-pipeline/pkg/operational" "github.com/netobserv/flowlogs-pipeline/pkg/pipeline/encode" log "github.com/sirupsen/logrus" - "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/sdk/resource" sdktrace "go.opentelemetry.io/otel/sdk/trace" - "go.opentelemetry.io/otel/trace" ) const ( @@ -37,20 +36,52 @@ const ( ) type EncodeOtlpTrace struct { - cfg api.EncodeOtlpTraces - ctx context.Context - res *resource.Resource - tp *sdktrace.TracerProvider - tracer trace.Tracer + cfg api.EncodeOtlpTraces + ctx context.Context + res *resource.Resource + tp *sdktrace.TracerProvider } // Encode encodes a metric to be exported -func (e *EncodeOtlpTrace) Encode(metricRecord config.GenericMap) { - log.Tracef("entering EncodeOtlpTrace. entry = %v", metricRecord) - _, span := e.tracer.Start(e.ctx, flpEncodeSpanName) - defer span.End() - attributes := obtainAttributesFromEntry(metricRecord) - span.SetAttributes(*attributes...) +func (e *EncodeOtlpTrace) Encode(entry config.GenericMap) { + log.Tracef("entering EncodeOtlpTrace. entry = %v", entry) + tr := e.tp.Tracer(flpTracerName) + ll := len(e.cfg.SpanSplitter) + newCtx, span0 := tr.Start(e.ctx, flpEncodeSpanName) + attributes := obtainAttributesFromEntry(entry) + span0.SetAttributes(*attributes...) + span0.End() + if ll == 0 { + return + } + // for each item in SpanSplitter, make a separate entry for each listed item + // do not include fields that belong exclusively to other items + ss := e.cfg.SpanSplitter + records := make([]config.GenericMap, ll) + for i := 0; i < ll; i++ { + records[i] = make(config.GenericMap) + } +OUTER: + for key, value := range entry { + for i := 0; i < ll; i++ { + if strings.HasPrefix(key, ss[i]) { + trimmed := strings.TrimPrefix(key, ss[i]) + records[i][trimmed] = value + continue OUTER + } + } + // if we reach here, the field did not have any of the prefixes. + // copy it into each of the records + for i := 0; i < ll; i++ { + records[i][key] = value + } + } + for i := 0; i < ll; i++ { + _, span := tr.Start(newCtx, ss[i]) + attributes := obtainAttributesFromEntry(records[i]) + span.SetAttributes(*attributes...) + span.End() + } } func NewEncodeOtlpTraces(opMetrics *operational.Metrics, params config.StageParam) (encode.Encoder, error) { @@ -63,7 +94,6 @@ func NewEncodeOtlpTraces(opMetrics *operational.Metrics, params config.StagePara ctx := context.Background() res := newResource() - tracer := otel.Tracer(flpTracerName) tp, err := NewOtlpTracerProvider(ctx, params, res) if err != nil { @@ -71,11 +101,10 @@ func NewEncodeOtlpTraces(opMetrics *operational.Metrics, params config.StagePara } w := &EncodeOtlpTrace{ - cfg: cfg, - ctx: ctx, - res: res, - tp: tp, - tracer: tracer, + cfg: cfg, + ctx: ctx, + res: res, + tp: tp, } return w, nil }