Skip to content

Commit

Permalink
fix: batch entry retry (#1918)
Browse files Browse the repository at this point in the history
* fix: batch entry retry

* chore: bump size limit
  • Loading branch information
saikumarrs authored Nov 12, 2024
1 parent 890fb7b commit ff346b8
Show file tree
Hide file tree
Showing 7 changed files with 164 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,7 @@ describe('Device mode transformation plugin', () => {
id: 'sample_uuid',
time: expect.any(Number),
// time: 1 + 500 * 2 ** 1, // this is the delay calculation in RetryQueue
type: 'Single',
},
]);
mockSendTransformedEventToDestinations.mockRestore();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ describe('Queue', () => {
attemptNumber: 0,
time: expect.any(Number),
id: expect.any(String),
type: 'Single',
},
]);
});
Expand All @@ -87,6 +88,7 @@ describe('Queue', () => {
attemptNumber: 0,
time: expect.any(Number),
id: expect.any(String),
type: 'Single',
},
]);

Expand All @@ -112,6 +114,7 @@ describe('Queue', () => {
attemptNumber: 0,
time: expect.any(Number),
id: expect.any(String),
type: 'Batch',
},
]);
});
Expand All @@ -138,6 +141,7 @@ describe('Queue', () => {
attemptNumber: 0,
time: expect.any(Number),
id: expect.any(String),
type: 'Batch',
},
]);
});
Expand Down Expand Up @@ -165,6 +169,7 @@ describe('Queue', () => {
attemptNumber: 0,
time: expect.any(Number),
id: expect.any(String),
type: 'Batch',
},
]);
});
Expand Down Expand Up @@ -192,6 +197,7 @@ describe('Queue', () => {
attemptNumber: 0,
time: expect.any(Number),
id: expect.any(String),
type: 'Single',
},
]);

