From 866748965a912b7cb7181df2af52778db654c4ed Mon Sep 17 00:00:00 2001 From: Ivan Dimanov Date: Thu, 30 Jul 2015 14:46:43 +0300 Subject: [PATCH] + paratask.usePromise() API for letting the user use his own Promise constructor --- build_tests/use_promise_test.js | 65 ++++++++++++ lib/index.js | 147 ++++++++++++++++---------- lib/promise.js | 182 ++++++++++++++++++++++++++++++++ lib/shared_data/README.md | 2 +- package.json | 7 +- readme.md | 17 +++ 6 files changed, 363 insertions(+), 57 deletions(-) create mode 100644 build_tests/use_promise_test.js create mode 100644 lib/promise.js diff --git a/build_tests/use_promise_test.js b/build_tests/use_promise_test.js new file mode 100644 index 0000000..e0bab02 --- /dev/null +++ b/build_tests/use_promise_test.js @@ -0,0 +1,65 @@ +/* + This file will cover test cases of using "paratask.usePromise()" functionality for setting + custom/user preferred Promise constructor function +*/ +'use strict'; + +var paratask = require('../lib/index.js'); +var promise = require('../lib/promise.js'); +var bluebird = require('bluebird'); + + +module.exports = { + 'calling "paratask.usePromise()" with wrong argument': function (test) { + test.expect(3); + + try { + paratask.usePromise(); + } catch (error) { + test.ok( error instanceof TypeError, 'Empty call triggers a "TypeError"'); + } + + try { + paratask.usePromise({}); + } catch (error) { + test.ok( error instanceof TypeError, 'Called with empty object {} triggers a "TypeError"'); + } + + try { + paratask.usePromise(function () {}); + } catch (error) { + test.ok( error instanceof TypeError, 'Called with empty function "function () {}" triggers a "TypeError"'); + } + + test.done(); + }, + + + 'calling "paratask.usePromise()" with valid Promise constructor functions': function (test) { + test.expect(3); + + var task = { + fork: function (resolve) { + resolve(true); + } + }; + + paratask([task]) + .then(function (resolved) { + test.ok( resolved, 'Calling "paratask()" without setting "paratask.usePromise()" should use a default Promise constructor'); + + paratask.usePromise( promise ); + return paratask([task]); + }) + .then(function (resolved) { + test.ok( resolved, 'Calling "paratask()" with set custom Promise constructor'); + + paratask.usePromise( bluebird.Promise ); + return paratask([task]); + }) + .then(function (resolved) { + test.ok( resolved, 'Calling "paratask()" with set "bluebird" Promise constructor'); + test.done(); + }) + } +}; \ No newline at end of file diff --git a/lib/index.js b/lib/index.js index 9740e62..3ea2826 100644 --- a/lib/index.js +++ b/lib/index.js @@ -15,39 +15,31 @@ var log = utils.log; var child_process_module = require('child_process'); -/*Secure valid Promise uses*/ -(function () { - if (typeof Promise !== 'function') { - throw new ReferenceError('This "paratask-promises" package needs Promise function (currently not available). You can use ECMAScript 5 "callback" style with: https://www.npmjs.com/package/paratask'); - } - - var promise = new Promise(function () {}); - - if (typeof promise.then !== 'function') { - throw new ReferenceError('Current Promise function must expose "then" function as property'); - } - - if (typeof promise.catch !== 'function') { - throw new ReferenceError('Current Promise function must expose "catch" function as property'); - } -})(); +/* + This variable will hold the user preferred Promise constructor function. + By default we'll use the Node/io.js Promise constructor if same is available + or fallback to a predefined Promise/A+ constructor. +*/ +var PromiseConstructor = typeof Promise === 'function' ? Promise : require('./promise'); +/*Will tell whether or not we have "fs" function in a Promise variant*/ +var is_fs_promisified = false; /* Create a set of File System common functions that uses Promises in order to stay consistent with the main code */ -(function () { +function promisifyFs() { fs.existsPromise = function (path) { - return new Promise(function (resolve) { + return new PromiseConstructor(function (resolve) { fs.exists(path, resolve); }); } fs.unlinkPromise = function (path) { - return new Promise(function (resolve, reject) { + return new PromiseConstructor(function (resolve, reject) { fs.unlink(path, function (error) { if (error) { reject(error); @@ -60,7 +52,7 @@ var child_process_module = require('child_process'); fs.writeFilePromise = function (filename, data, options) { - return new Promise(function (resolve, reject) { + return new PromiseConstructor(function (resolve, reject) { fs.writeFile(filename, data, options, function (error) { if (error) { reject(error); @@ -70,7 +62,9 @@ var child_process_module = require('child_process'); }); }); } -})(); + + is_fs_promisified = true; +} /* @@ -113,7 +107,18 @@ var child_process_module = require('child_process'); }); */ function paratask(tasks) { - return new Promise(function (resolve, reject) { + if (typeof PromiseConstructor !== 'function') { + throw new ReferenceError('"paratask-promises" package needs Promise constructor function (currently not available). You can use ECMAScript 5 "callback" style with: https://www.npmjs.com/package/paratask'); + } + + + /*Be sure all "fs" function we need are in Promise variants*/ + if (!is_fs_promisified) { + promisifyFs(); + } + + + return new PromiseConstructor(function (resolve, reject) { /* Basic tasks list validation, @@ -125,6 +130,7 @@ function paratask(tasks) { throw new TypeError('1st argument must be an Array of task objects but was: {'+ typeof tasks +'} '+ utils.toString( tasks )); } + var results = []; /*An array of all workers returned results.*/ var shared_data_file_paths = []; /*An array of each child worker JSON file path. Thees files hold the dependency injections for each worker.*/ var child_processes = []; /*An array of all spawned worker processes. On successfully program completion, this list will be empty.*/ @@ -141,12 +147,18 @@ function paratask(tasks) { /*Remove the fork task instructions*/ fs.existsPromise( shared_data_file_paths[ child_process_id ] ) - .then(function () { - return fs.unlinkPromise( shared_data_file_paths[ child_process_id ] ); - }) - .catch(function (error) { - throw new Error('Unable to delete processing file "'+ shared_data_file_paths[ child_process_id ] +'": '+ error); - }); + .then(function (does_file_exist) { + return does_file_exist ? fs.unlinkPromise( shared_data_file_paths[ child_process_id ] ) : false; + }) + .catch(function (error) { + + /*No need to alarm if the file is already deleted*/ + if (!(error instanceof Error) || + !~error.message.indexOf('no such file') + ) { + throw new Error('Unable to delete processing file "'+ shared_data_file_paths[ child_process_id ] +'": '+ error); + } + }); /*Erase the internal record for the child process*/ @@ -277,38 +289,37 @@ function paratask(tasks) { /*Send the instructions only if all are been correctly recorded*/ fs.writeFilePromise( shared_data_file_paths[ child_process_id ], JSON.stringify( shared_data_json )) - .catch(function (error) { - finishMainProcess( error ); - }) + .then(function () { + child_processes[ child_process_id ].send({shared_data_file_path: shared_data_file_paths[ child_process_id ]}); - .then(function () { - child_processes[ child_process_id ].send({shared_data_file_path: shared_data_file_paths[ child_process_id ]}); + /*Wait for any process update of the started task*/ + child_processes[ child_process_id ].on('message', function (message) { + if (message.error) { - /*Wait for any process update of the started task*/ - child_processes[ child_process_id ].on('message', function (message) { - if (message.error) { + /* + Interprocess communicator is unable to exchange Error objects. + In order to return the original error to the callee, + we'll use a common converter + */ + message.error = convertProcessMessage( message.error_exact_typeof, message.error ); - /* - Interprocess communicator is unable to exchange Error objects. - In order to return the original error to the callee, - we'll use a common converter - */ - message.error = convertProcessMessage( message.error_exact_typeof, message.error ); + /* + All processes calculations are assumed to be connected so + error in one of them should cancel further work in all + */ + finishMainProcess( message.error ); - /* - All processes calculations are assumed to be connected so - error in one of them should cancel further work in all - */ - finishMainProcess( message.error ); + } else { - } else { + /*Keep the calculated result from the child process and kill the spawned process since it's no longer needed*/ + results[ child_process_id ] = convertProcessMessage( message.result_exact_typeof, message.result ); + killProcess( child_process_id ); + } - /*Keep the calculated result from the child process and kill the spawned process since it's no longer needed*/ - results[ child_process_id ] = convertProcessMessage( message.result_exact_typeof, message.result ); - killProcess( child_process_id ); - } - }); + }, function (error) { + finishMainProcess( error ); + }); /*Notify the end-user whenever all of the child processes are been completed (or canceled)*/ @@ -336,5 +347,35 @@ function paratask(tasks) { } +/*Letting the user choose his own version of Promise constructor function*/ +paratask.usePromise = function (_Promise) { + + /*Secure valid Promise uses*/ + if (typeof _Promise !== 'function') { + throw new TypeError('Incoming parameter must be a Promise constructor function'); + } + + + var test_pomise = new _Promise(function () {}); + + if (typeof test_pomise !== 'object') { + throw new TypeError('Current Promise constructor function must return Promise object'); + } + + if (typeof test_pomise.then !== 'function') { + throw new TypeError('Current Promise constructor function must return Promise object with "then" function property'); + } + + /*Start using the validated user preferred Promise constructor*/ + PromiseConstructor = _Promise; + + /* + Use the user preferred Promise constructor as + the de facto function for File System access + */ + promisifyFs(); +} + + /*Give external access to the main function*/ module.exports = paratask; \ No newline at end of file diff --git a/lib/promise.js b/lib/promise.js new file mode 100644 index 0000000..373b80b --- /dev/null +++ b/lib/promise.js @@ -0,0 +1,182 @@ +/*Complete Promise/A+ implementation from https://www.promisejs.org/implementing*/ +(function () { + 'use strict'; + + + /** + * Check if a value is a Promise and, if it is, + * return the `then` method of that promise. + * + * @param {Promise|Any} value + * @return {Function|Null} + */ + function getThen(value) { + var t = typeof value; + if (value && (t === 'object' || t === 'function')) { + var then = value.then; + if (typeof then === 'function') { + return then; + } + } + return null; + } + + /** + * Take a potentially misbehaving resolver function and make sure + * onFulfilled and onRejected are only called once. + * + * Makes no guarantees about asynchrony. + * + * @param {Function} fn A resolver function that may not be trusted + * @param {Function} onFulfilled + * @param {Function} onRejected + */ + function doResolve(fn, onFulfilled, onRejected) { + var done = false; + try { + fn(function (value) { + if (done) return + done = true + onFulfilled(value) + }, function (reason) { + if (done) return + done = true + onRejected(reason) + }) + } catch (ex) { + if (done) return + done = true + onRejected(ex) + } + } + + + var PENDING = 0; + var FULFILLED = 1; + var REJECTED = 2; + + function Promise(fn) { + // store state which can be PENDING, FULFILLED or REJECTED + var state = PENDING; + + // store value once FULFILLED or REJECTED + var value = null; + + // store success & failure handlers + var handlers = []; + + function fulfill(result) { + state = FULFILLED; + value = result; + handlers.forEach(handle); + handlers = null; + } + + function reject(error) { + state = REJECTED; + value = error; + handlers.forEach(handle); + handlers = null; + } + + function resolve(result) { + try { + var then = getThen(result); + if (then) { + doResolve(then.bind(result), resolve, reject) + return + } + fulfill(result); + } catch (e) { + reject(e); + } + } + + function handle(handler) { + if (state === PENDING) { + handlers.push(handler); + } else { + if (state === FULFILLED && + typeof handler.onFulfilled === 'function') { + handler.onFulfilled(value); + } + if (state === REJECTED && + typeof handler.onRejected === 'function') { + handler.onRejected(value); + } + } + } + + this.done = function (onFulfilled, onRejected) { + // ensure we are always asynchronous + setTimeout(function () { + handle({ + onFulfilled: onFulfilled, + onRejected: onRejected + }); + }, 0); + } + + this.then = function (onFulfilled, onRejected) { + var self = this; + return new Promise(function (resolve, reject) { + return self.done(function (result) { + if (typeof onFulfilled === 'function') { + try { + return resolve(onFulfilled(result)); + } catch (ex) { + return reject(ex); + } + } else { + return resolve(result); + } + }, function (error) { + if (typeof onRejected === 'function') { + try { + return resolve(onRejected(error)); + } catch (ex) { + return reject(ex); + } + } else { + return reject(error); + } + }); + }); + } + + this.catch = function (onRejected) { + var self = this; + return new Promise(function (resolve, reject) { + return self.done(null, function (error) { + if (typeof onRejected === 'function') { + try { + return resolve(onRejected(error)); + } catch (ex) { + return reject(ex); + } + } else { + return reject(error); + } + }); + }); + } + + doResolve(fn, resolve, reject); + } + + + /*Export for browser 'window' object and Node/io.js 'module.exports' object*/ + if (typeof window === 'object' && + window + ) { + window.Promise = Promise; + + } else if (typeof module === 'object' && + module && + typeof module.exports === 'object' && + module.exports + ) { + + module.exports = Promise + } +})(); \ No newline at end of file diff --git a/lib/shared_data/README.md b/lib/shared_data/README.md index 6cf8d35..70c6ef5 100644 --- a/lib/shared_data/README.md +++ b/lib/shared_data/README.md @@ -1,2 +1,2 @@ -The folder './shared_data' holds JSON files which are used to share data between forked functions. +The folder './shared_data' holds JSON files which are used to share data between forked functions. On a successful complete the software will remove all JSON files. \ No newline at end of file diff --git a/package.json b/package.json index c5997c8..e62a2c4 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "paratask-promises", - "version": "3.1.3", + "version": "4.0.0", "author": { "name": "Ivan Dimanov", "email": "mail@idimanov.com" @@ -14,7 +14,7 @@ ], "scripts": { "start": "node ./lib/index.js", - "test": "grunt nodeunit" + "test": "grunt --stack nodeunit -v" }, "main": "./lib/index.js", "engines": { @@ -29,7 +29,8 @@ "url": "https://github.com/IvanDimanov/paratask-promises/issues" }, "devDependencies": { - "grunt-contrib-nodeunit": "~0.2.0", + "bluebird": "~2.9.34", + "grunt-contrib-nodeunit": "~0.4.1", "grunt": "~0.4.1" }, "keywords": [ diff --git a/readme.md b/readme.md index 79bd9dc..70731aa 100644 --- a/readme.md +++ b/readme.md @@ -22,6 +22,23 @@ or by getting it from [this repo](https://github.com/IvanDimanov/paratask-promis Paratask uses only native Node/io.js modules that do not need additional installation: `fs` and `child_process`. +## Custom Promise lib +You like to use your own Promise constructor or 3rd party alternative? +No problem, just type: +```javascript +var paratask = require('paratask-promises'); + +paratask.usePromise( require('bluebird').Promise ); +// or +paratask.usePromise( require('q').Promise ); +// or +paratask.usePromise( require('when').Promise ); +// or +paratask.usePromise( require('rsvp').Promise ); +``` +Thanks to [machinewu](https://github.com/machinewu) for the great suggestion. + + ### Example: Parallel calculation Both `task_1` and `task_2` will fork a new Node/io.js process and will run __concurrently__. When both call `resolve()`, the final results will be printed in the console.