Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(media): add utility to resolve media references #479

Merged
merged 15 commits into from
Dec 11, 2024
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ jobs:
rm -rf .env

echo "::group::Run server"
TELEMETRY_ENABLED=false LANGFUSE_SDK_CI_SYNC_PROCESSING_ENABLED=true LANGFUSE_READ_FROM_POSTGRES_ONLY=true LANGFUSE_READ_FROM_CLICKHOUSE_ONLY=false LANGFUSE_RETURN_FROM_CLICKHOUSE=false docker compose up -d
TELEMETRY_ENABLED=false LANGFUSE_S3_EVENT_UPLOAD_ENDPOINT=http://host.docker.internal:9090 LANGFUSE_SDK_CI_SYNC_PROCESSING_ENABLED=true LANGFUSE_READ_FROM_POSTGRES_ONLY=true LANGFUSE_READ_FROM_CLICKHOUSE_ONLY=false LANGFUSE_RETURN_FROM_CLICKHOUSE=false docker compose up -d
echo "::endgroup::"

# Add this step to check the health of the container
Expand Down
73 changes: 69 additions & 4 deletions integration-test/langfuse-integration-fetch.spec.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
// uses the compiled fetch version, run yarn build after making changes to the SDKs
import Langfuse from "../langfuse";

import axios from "axios";
import { getHeaders, LANGFUSE_BASEURL } from "./integration-utils";
import { randomUUID } from "crypto";
import fs from "fs";

// uses the compiled fetch version, run yarn build after making changes to the SDKs
import Langfuse, { LangfuseMedia } from "../langfuse";
import { getHeaders, LANGFUSE_BASEURL } from "./integration-utils";

describe("Langfuse (fetch)", () => {
let langfuse: Langfuse;
Expand Down Expand Up @@ -641,4 +642,68 @@ describe("Langfuse (fetch)", () => {
expect(fetchedGeneration.data.output).toEqual("MASKED");
});
});

