Skip to content

Commit

Permalink
+ paratask.usePromise() API for letting the user use his own Promise …
Browse files Browse the repository at this point in the history
…constructor
  • Loading branch information
IvanDimanov committed Jul 30, 2015
1 parent 731fc6d commit 8667489
Show file tree
Hide file tree
Showing 6 changed files with 363 additions and 57 deletions.
65 changes: 65 additions & 0 deletions build_tests/use_promise_test.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
This file will cover test cases of using "paratask.usePromise()" functionality for setting
custom/user preferred Promise constructor function
*/
'use strict';

var paratask = require('../lib/index.js');
var promise = require('../lib/promise.js');
var bluebird = require('bluebird');


module.exports = {
'calling "paratask.usePromise()" with wrong argument': function (test) {
test.expect(3);

try {
paratask.usePromise();
} catch (error) {
test.ok( error instanceof TypeError, 'Empty call triggers a "TypeError"');
}

try {
paratask.usePromise({});
} catch (error) {
test.ok( error instanceof TypeError, 'Called with empty object {} triggers a "TypeError"');
}

try {
paratask.usePromise(function () {});
} catch (error) {
test.ok( error instanceof TypeError, 'Called with empty function "function () {}" triggers a "TypeError"');
}

test.done();
},


'calling "paratask.usePromise()" with valid Promise constructor functions': function (test) {
test.expect(3);

var task = {
fork: function (resolve) {
resolve(true);
}
};

paratask([task])
.then(function (resolved) {
test.ok( resolved, 'Calling "paratask()" without setting "paratask.usePromise()" should use a default Promise constructor');

paratask.usePromise( promise );
return paratask([task]);
})
.then(function (resolved) {
test.ok( resolved, 'Calling "paratask()" with set custom Promise constructor');

paratask.usePromise( bluebird.Promise );
return paratask([task]);
})
.then(function (resolved) {
test.ok( resolved, 'Calling "paratask()" with set "bluebird" Promise constructor');
test.done();
})
}
};
147 changes: 94 additions & 53 deletions lib/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -15,39 +15,31 @@ var log = utils.log;
var child_process_module = require('child_process');


/*Secure valid Promise uses*/
(function () {
if (typeof Promise !== 'function') {
throw new ReferenceError('This "paratask-promises" package needs Promise function (currently not available). You can use ECMAScript 5 "callback" style with: https://www.npmjs.com/package/paratask');
}

var promise = new Promise(function () {});

if (typeof promise.then !== 'function') {
throw new ReferenceError('Current Promise function must expose "then" function as property');
}

if (typeof promise.catch !== 'function') {
throw new ReferenceError('Current Promise function must expose "catch" function as property');
}
})();
/*
This variable will hold the user preferred Promise constructor function.
By default we'll use the Node/io.js Promise constructor if same is available
or fallback to a predefined Promise/A+ constructor.
*/
var PromiseConstructor = typeof Promise === 'function' ? Promise : require('./promise');

/*Will tell whether or not we have "fs" function in a Promise variant*/
var is_fs_promisified = false;


