Skip to content

Commit

Permalink
Add async queueing methods
Browse files Browse the repository at this point in the history
  • Loading branch information
erayd committed Jan 2, 2020
1 parent 8742f7d commit 7cf699f
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 5 deletions.
20 changes: 20 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,16 @@ synonymous.
If more than one value is supplied, then they will all be added to the queue
in the order that they are supplied.

### .pushAsync(), .enqueueAsync()
```javascript
for (let item of items) {
await s.pushAsync(item);
}
```
Asynchronous version of `push()` / `enqueue()`, returning a `Promise`. Allows
for backpressure and feeding the queue at the same rate items are removed from
it.

### .shift(), .dequeue()
```javascript
var myValue = s.shift();
Expand All @@ -69,6 +79,16 @@ the queue is full, then this method will return an error.
If more than one value is supplied, then they will all be added to the queue
in the order that they are supplied.

### .unshiftAsync()
```javascript
for (let item of items) {
await s.unshiftAsync(item);
}
```
Asynchronous version of `unshift()`, returning a `Promise`. Allows
for backpressure and feeding the queue at the same rate items are removed from
it.

### .pop()
```javascript
var myValue = s.pop();
Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "spique",
"version": "2.2.0",
"version": "2.3.0",
"description": "A spiral deque - high performance and dynamic queue size",
"main": "spique.js",
"scripts": {
Expand Down
39 changes: 35 additions & 4 deletions spique.js
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ module.exports = class Spique extends events.EventEmitter {
var spareRing = undefined;
var rings = 1;
var items = 0;
var pending = new Spique();

// allocate a new ring, or return the spare if available
function allocateRing() {
Expand Down Expand Up @@ -80,6 +81,17 @@ module.exports = class Spique extends events.EventEmitter {
}
}

// push item(s) onto the end of the buffer when there is space available
this.enqueueAsync = this.pushAsync = function pushAsync(value) {
if (!this.isFull()) {
return Promise.resolve(this.push(value));
} else {
pending.push(() => {
return Promise.resolve(this.push(value));
});
}
}

// push item(s) onto the start of the buffer
this.unshift = function unshift(value) {
if(items >= maxItems)
Expand Down Expand Up @@ -108,6 +120,17 @@ module.exports = class Spique extends events.EventEmitter {
}
}

// push item(s) onto the start of the buffer when there is space available
this.unshiftAsync = function unshiftAsync(value) {
if (!this.isFull()) {
return Promise.resolve(this.unshift(value));
} else {
pending.push(() => {
return Promise.resolve(this.unshift(value));
});
}
}

// pop an item off the end of the buffer
this.pop = function() {
var value = lastRing.pop();
Expand All @@ -121,8 +144,12 @@ module.exports = class Spique extends events.EventEmitter {
items--;
if (items === 0)
this.emit("empty", this);
if (items < maxItems)
this.emit("space", this);
if (items < maxItems) {
while (!this.isFull() && !pending.isEmpty())
this.pending.shift()();
if (items < maxItems)
this.emit("space", this);
}
return value;
};

Expand All @@ -139,8 +166,12 @@ module.exports = class Spique extends events.EventEmitter {
items--;
if (items === 0)
this.emit("empty", this);
if (items < maxItems)
this.emit("space", this);
if (items < maxItems) {
while (!this.isFull() && !pending.isEmpty())
this.pending.shift()();
if (items < maxItems)
this.emit("space", this);
}
return value;
};

Expand Down

0 comments on commit 7cf699f

Please sign in to comment.