Skip to content

Commit

Permalink
Always process queue items that were skipped due to batch size limit
Browse files Browse the repository at this point in the history
  • Loading branch information
adam-on-tailwind committed Oct 4, 2024
1 parent 8ca08a9 commit 0d140e1
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 5 deletions.
12 changes: 7 additions & 5 deletions langfuse-core/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -688,14 +688,17 @@ abstract class LangfuseCoreStateless {
if (!queue.length) {
return callback?.();
}

const items = queue.splice(0, this.flushAt);
this.setPersistedProperty<LangfuseQueueItem[]>(LangfusePersistedProperty.Queue, queue);

const MAX_MSG_SIZE = 1_000_000;
const BATCH_SIZE_LIMIT = 2_500_000;

this.processQueueItems(items, MAX_MSG_SIZE, BATCH_SIZE_LIMIT);
const { processedItems, remainingItems } = this.processQueueItems(items, MAX_MSG_SIZE, BATCH_SIZE_LIMIT);

// Add remaining items back to the start of the queue
queue.unshift(...remainingItems);

const promiseUUID = generateUUID();

Expand All @@ -708,9 +711,9 @@ abstract class LangfuseCoreStateless {
};

const payload = JSON.stringify({
batch: items,
batch: processedItems,
metadata: {
batch_size: items.length,
batch_size: processedItems.length,
sdk_integration: this.sdkIntegration,
sdk_version: this.getLibraryVersion(),
sdk_variant: this.getLibraryId(),
Expand Down Expand Up @@ -755,7 +758,6 @@ abstract class LangfuseCoreStateless {
console.warn(`Item exceeds size limit (size: ${itemSize}), dropping item.`);
continue;
}

// if adding the next item would exceed the batch size limit, stop processing
if (totalSize + itemSize >= BATCH_SIZE_LIMIT) {
console.debug(`hit batch size limit (size: ${totalSize + itemSize})`);
Expand Down
32 changes: 32 additions & 0 deletions langfuse-core/test/langfuse.flush.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -231,5 +231,37 @@ describe("Langfuse Core", () => {
jest.advanceTimersByTime(300);
expect(mocks.fetch).toHaveBeenCalledTimes(1);
});

describe("when queue is completely full", () => {
const MAX_MSG_SIZE = 1_000_000;
const BATCH_SIZE_LIMIT = 2_500_000;
// Message is right under the message size limit
const MSG_SIZE = MAX_MSG_SIZE - 1000;
const BIG_STRING = "a".repeat(MSG_SIZE);

it("should flush remaining items on subsequent flush", () => {
const n = Math.floor(BATCH_SIZE_LIMIT / MSG_SIZE) + 1;

[langfuse, mocks] = createTestClient({
publicKey: "pk-lf-111",
secretKey: "sk-lf-111",
flushAt: n,
flushInterval: 200,
});

// Adds enough messages to exceed batch size limit
for (let i = 0; i < n; i++) {
langfuse.trace({ name: `test-trace-${i}`, input: { content: BIG_STRING } });
}

// First call flushes the messages that fit under the batch size limit
expect(mocks.fetch).toHaveBeenCalledTimes(1);

jest.advanceTimersByTime(300);

// Second call flushes the remaining messages
expect(mocks.fetch).toHaveBeenCalledTimes(2);
});
});
});
});

0 comments on commit 0d140e1

Please sign in to comment.