Skip to content

Commit

Permalink
Merge pull request #122 from taskrabbit/retry
Browse files Browse the repository at this point in the history
Retry
  • Loading branch information
evantahler committed Mar 5, 2016
2 parents 163f9e3 + a3e77fe commit 71a1dcb
Show file tree
Hide file tree
Showing 11 changed files with 609 additions and 179 deletions.
30 changes: 22 additions & 8 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# node-resque

Delayed Tasks in nodejs. A very opinionated but compatible API with [resque](https://github.com/resque/resque) and [resque scheduler](https://github.com/resque/resque-scheduler). Resque is a background job system based on redis. It includes priority queus, plugins, locking, delayed jobs, and more!
Delayed Tasks in nodejs. A very opinionated but compatible API with [resque](https://github.com/resque/resque) and [resque scheduler](https://github.com/resque/resque-scheduler). Resque is a background job system based on redis. It includes priority queus, plugins, locking, delayed jobs, and more!

[![Nodei stats](https://nodei.co/npm/node-resque.png?downloads=true)](https://npmjs.org/package/node-resque)

Expand Down Expand Up @@ -38,9 +38,13 @@ var connectionDetails = {

var jobs = {
"add": {
plugins: [ 'jobLock' ],
plugins: [ 'jobLock', 'retry' ],
pluginOptions: {
jobLock: {},
retry: {
retryLimit: 3,
retryDelay: (1000 * 5),
}
},
perform: function(a,b,callback){
var answer = a + b;
Expand Down Expand Up @@ -158,7 +162,7 @@ You can also pass redis client directly.
var redisClient = new Redis();
var connectionDetails = { redis: redisClient }

var worker = new NR.worker({connection: connectionDetails, queues: 'math'}, jobs,
var worker = new NR.worker({connection: connectionDetails, queues: 'math'}, jobs,

worker.on('error', function(){
// handler errors
Expand Down Expand Up @@ -282,11 +286,11 @@ scheduler.connect(function(){
var queue = new NR.queue({connection: connectionDetails}, jobs, function(){
schedule.scheduleJob('10,20,30,40,50 * * * * *', function(){ // do this job every 10 seconds, CRON style
// we want to ensure that only one instance of this job is scheduled in our environment at once,
// we want to ensure that only one instance of this job is scheduled in our environment at once,
// no matter how many schedulers we have running
if(scheduler.master){
if(scheduler.master){
console.log(">>> enquing a job");
queue.enqueue('time', "ticktock", new Date().toString() );
queue.enqueue('time', "ticktock", new Date().toString() );
}
});
});
Expand Down Expand Up @@ -379,6 +383,16 @@ var jobs = {
}
```
The plugins which are included with this package are:
- `delayQueueLock`
- If a job with the same name, queue, and args is already in the delayed queue(s), do not enqueue it again
- `jobLock`
- If a job with the same name, queue, and args is already running, put this job back in the queue and try later
- `queueLock`
- If a job with the same name, queue, and args is already in the queue, do not enqueue it again
- `retry`
- If a job fails, retry it N times before finally placing it into the failed queue
## Multi Worker
node-resque provides a wrapper around the `worker` object which will auto-scale the number of resque workers. This will process more than one job at a time as long as there is idle CPU within the event loop. For example, if you have a slow job that sends email via SMTP (with low rendering overhead), we can process many jobs at a time, but if you have a math-heavy operation, we'll stick to 1. The `multiWorker` handles this by spawning more and more node-resque workers and managing the pool.
Expand All @@ -393,7 +407,7 @@ var connectionDetails = {
}
var multiWorker = new NR.multiWorker({
connection: connectionDetails,
connection: connectionDetails,
queues: ['slowQueue'],
minTaskProcessors: 1,
maxTaskProcessors: 100,
Expand Down Expand Up @@ -433,7 +447,7 @@ The Options available for the multiWorker are:
- `toDisconnectProcessors`: If false, all multiWorker children will share a single redis connection. If false, each child will connect and disconnect seperatly. This will lead to more redis connections, but faster retrival of events.
## Presentation
This package was featured heavily in [this presentation I gave](http://blog.evantahler.com/blog/background-tasks-for-node.html) about background jobs + node.js. It contains more examples!
This package was featured heavily in [this presentation I gave](http://blog.evantahler.com/blog/background-tasks-for-node.html) about background jobs + node.js. It contains more examples!
## Acknowledgments
Most of this code was inspired by / stolen from [coffee-resque](https://npmjs.org/package/coffee-resque) and [coffee-resque-scheduler](https://github.com/leeadkins/coffee-resque-scheduler). Thanks!
102 changes: 102 additions & 0 deletions examples/retry.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
/////////////////////////
// REQUIRE THE PACKAGE //
/////////////////////////

var NR = require(__dirname + "/../index.js");
// In your projects: var NR = require("node-resque");

///////////////////////////
// SET UP THE CONNECTION //
///////////////////////////

var connectionDetails = {
pkg: 'ioredis',
host: '127.0.0.1',
password: null,
port: 6379,
database: 0,
// namespace: 'resque',
// looping: true,
// options: {password: 'abc'},
};

//////////////////////////////
// DEFINE YOUR WORKER TASKS //
//////////////////////////////

var jobs = {
"add": {
plugins: [ 'retry' ],
pluginOptions: {
retry: {
retryLimit: 3,
// retryDelay: 1000,
backoffStrategy: [1000 * 10, 1000 * 20, 1000 * 30],
},
},
perform: function(a,b,callback){
var broken = true;

if(broken){
return callback(new Error('BUSTED'));
}else{
return callback(null, (a + b));
}
},
}
};

////////////////////
// START A WORKER //
////////////////////

var worker = new NR.worker({connection: connectionDetails, queues: ['math']}, jobs);
worker.connect(function(){
worker.workerCleanup(); // optional: cleanup any previous improperly shutdown workers on this host
worker.start();
});

///////////////////////
// START A SCHEDULER //
///////////////////////

var scheduler = new NR.scheduler({connection: connectionDetails});
scheduler.connect(function(){
scheduler.start();
});

/////////////////////////
// REGESTER FOR EVENTS //
/////////////////////////

worker.on('start', function(){ console.log("worker started"); });
worker.on('end', function(){ console.log("worker ended"); });
worker.on('cleaning_worker', function(worker, pid){ console.log("cleaning old worker " + worker); });
worker.on('poll', function(queue){ console.log("worker polling " + queue); });
worker.on('job', function(queue, job){ console.log("working job " + queue + " " + JSON.stringify(job)); });
worker.on('reEnqueue', function(queue, job, plugin){ console.log("reEnqueue job (" + plugin + ") " + queue + " " + JSON.stringify(job)); });
worker.on('success', function(queue, job, result){ console.log("job success " + queue + " " + JSON.stringify(job) + " >> " + result); });
worker.on('pause', function(){ console.log("worker paused"); });
worker.on('error', function(queue, job, error){ console.log("error " + queue + " " + JSON.stringify(job) + " >> " + error); });
worker.on('failure', function(queue, job, failure){
console.log("job failure " + queue + " " + JSON.stringify(job) + " >> " + failure);
setTimeout(process.exit, 2000);
});

scheduler.on('start', function(){ console.log("scheduler started"); });
scheduler.on('end', function(){ console.log("scheduler ended"); });
scheduler.on('poll', function(){ console.log("scheduler polling"); });
scheduler.on('master', function(state){ console.log("scheduler became master"); });
scheduler.on('error', function(error){ console.log("scheduler error >> " + error); });
scheduler.on('working_timestamp', function(timestamp){ console.log("scheduler working timestamp " + timestamp); });
scheduler.on('transferred_job', function(timestamp, job){ console.log("scheduler enquing job " + timestamp + " >> " + JSON.stringify(job)); });

////////////////////////////////////
// CONNECT TO A QUEUE AND WORK IT //
////////////////////////////////////

var queue = new NR.queue({connection: connectionDetails}, jobs);
queue.on('error', function(error){ console.log(error); });
queue.connect(function(){
queue.enqueue('math', "add", [1,2]);
});
4 changes: 2 additions & 2 deletions lib/connection.js
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,9 @@ connection.prototype.connect = function(callback){

}else{

if(self.options['package'] && !self.options.pkg) {
if(self.options.package && !self.options.pkg) {
self.emit('Depreciation warning: You need to use \'pkg\' instead of \'package\'! Please update your configuration.');
self.options.pkg = self.options['package'];
self.options.pkg = self.options.package;
}
var pkg = require(self.options.pkg);
self.redis = pkg.createClient(self.options.port, self.options.host, self.options.options);
Expand Down
169 changes: 169 additions & 0 deletions lib/plugins/retry.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,169 @@
// If a job fails, retry it N times before finally placing it into the failed queue
// a port of some of the features in https://github.com/lantins/resque-retry

var crypto = require('crypto');
var os = require('os');

var retry = function(worker, func, queue, job, args, options){
var self = this;

if(!options.retryLimit){ options.retryLimit = 1; }
if(!options.retryDelay){ options.retryDelay = (1000 * 5); }
if(!options.backoffStrategy){ options.backoffStrategy = null; }

self.name = 'retry';
self.worker = worker;
self.queue = queue;
self.func = func;
self.job = job;
self.args = args;
self.options = options;

if(self.worker.queueObject){
self.queueObject = self.worker.queueObject;
}else{
self.queueObject = self.worker;
}
};

////////////////////
// PLUGIN METHODS //
////////////////////

retry.prototype.argsKey = function(){
var self = this;
// return crypto.createHash('sha1').update(self.args.join('-')).digest('hex');
if(!self.args || self.args.length === 0){ return ''; }
return self.args.join('-');
};

retry.prototype.retryKey = function(){
var self = this;
return self.queueObject.connection.key('resque-retry', self.func, self.argsKey()).replace(/\s/, '');
};

retry.prototype.failureKey = function(){
var self = this;
return self.queueObject.connection.key('failure-resque-retry:' + self.func + ':' + self.argsKey()).replace(/\s/, '');
};

retry.prototype.maxDelay = function(){
var self = this;
var maxDelay = self.options.retryDelay || 1;
if(Array.isArray(self.options.backoffStrategy)){
self.options.backoffStrategy.forEach(function(d){
if(d > maxDelay){ maxDelay = d; }
});
}
return maxDelay;
};

retry.prototype.redis = function(){
var self = this;
return self.queueObject.connection.redis;
};

retry.prototype.attemptUp = function(callback){
var self = this;
var key = self.retryKey();
self.redis().setnx(key, -1, function(error){
if(error){ return callback(error); }
self.redis().incr(key, function(error, retryCount){
if(error){ return callback(error); }
self.redis().expire(key, self.maxDelay(), function(error){
if(error){ return callback(error); }
var remaning = self.options.retryLimit - retryCount - 1;
return callback(null, remaning);
});
});
});
};

retry.prototype.saveLastError = function(callback){
var self = this;
var now = new Date();
var failedAt = '' +
now.getFullYear() + '/' +
(('0' + (now.getMonth() + 1)).slice(-2)) + '/' +
(('0' + now.getDate()).slice(-2)) + ' ' +
(('0' + now.getHours()).slice(-2)) + ':' +
(('0' + now.getMinutes()).slice(-2)) + ':' +
(('0' + now.getSeconds()).slice(-2))
;

var data = {
failed_at : failedAt,
payload : self.args,
exception : String(self.worker.error),
error : String(self.worker.error),
backtrace : self.worker.error.stack.split(os.EOL) || [],
worker : self.func,
queue : self.queue
};

self.redis().setex(self.failureKey(), self.maxDelay(), JSON.stringify(data), callback);
};

retry.prototype.cleanup = function(callback){
var self = this;
var key = self.retryKey();
var failureKey = self.failureKey();
self.redis().del(key, function(error){
if(error){ return callback(error); }
self.redis().del(failureKey, function(error){
if(error){ return callback(error); }
return callback();
});
});
};

retry.prototype.after_perform = function(callback){
var self = this;

if(!self.worker.error){
self.cleanup(callback);
}

self.attemptUp(function(error, remaning){
if(error){ return callback(error); }
self.saveLastError(function(error){
if(error){ return callback(error); }
if(remaning <= 0){
self.cleanup(function(error){
if(error){ return callback(error); }
return callback(self.worker.error, true);
});
}else{
var nextTryDelay = self.options.retryDelay;
if(Array.isArray(self.options.backoffStrategy)){
var index = (self.options.retryLimit - remaning - 1);
if(index > (self.options.backoffStrategy.length - 1)){
index = (self.options.backoffStrategy.length - 1);
}
nextTryDelay = self.options.backoffStrategy[index];
}

self.queueObject.enqueueIn(nextTryDelay, self.queue, self.func, self.args, function(error){
if(error){ return callback(error); }

self.worker.emit('reEnqueue', self.queue, self.job, {
delay: nextTryDelay,
remaningAttempts: remaning,
err: self.worker.error
});

self.redis().decr(self.queueObject.connection.key('stat', 'processed'));
self.redis().decr(self.queueObject.connection.key('stat', 'processed', self.worker.name));

self.redis().incr(self.queueObject.connection.key('stat', 'failed'));
self.redis().incr(self.queueObject.connection.key('stat', 'failed', self.worker.name));

delete self.worker.error;
return callback(null, true);
});
}
});
});
};

exports.retry = retry;
Loading

0 comments on commit 71a1dcb

Please sign in to comment.