Skip to content

Commit

Permalink
Add attribute mapping function (GoogleCloudPlatform#324)
Browse files Browse the repository at this point in the history
* add attribute mapping function and collector configuration

Co-authored-by: Punya Biswal <[email protected]>
  • Loading branch information
dashpole and punya authored Mar 10, 2022
1 parent d88605e commit 246dad2
Show file tree
Hide file tree
Showing 8 changed files with 220 additions and 37 deletions.
30 changes: 30 additions & 0 deletions exporter/collector/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package collector

import (
"fmt"
"time"

"google.golang.org/api/option"
Expand Down Expand Up @@ -48,6 +49,18 @@ type ClientConfig struct {

type TraceConfig struct {
ClientConfig ClientConfig `mapstructure:",squash"`
// AttributeMappings determines how to map from OpenTelemetry attribute
// keys to Google Cloud Trace keys. By default, it changes http and
// service keys so that they appear more prominently in the UI.
AttributeMappings []AttributeMapping `mapstructure:"attribute_mappings"`
}

// AttributeMapping maps from an OpenTelemetry key to a Google Cloud Trace key.
type AttributeMapping struct {
// Key is the OpenTelemetry attribute key
Key string `mapstructure:"key"`
// Replacement is the attribute sent to Google Cloud Trace
Replacement string `mapstructure:"replacement"`
}

type MetricConfig struct {
Expand Down Expand Up @@ -103,3 +116,20 @@ func DefaultConfig() Config {
},
}
}

// ValidateConfig returns an error if the provided configuration is invalid
func ValidateConfig(cfg Config) error {
seenKeys := make(map[string]struct{}, len(cfg.TraceConfig.AttributeMappings))
seenReplacements := make(map[string]struct{}, len(cfg.TraceConfig.AttributeMappings))
for _, mapping := range cfg.TraceConfig.AttributeMappings {
if _, ok := seenKeys[mapping.Key]; ok {
return fmt.Errorf("duplicate key in traces.attribute_mappings: %q", mapping.Key)
}
seenKeys[mapping.Key] = struct{}{}
if _, ok := seenReplacements[mapping.Replacement]; ok {
return fmt.Errorf("duplicate replacement in traces.attribute_mappings: %q", mapping.Replacement)
}
seenReplacements[mapping.Replacement] = struct{}{}
}
return nil
}
77 changes: 77 additions & 0 deletions exporter/collector/config_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
// Copyright 2022 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package collector

import "testing"

func TestValidateConfig(t *testing.T) {
for _, tc := range []struct {
desc string
input Config
expectedErr bool
}{
{
desc: "Empty",
input: Config{},
},
{
desc: "Default",
input: DefaultConfig(),
},
{
desc: "Duplicate attribute keys",
input: Config{
TraceConfig: TraceConfig{
AttributeMappings: []AttributeMapping{
{
Key: "foo",
Replacement: "bar",
},
{
Key: "foo",
Replacement: "baz",
},
},
},
},
expectedErr: true,
},
{
desc: "Duplicate attribute replacements",
input: Config{
TraceConfig: TraceConfig{
AttributeMappings: []AttributeMapping{
{
Key: "key1",
Replacement: "same",
},
{
Key: "key2",
Replacement: "same",
},
},
},
},
expectedErr: true,
},
} {
t.Run(tc.desc, func(t *testing.T) {
err := ValidateConfig(tc.input)
if (err != nil && !tc.expectedErr) || (err == nil && tc.expectedErr) {
t.Errorf("ValidateConfig(%v) = %v; want no error", tc.input, err)
}
})
}
}
20 changes: 20 additions & 0 deletions exporter/collector/traces.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"go.opencensus.io/plugin/ocgrpc"
"go.opencensus.io/stats/view"
"go.opentelemetry.io/collector/model/pdata"
"go.opentelemetry.io/otel/attribute"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
"golang.org/x/oauth2/google"
"google.golang.org/api/option"
Expand Down Expand Up @@ -104,6 +105,9 @@ func NewGoogleCloudTracesExporter(ctx context.Context, cfg Config, version strin
cloudtrace.WithProjectID(cfg.ProjectID),
cloudtrace.WithTimeout(timeout),
}
if cfg.TraceConfig.AttributeMappings != nil {
topts = append(topts, cloudtrace.WithAttributeMapping(mappingFuncFromAKM(cfg.TraceConfig.AttributeMappings)))
}

copts, err := generateClientOptions(&cfg.TraceConfig.ClientConfig, cfg.UserAgent)
if err != nil {
Expand All @@ -119,6 +123,22 @@ func NewGoogleCloudTracesExporter(ctx context.Context, cfg Config, version strin
return &TraceExporter{texporter: exp}, nil
}

func mappingFuncFromAKM(akm []AttributeMapping) func(attribute.Key) attribute.Key {
// convert list to map for easy lookups
mapFromConfig := make(map[string]string, len(akm))
for _, mapping := range akm {
mapFromConfig[mapping.Key] = mapping.Replacement
}
return func(input attribute.Key) attribute.Key {
// if a replacement was specified in the config, use it.
if replacement, ok := mapFromConfig[string(input)]; ok {
return attribute.Key(replacement)
}
// otherwise, leave the attribute as-is
return input
}
}

// PushTraces calls texporter.ExportSpan for each span in the given traces
func (te *TraceExporter) PushTraces(ctx context.Context, td pdata.Traces) error {
resourceSpans := td.ResourceSpans()
Expand Down
25 changes: 22 additions & 3 deletions exporter/collector/traces_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,10 @@ func (ts *testServer) CreateSpan(context.Context, *cloudtracepb.Span) (*cloudtra

func TestGoogleCloudTraceExport(t *testing.T) {
type testCase struct {
name string
cfg Config
expectedErr string
name string
cfg Config
expectedErr string
expectedServiceKey string
}

testCases := []testCase{
Expand All @@ -74,6 +75,21 @@ func TestGoogleCloudTraceExport(t *testing.T) {
},
},
},
expectedServiceKey: "g.co/gae/app/module",
},
{
name: "With Empty Mapping",
cfg: Config{
ProjectID: "idk",
TraceConfig: TraceConfig{
ClientConfig: ClientConfig{
Endpoint: "127.0.0.1:8080",
UseInsecure: true,
},
AttributeMappings: []AttributeMapping{},
},
},
expectedServiceKey: "service.name",
},
}

Expand Down Expand Up @@ -108,12 +124,15 @@ func TestGoogleCloudTraceExport(t *testing.T) {
span := ispans.Spans().AppendEmpty()
span.SetName(spanName)
span.SetStartTimestamp(pdata.NewTimestampFromTime(testTime))
span.Attributes().InsertString("service.name", "myservice")
err = sde.PushTraces(ctx, traces)
assert.NoError(t, err)

r := <-reqCh
assert.Len(t, r.Spans, 1)
assert.Equal(t, spanName, r.Spans[0].GetDisplayName().Value)
_, ok := r.Spans[0].GetAttributes().GetAttributeMap()[test.expectedServiceKey]
assert.True(t, ok)
assert.Equal(t, timestamppb.New(testTime), r.Spans[0].StartTime)
})
}
Expand Down
21 changes: 20 additions & 1 deletion exporter/trace/cloudtrace.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,9 @@ type options struct {

// Timeout for all API calls. If not set, defaults to 5 seconds.
Timeout time.Duration

// mapAttribute maps otel attribute keys to cloud trace attribute keys
mapAttribute AttributeMapping
}

// WithProjectID sets Google Cloud Platform project as projectID.
Expand Down Expand Up @@ -127,6 +130,19 @@ func WithTimeout(t time.Duration) func(o *options) {
}
}

