
Rare but severely incorrect delta metric exports when shutting down PeriodicReader #6344

Open
alecholmes opened this issue Feb 20, 2025 · 1 comment
Labels: bug Something isn't working

alecholmes commented Feb 20, 2025

Description

I've found a PeriodicReader shutdown timing bug that can result in the final export of delta metrics accidentally reporting the cumulative value of a metric instead of the delta value since the previous export.

Here is a simplified example of what I'm seeing:

  1. Create a Meter backed by a PeriodicReader with delta temporality.
  2. Register an observable counter that always reports the value 10 in its callback.
  3. First PeriodicReader export will report a value of 10.
  4. Second and subsequent PeriodicReader exports will report values of 0 as expected.
  5. Call Shutdown on the MeterProvider. This shuts down the PeriodicReader which forces one final collect+export.
  6. Final PeriodicReader export will almost always report a value of 0, but in rare cases will report a value of 10.

In other words, the final export will sometimes report the cumulative total value instead of the delta since the last export.

In practice, this bug is hit when Shutdown is called during an ongoing collect+export, specifically when this or this error check is hit. These branches are reached because Shutdown cancels the background context they check.
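For illustration, the failure mode boils down to an early return on a canceled context that skips the step responsible for clearing staged measurements. Here is a minimal, purely hypothetical sketch of that control flow; none of these names come from the SDK (see Steps To Reproduce below for the actual call sites):

package main

import (
	"context"
	"fmt"
)

// staged stands in for the per-instrument valueMap described under Steps To
// Reproduce; everything here is illustrative, not the SDK's actual code.
var staged = map[string]int64{}

func collect(ctx context.Context) error {
	// Observable callbacks run first and stage their measurements.
	staged["foo"] += 10

	// If Shutdown has already canceled ctx, collection returns early and the
	// staged values are never aggregated or cleared.
	if err := ctx.Err(); err != nil {
		return err
	}

	// Normal path: aggregate the staged values, then clear them.
	clear(staged)
	return nil
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	cancel() // simulate Shutdown canceling the background collect context

	err := collect(ctx)
	fmt.Println(err, staged["foo"]) // "context canceled 10": the stale entry survives
}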

Environment

  • OS: seen on multiple (Linux and macOS)
  • Architecture: seen on multiple (arm64 and amd64)
  • Go Version: 1.24 (also seen on 1.23)
  • opentelemetry-go version: v1.34.0

Steps To Reproduce

In the happy path for collecting sum metrics:

  1. Metric measurement from callback is added to valueMap here.
    • With the example above, this would put a measurement of 10 in the map.
    • Not sure why this does v.n += value instead of v.n = value for cumulative types, btw.
  2. Subsequent agg computation uses that valueMap measurement here to compute the delta and then clears that valueMap a few lines later.
    • This would pull 10 from the map and calculate the delta as 10 - lastReported.
    • After the first export, lastReported is set to 10 and this never changes (see the sketch after the shutdown-path list below).

In the buggy shutdown path:

  1. Metric measurement from callback is added to valueMap during an export cycle as part of a periodic reader background loop.
    • With the example above, this would put a measurement of 10 in the map.
  2. Shutdown is called while getting measurements, so instead of doing any agg computations, the export ends early here.
    • The valueMap still has a value of 10, since the value map isn't cleared until aggregations run later.
  3. The PeriodicReader kicks off one final collect+export...
  4. Metric measurement from callback is added to the non-empty valueMap during the export cycle.
    • This puts a measurement of 20 in the map.
    • v.n += value is effectively v.n = 10 + 10 in this case.
  5. Subsequent agg computation uses the incorrect valueMap measurement.
    • This would pull 20 from the map and calculate the delta as 20 - lastReported, i.e. 20 - 10 = 10.
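To make the arithmetic concrete, here is a rough, self-contained sketch of a precomputed-sum delta aggregation plus a walkthrough of the sequence above. All of the names (preSum, values, reported, record, delta) are invented for illustration and only mirror the behavior described in the two paths above; they are not the SDK's actual types:

package main

import (
	"fmt"
	"sync"
)

// preSum is a hypothetical stand-in for a precomputed-sum aggregation:
// values plays the role of valueMap and reported plays the role of lastReported.
type preSum struct {
	mu       sync.Mutex
	values   map[string]int64
	reported map[string]int64
}

// record is called from the observable callback during a collect cycle.
func (s *preSum) record(key string, v int64) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.values[key] += v // accumulates rather than overwrites
}

// delta computes the per-series delta against the last exported value and
// clears the staged value, mirroring the happy-path steps above.
func (s *preSum) delta(key string) int64 {
	s.mu.Lock()
	defer s.mu.Unlock()
	v := s.values[key]
	d := v - s.reported[key]
	s.reported[key] = v
	delete(s.values, key)
	return d
}

func main() {
	s := &preSum{values: map[string]int64{}, reported: map[string]int64{}}

	// Cycle 1 (normal): the callback observes 10; the exported delta is 10.
	s.record("foo", 10)
	fmt.Println(s.delta("foo")) // 10 - 0 = 10

	// Cycle 2 (interrupted): the callback observes 10, but Shutdown cancels
	// the context before aggregation runs, so delta is never computed and the
	// stale 10 stays in values.
	s.record("foo", 10)

	// Cycle 3 (final collect+export during Shutdown): the callback observes
	// 10 again on top of the stale entry, so values["foo"] == 20 and the
	// reported "delta" is 20 - 10 = 10 instead of the expected 0.
	s.record("foo", 10)
	fmt.Println(s.delta("foo")) // 10
}

With the interrupted cycle removed, the second printed delta is 0, which is the expected behavior.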

I can reliably reproduce this with this unit test:

package tenantmetricsotel

import (
	"context"
	"fmt"
	"strconv"
	"sync"
	"sync/atomic"
	"testing"
	"time"

	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/require"
	"go.opentelemetry.io/otel/attribute"
	"go.opentelemetry.io/otel/metric"
	sdkmetric "go.opentelemetry.io/otel/sdk/metric"
	"go.opentelemetry.io/otel/sdk/metric/metricdata"
)

func TestRepro(t *testing.T) {
	exp := &testExporter{}

	start := time.Now()
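	// Use a very short export interval so that Shutdown (below) is likely to land mid collect+export.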
	reader := sdkmetric.NewPeriodicReader(exp, sdkmetric.WithInterval(5*time.Millisecond))

	meterProvider := sdkmetric.NewMeterProvider(sdkmetric.WithReader(reader))
	meter := meterProvider.Meter("otel-test-metrics")

	var wg sync.WaitGroup

	cs := &staticCounters{meter: meter}

	t.Log("creating counters")
	for range 100 {
		cs.add(t)
	}
	require.Equal(t, int64(0), exp.exportCount.Load())

	t.Log("shutting down")
	wg.Add(1)
	go func() {
		defer wg.Done()

		dur := 200 * time.Millisecond
		time.Sleep(dur - time.Since(start))

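		// Shutdown may land while the reader is mid collect+export, triggering the race described above.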
		assert.NoError(t, meterProvider.Shutdown(context.Background()))
	}()
	wg.Wait()

	// Each counter always reports 10 so the ultimate total count is 10*n, where n is the number of counters.
	// Since the sum of exported deltas is ultimately the cumulative total,
	// the exported deltas should add up to 10*n too.
	assert.Equal(t, cs.numCounters.Load()*10, exp.exportTotal.Load())
}

type staticCounters struct {
	meter       metric.Meter
	numCounters atomic.Int64
}

func (c *staticCounters) add(t testing.TB) {
	ordinal := int(c.numCounters.Add(1) - 1)

	attrSet := metric.WithAttributeSet(attribute.NewSet(attribute.String("ordinal", strconv.Itoa(ordinal))))

	oc, err := c.meter.Int64ObservableCounter("foo")
	require.NoError(t, err)

	_, err = c.meter.RegisterCallback(func(ctx context.Context, observer metric.Observer) error {
		observer.ObserveInt64(oc, 10, attrSet)
		return nil
	}, oc)
	require.NoError(t, err)
}

type testExporter struct {
	exportCount atomic.Int64
	exportTotal atomic.Int64
}

var _ sdkmetric.Exporter = (*testExporter)(nil)

func (e *testExporter) Temporality(kind sdkmetric.InstrumentKind) metricdata.Temporality {
	return metricdata.DeltaTemporality // metricdata.CumulativeTemporality
}

func (e *testExporter) Aggregation(kind sdkmetric.InstrumentKind) sdkmetric.Aggregation {
	return sdkmetric.AggregationSum{}
}

func (e *testExporter) Export(ctx context.Context, rm *metricdata.ResourceMetrics) error {
	e.exportCount.Add(1)

	sumTotal := int64(0)
	for _, sm := range rm.ScopeMetrics {
		for _, m := range sm.Metrics {
			for _, dp := range m.Data.(metricdata.Sum[int64]).DataPoints {
				sumTotal += dp.Value
			}
		}
	}

	if e.Temporality(sdkmetric.InstrumentKindCounter) == metricdata.DeltaTemporality {
		e.exportTotal.Add(sumTotal)
		if sumTotal > 0 {
			fmt.Println(">> total delta is non-zero", e.exportCount.Load(), sumTotal)
		}
	} else {
		e.exportTotal.Store(sumTotal)
	}

	return nil
}

func (e *testExporter) ForceFlush(ctx context.Context) error {
	return nil
}

func (e *testExporter) Shutdown(ctx context.Context) error {
	return nil
}

Expected behavior

Exported values of delta metrics should always be the delta since the last export, not the cumulative value since the process started. The latter results in wildly incorrect metrics.

More generally, I would not expect a context being canceled during a collect+export cycle to corrupt state.

alecholmes added the bug (Something isn't working) label on Feb 20, 2025
alecholmes (Author) commented:

I also realized after filing the bug that this affects cumulative counters as well.
