Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update metrics export defaults and truncate metric labels #380

Merged
merged 3 commits into from
Jun 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions js/core/src/metrics.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import { Counter, Histogram, Meter, metrics } from '@opentelemetry/api';

export const METER_NAME = 'genkit';
export const METRIC_NAME_PREFIX = 'genkit';
const METRIC_DIMENSION_MAX_CHARS = 32;

export function internalMetricNamespaceWrap(...namespaces) {
return [METRIC_NAME_PREFIX, ...namespaces].join('/');
Expand Down Expand Up @@ -68,6 +69,7 @@ export class MetricCounter extends Metric<Counter> {

add(val?: number, opts?: any) {
if (val) {
truncateDimensions(opts);
this.get().add(val, opts);
}
}
Expand All @@ -87,7 +89,24 @@ export class MetricHistogram extends Metric<Histogram> {

record(val?: number, opts?: any) {
if (val) {
truncateDimensions(opts);
this.get().record(val, opts);
}
}
}

/**
* Truncates all of the metric dimensions to avoid long, high-cardinality
* dimensions being added to metrics.
*/
function truncateDimensions(opts?: any) {
if (opts) {
Object.keys(opts).forEach((k) => {
bryanatkinson marked this conversation as resolved.
Show resolved Hide resolved
// We don't want to truncate paths. They are known to be long but with
// relatively low cardinality, and are useful for downstream monitoring.
if (!k.startsWith('path') && typeof opts[k] == 'string') {
opts[k] = opts[k].substring(0, METRIC_DIMENSION_MAX_CHARS);
}
});
}
}
46 changes: 23 additions & 23 deletions js/flow/src/telemetry.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,13 +39,13 @@ const flowCounter = new MetricCounter(_N('requests'), {
valueType: ValueType.INT,
});

const variantCounter = new MetricCounter(_N('variants'), {
description: 'Tracks unique flow variants per flow.',
const pathCounter = new MetricCounter(_N('path/requests'), {
description: 'Tracks unique flow paths per flow.',
valueType: ValueType.INT,
});

const variantLatencies = new MetricHistogram(_N('variants/latency'), {
description: 'Latencies per flow variant.',
const pathLatencies = new MetricHistogram(_N('path/latency'), {
description: 'Latencies per flow path.',
ValueType: ValueType.DOUBLE,
unit: 'ms',
});
Expand Down Expand Up @@ -79,25 +79,25 @@ export function writeFlowSuccess(flowName: string, latencyMs: number) {

const paths = traceMetadataAls.getStore()?.paths || new Set<PathMetadata>();
if (paths) {
const relevantVariants = Array.from(paths).filter((meta) =>
const relevantPaths = Array.from(paths).filter((meta) =>
meta.path.includes(flowName)
);

logger.logStructured(`Variants[/${flowName}]`, {
logger.logStructured(`Paths[/${flowName}]`, {
flowName: flowName,
variants: relevantVariants.map((variant) => variant.path),
paths: relevantPaths.map((p) => p.path),
});

relevantVariants.forEach((variant) => {
variantCounter.add(1, {
relevantPaths.forEach((p) => {
pathCounter.add(1, {
...dimensions,
success: 'success',
variant: variant.path,
path: p.path,
});

variantLatencies.record(variant.latency, {
pathLatencies.record(p.latency, {
...dimensions,
variant: variant.path,
path: p.path,
});
});
}
Expand All @@ -121,33 +121,33 @@ export function writeFlowFailure(
traceMetadataAls.getStore()?.paths || new Set<PathMetadata>();
if (allPaths) {
const failPath = spanMetadataAls?.getStore()?.path;
const relevantVariants = Array.from(allPaths).filter(
const relevantPaths = Array.from(allPaths).filter(
(meta) => meta.path.includes(flowName) && meta.path !== failPath
);

logger.logStructured(`Variants[/${flowName}]`, {
logger.logStructured(`Paths[/${flowName}]`, {
flowName: flowName,
variants: relevantVariants.map((variant) => variant.path),
paths: relevantPaths.map((p) => p.path),
});

// All variants that have succeeded need to be tracked as succeeded.
relevantVariants.forEach((variant) => {
variantCounter.add(1, {
// All paths that have succeeded need to be tracked as succeeded.
relevantPaths.forEach((p) => {
pathCounter.add(1, {
flowName: flowName,
success: 'success',
variant: variant.path,
path: p.path,
});

variantLatencies.record(variant.latency, {
pathLatencies.record(p.latency, {
...dimensions,
variant: variant.path,
path: p.path,
});
});

variantCounter.add(1, {
pathCounter.add(1, {
flowName: flowName,
success: 'failure',
variant: failPath,
path: failPath,
});
}
}
Expand Down
4 changes: 2 additions & 2 deletions js/plugins/google-cloud/src/gcpOpenTelemetry.ts
Original file line number Diff line number Diff line change
Expand Up @@ -150,9 +150,9 @@ export class GcpOpenTelemetry implements TelemetryConfig {
: new InMemoryMetricExporter(AggregationTemporality.CUMULATIVE);
return new PeriodicExportingMetricReader({
exportIntervalMillis:
this.options?.telemetryConfig?.metricExportIntervalMillis || 10_000,
this.options?.telemetryConfig?.metricExportIntervalMillis || 60_000,
exportTimeoutMillis:
this.options?.telemetryConfig?.metricExportTimeoutMillis || 10_000,
this.options?.telemetryConfig?.metricExportTimeoutMillis || 60_000,
exporter: metricExporter,
});
}
Expand Down
68 changes: 63 additions & 5 deletions js/plugins/google-cloud/tests/metrics_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import {
defineAction,
} from '@genkit-ai/core';
import { registerFlowStateStore } from '@genkit-ai/core/registry';
import { defineFlow, runAction, runFlow } from '@genkit-ai/flow';
import { defineFlow, run, runAction, runFlow } from '@genkit-ai/flow';
import {
GcpOpenTelemetry,
__getMetricExporterForTesting,
Expand Down Expand Up @@ -129,6 +129,23 @@ describe('GoogleCloudMetrics', () => {
assert.ok(latencyHistogram.attributes.sourceVersion);
});

it('truncates metric dimensions', async () => {
const testFlow = createFlow('anExtremelyLongFlowNameThatIsTooBig');

await runFlow(testFlow);

const requestCounter = await getCounterMetric('genkit/flow/requests');
const latencyHistogram = await getHistogramMetric('genkit/flow/latency');
assert.equal(
requestCounter.attributes.name,
'anExtremelyLongFlowNameThatIsToo'
);
assert.equal(
latencyHistogram.attributes.name,
'anExtremelyLongFlowNameThatIsToo'
);
});

it('writes action failure metrics', async () => {
const testAction = createAction('testActionWithFailure', async () => {
const nothing = null;
Expand Down Expand Up @@ -272,6 +289,40 @@ describe('GoogleCloudMetrics', () => {
assert.ok(requestCounter.attributes.sourceVersion);
});

it('writes flow paths metrics', async () => {
const flow = createFlow('pathTestFlow', async () => {
const step1Result = await run('step1', async () => {
return await run('substep_a', async () => {
return await run('substep_b', async () => 'res1');
});
});
const step2Result = await run('step2', async () => 'res2');
return step1Result + step2Result;
});

await runFlow(flow);

const expectedPaths = new Set([
'/{pathTestFlow,t:flow}/{step2,t:flowStep}',
'/{pathTestFlow,t:flow}/{step1,t:flowStep}/{substep_a,t:flowStep}/{substep_b,t:flowStep}',
]);
const pathCounterPoints = await getCounterDataPoints(
'genkit/flow/path/requests'
).then((points) =>
points.filter((point) => point.attributes.name === 'pathTestFlow')
);
const paths = new Set(
pathCounterPoints.map((point) => point.attributes.path)
);
assert.deepEqual(paths, expectedPaths);
pathCounterPoints.forEach((point) => {
assert.equal(point.value, 1);
assert.equal(point.attributes.source, 'ts');
assert.equal(point.attributes.success, 'success');
assert.ok(point.attributes.sourceVersion);
});
});

describe('Configuration', () => {
it('should export only traces', async () => {
const telemetry = new GcpOpenTelemetry({
Expand Down Expand Up @@ -315,22 +366,29 @@ describe('GoogleCloudMetrics', () => {
assert.fail(`Waiting for metric ${name} but it has not been written.`);
}

/** Finds a counter metric with the given name in the in memory exporter */
async function getCounterMetric(
/** Finds all datapoints for a counter metric with the given name in the in memory exporter */
async function getCounterDataPoints(
metricName: string
): Promise<DataPoint<Counter>> {
): Promise<List<DataPoint<Counter>>> {
const genkitMetrics = await getGenkitMetrics();
const counterMetric: SumMetricData = genkitMetrics.metrics.find(
(e) => e.descriptor.name === metricName && e.descriptor.type === 'COUNTER'
);
if (counterMetric) {
return counterMetric.dataPoints.at(-1);
return counterMetric.dataPoints;
}
assert.fail(
`No counter metric named ${metricName} was found. Only found: ${genkitMetrics.metrics.map((e) => e.descriptor.name)}`
);
}

/** Finds a counter metric with the given name in the in memory exporter */
async function getCounterMetric(
metricName: string
): Promise<DataPoint<Counter>> {
return getCounterDataPoints(metricName).then((points) => points.at(-1));
}

/** Finds a histogram metric with the given name in the in memory exporter */
async function getHistogramMetric(
metricName: string
Expand Down
Loading