Skip to content

Commit

Permalink
feat(otlp-exporter-base): implement partial success handling
Browse files Browse the repository at this point in the history
  • Loading branch information
pichlermarc committed Nov 20, 2024
1 parent 7e98761 commit d2e5cd7
Show file tree
Hide file tree
Showing 8 changed files with 360 additions and 15 deletions.
2 changes: 2 additions & 0 deletions experimental/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ All notable changes to experimental packages in this project will be documented

### :rocket: (Enhancement)

feat(otlp-exporter-base): handle OTLP partial success [#5183](https://github.com/open-telemetry/opentelemetry-js/pull/5183) @pichlermarc

### :bug: (Bug Fix)

### :books: (Refine Doc)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Copyright The OpenTelemetry Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import { diag } from '@opentelemetry/api';
import { IOtlpResponseHandler } from './response-handler';

function isPartialSuccessResponse(
response: unknown
): response is { partialSuccess: never } {
return Object.prototype.hasOwnProperty.call(response, 'partialSuccess');
}

/**
* Default response handler that logs a partial success to the console.
*/
export function createLoggingPartialSuccessResponseHandler<
T,
>(): IOtlpResponseHandler<T> {
return {
handleResponse(response: T) {
// Partial success MUST never be an empty object according the specification
// see https://opentelemetry.io/docs/specs/otlp/#partial-success
if (
response == null ||
!isPartialSuccessResponse(response) ||
response.partialSuccess == null
) {
return;
}
diag.warn(
'Received Partial Success response:',
JSON.stringify(response.partialSuccess)
);
},
};
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ import { IExporterTransport } from './exporter-transport';
import { IExportPromiseHandler } from './bounded-queue-export-promise-handler';
import { ISerializer } from '@opentelemetry/otlp-transformer';
import { OTLPExporterError } from './types';
import { IOtlpResponseHandler } from './response-handler';
import { createLoggingPartialSuccessResponseHandler } from './logging-response-handler';
import { diag, DiagLogger } from '@opentelemetry/api';

/**
Expand All @@ -40,6 +42,7 @@ class OTLPExportDelegate<Internal, Response>
constructor(
private _transport: IExporterTransport,
private _serializer: ISerializer<Internal, Response>,
private _responseHandler: IOtlpResponseHandler<Response>,
private _promiseQueue: IExportPromiseHandler,
private _timeout: number
) {
Expand Down Expand Up @@ -79,6 +82,19 @@ class OTLPExportDelegate<Internal, Response>
this._transport.send(serializedRequest, this._timeout).then(
response => {
if (response.status === 'success') {
if (response.data != null) {
try {
this._responseHandler.handleResponse(
this._serializer.deserializeResponse(response.data)
);
} catch (e) {
this._diagLogger.warn(
'Export succeeded but could not deserialize response - is the response specification compliant?',
e,
response.data
);
}
}
// No matter the response, we can consider the export still successful.
resultCallback({
code: ExportResultCode.SUCCESS,
Expand Down Expand Up @@ -139,6 +155,7 @@ export function createOtlpExportDelegate<Internal, Response>(
return new OTLPExportDelegate(
components.transport,
components.serializer,
createLoggingPartialSuccessResponseHandler(),
components.promiseHandler,
settings.timeout
);
Expand Down
27 changes: 27 additions & 0 deletions experimental/packages/otlp-exporter-base/src/response-handler.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Copyright The OpenTelemetry Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

/**
* Generic export response handler. Can be implemented to handle export responses like partial success.
*/
export interface IOtlpResponseHandler<Response> {
/**
* Handles an OTLP export response.
* Implementations MUST NOT throw.
* @param response
*/
handleResponse(response: Response): void;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
/*
* Copyright The OpenTelemetry Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import { createLoggingPartialSuccessResponseHandler } from '../../src/logging-response-handler';
import * as sinon from 'sinon';
import { IExportTraceServiceResponse } from '@opentelemetry/otlp-transformer';
import { registerMockDiagLogger } from './test-utils';

describe('loggingResponseHandler', function () {
afterEach(function () {
sinon.restore();
});

it('should diag warn if a partial success is passed', function () {
// arrange
const { warn } = registerMockDiagLogger();
const handler =
createLoggingPartialSuccessResponseHandler<IExportTraceServiceResponse>();
const partialSuccessResponse: IExportTraceServiceResponse = {
partialSuccess: {
errorMessage: 'error',
rejectedSpans: 10,
},
};

// act
handler.handleResponse(partialSuccessResponse);

//assert
sinon.assert.calledOnceWithExactly(
warn,
'Received Partial Success response:',
JSON.stringify(partialSuccessResponse.partialSuccess)
);
});

it('should not warn when a response is undefined', function () {
// arrange
const { warn } = registerMockDiagLogger();
const handler = createLoggingPartialSuccessResponseHandler();

// act
handler.handleResponse(undefined);

//assert
sinon.assert.notCalled(warn);
});

it('should not warn when a response is defined but partialSuccess is undefined', function () {
// arrange
const { warn } = registerMockDiagLogger();
const handler = createLoggingPartialSuccessResponseHandler();

// act
handler.handleResponse({ partialSuccess: undefined });

//assert
sinon.assert.notCalled(warn);
});

it('should warn when a response is defined but partialSuccess is empty object', function () {
// note: it is not permitted for the server to return such a response, but it may happen anyway
// arrange
const { warn } = registerMockDiagLogger();
const handler = createLoggingPartialSuccessResponseHandler();
const response = { partialSuccess: {} };

// act
handler.handleResponse(response);

//assert
sinon.assert.calledOnceWithExactly(
warn,
'Received Partial Success response:',
JSON.stringify(response.partialSuccess)
);
});
});
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,14 @@ import { createOtlpExportDelegate } from '../../src';
import { ExportResponse } from '../../src';
import { ISerializer } from '@opentelemetry/otlp-transformer';
import { IExportPromiseHandler } from '../../src/bounded-queue-export-promise-handler';
import { registerMockDiagLogger } from './test-utils';

interface FakeInternalRepresentation {
foo: string;
}

interface FakeSignalResponse {
baz: string;
partialSuccess?: { foo: string };
}

type FakeSerializer = ISerializer<
Expand Down Expand Up @@ -491,9 +492,7 @@ describe('OTLPExportDelegate', function () {
};
const mockTransport = <IExporterTransport>transportStubs;

const response: FakeSignalResponse = {
baz: 'partial success',
};
const response: FakeSignalResponse = {};

const serializerStubs = {
// simulate that the serializer returns something to send
Expand Down Expand Up @@ -541,6 +540,144 @@ describe('OTLPExportDelegate', function () {
});
});

it('returns success even if response cannot be deserialized', function (done) {
const { warn } = registerMockDiagLogger();
// returns mock success response (empty body)
const exportResponse: ExportResponse = {
data: Uint8Array.from([]),
status: 'success',
};

// transport does not need to do anything in this case.
const transportStubs = {
send: sinon.stub().returns(Promise.resolve(exportResponse)),
shutdown: sinon.stub(),
};
const mockTransport = <IExporterTransport>transportStubs;

const serializerStubs = {
// simulate that the serializer returns something to send
serializeRequest: sinon.stub().returns(Uint8Array.from([1])),
// simulate that it returns a partial success (response with contents)
deserializeResponse: sinon.stub().throws(new Error()),
};
const mockSerializer = <FakeSerializer>serializerStubs;

// mock a queue that has not yet reached capacity
const promiseHandlerStubs = {
pushPromise: sinon.stub(),
hasReachedLimit: sinon.stub().returns(false),
awaitAll: sinon.stub(),
};
const promiseHandler = <IExportPromiseHandler>promiseHandlerStubs;

const exporter = createOtlpExportDelegate(
{
promiseHandler: promiseHandler,
serializer: mockSerializer,
transport: mockTransport,
},
{
timeout: 1000,
}
);

exporter.export(internalRepresentation, result => {
try {
assert.strictEqual(result.code, ExportResultCode.SUCCESS);
assert.strictEqual(result.error, undefined);

// assert here as otherwise the promise will not have executed yet
sinon.assert.calledOnceWithMatch(
warn,
'OTLPExportDelegate',
'Export succeeded but could not deserialize response - is the response specification compliant?',
sinon.match.instanceOf(Error),
exportResponse.data
);
sinon.assert.calledOnce(serializerStubs.serializeRequest);
sinon.assert.calledOnce(transportStubs.send);
sinon.assert.calledOnce(promiseHandlerStubs.pushPromise);
sinon.assert.calledOnce(promiseHandlerStubs.hasReachedLimit);
sinon.assert.notCalled(promiseHandlerStubs.awaitAll);
done();
} catch (err) {
// ensures we throw if there are more calls to result;
done(err);
}
});
});

it('returns success and warns on partial success response', function (done) {
const { warn } = registerMockDiagLogger();
// returns mock success response (empty body)
const exportResponse: ExportResponse = {
data: Uint8Array.from([]),
status: 'success',
};

// transport does not need to do anything in this case.
const transportStubs = {
send: sinon.stub().returns(Promise.resolve(exportResponse)),
shutdown: sinon.stub(),
};
const mockTransport = <IExporterTransport>transportStubs;

const partialSuccessResponse: FakeSignalResponse = {
partialSuccess: { foo: 'bar' },
};

const serializerStubs = {
// simulate that the serializer returns something to send
serializeRequest: sinon.stub().returns(Uint8Array.from([1])),
// simulate that it returns a partial success (response with contents)
deserializeResponse: sinon.stub().returns(partialSuccessResponse),
};
const mockSerializer = <FakeSerializer>serializerStubs;

// mock a queue that has not yet reached capacity
const promiseHandlerStubs = {
pushPromise: sinon.stub(),
hasReachedLimit: sinon.stub().returns(false),
awaitAll: sinon.stub(),
};
const promiseHandler = <IExportPromiseHandler>promiseHandlerStubs;

const exporter = createOtlpExportDelegate(
{
promiseHandler: promiseHandler,
serializer: mockSerializer,
transport: mockTransport,
},
{
timeout: 1000,
}
);

exporter.export(internalRepresentation, result => {
try {
assert.strictEqual(result.code, ExportResultCode.SUCCESS);
assert.strictEqual(result.error, undefined);

// assert here as otherwise the promise will not have executed yet
sinon.assert.calledOnceWithMatch(
warn,
'Received Partial Success response:',
JSON.stringify(partialSuccessResponse.partialSuccess)
);
sinon.assert.calledOnce(serializerStubs.serializeRequest);
sinon.assert.calledOnce(transportStubs.send);
sinon.assert.calledOnce(promiseHandlerStubs.pushPromise);
sinon.assert.calledOnce(promiseHandlerStubs.hasReachedLimit);
sinon.assert.notCalled(promiseHandlerStubs.awaitAll);
done();
} catch (err) {
// ensures we throw if there are more calls to result;
done(err);
}
});
});

it('returns failure when send rejects', function (done) {
const transportStubs = {
// make transport reject
Expand Down
Loading

0 comments on commit d2e5cd7

Please sign in to comment.