From 2fd0e77c8c2534d4da9de8f935485359c7a5a667 Mon Sep 17 00:00:00 2001 From: Adam Savitzky Date: Fri, 4 Oct 2024 10:29:31 -0700 Subject: [PATCH] Always process queue items that were skipped due to batch size limit --- langfuse-core/src/index.ts | 9 ++++--- langfuse-core/test/langfuse.flush.spec.ts | 32 +++++++++++++++++++++++ 2 files changed, 38 insertions(+), 3 deletions(-) diff --git a/langfuse-core/src/index.ts b/langfuse-core/src/index.ts index 693f707f..0d19a418 100644 --- a/langfuse-core/src/index.ts +++ b/langfuse-core/src/index.ts @@ -695,7 +695,10 @@ abstract class LangfuseCoreStateless { const MAX_MSG_SIZE = 1_000_000; const BATCH_SIZE_LIMIT = 2_500_000; - this.processQueueItems(items, MAX_MSG_SIZE, BATCH_SIZE_LIMIT); + const { processedItems, remainingItems } = this.processQueueItems(items, MAX_MSG_SIZE, BATCH_SIZE_LIMIT); + + // Add remaining items back to the start of the queue + queue.unshift(...remainingItems); const promiseUUID = generateUUID(); @@ -708,9 +711,9 @@ abstract class LangfuseCoreStateless { }; const payload = JSON.stringify({ - batch: items, + batch: processedItems, metadata: { - batch_size: items.length, + batch_size: processedItems.length, sdk_integration: this.sdkIntegration, sdk_version: this.getLibraryVersion(), sdk_variant: this.getLibraryId(), diff --git a/langfuse-core/test/langfuse.flush.spec.ts b/langfuse-core/test/langfuse.flush.spec.ts index d0e2f0a6..928d1c68 100644 --- a/langfuse-core/test/langfuse.flush.spec.ts +++ b/langfuse-core/test/langfuse.flush.spec.ts @@ -231,5 +231,37 @@ describe("Langfuse Core", () => { jest.advanceTimersByTime(300); expect(mocks.fetch).toHaveBeenCalledTimes(1); }); + + describe("when queue is completely full", () => { + const MAX_MSG_SIZE = 1_000_000; + const BATCH_SIZE_LIMIT = 2_500_000; + // Message is right under the message size limit + const MSG_SIZE = MAX_MSG_SIZE - 1000; + const BIG_STRING = "a".repeat(MSG_SIZE); + + it("should flush remaining items on subsequent flush", () => { + const n = Math.floor(BATCH_SIZE_LIMIT / MSG_SIZE) + 1; + + [langfuse, mocks] = createTestClient({ + publicKey: "pk-lf-111", + secretKey: "sk-lf-111", + flushAt: n, + flushInterval: 200, + }); + + // Adds enough messages to exceed batch size limit + for (let i = 0; i < n; i++) { + langfuse.trace({ name: `test-trace-${i}`, input: { content: BIG_STRING } }); + } + + // First call flushes the messages that fit under the batch size limit + expect(mocks.fetch).toHaveBeenCalledTimes(1); + + jest.advanceTimersByTime(300); + + // Second call flushes the remaining messages + expect(mocks.fetch).toHaveBeenCalledTimes(2); + }); + }); }); });