diff --git a/exporter/collector/config.go b/exporter/collector/config.go index 6f8b8bfc3..51ef79a51 100644 --- a/exporter/collector/config.go +++ b/exporter/collector/config.go @@ -15,6 +15,7 @@ package collector import ( + "fmt" "time" "google.golang.org/api/option" @@ -48,6 +49,18 @@ type ClientConfig struct { type TraceConfig struct { ClientConfig ClientConfig `mapstructure:",squash"` + // AttributeMappings determines how to map from OpenTelemetry attribute + // keys to Google Cloud Trace keys. By default, it changes http and + // service keys so that they appear more prominently in the UI. + AttributeMappings []AttributeMapping `mapstructure:"attribute_mappings"` +} + +// AttributeMapping maps from an OpenTelemetry key to a Google Cloud Trace key. +type AttributeMapping struct { + // Key is the OpenTelemetry attribute key + Key string `mapstructure:"key"` + // Replacement is the attribute sent to Google Cloud Trace + Replacement string `mapstructure:"replacement"` } type MetricConfig struct { @@ -103,3 +116,20 @@ func DefaultConfig() Config { }, } } + +// ValidateConfig returns an error if the provided configuration is invalid +func ValidateConfig(cfg Config) error { + seenKeys := make(map[string]struct{}, len(cfg.TraceConfig.AttributeMappings)) + seenReplacements := make(map[string]struct{}, len(cfg.TraceConfig.AttributeMappings)) + for _, mapping := range cfg.TraceConfig.AttributeMappings { + if _, ok := seenKeys[mapping.Key]; ok { + return fmt.Errorf("duplicate key in traces.attribute_mappings: %q", mapping.Key) + } + seenKeys[mapping.Key] = struct{}{} + if _, ok := seenReplacements[mapping.Replacement]; ok { + return fmt.Errorf("duplicate replacement in traces.attribute_mappings: %q", mapping.Replacement) + } + seenReplacements[mapping.Replacement] = struct{}{} + } + return nil +} diff --git a/exporter/collector/config_test.go b/exporter/collector/config_test.go new file mode 100644 index 000000000..fdd60af8e --- /dev/null +++ b/exporter/collector/config_test.go @@ -0,0 +1,77 @@ +// Copyright 2022 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package collector + +import "testing" + +func TestValidateConfig(t *testing.T) { + for _, tc := range []struct { + desc string + input Config + expectedErr bool + }{ + { + desc: "Empty", + input: Config{}, + }, + { + desc: "Default", + input: DefaultConfig(), + }, + { + desc: "Duplicate attribute keys", + input: Config{ + TraceConfig: TraceConfig{ + AttributeMappings: []AttributeMapping{ + { + Key: "foo", + Replacement: "bar", + }, + { + Key: "foo", + Replacement: "baz", + }, + }, + }, + }, + expectedErr: true, + }, + { + desc: "Duplicate attribute replacements", + input: Config{ + TraceConfig: TraceConfig{ + AttributeMappings: []AttributeMapping{ + { + Key: "key1", + Replacement: "same", + }, + { + Key: "key2", + Replacement: "same", + }, + }, + }, + }, + expectedErr: true, + }, + } { + t.Run(tc.desc, func(t *testing.T) { + err := ValidateConfig(tc.input) + if (err != nil && !tc.expectedErr) || (err == nil && tc.expectedErr) { + t.Errorf("ValidateConfig(%v) = %v; want no error", tc.input, err) + } + }) + } +} diff --git a/exporter/collector/traces.go b/exporter/collector/traces.go index 84b39559c..a626de73f 100644 --- a/exporter/collector/traces.go +++ b/exporter/collector/traces.go @@ -27,6 +27,7 @@ import ( "go.opencensus.io/plugin/ocgrpc" "go.opencensus.io/stats/view" "go.opentelemetry.io/collector/model/pdata" + "go.opentelemetry.io/otel/attribute" sdktrace "go.opentelemetry.io/otel/sdk/trace" "golang.org/x/oauth2/google" "google.golang.org/api/option" @@ -104,6 +105,9 @@ func NewGoogleCloudTracesExporter(ctx context.Context, cfg Config, version strin cloudtrace.WithProjectID(cfg.ProjectID), cloudtrace.WithTimeout(timeout), } + if cfg.TraceConfig.AttributeMappings != nil { + topts = append(topts, cloudtrace.WithAttributeMapping(mappingFuncFromAKM(cfg.TraceConfig.AttributeMappings))) + } copts, err := generateClientOptions(&cfg.TraceConfig.ClientConfig, cfg.UserAgent) if err != nil { @@ -119,6 +123,22 @@ func NewGoogleCloudTracesExporter(ctx context.Context, cfg Config, version strin return &TraceExporter{texporter: exp}, nil } +func mappingFuncFromAKM(akm []AttributeMapping) func(attribute.Key) attribute.Key { + // convert list to map for easy lookups + mapFromConfig := make(map[string]string, len(akm)) + for _, mapping := range akm { + mapFromConfig[mapping.Key] = mapping.Replacement + } + return func(input attribute.Key) attribute.Key { + // if a replacement was specified in the config, use it. + if replacement, ok := mapFromConfig[string(input)]; ok { + return attribute.Key(replacement) + } + // otherwise, leave the attribute as-is + return input + } +} + // PushTraces calls texporter.ExportSpan for each span in the given traces func (te *TraceExporter) PushTraces(ctx context.Context, td pdata.Traces) error { resourceSpans := td.ResourceSpans() diff --git a/exporter/collector/traces_test.go b/exporter/collector/traces_test.go index b38d219fd..35f85691a 100644 --- a/exporter/collector/traces_test.go +++ b/exporter/collector/traces_test.go @@ -57,9 +57,10 @@ func (ts *testServer) CreateSpan(context.Context, *cloudtracepb.Span) (*cloudtra func TestGoogleCloudTraceExport(t *testing.T) { type testCase struct { - name string - cfg Config - expectedErr string + name string + cfg Config + expectedErr string + expectedServiceKey string } testCases := []testCase{ @@ -74,6 +75,21 @@ func TestGoogleCloudTraceExport(t *testing.T) { }, }, }, + expectedServiceKey: "g.co/gae/app/module", + }, + { + name: "With Empty Mapping", + cfg: Config{ + ProjectID: "idk", + TraceConfig: TraceConfig{ + ClientConfig: ClientConfig{ + Endpoint: "127.0.0.1:8080", + UseInsecure: true, + }, + AttributeMappings: []AttributeMapping{}, + }, + }, + expectedServiceKey: "service.name", }, } @@ -108,12 +124,15 @@ func TestGoogleCloudTraceExport(t *testing.T) { span := ispans.Spans().AppendEmpty() span.SetName(spanName) span.SetStartTimestamp(pdata.NewTimestampFromTime(testTime)) + span.Attributes().InsertString("service.name", "myservice") err = sde.PushTraces(ctx, traces) assert.NoError(t, err) r := <-reqCh assert.Len(t, r.Spans, 1) assert.Equal(t, spanName, r.Spans[0].GetDisplayName().Value) + _, ok := r.Spans[0].GetAttributes().GetAttributeMap()[test.expectedServiceKey] + assert.True(t, ok) assert.Equal(t, timestamppb.New(testTime), r.Spans[0].StartTime) }) } diff --git a/exporter/trace/cloudtrace.go b/exporter/trace/cloudtrace.go index 051c96603..3a77f0a0f 100644 --- a/exporter/trace/cloudtrace.go +++ b/exporter/trace/cloudtrace.go @@ -83,6 +83,9 @@ type options struct { // Timeout for all API calls. If not set, defaults to 5 seconds. Timeout time.Duration + + // mapAttribute maps otel attribute keys to cloud trace attribute keys + mapAttribute AttributeMapping } // WithProjectID sets Google Cloud Platform project as projectID. @@ -127,6 +130,19 @@ func WithTimeout(t time.Duration) func(o *options) { } } +// AttributeMapping determines how to map from OpenTelemetry span attribute keys to +// cloud trace attribute keys. +type AttributeMapping func(attribute.Key) attribute.Key + +// WithAttributeMapping configures how to map OpenTelemetry span attributes +// to google cloud trace span attributes. By default, it maps to attributes +// that are used prominently in the trace UI. +func WithAttributeMapping(mapping AttributeMapping) func(o *options) { + return func(o *options) { + o.mapAttribute = mapping + } +} + func (o *options) handleError(err error) { if o.errorHandler != nil { o.errorHandler.Handle(err) @@ -148,7 +164,10 @@ type Exporter struct { // New creates a new Exporter thats implements trace.Exporter. func New(opts ...Option) (*Exporter, error) { - o := options{Context: context.Background()} + o := options{ + Context: context.Background(), + mapAttribute: defaultAttributeMapping, + } for _, opt := range opts { opt(&o) } diff --git a/exporter/trace/trace.go b/exporter/trace/trace.go index e9b007c12..f8a9eb7e1 100644 --- a/exporter/trace/trace.go +++ b/exporter/trace/trace.go @@ -65,7 +65,7 @@ func (e *traceExporter) ExportSpans(ctx context.Context, spanData []sdktrace.Rea // ConvertSpan converts a ReadOnlySpan to Stackdriver Trace. func (e *traceExporter) ConvertSpan(_ context.Context, sd sdktrace.ReadOnlySpan) *tracepb.Span { - return protoFromReadOnlySpan(sd, e.projectID) + return e.protoFromReadOnlySpan(sd, e.projectID) } func (e *traceExporter) Shutdown(ctx context.Context) error { diff --git a/exporter/trace/trace_proto.go b/exporter/trace/trace_proto.go index 214b7e2c9..76fbb4881 100644 --- a/exporter/trace/trace_proto.go +++ b/exporter/trace/trace_proto.go @@ -87,7 +87,7 @@ func attributeWithLabelsFromResources(sd sdktrace.ReadOnlySpan) []attribute.KeyV return attributes } -func protoFromReadOnlySpan(s sdktrace.ReadOnlySpan, projectID string) *tracepb.Span { +func (e *traceExporter) protoFromReadOnlySpan(s sdktrace.ReadOnlySpan, projectID string) *tracepb.Span { if s == nil { return nil } @@ -118,20 +118,20 @@ func protoFromReadOnlySpan(s sdktrace.ReadOnlySpan, projectID string) *tracepb.S } attributes := attributeWithLabelsFromResources(s) - copyAttributes(&sp.Attributes, attributes) + e.copyAttributes(&sp.Attributes, attributes) // NOTE(ymotongpoo): omitting copyMonitoringReesourceAttributes() var annotations, droppedAnnotationsCount int es := s.Events() - for i, e := range es { + for i, ev := range es { if annotations >= maxAnnotationEventsPerSpan { droppedAnnotationsCount = len(es) - i break } - annotation := &tracepb.Span_TimeEvent_Annotation{Description: trunc(e.Name, maxAttributeStringValue)} - copyAttributes(&annotation.Attributes, e.Attributes) + annotation := &tracepb.Span_TimeEvent_Annotation{Description: trunc(ev.Name, maxAttributeStringValue)} + e.copyAttributes(&annotation.Attributes, ev.Attributes) event := &tracepb.Span_TimeEvent{ - Time: timestampProto(e.Time), + Time: timestampProto(ev.Time), Value: &tracepb.Span_TimeEvent_Annotation_{Annotation: annotation}, } annotations++ @@ -171,7 +171,7 @@ func protoFromReadOnlySpan(s sdktrace.ReadOnlySpan, projectID string) *tracepb.S sp.TimeEvents.DroppedAnnotationsCount = clip32(droppedAnnotationsCount) } - sp.Links = linksProtoFromLinks(s.Links()) + sp.Links = e.linksProtoFromLinks(s.Links()) return sp } @@ -179,7 +179,7 @@ func protoFromReadOnlySpan(s sdktrace.ReadOnlySpan, projectID string) *tracepb.S // Converts OTel span links to Cloud Trace links proto in order. If there are // more than maxNumLinks links, the first maxNumLinks will be taken and the rest // dropped. -func linksProtoFromLinks(links []sdktrace.Link) *tracepb.Span_Links { +func (e *traceExporter) linksProtoFromLinks(links []sdktrace.Link) *tracepb.Span_Links { numLinks := len(links) if numLinks == 0 { return nil @@ -197,7 +197,7 @@ func linksProtoFromLinks(links []sdktrace.Link) *tracepb.Span_Links { SpanId: link.SpanContext.SpanID().String(), Type: tracepb.Span_Link_TYPE_UNSPECIFIED, } - copyAttributes(&linkPb.Attributes, link.Attributes) + e.copyAttributes(&linkPb.Attributes, link.Attributes) linksPb.Link = append(linksPb.Link, linkPb) } linksPb.DroppedLinksCount = clip32(numLinks - numLinksToKeep) @@ -215,7 +215,7 @@ func timestampProto(t time.Time) *timestamppb.Timestamp { // copyAttributes copies a map of attributes to a proto map field. // It creates the map if it is nil. -func copyAttributes(out **tracepb.Span_Attributes, in []attribute.KeyValue) { +func (e *traceExporter) copyAttributes(out **tracepb.Span_Attributes, in []attribute.KeyValue) { if len(in) == 0 { return } @@ -231,30 +231,36 @@ func copyAttributes(out **tracepb.Span_Attributes, in []attribute.KeyValue) { if av == nil { continue } - switch kv.Key { - case pathAttribute: - (*out).AttributeMap[labelHTTPPath] = av - case hostAttribute: - (*out).AttributeMap[labelHTTPHost] = av - case methodAttribute: - (*out).AttributeMap[labelHTTPMethod] = av - case userAgentAttribute: - (*out).AttributeMap[labelHTTPUserAgent] = av - case statusCodeAttribute: - (*out).AttributeMap[labelHTTPStatusCode] = av - case serviceAttribute: - (*out).AttributeMap[labelService] = av - default: - if len(kv.Key) > 128 { - dropped++ - continue - } - (*out).AttributeMap[string(kv.Key)] = av + key := e.o.mapAttribute(kv.Key) + if len(key) > 128 { + dropped++ + continue } + (*out).AttributeMap[string(key)] = av } (*out).DroppedAttributesCount = dropped } +// defaultAttributeMapping maps attributes to trace attributes which are +// used by cloud trace for prominent UI functions, and keeps all others. +func defaultAttributeMapping(k attribute.Key) attribute.Key { + switch k { + case pathAttribute: + return labelHTTPPath + case hostAttribute: + return labelHTTPHost + case methodAttribute: + return labelHTTPMethod + case userAgentAttribute: + return labelHTTPUserAgent + case statusCodeAttribute: + return labelHTTPStatusCode + case serviceAttribute: + return labelService + } + return k +} + func attributeValue(keyValue attribute.KeyValue) *tracepb.AttributeValue { v := keyValue.Value switch v.Type() { diff --git a/exporter/trace/trace_proto_test.go b/exporter/trace/trace_proto_test.go index 66b2d2599..a624f26bd 100644 --- a/exporter/trace/trace_proto_test.go +++ b/exporter/trace/trace_proto_test.go @@ -25,6 +25,15 @@ import ( tracepb "google.golang.org/genproto/googleapis/devtools/cloudtrace/v2" ) +func testExporter() *traceExporter { + return &traceExporter{ + o: &options{ + Context: context.Background(), + mapAttribute: defaultAttributeMapping, + }, + } +} + func genSpanContext() trace.SpanContext { tracer := sdktrace.NewTracerProvider().Tracer("") _, span := tracer.Start(context.Background(), "") @@ -33,10 +42,12 @@ func genSpanContext() trace.SpanContext { func TestTraceProto_linksProtoFromLinks(t *testing.T) { t.Run("Should be nil when no links", func(t *testing.T) { - assert.Nil(t, linksProtoFromLinks([]sdktrace.Link{})) + e := testExporter() + assert.Nil(t, e.linksProtoFromLinks([]sdktrace.Link{})) }) t.Run("Can convert one link", func(t *testing.T) { + e := testExporter() spanContext := genSpanContext() link := sdktrace.Link{ SpanContext: spanContext, @@ -44,7 +55,7 @@ func TestTraceProto_linksProtoFromLinks(t *testing.T) { attribute.String("hello", "world"), }, } - linksPb := linksProtoFromLinks([]sdktrace.Link{link}) + linksPb := e.linksProtoFromLinks([]sdktrace.Link{link}) assert.NotNil(t, linksPb) assert.EqualValues(t, linksPb.DroppedLinksCount, 0) @@ -62,6 +73,7 @@ func TestTraceProto_linksProtoFromLinks(t *testing.T) { }) t.Run("Drops links when there are more than 128", func(t *testing.T) { + e := testExporter() var links []sdktrace.Link for i := 0; i < 148; i++ { links = append( @@ -73,7 +85,7 @@ func TestTraceProto_linksProtoFromLinks(t *testing.T) { }, }) } - linksPb := linksProtoFromLinks(links) + linksPb := e.linksProtoFromLinks(links) assert.NotNil(t, linksPb) assert.EqualValues(t, linksPb.DroppedLinksCount, 20) assert.Len(t, linksPb.Link, maxNumLinks)