Expand Down Expand Up @@ -235,7 +241,7 @@ describe('Queue', () => {
queue.processQueueCb = mockProcessItemCb;
queue.start();

queue.requeue('b', 1);
queue.requeue({ item: 'b', attemptNumber: 0, type: 'Single' });
queue.addItem('a');

expect(queue.processQueueCb).toHaveBeenCalledTimes(1);
Expand All @@ -260,17 +266,17 @@ describe('Queue', () => {
queue.start();

// over maxattempts
queue.requeue('a', 3);
queue.requeue({ item: 'b', attemptNumber: 2, type: 'Single' });
jest.advanceTimersByTime(queue.getDelay(3));
expect(queue.processQueueCb).toHaveBeenCalledTimes(0);

mockProcessItemCb.mockReset();
queue.requeue('a', 2);
queue.requeue({ item: ['a', 'b'], attemptNumber: 1, type: 'Batch' });
jest.advanceTimersByTime(queue.getDelay(2));
expect(queue.processQueueCb).toHaveBeenCalledTimes(1);

mockProcessItemCb.mockReset();
queue.requeue('a', 3);
queue.requeue({ item: 'b', attemptNumber: 2, type: 'Single' });
jest.advanceTimersByTime(queue.getDelay(1));
expect(queue.processQueueCb).toHaveBeenCalledTimes(0);
});
Expand Down Expand Up @@ -307,6 +313,23 @@ describe('Queue', () => {
item: 'a',
time: 0,
attemptNumber: 0,
type: 'Single',
},
{
item: ['b', 'c'],
time: 0,
attemptNumber: 0,
type: 'Batch',
},
{
item: ['d', 'e'],
time: 0,
attemptNumber: 0,
},
{
item: 'f',
time: 0,
attemptNumber: 0,
},
]);

Expand All @@ -318,8 +341,39 @@ describe('Queue', () => {
// wait long enough for the other queue to expire and be reclaimed
jest.advanceTimersByTime(queue.timeouts.reclaimTimer + queue.timeouts.reclaimWait * 2);

expect(queue.processQueueCb).toHaveBeenCalledTimes(1);
expect(queue.processQueueCb).toHaveBeenCalledWith('a', expect.any(Function), 0, Infinity, true);
expect(queue.processQueueCb).toHaveBeenCalledTimes(4);
expect(queue.processQueueCb).toHaveBeenNthCalledWith(
1,
'a',
expect.any(Function),
0,
Infinity,
true,
);
expect(queue.processQueueCb).toHaveBeenNthCalledWith(
2,
['b', 'c'],
expect.any(Function),
0,
Infinity,
true,
);
expect(queue.processQueueCb).toHaveBeenNthCalledWith(
3,
['d', 'e'],
expect.any(Function),
0,
Infinity,
true,
);
expect(queue.processQueueCb).toHaveBeenNthCalledWith(
4,
'f',
expect.any(Function),
0,
Infinity,
true,
);
});

it('should take over an in-progress task if a queue is abandoned', () => {
Expand All @@ -334,10 +388,27 @@ describe('Queue', () => {
);
foundQueue.set(foundQueue.validKeys.ACK, -15000);
foundQueue.set(foundQueue.validKeys.IN_PROGRESS, {
'task-id': {
'task-id-1': {
item: 'a',
time: 0,
attemptNumber: 0,
type: 'Single',
},
'task-id-2': {
item: ['b', 'c'],
time: 0,
attemptNumber: 0,
type: 'Batch',
},
'task-id-3': {
item: ['d', 'e'],
time: 0,
attemptNumber: 0,
},
'task-id-4': {
item: 'f',
time: 0,
attemptNumber: 0,
},
});

Expand All @@ -349,8 +420,39 @@ describe('Queue', () => {
// wait long enough for the other queue to expire and be reclaimed
jest.advanceTimersByTime(queue.timeouts.reclaimTimer + queue.timeouts.reclaimWait * 2);

expect(queue.processQueueCb).toHaveBeenCalledTimes(1);
expect(queue.processQueueCb).toHaveBeenCalledWith('a', expect.any(Function), 1, Infinity, true);
expect(queue.processQueueCb).toHaveBeenCalledTimes(4);
expect(queue.processQueueCb).toHaveBeenNthCalledWith(
1,
'a',
expect.any(Function),
1,
Infinity,
true,
);
expect(queue.processQueueCb).toHaveBeenNthCalledWith(
2,
['b', 'c'],
expect.any(Function),
1,
Infinity,
true,
);
expect(queue.processQueueCb).toHaveBeenNthCalledWith(
3,
['d', 'e'],
expect.any(Function),
1,
Infinity,
true,
);
expect(queue.processQueueCb).toHaveBeenNthCalledWith(
4,
'f',
expect.any(Function),
1,
Infinity,
true,
);
});

it('should take over a batch queued task if a queue is abandoned', () => {
Expand All @@ -376,6 +478,7 @@ describe('Queue', () => {
item: 'a',
time: 0,
attemptNumber: 0,
type: 'Single',
},
{
item: 'b',
Expand Down Expand Up @@ -419,6 +522,7 @@ describe('Queue', () => {
item: 'a',
time: 0,
attemptNumber: 0,
type: 'Single',
},
{
item: 'b',
Expand Down Expand Up @@ -1070,18 +1174,21 @@ describe('Queue', () => {
attemptNumber: 0,
time: expect.any(Number),
id: expect.any(String),
type: 'Single',
},
{
item: 'b',
attemptNumber: 0,
time: expect.any(Number),
id: expect.any(String),
type: 'Single',
},
{
item: 'c',
attemptNumber: 0,
time: expect.any(Number),
id: expect.any(String),
type: 'Single',
},
]);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,7 @@ describe('XhrQueue', () => {
attemptNumber: 1,
id: 'sample_uuid',
time: 1 + 1000 * 2 ** 1, // this is the delay calculation in RetryQueue
type: 'Single',
},
]);
});
Expand Down
3 changes: 3 additions & 0 deletions packages/analytics-js-plugins/src/types/plugins.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,14 @@ export type RudderEventType = 'page' | 'track' | 'identify' | 'alias' | 'group';

export type LogLevel = 'LOG' | 'INFO' | 'DEBUG' | 'WARN' | 'ERROR' | 'NONE';

export type QueueItemType = 'Single' | 'Batch';

export type QueueItem<T = QueueItemData> = {
item: T;
attemptNumber: number;
time: number;
id: string;
type: QueueItemType;
};

export type QueueItemData =
Expand Down
Loading

0 comments on commit ff346b8

Please sign in to comment.