forked from doomhz/coinnext
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathqueue.js
126 lines (115 loc) · 3.69 KB
/
queue.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
// Worker bootstrap: loads configuration, exposes shared modules as globals and
// builds the Slack client used for alerts.
// The logger configuration module is only loaded when NODE_ENV is "production".
if (process.env.NODE_ENV === "production") require("./configs/logger");
// Runtime settings.
// NOTE(review): `environment` is not referenced anywhere else in the visible part of this file.
var environment = process.env.NODE_ENV || 'development';
// Delay in milliseconds handed to setTimeout before the queue is polled again.
var QUEUE_DELAY = 500;
// Shared globals. `appConfig` is invoked as a function to obtain the configuration object.
GLOBAL.appConfig = require("./configs/config");
GLOBAL.db = require('./models/index');
GLOBAL.queue = require('./lib/queue/index');
// NOTE(review): required after the globals above are assigned; presumably the helper
// reads them at load time, so keep this ordering - confirm before moving.
var TradeHelper = require('./lib/trade_helper');
var Slackhook = require('slackhook');
// Slack client configured from the `slackalerts` section of the application config.
var slack = new Slackhook({
domain: GLOBAL.appConfig().slackalerts.domain,
token: GLOBAL.appConfig().slackalerts.token,
});
/**
 * Fetches the next valid event from the queue and dispatches it to the handler
 * matching its `type`, then schedules the next poll after QUEUE_DELAY ms.
 *
 * Supported types: "order_canceled", "order_added", "orders_match".
 * Any failure (fetch error, handler error, or an event type with no handler)
 * is reported through `exit`, which stops the worker.
 */
var processEvents = function () {
  GLOBAL.queue.Event.findNextValid(function (err, event) {
    if (err) return exit("Could not fetch the next event. Exitting...", err);
    if (!event) {
      // Nothing to do right now; poll again later.
      setTimeout(processEvents, QUEUE_DELAY);
      return;
    }
    var handler;
    var failureMessage;
    switch (event.type) {
      case "order_canceled":
        handler = processCancellation;
        failureMessage = "Could not process cancellation. Exitting...";
        break;
      case "order_added":
        handler = processAdd;
        failureMessage = "Could not process order add. Exitting...";
        break;
      case "orders_match":
        handler = processMatch;
        failureMessage = "Could not process order match. Exitting...";
        break;
      default:
        // Previously an unrecognised type fell through every branch: no handler ran
        // and no new poll was scheduled, so the queue stopped without any error.
        return exit(
          "Unknown event type \"" + event.type + "\" for event " + event.id + ". Exiting...",
          new Error("Unhandled event type: " + event.type)
        );
    }
    return handler(event, function (handlerErr) {
      if (handlerErr) return exit(failureMessage, handlerErr);
      setTimeout(processEvents, QUEUE_DELAY);
    });
  });
};
/**
 * Handles an "order_canceled" event: passes `event.loadout.order_id` to
 * TradeHelper.cancelOrder and, on success, marks the event as "processed".
 *
 * @param {Object} event - Queue event; must expose `id`, `loadout.order_id` and `save()`.
 * @param {Function} callback - Called with no arguments on success, or with the
 *   error from either the cancellation or the event save.
 */
var processCancellation = function (event, callback) {
  TradeHelper.cancelOrder(event.loadout.order_id, function (err) {
    if (err) {
      console.error("Could not process event " + event.id, err);
      return callback(err);
    }
    event.status = "processed";
    // Assumes the object returned by save() calls back with an error as the first
    // argument (Sequelize-style `.complete`) - confirm against the ORM version in use.
    event.save().complete(function (saveErr) {
      if (saveErr) {
        // A failed save leaves the event unprocessed in storage; report it rather
        // than claiming success.
        console.error("Could not mark event " + event.id + " as processed", saveErr);
        return callback(saveErr);
      }
      return callback();
    });
  });
};
/**
 * Handles an "order_added" event: passes `event.loadout.order_id` to
 * TradeHelper.publishOrder and, on success, marks the event as "processed".
 *
 * @param {Object} event - Queue event; must expose `id`, `loadout.order_id` and `save()`.
 * @param {Function} callback - Called with no arguments on success, or with the
 *   error from either the publish step or the event save.
 */
