diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index d6c37701..42d5efa3 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -89,7 +89,7 @@ jobs: rm -rf .env echo "::group::Run server" - TELEMETRY_ENABLED=false LANGFUSE_ASYNC_INGESTION_PROCESSING=false LANGFUSE_ASYNC_CLICKHOUSE_INGESTION_PROCESSING=false LANGFUSE_READ_FROM_POSTGRES_ONLY=true LANGFUSE_READ_FROM_CLICKHOUSE_ONLY=false docker compose -f docker-compose.v3preview.yml up -d + TELEMETRY_ENABLED=false CLICKHOUSE_CLUSTER_ENABLED=false LANGFUSE_ASYNC_INGESTION_PROCESSING=false LANGFUSE_ASYNC_CLICKHOUSE_INGESTION_PROCESSING=false LANGFUSE_READ_FROM_POSTGRES_ONLY=true LANGFUSE_READ_FROM_CLICKHOUSE_ONLY=false docker compose -f docker-compose.v3preview.yml up -d echo "::endgroup::" # Add this step to check the health of the container diff --git a/integration-test/langfuse-integration-vercel.spec.ts b/integration-test/langfuse-integration-vercel.spec.ts index 6da5c507..2455138b 100644 --- a/integration-test/langfuse-integration-vercel.spec.ts +++ b/integration-test/langfuse-integration-vercel.spec.ts @@ -593,4 +593,73 @@ describe("langfuse-integration-vercel", () => { expect(generation.promptVersion).toBe(fetchedPrompt.version); } }, 10_000); + + it("should nest multiple generateText call under a trace", async () => { + const langfuse = new Langfuse(); + const traceName = "parent_generate_text_multiple"; + + // Create parent trace + const parentTraceId = randomUUID(); + langfuse.trace({ id: parentTraceId, name: traceName }); + const baseRootSpanName = "root-span"; + + const NESTED_RUN_COUNT = 3; + + for (let i = 0; i < NESTED_RUN_COUNT; i++) { + const testParams = { + traceId: parentTraceId, + modelName: "gpt-3.5-turbo", + maxTokens: 50, + prompt: "Invent a new holiday and describe its traditions.", + functionId: `${baseRootSpanName}-${i}`, + userId: "some-user-id", + sessionId: "some-session-id", + metadata: { + something: "custom", + someOtherThing: "other-value", + }, + tags: ["vercel", "openai"], + }; + + const { traceId, modelName, maxTokens, prompt, functionId, userId, sessionId, metadata, tags } = testParams; + + await generateText({ + model: openai(modelName), + maxTokens, + prompt, + experimental_telemetry: { + isEnabled: true, + functionId, + metadata: { + langfuseTraceId: traceId, + langfuseUpdateParent: false, + userId, + sessionId, + tags, + ...metadata, + }, + }, + }); + } + + await langfuse.flushAsync(); + await sdk.shutdown(); + + // Fetch trace + const traceFetchResult = await fetchTraceById(parentTraceId); + expect(traceFetchResult.status).toBe(200); + + // Validate trace + const fetchedTrace = traceFetchResult.data; + expect(fetchedTrace.name).toBe(traceName); + + const rootObservations = fetchedTrace.observations + .filter((o: any) => !Boolean(o.parentObservationId)) + .sort((a: any, b: any) => a.name.localeCompare(b.name)); + + for (let i = 0; i < NESTED_RUN_COUNT; i++) { + const obs = rootObservations[i]; + expect(obs.name).toBe(`${baseRootSpanName}-${i}`); + } + }, 15_000); }); diff --git a/langfuse-vercel/src/LangfuseExporter.ts b/langfuse-vercel/src/LangfuseExporter.ts index e124607c..b323c680 100644 --- a/langfuse-vercel/src/LangfuseExporter.ts +++ b/langfuse-vercel/src/LangfuseExporter.ts @@ -76,28 +76,45 @@ export class LangfuseExporter implements SpanExporter { const userProvidedTraceId = this.parseTraceId(spans); const finalTraceId = userProvidedTraceId ?? traceId; const langfusePrompt = this.parseLangfusePromptTraceAttribute(spans); + const updateParent = this.parseLangfuseUpdateParentTraceAttribute(spans); - this.langfuse.trace({ - id: finalTraceId, - name: this.parseTraceName(spans) ?? rootSpan?.name, + const traceParams = { userId: this.parseUserIdTraceAttribute(spans), sessionId: this.parseSessionIdTraceAttribute(spans), tags: this.parseTagsTraceAttribute(spans).length > 0 ? this.parseTagsTraceAttribute(spans) : undefined, + name: this.parseTraceName(spans) ?? rootSpan?.name, input: this.parseInput(rootSpan), output: this.parseOutput(rootSpan), metadata: this.filterTraceAttributes(this.parseMetadataTraceAttribute(spans)), - }); + }; + + const finalTraceParams = { + id: finalTraceId, + ...(updateParent ? traceParams : {}), + }; + + this.langfuse.trace(finalTraceParams); for (const span of spans) { if (this.isGenerationSpan(span)) { this.processSpanAsLangfuseGeneration(finalTraceId, span, this.isRootAiSdkSpan(span, spans), langfusePrompt); } else { - this.processSpanAsLangfuseSpan(finalTraceId, span, this.isRootAiSdkSpan(span, spans)); + this.processSpanAsLangfuseSpan( + finalTraceId, + span, + this.isRootAiSdkSpan(span, spans), + userProvidedTraceId ? this.parseTraceName(spans) : undefined + ); } } } - private processSpanAsLangfuseSpan(traceId: string, span: ReadableSpan, isRootSpan: boolean): void { + private processSpanAsLangfuseSpan( + traceId: string, + span: ReadableSpan, + isRootSpan: boolean, + rootSpanName?: string + ): void { const spanContext = span.spanContext(); const attributes = span.attributes; @@ -105,7 +122,12 @@ export class LangfuseExporter implements SpanExporter { traceId, parentObservationId: isRootSpan ? undefined : span.parentSpanId, id: spanContext.spanId, - name: "ai.toolCall.name" in attributes ? "ai.toolCall " + attributes["ai.toolCall.name"]?.toString() : span.name, + name: + isRootSpan && rootSpanName + ? rootSpanName + : "ai.toolCall.name" in attributes + ? "ai.toolCall " + attributes["ai.toolCall.name"]?.toString() + : span.name, startTime: this.hrTimeToDate(span.startTime), endTime: this.hrTimeToDate(span.endTime), @@ -190,7 +212,7 @@ export class LangfuseExporter implements SpanExporter { (acc, [key, value]) => { const metadataPrefix = "ai.telemetry.metadata."; - if (key.startsWith(metadataPrefix) && value) { + if (key.startsWith(metadataPrefix) && value != null) { const strippedKey = key.slice(metadataPrefix.length); acc[strippedKey] = value; @@ -198,7 +220,7 @@ export class LangfuseExporter implements SpanExporter { const spanKeysToAdd = ["ai.settings.maxToolRoundtrips", "ai.prompt.format", "ai.toolCall.id", "ai.schema"]; - if (spanKeysToAdd.includes(key) && value) { + if (spanKeysToAdd.includes(key) && value != null) { acc[key] = value; } @@ -347,6 +369,12 @@ export class LangfuseExporter implements SpanExporter { } } + private parseLangfuseUpdateParentTraceAttribute(spans: ReadableSpan[]): boolean { + return Boolean( + spans.map((span) => this.parseSpanMetadata(span)["langfuseUpdateParent"]).find((val) => val != null) ?? true // default to true if no attribute is set + ); + } + private parseTagsTraceAttribute(spans: ReadableSpan[]): string[] { return [ ...new Set( @@ -376,7 +404,14 @@ export class LangfuseExporter implements SpanExporter { } private filterTraceAttributes(obj: Record): Record { - const langfuseTraceAttributes = ["userId", "sessionId", "tags", "langfuseTraceId", "langfusePrompt"]; + const langfuseTraceAttributes = [ + "userId", + "sessionId", + "tags", + "langfuseTraceId", + "langfusePrompt", + "langfuseUpdateParent", + ]; return Object.entries(obj).reduce( (acc, [key, value]) => {