diff --git a/integration-test/langfuse-integration-datasets.spec.ts b/integration-test/langfuse-integration-datasets.spec.ts
index 8dc8a811..f308c794 100644
--- a/integration-test/langfuse-integration-datasets.spec.ts
+++ b/integration-test/langfuse-integration-datasets.spec.ts
@@ -1,5 +1,10 @@
 // uses the compiled node.js version, run yarn build after making changes to the SDKs
 import Langfuse from "../langfuse-node";
+import { createDatasetItemHandler } from "../langfuse-langchain";
+import { randomUUID } from "crypto";
+import { PromptTemplate } from "@langchain/core/prompts";
+import { StringOutputParser } from "@langchain/core/output_parsers";
+import { ChatOpenAI } from "@langchain/openai";
 
 describe("Langfuse Node.js", () => {
   let langfuse: Langfuse;
@@ -323,5 +328,84 @@ describe("Langfuse Node.js", () => {
         totalPages: 5,
       });
     }, 10000);
+
+    it("createDatasetItemHandler", async () => {
+      // Create simple Langchain chain
+      const prompt = new PromptTemplate({
+        template: "What is the capital of {country}? Give ONLY the name of the capital.",
+        inputVariables: ["country"],
+      });
+      const llm = new ChatOpenAI();
+      const parser = new StringOutputParser();
+      const chain = prompt.pipe(llm).pipe(parser);
+
+      // Create a dataset
+      const datasetName = randomUUID().slice(0, 8);
+      await langfuse.createDataset(datasetName);
+
+      // Add two items to the dataset
+      await Promise.all([
+        langfuse.createDatasetItem({
+          datasetName: datasetName,
+          input: "Germany",
+          expectedOutput: "Berlin",
+        }),
+
+        langfuse.createDatasetItem({
+          datasetName: datasetName,
+          input: "France",
+          expectedOutput: "Paris",
+        }),
+      ]);
+
+      // Execute chain on dataset items
+      const dataset = await langfuse.getDataset(datasetName);
+      const runName = "test-run-" + new Date().toISOString();
+      const runDescription = "test-run-description";
+      const runMetadata = { test: "test" };
+      const traceIds: string[] = [];
+
+      for (const item of dataset.items) {
+        const { handler, trace } = await createDatasetItemHandler({
+          item,
+          runName,
+          langfuseClient: langfuse,
+          options: {
+            runDescription,
+            runMetadata,
+          },
+        });
+
+        await chain.invoke({ country: item.input }, { callbacks: [handler] });
+
+        trace.score({
+          name: "test-score",
+          value: 0.5,
+        });
+
+        // Add trace id to list
+        traceIds.push(trace.id);
+      }
+
+      await langfuse.flushAsync();
+
+      // Verify that the dataset item is updated with the run name
+      const getRun = await langfuse.getDatasetRun({ datasetName, runName });
+
+      expect(getRun).toMatchObject({
+        name: runName,
+        description: "test-run-description", // from second link
+        metadata: { test: "test" }, // from second link
+        datasetId: dataset.id,
+        datasetRunItems: expect.arrayContaining([
+          expect.objectContaining({
+            traceId: traceIds[0],
+          }),
+          expect.objectContaining({
+            traceId: traceIds[1],
+          }),
+        ]),
+      });
+    }, 15000);
   });
 });
diff --git a/langfuse-core/src/index.ts b/langfuse-core/src/index.ts
index c0f35cfe..693f707f 100644
--- a/langfuse-core/src/index.ts
+++ b/langfuse-core/src/index.ts
@@ -46,6 +46,7 @@ import {
   type GetLangfuseSessionsQuery,
   type GetLangfuseSessionsResponse,
   type EventBody,
+  type DatasetItem,
 } from "./types";
 import {
   generateUUID,
@@ -1012,21 +1013,7 @@ export abstract class LangfuseCore extends LangfuseCoreStateless {
     description?: string;
     metadata?: any;
     projectId: string;
-    items: Array<{
-      id: string;
-      input?: any;
-      expectedOutput?: any;
-      metadata?: any;
-      sourceObservationId?: string | null;
-      link: (
-        obj: LangfuseObjectClient,
-        runName: string,
-        runArgs?: {
-          description?: string;
-          metadata?: any;
-        }
-      ) => Promise<{ id: string }>;
-    }>;
+    items: DatasetItem[];
   }> {
     const dataset = await this._getDataset(name);
     const items: GetLangfuseDatasetItemsResponse["data"] = [];
diff --git a/langfuse-core/src/types.ts b/langfuse-core/src/types.ts
index 5b0155e1..642be7d1 100644
--- a/langfuse-core/src/types.ts
+++ b/langfuse-core/src/types.ts
@@ -1,3 +1,4 @@
+import type { LangfuseObjectClient } from "./index";
 import { type LangfusePromptClient } from "./prompts/promptClients";
 import { type components, type paths } from "./openapi/server";
 
@@ -217,3 +218,18 @@ export type DeferRuntime = {
     }[]
   ) => void;
 };
