Skip to content

Commit

Permalink
feat: dynamic header for OTLPTraceExporter
Browse files Browse the repository at this point in the history
  • Loading branch information
bozzelliandrea committed Nov 6, 2024
1 parent fe4d368 commit 371b53f
Show file tree
Hide file tree
Showing 10 changed files with 142 additions and 43 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,14 @@ import { validateAndNormalizeHeaders } from '../util';

export interface OtlpHttpConfiguration extends OtlpSharedConfiguration {
url: string;
headers: Record<string, string>;
headers: Record<string, string | Function>;
}

function mergeHeaders(
userProvidedHeaders: Record<string, string> | undefined | null,
fallbackHeaders: Record<string, string> | undefined | null,
defaultHeaders: Record<string, string>
): Record<string, string> {
userProvidedHeaders: Record<string, string | Function> | undefined | null,
fallbackHeaders: Record<string, string | Function> | undefined | null,
defaultHeaders: Record<string, string | Function>
): Record<string, string | Function> {
const requiredHeaders = {
...defaultHeaders,
};
Expand Down Expand Up @@ -93,7 +93,7 @@ export function mergeOtlpHttpConfigurationWithDefaults(
}

export function getHttpConfigurationDefaults(
requiredHeaders: Record<string, string>,
requiredHeaders: Record<string, string | Function>,
signalResourcePath: string
): OtlpHttpConfiguration {
return {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ export abstract class OTLPExporterBrowserBase<
// sendBeacon has no way to signal retry, so we do not wrap it in a RetryingTransport
this._transport = createSendBeaconTransport({
url: actualConfig.url,
blobType: actualConfig.headers['Content-Type'],
blobType: actualConfig.headers['Content-Type'] as string,
});
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,20 +24,44 @@ import {

export interface XhrRequestParameters {
url: string;
headers: Record<string, string>;
headers: Record<string, string | Function>;
}

class XhrTransport implements IExporterTransport {
constructor(private _parameters: XhrRequestParameters) {}

send(data: Uint8Array, timeoutMillis: number): Promise<ExportResponse> {
return new Promise<ExportResponse>(resolve => {
return new Promise<ExportResponse>(async resolve => {
const xhr = new XMLHttpRequest();
xhr.timeout = timeoutMillis;
xhr.open('POST', this._parameters.url);
Object.entries(this._parameters.headers).forEach(([k, v]) => {
xhr.setRequestHeader(k, v);
});

await Promise.all(
Object.entries(this._parameters.headers).map(
async ([k, v]: [string, string | Function]) => {
if (!v) {
xhr.setRequestHeader(k, '');
return;
}

if (typeof v === 'string' || v instanceof String) {
xhr.setRequestHeader(k, v as string);
return;
}

try {
const result = v();
if (result instanceof Promise) {
xhr.setRequestHeader(k, String(await Promise.resolve(result)));
} else {
xhr.setRequestHeader(k, String(result));
}
} catch (err) {
diag.error(`Failed Header [${k}] evaluation caused by: ${err}`);
}
}
)
);

xhr.ontimeout = _ => {
resolve({
Expand Down Expand Up @@ -81,7 +105,12 @@ class XhrTransport implements IExporterTransport {
};

xhr.send(
new Blob([data], { type: this._parameters.headers['Content-Type'] })
new Blob([data], {
type:
this._parameters.headers['Content-Type'] instanceof Function
? this._parameters.headers['Content-Type']()
: this._parameters.headers['Content-Type'],
})
);
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ export abstract class OTLPExporterNodeBase<
constructor(
config: OTLPExporterNodeConfigBase = {},
serializer: ISerializer<ExportItem[], ServiceResponse>,
requiredHeaders: Record<string, string>,
requiredHeaders: Record<string, string | Function>,
signalIdentifier: string,
signalResourcePath: string
) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ export type sendWithHttp = (

export interface HttpRequestParameters {
url: string;
headers: Record<string, string>;
headers: Record<string, string | Function>;
compression: 'gzip' | 'none';
agentOptions: http.AgentOptions | https.AgentOptions;
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,46 @@ import {
parseRetryAfterToMills,
} from '../../is-export-retryable';
import { OTLPExporterError } from '../../types';
import { diag } from '@opentelemetry/api';

export async function mapHeaders(
headers: Record<string, string | Function>
): Promise<Record<string, string>> {
const entries = await Promise.all(
Object.entries(headers).map(
async ([k, v]: [string, string | Function]): Promise<
[string, string]
> => {
if (!v) {
return [k, ''];
}

if (typeof v === 'string' || v instanceof String) {
return [k, v as string];
}

const result = v();
if (result instanceof Promise) {
try {
return [k, String(await Promise.resolve(result))];
} catch (err) {
diag.error(`Failed Header [${k}] evaluation caused by: ${err}`);
}
}

return [k, String(result)];
}
)
);

return entries.reduce(
(acc: Record<string, string>, [key, value]: [string, string]) => {
acc[key] = value;
return acc;
},
{}
);
}

/**
* Sends data using http
Expand All @@ -33,24 +73,26 @@ import { OTLPExporterError } from '../../types';
* @param onDone
* @param timeoutMillis
*/
export function sendWithHttp(
export async function sendWithHttp(
params: HttpRequestParameters,
agent: http.Agent | https.Agent,
data: Uint8Array,
onDone: (response: ExportResponse) => void,
timeoutMillis: number
): void {
): Promise<void> {
const parsedUrl = new URL(params.url);
const nodeVersion = Number(process.versions.node.split('.')[0]);

const headers = await mapHeaders(params.headers);

diag.info('ALL HEADERS: ' + JSON.stringify(headers));

const options: http.RequestOptions | https.RequestOptions = {
hostname: parsedUrl.hostname,
port: parsedUrl.port,
path: parsedUrl.pathname,
method: 'POST',
headers: {
...params.headers,
},
headers: headers,
agent: agent,
};

Expand Down
2 changes: 1 addition & 1 deletion experimental/packages/otlp-exporter-base/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ export interface ExportServiceError {
* Collector Exporter base config
*/
export interface OTLPExporterConfigBase {
headers?: Record<string, string>;
headers?: Record<string, string | Function>;
url?: string;
concurrencyLimit?: number;
/** Maximum time the OTLP exporter will wait for each batch export.
Expand Down
28 changes: 17 additions & 11 deletions experimental/packages/otlp-exporter-base/src/util.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,23 @@ import { diag } from '@opentelemetry/api';
* @param partialHeaders
*/
export function validateAndNormalizeHeaders(
partialHeaders: Partial<Record<string, unknown>> = {}
): Record<string, string> {
const headers: Record<string, string> = {};
Object.entries(partialHeaders).forEach(([key, value]) => {
if (typeof value !== 'undefined') {
headers[key] = String(value);
} else {
diag.warn(
`Header "${key}" has invalid value (${value}) and will be ignored`
);
partialHeaders: Partial<Record<string, string | Function>> = {}
): Record<string, string | Function> {
const headers: Record<string, string | Function> = {};
Object.entries(partialHeaders).forEach(
([key, value]: [string, string | Function | undefined]) => {
if (typeof value !== 'undefined') {
if (value instanceof Function) {
headers[key] = value as Function;
} else {
headers[key] = String(value);
}
} else {
diag.warn(
`Header "${key}" has invalid value (${value}) and will be ignored`
);
}
}
});
);
return headers;
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ describe('parseHeaders', function () {
it('should ignore undefined headers', function () {
// Need to stub/spy on the underlying logger as the "diag" instance is global
const spyWarn = sinon.stub(diag, 'warn');
const headers: Partial<Record<string, unknown>> = {
const headers: Partial<Record<string, string | undefined | any>> = {
foo1: undefined,
foo2: 'bar',
foo3: 1,
Expand Down
42 changes: 32 additions & 10 deletions packages/opentelemetry-exporter-zipkin/src/platform/browser/util.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,11 @@ import * as zipkinTypes from '../../types';
* @param headers - headers
* send
*/
export function prepareSend(
export async function prepareSend(
urlStr: string,
headers?: Record<string, string>
): zipkinTypes.SendFn {
let xhrHeaders: Record<string, string>;
): Promise<zipkinTypes.SendFn> {
let xhrHeaders: Record<string, string | Function>;
const useBeacon = typeof navigator.sendBeacon === 'function' && !headers;
if (headers) {
xhrHeaders = {
Expand All @@ -45,7 +45,7 @@ export function prepareSend(
/**
* Send spans to the remote Zipkin service.
*/
return function send(
return async function send(
zipkinSpans: zipkinTypes.Span[],
done: (result: ExportResult) => void
) {
Expand All @@ -57,7 +57,7 @@ export function prepareSend(
if (useBeacon) {
sendWithBeacon(payload, done, urlStr);
} else {
sendWithXhr(payload, done, urlStr, xhrHeaders);
await sendWithXhr(payload, done, urlStr, xhrHeaders);
}
};
}
Expand Down Expand Up @@ -91,17 +91,39 @@ function sendWithBeacon(
* @param urlStr
* @param xhrHeaders
*/
function sendWithXhr(
async function sendWithXhr(
data: string,
done: (result: ExportResult) => void,
urlStr: string,
xhrHeaders: Record<string, string> = {}
xhrHeaders: Record<string, string | Function> = {}
) {
const xhr = new XMLHttpRequest();
xhr.open('POST', urlStr);
Object.entries(xhrHeaders).forEach(([k, v]) => {
xhr.setRequestHeader(k, v);
});

await Promise.all(
Object.entries(xhrHeaders).map(async ([k, v]) => {
if (!v) {
xhr.setRequestHeader(k, '');
return;
}

if (typeof v === 'string' || v instanceof String) {
xhr.setRequestHeader(k, v as string);
return;
}

try {
const result = v();
if (result instanceof Promise) {
xhr.setRequestHeader(k, String(await result));
} else {
xhr.setRequestHeader(k, String(result));
}
} catch (err) {
diag.error(`Failed Header [${k}] evaluation caused by: ${err}`);
}
})
);

xhr.onreadystatechange = () => {
if (xhr.readyState === XMLHttpRequest.DONE) {
Expand Down

0 comments on commit 371b53f

Please sign in to comment.