Skip to content

Commit

Permalink
Add support for adding tasks dynamically when concurrency > 1
Browse files Browse the repository at this point in the history
  • Loading branch information
mterrel committed Jan 18, 2019
1 parent f3294a5 commit bb78fbe
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 18 deletions.
60 changes: 43 additions & 17 deletions index.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
'use strict';
const pMap = require('p-map');
const PQueue = require('p-queue');
const pDefer = require('p-defer');
const Subject = require('rxjs').Subject;
const Task = require('./lib/task');
const TaskWrapper = require('./lib/task-wrapper');
Expand Down Expand Up @@ -45,6 +46,8 @@ class Listr extends Subject {
this.concurrency = this._options.concurrent;
}

this._queue = new PQueue({concurrency: this.concurrency});

this._RendererClass = renderer.getRenderer(this._options.renderer, this._options.nonTTYRenderer);

this.exitOnError = this._options.exitOnError;
Expand All @@ -67,10 +70,6 @@ class Listr extends Subject {
}

add(task) {
if (this.running && this.concurrency !== 1) {
throw new ListrError('Cannot add tasks while running if concurrency != 1');
}

const tasks = Array.isArray(task) ? task : [task];

for (const task of tasks) {
Expand All @@ -81,12 +80,28 @@ class Listr extends Subject {
type: 'ADDTASK',
data: t
});
this.queueTask(t);
}
}

return this;
}

queueTask(task) {
this._queue.add(() => {
if (this._errorThrown) {
return;
}

this._checkAll(this._context);
return runTask(task, this._context, this._errors)
.catch(error => {
this._errorThrown = true;
throw error;
});
}).catch(this._done.reject);
}

render() {
if (!this._renderer) {
this._renderer = new this._RendererClass(this._tasks, this._options, this);
Expand All @@ -102,22 +117,33 @@ class Listr extends Subject {
run(context) {
this.render();

context = context || Object.create(null);
this._context = context || Object.create(null);

this._errors = [];

const errors = [];
this._checkAll(this._context);

this._checkAll(context);
// This is the promise that will settle when run() is complete.
// It settles on one of two conditions:
// 1. It resolves successfully when the task queue is idle (queue is
// empty and all promises have been resolved successfully)
// 2. It rejects immediately if any task throws an error.
this._done = pDefer();

const tasks = pMap(this._tasks, task => {
this._checkAll(context);
return runTask(task, context, errors);
}, {concurrency: this.concurrency});
// Add all the tasks we have so far to the queue
this._tasks.map(task => this.queueTask(task));

return tasks
this._queue.onIdle()
.then(this._done.resolve) // Successful completion of all tasks
.catch(this._done.reject);

return this._done.promise
.then(() => {
if (errors.length > 0) {
// Check for errors from tasks with exitOnError===false
if (this._errors.length > 0) {
const err = new ListrError('Something went wrong');
err.errors = errors;
err.errors = this._errors;

throw err;
}

Expand All @@ -126,10 +152,10 @@ class Listr extends Subject {

this._renderer.end();

return context;
return this._context;
})
.catch(error => {
error.context = context;
error.context = this._context;
this.complete();
this._renderer.end(error);
throw error;
Expand Down
3 changes: 2 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,8 @@
"listr-silent-renderer": "^1.1.1",
"listr-update-renderer": "^0.5.0",
"listr-verbose-renderer": "^0.5.0",
"p-map": "^2.0.0",
"p-defer": "^1.0.0",
"p-queue": "^3.0.0",
"rxjs": "^6.3.3"
},
"devDependencies": {
Expand Down

0 comments on commit bb78fbe

Please sign in to comment.