Skip to content

Commit

Permalink
fix(vercel): allow nested executions in parent trace (#466)
Browse files Browse the repository at this point in the history
  • Loading branch information
hassiebp authored Nov 26, 2024
1 parent 3c94059 commit a904428
Show file tree
Hide file tree
Showing 3 changed files with 115 additions and 11 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ jobs:
rm -rf .env
echo "::group::Run server"
TELEMETRY_ENABLED=false LANGFUSE_ASYNC_INGESTION_PROCESSING=false LANGFUSE_ASYNC_CLICKHOUSE_INGESTION_PROCESSING=false LANGFUSE_READ_FROM_POSTGRES_ONLY=true LANGFUSE_READ_FROM_CLICKHOUSE_ONLY=false docker compose -f docker-compose.v3preview.yml up -d
TELEMETRY_ENABLED=false CLICKHOUSE_CLUSTER_ENABLED=false LANGFUSE_ASYNC_INGESTION_PROCESSING=false LANGFUSE_ASYNC_CLICKHOUSE_INGESTION_PROCESSING=false LANGFUSE_READ_FROM_POSTGRES_ONLY=true LANGFUSE_READ_FROM_CLICKHOUSE_ONLY=false docker compose -f docker-compose.v3preview.yml up -d
echo "::endgroup::"
# Add this step to check the health of the container
Expand Down
69 changes: 69 additions & 0 deletions integration-test/langfuse-integration-vercel.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -593,4 +593,73 @@ describe("langfuse-integration-vercel", () => {
expect(generation.promptVersion).toBe(fetchedPrompt.version);
}
}, 10_000);

it("should nest multiple generateText call under a trace", async () => {
const langfuse = new Langfuse();
const traceName = "parent_generate_text_multiple";

// Create parent trace
const parentTraceId = randomUUID();
langfuse.trace({ id: parentTraceId, name: traceName });
const baseRootSpanName = "root-span";

const NESTED_RUN_COUNT = 3;

for (let i = 0; i < NESTED_RUN_COUNT; i++) {
const testParams = {
traceId: parentTraceId,
modelName: "gpt-3.5-turbo",
maxTokens: 50,
prompt: "Invent a new holiday and describe its traditions.",
functionId: `${baseRootSpanName}-${i}`,
userId: "some-user-id",
sessionId: "some-session-id",
metadata: {
something: "custom",
someOtherThing: "other-value",
},
tags: ["vercel", "openai"],
};

const { traceId, modelName, maxTokens, prompt, functionId, userId, sessionId, metadata, tags } = testParams;

await generateText({
model: openai(modelName),
maxTokens,
prompt,
experimental_telemetry: {
isEnabled: true,
functionId,
metadata: {
langfuseTraceId: traceId,
langfuseUpdateParent: false,
userId,
sessionId,
tags,
...metadata,
},
},
});
}

await langfuse.flushAsync();
await sdk.shutdown();

// Fetch trace
const traceFetchResult = await fetchTraceById(parentTraceId);
expect(traceFetchResult.status).toBe(200);

// Validate trace
const fetchedTrace = traceFetchResult.data;
expect(fetchedTrace.name).toBe(traceName);

const rootObservations = fetchedTrace.observations
.filter((o: any) => !Boolean(o.parentObservationId))
.sort((a: any, b: any) => a.name.localeCompare(b.name));

for (let i = 0; i < NESTED_RUN_COUNT; i++) {
const obs = rootObservations[i];
expect(obs.name).toBe(`${baseRootSpanName}-${i}`);
}
}, 15_000);
});
55 changes: 45 additions & 10 deletions langfuse-vercel/src/LangfuseExporter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -76,36 +76,58 @@ export class LangfuseExporter implements SpanExporter {
const userProvidedTraceId = this.parseTraceId(spans);
const finalTraceId = userProvidedTraceId ?? traceId;
const langfusePrompt = this.parseLangfusePromptTraceAttribute(spans);
const updateParent = this.parseLangfuseUpdateParentTraceAttribute(spans);

this.langfuse.trace({
id: finalTraceId,
name: this.parseTraceName(spans) ?? rootSpan?.name,
const traceParams = {
userId: this.parseUserIdTraceAttribute(spans),
sessionId: this.parseSessionIdTraceAttribute(spans),
tags: this.parseTagsTraceAttribute(spans).length > 0 ? this.parseTagsTraceAttribute(spans) : undefined,
name: this.parseTraceName(spans) ?? rootSpan?.name,
input: this.parseInput(rootSpan),
output: this.parseOutput(rootSpan),
metadata: this.filterTraceAttributes(this.parseMetadataTraceAttribute(spans)),
});
};

const finalTraceParams = {
id: finalTraceId,
...(updateParent ? traceParams : {}),
};

this.langfuse.trace(finalTraceParams);

for (const span of spans) {
if (this.isGenerationSpan(span)) {
this.processSpanAsLangfuseGeneration(finalTraceId, span, this.isRootAiSdkSpan(span, spans), langfusePrompt);
} else {
this.processSpanAsLangfuseSpan(finalTraceId, span, this.isRootAiSdkSpan(span, spans));
this.processSpanAsLangfuseSpan(
finalTraceId,
span,
this.isRootAiSdkSpan(span, spans),
userProvidedTraceId ? this.parseTraceName(spans) : undefined
);
}
}
}

