Commit e31ced6

fix(weave_ts): Fix issue where batch processing could crash (#3298)
andrewtruong authored Dec 20, 2024
1 parent fcb51c3 commit e31ced6
Showing 2 changed files with 69 additions and 9 deletions.
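Before this change, the drain loop in processBatch (the removed lines in the weaveClient.ts diff below) only peeked at the head of the queue and dequeued items that fit under the size cap; a single oversized item was therefore never removed, blocked everything queued behind it, and let the loop fall through to post an empty batch on every interval. A minimal sketch of that hazard, reconstructed from the removed lines, with the surrounding class machinery elided:

// Pre-fix drain loop, reconstructed from the removed lines below.
// An item larger than the cap is never dequeued, so the queue never drains.
const maxBatchSizeChars = 5 * 1024 * 1024;

type QueueItem = {mode: 'start' | 'end'; data: any};
const callQueue: QueueItem[] = [
  {mode: 'start', data: {id: '1', payload: 'x'.repeat(6 * 1024 * 1024)}}, // oversized
  {mode: 'start', data: {id: '2', payload: 'small'}}, // stuck behind it forever
];

const batchToProcess: QueueItem[] = [];
let currentBatchSize = 0;

while (callQueue.length > 0 && currentBatchSize < maxBatchSizeChars) {
  const item = callQueue[0]; // peek only; never removed if it doesn't fit
  const itemSize = JSON.stringify(item).length;
  if (currentBatchSize + itemSize <= maxBatchSizeChars) {
    batchToProcess.push(callQueue.shift()!);
    currentBatchSize += itemSize;
  } else {
    break; // oversized head item stays at callQueue[0] on every run
  }
}

console.log(batchToProcess.length); // 0 -- nothing behind the big item is ever sent

The patch below dequeues each item up front, logs oversized items to weaveErrors.log, returns early instead of sending an empty batch, and retries failed batches under an error cap.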
22 changes: 21 additions & 1 deletion sdks/node/src/__tests__/weaveClient.test.ts
@@ -108,7 +108,7 @@ describe('WeaveClient', () => {
   beforeEach(() => {
     mockTraceServerApi = {
       call: {
-        callStartBatchCallUpsertBatchPost: jest.fn(),
+        callStartBatchCallUpsertBatchPost: jest.fn().mockResolvedValue({}),
       },
     } as any;
     mockWandbServerApi = {} as any;
@@ -121,6 +121,26 @@
     (client as any).BATCH_INTERVAL = 10;
   });

+  it('should handle oversized batch items', async () => {
+    const bigPayloadSize = 11 * 1024 * 1024;
+    const smallData = {mode: 'start', data: {id: '2', payload: 'small'}};
+    const bigData = {
+      mode: 'start',
+      data: {id: '1', payload: 'x'.repeat(bigPayloadSize)},
+    };
+    (client as any).callQueue.push(smallData, bigData);
+
+    await (client as any).processBatch();
+
+    expect(
+      mockTraceServerApi.call.callStartBatchCallUpsertBatchPost
+    ).toHaveBeenCalledWith({
+      batch: [{mode: 'start', req: smallData.data}],
+    });
+
+    expect((client as any).callQueue).toContain(bigData);
+  });
+
   it('should batch multiple calls together', async () => {
     // Add test calls to queue
     (client as any).callQueue.push(
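The weaveClient.ts diff below also adds a bounded-retry error path: a failed batch is logged to weaveErrors.log, its items are pushed back onto the front of the queue, an error counter increments, and the process exits once MAX_ERRORS is exceeded. A condensed sketch of that pattern (flushOnce and sendBatch are illustrative stand-ins, not names from the diff; sendBatch plays the role of the generated traceServerApi call):

const MAX_ERRORS = 10;
let errorCount = 0;

type QueueItem = {mode: 'start' | 'end'; data: any};
const callQueue: QueueItem[] = [];

async function flushOnce(
  batchToProcess: QueueItem[],
  sendBatch: (req: {batch: Array<{mode: string; req: any}>}) => Promise<unknown>
): Promise<void> {
  try {
    await sendBatch({
      batch: batchToProcess.map(item => ({mode: item.mode, req: item.data})),
    });
  } catch (error) {
    console.error('Error processing batch:', error);
    errorCount++;
    // Failed items go back to the front so they are retried in order.
    callQueue.unshift(...batchToProcess);
    if (errorCount > MAX_ERRORS) {
      console.error(`Exceeded max errors: ${MAX_ERRORS}; exiting`);
      process.exit(1);
    }
  }
}

The put-back means a persistently failing batch is retried on every flush interval until the cap trips, at which point the client exits the process rather than silently dropping trace data forever.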
56 changes: 48 additions & 8 deletions sdks/node/src/weaveClient.ts
@@ -1,4 +1,5 @@
 import {AsyncLocalStorage} from 'async_hooks';
+import * as fs from 'fs';
 import {uuidv7} from 'uuidv7';

 import {Dataset} from './dataset';
@@ -32,6 +33,8 @@ import {packageVersion} from './utils/userAgent';
 import {WandbServerApi} from './wandb/wandbServerApi';
 import {ObjectRef, WeaveObject, getClassChain} from './weaveObject';

+const WEAVE_ERRORS_LOG_FNAME = 'weaveErrors.log';
+
 export type CallStackEntry = {
   callId: string;
   traceId: string;
@@ -71,13 +74,18 @@ class CallStack {
 type CallStartParams = StartedCallSchemaForInsert;
 type CallEndParams = EndedCallSchemaForInsert;

+// We count characters item by item, and try to limit batches to about this size.
+const MAX_BATCH_SIZE_CHARS = 10 * 1024 * 1024;
+
 export class WeaveClient {
   private stackContext = new AsyncLocalStorage<CallStack>();
   private callQueue: Array<{mode: 'start' | 'end'; data: any}> = [];
   private batchProcessTimeout: NodeJS.Timeout | null = null;
   private isBatchProcessing: boolean = false;
   private batchProcessingPromises: Set<Promise<void>> = new Set();
   private readonly BATCH_INTERVAL: number = 200;
+  private errorCount = 0;
+  private readonly MAX_ERRORS = 10;

   constructor(
     public traceServerApi: TraceServerApi<any>,
@@ -114,25 +122,43 @@ export class WeaveClient {

     this.isBatchProcessing = true;

-    // We count characters item by item, and try to limit batches to about
-    // this size.
-    const maxBatchSizeChars = 5 * 1024 * 1024;
-
     let batchToProcess = [];
     let currentBatchSize = 0;

-    while (this.callQueue.length > 0 && currentBatchSize < maxBatchSizeChars) {
-      const item = this.callQueue[0];
+    while (
+      this.callQueue.length > 0 &&
+      currentBatchSize < MAX_BATCH_SIZE_CHARS
+    ) {
+      const item = this.callQueue.shift();
+      if (item === undefined) {
+        throw new Error('Call queue is empty');
+      }
+
       const itemSize = JSON.stringify(item).length;
+      if (itemSize > MAX_BATCH_SIZE_CHARS) {
+        fs.appendFileSync(
+          WEAVE_ERRORS_LOG_FNAME,
+          `Item size ${itemSize} exceeds max batch size ${MAX_BATCH_SIZE_CHARS}. Item: ${JSON.stringify(item)}\n`
+        );
+      }
+
-      if (currentBatchSize + itemSize <= maxBatchSizeChars) {
-        batchToProcess.push(this.callQueue.shift()!);
+      if (currentBatchSize + itemSize <= MAX_BATCH_SIZE_CHARS) {
+        batchToProcess.push(item);
         currentBatchSize += itemSize;
       } else {
+        // doesn't fit, put it back
+        this.callQueue.unshift(item);
         break;
       }
     }

+    if (batchToProcess.length === 0) {
+      this.batchProcessTimeout = null;
+      return;
+    }
+
+    this.isBatchProcessing = true;
+
     const batchReq = {
       batch: batchToProcess.map(item => ({
         mode: item.mode,
@@ -146,8 +172,20 @@
       );
     } catch (error) {
       console.error('Error processing batch:', error);
+      this.errorCount++;
+      fs.appendFileSync(
+        WEAVE_ERRORS_LOG_FNAME,
+        `Error processing batch: ${error}\n`
+      );

+      // Put failed items back at the front of the queue
+      this.callQueue.unshift(...batchToProcess);

+      // Exit if we have too many errors
+      if (this.errorCount > this.MAX_ERRORS) {
+        console.error(`Exceeded max errors: ${this.MAX_ERRORS}; exiting`);
+        process.exit(1);
+      }
     } finally {
       this.isBatchProcessing = false;
       this.batchProcessTimeout = null;
@@ -734,7 +772,9 @@ function mergeSummaries(left: Summary, right: Summary): Summary {
     if (typeof leftValue === 'number' && typeof result[key] === 'number') {
       result[key] = leftValue + result[key];
     } else if (
+      leftValue != null &&
       typeof leftValue === 'object' &&
+      result[key] != null &&
       typeof result[key] === 'object'
     ) {
       result[key] = mergeSummaries(leftValue, result[key]);
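The final mergeSummaries hunk is a small crash fix of its own: in JavaScript, typeof null === 'object', so without the added null checks a null summary value passed the object test and was fed recursively into mergeSummaries, where iterating or indexing null throws a TypeError. A quick illustration of the gotcha (isMergeableObject is an illustrative helper, not part of the diff):

// typeof null is 'object', so a bare typeof check does not exclude null.
const leftValue: unknown = null;
console.log(typeof leftValue === 'object'); // true -- null slips through

// The added guards amount to this check:
function isMergeableObject(v: unknown): v is Record<string, unknown> {
  return v != null && typeof v === 'object';
}

console.log(isMergeableObject(null)); // false
console.log(isMergeableObject({requests: 1})); // true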
