Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

#25 Emit added for queue overflow. Tests added as well #26

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,8 @@ If a message is discarded entirely because it does not pass your `shouldRetry` l

If the queue is reclaiming events from an abandonded queue, and sees duplicate entries, we will keep the first, and discard the rest, emitting a `duplication` event for each.

If a message is discarded because the queue exceeds `maxItems`, the queue will emit an `overflow` event.

### `processed`

```javascript
Expand All @@ -171,6 +173,13 @@ queue.on('discard', function(item, attempts) {
```javascript
queue.on('duplication', function(item, attempts) {
console.error('discarding message %O due to duplicate entries', item, attempts);
```

### `overflow`

```javascript
queue.on('overflow', function(item, attempts) {
console.error('discarding message %O after %d attempts due to queue overflow', item, attempts);
})
```

Expand Down
19 changes: 17 additions & 2 deletions lib/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -174,8 +174,18 @@ Queue.prototype.requeue = function(item, attemptNumber, error, id) {

Queue.prototype._enqueue = function(entry) {
var queue = this._store.get(this.keys.QUEUE) || [];
queue = queue.slice(-(this.maxItems - 1));
queue.push(entry);

// We should remove events from the end of the array as these are generally the oldest events that have been retried.

// If greater than the length of the array, start will be set to the length of the array.
var sliceStart = this.maxItems - 1;
// If deleteCount is omitted, or if its value is equal to or larger than array.length - start,
// then all the elements from sliceStart to the end of the array will be deleted.
var sliceDeleteCount = queue.length - sliceStart
// More docs https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Array/splice
var removedEntrys = queue.splice(sliceStart, sliceDeleteCount, entry);
// We can add the new event at the end of the array since we sort the array on the next line
// We keep the deleted events so we can emit events for them below.
queue = queue.sort(function(a,b) {
return a.time - b.time;
});
Expand All @@ -185,6 +195,11 @@ Queue.prototype._enqueue = function(entry) {
if (this._running) {
this._processHead();
}
if (removedEntrys.length > 0) {
for (var i = 0; i < removedEntrys.length; i++) {
this.emit('overflow', removedEntrys[i].item, removedEntrys[i].attemptNumber);
}
}
};

Queue.prototype._processHead = function() {
Expand Down
29 changes: 29 additions & 0 deletions test/index.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -552,14 +552,19 @@ describe('Queue', function() {

describe('events', function() {
var queue;
var clock;

beforeEach(function() {
clock = lolex.createClock(0);
Schedule.setClock(clock);
queue = new Queue('events', function(_, cb) {
cb();
});
});

afterEach(function() {
queue.stop();
Schedule.resetClock();
});

it('should emit processed with response, and item', function(done) {
Expand Down Expand Up @@ -605,6 +610,30 @@ describe('events', function() {
});
queue.start();
queue.addItem({ a: 'b' });
clock.runAll();
});

it('should emit overflow if the adding a message exceeds queue maxItems', function(done) {
var firstEvent = { a: 'b' };
var otherEvents = { c: 'd' };

queue.fn = function(item, cb) {
cb(new Error('no'));
};

queue.maxItems = 5;
queue.on('overflow', function(item, attempts) {
assert.equal(item.a, firstEvent.a);
assert.equal(attempts, 0);
done();
});
queue.addItem(firstEvent);
clock.tick(10);
queue.addItem(otherEvents);
queue.addItem(otherEvents);
queue.addItem(otherEvents);
queue.addItem(otherEvents);
queue.addItem(otherEvents);
});
});

Expand Down