diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index c096fc9a..8d78cafa 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -89,7 +89,7 @@ jobs: rm -rf .env echo "::group::Run server" - TELEMETRY_ENABLED=false LANGFUSE_SDK_CI_SYNC_PROCESSING_ENABLED=true LANGFUSE_READ_FROM_POSTGRES_ONLY=true LANGFUSE_READ_FROM_CLICKHOUSE_ONLY=false LANGFUSE_RETURN_FROM_CLICKHOUSE=false docker compose up -d + TELEMETRY_ENABLED=false LANGFUSE_S3_EVENT_UPLOAD_ENDPOINT=http://host.docker.internal:9090 LANGFUSE_SDK_CI_SYNC_PROCESSING_ENABLED=true LANGFUSE_READ_FROM_POSTGRES_ONLY=true LANGFUSE_READ_FROM_CLICKHOUSE_ONLY=false LANGFUSE_RETURN_FROM_CLICKHOUSE=false docker compose up -d echo "::endgroup::" # Add this step to check the health of the container diff --git a/integration-test/langfuse-integration-fetch.spec.ts b/integration-test/langfuse-integration-fetch.spec.ts index 54d43696..5874b286 100644 --- a/integration-test/langfuse-integration-fetch.spec.ts +++ b/integration-test/langfuse-integration-fetch.spec.ts @@ -1,9 +1,10 @@ -// uses the compiled fetch version, run yarn build after making changes to the SDKs -import Langfuse from "../langfuse"; - import axios from "axios"; -import { getHeaders, LANGFUSE_BASEURL } from "./integration-utils"; import { randomUUID } from "crypto"; +import fs from "fs"; + +// uses the compiled fetch version, run yarn build after making changes to the SDKs +import Langfuse, { LangfuseMedia } from "../langfuse"; +import { getHeaders, LANGFUSE_BASEURL } from "./integration-utils"; describe("Langfuse (fetch)", () => { let langfuse: Langfuse; @@ -641,4 +642,68 @@ describe("Langfuse (fetch)", () => { expect(fetchedGeneration.data.output).toEqual("MASKED"); }); }); + + // TODO: enable this test once networking issue is fixed + it.skip("replace media reference string in object", async () => { + const langfuse = new Langfuse(); + const mockTraceName = "test-trace-with-audio" + Math.random().toString(36); + const mockAudioBytes = fs.readFileSync("./static/joke_prompt.wav"); // Simple mock audio bytes + + const trace = langfuse.trace({ + name: mockTraceName, + metadata: { + context: { + nested: new LangfuseMedia({ + base64DataUri: `data:audio/wav;base64,${Buffer.from(mockAudioBytes).toString("base64")}`, + }), + }, + }, + }); + + await langfuse.flushAsync(); + const res = await langfuse.fetchTrace(trace.id); + + expect(res.data).toMatchObject({ + id: trace.id, + name: mockTraceName, + metadata: { + context: { + nested: expect.stringMatching(/^@@@langfuseMedia:type=audio\/wav\|id=.+\|source=base64_data_uri@@@$/), + }, + }, + }); + + const mediaReplacedTrace = await langfuse.resolveMediaReferences({ + resolveWith: "base64DataUri", + obj: res.data, + }); + + // Check that the replaced base64 data is the same as the original + expect(mediaReplacedTrace.metadata.context.nested).toEqual( + "data:audio/wav;base64," + Buffer.from(mockAudioBytes).toString("base64") + ); + + // Double check: reference strings must be the same if data URI is reused + const trace2 = langfuse.trace({ + name: "2-" + mockTraceName, + metadata: { + context: { + nested: mediaReplacedTrace.metadata.context.nested, + }, + }, + }); + + await langfuse.flushAsync(); + + const res2 = await axios.get(`${LANGFUSE_BASEURL}/api/public/traces/${trace2.id}`, { headers: getHeaders() }); + expect(res2.data).toMatchObject({ + id: trace2.id, + name: "2-" + mockTraceName, + metadata: { + context: { + nested: res.data.metadata.context.nested, + }, + }, + }); + }, 20_000); }); diff --git a/integration-test/langfuse-integration-vercel.spec.ts b/integration-test/langfuse-integration-vercel.spec.ts index 126d9794..d0299a8d 100644 --- a/integration-test/langfuse-integration-vercel.spec.ts +++ b/integration-test/langfuse-integration-vercel.spec.ts @@ -30,7 +30,7 @@ describe("langfuse-integration-vercel", () => { beforeEach(() => { sdk = new NodeSDK({ - traceExporter: new LangfuseExporter({ debug: true }), + traceExporter: new LangfuseExporter({ debug: false }), instrumentations: [getNodeAutoInstrumentations()], }); diff --git a/integration-test/langfuse-openai.spec.ts b/integration-test/langfuse-openai.spec.ts index 0207c631..767d9491 100644 --- a/integration-test/langfuse-openai.spec.ts +++ b/integration-test/langfuse-openai.spec.ts @@ -1038,7 +1038,7 @@ describe("Langfuse-OpenAI-Integation", () => { expect(generation.output).toMatchObject(completion.choices[0].message); expect(generation.metadata).toMatchObject({ someKey: "someValue", response_format }); expect(generation.model).toBe("gpt-4o-2024-08-06"); - }, 10000); + }, 15_000); it("should work with structured output parsing with beta API", async () => { const traceId = randomUUID(); diff --git a/langfuse-core/src/index.ts b/langfuse-core/src/index.ts index 1fa9b913..53a7dea0 100644 --- a/langfuse-core/src/index.ts +++ b/langfuse-core/src/index.ts @@ -55,8 +55,9 @@ import { type SingleIngestionEvent, type UpdateLangfuseGenerationBody, type UpdateLangfuseSpanBody, + type GetMediaResponse, } from "./types"; -import { LangfuseMedia } from "./media/LangfuseMedia"; +import { LangfuseMedia, type LangfuseMediaResolveMediaReferencesParams } from "./media/LangfuseMedia"; import { currentISOTime, encodeQueryParams, @@ -69,6 +70,7 @@ import { } from "./utils"; export * from "./prompts/promptClients"; +export * from "./media/LangfuseMedia"; export { LangfuseMemoryStorage } from "./storage-memory"; export type { LangfusePromptRecord } from "./types"; export * as utils from "./utils"; @@ -393,6 +395,10 @@ abstract class LangfuseCoreStateless { ); } + protected async _fetchMedia(id: string): Promise { + return this.fetchAndLogErrors(`${this.baseUrl}/api/public/media/${id}`, this._getFetchOptions({ method: "GET" })); + } + async fetchTraces(query?: GetLangfuseTracesQuery): Promise { // destructure the response into data and meta to be explicit about the shape of the response and add type-warnings in case the API changes const { data, meta } = await this.fetchAndLogErrors( @@ -1534,6 +1540,57 @@ export abstract class LangfuseCore extends LangfuseCoreStateless { } } + public async fetchMedia(id: string): Promise { + return await this._fetchMedia(id); + } + + /** + * Replaces the media reference strings in an object with base64 data URIs for the media content. + * + * This method recursively traverses an object (up to a maximum depth of 10) looking for media reference strings + * in the format "@@@langfuseMedia:...@@@". When found, it fetches the actual media content using the provided + * Langfuse client and replaces the reference string with a base64 data URI. + * + * If fetching media content fails for a reference string, a warning is logged and the reference string is left unchanged. + * + * @param params - Configuration object + * @param params.obj - The object to process. Can be a primitive value, array, or nested object + * @param params.langfuseClient - Langfuse client instance used to fetch media content + * @param params.resolveWith - The representation of the media content to replace the media reference string with. Currently only "base64DataUri" is supported. + * @param params.maxDepth - Optional. Default is 10. The maximum depth to traverse the object. + * + * @returns A deep copy of the input object with all media references replaced with base64 data URIs where possible + * + * @example + * ```typescript + * const obj = { + * image: "@@@langfuseMedia:type=image/jpeg|id=123|source=bytes@@@", + * nested: { + * pdf: "@@@langfuseMedia:type=application/pdf|id=456|source=bytes@@@" + * } + * }; + * + * const result = await LangfuseMedia.resolveMediaReferences({ + * obj, + * langfuseClient + * }); + * + * // Result: + * // { + * // image: "...", + * // nested: { + * // pdf: "data:application/pdf;base64,JVBERi0xLjcK..." + * // } + * // } + * ``` + */ + public async resolveMediaReferences( + params: Omit, "langfuseClient"> + ): Promise { + const { obj, ...rest } = params; + + return LangfuseMedia.resolveMediaReferences({ ...rest, langfuseClient: this, obj }); + } _updateSpan(body: UpdateLangfuseSpanBody): this { this.updateSpanStateless(body); return this; diff --git a/langfuse-core/src/media/LangfuseMedia.ts b/langfuse-core/src/media/LangfuseMedia.ts index 835cbbbf..ce15fa64 100644 --- a/langfuse-core/src/media/LangfuseMedia.ts +++ b/langfuse-core/src/media/LangfuseMedia.ts @@ -1,3 +1,5 @@ +import { type LangfuseCore } from "../index"; + let fs: any = null; let cryptoModule: any = null; @@ -28,6 +30,13 @@ interface ParsedMediaReference { contentType: MediaContentType; } +export type LangfuseMediaResolveMediaReferencesParams = { + obj: T; + langfuseClient: LangfuseCore; + resolveWith: "base64DataUri"; + maxDepth?: number; +}; + /** * A class for wrapping media objects for upload to Langfuse. * @@ -194,6 +203,111 @@ class LangfuseMedia { contentType: parsedData["type"] as MediaContentType, }; } + + /** + * Replaces the media reference strings in an object with base64 data URIs for the media content. + * + * This method recursively traverses an object (up to a maximum depth of 10) looking for media reference strings + * in the format "@@@langfuseMedia:...@@@". When found, it fetches the actual media content using the provided + * Langfuse client and replaces the reference string with a base64 data URI. + * + * If fetching media content fails for a reference string, a warning is logged and the reference string is left unchanged. + * + * @param params - Configuration object + * @param params.obj - The object to process. Can be a primitive value, array, or nested object + * @param params.langfuseClient - Langfuse client instance used to fetch media content + * @param params.resolveWith - The representation of the media content to replace the media reference string with. Currently only "base64DataUri" is supported. + * @param params.maxDepth - Optional. Default is 10. The maximum depth to traverse the object. + * + * @returns A deep copy of the input object with all media references replaced with base64 data URIs where possible + * + * @example + * ```typescript + * const obj = { + * image: "@@@langfuseMedia:type=image/jpeg|id=123|source=bytes@@@", + * nested: { + * pdf: "@@@langfuseMedia:type=application/pdf|id=456|source=bytes@@@" + * } + * }; + * + * const result = await LangfuseMedia.resolveMediaReferences({ + * obj, + * langfuseClient + * }); + * + * // Result: + * // { + * // image: "...", + * // nested: { + * // pdf: "data:application/pdf;base64,JVBERi0xLjcK..." + * // } + * // } + * ``` + */ + public static async resolveMediaReferences(params: LangfuseMediaResolveMediaReferencesParams): Promise { + const { obj, langfuseClient, maxDepth = 10 } = params; + + async function traverse(obj: T, depth: number): Promise { + if (depth > maxDepth) { + return obj; + } + + // Handle string with potential media references + if (typeof obj === "string") { + const regex = /@@@langfuseMedia:.+?@@@/g; + const referenceStringMatches = obj.match(regex); + if (!referenceStringMatches) { + return obj; + } + + let result = obj; + const referenceStringToMediaContentMap = new Map(); + + await Promise.all( + referenceStringMatches.map(async (referenceString) => { + try { + const parsedMediaReference = LangfuseMedia.parseReferenceString(referenceString); + const mediaData = await langfuseClient.fetchMedia(parsedMediaReference.mediaId); + const mediaContent = await langfuseClient.fetch(mediaData.url, { method: "GET", headers: {} }); + if (mediaContent.status !== 200) { + throw new Error("Failed to fetch media content"); + } + + const base64MediaContent = Buffer.from(await mediaContent.arrayBuffer()).toString("base64"); + const base64DataUri = `data:${mediaData.contentType};base64,${base64MediaContent}`; + + referenceStringToMediaContentMap.set(referenceString, base64DataUri); + } catch (error) { + console.warn("Error fetching media content for reference string", referenceString, error); + // Do not replace the reference string if there's an error + } + }) + ); + + for (const [referenceString, base64MediaContent] of referenceStringToMediaContentMap.entries()) { + result = result.replaceAll(referenceString, base64MediaContent) as T & string; + } + + return result; + } + + // Handle arrays + if (Array.isArray(obj)) { + return Promise.all(obj.map(async (item) => await traverse(item, depth + 1))) as Promise; + } + + // Handle objects + if (typeof obj === "object" && obj !== null) { + return Object.fromEntries( + await Promise.all(Object.entries(obj).map(async ([key, value]) => [key, await traverse(value, depth + 1)])) + ); + } + + return obj; + } + + return traverse(obj, 0); + } } export { LangfuseMedia, type MediaContentType, type ParsedMediaReference }; diff --git a/langfuse-core/src/types.ts b/langfuse-core/src/types.ts index 56de6847..034abc32 100644 --- a/langfuse-core/src/types.ts +++ b/langfuse-core/src/types.ts @@ -52,6 +52,7 @@ export type LangfuseFetchResponse = { status: number; text: () => Promise; json: () => Promise; + arrayBuffer: () => Promise; }; export type LangfuseObject = SingleIngestionEvent["type"]; @@ -177,10 +178,13 @@ export type ChatMessage = FixTypes; export type ChatPrompt = FixTypes & { type: "chat" }; export type TextPrompt = FixTypes & { type: "text" }; +// Media export type GetMediaUploadUrlRequest = FixTypes; export type GetMediaUploadUrlResponse = FixTypes; export type MediaContentType = components["schemas"]["MediaContentType"]; export type PatchMediaBody = FixTypes; +export type GetMediaResponse = FixTypes; + type CreateTextPromptRequest = FixTypes; type CreateChatPromptRequest = FixTypes; export type CreateTextPromptBody = { type?: "text" } & Omit & { isActive?: boolean }; // isActive is optional for backward compatibility diff --git a/langfuse-core/test/langfuse.flush.spec.ts b/langfuse-core/test/langfuse.flush.spec.ts index d16519c2..95115ee8 100644 --- a/langfuse-core/test/langfuse.flush.spec.ts +++ b/langfuse-core/test/langfuse.flush.spec.ts @@ -41,6 +41,7 @@ describe("Langfuse Core", () => { status: 400, text: async () => "err", json: async () => ({ status: "err" }), + arrayBuffer: async () => new Uint8Array(), }); }); @@ -59,6 +60,7 @@ describe("Langfuse Core", () => { status: 207, text: async () => "err", json: async () => ({ successes: [], errors: [{ id: trace.id, message: "Something failed" }] }), + arrayBuffer: async () => new Uint8Array(), }); }); @@ -90,6 +92,7 @@ describe("Langfuse Core", () => { status: 207, text: async () => "err", json: async () => ({ successes: [], errors: [{ id: "someId", message: "Something failed" }] }), + arrayBuffer: async () => new Uint8Array(), }); } else { index++; @@ -97,6 +100,7 @@ describe("Langfuse Core", () => { status: 200, text: async () => "ok", json: async () => ({ successes: [], errors: [] }), + arrayBuffer: async () => new Uint8Array(), }); } }); @@ -168,6 +172,7 @@ describe("Langfuse Core", () => { status: 200, text: async () => "ok", json: async () => ({ status: "ok" }), + arrayBuffer: async () => new Uint8Array(), }); }, 500); // add delay to simulate network request }); diff --git a/langfuse-core/test/langfuse.prompts.spec.ts b/langfuse-core/test/langfuse.prompts.spec.ts index 2d3a29f3..1dd81514 100644 --- a/langfuse-core/test/langfuse.prompts.spec.ts +++ b/langfuse-core/test/langfuse.prompts.spec.ts @@ -191,6 +191,7 @@ describe("Langfuse Core", () => { status: 200, json: async () => ({ status: "200" }), text: async () => "ok", + arrayBuffer: async () => new Uint8Array(), }); }, 1000); }); diff --git a/langfuse-core/test/test-utils/LangfuseCoreTestClient.ts b/langfuse-core/test/test-utils/LangfuseCoreTestClient.ts index f6ddbbdc..fe509419 100644 --- a/langfuse-core/test/test-utils/LangfuseCoreTestClient.ts +++ b/langfuse-core/test/test-utils/LangfuseCoreTestClient.ts @@ -68,6 +68,7 @@ export const createTestClient = ( status: 200, text: () => Promise.resolve("ok"), json: () => Promise.resolve({ status: "ok" }), + arrayBuffer: () => Promise.resolve(new Uint8Array()), }) ); diff --git a/langfuse-node/src/fetch.ts b/langfuse-node/src/fetch.ts index 89953854..9571228b 100644 --- a/langfuse-node/src/fetch.ts +++ b/langfuse-node/src/fetch.ts @@ -18,5 +18,6 @@ export const fetch = async (url: string, options: LangfuseFetchOptions): Promise status: res.status, text: async () => res.data, json: async () => res.data, + arrayBuffer: async () => Buffer.from(res.data), }; }; diff --git a/langfuse/src/langfuse.ts b/langfuse/src/langfuse.ts index 47c755eb..19c5d90f 100644 --- a/langfuse/src/langfuse.ts +++ b/langfuse/src/langfuse.ts @@ -18,6 +18,7 @@ export { type LangfuseSpanClient, type LangfuseEventClient, type LangfuseGenerationClient, + LangfuseMedia, } from "langfuse-core"; export class Langfuse extends LangfuseCore {