
Merge pull request rvagg#22 from thaumant/max-retries
Add maxRetries option (rvagg#20)
rvagg committed Nov 17, 2014
2 parents caaad0c + c258058 commit 8264e46
Showing 4 changed files with 75 additions and 3 deletions.
README.md: 5 changes (4 additions & 1 deletion)
@@ -108,6 +108,7 @@ If you don't provide an `options` object then the following defaults will be used:
, maxConcurrentCallsPerWorker : 10
, maxConcurrentCalls : Infinity
, maxCallTime : Infinity
, maxRetries : Infinity
}
```

@@ -119,7 +120,9 @@ If you don't provide an `options` object then the following defaults will be used:

* **<code>maxConcurrentCalls</code>** allows you to control the maximum number of calls in the queue&mdash;either actively being processed or waiting to be processed by a worker. `Infinity` indicates no limit, but if you have conditions that may endlessly queue jobs and you need to set a limit, provide a `>0` value; any calls that push past the limit will return on their callback with a `MaxConcurrentCallsError` error (check `err.type == 'MaxConcurrentCallsError'`).

* **<code>maxCallTime</code>** *(use with caution, understand what this does before you use it!)* when `!== Infinity`, will cap the time, in milliseconds, that *any single call* can take to execute in a worker. If this time limit is exceeded by just a single call then the worker running that call will be killed and any calls running on that worker will have their callbacks returned with a `TimeoutError` (check `err.type == 'TimeoutError'`). If you are running with a `maxConcurrentCallsPerWorker` value greater than `1` then **all calls currently executing** will fail and will not be automatically resubmitted. Use this if you have jobs that may potentially end in infinite loops that you can't programmatically end with your child code. Preferably run this with a `maxConcurrentCallsPerWorker` value of `1` so you don't interrupt other calls when you have a timeout. This timeout operates on a per-call basis but will interrupt a whole worker.
* **<code>maxCallTime</code>** *(use with caution, understand what this does before you use it!)* when `!== Infinity`, will cap the time, in milliseconds, that *any single call* can take to execute in a worker. If this time limit is exceeded by just a single call then the worker running that call will be killed and any calls running on that worker will have their callbacks returned with a `TimeoutError` (check `err.type == 'TimeoutError'`). If you are running with a `maxConcurrentCallsPerWorker` value greater than `1` then **all calls currently executing** will fail and will be automatically resubmitted unless you've changed the `maxRetries` option. Use this if you have jobs that may potentially end in infinite loops that you can't programmatically end with your child code. Preferably run this with a `maxConcurrentCallsPerWorker` value of `1` so you don't interrupt other calls when you have a timeout. This timeout operates on a per-call basis but will interrupt a whole worker.

* **<code>maxRetries</code>** allows you to control the maximum number of times a call is requeued after its worker is terminated (either unexpectedly or by a `maxCallTime` timeout). By default this option is set to `Infinity`, which means that every call from a terminated worker is always requeued automatically. When the number of retries exceeds the `maxRetries` value, the job's callback will be executed with a `ProcessTerminatedError` (check `err.type == 'ProcessTerminatedError'`). Note that if you are running with a finite `maxCallTime` and a `maxConcurrentCallsPerWorker` greater than `1`, then any `TimeoutError` will increase the retries counter *for each* concurrent call of the terminated worker. A minimal usage sketch follows below.
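
For illustration, here is a minimal sketch of how these options and their error types might be wired together. The `./child` module path, the call argument, and the specific option values are assumptions chosen for this example (including `worker-farm` as the require target), not library defaults:

```js
var workerFarm = require('worker-farm')

var workers = workerFarm({
    maxCallTime        : 5000  // kill a worker whose call runs longer than 5 seconds
  , maxConcurrentCalls : 100   // reject new calls once 100 are queued or in progress
  , maxRetries         : 3     // stop requeueing a call after its worker has died 3 times
}, require.resolve('./child')) // hypothetical child module exporting `function (input, callback)`

workers('#1', function (err, result) {
  if (err) {
    if (err.type == 'TimeoutError')
      console.error('call exceeded maxCallTime')
    else if (err.type == 'MaxConcurrentCallsError')
      console.error('too many calls queued, try again later')
    else if (err.type == 'ProcessTerminatedError')
      console.error('worker kept dying, gave up after maxRetries requeues')
    return
  }
  console.log('result:', result)
})

workerFarm.end(workers)
```

Because `maxConcurrentCallsPerWorker` defaults to `10`, a single `maxCallTime` timeout can terminate a worker that is running several calls at once, and each of those calls then counts a retry; that is why pairing `maxCallTime` with a `maxConcurrentCallsPerWorker` value of `1` is suggested above.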

### workerFarm.end(farm)

lib/farm.js: 15 changes (13 additions & 2 deletions)
@@ -4,12 +4,14 @@ const DEFAULT_OPTIONS = {
, maxConcurrentCallsPerWorker : 10
, maxConcurrentCalls : Infinity
, maxCallTime : Infinity // exceed this and the whole worker is terminated
, maxRetries : Infinity
, forcedKillTime : 100
}

const extend = require('xtend')
, fork = require('./fork')
, TimeoutError = require('errno').create('TimeoutError')
, ProcessTerminatedError = require('errno').create('ProcessTerminatedError')
, MaxConcurrentCallsError = require('errno').create('MaxConcurrentCallsError')

function Farm (options, path) {
@@ -32,6 +34,7 @@ Farm.prototype.mkhandle = function (method) {
method : method
, callback : args.pop()
, args : args
, retries : 0
})
}.bind(this)
}
@@ -63,8 +66,16 @@ Farm.prototype.onExit = function (childId) {
setTimeout(function () {
var doQueue = false
if (this.children[childId] && this.children[childId].activeCalls) {
this.children[childId].calls.reverse().forEach(function (call) {
if (call) {
this.children[childId].calls.forEach(function (call, i) {
if (!call) return
else if (call.retries >= this.options.maxRetries) {
this.receive({
idx : i
, child : childId
, args : [ new ProcessTerminatedError('cancel after ' + call.retries + ' retries!') ]
})
} else {
call.retries++
this.callQueue.unshift(call)
doQueue = true
}
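
As a reading aid, the per-call decision made in `Farm.prototype.onExit` above can be restated as the following simplified sketch. The `handleOrphanedCall` name and the `farm` argument are invented for illustration only; this is not the module's actual code:

```js
var ProcessTerminatedError = require('errno').create('ProcessTerminatedError')

// For every call that was in flight on the dead worker, either fail it
// permanently or put it back on the queue, depending on how many retries
// it has already had.
function handleOrphanedCall (farm, childId, call, idx) {
  if (call.retries >= farm.options.maxRetries) {
    // out of retries: deliver a ProcessTerminatedError to this call's callback
    farm.receive({
        idx   : idx
      , child : childId
      , args  : [ new ProcessTerminatedError('cancel after ' + call.retries + ' retries!') ]
    })
  } else {
    // otherwise count the retry and put the call back at the front of the queue
    call.retries++
    farm.callQueue.unshift(call)
  }
}
```
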
tests/child.js: 26 changes (26 additions & 0 deletions)
@@ -1,3 +1,5 @@
var fs = require('fs')

module.exports = function (timeout, callback) {
callback = callback.bind(null, null, process.pid, Math.random(), timeout)
if (timeout)
@@ -23,4 +25,28 @@ module.exports.err = function (type, message, callback) {

module.exports.block = function () {
while (true);
}

// use the provided file path to persist the retry count across terminated workers
module.exports.stubborn = function (path, callback) {
function isOutdated(path) {
return ((new Date).getTime() - fs.statSync(path).mtime.getTime()) > 2000
}

// the file may be left over from a previous run; treat it as stale if it was last modified more than two seconds ago
if (!fs.existsSync(path) || isOutdated(path)) {
fs.writeFileSync(path, '1')
process.exit(-1)
}

var retry = parseInt(fs.readFileSync(path, 'utf8'))
if (Number.isNaN(retry))
return callback(new Error('file contents is not a number'))

if (retry > 4) {
callback(null, 12)
} else {
fs.writeFileSync(path, String(retry + 1))
process.exit(-1)
}
}
tests/index.js: 32 changes (32 additions & 0 deletions)
@@ -1,6 +1,7 @@
var tape = require('tape')
, workerFarm = require('../')
, childPath = require.resolve('./child')
, fs = require('fs')

, uniq = function (ar) {
var a = [], i, j
@@ -396,4 +397,35 @@ tape('test timeout kill', function (t) {
workerFarm.end(child, function () {
t.ok(true, 'workerFarm ended')
})
})


tape('test max retries after process terminate', function (t) {
t.plan(7)

// a temporary file is used to store the number of retries across terminated workers
var filepath1 = '.retries1'
var child1 = workerFarm({ maxConcurrentWorkers: 1, maxRetries: 5}, childPath, [ 'stubborn' ])
child1.stubborn(filepath1, function (err, result) {
t.notOk(err, 'no error')
t.equal(result, 12, 'correct result')
})

workerFarm.end(child1, function () {
fs.unlinkSync(filepath1)
t.ok(true, 'workerFarm ended')
})

var filepath2 = '.retries2'
var child2 = workerFarm({ maxConcurrentWorkers: 1, maxRetries: 3}, childPath, [ 'stubborn' ])
child2.stubborn(filepath2, function (err, result) {
t.ok(err, 'got an error')
t.equal(err.type, 'ProcessTerminatedError', 'correct error type')
t.equal(err.message, 'cancel after 3 retries!', 'correct message and number of retries')
})

workerFarm.end(child2, function () {
fs.unlinkSync(filepath2)
t.ok(true, 'workerFarm ended')
})
})
