Skip to content
This repository has been archived by the owner on Oct 21, 2022. It is now read-only.

Commit

Permalink
Merge pull request #1 from dkhuntrods/add-revive-methods
Browse files Browse the repository at this point in the history
Add revive methods
  • Loading branch information
johnbender authored Jul 11, 2016
2 parents a16f853 + 3fe2018 commit 51a6e70
Show file tree
Hide file tree
Showing 7 changed files with 155 additions and 32 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ Worker Farm allows me to spin up multiple JVMs to be controlled by Node, and hav

## API

Worker Farm exports a main function an an `end()` method. The main function sets up a "farm" of coordinated child-process workers and it can be used to instantiate multiple farms, all operating independently.
Worker Farm exports a main function and an `end()` method. The main function sets up a "farm" of coordinated child-process workers and it can be used to instantiate multiple farms, all operating independently.

### workerFarm([options, ]pathToModule[, exportedMethods])

Expand Down
12 changes: 8 additions & 4 deletions lib/child/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,16 @@ function handle (data) {
, callback = function () {
var _args = Array.prototype.slice.call(arguments)
if (_args[0] instanceof Error) {
var e = _args[0]
_args[0] = {
'$error' : '$error'
, 'type' : _args[0].constructor.name
, 'message' : _args[0].message
, 'stack' : _args[0].stack
, 'type' : e.constructor.name
, 'message' : e.message
, 'stack' : e.stack
}
Object.keys(e).forEach(function(key) {
_args[0][key] = e[key]
})
}
process.send({ idx: idx, child: child, args: _args })
}
Expand All @@ -41,4 +45,4 @@ process.on('message', function (data) {
if (!$module) return $module = require(data.module)
if (data == 'die') return process.exit(0)
handle(data)
})
})
53 changes: 31 additions & 22 deletions lib/farm.js
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ function Farm (options, path) {
this.options = extend(DEFAULT_OPTIONS, options)
this.path = path
this.activeCalls = 0
this.onEndingCallbacks = []
}

// make a handle to pass back in the form of an external API
Expand Down Expand Up @@ -171,6 +172,11 @@ Farm.prototype.receive = function (data) {
}
args[0].type = e.type
args[0].stack = e.stack

// Copy any custom properties to pass it on.
Object.keys(e).forEach(function(key) {
args[0][key] = e[key];
});
}

