From a95f8d46f573221896120001269ba0246c799352 Mon Sep 17 00:00:00 2001 From: "Mateusz \"mat\" Rumian" <58699800+mat-rumian@users.noreply.github.com> Date: Tue, 13 Feb 2024 13:37:15 +0100 Subject: [PATCH] [feat][exporter/loadbalancingexporter] add benchmarks for metrics and traces consume (#30915) **Description:** According to https://github.com/open-telemetry/opentelemetry-collector-contrib/pull/30141#pullrequestreview-1843647046 PR adds benchmarks for functions: - `mergeTraces` - `mergeMetrics` - `consumeTraces` - `consumeMetrics` --- .chloggen/mat-rumian-add-lbe-benchmarks.yaml | 27 ++++++ .../loadbalancingexporter/helpers_test.go | 54 +++++++++++ .../metrics_exporter_test.go | 91 +++++++++++++++++++ .../trace_exporter_test.go | 83 +++++++++++++++++ 4 files changed, 255 insertions(+) create mode 100644 .chloggen/mat-rumian-add-lbe-benchmarks.yaml diff --git a/.chloggen/mat-rumian-add-lbe-benchmarks.yaml b/.chloggen/mat-rumian-add-lbe-benchmarks.yaml new file mode 100644 index 000000000000..6e205494522b --- /dev/null +++ b/.chloggen/mat-rumian-add-lbe-benchmarks.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: loadbalancingexporter + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: "Add benchmarks for Metrics and Traces" + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [30915] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [] diff --git a/exporter/loadbalancingexporter/helpers_test.go b/exporter/loadbalancingexporter/helpers_test.go index 768999b4ad4d..e8bda317d54b 100644 --- a/exporter/loadbalancingexporter/helpers_test.go +++ b/exporter/loadbalancingexporter/helpers_test.go @@ -119,3 +119,57 @@ func TestMergeMetrics(t *testing.T) { require.Equal(t, expectedMetrics, mergedMetrics) } + +func benchMergeTraces(b *testing.B, tracesCount int) { + traces1 := ptrace.NewTraces() + traces2 := ptrace.NewTraces() + + for i := 0; i < tracesCount; i++ { + appendSimpleTraceWithID(traces2.ResourceSpans().AppendEmpty(), [16]byte{1, 2, 3, 4}) + } + + b.ResetTimer() + + for i := 0; i < b.N; i++ { + mergeTraces(traces1, traces2) + } +} + +func BenchmarkMergeTraces_X100(b *testing.B) { + benchMergeTraces(b, 100) +} + +func BenchmarkMergeTraces_X500(b *testing.B) { + benchMergeTraces(b, 500) +} + +func BenchmarkMergeTraces_X1000(b *testing.B) { + benchMergeTraces(b, 1000) +} + +func benchMergeMetrics(b *testing.B, metricsCount int) { + metrics1 := pmetric.NewMetrics() + metrics2 := pmetric.NewMetrics() + + for i := 0; i < metricsCount; i++ { + appendSimpleMetricWithID(metrics2.ResourceMetrics().AppendEmpty(), "metrics-2") + } + + b.ResetTimer() + + for i := 0; i < b.N; i++ { + mergeMetrics(metrics1, metrics2) + } +} + +func BenchmarkMergeMetrics_X100(b *testing.B) { + benchMergeMetrics(b, 100) +} + +func BenchmarkMergeMetrics_X500(b *testing.B) { + benchMergeMetrics(b, 500) +} + +func BenchmarkMergeMetrics_X1000(b *testing.B) { + benchMergeMetrics(b, 1000) +} diff --git a/exporter/loadbalancingexporter/metrics_exporter_test.go b/exporter/loadbalancingexporter/metrics_exporter_test.go index 3163b96f1d41..4abc5c0717df 100644 --- a/exporter/loadbalancingexporter/metrics_exporter_test.go +++ b/exporter/loadbalancingexporter/metrics_exporter_test.go @@ -678,6 +678,97 @@ func TestRollingUpdatesWhenConsumeMetrics(t *testing.T) { require.Greater(t, counter2.Load(), int64(0)) } +func appendSimpleMetricWithServiceName(metric pmetric.Metrics, serviceName string, sigName string) { + metric.ResourceMetrics().EnsureCapacity(1) + rmetrics := metric.ResourceMetrics().AppendEmpty() + rmetrics.Resource().Attributes().PutStr(conventions.AttributeServiceName, serviceName) + rmetrics.ScopeMetrics().AppendEmpty().Metrics().AppendEmpty().SetName(sigName) +} + +func benchConsumeMetrics(b *testing.B, endpointsCount int, metricsCount int) { + sink := new(consumertest.MetricsSink) + componentFactory := func(ctx context.Context, endpoint string) (component.Component, error) { + return newMockMetricsExporter(sink.ConsumeMetrics), nil + } + + endpoints := []string{} + for i := 0; i < endpointsCount; i++ { + endpoints = append(endpoints, fmt.Sprintf("endpoint-%d", i)) + } + + config := &Config{ + Resolver: ResolverSettings{ + Static: &StaticResolver{Hostnames: endpoints}, + }, + } + + lb, err := newLoadBalancer(exportertest.NewNopCreateSettings(), config, componentFactory) + require.NotNil(b, lb) + require.NoError(b, err) + + p, err := newMetricsExporter(exportertest.NewNopCreateSettings(), config) + require.NotNil(b, p) + require.NoError(b, err) + + p.loadBalancer = lb + + err = p.Start(context.Background(), componenttest.NewNopHost()) + require.NoError(b, err) + + metric1 := pmetric.NewMetrics() + metric2 := pmetric.NewMetrics() + for i := 0; i < endpointsCount; i++ { + for j := 0; j < metricsCount/endpointsCount; j++ { + appendSimpleMetricWithServiceName(metric2, fmt.Sprintf("service-%d", i), fmt.Sprintf("sig-%d", i)) + } + } + simpleMetricsWithServiceName() + md := mergeMetrics(metric1, metric2) + + b.ResetTimer() + + for i := 0; i < b.N; i++ { + err = p.ConsumeMetrics(context.Background(), md) + require.NoError(b, err) + } + + b.StopTimer() + err = p.Shutdown(context.Background()) + require.NoError(b, err) +} + +func BenchmarkConsumeMetrics_1E100T(b *testing.B) { + benchConsumeMetrics(b, 1, 100) +} + +func BenchmarkConsumeMetrics_1E1000T(b *testing.B) { + benchConsumeMetrics(b, 1, 1000) +} + +func BenchmarkConsumeMetrics_5E100T(b *testing.B) { + benchConsumeMetrics(b, 5, 100) +} + +func BenchmarkConsumeMetrics_5E500T(b *testing.B) { + benchConsumeMetrics(b, 5, 500) +} + +func BenchmarkConsumeMetrics_5E1000T(b *testing.B) { + benchConsumeMetrics(b, 5, 1000) +} + +func BenchmarkConsumeMetrics_10E100T(b *testing.B) { + benchConsumeMetrics(b, 10, 100) +} + +func BenchmarkConsumeMetrics_10E500T(b *testing.B) { + benchConsumeMetrics(b, 10, 500) +} + +func BenchmarkConsumeMetrics_10E1000T(b *testing.B) { + benchConsumeMetrics(b, 10, 1000) +} + func endpoint2Config() *Config { return &Config{ Resolver: ResolverSettings{ diff --git a/exporter/loadbalancingexporter/trace_exporter_test.go b/exporter/loadbalancingexporter/trace_exporter_test.go index 350fe6168d89..1f83b59d9364 100644 --- a/exporter/loadbalancingexporter/trace_exporter_test.go +++ b/exporter/loadbalancingexporter/trace_exporter_test.go @@ -534,6 +534,89 @@ func TestRollingUpdatesWhenConsumeTraces(t *testing.T) { require.Greater(t, counter2.Load(), int64(0)) } +func benchConsumeTraces(b *testing.B, endpointsCount int, tracesCount int) { + sink := new(consumertest.TracesSink) + componentFactory := func(ctx context.Context, endpoint string) (component.Component, error) { + return newMockTracesExporter(sink.ConsumeTraces), nil + } + + endpoints := []string{} + for i := 0; i < endpointsCount; i++ { + endpoints = append(endpoints, fmt.Sprintf("endpoint-%d", i)) + } + + config := &Config{ + Resolver: ResolverSettings{ + Static: &StaticResolver{Hostnames: endpoints}, + }, + } + + lb, err := newLoadBalancer(exportertest.NewNopCreateSettings(), config, componentFactory) + require.NotNil(b, lb) + require.NoError(b, err) + + p, err := newTracesExporter(exportertest.NewNopCreateSettings(), config) + require.NotNil(b, p) + require.NoError(b, err) + + p.loadBalancer = lb + + err = p.Start(context.Background(), componenttest.NewNopHost()) + require.NoError(b, err) + + trace1 := ptrace.NewTraces() + trace2 := ptrace.NewTraces() + for i := 0; i < endpointsCount; i++ { + for j := 0; j < tracesCount/endpointsCount; j++ { + appendSimpleTraceWithID(trace2.ResourceSpans().AppendEmpty(), [16]byte{1, 2, 6, byte(i)}) + } + } + td := mergeTraces(trace1, trace2) + + b.ResetTimer() + + for i := 0; i < b.N; i++ { + err = p.ConsumeTraces(context.Background(), td) + require.NoError(b, err) + } + + b.StopTimer() + err = p.Shutdown(context.Background()) + require.NoError(b, err) +} + +func BenchmarkConsumeTraces_1E100T(b *testing.B) { + benchConsumeTraces(b, 1, 100) +} + +func BenchmarkConsumeTraces_1E1000T(b *testing.B) { + benchConsumeTraces(b, 1, 1000) +} + +func BenchmarkConsumeTraces_5E100T(b *testing.B) { + benchConsumeTraces(b, 5, 100) +} + +func BenchmarkConsumeTraces_5E500T(b *testing.B) { + benchConsumeTraces(b, 5, 500) +} + +func BenchmarkConsumeTraces_5E1000T(b *testing.B) { + benchConsumeTraces(b, 5, 1000) +} + +func BenchmarkConsumeTraces_10E100T(b *testing.B) { + benchConsumeTraces(b, 10, 100) +} + +func BenchmarkConsumeTraces_10E500T(b *testing.B) { + benchConsumeTraces(b, 10, 500) +} + +func BenchmarkConsumeTraces_10E1000T(b *testing.B) { + benchConsumeTraces(b, 10, 1000) +} + func randomTraces() ptrace.Traces { v1 := uint8(rand.Intn(256)) v2 := uint8(rand.Intn(256))