Skip to content

Commit

Permalink
feat(core): allow masking event input and output
Browse files Browse the repository at this point in the history
  • Loading branch information
hassiebp committed Oct 25, 2024
1 parent dab2b42 commit b0bd7ff
Show file tree
Hide file tree
Showing 3 changed files with 70 additions and 0 deletions.
43 changes: 43 additions & 0 deletions integration-test/langfuse-integration-fetch.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import Langfuse from "../langfuse";

import axios from "axios";
import { getHeaders, LANGFUSE_BASEURL } from "./integration-utils";
import { randomUUID } from "crypto";

describe("Langfuse (fetch)", () => {
let langfuse: Langfuse;
Expand Down Expand Up @@ -597,5 +598,47 @@ describe("Langfuse (fetch)", () => {
type: "EVENT",
});
});

it("should mask data in the event body", async () => {
const mask = ({ data }: { data: any }): string =>
typeof data === "string" && data.includes("confidential") ? "MASKED" : data;

const langfuse = new Langfuse({ mask });
const traceId = randomUUID();

const trace = langfuse.trace({ id: traceId, input: "confidential data" });
trace.update({ output: "confidential data" });

const spanId = randomUUID();
const span = trace.span({ id: spanId, input: "confidential data" });
span.update({ output: "confidential data" });

const generationId = randomUUID();
const generation = trace.generation({ id: generationId, input: "confidential data" });
generation.update({ output: "confidential data" });

await langfuse.flushAsync();

const fetchedTrace = await await axios.get(`${LANGFUSE_BASEURL}/api/public/traces/${traceId}`, {
headers: getHeaders(),
});

expect(fetchedTrace.data.input).toEqual("MASKED");
expect(fetchedTrace.data.output).toEqual("MASKED");

const fetchedSpan = await await axios.get(`${LANGFUSE_BASEURL}/api/public/observations/${spanId}`, {
headers: getHeaders(),
});

expect(fetchedSpan.data.input).toEqual("MASKED");
expect(fetchedSpan.data.output).toEqual("MASKED");

const fetchedGeneration = await await axios.get(`${LANGFUSE_BASEURL}/api/public/observations/${generationId}`, {
headers: getHeaders(),
});

expect(fetchedGeneration.data.input).toEqual("MASKED");
expect(fetchedGeneration.data.output).toEqual("MASKED");
});
});
});
23 changes: 23 additions & 0 deletions langfuse-core/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ import {
type GetLangfuseSessionsResponse,
type EventBody,
type DatasetItem,
type MaskFunction,
} from "./types";
import {
generateUUID,
Expand Down Expand Up @@ -175,6 +176,7 @@ abstract class LangfuseCoreStateless {
private release: string | undefined;
private sdkIntegration: string;
private enabled: boolean;
private mask: MaskFunction | undefined;

// internal
protected _events = new SimpleEventEmitter();
Expand All @@ -200,6 +202,7 @@ abstract class LangfuseCoreStateless {
this.flushAt = options?.flushAt ? Math.max(options?.flushAt, 1) : 15;
this.flushInterval = options?.flushInterval ?? 10000;
this.release = options?.release ?? getEnv("LANGFUSE_RELEASE") ?? getCommonReleaseEnvs() ?? undefined;
this.mask = options?.mask;

this._retryOptions = {
retryCount: options?.fetchRetryCount ?? 3,
Expand Down Expand Up @@ -566,6 +569,7 @@ abstract class LangfuseCoreStateless {
return;
}

this.maskEventBodyInPlace(body);
const finalEventBody = this.truncateEventBody(body, MAX_EVENT_SIZE);

try {
Expand Down Expand Up @@ -603,6 +607,25 @@ abstract class LangfuseCoreStateless {
}
}

private maskEventBodyInPlace(body: EventBody): void {
if (!this.mask) {
return;
}

const maskableKeys = ["input", "output"] as const;

for (const key of maskableKeys) {
if (key in body) {
try {
body[key as keyof EventBody] = this.mask({ data: body[key as keyof EventBody] });
} catch (e) {
this._events.emit("error", `Error masking ${key}: ${e}`);
body[key as keyof EventBody] = "<fully masked due to failed mask function>";
}
}
}
}

/**
* Truncates the event body if its byte size exceeds the specified maximum byte size.
* Emits a warning event if truncation occurs.
Expand Down
4 changes: 4 additions & 0 deletions langfuse-core/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ export type LangfuseCoreOptions = {
sdkIntegration?: string; // DEFAULT, LANGCHAIN, or any other custom value
// Enabled switch for the SDK. If disabled, no observability data will be sent to Langfuse. Defaults to true.
enabled?: boolean;
// Mask function to mask data in the event body
mask?: MaskFunction;
};

export enum LangfusePersistedProperty {
Expand Down Expand Up @@ -230,3 +232,5 @@ export type LinkDatasetItem = (
}
) => Promise<{ id: string }>;
export type DatasetItem = DatasetItemData & { link: LinkDatasetItem };

export type MaskFunction = (params: { data: any }) => any;

0 comments on commit b0bd7ff

Please sign in to comment.