Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enable two way communication with worker using Event Emitter pattern #210

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 68 additions & 3 deletions src/Pool.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@ var Promise = require('./Promise');
var WorkerHandler = require('./WorkerHandler');
var environment = require('./environment');
var DebugPortAllocator = require('./debug-port-allocator');
var EventEmitter = require('./eventEmitter');

var DEBUG_PORT_ALLOCATOR = new DebugPortAllocator();

/**
* A pool to manage workers
* @param {String} [script] Optional worker script
Expand Down Expand Up @@ -104,13 +107,21 @@ Pool.prototype.exec = function (method, params) {
}

// add a new task to the queue
var eventEmitter = new EventEmitter();
eventEmitter._emit = eventEmitter.emit
eventEmitter.emit = function (event, data) {
eventEmitter._emit('__EVENT__', { name: event, data: data })
}

var tasks = this.tasks;
var task = {
method: method,
params: params,
resolver: resolver,
timeout: null
timeout: null,
eventEmitter: eventEmitter
};

tasks.push(task);

// replace the timeout method of the Promise with our own,
Expand All @@ -131,7 +142,61 @@ Pool.prototype.exec = function (method, params) {
// trigger task execution
this._next();

return resolver.promise;
// extend promises with EventEmitter methods to allow chaining

function bindEventEmmiter(obj) {
['on', 'emit', 'once', 'removeListener'].map(function(key) {
if (obj[key] && obj[key].__EMITTER__) return;

function emitter() {
eventEmitter[key].apply(eventEmitter, arguments);
return obj;
}

Object.defineProperty(emitter, '__EMITTER__', {
enumerable: false,
value: true
});

Object.defineProperty(obj, key, {
enumerable: true,
value: emitter
});
});
return obj;
}

function bindChaining(obj) {
['then', 'catch', 'cancel', 'timeout'].map(function(key) {

if(!obj[key]) return;
if(obj[key].__CHAIN__) return;

var originalMethod = obj[key];
function chain() {
const fn = originalMethod.apply(resolver.promise, arguments);
bindEventEmmiter(fn);
bindChaining(fn);
return fn;
}

Object.defineProperty(chain, '__CHAIN__', {
enumerable: false,
value: true
});

Object.defineProperty(obj, key, {
enumerable: true,
value: chain
});
});

return obj;
}



return bindChaining(bindEventEmmiter(resolver.promise));
}
else if (typeof method === 'function') {
// send stringified function and function arguments to worker
Expand Down Expand Up @@ -202,7 +267,7 @@ Pool.prototype._next = function () {
// check if the task is still pending (and not cancelled -> promise rejected)
if (task.resolver.promise.pending) {
// send the request to the worker
var promise = worker.exec(task.method, task.params, task.resolver)
var promise = worker.exec(task.method, task.params, task.resolver, task.eventEmitter)
.then(me._boundNext)
.catch(function () {
// if the worker crashed and terminated, remove it from the pool
Expand Down
21 changes: 19 additions & 2 deletions src/WorkerHandler.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
var Promise = require('./Promise');
var environment = require('./environment');
var requireFoolWebpack = require('./requireFoolWebpack');
const EventEmitter = require('./eventEmitter');

/**
* Special message sent by parent which causes a child process worker to terminate itself.
Expand Down Expand Up @@ -219,6 +220,12 @@ function WorkerHandler(script, _options) {
if (typeof response === 'string' && response === 'ready') {
me.worker.ready = true;
dispatchQueuedRequests();
} else if (typeof response === 'object' && response.eventName) {
var id = response.id;
var task = me.processing[id];
if (task !== undefined && task.eventEmitter) {
task.eventEmitter._emit(response.eventName, response.eventData);
}
} else {
// find the task from the processing queue, and run the tasks callback
var id = response.id;
Expand Down Expand Up @@ -305,18 +312,28 @@ WorkerHandler.prototype.methods = function () {
* @param {{resolve: Function, reject: Function}} [resolver]
* @return {Promise.<*, Error>} result
*/
WorkerHandler.prototype.exec = function(method, params, resolver) {
WorkerHandler.prototype.exec = function(method, params, resolver, eventEmitter) {
var self = this;

if (!resolver) {
resolver = Promise.defer();
}

if (!eventEmitter) {
eventEmitter = new EventEmitter();
}

eventEmitter.on('__EVENT__', function(event) {
self.worker.send({ eventName: event.name, eventData: event.data });
})
// generate a unique id for the task
var id = ++this.lastId;

// register a new task as being in progress
this.processing[id] = {
id: id,
resolver: resolver
resolver: resolver,
eventEmitter: eventEmitter
};

// build a JSON-RPC request
Expand Down
64 changes: 64 additions & 0 deletions src/eventEmitter.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
var requireFoolWebpack = require('./requireFoolWebpack');

function EventEmitter() {
this.callbacks = {};
}

EventEmitter.prototype.removeListener = function(event, listener) {
if(!this.callbacks[event]) return;
if(typeof listener !== 'function') {
throw TypeError('listener must be a function');
}

var list = this.callbacks[event];
var length = list.length;
var i;
var position = -1;

// https://github.com/nodejs/node-v0.x-archive/blob/ed0d1c384cd4578f7168633c838f1252bddb260e/lib/events.js#L226-L227
for (i = length; i-- > 0;) {
if (list[i] === listener ||
(list[i].listener && list[i].listener === listener)) {
position = i;
break;
}
}

if (position < 0)
return this;

if (list.length === 1) {
list.length = 0;
delete this.callbacks[event];
} else {
list.splice(position, 1);
}
}

EventEmitter.prototype.on = function(event, cb) {
if(!this.callbacks[event]) this.callbacks[event] = [];
this.callbacks[event].push(cb)
};

EventEmitter.prototype.emit = function(event, data){
let cbs = this.callbacks[event]
if(cbs){
cbs.forEach(function(cb) { cb(data) })
}
}

EventEmitter.prototype.once = function(event, cb) {
(function listener(data) {
cb(data);
this.removeListener(event, listener)
}).bind(this);

this.on(event, listener);
}

try {
module.exports = requireFoolWebpack('events');
} catch(erro) {
module.exports = EventEmitter;
}

2 changes: 1 addition & 1 deletion src/generated/embeddedWorker.js

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

35 changes: 31 additions & 4 deletions src/worker.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@ var requireFoolWebpack = eval(
' : function (module) { throw new Error(\'Module " + module + " not found.\') }'
);

var EventEmitter = require('./eventEmitter');


/**
* Special message sent by parent which causes the worker to terminate itself.
* Not a "message object"; this string is the entire message.
Expand All @@ -21,7 +24,8 @@ var TERMINATE_METHOD_ID = '__workerpool-terminate__';
// create a worker API for sending and receiving messages which works both on
// node.js and in the browser
var worker = {
exit: function() {}
exit: function() {},
eventEmitter: new EventEmitter()
};
if (typeof self !== 'undefined' && typeof postMessage === 'function' && typeof addEventListener === 'function') {
// worker in the browser
Expand All @@ -47,7 +51,7 @@ else if (typeof process !== 'undefined') {
throw error;
}
}

if (WorkerThreads &&
/* if there is a parentPort, we are in a WorkerThread */
WorkerThreads.parentPort !== null) {
Expand Down Expand Up @@ -94,11 +98,12 @@ worker.methods = {};
* Execute a function with provided arguments
* @param {String} fn Stringified function
* @param {Array} [args] Function arguments
* @param {Object} [context] Function Context
* @returns {*}
*/
worker.methods.run = function run(fn, args) {
var f = eval('(' + fn + ')');
return f.apply(f, args);
return f.apply(Object.assign(f, this), args); // extends the fn with EventEmitter methods
};

/**
Expand All @@ -110,15 +115,37 @@ worker.methods.methods = function methods() {
};

worker.on('message', function (request) {
function emit(eventName, eventData) {
worker.send({
eventName: eventName,
eventData: eventData,
id: request.id
});
};

function on(event, cb) {
worker.eventEmitter.on(event, cb);
}

function once(event, cb) {
worker.eventEmitter.once(event, cb);
}


if (request === TERMINATE_METHOD_ID) {
return worker.exit(0);
}

if (request.eventName) {
return worker.eventEmitter.emit(request.eventName, request.eventData);
}

try {
var method = worker.methods[request.method];

if (method) {
// execute the function
var result = method.apply(method, request.params);
var result = method.apply(Object.assign(method, { emit: emit, on: on, once: once }), request.params);

if (isPromise(result)) {
// promise returned, resolve this and then return
Expand Down
Loading