diff --git a/CHANGELOG.md b/CHANGELOG.md index d7fc0c04ae4..480ed6bed04 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -15,6 +15,8 @@ For semantic convention package changes, see the [semconv CHANGELOG](packages/se ### :bug: (Bug Fix) +* fix(sdk-metrics): await exports in `PeriodicExportingMetricReader` when async resource attributes have not yet settled [#5119](https://github.com/open-telemetry/opentelemetry-js/pull/5119/) @pichlermarc + ### :books: (Refine Doc) ### :house: (Internal) diff --git a/packages/sdk-metrics/src/export/PeriodicExportingMetricReader.ts b/packages/sdk-metrics/src/export/PeriodicExportingMetricReader.ts index b6730033e03..646c832aa41 100644 --- a/packages/sdk-metrics/src/export/PeriodicExportingMetricReader.ts +++ b/packages/sdk-metrics/src/export/PeriodicExportingMetricReader.ts @@ -24,7 +24,6 @@ import { import { MetricReader } from './MetricReader'; import { PushMetricExporter } from './MetricExporter'; import { callWithTimeout, TimeoutError } from '../utils'; -import { diag } from '@opentelemetry/api'; import { MetricProducer } from './MetricProducer'; export type PeriodicExportingMetricReaderOptions = { @@ -127,25 +126,20 @@ export class PeriodicExportingMetricReader extends MetricReader { ); } - const doExport = async () => { - const result = await internal._export(this._exporter, resourceMetrics); - if (result.code !== ExportResultCode.SUCCESS) { - throw new Error( - `PeriodicExportingMetricReader: metrics export failed (error ${result.error})` - ); + if (resourceMetrics.resource.asyncAttributesPending) { + try { + await resourceMetrics.resource.waitForAsyncAttributes?.(); + } catch (e) { + api.diag.debug('Error while resolving async portion of resource: ', e); + globalErrorHandler(e); } - }; + } - // Avoid scheduling a promise to make the behavior more predictable and easier to test - if (resourceMetrics.resource.asyncAttributesPending) { - resourceMetrics.resource - .waitForAsyncAttributes?.() - .then(doExport, err => - diag.debug('Error while resolving async portion of resource: ', err) - ) - .catch(globalErrorHandler); - } else { - await doExport(); + const result = await internal._export(this._exporter, resourceMetrics); + if (result.code !== ExportResultCode.SUCCESS) { + throw new Error( + `PeriodicExportingMetricReader: metrics export failed (error ${result.error})` + ); } } diff --git a/packages/sdk-metrics/test/export/PeriodicExportingMetricReader.test.ts b/packages/sdk-metrics/test/export/PeriodicExportingMetricReader.test.ts index d5ab5531267..9210f4622f3 100644 --- a/packages/sdk-metrics/test/export/PeriodicExportingMetricReader.test.ts +++ b/packages/sdk-metrics/test/export/PeriodicExportingMetricReader.test.ts @@ -16,12 +16,22 @@ import { PeriodicExportingMetricReader } from '../../src/export/PeriodicExportingMetricReader'; import { AggregationTemporality } from '../../src/export/AggregationTemporality'; -import { Aggregation, InstrumentType, PushMetricExporter } from '../../src'; +import { + Aggregation, + CollectionResult, + InstrumentType, + MetricProducer, + PushMetricExporter, +} from '../../src'; import { ResourceMetrics } from '../../src/export/MetricData'; import * as assert from 'assert'; import * as sinon from 'sinon'; import { TimeoutError } from '../../src/utils'; -import { ExportResult, ExportResultCode } from '@opentelemetry/core'; +import { + ExportResult, + ExportResultCode, + setGlobalErrorHandler, +} from '@opentelemetry/core'; import { assertRejects } from '../test-utils'; import { emptyResourceMetrics, TestMetricProducer } from './TestMetricProducer'; import { @@ -296,6 +306,98 @@ describe('PeriodicExportingMetricReader', () => { await reader.shutdown(); }); + it('should complete actions before promise resolves when async resource attributes are pending', async () => { + // arrange + const waitForAsyncAttributesStub = sinon.stub().returns( + new Promise(resolve => + setTimeout(() => { + resolve(); + }, 10) + ) + ); + const resourceMetrics: ResourceMetrics = { + resource: { + attributes: {}, + merge: sinon.stub(), + asyncAttributesPending: true, // ensure we try to await async attributes + waitForAsyncAttributes: waitForAsyncAttributesStub, // resolve when awaited + }, + scopeMetrics: [], + }; + + const mockCollectionResult: CollectionResult = { + errors: [], + resourceMetrics, + }; + const producerStubs: MetricProducer = { + collect: sinon.stub().resolves(mockCollectionResult), + }; + + const exporter = new TestMetricExporter(); + + const reader = new PeriodicExportingMetricReader({ + exporter: exporter, + exportIntervalMillis: MAX_32_BIT_INT, + exportTimeoutMillis: 80, + }); + + reader.setMetricProducer(producerStubs); + + // act + await reader.forceFlush(); + + // assert + sinon.assert.calledOnce(waitForAsyncAttributesStub); + assert.strictEqual( + exporter.getExports().length, + 1, + 'Expected exactly 1 export to happen when awaiting forceFlush' + ); + }); + + it('should call global error handler when resolving async attributes fails', async () => { + // arrange + const expectedError = new Error('resolving async attributes failed'); + const waitForAsyncAttributesStub = sinon.stub().rejects(expectedError); + + const resourceMetrics: ResourceMetrics = { + resource: { + attributes: {}, + merge: sinon.stub(), + asyncAttributesPending: true, // ensure we try to await async attributes + waitForAsyncAttributes: waitForAsyncAttributesStub, // reject when awaited + }, + scopeMetrics: [], + }; + + const mockCollectionResult: CollectionResult = { + errors: [], + resourceMetrics, + }; + const producerStubs: MetricProducer = { + collect: sinon.stub().resolves(mockCollectionResult), + }; + + const exporter = new TestMetricExporter(); + + const reader = new PeriodicExportingMetricReader({ + exporter: exporter, + exportIntervalMillis: MAX_32_BIT_INT, + exportTimeoutMillis: 80, + }); + + reader.setMetricProducer(producerStubs); + const errorHandlerStub = sinon.stub(); + setGlobalErrorHandler(errorHandlerStub); + + // act + await reader.forceFlush(); + + // assert + sinon.assert.calledOnce(waitForAsyncAttributesStub); + sinon.assert.calledOnceWithExactly(errorHandlerStub, expectedError); + }); + it('should throw TimeoutError when forceFlush takes too long', async () => { const exporter = new TestMetricExporter(); exporter.forceFlushTime = 60;