-
Notifications
You must be signed in to change notification settings - Fork 1
/
index.js
executable file
·113 lines (94 loc) · 2.92 KB
/
index.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
const mongoose = require('mongoose');
const amqp = require('amqp');
const winston = require('winston');
const consts = require('./src/consts');
const Fleet = require('./src/models/fleet');
const Message = require('./src/models/message');
const thinky = require('./src/thinky');
// Logger
const logger = new (winston.Logger)({
transports: [
new (winston.transports.Console)({
level: consts.DEBUG_LEVEL,
colorize: true,
prettyPrint: true,
}),
],
});
// Connect AMQP
const connection = amqp.createConnection({
host: consts.RABBITMQ_HOST,
port: consts.RABBITMQ_PORT,
login: consts.RABBITMQ_USER,
password: consts.RABBITMQ_PASS,
connectionTimeout: 10000,
authMechanism: 'AMQPLAIN',
vhost: '/',
noDelay: true,
ssl: { enabled: false },
});
// AQMP queue connection
connection.on('error', (e) => {
logger.error('Error from amqp: ', e);
});
// MongoDB on connection
mongoose.connection.on('connected', () => {
logger.info('Connected to MongoDB');
connection.on('ready', () => {
logger.info('Connected to RabbitMQ');
Fleet.find({}, (err, fleets) => {
if (err) throw err;
for (const fleet of fleets) {
logger.info(`Subscription to ${fleet.id}`);
connection.queue(fleet.id, { autoDelete: false }, (p) => {
p.bind('amq.direct', fleet.id);
});
connection.queue('stream-processor', { autoDelete: false }, (q) => {
q.bind(`${fleet.id}.#`);
// Receive messages
q.subscribe((msg, headers, deliveryInfo) => {
const i = deliveryInfo.routingKey.indexOf('.');
const fleetKey = deliveryInfo.routingKey.slice(0, i);
const topic = deliveryInfo.routingKey.slice(i + 1);
logger.debug(`Incoming message from fleet: ${fleetKey}/${topic}`);
const message = new Message({
data: msg.data,
fleet: fleetKey,
topic,
});
message.saveAll().then(() => {
logger.debug('Message stored ok');
});
});
});
}
});
});
});
// Rethink connected
thinky.dbReady().then(() => {
logger.info('Connected to RethinkDB');
// Get notifications for new messages inserted on the database
Message.changes().then((feed) => {
logger.info(`Subscribed to ${feed}`);
feed.each((error, doc) => {
if (error) throw error;
logger.debug(`Sending message to fleet ${doc.fleet}`);
// Is a new message
if (doc.getOldValue() == null) {
// Send the message to the user
connection.publish(doc.fleet, doc.data, {
messageId: doc.id,
timestamp: doc.timestamp.getTime() / 1000,
headers: { topic: doc.topic },
}, (err) => {
logger.error(err);
});
}
});
}).error((error) => {
logger.error(error);
});
});
// Connect MongoDB
mongoose.connect(`mongodb://${consts.MONGO_HOST}/${consts.MONGO_DB_NAME}`);