From 41fa6ff846f18f907dee3f17bb743dae2553f99b Mon Sep 17 00:00:00 2001 From: Georg Pirklbauer Date: Tue, 6 Aug 2024 16:44:38 +0200 Subject: [PATCH] [config] Implement temporality preference option (#10796) #### Description adds temporality preference option for internal telemetry exported via OTLP #### Link to tracking issue (partly) fixes #10745 #### Testing unit tests #### Documentation Feature is already documented, but not implemented yet. --------- Co-authored-by: Alex Boten <223565+codeboten@users.noreply.github.com> --- .chloggen/int-telemetry-otlp-options.yaml | 25 ++ service/internal/proctelemetry/config.go | 51 ++++ service/internal/proctelemetry/config_test.go | 246 ++++++++++++++++-- 3 files changed, 301 insertions(+), 21 deletions(-) create mode 100644 .chloggen/int-telemetry-otlp-options.yaml diff --git a/.chloggen/int-telemetry-otlp-options.yaml b/.chloggen/int-telemetry-otlp-options.yaml new file mode 100644 index 00000000000..008977eeb6f --- /dev/null +++ b/.chloggen/int-telemetry-otlp-options.yaml @@ -0,0 +1,25 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: bug_fix + +# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver) +component: service + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: "Implement the `temporality_preference` setting for internal telemetry exported via OTLP" + +# One or more tracking issues or pull requests related to the change +issues: [ 10745 ] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [ user ] diff --git a/service/internal/proctelemetry/config.go b/service/internal/proctelemetry/config.go index b2d7ae19cb7..14e533344d0 100644 --- a/service/internal/proctelemetry/config.go +++ b/service/internal/proctelemetry/config.go @@ -25,6 +25,7 @@ import ( "go.opentelemetry.io/otel/exporters/stdout/stdoutmetric" "go.opentelemetry.io/otel/sdk/instrumentation" sdkmetric "go.opentelemetry.io/otel/sdk/metric" + "go.opentelemetry.io/otel/sdk/metric/metricdata" "go.opentelemetry.io/otel/sdk/resource" "go.opentelemetry.io/collector/processor/processorhelper" @@ -252,6 +253,18 @@ func initOTLPgRPCExporter(ctx context.Context, otlpConfig *config.OTLPMetric) (s if len(otlpConfig.Headers) > 0 { opts = append(opts, otlpmetricgrpc.WithHeaders(otlpConfig.Headers)) } + if otlpConfig.TemporalityPreference != nil { + switch *otlpConfig.TemporalityPreference { + case "delta": + opts = append(opts, otlpmetricgrpc.WithTemporalitySelector(temporalityPreferenceDelta)) + case "cumulative": + opts = append(opts, otlpmetricgrpc.WithTemporalitySelector(temporalityPreferenceCumulative)) + case "lowmemory": + opts = append(opts, otlpmetricgrpc.WithTemporalitySelector(temporalityPreferenceLowMemory)) + default: + return nil, fmt.Errorf("unsupported temporality preference %q", *otlpConfig.TemporalityPreference) + } + } return otlpmetricgrpc.New(ctx, opts...) } @@ -289,6 +302,44 @@ func initOTLPHTTPExporter(ctx context.Context, otlpConfig *config.OTLPMetric) (s if len(otlpConfig.Headers) > 0 { opts = append(opts, otlpmetrichttp.WithHeaders(otlpConfig.Headers)) } + if otlpConfig.TemporalityPreference != nil { + switch *otlpConfig.TemporalityPreference { + case "delta": + opts = append(opts, otlpmetrichttp.WithTemporalitySelector(temporalityPreferenceDelta)) + case "cumulative": + opts = append(opts, otlpmetrichttp.WithTemporalitySelector(temporalityPreferenceCumulative)) + case "lowmemory": + opts = append(opts, otlpmetrichttp.WithTemporalitySelector(temporalityPreferenceLowMemory)) + default: + return nil, fmt.Errorf("unsupported temporality preference %q", *otlpConfig.TemporalityPreference) + } + } return otlpmetrichttp.New(ctx, opts...) } + +func temporalityPreferenceCumulative(_ sdkmetric.InstrumentKind) metricdata.Temporality { + return metricdata.CumulativeTemporality +} + +func temporalityPreferenceDelta(ik sdkmetric.InstrumentKind) metricdata.Temporality { + switch ik { + case sdkmetric.InstrumentKindCounter, sdkmetric.InstrumentKindObservableCounter, sdkmetric.InstrumentKindHistogram: + return metricdata.DeltaTemporality + case sdkmetric.InstrumentKindObservableUpDownCounter, sdkmetric.InstrumentKindUpDownCounter: + return metricdata.CumulativeTemporality + default: + return metricdata.DeltaTemporality + } +} + +func temporalityPreferenceLowMemory(ik sdkmetric.InstrumentKind) metricdata.Temporality { + switch ik { + case sdkmetric.InstrumentKindCounter, sdkmetric.InstrumentKindHistogram: + return metricdata.DeltaTemporality + case sdkmetric.InstrumentKindObservableCounter, sdkmetric.InstrumentKindObservableUpDownCounter, sdkmetric.InstrumentKindUpDownCounter: + return metricdata.CumulativeTemporality + default: + return metricdata.DeltaTemporality + } +} diff --git a/service/internal/proctelemetry/config_test.go b/service/internal/proctelemetry/config_test.go index 5228b0ead44..d0560ac9c8c 100644 --- a/service/internal/proctelemetry/config_test.go +++ b/service/internal/proctelemetry/config_test.go @@ -7,10 +7,17 @@ import ( "context" "errors" "net/url" + "reflect" "testing" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "go.opentelemetry.io/contrib/config" + "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc" + "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp" + otelprom "go.opentelemetry.io/otel/exporters/prometheus" + "go.opentelemetry.io/otel/exporters/stdout/stdoutmetric" + sdkmetric "go.opentelemetry.io/otel/sdk/metric" ) func strPtr(s string) *string { @@ -22,15 +29,28 @@ func intPtr(i int) *int { } func TestMetricReader(t *testing.T) { + consoleExporter, err := stdoutmetric.New( + stdoutmetric.WithPrettyPrint(), + ) + require.NoError(t, err) + ctx := context.Background() + otlpGRPCExporter, err := otlpmetricgrpc.New(ctx) + require.NoError(t, err) + otlpHTTPExporter, err := otlpmetrichttp.New(ctx) + require.NoError(t, err) + promExporter, err := otelprom.New() + require.NoError(t, err) + testCases := []struct { - name string - reader config.MetricReader - args any - err error + name string + reader config.MetricReader + args any + wantErr error + wantReader sdkmetric.Reader }{ { - name: "noreader", - err: errors.New("unsupported metric reader type { }"), + name: "noreader", + wantErr: errors.New("unsupported metric reader type { }"), }, { name: "pull prometheus invalid exporter", @@ -41,7 +61,7 @@ func TestMetricReader(t *testing.T) { }, }, }, - err: errNoValidMetricExporter, + wantErr: errNoValidMetricExporter, }, { name: "pull/prometheus-invalid-config-no-host", @@ -52,7 +72,7 @@ func TestMetricReader(t *testing.T) { }, }, }, - err: errors.New("host must be specified"), + wantErr: errors.New("host must be specified"), }, { name: "pull/prometheus-invalid-config-no-port", @@ -65,10 +85,10 @@ func TestMetricReader(t *testing.T) { }, }, }, - err: errors.New("port must be specified"), + wantErr: errors.New("port must be specified"), }, { - name: "pull/prometheus-invalid-config-no-port", + name: "pull/prometheus-valid", reader: config.MetricReader{ Pull: &config.PullMetricReader{ Exporter: config.MetricExporter{ @@ -79,6 +99,7 @@ func TestMetricReader(t *testing.T) { }, }, }, + wantReader: promExporter, }, { name: "periodic/invalid-exporter", @@ -92,14 +113,14 @@ func TestMetricReader(t *testing.T) { }, }, }, - err: errNoValidMetricExporter, + wantErr: errNoValidMetricExporter, }, { name: "periodic/no-exporter", reader: config.MetricReader{ Periodic: &config.PeriodicMetricReader{}, }, - err: errNoValidMetricExporter, + wantErr: errNoValidMetricExporter, }, { name: "periodic/console-exporter", @@ -110,6 +131,7 @@ func TestMetricReader(t *testing.T) { }, }, }, + wantReader: sdkmetric.NewPeriodicReader(consoleExporter), }, { name: "periodic/console-exporter-with-timeout-interval", @@ -122,6 +144,7 @@ func TestMetricReader(t *testing.T) { }, }, }, + wantReader: sdkmetric.NewPeriodicReader(consoleExporter), }, { name: "periodic/otlp-exporter-invalid-protocol", @@ -134,7 +157,7 @@ func TestMetricReader(t *testing.T) { }, }, }, - err: errors.New("unsupported protocol http/invalid"), + wantErr: errors.New("unsupported protocol http/invalid"), }, { name: "periodic/otlp-grpc-exporter-no-endpoint", @@ -152,6 +175,7 @@ func TestMetricReader(t *testing.T) { }, }, }, + wantReader: sdkmetric.NewPeriodicReader(otlpGRPCExporter), }, { name: "periodic/otlp-grpc-exporter", @@ -170,6 +194,7 @@ func TestMetricReader(t *testing.T) { }, }, }, + wantReader: sdkmetric.NewPeriodicReader(otlpGRPCExporter), }, { name: "periodic/otlp-grpc-exporter-no-scheme", @@ -188,6 +213,7 @@ func TestMetricReader(t *testing.T) { }, }, }, + wantReader: sdkmetric.NewPeriodicReader(otlpGRPCExporter), }, { name: "periodic/otlp-grpc-invalid-endpoint", @@ -206,7 +232,7 @@ func TestMetricReader(t *testing.T) { }, }, }, - err: &url.Error{Op: "parse", URL: "http:// ", Err: url.InvalidHostError(" ")}, + wantErr: &url.Error{Op: "parse", URL: "http:// ", Err: url.InvalidHostError(" ")}, }, { name: "periodic/otlp-grpc-invalid-compression", @@ -225,7 +251,87 @@ func TestMetricReader(t *testing.T) { }, }, }, - err: errors.New("unsupported compression \"invalid\""), + wantErr: errors.New("unsupported compression \"invalid\""), + }, + { + name: "periodic/otlp-grpc-delta-temporality", + reader: config.MetricReader{ + Periodic: &config.PeriodicMetricReader{ + Exporter: config.MetricExporter{ + OTLP: &config.OTLPMetric{ + Protocol: "grpc/protobuf", + Endpoint: "localhost:4318", + Compression: strPtr("none"), + Timeout: intPtr(1000), + Headers: map[string]string{ + "test": "test1", + }, + TemporalityPreference: strPtr("delta"), + }, + }, + }, + }, + wantReader: sdkmetric.NewPeriodicReader(otlpGRPCExporter), + }, + { + name: "periodic/otlp-grpc-cumulative-temporality", + reader: config.MetricReader{ + Periodic: &config.PeriodicMetricReader{ + Exporter: config.MetricExporter{ + OTLP: &config.OTLPMetric{ + Protocol: "grpc/protobuf", + Endpoint: "localhost:4318", + Compression: strPtr("none"), + Timeout: intPtr(1000), + Headers: map[string]string{ + "test": "test1", + }, + TemporalityPreference: strPtr("cumulative"), + }, + }, + }, + }, + wantReader: sdkmetric.NewPeriodicReader(otlpGRPCExporter), + }, + { + name: "periodic/otlp-grpc-lowmemory-temporality", + reader: config.MetricReader{ + Periodic: &config.PeriodicMetricReader{ + Exporter: config.MetricExporter{ + OTLP: &config.OTLPMetric{ + Protocol: "grpc/protobuf", + Endpoint: "localhost:4318", + Compression: strPtr("none"), + Timeout: intPtr(1000), + Headers: map[string]string{ + "test": "test1", + }, + TemporalityPreference: strPtr("lowmemory"), + }, + }, + }, + }, + wantReader: sdkmetric.NewPeriodicReader(otlpGRPCExporter), + }, + { + name: "periodic/otlp-grpc-invalid-temporality", + reader: config.MetricReader{ + Periodic: &config.PeriodicMetricReader{ + Exporter: config.MetricExporter{ + OTLP: &config.OTLPMetric{ + Protocol: "grpc/protobuf", + Endpoint: "localhost:4318", + Compression: strPtr("none"), + Timeout: intPtr(1000), + Headers: map[string]string{ + "test": "test1", + }, + TemporalityPreference: strPtr("invalid"), + }, + }, + }, + }, + wantErr: errors.New("unsupported temporality preference \"invalid\""), }, { name: "periodic/otlp-http-exporter", @@ -244,6 +350,7 @@ func TestMetricReader(t *testing.T) { }, }, }, + wantReader: sdkmetric.NewPeriodicReader(otlpHTTPExporter), }, { name: "periodic/otlp-http-exporter-with-path", @@ -262,6 +369,7 @@ func TestMetricReader(t *testing.T) { }, }, }, + wantReader: sdkmetric.NewPeriodicReader(otlpHTTPExporter), }, { name: "periodic/otlp-http-exporter-no-endpoint", @@ -279,6 +387,7 @@ func TestMetricReader(t *testing.T) { }, }, }, + wantReader: sdkmetric.NewPeriodicReader(otlpHTTPExporter), }, { name: "periodic/otlp-http-exporter-no-scheme", @@ -297,6 +406,7 @@ func TestMetricReader(t *testing.T) { }, }, }, + wantReader: sdkmetric.NewPeriodicReader(otlpHTTPExporter), }, { name: "periodic/otlp-http-invalid-endpoint", @@ -315,7 +425,7 @@ func TestMetricReader(t *testing.T) { }, }, }, - err: &url.Error{Op: "parse", URL: "http:// ", Err: url.InvalidHostError(" ")}, + wantErr: &url.Error{Op: "parse", URL: "http:// ", Err: url.InvalidHostError(" ")}, }, { name: "periodic/otlp-http-invalid-compression", @@ -334,21 +444,115 @@ func TestMetricReader(t *testing.T) { }, }, }, - err: errors.New("unsupported compression \"invalid\""), + wantErr: errors.New("unsupported compression \"invalid\""), + }, + { + name: "periodic/otlp-http-cumulative-temporality", + reader: config.MetricReader{ + Periodic: &config.PeriodicMetricReader{ + Exporter: config.MetricExporter{ + OTLP: &config.OTLPMetric{ + Protocol: "http/protobuf", + Endpoint: "localhost:4318", + Compression: strPtr("none"), + Timeout: intPtr(1000), + Headers: map[string]string{ + "test": "test1", + }, + TemporalityPreference: strPtr("cumulative"), + }, + }, + }, + }, + wantReader: sdkmetric.NewPeriodicReader(otlpHTTPExporter), + }, + { + name: "periodic/otlp-http-lowmemory-temporality", + reader: config.MetricReader{ + Periodic: &config.PeriodicMetricReader{ + Exporter: config.MetricExporter{ + OTLP: &config.OTLPMetric{ + Protocol: "http/protobuf", + Endpoint: "localhost:4318", + Compression: strPtr("none"), + Timeout: intPtr(1000), + Headers: map[string]string{ + "test": "test1", + }, + TemporalityPreference: strPtr("lowmemory"), + }, + }, + }, + }, + wantReader: sdkmetric.NewPeriodicReader(otlpHTTPExporter), + }, + { + name: "periodic/otlp-http-delta-temporality", + reader: config.MetricReader{ + Periodic: &config.PeriodicMetricReader{ + Exporter: config.MetricExporter{ + OTLP: &config.OTLPMetric{ + Protocol: "http/protobuf", + Endpoint: "localhost:4318", + Compression: strPtr("none"), + Timeout: intPtr(1000), + Headers: map[string]string{ + "test": "test1", + }, + TemporalityPreference: strPtr("delta"), + }, + }, + }, + }, + wantReader: sdkmetric.NewPeriodicReader(otlpHTTPExporter), + }, + { + name: "periodic/otlp-http-invalid-temporality", + reader: config.MetricReader{ + Periodic: &config.PeriodicMetricReader{ + Exporter: config.MetricExporter{ + OTLP: &config.OTLPMetric{ + Protocol: "http/protobuf", + Endpoint: "localhost:4318", + Compression: strPtr("none"), + Timeout: intPtr(1000), + Headers: map[string]string{ + "test": "test1", + }, + TemporalityPreference: strPtr("invalid"), + }, + }, + }, + }, + wantErr: errors.New("unsupported temporality preference \"invalid\""), }, } for _, tt := range testCases { t.Run(tt.name, func(t *testing.T) { - reader, server, err := InitMetricReader(context.Background(), tt.reader, make(chan error)) + gotReader, server, err := InitMetricReader(context.Background(), tt.reader, make(chan error)) + defer func() { - if reader != nil { - assert.NoError(t, reader.Shutdown(context.Background())) + if gotReader != nil { + assert.NoError(t, gotReader.Shutdown(context.Background())) } if server != nil { assert.NoError(t, server.Shutdown(context.Background())) } }() - assert.Equal(t, tt.err, err) + + assert.Equal(t, tt.wantErr, err) + + if tt.wantReader == nil { + assert.Nil(t, gotReader) + } else { + assert.Equal(t, reflect.TypeOf(tt.wantReader), reflect.TypeOf(gotReader)) + + if reflect.TypeOf(tt.wantReader).String() == "*metric.PeriodicReader" { + wantExporterType := reflect.Indirect(reflect.ValueOf(tt.wantReader)).FieldByName("exporter").Elem().Type() + gotExporterType := reflect.Indirect(reflect.ValueOf(gotReader)).FieldByName("exporter").Elem().Type() + assert.Equal(t, wantExporterType, gotExporterType) + } + } }) } }