-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmain.js
110 lines (94 loc) · 2.8 KB
/
main.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
const dotenv = require("dotenv");
const amqp = require("amqplib");
const SessionHandler = require("./src/SessionHandler.js");
const healthChecker = require("./utils/health_checker.js");
(async () => {
dotenv.config();
var creationAvailable = true;
const amqpUrl = process.env.RABBIT_MQ_URL || "amqp://localhost";
const redisUrl = process.env.REDIS_URL || "redis://localhost";
const amqpConnection = await amqp.connect(amqpUrl);
const amqpChannel = await amqpConnection.createChannel();
const multipleWppSessionHandler = new SessionHandler(redisUrl, amqpUrl);
setInterval(performHealthCheck, 10000);
await assertQueueList([
"send_message",
"create_instance",
"destroy_instance",
]);
let creationConsumer = await performCreationConsumer();
await amqpChannel.consume(
"destroy_instance",
async (msg) => {
await verifyKey(msg, async (data) => {
await multipleWppSessionHandler.destroyClient(data.key);
});
},
{ noAck: false }
);
await amqpChannel.consume(
"send_message",
async (msg) => {
await verifyKey(msg, async (data) => {
if (!data.chat_id || !data.message) return;
await multipleWppSessionHandler.sendMessage(
data.key,
data.chat_id,
data.message
);
});
},
{ noAck: false }
);
async function performHealthCheck() {
const usageMetric = await healthChecker();
if (creationAvailable) {
if (usageMetric.cpu >= 90 || usageMetric.ram >= 90) {
await amqpChannel.cancel(creationConsumer.consumerTag);
creationAvailable = false;
}
} else {
if (usageMetric.cpu <= 80 && usageMetric.ram < 80) {
creationConsumer = await performCreationConsumer();
creationAvailable = true;
}
}
}
async function performCreationConsumer() {
return await amqpChannel.consume(
"create_instance",
async (msg) => {
const data = parseMessageToJson(msg);
if (data.key) {
await multipleWppSessionHandler.createClient(data.key);
}
},
{ noAck: true }
);
}
async function verifyKey(message, isValidCallback = async (data) => {}) {
const data = parseMessageToJson(message);
if (data.key) {
const isValid = multipleWppSessionHandler.hasInstance(data.key);
if (isValid) {
await amqpChannel.ack(message);
await isValidCallback(data);
return;
}
}
await amqpChannel.nack(message);
}
function parseMessageToJson(message) {
if (!message) return {};
try {
return JSON.parse(message.content.toString());
} catch (error) {
return {};
}
}
async function assertQueueList(list = []) {
for (const queue of list) {
await amqpChannel.assertQueue(queue, { durable: true });
}
}
})();