Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(media): add utility to resolve media references #479

Merged
merged 15 commits into from
Dec 11, 2024
70 changes: 66 additions & 4 deletions integration-test/langfuse-integration-fetch.spec.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
// uses the compiled fetch version, run yarn build after making changes to the SDKs
import Langfuse from "../langfuse";

import axios from "axios";
import { getHeaders, LANGFUSE_BASEURL } from "./integration-utils";
import { randomUUID } from "crypto";
import fs from "fs";

// uses the compiled fetch version, run yarn build after making changes to the SDKs
import Langfuse, { LangfuseMedia } from "../langfuse";
import { getHeaders, LANGFUSE_BASEURL } from "./integration-utils";

describe("Langfuse (fetch)", () => {
let langfuse: Langfuse;
Expand Down Expand Up @@ -641,4 +642,65 @@ describe("Langfuse (fetch)", () => {
expect(fetchedGeneration.data.output).toEqual("MASKED");
});
});
it("replace media reference string in object", async () => {
const mockTraceName = "test-trace-with-audio" + Math.random().toString(36);
const mockAudioBytes = fs.readFileSync("./static/joke_prompt.wav"); // Simple mock audio bytes

const trace = langfuse.trace({
name: mockTraceName,
metadata: {
context: {
nested: new LangfuseMedia({
base64DataUri: `data:audio/wav;base64,${Buffer.from(mockAudioBytes).toString("base64")}`,
}),
},
},
});

await langfuse.flushAsync();
const res = await axios.get(`${LANGFUSE_BASEURL}/api/public/traces/${trace.id}`, { headers: getHeaders() });

hassiebp marked this conversation as resolved.
Show resolved Hide resolved
expect(res.data).toMatchObject({
id: trace.id,
name: mockTraceName,
metadata: {
context: {
nested: expect.stringMatching(/^@@@langfuseMedia:type=audio\/wav\|id=.+\|source=base64_data_uri@@@$/),
},
},
});

const mediaReplacedTrace = await LangfuseMedia.resolveMediaReferences({
obj: res.data,
langfuseClient: langfuse,
});
hassiebp marked this conversation as resolved.
Show resolved Hide resolved

// Check that the replaced base64 data is the same as the original
expect(mediaReplacedTrace.metadata.context.nested).toEqual(
`data:audio/wav;base64,${Buffer.from(mockAudioBytes).toString("base64")}`
);

// Double check: reference strings must be the same if data URI is reused
const trace2 = langfuse.trace({
name: "2-" + mockTraceName,
metadata: {
context: {
nested: mediaReplacedTrace.metadata.context.nested,
},
},
});

await langfuse.flushAsync();

const res2 = await axios.get(`${LANGFUSE_BASEURL}/api/public/traces/${trace2.id}`, { headers: getHeaders() });
expect(res2.data).toMatchObject({
id: trace2.id,
name: "2-" + mockTraceName,
metadata: {
context: {
nested: res.data.metadata.context.nested,
},
},
});
}, 20_000);
});
10 changes: 10 additions & 0 deletions langfuse-core/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ import {
type SingleIngestionEvent,
type UpdateLangfuseGenerationBody,
type UpdateLangfuseSpanBody,
type GetMediaResponse,
} from "./types";
import { LangfuseMedia } from "./media/LangfuseMedia";
import {
Expand All @@ -69,6 +70,7 @@ import {
} from "./utils";

export * from "./prompts/promptClients";
export * from "./media/LangfuseMedia";
export { LangfuseMemoryStorage } from "./storage-memory";
export type { LangfusePromptRecord } from "./types";
export * as utils from "./utils";
Expand Down Expand Up @@ -393,6 +395,10 @@ abstract class LangfuseCoreStateless {
);
}

protected async _getMediaById(id: string): Promise<GetMediaResponse> {
return this.fetchAndLogErrors(`${this.baseUrl}/api/public/media/${id}`, this._getFetchOptions({ method: "GET" }));
}

