Skip to content

Commit

Permalink
feat(datasets): add langchain handler convenience wrapper (#413)
Browse files Browse the repository at this point in the history
  • Loading branch information
hassiebp authored Oct 2, 2024
1 parent 79959d5 commit 2c4bcec
Show file tree
Hide file tree
Showing 6 changed files with 141 additions and 15 deletions.
84 changes: 84 additions & 0 deletions integration-test/langfuse-integration-datasets.spec.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
// uses the compiled node.js version, run yarn build after making changes to the SDKs
import Langfuse from "../langfuse-node";
import { createDatasetItemHandler } from "../langfuse-langchain";
import { randomUUID } from "crypto";
import { PromptTemplate } from "@langchain/core/prompts";
import { StringOutputParser } from "@langchain/core/output_parsers";
import { ChatOpenAI } from "@langchain/openai";

describe("Langfuse Node.js", () => {
let langfuse: Langfuse;
Expand Down Expand Up @@ -323,5 +328,84 @@ describe("Langfuse Node.js", () => {
totalPages: 5,
});
}, 10000);

it("createDatasetItemHandler", async () => {
// Create simple Langchain chain
const prompt = new PromptTemplate({
template: "What is the capital of {country}? Give ONLY the name of the capital.",
inputVariables: ["country"],
});
const llm = new ChatOpenAI();
const parser = new StringOutputParser();
const chain = prompt.pipe(llm).pipe(parser);

// Create a dataset
const datasetName = randomUUID().slice(0, 8);
await langfuse.createDataset(datasetName);

// Add two items to the dataset
await Promise.all([
langfuse.createDatasetItem({
datasetName: datasetName,
input: "Germany",
expectedOutput: "Berlin",
}),

langfuse.createDatasetItem({
datasetName: datasetName,
input: "France",
expectedOutput: "Paris",
}),
]);

// Execute chain on dataset items
const dataset = await langfuse.getDataset(datasetName);
const runName = "test-run-" + new Date().toISOString();
const runDescription = "test-run-description";
const runMetadata = { test: "test" };
const traceIds: string[] = [];

for (const item of dataset.items) {
const { handler, trace } = await createDatasetItemHandler({
item,
runName,
langfuseClient: langfuse,
options: {
runDescription,
runMetadata,
},
});

await chain.invoke({ country: item.input }, { callbacks: [handler] });

trace.score({
name: "test-score",
value: 0.5,
});

// Add trace id to list
traceIds.push(trace.id);
}

await langfuse.flushAsync();

// Verify that the dataset item is updated with the run name
const getRun = await langfuse.getDatasetRun({ datasetName, runName });

expect(getRun).toMatchObject({
name: runName,
description: "test-run-description", // from second link
metadata: { test: "test" }, // from second link
datasetId: dataset.id,
datasetRunItems: expect.arrayContaining([
expect.objectContaining({
traceId: traceIds[0],
}),
expect.objectContaining({
traceId: traceIds[1],
}),
]),
});
}, 15000);
});
});
17 changes: 2 additions & 15 deletions langfuse-core/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ import {
type GetLangfuseSessionsQuery,
type GetLangfuseSessionsResponse,
type EventBody,
type DatasetItem,
} from "./types";
import {
generateUUID,
Expand Down Expand Up @@ -1012,21 +1013,7 @@ export abstract class LangfuseCore extends LangfuseCoreStateless {
description?: string;
metadata?: any;
projectId: string;
items: Array<{
id: string;
input?: any;
expectedOutput?: any;
metadata?: any;
sourceObservationId?: string | null;
link: (
obj: LangfuseObjectClient,
runName: string,
runArgs?: {
description?: string;
metadata?: any;
}
) => Promise<{ id: string }>;
}>;
items: DatasetItem[];
}> {
const dataset = await this._getDataset(name);
const items: GetLangfuseDatasetItemsResponse["data"] = [];
Expand Down
13 changes: 13 additions & 0 deletions langfuse-core/src/types.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { type LangfuseObjectClient } from "./index";
import { type LangfusePromptClient } from "./prompts/promptClients";
import { type components, type paths } from "./openapi/server";

Expand Down Expand Up @@ -217,3 +218,15 @@ export type DeferRuntime = {
}[]
) => void;
};

// Datasets
export type DatasetItemData = GetLangfuseDatasetItemsResponse["data"][number];
export type LinkDatasetItem = (
obj: LangfuseObjectClient,
runName: string,
runArgs?: {
description?: string;
metadata?: any;
}
) => Promise<{ id: string }>;
export type DatasetItem = DatasetItemData & { link: LinkDatasetItem };
1 change: 1 addition & 0 deletions langfuse-langchain/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,5 @@ import { CallbackHandler } from "./src/callback";
export default CallbackHandler;

export { CallbackHandler };
export { createDatasetItemHandler } from "./src/createDatasetItemHandler";
export * from "./src/callback";
1 change: 1 addition & 0 deletions langfuse-langchain/src/callback.ts
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ export class CallbackHandler extends BaseCallbackHandler {
this.traceId = params.root.traceId;
this.rootProvided = true;
this.updateRoot = params.updateRoot ?? false;
this.metadata = params.metadata;
} else {
this.langfuse = new Langfuse({
...params,
Expand Down
40 changes: 40 additions & 0 deletions langfuse-langchain/src/createDatasetItemHandler.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
import type { LangfuseCore } from "langfuse-core";

import { CallbackHandler } from "./callback";

import type { DatasetItem, LangfuseTraceClient } from "langfuse-core";

type CreateDatasetItemHandlerParams = {
runName: string;
item: DatasetItem;
langfuseClient: LangfuseCore;
options?: {
runDescription?: string;
runMetadata?: Record<string, any>;
};
};

export const createDatasetItemHandler = async (
params: CreateDatasetItemHandlerParams
): Promise<{ handler: CallbackHandler; trace: LangfuseTraceClient }> => {
const { runName, item, langfuseClient, options } = params;

// Snake case properties to match Python SDK
const metadata: Record<string, string> = {
dataset_item_id: item.id,
dataset_id: item.datasetId,
dataset_run_name: runName,
};

const trace = langfuseClient.trace();

await item.link(trace, runName, {
description: options?.runDescription,
metadata: options?.runMetadata,
});

return {
handler: new CallbackHandler({ root: trace, updateRoot: true, metadata }),
trace,
};
};

0 comments on commit 2c4bcec

Please sign in to comment.