From a98ddce8ce1031a3e03cb63c4c91cc9862a48706 Mon Sep 17 00:00:00 2001 From: Gergely Madarasz Date: Mon, 25 Nov 2024 16:34:32 +0100 Subject: [PATCH 1/4] Remove memory leak at exporter shutdown --- .chloggen/exporter-shutdown-memory-leak.yaml | 25 ++++++++++++++ .../internal/metadata/generated_telemetry.go | 16 ++++----- .../exporterhelper/internal/queue_sender.go | 33 +++++++++++++++---- 3 files changed, 60 insertions(+), 14 deletions(-) create mode 100644 .chloggen/exporter-shutdown-memory-leak.yaml diff --git a/.chloggen/exporter-shutdown-memory-leak.yaml b/.chloggen/exporter-shutdown-memory-leak.yaml new file mode 100644 index 00000000000..3e432303c0a --- /dev/null +++ b/.chloggen/exporter-shutdown-memory-leak.yaml @@ -0,0 +1,25 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: bug_fix + +# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver) +component: exporterhelper + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Fix memory leak at exporter shutdown + +# One or more tracking issues or pull requests related to the change +issues: [11401] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [] diff --git a/exporter/exporterhelper/internal/metadata/generated_telemetry.go b/exporter/exporterhelper/internal/metadata/generated_telemetry.go index 9522b231122..41bea2f8ce1 100644 --- a/exporter/exporterhelper/internal/metadata/generated_telemetry.go +++ b/exporter/exporterhelper/internal/metadata/generated_telemetry.go @@ -51,7 +51,7 @@ func (tbof telemetryBuilderOptionFunc) apply(mb *TelemetryBuilder) { } // InitExporterQueueCapacity configures the ExporterQueueCapacity metric. -func (builder *TelemetryBuilder) InitExporterQueueCapacity(cb func() int64, opts ...metric.ObserveOption) error { +func (builder *TelemetryBuilder) InitExporterQueueCapacity(cb func() int64, opts ...metric.ObserveOption) (metric.Registration, error) { var err error builder.ExporterQueueCapacity, err = builder.meter.Int64ObservableGauge( "otelcol_exporter_queue_capacity", @@ -59,17 +59,17 @@ func (builder *TelemetryBuilder) InitExporterQueueCapacity(cb func() int64, opts metric.WithUnit("{batches}"), ) if err != nil { - return err + return nil, err } - _, err = builder.meter.RegisterCallback(func(_ context.Context, o metric.Observer) error { + reg, err := builder.meter.RegisterCallback(func(_ context.Context, o metric.Observer) error { o.ObserveInt64(builder.ExporterQueueCapacity, cb(), opts...) return nil }, builder.ExporterQueueCapacity) - return err + return reg, err } // InitExporterQueueSize configures the ExporterQueueSize metric. -func (builder *TelemetryBuilder) InitExporterQueueSize(cb func() int64, opts ...metric.ObserveOption) error { +func (builder *TelemetryBuilder) InitExporterQueueSize(cb func() int64, opts ...metric.ObserveOption) (metric.Registration, error) { var err error builder.ExporterQueueSize, err = builder.meter.Int64ObservableGauge( "otelcol_exporter_queue_size", @@ -77,13 +77,13 @@ func (builder *TelemetryBuilder) InitExporterQueueSize(cb func() int64, opts ... metric.WithUnit("{batches}"), ) if err != nil { - return err + return nil, err } - _, err = builder.meter.RegisterCallback(func(_ context.Context, o metric.Observer) error { + reg, err := builder.meter.RegisterCallback(func(_ context.Context, o metric.Observer) error { o.ObserveInt64(builder.ExporterQueueSize, cb(), opts...) return nil }, builder.ExporterQueueSize) - return err + return reg, err } // NewTelemetryBuilder provides a struct with methods to update all internal telemetry diff --git a/exporter/exporterhelper/internal/queue_sender.go b/exporter/exporterhelper/internal/queue_sender.go index 9b53a340146..116ced2d598 100644 --- a/exporter/exporterhelper/internal/queue_sender.go +++ b/exporter/exporterhelper/internal/queue_sender.go @@ -73,6 +73,8 @@ type QueueSender struct { traceAttribute attribute.KeyValue consumers *queue.Consumers[internal.Request] + shutdownCallbacks []func() + obsrep *ObsReport exporterID component.ID } @@ -108,18 +110,37 @@ func (qs *QueueSender) Start(ctx context.Context, host component.Host) error { } dataTypeAttr := attribute.String(DataTypeKey, qs.obsrep.Signal.String()) - return multierr.Append( - qs.obsrep.TelemetryBuilder.InitExporterQueueSize(func() int64 { return int64(qs.queue.Size()) }, - metric.WithAttributeSet(attribute.NewSet(qs.traceAttribute, dataTypeAttr))), - qs.obsrep.TelemetryBuilder.InitExporterQueueCapacity(func() int64 { return int64(qs.queue.Capacity()) }, - metric.WithAttributeSet(attribute.NewSet(qs.traceAttribute))), - ) + + reg1, err1 := qs.obsrep.TelemetryBuilder.InitExporterQueueSize(func() int64 { return int64(qs.queue.Size()) }, + metric.WithAttributeSet(attribute.NewSet(qs.traceAttribute, dataTypeAttr))) + + qs.shutdownCallbacks = append(qs.shutdownCallbacks, func() { + if reg1 != nil { + _ = reg1.Unregister() + } + }) + + reg2, err2 := qs.obsrep.TelemetryBuilder.InitExporterQueueCapacity(func() int64 { return int64(qs.queue.Capacity()) }, + metric.WithAttributeSet(attribute.NewSet(qs.traceAttribute))) + + qs.shutdownCallbacks = append(qs.shutdownCallbacks, func() { + if reg2 != nil { + _ = reg2.Unregister() + } + }) + + return multierr.Append(err1, err2) } // Shutdown is invoked during service shutdown. func (qs *QueueSender) Shutdown(ctx context.Context) error { // Stop the queue and consumers, this will drain the queue and will call the retry (which is stopped) that will only // try once every request. + + for _, callback := range qs.shutdownCallbacks { + callback() + } + if err := qs.queue.Shutdown(ctx); err != nil { return err } From 011d62e4f07809a6c0c195da51b401ca0d2bf2ca Mon Sep 17 00:00:00 2001 From: Gergely Madarasz Date: Mon, 25 Nov 2024 17:49:08 +0100 Subject: [PATCH 2/4] Update the template for generated_telemetry.go --- cmd/mdatagen/internal/samplereceiver/factory.go | 2 +- .../internal/metadata/generated_telemetry.go | 8 ++++---- cmd/mdatagen/internal/templates/telemetry.go.tmpl | 8 ++++---- 3 files changed, 9 insertions(+), 9 deletions(-) diff --git a/cmd/mdatagen/internal/samplereceiver/factory.go b/cmd/mdatagen/internal/samplereceiver/factory.go index 7a3c9bb170d..e7445f9cb29 100644 --- a/cmd/mdatagen/internal/samplereceiver/factory.go +++ b/cmd/mdatagen/internal/samplereceiver/factory.go @@ -48,5 +48,5 @@ type nopReceiver struct { } func (r nopReceiver) initOptionalMetric() { - _ = r.telemetryBuilder.InitQueueLength(func() int64 { return 1 }) + _, _ = r.telemetryBuilder.InitQueueLength(func() int64 { return 1 }) } diff --git a/cmd/mdatagen/internal/samplereceiver/internal/metadata/generated_telemetry.go b/cmd/mdatagen/internal/samplereceiver/internal/metadata/generated_telemetry.go index 71d4e1e78d6..99c7017b575 100644 --- a/cmd/mdatagen/internal/samplereceiver/internal/metadata/generated_telemetry.go +++ b/cmd/mdatagen/internal/samplereceiver/internal/metadata/generated_telemetry.go @@ -55,7 +55,7 @@ func WithProcessRuntimeTotalAllocBytesCallback(cb func() int64, opts ...metric.O } // InitQueueLength configures the QueueLength metric. -func (builder *TelemetryBuilder) InitQueueLength(cb func() int64, opts ...metric.ObserveOption) error { +func (builder *TelemetryBuilder) InitQueueLength(cb func() int64, opts ...metric.ObserveOption) (metric.Registration, error) { var err error builder.QueueLength, err = builder.meter.Int64ObservableGauge( "otelcol_queue_length", @@ -63,13 +63,13 @@ func (builder *TelemetryBuilder) InitQueueLength(cb func() int64, opts ...metric metric.WithUnit("{items}"), ) if err != nil { - return err + return nil, err } - _, err = builder.meter.RegisterCallback(func(_ context.Context, o metric.Observer) error { + reg, err := builder.meter.RegisterCallback(func(_ context.Context, o metric.Observer) error { o.ObserveInt64(builder.QueueLength, cb(), opts...) return nil }, builder.QueueLength) - return err + return reg, err } // NewTelemetryBuilder provides a struct with methods to update all internal telemetry diff --git a/cmd/mdatagen/internal/templates/telemetry.go.tmpl b/cmd/mdatagen/internal/templates/telemetry.go.tmpl index 8b6c6e7d1dc..3f365e45167 100644 --- a/cmd/mdatagen/internal/templates/telemetry.go.tmpl +++ b/cmd/mdatagen/internal/templates/telemetry.go.tmpl @@ -56,7 +56,7 @@ func (tbof telemetryBuilderOptionFunc) apply(mb *TelemetryBuilder) { {{- range $name, $metric := .Telemetry.Metrics }} {{- if $metric.Optional }} // Init{{ $name.Render }} configures the {{ $name.Render }} metric. -func (builder *TelemetryBuilder) Init{{ $name.Render }}({{ if $metric.Data.Async -}}cb func() {{ $metric.Data.BasicType }}{{- end }}, opts ...metric.ObserveOption) error { +func (builder *TelemetryBuilder) Init{{ $name.Render }}({{ if $metric.Data.Async -}}cb func() {{ $metric.Data.BasicType }}{{- end }}, opts ...metric.ObserveOption) (metric.Registration, error) { var err error builder.{{ $name.Render }}, err = builder.meter.{{ $metric.Data.Instrument }}( "otelcol_{{ $name }}", @@ -68,14 +68,14 @@ func (builder *TelemetryBuilder) Init{{ $name.Render }}({{ if $metric.Data.Async ) {{- if $metric.Data.Async }} if err != nil { - return err + return nil, err } - _, err = builder.meter.RegisterCallback(func(_ context.Context, o metric.Observer) error { + reg, err := builder.meter.RegisterCallback(func(_ context.Context, o metric.Observer) error { o.Observe{{ casesTitle $metric.Data.BasicType }}(builder.{{ $name.Render }}, cb(), opts...) return nil }, builder.{{ $name.Render }}) {{- end }} - return err + return reg, err } {{- else }} From c42de6cc69de0ba75f4795816a36925f6c043f25 Mon Sep 17 00:00:00 2001 From: Gergely Madarasz Date: Tue, 26 Nov 2024 18:10:10 +0100 Subject: [PATCH 3/4] Clear shutdownCallbacks after use --- exporter/exporterhelper/internal/queue_sender.go | 1 + 1 file changed, 1 insertion(+) diff --git a/exporter/exporterhelper/internal/queue_sender.go b/exporter/exporterhelper/internal/queue_sender.go index 116ced2d598..c9084839d61 100644 --- a/exporter/exporterhelper/internal/queue_sender.go +++ b/exporter/exporterhelper/internal/queue_sender.go @@ -140,6 +140,7 @@ func (qs *QueueSender) Shutdown(ctx context.Context) error { for _, callback := range qs.shutdownCallbacks { callback() } + qs.shutdownCallbacks = nil if err := qs.queue.Shutdown(ctx); err != nil { return err From ae1515d6a60d5eed83cab7fc55501d04d2f4ed5b Mon Sep 17 00:00:00 2001 From: Gergely Madarasz Date: Thu, 28 Nov 2024 16:03:21 +0100 Subject: [PATCH 4/4] Log errors returned from reg.Unregister() --- .../exporterhelper/internal/queue_sender.go | 28 +++++++++++-------- 1 file changed, 17 insertions(+), 11 deletions(-) diff --git a/exporter/exporterhelper/internal/queue_sender.go b/exporter/exporterhelper/internal/queue_sender.go index c9084839d61..30bb8d1b81b 100644 --- a/exporter/exporterhelper/internal/queue_sender.go +++ b/exporter/exporterhelper/internal/queue_sender.go @@ -73,10 +73,10 @@ type QueueSender struct { traceAttribute attribute.KeyValue consumers *queue.Consumers[internal.Request] - shutdownCallbacks []func() - - obsrep *ObsReport - exporterID component.ID + obsrep *ObsReport + exporterID component.ID + logger *zap.Logger + shutdownFns []component.ShutdownFunc } func NewQueueSender(q exporterqueue.Queue[internal.Request], set exporter.Settings, numConsumers int, @@ -87,6 +87,7 @@ func NewQueueSender(q exporterqueue.Queue[internal.Request], set exporter.Settin traceAttribute: attribute.String(ExporterKey, set.ID.String()), obsrep: obsrep, exporterID: set.ID, + logger: set.Logger, } consumeFunc := func(ctx context.Context, req internal.Request) error { err := qs.NextSender.Send(ctx, req) @@ -114,19 +115,21 @@ func (qs *QueueSender) Start(ctx context.Context, host component.Host) error { reg1, err1 := qs.obsrep.TelemetryBuilder.InitExporterQueueSize(func() int64 { return int64(qs.queue.Size()) }, metric.WithAttributeSet(attribute.NewSet(qs.traceAttribute, dataTypeAttr))) - qs.shutdownCallbacks = append(qs.shutdownCallbacks, func() { + qs.shutdownFns = append(qs.shutdownFns, func(context.Context) error { if reg1 != nil { - _ = reg1.Unregister() + return reg1.Unregister() } + return nil }) reg2, err2 := qs.obsrep.TelemetryBuilder.InitExporterQueueCapacity(func() int64 { return int64(qs.queue.Capacity()) }, metric.WithAttributeSet(attribute.NewSet(qs.traceAttribute))) - qs.shutdownCallbacks = append(qs.shutdownCallbacks, func() { + qs.shutdownFns = append(qs.shutdownFns, func(context.Context) error { if reg2 != nil { - _ = reg2.Unregister() + return reg2.Unregister() } + return nil }) return multierr.Append(err1, err2) @@ -137,10 +140,13 @@ func (qs *QueueSender) Shutdown(ctx context.Context) error { // Stop the queue and consumers, this will drain the queue and will call the retry (which is stopped) that will only // try once every request. - for _, callback := range qs.shutdownCallbacks { - callback() + for _, fn := range qs.shutdownFns { + err := fn(ctx) + if err != nil { + qs.logger.Warn("Error while shutting down QueueSender", zap.Error(err)) + } } - qs.shutdownCallbacks = nil + qs.shutdownFns = nil if err := qs.queue.Shutdown(ctx); err != nil { return err