async fetchTraces(query?: GetLangfuseTracesQuery): Promise<GetLangfuseTracesResponse> {
// destructure the response into data and meta to be explicit about the shape of the response and add type-warnings in case the API changes
const { data, meta } = await this.fetchAndLogErrors<GetLangfuseTracesResponse>(
Expand Down Expand Up @@ -1534,6 +1540,10 @@ export abstract class LangfuseCore extends LangfuseCoreStateless {
}
}

public async getMediaById(id: string): Promise<GetMediaResponse> {
return await this._getMediaById(id);
}

_updateSpan(body: UpdateLangfuseSpanBody): this {
this.updateSpanStateless(body);
return this;
Expand Down
107 changes: 107 additions & 0 deletions langfuse-core/src/media/LangfuseMedia.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import { type LangfuseCore } from "../index";

let fs: any = null;
let cryptoModule: any = null;

Expand Down Expand Up @@ -194,6 +196,111 @@ class LangfuseMedia {
contentType: parsedData["type"] as MediaContentType,
};
}

/**
* Replaces the media reference strings in an object with base64 data URIs for the media content.
*
* This method recursively traverses an object (up to a maximum depth of 10) looking for media reference strings
* in the format "@@@langfuseMedia:...@@@". When found, it fetches the actual media content using the provided
* Langfuse client and replaces the reference string with a base64 data URI.
*
* If fetching media content fails for a reference string, a warning is logged and the reference string is left unchanged.
*
* @param params - Configuration object
* @param params.obj - The object to process. Can be a primitive value, array, or nested object
* @param params.langfuseClient - Langfuse client instance used to fetch media content
* @param params.resolveWith - Optional. Default is "base64DataUri". The type of data to replace the media reference string with. Currently only "base64DataUri" is supported.
*
* @returns A deep copy of the input object with all media references replaced with base64 data URIs where possible
*
* @example
* ```typescript
* const obj = {
* image: "@@@langfuseMedia:type=image/jpeg|id=123|source=bytes@@@",
* nested: {
* pdf: "@@@langfuseMedia:type=application/pdf|id=456|source=bytes@@@"
* }
* };
*
* const result = await LangfuseMedia.resolveMediaReferences({
* obj,
* langfuseClient
* });
*
* // Result:
* // {
* // image: "...",
* // nested: {
* // pdf: "data:application/pdf;base64,JVBERi0xLjcK..."
* // }
* // }
* ```
*/
public static async resolveMediaReferences<T>(params: {
obj: T;
langfuseClient: LangfuseCore;
resolveWith?: "base64DataUri";
}): Promise<T> {
const { obj, langfuseClient } = params;
const MAX_DEPTH = 10;
hassiebp marked this conversation as resolved.
Show resolved Hide resolved

async function traverse<T>(obj: T, depth: number): Promise<T> {
if (depth > MAX_DEPTH) {
return obj;
}

// Handle string with potential media references
if (typeof obj === "string") {
const regex = /@@@langfuseMedia:.+@@@/g;
hassiebp marked this conversation as resolved.
Show resolved Hide resolved
const referenceStringMatches = obj.match(regex);
if (!referenceStringMatches) {
return obj;
}

let result = obj;
const referenceStringToMediaContentMap = new Map<string, string>();

await Promise.all(
referenceStringMatches.map(async (referenceString) => {
try {
const parsedMediaReference = LangfuseMedia.parseReferenceString(referenceString);
const mediaData = await langfuseClient.getMediaById(parsedMediaReference.mediaId);
const mediaContent = await langfuseClient.fetch(mediaData.url, { method: "GET", headers: {} });
hassiebp marked this conversation as resolved.
Show resolved Hide resolved
hassiebp marked this conversation as resolved.
Show resolved Hide resolved
const base64MediaContent = Buffer.from(await mediaContent.arrayBuffer()).toString("base64");
hassiebp marked this conversation as resolved.
Show resolved Hide resolved
hassiebp marked this conversation as resolved.
Show resolved Hide resolved
const base64DataUri = `data:${mediaData.contentType};base64,${base64MediaContent}`;

referenceStringToMediaContentMap.set(referenceString, base64DataUri);
} catch (error) {
console.warn("Error fetching media content for reference string", referenceString, error);
// Do not replace the reference string if there's an error
}
})
);

for (const [referenceString, base64MediaContent] of referenceStringToMediaContentMap.entries()) {
result = result.replace(referenceString, base64MediaContent) as T & string;
hassiebp marked this conversation as resolved.
Show resolved Hide resolved
}

return result;
}

// Handle arrays
if (Array.isArray(obj)) {
return Promise.all(obj.map(async (item) => await traverse(item, depth + 1))) as Promise<T>;
}

// Handle objects
if (typeof obj === "object" && obj !== null) {
return Object.fromEntries(
await Promise.all(Object.entries(obj).map(async ([key, value]) => [key, await traverse(value, depth + 1)]))
);
}

return obj;
}

return traverse(obj, 0);
}
}

export { LangfuseMedia, type MediaContentType, type ParsedMediaReference };
4 changes: 4 additions & 0 deletions langfuse-core/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ export type LangfuseFetchResponse<T = any> = {
status: number;
text: () => Promise<string>;
json: () => Promise<T>;
arrayBuffer: () => Promise<ArrayBuffer>;
};

export type LangfuseObject = SingleIngestionEvent["type"];
Expand Down Expand Up @@ -177,10 +178,13 @@ export type ChatMessage = FixTypes<components["schemas"]["ChatMessage"]>;
export type ChatPrompt = FixTypes<components["schemas"]["ChatPrompt"]> & { type: "chat" };
export type TextPrompt = FixTypes<components["schemas"]["TextPrompt"]> & { type: "text" };

// Media
export type GetMediaUploadUrlRequest = FixTypes<components["schemas"]["GetMediaUploadUrlRequest"]>;
export type GetMediaUploadUrlResponse = FixTypes<components["schemas"]["GetMediaUploadUrlResponse"]>;
export type MediaContentType = components["schemas"]["MediaContentType"];
export type PatchMediaBody = FixTypes<components["schemas"]["PatchMediaBody"]>;
export type GetMediaResponse = FixTypes<components["schemas"]["GetMediaResponse"]>;

type CreateTextPromptRequest = FixTypes<components["schemas"]["CreateTextPromptRequest"]>;
type CreateChatPromptRequest = FixTypes<components["schemas"]["CreateChatPromptRequest"]>;
export type CreateTextPromptBody = { type?: "text" } & Omit<CreateTextPromptRequest, "type"> & { isActive?: boolean }; // isActive is optional for backward compatibility
Expand Down
1 change: 1 addition & 0 deletions langfuse-core/test/test-utils/LangfuseCoreTestClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ export const createTestClient = (
status: 200,
text: () => Promise.resolve("ok"),
json: () => Promise.resolve({ status: "ok" }),
arrayBuffer: () => Promise.resolve(new Uint8Array()),
})
);

Expand Down
1 change: 1 addition & 0 deletions langfuse/src/langfuse.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ export {
type LangfuseSpanClient,
type LangfuseEventClient,
type LangfuseGenerationClient,
LangfuseMedia,
} from "langfuse-core";

export class Langfuse extends LangfuseCore {
Expand Down
Loading