+
+// Datasets
+/** Element type of `GetLangfuseDatasetItemsResponse["data"]`. */
+export type DatasetItemData = GetLangfuseDatasetItemsResponse["data"][number];
+/** Signature of a dataset item's `link` function; resolves to an object carrying an `id`. */
+export type LinkDatasetItem = (
+  obj: LangfuseObjectClient,
+  runName: string,
+  runArgs?: {
+    description?: string;
+    metadata?: any;
+  }
+) => Promise<{ id: string }>;
+/** Dataset item data combined with its `link` function. */
+export type DatasetItem = DatasetItemData & { link: LinkDatasetItem };
diff --git a/langfuse-langchain/index.ts b/langfuse-langchain/index.ts
index b79d1b1c..3e4aaebf 100644
--- a/langfuse-langchain/index.ts
+++ b/langfuse-langchain/index.ts
@@ -5,4 +5,5 @@ import { CallbackHandler } from "./src/callback";
 
 export default CallbackHandler;
 export { CallbackHandler };
+export { createDatasetItemHandler } from "./src/createDatasetItemHandler";
 export * from "./src/callback";
diff --git a/langfuse-langchain/src/callback.ts b/langfuse-langchain/src/callback.ts
index 8f5965d2..9dc82511 100644
--- a/langfuse-langchain/src/callback.ts
+++ b/langfuse-langchain/src/callback.ts
@@ -84,6 +84,7 @@ export class CallbackHandler extends BaseCallbackHandler {
       this.traceId = params.root.traceId;
       this.rootProvided = true;
       this.updateRoot = params.updateRoot ?? false;
+      this.metadata = params.metadata;
     } else {
       this.langfuse = new Langfuse({
         ...params,
diff --git a/langfuse-langchain/src/createDatasetItemHandler.ts b/langfuse-langchain/src/createDatasetItemHandler.ts
new file mode 100644
index 00000000..fedeec9d
--- /dev/null
+++ b/langfuse-langchain/src/createDatasetItemHandler.ts
@@ -0,0 +1,52 @@
+import type { DatasetItem, LangfuseCore, LangfuseTraceClient } from "langfuse-core";
+
+import { CallbackHandler } from "./callback";
+
+/** Arguments accepted by `createDatasetItemHandler`. */
+type CreateDatasetItemHandlerParams = {
+  /** Name of the dataset run that the item is linked to. */
+  runName: string;
+  /** Dataset item to link; its `link` function is called with the new trace. */
+  item: DatasetItem;
+  /** Client used to create the trace. */
+  langfuseClient: LangfuseCore;
+  options?: {
+    /** Passed to `item.link` as the run `description`. */
+    runDescription?: string;
+    /** Passed to `item.link` as the run `metadata`, which is typed `any` there. */
+    runMetadata?: Record<string, any>;
+  };
+};
+
+/**
+ * Creates a trace, links it to the dataset item under `runName`, and returns a
+ * callback handler rooted at that trace.
+ *
+ * @param params - Run name, dataset item, Langfuse client, and optional run description/metadata.
+ * @returns The callback handler together with the trace it was created for.
+ */
+export const createDatasetItemHandler = async (
+  params: CreateDatasetItemHandlerParams
+): Promise<{ handler: CallbackHandler; trace: LangfuseTraceClient }> => {
+  const { runName, item, langfuseClient, options } = params;
+
+  // Snake case properties to match Python SDK
+  const metadata: Record<string, unknown> = {
+    dataset_item_id: item.id,
+    dataset_id: item.datasetId,
+    dataset_run_name: runName,
+  };
+
+  const trace = langfuseClient.trace();
+
+  // The link call is awaited before the handler is returned to the caller.
+  await item.link(trace, runName, {
+    description: options?.runDescription,
+    metadata: options?.runMetadata,
+  });
+
+  return {
+    handler: new CallbackHandler({ root: trace, updateRoot: true, metadata }),
+    trace,
+  };
+};