diff --git a/src/Pool.js b/src/Pool.js index 37609daa..fd57b98f 100644 --- a/src/Pool.js +++ b/src/Pool.js @@ -2,7 +2,10 @@ var Promise = require('./Promise'); var WorkerHandler = require('./WorkerHandler'); var environment = require('./environment'); var DebugPortAllocator = require('./debug-port-allocator'); +var EventEmitter = require('./eventEmitter'); + var DEBUG_PORT_ALLOCATOR = new DebugPortAllocator(); + /** * A pool to manage workers * @param {String} [script] Optional worker script @@ -104,13 +107,21 @@ Pool.prototype.exec = function (method, params) { } // add a new task to the queue + var eventEmitter = new EventEmitter(); + eventEmitter._emit = eventEmitter.emit + eventEmitter.emit = function (event, data) { + eventEmitter._emit('__EVENT__', { name: event, data: data }) + } + var tasks = this.tasks; var task = { method: method, params: params, resolver: resolver, - timeout: null + timeout: null, + eventEmitter: eventEmitter }; + tasks.push(task); // replace the timeout method of the Promise with our own, @@ -131,7 +142,61 @@ Pool.prototype.exec = function (method, params) { // trigger task execution this._next(); - return resolver.promise; + // extend promises with EventEmitter methods to allow chaining + + function bindEventEmmiter(obj) { + ['on', 'emit', 'once', 'removeListener'].map(function(key) { + if (obj[key] && obj[key].__EMITTER__) return; + + function emitter() { + eventEmitter[key].apply(eventEmitter, arguments); + return obj; + } + + Object.defineProperty(emitter, '__EMITTER__', { + enumerable: false, + value: true + }); + + Object.defineProperty(obj, key, { + enumerable: true, + value: emitter + }); + }); + return obj; + } + + function bindChaining(obj) { + ['then', 'catch', 'cancel', 'timeout'].map(function(key) { + + if(!obj[key]) return; + if(obj[key].__CHAIN__) return; + + var originalMethod = obj[key]; + function chain() { + const fn = originalMethod.apply(resolver.promise, arguments); + bindEventEmmiter(fn); + bindChaining(fn); + return fn; + } + + Object.defineProperty(chain, '__CHAIN__', { + enumerable: false, + value: true + }); + + Object.defineProperty(obj, key, { + enumerable: true, + value: chain + }); + }); + + return obj; + } + + + + return bindChaining(bindEventEmmiter(resolver.promise)); } else if (typeof method === 'function') { // send stringified function and function arguments to worker @@ -202,7 +267,7 @@ Pool.prototype._next = function () { // check if the task is still pending (and not cancelled -> promise rejected) if (task.resolver.promise.pending) { // send the request to the worker - var promise = worker.exec(task.method, task.params, task.resolver) + var promise = worker.exec(task.method, task.params, task.resolver, task.eventEmitter) .then(me._boundNext) .catch(function () { // if the worker crashed and terminated, remove it from the pool diff --git a/src/WorkerHandler.js b/src/WorkerHandler.js index c3790ac0..aa4aae4e 100644 --- a/src/WorkerHandler.js +++ b/src/WorkerHandler.js @@ -3,6 +3,7 @@ var Promise = require('./Promise'); var environment = require('./environment'); var requireFoolWebpack = require('./requireFoolWebpack'); +const EventEmitter = require('./eventEmitter'); /** * Special message sent by parent which causes a child process worker to terminate itself. @@ -219,6 +220,12 @@ function WorkerHandler(script, _options) { if (typeof response === 'string' && response === 'ready') { me.worker.ready = true; dispatchQueuedRequests(); + } else if (typeof response === 'object' && response.eventName) { + var id = response.id; + var task = me.processing[id]; + if (task !== undefined && task.eventEmitter) { + task.eventEmitter._emit(response.eventName, response.eventData); + } } else { // find the task from the processing queue, and run the tasks callback var id = response.id; @@ -305,18 +312,28 @@ WorkerHandler.prototype.methods = function () { * @param {{resolve: Function, reject: Function}} [resolver] * @return {Promise.<*, Error>} result */ -WorkerHandler.prototype.exec = function(method, params, resolver) { +WorkerHandler.prototype.exec = function(method, params, resolver, eventEmitter) { + var self = this; + if (!resolver) { resolver = Promise.defer(); } + if (!eventEmitter) { + eventEmitter = new EventEmitter(); + } + + eventEmitter.on('__EVENT__', function(event) { + self.worker.send({ eventName: event.name, eventData: event.data }); + }) // generate a unique id for the task var id = ++this.lastId; // register a new task as being in progress this.processing[id] = { id: id, - resolver: resolver + resolver: resolver, + eventEmitter: eventEmitter }; // build a JSON-RPC request diff --git a/src/eventEmitter.js b/src/eventEmitter.js new file mode 100644 index 00000000..bbc2844c --- /dev/null +++ b/src/eventEmitter.js @@ -0,0 +1,64 @@ +var requireFoolWebpack = require('./requireFoolWebpack'); + +function EventEmitter() { + this.callbacks = {}; +} + +EventEmitter.prototype.removeListener = function(event, listener) { + if(!this.callbacks[event]) return; + if(typeof listener !== 'function') { + throw TypeError('listener must be a function'); + } + + var list = this.callbacks[event]; + var length = list.length; + var i; + var position = -1; + + // https://github.com/nodejs/node-v0.x-archive/blob/ed0d1c384cd4578f7168633c838f1252bddb260e/lib/events.js#L226-L227 + for (i = length; i-- > 0;) { + if (list[i] === listener || + (list[i].listener && list[i].listener === listener)) { + position = i; + break; + } + } + + if (position < 0) + return this; + + if (list.length === 1) { + list.length = 0; + delete this.callbacks[event]; + } else { + list.splice(position, 1); + } +} + +EventEmitter.prototype.on = function(event, cb) { + if(!this.callbacks[event]) this.callbacks[event] = []; + this.callbacks[event].push(cb) +}; + +EventEmitter.prototype.emit = function(event, data){ + let cbs = this.callbacks[event] + if(cbs){ + cbs.forEach(function(cb) { cb(data) }) + } +} + +EventEmitter.prototype.once = function(event, cb) { + (function listener(data) { + cb(data); + this.removeListener(event, listener) + }).bind(this); + + this.on(event, listener); +} + +try { + module.exports = requireFoolWebpack('events'); +} catch(erro) { + module.exports = EventEmitter; +} + diff --git a/src/generated/embeddedWorker.js b/src/generated/embeddedWorker.js index 526e8d4c..33c1e820 100644 --- a/src/generated/embeddedWorker.js +++ b/src/generated/embeddedWorker.js @@ -3,4 +3,4 @@ * This file is automatically generated, * changes made in this file will be overwritten. */ -module.exports = "!function(o){var t={};function n(r){if(t[r])return t[r].exports;var e=t[r]={i:r,l:!1,exports:{}};return o[r].call(e.exports,e,e.exports,n),e.l=!0,e.exports}n.m=o,n.c=t,n.d=function(r,e,o){n.o(r,e)||Object.defineProperty(r,e,{enumerable:!0,get:o})},n.r=function(r){\"undefined\"!=typeof Symbol&&Symbol.toStringTag&&Object.defineProperty(r,Symbol.toStringTag,{value:\"Module\"}),Object.defineProperty(r,\"__esModule\",{value:!0})},n.t=function(e,r){if(1&r&&(e=n(e)),8&r)return e;if(4&r&&\"object\"==typeof e&&e&&e.__esModule)return e;var o=Object.create(null);if(n.r(o),Object.defineProperty(o,\"default\",{enumerable:!0,value:e}),2&r&&\"string\"!=typeof e)for(var t in e)n.d(o,t,function(r){return e[r]}.bind(null,t));return o},n.n=function(r){var e=r&&r.__esModule?function(){return r.default}:function(){return r};return n.d(e,\"a\",e),e},n.o=function(r,e){return Object.prototype.hasOwnProperty.call(r,e)},n.p=\"\",n(n.s=0)}([function(module,exports,__webpack_require__){var requireFoolWebpack=eval(\"typeof require !== 'undefined' ? require : function (module) { throw new Error('Module \\\" + module + \\\" not found.') }\"),TERMINATE_METHOD_ID=\"__workerpool-terminate__\",worker={exit:function(){}},WorkerThreads,parentPort;if(\"undefined\"!=typeof self&&\"function\"==typeof postMessage&&\"function\"==typeof addEventListener)worker.on=function(r,e){addEventListener(r,function(r){e(r.data)})},worker.send=function(r){postMessage(r)};else{if(\"undefined\"==typeof process)throw new Error(\"Script must be executed as a worker\");try{WorkerThreads=requireFoolWebpack(\"worker_threads\")}catch(error){if(\"object\"!=typeof error||null===error||\"MODULE_NOT_FOUND\"!==error.code)throw error}WorkerThreads&&null!==WorkerThreads.parentPort?(parentPort=WorkerThreads.parentPort,worker.send=parentPort.postMessage.bind(parentPort),worker.on=parentPort.on.bind(parentPort)):(worker.on=process.on.bind(process),worker.send=process.send.bind(process),worker.on(\"disconnect\",function(){process.exit(1)}),worker.exit=process.exit.bind(process))}function convertError(o){return Object.getOwnPropertyNames(o).reduce(function(r,e){return Object.defineProperty(r,e,{value:o[e],enumerable:!0})},{})}function isPromise(r){return r&&\"function\"==typeof r.then&&\"function\"==typeof r.catch}worker.methods={},worker.methods.run=function run(fn,args){var f=eval(\"(\"+fn+\")\");return f.apply(f,args)},worker.methods.methods=function(){return Object.keys(worker.methods)},worker.on(\"message\",function(e){if(e===TERMINATE_METHOD_ID)return worker.exit(0);try{var r=worker.methods[e.method];if(!r)throw new Error('Unknown method \"'+e.method+'\"');r=r.apply(r,e.params);isPromise(r)?r.then(function(r){worker.send({id:e.id,result:r,error:null})}).catch(function(r){worker.send({id:e.id,result:null,error:convertError(r)})}):worker.send({id:e.id,result:r,error:null})}catch(r){worker.send({id:e.id,result:null,error:convertError(r)})}}),worker.register=function(r){if(r)for(var e in r)r.hasOwnProperty(e)&&(worker.methods[e]=r[e]);worker.send(\"ready\")},exports.add=worker.register}]);"; +module.exports = "!function(t){var o={};function n(e){if(o[e])return o[e].exports;var r=o[e]={i:e,l:!1,exports:{}};return t[e].call(r.exports,r,r.exports,n),r.l=!0,r.exports}n.m=t,n.c=o,n.d=function(e,r,t){n.o(e,r)||Object.defineProperty(e,r,{enumerable:!0,get:t})},n.r=function(e){\"undefined\"!=typeof Symbol&&Symbol.toStringTag&&Object.defineProperty(e,Symbol.toStringTag,{value:\"Module\"}),Object.defineProperty(e,\"__esModule\",{value:!0})},n.t=function(r,e){if(1&e&&(r=n(r)),8&e)return r;if(4&e&&\"object\"==typeof r&&r&&r.__esModule)return r;var t=Object.create(null);if(n.r(t),Object.defineProperty(t,\"default\",{enumerable:!0,value:r}),2&e&&\"string\"!=typeof r)for(var o in r)n.d(t,o,function(e){return r[e]}.bind(null,o));return t},n.n=function(e){var r=e&&e.__esModule?function(){return e.default}:function(){return e};return n.d(r,\"a\",r),r},n.o=function(e,r){return Object.prototype.hasOwnProperty.call(e,r)},n.p=\"\",n(n.s=0)}([function(module,exports,__webpack_require__){var requireFoolWebpack=eval(\"typeof require !== 'undefined' ? require : function (module) { throw new Error('Module \\\" + module + \\\" not found.') }\"),EventEmitter=__webpack_require__(1),TERMINATE_METHOD_ID=\"__workerpool-terminate__\",worker={exit:function(){},eventEmitter:new EventEmitter},WorkerThreads,parentPort;if(\"undefined\"!=typeof self&&\"function\"==typeof postMessage&&\"function\"==typeof addEventListener)worker.on=function(e,r){addEventListener(e,function(e){r(e.data)})},worker.send=function(e){postMessage(e)};else{if(\"undefined\"==typeof process)throw new Error(\"Script must be executed as a worker\");try{WorkerThreads=requireFoolWebpack(\"worker_threads\")}catch(error){if(\"object\"!=typeof error||null===error||\"MODULE_NOT_FOUND\"!==error.code)throw error}WorkerThreads&&null!==WorkerThreads.parentPort?(parentPort=WorkerThreads.parentPort,worker.send=parentPort.postMessage.bind(parentPort),worker.on=parentPort.on.bind(parentPort)):(worker.on=process.on.bind(process),worker.send=process.send.bind(process),worker.on(\"disconnect\",function(){process.exit(1)}),worker.exit=process.exit.bind(process))}function convertError(t){return Object.getOwnPropertyNames(t).reduce(function(e,r){return Object.defineProperty(e,r,{value:t[r],enumerable:!0})},{})}function isPromise(e){return e&&\"function\"==typeof e.then&&\"function\"==typeof e.catch}worker.methods={},worker.methods.run=function run(fn,args){var f=eval(\"(\"+fn+\")\");return f.apply(Object.assign(f,this),args)},worker.methods.methods=function(){return Object.keys(worker.methods)},worker.on(\"message\",function(t){if(t===TERMINATE_METHOD_ID)return worker.exit(0);if(t.eventName)return worker.eventEmitter.emit(t.eventName,t.eventData);try{var e=worker.methods[t.method];if(!e)throw new Error('Unknown method \"'+t.method+'\"');e=e.apply(Object.assign(e,{emit:function(e,r){worker.send({eventName:e,eventData:r,id:t.id})},on:function(e,r){worker.eventEmitter.on(e,r)},once:function(e,r){worker.eventEmitter.once(e,r)}}),t.params);isPromise(e)?e.then(function(e){worker.send({id:t.id,result:e,error:null})}).catch(function(e){worker.send({id:t.id,result:null,error:convertError(e)})}):worker.send({id:t.id,result:e,error:null})}catch(e){worker.send({id:t.id,result:null,error:convertError(e)})}}),worker.register=function(e){if(e)for(var r in e)e.hasOwnProperty(r)&&(worker.methods[r]=e[r]);worker.send(\"ready\")},exports.add=worker.register},function(r,e,t){var o=t(2);function n(){this.callbacks={}}n.prototype.removeListener=function(e,r){if(this.callbacks[e]){if(\"function\"!=typeof r)throw TypeError(\"listener must be a function\");for(var t=this.callbacks[e],o=-1,n=t.length;0 { + return new Promise(function(resolve) { + return resolve(); + }); + }) + .then(done) + .catch(done); + + assert.ok(typeof poolController.emit === 'function'); + assert.ok(typeof poolController.on === 'function'); + assert.ok(typeof poolController.once === 'function'); + assert.strictEqual(pool.workers.length, 1); + }); + it('should offload a function to a worker', function (done) { var pool = new Pool({maxWorkers: 10});