// AttributeMapping determines how to map from OpenTelemetry span attribute keys to
// cloud trace attribute keys.
type AttributeMapping func(attribute.Key) attribute.Key

// WithAttributeMapping configures how to map OpenTelemetry span attributes
// to google cloud trace span attributes. By default, it maps to attributes
// that are used prominently in the trace UI.
func WithAttributeMapping(mapping AttributeMapping) func(o *options) {
return func(o *options) {
o.mapAttribute = mapping
}
}

func (o *options) handleError(err error) {
if o.errorHandler != nil {
o.errorHandler.Handle(err)
Expand All @@ -148,7 +164,10 @@ type Exporter struct {

// New creates a new Exporter thats implements trace.Exporter.
func New(opts ...Option) (*Exporter, error) {
o := options{Context: context.Background()}
o := options{
Context: context.Background(),
mapAttribute: defaultAttributeMapping,
}
for _, opt := range opts {
opt(&o)
}
Expand Down
2 changes: 1 addition & 1 deletion exporter/trace/trace.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func (e *traceExporter) ExportSpans(ctx context.Context, spanData []sdktrace.Rea

// ConvertSpan converts a ReadOnlySpan to Stackdriver Trace.
func (e *traceExporter) ConvertSpan(_ context.Context, sd sdktrace.ReadOnlySpan) *tracepb.Span {
return protoFromReadOnlySpan(sd, e.projectID)
return e.protoFromReadOnlySpan(sd, e.projectID)
}

func (e *traceExporter) Shutdown(ctx context.Context) error {
Expand Down
64 changes: 35 additions & 29 deletions exporter/trace/trace_proto.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ func attributeWithLabelsFromResources(sd sdktrace.ReadOnlySpan) []attribute.KeyV
return attributes
}

func protoFromReadOnlySpan(s sdktrace.ReadOnlySpan, projectID string) *tracepb.Span {
func (e *traceExporter) protoFromReadOnlySpan(s sdktrace.ReadOnlySpan, projectID string) *tracepb.Span {
if s == nil {
return nil
}
Expand Down Expand Up @@ -118,20 +118,20 @@ func protoFromReadOnlySpan(s sdktrace.ReadOnlySpan, projectID string) *tracepb.S
}

attributes := attributeWithLabelsFromResources(s)
copyAttributes(&sp.Attributes, attributes)
e.copyAttributes(&sp.Attributes, attributes)
// NOTE(ymotongpoo): omitting copyMonitoringReesourceAttributes()

var annotations, droppedAnnotationsCount int
es := s.Events()
for i, e := range es {
for i, ev := range es {
if annotations >= maxAnnotationEventsPerSpan {
droppedAnnotationsCount = len(es) - i
break
}
annotation := &tracepb.Span_TimeEvent_Annotation{Description: trunc(e.Name, maxAttributeStringValue)}
copyAttributes(&annotation.Attributes, e.Attributes)
annotation := &tracepb.Span_TimeEvent_Annotation{Description: trunc(ev.Name, maxAttributeStringValue)}
e.copyAttributes(&annotation.Attributes, ev.Attributes)
event := &tracepb.Span_TimeEvent{
Time: timestampProto(e.Time),
Time: timestampProto(ev.Time),
Value: &tracepb.Span_TimeEvent_Annotation_{Annotation: annotation},
}
annotations++
Expand Down Expand Up @@ -171,15 +171,15 @@ func protoFromReadOnlySpan(s sdktrace.ReadOnlySpan, projectID string) *tracepb.S
sp.TimeEvents.DroppedAnnotationsCount = clip32(droppedAnnotationsCount)
}

sp.Links = linksProtoFromLinks(s.Links())
sp.Links = e.linksProtoFromLinks(s.Links())

return sp
}

// Converts OTel span links to Cloud Trace links proto in order. If there are
// more than maxNumLinks links, the first maxNumLinks will be taken and the rest
// dropped.
func linksProtoFromLinks(links []sdktrace.Link) *tracepb.Span_Links {
func (e *traceExporter) linksProtoFromLinks(links []sdktrace.Link) *tracepb.Span_Links {
numLinks := len(links)
if numLinks == 0 {
return nil
Expand All @@ -197,7 +197,7 @@ func linksProtoFromLinks(links []sdktrace.Link) *tracepb.Span_Links {
SpanId: link.SpanContext.SpanID().String(),
Type: tracepb.Span_Link_TYPE_UNSPECIFIED,
}
copyAttributes(&linkPb.Attributes, link.Attributes)
e.copyAttributes(&linkPb.Attributes, link.Attributes)
linksPb.Link = append(linksPb.Link, linkPb)
}
linksPb.DroppedLinksCount = clip32(numLinks - numLinksToKeep)
Expand All @@ -215,7 +215,7 @@ func timestampProto(t time.Time) *timestamppb.Timestamp {

// copyAttributes copies a map of attributes to a proto map field.
// It creates the map if it is nil.
func copyAttributes(out **tracepb.Span_Attributes, in []attribute.KeyValue) {
func (e *traceExporter) copyAttributes(out **tracepb.Span_Attributes, in []attribute.KeyValue) {
if len(in) == 0 {
return
}
Expand All @@ -231,30 +231,36 @@ func copyAttributes(out **tracepb.Span_Attributes, in []attribute.KeyValue) {
if av == nil {
continue
}
switch kv.Key {
case pathAttribute:
(*out).AttributeMap[labelHTTPPath] = av
case hostAttribute:
(*out).AttributeMap[labelHTTPHost] = av
case methodAttribute:
(*out).AttributeMap[labelHTTPMethod] = av
case userAgentAttribute:
(*out).AttributeMap[labelHTTPUserAgent] = av
case statusCodeAttribute:
(*out).AttributeMap[labelHTTPStatusCode] = av
case serviceAttribute:
(*out).AttributeMap[labelService] = av
default:
if len(kv.Key) > 128 {
dropped++
continue
}
(*out).AttributeMap[string(kv.Key)] = av
key := e.o.mapAttribute(kv.Key)
if len(key) > 128 {
dropped++
continue
}
(*out).AttributeMap[string(key)] = av
}
(*out).DroppedAttributesCount = dropped
}

// defaultAttributeMapping maps attributes to trace attributes which are
// used by cloud trace for prominent UI functions, and keeps all others.
func defaultAttributeMapping(k attribute.Key) attribute.Key {
switch k {
case pathAttribute:
return labelHTTPPath
case hostAttribute:
return labelHTTPHost
case methodAttribute:
return labelHTTPMethod
case userAgentAttribute:
return labelHTTPUserAgent
case statusCodeAttribute:
return labelHTTPStatusCode
case serviceAttribute:
return labelService
}
return k
}

func attributeValue(keyValue attribute.KeyValue) *tracepb.AttributeValue {
v := keyValue.Value
switch v.Type() {
Expand Down
Loading

0 comments on commit 246dad2

Please sign in to comment.