private processSpanAsLangfuseSpan(traceId: string, span: ReadableSpan, isRootSpan: boolean): void {
private processSpanAsLangfuseSpan(
traceId: string,
span: ReadableSpan,
isRootSpan: boolean,
rootSpanName?: string
): void {
const spanContext = span.spanContext();
const attributes = span.attributes;

this.langfuse.span({
traceId,
parentObservationId: isRootSpan ? undefined : span.parentSpanId,
id: spanContext.spanId,
name: "ai.toolCall.name" in attributes ? "ai.toolCall " + attributes["ai.toolCall.name"]?.toString() : span.name,
name:
isRootSpan && rootSpanName
? rootSpanName
: "ai.toolCall.name" in attributes
? "ai.toolCall " + attributes["ai.toolCall.name"]?.toString()
: span.name,
startTime: this.hrTimeToDate(span.startTime),
endTime: this.hrTimeToDate(span.endTime),

Expand Down Expand Up @@ -190,15 +212,15 @@ export class LangfuseExporter implements SpanExporter {
(acc, [key, value]) => {
const metadataPrefix = "ai.telemetry.metadata.";

if (key.startsWith(metadataPrefix) && value) {
if (key.startsWith(metadataPrefix) && value != null) {
const strippedKey = key.slice(metadataPrefix.length);

acc[strippedKey] = value;
}

const spanKeysToAdd = ["ai.settings.maxToolRoundtrips", "ai.prompt.format", "ai.toolCall.id", "ai.schema"];

if (spanKeysToAdd.includes(key) && value) {
if (spanKeysToAdd.includes(key) && value != null) {
acc[key] = value;
}

Expand Down Expand Up @@ -347,6 +369,12 @@ export class LangfuseExporter implements SpanExporter {
}
}

private parseLangfuseUpdateParentTraceAttribute(spans: ReadableSpan[]): boolean {
return Boolean(
spans.map((span) => this.parseSpanMetadata(span)["langfuseUpdateParent"]).find((val) => val != null) ?? true // default to true if no attribute is set
);
}

private parseTagsTraceAttribute(spans: ReadableSpan[]): string[] {
return [
...new Set(
Expand Down Expand Up @@ -376,7 +404,14 @@ export class LangfuseExporter implements SpanExporter {
}

private filterTraceAttributes(obj: Record<string, any>): Record<string, any> {
const langfuseTraceAttributes = ["userId", "sessionId", "tags", "langfuseTraceId", "langfusePrompt"];
const langfuseTraceAttributes = [
"userId",
"sessionId",
"tags",
"langfuseTraceId",
"langfusePrompt",
"langfuseUpdateParent",
];

return Object.entries(obj).reduce(
(acc, [key, value]) => {
Expand Down

0 comments on commit a904428

Please sign in to comment.