Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(sdk-metrics): await export when async attributes are pending #5126

2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ For semantic convention package changes, see the [semconv CHANGELOG](packages/se

### :bug: (Bug Fix)

* fix(sdk-metrics): await exports in `PeriodicExportingMetricReader` when async resource attributes have not yet settled [#5119](https://github.com/open-telemetry/opentelemetry-js/pull/5119/) @pichlermarc

### :books: (Refine Doc)

### :house: (Internal)
Expand Down
30 changes: 12 additions & 18 deletions packages/sdk-metrics/src/export/PeriodicExportingMetricReader.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import {
import { MetricReader } from './MetricReader';
import { PushMetricExporter } from './MetricExporter';
import { callWithTimeout, TimeoutError } from '../utils';
import { diag } from '@opentelemetry/api';
import { MetricProducer } from './MetricProducer';

export type PeriodicExportingMetricReaderOptions = {
Expand Down Expand Up @@ -127,25 +126,20 @@ export class PeriodicExportingMetricReader extends MetricReader {
);
}

const doExport = async () => {
const result = await internal._export(this._exporter, resourceMetrics);
if (result.code !== ExportResultCode.SUCCESS) {
throw new Error(
`PeriodicExportingMetricReader: metrics export failed (error ${result.error})`
);
if (resourceMetrics.resource.asyncAttributesPending) {
try {
await resourceMetrics.resource.waitForAsyncAttributes?.();
} catch (e) {
api.diag.debug('Error while resolving async portion of resource: ', e);
globalErrorHandler(e);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TIL about globalErrorHandler.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

globalErrorHandler is a poorly understood and (in my opinion) poorly thought out spec. I'd probably get rid of it if i had the chance.

}
};
}

// Avoid scheduling a promise to make the behavior more predictable and easier to test
if (resourceMetrics.resource.asyncAttributesPending) {
resourceMetrics.resource
.waitForAsyncAttributes?.()
.then(doExport, err =>
diag.debug('Error while resolving async portion of resource: ', err)
)
.catch(globalErrorHandler);
} else {
await doExport();
const result = await internal._export(this._exporter, resourceMetrics);
if (result.code !== ExportResultCode.SUCCESS) {
throw new Error(
`PeriodicExportingMetricReader: metrics export failed (error ${result.error})`
);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,22 @@

import { PeriodicExportingMetricReader } from '../../src/export/PeriodicExportingMetricReader';
import { AggregationTemporality } from '../../src/export/AggregationTemporality';
import { Aggregation, InstrumentType, PushMetricExporter } from '../../src';
import {
Aggregation,
CollectionResult,
InstrumentType,
MetricProducer,
PushMetricExporter,
} from '../../src';
import { ResourceMetrics } from '../../src/export/MetricData';
import * as assert from 'assert';
import * as sinon from 'sinon';
import { TimeoutError } from '../../src/utils';
import { ExportResult, ExportResultCode } from '@opentelemetry/core';
import {
ExportResult,
ExportResultCode,
setGlobalErrorHandler,
} from '@opentelemetry/core';
import { assertRejects } from '../test-utils';
import { emptyResourceMetrics, TestMetricProducer } from './TestMetricProducer';
import {
Expand Down Expand Up @@ -296,6 +306,98 @@ describe('PeriodicExportingMetricReader', () => {
await reader.shutdown();
});

it('should complete actions before promise resolves when async resource attributes are pending', async () => {
// arrange
const waitForAsyncAttributesStub = sinon.stub().returns(
new Promise<void>(resolve =>
setTimeout(() => {
resolve();
}, 10)
)
);
const resourceMetrics: ResourceMetrics = {
resource: {
attributes: {},
merge: sinon.stub(),
asyncAttributesPending: true, // ensure we try to await async attributes
waitForAsyncAttributes: waitForAsyncAttributesStub, // resolve when awaited
},
scopeMetrics: [],
};

const mockCollectionResult: CollectionResult = {
errors: [],
resourceMetrics,
};
const producerStubs: MetricProducer = {
collect: sinon.stub().resolves(mockCollectionResult),
};

const exporter = new TestMetricExporter();

const reader = new PeriodicExportingMetricReader({
exporter: exporter,
exportIntervalMillis: MAX_32_BIT_INT,
exportTimeoutMillis: 80,
});

reader.setMetricProducer(producerStubs);

// act
await reader.forceFlush();

// assert
sinon.assert.calledOnce(waitForAsyncAttributesStub);
assert.strictEqual(
exporter.getExports().length,
1,
'Expected exactly 1 export to happen when awaiting forceFlush'
);
});

it('should call global error handler when resolving async attributes fails', async () => {
// arrange
const expectedError = new Error('resolving async attributes failed');
const waitForAsyncAttributesStub = sinon.stub().rejects(expectedError);

const resourceMetrics: ResourceMetrics = {
resource: {
attributes: {},
merge: sinon.stub(),
asyncAttributesPending: true, // ensure we try to await async attributes
waitForAsyncAttributes: waitForAsyncAttributesStub, // reject when awaited
},
scopeMetrics: [],
};

const mockCollectionResult: CollectionResult = {
errors: [],
resourceMetrics,
};
const producerStubs: MetricProducer = {
collect: sinon.stub().resolves(mockCollectionResult),
};

const exporter = new TestMetricExporter();

const reader = new PeriodicExportingMetricReader({
exporter: exporter,
exportIntervalMillis: MAX_32_BIT_INT,
exportTimeoutMillis: 80,
});

reader.setMetricProducer(producerStubs);
const errorHandlerStub = sinon.stub();
setGlobalErrorHandler(errorHandlerStub);

// act
await reader.forceFlush();

// assert
sinon.assert.calledOnce(waitForAsyncAttributesStub);
sinon.assert.calledOnceWithExactly(errorHandlerStub, expectedError);
});

it('should throw TimeoutError when forceFlush takes too long', async () => {
const exporter = new TestMetricExporter();
exporter.forceFlushTime = 60;
Expand Down
Loading