From 058d3a19776e2d49d60da90b76cad87c91368fa6 Mon Sep 17 00:00:00 2001 From: Bryan Atkinson Date: Mon, 10 Jun 2024 20:19:38 +0000 Subject: [PATCH 1/3] Update metrics export to default to only once per minute instead of every 10s, and truncate any long metric dimensions. --- js/core/src/metrics.ts | 17 +++++++++++++++++ js/plugins/google-cloud/src/gcpOpenTelemetry.ts | 4 ++-- js/plugins/google-cloud/tests/metrics_test.ts | 11 +++++++++++ 3 files changed, 30 insertions(+), 2 deletions(-) diff --git a/js/core/src/metrics.ts b/js/core/src/metrics.ts index 779640957..b8422668d 100644 --- a/js/core/src/metrics.ts +++ b/js/core/src/metrics.ts @@ -18,6 +18,7 @@ import { Counter, Histogram, Meter, metrics } from '@opentelemetry/api'; export const METER_NAME = 'genkit'; export const METRIC_NAME_PREFIX = 'genkit'; +const METRIC_DIMENSION_MAX_CHARS = 32; export function internalMetricNamespaceWrap(...namespaces) { return [METRIC_NAME_PREFIX, ...namespaces].join('/'); @@ -68,6 +69,7 @@ export class MetricCounter extends Metric { add(val?: number, opts?: any) { if (val) { + truncateDimensions(opts); this.get().add(val, opts); } } @@ -87,7 +89,22 @@ export class MetricHistogram extends Metric { record(val?: number, opts?: any) { if (val) { + truncateDimensions(opts); this.get().record(val, opts); } } } + +/** + * Truncates all of the metric dimensions to avoid long, high-cardinality + * dimensions being added to metrics. + */ +function truncateDimensions(opts?: any) { + if (opts) { + Object.keys(opts).forEach(k => { + if (typeof opts[k] == 'string') { + opts[k] = opts[k].substring(0, METRIC_DIMENSION_MAX_CHARS); + } + }); + } +} diff --git a/js/plugins/google-cloud/src/gcpOpenTelemetry.ts b/js/plugins/google-cloud/src/gcpOpenTelemetry.ts index a853a51cb..43ec46dbf 100644 --- a/js/plugins/google-cloud/src/gcpOpenTelemetry.ts +++ b/js/plugins/google-cloud/src/gcpOpenTelemetry.ts @@ -150,9 +150,9 @@ export class GcpOpenTelemetry implements TelemetryConfig { : new InMemoryMetricExporter(AggregationTemporality.CUMULATIVE); return new PeriodicExportingMetricReader({ exportIntervalMillis: - this.options?.telemetryConfig?.metricExportIntervalMillis || 10_000, + this.options?.telemetryConfig?.metricExportIntervalMillis || 60_000, exportTimeoutMillis: - this.options?.telemetryConfig?.metricExportTimeoutMillis || 10_000, + this.options?.telemetryConfig?.metricExportTimeoutMillis || 60_000, exporter: metricExporter, }); } diff --git a/js/plugins/google-cloud/tests/metrics_test.ts b/js/plugins/google-cloud/tests/metrics_test.ts index ffeabd8b0..36dbf6338 100644 --- a/js/plugins/google-cloud/tests/metrics_test.ts +++ b/js/plugins/google-cloud/tests/metrics_test.ts @@ -129,6 +129,17 @@ describe('GoogleCloudMetrics', () => { assert.ok(latencyHistogram.attributes.sourceVersion); }); + it('truncates metric dimensions', async () => { + const testFlow = createFlow('anExtremelyLongFlowNameThatIsTooBig'); + + await runFlow(testFlow); + + const requestCounter = await getCounterMetric('genkit/flow/requests'); + const latencyHistogram = await getHistogramMetric('genkit/flow/latency'); + assert.equal(requestCounter.attributes.name, 'anExtremelyLongFlowNameThatIsToo'); + assert.equal(latencyHistogram.attributes.name, 'anExtremelyLongFlowNameThatIsToo'); + }); + it('writes action failure metrics', async () => { const testAction = createAction('testActionWithFailure', async () => { const nothing = null; From 1c2ced3941d37725a69b0143c8afdb6852602a04 Mon Sep 17 00:00:00 2001 From: Bryan Atkinson Date: Mon, 10 Jun 2024 20:27:51 +0000 Subject: [PATCH 2/3] Run format --- js/core/src/metrics.ts | 2 +- js/plugins/google-cloud/tests/metrics_test.ts | 10 ++++++++-- 2 files changed, 9 insertions(+), 3 deletions(-) diff --git a/js/core/src/metrics.ts b/js/core/src/metrics.ts index b8422668d..1d5148f71 100644 --- a/js/core/src/metrics.ts +++ b/js/core/src/metrics.ts @@ -101,7 +101,7 @@ export class MetricHistogram extends Metric { */ function truncateDimensions(opts?: any) { if (opts) { - Object.keys(opts).forEach(k => { + Object.keys(opts).forEach((k) => { if (typeof opts[k] == 'string') { opts[k] = opts[k].substring(0, METRIC_DIMENSION_MAX_CHARS); } diff --git a/js/plugins/google-cloud/tests/metrics_test.ts b/js/plugins/google-cloud/tests/metrics_test.ts index 36dbf6338..17e3a6f23 100644 --- a/js/plugins/google-cloud/tests/metrics_test.ts +++ b/js/plugins/google-cloud/tests/metrics_test.ts @@ -136,8 +136,14 @@ describe('GoogleCloudMetrics', () => { const requestCounter = await getCounterMetric('genkit/flow/requests'); const latencyHistogram = await getHistogramMetric('genkit/flow/latency'); - assert.equal(requestCounter.attributes.name, 'anExtremelyLongFlowNameThatIsToo'); - assert.equal(latencyHistogram.attributes.name, 'anExtremelyLongFlowNameThatIsToo'); + assert.equal( + requestCounter.attributes.name, + 'anExtremelyLongFlowNameThatIsToo' + ); + assert.equal( + latencyHistogram.attributes.name, + 'anExtremelyLongFlowNameThatIsToo' + ); }); it('writes action failure metrics', async () => { From f4f8796c36ea835db2071cdbeccf4f924a06fac5 Mon Sep 17 00:00:00 2001 From: Bryan Atkinson Date: Tue, 11 Jun 2024 18:13:12 +0000 Subject: [PATCH 3/3] Omit path dimensions from truncated metric labels and ensure consistent naming of paths. --- js/core/src/metrics.ts | 4 +- js/flow/src/telemetry.ts | 46 ++++++++--------- js/plugins/google-cloud/tests/metrics_test.ts | 51 +++++++++++++++++-- 3 files changed, 72 insertions(+), 29 deletions(-) diff --git a/js/core/src/metrics.ts b/js/core/src/metrics.ts index 1d5148f71..1398eb183 100644 --- a/js/core/src/metrics.ts +++ b/js/core/src/metrics.ts @@ -102,7 +102,9 @@ export class MetricHistogram extends Metric { function truncateDimensions(opts?: any) { if (opts) { Object.keys(opts).forEach((k) => { - if (typeof opts[k] == 'string') { + // We don't want to truncate paths. They are known to be long but with + // relatively low cardinality, and are useful for downstream monitoring. + if (!k.startsWith('path') && typeof opts[k] == 'string') { opts[k] = opts[k].substring(0, METRIC_DIMENSION_MAX_CHARS); } }); diff --git a/js/flow/src/telemetry.ts b/js/flow/src/telemetry.ts index 975977983..f44d47cc1 100644 --- a/js/flow/src/telemetry.ts +++ b/js/flow/src/telemetry.ts @@ -39,13 +39,13 @@ const flowCounter = new MetricCounter(_N('requests'), { valueType: ValueType.INT, }); -const variantCounter = new MetricCounter(_N('variants'), { - description: 'Tracks unique flow variants per flow.', +const pathCounter = new MetricCounter(_N('path/requests'), { + description: 'Tracks unique flow paths per flow.', valueType: ValueType.INT, }); -const variantLatencies = new MetricHistogram(_N('variants/latency'), { - description: 'Latencies per flow variant.', +const pathLatencies = new MetricHistogram(_N('path/latency'), { + description: 'Latencies per flow path.', ValueType: ValueType.DOUBLE, unit: 'ms', }); @@ -79,25 +79,25 @@ export function writeFlowSuccess(flowName: string, latencyMs: number) { const paths = traceMetadataAls.getStore()?.paths || new Set(); if (paths) { - const relevantVariants = Array.from(paths).filter((meta) => + const relevantPaths = Array.from(paths).filter((meta) => meta.path.includes(flowName) ); - logger.logStructured(`Variants[/${flowName}]`, { + logger.logStructured(`Paths[/${flowName}]`, { flowName: flowName, - variants: relevantVariants.map((variant) => variant.path), + paths: relevantPaths.map((p) => p.path), }); - relevantVariants.forEach((variant) => { - variantCounter.add(1, { + relevantPaths.forEach((p) => { + pathCounter.add(1, { ...dimensions, success: 'success', - variant: variant.path, + path: p.path, }); - variantLatencies.record(variant.latency, { + pathLatencies.record(p.latency, { ...dimensions, - variant: variant.path, + path: p.path, }); }); } @@ -121,33 +121,33 @@ export function writeFlowFailure( traceMetadataAls.getStore()?.paths || new Set(); if (allPaths) { const failPath = spanMetadataAls?.getStore()?.path; - const relevantVariants = Array.from(allPaths).filter( + const relevantPaths = Array.from(allPaths).filter( (meta) => meta.path.includes(flowName) && meta.path !== failPath ); - logger.logStructured(`Variants[/${flowName}]`, { + logger.logStructured(`Paths[/${flowName}]`, { flowName: flowName, - variants: relevantVariants.map((variant) => variant.path), + paths: relevantPaths.map((p) => p.path), }); - // All variants that have succeeded need to be tracked as succeeded. - relevantVariants.forEach((variant) => { - variantCounter.add(1, { + // All paths that have succeeded need to be tracked as succeeded. + relevantPaths.forEach((p) => { + pathCounter.add(1, { flowName: flowName, success: 'success', - variant: variant.path, + path: p.path, }); - variantLatencies.record(variant.latency, { + pathLatencies.record(p.latency, { ...dimensions, - variant: variant.path, + path: p.path, }); }); - variantCounter.add(1, { + pathCounter.add(1, { flowName: flowName, success: 'failure', - variant: failPath, + path: failPath, }); } } diff --git a/js/plugins/google-cloud/tests/metrics_test.ts b/js/plugins/google-cloud/tests/metrics_test.ts index 17e3a6f23..cdaab67a8 100644 --- a/js/plugins/google-cloud/tests/metrics_test.ts +++ b/js/plugins/google-cloud/tests/metrics_test.ts @@ -25,7 +25,7 @@ import { defineAction, } from '@genkit-ai/core'; import { registerFlowStateStore } from '@genkit-ai/core/registry'; -import { defineFlow, runAction, runFlow } from '@genkit-ai/flow'; +import { defineFlow, run, runAction, runFlow } from '@genkit-ai/flow'; import { GcpOpenTelemetry, __getMetricExporterForTesting, @@ -289,6 +289,40 @@ describe('GoogleCloudMetrics', () => { assert.ok(requestCounter.attributes.sourceVersion); }); + it('writes flow paths metrics', async () => { + const flow = createFlow('pathTestFlow', async () => { + const step1Result = await run('step1', async () => { + return await run('substep_a', async () => { + return await run('substep_b', async () => 'res1'); + }); + }); + const step2Result = await run('step2', async () => 'res2'); + return step1Result + step2Result; + }); + + await runFlow(flow); + + const expectedPaths = new Set([ + '/{pathTestFlow,t:flow}/{step2,t:flowStep}', + '/{pathTestFlow,t:flow}/{step1,t:flowStep}/{substep_a,t:flowStep}/{substep_b,t:flowStep}', + ]); + const pathCounterPoints = await getCounterDataPoints( + 'genkit/flow/path/requests' + ).then((points) => + points.filter((point) => point.attributes.name === 'pathTestFlow') + ); + const paths = new Set( + pathCounterPoints.map((point) => point.attributes.path) + ); + assert.deepEqual(paths, expectedPaths); + pathCounterPoints.forEach((point) => { + assert.equal(point.value, 1); + assert.equal(point.attributes.source, 'ts'); + assert.equal(point.attributes.success, 'success'); + assert.ok(point.attributes.sourceVersion); + }); + }); + describe('Configuration', () => { it('should export only traces', async () => { const telemetry = new GcpOpenTelemetry({ @@ -332,22 +366,29 @@ describe('GoogleCloudMetrics', () => { assert.fail(`Waiting for metric ${name} but it has not been written.`); } - /** Finds a counter metric with the given name in the in memory exporter */ - async function getCounterMetric( + /** Finds all datapoints for a counter metric with the given name in the in memory exporter */ + async function getCounterDataPoints( metricName: string - ): Promise> { + ): Promise>> { const genkitMetrics = await getGenkitMetrics(); const counterMetric: SumMetricData = genkitMetrics.metrics.find( (e) => e.descriptor.name === metricName && e.descriptor.type === 'COUNTER' ); if (counterMetric) { - return counterMetric.dataPoints.at(-1); + return counterMetric.dataPoints; } assert.fail( `No counter metric named ${metricName} was found. Only found: ${genkitMetrics.metrics.map((e) => e.descriptor.name)}` ); } + /** Finds a counter metric with the given name in the in memory exporter */ + async function getCounterMetric( + metricName: string + ): Promise> { + return getCounterDataPoints(metricName).then((points) => points.at(-1)); + } + /** Finds a histogram metric with the given name in the in memory exporter */ async function getHistogramMetric( metricName: string