forked from victor73/OSDF
-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathworker.js
332 lines (291 loc) · 11.5 KB
/
worker.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
var _ = require('lodash');
var auth_enforcer = require('auth_enforcer');
var each = require('async/each');
var events = require('events');
var express = require('express');
var fs = require('fs');
var morgan = require('morgan');
var osdf_utils = require('osdf_utils');
var parallel = require('async/parallel');
var logger = osdf_utils.get_logger();
var node_handler = require('node-handler');
var info_handler = require('info-handler');
var perms_handler = require('perms-handler');
var ns_handler = require('namespace-handler');
var schema_handler = require('schema-handler');
var query_handler = require('query-handler');
// This event emitter is instrumental in providing us a way of knowing when all
// of our handlers are ready.
var eventEmitter = new events.EventEmitter();
eventEmitter.setMaxListeners(0);
var allowCrossDomain = function(req, res, next) {
res.header('Access-Control-Allow-Origin', '*');
res.header('Access-Control-Allow-Methods', 'GET,PUT,POST,DELETE');
res.header('Access-Control-Allow-Headers', 'Content-Type, Authorization');
// intercept OPTIONS method
if (req.method === 'OPTIONS' ) {
res.send(200);
} else {
next();
}
};
// Calls the various handlers' initialization methods.
function initialize(working_path) {
// These initializations happen asynchronously, so we use events to
// to track their completion.
auth_enforcer.init(eventEmitter, working_path);
info_handler.init(eventEmitter);
node_handler.init(eventEmitter, working_path);
perms_handler.init(eventEmitter);
query_handler.init(eventEmitter);
schema_handler.init(eventEmitter, working_path);
fs.watchFile(osdf_utils.get_config(), function(curr, prev) {
if (curr.mtime.getTime() !== prev.mtime.getTime()) {
logger.info('Detected that the configuration has been updated.');
info_handler.update();
}
});
}
// This is the function that launches the app when all
// initialization is complete.
function launch(config) {
var app = express();
// Check if CORS Support should be enabled or not.
var cors = config.value('global', 'cors_enabled');
if (cors != undefined && cors != null && (cors === 'true' || cors === 'yes')) {
app.use(allowCrossDomain);
}
// Check if CORS Support should be enabled or not.
var https_enabled = config.value('global', 'https_enabled');
if (https_enabled !== undefined && https_enabled !== null &&
(https_enabled === 'true' || https_enabled === 'yes')) {
https_enabled = true;
} else {
https_enabled = false;
}
// Register various middleware functions
// Logging of the request
app.use(morgan('common'));
// Enforce authentication
app.use(auth_enforcer.authenticate());
// Removed the "X-Powered-By" header (reduce bandwidth a bit).
app.disable('x-powered-by');
// This custom middleware is what sets the 'rawBody' property
app.use(function(req, res, next) {
var data = '';
req.setEncoding('utf8');
req.on('data', function(chunk) {
data += chunk;
});
req.on('end', function() {
req.rawBody = data;
next();
});
});
// Node handler functions
var routes = require('routes');
routes.set_routes(app);
var bind_address = config.value('global', 'bind_address');
var port = config.value('global', 'port');
// Check that we have some valid settings.
if (bind_address === undefined ||
bind_address === null ||
bind_address.length === 0) {
var bind_err = "The 'bind_address' setting is not configured.";
console.log(bind_err);
process.send({ cmd: 'abort', reason: bind_err });
}
if (port === undefined || port === null || port.length === 0) {
var port_err = "The 'port' setting is not configured.";
console.log(port_err);
process.send({ cmd: 'abort', reason: port_err });
}
process.on('uncaughtException', function(err) {
logger.error('Caught exception: ' + err);
logger.error(err.stack);
console.log('Check log file for stack trace. Caught exception: ' + err);
});
process.on('message', function(msg) {
if (msg && msg.hasOwnProperty('cmd')) {
logger.info('Got a message from the master: ' + msg['cmd']);
if (msg['cmd'] === 'schema_change') {
node_handler.process_schema_change(msg);
schema_handler.process_schema_change(msg);
} else if (msg['cmd'] === 'aux_schema_change') {
node_handler.process_aux_schema_change(msg);
schema_handler.process_aux_schema_change(msg);
} else {
logger.error('Received unknown process message type.');
}
} else {
logger.error('Received invalid process message.');
}
});
if (https_enabled) {
logger.info('Using encrypted https.');
// Need the key and cert to establish the SSL enabled server
get_ssl_options(config, function(err, options) {
if (err) {
logger.error('Unable to configure SSL: ' + err.message);
process.send({ cmd: 'abort', reason: err.message });
} else {
var https = require('https');
var server = https.createServer(options, app);
server.listen(port, bind_address);
}
});
} else {
logger.info('Using regular http (unencrypted).');
// Just use regular http
var http = require('http');
var server = http.createServer(app);
server.listen(port, bind_address);
}
// If we are being started via sys-v style init scripts we are probably being
// invoked as root. If we need to listen on a well known port, we need to be
// launched as root to bind to the port, but then drop down to another UID.
if (process.getuid() === 0) {
// Who do we drop privileges to?
var user = config.value('global', 'user');
if (user === null) {
console.log("The 'user' setting is not configured.");
process.exit(1);
}
console.log('Launched as root. Switching to ' + user);
process.setuid(user);
}
}
// This function sets up the mechanism to wait for all the handlers
// to be ready by acting upon events that are emitted by the handlers
// when they are finished. When all the events are received, we're ready
// to proceed, and launch() is called.
function listen_for_init_completion(config) {
var handlers = [ 'node', 'info', 'auth', 'perms', 'query', 'schema' ];
var handler_count = 0;
var examine_handlers = function() {
if (++handler_count === handlers.length) {
console.log('Handlers initialized for worker with PID ' +
process.pid + '.');
// Send message to master process
process.send({ cmd: 'init_completed' });
// You may fire when ready, Gridley...
try {
launch(config);
} catch (err) {
process.send({ cmd: 'abort', reason: err.message });
}
}
};
eventEmitter.on('auth_handler_initialized', function(message) {
var user_count = message;
process.send({ cmd: 'user_count', users: user_count });
});
// Allow each handler to abort the launch if there is a configuration
// problem somewhere. For example, maybe CouchDB or ElasticSearch are down.
_.each(handlers, function(handler) {
eventEmitter.on(handler + '_handler_initialized', function(message) {
examine_handlers();
});
eventEmitter.on(handler + '_handler_aborted', function(message) {
console.error('Got an abort from ' + handler +
' handler. Reason: ' + message);
process.send({ cmd: 'abort', reason: message });
});
});
}
function get_ssl_options(config, callback) {
logger.debug('In get_ssl_options.');
parallel([
function(callback) {
var ca_file = config.value('global', 'ca_file');
var ca = [];
if (ca_file == undefined || ca_file == null) {
logger.debug('Certificate Authority (CA) listing not set.');
// This will return an empty array
callback(null, ca);
} else {
logger.debug('Certificate Authority (CA) file listing found.');
fs.readFile(ca_file, 'utf8', function(err, chain) {
var chain_files = chain.split('\n');
chain_files = _.without(chain_files, '');
logger.debug('Number of CA chain files to read: ' +
chain_files.length);
each(chain_files, function(file, cb) {
logger.debug('Reading file ' + file);
fs.readFile(file, 'utf8', function(err, data) {
if (err) {
cb(err);
} else {
ca.push(data);
cb();
}
});
},
function(err) {
if (err) {
logger.error(err);
callback(err, null);
} else {
// Return the array of CA data...
logger.debug('Completed reading SSL CA files.');
callback(null, ca);
}
});
});
}
},
function(callback) {
var key_file = config.value('global', 'key_file');
if (key_file == undefined || key_file == null) {
callback('key_file not set in configuration file.', null);
return;
}
logger.debug('Reading key_file ' + key_file);
fs.readFile(key_file, 'utf8', function(err, data) {
if (err) {
logger.error('Error reading SSL key file.', err);
callback(err, null);
} else {
callback(null, data);
}
});
},
function(callback) {
var cert_file = config.value('global', 'cert_file');
if (cert_file == undefined || cert_file == null) {
callback('cert_file not set in configuration file.', null);
return;
}
logger.debug('Reading cert_file ' + cert_file);
fs.readFile(cert_file, 'utf8', function(err, data) {
if (err) {
logger.error('Error reading SSL cert file.', err);
callback(err, null);
} else {
callback(null, data);
}
});
}
],
function(err, results) {
if (err) {
logger.error(err);
callback(err, null);
} else {
var ca = results[0];
var key = results[1];
var cert = results[2];
var options = {
ca: ca,
key: key,
cert: cert
};
callback(null, options);
}
});
}
exports.start_worker = function(config, working_path) {
// Wait for everything to be ready before we get going.
listen_for_init_completion(config);
initialize(working_path);
};