var processAdd = function (event, callback) {
  TradeHelper.publishOrder(event.loadout.order_id, function (err) {
    if (err) {
      console.error("Could not process event " + event.id, err);
      return callback(err);
    }
    event.status = "processed";
    // Assumes the object returned by save() calls back with an error as the first
    // argument (Sequelize-style `.complete`) - confirm against the ORM version in use.
    event.save().complete(function (saveErr) {
      if (saveErr) {
        // A failed save leaves the event unprocessed in storage; report it rather
        // than claiming success.
        console.error("Could not mark event " + event.id + " as processed", saveErr);
        return callback(saveErr);
      }
      return callback();
    });
  });
};
/**
 * Handles an "orders_match" event: passes the whole `event.loadout` to
 * TradeHelper.matchOrders and, on success, marks the event as "processed".
 *
 * @param {Object} event - Queue event; must expose `id`, `loadout` and `save()`.
 * @param {Function} callback - Called with no arguments on success, or with the
 *   error from either the match step or the event save.
 */
var processMatch = function (event, callback) {
  TradeHelper.matchOrders(event.loadout, function (err) {
    if (err) {
      console.error("Could not process event " + event.id, err);
      return callback(err);
    }
    event.status = "processed";
    // Assumes the object returned by save() calls back with an error as the first
    // argument (Sequelize-style `.complete`) - confirm against the ORM version in use.
    event.save().complete(function (saveErr) {
      if (saveErr) {
        // A failed save leaves the event unprocessed in storage; report it rather
        // than claiming success, since replaying a match would be harmful.
        console.error("Could not mark event " + event.id + " as processed", saveErr);
        return callback(saveErr);
      }
      return callback();
    });
  });
};
/**
 * Posts an alert message to Slack when alerts are enabled in the application
 * config and NODE_ENV is "production".
 *
 * On the two skip paths (alerts disabled, or non-production environment) the
 * callback, if it is a function, is invoked with no arguments and its result is
 * returned. In the non-production case the message is written to the console.
 * When the alert is sent, the callback is handed to `slack.send` as-is.
 *
 * @param {string} msg - Alert text.
 * @param {Function} [callback] - Optional completion callback.
 */
var sendAlert = function (msg, callback) {
  // Shared tail for every path that does not contact Slack.
  var finishWithoutSending = function () {
    if (callback instanceof Function) {
      return callback();
    }
    return undefined;
  };

  var alertsEnabled = GLOBAL.appConfig().slackalerts.enabled;
  if (!alertsEnabled) {
    return finishWithoutSending();
  }

  var inProduction = process.env.NODE_ENV === "production";
  if (!inProduction) {
    console.log("sendAlert: not sending alert in dev environment. Message: " + msg);
    return finishWithoutSending();
  }

  var payload = {
    text: msg,
    channel: GLOBAL.appConfig().slackalerts.channel,
    username: "alertbot",
    icon_emoji: ":rotating_light:"
  };
  slack.send(payload, callback);
};
/**
 * Fatal shutdown path: logs the failure, attempts to send an alert through
 * `sendAlert`, then terminates the process with a non-zero exit code so that a
 * supervisor can tell the shutdown apart from a clean stop.
 *
 * @param {string} errMessage - Human-readable description of what failed.
 * @param {*} err - The underlying error, logged alongside the message.
 */
var exit = function (errMessage, err) {
  console.error(errMessage, err);
  sendAlert("Event queue exiting! Message: " + errMessage, function (alertErr, res) {
    // Log the alert outcome, then stop regardless of whether the alert went out.
    console.log("slack:", alertErr, res);
    process.exit(1);
  });
};
// Last-resort handler: registered before the queue starts so that an exception
// thrown during startup is also reported. Logs the stack trace when available,
// attempts an alert, then exits with a non-zero code.
process.on('uncaughtException', function (err) {
  console.error('Caught exception: ' + ((err && err.stack) || err));
  sendAlert("Event queue exiting! Exception: " + err, function (alertErr, res) {
    console.log("slack:", alertErr, res);
    process.exit(1);
  });
});
// Start polling the event queue and announce that the worker is up.
processEvents();
sendAlert("Event queue is processing events...");
console.log("processing events...");