diff --git a/integration-test/langfuse-integration-datasets.spec.ts b/integration-test/langfuse-integration-datasets.spec.ts index 83dd5191..3b6dac65 100644 --- a/integration-test/langfuse-integration-datasets.spec.ts +++ b/integration-test/langfuse-integration-datasets.spec.ts @@ -128,8 +128,8 @@ describe("Langfuse Node.js", () => { for (const item of dataset.items) { const generation = langfuse.generation({ id: "test-generation-id-" + projectNameRandom, - prompt: item.input, - completion: "Hello world generated", + input: item.input, + output: "Hello world generated", }); await item.link(generation, "test-run-" + projectNameRandom); generation.score({ diff --git a/integration-test/langfuse-integration-fetch.spec.ts b/integration-test/langfuse-integration-fetch.spec.ts index a1080d74..7efe751d 100644 --- a/integration-test/langfuse-integration-fetch.spec.ts +++ b/integration-test/langfuse-integration-fetch.spec.ts @@ -152,10 +152,10 @@ describe("Langfuse (fetch)", () => { const trace = langfuse.trace({ name: "trace-name-generation-new" }); const generation = trace.generation({ name: "generation-name-new", - prompt: { + input: { text: "prompt", }, - completion: { + output: { foo: "bar", }, }); @@ -180,12 +180,12 @@ describe("Langfuse (fetch)", () => { const trace = langfuse.trace({ name: "trace-name-generation-new" }); const generation = trace.generation({ name: "generation-name-new", - prompt: [ + input: [ { text: "prompt", }, ], - completion: [ + output: [ { foo: "bar", }, @@ -215,8 +215,8 @@ describe("Langfuse (fetch)", () => { const trace = langfuse.trace({ name: "trace-name-generation-new" }); const generation = trace.generation({ name: "generation-name-new", - prompt: "prompt", - completion: "completion", + input: "prompt", + output: "completion", }); await langfuse.flushAsync(); // check from get api if trace is created @@ -239,7 +239,7 @@ describe("Langfuse (fetch)", () => { completionStartTime: new Date("2020-01-01T00:00:00.000Z"), }); generation.end({ - completion: "Hello world", + output: "Hello world", usage: { promptTokens: 10, completionTokens: 15, diff --git a/integration-test/langfuse-integration-langchain.spec.ts b/integration-test/langfuse-integration-langchain.spec.ts index 822925d3..6cf733d5 100644 --- a/integration-test/langfuse-integration-langchain.spec.ts +++ b/integration-test/langfuse-integration-langchain.spec.ts @@ -44,9 +44,9 @@ describe("simple chains", () => { const generation = trace?.observations.filter((o) => o.type === "GENERATION"); expect(generation?.length).toBe(1); expect(generation?.[0].name).toBe("OpenAI"); - expect(generation?.[0].promptTokens).toBeDefined(); - expect(generation?.[0].completionTokens).toBeDefined(); - expect(generation?.[0].totalTokens).toBeDefined(); + expect(generation?.[0].usage?.input).toBeDefined(); + expect(generation?.[0].usage?.output).toBeDefined(); + expect(generation?.[0].usage?.total).toBeDefined(); }); it("should execute simple llm call twice on two different traces", async () => { @@ -138,9 +138,9 @@ describe("simple chains", () => { if (generation) { expect(generation[0].name).toBe(extractedModel()); - expect(generation[0].promptTokens).toBeDefined(); - expect(generation[0].completionTokens).toBeDefined(); - expect(generation[0].totalTokens).toBeDefined(); + expect(generation?.[0].usage?.input).toBeDefined(); + expect(generation?.[0].usage?.output).toBeDefined(); + expect(generation?.[0].usage?.total).toBeDefined(); } const spans = trace?.observations.filter((o) => o.type === "SPAN"); diff --git a/integration-test/langfuse-integration-langfuse.spec.ts b/integration-test/langfuse-integration-langfuse.spec.ts index 38a3cd1d..420ee8d7 100644 --- a/integration-test/langfuse-integration-langfuse.spec.ts +++ b/integration-test/langfuse-integration-langfuse.spec.ts @@ -16,22 +16,5 @@ describe("Langfuse Langchain", () => { }); expect(langfuse).toBeInstanceOf(Langfuse); }); - - // it("exports the callback handler", async () => { - // const callbackHandler = new CallbackHandler({ - // publicKey: LF_PUBLIC_KEY, - // secretKey: LF_SECRET_KEY, - // baseUrl: LF_HOST, - // }); - // expect(callbackHandler).toBeInstanceOf(CallbackHandler); - - // const llm = new OpenAI({ - // openAIApiKey: "sk-...", - // streaming: true, - // }); - - // const res = await llm.call("Tell me a joke", undefined, [callbackHandler]); - // console.log(res); - // }); }); }); diff --git a/integration-test/langfuse-integration-modules.spec.ts b/integration-test/langfuse-integration-modules.spec.ts index 37f29682..3bbcf2a4 100644 --- a/integration-test/langfuse-integration-modules.spec.ts +++ b/integration-test/langfuse-integration-modules.spec.ts @@ -9,7 +9,10 @@ describe("Test Different Module Conventions", () => { cwd: join(__dirname, "modules"), encoding: "utf-8", }); - + console.log(result.output); + console.log(result.stdout); + console.log(result.stderr); + expect(result.status).toBe(0); expect(result.stdout).toContain("Did construct objects and called them."); expect(result.status).toBe(0); } diff --git a/integration-test/langfuse-integration-node.spec.ts b/integration-test/langfuse-integration-node.spec.ts index 96168b3d..94ec0eca 100644 --- a/integration-test/langfuse-integration-node.spec.ts +++ b/integration-test/langfuse-integration-node.spec.ts @@ -133,10 +133,10 @@ describe("Langfuse Node.js", () => { const trace = langfuse.trace({ name: "trace-name-generation-new" }); const generation = trace.generation({ name: "generation-name-new", - prompt: { + input: { text: "prompt", }, - completion: { + output: { foo: "bar", }, }); @@ -161,12 +161,12 @@ describe("Langfuse Node.js", () => { const trace = langfuse.trace({ name: "trace-name-generation-new" }); const generation = trace.generation({ name: "generation-name-new", - prompt: [ + input: [ { text: "prompt", }, ], - completion: [ + output: [ { foo: "bar", }, @@ -196,8 +196,8 @@ describe("Langfuse Node.js", () => { const trace = langfuse.trace({ name: "trace-name-generation-new" }); const generation = trace.generation({ name: "generation-name-new", - prompt: "prompt", - completion: "completion", + input: "prompt", + output: "completion", }); await langfuse.flushAsync(); // check from get api if trace is created @@ -211,6 +211,58 @@ describe("Langfuse Node.js", () => { }); }); + it("create many objects", async () => { + const trace = langfuse.trace({ name: "trace-name-generation-new" }); + const generation = trace.generation({ + name: "generation-name-new", + input: "prompt", + output: "completion", + }); + generation.update({ + version: "1.0.0", + }); + const span = generation.span({ + name: "span-name", + input: "span-input", + output: "span-output", + }); + span.end({ metadata: { foo: "bar" } }); + generation.end({ metadata: { foo: "bar" } }); + + await langfuse.flushAsync(); + // check from get api if trace is created + const returnedGeneration = await axios.get(`${LF_HOST}/api/public/observations/${generation.id}`, { + headers: getHeaders, + }); + expect(returnedGeneration.data).toMatchObject({ + id: generation.id, + name: "generation-name-new", + type: "GENERATION", + input: "prompt", + output: "completion", + version: "1.0.0", + endTime: expect.any(String), + metadata: { + foo: "bar", + }, + }); + + const returnedSpan = await axios.get(`${LF_HOST}/api/public/observations/${span.id}`, { + headers: getHeaders, + }); + expect(returnedSpan.data).toMatchObject({ + id: span.id, + name: "span-name", + type: "SPAN", + input: "span-input", + output: "span-output", + endTime: expect.any(String), + metadata: { + foo: "bar", + }, + }); + }); + it("update a generation", async () => { const trace = langfuse.trace({ name: "test-trace", @@ -220,7 +272,7 @@ describe("Langfuse Node.js", () => { completionStartTime: new Date("2020-01-01T00:00:00.000Z"), }); generation.end({ - completion: "Hello world", + output: "Hello world", usage: { promptTokens: 10, completionTokens: 15, diff --git a/integration-test/modules/node/commonjs.cjs b/integration-test/modules/node/commonjs.cjs index c1773112..d3f6963c 100644 --- a/integration-test/modules/node/commonjs.cjs +++ b/integration-test/modules/node/commonjs.cjs @@ -11,9 +11,9 @@ async function run() { dotenv.config(); const secrets = { - baseUrl: String(process.env["LANGFUSE_URL"]), - publicKey: String(process.env["LANGFUSE_PK"]), - secretKey: String(process.env["LANGFUSE_SK"]), + baseUrl: String(process.env["LF_HOST"]), + publicKey: String(process.env["LF_PUBLIC_KEY"]), + secretKey: String(process.env["LF_SECRET_KEY"]), }; const langfuse = new Langfuse(secrets); diff --git a/integration-test/modules/node/esm.mjs b/integration-test/modules/node/esm.mjs index 6a628851..14872bda 100644 --- a/integration-test/modules/node/esm.mjs +++ b/integration-test/modules/node/esm.mjs @@ -7,9 +7,9 @@ export async function run() { dotenv.config(); const langfuse = new Langfuse({ - baseUrl: String(process.env["LANGFUSE_URL"]), - publicKey: String(process.env["LANGFUSE_PK"]), - secretKey: String(process.env["LANGFUSE_SK"]), + baseUrl: String(process.env["LF_HOST"]), + publicKey: String(process.env["LF_PUBLIC_KEY"]), + secretKey: String(process.env["LF_SECRET_KEY"]), }); const trace = langfuse.trace({ userId: "user-id" }); diff --git a/integration-test/modules/package.json b/integration-test/modules/package.json index 2580abcf..6df821bd 100644 --- a/integration-test/modules/package.json +++ b/integration-test/modules/package.json @@ -1,6 +1,6 @@ { "name": "langfuse-integration-test-modules", - "version": "2.0.0-alpha.0", + "version": "2.0.0-alpha.2", "private": true, "description": "", "main": "dist/main.js", @@ -15,7 +15,7 @@ "dependencies": { "dotenv": "^16.3.1", "langchain": "^0.0.163", - "langfuse-langchain": "^2.0.0-alpha.0" + "langfuse-langchain": "^2.0.0-alpha.2" }, "devDependencies": { "tsx": "^4.6.2", diff --git a/integration-test/modules/ts-cjs/commonjs.ts b/integration-test/modules/ts-cjs/commonjs.ts index 9a620eb9..081e09ce 100644 --- a/integration-test/modules/ts-cjs/commonjs.ts +++ b/integration-test/modules/ts-cjs/commonjs.ts @@ -14,9 +14,9 @@ export async function run(): Promise { dotenv.config(); const secrets = { - baseUrl: String(process.env["LANGFUSE_URL"]), - publicKey: String(process.env["LANGFUSE_PK"]), - secretKey: String(process.env["LANGFUSE_SK"]), + baseUrl: String(process.env["LF_HOST"]), + publicKey: String(process.env["LF_PUBLIC_KEY"]), + secretKey: String(process.env["LF_SECRET_KEY"]), }; const langfuse = new Langfuse(secrets); diff --git a/integration-test/modules/ts-nodenext/nodenext.mts b/integration-test/modules/ts-nodenext/nodenext.mts index 0753a3aa..77e7abb7 100644 --- a/integration-test/modules/ts-nodenext/nodenext.mts +++ b/integration-test/modules/ts-nodenext/nodenext.mts @@ -13,9 +13,9 @@ export async function run(): Promise { dotenv.config(); const langfuse = new Langfuse({ - baseUrl: String(process.env["LANGFUSE_URL"]), - publicKey: String(process.env["LANGFUSE_PK"]), - secretKey: String(process.env["LANGFUSE_SK"]), + baseUrl: String(process.env["LF_HOST"]), + publicKey: String(process.env["LF_PUBLIC_KEY"]), + secretKey: String(process.env["LF_SECRET_KEY"]), }); const trace = langfuse.trace({ userId: "user-id" }); diff --git a/langfuse-core/openapi-spec/openapi-client.yaml b/langfuse-core/openapi-spec/openapi-client.yaml index b9995153..0b75ec4d 100644 --- a/langfuse-core/openapi-spec/openapi-client.yaml +++ b/langfuse-core/openapi-spec/openapi-client.yaml @@ -5,7 +5,7 @@ info: paths: /api/public/scores: post: - description: Add a score to the database + description: Add a score to the database, upserts on id operationId: score_create tags: - Score @@ -59,8 +59,6 @@ components: type: string traceId: type: string - traceIdType: - $ref: "#/components/schemas/TraceIdType" name: type: string value: @@ -100,12 +98,6 @@ components: - name - value - timestamp - TraceIdType: - title: TraceIdType - type: string - enum: - - LANGFUSE - - EXTERNAL securitySchemes: BearerAuth: type: http diff --git a/langfuse-core/openapi-spec/openapi-server.yaml b/langfuse-core/openapi-spec/openapi-server.yaml index a9614dba..eb8bfc08 100644 --- a/langfuse-core/openapi-spec/openapi-server.yaml +++ b/langfuse-core/openapi-spec/openapi-server.yaml @@ -282,141 +282,6 @@ paths: application/json: schema: {} security: *ref_0 - /api/public/events: - post: - description: Add an event to the database - operationId: event_create - tags: - - Event - parameters: [] - responses: - "200": - description: "" - content: - application/json: - schema: - $ref: "#/components/schemas/Observation" - "400": - description: "" - content: - application/json: - schema: {} - "401": - description: "" - content: - application/json: - schema: {} - "403": - description: "" - content: - application/json: - schema: {} - "404": - description: "" - content: - application/json: - schema: {} - "405": - description: "" - content: - application/json: - schema: {} - security: *ref_0 - requestBody: - required: true - content: - application/json: - schema: - $ref: "#/components/schemas/CreateEventRequest" - /api/public/generations: - post: - operationId: generations_log - tags: - - Generations - parameters: [] - responses: - "200": - description: "" - content: - application/json: - schema: - $ref: "#/components/schemas/Observation" - "400": - description: "" - content: - application/json: - schema: {} - "401": - description: "" - content: - application/json: - schema: {} - "403": - description: "" - content: - application/json: - schema: {} - "404": - description: "" - content: - application/json: - schema: {} - "405": - description: "" - content: - application/json: - schema: {} - security: *ref_0 - requestBody: - required: true - content: - application/json: - schema: - $ref: "#/components/schemas/CreateGenerationRequest" - patch: - operationId: generations_update - tags: - - Generations - parameters: [] - responses: - "200": - description: "" - content: - application/json: - schema: - $ref: "#/components/schemas/Observation" - "400": - description: "" - content: - application/json: - schema: {} - "401": - description: "" - content: - application/json: - schema: {} - "403": - description: "" - content: - application/json: - schema: {} - "404": - description: "" - content: - application/json: - schema: {} - "405": - description: "" - content: - application/json: - schema: {} - security: *ref_0 - requestBody: - required: true - content: - application/json: - schema: - $ref: "#/components/schemas/UpdateGenerationRequest" /api/public/health: get: description: Check health of API and database @@ -467,7 +332,7 @@ paths: content: application/json: schema: - $ref: "#/components/schemas/Score" + $ref: "#/components/schemas/IngestionResponse" "400": description: "" content: @@ -831,111 +696,26 @@ paths: application/json: schema: {} security: *ref_0 - /api/public/spans: - post: - description: Add a span to the database - operationId: span_create - tags: - - Span - parameters: [] - responses: - "200": - description: "" - content: - application/json: - schema: - $ref: "#/components/schemas/Observation" - "400": - description: "" - content: - application/json: - schema: {} - "401": - description: "" - content: - application/json: - schema: {} - "403": - description: "" - content: - application/json: - schema: {} - "404": - description: "" - content: - application/json: - schema: {} - "405": - description: "" - content: - application/json: - schema: {} - security: *ref_0 - requestBody: - required: true - content: - application/json: - schema: - $ref: "#/components/schemas/CreateSpanRequest" - patch: - description: Update a span to the database - operationId: span_update - tags: - - Span - parameters: [] - responses: - "200": - description: "" - content: - application/json: - schema: - $ref: "#/components/schemas/Observation" - "400": - description: "" - content: - application/json: - schema: {} - "401": - description: "" - content: - application/json: - schema: {} - "403": - description: "" - content: - application/json: - schema: {} - "404": - description: "" - content: - application/json: - schema: {} - "405": - description: "" - content: - application/json: - schema: {} - security: *ref_0 - requestBody: - required: true - content: - application/json: - schema: - $ref: "#/components/schemas/UpdateSpanRequest" - /api/public/traces: - post: - description: Add a trace to the database - operationId: trace_create + /api/public/traces/{traceId}: + get: + description: Get a specific trace + operationId: trace_get tags: - Trace - parameters: [] + parameters: + - name: traceId + in: path + description: The unique langfuse identifier of a trace + required: true + schema: + type: string responses: "200": description: "" content: application/json: schema: - $ref: "#/components/schemas/Trace" + $ref: "#/components/schemas/TraceWithFullDetails" "400": description: "" content: @@ -962,12 +742,7 @@ paths: application/json: schema: {} security: *ref_0 - requestBody: - required: true - content: - application/json: - schema: - $ref: "#/components/schemas/CreateTraceRequest" + /api/public/traces: get: description: Get list of traces operationId: trace_list @@ -1031,124 +806,8 @@ paths: application/json: schema: {} security: *ref_0 - /api/public/traces/{traceId}: - get: - description: Get a specific trace - operationId: trace_get - tags: - - Trace - parameters: - - name: traceId - in: path - description: The unique langfuse identifier of a trace - required: true - schema: - type: string - responses: - "200": - description: "" - content: - application/json: - schema: - $ref: "#/components/schemas/TraceWithFullDetails" - "400": - description: "" - content: - application/json: - schema: {} - "401": - description: "" - content: - application/json: - schema: {} - "403": - description: "" - content: - application/json: - schema: {} - "404": - description: "" - content: - application/json: - schema: {} - "405": - description: "" - content: - application/json: - schema: {} - security: *ref_0 components: schemas: - CreateEventRequest: - title: CreateEventRequest - type: object - properties: - id: - type: string - nullable: true - traceId: - type: string - nullable: true - name: - type: string - nullable: true - startTime: - type: string - format: date-time - nullable: true - metadata: - nullable: true - input: - nullable: true - output: - nullable: true - level: - $ref: "#/components/schemas/ObservationLevel" - nullable: true - statusMessage: - type: string - nullable: true - parentObservationId: - type: string - nullable: true - version: - type: string - nullable: true - CreateSpanRequest: - title: CreateSpanRequest - type: object - properties: - endTime: - type: string - format: date-time - nullable: true - allOf: - - $ref: "#/components/schemas/CreateEventRequest" - CreateGenerationRequest: - title: CreateGenerationRequest - type: object - properties: - completionStartTime: - type: string - format: date-time - nullable: true - model: - type: string - nullable: true - modelParameters: - type: object - additionalProperties: - $ref: "#/components/schemas/MapValue" - nullable: true - prompt: - nullable: true - completion: - nullable: true - usage: - $ref: "#/components/schemas/LLMUsage" - nullable: true - allOf: - - $ref: "#/components/schemas/CreateSpanRequest" Trace: title: Trace type: object @@ -1292,12 +951,9 @@ components: nullable: true output: nullable: true - promptTokens: - type: integer - completionTokens: - type: integer - totalTokens: - type: integer + usage: + $ref: "#/components/schemas/Usage" + nullable: true level: $ref: "#/components/schemas/ObservationLevel" statusMessage: @@ -1310,10 +966,23 @@ components: - id - type - startTime - - promptTokens - - completionTokens - - totalTokens - level + Usage: + title: Usage + type: object + properties: + input: + type: integer + nullable: true + output: + type: integer + nullable: true + total: + type: integer + nullable: true + unit: + $ref: "#/components/schemas/ModelUsageUnit" + nullable: true Score: title: Score type: object @@ -1458,6 +1127,12 @@ components: - createdAt - updatedAt - datasetRunItems + ModelUsageUnit: + title: ModelUsageUnit + type: string + enum: + - CHARACTERS + - TOKENS ObservationLevel: title: ObservationLevel type: string @@ -1475,19 +1150,6 @@ components: nullable: true - type: boolean nullable: true - LLMUsage: - title: LLMUsage - type: object - properties: - promptTokens: - type: integer - nullable: true - completionTokens: - type: integer - nullable: true - totalTokens: - type: integer - nullable: true DatasetStatus: title: DatasetStatus type: string @@ -1531,58 +1193,6 @@ components: type: string required: - name - UpdateGenerationRequest: - title: UpdateGenerationRequest - type: object - properties: - generationId: - type: string - traceId: - type: string - nullable: true - name: - type: string - nullable: true - startTime: - type: string - format: date-time - nullable: true - endTime: - type: string - format: date-time - nullable: true - completionStartTime: - type: string - format: date-time - nullable: true - model: - type: string - nullable: true - modelParameters: - type: object - additionalProperties: - $ref: "#/components/schemas/MapValue" - nullable: true - prompt: - nullable: true - version: - type: string - nullable: true - metadata: - nullable: true - completion: - nullable: true - usage: - $ref: "#/components/schemas/LLMUsage" - nullable: true - level: - $ref: "#/components/schemas/ObservationLevel" - nullable: true - statusMessage: - type: string - nullable: true - required: - - generationId IngestionEvent: title: IngestionEvent oneOf: @@ -1608,6 +1218,61 @@ components: - $ref: "#/components/schemas/ScoreEvent" required: - type + - type: object + allOf: + - type: object + properties: + type: + type: string + enum: + - event-create + - $ref: "#/components/schemas/CreateEventEvent" + required: + - type + - type: object + allOf: + - type: object + properties: + type: + type: string + enum: + - generation-create + - $ref: "#/components/schemas/CreateGenerationEvent" + required: + - type + - type: object + allOf: + - type: object + properties: + type: + type: string + enum: + - generation-update + - $ref: "#/components/schemas/UpdateGenerationEvent" + required: + - type + - type: object + allOf: + - type: object + properties: + type: + type: string + enum: + - span-create + - $ref: "#/components/schemas/CreateSpanEvent" + required: + - type + - type: object + allOf: + - type: object + properties: + type: + type: string + enum: + - span-update + - $ref: "#/components/schemas/UpdateSpanEvent" + required: + - type - type: object allOf: - type: object @@ -1616,7 +1281,7 @@ components: type: string enum: - observation-create - - $ref: "#/components/schemas/ObservationCreateEvent" + - $ref: "#/components/schemas/CreateObservationEvent" required: - type - type: object @@ -1627,47 +1292,299 @@ components: type: string enum: - observation-update - - $ref: "#/components/schemas/ObservationUpdateEvent" + - $ref: "#/components/schemas/UpdateObservationEvent" required: - type - TraceEvent: - title: TraceEvent + ObservationType: + title: ObservationType + type: string + enum: + - SPAN + - GENERATION + - EVENT + IngestionUsage: + title: IngestionUsage + oneOf: + - $ref: "#/components/schemas/Usage" + - $ref: "#/components/schemas/OpenAIUsage" + OpenAIUsage: + title: OpenAIUsage type: object properties: - id: + promptTokens: + type: integer + nullable: true + completionTokens: + type: integer + nullable: true + totalTokens: + type: integer + nullable: true + OptionalObservationBody: + title: OptionalObservationBody + type: object + properties: + traceId: type: string - timestamp: + nullable: true + name: type: string - body: - $ref: "#/components/schemas/Trace" - required: - - id - - timestamp - - body - ObservationCreateEvent: - title: ObservationCreateEvent + nullable: true + startTime: + type: string + format: date-time + nullable: true + metadata: + nullable: true + input: + nullable: true + output: + nullable: true + level: + $ref: "#/components/schemas/ObservationLevel" + nullable: true + statusMessage: + type: string + nullable: true + parentObservationId: + type: string + nullable: true + version: + type: string + nullable: true + CreateEventBody: + title: CreateEventBody type: object properties: id: type: string - timestamp: + nullable: true + allOf: + - $ref: "#/components/schemas/OptionalObservationBody" + UpdateEventBody: + title: UpdateEventBody + type: object + properties: + id: type: string - body: - $ref: "#/components/schemas/Observation" required: - id - - timestamp - - body - ObservationUpdateEvent: - title: ObservationUpdateEvent + allOf: + - $ref: "#/components/schemas/OptionalObservationBody" + CreateSpanBody: + title: CreateSpanBody type: object properties: - id: + endTime: + type: string + format: date-time + nullable: true + allOf: + - $ref: "#/components/schemas/CreateEventBody" + UpdateSpanBody: + title: UpdateSpanBody + type: object + properties: + endTime: + type: string + format: date-time + nullable: true + allOf: + - $ref: "#/components/schemas/UpdateEventBody" + CreateGenerationBody: + title: CreateGenerationBody + type: object + properties: + completionStartTime: + type: string + format: date-time + nullable: true + model: + type: string + nullable: true + modelParameters: + type: object + additionalProperties: + $ref: "#/components/schemas/MapValue" + nullable: true + usage: + $ref: "#/components/schemas/IngestionUsage" + nullable: true + allOf: + - $ref: "#/components/schemas/CreateSpanBody" + UpdateGenerationBody: + title: UpdateGenerationBody + type: object + properties: + completionStartTime: + type: string + format: date-time + nullable: true + model: + type: string + nullable: true + modelParameters: + type: object + additionalProperties: + $ref: "#/components/schemas/MapValue" + nullable: true + usage: + $ref: "#/components/schemas/IngestionUsage" + nullable: true + allOf: + - $ref: "#/components/schemas/UpdateSpanBody" + ObservationBody: + title: ObservationBody + type: object + properties: + id: + type: string + nullable: true + traceId: + type: string + nullable: true + type: + $ref: "#/components/schemas/ObservationType" + name: + type: string + nullable: true + startTime: + type: string + format: date-time + nullable: true + endTime: + type: string + format: date-time + nullable: true + completionStartTime: + type: string + format: date-time + nullable: true + model: + type: string + nullable: true + modelParameters: + type: object + additionalProperties: + $ref: "#/components/schemas/MapValue" + nullable: true + input: + nullable: true + version: + type: string + nullable: true + metadata: + nullable: true + output: + nullable: true + usage: + $ref: "#/components/schemas/Usage" + nullable: true + level: + $ref: "#/components/schemas/ObservationLevel" + nullable: true + statusMessage: + type: string + nullable: true + parentObservationId: + type: string + nullable: true + required: + - type + TraceBody: + title: TraceBody + type: object + properties: + id: + type: string + nullable: true + name: + type: string + nullable: true + userId: + type: string + nullable: true + input: + nullable: true + output: + nullable: true + sessionId: + type: string + nullable: true + release: + type: string + nullable: true + version: + type: string + nullable: true + metadata: + nullable: true + public: + type: boolean + nullable: true + description: Make trace publicly accessible via url + ScoreBody: + title: ScoreBody + type: object + properties: + id: + type: string + nullable: true + traceId: + type: string + name: + type: string + value: + type: number + format: double + observationId: + type: string + nullable: true + comment: + type: string + nullable: true + required: + - traceId + - name + - value + TraceEvent: + title: TraceEvent + type: object + properties: + id: type: string timestamp: type: string body: - $ref: "#/components/schemas/Observation" + $ref: "#/components/schemas/TraceBody" + required: + - id + - timestamp + - body + CreateObservationEvent: + title: CreateObservationEvent + type: object + properties: + id: + type: string + timestamp: + type: string + body: + $ref: "#/components/schemas/ObservationBody" + required: + - id + - timestamp + - body + UpdateObservationEvent: + title: UpdateObservationEvent + type: object + properties: + id: + type: string + timestamp: + type: string + body: + $ref: "#/components/schemas/ObservationBody" required: - id - timestamp @@ -1681,11 +1598,123 @@ components: timestamp: type: string body: - $ref: "#/components/schemas/Score" + $ref: "#/components/schemas/ScoreBody" + required: + - id + - timestamp + - body + CreateGenerationEvent: + title: CreateGenerationEvent + type: object + properties: + id: + type: string + timestamp: + type: string + body: + $ref: "#/components/schemas/CreateGenerationBody" + required: + - id + - timestamp + - body + UpdateGenerationEvent: + title: UpdateGenerationEvent + type: object + properties: + id: + type: string + timestamp: + type: string + body: + $ref: "#/components/schemas/UpdateGenerationBody" + required: + - id + - timestamp + - body + CreateSpanEvent: + title: CreateSpanEvent + type: object + properties: + id: + type: string + timestamp: + type: string + body: + $ref: "#/components/schemas/CreateSpanBody" required: - id - timestamp - body + UpdateSpanEvent: + title: UpdateSpanEvent + type: object + properties: + id: + type: string + timestamp: + type: string + body: + $ref: "#/components/schemas/UpdateSpanBody" + required: + - id + - timestamp + - body + CreateEventEvent: + title: CreateEventEvent + type: object + properties: + id: + type: string + timestamp: + type: string + body: + $ref: "#/components/schemas/CreateEventBody" + required: + - id + - timestamp + - body + IngestionSuccess: + title: IngestionSuccess + type: object + properties: + id: + type: string + status: + type: integer + required: + - id + - status + IngestionError: + title: IngestionError + type: object + properties: + id: + type: string + status: + type: integer + message: + type: string + nullable: true + error: + nullable: true + required: + - id + - status + IngestionResponse: + title: IngestionResponse + type: object + properties: + successes: + type: array + items: + $ref: "#/components/schemas/IngestionSuccess" + errors: + type: array + items: + $ref: "#/components/schemas/IngestionError" + required: + - successes + - errors Observations: title: Observations type: object @@ -1757,78 +1786,6 @@ components: required: - data - meta - UpdateSpanRequest: - title: UpdateSpanRequest - type: object - properties: - spanId: - type: string - traceId: - type: string - nullable: true - startTime: - type: string - format: date-time - nullable: true - endTime: - type: string - format: date-time - nullable: true - name: - type: string - nullable: true - metadata: - nullable: true - input: - nullable: true - output: - nullable: true - level: - $ref: "#/components/schemas/ObservationLevel" - nullable: true - version: - type: string - nullable: true - statusMessage: - type: string - nullable: true - required: - - spanId - CreateTraceRequest: - title: CreateTraceRequest - type: object - properties: - id: - type: string - nullable: true - name: - type: string - nullable: true - userId: - type: string - nullable: true - externalId: - type: string - nullable: true - input: - nullable: true - output: - nullable: true - sessionId: - type: string - nullable: true - release: - type: string - nullable: true - version: - type: string - nullable: true - metadata: - nullable: true - public: - type: boolean - nullable: true - description: Make trace publicly accessible via url Traces: title: Traces type: object diff --git a/langfuse-core/src/index.ts b/langfuse-core/src/index.ts index ec8c134c..781fe115 100644 --- a/langfuse-core/src/index.ts +++ b/langfuse-core/src/index.ts @@ -6,7 +6,6 @@ import { LangfusePersistedProperty, type CreateLangfuseTraceBody, type LangfuseObject, - LangfusePostApiRoutes, type CreateLangfuseEventBody, type CreateLangfuseSpanBody, type CreateLangfuseGenerationBody, @@ -24,6 +23,8 @@ import { type GetLangfuseDatasetRunResponse, type GetLangfuseDatasetRunParams, type DeferRuntime, + type IngestionReturnType, + type SingleIngestionEvent, } from "./types"; import { assert, @@ -33,12 +34,15 @@ import { type RetriableOptions, safeSetTimeout, getEnv, + currentISOTime, } from "./utils"; export * as utils from "./utils"; import { SimpleEventEmitter } from "./eventemitter"; import { getCommonReleaseEnvs } from "./release-env"; export { LangfuseMemoryStorage } from "./storage-memory"; +export type IngestionBody = SingleIngestionEvent["body"]; + class LangfuseFetchHttpError extends Error { name = "LangfuseFetchHttpError"; @@ -93,7 +97,7 @@ abstract class LangfuseCoreStateless { this.publicKey = publicKey; this.secretKey = secretKey; this.baseUrl = removeTrailingSlash(options?.baseUrl || "https://cloud.langfuse.com"); - this.flushAt = options?.flushAt ? Math.max(options?.flushAt, 1) : 1; + this.flushAt = options?.flushAt ? Math.max(options?.flushAt, 1) : 20; this.flushInterval = options?.flushInterval ?? 10000; this.release = options?.release ?? getEnv("LANGFUSE_RELEASE") ?? getCommonReleaseEnvs() ?? undefined; @@ -140,7 +144,7 @@ abstract class LangfuseCoreStateless { release, ...rest, }; - this.enqueue("createTrace", parsedBody); + this.enqueue("trace-create", parsedBody); return id; } @@ -154,7 +158,7 @@ abstract class LangfuseCoreStateless { startTime: bodyStartTime ?? new Date(), ...rest, }; - this.enqueue("createEvent", parsedBody); + this.enqueue("event-create", parsedBody); return id; } @@ -168,7 +172,7 @@ abstract class LangfuseCoreStateless { startTime: bodyStartTime ?? new Date(), ...rest, }; - this.enqueue("createSpan", parsedBody); + this.enqueue("span-create", parsedBody); return id; } @@ -182,7 +186,8 @@ abstract class LangfuseCoreStateless { startTime: bodyStartTime ?? new Date(), ...rest, }; - this.enqueue("createGeneration", parsedBody); + + this.enqueue("generation-create", parsedBody); return id; } @@ -195,18 +200,18 @@ abstract class LangfuseCoreStateless { id, ...rest, }; - this.enqueue("createScore", parsedBody); + this.enqueue("score-create", parsedBody); return id; } protected updateSpanStateless(body: UpdateLangfuseSpanBody): string { - this.enqueue("updateSpan", body); - return body.spanId; + this.enqueue("span-update", body); + return body.id; } protected updateGenerationStateless(body: UpdateLangfuseGenerationBody): string { - this.enqueue("updateGeneration", body); - return body.generationId; + this.enqueue("generation-update", body); + return body.id; } protected async _getDataset(name: GetLangfuseDatasetParams["datasetName"]): Promise { @@ -269,12 +274,10 @@ abstract class LangfuseCoreStateless { protected enqueue(type: LangfuseObject, body: any): void { const queue = this.getPersistedProperty(LangfusePersistedProperty.Queue) || []; - const id = generateUUID(); - queue.push({ - id, - method: LangfusePostApiRoutes[type][0], - apiRoute: LangfusePostApiRoutes[type][1], + id: generateUUID(), + type, + timestamp: currentISOTime(), body, }); this.setPersistedProperty(LangfusePersistedProperty.Queue, queue); @@ -291,13 +294,17 @@ abstract class LangfuseCoreStateless { } } - flushAsync(): Promise { - return Promise.all(this.flush()); + flushAsync(): Promise { + return new Promise((resolve, reject) => { + this.flush((err, data) => { + return err ? reject(err) : resolve(data); + }); + }); } // Flushes all events that are not yet sent to the server // @returns {Promise[]} - list of promises for each item in the queue that is flushed - flush(): Promise[] { + flush(callback?: (err?: any, data?: any) => void): void { if (this._flushTimer) { clearTimeout(this._flushTimer); this._flushTimer = null; @@ -306,42 +313,41 @@ abstract class LangfuseCoreStateless { const queue = this.getPersistedProperty(LangfusePersistedProperty.Queue) || []; if (!queue.length) { - return []; + return callback?.(); } - // Flush all items in queue, could also use flushAt with splice to flush only a certain number of items (e.g. when batching) - const items = queue; - this.setPersistedProperty(LangfusePersistedProperty.Queue, []); + const items = queue.splice(0, this.flushAt); + this.setPersistedProperty(LangfusePersistedProperty.Queue, queue); - // TODO: add /batch endpoint to ingest multiple events at once - const promises = items.map((item) => { - const done = (err?: any): void => { - if (err) { - this._events.emit("error", err); - } - // remove promise from pendingPromises - delete this.pendingPromises[item.id]; - this._events.emit("flush", item); - }; - const payload = JSON.stringify(item.body); // implicit conversion also of dates to strings - const url = `${this.baseUrl}${item.apiRoute}`; - - const fetchOptions = this.getFetchOptions({ - method: item.method, - body: payload, - }); + const promiseUUID = generateUUID(); - const requestPromise = this.fetchWithRetry(url, fetchOptions); - this.pendingPromises[item.id] = requestPromise; - requestPromise - .then(() => done()) - .catch((err) => { - done(err); - }); - return requestPromise; - }); + const done = (err?: any): void => { + if (err) { + this._events.emit("error", err); + } + callback?.(err, items); + // remove promise from pendingPromises + delete this.pendingPromises[promiseUUID]; + this._events.emit("flush", items); + }; - return promises; + const payload = JSON.stringify({ batch: items }); // implicit conversion also of dates to strings + + const url = `${this.baseUrl}/api/public/ingestion`; + + const fetchOptions = this.getFetchOptions({ + method: "POST", + body: payload, + }); + console.debug("Sending payload", payload); + const requestPromise = this.fetchWithRetry(url, fetchOptions); + this.pendingPromises[promiseUUID] = requestPromise; + + requestPromise + .then(() => done()) + .catch((err) => { + done(err); + }); } private getFetchOptions(p: { @@ -397,7 +403,7 @@ abstract class LangfuseCoreStateless { return await retriable( async () => { - let res: LangfuseFetchResponse | null = null; + let res: LangfuseFetchResponse | null = null; try { res = await this.fetch(url, { signal: (AbortSignal as any).timeout(this.requestTimeout), @@ -407,9 +413,15 @@ abstract class LangfuseCoreStateless { // fetch will only throw on network errors or on timeouts throw new LangfuseFetchNetworkError(e); } + if (res.status < 200 || res.status >= 400) { throw new LangfuseFetchHttpError(res); } + const returnBody = await res.json(); + if (res.status === 207 && returnBody.errors.length > 0) { + throw new LangfuseFetchHttpError(res); + } + return res; }, { ...this._retryOptions, ...retryOptions }, @@ -647,19 +659,19 @@ export class LangfuseSpanClient extends LangfuseObservationClient { super(client, id, traceId); } - update(body: Omit): this { + update(body: Omit): this { this.client._updateSpan({ ...body, - spanId: this.id, + id: this.id, traceId: this.traceId, }); return this; } - end(body?: Omit): this { + end(body?: Omit): this { this.client._updateSpan({ ...body, - spanId: this.id, + id: this.id, traceId: this.traceId, endTime: new Date(), }); @@ -672,19 +684,19 @@ export class LangfuseGenerationClient extends LangfuseObservationClient { super(client, id, traceId); } - update(body: Omit): this { + update(body: Omit): this { this.client._updateGeneration({ ...body, - generationId: this.id, + id: this.id, traceId: this.traceId, }); return this; } - end(body?: Omit): this { + end(body?: Omit): this { this.client._updateGeneration({ ...body, - generationId: this.id, + id: this.id, traceId: this.traceId, endTime: new Date(), }); diff --git a/langfuse-core/src/openapi/client.ts b/langfuse-core/src/openapi/client.ts index 6d6af551..33f07247 100644 --- a/langfuse-core/src/openapi/client.ts +++ b/langfuse-core/src/openapi/client.ts @@ -5,7 +5,7 @@ export interface paths { "/api/public/scores": { - /** @description Add a score to the database */ + /** @description Add a score to the database, upserts on id */ post: operations["score_create"]; }; } @@ -18,7 +18,6 @@ export interface components { CreateScoreRequest: { id?: string; traceId: string; - traceIdType?: components["schemas"]["TraceIdType"]; name: string; /** Format: double */ value: number; @@ -37,11 +36,6 @@ export interface components { timestamp: string; comment?: string; }; - /** - * TraceIdType - * @enum {string} - */ - TraceIdType: "LANGFUSE" | "EXTERNAL"; }; responses: never; parameters: never; @@ -53,7 +47,7 @@ export interface components { export type external = Record; export interface operations { - /** @description Add a score to the database */ + /** @description Add a score to the database, upserts on id */ score_create: { requestBody: { content: { diff --git a/langfuse-core/src/openapi/server.ts b/langfuse-core/src/openapi/server.ts index 2666e305..16f09b7f 100644 --- a/langfuse-core/src/openapi/server.ts +++ b/langfuse-core/src/openapi/server.ts @@ -6,15 +6,6 @@ /** WithRequired type helpers */ type WithRequired = T & { [P in K]-?: T[P] }; -/** OneOf type helpers */ -type Without = { [P in Exclude]?: never }; -type XOR = T | U extends object ? (Without & U) | (Without & T) : T | U; -type OneOf = T extends [infer Only] - ? Only - : T extends [infer A, infer B, ...infer Rest] - ? OneOf<[XOR, ...Rest]> - : never; - export interface paths { "/api/public/dataset-items": { /** @description Create a dataset item, upserts on id */ @@ -40,14 +31,6 @@ export interface paths { /** @description Get a dataset run and its items */ get: operations["datasets_getRuns"]; }; - "/api/public/events": { - /** @description Add an event to the database */ - post: operations["event_create"]; - }; - "/api/public/generations": { - post: operations["generations_log"]; - patch: operations["generations_update"]; - }; "/api/public/health": { /** @description Check health of API and database */ get: operations["health_health"]; @@ -77,21 +60,13 @@ export interface paths { /** @description Get a session */ get: operations["sessions_get"]; }; - "/api/public/spans": { - /** @description Add a span to the database */ - post: operations["span_create"]; - /** @description Update a span to the database */ - patch: operations["span_update"]; + "/api/public/traces/{traceId}": { + /** @description Get a specific trace */ + get: operations["trace_get"]; }; "/api/public/traces": { /** @description Get list of traces */ get: operations["trace_list"]; - /** @description Add a trace to the database */ - post: operations["trace_create"]; - }; - "/api/public/traces/{traceId}": { - /** @description Get a specific trace */ - get: operations["trace_get"]; }; } @@ -99,38 +74,6 @@ export type webhooks = Record; export interface components { schemas: { - /** CreateEventRequest */ - CreateEventRequest: { - id?: string | null; - traceId?: string | null; - name?: string | null; - /** Format: date-time */ - startTime?: string | null; - metadata?: Record | null; - input?: Record | null; - output?: Record | null; - level?: components["schemas"]["ObservationLevel"]; - statusMessage?: string | null; - parentObservationId?: string | null; - version?: string | null; - }; - /** CreateSpanRequest */ - CreateSpanRequest: { - /** Format: date-time */ - endTime?: string | null; - } & components["schemas"]["CreateEventRequest"]; - /** CreateGenerationRequest */ - CreateGenerationRequest: { - /** Format: date-time */ - completionStartTime?: string | null; - model?: string | null; - modelParameters?: { - [key: string]: components["schemas"]["MapValue"] | undefined; - } | null; - prompt?: Record | null; - completion?: Record | null; - usage?: components["schemas"]["LLMUsage"]; - } & components["schemas"]["CreateSpanRequest"]; /** Trace */ Trace: { /** @description The unique identifier of a trace */ @@ -200,13 +143,18 @@ export interface components { version?: string | null; metadata?: Record | null; output?: Record | null; - promptTokens: number; - completionTokens: number; - totalTokens: number; + usage?: components["schemas"]["Usage"]; level: components["schemas"]["ObservationLevel"]; statusMessage?: string | null; parentObservationId?: string | null; }; + /** Usage */ + Usage: { + input?: number | null; + output?: number | null; + total?: number | null; + unit?: components["schemas"]["ModelUsageUnit"]; + }; /** Score */ Score: { id: string; @@ -267,6 +215,11 @@ export interface components { updatedAt: string; datasetRunItems: components["schemas"]["DatasetRunItem"][]; }; + /** + * ModelUsageUnit + * @enum {string} + */ + ModelUsageUnit: "CHARACTERS" | "TOKENS"; /** * ObservationLevel * @enum {string} @@ -274,12 +227,6 @@ export interface components { ObservationLevel: "DEBUG" | "DEFAULT" | "WARNING" | "ERROR"; /** MapValue */ MapValue: (string | null) | (number | null) | (boolean | null); - /** LLMUsage */ - LLMUsage: { - promptTokens?: number | null; - completionTokens?: number | null; - totalTokens?: number | null; - }; /** * DatasetStatus * @enum {string} @@ -302,85 +249,258 @@ export interface components { CreateDatasetRequest: { name: string; }; - /** UpdateGenerationRequest */ - UpdateGenerationRequest: { - generationId: string; - traceId?: string | null; - name?: string | null; - /** Format: date-time */ - startTime?: string | null; - /** Format: date-time */ - endTime?: string | null; - /** Format: date-time */ - completionStartTime?: string | null; - model?: string | null; - modelParameters?: { - [key: string]: components["schemas"]["MapValue"] | undefined; - } | null; - prompt?: Record | null; - version?: string | null; - metadata?: Record | null; - completion?: Record | null; - usage?: components["schemas"]["LLMUsage"]; - level?: components["schemas"]["ObservationLevel"]; - statusMessage?: string | null; - }; /** IngestionEvent */ - IngestionEvent: OneOf< - [ - WithRequired< + IngestionEvent: + | WithRequired< { /** @enum {string} */ type?: "trace-create"; } & components["schemas"]["TraceEvent"], "type" - >, - WithRequired< + > + | WithRequired< { /** @enum {string} */ type?: "score-create"; } & components["schemas"]["ScoreEvent"], "type" - >, - WithRequired< + > + | WithRequired< + { + /** @enum {string} */ + type?: "event-create"; + } & components["schemas"]["CreateEventEvent"], + "type" + > + | WithRequired< + { + /** @enum {string} */ + type?: "generation-create"; + } & components["schemas"]["CreateGenerationEvent"], + "type" + > + | WithRequired< + { + /** @enum {string} */ + type?: "generation-update"; + } & components["schemas"]["UpdateGenerationEvent"], + "type" + > + | WithRequired< + { + /** @enum {string} */ + type?: "span-create"; + } & components["schemas"]["CreateSpanEvent"], + "type" + > + | WithRequired< + { + /** @enum {string} */ + type?: "span-update"; + } & components["schemas"]["UpdateSpanEvent"], + "type" + > + | WithRequired< { /** @enum {string} */ type?: "observation-create"; - } & components["schemas"]["ObservationCreateEvent"], + } & components["schemas"]["CreateObservationEvent"], "type" - >, - WithRequired< + > + | WithRequired< { /** @enum {string} */ type?: "observation-update"; - } & components["schemas"]["ObservationUpdateEvent"], + } & components["schemas"]["UpdateObservationEvent"], "type" - >, - ] + >; + /** + * ObservationType + * @enum {string} + */ + ObservationType: "SPAN" | "GENERATION" | "EVENT"; + /** IngestionUsage */ + IngestionUsage: components["schemas"]["Usage"] | components["schemas"]["OpenAIUsage"]; + /** OpenAIUsage */ + OpenAIUsage: { + promptTokens?: number | null; + completionTokens?: number | null; + totalTokens?: number | null; + }; + /** OptionalObservationBody */ + OptionalObservationBody: { + traceId?: string | null; + name?: string | null; + /** Format: date-time */ + startTime?: string | null; + metadata?: Record | null; + input?: Record | null; + output?: Record | null; + level?: components["schemas"]["ObservationLevel"]; + statusMessage?: string | null; + parentObservationId?: string | null; + version?: string | null; + }; + /** CreateEventBody */ + CreateEventBody: { + id?: string | null; + } & components["schemas"]["OptionalObservationBody"]; + /** UpdateEventBody */ + UpdateEventBody: WithRequired< + { + id: string; + } & components["schemas"]["OptionalObservationBody"], + "id" >; + /** CreateSpanBody */ + CreateSpanBody: { + /** Format: date-time */ + endTime?: string | null; + } & components["schemas"]["CreateEventBody"]; + /** UpdateSpanBody */ + UpdateSpanBody: { + /** Format: date-time */ + endTime?: string | null; + } & components["schemas"]["UpdateEventBody"]; + /** CreateGenerationBody */ + CreateGenerationBody: { + /** Format: date-time */ + completionStartTime?: string | null; + model?: string | null; + modelParameters?: { + [key: string]: components["schemas"]["MapValue"] | undefined; + } | null; + usage?: components["schemas"]["IngestionUsage"]; + } & components["schemas"]["CreateSpanBody"]; + /** UpdateGenerationBody */ + UpdateGenerationBody: { + /** Format: date-time */ + completionStartTime?: string | null; + model?: string | null; + modelParameters?: { + [key: string]: components["schemas"]["MapValue"] | undefined; + } | null; + usage?: components["schemas"]["IngestionUsage"]; + } & components["schemas"]["UpdateSpanBody"]; + /** ObservationBody */ + ObservationBody: { + id?: string | null; + traceId?: string | null; + type: components["schemas"]["ObservationType"]; + name?: string | null; + /** Format: date-time */ + startTime?: string | null; + /** Format: date-time */ + endTime?: string | null; + /** Format: date-time */ + completionStartTime?: string | null; + model?: string | null; + modelParameters?: { + [key: string]: components["schemas"]["MapValue"] | undefined; + } | null; + input?: Record | null; + version?: string | null; + metadata?: Record | null; + output?: Record | null; + usage?: components["schemas"]["Usage"]; + level?: components["schemas"]["ObservationLevel"]; + statusMessage?: string | null; + parentObservationId?: string | null; + }; + /** TraceBody */ + TraceBody: { + id?: string | null; + name?: string | null; + userId?: string | null; + input?: Record | null; + output?: Record | null; + sessionId?: string | null; + release?: string | null; + version?: string | null; + metadata?: Record | null; + /** @description Make trace publicly accessible via url */ + public?: boolean | null; + }; + /** ScoreBody */ + ScoreBody: { + id?: string | null; + traceId: string; + name: string; + /** Format: double */ + value: number; + observationId?: string | null; + comment?: string | null; + }; /** TraceEvent */ TraceEvent: { id: string; timestamp: string; - body: components["schemas"]["Trace"]; + body: components["schemas"]["TraceBody"]; }; - /** ObservationCreateEvent */ - ObservationCreateEvent: { + /** CreateObservationEvent */ + CreateObservationEvent: { id: string; timestamp: string; - body: components["schemas"]["Observation"]; + body: components["schemas"]["ObservationBody"]; }; - /** ObservationUpdateEvent */ - ObservationUpdateEvent: { + /** UpdateObservationEvent */ + UpdateObservationEvent: { id: string; timestamp: string; - body: components["schemas"]["Observation"]; + body: components["schemas"]["ObservationBody"]; }; /** ScoreEvent */ ScoreEvent: { id: string; timestamp: string; - body: components["schemas"]["Score"]; + body: components["schemas"]["ScoreBody"]; + }; + /** CreateGenerationEvent */ + CreateGenerationEvent: { + id: string; + timestamp: string; + body: components["schemas"]["CreateGenerationBody"]; + }; + /** UpdateGenerationEvent */ + UpdateGenerationEvent: { + id: string; + timestamp: string; + body: components["schemas"]["UpdateGenerationBody"]; + }; + /** CreateSpanEvent */ + CreateSpanEvent: { + id: string; + timestamp: string; + body: components["schemas"]["CreateSpanBody"]; + }; + /** UpdateSpanEvent */ + UpdateSpanEvent: { + id: string; + timestamp: string; + body: components["schemas"]["UpdateSpanBody"]; + }; + /** CreateEventEvent */ + CreateEventEvent: { + id: string; + timestamp: string; + body: components["schemas"]["CreateEventBody"]; + }; + /** IngestionSuccess */ + IngestionSuccess: { + id: string; + status: number; + }; + /** IngestionError */ + IngestionError: { + id: string; + status: number; + message?: string | null; + error?: Record | null; + }; + /** IngestionResponse */ + IngestionResponse: { + successes: components["schemas"]["IngestionSuccess"][]; + errors: components["schemas"]["IngestionError"][]; }; /** Observations */ Observations: { @@ -411,37 +531,6 @@ export interface components { data: components["schemas"]["Score"][]; meta: components["schemas"]["utilsMetaResponse"]; }; - /** UpdateSpanRequest */ - UpdateSpanRequest: { - spanId: string; - traceId?: string | null; - /** Format: date-time */ - startTime?: string | null; - /** Format: date-time */ - endTime?: string | null; - name?: string | null; - metadata?: Record | null; - input?: Record | null; - output?: Record | null; - level?: components["schemas"]["ObservationLevel"]; - version?: string | null; - statusMessage?: string | null; - }; - /** CreateTraceRequest */ - CreateTraceRequest: { - id?: string | null; - name?: string | null; - userId?: string | null; - externalId?: string | null; - input?: Record | null; - output?: Record | null; - sessionId?: string | null; - release?: string | null; - version?: string | null; - metadata?: Record | null; - /** @description Make trace publicly accessible via url */ - public?: boolean | null; - }; /** Traces */ Traces: { data: components["schemas"]["TraceWithDetails"][]; @@ -710,124 +799,6 @@ export interface operations { }; }; }; - /** @description Add an event to the database */ - event_create: { - requestBody: { - content: { - "application/json": components["schemas"]["CreateEventRequest"]; - }; - }; - responses: { - 200: { - content: { - "application/json": components["schemas"]["Observation"]; - }; - }; - 400: { - content: { - "application/json": unknown; - }; - }; - 401: { - content: { - "application/json": unknown; - }; - }; - 403: { - content: { - "application/json": unknown; - }; - }; - 404: { - content: { - "application/json": unknown; - }; - }; - 405: { - content: { - "application/json": unknown; - }; - }; - }; - }; - generations_log: { - requestBody: { - content: { - "application/json": components["schemas"]["CreateGenerationRequest"]; - }; - }; - responses: { - 200: { - content: { - "application/json": components["schemas"]["Observation"]; - }; - }; - 400: { - content: { - "application/json": unknown; - }; - }; - 401: { - content: { - "application/json": unknown; - }; - }; - 403: { - content: { - "application/json": unknown; - }; - }; - 404: { - content: { - "application/json": unknown; - }; - }; - 405: { - content: { - "application/json": unknown; - }; - }; - }; - }; - generations_update: { - requestBody: { - content: { - "application/json": components["schemas"]["UpdateGenerationRequest"]; - }; - }; - responses: { - 200: { - content: { - "application/json": components["schemas"]["Observation"]; - }; - }; - 400: { - content: { - "application/json": unknown; - }; - }; - 401: { - content: { - "application/json": unknown; - }; - }; - 403: { - content: { - "application/json": unknown; - }; - }; - 404: { - content: { - "application/json": unknown; - }; - }; - 405: { - content: { - "application/json": unknown; - }; - }; - }; - }; /** @description Check health of API and database */ health_health: { responses: { @@ -876,7 +847,7 @@ export interface operations { responses: { 200: { content: { - "application/json": components["schemas"]["Score"]; + "application/json": components["schemas"]["IngestionResponse"]; }; }; 400: { @@ -1151,57 +1122,18 @@ export interface operations { }; }; }; - /** @description Add a span to the database */ - span_create: { - requestBody: { - content: { - "application/json": components["schemas"]["CreateSpanRequest"]; - }; - }; - responses: { - 200: { - content: { - "application/json": components["schemas"]["Observation"]; - }; - }; - 400: { - content: { - "application/json": unknown; - }; - }; - 401: { - content: { - "application/json": unknown; - }; - }; - 403: { - content: { - "application/json": unknown; - }; - }; - 404: { - content: { - "application/json": unknown; - }; - }; - 405: { - content: { - "application/json": unknown; - }; - }; - }; - }; - /** @description Update a span to the database */ - span_update: { - requestBody: { - content: { - "application/json": components["schemas"]["UpdateSpanRequest"]; + /** @description Get a specific trace */ + trace_get: { + parameters: { + path: { + /** @description The unique langfuse identifier of a trace */ + traceId: string; }; }; responses: { 200: { content: { - "application/json": components["schemas"]["Observation"]; + "application/json": components["schemas"]["TraceWithFullDetails"]; }; }; 400: { @@ -1274,85 +1206,4 @@ export interface operations { }; }; }; - /** @description Add a trace to the database */ - trace_create: { - requestBody: { - content: { - "application/json": components["schemas"]["CreateTraceRequest"]; - }; - }; - responses: { - 200: { - content: { - "application/json": components["schemas"]["Trace"]; - }; - }; - 400: { - content: { - "application/json": unknown; - }; - }; - 401: { - content: { - "application/json": unknown; - }; - }; - 403: { - content: { - "application/json": unknown; - }; - }; - 404: { - content: { - "application/json": unknown; - }; - }; - 405: { - content: { - "application/json": unknown; - }; - }; - }; - }; - /** @description Get a specific trace */ - trace_get: { - parameters: { - path: { - /** @description The unique langfuse identifier of a trace */ - traceId: string; - }; - }; - responses: { - 200: { - content: { - "application/json": components["schemas"]["TraceWithFullDetails"]; - }; - }; - 400: { - content: { - "application/json": unknown; - }; - }; - 401: { - content: { - "application/json": unknown; - }; - }; - 403: { - content: { - "application/json": unknown; - }; - }; - 404: { - content: { - "application/json": unknown; - }; - }; - 405: { - content: { - "application/json": unknown; - }; - }; - }; - }; } diff --git a/langfuse-core/src/types.ts b/langfuse-core/src/types.ts index a6958e3c..ef70d6a3 100644 --- a/langfuse-core/src/types.ts +++ b/langfuse-core/src/types.ts @@ -1,4 +1,4 @@ -import { type paths } from "./openapi/server"; +import { type components, type paths } from "./openapi/server"; export type LangfuseCoreOptions = { // Langfuse API baseUrl (https://cloud.langfuse.com by default) @@ -36,14 +36,17 @@ export type LangfuseFetchResponse = { json: () => Promise; }; -export type LangfuseQueueItem = { - apiRoute: keyof paths; - method: "POST" | "PATCH"; - id: string; - body: any; +export type LangfuseQueueItem = SingleIngestionEvent & { callback?: (err: any) => void; }; +export type SingleIngestionEvent = + paths["/api/public/ingestion"]["post"]["requestBody"]["content"]["application/json"]["batch"][number]; + +// return type of ingestion endpoint defined on 200 status error in fern as 207 is not possible +export type IngestionReturnType = + paths["/api/public/ingestion"]["post"]["responses"][200]["content"]["application/json"]; + export type LangfuseEventProperties = { [key: string]: any; }; @@ -53,47 +56,20 @@ export type LangfuseMetadataProperties = { }; // ASYNC -export type CreateLangfuseTraceBody = FixTypes< - paths["/api/public/traces"]["post"]["requestBody"]["content"]["application/json"] ->; -export type CreateLangfuseEventBody = FixTypes< - paths["/api/public/events"]["post"]["requestBody"]["content"]["application/json"] ->; -export type CreateLangfuseSpanBody = FixTypes< - paths["/api/public/spans"]["post"]["requestBody"]["content"]["application/json"] ->; -export type CreateLangfuseGenerationBody = Omit< - FixTypes, - "input" | "output" ->; -export type CreateLangfuseScoreBody = FixTypes< - paths["/api/public/scores"]["post"]["requestBody"]["content"]["application/json"] ->; -export type UpdateLangfuseSpanBody = FixTypes< - paths["/api/public/spans"]["patch"]["requestBody"]["content"]["application/json"] ->; -export type UpdateLangfuseGenerationBody = FixTypes< - paths["/api/public/generations"]["patch"]["requestBody"]["content"]["application/json"] ->; +export type CreateLangfuseTraceBody = FixTypes; -export type LangfuseObject = - | "createTrace" - | "createEvent" - | "createSpan" - | "createGeneration" - | "createScore" - | "updateSpan" - | "updateGeneration"; - -export const LangfusePostApiRoutes: Record = { - createTrace: ["POST", "/api/public/traces"], - createEvent: ["POST", "/api/public/events"], - createSpan: ["POST", "/api/public/spans"], - updateSpan: ["PATCH", "/api/public/spans"], - createGeneration: ["POST", "/api/public/generations"], - updateGeneration: ["PATCH", "/api/public/generations"], - createScore: ["POST", "/api/public/scores"], -}; +export type CreateLangfuseEventBody = FixTypes; + +export type CreateLangfuseSpanBody = FixTypes; +export type UpdateLangfuseSpanBody = FixTypes; + +export type Usage = FixTypes; +export type CreateLangfuseGenerationBody = FixTypes; +export type UpdateLangfuseGenerationBody = FixTypes; + +export type CreateLangfuseScoreBody = FixTypes; + +export type LangfuseObject = SingleIngestionEvent["type"]; // SYNC export type GetLangfuseDatasetParams = FixTypes< diff --git a/langfuse-core/test/langfuse.debug.spec.ts b/langfuse-core/test/langfuse.debug.spec.ts index 1f344760..2858793b 100644 --- a/langfuse-core/test/langfuse.debug.spec.ts +++ b/langfuse-core/test/langfuse.debug.spec.ts @@ -13,7 +13,7 @@ describe("Langfuse Core", () => { [langfuse, mocks] = createTestClient({ publicKey: "pk-lf-111", secretKey: "sk-lf-111", - flushAt: 3, + flushAt: 1, }); }); @@ -29,7 +29,7 @@ describe("Langfuse Core", () => { expect(spy).toHaveBeenCalledTimes(1); expect(spy).toHaveBeenCalledWith( "Langfuse Debug", - "createTrace", + "trace-create", expect.objectContaining({ name: "test-trace2", }) diff --git a/langfuse-core/test/langfuse.end.spec.ts b/langfuse-core/test/langfuse.end.spec.ts index 6165d1e8..29468a06 100644 --- a/langfuse-core/test/langfuse.end.spec.ts +++ b/langfuse-core/test/langfuse.end.spec.ts @@ -15,7 +15,7 @@ describe("Langfuse Core", () => { [langfuse, mocks] = createTestClient({ publicKey: "pk-lf-111", secretKey: "sk-lf-111", - flushAt: 1000, + flushAt: 1, }); }); @@ -37,14 +37,23 @@ describe("Langfuse Core", () => { expect(mocks.fetch).toHaveBeenCalledTimes(3); const [url, options] = mocks.fetch.mock.calls[2]; - expect(url).toMatch("https://cloud.langfuse.com/api/public/spans"); - expect(options.method).toBe("PATCH"); + expect(url).toMatch("https://cloud.langfuse.com/api/public/ingestion"); + expect(options.method).toBe("POST"); const body = parseBody(mocks.fetch.mock.calls[2]); expect(body).toMatchObject({ - traceId: trace.id, - spanId: span.id, - output: { text: "test-output" }, - endTime: "2022-01-01T00:00:00.000Z", + batch: [ + { + id: expect.any(String), + type: "span-update", + timestamp: expect.any(String), + body: { + traceId: trace.id, + id: span.id, + output: { text: "test-output" }, + endTime: "2022-01-01T00:00:00.000Z", + }, + }, + ], }); }); @@ -63,13 +72,22 @@ describe("Langfuse Core", () => { expect(mocks.fetch).toHaveBeenCalledTimes(3); const [url, options] = mocks.fetch.mock.calls[2]; - expect(url).toMatch("https://cloud.langfuse.com/api/public/spans"); - expect(options.method).toBe("PATCH"); + expect(url).toMatch("https://cloud.langfuse.com/api/public/ingestion"); + expect(options.method).toBe("POST"); const body = parseBody(mocks.fetch.mock.calls[2]); expect(body).toEqual({ - traceId: trace.id, - spanId: span.id, - endTime: "2022-01-01T00:00:00.000Z", + batch: [ + { + id: expect.any(String), + type: "span-update", + timestamp: expect.any(String), + body: { + traceId: trace.id, + id: span.id, + endTime: "2022-01-01T00:00:00.000Z", + }, + }, + ], }); }); }); @@ -92,14 +110,23 @@ describe("Langfuse Core", () => { expect(mocks.fetch).toHaveBeenCalledTimes(3); const [url, options] = mocks.fetch.mock.calls[2]; - expect(url).toMatch("https://cloud.langfuse.com/api/public/generations"); - expect(options.method).toBe("PATCH"); + expect(url).toMatch("https://cloud.langfuse.com/api/public/ingestion"); + expect(options.method).toBe("POST"); const body = parseBody(mocks.fetch.mock.calls[2]); expect(body).toMatchObject({ - traceId: trace.id, - generationId: generation.id, - version: "1.0.0", - endTime: "2022-01-01T00:00:00.000Z", + batch: [ + { + id: expect.any(String), + type: "generation-update", + timestamp: expect.any(String), + body: { + traceId: trace.id, + id: generation.id, + version: "1.0.0", + endTime: "2022-01-01T00:00:00.000Z", + }, + }, + ], }); }); @@ -118,13 +145,22 @@ describe("Langfuse Core", () => { expect(mocks.fetch).toHaveBeenCalledTimes(3); const [url, options] = mocks.fetch.mock.calls[2]; - expect(url).toMatch("https://cloud.langfuse.com/api/public/generations"); - expect(options.method).toBe("PATCH"); + expect(url).toMatch("https://cloud.langfuse.com/api/public/ingestion"); + expect(options.method).toBe("POST"); const body = parseBody(mocks.fetch.mock.calls[2]); expect(body).toMatchObject({ - traceId: trace.id, - generationId: generation.id, - endTime: "2022-01-01T00:00:00.000Z", + batch: [ + { + id: expect.any(String), + type: "generation-update", + timestamp: expect.any(String), + body: { + traceId: trace.id, + id: generation.id, + endTime: "2022-01-01T00:00:00.000Z", + }, + }, + ], }); }); }); diff --git a/langfuse-core/test/langfuse.enqueue.spec.ts b/langfuse-core/test/langfuse.enqueue.spec.ts index 25521d22..b46185a4 100644 --- a/langfuse-core/test/langfuse.enqueue.spec.ts +++ b/langfuse-core/test/langfuse.enqueue.spec.ts @@ -28,13 +28,13 @@ describe("Langfuse Core", () => { expect(langfuse.getPersistedProperty(LangfusePersistedProperty.Queue)).toHaveLength(1); const item = langfuse.getPersistedProperty(LangfusePersistedProperty.Queue)?.pop(); + console.log(item); expect(item).toMatchObject({ - apiRoute: "/api/public/traces", - body: { - name: "test-trace", - id: "123456789", - }, + id: expect.any(String), + type: "trace-create", + timestamp: expect.any(String), + body: { id: "123456789", name: "test-trace" }, }); expect(mocks.fetch).not.toHaveBeenCalled(); diff --git a/langfuse-core/test/langfuse.flush.spec.ts b/langfuse-core/test/langfuse.flush.spec.ts index ccabf5e3..93e4f4e3 100644 --- a/langfuse-core/test/langfuse.flush.spec.ts +++ b/langfuse-core/test/langfuse.flush.spec.ts @@ -1,3 +1,4 @@ +import exp from "constants"; import { createTestClient, type LangfuseCoreTestClient, @@ -22,7 +23,7 @@ describe("Langfuse Core", () => { it("doesn't fail when queue is empty", async () => { jest.useRealTimers(); - await expect(langfuse.flushAsync()).resolves.toEqual([]); + await expect(langfuse.flushAsync()).resolves.not.toThrow(); }); it("flush messsages once called", async () => { @@ -31,7 +32,7 @@ describe("Langfuse Core", () => { langfuse.trace({ name: "test-trace-3" }); expect(mocks.fetch).not.toHaveBeenCalled(); await expect(langfuse.flushAsync()).resolves.toHaveLength(3); - expect(mocks.fetch).toHaveBeenCalled(); + expect(mocks.fetch).toHaveBeenCalledTimes(1); }); it("responds with an error after retries", async () => { @@ -52,6 +53,65 @@ describe("Langfuse Core", () => { expect(Date.now() - time).toBeLessThan(500); }); + it("responds with an error after retries 207", async () => { + const trace = langfuse.trace({ name: "test-trace-1" }); + mocks.fetch.mockImplementation(() => { + return Promise.resolve({ + status: 207, + text: async () => "err", + json: async () => ({ successes: [], errors: [{ id: trace.id, message: "Something failed" }] }), + }); + }); + + const time = Date.now(); + jest.useRealTimers(); + await expect(langfuse.flushAsync()).rejects.toHaveProperty("name", "LangfuseFetchHttpError"); + expect(mocks.fetch).toHaveBeenCalledTimes(4); + expect(Date.now() - time).toBeGreaterThan(300); + expect(Date.now() - time).toBeLessThan(500); + }); + + it("responds with an error after retries 207 and then continues after fail", async () => { + let index = 0; + // 5 events in one network request which fail + for (let i = 0; i < 5; i++) { + langfuse.trace({ name: `test-trace-failing-${i}` }); + } + + // 2 more events which succeed + for (let i = 0; i < 2; i++) { + langfuse.trace({ name: `test-trace-succeeding-${i}` }); + } + + mocks.fetch.mockImplementation(() => { + console.log("mocks.fetch called", index); + + if (index < 3) { + index++; + return Promise.resolve({ + status: 207, + text: async () => "err", + json: async () => ({ successes: [], errors: [{ id: "someId", message: "Something failed" }] }), + }); + } else { + index++; + return Promise.resolve({ + status: 200, + text: async () => "ok", + json: async () => ({ successes: [], errors: [] }), + }); + } + }); + + const time = Date.now(); + jest.useRealTimers(); + await langfuse.flushAsync(); + expect(index).toBe(4); + expect(mocks.fetch).toHaveBeenCalledTimes(5); + expect(Date.now() - time).toBeGreaterThan(300); + expect(Date.now() - time).toBeLessThan(500); + }); + it("expect number of calls to match the number of items", async () => { [langfuse, mocks] = createTestClient({ publicKey: "pk-lf-111", @@ -82,7 +142,7 @@ describe("Langfuse Core", () => { expect(mocks.fetch).toHaveBeenCalledTimes(0); jest.advanceTimersByTime(300); - expect(mocks.fetch).toHaveBeenCalledTimes(3); + expect(mocks.fetch).toHaveBeenCalledTimes(1); }); }); }); diff --git a/langfuse-core/test/langfuse.init.spec.ts b/langfuse-core/test/langfuse.init.spec.ts index a54bfe9c..ac91edd1 100644 --- a/langfuse-core/test/langfuse.init.spec.ts +++ b/langfuse-core/test/langfuse.init.spec.ts @@ -49,7 +49,6 @@ describe("Langfuse Core", () => { secretKey: "sk-lf-111", publicKey: "pk-lf-111", baseUrl: "https://cloud.langfuse.com", - flushAt: 1, flushInterval: 10000, }); }); diff --git a/langfuse-core/test/langfuse.listeners.spec.ts b/langfuse-core/test/langfuse.listeners.spec.ts index 558818e3..eabe7af9 100644 --- a/langfuse-core/test/langfuse.listeners.spec.ts +++ b/langfuse-core/test/langfuse.listeners.spec.ts @@ -17,7 +17,7 @@ describe("Langfuse Core", () => { [langfuse, mocks] = createTestClient({ publicKey: "pk-lf-111", secretKey: "sk-lf-111", - flushAt: 10, + flushAt: 1, }); }); @@ -26,8 +26,8 @@ describe("Langfuse Core", () => { const mock = jest.fn(); const mockOther = jest.fn(); const mockOther2 = jest.fn(); - langfuse.on("createTrace", mock); - langfuse.on("createTrace", mockOther); + langfuse.on("trace-create", mock); + langfuse.on("trace-create", mockOther); langfuse.on("somethingElse", mockOther2); langfuse.trace({ name: "test-trace" }); @@ -39,7 +39,7 @@ describe("Langfuse Core", () => { it("should unsubscribe when called", () => { const mock = jest.fn(); - const unsubscribe = langfuse.on("createTrace", mock); + const unsubscribe = langfuse.on("trace-create", mock); langfuse.trace({ name: "test-trace1" }); expect(mock).toHaveBeenCalledTimes(1); diff --git a/langfuse-core/test/langfuse.nesting.spec.ts b/langfuse-core/test/langfuse.nesting.spec.ts index ee280dd0..6100969c 100644 --- a/langfuse-core/test/langfuse.nesting.spec.ts +++ b/langfuse-core/test/langfuse.nesting.spec.ts @@ -16,7 +16,7 @@ describe("Langfuse Core", () => { [langfuse, mocks] = createTestClient({ publicKey: "pk-lf-111", secretKey: "sk-lf-111", - flushAt: 1000, + flushAt: 1, }); }); @@ -45,32 +45,77 @@ describe("Langfuse Core", () => { const checks = [ { - url: "https://cloud.langfuse.com/api/public/traces", - object: { name: "test-trace" }, + url: "https://cloud.langfuse.com/api/public/ingestion", + object: { + batch: [ + { + id: expect.any(String), + timestamp: expect.any(String), + type: "trace-create", + body: { name: "test-trace" }, + }, + ], + }, }, { - url: "https://cloud.langfuse.com/api/public/spans", - object: { name: "test-span-1", traceId: trace.id }, + url: "https://cloud.langfuse.com/api/public/ingestion", + object: { + batch: [ + { + id: expect.any(String), + timestamp: expect.any(String), + type: "span-create", + body: { name: "test-span-1", traceId: trace.id }, + }, + ], + }, }, { - url: "https://cloud.langfuse.com/api/public/spans", - object: { name: "test-span-2", traceId: trace.id }, + url: "https://cloud.langfuse.com/api/public/ingestion", + object: { + batch: [ + { + id: expect.any(String), + timestamp: expect.any(String), + type: "span-create", + body: { name: "test-span-2", traceId: trace.id }, + }, + ], + }, }, { - url: "https://cloud.langfuse.com/api/public/events", + url: "https://cloud.langfuse.com/api/public/ingestion", object: { - name: "test-event-1", - traceId: trace.id, - parentObservationId: span2.id, + batch: [ + { + id: expect.any(String), + timestamp: expect.any(String), + type: "event-create", + body: { + name: "test-event-1", + traceId: trace.id, + parentObservationId: span2.id, + }, + }, + ], }, }, { - url: "https://cloud.langfuse.com/api/public/scores", + url: "https://cloud.langfuse.com/api/public/ingestion", object: { - name: "test-score-1", - traceId: trace.id, - observationId: event.id, - value: 0.5, + batch: [ + { + id: expect.any(String), + timestamp: expect.any(String), + type: "score-create", + body: { + name: "test-score-1", + traceId: trace.id, + observationId: event.id, + value: 0.5, + }, + }, + ], }, }, ]; @@ -102,10 +147,19 @@ describe("Langfuse Core", () => { }); await langfuse.flushAsync(); expect(parseBody(mocks.fetch.mock.calls.pop())).toMatchObject({ - traceId: trace.id, - parentObservationId: client.id, - id: nextClient.id, - name: `test-span-${i}`, + batch: [ + { + id: expect.any(String), + timestamp: expect.any(String), + type: "span-create", + body: { + traceId: trace.id, + parentObservationId: client.id, + id: nextClient.id, + name: `test-span-${i}`, + }, + }, + ], }); } else if (rand < 0.66) { nextClient = client.event({ @@ -113,10 +167,19 @@ describe("Langfuse Core", () => { }); await langfuse.flushAsync(); expect(parseBody(mocks.fetch.mock.calls.pop())).toMatchObject({ - traceId: trace.id, - parentObservationId: client.id, - id: nextClient.id, - name: `test-event-${i}`, + batch: [ + { + id: expect.any(String), + timestamp: expect.any(String), + type: "event-create", + body: { + traceId: trace.id, + parentObservationId: client.id, + id: nextClient.id, + name: `test-event-${i}`, + }, + }, + ], }); } else { nextClient = client.generation({ @@ -124,10 +187,19 @@ describe("Langfuse Core", () => { }); await langfuse.flushAsync(); expect(parseBody(mocks.fetch.mock.calls.pop())).toMatchObject({ - traceId: trace.id, - parentObservationId: client.id, - id: nextClient.id, - name: `test-generation-${i}`, + batch: [ + { + id: expect.any(String), + timestamp: expect.any(String), + type: "generation-create", + body: { + traceId: trace.id, + parentObservationId: client.id, + id: nextClient.id, + name: `test-generation-${i}`, + }, + }, + ], }); } client = nextClient; diff --git a/langfuse-core/test/langfuse.observations.spec.ts b/langfuse-core/test/langfuse.observations.spec.ts index ee4082fb..201ccd1c 100644 --- a/langfuse-core/test/langfuse.observations.spec.ts +++ b/langfuse-core/test/langfuse.observations.spec.ts @@ -20,6 +20,84 @@ describe("Langfuse Core", () => { }); describe("observations", () => { + [ + { + usage: { + input: 1, + output: 2, + total: 3, + unit: "CHARACTERS", + }, + expectedOutput: { + input: 1, + output: 2, + total: 3, + unit: "CHARACTERS", + }, + }, + { + usage: { + output: 2, + unit: "CHARACTERS", + }, + expectedOutput: { + output: 2, + unit: "CHARACTERS", + }, + }, + { + usage: { + promptTokens: 1, + completionTokens: 2, + totalTokens: 3, + }, + expectedOutput: { + promptTokens: 1, + completionTokens: 2, + totalTokens: 3, + }, + }, + { + usage: { + promptTokens: 1, + }, + expectedOutput: { + promptTokens: 1, + }, + }, + ].forEach((usageConfig) => { + it(`should create observations with different usage types correctly ${JSON.stringify(usageConfig)}`, async () => { + jest.setSystemTime(new Date("2022-01-01")); + + const trace = langfuse.trace({ + name: "test-trace", + }); + + // explicit start/end + trace.generation({ + name: "test-observation-1", + startTime: new Date("2023-01-02"), + endTime: new Date("2023-01-03"), + usage: usageConfig.usage, + }); + expect(mocks.fetch).toHaveBeenCalledTimes(2); + expect(parseBody(mocks.fetch.mock.calls[1])).toMatchObject({ + batch: [ + { + id: expect.any(String), + timestamp: expect.any(String), + type: "generation-create", + body: { + name: "test-observation-1", + startTime: new Date("2023-01-02").toISOString(), + endTime: new Date("2023-01-03").toISOString(), + usage: usageConfig.expectedOutput, + }, + }, + ], + }); + }); + }); it("should create each observation and handle dates correctly", async () => { jest.setSystemTime(new Date("2022-01-01")); @@ -35,9 +113,18 @@ describe("Langfuse Core", () => { }); expect(mocks.fetch).toHaveBeenCalledTimes(2); expect(parseBody(mocks.fetch.mock.calls[1])).toMatchObject({ - name: "test-observation-1", - startTime: new Date("2023-01-02").toISOString(), - endTime: new Date("2023-01-03").toISOString(), + batch: [ + { + id: expect.any(String), + timestamp: expect.any(String), + type: "generation-create", + body: { + name: "test-observation-1", + startTime: new Date("2023-01-02").toISOString(), + endTime: new Date("2023-01-03").toISOString(), + }, + }, + ], }); // implicit start @@ -46,8 +133,17 @@ describe("Langfuse Core", () => { }); expect(mocks.fetch).toHaveBeenCalledTimes(3); expect(parseBody(mocks.fetch.mock.calls[2])).toMatchObject({ - name: "test-observation-2", - startTime: new Date().toISOString(), + batch: [ + { + id: expect.any(String), + timestamp: expect.any(String), + type: "span-create", + body: { + name: "test-observation-2", + startTime: new Date().toISOString(), + }, + }, + ], }); // implicit start @@ -56,8 +152,17 @@ describe("Langfuse Core", () => { }); expect(mocks.fetch).toHaveBeenCalledTimes(4); expect(parseBody(mocks.fetch.mock.calls[3])).toMatchObject({ - name: "test-observation-3", - startTime: new Date().toISOString(), + batch: [ + { + id: expect.any(String), + timestamp: expect.any(String), + type: "event-create", + body: { + name: "test-observation-3", + startTime: new Date().toISOString(), + }, + }, + ], }); }); @@ -69,7 +174,16 @@ describe("Langfuse Core", () => { const body = parseBody(mocks.fetch.mock.calls[0]); expect(body).toEqual({ - id: "123456789", + batch: [ + { + id: expect.any(String), + timestamp: expect.any(String), + type: "trace-create", + body: { + id: "123456789", + }, + }, + ], }); }); @@ -91,15 +205,24 @@ describe("Langfuse Core", () => { expect(mocks.fetch).toHaveBeenCalledTimes(1); const body = parseBody(mocks.fetch.mock.calls[0]); expect(body).toMatchObject({ - name: "test-trace", - id: "123456789", - metadata: { - test: "test", - mira: { - hello: "world", + batch: [ + { + id: expect.any(String), + timestamp: expect.any(String), + type: "trace-create", + body: { + name: "test-trace", + id: "123456789", + metadata: { + test: "test", + mira: { + hello: "world", + }, + }, + version: "1.0.0", + }, }, - }, - version: "1.0.0", + ], }); }); }); diff --git a/langfuse-core/test/langfuse.traces.spec.ts b/langfuse-core/test/langfuse.traces.spec.ts index 86cd5aee..87c72e64 100644 --- a/langfuse-core/test/langfuse.traces.spec.ts +++ b/langfuse-core/test/langfuse.traces.spec.ts @@ -37,19 +37,30 @@ describe("Langfuse Core", () => { expect(mocks.fetch).toHaveBeenCalledTimes(1); const [url, options] = mocks.fetch.mock.calls[0]; - expect(url).toMatch(/^https:\/\/cloud\.langfuse\.com\/api\/public\/traces$/); + expect(url).toMatch(/^https:\/\/cloud\.langfuse\.com\/api\/public\/ingestion$/); expect(options.method).toBe("POST"); const body = parseBody(mocks.fetch.mock.calls[0]); + console.log(body); expect(body).toMatchObject({ - name: "test-trace", - sessionId: "123456789", - input: { - hello: "world", - }, - output: { - hello: "world", - }, + batch: [ + { + id: expect.any(String), + timestamp: expect.any(String), + type: "trace-create", + body: { + id: expect.any(String), + name: "test-trace", + sessionId: "123456789", + input: { + hello: "world", + }, + output: { + hello: "world", + }, + }, + }, + ], }); }); @@ -61,7 +72,16 @@ describe("Langfuse Core", () => { const body = parseBody(mocks.fetch.mock.calls[0]); expect(body).toMatchObject({ - id: expect.any(String), + batch: [ + { + id: expect.any(String), + timestamp: expect.any(String), + type: "trace-create", + body: { + id: expect.any(String), + }, + }, + ], }); }); @@ -73,7 +93,16 @@ describe("Langfuse Core", () => { const body = parseBody(mocks.fetch.mock.calls[0]); expect(body).toEqual({ - id: "123456789", + batch: [ + { + id: expect.any(String), + timestamp: expect.any(String), + type: "trace-create", + body: { + id: "123456789", + }, + }, + ], }); }); @@ -95,15 +124,24 @@ describe("Langfuse Core", () => { expect(mocks.fetch).toHaveBeenCalledTimes(1); const body = parseBody(mocks.fetch.mock.calls[0]); expect(body).toMatchObject({ - name: "test-trace", - id: "123456789", - metadata: { - test: "test", - mira: { - hello: "world", + batch: [ + { + id: expect.any(String), + timestamp: expect.any(String), + type: "trace-create", + body: { + name: "test-trace", + id: "123456789", + metadata: { + test: "test", + mira: { + hello: "world", + }, + }, + version: "1.0.0", + }, }, - }, - version: "1.0.0", + ], }); }); @@ -119,8 +157,17 @@ describe("Langfuse Core", () => { expect(mocks.fetch).toHaveBeenCalledTimes(2); const body = parseBody(mocks.fetch.mock.calls[1]); expect(body).toMatchObject({ - id: trace.id, - userId: "123456789", + batch: [ + { + id: expect.any(String), + timestamp: expect.any(String), + type: "trace-create", + body: { + id: trace.id, + userId: "123456789", + }, + }, + ], }); }); }); @@ -141,7 +188,16 @@ describe("Langfuse Core", () => { const body = parseBody(mocks.fetch.mock.calls[0]); expect(body).toMatchObject({ - release: "v1.0.0-alpha.1", + batch: [ + { + id: expect.any(String), + timestamp: expect.any(String), + type: "trace-create", + body: { + release: "v1.0.0-alpha.1", + }, + }, + ], }); }); @@ -161,7 +217,16 @@ describe("Langfuse Core", () => { const body = parseBody(mocks.fetch.mock.calls[0]); expect(body).toMatchObject({ - release: "v1.0.0-alpha.200", + batch: [ + { + id: expect.any(String), + timestamp: expect.any(String), + type: "trace-create", + body: { + release: "v1.0.0-alpha.200", + }, + }, + ], }); }); @@ -182,7 +247,16 @@ describe("Langfuse Core", () => { const body = parseBody(mocks.fetch.mock.calls[0]); expect(body).toMatchObject({ - release: "v1.0.0-alpha.10", + batch: [ + { + id: expect.any(String), + timestamp: expect.any(String), + type: "trace-create", + body: { + release: "v1.0.0-alpha.10", + }, + }, + ], }); }); @@ -201,7 +275,16 @@ describe("Langfuse Core", () => { const body = parseBody(mocks.fetch.mock.calls[0]); expect(body).toMatchObject({ - release: "v2", + batch: [ + { + id: expect.any(String), + timestamp: expect.any(String), + type: "trace-create", + body: { + release: "v2", + }, + }, + ], }); }); @@ -214,7 +297,16 @@ describe("Langfuse Core", () => { const body = parseBody(mocks.fetch.mock.calls[0]); expect(body).toMatchObject({ - release: "v5", + batch: [ + { + id: expect.any(String), + timestamp: expect.any(String), + type: "trace-create", + body: { + release: "v5", + }, + }, + ], }); }); @@ -223,7 +315,7 @@ describe("Langfuse Core", () => { name: "test-trace", }); const body = parseBody(mocks.fetch.mock.calls[0]); - expect(body).not.toHaveProperty("release"); + expect(body["batch"][0]).not.toHaveProperty("release"); }); it("should allow overridding the release in constructor", async () => { @@ -242,7 +334,16 @@ describe("Langfuse Core", () => { const body = parseBody(mocks.fetch.mock.calls[0]); expect(body).toMatchObject({ - release: "v4", + batch: [ + { + id: expect.any(String), + timestamp: expect.any(String), + type: "trace-create", + body: { + release: "v4", + }, + }, + ], }); }); @@ -263,7 +364,16 @@ describe("Langfuse Core", () => { const body = parseBody(mocks.fetch.mock.calls[0]); expect(body).toMatchObject({ - release: "v3", + batch: [ + { + id: expect.any(String), + timestamp: expect.any(String), + type: "trace-create", + body: { + release: "v3", + }, + }, + ], }); }); }); diff --git a/langfuse-core/test/langfuse.update.spec.ts b/langfuse-core/test/langfuse.update.spec.ts index f67d8aec..54d913d5 100644 --- a/langfuse-core/test/langfuse.update.spec.ts +++ b/langfuse-core/test/langfuse.update.spec.ts @@ -15,7 +15,7 @@ describe("Langfuse Core", () => { [langfuse, mocks] = createTestClient({ publicKey: "pk-lf-111", secretKey: "sk-lf-111", - flushAt: 1000, + flushAt: 1, }); }); @@ -38,14 +38,23 @@ describe("Langfuse Core", () => { expect(mocks.fetch).toHaveBeenCalledTimes(3); const [url, options] = mocks.fetch.mock.calls[2]; - expect(url).toMatch("https://cloud.langfuse.com/api/public/spans"); - expect(options.method).toBe("PATCH"); + expect(url).toMatch("https://cloud.langfuse.com/api/public/ingestion"); + expect(options.method).toBe("POST"); const body = parseBody(mocks.fetch.mock.calls[2]); expect(body).toMatchObject({ - traceId: trace.id, - spanId: span.id, - version: "1.0.0", - name: "test-span-2", + batch: [ + { + id: expect.any(String), + timestamp: expect.any(String), + type: "span-update", + body: { + traceId: trace.id, + id: span.id, + version: "1.0.0", + name: "test-span-2", + }, + }, + ], }); }); @@ -66,13 +75,22 @@ describe("Langfuse Core", () => { expect(mocks.fetch).toHaveBeenCalledTimes(3); const [url, options] = mocks.fetch.mock.calls[2]; - expect(url).toMatch("https://cloud.langfuse.com/api/public/generations"); - expect(options.method).toBe("PATCH"); + expect(url).toMatch("https://cloud.langfuse.com/api/public/ingestion"); + expect(options.method).toBe("POST"); const body = parseBody(mocks.fetch.mock.calls[2]); expect(body).toMatchObject({ - traceId: trace.id, - generationId: generation.id, - version: "1.0.0", + batch: [ + { + id: expect.any(String), + timestamp: expect.any(String), + type: "generation-update", + body: { + traceId: trace.id, + id: generation.id, + version: "1.0.0", + }, + }, + ], }); }); }); diff --git a/langfuse-core/test/test-utils/LangfuseCoreTestClient.ts b/langfuse-core/test/test-utils/LangfuseCoreTestClient.ts index e3b93ab6..f6ddbbdc 100644 --- a/langfuse-core/test/test-utils/LangfuseCoreTestClient.ts +++ b/langfuse-core/test/test-utils/LangfuseCoreTestClient.ts @@ -5,7 +5,7 @@ import { type LangfuseFetchResponse, } from "../../src"; -const version = "2.0.0-alpha"; +const version = "2.0.0-alpha.2"; export interface LangfuseCoreTestClientMocks { fetch: jest.Mock, [string, LangfuseFetchOptions]>; diff --git a/langfuse-langchain/src/callback.ts b/langfuse-langchain/src/callback.ts index 7dbedd94..e6525445 100644 --- a/langfuse-langchain/src/callback.ts +++ b/langfuse-langchain/src/callback.ts @@ -87,7 +87,7 @@ export class CallbackHandler extends BaseCallbackHandler { try { console.log("Retriever error:", err, runId); this.langfuse._updateSpan({ - spanId: runId, + id: runId, traceId: this.traceId, level: "ERROR", statusMessage: err.toString(), @@ -118,7 +118,6 @@ export class CallbackHandler extends BaseCallbackHandler { name: chain.id.at(-1)?.toString(), metadata: this.joinTagsAndMetaData(tags, metadata), input: inputs, - startTime: new Date(), version: this.version, }); } catch (e) { @@ -129,6 +128,7 @@ export class CallbackHandler extends BaseCallbackHandler { async handleAgentAction(action: AgentAction, runId?: string, parentRunId?: string): Promise { try { console.log("Agent action:", runId); + this.langfuse.span({ id: runId, parentObservationId: parentRunId, @@ -146,7 +146,7 @@ export class CallbackHandler extends BaseCallbackHandler { try { console.log("Agent finish:", runId); this.langfuse._updateSpan({ - spanId: runId, + id: runId, traceId: this.traceId, endTime: new Date(), output: action, @@ -162,7 +162,7 @@ export class CallbackHandler extends BaseCallbackHandler { try { console.log("Chain error:", err, runId); this.langfuse._updateSpan({ - spanId: runId, + id: runId, traceId: this.traceId, level: "ERROR", statusMessage: err.toString(), @@ -248,10 +248,9 @@ export class CallbackHandler extends BaseCallbackHandler { id: runId, traceId: this.traceId, name: llm.id.at(-1)?.toString(), - startTime: new Date(), metadata: this.joinTagsAndMetaData(tags, metadata), parentObservationId: parentRunId ?? this.rootObservationId, - prompt: messages, + input: messages, model: extractedModelName, modelParameters: modelParameters, version: this.version, @@ -279,7 +278,7 @@ export class CallbackHandler extends BaseCallbackHandler { try { console.log("Chain end:", runId, parentRunId); this.langfuse._updateSpan({ - spanId: runId, + id: runId, traceId: this.traceId, output: outputs, endTime: new Date(), @@ -326,7 +325,6 @@ export class CallbackHandler extends BaseCallbackHandler { name: tool.id.at(-1)?.toString(), input: input, metadata: this.joinTagsAndMetaData(tags, metadata), - startTime: new Date(), version: this.version, }); } catch (e) { @@ -344,6 +342,7 @@ export class CallbackHandler extends BaseCallbackHandler { ): Promise { try { console.log("Retriever start:", runId); + this.langfuse.span({ id: runId, parentObservationId: parentRunId, @@ -351,7 +350,6 @@ export class CallbackHandler extends BaseCallbackHandler { name: retriever.id.at(-1)?.toString(), input: query, metadata: this.joinTagsAndMetaData(tags, metadata), - startTime: new Date(), version: this.version, }); } catch (e) { @@ -367,7 +365,7 @@ export class CallbackHandler extends BaseCallbackHandler { try { console.log("Retriever end:", runId); this.langfuse._updateSpan({ - spanId: runId, + id: runId, traceId: this.traceId, output: documents, endTime: new Date(), @@ -383,7 +381,7 @@ export class CallbackHandler extends BaseCallbackHandler { try { console.log("Tool end:", runId); this.langfuse._updateSpan({ - spanId: runId, + id: runId, traceId: this.traceId, output: output, endTime: new Date(), @@ -399,7 +397,7 @@ export class CallbackHandler extends BaseCallbackHandler { try { console.log("Tool error:", err, runId); this.langfuse._updateSpan({ - spanId: runId, + id: runId, traceId: this.traceId, level: "ERROR", statusMessage: err.toString(), @@ -421,9 +419,9 @@ export class CallbackHandler extends BaseCallbackHandler { const llmUsage = output.llmOutput?.["tokenUsage"]; this.langfuse._updateGeneration({ - generationId: runId, + id: runId, traceId: this.traceId, - completion: + output: !lastResponse.text && "message" in lastResponse && lastResponse["message"] instanceof AIMessage && @@ -444,7 +442,7 @@ export class CallbackHandler extends BaseCallbackHandler { try { console.log("LLM error:", err, runId); this.langfuse._updateGeneration({ - generationId: runId, + id: runId, traceId: this.traceId, level: "ERROR", statusMessage: err.toString(), diff --git a/langfuse-node/test/langfuse-node.spec.ts b/langfuse-node/test/langfuse-node.spec.ts index 438757e9..4eb24337 100644 --- a/langfuse-node/test/langfuse-node.spec.ts +++ b/langfuse-node/test/langfuse-node.spec.ts @@ -49,10 +49,10 @@ describe("Langfuse Node.js", () => { expect(mockedFetch).toHaveBeenCalledTimes(1); expect(mockedFetch).toHaveBeenCalledWith( - "http://example.com/api/public/traces", + "http://example.com/api/public/ingestion", expect.objectContaining({ method: "POST", - body: JSON.stringify({ id: "test-id", name: "trace-name" }), + body: expect.stringContaining(JSON.stringify({ id: "test-id", name: "trace-name" })), headers: expect.objectContaining({ "Content-Type": "application/json", Authorization: "Basic " + Buffer.from("pk:sk").toString("base64"), @@ -112,7 +112,7 @@ describe("Langfuse Node.js", () => { // 10 capture calls to debug log // 6 flush calls to debug log expect(logSpy).toHaveBeenCalledTimes(16); - expect(10).toEqual(logSpy.mock.calls.filter((call) => call[1].includes("createTrace")).length); + expect(10).toEqual(logSpy.mock.calls.filter((call) => call[1].includes("trace-create")).length); expect(6).toEqual(logSpy.mock.calls.filter((call) => call[1].includes("flush")).length); logSpy.mockClear(); diff --git a/langfuse/test/langfuse-web.spec.ts b/langfuse/test/langfuse-web.spec.ts index 4d06c3b4..b9b4b1c1 100644 --- a/langfuse/test/langfuse-web.spec.ts +++ b/langfuse/test/langfuse-web.spec.ts @@ -60,16 +60,18 @@ describe("langfuseWeb", () => { expect(fetch).toHaveBeenCalledTimes(1); expect(fetch).toHaveBeenCalledWith( - "https://cloud.langfuse.com/api/public/scores", + "https://cloud.langfuse.com/api/public/ingestion", expect.objectContaining({ - body: JSON.stringify({ - id, - name: "test", - traceId: "test-trace-1", - value: 200, - comment: "test comment", - observationId: "test-observation-id", - }), + body: expect.stringContaining( + JSON.stringify({ + id, + name: "test", + traceId: "test-trace-1", + value: 200, + comment: "test comment", + observationId: "test-observation-id", + }) + ), method: "POST", headers: expect.objectContaining({ "Content-Type": "application/json", diff --git a/langfuse/test/langfuse.spec.ts b/langfuse/test/langfuse.spec.ts index d13668c5..2a98a0c1 100644 --- a/langfuse/test/langfuse.spec.ts +++ b/langfuse/test/langfuse.spec.ts @@ -50,11 +50,13 @@ describe("langfuseWeb", () => { langfuse.trace({ name: "test-trace-1", id: "test-id" }); - expect(fetch).toHaveBeenCalledWith("https://cloud.langfuse.com/api/public/traces", { - body: JSON.stringify({ - id: "test-id", - name: "test-trace-1", - }), + expect(fetch).toHaveBeenCalledWith("https://cloud.langfuse.com/api/public/ingestion", { + body: expect.stringContaining( + JSON.stringify({ + id: "test-id", + name: "test-trace-1", + }) + ), method: "POST", headers: expect.objectContaining({ "Content-Type": "application/json", diff --git a/yarn.lock b/yarn.lock index f03f5365..ce41d46e 100644 --- a/yarn.lock +++ b/yarn.lock @@ -2637,6 +2637,15 @@ axios@^1.0.0, axios@^1.4.0: form-data "^4.0.0" proxy-from-env "^1.1.0" +axios@^1.6.0: + version "1.6.2" + resolved "https://registry.yarnpkg.com/axios/-/axios-1.6.2.tgz#de67d42c755b571d3e698df1b6504cde9b0ee9f2" + integrity sha512-7i24Ri4pmDRfJTR7LDBhsOTtcm+9kjX5WiY1X3wIisx6G9So3pfMkEiU7emUBe46oceVImccTEM3k6C5dbVW8A== + dependencies: + follow-redirects "^1.15.0" + form-data "^4.0.0" + proxy-from-env "^1.1.0" + babel-jest@^29.7.0: version "29.7.0" resolved "https://registry.npmjs.org/babel-jest/-/babel-jest-29.7.0.tgz"