// TODO: enable this test once networking issue is fixed
it.skip("replace media reference string in object", async () => {
const langfuse = new Langfuse();
const mockTraceName = "test-trace-with-audio" + Math.random().toString(36);
const mockAudioBytes = fs.readFileSync("./static/joke_prompt.wav"); // Simple mock audio bytes

const trace = langfuse.trace({
name: mockTraceName,
metadata: {
context: {
nested: new LangfuseMedia({
base64DataUri: `data:audio/wav;base64,${Buffer.from(mockAudioBytes).toString("base64")}`,
}),
},
},
});

await langfuse.flushAsync();
const res = await langfuse.fetchTrace(trace.id);

expect(res.data).toMatchObject({
id: trace.id,
name: mockTraceName,
metadata: {
context: {
nested: expect.stringMatching(/^@@@langfuseMedia:type=audio\/wav\|id=.+\|source=base64_data_uri@@@$/),
},
},
});

const mediaReplacedTrace = await langfuse.resolveMediaReferences({
resolveWith: "base64DataUri",
obj: res.data,
});

// Check that the replaced base64 data is the same as the original
expect(mediaReplacedTrace.metadata.context.nested).toEqual(
"data:audio/wav;base64," + Buffer.from(mockAudioBytes).toString("base64")
);

// Double check: reference strings must be the same if data URI is reused
const trace2 = langfuse.trace({
name: "2-" + mockTraceName,
metadata: {
context: {
nested: mediaReplacedTrace.metadata.context.nested,
},
},
});

await langfuse.flushAsync();

const res2 = await axios.get(`${LANGFUSE_BASEURL}/api/public/traces/${trace2.id}`, { headers: getHeaders() });
expect(res2.data).toMatchObject({
id: trace2.id,
name: "2-" + mockTraceName,
metadata: {
context: {
nested: res.data.metadata.context.nested,
},
},
});
}, 20_000);
});
2 changes: 1 addition & 1 deletion integration-test/langfuse-integration-vercel.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ describe("langfuse-integration-vercel", () => {

beforeEach(() => {
sdk = new NodeSDK({
traceExporter: new LangfuseExporter({ debug: true }),
traceExporter: new LangfuseExporter({ debug: false }),
instrumentations: [getNodeAutoInstrumentations()],
});

Expand Down
2 changes: 1 addition & 1 deletion integration-test/langfuse-openai.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1038,7 +1038,7 @@ describe("Langfuse-OpenAI-Integation", () => {
expect(generation.output).toMatchObject(completion.choices[0].message);
expect(generation.metadata).toMatchObject({ someKey: "someValue", response_format });
expect(generation.model).toBe("gpt-4o-2024-08-06");
}, 10000);
}, 15_000);

it("should work with structured output parsing with beta API", async () => {
const traceId = randomUUID();
Expand Down
59 changes: 58 additions & 1 deletion langfuse-core/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,9 @@ import {
type SingleIngestionEvent,
type UpdateLangfuseGenerationBody,
type UpdateLangfuseSpanBody,
type GetMediaResponse,
} from "./types";
import { LangfuseMedia } from "./media/LangfuseMedia";
import { LangfuseMedia, type LangfuseMediaResolveMediaReferencesParams } from "./media/LangfuseMedia";
import {
currentISOTime,
encodeQueryParams,
Expand All @@ -69,6 +70,7 @@ import {
} from "./utils";

export * from "./prompts/promptClients";
export * from "./media/LangfuseMedia";
export { LangfuseMemoryStorage } from "./storage-memory";
export type { LangfusePromptRecord } from "./types";
export * as utils from "./utils";
Expand Down Expand Up @@ -393,6 +395,10 @@ abstract class LangfuseCoreStateless {
);
}

protected async _fetchMedia(id: string): Promise<GetMediaResponse> {
return this.fetchAndLogErrors(`${this.baseUrl}/api/public/media/${id}`, this._getFetchOptions({ method: "GET" }));
}

async fetchTraces(query?: GetLangfuseTracesQuery): Promise<GetLangfuseTracesResponse> {
// destructure the response into data and meta to be explicit about the shape of the response and add type-warnings in case the API changes
const { data, meta } = await this.fetchAndLogErrors<GetLangfuseTracesResponse>(
Expand Down Expand Up @@ -1534,6 +1540,57 @@ export abstract class LangfuseCore extends LangfuseCoreStateless {
}
}

public async fetchMedia(id: string): Promise<GetMediaResponse> {
return await this._fetchMedia(id);
}

/**
* Replaces the media reference strings in an object with base64 data URIs for the media content.
*
* This method recursively traverses an object (up to a maximum depth of 10) looking for media reference strings
* in the format "@@@langfuseMedia:...@@@". When found, it fetches the actual media content using the provided
* Langfuse client and replaces the reference string with a base64 data URI.
*
* If fetching media content fails for a reference string, a warning is logged and the reference string is left unchanged.
*
* @param params - Configuration object
* @param params.obj - The object to process. Can be a primitive value, array, or nested object
* @param params.langfuseClient - Langfuse client instance used to fetch media content
* @param params.resolveWith - The representation of the media content to replace the media reference string with. Currently only "base64DataUri" is supported.
* @param params.maxDepth - Optional. Default is 10. The maximum depth to traverse the object.
*
* @returns A deep copy of the input object with all media references replaced with base64 data URIs where possible
*
* @example
* ```typescript
* const obj = {
* image: "@@@langfuseMedia:type=image/jpeg|id=123|source=bytes@@@",
* nested: {
* pdf: "@@@langfuseMedia:type=application/pdf|id=456|source=bytes@@@"
* }
* };
*
* const result = await LangfuseMedia.resolveMediaReferences({
* obj,
* langfuseClient
* });
*
* // Result:
* // {
* // image: "...",
* // nested: {
* // pdf: "data:application/pdf;base64,JVBERi0xLjcK..."
* // }
* // }
* ```
*/
public async resolveMediaReferences<T>(
params: Omit<LangfuseMediaResolveMediaReferencesParams<T>, "langfuseClient">
): Promise<T> {
const { obj, ...rest } = params;

return LangfuseMedia.resolveMediaReferences<T>({ ...rest, langfuseClient: this, obj });
}
hassiebp marked this conversation as resolved.
Show resolved Hide resolved
_updateSpan(body: UpdateLangfuseSpanBody): this {
this.updateSpanStateless(body);
return this;
Expand Down
114 changes: 114 additions & 0 deletions langfuse-core/src/media/LangfuseMedia.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import { type LangfuseCore } from "../index";

let fs: any = null;
let cryptoModule: any = null;

Expand Down Expand Up @@ -28,6 +30,13 @@ interface ParsedMediaReference {
contentType: MediaContentType;
}

export type LangfuseMediaResolveMediaReferencesParams<T> = {
obj: T;
langfuseClient: LangfuseCore;
resolveWith: "base64DataUri";
maxDepth?: number;
};

/**
* A class for wrapping media objects for upload to Langfuse.
*
Expand Down Expand Up @@ -194,6 +203,111 @@ class LangfuseMedia {
contentType: parsedData["type"] as MediaContentType,
};
}

/**
* Replaces the media reference strings in an object with base64 data URIs for the media content.
*
* This method recursively traverses an object (up to a maximum depth of 10) looking for media reference strings
* in the format "@@@langfuseMedia:...@@@". When found, it fetches the actual media content using the provided
* Langfuse client and replaces the reference string with a base64 data URI.
*
* If fetching media content fails for a reference string, a warning is logged and the reference string is left unchanged.
*
* @param params - Configuration object
* @param params.obj - The object to process. Can be a primitive value, array, or nested object
* @param params.langfuseClient - Langfuse client instance used to fetch media content
* @param params.resolveWith - The representation of the media content to replace the media reference string with. Currently only "base64DataUri" is supported.
* @param params.maxDepth - Optional. Default is 10. The maximum depth to traverse the object.
*
* @returns A deep copy of the input object with all media references replaced with base64 data URIs where possible
*
* @example
* ```typescript
* const obj = {
* image: "@@@langfuseMedia:type=image/jpeg|id=123|source=bytes@@@",
* nested: {
* pdf: "@@@langfuseMedia:type=application/pdf|id=456|source=bytes@@@"
* }
* };
*
* const result = await LangfuseMedia.resolveMediaReferences({
* obj,
* langfuseClient
* });
*
* // Result:
* // {
* // image: "...",
* // nested: {
* // pdf: "data:application/pdf;base64,JVBERi0xLjcK..."
* // }
* // }
* ```
*/
public static async resolveMediaReferences<T>(params: LangfuseMediaResolveMediaReferencesParams<T>): Promise<T> {
const { obj, langfuseClient, maxDepth = 10 } = params;

async function traverse<T>(obj: T, depth: number): Promise<T> {
if (depth > maxDepth) {
return obj;
}

// Handle string with potential media references
if (typeof obj === "string") {
const regex = /@@@langfuseMedia:.+?@@@/g;
const referenceStringMatches = obj.match(regex);
if (!referenceStringMatches) {
return obj;
}

let result = obj;
const referenceStringToMediaContentMap = new Map<string, string>();

await Promise.all(
referenceStringMatches.map(async (referenceString) => {
try {
const parsedMediaReference = LangfuseMedia.parseReferenceString(referenceString);
const mediaData = await langfuseClient.fetchMedia(parsedMediaReference.mediaId);
const mediaContent = await langfuseClient.fetch(mediaData.url, { method: "GET", headers: {} });
hassiebp marked this conversation as resolved.
Show resolved Hide resolved
hassiebp marked this conversation as resolved.
Show resolved Hide resolved
if (mediaContent.status !== 200) {
throw new Error("Failed to fetch media content");
}

const base64MediaContent = Buffer.from(await mediaContent.arrayBuffer()).toString("base64");
hassiebp marked this conversation as resolved.
Show resolved Hide resolved
hassiebp marked this conversation as resolved.
Show resolved Hide resolved
const base64DataUri = `data:${mediaData.contentType};base64,${base64MediaContent}`;

referenceStringToMediaContentMap.set(referenceString, base64DataUri);
} catch (error) {
console.warn("Error fetching media content for reference string", referenceString, error);
// Do not replace the reference string if there's an error
}
})
);

for (const [referenceString, base64MediaContent] of referenceStringToMediaContentMap.entries()) {
result = result.replaceAll(referenceString, base64MediaContent) as T & string;
}

return result;
}

// Handle arrays
if (Array.isArray(obj)) {
return Promise.all(obj.map(async (item) => await traverse(item, depth + 1))) as Promise<T>;
}

// Handle objects
if (typeof obj === "object" && obj !== null) {
return Object.fromEntries(
await Promise.all(Object.entries(obj).map(async ([key, value]) => [key, await traverse(value, depth + 1)]))
);
}

return obj;
}

return traverse(obj, 0);
}
}

export { LangfuseMedia, type MediaContentType, type ParsedMediaReference };
4 changes: 4 additions & 0 deletions langfuse-core/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ export type LangfuseFetchResponse<T = any> = {
status: number;
text: () => Promise<string>;
json: () => Promise<T>;
arrayBuffer: () => Promise<ArrayBuffer>;
};

export type LangfuseObject = SingleIngestionEvent["type"];
Expand Down Expand Up @@ -177,10 +178,13 @@ export type ChatMessage = FixTypes<components["schemas"]["ChatMessage"]>;
export type ChatPrompt = FixTypes<components["schemas"]["ChatPrompt"]> & { type: "chat" };
export type TextPrompt = FixTypes<components["schemas"]["TextPrompt"]> & { type: "text" };

// Media
export type GetMediaUploadUrlRequest = FixTypes<components["schemas"]["GetMediaUploadUrlRequest"]>;
export type GetMediaUploadUrlResponse = FixTypes<components["schemas"]["GetMediaUploadUrlResponse"]>;
export type MediaContentType = components["schemas"]["MediaContentType"];
export type PatchMediaBody = FixTypes<components["schemas"]["PatchMediaBody"]>;
export type GetMediaResponse = FixTypes<components["schemas"]["GetMediaResponse"]>;

type CreateTextPromptRequest = FixTypes<components["schemas"]["CreateTextPromptRequest"]>;
type CreateChatPromptRequest = FixTypes<components["schemas"]["CreateChatPromptRequest"]>;
export type CreateTextPromptBody = { type?: "text" } & Omit<CreateTextPromptRequest, "type"> & { isActive?: boolean }; // isActive is optional for backward compatibility
Expand Down
5 changes: 5 additions & 0 deletions langfuse-core/test/langfuse.flush.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ describe("Langfuse Core", () => {
status: 400,
text: async () => "err",
json: async () => ({ status: "err" }),
arrayBuffer: async () => new Uint8Array(),
});
});

Expand All @@ -59,6 +60,7 @@ describe("Langfuse Core", () => {
status: 207,
text: async () => "err",
json: async () => ({ successes: [], errors: [{ id: trace.id, message: "Something failed" }] }),
arrayBuffer: async () => new Uint8Array(),
});
});

Expand Down Expand Up @@ -90,13 +92,15 @@ describe("Langfuse Core", () => {
status: 207,
text: async () => "err",
json: async () => ({ successes: [], errors: [{ id: "someId", message: "Something failed" }] }),
arrayBuffer: async () => new Uint8Array(),
});
} else {
index++;
return Promise.resolve({
status: 200,
text: async () => "ok",
json: async () => ({ successes: [], errors: [] }),
arrayBuffer: async () => new Uint8Array(),
});
}
});
Expand Down Expand Up @@ -168,6 +172,7 @@ describe("Langfuse Core", () => {
status: 200,
text: async () => "ok",
json: async () => ({ status: "ok" }),
arrayBuffer: async () => new Uint8Array(),
});
}, 500); // add delay to simulate network request
});
Expand Down
Loading
Loading