process.nextTick(function () {
Expand Down Expand Up @@ -255,7 +261,7 @@ Farm.prototype.processQueue = function () {
var cka, i = 0, childId

if (!this.callQueue.length)
return this.ending && this.end()
return this.isEnding && this.end()

if (this.activeChildren < this.options.maxConcurrentWorkers)
this.startChild()
Expand All @@ -267,22 +273,21 @@ Farm.prototype.processQueue = function () {

this.send(childId, this.callQueue.shift())
if (!this.callQueue.length)
return this.ending && this.end()
return this.isEnding && this.end()
} /*else {
console.log(
, this.children[childId].activeCalls < this.options.maxConcurrentCallsPerWorker
, this.children[childId].calls.length < this.options.maxCallsPerWorker
, this.children[childId].calls.length , this.options.maxCallsPerWorker)
}*/
}

if (this.ending)
if (this.isEnding)
this.end()
}

// add a new call to the call queue, then trigger a process of the queue
Farm.prototype.addCall = function (call) {
if (this.ending)
if (this.isEnding)
return this.end() // don't add anything new to the queue
this.callQueue.push(call)
this.processQueue()
Expand All @@ -291,12 +296,16 @@ Farm.prototype.addCall = function (call) {
// kills child workers when they're all done
Farm.prototype.end = function (callback) {
var complete = true
if (this.ending === false)

if (this.isEnding === false )
return false
if (callback)
this.ending = callback
else if (this.ending == null)
this.ending = true

this.isEnding = true

if (typeof callback === 'function') {
this.onEndingCallbacks.push(callback)
}

Object.keys(this.children).forEach(function (child) {
if (!this.children[child])
return
Expand All @@ -306,20 +315,20 @@ Farm.prototype.end = function (callback) {
complete = false
}.bind(this))

// NOTE the following assumes that the `ending` callback
// should only ever be run once
if (complete && typeof this.ending == 'function' && !this.completeCallbackSet) {
process.nextTick(function () {
this.ending();
this.ending = false;
}.bind(this));

// the nextTick can be set twice before it runs once
// which means that after the first tick callback is
// fired `this.ending` will be a boolean, not a function
this.completeCallbackSet = true;
if (complete && this.onEndingCallbacks.length > 0) {
process.nextTick(function(){
while(this.onEndingCallbacks.length > 0) {
var onEnd = this.onEndingCallbacks.shift()
if (onEnd) onEnd()
}
if (this.activeChildren === 0) this.isEnding = false
}.bind(this))
}
}

Farm.prototype.revive = function(){
this.isEnding = undefined
}

module.exports = Farm
module.exports.TimeoutError = TimeoutError
8 changes: 7 additions & 1 deletion lib/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -25,5 +25,11 @@ function end (api, callback) {
process.nextTick(callback.bind(null, 'Worker farm not found!'))
}

function revive (api) {
for (var i = 0; i < farms.length; i++)
if (farms[i] && farms[i].api === api)
return farms[i].farm.revive()
}
module.exports = farm
module.exports.end = end
module.exports.end = end
module.exports.revive = revive
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"name": "worker-farm",
"description": "Distribute processing tasks to child processes with an über-simple API and baked-in durability & custom concurrency options.",
"version": "1.2.0",
"version": "1.3.1",
"homepage": "https://github.com/rvagg/node-worker-farm",
"authors": [
"Rod Vagg @rvagg <[email protected]> (https://github.com/rvagg)"
Expand Down
14 changes: 13 additions & 1 deletion tests/child.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,19 @@ module.exports.killable = function (id, callback) {
callback(null, id, process.pid)
}

module.exports.err = function (type, message, callback) {
module.exports.err = function (type, message, data, callback) {
if (typeof data == 'function') {
callback = data
data = null
} else {
var err = new Error(message)
Object.keys(data).forEach(function(key) {
err[key] = data[key]
})
callback(err)
return
}

if (type == 'TypeError')
return callback(new TypeError(message))
callback(new Error(message))
Expand Down
96 changes: 94 additions & 2 deletions tests/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ tape('single concurrent call', function (t) {
child(10, function () {
if (++cbc == 10) {
var time = Date.now() - start
t.ok(time > 100 && time < 190, 'processed tasks sequentially (' + time + 'ms)')
t.ok(time > 100 && time < 200, 'processed tasks sequentially (' + time + 'ms)')
workerFarm.end(child, function () {
t.ok(true, 'workerFarm ended')
})
Expand Down Expand Up @@ -359,7 +359,7 @@ tape('call timeout test', function (t) {
})

tape('test error passing', function (t) {
t.plan(7)
t.plan(10)

var child = workerFarm(childPath, [ 'err' ])
child.err('Error', 'this is an Error', function (err) {
Expand All @@ -372,6 +372,11 @@ tape('test error passing', function (t) {
t.equal('TypeError', err.type, 'correct type')
t.equal('this is a TypeError', err.message, 'correct message')
})
child.err('Error', 'this is an Error with custom props', {foo: 'bar', 'baz': 1}, function (err) {
t.ok(err instanceof Error, 'is an Error object')
t.equal(err.foo, 'bar', 'passes data')
t.equal(err.baz, 1, 'passes data')
})

workerFarm.end(child, function () {
t.ok(true, 'workerFarm ended')
Expand Down Expand Up @@ -449,3 +454,90 @@ tape('test max retries after process terminate', function (t) {
t.ok(true, 'workerFarm ended')
})
})

// a child where module.exports = function with revive...
tape('test revive method when nested', function (t) {
t.plan(4)

var child = workerFarm({ maxConcurrentWorkers: 10 }, childPath)
, pids = []
, i = 10

while (i--) {
child(1, function (err, pid) {
pids.push(pid)
if (pids.length == 10) {
t.equal(10, uniq(pids).length, 'pids are all the same (by pid)')
} else if (pids.length > 10)
t.fail('too many callbacks!')
})
}


workerFarm.end(child, function beforeRevive () {
t.ok(true, 'workerFarm first end')

workerFarm.revive(child);
var pids2 = []
i = 10

while (i--) {
child(1, function (err, pid) {
pids2.push(pid)
if (pids2.length == 10) {
t.equal(10, uniq(pids2).length, 'pids are all the same (by pid)')
} else if (pids2.length > 10)
t.fail('too many callbacks!')
})
}

workerFarm.end(child, function afterRevive(){
t.ok(true, 'workerFarm second end')
})

})

})

tape('test revive method when not nested', function (t) {
t.plan(4)

var child = workerFarm({ maxConcurrentWorkers: 20 }, childPath)
, pids = []
, i = 10

while (i--) {
child(1, function (err, pid) {
pids.push(pid)
if (pids.length == 10) {
t.equal(10, uniq(pids).length, 'pids are all the same (by pid)')
} else if (pids.length > 10)
t.fail('too many callbacks!')
})
}


workerFarm.end(child, function beforeRevive () {
t.ok(true, 'workerFarm first end')
})

workerFarm.revive(child);

var pids2 = [],
j = 10

while (j--) {
child(1, function (err, pid) {
pids2.push(pid)
if (pids2.length == 10) {
t.equal(10, uniq(pids2).length, 'pids are all the same (by pid)')
} else if (pids2.length > 10)
t.fail('too many callbacks!')
})
}

workerFarm.end(child, function afterRevive(){
t.ok(true, 'workerFarm second end')
})

})

0 comments on commit 51a6e70

Please sign in to comment.