/*
Create a set of File System common functions that
uses Promises in order to stay consistent with the main code
*/
(function () {
function promisifyFs() {
fs.existsPromise = function (path) {
return new Promise(function (resolve) {
return new PromiseConstructor(function (resolve) {
fs.exists(path, resolve);
});
}


fs.unlinkPromise = function (path) {
return new Promise(function (resolve, reject) {
return new PromiseConstructor(function (resolve, reject) {
fs.unlink(path, function (error) {
if (error) {
reject(error);
Expand All @@ -60,7 +52,7 @@ var child_process_module = require('child_process');


fs.writeFilePromise = function (filename, data, options) {
return new Promise(function (resolve, reject) {
return new PromiseConstructor(function (resolve, reject) {
fs.writeFile(filename, data, options, function (error) {
if (error) {
reject(error);
Expand All @@ -70,7 +62,9 @@ var child_process_module = require('child_process');
});
});
}
})();

is_fs_promisified = true;
}


/*
Expand Down Expand Up @@ -113,7 +107,18 @@ var child_process_module = require('child_process');
});
*/
function paratask(tasks) {
return new Promise(function (resolve, reject) {
if (typeof PromiseConstructor !== 'function') {
throw new ReferenceError('"paratask-promises" package needs Promise constructor function (currently not available). You can use ECMAScript 5 "callback" style with: https://www.npmjs.com/package/paratask');
}


/*Be sure all "fs" function we need are in Promise variants*/
if (!is_fs_promisified) {
promisifyFs();
}


return new PromiseConstructor(function (resolve, reject) {

/*
Basic tasks list validation,
Expand All @@ -125,6 +130,7 @@ function paratask(tasks) {
throw new TypeError('1st argument must be an Array of task objects but was: {'+ typeof tasks +'} '+ utils.toString( tasks ));
}


var results = []; /*An array of all workers returned results.*/
var shared_data_file_paths = []; /*An array of each child worker JSON file path. Thees files hold the dependency injections for each worker.*/
var child_processes = []; /*An array of all spawned worker processes. On successfully program completion, this list will be empty.*/
Expand All @@ -141,12 +147,18 @@ function paratask(tasks) {

/*Remove the fork task instructions*/
fs.existsPromise( shared_data_file_paths[ child_process_id ] )
.then(function () {
return fs.unlinkPromise( shared_data_file_paths[ child_process_id ] );
})
.catch(function (error) {
throw new Error('Unable to delete processing file "'+ shared_data_file_paths[ child_process_id ] +'": '+ error);
});
.then(function (does_file_exist) {
return does_file_exist ? fs.unlinkPromise( shared_data_file_paths[ child_process_id ] ) : false;
})
.catch(function (error) {

/*No need to alarm if the file is already deleted*/
if (!(error instanceof Error) ||
!~error.message.indexOf('no such file')
) {
throw new Error('Unable to delete processing file "'+ shared_data_file_paths[ child_process_id ] +'": '+ error);
}
});


/*Erase the internal record for the child process*/
Expand Down Expand Up @@ -277,38 +289,37 @@ function paratask(tasks) {

/*Send the instructions only if all are been correctly recorded*/
fs.writeFilePromise( shared_data_file_paths[ child_process_id ], JSON.stringify( shared_data_json ))
.catch(function (error) {
finishMainProcess( error );
})
.then(function () {
child_processes[ child_process_id ].send({shared_data_file_path: shared_data_file_paths[ child_process_id ]});

.then(function () {
child_processes[ child_process_id ].send({shared_data_file_path: shared_data_file_paths[ child_process_id ]});

/*Wait for any process update of the started task*/
child_processes[ child_process_id ].on('message', function (message) {
if (message.error) {

/*Wait for any process update of the started task*/
child_processes[ child_process_id ].on('message', function (message) {
if (message.error) {
/*
Interprocess communicator is unable to exchange Error objects.
In order to return the original error to the callee,
we'll use a common converter
*/
message.error = convertProcessMessage( message.error_exact_typeof, message.error );

/*
Interprocess communicator is unable to exchange Error objects.
In order to return the original error to the callee,
we'll use a common converter
*/
message.error = convertProcessMessage( message.error_exact_typeof, message.error );
/*
All processes calculations are assumed to be connected so
error in one of them should cancel further work in all
*/
finishMainProcess( message.error );

/*
All processes calculations are assumed to be connected so
error in one of them should cancel further work in all
*/
finishMainProcess( message.error );
} else {

} else {
/*Keep the calculated result from the child process and kill the spawned process since it's no longer needed*/
results[ child_process_id ] = convertProcessMessage( message.result_exact_typeof, message.result );
killProcess( child_process_id );
}

/*Keep the calculated result from the child process and kill the spawned process since it's no longer needed*/
results[ child_process_id ] = convertProcessMessage( message.result_exact_typeof, message.result );
killProcess( child_process_id );
}
});
}, function (error) {
finishMainProcess( error );
});


/*Notify the end-user whenever all of the child processes are been completed (or canceled)*/
Expand Down Expand Up @@ -336,5 +347,35 @@ function paratask(tasks) {
}


/*Letting the user choose his own version of Promise constructor function*/
paratask.usePromise = function (_Promise) {

/*Secure valid Promise uses*/
if (typeof _Promise !== 'function') {
throw new TypeError('Incoming parameter must be a Promise constructor function');
}


var test_pomise = new _Promise(function () {});

if (typeof test_pomise !== 'object') {
throw new TypeError('Current Promise constructor function must return Promise object');
}

if (typeof test_pomise.then !== 'function') {
throw new TypeError('Current Promise constructor function must return Promise object with "then" function property');
}

/*Start using the validated user preferred Promise constructor*/
PromiseConstructor = _Promise;

/*
Use the user preferred Promise constructor as
the de facto function for File System access
*/
promisifyFs();
}


/*Give external access to the main function*/
module.exports = paratask;
Loading

0 comments on commit 8667489

Please sign in to comment.