Skip to content

Commit

Permalink
chore: remove enabling local export mode via env (#459)
Browse files Browse the repository at this point in the history
  • Loading branch information
hassiebp authored Nov 20, 2024
1 parent 0a0953d commit 930f663
Show file tree
Hide file tree
Showing 4 changed files with 138 additions and 148 deletions.
219 changes: 107 additions & 112 deletions integration-test/langfuse-integration-langchain.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -694,130 +694,125 @@ describe("Langchain", () => {
});

it("should export events in admin mode", async () => {
process.env.LANGFUSE_JS_SDK_LOCAL_EVENT_EXPORT_ENABLED = "true";

try {
const projectId = "test-project-id";
const handler = new CallbackHandler({
sessionId: "test-session",
userId: "test-user",
metadata: {
foo: "bar",
array: ["a", "b"],
},
tags: ["test-tag", "test-tag-2"],
_projectId: projectId,
version: "1.0.0",
});
const projectId = "test-project-id";
const handler = new CallbackHandler({
sessionId: "test-session",
userId: "test-user",
metadata: {
foo: "bar",
array: ["a", "b"],
},
tags: ["test-tag", "test-tag-2"],
_projectId: projectId,
_isLocalEventExportEnabled: true,
version: "1.0.0",
});

handler.debug(true);
handler.debug(true);

const messages = [new SystemMessage("You are an excellent Comedian"), new HumanMessage("Tell me a joke")];
const messages = [new SystemMessage("You are an excellent Comedian"), new HumanMessage("Tell me a joke")];

const llm = new ChatOpenAI({ modelName: "gpt-4-turbo-preview" });
await llm.invoke(messages, { callbacks: [handler] });
const llm = new ChatOpenAI({ modelName: "gpt-4-turbo-preview" });
await llm.invoke(messages, { callbacks: [handler] });

await handler.flushAsync();
const shutdownResult = await handler.langfuse._shutdownAdmin(projectId);
if (!shutdownResult) {
throw new Error("No shutdown result");
}
await handler.flushAsync();
const shutdownResult = await handler.langfuse._exportLocalEvents(projectId);
if (!shutdownResult) {
throw new Error("No shutdown result");
}

console.log(JSON.stringify(shutdownResult, null, 2));
console.log(JSON.stringify(shutdownResult, null, 2));

const events = shutdownResult;
const events = shutdownResult;

expect(events.length).toBe(4);
const [traceCreate, generationCreate, generationUpdate, traceUpdate] = events;
expect(events.length).toBe(4);
const [traceCreate, generationCreate, generationUpdate, traceUpdate] = events;

// Check trace create event
expect(traceCreate.type).toBe("trace-create");
expect(traceCreate.body).toMatchObject({
name: "ChatOpenAI",
metadata: {
ls_provider: "openai",
ls_model_name: "gpt-4-turbo-preview",
ls_model_type: "chat",
ls_temperature: 1,
foo: "bar",
array: ["a", "b"],
},
userId: "test-user",
version: "1.0.0",
sessionId: "test-session",
input: [
{
content: "You are an excellent Comedian",
role: "system",
},
{
content: "Tell me a joke",
role: "user",
},
],
tags: ["test-tag", "test-tag-2"],
});

// Check generation create event
expect(generationCreate.type).toBe("generation-create");
expect(generationCreate.body).toMatchObject({
name: "ChatOpenAI",
metadata: {
ls_provider: "openai",
ls_model_name: "gpt-4-turbo-preview",
ls_model_type: "chat",
ls_temperature: 1,
},
input: [
{
content: "You are an excellent Comedian",
role: "system",
},
{
content: "Tell me a joke",
role: "user",
},
],
model: "gpt-4-turbo-preview",
modelParameters: {
temperature: 1,
top_p: 1,
frequency_penalty: 0,
presence_penalty: 0,
// Check trace create event
expect(traceCreate.type).toBe("trace-create");
expect(traceCreate.body).toMatchObject({
name: "ChatOpenAI",
metadata: {
ls_provider: "openai",
ls_model_name: "gpt-4-turbo-preview",
ls_model_type: "chat",
ls_temperature: 1,
foo: "bar",
array: ["a", "b"],
},
userId: "test-user",
version: "1.0.0",
sessionId: "test-session",
input: [
{
content: "You are an excellent Comedian",
role: "system",
},
version: "1.0.0",
});

// Check generation update event
expect(generationUpdate.type).toBe("generation-update");
expect(generationUpdate.body).toMatchObject({
output: {
role: "assistant",
{
content: "Tell me a joke",
role: "user",
},
usage: {
completionTokens: expect.any(Number),
promptTokens: expect.any(Number),
totalTokens: expect.any(Number),
],
tags: ["test-tag", "test-tag-2"],
});

// Check generation create event
expect(generationCreate.type).toBe("generation-create");
expect(generationCreate.body).toMatchObject({
name: "ChatOpenAI",
metadata: {
ls_provider: "openai",
ls_model_name: "gpt-4-turbo-preview",
ls_model_type: "chat",
ls_temperature: 1,
},
input: [
{
content: "You are an excellent Comedian",
role: "system",
},
version: "1.0.0",
});

// Check trace update event
expect(traceUpdate.type).toBe("trace-create");
expect(traceUpdate.body).toMatchObject({
output: {
role: "assistant",
{
content: "Tell me a joke",
role: "user",
},
});

// Check IDs match between events
const traceId = (traceCreate.body as any).id;
expect((generationCreate.body as any).traceId).toBe(traceId);
expect((generationUpdate.body as any).traceId).toBe(traceId);
expect((traceUpdate.body as any).id).toBe(traceId);
} finally {
process.env.LANGFUSE_JS_SDK_LOCAL_EVENT_EXPORT_ENABLED = undefined;
}
],
model: "gpt-4-turbo-preview",
modelParameters: {
temperature: 1,
top_p: 1,
frequency_penalty: 0,
presence_penalty: 0,
},
version: "1.0.0",
});

// Check generation update event
expect(generationUpdate.type).toBe("generation-update");
expect(generationUpdate.body).toMatchObject({
output: {
role: "assistant",
},
usage: {
completionTokens: expect.any(Number),
promptTokens: expect.any(Number),
totalTokens: expect.any(Number),
},
version: "1.0.0",
});

// Check trace update event
expect(traceUpdate.type).toBe("trace-create");
expect(traceUpdate.body).toMatchObject({
output: {
role: "assistant",
},
});

// Check IDs match between events
const traceId = (traceCreate.body as any).id;
expect((generationCreate.body as any).traceId).toBe(traceId);
expect((generationUpdate.body as any).traceId).toBe(traceId);
expect((traceUpdate.body as any).id).toBe(traceId);
});
});
});
Expand Down
33 changes: 15 additions & 18 deletions langfuse-core/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ abstract class LangfuseCoreStateless {
private sdkIntegration: string;
private enabled: boolean;
protected isLocalEventExportEnabled: boolean;
private adminIngestionEvents: Map<string, SingleIngestionEvent[]> = new Map();
private localEventExportMap: Map<string, SingleIngestionEvent[]> = new Map();
private projectId: string | undefined;
private mask: MaskFunction | undefined;

Expand All @@ -201,7 +201,7 @@ abstract class LangfuseCoreStateless {
abstract setPersistedProperty<T>(key: LangfusePersistedProperty, value: T | null): void;

constructor(params: LangfuseCoreOptions) {
const { publicKey, secretKey, enabled, _projectId, ...options } = params;
const { publicKey, secretKey, enabled, _projectId, _isLocalEventExportEnabled, ...options } = params;

this.enabled = enabled === false ? false : true;
this.publicKey = publicKey ?? "";
Expand All @@ -221,19 +221,19 @@ abstract class LangfuseCoreStateless {

this.sdkIntegration = options?.sdkIntegration ?? "DEFAULT";

this.isLocalEventExportEnabled = getEnv("LANGFUSE_JS_SDK_LOCAL_EVENT_EXPORT_ENABLED") === "true";
this.isLocalEventExportEnabled = _isLocalEventExportEnabled ?? false;

if (this.isLocalEventExportEnabled && !_projectId) {
this._events.emit(
"error",
"LANGFUSE_JS_SDK_LOCAL_EVENT_EXPORT_ENABLED is true, but no project ID was provided. Local export mode will not be enabled."
"Local event export is enabled, but no project ID was provided. Disabling local export."
);
this.isLocalEventExportEnabled = false;
return;
} else if (!this.isLocalEventExportEnabled && _projectId) {
this._events.emit(
"error",
"LANGFUSE_JS_SDK_LOCAL_EVENT_EXPORT_ENABLED is false, but a project ID was provided. Local export mode will not be enabled."
"Local event export is disabled, but a project ID was provided. Disabling local export."
);
this.isLocalEventExportEnabled = false;
return;
Expand Down Expand Up @@ -963,12 +963,12 @@ abstract class LangfuseCoreStateless {
this._events.emit("flush", items);
};

// If admin mode is enabled, we don't send the events to the server, but instead store them in the adminIngestionEvents array
// If local event export is enabled, we don't send the events to the server, but instead store them in the localEventExportMap
if (this.isLocalEventExportEnabled && this.projectId) {
if (!this.adminIngestionEvents.has(this.projectId)) {
this.adminIngestionEvents.set(this.projectId, [...items]);
if (!this.localEventExportMap.has(this.projectId)) {
this.localEventExportMap.set(this.projectId, [...items]);
} else {
this.adminIngestionEvents.get(this.projectId)?.push(...items);
this.localEventExportMap.get(this.projectId)?.push(...items);
}

done();
Expand Down Expand Up @@ -1158,20 +1158,17 @@ abstract class LangfuseCoreStateless {
}
}

async _shutdownAdmin(projectId: string): Promise<SingleIngestionEvent[]> {
async _exportLocalEvents(projectId: string): Promise<SingleIngestionEvent[]> {
if (this.isLocalEventExportEnabled) {
clearTimeout(this._flushTimer);
await this.flushAsync();

const events = this.adminIngestionEvents.get(projectId) ?? [];
this.adminIngestionEvents.delete(projectId);
const events = this.localEventExportMap.get(projectId) ?? [];
this.localEventExportMap.delete(projectId);

return events;
} else {
this._events.emit(
"error",
"LANGFUSE_JS_SDK_LOCAL_EVENT_EXPORT_ENABLED is false, but _shutdownAdmin() was called."
);
this._events.emit("error", "Local event exports are disabled, but _exportLocalEvents() was called.");
return [];
}
}
Expand Down Expand Up @@ -1222,10 +1219,10 @@ export abstract class LangfuseCore extends LangfuseCoreStateless {
private _promptCache: LangfusePromptCache;

constructor(params: LangfuseCoreOptions) {
const { publicKey, secretKey, enabled } = params;
const { publicKey, secretKey, enabled, _isLocalEventExportEnabled } = params;
let isObservabilityEnabled = enabled === false ? false : true;

if (getEnv("LANGFUSE_JS_SDK_LOCAL_EVENT_EXPORT_ENABLED") === "true") {
if (_isLocalEventExportEnabled) {
isObservabilityEnabled = true;
} else if (!isObservabilityEnabled) {
console.warn("Langfuse is disabled. No observability data will be sent to Langfuse.");
Expand Down
2 changes: 2 additions & 0 deletions langfuse-core/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ export type LangfuseCoreOptions = {
mask?: MaskFunction;
// Project ID to use for the SDK in admin mode. This should never be set by users.
_projectId?: string;
// Whether to enable local event export. Defaults to false.
_isLocalEventExportEnabled?: boolean;
};

export enum LangfusePersistedProperty {
Expand Down
32 changes: 14 additions & 18 deletions langfuse-core/test/langfuse.flush.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -241,28 +241,24 @@ describe("Langfuse Core", () => {
});

it("should not send events in admin mode", async () => {
process.env.LANGFUSE_JS_SDK_LOCAL_EVENT_EXPORT_ENABLED = "true";
try {
[langfuse, mocks] = createTestClient({
publicKey: "pk-lf-111",
secretKey: "sk-lf-111",
_projectId: "test-project-id",
flushAt: 5,
flushInterval: 200,
});
[langfuse, mocks] = createTestClient({
publicKey: "pk-lf-111",
secretKey: "sk-lf-111",
_projectId: "test-project-id",
_isLocalEventExportEnabled: true,
flushAt: 5,
flushInterval: 200,
});

// Create multiple traces
const traces = ["test-trace-1", "test-trace-2", "test-trace-3"];
traces.forEach((name) => langfuse.trace({ name }));
// Create multiple traces
const traces = ["test-trace-1", "test-trace-2", "test-trace-3"];
traces.forEach((name) => langfuse.trace({ name }));

expect(mocks.fetch).not.toHaveBeenCalled();
expect(mocks.fetch).not.toHaveBeenCalled();

await jest.runAllTimersAsync();
await jest.runAllTimersAsync();

expect(mocks.fetch).not.toHaveBeenCalled();
} finally {
process.env.LANGFUSE_JS_SDK_LOCAL_EVENT_EXPORT_ENABLED = undefined;
}
expect(mocks.fetch).not.toHaveBeenCalled();
});
});
});

0 comments on commit 930f663

Please sign in to comment.